Files
2026-02-12 10:50:49 +01:00

206 lines
6.3 KiB
Python

from __future__ import annotations
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, Optional
from flask import request
from flask_socketio import SocketIO, emit, join_room
from models import Display, db
from sync import get_current_event, get_trigger_event, mark_trigger_display_ended, server_time_ms
logger = logging.getLogger(__name__)

# Optional explicit async backend: gevent | eventlet | threading.
# Unset (or empty) means the choice is made by the rules below.
_env_async_mode = os.environ.get("ASYNC_MODE")

# IMPORTANT:
# - On Windows + Python 3.14+, eventlet is currently incompatible and may
#   crash if auto-selected.
# - If ASYNC_MODE is not provided, default to 'threading' on Windows/Py3.14+
#   (the check below fires when EITHER condition holds).
if _env_async_mode:
    _async_mode = _env_async_mode
elif os.name == "nt" or sys.version_info >= (3, 14):
    _async_mode = "threading"
else:
    # Let Socket.IO auto-select (eventlet/gevent if installed).
    _async_mode = None

# Module-wide Socket.IO server object used by every handler in this file.
socketio = SocketIO(
    async_mode=_async_mode or None,
    cors_allowed_origins="*",
    logger=False,
    engineio_logger=False,
    ping_interval=5,
    ping_timeout=10,
)
# Set once the monitor task has been scheduled, so it is never started twice.
_monitor_started = False


def start_event_monitor(app) -> None:
    """Start the background task that polls the trigger event.

    The task runs forever: about every 0.5 seconds it calls
    ``get_trigger_event()`` inside an application context of *app*.
    Calling this function again after the first time is a no-op.

    Args:
        app: Object providing ``app_context()`` (the Flask application).
    """
    global _monitor_started
    if _monitor_started:
        return
    _monitor_started = True

    def _poll_forever():
        while True:
            try:
                with app.app_context():
                    # The result is not used here. NOTE(review): presumably the
                    # lookup itself expires a timed-out trigger event --
                    # confirm in sync.get_trigger_event.
                    get_trigger_event()
            except Exception:
                # Keep the monitor alive no matter what one iteration raised.
                logger.exception("event monitor failed")
            socketio.sleep(0.5)

    socketio.start_background_task(_poll_forever)
# In-memory live stats (for admin dashboard). Keyed by display public_id.
_live: Dict[str, Dict[str, Any]] = {}


def _set_live(public_id: str, **kwargs: Any) -> None:
    """Merge *kwargs* into the live-stats entry of *public_id*.

    The entry is created (empty) first when the display has none yet.
    """
    if public_id not in _live:
        _live[public_id] = {}
    _live[public_id].update(kwargs)
def get_live_snapshot() -> Dict[str, Dict[str, Any]]:
    """Return a copy of the in-memory live stats, keyed by display public_id.

    Both the outer mapping and each per-display dict are copied, so callers
    may modify the result without touching the module state. Values inside
    each per-display dict are not copied.
    """
    # Materialize the items before building the copy: socket handlers may add
    # entries to _live while this runs (threading async mode), and iterating
    # the live dict directly could then fail with "dictionary changed size
    # during iteration". on_disconnect iterates _live the same way.
    return {public_id: dict(stats) for public_id, stats in list(_live.items())}
@socketio.on("connect")
def on_connect():
    """Acknowledge a new socket by sending it the current server time.

    The socket is not tied to a display yet; the client is expected to send
    "hello" with its public_id afterwards.
    """
    payload = {"server_time_ms": server_time_ms()}
    emit("connected", payload)
@socketio.on("hello")
def on_hello(data: Dict[str, Any]):
    """Register the calling socket as the display named by ``public_id``.

    Joins the socket to room ``display:<public_id>``, marks the display
    online in the database (creating the row when it does not exist),
    records the socket in the live stats, acknowledges the caller with
    "hello_ack" and notifies the "admin" room.

    Args:
        data: Event payload; must contain a non-empty ``public_id``.
            A missing/empty id is answered with an "error" event.
    """
    public_id = (data or {}).get("public_id")
    if not public_id:
        emit("error", {"error": "missing public_id"})
        return

    join_room(f"display:{public_id}")

    disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
    if disp is None:
        # Auto-create to be resilient on LAN; admin can rename later.
        disp = Display(name=f"Display {public_id}", public_id=public_id)
        db.session.add(disp)
    disp.is_online = True
    disp.last_seen = datetime.utcnow()
    db.session.commit()

    last_seen = disp.last_seen.isoformat()
    _set_live(
        public_id,
        sid=request.sid,
        user_agent=request.headers.get("User-Agent", ""),
        last_seen=last_seen,
        # on_disconnect stores is_online=False in the live entry; reset it
        # here so a reconnected display is not reported offline in the
        # admin snapshot.
        is_online=True,
    )

    emit("hello_ack", {"server_time_ms": server_time_ms()})
    socketio.emit(
        "admin_display_update",
        {"public_id": public_id, "is_online": True, "last_seen": last_seen},
        room="admin",
    )
@socketio.on("heartbeat")
def on_heartbeat(data: Dict[str, Any]):
    """Record a display's periodic heartbeat.

    Updates the live stats (latency, clock offset, readiness, last-seen) and,
    when the display exists in the database, its ``last_seen``/``is_online``
    columns. Payloads without a ``public_id`` are ignored. Nothing is emitted
    back to the client.

    Args:
        data: Event payload with ``public_id`` and optional ``latency_ms``,
            ``offset_ms`` and ``ready``.
    """
    payload = data or {}
    public_id = payload.get("public_id")
    if not public_id:
        return

    # Read the clock once so the live stats and the database row carry the
    # same last-seen instant.
    now = datetime.utcnow()

    _set_live(
        public_id,
        latency_ms=payload.get("latency_ms"),
        offset_ms=payload.get("offset_ms"),
        ready=bool(payload.get("ready")),
        last_seen=now.isoformat(),
        # Mirror the database flag set below; on_disconnect leaves False here.
        is_online=True,
    )

    disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
    if disp:
        disp.last_seen = now
        disp.is_online = True
        db.session.commit()
@socketio.on("time_sync")
def on_time_sync(data: Dict[str, Any]):
    """Answer one round of NTP-style clock synchronisation.

    Protocol:
      - client sends t1 (client send time)
      - server records t2 (server receive time)
      - server replies with t1, t2, t3 (server send time)
    The client will record t4 on receive.

    Args:
        data: Event payload; ``t1_ms`` is echoed back as a float. A missing
            or non-numeric ``t1_ms`` is echoed as ``0.0``.
    """
    # Take the receive timestamp before doing any other work.
    t2 = server_time_ms()

    raw_t1 = (data or {}).get("t1_ms", 0.0)
    try:
        t1 = float(raw_t1)
    except (TypeError, ValueError):
        # Client-supplied value: fall back to the same default used when the
        # key is absent instead of raising inside the handler.
        t1 = 0.0

    # Keep a distinct send timestamp (avoid t2==t3 due to same clock read).
    t3 = server_time_ms()
    emit("time_sync", {"t1_ms": t1, "t2_ms": t2, "t3_ms": t3})
@socketio.on("request_state")
def on_request_state(data: Dict[str, Any]):
    """Tell the requesting display what it should currently be playing.

    Replies with "event_state". The reply is ``active: False`` when there is
    no current event or the event assigns no video to this display;
    otherwise it carries the event id, name, video URL and start time.
    Payloads without a ``public_id`` get no reply.

    Args:
        data: Event payload containing the display's ``public_id``.
    """
    public_id = (data or {}).get("public_id")
    if not public_id:
        return

    current = get_current_event()
    # No event and "event without a video for this display" share one reply.
    video_url = current.assignments.get(public_id) if current else None
    if not video_url:
        emit("event_state", {"active": False, "server_time_ms": server_time_ms()})
        return

    state = {
        "active": True,
        "event_id": current.event_id,
        "event_name": current.name,
        "video_url": video_url,
        "start_time_ms": current.start_time_ms,
        "server_time_ms": server_time_ms(),
    }
    emit("event_state", state)
@socketio.on("playback_ended")
def on_playback_ended(data: Dict[str, Any]):
    """Record that a display finished playing its video for a trigger event.

    Forwards ``int(event_id)`` and ``str(public_id)`` to
    ``mark_trigger_display_ended``. Payloads missing either field are
    ignored. Nothing is emitted: per the original note, once the trigger
    event has fully ended the displays fall back to the idle image on their
    own.

    Args:
        data: Event payload with ``public_id`` and ``event_id``.
    """
    payload = data or {}
    public_id = payload.get("public_id")
    event_id = payload.get("event_id")
    if not public_id or not event_id:
        return

    try:
        mark_trigger_display_ended(int(event_id), str(public_id))
    except Exception:
        # Still best-effort (a bad event_id or a storage error must not
        # propagate out of the handler), but leave a trace instead of
        # failing silently.
        logger.exception(
            "Failed to record playback_ended (display=%r, event=%r)",
            public_id,
            event_id,
        )
@socketio.on("admin_join")
def on_admin_join():
    """Subscribe the caller to the "admin" room and send it the live stats.

    The "admin_snapshot" reply carries the server time and a copy of the
    in-memory stats of every display.
    """
    join_room("admin")
    snapshot = {
        "server_time_ms": server_time_ms(),
        "live": get_live_snapshot(),
    }
    emit("admin_snapshot", snapshot)
@socketio.on("disconnect")
def on_disconnect():
    """Mark the display that owned the closing socket as offline.

    The display is found by matching the socket id against the ``sid``
    stored in the live stats; sockets with no matching entry are ignored.
    For a matching display the database row and the live stats are flagged
    offline and the "admin" room is notified. A database failure is logged
    and does not prevent the live-stats update or the notification.
    """
    sid = request.sid
    # We don't know which display disconnected unless tracked in _live.
    # Iterate over a copy so concurrent handlers may modify _live meanwhile.
    public_id: Optional[str] = next(
        (pid for pid, info in list(_live.items()) if info.get("sid") == sid),
        None,
    )
    if not public_id:
        return

    # One clock read, so the stored and the broadcast last-seen values agree.
    now = datetime.utcnow()

    try:
        disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
        if disp:
            disp.is_online = False
            disp.last_seen = now
            db.session.commit()
    except Exception:
        logger.exception("Failed to mark display offline")

    _set_live(public_id, is_online=False)
    socketio.emit(
        "admin_display_update",
        {"public_id": public_id, "is_online": False, "last_seen": now.isoformat()},
        room="admin",
    )