143 lines
4.7 KiB
Python
143 lines
4.7 KiB
Python
import os
|
|
import time
|
|
from urllib.parse import urlparse
|
|
|
|
import feedparser
|
|
import requests
|
|
from flask import Flask, jsonify, render_template, request
|
|
|
|
|
|
# Feed used when a request does not supply its own ``url`` parameter.
# An unset *or empty* RSS_URL (e.g. a blank docker-compose variable) falls
# back to the built-in default instead of producing an empty, invalid URL.
DEFAULT_RSS_URL = (
    os.environ.get("RSS_URL", "").strip()
    or "https://feeds.nos.nl/nosnieuwsalgemeen"
)

# Logo URL taken from the environment; empty string when LOGO_URL is not set.
DEFAULT_LOGO_URL = os.environ.get("LOGO_URL", "").strip()
|
|
|
|
|
|
def create_app() -> Flask:
    """Build and configure the news-feed viewer Flask application.

    Routes registered:
      * ``GET /`` renders ``index.html`` with the feed URL and logo URL.
      * ``GET /api/feed`` returns the newest items of an RSS/Atom feed as JSON.

    Environment variables read when the app is created:
      * ``RSS_CACHE_TTL``: seconds a fetched feed stays cached (default 60).
      * ``RSS_CACHE_MAX_ENTRIES``: maximum number of cached feeds (default 128).

    Returns:
        The configured Flask application.
    """
    # Standard library; imported locally so this function is self-contained.
    import calendar

    app = Flask(__name__)

    # Very small in-memory cache (good enough for a single-process demo).
    # For production: swap this with Redis/Memcached.
    cache: dict[str, tuple[float, dict]] = {}
    cache_ttl_seconds = int(os.environ.get("RSS_CACHE_TTL", "60"))
    # Cache keys are caller-supplied URLs, so the cache must be bounded or any
    # client could grow it without limit by requesting many distinct URLs.
    cache_max_entries = max(
        1, int(os.environ.get("RSS_CACHE_MAX_ENTRIES", "128"))
    )
    # Number of feed items returned per feed.
    max_items = 5

    def is_valid_url(url: str) -> bool:
        """Return True if *url* is an absolute http(s) URL that names a host."""
        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
            # Accessing .port raises ValueError for a malformed port.
            _ = parsed.port
        except ValueError:
            return False
        # .hostname (unlike .netloc) is empty for host-less inputs such as
        # "http://:80" or "http://user@".
        return parsed.scheme in {"http", "https"} and bool(parsed.hostname)

    def entry_ts(entry) -> int:
        """Return the entry's published/updated time as a Unix timestamp.

        Prefers ``published_parsed``, falls back to ``updated_parsed`` and
        returns 0 when neither is available or convertible.
        """
        st = getattr(entry, "published_parsed", None) or getattr(
            entry, "updated_parsed", None
        )
        if not st:
            return 0
        try:
            # feedparser normalises *_parsed dates to UTC. time.mktime() would
            # interpret the tuple as local time and shift the result by the
            # server's UTC offset, so convert with calendar.timegm() instead.
            return int(calendar.timegm(st))
        except (TypeError, ValueError, OverflowError):
            return 0

    def find_image_url(entry):
        """Return a hero/background image URL for *entry*, or None.

        Looks at enclosure links first, then at the ``media_content`` and
        ``media_thumbnail`` fields.
        """
        for link in getattr(entry, "links", None) or []:
            if not isinstance(link, dict):
                continue
            href = link.get("href")
            mime = str(link.get("type") or "")
            # Enclosures can also carry audio/video; only accept ones that are
            # declared as images or that declare no type at all.
            is_image = not mime or mime.startswith("image/")
            if link.get("rel") == "enclosure" and href and is_image:
                return href

        for field in ("media_content", "media_thumbnail"):
            media = getattr(entry, field, None)
            if not isinstance(media, list):
                continue
            for item in media:
                if isinstance(item, dict) and item.get("url"):
                    return item["url"]
        return None

    def remember(url: str, data: dict, now: float) -> None:
        """Store *data* for *url*, evicting expired and then oldest entries."""
        # Re-insert at the end so dict order reflects insertion age.
        cache.pop(url, None)
        for key, (stored_at, _) in list(cache.items()):
            if now - stored_at >= cache_ttl_seconds:
                cache.pop(key, None)
        while len(cache) >= cache_max_entries:
            oldest = next(iter(cache), None)
            if oldest is None:
                break
            cache.pop(oldest, None)
        cache[url] = (now, data)

    def fetch_feed(url: str) -> dict:
        """Fetch and parse the feed at *url*, served from cache when fresh.

        Returns:
            A dict with the keys ``url``, ``title``, ``items`` (at most
            ``max_items`` entries, newest first when dates are available) and
            ``fetched_at`` (Unix timestamp of the fetch).

        Raises:
            requests.RequestException: on network errors, timeouts or a
                non-success HTTP status.
        """
        now = time.time()
        cached = cache.get(url)
        if cached and now - cached[0] < cache_ttl_seconds:
            return cached[1]

        headers = {"User-Agent": "newsfeed-viewer/1.0 (+Flask; feedparser)"}
        resp = requests.get(url, headers=headers, timeout=8)
        resp.raise_for_status()

        parsed = feedparser.parse(resp.content)
        # Strip before applying the fallback so a whitespace-only title
        # still becomes "News".
        feed_title = (parsed.feed.get("title") or "").strip() or "News"

        # Compute each timestamp once. Sort newest first when we have dates;
        # otherwise keep the original order (the sort is stable, so undated
        # entries keep their relative order at the end).
        stamped = [(entry_ts(entry), entry) for entry in parsed.entries or []]
        if any(ts for ts, _ in stamped):
            stamped.sort(key=lambda pair: pair[0], reverse=True)

        items = [
            {
                "title": (getattr(entry, "title", "") or "").strip(),
                "link": getattr(entry, "link", None),
                "published": getattr(entry, "published", None)
                or getattr(entry, "updated", None),
                "timestamp": ts,
                "enclosure_url": find_image_url(entry),
            }
            for ts, entry in stamped[:max_items]
        ]

        data = {
            "url": url,
            "title": feed_title,
            "items": items,
            "fetched_at": int(now),
        }
        remember(url, data, now)
        return data

    @app.get("/")
    def index():
        """Render the viewer page for the requested (or default) feed URL."""
        rss_url = request.args.get("url") or DEFAULT_RSS_URL
        # Allow docker-compose (or any env) to override the logo without
        # changing code; read per request so the current value is used.
        logo_url = os.environ.get("LOGO_URL", DEFAULT_LOGO_URL).strip()
        return render_template("index.html", rss_url=rss_url, logo_url=logo_url)

    @app.get("/api/feed")
    def api_feed():
        """Return the requested (or default) feed as JSON.

        Responds with 400 for an invalid URL, 502 when the feed cannot be
        fetched and 500 when it cannot be processed.
        """
        url = request.args.get("url") or DEFAULT_RSS_URL
        if not is_valid_url(url):
            return jsonify({"error": "Invalid url"}), 400
        # NOTE(security): the server fetches a caller-supplied URL, which
        # allows requests to internal hosts (SSRF). Restrict the allowed hosts
        # or block private address ranges before exposing this publicly.
        try:
            return jsonify(fetch_feed(url))
        except requests.RequestException as exc:
            app.logger.warning("Fetching feed %r failed: %s", url, exc)
            return (
                jsonify(
                    {
                        "error": "Failed to fetch RSS feed",
                        "detail": str(exc),
                    }
                ),
                502,
            )
        except Exception as exc:
            # Route boundary: log the traceback and report a generic failure.
            app.logger.exception("Processing feed %r failed", url)
            return (
                jsonify(
                    {"error": "Failed to parse RSS feed", "detail": str(exc)}
                ),
                500,
            )

    return app
|
|
|
|
|
|
# WSGI entrypoint for production servers like Gunicorn.
# `gunicorn app:app` loads the module-level callable named `app` from the
# module `app` (assumes this file is importable as `app` -- TODO confirm).
app = create_app()
|
|
|
|
|
|
if __name__ == "__main__":
    # Development server only.
    # For production: use a proper WSGI server (gunicorn/uwsgi).
    # The interactive debugger allows arbitrary code execution, so it must not
    # be always-on while listening on all interfaces; opt in via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", "5000")),
        debug=debug_enabled,
    )
|