fix(music-lab): retry 레이스 가드(retrying 전이) + failed_step 검증 + backoff 빈리스트 가드
- Fix 1: retry_pipeline이 bg.add_task 직전에 상태를 'retrying'으로 전이 → 동시 retry는 409로 차단
- Fix 2: test_retry_failed_pipeline_retriggers에 called[pid/step] assert 추가
- Fix 3: failed_step이 STEPS에 없으면 409 (엉뚱한 prefix 방지)
- Fix 4: STEP_RETRY_BACKOFF_SEC가 빈 리스트일 때 IndexError 발생 → 0으로 폴백

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1130,6 +1130,7 @@ def cancel_pipeline(pid: int):
|
|||||||
|
|
||||||
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
||||||
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
||||||
|
from .pipeline.state_machine import STEPS
|
||||||
p = _db_module.get_pipeline(pid)
|
p = _db_module.get_pipeline(pid)
|
||||||
if not p:
|
if not p:
|
||||||
raise HTTPException(404)
|
raise HTTPException(404)
|
||||||
@@ -1141,8 +1142,13 @@ async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
|||||||
failed_step = reason.split(":", 1)[0].strip() or None
|
failed_step = reason.split(":", 1)[0].strip() or None
|
||||||
if not failed_step:
|
if not failed_step:
|
||||||
raise HTTPException(409, "실패 step을 판별할 수 없음")
|
raise HTTPException(409, "실패 step을 판별할 수 없음")
|
||||||
|
# Fix 3: failed_step이 알려진 STEPS에 없으면 409
|
||||||
|
if failed_step not in STEPS:
|
||||||
|
raise HTTPException(409, "실패 step 판별 불가")
|
||||||
if failed_step == "publish" and p.get("youtube_video_id"):
|
if failed_step == "publish" and p.get("youtube_video_id"):
|
||||||
raise HTTPException(409, "이미 업로드됨 (중복 방지)")
|
raise HTTPException(409, "이미 업로드됨 (중복 방지)")
|
||||||
|
# Fix 1: bg.add_task 직전에 상태를 'retrying'으로 전이 → 동시 retry 409 방지
|
||||||
|
_db_module.update_pipeline_state(pid, "retrying")
|
||||||
bg.add_task(orchestrator.run_step, pid, failed_step)
|
bg.add_task(orchestrator.run_step, pid, failed_step)
|
||||||
return {"ok": True, "retrying_step": failed_step}
|
return {"ok": True, "retrying_step": failed_step}
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,8 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
|
|||||||
"step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts
|
"step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts
|
||||||
)
|
)
|
||||||
if i < attempts - 1:
|
if i < attempts - 1:
|
||||||
await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)])
|
backoff = STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)] if STEP_RETRY_BACKOFF_SEC else 0
|
||||||
|
await asyncio.sleep(backoff)
|
||||||
db.update_pipeline_job(job_id, status="failed", error=str(last_err))
|
db.update_pipeline_job(job_id, status="failed", error=str(last_err))
|
||||||
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}")
|
||||||
|
|
||||||
|
|||||||
@@ -122,3 +122,53 @@ def test_retry_publish_with_video_id_rejected(fresh_db, client):
|
|||||||
youtube_video_id="abc123")
|
youtube_video_id="abc123")
|
||||||
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
||||||
assert r.status_code == 409
|
assert r.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 2: fake_run 인자 검증 ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_failed_pipeline_retriggers_with_correct_args(fresh_db, client, monkeypatch):
    """Check that retrying a failed pipeline calls run_step with (pid, failed step)."""
    # Arrange: one pipeline whose "video" job has failed.
    pid = db.create_pipeline(track_id=1)
    failed_job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(failed_job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")

    # Records the positional arguments the stubbed run_step receives.
    called = {}

    async def fake_run(pipeline_id, step, *extra):
        called["pid"] = pipeline_id
        called["step"] = step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)

    # Act
    response = client.post(f"/api/music/pipeline/{pid}/retry")

    # Assert: the request is accepted and the stub saw the expected arguments.
    assert response.status_code in {200, 202}
    assert called["pid"] == pid
    assert called["step"] == "video"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 1: retrying 전이로 중복 retry 409 ────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_twice_second_is_409(fresh_db, client, monkeypatch):
    """Check that a second retry right after an accepted one is rejected with 409.

    The stubbed run_step does nothing, so whatever state the first retry
    leaves behind is still in place when the second request arrives.
    """
    # Arrange: one pipeline whose "video" job has failed.
    pid = db.create_pipeline(track_id=1)
    failed_job = db.create_pipeline_job(pid, "video")
    db.update_pipeline_job(failed_job, status="failed", error="boom")
    db.update_pipeline_state(pid, "failed", failed_reason="video: boom")

    async def fake_run(pipeline_id, step, *extra):
        return None

    monkeypatch.setattr(orchestrator, "run_step", fake_run)

    retry_url = f"/api/music/pipeline/{pid}/retry"

    first = client.post(retry_url)
    assert first.status_code in {200, 202}

    # The pipeline is no longer retryable, so this one must be refused.
    second = client.post(retry_url)
    assert second.status_code == 409
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fix 3: 알 수 없는 step prefix → 409 ─────────────────────────────────────
|
||||||
|
|
||||||
|
def test_retry_unparseable_failed_reason_409(fresh_db, client):
    """Check that a failed_reason whose prefix is not a known step yields 409."""
    pid = db.create_pipeline(track_id=1)

    # No failed job row is created: only the pipeline state is set to failed,
    # with a reason whose prefix ("ValueError") is not a step name.
    db.update_pipeline_state(pid, "failed", failed_reason="ValueError: track 1 없음")

    response = client.post(f"/api/music/pipeline/{pid}/retry")

    assert response.status_code == 409
|
||||||
|
|||||||
Reference in New Issue
Block a user