206 lines
6.3 KiB
Python
206 lines
6.3 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Optional
|
|
|
|
from flask import request
|
|
from flask_socketio import SocketIO, emit, join_room
|
|
|
|
from models import Display, db
|
|
from sync import get_current_event, get_trigger_event, mark_trigger_display_ended, server_time_ms
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
_env_async_mode = os.environ.get("ASYNC_MODE") # e.g. gevent | eventlet | threading | None(auto)
|
|
|
|
# IMPORTANT:
|
|
# - On Windows + Python 3.14+, eventlet is currently incompatible and may crash if auto-selected.
|
|
# - If ASYNC_MODE is not provided, default to 'threading' on Windows/Py3.14+.
|
|
if _env_async_mode:
|
|
_async_mode = _env_async_mode
|
|
else:
|
|
if os.name == "nt" or sys.version_info >= (3, 14):
|
|
_async_mode = "threading"
|
|
else:
|
|
_async_mode = None # let Socket.IO auto-select (eventlet/gevent if installed)
|
|
|
|
socketio = SocketIO(
|
|
async_mode=_async_mode or None,
|
|
cors_allowed_origins="*",
|
|
logger=False,
|
|
engineio_logger=False,
|
|
ping_interval=5,
|
|
ping_timeout=10,
|
|
)
|
|
|
|
|
|
_monitor_started = False
|
|
|
|
|
|
def start_event_monitor(app) -> None:
|
|
"""Background task that detects when a trigger event expires by time."""
|
|
global _monitor_started
|
|
if _monitor_started:
|
|
return
|
|
_monitor_started = True
|
|
|
|
def _run():
|
|
while True:
|
|
try:
|
|
with app.app_context():
|
|
trig = get_trigger_event()
|
|
except Exception:
|
|
logger.exception("event monitor failed")
|
|
socketio.sleep(0.5)
|
|
|
|
socketio.start_background_task(_run)
|
|
|
|
|
|
# In-memory live stats (for admin dashboard). Keyed by display public_id.
|
|
_live: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
|
def _set_live(public_id: str, **kwargs: Any) -> None:
|
|
d = _live.setdefault(public_id, {})
|
|
d.update(kwargs)
|
|
|
|
|
|
def get_live_snapshot() -> Dict[str, Dict[str, Any]]:
|
|
return {k: dict(v) for k, v in _live.items()}
|
|
|
|
|
|
@socketio.on("connect")
|
|
def on_connect():
|
|
# Client will send hello with public_id
|
|
emit("connected", {"server_time_ms": server_time_ms()})
|
|
|
|
|
|
@socketio.on("hello")
|
|
def on_hello(data: Dict[str, Any]):
|
|
public_id = (data or {}).get("public_id")
|
|
if not public_id:
|
|
emit("error", {"error": "missing public_id"})
|
|
return
|
|
|
|
join_room(f"display:{public_id}")
|
|
|
|
disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
|
|
if disp is None:
|
|
# Auto-create to be resilient on LAN; admin can rename later.
|
|
disp = Display(name=f"Display {public_id}", public_id=public_id)
|
|
db.session.add(disp)
|
|
|
|
disp.is_online = True
|
|
disp.last_seen = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
_set_live(public_id, sid=request.sid, user_agent=request.headers.get("User-Agent", ""), last_seen=disp.last_seen.isoformat())
|
|
emit("hello_ack", {"server_time_ms": server_time_ms()})
|
|
socketio.emit("admin_display_update", {"public_id": public_id, "is_online": True, "last_seen": disp.last_seen.isoformat()}, room="admin")
|
|
|
|
|
|
@socketio.on("heartbeat")
|
|
def on_heartbeat(data: Dict[str, Any]):
|
|
public_id = (data or {}).get("public_id")
|
|
if not public_id:
|
|
return
|
|
latency_ms = (data or {}).get("latency_ms")
|
|
offset_ms = (data or {}).get("offset_ms")
|
|
ready = bool((data or {}).get("ready"))
|
|
_set_live(public_id, latency_ms=latency_ms, offset_ms=offset_ms, ready=ready, last_seen=datetime.utcnow().isoformat())
|
|
|
|
disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
|
|
if disp:
|
|
disp.last_seen = datetime.utcnow()
|
|
disp.is_online = True
|
|
db.session.commit()
|
|
|
|
|
|
@socketio.on("time_sync")
|
|
def on_time_sync(data: Dict[str, Any]):
|
|
# NTP-style time sync:
|
|
# - client sends t1 (client send time)
|
|
# - server records t2 (server receive time)
|
|
# - server replies with t1, t2, t3 (server send time)
|
|
# Client will record t4 on receive.
|
|
t1 = float((data or {}).get("t1_ms", 0.0))
|
|
t2 = server_time_ms()
|
|
# Keep a distinct send timestamp (avoid t2==t3 due to same clock read).
|
|
t3 = server_time_ms()
|
|
emit("time_sync", {"t1_ms": t1, "t2_ms": t2, "t3_ms": t3})
|
|
|
|
|
|
@socketio.on("request_state")
|
|
def on_request_state(data: Dict[str, Any]):
|
|
public_id = (data or {}).get("public_id")
|
|
if not public_id:
|
|
return
|
|
st = get_current_event()
|
|
if not st:
|
|
emit("event_state", {"active": False, "server_time_ms": server_time_ms()})
|
|
return
|
|
video_url = st.assignments.get(public_id)
|
|
if not video_url:
|
|
emit("event_state", {"active": False, "server_time_ms": server_time_ms()})
|
|
return
|
|
emit(
|
|
"event_state",
|
|
{
|
|
"active": True,
|
|
"event_id": st.event_id,
|
|
"event_name": st.name,
|
|
"video_url": video_url,
|
|
"start_time_ms": st.start_time_ms,
|
|
"server_time_ms": server_time_ms(),
|
|
},
|
|
)
|
|
|
|
|
|
@socketio.on("playback_ended")
|
|
def on_playback_ended(data: Dict[str, Any]):
|
|
public_id = (data or {}).get("public_id")
|
|
event_id = (data or {}).get("event_id")
|
|
if not public_id or not event_id:
|
|
return
|
|
try:
|
|
ended_all = mark_trigger_display_ended(int(event_id), str(public_id))
|
|
except Exception:
|
|
return
|
|
if ended_all:
|
|
# Trigger event fully ended; displays will fall back to idle image.
|
|
return
|
|
|
|
|
|
@socketio.on("admin_join")
|
|
def on_admin_join():
|
|
join_room("admin")
|
|
emit("admin_snapshot", {"server_time_ms": server_time_ms(), "live": get_live_snapshot()})
|
|
|
|
|
|
@socketio.on("disconnect")
|
|
def on_disconnect():
|
|
# We don't know which display disconnected unless tracked in _live
|
|
sid = request.sid
|
|
public_id: Optional[str] = None
|
|
for pid, info in list(_live.items()):
|
|
if info.get("sid") == sid:
|
|
public_id = pid
|
|
break
|
|
if not public_id:
|
|
return
|
|
try:
|
|
disp = db.session.query(Display).filter_by(public_id=public_id).one_or_none()
|
|
if disp:
|
|
disp.is_online = False
|
|
disp.last_seen = datetime.utcnow()
|
|
db.session.commit()
|
|
except Exception:
|
|
logger.exception("Failed to mark display offline")
|
|
_set_live(public_id, is_online=False)
|
|
socketio.emit("admin_display_update", {"public_id": public_id, "is_online": False, "last_seen": datetime.utcnow().isoformat()}, room="admin")
|