Source code for gatenet.diagnostics.ping

import platform
import subprocess
import asyncio
import re
import time
from typing import Dict, Union
import ipaddress
import re
import statistics
# Detail-free error text. _icmp_ping_sync reports this in its result dict,
# instead of the caught exception's own message, when running ping raises.
GENERIC_ERROR_MESSAGE = "An internal error occurred."

def _is_valid_host(host: str) -> bool:
    """Validate that host is a valid IPv4/IPv6 address or DNS hostname, and does not contain shell-special characters."""
    import socket
    if not host:
        return False
    # Disallow hosts that start with a dash (could be interpreted as an option)
    if host.startswith('-'):
        return False
    allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-.")
    if any(c not in allowed for c in host):
        return False
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        pass
    if len(host) > 253:
        return False
    hostname_regex = re.compile(
        r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*\.?$"
    )
    if not hostname_regex.match(host):
        return False
    try:
        socket.gethostbyname(host)
        return True
    except Exception:
        return False

def ping_with_rf(
    host: str,
    radio=None,
    count: int = 4,
    timeout: int = 2,
    method: str = "icmp",
) -> Dict[str, Union[str, float, int, bool, list]]:
    """Ping a host and attach a placeholder RF field (stub implementation).

    Compatibility wrapper: it runs a regular :func:`ping` and adds an empty
    ``"rf"`` entry to the result, reserved for future radio integration.

    Args:
        host (str): Host to ping.
        radio: Radio instance. Currently unused; kept for future use.
        count (int): Number of pings.
        timeout (int): Timeout per ping.
        method (str): Ping method.

    Returns:
        dict: The ping result with an additional empty ``"rf"`` list.

    Example:
        >>> from gatenet.diagnostics.ping import ping_with_rf
        >>> result = ping_with_rf("8.8.8.8", count=2)
        >>> assert "rf" in result
    """
    outcome = ping(host, count=count, timeout=timeout, method=method)
    # No RF data is collected yet; the key exists so callers can rely on it.
    outcome["rf"] = []
    return outcome
# A non-negative decimal number such as "12" or "12.345".
_PING_NUM = r"(\d+(?:\.\d+)?)"

# Per-reply latency. Covers "time=12.3 ms" (Linux/macOS), "time=12ms"
# (Windows) and "time<1ms" (Windows sub-millisecond, recorded as 1.0).
_PING_REPLY_RE = re.compile(r"time[=<]" + _PING_NUM + r"\s*ms")

# Summary line printed by Unix-like pings:
#   Linux (iputils): "rtt min/avg/max/mdev = a/b/c/d ms"
#   macOS/BSD:       "round-trip min/avg/max/stddev = a/b/c/d ms"
#   BusyBox:         "round-trip min/avg/max = a/b/c ms"  (no deviation field)
_PING_UNIX_SUMMARY_RE = re.compile(
    r"(?:rtt|round-trip) min/avg/max(?:/(?:mdev|stddev))? = "
    + _PING_NUM + "/" + _PING_NUM + "/" + _PING_NUM + r"(?:/" + _PING_NUM + r")?"
)

# Summary line printed by Windows ping.
_PING_WINDOWS_SUMMARY_RE = re.compile(
    r"Minimum = " + _PING_NUM + r"ms, Maximum = " + _PING_NUM
    + r"ms, Average = " + _PING_NUM + r"ms"
)

# "0% packet loss" (Linux), "0.0% packet loss" (macOS), "(0% loss)" (Windows).
_PING_LOSS_RE = re.compile(_PING_NUM + r"% (?:packet )?loss")


def _parse_ping_output(output: str) -> Dict[str, Union[bool, int, float, str, list]]:
    """Extract latency statistics from the text printed by the system ``ping``.

    Args:
        output: Captured standard output of a ``ping`` run.

    Returns:
        A dict that always contains ``success``. On failure it also contains
        ``error``. On success it may contain ``rtts`` (per-reply times in ms),
        ``jitter``, ``rtt_min``, ``rtt_avg``, ``rtt_max`` and ``packet_loss``
        (integer percent), depending on what the output provides.

        The result is a failure when the output mentions an unreachable or
        unknown host, when the output is empty, or when 100% of the packets
        were lost.
    """
    lowered = output.lower()
    if "unreachable" in lowered or "could not find host" in lowered:
        return {
            "success": False,
            "error": "Host unreachable or not found"
        }
    # ping printed nothing on stdout (for example it failed before sending
    # anything), so there is nothing that could count as a success.
    if not output.strip():
        return {
            "success": False,
            "error": "No output from ping command"
        }

    stats: Dict[str, Union[bool, int, float, str, list]] = {"success": True}

    rtts = [float(value) for value in _PING_REPLY_RE.findall(output)]
    if rtts:
        stats["rtts"] = rtts
        stats["jitter"] = statistics.stdev(rtts) if len(rtts) > 1 else 0.0

    unix_summary = _PING_UNIX_SUMMARY_RE.search(output)
    if unix_summary:
        stats["rtt_min"] = float(unix_summary.group(1))
        stats["rtt_avg"] = float(unix_summary.group(2))
        stats["rtt_max"] = float(unix_summary.group(3))
        # Prefer the deviation reported by ping itself when it is present.
        if unix_summary.group(4) is not None:
            stats["jitter"] = float(unix_summary.group(4))
    else:
        windows_summary = _PING_WINDOWS_SUMMARY_RE.search(output)
        if windows_summary:
            # Windows prints Minimum, Maximum, Average in that order.
            stats["rtt_min"] = float(windows_summary.group(1))
            stats["rtt_max"] = float(windows_summary.group(2))
            stats["rtt_avg"] = float(windows_summary.group(3))

    loss = _PING_LOSS_RE.search(output)
    if loss:
        # Truncate fractional percentages ("33.3") to an int, matching the
        # integer packet_loss produced by the TCP ping helpers.
        stats["packet_loss"] = int(float(loss.group(1)))
        if stats["packet_loss"] >= 100:
            # Mirrors the TCP helpers, which report failure when no probe
            # succeeds.
            stats["success"] = False
            stats["error"] = "100% packet loss"

    return stats


PING_INVALID_HOST_ERROR = "Invalid host format"


def _tcp_ping_sync(host: str, count: int, timeout: int) -> Dict[str, Union[str, float, int, bool, list]]:
    """Measure TCP connect latency to port 80 of *host*.

    Args:
        host: Hostname or IP address; must pass ``_is_valid_host``.
        count: Number of connection attempts.
        timeout: Socket timeout in seconds for each attempt.

    Returns:
        A dict with ``host``, ``success``, ``rtts`` (ms, successful attempts
        only), ``packet_loss`` (integer percent) and ``raw_output`` (always
        empty). It also has ``rtt_min``/``rtt_max``/``rtt_avg``/``jitter``
        when at least one attempt succeeded, otherwise ``error``.
    """
    import socket

    if not _is_valid_host(host):
        return {
            "host": host,
            "success": False,
            "error": PING_INVALID_HOST_ERROR,
            "raw_output": ""
        }

    port = 80
    valid_rtts = []
    for _ in range(count):
        try:
            # The context manager closes the socket on failure too; the
            # previous code leaked it whenever connect() raised.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                # perf_counter is monotonic, so clock adjustments cannot
                # distort the measured interval.
                start = time.perf_counter()
                sock.connect((host, port))
                valid_rtts.append((time.perf_counter() - start) * 1000)
        except (OSError, ValueError):
            # Refused, timed out, unresolvable, or invalid timeout value:
            # the attempt counts as lost.
            continue

    # Guard the division: with count == 0 nothing was attempted, which is
    # reported as total loss instead of raising ZeroDivisionError.
    packet_loss = int(100 * (1 - len(valid_rtts) / count)) if count > 0 else 100
    result = {
        "host": host,
        "success": bool(valid_rtts),
        "rtts": valid_rtts,
        "packet_loss": packet_loss,
        "raw_output": "",
    }
    if valid_rtts:
        result["rtt_min"] = min(valid_rtts)
        result["rtt_max"] = max(valid_rtts)
        result["rtt_avg"] = sum(valid_rtts) / len(valid_rtts)
        result["jitter"] = statistics.stdev(valid_rtts) if len(valid_rtts) > 1 else 0.0
    else:
        result["error"] = "All TCP pings failed"
    return result


def _icmp_ping_sync(host: str, count: int, timeout: int, system: str) -> Dict[str, Union[str, float, int, bool, list]]:
    """Run the system ``ping`` command and parse its output.

    Args:
        host: Hostname or IP address; must pass ``_is_valid_host``.
        count: Number of echo requests to send.
        timeout: Time to wait for each reply, in seconds.
        system: Platform name as returned by ``platform.system()``.

    Returns:
        The dict produced by ``_parse_ping_output`` plus ``host`` and
        ``raw_output``; or a failure dict when the host is invalid or the
        command could not be run to completion.
    """
    # The host is the only caller-controlled argument. It is validated against
    # a strict allowlist, can never start with '-', and is passed as a single
    # argv element with shell=False.
    if not _is_valid_host(host):
        return {
            "host": host,
            "success": False,
            "error": PING_INVALID_HOST_ERROR,
            "raw_output": ""
        }

    if system == "Windows":
        # -n: count, -w: per-reply timeout in milliseconds.
        cmd = ["ping", "-n", str(count), "-w", str(int(timeout * 1000))]
    elif system == "Darwin":
        # macOS/BSD ping takes -W in milliseconds, unlike Linux.
        cmd = ["ping", "-c", str(count), "-W", str(int(timeout * 1000))]
    else:
        # Linux (iputils): -W is the per-reply timeout in seconds.
        cmd = ["ping", "-c", str(count), "-W", str(timeout)]
    cmd.append(host)

    # Upper bound for the whole run so a stuck ping cannot block the caller
    # forever: one second of pacing plus the reply timeout per probe, plus
    # slack for process start-up.
    deadline = max(count, 1) * (timeout + 1) + 5

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,
            shell=False,
            timeout=deadline,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Missing ping binary, deadline exceeded, undecodable output, ...
        # Details are deliberately not exposed to the caller.
        return {
            "host": host,
            "success": False,
            "error": GENERIC_ERROR_MESSAGE,
            "raw_output": ""
        }

    stats = _parse_ping_output(completed.stdout)
    stats.update({
        "host": host,
        "raw_output": completed.stdout.strip(),
    })
    return stats


# NOTE(review): kept at its original mid-module position; presumably this
# avoids an import cycle with gatenet.core -- confirm before hoisting it.
from gatenet.core import hooks, events
def ping(
    host: str,
    count: int = 4,
    timeout: int = 2,
    method: str = "icmp",
) -> Dict[str, Union[str, float, int, bool, list]]:
    """Ping a host and return detailed latency statistics.

    The result includes jitter and the list of individual RTTs when they are
    available. ``PING_BEFORE`` and ``PING_AFTER`` hook events are emitted
    around the measurement; any exception raised while emitting is ignored.

    Args:
        host: Hostname or IP address to ping.
        count: Number of pings to send.
        timeout: Timeout per ping.
        method: ``"icmp"`` or ``"tcp"``. Any other value produces a failure
            result with an ``Unknown method`` error.

    Returns:
        dict: Result of the selected ping implementation.

    Example:
        >>> from gatenet.diagnostics.ping import ping
        >>> result = ping("google.com", count=5, method="icmp")
        >>> print(result["rtt_avg"])
    """
    current_system = platform.system()

    # Hooks are best-effort: a failing listener must not break the ping.
    try:
        hooks.emit(events.PING_BEFORE, host=host, count=count)
    except Exception:
        pass

    if method == "tcp":
        outcome = _tcp_ping_sync(host, count, timeout)
    elif method == "icmp":
        outcome = _icmp_ping_sync(host, count, timeout, current_system)
    else:
        outcome = {
            "host": host,
            "success": False,
            "error": f"Unknown method: {method}",
            "raw_output": ""
        }

    try:
        hooks.emit(events.PING_AFTER, host=host, result=outcome)
    except Exception:
        pass

    return outcome
async def _tcp_ping_async(host: str, count: int) -> Dict[str, Union[str, float, int, bool, list]]:
    """
    Asynchronously measure TCP connect latency to port 80 of a host.

    Each attempt is limited to 2 seconds.

    Parameters
    ----------
    host : str
        The hostname or IP address to ping; must pass ``_is_valid_host``.
    count : int
        Number of connection attempts.

    Returns
    -------
    dict
        Dictionary with keys: host, success, rtts (list, ms), packet_loss,
        raw_output; plus rtt_min, rtt_max, rtt_avg, jitter when at least one
        attempt succeeded, otherwise error.
    """
    import socket

    loop = asyncio.get_running_loop()
    # Same validation as the synchronous path. _is_valid_host may do a
    # blocking DNS lookup, so it runs in the default executor.
    if not await loop.run_in_executor(None, _is_valid_host, host):
        return {
            "host": host,
            "success": False,
            "error": PING_INVALID_HOST_ERROR,
            "raw_output": ""
        }

    port = 80
    per_ping_timeout = 2  # seconds
    valid_rtts = []
    for _ in range(count):
        writer = None
        start = time.perf_counter()
        try:
            # A native asyncio connect is actually cancelled when the timeout
            # fires. The previous blocking connect() in an executor thread
            # kept running (and kept its socket open) after the timeout.
            async with asyncio.timeout(per_ping_timeout):
                _reader, writer = await asyncio.open_connection(
                    host, port, family=socket.AF_INET
                )
            valid_rtts.append((time.perf_counter() - start) * 1000)
        except (OSError, asyncio.TimeoutError):
            # Refused, unreachable, unresolvable or timed out: attempt lost.
            pass
        finally:
            if writer is not None:
                writer.close()

    # Guard the division: count == 0 is reported as total loss instead of
    # raising ZeroDivisionError.
    packet_loss = int(100 * (1 - len(valid_rtts) / count)) if count > 0 else 100
    result = {
        "host": host,
        "success": bool(valid_rtts),
        "rtts": valid_rtts,
        "packet_loss": packet_loss,
        "raw_output": "",
    }
    if valid_rtts:
        result["rtt_min"] = min(valid_rtts)
        result["rtt_max"] = max(valid_rtts)
        result["rtt_avg"] = sum(valid_rtts) / len(valid_rtts)
        result["jitter"] = statistics.stdev(valid_rtts) if len(valid_rtts) > 1 else 0.0
    else:
        result["error"] = "All TCP pings failed"
    return result


async def _icmp_ping_async(host: str, count: int, system: str) -> Dict[str, Union[str, float, int, bool, list]]:
    """
    Asynchronously run the system ``ping`` command and parse its output.

    Parameters
    ----------
    host : str
        The hostname or IP address to ping; must pass ``_is_valid_host``.
    count : int
        Number of echo requests to send.
    system : str
        Platform name as returned by ``platform.system()``.

    Returns
    -------
    dict
        The dict produced by ``_parse_ping_output`` plus host and raw_output;
        or a failure dict when the host is invalid or the command could not
        be run.
    """
    loop = asyncio.get_running_loop()
    # The host ends up on ping's command line, so apply the same allowlist
    # validation as the synchronous path; among other things it rejects
    # values starting with '-', which ping would read as options.
    if not await loop.run_in_executor(None, _is_valid_host, host):
        return {
            "host": host,
            "success": False,
            "error": PING_INVALID_HOST_ERROR,
            "raw_output": ""
        }

    count_flag = "-n" if system == "Windows" else "-c"
    cmd = ["ping", count_flag, str(count), host]

    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _stderr = await process.communicate()
        # Console output is not guaranteed to be UTF-8 (e.g. Windows code
        # pages); undecodable bytes are replaced rather than failing the ping.
        output = stdout.decode(errors="replace")
        stats = _parse_ping_output(output)
        stats.update({
            "host": host,
            "raw_output": output.strip(),
        })
        return stats
    except (OSError, ValueError):
        # Same detail-free message as the synchronous path.
        return {
            "host": host,
            "success": False,
            "error": GENERIC_ERROR_MESSAGE,
            "raw_output": ""
        }
    finally:
        # If we leave while ping is still running (typically because the
        # caller's timeout cancelled this coroutine), do not leave an
        # orphaned child process behind.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
[docs] async def async_ping( host: str, count: int = 4, method: str = "icmp" ) -> Dict[str, Union[str, float, int, bool, list]]: """ Asynchronously ping a host and return detailed latency statistics, including jitter and all RTTs. Example: >>> from gatenet.diagnostics.ping import async_ping >>> import asyncio >>> result = asyncio.run(async_ping("google.com", count=5, method="icmp")) >>> print(result["rtt_avg"]) """ system = platform.system() try: hooks.emit(events.PING_BEFORE, host=host, count=count) except Exception: pass try: async with asyncio.timeout(10): if method == "icmp": result = await _icmp_ping_async(host, count, system) elif method == "tcp": result = await _tcp_ping_async(host, count) else: result = { "host": host, "success": False, "error": f"Unknown method: {method}", "raw_output": "" } try: hooks.emit(events.PING_AFTER, host=host, result=result) except Exception: pass return result except asyncio.TimeoutError: result = { "host": host, "success": False, "error": "Ping operation timed out", "raw_output": "" } try: hooks.emit(events.PING_AFTER, host=host, result=result) except Exception: pass return result