Version 1.1.1
This commit is contained in:
272
app.py
272
app.py
@@ -7,6 +7,11 @@ from flask import send_from_directory
|
||||
from werkzeug.utils import secure_filename
|
||||
from flask_migrate import Migrate
|
||||
import uuid
|
||||
import smtplib
|
||||
import ssl
|
||||
from email.message import EmailMessage
|
||||
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
||||
from sqlalchemy.exc import OperationalError
|
||||
|
||||
UPLOAD_FOLDER = 'static/uploads'
|
||||
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
|
||||
@@ -33,6 +38,20 @@ login_manager = LoginManager(app)
|
||||
login_manager.login_view = 'login'
|
||||
migrate = Migrate(app, db)
|
||||
|
||||
def _get_serializer() -> URLSafeTimedSerializer:
|
||||
# Uses Flask secret key; rotating SECRET_KEY invalidates outstanding reset links.
|
||||
return URLSafeTimedSerializer(app.config['SECRET_KEY'])
|
||||
|
||||
|
||||
def _current_year() -> int:
|
||||
from datetime import datetime
|
||||
return datetime.now().year
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_current_year():
|
||||
return {"current_year": _current_year()}
|
||||
|
||||
# Models
|
||||
class Church(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
@@ -52,6 +71,131 @@ class User(UserMixin, db.Model):
|
||||
church_id = db.Column(db.Integer, db.ForeignKey('church.id'), nullable=False)
|
||||
is_admin = db.Column(db.Boolean, default=False)
|
||||
|
||||
|
||||
class SMTPSettings(db.Model):
|
||||
"""Global SMTP settings editable by admin.
|
||||
|
||||
For now we keep it simple: a single-row table (id=1) that stores the current
|
||||
outbound email settings.
|
||||
"""
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
host = db.Column(db.String(255), default="")
|
||||
port = db.Column(db.Integer, default=587)
|
||||
username = db.Column(db.String(255), default="")
|
||||
password = db.Column(db.String(255), default="") # stored as plain text; consider encrypting at rest later
|
||||
use_tls = db.Column(db.Boolean, default=True)
|
||||
use_ssl = db.Column(db.Boolean, default=False)
|
||||
verify_tls = db.Column(db.Boolean, default=True)
|
||||
from_email = db.Column(db.String(255), default="")
|
||||
from_name = db.Column(db.String(255), default="PsalmbordOnline")
|
||||
|
||||
|
||||
def _ensure_smtp_settings_schema() -> None:
|
||||
"""Best-effort schema sync for sqlite in dev.
|
||||
|
||||
This project uses SQLite and sometimes runs without applying Alembic
|
||||
migrations (e.g. `flask run`). When new columns are added, older DB files
|
||||
can crash on SELECT with "no such column".
|
||||
|
||||
We keep this narrowly-scoped to the SMTP settings table.
|
||||
"""
|
||||
from sqlalchemy import text
|
||||
|
||||
# Ensure tables exist first (idempotent)
|
||||
db.create_all()
|
||||
|
||||
# PRAGMA returns empty result if the table doesn't exist.
|
||||
cols = [row[1] for row in db.session.execute(text("PRAGMA table_info(smtp_settings)"))]
|
||||
if not cols:
|
||||
db.create_all()
|
||||
cols = [row[1] for row in db.session.execute(text("PRAGMA table_info(smtp_settings)"))]
|
||||
|
||||
# Add missing columns (idempotent guarded by PRAGMA)
|
||||
if "verify_tls" not in cols:
|
||||
db.session.execute(text("ALTER TABLE smtp_settings ADD COLUMN verify_tls BOOLEAN DEFAULT 1"))
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def get_smtp_settings() -> SMTPSettings:
|
||||
"""Fetch singleton SMTP settings row.
|
||||
|
||||
When running via `flask run`, the container init script (`init_db.py`) is not
|
||||
executed. If the database already exists but the `smtp_settings` table was
|
||||
added later, this can raise `OperationalError: no such table`.
|
||||
|
||||
We recover by calling `db.create_all()` (safe/ idempotent for SQLite) and
|
||||
retrying.
|
||||
"""
|
||||
# Make sure schema is up-to-date before ORM SELECTs columns.
|
||||
try:
|
||||
_ensure_smtp_settings_schema()
|
||||
except Exception:
|
||||
db.session.rollback()
|
||||
|
||||
try:
|
||||
settings = SMTPSettings.query.get(1)
|
||||
except OperationalError:
|
||||
# Handles cases where the DB is older/newer than the model.
|
||||
db.session.rollback()
|
||||
try:
|
||||
_ensure_smtp_settings_schema()
|
||||
except Exception:
|
||||
db.session.rollback()
|
||||
settings = SMTPSettings.query.get(1)
|
||||
|
||||
if not settings:
|
||||
settings = SMTPSettings(id=1)
|
||||
db.session.add(settings)
|
||||
db.session.commit()
|
||||
return settings
|
||||
|
||||
|
||||
def send_email(to_email: str, subject: str, body_text: str) -> tuple[bool, str]:
|
||||
"""Send an email using configured SMTP settings.
|
||||
|
||||
Returns (ok, message) where message is safe to show in UI/log.
|
||||
"""
|
||||
settings = get_smtp_settings()
|
||||
|
||||
if not settings.host or not settings.port or not settings.from_email:
|
||||
return False, "SMTP is niet geconfigureerd. Neem contact op met de beheerder."
|
||||
|
||||
msg = EmailMessage()
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = f"{settings.from_name} <{settings.from_email}>" if settings.from_name else settings.from_email
|
||||
msg["To"] = to_email
|
||||
msg.set_content(body_text)
|
||||
|
||||
try:
|
||||
if settings.use_ssl:
|
||||
context = ssl.create_default_context()
|
||||
if not settings.verify_tls:
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
with smtplib.SMTP_SSL(settings.host, int(settings.port), context=context, timeout=20) as server:
|
||||
if settings.username:
|
||||
server.login(settings.username, settings.password or "")
|
||||
server.send_message(msg)
|
||||
else:
|
||||
with smtplib.SMTP(settings.host, int(settings.port), timeout=20) as server:
|
||||
server.ehlo()
|
||||
if settings.use_tls:
|
||||
context = ssl.create_default_context()
|
||||
if not settings.verify_tls:
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
server.starttls(context=context)
|
||||
server.ehlo()
|
||||
if settings.username:
|
||||
server.login(settings.username, settings.password or "")
|
||||
server.send_message(msg)
|
||||
return True, "Email verstuurd."
|
||||
except Exception as e:
|
||||
# Avoid leaking credentials; show generic message.
|
||||
app.logger.exception("Failed to send email")
|
||||
return False, f"Email versturen mislukt: {type(e).__name__}"
|
||||
|
||||
# Association table for many-to-many relationship
|
||||
board_schedule_association = db.Table('board_schedule',
|
||||
db.Column('board_id', db.Integer, db.ForeignKey('liturgiebord.id'), primary_key=True),
|
||||
@@ -165,6 +309,83 @@ def login():
|
||||
return render_template('login.html')
|
||||
|
||||
|
||||
def _make_reset_token(user: User) -> str:
|
||||
s = _get_serializer()
|
||||
return s.dumps({"user_id": user.id, "pw": user.password}, salt="password-reset")
|
||||
|
||||
|
||||
def _load_user_from_reset_token(token: str, max_age_seconds: int = 3600) -> User | None:
|
||||
s = _get_serializer()
|
||||
try:
|
||||
data = s.loads(token, salt="password-reset", max_age=max_age_seconds)
|
||||
user_id = int(data.get("user_id"))
|
||||
pw_hash = data.get("pw")
|
||||
except (BadSignature, SignatureExpired, ValueError, TypeError):
|
||||
return None
|
||||
|
||||
user = User.query.get(user_id)
|
||||
if not user:
|
||||
return None
|
||||
# Invalidate token if password changed since token issuance.
|
||||
if pw_hash != user.password:
|
||||
return None
|
||||
return user
|
||||
|
||||
|
||||
@app.route('/password/forgot', methods=['GET', 'POST'])
|
||||
def forgot_password():
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('portal'))
|
||||
|
||||
if request.method == 'POST':
|
||||
email = request.form.get('email', '').strip().lower()
|
||||
user = User.query.filter_by(username=email).first() if email else None
|
||||
|
||||
# Always show generic message to prevent account enumeration.
|
||||
flash('Als dit e-mailadres bekend is, ontvang je een link om je wachtwoord te resetten.')
|
||||
|
||||
if user:
|
||||
token = _make_reset_token(user)
|
||||
reset_url = url_for('reset_password', token=token, _external=True)
|
||||
body = (
|
||||
"Je hebt een wachtwoord-reset aangevraagd voor PsalmbordOnline.\n\n"
|
||||
f"Klik op deze link om een nieuw wachtwoord in te stellen (geldig voor 1 uur):\n{reset_url}\n\n"
|
||||
"Als jij dit niet was, kun je deze email negeren.\n"
|
||||
)
|
||||
send_email(to_email=user.username, subject='Wachtwoord reset', body_text=body)
|
||||
|
||||
return redirect(url_for('login'))
|
||||
|
||||
return render_template('forgot_password.html')
|
||||
|
||||
|
||||
@app.route('/password/reset/<token>', methods=['GET', 'POST'])
|
||||
def reset_password(token):
|
||||
if current_user.is_authenticated:
|
||||
return redirect(url_for('portal'))
|
||||
|
||||
user = _load_user_from_reset_token(token)
|
||||
if not user:
|
||||
flash('Deze wachtwoord-reset link is ongeldig of verlopen.')
|
||||
return redirect(url_for('forgot_password'))
|
||||
|
||||
if request.method == 'POST':
|
||||
new_password = request.form.get('new_password', '')
|
||||
confirm_password = request.form.get('confirm_password', '')
|
||||
|
||||
if not new_password or not confirm_password:
|
||||
flash('Vul beide wachtwoordvelden in.')
|
||||
elif new_password != confirm_password:
|
||||
flash('De wachtwoorden komen niet overeen.')
|
||||
else:
|
||||
user.password = generate_password_hash(new_password)
|
||||
db.session.commit()
|
||||
flash('Wachtwoord gewijzigd. Je kunt nu inloggen.')
|
||||
return redirect(url_for('login'))
|
||||
|
||||
return render_template('reset_password.html', token=token)
|
||||
|
||||
|
||||
@app.route('/church/delete_user/<int:user_id>', methods=['POST'])
|
||||
@login_required
|
||||
def church_delete_user(user_id):
|
||||
@@ -574,16 +795,53 @@ def admin_dashboard():
|
||||
return redirect(url_for('portal'))
|
||||
|
||||
if request.method == 'POST':
|
||||
# Update church activation states
|
||||
for church in Church.query.all():
|
||||
checkbox_val = request.form.get(f'church_active_{church.id}')
|
||||
church.is_active = checkbox_val == 'on'
|
||||
db.session.commit()
|
||||
flash('Kerken activatiestatus bijgewerkt.')
|
||||
form_type = request.form.get('form_type', 'church_activation')
|
||||
|
||||
if form_type == 'church_activation':
|
||||
# Update church activation states
|
||||
for church in Church.query.all():
|
||||
checkbox_val = request.form.get(f'church_active_{church.id}')
|
||||
church.is_active = checkbox_val == 'on'
|
||||
db.session.commit()
|
||||
flash('Kerken activatiestatus bijgewerkt.')
|
||||
|
||||
elif form_type == 'smtp_settings':
|
||||
settings = get_smtp_settings()
|
||||
settings.host = request.form.get('smtp_host', '').strip()
|
||||
port_raw = request.form.get('smtp_port', '').strip()
|
||||
try:
|
||||
settings.port = int(port_raw) if port_raw else 587
|
||||
except ValueError:
|
||||
settings.port = 587
|
||||
settings.username = request.form.get('smtp_username', '').strip()
|
||||
# Only overwrite password if a value was provided (so admins can leave it blank).
|
||||
pw = request.form.get('smtp_password', '')
|
||||
if pw != '':
|
||||
settings.password = pw
|
||||
settings.use_tls = request.form.get('smtp_use_tls') == 'on'
|
||||
settings.use_ssl = request.form.get('smtp_use_ssl') == 'on'
|
||||
settings.verify_tls = request.form.get('smtp_verify_tls') == 'on'
|
||||
settings.from_email = request.form.get('smtp_from_email', '').strip()
|
||||
settings.from_name = request.form.get('smtp_from_name', '').strip() or 'PsalmbordOnline'
|
||||
db.session.commit()
|
||||
flash('SMTP instellingen opgeslagen.')
|
||||
|
||||
elif form_type == 'smtp_test':
|
||||
to_email = request.form.get('smtp_test_to', '').strip()
|
||||
if not to_email:
|
||||
flash('Vul een test emailadres in.')
|
||||
else:
|
||||
ok, msg = send_email(
|
||||
to_email=to_email,
|
||||
subject='PsalmbordOnline SMTP test',
|
||||
body_text='Dit is een test email vanuit PsalmbordOnline.',
|
||||
)
|
||||
flash(msg)
|
||||
|
||||
churches = Church.query.all()
|
||||
users = User.query.all()
|
||||
return render_template('admin_dashboard.html', churches=churches, users=users)
|
||||
smtp_settings = get_smtp_settings()
|
||||
return render_template('admin_dashboard.html', churches=churches, users=users, smtp_settings=smtp_settings)
|
||||
|
||||
@app.route('/admin/impersonate/<int:user_id>')
|
||||
@login_required
|
||||
|
||||
Reference in New Issue
Block a user