This commit introduces a significant refactoring to how command arguments are parsed. The primary goal is to allow users to optionally enclose arguments containing spaces in double quotes for all commands, providing a more flexible and intuitive command input experience.
Key changes include:
1. **Centralized Argument Parsing (`src/queue/server.py`):**
* The `process_chat_command` function in `src/queue/server.py` now handles the primary parsing of command arguments.
* Incoming raw arguments (previously a list of strings split by spaces) are joined into a single string.
* This string is then processed using the regular expression `re.findall(r'"([^"]*)"|(\S+)', ...)` to accurately identify and separate arguments. This regex correctly distinguishes between:
* Substrings enclosed in double quotes (e.g., `"my poll question"`), which are treated as single arguments.
* Sequences of non-whitespace characters (e.g., `1`, `myoption`), also treated as single arguments.
* This ensures that all command handlers receive a list of arguments (`parsed_args`) where multi-word arguments (if originally quoted) are correctly preserved as single elements.
* A correction was made to the regex pattern: an initial version mistakenly used `(\\\\S+)` (matching a literal backslash then 'S') instead of the intended `(\\S+)` (matching any non-whitespace sequence) for unquoted arguments. This fix ensures that simple unquoted arguments like numbers (e.g., for the `!vote` command) are parsed correctly.
* The `re` module import was ensured.
2. **Simplification of Poll Command (`src/features/examples.py`):**
* The `cmd_start_poll` method in `src/features/examples.py` has been simplified. Its specific regex-based logic for parsing quoted poll questions and options was removed.
* This method now relies on the generalized, upstream parsing performed by `process_chat_command` in `src/queue/server.py`. It expects to receive a list of arguments that have already been correctly parsed.
* Test calls for `cmd_start_poll` within the `if __name__ == '__main__':` block were updated to align with this change, demonstrating how arguments (like poll questions and options) are now passed as a simple list of strings.
These changes collectively improve the robustness and usability of the command system by standardizing argument parsing and allowing for more complex argument values.
411 lines
20 KiB
Python
411 lines
20 KiB
Python
import os
|
|
import time
|
|
import random
|
|
import argparse
|
|
import json
|
|
from typing import List, Optional, Dict, Any, Tuple, Union, Literal
|
|
from src.core.twitch import TwitchBot, CommandCallback
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
# Default config file path
|
|
DEFAULT_CONFIG_PATH = "default_config.json"
|
|
|
|
# Example command handlers
|
|
def cmd_hello(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Simple hello command that greets the user"""
|
|
bot.send_message(f"Hello, {username}!")
|
|
|
|
def cmd_dice(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Roll a dice with specified number of sides"""
|
|
sides: int = 6 # Default to 6-sided dice
|
|
|
|
if args and args[0].isdigit():
|
|
sides = int(args[0])
|
|
|
|
result: int = random.randint(1, sides)
|
|
bot.send_message(f"@{username} rolled a {result} (d{sides})")
|
|
|
|
def cmd_echo(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Echo back the user's message"""
|
|
if args:
|
|
message: str = " ".join(args)
|
|
bot.send_message(f"Echo: {message}")
|
|
else:
|
|
bot.send_message(f"@{username}, you didn't provide a message to echo!")
|
|
|
|
def cmd_magic8ball(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Magic 8-ball that gives random answers"""
|
|
responses: List[str] = [
|
|
"It is certain.",
|
|
"It is decidedly so.",
|
|
"Without a doubt.",
|
|
"Yes, definitely.",
|
|
"You may rely on it.",
|
|
"As I see it, yes.",
|
|
"Most likely.",
|
|
"Outlook good.",
|
|
"Yes.",
|
|
"Signs point to yes.",
|
|
"Reply hazy, try again.",
|
|
"Ask again later.",
|
|
"Better not tell you now.",
|
|
"Cannot predict now.",
|
|
"Concentrate and ask again.",
|
|
"Don't count on it.",
|
|
"My reply is no.",
|
|
"My sources say no.",
|
|
"Outlook not so good.",
|
|
"Very doubtful."
|
|
]
|
|
bot.send_message(f"@{username}, {random.choice(responses)}")
|
|
|
|
def cmd_queue_stats(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Show queue statistics"""
|
|
stats = bot.get_queue_stats()
|
|
bot.send_message(f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending")
|
|
|
|
def cmd_token_status(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Show status of the Twitch API token (streamer only)"""
|
|
# Check if user is an admin
|
|
if not bot.is_admin(username):
|
|
bot.send_message(f"@{username}, only the channel owner or admins can check token status.")
|
|
return
|
|
|
|
if bot.access_token and bot.token_expiry:
|
|
# Calculate time remaining
|
|
time_remaining = bot.token_expiry - time.time()
|
|
if time_remaining > 0:
|
|
hours = int(time_remaining // 3600)
|
|
minutes = int((time_remaining % 3600) // 60)
|
|
bot.send_message(f"@{username}, token is valid for {hours}h {minutes}m.")
|
|
else:
|
|
bot.send_message(f"@{username}, token is expired and will be refreshed on next use.")
|
|
else:
|
|
bot.send_message(f"@{username}, no token available. It will be generated when needed.")
|
|
|
|
def cmd_stop(username: str, args: List[str], bot: TwitchBot) -> None:
|
|
"""Stop the bot (streamer only)"""
|
|
# Check if user is an admin
|
|
if not bot.is_admin(username):
|
|
bot.send_message(f"@{username}, only the channel owner or admins can stop the bot.")
|
|
return
|
|
|
|
bot.send_message(f"Bot is shutting down by request of @{username}.")
|
|
# Stop the bot
|
|
bot.stop()
|
|
|
|
def main() -> None:
|
|
# Parse command line arguments
|
|
parser: argparse.ArgumentParser = argparse.ArgumentParser(description='Twitch Chat Bot')
|
|
parser.add_argument('--advanced', action='store_true', help='Enable advanced features')
|
|
parser.add_argument('--game-control', action='store_true', help='Enable game control features')
|
|
parser.add_argument('--game-mode', choices=['direct', 'vote'], default='direct',
|
|
help='Game control mode: direct (immediate commands) or vote (timed voting)')
|
|
parser.add_argument('--cooldown', type=float, default=2.0,
|
|
help='Cooldown time between commands in seconds (for direct mode)')
|
|
parser.add_argument('--input-type', choices=['keyboard', 'controller'], default='keyboard',
|
|
help='Input type: keyboard/mouse or Xbox controller emulation')
|
|
parser.add_argument('--use-queue', action='store_true', help='Use Huey task queue for processing commands')
|
|
parser.add_argument('--start-consumer', action='store_true', help='Start Huey consumer within the bot process')
|
|
parser.add_argument('--token-cache', type=str, default="data/cache/token_cache.json",
|
|
help='Path to token cache file (default: data/cache/token_cache.json)')
|
|
parser.add_argument('--admin-users', type=str, help='Comma-separated list of additional admin users who have the same permissions as the channel owner')
|
|
parser.add_argument('--config', type=str, default=DEFAULT_CONFIG_PATH,
|
|
help=f'Path to configuration file (default: {DEFAULT_CONFIG_PATH})')
|
|
parser.add_argument('--force-new-token', action='store_true', help='Force generation of a new token even if a cached one exists')
|
|
args: argparse.Namespace = parser.parse_args()
|
|
|
|
# Load configuration from file if it exists
|
|
config: Dict[str, Any] = {}
|
|
if os.path.exists(args.config):
|
|
try:
|
|
with open(args.config, 'r') as f:
|
|
config = json.load(f)
|
|
print(f"Loaded configuration from {args.config}")
|
|
except json.JSONDecodeError:
|
|
print(f"Error: {args.config} is not a valid JSON file")
|
|
except Exception as e:
|
|
print(f"Error loading config file: {e}")
|
|
|
|
# Get Twitch credentials from environment variables
|
|
username: Optional[str] = os.environ.get("TWITCH_USERNAME")
|
|
client_id: Optional[str] = os.environ.get("TWITCH_CLIENT_ID")
|
|
client_secret: Optional[str] = os.environ.get("TWITCH_CLIENT_SECRET")
|
|
channel: Optional[str] = os.environ.get("TWITCH_CHANNEL")
|
|
|
|
# Check for required credentials
|
|
if not all([username, client_id, client_secret, channel]):
|
|
print("Please set the following environment variables:")
|
|
print(" TWITCH_USERNAME - The bot's username")
|
|
print(" TWITCH_CLIENT_ID - Your app's registered client ID")
|
|
print(" TWITCH_CLIENT_SECRET - Your app's registered client secret")
|
|
print(" TWITCH_CHANNEL - The channel to join")
|
|
print("\nTo get client ID and secret:")
|
|
print("1. Go to https://dev.twitch.tv/console/apps")
|
|
print("2. Create a new application or use an existing one")
|
|
print("3. Get the client ID and generate a client secret")
|
|
|
|
# For testing, you can uncomment and set these values
|
|
# username = "your_bot_username"
|
|
# client_id = "your_client_id"
|
|
# client_secret = "your_client_secret"
|
|
# channel = "your_channel"
|
|
|
|
if not all([username, client_id, client_secret, channel]):
|
|
return
|
|
|
|
# Type assertion to satisfy type checker (we checked these are not None above)
|
|
username = username if username is not None else ""
|
|
client_id = client_id if client_id is not None else ""
|
|
client_secret = client_secret if client_secret is not None else ""
|
|
channel = channel if channel is not None else ""
|
|
|
|
# Parse admin users with priority: command line > env var > config file
|
|
admin_users: Optional[List[str]] = None
|
|
|
|
# 1. From command line
|
|
if args.admin_users:
|
|
admin_users = [user.strip() for user in args.admin_users.split(",")]
|
|
print(f"Additional admin users from command line: {', '.join(admin_users)}")
|
|
|
|
# 2. From environment variable
|
|
elif os.environ.get("TWITCH_ADMIN_USERS"):
|
|
env_admins = os.environ.get("TWITCH_ADMIN_USERS", "")
|
|
admin_users = [user.strip() for user in env_admins.split(",")]
|
|
print(f"Additional admin users from environment: {', '.join(admin_users)}")
|
|
|
|
# 3. From config file
|
|
elif "admin_users" in config and isinstance(config["admin_users"], list):
|
|
admin_users = config["admin_users"]
|
|
print(f"Additional admin users from config file: {', '.join(admin_users)}")
|
|
|
|
# Make sure the cache directory exists
|
|
cache_dir = os.path.dirname(args.token_cache)
|
|
os.makedirs(cache_dir, exist_ok=True)
|
|
|
|
print(f"Using token cache file: {args.token_cache}")
|
|
if args.force_new_token:
|
|
print("Forcing new token generation (any cached token will be ignored)")
|
|
# Remove cached token if forcing new one
|
|
if os.path.exists(args.token_cache):
|
|
try:
|
|
os.remove(args.token_cache)
|
|
print("Removed existing token cache file")
|
|
except Exception as e:
|
|
print(f"Warning: Could not remove existing token cache: {e}")
|
|
else:
|
|
print("Using cached token if available")
|
|
|
|
# Create bot instance - explicitly pass None for access_token to avoid using env vars
|
|
bot: TwitchBot = TwitchBot(
|
|
username=username,
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
channel=channel,
|
|
use_queue=args.use_queue,
|
|
command_prefix=config.get("command_prefix", "!"),
|
|
token_cache_file=args.token_cache,
|
|
admin_users=admin_users,
|
|
force_new_token=args.force_new_token,
|
|
access_token=None, # Explicitly set to None to avoid using env vars
|
|
refresh_token=None # Explicitly set to None
|
|
)
|
|
|
|
# Register basic commands
|
|
bot.register_command("hello", cmd_hello)
|
|
bot.register_command("dice", cmd_dice)
|
|
bot.register_command("echo", cmd_echo)
|
|
bot.register_command("8ball", cmd_magic8ball)
|
|
bot.register_command("token", cmd_token_status)
|
|
bot.register_command("stop", cmd_stop)
|
|
|
|
# Add queue-related commands if using the queue
|
|
if args.use_queue:
|
|
bot.register_command("qstats", cmd_queue_stats)
|
|
|
|
# Start the Huey consumer if requested
|
|
if args.start_consumer:
|
|
try:
|
|
from src.queue.server import start_consumer, set_input_mode, get_input_mode
|
|
import threading
|
|
|
|
# Make sure input mode is set before starting consumer
|
|
if args.game_control and args.input_type:
|
|
print(f"Consumer will use input mode: {get_input_mode()}")
|
|
|
|
consumer_thread = threading.Thread(target=start_consumer)
|
|
consumer_thread.daemon = True
|
|
consumer_thread.start()
|
|
print("Started Huey consumer thread")
|
|
print("Note: For production, it's better to use: python -m huey.bin.huey_consumer src.queue.server.huey")
|
|
except ImportError as e:
|
|
print(f"Failed to start consumer: {e}")
|
|
|
|
# Enable advanced features if requested
|
|
if args.advanced:
|
|
try:
|
|
from src.features.examples import AdvancedExamples
|
|
import traceback # For more detailed error logging
|
|
print("Enabling advanced features...")
|
|
|
|
if args.use_queue:
|
|
from src.queue.server import setup_advanced_commands_config
|
|
setup_advanced_commands_config(
|
|
bot_username_main=bot.username,
|
|
channel_main=channel,
|
|
admin_users_main=admin_users if admin_users else [],
|
|
oauth_token_main=bot.access_token if bot.access_token else ""
|
|
)
|
|
print("Advanced features configuration (poll, points, etc.) saved for queue processing.")
|
|
print("The queue consumer will register these commands: !poll, !vote, !endpoll, !points, !givepoints.")
|
|
|
|
else: # Not using queue, direct registration with bot instance
|
|
|
|
# Wrapper for send_message_func for AdvancedExamples
|
|
# Expected: (channel, oauth_token, bot_username, message)
|
|
def non_queue_send_message_wrapper(ch_ignored: str, oauth_ignored: Optional[str], bot_user_ignored: str, message: str):
|
|
bot.send_message(message) # bot.send_message handles its own channel context
|
|
|
|
# Wrapper for is_admin_func for AdvancedExamples
|
|
# Expected: (username, channel, admin_list)
|
|
def non_queue_is_admin_wrapper(usr: str, chan_ignored: str, admin_list_ignored: List[str]) -> bool:
|
|
return bot.is_admin(usr) # bot.is_admin uses its own channel and admin_list
|
|
|
|
advanced_instance = AdvancedExamples(
|
|
send_message_func=non_queue_send_message_wrapper,
|
|
is_admin_func=non_queue_is_admin_wrapper,
|
|
bot_username=bot.username,
|
|
channel=channel, # Pass the specific channel to AdvancedExamples
|
|
admin_users=admin_users if admin_users else [], # Pass specific admin_users
|
|
use_queue=False
|
|
)
|
|
|
|
# Register commands with the bot instance
|
|
# Lambdas pass username, args, and the bot's current access_token
|
|
bot.register_command("poll", lambda u, a, b_inst: advanced_instance.cmd_start_poll(u, a, b_inst.access_token))
|
|
bot.register_command("vote", lambda u, a, b_inst: advanced_instance.cmd_vote(u, a, b_inst.access_token))
|
|
bot.register_command("endpoll", lambda u, a, b_inst: advanced_instance.cmd_end_poll(u, a, b_inst.access_token))
|
|
bot.register_command("points", lambda u, a, b_inst: advanced_instance.cmd_points(u, a, b_inst.access_token))
|
|
# Using "givepoints" for consistency with queue registration
|
|
bot.register_command("givepoints", lambda u, a, b_inst: advanced_instance.cmd_give_points(u, a, b_inst.access_token))
|
|
|
|
print("Advanced features enabled directly: polls, voting, points system.")
|
|
print("Use !poll, !vote, !endpoll, !points, !givepoints.")
|
|
|
|
# Timer and Sound commands are handled by AdvancedExamples based on its use_queue flag
|
|
# For non-queue mode, AdvancedExamples will allow them.
|
|
bot.register_command("timer", lambda u, a, b_inst: advanced_instance.cmd_timer(u,a,b_inst.access_token))
|
|
print("Timer command !timer enabled.")
|
|
if os.name == 'nt':
|
|
bot.register_command("sound", lambda u, a, b_inst: advanced_instance.cmd_play_sound(u, a, b_inst.access_token))
|
|
print("Sound effects !sound enabled (Windows only).")
|
|
# Create sounds directory if it doesn't exist (moved from previous unconditional spot)
|
|
if not os.path.exists("sounds"):
|
|
os.makedirs("sounds")
|
|
print("Created 'sounds' directory. Add .wav files to use with !sound command")
|
|
|
|
except ImportError as e:
|
|
print(f"Failed to import or configure advanced features: {e}")
|
|
print(traceback.format_exc()) # Print full traceback
|
|
except Exception as e:
|
|
print(f"Error setting up advanced features: {e}")
|
|
print(traceback.format_exc()) # Print full traceback
|
|
|
|
# Enable game control if requested
|
|
if args.game_control:
|
|
try:
|
|
from src.game.controller import GameController, PYNPUT_AVAILABLE
|
|
|
|
# Check if controller input is requested
|
|
if args.input_type == 'controller':
|
|
try:
|
|
from src.game.input.gamepad import VirtualController, VGAMEPAD_AVAILABLE
|
|
if not VGAMEPAD_AVAILABLE:
|
|
print("WARNING: vgamepad library not found. Install it with: pip install vgamepad")
|
|
print("Falling back to keyboard/mouse input.")
|
|
args.input_type = 'keyboard'
|
|
except ImportError:
|
|
print("WARNING: controller_support.py not found or error importing it.")
|
|
print("Falling back to keyboard/mouse input.")
|
|
args.input_type = 'keyboard'
|
|
|
|
# Persist the input type to make it available to the queue consumer process
|
|
try:
|
|
from src.queue.server import set_input_mode, INPUT_MODE_KEYBOARD, INPUT_MODE_CONTROLLER
|
|
if args.input_type == 'keyboard':
|
|
set_input_mode(INPUT_MODE_KEYBOARD)
|
|
print(f"Set persistent input mode to keyboard (this will be used by queue consumer)")
|
|
else:
|
|
set_input_mode(INPUT_MODE_CONTROLLER)
|
|
print(f"Set persistent input mode to controller (this will be used by queue consumer)")
|
|
except ImportError as e:
|
|
print(f"Warning: Could not set persistent input mode: {e}")
|
|
|
|
print(f"Enabling game control in {args.game_mode} mode with {args.input_type} input...")
|
|
|
|
if args.input_type == 'keyboard' and not PYNPUT_AVAILABLE:
|
|
print("WARNING: pynput library not found. Install it with: pip install pynput")
|
|
print("Keyboard/mouse control will be simulated but not actually perform any actions.")
|
|
print("Run 'pip install pynput' to enable actual keyboard/mouse control.")
|
|
|
|
# Only initialize the GameController when NOT using queue mode
|
|
# When in queue mode, the controller is initialized in the queue consumer process
|
|
if not args.use_queue:
|
|
game_controller: GameController = GameController(
|
|
bot,
|
|
mode=args.game_mode,
|
|
cooldown=args.cooldown,
|
|
input_type=args.input_type
|
|
)
|
|
|
|
if args.game_mode == "direct":
|
|
print(f"Commands will execute immediately with a {args.cooldown}s cooldown per user")
|
|
else:
|
|
print("Commands will be collected through voting")
|
|
print("Streamer can start a vote with !startvote and end with !endvote")
|
|
print("Users vote with !vote [command]")
|
|
|
|
print("Use !gamehelp to see available game commands")
|
|
print("Check user stats with !gamestats [username]")
|
|
else:
|
|
print("Game control will be handled by the queue consumer process")
|
|
print("Use !gamehelp to see available game commands")
|
|
|
|
except ImportError as e:
|
|
print(f"Failed to import game control features: {e}")
|
|
print("Make sure you have the required dependencies:")
|
|
print(" pip install pynput")
|
|
if args.input_type == 'controller':
|
|
print(" pip install vgamepad # For controller support")
|
|
|
|
# Start the bot
|
|
print(f"Starting bot for channel #{channel}")
|
|
print("Basic commands: !hello, !dice [sides], !echo [message], !8ball")
|
|
print("Admin commands: !token (check token status)")
|
|
|
|
if args.use_queue:
|
|
print("Queue mode enabled, commands will be processed by the queue system")
|
|
print("Use !qstats to see queue statistics")
|
|
|
|
if args.force_new_token:
|
|
print("Note: Forcing generation of a new authentication token")
|
|
|
|
try:
|
|
# Make sure requests library is installed
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
print("ERROR: The requests library is required for the new authentication method.")
|
|
print("Install it with: pip install requests")
|
|
return
|
|
|
|
# Now start the bot
|
|
bot.start(use_queue=args.use_queue)
|
|
except KeyboardInterrupt:
|
|
print("Bot stopped by user")
|
|
bot.stop()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|