73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
"""
|
|
A quick script to test the async Bleak-backed adapter helper.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
|
|
from nxbt import (
|
|
AsyncBleakAdapter,
|
|
async_find_objects,
|
|
find_objects,
|
|
SERVICE_NAME,
|
|
ADAPTER_INTERFACE,
|
|
)
|
|
|
|
# Optional device address used for the connection attempt in main(); taken from
# the NXBT_TEST_DEVICE environment variable and None when that variable is unset.
TARGET_ADDRESS = os.getenv("NXBT_TEST_DEVICE")
|
|
|
|
|
|
async def main():
    """Exercise the async Bleak-backed adapter helper from end to end.

    Steps, in order:

    1. Look up adapter object paths (async helper first, synchronous
       variant on ``RuntimeError``) and print them.
    2. Wrap the first path in ``AsyncBleakAdapter`` and print its address,
       name, alias and pairable flag.
    3. Change the pairable timeout, the discoverable flag and the
       discoverable timeout, printing the value before and after each
       change.
    4. Call ``discover_devices(timeout=5)`` and print one line per result;
       a discovery failure is reported and does not abort the script.
    5. If ``TARGET_ADDRESS`` is set, attempt a connection to that address.

    Returns early, after printing a message, when no adapter is found.
    """
    # Prefer the async helper when running inside an event loop.
    try:
        adapters = await async_find_objects(None, SERVICE_NAME, ADAPTER_INTERFACE)
    except RuntimeError:
        # Fall back to the legacy synchronous variant.
        # NOTE(review): a loop is always running inside this coroutine, so
        # confirm which RuntimeError the async helper is expected to raise.
        adapters = find_objects(None, SERVICE_NAME, ADAPTER_INTERFACE)
    print(adapters)

    # Without this guard an empty result would surface as a bare IndexError
    # on adapters[0], which says nothing about the actual problem.
    if not adapters:
        print("No Bluetooth adapters found; nothing to test.")
        return

    adapter = AsyncBleakAdapter(adapter_path=adapters[0])

    address = await adapter.get_address()
    print("Address", address)
    print("Name", adapter.name)
    print("Alias", adapter.alias)
    print("Pairable", adapter.pairable)

    # NOTE(review): the setters below are called without ``await``; confirm
    # they are plain methods and not coroutines on AsyncBleakAdapter.
    print("")
    print("Pairable Timeout", adapter.pairable_timeout)
    adapter.set_pairable_timeout(10)
    print("Pairable Timeout", adapter.pairable_timeout)
    adapter.set_pairable_timeout(0)
    print("Pairable Timeout", adapter.pairable_timeout)

    print("")
    print("Discoverable", adapter.discoverable)
    adapter.set_discoverable(True)
    print("Discoverable", adapter.discoverable)
    adapter.set_discoverable(False)
    print("Discoverable", adapter.discoverable)

    print("")
    print("Discoverable Timeout", adapter.discoverable_timeout)
    adapter.set_discoverable_timeout(0)
    print("Discoverable Timeout", adapter.discoverable_timeout)
    adapter.set_discoverable_timeout(180)
    print("Discoverable Timeout", adapter.discoverable_timeout)

    print("\nScanning for nearby devices...")
    try:
        devices = await adapter.discover_devices(timeout=5)
    except Exception as exc:
        # Best-effort step: report the failure and carry on with the rest.
        print(f"Discovery failed: {exc}")
    else:
        for path, props in devices.items():
            # This loop runs outside the try body, so a missing key would
            # abort the whole script; fall back to a placeholder instead.
            device_alias = props.get("Alias") or "UNKNOWN"
            device_address = props.get("Address", "UNKNOWN")
            print(f"{device_alias} -> {device_address} ({path})")

    if TARGET_ADDRESS:
        print(f"\nAttempting a short Bleak connection to {TARGET_ADDRESS}")
        await adapter.connect_device(TARGET_ADDRESS)
        print("Connection attempt complete.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the checks only when executed as a script, never on import.
    asyncio.run(main())
|