""" A quick script to test the async Bleak-backed adapter helper. """ import asyncio import os from nxbt import ( AsyncBleakAdapter, async_find_objects, find_objects, SERVICE_NAME, ADAPTER_INTERFACE, ) TARGET_ADDRESS = os.environ.get("NXBT_TEST_DEVICE") async def main(): # Prefer the async helper when running inside an event loop. try: adapters = await async_find_objects(None, SERVICE_NAME, ADAPTER_INTERFACE) except RuntimeError: # Fallback to the legacy synchronous variant if no loop is running. adapters = find_objects(None, SERVICE_NAME, ADAPTER_INTERFACE) print(adapters) adapter = AsyncBleakAdapter(adapter_path=adapters[0]) address = await adapter.get_address() print("Address", address) print("Name", adapter.name) print("Alias", adapter.alias) print("Pairable", adapter.pairable) print("") print("Pairable Timeout", adapter.pairable_timeout) adapter.set_pairable_timeout(10) print("Pairable Timeout", adapter.pairable_timeout) adapter.set_pairable_timeout(0) print("Pairable Timeout", adapter.pairable_timeout) print("") print("Discoverable", adapter.discoverable) adapter.set_discoverable(True) print("Discoverable", adapter.discoverable) adapter.set_discoverable(False) print("Discoverable", adapter.discoverable) print("") print("Discoverable Timeout", adapter.discoverable_timeout) adapter.set_discoverable_timeout(0) print("Discoverable Timeout", adapter.discoverable_timeout) adapter.set_discoverable_timeout(180) print("Discoverable Timeout", adapter.discoverable_timeout) print("\nScanning for nearby devices...") try: devices = await adapter.discover_devices(timeout=5) except Exception as exc: print(f"Discovery failed: {exc}") else: for path, props in devices.items(): print(f"{props['Alias'] or 'UNKNOWN'} -> {props['Address']} ({path})") if TARGET_ADDRESS: print(f"\nAttempting a short Bleak connection to {TARGET_ADDRESS}") await adapter.connect_device(TARGET_ADDRESS) print("Connection attempt complete.") if __name__ == "__main__": asyncio.run(main())