import os
import uuid
from urllib.parse import urlparse, parse_qs

from datetime import datetime, timedelta

from flask import Blueprint, abort, current_app, flash, jsonify, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename

from PIL import Image, ImageOps

from ..extensions import db
from ..uploads import (
    abs_upload_path,
    compute_storage_usage,
    ensure_company_upload_dir,
    get_company_upload_bytes,
    is_valid_upload_relpath,
)
from ..models import AppSettings, Company, Display, DisplayPlaylist, DisplaySession, Playlist, PlaylistItem, User
from ..email_utils import send_email
from ..auth_tokens import make_password_reset_token


ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff"}
ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".m4v"}

# Overlay is a transparent PNG that sits on top of a display.
ALLOWED_OVERLAY_EXTENSIONS = {".png"}

# Keep overlay size reasonable; it will be stretched to fit anyway.
# (PNG overlays are typically small-ish; 10MB is generous.)
MAX_OVERLAY_BYTES = 10 * 1024 * 1024

# Videos should have a maximum upload size of 250MB
MAX_VIDEO_BYTES = 250 * 1024 * 1024


def _normalize_youtube_embed_url(raw: str) -> str | None:
    """Normalize a user-provided YouTube URL into a privacy-friendly embed base URL.

    Returns:
        https://www.youtube-nocookie.com/embed/<VIDEO_ID>
        or None if we cannot parse a valid video id.
    """

    val = (raw or "").strip()
    if not val:
        return None

    # Be forgiving for inputs like "youtu.be/<id>".
    if not val.startswith("http://") and not val.startswith("https://"):
        val = "https://" + val

    try:
        u = urlparse(val)
    except Exception:
        return None

    host = (u.netloc or "").lower()
    host = host[4:] if host.startswith("www.") else host

    video_id: str | None = None
    path = (u.path or "").strip("/")

    if host in {"youtube.com", "m.youtube.com"}:
        # /watch?v=<id>
        if path == "watch":
            v = (parse_qs(u.query).get("v") or [None])[0]
            video_id = v
        # /embed/<id>
        elif path.startswith("embed/"):
            video_id = path.split("/", 1)[1]
        # /shorts/<id>
        elif path.startswith("shorts/"):
            video_id = path.split("/", 1)[1]
    elif host == "youtu.be":
        # /<id>
        if path:
            video_id = path.split("/", 1)[0]

    # Basic validation: YouTube IDs are typically 11 chars (letters/digits/_/-)
    if not video_id:
        return None
    video_id = video_id.strip()
    if len(video_id) != 11:
        return None
    for ch in video_id:
        if not (ch.isalnum() or ch in {"_", "-"}):
            return None

    return f"https://www.youtube-nocookie.com/embed/{video_id}"


def _center_crop_to_aspect(img: Image.Image, aspect_w: int, aspect_h: int) -> Image.Image:
    """Return a center-cropped copy of img to the desired aspect ratio."""

    w, h = img.size
    if w <= 0 or h <= 0:
        return img

    target = aspect_w / aspect_h
    current = w / h

    # If image is wider than target: crop width; else crop height.
    if current > target:
        new_w = max(1, int(h * target))
        left = max(0, (w - new_w) // 2)
        return img.crop((left, 0, left + new_w, h))
    else:
        new_h = max(1, int(w / target))
        top = max(0, (h - new_h) // 2)
        return img.crop((0, top, w, top + new_h))
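
# Worked example (assuming a 4000x3000 source cropped to 16:9): the image is taller than
# the ~1.78 target, so the height is cropped to int(4000 / (16 / 9)) = 2250 and the crop
# box becomes (0, 375, 4000, 2625), i.e. a centered 4000x2250 region.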


def _save_compressed_image(
    uploaded_file,
    upload_root: str,
    company_id: int | None,
    crop_mode: str | None = None,
) -> str:
    """Save an uploaded image as a compressed WEBP file.

    crop_mode:
        - "16:9" : center-crop to landscape
        - "9:16" : center-crop to portrait
        - "none" : no crop

    Returns relative file path under /static (e.g. uploads/<uuid>.webp)
    """

    unique = f"{uuid.uuid4().hex}.webp"
    company_dir = ensure_company_upload_dir(upload_root, company_id)
    save_path = os.path.join(company_dir, unique)

    cm = (crop_mode or "16:9").strip().lower()
    if cm not in {"16:9", "9:16", "none"}:
        cm = "16:9"

    img = Image.open(uploaded_file)
    # Respect EXIF orientation (common for phone photos)
    img = ImageOps.exif_transpose(img)

    # Normalize mode for webp
    if img.mode not in ("RGB", "RGBA"):
        img = img.convert("RGB")

    # Optional crop
    if cm == "16:9":
        img = _center_crop_to_aspect(img, 16, 9)
        max_box = (1920, 1080)
    elif cm == "9:16":
        img = _center_crop_to_aspect(img, 9, 16)
        max_box = (1080, 1920)
    else:
        # No crop: allow both portrait and landscape up to 1920px on the longest side.
        max_box = (1920, 1920)

    # Resize down if very large (keeps aspect ratio)
    img.thumbnail(max_box)

    img.save(save_path, format="WEBP", quality=80, method=6)
    company_seg = str(int(company_id)) if company_id is not None else "0"
    return f"uploads/{company_seg}/{unique}"


def _try_delete_upload(file_path: str | None, upload_root: str):
    """Best-effort delete of an uploaded media file."""
    if not file_path:
        return
    if not is_valid_upload_relpath(file_path):
        return

    abs_path = abs_upload_path(upload_root, file_path)
    if not abs_path:
        return
    try:
        if os.path.isfile(abs_path):
            os.remove(abs_path)
    except Exception:
        # Ignore cleanup failures
        pass


def _save_overlay_png(
    uploaded_file,
    upload_root: str,
    company_id: int | None,
) -> str:
    """Save a company overlay as PNG under the company's upload dir.

    Returns relative file path under /static (uploads/<company_id>/overlay_<uuid>.png)
    """

    unique = f"overlay_{uuid.uuid4().hex}.png"
    company_dir = ensure_company_upload_dir(upload_root, company_id)
    save_path = os.path.join(company_dir, unique)

    # Validate file is a PNG and is 16:9-ish.
    # Use magic bytes (signature) instead of relying on Pillow's img.format,
    # which can be unreliable if the stream position isn't at 0.
    try:
        if hasattr(uploaded_file, "stream"):
            uploaded_file.stream.seek(0)
    except Exception:
        pass

    try:
        sig = uploaded_file.stream.read(8) if hasattr(uploaded_file, "stream") else uploaded_file.read(8)
    except Exception:
        sig = b""

    # PNG file signature: 89 50 4E 47 0D 0A 1A 0A
    if sig != b"\x89PNG\r\n\x1a\n":
        raise ValueError("not_png")

    # Rewind before Pillow parses.
    try:
        if hasattr(uploaded_file, "stream"):
            uploaded_file.stream.seek(0)
    except Exception:
        pass

    img = Image.open(uploaded_file)
    img = ImageOps.exif_transpose(img)

    w, h = img.size
    if not w or not h:
        raise ValueError("invalid")

    # Allow some tolerance (overlays may include extra transparent padding).
    aspect = w / h
    target = 16 / 9
    if abs(aspect - target) > 0.15:  # ~15% tolerance
        raise ValueError("not_16_9")

    # Ensure we preserve alpha; normalize mode.
    if img.mode not in ("RGBA", "LA"):
        # Convert to RGBA so transparency is supported consistently.
        img = img.convert("RGBA")

    img.save(save_path, format="PNG", optimize=True)
    company_seg = str(int(company_id)) if company_id is not None else "0"
    return f"uploads/{company_seg}/{unique}"


bp = Blueprint("company", __name__, url_prefix="/company")


def _parse_schedule_local_to_utc(*, date_str: str | None, time_str: str | None) -> datetime | None:
    """Parse local date+time form inputs into a naive UTC datetime.

    Inputs come from <input type="date"> and <input type="time">.
    We interpret them as *local* time of the server.

    Note: this project currently does not store a per-company timezone; in most deployments
    the server timezone matches the users'. If per-company timezones are needed later, this
    function can be extended.
    """

    d = (date_str or "").strip()
    t = (time_str or "").strip()
    if not d and not t:
        return None
    if not d or not t:
        # Require both parts for clarity
        raise ValueError("Both date and time are required")

    # Basic parsing: YYYY-MM-DD and HH:MM
    try:
        year, month, day = [int(x) for x in d.split("-")]
        hh, mm = [int(x) for x in t.split(":")[:2]]
    except Exception:
        raise ValueError("Invalid date/time")

    # Interpret as local time, convert to UTC naive
    local_dt = datetime(year, month, day, hh, mm)
    # local_dt.timestamp() interprets a naive datetime in the server's local timezone.
    utc_ts = local_dt.timestamp()
    return datetime.utcfromtimestamp(utc_ts)
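
# Worked example (assuming the server runs at UTC+02:00): date_str="2024-06-01" and
# time_str="12:00" are read as 12:00 local time and stored as the naive UTC value
# datetime(2024, 6, 1, 10, 0). On a server already running in UTC the value is unchanged.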


def company_user_required():
    if not current_user.is_authenticated:
        abort(403)
    if current_user.is_admin:
        abort(403)
    if not current_user.company_id:
        abort(403)


def _format_bytes(num: int) -> str:
    num = max(0, int(num or 0))
    units = ["B", "KB", "MB", "GB", "TB"]
    size = float(num)
    idx = 0
    while size >= 1024.0 and idx < len(units) - 1:
        size /= 1024.0
        idx += 1
    if idx == 0:
        return f"{int(size)} {units[idx]}"
    return f"{size:.1f} {units[idx]}"


def _storage_limit_error_message(*, storage_max_human: str | None) -> str:
    if storage_max_human:
        return f"Storage limit reached. Maximum allowed storage is {storage_max_human}. Please delete items to free space."
    return "Storage limit reached. Please delete items to free space."


@bp.get("/my-company")
@login_required
def my_company():
    company_user_required()

    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    # Stats
    display_count = Display.query.filter_by(company_id=company.id).count()
    playlist_count = Playlist.query.filter_by(company_id=company.id).count()
    user_count = User.query.filter_by(company_id=company.id, is_admin=False).count()

    item_count = (
        PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id)
        .filter(Playlist.company_id == company.id)
        .count()
    )

    # Active display sessions (best-effort, based on same TTL as /api)
    cutoff = datetime.utcnow() - timedelta(seconds=90)
    active_sessions = (
        DisplaySession.query.join(Display, DisplaySession.display_id == Display.id)
        .filter(Display.company_id == company.id, DisplaySession.last_seen_at >= cutoff)
        .count()
    )

    # Storage usage
    upload_root = current_app.config["UPLOAD_FOLDER"]
    used_bytes = get_company_upload_bytes(upload_root, company.id)

    usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes)
    max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None

    users = User.query.filter_by(company_id=company.id, is_admin=False).order_by(User.email.asc()).all()

    overlay_url = None
    if company.overlay_file_path and is_valid_upload_relpath(company.overlay_file_path):
        overlay_url = url_for("static", filename=company.overlay_file_path)

    return render_template(
        "company/my_company.html",
        company=company,
        users=users,
        overlay_url=overlay_url,
        stats={
            "users": user_count,
            "displays": display_count,
            "playlists": playlist_count,
            "items": item_count,
            "active_sessions": active_sessions,
            "storage_bytes": used_bytes,
            "storage_human": _format_bytes(used_bytes),
            "storage_max_bytes": usage.get("max_bytes"),
            "storage_max_human": max_human,
            "storage_used_percent": usage.get("used_percent"),
        },
    )


@bp.post("/my-company/overlay")
@login_required
def upload_company_overlay():
    """Upload/replace the per-company 16:9 PNG overlay."""

    company_user_required()

    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    f = request.files.get("overlay")
    if not f or not f.filename:
        flash("Overlay file is required", "danger")
        return redirect(url_for("company.my_company"))

    filename = secure_filename(f.filename)
    ext = os.path.splitext(filename)[1].lower()
    if ext not in ALLOWED_OVERLAY_EXTENSIONS:
        flash("Unsupported overlay type. Please upload a PNG file.", "danger")
        return redirect(url_for("company.my_company"))

    # Enforce size limit best-effort.
    size = None
    try:
        size = getattr(f, "content_length", None)
        if (size is None or size <= 0) and hasattr(f, "stream"):
            pos = f.stream.tell()
            f.stream.seek(0, os.SEEK_END)
            size = f.stream.tell()
            f.stream.seek(pos, os.SEEK_SET)
    except Exception:
        size = None
    if size is not None and size > MAX_OVERLAY_BYTES:
        flash("Overlay file too large. Maximum allowed size is 10MB.", "danger")
        return redirect(url_for("company.my_company"))

    # Enforce storage quota too (overlay is stored in the same uploads folder).
    upload_root = current_app.config["UPLOAD_FOLDER"]
    used_bytes = get_company_upload_bytes(upload_root, company.id)
    usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes)
    storage_max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None
    if usage.get("is_exceeded"):
        flash(_storage_limit_error_message(storage_max_human=storage_max_human), "danger")
        return redirect(url_for("company.my_company"))

    old_path = company.overlay_file_path
    try:
        new_relpath = _save_overlay_png(f, upload_root, company.id)
    except ValueError as e:
        code = str(e)
        if code == "not_png":
            flash("Overlay must be a PNG file.", "danger")
        elif code == "not_16_9":
            flash("Overlay should be 16:9 (landscape).", "danger")
        else:
            flash("Failed to process overlay upload.", "danger")
        return redirect(url_for("company.my_company"))
    except Exception:
        flash("Failed to process overlay upload.", "danger")
        return redirect(url_for("company.my_company"))

    # Post-save quota check (like images) because PNG size is unknown until saved.
    if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0:
        try:
            used_after = get_company_upload_bytes(upload_root, company.id)
        except Exception:
            used_after = None
        if used_after is not None:
            usage_after = compute_storage_usage(used_bytes=used_after, max_bytes=company.storage_max_bytes)
            if usage_after.get("is_exceeded"):
                _try_delete_upload(new_relpath, upload_root)
                flash(_storage_limit_error_message(storage_max_human=storage_max_human), "danger")
                return redirect(url_for("company.my_company"))

    company.overlay_file_path = new_relpath
    db.session.commit()

    # Clean up the old overlay file.
    if old_path and old_path != new_relpath:
        _try_delete_upload(old_path, upload_root)

    flash("Overlay updated.", "success")
    return redirect(url_for("company.my_company"))


@bp.post("/my-company/overlay/delete")
@login_required
def delete_company_overlay():
    company_user_required()

    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    upload_root = current_app.config["UPLOAD_FOLDER"]
    old_path = company.overlay_file_path
    company.overlay_file_path = None
    db.session.commit()

    _try_delete_upload(old_path, upload_root)
    flash("Overlay removed.", "success")
    return redirect(url_for("company.my_company"))


@bp.post("/my-company/invite")
@login_required
def invite_user():
    company_user_required()

    email = (request.form.get("email", "") or "").strip().lower()
    if not email:
        flash("Email is required", "danger")
        return redirect(url_for("company.my_company"))

    if User.query.filter_by(email=email).first():
        flash("Email already exists", "danger")
        return redirect(url_for("company.my_company"))

    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    # Create user without password; they must set it via reset link.
    u = User(is_admin=False, company=company)
    u.email = email
    u.username = email  # keep backwards-compatible username column in sync
    u.password_hash = None
    db.session.add(u)
    db.session.commit()

    token = make_password_reset_token(secret_key=current_app.config["SECRET_KEY"], user_id=u.id)

    settings = db.session.get(AppSettings, 1)
    if settings and settings.public_domain:
        path = url_for("auth.reset_password", token=token, _external=False)
        reset_url = f"https://{settings.public_domain}{path}"
    else:
        reset_url = url_for("auth.reset_password", token=token, _external=True)
    body = (
        f"You have been invited to {company.name} on Signage.\n\n"
        "Set your password using this link (valid for 30 minutes):\n"
        f"{reset_url}\n"
    )
    try:
        send_email(to_email=u.email, subject=f"Invite: {company.name} (set your password)", body_text=body)
    except Exception:
        # Roll back created user if we cannot send invite email, to avoid orphan accounts.
        db.session.delete(u)
        db.session.commit()
        flash(
            "Failed to send invite email. Please check SMTP configuration (SMTP_* env vars).",
            "danger",
        )
        return redirect(url_for("company.my_company"))

    flash(f"Invite sent to {email}", "success")
    return redirect(url_for("company.my_company"))


@bp.post("/my-company/users/<int:user_id>/delete")
@login_required
def delete_company_user(user_id: int):
    company_user_required()

    if int(user_id) == int(current_user.id):
        flash("You cannot delete yourself", "danger")
        return redirect(url_for("company.my_company"))

    u = db.session.get(User, user_id)
    if not u or u.is_admin or u.company_id != current_user.company_id:
        abort(404)

    email = u.email
    db.session.delete(u)
    db.session.commit()
    flash(f"User '{email}' deleted", "success")
    return redirect(url_for("company.my_company"))


@bp.get("/")
@login_required
def dashboard():
    company_user_required()
    playlists = Playlist.query.filter_by(company_id=current_user.company_id).order_by(Playlist.name.asc()).all()
    displays = Display.query.filter_by(company_id=current_user.company_id).order_by(Display.name.asc()).all()
    playlists_json = [{"id": p.id, "name": p.name} for p in playlists]
    return render_template(
        "company/dashboard.html",
        playlists=playlists,
        now_utc=datetime.utcnow(),
        playlists_json=playlists_json,
        displays=displays,
    )


@bp.post("/playlists")
@login_required
def create_playlist():
    company_user_required()
    name = request.form.get("name", "").strip()
    if not name:
        flash("Playlist name required", "danger")
        return redirect(url_for("company.dashboard"))
    p = Playlist(company_id=current_user.company_id, name=name)
    db.session.add(p)
    db.session.commit()
    flash("Playlist created", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=p.id))


@bp.get("/playlists/<int:playlist_id>")
@login_required
def playlist_detail(playlist_id: int):
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)
    return render_template("company/playlist_detail.html", playlist=playlist, now_utc=datetime.utcnow())


@bp.post("/playlists/<int:playlist_id>")
@login_required
def update_playlist(playlist_id: int):
    """Update playlist metadata.

    Currently supports renaming the playlist from the playlist detail (edit) page.
    """

    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    name = (request.form.get("name") or "").strip()
    if not name:
        flash("Playlist name required", "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

    # Keep within DB column limit (String(120))
    if len(name) > 120:
        name = name[:120]

    playlist.name = name
    db.session.commit()
    flash("Playlist renamed", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/playlists/<int:playlist_id>/schedule")
@login_required
def update_playlist_schedule(playlist_id: int):
    """Update the playlist schedule window (the priority flag has its own endpoint)."""

    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    try:
        start = _parse_schedule_local_to_utc(
            date_str=request.form.get("schedule_start_date"),
            time_str=request.form.get("schedule_start_time"),
        )
        end = _parse_schedule_local_to_utc(
            date_str=request.form.get("schedule_end_date"),
            time_str=request.form.get("schedule_end_time"),
        )
    except ValueError as e:
        flash(str(e), "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

    if start and end and end < start:
        flash("End must be after start", "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

    playlist.schedule_start = start
    playlist.schedule_end = end
    db.session.commit()

    flash("Schedule updated", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/playlists/<int:playlist_id>/schedule/delete")
@login_required
def clear_playlist_schedule(playlist_id: int):
    """Clear schedule for a playlist (sets start/end to NULL)."""

    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    playlist.schedule_start = None
    playlist.schedule_end = None
    db.session.commit()
    flash("Schedule removed", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/playlists/<int:playlist_id>/priority")
@login_required
def update_playlist_priority(playlist_id: int):
    """Update playlist priority flag."""

    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )

    # Accept both form and JSON payloads.
    raw = request.form.get("is_priority")
    if raw is None and request.is_json:
        raw = (request.get_json(silent=True) or {}).get("is_priority")

    # Accept JSON booleans as well as the usual truthy form values; calling .strip()
    # on a boolean would raise.
    if isinstance(raw, bool):
        playlist.is_priority = raw
    else:
        s = ("" if raw is None else str(raw)).strip().lower()
        playlist.is_priority = s in {"1", "true", "yes", "on"}
    db.session.commit()

    if wants_json:
        return jsonify({"ok": True, "is_priority": bool(playlist.is_priority)})

    flash("Priority updated", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/playlists/<int:playlist_id>/delete")
@login_required
def delete_playlist(playlist_id: int):
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Unassign from any displays in this company
    Display.query.filter_by(company_id=current_user.company_id, assigned_playlist_id=playlist.id).update(
        {"assigned_playlist_id": None}
    )

    # Remove from any display multi-playlist mappings in this company.
    # Filter by an explicit list of display ids (IN clause) to avoid a JOIN-based DELETE,
    # which is not supported on SQLite.
    display_ids = [d.id for d in Display.query.filter_by(company_id=current_user.company_id).all()]
    if display_ids:
        DisplayPlaylist.query.filter(
            DisplayPlaylist.display_id.in_(display_ids),
            DisplayPlaylist.playlist_id == playlist.id,
        ).delete(synchronize_session=False)

    # Clean up uploaded files for image/video items
    for it in list(playlist.items):
        if it.item_type in ("image", "video"):
            _try_delete_upload(it.file_path, current_app.config["UPLOAD_FOLDER"])

    db.session.delete(playlist)
    db.session.commit()
    flash("Playlist deleted", "success")
    return redirect(url_for("company.dashboard"))


@bp.post("/playlists/<int:playlist_id>/items/reorder")
@login_required
def reorder_playlist_items(playlist_id: int):
    """Persist new ordering for playlist items.

    Expects form data: order=<comma-separated item ids>.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Accept both form and JSON payloads.
    order = (request.form.get("order") or "").strip()
    if not order and request.is_json:
        order = ((request.get_json(silent=True) or {}).get("order") or "").strip()
    if not order:
        abort(400)

    try:
        ids = [int(x) for x in order.split(",") if x.strip()]
    except ValueError:
        abort(400)

    # Ensure ids belong to this playlist
    existing = PlaylistItem.query.filter(PlaylistItem.playlist_id == playlist_id, PlaylistItem.id.in_(ids)).all()
    existing_ids = {i.id for i in existing}
    if len(existing_ids) != len(ids):
        abort(400)

    # Re-number positions starting at 1
    id_to_item = {i.id: i for i in existing}
    for pos, item_id in enumerate(ids, start=1):
        id_to_item[item_id].position = pos

    db.session.commit()

    # Client currently doesn't require JSON, but returning JSON if requested
    # helps debugging and future enhancements.
    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )
    if wants_json:
        return jsonify({"ok": True})
    return ("", 204)


@bp.post("/playlists/<int:playlist_id>/items")
@login_required
def add_playlist_item(playlist_id: int):
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Support AJAX/modal usage: return JSON when requested.
    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or (request.form.get("response") == "json")
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    item_type = (request.form.get("item_type") or "").strip().lower()
    title = request.form.get("title", "").strip() or None

    # Duration is only used for image/webpage. Video/YouTube plays until ended.
    raw_duration = request.form.get("duration_seconds")
    try:
        duration = int(raw_duration) if raw_duration is not None else 10
    except (TypeError, ValueError):
        duration = 10

    max_pos = (
        db.session.query(db.func.max(PlaylistItem.position)).filter_by(playlist_id=playlist_id).scalar() or 0
    )
    pos = max_pos + 1

    item = PlaylistItem(
        playlist=playlist,
        item_type=item_type,
        title=title,
        duration_seconds=max(1, duration),
        position=pos,
    )

    # Enforce storage quota for uploads (image/video).
    # Webpage/YouTube do not consume local storage.
    # Note: querying the DB triggers an autoflush by default. Because `item` is not yet in the
    # session, SQLAlchemy may emit warnings about relationship operations. We explicitly avoid
    # autoflush while checking quota.
    with db.session.no_autoflush:
        company = db.session.get(Company, current_user.company_id)
        if not company:
            abort(404)

        upload_root = current_app.config["UPLOAD_FOLDER"]
        used_bytes = get_company_upload_bytes(upload_root, company.id)
        usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes)
        storage_max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None

    if item_type in ("image", "video"):
        if usage.get("is_exceeded"):
            msg = _storage_limit_error_message(storage_max_human=storage_max_human)
            if wants_json:
                return _json_error(msg, 403)
            flash(msg, "danger")
            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

        f = request.files.get("file")
        if not f or not f.filename:
            if wants_json:
                return _json_error("File required")
            flash("File required", "danger")
            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

        filename = secure_filename(f.filename)
        ext = os.path.splitext(filename)[1].lower()

        if item_type == "image":
            crop_mode = (request.form.get("crop_mode") or "16:9").strip().lower()
            if ext not in ALLOWED_IMAGE_EXTENSIONS:
                if wants_json:
                    return _json_error(
                        "Unsupported image type. Please upload one of: " + ", ".join(sorted(ALLOWED_IMAGE_EXTENSIONS))
                    )
                flash("Unsupported image type", "danger")
                return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
            try:
                item.file_path = _save_compressed_image(
                    f,
                    current_app.config["UPLOAD_FOLDER"],
                    current_user.company_id,
                    crop_mode=crop_mode,
                )

                # Post-save quota check for images as well.
                # (We can't reliably estimate image size before compression.)
                if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0:
                    try:
                        used_after = get_company_upload_bytes(upload_root, company.id)
                    except Exception:
                        used_after = None
                    if used_after is not None:
                        usage_after = compute_storage_usage(
                            used_bytes=used_after,
                            max_bytes=company.storage_max_bytes,
                        )
                        if usage_after.get("is_exceeded"):
                            # Remove the saved file and reject.
                            try:
                                _try_delete_upload(item.file_path, upload_root)
                            except Exception:
                                pass
                            msg = _storage_limit_error_message(storage_max_human=storage_max_human)
                            if wants_json:
                                return _json_error(msg, 403)
                            flash(msg, "danger")
                            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
            except Exception:
                if wants_json:
                    return _json_error("Failed to process image upload", 500)
                flash("Failed to process image upload", "danger")
                return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
        else:
            if ext not in ALLOWED_VIDEO_EXTENSIONS:
                if wants_json:
                    return _json_error(
                        "Unsupported video type. Please upload one of: " + ", ".join(sorted(ALLOWED_VIDEO_EXTENSIONS))
                    )
                flash("Unsupported video type", "danger")
                return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

            # Enforce video size limit (250MB) with a clear error message.
            # This is separate from Flask's MAX_CONTENT_LENGTH, which caps the full request.
            size = None
            try:
                size = getattr(f, "content_length", None)
                # Werkzeug may report 0 for unknown per-part length.
                if (size is None or size <= 0) and hasattr(f, "stream"):
                    # Measure by seeking in the file-like stream.
                    pos = f.stream.tell()
                    f.stream.seek(0, os.SEEK_END)
                    size = f.stream.tell()
                    f.stream.seek(pos, os.SEEK_SET)
            except Exception:
                size = None

            if size is not None and size > MAX_VIDEO_BYTES:
                msg = "Video file too large. Maximum allowed size is 250MB."
                if wants_json:
                    return _json_error(msg, 413)
                flash(msg, "danger")
                return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

            # Keep as-is but always rename to a UUID.
            unique = uuid.uuid4().hex + ext
            company_dir = ensure_company_upload_dir(current_app.config["UPLOAD_FOLDER"], current_user.company_id)
            save_path = os.path.join(company_dir, unique)
            f.save(save_path)

            # Post-save quota check: clients may not report size reliably.
            # If quota is exceeded after saving, delete file and reject.
            if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0:
                try:
                    used_after = get_company_upload_bytes(upload_root, company.id)
                except Exception:
                    used_after = None
                if used_after is not None:
                    usage_after = compute_storage_usage(
                        used_bytes=used_after,
                        max_bytes=company.storage_max_bytes,
                    )
                    if usage_after.get("is_exceeded"):
                        try:
                            os.remove(save_path)
                        except OSError:
                            pass
                        msg = _storage_limit_error_message(storage_max_human=storage_max_human)
                        if wants_json:
                            return _json_error(msg, 403)
                        flash(msg, "danger")
                        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

            # Safety check: validate using the actual saved file size.
            # (Some clients/framework layers don't reliably report per-part size.)
            try:
                saved_size = os.path.getsize(save_path)
            except OSError:
                saved_size = None

            if saved_size is not None and saved_size > MAX_VIDEO_BYTES:
                try:
                    os.remove(save_path)
                except OSError:
                    pass
                msg = "Video file too large. Maximum allowed size is 250MB."
                if wants_json:
                    return _json_error(msg, 413)
                flash(msg, "danger")
                return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

            item.file_path = f"uploads/{int(current_user.company_id)}/{unique}"

    elif item_type == "webpage":
        url = request.form.get("url", "").strip()
        if not url:
            if wants_json:
                return _json_error("URL required")
            flash("URL required", "danger")
            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
        item.url = url

    elif item_type == "youtube":
        raw = request.form.get("url", "").strip()
        if not raw:
            if wants_json:
                return _json_error("YouTube URL required")
            flash("YouTube URL required", "danger")
            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

        embed_url = _normalize_youtube_embed_url(raw)
        if not embed_url:
            if wants_json:
                return _json_error("Invalid YouTube URL")
            flash("Invalid YouTube URL", "danger")
            return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

        item.url = embed_url

    else:
        if wants_json:
            return _json_error("Invalid item type")
        flash("Invalid item type", "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

    db.session.add(item)
    db.session.commit()

    if wants_json:
        return jsonify(
            {
                "ok": True,
                "item": {
                    "id": item.id,
                    "playlist_id": item.playlist_id,
                    "position": item.position,
                    "item_type": item.item_type,
                    "title": item.title,
                    "file_path": item.file_path,
                    "url": item.url,
                    "duration_seconds": item.duration_seconds,
                },
            }
        )

    flash("Item added", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/items/<int:item_id>/delete")
@login_required
def delete_item(item_id: int):
    company_user_required()
    item = db.session.get(PlaylistItem, item_id)
    if not item or item.playlist.company_id != current_user.company_id:
        abort(404)
    playlist_id = item.playlist_id

    if item.item_type in ("image", "video"):
        _try_delete_upload(item.file_path, current_app.config["UPLOAD_FOLDER"])

    db.session.delete(item)
    db.session.commit()
    flash("Item deleted", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))


@bp.post("/items/<int:item_id>/duration")
@login_required
def update_item_duration(item_id: int):
    """Update duration_seconds for a playlist item.

    Used from the playlist overview (inline edit).
    """

    company_user_required()

    item = db.session.get(PlaylistItem, item_id)
    if not item or item.playlist.company_id != current_user.company_id:
        abort(404)

    # Duration only applies to images/webpages; videos play until ended.
    if item.item_type == "video":
        return jsonify({"ok": False, "error": "Duration cannot be set for video items"}), 400

    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    raw = request.form.get("duration_seconds")
    if raw is None and request.is_json:
        raw = (request.get_json(silent=True) or {}).get("duration_seconds")

    try:
        duration = int(raw)
    except (TypeError, ValueError):
        if wants_json:
            return _json_error("Invalid duration")
        abort(400)

    item.duration_seconds = max(1, duration)
    db.session.commit()

    if wants_json:
        return jsonify({"ok": True, "duration_seconds": item.duration_seconds})
    return ("", 204)


@bp.post("/displays/<int:display_id>/assign")
@login_required
def assign_playlist(display_id: int):
    company_user_required()
    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)
    playlist_id = request.form.get("playlist_id")
    if not playlist_id:
        display.assigned_playlist_id = None
    else:
        # Reject a non-numeric id with a 400 instead of an unhandled ValueError.
        try:
            playlist_id_int = int(playlist_id)
        except (TypeError, ValueError):
            abort(400)
        playlist = db.session.get(Playlist, playlist_id_int)
        if not playlist or playlist.company_id != current_user.company_id:
            abort(400)
        display.assigned_playlist_id = playlist.id
    db.session.commit()
    flash("Display assignment updated", "success")
    return redirect(url_for("company.dashboard"))


@bp.post("/displays/<int:display_id>")
@login_required
def update_display(display_id: int):
    """Update display metadata.

    Supports both form POST (full update) and JSON/AJAX (partial update).
    Company users can set a short description per display and assign a playlist.
    """

    company_user_required()

    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)

    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    def _normalize_transition(val: str | None) -> str | None:
        v = (val or "").strip().lower()
        if not v:
            return None
        if v not in {"none", "fade", "slide"}:
            return None
        return v

    # Inputs from either form or JSON
    payload = request.get_json(silent=True) if request.is_json else None

    # Description (short, optional)
    if request.is_json:
        if payload is None:
            return _json_error("Invalid JSON")
        if "description" in payload:
            desc = (payload.get("description") or "").strip() or None
            if desc is not None:
                desc = desc[:200]
            display.description = desc
    else:
        # Form POST implies full update
        desc = (request.form.get("description") or "").strip() or None
        if desc is not None:
            desc = desc[:200]
        display.description = desc

    # Slide transition
    if request.is_json:
        if payload is None:
            return _json_error("Invalid JSON")
        if "transition" in payload:
            display.transition = _normalize_transition(payload.get("transition"))
    else:
        # Form POST implies full update
        display.transition = _normalize_transition(request.form.get("transition"))

    # Overlay toggle
    if request.is_json:
        if payload is None:
            return _json_error("Invalid JSON")
        if "show_overlay" in payload:
            raw = payload.get("show_overlay")
            # Accept common truthy representations.
            if isinstance(raw, bool):
                display.show_overlay = raw
            elif raw in (1, 0):
                display.show_overlay = bool(raw)
            else:
                s = ("" if raw is None else str(raw)).strip().lower()
                display.show_overlay = s in {"1", "true", "yes", "on"}
    else:
        # Form POST implies full update
        raw = request.form.get("show_overlay")
        if raw is not None:
            display.show_overlay = (raw or "").strip().lower() in {"1", "true", "yes", "on"}

    # Playlist assignment
    if request.is_json:
        if "playlist_id" in payload:
            playlist_id_val = payload.get("playlist_id")
            if playlist_id_val in (None, ""):
                display.assigned_playlist_id = None
            else:
                try:
                    playlist_id_int = int(playlist_id_val)
                except (TypeError, ValueError):
                    return _json_error("Invalid playlist_id")
                playlist = db.session.get(Playlist, playlist_id_int)
                if not playlist or playlist.company_id != current_user.company_id:
                    return _json_error("Invalid playlist")
                display.assigned_playlist_id = playlist.id
    else:
        playlist_id = (request.form.get("playlist_id") or "").strip()
        if not playlist_id:
            display.assigned_playlist_id = None
        else:
            try:
                playlist_id_int = int(playlist_id)
            except ValueError:
                abort(400)
            playlist = db.session.get(Playlist, playlist_id_int)
            if not playlist or playlist.company_id != current_user.company_id:
                abort(400)
            display.assigned_playlist_id = playlist.id

    db.session.commit()

    if wants_json:
        return jsonify(
            {
                "ok": True,
                "display": {
                    "id": display.id,
                    "name": display.name,
                    "description": display.description,
                    "transition": display.transition,
                    "show_overlay": bool(display.show_overlay),
                    "assigned_playlist_id": display.assigned_playlist_id,
                },
            }
        )

    flash("Display updated", "success")
    return redirect(url_for("company.dashboard"))


@bp.post("/displays/<int:display_id>/playlists")
@login_required
def update_display_playlists(display_id: int):
    """Set active playlists for a display.

    Expects JSON: { playlist_ids: [1,2,3] }
    Returns JSON with the updated assigned playlist ids.

    Note: if playlist_ids is empty, the display will have no active playlists.
    For backwards compatibility, this does NOT modify Display.assigned_playlist_id.
    """

    company_user_required()

    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)

    if not request.is_json:
        abort(400)

    payload = request.get_json(silent=True) or {}
    raw_ids = payload.get("playlist_ids")
    if raw_ids is None:
        return jsonify({"ok": False, "error": "playlist_ids is required"}), 400

    if not isinstance(raw_ids, list):
        return jsonify({"ok": False, "error": "playlist_ids must be a list"}), 400

    playlist_ids: list[int] = []
    try:
        for x in raw_ids:
            if x in (None, ""):
                continue
            playlist_ids.append(int(x))
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "Invalid playlist id"}), 400

    # Ensure playlists belong to this company.
    if playlist_ids:
        allowed = {
            p.id
            for p in Playlist.query.filter(
                Playlist.company_id == current_user.company_id,
                Playlist.id.in_(playlist_ids),
            ).all()
        }
        if len(allowed) != len(set(playlist_ids)):
            return jsonify({"ok": False, "error": "One or more playlists are invalid"}), 400

    # Replace mapping rows.
    DisplayPlaylist.query.filter_by(display_id=display.id).delete(synchronize_session=False)
    now = datetime.utcnow()
    for pos, pid in enumerate(dict.fromkeys(playlist_ids), start=1):
        db.session.add(
            DisplayPlaylist(
                display_id=display.id,
                playlist_id=pid,
                position=pos,
                created_at=now,
            )
        )

    db.session.commit()

    active_ids = [
        r[0]
        for r in db.session.query(DisplayPlaylist.playlist_id)
        .filter(DisplayPlaylist.display_id == display.id)
        .order_by(DisplayPlaylist.position.asc(), DisplayPlaylist.playlist_id.asc())
        .all()
    ]

    return jsonify(
        {
            "ok": True,
            "display": {
                "id": display.id,
                "active_playlist_ids": active_ids,
            },
        }
    )
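
# Example request for the endpoint above (illustrative):
#
#   POST /company/displays/<display_id>/playlists
#   Content-Type: application/json
#
#   {"playlist_ids": [3, 1, 7]}
#
# Duplicate ids are collapsed (dict.fromkeys keeps first-seen order) and positions are
# re-assigned 1..N in the order given; an empty list clears all active playlists.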