# In-memory, lock-guarded registry holding at most one active "trigger" event
# and the set of displays that have reported finishing it.
from __future__ import annotations

import threading
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


def server_time_ms() -> float:
    """Return the server wall-clock time in milliseconds.

    Derived from ``time.time_ns()``, so the value keeps sub-millisecond
    resolution as a float.
    """
    return time.time_ns() / 1_000_000.0


@dataclass
class ActiveEventState:
    """Snapshot of the trigger event currently being shown."""

    event_id: int
    name: str
    start_time_ms: float
    # ``None`` means the event never expires by time; it only ends when it is
    # cleared or when every assigned display has reported completion.
    end_time_ms: Optional[float]
    # public_id -> video_url
    assignments: Dict[str, str]


# Guards every read/write of the two module-level variables below.
_lock = threading.Lock()
_trigger: Optional[ActiveEventState] = None
# public_ids of displays that reported the end of the current trigger event.
_trigger_ended: Set[str] = set()


def _active_trigger_locked(now_ms: Optional[float]) -> Optional[ActiveEventState]:
    """Return the live trigger, dropping it first if its end time has passed.

    The caller must already hold ``_lock``. ``now_ms`` defaults to
    ``server_time_ms()`` when it is ``None``.
    """
    global _trigger
    if _trigger is None:
        return None
    if _trigger.end_time_ms is not None:
        if now_ms is None:
            now_ms = server_time_ms()
        # Strictly greater: an event is still live exactly at its end time.
        if now_ms > _trigger.end_time_ms:
            _trigger = None
            _trigger_ended.clear()
            return None
    return _trigger


def set_trigger_event(state: Optional[ActiveEventState]) -> None:
    """Install ``state`` as the active trigger and reset completion tracking.

    Passing ``None`` removes the active trigger.
    """
    global _trigger, _trigger_ended
    with _lock:
        _trigger = state
        _trigger_ended = set()


def get_current_event(now_ms: Optional[float] = None) -> Optional[ActiveEventState]:
    """Return the current active trigger event, if any.

    An event whose ``end_time_ms`` is earlier than ``now_ms`` (default: the
    current server time) is cleared and ``None`` is returned.
    """
    with _lock:
        return _active_trigger_locked(now_ms)


def get_trigger_event(now_ms: Optional[float] = None) -> Optional[ActiveEventState]:
    """Return the active trigger event, or ``None`` if absent or expired.

    Behaves the same as ``get_current_event``; both names are kept for
    existing callers.
    """
    with _lock:
        return _active_trigger_locked(now_ms)


def mark_trigger_display_ended(event_id: int, public_id: str) -> bool:
    """Mark a display as ended for the current trigger event.

    Only displays listed in the event's ``assignments`` are recorded, so an
    unknown ``public_id`` cannot push the event to completion while an
    assigned display is still playing.

    Returns True if this call completes the event (every assigned display has
    ended), in which case the trigger is cleared. Returns False otherwise,
    including when there is no active trigger or ``event_id`` does not match.
    """
    global _trigger, _trigger_ended
    with _lock:
        if _trigger is None or _trigger.event_id != event_id:
            return False

        assigned = _trigger.assignments
        if public_id in assigned:
            _trigger_ended.add(public_id)

        # Compare membership rather than set size, so only assigned ids count.
        # An event with no assignments is trivially complete.
        if not all(pid in _trigger_ended for pid in assigned):
            return False

        _trigger = None
        _trigger_ended = set()
        return True


def clear_trigger_event() -> None:
    """Drop the active trigger event and forget which displays had ended."""
    global _trigger, _trigger_ended
    with _lock:
        _trigger = None
        _trigger_ended = set()