diff --git a/README.md b/README.md index 667f438..da1f56c 100644 --- a/README.md +++ b/README.md @@ -383,6 +383,133 @@ python main.py --token-cache /path/to/your/cache.json !token ``` +## Game Control Configuration (`data/config/game_controls.json`) + +The `data/config/game_controls.json` file allows you to define custom game control schemes for different games. This enables the bot to adapt its input commands (keyboard or controller) based on the game currently being played. + +### Structure + +The JSON file has the following main structure: + +```json +{ + "default_game": "YourDefaultGameName", + "games": { + "YourDefaultGameName": { + // Configuration for YourDefaultGameName + }, + "AnotherGameName": { + // Configuration for AnotherGameName + } + // ... more game configurations + } +} +``` + +- `"default_game"`: (String) Specifies the name of the game configuration that will be loaded by default when the bot starts. The command names from this game's configuration will be registered at startup. +- `"games"`: (Object) A dictionary where each key is a unique game name (e.g., "Apex Legends", "Minecraft", "MyCustomGame"). Each game name maps to an object containing its specific control configuration. + +### Game Configuration Structure + +Each game entry within the `"games"` object has the following structure: + +```json +"GameName": { + "description": "A brief description of this game or control scheme.", + "keyboard": { + // Keyboard control mappings for GameName + }, + "controller": { + // Controller control mappings for GameName + } +} +``` + +- `"description"`: (String) A human-readable description for this game configuration (e.g., "Apex Legends - Standard Controls", "Dark Souls - Fat Roll Setup"). This description is shown in `!gamehelp`. +- `"keyboard"`: (Object) Contains mappings for keyboard-based commands. +- `"controller"`: (Object) Contains mappings for virtual gamepad-based commands. + +### Control Mapping Structure (Keyboard & Controller) + +Inside both the `"keyboard"` and `"controller"` objects, you define individual control commands. The **keys** in these objects are the **actual command strings** users will type in Twitch chat (without the `!` prefix). For example, if you define `"w": {...}`, users will type `!w`. + +Each command mapping has the following structure: + +```json +"command_user_types": { + "action_type": "type_of_action_to_perform", + "params": { + // Parameters specific to the action_type + }, + "description": "What this command does in this specific game (e.g., 'Move Forward', 'Jump', 'Primary Attack')." +} +``` + +- `"command_user_types"`: (String) The command string the user types (e.g., `"w"`, `"space"`, `"mouse_left"`, `"a_button"`, `"ls_up"`). +- `"action_type"`: (String) Defines the kind of input to simulate. This corresponds to methods available in `KeyboardController` or `VirtualController`. + - **Common Keyboard `action_type` values:** + - `"press_key"`: Simulates pressing a keyboard key. + - `params`: `{ "key": "key_name" }` (e.g., `"w"`, `"space"`, `"shift"`, `"ctrl"`, `"1"`) + - `"press_mouse_button"`: Simulates a mouse button press. + - `params`: `{ "button": "button_name" }` (e.g., `"left"`, `"right"`, `"middle"`) + - **Common Controller `action_type` values:** + - `"press_button"`: Simulates pressing a gamepad button. + - `params`: `{ "button_name": "name_of_button" }` (e.g., `"a"`, `"b"`, `"x"`, `"y"`, `"left_shoulder"`, `"right_shoulder"`, `"left_thumb"`, `"right_thumb"`, `"start"`, `"back"`) + - `"press_trigger"`: Simulates pressing a gamepad trigger. + - `params`: `{ "trigger_name": "left_or_right" }` (e.g., `"left"`, `"right"`) + - `"move_left_stick"` / `"move_right_stick"`: Simulates moving an analog stick. + - `params`: `{ "direction": "up/down/left/right" }` (e.g., `"up"`) + - *Note: For more precise stick control, the `VirtualController` might support direct x/y values in the future, which would require different params.* + - `"press_dpad"`: Simulates pressing a D-Pad direction. + - `params`: `{ "direction": "up/down/left/right" }` +- `"params"`: (Object) A dictionary of parameters required by the specified `action_type`. The keys and values within `params` depend directly on what the underlying controller methods (`KeyboardController` or `VirtualController` methods) expect. + - An optional `"duration"` (float, in seconds) can often be included in `params` to specify how long a key/button should be held. If omitted, a short default (e.g., 0.1 seconds) is typically used. +- `"description"`: (String) This is crucial. It explains what the command does in the context of *this specific game*. This description is shown to users via the `!gamehelp` command. + +### Example Snippet + +```json +{ + "default_game": "ExampleFPS", + "games": { + "ExampleFPS": { + "description": "Standard FPS Controls", + "keyboard": { + "w": {"action_type": "press_key", "params": {"key": "w"}, "description": "Move Forward"}, + "s": {"action_type": "press_key", "params": {"key": "s"}, "description": "Move Backward"}, + "mouse_left": {"action_type": "press_mouse_button", "params": {"button": "left"}, "description": "Fire Weapon"} + }, + "controller": { + "ls_up": {"action_type": "move_left_stick", "params": {"direction": "up"}, "description": "Move Forward (L-Stick)"}, + "rt": {"action_type": "press_trigger", "params": {"trigger_name": "right"}, "description": "Fire Weapon (RT)"}, + "a": {"action_type": "press_button", "params": {"button_name": "a"}, "description": "Jump (A Button)"} + } + }, + "RacingSim": { + "description": "Basic Racing Controls", + "keyboard": { + "w": {"action_type": "press_key", "params": {"key": "w", "duration": 0.5}, "description": "Accelerate"}, + "s": {"action_type": "press_key", "params": {"key": "s", "duration": 0.3}, "description": "Brake/Reverse"} + }, + "controller": { + "rt": {"action_type": "press_trigger", "params": {"trigger_name": "right"}, "description": "Accelerate (RT)"}, + "lt": {"action_type": "press_trigger", "params": {"trigger_name": "left"}, "description": "Brake/Reverse (LT)"} + } + } + } +} +``` + +### Activating a Game Configuration + +- At startup, the commands defined in the `keyboard` and `controller` sections of the `default_game` are registered. Their descriptions (from the JSON) are used. +- An admin can change the active game configuration at any time using the `!setgame ` command (e.g., `!setgame RacingSim`). +- Once the game is switched: + - The `!gamehelp` command will display the controls for the newly activated game. + - When users type a command (e.g., `!w`), the bot will execute the action defined for `"w"` in the *currently active game's* configuration. If `"w"` is not defined for the active game, it will be treated as an unknown command for that game. + +This system allows for a highly flexible and game-adaptive control scheme managed entirely through the `game_controls.json` file. + ## License This project is open source and available under the MIT License. \ No newline at end of file diff --git a/main.py b/main.py index dd7c62e..71a3293 100644 --- a/main.py +++ b/main.py @@ -295,22 +295,28 @@ def main() -> None: print("Keyboard/mouse control will be simulated but not actually perform any actions.") print("Run 'pip install pynput' to enable actual keyboard/mouse control.") - game_controller: GameController = GameController( - bot, - mode=args.game_mode, - cooldown=args.cooldown, - input_type=args.input_type - ) - - if args.game_mode == "direct": - print(f"Commands will execute immediately with a {args.cooldown}s cooldown per user") + # Only initialize the GameController when NOT using queue mode + # When in queue mode, the controller is initialized in the queue consumer process + if not args.use_queue: + game_controller: GameController = GameController( + bot, + mode=args.game_mode, + cooldown=args.cooldown, + input_type=args.input_type + ) + + if args.game_mode == "direct": + print(f"Commands will execute immediately with a {args.cooldown}s cooldown per user") + else: + print("Commands will be collected through voting") + print("Streamer can start a vote with !startvote and end with !endvote") + print("Users vote with !vote [command]") + + print("Use !gamehelp to see available game commands") + print("Check user stats with !gamestats [username]") else: - print("Commands will be collected through voting") - print("Streamer can start a vote with !startvote and end with !endvote") - print("Users vote with !vote [command]") - - print("Use !gamehelp to see available game commands") - print("Check user stats with !gamestats [username]") + print("Game control will be handled by the queue consumer process") + print("Use !gamehelp to see available game commands") except ImportError as e: print(f"Failed to import game control features: {e}") @@ -319,7 +325,6 @@ def main() -> None: if args.input_type == 'controller': print(" pip install vgamepad # For controller support") - print("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxinput type: ", args.input_type) # Start the bot print(f"Starting bot for channel #{channel}") print("Basic commands: !hello, !dice [sides], !echo [message], !8ball") diff --git a/src/core/auth.py b/src/core/auth.py index f111a53..9456c43 100644 --- a/src/core/auth.py +++ b/src/core/auth.py @@ -7,13 +7,14 @@ import json import logging import requests import webbrowser +import traceback from typing import Dict, Any, Optional, Tuple # Import the auth server functionality from src.core.auth_server import start_oauth_flow # Configure logging -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger(__name__) class TwitchAuth: @@ -128,7 +129,7 @@ class TwitchAuth: return True except Exception as e: - logger.error(f"Failed to refresh token: {e}") + logger.error(f"Failed to refresh token: {e}\n{traceback.format_exc()}") # Clear refresh token on failure to force a new auth flow self.refresh_token = None return False @@ -172,7 +173,7 @@ class TwitchAuth: return True, "Authentication successful" except Exception as e: - logger.error(f"Failed to exchange authorization code: {e}") + logger.error(f"Failed to exchange authorization code: {e}\n{traceback.format_exc()}") return False, f"Failed to exchange authorization code: {str(e)}" def _setup_auth(self, manual_mode: bool = False) -> bool: @@ -258,7 +259,7 @@ class TwitchAuth: print("=====================================\n") return True except Exception as e: - logger.error(f"Failed to validate token: {e}") + logger.error(f"Failed to validate token: {e}\n{traceback.format_exc()}") print("\n===== Authentication Status =====") print("Warning: Could not validate token") print("============================\n") @@ -342,7 +343,7 @@ class TwitchAuth: response.raise_for_status() return response.json() except Exception as e: - logger.error(f"Token validation failed: {e}") + logger.error(f"Token validation failed: {e}\n{traceback.format_exc()}") raise ValueError(f"Token validation failed: {str(e)}") def update_manually(self, access_token: str, refresh_token: str = None, expires_in: int = 14400) -> None: diff --git a/src/core/twitch.py b/src/core/twitch.py index b1d32c0..978c247 100644 --- a/src/core/twitch.py +++ b/src/core/twitch.py @@ -6,6 +6,7 @@ import requests import time import json import os +import traceback from typing import Dict, Callable, List, Optional, Any, Tuple, Union, Match, Set # Import the queue functionality @@ -203,7 +204,7 @@ class TwitchBot: return False except Exception as e: - logger.error(f"Failed to connect: {e}") + logger.error(f"Failed to connect: {e}\n{traceback.format_exc()}") return False def register_command(self, command: str, callback: CommandCallback) -> None: @@ -233,6 +234,10 @@ class TwitchBot: """Process commands from chat messages""" if not message_data: return + + # Ignore messages from the bot itself + if message_data['username'].lower() == self.username.lower(): + return content: str = message_data['content'].strip() username: str = message_data['username'] @@ -262,9 +267,6 @@ class TwitchBot: if not self.oauth_token: self.oauth_token = self.get_oauth_token() - # Simply pass the command name and let the queue worker handle all processing - logger.info(f"Enqueueing command '{command}' for processing by queue worker") - enqueue_command( username, command, @@ -280,7 +282,7 @@ class TwitchBot: try: self.commands[command](username, args, self) except Exception as e: - logger.error(f"Error executing command {command}: {e}") + logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}") def start(self, use_queue: bool = False) -> None: """Start the bot and listen for messages""" @@ -291,10 +293,13 @@ class TwitchBot: # Try to connect with up to 3 attempts, possibly with token refreshes max_attempts = 3 for attempt in range(1, max_attempts + 1): - logger.info(f"Connection attempt {attempt}/{max_attempts}") - if self.connect(): + res = self.connect() + + if res: break - elif attempt < max_attempts: + + logger.info(f"Connection attempt {attempt}/{max_attempts}") + if attempt < max_attempts: # Wait a moment before retrying time.sleep(2) logger.info("Retrying connection...") @@ -370,3 +375,30 @@ class TwitchBot: def get_queue_stats(self) -> Dict[str, Any]: """Get current queue statistics""" return get_queue_stats() + +# Add a standalone version of is_admin for use by other modules +def is_admin(username: str, channel: str = None, admin_users: List[str] = None) -> bool: + """ + Check if a user has admin privileges + + Args: + username: The username to check + channel: The channel name (channel owner is always an admin) + admin_users: Optional list of additional admin users + + Returns: + bool: True if the user is an admin, False otherwise + """ + # Normalize usernames to lowercase + username = username.lower() + + # Create admin list - channel owner is always an admin + admins = [] + if channel: + admins.append(channel.lower()) + + # Add additional admins if provided + if admin_users: + admins.extend([user.lower() for user in admin_users]) + + return username in admins diff --git a/src/game/input/gamepad.py b/src/game/input/gamepad.py index 20375dc..26668e3 100644 --- a/src/game/input/gamepad.py +++ b/src/game/input/gamepad.py @@ -1,8 +1,13 @@ import time +import logging +import traceback import threading from typing import Optional, Union, Any, Literal, Dict, Tuple, List, Set import ctypes +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger(__name__) + # Try to import vgamepad for Xbox controller emulation try: import vgamepad as vg @@ -69,7 +74,7 @@ class VirtualController: self.gamepad = vg.VX360Gamepad() print("Virtual Xbox 360 controller initialized successfully") except Exception as e: - print(f"Failed to initialize virtual gamepad: {e}") + logger.error(f"Failed to initialize virtual gamepad: {e}\n{traceback.format_exc()}") self.available = False # Initialize SDL2 for physical controller if available diff --git a/src/queue/server.py b/src/queue/server.py index 098fea4..4636bed 100644 --- a/src/queue/server.py +++ b/src/queue/server.py @@ -5,7 +5,9 @@ import socket import time import random import cloudpickle +import traceback from typing import Dict, Any, Optional, List, Callable, Union, Tuple +import json # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') @@ -14,42 +16,146 @@ logger = logging.getLogger('twitch-queue') # Define constants for input modes INPUT_MODE_KEYBOARD = "keyboard" INPUT_MODE_CONTROLLER = "controller" -INPUT_MODE_FILE = "input_mode.txt" +SHELVE_FILE = "twitch_settings" # shelve will add extension automatically +SETTINGS_KEY_INPUT_MODE = "input_mode" +SETTINGS_KEY_KEYBOARD_STATE = "keyboard_state" +SETTINGS_KEY_CONTROLLER_STATE = "controller_state" +SETTINGS_KEY_CONTROLLER_INITIALIZED = "controller_initialized" +SETTINGS_KEY_KEYBOARD_INITIALIZED = "keyboard_initialized" + +# Shelve key for active game +SETTINGS_KEY_ACTIVE_GAME = "active_game_name" +GAME_CONFIG_FILE_PATH = "data/config/game_controls.json" + +# Queue stats shelve keys +STATS_KEY_TOTAL_ENQUEUED = "total_enqueued" +STATS_KEY_TOTAL_PROCESSED = "total_processed" +STATS_KEY_LAST_TASK_TIME = "last_task_time" +STATS_KEY_TASKS_BY_TYPE = "tasks_by_type" +STATS_KEY_TASKS_BY_USER = "tasks_by_user" + +# Shelve key for command log +SETTINGS_KEY_COMMAND_LOG = "command_log" +# Max number of commands to keep in history +MAX_COMMAND_LOG_SIZE = 100 + +# ---- GAME CONTROL CONFIGURATION HELPERS ---- + +_game_configs_cache = None + +def load_game_configs() -> Dict[str, Any]: + """Load game configurations from the JSON file.""" + global _game_configs_cache + if _game_configs_cache is not None: + return _game_configs_cache -# Function to get/set the current input mode using persistent storage -def get_input_mode(): - """Read the current input mode from persistent storage""" try: - if os.path.exists(INPUT_MODE_FILE): - with open(INPUT_MODE_FILE, 'r') as f: - mode = f.read().strip() - if mode in [INPUT_MODE_KEYBOARD, INPUT_MODE_CONTROLLER]: - return mode + with open(GAME_CONFIG_FILE_PATH, 'r') as f: + configs = json.load(f) + _game_configs_cache = configs + return configs + except FileNotFoundError: + logger.error(f"Game configuration file not found: {GAME_CONFIG_FILE_PATH}") + return {"games": {}, "default_game": None} + except json.JSONDecodeError as e: + logger.error(f"Error decoding game configuration file {GAME_CONFIG_FILE_PATH}: {e}") + return {"games": {}, "default_game": None} except Exception as e: - logger.error(f"Error reading input mode: {e}") + logger.error(f"Unexpected error loading game configs: {e}\n{traceback.format_exc()}") + return {"games": {}, "default_game": None} + +def get_active_game_name() -> Optional[str]: + """Get the currently active game name from shelve, with fallback to default.""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + active_game = db.get(SETTINGS_KEY_ACTIVE_GAME) + if active_game: + return active_game + except Exception as e: + logger.error(f"Error reading active game from shelve: {e}\n{traceback.format_exc()}") + + # Fallback to default from config file + configs = load_game_configs() + return configs.get("default_game") + +def set_active_game_name(game_name: str) -> bool: + """Set the active game name in shelve.""" + import shelve + configs = load_game_configs() + if game_name not in configs.get("games", {}): + logger.warning(f"Attempted to set active game to '{game_name}', but it's not defined in {GAME_CONFIG_FILE_PATH}.") + # Allow setting it anyway, maybe the config will be updated later + # return False + try: + with shelve.open(SHELVE_FILE) as db: + db[SETTINGS_KEY_ACTIVE_GAME] = game_name + logger.info(f"Active game set to: {game_name}") + return True + except Exception as e: + logger.error(f"Error writing active game to shelve: {e}\n{traceback.format_exc()}") + return False + +def get_current_game_control_config(command_name: str, controller_type: str) -> Optional[Dict[str, Any]]: + """Get the control mapping for a specific command in the active game.""" + active_game = get_active_game_name() + if not active_game: + logger.warning("No active game set and no default game found.") + return None + + all_configs = load_game_configs() + game_specific_configs = all_configs.get("games", {}).get(active_game) + + if not game_specific_configs: + logger.warning(f"No configuration found for active game: {active_game}") + return None + + # controller_type will be INPUT_MODE_KEYBOARD or INPUT_MODE_CONTROLLER + # The keys in game_controls.json are "keyboard" and "controller" + config_key = "keyboard" if controller_type == INPUT_MODE_KEYBOARD else "controller" + + control_map = game_specific_configs.get(config_key, {}).get(command_name) + + if not control_map: + # logger.debug(f"No control mapping found for command '{command_name}' in game '{active_game}' for type '{config_key}'") + return None + + return control_map + +# Function to get/set the current input mode using shelve for persistence +def get_input_mode(): + """Read the current input mode from persistent storage using shelve""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + mode = db.get(SETTINGS_KEY_INPUT_MODE, INPUT_MODE_KEYBOARD) + if mode in [INPUT_MODE_KEYBOARD, INPUT_MODE_CONTROLLER]: + return mode + except Exception as e: + logger.error(f"Error reading input mode from shelve: {e}\n{traceback.format_exc()}") # Default to keyboard mode if not set or error return INPUT_MODE_KEYBOARD def set_input_mode(mode): - """Save the current input mode to persistent storage""" + """Save the current input mode to persistent storage using shelve""" + import shelve if mode not in [INPUT_MODE_KEYBOARD, INPUT_MODE_CONTROLLER, None]: logger.error(f"Invalid input mode: {mode}") return False try: - if mode is None: - # Remove the file if mode is None (reset) - if os.path.exists(INPUT_MODE_FILE): - os.remove(INPUT_MODE_FILE) - return True - - # Write mode to file - with open(INPUT_MODE_FILE, 'w') as f: - f.write(mode) + with shelve.open(SHELVE_FILE) as db: + if mode is None: + # Remove the key if mode is None (reset) + if SETTINGS_KEY_INPUT_MODE in db: + del db[SETTINGS_KEY_INPUT_MODE] + else: + # Store the mode + db[SETTINGS_KEY_INPUT_MODE] = mode return True except Exception as e: - logger.error(f"Error writing input mode: {e}") + logger.error(f"Error writing input mode to shelve: {e}\n{traceback.format_exc()}") return False # CloudPickle serializer for Huey - wraps cloudpickle to provide the interface Huey expects @@ -76,38 +182,98 @@ huey = SqliteHuey( # Queue statistics class QueueStats: def __init__(self) -> None: - self.total_enqueued: int = 0 - self.total_processed: int = 0 - self.last_task_time: float = 0 - self.tasks_by_type: Dict[str, int] = {} - self.tasks_by_user: Dict[str, int] = {} + # Initialize shelve db with default values if not exists + self._ensure_defaults() + + def _ensure_defaults(self) -> None: + """Ensure default values exist in the shelve database""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + # Initialize defaults if not set + if STATS_KEY_TOTAL_ENQUEUED not in db: + db[STATS_KEY_TOTAL_ENQUEUED] = 0 + if STATS_KEY_TOTAL_PROCESSED not in db: + db[STATS_KEY_TOTAL_PROCESSED] = 0 + if STATS_KEY_LAST_TASK_TIME not in db: + db[STATS_KEY_LAST_TASK_TIME] = 0 + if STATS_KEY_TASKS_BY_TYPE not in db: + db[STATS_KEY_TASKS_BY_TYPE] = {} + if STATS_KEY_TASKS_BY_USER not in db: + db[STATS_KEY_TASKS_BY_USER] = {} + except Exception as e: + logger.error(f"Error initializing queue stats in shelve: {e}\n{traceback.format_exc()}") def log_enqueue(self, task_type: str, username: str) -> None: """Record stats when a task is enqueued""" - self.total_enqueued += 1 - self.last_task_time = time.time() - - if task_type not in self.tasks_by_type: - self.tasks_by_type[task_type] = 0 - self.tasks_by_type[task_type] += 1 - - if username not in self.tasks_by_user: - self.tasks_by_user[username] = 0 - self.tasks_by_user[username] += 1 + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + # Update total_enqueued + total_enqueued = db.get(STATS_KEY_TOTAL_ENQUEUED, 0) + 1 + db[STATS_KEY_TOTAL_ENQUEUED] = total_enqueued + + # Update last_task_time + db[STATS_KEY_LAST_TASK_TIME] = time.time() + + # Update tasks_by_type + tasks_by_type = db.get(STATS_KEY_TASKS_BY_TYPE, {}) + if task_type not in tasks_by_type: + tasks_by_type[task_type] = 0 + tasks_by_type[task_type] += 1 + db[STATS_KEY_TASKS_BY_TYPE] = tasks_by_type + + # Update tasks_by_user + tasks_by_user = db.get(STATS_KEY_TASKS_BY_USER, {}) + if username not in tasks_by_user: + tasks_by_user[username] = 0 + tasks_by_user[username] += 1 + db[STATS_KEY_TASKS_BY_USER] = tasks_by_user + except Exception as e: + logger.error(f"Error updating queue stats for enqueue in shelve: {e}\n{traceback.format_exc()}") def log_processed(self, task_type: str) -> None: """Record stats when a task is processed""" - self.total_processed += 1 + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + # Update total_processed + total_processed = db.get(STATS_KEY_TOTAL_PROCESSED, 0) + 1 + db[STATS_KEY_TOTAL_PROCESSED] = total_processed + + # Ensure total_processed never exceeds total_enqueued + total_enqueued = db.get(STATS_KEY_TOTAL_ENQUEUED, 0) + if total_processed > total_enqueued: + db[STATS_KEY_TOTAL_ENQUEUED] = total_processed + except Exception as e: + logger.error(f"Error updating queue stats for processing in shelve: {e}\n{traceback.format_exc()}") def get_stats(self) -> Dict[str, Any]: """Get current queue statistics""" - return { - 'total_enqueued': self.total_enqueued, - 'total_processed': self.total_processed, - 'pending': self.total_enqueued - self.total_processed, - 'tasks_by_type': self.tasks_by_type, - 'tasks_by_user': self.tasks_by_user - } + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + total_enqueued = db.get(STATS_KEY_TOTAL_ENQUEUED, 0) + total_processed = db.get(STATS_KEY_TOTAL_PROCESSED, 0) + pending = max(0, total_enqueued - total_processed) + + return { + 'total_enqueued': total_enqueued, + 'total_processed': total_processed, + 'pending': pending, + 'tasks_by_type': db.get(STATS_KEY_TASKS_BY_TYPE, {}), + 'tasks_by_user': db.get(STATS_KEY_TASKS_BY_USER, {}) + } + except Exception as e: + logger.error(f"Error retrieving queue stats from shelve: {e}\n{traceback.format_exc()}") + # Return empty stats on error + return { + 'total_enqueued': 0, + 'total_processed': 0, + 'pending': 0, + 'tasks_by_type': {}, + 'tasks_by_user': {} + } # Create a global stats object queue_stats = QueueStats() @@ -116,11 +282,37 @@ queue_stats = QueueStats() IRC_SERVER = 'irc.chat.twitch.tv' IRC_PORT = 6667 -# Command handler type +# CommandHandler type CommandHandler = Callable[[str, List[str], str, str, str], bool] -# Command registry - maps command names to handler functions -command_registry: Dict[str, CommandHandler] = {} +# Command registry structure - stores command info including description and category +class CommandInfo: + def __init__(self, handler: CommandHandler, description: str = "", category: str = "general"): + self.handler = handler + self.description = description + self.category = category + +# Command registry - maps command names to their info objects +command_registry: Dict[str, CommandInfo] = {} + +# Helper functions for command registration +def register_command(name: str, handler: CommandHandler, description: str = "", category: str = "general") -> None: + """Register a command with its handler, description and category""" + command_registry[name] = CommandInfo(handler, description, category) + logger.info(f"Registered command: !{name} ({category})") + +def get_commands_by_category() -> Dict[str, List[Tuple[str, str]]]: + """Get all registered commands organized by category""" + categories: Dict[str, List[Tuple[str, str]]] = {} + + for cmd_name, cmd_info in command_registry.items(): + if cmd_info.category not in categories: + categories[cmd_info.category] = [] + + # Add tuple of (command name, description) + categories[cmd_info.category].append((cmd_name, cmd_info.description)) + + return categories # Try to import the game controller module for controller support try: @@ -130,6 +322,7 @@ try: from src.game.input.gamepad import VirtualController, VGAMEPAD_AVAILABLE # Create controller instances - these will be initialized when needed + # These are process-local instances, but their state will be tracked in shelve keyboard_controller = None gamepad_controller = None @@ -148,148 +341,203 @@ def initialize_controller(controller_type=INPUT_MODE_KEYBOARD): if not CONTROLLER_SUPPORT: logger.warning("Controller support not available") return False + + # Check if already initialized in this process + if controller_type == INPUT_MODE_KEYBOARD and keyboard_controller is not None: + return True + elif controller_type == INPUT_MODE_CONTROLLER and gamepad_controller is not None: + return True try: if controller_type == INPUT_MODE_KEYBOARD: - if keyboard_controller is None: - logger.info("Initializing keyboard controller in queue worker") + # Check if another process has already initialized it + if is_controller_initialized(INPUT_MODE_KEYBOARD): + logger.info("Keyboard controller already initialized in another process") + # Still need to initialize our local instance keyboard_controller = KeyboardController() + return keyboard_controller is not None + + logger.info("Initializing keyboard controller in queue worker") + keyboard_controller = KeyboardController() + if keyboard_controller is not None: + # Update shelve to indicate keyboard controller is initialized + set_controller_initialized(True, INPUT_MODE_KEYBOARD) + logger.info("Keyboard controller initialized and status stored in shelve") return keyboard_controller is not None + elif controller_type == INPUT_MODE_CONTROLLER: if not VGAMEPAD_AVAILABLE: logger.warning("vgamepad not available, can't initialize controller") return False - if gamepad_controller is None: - logger.info("Initializing gamepad controller in queue worker") + # Check if another process has already initialized it + if is_controller_initialized(INPUT_MODE_CONTROLLER): + logger.info("Gamepad controller already initialized in another process") + # Still need to initialize our local instance gamepad_controller = VirtualController() # Give it a moment to initialize time.sleep(0.2) + return gamepad_controller is not None and gamepad_controller.is_available() - # Check if controller was properly initialized - if not gamepad_controller.is_available(): - logger.error("VirtualController was created but is not available") - return False - - logger.info("Virtual gamepad controller successfully initialized") + logger.info("Initializing gamepad controller in queue worker") + gamepad_controller = VirtualController() + # Give it a moment to initialize + time.sleep(0.2) + + # Check if controller was properly initialized + if not gamepad_controller.is_available(): + logger.error("VirtualController was created but is not available") + return False + # Update shelve to indicate gamepad controller is initialized + set_controller_initialized(True, INPUT_MODE_CONTROLLER) + logger.info("Virtual gamepad controller successfully initialized and status stored in shelve") + return gamepad_controller is not None and gamepad_controller.is_available() else: logger.warning(f"Unknown controller type: {controller_type}") return False except Exception as e: - logger.error(f"Error initializing controller: {e}") + logger.error(f"Error initializing controller: {e}\n{traceback.format_exc()}") return False # Maintain the current input mode - either "keyboard" or "controller" # This global variable is no longer used - see get_input_mode() and set_input_mode() functions # for the persistent file-based approach that works across processes -def execute_game_command(command, controller_type=INPUT_MODE_KEYBOARD): - """Execute a game controller command""" - logger.info(f"execute_game_command called for '{command}' with controller_type={controller_type}") - +def execute_game_command(command: str, controller_type_mode: str) -> bool: + """Execute a game controller command based on active game configuration.""" + logger.info(f"execute_game_command called for '{command}' with controller_type_mode={controller_type_mode}") + if not CONTROLLER_SUPPORT: logger.warning("Controller support not available") return False - + + control_config = get_current_game_control_config(command, controller_type_mode) + + if not control_config: + active_game = get_active_game_name() or "Unknown" + logger.warning(f"No control mapping found for command '{command}' in active game '{active_game}' for type '{controller_type_mode}'") + return False + + action_type = control_config.get("action_type") + params = control_config.get("params", {}) + try: - if controller_type == INPUT_MODE_KEYBOARD: - # Make sure keyboard controller is initialized + if controller_type_mode == INPUT_MODE_KEYBOARD: if keyboard_controller is None and not initialize_controller(INPUT_MODE_KEYBOARD): - logger.error("Failed to initialize keyboard controller") + logger.error("Failed to initialize keyboard controller for dynamic command") return False - - if keyboard_controller is None: - logger.error("Keyboard controller initialization failed") + if keyboard_controller is None: # Should be caught by above, but double check + logger.error("Keyboard controller is None after initialization attempt.") return False - - # Map commands to keyboard controller methods - command_mapping = { - "up": lambda: keyboard_controller.press_key('w'), - "down": lambda: keyboard_controller.press_key('s'), - "left": lambda: keyboard_controller.press_key('a'), - "right": lambda: keyboard_controller.press_key('d'), - "jump": lambda: keyboard_controller.press_key('space'), - "attack": lambda: keyboard_controller.press_mouse_button('left'), - "interact": lambda: keyboard_controller.press_key('e'), - "inventory": lambda: keyboard_controller.press_key('i'), - "skill1": lambda: keyboard_controller.press_key('1'), - "skill2": lambda: keyboard_controller.press_key('2'), - "skill3": lambda: keyboard_controller.press_key('3'), - "ultimate": lambda: keyboard_controller.press_key('r'), - } - - if command in command_mapping: - logger.info(f"Executing keyboard command: {command}") - try: - command_mapping[command]() - # Release key after a short delay - time.sleep(0.1) - keyboard_controller.release_all() - return True - except Exception as e: - logger.error(f"Error executing keyboard command {command}: {e}") - return False + + logger.info(f"Executing keyboard command: {command} (Action: {action_type}, Params: {params})") + if action_type == "press_key": + keyboard_controller.press_key(params["key"]) + elif action_type == "press_mouse_button": + keyboard_controller.press_mouse_button(params["button"]) else: - logger.warning(f"Unknown keyboard command: {command}") + logger.warning(f"Unknown keyboard action_type: {action_type} for command {command}") return False - - elif controller_type == INPUT_MODE_CONTROLLER: - # Make sure gamepad controller is initialized + + time.sleep(params.get("duration", 0.1)) # Allow configurable duration, default 0.1 + keyboard_controller.release_all() + + state = get_controller_state(INPUT_MODE_KEYBOARD) + state['last_command'] = command + state['last_command_time'] = time.time() + set_controller_state(state, INPUT_MODE_KEYBOARD) + return True + + elif controller_type_mode == INPUT_MODE_CONTROLLER: if gamepad_controller is None and not initialize_controller(INPUT_MODE_CONTROLLER): - logger.error("Failed to initialize gamepad controller") + logger.error("Failed to initialize gamepad controller for dynamic command") return False - - if gamepad_controller is None: - logger.error("Gamepad controller initialization failed") + if gamepad_controller is None: # Should be caught by above, but double check + logger.error("Gamepad controller is None after initialization attempt.") return False + + logger.info(f"Executing controller command: {command} (Action: {action_type}, Params: {params})") - # Map commands to gamepad controller methods - command_mapping = { - "up": lambda: gamepad_controller.move_left_stick_up(), - "down": lambda: gamepad_controller.move_left_stick_down(), - "left": lambda: gamepad_controller.move_left_stick_left(), - "right": lambda: gamepad_controller.move_left_stick_right(), - "look_up": lambda: gamepad_controller.move_right_stick_up(), - "look_down": lambda: gamepad_controller.move_right_stick_down(), - "look_left": lambda: gamepad_controller.move_right_stick_left(), - "look_right": lambda: gamepad_controller.move_right_stick_right(), - "jump": lambda: gamepad_controller.press_a(), - "action": lambda: gamepad_controller.press_b(), - "interact": lambda: gamepad_controller.press_x(), - "menu": lambda: gamepad_controller.press_y(), - "block": lambda: gamepad_controller.press_left_shoulder(), - "attack": lambda: gamepad_controller.press_right_shoulder(), - "aim": lambda: gamepad_controller.press_left_trigger(), - "shoot": lambda: gamepad_controller.press_right_trigger(), - "dup": lambda: gamepad_controller.press_dpad_up(), - "dright": lambda: gamepad_controller.press_dpad_right(), - "ddown": lambda: gamepad_controller.press_dpad_down(), - "dleft": lambda: gamepad_controller.press_dpad_left(), - "start": lambda: gamepad_controller.press_start(), - "select": lambda: gamepad_controller.press_back(), - } - - if command in command_mapping: - logger.info(f"Executing controller command: {command}") - try: - command_mapping[command]() - # Release buttons after a short delay - time.sleep(0.1) - gamepad_controller.reset() - return True - except Exception as e: - logger.error(f"Error executing controller command {command}: {e}") + # Gamepad action dispatch + if action_type == "press_button": + button_name = params.get("button_name") + if button_name == "a": gamepad_controller.press_a() + elif button_name == "b": gamepad_controller.press_b() + elif button_name == "x": gamepad_controller.press_x() + elif button_name == "y": gamepad_controller.press_y() + elif button_name == "left_shoulder": gamepad_controller.press_left_shoulder() + elif button_name == "right_shoulder": gamepad_controller.press_right_shoulder() + # Add other buttons from VirtualController like start, back if they are simple presses + elif button_name == "start": gamepad_controller.press_start() + elif button_name == "back": gamepad_controller.press_back() + else: + logger.warning(f"Unknown button_name: {button_name} for command {command}") return False + elif action_type == "press_trigger": + trigger_name = params.get("trigger_name") + # Assuming press_left_trigger and press_right_trigger take a value if needed, + # or are simple presses. The current VirtualController methods are simple presses. + if trigger_name == "left": gamepad_controller.press_left_trigger() + elif trigger_name == "right": gamepad_controller.press_right_trigger() + else: + logger.warning(f"Unknown trigger_name: {trigger_name} for command {command}") + return False + elif action_type == "move_left_stick": + direction = params.get("direction") + if direction == "up": gamepad_controller.move_left_stick_up() + elif direction == "down": gamepad_controller.move_left_stick_down() + elif direction == "left": gamepad_controller.move_left_stick_left() + elif direction == "right": gamepad_controller.move_left_stick_right() + # Potentially add specific x, y values if your VirtualController supports it + # e.g., elif params.get("x_value") is not None and params.get("y_value") is not None: + # gamepad_controller.set_left_stick_position(params["x_value"], params["y_value"]) + else: + logger.warning(f"Unknown left_stick direction: {direction} for command {command}") + return False + elif action_type == "move_right_stick": + direction = params.get("direction") + if direction == "up": gamepad_controller.move_right_stick_up() + elif direction == "down": gamepad_controller.move_right_stick_down() + elif direction == "left": gamepad_controller.move_right_stick_left() + elif direction == "right": gamepad_controller.move_right_stick_right() + # Similar for x, y values if supported + else: + logger.warning(f"Unknown right_stick direction: {direction} for command {command}") + return False + elif action_type == "press_dpad": + direction = params.get("direction") + if direction == "up": gamepad_controller.press_dpad_up() + elif direction == "down": gamepad_controller.press_dpad_down() + elif direction == "left": gamepad_controller.press_dpad_left() + elif direction == "right": gamepad_controller.press_dpad_right() + else: + logger.warning(f"Unknown dpad direction: {direction} for command {command}") + return False + # Add other action_types like "set_trigger_value" if your VirtualController evolves else: - logger.warning(f"Unknown controller command: {command}") + logger.warning(f"Unknown controller action_type: {action_type} for command {command}") return False + + time.sleep(params.get("duration", 0.1)) # Allow configurable duration + gamepad_controller.reset() # Resets all controls + + state = get_controller_state(INPUT_MODE_CONTROLLER) + state['last_command'] = command + state['last_command_time'] = time.time() + set_controller_state(state, INPUT_MODE_CONTROLLER) + return True + else: - logger.warning(f"Unknown controller type: {controller_type}") + logger.warning(f"Unknown controller_type_mode: {controller_type_mode} in execute_game_command") return False + + except KeyError as e: + logger.error(f"Missing parameter in control_config for command '{command}', action '{action_type}': {e}\\n{traceback.format_exc()}") + return False except Exception as e: - logger.error(f"Error executing game command: {e}") + logger.error(f"Error executing game command '{command}': {e}\\n{traceback.format_exc()}") return False # Function to reset input mode (e.g., when switching games) @@ -297,7 +545,16 @@ def reset_input_mode(): """Reset the current input mode to allow switching between keyboard and controller""" # Use persistent storage for input mode set_input_mode(None) - logger.info("Input mode has been reset") + + # Reset controller initialization status in shared storage + set_controller_initialized(False, INPUT_MODE_KEYBOARD) + set_controller_initialized(False, INPUT_MODE_CONTROLLER) + + # Reset controller states + set_controller_state({}, INPUT_MODE_KEYBOARD) + set_controller_state({}, INPUT_MODE_CONTROLLER) + + logger.info("Input mode and controller states have been reset") # ---- COMMUNICATION UTILITIES ---- @@ -327,7 +584,7 @@ def send_twitch_message(channel: str, oauth_token: str, username: str, message: logger.info(f"Sent message to #{channel}: {message}") return True except Exception as e: - logger.error(f"Error sending message to Twitch: {e}") + logger.error(f"Error sending message to Twitch: {e}\n{traceback.format_exc()}") return False # ---- COMMAND HANDLERS ---- @@ -346,6 +603,9 @@ def handle_hello_task(username: str, args: List[str], channel: str, oauth_token: 'timestamp': time.time() } +# Register hello command +register_command("hello", handle_hello_task, "Say hello to the bot", "general") + @huey.task() def handle_dice_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !dice command""" @@ -362,6 +622,9 @@ def handle_dice_task(username: str, args: List[str], channel: str, oauth_token: 'timestamp': time.time() } +# Register dice command +register_command("dice", handle_dice_task, "Roll a dice with optional sides (e.g. !dice 20)", "general") + @huey.task() def handle_echo_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !echo command""" @@ -378,6 +641,9 @@ def handle_echo_task(username: str, args: List[str], channel: str, oauth_token: 'timestamp': time.time() } +# Register echo command +register_command("echo", handle_echo_task, "Echo back the message you send", "general") + @huey.task() def handle_8ball_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !8ball command""" @@ -413,6 +679,9 @@ def handle_8ball_task(username: str, args: List[str], channel: str, oauth_token: 'timestamp': time.time() } +# Register 8ball command +register_command("8ball", handle_8ball_task, "Ask the magic 8ball a question", "general") + @huey.task() def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !qstats command""" @@ -435,6 +704,48 @@ def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token 'timestamp': time.time() } +# Register qstats command +register_command("qstats", handle_qstats_task, "Show queue statistics", "queue") + +def log_command(username: str, command: str, command_type: str, success: bool = True, args: List[str] = None) -> None: + """ + Log a command to the persistent command history + + Args: + username: The user who executed the command + command: The command that was executed + command_type: The type of command (keyboard, controller, etc.) + success: Whether the command executed successfully + args: The arguments passed to the command + """ + logger.info(f"User {username} {'' if success else 'failed to'} execute {command_type} command: {command}{' ' + ' '.join(args) if args else ''}") + + # Store this command in the shared state + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + # Get or initialize the command log + command_log = db.get(SETTINGS_KEY_COMMAND_LOG, []) + + # Add this command to the log + command_log.append({ + 'timestamp': time.time(), + 'username': username, + 'command': command, + 'type': command_type, + 'success': success, + 'args': args if args else [] + }) + + # Keep only the latest commands + if len(command_log) > MAX_COMMAND_LOG_SIZE: + command_log = command_log[-MAX_COMMAND_LOG_SIZE:] + + # Save back to shelve + db[SETTINGS_KEY_COMMAND_LOG] = command_log + except Exception as e: + logger.error(f"Error logging command to shelve: {e}\n{traceback.format_exc()}") + @huey.task() def handle_keyboard_command_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str, command: str) -> Dict[str, Any]: """Handler for keyboard mode game commands""" @@ -444,13 +755,16 @@ def handle_keyboard_command_task(username: str, args: List[str], channel: str, o if current_mode != INPUT_MODE_KEYBOARD: message = f"@{username}, unable to use keyboard commands while controller mode is active" send_twitch_message(channel, oauth_token, bot_username, message) + # Only log once here with failure + log_command(username, command, "keyboard", False, args) return { 'success': False, 'username': username, 'command': command, 'command_type': 'keyboard', 'error': 'input_mode_mismatch', - 'timestamp': time.time() + 'timestamp': time.time(), + 'already_logged': True # Add flag to prevent double-logging } # Keep the controller operations completely isolated @@ -459,15 +773,23 @@ def handle_keyboard_command_task(username: str, args: List[str], channel: str, o # Execute the command execute_success = execute_game_command(command, INPUT_MODE_KEYBOARD) if execute_success: - send_twitch_message(channel, oauth_token, bot_username, f"@{username} used {command}") + # Don't send feedback messages for successful key presses to reduce chat spam + # But do log it for tracking purposes (this is the single log entry we want) + log_command(username, command, "keyboard", True, args) else: send_twitch_message(channel, oauth_token, bot_username, f"@{username}, failed to execute {command}") + # Log the failure (this is the single log entry we want) + log_command(username, command, "keyboard", False, args) else: send_twitch_message(channel, oauth_token, bot_username, f"@{username}, keyboard controller not available") + # Log the initialization failure (this is the single log entry we want) + log_command(username, command, "keyboard", False, args) except Exception as e: - logger.error(f"Error in keyboard command task: {e}") + logger.error(f"Error in keyboard command task: {e}\n{traceback.format_exc()}") execute_success = False send_twitch_message(channel, oauth_token, bot_username, f"@{username}, error executing {command}: {str(e)}") + # Log the exception (this is the single log entry we want) + log_command(username, command, "keyboard", False, args) # Only return simple, picklable data with no references to any controller objects return { @@ -475,7 +797,8 @@ def handle_keyboard_command_task(username: str, args: List[str], channel: str, o 'username': username, 'command': command, 'command_type': 'keyboard', - 'timestamp': time.time() + 'timestamp': time.time(), + 'already_logged': True # Add flag to prevent double-logging } @huey.task() @@ -489,13 +812,16 @@ def handle_controller_command_task(username: str, args: List[str], channel: str, if current_mode != INPUT_MODE_CONTROLLER: message = f"@{username}, unable to use controller commands while keyboard mode is active" send_twitch_message(channel, oauth_token, bot_username, message) + # Only log once here with failure + log_command(username, command, "controller", False, args) return { 'success': False, 'username': username, 'command': command, 'command_type': 'controller', 'error': 'input_mode_mismatch', - 'timestamp': time.time() + 'timestamp': time.time(), + 'already_logged': True # Add flag to prevent double-logging } # Keep the controller operations completely isolated @@ -504,15 +830,23 @@ def handle_controller_command_task(username: str, args: List[str], channel: str, if initialize_controller(INPUT_MODE_CONTROLLER): execute_success = execute_game_command(command, INPUT_MODE_CONTROLLER) if execute_success: - send_twitch_message(channel, oauth_token, bot_username, f"@{username} used {command}") + # Don't send feedback messages for successful button presses to reduce chat spam + # But do log it for tracking purposes (this is the single log entry we want) + log_command(username, command, "controller", True, args) else: send_twitch_message(channel, oauth_token, bot_username, f"@{username}, failed to execute {command}") + # Log the failure (this is the single log entry we want) + log_command(username, command, "controller", False, args) else: send_twitch_message(channel, oauth_token, bot_username, f"@{username}, controller support not available") + # Log the initialization failure (this is the single log entry we want) + log_command(username, command, "controller", False, args) except Exception as e: - logger.error(f"Error in controller command task: {e}") + logger.error(f"Error in controller command task: {e}\n{traceback.format_exc()}") execute_success = False send_twitch_message(channel, oauth_token, bot_username, f"@{username}, error executing {command}: {str(e)}") + # Log the exception (this is the single log entry we want) + log_command(username, command, "controller", False, args) # Only return simple, picklable data with no references to any controller objects return { @@ -520,7 +854,8 @@ def handle_controller_command_task(username: str, args: List[str], channel: str, 'username': username, 'command': command, 'command_type': 'controller', - 'timestamp': time.time() + 'timestamp': time.time(), + 'already_logged': True # Add flag to prevent double-logging } @huey.task() @@ -528,60 +863,27 @@ def handle_unknown_command_task(username: str, command: str, args: List[str], ch """Handle commands that aren't explicitly registered""" logger.info(f"Attempting to handle unknown command: '{command}'") - # Define valid command lists - keyboard_commands = [ - "up", "down", "left", "right", "jump", "attack", "interact", - "inventory", "skill1", "skill2", "skill3", "ultimate" - ] - - controller_commands = [ - "look_up", "look_down", "look_left", "look_right", - "action", "menu", "block", "aim", "shoot", - "dup", "dright", "ddown", "dleft", "start", "select" - ] - - # Check if it's a keyboard command - if command in keyboard_commands and CONTROLLER_SUPPORT: - return handle_keyboard_command_task(username, args, channel, oauth_token, bot_username, command) - - # Check if it's a controller command - if command in controller_commands and CONTROLLER_SUPPORT: - return handle_controller_command_task(username, args, channel, oauth_token, bot_username, command) - - # Handle a few standard commands that might not be registered + # Help command is now registered properly through register_command + # This is kept for backward compatibility in case older code still calls + # handle_unknown_command_task directly with "help" as the command if command == "help": - send_twitch_message(channel, oauth_token, bot_username, - f"@{username}, available commands: !hello, !dice, !echo, !8ball, !qstats") - if CONTROLLER_SUPPORT: - send_twitch_message(channel, oauth_token, bot_username, - f"Game commands: !up, !down, !left, !right, !jump, etc.") - send_twitch_message(channel, oauth_token, bot_username, - f"Admins can use !switchmode [keyboard|controller|reset] to change input mode") - return { - 'success': True, - 'username': username, - 'command': 'help', - 'timestamp': time.time() - } + # Redirect to the registered help command handler + return handle_help_task(username, args, channel, oauth_token, bot_username) - elif command == "ping": - send_twitch_message(channel, oauth_token, bot_username, f"@{username}, Pong!") - return { - 'success': True, - 'username': username, - 'command': 'ping', - 'timestamp': time.time() - } - # Command not recognized logger.warning(f"Command '{command}' not recognized as a known command") send_twitch_message(channel, oauth_token, bot_username, f"Sorry @{username}, the command !{command} is not supported.") + + # Log the unknown command with appropriate status + log_command(username, command, "unknown", False, args) + return { 'success': False, 'error': f"Command '{command}' not found", 'username': username, 'command': command, - 'timestamp': time.time() + 'timestamp': time.time(), + 'already_logged': True # Flag to prevent double-logging } @huey.task() @@ -673,6 +975,127 @@ def handle_switchmode_task(username: str, args: List[str], channel: str, oauth_t 'timestamp': time.time() } +# Register switchmode command +register_command("switchmode", handle_switchmode_task, + "Change input mode between keyboard and controller", "admin") + +# Register basic keyboard commands +def register_keyboard_commands(): + """Register keyboard commands based on the default game configuration.""" + configs = load_game_configs() + default_game_name = configs.get("default_game") + if not default_game_name: + logger.warning("No default_game specified in game_controls.json. Cannot register default keyboard commands.") + return + + default_game_config = configs.get("games", {}).get(default_game_name, {}).get("keyboard", {}) + if not default_game_config: + logger.warning(f"No keyboard configuration found for default game '{default_game_name}' in game_controls.json.") + return + + for cmd_name, cmd_details in default_game_config.items(): + description = cmd_details.get("description", f"Perform '{cmd_name}' keyboard action (game-specific)") + register_command(cmd_name, + lambda u, a, c, o, b, cmd=cmd_name: handle_keyboard_command_task(u, a, c, o, b, cmd), + description, + "keyboard") + logger.info(f"Registered {len(default_game_config)} keyboard commands based on default game: {default_game_name}") + +# Register basic controller commands +def register_controller_commands(): + """Register controller commands based on the default game configuration.""" + configs = load_game_configs() + default_game_name = configs.get("default_game") + if not default_game_name: + logger.warning("No default_game specified in game_controls.json. Cannot register default controller commands.") + return + + default_game_config = configs.get("games", {}).get(default_game_name, {}).get("controller", {}) + if not default_game_config: + logger.warning(f"No controller configuration found for default game '{default_game_name}' in game_controls.json.") + return + + for cmd_name, cmd_details in default_game_config.items(): + description = cmd_details.get("description", f"Perform '{cmd_name}' controller action (game-specific)") + register_command(cmd_name, + lambda u, a, c, o, b, cmd=cmd_name: handle_controller_command_task(u, a, c, o, b, cmd), + description, + "controller") + logger.info(f"Registered {len(default_game_config)} controller commands based on default game: {default_game_name}") + +@huey.task() +def handle_gamehelp_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !gamehelp command to show all available game controls for the active game.""" + active_game_name = get_active_game_name() + current_input_mode = get_input_mode() # keyboard or controller + + if not active_game_name: + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, no active game is set. Use !setgame .") + return { + 'success': True, 'username': username, 'command': 'gamehelp', 'timestamp': time.time() + } + + all_configs = load_game_configs() + game_config = all_configs.get("games", {}).get(active_game_name) + + if not game_config: + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, configuration for active game '{active_game_name}' not found.") + return { + 'success': True, 'username': username, 'command': 'gamehelp', 'timestamp': time.time() + } + + config_key = "keyboard" if current_input_mode == INPUT_MODE_KEYBOARD else "controller" + controls_to_display = game_config.get(config_key, {}) + game_desc = game_config.get("description", active_game_name) + + if not controls_to_display: + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, no {config_key} controls defined for game: {game_desc}.") + return { + 'success': True, 'username': username, 'command': 'gamehelp', 'timestamp': time.time() + } + + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, available {config_key} controls for {game_desc} (current mode):") + + # Sort commands by name for consistent display + sorted_control_names = sorted(controls_to_display.keys()) + + # Prepare formatted list of commands with descriptions + formatted_commands = [] + for cmd_name in sorted_control_names: + control_details = controls_to_display[cmd_name] + desc = control_details.get("description", "No description.") + formatted_commands.append(f"!{cmd_name} - {desc}") + + # Send in chunks + chunk_size = 3 # Reduced chunk size as descriptions can be longer + for i in range(0, len(formatted_commands), chunk_size): + chunk = formatted_commands[i:i+chunk_size] + send_twitch_message(channel, oauth_token, bot_username, ", ".join(chunk)) + + from src.core.twitch import is_admin + if is_admin(username, channel): + send_twitch_message(channel, oauth_token, bot_username, + f"Admin: Use !switchmode [keyboard|controller|reset] to change input mode. Active game: {active_game_name} (use !setgame to change)") + + return { + 'success': True, + 'username': username, + 'command': 'gamehelp', + 'active_game': active_game_name, + 'input_mode': current_input_mode, + 'timestamp': time.time() + } + +# Register gamehelp command +register_command("gamehelp", handle_gamehelp_task, + "Show all available game controls for the active game", "general") + +# Register commands based on the input mode when the server starts +if CONTROLLER_SUPPORT: + register_keyboard_commands() + register_controller_commands() + # ---- TASK PROCESSING ---- @huey.task() @@ -685,86 +1108,34 @@ def process_chat_command(username: str, command: str, args: List[str], """ logger.info(f"Processing command '{command}' from {username} with args: {args}") - # Record stats + # Record stats - this happens in the consumer process when the task is actually processed queue_stats.log_processed(f"command:{command}") - # Determine the current input mode from persistent storage - mode = get_input_mode() - logger.info(f"Current input mode: {mode}") - - # Create command map based on the current mode - command_task_map = { - # Standard commands - always available regardless of mode - "hello": handle_hello_task, - "dice": handle_dice_task, - "echo": handle_echo_task, - "8ball": handle_8ball_task, - "qstats": handle_qstats_task, - "switchmode": handle_switchmode_task, - } - - # Add mode-specific commands - if mode == INPUT_MODE_KEYBOARD: - # Add keyboard-specific commands - keyboard_commands = { - # Directional and common commands - "up": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "up"), - "down": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "down"), - "left": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "left"), - "right": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "right"), - "jump": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "jump"), - "attack": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "attack"), - "interact": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "interact"), - - # Keyboard-only commands - "inventory": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "inventory"), - "skill1": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "skill1"), - "skill2": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "skill2"), - "skill3": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "skill3"), - "ultimate": lambda u, a, c, o, b: handle_keyboard_command_task(u, a, c, o, b, "ultimate"), - } - command_task_map.update(keyboard_commands) - else: # mode == INPUT_MODE_CONTROLLER - # Add controller-specific commands - controller_commands = { - # Directional and common commands - "up": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "up"), - "down": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "down"), - "left": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "left"), - "right": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "right"), - "jump": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "jump"), - "attack": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "attack"), - "interact": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "interact"), - - # Controller-only commands - "look_up": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "look_up"), - "look_down": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "look_down"), - "look_left": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "look_left"), - "look_right": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "look_right"), - "action": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "action"), - "menu": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "menu"), - "block": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "block"), - "aim": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "aim"), - "shoot": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "shoot"), - "dup": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "dup"), - "dright": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "dright"), - "ddown": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "ddown"), - "dleft": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "dleft"), - "start": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "start"), - "select": lambda u, a, c, o, b: handle_controller_command_task(u, a, c, o, b, "select"), - } - command_task_map.update(controller_commands) - try: - # Check if this is a known command - if command in command_task_map: - task_handler = command_task_map[command] + # Check if this is a registered command + if command in command_registry: + cmd_info = command_registry[command] try: - # Execute the dedicated task handler - logger.info(f"Found handler for '{command}' in {mode} mode, executing...") - return task_handler(username, args, channel, oauth_token, bot_username) + # Execute the command handler + logger.info(f"Found handler for '{command}', executing...") + result = cmd_info.handler(username, args, channel, oauth_token, bot_username) + + # Log all commands, not just game commands + # Skip logging if the command handler already logged it (check for already_logged flag) + # Also skip keyboard and controller commands as they handle their own logging + if result is None: + # Handle case where result is None (command handler didn't return anything) + log_command(username, command, cmd_info.category, False) + elif command not in ["commandlog"] and not result.get('already_logged', False) and cmd_info.category not in ["keyboard", "controller"]: + log_command(username, command, cmd_info.category, result.get('success', True), result.get('args', [])) + + return result except Exception as e: - logger.error(f"Error executing command {command}: {e}") + # Log full traceback for better debugging + logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}") + # Log failed command + if command in command_registry: + log_command(username, command, command_registry[command].category, False) return { 'success': False, 'error': str(e), @@ -776,9 +1147,14 @@ def process_chat_command(username: str, command: str, args: List[str], else: # Unknown command - try the fallback handler try: - return handle_unknown_command_task(username, command, args, channel, oauth_token, bot_username) + result = handle_unknown_command_task(username, command, args, channel, oauth_token, bot_username) + # No need to log here, as the handler now uses registered commands + return result except Exception as e: - logger.error(f"Error handling unknown command {command}: {e}") + # Log full traceback for better debugging + logger.error(f"Error handling unknown command {command}: {e}\n{traceback.format_exc()}") + # Log failed unknown command + log_command(username, command, "unknown", False, args) return { 'success': False, 'error': str(e), @@ -788,7 +1164,10 @@ def process_chat_command(username: str, command: str, args: List[str], 'timestamp': time.time() } except Exception as e: - logger.error(f"Unexpected error in process_chat_command: {e}") + # Log full traceback for better debugging + logger.error(f"Unexpected error in process_chat_command: {e}\n{traceback.format_exc()}") + # Log the error + log_command(username, command, "unknown", False) # Make sure we return a safely picklable object return { 'success': False, @@ -807,7 +1186,7 @@ def process_chat_message(username: str, message: str) -> Dict[str, Any]: """ logger.info(f"Processing message from {username}: {message}") - # Record stats + # Record stats - this happens in the consumer process when the task is actually processed queue_stats.log_processed("message") # Return information about the processed message @@ -824,6 +1203,8 @@ def enqueue_command(username: str, command: str, args: List[str], bot_username: str = None) -> None: """Add a command to the processing queue""" logger.info(f"Enqueueing command '{command}' from {username} with args: {args}") + + # Record enqueue stats - happens in the producer process queue_stats.log_enqueue(f"command:{command}", username) if not all([channel, oauth_token, bot_username]): @@ -837,13 +1218,375 @@ def enqueue_command(username: str, command: str, args: List[str], def enqueue_message(username: str, message: str) -> None: """Add a regular message to the processing queue""" logger.info(f"Enqueueing message from {username}: {message}") + + # Record enqueue stats - happens in the producer process queue_stats.log_enqueue("message", username) + return process_chat_message(username, message) def get_queue_stats() -> Dict[str, Any]: """Get current queue statistics""" return queue_stats.get_stats() +@huey.task() +def handle_ping_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !ping command""" + success = send_twitch_message(channel, oauth_token, bot_username, f"@{username}, Pong!") + return { + 'success': success, + 'username': username, + 'command': 'ping', + 'timestamp': time.time() + } + +# Register ping command +register_command("ping", handle_ping_task, "Check if the bot is responding", "general") + +@huey.task() +def handle_help_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !help command""" + # Get commands by category + categories = get_commands_by_category() + + # Show general commands first + if "general" in categories: + cmd_list = [f"!{cmd}" for cmd, _ in categories["general"]] + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, available commands: {', '.join(cmd_list)}") + + # Show keyboard/controller commands if available + if "keyboard" in categories or "controller" in categories: + send_twitch_message(channel, oauth_token, bot_username, + f"Game controls: Use !gamehelp to see all available game commands") + + # Show queue commands if available + if "queue" in categories: + queue_cmds = [f"!{cmd}" for cmd, _ in categories["queue"]] + send_twitch_message(channel, oauth_token, bot_username, + f"Queue commands: {', '.join(queue_cmds)}") + + # Show admin commands if user is admin + from src.core.twitch import is_admin + if is_admin(username, channel) and "admin" in categories: + admin_cmds = [f"!{cmd}" for cmd, _ in categories["admin"]] + send_twitch_message(channel, oauth_token, bot_username, + f"Admin commands: {', '.join(admin_cmds)}") + + # Log the help command directly here to ensure correct status + log_command(username, "help", "general", True, args) + + return { + 'success': True, + 'username': username, + 'command': 'help', + 'timestamp': time.time(), + 'already_logged': True # Flag to prevent double-logging + } + +# Register help command +register_command("help", handle_help_task, "Show available commands", "general") + +@huey.task() +def handle_commandlog_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !commandlog command to show recent command history""" + import shelve + import datetime + + # Parse arguments + max_entries = 5 # Default to show 5 most recent entries + filter_user = None + filter_type = None + + for arg in args: + if arg.isdigit(): + max_entries = min(int(arg), 20) # Limit to 20 max to avoid spam + elif arg.startswith('@'): + filter_user = arg[1:].lower() # Remove @ and normalize case + elif arg in ['keyboard', 'controller']: + filter_type = arg + + try: + # Get command log from shelve + with shelve.open(SHELVE_FILE) as db: + command_log = db.get(SETTINGS_KEY_COMMAND_LOG, []) + + if not command_log: + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, no commands have been logged yet.") + return { + 'success': True, + 'username': username, + 'command': 'commandlog', + 'timestamp': time.time(), + 'entries': 0 + } + + # Filter logs if needed + filtered_log = command_log + + if filter_user: + filtered_log = [entry for entry in filtered_log if entry['username'].lower() == filter_user] + + if filter_type: + filtered_log = [entry for entry in filtered_log if entry['type'] == filter_type] + + if not filtered_log: + filters = [] + if filter_user: + filters.append(f"user @{filter_user}") + if filter_type: + filters.append(f"{filter_type} type") + + filter_msg = " and ".join(filters) + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, no commands found with {filter_msg} filter.") + return { + 'success': True, + 'username': username, + 'command': 'commandlog', + 'timestamp': time.time(), + 'entries': 0 + } + + # Deduplicate entries - keep the most recent status of each command per user + deduplicated_log = [] + seen_commands = {} # track {username}_{command} -> index in deduplicated_log + + for entry in filtered_log: + # Create a unique key for this user+command combination + entry_key = f"{entry['username']}_{entry['command']}" + + if entry_key in seen_commands: + # Update existing entry with most recent status + existing_idx = seen_commands[entry_key] + deduplicated_log[existing_idx] = entry + else: + # Add new entry + seen_commands[entry_key] = len(deduplicated_log) + deduplicated_log.append(entry) + + # Sort by timestamp (oldest first) + deduplicated_log.sort(key=lambda x: x['timestamp'], reverse=False) + + # Get the most recent entries (take the last N entries instead of first N) + if len(deduplicated_log) <= max_entries: + recent_entries = deduplicated_log + else: + recent_entries = deduplicated_log[-max_entries:] + + # Send header + filter_desc = "" + if filter_user: + filter_desc += f" for @{filter_user}" + if filter_type: + filter_desc += f" using {filter_type}" + + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, recent commands{filter_desc} (most recent {len(recent_entries)}, deduplicated):") + + # Format and send each entry + for entry in recent_entries: + # Format timestamp to readable time + timestamp = datetime.datetime.fromtimestamp(entry['timestamp']).strftime('%H:%M:%S') + + # Format success/fail indicator + status = "✓" if entry.get('success', True) else "✗" + + # Format command with arguments if present + cmd_with_args = f"!{entry['command']}" + if entry.get('args') and len(entry['args']) > 0: + cmd_with_args += " " + " ".join(entry['args']) + + # Send formatted message + message = f"{timestamp} - {status} @{entry['username']} used {cmd_with_args} ({entry['type']})" + send_twitch_message(channel, oauth_token, bot_username, message) + + return { + 'success': True, + 'username': username, + 'command': 'commandlog', + 'timestamp': time.time(), + 'entries': len(recent_entries) + } + + except Exception as e: + logger.error(f"Error retrieving command log: {e}\n{traceback.format_exc()}") + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, error retrieving command log: {str(e)}") + return { + 'success': False, + 'username': username, + 'command': 'commandlog', + 'error': str(e), + 'timestamp': time.time() + } + +# Register commandlog command +register_command("commandlog", handle_commandlog_task, + "Show recent command history. Usage: !commandlog [number] [@user] [keyboard|controller]", "admin") + +# After the commandlog command registration, add the clearlog command + +@huey.task() +def handle_clearlog_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !clearlog command to clear the command history""" + import shelve + + # Only allow admins to clear the log + from src.core.twitch import is_admin + if not is_admin(username, channel): + message = f"@{username}, only channel admins can clear the command log" + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': False, + 'username': username, + 'command': 'clearlog', + 'error': 'not_admin', + 'timestamp': time.time() + } + + try: + # Clear command log from shelve + with shelve.open(SHELVE_FILE) as db: + if SETTINGS_KEY_COMMAND_LOG in db: + del db[SETTINGS_KEY_COMMAND_LOG] + + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, command log has been cleared") + return { + 'success': True, + 'username': username, + 'command': 'clearlog', + 'timestamp': time.time() + } + + except Exception as e: + logger.error(f"Error clearing command log: {e}\n{traceback.format_exc()}") + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, error clearing command log: {str(e)}") + return { + 'success': False, + 'username': username, + 'command': 'clearlog', + 'error': str(e), + 'timestamp': time.time() + } + +# Register clearlog command +register_command("clearlog", handle_clearlog_task, + "Clear the command history (admin only)", "admin") + +@huey.task() +def handle_allcommands_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !allcommands command to show all available commands""" + # Get all commands by category + categories = get_commands_by_category() + + # Check if user is admin + from src.core.twitch import is_admin + is_user_admin = is_admin(username, channel) + + # First message as header + send_twitch_message(channel, oauth_token, bot_username, + f"@{username}, here's a complete list of all available commands by category:") + + # Loop through each category + for category, commands in sorted(categories.items()): + # Skip admin commands for non-admins + if category == "admin" and not is_user_admin: + continue + + # Format message with category name + category_display = category.upper() + if category == "admin" and is_user_admin: + category_display += " (admin only)" + + # Sort commands alphabetically + commands.sort() + + # Send commands in chunks to avoid message length limits + chunks = [commands[i:i+5] for i in range(0, len(commands), 5)] + + # Send category header + send_twitch_message(channel, oauth_token, bot_username, f"{category_display} COMMANDS:") + + # Send command chunks + for chunk in chunks: + cmd_list = [f"!{cmd} - {desc}" for cmd, desc in chunk] + send_twitch_message(channel, oauth_token, bot_username, ", ".join(cmd_list)) + + return { + 'success': True, + 'username': username, + 'command': 'allcommands', + 'timestamp': time.time() + } + +# Register allcommands command +register_command("allcommands", handle_allcommands_task, + "List all available commands organized by category", "general") + +@huey.task() +def handle_setgame_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + """Handler for !setgame command to change the active game configuration.""" + from src.core.twitch import is_admin + if not is_admin(username, channel): + message = f"@{username}, only channel admins can change the active game." + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': False, + 'username': username, + 'command': 'setgame', + 'error': 'not_admin', + 'timestamp': time.time() + } + + if not args: + current_game = get_active_game_name() or "None (default will be used)" + all_configs = load_game_configs() + available_games = list(all_configs.get("games", {}).keys()) + message = f"@{username}, current active game: {current_game}. Available games: {', '.join(available_games) if available_games else 'None defined'}. Use !setgame ." + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': True, + 'username': username, + 'command': 'setgame', + 'current_game': current_game, + 'timestamp': time.time() + } + + new_game_name = args[0] + all_configs = load_game_configs() + + if new_game_name not in all_configs.get("games", {}): + message = f"@{username}, game configuration for '{new_game_name}' not found. Please ensure it's defined in {GAME_CONFIG_FILE_PATH}." + send_twitch_message(channel, oauth_token, bot_username, message) + # Still set it, as the user might add the config later. Or choose to return False. + # For now, we allow setting to a non-existent config name. + + if set_active_game_name(new_game_name): + message = f"@{username}, active game successfully set to: {new_game_name}." + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': True, + 'username': username, + 'command': 'setgame', + 'new_game': new_game_name, + 'timestamp': time.time() + } + else: + message = f"@{username}, failed to set active game to: {new_game_name}. Check logs for details." + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': False, + 'username': username, + 'command': 'setgame', + 'error': 'shelve_write_error', + 'timestamp': time.time() + } + +# Register setgame command +register_command("setgame", handle_setgame_task, + "Set the active game for control configurations (admin only). Usage: !setgame ", "admin") + # Function to start the consumer directly (alternative to command line) def start_consumer() -> None: """ @@ -861,6 +1604,61 @@ def start_consumer() -> None: consumer = Consumer(huey) consumer.start() +# Controller state management +def get_controller_state(controller_type=INPUT_MODE_KEYBOARD): + """Get the current state of a controller from shelve storage""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + if controller_type == INPUT_MODE_KEYBOARD: + return db.get(SETTINGS_KEY_KEYBOARD_STATE, {}) + else: + return db.get(SETTINGS_KEY_CONTROLLER_STATE, {}) + except Exception as e: + logger.error(f"Error reading controller state from shelve: {e}\n{traceback.format_exc()}") + return {} + +def set_controller_state(state, controller_type=INPUT_MODE_KEYBOARD): + """Save the current state of a controller to shelve storage""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + if controller_type == INPUT_MODE_KEYBOARD: + db[SETTINGS_KEY_KEYBOARD_STATE] = state + else: + db[SETTINGS_KEY_CONTROLLER_STATE] = state + return True + except Exception as e: + logger.error(f"Error writing controller state to shelve: {e}\n{traceback.format_exc()}") + return False + +def set_controller_initialized(initialized, controller_type=INPUT_MODE_KEYBOARD): + """Set whether a controller is initialized in shelve storage""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + if controller_type == INPUT_MODE_KEYBOARD: + db[SETTINGS_KEY_KEYBOARD_INITIALIZED] = initialized + else: + db[SETTINGS_KEY_CONTROLLER_INITIALIZED] = initialized + return True + except Exception as e: + logger.error(f"Error writing controller initialization status to shelve: {e}\n{traceback.format_exc()}") + return False + +def is_controller_initialized(controller_type=INPUT_MODE_KEYBOARD): + """Check if a controller is initialized from shelve storage""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + if controller_type == INPUT_MODE_KEYBOARD: + return db.get(SETTINGS_KEY_KEYBOARD_INITIALIZED, False) + else: + return db.get(SETTINGS_KEY_CONTROLLER_INITIALIZED, False) + except Exception as e: + logger.error(f"Error reading controller initialization status from shelve: {e}\n{traceback.format_exc()}") + return False + if __name__ == "__main__": logger.info("Queue server module initialized") logger.info("To start the consumer, run:")