From d638666659921a996f8f4fb3100559b9e8a99c77 Mon Sep 17 00:00:00 2001 From: gahusb Date: Fri, 12 Jun 2026 00:18:07 +0900 Subject: [PATCH] =?UTF-8?q?feat(music-lab):=20get=5Flast=5Ffailed=5Fstep?= =?UTF-8?q?=20=E2=80=94=20=ED=8C=8C=EC=9D=B4=ED=94=84=EB=9D=BC=EC=9D=B8=20?= =?UTF-8?q?=EC=9E=AC=EA=B0=9C=EC=9A=A9=20=EC=8B=A4=ED=8C=A8=20step=20?= =?UTF-8?q?=ED=8C=90=EB=B3=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- music-lab/app/db.py | 12 ++++++++++++ music-lab/tests/test_pipeline_retry.py | 24 ++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 music-lab/tests/test_pipeline_retry.py diff --git a/music-lab/app/db.py b/music-lab/app/db.py index 45fa4c5..e1d9ea1 100644 --- a/music-lab/app/db.py +++ b/music-lab/app/db.py @@ -1220,6 +1220,18 @@ def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]: return [dict(r) for r in rows] +def get_last_failed_step(pid: int) -> Optional[str]: + """파이프라인의 가장 최근 status='failed' pipeline_job의 step. 없으면 None.""" + with _conn() as conn: + row = conn.execute( + "SELECT step FROM pipeline_jobs " + "WHERE pipeline_id = ? AND status = 'failed' " + "ORDER BY id DESC LIMIT 1", + (pid,), + ).fetchone() + return row["step"] if row else None + + def get_youtube_setup() -> Dict[str, Any]: """youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회.""" with _conn() as conn: diff --git a/music-lab/tests/test_pipeline_retry.py b/music-lab/tests/test_pipeline_retry.py new file mode 100644 index 0000000..d98631b --- /dev/null +++ b/music-lab/tests/test_pipeline_retry.py @@ -0,0 +1,24 @@ +import pytest +from app import db + + +@pytest.fixture +def fresh_db(monkeypatch, tmp_path): + db_path = tmp_path / "music.db" + monkeypatch.setattr(db, "DB_PATH", str(db_path)) + db.init_db() + return db_path + + +def test_get_last_failed_step_returns_step(fresh_db): + pid = db.create_pipeline(track_id=1) + job_id = db.create_pipeline_job(pid, "video") + db.update_pipeline_job(job_id, status="failed", error="boom") + db.update_pipeline_state(pid, "failed", failed_reason="video: boom") + assert db.get_last_failed_step(pid) == "video" + + +def test_get_last_failed_step_none_when_no_failure(fresh_db): + pid = db.create_pipeline(track_id=1) + db.create_pipeline_job(pid, "cover") + assert db.get_last_failed_step(pid) is None