From 954b47253c8aa53cefe9870b556ab0746504089e Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Sun, 23 Nov 2025 09:48:30 -0700 Subject: [PATCH] Fix hotplugging --- controller_uart_bridge.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/controller_uart_bridge.py b/controller_uart_bridge.py index f3acf53..e359fe9 100644 --- a/controller_uart_bridge.py +++ b/controller_uart_bridge.py @@ -697,6 +697,32 @@ def ports_in_use(pairing: PairingState, contexts: Dict[int, ControllerContext]) return used +def handle_removed_port(path: str, pairing: PairingState, contexts: Dict[int, ControllerContext], console: Console) -> None: + """Clear mappings/contexts for a UART path that disappeared.""" + if path in pairing.available_ports: + pairing.available_ports.remove(path) + console.print(f"[yellow]UART {path} removed; dropping from available pool[/yellow]") + indices_to_clear = [idx for idx, mapped in pairing.mapping_by_index.items() if mapped == path] + for idx in indices_to_clear: + pairing.mapping_by_index.pop(idx, None) + pairing.auto_assigned_indices.discard(idx) + for ctx in list(contexts.values()): + if ctx.port != path: + continue + if ctx.uart: + try: + ctx.uart.close() + except Exception: + pass + sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0) + ctx.uart = None + ctx.port = None + ctx.rumble_active = False + ctx.last_rumble_energy = 0.0 + ctx.last_reopen_attempt = time.monotonic() + console.print(f"[yellow]UART {path} removed; controller {ctx.controller_index} waiting for reassignment[/yellow]") + + def discover_new_ports(pairing: PairingState, contexts: Dict[int, ControllerContext], console: Console) -> None: """Scan for new serial ports and add unused ones to the available pool.""" if not pairing.auto_discover_ports: @@ -706,6 +732,14 @@ def discover_new_ports(pairing: PairingState, contexts: Dict[int, ControllerCont ignore_descriptions=pairing.ignore_port_desc, include_descriptions=pairing.include_port_desc, ) + current_paths = {info["device"] for info in discovered} + known_paths = set(pairing.available_ports) + known_paths.update(pairing.mapping_by_index.values()) + known_paths.update(ctx.port for ctx in contexts.values() if ctx.port) + # Drop any paths we previously knew about that are no longer present. + removed_paths = [path for path in known_paths if path not in current_paths] + for path in removed_paths: + handle_removed_port(path, pairing, contexts, console) in_use = ports_in_use(pairing, contexts) for info in discovered: path = info["device"]