diff --git a/controller_uart_bridge.py b/controller_uart_bridge.py index 7c60667..dd38821 100644 --- a/controller_uart_bridge.py +++ b/controller_uart_bridge.py @@ -41,6 +41,7 @@ UART_BAUD = 921600 RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise +RUMBLE_SCALE = 0.8 class SwitchButton: @@ -262,37 +263,56 @@ class PicoUART: write_timeout=0.0, xonxoff=False, rtscts=False, - dsrdtr=False + dsrdtr=False, ) self._buffer = bytearray() def send_report(self, report: SwitchReport) -> None: + # Non-blocking write; no flush to avoid sync stalls. self.serial.write(report.to_bytes()) - self.serial.flush() def read_rumble_payload(self) -> Optional[bytes]: - chunk = self.serial.read(64) # non-blocking (timeout=0) - if chunk: - self._buffer.extend(chunk) + """ + Drain all currently available UART bytes into an internal buffer, + then try to extract a single valid rumble frame. + + Frame format: + 0: 0xBB (RUMBLE_HEADER) + 1: type (0x01 for rumble) + 2-9: 8-byte rumble payload + 10: checksum (sum of first 10 bytes) & 0xFF + """ + # Read whatever is waiting in OS buffer + waiting = self.serial.in_waiting + if waiting: + self._buffer.extend(self.serial.read(waiting)) while True: - if RUMBLE_HEADER not in self._buffer: + if not self._buffer: + return None + + start = self._buffer.find(bytes([RUMBLE_HEADER])) + if start < 0: + # No header at all, drop garbage self._buffer.clear() return None - start = self._buffer.find(bytes([RUMBLE_HEADER])) + + # Not enough data for a full frame yet if len(self._buffer) - start < 11: - # Need more bytes. if start > 0: del self._buffer[:start] return None - frame = self._buffer[start : start + 11] + + frame = self._buffer[start:start + 11] checksum = sum(frame[:10]) & 0xFF + if frame[1] == RUMBLE_TYPE_RUMBLE and checksum == frame[10]: payload = bytes(frame[2:10]) - del self._buffer[: start + 11] + del self._buffer[:start + 11] return payload - # Bad frame, drop the header and continue. - del self._buffer[: start + 1] + + # Bad frame, drop this header and resync + del self._buffer[:start + 1] def close(self) -> None: self.serial.close() @@ -324,7 +344,7 @@ def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> float: sdl2.SDL_GameControllerRumble(controller, 0, 0, 0) return 0.0 # Attenuate to feel closer to a real controller; cap at ~25% strength. - scale = 0.60 + scale = RUMBLE_SCALE low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble duration = 10 @@ -341,6 +361,7 @@ class ControllerContext: uart: Optional[PicoUART] report: SwitchReport = field(default_factory=SwitchReport) dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False}) + button_state: Dict[int, bool] = field(default_factory=dict) last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False}) last_send: float = 0.0 last_reopen_attempt: float = 0.0 @@ -374,11 +395,6 @@ def open_uart_or_warn(port: str, baud: int, console: Console) -> Optional[PicoUA return None -def start_rumble_listener(ctx: ControllerContext) -> threading.Thread: - # No-op placeholder (rumble is polled in the main loop for hotplug safety). - return None - - def build_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Bridge SDL2 controllers to switch-pico UART (with rumble)") parser.add_argument( @@ -453,6 +469,32 @@ def build_arg_parser() -> argparse.ArgumentParser: return parser +def poll_controller_buttons(ctx: ControllerContext, button_map: Dict[int, int]) -> None: + changed = False + for sdl_button, switch_bit in button_map.items(): + pressed = bool(sdl2.SDL_GameControllerGetButton(ctx.controller, sdl_button)) + previous = ctx.button_state.get(sdl_button) + if previous == pressed: + continue + ctx.button_state[sdl_button] = pressed + if pressed: + ctx.report.buttons |= switch_bit + else: + ctx.report.buttons &= ~switch_bit + changed = True + + dpad_changed = False + for sdl_button, name in DPAD_BUTTONS.items(): + pressed = bool(sdl2.SDL_GameControllerGetButton(ctx.controller, sdl_button)) + if ctx.dpad[name] == pressed: + continue + ctx.dpad[name] = pressed + dpad_changed = True + + if dpad_changed: + ctx.report.hat = dpad_to_hat(ctx.dpad) + + def main() -> None: parser = build_arg_parser() args = parser.parse_args() @@ -487,7 +529,7 @@ def main() -> None: set_hint("SDL_JOYSTICK_HIDAPI_SWITCH", "1") # Use controller button labels so Nintendo layouts (ABXY) map correctly on Linux. set_hint("SDL_GAMECONTROLLER_USE_BUTTON_LABELS", "1") - if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK) != 0: + if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_EVERYTHING) != 0: parser.error(f"SDL init failed: {sdl2.SDL_GetError().decode(errors='ignore')}") contexts: Dict[int, ControllerContext] = {} uarts: List[PicoUART] = [] @@ -707,6 +749,7 @@ def main() -> None: ctx.report.buttons |= bit else: ctx.report.buttons &= ~bit + ctx.button_state[button] = pressed elif button in DPAD_BUTTONS: ctx.dpad[DPAD_BUTTONS[button]] = pressed ctx.report.hat = dpad_to_hat(ctx.dpad) @@ -747,11 +790,6 @@ def main() -> None: contexts[instance_id] = ctx elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED: instance_id = event.cdevice.which - # ctx = contexts.get(instance_id) - # if ctx and sdl2.SDL_GameControllerGetAttached(ctx.controller): - # # Spurious detach; ignore. - # console.print(f"[yellow]Ignoring spurious remove for controller {instance_id}[/yellow]") - # continue ctx = contexts.pop(instance_id, None) if ctx: console.print(f"[yellow]Controller {instance_id} removed[/yellow]") @@ -771,6 +809,12 @@ def main() -> None: else: pair_waiting_contexts() for ctx in list(contexts.values()): + current_button_map = ( + button_map_swapped + if (args.swap_abxy or ctx.controller_index in swap_abxy_indices) + else button_map_default + ) + poll_controller_buttons(ctx, current_button_map) # Reconnect UART if needed. if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: ctx.last_reopen_attempt = now @@ -782,13 +826,21 @@ def main() -> None: if ctx.uart is None: continue try: + # Send controller state at configured frequency if now - ctx.last_send >= interval: ctx.uart.send_report(ctx.report) ctx.last_send = now - # Poll rumble quickly while we have the port. - payload = ctx.uart.read_rumble_payload() - if payload: - energy = apply_rumble(ctx.controller, payload) + + # Drain all pending rumble frames and apply only the latest one. + last_payload = None + while True: + p = ctx.uart.read_rumble_payload() + if not p: + break + last_payload = p + + if last_payload is not None: + energy = apply_rumble(ctx.controller, last_payload) ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE if ctx.rumble_active and energy != ctx.last_rumble_energy: ctx.last_rumble_change = now