"""Authentication blueprint: login/logout, password reset, password change.

All routes are registered on ``bp`` under the ``/auth`` URL prefix.
"""

import logging
from typing import Optional

from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_login import current_user, login_required, login_user, logout_user
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer

from ..extensions import db
from ..email_utils import send_email
from ..models import AppSettings, User
from ..auth_tokens import load_password_reset_user_id, make_password_reset_token

bp = Blueprint("auth", __name__, url_prefix="/auth")
logger = logging.getLogger(__name__)

# Single source of truth for the reset-link lifetime; used both when
# validating tokens and when telling the user how long the link is valid.
RESET_TOKEN_MAX_AGE_SECONDS = 30 * 60

# Minimum length enforced on every newly chosen password.
MIN_PASSWORD_LENGTH = 8

# Shown whether or not the address matches an account (no user enumeration).
_RESET_REQUEST_MESSAGE = "If that email exists, you will receive a reset link."


def _make_reset_token(user: User) -> str:
    """Build a password-reset token for *user* using the app's SECRET_KEY."""
    return make_password_reset_token(
        secret_key=current_app.config["SECRET_KEY"],
        user_id=user.id,
    )


def _load_reset_token(token: str, *, max_age_seconds: int) -> int:
    """Return the user id carried by *token*.

    Delegates to ``load_password_reset_user_id`` with the app's SECRET_KEY.
    Callers in this module handle ``SignatureExpired`` and ``BadSignature``
    raised from this call.
    """
    return load_password_reset_user_id(
        secret_key=current_app.config["SECRET_KEY"],
        token=token,
        max_age_seconds=max_age_seconds,
    )


def _new_password_error(new_password: str, confirm_password: str) -> Optional[str]:
    """Return the first validation message for a new password, or ``None``.

    Checks, in order: non-empty, minimum length, and matching confirmation.
    """
    if not new_password:
        return "New password is required"
    if len(new_password) < MIN_PASSWORD_LENGTH:
        return f"New password must be at least {MIN_PASSWORD_LENGTH} characters"
    if new_password != confirm_password:
        return "New password and confirmation do not match"
    return None


@bp.get("/forgot-password")
def forgot_password():
    """Render the forgot-password form (signed-in users go to the index)."""
    if current_user.is_authenticated:
        return redirect(url_for("index"))
    return render_template("auth_forgot_password.html")


@bp.post("/forgot-password")
def forgot_password_post():
    """Email a reset link if the submitted address belongs to a user.

    Always responds with the same generic flash message to avoid user
    enumeration, including when sending the email fails.
    """
    email = (request.form.get("email", "") or "").strip().lower()
    if not email:
        flash(_RESET_REQUEST_MESSAGE, "info")
        return redirect(url_for("auth.forgot_password"))

    user = User.query.filter_by(email=email).first()
    if user:
        token = _make_reset_token(user)

        # By default Flask uses the request host when building _external URLs.
        # For deployments behind proxies or where the public host differs, allow
        # admins to configure a public domain used in email links.
        settings = db.session.get(AppSettings, 1)
        if settings and settings.public_domain:
            # Flask's url_for doesn't support overriding the host per-call.
            # We generate the relative path and prefix it with the configured domain.
            path = url_for("auth.reset_password", token=token, _external=False)
            reset_url = f"https://{settings.public_domain}{path}"
        else:
            reset_url = url_for("auth.reset_password", token=token, _external=True)

        valid_minutes = RESET_TOKEN_MAX_AGE_SECONDS // 60
        body = (
            "Someone requested a password reset for your account.\n\n"
            f"Reset your password using this link (valid for {valid_minutes} minutes):\n{reset_url}\n\n"
            "If you did not request this, you can ignore this email."
        )
        try:
            send_email(to_email=user.email, subject="Password reset", body_text=body)
        except Exception:
            # Keep message generic to user (avoid leaking SMTP issues), but log for admins.
            logger.exception("Failed to send password reset email")

    flash(_RESET_REQUEST_MESSAGE, "info")
    return redirect(url_for("auth.login"))


# The rule must declare <token>: the view takes a ``token`` argument and
# url_for("auth.reset_password", token=...) is used to build the emailed link.
@bp.get("/reset-password/<token>")
def reset_password(token: str):
    """Render the reset form, or a friendly error if the token is unusable."""
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    # Validate token up-front so UI can show a friendly error.
    try:
        _load_reset_token(token, max_age_seconds=RESET_TOKEN_MAX_AGE_SECONDS)
    except SignatureExpired:
        return render_template(
            "auth_reset_password.html",
            token=None,
            token_error="Reset link has expired.",
        )
    except BadSignature:
        return render_template(
            "auth_reset_password.html",
            token=None,
            token_error="Invalid reset link.",
        )

    return render_template("auth_reset_password.html", token=token, token_error=None)


@bp.post("/reset-password/<token>")
def reset_password_post(token: str):
    """Set a new password for the user identified by a valid reset token.

    An expired token redirects to the forgot-password form; an invalid token
    or an unknown user id aborts with HTTP 400.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    try:
        user_id = _load_reset_token(token, max_age_seconds=RESET_TOKEN_MAX_AGE_SECONDS)
    except SignatureExpired:
        flash("Reset link has expired. Please request a new one.", "danger")
        return redirect(url_for("auth.forgot_password"))
    except BadSignature:
        abort(400)

    error = _new_password_error(new_password, confirm_password)
    if error:
        flash(error, "danger")
        return redirect(url_for("auth.reset_password", token=token))

    user = db.session.get(User, user_id)
    if not user:
        # Generic response: treat as invalid.
        abort(400)

    # NOTE(review): nothing here invalidates the token after use; whether it
    # can be replayed within its lifetime depends on auth_tokens — confirm.
    user.set_password(new_password)
    db.session.commit()

    flash("Password updated. You can now log in.", "success")
    return redirect(url_for("auth.login"))


@bp.get("/change-password")
@login_required
def change_password():
    """Render the change-password form for the signed-in user."""
    return render_template("auth_change_password.html")


@bp.post("/change-password")
@login_required
def change_password_post():
    """Change the signed-in user's password after verifying the current one."""
    current_password = request.form.get("current_password", "")
    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    if not current_user.check_password(current_password):
        flash("Current password is incorrect", "danger")
        return redirect(url_for("auth.change_password"))

    error = _new_password_error(new_password, confirm_password)
    if error:
        flash(error, "danger")
        return redirect(url_for("auth.change_password"))

    # Avoid no-op changes (helps catch accidental submits)
    if current_user.check_password(new_password):
        flash("New password must be different from the current password", "danger")
        return redirect(url_for("auth.change_password"))

    current_user.set_password(new_password)
    db.session.commit()

    flash("Password updated", "success")
    # Send user back to their home area.
    return redirect(url_for("index"))


@bp.get("/login")
def login():
    """Render the login form (signed-in users go to the index)."""
    if current_user.is_authenticated:
        return redirect(url_for("index"))
    return render_template("auth_login.html")


@bp.post("/login")
def login_post():
    """Authenticate by email and password and start a login session."""
    email = (request.form.get("email", "") or "").strip().lower()
    password = request.form.get("password", "")

    user = User.query.filter_by(email=email).first()
    if not user or not user.check_password(password):
        flash("Invalid email/password", "danger")
        return redirect(url_for("auth.login"))

    # clear impersonation marker, if any
    session.pop("impersonator_admin_id", None)
    login_user(user)
    return redirect(url_for("index"))


@bp.get("/logout")
@login_required
def logout():
    """Log the current user out and drop any impersonation marker."""
    logout_user()
    session.pop("impersonator_admin_id", None)
    return redirect(url_for("auth.login"))


@bp.get("/stop-impersonation")
@login_required
def stop_impersonation():
    """Switch back to the admin recorded in the session's impersonation marker."""
    admin_id = session.get("impersonator_admin_id")
    if not admin_id:
        return redirect(url_for("index"))

    admin = db.session.get(User, int(admin_id))
    session.pop("impersonator_admin_id", None)
    if admin:
        login_user(admin)
    # NOTE(review): when the admin record no longer exists the impersonated
    # user stays logged in and is still sent to the admin dashboard — confirm
    # that is the intended outcome.
    return redirect(url_for("admin.dashboard"))