"""HMAC 인증. - verify_request_hmac: Vercel ↔ backend 요청 검증. Vercel이 X-Timestamp + X-Signature 헤더로 보냄. signature = HMAC(timestamp.body, secret). 요청은 5분 이내여야 함 (replay 방어). - mint_upload_token / verify_upload_token: admin 5GB 업로드 일회성 토큰. Vercel이 발급, browser가 web-backend에 직접 multipart POST 시 Authorization: Bearer . JTI 단발성으로 재사용 차단. """ import base64 import hashlib import hmac import json import os import time from threading import Lock from fastapi import HTTPException _SECRET = os.getenv("BACKEND_HMAC_SECRET", "") REQUEST_MAX_AGE_SEC = 300 # 5분 # JTI 단발성 set (in-memory, 단일 컨테이너 가정) _used_jti: set[str] = set() _jti_lock = Lock() def _sign(payload: bytes) -> str: if not _SECRET: raise HTTPException(status_code=503, detail="BACKEND_HMAC_SECRET 미설정") return hmac.new(_SECRET.encode(), payload, hashlib.sha256).hexdigest() def verify_request_hmac(body: bytes, timestamp: str, signature: str) -> None: """Vercel → backend 요청 시그니처 검증.""" if not timestamp or not signature: raise HTTPException(status_code=401, detail="HMAC 헤더 누락") try: ts = int(timestamp) except ValueError: raise HTTPException(status_code=401, detail="잘못된 timestamp") age = abs(int(time.time()) - ts) if age > REQUEST_MAX_AGE_SEC: raise HTTPException(status_code=401, detail="요청이 만료됨") expected = _sign(timestamp.encode() + b"." + body) if not hmac.compare_digest(expected, signature): raise HTTPException(status_code=401, detail="HMAC 시그니처 불일치") def mint_upload_token(payload: dict) -> str: """일회성 업로드 토큰 발급. payload는 expires_at + jti 포함해야 함.""" body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode() sig = _sign(body) return base64.urlsafe_b64encode(body).decode() + "." + sig def verify_upload_token(token: str) -> dict: """업로드 토큰 검증 + jti 사용 마킹.""" try: b64, sig = token.split(".", 1) body = base64.urlsafe_b64decode(b64.encode()) except Exception: raise HTTPException(status_code=401, detail="잘못된 토큰 포맷") expected = _sign(body) if not hmac.compare_digest(expected, sig): raise HTTPException(status_code=401, detail="토큰 시그니처 불일치") payload = json.loads(body) expires_at = payload.get("expires_at", 0) if int(time.time()) > expires_at: raise HTTPException(status_code=401, detail="토큰 만료") jti = payload.get("jti") if not jti: raise HTTPException(status_code=401, detail="jti 누락") with _jti_lock: if jti in _used_jti: raise HTTPException(status_code=409, detail="이미 사용된 토큰") _used_jti.add(jti) return payload