diff --git a/controller_uart_bridge.py b/controller_uart_bridge.py index 1716b49..84152e7 100644 --- a/controller_uart_bridge.py +++ b/controller_uart_bridge.py @@ -37,6 +37,9 @@ UART_HEADER = 0xAA RUMBLE_HEADER = 0xBB RUMBLE_TYPE_RUMBLE = 0x01 UART_BAUD = 900000 +RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off +RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this +RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise class SwitchButton: @@ -269,8 +272,9 @@ def decode_rumble(payload: bytes) -> Tuple[float, float]: if payload == b"\x00\x01\x40\x40\x00\x01\x40\x40": return 0.0, 0.0 # Rumble amp is 10 bits across bytes 0/1 and 4/5. - left_raw = ((payload[1] & 0x03) << 8) | payload[0] - right_raw = ((payload[5] & 0x03) << 8) | payload[4] + # Switch format is right rumble first, then left rumble (4 bytes each). + right_raw = ((payload[1] & 0x03) << 8) | payload[0] + left_raw = ((payload[5] & 0x03) << 8) | payload[4] if left_raw < 8 and right_raw < 8: return 0.0, 0.0 left = min(max(left_raw / 1023.0, 0.0), 1.0) @@ -278,19 +282,20 @@ def decode_rumble(payload: bytes) -> Tuple[float, float]: return left, right -def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> None: +def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> float: left_norm, right_norm = decode_rumble(payload) max_norm = max(left_norm, right_norm) # Treat small rumble as "off" to avoid idle buzz. - if max_norm < 0.40: + if max_norm < RUMBLE_MIN_ACTIVE: sdl2.SDL_GameControllerRumble(controller, 0, 0, 0) - return + return 0.0 # Attenuate to feel closer to a real controller; cap at ~25% strength. - scale = 0.40 - left = int(min(1.0, left_norm * scale) * 0xFFFF) - right = int(min(1.0, right_norm * scale) * 0xFFFF) + scale = 0.60 + low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble + high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble duration = 10 - sdl2.SDL_GameControllerRumble(controller, left, right, duration) + sdl2.SDL_GameControllerRumble(controller, low, high, duration) + return max_norm @dataclass @@ -298,13 +303,17 @@ class ControllerContext: controller: sdl2.SDL_GameController instance_id: int controller_index: int - port: str + port: Optional[str] uart: Optional[PicoUART] report: SwitchReport = field(default_factory=SwitchReport) dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False}) last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False}) last_send: float = 0.0 last_reopen_attempt: float = 0.0 + last_rumble: float = 0.0 + last_rumble_change: float = 0.0 + last_rumble_energy: float = 0.0 + rumble_active: bool = False def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int]: @@ -407,6 +416,11 @@ def main() -> None: controller_count = sdl2.SDL_NumJoysticks() if controller_count < 0: parser.error(f"SDL error: {sdl2.SDL_GetError().decode()}") + auto_pairing_enabled = not args.map and not args.interactive + auto_discover_ports = auto_pairing_enabled and not args.ports + available_ports: List[str] = [] + auto_assigned_indices: set[int] = set() + include_non_usb = args.all_ports or False for index in range(controller_count): if sdl2.SDL_IsGameController(index): name = sdl2.SDL_GameControllerNameForIndex(index) @@ -423,70 +437,65 @@ def main() -> None: if args.interactive: if not controller_indices: parser.error("No controllers detected for interactive pairing.") - discovered = discover_ports(include_non_usb=args.all_ports or False) + discovered = discover_ports(include_non_usb=include_non_usb) if not discovered: parser.error("No UART devices found for interactive pairing.") mappings = interactive_pairing(console, controller_names, discovered) if not mappings: parser.error("No controller-to-UART mappings were selected.") - elif not mappings: + elif auto_pairing_enabled: if args.ports: - validated = list(args.ports) - if not controller_indices: - parser.error("No controllers available to pair.") - pair_count = min(len(controller_indices), len(validated)) - if pair_count == 0: - parser.error("No UART ports specified.") - if len(validated) < len(controller_indices): - console.print( - f"[yellow]Warning: only {len(validated)} UART(s) provided; pairing first {pair_count} controllers.[/yellow]" - ) - mappings = list((controller_indices[i], validated[i]) for i in range(pair_count)) - console.print("[green]Paired controllers to specified UART ports:[/green]") - for idx, port in mappings: - console.print(f" Controller {idx} -> {port}") + available_ports.extend(list(args.ports)) + console.print(f"[green]Prepared {len(available_ports)} specified UART port(s) for auto-pairing.[/green]") else: - discovered = discover_ports(include_non_usb=args.all_ports or False) - if not discovered: - parser.error("No UART devices detected automatically.") - pair_count = min(len(controller_indices), len(discovered)) - if pair_count == 0: - parser.error("No controllers detected to pair.") - if len(discovered) < len(controller_indices): - console.print( - f"[yellow]Warning: detected {len(discovered)} UART(s) but {len(controller_indices)} controller(s); pairing first {pair_count}.[/yellow]" - ) - mappings = list((controller_indices[i], discovered[i]["device"]) for i in range(pair_count)) - console.print("[green]Auto-detected UARTs:[/green]") - for info in discovered: - console.print(f" {info['device']} ({info['description']})") - console.print("[green]Paired controllers to detected UARTs:[/green]") - for idx, port in mappings: - console.print(f" Controller {idx} -> {port}") + discovered = discover_ports(include_non_usb=include_non_usb) + if discovered: + available_ports.extend(info["device"] for info in discovered) + console.print("[green]Auto-detected UARTs:[/green]") + for info in discovered: + console.print(f" {info['device']} ({info['description']})") + else: + console.print("[yellow]No UART devices detected yet; waiting for hotplug...[/yellow]") for index, port in mappings: mapping_by_index[index] = port - # Open currently connected controllers that match the mapping. - for index, port in mappings: + def assign_port_for_index(idx: int) -> Optional[str]: + if idx in mapping_by_index: + return mapping_by_index[idx] + if not auto_pairing_enabled: + return None + if not available_ports: + return None + port_choice = available_ports.pop(0) + mapping_by_index[idx] = port_choice + auto_assigned_indices.add(idx) + console.print(f"[green]Auto-paired controller {idx} to {port_choice}[/green]") + return port_choice + + # Open currently connected controllers that we can pair. + for index in controller_indices: if index >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(index): - # Try to turn non-GameController into a controller via hint reload. - if index < sdl2.SDL_NumJoysticks(): - name = sdl2.SDL_JoystickNameForIndex(index) - name_str = name.decode() if isinstance(name, bytes) else str(name) - console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") + name = sdl2.SDL_JoystickNameForIndex(index) + name_str = name.decode() if isinstance(name, bytes) else str(name) + console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]") + continue + port = assign_port_for_index(index) + if port is None and not auto_pairing_enabled: continue try: controller, instance_id = open_controller(index) except Exception as exc: console.print(f"[red]Failed to open controller {index}: {exc}[/red]") continue - uart = try_open_uart(port, args.baud) + uart = try_open_uart(port, args.baud) if port else None if uart: uarts.append(uart) console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]") - else: + elif port: console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]") + else: + console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]") ctx = ControllerContext( controller=controller, instance_id=instance_id, @@ -496,10 +505,51 @@ def main() -> None: ) contexts[instance_id] = ctx - if not contexts and not mapping_by_index: - parser.error("No controllers opened. Check --map/--ports/--interactive values.") + if not contexts: + console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]") + + def ports_in_use() -> set: + used = set(mapping_by_index.values()) + used.update(ctx.port for ctx in contexts.values() if ctx.port) + return used + + def discover_new_ports() -> None: + if not auto_discover_ports: + return + discovered = discover_ports(include_non_usb=include_non_usb) + in_use = ports_in_use() + for info in discovered: + path = info["device"] + if path in in_use or path in available_ports: + continue + available_ports.append(path) + console.print(f"[green]Discovered UART {path} ({info['description']}); available for pairing.[/green]") + + def pair_waiting_contexts() -> None: + for ctx in list(contexts.values()): + if ctx.port is not None: + continue + port_choice = assign_port_for_index(ctx.controller_index) + if port_choice is None: + continue + ctx.port = port_choice + uart = try_open_uart(port_choice, args.baud) + ctx.last_reopen_attempt = time.monotonic() + if uart: + uarts.append(uart) + ctx.uart = uart + console.print( + f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]" + ) + else: + ctx.uart = None + console.print( + f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]" + ) event = sdl2.SDL_Event() + port_scan_interval = 2.0 + last_port_scan = time.monotonic() running = True while running: while sdl2.SDL_PollEvent(event): @@ -553,24 +603,31 @@ def main() -> None: ctx.report.hat = dpad_to_hat(ctx.dpad) elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED: idx = event.cdevice.which - port = mapping_by_index.get(idx) - if port is None: + if any(c.controller_index == idx for c in contexts.values()): continue - # Avoid duplicate opens for already connected instance IDs. - already = any(c.controller_index == idx for c in contexts.values()) - if already: + port = assign_port_for_index(idx) + if port is None and not auto_pairing_enabled: + continue + if idx >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(idx): + name = sdl2.SDL_JoystickNameForIndex(idx) + name_str = name.decode() if isinstance(name, bytes) else str(name) + console.print( + f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]" + ) continue try: controller, instance_id = open_controller(idx) except Exception as exc: console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]") continue - uart = try_open_uart(port, args.baud) + uart = try_open_uart(port, args.baud) if port else None if uart: uarts.append(uart) console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]") - else: + elif port: console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]") + else: + console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]") ctx = ControllerContext( controller=controller, instance_id=instance_id, @@ -589,12 +646,24 @@ def main() -> None: ctx = contexts.pop(instance_id, None) if ctx: console.print(f"[yellow]Controller {instance_id} removed[/yellow]") + if ctx.controller_index in auto_assigned_indices: + freed = mapping_by_index.pop(ctx.controller_index, None) + auto_assigned_indices.discard(ctx.controller_index) + if freed and freed not in available_ports: + available_ports.append(freed) + console.print(f"[cyan]Released UART {freed} back to pool[/cyan]") sdl2.SDL_GameControllerClose(ctx.controller) now = time.monotonic() + if now - last_port_scan > port_scan_interval: + discover_new_ports() + last_port_scan = now + pair_waiting_contexts() + else: + pair_waiting_contexts() for ctx in list(contexts.values()): # Reconnect UART if needed. - if ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: + if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: ctx.last_reopen_attempt = now uart = try_open_uart(ctx.port, args.baud) if uart: @@ -610,14 +679,31 @@ def main() -> None: # Poll rumble quickly while we have the port. payload = ctx.uart.read_rumble_payload() if payload: - apply_rumble(ctx.controller, payload) + energy = apply_rumble(ctx.controller, payload) + ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE + if ctx.rumble_active and energy != ctx.last_rumble_energy: + ctx.last_rumble_change = now + ctx.last_rumble_energy = energy + ctx.last_rumble = now + elif ctx.rumble_active and (now - ctx.last_rumble) > RUMBLE_IDLE_TIMEOUT: + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 + elif ctx.rumble_active and (now - ctx.last_rumble_change) > RUMBLE_STUCK_TIMEOUT: + # Guard against a stream of tiny-but-nonzero rumble packets that never decay. + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 except SerialException as exc: console.print(f"[yellow]UART {ctx.port} disconnected: {exc}[/yellow]") try: ctx.uart.close() except Exception: pass + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) ctx.uart = None + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 ctx.last_reopen_attempt = now except Exception as exc: console.print(f"[red]UART error on {ctx.port}: {exc}[/red]")