541 lines
17 KiB
Python
541 lines
17 KiB
Python
import uuid
|
|
|
|
import os
|
|
|
|
from flask import Blueprint, abort, current_app, flash, redirect, render_template, request, session, url_for
|
|
from flask_login import current_user, login_required, login_user
|
|
|
|
from ..extensions import db
|
|
from ..uploads import abs_upload_path, ensure_company_upload_dir, get_company_upload_bytes, is_valid_upload_relpath
|
|
from ..models import AppSettings, Company, Display, DisplayPlaylist, DisplaySession, Playlist, PlaylistItem, User
|
|
from ..email_utils import send_email
|
|
|
|
bp = Blueprint("admin", __name__, url_prefix="/admin")
|
|
|
|
|
|
def admin_required():
    """Abort the request with HTTP 403 unless the current user is a logged-in admin."""
    # ``is_admin`` is only consulted once ``is_authenticated`` is truthy,
    # so the attribute is never read when the user is not authenticated.
    allowed = current_user.is_authenticated and current_user.is_admin
    if not allowed:
        abort(403)
|
|
|
|
|
|
def _get_app_settings() -> AppSettings:
    """Return the AppSettings row with primary key 1, inserting it first when absent."""
    row = db.session.get(AppSettings, 1)
    if not row:
        # Nothing stored yet: persist a fresh row so later lookups find it.
        row = AppSettings(id=1)
        db.session.add(row)
        db.session.commit()
    return row
|
|
|
|
|
|
def _try_delete_upload(file_path: str | None, upload_folder: str):
|
|
"""Best-effort delete of an uploaded media file."""
|
|
if not file_path:
|
|
return
|
|
if not is_valid_upload_relpath(file_path):
|
|
return
|
|
|
|
abs_path = abs_upload_path(upload_folder, file_path)
|
|
if not abs_path:
|
|
return
|
|
try:
|
|
if os.path.isfile(abs_path):
|
|
os.remove(abs_path)
|
|
except Exception:
|
|
# Ignore cleanup failures
|
|
pass
|
|
|
|
|
|
@bp.get("/")
@login_required
def dashboard():
    """Render the admin dashboard with every company, ordered by name ascending."""
    admin_required()
    by_name = Company.query.order_by(Company.name.asc())
    return render_template("admin/dashboard.html", companies=by_name.all())
|
|
|
|
|
|
@bp.get("/settings")
@login_required
def settings():
    """Render the admin settings page with the app settings row and all admin users."""
    admin_required()
    # Dict entries are evaluated top to bottom, so the settings row is
    # fetched (and created if needed) before the admin list is queried.
    context = {
        "settings": _get_app_settings(),
        "admins": User.query.filter_by(is_admin=True).order_by(User.email.asc()).all(),
    }
    return render_template("admin/settings.html", **context)
|
|
|
|
|
|
@bp.post("/settings/smtp")
@login_required
def update_smtp_settings():
    """Validate and persist the SMTP settings posted from the admin settings form.

    Form fields read: ``smtp_host``, ``smtp_port``, ``smtp_username``,
    ``smtp_password``, ``smtp_from``, ``smtp_starttls``, ``smtp_debug`` and
    ``smtp_timeout_seconds``.

    Rules:
        * Blank host/username/from values and a blank port are stored as ``None``.
        * The port must be an integer in 1..65535.
        * The timeout must be a finite number greater than 0; a blank value
          falls back to 10 seconds.
        * The checkboxes count as set for ``1``/``true``/``yes``/``on``
          (case-insensitive).
        * A blank password leaves the stored password untouched.

    Any validation failure flashes an error and redirects back to the
    settings page without saving.
    """
    import math  # function-scope: only needed for the finite-timeout check

    admin_required()
    s = _get_app_settings()
    form = request.form
    truthy = ("1", "true", "yes", "on")

    smtp_host = (form.get("smtp_host") or "").strip() or None
    smtp_port_raw = (form.get("smtp_port") or "").strip()
    smtp_username = (form.get("smtp_username") or "").strip() or None
    smtp_password = form.get("smtp_password")
    smtp_from = (form.get("smtp_from") or "").strip() or None
    smtp_starttls = (form.get("smtp_starttls") or "").lower() in truthy
    smtp_debug = (form.get("smtp_debug") or "").lower() in truthy
    smtp_timeout_raw = (form.get("smtp_timeout_seconds") or "").strip()

    smtp_port: int | None = None
    if smtp_port_raw:
        try:
            smtp_port = int(smtp_port_raw)
        except ValueError:
            flash("SMTP port must be a number", "danger")
            return redirect(url_for("admin.settings"))

    smtp_timeout: float = 10.0
    if smtp_timeout_raw:
        try:
            smtp_timeout = float(smtp_timeout_raw)
        except ValueError:
            smtp_timeout = math.nan
        # float() happily parses "nan" and "inf"; neither is a usable
        # timeout, and NaN would slip past the "<= 0" check below.
        if not math.isfinite(smtp_timeout):
            flash("SMTP timeout must be a number (seconds)", "danger")
            return redirect(url_for("admin.settings"))

    if smtp_port is not None and not 1 <= smtp_port <= 65535:
        flash("SMTP port must be between 1 and 65535", "danger")
        return redirect(url_for("admin.settings"))

    if smtp_timeout <= 0:
        flash("SMTP timeout must be > 0", "danger")
        return redirect(url_for("admin.settings"))

    s.smtp_host = smtp_host
    s.smtp_port = smtp_port
    s.smtp_username = smtp_username

    # Only overwrite the password if a value was submitted, so other SMTP
    # fields can be edited without re-entering it.
    if smtp_password is not None and smtp_password.strip() != "":
        s.smtp_password = smtp_password

    s.smtp_from = smtp_from
    s.smtp_starttls = smtp_starttls
    s.smtp_timeout_seconds = smtp_timeout
    s.smtp_debug = smtp_debug
    db.session.commit()

    flash("SMTP settings saved.", "success")
    return redirect(url_for("admin.settings"))
|
|
|
|
|
|
@bp.post("/settings/domain")
@login_required
def update_public_domain():
    """Store the public domain (host only, no scheme or path) from the settings form.

    The submitted ``public_domain`` value is trimmed and lower-cased. A
    leading ``http://`` or ``https://`` and surrounding slashes are removed.
    Values that still contain a ``/`` or any whitespace are rejected with a
    flash message. A value that is empty, or becomes empty after
    normalization, clears the setting (``None``).
    """
    admin_required()
    s = _get_app_settings()
    raw = (request.form.get("public_domain") or "").strip().lower()

    # Strip the scheme only where a scheme can be: at the very start.
    for scheme in ("http://", "https://"):
        if raw.startswith(scheme):
            raw = raw[len(scheme):]
            break
    raw = raw.strip().strip("/")

    if "/" in raw:
        flash("Public domain must not contain a path. Example: signage.example.com", "danger")
        return redirect(url_for("admin.settings"))
    if any(ch.isspace() for ch in raw):
        # Covers tabs and other whitespace, not just the plain space.
        flash("Public domain must not contain spaces", "danger")
        return redirect(url_for("admin.settings"))

    # An input such as "https://" normalizes to "", which must clear the
    # setting rather than be stored as an empty domain.
    s.public_domain = raw or None
    db.session.commit()
    flash("Public domain saved.", "success")
    return redirect(url_for("admin.settings"))
|
|
|
|
|
|
@bp.post("/settings/test-email")
@login_required
def send_test_email():
    """Send a fixed test message to the submitted ``to_email`` and report the outcome.

    Every path ends with a redirect to the settings page; the flash message
    says whether the recipient was missing, sending failed, or it succeeded.
    """
    admin_required()
    recipient = (request.form.get("to_email") or "").strip().lower()

    if recipient:
        try:
            send_email(
                to_email=recipient,
                subject="Signage SMTP test",
                body_text="This is a test email from Signage (Admin → Settings).",
            )
        except Exception as exc:
            # The admin explicitly asked for a test, so show them the error.
            flash(f"Failed to send test email: {exc}", "danger")
        else:
            flash(f"Test email sent to {recipient}", "success")
    else:
        flash("Test email recipient is required", "danger")

    return redirect(url_for("admin.settings"))
|
|
|
|
|
|
@bp.post("/settings/admins")
@login_required
def create_admin_user():
    """Create an admin user, or promote the existing user with that email.

    Requires ``email`` and a ``password`` of at least 8 characters. When a
    non-admin user with the email already exists it is promoted: its
    password is replaced, username is set to the email and its company link
    is cleared. An existing admin is left untouched.
    """
    admin_required()
    form = request.form
    email = (form.get("email") or "").strip().lower()
    password = form.get("password") or ""

    if not (email and password):
        flash("Email and password are required", "danger")
        return redirect(url_for("admin.settings"))

    if len(password) < 8:
        flash("Password must be at least 8 characters", "danger")
        return redirect(url_for("admin.settings"))

    account = User.query.filter_by(email=email).first()
    if account and account.is_admin:
        flash("That user is already an admin", "warning")
        return redirect(url_for("admin.settings"))

    if account:
        # Promote the existing user to admin.
        account.is_admin = True
        account.set_password(password)
        account.email = email
        account.username = email
        account.company_id = None
        outcome = "User promoted to admin."
    else:
        account = User(is_admin=True)
        account.email = email
        account.username = email
        account.set_password(password)
        db.session.add(account)
        outcome = "Admin user created."

    db.session.commit()
    flash(outcome, "success")
    return redirect(url_for("admin.settings"))
|
|
|
|
|
|
@bp.post("/settings/admins/<int:user_id>/demote")
@login_required
def demote_admin_user(user_id: int):
    """Remove the admin flag from another user, always keeping at least one admin.

    Refuses to demote the current user, a user who is not an admin, or the
    last remaining admin. Responds 404 when the user does not exist.
    """
    admin_required()

    def _flash_and_return(message, category):
        # Every outcome of this view lands back on the settings page.
        flash(message, category)
        return redirect(url_for("admin.settings"))

    if current_user.id == user_id:
        return _flash_and_return("You cannot demote yourself", "danger")

    target = db.session.get(User, user_id)
    if not target:
        abort(404)
    if not target.is_admin:
        return _flash_and_return("That user is not an admin", "warning")

    # Ensure we always keep at least one admin.
    if User.query.filter_by(is_admin=True).count() <= 1:
        return _flash_and_return("Cannot demote the last remaining admin", "danger")

    target.is_admin = False
    db.session.commit()
    return _flash_and_return("Admin user demoted.", "success")
|
|
|
|
|
|
@bp.post("/companies")
@login_required
def create_company():
    """Create a company from the submitted ``name`` and open its detail page.

    A blank or already-used name flashes an error and returns to the
    dashboard. The company's upload directory is created right away on a
    best-effort basis.
    """
    admin_required()
    name = request.form.get("name", "").strip()

    problem = None
    if not name:
        problem = "Company name required"
    elif Company.query.filter_by(name=name).first():
        problem = "Company name already exists"
    if problem:
        flash(problem, "danger")
        return redirect(url_for("admin.dashboard"))

    new_company = Company(name=name)
    db.session.add(new_company)
    db.session.commit()

    # Create the per-company upload directory eagerly (best-effort).
    try:
        ensure_company_upload_dir(current_app.config["UPLOAD_FOLDER"], new_company.id)
    except Exception:
        # Upload directory is also created lazily on first upload.
        pass

    flash("Company created", "success")
    return redirect(url_for("admin.company_detail", company_id=new_company.id))
|
|
|
|
|
|
@bp.get("/companies/<int:company_id>")
@login_required
def company_detail(company_id: int):
    """Render one company's detail page together with its upload usage in bytes.

    Responds 404 when the company does not exist.
    """
    admin_required()
    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    used = get_company_upload_bytes(current_app.config["UPLOAD_FOLDER"], company.id)
    storage = {"used_bytes": used}
    return render_template("admin/company_detail.html", company=company, storage=storage)
|
|
|
|
|
|
@bp.post("/companies/<int:company_id>/storage")
@login_required
def update_company_storage(company_id: int):
    """Set or clear a company's storage limit from the ``storage_max_mb`` form field.

    * Blank, zero or negative input clears the limit (``storage_max_bytes``
      becomes ``None``, reported as unlimited).
    * A positive finite number is interpreted as MiB and stored in bytes.
    * Anything else, including ``nan`` and ``inf``, is rejected with a flash
      message.

    Responds 404 when the company does not exist.
    """
    import math  # function-scope: only needed for the finite-number check

    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    raw = (request.form.get("storage_max_mb") or "").strip()

    limit_bytes: int | None = None  # None means unlimited
    if raw:
        try:
            mb = float(raw)
        except ValueError:
            mb = math.nan
        # float() accepts "nan" and "inf", and int() raises on both; reject
        # them here instead of failing with an unhandled exception.
        if not math.isfinite(mb):
            flash("Invalid storage limit. Please enter a number (MB).", "danger")
            return redirect(url_for("admin.company_detail", company_id=company_id))
        if mb > 0:
            limit_bytes = int(mb * 1024 * 1024)

    company.storage_max_bytes = limit_bytes
    db.session.commit()

    if limit_bytes is None:
        flash("Storage limit cleared (unlimited).", "success")
    else:
        flash("Storage limit updated.", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))
|
|
|
|
|
|
@bp.post("/companies/<int:company_id>/users")
@login_required
def create_company_user(company_id: int):
    """Create a non-admin user for the company from the submitted email and password.

    Responds 404 for an unknown company. A missing email/password or an
    email that is already in use flashes an error and returns to the
    company page.
    """
    admin_required()
    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    form = request.form
    email = (form.get("email", "") or "").strip().lower() or None
    password = form.get("password", "")

    error = None
    if not (email and password):
        error = "Email and password required"
    elif User.query.filter_by(email=email).first():
        error = "Email already exists"
    if error:
        flash(error, "danger")
        return redirect(url_for("admin.company_detail", company_id=company_id))

    new_user = User(is_admin=False, company=company)
    new_user.email = email
    new_user.username = email
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()

    flash("User created", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))
|
|
|
|
|
|
@bp.post("/companies/<int:company_id>/displays")
@login_required
def create_display(company_id: int):
    """Create a display for the company with a freshly generated token.

    The ``name`` form field is trimmed; a blank name becomes ``"Display"``.
    Responds 404 when the company does not exist.
    """
    admin_required()
    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    # Cap the name at 120 characters, the same limit update_display_name
    # applies on rename, so both entry points accept the same names.
    # NOTE(review): presumably 120 is the column length - confirm in the model.
    name = (request.form.get("name", "").strip() or "Display")[:120]
    token = uuid.uuid4().hex

    d = Display(company=company, name=name, token=token)
    db.session.add(d)
    db.session.commit()

    flash("Display created", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))
|
|
|
|
|
|
@bp.post("/companies/<int:company_id>/delete")
@login_required
def delete_company(company_id: int):
    """Delete a company together with its dependent rows and uploaded media.

    Steps:
        1. Detach the company's displays from their assigned playlist.
        2. Bulk-delete DisplayPlaylist and DisplaySession rows of those displays.
        3. Record the upload paths of image/video playlist items and of the
           company overlay.
        4. Delete the company and commit.
        5. Remove the recorded files (best-effort).

    Files are removed only after the commit succeeded, so a failed commit
    cannot leave database rows pointing at files that were already deleted.
    Responds 404 when the company does not exist.
    """
    admin_required()

    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    # If FK constraints are enabled, we must delete in a safe order.
    # 1) Detach displays from playlists (Display.assigned_playlist_id -> Playlist.id)
    display_ids = []
    for d in list(company.displays):
        d.assigned_playlist_id = None
        display_ids.append(d.id)

    if display_ids:
        # 2) Clear multi-playlist mappings, then display sessions.
        DisplayPlaylist.query.filter(DisplayPlaylist.display_id.in_(display_ids)).delete(synchronize_session=False)
        DisplaySession.query.filter(DisplaySession.display_id.in_(display_ids)).delete(synchronize_session=False)

    # 3) Collect upload paths as plain strings now; the ORM objects are
    #    gone once the delete is committed.
    upload_folder = current_app.config["UPLOAD_FOLDER"]
    items = (
        PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id)
        .filter(Playlist.company_id == company.id)
        .all()
    )
    stale_files = [it.file_path for it in items if it.item_type in ("image", "video")]
    if company.overlay_file_path:
        stale_files.append(company.overlay_file_path)

    # 4) Delete the company; cascades will delete users/displays/playlists/items.
    company_name = company.name
    db.session.delete(company)
    db.session.commit()

    # 5) The rows are gone for good, so the files can go too.
    for path in stale_files:
        _try_delete_upload(path, upload_folder)

    flash(f"Company '{company_name}' deleted (including users, displays and playlists).", "success")
    return redirect(url_for("admin.dashboard"))
|
|
|
|
|
|
@bp.post("/impersonate/<int:user_id>")
@login_required
def impersonate(user_id: int):
    """Log the current admin in as a non-admin user.

    The admin's own id is stored in the session under
    ``impersonator_admin_id`` before switching. Unknown users and admin
    users cannot be impersonated.
    """
    admin_required()
    target = db.session.get(User, user_id)

    can_impersonate = target and not target.is_admin
    if not can_impersonate:
        flash("Cannot impersonate this user", "danger")
        return redirect(url_for("admin.dashboard"))

    # Save admin id in session so we can return without any password.
    session["impersonator_admin_id"] = current_user.id
    login_user(target)

    shown_as = target.email or "(no email)"
    flash(f"Impersonating {shown_as}.", "warning")
    return redirect(url_for("company.dashboard"))
|
|
|
|
|
|
@bp.post("/users/<int:user_id>/email")
@login_required
def update_user_email(user_id: int):
    """Change a user's email and keep the ``username`` column in sync with it.

    The submitted ``email`` is trimmed and lower-cased. A blank value, or
    one already used by a different user, is rejected with a flash message.
    Responds 404 when the user does not exist.

    Afterwards the admin returns to the user's company page. A user without
    a company (``company_id`` is ``None``) has no such page, so the view
    falls back to the admin dashboard, as ``delete_user`` does.
    """
    admin_required()
    u = db.session.get(User, user_id)
    if not u:
        abort(404)

    # Resolve the return URL once, up front. Building the company detail URL
    # with company_id=None would fail because the route requires that value.
    if u.company_id is not None:
        back_url = url_for("admin.company_detail", company_id=u.company_id)
    else:
        back_url = url_for("admin.dashboard")

    email = (request.form.get("email", "") or "").strip().lower()
    if not email:
        flash("Email is required", "danger")
        return redirect(back_url)

    existing = User.query.filter(User.email == email, User.id != u.id).first()
    if existing:
        flash("Email already exists", "danger")
        return redirect(back_url)

    u.email = email
    # keep backwards-compatible username column in sync
    u.username = email
    db.session.commit()

    flash("Email updated", "success")
    return redirect(back_url)
|
|
|
|
|
|
@bp.post("/users/<int:user_id>/delete")
@login_required
def delete_user(user_id: int):
    """Admin: delete a non-admin user.

    Admin users and the current user are refused. After deletion the admin
    is sent to the user's former company page, or to the dashboard when the
    user had no company. Responds 404 when the user does not exist.
    """
    admin_required()

    doomed = db.session.get(User, user_id)
    if not doomed:
        abort(404)

    # Safety checks
    refusal = None
    if doomed.is_admin:
        refusal = "Cannot delete an admin user"
    elif doomed.id == current_user.id:
        refusal = "You cannot delete yourself"
    if refusal:
        flash(refusal, "danger")
        return redirect(url_for("admin.dashboard"))

    # Read everything the message and redirect need before the row is gone.
    company_id = doomed.company_id
    company_name = doomed.company.name if doomed.company else None
    email = doomed.email

    db.session.delete(doomed)
    db.session.commit()

    suffix = f" from '{company_name}'." if company_name else "."
    flash(f"User '{email}' deleted" + suffix, "success")

    if not company_id:
        return redirect(url_for("admin.dashboard"))
    return redirect(url_for("admin.company_detail", company_id=company_id))
|
|
|
|
|
|
@bp.post("/displays/<int:display_id>/name")
@login_required
def update_display_name(display_id: int):
    """Admin: rename a display.

    The trimmed ``name`` form value is stored, cut to 120 characters. A
    blank name is rejected. Responds 404 when the display does not exist.
    """
    admin_required()

    display = db.session.get(Display, display_id)
    if not display:
        abort(404)

    new_name = (request.form.get("name") or "").strip()
    if new_name:
        display.name = new_name[:120]
        db.session.commit()
        flash("Display name updated", "success")
    else:
        flash("Display name is required", "danger")

    return redirect(url_for("admin.company_detail", company_id=display.company_id))
|
|
|
|
|
|
@bp.post("/displays/<int:display_id>/delete")
@login_required
def delete_display(display_id: int):
    """Admin: delete a display.

    The playlist assignment, multi-playlist mappings and sessions of the
    display are removed before the display itself. Responds 404 when the
    display does not exist.
    """
    admin_required()

    display = db.session.get(Display, display_id)
    if not display:
        abort(404)

    # Keep what the flash message and redirect need after the row is deleted.
    company_id = display.company_id
    display_name = display.name

    # If FK constraints are enabled, delete in a safe order:
    # unassign the playlist, then drop mappings and sessions, then the display.
    display.assigned_playlist_id = None
    for dependent in (DisplayPlaylist, DisplaySession):
        dependent.query.filter_by(display_id=display.id).delete(synchronize_session=False)

    db.session.delete(display)
    db.session.commit()

    flash(f"Display '{display_name}' deleted.", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))
|