Add zeroing ability
This commit is contained in:
parent
80528ff39e
commit
8e740d4fff
2 changed files with 185 additions and 3 deletions
|
|
@ -16,6 +16,8 @@ Features inspired by ``host/controller_bridge.py``:
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from ctypes import create_string_buffer
|
||||
|
|
@ -64,6 +66,18 @@ def parse_mapping(value: str) -> Tuple[int, str]:
|
|||
return idx, port.strip()
|
||||
|
||||
|
||||
def parse_hotkey(value: str) -> str:
|
||||
"""Validate a single-character hotkey (empty string disables)."""
|
||||
if value is None:
|
||||
return ""
|
||||
value = value.strip()
|
||||
if not value:
|
||||
return ""
|
||||
if len(value) != 1:
|
||||
raise argparse.ArgumentTypeError("Hotkeys must be a single character (or empty to disable).")
|
||||
return value
|
||||
|
||||
|
||||
def set_hint(name: str, value: str) -> None:
|
||||
"""Set an SDL hint safely even if the constant is missing in PySDL2."""
|
||||
try:
|
||||
|
|
@ -95,6 +109,15 @@ DPAD_BUTTONS = {
|
|||
sdl2.SDL_CONTROLLER_BUTTON_DPAD_RIGHT: "right",
|
||||
}
|
||||
|
||||
STICK_AXIS_LABELS = (
|
||||
(sdl2.SDL_CONTROLLER_AXIS_LEFTX, "LX"),
|
||||
(sdl2.SDL_CONTROLLER_AXIS_LEFTY, "LY"),
|
||||
(sdl2.SDL_CONTROLLER_AXIS_RIGHTX, "RX"),
|
||||
(sdl2.SDL_CONTROLLER_AXIS_RIGHTY, "RY"),
|
||||
)
|
||||
|
||||
STICK_AXES = tuple(axis for axis, _ in STICK_AXIS_LABELS)
|
||||
|
||||
|
||||
def is_usb_serial(path: str) -> bool:
|
||||
"""
|
||||
|
|
@ -226,6 +249,128 @@ class ControllerContext:
|
|||
last_rumble_change: float = 0.0
|
||||
last_rumble_energy: float = 0.0
|
||||
rumble_active: bool = False
|
||||
axis_offsets: Dict[int, int] = field(default_factory=dict)
|
||||
|
||||
|
||||
def capture_stick_offsets(controller: sdl2.SDL_GameController) -> Dict[int, int]:
|
||||
"""Sample the current stick axes so they can be treated as the neutral center."""
|
||||
offsets: Dict[int, int] = {}
|
||||
for axis in STICK_AXES:
|
||||
offsets[axis] = int(sdl2.SDL_GameControllerGetAxis(controller, axis))
|
||||
return offsets
|
||||
|
||||
|
||||
def format_axis_offsets(offsets: Dict[int, int]) -> str:
|
||||
"""Return a human-friendly summary of per-axis offsets (for logging)."""
|
||||
return ", ".join(f"{label}={offsets.get(axis, 0):+d}" for axis, label in STICK_AXIS_LABELS)
|
||||
|
||||
|
||||
def calibrate_axis_value(value: int, axis: int, ctx: ControllerContext) -> int:
|
||||
"""Apply any stored calibration offset to a raw axis reading."""
|
||||
if not ctx.axis_offsets:
|
||||
return value
|
||||
offset = ctx.axis_offsets.get(axis)
|
||||
if offset is None:
|
||||
return value
|
||||
return max(-32768, min(32767, value - offset))
|
||||
|
||||
|
||||
class ZeroHotkeyMonitor:
|
||||
"""Platform-aware helper that watches for a single keypress without blocking the main loop."""
|
||||
|
||||
def __init__(self, key: str, console: Console) -> None:
|
||||
self.key = (key or "").lower()
|
||||
self.console = console
|
||||
self._active = False
|
||||
self._platform = os.name
|
||||
self._fd: Optional[int] = None
|
||||
self._old_termios = None
|
||||
self._msvcrt = None
|
||||
|
||||
def start(self) -> bool:
|
||||
if not self.key:
|
||||
return False
|
||||
if self._platform == "nt":
|
||||
try:
|
||||
import msvcrt # type: ignore
|
||||
except ImportError:
|
||||
self.console.print("[yellow]Zero hotkey disabled: msvcrt unavailable.[/yellow]")
|
||||
return False
|
||||
self._msvcrt = msvcrt
|
||||
self._active = True
|
||||
self.console.print(
|
||||
f"[magenta]Press '{self.key.upper()}' in this terminal to re-zero controller sticks.[/magenta]"
|
||||
)
|
||||
return True
|
||||
|
||||
if not sys.stdin.isatty():
|
||||
self.console.print("[yellow]Zero hotkey disabled: stdin is not a TTY.[/yellow]")
|
||||
return False
|
||||
import termios
|
||||
import tty
|
||||
|
||||
self._fd = sys.stdin.fileno()
|
||||
self._old_termios = termios.tcgetattr(self._fd)
|
||||
tty.setcbreak(self._fd)
|
||||
self._active = True
|
||||
self.console.print(
|
||||
f"[magenta]Press '{self.key.upper()}' in this terminal to re-zero controller sticks.[/magenta]"
|
||||
)
|
||||
return True
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self._active:
|
||||
return
|
||||
if self._platform != "nt" and self._fd is not None and self._old_termios is not None:
|
||||
import termios
|
||||
|
||||
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._old_termios)
|
||||
self._active = False
|
||||
|
||||
def poll_trigger(self) -> bool:
|
||||
if not self._active:
|
||||
return False
|
||||
key = self._read_key()
|
||||
if key and key.lower() == self.key:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _read_key(self) -> Optional[str]:
|
||||
if self._platform == "nt":
|
||||
if self._msvcrt and self._msvcrt.kbhit():
|
||||
ch = self._msvcrt.getwch()
|
||||
if ch == "\x03":
|
||||
raise KeyboardInterrupt
|
||||
return ch
|
||||
return None
|
||||
import select
|
||||
|
||||
ready, _, _ = select.select([sys.stdin], [], [], 0)
|
||||
if not ready:
|
||||
return None
|
||||
ch = sys.stdin.read(1)
|
||||
if ch == "\x03":
|
||||
raise KeyboardInterrupt
|
||||
return ch
|
||||
|
||||
|
||||
def zero_context_sticks(ctx: ControllerContext, console: Optional[Console] = None, reason: str = "Zeroed stick centers") -> None:
|
||||
"""Capture and store the current stick positions for a controller."""
|
||||
offsets = capture_stick_offsets(ctx.controller)
|
||||
ctx.axis_offsets = offsets
|
||||
if console:
|
||||
console.print(
|
||||
f"[cyan]{reason} for controller {ctx.controller_index} (inst {ctx.instance_id}): {format_axis_offsets(offsets)}[/cyan]"
|
||||
)
|
||||
|
||||
|
||||
def zero_all_context_sticks(contexts: Dict[int, ControllerContext], console: Console) -> None:
|
||||
"""Zero every connected controller's sticks."""
|
||||
if not contexts:
|
||||
console.print("[yellow]No controllers available to zero right now.[/yellow]")
|
||||
return
|
||||
for ctx in contexts.values():
|
||||
zero_context_sticks(ctx, console, reason="Re-zeroed stick centers")
|
||||
|
||||
|
||||
def open_controller(index: int) -> Tuple[sdl2.SDL_GameController, int, str]:
|
||||
|
|
@ -295,7 +440,24 @@ def build_arg_parser() -> argparse.ArgumentParser:
|
|||
default=500.0,
|
||||
help="Report send frequency per controller (Hz, default 500)",
|
||||
)
|
||||
parser.add_argument("--deadzone", type=float, default=0.08, help="Stick deadzone (0.0-1.0, default 0.08)")
|
||||
parser.add_argument(
|
||||
"--deadzone",
|
||||
type=float,
|
||||
default=0.08,
|
||||
help="Stick deadzone (0.0-1.0, default 0.08)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zero-sticks",
|
||||
action="store_true",
|
||||
help="Capture stick positions on connect and treat them as neutral to cancel drift.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zero-hotkey",
|
||||
type=parse_hotkey,
|
||||
default="z",
|
||||
metavar="KEY",
|
||||
help="Press this key in the terminal to re-zero sticks at runtime (default: 'z', empty string disables).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--trigger-threshold",
|
||||
type=float,
|
||||
|
|
@ -390,6 +552,8 @@ class BridgeConfig:
|
|||
interval: float
|
||||
deadzone_raw: int
|
||||
trigger_threshold: int
|
||||
zero_sticks: bool
|
||||
zero_hotkey: str
|
||||
button_map_default: Dict[int, SwitchButton]
|
||||
button_map_swapped: Dict[int, SwitchButton]
|
||||
swap_abxy_indices: set[int]
|
||||
|
|
@ -442,6 +606,8 @@ def build_bridge_config(console: Console, args: argparse.Namespace) -> BridgeCon
|
|||
interval=interval,
|
||||
deadzone_raw=deadzone_raw,
|
||||
trigger_threshold=trigger_threshold,
|
||||
zero_sticks=bool(args.zero_sticks),
|
||||
zero_hotkey=args.zero_hotkey or "",
|
||||
button_map_default=button_map_default,
|
||||
button_map_swapped=button_map_swapped,
|
||||
swap_abxy_indices=swap_abxy_indices,
|
||||
|
|
@ -723,6 +889,8 @@ def open_initial_contexts(
|
|||
port=port,
|
||||
uart=uart,
|
||||
)
|
||||
if config.zero_sticks:
|
||||
zero_context_sticks(ctx, console)
|
||||
contexts[instance_id] = ctx
|
||||
return contexts, uarts
|
||||
|
||||
|
|
@ -733,7 +901,7 @@ def handle_axis_motion(event: sdl2.SDL_Event, contexts: Dict[int, ControllerCont
|
|||
if not ctx:
|
||||
return
|
||||
axis = event.caxis.axis
|
||||
value = event.caxis.value
|
||||
value = calibrate_axis_value(event.caxis.value, axis, ctx)
|
||||
if axis == sdl2.SDL_CONTROLLER_AXIS_LEFTX:
|
||||
ctx.report.lx = axis_to_stick(value, config.deadzone_raw)
|
||||
elif axis == sdl2.SDL_CONTROLLER_AXIS_LEFTY:
|
||||
|
|
@ -838,6 +1006,8 @@ def handle_device_added(
|
|||
port=port,
|
||||
uart=uart,
|
||||
)
|
||||
if config.zero_sticks:
|
||||
zero_context_sticks(ctx, console)
|
||||
contexts[instance_id] = ctx
|
||||
|
||||
|
||||
|
|
@ -939,6 +1109,7 @@ def run_bridge_loop(
|
|||
pairing: PairingState,
|
||||
contexts: Dict[int, ControllerContext],
|
||||
uarts: List[PicoUART],
|
||||
hotkey: Optional[ZeroHotkeyMonitor] = None,
|
||||
) -> None:
|
||||
"""Main event loop for bridging controllers to UART and handling rumble."""
|
||||
event = sdl2.SDL_Event()
|
||||
|
|
@ -969,6 +1140,8 @@ def run_bridge_loop(
|
|||
else:
|
||||
pair_waiting_contexts(args, pairing, contexts, uarts, console)
|
||||
service_contexts(now, args, config, contexts, uarts, console)
|
||||
if hotkey and hotkey.poll_trigger():
|
||||
zero_all_context_sticks(contexts, console)
|
||||
sdl2.SDL_Delay(1)
|
||||
|
||||
|
||||
|
|
@ -990,17 +1163,24 @@ def main() -> None:
|
|||
initialize_sdl(parser)
|
||||
contexts: Dict[int, ControllerContext] = {}
|
||||
uarts: List[PicoUART] = []
|
||||
hotkey_monitor: Optional[ZeroHotkeyMonitor] = None
|
||||
try:
|
||||
if args.list_controllers:
|
||||
list_controllers_with_guids(console, parser)
|
||||
return
|
||||
controller_indices, controller_names = detect_controllers(console, args, parser)
|
||||
pairing = prepare_pairing_state(args, console, parser, controller_indices, controller_names)
|
||||
if config.zero_hotkey:
|
||||
candidate = ZeroHotkeyMonitor(config.zero_hotkey, console)
|
||||
if candidate.start():
|
||||
hotkey_monitor = candidate
|
||||
contexts, uarts = open_initial_contexts(args, pairing, controller_indices, console, config)
|
||||
if not contexts:
|
||||
console.print("[yellow]No controllers opened; waiting for hotplug events...[/yellow]")
|
||||
run_bridge_loop(args, console, config, pairing, contexts, uarts)
|
||||
run_bridge_loop(args, console, config, pairing, contexts, uarts, hotkey_monitor)
|
||||
finally:
|
||||
if hotkey_monitor:
|
||||
hotkey_monitor.stop()
|
||||
cleanup(contexts, uarts)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue