From e90e25d78fa13a22bf14b7dc97121f07641980b2 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 12 Jun 2026 00:20:29 +0900 Subject: [PATCH] =?UTF-8?q?feat(music-lab):=20orchestrator=20step=20?= =?UTF-8?q?=EC=9E=90=EB=8F=99=20=EC=9E=AC=EC=8B=9C=EB=8F=84=20(publish=20?= =?UTF-8?q?=EC=A0=9C=EC=99=B8)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- music-lab/app/pipeline/orchestrator.py | 53 ++++++++++++++--------- music-lab/tests/test_pipeline_retry.py | 59 ++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/music-lab/app/pipeline/orchestrator.py b/music-lab/app/pipeline/orchestrator.py index 53622b1..652c956 100644 --- a/music-lab/app/pipeline/orchestrator.py +++ b/music-lab/app/pipeline/orchestrator.py @@ -11,6 +11,10 @@ from .gradient import make_gradient_with_title logger = logging.getLogger("music-lab.orchestrator") +STEP_MAX_RETRIES = 2 # 추가 재시도 (총 시도 = +1) +STEP_RETRY_BACKOFF_SEC = [5, 15] +NON_RETRY_STEPS = {"publish"} + async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: """단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이. @@ -28,27 +32,34 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") return - try: - if step == "cover": - result = await _run_cover(p, ctx, feedback) - elif step == "video": - result = await _run_video(p, ctx) - elif step == "thumb": - result = await _run_thumb(p, ctx, feedback) - elif step == "meta": - result = await _run_meta(p, ctx, feedback) - elif step == "review": - result = await _run_review(p, ctx) - elif step == "publish": - result = await _run_publish(p, ctx) - else: - raise ValueError(f"unknown step: {step}") - db.update_pipeline_job(job_id, status="succeeded") - db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {})) - except Exception as e: - logger.exception("step %s failed for pipeline %s", step, pipeline_id) - db.update_pipeline_job(job_id, status="failed", error=str(e)) - db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") + attempts = 1 if step in NON_RETRY_STEPS else (STEP_MAX_RETRIES + 1) + last_err = None + for i in range(attempts): + try: + result = await _dispatch_step(step, p, ctx, feedback) + db.update_pipeline_job(job_id, status="succeeded") + db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {})) + return + except Exception as e: + last_err = e + logger.exception( + "step %s 실패 (pipeline %s, attempt %d/%d)", step, pipeline_id, i + 1, attempts + ) + if i < attempts - 1: + await asyncio.sleep(STEP_RETRY_BACKOFF_SEC[min(i, len(STEP_RETRY_BACKOFF_SEC) - 1)]) + db.update_pipeline_job(job_id, status="failed", error=str(last_err)) + db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {last_err}") + + +async def _dispatch_step(step: str, p: dict, ctx: dict, feedback: str) -> dict: + """step 이름으로 실행 함수 디스패치.""" + if step == "cover": return await _run_cover(p, ctx, feedback) + if step == "video": return await _run_video(p, ctx) + if step == "thumb": return await _run_thumb(p, ctx, feedback) + if step == "meta": return await _run_meta(p, ctx, feedback) + if step == "review": return await _run_review(p, ctx) + if step == "publish": return await _run_publish(p, ctx) + raise ValueError(f"unknown step: {step}") def _resolve_input(p: dict) -> dict: diff --git a/music-lab/tests/test_pipeline_retry.py b/music-lab/tests/test_pipeline_retry.py index d98631b..bb3482b 100644 --- a/music-lab/tests/test_pipeline_retry.py +++ b/music-lab/tests/test_pipeline_retry.py @@ -1,5 +1,6 @@ import pytest from app import db +from app.pipeline import orchestrator @pytest.fixture @@ -10,6 +11,11 @@ def fresh_db(monkeypatch, tmp_path): return db_path +@pytest.fixture(autouse=True) +def _no_backoff(monkeypatch): + monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0]) + + def test_get_last_failed_step_returns_step(fresh_db): pid = db.create_pipeline(track_id=1) job_id = db.create_pipeline_job(pid, "video") @@ -22,3 +28,56 @@ def test_get_last_failed_step_none_when_no_failure(fresh_db): pid = db.create_pipeline(track_id=1) db.create_pipeline_job(pid, "cover") assert db.get_last_failed_step(pid) is None + + +async def test_retryable_step_retries_then_succeeds(fresh_db, monkeypatch): + pid = db.create_pipeline(track_id=1) + calls = {"n": 0} + + async def flaky(step, p, ctx, feedback): + calls["n"] += 1 + if calls["n"] < 3: + raise RuntimeError("transient") + return {"next_state": "video_pending", "fields": {}} + + monkeypatch.setattr(orchestrator, "_dispatch_step", flaky) + monkeypatch.setattr( + orchestrator, "_resolve_input", + lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0}, + ) + await orchestrator.run_step(pid, "cover") + assert calls["n"] == 3 + assert db.get_pipeline(pid)["state"] == "video_pending" + + +async def test_retryable_step_exhausts_to_failed(fresh_db, monkeypatch): + pid = db.create_pipeline(track_id=1) + + async def always_fail(step, p, ctx, feedback): + raise RuntimeError("permanent") + + monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail) + monkeypatch.setattr( + orchestrator, "_resolve_input", + lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0}, + ) + await orchestrator.run_step(pid, "cover") + assert db.get_pipeline(pid)["state"] == "failed" + + +async def test_publish_not_retried(fresh_db, monkeypatch): + pid = db.create_pipeline(track_id=1) + calls = {"n": 0} + + async def fail_publish(step, p, ctx, feedback): + calls["n"] += 1 + raise RuntimeError("upload error") + + monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish) + monkeypatch.setattr( + orchestrator, "_resolve_input", + lambda p: {"genre": "x", "title": "t", "moods": [], "tracks": [], "audio_path": "", "duration_sec": 0}, + ) + await orchestrator.run_step(pid, "publish") + assert calls["n"] == 1 + assert db.get_pipeline(pid)["state"] == "failed"