1124 lines
44 KiB
Python
1124 lines
44 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
|
|
from app.chat_models import ChatBadge, ChatMessage, ChatUser, Emote, Platform, UserRole
|
|
from app.paths import get_data_dir
|
|
from app.state import AppState
|
|
|
|
# Cache settings
|
|
BTTV_TOP_CACHE_FILE = "bttv_top_emotes.json"
|
|
BTTV_TRENDING_CACHE_FILE = "bttv_trending_emotes.json"
|
|
SEVENTV_TOP_CACHE_FILE = "7tv_top_emotes.json"
|
|
SEVENTV_TRENDING_CACHE_FILE = "7tv_trending_emotes.json"
|
|
EMOTE_CACHE_MAX_AGE = timedelta(hours=24) # Refresh cache after 24 hours
|
|
TRENDING_CACHE_MAX_AGE = timedelta(hours=6) # Refresh trending more frequently
|
|
|
|
# 7TV GraphQL query for emote search (supports different sort options)
|
|
SEVENTV_EMOTES_QUERY = """
|
|
query EmoteSearch($page: Int, $perPage: Int!, $sortBy: SortBy!) {
|
|
emotes {
|
|
search(
|
|
query: null
|
|
tags: {tags: [], match: ANY}
|
|
sort: {sortBy: $sortBy, order: DESCENDING}
|
|
filters: {}
|
|
page: $page
|
|
perPage: $perPage
|
|
) {
|
|
items {
|
|
id
|
|
defaultName
|
|
images {
|
|
url
|
|
mime
|
|
size
|
|
scale
|
|
width
|
|
frameCount
|
|
}
|
|
}
|
|
totalCount
|
|
pageCount
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
class TwitchChatClient:
|
|
"""
|
|
Twitch IRC WebSocket client for reading chat messages.
|
|
Uses anonymous IRC connection or authenticated if token is provided.
|
|
"""
|
|
|
|
IRC_WS_URL = "wss://irc-ws.chat.twitch.tv:443"
|
|
|
|
def __init__(self, state: AppState, channel: str):
|
|
self.state = state
|
|
self.channel = channel.lower().lstrip("#")
|
|
self.ws: Optional[aiohttp.ClientWebSocket] = None
|
|
self.session: Optional[aiohttp.ClientSession] = None
|
|
self.running = False
|
|
|
|
# Emote caches
|
|
self.global_emotes: dict[str, Emote] = {}
|
|
self.channel_emotes: dict[str, Emote] = {}
|
|
|
|
# Badge caches: badge_name/version -> image_url
|
|
self.global_badges: dict[str, str] = {}
|
|
self.channel_badges: dict[str, str] = {}
|
|
self.channel_id: Optional[str] = None
|
|
|
|
# Authentication state
|
|
self.is_authenticated: bool = False
|
|
|
|
# Authenticated user info (populated from GLOBALUSERSTATE/USERSTATE)
|
|
self.user_color: Optional[str] = None
|
|
self.user_badges: list[ChatBadge] = []
|
|
self.user_display_name: Optional[str] = None
|
|
|
|
async def start(self) -> None:
|
|
"""Start the IRC connection."""
|
|
self.running = True
|
|
self.session = aiohttp.ClientSession()
|
|
|
|
tokens = await self.state.get_auth_tokens(Platform.TWITCH)
|
|
|
|
try:
|
|
# Get channel ID for badges and emotes
|
|
await self._get_channel_id()
|
|
|
|
# Load badges
|
|
await self._load_badges()
|
|
|
|
# Load emotes
|
|
await self._load_emotes()
|
|
|
|
# Connect to IRC
|
|
self.ws = await self.session.ws_connect(self.IRC_WS_URL)
|
|
|
|
# Authenticate
|
|
if tokens and tokens.access_token:
|
|
# Use the stored username, or fall back to channel name
|
|
nick = tokens.username or self.channel
|
|
await self.ws.send_str(f"PASS oauth:{tokens.access_token}")
|
|
await self.ws.send_str(f"NICK {nick}")
|
|
self.is_authenticated = True
|
|
print(f"Twitch: Connected with authentication as {nick}")
|
|
else:
|
|
# Anonymous connection (read-only)
|
|
await self.ws.send_str("PASS SCHMOOPIIE")
|
|
await self.ws.send_str(f"NICK justinfan{asyncio.get_event_loop().time():.0f}")
|
|
self.is_authenticated = False
|
|
print(f"Twitch: Connected anonymously (read-only, cannot send messages)")
|
|
|
|
# Request capabilities for tags (emotes, badges, color, etc.)
|
|
await self.ws.send_str("CAP REQ :twitch.tv/tags twitch.tv/commands")
|
|
|
|
# Join channel
|
|
await self.ws.send_str(f"JOIN #{self.channel}")
|
|
|
|
# Start message loop
|
|
await self._message_loop()
|
|
|
|
except Exception as e:
|
|
print(f"Twitch chat error: {e}")
|
|
finally:
|
|
await self.stop()
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the IRC connection."""
|
|
self.running = False
|
|
if self.ws:
|
|
await self.ws.close()
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def send_message(self, message: str, echo: bool = True) -> bool:
|
|
"""
|
|
Send a chat message to the channel.
|
|
|
|
Returns True if the message was sent successfully, False otherwise.
|
|
Requires authenticated connection (not anonymous).
|
|
|
|
Args:
|
|
message: The message to send
|
|
echo: Whether to locally echo the message (default True)
|
|
"""
|
|
if not self.ws or not self.running:
|
|
print("Twitch: Cannot send message - not connected")
|
|
return False
|
|
|
|
# Check if we're connected anonymously - can't send messages
|
|
if not self.is_authenticated:
|
|
print("Twitch: Cannot send message - connected anonymously. Please authenticate via /config and restart.")
|
|
return False
|
|
|
|
try:
|
|
# Send PRIVMSG to channel
|
|
await self.ws.send_str(f"PRIVMSG #{self.channel} :{message}")
|
|
print(f"Twitch: Sent message to #{self.channel}")
|
|
|
|
# Local echo - add our own message to the chat so we can see it
|
|
if echo:
|
|
await self._echo_sent_message(message)
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"Twitch: Error sending message: {e}")
|
|
return False
|
|
|
|
async def send_message_no_echo(self, message: str) -> bool:
|
|
"""Send a message without local echo (used for multi-platform sends)."""
|
|
return await self.send_message(message, echo=False)
|
|
|
|
async def _echo_sent_message(self, message: str) -> None:
|
|
"""Add our own sent message to the chat display (local echo)."""
|
|
tokens = await self.state.get_auth_tokens(Platform.TWITCH)
|
|
if not tokens:
|
|
return
|
|
|
|
username = tokens.username or self.channel
|
|
display_name = self.user_display_name or username
|
|
|
|
# Determine roles from badges
|
|
roles = [UserRole.VIEWER]
|
|
badge_names = [b.name for b in self.user_badges]
|
|
if "broadcaster" in badge_names:
|
|
roles.append(UserRole.BROADCASTER)
|
|
if "moderator" in badge_names:
|
|
roles.append(UserRole.MODERATOR)
|
|
if "vip" in badge_names:
|
|
roles.append(UserRole.VIP)
|
|
if "subscriber" in badge_names or "founder" in badge_names:
|
|
roles.append(UserRole.SUBSCRIBER)
|
|
|
|
# Create a user object for ourselves using stored info
|
|
user = ChatUser(
|
|
id=username,
|
|
username=username,
|
|
display_name=display_name,
|
|
platform=Platform.TWITCH,
|
|
color=self.user_color, # Use our actual color from Twitch
|
|
roles=roles,
|
|
badges=self.user_badges.copy(), # Use our actual badges
|
|
)
|
|
|
|
# Check for /me action
|
|
is_action = message.startswith("/me ")
|
|
if is_action:
|
|
message = message[4:]
|
|
|
|
# Create the message
|
|
msg_id = f"sent_{username}_{datetime.now().timestamp()}"
|
|
|
|
# Parse emotes from our message
|
|
emotes = await self._parse_emotes(message, {})
|
|
|
|
chat_msg = ChatMessage(
|
|
id=msg_id,
|
|
platform=Platform.TWITCH,
|
|
user=user,
|
|
message=message,
|
|
timestamp=datetime.now(),
|
|
emotes=emotes,
|
|
is_action=is_action,
|
|
)
|
|
|
|
# Add to state (broadcasts to all connected clients)
|
|
await self.state.add_chat_message(chat_msg)
|
|
|
|
async def _message_loop(self) -> None:
|
|
"""Main loop to receive and process IRC messages."""
|
|
if not self.ws:
|
|
return
|
|
|
|
async for msg in self.ws:
|
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
|
await self._handle_irc_message(msg.data)
|
|
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
|
|
break
|
|
|
|
async def _handle_irc_message(self, raw: str) -> None:
|
|
"""Parse and handle a single IRC message."""
|
|
raw = raw.strip()
|
|
|
|
# Respond to PING
|
|
if raw.startswith("PING"):
|
|
if self.ws:
|
|
await self.ws.send_str("PONG :tmi.twitch.tv")
|
|
return
|
|
|
|
# Handle NOTICE messages (errors, warnings from Twitch)
|
|
if "NOTICE" in raw:
|
|
# Extract the notice message
|
|
notice_match = re.search(r"NOTICE [#\w]+ :(.+)", raw)
|
|
if notice_match:
|
|
notice_text = notice_match.group(1)
|
|
print(f"Twitch NOTICE: {notice_text}")
|
|
else:
|
|
print(f"Twitch NOTICE (raw): {raw}")
|
|
return
|
|
|
|
# Handle USERSTATE (sent after successful message - contains our user info)
|
|
if "USERSTATE" in raw:
|
|
self._parse_user_state(raw)
|
|
return
|
|
|
|
# Handle GLOBALUSERSTATE (sent on connect - contains our global user info)
|
|
if "GLOBALUSERSTATE" in raw:
|
|
self._parse_user_state(raw)
|
|
return
|
|
|
|
# Log other important messages for debugging
|
|
if any(x in raw for x in ["JOIN", "PART"]):
|
|
# Normal connection messages, ignore
|
|
return
|
|
|
|
# Log connection/room info
|
|
if "ROOMSTATE" in raw:
|
|
# Parse room state to check settings
|
|
if "followers-only=" in raw:
|
|
fo_match = re.search(r"followers-only=(-?\d+)", raw)
|
|
if fo_match:
|
|
fo_val = int(fo_match.group(1))
|
|
if fo_val >= 0:
|
|
print(f"Twitch: Channel is in followers-only mode ({fo_val} minutes)")
|
|
else:
|
|
print(f"Twitch: Channel followers-only mode is OFF")
|
|
return
|
|
|
|
# Parse PRIVMSG (chat messages)
|
|
if "PRIVMSG" in raw:
|
|
await self._parse_privmsg(raw)
|
|
|
|
def _parse_user_state(self, raw: str) -> None:
|
|
"""Parse USERSTATE or GLOBALUSERSTATE to get our own user info."""
|
|
# Extract tags
|
|
if not raw.startswith("@"):
|
|
return
|
|
|
|
tag_str = raw.split(" ", 1)[0]
|
|
tags = {}
|
|
for tag in tag_str[1:].split(";"):
|
|
if "=" in tag:
|
|
key, value = tag.split("=", 1)
|
|
tags[key] = value
|
|
|
|
# Get display name
|
|
if tags.get("display-name"):
|
|
self.user_display_name = tags["display-name"]
|
|
|
|
# Get color
|
|
if tags.get("color"):
|
|
self.user_color = tags["color"]
|
|
|
|
# Get badges
|
|
badges_tag = tags.get("badges", "")
|
|
if badges_tag:
|
|
self.user_badges = []
|
|
for badge_pair in badges_tag.split(","):
|
|
if "/" in badge_pair:
|
|
badge_name, badge_version = badge_pair.split("/", 1)
|
|
badge_key = f"{badge_name}/{badge_version}"
|
|
|
|
# Look up badge image URL
|
|
icon_url = self.channel_badges.get(badge_key) or self.global_badges.get(badge_key)
|
|
self.user_badges.append(ChatBadge(name=badge_name, icon_url=icon_url))
|
|
|
|
print(f"Twitch: User info updated - {self.user_display_name}, color={self.user_color}, badges={len(self.user_badges)}")
|
|
|
|
async def _parse_privmsg(self, raw: str) -> None:
|
|
"""
|
|
Parse a PRIVMSG IRC line.
|
|
Format: @tags :user!user@user.tmi.twitch.tv PRIVMSG #channel :message
|
|
"""
|
|
# Extract tags
|
|
tags = {}
|
|
if raw.startswith("@"):
|
|
tag_str, raw = raw.split(" ", 1)
|
|
for tag in tag_str[1:].split(";"):
|
|
if "=" in tag:
|
|
key, value = tag.split("=", 1)
|
|
tags[key] = value
|
|
|
|
# Extract user
|
|
user_match = re.search(r":(\w+)!", raw)
|
|
if not user_match:
|
|
return
|
|
username = user_match.group(1)
|
|
|
|
# Extract message
|
|
msg_match = re.search(r"PRIVMSG #\w+ :(.+)", raw)
|
|
if not msg_match:
|
|
return
|
|
message_text = msg_match.group(1)
|
|
|
|
# Check for /me action
|
|
is_action = message_text.startswith("\x01ACTION") and message_text.endswith("\x01")
|
|
if is_action:
|
|
message_text = message_text[8:-1].strip()
|
|
|
|
# Build user object
|
|
user = self._build_user(username, tags)
|
|
|
|
# Build message object
|
|
msg_id = tags.get("id", f"{username}_{datetime.now().timestamp()}")
|
|
emotes = await self._parse_emotes(message_text, tags)
|
|
|
|
chat_msg = ChatMessage(
|
|
id=msg_id,
|
|
platform=Platform.TWITCH,
|
|
user=user,
|
|
message=message_text,
|
|
timestamp=datetime.now(),
|
|
emotes=emotes,
|
|
is_action=is_action,
|
|
)
|
|
|
|
# Add to state
|
|
await self.state.add_chat_message(chat_msg)
|
|
|
|
def _build_user(self, username: str, tags: dict[str, str]) -> ChatUser:
|
|
"""Build a ChatUser from IRC tags."""
|
|
display_name = tags.get("display-name", username)
|
|
user_id = tags.get("user-id", username)
|
|
color = tags.get("color") or None
|
|
|
|
# Parse roles
|
|
roles = [UserRole.VIEWER]
|
|
badges_tag = tags.get("badges", "")
|
|
|
|
if "broadcaster" in badges_tag:
|
|
roles.append(UserRole.BROADCASTER)
|
|
if "moderator" in badges_tag:
|
|
roles.append(UserRole.MODERATOR)
|
|
if "vip" in badges_tag:
|
|
roles.append(UserRole.VIP)
|
|
if "subscriber" in badges_tag or "founder" in badges_tag:
|
|
roles.append(UserRole.SUBSCRIBER)
|
|
|
|
# Parse badges with icons
|
|
badges = []
|
|
if badges_tag:
|
|
for badge_pair in badges_tag.split(","):
|
|
if "/" in badge_pair:
|
|
badge_name, badge_version = badge_pair.split("/", 1)
|
|
badge_key = f"{badge_name}/{badge_version}"
|
|
|
|
# Look up badge image URL (channel badges first, then global)
|
|
icon_url = self.channel_badges.get(badge_key) or self.global_badges.get(badge_key)
|
|
|
|
badges.append(ChatBadge(name=badge_name, icon_url=icon_url))
|
|
|
|
return ChatUser(
|
|
id=user_id,
|
|
username=username,
|
|
display_name=display_name,
|
|
platform=Platform.TWITCH,
|
|
color=color,
|
|
roles=roles,
|
|
badges=badges,
|
|
)
|
|
|
|
async def _get_channel_id(self) -> None:
|
|
"""Get the channel's Twitch user ID (needed for badges/emotes)."""
|
|
if not self.session:
|
|
return
|
|
|
|
try:
|
|
# Use the unofficial Twitch API to get user ID from username
|
|
url = f"https://api.ivr.fi/v2/twitch/user?login={self.channel}"
|
|
async with self.session.get(url) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
if data and len(data) > 0:
|
|
self.channel_id = data[0].get("id")
|
|
print(f"Twitch: Got channel ID {self.channel_id} for {self.channel}")
|
|
except Exception as e:
|
|
print(f"Twitch: Error getting channel ID: {e}")
|
|
|
|
async def _load_badges(self) -> None:
|
|
"""Load Twitch badges (global and channel-specific) using Helix API."""
|
|
if not self.session:
|
|
return
|
|
|
|
# Get OAuth config for Client-ID
|
|
from app.config import load_config
|
|
config = load_config()
|
|
client_id = config.twitch_oauth.client_id
|
|
|
|
# Get access token if available
|
|
tokens = await self.state.get_auth_tokens(Platform.TWITCH)
|
|
|
|
headers = {}
|
|
if client_id:
|
|
headers["Client-ID"] = client_id
|
|
if tokens and tokens.access_token:
|
|
headers["Authorization"] = f"Bearer {tokens.access_token}"
|
|
|
|
try:
|
|
# Load global badges via Helix API
|
|
if headers:
|
|
async with self.session.get(
|
|
"https://api.twitch.tv/helix/chat/badges/global",
|
|
headers=headers
|
|
) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for badge_set in data.get("data", []):
|
|
badge_name = badge_set.get("set_id")
|
|
for version in badge_set.get("versions", []):
|
|
version_id = version.get("id")
|
|
badge_key = f"{badge_name}/{version_id}"
|
|
# Prefer higher resolution images
|
|
icon_url = (
|
|
version.get("image_url_4x") or
|
|
version.get("image_url_2x") or
|
|
version.get("image_url_1x")
|
|
)
|
|
if icon_url:
|
|
self.global_badges[badge_key] = icon_url
|
|
print(f"Twitch: Loaded {len(self.global_badges)} global badges")
|
|
else:
|
|
print(f"Twitch: Failed to load global badges (status {resp.status})")
|
|
|
|
# Load channel badges if we have channel ID
|
|
if self.channel_id:
|
|
async with self.session.get(
|
|
f"https://api.twitch.tv/helix/chat/badges?broadcaster_id={self.channel_id}",
|
|
headers=headers
|
|
) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for badge_set in data.get("data", []):
|
|
badge_name = badge_set.get("set_id")
|
|
for version in badge_set.get("versions", []):
|
|
version_id = version.get("id")
|
|
badge_key = f"{badge_name}/{version_id}"
|
|
icon_url = (
|
|
version.get("image_url_4x") or
|
|
version.get("image_url_2x") or
|
|
version.get("image_url_1x")
|
|
)
|
|
if icon_url:
|
|
self.channel_badges[badge_key] = icon_url
|
|
print(f"Twitch: Loaded {len(self.channel_badges)} channel badges")
|
|
else:
|
|
# Fallback: use static badge URLs for common badges if no OAuth
|
|
self._load_static_badges()
|
|
|
|
except Exception as e:
|
|
print(f"Twitch: Error loading badges: {e}")
|
|
# Fallback to static badges
|
|
self._load_static_badges()
|
|
|
|
def _load_static_badges(self) -> None:
|
|
"""Load static fallback badges for common badge types."""
|
|
# These are stable CDN URLs for common Twitch badges
|
|
static_badges = {
|
|
"broadcaster/1": "https://static-cdn.jtvnw.net/badges/v1/5527c58c-fb7d-422d-b71b-f309dcb85cc1/3",
|
|
"moderator/1": "https://static-cdn.jtvnw.net/badges/v1/3267646d-33f0-4b17-b3df-f923a41db1d0/3",
|
|
"vip/1": "https://static-cdn.jtvnw.net/badges/v1/b817aba4-fad8-49e2-b88a-7cc744f6a6e3/3",
|
|
"subscriber/0": "https://static-cdn.jtvnw.net/badges/v1/5d9f2208-5dd8-11e7-8513-2ff4adfae661/3",
|
|
"subscriber/1": "https://static-cdn.jtvnw.net/badges/v1/5d9f2208-5dd8-11e7-8513-2ff4adfae661/3",
|
|
"premium/1": "https://static-cdn.jtvnw.net/badges/v1/bbbe0db0-a598-423e-86d0-f9fb98ca1933/3",
|
|
"partner/1": "https://static-cdn.jtvnw.net/badges/v1/d12a2e27-16f6-41d0-ab77-b780518f00a3/3",
|
|
"turbo/1": "https://static-cdn.jtvnw.net/badges/v1/bd444ec6-8f34-4bf9-91f4-af1e3428d80f/3",
|
|
"glhf-pledge/1": "https://static-cdn.jtvnw.net/badges/v1/3158e758-3cb4-43c5-94b3-7571f71cf6a0/3",
|
|
"founder/0": "https://static-cdn.jtvnw.net/badges/v1/511b78a9-ab37-472f-9569-457753bbe7d3/3",
|
|
}
|
|
self.global_badges.update(static_badges)
|
|
print(f"Twitch: Loaded {len(static_badges)} static fallback badges")
|
|
|
|
async def _parse_emotes(self, message: str, tags: dict[str, str]) -> list[Emote]:
|
|
"""Parse emotes from message and tags."""
|
|
emotes = []
|
|
|
|
# Parse Twitch native emotes from tags
|
|
emotes_tag = tags.get("emotes", "")
|
|
if emotes_tag:
|
|
# Format: "emoteid:start-end,start-end/emoteid2:start-end"
|
|
for emote_data in emotes_tag.split("/"):
|
|
if ":" not in emote_data:
|
|
continue
|
|
emote_id, positions = emote_data.split(":", 1)
|
|
# Just use first position to get the code
|
|
if "-" in positions:
|
|
start_pos = int(positions.split(",")[0].split("-")[0])
|
|
end_pos = int(positions.split(",")[0].split("-")[1])
|
|
code = message[start_pos : end_pos + 1]
|
|
emotes.append(
|
|
Emote(
|
|
code=code,
|
|
url=f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/dark/1.0",
|
|
provider="twitch",
|
|
emote_id=emote_id,
|
|
)
|
|
)
|
|
|
|
# Check for third-party emotes in message
|
|
words = message.split()
|
|
for word in words:
|
|
# Check FFZ
|
|
if word in self.global_emotes or word in self.channel_emotes:
|
|
emote = self.global_emotes.get(word) or self.channel_emotes.get(word)
|
|
if emote and emote not in emotes:
|
|
emotes.append(emote)
|
|
|
|
return emotes
|
|
|
|
async def _load_emotes(self) -> None:
|
|
"""Load third-party emotes from FFZ, BTTV, 7TV."""
|
|
config = self.state.chat_config
|
|
|
|
if not self.session:
|
|
return
|
|
|
|
try:
|
|
# Load FrankerFaceZ emotes
|
|
if config.enable_ffz:
|
|
await self._load_ffz_emotes()
|
|
|
|
# Load BTTV emotes
|
|
if config.enable_bttv:
|
|
await self._load_bttv_emotes()
|
|
|
|
# Load 7TV emotes
|
|
if config.enable_7tv:
|
|
await self._load_7tv_emotes()
|
|
|
|
except Exception as e:
|
|
print(f"Error loading emotes: {e}")
|
|
|
|
async def _load_ffz_emotes(self) -> None:
|
|
"""Load FrankerFaceZ emotes for the channel."""
|
|
if not self.session:
|
|
return
|
|
|
|
loaded_global = 0
|
|
loaded_channel = 0
|
|
|
|
try:
|
|
# Global FFZ emotes
|
|
async with self.session.get("https://api.frankerfacez.com/v1/set/global") as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for set_id, set_data in data.get("sets", {}).items():
|
|
for emote in set_data.get("emoticons", []):
|
|
code = emote.get("name")
|
|
emote_id = str(emote.get("id", ""))
|
|
urls = emote.get("urls", {})
|
|
# Use 1x as default, frontend will upgrade
|
|
url = urls.get("1") or urls.get("2") or urls.get("4")
|
|
if code and url:
|
|
self.global_emotes[code] = Emote(
|
|
code=code,
|
|
url=f"https:{url}" if url.startswith("//") else url,
|
|
provider="ffz",
|
|
emote_id=emote_id,
|
|
)
|
|
loaded_global += 1
|
|
|
|
print(f"FFZ: Loaded {loaded_global} global emotes")
|
|
|
|
# Channel-specific FFZ emotes
|
|
async with self.session.get(f"https://api.frankerfacez.com/v1/room/{self.channel}") as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for set_id, set_data in data.get("sets", {}).items():
|
|
for emote in set_data.get("emoticons", []):
|
|
code = emote.get("name")
|
|
emote_id = str(emote.get("id", ""))
|
|
urls = emote.get("urls", {})
|
|
# Use 1x as default, frontend will upgrade
|
|
url = urls.get("1") or urls.get("2") or urls.get("4")
|
|
if code and url:
|
|
self.channel_emotes[code] = Emote(
|
|
code=code,
|
|
url=f"https:{url}" if url.startswith("//") else url,
|
|
provider="ffz",
|
|
emote_id=emote_id,
|
|
)
|
|
loaded_channel += 1
|
|
|
|
if loaded_channel > 0:
|
|
print(f"FFZ: Loaded {loaded_channel} channel emotes")
|
|
|
|
except Exception as e:
|
|
print(f"FFZ emote load error: {e}")
|
|
|
|
def _get_bttv_cache_path(self, cache_type: str = "top") -> Path:
|
|
"""Get the path to the BTTV emote cache file."""
|
|
if cache_type == "trending":
|
|
return get_data_dir() / BTTV_TRENDING_CACHE_FILE
|
|
return get_data_dir() / BTTV_TOP_CACHE_FILE
|
|
|
|
def _is_bttv_cache_valid(self, cache_type: str = "top") -> bool:
|
|
"""Check if the BTTV cache exists and is not expired."""
|
|
cache_path = self._get_bttv_cache_path(cache_type)
|
|
if not cache_path.exists():
|
|
return False
|
|
|
|
# Check cache age - trending refreshes more frequently
|
|
cache_mtime = datetime.fromtimestamp(cache_path.stat().st_mtime)
|
|
max_age = TRENDING_CACHE_MAX_AGE if cache_type == "trending" else EMOTE_CACHE_MAX_AGE
|
|
return datetime.now() - cache_mtime < max_age
|
|
|
|
def _load_bttv_cache(self, cache_type: str = "top") -> list[dict]:
|
|
"""Load BTTV emotes from cache file."""
|
|
cache_path = self._get_bttv_cache_path(cache_type)
|
|
try:
|
|
with open(cache_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
print(f"BTTV: Error loading {cache_type} cache: {e}")
|
|
return []
|
|
|
|
def _save_bttv_cache(self, emotes: list[dict], cache_type: str = "top") -> None:
|
|
"""Save BTTV emotes to cache file."""
|
|
cache_path = self._get_bttv_cache_path(cache_type)
|
|
try:
|
|
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(cache_path, "w", encoding="utf-8") as f:
|
|
json.dump(emotes, f)
|
|
print(f"BTTV: Saved {len(emotes)} {cache_type} emotes to cache")
|
|
except Exception as e:
|
|
print(f"BTTV: Error saving {cache_type} cache: {e}")
|
|
|
|
async def _fetch_bttv_emotes_by_type(self, emote_type: str, label: str, max_pages: int = 100) -> list[dict]:
|
|
"""Fetch BTTV emotes by paginating through the API with a specific type (top/trending)."""
|
|
if not self.session:
|
|
return []
|
|
|
|
all_emotes: list[dict] = []
|
|
before_cursor: Optional[str] = None
|
|
page = 1
|
|
|
|
print(f"BTTV: Fetching {label} shared emotes...")
|
|
|
|
while page <= max_pages:
|
|
url = f"https://api.betterttv.net/3/emotes/shared/{emote_type}?limit=100"
|
|
if before_cursor:
|
|
url += f"&before={before_cursor}"
|
|
|
|
try:
|
|
async with self.session.get(url) as resp:
|
|
if resp.status != 200:
|
|
print(f"BTTV: Error fetching {label} page {page}: status {resp.status}")
|
|
break
|
|
|
|
emotes = await resp.json()
|
|
if not emotes:
|
|
break # No more emotes
|
|
|
|
all_emotes.extend(emotes)
|
|
|
|
# Get the cursor for next page from the last item
|
|
last_item = emotes[-1]
|
|
before_cursor = last_item.get("id")
|
|
|
|
if not before_cursor:
|
|
break
|
|
|
|
# Log every 10 pages to reduce spam
|
|
if page % 10 == 0 or page == 1:
|
|
print(f"BTTV: Fetched {label} page {page} (total: {len(all_emotes)})")
|
|
page += 1
|
|
|
|
# Small delay to be nice to the API
|
|
await asyncio.sleep(0.1)
|
|
|
|
except Exception as e:
|
|
print(f"BTTV: Error fetching {label} page {page}: {e}")
|
|
break
|
|
|
|
print(f"BTTV: Finished fetching {len(all_emotes)} {label} emotes")
|
|
return all_emotes
|
|
|
|
def _load_bttv_emotes_to_dict(self, emotes: list[dict]) -> int:
|
|
"""Load BTTV emotes from a list into the global emotes dictionary."""
|
|
loaded = 0
|
|
for item in emotes:
|
|
emote = item.get("emote", {})
|
|
code = emote.get("code")
|
|
emote_id = emote.get("id")
|
|
if code and emote_id and code not in self.global_emotes:
|
|
self.global_emotes[code] = Emote(
|
|
code=code,
|
|
url=f"https://cdn.betterttv.net/emote/{emote_id}/1x",
|
|
provider="bttv",
|
|
emote_id=emote_id,
|
|
)
|
|
loaded += 1
|
|
return loaded
|
|
|
|
async def _load_bttv_emotes(self) -> None:
|
|
"""Load BetterTTV emotes."""
|
|
if not self.session:
|
|
return
|
|
|
|
loaded_global = 0
|
|
loaded_channel = 0
|
|
loaded_top = 0
|
|
loaded_trending = 0
|
|
|
|
try:
|
|
# Global BTTV emotes
|
|
async with self.session.get("https://api.betterttv.net/3/cached/emotes/global") as resp:
|
|
if resp.status == 200:
|
|
emotes = await resp.json()
|
|
for emote in emotes:
|
|
code = emote.get("code")
|
|
emote_id = emote.get("id")
|
|
if code and emote_id:
|
|
self.global_emotes[code] = Emote(
|
|
code=code,
|
|
url=f"https://cdn.betterttv.net/emote/{emote_id}/1x",
|
|
provider="bttv",
|
|
emote_id=emote_id,
|
|
)
|
|
loaded_global += 1
|
|
|
|
print(f"BTTV: Loaded {loaded_global} global emotes")
|
|
|
|
# Top shared BTTV emotes - use cache if valid, otherwise fetch all
|
|
if self._is_bttv_cache_valid("top"):
|
|
print("BTTV: Using cached top emotes")
|
|
top_emotes = self._load_bttv_cache("top")
|
|
else:
|
|
top_emotes = await self._fetch_bttv_emotes_by_type("top", "top")
|
|
if top_emotes:
|
|
self._save_bttv_cache(top_emotes, "top")
|
|
|
|
loaded_top = self._load_bttv_emotes_to_dict(top_emotes)
|
|
if loaded_top > 0:
|
|
print(f"BTTV: Loaded {loaded_top} top shared emotes")
|
|
|
|
# Trending BTTV emotes - use cache if valid, otherwise fetch
|
|
# Trending typically has fewer pages, limit to 50
|
|
if self._is_bttv_cache_valid("trending"):
|
|
print("BTTV: Using cached trending emotes")
|
|
trending_emotes = self._load_bttv_cache("trending")
|
|
else:
|
|
trending_emotes = await self._fetch_bttv_emotes_by_type("trending", "trending", max_pages=50)
|
|
if trending_emotes:
|
|
self._save_bttv_cache(trending_emotes, "trending")
|
|
|
|
loaded_trending = self._load_bttv_emotes_to_dict(trending_emotes)
|
|
if loaded_trending > 0:
|
|
print(f"BTTV: Loaded {loaded_trending} trending emotes")
|
|
|
|
# Channel BTTV emotes - use channel ID if available
|
|
channel_identifier = self.channel_id or self.channel
|
|
async with self.session.get(f"https://api.betterttv.net/3/cached/users/twitch/{channel_identifier}") as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for emote in data.get("channelEmotes", []) + data.get("sharedEmotes", []):
|
|
code = emote.get("code")
|
|
emote_id = emote.get("id")
|
|
if code and emote_id:
|
|
self.channel_emotes[code] = Emote(
|
|
code=code,
|
|
url=f"https://cdn.betterttv.net/emote/{emote_id}/1x",
|
|
provider="bttv",
|
|
emote_id=emote_id,
|
|
)
|
|
loaded_channel += 1
|
|
|
|
if loaded_channel > 0:
|
|
print(f"BTTV: Loaded {loaded_channel} channel emotes")
|
|
|
|
except Exception as e:
|
|
print(f"BTTV emote load error: {e}")
|
|
|
|
def _get_7tv_emote_url(self, emote: dict) -> Optional[str]:
|
|
"""Extract the correct URL from a 7TV emote object (1x for frontend scaling)."""
|
|
# Try to get from data.host structure (v3 API format)
|
|
emote_data = emote.get("data", {})
|
|
host = emote_data.get("host", {})
|
|
|
|
if host:
|
|
base_url = host.get("url", "")
|
|
files = host.get("files", [])
|
|
|
|
# Use 1x as default, frontend will upgrade based on font size
|
|
for f in files:
|
|
if f.get("name") == "1x.webp":
|
|
return f"https:{base_url}/{f.get('name')}"
|
|
|
|
# Fallback to 2x
|
|
for f in files:
|
|
if f.get("name") == "2x.webp":
|
|
return f"https:{base_url}/{f.get('name')}"
|
|
|
|
# Fallback to first webp file
|
|
for f in files:
|
|
if f.get("format") == "WEBP":
|
|
return f"https:{base_url}/{f.get('name')}"
|
|
|
|
# Last resort: construct URL
|
|
if base_url:
|
|
return f"https:{base_url}/1x.webp"
|
|
|
|
return None
|
|
|
|
def _get_7tv_emote_url_v4(self, emote: dict) -> Optional[str]:
|
|
"""Extract the correct URL from a 7TV v4 GraphQL emote object (1x for frontend scaling)."""
|
|
images = emote.get("images", [])
|
|
|
|
# Use 1x as default, frontend will upgrade based on font size
|
|
for img in images:
|
|
if img.get("scale") == 1:
|
|
url = img.get("url")
|
|
if url:
|
|
return url if url.startswith("http") else f"https:{url}"
|
|
|
|
# Fallback to 2x
|
|
for img in images:
|
|
if img.get("scale") == 2:
|
|
url = img.get("url")
|
|
if url:
|
|
return url if url.startswith("http") else f"https:{url}"
|
|
|
|
# Fallback to first image
|
|
if images:
|
|
url = images[0].get("url")
|
|
if url:
|
|
return url if url.startswith("http") else f"https:{url}"
|
|
|
|
return None
|
|
|
|
def _get_7tv_cache_path(self, cache_type: str = "top") -> Path:
|
|
"""Get the path to the 7TV emote cache file."""
|
|
if cache_type == "trending":
|
|
return get_data_dir() / SEVENTV_TRENDING_CACHE_FILE
|
|
return get_data_dir() / SEVENTV_TOP_CACHE_FILE
|
|
|
|
def _is_7tv_cache_valid(self, cache_type: str = "top") -> bool:
|
|
"""Check if the 7TV cache exists and is not expired."""
|
|
cache_path = self._get_7tv_cache_path(cache_type)
|
|
if not cache_path.exists():
|
|
return False
|
|
|
|
# Check cache age - trending refreshes more frequently
|
|
cache_mtime = datetime.fromtimestamp(cache_path.stat().st_mtime)
|
|
max_age = TRENDING_CACHE_MAX_AGE if cache_type == "trending" else EMOTE_CACHE_MAX_AGE
|
|
return datetime.now() - cache_mtime < max_age
|
|
|
|
def _load_7tv_cache(self, cache_type: str = "top") -> list[dict]:
|
|
"""Load 7TV emotes from cache file."""
|
|
cache_path = self._get_7tv_cache_path(cache_type)
|
|
try:
|
|
with open(cache_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
print(f"7TV: Error loading {cache_type} cache: {e}")
|
|
return []
|
|
|
|
def _save_7tv_cache(self, emotes: list[dict], cache_type: str = "top") -> None:
|
|
"""Save 7TV emotes to cache file."""
|
|
cache_path = self._get_7tv_cache_path(cache_type)
|
|
try:
|
|
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(cache_path, "w", encoding="utf-8") as f:
|
|
json.dump(emotes, f)
|
|
print(f"7TV: Saved {len(emotes)} {cache_type} emotes to cache")
|
|
except Exception as e:
|
|
print(f"7TV: Error saving {cache_type} cache: {e}")
|
|
|
|
async def _fetch_7tv_emotes_by_sort(self, sort_by: str, label: str, max_pages: int = 150) -> list[dict]:
|
|
"""Fetch 7TV emotes by paginating through the GraphQL API with a specific sort."""
|
|
if not self.session:
|
|
return []
|
|
|
|
all_emotes: list[dict] = []
|
|
page = 1
|
|
per_page = 72 # Max per page for 7TV
|
|
total_pages = None
|
|
|
|
print(f"7TV: Fetching {label} emotes...")
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
"Origin": "https://7tv.app",
|
|
"Referer": "https://7tv.app/",
|
|
}
|
|
|
|
while page <= max_pages:
|
|
if total_pages is not None and page > total_pages:
|
|
break
|
|
|
|
payload = {
|
|
"operationName": "EmoteSearch",
|
|
"query": SEVENTV_EMOTES_QUERY,
|
|
"variables": {
|
|
"page": page,
|
|
"perPage": per_page,
|
|
"sortBy": sort_by,
|
|
}
|
|
}
|
|
|
|
try:
|
|
async with self.session.post(
|
|
"https://api.7tv.app/v4/gql",
|
|
json=payload,
|
|
headers=headers
|
|
) as resp:
|
|
if resp.status != 200:
|
|
print(f"7TV: Error fetching {label} page {page}: status {resp.status}")
|
|
break
|
|
|
|
result = await resp.json()
|
|
search_data = result.get("data", {}).get("emotes", {}).get("search", {})
|
|
items = search_data.get("items", [])
|
|
|
|
if not items:
|
|
break # No more emotes
|
|
|
|
all_emotes.extend(items)
|
|
|
|
# Get total page count on first request
|
|
if total_pages is None:
|
|
total_pages = min(search_data.get("pageCount", max_pages), max_pages)
|
|
total_count = search_data.get("totalCount", 0)
|
|
print(f"7TV: Found {total_count:,} {label} emotes, fetching up to {total_pages} pages")
|
|
|
|
# Log every 10 pages to reduce spam
|
|
if page % 10 == 0 or page == 1:
|
|
print(f"7TV: Fetched {label} page {page}/{total_pages} (total: {len(all_emotes)})")
|
|
page += 1
|
|
|
|
# Small delay to be nice to the API
|
|
await asyncio.sleep(0.1)
|
|
|
|
except Exception as e:
|
|
print(f"7TV: Error fetching {label} page {page}: {e}")
|
|
break
|
|
|
|
print(f"7TV: Finished fetching {len(all_emotes)} {label} emotes")
|
|
return all_emotes
|
|
|
|
def _load_7tv_emotes_to_dict(self, emotes: list[dict]) -> int:
|
|
"""Load 7TV emotes from a list into the global emotes dictionary."""
|
|
loaded = 0
|
|
for emote in emotes:
|
|
code = emote.get("defaultName")
|
|
emote_id = emote.get("id")
|
|
if code and code not in self.global_emotes:
|
|
url = self._get_7tv_emote_url_v4(emote)
|
|
if url:
|
|
self.global_emotes[code] = Emote(
|
|
code=code,
|
|
url=url,
|
|
provider="7tv",
|
|
is_animated=any(
|
|
img.get("frameCount", 1) > 1
|
|
for img in emote.get("images", [])
|
|
),
|
|
emote_id=emote_id,
|
|
)
|
|
loaded += 1
|
|
return loaded
|
|
|
|
async def _load_7tv_emotes(self) -> None:
|
|
"""Load 7TV emotes."""
|
|
if not self.session:
|
|
return
|
|
|
|
loaded_global = 0
|
|
loaded_channel = 0
|
|
loaded_top = 0
|
|
loaded_trending = 0
|
|
|
|
try:
|
|
# Global 7TV emotes
|
|
async with self.session.get("https://7tv.io/v3/emote-sets/global") as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
for emote in data.get("emotes", []):
|
|
code = emote.get("name")
|
|
emote_id = emote.get("id")
|
|
url = self._get_7tv_emote_url(emote)
|
|
if code and url:
|
|
emote_data = emote.get("data", {})
|
|
self.global_emotes[code] = Emote(
|
|
code=code,
|
|
url=url,
|
|
provider="7tv",
|
|
is_animated=emote_data.get("animated", False),
|
|
emote_id=emote_id,
|
|
)
|
|
loaded_global += 1
|
|
|
|
print(f"7TV: Loaded {loaded_global} global emotes")
|
|
|
|
# Top 7TV emotes - use cache if valid, otherwise fetch all
|
|
if self._is_7tv_cache_valid("top"):
|
|
print("7TV: Using cached top emotes")
|
|
top_emotes = self._load_7tv_cache("top")
|
|
else:
|
|
top_emotes = await self._fetch_7tv_emotes_by_sort("TOP_ALL_TIME", "top")
|
|
if top_emotes:
|
|
self._save_7tv_cache(top_emotes, "top")
|
|
|
|
loaded_top = self._load_7tv_emotes_to_dict(top_emotes)
|
|
if loaded_top > 0:
|
|
print(f"7TV: Loaded {loaded_top} top emotes")
|
|
|
|
# Trending 7TV emotes - use cache if valid, otherwise fetch
|
|
# Trending has fewer pages typically, so limit to 50 pages
|
|
if self._is_7tv_cache_valid("trending"):
|
|
print("7TV: Using cached trending emotes")
|
|
trending_emotes = self._load_7tv_cache("trending")
|
|
else:
|
|
trending_emotes = await self._fetch_7tv_emotes_by_sort("TRENDING_MONTHLY", "trending", max_pages=50)
|
|
if trending_emotes:
|
|
self._save_7tv_cache(trending_emotes, "trending")
|
|
|
|
loaded_trending = self._load_7tv_emotes_to_dict(trending_emotes)
|
|
if loaded_trending > 0:
|
|
print(f"7TV: Loaded {loaded_trending} trending emotes")
|
|
|
|
# Channel 7TV emotes - try channel ID first, then username
|
|
channel_url = None
|
|
if self.channel_id:
|
|
channel_url = f"https://7tv.io/v3/users/twitch/{self.channel_id}"
|
|
else:
|
|
channel_url = f"https://7tv.io/v3/users/twitch/{self.channel}"
|
|
|
|
async with self.session.get(channel_url) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
emote_set = data.get("emote_set", {})
|
|
for emote in emote_set.get("emotes", []):
|
|
code = emote.get("name")
|
|
emote_id = emote.get("id")
|
|
url = self._get_7tv_emote_url(emote)
|
|
if code and url:
|
|
emote_data = emote.get("data", {})
|
|
self.channel_emotes[code] = Emote(
|
|
code=code,
|
|
url=url,
|
|
provider="7tv",
|
|
emote_id=emote_id,
|
|
is_animated=emote_data.get("animated", False),
|
|
)
|
|
loaded_channel += 1
|
|
elif resp.status == 404:
|
|
print(f"7TV: No emotes found for channel {self.channel}")
|
|
|
|
if loaded_channel > 0:
|
|
print(f"7TV: Loaded {loaded_channel} channel emotes")
|
|
|
|
except Exception as e:
|
|
print(f"7TV emote load error: {e}")
|