from __future__ import annotations import logging import os import subprocess from datetime import datetime, timedelta from typing import Dict, List, Optional from flask import ( Blueprint, Response, current_app, flash, redirect, render_template, request, send_from_directory, url_for, ) from models import AppSetting, Display, Event, EventDisplayMap, EventLog, Video, db from sockets import socketio from event_manager import activate_trigger_event from sync import get_current_event, server_time_ms logger = logging.getLogger(__name__) bp = Blueprint("main", __name__) admin_bp = Blueprint("admin", __name__, url_prefix="/admin") def _get_setting(key: str) -> Optional[str]: row = db.session.get(AppSetting, key) return row.value if row else None def _set_setting(key: str, value: str) -> None: row = db.session.get(AppSetting, key) if row is None: row = AppSetting(key=key, value=value) else: row.value = value db.session.add(row) db.session.commit() def _media_dir() -> str: return current_app.config["MEDIA_DIR"] @bp.get("/") def index(): return redirect(url_for("admin.dashboard")) @bp.get("/display/") def display_page(public_id: str): # Minimal player page. idle_image = _get_setting("idle_image") idle_image_url = url_for("main.media", filename=idle_image) if idle_image else None return render_template("display.html", public_id=public_id, idle_image_url=idle_image_url) @bp.get("/media/") def media(filename: str): # Efficient static serving (still recommend nginx for very large deployments). resp = send_from_directory(_media_dir(), filename, conditional=True) # Kiosk players often struggle with overly strict headers. resp.headers.pop("Content-Security-Policy", None) resp.headers["Cache-Control"] = "public, max-age=3600" return resp def _probe_duration_seconds(path: str) -> Optional[float]: """Try to get media duration via ffprobe if available. Returns None on failure.""" try: # Requires ffprobe in PATH. 
cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ] out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True, timeout=5).strip() if not out: return None return float(out) except Exception: return None def trigger_event( event_id: int, trigger_source: str, source_ip: Optional[str] = None, *, force: bool = False, ) -> bool: """Trigger an event. Cooldown is enforced unless force=True. Returns True if triggered. """ ev = db.session.get(Event, event_id) if not ev: return False now = datetime.utcnow() if (not force) and ev.last_triggered and ev.cooldown_seconds: if now < ev.last_triggered + timedelta(seconds=ev.cooldown_seconds): return False maps = ( db.session.query(EventDisplayMap) .filter_by(event_id=ev.id) .join(Display, EventDisplayMap.display_id == Display.id) .join(Video, EventDisplayMap.video_id == Video.id) .all() ) if not maps: return False lead_ms = int(current_app.config["EVENT_LEAD_TIME_MS"]) start_ms = server_time_ms() + float(lead_ms) assignments: Dict[str, str] = {} max_duration = 0.0 for m in maps: assignments[m.display.public_id] = url_for("main.media", filename=m.video.filename) if m.video.duration and m.video.duration > max_duration: max_duration = float(m.video.duration) ttl_s = current_app.config["EVENT_FALLBACK_TTL_SECONDS"] end_ms = start_ms + (max_duration * 1000.0 if max_duration > 0 else ttl_s * 1000.0) activate_trigger_event(event_id=ev.id, start_ms=start_ms, end_ms=end_ms, name=ev.name) ev.last_triggered = now db.session.add( EventLog(event_id=ev.id, trigger_source=trigger_source, source_ip=source_ip) ) db.session.commit() logger.info("Triggered event %s (%s) via %s", ev.id, ev.name, trigger_source) socketio.emit("admin_event_triggered", {"event_id": ev.id, "event_name": ev.name, "start_time_ms": start_ms}, room="admin") return True @admin_bp.get("/") def dashboard(): displays = db.session.query(Display).order_by(Display.id.asc()).all() events = 
db.session.query(Event).order_by(Event.id.desc()).all() active = get_current_event() return render_template("admin/dashboard.html", displays=displays, events=events, active=active) @admin_bp.get("/displays") def displays_list(): displays = db.session.query(Display).order_by(Display.id.asc()).all() return render_template("admin/displays.html", displays=displays) @admin_bp.route("/displays/new", methods=["GET", "POST"]) def display_new(): if request.method == "POST": name = request.form.get("name", "").strip() public_id = request.form.get("public_id", "").strip() if not name or not public_id: flash("Name and public_id are required", "danger") return render_template("admin/display_form.html", display=None) if db.session.query(Display).filter_by(public_id=public_id).first(): flash("public_id already exists", "danger") return render_template("admin/display_form.html", display=None) d = Display(name=name, public_id=public_id, is_online=False) db.session.add(d) db.session.commit() return redirect(url_for("admin.displays_list")) return render_template("admin/display_form.html", display=None) @admin_bp.route("/displays//edit", methods=["GET", "POST"]) def display_edit(display_id: int): d = db.session.get(Display, display_id) if not d: return redirect(url_for("admin.displays_list")) if request.method == "POST": d.name = request.form.get("name", d.name).strip() public_id = request.form.get("public_id", d.public_id).strip() if public_id != d.public_id and db.session.query(Display).filter_by(public_id=public_id).first(): flash("public_id already exists", "danger") return render_template("admin/display_form.html", display=d) d.public_id = public_id db.session.commit() return redirect(url_for("admin.displays_list")) return render_template("admin/display_form.html", display=d) @admin_bp.post("/displays//delete") def display_delete(display_id: int): d = db.session.get(Display, display_id) if d: db.session.delete(d) db.session.commit() return redirect(url_for("admin.displays_list")) 
@admin_bp.get("/videos")
def videos_list():
    """Render the list of videos, newest upload first."""
    videos = db.session.query(Video).order_by(Video.uploaded_at.desc()).all()
    return render_template("admin/videos.html", videos=videos)


@admin_bp.route("/videos/upload", methods=["GET", "POST"])
def videos_upload():
    """Show the upload form (GET) or store an uploaded MP4/WebM file (POST)."""
    if request.method == "POST":
        f = request.files.get("file")
        if not f or not f.filename:
            flash("No file selected", "danger")
            return redirect(url_for("admin.videos_upload"))

        # Some clients send a full Windows path; normalise separators so
        # basename strips the directory part on every platform.
        filename = os.path.basename(f.filename.replace("\\", "/"))
        # Basic format check
        if not filename.lower().endswith((".mp4", ".webm")):
            flash("Only MP4/WebM supported", "danger")
            return redirect(url_for("admin.videos_upload"))

        os.makedirs(_media_dir(), exist_ok=True)
        dst = os.path.join(_media_dir(), filename)
        if os.path.exists(dst):
            flash("A file with that name already exists", "danger")
            return redirect(url_for("admin.videos_upload"))

        f.save(dst)
        dur = _probe_duration_seconds(dst)
        v = Video(filename=filename, duration=dur)
        db.session.add(v)
        db.session.commit()
        flash(f"Uploaded {filename}", "success")
        return redirect(url_for("admin.videos_list"))
    return render_template("admin/videos_upload.html")


@admin_bp.get("/idle-image")
def idle_image_page():
    """Render the idle-image admin page with the current image, if any."""
    current = _get_setting("idle_image")
    current_url = url_for("main.media", filename=current) if current else None
    return render_template("admin/idle_image.html", current=current, current_url=current_url)


@admin_bp.post("/idle-image/upload")
def idle_image_upload():
    """Store an uploaded PNG/JPG/WebP as the idle image and record it in settings."""
    f = request.files.get("file")
    if not f or not f.filename:
        flash("No file selected", "danger")
        return redirect(url_for("admin.idle_image_page"))

    filename = os.path.basename(f.filename)
    ext = os.path.splitext(filename)[1].lower()
    if ext not in (".png", ".jpg", ".jpeg", ".webp"):
        flash("Only PNG/JPG/WebP supported", "danger")
        return redirect(url_for("admin.idle_image_page"))

    os.makedirs(_media_dir(), exist_ok=True)
    # Store as a stable name to avoid needing more DB fields.
    dst_name = f"idle_image{ext}"
    prev = _get_setting("idle_image")
    dst = os.path.join(_media_dir(), dst_name)

    # Save the new file first: if this fails, the previous image and the
    # setting that points at it are both still intact.
    try:
        f.save(dst)
    except Exception:
        logger.exception("Failed to save idle image to %s", dst)
        flash("Failed to save file", "danger")
        return redirect(url_for("admin.idle_image_page"))

    _set_setting("idle_image", dst_name)

    # If we're changing extension, clean up the old file (best effort).
    if prev and prev != dst_name:
        try:
            os.remove(os.path.join(_media_dir(), prev))
        except OSError:
            pass

    flash(f"Uploaded idle image ({dst_name})", "success")
    return redirect(url_for("admin.idle_image_page"))


@admin_bp.post("/idle-image/clear")
def idle_image_clear():
    """Remove the idle-image setting and, best effort, the file it pointed at."""
    cur = _get_setting("idle_image")
    if cur:
        # Delete DB setting
        row = db.session.get(AppSetting, "idle_image")
        if row:
            db.session.delete(row)
            db.session.commit()
        # Best-effort delete file
        try:
            os.remove(os.path.join(_media_dir(), cur))
        except OSError:
            pass
    flash("Idle image cleared", "success")
    return redirect(url_for("admin.idle_image_page"))


@admin_bp.post("/videos/<int:video_id>/delete")
def videos_delete(video_id: int):
    """Delete a video row and then, best effort, its file in the media directory."""
    v = db.session.get(Video, video_id)
    if v:
        # Read the filename before the delete; commit the DB change first so
        # a failed commit cannot leave a row whose file is already gone.
        filename = v.filename
        db.session.delete(v)
        db.session.commit()
        try:
            os.remove(os.path.join(_media_dir(), filename))
        except OSError:
            pass
    return redirect(url_for("admin.videos_list"))


@admin_bp.get("/events")
def events_list():
    """Render the list of events, highest id first."""
    events = db.session.query(Event).order_by(Event.id.desc()).all()
    return render_template("admin/events.html", events=events)


def _parse_optional_int(raw: Optional[str]) -> Optional[int]:
    """Parse a form value as int; blank or missing gives None, junk raises ValueError."""
    text = (raw or "").strip()
    return int(text) if text else None


def _mapping_choices_from_form(displays) -> List[tuple]:
    """Collect (display_id, video_id) pairs from the ``map_<display_id>`` form fields.

    Displays whose field is missing or blank are skipped. Raises ValueError
    when a submitted video id is not an integer.
    """
    choices = []
    for d in displays:
        vid = request.form.get(f"map_{d.id}")
        if vid:
            choices.append((d.id, int(vid)))
    return choices


@admin_bp.route("/events/new", methods=["GET", "POST"])
def event_new():
    """Show the new-event form (GET) or create an event and its mappings (POST)."""
    displays = db.session.query(Display).order_by(Display.id.asc()).all()
    videos = db.session.query(Video).order_by(Video.filename.asc()).all()

    if request.method == "POST":
        name = request.form.get("name", "").strip()
        if not name:
            flash("Name required", "danger")
            return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)

        # Reject malformed numbers with a message instead of an HTTP 500.
        try:
            udp_port = _parse_optional_int(request.form.get("udp_port"))
            cooldown = _parse_optional_int(request.form.get("cooldown_seconds"))
            choices = _mapping_choices_from_form(displays)
        except ValueError:
            flash("UDP port, cooldown and video selections must be whole numbers", "danger")
            return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)

        udp_payload = request.form.get("udp_payload", "").strip() or None
        ev = Event(
            name=name,
            udp_port=udp_port,
            udp_payload=udp_payload,
            # A blank or missing cooldown defaults to 2 seconds.
            cooldown_seconds=2 if cooldown is None else cooldown,
        )
        db.session.add(ev)
        db.session.flush()  # assigns ev.id for the mapping rows below

        # Mappings: form fields map_<display_id>
        for display_id, video_id in choices:
            db.session.add(EventDisplayMap(event_id=ev.id, display_id=display_id, video_id=video_id))

        db.session.commit()
        return redirect(url_for("admin.events_list"))

    return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)


@admin_bp.route("/events/<int:event_id>/edit", methods=["GET", "POST"])
def event_edit(event_id: int):
    """Show the edit form for an event (GET) or apply the submitted changes (POST).

    Redirects to the event list when ``event_id`` does not exist. On POST
    the event's display/video mappings are replaced by the submitted ones.
    """
    ev = db.session.get(Event, event_id)
    if not ev:
        return redirect(url_for("admin.events_list"))

    displays = db.session.query(Display).order_by(Display.id.asc()).all()
    videos = db.session.query(Video).order_by(Video.filename.asc()).all()
    existing = {
        m.display_id: m.video_id
        for m in db.session.query(EventDisplayMap).filter_by(event_id=ev.id).all()
    }

    def _render_form():
        return render_template(
            "admin/event_form.html",
            event=ev,
            displays=displays,
            videos=videos,
            existing=existing,
        )

    if request.method == "POST":
        # Parse and validate everything before mutating the event so a
        # rejected form leaves the loaded row unchanged.
        name = request.form.get("name", ev.name).strip()
        if not name:
            flash("Name required", "danger")
            return _render_form()
        try:
            udp_port = _parse_optional_int(request.form.get("udp_port"))
            cooldown = _parse_optional_int(request.form.get("cooldown_seconds"))
            choices = _mapping_choices_from_form(displays)
        except ValueError:
            flash("UDP port, cooldown and video selections must be whole numbers", "danger")
            return _render_form()

        ev.name = name
        ev.udp_port = udp_port
        ev.udp_payload = request.form.get("udp_payload", "").strip() or None
        # A blank or missing cooldown keeps the stored value.
        if cooldown is not None:
            ev.cooldown_seconds = cooldown

        # Replace mappings
        db.session.query(EventDisplayMap).filter_by(event_id=ev.id).delete()
        for display_id, video_id in choices:
            db.session.add(EventDisplayMap(event_id=ev.id, display_id=display_id, video_id=video_id))

        db.session.commit()
        return redirect(url_for("admin.events_list"))

    return _render_form()


@admin_bp.post("/events/<int:event_id>/delete")
def event_delete(event_id: int):
    """Delete the event if it exists, then redirect to the event list."""
    ev = db.session.get(Event, event_id)
    if ev:
        db.session.delete(ev)
        db.session.commit()
    return redirect(url_for("admin.events_list"))


@admin_bp.post("/events/<int:event_id>/trigger")
def event_trigger(event_id: int):
    """Trigger an event from the admin UI and flash the outcome."""
    ok = trigger_event(event_id=event_id, trigger_source="manual", source_ip=request.remote_addr)
    if not ok:
        flash("Event not triggered (cooldown or missing mappings)", "warning")
    else:
        flash("Event triggered", "success")
    return redirect(url_for("admin.dashboard"))


def _force_requested() -> bool:
    """Return True when the ``force`` query parameter is "1", "true" or "yes"."""
    return request.args.get("force") in ("1", "true", "yes")


@bp.get("/trigger/<int:event_id>")
def http_trigger(event_id: int):
    """Simple URL trigger endpoint for LAN integrations.

    Example:
        http://server:5000/trigger/1

    If you want a tiny bit more safety, run behind a LAN firewall or add a shared secret.

    Returns a JSON body with ``ok``, ``event_id`` and ``server_time_ms``; 404
    with reason "not_found" for an unknown event, and reason "cooldown" when
    the event is still cooling down and ``force`` was not requested.
    """
    force = _force_requested()
    ev = db.session.get(Event, event_id)
    if not ev:
        return {"ok": False, "event_id": event_id, "reason": "not_found", "server_time_ms": server_time_ms()}, 404

    # Provide a useful reason if not triggered.
    if not force and ev.last_triggered and ev.cooldown_seconds:
        next_allowed = ev.last_triggered + timedelta(seconds=ev.cooldown_seconds)
        if datetime.utcnow() < next_allowed:
            return {
                "ok": False,
                "event_id": event_id,
                "reason": "cooldown",
                "cooldown_seconds": ev.cooldown_seconds,
                "server_time_ms": server_time_ms(),
            }

    # NOTE(review): HTTP triggers are logged with the same "manual" source as
    # the admin button -- confirm whether a distinct label is wanted.
    ok = trigger_event(event_id=event_id, trigger_source="manual", source_ip=request.remote_addr, force=force)
    return {
        "ok": bool(ok),
        "event_id": event_id,
        "forced": bool(force),
        "server_time_ms": server_time_ms(),
    }


@bp.get("/trigger_by_name/<name>")
def http_trigger_by_name(name: str):
    """Trigger an event by its name.

    Intended for very simple LAN automation. Returns 404 with reason
    "not_found" when no event has that exact name.
    """
    force = _force_requested()
    # NOTE(review): one_or_none() raises if several events share a name --
    # confirm that Event.name is unique.
    ev = db.session.query(Event).filter(Event.name == name).one_or_none()
    if not ev:
        return {"ok": False, "reason": "not_found", "name": name, "server_time_ms": server_time_ms()}, 404
    ok = trigger_event(event_id=ev.id, trigger_source="manual", source_ip=request.remote_addr, force=force)
    return {
        "ok": bool(ok),
        "event_id": ev.id,
        "name": ev.name,
        "forced": bool(force),
        "server_time_ms": server_time_ms(),
    }


@bp.get("/api/videos")
def api_videos():
    """List all videos for display-side pre-caching."""
    vids = db.session.query(Video).order_by(Video.filename.asc()).all()
    return {
        "videos": [
            {
                "id": v.id,
                "filename": v.filename,
                "url": url_for("main.media", filename=v.filename),
                "duration": v.duration,
            }
            for v in vids
        ]
    }


@admin_bp.get("/logs/events")
def event_logs():
    """Render the 200 most recent EventLog rows, newest first."""
    logs = db.session.query(EventLog).order_by(EventLog.triggered_at.desc()).limit(200).all()
    return render_template("admin/event_logs.html", logs=logs)


@admin_bp.get("/logs/system")
def system_logs():
    """Render the last 400 lines of logs/system.log (one level above the app root)."""
    from collections import deque

    # Simple log viewer for logs/system.log
    log_path = os.path.abspath(os.path.join(current_app.root_path, "..", "logs", "system.log"))
    lines: List[str] = []
    try:
        with open(log_path, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the tail while streaming, so a large
            # log file is never held in memory in full.
            lines = list(deque(f, maxlen=400))
    except OSError:
        lines = ["(no logs yet)"]
    return render_template("admin/system_logs.html", log_path=log_path, lines=lines)