diff --git a/main.py b/main.py index 71a3293..e2e553f 100644 --- a/main.py +++ b/main.py @@ -205,11 +205,12 @@ def main() -> None: client_secret=client_secret, channel=channel, use_queue=args.use_queue, + command_prefix=config.get("command_prefix", "!"), token_cache_file=args.token_cache, admin_users=admin_users, force_new_token=args.force_new_token, access_token=None, # Explicitly set to None to avoid using env vars - refresh_token=None # Explicitly set to None to avoid using env vars + refresh_token=None # Explicitly set to None ) # Register basic commands @@ -246,17 +247,71 @@ def main() -> None: if args.advanced: try: from src.features.examples import AdvancedExamples + import traceback # For more detailed error logging print("Enabling advanced features...") - advanced: AdvancedExamples = AdvancedExamples(bot) - print("Advanced features enabled: polls, voting, timers, points system") - if os.name == 'nt': - print("Sound effects enabled (Windows only)") - # Create sounds directory if it doesn't exist - if not os.path.exists("sounds"): - os.makedirs("sounds") - print("Created 'sounds' directory. Add .wav files to use with !sound command") + + if args.use_queue: + from src.queue.server import setup_advanced_commands_config + setup_advanced_commands_config( + bot_username_main=bot.username, + channel_main=channel, + admin_users_main=admin_users if admin_users else [], + oauth_token_main=bot.access_token if bot.access_token else "" + ) + print("Advanced features configuration (poll, points, etc.) saved for queue processing.") + print("The queue consumer will register these commands: !poll, !vote, !endpoll, !points, !givepoints.") + + else: # Not using queue, direct registration with bot instance + + # Wrapper for send_message_func for AdvancedExamples + # Expected: (channel, oauth_token, bot_username, message) + def non_queue_send_message_wrapper(ch_ignored: str, oauth_ignored: Optional[str], bot_user_ignored: str, message: str): + bot.send_message(message) # bot.send_message handles its own channel context + + # Wrapper for is_admin_func for AdvancedExamples + # Expected: (username, channel, admin_list) + def non_queue_is_admin_wrapper(usr: str, chan_ignored: str, admin_list_ignored: List[str]) -> bool: + return bot.is_admin(usr) # bot.is_admin uses its own channel and admin_list + + advanced_instance = AdvancedExamples( + send_message_func=non_queue_send_message_wrapper, + is_admin_func=non_queue_is_admin_wrapper, + bot_username=bot.username, + channel=channel, # Pass the specific channel to AdvancedExamples + admin_users=admin_users if admin_users else [], # Pass specific admin_users + use_queue=False + ) + + # Register commands with the bot instance + # Lambdas pass username, args, and the bot's current access_token + bot.register_command("poll", lambda u, a, b_inst: advanced_instance.cmd_start_poll(u, a, b_inst.access_token)) + bot.register_command("vote", lambda u, a, b_inst: advanced_instance.cmd_vote(u, a, b_inst.access_token)) + bot.register_command("endpoll", lambda u, a, b_inst: advanced_instance.cmd_end_poll(u, a, b_inst.access_token)) + bot.register_command("points", lambda u, a, b_inst: advanced_instance.cmd_points(u, a, b_inst.access_token)) + # Using "givepoints" for consistency with queue registration + bot.register_command("givepoints", lambda u, a, b_inst: advanced_instance.cmd_give_points(u, a, b_inst.access_token)) + + print("Advanced features enabled directly: polls, voting, points system.") + print("Use !poll, !vote, !endpoll, !points, !givepoints.") + + # Timer and Sound commands are handled by AdvancedExamples based on its use_queue flag + # For non-queue mode, AdvancedExamples will allow them. + bot.register_command("timer", lambda u, a, b_inst: advanced_instance.cmd_timer(u,a,b_inst.access_token)) + print("Timer command !timer enabled.") + if os.name == 'nt': + bot.register_command("sound", lambda u, a, b_inst: advanced_instance.cmd_play_sound(u, a, b_inst.access_token)) + print("Sound effects !sound enabled (Windows only).") + # Create sounds directory if it doesn't exist (moved from previous unconditional spot) + if not os.path.exists("sounds"): + os.makedirs("sounds") + print("Created 'sounds' directory. Add .wav files to use with !sound command") + except ImportError as e: - print(f"Failed to import advanced features: {e}") + print(f"Failed to import or configure advanced features: {e}") + print(traceback.format_exc()) # Print full traceback + except Exception as e: + print(f"Error setting up advanced features: {e}") + print(traceback.format_exc()) # Print full traceback # Enable game control if requested if args.game_control: diff --git a/src/core/twitch.py b/src/core/twitch.py index 978c247..f789f25 100644 --- a/src/core/twitch.py +++ b/src/core/twitch.py @@ -25,6 +25,7 @@ class TwitchBot: def __init__(self, username: str, client_id: str = None, client_secret: str = None, access_token: str = None, refresh_token: str = None, channel: str = None, use_queue: bool = False, + command_prefix: str = "!", token_cache_file: str = "data/cache/token_cache.json", admin_users: List[str] = None, force_new_token: bool = False) -> None: @@ -32,6 +33,7 @@ class TwitchBot: self.client_id: Optional[str] = client_id or os.environ.get("TWITCH_CLIENT_ID") self.client_secret: Optional[str] = client_secret or os.environ.get("TWITCH_CLIENT_SECRET") self.channel: str = channel.lower() if channel else "" + self.command_prefix: str = command_prefix self.server: str = 'irc.chat.twitch.tv' self.port: int = 6667 self.socket: socket.socket = socket.socket() @@ -243,23 +245,26 @@ class TwitchBot: username: str = message_data['username'] # If this is a regular message (not a command), handle it - if not content.startswith('!'): + if not content.startswith(self.command_prefix): if self.use_queue: # If using queue, send the message to the queue enqueue_message(username, content) return - # At this point, we know this is a command (starts with !) - parts: List[str] = content.split() - command: str = parts[0][1:].lower() # Remove ! and convert to lowercase - args: List[str] = parts[1:] if len(parts) > 1 else [] + # Make sure the content is longer than the prefix itself before trying to split + if len(content) <= len(self.command_prefix): + return # Not a valid command format + + command_parts: List[str] = content[len(self.command_prefix):].split() + command_name: str = command_parts[0].lower() + args: List[str] = command_parts[1:] # Store the last executed command - self.last_command = command + self.last_command = command_name # Log if this is a controller command - if command in self.commands: - logger.info(f"Recognized command: !{command}") + if command_name in self.commands: + logger.info(f"Recognized command: {self.command_prefix}{command_name}") # If using queue, just enqueue the command without any processing if self.use_queue: @@ -269,7 +274,7 @@ class TwitchBot: enqueue_command( username, - command, + command_name, args, channel=self.channel, oauth_token=self.oauth_token, @@ -278,11 +283,11 @@ class TwitchBot: return # Direct execution (no queue) - if command in self.commands: + if command_name in self.commands: try: - self.commands[command](username, args, self) + self.commands[command_name](username, args, self) except Exception as e: - logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}") + logger.error(f"Error executing command {self.command_prefix}{command_name}: {e}\n{traceback.format_exc()}") def start(self, use_queue: bool = False) -> None: """Start the bot and listen for messages""" diff --git a/src/core/utils.py b/src/core/utils.py new file mode 100644 index 0000000..b73008e --- /dev/null +++ b/src/core/utils.py @@ -0,0 +1,27 @@ +from typing import List +# Add a standalone version of is_admin for use by other modules +def is_admin(username: str, channel: str = None, admin_users: List[str] = None) -> bool: + """ + Check if a user has admin privileges + + Args: + username: The username to check + channel: The channel name (channel owner is always an admin) + admin_users: Optional list of additional admin users + + Returns: + bool: True if the user is an admin, False otherwise + """ + # Normalize usernames to lowercase + username = username.lower() + + # Create admin list - channel owner is always an admin + admins = [] + if channel: + admins.append(channel.lower()) + + # Add additional admins if provided + if admin_users: + admins.extend([user.lower() for user in admin_users]) + + return username in admins \ No newline at end of file diff --git a/src/features/examples.py b/src/features/examples.py index 3fcd864..198e307 100644 --- a/src/features/examples.py +++ b/src/features/examples.py @@ -3,8 +3,11 @@ import threading import subprocess import json import os -from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast -from src.core.twitch import TwitchBot +import shelve +import re +from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast, Callable + +SHELVE_FILE_ADVANCED_FEATURES = "data/cache/advanced_features_state" class AdvancedExamples: """ @@ -15,27 +18,90 @@ class AdvancedExamples: - User points system """ - def __init__(self, bot: TwitchBot) -> None: - self.bot: TwitchBot = bot + def __init__(self, + send_message_func: Callable[[str, str, str, str], None], + is_admin_func: Callable[[str, str, List[str]], bool], + bot_username: str, + channel: str, + admin_users: List[str], + use_queue: bool = False, + shelve_file_key_prefix: str = "advfeat") -> None: + self.send_message_func = send_message_func + self.is_admin_func = is_admin_func + self.bot_username = bot_username + self.channel = channel + self.admin_users = admin_users + self.use_queue = use_queue + self.shelve_file_key_prefix = shelve_file_key_prefix + self.votes: Dict[str, int] = {} self.current_poll: Optional[Dict[str, Any]] = None self.poll_active: bool = False self.timer_threads: Dict[str, threading.Timer] = {} - self.user_points: Dict[str, int] = self.load_points() - - # Register commands - self.bot.register_command("poll", self.cmd_start_poll) - self.bot.register_command("vote", self.cmd_vote) - self.bot.register_command("endpoll", self.cmd_end_poll) - self.bot.register_command("timer", self.cmd_timer) - self.bot.register_command("points", self.cmd_points) - self.bot.register_command("give", self.cmd_give_points) - - # For external application control (example) - if os.name == 'nt': # Windows - self.bot.register_command("sound", self.cmd_play_sound) - - def load_points(self) -> Dict[str, int]: + self.user_points: Dict[str, int] = {} + + if self.use_queue: + self._load_poll_state() + self._load_user_points() + else: + self.user_points = self._load_points_from_json_file() + + def _get_shelve_db(self): + return shelve.open(SHELVE_FILE_ADVANCED_FEATURES) + + def _load_poll_state(self) -> None: + if not self.use_queue: + return + try: + with self._get_shelve_db() as db: + poll_state_key = f"{self.shelve_file_key_prefix}_poll" + state = db.get(poll_state_key, {}) + self.current_poll = state.get("current_poll") + self.poll_active = state.get("poll_active", False) + self.votes = state.get("votes", {}) + except Exception as e: + print(f"Error loading poll state from shelve: {e}") + self.current_poll = None + self.poll_active = False + self.votes = {} + + def _save_poll_state(self) -> None: + if not self.use_queue: + return + try: + with self._get_shelve_db() as db: + poll_state_key = f"{self.shelve_file_key_prefix}_poll" + db[poll_state_key] = { + "current_poll": self.current_poll, + "poll_active": self.poll_active, + "votes": self.votes + } + except Exception as e: + print(f"Error saving poll state to shelve: {e}") + + def _load_user_points(self) -> None: + if not self.use_queue: + return + try: + with self._get_shelve_db() as db: + points_key = f"{self.shelve_file_key_prefix}_points" + self.user_points = db.get(points_key, {}) + except Exception as e: + print(f"Error loading user points from shelve: {e}") + self.user_points = {} + + def _save_user_points(self) -> None: + if not self.use_queue: + self._save_points_to_json_file() + return + try: + with self._get_shelve_db() as db: + points_key = f"{self.shelve_file_key_prefix}_points" + db[points_key] = self.user_points + except Exception as e: + print(f"Error saving user points to shelve: {e}") + + def _load_points_from_json_file(self) -> Dict[str, int]: """Load user points from a JSON file""" try: with open('user_points.json', 'r') as f: @@ -43,27 +109,23 @@ class AdvancedExamples: except (FileNotFoundError, json.JSONDecodeError): return {} - def save_points(self) -> None: + def _save_points_to_json_file(self) -> None: """Save user points to a JSON file""" with open('user_points.json', 'w') as f: json.dump(self.user_points, f) - # ---- Voting System ---- - - def cmd_start_poll(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Start a new poll with options""" + def cmd_start_poll(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: self._load_poll_state() + if not args or len(args) < 3: - bot.send_message(f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]") return if self.poll_active: - bot.send_message(f"@{username}, a poll is already active. End it with !endpoll first.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, a poll is already active. End it with !endpoll first.") return - # Reset votes self.votes = {} - - # Extract question and options question: str = args[0] options: List[str] = args[1:] @@ -73,198 +135,263 @@ class AdvancedExamples: "started_by": username, "timestamp": time.time() } - self.poll_active = True - # Announce poll - bot.send_message(f"POLL STARTED: {question}") - for i, option in enumerate(options): - bot.send_message(f"Option {i+1}: {option}") - bot.send_message("Vote using !vote ") + if self.use_queue: self._save_poll_state() - def cmd_vote(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Cast a vote in the active poll""" + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"POLL STARTED: {question}") + for i, option in enumerate(options): + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Option {i+1}: {option}") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, "Vote using !vote ") + + def cmd_vote(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: self._load_poll_state() + if not self.poll_active or not self.current_poll: - bot.send_message(f"@{username}, there is no active poll.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, there is no active poll.") return if not args: - bot.send_message(f"@{username}, please specify an option number: !vote ") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please specify an option number: !vote ") return try: - vote: int = int(args[0]) - if vote < 1 or vote > len(self.current_poll["options"]): + vote_val: int = int(args[0]) + if vote_val < 1 or vote_val > len(self.current_poll["options"]): raise ValueError except ValueError: - bot.send_message(f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}") return - # Record the vote - self.votes[username] = vote - bot.send_message(f"@{username} voted for option {vote}: {self.current_poll['options'][vote-1]}") + self.votes[username] = vote_val + if self.use_queue: self._save_poll_state() - def cmd_end_poll(self, username: str, args: List[str], bot: TwitchBot) -> None: - """End the active poll and show results""" + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} voted for option {vote_val}: {self.current_poll['options'][vote_val-1]}") + + def cmd_end_poll(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: self._load_poll_state() + if not self.poll_active or not self.current_poll: - bot.send_message(f"@{username}, there is no active poll.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, there is no active poll.") return - if username != self.current_poll["started_by"] and not bot.is_admin(username): - bot.send_message(f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.") + if username != self.current_poll["started_by"] and \ + not self.is_admin_func(username, self.channel, self.admin_users): + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.") return - # Count votes results: Dict[int, int] = {} for i in range(1, len(self.current_poll["options"]) + 1): results[i] = 0 - for vote in self.votes.values(): - results[vote] += 1 + for vote_val in self.votes.values(): + results[vote_val] += 1 + + vote_str: str = "vote" if len(self.votes) == 1 else "votes" - # Find winner(s) max_votes: int = max(results.values()) if results else 0 winners: List[int] = [opt for opt, count in results.items() if count == max_votes] - # Announce results - bot.send_message(f"POLL ENDED: {self.current_poll['question']}") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"POLL ENDED: {self.current_poll['question']}") for i, option in enumerate(self.current_poll["options"], 1): - bot.send_message(f"Option {i}: {option} - {results.get(i, 0)} votes") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Option {i}: {option} - {results.get(i, 0)} {vote_str}") if max_votes > 0: if len(winners) == 1: winner_option: str = self.current_poll["options"][winners[0]-1] - bot.send_message(f"The winner is: {winner_option} with {max_votes} votes!") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"The winner is: {winner_option} with {max_votes} {vote_str}!") else: tied_options: List[str] = [self.current_poll["options"][w-1] for w in winners] - bot.send_message(f"It's a tie between: {', '.join(tied_options)} with {max_votes} votes each!") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"It's a tie between: {', '.join(tied_options)} with {max_votes} {vote_str} each!") else: - bot.send_message("No votes were cast.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, "No votes were cast.") - # Reset poll self.poll_active = False self.current_poll = None self.votes = {} + if self.use_queue: self._save_poll_state() - # ---- Timer System ---- - - def timer_callback(self, timer_id: str, message: str) -> None: - """Callback function for timers""" - self.bot.send_message(message) - del self.timer_threads[timer_id] + def timer_callback(self, timer_id: str, message: str, oauth_token: Optional[str] = None) -> None: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, message) + if timer_id in self.timer_threads: + del self.timer_threads[timer_id] - def cmd_timer(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Set a timer to send a message after X seconds""" + def cmd_timer(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, the !timer command is not available when using the queue system in this version.") + return + if len(args) < 2: - bot.send_message(f"@{username}, usage: !timer ") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !timer ") return try: seconds: int = int(args[0]) - if seconds <= 0 or seconds > 3600: # Limit to 1 hour + if seconds <= 0 or seconds > 3600: raise ValueError except ValueError: - bot.send_message(f"@{username}, please provide a valid time between 1-3600 seconds.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please provide a valid time between 1-3600 seconds.") return message: str = f"⏰ TIMER ({username}): {' '.join(args[1:])}" - bot.send_message(f"@{username}, timer set for {seconds} seconds!") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, timer set for {seconds} seconds!") - # Create a unique ID for this timer timer_id: str = f"{username}_{int(time.time())}" - - # Create and start the timer thread - timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message]) + timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message, oauth_token]) timer_thread.daemon = True timer_thread.start() - - # Store the thread self.timer_threads[timer_id] = timer_thread - # ---- Points System ---- - - def cmd_points(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Check user points""" - target_user: str = username + def cmd_points(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: self._load_user_points() + + target_user: str = username.lower() if args and args[0].startswith('@'): target_user = args[0][1:].lower() + elif args : + target_user = args[0].lower() points: int = self.user_points.get(target_user, 0) - bot.send_message(f"@{username}, {target_user} has {points} points.") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, {target_user} has {points} points.") - def cmd_give_points(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Give points to a user (admin only)""" - if not bot.is_admin(username): - bot.send_message(f"@{username}, only the channel owner or admins can give points.") + def cmd_give_points(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if not self.is_admin_func(username, self.channel, self.admin_users): + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, only the channel owner or admins can give points.") return - - if len(args) < 2: - bot.send_message(f"@{username}, usage: !give ") + + if len(args) < 2 or not args[1].isdigit(): + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !give <@user_or_user> ") return - - target_user: str = args[0] + + if self.use_queue: self._load_user_points() + + target_user: str = args[0].lower() if target_user.startswith('@'): target_user = target_user[1:] - target_user = target_user.lower() - - try: - points: int = int(args[1]) - if points <= 0: - raise ValueError - except ValueError: - bot.send_message(f"@{username}, please provide a valid positive number of points.") + + amount: int = int(args[1]) + + self.user_points[target_user] = self.user_points.get(target_user, 0) + amount + if self.use_queue: self._save_user_points() + else: + self._save_points_to_json_file() + + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} gave {amount} points to {target_user}. They now have {self.user_points[target_user]} points.") + + def cmd_play_sound(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None: + if self.use_queue: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, the !sound command is not available when using the queue system in this version.") return - - # Add points to user - self.user_points[target_user] = self.user_points.get(target_user, 0) + points - self.save_points() - - bot.send_message(f"@{target_user}, you received {points} points from {username}! You now have {self.user_points[target_user]} points.") - - # ---- External Control ---- - - def cmd_play_sound(self, username: str, args: List[str], bot: TwitchBot) -> None: - """Play a sound effect (Windows only)""" + + if os.name != 'nt': + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, sound effects are only supported on Windows.") + return + if not args: - bot.send_message(f"@{username}, usage: !sound ") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !sound ") + try: + sounds_dir = "sounds" + available_sounds = [f.split('.')[0] for f in os.listdir(sounds_dir) if f.endswith(".wav")] + if available_sounds: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Available sounds: {', '.join(available_sounds)}") + else: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, "No sound files found in 'sounds' directory.") + except FileNotFoundError: + self.send_message_func(self.channel, oauth_token or "", self.bot_username, "The 'sounds' directory does not exist.") return - sound_name: str = args[0].lower() - sound_files: Dict[str, str] = { - "applause": "applause.wav", - "drumroll": "drumroll.wav", - "fail": "fail.wav", - "victory": "victory.wav" - } + sound_name: str = args[0] + sound_file_path: str = os.path.join("sounds", f"{sound_name}.wav") - if sound_name not in sound_files: - bot.send_message(f"@{username}, available sounds: {', '.join(sound_files.keys())}") + if not os.path.exists(sound_file_path): + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, sound '{sound_name}' not found.") return - sound_path: str = os.path.join("sounds", sound_files[sound_name]) - - if not os.path.exists(sound_path): - bot.send_message(f"@{username}, sound file not found: {sound_path}") - return - - # Play the sound using Windows PowerShell try: - ps_command: str = f'powershell -c (New-Object Media.SoundPlayer "{sound_path}").PlaySync();' - subprocess.Popen(ps_command, shell=True) - bot.send_message(f"@{username} played the {sound_name} sound!") + subprocess.Popen([ + "powershell", + "-c", + f"(New-Object Media.SoundPlayer '{sound_file_path}').PlaySync();" + ], creationflags=subprocess.CREATE_NO_WINDOW) + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} played sound: {sound_name}") except Exception as e: - bot.send_message(f"Error playing sound: {e}") + self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, failed to play sound {sound_name}: {e}") + print(f"Error playing sound: {e}") +if __name__ == '__main__': + mock_admin_users = ["adminuser", "streamer"] + mock_channel = "testchannel" + mock_bot_username = "testbot" -# Example usage in main.py: -""" -from examples import AdvancedExamples + def mock_send_message(channel, oauth_token, bot_username, message): + print(f"[CHAT] To #{channel} (as {bot_username}): {message}") -def main(): - # ... set up bot ... + def mock_is_admin(username, channel, admin_list): + return username.lower() in admin_list or username.lower() == channel.lower() + + print("\n--- Testing Non-Queue Mode ---") + adv_ex_no_queue = AdvancedExamples( + send_message_func=mock_send_message, + is_admin_func=mock_is_admin, + bot_username=mock_bot_username, + channel=mock_channel, + admin_users=mock_admin_users, + use_queue=False + ) + adv_ex_no_queue.cmd_start_poll("streamer", ["Favorite Color?", "Red", "Blue", "Green"]) + adv_ex_no_queue.cmd_vote("user1", ["1"]) + adv_ex_no_queue.cmd_vote("user2", ["2"]) + adv_ex_no_queue.cmd_vote("user3", ["1"]) + adv_ex_no_queue.cmd_end_poll("streamer", []) - # Initialize advanced examples - advanced = AdvancedExamples(bot) + adv_ex_no_queue.cmd_points("user1", []) + adv_ex_no_queue.cmd_give_points("adminuser", ["user1", "100"]) + adv_ex_no_queue.cmd_points("user1", []) + adv_ex_no_queue.cmd_points("adminuser", ["@user1"]) + + print("\n\n--- Testing Queue Mode ---") + os.makedirs("data/cache", exist_ok=True) - # ... start bot ... -""" \ No newline at end of file + shelve_path = f"{SHELVE_FILE_ADVANCED_FEATURES}.db" + if os.path.exists(shelve_path): + os.remove(shelve_path) + shelve_dir_path = f"{SHELVE_FILE_ADVANCED_FEATURES}.dir" + if os.path.exists(shelve_dir_path): + import shutil + shutil.rmtree(shelve_dir_path) + + adv_ex_queue = AdvancedExamples( + send_message_func=mock_send_message, + is_admin_func=mock_is_admin, + bot_username=mock_bot_username, + channel=mock_channel, + admin_users=mock_admin_users, + use_queue=True, + shelve_file_key_prefix="test_adv_feat" + ) + + mock_oauth = "mock_oauth_token" + + adv_ex_queue.cmd_start_poll("streamer", ["Best Game?", "GameA", "GameB"], mock_oauth) + adv_ex_queue.cmd_vote("viewerA", ["1"], mock_oauth) + adv_ex_queue.cmd_vote("viewerB", ["2"], mock_oauth) + adv_ex_queue.cmd_vote("viewerC", ["2"], mock_oauth) + + adv_ex_queue_reader = AdvancedExamples( + send_message_func=mock_send_message, + is_admin_func=mock_is_admin, + bot_username=mock_bot_username, + channel=mock_channel, + admin_users=mock_admin_users, + use_queue=True, + shelve_file_key_prefix="test_adv_feat" + ) + adv_ex_queue_reader.cmd_end_poll("streamer", [], mock_oauth) + + adv_ex_queue.cmd_points("viewerA", [], mock_oauth) + adv_ex_queue.cmd_give_points("adminuser", ["viewerA", "50"], mock_oauth) + adv_ex_queue_reader.cmd_points("viewerA", [], mock_oauth) + adv_ex_queue_reader.cmd_give_points("streamer", ["viewerB", "75"], mock_oauth) + adv_ex_queue.cmd_points("viewerB", ["@viewerB"], mock_oauth) + + print(f"\nShelve file used: {SHELVE_FILE_ADVANCED_FEATURES}") \ No newline at end of file diff --git a/src/queue/server.py b/src/queue/server.py index 4636bed..5b146ae 100644 --- a/src/queue/server.py +++ b/src/queue/server.py @@ -6,8 +6,12 @@ import time import random import cloudpickle import traceback +import re # Ensure re is imported (it likely is already) +import shlex # Added for robust parsing originally, but will use regex from typing import Dict, Any, Optional, List, Callable, Union, Tuple import json +from src.features.examples import AdvancedExamples +from src.core.utils import is_admin # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') @@ -27,6 +31,9 @@ SETTINGS_KEY_KEYBOARD_INITIALIZED = "keyboard_initialized" SETTINGS_KEY_ACTIVE_GAME = "active_game_name" GAME_CONFIG_FILE_PATH = "data/config/game_controls.json" +# Shelve key for advanced command configuration +SETTINGS_KEY_ADV_CMD_CONFIG = "advanced_command_config" # New key + # Queue stats shelve keys STATS_KEY_TOTAL_ENQUEUED = "total_enqueued" STATS_KEY_TOTAL_PROCESSED = "total_processed" @@ -684,7 +691,18 @@ register_command("8ball", handle_8ball_task, "Ask the magic 8ball a question", " @huey.task() def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: - """Handler for !qstats command""" + """Handler for !qstats command (admin only)""" + if not is_admin(username, channel): + message = f"@{username}, only channel admins can view queue statistics." + send_twitch_message(channel, oauth_token, bot_username, message) + return { + 'success': False, + 'username': username, + 'command': 'qstats', + 'error': 'not_admin', + 'timestamp': time.time() + } + stats = get_queue_stats() success = send_twitch_message(channel, oauth_token, bot_username, f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending") @@ -705,7 +723,7 @@ def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token } # Register qstats command -register_command("qstats", handle_qstats_task, "Show queue statistics", "queue") +register_command("qstats", handle_qstats_task, "Show queue statistics (admin only)", "admin") def log_command(username: str, command: str, command_type: str, success: bool = True, args: List[str] = None) -> None: """ @@ -890,7 +908,6 @@ def handle_unknown_command_task(username: str, command: str, args: List[str], ch def handle_switchmode_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !switchmode command to switch between keyboard and controller modes""" # Only allow admins to switch modes - from src.core.twitch import is_admin if not is_admin(username, channel): message = f"@{username}, only channel admins can switch input modes" send_twitch_message(channel, oauth_token, bot_username, message) @@ -981,47 +998,53 @@ register_command("switchmode", handle_switchmode_task, # Register basic keyboard commands def register_keyboard_commands(): - """Register keyboard commands based on the default game configuration.""" + """Register keyboard commands based on the default game's abstract command names.""" configs = load_game_configs() default_game_name = configs.get("default_game") if not default_game_name: logger.warning("No default_game specified in game_controls.json. Cannot register default keyboard commands.") return - default_game_config = configs.get("games", {}).get(default_game_name, {}).get("keyboard", {}) - if not default_game_config: + # Get the keyboard commands section for the default game + keyboard_config = configs.get("games", {}).get(default_game_name, {}).get("keyboard", {}) + if not keyboard_config: logger.warning(f"No keyboard configuration found for default game '{default_game_name}' in game_controls.json.") return - for cmd_name, cmd_details in default_game_config.items(): - description = cmd_details.get("description", f"Perform '{cmd_name}' keyboard action (game-specific)") - register_command(cmd_name, - lambda u, a, c, o, b, cmd=cmd_name: handle_keyboard_command_task(u, a, c, o, b, cmd), + # Register each abstract command name found in the default game's keyboard config + for abstract_cmd_name, cmd_details in keyboard_config.items(): + description = cmd_details.get("description", f"Perform '{abstract_cmd_name}' keyboard action (game-specific)") + # The actual physical mapping (action_type, params) is inside cmd_details, + # but execute_game_command will use get_current_game_control_config which reads it at runtime. + register_command(abstract_cmd_name, + lambda u, a, c, o, b, cmd=abstract_cmd_name: handle_keyboard_command_task(u, a, c, o, b, cmd), description, "keyboard") - logger.info(f"Registered {len(default_game_config)} keyboard commands based on default game: {default_game_name}") + logger.info(f"Registered {len(keyboard_config)} keyboard commands (abstract names) based on default game: {default_game_name}") # Register basic controller commands def register_controller_commands(): - """Register controller commands based on the default game configuration.""" + """Register controller commands based on the default game's abstract command names.""" configs = load_game_configs() default_game_name = configs.get("default_game") if not default_game_name: logger.warning("No default_game specified in game_controls.json. Cannot register default controller commands.") return - default_game_config = configs.get("games", {}).get(default_game_name, {}).get("controller", {}) - if not default_game_config: + # Get the controller commands section for the default game + controller_config = configs.get("games", {}).get(default_game_name, {}).get("controller", {}) + if not controller_config: logger.warning(f"No controller configuration found for default game '{default_game_name}' in game_controls.json.") return - for cmd_name, cmd_details in default_game_config.items(): - description = cmd_details.get("description", f"Perform '{cmd_name}' controller action (game-specific)") - register_command(cmd_name, - lambda u, a, c, o, b, cmd=cmd_name: handle_controller_command_task(u, a, c, o, b, cmd), + # Register each abstract command name found in the default game's controller config + for abstract_cmd_name, cmd_details in controller_config.items(): + description = cmd_details.get("description", f"Perform '{abstract_cmd_name}' controller action (game-specific)") + register_command(abstract_cmd_name, + lambda u, a, c, o, b, cmd=abstract_cmd_name: handle_controller_command_task(u, a, c, o, b, cmd), description, "controller") - logger.info(f"Registered {len(default_game_config)} controller commands based on default game: {default_game_name}") + logger.info(f"Registered {len(controller_config)} controller commands (abstract names) based on default game: {default_game_name}") @huey.task() def handle_gamehelp_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: @@ -1106,7 +1129,21 @@ def process_chat_command(username: str, command: str, args: List[str], This task will be executed by the Huey consumer """ - logger.info(f"Processing command '{command}' from {username} with args: {args}") + logger.info(f"Processing command '{command}' from {username} with raw args: {args}") + + # Reconstruct the argument string from the input args list + raw_args_string = " ".join(args) + + # Parse the raw_args_string to handle quoted arguments + # This regex finds: + # 1. Quoted strings: "([^"]*)" - captures content within double quotes + # 2. Unquoted strings: (\S+) - captures sequences of non-whitespace characters + matches = re.findall(r'"([^"]*)"|(\S+)', raw_args_string) + # Convert matches to a flat list of arguments + # Each match is a tuple, e.g., ('quoted content', '') or ('', 'unquoted_word') + parsed_args = [match[0] if match[0] else match[1] for match in matches] + + logger.info(f"Parsed args for '{command}': {parsed_args}") # Record stats - this happens in the consumer process when the task is actually processed queue_stats.log_processed(f"command:{command}") @@ -1116,64 +1153,67 @@ def process_chat_command(username: str, command: str, args: List[str], if command in command_registry: cmd_info = command_registry[command] try: - # Execute the command handler + # Execute the command handler with parsed_args logger.info(f"Found handler for '{command}', executing...") - result = cmd_info.handler(username, args, channel, oauth_token, bot_username) + result = cmd_info.handler(username, parsed_args, channel, oauth_token, bot_username) # Log all commands, not just game commands # Skip logging if the command handler already logged it (check for already_logged flag) # Also skip keyboard and controller commands as they handle their own logging if result is None: # Handle case where result is None (command handler didn't return anything) - log_command(username, command, cmd_info.category, False) + log_command(username, command, cmd_info.category, False, parsed_args) elif command not in ["commandlog"] and not result.get('already_logged', False) and cmd_info.category not in ["keyboard", "controller"]: - log_command(username, command, cmd_info.category, result.get('success', True), result.get('args', [])) + # Use parsed_args for logging if result.get('args') is not available or not relevant + log_args = result.get('args', parsed_args) if isinstance(result, dict) else parsed_args + log_command(username, command, cmd_info.category, result.get('success', True), log_args) return result except Exception as e: # Log full traceback for better debugging - logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}") + logger.error(f"Error executing command {command}: {e}\\n{traceback.format_exc()}") # Log failed command if command in command_registry: - log_command(username, command, command_registry[command].category, False) + log_command(username, command, command_registry[command].category, False, parsed_args) return { 'success': False, 'error': str(e), 'username': username, 'command': command, - 'args': args, + 'args': parsed_args, # Return parsed_args 'timestamp': time.time() } else: # Unknown command - try the fallback handler try: - result = handle_unknown_command_task(username, command, args, channel, oauth_token, bot_username) - # No need to log here, as the handler now uses registered commands + result = handle_unknown_command_task(username, command, parsed_args, channel, oauth_token, bot_username) + # Logging is handled by handle_unknown_command_task or if it calls a registered command return result except Exception as e: # Log full traceback for better debugging - logger.error(f"Error handling unknown command {command}: {e}\n{traceback.format_exc()}") + logger.error(f"Error handling unknown command {command}: {e}\\n{traceback.format_exc()}") # Log failed unknown command - log_command(username, command, "unknown", False, args) + log_command(username, command, "unknown", False, parsed_args) return { 'success': False, 'error': str(e), 'username': username, 'command': command, - 'args': args, + 'args': parsed_args, # Return parsed_args 'timestamp': time.time() } except Exception as e: # Log full traceback for better debugging - logger.error(f"Unexpected error in process_chat_command: {e}\n{traceback.format_exc()}") + logger.error(f"Unexpected error in process_chat_command: {e}\\n{traceback.format_exc()}") # Log the error - log_command(username, command, "unknown", False) + log_command(username, command, "unknown", False, parsed_args) # Make sure we return a safely picklable object return { 'success': False, 'error': str(e), 'username': username, 'command': command, + 'args': parsed_args, # Return parsed_args 'timestamp': time.time() } @@ -1240,7 +1280,7 @@ def handle_ping_task(username: str, args: List[str], channel: str, oauth_token: } # Register ping command -register_command("ping", handle_ping_task, "Check if the bot is responding", "general") +#register_command("ping", handle_ping_task, "Check if the bot is responding", "admin") @huey.task() def handle_help_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: @@ -1266,7 +1306,6 @@ def handle_help_task(username: str, args: List[str], channel: str, oauth_token: f"Queue commands: {', '.join(queue_cmds)}") # Show admin commands if user is admin - from src.core.twitch import is_admin if is_admin(username, channel) and "admin" in categories: admin_cmds = [f"!{cmd}" for cmd, _ in categories["admin"]] send_twitch_message(channel, oauth_token, bot_username, @@ -1329,6 +1368,9 @@ def handle_commandlog_task(username: str, args: List[str], channel: str, oauth_t if filter_type: filtered_log = [entry for entry in filtered_log if entry['type'] == filter_type] + # Exclude the 'commandlog' command itself from the results + filtered_log = [entry for entry in filtered_log if entry['command'] != 'commandlog'] + if not filtered_log: filters = [] if filter_user: @@ -1432,7 +1474,6 @@ def handle_clearlog_task(username: str, args: List[str], channel: str, oauth_tok import shelve # Only allow admins to clear the log - from src.core.twitch import is_admin if not is_admin(username, channel): message = f"@{username}, only channel admins can clear the command log" send_twitch_message(channel, oauth_token, bot_username, message) @@ -1481,7 +1522,6 @@ def handle_allcommands_task(username: str, args: List[str], channel: str, oauth_ categories = get_commands_by_category() # Check if user is admin - from src.core.twitch import is_admin is_user_admin = is_admin(username, channel) # First message as header @@ -1527,7 +1567,6 @@ register_command("allcommands", handle_allcommands_task, @huey.task() def handle_setgame_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: """Handler for !setgame command to change the active game configuration.""" - from src.core.twitch import is_admin if not is_admin(username, channel): message = f"@{username}, only channel admins can change the active game." send_twitch_message(channel, oauth_token, bot_username, message) @@ -1587,6 +1626,214 @@ def handle_setgame_task(username: str, args: List[str], channel: str, oauth_toke register_command("setgame", handle_setgame_task, "Set the active game for control configurations (admin only). Usage: !setgame ", "admin") +# ---- ADVANCED FEATURE COMMANDS (HUEY TASKS) ---- + +def _get_advanced_cmd_config_from_shelve() -> Optional[Dict[str, Any]]: + """Load advanced command configuration from shelve.""" + import shelve + try: + with shelve.open(SHELVE_FILE) as db: + return db.get(SETTINGS_KEY_ADV_CMD_CONFIG) + except Exception as e: + logger.error(f"Error loading advanced command config from shelve: {e}\\n{traceback.format_exc()}") + return None + +@huey.task() +def handle_poll_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if not adv_cmd_config: + logger.error("Advanced commands config not found in shelve for handle_poll_task.") + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the poll command is currently not available (server configuration error).") + return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'poll'} + + adv_examples_instance = AdvancedExamples( + send_message_func=send_twitch_message, + is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])), + bot_username=adv_cmd_config["bot_username"], + channel=adv_cmd_config["channel"], + admin_users=adv_cmd_config.get("admin_users", []), + use_queue=True, + shelve_file_key_prefix="advfeat_q" # Keep queue-specific prefix for its own state + ) + try: + # Pass the oauth_token received by the task to the method + adv_examples_instance.cmd_start_poll(username, args, oauth_token) + log_command(username, "poll", "advanced", True, args) + return { + 'success': True, 'username': username, 'command': 'poll', 'timestamp': time.time(), 'already_logged': True + } + except Exception as e: + logger.error(f"Error in handle_poll_task: {e}\\n{traceback.format_exc()}") + log_command(username, "poll", "advanced", False, args) + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while trying to start the poll.") + return { + 'success': False, 'error': str(e), 'username': username, 'command': 'poll', 'timestamp': time.time(), 'already_logged': True + } + +@huey.task() +def handle_vote_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if not adv_cmd_config: + logger.error("Advanced commands config not found in shelve for handle_vote_task.") + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the vote command is currently not available (server configuration error).") + return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'vote'} + + adv_examples_instance = AdvancedExamples( + send_message_func=send_twitch_message, + is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])), + bot_username=adv_cmd_config["bot_username"], + channel=adv_cmd_config["channel"], + admin_users=adv_cmd_config.get("admin_users", []), + use_queue=True, + shelve_file_key_prefix="advfeat_q" + ) + try: + adv_examples_instance.cmd_vote(username, args, oauth_token) + log_command(username, "vote", "advanced", True, args) + return { + 'success': True, 'username': username, 'command': 'vote', 'timestamp': time.time(), 'already_logged': True + } + except Exception as e: + logger.error(f"Error in handle_vote_task: {e}\\n{traceback.format_exc()}") + log_command(username, "vote", "advanced", False, args) + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while casting your vote.") + return { + 'success': False, 'error': str(e), 'username': username, 'command': 'vote', 'timestamp': time.time(), 'already_logged': True + } + +@huey.task() +def handle_endpoll_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if not adv_cmd_config: + logger.error("Advanced commands config not found in shelve for handle_endpoll_task.") + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the endpoll command is currently not available (server configuration error).") + return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'endpoll'} + + adv_examples_instance = AdvancedExamples( + send_message_func=send_twitch_message, + is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])), + bot_username=adv_cmd_config["bot_username"], + channel=adv_cmd_config["channel"], + admin_users=adv_cmd_config.get("admin_users", []), + use_queue=True, + shelve_file_key_prefix="advfeat_q" + ) + try: + adv_examples_instance.cmd_end_poll(username, args, oauth_token) + log_command(username, "endpoll", "advanced", True, args) + return { + 'success': True, 'username': username, 'command': 'endpoll', 'timestamp': time.time(), 'already_logged': True + } + except Exception as e: + logger.error(f"Error in handle_endpoll_task: {e}\\n{traceback.format_exc()}") + log_command(username, "endpoll", "advanced", False, args) + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while trying to end the poll.") + return { + 'success': False, 'error': str(e), 'username': username, 'command': 'endpoll', 'timestamp': time.time(), 'already_logged': True + } + +@huey.task() +def handle_points_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if not adv_cmd_config: + logger.error("Advanced commands config not found in shelve for handle_points_task.") + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the points command is currently not available (server configuration error).") + return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'points'} + + adv_examples_instance = AdvancedExamples( + send_message_func=send_twitch_message, + is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])), + bot_username=adv_cmd_config["bot_username"], + channel=adv_cmd_config["channel"], + admin_users=adv_cmd_config.get("admin_users", []), + use_queue=True, + shelve_file_key_prefix="advfeat_q" + ) + try: + adv_examples_instance.cmd_points(username, args, oauth_token) + log_command(username, "points", "advanced", True, args) + return { + 'success': True, 'username': username, 'command': 'points', 'timestamp': time.time(), 'already_logged': True + } + except Exception as e: + logger.error(f"Error in handle_points_task: {e}\\n{traceback.format_exc()}") + log_command(username, "points", "advanced", False, args) + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while fetching points.") + return { + 'success': False, 'error': str(e), 'username': username, 'command': 'points', 'timestamp': time.time(), 'already_logged': True + } + +@huey.task() +def handle_givepoints_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]: + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if not adv_cmd_config: + logger.error("Advanced commands config not found in shelve for handle_givepoints_task.") + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the givepoints command is currently not available (server configuration error).") + return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'givepoints'} + + adv_examples_instance = AdvancedExamples( + send_message_func=send_twitch_message, + is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])), + bot_username=adv_cmd_config["bot_username"], + channel=adv_cmd_config["channel"], + admin_users=adv_cmd_config.get("admin_users", []), + use_queue=True, + shelve_file_key_prefix="advfeat_q" + ) + try: + adv_examples_instance.cmd_give_points(username, args, oauth_token) + log_command(username, "givepoints", "admin", True, args) + return { + 'success': True, 'username': username, 'command': 'givepoints', 'timestamp': time.time(), 'already_logged': True + } + except Exception as e: + logger.error(f"Error in handle_givepoints_task: {e}\\n{traceback.format_exc()}") + log_command(username, "givepoints", "admin", False, args) + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while giving points.") + return { + 'success': False, 'error': str(e), 'username': username, 'command': 'givepoints', 'timestamp': time.time(), 'already_logged': True + } + +def setup_advanced_commands_config(bot_username_main: str, channel_main: str, admin_users_main: List[str], oauth_token_main: str) -> None: + """Sets up the advanced command configuration in shelve for the consumer to load.""" + import shelve + config_to_save = { + "bot_username": bot_username_main, + "channel": channel_main, + "admin_users": admin_users_main if admin_users_main else [], + "oauth_token": oauth_token_main # This might be stale, tasks get fresh tokens + } + try: + with shelve.open(SHELVE_FILE) as db: + db[SETTINGS_KEY_ADV_CMD_CONFIG] = config_to_save + logger.info(f"Saved advanced command configuration to shelve: {config_to_save}") + except Exception as e: + logger.error(f"Failed to save advanced command config to shelve: {e}\\n{traceback.format_exc()}") + +def _load_and_register_advanced_commands() -> None: + """Loads config from shelve and registers advanced commands. Called at module import by consumer.""" + adv_cmd_config = _get_advanced_cmd_config_from_shelve() + if adv_cmd_config: + logger.info(f"Advanced command config loaded by consumer: {adv_cmd_config}") + + # Check if all necessary keys are present, although tasks will re-check + if not all(k in adv_cmd_config for k in ["bot_username", "channel"]): + logger.error("Consumer loaded incomplete advanced command config. Advanced commands may fail.") + return + + register_command("poll", handle_poll_task, "Start a poll. Usage: !poll \\\"Question\\\" \\\"Opt1\\\" \\\"Opt2\\\"...", "advanced") + register_command("vote", handle_vote_task, "Vote in an active poll. Usage: !vote ", "advanced") + register_command("endpoll", handle_endpoll_task, "End the current poll and show results.", "advanced") + register_command("points", handle_points_task, "Check your or another user's points. Usage: !points [@user]", "advanced") + register_command("givepoints", handle_givepoints_task, "Give points to a user (admin). Usage: !givepoints <@user> ", "admin") + + logger.info("Advanced commands (poll, vote, etc.) registered by consumer process.") + else: + logger.info("Consumer: No advanced command configuration found in shelve. Advanced commands will not be available via queue.") + +# Call this function at module level to register commands when consumer starts +_load_and_register_advanced_commands() + # Function to start the consumer directly (alternative to command line) def start_consumer() -> None: """