Refactor: Implement generalized command argument parsing with optional quotes
This commit introduces a significant refactoring to how command arguments are parsed. The primary goal is to allow users to optionally enclose arguments containing spaces in double quotes for all commands, providing a more flexible and intuitive command input experience.
Key changes include:
1. **Centralized Argument Parsing (`src/queue/server.py`):**
* The `process_chat_command` function in `src/queue/server.py` now handles the primary parsing of command arguments.
* Incoming raw arguments (previously a list of strings split by spaces) are joined into a single string.
* This string is then processed using the regular expression `re.findall(r'"([^"]*)"|(\S+)', ...)` to accurately identify and separate arguments. This regex correctly distinguishes between:
* Substrings enclosed in double quotes (e.g., `"my poll question"`), which are treated as single arguments.
* Sequences of non-whitespace characters (e.g., `1`, `myoption`), also treated as single arguments.
* This ensures that all command handlers receive a list of arguments (`parsed_args`) where multi-word arguments (if originally quoted) are correctly preserved as single elements.
* A correction was made to the regex pattern: an initial version mistakenly used `(\\\\S+)` (matching a literal backslash then 'S') instead of the intended `(\\S+)` (matching any non-whitespace sequence) for unquoted arguments. This fix ensures that simple unquoted arguments like numbers (e.g., for the `!vote` command) are parsed correctly.
* The `re` module import was ensured.
2. **Simplification of Poll Command (`src/features/examples.py`):**
* The `cmd_start_poll` method in `src/features/examples.py` has been simplified. Its specific regex-based logic for parsing quoted poll questions and options was removed.
* This method now relies on the generalized, upstream parsing performed by `process_chat_command` in `src/queue/server.py`. It expects to receive a list of arguments that have already been correctly parsed.
* Test calls for `cmd_start_poll` within the `if __name__ == '__main__':` block were updated to align with this change, demonstrating how arguments (like poll questions and options) are now passed as a simple list of strings.
These changes collectively improve the robustness and usability of the command system by standardizing argument parsing and allowing for more complex argument values.
This commit is contained in:
parent
c43cb5e0ce
commit
c17f382c60
5 changed files with 665 additions and 204 deletions
75
main.py
75
main.py
|
|
@ -205,11 +205,12 @@ def main() -> None:
|
|||
client_secret=client_secret,
|
||||
channel=channel,
|
||||
use_queue=args.use_queue,
|
||||
command_prefix=config.get("command_prefix", "!"),
|
||||
token_cache_file=args.token_cache,
|
||||
admin_users=admin_users,
|
||||
force_new_token=args.force_new_token,
|
||||
access_token=None, # Explicitly set to None to avoid using env vars
|
||||
refresh_token=None # Explicitly set to None to avoid using env vars
|
||||
refresh_token=None # Explicitly set to None
|
||||
)
|
||||
|
||||
# Register basic commands
|
||||
|
|
@ -246,17 +247,71 @@ def main() -> None:
|
|||
if args.advanced:
|
||||
try:
|
||||
from src.features.examples import AdvancedExamples
|
||||
import traceback # For more detailed error logging
|
||||
print("Enabling advanced features...")
|
||||
advanced: AdvancedExamples = AdvancedExamples(bot)
|
||||
print("Advanced features enabled: polls, voting, timers, points system")
|
||||
if os.name == 'nt':
|
||||
print("Sound effects enabled (Windows only)")
|
||||
# Create sounds directory if it doesn't exist
|
||||
if not os.path.exists("sounds"):
|
||||
os.makedirs("sounds")
|
||||
print("Created 'sounds' directory. Add .wav files to use with !sound command")
|
||||
|
||||
if args.use_queue:
|
||||
from src.queue.server import setup_advanced_commands_config
|
||||
setup_advanced_commands_config(
|
||||
bot_username_main=bot.username,
|
||||
channel_main=channel,
|
||||
admin_users_main=admin_users if admin_users else [],
|
||||
oauth_token_main=bot.access_token if bot.access_token else ""
|
||||
)
|
||||
print("Advanced features configuration (poll, points, etc.) saved for queue processing.")
|
||||
print("The queue consumer will register these commands: !poll, !vote, !endpoll, !points, !givepoints.")
|
||||
|
||||
else: # Not using queue, direct registration with bot instance
|
||||
|
||||
# Wrapper for send_message_func for AdvancedExamples
|
||||
# Expected: (channel, oauth_token, bot_username, message)
|
||||
def non_queue_send_message_wrapper(ch_ignored: str, oauth_ignored: Optional[str], bot_user_ignored: str, message: str):
|
||||
bot.send_message(message) # bot.send_message handles its own channel context
|
||||
|
||||
# Wrapper for is_admin_func for AdvancedExamples
|
||||
# Expected: (username, channel, admin_list)
|
||||
def non_queue_is_admin_wrapper(usr: str, chan_ignored: str, admin_list_ignored: List[str]) -> bool:
|
||||
return bot.is_admin(usr) # bot.is_admin uses its own channel and admin_list
|
||||
|
||||
advanced_instance = AdvancedExamples(
|
||||
send_message_func=non_queue_send_message_wrapper,
|
||||
is_admin_func=non_queue_is_admin_wrapper,
|
||||
bot_username=bot.username,
|
||||
channel=channel, # Pass the specific channel to AdvancedExamples
|
||||
admin_users=admin_users if admin_users else [], # Pass specific admin_users
|
||||
use_queue=False
|
||||
)
|
||||
|
||||
# Register commands with the bot instance
|
||||
# Lambdas pass username, args, and the bot's current access_token
|
||||
bot.register_command("poll", lambda u, a, b_inst: advanced_instance.cmd_start_poll(u, a, b_inst.access_token))
|
||||
bot.register_command("vote", lambda u, a, b_inst: advanced_instance.cmd_vote(u, a, b_inst.access_token))
|
||||
bot.register_command("endpoll", lambda u, a, b_inst: advanced_instance.cmd_end_poll(u, a, b_inst.access_token))
|
||||
bot.register_command("points", lambda u, a, b_inst: advanced_instance.cmd_points(u, a, b_inst.access_token))
|
||||
# Using "givepoints" for consistency with queue registration
|
||||
bot.register_command("givepoints", lambda u, a, b_inst: advanced_instance.cmd_give_points(u, a, b_inst.access_token))
|
||||
|
||||
print("Advanced features enabled directly: polls, voting, points system.")
|
||||
print("Use !poll, !vote, !endpoll, !points, !givepoints.")
|
||||
|
||||
# Timer and Sound commands are handled by AdvancedExamples based on its use_queue flag
|
||||
# For non-queue mode, AdvancedExamples will allow them.
|
||||
bot.register_command("timer", lambda u, a, b_inst: advanced_instance.cmd_timer(u,a,b_inst.access_token))
|
||||
print("Timer command !timer enabled.")
|
||||
if os.name == 'nt':
|
||||
bot.register_command("sound", lambda u, a, b_inst: advanced_instance.cmd_play_sound(u, a, b_inst.access_token))
|
||||
print("Sound effects !sound enabled (Windows only).")
|
||||
# Create sounds directory if it doesn't exist (moved from previous unconditional spot)
|
||||
if not os.path.exists("sounds"):
|
||||
os.makedirs("sounds")
|
||||
print("Created 'sounds' directory. Add .wav files to use with !sound command")
|
||||
|
||||
except ImportError as e:
|
||||
print(f"Failed to import advanced features: {e}")
|
||||
print(f"Failed to import or configure advanced features: {e}")
|
||||
print(traceback.format_exc()) # Print full traceback
|
||||
except Exception as e:
|
||||
print(f"Error setting up advanced features: {e}")
|
||||
print(traceback.format_exc()) # Print full traceback
|
||||
|
||||
# Enable game control if requested
|
||||
if args.game_control:
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ class TwitchBot:
|
|||
def __init__(self, username: str, client_id: str = None, client_secret: str = None,
|
||||
access_token: str = None, refresh_token: str = None,
|
||||
channel: str = None, use_queue: bool = False,
|
||||
command_prefix: str = "!",
|
||||
token_cache_file: str = "data/cache/token_cache.json",
|
||||
admin_users: List[str] = None,
|
||||
force_new_token: bool = False) -> None:
|
||||
|
|
@ -32,6 +33,7 @@ class TwitchBot:
|
|||
self.client_id: Optional[str] = client_id or os.environ.get("TWITCH_CLIENT_ID")
|
||||
self.client_secret: Optional[str] = client_secret or os.environ.get("TWITCH_CLIENT_SECRET")
|
||||
self.channel: str = channel.lower() if channel else ""
|
||||
self.command_prefix: str = command_prefix
|
||||
self.server: str = 'irc.chat.twitch.tv'
|
||||
self.port: int = 6667
|
||||
self.socket: socket.socket = socket.socket()
|
||||
|
|
@ -243,23 +245,26 @@ class TwitchBot:
|
|||
username: str = message_data['username']
|
||||
|
||||
# If this is a regular message (not a command), handle it
|
||||
if not content.startswith('!'):
|
||||
if not content.startswith(self.command_prefix):
|
||||
if self.use_queue:
|
||||
# If using queue, send the message to the queue
|
||||
enqueue_message(username, content)
|
||||
return
|
||||
|
||||
# At this point, we know this is a command (starts with !)
|
||||
parts: List[str] = content.split()
|
||||
command: str = parts[0][1:].lower() # Remove ! and convert to lowercase
|
||||
args: List[str] = parts[1:] if len(parts) > 1 else []
|
||||
# Make sure the content is longer than the prefix itself before trying to split
|
||||
if len(content) <= len(self.command_prefix):
|
||||
return # Not a valid command format
|
||||
|
||||
command_parts: List[str] = content[len(self.command_prefix):].split()
|
||||
command_name: str = command_parts[0].lower()
|
||||
args: List[str] = command_parts[1:]
|
||||
|
||||
# Store the last executed command
|
||||
self.last_command = command
|
||||
self.last_command = command_name
|
||||
|
||||
# Log if this is a controller command
|
||||
if command in self.commands:
|
||||
logger.info(f"Recognized command: !{command}")
|
||||
if command_name in self.commands:
|
||||
logger.info(f"Recognized command: {self.command_prefix}{command_name}")
|
||||
|
||||
# If using queue, just enqueue the command without any processing
|
||||
if self.use_queue:
|
||||
|
|
@ -269,7 +274,7 @@ class TwitchBot:
|
|||
|
||||
enqueue_command(
|
||||
username,
|
||||
command,
|
||||
command_name,
|
||||
args,
|
||||
channel=self.channel,
|
||||
oauth_token=self.oauth_token,
|
||||
|
|
@ -278,11 +283,11 @@ class TwitchBot:
|
|||
return
|
||||
|
||||
# Direct execution (no queue)
|
||||
if command in self.commands:
|
||||
if command_name in self.commands:
|
||||
try:
|
||||
self.commands[command](username, args, self)
|
||||
self.commands[command_name](username, args, self)
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}")
|
||||
logger.error(f"Error executing command {self.command_prefix}{command_name}: {e}\n{traceback.format_exc()}")
|
||||
|
||||
def start(self, use_queue: bool = False) -> None:
|
||||
"""Start the bot and listen for messages"""
|
||||
|
|
|
|||
27
src/core/utils.py
Normal file
27
src/core/utils.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
from typing import List
|
||||
# Add a standalone version of is_admin for use by other modules
|
||||
def is_admin(username: str, channel: str = None, admin_users: List[str] = None) -> bool:
|
||||
"""
|
||||
Check if a user has admin privileges
|
||||
|
||||
Args:
|
||||
username: The username to check
|
||||
channel: The channel name (channel owner is always an admin)
|
||||
admin_users: Optional list of additional admin users
|
||||
|
||||
Returns:
|
||||
bool: True if the user is an admin, False otherwise
|
||||
"""
|
||||
# Normalize usernames to lowercase
|
||||
username = username.lower()
|
||||
|
||||
# Create admin list - channel owner is always an admin
|
||||
admins = []
|
||||
if channel:
|
||||
admins.append(channel.lower())
|
||||
|
||||
# Add additional admins if provided
|
||||
if admin_users:
|
||||
admins.extend([user.lower() for user in admin_users])
|
||||
|
||||
return username in admins
|
||||
|
|
@ -3,8 +3,11 @@ import threading
|
|||
import subprocess
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast
|
||||
from src.core.twitch import TwitchBot
|
||||
import shelve
|
||||
import re
|
||||
from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast, Callable
|
||||
|
||||
SHELVE_FILE_ADVANCED_FEATURES = "data/cache/advanced_features_state"
|
||||
|
||||
class AdvancedExamples:
|
||||
"""
|
||||
|
|
@ -15,27 +18,90 @@ class AdvancedExamples:
|
|||
- User points system
|
||||
"""
|
||||
|
||||
def __init__(self, bot: TwitchBot) -> None:
|
||||
self.bot: TwitchBot = bot
|
||||
def __init__(self,
|
||||
send_message_func: Callable[[str, str, str, str], None],
|
||||
is_admin_func: Callable[[str, str, List[str]], bool],
|
||||
bot_username: str,
|
||||
channel: str,
|
||||
admin_users: List[str],
|
||||
use_queue: bool = False,
|
||||
shelve_file_key_prefix: str = "advfeat") -> None:
|
||||
self.send_message_func = send_message_func
|
||||
self.is_admin_func = is_admin_func
|
||||
self.bot_username = bot_username
|
||||
self.channel = channel
|
||||
self.admin_users = admin_users
|
||||
self.use_queue = use_queue
|
||||
self.shelve_file_key_prefix = shelve_file_key_prefix
|
||||
|
||||
self.votes: Dict[str, int] = {}
|
||||
self.current_poll: Optional[Dict[str, Any]] = None
|
||||
self.poll_active: bool = False
|
||||
self.timer_threads: Dict[str, threading.Timer] = {}
|
||||
self.user_points: Dict[str, int] = self.load_points()
|
||||
|
||||
# Register commands
|
||||
self.bot.register_command("poll", self.cmd_start_poll)
|
||||
self.bot.register_command("vote", self.cmd_vote)
|
||||
self.bot.register_command("endpoll", self.cmd_end_poll)
|
||||
self.bot.register_command("timer", self.cmd_timer)
|
||||
self.bot.register_command("points", self.cmd_points)
|
||||
self.bot.register_command("give", self.cmd_give_points)
|
||||
|
||||
# For external application control (example)
|
||||
if os.name == 'nt': # Windows
|
||||
self.bot.register_command("sound", self.cmd_play_sound)
|
||||
|
||||
def load_points(self) -> Dict[str, int]:
|
||||
self.user_points: Dict[str, int] = {}
|
||||
|
||||
if self.use_queue:
|
||||
self._load_poll_state()
|
||||
self._load_user_points()
|
||||
else:
|
||||
self.user_points = self._load_points_from_json_file()
|
||||
|
||||
def _get_shelve_db(self):
|
||||
return shelve.open(SHELVE_FILE_ADVANCED_FEATURES)
|
||||
|
||||
def _load_poll_state(self) -> None:
|
||||
if not self.use_queue:
|
||||
return
|
||||
try:
|
||||
with self._get_shelve_db() as db:
|
||||
poll_state_key = f"{self.shelve_file_key_prefix}_poll"
|
||||
state = db.get(poll_state_key, {})
|
||||
self.current_poll = state.get("current_poll")
|
||||
self.poll_active = state.get("poll_active", False)
|
||||
self.votes = state.get("votes", {})
|
||||
except Exception as e:
|
||||
print(f"Error loading poll state from shelve: {e}")
|
||||
self.current_poll = None
|
||||
self.poll_active = False
|
||||
self.votes = {}
|
||||
|
||||
def _save_poll_state(self) -> None:
|
||||
if not self.use_queue:
|
||||
return
|
||||
try:
|
||||
with self._get_shelve_db() as db:
|
||||
poll_state_key = f"{self.shelve_file_key_prefix}_poll"
|
||||
db[poll_state_key] = {
|
||||
"current_poll": self.current_poll,
|
||||
"poll_active": self.poll_active,
|
||||
"votes": self.votes
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Error saving poll state to shelve: {e}")
|
||||
|
||||
def _load_user_points(self) -> None:
|
||||
if not self.use_queue:
|
||||
return
|
||||
try:
|
||||
with self._get_shelve_db() as db:
|
||||
points_key = f"{self.shelve_file_key_prefix}_points"
|
||||
self.user_points = db.get(points_key, {})
|
||||
except Exception as e:
|
||||
print(f"Error loading user points from shelve: {e}")
|
||||
self.user_points = {}
|
||||
|
||||
def _save_user_points(self) -> None:
|
||||
if not self.use_queue:
|
||||
self._save_points_to_json_file()
|
||||
return
|
||||
try:
|
||||
with self._get_shelve_db() as db:
|
||||
points_key = f"{self.shelve_file_key_prefix}_points"
|
||||
db[points_key] = self.user_points
|
||||
except Exception as e:
|
||||
print(f"Error saving user points to shelve: {e}")
|
||||
|
||||
def _load_points_from_json_file(self) -> Dict[str, int]:
|
||||
"""Load user points from a JSON file"""
|
||||
try:
|
||||
with open('user_points.json', 'r') as f:
|
||||
|
|
@ -43,27 +109,23 @@ class AdvancedExamples:
|
|||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return {}
|
||||
|
||||
def save_points(self) -> None:
|
||||
def _save_points_to_json_file(self) -> None:
|
||||
"""Save user points to a JSON file"""
|
||||
with open('user_points.json', 'w') as f:
|
||||
json.dump(self.user_points, f)
|
||||
|
||||
# ---- Voting System ----
|
||||
|
||||
def cmd_start_poll(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Start a new poll with options"""
|
||||
def cmd_start_poll(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue: self._load_poll_state()
|
||||
|
||||
if not args or len(args) < 3:
|
||||
bot.send_message(f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]")
|
||||
return
|
||||
|
||||
if self.poll_active:
|
||||
bot.send_message(f"@{username}, a poll is already active. End it with !endpoll first.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, a poll is already active. End it with !endpoll first.")
|
||||
return
|
||||
|
||||
# Reset votes
|
||||
self.votes = {}
|
||||
|
||||
# Extract question and options
|
||||
question: str = args[0]
|
||||
options: List[str] = args[1:]
|
||||
|
||||
|
|
@ -73,198 +135,263 @@ class AdvancedExamples:
|
|||
"started_by": username,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
|
||||
self.poll_active = True
|
||||
|
||||
# Announce poll
|
||||
bot.send_message(f"POLL STARTED: {question}")
|
||||
for i, option in enumerate(options):
|
||||
bot.send_message(f"Option {i+1}: {option}")
|
||||
bot.send_message("Vote using !vote <number>")
|
||||
if self.use_queue: self._save_poll_state()
|
||||
|
||||
def cmd_vote(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Cast a vote in the active poll"""
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"POLL STARTED: {question}")
|
||||
for i, option in enumerate(options):
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Option {i+1}: {option}")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, "Vote using !vote <number>")
|
||||
|
||||
def cmd_vote(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue: self._load_poll_state()
|
||||
|
||||
if not self.poll_active or not self.current_poll:
|
||||
bot.send_message(f"@{username}, there is no active poll.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, there is no active poll.")
|
||||
return
|
||||
|
||||
if not args:
|
||||
bot.send_message(f"@{username}, please specify an option number: !vote <number>")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please specify an option number: !vote <number>")
|
||||
return
|
||||
|
||||
try:
|
||||
vote: int = int(args[0])
|
||||
if vote < 1 or vote > len(self.current_poll["options"]):
|
||||
vote_val: int = int(args[0])
|
||||
if vote_val < 1 or vote_val > len(self.current_poll["options"]):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
bot.send_message(f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}")
|
||||
return
|
||||
|
||||
# Record the vote
|
||||
self.votes[username] = vote
|
||||
bot.send_message(f"@{username} voted for option {vote}: {self.current_poll['options'][vote-1]}")
|
||||
self.votes[username] = vote_val
|
||||
if self.use_queue: self._save_poll_state()
|
||||
|
||||
def cmd_end_poll(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""End the active poll and show results"""
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} voted for option {vote_val}: {self.current_poll['options'][vote_val-1]}")
|
||||
|
||||
def cmd_end_poll(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue: self._load_poll_state()
|
||||
|
||||
if not self.poll_active or not self.current_poll:
|
||||
bot.send_message(f"@{username}, there is no active poll.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, there is no active poll.")
|
||||
return
|
||||
|
||||
if username != self.current_poll["started_by"] and not bot.is_admin(username):
|
||||
bot.send_message(f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.")
|
||||
if username != self.current_poll["started_by"] and \
|
||||
not self.is_admin_func(username, self.channel, self.admin_users):
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.")
|
||||
return
|
||||
|
||||
# Count votes
|
||||
results: Dict[int, int] = {}
|
||||
for i in range(1, len(self.current_poll["options"]) + 1):
|
||||
results[i] = 0
|
||||
|
||||
for vote in self.votes.values():
|
||||
results[vote] += 1
|
||||
for vote_val in self.votes.values():
|
||||
results[vote_val] += 1
|
||||
|
||||
vote_str: str = "vote" if len(self.votes) == 1 else "votes"
|
||||
|
||||
# Find winner(s)
|
||||
max_votes: int = max(results.values()) if results else 0
|
||||
winners: List[int] = [opt for opt, count in results.items() if count == max_votes]
|
||||
|
||||
# Announce results
|
||||
bot.send_message(f"POLL ENDED: {self.current_poll['question']}")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"POLL ENDED: {self.current_poll['question']}")
|
||||
for i, option in enumerate(self.current_poll["options"], 1):
|
||||
bot.send_message(f"Option {i}: {option} - {results.get(i, 0)} votes")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Option {i}: {option} - {results.get(i, 0)} {vote_str}")
|
||||
|
||||
if max_votes > 0:
|
||||
if len(winners) == 1:
|
||||
winner_option: str = self.current_poll["options"][winners[0]-1]
|
||||
bot.send_message(f"The winner is: {winner_option} with {max_votes} votes!")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"The winner is: {winner_option} with {max_votes} {vote_str}!")
|
||||
else:
|
||||
tied_options: List[str] = [self.current_poll["options"][w-1] for w in winners]
|
||||
bot.send_message(f"It's a tie between: {', '.join(tied_options)} with {max_votes} votes each!")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"It's a tie between: {', '.join(tied_options)} with {max_votes} {vote_str} each!")
|
||||
else:
|
||||
bot.send_message("No votes were cast.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, "No votes were cast.")
|
||||
|
||||
# Reset poll
|
||||
self.poll_active = False
|
||||
self.current_poll = None
|
||||
self.votes = {}
|
||||
if self.use_queue: self._save_poll_state()
|
||||
|
||||
# ---- Timer System ----
|
||||
|
||||
def timer_callback(self, timer_id: str, message: str) -> None:
|
||||
"""Callback function for timers"""
|
||||
self.bot.send_message(message)
|
||||
del self.timer_threads[timer_id]
|
||||
def timer_callback(self, timer_id: str, message: str, oauth_token: Optional[str] = None) -> None:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, message)
|
||||
if timer_id in self.timer_threads:
|
||||
del self.timer_threads[timer_id]
|
||||
|
||||
def cmd_timer(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Set a timer to send a message after X seconds"""
|
||||
def cmd_timer(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, the !timer command is not available when using the queue system in this version.")
|
||||
return
|
||||
|
||||
if len(args) < 2:
|
||||
bot.send_message(f"@{username}, usage: !timer <seconds> <message>")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !timer <seconds> <message>")
|
||||
return
|
||||
|
||||
try:
|
||||
seconds: int = int(args[0])
|
||||
if seconds <= 0 or seconds > 3600: # Limit to 1 hour
|
||||
if seconds <= 0 or seconds > 3600:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
bot.send_message(f"@{username}, please provide a valid time between 1-3600 seconds.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, please provide a valid time between 1-3600 seconds.")
|
||||
return
|
||||
|
||||
message: str = f"⏰ TIMER ({username}): {' '.join(args[1:])}"
|
||||
bot.send_message(f"@{username}, timer set for {seconds} seconds!")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, timer set for {seconds} seconds!")
|
||||
|
||||
# Create a unique ID for this timer
|
||||
timer_id: str = f"{username}_{int(time.time())}"
|
||||
|
||||
# Create and start the timer thread
|
||||
timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message])
|
||||
timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message, oauth_token])
|
||||
timer_thread.daemon = True
|
||||
timer_thread.start()
|
||||
|
||||
# Store the thread
|
||||
self.timer_threads[timer_id] = timer_thread
|
||||
|
||||
# ---- Points System ----
|
||||
|
||||
def cmd_points(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Check user points"""
|
||||
target_user: str = username
|
||||
def cmd_points(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue: self._load_user_points()
|
||||
|
||||
target_user: str = username.lower()
|
||||
if args and args[0].startswith('@'):
|
||||
target_user = args[0][1:].lower()
|
||||
elif args :
|
||||
target_user = args[0].lower()
|
||||
|
||||
points: int = self.user_points.get(target_user, 0)
|
||||
bot.send_message(f"@{username}, {target_user} has {points} points.")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, {target_user} has {points} points.")
|
||||
|
||||
def cmd_give_points(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Give points to a user (admin only)"""
|
||||
if not bot.is_admin(username):
|
||||
bot.send_message(f"@{username}, only the channel owner or admins can give points.")
|
||||
def cmd_give_points(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if not self.is_admin_func(username, self.channel, self.admin_users):
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, only the channel owner or admins can give points.")
|
||||
return
|
||||
|
||||
if len(args) < 2:
|
||||
bot.send_message(f"@{username}, usage: !give <user> <points>")
|
||||
|
||||
if len(args) < 2 or not args[1].isdigit():
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !give <@user_or_user> <points_amount>")
|
||||
return
|
||||
|
||||
target_user: str = args[0]
|
||||
|
||||
if self.use_queue: self._load_user_points()
|
||||
|
||||
target_user: str = args[0].lower()
|
||||
if target_user.startswith('@'):
|
||||
target_user = target_user[1:]
|
||||
target_user = target_user.lower()
|
||||
|
||||
try:
|
||||
points: int = int(args[1])
|
||||
if points <= 0:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
bot.send_message(f"@{username}, please provide a valid positive number of points.")
|
||||
|
||||
amount: int = int(args[1])
|
||||
|
||||
self.user_points[target_user] = self.user_points.get(target_user, 0) + amount
|
||||
if self.use_queue: self._save_user_points()
|
||||
else:
|
||||
self._save_points_to_json_file()
|
||||
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} gave {amount} points to {target_user}. They now have {self.user_points[target_user]} points.")
|
||||
|
||||
def cmd_play_sound(self, username: str, args: List[str], oauth_token: Optional[str] = None) -> None:
|
||||
if self.use_queue:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, the !sound command is not available when using the queue system in this version.")
|
||||
return
|
||||
|
||||
# Add points to user
|
||||
self.user_points[target_user] = self.user_points.get(target_user, 0) + points
|
||||
self.save_points()
|
||||
|
||||
bot.send_message(f"@{target_user}, you received {points} points from {username}! You now have {self.user_points[target_user]} points.")
|
||||
|
||||
# ---- External Control ----
|
||||
|
||||
def cmd_play_sound(self, username: str, args: List[str], bot: TwitchBot) -> None:
|
||||
"""Play a sound effect (Windows only)"""
|
||||
|
||||
if os.name != 'nt':
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, sound effects are only supported on Windows.")
|
||||
return
|
||||
|
||||
if not args:
|
||||
bot.send_message(f"@{username}, usage: !sound <sound_name>")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, usage: !sound <sound_name_without_extension>")
|
||||
try:
|
||||
sounds_dir = "sounds"
|
||||
available_sounds = [f.split('.')[0] for f in os.listdir(sounds_dir) if f.endswith(".wav")]
|
||||
if available_sounds:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"Available sounds: {', '.join(available_sounds)}")
|
||||
else:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, "No sound files found in 'sounds' directory.")
|
||||
except FileNotFoundError:
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, "The 'sounds' directory does not exist.")
|
||||
return
|
||||
|
||||
sound_name: str = args[0].lower()
|
||||
sound_files: Dict[str, str] = {
|
||||
"applause": "applause.wav",
|
||||
"drumroll": "drumroll.wav",
|
||||
"fail": "fail.wav",
|
||||
"victory": "victory.wav"
|
||||
}
|
||||
sound_name: str = args[0]
|
||||
sound_file_path: str = os.path.join("sounds", f"{sound_name}.wav")
|
||||
|
||||
if sound_name not in sound_files:
|
||||
bot.send_message(f"@{username}, available sounds: {', '.join(sound_files.keys())}")
|
||||
if not os.path.exists(sound_file_path):
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, sound '{sound_name}' not found.")
|
||||
return
|
||||
|
||||
sound_path: str = os.path.join("sounds", sound_files[sound_name])
|
||||
|
||||
if not os.path.exists(sound_path):
|
||||
bot.send_message(f"@{username}, sound file not found: {sound_path}")
|
||||
return
|
||||
|
||||
# Play the sound using Windows PowerShell
|
||||
try:
|
||||
ps_command: str = f'powershell -c (New-Object Media.SoundPlayer "{sound_path}").PlaySync();'
|
||||
subprocess.Popen(ps_command, shell=True)
|
||||
bot.send_message(f"@{username} played the {sound_name} sound!")
|
||||
subprocess.Popen([
|
||||
"powershell",
|
||||
"-c",
|
||||
f"(New-Object Media.SoundPlayer '{sound_file_path}').PlaySync();"
|
||||
], creationflags=subprocess.CREATE_NO_WINDOW)
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username} played sound: {sound_name}")
|
||||
except Exception as e:
|
||||
bot.send_message(f"Error playing sound: {e}")
|
||||
self.send_message_func(self.channel, oauth_token or "", self.bot_username, f"@{username}, failed to play sound {sound_name}: {e}")
|
||||
print(f"Error playing sound: {e}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
mock_admin_users = ["adminuser", "streamer"]
|
||||
mock_channel = "testchannel"
|
||||
mock_bot_username = "testbot"
|
||||
|
||||
# Example usage in main.py:
|
||||
"""
|
||||
from examples import AdvancedExamples
|
||||
def mock_send_message(channel, oauth_token, bot_username, message):
|
||||
print(f"[CHAT] To #{channel} (as {bot_username}): {message}")
|
||||
|
||||
def main():
|
||||
# ... set up bot ...
|
||||
def mock_is_admin(username, channel, admin_list):
|
||||
return username.lower() in admin_list or username.lower() == channel.lower()
|
||||
|
||||
print("\n--- Testing Non-Queue Mode ---")
|
||||
adv_ex_no_queue = AdvancedExamples(
|
||||
send_message_func=mock_send_message,
|
||||
is_admin_func=mock_is_admin,
|
||||
bot_username=mock_bot_username,
|
||||
channel=mock_channel,
|
||||
admin_users=mock_admin_users,
|
||||
use_queue=False
|
||||
)
|
||||
adv_ex_no_queue.cmd_start_poll("streamer", ["Favorite Color?", "Red", "Blue", "Green"])
|
||||
adv_ex_no_queue.cmd_vote("user1", ["1"])
|
||||
adv_ex_no_queue.cmd_vote("user2", ["2"])
|
||||
adv_ex_no_queue.cmd_vote("user3", ["1"])
|
||||
adv_ex_no_queue.cmd_end_poll("streamer", [])
|
||||
|
||||
# Initialize advanced examples
|
||||
advanced = AdvancedExamples(bot)
|
||||
adv_ex_no_queue.cmd_points("user1", [])
|
||||
adv_ex_no_queue.cmd_give_points("adminuser", ["user1", "100"])
|
||||
adv_ex_no_queue.cmd_points("user1", [])
|
||||
adv_ex_no_queue.cmd_points("adminuser", ["@user1"])
|
||||
|
||||
print("\n\n--- Testing Queue Mode ---")
|
||||
os.makedirs("data/cache", exist_ok=True)
|
||||
|
||||
# ... start bot ...
|
||||
"""
|
||||
shelve_path = f"{SHELVE_FILE_ADVANCED_FEATURES}.db"
|
||||
if os.path.exists(shelve_path):
|
||||
os.remove(shelve_path)
|
||||
shelve_dir_path = f"{SHELVE_FILE_ADVANCED_FEATURES}.dir"
|
||||
if os.path.exists(shelve_dir_path):
|
||||
import shutil
|
||||
shutil.rmtree(shelve_dir_path)
|
||||
|
||||
adv_ex_queue = AdvancedExamples(
|
||||
send_message_func=mock_send_message,
|
||||
is_admin_func=mock_is_admin,
|
||||
bot_username=mock_bot_username,
|
||||
channel=mock_channel,
|
||||
admin_users=mock_admin_users,
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="test_adv_feat"
|
||||
)
|
||||
|
||||
mock_oauth = "mock_oauth_token"
|
||||
|
||||
adv_ex_queue.cmd_start_poll("streamer", ["Best Game?", "GameA", "GameB"], mock_oauth)
|
||||
adv_ex_queue.cmd_vote("viewerA", ["1"], mock_oauth)
|
||||
adv_ex_queue.cmd_vote("viewerB", ["2"], mock_oauth)
|
||||
adv_ex_queue.cmd_vote("viewerC", ["2"], mock_oauth)
|
||||
|
||||
adv_ex_queue_reader = AdvancedExamples(
|
||||
send_message_func=mock_send_message,
|
||||
is_admin_func=mock_is_admin,
|
||||
bot_username=mock_bot_username,
|
||||
channel=mock_channel,
|
||||
admin_users=mock_admin_users,
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="test_adv_feat"
|
||||
)
|
||||
adv_ex_queue_reader.cmd_end_poll("streamer", [], mock_oauth)
|
||||
|
||||
adv_ex_queue.cmd_points("viewerA", [], mock_oauth)
|
||||
adv_ex_queue.cmd_give_points("adminuser", ["viewerA", "50"], mock_oauth)
|
||||
adv_ex_queue_reader.cmd_points("viewerA", [], mock_oauth)
|
||||
adv_ex_queue_reader.cmd_give_points("streamer", ["viewerB", "75"], mock_oauth)
|
||||
adv_ex_queue.cmd_points("viewerB", ["@viewerB"], mock_oauth)
|
||||
|
||||
print(f"\nShelve file used: {SHELVE_FILE_ADVANCED_FEATURES}")
|
||||
|
|
@ -6,8 +6,12 @@ import time
|
|||
import random
|
||||
import cloudpickle
|
||||
import traceback
|
||||
import re # Ensure re is imported (it likely is already)
|
||||
import shlex # Added for robust parsing originally, but will use regex
|
||||
from typing import Dict, Any, Optional, List, Callable, Union, Tuple
|
||||
import json
|
||||
from src.features.examples import AdvancedExamples
|
||||
from src.core.utils import is_admin
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
|
||||
|
|
@ -27,6 +31,9 @@ SETTINGS_KEY_KEYBOARD_INITIALIZED = "keyboard_initialized"
|
|||
SETTINGS_KEY_ACTIVE_GAME = "active_game_name"
|
||||
GAME_CONFIG_FILE_PATH = "data/config/game_controls.json"
|
||||
|
||||
# Shelve key for advanced command configuration
|
||||
SETTINGS_KEY_ADV_CMD_CONFIG = "advanced_command_config" # New key
|
||||
|
||||
# Queue stats shelve keys
|
||||
STATS_KEY_TOTAL_ENQUEUED = "total_enqueued"
|
||||
STATS_KEY_TOTAL_PROCESSED = "total_processed"
|
||||
|
|
@ -684,7 +691,18 @@ register_command("8ball", handle_8ball_task, "Ask the magic 8ball a question", "
|
|||
|
||||
@huey.task()
|
||||
def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
"""Handler for !qstats command"""
|
||||
"""Handler for !qstats command (admin only)"""
|
||||
if not is_admin(username, channel):
|
||||
message = f"@{username}, only channel admins can view queue statistics."
|
||||
send_twitch_message(channel, oauth_token, bot_username, message)
|
||||
return {
|
||||
'success': False,
|
||||
'username': username,
|
||||
'command': 'qstats',
|
||||
'error': 'not_admin',
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
stats = get_queue_stats()
|
||||
success = send_twitch_message(channel, oauth_token, bot_username,
|
||||
f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending")
|
||||
|
|
@ -705,7 +723,7 @@ def handle_qstats_task(username: str, args: List[str], channel: str, oauth_token
|
|||
}
|
||||
|
||||
# Register qstats command
|
||||
register_command("qstats", handle_qstats_task, "Show queue statistics", "queue")
|
||||
register_command("qstats", handle_qstats_task, "Show queue statistics (admin only)", "admin")
|
||||
|
||||
def log_command(username: str, command: str, command_type: str, success: bool = True, args: List[str] = None) -> None:
|
||||
"""
|
||||
|
|
@ -890,7 +908,6 @@ def handle_unknown_command_task(username: str, command: str, args: List[str], ch
|
|||
def handle_switchmode_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
"""Handler for !switchmode command to switch between keyboard and controller modes"""
|
||||
# Only allow admins to switch modes
|
||||
from src.core.twitch import is_admin
|
||||
if not is_admin(username, channel):
|
||||
message = f"@{username}, only channel admins can switch input modes"
|
||||
send_twitch_message(channel, oauth_token, bot_username, message)
|
||||
|
|
@ -981,47 +998,53 @@ register_command("switchmode", handle_switchmode_task,
|
|||
|
||||
# Register basic keyboard commands
|
||||
def register_keyboard_commands():
|
||||
"""Register keyboard commands based on the default game configuration."""
|
||||
"""Register keyboard commands based on the default game's abstract command names."""
|
||||
configs = load_game_configs()
|
||||
default_game_name = configs.get("default_game")
|
||||
if not default_game_name:
|
||||
logger.warning("No default_game specified in game_controls.json. Cannot register default keyboard commands.")
|
||||
return
|
||||
|
||||
default_game_config = configs.get("games", {}).get(default_game_name, {}).get("keyboard", {})
|
||||
if not default_game_config:
|
||||
# Get the keyboard commands section for the default game
|
||||
keyboard_config = configs.get("games", {}).get(default_game_name, {}).get("keyboard", {})
|
||||
if not keyboard_config:
|
||||
logger.warning(f"No keyboard configuration found for default game '{default_game_name}' in game_controls.json.")
|
||||
return
|
||||
|
||||
for cmd_name, cmd_details in default_game_config.items():
|
||||
description = cmd_details.get("description", f"Perform '{cmd_name}' keyboard action (game-specific)")
|
||||
register_command(cmd_name,
|
||||
lambda u, a, c, o, b, cmd=cmd_name: handle_keyboard_command_task(u, a, c, o, b, cmd),
|
||||
# Register each abstract command name found in the default game's keyboard config
|
||||
for abstract_cmd_name, cmd_details in keyboard_config.items():
|
||||
description = cmd_details.get("description", f"Perform '{abstract_cmd_name}' keyboard action (game-specific)")
|
||||
# The actual physical mapping (action_type, params) is inside cmd_details,
|
||||
# but execute_game_command will use get_current_game_control_config which reads it at runtime.
|
||||
register_command(abstract_cmd_name,
|
||||
lambda u, a, c, o, b, cmd=abstract_cmd_name: handle_keyboard_command_task(u, a, c, o, b, cmd),
|
||||
description,
|
||||
"keyboard")
|
||||
logger.info(f"Registered {len(default_game_config)} keyboard commands based on default game: {default_game_name}")
|
||||
logger.info(f"Registered {len(keyboard_config)} keyboard commands (abstract names) based on default game: {default_game_name}")
|
||||
|
||||
# Register basic controller commands
|
||||
def register_controller_commands():
|
||||
"""Register controller commands based on the default game configuration."""
|
||||
"""Register controller commands based on the default game's abstract command names."""
|
||||
configs = load_game_configs()
|
||||
default_game_name = configs.get("default_game")
|
||||
if not default_game_name:
|
||||
logger.warning("No default_game specified in game_controls.json. Cannot register default controller commands.")
|
||||
return
|
||||
|
||||
default_game_config = configs.get("games", {}).get(default_game_name, {}).get("controller", {})
|
||||
if not default_game_config:
|
||||
# Get the controller commands section for the default game
|
||||
controller_config = configs.get("games", {}).get(default_game_name, {}).get("controller", {})
|
||||
if not controller_config:
|
||||
logger.warning(f"No controller configuration found for default game '{default_game_name}' in game_controls.json.")
|
||||
return
|
||||
|
||||
for cmd_name, cmd_details in default_game_config.items():
|
||||
description = cmd_details.get("description", f"Perform '{cmd_name}' controller action (game-specific)")
|
||||
register_command(cmd_name,
|
||||
lambda u, a, c, o, b, cmd=cmd_name: handle_controller_command_task(u, a, c, o, b, cmd),
|
||||
# Register each abstract command name found in the default game's controller config
|
||||
for abstract_cmd_name, cmd_details in controller_config.items():
|
||||
description = cmd_details.get("description", f"Perform '{abstract_cmd_name}' controller action (game-specific)")
|
||||
register_command(abstract_cmd_name,
|
||||
lambda u, a, c, o, b, cmd=abstract_cmd_name: handle_controller_command_task(u, a, c, o, b, cmd),
|
||||
description,
|
||||
"controller")
|
||||
logger.info(f"Registered {len(default_game_config)} controller commands based on default game: {default_game_name}")
|
||||
logger.info(f"Registered {len(controller_config)} controller commands (abstract names) based on default game: {default_game_name}")
|
||||
|
||||
@huey.task()
|
||||
def handle_gamehelp_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
|
|
@ -1106,7 +1129,21 @@ def process_chat_command(username: str, command: str, args: List[str],
|
|||
|
||||
This task will be executed by the Huey consumer
|
||||
"""
|
||||
logger.info(f"Processing command '{command}' from {username} with args: {args}")
|
||||
logger.info(f"Processing command '{command}' from {username} with raw args: {args}")
|
||||
|
||||
# Reconstruct the argument string from the input args list
|
||||
raw_args_string = " ".join(args)
|
||||
|
||||
# Parse the raw_args_string to handle quoted arguments
|
||||
# This regex finds:
|
||||
# 1. Quoted strings: "([^"]*)" - captures content within double quotes
|
||||
# 2. Unquoted strings: (\S+) - captures sequences of non-whitespace characters
|
||||
matches = re.findall(r'"([^"]*)"|(\S+)', raw_args_string)
|
||||
# Convert matches to a flat list of arguments
|
||||
# Each match is a tuple, e.g., ('quoted content', '') or ('', 'unquoted_word')
|
||||
parsed_args = [match[0] if match[0] else match[1] for match in matches]
|
||||
|
||||
logger.info(f"Parsed args for '{command}': {parsed_args}")
|
||||
|
||||
# Record stats - this happens in the consumer process when the task is actually processed
|
||||
queue_stats.log_processed(f"command:{command}")
|
||||
|
|
@ -1116,64 +1153,67 @@ def process_chat_command(username: str, command: str, args: List[str],
|
|||
if command in command_registry:
|
||||
cmd_info = command_registry[command]
|
||||
try:
|
||||
# Execute the command handler
|
||||
# Execute the command handler with parsed_args
|
||||
logger.info(f"Found handler for '{command}', executing...")
|
||||
result = cmd_info.handler(username, args, channel, oauth_token, bot_username)
|
||||
result = cmd_info.handler(username, parsed_args, channel, oauth_token, bot_username)
|
||||
|
||||
# Log all commands, not just game commands
|
||||
# Skip logging if the command handler already logged it (check for already_logged flag)
|
||||
# Also skip keyboard and controller commands as they handle their own logging
|
||||
if result is None:
|
||||
# Handle case where result is None (command handler didn't return anything)
|
||||
log_command(username, command, cmd_info.category, False)
|
||||
log_command(username, command, cmd_info.category, False, parsed_args)
|
||||
elif command not in ["commandlog"] and not result.get('already_logged', False) and cmd_info.category not in ["keyboard", "controller"]:
|
||||
log_command(username, command, cmd_info.category, result.get('success', True), result.get('args', []))
|
||||
# Use parsed_args for logging if result.get('args') is not available or not relevant
|
||||
log_args = result.get('args', parsed_args) if isinstance(result, dict) else parsed_args
|
||||
log_command(username, command, cmd_info.category, result.get('success', True), log_args)
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
# Log full traceback for better debugging
|
||||
logger.error(f"Error executing command {command}: {e}\n{traceback.format_exc()}")
|
||||
logger.error(f"Error executing command {command}: {e}\\n{traceback.format_exc()}")
|
||||
# Log failed command
|
||||
if command in command_registry:
|
||||
log_command(username, command, command_registry[command].category, False)
|
||||
log_command(username, command, command_registry[command].category, False, parsed_args)
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'username': username,
|
||||
'command': command,
|
||||
'args': args,
|
||||
'args': parsed_args, # Return parsed_args
|
||||
'timestamp': time.time()
|
||||
}
|
||||
else:
|
||||
# Unknown command - try the fallback handler
|
||||
try:
|
||||
result = handle_unknown_command_task(username, command, args, channel, oauth_token, bot_username)
|
||||
# No need to log here, as the handler now uses registered commands
|
||||
result = handle_unknown_command_task(username, command, parsed_args, channel, oauth_token, bot_username)
|
||||
# Logging is handled by handle_unknown_command_task or if it calls a registered command
|
||||
return result
|
||||
except Exception as e:
|
||||
# Log full traceback for better debugging
|
||||
logger.error(f"Error handling unknown command {command}: {e}\n{traceback.format_exc()}")
|
||||
logger.error(f"Error handling unknown command {command}: {e}\\n{traceback.format_exc()}")
|
||||
# Log failed unknown command
|
||||
log_command(username, command, "unknown", False, args)
|
||||
log_command(username, command, "unknown", False, parsed_args)
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'username': username,
|
||||
'command': command,
|
||||
'args': args,
|
||||
'args': parsed_args, # Return parsed_args
|
||||
'timestamp': time.time()
|
||||
}
|
||||
except Exception as e:
|
||||
# Log full traceback for better debugging
|
||||
logger.error(f"Unexpected error in process_chat_command: {e}\n{traceback.format_exc()}")
|
||||
logger.error(f"Unexpected error in process_chat_command: {e}\\n{traceback.format_exc()}")
|
||||
# Log the error
|
||||
log_command(username, command, "unknown", False)
|
||||
log_command(username, command, "unknown", False, parsed_args)
|
||||
# Make sure we return a safely picklable object
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'username': username,
|
||||
'command': command,
|
||||
'args': parsed_args, # Return parsed_args
|
||||
'timestamp': time.time()
|
||||
}
|
||||
|
||||
|
|
@ -1240,7 +1280,7 @@ def handle_ping_task(username: str, args: List[str], channel: str, oauth_token:
|
|||
}
|
||||
|
||||
# Register ping command
|
||||
register_command("ping", handle_ping_task, "Check if the bot is responding", "general")
|
||||
#register_command("ping", handle_ping_task, "Check if the bot is responding", "admin")
|
||||
|
||||
@huey.task()
|
||||
def handle_help_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
|
|
@ -1266,7 +1306,6 @@ def handle_help_task(username: str, args: List[str], channel: str, oauth_token:
|
|||
f"Queue commands: {', '.join(queue_cmds)}")
|
||||
|
||||
# Show admin commands if user is admin
|
||||
from src.core.twitch import is_admin
|
||||
if is_admin(username, channel) and "admin" in categories:
|
||||
admin_cmds = [f"!{cmd}" for cmd, _ in categories["admin"]]
|
||||
send_twitch_message(channel, oauth_token, bot_username,
|
||||
|
|
@ -1329,6 +1368,9 @@ def handle_commandlog_task(username: str, args: List[str], channel: str, oauth_t
|
|||
if filter_type:
|
||||
filtered_log = [entry for entry in filtered_log if entry['type'] == filter_type]
|
||||
|
||||
# Exclude the 'commandlog' command itself from the results
|
||||
filtered_log = [entry for entry in filtered_log if entry['command'] != 'commandlog']
|
||||
|
||||
if not filtered_log:
|
||||
filters = []
|
||||
if filter_user:
|
||||
|
|
@ -1432,7 +1474,6 @@ def handle_clearlog_task(username: str, args: List[str], channel: str, oauth_tok
|
|||
import shelve
|
||||
|
||||
# Only allow admins to clear the log
|
||||
from src.core.twitch import is_admin
|
||||
if not is_admin(username, channel):
|
||||
message = f"@{username}, only channel admins can clear the command log"
|
||||
send_twitch_message(channel, oauth_token, bot_username, message)
|
||||
|
|
@ -1481,7 +1522,6 @@ def handle_allcommands_task(username: str, args: List[str], channel: str, oauth_
|
|||
categories = get_commands_by_category()
|
||||
|
||||
# Check if user is admin
|
||||
from src.core.twitch import is_admin
|
||||
is_user_admin = is_admin(username, channel)
|
||||
|
||||
# First message as header
|
||||
|
|
@ -1527,7 +1567,6 @@ register_command("allcommands", handle_allcommands_task,
|
|||
@huey.task()
|
||||
def handle_setgame_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
"""Handler for !setgame command to change the active game configuration."""
|
||||
from src.core.twitch import is_admin
|
||||
if not is_admin(username, channel):
|
||||
message = f"@{username}, only channel admins can change the active game."
|
||||
send_twitch_message(channel, oauth_token, bot_username, message)
|
||||
|
|
@ -1587,6 +1626,214 @@ def handle_setgame_task(username: str, args: List[str], channel: str, oauth_toke
|
|||
register_command("setgame", handle_setgame_task,
|
||||
"Set the active game for control configurations (admin only). Usage: !setgame <game_name>", "admin")
|
||||
|
||||
# ---- ADVANCED FEATURE COMMANDS (HUEY TASKS) ----
|
||||
|
||||
def _get_advanced_cmd_config_from_shelve() -> Optional[Dict[str, Any]]:
|
||||
"""Load advanced command configuration from shelve."""
|
||||
import shelve
|
||||
try:
|
||||
with shelve.open(SHELVE_FILE) as db:
|
||||
return db.get(SETTINGS_KEY_ADV_CMD_CONFIG)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading advanced command config from shelve: {e}\\n{traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
@huey.task()
|
||||
def handle_poll_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if not adv_cmd_config:
|
||||
logger.error("Advanced commands config not found in shelve for handle_poll_task.")
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the poll command is currently not available (server configuration error).")
|
||||
return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'poll'}
|
||||
|
||||
adv_examples_instance = AdvancedExamples(
|
||||
send_message_func=send_twitch_message,
|
||||
is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])),
|
||||
bot_username=adv_cmd_config["bot_username"],
|
||||
channel=adv_cmd_config["channel"],
|
||||
admin_users=adv_cmd_config.get("admin_users", []),
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="advfeat_q" # Keep queue-specific prefix for its own state
|
||||
)
|
||||
try:
|
||||
# Pass the oauth_token received by the task to the method
|
||||
adv_examples_instance.cmd_start_poll(username, args, oauth_token)
|
||||
log_command(username, "poll", "advanced", True, args)
|
||||
return {
|
||||
'success': True, 'username': username, 'command': 'poll', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_poll_task: {e}\\n{traceback.format_exc()}")
|
||||
log_command(username, "poll", "advanced", False, args)
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while trying to start the poll.")
|
||||
return {
|
||||
'success': False, 'error': str(e), 'username': username, 'command': 'poll', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
|
||||
@huey.task()
|
||||
def handle_vote_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if not adv_cmd_config:
|
||||
logger.error("Advanced commands config not found in shelve for handle_vote_task.")
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the vote command is currently not available (server configuration error).")
|
||||
return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'vote'}
|
||||
|
||||
adv_examples_instance = AdvancedExamples(
|
||||
send_message_func=send_twitch_message,
|
||||
is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])),
|
||||
bot_username=adv_cmd_config["bot_username"],
|
||||
channel=adv_cmd_config["channel"],
|
||||
admin_users=adv_cmd_config.get("admin_users", []),
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="advfeat_q"
|
||||
)
|
||||
try:
|
||||
adv_examples_instance.cmd_vote(username, args, oauth_token)
|
||||
log_command(username, "vote", "advanced", True, args)
|
||||
return {
|
||||
'success': True, 'username': username, 'command': 'vote', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_vote_task: {e}\\n{traceback.format_exc()}")
|
||||
log_command(username, "vote", "advanced", False, args)
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while casting your vote.")
|
||||
return {
|
||||
'success': False, 'error': str(e), 'username': username, 'command': 'vote', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
|
||||
@huey.task()
|
||||
def handle_endpoll_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if not adv_cmd_config:
|
||||
logger.error("Advanced commands config not found in shelve for handle_endpoll_task.")
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the endpoll command is currently not available (server configuration error).")
|
||||
return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'endpoll'}
|
||||
|
||||
adv_examples_instance = AdvancedExamples(
|
||||
send_message_func=send_twitch_message,
|
||||
is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])),
|
||||
bot_username=adv_cmd_config["bot_username"],
|
||||
channel=adv_cmd_config["channel"],
|
||||
admin_users=adv_cmd_config.get("admin_users", []),
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="advfeat_q"
|
||||
)
|
||||
try:
|
||||
adv_examples_instance.cmd_end_poll(username, args, oauth_token)
|
||||
log_command(username, "endpoll", "advanced", True, args)
|
||||
return {
|
||||
'success': True, 'username': username, 'command': 'endpoll', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_endpoll_task: {e}\\n{traceback.format_exc()}")
|
||||
log_command(username, "endpoll", "advanced", False, args)
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while trying to end the poll.")
|
||||
return {
|
||||
'success': False, 'error': str(e), 'username': username, 'command': 'endpoll', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
|
||||
@huey.task()
|
||||
def handle_points_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if not adv_cmd_config:
|
||||
logger.error("Advanced commands config not found in shelve for handle_points_task.")
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the points command is currently not available (server configuration error).")
|
||||
return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'points'}
|
||||
|
||||
adv_examples_instance = AdvancedExamples(
|
||||
send_message_func=send_twitch_message,
|
||||
is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])),
|
||||
bot_username=adv_cmd_config["bot_username"],
|
||||
channel=adv_cmd_config["channel"],
|
||||
admin_users=adv_cmd_config.get("admin_users", []),
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="advfeat_q"
|
||||
)
|
||||
try:
|
||||
adv_examples_instance.cmd_points(username, args, oauth_token)
|
||||
log_command(username, "points", "advanced", True, args)
|
||||
return {
|
||||
'success': True, 'username': username, 'command': 'points', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_points_task: {e}\\n{traceback.format_exc()}")
|
||||
log_command(username, "points", "advanced", False, args)
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while fetching points.")
|
||||
return {
|
||||
'success': False, 'error': str(e), 'username': username, 'command': 'points', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
|
||||
@huey.task()
|
||||
def handle_givepoints_task(username: str, args: List[str], channel: str, oauth_token: str, bot_username: str) -> Dict[str, Any]:
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if not adv_cmd_config:
|
||||
logger.error("Advanced commands config not found in shelve for handle_givepoints_task.")
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, the givepoints command is currently not available (server configuration error).")
|
||||
return {'success': False, 'error': 'Advanced commands not configured', 'username': username, 'command': 'givepoints'}
|
||||
|
||||
adv_examples_instance = AdvancedExamples(
|
||||
send_message_func=send_twitch_message,
|
||||
is_admin_func=lambda u, c, admins: is_admin(u, adv_cmd_config.get("channel"), adv_cmd_config.get("admin_users", [])),
|
||||
bot_username=adv_cmd_config["bot_username"],
|
||||
channel=adv_cmd_config["channel"],
|
||||
admin_users=adv_cmd_config.get("admin_users", []),
|
||||
use_queue=True,
|
||||
shelve_file_key_prefix="advfeat_q"
|
||||
)
|
||||
try:
|
||||
adv_examples_instance.cmd_give_points(username, args, oauth_token)
|
||||
log_command(username, "givepoints", "admin", True, args)
|
||||
return {
|
||||
'success': True, 'username': username, 'command': 'givepoints', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in handle_givepoints_task: {e}\\n{traceback.format_exc()}")
|
||||
log_command(username, "givepoints", "admin", False, args)
|
||||
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, an error occurred while giving points.")
|
||||
return {
|
||||
'success': False, 'error': str(e), 'username': username, 'command': 'givepoints', 'timestamp': time.time(), 'already_logged': True
|
||||
}
|
||||
|
||||
def setup_advanced_commands_config(bot_username_main: str, channel_main: str, admin_users_main: List[str], oauth_token_main: str) -> None:
|
||||
"""Sets up the advanced command configuration in shelve for the consumer to load."""
|
||||
import shelve
|
||||
config_to_save = {
|
||||
"bot_username": bot_username_main,
|
||||
"channel": channel_main,
|
||||
"admin_users": admin_users_main if admin_users_main else [],
|
||||
"oauth_token": oauth_token_main # This might be stale, tasks get fresh tokens
|
||||
}
|
||||
try:
|
||||
with shelve.open(SHELVE_FILE) as db:
|
||||
db[SETTINGS_KEY_ADV_CMD_CONFIG] = config_to_save
|
||||
logger.info(f"Saved advanced command configuration to shelve: {config_to_save}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save advanced command config to shelve: {e}\\n{traceback.format_exc()}")
|
||||
|
||||
def _load_and_register_advanced_commands() -> None:
|
||||
"""Loads config from shelve and registers advanced commands. Called at module import by consumer."""
|
||||
adv_cmd_config = _get_advanced_cmd_config_from_shelve()
|
||||
if adv_cmd_config:
|
||||
logger.info(f"Advanced command config loaded by consumer: {adv_cmd_config}")
|
||||
|
||||
# Check if all necessary keys are present, although tasks will re-check
|
||||
if not all(k in adv_cmd_config for k in ["bot_username", "channel"]):
|
||||
logger.error("Consumer loaded incomplete advanced command config. Advanced commands may fail.")
|
||||
return
|
||||
|
||||
register_command("poll", handle_poll_task, "Start a poll. Usage: !poll \\\"Question\\\" \\\"Opt1\\\" \\\"Opt2\\\"...", "advanced")
|
||||
register_command("vote", handle_vote_task, "Vote in an active poll. Usage: !vote <number>", "advanced")
|
||||
register_command("endpoll", handle_endpoll_task, "End the current poll and show results.", "advanced")
|
||||
register_command("points", handle_points_task, "Check your or another user's points. Usage: !points [@user]", "advanced")
|
||||
register_command("givepoints", handle_givepoints_task, "Give points to a user (admin). Usage: !givepoints <@user> <amount>", "admin")
|
||||
|
||||
logger.info("Advanced commands (poll, vote, etc.) registered by consumer process.")
|
||||
else:
|
||||
logger.info("Consumer: No advanced command configuration found in shelve. Advanced commands will not be available via queue.")
|
||||
|
||||
# Call this function at module level to register commands when consumer starts
|
||||
_load_and_register_advanced_commands()
|
||||
|
||||
# Function to start the consumer directly (alternative to command line)
|
||||
def start_consumer() -> None:
|
||||
"""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue