207 lines
6.8 KiB
Python
207 lines
6.8 KiB
Python
from flask import Blueprint, abort, flash, redirect, render_template, request, session, url_for
|
|
from flask_login import current_user, login_required, login_user, logout_user
|
|
|
|
import logging
|
|
|
|
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
|
|
|
|
from ..extensions import db
|
|
from ..email_utils import send_email
|
|
from ..models import User
|
|
from ..auth_tokens import load_password_reset_user_id, make_password_reset_token
|
|
|
|
# Blueprint named "auth"; every route registered on it lives under the "/auth" URL prefix.
bp = Blueprint("auth", __name__, url_prefix="/auth")
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _make_reset_token(user: User) -> str:
    """Build a password-reset token for ``user``.

    Thin wrapper around ``make_password_reset_token``: it supplies the
    application's ``SECRET_KEY`` config value as the signing key and the
    user's ``id`` as the payload, and returns whatever that helper returns.
    """
    # Function-scope import of the application proxy.
    from flask import current_app

    signing_key = current_app.config["SECRET_KEY"]
    return make_password_reset_token(secret_key=signing_key, user_id=user.id)
|
|
|
|
|
|
def _load_reset_token(token: str, *, max_age_seconds: int) -> int:
    """Resolve a password-reset ``token`` to a user id.

    Thin wrapper around ``load_password_reset_user_id`` that supplies the
    application's ``SECRET_KEY`` config value. ``max_age_seconds`` is passed
    through unchanged. Any exception raised by the underlying loader
    propagates to the caller; the route handlers in this module catch
    ``SignatureExpired`` and ``BadSignature`` around this call.
    """
    # Function-scope import of the application proxy.
    from flask import current_app

    loader_kwargs = {
        "secret_key": current_app.config["SECRET_KEY"],
        "token": token,
        "max_age_seconds": max_age_seconds,
    }
    return load_password_reset_user_id(**loader_kwargs)
|
|
|
|
|
|
@bp.get("/forgot-password")
def forgot_password():
    """Show the forgot-password form.

    Users who are already authenticated are redirected to the ``index``
    endpoint instead of seeing the form.
    """
    if not current_user.is_authenticated:
        return render_template("auth_forgot_password.html")
    return redirect(url_for("index"))
|
|
|
|
|
|
@bp.post("/forgot-password")
def forgot_password_post():
    """Handle a forgot-password form submission.

    The submitted ``email`` field is stripped and lower-cased. A blank value
    flashes the generic notice and redirects back to the form. Otherwise the
    address is looked up; if a user matches, a reset link is emailed to them.
    The same generic notice is flashed whether or not a user matched, and the
    browser is redirected to the login page.

    Failures while sending the email are logged and otherwise swallowed so
    the response stays identical for the requester.
    """
    # One wording for every outcome, to avoid user enumeration.
    generic_notice = "If that email exists, you will receive a reset link."

    submitted = request.form.get("email", "") or ""
    email = submitted.strip().lower()
    if not email:
        flash(generic_notice, "info")
        return redirect(url_for("auth.forgot_password"))

    account = User.query.filter_by(email=email).first()
    if account:
        reset_token = _make_reset_token(account)
        reset_url = url_for("auth.reset_password", token=reset_token, _external=True)
        body = "".join(
            [
                "Someone requested a password reset for your account.\n\n",
                f"Reset your password using this link (valid for 30 minutes):\n{reset_url}\n\n",
                "If you did not request this, you can ignore this email.",
            ]
        )
        try:
            send_email(to_email=account.email, subject="Password reset", body_text=body)
        except Exception:
            # Deliberately broad: the requester must not learn about mail
            # delivery problems, but admins get the traceback in the log.
            logger.exception("Failed to send password reset email")

    flash(generic_notice, "info")
    return redirect(url_for("auth.login"))
|
|
|
|
|
|
@bp.get("/reset-password/<token>")
def reset_password(token: str):
    """Show the reset-password form for ``token``.

    Authenticated users are redirected to ``index``. Otherwise the token is
    checked up front (30-minute maximum age) so the template can show a
    friendly message: on failure the template receives ``token=None`` plus a
    ``token_error`` string, on success it receives the token and
    ``token_error=None``.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    form_token = token
    token_error = None
    try:
        _load_reset_token(token, max_age_seconds=30 * 60)
    except SignatureExpired:
        # Must come first: in itsdangerous, SignatureExpired is a subclass
        # of BadSignature, so the broader handler would otherwise win.
        form_token = None
        token_error = "Reset link has expired."
    except BadSignature:
        form_token = None
        token_error = "Invalid reset link."

    return render_template(
        "auth_reset_password.html",
        token=form_token,
        token_error=token_error,
    )
|
|
|
|
|
|
@bp.post("/reset-password/<token>")
def reset_password_post(token: str):
    """Apply a new password for the user identified by ``token``.

    Flow, in order:
      1. Authenticated users are redirected to ``index``.
      2. The token is loaded with a 30-minute maximum age. An expired token
         flashes a message and redirects to the forgot-password form; any
         other bad signature aborts with HTTP 400.
      3. The ``new_password`` / ``confirm_password`` form fields are checked
         (non-empty, at least 8 characters, matching). The first failing
         check flashes its message and redirects back to the reset form.
      4. The user is fetched by id; a missing user aborts with HTTP 400.
      5. The password is set, the session committed, and the browser is
         redirected to the login page with a success message.
    """
    if current_user.is_authenticated:
        return redirect(url_for("index"))

    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    try:
        user_id = _load_reset_token(token, max_age_seconds=30 * 60)
    except SignatureExpired:
        # Handled before BadSignature because SignatureExpired is one of
        # its subclasses in itsdangerous.
        flash("Reset link has expired. Please request a new one.", "danger")
        return redirect(url_for("auth.forgot_password"))
    except BadSignature:
        abort(400)

    # Pick the first validation failure, if any; order matters for which
    # message the user sees.
    problem = None
    if not new_password:
        problem = "New password is required"
    elif len(new_password) < 8:
        problem = "New password must be at least 8 characters"
    elif new_password != confirm_password:
        problem = "New password and confirmation do not match"

    if problem is not None:
        flash(problem, "danger")
        return redirect(url_for("auth.reset_password", token=token))

    account = db.session.get(User, user_id)
    if not account:
        # Generic response: treat as invalid.
        abort(400)

    # NOTE(review): nothing in this handler invalidates the token after use;
    # whether it can be replayed within its lifetime depends on the token
    # helpers — confirm.
    account.set_password(new_password)
    db.session.commit()
    flash("Password updated. You can now log in.", "success")
    return redirect(url_for("auth.login"))
|
|
|
|
|
|
@bp.get("/change-password")
@login_required
def change_password():
    """Render the change-password form (login required)."""
    template_name = "auth_change_password.html"
    return render_template(template_name)
|
|
|
|
|
|
@bp.post("/change-password")
@login_required
def change_password_post():
    """Change the logged-in user's password.

    Reads ``current_password``, ``new_password`` and ``confirm_password``
    from the form and applies these checks in order, stopping at the first
    failure (which flashes a message and redirects back to the form):

      * the current password must verify,
      * the new password must be non-empty,
      * the new password must be at least 8 characters,
      * the new password must equal the confirmation,
      * the new password must differ from the current one.

    On success the password is set, the session committed, and the user is
    redirected to ``index`` with a success message.
    """

    def reject(message):
        # Shared failure path: flash the reason and return to the form.
        flash(message, "danger")
        return redirect(url_for("auth.change_password"))

    current_password = request.form.get("current_password", "")
    new_password = request.form.get("new_password", "")
    confirm_password = request.form.get("confirm_password", "")

    if not current_user.check_password(current_password):
        return reject("Current password is incorrect")

    if not new_password:
        return reject("New password is required")

    if len(new_password) < 8:
        return reject("New password must be at least 8 characters")

    if new_password != confirm_password:
        return reject("New password and confirmation do not match")

    # Avoid no-op changes (helps catch accidental submits).
    if current_user.check_password(new_password):
        return reject("New password must be different from the current password")

    current_user.set_password(new_password)
    db.session.commit()
    flash("Password updated", "success")

    # Send the user back to their home area.
    home_url = url_for("index")
    return redirect(home_url)
|
|
|
|
|
|
@bp.get("/login")
def login():
    """Show the login form; authenticated users are redirected to ``index``."""
    if not current_user.is_authenticated:
        return render_template("auth_login.html")
    return redirect(url_for("index"))
|
|
|
|
|
|
@bp.post("/login")
def login_post():
    """Authenticate a user from the submitted login form.

    The ``email`` field is stripped and lower-cased before lookup. When a
    user with that email exists and the password verifies, any leftover
    impersonation marker is removed from the session, the user is logged in
    and redirected to ``index``. In every other case a single generic error
    is flashed and the browser returns to the login form.
    """
    raw_email = request.form.get("email", "") or ""
    email = raw_email.strip().lower()
    password = request.form.get("password", "")

    candidate = User.query.filter_by(email=email).first()
    if candidate and candidate.check_password(password):
        # Clear impersonation marker, if any, before starting the new login.
        session.pop("impersonator_admin_id", None)
        login_user(candidate)
        return redirect(url_for("index"))

    # Same message for unknown email and wrong password.
    flash("Invalid email/password", "danger")
    return redirect(url_for("auth.login"))
|
|
|
|
|
|
@bp.get("/logout")
@login_required
def logout():
    """Log the current user out and return to the login page.

    Also removes the ``impersonator_admin_id`` session marker, if present.
    """
    logout_user()
    session.pop("impersonator_admin_id", None)
    login_page = url_for("auth.login")
    return redirect(login_page)
|
|
|
|
|
|
@bp.get("/stop-impersonation")
@login_required
def stop_impersonation():
    """End an impersonation session and restore the original admin login.

    Reads the ``impersonator_admin_id`` marker from the session:

      * No marker (or a falsy one): nothing to undo; redirect to ``index``.
      * Marker resolves to an existing user: remove the marker, log that
        user back in and redirect to ``admin.dashboard``.
      * Marker is malformed or the referenced user no longer exists: remove
        the marker, end the current login and redirect to the login page.
        Previously this case left the impersonated login active with no
        marker (and a non-integer marker raised before it could be removed).
    """
    admin_id = session.get("impersonator_admin_id")
    if not admin_id:
        return redirect(url_for("index"))

    try:
        admin_pk = int(admin_id)
    except (TypeError, ValueError):
        # A marker that is not an integer cannot identify a user.
        admin = None
    else:
        admin = db.session.get(User, admin_pk)

    # Removed only after the lookup has run, matching the original order, so
    # a database error during the lookup leaves the marker in place.
    session.pop("impersonator_admin_id", None)

    if not admin:
        # Without an account to return to, do not keep the impersonated
        # user's session alive.
        logout_user()
        return redirect(url_for("auth.login"))

    login_user(admin)
    return redirect(url_for("admin.dashboard"))
|