Files
2026-02-12 10:50:49 +01:00

499 lines
17 KiB
Python

from __future__ import annotations

import logging
import math
import os
import subprocess
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    send_from_directory,
    url_for,
)

from event_manager import activate_trigger_event
from models import AppSetting, Display, Event, EventDisplayMap, EventLog, Video, db
from sockets import socketio
from sync import get_current_event, server_time_ms
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# "main" blueprint: created here without a URL prefix.
bp = Blueprint("main", __name__)
# "admin" blueprint: created with the "/admin" URL prefix.
admin_bp = Blueprint("admin", __name__, url_prefix="/admin")
def _get_setting(key: str) -> Optional[str]:
    """Return the value of the ``AppSetting`` row keyed by *key*, or None if there is no such row."""
    setting = db.session.get(AppSetting, key)
    if not setting:
        return None
    return setting.value
def _set_setting(key: str, value: str) -> None:
    """Store *value* under *key* in ``AppSetting`` (insert or update) and commit the session."""
    setting = db.session.get(AppSetting, key)
    if setting is not None:
        # Row already exists: overwrite its value in place.
        setting.value = value
    else:
        setting = AppSetting(key=key, value=value)
    db.session.add(setting)
    db.session.commit()
def _media_dir() -> str:
    """Return the ``MEDIA_DIR`` entry from the current Flask application's config."""
    app_config = current_app.config
    return app_config["MEDIA_DIR"]
@bp.get("/")
def index():
    """Redirect the site root to the admin dashboard."""
    dashboard_url = url_for("admin.dashboard")
    return redirect(dashboard_url)
@bp.get("/display/<public_id>")
def display_page(public_id: str):
    """Render the minimal player page for the display identified by *public_id*.

    The template also receives the URL of the configured idle image, or None
    when the ``idle_image`` setting is unset or empty.
    """
    idle_image_url = None
    idle_image = _get_setting("idle_image")
    if idle_image:
        idle_image_url = url_for("main.media", filename=idle_image)
    return render_template(
        "display.html",
        public_id=public_id,
        idle_image_url=idle_image_url,
    )
@bp.get("/media/<path:filename>")
def media(filename: str):
    """Serve *filename* from the media directory as a conditional response.

    Static serving through Flask is efficient enough here; nginx is still
    recommended for very large deployments.
    """
    response = send_from_directory(_media_dir(), filename, conditional=True)
    headers = response.headers
    # Kiosk players often struggle with overly strict headers, so drop any CSP.
    headers.pop("Content-Security-Policy", None)
    headers["Cache-Control"] = "public, max-age=3600"
    return response
def _probe_duration_seconds(path: str) -> Optional[float]:
"""Try to get media duration via ffprobe if available. Returns None on failure."""
try:
# Requires ffprobe in PATH.
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
path,
]
out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True, timeout=5).strip()
if not out:
return None
return float(out)
except Exception:
return None
def trigger_event(
    event_id: int,
    trigger_source: str,
    source_ip: Optional[str] = None,
    *,
    force: bool = False,
) -> bool:
    """Trigger the event with id *event_id*.

    The event's cooldown is enforced unless ``force=True``. Returns True when
    the event was triggered, and False when the event does not exist, is still
    cooling down, or has no display/video mappings.

    On success this activates the event, stamps ``last_triggered``, writes an
    ``EventLog`` row, commits, and emits ``admin_event_triggered`` to the
    "admin" room.
    """
    event = db.session.get(Event, event_id)
    if not event:
        return False

    triggered_at = datetime.utcnow()
    if not force and event.last_triggered and event.cooldown_seconds:
        cooldown_until = event.last_triggered + timedelta(seconds=event.cooldown_seconds)
        if triggered_at < cooldown_until:
            return False

    # Inner joins: only mappings whose display and video rows both exist.
    mapping_query = db.session.query(EventDisplayMap).filter_by(event_id=event.id)
    mapping_query = mapping_query.join(Display, EventDisplayMap.display_id == Display.id)
    mapping_query = mapping_query.join(Video, EventDisplayMap.video_id == Video.id)
    mappings = mapping_query.all()
    if not mappings:
        return False

    lead_ms = int(current_app.config["EVENT_LEAD_TIME_MS"])
    start_ms = server_time_ms() + float(lead_ms)

    # NOTE(review): `assignments` is built but not consumed anywhere in this
    # function - confirm whether it is still needed.
    assignments: Dict[str, str] = {}
    longest_s = 0.0
    for mapping in mappings:
        video = mapping.video
        assignments[mapping.display.public_id] = url_for("main.media", filename=video.filename)
        if video.duration and video.duration > longest_s:
            longest_s = float(video.duration)

    fallback_ttl_s = current_app.config["EVENT_FALLBACK_TTL_SECONDS"]
    # Use the longest known video duration; fall back to the configured TTL
    # when no mapped video has a positive duration.
    if longest_s > 0:
        window_ms = longest_s * 1000.0
    else:
        window_ms = fallback_ttl_s * 1000.0
    end_ms = start_ms + window_ms

    activate_trigger_event(event_id=event.id, start_ms=start_ms, end_ms=end_ms, name=event.name)

    event.last_triggered = triggered_at
    log_row = EventLog(event_id=event.id, trigger_source=trigger_source, source_ip=source_ip)
    db.session.add(log_row)
    db.session.commit()

    logger.info("Triggered event %s (%s) via %s", event.id, event.name, trigger_source)
    payload = {"event_id": event.id, "event_name": event.name, "start_time_ms": start_ms}
    socketio.emit("admin_event_triggered", payload, room="admin")
    return True
@admin_bp.get("/")
def dashboard():
    """Render the admin dashboard.

    The template receives all displays (ascending id), all events (descending
    id) and whatever ``get_current_event()`` returns.
    """
    session = db.session
    context = {
        "displays": session.query(Display).order_by(Display.id.asc()).all(),
        "events": session.query(Event).order_by(Event.id.desc()).all(),
        "active": get_current_event(),
    }
    return render_template("admin/dashboard.html", **context)
@admin_bp.get("/displays")
def displays_list():
    """Render the list of all displays, ordered by ascending id."""
    ordered = db.session.query(Display).order_by(Display.id.asc())
    return render_template("admin/displays.html", displays=ordered.all())
@admin_bp.route("/displays/new", methods=["GET", "POST"])
def display_new():
    """Show the new-display form (GET) or create a display from it (POST).

    A POST with a missing name/public_id, or with a public_id that is already
    in use, re-renders the form with a flash message instead of creating a row.
    """
    form_template = "admin/display_form.html"
    if request.method != "POST":
        return render_template(form_template, display=None)

    name = request.form.get("name", "").strip()
    public_id = request.form.get("public_id", "").strip()
    if not (name and public_id):
        flash("Name and public_id are required", "danger")
        return render_template(form_template, display=None)

    duplicate = db.session.query(Display).filter_by(public_id=public_id).first()
    if duplicate:
        flash("public_id already exists", "danger")
        return render_template(form_template, display=None)

    db.session.add(Display(name=name, public_id=public_id, is_online=False))
    db.session.commit()
    return redirect(url_for("admin.displays_list"))
@admin_bp.route("/displays/<int:display_id>/edit", methods=["GET", "POST"])
def display_edit(display_id: int):
    """Show the edit form for a display (GET) or apply the submitted changes (POST).

    An unknown *display_id* redirects to the display list. On POST the submitted
    ``name`` and ``public_id`` must both be non-empty, and ``public_id`` must not
    belong to another display. On a validation failure the form is re-rendered
    with a flash message and the row is left untouched.
    """
    d = db.session.get(Display, display_id)
    if not d:
        return redirect(url_for("admin.displays_list"))

    if request.method == "POST":
        # Validate into locals first so a rejected submission never leaves the
        # ORM object half-modified.
        name = (request.form.get("name", d.name) or "").strip()
        public_id = (request.form.get("public_id", d.public_id) or "").strip()

        if not name or not public_id:
            flash("Name and public_id are required", "danger")
            return render_template("admin/display_form.html", display=d)

        if public_id != d.public_id and db.session.query(Display).filter_by(public_id=public_id).first():
            flash("public_id already exists", "danger")
            return render_template("admin/display_form.html", display=d)

        d.name = name
        d.public_id = public_id
        db.session.commit()
        return redirect(url_for("admin.displays_list"))

    return render_template("admin/display_form.html", display=d)
@admin_bp.post("/displays/<int:display_id>/delete")
def display_delete(display_id: int):
    """Delete the display with id *display_id* if it exists, then go back to the display list."""
    target = db.session.get(Display, display_id)
    if not target:
        return redirect(url_for("admin.displays_list"))
    db.session.delete(target)
    db.session.commit()
    return redirect(url_for("admin.displays_list"))
@admin_bp.get("/videos")
def videos_list():
    """Render the list of all videos, most recently uploaded first."""
    newest_first = db.session.query(Video).order_by(Video.uploaded_at.desc())
    return render_template("admin/videos.html", videos=newest_first.all())
@admin_bp.route("/videos/upload", methods=["GET", "POST"])
def videos_upload():
    """Show the upload form (GET) or store an uploaded MP4/WebM video (POST).

    The file is saved into the media directory under its base name, its
    duration is probed, and a ``Video`` row is created. Uploads are rejected
    with a flash message when no file is sent, the extension is not
    ``.mp4``/``.webm``, or a file with the same name already exists.
    """
    if request.method != "POST":
        return render_template("admin/videos_upload.html")

    f = request.files.get("file")
    if not f or not f.filename:
        flash("No file selected", "danger")
        return redirect(url_for("admin.videos_upload"))

    # Some clients submit a full Windows path ("C:\\dir\\clip.mp4"). Normalise
    # the separators so basename() strips the directory part on POSIX as well.
    filename = os.path.basename(f.filename.replace("\\", "/"))

    # Basic format check
    if not filename.lower().endswith((".mp4", ".webm")):
        flash("Only MP4/WebM supported", "danger")
        return redirect(url_for("admin.videos_upload"))

    media_dir = _media_dir()
    os.makedirs(media_dir, exist_ok=True)
    dst = os.path.join(media_dir, filename)
    if os.path.exists(dst):
        flash("A file with that name already exists", "danger")
        return redirect(url_for("admin.videos_upload"))

    f.save(dst)
    dur = _probe_duration_seconds(dst)
    try:
        db.session.add(Video(filename=filename, duration=dur))
        db.session.commit()
    except Exception:
        # Without a DB row the saved file would be an orphan that blocks any
        # later upload under the same name, so remove it before re-raising.
        db.session.rollback()
        try:
            os.remove(dst)
        except OSError:
            logger.warning("Could not remove %s after failed video insert", dst)
        raise

    flash(f"Uploaded {filename}", "success")
    return redirect(url_for("admin.videos_list"))
@admin_bp.get("/idle-image")
def idle_image_page():
    """Render the idle-image admin page with the current setting and its media URL (if any)."""
    current = _get_setting("idle_image")
    current_url = None
    if current:
        current_url = url_for("main.media", filename=current)
    return render_template("admin/idle_image.html", current=current, current_url=current_url)
@admin_bp.post("/idle-image/upload")
def idle_image_upload():
    """Store an uploaded PNG/JPG/WebP as the idle image and record it in settings.

    The file is saved under the stable name ``idle_image<ext>`` in the media
    directory. The previous idle image file is removed only after the new file
    has been saved and the setting updated, so a failed upload never leaves the
    setting pointing at a file that was already deleted.
    """
    f = request.files.get("file")
    if not f or not f.filename:
        flash("No file selected", "danger")
        return redirect(url_for("admin.idle_image_page"))

    filename = os.path.basename(f.filename)
    ext = os.path.splitext(filename)[1].lower()
    if ext not in (".png", ".jpg", ".jpeg", ".webp"):
        flash("Only PNG/JPG/WebP supported", "danger")
        return redirect(url_for("admin.idle_image_page"))

    media_dir = _media_dir()
    os.makedirs(media_dir, exist_ok=True)

    # Store as a stable name to avoid needing more DB fields.
    dst_name = f"idle_image{ext}"
    dst = os.path.join(media_dir, dst_name)
    prev = _get_setting("idle_image")

    try:
        f.save(dst)
    except Exception:
        logger.exception("Failed to save idle image to %s", dst)
        flash("Failed to save file", "danger")
        return redirect(url_for("admin.idle_image_page"))

    _set_setting("idle_image", dst_name)

    # If the extension changed, the old file is now unreferenced: clean it up
    # (best effort - a leftover file is harmless).
    if prev and prev != dst_name:
        try:
            os.remove(os.path.join(media_dir, prev))
        except OSError:
            pass

    flash(f"Uploaded idle image ({dst_name})", "success")
    return redirect(url_for("admin.idle_image_page"))
@admin_bp.post("/idle-image/clear")
def idle_image_clear():
    """Remove the idle-image setting and its file, then return to the idle-image page."""
    current = _get_setting("idle_image")
    if current:
        # Drop the DB setting first.
        setting_row = db.session.get(AppSetting, "idle_image")
        if setting_row:
            db.session.delete(setting_row)
            db.session.commit()
        # Then delete the file; a missing or locked file is not an error.
        stale_path = os.path.join(_media_dir(), current)
        try:
            os.remove(stale_path)
        except OSError:
            pass
    flash("Idle image cleared", "success")
    return redirect(url_for("admin.idle_image_page"))
@admin_bp.post("/videos/<int:video_id>/delete")
def videos_delete(video_id: int):
    """Delete the video with id *video_id* (DB row and media file), then show the video list.

    The row is deleted and committed before the file is touched. If the commit
    fails, the file is still on disk and the surviving row keeps pointing at a
    real file. File removal itself is best effort.
    """
    v = db.session.get(Video, video_id)
    if v:
        # Capture the path now; the instance is no longer usable after the delete commits.
        file_path = os.path.join(_media_dir(), v.filename)
        db.session.delete(v)
        db.session.commit()
        try:
            os.remove(file_path)
        except OSError:
            pass
    return redirect(url_for("admin.videos_list"))
@admin_bp.get("/events")
def events_list():
    """Render the list of all events, highest id first."""
    newest_first = db.session.query(Event).order_by(Event.id.desc())
    return render_template("admin/events.html", events=newest_first.all())
@admin_bp.route("/events/new", methods=["GET", "POST"])
def event_new():
    """Show the new-event form (GET) or create an event with its display mappings (POST).

    Form fields: ``name`` (required), ``udp_port`` and ``udp_payload``
    (optional), ``cooldown_seconds`` (defaults to 2) and one ``map_<display_id>``
    field per display holding the chosen video id. Invalid input re-renders the
    form with a flash message instead of raising.
    """
    displays = db.session.query(Display).order_by(Display.id.asc()).all()
    videos = db.session.query(Video).order_by(Video.filename.asc()).all()

    if request.method == "POST":
        name = request.form.get("name", "").strip()
        udp_port = request.form.get("udp_port", "").strip() or None
        udp_payload = request.form.get("udp_payload", "").strip() or None

        if not name:
            flash("Name required", "danger")
            return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)

        # Parse every numeric field up front so malformed input becomes a
        # form error rather than an unhandled ValueError (HTTP 500).
        try:
            cooldown = int(request.form.get("cooldown_seconds", "2") or 2)
            port = int(udp_port) if udp_port else None
            # Mappings: form fields map_<display_id>
            mappings = {}
            for d in displays:
                vid = request.form.get(f"map_{d.id}")
                if vid:
                    mappings[d.id] = int(vid)
        except ValueError:
            flash("UDP port, cooldown and video selections must be whole numbers", "danger")
            return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)

        ev = Event(
            name=name,
            udp_port=port,
            udp_payload=udp_payload,
            cooldown_seconds=cooldown,
        )
        db.session.add(ev)
        # Flush so ev.id is available for the mapping rows.
        db.session.flush()
        for mapped_display_id, mapped_video_id in mappings.items():
            db.session.add(
                EventDisplayMap(event_id=ev.id, display_id=mapped_display_id, video_id=mapped_video_id)
            )
        db.session.commit()
        return redirect(url_for("admin.events_list"))

    return render_template("admin/event_form.html", event=None, displays=displays, videos=videos)
@admin_bp.route("/events/<int:event_id>/edit", methods=["GET", "POST"])
def event_edit(event_id: int):
    """Show the edit form for an event (GET) or apply the submitted changes (POST).

    An unknown *event_id* redirects to the event list. On POST the event's
    fields are updated and all of its display mappings are replaced by the
    submitted ``map_<display_id>`` fields. A blank or missing
    ``cooldown_seconds`` keeps the current value. Invalid input re-renders the
    form with a flash message and leaves the event unchanged.
    """
    ev = db.session.get(Event, event_id)
    if not ev:
        return redirect(url_for("admin.events_list"))

    displays = db.session.query(Display).order_by(Display.id.asc()).all()
    videos = db.session.query(Video).order_by(Video.filename.asc()).all()
    existing = {
        m.display_id: m.video_id
        for m in db.session.query(EventDisplayMap).filter_by(event_id=ev.id).all()
    }

    if request.method == "POST":
        # Validate into locals first; the event is only mutated once every
        # field has parsed successfully.
        name = (request.form.get("name", ev.name) or "").strip()
        udp_port = request.form.get("udp_port", "").strip() or None
        udp_payload = request.form.get("udp_payload", "").strip() or None
        cooldown_raw = (request.form.get("cooldown_seconds") or "").strip()

        error = None
        if not name:
            error = "Name required"
        else:
            try:
                port = int(udp_port) if udp_port else None
                cooldown = int(cooldown_raw) if cooldown_raw else ev.cooldown_seconds
                mappings = {}
                for d in displays:
                    vid = request.form.get(f"map_{d.id}")
                    if vid:
                        mappings[d.id] = int(vid)
            except ValueError:
                error = "UDP port, cooldown and video selections must be whole numbers"

        if error:
            flash(error, "danger")
            return render_template(
                "admin/event_form.html",
                event=ev,
                displays=displays,
                videos=videos,
                existing=existing,
            )

        ev.name = name
        ev.udp_port = port
        ev.udp_payload = udp_payload
        ev.cooldown_seconds = cooldown

        # Replace mappings
        db.session.query(EventDisplayMap).filter_by(event_id=ev.id).delete()
        for mapped_display_id, mapped_video_id in mappings.items():
            db.session.add(
                EventDisplayMap(event_id=ev.id, display_id=mapped_display_id, video_id=mapped_video_id)
            )
        db.session.commit()
        return redirect(url_for("admin.events_list"))

    return render_template(
        "admin/event_form.html",
        event=ev,
        displays=displays,
        videos=videos,
        existing=existing,
    )
@admin_bp.post("/events/<int:event_id>/delete")
def event_delete(event_id: int):
    """Delete the event with id *event_id* if it exists, then go back to the event list."""
    target = db.session.get(Event, event_id)
    if not target:
        return redirect(url_for("admin.events_list"))
    db.session.delete(target)
    db.session.commit()
    return redirect(url_for("admin.events_list"))
@admin_bp.post("/events/<int:event_id>/trigger")
def event_trigger(event_id: int):
    """Trigger an event from the admin UI, flash the outcome and return to the dashboard."""
    triggered = trigger_event(
        event_id=event_id,
        trigger_source="manual",
        source_ip=request.remote_addr,
    )
    if triggered:
        flash("Event triggered", "success")
    else:
        flash("Event not triggered (cooldown or missing mappings)", "warning")
    return redirect(url_for("admin.dashboard"))
@bp.get("/trigger/<int:event_id>")
def http_trigger(event_id: int):
    """Plain-URL trigger endpoint intended for LAN integrations.

    Example:
        http://server:5000/trigger/1

    Pass ``?force=1`` (or ``true`` / ``yes``) to bypass the cooldown. For a
    little more safety, run behind a LAN firewall or add a shared secret.

    Responds with a JSON object: 404 and ``reason: "not_found"`` for an unknown
    event, ``reason: "cooldown"`` while the event is cooling down, otherwise the
    outcome of the trigger attempt.
    """
    forced = request.args.get("force") in ("1", "true", "yes")

    event = db.session.get(Event, event_id)
    if not event:
        not_found = {
            "ok": False,
            "event_id": event_id,
            "reason": "not_found",
            "server_time_ms": server_time_ms(),
        }
        return not_found, 404

    # Check the cooldown here so the caller gets an explicit reason.
    cooling_down = False
    if not forced and event.last_triggered and event.cooldown_seconds:
        cooldown_until = event.last_triggered + timedelta(seconds=event.cooldown_seconds)
        cooling_down = datetime.utcnow() < cooldown_until
    if cooling_down:
        return {
            "ok": False,
            "event_id": event_id,
            "reason": "cooldown",
            "cooldown_seconds": event.cooldown_seconds,
            "server_time_ms": server_time_ms(),
        }

    triggered = trigger_event(
        event_id=event_id,
        trigger_source="manual",
        source_ip=request.remote_addr,
        force=forced,
    )
    return {
        "ok": bool(triggered),
        "event_id": event_id,
        "forced": bool(forced),
        "server_time_ms": server_time_ms(),
    }
@bp.get("/trigger_by_name/<name>")
def http_trigger_by_name(name: str):
    """Trigger an event by its name. Intended for very simple LAN automation.

    Pass ``?force=1`` (or ``true`` / ``yes``) to bypass the cooldown. Responds
    with a JSON object; an unknown name yields 404 with ``reason: "not_found"``.
    If several events share the name, the one with the lowest id is triggered.
    """
    force = request.args.get("force") in ("1", "true", "yes")
    # first() rather than one_or_none(): the event forms in this file do not
    # reject duplicate names, and one_or_none() raises MultipleResultsFound
    # (an HTTP 500) as soon as two events share one. Ordering by id keeps the
    # choice deterministic.
    ev = (
        db.session.query(Event)
        .filter(Event.name == name)
        .order_by(Event.id.asc())
        .first()
    )
    if not ev:
        return {"ok": False, "reason": "not_found", "name": name, "server_time_ms": server_time_ms()}, 404
    ok = trigger_event(event_id=ev.id, trigger_source="manual", source_ip=request.remote_addr, force=force)
    return {
        "ok": bool(ok),
        "event_id": ev.id,
        "name": ev.name,
        "forced": bool(force),
        "server_time_ms": server_time_ms(),
    }
@bp.get("/api/videos")
def api_videos():
    """Return every video (sorted by filename) as JSON, for display-side pre-caching."""
    entries = []
    for video in db.session.query(Video).order_by(Video.filename.asc()).all():
        entry = {
            "id": video.id,
            "filename": video.filename,
            "url": url_for("main.media", filename=video.filename),
            "duration": video.duration,
        }
        entries.append(entry)
    return {"videos": entries}
@admin_bp.get("/logs/events")
def event_logs():
    """Render the 200 most recent event-log entries, newest first."""
    recent = db.session.query(EventLog).order_by(EventLog.triggered_at.desc()).limit(200)
    return render_template("admin/event_logs.html", logs=recent.all())
@admin_bp.get("/logs/system")
def system_logs():
    """Render the last 400 lines of ``logs/system.log`` (resolved one level above the app root).

    If the file cannot be opened or read, a single placeholder line is shown instead.
    """
    log_path = os.path.abspath(os.path.join(current_app.root_path, "..", "logs", "system.log"))
    lines: List[str] = []
    try:
        with open(log_path, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the newest 400 lines while streaming,
            # so a large log file is never held in memory in full.
            lines = list(deque(f, maxlen=400))
    except OSError:
        lines = ["(no logs yet)"]
    return render_template("admin/system_logs.html", log_path=log_path, lines=lines)