Files
web-page-backend/packs-lab/app/routes.py
gahusb 1e5e1bcdff fix(packs-lab): sign-link path traversal — startswith → relative_to (CODE_REVIEW F1)
str(abs_path).startswith(str(PACK_HOST_DIR))는 trailing slash가 없어
sibling 경로(/foo/packs ↔ /foo/packs_evil)를 통과시켜 DSM API에 잘못된
호스트 경로를 전달할 수 있었음. Path.relative_to 기반으로 컴포넌트 단위
엄격 검증으로 교체. test_sign_link_rejects_sibling_path 회귀 테스트
추가 (RED → GREEN 검증).
2026-05-17 13:50:22 +09:00

447 lines
16 KiB
Python

"""packs-lab API 엔드포인트.
- POST /api/packs/sign-link — Vercel HMAC 인증 → DSM 공유 링크
- POST /api/packs/admin/mint-token — Vercel HMAC 인증 → 일회성 upload 토큰
- POST /api/packs/upload — 일회성 토큰 인증 → multipart 저장 + supabase INSERT (single-shot)
- POST /api/packs/upload/init — 일회성 토큰 인증 → chunked upload 세션 초기화
- PUT /api/packs/upload/{session_id}/chunk — 동일 토큰 + offset → 부분파일 append
- POST /api/packs/upload/{session_id}/complete — 동일 토큰 → 완료 + supabase INSERT
- GET /api/packs/upload/{session_id}/status — 현재 written 조회 (재개용)
- DELETE /api/packs/upload/{session_id} — 세션 중단 + 부분파일 정리
- GET /api/packs/list — Vercel HMAC 인증 → pack_files 전체 조회
- DELETE /api/packs/{file_id} — Vercel HMAC 인증 → soft delete (DSM 공유는 자동 만료)
"""
import json
import logging
import os
import re
import shutil
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, File, Header, HTTPException, Request, UploadFile
from supabase import Client, create_client
from .auth import (
mint_upload_token,
verify_request_hmac,
verify_upload_token,
verify_upload_token_no_consume,
)
from .dsm_client import DSMError, create_share_link
from .models import (
ChunkUploadResponse,
InitUploadResponse,
MintTokenRequest,
MintTokenResponse,
PackFileItem,
SignLinkRequest,
SignLinkResponse,
UploadResponse,
)
logger = logging.getLogger("packs-lab.routes")
router = APIRouter(prefix="/api/packs")
# Container-side directory where uploaded pack files are stored.
PACK_BASE_DIR = Path(os.getenv("PACK_BASE_DIR", "/app/data/packs"))
# NAS host absolute path exposed to DSM and Supabase.  Must point at the same
# directory as PACK_BASE_DIR as seen from the host (the host side of the
# docker volume mount).  Falls back to PACK_BASE_DIR when unset — local dev
# only.  On the production NAS this must be
# PACK_HOST_DIR=/volume1/docker/webpage/media/packs.
PACK_HOST_DIR = Path(os.getenv("PACK_HOST_DIR", str(PACK_BASE_DIR)))
# File extensions accepted by _check_filename (compared lowercase).
ALLOWED_EXT = {"pdf", "zip", "mp4", "mov", "mkv", "wav", "m4a", "mp3", "png", "jpg", "jpeg", "webp", "prj"}
MAX_BYTES = 5 * 1024 * 1024 * 1024 # 5GB hard cap per uploaded file
# Allowed filename characters: word chars, Hangul, - . ( ) and whitespace.
# NOTE(review): \s also matches tab/newline/CR, not just the space character —
# confirm that is intended before relying on this pattern alone.
SAFE_FILENAME = re.compile(r"^[\w가-힣\-\.\(\)\s]+$")
UPLOAD_TOKEN_TTL_SEC = int(os.getenv("UPLOAD_TOKEN_TTL_SEC", "1800")) # 30 min default
CHUNK_MAX_SIZE = int(os.getenv("PACK_CHUNK_MAX_SIZE", str(64 * 1024 * 1024))) # 64MB default
# Chunked-upload session directories live under PACK_BASE_DIR/.uploads.
SESSIONS_DIR_NAME = ".uploads"
def _sessions_root() -> Path:
    """Directory under PACK_BASE_DIR that holds all chunked-upload sessions."""
    return PACK_BASE_DIR.joinpath(SESSIONS_DIR_NAME)
def _session_dir(jti: str) -> Path:
    """Per-session directory, keyed by the token jti.

    jti is a uuid4 string, so path traversal should be impossible; the format
    is still re-checked here as defense in depth before it is used as a path
    component.
    """
    if re.fullmatch(r"[0-9a-fA-F\-]{1,64}", jti) is None:
        raise HTTPException(status_code=400, detail="잘못된 session_id")
    return _sessions_root().joinpath(jti)
def _session_meta_path(jti: str) -> Path:
    """Path of the session's JSON metadata file."""
    return _session_dir(jti).joinpath("meta.json")
def _session_data_path(jti: str) -> Path:
    """Path of the session's partially-written data file."""
    return _session_dir(jti).joinpath("data.part")
def _load_session(jti: str) -> dict:
    """Load chunked-upload session metadata; raise 404 if the session is gone.

    EAFP: reading directly (instead of exists() followed by read) closes the
    race where a concurrent abort/complete deletes the session between the
    existence check and the read.
    """
    try:
        return json.loads(_session_meta_path(jti).read_text(encoding="utf-8"))
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="업로드 세션을 찾을 수 없습니다") from None
def _save_session(jti: str, meta: dict) -> None:
    """Persist session metadata atomically (write temp file, then rename).

    A direct write_text could leave a truncated meta.json if the process dies
    mid-write; renaming a fully-written temp file is atomic on POSIX, so
    readers always see either the old or the new metadata, never a partial one.
    The temp file lives inside the session directory, so _cleanup_session's
    rmtree removes it too.
    """
    target = _session_meta_path(jti)
    tmp = target.parent / (target.name + ".tmp")
    tmp.write_text(json.dumps(meta), encoding="utf-8")
    tmp.replace(target)
def _cleanup_session(jti: str) -> None:
    """Best-effort removal of the session directory (partial file + metadata)."""
    sdir = _session_dir(jti)
    shutil.rmtree(sdir, ignore_errors=True)
def _verify_session_token(authorization: str, session_id: str) -> dict:
    """Verify the Bearer token (no jti consume) and bind it to session_id.

    Raises 401 on a missing/malformed Authorization header and 403 when the
    token's jti does not match the session being addressed.
    """
    prefix = "Bearer "
    if not authorization.startswith(prefix):
        raise HTTPException(status_code=401, detail="Authorization 헤더 누락")
    payload = verify_upload_token_no_consume(authorization[len(prefix):])
    if payload.get("jti") != session_id:
        raise HTTPException(status_code=403, detail="토큰과 세션 ID 불일치")
    return payload
def _supabase() -> Client:
    """Build a Supabase service-role client; raise 503 if env vars are missing."""
    url = os.getenv("SUPABASE_URL", "")
    key = os.getenv("SUPABASE_SERVICE_KEY", "")
    if url and key:
        return create_client(url, key)
    raise HTTPException(status_code=503, detail="Supabase 설정 미완료")
def _check_filename(filename: str) -> str:
    """Validate an upload filename and return it unchanged, or raise HTTP 400.

    Rules:
    - characters restricted to SAFE_FILENAME (word chars, Hangul, - . ( ), whitespace)
    - no path separators and no leading dot (hidden files / traversal)
    - no whitespace other than a plain space — SAFE_FILENAME's \\s class would
      otherwise admit tab/newline/carriage-return in filenames
    - extension (lowercased) must be in ALLOWED_EXT
    """
    if not SAFE_FILENAME.match(filename):
        raise HTTPException(status_code=400, detail="파일명에 허용되지 않은 문자가 포함되어 있습니다")
    if "/" in filename or "\\" in filename or filename.startswith("."):
        raise HTTPException(status_code=400, detail="잘못된 파일명")
    # SAFE_FILENAME uses \s, which matches \t/\n/\r as well — reject those here.
    if any(ch.isspace() and ch != " " for ch in filename):
        raise HTTPException(status_code=400, detail="잘못된 파일명")
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXT:
        raise HTTPException(status_code=400, detail=f"허용되지 않은 확장자: {ext}")
    return filename
@router.post("/sign-link", response_model=SignLinkResponse)
async def sign_link(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Vercel-HMAC-authenticated: create a DSM share link for a stored pack file.

    Raises 400 when file_path resolves outside PACK_HOST_DIR and 502 on DSM
    failures.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    payload = SignLinkRequest.model_validate_json(body)
    # Path safety: file_path is the NAS host path that the upload routes stored
    # in Supabase, and it is passed to the DSM API verbatim.  str.startswith
    # would accept sibling paths ('/foo/packs' vs '/foo/packs_evil'), so
    # containment is checked component-wise via Path.relative_to
    # (CODE_REVIEW F1).
    abs_path = Path(payload.file_path).resolve()
    try:
        abs_path.relative_to(PACK_HOST_DIR.resolve())
    except ValueError:
        # `from None`: the ValueError is an implementation detail of the
        # containment check, not a cause callers need in the traceback.
        raise HTTPException(status_code=400, detail="허용된 경로 외부") from None
    try:
        url, expires_at = await create_share_link(str(abs_path), payload.expires_in_seconds)
    except DSMError as e:
        logger.error("DSM 오류: %s", e)
        # Chain the cause (`from e`) so the DSM failure survives in logs.
        raise HTTPException(status_code=502, detail=f"DSM 오류: {e}") from e
    return SignLinkResponse(url=url, expires_at=expires_at)
@router.post("/admin/mint-token", response_model=MintTokenResponse)
async def mint_token(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Vercel-HMAC-authenticated: mint a one-time upload token.

    The token binds tier/label/filename/size_bytes and a fresh jti; the upload
    routes verify the body against these claims.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    payload = MintTokenRequest.model_validate_json(body)
    _check_filename(payload.filename)
    jti = str(uuid.uuid4())
    expires_ts = int(time.time()) + UPLOAD_TOKEN_TTL_SEC
    claims = {
        "tier": payload.tier,
        "label": payload.label,
        "filename": payload.filename,
        "size_bytes": payload.size_bytes,
        "jti": jti,
        "expires_at": expires_ts,
    }
    return MintTokenResponse(
        token=mint_upload_token(claims),
        expires_at=datetime.fromtimestamp(expires_ts, tz=timezone.utc),
        jti=jti,
    )
@router.post("/upload", response_model=UploadResponse)
async def upload(
    file: UploadFile = File(...),
    authorization: str = Header(""),
):
    """One-time-token-authenticated single-shot upload.

    Streams the multipart body to PACK_BASE_DIR and inserts a pack_files row.
    The token carries tier/label/filename/size_bytes; the body must match
    size_bytes exactly.  On any failure before the DB row is committed, the
    partially-written file is removed (see the finally block).
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Authorization 헤더 누락")
    token = authorization[len("Bearer "):]
    payload = verify_upload_token(token)
    tier = payload["tier"]
    label = payload["label"]
    filename = _check_filename(payload["filename"])
    expected_size = int(payload["size_bytes"])
    # Flat layout: no per-tier directories; tier is encoded via filename convention.
    PACK_BASE_DIR.mkdir(parents=True, exist_ok=True)
    target = PACK_BASE_DIR / filename
    if target.exists():
        raise HTTPException(status_code=409, detail="이미 존재하는 파일명입니다. 다른 이름으로 업로드하거나 기존 파일을 먼저 삭제하세요")
    upload_committed = False
    try:
        # Stream the multipart body to disk while enforcing the size cap.
        written = 0
        with target.open("wb") as f:
            while True:
                chunk = await file.read(1024 * 1024)
                if not chunk:
                    break
                written += len(chunk)
                if written > MAX_BYTES:
                    raise HTTPException(status_code=413, detail="파일 크기 5GB 초과")
                f.write(chunk)
        if written != expected_size:
            raise HTTPException(status_code=400, detail=f"실제 크기({written})와 토큰 크기({expected_size}) 불일치")
        # file_path exposed to Supabase/DSM must be the NAS *host* absolute
        # path; target is the same directory seen from inside the container's
        # volume mount.
        host_path = PACK_HOST_DIR / filename
        # supabase INSERT
        sb = _supabase()
        file_id = str(uuid.uuid4())
        try:
            res = sb.table("pack_files").insert({
                "id": file_id,
                "min_tier": tier,
                "label": label,
                "file_path": str(host_path),
                "filename": filename,
                "size_bytes": written,
            }).execute()
        except Exception as e:
            logger.exception("Supabase INSERT 예외: filename=%s", filename)
            raise HTTPException(status_code=500, detail=f"DB INSERT 실패: {e}") from e
        if not res.data:
            raise HTTPException(status_code=500, detail="DB INSERT 실패")
        upload_committed = True
        return UploadResponse(
            file_id=file_id,
            file_path=str(host_path),
            filename=filename,
            size_bytes=written,
            min_tier=tier,
            label=label,
            uploaded_at=res.data[0]["uploaded_at"],
        )
    finally:
        if not upload_committed and target.exists():
            try:
                target.unlink()
                logger.warning("업로드 실패로 부분 파일 정리: %s", target)
            except OSError:
                # Fixed format string: original "%s%s" ran the path and the
                # exception together with no separator; logger.exception
                # already appends the traceback, so the path alone suffices.
                logger.exception("부분 파일 정리 실패: %s", target)
# ── Chunked upload (resumable) ──────────────────────────────────────────────
# The same token minted by mint-token is reused across the whole init →
# chunk* → complete flow.  jti = session_id.  Only init consumes the jti;
# chunk/complete/abort use no-consume verification.
@router.post("/upload/init", response_model=InitUploadResponse)
async def upload_init(authorization: str = Header("")):
    """One-time-token-authenticated: create a chunked-upload session.

    This is the only route that consumes the token's jti; subsequent
    chunk/complete/abort calls verify the same token without consuming it.
    Raises 409 when the target filename or the session already exists.
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Authorization 헤더 누락")
    token = authorization[len("Bearer "):]
    payload = verify_upload_token(token)  # only init consumes the jti
    tier = payload["tier"]
    label = payload["label"]
    filename = _check_filename(payload["filename"])
    expected_size = int(payload["size_bytes"])
    jti = payload["jti"]
    PACK_BASE_DIR.mkdir(parents=True, exist_ok=True)
    if (PACK_BASE_DIR / filename).exists():
        raise HTTPException(status_code=409, detail="이미 존재하는 파일명입니다")
    sdir = _session_dir(jti)
    try:
        # exist_ok=False makes the mkdir itself the uniqueness check, closing
        # the exists()/mkdir() race between two concurrent inits for one jti.
        sdir.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        raise HTTPException(status_code=409, detail="이미 시작된 세션입니다") from None
    _session_data_path(jti).touch()
    _save_session(jti, {
        "filename": filename,
        "expected_size": expected_size,
        "tier": tier,
        "label": label,
        "written": 0,
        "expires_at": int(payload["expires_at"]),
    })
    return InitUploadResponse(
        session_id=jti,
        chunk_max_size=CHUNK_MAX_SIZE,
        expected_size=expected_size,
        expires_at=datetime.fromtimestamp(payload["expires_at"], tz=timezone.utc),
    )
@router.put("/upload/{session_id}/chunk", response_model=ChunkUploadResponse)
async def upload_chunk(
    session_id: str,
    request: Request,
    offset: int = 0,
    authorization: str = Header(""),
):
    """Append one raw-body chunk at `offset` to the session's partial file.

    Chunks must arrive strictly in order: `offset` has to equal the session's
    current `written` count.  On mismatch a 409 is returned with the
    authoritative offset in the X-Current-Offset header so the client can
    resume from the right place.  NOTE(review): the offset check and the
    append are not atomic, so this assumes a single client sends chunks
    sequentially — confirm callers never issue concurrent chunk PUTs for one
    session.
    """
    _verify_session_token(authorization, session_id)
    meta = _load_session(session_id)
    # Strict sequencing: reject out-of-order and duplicate chunks.
    if offset != meta["written"]:
        raise HTTPException(
            status_code=409,
            detail=f"offset {offset} 불일치 (현재 written={meta['written']})",
            headers={"X-Current-Offset": str(meta["written"])},
        )
    body = await request.body()
    if not body:
        raise HTTPException(status_code=400, detail="청크가 비어 있음")
    if len(body) > CHUNK_MAX_SIZE:
        raise HTTPException(status_code=413, detail=f"청크 크기 {CHUNK_MAX_SIZE} 초과")
    # Never let the partial file grow past the size declared in the token.
    if meta["written"] + len(body) > meta["expected_size"]:
        raise HTTPException(status_code=413, detail="누적 크기 expected_size 초과")
    # Append-only write; offset correctness was established above.
    with _session_data_path(session_id).open("ab") as f:
        f.write(body)
    meta["written"] += len(body)
    _save_session(session_id, meta)
    return ChunkUploadResponse(written=meta["written"], expected_size=meta["expected_size"])
@router.get("/upload/{session_id}/status", response_model=ChunkUploadResponse)
async def upload_status(
    session_id: str,
    authorization: str = Header(""),
):
    """Report the session's current written byte count (used to resume uploads)."""
    _verify_session_token(authorization, session_id)
    meta = _load_session(session_id)
    return ChunkUploadResponse(
        written=meta["written"],
        expected_size=meta["expected_size"],
    )
@router.post("/upload/{session_id}/complete", response_model=UploadResponse)
async def upload_complete(
    session_id: str,
    authorization: str = Header(""),
):
    """Finalize a chunked upload: rename the partial file into place + DB INSERT.

    If the Supabase INSERT fails, the finished file is moved *back* into the
    session (instead of being deleted) so the client can retry /complete
    without re-uploading; previously the data was unlinked here, stranding the
    session with meta claiming completion but no data file left.
    """
    _verify_session_token(authorization, session_id)
    meta = _load_session(session_id)
    if meta["written"] != meta["expected_size"]:
        raise HTTPException(
            status_code=400,
            detail=f"미완료: written={meta['written']} expected={meta['expected_size']}",
        )
    filename = meta["filename"]
    target = PACK_BASE_DIR / filename
    if target.exists():
        raise HTTPException(status_code=409, detail="이미 존재하는 파일명입니다")
    data_file = _session_data_path(session_id)
    data_file.replace(target)  # atomic rename within the same filesystem
    host_path = PACK_HOST_DIR / filename
    sb = _supabase()
    file_id = str(uuid.uuid4())
    try:
        res = sb.table("pack_files").insert({
            "id": file_id,
            "min_tier": meta["tier"],
            "label": meta["label"],
            "file_path": str(host_path),
            "filename": filename,
            "size_bytes": meta["written"],
        }).execute()
    except Exception as e:
        logger.exception("Supabase INSERT 예외 (chunked complete): filename=%s", filename)
        # Roll the rename back so the session stays resumable.
        target.replace(data_file)
        raise HTTPException(status_code=500, detail=f"DB INSERT 실패: {e}") from e
    if not res.data:
        target.replace(data_file)
        raise HTTPException(status_code=500, detail="DB INSERT 실패")
    _cleanup_session(session_id)
    return UploadResponse(
        file_id=file_id,
        file_path=str(host_path),
        filename=filename,
        size_bytes=meta["written"],
        min_tier=meta["tier"],
        label=meta["label"],
        uploaded_at=res.data[0]["uploaded_at"],
    )
@router.delete("/upload/{session_id}")
async def upload_abort(
    session_id: str,
    authorization: str = Header(""),
):
    """Abort a chunked upload: remove the session directory and partial file."""
    _verify_session_token(authorization, session_id)
    _cleanup_session(session_id)
    return dict(ok=True)
@router.get("/list", response_model=list[PackFileItem])
async def list_files(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Vercel-HMAC-authenticated: list all non-deleted pack_files rows."""
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    query = (
        _supabase()
        .table("pack_files")
        .select("*")
        .is_("deleted_at", "null")
        .order("min_tier")
        .order("sort_order")
    )
    rows = query.execute().data or []
    return [PackFileItem(**row) for row in rows]
@router.delete("/{file_id}")
async def delete_file(
    file_id: str,
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Vercel-HMAC-authenticated: soft-delete a pack file row.

    Only sets deleted_at; the DSM share link expires on its own.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    now_iso = datetime.now(timezone.utc).isoformat()
    res = (
        _supabase()
        .table("pack_files")
        .update({"deleted_at": now_iso})
        .eq("id", file_id)
        .execute()
    )
    if res.data:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")