feat(packs-lab): DSM 호출 retry/backoff + 업로드 cleanup 보강

- dsm_client.py: _request_with_retry()로 5xx·transport·timeout만 지수백오프
  재시도 (DSM_MAX_RETRIES, DSM_BACKOFF_SEC env). DSM error code 응답 본문 로깅.
- routes.py: upload 핸들러를 try/finally로 감싸 부분파일 정리 보장, Supabase
  INSERT 호출 자체에 try/except 추가해 네트워크 예외도 cleanup.
- test_dsm_client.py: retry 시나리오 4종 추가 (5xx→성공/소진/transport
  error/4xx no-retry). 전체 29 테스트 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 02:31:39 +09:00
parent a826e00399
commit 448dbd5f48
3 changed files with 217 additions and 50 deletions

View File

@@ -4,6 +4,7 @@
- create_share_link(file_path, expires_in_sec) -> share URL - create_share_link(file_path, expires_in_sec) -> share URL
""" """
import asyncio
import logging import logging
import os import os
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@@ -18,6 +19,8 @@ DSM_PASS = os.getenv("DSM_PASS", "")
# LAN IP로 DSM 접근 시 self-signed cert가 IP에 매칭 안 되어 검증 실패. LAN 내부 통신이라 false 허용. # LAN IP로 DSM 접근 시 self-signed cert가 IP에 매칭 안 되어 검증 실패. LAN 내부 통신이라 false 허용.
# 운영에서 LAN IP + self-signed면 DSM_VERIFY_SSL=false. 도메인 + 정상 cert면 기본값(true) 유지. # 운영에서 LAN IP + self-signed면 DSM_VERIFY_SSL=false. 도메인 + 정상 cert면 기본값(true) 유지.
DSM_VERIFY_SSL = os.getenv("DSM_VERIFY_SSL", "true").strip().lower() != "false" DSM_VERIFY_SSL = os.getenv("DSM_VERIFY_SSL", "true").strip().lower() != "false"
DSM_MAX_RETRIES = max(1, int(os.getenv("DSM_MAX_RETRIES", "3")))
DSM_BACKOFF_SEC = float(os.getenv("DSM_BACKOFF_SEC", "0.5"))
API_AUTH = "/webapi/auth.cgi" API_AUTH = "/webapi/auth.cgi"
API_SHARE = "/webapi/entry.cgi" API_SHARE = "/webapi/entry.cgi"
@@ -27,13 +30,45 @@ class DSMError(RuntimeError):
pass pass
async def _request_with_retry(
    client: httpx.AsyncClient,
    url: str,
    params: dict,
    timeout: float,
) -> httpx.Response:
    """Send a GET to DSM, retrying transient failures with exponential backoff.

    Only HTTP 5xx responses, transport errors and timeouts are retried. Any
    response whose status is below 500 (4xx included) is returned untouched,
    and a DSM ``success=false`` body is left for the caller to interpret.

    Args:
        client: Open httpx client used to issue the request.
        url: Full request URL.
        params: Query parameters sent with the GET.
        timeout: Per-attempt timeout handed to httpx.

    Returns:
        The first response whose status code is below 500.

    Raises:
        DSMError: All DSM_MAX_RETRIES attempts failed. The last underlying
            failure is attached as ``__cause__`` so the traceback keeps it.
    """
    last_exc: Exception | None = None
    for attempt in range(DSM_MAX_RETRIES):
        try:
            r = await client.get(url, params=params, timeout=timeout)
            if r.status_code < 500:
                return r
            # Keep the 5xx as an exception object so the final error can report it.
            last_exc = httpx.HTTPStatusError(
                f"HTTP {r.status_code}", request=r.request, response=r
            )
            logger.warning(
                "DSM HTTP %s — attempt %s/%s body=%s",
                r.status_code, attempt + 1, DSM_MAX_RETRIES, r.text[:200],
            )
        except (httpx.TransportError, httpx.TimeoutException) as e:
            last_exc = e
            logger.warning(
                "DSM transport error: %s — attempt %s/%s",
                e, attempt + 1, DSM_MAX_RETRIES,
            )
        # Back off only between attempts; the delay doubles each round
        # and there is no sleep after the final attempt.
        if attempt < DSM_MAX_RETRIES - 1:
            await asyncio.sleep(DSM_BACKOFF_SEC * (2 ** attempt))
    # Chain the last failure so callers and logs can see the root cause.
    raise DSMError(
        f"DSM 요청 실패 (재시도 {DSM_MAX_RETRIES}회): {last_exc}"
    ) from last_exc
async def _login(client: httpx.AsyncClient) -> str: async def _login(client: httpx.AsyncClient) -> str:
"""DSM 세션 sid 반환.""" """DSM 세션 sid 반환."""
if not all([DSM_HOST, DSM_USER, DSM_PASS]): if not all([DSM_HOST, DSM_USER, DSM_PASS]):
raise DSMError("DSM 환경변수 미설정") raise DSMError("DSM 환경변수 미설정")
r = await client.get( r = await _request_with_retry(
client,
f"{DSM_HOST}{API_AUTH}", f"{DSM_HOST}{API_AUTH}",
params={ {
"api": "SYNO.API.Auth", "api": "SYNO.API.Auth",
"version": "7", "version": "7",
"method": "login", "method": "login",
@@ -42,12 +77,14 @@ async def _login(client: httpx.AsyncClient) -> str:
"session": "FileStation", "session": "FileStation",
"format": "sid", "format": "sid",
}, },
timeout=15.0, 15.0,
) )
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
if not data.get("success"): if not data.get("success"):
raise DSMError(f"DSM login 실패: {data.get('error')}") err = data.get("error", {})
logger.error("DSM login 실패: code=%s error=%s", err.get("code"), err)
raise DSMError(f"DSM login 실패: code={err.get('code')} error={err}")
return data["data"]["sid"] return data["data"]["sid"]
@@ -80,9 +117,10 @@ async def create_share_link(file_path: str, expires_in_sec: int = 14400) -> tupl
async with httpx.AsyncClient(verify=DSM_VERIFY_SSL) as client: async with httpx.AsyncClient(verify=DSM_VERIFY_SSL) as client:
sid = await _login(client) sid = await _login(client)
try: try:
r = await client.get( r = await _request_with_retry(
client,
f"{DSM_HOST}{API_SHARE}", f"{DSM_HOST}{API_SHARE}",
params={ {
"api": "SYNO.FileStation.Sharing", "api": "SYNO.FileStation.Sharing",
"version": "3", "version": "3",
"method": "create", "method": "create",
@@ -90,16 +128,22 @@ async def create_share_link(file_path: str, expires_in_sec: int = 14400) -> tupl
"date_expired": expire_time_ms, "date_expired": expire_time_ms,
"_sid": sid, "_sid": sid,
}, },
timeout=15.0, 15.0,
) )
r.raise_for_status() r.raise_for_status()
data = r.json() data = r.json()
if not data.get("success"): if not data.get("success"):
raise DSMError(f"DSM Sharing.create 실패: {data.get('error')}") err = data.get("error", {})
logger.error(
"DSM Sharing.create 실패: path=%s code=%s error=%s",
file_path, err.get("code"), err,
)
raise DSMError(f"DSM Sharing.create 실패: code={err.get('code')} error={err}")
links = data["data"]["links"] links = data["data"]["links"]
if not links: if not links:
raise DSMError("Sharing 응답에 링크 없음") raise DSMError("Sharing 응답에 링크 없음")
url = links[0]["url"] url = links[0]["url"]
logger.info("DSM share link created: path=%s", file_path)
return url, expires_at return url, expires_at
finally: finally:
await _logout(client, sid) await _logout(client, sid)

View File

@@ -135,52 +135,62 @@ async def upload(
if target.exists(): if target.exists():
raise HTTPException(status_code=409, detail="이미 존재하는 파일명입니다. 다른 이름으로 업로드하거나 기존 파일을 먼저 삭제하세요") raise HTTPException(status_code=409, detail="이미 존재하는 파일명입니다. 다른 이름으로 업로드하거나 기존 파일을 먼저 삭제하세요")
# multipart 스트림 저장 + 크기 검증 upload_committed = False
written = 0 try:
with target.open("wb") as f: # multipart 스트림 저장 + 크기 검증
while True: written = 0
chunk = await file.read(1024 * 1024) with target.open("wb") as f:
if not chunk: while True:
break chunk = await file.read(1024 * 1024)
written += len(chunk) if not chunk:
if written > MAX_BYTES: break
f.close() written += len(chunk)
target.unlink(missing_ok=True) if written > MAX_BYTES:
raise HTTPException(status_code=413, detail="파일 크기 5GB 초과") raise HTTPException(status_code=413, detail="파일 크기 5GB 초과")
f.write(chunk) f.write(chunk)
if written != expected_size: if written != expected_size:
target.unlink(missing_ok=True) raise HTTPException(status_code=400, detail=f"실제 크기({written})와 토큰 크기({expected_size}) 불일치")
raise HTTPException(status_code=400, detail=f"실제 크기({written})와 토큰 크기({expected_size}) 불일치")
# Supabase·DSM에 노출되는 file_path는 NAS 호스트 절대경로여야 한다. # Supabase·DSM에 노출되는 file_path는 NAS 호스트 절대경로여야 한다.
# 컨테이너 경로(target)는 마운트된 호스트경로의 다른 시점일 뿐이라, 같은 디렉토리 구조를 보유. # 컨테이너 경로(target)는 마운트된 호스트경로의 다른 시점일 뿐이라, 같은 디렉토리 구조를 보유.
host_path = PACK_HOST_DIR / filename host_path = PACK_HOST_DIR / filename
# supabase INSERT # supabase INSERT
sb = _supabase() sb = _supabase()
file_id = str(uuid.uuid4()) file_id = str(uuid.uuid4())
res = sb.table("pack_files").insert({ try:
"id": file_id, res = sb.table("pack_files").insert({
"min_tier": tier, "id": file_id,
"label": label, "min_tier": tier,
"file_path": str(host_path), "label": label,
"filename": filename, "file_path": str(host_path),
"size_bytes": written, "filename": filename,
}).execute() "size_bytes": written,
if not res.data: }).execute()
target.unlink(missing_ok=True) except Exception as e:
raise HTTPException(status_code=500, detail="DB INSERT 실패") logger.exception("Supabase INSERT 예외: filename=%s", filename)
raise HTTPException(status_code=500, detail=f"DB INSERT 실패: {e}") from e
if not res.data:
raise HTTPException(status_code=500, detail="DB INSERT 실패")
return UploadResponse( upload_committed = True
file_id=file_id, return UploadResponse(
file_path=str(host_path), file_id=file_id,
filename=filename, file_path=str(host_path),
size_bytes=written, filename=filename,
min_tier=tier, size_bytes=written,
label=label, min_tier=tier,
uploaded_at=res.data[0]["uploaded_at"], label=label,
) uploaded_at=res.data[0]["uploaded_at"],
)
finally:
if not upload_committed and target.exists():
try:
target.unlink()
logger.warning("업로드 실패로 부분 파일 정리: %s", target)
except Exception as e:
logger.exception("부분 파일 정리 실패: %s%s", target, e)
@router.get("/list", response_model=list[PackFileItem]) @router.get("/list", response_model=list[PackFileItem])

View File

@@ -8,6 +8,13 @@ import httpx
from app.dsm_client import create_share_link, DSMError from app.dsm_client import create_share_link, DSMError
@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    """Zero out the retry backoff so retried calls do not slow the tests down."""
    # Dotted-path form: pytest imports app.dsm_client and patches the attribute.
    monkeypatch.setattr("app.dsm_client.DSM_BACKOFF_SEC", 0.0)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _dsm_env(monkeypatch): def _dsm_env(monkeypatch):
monkeypatch.setenv("DSM_HOST", "https://test-nas:5001") monkeypatch.setenv("DSM_HOST", "https://test-nas:5001")
@@ -109,3 +116,109 @@ def test_dsm_share_failure_logs_out():
assert "login" in call_order assert "login" in call_order
assert "logout" in call_order, "logout이 호출되지 않음 (finally 누락 의심)" assert "logout" in call_order, "logout이 호출되지 않음 (finally 누락 의심)"
def test_retry_on_5xx_then_success(monkeypatch):
    """First login call returns 503, the retry returns 200, and the link is created."""
    from app import dsm_client

    monkeypatch.setattr(dsm_client, "DSM_MAX_RETRIES", 3)
    login_attempts = 0

    async def fake_get(self, url, *, params=None, **kw):
        nonlocal login_attempts
        method = (params or {}).get("method", "")
        if method == "create":
            return _make_response({
                "success": True,
                "data": {"links": [{"url": "https://nas/sharing/retry"}]},
            })
        if method != "login":
            # Anything else (e.g. logout) just succeeds.
            return _make_response({"success": True})
        login_attempts += 1
        if login_attempts > 1:
            return _make_response({"success": True, "data": {"sid": "sid-after-retry"}})
        # Only the very first login attempt fails with a server error.
        return _make_response({}, status_code=503)

    with patch.object(httpx.AsyncClient, "get", new=fake_get):
        share_url, _ = asyncio.run(create_share_link("/volume1/x.zip"))

    assert share_url == "https://nas/sharing/retry"
    assert login_attempts == 2, "5xx 응답에 대해 retry가 동작해야 함"
def test_retry_exhausts_on_persistent_5xx(monkeypatch):
    """A 5xx on every login attempt uses up the retries and raises DSMError."""
    from app import dsm_client

    monkeypatch.setattr(dsm_client, "DSM_MAX_RETRIES", 2)
    login_attempts = 0

    async def fake_get(self, url, *, params=None, **kw):
        nonlocal login_attempts
        if (params or {}).get("method", "") != "login":
            return _make_response({"success": True})
        login_attempts += 1
        return _make_response({}, status_code=503)

    with patch.object(httpx.AsyncClient, "get", new=fake_get), pytest.raises(
        DSMError, match="재시도"
    ):
        asyncio.run(create_share_link("/volume1/x.zip"))

    assert login_attempts == 2, f"MAX_RETRIES만큼 시도해야 함 (실제: {login_attempts})"
def test_retry_on_transport_error_then_success(monkeypatch):
    """An httpx.ConnectError on the first login is retried and then succeeds."""
    from app import dsm_client

    monkeypatch.setattr(dsm_client, "DSM_MAX_RETRIES", 3)
    login_attempts = 0

    async def fake_get(self, url, *, params=None, **kw):
        nonlocal login_attempts
        method = (params or {}).get("method", "")
        if method == "create":
            return _make_response({
                "success": True,
                "data": {"links": [{"url": "https://nas/sharing/tr"}]},
            })
        if method != "login":
            return _make_response({"success": True})
        login_attempts += 1
        if login_attempts > 1:
            return _make_response({"success": True, "data": {"sid": "sid"}})
        # Only the first login attempt hits a connection failure.
        raise httpx.ConnectError("connection refused")

    with patch.object(httpx.AsyncClient, "get", new=fake_get):
        share_url, _ = asyncio.run(create_share_link("/volume1/x.zip"))

    assert share_url == "https://nas/sharing/tr"
    assert login_attempts == 2
def test_no_retry_on_4xx(monkeypatch):
    """A 4xx response is not retried; raise_for_status fails after one GET."""
    from app import dsm_client

    monkeypatch.setattr(dsm_client, "DSM_MAX_RETRIES", 3)
    get_calls = 0

    def _raise_4xx():
        raise httpx.HTTPStatusError(
            "client error",
            request=MagicMock(),
            response=MagicMock(status_code=403),
        )

    async def fake_get(self, url, *, params=None, **kw):
        nonlocal get_calls
        get_calls += 1
        # Every GET, whatever the method, answers 403 Forbidden.
        forbidden = MagicMock(spec=httpx.Response)
        forbidden.status_code = 403
        forbidden.json.return_value = {}
        forbidden.raise_for_status = _raise_4xx
        return forbidden

    with patch.object(httpx.AsyncClient, "get", new=fake_get), pytest.raises(
        httpx.HTTPStatusError
    ):
        asyncio.run(create_share_link("/volume1/x.zip"))

    assert get_calls == 1, "4xx는 retry 없이 즉시 raise"