commit ec1c5958ceeffb833469aaca8938fb255b57adb8 Author: Joey Yakimowich-Payne Date: Wed May 14 17:45:07 2025 -0600 Initial working version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d903ae8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,179 @@ +.env +.venv +default_config.json +data +*.db +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc diff --git a/README.md b/README.md new file mode 100644 index 0000000..667f438 --- /dev/null +++ b/README.md @@ -0,0 +1,388 @@ +# Twitch Chat Interaction Bot + +A simple Python bot that connects to Twitch chat and performs actions based on user commands. + +## Features + +- Easy connection to Twitch chat via IRC +- Command-based interaction system +- Simple API for adding custom commands +- Example commands included +- Queue system for asynchronous command processing +- Robust authentication handling with token management +- Advanced features (optional): + - Interactive polls and voting system + - Timer system for scheduled messages + - Points system for viewers + - Sound effects (Windows only) +- Game control (optional): + - Control games via Twitch chat + - Direct control mode for immediate actions + - Voting mode for democratic game control + - User stats tracking for game commands + - Streamer override mode for exclusive control + - Multiple input methods: + - Keyboard and mouse simulation + - Virtual Xbox controller emulation (Windows only) + +## Requirements + +- Python 3.6+ +- Socket library (included in Python standard library) +- Requests library (`pip install requests`) for Twitch authentication +- For keyboard/mouse control: pynput library (`pip install pynput`) +- For Xbox controller emulation: vgamepad library (`pip install vgamepad`) - Windows only +- For queue functionality: huey library (`pip install huey`) + +## Setup + +1. Clone this repository: + ``` + git clone https://github.com/yourusername/stream-interact.git + cd stream-interact + ``` + +2. Install dependencies: + ``` + pip install -r requirements.txt + ``` + +3. Set up your Twitch API credentials: + - Go to [https://dev.twitch.tv/console/apps](https://dev.twitch.tv/console/apps) + - Create a new application (or use an existing one) + - Generate a client secret + - Note your client ID and client secret + - Run the setup script to generate authentication tokens: + ``` + python scripts/setup_twitch_auth.py + ``` + - For detailed authentication setup instructions, see [Authentication Setup Guide](docs/twitch_auth_setup.md) + +4. Set your environment variables: + ``` + # On Windows + set TWITCH_USERNAME=your_bot_username + set TWITCH_CLIENT_ID=your_client_id + set TWITCH_CLIENT_SECRET=your_client_secret + set TWITCH_CHANNEL=your_channel_name + set TWITCH_ADMIN_USERS=bot_username,mod_username + + # On Linux/Mac + export TWITCH_USERNAME=your_bot_username + export TWITCH_CLIENT_ID=your_client_id + export TWITCH_CLIENT_SECRET=your_client_secret + export TWITCH_CHANNEL=your_channel_name + export TWITCH_ADMIN_USERS=bot_username,mod_username + ``` + + Alternatively, you can edit the `main.py` file and set these values directly. + +5. Configuration file: + You can also create a `default_config.json` file in the root directory with the following structure: + + ```json + { + "admin_users": ["bot_username", "mod_username"], + "cooldown": 2.0, + "game_mode": "direct", + "input_type": "keyboard", + "use_queue": false, + "token_cache_file": "data/cache/token_cache.json", + "features": { + "advanced": false, + "game_control": false + } + } + ``` + +6. Run the bot: + ``` + # Basic functionality + python main.py + + # With advanced features + python main.py --advanced + + # With game control (keyboard & mouse, direct mode) + python main.py --game-control + + # With game control (Xbox controller, direct mode) + python main.py --game-control --input-type controller + + # With game control (voting mode) + python main.py --game-control --game-mode vote + + # With game control (Xbox controller, voting mode) + python main.py --game-control --game-mode vote --input-type controller + + # With both advanced features and game control + python main.py --advanced --game-control + + # Customize cooldown time (in seconds) + python main.py --game-control --cooldown 5.0 + + # With queue system for processing commands + python main.py --use-queue + + # With additional admin users + python main.py --admin-users bot_username,mod_username + + # With custom config file + python main.py --config my_config.json + ``` + +## Admin Users + +Admin users have the same permissions as the channel owner. This is useful if: +- You have a separate bot account that will be used on your channel +- You want to give moderators special privileges +- You're running the bot for another streamer + +Admin users can: +- Stop the bot with !stop +- Check token status with !token +- Take control of the game with !takeover +- Start and end voting with !startvote and !endvote +- End polls with !endpoll +- Give points to users with !give + +Admin users can be configured in three ways (in order of priority): +1. Command line: `--admin-users bot_username,mod_username` +2. Environment variable: `TWITCH_ADMIN_USERS=bot_username,mod_username` +3. Config file: `"admin_users": ["bot_username", "mod_username"]` in default_config.json + +## Basic Commands + +The bot comes with several example commands: + +- `!hello` - The bot will say hello back to the user +- `!dice [sides]` - Roll a dice with the specified number of sides (default: 6) +- `!echo [message]` - The bot will echo back the provided message +- `!8ball` - Ask the Magic 8-Ball a question and get a random answer + +## Advanced Features + +When running with the `--advanced` flag, these additional commands become available: + +### Voting System +- `!poll "Question" "Option1" "Option2" ["Option3"...]` - Start a new poll +- `!vote ` - Vote in the active poll +- `!endpoll` - End the active poll and show results + +### Timer System +- `!timer ` - Set a timer to send a message after specified seconds + +### Points System +- `!points [@user]` - Check points for yourself or another user +- `!give ` - Give points to a user (channel owner only) + +### Sound Effects (Windows only) +- `!sound ` - Play a sound effect + - Add .wav files to the `sounds` directory to use this feature + +### Queue System +For high-volume chats or intensive commands, you can use the queue system: + +``` +# Start the bot with queue enabled +python main.py --use-queue + +# Start a consumer in a separate terminal to process tasks +python -m huey.bin.huey_consumer src.queue.server.huey +``` + +For detailed documentation on the queue system, see [Queue System Guide](docs/queue.md) + +## Game Control + +The game control feature allows Twitch chat to control a game via keyboard/mouse inputs or a virtual Xbox controller. + +### General Commands + +- `!gamehelp` - Show available game control commands +- `!gamestats [@user]` - Check game control stats for a user + +### Keyboard & Mouse Mode + +When using keyboard & mouse mode (`--input-type keyboard`), the following commands are available: + +- `!up` - Press W key (move up/forward) +- `!down` - Press S key (move down/backward) +- `!left` - Press A key (move left) +- `!right` - Press D key (move right) +- `!jump` - Press Space bar (jump) +- `!attack` - Press left mouse button (attack) +- `!interact` - Press E key (interact) +- `!inventory` - Press I key (inventory) +- `!skill1` - Press 1 key (skill 1) +- `!skill2` - Press 2 key (skill 2) +- `!skill3` - Press 3 key (skill 3) +- `!ultimate` - Press R key (ultimate ability) + +### Xbox Controller Mode (Windows only) + +When using controller mode (`--input-type controller`), the following commands are available: + +- Movement: + - `!up` - Move left stick up (forward) + - `!down` - Move left stick down (backward) + - `!left` - Move left stick left + - `!right` - Move left stick right + +- Camera: + - `!look_up` - Move right stick up + - `!look_down` - Move right stick down + - `!look_left` - Move right stick left + - `!look_right` - Move right stick right + +- Action Buttons: + - `!jump` - Press A button + - `!action` - Press B button + - `!interact` - Press X button + - `!menu` - Press Y button + +- Shoulder Buttons & Triggers: + - `!block` - Press left shoulder (LB) + - `!attack` - Press right shoulder (RB) + - `!aim` - Press left trigger (LT) + - `!shoot` - Press right trigger (RT) + +- D-pad: + - `!item1` - Press D-pad up + - `!item2` - Press D-pad right + - `!item3` - Press D-pad down + - `!item4` - Press D-pad left + +- Menu Buttons: + - `!start` - Press start button + - `!select` - Press select/back button + +### Direct Mode + +In direct mode, commands are executed immediately when received. Each user has a cooldown between commands to prevent spam. + +### Vote Mode + +In vote mode, commands are collected through voting: + +- `!startvote [seconds]` - Start a vote session (streamer only) +- `!vote ` - Vote for a specific command +- `!endvote` - End the current vote session (streamer only) + +After the voting period, the command with the most votes is executed. + +### Streamer Override Mode + +The streamer override feature allows the channel owner to take exclusive control of the game when needed: + +- `!takeover` - Channel owner takes exclusive control (only their commands will work) +- `!givecontrol` - Return control to the chat (viewers can use commands again) + +When streamer override is active: +- Only the channel owner's commands will be executed +- The channel owner bypasses all cooldowns that apply to viewers +- If there's an active vote, it will be cancelled +- Viewers cannot start new votes until the streamer gives control back + +#### Controller Redirection + +When using controller mode (`--input-type controller`), the streamer override feature includes physical controller redirection: + +- When `!takeover` is used, the streamer's physical controller inputs are automatically redirected to the virtual controller +- This allows the streamer to directly control the game using their own controller during takeover +- The redirection ends automatically when `!givecontrol` is used +- Requires a physical controller to be connected to the PC running the bot + +Requirements for controller redirection: +- pygame library (`pip install pygame`) +- A physical controller connected to the PC +- Windows operating system (due to vgamepad dependency) + +This feature is particularly useful when the streamer needs to: +- Quickly navigate a challenging section of a game +- Demonstrate something specific to viewers +- Override the chat's decisions temporarily + +### Customizing Game Controls + +You can customize the key/button mappings in the code: + +- For keyboard/mouse controls, edit the keyboard commands in `game_control.py` +- For Xbox controller, edit the controller commands in `game_control.py` + +## Adding Custom Commands + +You can easily add your own commands by creating a handler function and registering it with the bot: + +```python +def my_custom_command(username, args, bot): + # Do something interesting + bot.send_message(f"@{username}, your command was processed!") + +# In main.py +bot.register_command("mycommand", my_custom_command) +``` + +Users can then trigger this command by typing `!mycommand` in the Twitch chat. + +## Advanced Usage + +You can extend the TwitchBot class to add more functionality, such as: + +- User permission levels +- Cooldowns for commands +- Custom events (subscriptions, follows, etc.) +- Integration with other APIs + +### Queue System + +For high-traffic channels or resource-intensive commands, the queue system allows you to process messages and commands asynchronously: + +- Moves command execution to a background process +- Prevents the bot from getting overwhelmed during high activity +- Allows for distributed processing across multiple machines +- Provides statistics and monitoring capabilities + +See [Queue System Documentation](docs/queue.md) for detailed setup and usage instructions. + +## Authentication + +The bot uses Twitch's Client Credentials Grant Flow for authentication: + +1. Create a Twitch application at [dev.twitch.tv/console/apps](https://dev.twitch.tv/console/apps) +2. Get your Client ID and generate a Client Secret +3. Set these as environment variables: + ``` + TWITCH_CLIENT_ID=your_client_id + TWITCH_CLIENT_SECRET=your_client_secret + ``` +4. Run the authentication setup script: + ``` + python scripts/setup_twitch_auth.py + ``` + This will guide you through the authentication process and store your tokens. + +For detailed authentication setup instructions, see [Authentication Setup Guide](docs/twitch_auth_setup.md) + +### Token Management + +The bot uses a token cache file to minimize API requests. When you run the bot: + +1. It checks for a cached token in `twitch_token_cache.json` +2. If a valid cached token exists, it uses that token +3. If no valid token is found, it requests a new one from Twitch +4. The new token is saved to the cache file for future use + +The token cache can be configured with these options: + +``` +# Use a custom token cache location +python main.py --token-cache /path/to/your/cache.json + +# Check token status while the bot is running (channel owner only) +!token +``` + +## License + +This project is open source and available under the MIT License. \ No newline at end of file diff --git a/config/.env.example b/config/.env.example new file mode 100644 index 0000000..395df55 --- /dev/null +++ b/config/.env.example @@ -0,0 +1,13 @@ +# Twitch API credentials +# You can obtain these from https://dev.twitch.tv/console/apps +TWITCH_USERNAME=your_bot_username +TWITCH_CLIENT_ID=your_client_id +TWITCH_CLIENT_SECRET=your_client_secret +TWITCH_CHANNEL=your_channel +TWITCH_ADMIN_USERS=user1,user2 + +# Optional settings +# QUEUE_ENABLED=false +# GAME_CONTROL_ENABLED=true +# INPUT_TYPE=keyboard +# LOG_LEVEL=INFO \ No newline at end of file diff --git a/config/default_config.json.example b/config/default_config.json.example new file mode 100644 index 0000000..5fb697f --- /dev/null +++ b/config/default_config.json.example @@ -0,0 +1,41 @@ +{ + "twitch": { + "username": "", + "channel": "", + "client_id": "", + "client_secret": "", + "token_cache_file": "data/cache/token_cache.json" + }, + "queue": { + "enabled": false, + "db_file": "twitch_queue.db", + "auto_start_consumer": false + }, + "game_control": { + "enabled": false, + "mode": "direct", + "cooldown": 2.0, + "input_type": "keyboard", + "commands": { + "up": "w", + "down": "s", + "left": "a", + "right": "d", + "jump": "space", + "attack": "j", + "interact": "e" + } + }, + "features": { + "advanced": false, + "sounds_directory": "data/sounds", + "points_per_message": 1, + "points_per_minute_watched": 5, + "voting_duration": 30 + }, + "logging": { + "level": "INFO", + "file": "stream-interact.log", + "console": true + } +} diff --git a/docs/queue.md b/docs/queue.md new file mode 100644 index 0000000..99fcea4 --- /dev/null +++ b/docs/queue.md @@ -0,0 +1,94 @@ +# Twitch Queue System + +This README explains how to use the Huey-based queue system with your Twitch bot. + +## Overview + +The queue system allows your Twitch bot to process commands and messages asynchronously. This is useful for: + +- Handling high-volume chat without performance issues +- Processing commands that take time to complete +- Distributing processing across multiple processes or machines +- Maintaining history of commands for analytics + +## Components + +The queue system consists of three main components: + +1. **Queue Server**: Defined in `src/queue/server.py`, this establishes the Huey queue with SQLite backend +2. **Consumer**: Processes tasks from the queue (can run in a separate process) +3. **Producer**: Your Twitch bot that sends tasks to the queue + +## Setup + +1. Make sure you have Huey installed: + ```bash + pip install huey + ``` + +2. Enable the queue in your bot by using the `--use-queue` flag: + ```bash + python main.py --use-queue + ``` + +3. Start a consumer to process tasks (in a separate terminal): + ```bash + # Option 1: Use the Huey consumer CLI + python -m huey.bin.huey_consumer src.queue.server.huey + + # Option 2: Use our simple wrapper + python scripts/run_consumer.py + + # Option 3: Use the bot with built-in consumer (not recommended for production) + python main.py --use-queue --start-consumer + ``` + +## How It Works + +1. When `--use-queue` is enabled, the bot sends all commands to the queue instead of processing them directly. +2. Each command becomes a task in the SQLite database. +3. The consumer process picks up tasks and executes them in the background. +4. Results can be stored and retrieved later. + +## Commands + +The queue system adds the following command to the bot: + +- `!qstats` - Display statistics about the queue (processed tasks, pending tasks, etc.) + +## Scaling + +For high-volume applications: + +1. Run multiple consumers to process tasks in parallel: + ```bash + python -m huey.bin.huey_consumer src.queue.server.huey -w 4 # 4 worker processes + ``` + +2. Consider using Redis instead of SQLite for better performance: + (Requires modifying `src/queue/server.py` to use RedisHuey instead of SqliteHuey) + +## Architecture Diagram + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ │ │ │ │ │ +│ Twitch Chat │────▶│ Twitch Bot │────▶│ Queue (SQLite) │ +│ │ │ (Producer) │ │ │ +└─────────────────┘ └─────────────────┘ └────────┬────────┘ + │ + ▼ + ┌─────────────────────┐ + │ │ + │ Huey Consumer(s) │ + │ │ + └─────────────────────┘ +``` + +## Customization + +You can customize the queue system by: + +1. Adding more task types in `src/queue/server.py` +2. Creating custom handlers for processed tasks +3. Implementing result handling in the bot \ No newline at end of file diff --git a/docs/twitch_auth_setup.md b/docs/twitch_auth_setup.md new file mode 100644 index 0000000..8111f53 --- /dev/null +++ b/docs/twitch_auth_setup.md @@ -0,0 +1,118 @@ +# Twitch Authentication Setup + +This document explains how to set up Twitch authentication for the application. + +## Prerequisites + +1. You need a Twitch account +2. You need to register a Twitch application at [Twitch Developer Console](https://dev.twitch.tv/console/apps) +3. Required Python packages: + - `cryptography` - For generating SSL certificates + - `requests` - For making HTTP requests to the Twitch API + - `python-dotenv` - For managing environment variables + +You can install these dependencies using the provided script: + +```bash +python scripts/install_auth_deps.py +``` + +## Registering a Twitch Application + +1. Go to [Twitch Developer Console](https://dev.twitch.tv/console/apps) +2. Click "Register Your Application" +3. Fill in the details: + - **Name**: Choose a name for your application (e.g., "MyStreamInteract") + - **OAuth Redirect URLs**: Set to `https://localhost:3000` (note: HTTPS is required) + - **Category**: Choose "Chat Bot" +4. Click "Create" +5. Once created, click "Manage" for your application +6. Make note of your **Client ID** +7. Generate a **Client Secret** and store it securely + +## Setting Up Authentication + +There are two ways to authenticate: + +### Method 1: Using the Setup Script (Recommended) + +We provide a setup script that walks you through the authentication process: + +```bash +python scripts/setup_twitch_auth.py +``` + +The script will: +1. Ask for your Client ID and Client Secret +2. Generate a self-signed SSL certificate for HTTPS (required by Twitch) +3. Open a browser window for authentication +4. Store the tokens securely +5. Update your .env file with the credentials + +> **Note:** When the browser opens, you will see a security warning about the self-signed certificate. +> This is normal for local development. You need to proceed past this warning to continue the authentication process. +> In Chrome, click "Advanced" and then "Proceed to localhost (unsafe)". + +If you want to force the generation of a new token (ignoring any cached ones): + +```bash +python scripts/setup_twitch_auth.py --force-new-token +``` + +### Method 2: Manual Setup + +1. Create or edit a `.env` file in the root directory of the project +2. Add the following information: + +``` +TWITCH_CLIENT_ID=your_client_id_here +TWITCH_CLIENT_SECRET=your_client_secret_here +TWITCH_USERNAME=your_twitch_username +TWITCH_CHANNEL=channel_to_connect_to +``` + +3. Run the authentication script: + +```bash +python scripts/setup_twitch_auth.py --manual +``` + +4. Follow the instructions to complete authentication + +## Windows-Specific Information + +On Windows, the application uses the Python `cryptography` library to generate self-signed certificates instead of relying on the OpenSSL command-line tool. This ensures better compatibility with Windows systems. + +If you encounter any issues with certificate generation on Windows: + +1. Make sure you have the latest version of pip: `python -m pip install --upgrade pip` +2. Try reinstalling the cryptography package: `pip install --force-reinstall cryptography` +3. Visual C++ Build tools might be required. If you get build errors, you may need to install the [Microsoft C++ Build Tools](https://visualstudio.microsoft.com/visual-cpp-build-tools/) + +## Important Notes + +- The application uses user access tokens with `chat:read` and `chat:edit` scopes +- IRC chat access requires a user access token (client credentials are not sufficient) +- Twitch requires HTTPS for OAuth redirects, so we use a self-signed certificate for local development +- The tokens are cached in `data/cache/token_cache.json` and refreshed automatically when needed +- Never share your Client Secret or access tokens + +## Self-Signed Certificates + +The application automatically generates a self-signed certificate for HTTPS, which is stored in `data/ssl/`. This certificate is used for handling OAuth redirects from Twitch. + +If you encounter certificate issues, you can delete the files in `data/ssl/` to generate a new certificate: +- `data/ssl/localhost.crt` +- `data/ssl/localhost.key` + +## Troubleshooting + +If you encounter authentication issues: + +1. Make sure your Client ID and Client Secret are correct +2. Check that your OAuth redirect URL in the Twitch Developer Console is exactly `https://localhost:3000` (with HTTPS) +3. When prompted with certificate warnings in your browser, make sure to proceed past these warnings +4. Verify that your Twitch account has the necessary permissions +5. Delete the token cache file (`data/cache/token_cache.json`) and try again +6. Check the application logs for detailed error messages +7. If the browser doesn't open automatically, you can copy and paste the provided URL \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..e74d607 --- /dev/null +++ b/main.py @@ -0,0 +1,310 @@ +import os +import time +import random +import argparse +import json +from typing import List, Optional, Dict, Any, Tuple, Union, Literal +from src.core.twitch import TwitchBot, CommandCallback +from dotenv import load_dotenv + +load_dotenv() + +# Default config file path +DEFAULT_CONFIG_PATH = "default_config.json" + +# Example command handlers +def cmd_hello(username: str, args: List[str], bot: TwitchBot) -> None: + """Simple hello command that greets the user""" + bot.send_message(f"Hello, {username}!") + +def cmd_dice(username: str, args: List[str], bot: TwitchBot) -> None: + """Roll a dice with specified number of sides""" + sides: int = 6 # Default to 6-sided dice + + if args and args[0].isdigit(): + sides = int(args[0]) + + result: int = random.randint(1, sides) + bot.send_message(f"@{username} rolled a {result} (d{sides})") + +def cmd_echo(username: str, args: List[str], bot: TwitchBot) -> None: + """Echo back the user's message""" + if args: + message: str = " ".join(args) + bot.send_message(f"Echo: {message}") + else: + bot.send_message(f"@{username}, you didn't provide a message to echo!") + +def cmd_magic8ball(username: str, args: List[str], bot: TwitchBot) -> None: + """Magic 8-ball that gives random answers""" + responses: List[str] = [ + "It is certain.", + "It is decidedly so.", + "Without a doubt.", + "Yes, definitely.", + "You may rely on it.", + "As I see it, yes.", + "Most likely.", + "Outlook good.", + "Yes.", + "Signs point to yes.", + "Reply hazy, try again.", + "Ask again later.", + "Better not tell you now.", + "Cannot predict now.", + "Concentrate and ask again.", + "Don't count on it.", + "My reply is no.", + "My sources say no.", + "Outlook not so good.", + "Very doubtful." + ] + bot.send_message(f"@{username}, {random.choice(responses)}") + +def cmd_queue_stats(username: str, args: List[str], bot: TwitchBot) -> None: + """Show queue statistics""" + stats = bot.get_queue_stats() + bot.send_message(f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending") + +def cmd_token_status(username: str, args: List[str], bot: TwitchBot) -> None: + """Show status of the Twitch API token (streamer only)""" + # Check if user is an admin + if not bot.is_admin(username): + bot.send_message(f"@{username}, only the channel owner or admins can check token status.") + return + + if bot.access_token and bot.token_expiry: + # Calculate time remaining + time_remaining = bot.token_expiry - time.time() + if time_remaining > 0: + hours = int(time_remaining // 3600) + minutes = int((time_remaining % 3600) // 60) + bot.send_message(f"@{username}, token is valid for {hours}h {minutes}m.") + else: + bot.send_message(f"@{username}, token is expired and will be refreshed on next use.") + else: + bot.send_message(f"@{username}, no token available. It will be generated when needed.") + +def cmd_stop(username: str, args: List[str], bot: TwitchBot) -> None: + """Stop the bot (streamer only)""" + # Check if user is an admin + if not bot.is_admin(username): + bot.send_message(f"@{username}, only the channel owner or admins can stop the bot.") + return + + bot.send_message(f"Bot is shutting down by request of @{username}.") + # Stop the bot + bot.stop() + +def main() -> None: + # Parse command line arguments + parser: argparse.ArgumentParser = argparse.ArgumentParser(description='Twitch Chat Bot') + parser.add_argument('--advanced', action='store_true', help='Enable advanced features') + parser.add_argument('--game-control', action='store_true', help='Enable game control features') + parser.add_argument('--game-mode', choices=['direct', 'vote'], default='direct', + help='Game control mode: direct (immediate commands) or vote (timed voting)') + parser.add_argument('--cooldown', type=float, default=2.0, + help='Cooldown time between commands in seconds (for direct mode)') + parser.add_argument('--input-type', choices=['keyboard', 'controller'], default='keyboard', + help='Input type: keyboard/mouse or Xbox controller emulation') + parser.add_argument('--use-queue', action='store_true', help='Use Huey task queue for processing commands') + parser.add_argument('--start-consumer', action='store_true', help='Start Huey consumer within the bot process') + parser.add_argument('--token-cache', type=str, default="data/cache/token_cache.json", + help='Path to token cache file (default: data/cache/token_cache.json)') + parser.add_argument('--admin-users', type=str, help='Comma-separated list of additional admin users who have the same permissions as the channel owner') + parser.add_argument('--config', type=str, default=DEFAULT_CONFIG_PATH, + help=f'Path to configuration file (default: {DEFAULT_CONFIG_PATH})') + args: argparse.Namespace = parser.parse_args() + + # Load configuration from file if it exists + config: Dict[str, Any] = {} + if os.path.exists(args.config): + try: + with open(args.config, 'r') as f: + config = json.load(f) + print(f"Loaded configuration from {args.config}") + except json.JSONDecodeError: + print(f"Error: {args.config} is not a valid JSON file") + except Exception as e: + print(f"Error loading config file: {e}") + + # Get Twitch credentials from environment variables + username: Optional[str] = os.environ.get("TWITCH_USERNAME") + client_id: Optional[str] = os.environ.get("TWITCH_CLIENT_ID") + client_secret: Optional[str] = os.environ.get("TWITCH_CLIENT_SECRET") + channel: Optional[str] = os.environ.get("TWITCH_CHANNEL") + + # Check for required credentials + if not all([username, client_id, client_secret, channel]): + print("Please set the following environment variables:") + print(" TWITCH_USERNAME - The bot's username") + print(" TWITCH_CLIENT_ID - Your app's registered client ID") + print(" TWITCH_CLIENT_SECRET - Your app's registered client secret") + print(" TWITCH_CHANNEL - The channel to join") + print("\nTo get client ID and secret:") + print("1. Go to https://dev.twitch.tv/console/apps") + print("2. Create a new application or use an existing one") + print("3. Get the client ID and generate a client secret") + + # For testing, you can uncomment and set these values + # username = "your_bot_username" + # client_id = "your_client_id" + # client_secret = "your_client_secret" + # channel = "your_channel" + + if not all([username, client_id, client_secret, channel]): + return + + # Type assertion to satisfy type checker (we checked these are not None above) + username = username if username is not None else "" + client_id = client_id if client_id is not None else "" + client_secret = client_secret if client_secret is not None else "" + channel = channel if channel is not None else "" + + # Parse admin users with priority: command line > env var > config file + admin_users: Optional[List[str]] = None + + # 1. From command line + if args.admin_users: + admin_users = [user.strip() for user in args.admin_users.split(",")] + print(f"Additional admin users from command line: {', '.join(admin_users)}") + + # 2. From environment variable + elif os.environ.get("TWITCH_ADMIN_USERS"): + env_admins = os.environ.get("TWITCH_ADMIN_USERS", "") + admin_users = [user.strip() for user in env_admins.split(",")] + print(f"Additional admin users from environment: {', '.join(admin_users)}") + + # 3. From config file + elif "admin_users" in config and isinstance(config["admin_users"], list): + admin_users = config["admin_users"] + print(f"Additional admin users from config file: {', '.join(admin_users)}") + + # Create bot instance + bot: TwitchBot = TwitchBot( + username=username, + client_id=client_id, + client_secret=client_secret, + channel=channel, + use_queue=args.use_queue, + token_cache_file=args.token_cache, + admin_users=admin_users + ) + + # Register basic commands + bot.register_command("hello", cmd_hello) + bot.register_command("dice", cmd_dice) + bot.register_command("echo", cmd_echo) + bot.register_command("8ball", cmd_magic8ball) + bot.register_command("token", cmd_token_status) + bot.register_command("stop", cmd_stop) + + # Add queue-related commands if using the queue + if args.use_queue: + bot.register_command("qstats", cmd_queue_stats) + + # Start the Huey consumer if requested + if args.start_consumer: + try: + from src.queue.server import start_consumer + import threading + + consumer_thread = threading.Thread(target=start_consumer) + consumer_thread.daemon = True + consumer_thread.start() + print("Started Huey consumer thread") + print("Note: For production, it's better to use: python -m huey.bin.huey_consumer src.queue.server.huey") + except ImportError as e: + print(f"Failed to start consumer: {e}") + + # Enable advanced features if requested + if args.advanced: + try: + from src.features.examples import AdvancedExamples + print("Enabling advanced features...") + advanced: AdvancedExamples = AdvancedExamples(bot) + print("Advanced features enabled: polls, voting, timers, points system") + if os.name == 'nt': + print("Sound effects enabled (Windows only)") + # Create sounds directory if it doesn't exist + if not os.path.exists("sounds"): + os.makedirs("sounds") + print("Created 'sounds' directory. Add .wav files to use with !sound command") + except ImportError as e: + print(f"Failed to import advanced features: {e}") + + # Enable game control if requested + if args.game_control: + try: + from src.game.controller import GameController, PYNPUT_AVAILABLE + + # Check if controller input is requested + if args.input_type == 'controller': + try: + from src.game.input.gamepad import VirtualController, VGAMEPAD_AVAILABLE + if not VGAMEPAD_AVAILABLE: + print("WARNING: vgamepad library not found. Install it with: pip install vgamepad") + print("Falling back to keyboard/mouse input.") + args.input_type = 'keyboard' + except ImportError: + print("WARNING: controller_support.py not found or error importing it.") + print("Falling back to keyboard/mouse input.") + args.input_type = 'keyboard' + + print(f"Enabling game control in {args.game_mode} mode with {args.input_type} input...") + + if args.input_type == 'keyboard' and not PYNPUT_AVAILABLE: + print("WARNING: pynput library not found. Install it with: pip install pynput") + print("Keyboard/mouse control will be simulated but not actually perform any actions.") + print("Run 'pip install pynput' to enable actual keyboard/mouse control.") + + game_controller: GameController = GameController( + bot, + mode=args.game_mode, + cooldown=args.cooldown, + input_type=args.input_type + ) + + if args.game_mode == "direct": + print(f"Commands will execute immediately with a {args.cooldown}s cooldown per user") + else: + print("Commands will be collected through voting") + print("Streamer can start a vote with !startvote and end with !endvote") + print("Users vote with !vote [command]") + + print("Use !gamehelp to see available game commands") + print("Check user stats with !gamestats [username]") + + except ImportError as e: + print(f"Failed to import game control features: {e}") + print("Make sure you have the required dependencies:") + print(" pip install pynput") + if args.input_type == 'controller': + print(" pip install vgamepad # For controller support") + + # Start the bot + print(f"Starting bot for channel #{channel}") + print("Basic commands: !hello, !dice [sides], !echo [message], !8ball") + print("Admin commands: !token (check token status)") + + if args.use_queue: + print("Queue mode enabled, commands will be processed by the queue system") + print("Use !qstats to see queue statistics") + + try: + # Make sure requests library is installed + try: + import requests + except ImportError: + print("ERROR: The requests library is required for the new authentication method.") + print("Install it with: pip install requests") + return + + # Now start the bot + bot.start(use_queue=args.use_queue) + except KeyboardInterrupt: + print("Bot stopped by user") + bot.stop() + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6de585c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,19 @@ +# No external dependencies required for basic functionality +# The bot uses only Python standard library modules. + +# Core dependencies +requests>=2.25.0 # For Twitch API authentication +huey>=2.4.0 # For background task queue +cryptography>=38.0.0 # For generating self-signed certificates + +# Game control dependencies (optional) +pynput>=1.7.0; sys_platform != "darwin" or python_version < "3.0" # For keyboard/mouse control +vgamepad>=0.0.8; sys_platform == "win32" # For Xbox controller emulation (Windows only) +pysdl2>=0.9.14 # For controller detection +pysdl2-dll>=2.28.0 # SDL2 binaries + +# Optional utilities +python-dotenv>=0.19.0 # For loading .env files + +# Type checking (optional, for development) +mypy>=0.910 \ No newline at end of file diff --git a/scripts/install_auth_deps.py b/scripts/install_auth_deps.py new file mode 100644 index 0000000..43832eb --- /dev/null +++ b/scripts/install_auth_deps.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +Helper script to install authentication dependencies +""" +import sys +import subprocess +import platform + +def main(): + """Install the dependencies required for Twitch authentication""" + print("Installing dependencies for Twitch authentication...") + + # Required packages + packages = [ + "requests", + "python-dotenv", + "cryptography" + ] + + # Use pip to install the packages + try: + subprocess.check_call([sys.executable, "-m", "pip", "install"] + packages) + print("\nSuccessfully installed authentication dependencies!") + print("You can now run: python scripts/setup_twitch_auth.py") + except subprocess.CalledProcessError: + print("\nError: Failed to install dependencies.") + print("Please try installing them manually:") + print(" pip install requests python-dotenv cryptography") + sys.exit(1) + + # Show additional information for Windows users + if platform.system() == "Windows": + print("\nNote for Windows users:") + print("This script uses the 'cryptography' library for generating certificates,") + print("which has C dependencies that should be automatically installed.") + print("If you encounter any issues, please refer to the documentation:") + print("https://cryptography.io/en/latest/installation/") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/run_consumer.py b/scripts/run_consumer.py new file mode 100644 index 0000000..dcc2f07 --- /dev/null +++ b/scripts/run_consumer.py @@ -0,0 +1,21 @@ +""" +Script to run the queue consumer +""" +import sys +import logging +from src.queue.server import start_consumer + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('consumer') + +if __name__ == "__main__": + logger.info("Starting queue consumer...") + try: + start_consumer() + except KeyboardInterrupt: + logger.info("Consumer stopped by user") + sys.exit(0) + except Exception as e: + logger.error(f"Error in consumer: {e}") + sys.exit(1) \ No newline at end of file diff --git a/scripts/setup_env.py b/scripts/setup_env.py new file mode 100644 index 0000000..73a95a7 --- /dev/null +++ b/scripts/setup_env.py @@ -0,0 +1,113 @@ +""" +Environment setup helper script +""" +import os +import sys +import shutil +from typing import List, Dict, Any, Optional +import subprocess +import platform + +def check_python_version() -> bool: + """Check if Python version is compatible""" + if sys.version_info < (3, 7): + print(f"Error: Python 3.7 or higher is required. You are using {platform.python_version()}") + return False + print(f"Python version OK: {platform.python_version()}") + return True + +def create_env_file() -> bool: + """Create a .env file from .env.example if it doesn't exist""" + example_path = os.path.join("config", ".env.example") + env_path = ".env" + + # Check if .env already exists + if os.path.exists(env_path): + print(f"Info: {env_path} file already exists") + return True + + # Check if .env.example exists + if not os.path.exists(example_path): + print(f"Creating default .env file (no example found at {example_path})") + with open(env_path, "w") as f: + f.write("# Twitch API credentials\n") + f.write("TWITCH_USERNAME=your_bot_username\n") + f.write("TWITCH_CLIENT_ID=your_client_id\n") + f.write("TWITCH_CLIENT_SECRET=your_client_secret\n") + f.write("TWITCH_CHANNEL=your_channel\n") + print(f"Created default {env_path} file") + return True + + # Copy .env.example to .env + try: + shutil.copy(example_path, env_path) + print(f"Created {env_path} file from {example_path}") + print(f"Please edit {env_path} to set your Twitch API credentials") + return True + except Exception as e: + print(f"Error creating .env file: {e}") + return False + +def install_dependencies() -> bool: + """Install Python dependencies from requirements.txt""" + req_path = "requirements.txt" + + if not os.path.exists(req_path): + print(f"Error: {req_path} not found") + return False + + try: + print(f"Installing dependencies from {req_path}...") + subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", req_path]) + print("Dependencies installed successfully") + return True + except subprocess.CalledProcessError as e: + print(f"Error installing dependencies: {e}") + return False + +def create_directories() -> bool: + """Create necessary directories if they don't exist""" + dirs = [ + "data/cache", + "data/sounds" + ] + + for directory in dirs: + if not os.path.exists(directory): + try: + os.makedirs(directory, exist_ok=True) + print(f"Created directory: {directory}") + except Exception as e: + print(f"Error creating directory {directory}: {e}") + return False + + return True + +def setup_environment() -> bool: + """ + Run all setup steps + Returns True if all steps succeeded + """ + steps = [ + check_python_version, + create_directories, + create_env_file, + install_dependencies + ] + + success = True + for step in steps: + if not step(): + success = False + + return success + +if __name__ == "__main__": + print("Setting up environment for stream-interact...") + if setup_environment(): + print("\nSetup completed successfully!") + print("\nTo run the bot, use: python main.py") + print("To see available options, use: python main.py --help") + else: + print("\nSetup completed with errors. Please check the messages above.") + sys.exit(1) diff --git a/scripts/setup_twitch_auth.py b/scripts/setup_twitch_auth.py new file mode 100644 index 0000000..4c6b8b8 --- /dev/null +++ b/scripts/setup_twitch_auth.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +""" +Setup script for Twitch authentication + +This script helps users generate a Twitch user access token with proper IRC scopes. +""" +import os +import sys +import logging +import json +import argparse +import time +from dotenv import load_dotenv + +# Load environment variables if available +load_dotenv() + +# Add parent directory to the path so we can import our modules +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import the auth module +from src.core.auth import TwitchAuth + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('twitch-auth-setup') + +# Path to default config +DEFAULT_CONFIG_PATH = os.path.join("data", "default_config.json") + +def load_default_config(): + """Load default configuration from JSON file""" + if os.path.exists(DEFAULT_CONFIG_PATH): + try: + with open(DEFAULT_CONFIG_PATH, 'r') as f: + return json.load(f) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON in {DEFAULT_CONFIG_PATH}, ignoring") + return {} + +def save_default_config(config_data): + """Save configuration to default config JSON file""" + # Ensure directory exists + os.makedirs(os.path.dirname(DEFAULT_CONFIG_PATH), exist_ok=True) + + # If file exists, load current data and update it + existing_data = {} + if os.path.exists(DEFAULT_CONFIG_PATH): + try: + with open(DEFAULT_CONFIG_PATH, 'r') as f: + existing_data = json.load(f) + except json.JSONDecodeError: + logger.warning(f"Invalid JSON in {DEFAULT_CONFIG_PATH}, overwriting") + + # Update with new data + existing_data.update(config_data) + + # Save back to file + with open(DEFAULT_CONFIG_PATH, 'w') as f: + json.dump(existing_data, f, indent=2) + + logger.info(f"Configuration saved to {DEFAULT_CONFIG_PATH}") + +def setup_twitch_auth(client_id=None, client_secret=None, scopes=None, manual_mode=False, force_new_token=False, use_cached_token=False): + """ + Setup Twitch authentication by generating a user access token + + Args: + client_id: Twitch client ID + client_secret: Twitch client secret + scopes: Space-separated list of scopes + manual_mode: Whether to use manual mode (no browser) + force_new_token: Force generation of a new token even if a cached one exists + use_cached_token: Whether to use a cached token if available (default: False) + + Returns: + bool: True if successful, False otherwise + """ + try: + # Load default config + default_config = load_default_config() + + # Priority of values: + # 1. Function arguments + # 2. Environment variables + # 3. Default config + + # Use provided values, then env vars, then default config + client_id = client_id or os.environ.get('TWITCH_CLIENT_ID') or default_config.get('TWITCH_CLIENT_ID') + client_secret = client_secret or os.environ.get('TWITCH_CLIENT_SECRET') or default_config.get('TWITCH_CLIENT_SECRET') + + if not client_id: + client_id = input("Enter your Twitch Client ID: ") + + if not client_secret: + client_secret = input("Enter your Twitch Client Secret: ") + + if not client_id or not client_secret: + logger.error("Client ID and Client Secret are required") + return False + + # Default scopes for IRC chat - prioritize env var, then default config + default_scopes = "chat:read chat:edit" + scopes = scopes or os.environ.get('TWITCH_SCOPES') or default_config.get('TWITCH_SCOPES', default_scopes) + + # Define redirect URI + redirect_uri = os.environ.get('TWITCH_REDIRECT_URI') or default_config.get('TWITCH_REDIRECT_URI', "https://localhost:3000") + + print("\n===== Twitch Authentication Setup =====") + print(f"Client ID: {client_id}") + print(f"Redirect URI: {redirect_uri} (must match your Twitch app settings)") + print(f"Requested Scopes: {scopes}") + print("=====================================\n") + + # Make sure the cache directory exists + cache_dir = os.path.join("data", "cache") + os.makedirs(cache_dir, exist_ok=True) + + # Create the token cache file path + token_cache_file = os.path.join(cache_dir, "token_cache.json") + + # By default, clear any cached tokens unless explicitly told to use them + if (not use_cached_token or force_new_token) and os.path.exists(token_cache_file): + logger.info("Clearing cached token to generate fresh credentials") + os.remove(token_cache_file) + + # Create auth handler + auth = TwitchAuth( + client_id=client_id, + client_secret=client_secret, + token_cache_file=token_cache_file, + redirect_uri=redirect_uri, + scopes=scopes + ) + + # Check if a valid token is already available + has_valid_token = False + if use_cached_token and auth.access_token and auth.token_expiry and time.time() < (auth.token_expiry - 60): + has_valid_token = True + # Using cached token + if not force_new_token: + print("\nUsing cached token (valid and not expired).") + print("If you want to force a new token, run with --force-new-token") + try: + # Try to validate the token to make sure it's working + token_info = auth.validate_token() + print(f"Token belongs to user: {token_info.get('login', 'unknown')}") + expiry_hours = (auth.token_expiry - time.time()) / 3600 + print(f"Token expires in {expiry_hours:.1f} hours") + + # Skip browser flow and continue with cached token + oauth_token = f"oauth:{auth.access_token}" + except Exception as e: + logger.warning(f"Error validating cached token: {e}") + logger.warning("Will attempt to get a new token") + has_valid_token = False + + # If no valid token, go through the auth flow + if not has_valid_token or force_new_token: + # Show security warning about certificates + if not manual_mode: + print("\n⚠️ IMPORTANT SECURITY NOTE ⚠️") + print("This application uses a self-signed certificate for HTTPS, which is required by Twitch.") + print("When your browser opens, you will see a security warning.") + print("This is expected for local development. Please proceed to the site anyway:") + print(" • In Chrome: Click 'Advanced' and then 'Proceed to localhost (unsafe)'") + print(" • In Firefox: Click 'Advanced', then 'Accept the Risk and Continue'") + print(" • In Edge: Click 'Details' and then 'Go on to the webpage'\n") + print("NOTE: If the browser doesn't open automatically, a URL will be provided for you to copy and paste.") + input("Press Enter to continue...") + + # Start OAuth flow + logger.info("Starting OAuth authentication flow...") + oauth_token = auth.get_oauth_token(manual_auth=manual_mode) + + # At this point, we should have a valid token + if auth.access_token: + # Get token info + try: + token_info = auth.validate_token() + + print("\n===== Authentication Successful =====") + print(f"Access Token: {auth.access_token}") + if auth.refresh_token: + print(f"Refresh Token: {auth.refresh_token}") + print(f"Token Scopes: {token_info.get('scopes', [])}") + print(f"User Name: {token_info.get('login', 'unknown')}") + print("=====================================\n") + except Exception as e: + logger.error(f"Failed to validate token: {e}") + print("\n===== Authentication Status =====") + print(f"Access Token: {auth.access_token}") + if auth.refresh_token: + print(f"Refresh Token: {auth.refresh_token}") + print("Warning: Could not validate token") + print("============================\n") + + # Store token info in default_config.json + if input("Would you like to save these credentials to default_config.json? (y/n): ").lower() == 'y': + config_data = { + "TWITCH_CLIENT_ID": client_id, + "TWITCH_CLIENT_SECRET": client_secret, + } + + # Try to use the validated user info + if 'token_info' in locals(): + username = token_info.get('login', '') + config_data["TWITCH_USERNAME"] = username + + # Get channel name + default_channel = username if 'username' in locals() else os.environ.get('TWITCH_CHANNEL', '') or default_config.get('TWITCH_CHANNEL', '') + channel = input(f"Enter Twitch channel to join (default: {default_channel}): ") or default_channel + config_data["TWITCH_CHANNEL"] = channel + + # Save access and refresh tokens + config_data["TWITCH_ACCESS_TOKEN"] = auth.access_token + if auth.refresh_token: + config_data["TWITCH_REFRESH_TOKEN"] = auth.refresh_token + + # Save to default_config.json + save_default_config(config_data) + + return True + + else: + logger.error("Failed to get OAuth token") + return False + + except Exception as e: + logger.error(f"Error during setup: {e}") + return False + +def main(): + """Main function""" + parser = argparse.ArgumentParser(description="Setup Twitch authentication") + parser.add_argument("--client-id", help="Twitch Client ID") + parser.add_argument("--client-secret", help="Twitch Client Secret") + parser.add_argument("--scopes", help="Space-separated list of scopes") + parser.add_argument("--manual", action="store_true", help="Use manual mode (no browser)") + parser.add_argument("--force-new-token", action="store_true", help="Force generation of a new token even if a cached one exists") + parser.add_argument("--use-cached-token", action="store_true", help="Use cached token if available (default is to clear cached tokens)") + + args = parser.parse_args() + + if setup_twitch_auth(args.client_id, args.client_secret, args.scopes, args.manual, args.force_new_token, args.use_cached_token): + print("Setup completed successfully!") + sys.exit(0) + else: + print("Setup failed!") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..2094936 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,5 @@ +import os +import sys + +# Add the parent directory to Python's path so 'src' module can be found +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/auth.py b/src/core/auth.py new file mode 100644 index 0000000..bd3677a --- /dev/null +++ b/src/core/auth.py @@ -0,0 +1,293 @@ +""" +Authentication module for Twitch API +""" +import os +import time +import json +import logging +import requests +import webbrowser +from typing import Dict, Any, Optional, Tuple + +# Import the auth server functionality +from src.core.auth_server import start_oauth_flow + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class TwitchAuth: + """Handles Twitch API authentication and token management""" + + def __init__(self, client_id: str, client_secret: str = None, token_cache_file: str = "data/cache/token_cache.json", + redirect_uri: str = "https://localhost:3000", scopes: str = "chat:read chat:edit"): + self.client_id: str = client_id + self.client_secret: str = client_secret + self.token_cache_file: str = token_cache_file + self.redirect_uri: str = redirect_uri + self.scopes: str = scopes + self.access_token: Optional[str] = None + self.refresh_token: Optional[str] = None + self.token_expiry: Optional[float] = None + self._load_cached_token() + + def _load_cached_token(self) -> None: + """Load cached token from file if available and not expired""" + try: + if os.path.exists(self.token_cache_file): + with open(self.token_cache_file, 'r') as f: + cached_data = json.load(f) + + # Check if we have all required fields + if all(k in cached_data for k in ['access_token', 'expires_at', 'client_id']): + # Only use the token if it was generated with the same client_id + if cached_data['client_id'] == self.client_id: + # Check if token is still valid (with a safety margin) + if cached_data['expires_at'] > time.time() + 60: + self.access_token = cached_data['access_token'] + self.token_expiry = cached_data['expires_at'] + # Load refresh token if available + if 'refresh_token' in cached_data: + self.refresh_token = cached_data['refresh_token'] + logger.info(f"Loaded valid cached token, expires in {(self.token_expiry - time.time())/60:.1f} minutes") + else: + logger.info("Cached token is expired, will try to refresh") + if 'refresh_token' in cached_data: + self.refresh_token = cached_data['refresh_token'] + self._refresh_token() + else: + logger.info("No refresh token available, will need to re-authenticate") + else: + logger.info("Cached token was generated with a different client_id, will request a new one") + except Exception as e: + logger.warning(f"Error loading cached token: {e}") + + def _save_token_to_cache(self, token_data: Dict[str, Any]) -> None: + """Save token data to cache file""" + try: + # Ensure directory exists + os.makedirs(os.path.dirname(self.token_cache_file), exist_ok=True) + + cache_data = { + 'access_token': token_data['access_token'], + 'expires_at': time.time() + token_data['expires_in'] - 3600, # Expire 1 hour early to be safe + 'token_type': token_data['token_type'], + 'client_id': self.client_id, + 'cached_at': time.time() + } + + # Save refresh token if available + if 'refresh_token' in token_data: + cache_data['refresh_token'] = token_data['refresh_token'] + + with open(self.token_cache_file, 'w') as f: + json.dump(cache_data, f) + + logger.info(f"Saved token to cache file: {self.token_cache_file}") + except Exception as e: + logger.warning(f"Error saving token to cache: {e}") + + def _refresh_token(self) -> bool: + """ + Refresh the access token using the refresh token + + Returns: + bool: True if refresh was successful, False otherwise + """ + if not self.refresh_token: + logger.warning("No refresh token available") + return False + + if not self.client_id or not self.client_secret: + logger.warning("Client ID and Client Secret are required for token refresh") + return False + + token_url = "https://id.twitch.tv/oauth2/token" + payload = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "grant_type": "refresh_token", + "refresh_token": self.refresh_token + } + + try: + response = requests.post(token_url, data=payload) + response.raise_for_status() + token_data = response.json() + + self.access_token = token_data["access_token"] + self.token_expiry = time.time() + token_data["expires_in"] - 3600 + + # Update refresh token if provided + if "refresh_token" in token_data: + self.refresh_token = token_data["refresh_token"] + + # Save updated tokens + self._save_token_to_cache(token_data) + logger.info(f"Successfully refreshed access token, valid for {token_data['expires_in']/3600:.1f} hours") + return True + + except Exception as e: + logger.error(f"Failed to refresh token: {e}") + # Clear refresh token on failure to force a new auth flow + self.refresh_token = None + return False + + def _handle_auth_code(self, auth_code: str) -> Tuple[bool, str]: + """ + Exchange authorization code for access and refresh tokens + + Args: + auth_code: The authorization code from OAuth redirect + + Returns: + Tuple[bool, str]: Success status and message + """ + if not self.client_id or not self.client_secret: + return False, "Client ID and Client Secret are required" + + token_url = "https://id.twitch.tv/oauth2/token" + payload = { + "client_id": self.client_id, + "client_secret": self.client_secret, + "code": auth_code, + "grant_type": "authorization_code", + "redirect_uri": self.redirect_uri + } + + try: + response = requests.post(token_url, data=payload) + response.raise_for_status() + token_data = response.json() + + self.access_token = token_data["access_token"] + self.token_expiry = time.time() + token_data["expires_in"] - 3600 + + if "refresh_token" in token_data: + self.refresh_token = token_data["refresh_token"] + + # Save token data + self._save_token_to_cache(token_data) + logger.info(f"Successfully obtained new tokens, valid for {token_data['expires_in']/3600:.1f} hours") + return True, "Authentication successful" + + except Exception as e: + logger.error(f"Failed to exchange authorization code: {e}") + return False, f"Failed to exchange authorization code: {str(e)}" + + def get_oauth_token(self, manual_auth: bool = False) -> str: + """ + Get OAuth token for Twitch authentication + + If no valid token is available: + 1. Try to refresh the token if a refresh token is available + 2. If refresh fails or no refresh token, require user auth + + Args: + manual_auth: If True, provide instructions for manual authentication + instead of automatic browser launch + + Returns: + str: The OAuth token with 'oauth:' prefix + + Raises: + ValueError: If authentication fails + """ + # Check if we have a valid token + if self.access_token and self.token_expiry and time.time() < (self.token_expiry - 60): + expiry_min = (self.token_expiry - time.time()) / 60 + logger.debug(f"Using cached token, expires in {expiry_min:.1f} minutes") + return f"oauth:{self.access_token}" + + # Try to refresh the token + if self.refresh_token and self._refresh_token(): + return f"oauth:{self.access_token}" + + # Need user authentication - use our auth server to handle the flow + logger.info("Starting OAuth authentication flow") + + if manual_auth: + # For environments where browser launch isn't possible + auth_url = ( + f"https://id.twitch.tv/oauth2/authorize" + f"?client_id={self.client_id}" + f"&redirect_uri={self.redirect_uri}" + f"&response_type=code" + f"&scope={self.scopes.replace(' ', '+')}" + ) + + logger.info(f"Please visit this URL to authenticate:") + logger.info(auth_url) + logger.info("After authenticating, you'll be redirected to a URL like:") + logger.info(f"{self.redirect_uri}?code=abcdef123456&scope=chat:read+chat:edit") + logger.info("Please enter the code parameter from that URL:") + auth_code = input("Enter code: ") + else: + # Use the auth server to handle the redirect + auth_code = start_oauth_flow( + client_id=self.client_id, + redirect_uri=self.redirect_uri, + scopes=self.scopes + ) + + if not auth_code: + raise ValueError("Failed to get authorization code") + + # Exchange the auth code for tokens + success, message = self._handle_auth_code(auth_code) + if success: + return f"oauth:{self.access_token}" + else: + raise ValueError(f"Authentication failed: {message}") + + def validate_token(self) -> Dict[str, Any]: + """ + Validate the current access token + + Returns: + Dict: Token validation information + + Raises: + ValueError: If validation fails + """ + if not self.access_token: + raise ValueError("No access token available") + + validate_url = "https://id.twitch.tv/oauth2/validate" + headers = { + "Authorization": f"OAuth {self.access_token}" + } + + try: + response = requests.get(validate_url, headers=headers) + response.raise_for_status() + return response.json() + except Exception as e: + logger.error(f"Token validation failed: {e}") + raise ValueError(f"Token validation failed: {str(e)}") + + def update_manually(self, access_token: str, refresh_token: str = None, expires_in: int = 14400) -> None: + """ + Manually update the tokens (useful for setting tokens from other sources) + + Args: + access_token: The access token + refresh_token: The refresh token (optional) + expires_in: Token validity in seconds (default 4 hours) + """ + self.access_token = access_token + if refresh_token: + self.refresh_token = refresh_token + self.token_expiry = time.time() + expires_in - 3600 # 1 hour safety margin + + # Save to cache + token_data = { + "access_token": access_token, + "expires_in": expires_in, + "token_type": "bearer" + } + if refresh_token: + token_data["refresh_token"] = refresh_token + + self._save_token_to_cache(token_data) + logger.info(f"Manually updated access token, valid for {expires_in/3600:.1f} hours") diff --git a/src/core/auth_server.py b/src/core/auth_server.py new file mode 100644 index 0000000..6037e3b --- /dev/null +++ b/src/core/auth_server.py @@ -0,0 +1,398 @@ +""" +Simple HTTPS server to handle OAuth redirects from Twitch +""" +import http.server +import socketserver +import threading +import urllib.parse +import webbrowser +import logging +import ssl +import os +import time +import platform +import subprocess +from typing import Optional, Callable, Dict, Any +from pathlib import Path +import datetime + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class AuthHandler(http.server.BaseHTTPRequestHandler): + """HTTP request handler for OAuth redirects""" + + # Class variables to store auth code and callback + auth_code: Optional[str] = None + auth_callback: Optional[Callable[[str], None]] = None + + def do_GET(self): + """Handle GET requests - extract auth code from URL""" + try: + # Parse the URL and query parameters + parsed_url = urllib.parse.urlparse(self.path) + query_params = urllib.parse.parse_qs(parsed_url.query) + + if "code" in query_params: + # Extract the authorization code + AuthHandler.auth_code = query_params["code"][0] + logger.info("Authorization code received") + + # Send a success response to the user + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + html_content = """ + + + + Authentication Successful + + + +
+
+

Authentication Successful!

+

You have successfully authenticated with Twitch.

+

You can now close this window and return to the application.

+
+ + + """ + self.wfile.write(html_content.encode()) + + # Call the callback function with the auth code + if AuthHandler.auth_callback: + AuthHandler.auth_callback(AuthHandler.auth_code) + else: + # Handle error or missing code + self.send_response(400) + self.send_header("Content-type", "text/html") + self.end_headers() + self.wfile.write(b"

Authentication Error

No authorization code received.

") + logger.error("No authorization code in redirect") + + except Exception as e: + # Handle any exceptions + logger.error(f"Error handling OAuth redirect: {e}") + self.send_response(500) + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write(f"Error: {str(e)}".encode()) + + def log_message(self, format, *args): + """Override log_message to use our logger""" + logger.debug(f"AuthHandler: {format % args}") + + +def generate_self_signed_cert_with_cryptography(cert_path: str, key_path: str) -> bool: + """ + Generate a self-signed certificate using the cryptography library + + Args: + cert_path: Path to save the certificate file + key_path: Path to save the key file + + Returns: + bool: True if successful, False otherwise + """ + try: + # Import cryptography modules here to avoid requiring it until needed + from cryptography import x509 + from cryptography.x509.oid import NameOID + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + + # Ensure the directory exists + os.makedirs(os.path.dirname(cert_path), exist_ok=True) + + # Generate a private key + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + # Write private key to file + with open(key_path, "wb") as f: + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + )) + + # Create a self-signed certificate + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "localhost"), + ]) + + cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + private_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + datetime.datetime.utcnow() + ).not_valid_after( + # Valid for 1 year + datetime.datetime.utcnow() + datetime.timedelta(days=365) + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False, + ).sign(private_key, hashes.SHA256()) + + # Write certificate to file + with open(cert_path, "wb") as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + + logger.info(f"Generated self-signed certificate using cryptography: {cert_path}") + return True + + except ImportError: + logger.error("Failed to import cryptography library. Please install it: pip install cryptography") + return False + except Exception as e: + logger.error(f"Failed to generate self-signed certificate: {e}") + return False + + +def generate_self_signed_cert(cert_path: str, key_path: str) -> bool: + """ + Generate a self-signed certificate for local HTTPS development + + This function first tries to use the cryptography library, and falls back + to OpenSSL command line if that fails. + + Args: + cert_path: Path to save the certificate file + key_path: Path to save the key file + + Returns: + bool: True if successful, False otherwise + """ + # Try using cryptography library first (pure Python solution) + if generate_self_signed_cert_with_cryptography(cert_path, key_path): + return True + + # Fallback to OpenSSL command line + try: + # Check if OpenSSL is available + import subprocess + + # Ensure the directory exists + os.makedirs(os.path.dirname(cert_path), exist_ok=True) + + # Generate self-signed certificate valid for 1 year + cmd = [ + 'openssl', 'req', '-new', '-x509', '-days', '365', '-nodes', + '-out', cert_path, + '-keyout', key_path, + '-subj', '/CN=localhost' + ] + + subprocess.run(cmd, check=True) + logger.info(f"Generated self-signed certificate using OpenSSL: {cert_path}") + return True + except Exception as e: + logger.error(f"Failed to generate self-signed certificate: {e}") + return False + + +def start_auth_server(port: int = 3000, callback: Callable[[str], None] = None) -> threading.Thread: + """ + Start the HTTPS auth server in a separate thread + + Args: + port: The port to listen on + callback: Function to call with the auth code when received + + Returns: + Thread object for the server + """ + # Reset the auth code + AuthHandler.auth_code = None + # Set the callback + AuthHandler.auth_callback = callback + + # Paths for the SSL certificate and key + data_dir = Path("data/ssl") + data_dir.mkdir(parents=True, exist_ok=True) + cert_path = str(data_dir / "localhost.crt") + key_path = str(data_dir / "localhost.key") + + # Generate a self-signed certificate if it doesn't exist + if not (os.path.exists(cert_path) and os.path.exists(key_path)): + if not generate_self_signed_cert(cert_path, key_path): + logger.error("Failed to create SSL certificate for HTTPS server") + return None + + # Create HTTPS server + server = socketserver.TCPServer(("", port), AuthHandler) + + # Wrap the socket with SSL + server.socket = ssl.wrap_socket( + server.socket, + keyfile=key_path, + certfile=cert_path, + server_side=True + ) + + logger.info(f"Starting HTTPS auth server on port {port}") + logger.info(f"NOTE: Your browser will show a security warning about the self-signed certificate.") + logger.info(f"This is normal for local development. Please proceed to the site anyway.") + + # Run server in a separate thread + server_thread = threading.Thread(target=server.serve_forever) + server_thread.daemon = True + server_thread.start() + + return server_thread + + +def stop_auth_server(server_thread: threading.Thread) -> None: + """ + Stop the auth server + + Args: + server_thread: The server thread to stop + """ + if server_thread and server_thread.is_alive(): + # The thread is daemon, so it will be terminated when the main program exits + logger.info("Auth server will terminate when program exits") + + +def open_browser_with_fallback(url: str) -> bool: + """ + Open a browser with fallback methods if the standard approach fails + + Args: + url: The URL to open + + Returns: + bool: True if a browser was successfully opened, False otherwise + """ + try: + # Try the standard webbrowser module first + logger.info(f"Opening browser: {url}") + if webbrowser.open(url): + return True + + # If that fails, try platform-specific methods + system = platform.system().lower() + + if system == 'windows': + try: + os.startfile(url) + return True + except: + subprocess.Popen(['start', url], shell=True) + return True + elif system == 'darwin': # macOS + try: + subprocess.Popen(['open', url]) + return True + except: + pass + elif system == 'linux': + try: + subprocess.Popen(['xdg-open', url]) + return True + except: + try: + subprocess.Popen(['gnome-open', url]) + return True + except: + try: + subprocess.Popen(['kde-open', url]) + return True + except: + pass + + logger.warning("Could not open browser automatically") + return False + except Exception as e: + logger.warning(f"Error opening browser: {e}") + return False + + +def start_oauth_flow(client_id: str, redirect_uri: str = "https://localhost:3000", + scopes: str = "chat:read chat:edit", + port: int = 3000) -> Optional[str]: + """ + Start the OAuth flow by opening the browser and waiting for the redirect + + Args: + client_id: Twitch client ID + redirect_uri: The redirect URI (should use HTTPS) + scopes: Space-separated list of scopes to request + port: The port to listen on + + Returns: + The authorization code or None if failed + """ + auth_code = [None] # Using list to allow modification in callback + auth_event = threading.Event() + + def auth_callback(code: str): + auth_code[0] = code + auth_event.set() + + # Start the auth server + server_thread = start_auth_server(port, auth_callback) + if not server_thread: + logger.error("Failed to start HTTPS server") + return None + + # Construct the auth URL + auth_url = ( + f"https://id.twitch.tv/oauth2/authorize" + f"?client_id={client_id}" + f"&redirect_uri={redirect_uri}" + f"&response_type=code" + f"&scope={scopes.replace(' ', '+')}" + ) + + # Try to open the browser with fallback methods + browser_opened = open_browser_with_fallback(auth_url) + + if not browser_opened: + # If browser opening fails, provide the URL for manual navigation + print("\n⚠️ Could not open browser automatically ⚠️") + print("Please copy and paste the following URL into your browser:") + print(f"\n{auth_url}\n") + print("After authenticating, you'll be redirected to the local app.") + + # Wait for the callback (with timeout) + auth_event.wait(timeout=300) # 5 minutes timeout + + if not auth_code[0]: + logger.error("Authentication timed out or failed") + + return auth_code[0] \ No newline at end of file diff --git a/src/core/twitch.py b/src/core/twitch.py new file mode 100644 index 0000000..52da207 --- /dev/null +++ b/src/core/twitch.py @@ -0,0 +1,237 @@ +import socket +import re +import logging +import threading +import requests +import time +import json +import os +from typing import Dict, Callable, List, Optional, Any, Tuple, Union, Match, Set + +# Import the queue functionality +from src.queue.server import enqueue_command, enqueue_message, get_queue_stats +from src.core.auth import TwitchAuth + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Type alias for command callback function +CommandCallback = Callable[[str, List[str], 'TwitchBot'], None] +# Type alias for message data dictionary +MessageData = Dict[str, str] + +class TwitchBot: + def __init__(self, username: str, client_id: str = None, client_secret: str = None, + access_token: str = None, refresh_token: str = None, + channel: str = None, use_queue: bool = False, + token_cache_file: str = "data/cache/token_cache.json", + admin_users: List[str] = None) -> None: + self.username: str = username.lower() + self.client_id: Optional[str] = client_id or os.environ.get("TWITCH_CLIENT_ID") + self.client_secret: Optional[str] = client_secret or os.environ.get("TWITCH_CLIENT_SECRET") + self.channel: str = channel.lower() if channel else "" + self.server: str = 'irc.chat.twitch.tv' + self.port: int = 6667 + self.socket: socket.socket = socket.socket() + self.commands: Dict[str, CommandCallback] = {} + self.running: bool = False + self.last_command: str = "" + + # Admin users who have the same privileges as the channel owner + self.admin_users: List[str] = [self.channel] # Channel owner is always an admin + if admin_users: + self.admin_users.extend([user.lower() for user in admin_users]) + + # Use access_token from args or environment + direct_access_token = access_token or os.environ.get("TWITCH_ACCESS_TOKEN") + direct_refresh_token = refresh_token or os.environ.get("TWITCH_REFRESH_TOKEN") + + # Auth token handling + self.auth = TwitchAuth(self.client_id, self.client_secret, token_cache_file) + self.token_cache_file: str = token_cache_file + + # If tokens provided directly, update the auth handler + if direct_access_token: + logger.info("Using provided access token") + self.auth.update_manually( + access_token=direct_access_token, + refresh_token=direct_refresh_token + ) + + # Queue integration + self.use_queue: bool = use_queue + self.oauth_token: Optional[str] = None + + def is_admin(self, username: str) -> bool: + """Check if a user has admin privileges""" + return username.lower() in self.admin_users + + @property + def access_token(self) -> Optional[str]: + """Get the current access token from the auth handler""" + return self.auth.access_token + + @property + def token_expiry(self) -> Optional[float]: + """Get the token expiry from the auth handler""" + return self.auth.token_expiry + + def get_oauth_token(self) -> str: + """Get an OAuth token for authenticating with Twitch""" + return self.auth.get_oauth_token() + + def connect(self) -> bool: + """Connect to Twitch IRC server""" + try: + # Get the OAuth token + oauth_token = self.get_oauth_token() + self.oauth_token = oauth_token # Store for queue use + + self.socket.connect((self.server, self.port)) + # Add a timeout so we can process keyboard interrupts + self.socket.settimeout(1.0) + self.socket.send(f"PASS {oauth_token}\n".encode('utf-8')) + self.socket.send(f"NICK {self.username}\n".encode('utf-8')) + self.socket.send(f"JOIN #{self.channel}\n".encode('utf-8')) + + # Wait for confirmation + data = self.socket.recv(2048).decode('utf-8') + if "Welcome" in data or "You are in a maze of twisty passages" in data: + logger.info(f"Connected to {self.channel}'s chat") + return True + else: + logger.error(f"Failed to connect: {data}") + return False + + except Exception as e: + logger.error(f"Failed to connect: {e}") + return False + + def register_command(self, command: str, callback: CommandCallback) -> None: + """Register a command with a callback function""" + self.commands[command.lower()] = callback + logger.info(f"Registered command: {command}") + + def send_message(self, message: str) -> None: + """Send a message to the Twitch chat""" + self.socket.send(f"PRIVMSG #{self.channel} :{message}\n".encode('utf-8')) + + def parse_message(self, message: str) -> Optional[MessageData]: + """Parse the IRC message and extract relevant information""" + # This regex parses Twitch IRC messages + pattern: str = r':(?P[\w]+)!(?P=username)@(?P=username)\.tmi\.twitch\.tv PRIVMSG #(?P[\w]+) :(?P.+)' + match: Optional[Match[str]] = re.match(pattern, message) + + if match: + return { + 'username': match.group('username'), + 'channel': match.group('channel'), + 'content': match.group('content') + } + return None + + def process_commands(self, message_data: MessageData) -> None: + """Process commands from chat messages""" + if not message_data: + return + + content: str = message_data['content'].strip() + username: str = message_data['username'] + + # Check if we should use the queue + if self.use_queue: + # If this is a regular message, add it to the message queue + if not content.startswith('!'): + enqueue_message(username, content) + return + + # Check if message is a command (starts with !) + if content.startswith('!'): + parts: List[str] = content.split() + command: str = parts[0][1:].lower() # Remove ! and convert to lowercase + args: List[str] = parts[1:] if len(parts) > 1 else [] + + # Store the last executed command + self.last_command = command + + # Check if we should use the queue + if self.use_queue: + # Add the command to the queue and return + # We need to create a simple command registry to pass to the queue + command_registry = {cmd: cmd for cmd in self.commands.keys()} + + # Make sure we have an OAuth token + if not self.oauth_token: + self.oauth_token = self.get_oauth_token() + + enqueue_command( + username, + command, + args, + channel=self.channel, + oauth_token=self.oauth_token, + bot_username=self.username, + command_registry=command_registry + ) + return + + # Direct execution (no queue) + if command in self.commands: + try: + self.commands[command](username, args, self) + except Exception as e: + logger.error(f"Error executing command {command}: {e}") + + def start(self, use_queue: bool = False) -> None: + """Start the bot and listen for messages""" + # Update queue setting + self.use_queue = use_queue + logger.info(f"Starting bot with queue {'enabled' if use_queue else 'disabled'}") + + if not self.connect(): + logger.error("Failed to start bot: Connection error") + return + + self.running = True + buffer: str = "" + + while self.running: + try: + new_data: str = self.socket.recv(2048).decode('utf-8') + buffer += new_data + lines: List[str] = buffer.split("\r\n") + buffer = lines.pop() + + for line in lines: + logger.debug(f"Received: {line}") + + # Respond to PING to keep the connection alive + if line.startswith("PING"): + self.socket.send("PONG\r\n".encode('utf-8')) + logger.debug("Sent PONG") + continue + + message_data: Optional[MessageData] = self.parse_message(line) + if message_data: + self.process_commands(message_data) + + except socket.timeout: + # This is expected with our timeout setting - just continue the loop + continue + except KeyboardInterrupt: + logger.info("Keyboard interrupt received, shutting down...") + self.running = False + except Exception as e: + logger.error(f"Error in bot loop: {e}") + self.running = False + + self.socket.close() + logger.info("Bot stopped") + + def stop(self) -> None: + """Stop the bot""" + self.running = False + + def get_queue_stats(self) -> Dict[str, Any]: + """Get current queue statistics""" + return get_queue_stats() diff --git a/src/features/__init__.py b/src/features/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/features/examples.py b/src/features/examples.py new file mode 100644 index 0000000..3fcd864 --- /dev/null +++ b/src/features/examples.py @@ -0,0 +1,270 @@ +import time +import threading +import subprocess +import json +import os +from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast +from src.core.twitch import TwitchBot + +class AdvancedExamples: + """ + Examples of more advanced Twitch bot functionality: + - Voting system + - Timed messages + - External application control + - User points system + """ + + def __init__(self, bot: TwitchBot) -> None: + self.bot: TwitchBot = bot + self.votes: Dict[str, int] = {} + self.current_poll: Optional[Dict[str, Any]] = None + self.poll_active: bool = False + self.timer_threads: Dict[str, threading.Timer] = {} + self.user_points: Dict[str, int] = self.load_points() + + # Register commands + self.bot.register_command("poll", self.cmd_start_poll) + self.bot.register_command("vote", self.cmd_vote) + self.bot.register_command("endpoll", self.cmd_end_poll) + self.bot.register_command("timer", self.cmd_timer) + self.bot.register_command("points", self.cmd_points) + self.bot.register_command("give", self.cmd_give_points) + + # For external application control (example) + if os.name == 'nt': # Windows + self.bot.register_command("sound", self.cmd_play_sound) + + def load_points(self) -> Dict[str, int]: + """Load user points from a JSON file""" + try: + with open('user_points.json', 'r') as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return {} + + def save_points(self) -> None: + """Save user points to a JSON file""" + with open('user_points.json', 'w') as f: + json.dump(self.user_points, f) + + # ---- Voting System ---- + + def cmd_start_poll(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Start a new poll with options""" + if not args or len(args) < 3: + bot.send_message(f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]") + return + + if self.poll_active: + bot.send_message(f"@{username}, a poll is already active. End it with !endpoll first.") + return + + # Reset votes + self.votes = {} + + # Extract question and options + question: str = args[0] + options: List[str] = args[1:] + + self.current_poll = { + "question": question, + "options": options, + "started_by": username, + "timestamp": time.time() + } + + self.poll_active = True + + # Announce poll + bot.send_message(f"POLL STARTED: {question}") + for i, option in enumerate(options): + bot.send_message(f"Option {i+1}: {option}") + bot.send_message("Vote using !vote ") + + def cmd_vote(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Cast a vote in the active poll""" + if not self.poll_active or not self.current_poll: + bot.send_message(f"@{username}, there is no active poll.") + return + + if not args: + bot.send_message(f"@{username}, please specify an option number: !vote ") + return + + try: + vote: int = int(args[0]) + if vote < 1 or vote > len(self.current_poll["options"]): + raise ValueError + except ValueError: + bot.send_message(f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}") + return + + # Record the vote + self.votes[username] = vote + bot.send_message(f"@{username} voted for option {vote}: {self.current_poll['options'][vote-1]}") + + def cmd_end_poll(self, username: str, args: List[str], bot: TwitchBot) -> None: + """End the active poll and show results""" + if not self.poll_active or not self.current_poll: + bot.send_message(f"@{username}, there is no active poll.") + return + + if username != self.current_poll["started_by"] and not bot.is_admin(username): + bot.send_message(f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.") + return + + # Count votes + results: Dict[int, int] = {} + for i in range(1, len(self.current_poll["options"]) + 1): + results[i] = 0 + + for vote in self.votes.values(): + results[vote] += 1 + + # Find winner(s) + max_votes: int = max(results.values()) if results else 0 + winners: List[int] = [opt for opt, count in results.items() if count == max_votes] + + # Announce results + bot.send_message(f"POLL ENDED: {self.current_poll['question']}") + for i, option in enumerate(self.current_poll["options"], 1): + bot.send_message(f"Option {i}: {option} - {results.get(i, 0)} votes") + + if max_votes > 0: + if len(winners) == 1: + winner_option: str = self.current_poll["options"][winners[0]-1] + bot.send_message(f"The winner is: {winner_option} with {max_votes} votes!") + else: + tied_options: List[str] = [self.current_poll["options"][w-1] for w in winners] + bot.send_message(f"It's a tie between: {', '.join(tied_options)} with {max_votes} votes each!") + else: + bot.send_message("No votes were cast.") + + # Reset poll + self.poll_active = False + self.current_poll = None + self.votes = {} + + # ---- Timer System ---- + + def timer_callback(self, timer_id: str, message: str) -> None: + """Callback function for timers""" + self.bot.send_message(message) + del self.timer_threads[timer_id] + + def cmd_timer(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Set a timer to send a message after X seconds""" + if len(args) < 2: + bot.send_message(f"@{username}, usage: !timer ") + return + + try: + seconds: int = int(args[0]) + if seconds <= 0 or seconds > 3600: # Limit to 1 hour + raise ValueError + except ValueError: + bot.send_message(f"@{username}, please provide a valid time between 1-3600 seconds.") + return + + message: str = f"⏰ TIMER ({username}): {' '.join(args[1:])}" + bot.send_message(f"@{username}, timer set for {seconds} seconds!") + + # Create a unique ID for this timer + timer_id: str = f"{username}_{int(time.time())}" + + # Create and start the timer thread + timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message]) + timer_thread.daemon = True + timer_thread.start() + + # Store the thread + self.timer_threads[timer_id] = timer_thread + + # ---- Points System ---- + + def cmd_points(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Check user points""" + target_user: str = username + if args and args[0].startswith('@'): + target_user = args[0][1:].lower() + + points: int = self.user_points.get(target_user, 0) + bot.send_message(f"@{username}, {target_user} has {points} points.") + + def cmd_give_points(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Give points to a user (admin only)""" + if not bot.is_admin(username): + bot.send_message(f"@{username}, only the channel owner or admins can give points.") + return + + if len(args) < 2: + bot.send_message(f"@{username}, usage: !give ") + return + + target_user: str = args[0] + if target_user.startswith('@'): + target_user = target_user[1:] + target_user = target_user.lower() + + try: + points: int = int(args[1]) + if points <= 0: + raise ValueError + except ValueError: + bot.send_message(f"@{username}, please provide a valid positive number of points.") + return + + # Add points to user + self.user_points[target_user] = self.user_points.get(target_user, 0) + points + self.save_points() + + bot.send_message(f"@{target_user}, you received {points} points from {username}! You now have {self.user_points[target_user]} points.") + + # ---- External Control ---- + + def cmd_play_sound(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Play a sound effect (Windows only)""" + if not args: + bot.send_message(f"@{username}, usage: !sound ") + return + + sound_name: str = args[0].lower() + sound_files: Dict[str, str] = { + "applause": "applause.wav", + "drumroll": "drumroll.wav", + "fail": "fail.wav", + "victory": "victory.wav" + } + + if sound_name not in sound_files: + bot.send_message(f"@{username}, available sounds: {', '.join(sound_files.keys())}") + return + + sound_path: str = os.path.join("sounds", sound_files[sound_name]) + + if not os.path.exists(sound_path): + bot.send_message(f"@{username}, sound file not found: {sound_path}") + return + + # Play the sound using Windows PowerShell + try: + ps_command: str = f'powershell -c (New-Object Media.SoundPlayer "{sound_path}").PlaySync();' + subprocess.Popen(ps_command, shell=True) + bot.send_message(f"@{username} played the {sound_name} sound!") + except Exception as e: + bot.send_message(f"Error playing sound: {e}") + + +# Example usage in main.py: +""" +from examples import AdvancedExamples + +def main(): + # ... set up bot ... + + # Initialize advanced examples + advanced = AdvancedExamples(bot) + + # ... start bot ... +""" \ No newline at end of file diff --git a/src/game/__init__.py b/src/game/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/game/controller.py b/src/game/controller.py new file mode 100644 index 0000000..370c0c9 --- /dev/null +++ b/src/game/controller.py @@ -0,0 +1,483 @@ +import time +import threading +import random +from collections import defaultdict, Counter +import json +from typing import Dict, Callable, List, Optional, Any, DefaultDict, Union, Literal, Set, Tuple, TypeVar, cast +from src.core.twitch import TwitchBot + +try: + from pynput.keyboard import Key, Controller as KeyboardController + from pynput.mouse import Button, Controller as MouseController + PYNPUT_AVAILABLE = True +except ImportError: + PYNPUT_AVAILABLE = False + print("pynput library not found. Install it with: pip install pynput") + print("Without pynput, keyboard/mouse control simulation will be disabled.") + +# Try to import the VirtualController +try: + from src.game.controller import VirtualController, VGAMEPAD_AVAILABLE +except ImportError: + VGAMEPAD_AVAILABLE = False + print("controller_support.py not found or error importing it.") + print("Virtual controller support will be disabled.") + +# Type for command functions +CommandFunc = Callable[[], None] +# Type for stats +StatsDict = Dict[str, int] +# Type for user stats dictionary +UserStatsDict = Dict[str, StatsDict] + +class GameController: + """ + Class for controlling games via Twitch chat commands + """ + def __init__(self, bot: TwitchBot, mode: Literal["direct", "vote"] = "direct", + cooldown: float = 2.0, input_type: Literal["keyboard", "controller"] = "keyboard") -> None: + self.bot: TwitchBot = bot + self.mode: Literal["direct", "vote"] = mode # "direct" or "vote" + self.cooldown: float = cooldown + self.input_type: Literal["keyboard", "controller"] = input_type # "keyboard" or "controller" + self.last_command_time: Dict[str, float] = {} + self.active_vote: bool = False + self.votes: Counter = Counter() + self.vote_timer: Optional[threading.Timer] = None + self.commands: Dict[str, CommandFunc] = {} + + # Streamer override mode + self.streamer_override: bool = False + + # Initialize controllers + self.keyboard: Optional[KeyboardController] = None + self.mouse: Optional[MouseController] = None + if PYNPUT_AVAILABLE: + self.keyboard = KeyboardController() + self.mouse = MouseController() + + # Initialize virtual controller if requested and available + self.controller: Optional[VirtualController] = None + if input_type == "controller" and 'VirtualController' in globals(): + self.controller = VirtualController() + if not self.controller.is_available(): + print("Virtual controller initialization failed, falling back to keyboard controls") + self.input_type = "keyboard" + else: + print("Using virtual Xbox controller for game inputs") + + # Command mappings - customize these for your game + self.init_commands() + + # Register commands with the bot + self.register_commands() + + # Load user stats + self.user_stats: DefaultDict[str, Dict[str, int]] = self.load_stats() + + def init_commands(self) -> None: + """Initialize command mappings based on input type""" + if self.input_type == "controller" and self.controller and self.controller.is_available(): + # Controller-based commands + self.commands = { + # Movement with left stick + "up": self.controller.move_left_stick_up, + "down": self.controller.move_left_stick_down, + "left": self.controller.move_left_stick_left, + "right": self.controller.move_left_stick_right, + + # Camera with right stick + "look_up": self.controller.move_right_stick_up, + "look_down": self.controller.move_right_stick_down, + "look_left": self.controller.move_right_stick_left, + "look_right": self.controller.move_right_stick_right, + + # Action buttons + "jump": self.controller.press_a, + "action": self.controller.press_b, + "interact": self.controller.press_x, + "menu": self.controller.press_y, + + # Shoulder buttons and triggers + "block": self.controller.press_left_shoulder, + "attack": self.controller.press_right_shoulder, + "aim": self.controller.press_left_trigger, + "shoot": self.controller.press_right_trigger, + + # D-pad for items/quick slots + "item1": self.controller.press_dpad_up, + "item2": self.controller.press_dpad_right, + "item3": self.controller.press_dpad_down, + "item4": self.controller.press_dpad_left, + + # Menu buttons + "start": self.controller.press_start, + "select": self.controller.press_back + } + else: + # Keyboard and mouse based commands + self.commands = { + "up": self.press_key_w, + "down": self.press_key_s, + "left": self.press_key_a, + "right": self.press_key_d, + "jump": self.press_key_space, + "attack": self.press_mouse_left, + "interact": self.press_key_e, + "inventory": self.press_key_i, + "skill1": self.press_key_1, + "skill2": self.press_key_2, + "skill3": self.press_key_3, + "ultimate": self.press_key_r + } + + def register_commands(self) -> None: + """Register game control commands with the Twitch bot""" + if self.mode == "direct": + # Direct mode: Each command executes immediately + for cmd_name in self.commands: + self.bot.register_command(cmd_name, self.handle_direct_command) + else: + # Vote mode: Commands are collected for voting + self.bot.register_command("vote", self.handle_vote) + self.bot.register_command("startvote", self.start_vote_session) + self.bot.register_command("endvote", self.end_vote_session) + + # Register stats command + self.bot.register_command("gamestats", self.show_stats) + + # Register help command to list available game controls + self.bot.register_command("gamehelp", self.show_game_help) + + # Register streamer override commands + self.bot.register_command("takeover", self.enable_streamer_override) + self.bot.register_command("givecontrol", self.disable_streamer_override) + + def enable_streamer_override(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Enable streamer override mode - only channel owner can control the game""" + if not bot.is_admin(username): + bot.send_message(f"@{username}, only the channel owner or admins can enable streamer override.") + return + + if self.streamer_override: + bot.send_message(f"@{username}, you already have exclusive control.") + return + + self.streamer_override = True + + # If there's an active vote, end it + if self.active_vote and self.vote_timer: + self.vote_timer.cancel() + self.active_vote = False + self.votes.clear() + + # If using a controller and controller redirection is available, start it + if (self.input_type == "controller" and self.controller and + hasattr(self.controller, 'is_physical_controller_available') and + self.controller.is_physical_controller_available()): + + redirect_success = self.controller.start_controller_redirection() + if redirect_success: + bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Your physical controller is now controlling the game!") + else: + bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Only the streamer can control the game now!") + else: + bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Only the streamer can control the game now!") + + def disable_streamer_override(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Disable streamer override mode - allow chat to control the game again""" + if not bot.is_admin(username): + bot.send_message(f"@{username}, only the channel owner or admins can disable streamer override.") + return + + if not self.streamer_override: + bot.send_message(f"@{username}, streamer override is not active.") + return + + # Stop controller redirection if it's active + if (self.input_type == "controller" and self.controller and + hasattr(self.controller, 'stop_controller_redirection')): + self.controller.stop_controller_redirection() + + self.streamer_override = False + bot.send_message("🎮 CHAT CONTROL RESTORED: Chat can now control the game again!") + + def show_game_help(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Show available game control commands""" + bot.send_message(f"@{username}, available game controls: {', '.join(['!' + cmd for cmd in self.commands.keys()])}") + if self.mode == "vote": + bot.send_message(f"Vote using !vote [command] during active voting sessions") + + if bot.is_admin(username): + bot.send_message(f"Admin commands: !takeover (take exclusive control), !givecontrol (return control to chat)") + + def load_stats(self) -> DefaultDict[str, Dict[str, int]]: + """Load user stats from a JSON file""" + try: + with open('game_stats.json', 'r') as f: + stats_dict = json.load(f) + # Convert the loaded dictionary to a defaultdict + return defaultdict(lambda: {"commands": 0, "votes": 0}, stats_dict) + except (FileNotFoundError, json.JSONDecodeError): + return defaultdict(lambda: {"commands": 0, "votes": 0}) + + def save_stats(self) -> None: + """Save user stats to a JSON file""" + # Convert defaultdict to regular dict for JSON serialization + stats_dict: Dict[str, Dict[str, int]] = {k: dict(v) for k, v in self.user_stats.items()} + with open('game_stats.json', 'w') as f: + json.dump(stats_dict, f) + + def show_stats(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Show game control stats for a user""" + target_user: str = username + if args and args[0].startswith('@'): + target_user = args[0][1:].lower() + + stats: Dict[str, int] = self.user_stats.get(target_user, {"commands": 0, "votes": 0}) + + bot.send_message(f"@{username}, {target_user} has issued {stats['commands']} direct commands and {stats['votes']} votes.") + + # Command handling methods + + def handle_direct_command(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Handle direct command execution with cooldown""" + command: str = bot.last_command + + # Check for streamer override - only allow channel owner/admins + if self.streamer_override and not bot.is_admin(username): + # Silently ignore, don't message to prevent spam + return + + # Check cooldown (streamer/admins bypass the cooldown) + current_time: float = time.time() + if not bot.is_admin(username) and username in self.last_command_time: + time_diff: float = current_time - self.last_command_time[username] + if time_diff < self.cooldown: + return # Silently ignore, don't message (prevents spam) + + # Execute the command if it exists + if command in self.commands: + # Update stats + if username not in self.user_stats: + self.user_stats[username] = {"commands": 0, "votes": 0} + self.user_stats[username]["commands"] += 1 + + # Set cooldown (streamer/admins don't get cooldown) + if not bot.is_admin(username): + self.last_command_time[username] = current_time + + # Execute the command + self.commands[command]() + + # Confirm command execution + bot.send_message(f"@{username} used {command}!") + + # Save stats periodically + if random.random() < 0.1: # 10% chance to save, to avoid too frequent saves + self.save_stats() + + # Voting system methods + + def start_vote_session(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Start a voting session for game control""" + if not bot.is_admin(username): # Only allow admins + bot.send_message(f"@{username}, only the channel owner or admins can start vote sessions.") + return + + # Cannot start vote in streamer override mode + if self.streamer_override: + bot.send_message(f"@{username}, voting is disabled in streamer override mode. Use !givecontrol first.") + return + + if self.active_vote: + bot.send_message(f"@{username}, a vote is already active. End it with !endvote first.") + return + + # Get vote duration + duration: int = 30 # default: 30 seconds + if args and args[0].isdigit(): + duration = min(max(5, int(args[0])), 120) # limit between 5-120 seconds + + # Start vote + self.active_vote = True + self.votes = Counter() + + # Announce available commands + bot.send_message(f"GAME CONTROL VOTE STARTED! Vote with !vote command (e.g., !vote up)") + bot.send_message(f"Available commands: {', '.join(self.commands.keys())}") + bot.send_message(f"Voting ends in {duration} seconds!") + + # Start timer to end vote + self.vote_timer = threading.Timer(duration, self.execute_winning_vote) + self.vote_timer.daemon = True + self.vote_timer.start() + + def handle_vote(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Handle a vote for a game control command""" + # Check for streamer override - don't allow voting + if self.streamer_override: + # Silently ignore in streamer override mode + return + + if not self.active_vote: + bot.send_message(f"@{username}, there is no active vote. Ask the streamer to start one with !startvote") + return + + if not args: + bot.send_message(f"@{username}, please specify a command to vote for") + return + + vote: str = args[0].lower() + if vote not in self.commands: + bot.send_message(f"@{username}, invalid command. Available: {', '.join(self.commands.keys())}") + return + + # Record vote + self.votes[vote] += 1 + + # Update stats + if username not in self.user_stats: + self.user_stats[username] = {"commands": 0, "votes": 0} + self.user_stats[username]["votes"] += 1 + + # Save stats periodically + if random.random() < 0.05: # 5% chance to save + self.save_stats() + + def execute_winning_vote(self) -> None: + """Execute the winning vote command""" + if not self.active_vote or not self.votes: + self.bot.send_message("Vote ended with no valid votes.") + self.active_vote = False + return + + # Don't execute if streamer override was enabled during the vote + if self.streamer_override: + self.bot.send_message("Vote cancelled due to streamer override.") + self.active_vote = False + return + + # Find the command with the most votes + winner: str = self.votes.most_common(1)[0][0] + vote_count: int = self.votes[winner] + + # Execute the winning command + self.commands[winner]() + + # Announce the result + self.bot.send_message(f"Vote ended! Executing {winner} with {vote_count} votes!") + + # Reset vote + self.active_vote = False + + def end_vote_session(self, username: str, args: List[str], bot: TwitchBot) -> None: + """Manually end the current vote session""" + if not bot.is_admin(username): # Only allow admins + bot.send_message(f"@{username}, only the channel owner or admins can end vote sessions.") + return + + if not self.active_vote: + bot.send_message(f"@{username}, there is no active vote.") + return + + # Cancel timer if it's running + if self.vote_timer and self.vote_timer.is_alive(): + self.vote_timer.cancel() + + # Execute winning vote + self.execute_winning_vote() + + # Keyboard and mouse simulation methods + + def press_key(self, key: Union[str, Key], hold_time: float = 0.1) -> None: + """Press and release a keyboard key""" + if not PYNPUT_AVAILABLE or not self.keyboard: + print(f"Would press key: {key}") + return + + try: + self.keyboard.press(key) + time.sleep(hold_time) + self.keyboard.release(key) + except Exception as e: + print(f"Error pressing key {key}: {e}") + + def press_mouse_button(self, button: Button, hold_time: float = 0.1) -> None: + """Press and release a mouse button""" + if not PYNPUT_AVAILABLE or not self.mouse: + print(f"Would press mouse button: {button}") + return + + try: + self.mouse.press(button) + time.sleep(hold_time) + self.mouse.release(button) + except Exception as e: + print(f"Error pressing mouse button {button}: {e}") + + # Specific key methods mapped to game controls + + def press_key_w(self) -> None: + """Press W key (move up/forward)""" + self.press_key('w') + + def press_key_s(self) -> None: + """Press S key (move down/backward)""" + self.press_key('s') + + def press_key_a(self) -> None: + """Press A key (move left)""" + self.press_key('a') + + def press_key_d(self) -> None: + """Press D key (move right)""" + self.press_key('d') + + def press_key_space(self) -> None: + """Press Spacebar (jump)""" + self.press_key(Key.space) + + def press_key_e(self) -> None: + """Press E key (interact)""" + self.press_key('e') + + def press_key_i(self) -> None: + """Press I key (inventory)""" + self.press_key('i') + + def press_key_1(self) -> None: + """Press 1 key (skill 1)""" + self.press_key('1') + + def press_key_2(self) -> None: + """Press 2 key (skill 2)""" + self.press_key('2') + + def press_key_3(self) -> None: + """Press 3 key (skill 3)""" + self.press_key('3') + + def press_key_r(self) -> None: + """Press R key (ultimate ability)""" + self.press_key('r') + + def press_mouse_left(self) -> None: + """Press left mouse button (attack)""" + self.press_mouse_button(Button.left) + +# Example usage: +""" +from game_control import GameController + +def main(): + # ... set up the bot ... + + # Create a game controller in direct mode with keyboard input + game_controller = GameController(bot, mode="direct", cooldown=2.0, input_type="keyboard") + + # OR create a game controller in vote mode with controller input + # game_controller = GameController(bot, mode="vote", input_type="controller") + + # ... start the bot ... +""" \ No newline at end of file diff --git a/src/game/input/__init__.py b/src/game/input/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/game/input/gamepad.py b/src/game/input/gamepad.py new file mode 100644 index 0000000..20375dc --- /dev/null +++ b/src/game/input/gamepad.py @@ -0,0 +1,421 @@ +import time +import threading +from typing import Optional, Union, Any, Literal, Dict, Tuple, List, Set +import ctypes + +# Try to import vgamepad for Xbox controller emulation +try: + import vgamepad as vg + VGAMEPAD_AVAILABLE = True +except ImportError: + VGAMEPAD_AVAILABLE = False + print("vgamepad library not found. Install it with: pip install vgamepad") + print("Virtual controller functionality will be disabled.") + +# Try to import pysdl2 for physical controller detection +try: + import sdl2 + import sdl2.ext + PYSDL2_AVAILABLE = True +except ImportError: + PYSDL2_AVAILABLE = False + print("pysdl2 library not found. Install it with: pip install pysdl2 pysdl2-dll") + print("Physical controller redirection will be disabled.") + +# Define SDL2 constants if needed +SDL_CONTROLLER_BUTTON_MAX = 15 +SDL_CONTROLLER_BUTTON_A = 0 +SDL_CONTROLLER_BUTTON_B = 1 +SDL_CONTROLLER_BUTTON_X = 2 +SDL_CONTROLLER_BUTTON_Y = 3 +SDL_CONTROLLER_BUTTON_BACK = 4 +SDL_CONTROLLER_BUTTON_GUIDE = 5 +SDL_CONTROLLER_BUTTON_START = 6 +SDL_CONTROLLER_BUTTON_LEFTSTICK = 7 +SDL_CONTROLLER_BUTTON_RIGHTSTICK = 8 +SDL_CONTROLLER_BUTTON_LEFTSHOULDER = 9 +SDL_CONTROLLER_BUTTON_RIGHTSHOULDER = 10 +SDL_CONTROLLER_BUTTON_DPAD_UP = 11 +SDL_CONTROLLER_BUTTON_DPAD_DOWN = 12 +SDL_CONTROLLER_BUTTON_DPAD_LEFT = 13 +SDL_CONTROLLER_BUTTON_DPAD_RIGHT = 14 + +SDL_CONTROLLER_AXIS_LEFTX = 0 +SDL_CONTROLLER_AXIS_LEFTY = 1 +SDL_CONTROLLER_AXIS_RIGHTX = 2 +SDL_CONTROLLER_AXIS_RIGHTY = 3 +SDL_CONTROLLER_AXIS_TRIGGERLEFT = 4 +SDL_CONTROLLER_AXIS_TRIGGERRIGHT = 5 + +class VirtualController: + """ + Class for emulating an Xbox controller using vgamepad. + Can be used to extend the GameController class for controller-based games. + """ + def __init__(self) -> None: + self.available: bool = VGAMEPAD_AVAILABLE + self.gamepad: Optional[Any] = None + + # Physical controller redirection + self.physical_controller_available: bool = False + self.redirection_active: bool = False + self.redirection_thread: Optional[threading.Thread] = None + self.stop_redirection: bool = False + self.physical_controller: Optional[Any] = None + + if self.available: + try: + # Create a virtual Xbox 360 controller + self.gamepad = vg.VX360Gamepad() + print("Virtual Xbox 360 controller initialized successfully") + except Exception as e: + print(f"Failed to initialize virtual gamepad: {e}") + self.available = False + + # Initialize SDL2 for physical controller if available + if PYSDL2_AVAILABLE: + try: + # Initialize SDL2 + sdl2.SDL_Init(sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_GAMECONTROLLER) + + # Get number of joysticks/controllers + num_joysticks = sdl2.SDL_NumJoysticks() + + if num_joysticks > 0: + self.physical_controller_available = True + print(f"Detected {num_joysticks} physical controller(s)") + print("Physical controller redirection is available") + else: + print("No physical controllers detected") + except Exception as e: + print(f"Failed to initialize SDL2 for controller detection: {e}") + + def is_available(self) -> bool: + """Check if the virtual controller is available""" + return self.available + + def is_physical_controller_available(self) -> bool: + """Check if a physical controller is available for redirection""" + return self.physical_controller_available + + # === Basic Controller Actions === + + def press_button(self, button: Any, hold_time: float = 0.1) -> None: + """Press and release a button on the controller""" + if not self.available: + print(f"Would press button: {button}") + return + + try: + # Press button + self.gamepad.press_button(button) + self.gamepad.update() + + # Hold for specified time + time.sleep(hold_time) + + # Release button + self.gamepad.release_button(button) + self.gamepad.update() + except Exception as e: + print(f"Error pressing button {button}: {e}") + + def press_trigger(self, trigger: Literal["LEFT", "RIGHT"], value: float = 1.0, hold_time: float = 0.1) -> None: + """Press a trigger with a specific value (0.0-1.0)""" + if not self.available: + print(f"Would press trigger {trigger} with value {value}") + return + + try: + if trigger == "LEFT": + self.gamepad.left_trigger(value=int(value * 255)) + else: # "RIGHT" + self.gamepad.right_trigger(value=int(value * 255)) + + self.gamepad.update() + time.sleep(hold_time) + + # Reset trigger + if trigger == "LEFT": + self.gamepad.left_trigger(value=0) + else: + self.gamepad.right_trigger(value=0) + + self.gamepad.update() + except Exception as e: + print(f"Error with trigger {trigger}: {e}") + + def move_joystick(self, stick: Literal["LEFT", "RIGHT"], x_value: float, y_value: float, hold_time: float = 0.1) -> None: + """Move a joystick to a position""" + if not self.available: + print(f"Would move {stick} stick to x:{x_value}, y:{y_value}") + return + + try: + # Convert from -1.0 to 1.0 range to the expected integer range + x: int = int(x_value * 32767) + y: int = int(y_value * 32767) + + if stick == "LEFT": + self.gamepad.left_joystick(x_value=x, y_value=y) + else: # "RIGHT" + self.gamepad.right_joystick(x_value=x, y_value=y) + + self.gamepad.update() + time.sleep(hold_time) + + # Reset joystick + if stick == "LEFT": + self.gamepad.left_joystick(x_value=0, y_value=0) + else: + self.gamepad.right_joystick(x_value=0, y_value=0) + + self.gamepad.update() + except Exception as e: + print(f"Error with {stick} joystick: {e}") + + def reset(self) -> None: + """Reset all controller inputs""" + if not self.available: + print("Would reset controller to neutral state") + return + + try: + self.gamepad.reset() + self.gamepad.update() + except Exception as e: + print(f"Error resetting controller: {e}") + + # === Controller Redirection System === + + def start_controller_redirection(self) -> bool: + """Start redirecting physical controller input to virtual controller""" + if not self.available or not self.physical_controller_available or not PYSDL2_AVAILABLE: + print("Cannot start controller redirection: virtual or physical controller not available") + return False + + if self.redirection_active: + print("Controller redirection already active") + return True + + try: + # Reset flags + self.stop_redirection = False + self.redirection_active = True + + # Start redirection in a separate thread + self.redirection_thread = threading.Thread(target=self._controller_redirection_loop) + self.redirection_thread.daemon = True + self.redirection_thread.start() + + print("Controller redirection started: Physical controller now controlling virtual controller") + return True + except Exception as e: + print(f"Failed to start controller redirection: {e}") + self.redirection_active = False + return False + + def stop_controller_redirection(self) -> None: + """Stop redirecting physical controller input""" + if not self.redirection_active: + return + + self.stop_redirection = True + if self.redirection_thread and self.redirection_thread.is_alive(): + # Wait for thread to finish + self.redirection_thread.join(timeout=1.0) + + self.redirection_active = False + self.redirection_thread = None + print("Controller redirection stopped") + + # Reset virtual controller to neutral position + if self.available: + self.reset() + + def _controller_redirection_loop(self) -> None: + """Main loop for controller redirection""" + if not PYSDL2_AVAILABLE: + return + + # Button mapping from SDL2 to vgamepad + button_mapping: Dict[int, Any] = { + SDL_CONTROLLER_BUTTON_A: vg.XUSB_BUTTON.XUSB_GAMEPAD_A, # A + SDL_CONTROLLER_BUTTON_B: vg.XUSB_BUTTON.XUSB_GAMEPAD_B, # B + SDL_CONTROLLER_BUTTON_X: vg.XUSB_BUTTON.XUSB_GAMEPAD_X, # X + SDL_CONTROLLER_BUTTON_Y: vg.XUSB_BUTTON.XUSB_GAMEPAD_Y, # Y + SDL_CONTROLLER_BUTTON_LEFTSHOULDER: vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_SHOULDER, # LB + SDL_CONTROLLER_BUTTON_RIGHTSHOULDER: vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_SHOULDER, # RB + SDL_CONTROLLER_BUTTON_BACK: vg.XUSB_BUTTON.XUSB_GAMEPAD_BACK, # Back + SDL_CONTROLLER_BUTTON_START: vg.XUSB_BUTTON.XUSB_GAMEPAD_START, # Start + SDL_CONTROLLER_BUTTON_LEFTSTICK: vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_THUMB, # Left stick press + SDL_CONTROLLER_BUTTON_RIGHTSTICK: vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_THUMB, # Right stick press + SDL_CONTROLLER_BUTTON_DPAD_UP: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_UP, # D-pad Up + SDL_CONTROLLER_BUTTON_DPAD_DOWN: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_DOWN, # D-pad Down + SDL_CONTROLLER_BUTTON_DPAD_LEFT: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_LEFT, # D-pad Left + SDL_CONTROLLER_BUTTON_DPAD_RIGHT: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_RIGHT, # D-pad Right + } + + # Keep track of button states to detect changes + button_states: Dict[int, bool] = {} + + try: + # Open the first controller + controller_index = 0 + game_controller = sdl2.SDL_GameControllerOpen(controller_index) + + if not game_controller: + print("Could not open game controller") + return + + while not self.stop_redirection: + # Process SDL events + event = sdl2.SDL_Event() + while sdl2.SDL_PollEvent(ctypes.byref(event)): + pass # Just process events to keep SDL happy + + # Process buttons + for btn_idx in range(SDL_CONTROLLER_BUTTON_MAX): + pressed = sdl2.SDL_GameControllerGetButton(game_controller, btn_idx) == 1 + + # Check if state changed + if btn_idx not in button_states or button_states[btn_idx] != pressed: + button_states[btn_idx] = pressed + + if btn_idx in button_mapping: + if pressed: + self.gamepad.press_button(button_mapping[btn_idx]) + else: + self.gamepad.release_button(button_mapping[btn_idx]) + + # Process axes (sticks) + # Left stick + left_x = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_LEFTX) / 32767.0 + left_y = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_LEFTY) / 32767.0 + self.gamepad.left_joystick(x_value=int(left_x * 32767), y_value=int(left_y * 32767)) + + # Right stick + right_x = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_RIGHTX) / 32767.0 + right_y = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_RIGHTY) / 32767.0 + self.gamepad.right_joystick(x_value=int(right_x * 32767), y_value=int(right_y * 32767)) + + # Process triggers + left_trigger = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_TRIGGERLEFT) / 32767.0 + right_trigger = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_TRIGGERRIGHT) / 32767.0 + + # Convert to 0 to 255 range + left_trigger_val = int(left_trigger * 255) + right_trigger_val = int(right_trigger * 255) + + self.gamepad.left_trigger(value=left_trigger_val) + self.gamepad.right_trigger(value=right_trigger_val) + + # Handle D-pad buttons directly in the button processing loop above + # The button_mapping already includes the D-pad buttons so they're handled automatically + + # Update the virtual controller + self.gamepad.update() + + # Small sleep to prevent high CPU usage + time.sleep(0.01) + + # Clean up SDL controller + sdl2.SDL_GameControllerClose(game_controller) + + except Exception as e: + print(f"Error in controller redirection loop: {e}") + finally: + # Clean up + self.redirection_active = False + if self.available: + self.reset() + + # === Specific Game Action Methods === + + def press_a(self) -> None: + """Press A button (typically jump or confirm)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_A) + + def press_b(self) -> None: + """Press B button (typically cancel or secondary action)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_B) + + def press_x(self) -> None: + """Press X button (typically special action)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_X) + + def press_y(self) -> None: + """Press Y button (typically tertiary action)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_Y) + + def press_left_shoulder(self) -> None: + """Press left shoulder button (LB)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_SHOULDER) + + def press_right_shoulder(self) -> None: + """Press right shoulder button (RB)""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_SHOULDER) + + def press_left_trigger(self) -> None: + """Press left trigger (LT)""" + self.press_trigger("LEFT") + + def press_right_trigger(self) -> None: + """Press right trigger (RT)""" + self.press_trigger("RIGHT") + + def press_dpad_up(self) -> None: + """Press D-pad up""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_UP) + + def press_dpad_down(self) -> None: + """Press D-pad down""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_DOWN) + + def press_dpad_left(self) -> None: + """Press D-pad left""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_LEFT) + + def press_dpad_right(self) -> None: + """Press D-pad right""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_RIGHT) + + def press_start(self) -> None: + """Press start button""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_START) + + def press_back(self) -> None: + """Press back button""" + self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_BACK) + + def move_left_stick_up(self) -> None: + """Move left stick up""" + self.move_joystick("LEFT", 0, 1.0) + + def move_left_stick_down(self) -> None: + """Move left stick down""" + self.move_joystick("LEFT", 0, -1.0) + + def move_left_stick_left(self) -> None: + """Move left stick left""" + self.move_joystick("LEFT", -1.0, 0) + + def move_left_stick_right(self) -> None: + """Move left stick right""" + self.move_joystick("LEFT", 1.0, 0) + + def move_right_stick_up(self) -> None: + """Move right stick up (usually camera up)""" + self.move_joystick("RIGHT", 0, 1.0) + + def move_right_stick_down(self) -> None: + """Move right stick down (usually camera down)""" + self.move_joystick("RIGHT", 0, -1.0) + + def move_right_stick_left(self) -> None: + """Move right stick left (usually camera left)""" + self.move_joystick("RIGHT", -1.0, 0) + + def move_right_stick_right(self) -> None: + """Move right stick right (usually camera right)""" + self.move_joystick("RIGHT", 1.0, 0) \ No newline at end of file diff --git a/src/game/input/keyboard.py b/src/game/input/keyboard.py new file mode 100644 index 0000000..af0b1b6 --- /dev/null +++ b/src/game/input/keyboard.py @@ -0,0 +1,119 @@ +""" +Keyboard and mouse input module +""" +import logging +import time +from typing import Dict, Any, Tuple, Optional, Union, List + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Try to import pynput for keyboard/mouse control +try: + from pynput.keyboard import Key, Controller as KeyboardController + from pynput.mouse import Button, Controller as MouseController + PYNPUT_AVAILABLE = True +except ImportError: + logger.warning("pynput library not found. Install with: pip install pynput") + PYNPUT_AVAILABLE = False + +class KeyboardMouseInput: + """Handles keyboard and mouse input for game control""" + + def __init__(self, simulate: bool = False): + """ + Initialize keyboard and mouse controllers + + Args: + simulate: If True, actions will be logged but not performed + (useful when pynput isn't available) + """ + self.simulate = simulate or not PYNPUT_AVAILABLE + + if not self.simulate: + self.keyboard = KeyboardController() + self.mouse = MouseController() + logger.info("Initialized keyboard and mouse controllers") + else: + logger.info("Input simulation mode active (no actual keypresses)") + + def press_key(self, key: Union[str, Key], duration: float = 0.1) -> None: + """ + Press and release a keyboard key + + Args: + key: The key to press (single character or pynput.keyboard.Key) + duration: How long to hold the key in seconds + """ + if self.simulate: + logger.info(f"[SIMULATED] Pressing key: {key} for {duration}s") + return + + try: + # Convert string to pynput.keyboard.Key if needed + if isinstance(key, str) and len(key) == 1: + key_to_press = key + else: + key_to_press = key + + # Press and hold the key + self.keyboard.press(key_to_press) + time.sleep(duration) + self.keyboard.release(key_to_press) + logger.debug(f"Pressed key: {key} for {duration}s") + except Exception as e: + logger.error(f"Error pressing key {key}: {e}") + + def mouse_move(self, dx: int, dy: int, relative: bool = True) -> None: + """ + Move the mouse cursor + + Args: + dx: x-axis movement + dy: y-axis movement + relative: If True, move relative to current position + """ + if self.simulate: + logger.info(f"[SIMULATED] Moving mouse: {dx}, {dy} (relative: {relative})") + return + + try: + if relative: + self.mouse.move(dx, dy) + else: + self.mouse.position = (dx, dy) + logger.debug(f"Mouse moved: {dx}, {dy}") + except Exception as e: + logger.error(f"Error moving mouse: {e}") + + def mouse_click(self, button: str = 'left', count: int = 1) -> None: + """ + Perform mouse clicks + + Args: + button: 'left', 'right', or 'middle' + count: Number of clicks to perform + """ + if self.simulate: + logger.info(f"[SIMULATED] Clicking mouse: {button} button, {count} times") + return + + try: + # Map string to pynput.mouse.Button + button_map = { + 'left': Button.left, + 'right': Button.right, + 'middle': Button.middle + } + + btn = button_map.get(button.lower(), Button.left) + + for _ in range(count): + self.mouse.click(btn) + if count > 1: + time.sleep(0.1) # Small delay between multiple clicks + + logger.debug(f"Mouse clicked: {button}, {count} times") + except Exception as e: + logger.error(f"Error clicking mouse: {e}") diff --git a/src/queue/__init__.py b/src/queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/queue/consumer.py b/src/queue/consumer.py new file mode 100644 index 0000000..338d771 --- /dev/null +++ b/src/queue/consumer.py @@ -0,0 +1,105 @@ +""" +Queue consumer implementation for processing commands +""" +import logging +import threading +import time +from typing import Dict, Any, Optional, List, Callable + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('queue-consumer') + +class QueueConsumer: + """Consumer for processing queued commands and messages""" + + def __init__(self, queue_server=None): + """ + Initialize the queue consumer + + Args: + queue_server: The queue server module to use + """ + if queue_server is None: + # Import here to avoid circular imports + from src.queue.server import huey, get_queue_stats + self.huey = huey + self.get_queue_stats = get_queue_stats + else: + self.huey = queue_server.huey + self.get_queue_stats = queue_server.get_queue_stats + + self.running = False + self.consumer_thread = None + self.handlers = {} + + logger.info("Queue consumer initialized") + + def register_handler(self, task_type: str, handler: Callable) -> None: + """ + Register a handler for a specific task type + + Args: + task_type: The type of task to handle (e.g., 'command', 'message') + handler: The handler function to call for this task type + """ + self.handlers[task_type] = handler + logger.info(f"Registered handler for {task_type}") + + def start(self) -> None: + """Start the consumer in a background thread""" + if self.running: + logger.warning("Consumer is already running") + return + + self.running = True + self.consumer_thread = threading.Thread(target=self._consumer_loop) + self.consumer_thread.daemon = True + self.consumer_thread.start() + logger.info("Queue consumer started") + + def stop(self) -> None: + """Stop the consumer""" + self.running = False + if self.consumer_thread: + self.consumer_thread.join(timeout=2.0) + logger.info("Queue consumer stopped") + + def _consumer_loop(self) -> None: + """Main consumer loop to process pending tasks""" + logger.info("Consumer loop started") + + try: + # Start the huey consumer + from huey.consumer import Consumer + consumer = Consumer(self.huey) + consumer_thread = threading.Thread(target=consumer.run) + consumer_thread.daemon = True + consumer_thread.start() + + # Monitor the queue + while self.running: + stats = self.get_queue_stats() + logger.debug(f"Queue stats: {stats['pending']} pending tasks") + time.sleep(5) # Check stats every 5 seconds + + except Exception as e: + logger.error(f"Error in consumer loop: {e}") + self.running = False + +def start_standalone_consumer() -> None: + """Start a standalone queue consumer""" + from src.queue.server import huey + + try: + from huey.consumer import Consumer + consumer = Consumer(huey) + logger.info("Starting standalone Huey consumer...") + consumer.run() + except KeyboardInterrupt: + logger.info("Consumer stopped by user") + except Exception as e: + logger.error(f"Error in consumer: {e}") + +if __name__ == "__main__": + start_standalone_consumer() diff --git a/src/queue/server.py b/src/queue/server.py new file mode 100644 index 0000000..56bb22b --- /dev/null +++ b/src/queue/server.py @@ -0,0 +1,248 @@ +from huey import SqliteHuey +import os +import logging +import socket +import time +from typing import Dict, Any, Optional, List, Callable, Union + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('twitch-queue') + +# Create a Huey instance with SQLite storage +huey = SqliteHuey( + name='twitch_commands', + filename='twitch_queue.db', + results=True, # Store task results + immediate=False # Don't execute tasks immediately (use consumer) +) + +# Queue statistics +class QueueStats: + def __init__(self) -> None: + self.total_enqueued: int = 0 + self.total_processed: int = 0 + self.last_task_time: float = 0 + self.tasks_by_type: Dict[str, int] = {} + self.tasks_by_user: Dict[str, int] = {} + + def log_enqueue(self, task_type: str, username: str) -> None: + """Record stats when a task is enqueued""" + self.total_enqueued += 1 + self.last_task_time = time.time() + + if task_type not in self.tasks_by_type: + self.tasks_by_type[task_type] = 0 + self.tasks_by_type[task_type] += 1 + + if username not in self.tasks_by_user: + self.tasks_by_user[username] = 0 + self.tasks_by_user[username] += 1 + + def log_processed(self, task_type: str) -> None: + """Record stats when a task is processed""" + self.total_processed += 1 + + def get_stats(self) -> Dict[str, Any]: + """Get current queue statistics""" + return { + 'total_enqueued': self.total_enqueued, + 'total_processed': self.total_processed, + 'pending': self.total_enqueued - self.total_processed, + 'tasks_by_type': self.tasks_by_type, + 'tasks_by_user': self.tasks_by_user + } + +# Create a global stats object +queue_stats = QueueStats() + +# IRC Connection details +IRC_SERVER = 'irc.chat.twitch.tv' +IRC_PORT = 6667 + +# Task definitions +@huey.task() +def process_chat_command(username: str, command: str, args: List[str], + channel: str, oauth_token: str, bot_username: str, + command_registry: Dict[str, str]) -> Dict[str, Any]: + """ + Process a chat command and return the result + + This task will be executed by the Huey consumer + """ + logger.info(f"Processing command '{command}' from {username} with args: {args}") + + # Record stats + queue_stats.log_processed(f"command:{command}") + + # Check if the command is registered + if command not in command_registry: + logger.warning(f"Command '{command}' not found in registry") + return { + 'success': False, + 'error': f"Command '{command}' not found", + 'username': username, + 'command': command, + 'processed_at': time.time() + } + + try: + # For built-in commands we handle here + if command == "hello": + send_twitch_message(channel, oauth_token, bot_username, f"Hello, {username}!") + elif command == "dice": + import random + sides = 6 # Default to 6-sided dice + if args and args[0].isdigit(): + sides = int(args[0]) + result = random.randint(1, sides) + send_twitch_message(channel, oauth_token, bot_username, f"@{username} rolled a {result} (d{sides})") + elif command == "echo": + if args: + message = " ".join(args) + send_twitch_message(channel, oauth_token, bot_username, f"Echo: {message}") + else: + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, you didn't provide a message to echo!") + elif command == "8ball": + import random + responses = [ + "It is certain.", + "It is decidedly so.", + "Without a doubt.", + "Yes, definitely.", + "You may rely on it.", + "As I see it, yes.", + "Most likely.", + "Outlook good.", + "Yes.", + "Signs point to yes.", + "Reply hazy, try again.", + "Ask again later.", + "Better not tell you now.", + "Cannot predict now.", + "Concentrate and ask again.", + "Don't count on it.", + "My reply is no.", + "My sources say no.", + "Outlook not so good.", + "Very doubtful." + ] + send_twitch_message(channel, oauth_token, bot_username, f"@{username}, {random.choice(responses)}") + elif command == "qstats": + stats = get_queue_stats() + send_twitch_message(channel, oauth_token, bot_username, + f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending") + else: + # For any custom commands, we'd need to implement them here + send_twitch_message(channel, oauth_token, bot_username, + f"Command '{command}' is registered but not implemented in the queue worker.") + + return { + 'success': True, + 'username': username, + 'command': command, + 'args': args, + 'processed_at': time.time() + } + except Exception as e: + logger.error(f"Error executing command {command}: {e}") + return { + 'success': False, + 'error': str(e), + 'username': username, + 'command': command, + 'args': args, + 'processed_at': time.time() + } + +@huey.task() +def process_chat_message(username: str, message: str) -> Dict[str, Any]: + """ + Process a regular chat message + + This task will be executed by the Huey consumer + """ + logger.info(f"Processing message from {username}: {message}") + + # Record stats + queue_stats.log_processed("message") + + # Return information about the processed message + return { + 'success': True, + 'username': username, + 'message': message, + 'processed_at': time.time() + } + +def send_twitch_message(channel: str, oauth_token: str, username: str, message: str) -> bool: + """ + Send a message to a Twitch channel + + This function creates a new IRC connection, sends the message, and then closes the connection. + In a production environment, you might want to maintain a persistent connection. + """ + try: + # Create a new socket + irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + irc.connect((IRC_SERVER, IRC_PORT)) + + # Send authentication + irc.send(f"PASS {oauth_token}\r\n".encode('utf-8')) + irc.send(f"NICK {username}\r\n".encode('utf-8')) + irc.send(f"JOIN #{channel}\r\n".encode('utf-8')) + + # Send the message + irc.send(f"PRIVMSG #{channel} :{message}\r\n".encode('utf-8')) + + # Close the connection + irc.close() + + logger.info(f"Sent message to #{channel}: {message}") + return True + except Exception as e: + logger.error(f"Error sending message to Twitch: {e}") + return False + +# Helper functions for enqueueing tasks +def enqueue_command(username: str, command: str, args: List[str], + channel: str = None, oauth_token: str = None, + bot_username: str = None, command_registry: Dict[str, str] = None) -> None: + """Add a command to the processing queue""" + logger.info(f"Enqueueing command '{command}' from {username} with args: {args}") + queue_stats.log_enqueue(f"command:{command}", username) + + if not all([channel, oauth_token, bot_username, command_registry]): + logger.error("Missing required parameters for enqueue_command") + return None + + return process_chat_command( + username, command, args, channel, oauth_token, bot_username, command_registry + ) + +def enqueue_message(username: str, message: str) -> None: + """Add a regular message to the processing queue""" + logger.info(f"Enqueueing message from {username}: {message}") + queue_stats.log_enqueue("message", username) + return process_chat_message(username, message) + +def get_queue_stats() -> Dict[str, Any]: + """Get current queue statistics""" + return queue_stats.get_stats() + +# Function to start the consumer directly (alternative to command line) +def start_consumer() -> None: + """ + Start the Huey consumer programmatically + + Note: It's generally better to use the command line to start the consumer: + python -m huey.bin.huey_consumer src.queue.server.huey + """ + from huey.consumer import Consumer + consumer = Consumer(huey) + consumer.start() + +if __name__ == "__main__": + logger.info("Queue server module initialized") + logger.info("To start the consumer, run:") + logger.info("python -m huey.bin.huey_consumer src.queue.server.huey") \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..9a6cb71 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,8 @@ +""" +Tests package initialization - adds the project root to the Python path +""" +import os +import sys + +# Add the parent directory to Python's path so 'src' module can be found +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..a0838f1 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,88 @@ +""" +Test script to verify the new Twitch API authentication +""" +import os +import sys +import logging +from dotenv import load_dotenv + +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('auth-test') + +def test_twitch_auth(): + """Test the Twitch authentication with client credentials flow""" + # Check for required libraries + try: + import requests + except ImportError: + logger.error("The 'requests' library is required. Install it with: pip install requests") + return False + + try: + from src.core.auth import TwitchAuth + except ImportError: + logger.error("Failed to import TwitchAuth from src.core.auth") + return False + + # Get credentials from environment variables + client_id = os.environ.get("TWITCH_CLIENT_ID") + client_secret = os.environ.get("TWITCH_CLIENT_SECRET") + + if not client_id or not client_secret: + logger.error("TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET environment variables must be set") + logger.info("You can get these from https://dev.twitch.tv/console/apps") + return False + + # Try to get a token using the TwitchAuth class + logger.info("Attempting to get Twitch OAuth token...") + + try: + auth = TwitchAuth(client_id, client_secret) + oauth_token = auth.get_oauth_token() + + if not oauth_token or not auth.access_token: + logger.error("Failed to get token using TwitchAuth") + return False + + logger.info("Successfully obtained access token!") + if auth.token_expiry: + expiry_hours = (auth.token_expiry - time.time()) / 3600 + logger.info(f"Token expires in: {expiry_hours:.1f} hours") + + # Validate the token by making a simple API call + logger.info("Validating token with a simple API call...") + + headers = { + 'Client-ID': client_id, + 'Authorization': f"Bearer {auth.access_token}" + } + + # Get top games as a test + response = requests.get('https://api.twitch.tv/helix/games/top', headers=headers) + + if response.status_code == 200: + games = response.json().get('data', []) + logger.info(f"API call successful! Retrieved {len(games)} top games.") + if games: + logger.info(f"Top game: {games[0]['name']}") + return True + else: + logger.error(f"API call failed: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"Error during authentication: {e}") + return False + +if __name__ == "__main__": + import time + print("Testing Twitch API authentication...") + if test_twitch_auth(): + print("Authentication test PASSED!") + sys.exit(0) + else: + print("Authentication test FAILED!") + sys.exit(1) \ No newline at end of file diff --git a/tests/test_controller.py b/tests/test_controller.py new file mode 100644 index 0000000..352c0a1 --- /dev/null +++ b/tests/test_controller.py @@ -0,0 +1,34 @@ +import traceback +import sys +import os + +print("Starting controller test...") + +try: + import sdl2 + print("SDL2 imported successfully") +except ImportError as e: + print(f"Failed to import SDL2: {e}") + sys.exit(1) + +try: + from src.game.input.gamepad import VirtualController + print("Controller support module imported successfully") +except Exception as e: + print(f"Failed to import controller support module: {e}") + traceback.print_exc() + sys.exit(1) + +try: + print("Creating VirtualController...") + controller = VirtualController() + print("VirtualController created successfully") + + print(f"Controller available: {controller.is_available()}") + print(f"Physical controller available: {controller.is_physical_controller_available()}") + + print("Test completed successfully!") +except Exception as e: + print(f"Error during testing: {e}") + traceback.print_exc() + sys.exit(1) \ No newline at end of file diff --git a/tests/test_controller_redirect.py b/tests/test_controller_redirect.py new file mode 100644 index 0000000..9a4c098 --- /dev/null +++ b/tests/test_controller_redirect.py @@ -0,0 +1,37 @@ +import time +import traceback +import sys + +print("Starting controller redirection test...") + +try: + from src.game.input.gamepad import VirtualController + print("Controller support module imported successfully") + + controller = VirtualController() + print(f"Controller available: {controller.is_available()}") + print(f"Physical controller available: {controller.is_physical_controller_available()}") + + if controller.is_physical_controller_available(): + print("Testing controller redirection...") + + # Start redirection + success = controller.start_controller_redirection() + print(f"Redirection started: {success}") + + if success: + # Let it run for a few seconds + print("Redirection active for 5 seconds. Try using your physical controller...") + for i in range(5, 0, -1): + print(f"{i}...") + time.sleep(1) + + # Stop redirection + controller.stop_controller_redirection() + print("Redirection stopped") + + print("Test completed successfully!") +except Exception as e: + print(f"Error during testing: {e}") + traceback.print_exc() + sys.exit(1) \ No newline at end of file diff --git a/tests/test_game_controls.py b/tests/test_game_controls.py new file mode 100644 index 0000000..a375434 --- /dev/null +++ b/tests/test_game_controls.py @@ -0,0 +1,50 @@ +import time +import traceback +import sys + +print("Starting game control test...") + +try: + from src.game.input.gamepad import VirtualController + print("Controller support module imported successfully") + + controller = VirtualController() + print(f"Controller available: {controller.is_available()}") + + if controller.is_available(): + print("Testing basic controller actions...") + + # Test some basic controller actions + print("Pressing A button") + controller.press_a() + time.sleep(0.5) + + print("Pressing B button") + controller.press_b() + time.sleep(0.5) + + print("Moving left stick up") + controller.move_left_stick_up() + time.sleep(0.5) + + print("Moving right stick right") + controller.move_right_stick_right() + time.sleep(0.5) + + print("Pressing left trigger") + controller.press_left_trigger() + time.sleep(0.5) + + print("Pressing D-pad up") + controller.press_dpad_up() + time.sleep(0.5) + + # Reset controller at the end + controller.reset() + print("Controller reset") + + print("Test completed successfully!") +except Exception as e: + print(f"Error during testing: {e}") + traceback.print_exc() + sys.exit(1) \ No newline at end of file diff --git a/tests/test_token_cache.py b/tests/test_token_cache.py new file mode 100644 index 0000000..bfff318 --- /dev/null +++ b/tests/test_token_cache.py @@ -0,0 +1,162 @@ +""" +Test script to verify token caching functionality +""" +import os +import sys +import logging +import time +import json +from dotenv import load_dotenv + +# Add the parent directory to Python's path so 'src' module can be found +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Load environment variables from .env file +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') +logger = logging.getLogger('token-cache-test') + +def test_token_cache(cache_file: str = "test_token_cache.json") -> bool: + """Test the token caching functionality""" + try: + import requests + except ImportError: + logger.error("The 'requests' library is required. Install it with: pip install requests") + return False + + # Import the bot class + from src.core.twitch import TwitchBot + from src.core.auth import TwitchAuth + + # Get credentials from environment variables + client_id = os.environ.get("TWITCH_CLIENT_ID") + client_secret = os.environ.get("TWITCH_CLIENT_SECRET") + username = os.environ.get("TWITCH_USERNAME") + channel = os.environ.get("TWITCH_CHANNEL") + + if not all([client_id, client_secret, username, channel]): + logger.error("Missing required environment variables") + logger.info("Please set TWITCH_CLIENT_ID, TWITCH_CLIENT_SECRET, TWITCH_USERNAME, and TWITCH_CHANNEL") + return False + + # Remove the cache file if it exists (to start fresh) + if os.path.exists(cache_file): + logger.info(f"Removing existing cache file: {cache_file}") + os.remove(cache_file) + + # Use mock tokens for testing + mock_access_token = "mock_access_token_for_testing" + mock_refresh_token = "mock_refresh_token_for_testing" + mock_expires_in = 14400 # 4 hours + + # Create a bot instance with the test cache file + logger.info("Creating bot instance...") + bot = TwitchBot( + username=username, + client_id=client_id, + client_secret=client_secret, + channel=channel, + access_token=mock_access_token, + refresh_token=mock_refresh_token, + token_cache_file=cache_file + ) + + # First call should use the provided token and save it + logger.info("First call to get_oauth_token (should use the provided token)...") + oauth_token1 = bot.get_oauth_token() + + # Verify the cache file was created + if not os.path.exists(cache_file): + logger.error(f"Cache file was not created: {cache_file}") + return False + + # Load and display cache file + with open(cache_file, 'r') as f: + cache_data = json.load(f) + + logger.info(f"Cache file created with {len(cache_data)} entries") + logger.info(f"Token expires at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cache_data['expires_at']))}") + + # Save the token for comparison + token1 = bot.access_token + + # Create a new bot instance to test loading from cache + logger.info("Creating a second bot instance (should load token from cache)...") + bot2 = TwitchBot( + username=username, + client_id=client_id, + client_secret=client_secret, + channel=channel, + token_cache_file=cache_file + ) + + # Second call should use the cached token + logger.info("Second call to get_oauth_token (should use cached token)...") + oauth_token2 = bot2.get_oauth_token() + token2 = bot2.access_token + + # Compare tokens + if token1 == token2: + logger.info("SUCCESS: Both instances used the same token") + else: + logger.error("FAILURE: Different tokens were used") + return False + + # Simulate token expiration by modifying the cache file + logger.info("Simulating token expiration...") + with open(cache_file, 'r') as f: + cache_data = json.load(f) + + # Set expiry to now minus 10 minutes + cache_data['expires_at'] = time.time() - 600 + + with open(cache_file, 'w') as f: + json.dump(cache_data, f) + + # Create a third bot instance with the "expired" token + # Since we don't want to trigger a real OAuth flow in tests, + # we'll provide a new mock token to use + logger.info("Creating a third bot instance with expired token cache...") + mock_new_token = "mock_new_token_after_expiration" + mock_new_refresh = "mock_new_refresh_after_expiration" + + bot3 = TwitchBot( + username=username, + client_id=client_id, + client_secret=client_secret, + channel=channel, + access_token=mock_new_token, + refresh_token=mock_new_refresh, + token_cache_file=cache_file + ) + + # Third call should use the new token since the cached one is expired + logger.info("Third call to get_oauth_token (should use the new mock token due to expiration)...") + oauth_token3 = bot3.get_oauth_token() + token3 = bot3.access_token + + # Compare tokens - they should be different + if token1 != token3: + logger.info("SUCCESS: Token was refreshed after expiration") + else: + logger.error("FAILURE: Expired token was not refreshed") + return False + + # Clean up + if os.path.exists(cache_file): + os.remove(cache_file) + logger.info(f"Removed test cache file: {cache_file}") + + logger.info("All token cache tests passed!") + return True + +if __name__ == "__main__": + print("Testing token caching functionality...") + if test_token_cache(): + print("All token cache tests PASSED!") + sys.exit(0) + else: + print("Token cache tests FAILED!") + sys.exit(1) \ No newline at end of file