104 lines
3.1 KiB
Python
104 lines
3.1 KiB
Python
import os
|
|
|
|
|
|
def _safe_company_segment(company_id: int | None) -> str:
|
|
"""Return the directory name for a company's upload folder.
|
|
|
|
We intentionally use the numeric company_id (not company name) to avoid
|
|
rename issues and any path traversal concerns.
|
|
"""
|
|
|
|
try:
|
|
cid = int(company_id) if company_id is not None else 0
|
|
except (TypeError, ValueError):
|
|
cid = 0
|
|
return str(max(0, cid))
|
|
|
|
|
|
def get_company_upload_dir(upload_root: str, company_id: int | None) -> str:
    """Build the path of a company's upload directory.

    The result is ``upload_root`` joined with the company's numeric folder
    name, so it is absolute only when ``upload_root`` itself is absolute.
    This function only computes the path; it does not touch the filesystem.
    """
    segment = _safe_company_segment(company_id)
    return os.path.join(upload_root, segment)
|
|
|
|
|
|
def ensure_company_upload_dir(upload_root: str, company_id: int | None) -> str:
    """Create the company's upload directory if needed and return its path.

    Missing intermediate directories are created as well, and a directory
    that already exists is not treated as an error.
    """
    company_dir = get_company_upload_dir(upload_root, company_id)
    os.makedirs(company_dir, exist_ok=True)
    return company_dir
|
|
|
|
|
|
def get_company_upload_bytes(upload_root: str, company_id: int | None) -> int:
    """Return a best-effort byte count for a company's upload directory.

    Every file found while walking the tree under the company's upload
    directory contributes its size. The figure is deliberately approximate:
    a missing directory yields 0, and any file that cannot be inspected
    (permission problems, broken links, ...) is skipped rather than
    reported.
    """
    company_dir = get_company_upload_dir(upload_root, company_id)

    try:
        present = os.path.isdir(company_dir)
    except Exception:
        return 0
    if not present:
        return 0

    def _size_or_zero(parent: str, name: str) -> int:
        # One unreadable entry must not abort the whole count, so every
        # failure for a single file is swallowed and counted as 0 bytes.
        try:
            path = os.path.join(parent, name)
            return os.path.getsize(path) if os.path.isfile(path) else 0
        except Exception:
            return 0

    used = 0
    for parent, _subdirs, names in os.walk(company_dir):
        used += sum(_size_or_zero(parent, name) for name in names)
    return int(used)
|
|
|
|
|
|
def is_valid_upload_relpath(file_path: str | None) -> bool:
|
|
"""True if file_path looks like a path we manage under /static.
|
|
|
|
Supports both layouts:
|
|
- uploads/<filename>
|
|
- uploads/<company_id>/<filename>
|
|
"""
|
|
|
|
if not file_path:
|
|
return False
|
|
fp = (file_path or "").replace("\\", "/")
|
|
if not fp.startswith("uploads/"):
|
|
return False
|
|
# Prevent weird absolute/relative tricks; we only allow a normal relative path.
|
|
if fp.startswith("uploads//") or ":" in fp:
|
|
return False
|
|
# No parent dir segments.
|
|
if "../" in fp or fp.endswith("/..") or fp.startswith("../"):
|
|
return False
|
|
return True
|
|
|
|
|
|
def abs_upload_path(upload_root: str, file_path: str | None) -> str | None:
    """Map a stored ``uploads/...`` path to an absolute path under upload_root.

    Returns None when file_path is not a managed uploads path, when nothing
    usable follows the ``uploads/`` prefix, or when the resolved location
    would fall outside upload_root.
    """
    if not is_valid_upload_relpath(file_path):
        return None

    normalized = (file_path or "").replace("\\", "/")
    remainder = normalized.split("uploads/", 1)[1]

    # Empty segments (from repeated or trailing slashes) are dropped so the
    # join below only ever sees real path components.
    segments = list(filter(None, remainder.split("/")))
    if not segments:
        return None

    base = os.path.abspath(upload_root)
    resolved = os.path.abspath(os.path.join(upload_root, *segments))

    # Second line of defence after the textual validation: the normalised
    # result must still live inside upload_root.
    if os.path.commonpath([resolved, base]) == base:
        return resolved
    return None
|