#!/usr/bin/env python3 """ Bridge multiple SDL2 controllers to switch-pico over UART and mirror rumble back. The framing matches ``switch-pico.cpp``: - Host -> Pico : 0xAA, buttons (LE16), hat, lx, ly, rx, ry - Pico -> Host : 0xBB, 0x01, 8 rumble bytes, checksum (sum of first 10 bytes) Features inspired by ``host/controller_bridge.py``: - Multiple controllers paired to multiple UART ports - Rich-powered interactive pairing UI - Adjustable send frequency, deadzone, and trigger thresholds - Rumble feedback delivered to SDL2 controllers """ from __future__ import annotations import argparse import struct import threading import time from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Tuple import serial from serial import SerialException from serial.tools import list_ports import sdl2 import sdl2.ext from rich.console import Console from rich.prompt import Prompt from rich.table import Table UART_HEADER = 0xAA RUMBLE_HEADER = 0xBB RUMBLE_TYPE_RUMBLE = 0x01 UART_BAUD = 900000 RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise class SwitchButton: # Mirrors the masks defined in switch_pro_descriptors.h Y = 1 << 0 B = 1 << 1 A = 1 << 2 X = 1 << 3 L = 1 << 4 R = 1 << 5 ZL = 1 << 6 ZR = 1 << 7 MINUS = 1 << 8 PLUS = 1 << 9 LCLICK = 1 << 10 RCLICK = 1 << 11 HOME = 1 << 12 CAPTURE = 1 << 13 class SwitchHat: TOP = 0x00 TOP_RIGHT = 0x01 RIGHT = 0x02 BOTTOM_RIGHT = 0x03 BOTTOM = 0x04 BOTTOM_LEFT = 0x05 LEFT = 0x06 TOP_LEFT = 0x07 CENTER = 0x08 def parse_mapping(value: str) -> Tuple[int, str]: if ":" not in value: raise argparse.ArgumentTypeError("Mapping must look like 'index:serial_port'") idx_str, port = value.split(":", 1) try: idx = int(idx_str, 10) except ValueError as exc: raise argparse.ArgumentTypeError(f"Invalid controller index '{idx_str}'") from exc if not port: raise argparse.ArgumentTypeError("Serial port cannot be empty") return idx, port.strip() def axis_to_stick(value: int, deadzone: int) -> int: if abs(value) < deadzone: value = 0 scaled = int((value + 32768) * 255 / 65535) return max(0, min(255, scaled)) def trigger_to_button(value: int, threshold: int) -> bool: return value >= threshold BUTTON_MAP = { # Direct mapping to the Switch PRO mask definitions in switch_pro_descriptors.h sdl2.SDL_CONTROLLER_BUTTON_A: SwitchButton.A, sdl2.SDL_CONTROLLER_BUTTON_B: SwitchButton.B, sdl2.SDL_CONTROLLER_BUTTON_X: SwitchButton.X, sdl2.SDL_CONTROLLER_BUTTON_Y: SwitchButton.Y, sdl2.SDL_CONTROLLER_BUTTON_LEFTSHOULDER: SwitchButton.L, sdl2.SDL_CONTROLLER_BUTTON_RIGHTSHOULDER: SwitchButton.R, sdl2.SDL_CONTROLLER_BUTTON_BACK: SwitchButton.MINUS, sdl2.SDL_CONTROLLER_BUTTON_START: SwitchButton.PLUS, sdl2.SDL_CONTROLLER_BUTTON_GUIDE: SwitchButton.HOME, sdl2.SDL_CONTROLLER_BUTTON_MISC1: SwitchButton.CAPTURE, sdl2.SDL_CONTROLLER_BUTTON_LEFTSTICK: SwitchButton.LCLICK, sdl2.SDL_CONTROLLER_BUTTON_RIGHTSTICK: SwitchButton.RCLICK, } DPAD_BUTTONS = { sdl2.SDL_CONTROLLER_BUTTON_DPAD_UP: "up", sdl2.SDL_CONTROLLER_BUTTON_DPAD_DOWN: "down", sdl2.SDL_CONTROLLER_BUTTON_DPAD_LEFT: "left", sdl2.SDL_CONTROLLER_BUTTON_DPAD_RIGHT: "right", } def dpad_to_hat(flags: Dict[str, bool]) -> int: up = flags["up"] down = flags["down"] left = flags["left"] right = flags["right"] if up and right: return SwitchHat.TOP_RIGHT if up and left: return SwitchHat.TOP_LEFT if down and right: return SwitchHat.BOTTOM_RIGHT if down and left: return SwitchHat.BOTTOM_LEFT if up: return SwitchHat.TOP if down: return SwitchHat.BOTTOM if right: return SwitchHat.RIGHT if left: return SwitchHat.LEFT return SwitchHat.CENTER def is_usb_serial(path: str) -> bool: if path.startswith("/dev/tty.") and not path.startswith("/dev/tty.usb"): return False if path.startswith("/dev/cu.") and not path.startswith("/dev/cu.usb"): return False return True def is_usb_serial_port(port: list_ports.ListPortInfo) -> bool: """Heuristic: prefer ports with USB VID/PID; fall back to path hints.""" if getattr(port, "vid", None) is not None or getattr(port, "pid", None) is not None: return True path = port.device or "" manufacturer = (getattr(port, "manufacturer", "") or "").upper() if "USB" in manufacturer: return True return is_usb_serial(path) def discover_ports( include_non_usb: bool = False, ignore_descriptions: Optional[List[str]] = None, include_descriptions: Optional[List[str]] = None, ) -> List[Dict[str, str]]: ignored = [d.lower() for d in ignore_descriptions or []] includes = [d.lower() for d in include_descriptions or []] results: List[Dict[str, str]] = [] for port in list_ports.comports(): path = port.device or "" if not path: continue if not include_non_usb and not is_usb_serial_port(port): continue desc_lower = (port.description or "").lower() if includes and not any(keep in desc_lower for keep in includes): continue if any(skip in desc_lower for skip in ignored): continue results.append( { "device": path, "description": port.description or "Unknown", } ) return results def interactive_pairing( console: Console, controller_info: Dict[int, str], ports: List[Dict[str, str]] ) -> List[Tuple[int, str]]: available = ports.copy() mappings: List[Tuple[int, str]] = [] for controller_idx in controller_info: if not available: console.print("[bold red]No more UART devices available for pairing.[/bold red]") break table = Table( title=f"Available UART Devices for Controller {controller_idx} ({controller_info[controller_idx]})" ) table.add_column("Choice", justify="center") table.add_column("Port") table.add_column("Description") for i, port in enumerate(available): table.add_row(str(i), port["device"], port["description"]) console.print(table) choices = [str(i) for i in range(len(available))] + ["q"] selection = Prompt.ask( "Select UART index (or 'q' to stop pairing)", choices=choices, default=choices[0] if choices else "q", ) if selection == "q": break idx = int(selection) port = available.pop(idx) mappings.append((controller_idx, port["device"])) console.print(f"[bold green]Paired controller {controller_idx} with {port['device']}[/bold green]") return mappings @dataclass class SwitchReport: buttons: int = 0 hat: int = SwitchHat.CENTER lx: int = 128 ly: int = 128 rx: int = 128 ry: int = 128 def to_bytes(self) -> bytes: return struct.pack( " None: self.serial = serial.Serial( port=port, baudrate=baudrate, bytesize=serial.EIGHTBITS, stopbits=serial.STOPBITS_ONE, parity=serial.PARITY_NONE, timeout=0.0, write_timeout=0.0, ) self._buffer = bytearray() def send_report(self, report: SwitchReport) -> None: self.serial.write(report.to_bytes()) self.serial.flush() def read_rumble_payload(self) -> Optional[bytes]: chunk = self.serial.read(64) # non-blocking (timeout=0) if chunk: self._buffer.extend(chunk) while True: if RUMBLE_HEADER not in self._buffer: self._buffer.clear() return None start = self._buffer.find(bytes([RUMBLE_HEADER])) if len(self._buffer) - start < 11: # Need more bytes. if start > 0: del self._buffer[:start] return None frame = self._buffer[start : start + 11] checksum = sum(frame[:10]) & 0xFF if frame[1] == RUMBLE_TYPE_RUMBLE and checksum == frame[10]: payload = bytes(frame[2:10]) del self._buffer[: start + 11] return payload # Bad frame, drop the header and continue. del self._buffer[: start + 1] def close(self) -> None: self.serial.close() def decode_rumble(payload: bytes) -> Tuple[float, float]: """Return normalized rumble amplitudes (0.0-1.0) for left/right.""" if len(payload) < 8: return 0.0, 0.0 # Neutral/idle pattern used by Switch: no rumble energy. if payload == b"\x00\x01\x40\x40\x00\x01\x40\x40": return 0.0, 0.0 # Rumble amp is 10 bits across bytes 0/1 and 4/5. # Switch format is right rumble first, then left rumble (4 bytes each). right_raw = ((payload[1] & 0x03) << 8) | payload[0] left_raw = ((payload[5] & 0x03) << 8) | payload[4] if left_raw < 8 and right_raw < 8: return 0.0, 0.0 left = min(max(left_raw / 1023.0, 0.0), 1.0) right = min(max(right_raw / 1023.0, 0.0), 1.0) return left, right def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> float: left_norm, right_norm = decode_rumble(payload) max_norm = max(left_norm, right_norm) # Treat small rumble as "off" to avoid idle buzz. if max_norm < RUMBLE_MIN_ACTIVE: sdl2.SDL_GameControllerRumble(controller, 0, 0, 0) return 0.0 # Attenuate to feel closer to a real controller; cap at ~25% strength. scale = 0.60 low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble duration = 10 sdl2.SDL_GameControllerRumble(controller, low, high, duration) return max_norm @dataclass class ControllerContext: controller: sdl2.SDL_GameController instance_id: int controller_index: int port: Optional[str] uart: Optional[PicoUART] report: SwitchReport = field(default_factory=SwitchReport) dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False}) last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False}) last_send: float = 0.0 last_reopen_attempt: float = 0.0 last_rumble: float = 0.0 last_rumble_change: float = 0.0 last_rumble_energy: float = 0.0 rumble_active: bool = False def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int]: controller = sdl2.SDL_GameControllerOpen(index) if not controller: raise RuntimeError(f"Failed to open controller {index}: {sdl2.SDL_GetError().decode()}") joystick = sdl2.SDL_GameControllerGetJoystick(controller) instance_id = sdl2.SDL_JoystickInstanceID(joystick) return controller, instance_id def try_open_uart(port: str, baud: int) -> Optional[PicoUART]: try: return PicoUART(port, baud) except Exception: return None def open_uart_or_warn(port: str, baud: int, console: Console) -> Optional[PicoUART]: try: return PicoUART(port, baud) except Exception as exc: console.print(f"[yellow]Failed to open UART {port}: {exc}[/yellow]") return None def start_rumble_listener(ctx: ControllerContext) -> threading.Thread: # No-op placeholder (rumble is polled in the main loop for hotplug safety). return None def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Bridge SDL2 controllers to switch-pico UART (with rumble)") parser.add_argument( "--map", action="append", type=parse_mapping, default=[], help="Controller mapping 'index:serial_port'. Repeat per controller.", ) parser.add_argument( "--ports", nargs="+", help="Serial ports to auto-pair with controllers in ascending index order.", ) parser.add_argument("--interactive", action="store_true", help="Launch an interactive pairing UI using Rich.") parser.add_argument("--all-ports", action="store_true", help="Include non-USB serial ports when listing devices.") parser.add_argument( "--frequency", type=float, default=500.0, help="Report send frequency per controller (Hz, default 500)", ) parser.add_argument("--deadzone", type=float, default=0.08, help="Stick deadzone (0.0-1.0, default 0.08)") parser.add_argument( "--trigger-threshold", type=float, default=0.35, help="Trigger threshold treated as a digital press (0.0-1.0, default 0.35)", ) parser.add_argument( "--baud", type=int, default=UART_BAUD, help=f"UART baud rate (default {UART_BAUD}; must match switch-pico firmware)", ) parser.add_argument( "--ignore-port-desc", action="append", default=[], help="Substring filter to exclude serial ports by description (case-insensitive). Repeatable.", ) parser.add_argument( "--include-port-desc", action="append", default=[], help="Only include serial ports whose description contains this substring (case-insensitive). Repeatable.", ) parser.add_argument( "--sdl-mapping", action="append", default=[], help="Path to an SDL2 controller mapping database (e.g. controllerdb.txt). Repeatable.", ) return parser def main() -> None: parser = build_arg_parser() args = parser.parse_args() interval = 1.0 / max(args.frequency, 1.0) deadzone_raw = int(max(0.0, min(args.deadzone, 1.0)) * 32767) trigger_threshold = int(max(0.0, min(args.trigger_threshold, 1.0)) * 32767) # Load bundled mapping plus any user-supplied mapping files. default_mapping = Path(__file__).parent / "controller_db" / "gamecontrollerdb.txt" mappings_to_load = [] if default_mapping.exists(): mappings_to_load.append(str(default_mapping)) mappings_to_load.extend(args.sdl_mapping) for mapping_path in mappings_to_load: try: loaded = sdl2.SDL_GameControllerAddMappingsFromFile(mapping_path.encode()) console = Console() console.print(f"[green]Loaded {loaded} SDL mapping(s) from {mapping_path}[/green]") except Exception as exc: console = Console() console.print(f"[red]Failed to load SDL mapping {mapping_path}: {exc}[/red]") sdl2.SDL_SetHint(sdl2.SDL_HINT_JOYSTICK_ALLOW_BACKGROUND_EVENTS, b"1") if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK) != 0: parser.error(f"SDL init failed: {sdl2.SDL_GetError().decode(errors='ignore')}") contexts: Dict[int, ControllerContext] = {} uarts: List[PicoUART] = [] mapping_by_index: Dict[int, str] = {} console = Console() try: controller_indices: List[int] = [] controller_names: Dict[int, str] = {} controller_count = sdl2.SDL_NumJoysticks() if controller_count < 0: parser.error(f"SDL error: {sdl2.SDL_GetError().decode()}") auto_pairing_enabled = not args.map and not args.interactive auto_discover_ports = auto_pairing_enabled and not args.ports available_ports: List[str] = [] auto_assigned_indices: set[int] = set() include_non_usb = args.all_ports or False ignore_port_desc = [d.lower() for d in args.ignore_port_desc] include_port_desc = [d.lower() for d in args.include_port_desc] for index in range(controller_count): if sdl2.SDL_IsGameController(index): name = sdl2.SDL_GameControllerNameForIndex(index) name_str = name.decode() if isinstance(name, bytes) else str(name) console.print(f"[cyan]Detected controller {index}: {name_str}[/cyan]") controller_indices.append(index) controller_names[index] = name_str else: name = sdl2.SDL_JoystickNameForIndex(index) name_str = name.decode() if isinstance(name, bytes) else str(name) console.print(f"[yellow]Found joystick {index} (not a GameController): {name_str}[/yellow]") mappings = list(args.map) if args.interactive: if not controller_indices: parser.error("No controllers detected for interactive pairing.") discovered = discover_ports( include_non_usb=include_non_usb, ignore_descriptions=ignore_port_desc, include_descriptions=include_port_desc, ) if not discovered: parser.error("No UART devices found for interactive pairing.") mappings = interactive_pairing(console, controller_names, discovered) if not mappings: parser.error("No controller-to-UART mappings were selected.") elif auto_pairing_enabled: if args.ports: available_ports.extend(list(args.ports)) console.print(f"[green]Prepared {len(available_ports)} specified UART port(s) for auto-pairing.[/green]") else: discovered = discover_ports( include_non_usb=include_non_usb, ignore_descriptions=ignore_port_desc, include_descriptions=include_port_desc, ) if discovered: available_ports.extend(info["device"] for info in discovered) console.print("[green]Auto-detected UARTs:[/green]") for info in discovered: console.print(f" {info['device']} ({info['description']})") else: console.print("[yellow]No UART devices detected yet; waiting for hotplug...[/yellow]") for index, port in mappings: mapping_by_index[index] = port def assign_port_for_index(idx: int) -> Optional[str]: if idx in mapping_by_index: return mapping_by_index[idx] if not auto_pairing_enabled: return None if not available_ports: return None port_choice = available_ports.pop(0) mapping_by_index[idx] = port_choice auto_assigned_indices.add(idx) console.print(f"[green]Auto-paired controller {idx} to {port_choice}[/green]") return port_choice # Open currently connected controllers that we can pair. for index in controller_indices: if index >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(index): name = sdl2.SDL_JoystickNameForIndex(index) name_str = name.decode() if isinstance(name, bytes) else str(name) console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") continue port = assign_port_for_index(index) if port is None and not auto_pairing_enabled: continue try: controller, instance_id = open_controller(index) except Exception as exc: console.print(f"[red]Failed to open controller {index}: {exc}[/red]") continue uart = open_uart_or_warn(port, args.baud, console) if port else None if uart: uarts.append(uart) console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]") elif port: console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]") else: console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]") ctx = ControllerContext( controller=controller, instance_id=instance_id, controller_index=index, port=port, uart=uart, ) contexts[instance_id] = ctx if not contexts: console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]") def ports_in_use() -> set: used = set(mapping_by_index.values()) used.update(ctx.port for ctx in contexts.values() if ctx.port) return used def discover_new_ports() -> None: if not auto_discover_ports: return discovered = discover_ports( include_non_usb=include_non_usb, ignore_descriptions=ignore_port_desc, include_descriptions=include_port_desc, ) in_use = ports_in_use() for info in discovered: path = info["device"] if path in in_use or path in available_ports: continue available_ports.append(path) console.print(f"[green]Discovered UART {path} ({info['description']}); available for pairing.[/green]") def pair_waiting_contexts() -> None: for ctx in list(contexts.values()): if ctx.port is not None: continue port_choice = assign_port_for_index(ctx.controller_index) if port_choice is None: continue ctx.port = port_choice uart = open_uart_or_warn(port_choice, args.baud, console) ctx.last_reopen_attempt = time.monotonic() if uart: uarts.append(uart) ctx.uart = uart console.print( f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]" ) else: ctx.uart = None console.print( f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]" ) event = sdl2.SDL_Event() port_scan_interval = 2.0 last_port_scan = time.monotonic() running = True while running: while sdl2.SDL_PollEvent(event): if event.type == sdl2.SDL_QUIT: running = False break if event.type == sdl2.SDL_CONTROLLERAXISMOTION: ctx = contexts.get(event.caxis.which) if not ctx: continue axis = event.caxis.axis value = event.caxis.value if axis == sdl2.SDL_CONTROLLER_AXIS_LEFTX: ctx.report.lx = axis_to_stick(value, deadzone_raw) elif axis == sdl2.SDL_CONTROLLER_AXIS_LEFTY: ctx.report.ly = axis_to_stick(value, deadzone_raw) elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTX: ctx.report.rx = axis_to_stick(value, deadzone_raw) elif axis == sdl2.SDL_CONTROLLER_AXIS_RIGHTY: ctx.report.ry = axis_to_stick(value, deadzone_raw) elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERLEFT: pressed = trigger_to_button(value, trigger_threshold) if pressed != ctx.last_trigger_state["left"]: if pressed: ctx.report.buttons |= SwitchButton.ZL else: ctx.report.buttons &= ~SwitchButton.ZL ctx.last_trigger_state["left"] = pressed elif axis == sdl2.SDL_CONTROLLER_AXIS_TRIGGERRIGHT: pressed = trigger_to_button(value, trigger_threshold) if pressed != ctx.last_trigger_state["right"]: if pressed: ctx.report.buttons |= SwitchButton.ZR else: ctx.report.buttons &= ~SwitchButton.ZR ctx.last_trigger_state["right"] = pressed elif event.type in (sdl2.SDL_CONTROLLERBUTTONDOWN, sdl2.SDL_CONTROLLERBUTTONUP): ctx = contexts.get(event.cbutton.which) if not ctx: continue button = event.cbutton.button pressed = event.type == sdl2.SDL_CONTROLLERBUTTONDOWN if button in BUTTON_MAP: bit = BUTTON_MAP[button] if pressed: ctx.report.buttons |= bit else: ctx.report.buttons &= ~bit elif button in DPAD_BUTTONS: ctx.dpad[DPAD_BUTTONS[button]] = pressed ctx.report.hat = dpad_to_hat(ctx.dpad) elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED: idx = event.cdevice.which if any(c.controller_index == idx for c in contexts.values()): continue port = assign_port_for_index(idx) if port is None and not auto_pairing_enabled: continue if idx >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(idx): name = sdl2.SDL_JoystickNameForIndex(idx) name_str = name.decode() if isinstance(name, bytes) else str(name) console.print( f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]" ) continue try: controller, instance_id = open_controller(idx) except Exception as exc: console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]") continue uart = open_uart_or_warn(port, args.baud, console) if port else None if uart: uarts.append(uart) console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]") elif port: console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]") else: console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]") ctx = ControllerContext( controller=controller, instance_id=instance_id, controller_index=idx, port=port, uart=uart, ) contexts[instance_id] = ctx elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED: instance_id = event.cdevice.which # ctx = contexts.get(instance_id) # if ctx and sdl2.SDL_GameControllerGetAttached(ctx.controller): # # Spurious detach; ignore. # console.print(f"[yellow]Ignoring spurious remove for controller {instance_id}[/yellow]") # continue ctx = contexts.pop(instance_id, None) if ctx: console.print(f"[yellow]Controller {instance_id} removed[/yellow]") if ctx.controller_index in auto_assigned_indices: freed = mapping_by_index.pop(ctx.controller_index, None) auto_assigned_indices.discard(ctx.controller_index) if freed and freed not in available_ports: available_ports.append(freed) console.print(f"[cyan]Released UART {freed} back to pool[/cyan]") sdl2.SDL_GameControllerClose(ctx.controller) now = time.monotonic() if now - last_port_scan > port_scan_interval: discover_new_ports() last_port_scan = now pair_waiting_contexts() else: pair_waiting_contexts() for ctx in list(contexts.values()): # Reconnect UART if needed. if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: ctx.last_reopen_attempt = now uart = open_uart_or_warn(ctx.port, args.baud, console) if uart: uarts.append(uart) console.print(f"[green]Reconnected UART {ctx.port} for controller {ctx.controller_index}[/green]") ctx.uart = uart if ctx.uart is None: continue try: if now - ctx.last_send >= interval: ctx.uart.send_report(ctx.report) ctx.last_send = now # Poll rumble quickly while we have the port. payload = ctx.uart.read_rumble_payload() if payload: energy = apply_rumble(ctx.controller, payload) ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE if ctx.rumble_active and energy != ctx.last_rumble_energy: ctx.last_rumble_change = now ctx.last_rumble_energy = energy ctx.last_rumble = now elif ctx.rumble_active and (now - ctx.last_rumble) > RUMBLE_IDLE_TIMEOUT: sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) ctx.rumble_active = False ctx.last_rumble_energy = 0.0 elif ctx.rumble_active and (now - ctx.last_rumble_change) > RUMBLE_STUCK_TIMEOUT: # Guard against a stream of tiny-but-nonzero rumble packets that never decay. sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) ctx.rumble_active = False ctx.last_rumble_energy = 0.0 except SerialException as exc: console.print(f"[yellow]UART {ctx.port} disconnected: {exc}[/yellow]") try: ctx.uart.close() except Exception: pass sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) ctx.uart = None ctx.rumble_active = False ctx.last_rumble_energy = 0.0 ctx.last_reopen_attempt = now except Exception as exc: console.print(f"[red]UART error on {ctx.port}: {exc}[/red]") sdl2.SDL_Delay(1) finally: for ctx in contexts.values(): sdl2.SDL_GameControllerClose(ctx.controller) for uart in uarts: uart.close() sdl2.SDL_Quit() if __name__ == "__main__": main()