Files
openslide/app/routes/admin.py
2026-01-23 13:54:58 +01:00

193 lines
6.4 KiB
Python

import uuid
import os
from flask import Blueprint, abort, current_app, flash, redirect, render_template, request, session, url_for
from flask_login import current_user, login_required, login_user
from ..extensions import db
from ..models import Company, Display, DisplaySession, Playlist, PlaylistItem, User
# Blueprint holding the admin views; every route below is mounted under /admin.
bp = Blueprint("admin", __name__, url_prefix="/admin")
def admin_required():
    """Abort with HTTP 403 unless the current user is authenticated and an admin."""
    allowed = current_user.is_authenticated and current_user.is_admin
    if not allowed:
        abort(403)
def _try_delete_upload(file_path: str | None, upload_folder: str):
"""Best-effort delete of an uploaded media file."""
if not file_path:
return
if not file_path.startswith("uploads/"):
return
filename = file_path.split("/", 1)[1]
abs_path = os.path.join(upload_folder, filename)
try:
if os.path.isfile(abs_path):
os.remove(abs_path)
except Exception:
# Ignore cleanup failures
pass
@bp.get("/")
@login_required
def dashboard():
    """Render the admin dashboard with all companies and users, sorted by name."""
    admin_required()
    return render_template(
        "admin/dashboard.html",
        companies=Company.query.order_by(Company.name.asc()).all(),
        users=User.query.order_by(User.username.asc()).all(),
    )
@bp.post("/companies")
@login_required
def create_company():
    """Create a company from the ``name`` form field.

    An empty or already-used name flashes an error and redirects back to the
    dashboard; on success the user is redirected to the new company's page.
    """
    admin_required()
    company_name = request.form.get("name", "").strip()

    # Validate first; the duplicate lookup only runs for a non-empty name.
    if not company_name:
        problem = "Company name required"
    elif Company.query.filter_by(name=company_name).first():
        problem = "Company name already exists"
    else:
        problem = None

    if problem is not None:
        flash(problem, "danger")
        return redirect(url_for("admin.dashboard"))

    new_company = Company(name=company_name)
    db.session.add(new_company)
    db.session.commit()
    flash("Company created", "success")
    return redirect(url_for("admin.company_detail", company_id=new_company.id))
@bp.get("/companies/<int:company_id>")
@login_required
def company_detail(company_id: int):
    """Render the detail page of one company, or abort with 404 if it is unknown."""
    admin_required()
    found = db.session.get(Company, company_id)
    if found:
        return render_template("admin/company_detail.html", company=found)
    abort(404)
@bp.post("/companies/<int:company_id>/users")
@login_required
def create_company_user(company_id: int):
    """Create a non-admin user that belongs to the given company.

    Reads ``username``, ``email`` and ``password`` from the form. All three
    are required, and username and e-mail must not be in use yet. Every
    outcome redirects back to the company detail page with a flash message.
    Aborts with 404 when the company does not exist.
    """
    admin_required()
    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    form = request.form
    username = form.get("username", "").strip()
    # E-mail is normalised to lower case; a blank value becomes None.
    email = (form.get("email", "") or "").strip().lower() or None
    password = form.get("password", "")

    if not (username and email and password):
        problem = "Username, email and password required"
    elif User.query.filter_by(username=username).first():
        problem = "Username already exists"
    elif User.query.filter_by(email=email).first():
        problem = "Email already exists"
    else:
        problem = None

    if problem is None:
        new_user = User(username=username, is_admin=False, company=company)
        new_user.email = email
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()
        flash("User created", "success")
    else:
        flash(problem, "danger")
    return redirect(url_for("admin.company_detail", company_id=company_id))
@bp.post("/companies/<int:company_id>/displays")
@login_required
def create_display(company_id: int):
    """Create a display for the given company with a freshly generated token.

    The display name comes from the ``name`` form field and falls back to
    ``"Display"`` when blank. Aborts with 404 when the company does not exist.
    """
    admin_required()
    owner = db.session.get(Company, company_id)
    if not owner:
        abort(404)

    submitted_name = request.form.get("name", "").strip()
    display_name = submitted_name if submitted_name else "Display"
    new_display = Display(company=owner, name=display_name, token=uuid.uuid4().hex)
    db.session.add(new_display)
    db.session.commit()

    flash("Display created", "success")
    return redirect(url_for("admin.company_detail", company_id=company_id))
@bp.post("/companies/<int:company_id>/delete")
@login_required
def delete_company(company_id: int):
    """Delete a company together with its users, displays, playlists and media.

    Database rows are removed first, in an order that is safe when foreign-key
    constraints are enforced. Uploaded image/video files are removed from disk
    only after the transaction has been committed, so a failed commit never
    leaves database rows pointing at files that are already gone.

    Aborts with 404 when the company does not exist; redirects to the admin
    dashboard on success.
    """
    admin_required()
    company = db.session.get(Company, company_id)
    if not company:
        abort(404)

    # If FK constraints are enabled, we must delete in a safe order.
    # 1) Detach displays from playlists (Display.assigned_playlist_id -> Playlist.id)
    displays = list(company.displays)
    for d in displays:
        d.assigned_playlist_id = None

    # 2) Delete display sessions referencing displays of this company
    display_ids = [d.id for d in displays]
    if display_ids:
        DisplaySession.query.filter(
            DisplaySession.display_id.in_(display_ids)
        ).delete(synchronize_session=False)

    # 3) Remember which media files belong to this company. The paths are
    #    copied into plain strings because the ORM objects must not be touched
    #    once their rows have been deleted and committed.
    upload_folder = current_app.config["UPLOAD_FOLDER"]
    items = (
        PlaylistItem.query.join(Playlist, PlaylistItem.playlist_id == Playlist.id)
        .filter(Playlist.company_id == company.id)
        .all()
    )
    media_paths = [it.file_path for it in items if it.item_type in ("image", "video")]

    # 4) Delete the company; cascades will delete users/displays/playlists/items.
    company_name = company.name
    db.session.delete(company)
    db.session.commit()

    # 5) The rows are gone for good now, so it is safe to remove the files
    #    (best effort; failures are ignored by the helper).
    for path in media_paths:
        _try_delete_upload(path, upload_folder)

    flash(f"Company '{company_name}' deleted (including users, displays and playlists).", "success")
    return redirect(url_for("admin.dashboard"))
@bp.post("/impersonate/<int:user_id>")
@login_required
def impersonate(user_id: int):
    """Log the current admin in as another (non-admin) user.

    Unknown users, admin users, and users for whom ``login_user`` refuses the
    login are rejected with a flash message and a redirect to the admin
    dashboard. On success the admin's id is kept in the session and the
    browser is redirected to the company dashboard.
    """
    admin_required()
    target = db.session.get(User, user_id)
    if not target or target.is_admin:
        flash("Cannot impersonate this user", "danger")
        return redirect(url_for("admin.dashboard"))

    # Capture the admin id first: current_user refers to the target after login.
    admin_id = current_user.id
    # login_user() returns False when it refuses the login (e.g. an inactive
    # user); in that case the admin is still logged in as themselves, so do
    # not record an impersonation or claim success.
    if not login_user(target):
        flash("Cannot impersonate this user", "danger")
        return redirect(url_for("admin.dashboard"))

    # Save admin id in session so we can return without any password.
    session["impersonator_admin_id"] = admin_id
    flash(f"Impersonating {target.username}.", "warning")
    return redirect(url_for("company.dashboard"))
@bp.post("/users/<int:user_id>/email")
@login_required
def update_user_email(user_id: int):
    """Set or clear a user's e-mail address.

    The ``email`` form field is trimmed and lower-cased; a blank value stores
    ``None``. A non-blank address already used by a different user is
    rejected. Every outcome redirects to the detail page of the user's
    company. Aborts with 404 when the user does not exist.
    """
    admin_required()
    user = db.session.get(User, user_id)
    if not user:
        abort(404)

    raw_email = request.form.get("email", "") or ""
    new_email = raw_email.strip().lower() or None

    # Uniqueness only matters when an address is actually being set.
    if new_email:
        clash = User.query.filter(User.email == new_email, User.id != user.id).first()
    else:
        clash = None

    if clash:
        flash("Email already exists", "danger")
    else:
        user.email = new_email
        db.session.commit()
        flash("Email updated", "success")
    return redirect(url_for("admin.company_detail", company_id=user.company_id))