import os
import re
import uuid
from urllib.parse import urlparse, parse_qs
from datetime import datetime, timedelta, timezone

from flask import Blueprint, abort, current_app, flash, jsonify, redirect, render_template, request, url_for
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename

from PIL import Image, ImageOps

from ..extensions import db
from ..uploads import (
    abs_upload_path,
    compute_storage_usage,
    ensure_company_upload_dir,
    get_company_upload_bytes,
    is_valid_upload_relpath,
)
from ..models import AppSettings, Company, Display, DisplayPlaylist, DisplaySession, Playlist, PlaylistItem, User
from ..email_utils import send_email
from ..auth_tokens import make_password_reset_token


ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".tiff"}
ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".webm", ".ogg", ".mov", ".m4v"}

# Overlay is a transparent PNG that sits on top of a display.
ALLOWED_OVERLAY_EXTENSIONS = {".png"}

# Keep overlay size reasonable; it will be stretched to fit anyway.
# (PNG overlays are typically small-ish; 10MB is generous.)
MAX_OVERLAY_BYTES = 10 * 1024 * 1024

# Videos should have a maximum upload size of 250MB
MAX_VIDEO_BYTES = 250 * 1024 * 1024

# A video id is accepted only if it is exactly 11 ASCII letters, digits, "_" or "-".
# (str.isalnum() would also accept non-ASCII letters/digits, hence the explicit class.)
_YOUTUBE_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")

# PNG file signature: 89 50 4E 47 0D 0A 1A 0A
_PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def _remove_quietly(path: str) -> None:
    """Delete ``path`` if possible, ignoring filesystem errors.

    Used to clean up partially written uploads after a failed save.
    """
    try:
        os.remove(path)
    except OSError:
        pass


def _normalize_youtube_embed_url(raw: str) -> str | None:
    """Normalize a user-provided YouTube URL into a privacy-friendly embed URL.

    Recognized inputs (scheme optional, ``www.`` optional):
      - youtube.com/watch?v=<id> and m.youtube.com/watch?v=<id>
      - youtube.com/embed/<id>, youtube.com/shorts/<id>
      - youtu.be/<id>

    Returns:
        ``https://www.youtube-nocookie.com/embed/<id>``, or ``None`` if no
        valid video id can be extracted.
    """
    val = (raw or "").strip()
    if not val:
        return None

    # Be forgiving for inputs like "youtu.be/<id>". The scheme test is
    # case-insensitive so "HTTPS://..." is not prefixed a second time.
    if not val.lower().startswith(("http://", "https://")):
        val = "https://" + val

    try:
        u = urlparse(val)
        # hostname (unlike netloc) excludes any port or userinfo part.
        host = (u.hostname or "").lower()
    except ValueError:
        return None

    host = host.removeprefix("www.")
    path = (u.path or "").strip("/")

    video_id: str | None = None
    if host in {"youtube.com", "m.youtube.com"}:
        if path == "watch":
            # /watch?v=<id>
            video_id = (parse_qs(u.query).get("v") or [None])[0]
        elif path.startswith(("embed/", "shorts/")):
            # /embed/<id> or /shorts/<id>
            video_id = path.split("/", 1)[1]
    elif host == "youtu.be":
        # /<id>
        if path:
            video_id = path.split("/", 1)[0]

    if not video_id:
        return None

    video_id = video_id.strip()
    # Strict validation: the id is interpolated into a URL below.
    if not _YOUTUBE_ID_RE.fullmatch(video_id):
        return None

    return f"https://www.youtube-nocookie.com/embed/{video_id}"


def _center_crop_to_aspect(img: Image.Image, aspect_w: int, aspect_h: int) -> Image.Image:
    """Return a center-cropped copy of ``img`` with the requested aspect ratio.

    If the image reports a non-positive width or height it is returned
    unchanged.
    """
    w, h = img.size
    if w <= 0 or h <= 0:
        return img

    target = aspect_w / aspect_h
    current = w / h

    if current > target:
        # Image is wider than the target: keep full height, trim the sides.
        new_w = max(1, int(h * target))
        left = max(0, (w - new_w) // 2)
        return img.crop((left, 0, left + new_w, h))

    # Image is taller than (or equal to) the target: keep full width, trim top/bottom.
    new_h = max(1, int(w / target))
    top = max(0, (h - new_h) // 2)
    return img.crop((0, top, w, top + new_h))


def _save_compressed_image(
    uploaded_file,
    upload_root: str,
    company_id: int | None,
    crop_mode: str | None = None,
) -> str:
    """Save an uploaded image as a compressed WEBP file.

    crop_mode:
      - "16:9" : center-crop to landscape (default; also used for unknown values)
      - "9:16" : center-crop to portrait
      - "none" : no crop

    Returns:
        Relative file path under /static (``uploads/<company_id>/<uuid>.webp``;
        the company segment is ``0`` when ``company_id`` is None).

    Raises:
        Whatever Pillow raises when the upload cannot be decoded or encoded.
        If encoding fails, the partially written file is removed first.
    """
    unique = f"{uuid.uuid4().hex}.webp"
    company_dir = ensure_company_upload_dir(upload_root, company_id)
    save_path = os.path.join(company_dir, unique)

    cm = (crop_mode or "16:9").strip().lower()
    if cm not in {"16:9", "9:16", "none"}:
        cm = "16:9"

    img = Image.open(uploaded_file)

    # Respect EXIF orientation (common for phone photos)
    img = ImageOps.exif_transpose(img)

    # Normalize mode for webp. Keep transparency when the source has it
    # (LA/PA modes, or palette images carrying a "transparency" entry), so it
    # is treated the same way as RGBA input instead of being flattened.
    if img.mode not in ("RGB", "RGBA"):
        has_alpha = img.mode in ("LA", "PA") or "transparency" in img.info
        img = img.convert("RGBA" if has_alpha else "RGB")

    # Optional crop
    if cm == "16:9":
        img = _center_crop_to_aspect(img, 16, 9)
        max_box = (1920, 1080)
    elif cm == "9:16":
        img = _center_crop_to_aspect(img, 9, 16)
        max_box = (1080, 1920)
    else:
        # No crop: allow both portrait and landscape up to 1920px on the longest side.
        max_box = (1920, 1920)

    # Resize down if very large (keeps aspect ratio)
    img.thumbnail(max_box)

    try:
        img.save(save_path, format="WEBP", quality=80, method=6)
    except Exception:
        # Don't leave a truncated file behind in the company's upload dir.
        _remove_quietly(save_path)
        raise

    company_seg = str(int(company_id)) if company_id is not None else "0"
    return f"uploads/{company_seg}/{unique}"


def _try_delete_upload(file_path: str | None, upload_root: str):
    """Best-effort delete of an uploaded media file.

    Does nothing when ``file_path`` is empty, fails ``is_valid_upload_relpath``,
    or cannot be resolved by ``abs_upload_path``. Filesystem errors are ignored.
    """
    if not file_path:
        return
    if not is_valid_upload_relpath(file_path):
        return

    abs_path = abs_upload_path(upload_root, file_path)
    if not abs_path:
        return

    try:
        if os.path.isfile(abs_path):
            os.remove(abs_path)
    except OSError:
        # Ignore cleanup failures
        pass


def _save_overlay_png(
    uploaded_file,
    upload_root: str,
    company_id: int | None,
) -> str:
    """Save a company overlay as PNG under the company's upload dir.

    Returns:
        Relative file path under /static
        (``uploads/<company_id>/overlay_<uuid>.png``).

    Raises:
        ValueError: with message ``"not_png"`` (bad file signature),
            ``"invalid"`` (zero width/height) or ``"not_16_9"`` (aspect ratio
            too far from 16:9). Callers dispatch on these codes.
    """
    unique = f"overlay_{uuid.uuid4().hex}.png"
    company_dir = ensure_company_upload_dir(upload_root, company_id)
    save_path = os.path.join(company_dir, unique)

    # Upload objects expose the underlying file as ``.stream``; plain
    # file-like objects are used directly.
    stream = getattr(uploaded_file, "stream", uploaded_file)

    def _rewind() -> None:
        try:
            stream.seek(0)
        except (AttributeError, OSError, ValueError):
            pass

    # Validate file is a PNG and is 16:9-ish.
    # Use magic bytes (signature) instead of relying on Pillow's img.format,
    # which can be unreliable if the stream position isn't at 0.
    _rewind()
    try:
        sig = stream.read(8)
    except Exception:
        sig = b""

    if sig != _PNG_SIGNATURE:
        raise ValueError("not_png")

    # Rewind before Pillow parses.
    _rewind()

    img = Image.open(uploaded_file)
    img = ImageOps.exif_transpose(img)

    w, h = img.size
    if not w or not h:
        raise ValueError("invalid")

    # Allow some tolerance (overlays may include extra transparent padding).
    # The check is an absolute difference of 0.15 on the width/height ratio
    # (roughly 8% around 16/9 ~= 1.78), not a percentage.
    aspect = w / h
    target = 16 / 9
    if abs(aspect - target) > 0.15:
        raise ValueError("not_16_9")

    # Ensure we preserve alpha; normalize mode.
    if img.mode not in ("RGBA", "LA"):
        # Convert to RGBA so transparency is supported consistently.
        img = img.convert("RGBA")

    try:
        img.save(save_path, format="PNG", optimize=True)
    except Exception:
        # Don't leave a truncated file behind in the company's upload dir.
        _remove_quietly(save_path)
        raise

    company_seg = str(int(company_id)) if company_id is not None else "0"
    return f"uploads/{company_seg}/{unique}"


bp = Blueprint("company", __name__, url_prefix="/company")


def _parse_schedule_local_to_utc(*, date_str: str | None, time_str: str | None) -> datetime | None:
    """Parse local date+time form inputs into a naive UTC datetime.

    ``date_str`` is expected as YYYY-MM-DD and ``time_str`` as HH:MM (any
    further ":"-separated parts, e.g. seconds, are ignored). The values are
    interpreted as *local* time of the server.

    Note: this project currently does not store per-company timezone; in most
    deployments server timezone matches users. If you need per-company
    timezone later, we can extend this function.

    Returns:
        A naive datetime in UTC, or ``None`` when both inputs are empty.

    Raises:
        ValueError: "Both date and time are required" when only one part is
            given; "Invalid date/time" when the values cannot be parsed, are
            out of range, or cannot be converted by the platform.
    """
    d = (date_str or "").strip()
    t = (time_str or "").strip()
    if not d and not t:
        return None
    if not d or not t:
        # Require both parts for clarity
        raise ValueError("Both date and time are required")

    try:
        year, month, day = [int(x) for x in d.split("-")]
        hh, mm = [int(x) for x in t.split(":")[:2]]
        # Range errors (month 13, hour 25, ...) are raised here and reported
        # with the same user-facing message as malformed input.
        local_dt = datetime(year, month, day, hh, mm)
        # astimezone() on a naive datetime assumes the system local timezone.
        utc_dt = local_dt.astimezone(timezone.utc)
    except (ValueError, OverflowError, OSError):
        raise ValueError("Invalid date/time") from None

    # The rest of the module works with naive UTC datetimes.
    return utc_dt.replace(tzinfo=None)


def company_user_required():
    """Abort with 403 unless the current user is a non-admin company user.

    Rejects unauthenticated users, admins, and users without a company_id.
    """
    if not current_user.is_authenticated or current_user.is_admin or not current_user.company_id:
        abort(403)


def _format_bytes(num: int) -> str:
    """Format a byte count for display using 1024-based units (B..TB).

    Negative or falsy input is treated as 0. Plain bytes are shown as an
    integer, larger units with one decimal place.
    """
    units = ["B", "KB", "MB", "GB", "TB"]
    size = float(max(0, int(num or 0)))
    idx = 0
    while size >= 1024.0 and idx < len(units) - 1:
        size /= 1024.0
        idx += 1
    if idx == 0:
        return f"{int(size)} {units[idx]}"
    return f"{size:.1f} {units[idx]}"


def _storage_limit_error_message(*, storage_max_human: str | None) -> str:
    """Build the user-facing "storage limit reached" message.

    Includes the maximum allowed storage when ``storage_max_human`` is given.
    """
    if storage_max_human:
        return f"Storage limit reached. Maximum allowed storage is {storage_max_human}. Please delete items to free space."
    return "Storage limit reached. Please delete items to free space."
@bp.get("/my-company") @login_required def my_company(): company_user_required() company = db.session.get(Company, current_user.company_id) if not company: abort(404) # Stats display_count = Display.query.filter_by(company_id=company.id).count() playlist_count = Playlist.query.filter_by(company_id=company.id).count() user_count = User.query.filter_by(company_id=company.id, is_admin=False).count() item_count = ( PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id) .filter(Playlist.company_id == company.id) .count() ) # Active display sessions (best-effort, based on same TTL as /api) cutoff = datetime.utcnow() - timedelta(seconds=90) active_sessions = ( DisplaySession.query.join(Display, DisplaySession.display_id == Display.id) .filter(Display.company_id == company.id, DisplaySession.last_seen_at >= cutoff) .count() ) # Storage usage upload_root = current_app.config["UPLOAD_FOLDER"] used_bytes = get_company_upload_bytes(upload_root, company.id) usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes) max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None users = User.query.filter_by(company_id=company.id, is_admin=False).order_by(User.email.asc()).all() overlay_url = None if company.overlay_file_path and is_valid_upload_relpath(company.overlay_file_path): overlay_url = url_for("static", filename=company.overlay_file_path) return render_template( "company/my_company.html", company=company, users=users, overlay_url=overlay_url, stats={ "users": user_count, "displays": display_count, "playlists": playlist_count, "items": item_count, "active_sessions": active_sessions, "storage_bytes": used_bytes, "storage_human": _format_bytes(used_bytes), "storage_max_bytes": usage.get("max_bytes"), "storage_max_human": max_human, "storage_used_percent": usage.get("used_percent"), }, ) @bp.post("/my-company/overlay") @login_required def upload_company_overlay(): """Upload/replace the per-company 16:9 PNG 
overlay.""" company_user_required() company = db.session.get(Company, current_user.company_id) if not company: abort(404) f = request.files.get("overlay") if not f or not f.filename: flash("Overlay file is required", "danger") return redirect(url_for("company.my_company")) filename = secure_filename(f.filename) ext = os.path.splitext(filename)[1].lower() if ext not in ALLOWED_OVERLAY_EXTENSIONS: flash("Unsupported overlay type. Please upload a PNG file.", "danger") return redirect(url_for("company.my_company")) # Enforce size limit best-effort. size = None try: size = getattr(f, "content_length", None) if (size is None or size <= 0) and hasattr(f, "stream"): pos = f.stream.tell() f.stream.seek(0, os.SEEK_END) size = f.stream.tell() f.stream.seek(pos, os.SEEK_SET) except Exception: size = None if size is not None and size > MAX_OVERLAY_BYTES: flash("Overlay file too large. Maximum allowed size is 10MB.", "danger") return redirect(url_for("company.my_company")) # Enforce storage quota too (overlay is stored in the same uploads folder). 
upload_root = current_app.config["UPLOAD_FOLDER"] used_bytes = get_company_upload_bytes(upload_root, company.id) usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes) storage_max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None if usage.get("is_exceeded"): flash(_storage_limit_error_message(storage_max_human=storage_max_human), "danger") return redirect(url_for("company.my_company")) old_path = company.overlay_file_path try: new_relpath = _save_overlay_png(f, upload_root, company.id) except ValueError as e: code = str(e) if code == "not_png": flash("Overlay must be a PNG file.", "danger") elif code == "not_16_9": flash("Overlay should be 16:9 (landscape).", "danger") else: flash("Failed to process overlay upload.", "danger") return redirect(url_for("company.my_company")) except Exception: flash("Failed to process overlay upload.", "danger") return redirect(url_for("company.my_company")) # Post-save quota check (like images) because PNG size is unknown until saved. if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0: try: used_after = get_company_upload_bytes(upload_root, company.id) except Exception: used_after = None if used_after is not None: usage_after = compute_storage_usage(used_bytes=used_after, max_bytes=company.storage_max_bytes) if usage_after.get("is_exceeded"): _try_delete_upload(new_relpath, upload_root) flash(_storage_limit_error_message(storage_max_human=storage_max_human), "danger") return redirect(url_for("company.my_company")) company.overlay_file_path = new_relpath db.session.commit() # Clean up the old overlay file. 
if old_path and old_path != new_relpath: _try_delete_upload(old_path, upload_root) flash("Overlay updated.", "success") return redirect(url_for("company.my_company")) @bp.post("/my-company/overlay/delete") @login_required def delete_company_overlay(): company_user_required() company = db.session.get(Company, current_user.company_id) if not company: abort(404) upload_root = current_app.config["UPLOAD_FOLDER"] old_path = company.overlay_file_path company.overlay_file_path = None db.session.commit() _try_delete_upload(old_path, upload_root) flash("Overlay removed.", "success") return redirect(url_for("company.my_company")) @bp.post("/my-company/invite") @login_required def invite_user(): company_user_required() email = (request.form.get("email", "") or "").strip().lower() if not email: flash("Email is required", "danger") return redirect(url_for("company.my_company")) if User.query.filter_by(email=email).first(): flash("Email already exists", "danger") return redirect(url_for("company.my_company")) company = db.session.get(Company, current_user.company_id) if not company: abort(404) # Create user without password; they must set it via reset link. 
u = User(is_admin=False, company=company) u.email = email u.username = email # keep backwards-compatible username column in sync u.password_hash = None db.session.add(u) db.session.commit() token = make_password_reset_token(secret_key=current_app.config["SECRET_KEY"], user_id=u.id) settings = db.session.get(AppSettings, 1) if settings and settings.public_domain: path = url_for("auth.reset_password", token=token, _external=False) reset_url = f"https://{settings.public_domain}{path}" else: reset_url = url_for("auth.reset_password", token=token, _external=True) body = ( f"You have been invited to {company.name} on Signage.\n\n" "Set your password using this link (valid for 30 minutes):\n" f"{reset_url}\n" ) try: send_email(to_email=u.email, subject=f"Invite: {company.name} (set your password)", body_text=body) except Exception: # Roll back created user if we cannot send invite email, to avoid orphan accounts. db.session.delete(u) db.session.commit() flash( "Failed to send invite email. Please check SMTP configuration (SMTP_* env vars).", "danger", ) return redirect(url_for("company.my_company")) flash(f"Invite sent to {email}", "success") return redirect(url_for("company.my_company")) @bp.post("/my-company/users//delete") @login_required def delete_company_user(user_id: int): company_user_required() if int(user_id) == int(current_user.id): flash("You cannot delete yourself", "danger") return redirect(url_for("company.my_company")) u = db.session.get(User, user_id) if not u or u.is_admin or u.company_id != current_user.company_id: abort(404) email = u.email db.session.delete(u) db.session.commit() flash(f"User '{email}' deleted", "success") return redirect(url_for("company.my_company")) @bp.get("/") @login_required def dashboard(): company_user_required() playlists = Playlist.query.filter_by(company_id=current_user.company_id).order_by(Playlist.name.asc()).all() displays = Display.query.filter_by(company_id=current_user.company_id).order_by(Display.name.asc()).all() 
playlists_json = [{"id": p.id, "name": p.name} for p in playlists] return render_template( "company/dashboard.html", playlists=playlists, now_utc=datetime.utcnow(), playlists_json=playlists_json, displays=displays, ) @bp.post("/playlists") @login_required def create_playlist(): company_user_required() name = request.form.get("name", "").strip() if not name: flash("Playlist name required", "danger") return redirect(url_for("company.dashboard")) p = Playlist(company_id=current_user.company_id, name=name) db.session.add(p) db.session.commit() flash("Playlist created", "success") return redirect(url_for("company.playlist_detail", playlist_id=p.id)) @bp.get("/playlists/") @login_required def playlist_detail(playlist_id: int): company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) return render_template("company/playlist_detail.html", playlist=playlist, now_utc=datetime.utcnow()) @bp.post("/playlists/") @login_required def update_playlist(playlist_id: int): """Update playlist metadata. Currently supports renaming the playlist from the playlist detail (edit) page. 
""" company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) name = (request.form.get("name") or "").strip() if not name: flash("Playlist name required", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) # Keep within DB column limit (String(120)) if len(name) > 120: name = name[:120] playlist.name = name db.session.commit() flash("Playlist renamed", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/playlists//schedule") @login_required def update_playlist_schedule(playlist_id: int): """Update playlist schedule window + priority flag.""" company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) try: start = _parse_schedule_local_to_utc( date_str=request.form.get("schedule_start_date"), time_str=request.form.get("schedule_start_time"), ) end = _parse_schedule_local_to_utc( date_str=request.form.get("schedule_end_date"), time_str=request.form.get("schedule_end_time"), ) except ValueError as e: flash(str(e), "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) if start and end and end < start: flash("End must be after start", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) playlist.schedule_start = start playlist.schedule_end = end db.session.commit() flash("Schedule updated", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/playlists//schedule/delete") @login_required def clear_playlist_schedule(playlist_id: int): """Clear schedule for a playlist (sets start/end to NULL).""" company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) playlist.schedule_start = None playlist.schedule_end = 
None db.session.commit() flash("Schedule removed", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/playlists//priority") @login_required def update_playlist_priority(playlist_id: int): """Update playlist priority flag.""" company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) wants_json = ( (request.headers.get("X-Requested-With") == "XMLHttpRequest") or ("application/json" in (request.headers.get("Accept") or "")) or request.is_json ) # Accept both form and JSON payloads. raw = request.form.get("is_priority") if raw is None and request.is_json: raw = (request.get_json(silent=True) or {}).get("is_priority") playlist.is_priority = bool((raw or "").strip()) db.session.commit() if wants_json: return jsonify({"ok": True, "is_priority": bool(playlist.is_priority)}) flash("Priority updated", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/playlists//delete") @login_required def delete_playlist(playlist_id: int): company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) # Unassign from any displays in this company Display.query.filter_by(company_id=current_user.company_id, assigned_playlist_id=playlist.id).update( {"assigned_playlist_id": None} ) # Remove from any display multi-playlist mappings in this company. # Use a subquery to avoid a JOIN-based DELETE which is not supported on SQLite. 
display_ids = [d.id for d in Display.query.filter_by(company_id=current_user.company_id).all()] if display_ids: DisplayPlaylist.query.filter( DisplayPlaylist.display_id.in_(display_ids), DisplayPlaylist.playlist_id == playlist.id, ).delete(synchronize_session=False) # cleanup uploaded files for image/video items for it in list(playlist.items): if it.item_type in ("image", "video"): _try_delete_upload(it.file_path, current_app.config["UPLOAD_FOLDER"]) db.session.delete(playlist) db.session.commit() flash("Playlist deleted", "success") return redirect(url_for("company.dashboard")) @bp.post("/playlists//items/reorder") @login_required def reorder_playlist_items(playlist_id: int): """Persist new ordering for playlist items. Expects form data: order=. """ company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) # Accept both form and JSON payloads. order = (request.form.get("order") or "").strip() if not order and request.is_json: order = ((request.get_json(silent=True) or {}).get("order") or "").strip() if not order: abort(400) try: ids = [int(x) for x in order.split(",") if x.strip()] except ValueError: abort(400) # Ensure ids belong to this playlist existing = PlaylistItem.query.filter(PlaylistItem.playlist_id == playlist_id, PlaylistItem.id.in_(ids)).all() existing_ids = {i.id for i in existing} if len(existing_ids) != len(ids): abort(400) # Re-number positions starting at 1 id_to_item = {i.id: i for i in existing} for pos, item_id in enumerate(ids, start=1): id_to_item[item_id].position = pos db.session.commit() # Client currently doesn't require JSON, but returning JSON if requested # helps debugging and future enhancements. 
wants_json = ( (request.headers.get("X-Requested-With") == "XMLHttpRequest") or ("application/json" in (request.headers.get("Accept") or "")) or request.is_json ) if wants_json: return jsonify({"ok": True}) return ("", 204) @bp.post("/playlists//items") @login_required def add_playlist_item(playlist_id: int): company_user_required() playlist = db.session.get(Playlist, playlist_id) if not playlist or playlist.company_id != current_user.company_id: abort(404) # Support AJAX/modal usage: return JSON when requested. wants_json = ( (request.headers.get("X-Requested-With") == "XMLHttpRequest") or ("application/json" in (request.headers.get("Accept") or "")) or (request.form.get("response") == "json") ) def _json_error(message: str, status: int = 400): return jsonify({"ok": False, "error": message}), status item_type = (request.form.get("item_type") or "").strip().lower() title = request.form.get("title", "").strip() or None # Duration is only used for image/webpage. Video/YouTube plays until ended. raw_duration = request.form.get("duration_seconds") try: duration = int(raw_duration) if raw_duration is not None else 10 except (TypeError, ValueError): duration = 10 max_pos = ( db.session.query(db.func.max(PlaylistItem.position)).filter_by(playlist_id=playlist_id).scalar() or 0 ) pos = max_pos + 1 item = PlaylistItem( playlist=playlist, item_type=item_type, title=title, duration_seconds=max(1, duration), position=pos, ) # Enforce storage quota for uploads (image/video). # Webpage/YouTube do not consume local storage. # Note: querying the DB triggers an autoflush by default. Because `item` is not yet in the # session, SQLAlchemy may emit warnings about relationship operations. We explicitly avoid # autoflush while checking quota. 
with db.session.no_autoflush: company = db.session.get(Company, current_user.company_id) if not company: abort(404) upload_root = current_app.config["UPLOAD_FOLDER"] used_bytes = get_company_upload_bytes(upload_root, company.id) usage = compute_storage_usage(used_bytes=used_bytes, max_bytes=company.storage_max_bytes) storage_max_human = _format_bytes(usage["max_bytes"]) if usage.get("max_bytes") else None if item_type in ("image", "video"): if usage.get("is_exceeded"): msg = _storage_limit_error_message(storage_max_human=storage_max_human) if wants_json: return _json_error(msg, 403) flash(msg, "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) f = request.files.get("file") if not f or not f.filename: if wants_json: return _json_error("File required") flash("File required", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) filename = secure_filename(f.filename) ext = os.path.splitext(filename)[1].lower() if item_type == "image": crop_mode = (request.form.get("crop_mode") or "16:9").strip().lower() if ext not in ALLOWED_IMAGE_EXTENSIONS: if wants_json: return _json_error( "Unsupported image type. Please upload one of: " + ", ".join(sorted(ALLOWED_IMAGE_EXTENSIONS)) ) flash("Unsupported image type", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) try: item.file_path = _save_compressed_image( f, current_app.config["UPLOAD_FOLDER"], current_user.company_id, crop_mode=crop_mode, ) # Post-save quota check for images as well. # (We can't reliably estimate image size before compression.) 
if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0: try: used_after = get_company_upload_bytes(upload_root, company.id) except Exception: used_after = None if used_after is not None: usage_after = compute_storage_usage( used_bytes=used_after, max_bytes=company.storage_max_bytes, ) if usage_after.get("is_exceeded"): # Remove the saved file and reject. try: _try_delete_upload(item.file_path, upload_root) except Exception: pass msg = _storage_limit_error_message(storage_max_human=storage_max_human) if wants_json: return _json_error(msg, 403) flash(msg, "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) except Exception: if wants_json: return _json_error("Failed to process image upload", 500) flash("Failed to process image upload", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) else: if ext not in ALLOWED_VIDEO_EXTENSIONS: if wants_json: return _json_error( "Unsupported video type. Please upload one of: " + ", ".join(sorted(ALLOWED_VIDEO_EXTENSIONS)) ) flash("Unsupported video type", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) # Enforce video size limit (250MB) with a clear error message. # This is separate from Flask's MAX_CONTENT_LENGTH, which caps the full request. size = None try: size = getattr(f, "content_length", None) # Werkzeug may report 0 for unknown per-part length. if (size is None or size <= 0) and hasattr(f, "stream"): # Measure by seeking in the file-like stream. pos = f.stream.tell() f.stream.seek(0, os.SEEK_END) size = f.stream.tell() f.stream.seek(pos, os.SEEK_SET) except Exception: size = None if size is not None and size > MAX_VIDEO_BYTES: msg = "Video file too large. Maximum allowed size is 250MB." if wants_json: return _json_error(msg, 413) flash(msg, "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) # Keep as-is but always rename to a UUID. 
unique = uuid.uuid4().hex + ext company_dir = ensure_company_upload_dir(current_app.config["UPLOAD_FOLDER"], current_user.company_id) save_path = os.path.join(company_dir, unique) f.save(save_path) # Post-save quota check: clients may not report size reliably. # If quota is exceeded after saving, delete file and reject. if company.storage_max_bytes is not None and int(company.storage_max_bytes or 0) > 0: try: used_after = get_company_upload_bytes(upload_root, company.id) except Exception: used_after = None if used_after is not None: usage_after = compute_storage_usage( used_bytes=used_after, max_bytes=company.storage_max_bytes, ) if usage_after.get("is_exceeded"): try: os.remove(save_path) except OSError: pass msg = _storage_limit_error_message(storage_max_human=storage_max_human) if wants_json: return _json_error(msg, 403) flash(msg, "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) # Safety check: validate using the actual saved file size. # (Some clients/framework layers don't reliably report per-part size.) try: saved_size = os.path.getsize(save_path) except OSError: saved_size = None if saved_size is not None and saved_size > MAX_VIDEO_BYTES: try: os.remove(save_path) except OSError: pass msg = "Video file too large. Maximum allowed size is 250MB." 
if wants_json: return _json_error(msg, 413) flash(msg, "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) item.file_path = f"uploads/{int(current_user.company_id)}/{unique}" elif item_type == "webpage": url = request.form.get("url", "").strip() if not url: if wants_json: return _json_error("URL required") flash("URL required", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) item.url = url elif item_type == "youtube": raw = request.form.get("url", "").strip() if not raw: if wants_json: return _json_error("YouTube URL required") flash("YouTube URL required", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) embed_url = _normalize_youtube_embed_url(raw) if not embed_url: if wants_json: return _json_error("Invalid YouTube URL") flash("Invalid YouTube URL", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) item.url = embed_url else: if wants_json: return _json_error("Invalid item type") flash("Invalid item type", "danger") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) db.session.add(item) db.session.commit() if wants_json: return jsonify( { "ok": True, "item": { "id": item.id, "playlist_id": item.playlist_id, "position": item.position, "item_type": item.item_type, "title": item.title, "file_path": item.file_path, "url": item.url, "duration_seconds": item.duration_seconds, }, } ) flash("Item added", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/items//delete") @login_required def delete_item(item_id: int): company_user_required() item = db.session.get(PlaylistItem, item_id) if not item or item.playlist.company_id != current_user.company_id: abort(404) playlist_id = item.playlist_id if item.item_type in ("image", "video"): _try_delete_upload(item.file_path, current_app.config["UPLOAD_FOLDER"]) db.session.delete(item) db.session.commit() 
flash("Item deleted", "success") return redirect(url_for("company.playlist_detail", playlist_id=playlist_id)) @bp.post("/items//duration") @login_required def update_item_duration(item_id: int): """Update duration_seconds for a playlist item. Used from the playlist overview (inline edit). """ company_user_required() item = db.session.get(PlaylistItem, item_id) if not item or item.playlist.company_id != current_user.company_id: abort(404) # Duration only applies to images/webpages; videos play until ended. if item.item_type == "video": return jsonify({"ok": False, "error": "Duration cannot be set for video items"}), 400 wants_json = ( (request.headers.get("X-Requested-With") == "XMLHttpRequest") or ("application/json" in (request.headers.get("Accept") or "")) or request.is_json ) def _json_error(message: str, status: int = 400): return jsonify({"ok": False, "error": message}), status raw = request.form.get("duration_seconds") if raw is None and request.is_json: raw = (request.get_json(silent=True) or {}).get("duration_seconds") try: duration = int(raw) except (TypeError, ValueError): if wants_json: return _json_error("Invalid duration") abort(400) item.duration_seconds = max(1, duration) db.session.commit() if wants_json: return jsonify({"ok": True, "duration_seconds": item.duration_seconds}) return ("", 204) @bp.post("/displays//assign") @login_required def assign_playlist(display_id: int): company_user_required() display = db.session.get(Display, display_id) if not display or display.company_id != current_user.company_id: abort(404) playlist_id = request.form.get("playlist_id") if not playlist_id: display.assigned_playlist_id = None else: playlist = db.session.get(Playlist, int(playlist_id)) if not playlist or playlist.company_id != current_user.company_id: abort(400) display.assigned_playlist_id = playlist.id db.session.commit() flash("Display assignment updated", "success") return redirect(url_for("company.dashboard")) @bp.post("/displays/") @login_required def 
update_display(display_id: int): """Update display metadata. Supports both form POST (full update) and JSON/AJAX (partial update). Company users can set a short description per display and assign a playlist. """ company_user_required() display = db.session.get(Display, display_id) if not display or display.company_id != current_user.company_id: abort(404) wants_json = ( (request.headers.get("X-Requested-With") == "XMLHttpRequest") or ("application/json" in (request.headers.get("Accept") or "")) or request.is_json ) def _json_error(message: str, status: int = 400): return jsonify({"ok": False, "error": message}), status def _normalize_transition(val: str | None) -> str | None: v = (val or "").strip().lower() if not v: return None if v not in {"none", "fade", "slide"}: return None return v # Inputs from either form or JSON payload = request.get_json(silent=True) if request.is_json else None # Description (short, optional) if request.is_json: if payload is None: return _json_error("Invalid JSON") if "description" in payload: desc = (payload.get("description") or "").strip() or None if desc is not None: desc = desc[:200] display.description = desc else: # form POST implies full update desc = (request.form.get("description") or "").strip() or None if desc is not None: desc = desc[:200] display.description = desc # Slide transition if request.is_json: if payload is None: return _json_error("Invalid JSON") if "transition" in payload: display.transition = _normalize_transition(payload.get("transition")) else: # Form POST implies full update display.transition = _normalize_transition(request.form.get("transition")) # Overlay toggle if request.is_json: if payload is None: return _json_error("Invalid JSON") if "show_overlay" in payload: raw = payload.get("show_overlay") # Accept common truthy representations. 
if isinstance(raw, bool): display.show_overlay = raw elif raw in (1, 0): display.show_overlay = bool(raw) else: s = ("" if raw is None else str(raw)).strip().lower() display.show_overlay = s in {"1", "true", "yes", "on"} else: # Form POST implies full update raw = request.form.get("show_overlay") if raw is not None: display.show_overlay = (raw or "").strip().lower() in {"1", "true", "yes", "on"} # Playlist assignment if request.is_json: if "playlist_id" in payload: playlist_id_val = payload.get("playlist_id") if playlist_id_val in (None, ""): display.assigned_playlist_id = None else: try: playlist_id_int = int(playlist_id_val) except (TypeError, ValueError): return _json_error("Invalid playlist_id") playlist = db.session.get(Playlist, playlist_id_int) if not playlist or playlist.company_id != current_user.company_id: return _json_error("Invalid playlist") display.assigned_playlist_id = playlist.id else: playlist_id = (request.form.get("playlist_id") or "").strip() if not playlist_id: display.assigned_playlist_id = None else: try: playlist_id_int = int(playlist_id) except ValueError: abort(400) playlist = db.session.get(Playlist, playlist_id_int) if not playlist or playlist.company_id != current_user.company_id: abort(400) display.assigned_playlist_id = playlist.id db.session.commit() if wants_json: return jsonify( { "ok": True, "display": { "id": display.id, "name": display.name, "description": display.description, "transition": display.transition, "show_overlay": bool(display.show_overlay), "assigned_playlist_id": display.assigned_playlist_id, }, } ) flash("Display updated", "success") return redirect(url_for("company.dashboard")) @bp.post("/displays//playlists") @login_required def update_display_playlists(display_id: int): """Set active playlists for a display. Expects JSON: { playlist_ids: [1,2,3] } Returns JSON with the updated assigned playlist ids. Note: if playlist_ids is empty, the display will have no active playlists. 
For backwards compatibility, this does NOT modify Display.assigned_playlist_id. """ company_user_required() display = db.session.get(Display, display_id) if not display or display.company_id != current_user.company_id: abort(404) if not request.is_json: abort(400) payload = request.get_json(silent=True) or {} raw_ids = payload.get("playlist_ids") if raw_ids is None: return jsonify({"ok": False, "error": "playlist_ids is required"}), 400 if not isinstance(raw_ids, list): return jsonify({"ok": False, "error": "playlist_ids must be a list"}), 400 playlist_ids: list[int] = [] try: for x in raw_ids: if x in (None, ""): continue playlist_ids.append(int(x)) except (TypeError, ValueError): return jsonify({"ok": False, "error": "Invalid playlist id"}), 400 # Ensure playlists belong to this company. if playlist_ids: allowed = { p.id for p in Playlist.query.filter( Playlist.company_id == current_user.company_id, Playlist.id.in_(playlist_ids), ).all() } if len(allowed) != len(set(playlist_ids)): return jsonify({"ok": False, "error": "One or more playlists are invalid"}), 400 # Replace mapping rows. DisplayPlaylist.query.filter_by(display_id=display.id).delete(synchronize_session=False) now = datetime.utcnow() for pos, pid in enumerate(dict.fromkeys(playlist_ids), start=1): db.session.add( DisplayPlaylist( display_id=display.id, playlist_id=pid, position=pos, created_at=now, ) ) db.session.commit() active_ids = [ r[0] for r in db.session.query(DisplayPlaylist.playlist_id) .filter(DisplayPlaylist.display_id == display.id) .order_by(DisplayPlaylist.position.asc(), DisplayPlaylist.playlist_id.asc()) .all() ] return jsonify( { "ok": True, "display": { "id": display.id, "active_playlist_ids": active_ids, }, } )