Make twitch/youtube chat widget

This commit is contained in:
Joey Yakimowich-Payne 2026-01-07 12:47:09 -07:00
commit 0842dccf73
22 changed files with 3787 additions and 45 deletions

470
app/assets/web/config.html Normal file
View file

@ -0,0 +1,470 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Chat Configuration - Streamer Widgets</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
background: #f8fafc;
color: #334155;
margin: 0;
padding: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
background: #ffffff;
border-radius: 12px;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
header {
background: #0f172a;
color: white;
padding: 24px 32px;
}
h1 {
margin: 0;
font-size: 24px;
font-weight: 600;
}
.content {
padding: 32px;
}
.section {
margin-bottom: 32px;
padding-bottom: 24px;
border-bottom: 1px solid #e2e8f0;
}
.section:last-child {
border-bottom: none;
}
h2 {
font-size: 18px;
margin: 0 0 16px;
color: #1e293b;
}
.form-group {
margin-bottom: 16px;
}
label {
display: block;
margin-bottom: 6px;
font-weight: 500;
font-size: 14px;
}
input[type="text"],
input[type="number"] {
width: 100%;
padding: 10px 12px;
border: 1px solid #cbd5e1;
border-radius: 6px;
font-size: 14px;
box-sizing: border-box;
}
input[type="checkbox"] {
margin-right: 8px;
}
.checkbox-label {
display: flex;
align-items: center;
margin-bottom: 8px;
cursor: pointer;
}
button {
background: #0f172a;
color: white;
border: none;
padding: 10px 20px;
border-radius: 6px;
font-size: 14px;
font-weight: 500;
cursor: pointer;
margin-right: 8px;
}
button:hover {
background: #1e293b;
}
button.secondary {
background: #e2e8f0;
color: #0f172a;
}
button.secondary:hover {
background: #cbd5e1;
}
button:disabled {
background: #94a3b8;
cursor: not-allowed;
opacity: 0.7;
}
button:disabled:hover {
background: #94a3b8;
}
.status {
margin-top: 16px;
padding: 12px;
border-radius: 6px;
font-size: 14px;
display: none;
}
.status.success {
background: #dcfce7;
color: #166534;
display: block;
}
.status.error {
background: #fee2e2;
color: #991b1b;
display: block;
}
.platform-status {
display: inline-block;
padding: 4px 8px;
border-radius: 4px;
font-size: 12px;
font-weight: 500;
margin-left: 8px;
}
.connected {
background: #dcfce7;
color: #166534;
}
.disconnected {
background: #fee2e2;
color: #991b1b;
}
.help-text {
font-size: 12px;
color: #64748b;
margin-top: 4px;
}
.warning-box {
background: #fef3c7;
border: 1px solid #f59e0b;
border-radius: 6px;
padding: 16px;
margin-bottom: 24px;
}
.warning-box h3 {
margin: 0 0 8px 0;
font-size: 14px;
font-weight: 600;
color: #92400e;
}
.warning-box p {
margin: 0 0 8px 0;
font-size: 13px;
color: #78350f;
line-height: 1.5;
}
.warning-box code {
background: rgba(0,0,0,0.1);
padding: 2px 6px;
border-radius: 3px;
font-family: monospace;
font-size: 12px;
}
.warning-box ul {
margin: 8px 0;
padding-left: 20px;
}
.warning-box li {
margin: 4px 0;
font-size: 13px;
color: #78350f;
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>Live Chat Configuration</h1>
</header>
<div class="content">
<!-- Twitch Section -->
<div class="section">
<h2>
Twitch
<span id="twitch-status" class="platform-status disconnected">Not Connected</span>
</h2>
<div class="form-group">
<label for="twitch-channel">Channel Name</label>
<input type="text" id="twitch-channel" placeholder="your_channel_name">
<div class="help-text">Enter the Twitch channel you want to monitor. Chat works immediately - no login required!</div>
</div>
<button id="twitch-login-btn" onclick="loginTwitch()">Login with Twitch</button>
<span id="twitch-login-note" class="help-text" style="display: inline; margin-left: 8px;">(Optional - for extra features)</span>
</div>
<!-- YouTube Section -->
<div class="section">
<h2>
YouTube
<span id="youtube-status" class="platform-status disconnected">Not Logged In</span>
</h2>
<div id="youtube-not-configured" class="warning-box" style="display: none; margin-bottom: 16px;">
<h3>⚠️ YouTube Not Available</h3>
<p>YouTube requires OAuth credentials to be configured. See the Advanced Settings section below for setup instructions.</p>
</div>
<div class="form-group">
<label for="youtube-video-id">Video/Stream ID <span style="color: #64748b; font-weight: normal;">(Optional)</span></label>
<input type="text" id="youtube-video-id" placeholder="Leave blank to auto-detect your stream">
<div class="help-text">
<strong>Auto-detect:</strong> Leave blank to automatically find your active live stream<br>
<strong>Manual:</strong> Enter a video ID from the URL (e.g., youtube.com/watch?v=<strong>dQw4w9WgXcQ</strong>)
</div>
</div>
<button id="youtube-login-btn" onclick="loginYouTube()">Login with YouTube</button>
</div>
<!-- Emote Providers -->
<div class="section">
<h2>Emote Providers</h2>
<label class="checkbox-label">
<input type="checkbox" id="enable-ffz" checked>
Enable FrankerFaceZ emotes
</label>
<label class="checkbox-label">
<input type="checkbox" id="enable-bttv" checked>
Enable BetterTTV emotes
</label>
<label class="checkbox-label">
<input type="checkbox" id="enable-7tv" checked>
Enable 7TV emotes
</label>
</div>
<!-- Display Settings -->
<div class="section">
<h2>Display Settings</h2>
<div class="form-group">
<label for="max-messages">Maximum Messages</label>
<input type="number" id="max-messages" value="50" min="10" max="200">
<div class="help-text">Number of messages to keep in the widget</div>
</div>
<label class="checkbox-label">
<input type="checkbox" id="show-timestamps" checked>
Show timestamps
</label>
<label class="checkbox-label">
<input type="checkbox" id="show-badges" checked>
Show user badges
</label>
<label class="checkbox-label">
<input type="checkbox" id="unified-view" checked>
Unified view (mix both platforms)
</label>
</div>
<!-- Actions -->
<div class="section">
<button onclick="saveConfig()">Save Configuration</button>
<button class="secondary" onclick="window.location.href='/'">Back to Widgets</button>
<div id="status" class="status"></div>
</div>
<!-- Advanced Settings -->
<div class="section" style="background: #f1f5f9; margin: 0 -32px -32px; padding: 24px 32px; border-radius: 0 0 12px 12px;">
<details>
<summary style="cursor: pointer; font-size: 16px; font-weight: 600; color: #1e293b; margin-bottom: 12px;">
🔧 Advanced Settings
</summary>
<div style="margin-top: 16px;">
<p style="font-size: 14px; color: #64748b; margin: 0 0 12px;">
Configuration files are stored at:
</p>
<code id="config-path-display" style="display: block; background: #e2e8f0; padding: 10px 12px; border-radius: 6px; font-size: 13px; margin-bottom: 12px; word-break: break-all;"></code>
<button class="secondary" onclick="openConfigDirectory()">📁 Open Config Directory</button>
<div id="oauth-setup-instructions" style="margin-top: 20px; padding-top: 16px; border-top: 1px solid #cbd5e1; display: none;">
<h4 style="margin: 0 0 12px; font-size: 14px; color: #1e293b;">Setting Up OAuth Credentials</h4>
<p style="font-size: 13px; color: #64748b; line-height: 1.6;">
To enable login functionality, you need to create OAuth apps and add your credentials to <code>config.json</code>:
</p>
<ul style="font-size: 13px; color: #64748b; line-height: 1.8; padding-left: 20px;">
<li><strong>Twitch:</strong> Create an app at <a href="https://dev.twitch.tv/console/apps" target="_blank">dev.twitch.tv/console/apps</a></li>
<li><strong>YouTube:</strong> Create credentials in <a href="https://console.cloud.google.com/" target="_blank">Google Cloud Console</a> with YouTube Data API v3 enabled</li>
</ul>
<p style="font-size: 13px; color: #64748b;">
Set redirect URI to: <code>http://localhost:8765/auth/twitch/callback</code> (or youtube)
</p>
</div>
</div>
</details>
</div>
</div>
</div>
<script>
// Check OAuth configuration status
async function checkOAuthStatus() {
try {
const resp = await fetch('/api/oauth/status');
if (resp.ok) {
const data = await resp.json();
const configPathDisplay = document.getElementById('config-path-display');
const youtubeWarning = document.getElementById('youtube-not-configured');
const oauthSetupInstructions = document.getElementById('oauth-setup-instructions');
// Always show the config file path
if (configPathDisplay) {
configPathDisplay.textContent = data.config_file;
}
// Handle Twitch OAuth status
const twitchBtn = document.getElementById('twitch-login-btn');
const twitchNote = document.getElementById('twitch-login-note');
if (!data.twitch_configured) {
twitchBtn.disabled = true;
twitchBtn.title = 'OAuth not configured - chat still works anonymously!';
twitchBtn.textContent = 'Login Unavailable';
twitchNote.textContent = '(Chat works without login!)';
}
// Handle YouTube OAuth status
const youtubeBtn = document.getElementById('youtube-login-btn');
if (!data.youtube_configured) {
youtubeBtn.disabled = true;
youtubeBtn.title = 'OAuth credentials required for YouTube';
youtubeWarning.style.display = 'block';
}
// Show OAuth setup instructions if either platform is not configured
if (!data.twitch_configured || !data.youtube_configured) {
oauthSetupInstructions.style.display = 'block';
}
}
} catch (err) {
console.error('Failed to check OAuth status:', err);
}
}
// Load current configuration
async function loadConfig() {
try {
const resp = await fetch('/api/chat/config');
if (resp.ok) {
const config = await resp.json();
// Populate form
document.getElementById('twitch-channel').value = config.twitch_channel || '';
document.getElementById('youtube-video-id').value = config.youtube_video_id || '';
document.getElementById('enable-ffz').checked = config.enable_ffz !== false;
document.getElementById('enable-bttv').checked = config.enable_bttv !== false;
document.getElementById('enable-7tv').checked = config.enable_7tv !== false;
document.getElementById('max-messages').value = config.max_messages || 50;
document.getElementById('show-timestamps').checked = config.show_timestamps !== false;
document.getElementById('show-badges').checked = config.show_badges !== false;
document.getElementById('unified-view').checked = config.unified_view !== false;
}
} catch (err) {
console.error('Failed to load config:', err);
}
}
// Check authentication status (whether user has logged in)
async function checkAuthStatus() {
try {
const resp = await fetch('/api/auth/status');
if (resp.ok) {
const data = await resp.json();
// Update Twitch status
const twitchStatus = document.getElementById('twitch-status');
if (data.twitch_authenticated) {
twitchStatus.className = 'platform-status connected';
twitchStatus.textContent = 'Logged In';
} else {
twitchStatus.className = 'platform-status disconnected';
twitchStatus.textContent = 'Not Logged In';
}
// Update YouTube status
const youtubeStatus = document.getElementById('youtube-status');
if (data.youtube_authenticated) {
youtubeStatus.className = 'platform-status connected';
youtubeStatus.textContent = 'Logged In';
} else {
youtubeStatus.className = 'platform-status disconnected';
youtubeStatus.textContent = 'Not Logged In';
}
}
} catch (err) {
console.error('Failed to check auth status:', err);
}
}
async function saveConfig() {
const config = {
twitch_channel: document.getElementById('twitch-channel').value,
youtube_video_id: document.getElementById('youtube-video-id').value,
enable_ffz: document.getElementById('enable-ffz').checked,
enable_bttv: document.getElementById('enable-bttv').checked,
enable_7tv: document.getElementById('enable-7tv').checked,
max_messages: parseInt(document.getElementById('max-messages').value),
show_timestamps: document.getElementById('show-timestamps').checked,
show_badges: document.getElementById('show-badges').checked,
unified_view: document.getElementById('unified-view').checked,
};
try {
const resp = await fetch('/api/chat/config', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(config),
});
const status = document.getElementById('status');
if (resp.ok) {
status.className = 'status success';
status.textContent = 'Configuration saved successfully!';
} else {
status.className = 'status error';
status.textContent = 'Failed to save configuration';
}
} catch (err) {
const status = document.getElementById('status');
status.className = 'status error';
status.textContent = 'Error: ' + err.message;
}
}
function loginTwitch() {
window.open('/auth/twitch/login', 'TwitchAuth', 'width=600,height=700');
}
function loginYouTube() {
window.open('/auth/youtube/login', 'YouTubeAuth', 'width=600,height=700');
}
// Called by OAuth popup when authentication succeeds
window.onAuthComplete = function(platform) {
console.log(`${platform} authentication complete`);
checkAuthStatus();
};
async function openConfigDirectory() {
try {
const resp = await fetch('/api/config/open-directory', { method: 'POST' });
const data = await resp.json();
if (resp.ok) {
// Directory opened successfully
} else {
const path = document.getElementById('config-path-display').textContent;
alert('Failed to open directory. Path: ' + path);
}
} catch (err) {
alert('Error: ' + err.message);
}
}
// Load config and check status on page load
checkOAuthStatus();
loadConfig();
checkAuthStatus();
</script>
</body>
</html>

View file

@ -80,6 +80,7 @@
.widget-url-row {
display: flex;
align-items: center;
gap: 8px;
background: #ffffff;
border: 1px solid #cbd5e1;
border-radius: 6px;
@ -87,7 +88,56 @@
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
font-size: 13px;
color: #475569;
overflow-x: auto;
}
.widget-url-row input {
flex: 1;
border: none;
background: transparent;
font-family: inherit;
font-size: inherit;
color: inherit;
outline: none;
min-width: 0;
}
.copy-btn {
background: #0f172a;
color: white;
border: none;
padding: 4px 10px;
border-radius: 4px;
font-size: 12px;
cursor: pointer;
flex-shrink: 0;
}
.copy-btn:hover {
background: #1e293b;
}
.widget-options {
display: flex;
flex-wrap: wrap;
gap: 16px;
margin-top: 8px;
padding-top: 12px;
border-top: 1px solid #e2e8f0;
}
.option-group {
display: flex;
flex-direction: column;
gap: 6px;
}
.option-group label {
font-size: 12px;
font-weight: 600;
color: #64748b;
text-transform: uppercase;
}
.option-group select {
padding: 6px 10px;
border: 1px solid #cbd5e1;
border-radius: 4px;
font-size: 13px;
background: white;
cursor: pointer;
}
.footer {
border-top: 1px solid #e2e8f0;
@ -97,6 +147,13 @@
color: #64748b;
text-align: center;
}
.footer a {
color: #0f172a;
text-decoration: none;
}
.footer a:hover {
text-decoration: underline;
}
code {
font-family: inherit;
background: rgba(0,0,0,0.05);
@ -118,8 +175,50 @@
</ul>
</div>
<div class="footer">
Server running at <code>{{HOSTPORT}}</code>
Server running at <code>{{HOSTPORT}}</code> · <a href="/config">Configure Chat</a>
</div>
</div>
<script>
// Live Chat URL generator
function updateLiveChatUrl() {
const baseUrl = document.getElementById('livechat-base-url');
if (!baseUrl) return;
const theme = document.getElementById('livechat-theme').value;
const direction = document.getElementById('livechat-direction').value;
const urlInput = document.getElementById('livechat-url');
const openLink = document.getElementById('livechat-open');
let url = baseUrl.value;
const params = [];
if (theme !== 'dark') params.push(`theme=${theme}`);
if (direction !== 'down') params.push(`direction=${direction}`);
if (params.length > 0) {
url += '?' + params.join('&');
}
urlInput.value = url;
openLink.href = url;
}
function copyUrl(inputId) {
const input = document.getElementById(inputId);
input.select();
navigator.clipboard.writeText(input.value).then(() => {
const btn = input.parentElement.querySelector('.copy-btn');
const originalText = btn.textContent;
btn.textContent = 'Copied!';
setTimeout(() => btn.textContent = originalText, 1500);
});
}
// Initialize on page load
document.addEventListener('DOMContentLoaded', () => {
updateLiveChatUrl();
});
</script>
</body>
</html>

View file

@ -0,0 +1,277 @@
class LiveChatWidget {
constructor() {
this.ws = null;
this.messagesContainer = document.getElementById('chat-messages');
this.maxMessages = 50;
this.autoScroll = true;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.platformIcons = {
twitch: `<svg width="20" height="20" viewBox="0 0 24 24" fill="#9146FF"><path d="M11.571 4.714h1.715v5.143H11.57zm4.715 0H18v5.143h-1.714zM6 0L1.714 4.286v15.428h5.143V24l4.286-4.286h3.428L22.286 12V0zm14.571 11.143l-3.428 3.428h-3.429l-3 3v-3H6.857V1.714h13.714Z"/></svg>`,
youtube: `<svg width="20" height="20" viewBox="0 0 24 24" fill="#FF0000"><path d="M23.498 6.186a3.016 3.016 0 0 0-2.122-2.136C19.505 3.545 12 3.545 12 3.545s-7.505 0-9.377.505A3.017 3.017 0 0 0 .502 6.186C0 8.07 0 12 0 12s0 3.93.502 5.814a3.016 3.016 0 0 0 2.122 2.136c1.871.505 9.376.505 9.376.505s7.505 0 9.377-.505a3.015 3.015 0 0 0 2.122-2.136C24 15.93 24 12 24 12s0-3.93-.502-5.814zM9.545 15.568V8.432L15.818 12l-6.273 3.568z"/></svg>`,
};
// Apply theme from URL query param
const urlParams = new URLSearchParams(window.location.search);
const theme = urlParams.get('theme');
if (theme === 'light') {
document.body.classList.add('theme-light');
} else {
document.body.classList.add('theme-dark');
}
// Direction: 'down' = newest at bottom (default), 'up' = newest at top
this.direction = urlParams.get('direction') || 'down';
if (this.direction === 'up') {
document.body.classList.add('direction-up');
}
this.init();
}
init() {
this.showStatus('Connecting to chat...', 'connecting');
this.connect();
// Handle scroll to detect manual scrolling
this.messagesContainer.addEventListener('scroll', () => {
const container = this.messagesContainer;
const isAtBottom = container.scrollHeight - container.scrollTop <= container.clientHeight + 50;
this.autoScroll = isAtBottom;
});
}
connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws`;
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('Chat WebSocket connected');
this.reconnectAttempts = 0;
this.clearStatus();
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
this.handleMessage(data);
} catch (err) {
console.error('Failed to parse message:', err);
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.ws.onclose = () => {
console.log('Chat WebSocket disconnected');
this.showStatus('Disconnected. Reconnecting...', 'error');
this.reconnect();
};
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.showStatus('Failed to connect. Please refresh.', 'error');
return;
}
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
setTimeout(() => {
console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
this.connect();
}, delay);
}
handleMessage(data) {
switch (data.type) {
case 'chat_message':
this.addChatMessage(data.data);
break;
case 'chat_history':
// Initial history load (comes in oldest-to-newest order)
if (data.data && Array.isArray(data.data)) {
// For both directions, we add messages in order
// The addChatMessage handles placement based on direction
data.data.forEach(msg => this.addChatMessage(msg, false));
if (this.direction === 'down') {
this.scrollToBottom();
}
}
break;
default:
// Ignore other message types
break;
}
}
addChatMessage(messageData, shouldAnimate = true) {
const msgElement = this.createMessageElement(messageData);
if (!shouldAnimate) {
msgElement.style.animation = 'none';
}
if (this.direction === 'up') {
// Direction UP: newest at bottom (anchored), older messages bubble upward
// With flex-direction: column-reverse, prepending puts new message at visual bottom
this.messagesContainer.insertBefore(msgElement, this.messagesContainer.firstChild);
// Limit total messages (remove oldest = last child = visually at top)
while (this.messagesContainer.children.length > this.maxMessages) {
this.messagesContainer.removeChild(this.messagesContainer.lastChild);
}
} else {
// Direction DOWN (default): newest at bottom, scroll down
this.messagesContainer.appendChild(msgElement);
// Limit total messages (remove oldest = first child = visually at top)
while (this.messagesContainer.children.length > this.maxMessages) {
this.messagesContainer.removeChild(this.messagesContainer.firstChild);
}
if (this.autoScroll) {
this.scrollToBottom();
}
}
}
createMessageElement(data) {
const msg = document.createElement('div');
msg.className = `chat-message ${data.platform}`;
msg.dataset.messageId = data.id;
if (data.is_action) {
msg.classList.add('action');
}
// Platform icon (optional)
if (data.platform) {
const iconDiv = document.createElement('div');
iconDiv.className = 'platform-icon';
iconDiv.innerHTML = this.platformIcons[data.platform] || '';
msg.appendChild(iconDiv);
}
// Message content
const content = document.createElement('div');
content.className = 'message-content';
// User info line
const userInfo = document.createElement('div');
userInfo.className = 'user-info';
// Badges
if (data.user.badges && data.user.badges.length > 0) {
const badgesContainer = document.createElement('div');
badgesContainer.className = 'user-badges';
data.user.badges.forEach(badge => {
const badgeEl = document.createElement('span');
badgeEl.className = 'badge';
badgeEl.title = badge.name;
if (badge.icon_url) {
badgeEl.innerHTML = `<img src="${badge.icon_url}" alt="${badge.name}">`;
} else {
// Simple text badge fallback
badgeEl.textContent = badge.name.charAt(0).toUpperCase();
}
badgesContainer.appendChild(badgeEl);
});
userInfo.appendChild(badgesContainer);
}
// Username
const username = document.createElement('span');
username.className = 'username';
username.textContent = data.user.display_name;
if (data.user.color) {
username.style.color = data.user.color;
}
userInfo.appendChild(username);
// Timestamp (optional)
const timestamp = document.createElement('span');
timestamp.className = 'timestamp';
timestamp.textContent = this.formatTime(data.timestamp);
userInfo.appendChild(timestamp);
content.appendChild(userInfo);
// Message text with emotes
const messageText = document.createElement('div');
messageText.className = 'message-text';
messageText.innerHTML = this.parseMessageWithEmotes(data.message, data.emotes);
content.appendChild(messageText);
msg.appendChild(content);
return msg;
}
parseMessageWithEmotes(message, emotes) {
if (!emotes || emotes.length === 0) {
return this.escapeHtml(message);
}
// Build a map of emote codes to emote data
const emoteMap = {};
emotes.forEach(emote => {
emoteMap[emote.code] = emote;
});
// Split message into words and replace emotes
const words = message.split(' ');
const result = words.map(word => {
if (emoteMap[word]) {
const emote = emoteMap[word];
const animatedClass = emote.is_animated ? 'animated' : '';
return `<img class="emote ${animatedClass}" src="${emote.url}" alt="${emote.code}" title="${emote.code} (${emote.provider})">`;
}
return this.escapeHtml(word);
});
return result.join(' ');
}
escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
formatTime(timestamp) {
const date = new Date(timestamp);
const hours = date.getHours().toString().padStart(2, '0');
const minutes = date.getMinutes().toString().padStart(2, '0');
return `${hours}:${minutes}`;
}
scrollToBottom() {
this.messagesContainer.scrollTop = this.messagesContainer.scrollHeight;
}
showStatus(message, className = '') {
this.messagesContainer.innerHTML = `<div class="status-message ${className}">${message}</div>`;
}
clearStatus() {
const statusMsg = this.messagesContainer.querySelector('.status-message');
if (statusMsg) {
statusMsg.remove();
}
}
}
// Initialize when DOM is ready
document.addEventListener('DOMContentLoaded', () => {
new LiveChatWidget();
});

View file

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Live Chat</title>
<link rel="stylesheet" href="style.css">
</head>
<body>
<div id="chat-container">
<div id="chat-messages"></div>
</div>
<script src="app.js"></script>
</body>
</html>

View file

@ -0,0 +1,266 @@
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: transparent;
overflow: hidden;
color: #fff; /* Default dark mode text */
}
/* Light Theme overrides */
body.theme-light {
color: #1a1a1a;
}
body.theme-light .chat-message {
background: rgba(255, 255, 255, 0.85);
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
body.theme-light .username {
text-shadow: none;
}
body.theme-light .timestamp {
color: #666;
}
body.theme-light .badge {
background: rgba(0, 0, 0, 0.1);
color: #333;
}
body.theme-light #chat-messages::-webkit-scrollbar-thumb {
background: rgba(0, 0, 0, 0.2);
}
/* Dark Theme (Default) overrides for clarity */
body.theme-dark .chat-message {
background: rgba(0, 0, 0, 0.6);
}
#chat-container {
width: 100vw;
height: 100vh;
display: flex;
flex-direction: column;
padding: 10px;
}
#chat-messages {
flex: 1;
overflow-y: auto;
overflow-x: hidden;
display: flex;
flex-direction: column;
gap: 4px;
}
/* Hide scrollbar but keep functionality */
#chat-messages::-webkit-scrollbar {
width: 4px;
}
#chat-messages::-webkit-scrollbar-track {
background: transparent;
}
#chat-messages::-webkit-scrollbar-thumb {
background: rgba(255, 255, 255, 0.2);
border-radius: 2px;
}
/* Chat message */
.chat-message {
display: flex;
align-items: flex-start;
gap: 8px;
padding: 6px 10px;
background: rgba(0, 0, 0, 0.4);
border-radius: 4px;
animation: slideIn 0.2s ease-out;
word-wrap: break-word;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateX(-20px);
}
to {
opacity: 1;
transform: translateX(0);
}
}
/* Direction UP: Messages anchor to bottom, bubble upward */
body.direction-up #chat-messages {
flex-direction: column-reverse;
justify-content: flex-start;
}
body.direction-up .chat-message {
animation: slideUp 0.2s ease-out;
}
@keyframes slideUp {
from {
opacity: 0;
transform: translateY(20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
.chat-message.twitch {
border-left: 3px solid #9146FF;
}
.chat-message.youtube {
border-left: 3px solid #FF0000;
}
.chat-message.action {
font-style: italic;
background: rgba(100, 100, 100, 0.3);
}
/* Platform icon */
.platform-icon {
flex-shrink: 0;
width: 20px;
height: 20px;
margin-top: 2px;
}
.platform-icon img {
width: 100%;
height: 100%;
}
/* Message content area */
.message-content {
flex: 1;
display: flex;
flex-direction: column;
gap: 2px;
min-width: 0;
}
/* User info line */
.user-info {
display: flex;
align-items: center;
gap: 6px;
flex-wrap: wrap;
}
.user-badges {
display: flex;
gap: 4px;
align-items: center;
}
.badge {
width: 18px;
height: 18px;
display: inline-flex;
align-items: center;
justify-content: center;
vertical-align: middle;
font-size: 10px;
font-weight: bold;
background: rgba(255, 255, 255, 0.15);
border-radius: 3px;
color: #fff;
text-transform: uppercase;
}
.badge img {
width: 100%;
height: 100%;
border-radius: 3px;
}
.username {
font-weight: bold;
font-size: 14px;
text-shadow: 1px 1px 2px rgba(0, 0, 0, 0.8);
}
.timestamp {
font-size: 11px;
color: #aaa;
opacity: 0.8;
}
/* Message text */
.message-text {
font-size: 14px;
line-height: 1.4;
word-wrap: break-word;
overflow-wrap: break-word;
}
/* Emotes */
.emote {
display: inline-block;
vertical-align: middle;
margin: 0 2px;
max-height: 28px;
max-width: 28px;
image-rendering: pixelated;
}
.emote.animated {
image-rendering: auto;
}
/* Roles */
.role-badge {
display: inline-block;
padding: 2px 6px;
border-radius: 3px;
font-size: 10px;
font-weight: bold;
text-transform: uppercase;
background: rgba(255, 255, 255, 0.1);
}
.role-badge.broadcaster {
background: #e91916;
}
.role-badge.moderator {
background: #00ad03;
}
.role-badge.vip {
background: #e005b9;
}
.role-badge.subscriber {
background: #6441a5;
}
/* Loading/Error states */
.status-message {
text-align: center;
padding: 20px;
color: #aaa;
font-size: 14px;
}
.error {
color: #ff6b6b;
}
.connecting {
color: #4ecdc4;
}

299
app/auth.py Normal file
View file

@ -0,0 +1,299 @@
from __future__ import annotations
import json
import secrets
import webbrowser
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.parse import urlencode
from aiohttp import web
from app.chat_models import AuthTokens, Platform
from app.config import load_config
from app.paths import get_data_dir
from app.state import AppState
# In-memory state storage for OAuth flow
oauth_states: dict[str, dict] = {}
# Global config - loaded at module level
_app_config = load_config()
def get_tokens_file() -> Path:
"""Get path to tokens storage file."""
data_dir = get_data_dir()
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir / "tokens.json"
async def load_tokens(state: AppState) -> None:
"""Load saved tokens from disk."""
tokens_file = get_tokens_file()
if not tokens_file.exists():
return
try:
with open(tokens_file, "r") as f:
data = json.load(f)
if "twitch" in data:
twitch_data = data["twitch"]
tokens = AuthTokens(
access_token=twitch_data["access_token"],
refresh_token=twitch_data.get("refresh_token"),
expires_at=(
datetime.fromisoformat(twitch_data["expires_at"])
if twitch_data.get("expires_at")
else None
),
scope=twitch_data.get("scope", []),
)
await state.set_auth_tokens(Platform.TWITCH, tokens)
if "youtube" in data:
youtube_data = data["youtube"]
tokens = AuthTokens(
access_token=youtube_data["access_token"],
refresh_token=youtube_data.get("refresh_token"),
expires_at=(
datetime.fromisoformat(youtube_data["expires_at"])
if youtube_data.get("expires_at")
else None
),
scope=youtube_data.get("scope", []),
)
await state.set_auth_tokens(Platform.YOUTUBE, tokens)
except Exception as e:
print(f"Error loading tokens: {e}")
async def save_tokens(state: AppState) -> None:
"""Save tokens to disk."""
tokens_file = get_tokens_file()
data = {}
twitch_tokens = await state.get_auth_tokens(Platform.TWITCH)
if twitch_tokens:
data["twitch"] = twitch_tokens.to_dict()
youtube_tokens = await state.get_auth_tokens(Platform.YOUTUBE)
if youtube_tokens:
data["youtube"] = youtube_tokens.to_dict()
try:
with open(tokens_file, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error saving tokens: {e}")
async def handle_twitch_login(request: web.Request) -> web.Response:
"""Initiate Twitch OAuth flow."""
if not _app_config.twitch_oauth.is_configured():
return web.json_response(
{
"error": "Twitch OAuth not configured. Please edit config.json with your OAuth credentials.",
"config_path": str(load_config().twitch_oauth),
},
status=400,
)
state_token = secrets.token_urlsafe(32)
oauth_states[state_token] = {"platform": "twitch", "timestamp": datetime.now()}
params = {
"client_id": _app_config.twitch_oauth.client_id,
"redirect_uri": _app_config.twitch_oauth.redirect_uri,
"response_type": "code",
"scope": "chat:read",
"state": state_token,
}
auth_url = f"https://id.twitch.tv/oauth2/authorize?{urlencode(params)}"
# Open browser
webbrowser.open(auth_url)
return web.json_response({"message": "Opening browser for Twitch login..."})
async def handle_twitch_callback(request: web.Request) -> web.Response:
"""Handle Twitch OAuth callback."""
code = request.query.get("code")
state_token = request.query.get("state")
if not code or not state_token or state_token not in oauth_states:
return web.Response(text="Invalid OAuth state", status=400)
del oauth_states[state_token]
# Exchange code for token
import aiohttp
async with aiohttp.ClientSession() as session:
token_url = "https://id.twitch.tv/oauth2/token"
data = {
"client_id": _app_config.twitch_oauth.client_id,
"client_secret": _app_config.twitch_oauth.client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": _app_config.twitch_oauth.redirect_uri,
}
async with session.post(token_url, data=data) as resp:
if resp.status != 200:
return web.Response(text="Failed to get access token", status=400)
token_data = await resp.json()
# Store tokens
state: AppState = request.app["state"]
expires_in = token_data.get("expires_in", 3600)
tokens = AuthTokens(
access_token=token_data["access_token"],
refresh_token=token_data.get("refresh_token"),
expires_at=datetime.now() + timedelta(seconds=expires_in),
scope=token_data.get("scope", []),
)
await state.set_auth_tokens(Platform.TWITCH, tokens)
await save_tokens(state)
html = """<!DOCTYPE html>
<html>
<head>
<title>Twitch Login Successful</title>
<style>
body { font-family: -apple-system, system-ui, sans-serif; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #0f172a; color: white; }
.card { text-align: center; padding: 40px; background: #1e293b; border-radius: 12px; }
h1 { color: #a78bfa; margin-bottom: 16px; }
p { color: #94a3b8; }
</style>
</head>
<body>
<div class="card">
<h1> Twitch Login Successful!</h1>
<p>This window will close automatically...</p>
<script>
if (window.opener && !window.opener.closed) {
window.opener.onAuthComplete && window.opener.onAuthComplete('twitch');
}
setTimeout(() => window.close(), 1500);
</script>
</div>
</body>
</html>"""
return web.Response(text=html, content_type="text/html")
async def handle_youtube_login(request: web.Request) -> web.Response:
"""Initiate YouTube OAuth flow."""
if not _app_config.youtube_oauth.is_configured():
return web.json_response(
{
"error": "YouTube OAuth not configured. Please edit config.json with your OAuth credentials.",
"config_path": str(load_config().youtube_oauth),
},
status=400,
)
state_token = secrets.token_urlsafe(32)
oauth_states[state_token] = {"platform": "youtube", "timestamp": datetime.now()}
params = {
"client_id": _app_config.youtube_oauth.client_id,
"redirect_uri": _app_config.youtube_oauth.redirect_uri,
"response_type": "code",
"scope": "https://www.googleapis.com/auth/youtube.readonly",
"state": state_token,
"access_type": "offline",
"prompt": "consent",
}
auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
webbrowser.open(auth_url)
return web.json_response({"message": "Opening browser for YouTube login..."})
async def handle_youtube_callback(request: web.Request) -> web.Response:
"""Handle YouTube OAuth callback."""
code = request.query.get("code")
state_token = request.query.get("state")
if not code or not state_token or state_token not in oauth_states:
return web.Response(text="Invalid OAuth state", status=400)
del oauth_states[state_token]
# Exchange code for token
import aiohttp
async with aiohttp.ClientSession() as session:
token_url = "https://oauth2.googleapis.com/token"
data = {
"client_id": _app_config.youtube_oauth.client_id,
"client_secret": _app_config.youtube_oauth.client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": _app_config.youtube_oauth.redirect_uri,
}
async with session.post(token_url, data=data) as resp:
if resp.status != 200:
return web.Response(text="Failed to get access token", status=400)
token_data = await resp.json()
# Store tokens
state: AppState = request.app["state"]
expires_in = token_data.get("expires_in", 3600)
tokens = AuthTokens(
access_token=token_data["access_token"],
refresh_token=token_data.get("refresh_token"),
expires_at=datetime.now() + timedelta(seconds=expires_in),
scope=token_data.get("scope", "").split(),
)
await state.set_auth_tokens(Platform.YOUTUBE, tokens)
await save_tokens(state)
html = """<!DOCTYPE html>
<html>
<head>
<title>YouTube Login Successful</title>
<style>
body { font-family: -apple-system, system-ui, sans-serif; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #0f172a; color: white; }
.card { text-align: center; padding: 40px; background: #1e293b; border-radius: 12px; }
h1 { color: #f87171; margin-bottom: 16px; }
p { color: #94a3b8; }
</style>
</head>
<body>
<div class="card">
<h1> YouTube Login Successful!</h1>
<p>This window will close automatically...</p>
<script>
if (window.opener && !window.opener.closed) {
window.opener.onAuthComplete && window.opener.onAuthComplete('youtube');
}
setTimeout(() => window.close(), 1500);
</script>
</div>
</body>
</html>"""
return web.Response(text=html, content_type="text/html")
def register_auth_routes(app: web.Application) -> None:
"""Register OAuth routes to the application."""
app.router.add_get("/auth/twitch/login", handle_twitch_login)
app.router.add_get("/auth/twitch/callback", handle_twitch_callback)
app.router.add_get("/auth/youtube/login", handle_youtube_login)
app.router.add_get("/auth/youtube/callback", handle_youtube_callback)

96
app/chat_manager.py Normal file
View file

@ -0,0 +1,96 @@
from __future__ import annotations
import asyncio
from typing import Optional
from app.chat_models import Platform
from app.providers.twitch_chat import TwitchChatClient
from app.providers.youtube_chat import YouTubeChatClient
from app.state import AppState
class ChatManager:
"""
Manages chat connections to Twitch and YouTube.
Starts/stops clients based on configuration.
"""
def __init__(self, state: AppState):
self.state = state
self.twitch_client: Optional[TwitchChatClient] = None
self.youtube_client: Optional[YouTubeChatClient] = None
self.twitch_task: Optional[asyncio.Task] = None
self.youtube_task: Optional[asyncio.Task] = None
async def start(self) -> None:
"""Start chat clients based on current configuration."""
config = self.state.chat_config
# Start Twitch if configured
if config.twitch_channel:
twitch_tokens = await self.state.get_auth_tokens(Platform.TWITCH)
if twitch_tokens or True: # Allow anonymous connection
await self.start_twitch(config.twitch_channel)
# Start YouTube if authenticated (video_id is optional - can auto-detect)
youtube_tokens = await self.state.get_auth_tokens(Platform.YOUTUBE)
if youtube_tokens:
# Pass video_id if provided, otherwise YouTubeChatClient will auto-detect
await self.start_youtube(config.youtube_video_id or None)
async def stop(self) -> None:
"""Stop all chat clients."""
await self.stop_twitch()
await self.stop_youtube()
async def start_twitch(self, channel: str) -> None:
"""Start Twitch chat client."""
await self.stop_twitch()
self.twitch_client = TwitchChatClient(self.state, channel)
self.twitch_task = asyncio.create_task(self.twitch_client.start())
print(f"Started Twitch chat for channel: {channel}")
async def stop_twitch(self) -> None:
"""Stop Twitch chat client."""
if self.twitch_client:
await self.twitch_client.stop()
self.twitch_client = None
if self.twitch_task and not self.twitch_task.done():
self.twitch_task.cancel()
try:
await self.twitch_task
except asyncio.CancelledError:
pass
self.twitch_task = None
async def start_youtube(self, video_id: Optional[str] = None) -> None:
"""Start YouTube chat client."""
await self.stop_youtube()
self.youtube_client = YouTubeChatClient(self.state, video_id)
self.youtube_task = asyncio.create_task(self.youtube_client.start())
if video_id:
print(f"Started YouTube chat for video: {video_id}")
else:
print("Started YouTube chat (auto-detecting active broadcast)")
async def stop_youtube(self) -> None:
"""Stop YouTube chat client."""
if self.youtube_client:
await self.youtube_client.stop()
self.youtube_client = None
if self.youtube_task and not self.youtube_task.done():
self.youtube_task.cancel()
try:
await self.youtube_task
except asyncio.CancelledError:
pass
self.youtube_task = None
async def restart(self) -> None:
"""Restart all chat clients with current configuration."""
await self.stop()
await self.start()

149
app/chat_models.py Normal file
View file

@ -0,0 +1,149 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
class Platform(str, Enum):
TWITCH = "twitch"
YOUTUBE = "youtube"
class UserRole(str, Enum):
BROADCASTER = "broadcaster"
MODERATOR = "moderator"
VIP = "vip"
SUBSCRIBER = "subscriber"
VIEWER = "viewer"
@dataclass
class Emote:
"""Represents an emote that can be rendered in chat."""
code: str
url: str
provider: str # "twitch", "ffz", "bttv", "7tv", "youtube"
is_animated: bool = False
scale: int = 1
@dataclass
class ChatBadge:
"""User badge (mod, subscriber, etc.)."""
name: str
icon_url: Optional[str] = None
@dataclass
class ChatUser:
"""Represents a chat user."""
id: str
username: str
display_name: str
platform: Platform
color: Optional[str] = None
roles: List[UserRole] = field(default_factory=list)
badges: List[ChatBadge] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"username": self.username,
"display_name": self.display_name,
"platform": self.platform.value,
"color": self.color,
"roles": [r.value for r in self.roles],
"badges": [{"name": b.name, "icon_url": b.icon_url} for b in self.badges],
}
@dataclass
class ChatMessage:
"""Represents a single chat message from either platform."""
id: str
platform: Platform
user: ChatUser
message: str
timestamp: datetime
emotes: List[Emote] = field(default_factory=list)
is_deleted: bool = False
is_action: bool = False # /me messages
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"platform": self.platform.value,
"user": self.user.to_dict(),
"message": self.message,
"timestamp": self.timestamp.isoformat(),
"emotes": [
{
"code": e.code,
"url": e.url,
"provider": e.provider,
"is_animated": e.is_animated,
"scale": e.scale,
}
for e in self.emotes
],
"is_deleted": self.is_deleted,
"is_action": self.is_action,
}
@dataclass
class AuthTokens:
"""OAuth tokens for a platform."""
access_token: str
refresh_token: Optional[str] = None
expires_at: Optional[datetime] = None
scope: List[str] = field(default_factory=list)
def is_expired(self) -> bool:
if not self.expires_at:
return False
return datetime.now() >= self.expires_at
def to_dict(self) -> Dict[str, Any]:
return {
"access_token": self.access_token,
"refresh_token": self.refresh_token,
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"scope": self.scope,
}
@dataclass
class ChatConfig:
"""Configuration for the chat widget."""
# Authentication
twitch_enabled: bool = False
youtube_enabled: bool = False
# Display settings
max_messages: int = 50
show_timestamps: bool = True
show_badges: bool = True
show_platform_icons: bool = True
unified_view: bool = True # True = mixed, False = separate columns
# Emote providers
enable_ffz: bool = True
enable_bttv: bool = True
enable_7tv: bool = True
# Filtering
filter_by_roles: List[UserRole] = field(default_factory=list)
blocked_keywords: List[str] = field(default_factory=list)
min_message_length: int = 0
# Twitch specific
twitch_channel: str = ""
# YouTube specific
youtube_video_id: str = ""
def to_dict(self) -> Dict[str, Any]:
return asdict(self)

239
app/config.py Normal file
View file

@ -0,0 +1,239 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional
from app.paths import get_data_dir
# =============================================================================
# BUNDLED OAUTH CREDENTIALS
# =============================================================================
# These are the default OAuth credentials bundled with the app.
# Users can click "Login with Twitch/YouTube" without any setup.
#
# To configure: Replace these with your own OAuth app credentials before
# building the executable. Leave empty to require users to provide their own.
# =============================================================================
BUNDLED_TWITCH_CLIENT_ID = ""
BUNDLED_TWITCH_CLIENT_SECRET = ""
BUNDLED_YOUTUBE_CLIENT_ID = ""
BUNDLED_YOUTUBE_CLIENT_SECRET = ""
# =============================================================================
@dataclass
class OAuthConfig:
"""OAuth configuration for a platform."""
client_id: str = ""
client_secret: str = ""
redirect_uri: str = ""
# Placeholder values that indicate unconfigured credentials
_PLACEHOLDER_VALUES = frozenset({
"",
"YOUR_TWITCH_CLIENT_ID",
"YOUR_TWITCH_CLIENT_SECRET",
"YOUR_YOUTUBE_CLIENT_ID",
"YOUR_YOUTUBE_CLIENT_SECRET",
})
def is_configured(self) -> bool:
"""Check if OAuth is properly configured (not placeholder values)."""
return (
bool(self.client_id and self.client_secret and self.redirect_uri)
and self.client_id not in self._PLACEHOLDER_VALUES
and self.client_secret not in self._PLACEHOLDER_VALUES
)
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class AppConfig:
"""Application configuration including OAuth credentials."""
twitch_oauth: OAuthConfig
youtube_oauth: OAuthConfig
server_host: str = "127.0.0.1"
server_port: int = 8765
def to_dict(self) -> dict:
return {
"twitch_oauth": self.twitch_oauth.to_dict(),
"youtube_oauth": self.youtube_oauth.to_dict(),
"server_host": self.server_host,
"server_port": self.server_port,
}
def get_config_file() -> Path:
"""Get path to configuration file."""
data_dir = get_data_dir()
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir / "config.json"
def _get_effective_credential(user_value: str, bundled_value: str) -> str:
"""Return user value if set, otherwise fall back to bundled value."""
if user_value and user_value not in OAuthConfig._PLACEHOLDER_VALUES:
return user_value
return bundled_value
def load_config() -> AppConfig:
"""Load configuration from file, with bundled credentials as fallback.
Priority: User config file > Bundled credentials > Empty
"""
config_file = get_config_file()
# Start with bundled defaults
twitch_client_id = BUNDLED_TWITCH_CLIENT_ID
twitch_client_secret = BUNDLED_TWITCH_CLIENT_SECRET
youtube_client_id = BUNDLED_YOUTUBE_CLIENT_ID
youtube_client_secret = BUNDLED_YOUTUBE_CLIENT_SECRET
server_host = "127.0.0.1"
server_port = 8765
# Override with user config if it exists
if config_file.exists():
try:
with open(config_file, "r") as f:
data = json.load(f)
twitch_data = data.get("twitch_oauth", {})
youtube_data = data.get("youtube_oauth", {})
# User values override bundled values (if user has set them)
twitch_client_id = _get_effective_credential(
twitch_data.get("client_id", ""), BUNDLED_TWITCH_CLIENT_ID
)
twitch_client_secret = _get_effective_credential(
twitch_data.get("client_secret", ""), BUNDLED_TWITCH_CLIENT_SECRET
)
youtube_client_id = _get_effective_credential(
youtube_data.get("client_id", ""), BUNDLED_YOUTUBE_CLIENT_ID
)
youtube_client_secret = _get_effective_credential(
youtube_data.get("client_secret", ""), BUNDLED_YOUTUBE_CLIENT_SECRET
)
server_host = data.get("server_host", "127.0.0.1")
server_port = data.get("server_port", 8765)
except Exception as e:
print(f"Error loading config: {e}")
return AppConfig(
twitch_oauth=OAuthConfig(
client_id=twitch_client_id,
client_secret=twitch_client_secret,
redirect_uri="http://localhost:8765/auth/twitch/callback",
),
youtube_oauth=OAuthConfig(
client_id=youtube_client_id,
client_secret=youtube_client_secret,
redirect_uri="http://localhost:8765/auth/youtube/callback",
),
server_host=server_host,
server_port=server_port,
)
def save_config(config: AppConfig) -> None:
"""Save configuration to file."""
config_file = get_config_file()
try:
with open(config_file, "w") as f:
json.dump(config.to_dict(), f, indent=2)
except Exception as e:
print(f"Error saving config: {e}")
def create_example_config() -> None:
"""Create an example configuration file if none exists."""
config_file = get_config_file()
if config_file.exists():
return
example_config = AppConfig(
twitch_oauth=OAuthConfig(
client_id="YOUR_TWITCH_CLIENT_ID",
client_secret="YOUR_TWITCH_CLIENT_SECRET",
redirect_uri="http://localhost:8765/auth/twitch/callback",
),
youtube_oauth=OAuthConfig(
client_id="YOUR_YOUTUBE_CLIENT_ID",
client_secret="YOUR_YOUTUBE_CLIENT_SECRET",
redirect_uri="http://localhost:8765/auth/youtube/callback",
),
)
save_config(example_config)
print(f"Created example config at: {config_file}")
print("Please edit this file with your OAuth credentials.")
def open_config_directory() -> bool:
"""Open the config directory in the system file explorer."""
import platform
import subprocess
config_dir = get_data_dir()
config_dir.mkdir(parents=True, exist_ok=True)
try:
if platform.system() == "Windows":
subprocess.run(["explorer", str(config_dir)], check=False)
elif platform.system() == "Darwin": # macOS
subprocess.run(["open", str(config_dir)], check=False)
else: # Linux
subprocess.run(["xdg-open", str(config_dir)], check=False)
return True
except Exception as e:
print(f"Error opening config directory: {e}")
return False
# =============================================================================
# CHAT SETTINGS PERSISTENCE
# =============================================================================
def get_chat_settings_file() -> Path:
"""Get path to chat settings file."""
data_dir = get_data_dir()
data_dir.mkdir(parents=True, exist_ok=True)
return data_dir / "chat_settings.json"
def load_chat_settings() -> dict:
"""Load chat settings from file."""
settings_file = get_chat_settings_file()
if not settings_file.exists():
return {}
try:
with open(settings_file, "r") as f:
return json.load(f)
except Exception as e:
print(f"Error loading chat settings: {e}")
return {}
def save_chat_settings(settings: dict) -> None:
"""Save chat settings to file."""
settings_file = get_chat_settings_file()
try:
with open(settings_file, "w") as f:
json.dump(settings, f, indent=2)
except Exception as e:
print(f"Error saving chat settings: {e}")

View file

@ -8,6 +8,10 @@ from typing import Optional
from aiohttp import web
from app.auth import load_tokens
from app.chat_manager import ChatManager
from app.chat_models import ChatConfig
from app.config import create_example_config, get_config_file, load_chat_settings
from app.providers.gsmtc import run_gsmtc_provider
from app.state import AppState
from app.webserver import make_app
@ -36,7 +40,35 @@ def _install_loop_exception_handler(loop: asyncio.AbstractEventLoop) -> None:
loop.set_exception_handler(handler)
def _load_chat_config_from_settings(state: AppState) -> None:
"""Load saved chat settings into state."""
settings = load_chat_settings()
if settings:
state.chat_config = ChatConfig(
twitch_channel=settings.get("twitch_channel", ""),
youtube_video_id=settings.get("youtube_video_id", ""),
max_messages=settings.get("max_messages", 50),
show_timestamps=settings.get("show_timestamps", True),
show_badges=settings.get("show_badges", True),
show_platform_icons=settings.get("show_platform_icons", True),
unified_view=settings.get("unified_view", True),
enable_ffz=settings.get("enable_ffz", True),
enable_bttv=settings.get("enable_bttv", True),
enable_7tv=settings.get("enable_7tv", True),
)
print(f"Loaded chat settings: twitch={settings.get('twitch_channel', '')}, youtube={settings.get('youtube_video_id', '')}")
async def _run_server(host: str, port: int, state: AppState) -> None:
# Create example config if it doesn't exist
create_example_config()
# Load saved tokens
await load_tokens(state)
# Load saved chat settings
_load_chat_config_from_settings(state)
app = make_app(state)
runner = web.AppRunner(app)
await runner.setup()
@ -46,8 +78,16 @@ async def _run_server(host: str, port: int, state: AppState) -> None:
# Start providers
asyncio.create_task(run_gsmtc_provider(state))
while True:
await asyncio.sleep(3600)
# Start chat manager (if configured)
chat_manager = ChatManager(state)
state.chat_manager = chat_manager # Store reference for config changes
await chat_manager.start()
try:
while True:
await asyncio.sleep(3600)
finally:
await chat_manager.stop()
def run_forever(host: str = "127.0.0.1", port: int = 8765) -> None:
@ -98,6 +138,15 @@ class ServerController:
state = AppState()
async def runner() -> None:
# Create example config if it doesn't exist
create_example_config()
# Load saved tokens
await load_tokens(state)
# Load saved chat settings
_load_chat_config_from_settings(state)
app = make_app(state)
runner = web.AppRunner(app)
await runner.setup()
@ -105,10 +154,16 @@ class ServerController:
await site.start()
provider_task = asyncio.create_task(run_gsmtc_provider(state))
# Start chat manager
chat_manager = ChatManager(state)
state.chat_manager = chat_manager # Store reference for config changes
await chat_manager.start()
try:
while not self._stop_evt.is_set():
await asyncio.sleep(0.2)
finally:
await chat_manager.stop()
provider_task.cancel()
# CancelledError may derive from BaseException depending on Python version;
# suppress it so Stop doesn't spam a traceback.

View file

@ -0,0 +1,489 @@
from __future__ import annotations
import asyncio
import re
from datetime import datetime
from typing import Optional
import aiohttp
from app.chat_models import ChatBadge, ChatMessage, ChatUser, Emote, Platform, UserRole
from app.state import AppState
class TwitchChatClient:
"""
Twitch IRC WebSocket client for reading chat messages.
Uses anonymous IRC connection or authenticated if token is provided.
"""
IRC_WS_URL = "wss://irc-ws.chat.twitch.tv:443"
def __init__(self, state: AppState, channel: str):
self.state = state
self.channel = channel.lower().lstrip("#")
self.ws: Optional[aiohttp.ClientWebSocket] = None
self.session: Optional[aiohttp.ClientSession] = None
self.running = False
# Emote caches
self.global_emotes: dict[str, Emote] = {}
self.channel_emotes: dict[str, Emote] = {}
# Badge caches: badge_name/version -> image_url
self.global_badges: dict[str, str] = {}
self.channel_badges: dict[str, str] = {}
self.channel_id: Optional[str] = None
async def start(self) -> None:
"""Start the IRC connection."""
self.running = True
self.session = aiohttp.ClientSession()
tokens = await self.state.get_auth_tokens(Platform.TWITCH)
try:
# Get channel ID for badges and emotes
await self._get_channel_id()
# Load badges
await self._load_badges()
# Load emotes
await self._load_emotes()
# Connect to IRC
self.ws = await self.session.ws_connect(self.IRC_WS_URL)
# Authenticate
if tokens and tokens.access_token:
await self.ws.send_str(f"PASS oauth:{tokens.access_token}")
await self.ws.send_str(f"NICK {self.channel}")
else:
# Anonymous connection
await self.ws.send_str("PASS SCHMOOPIIE")
await self.ws.send_str(f"NICK justinfan{asyncio.get_event_loop().time():.0f}")
# Request capabilities for tags (emotes, badges, color, etc.)
await self.ws.send_str("CAP REQ :twitch.tv/tags twitch.tv/commands")
# Join channel
await self.ws.send_str(f"JOIN #{self.channel}")
# Start message loop
await self._message_loop()
except Exception as e:
print(f"Twitch chat error: {e}")
finally:
await self.stop()
async def stop(self) -> None:
"""Stop the IRC connection."""
self.running = False
if self.ws:
await self.ws.close()
if self.session:
await self.session.close()
async def _message_loop(self) -> None:
"""Main loop to receive and process IRC messages."""
if not self.ws:
return
async for msg in self.ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_irc_message(msg.data)
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
break
async def _handle_irc_message(self, raw: str) -> None:
"""Parse and handle a single IRC message."""
raw = raw.strip()
# Respond to PING
if raw.startswith("PING"):
if self.ws:
await self.ws.send_str("PONG :tmi.twitch.tv")
return
# Parse PRIVMSG (chat messages)
if "PRIVMSG" in raw:
await self._parse_privmsg(raw)
async def _parse_privmsg(self, raw: str) -> None:
"""
Parse a PRIVMSG IRC line.
Format: @tags :user!user@user.tmi.twitch.tv PRIVMSG #channel :message
"""
# Extract tags
tags = {}
if raw.startswith("@"):
tag_str, raw = raw.split(" ", 1)
for tag in tag_str[1:].split(";"):
if "=" in tag:
key, value = tag.split("=", 1)
tags[key] = value
# Extract user
user_match = re.search(r":(\w+)!", raw)
if not user_match:
return
username = user_match.group(1)
# Extract message
msg_match = re.search(r"PRIVMSG #\w+ :(.+)", raw)
if not msg_match:
return
message_text = msg_match.group(1)
# Check for /me action
is_action = message_text.startswith("\x01ACTION") and message_text.endswith("\x01")
if is_action:
message_text = message_text[8:-1].strip()
# Build user object
user = self._build_user(username, tags)
# Build message object
msg_id = tags.get("id", f"{username}_{datetime.now().timestamp()}")
emotes = await self._parse_emotes(message_text, tags)
chat_msg = ChatMessage(
id=msg_id,
platform=Platform.TWITCH,
user=user,
message=message_text,
timestamp=datetime.now(),
emotes=emotes,
is_action=is_action,
)
# Add to state
await self.state.add_chat_message(chat_msg)
def _build_user(self, username: str, tags: dict[str, str]) -> ChatUser:
"""Build a ChatUser from IRC tags."""
display_name = tags.get("display-name", username)
user_id = tags.get("user-id", username)
color = tags.get("color") or None
# Parse roles
roles = [UserRole.VIEWER]
badges_tag = tags.get("badges", "")
if "broadcaster" in badges_tag:
roles.append(UserRole.BROADCASTER)
if "moderator" in badges_tag:
roles.append(UserRole.MODERATOR)
if "vip" in badges_tag:
roles.append(UserRole.VIP)
if "subscriber" in badges_tag or "founder" in badges_tag:
roles.append(UserRole.SUBSCRIBER)
# Parse badges with icons
badges = []
if badges_tag:
for badge_pair in badges_tag.split(","):
if "/" in badge_pair:
badge_name, badge_version = badge_pair.split("/", 1)
badge_key = f"{badge_name}/{badge_version}"
# Look up badge image URL (channel badges first, then global)
icon_url = self.channel_badges.get(badge_key) or self.global_badges.get(badge_key)
badges.append(ChatBadge(name=badge_name, icon_url=icon_url))
return ChatUser(
id=user_id,
username=username,
display_name=display_name,
platform=Platform.TWITCH,
color=color,
roles=roles,
badges=badges,
)
async def _get_channel_id(self) -> None:
"""Get the channel's Twitch user ID (needed for badges/emotes)."""
if not self.session:
return
try:
# Use the unofficial Twitch API to get user ID from username
url = f"https://api.ivr.fi/v2/twitch/user?login={self.channel}"
async with self.session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
if data and len(data) > 0:
self.channel_id = data[0].get("id")
print(f"Twitch: Got channel ID {self.channel_id} for {self.channel}")
except Exception as e:
print(f"Twitch: Error getting channel ID: {e}")
async def _load_badges(self) -> None:
"""Load Twitch badges (global and channel-specific) using Helix API."""
if not self.session:
return
# Get OAuth config for Client-ID
from app.config import load_config
config = load_config()
client_id = config.twitch_oauth.client_id
# Get access token if available
tokens = await self.state.get_auth_tokens(Platform.TWITCH)
headers = {}
if client_id:
headers["Client-ID"] = client_id
if tokens and tokens.access_token:
headers["Authorization"] = f"Bearer {tokens.access_token}"
try:
# Load global badges via Helix API
if headers:
async with self.session.get(
"https://api.twitch.tv/helix/chat/badges/global",
headers=headers
) as resp:
if resp.status == 200:
data = await resp.json()
for badge_set in data.get("data", []):
badge_name = badge_set.get("set_id")
for version in badge_set.get("versions", []):
version_id = version.get("id")
badge_key = f"{badge_name}/{version_id}"
# Prefer higher resolution images
icon_url = (
version.get("image_url_4x") or
version.get("image_url_2x") or
version.get("image_url_1x")
)
if icon_url:
self.global_badges[badge_key] = icon_url
print(f"Twitch: Loaded {len(self.global_badges)} global badges")
else:
print(f"Twitch: Failed to load global badges (status {resp.status})")
# Load channel badges if we have channel ID
if self.channel_id:
async with self.session.get(
f"https://api.twitch.tv/helix/chat/badges?broadcaster_id={self.channel_id}",
headers=headers
) as resp:
if resp.status == 200:
data = await resp.json()
for badge_set in data.get("data", []):
badge_name = badge_set.get("set_id")
for version in badge_set.get("versions", []):
version_id = version.get("id")
badge_key = f"{badge_name}/{version_id}"
icon_url = (
version.get("image_url_4x") or
version.get("image_url_2x") or
version.get("image_url_1x")
)
if icon_url:
self.channel_badges[badge_key] = icon_url
print(f"Twitch: Loaded {len(self.channel_badges)} channel badges")
else:
# Fallback: use static badge URLs for common badges if no OAuth
self._load_static_badges()
except Exception as e:
print(f"Twitch: Error loading badges: {e}")
# Fallback to static badges
self._load_static_badges()
def _load_static_badges(self) -> None:
"""Load static fallback badges for common badge types."""
# These are stable CDN URLs for common Twitch badges
static_badges = {
"broadcaster/1": "https://static-cdn.jtvnw.net/badges/v1/5527c58c-fb7d-422d-b71b-f309dcb85cc1/3",
"moderator/1": "https://static-cdn.jtvnw.net/badges/v1/3267646d-33f0-4b17-b3df-f923a41db1d0/3",
"vip/1": "https://static-cdn.jtvnw.net/badges/v1/b817aba4-fad8-49e2-b88a-7cc744f6a6e3/3",
"subscriber/0": "https://static-cdn.jtvnw.net/badges/v1/5d9f2208-5dd8-11e7-8513-2ff4adfae661/3",
"subscriber/1": "https://static-cdn.jtvnw.net/badges/v1/5d9f2208-5dd8-11e7-8513-2ff4adfae661/3",
"premium/1": "https://static-cdn.jtvnw.net/badges/v1/bbbe0db0-a598-423e-86d0-f9fb98ca1933/3",
"partner/1": "https://static-cdn.jtvnw.net/badges/v1/d12a2e27-16f6-41d0-ab77-b780518f00a3/3",
"turbo/1": "https://static-cdn.jtvnw.net/badges/v1/bd444ec6-8f34-4bf9-91f4-af1e3428d80f/3",
"glhf-pledge/1": "https://static-cdn.jtvnw.net/badges/v1/3158e758-3cb4-43c5-94b3-7571f71cf6a0/3",
"founder/0": "https://static-cdn.jtvnw.net/badges/v1/511b78a9-ab37-472f-9569-457753bbe7d3/3",
}
self.global_badges.update(static_badges)
print(f"Twitch: Loaded {len(static_badges)} static fallback badges")
async def _parse_emotes(self, message: str, tags: dict[str, str]) -> list[Emote]:
"""Parse emotes from message and tags."""
emotes = []
# Parse Twitch native emotes from tags
emotes_tag = tags.get("emotes", "")
if emotes_tag:
# Format: "emoteid:start-end,start-end/emoteid2:start-end"
for emote_data in emotes_tag.split("/"):
if ":" not in emote_data:
continue
emote_id, positions = emote_data.split(":", 1)
# Just use first position to get the code
if "-" in positions:
start_pos = int(positions.split(",")[0].split("-")[0])
end_pos = int(positions.split(",")[0].split("-")[1])
code = message[start_pos : end_pos + 1]
emotes.append(
Emote(
code=code,
url=f"https://static-cdn.jtvnw.net/emoticons/v2/{emote_id}/default/dark/1.0",
provider="twitch",
)
)
# Check for third-party emotes in message
words = message.split()
for word in words:
# Check FFZ
if word in self.global_emotes or word in self.channel_emotes:
emote = self.global_emotes.get(word) or self.channel_emotes.get(word)
if emote and emote not in emotes:
emotes.append(emote)
return emotes
async def _load_emotes(self) -> None:
"""Load third-party emotes from FFZ, BTTV, 7TV."""
config = self.state.chat_config
if not self.session:
return
try:
# Load FrankerFaceZ emotes
if config.enable_ffz:
await self._load_ffz_emotes()
# Load BTTV emotes
if config.enable_bttv:
await self._load_bttv_emotes()
# Load 7TV emotes
if config.enable_7tv:
await self._load_7tv_emotes()
except Exception as e:
print(f"Error loading emotes: {e}")
async def _load_ffz_emotes(self) -> None:
"""Load FrankerFaceZ emotes for the channel."""
if not self.session:
return
try:
# Global FFZ emotes
async with self.session.get("https://api.frankerfacez.com/v1/set/global") as resp:
if resp.status == 200:
data = await resp.json()
for set_id, set_data in data.get("sets", {}).items():
for emote in set_data.get("emoticons", []):
code = emote.get("name")
urls = emote.get("urls", {})
url = urls.get("4") or urls.get("2") or urls.get("1")
if code and url:
self.global_emotes[code] = Emote(
code=code, url=f"https:{url}" if url.startswith("//") else url, provider="ffz"
)
# Channel-specific FFZ emotes
async with self.session.get(f"https://api.frankerfacez.com/v1/room/{self.channel}") as resp:
if resp.status == 200:
data = await resp.json()
for set_id, set_data in data.get("sets", {}).items():
for emote in set_data.get("emoticons", []):
code = emote.get("name")
urls = emote.get("urls", {})
url = urls.get("4") or urls.get("2") or urls.get("1")
if code and url:
self.channel_emotes[code] = Emote(
code=code, url=f"https:{url}" if url.startswith("//") else url, provider="ffz"
)
except Exception as e:
print(f"FFZ emote load error: {e}")
async def _load_bttv_emotes(self) -> None:
"""Load BetterTTV emotes."""
if not self.session:
return
try:
# Global BTTV emotes
async with self.session.get("https://api.betterttv.net/3/cached/emotes/global") as resp:
if resp.status == 200:
emotes = await resp.json()
for emote in emotes:
code = emote.get("code")
emote_id = emote.get("id")
if code and emote_id:
self.global_emotes[code] = Emote(
code=code,
url=f"https://cdn.betterttv.net/emote/{emote_id}/1x",
provider="bttv",
)
# Channel BTTV emotes
async with self.session.get(f"https://api.betterttv.net/3/cached/users/twitch/{self.channel}") as resp:
if resp.status == 200:
data = await resp.json()
for emote in data.get("channelEmotes", []) + data.get("sharedEmotes", []):
code = emote.get("code")
emote_id = emote.get("id")
if code and emote_id:
self.channel_emotes[code] = Emote(
code=code,
url=f"https://cdn.betterttv.net/emote/{emote_id}/1x",
provider="bttv",
)
except Exception as e:
print(f"BTTV emote load error: {e}")
async def _load_7tv_emotes(self) -> None:
"""Load 7TV emotes."""
if not self.session:
return
try:
# Global 7TV emotes
async with self.session.get("https://7tv.io/v3/emote-sets/global") as resp:
if resp.status == 200:
data = await resp.json()
for emote in data.get("emotes", []):
code = emote.get("name")
emote_data = emote.get("data", {})
host = emote_data.get("host", {})
if code and host:
url = f"https:{host.get('url', '')}/1x.webp"
self.global_emotes[code] = Emote(
code=code,
url=url,
provider="7tv",
is_animated=emote.get("animated", False),
)
# Channel 7TV emotes
async with self.session.get(f"https://7tv.io/v3/users/twitch/{self.channel}") as resp:
if resp.status == 200:
data = await resp.json()
emote_set = data.get("emote_set", {})
for emote in emote_set.get("emotes", []):
code = emote.get("name")
emote_data = emote.get("data", {})
host = emote_data.get("host", {})
if code and host:
url = f"https:{host.get('url', '')}/1x.webp"
self.channel_emotes[code] = Emote(
code=code,
url=url,
provider="7tv",
is_animated=emote.get("animated", False),
)
except Exception as e:
print(f"7TV emote load error: {e}")

View file

@ -0,0 +1,245 @@
from __future__ import annotations
import asyncio
from datetime import datetime
from typing import Optional
import aiohttp
from app.chat_models import ChatBadge, ChatMessage, ChatUser, Emote, Platform, UserRole
from app.state import AppState
class YouTubeChatClient:
"""
YouTube Live Chat API client for reading chat messages.
Uses polling to fetch new messages.
"""
API_BASE = "https://www.googleapis.com/youtube/v3"
def __init__(self, state: AppState, video_id: Optional[str] = None):
self.state = state
self.video_id = video_id # Optional - can auto-detect if not provided
self.session: Optional[aiohttp.ClientSession] = None
self.running = False
self.live_chat_id: Optional[str] = None
self.next_page_token: Optional[str] = None
self.poll_interval_ms = 2000
self.broadcast_title: Optional[str] = None
async def start(self) -> None:
"""Start polling for chat messages."""
self.running = True
self.session = aiohttp.ClientSession()
tokens = await self.state.get_auth_tokens(Platform.YOUTUBE)
if not tokens or not tokens.access_token:
print("YouTube: No auth tokens available")
return
try:
# If no video ID provided, try to find user's active broadcast
if not self.video_id:
await self._find_active_broadcast(tokens.access_token)
# Get the live chat ID from the video
if self.video_id:
await self._get_live_chat_id(tokens.access_token)
if not self.live_chat_id:
print("YouTube: Could not find live chat (no active broadcast or invalid video ID)")
return
print(f"YouTube: Connected to live chat" + (f" for '{self.broadcast_title}'" if self.broadcast_title else ""))
# Start polling
await self._poll_loop(tokens.access_token)
except Exception as e:
print(f"YouTube chat error: {e}")
finally:
await self.stop()
async def stop(self) -> None:
"""Stop the polling loop."""
self.running = False
if self.session:
await self.session.close()
async def _find_active_broadcast(self, access_token: str) -> None:
"""Find the user's active live broadcast automatically."""
if not self.session:
return
url = f"{self.API_BASE}/liveBroadcasts"
params = {
"part": "id,snippet,status",
"mine": "true",
"broadcastStatus": "active", # Only get currently live broadcasts
}
headers = {"Authorization": f"Bearer {access_token}"}
try:
async with self.session.get(url, params=params, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
items = data.get("items", [])
if items:
# Use the first active broadcast
broadcast = items[0]
self.video_id = broadcast.get("id")
self.broadcast_title = broadcast.get("snippet", {}).get("title")
print(f"YouTube: Found active broadcast: {self.broadcast_title}")
else:
print("YouTube: No active broadcasts found for your channel")
else:
error = await resp.text()
print(f"YouTube: Error finding broadcasts: {resp.status} - {error}")
except Exception as e:
print(f"YouTube: Error finding active broadcast: {e}")
async def _get_live_chat_id(self, access_token: str) -> None:
"""Fetch the live chat ID for a video."""
if not self.session or not self.video_id:
return
url = f"{self.API_BASE}/videos"
params = {
"part": "liveStreamingDetails,snippet",
"id": self.video_id,
}
headers = {"Authorization": f"Bearer {access_token}"}
try:
async with self.session.get(url, params=params, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
items = data.get("items", [])
if items:
video = items[0]
live_details = video.get("liveStreamingDetails", {})
self.live_chat_id = live_details.get("activeLiveChatId")
if not self.broadcast_title:
self.broadcast_title = video.get("snippet", {}).get("title")
except Exception as e:
print(f"YouTube: Error fetching live chat ID: {e}")
async def _poll_loop(self, access_token: str) -> None:
"""Main polling loop to fetch chat messages."""
while self.running:
try:
await self._fetch_messages(access_token)
await asyncio.sleep(self.poll_interval_ms / 1000)
except Exception as e:
print(f"YouTube: Poll error: {e}")
await asyncio.sleep(5)
async def _fetch_messages(self, access_token: str) -> None:
"""Fetch new chat messages from the API."""
if not self.session or not self.live_chat_id:
return
url = f"{self.API_BASE}/liveChat/messages"
params = {
"liveChatId": self.live_chat_id,
"part": "snippet,authorDetails",
}
if self.next_page_token:
params["pageToken"] = self.next_page_token
headers = {"Authorization": f"Bearer {access_token}"}
try:
async with self.session.get(url, params=params, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
# Update pagination
self.next_page_token = data.get("nextPageToken")
self.poll_interval_ms = data.get("pollingIntervalMillis", 2000)
# Process messages
for item in data.get("items", []):
await self._process_message(item)
except Exception as e:
print(f"YouTube: Error fetching messages: {e}")
async def _process_message(self, item: dict) -> None:
"""Process a single message item from the API."""
snippet = item.get("snippet", {})
author_details = item.get("authorDetails", {})
msg_type = snippet.get("type")
if msg_type != "textMessageEvent":
# Skip super chats, memberships, etc. for now
return
# Extract message data
message_id = item.get("id", "")
message_text = snippet.get("textMessageDetails", {}).get("messageText", "")
published_at_str = snippet.get("publishedAt", "")
# Parse timestamp
try:
timestamp = datetime.fromisoformat(published_at_str.replace("Z", "+00:00"))
except Exception:
timestamp = datetime.now()
# Build user
user = self._build_user(author_details)
# Build message
chat_msg = ChatMessage(
id=message_id,
platform=Platform.YOUTUBE,
user=user,
message=message_text,
timestamp=timestamp,
emotes=[], # YouTube uses standard emoji, could parse later
)
# Add to state
await self.state.add_chat_message(chat_msg)
def _build_user(self, author_details: dict) -> ChatUser:
"""Build a ChatUser from YouTube author details."""
user_id = author_details.get("channelId", "")
username = author_details.get("channelUrl", "").split("/")[-1] or user_id
display_name = author_details.get("displayName", username)
# Parse roles
roles = [UserRole.VIEWER]
is_owner = author_details.get("isChatOwner", False)
is_moderator = author_details.get("isChatModerator", False)
is_sponsor = author_details.get("isChatSponsor", False)
if is_owner:
roles.append(UserRole.BROADCASTER)
if is_moderator:
roles.append(UserRole.MODERATOR)
if is_sponsor:
roles.append(UserRole.SUBSCRIBER)
# Parse badges
badges = []
if is_owner:
badges.append(ChatBadge(name="owner"))
if is_moderator:
badges.append(ChatBadge(name="moderator"))
if is_sponsor:
badges.append(ChatBadge(name="member"))
return ChatUser(
id=user_id,
username=username,
display_name=display_name,
platform=Platform.YOUTUBE,
color=None, # YouTube doesn't provide user colors
roles=roles,
badges=badges,
)

View file

@ -1,9 +1,12 @@
from __future__ import annotations
import asyncio
from collections import deque
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional, Set
from app.chat_models import AuthTokens, ChatConfig, ChatMessage, Platform
@dataclass
class NowPlaying:
@ -30,6 +33,15 @@ class AppState:
self._ws_clients: Set[Any] = set()
self._lock = asyncio.Lock()
# Chat state
self.chat_messages: deque[ChatMessage] = deque(maxlen=100)
self.chat_config: ChatConfig = ChatConfig()
self.twitch_tokens: Optional[AuthTokens] = None
self.youtube_tokens: Optional[AuthTokens] = None
# Chat manager reference (set by main.py after creation)
self.chat_manager: Optional[Any] = None
async def set_now_playing(self, np: NowPlaying) -> None:
async with self._lock:
self.now_playing = np
@ -62,4 +74,42 @@ class AppState:
for ws in dead:
self._ws_clients.discard(ws)
async def add_chat_message(self, message: ChatMessage) -> None:
"""Add a chat message and broadcast to all connected clients."""
async with self._lock:
self.chat_messages.append(message)
await self.broadcast({
"type": "chat_message",
"data": message.to_dict(),
})
async def get_chat_messages(self, limit: int = 50) -> list[ChatMessage]:
"""Get recent chat messages."""
async with self._lock:
messages = list(self.chat_messages)
return messages[-limit:] if limit else messages
async def set_auth_tokens(self, platform: Platform, tokens: AuthTokens) -> None:
"""Store authentication tokens for a platform."""
async with self._lock:
if platform == Platform.TWITCH:
self.twitch_tokens = tokens
elif platform == Platform.YOUTUBE:
self.youtube_tokens = tokens
async def get_auth_tokens(self, platform: Platform) -> Optional[AuthTokens]:
"""Retrieve authentication tokens for a platform."""
async with self._lock:
if platform == Platform.TWITCH:
return self.twitch_tokens
elif platform == Platform.YOUTUBE:
return self.youtube_tokens
return None
async def update_chat_config(self, config: ChatConfig) -> None:
"""Update chat configuration."""
async with self._lock:
self.chat_config = config

View file

@ -4,12 +4,15 @@ from pathlib import Path
from aiohttp import WSMsgType, web
from app.chat_models import ChatConfig
from app.config import get_config_file, load_config, save_chat_settings
from app.paths import get_art_dir, get_web_assets_dir
from app.state import AppState
# Declare widgets once to avoid duplicated slugs/labels.
WIDGETS = [
{"slug": "nowplaying", "label": "Now Playing"},
{"slug": "livechat", "label": "Live Chat"},
]
@ -27,14 +30,49 @@ async def handle_root(request: web.Request) -> web.Response:
label = widget.get("label", slug or "Widget")
url = f"http://{request.host}/widgets/{slug}/" if slug else ""
item_html = f"""
<li class="widget-item">
<div class="widget-header">
<a class="widget-name" href="{url}" target="_blank">{label}</a>
</div>
<div class="widget-url-row">{url}</div>
</li>
"""
if slug == "livechat":
# Live Chat widget with options
item_html = f"""
<li class="widget-item">
<div class="widget-header">
<a id="livechat-open" class="widget-name" href="{url}" target="_blank">{label}</a>
</div>
<div class="widget-url-row">
<input type="hidden" id="livechat-base-url" value="{url}">
<input type="text" id="livechat-url" value="{url}" readonly>
<button class="copy-btn" onclick="copyUrl('livechat-url')">Copy</button>
</div>
<div class="widget-options">
<div class="option-group">
<label>Theme</label>
<select id="livechat-theme" onchange="updateLiveChatUrl()">
<option value="dark">Dark (transparent)</option>
<option value="light">Light</option>
</select>
</div>
<div class="option-group">
<label>Direction</label>
<select id="livechat-direction" onchange="updateLiveChatUrl()">
<option value="down">Down (scrolls down)</option>
<option value="up">Up (bubbles up, newest anchored)</option>
</select>
</div>
</div>
</li>
"""
else:
# Standard widget without options
item_html = f"""
<li class="widget-item">
<div class="widget-header">
<a class="widget-name" href="{url}" target="_blank">{label}</a>
</div>
<div class="widget-url-row">
<input type="text" id="{slug}-url" value="{url}" readonly>
<button class="copy-btn" onclick="copyUrl('{slug}-url')">Copy</button>
</div>
</li>
"""
widget_items.append(item_html)
widget_list_html = "\n".join(widget_items) if widget_items else '<li class="widget-item">No widgets configured</li>'
@ -50,6 +88,8 @@ async def handle_root(request: web.Request) -> web.Response:
async def handle_widget(request: web.Request) -> web.FileResponse:
slug = request.match_info.get("slug")
if not slug:
raise web.HTTPNotFound(text="Widget not found")
web_root = get_web_assets_dir()
index_path = web_root / "widgets" / slug / "index.html"
if index_path.exists():
@ -63,6 +103,124 @@ async def handle_nowplaying(request: web.Request) -> web.Response:
return web.json_response(np.to_dict())
async def handle_chat_messages(request: web.Request) -> web.Response:
"""API endpoint to get recent chat messages."""
state: AppState = request.app["state"]
limit = int(request.query.get("limit", 50))
messages = await state.get_chat_messages(limit)
return web.json_response([msg.to_dict() for msg in messages])
async def handle_chat_config_get(request: web.Request) -> web.Response:
"""Get current chat configuration."""
state: AppState = request.app["state"]
config = state.chat_config
return web.json_response(config.to_dict())
async def handle_chat_config_post(request: web.Request) -> web.Response:
"""Update chat configuration."""
state: AppState = request.app["state"]
data = await request.json()
# Check if channel settings changed (need to restart chat)
old_config = state.chat_config
new_twitch_channel = data.get("twitch_channel", "")
new_youtube_video_id = data.get("youtube_video_id", "")
channel_changed = (
old_config.twitch_channel != new_twitch_channel or
old_config.youtube_video_id != new_youtube_video_id
)
config = ChatConfig(
twitch_enabled=data.get("twitch_enabled", False),
youtube_enabled=data.get("youtube_enabled", False),
max_messages=data.get("max_messages", 50),
show_timestamps=data.get("show_timestamps", True),
show_badges=data.get("show_badges", True),
show_platform_icons=data.get("show_platform_icons", True),
unified_view=data.get("unified_view", True),
enable_ffz=data.get("enable_ffz", True),
enable_bttv=data.get("enable_bttv", True),
enable_7tv=data.get("enable_7tv", True),
filter_by_roles=data.get("filter_by_roles", []),
blocked_keywords=data.get("blocked_keywords", []),
min_message_length=data.get("min_message_length", 0),
twitch_channel=new_twitch_channel,
youtube_video_id=new_youtube_video_id,
)
await state.update_chat_config(config)
# Save chat settings to disk for persistence
save_chat_settings({
"twitch_channel": config.twitch_channel,
"youtube_video_id": config.youtube_video_id,
"max_messages": config.max_messages,
"show_timestamps": config.show_timestamps,
"show_badges": config.show_badges,
"show_platform_icons": config.show_platform_icons,
"unified_view": config.unified_view,
"enable_ffz": config.enable_ffz,
"enable_bttv": config.enable_bttv,
"enable_7tv": config.enable_7tv,
})
# Restart chat connections if channel settings changed
if channel_changed and state.chat_manager:
await state.chat_manager.restart()
return web.json_response({"status": "ok"})
async def handle_config_page(request: web.Request) -> web.FileResponse:
"""Serve the configuration page."""
config_path = get_web_assets_dir() / "config.html"
return web.FileResponse(path=str(config_path))
async def handle_oauth_status(request: web.Request) -> web.Response:
"""Get OAuth configuration status."""
app_config = load_config()
return web.json_response({
"twitch_configured": app_config.twitch_oauth.is_configured(),
"youtube_configured": app_config.youtube_oauth.is_configured(),
"config_file": str(get_config_file()),
})
async def handle_auth_status(request: web.Request) -> web.Response:
"""Get authentication status (whether user has logged in)."""
from app.chat_models import Platform
state: AppState = request.app["state"]
twitch_tokens = await state.get_auth_tokens(Platform.TWITCH)
youtube_tokens = await state.get_auth_tokens(Platform.YOUTUBE)
return web.json_response({
"twitch_authenticated": twitch_tokens is not None and not twitch_tokens.is_expired(),
"youtube_authenticated": youtube_tokens is not None and not youtube_tokens.is_expired(),
})
async def handle_open_config_dir(request: web.Request) -> web.Response:
"""Open the config directory in file explorer."""
from app.config import open_config_directory
success = open_config_directory()
if success:
return web.json_response({"status": "ok", "message": "Opened config directory"})
else:
return web.json_response(
{"status": "error", "message": "Failed to open directory"},
status=500
)
async def handle_ws(request: web.Request) -> web.WebSocketResponse:
state: AppState = request.app["state"]
ws = web.WebSocketResponse(heartbeat=30)
@ -70,10 +228,17 @@ async def handle_ws(request: web.Request) -> web.WebSocketResponse:
await state.register_ws(ws)
try:
# Send initial snapshot
# Send initial snapshots
np = await state.get_now_playing()
await ws.send_json({"type": "nowplaying", "data": np.to_dict()})
# Send chat history
chat_messages = await state.get_chat_messages(50)
await ws.send_json({
"type": "chat_history",
"data": [msg.to_dict() for msg in chat_messages]
})
async for msg in ws:
if msg.type == WSMsgType.TEXT:
# Currently no client->server messages required
@ -87,6 +252,8 @@ async def handle_ws(request: web.Request) -> web.WebSocketResponse:
def make_app(state: AppState) -> web.Application:
from app.auth import register_auth_routes
app = web.Application()
app["state"] = state
@ -95,12 +262,20 @@ def make_app(state: AppState) -> web.Application:
# Pages / API
app.router.add_get("/", handle_root)
for widget in WIDGETS:
slug = widget["slug"]
app.router.add_get(f"/widgets/{slug}/", handle_widget)
app.router.add_get("/config", handle_config_page)
app.router.add_get("/widgets/{slug}/", handle_widget)
app.router.add_get("/api/nowplaying", handle_nowplaying)
app.router.add_get("/api/chat/messages", handle_chat_messages)
app.router.add_get("/api/chat/config", handle_chat_config_get)
app.router.add_post("/api/chat/config", handle_chat_config_post)
app.router.add_get("/api/oauth/status", handle_oauth_status)
app.router.add_get("/api/auth/status", handle_auth_status)
app.router.add_post("/api/config/open-directory", handle_open_config_dir)
app.router.add_get("/ws", handle_ws)
# Register OAuth routes
register_auth_routes(app)
# Widget static routing
# e.g. /widgets/nowplaying/ -> web/widgets/nowplaying/index.html
app.router.add_static(