Make stuff work in windows

This commit is contained in:
jojomawswan 2025-11-22 18:37:54 -07:00
commit 25212d870b

View file

@ -41,6 +41,7 @@ UART_BAUD = 921600
RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off
RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this
RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise
RUMBLE_SCALE = 0.8
class SwitchButton: class SwitchButton:
@ -262,37 +263,56 @@ class PicoUART:
write_timeout=0.0, write_timeout=0.0,
xonxoff=False, xonxoff=False,
rtscts=False, rtscts=False,
dsrdtr=False dsrdtr=False,
) )
self._buffer = bytearray() self._buffer = bytearray()
def send_report(self, report: SwitchReport) -> None: def send_report(self, report: SwitchReport) -> None:
# Non-blocking write; no flush to avoid sync stalls.
self.serial.write(report.to_bytes()) self.serial.write(report.to_bytes())
self.serial.flush()
def read_rumble_payload(self) -> Optional[bytes]: def read_rumble_payload(self) -> Optional[bytes]:
chunk = self.serial.read(64) # non-blocking (timeout=0) """
if chunk: Drain all currently available UART bytes into an internal buffer,
self._buffer.extend(chunk) then try to extract a single valid rumble frame.
Frame format:
0: 0xBB (RUMBLE_HEADER)
1: type (0x01 for rumble)
2-9: 8-byte rumble payload
10: checksum (sum of first 10 bytes) & 0xFF
"""
# Read whatever is waiting in OS buffer
waiting = self.serial.in_waiting
if waiting:
self._buffer.extend(self.serial.read(waiting))
while True: while True:
if RUMBLE_HEADER not in self._buffer: if not self._buffer:
return None
start = self._buffer.find(bytes([RUMBLE_HEADER]))
if start < 0:
# No header at all, drop garbage
self._buffer.clear() self._buffer.clear()
return None return None
start = self._buffer.find(bytes([RUMBLE_HEADER]))
# Not enough data for a full frame yet
if len(self._buffer) - start < 11: if len(self._buffer) - start < 11:
# Need more bytes.
if start > 0: if start > 0:
del self._buffer[:start] del self._buffer[:start]
return None return None
frame = self._buffer[start : start + 11]
frame = self._buffer[start:start + 11]
checksum = sum(frame[:10]) & 0xFF checksum = sum(frame[:10]) & 0xFF
if frame[1] == RUMBLE_TYPE_RUMBLE and checksum == frame[10]: if frame[1] == RUMBLE_TYPE_RUMBLE and checksum == frame[10]:
payload = bytes(frame[2:10]) payload = bytes(frame[2:10])
del self._buffer[: start + 11] del self._buffer[:start + 11]
return payload return payload
# Bad frame, drop the header and continue.
del self._buffer[: start + 1] # Bad frame, drop this header and resync
del self._buffer[:start + 1]
def close(self) -> None: def close(self) -> None:
self.serial.close() self.serial.close()
@ -324,7 +344,7 @@ def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> float:
sdl2.SDL_GameControllerRumble(controller, 0, 0, 0) sdl2.SDL_GameControllerRumble(controller, 0, 0, 0)
return 0.0 return 0.0
# Attenuate to feel closer to a real controller; cap at ~25% strength. # Attenuate to feel closer to a real controller; cap at ~25% strength.
scale = 0.60 scale = RUMBLE_SCALE
low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble
high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble
duration = 10 duration = 10
@ -341,6 +361,7 @@ class ControllerContext:
uart: Optional[PicoUART] uart: Optional[PicoUART]
report: SwitchReport = field(default_factory=SwitchReport) report: SwitchReport = field(default_factory=SwitchReport)
dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False}) dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False})
button_state: Dict[int, bool] = field(default_factory=dict)
last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False}) last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False})
last_send: float = 0.0 last_send: float = 0.0
last_reopen_attempt: float = 0.0 last_reopen_attempt: float = 0.0
@ -374,11 +395,6 @@ def open_uart_or_warn(port: str, baud: int, console: Console) -> Optional[PicoUA
return None return None
def start_rumble_listener(ctx: ControllerContext) -> threading.Thread:
# No-op placeholder (rumble is polled in the main loop for hotplug safety).
return None
def build_arg_parser() -> argparse.ArgumentParser: def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Bridge SDL2 controllers to switch-pico UART (with rumble)") parser = argparse.ArgumentParser(description="Bridge SDL2 controllers to switch-pico UART (with rumble)")
parser.add_argument( parser.add_argument(
@ -453,6 +469,32 @@ def build_arg_parser() -> argparse.ArgumentParser:
return parser return parser
def poll_controller_buttons(ctx: ControllerContext, button_map: Dict[int, int]) -> None:
changed = False
for sdl_button, switch_bit in button_map.items():
pressed = bool(sdl2.SDL_GameControllerGetButton(ctx.controller, sdl_button))
previous = ctx.button_state.get(sdl_button)
if previous == pressed:
continue
ctx.button_state[sdl_button] = pressed
if pressed:
ctx.report.buttons |= switch_bit
else:
ctx.report.buttons &= ~switch_bit
changed = True
dpad_changed = False
for sdl_button, name in DPAD_BUTTONS.items():
pressed = bool(sdl2.SDL_GameControllerGetButton(ctx.controller, sdl_button))
if ctx.dpad[name] == pressed:
continue
ctx.dpad[name] = pressed
dpad_changed = True
if dpad_changed:
ctx.report.hat = dpad_to_hat(ctx.dpad)
def main() -> None: def main() -> None:
parser = build_arg_parser() parser = build_arg_parser()
args = parser.parse_args() args = parser.parse_args()
@ -487,7 +529,7 @@ def main() -> None:
set_hint("SDL_JOYSTICK_HIDAPI_SWITCH", "1") set_hint("SDL_JOYSTICK_HIDAPI_SWITCH", "1")
# Use controller button labels so Nintendo layouts (ABXY) map correctly on Linux. # Use controller button labels so Nintendo layouts (ABXY) map correctly on Linux.
set_hint("SDL_GAMECONTROLLER_USE_BUTTON_LABELS", "1") set_hint("SDL_GAMECONTROLLER_USE_BUTTON_LABELS", "1")
if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK) != 0: if sdl2.SDL_Init(sdl2.SDL_INIT_GAMECONTROLLER | sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_EVERYTHING) != 0:
parser.error(f"SDL init failed: {sdl2.SDL_GetError().decode(errors='ignore')}") parser.error(f"SDL init failed: {sdl2.SDL_GetError().decode(errors='ignore')}")
contexts: Dict[int, ControllerContext] = {} contexts: Dict[int, ControllerContext] = {}
uarts: List[PicoUART] = [] uarts: List[PicoUART] = []
@ -707,6 +749,7 @@ def main() -> None:
ctx.report.buttons |= bit ctx.report.buttons |= bit
else: else:
ctx.report.buttons &= ~bit ctx.report.buttons &= ~bit
ctx.button_state[button] = pressed
elif button in DPAD_BUTTONS: elif button in DPAD_BUTTONS:
ctx.dpad[DPAD_BUTTONS[button]] = pressed ctx.dpad[DPAD_BUTTONS[button]] = pressed
ctx.report.hat = dpad_to_hat(ctx.dpad) ctx.report.hat = dpad_to_hat(ctx.dpad)
@ -747,11 +790,6 @@ def main() -> None:
contexts[instance_id] = ctx contexts[instance_id] = ctx
elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED: elif event.type == sdl2.SDL_CONTROLLERDEVICEREMOVED:
instance_id = event.cdevice.which instance_id = event.cdevice.which
# ctx = contexts.get(instance_id)
# if ctx and sdl2.SDL_GameControllerGetAttached(ctx.controller):
# # Spurious detach; ignore.
# console.print(f"[yellow]Ignoring spurious remove for controller {instance_id}[/yellow]")
# continue
ctx = contexts.pop(instance_id, None) ctx = contexts.pop(instance_id, None)
if ctx: if ctx:
console.print(f"[yellow]Controller {instance_id} removed[/yellow]") console.print(f"[yellow]Controller {instance_id} removed[/yellow]")
@ -771,6 +809,12 @@ def main() -> None:
else: else:
pair_waiting_contexts() pair_waiting_contexts()
for ctx in list(contexts.values()): for ctx in list(contexts.values()):
current_button_map = (
button_map_swapped
if (args.swap_abxy or ctx.controller_index in swap_abxy_indices)
else button_map_default
)
poll_controller_buttons(ctx, current_button_map)
# Reconnect UART if needed. # Reconnect UART if needed.
if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0: if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0:
ctx.last_reopen_attempt = now ctx.last_reopen_attempt = now
@ -782,13 +826,21 @@ def main() -> None:
if ctx.uart is None: if ctx.uart is None:
continue continue
try: try:
# Send controller state at configured frequency
if now - ctx.last_send >= interval: if now - ctx.last_send >= interval:
ctx.uart.send_report(ctx.report) ctx.uart.send_report(ctx.report)
ctx.last_send = now ctx.last_send = now
# Poll rumble quickly while we have the port.
payload = ctx.uart.read_rumble_payload() # Drain all pending rumble frames and apply only the latest one.
if payload: last_payload = None
energy = apply_rumble(ctx.controller, payload) while True:
p = ctx.uart.read_rumble_payload()
if not p:
break
last_payload = p
if last_payload is not None:
energy = apply_rumble(ctx.controller, last_payload)
ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE
if ctx.rumble_active and energy != ctx.last_rumble_energy: if ctx.rumble_active and energy != ctx.last_rumble_energy:
ctx.last_rumble_change = now ctx.last_rumble_change = now