from __future__ import annotations import logging import os import sys from datetime import datetime from typing import Any, Dict, Optional from flask import request from flask_socketio import SocketIO, emit, join_room from models import Display, db from sync import get_current_event, get_trigger_event, mark_trigger_display_ended, server_time_ms logger = logging.getLogger(__name__) _env_async_mode = os.environ.get("ASYNC_MODE") # e.g. gevent | eventlet | threading | None(auto) # IMPORTANT: # - On Windows + Python 3.14+, eventlet is currently incompatible and may crash if auto-selected. # - If ASYNC_MODE is not provided, default to 'threading' on Windows/Py3.14+. if _env_async_mode: _async_mode = _env_async_mode else: if os.name == "nt" or sys.version_info >= (3, 14): _async_mode = "threading" else: _async_mode = None # let Socket.IO auto-select (eventlet/gevent if installed) socketio = SocketIO( async_mode=_async_mode or None, cors_allowed_origins="*", logger=False, engineio_logger=False, ping_interval=5, ping_timeout=10, ) _monitor_started = False def start_event_monitor(app) -> None: """Background task that detects when a trigger event expires by time.""" global _monitor_started if _monitor_started: return _monitor_started = True def _run(): while True: try: with app.app_context(): trig = get_trigger_event() except Exception: logger.exception("event monitor failed") socketio.sleep(0.5) socketio.start_background_task(_run) # In-memory live stats (for admin dashboard). Keyed by display public_id. 
_live: Dict[str, Dict[str, Any]] = {}


def _payload(data: Any) -> Dict[str, Any]:
    """Return *data* when it is a dict, otherwise an empty dict.

    Event payloads come from clients, so they may be missing (``None``) or of
    an unexpected type; handlers use this to read fields without raising.
    """
    return data if isinstance(data, dict) else {}


def _set_live(public_id: str, **kwargs: Any) -> None:
    """Merge *kwargs* into the live-stats entry of *public_id*, creating it if absent."""
    _live.setdefault(public_id, {}).update(kwargs)


def get_live_snapshot() -> Dict[str, Dict[str, Any]]:
    """Return a copy of the live stats: one shallow-copied dict per display."""
    # Iterate over a list of the items (as on_disconnect does) so that an entry
    # added by another handler while we copy cannot invalidate the iteration.
    return {pid: dict(info) for pid, info in list(_live.items())}


@socketio.on("connect")
def on_connect():
    """Greet a new socket with the current server time."""
    # Client will send hello with public_id
    emit("connected", {"server_time_ms": server_time_ms()})


@socketio.on("hello")
def on_hello(data: Optional[Dict[str, Any]] = None):
    """Register the calling socket as the display named by ``data["public_id"]``.

    Joins the room ``display:<public_id>``, creates the ``Display`` row when it
    does not exist yet, marks the display online in the database and in the
    live stats, acknowledges the caller and notifies the ``admin`` room.
    Emits an ``error`` event and returns when ``public_id`` is missing.
    """
    public_id = _payload(data).get("public_id")
    if not public_id:
        emit("error", {"error": "missing public_id"})
        return

    join_room(f"display:{public_id}")

    disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
    if disp is None:
        # Auto-create to be resilient on LAN; admin can rename later.
        disp = Display(name=f"Display {public_id}", public_id=public_id)
        db.session.add(disp)

    disp.is_online = True
    disp.last_seen = datetime.utcnow()
    db.session.commit()

    last_seen = disp.last_seen.isoformat()
    _set_live(
        public_id,
        sid=request.sid,
        user_agent=request.headers.get("User-Agent", ""),
        last_seen=last_seen,
        # on_disconnect stores is_online=False; reset it here so a display
        # that reconnects is not reported offline in the admin snapshot.
        is_online=True,
    )

    emit("hello_ack", {"server_time_ms": server_time_ms()})
    socketio.emit(
        "admin_display_update",
        {"public_id": public_id, "is_online": True, "last_seen": last_seen},
        room="admin",
    )


@socketio.on("heartbeat")
def on_heartbeat(data: Optional[Dict[str, Any]] = None):
    """Record a display's heartbeat in the live stats and in the database.

    Reads ``public_id``, ``latency_ms``, ``offset_ms`` and ``ready`` from
    *data*. Does nothing when ``public_id`` is missing. The database row is
    only touched when it already exists.
    """
    payload = _payload(data)
    public_id = payload.get("public_id")
    if not public_id:
        return

    # One timestamp for both the live stats and the database row.
    now = datetime.utcnow()
    _set_live(
        public_id,
        latency_ms=payload.get("latency_ms"),
        offset_ms=payload.get("offset_ms"),
        ready=bool(payload.get("ready")),
        last_seen=now.isoformat(),
        is_online=True,  # keep the live stats in step with the DB flag set below
    )

    disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
    if disp is not None:
        disp.last_seen = now
        disp.is_online = True
        db.session.commit()


@socketio.on("time_sync")
def on_time_sync(data: Optional[Dict[str, Any]] = None):
    """Answer an NTP-style time-sync request.

    - client sends t1 (client send time)
    - server records t2 (server receive time)
    - server replies with t1, t2, t3 (server send time)

    Client will record t4 on receive. A missing or non-numeric ``t1_ms`` is
    echoed back as ``0.0`` instead of raising.
    """
    t2 = server_time_ms()
    try:
        t1 = float(_payload(data).get("t1_ms", 0.0))
    except (TypeError, ValueError):
        t1 = 0.0
    # Keep a distinct send timestamp (avoid t2==t3 due to same clock read).
    t3 = server_time_ms()
    emit("time_sync", {"t1_ms": t1, "t2_ms": t2, "t3_ms": t3})


@socketio.on("request_state")
def on_request_state(data: Optional[Dict[str, Any]] = None):
    """Tell the calling display what it should currently be playing.

    Emits ``event_state`` with ``active: False`` when there is no current
    event or the event has no video assigned to this ``public_id``; otherwise
    emits the event id, name, video URL and start time. Does nothing when
    ``public_id`` is missing.
    """
    public_id = _payload(data).get("public_id")
    if not public_id:
        return

    st = get_current_event()
    video_url = st.assignments.get(public_id) if st else None
    if not video_url:
        emit("event_state", {"active": False, "server_time_ms": server_time_ms()})
        return

    emit(
        "event_state",
        {
            "active": True,
            "event_id": st.event_id,
            "event_name": st.name,
            "video_url": video_url,
            "start_time_ms": st.start_time_ms,
            "server_time_ms": server_time_ms(),
        },
    )


@socketio.on("playback_ended")
def on_playback_ended(data: Optional[Dict[str, Any]] = None):
    """Record that a display finished playing the video of a trigger event.

    Best effort: invalid input or a failure while recording is logged and
    never propagated to the caller. Does nothing when ``public_id`` or
    ``event_id`` is missing.
    """
    payload = _payload(data)
    public_id = payload.get("public_id")
    event_id = payload.get("event_id")
    if not public_id or not event_id:
        return

    try:
        event_key = int(event_id)
    except (TypeError, ValueError):
        logger.warning("playback_ended: invalid event_id %r from display %s", event_id, public_id)
        return

    try:
        # The result (truthy once the trigger event has fully ended) needs no
        # handling here: displays will fall back to the idle image.
        mark_trigger_display_ended(event_key, str(public_id))
    except Exception:
        logger.exception(
            "Failed to record playback end for display %s (event %s)", public_id, event_key
        )


@socketio.on("admin_join")
def on_admin_join():
    """Add the calling socket to the ``admin`` room and send it the live snapshot."""
    # NOTE(review): no authorization check is visible here, so any client can
    # join the admin room -- confirm this is acceptable for the deployment.
    join_room("admin")
    emit("admin_snapshot", {"server_time_ms": server_time_ms(), "live": get_live_snapshot()})


@socketio.on("disconnect")
def on_disconnect():
    """Mark the display that owned the disconnected socket as offline.

    The display is found by matching the socket id against the ``sid`` stored
    in the live stats by ``on_hello``; sockets that never said hello are
    ignored. A database failure is logged and does not prevent the live stats
    update or the notification to the ``admin`` room.
    """
    # We don't know which display disconnected unless tracked in _live
    sid = request.sid
    public_id: Optional[str] = next(
        (pid for pid, info in list(_live.items()) if info.get("sid") == sid),
        None,
    )
    if not public_id:
        return

    # One timestamp for the database, the live stats and the admin notification.
    now = datetime.utcnow()
    try:
        disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
        if disp is not None:
            disp.is_online = False
            disp.last_seen = now
            db.session.commit()
    except Exception:
        logger.exception("Failed to mark display offline")
        # Leave the session usable for whatever runs next on it.
        db.session.rollback()

    _set_live(public_id, is_online=False, last_seen=now.isoformat())
    socketio.emit(
        "admin_display_update",
        {"public_id": public_id, "is_online": False, "last_seen": now.isoformat()},
        room="admin",
    )