# openslide/app/routes/api.py
import hashlib
import json
import os
import re
import time
from datetime import datetime, timedelta
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from xml.etree import ElementTree as ET

from flask import Blueprint, Response, abort, jsonify, request, stream_with_context, url_for

from ..extensions import db
from ..models import Company, Display, DisplayPlaylist, DisplaySession, Playlist, PlaylistItem
from ..uploads import is_valid_upload_relpath

bp = Blueprint("api", __name__, url_prefix="/api")

MAX_ACTIVE_SESSIONS_PER_DISPLAY = 3
SESSION_TTL_SECONDS = 90
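# A session counts as active if it was touched within SESSION_TTL_SECONDS.
# The SSE stream below touches its session roughly every 30 seconds, so the
# 90-second TTL tolerates a couple of missed touches before a slot is freed.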

# RSS ticker cache (in-memory; OK for this small app; avoids hammering feeds)
#
# Default is intentionally long because displays can refresh headlines on a
# long interval (e.g., twice per day) and we don't want many displays to
# re-fetch the same feed. Override via env var `TICKER_CACHE_TTL_SECONDS`.
def _env_int(name: str, default: int) -> int:
    try:
        return int(os.environ.get(name, "") or default)
    except (TypeError, ValueError):
        return default


TICKER_CACHE_TTL_SECONDS = max(10, _env_int("TICKER_CACHE_TTL_SECONDS", 6 * 60 * 60))
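# Example (assumed run command): `TICKER_CACHE_TTL_SECONDS=3600 flask run`
# would cache each feed's headlines for one hour instead of the six-hour default.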
_TICKER_CACHE: dict[str, dict] = {}
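# Being a plain module-level dict, the cache is per-process: under a
# multi-worker WSGI server each worker keeps its own copy, which is fine at
# this app's scale.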


def _is_playlist_active_now(p: Playlist, now_utc: datetime) -> bool:
    """Return True if playlist is active based on its optional schedule window."""
    if p.schedule_start and now_utc < p.schedule_start:
        return False
    if p.schedule_end and now_utc > p.schedule_end:
        return False
    return True


def _enforce_and_touch_display_session(display: Display, sid: str | None):
    """Enforce concurrent display viewer limit and touch last_seen.

    Returns:
        (ok, response)
        - ok=True: caller may proceed
        - ok=False: response is a Flask response tuple to return
    """
    sid = (sid or "").strip()
    if not sid:
        return True, None
    cutoff = datetime.utcnow() - timedelta(seconds=SESSION_TTL_SECONDS)
    # Clean up old sessions. Avoid committing if nothing was deleted (saves write locks on SQLite).
    deleted = DisplaySession.query.filter(
        DisplaySession.display_id == display.id,
        DisplaySession.last_seen_at < cutoff,
    ).delete(synchronize_session=False)
    if deleted:
        db.session.commit()
    existing = DisplaySession.query.filter_by(display_id=display.id, sid=sid).first()
    if existing:
        existing.last_seen_at = datetime.utcnow()
        db.session.commit()
        return True, None
    active_count = DisplaySession.query.filter(
        DisplaySession.display_id == display.id,
        DisplaySession.last_seen_at >= cutoff,
    ).count()
    if active_count >= MAX_ACTIVE_SESSIONS_PER_DISPLAY:
        return (
            False,
            (
                jsonify(
                    {
                        "error": "display_limit_reached",
                        "message": f"This display URL is already open on {MAX_ACTIVE_SESSIONS_PER_DISPLAY} displays.",
                    }
                ),
                429,
            ),
        )
    s = DisplaySession(
        display_id=display.id,
        sid=sid,
        last_seen_at=datetime.utcnow(),
        # X-Forwarded-For may hold a comma-separated proxy chain; keep the first (client) hop.
        ip=(request.headers.get("X-Forwarded-For") or request.remote_addr or "").split(",")[0].strip(),
        user_agent=(request.headers.get("User-Agent") or "")[:300],
    )
    db.session.add(s)
    db.session.commit()
    return True, None


def _playlist_signature(display: Display) -> tuple[int | None, str]:
    """Compute a stable hash of what the player should be showing.

    We include enough information that a change of assigned playlist, a reorder,
    a duration change, or an item add/delete triggers an update.
    """
    # Determine active playlists. If display_playlist has any rows, use those.
    # Otherwise fall back to the legacy assigned_playlist_id.
    mapped_ids = [
        r[0]
        for r in db.session.query(DisplayPlaylist.playlist_id)
        .filter(DisplayPlaylist.display_id == display.id)
        .order_by(DisplayPlaylist.position.asc(), DisplayPlaylist.playlist_id.asc())
        .all()
    ]
    use_mapping = bool(mapped_ids)
    active_ids = mapped_ids
    if not active_ids and display.assigned_playlist_id:
        active_ids = [display.assigned_playlist_id]
        use_mapping = False
    if not active_ids:
        raw = "no-playlist"
        return None, hashlib.sha1(raw.encode("utf-8")).hexdigest()
    # Apply the scheduling + priority rules so a schedule change also triggers a player refresh.
    playlists = Playlist.query.filter(Playlist.id.in_(active_ids)).all()
    now_utc = datetime.utcnow()
    scheduled = [p for p in playlists if _is_playlist_active_now(p, now_utc)]
    if any(p.is_priority for p in scheduled):
        scheduled = [p for p in scheduled if p.is_priority]
    active_ids = [x for x in active_ids if any(p.id == x for p in scheduled)]
    if not active_ids:
        raw = "no-active-playlist"
        return None, hashlib.sha1(raw.encode("utf-8")).hexdigest()
    # Pull items in a stable order so reordering affects the signature.
    if use_mapping:
        items = (
            PlaylistItem.query.join(DisplayPlaylist, DisplayPlaylist.playlist_id == PlaylistItem.playlist_id)
            .filter(
                DisplayPlaylist.display_id == display.id,
                PlaylistItem.playlist_id.in_(active_ids),
            )
            .order_by(DisplayPlaylist.position.asc(), PlaylistItem.position.asc())
            .all()
        )
    else:
        items = (
            PlaylistItem.query.filter(PlaylistItem.playlist_id == active_ids[0])
            .order_by(PlaylistItem.position.asc())
            .all()
        )
    payload = {
        "playlist_ids": list(active_ids),
        "items": [
            {
                "id": it.id,
                "playlist_id": it.playlist_id,
                "pos": it.position,
                "type": it.item_type,
                "title": it.title,
                "duration": it.duration_seconds,
                "file_path": it.file_path,
                "url": it.url,
            }
            for it in items
        ],
    }
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    # This used to return the single assigned playlist_id; with multi-playlist
    # mappings we return None when more than one playlist is active. Callers
    # only use the id for change detection.
    if len(set(active_ids)) == 1:
        return active_ids[0], hashlib.sha1(raw.encode("utf-8")).hexdigest()
    return None, hashlib.sha1(raw.encode("utf-8")).hexdigest()


def _is_http_url_allowed(url: str) -> bool:
    """Basic SSRF hardening: only allow http(s) and disallow obvious local targets."""
    try:
        u = urlparse(url)
    except Exception:
        return False
    if u.scheme not in {"http", "https"}:
        return False
    host = (u.hostname or "").strip().lower()
    if not host:
        return False
    # Block localhost and common local domains.
    if host in {"localhost", "127.0.0.1", "::1"}:
        return False
    # Block RFC1918-ish and link-local targets when the host is an IPv4 literal.
    # Note: this is best-effort; proper SSRF protection would require DNS resolution too.
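    # A stricter variant (a sketch, not what this app currently does) could
    # lean on the stdlib instead of manual octet checks:
    #   import ipaddress
    #   try:
    #       addr = ipaddress.ip_address(host)
    #   except ValueError:
    #       pass  # hostname, not an IP literal
    #   else:
    #       if addr.is_private or addr.is_loopback or addr.is_link_local:
    #           return False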
    if re.match(r"^\d+\.\d+\.\d+\.\d+$", host):
        parts = [int(x) for x in host.split(".")]
        if parts[0] == 10:
            return False
        if parts[0] == 127:
            return False
        if parts[0] == 169 and parts[1] == 254:
            return False
        if parts[0] == 192 and parts[1] == 168:
            return False
        if parts[0] == 172 and 16 <= parts[1] <= 31:
            return False
    return True


def _strip_text(s: str) -> str:
    """Collapse internal whitespace and trim."""
    s = (s or "").strip()
    s = re.sub(r"\s+", " ", s)
    return s


def _fetch_rss_titles(url: str, *, limit: int = 20) -> list[str]:
    """Fetch RSS/Atom entry titles from a feed URL.

    We intentionally avoid adding a dependency (e.g. feedparser) for this project.
    This implementation is tolerant enough for typical RSS2/Atom feeds.
    """
    req = Request(
        url,
        headers={
            "User-Agent": "SignageTicker/1.0 (+https://example.invalid)",
            "Accept": "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
        },
        method="GET",
    )
    with urlopen(req, timeout=8) as resp:
        # Basic size cap (avoid reading huge responses into memory).
        # Oversized feeds get truncated, fail to parse, and yield no titles.
        raw = resp.read(2_000_000)  # 2MB
    try:
        root = ET.fromstring(raw)
    except Exception:
        return []
    titles: list[str] = []
    # RSS2: <rss><channel><item><title>
    for el in root.findall(".//item/title"):
        t = _strip_text("".join(el.itertext()))
        if t:
            titles.append(t)
    # Atom: <feed><entry><title>
    if not titles:
        for el in root.findall(".//{*}entry/{*}title"):
            t = _strip_text("".join(el.itertext()))
            if t:
                titles.append(t)
    # Some feeds may have <channel><title> etc.; we only want entry titles.
    # Deduplicate while preserving order.
    deduped: list[str] = []
    seen = set()
    for t in titles:
        if t in seen:
            continue
        seen.add(t)
        deduped.append(t)
        if len(deduped) >= limit:
            break
    return deduped


def _get_ticker_titles_cached(url: str) -> tuple[list[str], bool]:
    """Return (titles, from_cache)."""
    now = time.time()
    key = (url or "").strip()
    if not key:
        return [], True
    entry = _TICKER_CACHE.get(key)
    if entry and (now - float(entry.get("ts") or 0)) < TICKER_CACHE_TTL_SECONDS:
        return (entry.get("titles") or []), True
    titles: list[str] = []
    try:
        if _is_http_url_allowed(key):
            titles = _fetch_rss_titles(key)
    except Exception:
        titles = []
    # Failed fetches are cached too (as an empty list) so a broken feed is not
    # re-hit on every request.
    _TICKER_CACHE[key] = {"ts": now, "titles": titles}
    return titles, False
@bp.get("/display/<token>/playlist")
def display_playlist(token: str):
display = Display.query.filter_by(token=token).first()
if not display:
abort(404)
company = Company.query.filter_by(id=display.company_id).first()
# Optional overlay URL (per-company) when enabled on this display.
overlay_src = None
if display.show_overlay:
if company and company.overlay_file_path and is_valid_upload_relpath(company.overlay_file_path):
overlay_src = url_for("static", filename=company.overlay_file_path)
# Enforce: a display URL/token can be opened by max 3 concurrently active sessions.
# Player sends a stable `sid` via querystring.
sid = request.args.get("sid")
ok, resp = _enforce_and_touch_display_session(display, sid)
if not ok:
return resp
# Ticker settings are configured per-company; displays can enable/disable individually.
ticker_cfg = {
"enabled": bool(display.ticker_enabled),
"rss_url": (company.ticker_rss_url if company else None),
"color": (company.ticker_color if company else None),
"bg_color": (company.ticker_bg_color if company else None),
"bg_opacity": (company.ticker_bg_opacity if company else None),
"font_family": (company.ticker_font_family if company else None),
"font_size_px": (company.ticker_font_size_px if company else None),
"speed": (company.ticker_speed if company else None),
}
# Determine active playlists. If display_playlist has any rows, use those.
# Otherwise fall back to the legacy assigned_playlist_id.
mapped_ids = [
r[0]
for r in db.session.query(DisplayPlaylist.playlist_id)
.filter(DisplayPlaylist.display_id == display.id)
.order_by(DisplayPlaylist.position.asc(), DisplayPlaylist.playlist_id.asc())
.all()
]
use_mapping = bool(mapped_ids)
active_ids = mapped_ids
if not active_ids and display.assigned_playlist_id:
active_ids = [display.assigned_playlist_id]
use_mapping = False
if not active_ids:
return jsonify(
{
"display": display.name,
"transition": display.transition or "none",
"overlay_src": overlay_src,
"ticker": ticker_cfg,
"playlists": [],
"items": [],
}
)
    playlists = Playlist.query.filter(Playlist.id.in_(active_ids)).all()
    # Filter playlists by schedule.
    now_utc = datetime.utcnow()
    scheduled = [p for p in playlists if _is_playlist_active_now(p, now_utc)]
    # Priority rule:
    # If any active (scheduled) playlist is marked priority, only play priority playlists.
    any_priority = any(p.is_priority for p in scheduled)
    if any_priority:
        scheduled = [p for p in scheduled if p.is_priority]
    pl_by_id = {p.id: p for p in scheduled}
    scheduled_ids = [x for x in active_ids if x in pl_by_id]
    ordered_playlists = [pl_by_id[x] for x in scheduled_ids]
    # Merge items across active playlists.
    if use_mapping:
        merged = (
            PlaylistItem.query.join(DisplayPlaylist, DisplayPlaylist.playlist_id == PlaylistItem.playlist_id)
            .filter(
                DisplayPlaylist.display_id == display.id,
                PlaylistItem.playlist_id.in_(scheduled_ids),
            )
            .order_by(DisplayPlaylist.position.asc(), PlaylistItem.position.asc())
            .all()
        )
    else:
        # Single-playlist fallback; apply the schedule filter too.
        if scheduled_ids:
            merged = (
                PlaylistItem.query.filter(PlaylistItem.playlist_id == scheduled_ids[0])
                .order_by(PlaylistItem.position.asc())
                .all()
            )
        else:
            merged = []
    items = []
    for item in merged:
        payload = {
            "id": item.id,
            "playlist_id": item.playlist_id,
            "playlist_name": (pl_by_id.get(item.playlist_id).name if pl_by_id.get(item.playlist_id) else None),
            "type": item.item_type,
            "title": item.title,
            "duration": item.duration_seconds,
        }
        if item.item_type in ("image", "video") and item.file_path:
            payload["src"] = url_for("static", filename=item.file_path)
        if item.item_type in ("webpage", "youtube"):
            payload["url"] = item.url
        items.append(payload)
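    # Response shape (illustrative values; see the jsonify call below):
    #   {"display": "Lobby", "transition": "fade", "overlay_src": null,
    #    "ticker": {...}, "playlists": [{"id": 1, "name": "Default"}],
    #    "items": [...]}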
    return jsonify(
        {
            "display": display.name,
            "transition": display.transition or "none",
            "overlay_src": overlay_src,
            "ticker": ticker_cfg,
            "playlists": [{"id": p.id, "name": p.name} for p in ordered_playlists],
            "items": items,
        }
    )
@bp.get("/display/<token>/ticker")
def display_ticker(token: str):
"""Return ticker headlines for a display.
We keep it separate from /playlist so the player can refresh headlines on its own interval.
"""
display = Display.query.filter_by(token=token).first()
if not display:
abort(404)
company = Company.query.filter_by(id=display.company_id).first()
# Enforce concurrent session limit the same way as /playlist.
sid = request.args.get("sid")
ok, resp = _enforce_and_touch_display_session(display, sid)
if not ok:
return resp
if not display.ticker_enabled:
return jsonify({"enabled": False, "headlines": []})
rss_url = ((company.ticker_rss_url if company else None) or "").strip()
if not rss_url:
return jsonify({"enabled": True, "headlines": []})
titles, from_cache = _get_ticker_titles_cached(rss_url)
return jsonify(
{
"enabled": True,
"rss_url": rss_url,
"headlines": titles,
"cached": bool(from_cache),
}
)
@bp.get("/display/<token>/events")
def display_events(token: str):
"""Server-Sent Events stream to notify the player when its playlist changes."""
    display = Display.query.filter_by(token=token).first()
    if not display:
        abort(404)
    sid = request.args.get("sid")
    ok, resp = _enforce_and_touch_display_session(display, sid)
    if not ok:
        return resp
    display_id = display.id
    sid = (sid or "").strip() or None

    @stream_with_context
    def _gen():
        last_hash = None
        last_touch = 0.0
        keepalive_counter = 0
        while True:
            try:
                # Refresh from DB each loop so changes become visible.
                d = Display.query.filter_by(id=display_id).first()
                if not d:
                    yield "event: closed\ndata: {}\n\n"
                    return
                playlist_id, h = _playlist_signature(d)
                if h != last_hash:
                    last_hash = h
                    payload = json.dumps({"playlist_id": playlist_id, "hash": h})
                    yield f"event: changed\ndata: {payload}\n\n"
                # Touch session periodically so SSE-only viewers don't time out.
                now = time.time()
                if sid and (now - last_touch) >= 30:
                    last_touch = now
                    existing = DisplaySession.query.filter_by(display_id=display_id, sid=sid).first()
                    if existing:
                        existing.last_seen_at = datetime.utcnow()
                        db.session.commit()
                # Keep-alive comment (prevents some proxies from closing idle streams).
                keepalive_counter += 1
                if keepalive_counter >= 10:  # ~20s with the 2s sleep below
                    keepalive_counter = 0
                    yield ": keep-alive\n\n"
                # Release DB connections between iterations.
                db.session.remove()
                time.sleep(2)
            except GeneratorExit:
                return
            except Exception:
                # Avoid tight error loops.
                try:
                    db.session.remove()
                except Exception:
                    pass
                time.sleep(2)

    return Response(
        _gen(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )