Files
rssfeedviewer/app.py
2026-01-29 17:10:20 +01:00

143 lines
4.7 KiB
Python

import calendar
import os
import time
from urllib.parse import urlparse

import feedparser
import requests
from flask import Flask, jsonify, render_template, request
# Feed used when a request does not carry its own ``url`` query parameter.
# The RSS_URL environment variable, when set, takes precedence over the
# built-in fallback.
_FALLBACK_RSS_URL = "https://feeds.nos.nl/nosnieuwsalgemeen"
DEFAULT_RSS_URL = os.environ.get("RSS_URL", _FALLBACK_RSS_URL)

# Logo URL read from the LOGO_URL environment variable at import time;
# an empty string when the variable is not set.
DEFAULT_LOGO_URL = os.environ.get("LOGO_URL", "")
# Maximum number of feed entries returned by a single /api/feed call.
_MAX_FEED_ITEMS = 5


def _is_valid_url(url: str) -> bool:
    """Return True when *url* is an absolute http(s) URL with a host part."""
    try:
        parsed = urlparse(url)
    except (TypeError, ValueError):
        # urlparse raises ValueError for some malformed netlocs
        # (for example unmatched IPv6 brackets).
        return False
    return parsed.scheme in {"http", "https"} and bool(parsed.netloc)


def _entry_timestamp(entry) -> int:
    """Return the entry's date as a Unix timestamp, or 0 when unknown.

    Prefers ``published_parsed`` and falls back to ``updated_parsed``.
    """
    parsed_time = getattr(entry, "published_parsed", None) or getattr(
        entry, "updated_parsed", None
    )
    if not parsed_time:
        return 0
    try:
        # feedparser normalises parsed dates to UTC, so the tuple has to be
        # converted with timegm(); mktime() would interpret it as local time
        # and shift the result by the server's UTC offset.
        return int(calendar.timegm(parsed_time))
    except (TypeError, ValueError, OverflowError):
        # A malformed time tuple must not break the whole feed.
        return 0


def _entry_image_url(entry):
    """Return a hero/background image URL for *entry*, or None.

    Looks first for a link with ``rel="enclosure"``, then for the first
    ``media_content`` or ``media_thumbnail`` item that carries a ``url``.
    Unexpected field shapes are skipped instead of raising.
    """
    links = getattr(entry, "links", None)
    if isinstance(links, (list, tuple)):
        for link in links:
            if not isinstance(link, dict):
                continue
            href = link.get("href")
            if link.get("rel") == "enclosure" and href:
                return href

    for field in ("media_content", "media_thumbnail"):
        candidates = getattr(entry, field, None)
        if not isinstance(candidates, (list, tuple)):
            continue
        for media in candidates:
            if isinstance(media, dict) and media.get("url"):
                return media.get("url")
    return None


def create_app() -> Flask:
    """Build and configure the Flask application.

    Routes:
        ``/``          renders ``index.html`` with ``rss_url`` and ``logo_url``.
        ``/api/feed``  returns the newest feed items as JSON.

    The ``RSS_CACHE_TTL`` environment variable (seconds, default 60) controls
    how long a fetched feed is served from the in-memory cache.
    """
    app = Flask(__name__)

    # Very small in-memory cache (good enough for a single-process demo).
    # For production: swap this with Redis/Memcached.
    cache: dict[str, tuple[float, dict]] = {}
    cache_ttl_seconds = int(os.environ.get("RSS_CACHE_TTL", "60"))

    def fetch_feed(url: str) -> dict:
        """Fetch and parse the feed at *url*, using the cache when fresh.

        Returns a dict with the keys ``url``, ``title``, ``items`` and
        ``fetched_at``.

        Raises:
            requests.RequestException: the HTTP request failed or returned
                an error status.
        """
        now = time.time()
        cached = cache.get(url)
        if cached and now - cached[0] < cache_ttl_seconds:
            return cached[1]

        headers = {
            "User-Agent": "newsfeed-viewer/1.0 (+Flask; feedparser)"
        }
        resp = requests.get(url, headers=headers, timeout=8)
        resp.raise_for_status()

        parsed = feedparser.parse(resp.content)
        # Fall back to "News" for a missing *or* whitespace-only title.
        feed_title = (parsed.feed.get("title") or "").strip() or "News"

        # Sort newest first when we have dates; otherwise keep original order.
        entries = list(parsed.entries or [])
        if any(_entry_timestamp(entry) for entry in entries):
            entries.sort(key=_entry_timestamp, reverse=True)

        items = [
            {
                "title": (getattr(entry, "title", "") or "").strip(),
                "link": getattr(entry, "link", None),
                "published": getattr(entry, "published", None)
                or getattr(entry, "updated", None),
                "timestamp": _entry_timestamp(entry),
                "enclosure_url": _entry_image_url(entry),
            }
            for entry in entries[:_MAX_FEED_ITEMS]
        ]

        data = {
            "url": url,
            "title": feed_title,
            "items": items,
            "fetched_at": int(now),
        }

        # The cache is keyed by caller-supplied URLs; drop expired entries so
        # URLs that are no longer requested do not accumulate forever.
        for key, (stored_at, _) in list(cache.items()):
            if now - stored_at >= cache_ttl_seconds:
                cache.pop(key, None)
        cache[url] = (now, data)
        return data

    @app.get("/")
    def index():
        """Render the viewer page for the requested (or default) feed URL."""
        rss_url = request.args.get("url") or DEFAULT_RSS_URL
        # Allow docker-compose (or any env) to override the logo without changing code.
        logo_url = os.environ.get("LOGO_URL", DEFAULT_LOGO_URL).strip()
        return render_template("index.html", rss_url=rss_url, logo_url=logo_url)

    @app.get("/api/feed")
    def api_feed():
        """Return the feed as JSON.

        Responds with 400 for an invalid URL, 502 when fetching fails and
        500 for any other error.
        """
        url = request.args.get("url") or DEFAULT_RSS_URL
        if not _is_valid_url(url):
            return jsonify({"error": "Invalid url"}), 400

        # NOTE(security): the URL comes straight from the query string and is
        # fetched server-side, so any http(s) host reachable from this process
        # can be requested. Restrict it to an allowlist before exposing this
        # endpoint to untrusted clients.
        try:
            return jsonify(fetch_feed(url))
        except requests.RequestException as e:
            return (
                jsonify(
                    {
                        "error": "Failed to fetch RSS feed",
                        "detail": str(e),
                    }
                ),
                502,
            )
        except Exception as e:
            # Last-resort boundary: record the traceback instead of
            # swallowing it, then report a generic failure to the client.
            app.logger.exception("Unexpected error while processing feed %s", url)
            return jsonify({"error": "Failed to parse RSS feed", "detail": str(e)}), 500

    return app
# WSGI entrypoint for production servers like Gunicorn:
# `gunicorn app:app` imports this module and serves the module-level `app`.
app = create_app()

if __name__ == "__main__":
    # Development server only. For production: use a proper WSGI server
    # (gunicorn/uwsgi).
    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, so it must not be enabled unconditionally while binding
    # to all interfaces. It is opt-in through the FLASK_DEBUG variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", "5000")),
        debug=debug_enabled,
    )