174 lines
5.5 KiB
Python
174 lines
5.5 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Tuple
|
|
|
|
from winsdk.windows.media.control import (
|
|
GlobalSystemMediaTransportControlsSessionManager as SessionManager,
|
|
GlobalSystemMediaTransportControlsSessionPlaybackStatus as PlaybackStatus,
|
|
)
|
|
from winsdk.windows.storage.streams import DataReader
|
|
|
|
from app.paths import get_art_dir
|
|
from app.state import AppState, NowPlaying
|
|
|
|
|
|
ART_FILENAME = "album.png" # overwritten when track changes
|
|
PLACEHOLDER_FILENAME = "placeholder.png"
|
|
PLACEHOLDER_PNG_B64 = (
|
|
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+A8AAwUBAO+X2F8A"
|
|
"AAAASUVORK5CYII="
|
|
)
|
|
|
|
|
|
def _write_placeholder(out_path: Path) -> None:
|
|
data = base64.b64decode(PLACEHOLDER_PNG_B64)
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_bytes(data)
|
|
|
|
|
|
def ensure_art_files(art_dir: Path) -> None:
|
|
placeholder_path = art_dir / PLACEHOLDER_FILENAME
|
|
if not placeholder_path.exists():
|
|
_write_placeholder(placeholder_path)
|
|
|
|
album_path = art_dir / ART_FILENAME
|
|
if not album_path.exists():
|
|
_write_placeholder(album_path)
|
|
|
|
|
|
async def _read_thumbnail_to_file(session: Any, out_path: Path) -> bool:
|
|
try:
|
|
media_props = await session.try_get_media_properties_async()
|
|
thumb_ref = media_props.thumbnail
|
|
if thumb_ref is None:
|
|
return False
|
|
|
|
stream = await thumb_ref.open_read_async()
|
|
size = int(stream.size or 0)
|
|
if size <= 0:
|
|
return False
|
|
|
|
reader = DataReader(stream)
|
|
await reader.load_async(size)
|
|
buffer = bytearray(size)
|
|
reader.read_bytes(buffer)
|
|
out_path.write_bytes(buffer)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _pick_best_session(sessions: Any) -> Any:
|
|
best = None
|
|
for s in sessions:
|
|
try:
|
|
info = s.get_playback_info()
|
|
status = info.playback_status if info else None
|
|
if status == PlaybackStatus.PLAYING:
|
|
return s
|
|
if best is None:
|
|
best = s
|
|
except Exception:
|
|
continue
|
|
return best
|
|
|
|
|
|
def _extract_album_from_artist(artist_raw: str) -> Tuple[str, str]:
|
|
if not artist_raw:
|
|
return "", ""
|
|
m = re.search(r"\s*\[ALBUM:(.*?)\]\s*$", artist_raw, re.IGNORECASE)
|
|
if not m:
|
|
return artist_raw.strip(), ""
|
|
album_hint = m.group(1).strip()
|
|
clean_artist = artist_raw[: m.start()].strip()
|
|
return clean_artist, album_hint
|
|
|
|
|
|
async def run_gsmtc_provider(state: AppState) -> None:
|
|
"""
|
|
Poll GSMTC and push state updates + broadcast over websocket.
|
|
"""
|
|
art_dir = get_art_dir()
|
|
ensure_art_files(art_dir)
|
|
|
|
last_art_sig: str | None = None
|
|
last_has_art = False
|
|
|
|
while True:
|
|
try:
|
|
manager = await SessionManager.request_async()
|
|
sessions = manager.get_sessions()
|
|
session = _pick_best_session(sessions)
|
|
|
|
if session is None:
|
|
_write_placeholder(art_dir / ART_FILENAME)
|
|
last_art_sig = None
|
|
last_has_art = False
|
|
np = NowPlaying(updated_unix=int(time.time()))
|
|
await state.set_now_playing(np)
|
|
await state.broadcast({"type": "nowplaying", "data": np.to_dict()})
|
|
await asyncio.sleep(1)
|
|
continue
|
|
|
|
app_id = ""
|
|
try:
|
|
app_id = session.source_app_user_model_id or ""
|
|
except Exception:
|
|
pass
|
|
|
|
info = session.get_playback_info()
|
|
status = info.playback_status if info else None
|
|
playing = status == PlaybackStatus.PLAYING
|
|
|
|
props = await session.try_get_media_properties_async()
|
|
|
|
title = getattr(props, "title", "") or ""
|
|
album = getattr(props, "album_title", "") or getattr(props, "album", "") or ""
|
|
artist_raw = getattr(props, "artist", "") or getattr(props, "album_artist", "") or ""
|
|
artist, album_hint = _extract_album_from_artist(artist_raw)
|
|
if not album and album_hint:
|
|
album = album_hint
|
|
|
|
track_key = f"{app_id}||{title}||{album}||{artist}"
|
|
has_thumb = getattr(props, "thumbnail", None) is not None
|
|
art_sig = f"{track_key}||thumb:{int(has_thumb)}"
|
|
|
|
art_available = last_has_art
|
|
if art_sig != last_art_sig:
|
|
out_path = art_dir / ART_FILENAME
|
|
if has_thumb:
|
|
wrote = await _read_thumbnail_to_file(session, out_path)
|
|
if not wrote:
|
|
_write_placeholder(out_path)
|
|
art_available = wrote
|
|
else:
|
|
_write_placeholder(out_path)
|
|
art_available = False
|
|
last_art_sig = art_sig if (title or album or artist) else None
|
|
last_has_art = art_available
|
|
|
|
np = NowPlaying(
|
|
title=title,
|
|
album=album,
|
|
artist=artist,
|
|
playing=playing,
|
|
source_app=app_id,
|
|
art_url=f"/art/{ART_FILENAME}",
|
|
has_art=last_has_art,
|
|
updated_unix=int(time.time()),
|
|
)
|
|
await state.set_now_playing(np)
|
|
await state.broadcast({"type": "nowplaying", "data": np.to_dict()})
|
|
except Exception:
|
|
# transient errors: keep last state
|
|
pass
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
|