Files
openslide/app/routes/company.py
2026-01-25 18:00:12 +01:00

1480 lines
52 KiB
Python

import os
import uuid
from urllib.parse import urlparse, parse_qs
from datetime import datetime, timedelta
from flask import Blueprint, abort, current_app, flash, jsonify, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename
from PIL import Image, ImageOps
from ..extensions import db
from ..uploads import (
abs_upload_path,
compute_storage_usage,
ensure_company_upload_dir,
get_company_upload_bytes,
is_valid_upload_relpath,
)
from ..models import AppSettings, Company, Display, DisplayPlaylist, DisplaySession, Playlist, PlaylistItem, User
from ..email_utils import send_email
from ..auth_tokens import make_password_reset_token
# Lower-case file extensions (with leading dot) accepted for "image" playlist items.
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff"}
# Lower-case file extensions (with leading dot) accepted for "video" playlist items.
ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".m4v"}
# Overlay is a transparent PNG that sits on top of a display.
ALLOWED_OVERLAY_EXTENSIONS = {".png"}
# Keep overlay size reasonable; it will be stretched to fit anyway.
# (PNG overlays are typically small-ish; 10MB is generous.)
MAX_OVERLAY_BYTES = 10 * 1024 * 1024
# Videos should have a maximum upload size of 250MB
MAX_VIDEO_BYTES = 250 * 1024 * 1024
def _normalize_youtube_embed_url(raw: str) -> str | None:
"""Normalize a user-provided YouTube URL into a privacy-friendly embed base URL.
Returns:
https://www.youtube-nocookie.com/embed/<VIDEO_ID>
or None if we cannot parse a valid video id.
"""
val = (raw or "").strip()
if not val:
return None
# Be forgiving for inputs like "youtu.be/<id>".
if not val.startswith("http://") and not val.startswith("https://"):
val = "https://" + val
try:
u = urlparse(val)
except Exception:
return None
host = (u.netloc or "").lower()
host = host[4:] if host.startswith("www.") else host
video_id: str | None = None
path = (u.path or "").strip("/")
if host in {"youtube.com", "m.youtube.com"}:
# /watch?v=<id>
if path == "watch":
v = (parse_qs(u.query).get("v") or [None])[0]
video_id = v
# /embed/<id>
elif path.startswith("embed/"):
video_id = path.split("/", 1)[1]
# /shorts/<id>
elif path.startswith("shorts/"):
video_id = path.split("/", 1)[1]
elif host == "youtu.be":
# /<id>
if path:
video_id = path.split("/", 1)[0]
# Basic validation: YouTube IDs are typically 11 chars (letters/digits/_/-)
if not video_id:
return None
video_id = video_id.strip()
if len(video_id) != 11:
return None
for ch in video_id:
if not (ch.isalnum() or ch in {"_", "-"}):
return None
return f"https://www.youtube-nocookie.com/embed/{video_id}"
def _center_crop_to_aspect(img: Image.Image, aspect_w: int, aspect_h: int) -> Image.Image:
"""Return a center-cropped copy of img to the desired aspect ratio."""
w, h = img.size
if w <= 0 or h <= 0:
return img
target = aspect_w / aspect_h
current = w / h
# If image is wider than target: crop width; else crop height.
if current > target:
new_w = max(1, int(h * target))
left = max(0, (w - new_w) // 2)
return img.crop((left, 0, left + new_w, h))
else:
new_h = max(1, int(w / target))
top = max(0, (h - new_h) // 2)
return img.crop((0, top, w, top + new_h))
def _save_compressed_image(
    uploaded_file,
    upload_root: str,
    company_id: int | None,
    crop_mode: str | None = None,
) -> str:
    """Save an uploaded image as a compressed WEBP file.

    crop_mode:
      - "16:9" : center-crop to landscape
      - "9:16" : center-crop to portrait
      - "none" : no crop
    A missing or unrecognised value falls back to "16:9".

    Returns relative file path under /static (e.g. uploads/<company_id>/<uuid>.webp;
    the company segment is "0" when ``company_id`` is None).

    Any exception raised while decoding or encoding propagates to the caller. If
    encoding fails after the output file was created, the partial file is removed
    first so it does not linger in the company's upload directory.
    """
    unique = f"{uuid.uuid4().hex}.webp"
    company_dir = ensure_company_upload_dir(upload_root, company_id)
    save_path = os.path.join(company_dir, unique)

    cm = (crop_mode or "16:9").strip().lower()
    if cm not in {"16:9", "9:16", "none"}:
        cm = "16:9"

    img = Image.open(uploaded_file)
    # Respect EXIF orientation (common for phone photos)
    img = ImageOps.exif_transpose(img)
    # Normalize mode for webp
    if img.mode not in ("RGB", "RGBA"):
        img = img.convert("RGB")

    # Optional crop
    # NOTE: The front-end may already upload a cropped image (canvas export), but we still
    # enforce aspect + maximum output size here for consistency.
    target_w = max(1, int(current_app.config.get("IMAGE_CROP_TARGET_W", 1920) or 1920))
    target_h = max(1, int(current_app.config.get("IMAGE_CROP_TARGET_H", 1080) or 1080))
    if cm == "16:9":
        img = _center_crop_to_aspect(img, 16, 9)
        max_box = (target_w, target_h)
    elif cm == "9:16":
        img = _center_crop_to_aspect(img, 9, 16)
        max_box = (target_h, target_w)
    else:
        # No crop: allow both portrait and landscape up to target_w/target_h on the longest side.
        max_box = (max(target_w, target_h),) * 2

    # Resize down if very large (keeps aspect ratio)
    img.thumbnail(max_box)
    try:
        img.save(save_path, format="WEBP", quality=80, method=6)
    except Exception:
        # Do not leave a truncated file behind; re-raise so the caller reports the failure.
        try:
            os.remove(save_path)
        except OSError:
            pass
        raise

    company_seg = str(int(company_id)) if company_id is not None else "0"
    return f"uploads/{company_seg}/{unique}"
def _try_delete_upload(file_path: str | None, upload_root: str):
"""Best-effort delete of an uploaded media file."""
if not file_path:
return
if not is_valid_upload_relpath(file_path):
return
abs_path = abs_upload_path(upload_root, file_path)
if not abs_path:
return
try:
if os.path.isfile(abs_path):
os.remove(abs_path)
except Exception:
# Ignore cleanup failures
pass
def _save_overlay_png(
uploaded_file,
upload_root: str,
company_id: int | None,
) -> str:
"""Save a company overlay as PNG under the company's upload dir.
Returns relative file path under /static (uploads/<company_id>/overlay_<uuid>.png)
"""
unique = f"overlay_{uuid.uuid4().hex}.png"
company_dir = ensure_company_upload_dir(upload_root, company_id)
save_path = os.path.join(company_dir, unique)
# Validate file is a PNG and is 16:9-ish.
# Use magic bytes (signature) instead of relying on Pillow's img.format,
# which can be unreliable if the stream position isn't at 0.
try:
if hasattr(uploaded_file, "stream"):
uploaded_file.stream.seek(0)
except Exception:
pass
try:
sig = uploaded_file.stream.read(8) if hasattr(uploaded_file, "stream") else uploaded_file.read(8)
except Exception:
sig = b""
# PNG file signature: 89 50 4E 47 0D 0A 1A 0A
if sig != b"\x89PNG\r\n\x1a\n":
raise ValueError("not_png")
# Rewind before Pillow parses.
try:
if hasattr(uploaded_file, "stream"):
uploaded_file.stream.seek(0)
except Exception:
pass
img = Image.open(uploaded_file)
img = ImageOps.exif_transpose(img)
w, h = img.size
if not w or not h:
raise ValueError("invalid")
# Allow some tolerance (overlays may include extra transparent padding).
aspect = w / h
target = 16 / 9
if abs(aspect - target) > 0.15: # ~15% tolerance
raise ValueError("not_16_9")
# Ensure we preserve alpha; normalize mode.
if img.mode not in ("RGBA", "LA"):
# Convert to RGBA so transparency is supported consistently.
img = img.convert("RGBA")
img.save(save_path, format="PNG", optimize=True)
company_seg = str(int(company_id)) if company_id is not None else "0"
return f"uploads/{company_seg}/{unique}"
# Blueprint holding the company-user routes defined below; all of them are mounted under /company.
bp = Blueprint("company", __name__, url_prefix="/company")
def _parse_schedule_local_to_utc(*, date_str: str | None, time_str: str | None) -> datetime | None:
"""Parse local date+time form inputs into a naive UTC datetime.
Inputs come from <input type="date"> and <input type="time">.
We interpret them as *local* time of the server.
Note: this project currently does not store per-company timezone; in most deployments
server timezone matches users. If you need per-company timezone later, we can extend
this function.
"""
d = (date_str or "").strip()
t = (time_str or "").strip()
if not d and not t:
return None
if not d or not t:
# Require both parts for clarity
raise ValueError("Both date and time are required")
# Basic parsing: YYYY-MM-DD and HH:MM
try:
year, month, day = [int(x) for x in d.split("-")]
hh, mm = [int(x) for x in t.split(":")[:2]]
except Exception:
raise ValueError("Invalid date/time")
# Interpret as local time, convert to UTC naive
local_dt = datetime(year, month, day, hh, mm)
# local_dt.timestamp() uses local timezone when naive.
utc_ts = local_dt.timestamp()
return datetime.utcfromtimestamp(utc_ts)
def company_user_required():
    """Abort with 403 unless the current user is a logged-in, non-admin company member."""
    is_company_member = (
        current_user.is_authenticated
        and not current_user.is_admin
        and current_user.company_id
    )
    if not is_company_member:
        abort(403)
def _format_bytes(num: int) -> str:
num = max(0, int(num or 0))
units = ["B", "KB", "MB", "GB", "TB"]
size = float(num)
idx = 0
while size >= 1024.0 and idx < len(units) - 1:
size /= 1024.0
idx += 1
if idx == 0:
return f"{int(size)} {units[idx]}"
return f"{size:.1f} {units[idx]}"
def _storage_limit_error_message(*, storage_max_human: str | None) -> str:
if storage_max_human:
return f"Storage limit reached. Maximum allowed storage is {storage_max_human}. Please delete items to free space."
return "Storage limit reached. Please delete items to free space."
@bp.get("/my-company")
@login_required
def my_company():
    """Render the "my company" page with usage statistics, users and overlay preview."""
    company_user_required()
    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)
    cid = company.id

    # Stats
    display_count = Display.query.filter_by(company_id=cid).count()
    playlist_count = Playlist.query.filter_by(company_id=cid).count()
    user_count = User.query.filter_by(company_id=cid, is_admin=False).count()
    items_query = PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id)
    item_count = items_query.filter(Playlist.company_id == cid).count()

    # Active display sessions (best-effort, based on same TTL as /api)
    seen_since = datetime.utcnow() - timedelta(seconds=90)
    sessions_query = DisplaySession.query.join(Display, DisplaySession.display_id == Display.id)
    active_sessions = sessions_query.filter(
        Display.company_id == cid,
        DisplaySession.last_seen_at >= seen_since,
    ).count()

    # Storage usage
    upload_root = current_app.config["UPLOAD_FOLDER"]
    used_bytes = get_company_upload_bytes(upload_root, cid)
    usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes)
    if usage.get("max_bytes"):
        max_human = _format_bytes(usage["max_bytes"])
    else:
        max_human = None

    users = User.query.filter_by(company_id=cid, is_admin=False).order_by(User.email.asc()).all()

    # Only expose an overlay URL when the stored path passes validation.
    overlay_path = company.overlay_file_path
    if overlay_path and is_valid_upload_relpath(overlay_path):
        overlay_url = url_for("static", filename=overlay_path)
    else:
        overlay_url = None

    stats = {
        "users": user_count,
        "displays": display_count,
        "playlists": playlist_count,
        "items": item_count,
        "active_sessions": active_sessions,
        "storage_bytes": used_bytes,
        "storage_human": _format_bytes(used_bytes),
        "storage_max_bytes": usage.get("max_bytes"),
        "storage_max_human": max_human,
        "storage_used_percent": usage.get("used_percent"),
    }
    return render_template(
        "company/my_company.html",
        company=company,
        users=users,
        overlay_url=overlay_url,
        stats=stats,
    )
@bp.post("/my-company/overlay")
@login_required
def upload_company_overlay():
    """Upload/replace the per-company 16:9 PNG overlay.

    Every rejection flashes a "danger" message and redirects back to the company
    page; on success the previous overlay file (if any) is removed.
    """
    company_user_required()
    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    def _reject(message: str):
        """Flash an error and send the user back to the company page."""
        flash(message, "danger")
        return redirect(url_for("company.my_company"))

    upload = request.files.get("overlay")
    if not upload or not upload.filename:
        return _reject("Overlay file is required")

    extension = os.path.splitext(secure_filename(upload.filename))[1].lower()
    if extension not in ALLOWED_OVERLAY_EXTENSIONS:
        return _reject("Unsupported overlay type. Please upload a PNG file.")

    # Enforce size limit best-effort: trust content_length when positive, otherwise
    # measure the stream by seeking to its end (and restore the position afterwards).
    try:
        byte_count = getattr(upload, "content_length", None)
        if (byte_count is None or byte_count <= 0) and hasattr(upload, "stream"):
            start = upload.stream.tell()
            upload.stream.seek(0, os.SEEK_END)
            byte_count = upload.stream.tell()
            upload.stream.seek(start, os.SEEK_SET)
    except Exception:
        byte_count = None
    if byte_count is not None and byte_count > MAX_OVERLAY_BYTES:
        return _reject("Overlay file too large. Maximum allowed size is 10MB.")

    # Enforce storage quota too (overlay is stored in the same uploads folder).
    upload_root = current_app.config["UPLOAD_FOLDER"]
    usage = compute_storage_usage(
        used_bytes=get_company_upload_bytes(upload_root, company.id),
        max_bytes=company.storage_max_bytes,
    )
    if usage.get("max_bytes"):
        storage_max_human = _format_bytes(usage["max_bytes"])
    else:
        storage_max_human = None
    if usage.get("is_exceeded"):
        return _reject(_storage_limit_error_message(storage_max_human=storage_max_human))

    previous_path = company.overlay_file_path
    try:
        new_relpath = _save_overlay_png(upload, upload_root, company.id)
    except ValueError as exc:
        # _save_overlay_png signals validation problems through short error codes.
        reasons = {
            "not_png": "Overlay must be a PNG file.",
            "not_16_9": "Overlay should be 16:9 (landscape).",
        }
        return _reject(reasons.get(str(exc), "Failed to process overlay upload."))
    except Exception:
        return _reject("Failed to process overlay upload.")

    # Post-save quota check (like images) because PNG size is unknown until saved.
    limit = company.storage_max_bytes
    if limit is not None and int(limit or 0) > 0:
        try:
            used_after = get_company_upload_bytes(upload_root, company.id)
        except Exception:
            used_after = None
        if used_after is not None:
            usage_after = compute_storage_usage(used_bytes=used_after, max_bytes=limit)
            if usage_after.get("is_exceeded"):
                _try_delete_upload(new_relpath, upload_root)
                return _reject(_storage_limit_error_message(storage_max_human=storage_max_human))

    company.overlay_file_path = new_relpath
    db.session.commit()

    # Clean up the old overlay file.
    if previous_path and previous_path != new_relpath:
        _try_delete_upload(previous_path, upload_root)

    flash("Overlay updated.", "success")
    return redirect(url_for("company.my_company"))
@bp.post("/my-company/overlay/delete")
@login_required
def delete_company_overlay():
    """Detach the company's overlay and remove its file (best-effort)."""
    company_user_required()
    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    upload_root = current_app.config["UPLOAD_FOLDER"]
    stale_overlay = company.overlay_file_path

    # Clear the reference first; the file is removed only after the commit succeeded.
    company.overlay_file_path = None
    db.session.commit()
    _try_delete_upload(stale_overlay, upload_root)

    flash("Overlay removed.", "success")
    return redirect(url_for("company.my_company"))
@bp.post("/my-company/invite")
@login_required
def invite_user():
    """Create a password-less user in the caller's company and email a set-password link.

    If the invite email cannot be sent, the freshly created user is deleted again.
    """
    company_user_required()

    def _back_to_company():
        return redirect(url_for("company.my_company"))

    address = (request.form.get("email", "") or "").strip().lower()
    if not address:
        flash("Email is required", "danger")
        return _back_to_company()
    if User.query.filter_by(email=address).first():
        flash("Email already exists", "danger")
        return _back_to_company()

    company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)

    # Create user without password; they must set it via reset link.
    invitee = User(is_admin=False, company=company)
    invitee.email = address
    invitee.username = address  # keep backwards-compatible username column in sync
    invitee.password_hash = None
    db.session.add(invitee)
    db.session.commit()

    token = make_password_reset_token(secret_key=current_app.config["SECRET_KEY"], user_id=invitee.id)

    # Prefer the configured public domain for the link; otherwise let Flask build
    # an absolute URL from the current request.
    settings = db.session.get(AppSettings, 1)
    if settings and settings.public_domain:
        reset_path = url_for("auth.reset_password", token=token, _external=False)
        reset_url = f"https://{settings.public_domain}{reset_path}"
    else:
        reset_url = url_for("auth.reset_password", token=token, _external=True)

    body = (
        f"You have been invited to {company.name} on Signage.\n\n"
        "Set your password using this link (valid for 30 minutes):\n"
        f"{reset_url}\n"
    )
    subject = f"Invite: {company.name} (set your password)"
    try:
        send_email(to_email=invitee.email, subject=subject, body_text=body)
    except Exception:
        # Roll back created user if we cannot send invite email, to avoid orphan accounts.
        db.session.delete(invitee)
        db.session.commit()
        flash(
            "Failed to send invite email. Please check SMTP configuration (SMTP_* env vars).",
            "danger",
        )
    else:
        flash(f"Invite sent to {address}", "success")
    return _back_to_company()
@bp.post("/my-company/users/<int:user_id>/delete")
@login_required
def delete_company_user(user_id: int):
    """Delete a non-admin user of the caller's own company.

    Deleting yourself is refused with a flash message; a missing user, an admin,
    or a user of another company yields 404.
    """
    company_user_required()
    company_page = url_for("company.my_company")

    if int(user_id) == int(current_user.id):
        flash("You cannot delete yourself", "danger")
        return redirect(company_page)

    target = db.session.get(User, user_id)
    deletable = bool(target) and not target.is_admin and target.company_id == current_user.company_id
    if not deletable:
        abort(404)

    # Keep the address for the confirmation message before the row is gone.
    removed_email = target.email
    db.session.delete(target)
    db.session.commit()
    flash(f"User '{removed_email}' deleted", "success")
    return redirect(company_page)
@bp.get("/")
@login_required
def dashboard():
    """Render the company dashboard listing playlists and displays, both sorted by name."""
    company_user_required()
    cid = current_user.company_id
    playlists = Playlist.query.filter_by(company_id=cid).order_by(Playlist.name.asc()).all()
    displays = Display.query.filter_by(company_id=cid).order_by(Display.name.asc()).all()
    context = {
        "playlists": playlists,
        "now_utc": datetime.utcnow(),
        # Plain id/name pairs of the same playlists, passed alongside the model objects.
        "playlists_json": [dict(id=pl.id, name=pl.name) for pl in playlists],
        "displays": displays,
    }
    return render_template("company/dashboard.html", **context)
@bp.post("/playlists")
@login_required
def create_playlist():
    """Create a playlist for the caller's company and open its detail page.

    The name is trimmed and, like in ``update_playlist``, cut to the 120-character
    column limit so an over-long name cannot fail at the database level.
    """
    company_user_required()
    name = (request.form.get("name") or "").strip()
    if not name:
        flash("Playlist name required", "danger")
        return redirect(url_for("company.dashboard"))
    # Keep within DB column limit (String(120)), consistent with update_playlist.
    name = name[:120]
    p = Playlist(company_id=current_user.company_id, name=name)
    db.session.add(p)
    db.session.commit()
    flash("Playlist created", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=p.id))
@bp.get("/playlists/<int:playlist_id>")
@login_required
def playlist_detail(playlist_id: int):
    """Show the detail/edit page of one playlist owned by the caller's company (404 otherwise)."""
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    owned = bool(playlist) and playlist.company_id == current_user.company_id
    if not owned:
        abort(404)
    return render_template(
        "company/playlist_detail.html",
        playlist=playlist,
        now_utc=datetime.utcnow(),
    )
@bp.post("/playlists/<int:playlist_id>")
@login_required
def update_playlist(playlist_id: int):
    """Update playlist metadata.

    Currently supports renaming the playlist from the playlist detail (edit) page.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    detail_page = url_for("company.playlist_detail", playlist_id=playlist_id)
    new_name = (request.form.get("name") or "").strip()
    if new_name:
        # Keep within DB column limit (String(120)); slicing is a no-op for short names.
        playlist.name = new_name[:120]
        db.session.commit()
        flash("Playlist renamed", "success")
    else:
        flash("Playlist name required", "danger")
    return redirect(detail_page)
@bp.post("/playlists/<int:playlist_id>/schedule")
@login_required
def update_playlist_schedule(playlist_id: int):
    """Update the playlist's schedule window (start/end).

    Start and end are each read from a date + time pair of form fields and parsed
    by ``_parse_schedule_local_to_utc``; either may be left empty. When both are
    given, the end must be strictly later than the start.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)
    try:
        start = _parse_schedule_local_to_utc(
            date_str=request.form.get("schedule_start_date"),
            time_str=request.form.get("schedule_start_time"),
        )
        end = _parse_schedule_local_to_utc(
            date_str=request.form.get("schedule_end_date"),
            time_str=request.form.get("schedule_end_time"),
        )
    except ValueError as e:
        flash(str(e), "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
    # "<=" (not "<"): an end equal to the start is an empty window and contradicts
    # the error message below.
    if start and end and end <= start:
        flash("End must be after start", "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
    playlist.schedule_start = start
    playlist.schedule_end = end
    db.session.commit()
    flash("Schedule updated", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
@bp.post("/playlists/<int:playlist_id>/schedule/delete")
@login_required
def clear_playlist_schedule(playlist_id: int):
    """Clear schedule for a playlist (sets start/end to NULL)."""
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    owned = bool(playlist) and playlist.company_id == current_user.company_id
    if not owned:
        abort(404)
    playlist.schedule_start = playlist.schedule_end = None
    db.session.commit()
    flash("Schedule removed", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
@bp.post("/playlists/<int:playlist_id>/priority")
@login_required
def update_playlist_priority(playlist_id: int):
    """Update playlist priority flag.

    ``is_priority`` is read from the form, or from a JSON object body when the form
    does not carry it. Strings are true unless they are empty or one of
    "0", "false", "off", "no" (case-insensitive); any other JSON value (e.g. a real
    boolean) is converted with ``bool()``.

    Returns JSON for AJAX/JSON requests, otherwise flashes and redirects to the
    playlist detail page.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)
    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )
    # Accept both form and JSON payloads.
    raw = request.form.get("is_priority")
    if raw is None and request.is_json:
        payload = request.get_json(silent=True)
        # Guard against non-object JSON bodies (lists, numbers, ...).
        raw = payload.get("is_priority") if isinstance(payload, dict) else None
    if isinstance(raw, str):
        # Explicit "off" spellings must not switch the flag on.
        is_priority = raw.strip().lower() not in {"", "0", "false", "off", "no"}
    else:
        # JSON booleans/numbers/None: calling .strip() on these would raise.
        is_priority = bool(raw)
    playlist.is_priority = is_priority
    db.session.commit()
    if wants_json:
        return jsonify({"ok": True, "is_priority": bool(playlist.is_priority)})
    flash("Priority updated", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
@bp.post("/playlists/<int:playlist_id>/delete")
@login_required
def delete_playlist(playlist_id: int):
    """Delete a playlist of the caller's company together with its uploaded media.

    Displays of the company that reference the playlist are unassigned first. Media
    files are removed only after the database commit succeeded, so a failed commit
    cannot leave rows pointing at files that no longer exist.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Unassign from any displays in this company
    Display.query.filter_by(company_id=current_user.company_id, assigned_playlist_id=playlist.id).update(
        {"assigned_playlist_id": None}
    )

    # Remove from any display multi-playlist mappings in this company.
    # Resolve the display ids up front (ids only, not full rows) to avoid a
    # JOIN-based DELETE which is not supported on SQLite.
    display_ids = [
        row[0] for row in db.session.query(Display.id).filter_by(company_id=current_user.company_id).all()
    ]
    if display_ids:
        DisplayPlaylist.query.filter(
            DisplayPlaylist.display_id.in_(display_ids),
            DisplayPlaylist.playlist_id == playlist.id,
        ).delete(synchronize_session=False)

    # Remember uploaded files of image/video items; they are deleted after the commit.
    upload_root = current_app.config["UPLOAD_FOLDER"]
    media_paths = [it.file_path for it in playlist.items if it.item_type in ("image", "video")]

    db.session.delete(playlist)
    db.session.commit()

    for media_path in media_paths:
        _try_delete_upload(media_path, upload_root)

    flash("Playlist deleted", "success")
    return redirect(url_for("company.dashboard"))
@bp.post("/playlists/<int:playlist_id>/items/reorder")
@login_required
def reorder_playlist_items(playlist_id: int):
    """Persist new ordering for playlist items.

    Expects ``order`` as form data (comma-separated item ids) or in a JSON object
    body, where it may be either such a string or a list of ids.

    Responds 400 when ``order`` is missing, contains a non-integer, or names ids
    that do not (uniquely) belong to this playlist. On success returns JSON
    ``{"ok": true}`` for AJAX/JSON requests and an empty 204 otherwise.
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Accept both form and JSON payloads.
    raw_order = (request.form.get("order") or "").strip()
    if not raw_order and request.is_json:
        payload = request.get_json(silent=True)
        # Guard against non-object JSON bodies, which have no .get().
        raw_order = payload.get("order") if isinstance(payload, dict) else None
    if not raw_order:
        abort(400)

    # A JSON client may send a real list instead of a comma-separated string.
    if isinstance(raw_order, (list, tuple)):
        tokens = [str(part) for part in raw_order]
    else:
        tokens = str(raw_order).split(",")
    try:
        ids = [int(tok) for tok in tokens if tok.strip()]
    except ValueError:
        abort(400)

    # Ensure ids belong to this playlist (duplicates also fail this length check).
    existing = PlaylistItem.query.filter(PlaylistItem.playlist_id == playlist_id, PlaylistItem.id.in_(ids)).all()
    id_to_item = {i.id: i for i in existing}
    if len(id_to_item) != len(ids):
        abort(400)

    # Re-number positions starting at 1
    for pos, item_id in enumerate(ids, start=1):
        id_to_item[item_id].position = pos
    db.session.commit()

    # Client currently doesn't require JSON, but returning JSON if requested
    # helps debugging and future enhancements.
    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )
    if wants_json:
        return jsonify({"ok": True})
    return ("", 204)
@bp.post("/playlists/<int:playlist_id>/items")
@login_required
def add_playlist_item(playlist_id: int):
    """Append a new item (image, video, webpage or YouTube) to a playlist.

    Form fields read: ``item_type``, ``title``, ``duration_seconds``, ``file``
    (image/video), ``crop_mode`` (image), ``url`` (webpage/youtube), ``response``.

    Responds with JSON when the request is AJAX, accepts ``application/json`` or
    passes ``response=json``; otherwise flashes a message and redirects to the
    playlist detail page.

    Uploaded files are removed again whenever the request is rejected after the
    file was written (quota exceeded, oversize video, processing failure, or a
    failed database commit).
    """
    company_user_required()
    playlist = db.session.get(Playlist, playlist_id)
    if not playlist or playlist.company_id != current_user.company_id:
        abort(404)

    # Support AJAX/modal usage: return JSON when requested.
    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or (request.form.get("response") == "json")
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    def _fail(message: str, status: int = 400, *, flash_message: str | None = None):
        """Reject the request: JSON error, or flash (optionally shorter text) + redirect."""
        if wants_json:
            return _json_error(message, status)
        flash(flash_message or message, "danger")
        return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))

    def _remove_quietly(path: str) -> None:
        """Best-effort removal of a file this request just wrote."""
        try:
            os.remove(path)
        except OSError:
            pass

    item_type = (request.form.get("item_type") or "").strip().lower()
    title = request.form.get("title", "").strip() or None

    # Duration is only used for image/webpage. Video/YouTube plays until ended.
    raw_duration = request.form.get("duration_seconds")
    try:
        duration = int(raw_duration) if raw_duration is not None else 10
    except (TypeError, ValueError):
        duration = 10

    max_pos = (
        db.session.query(db.func.max(PlaylistItem.position)).filter_by(playlist_id=playlist_id).scalar() or 0
    )
    item = PlaylistItem(
        playlist=playlist,
        item_type=item_type,
        title=title,
        duration_seconds=max(1, duration),
        position=max_pos + 1,
    )

    # Enforce storage quota for uploads (image/video).
    # Webpage/YouTube do not consume local storage.
    # Note: querying the DB triggers an autoflush by default. Because `item` is not yet in the
    # session, SQLAlchemy may emit warnings about relationship operations. We explicitly avoid
    # autoflush while checking quota.
    with db.session.no_autoflush:
        company = db.session.get(Company, current_user.company_id)
    if not company:
        abort(404)
    upload_root = current_app.config["UPLOAD_FOLDER"]
    used_bytes = get_company_upload_bytes(upload_root, company.id)
    usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes)
    storage_max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None

    def _quota_exceeded_after_save() -> bool:
        """Re-measure usage once a file is on disk; False when no quota is configured."""
        if company.storage_max_bytes is None or int(company.storage_max_bytes or 0) <= 0:
            return False
        try:
            used_after = get_company_upload_bytes(upload_root, company.id)
        except Exception:
            # Measuring is best-effort; an unreadable directory must not block the upload.
            return False
        if used_after is None:
            return False
        usage_after = compute_storage_usage(used_bytes=used_after, max_bytes=company.storage_max_bytes)
        return bool(usage_after.get("is_exceeded"))

    if item_type in ("image", "video"):
        if usage.get("is_exceeded"):
            return _fail(_storage_limit_error_message(storage_max_human=storage_max_human), 403)

        f = request.files.get("file")
        if not f or not f.filename:
            return _fail("File required")
        filename = secure_filename(f.filename)
        ext = os.path.splitext(filename)[1].lower()

        if item_type == "image":
            crop_mode = (request.form.get("crop_mode") or "16:9").strip().lower()
            if ext not in ALLOWED_IMAGE_EXTENSIONS:
                return _fail(
                    "Unsupported image type. Please upload one of: " + ", ".join(sorted(ALLOWED_IMAGE_EXTENSIONS)),
                    flash_message="Unsupported image type",
                )
            saved_relpath = None
            try:
                saved_relpath = _save_compressed_image(
                    f,
                    upload_root,
                    current_user.company_id,
                    crop_mode=crop_mode,
                )
                # Post-save quota check for images as well.
                # (We can't reliably estimate image size before compression.)
                over_quota = _quota_exceeded_after_save()
            except Exception:
                # If the file was already written, do not leave it orphaned.
                _try_delete_upload(saved_relpath, upload_root)
                return _fail("Failed to process image upload", 500)
            if over_quota:
                # Remove the saved file and reject.
                _try_delete_upload(saved_relpath, upload_root)
                return _fail(_storage_limit_error_message(storage_max_human=storage_max_human), 403)
            item.file_path = saved_relpath
        else:
            if ext not in ALLOWED_VIDEO_EXTENSIONS:
                return _fail(
                    "Unsupported video type. Please upload one of: " + ", ".join(sorted(ALLOWED_VIDEO_EXTENSIONS)),
                    flash_message="Unsupported video type",
                )

            # Enforce video size limit (250MB) with a clear error message.
            # This is separate from Flask's MAX_CONTENT_LENGTH, which caps the full request.
            try:
                size = getattr(f, "content_length", None)
                # Werkzeug may report 0 for unknown per-part length.
                if (size is None or size <= 0) and hasattr(f, "stream"):
                    # Measure by seeking in the file-like stream.
                    pos = f.stream.tell()
                    f.stream.seek(0, os.SEEK_END)
                    size = f.stream.tell()
                    f.stream.seek(pos, os.SEEK_SET)
            except Exception:
                size = None
            if size is not None and size > MAX_VIDEO_BYTES:
                return _fail("Video file too large. Maximum allowed size is 250MB.", 413)

            # Keep as-is but always rename to a UUID.
            unique = uuid.uuid4().hex + ext
            company_dir = ensure_company_upload_dir(upload_root, current_user.company_id)
            save_path = os.path.join(company_dir, unique)
            try:
                f.save(save_path)
            except Exception:
                # Mirror the image branch: report the failure and drop any partial file.
                _remove_quietly(save_path)
                return _fail("Failed to process video upload", 500)

            # Post-save quota check: clients may not report size reliably.
            # If quota is exceeded after saving, delete file and reject.
            if _quota_exceeded_after_save():
                _remove_quietly(save_path)
                return _fail(_storage_limit_error_message(storage_max_human=storage_max_human), 403)

            # Safety check: validate using the actual saved file size.
            # (Some clients/framework layers don't reliably report per-part size.)
            try:
                saved_size = os.path.getsize(save_path)
            except OSError:
                saved_size = None
            if saved_size is not None and saved_size > MAX_VIDEO_BYTES:
                _remove_quietly(save_path)
                return _fail("Video file too large. Maximum allowed size is 250MB.", 413)

            item.file_path = f"uploads/{int(current_user.company_id)}/{unique}"
    elif item_type == "webpage":
        url = request.form.get("url", "").strip()
        if not url:
            return _fail("URL required")
        # NOTE(review): the URL is stored exactly as submitted (no scheme check) —
        # confirm that whatever renders it cannot be abused with e.g. javascript: URLs.
        item.url = url
    elif item_type == "youtube":
        raw = request.form.get("url", "").strip()
        if not raw:
            return _fail("YouTube URL required")
        embed_url = _normalize_youtube_embed_url(raw)
        if not embed_url:
            return _fail("Invalid YouTube URL")
        item.url = embed_url
    else:
        return _fail("Invalid item type")

    stored_file = item.file_path
    db.session.add(item)
    try:
        db.session.commit()
    except Exception:
        # The row was not persisted: roll back and remove the now-unreferenced file,
        # then let the error propagate as before.
        db.session.rollback()
        _try_delete_upload(stored_file, upload_root)
        raise

    if wants_json:
        return jsonify(
            {
                "ok": True,
                "item": {
                    "id": item.id,
                    "playlist_id": item.playlist_id,
                    "position": item.position,
                    "item_type": item.item_type,
                    "title": item.title,
                    "file_path": item.file_path,
                    "url": item.url,
                    "duration_seconds": item.duration_seconds,
                },
            }
        )
    flash("Item added", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
@bp.post("/items/<int:item_id>/delete")
@login_required
def delete_item(item_id: int):
    """Delete one playlist item owned by the caller's company, plus its uploaded file.

    The file of an image/video item is removed only after the commit succeeded, so
    a failed commit cannot leave a row that references a missing file. Items that
    are missing, have no playlist, or belong to another company yield 404.
    """
    company_user_required()
    item = db.session.get(PlaylistItem, item_id)
    # Guard item.playlist as well: an item without a playlist has no owner to check.
    if not item or not item.playlist or item.playlist.company_id != current_user.company_id:
        abort(404)
    playlist_id = item.playlist_id
    media_path = item.file_path if item.item_type in ("image", "video") else None

    db.session.delete(item)
    db.session.commit()

    if media_path:
        _try_delete_upload(media_path, current_app.config["UPLOAD_FOLDER"])
    flash("Item deleted", "success")
    return redirect(url_for("company.playlist_detail", playlist_id=playlist_id))
@bp.post("/items/<int:item_id>/duration")
@login_required
def update_item_duration(item_id: int):
    """Update duration_seconds for a playlist item.

    Used from the playlist overview (inline edit).

    The new value is read from the ``duration_seconds`` form field or, for
    JSON requests, from the same key of the JSON object. Values below 1 are
    raised to 1.

    Responds with 404 when the item is missing or owned by another company,
    and with 400 for video items or an unparsable duration. On success returns
    ``{"ok": true, "duration_seconds": ...}`` to JSON clients and an empty 204
    otherwise.
    """
    company_user_required()
    item = db.session.get(PlaylistItem, item_id)
    if not item or item.playlist.company_id != current_user.company_id:
        abort(404)

    # Duration only applies to images/webpages; videos play until ended.
    if item.item_type == "video":
        return jsonify({"ok": False, "error": "Duration cannot be set for video items"}), 400

    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    raw = request.form.get("duration_seconds")
    if raw is None and request.is_json:
        payload = request.get_json(silent=True)
        # A syntactically valid body may still be a list or scalar; only an
        # object can carry the field, anything else is treated as "missing".
        if isinstance(payload, dict):
            raw = payload.get("duration_seconds")

    try:
        duration = int(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: JSON numbers like 1e999 are parsed as float infinity.
        if wants_json:
            return _json_error("Invalid duration")
        abort(400)

    item.duration_seconds = max(1, duration)
    db.session.commit()

    if wants_json:
        return jsonify({"ok": True, "duration_seconds": item.duration_seconds})
    return ("", 204)
@bp.post("/displays/<int:display_id>/assign")
@login_required
def assign_playlist(display_id: int):
    """Assign a playlist to a display, or clear the current assignment.

    Reads ``playlist_id`` from the submitted form; a missing or empty value
    clears ``assigned_playlist_id``.

    Responds with 404 when the display is missing or owned by another company,
    and with 400 when ``playlist_id`` is not an integer or does not identify a
    playlist of the current user's company. On success flashes a message and
    redirects to the company dashboard.
    """
    company_user_required()
    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)

    playlist_id = request.form.get("playlist_id")
    if not playlist_id:
        display.assigned_playlist_id = None
    else:
        # Reject non-numeric input as a client error instead of letting the
        # ValueError surface as a 500.
        try:
            playlist_id_int = int(playlist_id)
        except ValueError:
            abort(400)
        playlist = db.session.get(Playlist, playlist_id_int)
        if not playlist or playlist.company_id != current_user.company_id:
            abort(400)
        display.assigned_playlist_id = playlist.id

    db.session.commit()
    flash("Display assignment updated", "success")
    return redirect(url_for("company.dashboard"))
@bp.post("/displays/<int:display_id>")
@login_required
def update_display(display_id: int):
    """Update display metadata.

    Supports both form POST (full update) and JSON/AJAX (partial update).
    Company users can set a short description per display and assign a playlist.

    JSON bodies must be an object; only the keys that are present are applied.
    Form posts always rewrite the description, transition and playlist
    assignment, while the overlay and ticker fields are applied only when they
    are part of the submission.

    Responds with 404 when the display is missing or owned by another company
    and with 400 for an unusable JSON body or an invalid playlist. On success
    returns a JSON snapshot of the display when the client asked for JSON,
    otherwise flashes a message and redirects to the company dashboard.
    """
    company_user_required()
    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)

    wants_json = (
        (request.headers.get("X-Requested-With") == "XMLHttpRequest")
        or ("application/json" in (request.headers.get("Accept") or ""))
        or request.is_json
    )

    def _json_error(message: str, status: int = 400):
        return jsonify({"ok": False, "error": message}), status

    def _as_text(val) -> str:
        """Coerce a submitted value to text.

        Falsy values (None, "", 0, False) become "". Other non-string JSON
        values are stringified so the text normalizers below never call
        ``str`` methods on numbers, lists or objects.
        """
        if not val:
            return ""
        return val if isinstance(val, str) else str(val)

    def _short_text(val, limit: int) -> str | None:
        """Strip *val* and cap it at *limit* characters; empty becomes None."""
        return _as_text(val).strip()[:limit] or None

    def _parse_flag(raw) -> bool:
        """Interpret common truthy representations (bool, 0/1, "on", "yes", ...)."""
        if isinstance(raw, bool):
            return raw
        if raw in (1, 0):
            return bool(raw)
        text = ("" if raw is None else str(raw)).strip().lower()
        return text in {"1", "true", "yes", "on"}

    def _normalize_transition(val) -> str | None:
        """Return the transition name if it is a supported one, else None."""
        name = _as_text(val).strip().lower()
        return name if name in {"none", "fade", "slide"} else None

    def _normalize_css_color(val) -> str | None:
        """Accept a limited set of CSS color inputs (primarily hex + a few keywords).

        This is used to avoid storing arbitrary CSS strings while still being user friendly.
        """
        low = _as_text(val).strip().lower()
        if low in {"white", "black", "red", "green", "blue", "yellow", "orange", "purple", "gray", "grey"}:
            return low
        # Hex colors: #RGB, #RRGGBB, #RRGGBBAA
        if low.startswith("#"):
            digits = low[1:]
            if len(digits) in {3, 6, 8} and all(c in "0123456789abcdef" for c in digits):
                return low
        return None

    def _clamped_int(val, low: int, high: int) -> int | None:
        """Parse *val* as an int clamped to [low, high]; None if absent/unparsable."""
        if val in (None, ""):
            return None
        try:
            number = int(val)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: JSON numbers like 1e999 are parsed as float infinity.
            return None
        return min(high, max(low, number))

    def _normalize_font_family(val) -> str | None:
        """Drop quotes, collapse all whitespace and cap the length at 120."""
        # keep it short and avoid quotes/newlines that could be abused in CSS.
        cleaned = _as_text(val).replace('"', "").replace("'", "")
        cleaned = " ".join(cleaned.split())
        return cleaned[:120] or None

    # Inputs from either form or JSON. Both expose `in` and `.get`, so the
    # shared field handling below works on either source.
    is_json = request.is_json
    if is_json:
        data = request.get_json(silent=True)
        if data is None:
            return _json_error("Invalid JSON")
        if not isinstance(data, dict):
            # Valid JSON, but a list/scalar cannot carry named fields.
            return _json_error("JSON body must be an object")
    else:
        data = request.form

    # Description (short, optional) and slide transition: a form POST implies
    # a full update, so these are rewritten even when the field is absent;
    # JSON only touches keys that were sent.
    if not is_json or "description" in data:
        display.description = _short_text(data.get("description"), 200)
    if not is_json or "transition" in data:
        display.transition = _normalize_transition(data.get("transition"))

    # Overlay toggle and ticker on/off: applied only when submitted.
    for field in ("show_overlay", "ticker_enabled"):
        if field in data:
            setattr(display, field, _parse_flag(data.get(field)))

    # Remaining ticker tape settings, each with its own normalizer.
    ticker_normalizers = {
        # Keep within column limit and avoid whitespace-only.
        "ticker_rss_url": lambda v: _short_text(v, 1000),
        "ticker_color": _normalize_css_color,
        "ticker_bg_color": _normalize_css_color,
        "ticker_bg_opacity": lambda v: _clamped_int(v, 0, 100),
        "ticker_font_family": _normalize_font_family,
        # reasonable bounds for signage displays
        "ticker_font_size_px": lambda v: _clamped_int(v, 10, 200),
        "ticker_speed": lambda v: _clamped_int(v, 1, 100),
    }
    for field, normalize in ticker_normalizers.items():
        if field in data:
            setattr(display, field, normalize(data.get(field)))

    # Playlist assignment
    if is_json:
        if "playlist_id" in data:
            playlist_id_val = data.get("playlist_id")
            if playlist_id_val in (None, ""):
                display.assigned_playlist_id = None
            else:
                try:
                    playlist_id_int = int(playlist_id_val)
                except (TypeError, ValueError, OverflowError):
                    return _json_error("Invalid playlist_id")
                playlist = db.session.get(Playlist, playlist_id_int)
                if not playlist or playlist.company_id != current_user.company_id:
                    return _json_error("Invalid playlist")
                display.assigned_playlist_id = playlist.id
    else:
        playlist_id = (data.get("playlist_id") or "").strip()
        if not playlist_id:
            display.assigned_playlist_id = None
        else:
            try:
                playlist_id_int = int(playlist_id)
            except ValueError:
                abort(400)
            playlist = db.session.get(Playlist, playlist_id_int)
            if not playlist or playlist.company_id != current_user.company_id:
                abort(400)
            display.assigned_playlist_id = playlist.id

    db.session.commit()

    if wants_json:
        return jsonify(
            {
                "ok": True,
                "display": {
                    "id": display.id,
                    "name": display.name,
                    "description": display.description,
                    "transition": display.transition,
                    "show_overlay": bool(display.show_overlay),
                    "ticker_enabled": bool(display.ticker_enabled),
                    "ticker_rss_url": display.ticker_rss_url,
                    "ticker_color": display.ticker_color,
                    "ticker_bg_color": display.ticker_bg_color,
                    "ticker_bg_opacity": display.ticker_bg_opacity,
                    "ticker_font_family": display.ticker_font_family,
                    "ticker_font_size_px": display.ticker_font_size_px,
                    "ticker_speed": display.ticker_speed,
                    "assigned_playlist_id": display.assigned_playlist_id,
                },
            }
        )

    flash("Display updated", "success")
    return redirect(url_for("company.dashboard"))
@bp.post("/displays/<int:display_id>/playlists")
@login_required
def update_display_playlists(display_id: int):
    """Set active playlists for a display.

    Expects JSON: { playlist_ids: [1,2,3] }
    Returns JSON with the updated assigned playlist ids.

    Note: if playlist_ids is empty, the display will have no active playlists.
    For backwards compatibility, this does NOT modify Display.assigned_playlist_id.

    ``None`` and ``""`` entries are ignored and duplicate ids are collapsed,
    keeping the first occurrence; list order becomes the stored position
    (starting at 1).

    Responds with 404 when the display is missing or owned by another company,
    and with 400 when the request is not JSON, ``playlist_ids`` is missing or
    not a list, an id is not an integer, or an id does not identify a playlist
    of the current user's company.
    """
    company_user_required()
    display = db.session.get(Display, display_id)
    if not display or display.company_id != current_user.company_id:
        abort(404)

    if not request.is_json:
        abort(400)

    payload = request.get_json(silent=True)
    # Only a JSON object can carry the key; a list/scalar body is treated
    # like a missing field rather than crashing on `.get`.
    raw_ids = payload.get("playlist_ids") if isinstance(payload, dict) else None
    if raw_ids is None:
        return jsonify({"ok": False, "error": "playlist_ids is required"}), 400
    if not isinstance(raw_ids, list):
        return jsonify({"ok": False, "error": "playlist_ids must be a list"}), 400

    try:
        playlist_ids = [int(x) for x in raw_ids if x not in (None, "")]
    except (TypeError, ValueError, OverflowError):
        # OverflowError: JSON numbers like 1e999 are parsed as float infinity.
        return jsonify({"ok": False, "error": "Invalid playlist id"}), 400

    # De-duplicate while preserving the order the client sent.
    unique_ids = list(dict.fromkeys(playlist_ids))

    # Ensure playlists belong to this company.
    if unique_ids:
        allowed = {
            p.id
            for p in Playlist.query.filter(
                Playlist.company_id == current_user.company_id,
                Playlist.id.in_(unique_ids),
            ).all()
        }
        if len(allowed) != len(unique_ids):
            return jsonify({"ok": False, "error": "One or more playlists are invalid"}), 400

    # Replace mapping rows.
    DisplayPlaylist.query.filter_by(display_id=display.id).delete(synchronize_session=False)
    # One naive UTC timestamp shared by every row written in this request.
    now = datetime.utcnow()
    for pos, pid in enumerate(unique_ids, start=1):
        db.session.add(
            DisplayPlaylist(
                display_id=display.id,
                playlist_id=pid,
                position=pos,
                created_at=now,
            )
        )
    db.session.commit()

    # Re-read from the database so the response reflects what was stored.
    active_ids = [
        r[0]
        for r in db.session.query(DisplayPlaylist.playlist_id)
        .filter(DisplayPlaylist.display_id == display.id)
        .order_by(DisplayPlaylist.position.asc(), DisplayPlaylist.playlist_id.asc())
        .all()
    ]

    return jsonify(
        {
            "ok": True,
            "display": {
                "id": display.id,
                "active_playlist_ids": active_ids,
            },
        }
    )