Source code for gatenet.mesh.radio

import time
from typing import List, Dict

class MeshRadio:
    """
    MeshRadio base class for mesh packet parsing, encrypted messaging, and topology mapping.

    The "encryption" implemented here is a placeholder envelope (``enc(<msg>)``),
    not real cryptography.

    Example:
        >>> from gatenet.mesh.radio import MeshRadio
        >>> radio = MeshRadio()
        >>> radio.log_gps(37.7749, -122.4194)
        >>> radio.log_rf_signal(-55.0)
        >>> radio.scan_wifi(mock=True)
        >>> radio.send_message("Hello", "node2")
        >>> packets = radio.receive_packets()
        >>> topology = radio.get_topology()
    """

    # Markers of the placeholder cipher envelope produced by _encrypt().
    _ENC_PREFIX = "enc("
    _ENC_SUFFIX = ")"

    def __init__(self):
        # Everything learned about the mesh so far, grouped by category.
        self.topology = {
            "nodes": [],
            "links": [],
            "locations": [],
            "signals": [],
            "wifi": [],
        }
        self.packets: List[Dict] = []  # packets queued by send_message()
        self.gps_location = None       # last (lat, lon) passed to log_gps()
        self.rf_signal = None          # last value passed to log_rf_signal()
        self.wifi_networks: List[Dict] = []  # result of the last scan_wifi()

    def on_signal(self, handler):
        """
        Register a signal handler (no-op for MeshRadio, for diagnostics
        integration compatibility).

        Args:
            handler: Ignored.
        """
        # MeshRadio does not emit RF events, so this is a no-op.

    def sync_logs(self, file_path: str = "mesh_radio_logs.json") -> bool:
        """
        Sync mesh radio logs (packets, topology, GPS, RF, Wi-Fi) to a JSON file
        (base node or Mini PC).

        Args:
            file_path (str): Path to save logs (default: mesh_radio_logs.json)

        Returns:
            bool: True if successful, False otherwise (the error is printed).

        Example:
            >>> radio = MeshRadio()
            >>> radio.sync_logs()
            >>> radio.sync_logs("/mnt/base_node/mesh_radio_logs.json")
        """
        import json

        try:
            logs = {
                "packets": self.packets,
                "topology": self.topology,
                "gps_location": self.gps_location,
                "rf_signal": self.rf_signal,
                "wifi_networks": self.wifi_networks,
            }
            # Serialize before opening the file: if some logged value is not
            # JSON-serializable we fail here and an existing log file is left
            # intact instead of being truncated by open(..., "w").
            payload = json.dumps(logs, indent=2)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:  # best-effort by contract: report and return False
            print(f"Error syncing logs: {e}")
            return False
        return True

    def scan_wifi(self, interface: str = "wlan0", mock: bool = False) -> List[Dict]:
        """
        Scan for Wi-Fi SSIDs/devices and correlate with mesh activity.

        Runs ``sudo iwlist <interface> scan`` for a real scan (e.g. on a
        Raspberry Pi). If ``mock`` is True, returns sample data instead.

        Args:
            interface (str): Wireless interface to scan (default: wlan0)
            mock (bool): Return built-in sample networks without scanning.

        Returns:
            list of dict: One entry per network with keys ``ssid``, ``mac`` and
            ``signal`` (each None when not found in the scan output). If the
            scan fails, a single entry with those keys set to None plus an
            ``error`` key holding the error text.

        Example:
            >>> radio = MeshRadio()
            >>> radio.scan_wifi(mock=True)
        """
        if mock:
            networks = [
                {"ssid": "TestNet", "mac": "AA:BB:CC:DD:EE:FF", "signal": -40},
                {"ssid": "HomeWiFi", "mac": "11:22:33:44:55:66", "signal": -60},
            ]
        else:
            import re
            import subprocess

            try:
                output = subprocess.check_output(
                    ["sudo", "iwlist", interface, "scan"], text=True
                )
                ssid_re = re.compile(r'ESSID:"([^"]+)"')
                mac_re = re.compile(r'Address: ([0-9A-Fa-f:]+)')
                signal_re = re.compile(r'Signal level=(-?\d+)')
                networks = []
                # iwlist prefixes every access point with "Cell NN"; the text
                # before the first cell is a header and is skipped.
                for cell in output.split("Cell ")[1:]:
                    ssid = ssid_re.search(cell)
                    mac = mac_re.search(cell)
                    signal = signal_re.search(cell)
                    networks.append({
                        "ssid": ssid.group(1) if ssid else None,
                        "mac": mac.group(1) if mac else None,
                        "signal": int(signal.group(1)) if signal else None,
                    })
            except Exception as e:
                # Best-effort: surface the failure as data rather than raising.
                networks = [{"ssid": None, "mac": None, "signal": None, "error": str(e)}]
        self.wifi_networks = networks
        self.topology["wifi"] = networks
        return networks

    def log_gps(self, lat: float, lon: float) -> None:
        """
        Log GPS location for the mesh node.

        Stores the position as the current location and appends it to the
        topology's ``locations`` history.

        Example:
            >>> radio = MeshRadio()
            >>> radio.log_gps(37.7749, -122.4194)
        """
        self.gps_location = (lat, lon)
        self.topology["locations"].append({"node": "self", "lat": lat, "lon": lon})

    def log_rf_signal(self, signal_info) -> None:
        """
        Log RF signal info (strength, freq, type, etc) and propagate as mesh event.

        If the instance has a callable ``on_rf_event`` attribute, it is invoked
        with ``signal_info`` after logging.

        Args:
            signal_info (float or dict): RF info (can be just strength or full dict)

        Example:
            >>> radio = MeshRadio()
            >>> radio.log_rf_signal({"freq": 868000000, "power": 24, "type": "lora"})
        """
        self.rf_signal = signal_info
        self.topology["signals"].append({"node": "self", "signal": signal_info})
        handler = getattr(self, "on_rf_event", None)
        if callable(handler):
            handler(signal_info)

    def send_message(self, msg: str, dest: str, rf_signal=None) -> bool:
        """
        Send encrypted message to mesh node, optionally attaching RF signal info.

        The packet is appended to ``self.packets`` and the destination is
        recorded in the topology.

        Args:
            msg (str): Message to send
            dest (str): Destination node
            rf_signal (optional): RF info from radio module; when None, the
                last value logged via log_rf_signal() is attached instead.

        Returns:
            bool: Always True.

        Example:
            >>> radio.send_message("alert", "node2", rf_signal={"freq": 868000000})
        """
        packet = {
            "src": "self",
            "dest": dest,
            "msg": self._encrypt(msg),
            "timestamp": time.time(),
            "gps": self.gps_location,
            "rf_signal": rf_signal if rf_signal is not None else self.rf_signal,
        }
        self.packets.append(packet)
        self._update_topology(dest)
        return True

    def receive_packets(self) -> List[Dict]:
        """
        Receive and decrypt all packets sent by this node.

        Returns:
            list of dict: Copies of the stored packets with ``msg`` decrypted;
            the stored packets themselves are not modified.

        Example:
            >>> radio = MeshRadio()
            >>> packets = radio.receive_packets()
        """
        return [self._decrypt_packet(pkt) for pkt in self.packets]

    def get_topology(self) -> Dict:
        """
        Get the current mesh topology (nodes, links, locations, signals, wifi).

        Returns:
            dict: The live topology dict held by this instance (not a copy).

        Example:
            >>> radio = MeshRadio()
            >>> topology = radio.get_topology()
        """
        return self.topology

    def _encrypt(self, msg: str) -> str:
        """Wrap ``msg`` in the placeholder cipher envelope ``enc(...)``."""
        return f"{self._ENC_PREFIX}{msg}{self._ENC_SUFFIX}"

    def _decrypt_packet(self, pkt: Dict) -> Dict:
        """
        Return a shallow copy of ``pkt`` with its ``msg`` envelope removed.

        Only the single outer ``enc(`` prefix and ``)`` suffix are stripped, so
        parentheses or the text ``enc(`` inside the message survive the round
        trip. A ``msg`` without the envelope is returned unchanged.
        """
        pkt = pkt.copy()
        msg = pkt["msg"]
        if msg.startswith(self._ENC_PREFIX) and msg.endswith(self._ENC_SUFFIX):
            msg = msg[len(self._ENC_PREFIX):len(msg) - len(self._ENC_SUFFIX)]
        pkt["msg"] = msg
        return pkt

    def _update_topology(self, dest: str):
        """Record ``dest`` as a known node (once) and add a self -> dest link."""
        if dest not in self.topology["nodes"]:
            self.topology["nodes"].append(dest)
        self.topology["links"].append({"src": "self", "dest": dest})