Controller tweaks

This commit is contained in:
Joey Yakimowich-Payne 2025-11-21 15:46:06 -07:00
commit f6409ba905
No known key found for this signature in database
GPG key ID: 6BFE655FA5ABD1E1

View file

@ -37,6 +37,9 @@ UART_HEADER = 0xAA
RUMBLE_HEADER = 0xBB
RUMBLE_TYPE_RUMBLE = 0x01
UART_BAUD = 900000
RUMBLE_IDLE_TIMEOUT = 0.25 # seconds without packets before forcing rumble off
RUMBLE_STUCK_TIMEOUT = 0.60 # continuous same-energy rumble will be stopped after this
RUMBLE_MIN_ACTIVE = 0.50 # below this, rumble is treated as off/noise
class SwitchButton:
@ -269,8 +272,9 @@ def decode_rumble(payload: bytes) -> Tuple[float, float]:
if payload == b"\x00\x01\x40\x40\x00\x01\x40\x40":
return 0.0, 0.0
# Rumble amp is 10 bits across bytes 0/1 and 4/5.
left_raw = ((payload[1] & 0x03) << 8) | payload[0]
right_raw = ((payload[5] & 0x03) << 8) | payload[4]
# Switch format is right rumble first, then left rumble (4 bytes each).
right_raw = ((payload[1] & 0x03) << 8) | payload[0]
left_raw = ((payload[5] & 0x03) << 8) | payload[4]
if left_raw < 8 and right_raw < 8:
return 0.0, 0.0
left = min(max(left_raw / 1023.0, 0.0), 1.0)
@ -278,19 +282,20 @@ def decode_rumble(payload: bytes) -> Tuple[float, float]:
return left, right
def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> None:
def apply_rumble(controller: sdl2.SDL_GameController, payload: bytes) -> float:
left_norm, right_norm = decode_rumble(payload)
max_norm = max(left_norm, right_norm)
# Treat small rumble as "off" to avoid idle buzz.
if max_norm < 0.40:
if max_norm < RUMBLE_MIN_ACTIVE:
sdl2.SDL_GameControllerRumble(controller, 0, 0, 0)
return
return 0.0
# Attenuate to feel closer to a real controller; cap at ~25% strength.
scale = 0.40
left = int(min(1.0, left_norm * scale) * 0xFFFF)
right = int(min(1.0, right_norm * scale) * 0xFFFF)
scale = 0.60
low = int(min(1.0, left_norm * scale) * 0xFFFF) # SDL: low_frequency_rumble
high = int(min(1.0, right_norm * scale) * 0xFFFF) # SDL: high_frequency_rumble
duration = 10
sdl2.SDL_GameControllerRumble(controller, left, right, duration)
sdl2.SDL_GameControllerRumble(controller, low, high, duration)
return max_norm
@dataclass
@ -298,13 +303,17 @@ class ControllerContext:
controller: sdl2.SDL_GameController
instance_id: int
controller_index: int
port: str
port: Optional[str]
uart: Optional[PicoUART]
report: SwitchReport = field(default_factory=SwitchReport)
dpad: Dict[str, bool] = field(default_factory=lambda: {"up": False, "down": False, "left": False, "right": False})
last_trigger_state: Dict[str, bool] = field(default_factory=lambda: {"left": False, "right": False})
last_send: float = 0.0
last_reopen_attempt: float = 0.0
last_rumble: float = 0.0
last_rumble_change: float = 0.0
last_rumble_energy: float = 0.0
rumble_active: bool = False
def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int]:
@ -407,6 +416,11 @@ def main() -> None:
controller_count = sdl2.SDL_NumJoysticks()
if controller_count < 0:
parser.error(f"SDL error: {sdl2.SDL_GetError().decode()}")
auto_pairing_enabled = not args.map and not args.interactive
auto_discover_ports = auto_pairing_enabled and not args.ports
available_ports: List[str] = []
auto_assigned_indices: set[int] = set()
include_non_usb = args.all_ports or False
for index in range(controller_count):
if sdl2.SDL_IsGameController(index):
name = sdl2.SDL_GameControllerNameForIndex(index)
@ -423,70 +437,65 @@ def main() -> None:
if args.interactive:
if not controller_indices:
parser.error("No controllers detected for interactive pairing.")
discovered = discover_ports(include_non_usb=args.all_ports or False)
discovered = discover_ports(include_non_usb=include_non_usb)
if not discovered:
parser.error("No UART devices found for interactive pairing.")
mappings = interactive_pairing(console, controller_names, discovered)
if not mappings:
parser.error("No controller-to-UART mappings were selected.")
elif not mappings:
elif auto_pairing_enabled:
if args.ports:
validated = list(args.ports)
if not controller_indices:
parser.error("No controllers available to pair.")
pair_count = min(len(controller_indices), len(validated))
if pair_count == 0:
parser.error("No UART ports specified.")
if len(validated) < len(controller_indices):
console.print(
f"[yellow]Warning: only {len(validated)} UART(s) provided; pairing first {pair_count} controllers.[/yellow]"
)
mappings = list((controller_indices[i], validated[i]) for i in range(pair_count))
console.print("[green]Paired controllers to specified UART ports:[/green]")
for idx, port in mappings:
console.print(f" Controller {idx} -> {port}")
available_ports.extend(list(args.ports))
console.print(f"[green]Prepared {len(available_ports)} specified UART port(s) for auto-pairing.[/green]")
else:
discovered = discover_ports(include_non_usb=args.all_ports or False)
if not discovered:
parser.error("No UART devices detected automatically.")
pair_count = min(len(controller_indices), len(discovered))
if pair_count == 0:
parser.error("No controllers detected to pair.")
if len(discovered) < len(controller_indices):
console.print(
f"[yellow]Warning: detected {len(discovered)} UART(s) but {len(controller_indices)} controller(s); pairing first {pair_count}.[/yellow]"
)
mappings = list((controller_indices[i], discovered[i]["device"]) for i in range(pair_count))
console.print("[green]Auto-detected UARTs:[/green]")
for info in discovered:
console.print(f" {info['device']} ({info['description']})")
console.print("[green]Paired controllers to detected UARTs:[/green]")
for idx, port in mappings:
console.print(f" Controller {idx} -> {port}")
discovered = discover_ports(include_non_usb=include_non_usb)
if discovered:
available_ports.extend(info["device"] for info in discovered)
console.print("[green]Auto-detected UARTs:[/green]")
for info in discovered:
console.print(f" {info['device']} ({info['description']})")
else:
console.print("[yellow]No UART devices detected yet; waiting for hotplug...[/yellow]")
for index, port in mappings:
mapping_by_index[index] = port
# Open currently connected controllers that match the mapping.
for index, port in mappings:
def assign_port_for_index(idx: int) -> Optional[str]:
if idx in mapping_by_index:
return mapping_by_index[idx]
if not auto_pairing_enabled:
return None
if not available_ports:
return None
port_choice = available_ports.pop(0)
mapping_by_index[idx] = port_choice
auto_assigned_indices.add(idx)
console.print(f"[green]Auto-paired controller {idx} to {port_choice}[/green]")
return port_choice
# Open currently connected controllers that we can pair.
for index in controller_indices:
if index >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(index):
# Try to turn non-GameController into a controller via hint reload.
if index < sdl2.SDL_NumJoysticks():
name = sdl2.SDL_JoystickNameForIndex(index)
name_str = name.decode() if isinstance(name, bytes) else str(name)
console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]")
name = sdl2.SDL_JoystickNameForIndex(index)
name_str = name.decode() if isinstance(name, bytes) else str(name)
console.print(f"[yellow]Index {index} is not a GameController ({name_str}). Trying raw open failed.[/yellow]")
continue
port = assign_port_for_index(index)
if port is None and not auto_pairing_enabled:
continue
try:
controller, instance_id = open_controller(index)
except Exception as exc:
console.print(f"[red]Failed to open controller {index}: {exc}[/red]")
continue
uart = try_open_uart(port, args.baud)
uart = try_open_uart(port, args.baud) if port else None
if uart:
uarts.append(uart)
console.print(f"[green]Controller {index} ({instance_id}) paired to {port}[/green]")
else:
elif port:
console.print(f"[yellow]Controller {index} ({instance_id}) waiting for UART {port}[/yellow]")
else:
console.print(f"[yellow]Controller {index} ({instance_id}) connected; waiting for an available UART[/yellow]")
ctx = ControllerContext(
controller=controller,
instance_id=instance_id,
@ -496,10 +505,51 @@ def main() -> None:
)
contexts[instance_id] = ctx
if not contexts and not mapping_by_index:
parser.error("No controllers opened. Check --map/--ports/--interactive values.")
if not contexts:
console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]")
def ports_in_use() -> set:
used = set(mapping_by_index.values())
used.update(ctx.port for ctx in contexts.values() if ctx.port)
return used
def discover_new_ports() -> None:
if not auto_discover_ports:
return
discovered = discover_ports(include_non_usb=include_non_usb)
in_use = ports_in_use()
for info in discovered:
path = info["device"]
if path in in_use or path in available_ports:
continue
available_ports.append(path)
console.print(f"[green]Discovered UART {path} ({info['description']}); available for pairing.[/green]")
def pair_waiting_contexts() -> None:
for ctx in list(contexts.values()):
if ctx.port is not None:
continue
port_choice = assign_port_for_index(ctx.controller_index)
if port_choice is None:
continue
ctx.port = port_choice
uart = try_open_uart(port_choice, args.baud)
ctx.last_reopen_attempt = time.monotonic()
if uart:
uarts.append(uart)
ctx.uart = uart
console.print(
f"[green]Controller {ctx.controller_index} ({ctx.instance_id}) paired to {port_choice}[/green]"
)
else:
ctx.uart = None
console.print(
f"[yellow]Controller {ctx.controller_index} ({ctx.instance_id}) waiting for UART {port_choice}[/yellow]"
)
event = sdl2.SDL_Event()
port_scan_interval = 2.0
last_port_scan = time.monotonic()
running = True
while running:
while sdl2.SDL_PollEvent(event):
@ -553,24 +603,31 @@ def main() -> None:
ctx.report.hat = dpad_to_hat(ctx.dpad)
elif event.type == sdl2.SDL_CONTROLLERDEVICEADDED:
idx = event.cdevice.which
port = mapping_by_index.get(idx)
if port is None:
if any(c.controller_index == idx for c in contexts.values()):
continue
# Avoid duplicate opens for already connected instance IDs.
already = any(c.controller_index == idx for c in contexts.values())
if already:
port = assign_port_for_index(idx)
if port is None and not auto_pairing_enabled:
continue
if idx >= sdl2.SDL_NumJoysticks() or not sdl2.SDL_IsGameController(idx):
name = sdl2.SDL_JoystickNameForIndex(idx)
name_str = name.decode() if isinstance(name, bytes) else str(name)
console.print(
f"[yellow]Index {idx} is not a GameController ({name_str}). Trying raw open failed.[/yellow]"
)
continue
try:
controller, instance_id = open_controller(idx)
except Exception as exc:
console.print(f"[red]Hotplug open failed for controller {idx}: {exc}[/red]")
continue
uart = try_open_uart(port, args.baud)
uart = try_open_uart(port, args.baud) if port else None
if uart:
uarts.append(uart)
console.print(f"[green]Controller {idx} ({instance_id}) paired to {port}[/green]")
else:
elif port:
console.print(f"[yellow]Controller {idx} ({instance_id}) waiting for UART {port}[/yellow]")
else:
console.print(f"[yellow]Controller {idx} ({instance_id}) connected; waiting for an available UART[/yellow]")
ctx = ControllerContext(
controller=controller,
instance_id=instance_id,
@ -589,12 +646,24 @@ def main() -> None:
ctx = contexts.pop(instance_id, None)
if ctx:
console.print(f"[yellow]Controller {instance_id} removed[/yellow]")
if ctx.controller_index in auto_assigned_indices:
freed = mapping_by_index.pop(ctx.controller_index, None)
auto_assigned_indices.discard(ctx.controller_index)
if freed and freed not in available_ports:
available_ports.append(freed)
console.print(f"[cyan]Released UART {freed} back to pool[/cyan]")
sdl2.SDL_GameControllerClose(ctx.controller)
now = time.monotonic()
if now - last_port_scan > port_scan_interval:
discover_new_ports()
last_port_scan = now
pair_waiting_contexts()
else:
pair_waiting_contexts()
for ctx in list(contexts.values()):
# Reconnect UART if needed.
if ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0:
if ctx.port and ctx.uart is None and (now - ctx.last_reopen_attempt) > 1.0:
ctx.last_reopen_attempt = now
uart = try_open_uart(ctx.port, args.baud)
if uart:
@ -610,14 +679,31 @@ def main() -> None:
# Poll rumble quickly while we have the port.
payload = ctx.uart.read_rumble_payload()
if payload:
apply_rumble(ctx.controller, payload)
energy = apply_rumble(ctx.controller, payload)
ctx.rumble_active = energy >= RUMBLE_MIN_ACTIVE
if ctx.rumble_active and energy != ctx.last_rumble_energy:
ctx.last_rumble_change = now
ctx.last_rumble_energy = energy
ctx.last_rumble = now
elif ctx.rumble_active and (now - ctx.last_rumble) > RUMBLE_IDLE_TIMEOUT:
sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0)
ctx.rumble_active = False
ctx.last_rumble_energy = 0.0
elif ctx.rumble_active and (now - ctx.last_rumble_change) > RUMBLE_STUCK_TIMEOUT:
# Guard against a stream of tiny-but-nonzero rumble packets that never decay.
sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0)
ctx.rumble_active = False
ctx.last_rumble_energy = 0.0
except SerialException as exc:
console.print(f"[yellow]UART {ctx.port} disconnected: {exc}[/yellow]")
try:
ctx.uart.close()
except Exception:
pass
sdl2.SDL_GameControllerRumble(ctx.controller, 0, 0, 0)
ctx.uart = None
ctx.rumble_active = False
ctx.last_rumble_energy = 0.0
ctx.last_reopen_attempt = now
except Exception as exc:
console.print(f"[red]UART error on {ctx.port}: {exc}[/red]")