151 lines
4.5 KiB
Python
151 lines
4.5 KiB
Python
import os
|
|
|
|
|
|
def _safe_company_segment(company_id: int | None) -> str:
|
|
"""Return the directory name for a company's upload folder.
|
|
|
|
We intentionally use the numeric company_id (not company name) to avoid
|
|
rename issues and any path traversal concerns.
|
|
"""
|
|
|
|
try:
|
|
cid = int(company_id) if company_id is not None else 0
|
|
except (TypeError, ValueError):
|
|
cid = 0
|
|
return str(max(0, cid))
|
|
|
|
|
|
def get_company_upload_dir(upload_root: str, company_id: int | None) -> str:
|
|
"""Return absolute directory path for a company's uploads."""
|
|
|
|
return os.path.join(upload_root, _safe_company_segment(company_id))
|
|
|
|
|
|
def ensure_company_upload_dir(upload_root: str, company_id: int | None) -> str:
|
|
"""Ensure the company's upload directory exists; return its absolute path."""
|
|
|
|
d = get_company_upload_dir(upload_root, company_id)
|
|
os.makedirs(d, exist_ok=True)
|
|
return d
|
|
|
|
|
|
def get_company_upload_bytes(upload_root: str, company_id: int | None) -> int:
|
|
"""Return best-effort total bytes used by a company's upload directory.
|
|
|
|
This walks the directory tree under uploads/<company_id> and sums file sizes.
|
|
Any errors (missing directories, permission issues, broken links) are ignored.
|
|
"""
|
|
|
|
total = 0
|
|
root = get_company_upload_dir(upload_root, company_id)
|
|
try:
|
|
if not os.path.isdir(root):
|
|
return 0
|
|
except Exception:
|
|
return 0
|
|
|
|
for base, _dirs, files in os.walk(root):
|
|
for fn in files:
|
|
try:
|
|
p = os.path.join(base, fn)
|
|
if os.path.isfile(p):
|
|
total += os.path.getsize(p)
|
|
except Exception:
|
|
# Ignore unreadable files
|
|
continue
|
|
return int(total)
|
|
|
|
|
|
def compute_storage_usage(*, used_bytes: int, max_bytes: int | None):
|
|
"""Compute storage usage info.
|
|
|
|
Args:
|
|
used_bytes: current usage
|
|
max_bytes: quota; if None or <=0: unlimited
|
|
|
|
Returns dict:
|
|
{
|
|
"max_bytes": int|None,
|
|
"used_bytes": int,
|
|
"is_limited": bool,
|
|
"is_exceeded": bool,
|
|
"used_ratio": float|None, # 0..1 when limited
|
|
"used_percent": int|None, # rounded percent when limited
|
|
"remaining_bytes": int|None,
|
|
}
|
|
"""
|
|
|
|
used = max(0, int(used_bytes or 0))
|
|
mx = None if max_bytes is None else int(max_bytes)
|
|
if mx is None or mx <= 0:
|
|
return {
|
|
"max_bytes": None,
|
|
"used_bytes": used,
|
|
"is_limited": False,
|
|
"is_exceeded": False,
|
|
"used_ratio": None,
|
|
"used_percent": None,
|
|
"remaining_bytes": None,
|
|
}
|
|
|
|
ratio = used / mx if mx > 0 else 1.0
|
|
percent = int(round(ratio * 100.0))
|
|
return {
|
|
"max_bytes": mx,
|
|
"used_bytes": used,
|
|
"is_limited": True,
|
|
"is_exceeded": used >= mx,
|
|
# Keep percent un-clamped so the UI can show e.g. 132% when exceeded.
|
|
# Clamp only the ratio (for progress bars, etc.).
|
|
"used_ratio": max(0.0, min(1.0, ratio)),
|
|
"used_percent": max(0, percent),
|
|
"remaining_bytes": max(0, mx - used),
|
|
}
|
|
|
|
|
|
def is_valid_upload_relpath(file_path: str | None) -> bool:
|
|
"""True if file_path looks like a path we manage under /static.
|
|
|
|
Supports both layouts:
|
|
- uploads/<filename>
|
|
- uploads/<company_id>/<filename>
|
|
"""
|
|
|
|
if not file_path:
|
|
return False
|
|
fp = (file_path or "").replace("\\", "/")
|
|
if not fp.startswith("uploads/"):
|
|
return False
|
|
# Prevent weird absolute/relative tricks; we only allow a normal relative path.
|
|
if fp.startswith("uploads//") or ":" in fp:
|
|
return False
|
|
# No parent dir segments.
|
|
if "../" in fp or fp.endswith("/..") or fp.startswith("../"):
|
|
return False
|
|
return True
|
|
|
|
|
|
def abs_upload_path(upload_root: str, file_path: str | None) -> str | None:
|
|
"""Resolve an item.file_path (uploads/...) to an absolute file path.
|
|
|
|
Returns None if file_path is not a managed uploads path.
|
|
"""
|
|
|
|
if not is_valid_upload_relpath(file_path):
|
|
return None
|
|
rel = (file_path or "").replace("\\", "/")
|
|
rel = rel.split("uploads/", 1)[1]
|
|
|
|
# Split into segments and harden against traversal.
|
|
parts = [p for p in rel.split("/") if p]
|
|
if not parts:
|
|
return None
|
|
|
|
candidate = os.path.abspath(os.path.join(upload_root, *parts))
|
|
root_abs = os.path.abspath(upload_root)
|
|
|
|
# Ensure resolved path stays inside upload_root.
|
|
if os.path.commonpath([candidate, root_abs]) != root_abs:
|
|
return None
|
|
return candidate
|