Source code for gatenet.discovery.upnp
import socket
import time
from typing import List, Dict
SSDP_MULTICAST_ADDRESS = "239.255.255.250"
SSDP_PORT = 1900
SSDP_MX = 2
SSDP_ST = "ssdp:all"
[docs]
def discover_upnp_devices(timeout: float = 3.0) -> List[Dict[str, str]]:
"""
Example
-------
>>> from gatenet.discovery.upnp import discover_upnp_devices
>>> devices = discover_upnp_devices(timeout=2.0)
>>> for device in devices:
... print(device)
{'LOCATION': 'http://192.168.1.1:1900/desc.xml', 'SERVER': 'Linux/3.14.0 UPnP/1.0 ...', ...}
Discover UPnP devices using SSDP.
Parameters
----------
timeout : float, optional
Time in seconds to wait for responses (default is 3.0).
Returns
-------
List[Dict[str, str]]
List of discovered device info dictionaries.
"""
message = "\r\n".join([
'M-SEARCH * HTTP/1.1',
f'HOST: {SSDP_MULTICAST_ADDRESS}:{SSDP_PORT}',
'MAN: "ssdp:discover"',
f'MX: {SSDP_MX}',
f'ST: {SSDP_ST}',
'', ''
])
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(timeout)
sock.sendto(message.encode(), (SSDP_MULTICAST_ADDRESS, SSDP_PORT))
devices = []
start = time.time()
import logging
while True:
try:
if time.time() - start > timeout:
break
data, _ = sock.recvfrom(1024)
response = data.decode(errors="ignore")
devices.append(_parse_ssdp_response(response))
except socket.timeout:
break
except Exception as e:
logging.error(f"Error during SSDP/UPnP discovery: {e}")
devices.append({"error": str(e)})
sock.close()
return devices
def _parse_ssdp_response(response: str) -> Dict[str, str]:
"""
Example
-------
>>> from gatenet.discovery.upnp import _parse_ssdp_response
>>> resp = 'LOCATION: http://192.168.1.1:1900/desc.xml\r\nSERVER: Linux/3.14.0 UPnP/1.0 ...\r\n\r\n'
>>> _parse_ssdp_response(resp)
{'LOCATION': 'http://192.168.1.1:1900/desc.xml', 'SERVER': 'Linux/3.14.0 UPnP/1.0 ...'}
Parse an SSDP response into a dictionary of headers.
Parameters
----------
response : str
The SSDP response string.
Returns
-------
Dict[str, str]
Dictionary of SSDP response headers.
"""
headers = {}
lines = response.split("\r\n")
for line in lines:
if ":" in line:
key, value = line.split(":", 1)
headers[key.strip().upper()] = value.strip()
return headers