"""Tests for pipeline step retries and the pipeline retry endpoint."""

import pytest
from fastapi.testclient import TestClient

from app import db
from app.pipeline import orchestrator


def _stub_input(p):
    """Return a fresh canned input dict used in place of `_resolve_input`."""
    return {
        "genre": "x",
        "title": "t",
        "moods": [],
        "tracks": [],
        "audio_path": "",
        "duration_sec": 0,
    }


@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
    """Point `db.DB_PATH` at a per-test file, initialise it, return its path."""
    database_file = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(database_file))
    db.init_db()
    return database_file


@pytest.fixture(autouse=True)
def _no_backoff(monkeypatch):
    """Override the orchestrator's retry backoff list with zeros for every test.

    NOTE(review): presumably this keeps retries from sleeping -- confirm
    against the orchestrator.
    """
    monkeypatch.setattr(orchestrator, "STEP_RETRY_BACKOFF_SEC", [0, 0])


def test_get_last_failed_step_returns_step(fresh_db):
    """A failed "video" job is reported as the last failed step."""
    pipeline_id = db.create_pipeline(track_id=1)
    video_job = db.create_pipeline_job(pipeline_id, "video")
    db.update_pipeline_job(video_job, status="failed", error="boom")
    db.update_pipeline_state(pipeline_id, "failed", failed_reason="video: boom")

    assert db.get_last_failed_step(pipeline_id) == "video"


def test_get_last_failed_step_none_when_no_failure(fresh_db):
    """With only a freshly created job, no failed step is reported."""
    pipeline_id = db.create_pipeline(track_id=1)
    db.create_pipeline_job(pipeline_id, "cover")

    assert db.get_last_failed_step(pipeline_id) is None


# NOTE(review): the coroutine tests below carry no asyncio marker; presumably
# the project runs pytest-asyncio in auto mode -- confirm in the pytest config.
async def test_retryable_step_retries_then_succeeds(fresh_db, monkeypatch):
    """A "cover" step that raises twice is dispatched a third time and succeeds."""
    pipeline_id = db.create_pipeline(track_id=1)
    attempts = 0

    async def flaky(step, p, ctx, feedback):
        nonlocal attempts
        attempts += 1
        if attempts >= 3:
            return {"next_state": "video_pending", "fields": {}}
        raise RuntimeError("transient")

    monkeypatch.setattr(orchestrator, "_dispatch_step", flaky)
    monkeypatch.setattr(orchestrator, "_resolve_input", _stub_input)

    await orchestrator.run_step(pipeline_id, "cover")

    assert attempts == 3
    assert db.get_pipeline(pipeline_id)["state"] == "video_pending"


async def test_retryable_step_exhausts_to_failed(fresh_db, monkeypatch):
    """A "cover" step that always raises leaves the pipeline in "failed"."""
    pipeline_id = db.create_pipeline(track_id=1)

    async def always_fail(step, p, ctx, feedback):
        raise RuntimeError("permanent")

    monkeypatch.setattr(orchestrator, "_dispatch_step", always_fail)
    monkeypatch.setattr(orchestrator, "_resolve_input", _stub_input)

    await orchestrator.run_step(pipeline_id, "cover")

    assert db.get_pipeline(pipeline_id)["state"] == "failed"


async def test_publish_not_retried(fresh_db, monkeypatch):
    """A raising "publish" step is dispatched exactly once, then marked failed."""
    pipeline_id = db.create_pipeline(track_id=1)
    attempts = 0

    async def fail_publish(step, p, ctx, feedback):
        nonlocal attempts
        attempts += 1
        raise RuntimeError("upload error")

    monkeypatch.setattr(orchestrator, "_dispatch_step", fail_publish)
    monkeypatch.setattr(orchestrator, "_resolve_input", _stub_input)

    await orchestrator.run_step(pipeline_id, "publish")

    assert attempts == 1
    assert db.get_pipeline(pipeline_id)["state"] == "failed"


# ── Task 3: retry endpoint tests ─────────────────────────────────────────────


@pytest.fixture
def client(fresh_db):
    """Build a `TestClient` for the app once `fresh_db` has been set up."""
    # NOTE(review): the import is deferred until after `fresh_db` has patched
    # `db.DB_PATH`; presumably that ordering matters -- confirm.
    from app.main import app

    return TestClient(app)


def test_retry_failed_pipeline_retriggers(fresh_db, client, monkeypatch):
    """Retrying a pipeline whose "video" job failed reports "video" as the retried step."""
    pipeline_id = db.create_pipeline(track_id=1)
    video_job = db.create_pipeline_job(pipeline_id, "video")
    db.update_pipeline_job(video_job, status="failed", error="boom")
    db.update_pipeline_state(pipeline_id, "failed", failed_reason="video: boom")

    # NOTE(review): `called` is filled in by the stub but never asserted on.
    called = {}

    async def fake_run(p, step, *a):
        called["pid"] = p
        called["step"] = step

    monkeypatch.setattr(orchestrator, "run_step", fake_run)

    response = client.post(f"/api/music/pipeline/{pipeline_id}/retry")

    assert response.status_code in (200, 202)
    assert response.json()["retrying_step"] == "video"


def test_retry_non_failed_409(fresh_db, client):
    """Retrying a pipeline that has not failed is answered with 409."""
    pipeline_id = db.create_pipeline(track_id=1)  # state='created'

    response = client.post(f"/api/music/pipeline/{pipeline_id}/retry")

    assert response.status_code == 409


def test_retry_publish_with_video_id_rejected(fresh_db, client):
    """A failed "publish" step with a stored YouTube video id cannot be retried (409)."""
    pipeline_id = db.create_pipeline(track_id=1)
    publish_job = db.create_pipeline_job(pipeline_id, "publish")
    db.update_pipeline_job(publish_job, status="failed", error="x")
    db.update_pipeline_state(
        pipeline_id,
        "failed",
        failed_reason="publish: x",
        youtube_video_id="abc123",
    )

    response = client.post(f"/api/music/pipeline/{pipeline_id}/retry")

    assert response.status_code == 409