From 963c19c5d6660bd217e7450893acc389f8521f12 Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Sun, 23 Nov 2025 09:27:16 -0700 Subject: [PATCH] Refactor --- controller_uart_bridge.py | 813 ++++++++++++++++++++++---------------- 1 file changed, 471 insertions(+), 342 deletions(-) diff --git a/controller_uart_bridge.py b/controller_uart_bridge.py index dd38821..81f77da 100644 --- a/controller_uart_bridge.py +++ b/controller_uart_bridge.py @@ -495,16 +495,32 @@ def poll_controller_buttons(ctx: ControllerContext, button_map: Dict[int, int]) ctx.report.hat = dpad_to_hat(ctx.dpad) -def main() -> None: - parser = build_arg_parser() - args = parser.parse_args() - interval = 1.0 / max(args.frequency, 1.0) - deadzone_raw = int(max(0.0, min(args.deadzone, 1.0)) * 32767) - trigger_threshold = int(max(0.0, min(args.trigger_threshold, 1.0)) * 32767) +@dataclass +class BridgeConfig: + interval: float + deadzone_raw: int + trigger_threshold: int + button_map_default: Dict[int, int] + button_map_swapped: Dict[int, int] + swap_abxy_indices: set[int] - # Load bundled mapping plus any user-supplied mapping files. + +@dataclass +class PairingState: + mapping_by_index: Dict[int, str] + available_ports: List[str] + auto_assigned_indices: set[int] = field(default_factory=set) + auto_pairing_enabled: bool = False + auto_discover_ports: bool = False + include_non_usb: bool = False + ignore_port_desc: List[str] = field(default_factory=list) + include_port_desc: List[str] = field(default_factory=list) + + +def load_button_maps(console: Console, args: argparse.Namespace) -> Tuple[Dict[int, int], Dict[int, int], set[int]]: + """Load SDL controller mappings and return button map variants.""" default_mapping = Path(__file__).parent / "controller_db" / "gamecontrollerdb.txt" - mappings_to_load = [] + mappings_to_load: List[str] = [] if default_mapping.exists(): mappings_to_load.append(str(default_mapping)) mappings_to_load.extend(args.sdl_mapping) @@ -514,16 +530,32 @@ def main() -> None: button_map_swapped[sdl2.SDL_CONTROLLER_BUTTON_B] = SwitchButton.A button_map_swapped[sdl2.SDL_CONTROLLER_BUTTON_X] = SwitchButton.Y button_map_swapped[sdl2.SDL_CONTROLLER_BUTTON_Y] = SwitchButton.X - swap_abxy_indices = set(idx for idx in args.swap_abxy_index if idx is not None and idx >= 0) + swap_abxy_indices = {idx for idx in args.swap_abxy_index if idx is not None and idx >= 0} for mapping_path in mappings_to_load: try: loaded = sdl2.SDL_GameControllerAddMappingsFromFile(mapping_path.encode()) - console = Console() console.print(f"[green]Loaded {loaded} SDL mapping(s) from {mapping_path}[/green]") except Exception as exc: - console = Console() console.print(f"[red]Failed to load SDL mapping {mapping_path}: {exc}[/red]") + return button_map_default, button_map_swapped, swap_abxy_indices + +def build_bridge_config(console: Console, args: argparse.Namespace) -> BridgeConfig: + interval = 1.0 / max(args.frequency, 1.0) + deadzone_raw = int(max(0.0, min(args.deadzone, 1.0)) * 32767) + trigger_threshold = int(max(0.0, min(args.trigger_threshold, 1.0)) * 32767) + button_map_default, button_map_swapped, swap_abxy_indices = load_button_maps(console, args) + return BridgeConfig( + interval=interval, + deadzone_raw=deadzone_raw, + trigger_threshold=trigger_threshold, + button_map_default=button_map_default, + button_map_swapped=button_map_swapped, + swap_abxy_indices=swap_abxy_indices, + ) + + +def initialize_sdl(parser: argparse.ArgumentParser) -> None: sdl2.SDL_SetHint(sdl2.SDL_HINT_JOYSTICK_ALLOW_BACKGROUND_EVENTS, b"1") set_hint("SDL_JOYSTICK_HIDAPI", "1") set_hint("SDL_JOYSTICK_HIDAPI_SWITCH", "1") @@ -531,350 +563,447 @@ def main() -> None: set_hint("SDL_GAMECONTROLLER_USE_BUTTON_LABELS", "1") if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_EVERYTHING) != 0: parser.error(f"SDL init failed: {sdl2.SDL_GetError().decode(errors='ignore')}") + + +def detect_controllers( + console: Console, args: argparse.Namespace, parser: argparse.ArgumentParser +) -> Tuple[List[int], Dict[int, str]]: + controller_indices: List[int] = [] + controller_names: Dict[int, str] = {} + controller_count = sdl2.SDL_NumJoysticks() + if controller_count < 0: + parser.error(f"SDL error: {sdl2.SDL_GetError().decode()}") + include_controller_name = [n.lower() for n in args.include_controller_name] + for index in range(controller_count): + if sdl2.SDL_IsGameController(index): + name = sdl2.SDL_GameControllerNameForIndex(index) + name_str = name.decode() if isinstance(name, bytes) else str(name) + if include_controller_name and all(substr not in name_str.lower() for substr in include_controller_name): + console.print(f"[yellow]Skipping controller {index} ({name_str}) due to name filter[/yellow]") + continue + console.print(f"[cyan]Detected controller {index}: {name_str}[/cyan]") + controller_indices.append(index) + controller_names[index] = name_str + else: + name = sdl2.SDL_JoystickNameForIndex(index) + name_str = name.decode() if isinstance(name, bytes) else str(name) + if include_controller_name and all(substr not in name_str.lower() for substr in include_controller_name): + console.print(f"[yellow]Skipping joystick {index} ({name_str}) due to name filter[/yellow]") + continue + console.print(f"[yellow]Found joystick {index} (not a GameController): {name_str}[/yellow]") + return controller_indices, controller_names + + +def prepare_pairing_state( + args: argparse.Namespace, + console: Console, + parser: argparse.ArgumentParser, + controller_indices: List[int], + controller_names: Dict[int, str], +) -> PairingState: + auto_pairing_enabled = not args.map and not args.interactive + auto_discover_ports = auto_pairing_enabled and not args.ports + include_non_usb = args.all_ports or False + ignore_port_desc = [d.lower() for d in args.ignore_port_desc] + include_port_desc = [d.lower() for d in args.include_port_desc] + available_ports: List[str] = [] + + mappings = list(args.map) + if args.interactive: + if not controller_indices: + parser.error("No controllers detected for interactive pairing.") + discovered = discover_ports( + include_non_usb=include_non_usb, + ignore_descriptions=ignore_port_desc, + include_descriptions=include_port_desc, + ) + if not discovered: + parser.error("No UART devices found for interactive pairing.") + mappings = interactive_pairing(console, controller_names, discovered) + if not mappings: + parser.error("No controller-to-UART mappings were selected.") + elif auto_pairing_enabled: + if args.ports: + available_ports.extend(list(args.ports)) + console.print(f"[green]Prepared {len(available_ports)} specified UART port(s) for auto-pairing.[/green]") + else: + discovered = discover_ports( + include_non_usb=include_non_usb, + ignore_descriptions=ignore_port_desc, + include_descriptions=include_port_desc, + ) + if discovered: + available_ports.extend(info["device"] for info in discovered) + console.print("[green]Auto-detected UARTs:[/green]") + for info in discovered: + console.print(f" {info['device']} ({info['description']})") + else: + console.print("[yellow]No UART devices detected yet; waiting for hotplug...[/yellow]") + + mapping_by_index = {index: port for index, port in mappings} + return PairingState( + mapping_by_index=mapping_by_index, + available_ports=available_ports, + auto_pairing_enabled=auto_pairing_enabled, + auto_discover_ports=auto_discover_ports, + include_non_usb=include_non_usb, + ignore_port_desc=ignore_port_desc, + include_port_desc=include_port_desc, + ) + + +def assign_port_for_index(pairing: PairingState, idx: int, console: Console) -> Optional[str]: + if idx in pairing.mapping_by_index: + return pairing.mapping_by_index[idx] + if not pairing.auto_pairing_enabled: + return None + if not pairing.available_ports: + return None + port_choice = pairing.available_ports.pop(0) + pairing.mapping_by_index[idx] = port_choice + pairing.auto_assigned_indices.add(idx) + console.print(f"[green]Auto-paired controller {idx} to {port_choice}[/green]") + return port_choice + + +def ports_in_use(pairing: PairingState, contexts: Dict[int, ControllerContext]) -> set: + used = set(pairing.mapping_by_index.values()) + used.update(ctx.port for ctx in contexts.values() if ctx.port) + return used + + +def discover_new_ports(pairing: PairingState, contexts: Dict[int, ControllerContext], console: Console) -> None: + if not pairing.auto_discover_ports: + return + discovered = discover_ports( + include_non_usb=pairing.include_non_usb, + ignore_descriptions=pairing.ignore_port_desc, + include_descriptions=pairing.include_port_desc, + ) + in_use = ports_in_use(pairing, contexts) + for info in discovered: + path = info["device"] + if path in in_use or path in pairing.available_ports: + continue + pairing.available_ports.append(path) + console.print(f"[green]Discovered UART {path} ({info['description']}); available for pairing.[/green]") + + +def pair_waiting_contexts( + args: argparse.Namespace, + pairing: PairingState, + contexts: Dict[int, ControllerContext], + uarts: List[PicoUART], + console: Console, +) -> None: + for ctx in list(contexts.values()): + if ctx.port is not None: + continue + port_choice = assign_port_for_index(pairing, ctx.controller_index, console) + if port_choice is None: + continue + ctx.port = port_choice + uart = open_uart_or_warn(port_choice, args.baud, console) + ctx.last_reopen_attempt = time.monotonic() + if uart: + uarts.append(uart) + ctx.uart = uart + console.print(f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]") + else: + ctx.uart = None + console.print(f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]") + + +def open_initial_contexts( + args: argparse.Namespace, pairing: PairingState, controller_indices: List[int], console: Console +) -> Tuple[Dict[int, ControllerContext], List[PicoUART]]: contexts: Dict[int, ControllerContext] = {} uarts: List[PicoUART] = [] - mapping_by_index: Dict[int, str] = {} - console = Console() + for index in controller_indices: + if index >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(index): + name = sdl2.SDL_JoystickNameForIndex(index) + name_str = name.decode() if isinstance(name, bytes) else str(name) + console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") + continue + port = assign_port_for_index(pairing, index, console) + if port is None and not pairing.auto_pairing_enabled: + continue + try: + controller, instance_id = open_controller(index) + except Exception as exc: + console.print(f"[red]Failed to open controller {index}: {exc}[/red]") + continue + uart = open_uart_or_warn(port, args.baud, console) if port else None + if uart: + uarts.append(uart) + console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]") + elif port: + console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]") + else: + console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]") + ctx = ControllerContext( + controller=controller, + instance_id=instance_id, + controller_index=index, + port=port, + uart=uart, + ) + contexts[instance_id] = ctx + return contexts, uarts + + +def handle_axis_motion(event: sdl2.SDL_Event, contexts: Dict[int, ControllerContext], config: BridgeConfig) -> None: + ctx = contexts.get(event.caxis.which) + if not ctx: + return + axis = event.caxis.axis + value = event.caxis.value + if axis == sdl2.SDL_CONTROLLER_AXIS_LEFTX: + ctx.report.lx = axis_to_stick(value, config.deadzone_raw) + elif axis == sdl2.SDL_CONTROLLER_AXIS_LEFTY: + ctx.report.ly = axis_to_stick(value, config.deadzone_raw) + elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTX: + ctx.report.rx = axis_to_stick(value, config.deadzone_raw) + elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTY: + ctx.report.ry = axis_to_stick(value, config.deadzone_raw) + elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERLEFT: + pressed = trigger_to_button(value, config.trigger_threshold) + if pressed != ctx.last_trigger_state["left"]: + if pressed: + ctx.report.buttons |= SwitchButton.ZL + else: + ctx.report.buttons &= ~SwitchButton.ZL + ctx.last_trigger_state["left"] = pressed + elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERRIGHT: + pressed = trigger_to_button(value, config.trigger_threshold) + if pressed != ctx.last_trigger_state["right"]: + if pressed: + ctx.report.buttons |= SwitchButton.ZR + else: + ctx.report.buttons &= ~SwitchButton.ZR + ctx.last_trigger_state["right"] = pressed + + +def handle_button_event( + event: sdl2.SDL_Event, + args: argparse.Namespace, + config: BridgeConfig, + contexts: Dict[int, ControllerContext], +) -> None: + ctx = contexts.get(event.cbutton.which) + if not ctx: + return + current_button_map = ( + config.button_map_swapped + if (args.swap_abxy or ctx.controller_index in config.swap_abxy_indices) + else config.button_map_default + ) + button = event.cbutton.button + pressed = event.type == sdl2.SDL_CONTROLLERBUTTONDOWN + if button in current_button_map: + bit = current_button_map[button] + if pressed: + ctx.report.buttons |= bit + else: + ctx.report.buttons &= ~bit + ctx.button_state[button] = pressed + elif button in DPAD_BUTTONS: + ctx.dpad[DPAD_BUTTONS[button]] = pressed + ctx.report.hat = dpad_to_hat(ctx.dpad) + + +def handle_device_added( + event: sdl2.SDL_Event, + args: argparse.Namespace, + pairing: PairingState, + contexts: Dict[int, ControllerContext], + uarts: List[PicoUART], + console: Console, +) -> None: + idx = event.cdevice.which + if any(c.controller_index == idx for c in contexts.values()): + return + port = assign_port_for_index(pairing, idx, console) + if port is None and not pairing.auto_pairing_enabled: + return + if idx >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(idx): + name = sdl2.SDL_JoystickNameForIndex(idx) + name_str = name.decode() if isinstance(name, bytes) else str(name) + console.print(f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") + return try: - controller_indices: List[int] = [] - controller_names: Dict[int, str] = {} - controller_count = sdl2.SDL_NumJoysticks() - if controller_count < 0: - parser.error(f"SDL error: {sdl2.SDL_GetError().decode()}") - auto_pairing_enabled = not args.map and not args.interactive - auto_discover_ports = auto_pairing_enabled and not args.ports - available_ports: List[str] = [] - auto_assigned_indices: set[int] = set() - include_non_usb = args.all_ports or False - ignore_port_desc = [d.lower() for d in args.ignore_port_desc] - include_port_desc = [d.lower() for d in args.include_port_desc] - include_controller_name = [n.lower() for n in args.include_controller_name] - for index in range(controller_count): - if sdl2.SDL_IsGameController(index): - name = sdl2.SDL_GameControllerNameForIndex(index) - name_str = name.decode() if isinstance(name, bytes) else str(name) - if include_controller_name and all(substr not in name_str.lower() for substr in include_controller_name): - console.print(f"[yellow]Skipping controller {index} ({name_str}) due to name filter[/yellow]") - continue - console.print(f"[cyan]Detected controller {index}: {name_str}[/cyan]") - controller_indices.append(index) - controller_names[index] = name_str - else: - name = sdl2.SDL_JoystickNameForIndex(index) - name_str = name.decode() if isinstance(name, bytes) else str(name) - if include_controller_name and all(substr not in name_str.lower() for substr in include_controller_name): - console.print(f"[yellow]Skipping joystick {index} ({name_str}) due to name filter[/yellow]") - continue - console.print(f"[yellow]Found joystick {index} (not a GameController): {name_str}[/yellow]") + controller, instance_id = open_controller(idx) + except Exception as exc: + console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]") + return + uart = open_uart_or_warn(port, args.baud, console) if port else None + if uart: + uarts.append(uart) + console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]") + elif port: + console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]") + else: + console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]") + ctx = ControllerContext( + controller=controller, + instance_id=instance_id, + controller_index=idx, + port=port, + uart=uart, + ) + contexts[instance_id] = ctx - mappings = list(args.map) - if args.interactive: - if not controller_indices: - parser.error("No controllers detected for interactive pairing.") - discovered = discover_ports( - include_non_usb=include_non_usb, - ignore_descriptions=ignore_port_desc, - include_descriptions=include_port_desc, - ) - if not discovered: - parser.error("No UART devices found for interactive pairing.") - mappings = interactive_pairing(console, controller_names, discovered) - if not mappings: - parser.error("No controller-to-UART mappings were selected.") - elif auto_pairing_enabled: - if args.ports: - available_ports.extend(list(args.ports)) - console.print(f"[green]Prepared {len(available_ports)} specified UART port(s) for auto-pairing.[/green]") - else: - discovered = discover_ports( - include_non_usb=include_non_usb, - ignore_descriptions=ignore_port_desc, - include_descriptions=include_port_desc, - ) - if discovered: - available_ports.extend(info["device"] for info in discovered) - console.print("[green]Auto-detected UARTs:[/green]") - for info in discovered: - console.print(f" {info['device']} ({info['description']})") - else: - console.print("[yellow]No UART devices detected yet; waiting for hotplug...[/yellow]") - for index, port in mappings: - mapping_by_index[index] = port +def handle_device_removed( + event: sdl2.SDL_Event, + pairing: PairingState, + contexts: Dict[int, ControllerContext], + console: Console, +) -> None: + instance_id = event.cdevice.which + ctx = contexts.pop(instance_id, None) + if not ctx: + return + console.print(f"[yellow]Controller {instance_id} removed[/yellow]") + if ctx.controller_index in pairing.auto_assigned_indices: + freed = pairing.mapping_by_index.pop(ctx.controller_index, None) + pairing.auto_assigned_indices.discard(ctx.controller_index) + if freed and freed not in pairing.available_ports: + pairing.available_ports.append(freed) + console.print(f"[cyan]Released UART {freed} back to pool[/cyan]") + sdl2.SDL_GameControllerClose(ctx.controller) - def assign_port_for_index(idx: int) -> Optional[str]: - if idx in mapping_by_index: - return mapping_by_index[idx] - if not auto_pairing_enabled: - return None - if not available_ports: - return None - port_choice = available_ports.pop(0) - mapping_by_index[idx] = port_choice - auto_assigned_indices.add(idx) - console.print(f"[green]Auto-paired controller {idx} to {port_choice}[/green]") - return port_choice - # Open currently connected controllers that we can pair. - for index in controller_indices: - if index >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(index): - name = sdl2.SDL_JoystickNameForIndex(index) - name_str = name.decode() if isinstance(name, bytes) else str(name) - console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") - continue - port = assign_port_for_index(index) - if port is None and not auto_pairing_enabled: - continue - try: - controller, instance_id = open_controller(index) - except Exception as exc: - console.print(f"[red]Failed to open controller {index}: {exc}[/red]") - continue - uart = open_uart_or_warn(port, args.baud, console) if port else None +def service_contexts( + now: float, + args: argparse.Namespace, + config: BridgeConfig, + contexts: Dict[int, ControllerContext], + uarts: List[PicoUART], + console: Console, +) -> None: + for ctx in list(contexts.values()): + current_button_map = ( + config.button_map_swapped + if (args.swap_abxy or ctx.controller_index in config.swap_abxy_indices) + else config.button_map_default + ) + poll_controller_buttons(ctx, current_button_map) + # Reconnect UART if needed. + if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: + ctx.last_reopen_attempt = now + uart = open_uart_or_warn(ctx.port, args.baud, console) if uart: uarts.append(uart) - console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]") - elif port: - console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]") - else: - console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]") - ctx = ControllerContext( - controller=controller, - instance_id=instance_id, - controller_index=index, - port=port, - uart=uart, - ) - contexts[instance_id] = ctx + console.print(f"[green]Reconnected UART {ctx.port} for controller {ctx.controller_index}[/green]") + ctx.uart = uart + if ctx.uart is None: + continue + try: + if now - ctx.last_send >= config.interval: + ctx.uart.send_report(ctx.report) + ctx.last_send = now + last_payload = None + while True: + p = ctx.uart.read_rumble_payload() + if not p: + break + last_payload = p + + if last_payload is not None: + energy = apply_rumble(ctx.controller, last_payload) + ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE + if ctx.rumble_active and energy != ctx.last_rumble_energy: + ctx.last_rumble_change = now + ctx.last_rumble_energy = energy + ctx.last_rumble = now + elif ctx.rumble_active and (now - ctx.last_rumble) > RUMBLE_IDLE_TIMEOUT: + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 + elif ctx.rumble_active and (now - ctx.last_rumble_change) > RUMBLE_STUCK_TIMEOUT: + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 + except SerialException as exc: + console.print(f"[yellow]UART {ctx.port} disconnected: {exc}[/yellow]") + try: + ctx.uart.close() + except Exception: + pass + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.uart = None + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 + ctx.last_reopen_attempt = now + except Exception as exc: + console.print(f"[red]UART error on {ctx.port}: {exc}[/red]") + + +def run_bridge_loop( + args: argparse.Namespace, + console: Console, + config: BridgeConfig, + pairing: PairingState, + contexts: Dict[int, ControllerContext], + uarts: List[PicoUART], +) -> None: + event = sdl2.SDL_Event() + port_scan_interval = 2.0 + last_port_scan = time.monotonic() + running = True + + while running: + while sdl2.SDL_PollEvent(event): + if event.type == sdl2.SDL_QUIT: + running = False + break + if event.type == sdl2.SDL_CONTROLLERAXISMOTION: + handle_axis_motion(event, contexts, config) + elif event.type in (sdl2.SDL_CONTROLLERBUTTONDOWN, sdl2.SDL_CONTROLLERBUTTONUP): + handle_button_event(event, args, config, contexts) + elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED: + handle_device_added(event, args, pairing, contexts, uarts, console) + elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED: + handle_device_removed(event, pairing, contexts, console) + + now = time.monotonic() + if now - last_port_scan > port_scan_interval: + discover_new_ports(pairing, contexts, console) + last_port_scan = now + pair_waiting_contexts(args, pairing, contexts, uarts, console) + else: + pair_waiting_contexts(args, pairing, contexts, uarts, console) + service_contexts(now, args, config, contexts, uarts, console) + sdl2.SDL_Delay(1) + + +def cleanup(contexts: Dict[int, ControllerContext], uarts: List[PicoUART]) -> None: + for ctx in contexts.values(): + sdl2.SDL_GameControllerClose(ctx.controller) + for uart in uarts: + uart.close() + sdl2.SDL_Quit() + + +def main() -> None: + parser = build_arg_parser() + args = parser.parse_args() + console = Console() + config = build_bridge_config(console, args) + initialize_sdl(parser) + contexts: Dict[int, ControllerContext] = {} + uarts: List[PicoUART] = [] + try: + controller_indices, controller_names = detect_controllers(console, args, parser) + pairing = prepare_pairing_state(args, console, parser, controller_indices, controller_names) + contexts, uarts = open_initial_contexts(args, pairing, controller_indices, console) if not contexts: console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]") - - def ports_in_use() -> set: - used = set(mapping_by_index.values()) - used.update(ctx.port for ctx in contexts.values() if ctx.port) - return used - - def discover_new_ports() -> None: - if not auto_discover_ports: - return - discovered = discover_ports( - include_non_usb=include_non_usb, - ignore_descriptions=ignore_port_desc, - include_descriptions=include_port_desc, - ) - in_use = ports_in_use() - for info in discovered: - path = info["device"] - if path in in_use or path in available_ports: - continue - available_ports.append(path) - console.print(f"[green]Discovered UART {path} ({info['description']}); available for pairing.[/green]") - - def pair_waiting_contexts() -> None: - for ctx in list(contexts.values()): - if ctx.port is not None: - continue - port_choice = assign_port_for_index(ctx.controller_index) - if port_choice is None: - continue - ctx.port = port_choice - uart = open_uart_or_warn(port_choice, args.baud, console) - ctx.last_reopen_attempt = time.monotonic() - if uart: - uarts.append(uart) - ctx.uart = uart - console.print( - f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]" - ) - else: - ctx.uart = None - console.print( - f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]" - ) - - event = sdl2.SDL_Event() - port_scan_interval = 2.0 - last_port_scan = time.monotonic() - running = True - while running: - while sdl2.SDL_PollEvent(event): - if event.type == sdl2.SDL_QUIT: - running = False - break - if event.type == sdl2.SDL_CONTROLLERAXISMOTION: - ctx = contexts.get(event.caxis.which) - if not ctx: - continue - axis = event.caxis.axis - value = event.caxis.value - if axis == sdl2.SDL_CONTROLLER_AXIS_LEFTX: - ctx.report.lx = axis_to_stick(value, deadzone_raw) - elif axis == sdl2.SDL_CONTROLLER_AXIS_LEFTY: - ctx.report.ly = axis_to_stick(value, deadzone_raw) - elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTX: - ctx.report.rx = axis_to_stick(value, deadzone_raw) - elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTY: - ctx.report.ry = axis_to_stick(value, deadzone_raw) - elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERLEFT: - pressed = trigger_to_button(value, trigger_threshold) - if pressed != ctx.last_trigger_state["left"]: - if pressed: - ctx.report.buttons |= SwitchButton.ZL - else: - ctx.report.buttons &= ~SwitchButton.ZL - ctx.last_trigger_state["left"] = pressed - elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERRIGHT: - pressed = trigger_to_button(value, trigger_threshold) - if pressed != ctx.last_trigger_state["right"]: - if pressed: - ctx.report.buttons |= SwitchButton.ZR - else: - ctx.report.buttons &= ~SwitchButton.ZR - ctx.last_trigger_state["right"] = pressed - elif event.type in (sdl2.SDL_CONTROLLERBUTTONDOWN, sdl2.SDL_CONTROLLERBUTTONUP): - ctx = contexts.get(event.cbutton.which) - if not ctx: - continue - current_button_map = ( - button_map_swapped - if (args.swap_abxy or ctx.controller_index in swap_abxy_indices) - else button_map_default - ) - button = event.cbutton.button - pressed = event.type == sdl2.SDL_CONTROLLERBUTTONDOWN - if button in current_button_map: - bit = current_button_map[button] - if pressed: - ctx.report.buttons |= bit - else: - ctx.report.buttons &= ~bit - ctx.button_state[button] = pressed - elif button in DPAD_BUTTONS: - ctx.dpad[DPAD_BUTTONS[button]] = pressed - ctx.report.hat = dpad_to_hat(ctx.dpad) - elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED: - idx = event.cdevice.which - if any(c.controller_index == idx for c in contexts.values()): - continue - port = assign_port_for_index(idx) - if port is None and not auto_pairing_enabled: - continue - if idx >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(idx): - name = sdl2.SDL_JoystickNameForIndex(idx) - name_str = name.decode() if isinstance(name, bytes) else str(name) - console.print( - f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]" - ) - continue - try: - controller, instance_id = open_controller(idx) - except Exception as exc: - console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]") - continue - uart = open_uart_or_warn(port, args.baud, console) if port else None - if uart: - uarts.append(uart) - console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]") - elif port: - console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]") - else: - console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]") - ctx = ControllerContext( - controller=controller, - instance_id=instance_id, - controller_index=idx, - port=port, - uart=uart, - ) - contexts[instance_id] = ctx - elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED: - instance_id = event.cdevice.which - ctx = contexts.pop(instance_id, None) - if ctx: - console.print(f"[yellow]Controller {instance_id} removed[/yellow]") - if ctx.controller_index in auto_assigned_indices: - freed = mapping_by_index.pop(ctx.controller_index, None) - auto_assigned_indices.discard(ctx.controller_index) - if freed and freed not in available_ports: - available_ports.append(freed) - console.print(f"[cyan]Released UART {freed} back to pool[/cyan]") - sdl2.SDL_GameControllerClose(ctx.controller) - - now = time.monotonic() - if now - last_port_scan > port_scan_interval: - discover_new_ports() - last_port_scan = now - pair_waiting_contexts() - else: - pair_waiting_contexts() - for ctx in list(contexts.values()): - current_button_map = ( - button_map_swapped - if (args.swap_abxy or ctx.controller_index in swap_abxy_indices) - else button_map_default - ) - poll_controller_buttons(ctx, current_button_map) - # Reconnect UART if needed. - if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: - ctx.last_reopen_attempt = now - uart = open_uart_or_warn(ctx.port, args.baud, console) - if uart: - uarts.append(uart) - console.print(f"[green]Reconnected UART {ctx.port} for controller {ctx.controller_index}[/green]") - ctx.uart = uart - if ctx.uart is None: - continue - try: - # Send controller state at configured frequency - if now - ctx.last_send >= interval: - ctx.uart.send_report(ctx.report) - ctx.last_send = now - - # Drain all pending rumble frames and apply only the latest one. - last_payload = None - while True: - p = ctx.uart.read_rumble_payload() - if not p: - break - last_payload = p - - if last_payload is not None: - energy = apply_rumble(ctx.controller, last_payload) - ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE - if ctx.rumble_active and energy != ctx.last_rumble_energy: - ctx.last_rumble_change = now - ctx.last_rumble_energy = energy - ctx.last_rumble = now - elif ctx.rumble_active and (now - ctx.last_rumble) > RUMBLE_IDLE_TIMEOUT: - sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) - ctx.rumble_active = False - ctx.last_rumble_energy = 0.0 - elif ctx.rumble_active and (now - ctx.last_rumble_change) > RUMBLE_STUCK_TIMEOUT: - # Guard against a stream of tiny-but-nonzero rumble packets that never decay. - sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) - ctx.rumble_active = False - ctx.last_rumble_energy = 0.0 - except SerialException as exc: - console.print(f"[yellow]UART {ctx.port} disconnected: {exc}[/yellow]") - try: - ctx.uart.close() - except Exception: - pass - sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) - ctx.uart = None - ctx.rumble_active = False - ctx.last_rumble_energy = 0.0 - ctx.last_reopen_attempt = now - except Exception as exc: - console.print(f"[red]UART error on {ctx.port}: {exc}[/red]") - sdl2.SDL_Delay(1) + run_bridge_loop(args, console, config, pairing, contexts, uarts) finally: - for ctx in contexts.values(): - sdl2.SDL_GameControllerClose(ctx.controller) - for uart in uarts: - uart.close() - sdl2.SDL_Quit() + cleanup(contexts, uarts) if __name__ == "__main__":