"""packs-lab API endpoints.

- POST /api/packs/sign-link — Vercel HMAC auth → DSM share link
- POST /api/packs/admin/mint-token — Vercel HMAC auth → signed upload token
- POST /api/packs/upload — bearer upload token → multipart save + Supabase INSERT
- GET /api/packs/list — Vercel HMAC auth → all non-deleted pack_files rows
- DELETE /api/packs/{file_id} — Vercel HMAC auth → soft delete
  (no DSM share cleanup is performed by this handler)
"""
import logging
import os
import re
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path

from fastapi import APIRouter, File, Header, HTTPException, Request, UploadFile
from supabase import Client, create_client

from .auth import mint_upload_token, verify_request_hmac, verify_upload_token
from .dsm_client import DSMError, create_share_link
from .models import (
    MintTokenRequest,
    MintTokenResponse,
    PackFileItem,
    SignLinkRequest,
    SignLinkResponse,
    UploadResponse,
)

logger = logging.getLogger("packs-lab.routes")
router = APIRouter(prefix="/api/packs")

# Root directory under which every pack file is stored and shared.
PACK_BASE_DIR = Path("/volume1/docker/webpage/media/packs")
# Lower-case extensions (without the dot) accepted for uploads.
ALLOWED_EXT = {"pdf", "zip", "mp4", "mov", "mkv", "wav", "m4a", "mp3", "png", "jpg", "jpeg", "webp", "prj"}
MAX_BYTES = 5 * 1024 * 1024 * 1024  # 5GB
# Word characters, Hangul syllables, '-', '.', parentheses and whitespace.
SAFE_FILENAME = re.compile(r"^[\w가-힣\-\.\(\)\s]+$")
UPLOAD_TOKEN_TTL_SEC = int(os.getenv("UPLOAD_TOKEN_TTL_SEC", "1800"))  # 30 minutes by default


def _supabase() -> Client:
    """Build a Supabase client from SUPABASE_URL / SUPABASE_SERVICE_KEY.

    Raises:
        HTTPException: 503 when either environment variable is missing or empty.
    """
    url = os.getenv("SUPABASE_URL", "")
    key = os.getenv("SUPABASE_SERVICE_KEY", "")
    if not url or not key:
        raise HTTPException(status_code=503, detail="Supabase 설정 미완료")
    return create_client(url, key)


def _check_filename(filename: str) -> str:
    """Validate an upload filename and return it unchanged.

    The name must match SAFE_FILENAME, contain no path separators, not start
    with a dot, and carry an extension listed in ALLOWED_EXT (compared
    case-insensitively).

    Raises:
        HTTPException: 400 when any of the checks fails.
    """
    if not SAFE_FILENAME.match(filename):
        raise HTTPException(status_code=400, detail="파일명에 허용되지 않은 문자가 포함되어 있습니다")
    if "/" in filename or "\\" in filename or filename.startswith("."):
        raise HTTPException(status_code=400, detail="잘못된 파일명")
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXT:
        raise HTTPException(status_code=400, detail=f"허용되지 않은 확장자: {ext}")
    return filename


@router.post("/sign-link", response_model=SignLinkResponse)
async def sign_link(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Create a DSM share link for a file located under PACK_BASE_DIR.

    The raw request body is HMAC-verified before it is parsed as a
    SignLinkRequest.

    Raises:
        HTTPException: 400 when the resolved path is outside PACK_BASE_DIR,
            502 when the DSM client reports an error.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    payload = SignLinkRequest.model_validate_json(body)

    # Path safety: compare path components, not string prefixes. A plain
    # startswith() check would also accept sibling directories such as
    # ".../media/packs-other/...".
    abs_path = Path(payload.file_path).resolve()
    if not abs_path.is_relative_to(PACK_BASE_DIR):
        raise HTTPException(status_code=400, detail="허용된 경로 외부")

    try:
        url, expires_at = await create_share_link(str(abs_path), payload.expires_in_seconds)
    except DSMError as e:
        logger.error("DSM 오류: %s", e)
        raise HTTPException(status_code=502, detail=f"DSM 오류: {e}") from e

    return SignLinkResponse(url=url, expires_at=expires_at)


@router.post("/admin/mint-token", response_model=MintTokenResponse)
async def mint_token(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Mint an upload token describing a single file to be uploaded.

    The raw request body is HMAC-verified, parsed as a MintTokenRequest, and
    its filename validated. The token carries tier, label, filename,
    size_bytes, a fresh UUID ``jti`` and an ``expires_at`` Unix timestamp
    UPLOAD_TOKEN_TTL_SEC seconds in the future.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)
    payload = MintTokenRequest.model_validate_json(body)

    _check_filename(payload.filename)

    jti = str(uuid.uuid4())
    expires_ts = int(time.time()) + UPLOAD_TOKEN_TTL_SEC
    token = mint_upload_token({
        "tier": payload.tier,
        "label": payload.label,
        "filename": payload.filename,
        "size_bytes": payload.size_bytes,
        "jti": jti,
        "expires_at": expires_ts,
    })
    return MintTokenResponse(
        token=token,
        expires_at=datetime.fromtimestamp(expires_ts, tz=timezone.utc),
        jti=jti,
    )


@router.post("/upload", response_model=UploadResponse)
async def upload(
    file: UploadFile = File(...),
    authorization: str = Header(""),
):
    """Store an uploaded file and register it in the ``pack_files`` table.

    The bearer token supplies tier, label, filename and the expected size.
    The file is streamed to ``PACK_BASE_DIR / tier / filename`` in 1 MiB
    chunks. If anything fails after the file has been created (size limit,
    size mismatch, interrupted read, Supabase error), the partially written
    file is removed before the error propagates.

    Raises:
        HTTPException: 401 for a missing bearer header, 409 when the target
            file already exists, 413 when more than MAX_BYTES are received,
            400 when the received size differs from the token's size_bytes,
            500 when the insert returns no row.
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Authorization 헤더 누락")
    token = authorization[len("Bearer "):]
    payload = verify_upload_token(token)

    # NOTE(review): tier is taken from the verified token and used as a
    # directory name as-is — confirm MintTokenRequest restricts it to safe values.
    tier = payload["tier"]
    label = payload["label"]
    filename = _check_filename(payload["filename"])
    expected_size = int(payload["size_bytes"])

    tier_dir = PACK_BASE_DIR / tier
    tier_dir.mkdir(parents=True, exist_ok=True)
    target = tier_dir / filename

    # Exclusive creation ("x") makes the existence check and the creation one
    # atomic step, so two concurrent uploads of the same name cannot
    # overwrite each other. This open stays outside the cleanup block below
    # so that a pre-existing file is never deleted.
    try:
        out = target.open("xb")
    except FileExistsError:
        raise HTTPException(
            status_code=409,
            detail="이미 존재하는 파일명입니다. 다른 이름으로 업로드하거나 기존 파일을 먼저 삭제하세요",
        ) from None

    stored = False
    written = 0
    try:
        # Stream the multipart payload to disk while enforcing the size limit.
        with out:
            while chunk := await file.read(1024 * 1024):
                written += len(chunk)
                if written > MAX_BYTES:
                    raise HTTPException(status_code=413, detail="파일 크기 5GB 초과")
                out.write(chunk)

        if written != expected_size:
            raise HTTPException(
                status_code=400,
                detail=f"실제 크기({written})와 토큰 크기({expected_size}) 불일치",
            )

        # Supabase INSERT
        sb = _supabase()
        file_id = str(uuid.uuid4())
        res = sb.table("pack_files").insert({
            "id": file_id,
            "min_tier": tier,
            "label": label,
            "file_path": str(target),
            "filename": filename,
            "size_bytes": written,
        }).execute()
        if not res.data:
            raise HTTPException(status_code=500, detail="DB INSERT 실패")
        stored = True
    finally:
        # Covers HTTP errors, unexpected exceptions and task cancellation
        # alike: never leave an orphaned or partial file behind.
        if not stored:
            target.unlink(missing_ok=True)

    return UploadResponse(
        file_id=file_id,
        file_path=str(target),
        filename=filename,
        size_bytes=written,
        min_tier=tier,
        label=label,
        uploaded_at=res.data[0]["uploaded_at"],
    )


@router.get("/list", response_model=list[PackFileItem])
async def list_files(
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Return every ``pack_files`` row whose ``deleted_at`` is null.

    Rows are ordered by ``min_tier`` and then ``sort_order``. The raw request
    body is HMAC-verified first.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)

    sb = _supabase()
    res = (
        sb.table("pack_files")
        .select("*")
        .is_("deleted_at", "null")
        .order("min_tier")
        .order("sort_order")
        .execute()
    )
    return [PackFileItem(**r) for r in (res.data or [])]


@router.delete("/{file_id}")
async def delete_file(
    file_id: str,
    request: Request,
    x_timestamp: str = Header(""),
    x_signature: str = Header(""),
):
    """Soft-delete a pack file by setting ``deleted_at`` to the current UTC time.

    Only the database row is updated; the file on disk and any DSM share are
    not touched by this handler.

    Raises:
        HTTPException: 404 when the update returns no row for ``file_id``.
    """
    body = await request.body()
    verify_request_hmac(body, x_timestamp, x_signature)

    sb = _supabase()
    res = sb.table("pack_files").update({
        "deleted_at": datetime.now(timezone.utc).isoformat(),
    }).eq("id", file_id).execute()
    if not res.data:
        raise HTTPException(status_code=404, detail="파일을 찾을 수 없습니다")
    return {"ok": True}