diff --git a/app/assets/web/index.html b/app/assets/web/index.html index d82f6cf..6487f05 100644 --- a/app/assets/web/index.html +++ b/app/assets/web/index.html @@ -187,6 +187,8 @@ const theme = document.getElementById('livechat-theme').value; const direction = document.getElementById('livechat-direction').value; + const fontsize = document.getElementById('livechat-fontsize').value; + const hidetime = document.getElementById('livechat-hidetime').value; const urlInput = document.getElementById('livechat-url'); const openLink = document.getElementById('livechat-open'); @@ -195,6 +197,8 @@ if (theme !== 'dark') params.push(`theme=${theme}`); if (direction !== 'down') params.push(`direction=${direction}`); + if (fontsize !== 'medium') params.push(`fontsize=${fontsize}`); + if (hidetime === 'true') params.push(`hidetime=true`); if (params.length > 0) { url += '?' + params.join('&'); diff --git a/app/assets/web/widgets/livechat/app.js b/app/assets/web/widgets/livechat/app.js index 8f078a2..87babcd 100644 --- a/app/assets/web/widgets/livechat/app.js +++ b/app/assets/web/widgets/livechat/app.js @@ -27,6 +27,16 @@ class LiveChatWidget { document.body.classList.add('direction-up'); } + // Font size: small, medium (default), large, xlarge + const fontSize = urlParams.get('fontsize') || 'medium'; + document.body.classList.add(`font-${fontSize}`); + + // Hide timestamp option + const hideTime = urlParams.get('hidetime'); + if (hideTime === 'true' || hideTime === '1') { + document.body.classList.add('hide-time'); + } + this.init(); } diff --git a/app/assets/web/widgets/livechat/style.css b/app/assets/web/widgets/livechat/style.css index fde92a5..59acd19 100644 --- a/app/assets/web/widgets/livechat/style.css +++ b/app/assets/web/widgets/livechat/style.css @@ -43,6 +43,40 @@ body.theme-dark .chat-message { background: rgba(0, 0, 0, 0.6); } +/* Font size options */ +body.font-small { + --chat-font-size: 12px; + --chat-username-size: 12px; + --chat-badge-size: 14px; + --chat-emote-size: 20px; +} + +body.font-medium { + --chat-font-size: 14px; + --chat-username-size: 14px; + --chat-badge-size: 16px; + --chat-emote-size: 24px; +} + +body.font-large { + --chat-font-size: 18px; + --chat-username-size: 18px; + --chat-badge-size: 20px; + --chat-emote-size: 32px; +} + +body.font-xlarge { + --chat-font-size: 24px; + --chat-username-size: 24px; + --chat-badge-size: 26px; + --chat-emote-size: 40px; +} + +/* Hide timestamp option */ +body.hide-time .timestamp { + display: none; +} + #chat-container { width: 100vw; height: 100vh; @@ -53,7 +87,7 @@ body.theme-dark .chat-message { #chat-messages { flex: 1; - overflow-y: auto; + overflow-y: hidden; overflow-x: hidden; display: flex; flex-direction: column; @@ -168,8 +202,8 @@ body.direction-up .chat-message { } .badge { - width: 18px; - height: 18px; + width: var(--chat-badge-size, 18px); + height: var(--chat-badge-size, 18px); display: inline-flex; align-items: center; justify-content: center; @@ -190,19 +224,19 @@ body.direction-up .chat-message { .username { font-weight: bold; - font-size: 14px; + font-size: var(--chat-username-size, 14px); text-shadow: 1px 1px 2px rgba(0, 0, 0, 0.8); } .timestamp { - font-size: 11px; + font-size: calc(var(--chat-font-size, 14px) * 0.8); color: #aaa; opacity: 0.8; } /* Message text */ .message-text { - font-size: 14px; + font-size: var(--chat-font-size, 14px); line-height: 1.4; word-wrap: break-word; overflow-wrap: break-word; @@ -213,8 +247,10 @@ body.direction-up .chat-message { display: inline-block; vertical-align: middle; margin: 0 2px; - max-height: 28px; - max-width: 28px; + height: var(--chat-emote-size, 28px); + width: auto; + max-width: calc(var(--chat-emote-size, 28px) * 3); + object-fit: contain; image-rendering: pixelated; } diff --git a/app/providers/twitch_chat.py b/app/providers/twitch_chat.py index 9bf4e8a..64d9be2 100644 --- a/app/providers/twitch_chat.py +++ b/app/providers/twitch_chat.py @@ -1,15 +1,57 @@ from __future__ import annotations import asyncio +import json import re -from datetime import datetime +from datetime import datetime, timedelta +from pathlib import Path from typing import Optional import aiohttp from app.chat_models import ChatBadge, ChatMessage, ChatUser, Emote, Platform, UserRole +from app.paths import get_data_dir from app.state import AppState +# Cache settings +BTTV_TOP_CACHE_FILE = "bttv_top_emotes.json" +BTTV_TRENDING_CACHE_FILE = "bttv_trending_emotes.json" +SEVENTV_TOP_CACHE_FILE = "7tv_top_emotes.json" +SEVENTV_TRENDING_CACHE_FILE = "7tv_trending_emotes.json" +EMOTE_CACHE_MAX_AGE = timedelta(hours=24) # Refresh cache after 24 hours +TRENDING_CACHE_MAX_AGE = timedelta(hours=6) # Refresh trending more frequently + +# 7TV GraphQL query for emote search (supports different sort options) +SEVENTV_EMOTES_QUERY = """ +query EmoteSearch($page: Int, $perPage: Int!, $sortBy: SortBy!) { + emotes { + search( + query: null + tags: {tags: [], match: ANY} + sort: {sortBy: $sortBy, order: DESCENDING} + filters: {} + page: $page + perPage: $perPage + ) { + items { + id + defaultName + images { + url + mime + size + scale + width + frameCount + } + } + totalCount + pageCount + } + } +} +""" + class TwitchChatClient: """ @@ -378,6 +420,9 @@ class TwitchChatClient: if not self.session: return + loaded_global = 0 + loaded_channel = 0 + try: # Global FFZ emotes async with self.session.get("https://api.frankerfacez.com/v1/set/global") as resp: @@ -392,6 +437,9 @@ class TwitchChatClient: self.global_emotes[code] = Emote( code=code, url=f"https:{url}" if url.startswith("//") else url, provider="ffz" ) + loaded_global += 1 + + print(f"FFZ: Loaded {loaded_global} global emotes") # Channel-specific FFZ emotes async with self.session.get(f"https://api.frankerfacez.com/v1/room/{self.channel}") as resp: @@ -406,14 +454,128 @@ class TwitchChatClient: self.channel_emotes[code] = Emote( code=code, url=f"https:{url}" if url.startswith("//") else url, provider="ffz" ) + loaded_channel += 1 + + if loaded_channel > 0: + print(f"FFZ: Loaded {loaded_channel} channel emotes") + except Exception as e: print(f"FFZ emote load error: {e}") + def _get_bttv_cache_path(self, cache_type: str = "top") -> Path: + """Get the path to the BTTV emote cache file.""" + if cache_type == "trending": + return get_data_dir() / BTTV_TRENDING_CACHE_FILE + return get_data_dir() / BTTV_TOP_CACHE_FILE + + def _is_bttv_cache_valid(self, cache_type: str = "top") -> bool: + """Check if the BTTV cache exists and is not expired.""" + cache_path = self._get_bttv_cache_path(cache_type) + if not cache_path.exists(): + return False + + # Check cache age - trending refreshes more frequently + cache_mtime = datetime.fromtimestamp(cache_path.stat().st_mtime) + max_age = TRENDING_CACHE_MAX_AGE if cache_type == "trending" else EMOTE_CACHE_MAX_AGE + return datetime.now() - cache_mtime < max_age + + def _load_bttv_cache(self, cache_type: str = "top") -> list[dict]: + """Load BTTV emotes from cache file.""" + cache_path = self._get_bttv_cache_path(cache_type) + try: + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f"BTTV: Error loading {cache_type} cache: {e}") + return [] + + def _save_bttv_cache(self, emotes: list[dict], cache_type: str = "top") -> None: + """Save BTTV emotes to cache file.""" + cache_path = self._get_bttv_cache_path(cache_type) + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(emotes, f) + print(f"BTTV: Saved {len(emotes)} {cache_type} emotes to cache") + except Exception as e: + print(f"BTTV: Error saving {cache_type} cache: {e}") + + async def _fetch_bttv_emotes_by_type(self, emote_type: str, label: str, max_pages: int = 100) -> list[dict]: + """Fetch BTTV emotes by paginating through the API with a specific type (top/trending).""" + if not self.session: + return [] + + all_emotes: list[dict] = [] + before_cursor: Optional[str] = None + page = 1 + + print(f"BTTV: Fetching {label} shared emotes...") + + while page <= max_pages: + url = f"https://api.betterttv.net/3/emotes/shared/{emote_type}?limit=100" + if before_cursor: + url += f"&before={before_cursor}" + + try: + async with self.session.get(url) as resp: + if resp.status != 200: + print(f"BTTV: Error fetching {label} page {page}: status {resp.status}") + break + + emotes = await resp.json() + if not emotes: + break # No more emotes + + all_emotes.extend(emotes) + + # Get the cursor for next page from the last item + last_item = emotes[-1] + before_cursor = last_item.get("id") + + if not before_cursor: + break + + # Log every 10 pages to reduce spam + if page % 10 == 0 or page == 1: + print(f"BTTV: Fetched {label} page {page} (total: {len(all_emotes)})") + page += 1 + + # Small delay to be nice to the API + await asyncio.sleep(0.1) + + except Exception as e: + print(f"BTTV: Error fetching {label} page {page}: {e}") + break + + print(f"BTTV: Finished fetching {len(all_emotes)} {label} emotes") + return all_emotes + + def _load_bttv_emotes_to_dict(self, emotes: list[dict]) -> int: + """Load BTTV emotes from a list into the global emotes dictionary.""" + loaded = 0 + for item in emotes: + emote = item.get("emote", {}) + code = emote.get("code") + emote_id = emote.get("id") + if code and emote_id and code not in self.global_emotes: + self.global_emotes[code] = Emote( + code=code, + url=f"https://cdn.betterttv.net/emote/{emote_id}/1x", + provider="bttv", + ) + loaded += 1 + return loaded + async def _load_bttv_emotes(self) -> None: """Load BetterTTV emotes.""" if not self.session: return + loaded_global = 0 + loaded_channel = 0 + loaded_top = 0 + loaded_trending = 0 + try: # Global BTTV emotes async with self.session.get("https://api.betterttv.net/3/cached/emotes/global") as resp: @@ -428,9 +590,40 @@ class TwitchChatClient: url=f"https://cdn.betterttv.net/emote/{emote_id}/1x", provider="bttv", ) + loaded_global += 1 + + print(f"BTTV: Loaded {loaded_global} global emotes") + + # Top shared BTTV emotes - use cache if valid, otherwise fetch all + if self._is_bttv_cache_valid("top"): + print("BTTV: Using cached top emotes") + top_emotes = self._load_bttv_cache("top") + else: + top_emotes = await self._fetch_bttv_emotes_by_type("top", "top") + if top_emotes: + self._save_bttv_cache(top_emotes, "top") + + loaded_top = self._load_bttv_emotes_to_dict(top_emotes) + if loaded_top > 0: + print(f"BTTV: Loaded {loaded_top} top shared emotes") + + # Trending BTTV emotes - use cache if valid, otherwise fetch + # Trending typically has fewer pages, limit to 50 + if self._is_bttv_cache_valid("trending"): + print("BTTV: Using cached trending emotes") + trending_emotes = self._load_bttv_cache("trending") + else: + trending_emotes = await self._fetch_bttv_emotes_by_type("trending", "trending", max_pages=50) + if trending_emotes: + self._save_bttv_cache(trending_emotes, "trending") + + loaded_trending = self._load_bttv_emotes_to_dict(trending_emotes) + if loaded_trending > 0: + print(f"BTTV: Loaded {loaded_trending} trending emotes") - # Channel BTTV emotes - async with self.session.get(f"https://api.betterttv.net/3/cached/users/twitch/{self.channel}") as resp: + # Channel BTTV emotes - use channel ID if available + channel_identifier = self.channel_id or self.channel + async with self.session.get(f"https://api.betterttv.net/3/cached/users/twitch/{channel_identifier}") as resp: if resp.status == 200: data = await resp.json() for emote in data.get("channelEmotes", []) + data.get("sharedEmotes", []): @@ -442,14 +635,200 @@ class TwitchChatClient: url=f"https://cdn.betterttv.net/emote/{emote_id}/1x", provider="bttv", ) + loaded_channel += 1 + + if loaded_channel > 0: + print(f"BTTV: Loaded {loaded_channel} channel emotes") + except Exception as e: print(f"BTTV emote load error: {e}") + def _get_7tv_emote_url(self, emote: dict) -> Optional[str]: + """Extract the correct URL from a 7TV emote object.""" + # Try to get from data.host structure (v3 API format) + emote_data = emote.get("data", {}) + host = emote_data.get("host", {}) + + if host: + base_url = host.get("url", "") + files = host.get("files", []) + + # Find best quality webp file + for f in files: + if f.get("name") == "1x.webp": + return f"https:{base_url}/{f.get('name')}" + + # Fallback to first webp file + for f in files: + if f.get("format") == "WEBP": + return f"https:{base_url}/{f.get('name')}" + + # Last resort: construct URL + if base_url: + return f"https:{base_url}/1x.webp" + + return None + + def _get_7tv_emote_url_v4(self, emote: dict) -> Optional[str]: + """Extract the correct URL from a 7TV v4 GraphQL emote object.""" + images = emote.get("images", []) + + # Find 1x scale image + for img in images: + if img.get("scale") == 1: + url = img.get("url") + if url: + return url if url.startswith("http") else f"https:{url}" + + # Fallback to first image + if images: + url = images[0].get("url") + if url: + return url if url.startswith("http") else f"https:{url}" + + return None + + def _get_7tv_cache_path(self, cache_type: str = "top") -> Path: + """Get the path to the 7TV emote cache file.""" + if cache_type == "trending": + return get_data_dir() / SEVENTV_TRENDING_CACHE_FILE + return get_data_dir() / SEVENTV_TOP_CACHE_FILE + + def _is_7tv_cache_valid(self, cache_type: str = "top") -> bool: + """Check if the 7TV cache exists and is not expired.""" + cache_path = self._get_7tv_cache_path(cache_type) + if not cache_path.exists(): + return False + + # Check cache age - trending refreshes more frequently + cache_mtime = datetime.fromtimestamp(cache_path.stat().st_mtime) + max_age = TRENDING_CACHE_MAX_AGE if cache_type == "trending" else EMOTE_CACHE_MAX_AGE + return datetime.now() - cache_mtime < max_age + + def _load_7tv_cache(self, cache_type: str = "top") -> list[dict]: + """Load 7TV emotes from cache file.""" + cache_path = self._get_7tv_cache_path(cache_type) + try: + with open(cache_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f"7TV: Error loading {cache_type} cache: {e}") + return [] + + def _save_7tv_cache(self, emotes: list[dict], cache_type: str = "top") -> None: + """Save 7TV emotes to cache file.""" + cache_path = self._get_7tv_cache_path(cache_type) + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(emotes, f) + print(f"7TV: Saved {len(emotes)} {cache_type} emotes to cache") + except Exception as e: + print(f"7TV: Error saving {cache_type} cache: {e}") + + async def _fetch_7tv_emotes_by_sort(self, sort_by: str, label: str, max_pages: int = 150) -> list[dict]: + """Fetch 7TV emotes by paginating through the GraphQL API with a specific sort.""" + if not self.session: + return [] + + all_emotes: list[dict] = [] + page = 1 + per_page = 72 # Max per page for 7TV + total_pages = None + + print(f"7TV: Fetching {label} emotes...") + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Origin": "https://7tv.app", + "Referer": "https://7tv.app/", + } + + while page <= max_pages: + if total_pages is not None and page > total_pages: + break + + payload = { + "operationName": "EmoteSearch", + "query": SEVENTV_EMOTES_QUERY, + "variables": { + "page": page, + "perPage": per_page, + "sortBy": sort_by, + } + } + + try: + async with self.session.post( + "https://api.7tv.app/v4/gql", + json=payload, + headers=headers + ) as resp: + if resp.status != 200: + print(f"7TV: Error fetching {label} page {page}: status {resp.status}") + break + + result = await resp.json() + search_data = result.get("data", {}).get("emotes", {}).get("search", {}) + items = search_data.get("items", []) + + if not items: + break # No more emotes + + all_emotes.extend(items) + + # Get total page count on first request + if total_pages is None: + total_pages = min(search_data.get("pageCount", max_pages), max_pages) + total_count = search_data.get("totalCount", 0) + print(f"7TV: Found {total_count:,} {label} emotes, fetching up to {total_pages} pages") + + # Log every 10 pages to reduce spam + if page % 10 == 0 or page == 1: + print(f"7TV: Fetched {label} page {page}/{total_pages} (total: {len(all_emotes)})") + page += 1 + + # Small delay to be nice to the API + await asyncio.sleep(0.1) + + except Exception as e: + print(f"7TV: Error fetching {label} page {page}: {e}") + break + + print(f"7TV: Finished fetching {len(all_emotes)} {label} emotes") + return all_emotes + + def _load_7tv_emotes_to_dict(self, emotes: list[dict]) -> int: + """Load 7TV emotes from a list into the global emotes dictionary.""" + loaded = 0 + for emote in emotes: + code = emote.get("defaultName") + if code and code not in self.global_emotes: + url = self._get_7tv_emote_url_v4(emote) + if url: + self.global_emotes[code] = Emote( + code=code, + url=url, + provider="7tv", + is_animated=any( + img.get("frameCount", 1) > 1 + for img in emote.get("images", []) + ), + ) + loaded += 1 + return loaded + async def _load_7tv_emotes(self) -> None: """Load 7TV emotes.""" if not self.session: return + loaded_global = 0 + loaded_channel = 0 + loaded_top = 0 + loaded_trending = 0 + try: # Global 7TV emotes async with self.session.get("https://7tv.io/v3/emote-sets/global") as resp: @@ -457,33 +836,74 @@ class TwitchChatClient: data = await resp.json() for emote in data.get("emotes", []): code = emote.get("name") - emote_data = emote.get("data", {}) - host = emote_data.get("host", {}) - if code and host: - url = f"https:{host.get('url', '')}/1x.webp" + url = self._get_7tv_emote_url(emote) + if code and url: + emote_data = emote.get("data", {}) self.global_emotes[code] = Emote( code=code, url=url, provider="7tv", - is_animated=emote.get("animated", False), + is_animated=emote_data.get("animated", False), ) + loaded_global += 1 + + print(f"7TV: Loaded {loaded_global} global emotes") + + # Top 7TV emotes - use cache if valid, otherwise fetch all + if self._is_7tv_cache_valid("top"): + print("7TV: Using cached top emotes") + top_emotes = self._load_7tv_cache("top") + else: + top_emotes = await self._fetch_7tv_emotes_by_sort("TOP_ALL_TIME", "top") + if top_emotes: + self._save_7tv_cache(top_emotes, "top") + + loaded_top = self._load_7tv_emotes_to_dict(top_emotes) + if loaded_top > 0: + print(f"7TV: Loaded {loaded_top} top emotes") + + # Trending 7TV emotes - use cache if valid, otherwise fetch + # Trending has fewer pages typically, so limit to 50 pages + if self._is_7tv_cache_valid("trending"): + print("7TV: Using cached trending emotes") + trending_emotes = self._load_7tv_cache("trending") + else: + trending_emotes = await self._fetch_7tv_emotes_by_sort("TRENDING_MONTHLY", "trending", max_pages=50) + if trending_emotes: + self._save_7tv_cache(trending_emotes, "trending") + + loaded_trending = self._load_7tv_emotes_to_dict(trending_emotes) + if loaded_trending > 0: + print(f"7TV: Loaded {loaded_trending} trending emotes") - # Channel 7TV emotes - async with self.session.get(f"https://7tv.io/v3/users/twitch/{self.channel}") as resp: + # Channel 7TV emotes - try channel ID first, then username + channel_url = None + if self.channel_id: + channel_url = f"https://7tv.io/v3/users/twitch/{self.channel_id}" + else: + channel_url = f"https://7tv.io/v3/users/twitch/{self.channel}" + + async with self.session.get(channel_url) as resp: if resp.status == 200: data = await resp.json() emote_set = data.get("emote_set", {}) for emote in emote_set.get("emotes", []): code = emote.get("name") - emote_data = emote.get("data", {}) - host = emote_data.get("host", {}) - if code and host: - url = f"https:{host.get('url', '')}/1x.webp" + url = self._get_7tv_emote_url(emote) + if code and url: + emote_data = emote.get("data", {}) self.channel_emotes[code] = Emote( code=code, url=url, provider="7tv", - is_animated=emote.get("animated", False), + is_animated=emote_data.get("animated", False), ) + loaded_channel += 1 + elif resp.status == 404: + print(f"7TV: No emotes found for channel {self.channel}") + + if loaded_channel > 0: + print(f"7TV: Loaded {loaded_channel} channel emotes") + except Exception as e: print(f"7TV emote load error: {e}") diff --git a/app/webserver.py b/app/webserver.py index 3cccd33..ddf5be8 100644 --- a/app/webserver.py +++ b/app/webserver.py @@ -57,6 +57,22 @@ async def handle_root(request: web.Request) -> web.Response: +
+ + +
+
+ + +
"""