Files

218 lines
7.5 KiB
Python

from flask import Blueprint, abort, flash, redirect, render_template, request, session, url_for
from flask_login import current_user, login_required, login_user, logout_user
import logging
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from ..extensions import db
from ..email_utils import send_email
from ..models import AppSettings, User
from ..auth_tokens import load_password_reset_user_id, make_password_reset_token
# Blueprint named "auth"; routes registered on it are served under the /auth prefix.
bp = Blueprint("auth", __name__, url_prefix="/auth")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _make_reset_token(user: User) -> str:
    """Create a password-reset token for ``user``.

    Delegates to ``make_password_reset_token``, passing the application's
    ``SECRET_KEY`` and the user's id.
    """
    # Function-scope import, as in the original implementation.
    from flask import current_app

    signing_key = current_app.config["SECRET_KEY"]
    return make_password_reset_token(secret_key=signing_key, user_id=user.id)
def _load_reset_token(token: str, *, max_age_seconds: int) -> int:
    """Resolve a reset token to a user id via ``load_password_reset_user_id``.

    The application's ``SECRET_KEY`` is supplied as the signing key. Callers in
    this module wrap the call in handlers for ``SignatureExpired`` and
    ``BadSignature``.
    """
    # Function-scope import, as in the original implementation.
    from flask import current_app

    loader_kwargs = {
        "secret_key": current_app.config["SECRET_KEY"],
        "token": token,
        "max_age_seconds": max_age_seconds,
    }
    return load_password_reset_user_id(**loader_kwargs)
@bp.get("/forgot-password")
def forgot_password():
    """Render ``auth_forgot_password.html`` for anonymous visitors.

    Users who are already authenticated are redirected to the ``index``
    endpoint instead.
    """
    if not current_user.is_authenticated:
        return render_template("auth_forgot_password.html")
    return redirect(url_for("index"))
def _build_reset_url(token: str) -> str:
    """Return the absolute password-reset link to embed in the email.

    If the ``AppSettings`` row with id 1 has a ``public_domain``, the link is
    built from that value; otherwise Flask builds an external URL from the
    current request. The configured value is normalized first: surrounding
    whitespace and trailing slashes are removed, an explicit scheme
    (``http://`` / ``https://``) is kept as written, and ``https://`` is
    assumed when no scheme is present. A value that is blank after
    normalization falls back to the request-based URL.
    """
    settings = db.session.get(AppSettings, 1)
    base = ""
    if settings and settings.public_domain:
        base = str(settings.public_domain).strip().rstrip("/")
    if not base:
        return url_for("auth.reset_password", token=token, _external=True)
    if "://" not in base:
        base = f"https://{base}"
    # Flask's url_for doesn't support overriding the host per-call, so we
    # generate the relative path and prefix it with the configured domain.
    path = url_for("auth.reset_password", token=token, _external=False)
    return f"{base}{path}"


@bp.post("/forgot-password")
def forgot_password_post():
    """Handle the forgot-password form submission.

    Reads ``email`` from the form (trimmed and lower-cased). When a matching
    user exists, a reset token is generated and emailed as a link. The same
    generic flash message is shown whether or not the address matched, to
    avoid user enumeration. Email delivery failures are logged and otherwise
    ignored so they are not revealed to the requester.

    Returns:
        A redirect to the forgot-password page when no email was submitted,
        otherwise a redirect to the login page.
    """
    generic_notice = "If that email exists, you will receive a reset link."

    email = (request.form.get("email", "") or "").strip().lower()
    if not email:
        flash(generic_notice, "info")
        return redirect(url_for("auth.forgot_password"))

    user = User.query.filter_by(email=email).first()
    if user:
        reset_url = _build_reset_url(_make_reset_token(user))
        body = (
            "Someone requested a password reset for your account.\n\n"
            f"Reset your password using this link (valid for 30 minutes):\n{reset_url}\n\n"
            "If you did not request this, you can ignore this email."
        )
        try:
            send_email(to_email=user.email, subject="Password reset", body_text=body)
        except Exception:
            # Keep message generic to user (avoid leaking SMTP issues), but log for admins.
            logger.exception("Failed to send password reset email")

    flash(generic_notice, "info")
    return redirect(url_for("auth.login"))
@bp.get("/reset-password/<token>")
def reset_password(token: str):
    """Render ``auth_reset_password.html`` for a reset link.

    Authenticated users are redirected to ``index``. The token is checked
    up front (30-minute maximum age) so the template can show a friendly
    error; on failure the template receives ``token=None`` and a
    ``token_error`` message.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    template = "auth_reset_password.html"
    try:
        _load_reset_token(token, max_age_seconds=30 * 60)
    except BadSignature as exc:
        # SignatureExpired is a subclass of BadSignature, so a single handler
        # distinguishes the two cases.
        if isinstance(exc, SignatureExpired):
            problem = "Reset link has expired."
        else:
            problem = "Invalid reset link."
        return render_template(template, token=None, token_error=problem)

    return render_template(template, token=token, token_error=None)
@bp.post("/reset-password/<token>")
def reset_password_post(token: str):
    """Apply a new password for the user identified by a reset token.

    Authenticated users are redirected to ``index``. An expired token
    redirects to the forgot-password page with a flash message; an invalid
    token, or a token whose user no longer exists, aborts with HTTP 400.
    Password validation failures flash a message and redirect back to the
    reset page for the same token.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    form = request.form
    new_password = form.get("new_password", "")
    confirm_password = form.get("confirm_password", "")

    max_age = 30 * 60
    try:
        user_id = _load_reset_token(token, max_age_seconds=max_age)
    except SignatureExpired:
        flash("Reset link has expired. Please request a new one.", "danger")
        return redirect(url_for("auth.forgot_password"))
    except BadSignature:
        abort(400)

    # The first failing rule wins; the order matches the messages users see.
    if not new_password:
        problem = "New password is required"
    elif len(new_password) < 8:
        problem = "New password must be at least 8 characters"
    elif new_password != confirm_password:
        problem = "New password and confirmation do not match"
    else:
        problem = None

    if problem is not None:
        flash(problem, "danger")
        return redirect(url_for("auth.reset_password", token=token))

    user = db.session.get(User, user_id)
    if not user:
        # Generic response: treat as invalid.
        abort(400)

    user.set_password(new_password)
    db.session.commit()

    flash("Password updated. You can now log in.", "success")
    return redirect(url_for("auth.login"))
@bp.get("/change-password")
@login_required
def change_password():
    """Render ``auth_change_password.html`` for the logged-in user."""
    page = render_template("auth_change_password.html")
    return page
@bp.post("/change-password")
@login_required
def change_password_post():
    """Change the logged-in user's password.

    Reads ``current_password``, ``new_password`` and ``confirm_password`` from
    the form. Any validation failure flashes a message and redirects back to
    the change-password page; on success the new password is stored and
    committed, and the user is redirected to ``index``.
    """
    form = request.form
    current_password = form.get("current_password", "")
    new_password = form.get("new_password", "")
    confirm_password = form.get("confirm_password", "")

    # The first failing rule wins; the chain short-circuits in the same order
    # as the individual checks it replaces.
    if not current_user.check_password(current_password):
        problem = "Current password is incorrect"
    elif not new_password:
        problem = "New password is required"
    elif len(new_password) < 8:
        problem = "New password must be at least 8 characters"
    elif new_password != confirm_password:
        problem = "New password and confirmation do not match"
    elif current_user.check_password(new_password):
        # Avoid no-op changes (helps catch accidental submits)
        problem = "New password must be different from the current password"
    else:
        problem = None

    if problem is not None:
        flash(problem, "danger")
        return redirect(url_for("auth.change_password"))

    current_user.set_password(new_password)
    db.session.commit()
    flash("Password updated", "success")

    # Send user back to their home area.
    return redirect(url_for("index"))
@bp.get("/login")
def login():
    """Render ``auth_login.html`` for anonymous visitors.

    Users who are already authenticated are redirected to the ``index``
    endpoint instead.
    """
    if not current_user.is_authenticated:
        return render_template("auth_login.html")
    return redirect(url_for("index"))
@bp.post("/login")
def login_post():
    """Authenticate the submitted email/password and start a session.

    The email is trimmed and lower-cased before lookup. Unknown emails and
    wrong passwords get the same generic flash message. If Flask-Login refuses
    the user (``login_user`` returns False, which it does for inactive
    accounts), the user is told so and sent back to the login page instead of
    being redirected to ``index`` while still logged out.

    Returns:
        A redirect to ``index`` on success, otherwise to the login page.
    """
    email = (request.form.get("email", "") or "").strip().lower()
    password = request.form.get("password", "")

    user = User.query.filter_by(email=email).first()
    if not user or not user.check_password(password):
        flash("Invalid email/password", "danger")
        return redirect(url_for("auth.login"))

    # login_user reports failure through its return value rather than raising.
    if not login_user(user):
        flash("This account is disabled", "danger")
        return redirect(url_for("auth.login"))

    # Clear the impersonation marker, if any, only once the new login has
    # actually replaced the session's user.
    session.pop("impersonator_admin_id", None)
    return redirect(url_for("index"))
@bp.get("/logout")
@login_required
def logout():
    """Log the current user out and redirect to the login page.

    Any ``impersonator_admin_id`` marker is also removed from the session.
    """
    logout_user()
    # Drop the impersonation marker; the default avoids a KeyError when absent.
    session.pop("impersonator_admin_id", None)
    login_page = url_for("auth.login")
    return redirect(login_page)
@bp.get("/stop-impersonation")
@login_required
def stop_impersonation():
    """End an impersonation session and restore the original admin login.

    The admin's id is read from the ``impersonator_admin_id`` session key.
    Without that marker the request is a no-op redirect to ``index``. When the
    admin can be loaded and logged back in, the user is sent to the admin
    dashboard. If the marker is malformed, the admin row no longer exists, or
    Flask-Login refuses the login, the session is logged out and sent to the
    login page, so it does not continue as the impersonated user.
    """
    admin_id = session.get("impersonator_admin_id")
    if not admin_id:
        return redirect(url_for("index"))

    try:
        admin = db.session.get(User, int(admin_id))
    except (TypeError, ValueError):
        # A marker that is not an integer id cannot identify an admin.
        admin = None
    session.pop("impersonator_admin_id", None)

    if admin and login_user(admin):
        return redirect(url_for("admin.dashboard"))

    # Fail closed: the admin could not be restored, so end the session.
    logout_user()
    flash("Your admin session could not be restored. Please log in again.", "danger")
    return redirect(url_for("auth.login"))