Source code for gatenet.discovery.bluetooth

from typing import Dict, List
import asyncio
from bleak import BleakScanner


[docs] def discover_bluetooth_devices(timeout: float = 8.0) -> List[Dict[str, str]]: """ Scan for nearby Bluetooth devices. Parameters ---------- timeout : float, optional Time to scan in seconds (default is 8.0). Returns ------- List[Dict[str, str]] List of discovered device dictionaries. Raises ------ Exception If an error occurs during Bluetooth discovery (caught and printed, returns empty list). Example ------- >>> from gatenet.discovery.bluetooth import discover_bluetooth_devices >>> devices = discover_bluetooth_devices(timeout=4.0) >>> for device in devices: ... print(device) {'address': '00:1A:7D:DA:71:13', 'name': 'MyBluetoothDevice'} """ import logging try: # Pass timeout to the async function via a closure or global if needed return asyncio.run(_async_discover_bluetooth_devices()) except Exception as e: logging.error(f"Error during Bluetooth discovery: {e}") return [{"error": str(e)}]
async def _async_discover_bluetooth_devices() -> List[Dict[str, str]]: """ Asynchronously scan for nearby Bluetooth devices using a timeout context manager. Returns ------- List[Dict[str, str]] List of discovered device dictionaries. Raises ------ Exception If an error occurs during async Bluetooth scan (caught and printed, returns empty list). """ devices = [] timeout = 8.0 # Default timeout, can be adjusted by caller if needed try: discovered_devices = await asyncio.wait_for( BleakScanner.discover(return_adv=True), timeout=timeout ) for device_with_adv in discovered_devices.values(): device = device_with_adv[0] advertisement_data = device_with_adv[1] device_info = { "address": device.address, "name": device.name or "Unknown Device", "rssi": str(advertisement_data.rssi) if advertisement_data.rssi else "N/A" } # Add service UUIDs if available if advertisement_data.service_uuids: device_info["services"] = ", ".join(str(uuid) for uuid in advertisement_data.service_uuids) # Add manufacturer data if available if advertisement_data.manufacturer_data: manufacturer_info = [] for manufacturer_id, data in advertisement_data.manufacturer_data.items(): manufacturer_info.append(f"{manufacturer_id}: {data.hex()}") device_info["manufacturer_data"] = ", ".join(manufacturer_info) devices.append(device_info) except Exception as e: import logging logging.error(f"Error during async Bluetooth scan: {e}") return [{"error": str(e)}] return devices
[docs] async def async_discover_bluetooth_devices() -> List[Dict[str, str]]: """ Asynchronously scan for nearby Bluetooth devices. :return: List of discovered device dictionaries. """ return await _async_discover_bluetooth_devices()