from __future__ import annotations import json import secrets import webbrowser from datetime import datetime, timedelta from pathlib import Path from typing import Optional from urllib.parse import urlencode from aiohttp import web from app.chat_models import AuthTokens, Platform from app.config import load_config from app.paths import get_data_dir from app.state import AppState # In-memory state storage for OAuth flow oauth_states: dict[str, dict] = {} # Global config - loaded at module level _app_config = load_config() def get_tokens_file() -> Path: """Get path to tokens storage file.""" data_dir = get_data_dir() data_dir.mkdir(parents=True, exist_ok=True) return data_dir / "tokens.json" async def load_tokens(state: AppState) -> None: """Load saved tokens from disk.""" tokens_file = get_tokens_file() if not tokens_file.exists(): return try: with open(tokens_file, "r") as f: data = json.load(f) if "twitch" in data: twitch_data = data["twitch"] tokens = AuthTokens( access_token=twitch_data["access_token"], refresh_token=twitch_data.get("refresh_token"), expires_at=( datetime.fromisoformat(twitch_data["expires_at"]) if twitch_data.get("expires_at") else None ), scope=twitch_data.get("scope", []), username=twitch_data.get("username"), ) await state.set_auth_tokens(Platform.TWITCH, tokens) if "youtube" in data: youtube_data = data["youtube"] tokens = AuthTokens( access_token=youtube_data["access_token"], refresh_token=youtube_data.get("refresh_token"), expires_at=( datetime.fromisoformat(youtube_data["expires_at"]) if youtube_data.get("expires_at") else None ), scope=youtube_data.get("scope", []), ) await state.set_auth_tokens(Platform.YOUTUBE, tokens) except Exception as e: print(f"Error loading tokens: {e}") async def save_tokens(state: AppState) -> None: """Save tokens to disk.""" tokens_file = get_tokens_file() data = {} twitch_tokens = await state.get_auth_tokens(Platform.TWITCH) if twitch_tokens: data["twitch"] = twitch_tokens.to_dict() youtube_tokens = await state.get_auth_tokens(Platform.YOUTUBE) if youtube_tokens: data["youtube"] = youtube_tokens.to_dict() try: with open(tokens_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error saving tokens: {e}") async def handle_twitch_login(request: web.Request) -> web.Response: """Initiate Twitch OAuth flow.""" if not _app_config.twitch_oauth.is_configured(): return web.json_response( { "error": "Twitch OAuth not configured. Please edit config.json with your OAuth credentials.", "config_path": str(load_config().twitch_oauth), }, status=400, ) state_token = secrets.token_urlsafe(32) oauth_states[state_token] = {"platform": "twitch", "timestamp": datetime.now()} params = { "client_id": _app_config.twitch_oauth.client_id, "redirect_uri": _app_config.twitch_oauth.redirect_uri, "response_type": "code", "scope": "chat:read chat:edit", "state": state_token, } auth_url = f"https://id.twitch.tv/oauth2/authorize?{urlencode(params)}" # Open browser webbrowser.open(auth_url) return web.json_response({"message": "Opening browser for Twitch login..."}) async def handle_twitch_callback(request: web.Request) -> web.Response: """Handle Twitch OAuth callback.""" code = request.query.get("code") state_token = request.query.get("state") if not code or not state_token or state_token not in oauth_states: return web.Response(text="Invalid OAuth state", status=400) del oauth_states[state_token] # Exchange code for token import aiohttp async with aiohttp.ClientSession() as session: token_url = "https://id.twitch.tv/oauth2/token" data = { "client_id": _app_config.twitch_oauth.client_id, "client_secret": _app_config.twitch_oauth.client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": _app_config.twitch_oauth.redirect_uri, } async with session.post(token_url, data=data) as resp: if resp.status != 200: return web.Response(text="Failed to get access token", status=400) token_data = await resp.json() # Get the user's username using the access token user_login = None headers = { "Authorization": f"Bearer {token_data['access_token']}", "Client-Id": _app_config.twitch_oauth.client_id, } async with session.get("https://api.twitch.tv/helix/users", headers=headers) as resp: if resp.status == 200: user_data = await resp.json() if user_data.get("data"): user_login = user_data["data"][0].get("login") print(f"Twitch: Authenticated as {user_login}") # Store tokens state: AppState = request.app["state"] expires_in = token_data.get("expires_in", 3600) tokens = AuthTokens( access_token=token_data["access_token"], refresh_token=token_data.get("refresh_token"), expires_at=datetime.now() + timedelta(seconds=expires_in), scope=token_data.get("scope", []), username=user_login, ) await state.set_auth_tokens(Platform.TWITCH, tokens) await save_tokens(state) html = """
This window will close automatically...
This window will close automatically...