import math
import os
import uuid

from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_login import current_user, login_required, login_user

from ..extensions import db
from ..uploads import (
    abs_upload_path,
    ensure_company_upload_dir,
    get_company_upload_bytes,
    is_valid_upload_relpath,
)
from ..models import AppSettings, Company, Display, DisplaySession, Playlist, PlaylistItem, User
from ..email_utils import send_email

bp = Blueprint("admin", __name__, url_prefix="/admin")

# Form values that count as "checked" for checkbox-style fields.
_TRUTHY_FORM_VALUES = ("1", "true", "yes", "on")


def admin_required():
    """Abort with 403 unless the current user is an authenticated admin."""
    if not current_user.is_authenticated or not current_user.is_admin:
        abort(403)


def _get_app_settings() -> AppSettings:
    """Get the singleton-ish AppSettings row, creating it if needed."""
    s = db.session.get(AppSettings, 1)
    if s:
        return s
    s = AppSettings(id=1)
    db.session.add(s)
    db.session.commit()
    return s


def _try_delete_upload(file_path: str | None, upload_folder: str):
    """Best-effort delete of an uploaded media file.

    Does nothing when *file_path* is empty, is rejected by
    ``is_valid_upload_relpath`` or cannot be resolved by ``abs_upload_path``.
    Filesystem errors during removal are ignored.
    """
    if not file_path:
        return
    if not is_valid_upload_relpath(file_path):
        return
    abs_path = abs_upload_path(upload_folder, file_path)
    if not abs_path:
        return
    try:
        if os.path.isfile(abs_path):
            os.remove(abs_path)
    except OSError:
        # Ignore cleanup failures: a leftover file must not break the request.
        pass


def _form_flag(field: str) -> bool:
    """Return True when the submitted form field *field* holds a truthy value."""
    return (request.form.get(field) or "").lower() in _TRUTHY_FORM_VALUES


def _parse_finite_float(raw: str) -> float | None:
    """Parse *raw* as a finite float.

    Returns None when *raw* is not a number or is NaN/infinite; ``float()``
    accepts strings such as "nan" and "inf", which are not usable settings.
    """
    try:
        value = float(raw)
    except ValueError:
        return None
    return value if math.isfinite(value) else None


def _redirect_to_company_or_dashboard(company_id):
    """Redirect to the company detail page, or to the dashboard without a company."""
    if company_id:
        return redirect(url_for("admin.company_detail", company_id=company_id))
    return redirect(url_for("admin.dashboard"))


@bp.get("/")
@login_required
def dashboard():
    """Render the admin dashboard listing all companies ordered by name."""
    admin_required()
    companies = Company.query.order_by(Company.name.asc()).all()
    return render_template("admin/dashboard.html", companies=companies)


@bp.get("/settings")
@login_required
def settings():
    """Render the settings page with the app settings and all admin users."""
    admin_required()
    # Local name differs from the view name to avoid shadowing the function.
    app_settings = _get_app_settings()
    admins = User.query.filter_by(is_admin=True).order_by(User.email.asc()).all()
    return render_template("admin/settings.html", settings=app_settings, admins=admins)


@bp.post("/settings/smtp")
@login_required
def update_smtp_settings():
    """Validate and persist the SMTP settings submitted from the settings form."""
    admin_required()
    s = _get_app_settings()

    smtp_host = (request.form.get("smtp_host") or "").strip() or None
    smtp_port_raw = (request.form.get("smtp_port") or "").strip()
    smtp_username = (request.form.get("smtp_username") or "").strip() or None
    smtp_password = request.form.get("smtp_password")
    smtp_from = (request.form.get("smtp_from") or "").strip() or None
    smtp_starttls = _form_flag("smtp_starttls")
    smtp_debug = _form_flag("smtp_debug")
    smtp_timeout_raw = (request.form.get("smtp_timeout_seconds") or "").strip()

    smtp_port: int | None = None
    if smtp_port_raw:
        try:
            smtp_port = int(smtp_port_raw)
        except ValueError:
            flash("SMTP port must be a number", "danger")
            return redirect(url_for("admin.settings"))

    # An empty timeout field falls back to 10 seconds.
    smtp_timeout: float = 10.0
    if smtp_timeout_raw:
        parsed_timeout = _parse_finite_float(smtp_timeout_raw)
        if parsed_timeout is None:
            flash("SMTP timeout must be a number (seconds)", "danger")
            return redirect(url_for("admin.settings"))
        smtp_timeout = parsed_timeout

    if smtp_port is not None and (smtp_port <= 0 or smtp_port > 65535):
        flash("SMTP port must be between 1 and 65535", "danger")
        return redirect(url_for("admin.settings"))

    if smtp_timeout <= 0:
        flash("SMTP timeout must be > 0", "danger")
        return redirect(url_for("admin.settings"))

    s.smtp_host = smtp_host
    s.smtp_port = smtp_port
    s.smtp_username = smtp_username

    # Only overwrite password if a value was submitted.
    # This allows editing other SMTP fields without having to re-enter the password.
    if smtp_password is not None and smtp_password.strip() != "":
        s.smtp_password = smtp_password

    s.smtp_from = smtp_from
    s.smtp_starttls = smtp_starttls
    s.smtp_timeout_seconds = smtp_timeout
    s.smtp_debug = smtp_debug
    db.session.commit()

    flash("SMTP settings saved.", "success")
    return redirect(url_for("admin.settings"))


@bp.post("/settings/domain")
@login_required
def update_public_domain():
    """Normalize and store the public domain; an empty value clears it."""
    admin_required()
    s = _get_app_settings()

    raw = (request.form.get("public_domain") or "").strip().lower()
    if raw:
        # Normalize: user asked for domain-only (no scheme). Strip possible scheme anyway.
        raw = raw.replace("http://", "").replace("https://", "")
        raw = raw.strip().strip("/")
        if "/" in raw:
            flash("Public domain must not contain a path. Example: signage.example.com", "danger")
            return redirect(url_for("admin.settings"))
        if " " in raw:
            flash("Public domain must not contain spaces", "danger")
            return redirect(url_for("admin.settings"))
        s.public_domain = raw
    else:
        s.public_domain = None

    db.session.commit()
    flash("Public domain saved.", "success")
    return redirect(url_for("admin.settings"))


@bp.post("/settings/test-email")
@login_required
def send_test_email():
    """Send a test email to the submitted address and report the outcome."""
    admin_required()

    to_email = (request.form.get("to_email") or "").strip().lower()
    if not to_email:
        flash("Test email recipient is required", "danger")
        return redirect(url_for("admin.settings"))

    try:
        send_email(
            to_email=to_email,
            subject="Signage SMTP test",
            body_text="This is a test email from Signage (Admin → Settings).",
        )
    except Exception as e:
        # Show a short error to the admin (they requested a test).
        flash(f"Failed to send test email: {e}", "danger")
        return redirect(url_for("admin.settings"))

    flash(f"Test email sent to {to_email}", "success")
    return redirect(url_for("admin.settings"))


@bp.post("/settings/admins")
@login_required
def create_admin_user():
    """Create a new admin user, or promote an existing non-admin user."""
    admin_required()

    email = (request.form.get("email") or "").strip().lower()
    password = request.form.get("password") or ""

    if not email or not password:
        flash("Email and password are required", "danger")
        return redirect(url_for("admin.settings"))

    if len(password) < 8:
        flash("Password must be at least 8 characters", "danger")
        return redirect(url_for("admin.settings"))

    existing = User.query.filter_by(email=email).first()
    if existing:
        if existing.is_admin:
            flash("That user is already an admin", "warning")
            return redirect(url_for("admin.settings"))

        # Promote existing user to admin
        existing.is_admin = True
        existing.set_password(password)
        existing.email = email
        existing.username = email
        existing.company_id = None
        db.session.commit()
        flash("User promoted to admin.", "success")
        return redirect(url_for("admin.settings"))

    u = User(is_admin=True)
    u.email = email
    u.username = email
    u.set_password(password)
    db.session.add(u)
    db.session.commit()

    flash("Admin user created.", "success")
    return redirect(url_for("admin.settings"))


@bp.post("/settings/admins/<int:user_id>/demote")
@login_required
def demote_admin_user(user_id: int):
    """Remove admin rights from a user, keeping at least one admin."""
    admin_required()

    if current_user.id == user_id:
        flash("You cannot demote yourself", "danger")
        return redirect(url_for("admin.settings"))

    u = db.session.get(User, user_id)
    if not u:
        abort(404)

    if not u.is_admin:
        flash("That user is not an admin", "warning")
        return redirect(url_for("admin.settings"))

    # Ensure we always keep at least one admin.
    admin_count = User.query.filter_by(is_admin=True).count()
    if admin_count <= 1:
        flash("Cannot demote the last remaining admin", "danger")
        return redirect(url_for("admin.settings"))

    u.is_admin = False
    db.session.commit()

    flash("Admin user demoted.", "success")
    return redirect(url_for("admin.settings"))


@bp.post("/companies")
@login_required
def create_company():
    """Create a company with a unique name and open its detail page."""
    admin_required()

    name = request.form.get("name", "").strip()
    if not name:
        flash("Company name required", "danger")
        return redirect(url_for("admin.dashboard"))

    if Company.query.filter_by(name=name).first():
        flash("Company name already exists", "danger")
        return redirect(url_for("admin.dashboard"))

    c = Company(name=name)
    db.session.add(c)
    db.session.commit()

    # Create the per-company upload directory eagerly (best-effort).
    try:
        ensure_company_upload_dir(current_app.config["UPLOAD_FOLDER"], c.id)
    except Exception:
        # Upload directory is also created lazily on first upload, so this is
        # not fatal; log it so the failure is visible.
        current_app.logger.warning(
            "Could not pre-create upload directory for company %s", c.id, exc_info=True
        )

    flash("Company created", "success")
    return redirect(url_for("admin.company_detail", company_id=c.id))


@bp.get("/companies/<int:company_id>")
@login_required
def company_detail(company_id: int):
    """Render the detail page of a company including its upload storage usage."""
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    upload_root = current_app.config["UPLOAD_FOLDER"]
    used_bytes = get_company_upload_bytes(upload_root, company.id)

    return render_template(
        "admin/company_detail.html",
        company=company,
        storage={
            "used_bytes": used_bytes,
        },
    )


@bp.post("/companies/<int:company_id>/storage")
@login_required
def update_company_storage(company_id: int):
    """Set the storage limit of a company in MB; empty or <= 0 means unlimited."""
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    raw = (request.form.get("storage_max_mb") or "").strip()
    if raw == "":
        # Treat empty as unlimited
        company.storage_max_bytes = None
        db.session.commit()
        flash("Storage limit cleared (unlimited).", "success")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    # NaN/inf are rejected here; they would otherwise crash the int() below.
    mb = _parse_finite_float(raw)
    if mb is None:
        flash("Invalid storage limit. Please enter a number (MB).", "danger")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    if mb <= 0:
        company.storage_max_bytes = None
        db.session.commit()
        flash("Storage limit cleared (unlimited).", "success")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    company.storage_max_bytes = int(mb * 1024 * 1024)
    db.session.commit()
    flash("Storage limit updated.", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))


@bp.post("/companies/<int:company_id>/users")
@login_required
def create_company_user(company_id: int):
    """Create a non-admin user that belongs to the given company."""
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    email = (request.form.get("email", "") or "").strip().lower() or None
    password = request.form.get("password", "")

    if not email or not password:
        flash("Email and password required", "danger")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    if User.query.filter_by(email=email).first():
        flash("Email already exists", "danger")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    u = User(is_admin=False, company=company)
    u.email = email
    u.username = email
    u.set_password(password)
    db.session.add(u)
    db.session.commit()

    flash("User created", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))


@bp.post("/companies/<int:company_id>/displays")
@login_required
def create_display(company_id: int):
    """Create a display with a fresh random token for the given company."""
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    name = request.form.get("name", "").strip() or "Display"
    token = uuid.uuid4().hex
    d = Display(company=company, name=name, token=token)
    db.session.add(d)
    db.session.commit()

    flash("Display created", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))


@bp.post("/companies/<int:company_id>/delete")
@login_required
def delete_company(company_id: int):
    """Delete a company together with its dependent rows and uploaded media."""
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    # If FK constraints are enabled, we must delete in a safe order.
    # 1) Detach displays from playlists (Display.assigned_playlist_id -> Playlist.id)
    for d in list(company.displays):
        d.assigned_playlist_id = None

    # 2) Delete display sessions referencing displays of this company
    display_ids = [d.id for d in company.displays]
    if display_ids:
        DisplaySession.query.filter(DisplaySession.display_id.in_(display_ids)).delete(
            synchronize_session=False
        )

    # 3) Collect uploaded media paths now; the items are gone after the delete.
    items = (
        PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id)
        .filter(Playlist.company_id == company.id)
        .all()
    )
    media_paths = [it.file_path for it in items if it.item_type in ("image", "video")]

    # 4) Delete the company; cascades will delete users/displays/playlists/items.
    company_name = company.name
    db.session.delete(company)
    db.session.commit()

    # 5) Remove files only after the commit succeeded, so a failed commit does
    #    not leave database rows pointing at files that no longer exist.
    upload_folder = current_app.config["UPLOAD_FOLDER"]
    for media_path in media_paths:
        _try_delete_upload(media_path, upload_folder)

    flash(f"Company '{company_name}' deleted (including users, displays and playlists).", "success")
    return redirect(url_for("admin.dashboard"))


@bp.post("/impersonate/<int:user_id>")
@login_required
def impersonate(user_id: int):
    """Log the admin in as a non-admin user, remembering the admin's id."""
    admin_required()

    target = db.session.get(User, user_id)
    if not target or target.is_admin:
        flash("Cannot impersonate this user", "danger")
        return redirect(url_for("admin.dashboard"))

    # Save admin id in session so we can return without any password.
    session["impersonator_admin_id"] = current_user.id
    login_user(target)
    flash(f"Impersonating {target.email or '(no email)'}.", "warning")
    return redirect(url_for("company.dashboard"))


@bp.post("/users/<int:user_id>/email")
@login_required
def update_user_email(user_id: int):
    """Change a user's email address, keeping the username column in sync."""
    admin_required()

    u = db.session.get(User, user_id)
    if not u:
        abort(404)

    email = (request.form.get("email", "") or "").strip().lower()
    if not email:
        flash("Email is required", "danger")
        return _redirect_to_company_or_dashboard(u.company_id)

    existing = User.query.filter(User.email == email, User.id != u.id).first()
    if existing:
        flash("Email already exists", "danger")
        return _redirect_to_company_or_dashboard(u.company_id)

    u.email = email
    # keep backwards-compatible username column in sync
    u.username = email
    db.session.commit()

    flash("Email updated", "success")
    return _redirect_to_company_or_dashboard(u.company_id)


@bp.post("/users/<int:user_id>/delete")
@login_required
def delete_user(user_id: int):
    """Admin: delete a non-admin user."""
    admin_required()

    u = db.session.get(User, user_id)
    if not u:
        abort(404)

    # Safety checks
    if u.is_admin:
        flash("Cannot delete an admin user", "danger")
        return redirect(url_for("admin.dashboard"))

    if u.id == current_user.id:
        flash("You cannot delete yourself", "danger")
        return redirect(url_for("admin.dashboard"))

    # Read everything needed for the message before the row is deleted.
    company_id = u.company_id
    company_name = u.company.name if u.company else None
    email = u.email

    db.session.delete(u)
    db.session.commit()

    flash(
        f"User '{email}' deleted" + (f" from '{company_name}'." if company_name else "."),
        "success",
    )
    return _redirect_to_company_or_dashboard(company_id)


@bp.post("/displays/<int:display_id>/name")
@login_required
def update_display_name(display_id: int):
    """Admin: rename a display."""
    admin_required()

    display = db.session.get(Display, display_id)
    if not display:
        abort(404)

    name = (request.form.get("name") or "").strip()
    if not name:
        flash("Display name is required", "danger")
        return redirect(url_for("admin.company_detail", company_id=display.company_id))

    # Stored name is capped at 120 characters.
    display.name = name[:120]
    db.session.commit()

    flash("Display name updated", "success")
    return redirect(url_for("admin.company_detail", company_id=display.company_id))