Files
web-page-backend/packs-lab/app/auth.py
gahusb c8ce6cb617 fix(packs-lab): 일회성 토큰 jti 영속화 (SQLite) — 재시작 replay 방어 유지
인메모리 _used_jti set은 컨테이너 재시작 시 비워져 TTL 내 토큰 replay가 가능했음(webhook 배포가 잦아 실재 구멍). 영속 볼륨(PACK_BASE_DIR)의 jti_store.db에 사용 jti를 기록(PK 원자성), 만료 항목은 lazy 정리. verify_upload_token이 jti_store.consume 사용. TDD 3 + 기존 replay 테스트 보존.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 15:08:02 +09:00

93 lines
3.3 KiB
Python

"""HMAC 인증.
- verify_request_hmac: Vercel ↔ backend 요청 검증.
Vercel이 X-Timestamp + X-Signature 헤더로 보냄. signature = HMAC(timestamp.body, secret).
요청은 5분 이내여야 함 (replay 방어).
- mint_upload_token / verify_upload_token: admin 5GB 업로드 일회성 토큰.
Vercel이 발급, browser가 web-backend에 직접 multipart POST 시 Authorization: Bearer <token>.
JTI 단발성으로 재사용 차단.
"""
import base64
import hashlib
import hmac
import json
import os
import time
from fastapi import HTTPException
from . import jti_store
_SECRET = os.getenv("BACKEND_HMAC_SECRET", "")
REQUEST_MAX_AGE_SEC = 300 # 5분
def _sign(payload: bytes) -> str:
if not _SECRET:
raise HTTPException(status_code=503, detail="BACKEND_HMAC_SECRET 미설정")
return hmac.new(_SECRET.encode(), payload, hashlib.sha256).hexdigest()
def verify_request_hmac(body: bytes, timestamp: str, signature: str) -> None:
"""Vercel → backend 요청 시그니처 검증."""
if not timestamp or not signature:
raise HTTPException(status_code=401, detail="HMAC 헤더 누락")
try:
ts = int(timestamp)
except ValueError:
raise HTTPException(status_code=401, detail="잘못된 timestamp")
age = abs(int(time.time()) - ts)
if age > REQUEST_MAX_AGE_SEC:
raise HTTPException(status_code=401, detail="요청이 만료됨")
expected = _sign(timestamp.encode() + b"." + body)
if not hmac.compare_digest(expected, signature):
raise HTTPException(status_code=401, detail="HMAC 시그니처 불일치")
def mint_upload_token(payload: dict) -> str:
"""일회성 업로드 토큰 발급. payload는 expires_at + jti 포함해야 함."""
body = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode()
sig = _sign(body)
return base64.urlsafe_b64encode(body).decode() + "." + sig
def _decode_upload_token(token: str) -> dict:
"""토큰 시그니처 + 만료 + jti 존재만 검증. JTI 마킹 없음."""
try:
b64, sig = token.split(".", 1)
body = base64.urlsafe_b64decode(b64.encode())
except Exception:
raise HTTPException(status_code=401, detail="잘못된 토큰 포맷")
expected = _sign(body)
if not hmac.compare_digest(expected, sig):
raise HTTPException(status_code=401, detail="토큰 시그니처 불일치")
payload = json.loads(body)
expires_at = payload.get("expires_at", 0)
if int(time.time()) > expires_at:
raise HTTPException(status_code=401, detail="토큰 만료")
if not payload.get("jti"):
raise HTTPException(status_code=401, detail="jti 누락")
return payload
def verify_upload_token(token: str) -> dict:
"""업로드 토큰 검증 + jti 사용 마킹. single-shot 업로드와 chunked init에서만 사용."""
payload = _decode_upload_token(token)
jti = payload["jti"]
# 영속 저장소에 사용 마킹 (재시작에도 단발성 유지). 이미 사용됐으면 False.
if not jti_store.consume(jti, int(payload["expires_at"])):
raise HTTPException(status_code=409, detail="이미 사용된 토큰")
return payload
def verify_upload_token_no_consume(token: str) -> dict:
"""업로드 토큰 검증만 (jti consume 없음). chunked upload chunk/complete/abort/status에 사용."""
return _decode_upload_token(token)