Docs and connection
This commit is contained in:
parent
954b47253c
commit
215e7f8bde
2 changed files with 62 additions and 18 deletions
|
|
@ -49,8 +49,14 @@ python controller_uart_bridge.py --interactive
|
||||||
Options:
|
Options:
|
||||||
- `--map index:PORT` (repeatable) to pin controller index to serial (e.g., `--map 0:/dev/cu.usbserial-0001` or `--map 0:COM5`).
|
- `--map index:PORT` (repeatable) to pin controller index to serial (e.g., `--map 0:/dev/cu.usbserial-0001` or `--map 0:COM5`).
|
||||||
- `--ports PORTS...` or `--interactive` for auto/interactive pairing.
|
- `--ports PORTS...` or `--interactive` for auto/interactive pairing.
|
||||||
|
- `--all-ports` to include non-USB serial devices in discovery.
|
||||||
|
- `--ignore-port-desc SUBSTR` / `--include-port-desc SUBSTR` to filter serial ports by description (repeatable).
|
||||||
|
- `--include-controller-name SUBSTR` to only open controllers whose name matches (repeatable).
|
||||||
- `--baud 921600` (default 921600; use `500000` if your adapter can’t do 900K).
|
- `--baud 921600` (default 921600; use `500000` if your adapter can’t do 900K).
|
||||||
- `--frequency 1000` to send at 1 kHz.
|
- `--frequency 1000` to send at 1 kHz.
|
||||||
|
- `--deadzone 0.08` to change stick deadzone (0.0-1.0).
|
||||||
|
- `--trigger-threshold 0.35` to change analog trigger press threshold (0.0-1.0).
|
||||||
|
- `--swap-abxy` or `--swap-abxy-index N` to flip AB/XY globally or for specific controller indices (repeatable).
|
||||||
- `--sdl-mapping path/to/gamecontrollerdb.txt` to load extra SDL mappings (defaults to `controller_db/gamecontrollerdb.txt`).
|
- `--sdl-mapping path/to/gamecontrollerdb.txt` to load extra SDL mappings (defaults to `controller_db/gamecontrollerdb.txt`).
|
||||||
|
|
||||||
Hot‑plugging: controllers and UARTs can be plugged/unplugged while running; the bridge will auto reconnect when possible.
|
Hot‑plugging: controllers and UARTs can be plugged/unplugged while running; the bridge will auto reconnect when possible.
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import struct
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from ctypes import create_string_buffer
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Optional, Tuple
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
|
@ -369,6 +370,7 @@ class ControllerContext:
|
||||||
controller: sdl2.SDL_GameController
|
controller: sdl2.SDL_GameController
|
||||||
instance_id: int
|
instance_id: int
|
||||||
controller_index: int
|
controller_index: int
|
||||||
|
stable_id: int
|
||||||
port: Optional[str]
|
port: Optional[str]
|
||||||
uart: Optional[PicoUART]
|
uart: Optional[PicoUART]
|
||||||
report: SwitchReport = field(default_factory=SwitchReport)
|
report: SwitchReport = field(default_factory=SwitchReport)
|
||||||
|
|
@ -383,14 +385,18 @@ class ControllerContext:
|
||||||
rumble_active: bool = False
|
rumble_active: bool = False
|
||||||
|
|
||||||
|
|
||||||
def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int]:
|
def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int, str]:
|
||||||
"""Open an SDL GameController by index and return it with instance ID."""
|
"""Open an SDL GameController by index and return it with instance ID and GUID string."""
|
||||||
controller = sdl2.SDL_GameControllerOpen(index)
|
controller = sdl2.SDL_GameControllerOpen(index)
|
||||||
if not controller:
|
if not controller:
|
||||||
raise RuntimeError(f"Failed to open controller {index}: {sdl2.SDL_GetError().decode()}")
|
raise RuntimeError(f"Failed to open controller {index}: {sdl2.SDL_GetError().decode()}")
|
||||||
joystick = sdl2.SDL_GameControllerGetJoystick(controller)
|
joystick = sdl2.SDL_GameControllerGetJoystick(controller)
|
||||||
instance_id = sdl2.SDL_JoystickInstanceID(joystick)
|
instance_id = sdl2.SDL_JoystickInstanceID(joystick)
|
||||||
return controller, instance_id
|
guid = sdl2.SDL_JoystickGetGUID(joystick)
|
||||||
|
buf = create_string_buffer(33)
|
||||||
|
sdl2.SDL_JoystickGetGUIDString(guid, buf, 33)
|
||||||
|
guid_str = buf.value.decode() if buf.value else ""
|
||||||
|
return controller, instance_id, guid_str
|
||||||
|
|
||||||
|
|
||||||
def try_open_uart(port: str, baud: int) -> Optional[PicoUART]:
|
def try_open_uart(port: str, baud: int) -> Optional[PicoUART]:
|
||||||
|
|
@ -534,6 +540,19 @@ class PairingState:
|
||||||
include_port_desc: List[str] = field(default_factory=list)
|
include_port_desc: List[str] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ControllerIdRegistry:
|
||||||
|
"""Assign stable IDs to controllers based on their GUID."""
|
||||||
|
guid_to_id: Dict[str, int] = field(default_factory=dict)
|
||||||
|
next_id: int = 0
|
||||||
|
|
||||||
|
def stable_id_for_guid(self, guid: str) -> int:
|
||||||
|
if guid not in self.guid_to_id:
|
||||||
|
self.guid_to_id[guid] = self.next_id
|
||||||
|
self.next_id += 1
|
||||||
|
return self.guid_to_id[guid]
|
||||||
|
|
||||||
|
|
||||||
def load_button_maps(console: Console, args: argparse.Namespace) -> Tuple[Dict[int, int], Dict[int, int], set[int]]:
|
def load_button_maps(console: Console, args: argparse.Namespace) -> Tuple[Dict[int, int], Dict[int, int], set[int]]:
|
||||||
"""Load SDL controller mappings and return button map variants."""
|
"""Load SDL controller mappings and return button map variants."""
|
||||||
default_mapping = Path(__file__).parent / "controller_db" / "gamecontrollerdb.txt"
|
default_mapping = Path(__file__).parent / "controller_db" / "gamecontrollerdb.txt"
|
||||||
|
|
@ -770,14 +789,22 @@ def pair_waiting_contexts(
|
||||||
if uart:
|
if uart:
|
||||||
uarts.append(uart)
|
uarts.append(uart)
|
||||||
ctx.uart = uart
|
ctx.uart = uart
|
||||||
console.print(f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]")
|
console.print(
|
||||||
|
f"[green]Controller {ctx.controller_index} (id {ctx.stable_id}, inst {ctx.instance_id}) paired to {port_choice}[/green]"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
ctx.uart = None
|
ctx.uart = None
|
||||||
console.print(f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]")
|
console.print(
|
||||||
|
f"[yellow]Controller {ctx.controller_index} (id {ctx.stable_id}, inst {ctx.instance_id}) waiting for UART {port_choice}[/yellow]"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def open_initial_contexts(
|
def open_initial_contexts(
|
||||||
args: argparse.Namespace, pairing: PairingState, controller_indices: List[int], console: Console
|
args: argparse.Namespace,
|
||||||
|
pairing: PairingState,
|
||||||
|
controller_indices: List[int],
|
||||||
|
console: Console,
|
||||||
|
id_registry: ControllerIdRegistry,
|
||||||
) -> Tuple[Dict[int, ControllerContext], List[PicoUART]]:
|
) -> Tuple[Dict[int, ControllerContext], List[PicoUART]]:
|
||||||
"""Open initial controllers and UARTs for detected indices."""
|
"""Open initial controllers and UARTs for detected indices."""
|
||||||
contexts: Dict[int, ControllerContext] = {}
|
contexts: Dict[int, ControllerContext] = {}
|
||||||
|
|
@ -792,22 +819,26 @@ def open_initial_contexts(
|
||||||
if port is None and not pairing.auto_pairing_enabled:
|
if port is None and not pairing.auto_pairing_enabled:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
controller, instance_id = open_controller(index)
|
controller, instance_id, guid = open_controller(index)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
console.print(f"[red]Failed to open controller {index}: {exc}[/red]")
|
console.print(f"[red]Failed to open controller {index}: {exc}[/red]")
|
||||||
continue
|
continue
|
||||||
|
stable_id = id_registry.stable_id_for_guid(guid)
|
||||||
uart = open_uart_or_warn(port, args.baud, console) if port else None
|
uart = open_uart_or_warn(port, args.baud, console) if port else None
|
||||||
if uart:
|
if uart:
|
||||||
uarts.append(uart)
|
uarts.append(uart)
|
||||||
console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]")
|
console.print(f"[green]Controller {index} (id {stable_id}, inst {instance_id}) paired to {port}[/green]")
|
||||||
elif port:
|
elif port:
|
||||||
console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]")
|
console.print(f"[yellow]Controller {index} (id {stable_id}, inst {instance_id}) waiting for UART {port}[/yellow]")
|
||||||
else:
|
else:
|
||||||
console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]")
|
console.print(
|
||||||
|
f"[yellow]Controller {index} (id {stable_id}, inst {instance_id}) connected; waiting for an available UART[/yellow]"
|
||||||
|
)
|
||||||
ctx = ControllerContext(
|
ctx = ControllerContext(
|
||||||
controller=controller,
|
controller=controller,
|
||||||
instance_id=instance_id,
|
instance_id=instance_id,
|
||||||
controller_index=index,
|
controller_index=index,
|
||||||
|
stable_id=stable_id,
|
||||||
port=port,
|
port=port,
|
||||||
uart=uart,
|
uart=uart,
|
||||||
)
|
)
|
||||||
|
|
@ -884,6 +915,7 @@ def handle_device_added(
|
||||||
contexts: Dict[int, ControllerContext],
|
contexts: Dict[int, ControllerContext],
|
||||||
uarts: List[PicoUART],
|
uarts: List[PicoUART],
|
||||||
console: Console,
|
console: Console,
|
||||||
|
id_registry: ControllerIdRegistry,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle controller hotplug by opening and pairing UART if possible."""
|
"""Handle controller hotplug by opening and pairing UART if possible."""
|
||||||
idx = event.cdevice.which
|
idx = event.cdevice.which
|
||||||
|
|
@ -899,22 +931,26 @@ def handle_device_added(
|
||||||
console.print(f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]")
|
console.print(f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]")
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
controller, instance_id = open_controller(idx)
|
controller, instance_id, guid = open_controller(idx)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]")
|
console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]")
|
||||||
return
|
return
|
||||||
|
stable_id = id_registry.stable_id_for_guid(guid)
|
||||||
uart = open_uart_or_warn(port, args.baud, console) if port else None
|
uart = open_uart_or_warn(port, args.baud, console) if port else None
|
||||||
if uart:
|
if uart:
|
||||||
uarts.append(uart)
|
uarts.append(uart)
|
||||||
console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]")
|
console.print(f"[green]Controller {idx} (id {stable_id}, inst {instance_id}) paired to {port}[/green]")
|
||||||
elif port:
|
elif port:
|
||||||
console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]")
|
console.print(f"[yellow]Controller {idx} (id {stable_id}, inst {instance_id}) waiting for UART {port}[/yellow]")
|
||||||
else:
|
else:
|
||||||
console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]")
|
console.print(
|
||||||
|
f"[yellow]Controller {idx} (id {stable_id}, inst {instance_id}) connected; waiting for an available UART[/yellow]"
|
||||||
|
)
|
||||||
ctx = ControllerContext(
|
ctx = ControllerContext(
|
||||||
controller=controller,
|
controller=controller,
|
||||||
instance_id=instance_id,
|
instance_id=instance_id,
|
||||||
controller_index=idx,
|
controller_index=idx,
|
||||||
|
stable_id=stable_id,
|
||||||
port=port,
|
port=port,
|
||||||
uart=uart,
|
uart=uart,
|
||||||
)
|
)
|
||||||
|
|
@ -932,7 +968,7 @@ def handle_device_removed(
|
||||||
ctx = contexts.pop(instance_id, None)
|
ctx = contexts.pop(instance_id, None)
|
||||||
if not ctx:
|
if not ctx:
|
||||||
return
|
return
|
||||||
console.print(f"[yellow]Controller {instance_id} removed[/yellow]")
|
console.print(f"[yellow]Controller {instance_id} (id {ctx.stable_id}) removed[/yellow]")
|
||||||
if ctx.controller_index in pairing.auto_assigned_indices:
|
if ctx.controller_index in pairing.auto_assigned_indices:
|
||||||
# Return auto-paired UART back to the pool so a future device can use it.
|
# Return auto-paired UART back to the pool so a future device can use it.
|
||||||
freed = pairing.mapping_by_index.pop(ctx.controller_index, None)
|
freed = pairing.mapping_by_index.pop(ctx.controller_index, None)
|
||||||
|
|
@ -1019,6 +1055,7 @@ def run_bridge_loop(
|
||||||
pairing: PairingState,
|
pairing: PairingState,
|
||||||
contexts: Dict[int, ControllerContext],
|
contexts: Dict[int, ControllerContext],
|
||||||
uarts: List[PicoUART],
|
uarts: List[PicoUART],
|
||||||
|
id_registry: ControllerIdRegistry,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Main event loop for bridging controllers to UART and handling rumble."""
|
"""Main event loop for bridging controllers to UART and handling rumble."""
|
||||||
event = sdl2.SDL_Event()
|
event = sdl2.SDL_Event()
|
||||||
|
|
@ -1036,7 +1073,7 @@ def run_bridge_loop(
|
||||||
elif event.type in (sdl2.SDL_CONTROLLERBUTTONDOWN, sdl2.SDL_CONTROLLERBUTTONUP):
|
elif event.type in (sdl2.SDL_CONTROLLERBUTTONDOWN, sdl2.SDL_CONTROLLERBUTTONUP):
|
||||||
handle_button_event(event, args, config, contexts)
|
handle_button_event(event, args, config, contexts)
|
||||||
elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED:
|
elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED:
|
||||||
handle_device_added(event, args, pairing, contexts, uarts, console)
|
handle_device_added(event, args, pairing, contexts, uarts, console, id_registry)
|
||||||
elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED:
|
elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED:
|
||||||
handle_device_removed(event, pairing, contexts, console)
|
handle_device_removed(event, pairing, contexts, console)
|
||||||
|
|
||||||
|
|
@ -1068,15 +1105,16 @@ def main() -> None:
|
||||||
console = Console()
|
console = Console()
|
||||||
config = build_bridge_config(console, args)
|
config = build_bridge_config(console, args)
|
||||||
initialize_sdl(parser)
|
initialize_sdl(parser)
|
||||||
|
id_registry = ControllerIdRegistry()
|
||||||
contexts: Dict[int, ControllerContext] = {}
|
contexts: Dict[int, ControllerContext] = {}
|
||||||
uarts: List[PicoUART] = []
|
uarts: List[PicoUART] = []
|
||||||
try:
|
try:
|
||||||
controller_indices, controller_names = detect_controllers(console, args, parser)
|
controller_indices, controller_names = detect_controllers(console, args, parser)
|
||||||
pairing = prepare_pairing_state(args, console, parser, controller_indices, controller_names)
|
pairing = prepare_pairing_state(args, console, parser, controller_indices, controller_names)
|
||||||
contexts, uarts = open_initial_contexts(args, pairing, controller_indices, console)
|
contexts, uarts = open_initial_contexts(args, pairing, controller_indices, console, id_registry)
|
||||||
if not contexts:
|
if not contexts:
|
||||||
console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]")
|
console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]")
|
||||||
run_bridge_loop(args, console, config, pairing, contexts, uarts)
|
run_bridge_loop(args, console, config, pairing, contexts, uarts, id_registry)
|
||||||
finally:
|
finally:
|
||||||
cleanup(contexts, uarts)
|
cleanup(contexts, uarts)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue