- Fix 1: retry_pipeline이 bg.add_task 직전 상태를 'retrying'으로 전이 → 동시 retry 409 방지 - Fix 2: test_retry_failed_pipeline_retriggers에 called[pid/step] assert 추가 - Fix 3: failed_step이 STEPS에 없으면 409 (엉뚱한 prefix 방지) - Fix 4: STEP_RETRY_BACKOFF_SEC 빈 리스트 시 IndexError → 0으로 폴백 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
175 lines
6.6 KiB
Python
175 lines
6.6 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from app import db
|
|
from app.pipeline import orchestrator
|
|
|
|
|
|
@pytest.fixture
|
|
def fresh_db(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "music.db"
|
|
monkeypatch.setattr(db, "DB_PATH", str(db_path))
|
|
db.init_db()
|
|
return db_path
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _no_backoff(monkeypatch):
|
|
monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])
|
|
|
|
|
|
def test_get_last_failed_step_returns_step(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
job_id = db.create_pipeline_job(pid, "video")
|
|
db.update_pipeline_job(job_id, status="failed", error="boom")
|
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
|
assert db.get_last_failed_step(pid) == "video"
|
|
|
|
|
|
def test_get_last_failed_step_none_when_no_failure(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
db.create_pipeline_job(pid, "cover")
|
|
assert db.get_last_failed_step(pid) is None
|
|
|
|
|
|
async def test_retryable_step_retries_then_succeeds(fresh_db, monkeypatch):
|
|
pid = db.create_pipeline(track_id=1)
|
|
calls = {"n": 0}
|
|
|
|
async def flaky(step, p, ctx, feedback):
|
|
calls["n"] += 1
|
|
if calls["n"] < 3:
|
|
raise RuntimeError("transient")
|
|
return {"next_state": "video_pending", "fields": {}}
|
|
|
|
monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
|
|
monkeypatch.setattr(
|
|
orchestrator, "_resolve_input",
|
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
|
)
|
|
await orchestrator.run_step(pid, "cover")
|
|
assert calls["n"] == 3
|
|
assert db.get_pipeline(pid)["state"] == "video_pending"
|
|
|
|
|
|
async def test_retryable_step_exhausts_to_failed(fresh_db, monkeypatch):
|
|
pid = db.create_pipeline(track_id=1)
|
|
|
|
async def always_fail(step, p, ctx, feedback):
|
|
raise RuntimeError("permanent")
|
|
|
|
monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
|
|
monkeypatch.setattr(
|
|
orchestrator, "_resolve_input",
|
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
|
)
|
|
await orchestrator.run_step(pid, "cover")
|
|
assert db.get_pipeline(pid)["state"] == "failed"
|
|
|
|
|
|
async def test_publish_not_retried(fresh_db, monkeypatch):
|
|
pid = db.create_pipeline(track_id=1)
|
|
calls = {"n": 0}
|
|
|
|
async def fail_publish(step, p, ctx, feedback):
|
|
calls["n"] += 1
|
|
raise RuntimeError("upload error")
|
|
|
|
monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
|
|
monkeypatch.setattr(
|
|
orchestrator, "_resolve_input",
|
|
lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0},
|
|
)
|
|
await orchestrator.run_step(pid, "publish")
|
|
assert calls["n"] == 1
|
|
assert db.get_pipeline(pid)["state"] == "failed"
|
|
|
|
|
|
# ── Task 3: retry endpoint tests ─────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
def client(fresh_db):
|
|
from app.main import app
|
|
return TestClient(app)
|
|
|
|
|
|
def test_retry_failed_pipeline_retriggers(fresh_db, client, monkeypatch):
|
|
pid = db.create_pipeline(track_id=1)
|
|
job = db.create_pipeline_job(pid, "video")
|
|
db.update_pipeline_job(job, status="failed", error="boom")
|
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
|
called = {}
|
|
|
|
async def fake_run(p, step, *a):
|
|
called["pid"], called["step"] = p, step
|
|
|
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r.status_code in (200, 202)
|
|
assert r.json()["retrying_step"] == "video"
|
|
|
|
|
|
def test_retry_non_failed_409(fresh_db, client):
|
|
pid = db.create_pipeline(track_id=1) # state='created'
|
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r.status_code == 409
|
|
|
|
|
|
def test_retry_publish_with_video_id_rejected(fresh_db, client):
|
|
pid = db.create_pipeline(track_id=1)
|
|
job = db.create_pipeline_job(pid, "publish")
|
|
db.update_pipeline_job(job, status="failed", error="x")
|
|
db.update_pipeline_state(pid, "failed", failed_reason="publish: x",
|
|
youtube_video_id="abc123")
|
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r.status_code == 409
|
|
|
|
|
|
# ── Fix 2: fake_run 인자 검증 ────────────────────────────────────────────────
|
|
|
|
def test_retry_failed_pipeline_retriggers_with_correct_args(fresh_db, client, monkeypatch):
|
|
"""fake_run이 (pid, failed_step)으로 호출되는지 검증."""
|
|
pid = db.create_pipeline(track_id=1)
|
|
job = db.create_pipeline_job(pid, "video")
|
|
db.update_pipeline_job(job, status="failed", error="boom")
|
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
|
called = {}
|
|
|
|
async def fake_run(p, step, *a):
|
|
called["pid"], called["step"] = p, step
|
|
|
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r.status_code in (200, 202)
|
|
assert called["pid"] == pid
|
|
assert called["step"] == "video"
|
|
|
|
|
|
# ── Fix 1: retrying 전이로 중복 retry 409 ────────────────────────────────────
|
|
|
|
def test_retry_twice_second_is_409(fresh_db, client, monkeypatch):
|
|
"""첫 번째 retry가 상태를 'retrying'으로 전이 → 두 번째 retry는 409."""
|
|
pid = db.create_pipeline(track_id=1)
|
|
job = db.create_pipeline_job(pid, "video")
|
|
db.update_pipeline_job(job, status="failed", error="boom")
|
|
db.update_pipeline_state(pid, "failed", failed_reason="video: boom")
|
|
|
|
async def fake_run(p, step, *a):
|
|
pass
|
|
|
|
monkeypatch.setattr(orchestrator, "run_step", fake_run)
|
|
r1 = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r1.status_code in (200, 202)
|
|
r2 = client.post(f"/api/music/pipeline/{pid}/retry") # 이미 retrying → 409
|
|
assert r2.status_code == 409
|
|
|
|
|
|
# ── Fix 3: 알 수 없는 step prefix → 409 ─────────────────────────────────────
|
|
|
|
def test_retry_unparseable_failed_reason_409(fresh_db, client):
|
|
"""failed_reason이 known STEPS에 없는 prefix면 409."""
|
|
pid = db.create_pipeline(track_id=1)
|
|
# failed job row 없이 state만 failed + 비-step prefix reason
|
|
db.update_pipeline_state(pid, "failed", failed_reason="ValueError: track 1 없음")
|
|
r = client.post(f"/api/music/pipeline/{pid}/retry")
|
|
assert r.status_code == 409
|