Initial working version

This commit is contained in:
Joey Yakimowich-Payne 2025-05-14 17:45:07 -06:00
commit ec1c5958ce
33 changed files with 4547 additions and 0 deletions

179
.gitignore vendored Normal file
View file

@ -0,0 +1,179 @@
.env
.venv
default_config.json
data
*.db
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

388
README.md Normal file
View file

@ -0,0 +1,388 @@
# Twitch Chat Interaction Bot
A simple Python bot that connects to Twitch chat and performs actions based on user commands.
## Features
- Easy connection to Twitch chat via IRC
- Command-based interaction system
- Simple API for adding custom commands
- Example commands included
- Queue system for asynchronous command processing
- Robust authentication handling with token management
- Advanced features (optional):
- Interactive polls and voting system
- Timer system for scheduled messages
- Points system for viewers
- Sound effects (Windows only)
- Game control (optional):
- Control games via Twitch chat
- Direct control mode for immediate actions
- Voting mode for democratic game control
- User stats tracking for game commands
- Streamer override mode for exclusive control
- Multiple input methods:
- Keyboard and mouse simulation
- Virtual Xbox controller emulation (Windows only)
## Requirements
- Python 3.6+
- Socket library (included in Python standard library)
- Requests library (`pip install requests`) for Twitch authentication
- For keyboard/mouse control: pynput library (`pip install pynput`)
- For Xbox controller emulation: vgamepad library (`pip install vgamepad`) - Windows only
- For queue functionality: huey library (`pip install huey`)
## Setup
1. Clone this repository:
```
git clone https://github.com/yourusername/stream-interact.git
cd stream-interact
```
2. Install dependencies:
```
pip install -r requirements.txt
```
3. Set up your Twitch API credentials:
- Go to [https://dev.twitch.tv/console/apps](https://dev.twitch.tv/console/apps)
- Create a new application (or use an existing one)
- Generate a client secret
- Note your client ID and client secret
- Run the setup script to generate authentication tokens:
```
python scripts/setup_twitch_auth.py
```
- For detailed authentication setup instructions, see [Authentication Setup Guide](docs/twitch_auth_setup.md)
4. Set your environment variables:
```
# On Windows
set TWITCH_USERNAME=your_bot_username
set TWITCH_CLIENT_ID=your_client_id
set TWITCH_CLIENT_SECRET=your_client_secret
set TWITCH_CHANNEL=your_channel_name
set TWITCH_ADMIN_USERS=bot_username,mod_username
# On Linux/Mac
export TWITCH_USERNAME=your_bot_username
export TWITCH_CLIENT_ID=your_client_id
export TWITCH_CLIENT_SECRET=your_client_secret
export TWITCH_CHANNEL=your_channel_name
export TWITCH_ADMIN_USERS=bot_username,mod_username
```
Alternatively, you can edit the `main.py` file and set these values directly.
5. Configuration file:
You can also create a `default_config.json` file in the root directory with the following structure:
```json
{
"admin_users": ["bot_username", "mod_username"],
"cooldown": 2.0,
"game_mode": "direct",
"input_type": "keyboard",
"use_queue": false,
"token_cache_file": "data/cache/token_cache.json",
"features": {
"advanced": false,
"game_control": false
}
}
```
6. Run the bot:
```
# Basic functionality
python main.py
# With advanced features
python main.py --advanced
# With game control (keyboard & mouse, direct mode)
python main.py --game-control
# With game control (Xbox controller, direct mode)
python main.py --game-control --input-type controller
# With game control (voting mode)
python main.py --game-control --game-mode vote
# With game control (Xbox controller, voting mode)
python main.py --game-control --game-mode vote --input-type controller
# With both advanced features and game control
python main.py --advanced --game-control
# Customize cooldown time (in seconds)
python main.py --game-control --cooldown 5.0
# With queue system for processing commands
python main.py --use-queue
# With additional admin users
python main.py --admin-users bot_username,mod_username
# With custom config file
python main.py --config my_config.json
```
## Admin Users
Admin users have the same permissions as the channel owner. This is useful if:
- You have a separate bot account that will be used on your channel
- You want to give moderators special privileges
- You're running the bot for another streamer
Admin users can:
- Stop the bot with !stop
- Check token status with !token
- Take control of the game with !takeover
- Start and end voting with !startvote and !endvote
- End polls with !endpoll
- Give points to users with !give
Admin users can be configured in three ways (in order of priority):
1. Command line: `--admin-users bot_username,mod_username`
2. Environment variable: `TWITCH_ADMIN_USERS=bot_username,mod_username`
3. Config file: `"admin_users": ["bot_username", "mod_username"]` in default_config.json
## Basic Commands
The bot comes with several example commands:
- `!hello` - The bot will say hello back to the user
- `!dice [sides]` - Roll a dice with the specified number of sides (default: 6)
- `!echo [message]` - The bot will echo back the provided message
- `!8ball` - Ask the Magic 8-Ball a question and get a random answer
## Advanced Features
When running with the `--advanced` flag, these additional commands become available:
### Voting System
- `!poll "Question" "Option1" "Option2" ["Option3"...]` - Start a new poll
- `!vote <number>` - Vote in the active poll
- `!endpoll` - End the active poll and show results
### Timer System
- `!timer <seconds> <message>` - Set a timer to send a message after specified seconds
### Points System
- `!points [@user]` - Check points for yourself or another user
- `!give <user> <points>` - Give points to a user (channel owner only)
### Sound Effects (Windows only)
- `!sound <sound_name>` - Play a sound effect
- Add .wav files to the `sounds` directory to use this feature
### Queue System
For high-volume chats or intensive commands, you can use the queue system:
```
# Start the bot with queue enabled
python main.py --use-queue
# Start a consumer in a separate terminal to process tasks
python -m huey.bin.huey_consumer src.queue.server.huey
```
For detailed documentation on the queue system, see [Queue System Guide](docs/queue.md)
## Game Control
The game control feature allows Twitch chat to control a game via keyboard/mouse inputs or a virtual Xbox controller.
### General Commands
- `!gamehelp` - Show available game control commands
- `!gamestats [@user]` - Check game control stats for a user
### Keyboard & Mouse Mode
When using keyboard & mouse mode (`--input-type keyboard`), the following commands are available:
- `!up` - Press W key (move up/forward)
- `!down` - Press S key (move down/backward)
- `!left` - Press A key (move left)
- `!right` - Press D key (move right)
- `!jump` - Press Space bar (jump)
- `!attack` - Press left mouse button (attack)
- `!interact` - Press E key (interact)
- `!inventory` - Press I key (inventory)
- `!skill1` - Press 1 key (skill 1)
- `!skill2` - Press 2 key (skill 2)
- `!skill3` - Press 3 key (skill 3)
- `!ultimate` - Press R key (ultimate ability)
### Xbox Controller Mode (Windows only)
When using controller mode (`--input-type controller`), the following commands are available:
- Movement:
- `!up` - Move left stick up (forward)
- `!down` - Move left stick down (backward)
- `!left` - Move left stick left
- `!right` - Move left stick right
- Camera:
- `!look_up` - Move right stick up
- `!look_down` - Move right stick down
- `!look_left` - Move right stick left
- `!look_right` - Move right stick right
- Action Buttons:
- `!jump` - Press A button
- `!action` - Press B button
- `!interact` - Press X button
- `!menu` - Press Y button
- Shoulder Buttons & Triggers:
- `!block` - Press left shoulder (LB)
- `!attack` - Press right shoulder (RB)
- `!aim` - Press left trigger (LT)
- `!shoot` - Press right trigger (RT)
- D-pad:
- `!item1` - Press D-pad up
- `!item2` - Press D-pad right
- `!item3` - Press D-pad down
- `!item4` - Press D-pad left
- Menu Buttons:
- `!start` - Press start button
- `!select` - Press select/back button
### Direct Mode
In direct mode, commands are executed immediately when received. Each user has a cooldown between commands to prevent spam.
### Vote Mode
In vote mode, commands are collected through voting:
- `!startvote [seconds]` - Start a vote session (streamer only)
- `!vote <command>` - Vote for a specific command
- `!endvote` - End the current vote session (streamer only)
After the voting period, the command with the most votes is executed.
### Streamer Override Mode
The streamer override feature allows the channel owner to take exclusive control of the game when needed:
- `!takeover` - Channel owner takes exclusive control (only their commands will work)
- `!givecontrol` - Return control to the chat (viewers can use commands again)
When streamer override is active:
- Only the channel owner's commands will be executed
- The channel owner bypasses all cooldowns that apply to viewers
- If there's an active vote, it will be cancelled
- Viewers cannot start new votes until the streamer gives control back
#### Controller Redirection
When using controller mode (`--input-type controller`), the streamer override feature includes physical controller redirection:
- When `!takeover` is used, the streamer's physical controller inputs are automatically redirected to the virtual controller
- This allows the streamer to directly control the game using their own controller during takeover
- The redirection ends automatically when `!givecontrol` is used
- Requires a physical controller to be connected to the PC running the bot
Requirements for controller redirection:
- pygame library (`pip install pygame`)
- A physical controller connected to the PC
- Windows operating system (due to vgamepad dependency)
This feature is particularly useful when the streamer needs to:
- Quickly navigate a challenging section of a game
- Demonstrate something specific to viewers
- Override the chat's decisions temporarily
### Customizing Game Controls
You can customize the key/button mappings in the code:
- For keyboard/mouse controls, edit the keyboard commands in `game_control.py`
- For Xbox controller, edit the controller commands in `game_control.py`
## Adding Custom Commands
You can easily add your own commands by creating a handler function and registering it with the bot:
```python
def my_custom_command(username, args, bot):
# Do something interesting
bot.send_message(f"@{username}, your command was processed!")
# In main.py
bot.register_command("mycommand", my_custom_command)
```
Users can then trigger this command by typing `!mycommand` in the Twitch chat.
## Advanced Usage
You can extend the TwitchBot class to add more functionality, such as:
- User permission levels
- Cooldowns for commands
- Custom events (subscriptions, follows, etc.)
- Integration with other APIs
### Queue System
For high-traffic channels or resource-intensive commands, the queue system allows you to process messages and commands asynchronously:
- Moves command execution to a background process
- Prevents the bot from getting overwhelmed during high activity
- Allows for distributed processing across multiple machines
- Provides statistics and monitoring capabilities
See [Queue System Documentation](docs/queue.md) for detailed setup and usage instructions.
## Authentication
The bot uses Twitch's Client Credentials Grant Flow for authentication:
1. Create a Twitch application at [dev.twitch.tv/console/apps](https://dev.twitch.tv/console/apps)
2. Get your Client ID and generate a Client Secret
3. Set these as environment variables:
```
TWITCH_CLIENT_ID=your_client_id
TWITCH_CLIENT_SECRET=your_client_secret
```
4. Run the authentication setup script:
```
python scripts/setup_twitch_auth.py
```
This will guide you through the authentication process and store your tokens.
For detailed authentication setup instructions, see [Authentication Setup Guide](docs/twitch_auth_setup.md)
### Token Management
The bot uses a token cache file to minimize API requests. When you run the bot:
1. It checks for a cached token in `twitch_token_cache.json`
2. If a valid cached token exists, it uses that token
3. If no valid token is found, it requests a new one from Twitch
4. The new token is saved to the cache file for future use
The token cache can be configured with these options:
```
# Use a custom token cache location
python main.py --token-cache /path/to/your/cache.json
# Check token status while the bot is running (channel owner only)
!token
```
## License
This project is open source and available under the MIT License.

13
config/.env.example Normal file
View file

@ -0,0 +1,13 @@
# Twitch API credentials
# You can obtain these from https://dev.twitch.tv/console/apps
TWITCH_USERNAME=your_bot_username
TWITCH_CLIENT_ID=your_client_id
TWITCH_CLIENT_SECRET=your_client_secret
TWITCH_CHANNEL=your_channel
TWITCH_ADMIN_USERS=user1,user2
# Optional settings
# QUEUE_ENABLED=false
# GAME_CONTROL_ENABLED=true
# INPUT_TYPE=keyboard
# LOG_LEVEL=INFO

View file

@ -0,0 +1,41 @@
{
"twitch": {
"username": "",
"channel": "",
"client_id": "",
"client_secret": "",
"token_cache_file": "data/cache/token_cache.json"
},
"queue": {
"enabled": false,
"db_file": "twitch_queue.db",
"auto_start_consumer": false
},
"game_control": {
"enabled": false,
"mode": "direct",
"cooldown": 2.0,
"input_type": "keyboard",
"commands": {
"up": "w",
"down": "s",
"left": "a",
"right": "d",
"jump": "space",
"attack": "j",
"interact": "e"
}
},
"features": {
"advanced": false,
"sounds_directory": "data/sounds",
"points_per_message": 1,
"points_per_minute_watched": 5,
"voting_duration": 30
},
"logging": {
"level": "INFO",
"file": "stream-interact.log",
"console": true
}
}

94
docs/queue.md Normal file
View file

@ -0,0 +1,94 @@
# Twitch Queue System
This README explains how to use the Huey-based queue system with your Twitch bot.
## Overview
The queue system allows your Twitch bot to process commands and messages asynchronously. This is useful for:
- Handling high-volume chat without performance issues
- Processing commands that take time to complete
- Distributing processing across multiple processes or machines
- Maintaining history of commands for analytics
## Components
The queue system consists of three main components:
1. **Queue Server**: Defined in `src/queue/server.py`, this establishes the Huey queue with SQLite backend
2. **Consumer**: Processes tasks from the queue (can run in a separate process)
3. **Producer**: Your Twitch bot that sends tasks to the queue
## Setup
1. Make sure you have Huey installed:
```bash
pip install huey
```
2. Enable the queue in your bot by using the `--use-queue` flag:
```bash
python main.py --use-queue
```
3. Start a consumer to process tasks (in a separate terminal):
```bash
# Option 1: Use the Huey consumer CLI
python -m huey.bin.huey_consumer src.queue.server.huey
# Option 2: Use our simple wrapper
python scripts/run_consumer.py
# Option 3: Use the bot with built-in consumer (not recommended for production)
python main.py --use-queue --start-consumer
```
## How It Works
1. When `--use-queue` is enabled, the bot sends all commands to the queue instead of processing them directly.
2. Each command becomes a task in the SQLite database.
3. The consumer process picks up tasks and executes them in the background.
4. Results can be stored and retrieved later.
## Commands
The queue system adds the following command to the bot:
- `!qstats` - Display statistics about the queue (processed tasks, pending tasks, etc.)
## Scaling
For high-volume applications:
1. Run multiple consumers to process tasks in parallel:
```bash
python -m huey.bin.huey_consumer src.queue.server.huey -w 4 # 4 worker processes
```
2. Consider using Redis instead of SQLite for better performance:
(Requires modifying `src/queue/server.py` to use RedisHuey instead of SqliteHuey)
## Architecture Diagram
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ │ │ │ │ │
│ Twitch Chat │────▶│ Twitch Bot │────▶│ Queue (SQLite) │
│ │ │ (Producer) │ │ │
└─────────────────┘ └─────────────────┘ └────────┬────────┘
┌─────────────────────┐
│ │
│ Huey Consumer(s) │
│ │
└─────────────────────┘
```
## Customization
You can customize the queue system by:
1. Adding more task types in `src/queue/server.py`
2. Creating custom handlers for processed tasks
3. Implementing result handling in the bot

118
docs/twitch_auth_setup.md Normal file
View file

@ -0,0 +1,118 @@
# Twitch Authentication Setup
This document explains how to set up Twitch authentication for the application.
## Prerequisites
1. You need a Twitch account
2. You need to register a Twitch application at [Twitch Developer Console](https://dev.twitch.tv/console/apps)
3. Required Python packages:
- `cryptography` - For generating SSL certificates
- `requests` - For making HTTP requests to the Twitch API
- `python-dotenv` - For managing environment variables
You can install these dependencies using the provided script:
```bash
python scripts/install_auth_deps.py
```
## Registering a Twitch Application
1. Go to [Twitch Developer Console](https://dev.twitch.tv/console/apps)
2. Click "Register Your Application"
3. Fill in the details:
- **Name**: Choose a name for your application (e.g., "MyStreamInteract")
- **OAuth Redirect URLs**: Set to `https://localhost:3000` (note: HTTPS is required)
- **Category**: Choose "Chat Bot"
4. Click "Create"
5. Once created, click "Manage" for your application
6. Make note of your **Client ID**
7. Generate a **Client Secret** and store it securely
## Setting Up Authentication
There are two ways to authenticate:
### Method 1: Using the Setup Script (Recommended)
We provide a setup script that walks you through the authentication process:
```bash
python scripts/setup_twitch_auth.py
```
The script will:
1. Ask for your Client ID and Client Secret
2. Generate a self-signed SSL certificate for HTTPS (required by Twitch)
3. Open a browser window for authentication
4. Store the tokens securely
5. Update your .env file with the credentials
> **Note:** When the browser opens, you will see a security warning about the self-signed certificate.
> This is normal for local development. You need to proceed past this warning to continue the authentication process.
> In Chrome, click "Advanced" and then "Proceed to localhost (unsafe)".
If you want to force the generation of a new token (ignoring any cached ones):
```bash
python scripts/setup_twitch_auth.py --force-new-token
```
### Method 2: Manual Setup
1. Create or edit a `.env` file in the root directory of the project
2. Add the following information:
```
TWITCH_CLIENT_ID=your_client_id_here
TWITCH_CLIENT_SECRET=your_client_secret_here
TWITCH_USERNAME=your_twitch_username
TWITCH_CHANNEL=channel_to_connect_to
```
3. Run the authentication script:
```bash
python scripts/setup_twitch_auth.py --manual
```
4. Follow the instructions to complete authentication
## Windows-Specific Information
On Windows, the application uses the Python `cryptography` library to generate self-signed certificates instead of relying on the OpenSSL command-line tool. This ensures better compatibility with Windows systems.
If you encounter any issues with certificate generation on Windows:
1. Make sure you have the latest version of pip: `python -m pip install --upgrade pip`
2. Try reinstalling the cryptography package: `pip install --force-reinstall cryptography`
3. Visual C++ Build tools might be required. If you get build errors, you may need to install the [Microsoft C++ Build Tools](https://visualstudio.microsoft.com/visual-cpp-build-tools/)
## Important Notes
- The application uses user access tokens with `chat:read` and `chat:edit` scopes
- IRC chat access requires a user access token (client credentials are not sufficient)
- Twitch requires HTTPS for OAuth redirects, so we use a self-signed certificate for local development
- The tokens are cached in `data/cache/token_cache.json` and refreshed automatically when needed
- Never share your Client Secret or access tokens
## Self-Signed Certificates
The application automatically generates a self-signed certificate for HTTPS, which is stored in `data/ssl/`. This certificate is used for handling OAuth redirects from Twitch.
If you encounter certificate issues, you can delete the files in `data/ssl/` to generate a new certificate:
- `data/ssl/localhost.crt`
- `data/ssl/localhost.key`
## Troubleshooting
If you encounter authentication issues:
1. Make sure your Client ID and Client Secret are correct
2. Check that your OAuth redirect URL in the Twitch Developer Console is exactly `https://localhost:3000` (with HTTPS)
3. When prompted with certificate warnings in your browser, make sure to proceed past these warnings
4. Verify that your Twitch account has the necessary permissions
5. Delete the token cache file (`data/cache/token_cache.json`) and try again
6. Check the application logs for detailed error messages
7. If the browser doesn't open automatically, you can copy and paste the provided URL

310
main.py Normal file
View file

@ -0,0 +1,310 @@
import os
import time
import random
import argparse
import json
from typing import List, Optional, Dict, Any, Tuple, Union, Literal
from src.core.twitch import TwitchBot, CommandCallback
from dotenv import load_dotenv
load_dotenv()
# Default config file path
DEFAULT_CONFIG_PATH = "default_config.json"
# Example command handlers
def cmd_hello(username: str, args: List[str], bot: TwitchBot) -> None:
"""Simple hello command that greets the user"""
bot.send_message(f"Hello, {username}!")
def cmd_dice(username: str, args: List[str], bot: TwitchBot) -> None:
"""Roll a dice with specified number of sides"""
sides: int = 6 # Default to 6-sided dice
if args and args[0].isdigit():
sides = int(args[0])
result: int = random.randint(1, sides)
bot.send_message(f"@{username} rolled a {result} (d{sides})")
def cmd_echo(username: str, args: List[str], bot: TwitchBot) -> None:
"""Echo back the user's message"""
if args:
message: str = " ".join(args)
bot.send_message(f"Echo: {message}")
else:
bot.send_message(f"@{username}, you didn't provide a message to echo!")
def cmd_magic8ball(username: str, args: List[str], bot: TwitchBot) -> None:
"""Magic 8-ball that gives random answers"""
responses: List[str] = [
"It is certain.",
"It is decidedly so.",
"Without a doubt.",
"Yes, definitely.",
"You may rely on it.",
"As I see it, yes.",
"Most likely.",
"Outlook good.",
"Yes.",
"Signs point to yes.",
"Reply hazy, try again.",
"Ask again later.",
"Better not tell you now.",
"Cannot predict now.",
"Concentrate and ask again.",
"Don't count on it.",
"My reply is no.",
"My sources say no.",
"Outlook not so good.",
"Very doubtful."
]
bot.send_message(f"@{username}, {random.choice(responses)}")
def cmd_queue_stats(username: str, args: List[str], bot: TwitchBot) -> None:
"""Show queue statistics"""
stats = bot.get_queue_stats()
bot.send_message(f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending")
def cmd_token_status(username: str, args: List[str], bot: TwitchBot) -> None:
"""Show status of the Twitch API token (streamer only)"""
# Check if user is an admin
if not bot.is_admin(username):
bot.send_message(f"@{username}, only the channel owner or admins can check token status.")
return
if bot.access_token and bot.token_expiry:
# Calculate time remaining
time_remaining = bot.token_expiry - time.time()
if time_remaining > 0:
hours = int(time_remaining // 3600)
minutes = int((time_remaining % 3600) // 60)
bot.send_message(f"@{username}, token is valid for {hours}h {minutes}m.")
else:
bot.send_message(f"@{username}, token is expired and will be refreshed on next use.")
else:
bot.send_message(f"@{username}, no token available. It will be generated when needed.")
def cmd_stop(username: str, args: List[str], bot: TwitchBot) -> None:
"""Stop the bot (streamer only)"""
# Check if user is an admin
if not bot.is_admin(username):
bot.send_message(f"@{username}, only the channel owner or admins can stop the bot.")
return
bot.send_message(f"Bot is shutting down by request of @{username}.")
# Stop the bot
bot.stop()
def main() -> None:
# Parse command line arguments
parser: argparse.ArgumentParser = argparse.ArgumentParser(description='Twitch Chat Bot')
parser.add_argument('--advanced', action='store_true', help='Enable advanced features')
parser.add_argument('--game-control', action='store_true', help='Enable game control features')
parser.add_argument('--game-mode', choices=['direct', 'vote'], default='direct',
help='Game control mode: direct (immediate commands) or vote (timed voting)')
parser.add_argument('--cooldown', type=float, default=2.0,
help='Cooldown time between commands in seconds (for direct mode)')
parser.add_argument('--input-type', choices=['keyboard', 'controller'], default='keyboard',
help='Input type: keyboard/mouse or Xbox controller emulation')
parser.add_argument('--use-queue', action='store_true', help='Use Huey task queue for processing commands')
parser.add_argument('--start-consumer', action='store_true', help='Start Huey consumer within the bot process')
parser.add_argument('--token-cache', type=str, default="data/cache/token_cache.json",
help='Path to token cache file (default: data/cache/token_cache.json)')
parser.add_argument('--admin-users', type=str, help='Comma-separated list of additional admin users who have the same permissions as the channel owner')
parser.add_argument('--config', type=str, default=DEFAULT_CONFIG_PATH,
help=f'Path to configuration file (default: {DEFAULT_CONFIG_PATH})')
args: argparse.Namespace = parser.parse_args()
# Load configuration from file if it exists
config: Dict[str, Any] = {}
if os.path.exists(args.config):
try:
with open(args.config, 'r') as f:
config = json.load(f)
print(f"Loaded configuration from {args.config}")
except json.JSONDecodeError:
print(f"Error: {args.config} is not a valid JSON file")
except Exception as e:
print(f"Error loading config file: {e}")
# Get Twitch credentials from environment variables
username: Optional[str] = os.environ.get("TWITCH_USERNAME")
client_id: Optional[str] = os.environ.get("TWITCH_CLIENT_ID")
client_secret: Optional[str] = os.environ.get("TWITCH_CLIENT_SECRET")
channel: Optional[str] = os.environ.get("TWITCH_CHANNEL")
# Check for required credentials
if not all([username, client_id, client_secret, channel]):
print("Please set the following environment variables:")
print(" TWITCH_USERNAME - The bot's username")
print(" TWITCH_CLIENT_ID - Your app's registered client ID")
print(" TWITCH_CLIENT_SECRET - Your app's registered client secret")
print(" TWITCH_CHANNEL - The channel to join")
print("\nTo get client ID and secret:")
print("1. Go to https://dev.twitch.tv/console/apps")
print("2. Create a new application or use an existing one")
print("3. Get the client ID and generate a client secret")
# For testing, you can uncomment and set these values
# username = "your_bot_username"
# client_id = "your_client_id"
# client_secret = "your_client_secret"
# channel = "your_channel"
if not all([username, client_id, client_secret, channel]):
return
# Type assertion to satisfy type checker (we checked these are not None above)
username = username if username is not None else ""
client_id = client_id if client_id is not None else ""
client_secret = client_secret if client_secret is not None else ""
channel = channel if channel is not None else ""
# Parse admin users with priority: command line > env var > config file
admin_users: Optional[List[str]] = None
# 1. From command line
if args.admin_users:
admin_users = [user.strip() for user in args.admin_users.split(",")]
print(f"Additional admin users from command line: {', '.join(admin_users)}")
# 2. From environment variable
elif os.environ.get("TWITCH_ADMIN_USERS"):
env_admins = os.environ.get("TWITCH_ADMIN_USERS", "")
admin_users = [user.strip() for user in env_admins.split(",")]
print(f"Additional admin users from environment: {', '.join(admin_users)}")
# 3. From config file
elif "admin_users" in config and isinstance(config["admin_users"], list):
admin_users = config["admin_users"]
print(f"Additional admin users from config file: {', '.join(admin_users)}")
# Create bot instance
bot: TwitchBot = TwitchBot(
username=username,
client_id=client_id,
client_secret=client_secret,
channel=channel,
use_queue=args.use_queue,
token_cache_file=args.token_cache,
admin_users=admin_users
)
# Register basic commands
bot.register_command("hello", cmd_hello)
bot.register_command("dice", cmd_dice)
bot.register_command("echo", cmd_echo)
bot.register_command("8ball", cmd_magic8ball)
bot.register_command("token", cmd_token_status)
bot.register_command("stop", cmd_stop)
# Add queue-related commands if using the queue
if args.use_queue:
bot.register_command("qstats", cmd_queue_stats)
# Start the Huey consumer if requested
if args.start_consumer:
try:
from src.queue.server import start_consumer
import threading
consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.daemon = True
consumer_thread.start()
print("Started Huey consumer thread")
print("Note: For production, it's better to use: python -m huey.bin.huey_consumer src.queue.server.huey")
except ImportError as e:
print(f"Failed to start consumer: {e}")
# Enable advanced features if requested
if args.advanced:
try:
from src.features.examples import AdvancedExamples
print("Enabling advanced features...")
advanced: AdvancedExamples = AdvancedExamples(bot)
print("Advanced features enabled: polls, voting, timers, points system")
if os.name == 'nt':
print("Sound effects enabled (Windows only)")
# Create sounds directory if it doesn't exist
if not os.path.exists("sounds"):
os.makedirs("sounds")
print("Created 'sounds' directory. Add .wav files to use with !sound command")
except ImportError as e:
print(f"Failed to import advanced features: {e}")
# Enable game control if requested
if args.game_control:
try:
from src.game.controller import GameController, PYNPUT_AVAILABLE
# Check if controller input is requested
if args.input_type == 'controller':
try:
from src.game.input.gamepad import VirtualController, VGAMEPAD_AVAILABLE
if not VGAMEPAD_AVAILABLE:
print("WARNING: vgamepad library not found. Install it with: pip install vgamepad")
print("Falling back to keyboard/mouse input.")
args.input_type = 'keyboard'
except ImportError:
print("WARNING: controller_support.py not found or error importing it.")
print("Falling back to keyboard/mouse input.")
args.input_type = 'keyboard'
print(f"Enabling game control in {args.game_mode} mode with {args.input_type} input...")
if args.input_type == 'keyboard' and not PYNPUT_AVAILABLE:
print("WARNING: pynput library not found. Install it with: pip install pynput")
print("Keyboard/mouse control will be simulated but not actually perform any actions.")
print("Run 'pip install pynput' to enable actual keyboard/mouse control.")
game_controller: GameController = GameController(
bot,
mode=args.game_mode,
cooldown=args.cooldown,
input_type=args.input_type
)
if args.game_mode == "direct":
print(f"Commands will execute immediately with a {args.cooldown}s cooldown per user")
else:
print("Commands will be collected through voting")
print("Streamer can start a vote with !startvote and end with !endvote")
print("Users vote with !vote [command]")
print("Use !gamehelp to see available game commands")
print("Check user stats with !gamestats [username]")
except ImportError as e:
print(f"Failed to import game control features: {e}")
print("Make sure you have the required dependencies:")
print(" pip install pynput")
if args.input_type == 'controller':
print(" pip install vgamepad # For controller support")
# Start the bot
print(f"Starting bot for channel #{channel}")
print("Basic commands: !hello, !dice [sides], !echo [message], !8ball")
print("Admin commands: !token (check token status)")
if args.use_queue:
print("Queue mode enabled, commands will be processed by the queue system")
print("Use !qstats to see queue statistics")
try:
# Make sure requests library is installed
try:
import requests
except ImportError:
print("ERROR: The requests library is required for the new authentication method.")
print("Install it with: pip install requests")
return
# Now start the bot
bot.start(use_queue=args.use_queue)
except KeyboardInterrupt:
print("Bot stopped by user")
bot.stop()
if __name__ == "__main__":
main()

19
requirements.txt Normal file
View file

@ -0,0 +1,19 @@
# No external dependencies required for basic functionality
# The bot uses only Python standard library modules.
# Core dependencies
requests>=2.25.0 # For Twitch API authentication
huey>=2.4.0 # For background task queue
cryptography>=38.0.0 # For generating self-signed certificates
# Game control dependencies (optional)
pynput>=1.7.0; sys_platform != "darwin" or python_version < "3.0" # For keyboard/mouse control
vgamepad>=0.0.8; sys_platform == "win32" # For Xbox controller emulation (Windows only)
pysdl2>=0.9.14 # For controller detection
pysdl2-dll>=2.28.0 # SDL2 binaries
# Optional utilities
python-dotenv>=0.19.0 # For loading .env files
# Type checking (optional, for development)
mypy>=0.910

View file

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Helper script to install authentication dependencies
"""
import sys
import subprocess
import platform
def main():
"""Install the dependencies required for Twitch authentication"""
print("Installing dependencies for Twitch authentication...")
# Required packages
packages = [
"requests",
"python-dotenv",
"cryptography"
]
# Use pip to install the packages
try:
subprocess.check_call([sys.executable, "-m", "pip", "install"] + packages)
print("\nSuccessfully installed authentication dependencies!")
print("You can now run: python scripts/setup_twitch_auth.py")
except subprocess.CalledProcessError:
print("\nError: Failed to install dependencies.")
print("Please try installing them manually:")
print(" pip install requests python-dotenv cryptography")
sys.exit(1)
# Show additional information for Windows users
if platform.system() == "Windows":
print("\nNote for Windows users:")
print("This script uses the 'cryptography' library for generating certificates,")
print("which has C dependencies that should be automatically installed.")
print("If you encounter any issues, please refer to the documentation:")
print("https://cryptography.io/en/latest/installation/")
if __name__ == "__main__":
main()

21
scripts/run_consumer.py Normal file
View file

@ -0,0 +1,21 @@
"""
Script to run the queue consumer
"""
import sys
import logging
from src.queue.server import start_consumer
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('consumer')
if __name__ == "__main__":
logger.info("Starting queue consumer...")
try:
start_consumer()
except KeyboardInterrupt:
logger.info("Consumer stopped by user")
sys.exit(0)
except Exception as e:
logger.error(f"Error in consumer: {e}")
sys.exit(1)

113
scripts/setup_env.py Normal file
View file

@ -0,0 +1,113 @@
"""
Environment setup helper script
"""
import os
import sys
import shutil
from typing import List, Dict, Any, Optional
import subprocess
import platform
def check_python_version() -> bool:
"""Check if Python version is compatible"""
if sys.version_info < (3, 7):
print(f"Error: Python 3.7 or higher is required. You are using {platform.python_version()}")
return False
print(f"Python version OK: {platform.python_version()}")
return True
def create_env_file() -> bool:
"""Create a .env file from .env.example if it doesn't exist"""
example_path = os.path.join("config", ".env.example")
env_path = ".env"
# Check if .env already exists
if os.path.exists(env_path):
print(f"Info: {env_path} file already exists")
return True
# Check if .env.example exists
if not os.path.exists(example_path):
print(f"Creating default .env file (no example found at {example_path})")
with open(env_path, "w") as f:
f.write("# Twitch API credentials\n")
f.write("TWITCH_USERNAME=your_bot_username\n")
f.write("TWITCH_CLIENT_ID=your_client_id\n")
f.write("TWITCH_CLIENT_SECRET=your_client_secret\n")
f.write("TWITCH_CHANNEL=your_channel\n")
print(f"Created default {env_path} file")
return True
# Copy .env.example to .env
try:
shutil.copy(example_path, env_path)
print(f"Created {env_path} file from {example_path}")
print(f"Please edit {env_path} to set your Twitch API credentials")
return True
except Exception as e:
print(f"Error creating .env file: {e}")
return False
def install_dependencies() -> bool:
"""Install Python dependencies from requirements.txt"""
req_path = "requirements.txt"
if not os.path.exists(req_path):
print(f"Error: {req_path} not found")
return False
try:
print(f"Installing dependencies from {req_path}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", req_path])
print("Dependencies installed successfully")
return True
except subprocess.CalledProcessError as e:
print(f"Error installing dependencies: {e}")
return False
def create_directories() -> bool:
"""Create necessary directories if they don't exist"""
dirs = [
"data/cache",
"data/sounds"
]
for directory in dirs:
if not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
print(f"Created directory: {directory}")
except Exception as e:
print(f"Error creating directory {directory}: {e}")
return False
return True
def setup_environment() -> bool:
"""
Run all setup steps
Returns True if all steps succeeded
"""
steps = [
check_python_version,
create_directories,
create_env_file,
install_dependencies
]
success = True
for step in steps:
if not step():
success = False
return success
if __name__ == "__main__":
print("Setting up environment for stream-interact...")
if setup_environment():
print("\nSetup completed successfully!")
print("\nTo run the bot, use: python main.py")
print("To see available options, use: python main.py --help")
else:
print("\nSetup completed with errors. Please check the messages above.")
sys.exit(1)

View file

@ -0,0 +1,253 @@
#!/usr/bin/env python3
"""
Setup script for Twitch authentication
This script helps users generate a Twitch user access token with proper IRC scopes.
"""
import os
import sys
import logging
import json
import argparse
import time
from dotenv import load_dotenv
# Load environment variables if available
load_dotenv()
# Add parent directory to the path so we can import our modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Import the auth module
from src.core.auth import TwitchAuth
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('twitch-auth-setup')
# Path to default config
DEFAULT_CONFIG_PATH = os.path.join("data", "default_config.json")
def load_default_config():
"""Load default configuration from JSON file"""
if os.path.exists(DEFAULT_CONFIG_PATH):
try:
with open(DEFAULT_CONFIG_PATH, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON in {DEFAULT_CONFIG_PATH}, ignoring")
return {}
def save_default_config(config_data):
"""Save configuration to default config JSON file"""
# Ensure directory exists
os.makedirs(os.path.dirname(DEFAULT_CONFIG_PATH), exist_ok=True)
# If file exists, load current data and update it
existing_data = {}
if os.path.exists(DEFAULT_CONFIG_PATH):
try:
with open(DEFAULT_CONFIG_PATH, 'r') as f:
existing_data = json.load(f)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON in {DEFAULT_CONFIG_PATH}, overwriting")
# Update with new data
existing_data.update(config_data)
# Save back to file
with open(DEFAULT_CONFIG_PATH, 'w') as f:
json.dump(existing_data, f, indent=2)
logger.info(f"Configuration saved to {DEFAULT_CONFIG_PATH}")
def setup_twitch_auth(client_id=None, client_secret=None, scopes=None, manual_mode=False, force_new_token=False, use_cached_token=False):
"""
Setup Twitch authentication by generating a user access token
Args:
client_id: Twitch client ID
client_secret: Twitch client secret
scopes: Space-separated list of scopes
manual_mode: Whether to use manual mode (no browser)
force_new_token: Force generation of a new token even if a cached one exists
use_cached_token: Whether to use a cached token if available (default: False)
Returns:
bool: True if successful, False otherwise
"""
try:
# Load default config
default_config = load_default_config()
# Priority of values:
# 1. Function arguments
# 2. Environment variables
# 3. Default config
# Use provided values, then env vars, then default config
client_id = client_id or os.environ.get('TWITCH_CLIENT_ID') or default_config.get('TWITCH_CLIENT_ID')
client_secret = client_secret or os.environ.get('TWITCH_CLIENT_SECRET') or default_config.get('TWITCH_CLIENT_SECRET')
if not client_id:
client_id = input("Enter your Twitch Client ID: ")
if not client_secret:
client_secret = input("Enter your Twitch Client Secret: ")
if not client_id or not client_secret:
logger.error("Client ID and Client Secret are required")
return False
# Default scopes for IRC chat - prioritize env var, then default config
default_scopes = "chat:read chat:edit"
scopes = scopes or os.environ.get('TWITCH_SCOPES') or default_config.get('TWITCH_SCOPES', default_scopes)
# Define redirect URI
redirect_uri = os.environ.get('TWITCH_REDIRECT_URI') or default_config.get('TWITCH_REDIRECT_URI', "https://localhost:3000")
print("\n===== Twitch Authentication Setup =====")
print(f"Client ID: {client_id}")
print(f"Redirect URI: {redirect_uri} (must match your Twitch app settings)")
print(f"Requested Scopes: {scopes}")
print("=====================================\n")
# Make sure the cache directory exists
cache_dir = os.path.join("data", "cache")
os.makedirs(cache_dir, exist_ok=True)
# Create the token cache file path
token_cache_file = os.path.join(cache_dir, "token_cache.json")
# By default, clear any cached tokens unless explicitly told to use them
if (not use_cached_token or force_new_token) and os.path.exists(token_cache_file):
logger.info("Clearing cached token to generate fresh credentials")
os.remove(token_cache_file)
# Create auth handler
auth = TwitchAuth(
client_id=client_id,
client_secret=client_secret,
token_cache_file=token_cache_file,
redirect_uri=redirect_uri,
scopes=scopes
)
# Check if a valid token is already available
has_valid_token = False
if use_cached_token and auth.access_token and auth.token_expiry and time.time() < (auth.token_expiry - 60):
has_valid_token = True
# Using cached token
if not force_new_token:
print("\nUsing cached token (valid and not expired).")
print("If you want to force a new token, run with --force-new-token")
try:
# Try to validate the token to make sure it's working
token_info = auth.validate_token()
print(f"Token belongs to user: {token_info.get('login', 'unknown')}")
expiry_hours = (auth.token_expiry - time.time()) / 3600
print(f"Token expires in {expiry_hours:.1f} hours")
# Skip browser flow and continue with cached token
oauth_token = f"oauth:{auth.access_token}"
except Exception as e:
logger.warning(f"Error validating cached token: {e}")
logger.warning("Will attempt to get a new token")
has_valid_token = False
# If no valid token, go through the auth flow
if not has_valid_token or force_new_token:
# Show security warning about certificates
if not manual_mode:
print("\n⚠️ IMPORTANT SECURITY NOTE ⚠️")
print("This application uses a self-signed certificate for HTTPS, which is required by Twitch.")
print("When your browser opens, you will see a security warning.")
print("This is expected for local development. Please proceed to the site anyway:")
print(" • In Chrome: Click 'Advanced' and then 'Proceed to localhost (unsafe)'")
print(" • In Firefox: Click 'Advanced', then 'Accept the Risk and Continue'")
print(" • In Edge: Click 'Details' and then 'Go on to the webpage'\n")
print("NOTE: If the browser doesn't open automatically, a URL will be provided for you to copy and paste.")
input("Press Enter to continue...")
# Start OAuth flow
logger.info("Starting OAuth authentication flow...")
oauth_token = auth.get_oauth_token(manual_auth=manual_mode)
# At this point, we should have a valid token
if auth.access_token:
# Get token info
try:
token_info = auth.validate_token()
print("\n===== Authentication Successful =====")
print(f"Access Token: {auth.access_token}")
if auth.refresh_token:
print(f"Refresh Token: {auth.refresh_token}")
print(f"Token Scopes: {token_info.get('scopes', [])}")
print(f"User Name: {token_info.get('login', 'unknown')}")
print("=====================================\n")
except Exception as e:
logger.error(f"Failed to validate token: {e}")
print("\n===== Authentication Status =====")
print(f"Access Token: {auth.access_token}")
if auth.refresh_token:
print(f"Refresh Token: {auth.refresh_token}")
print("Warning: Could not validate token")
print("============================\n")
# Store token info in default_config.json
if input("Would you like to save these credentials to default_config.json? (y/n): ").lower() == 'y':
config_data = {
"TWITCH_CLIENT_ID": client_id,
"TWITCH_CLIENT_SECRET": client_secret,
}
# Try to use the validated user info
if 'token_info' in locals():
username = token_info.get('login', '')
config_data["TWITCH_USERNAME"] = username
# Get channel name
default_channel = username if 'username' in locals() else os.environ.get('TWITCH_CHANNEL', '') or default_config.get('TWITCH_CHANNEL', '')
channel = input(f"Enter Twitch channel to join (default: {default_channel}): ") or default_channel
config_data["TWITCH_CHANNEL"] = channel
# Save access and refresh tokens
config_data["TWITCH_ACCESS_TOKEN"] = auth.access_token
if auth.refresh_token:
config_data["TWITCH_REFRESH_TOKEN"] = auth.refresh_token
# Save to default_config.json
save_default_config(config_data)
return True
else:
logger.error("Failed to get OAuth token")
return False
except Exception as e:
logger.error(f"Error during setup: {e}")
return False
def main():
"""Main function"""
parser = argparse.ArgumentParser(description="Setup Twitch authentication")
parser.add_argument("--client-id", help="Twitch Client ID")
parser.add_argument("--client-secret", help="Twitch Client Secret")
parser.add_argument("--scopes", help="Space-separated list of scopes")
parser.add_argument("--manual", action="store_true", help="Use manual mode (no browser)")
parser.add_argument("--force-new-token", action="store_true", help="Force generation of a new token even if a cached one exists")
parser.add_argument("--use-cached-token", action="store_true", help="Use cached token if available (default is to clear cached tokens)")
args = parser.parse_args()
if setup_twitch_auth(args.client_id, args.client_secret, args.scopes, args.manual, args.force_new_token, args.use_cached_token):
print("Setup completed successfully!")
sys.exit(0)
else:
print("Setup failed!")
sys.exit(1)
if __name__ == "__main__":
main()

5
src/__init__.py Normal file
View file

@ -0,0 +1,5 @@
import os
import sys
# Add the parent directory to Python's path so 'src' module can be found
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

0
src/core/__init__.py Normal file
View file

293
src/core/auth.py Normal file
View file

@ -0,0 +1,293 @@
"""
Authentication module for Twitch API
"""
import os
import time
import json
import logging
import requests
import webbrowser
from typing import Dict, Any, Optional, Tuple
# Import the auth server functionality
from src.core.auth_server import start_oauth_flow
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TwitchAuth:
"""Handles Twitch API authentication and token management"""
def __init__(self, client_id: str, client_secret: str = None, token_cache_file: str = "data/cache/token_cache.json",
redirect_uri: str = "https://localhost:3000", scopes: str = "chat:read chat:edit"):
self.client_id: str = client_id
self.client_secret: str = client_secret
self.token_cache_file: str = token_cache_file
self.redirect_uri: str = redirect_uri
self.scopes: str = scopes
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.token_expiry: Optional[float] = None
self._load_cached_token()
def _load_cached_token(self) -> None:
"""Load cached token from file if available and not expired"""
try:
if os.path.exists(self.token_cache_file):
with open(self.token_cache_file, 'r') as f:
cached_data = json.load(f)
# Check if we have all required fields
if all(k in cached_data for k in ['access_token', 'expires_at', 'client_id']):
# Only use the token if it was generated with the same client_id
if cached_data['client_id'] == self.client_id:
# Check if token is still valid (with a safety margin)
if cached_data['expires_at'] > time.time() + 60:
self.access_token = cached_data['access_token']
self.token_expiry = cached_data['expires_at']
# Load refresh token if available
if 'refresh_token' in cached_data:
self.refresh_token = cached_data['refresh_token']
logger.info(f"Loaded valid cached token, expires in {(self.token_expiry - time.time())/60:.1f} minutes")
else:
logger.info("Cached token is expired, will try to refresh")
if 'refresh_token' in cached_data:
self.refresh_token = cached_data['refresh_token']
self._refresh_token()
else:
logger.info("No refresh token available, will need to re-authenticate")
else:
logger.info("Cached token was generated with a different client_id, will request a new one")
except Exception as e:
logger.warning(f"Error loading cached token: {e}")
def _save_token_to_cache(self, token_data: Dict[str, Any]) -> None:
"""Save token data to cache file"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(self.token_cache_file), exist_ok=True)
cache_data = {
'access_token': token_data['access_token'],
'expires_at': time.time() + token_data['expires_in'] - 3600, # Expire 1 hour early to be safe
'token_type': token_data['token_type'],
'client_id': self.client_id,
'cached_at': time.time()
}
# Save refresh token if available
if 'refresh_token' in token_data:
cache_data['refresh_token'] = token_data['refresh_token']
with open(self.token_cache_file, 'w') as f:
json.dump(cache_data, f)
logger.info(f"Saved token to cache file: {self.token_cache_file}")
except Exception as e:
logger.warning(f"Error saving token to cache: {e}")
def _refresh_token(self) -> bool:
"""
Refresh the access token using the refresh token
Returns:
bool: True if refresh was successful, False otherwise
"""
if not self.refresh_token:
logger.warning("No refresh token available")
return False
if not self.client_id or not self.client_secret:
logger.warning("Client ID and Client Secret are required for token refresh")
return False
token_url = "https://id.twitch.tv/oauth2/token"
payload = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "refresh_token",
"refresh_token": self.refresh_token
}
try:
response = requests.post(token_url, data=payload)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + token_data["expires_in"] - 3600
# Update refresh token if provided
if "refresh_token" in token_data:
self.refresh_token = token_data["refresh_token"]
# Save updated tokens
self._save_token_to_cache(token_data)
logger.info(f"Successfully refreshed access token, valid for {token_data['expires_in']/3600:.1f} hours")
return True
except Exception as e:
logger.error(f"Failed to refresh token: {e}")
# Clear refresh token on failure to force a new auth flow
self.refresh_token = None
return False
def _handle_auth_code(self, auth_code: str) -> Tuple[bool, str]:
"""
Exchange authorization code for access and refresh tokens
Args:
auth_code: The authorization code from OAuth redirect
Returns:
Tuple[bool, str]: Success status and message
"""
if not self.client_id or not self.client_secret:
return False, "Client ID and Client Secret are required"
token_url = "https://id.twitch.tv/oauth2/token"
payload = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"code": auth_code,
"grant_type": "authorization_code",
"redirect_uri": self.redirect_uri
}
try:
response = requests.post(token_url, data=payload)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data["access_token"]
self.token_expiry = time.time() + token_data["expires_in"] - 3600
if "refresh_token" in token_data:
self.refresh_token = token_data["refresh_token"]
# Save token data
self._save_token_to_cache(token_data)
logger.info(f"Successfully obtained new tokens, valid for {token_data['expires_in']/3600:.1f} hours")
return True, "Authentication successful"
except Exception as e:
logger.error(f"Failed to exchange authorization code: {e}")
return False, f"Failed to exchange authorization code: {str(e)}"
def get_oauth_token(self, manual_auth: bool = False) -> str:
"""
Get OAuth token for Twitch authentication
If no valid token is available:
1. Try to refresh the token if a refresh token is available
2. If refresh fails or no refresh token, require user auth
Args:
manual_auth: If True, provide instructions for manual authentication
instead of automatic browser launch
Returns:
str: The OAuth token with 'oauth:' prefix
Raises:
ValueError: If authentication fails
"""
# Check if we have a valid token
if self.access_token and self.token_expiry and time.time() < (self.token_expiry - 60):
expiry_min = (self.token_expiry - time.time()) / 60
logger.debug(f"Using cached token, expires in {expiry_min:.1f} minutes")
return f"oauth:{self.access_token}"
# Try to refresh the token
if self.refresh_token and self._refresh_token():
return f"oauth:{self.access_token}"
# Need user authentication - use our auth server to handle the flow
logger.info("Starting OAuth authentication flow")
if manual_auth:
# For environments where browser launch isn't possible
auth_url = (
f"https://id.twitch.tv/oauth2/authorize"
f"?client_id={self.client_id}"
f"&redirect_uri={self.redirect_uri}"
f"&response_type=code"
f"&scope={self.scopes.replace(' ', '+')}"
)
logger.info(f"Please visit this URL to authenticate:")
logger.info(auth_url)
logger.info("After authenticating, you'll be redirected to a URL like:")
logger.info(f"{self.redirect_uri}?code=abcdef123456&scope=chat:read+chat:edit")
logger.info("Please enter the code parameter from that URL:")
auth_code = input("Enter code: ")
else:
# Use the auth server to handle the redirect
auth_code = start_oauth_flow(
client_id=self.client_id,
redirect_uri=self.redirect_uri,
scopes=self.scopes
)
if not auth_code:
raise ValueError("Failed to get authorization code")
# Exchange the auth code for tokens
success, message = self._handle_auth_code(auth_code)
if success:
return f"oauth:{self.access_token}"
else:
raise ValueError(f"Authentication failed: {message}")
def validate_token(self) -> Dict[str, Any]:
"""
Validate the current access token
Returns:
Dict: Token validation information
Raises:
ValueError: If validation fails
"""
if not self.access_token:
raise ValueError("No access token available")
validate_url = "https://id.twitch.tv/oauth2/validate"
headers = {
"Authorization": f"OAuth {self.access_token}"
}
try:
response = requests.get(validate_url, headers=headers)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Token validation failed: {e}")
raise ValueError(f"Token validation failed: {str(e)}")
def update_manually(self, access_token: str, refresh_token: str = None, expires_in: int = 14400) -> None:
"""
Manually update the tokens (useful for setting tokens from other sources)
Args:
access_token: The access token
refresh_token: The refresh token (optional)
expires_in: Token validity in seconds (default 4 hours)
"""
self.access_token = access_token
if refresh_token:
self.refresh_token = refresh_token
self.token_expiry = time.time() + expires_in - 3600 # 1 hour safety margin
# Save to cache
token_data = {
"access_token": access_token,
"expires_in": expires_in,
"token_type": "bearer"
}
if refresh_token:
token_data["refresh_token"] = refresh_token
self._save_token_to_cache(token_data)
logger.info(f"Manually updated access token, valid for {expires_in/3600:.1f} hours")

398
src/core/auth_server.py Normal file
View file

@ -0,0 +1,398 @@
"""
Simple HTTPS server to handle OAuth redirects from Twitch
"""
import http.server
import socketserver
import threading
import urllib.parse
import webbrowser
import logging
import ssl
import os
import time
import platform
import subprocess
from typing import Optional, Callable, Dict, Any
from pathlib import Path
import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AuthHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for OAuth redirects"""
# Class variables to store auth code and callback
auth_code: Optional[str] = None
auth_callback: Optional[Callable[[str], None]] = None
def do_GET(self):
"""Handle GET requests - extract auth code from URL"""
try:
# Parse the URL and query parameters
parsed_url = urllib.parse.urlparse(self.path)
query_params = urllib.parse.parse_qs(parsed_url.query)
if "code" in query_params:
# Extract the authorization code
AuthHandler.auth_code = query_params["code"][0]
logger.info("Authorization code received")
# Send a success response to the user
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Authentication Successful</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
padding: 50px;
background-color: #f0f0f0;
}
.container {
background-color: #fff;
border-radius: 10px;
padding: 30px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
max-width: 600px;
margin: 0 auto;
}
h1 {
color: #6441A4;
}
.success-icon {
font-size: 64px;
color: #28a745;
}
</style>
</head>
<body>
<div class="container">
<div class="success-icon">&check;</div>
<h1>Authentication Successful!</h1>
<p>You have successfully authenticated with Twitch.</p>
<p>You can now close this window and return to the application.</p>
</div>
</body>
</html>
"""
self.wfile.write(html_content.encode())
# Call the callback function with the auth code
if AuthHandler.auth_callback:
AuthHandler.auth_callback(AuthHandler.auth_code)
else:
# Handle error or missing code
self.send_response(400)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<html><body><h1>Authentication Error</h1><p>No authorization code received.</p></body></html>")
logger.error("No authorization code in redirect")
except Exception as e:
# Handle any exceptions
logger.error(f"Error handling OAuth redirect: {e}")
self.send_response(500)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(f"Error: {str(e)}".encode())
def log_message(self, format, *args):
"""Override log_message to use our logger"""
logger.debug(f"AuthHandler: {format % args}")
def generate_self_signed_cert_with_cryptography(cert_path: str, key_path: str) -> bool:
"""
Generate a self-signed certificate using the cryptography library
Args:
cert_path: Path to save the certificate file
key_path: Path to save the key file
Returns:
bool: True if successful, False otherwise
"""
try:
# Import cryptography modules here to avoid requiring it until needed
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
# Ensure the directory exists
os.makedirs(os.path.dirname(cert_path), exist_ok=True)
# Generate a private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Write private key to file
with open(key_path, "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
# Create a self-signed certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
# Valid for 1 year
datetime.datetime.utcnow() + datetime.timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
critical=False,
).sign(private_key, hashes.SHA256())
# Write certificate to file
with open(cert_path, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
logger.info(f"Generated self-signed certificate using cryptography: {cert_path}")
return True
except ImportError:
logger.error("Failed to import cryptography library. Please install it: pip install cryptography")
return False
except Exception as e:
logger.error(f"Failed to generate self-signed certificate: {e}")
return False
def generate_self_signed_cert(cert_path: str, key_path: str) -> bool:
"""
Generate a self-signed certificate for local HTTPS development
This function first tries to use the cryptography library, and falls back
to OpenSSL command line if that fails.
Args:
cert_path: Path to save the certificate file
key_path: Path to save the key file
Returns:
bool: True if successful, False otherwise
"""
# Try using cryptography library first (pure Python solution)
if generate_self_signed_cert_with_cryptography(cert_path, key_path):
return True
# Fallback to OpenSSL command line
try:
# Check if OpenSSL is available
import subprocess
# Ensure the directory exists
os.makedirs(os.path.dirname(cert_path), exist_ok=True)
# Generate self-signed certificate valid for 1 year
cmd = [
'openssl', 'req', '-new', '-x509', '-days', '365', '-nodes',
'-out', cert_path,
'-keyout', key_path,
'-subj', '/CN=localhost'
]
subprocess.run(cmd, check=True)
logger.info(f"Generated self-signed certificate using OpenSSL: {cert_path}")
return True
except Exception as e:
logger.error(f"Failed to generate self-signed certificate: {e}")
return False
def start_auth_server(port: int = 3000, callback: Callable[[str], None] = None) -> threading.Thread:
"""
Start the HTTPS auth server in a separate thread
Args:
port: The port to listen on
callback: Function to call with the auth code when received
Returns:
Thread object for the server
"""
# Reset the auth code
AuthHandler.auth_code = None
# Set the callback
AuthHandler.auth_callback = callback
# Paths for the SSL certificate and key
data_dir = Path("data/ssl")
data_dir.mkdir(parents=True, exist_ok=True)
cert_path = str(data_dir / "localhost.crt")
key_path = str(data_dir / "localhost.key")
# Generate a self-signed certificate if it doesn't exist
if not (os.path.exists(cert_path) and os.path.exists(key_path)):
if not generate_self_signed_cert(cert_path, key_path):
logger.error("Failed to create SSL certificate for HTTPS server")
return None
# Create HTTPS server
server = socketserver.TCPServer(("", port), AuthHandler)
# Wrap the socket with SSL
server.socket = ssl.wrap_socket(
server.socket,
keyfile=key_path,
certfile=cert_path,
server_side=True
)
logger.info(f"Starting HTTPS auth server on port {port}")
logger.info(f"NOTE: Your browser will show a security warning about the self-signed certificate.")
logger.info(f"This is normal for local development. Please proceed to the site anyway.")
# Run server in a separate thread
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
return server_thread
def stop_auth_server(server_thread: threading.Thread) -> None:
"""
Stop the auth server
Args:
server_thread: The server thread to stop
"""
if server_thread and server_thread.is_alive():
# The thread is daemon, so it will be terminated when the main program exits
logger.info("Auth server will terminate when program exits")
def open_browser_with_fallback(url: str) -> bool:
"""
Open a browser with fallback methods if the standard approach fails
Args:
url: The URL to open
Returns:
bool: True if a browser was successfully opened, False otherwise
"""
try:
# Try the standard webbrowser module first
logger.info(f"Opening browser: {url}")
if webbrowser.open(url):
return True
# If that fails, try platform-specific methods
system = platform.system().lower()
if system == 'windows':
try:
os.startfile(url)
return True
except:
subprocess.Popen(['start', url], shell=True)
return True
elif system == 'darwin': # macOS
try:
subprocess.Popen(['open', url])
return True
except:
pass
elif system == 'linux':
try:
subprocess.Popen(['xdg-open', url])
return True
except:
try:
subprocess.Popen(['gnome-open', url])
return True
except:
try:
subprocess.Popen(['kde-open', url])
return True
except:
pass
logger.warning("Could not open browser automatically")
return False
except Exception as e:
logger.warning(f"Error opening browser: {e}")
return False
def start_oauth_flow(client_id: str, redirect_uri: str = "https://localhost:3000",
scopes: str = "chat:read chat:edit",
port: int = 3000) -> Optional[str]:
"""
Start the OAuth flow by opening the browser and waiting for the redirect
Args:
client_id: Twitch client ID
redirect_uri: The redirect URI (should use HTTPS)
scopes: Space-separated list of scopes to request
port: The port to listen on
Returns:
The authorization code or None if failed
"""
auth_code = [None] # Using list to allow modification in callback
auth_event = threading.Event()
def auth_callback(code: str):
auth_code[0] = code
auth_event.set()
# Start the auth server
server_thread = start_auth_server(port, auth_callback)
if not server_thread:
logger.error("Failed to start HTTPS server")
return None
# Construct the auth URL
auth_url = (
f"https://id.twitch.tv/oauth2/authorize"
f"?client_id={client_id}"
f"&redirect_uri={redirect_uri}"
f"&response_type=code"
f"&scope={scopes.replace(' ', '+')}"
)
# Try to open the browser with fallback methods
browser_opened = open_browser_with_fallback(auth_url)
if not browser_opened:
# If browser opening fails, provide the URL for manual navigation
print("\n⚠️ Could not open browser automatically ⚠️")
print("Please copy and paste the following URL into your browser:")
print(f"\n{auth_url}\n")
print("After authenticating, you'll be redirected to the local app.")
# Wait for the callback (with timeout)
auth_event.wait(timeout=300) # 5 minutes timeout
if not auth_code[0]:
logger.error("Authentication timed out or failed")
return auth_code[0]

237
src/core/twitch.py Normal file
View file

@ -0,0 +1,237 @@
import socket
import re
import logging
import threading
import requests
import time
import json
import os
from typing import Dict, Callable, List, Optional, Any, Tuple, Union, Match, Set
# Import the queue functionality
from src.queue.server import enqueue_command, enqueue_message, get_queue_stats
from src.core.auth import TwitchAuth
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Type alias for command callback function
CommandCallback = Callable[[str, List[str], 'TwitchBot'], None]
# Type alias for message data dictionary
MessageData = Dict[str, str]
class TwitchBot:
def __init__(self, username: str, client_id: str = None, client_secret: str = None,
access_token: str = None, refresh_token: str = None,
channel: str = None, use_queue: bool = False,
token_cache_file: str = "data/cache/token_cache.json",
admin_users: List[str] = None) -> None:
self.username: str = username.lower()
self.client_id: Optional[str] = client_id or os.environ.get("TWITCH_CLIENT_ID")
self.client_secret: Optional[str] = client_secret or os.environ.get("TWITCH_CLIENT_SECRET")
self.channel: str = channel.lower() if channel else ""
self.server: str = 'irc.chat.twitch.tv'
self.port: int = 6667
self.socket: socket.socket = socket.socket()
self.commands: Dict[str, CommandCallback] = {}
self.running: bool = False
self.last_command: str = ""
# Admin users who have the same privileges as the channel owner
self.admin_users: List[str] = [self.channel] # Channel owner is always an admin
if admin_users:
self.admin_users.extend([user.lower() for user in admin_users])
# Use access_token from args or environment
direct_access_token = access_token or os.environ.get("TWITCH_ACCESS_TOKEN")
direct_refresh_token = refresh_token or os.environ.get("TWITCH_REFRESH_TOKEN")
# Auth token handling
self.auth = TwitchAuth(self.client_id, self.client_secret, token_cache_file)
self.token_cache_file: str = token_cache_file
# If tokens provided directly, update the auth handler
if direct_access_token:
logger.info("Using provided access token")
self.auth.update_manually(
access_token=direct_access_token,
refresh_token=direct_refresh_token
)
# Queue integration
self.use_queue: bool = use_queue
self.oauth_token: Optional[str] = None
def is_admin(self, username: str) -> bool:
"""Check if a user has admin privileges"""
return username.lower() in self.admin_users
@property
def access_token(self) -> Optional[str]:
"""Get the current access token from the auth handler"""
return self.auth.access_token
@property
def token_expiry(self) -> Optional[float]:
"""Get the token expiry from the auth handler"""
return self.auth.token_expiry
def get_oauth_token(self) -> str:
"""Get an OAuth token for authenticating with Twitch"""
return self.auth.get_oauth_token()
def connect(self) -> bool:
"""Connect to Twitch IRC server"""
try:
# Get the OAuth token
oauth_token = self.get_oauth_token()
self.oauth_token = oauth_token # Store for queue use
self.socket.connect((self.server, self.port))
# Add a timeout so we can process keyboard interrupts
self.socket.settimeout(1.0)
self.socket.send(f"PASS {oauth_token}\n".encode('utf-8'))
self.socket.send(f"NICK {self.username}\n".encode('utf-8'))
self.socket.send(f"JOIN #{self.channel}\n".encode('utf-8'))
# Wait for confirmation
data = self.socket.recv(2048).decode('utf-8')
if "Welcome" in data or "You are in a maze of twisty passages" in data:
logger.info(f"Connected to {self.channel}'s chat")
return True
else:
logger.error(f"Failed to connect: {data}")
return False
except Exception as e:
logger.error(f"Failed to connect: {e}")
return False
def register_command(self, command: str, callback: CommandCallback) -> None:
"""Register a command with a callback function"""
self.commands[command.lower()] = callback
logger.info(f"Registered command: {command}")
def send_message(self, message: str) -> None:
"""Send a message to the Twitch chat"""
self.socket.send(f"PRIVMSG #{self.channel} :{message}\n".encode('utf-8'))
def parse_message(self, message: str) -> Optional[MessageData]:
"""Parse the IRC message and extract relevant information"""
# This regex parses Twitch IRC messages
pattern: str = r':(?P<username>[\w]+)!(?P=username)@(?P=username)\.tmi\.twitch\.tv PRIVMSG #(?P<channel>[\w]+) :(?P<content>.+)'
match: Optional[Match[str]] = re.match(pattern, message)
if match:
return {
'username': match.group('username'),
'channel': match.group('channel'),
'content': match.group('content')
}
return None
def process_commands(self, message_data: MessageData) -> None:
"""Process commands from chat messages"""
if not message_data:
return
content: str = message_data['content'].strip()
username: str = message_data['username']
# Check if we should use the queue
if self.use_queue:
# If this is a regular message, add it to the message queue
if not content.startswith('!'):
enqueue_message(username, content)
return
# Check if message is a command (starts with !)
if content.startswith('!'):
parts: List[str] = content.split()
command: str = parts[0][1:].lower() # Remove ! and convert to lowercase
args: List[str] = parts[1:] if len(parts) > 1 else []
# Store the last executed command
self.last_command = command
# Check if we should use the queue
if self.use_queue:
# Add the command to the queue and return
# We need to create a simple command registry to pass to the queue
command_registry = {cmd: cmd for cmd in self.commands.keys()}
# Make sure we have an OAuth token
if not self.oauth_token:
self.oauth_token = self.get_oauth_token()
enqueue_command(
username,
command,
args,
channel=self.channel,
oauth_token=self.oauth_token,
bot_username=self.username,
command_registry=command_registry
)
return
# Direct execution (no queue)
if command in self.commands:
try:
self.commands[command](username, args, self)
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
def start(self, use_queue: bool = False) -> None:
"""Start the bot and listen for messages"""
# Update queue setting
self.use_queue = use_queue
logger.info(f"Starting bot with queue {'enabled' if use_queue else 'disabled'}")
if not self.connect():
logger.error("Failed to start bot: Connection error")
return
self.running = True
buffer: str = ""
while self.running:
try:
new_data: str = self.socket.recv(2048).decode('utf-8')
buffer += new_data
lines: List[str] = buffer.split("\r\n")
buffer = lines.pop()
for line in lines:
logger.debug(f"Received: {line}")
# Respond to PING to keep the connection alive
if line.startswith("PING"):
self.socket.send("PONG\r\n".encode('utf-8'))
logger.debug("Sent PONG")
continue
message_data: Optional[MessageData] = self.parse_message(line)
if message_data:
self.process_commands(message_data)
except socket.timeout:
# This is expected with our timeout setting - just continue the loop
continue
except KeyboardInterrupt:
logger.info("Keyboard interrupt received, shutting down...")
self.running = False
except Exception as e:
logger.error(f"Error in bot loop: {e}")
self.running = False
self.socket.close()
logger.info("Bot stopped")
def stop(self) -> None:
"""Stop the bot"""
self.running = False
def get_queue_stats(self) -> Dict[str, Any]:
"""Get current queue statistics"""
return get_queue_stats()

0
src/features/__init__.py Normal file
View file

270
src/features/examples.py Normal file
View file

@ -0,0 +1,270 @@
import time
import threading
import subprocess
import json
import os
from typing import Dict, List, Any, Optional, Set, Tuple, Union, cast
from src.core.twitch import TwitchBot
class AdvancedExamples:
"""
Examples of more advanced Twitch bot functionality:
- Voting system
- Timed messages
- External application control
- User points system
"""
def __init__(self, bot: TwitchBot) -> None:
self.bot: TwitchBot = bot
self.votes: Dict[str, int] = {}
self.current_poll: Optional[Dict[str, Any]] = None
self.poll_active: bool = False
self.timer_threads: Dict[str, threading.Timer] = {}
self.user_points: Dict[str, int] = self.load_points()
# Register commands
self.bot.register_command("poll", self.cmd_start_poll)
self.bot.register_command("vote", self.cmd_vote)
self.bot.register_command("endpoll", self.cmd_end_poll)
self.bot.register_command("timer", self.cmd_timer)
self.bot.register_command("points", self.cmd_points)
self.bot.register_command("give", self.cmd_give_points)
# For external application control (example)
if os.name == 'nt': # Windows
self.bot.register_command("sound", self.cmd_play_sound)
def load_points(self) -> Dict[str, int]:
"""Load user points from a JSON file"""
try:
with open('user_points.json', 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_points(self) -> None:
"""Save user points to a JSON file"""
with open('user_points.json', 'w') as f:
json.dump(self.user_points, f)
# ---- Voting System ----
def cmd_start_poll(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Start a new poll with options"""
if not args or len(args) < 3:
bot.send_message(f"@{username}, usage: !poll \"Question\" \"Option1\" \"Option2\" [\"Option3\"...]")
return
if self.poll_active:
bot.send_message(f"@{username}, a poll is already active. End it with !endpoll first.")
return
# Reset votes
self.votes = {}
# Extract question and options
question: str = args[0]
options: List[str] = args[1:]
self.current_poll = {
"question": question,
"options": options,
"started_by": username,
"timestamp": time.time()
}
self.poll_active = True
# Announce poll
bot.send_message(f"POLL STARTED: {question}")
for i, option in enumerate(options):
bot.send_message(f"Option {i+1}: {option}")
bot.send_message("Vote using !vote <number>")
def cmd_vote(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Cast a vote in the active poll"""
if not self.poll_active or not self.current_poll:
bot.send_message(f"@{username}, there is no active poll.")
return
if not args:
bot.send_message(f"@{username}, please specify an option number: !vote <number>")
return
try:
vote: int = int(args[0])
if vote < 1 or vote > len(self.current_poll["options"]):
raise ValueError
except ValueError:
bot.send_message(f"@{username}, please provide a valid option number between 1 and {len(self.current_poll['options'])}")
return
# Record the vote
self.votes[username] = vote
bot.send_message(f"@{username} voted for option {vote}: {self.current_poll['options'][vote-1]}")
def cmd_end_poll(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""End the active poll and show results"""
if not self.poll_active or not self.current_poll:
bot.send_message(f"@{username}, there is no active poll.")
return
if username != self.current_poll["started_by"] and not bot.is_admin(username):
bot.send_message(f"@{username}, only {self.current_poll['started_by']} or an admin can end this poll.")
return
# Count votes
results: Dict[int, int] = {}
for i in range(1, len(self.current_poll["options"]) + 1):
results[i] = 0
for vote in self.votes.values():
results[vote] += 1
# Find winner(s)
max_votes: int = max(results.values()) if results else 0
winners: List[int] = [opt for opt, count in results.items() if count == max_votes]
# Announce results
bot.send_message(f"POLL ENDED: {self.current_poll['question']}")
for i, option in enumerate(self.current_poll["options"], 1):
bot.send_message(f"Option {i}: {option} - {results.get(i, 0)} votes")
if max_votes > 0:
if len(winners) == 1:
winner_option: str = self.current_poll["options"][winners[0]-1]
bot.send_message(f"The winner is: {winner_option} with {max_votes} votes!")
else:
tied_options: List[str] = [self.current_poll["options"][w-1] for w in winners]
bot.send_message(f"It's a tie between: {', '.join(tied_options)} with {max_votes} votes each!")
else:
bot.send_message("No votes were cast.")
# Reset poll
self.poll_active = False
self.current_poll = None
self.votes = {}
# ---- Timer System ----
def timer_callback(self, timer_id: str, message: str) -> None:
"""Callback function for timers"""
self.bot.send_message(message)
del self.timer_threads[timer_id]
def cmd_timer(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Set a timer to send a message after X seconds"""
if len(args) < 2:
bot.send_message(f"@{username}, usage: !timer <seconds> <message>")
return
try:
seconds: int = int(args[0])
if seconds <= 0 or seconds > 3600: # Limit to 1 hour
raise ValueError
except ValueError:
bot.send_message(f"@{username}, please provide a valid time between 1-3600 seconds.")
return
message: str = f"⏰ TIMER ({username}): {' '.join(args[1:])}"
bot.send_message(f"@{username}, timer set for {seconds} seconds!")
# Create a unique ID for this timer
timer_id: str = f"{username}_{int(time.time())}"
# Create and start the timer thread
timer_thread: threading.Timer = threading.Timer(seconds, self.timer_callback, args=[timer_id, message])
timer_thread.daemon = True
timer_thread.start()
# Store the thread
self.timer_threads[timer_id] = timer_thread
# ---- Points System ----
def cmd_points(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Check user points"""
target_user: str = username
if args and args[0].startswith('@'):
target_user = args[0][1:].lower()
points: int = self.user_points.get(target_user, 0)
bot.send_message(f"@{username}, {target_user} has {points} points.")
def cmd_give_points(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Give points to a user (admin only)"""
if not bot.is_admin(username):
bot.send_message(f"@{username}, only the channel owner or admins can give points.")
return
if len(args) < 2:
bot.send_message(f"@{username}, usage: !give <user> <points>")
return
target_user: str = args[0]
if target_user.startswith('@'):
target_user = target_user[1:]
target_user = target_user.lower()
try:
points: int = int(args[1])
if points <= 0:
raise ValueError
except ValueError:
bot.send_message(f"@{username}, please provide a valid positive number of points.")
return
# Add points to user
self.user_points[target_user] = self.user_points.get(target_user, 0) + points
self.save_points()
bot.send_message(f"@{target_user}, you received {points} points from {username}! You now have {self.user_points[target_user]} points.")
# ---- External Control ----
def cmd_play_sound(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Play a sound effect (Windows only)"""
if not args:
bot.send_message(f"@{username}, usage: !sound <sound_name>")
return
sound_name: str = args[0].lower()
sound_files: Dict[str, str] = {
"applause": "applause.wav",
"drumroll": "drumroll.wav",
"fail": "fail.wav",
"victory": "victory.wav"
}
if sound_name not in sound_files:
bot.send_message(f"@{username}, available sounds: {', '.join(sound_files.keys())}")
return
sound_path: str = os.path.join("sounds", sound_files[sound_name])
if not os.path.exists(sound_path):
bot.send_message(f"@{username}, sound file not found: {sound_path}")
return
# Play the sound using Windows PowerShell
try:
ps_command: str = f'powershell -c (New-Object Media.SoundPlayer "{sound_path}").PlaySync();'
subprocess.Popen(ps_command, shell=True)
bot.send_message(f"@{username} played the {sound_name} sound!")
except Exception as e:
bot.send_message(f"Error playing sound: {e}")
# Example usage in main.py:
"""
from examples import AdvancedExamples
def main():
# ... set up bot ...
# Initialize advanced examples
advanced = AdvancedExamples(bot)
# ... start bot ...
"""

0
src/game/__init__.py Normal file
View file

483
src/game/controller.py Normal file
View file

@ -0,0 +1,483 @@
import time
import threading
import random
from collections import defaultdict, Counter
import json
from typing import Dict, Callable, List, Optional, Any, DefaultDict, Union, Literal, Set, Tuple, TypeVar, cast
from src.core.twitch import TwitchBot
try:
from pynput.keyboard import Key, Controller as KeyboardController
from pynput.mouse import Button, Controller as MouseController
PYNPUT_AVAILABLE = True
except ImportError:
PYNPUT_AVAILABLE = False
print("pynput library not found. Install it with: pip install pynput")
print("Without pynput, keyboard/mouse control simulation will be disabled.")
# Try to import the VirtualController
try:
from src.game.controller import VirtualController, VGAMEPAD_AVAILABLE
except ImportError:
VGAMEPAD_AVAILABLE = False
print("controller_support.py not found or error importing it.")
print("Virtual controller support will be disabled.")
# Type for command functions
CommandFunc = Callable[[], None]
# Type for stats
StatsDict = Dict[str, int]
# Type for user stats dictionary
UserStatsDict = Dict[str, StatsDict]
class GameController:
"""
Class for controlling games via Twitch chat commands
"""
def __init__(self, bot: TwitchBot, mode: Literal["direct", "vote"] = "direct",
cooldown: float = 2.0, input_type: Literal["keyboard", "controller"] = "keyboard") -> None:
self.bot: TwitchBot = bot
self.mode: Literal["direct", "vote"] = mode # "direct" or "vote"
self.cooldown: float = cooldown
self.input_type: Literal["keyboard", "controller"] = input_type # "keyboard" or "controller"
self.last_command_time: Dict[str, float] = {}
self.active_vote: bool = False
self.votes: Counter = Counter()
self.vote_timer: Optional[threading.Timer] = None
self.commands: Dict[str, CommandFunc] = {}
# Streamer override mode
self.streamer_override: bool = False
# Initialize controllers
self.keyboard: Optional[KeyboardController] = None
self.mouse: Optional[MouseController] = None
if PYNPUT_AVAILABLE:
self.keyboard = KeyboardController()
self.mouse = MouseController()
# Initialize virtual controller if requested and available
self.controller: Optional[VirtualController] = None
if input_type == "controller" and 'VirtualController' in globals():
self.controller = VirtualController()
if not self.controller.is_available():
print("Virtual controller initialization failed, falling back to keyboard controls")
self.input_type = "keyboard"
else:
print("Using virtual Xbox controller for game inputs")
# Command mappings - customize these for your game
self.init_commands()
# Register commands with the bot
self.register_commands()
# Load user stats
self.user_stats: DefaultDict[str, Dict[str, int]] = self.load_stats()
def init_commands(self) -> None:
"""Initialize command mappings based on input type"""
if self.input_type == "controller" and self.controller and self.controller.is_available():
# Controller-based commands
self.commands = {
# Movement with left stick
"up": self.controller.move_left_stick_up,
"down": self.controller.move_left_stick_down,
"left": self.controller.move_left_stick_left,
"right": self.controller.move_left_stick_right,
# Camera with right stick
"look_up": self.controller.move_right_stick_up,
"look_down": self.controller.move_right_stick_down,
"look_left": self.controller.move_right_stick_left,
"look_right": self.controller.move_right_stick_right,
# Action buttons
"jump": self.controller.press_a,
"action": self.controller.press_b,
"interact": self.controller.press_x,
"menu": self.controller.press_y,
# Shoulder buttons and triggers
"block": self.controller.press_left_shoulder,
"attack": self.controller.press_right_shoulder,
"aim": self.controller.press_left_trigger,
"shoot": self.controller.press_right_trigger,
# D-pad for items/quick slots
"item1": self.controller.press_dpad_up,
"item2": self.controller.press_dpad_right,
"item3": self.controller.press_dpad_down,
"item4": self.controller.press_dpad_left,
# Menu buttons
"start": self.controller.press_start,
"select": self.controller.press_back
}
else:
# Keyboard and mouse based commands
self.commands = {
"up": self.press_key_w,
"down": self.press_key_s,
"left": self.press_key_a,
"right": self.press_key_d,
"jump": self.press_key_space,
"attack": self.press_mouse_left,
"interact": self.press_key_e,
"inventory": self.press_key_i,
"skill1": self.press_key_1,
"skill2": self.press_key_2,
"skill3": self.press_key_3,
"ultimate": self.press_key_r
}
def register_commands(self) -> None:
"""Register game control commands with the Twitch bot"""
if self.mode == "direct":
# Direct mode: Each command executes immediately
for cmd_name in self.commands:
self.bot.register_command(cmd_name, self.handle_direct_command)
else:
# Vote mode: Commands are collected for voting
self.bot.register_command("vote", self.handle_vote)
self.bot.register_command("startvote", self.start_vote_session)
self.bot.register_command("endvote", self.end_vote_session)
# Register stats command
self.bot.register_command("gamestats", self.show_stats)
# Register help command to list available game controls
self.bot.register_command("gamehelp", self.show_game_help)
# Register streamer override commands
self.bot.register_command("takeover", self.enable_streamer_override)
self.bot.register_command("givecontrol", self.disable_streamer_override)
def enable_streamer_override(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Enable streamer override mode - only channel owner can control the game"""
if not bot.is_admin(username):
bot.send_message(f"@{username}, only the channel owner or admins can enable streamer override.")
return
if self.streamer_override:
bot.send_message(f"@{username}, you already have exclusive control.")
return
self.streamer_override = True
# If there's an active vote, end it
if self.active_vote and self.vote_timer:
self.vote_timer.cancel()
self.active_vote = False
self.votes.clear()
# If using a controller and controller redirection is available, start it
if (self.input_type == "controller" and self.controller and
hasattr(self.controller, 'is_physical_controller_available') and
self.controller.is_physical_controller_available()):
redirect_success = self.controller.start_controller_redirection()
if redirect_success:
bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Your physical controller is now controlling the game!")
else:
bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Only the streamer can control the game now!")
else:
bot.send_message("⚠️ STREAMER OVERRIDE ENABLED: Only the streamer can control the game now!")
def disable_streamer_override(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Disable streamer override mode - allow chat to control the game again"""
if not bot.is_admin(username):
bot.send_message(f"@{username}, only the channel owner or admins can disable streamer override.")
return
if not self.streamer_override:
bot.send_message(f"@{username}, streamer override is not active.")
return
# Stop controller redirection if it's active
if (self.input_type == "controller" and self.controller and
hasattr(self.controller, 'stop_controller_redirection')):
self.controller.stop_controller_redirection()
self.streamer_override = False
bot.send_message("🎮 CHAT CONTROL RESTORED: Chat can now control the game again!")
def show_game_help(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Show available game control commands"""
bot.send_message(f"@{username}, available game controls: {', '.join(['!' + cmd for cmd in self.commands.keys()])}")
if self.mode == "vote":
bot.send_message(f"Vote using !vote [command] during active voting sessions")
if bot.is_admin(username):
bot.send_message(f"Admin commands: !takeover (take exclusive control), !givecontrol (return control to chat)")
def load_stats(self) -> DefaultDict[str, Dict[str, int]]:
"""Load user stats from a JSON file"""
try:
with open('game_stats.json', 'r') as f:
stats_dict = json.load(f)
# Convert the loaded dictionary to a defaultdict
return defaultdict(lambda: {"commands": 0, "votes": 0}, stats_dict)
except (FileNotFoundError, json.JSONDecodeError):
return defaultdict(lambda: {"commands": 0, "votes": 0})
def save_stats(self) -> None:
"""Save user stats to a JSON file"""
# Convert defaultdict to regular dict for JSON serialization
stats_dict: Dict[str, Dict[str, int]] = {k: dict(v) for k, v in self.user_stats.items()}
with open('game_stats.json', 'w') as f:
json.dump(stats_dict, f)
def show_stats(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Show game control stats for a user"""
target_user: str = username
if args and args[0].startswith('@'):
target_user = args[0][1:].lower()
stats: Dict[str, int] = self.user_stats.get(target_user, {"commands": 0, "votes": 0})
bot.send_message(f"@{username}, {target_user} has issued {stats['commands']} direct commands and {stats['votes']} votes.")
# Command handling methods
def handle_direct_command(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Handle direct command execution with cooldown"""
command: str = bot.last_command
# Check for streamer override - only allow channel owner/admins
if self.streamer_override and not bot.is_admin(username):
# Silently ignore, don't message to prevent spam
return
# Check cooldown (streamer/admins bypass the cooldown)
current_time: float = time.time()
if not bot.is_admin(username) and username in self.last_command_time:
time_diff: float = current_time - self.last_command_time[username]
if time_diff < self.cooldown:
return # Silently ignore, don't message (prevents spam)
# Execute the command if it exists
if command in self.commands:
# Update stats
if username not in self.user_stats:
self.user_stats[username] = {"commands": 0, "votes": 0}
self.user_stats[username]["commands"] += 1
# Set cooldown (streamer/admins don't get cooldown)
if not bot.is_admin(username):
self.last_command_time[username] = current_time
# Execute the command
self.commands[command]()
# Confirm command execution
bot.send_message(f"@{username} used {command}!")
# Save stats periodically
if random.random() < 0.1: # 10% chance to save, to avoid too frequent saves
self.save_stats()
# Voting system methods
def start_vote_session(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Start a voting session for game control"""
if not bot.is_admin(username): # Only allow admins
bot.send_message(f"@{username}, only the channel owner or admins can start vote sessions.")
return
# Cannot start vote in streamer override mode
if self.streamer_override:
bot.send_message(f"@{username}, voting is disabled in streamer override mode. Use !givecontrol first.")
return
if self.active_vote:
bot.send_message(f"@{username}, a vote is already active. End it with !endvote first.")
return
# Get vote duration
duration: int = 30 # default: 30 seconds
if args and args[0].isdigit():
duration = min(max(5, int(args[0])), 120) # limit between 5-120 seconds
# Start vote
self.active_vote = True
self.votes = Counter()
# Announce available commands
bot.send_message(f"GAME CONTROL VOTE STARTED! Vote with !vote command (e.g., !vote up)")
bot.send_message(f"Available commands: {', '.join(self.commands.keys())}")
bot.send_message(f"Voting ends in {duration} seconds!")
# Start timer to end vote
self.vote_timer = threading.Timer(duration, self.execute_winning_vote)
self.vote_timer.daemon = True
self.vote_timer.start()
def handle_vote(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Handle a vote for a game control command"""
# Check for streamer override - don't allow voting
if self.streamer_override:
# Silently ignore in streamer override mode
return
if not self.active_vote:
bot.send_message(f"@{username}, there is no active vote. Ask the streamer to start one with !startvote")
return
if not args:
bot.send_message(f"@{username}, please specify a command to vote for")
return
vote: str = args[0].lower()
if vote not in self.commands:
bot.send_message(f"@{username}, invalid command. Available: {', '.join(self.commands.keys())}")
return
# Record vote
self.votes[vote] += 1
# Update stats
if username not in self.user_stats:
self.user_stats[username] = {"commands": 0, "votes": 0}
self.user_stats[username]["votes"] += 1
# Save stats periodically
if random.random() < 0.05: # 5% chance to save
self.save_stats()
def execute_winning_vote(self) -> None:
"""Execute the winning vote command"""
if not self.active_vote or not self.votes:
self.bot.send_message("Vote ended with no valid votes.")
self.active_vote = False
return
# Don't execute if streamer override was enabled during the vote
if self.streamer_override:
self.bot.send_message("Vote cancelled due to streamer override.")
self.active_vote = False
return
# Find the command with the most votes
winner: str = self.votes.most_common(1)[0][0]
vote_count: int = self.votes[winner]
# Execute the winning command
self.commands[winner]()
# Announce the result
self.bot.send_message(f"Vote ended! Executing {winner} with {vote_count} votes!")
# Reset vote
self.active_vote = False
def end_vote_session(self, username: str, args: List[str], bot: TwitchBot) -> None:
"""Manually end the current vote session"""
if not bot.is_admin(username): # Only allow admins
bot.send_message(f"@{username}, only the channel owner or admins can end vote sessions.")
return
if not self.active_vote:
bot.send_message(f"@{username}, there is no active vote.")
return
# Cancel timer if it's running
if self.vote_timer and self.vote_timer.is_alive():
self.vote_timer.cancel()
# Execute winning vote
self.execute_winning_vote()
# Keyboard and mouse simulation methods
def press_key(self, key: Union[str, Key], hold_time: float = 0.1) -> None:
"""Press and release a keyboard key"""
if not PYNPUT_AVAILABLE or not self.keyboard:
print(f"Would press key: {key}")
return
try:
self.keyboard.press(key)
time.sleep(hold_time)
self.keyboard.release(key)
except Exception as e:
print(f"Error pressing key {key}: {e}")
def press_mouse_button(self, button: Button, hold_time: float = 0.1) -> None:
"""Press and release a mouse button"""
if not PYNPUT_AVAILABLE or not self.mouse:
print(f"Would press mouse button: {button}")
return
try:
self.mouse.press(button)
time.sleep(hold_time)
self.mouse.release(button)
except Exception as e:
print(f"Error pressing mouse button {button}: {e}")
# Specific key methods mapped to game controls
def press_key_w(self) -> None:
"""Press W key (move up/forward)"""
self.press_key('w')
def press_key_s(self) -> None:
"""Press S key (move down/backward)"""
self.press_key('s')
def press_key_a(self) -> None:
"""Press A key (move left)"""
self.press_key('a')
def press_key_d(self) -> None:
"""Press D key (move right)"""
self.press_key('d')
def press_key_space(self) -> None:
"""Press Spacebar (jump)"""
self.press_key(Key.space)
def press_key_e(self) -> None:
"""Press E key (interact)"""
self.press_key('e')
def press_key_i(self) -> None:
"""Press I key (inventory)"""
self.press_key('i')
def press_key_1(self) -> None:
"""Press 1 key (skill 1)"""
self.press_key('1')
def press_key_2(self) -> None:
"""Press 2 key (skill 2)"""
self.press_key('2')
def press_key_3(self) -> None:
"""Press 3 key (skill 3)"""
self.press_key('3')
def press_key_r(self) -> None:
"""Press R key (ultimate ability)"""
self.press_key('r')
def press_mouse_left(self) -> None:
"""Press left mouse button (attack)"""
self.press_mouse_button(Button.left)
# Example usage:
"""
from game_control import GameController
def main():
# ... set up the bot ...
# Create a game controller in direct mode with keyboard input
game_controller = GameController(bot, mode="direct", cooldown=2.0, input_type="keyboard")
# OR create a game controller in vote mode with controller input
# game_controller = GameController(bot, mode="vote", input_type="controller")
# ... start the bot ...
"""

View file

421
src/game/input/gamepad.py Normal file
View file

@ -0,0 +1,421 @@
import time
import threading
from typing import Optional, Union, Any, Literal, Dict, Tuple, List, Set
import ctypes
# Try to import vgamepad for Xbox controller emulation
try:
import vgamepad as vg
VGAMEPAD_AVAILABLE = True
except ImportError:
VGAMEPAD_AVAILABLE = False
print("vgamepad library not found. Install it with: pip install vgamepad")
print("Virtual controller functionality will be disabled.")
# Try to import pysdl2 for physical controller detection
try:
import sdl2
import sdl2.ext
PYSDL2_AVAILABLE = True
except ImportError:
PYSDL2_AVAILABLE = False
print("pysdl2 library not found. Install it with: pip install pysdl2 pysdl2-dll")
print("Physical controller redirection will be disabled.")
# Define SDL2 constants if needed
SDL_CONTROLLER_BUTTON_MAX = 15
SDL_CONTROLLER_BUTTON_A = 0
SDL_CONTROLLER_BUTTON_B = 1
SDL_CONTROLLER_BUTTON_X = 2
SDL_CONTROLLER_BUTTON_Y = 3
SDL_CONTROLLER_BUTTON_BACK = 4
SDL_CONTROLLER_BUTTON_GUIDE = 5
SDL_CONTROLLER_BUTTON_START = 6
SDL_CONTROLLER_BUTTON_LEFTSTICK = 7
SDL_CONTROLLER_BUTTON_RIGHTSTICK = 8
SDL_CONTROLLER_BUTTON_LEFTSHOULDER = 9
SDL_CONTROLLER_BUTTON_RIGHTSHOULDER = 10
SDL_CONTROLLER_BUTTON_DPAD_UP = 11
SDL_CONTROLLER_BUTTON_DPAD_DOWN = 12
SDL_CONTROLLER_BUTTON_DPAD_LEFT = 13
SDL_CONTROLLER_BUTTON_DPAD_RIGHT = 14
SDL_CONTROLLER_AXIS_LEFTX = 0
SDL_CONTROLLER_AXIS_LEFTY = 1
SDL_CONTROLLER_AXIS_RIGHTX = 2
SDL_CONTROLLER_AXIS_RIGHTY = 3
SDL_CONTROLLER_AXIS_TRIGGERLEFT = 4
SDL_CONTROLLER_AXIS_TRIGGERRIGHT = 5
class VirtualController:
"""
Class for emulating an Xbox controller using vgamepad.
Can be used to extend the GameController class for controller-based games.
"""
def __init__(self) -> None:
self.available: bool = VGAMEPAD_AVAILABLE
self.gamepad: Optional[Any] = None
# Physical controller redirection
self.physical_controller_available: bool = False
self.redirection_active: bool = False
self.redirection_thread: Optional[threading.Thread] = None
self.stop_redirection: bool = False
self.physical_controller: Optional[Any] = None
if self.available:
try:
# Create a virtual Xbox 360 controller
self.gamepad = vg.VX360Gamepad()
print("Virtual Xbox 360 controller initialized successfully")
except Exception as e:
print(f"Failed to initialize virtual gamepad: {e}")
self.available = False
# Initialize SDL2 for physical controller if available
if PYSDL2_AVAILABLE:
try:
# Initialize SDL2
sdl2.SDL_Init(sdl2.SDL_INIT_JOYSTICK | sdl2.SDL_INIT_GAMECONTROLLER)
# Get number of joysticks/controllers
num_joysticks = sdl2.SDL_NumJoysticks()
if num_joysticks > 0:
self.physical_controller_available = True
print(f"Detected {num_joysticks} physical controller(s)")
print("Physical controller redirection is available")
else:
print("No physical controllers detected")
except Exception as e:
print(f"Failed to initialize SDL2 for controller detection: {e}")
def is_available(self) -> bool:
"""Check if the virtual controller is available"""
return self.available
def is_physical_controller_available(self) -> bool:
"""Check if a physical controller is available for redirection"""
return self.physical_controller_available
# === Basic Controller Actions ===
def press_button(self, button: Any, hold_time: float = 0.1) -> None:
"""Press and release a button on the controller"""
if not self.available:
print(f"Would press button: {button}")
return
try:
# Press button
self.gamepad.press_button(button)
self.gamepad.update()
# Hold for specified time
time.sleep(hold_time)
# Release button
self.gamepad.release_button(button)
self.gamepad.update()
except Exception as e:
print(f"Error pressing button {button}: {e}")
def press_trigger(self, trigger: Literal["LEFT", "RIGHT"], value: float = 1.0, hold_time: float = 0.1) -> None:
"""Press a trigger with a specific value (0.0-1.0)"""
if not self.available:
print(f"Would press trigger {trigger} with value {value}")
return
try:
if trigger == "LEFT":
self.gamepad.left_trigger(value=int(value * 255))
else: # "RIGHT"
self.gamepad.right_trigger(value=int(value * 255))
self.gamepad.update()
time.sleep(hold_time)
# Reset trigger
if trigger == "LEFT":
self.gamepad.left_trigger(value=0)
else:
self.gamepad.right_trigger(value=0)
self.gamepad.update()
except Exception as e:
print(f"Error with trigger {trigger}: {e}")
def move_joystick(self, stick: Literal["LEFT", "RIGHT"], x_value: float, y_value: float, hold_time: float = 0.1) -> None:
"""Move a joystick to a position"""
if not self.available:
print(f"Would move {stick} stick to x:{x_value}, y:{y_value}")
return
try:
# Convert from -1.0 to 1.0 range to the expected integer range
x: int = int(x_value * 32767)
y: int = int(y_value * 32767)
if stick == "LEFT":
self.gamepad.left_joystick(x_value=x, y_value=y)
else: # "RIGHT"
self.gamepad.right_joystick(x_value=x, y_value=y)
self.gamepad.update()
time.sleep(hold_time)
# Reset joystick
if stick == "LEFT":
self.gamepad.left_joystick(x_value=0, y_value=0)
else:
self.gamepad.right_joystick(x_value=0, y_value=0)
self.gamepad.update()
except Exception as e:
print(f"Error with {stick} joystick: {e}")
def reset(self) -> None:
"""Reset all controller inputs"""
if not self.available:
print("Would reset controller to neutral state")
return
try:
self.gamepad.reset()
self.gamepad.update()
except Exception as e:
print(f"Error resetting controller: {e}")
# === Controller Redirection System ===
def start_controller_redirection(self) -> bool:
"""Start redirecting physical controller input to virtual controller"""
if not self.available or not self.physical_controller_available or not PYSDL2_AVAILABLE:
print("Cannot start controller redirection: virtual or physical controller not available")
return False
if self.redirection_active:
print("Controller redirection already active")
return True
try:
# Reset flags
self.stop_redirection = False
self.redirection_active = True
# Start redirection in a separate thread
self.redirection_thread = threading.Thread(target=self._controller_redirection_loop)
self.redirection_thread.daemon = True
self.redirection_thread.start()
print("Controller redirection started: Physical controller now controlling virtual controller")
return True
except Exception as e:
print(f"Failed to start controller redirection: {e}")
self.redirection_active = False
return False
def stop_controller_redirection(self) -> None:
"""Stop redirecting physical controller input"""
if not self.redirection_active:
return
self.stop_redirection = True
if self.redirection_thread and self.redirection_thread.is_alive():
# Wait for thread to finish
self.redirection_thread.join(timeout=1.0)
self.redirection_active = False
self.redirection_thread = None
print("Controller redirection stopped")
# Reset virtual controller to neutral position
if self.available:
self.reset()
def _controller_redirection_loop(self) -> None:
"""Main loop for controller redirection"""
if not PYSDL2_AVAILABLE:
return
# Button mapping from SDL2 to vgamepad
button_mapping: Dict[int, Any] = {
SDL_CONTROLLER_BUTTON_A: vg.XUSB_BUTTON.XUSB_GAMEPAD_A, # A
SDL_CONTROLLER_BUTTON_B: vg.XUSB_BUTTON.XUSB_GAMEPAD_B, # B
SDL_CONTROLLER_BUTTON_X: vg.XUSB_BUTTON.XUSB_GAMEPAD_X, # X
SDL_CONTROLLER_BUTTON_Y: vg.XUSB_BUTTON.XUSB_GAMEPAD_Y, # Y
SDL_CONTROLLER_BUTTON_LEFTSHOULDER: vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_SHOULDER, # LB
SDL_CONTROLLER_BUTTON_RIGHTSHOULDER: vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_SHOULDER, # RB
SDL_CONTROLLER_BUTTON_BACK: vg.XUSB_BUTTON.XUSB_GAMEPAD_BACK, # Back
SDL_CONTROLLER_BUTTON_START: vg.XUSB_BUTTON.XUSB_GAMEPAD_START, # Start
SDL_CONTROLLER_BUTTON_LEFTSTICK: vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_THUMB, # Left stick press
SDL_CONTROLLER_BUTTON_RIGHTSTICK: vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_THUMB, # Right stick press
SDL_CONTROLLER_BUTTON_DPAD_UP: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_UP, # D-pad Up
SDL_CONTROLLER_BUTTON_DPAD_DOWN: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_DOWN, # D-pad Down
SDL_CONTROLLER_BUTTON_DPAD_LEFT: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_LEFT, # D-pad Left
SDL_CONTROLLER_BUTTON_DPAD_RIGHT: vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_RIGHT, # D-pad Right
}
# Keep track of button states to detect changes
button_states: Dict[int, bool] = {}
try:
# Open the first controller
controller_index = 0
game_controller = sdl2.SDL_GameControllerOpen(controller_index)
if not game_controller:
print("Could not open game controller")
return
while not self.stop_redirection:
# Process SDL events
event = sdl2.SDL_Event()
while sdl2.SDL_PollEvent(ctypes.byref(event)):
pass # Just process events to keep SDL happy
# Process buttons
for btn_idx in range(SDL_CONTROLLER_BUTTON_MAX):
pressed = sdl2.SDL_GameControllerGetButton(game_controller, btn_idx) == 1
# Check if state changed
if btn_idx not in button_states or button_states[btn_idx] != pressed:
button_states[btn_idx] = pressed
if btn_idx in button_mapping:
if pressed:
self.gamepad.press_button(button_mapping[btn_idx])
else:
self.gamepad.release_button(button_mapping[btn_idx])
# Process axes (sticks)
# Left stick
left_x = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_LEFTX) / 32767.0
left_y = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_LEFTY) / 32767.0
self.gamepad.left_joystick(x_value=int(left_x * 32767), y_value=int(left_y * 32767))
# Right stick
right_x = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_RIGHTX) / 32767.0
right_y = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_RIGHTY) / 32767.0
self.gamepad.right_joystick(x_value=int(right_x * 32767), y_value=int(right_y * 32767))
# Process triggers
left_trigger = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_TRIGGERLEFT) / 32767.0
right_trigger = sdl2.SDL_GameControllerGetAxis(game_controller, SDL_CONTROLLER_AXIS_TRIGGERRIGHT) / 32767.0
# Convert to 0 to 255 range
left_trigger_val = int(left_trigger * 255)
right_trigger_val = int(right_trigger * 255)
self.gamepad.left_trigger(value=left_trigger_val)
self.gamepad.right_trigger(value=right_trigger_val)
# Handle D-pad buttons directly in the button processing loop above
# The button_mapping already includes the D-pad buttons so they're handled automatically
# Update the virtual controller
self.gamepad.update()
# Small sleep to prevent high CPU usage
time.sleep(0.01)
# Clean up SDL controller
sdl2.SDL_GameControllerClose(game_controller)
except Exception as e:
print(f"Error in controller redirection loop: {e}")
finally:
# Clean up
self.redirection_active = False
if self.available:
self.reset()
# === Specific Game Action Methods ===
def press_a(self) -> None:
"""Press A button (typically jump or confirm)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_A)
def press_b(self) -> None:
"""Press B button (typically cancel or secondary action)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_B)
def press_x(self) -> None:
"""Press X button (typically special action)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_X)
def press_y(self) -> None:
"""Press Y button (typically tertiary action)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_Y)
def press_left_shoulder(self) -> None:
"""Press left shoulder button (LB)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_LEFT_SHOULDER)
def press_right_shoulder(self) -> None:
"""Press right shoulder button (RB)"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_RIGHT_SHOULDER)
def press_left_trigger(self) -> None:
"""Press left trigger (LT)"""
self.press_trigger("LEFT")
def press_right_trigger(self) -> None:
"""Press right trigger (RT)"""
self.press_trigger("RIGHT")
def press_dpad_up(self) -> None:
"""Press D-pad up"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_UP)
def press_dpad_down(self) -> None:
"""Press D-pad down"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_DOWN)
def press_dpad_left(self) -> None:
"""Press D-pad left"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_LEFT)
def press_dpad_right(self) -> None:
"""Press D-pad right"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_DPAD_RIGHT)
def press_start(self) -> None:
"""Press start button"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_START)
def press_back(self) -> None:
"""Press back button"""
self.press_button(vg.XUSB_BUTTON.XUSB_GAMEPAD_BACK)
def move_left_stick_up(self) -> None:
"""Move left stick up"""
self.move_joystick("LEFT", 0, 1.0)
def move_left_stick_down(self) -> None:
"""Move left stick down"""
self.move_joystick("LEFT", 0, -1.0)
def move_left_stick_left(self) -> None:
"""Move left stick left"""
self.move_joystick("LEFT", -1.0, 0)
def move_left_stick_right(self) -> None:
"""Move left stick right"""
self.move_joystick("LEFT", 1.0, 0)
def move_right_stick_up(self) -> None:
"""Move right stick up (usually camera up)"""
self.move_joystick("RIGHT", 0, 1.0)
def move_right_stick_down(self) -> None:
"""Move right stick down (usually camera down)"""
self.move_joystick("RIGHT", 0, -1.0)
def move_right_stick_left(self) -> None:
"""Move right stick left (usually camera left)"""
self.move_joystick("RIGHT", -1.0, 0)
def move_right_stick_right(self) -> None:
"""Move right stick right (usually camera right)"""
self.move_joystick("RIGHT", 1.0, 0)

119
src/game/input/keyboard.py Normal file
View file

@ -0,0 +1,119 @@
"""
Keyboard and mouse input module
"""
import logging
import time
from typing import Dict, Any, Tuple, Optional, Union, List
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Try to import pynput for keyboard/mouse control
try:
from pynput.keyboard import Key, Controller as KeyboardController
from pynput.mouse import Button, Controller as MouseController
PYNPUT_AVAILABLE = True
except ImportError:
logger.warning("pynput library not found. Install with: pip install pynput")
PYNPUT_AVAILABLE = False
class KeyboardMouseInput:
"""Handles keyboard and mouse input for game control"""
def __init__(self, simulate: bool = False):
"""
Initialize keyboard and mouse controllers
Args:
simulate: If True, actions will be logged but not performed
(useful when pynput isn't available)
"""
self.simulate = simulate or not PYNPUT_AVAILABLE
if not self.simulate:
self.keyboard = KeyboardController()
self.mouse = MouseController()
logger.info("Initialized keyboard and mouse controllers")
else:
logger.info("Input simulation mode active (no actual keypresses)")
def press_key(self, key: Union[str, Key], duration: float = 0.1) -> None:
"""
Press and release a keyboard key
Args:
key: The key to press (single character or pynput.keyboard.Key)
duration: How long to hold the key in seconds
"""
if self.simulate:
logger.info(f"[SIMULATED] Pressing key: {key} for {duration}s")
return
try:
# Convert string to pynput.keyboard.Key if needed
if isinstance(key, str) and len(key) == 1:
key_to_press = key
else:
key_to_press = key
# Press and hold the key
self.keyboard.press(key_to_press)
time.sleep(duration)
self.keyboard.release(key_to_press)
logger.debug(f"Pressed key: {key} for {duration}s")
except Exception as e:
logger.error(f"Error pressing key {key}: {e}")
def mouse_move(self, dx: int, dy: int, relative: bool = True) -> None:
"""
Move the mouse cursor
Args:
dx: x-axis movement
dy: y-axis movement
relative: If True, move relative to current position
"""
if self.simulate:
logger.info(f"[SIMULATED] Moving mouse: {dx}, {dy} (relative: {relative})")
return
try:
if relative:
self.mouse.move(dx, dy)
else:
self.mouse.position = (dx, dy)
logger.debug(f"Mouse moved: {dx}, {dy}")
except Exception as e:
logger.error(f"Error moving mouse: {e}")
def mouse_click(self, button: str = 'left', count: int = 1) -> None:
"""
Perform mouse clicks
Args:
button: 'left', 'right', or 'middle'
count: Number of clicks to perform
"""
if self.simulate:
logger.info(f"[SIMULATED] Clicking mouse: {button} button, {count} times")
return
try:
# Map string to pynput.mouse.Button
button_map = {
'left': Button.left,
'right': Button.right,
'middle': Button.middle
}
btn = button_map.get(button.lower(), Button.left)
for _ in range(count):
self.mouse.click(btn)
if count > 1:
time.sleep(0.1) # Small delay between multiple clicks
logger.debug(f"Mouse clicked: {button}, {count} times")
except Exception as e:
logger.error(f"Error clicking mouse: {e}")

0
src/queue/__init__.py Normal file
View file

105
src/queue/consumer.py Normal file
View file

@ -0,0 +1,105 @@
"""
Queue consumer implementation for processing commands
"""
import logging
import threading
import time
from typing import Dict, Any, Optional, List, Callable
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('queue-consumer')
class QueueConsumer:
"""Consumer for processing queued commands and messages"""
def __init__(self, queue_server=None):
"""
Initialize the queue consumer
Args:
queue_server: The queue server module to use
"""
if queue_server is None:
# Import here to avoid circular imports
from src.queue.server import huey, get_queue_stats
self.huey = huey
self.get_queue_stats = get_queue_stats
else:
self.huey = queue_server.huey
self.get_queue_stats = queue_server.get_queue_stats
self.running = False
self.consumer_thread = None
self.handlers = {}
logger.info("Queue consumer initialized")
def register_handler(self, task_type: str, handler: Callable) -> None:
"""
Register a handler for a specific task type
Args:
task_type: The type of task to handle (e.g., 'command', 'message')
handler: The handler function to call for this task type
"""
self.handlers[task_type] = handler
logger.info(f"Registered handler for {task_type}")
def start(self) -> None:
"""Start the consumer in a background thread"""
if self.running:
logger.warning("Consumer is already running")
return
self.running = True
self.consumer_thread = threading.Thread(target=self._consumer_loop)
self.consumer_thread.daemon = True
self.consumer_thread.start()
logger.info("Queue consumer started")
def stop(self) -> None:
"""Stop the consumer"""
self.running = False
if self.consumer_thread:
self.consumer_thread.join(timeout=2.0)
logger.info("Queue consumer stopped")
def _consumer_loop(self) -> None:
"""Main consumer loop to process pending tasks"""
logger.info("Consumer loop started")
try:
# Start the huey consumer
from huey.consumer import Consumer
consumer = Consumer(self.huey)
consumer_thread = threading.Thread(target=consumer.run)
consumer_thread.daemon = True
consumer_thread.start()
# Monitor the queue
while self.running:
stats = self.get_queue_stats()
logger.debug(f"Queue stats: {stats['pending']} pending tasks")
time.sleep(5) # Check stats every 5 seconds
except Exception as e:
logger.error(f"Error in consumer loop: {e}")
self.running = False
def start_standalone_consumer() -> None:
"""Start a standalone queue consumer"""
from src.queue.server import huey
try:
from huey.consumer import Consumer
consumer = Consumer(huey)
logger.info("Starting standalone Huey consumer...")
consumer.run()
except KeyboardInterrupt:
logger.info("Consumer stopped by user")
except Exception as e:
logger.error(f"Error in consumer: {e}")
if __name__ == "__main__":
start_standalone_consumer()

248
src/queue/server.py Normal file
View file

@ -0,0 +1,248 @@
from huey import SqliteHuey
import os
import logging
import socket
import time
from typing import Dict, Any, Optional, List, Callable, Union
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('twitch-queue')
# Create a Huey instance with SQLite storage
huey = SqliteHuey(
name='twitch_commands',
filename='twitch_queue.db',
results=True, # Store task results
immediate=False # Don't execute tasks immediately (use consumer)
)
# Queue statistics
class QueueStats:
def __init__(self) -> None:
self.total_enqueued: int = 0
self.total_processed: int = 0
self.last_task_time: float = 0
self.tasks_by_type: Dict[str, int] = {}
self.tasks_by_user: Dict[str, int] = {}
def log_enqueue(self, task_type: str, username: str) -> None:
"""Record stats when a task is enqueued"""
self.total_enqueued += 1
self.last_task_time = time.time()
if task_type not in self.tasks_by_type:
self.tasks_by_type[task_type] = 0
self.tasks_by_type[task_type] += 1
if username not in self.tasks_by_user:
self.tasks_by_user[username] = 0
self.tasks_by_user[username] += 1
def log_processed(self, task_type: str) -> None:
"""Record stats when a task is processed"""
self.total_processed += 1
def get_stats(self) -> Dict[str, Any]:
"""Get current queue statistics"""
return {
'total_enqueued': self.total_enqueued,
'total_processed': self.total_processed,
'pending': self.total_enqueued - self.total_processed,
'tasks_by_type': self.tasks_by_type,
'tasks_by_user': self.tasks_by_user
}
# Create a global stats object
queue_stats = QueueStats()
# IRC Connection details
IRC_SERVER = 'irc.chat.twitch.tv'
IRC_PORT = 6667
# Task definitions
@huey.task()
def process_chat_command(username: str, command: str, args: List[str],
channel: str, oauth_token: str, bot_username: str,
command_registry: Dict[str, str]) -> Dict[str, Any]:
"""
Process a chat command and return the result
This task will be executed by the Huey consumer
"""
logger.info(f"Processing command '{command}' from {username} with args: {args}")
# Record stats
queue_stats.log_processed(f"command:{command}")
# Check if the command is registered
if command not in command_registry:
logger.warning(f"Command '{command}' not found in registry")
return {
'success': False,
'error': f"Command '{command}' not found",
'username': username,
'command': command,
'processed_at': time.time()
}
try:
# For built-in commands we handle here
if command == "hello":
send_twitch_message(channel, oauth_token, bot_username, f"Hello, {username}!")
elif command == "dice":
import random
sides = 6 # Default to 6-sided dice
if args and args[0].isdigit():
sides = int(args[0])
result = random.randint(1, sides)
send_twitch_message(channel, oauth_token, bot_username, f"@{username} rolled a {result} (d{sides})")
elif command == "echo":
if args:
message = " ".join(args)
send_twitch_message(channel, oauth_token, bot_username, f"Echo: {message}")
else:
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, you didn't provide a message to echo!")
elif command == "8ball":
import random
responses = [
"It is certain.",
"It is decidedly so.",
"Without a doubt.",
"Yes, definitely.",
"You may rely on it.",
"As I see it, yes.",
"Most likely.",
"Outlook good.",
"Yes.",
"Signs point to yes.",
"Reply hazy, try again.",
"Ask again later.",
"Better not tell you now.",
"Cannot predict now.",
"Concentrate and ask again.",
"Don't count on it.",
"My reply is no.",
"My sources say no.",
"Outlook not so good.",
"Very doubtful."
]
send_twitch_message(channel, oauth_token, bot_username, f"@{username}, {random.choice(responses)}")
elif command == "qstats":
stats = get_queue_stats()
send_twitch_message(channel, oauth_token, bot_username,
f"Queue stats: {stats['total_processed']}/{stats['total_enqueued']} processed, {stats['pending']} pending")
else:
# For any custom commands, we'd need to implement them here
send_twitch_message(channel, oauth_token, bot_username,
f"Command '{command}' is registered but not implemented in the queue worker.")
return {
'success': True,
'username': username,
'command': command,
'args': args,
'processed_at': time.time()
}
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
return {
'success': False,
'error': str(e),
'username': username,
'command': command,
'args': args,
'processed_at': time.time()
}
@huey.task()
def process_chat_message(username: str, message: str) -> Dict[str, Any]:
"""
Process a regular chat message
This task will be executed by the Huey consumer
"""
logger.info(f"Processing message from {username}: {message}")
# Record stats
queue_stats.log_processed("message")
# Return information about the processed message
return {
'success': True,
'username': username,
'message': message,
'processed_at': time.time()
}
def send_twitch_message(channel: str, oauth_token: str, username: str, message: str) -> bool:
"""
Send a message to a Twitch channel
This function creates a new IRC connection, sends the message, and then closes the connection.
In a production environment, you might want to maintain a persistent connection.
"""
try:
# Create a new socket
irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
irc.connect((IRC_SERVER, IRC_PORT))
# Send authentication
irc.send(f"PASS {oauth_token}\r\n".encode('utf-8'))
irc.send(f"NICK {username}\r\n".encode('utf-8'))
irc.send(f"JOIN #{channel}\r\n".encode('utf-8'))
# Send the message
irc.send(f"PRIVMSG #{channel} :{message}\r\n".encode('utf-8'))
# Close the connection
irc.close()
logger.info(f"Sent message to #{channel}: {message}")
return True
except Exception as e:
logger.error(f"Error sending message to Twitch: {e}")
return False
# Helper functions for enqueueing tasks
def enqueue_command(username: str, command: str, args: List[str],
channel: str = None, oauth_token: str = None,
bot_username: str = None, command_registry: Dict[str, str] = None) -> None:
"""Add a command to the processing queue"""
logger.info(f"Enqueueing command '{command}' from {username} with args: {args}")
queue_stats.log_enqueue(f"command:{command}", username)
if not all([channel, oauth_token, bot_username, command_registry]):
logger.error("Missing required parameters for enqueue_command")
return None
return process_chat_command(
username, command, args, channel, oauth_token, bot_username, command_registry
)
def enqueue_message(username: str, message: str) -> None:
"""Add a regular message to the processing queue"""
logger.info(f"Enqueueing message from {username}: {message}")
queue_stats.log_enqueue("message", username)
return process_chat_message(username, message)
def get_queue_stats() -> Dict[str, Any]:
"""Get current queue statistics"""
return queue_stats.get_stats()
# Function to start the consumer directly (alternative to command line)
def start_consumer() -> None:
"""
Start the Huey consumer programmatically
Note: It's generally better to use the command line to start the consumer:
python -m huey.bin.huey_consumer src.queue.server.huey
"""
from huey.consumer import Consumer
consumer = Consumer(huey)
consumer.start()
if __name__ == "__main__":
logger.info("Queue server module initialized")
logger.info("To start the consumer, run:")
logger.info("python -m huey.bin.huey_consumer src.queue.server.huey")

8
tests/__init__.py Normal file
View file

@ -0,0 +1,8 @@
"""
Tests package initialization - adds the project root to the Python path
"""
import os
import sys
# Add the parent directory to Python's path so 'src' module can be found
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

88
tests/test_auth.py Normal file
View file

@ -0,0 +1,88 @@
"""
Test script to verify the new Twitch API authentication
"""
import os
import sys
import logging
from dotenv import load_dotenv
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('auth-test')
def test_twitch_auth():
"""Test the Twitch authentication with client credentials flow"""
# Check for required libraries
try:
import requests
except ImportError:
logger.error("The 'requests' library is required. Install it with: pip install requests")
return False
try:
from src.core.auth import TwitchAuth
except ImportError:
logger.error("Failed to import TwitchAuth from src.core.auth")
return False
# Get credentials from environment variables
client_id = os.environ.get("TWITCH_CLIENT_ID")
client_secret = os.environ.get("TWITCH_CLIENT_SECRET")
if not client_id or not client_secret:
logger.error("TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET environment variables must be set")
logger.info("You can get these from https://dev.twitch.tv/console/apps")
return False
# Try to get a token using the TwitchAuth class
logger.info("Attempting to get Twitch OAuth token...")
try:
auth = TwitchAuth(client_id, client_secret)
oauth_token = auth.get_oauth_token()
if not oauth_token or not auth.access_token:
logger.error("Failed to get token using TwitchAuth")
return False
logger.info("Successfully obtained access token!")
if auth.token_expiry:
expiry_hours = (auth.token_expiry - time.time()) / 3600
logger.info(f"Token expires in: {expiry_hours:.1f} hours")
# Validate the token by making a simple API call
logger.info("Validating token with a simple API call...")
headers = {
'Client-ID': client_id,
'Authorization': f"Bearer {auth.access_token}"
}
# Get top games as a test
response = requests.get('https://api.twitch.tv/helix/games/top', headers=headers)
if response.status_code == 200:
games = response.json().get('data', [])
logger.info(f"API call successful! Retrieved {len(games)} top games.")
if games:
logger.info(f"Top game: {games[0]['name']}")
return True
else:
logger.error(f"API call failed: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Error during authentication: {e}")
return False
if __name__ == "__main__":
import time
print("Testing Twitch API authentication...")
if test_twitch_auth():
print("Authentication test PASSED!")
sys.exit(0)
else:
print("Authentication test FAILED!")
sys.exit(1)

34
tests/test_controller.py Normal file
View file

@ -0,0 +1,34 @@
import traceback
import sys
import os
print("Starting controller test...")
try:
import sdl2
print("SDL2 imported successfully")
except ImportError as e:
print(f"Failed to import SDL2: {e}")
sys.exit(1)
try:
from src.game.input.gamepad import VirtualController
print("Controller support module imported successfully")
except Exception as e:
print(f"Failed to import controller support module: {e}")
traceback.print_exc()
sys.exit(1)
try:
print("Creating VirtualController...")
controller = VirtualController()
print("VirtualController created successfully")
print(f"Controller available: {controller.is_available()}")
print(f"Physical controller available: {controller.is_physical_controller_available()}")
print("Test completed successfully!")
except Exception as e:
print(f"Error during testing: {e}")
traceback.print_exc()
sys.exit(1)

View file

@ -0,0 +1,37 @@
import time
import traceback
import sys
print("Starting controller redirection test...")
try:
from src.game.input.gamepad import VirtualController
print("Controller support module imported successfully")
controller = VirtualController()
print(f"Controller available: {controller.is_available()}")
print(f"Physical controller available: {controller.is_physical_controller_available()}")
if controller.is_physical_controller_available():
print("Testing controller redirection...")
# Start redirection
success = controller.start_controller_redirection()
print(f"Redirection started: {success}")
if success:
# Let it run for a few seconds
print("Redirection active for 5 seconds. Try using your physical controller...")
for i in range(5, 0, -1):
print(f"{i}...")
time.sleep(1)
# Stop redirection
controller.stop_controller_redirection()
print("Redirection stopped")
print("Test completed successfully!")
except Exception as e:
print(f"Error during testing: {e}")
traceback.print_exc()
sys.exit(1)

View file

@ -0,0 +1,50 @@
import time
import traceback
import sys
print("Starting game control test...")
try:
from src.game.input.gamepad import VirtualController
print("Controller support module imported successfully")
controller = VirtualController()
print(f"Controller available: {controller.is_available()}")
if controller.is_available():
print("Testing basic controller actions...")
# Test some basic controller actions
print("Pressing A button")
controller.press_a()
time.sleep(0.5)
print("Pressing B button")
controller.press_b()
time.sleep(0.5)
print("Moving left stick up")
controller.move_left_stick_up()
time.sleep(0.5)
print("Moving right stick right")
controller.move_right_stick_right()
time.sleep(0.5)
print("Pressing left trigger")
controller.press_left_trigger()
time.sleep(0.5)
print("Pressing D-pad up")
controller.press_dpad_up()
time.sleep(0.5)
# Reset controller at the end
controller.reset()
print("Controller reset")
print("Test completed successfully!")
except Exception as e:
print(f"Error during testing: {e}")
traceback.print_exc()
sys.exit(1)

162
tests/test_token_cache.py Normal file
View file

@ -0,0 +1,162 @@
"""
Test script to verify token caching functionality
"""
import os
import sys
import logging
import time
import json
from dotenv import load_dotenv
# Add the parent directory to Python's path so 'src' module can be found
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('token-cache-test')
def test_token_cache(cache_file: str = "test_token_cache.json") -> bool:
"""Test the token caching functionality"""
try:
import requests
except ImportError:
logger.error("The 'requests' library is required. Install it with: pip install requests")
return False
# Import the bot class
from src.core.twitch import TwitchBot
from src.core.auth import TwitchAuth
# Get credentials from environment variables
client_id = os.environ.get("TWITCH_CLIENT_ID")
client_secret = os.environ.get("TWITCH_CLIENT_SECRET")
username = os.environ.get("TWITCH_USERNAME")
channel = os.environ.get("TWITCH_CHANNEL")
if not all([client_id, client_secret, username, channel]):
logger.error("Missing required environment variables")
logger.info("Please set TWITCH_CLIENT_ID, TWITCH_CLIENT_SECRET, TWITCH_USERNAME, and TWITCH_CHANNEL")
return False
# Remove the cache file if it exists (to start fresh)
if os.path.exists(cache_file):
logger.info(f"Removing existing cache file: {cache_file}")
os.remove(cache_file)
# Use mock tokens for testing
mock_access_token = "mock_access_token_for_testing"
mock_refresh_token = "mock_refresh_token_for_testing"
mock_expires_in = 14400 # 4 hours
# Create a bot instance with the test cache file
logger.info("Creating bot instance...")
bot = TwitchBot(
username=username,
client_id=client_id,
client_secret=client_secret,
channel=channel,
access_token=mock_access_token,
refresh_token=mock_refresh_token,
token_cache_file=cache_file
)
# First call should use the provided token and save it
logger.info("First call to get_oauth_token (should use the provided token)...")
oauth_token1 = bot.get_oauth_token()
# Verify the cache file was created
if not os.path.exists(cache_file):
logger.error(f"Cache file was not created: {cache_file}")
return False
# Load and display cache file
with open(cache_file, 'r') as f:
cache_data = json.load(f)
logger.info(f"Cache file created with {len(cache_data)} entries")
logger.info(f"Token expires at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cache_data['expires_at']))}")
# Save the token for comparison
token1 = bot.access_token
# Create a new bot instance to test loading from cache
logger.info("Creating a second bot instance (should load token from cache)...")
bot2 = TwitchBot(
username=username,
client_id=client_id,
client_secret=client_secret,
channel=channel,
token_cache_file=cache_file
)
# Second call should use the cached token
logger.info("Second call to get_oauth_token (should use cached token)...")
oauth_token2 = bot2.get_oauth_token()
token2 = bot2.access_token
# Compare tokens
if token1 == token2:
logger.info("SUCCESS: Both instances used the same token")
else:
logger.error("FAILURE: Different tokens were used")
return False
# Simulate token expiration by modifying the cache file
logger.info("Simulating token expiration...")
with open(cache_file, 'r') as f:
cache_data = json.load(f)
# Set expiry to now minus 10 minutes
cache_data['expires_at'] = time.time() - 600
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
# Create a third bot instance with the "expired" token
# Since we don't want to trigger a real OAuth flow in tests,
# we'll provide a new mock token to use
logger.info("Creating a third bot instance with expired token cache...")
mock_new_token = "mock_new_token_after_expiration"
mock_new_refresh = "mock_new_refresh_after_expiration"
bot3 = TwitchBot(
username=username,
client_id=client_id,
client_secret=client_secret,
channel=channel,
access_token=mock_new_token,
refresh_token=mock_new_refresh,
token_cache_file=cache_file
)
# Third call should use the new token since the cached one is expired
logger.info("Third call to get_oauth_token (should use the new mock token due to expiration)...")
oauth_token3 = bot3.get_oauth_token()
token3 = bot3.access_token
# Compare tokens - they should be different
if token1 != token3:
logger.info("SUCCESS: Token was refreshed after expiration")
else:
logger.error("FAILURE: Expired token was not refreshed")
return False
# Clean up
if os.path.exists(cache_file):
os.remove(cache_file)
logger.info(f"Removed test cache file: {cache_file}")
logger.info("All token cache tests passed!")
return True
if __name__ == "__main__":
print("Testing token caching functionality...")
if test_token_cache():
print("All token cache tests PASSED!")
sys.exit(0)
else:
print("Token cache tests FAILED!")
sys.exit(1)