diff --git a/music-lab/app/pipeline/orchestrator.py b/music-lab/app/pipeline/orchestrator.py index fc4c39b..cb15e27 100644 --- a/music-lab/app/pipeline/orchestrator.py +++ b/music-lab/app/pipeline/orchestrator.py @@ -20,21 +20,26 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: job_id = db.create_pipeline_job(pipeline_id, step) db.update_pipeline_job(job_id, status="running") p = db.get_pipeline(pipeline_id) - track = _get_track(p["track_id"]) + try: + ctx = _resolve_input(p) + except ValueError as e: + db.update_pipeline_job(job_id, status="failed", error=str(e)) + db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") + return try: if step == "cover": - result = await _run_cover(p, track, feedback) + result = await _run_cover(p, ctx, feedback) elif step == "video": - result = await _run_video(p, track) + result = await _run_video(p, ctx) elif step == "thumb": - result = await _run_thumb(p, track, feedback) + result = await _run_thumb(p, ctx, feedback) elif step == "meta": - result = await _run_meta(p, track, feedback) + result = await _run_meta(p, ctx, feedback) elif step == "review": - result = await _run_review(p, track) + result = await _run_review(p, ctx) elif step == "publish": - result = await _run_publish(p, track) + result = await _run_publish(p, ctx) else: raise ValueError(f"unknown step: {step}") db.update_pipeline_job(job_id, status="succeeded") @@ -45,6 +50,78 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") +def _resolve_input(p: dict) -> dict: + """파이프라인 입력 = 단일 트랙 또는 컴파일 결과. + + 반환: { + "audio_path": str, # 컨테이너 절대경로 + "duration_sec": int, + "tracks": list[{"id", "title", "start_offset_sec", "duration_sec"}], + "title": str, + "genre": str, # mix는 "mix" + "moods": list[str], + } + """ + track_id = p.get("track_id") + compile_id = p.get("compile_job_id") + + if track_id is None and compile_id is None: + raise ValueError("track_id 또는 compile_job_id 중 하나는 필요") + + if compile_id is not None: + job = db.get_compile_job(compile_id) + if not job or job.get("status") != "succeeded": + raise ValueError( + f"compile job {compile_id} not ready " + f"(status={job.get('status') if job else None})" + ) + + tracks = [] + offset = 0.0 + crossfade = job.get("crossfade_sec", 0) or 0 + track_ids = job.get("track_ids") or [] + for tid in track_ids: + t = db.get_track_by_id(tid) + if not t: + continue + dur = t.get("duration_sec", 0) + tracks.append({ + "id": tid, + "title": t.get("title", ""), + "start_offset_sec": int(offset), + "duration_sec": dur, + }) + offset += dur - crossfade + # 마지막 트랙은 풀 길이 반영 (crossfade 빼기 한 것 복구) + total = int(offset + crossfade) if tracks else 0 + return { + "audio_path": job.get("audio_path") or job.get("output_path") or "", + "duration_sec": total, + "tracks": tracks, + "title": job.get("title") or "Mix", + "genre": "mix", + "moods": [], + } + + # 단일 트랙 + t = db.get_track_by_id(track_id) + if not t: + raise ValueError(f"track {track_id} 없음") + return { + "audio_path": t.get("file_path") or _local_path(t.get("audio_url", "")), + "duration_sec": t.get("duration_sec", 0), + "tracks": [{ + "id": t["id"], + "title": t.get("title", ""), + "start_offset_sec": 0, + "duration_sec": t.get("duration_sec", 0), + }], + "title": t.get("title", ""), + "genre": t.get("genre", "default"), + "moods": t.get("moods", []) or [], + } + + def _get_track(track_id: int) -> dict: # tracks 테이블 헬퍼 — 기존 db에 있는 함수 사용 t = None @@ -88,62 +165,64 @@ def _fetch_track_fallback(track_id: int) -> dict | None: return None -async def _run_cover(p, track, feedback): +async def _run_cover(p, ctx, feedback): setup = db.get_youtube_setup() prompts = setup["cover_prompts"] - template = prompts.get(track.get("genre", "default").lower(), prompts.get("default", "")) + template = prompts.get(ctx["genre"].lower(), prompts.get("default", "")) out = await cover.generate( - pipeline_id=p["id"], genre=track.get("genre", "default"), + pipeline_id=p["id"], genre=ctx["genre"], prompt_template=template, - mood=", ".join(track.get("moods", []) or []), - track_title=track.get("title", ""), - feedback=feedback, + mood=", ".join(ctx["moods"] or []), + track_title=ctx["title"], feedback=feedback, ) return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}} -async def _run_video(p, track): +async def _run_video(p, ctx): setup = db.get_youtube_setup() vd = setup["visual_defaults"] - audio_path = track.get("file_path") or _local_path(track.get("audio_url", "")) + audio_path = ctx["audio_path"] cover_path = _local_path(p["cover_url"]) out = await asyncio.to_thread( video.generate, pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path, - genre=track.get("genre", "default"), - duration_sec=track.get("duration_sec", 120), + genre=ctx["genre"], + duration_sec=ctx["duration_sec"], resolution=vd["resolution"], style=vd["style"], ) return {"next_state": "video_pending", "fields": {"video_url": out["url"]}} -async def _run_thumb(p, track, feedback): +async def _run_thumb(p, ctx, feedback): video_path = _local_path(p["video_url"]) out = await asyncio.to_thread( thumb.generate, pipeline_id=p["id"], video_path=video_path, - track_title=track.get("title", ""), overlay_text=True, + track_title=ctx["title"], overlay_text=True, ) return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}} -async def _run_meta(p, track, feedback): +async def _run_meta(p, ctx, feedback): setup = db.get_youtube_setup() trend_top = _get_trend_top() out = await metadata.generate( - track=track, template=setup["metadata_template"], + track={"title": ctx["title"], "genre": ctx["genre"], + "duration_sec": ctx["duration_sec"], "moods": ctx["moods"]}, + template=setup["metadata_template"], trend_keywords=trend_top, feedback=feedback, ) return {"next_state": "meta_pending", "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}} -async def _run_review(p, track): +async def _run_review(p, ctx): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} result = await review.run_4_axis( - pipeline=p, track=track, - video_meta={"length_sec": track.get("duration_sec", 120), + pipeline=p, + track={"title": ctx["title"], "genre": ctx["genre"], "duration_sec": ctx["duration_sec"]}, + video_meta={"length_sec": ctx["duration_sec"], "resolution": setup["visual_defaults"]["resolution"]}, metadata=meta, thumbnail_url=p.get("thumbnail_url", ""), trend_top=_get_trend_top(), @@ -153,7 +232,7 @@ async def _run_review(p, track): "fields": {"review_json": json.dumps(result, ensure_ascii=False)}} -async def _run_publish(p, track): +async def _run_publish(p, ctx): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} privacy = setup["publish_policy"].get("privacy", "private") diff --git a/music-lab/tests/test_orchestrator_resolve.py b/music-lab/tests/test_orchestrator_resolve.py new file mode 100644 index 0000000..ed1b1a5 --- /dev/null +++ b/music-lab/tests/test_orchestrator_resolve.py @@ -0,0 +1,61 @@ +import pytest +from unittest.mock import patch, MagicMock +from app.pipeline.orchestrator import _resolve_input + + +def test_resolve_input_track(): + pipeline = {"id": 1, "track_id": 13, "compile_job_id": None} + track = { + "id": 13, "title": "Lo-Fi Drive", "genre": "lo-fi", + "moods": ["chill"], "duration_sec": 176, + "file_path": "/app/data/x.mp3", "audio_url": "/media/music/x.mp3", + } + with patch("app.pipeline.orchestrator.db.get_track_by_id", return_value=track): + result = _resolve_input(pipeline) + assert result["audio_path"] == "/app/data/x.mp3" + assert result["duration_sec"] == 176 + assert len(result["tracks"]) == 1 + assert result["tracks"][0]["start_offset_sec"] == 0 + assert result["title"] == "Lo-Fi Drive" + assert result["genre"] == "lo-fi" + + +def test_resolve_input_compile_job(): + pipeline = {"id": 2, "track_id": None, "compile_job_id": 5} + job = { + "id": 5, "status": "succeeded", "title": "Chill Mix", + "audio_path": "/app/data/compiles/5.mp3", + "track_ids": [13, 14, 15], + "crossfade_sec": 3, + } + tracks = { + 13: {"id": 13, "title": "T1", "duration_sec": 180}, + 14: {"id": 14, "title": "T2", "duration_sec": 200}, + 15: {"id": 15, "title": "T3", "duration_sec": 150}, + } + with patch("app.pipeline.orchestrator.db.get_compile_job", return_value=job), \ + patch("app.pipeline.orchestrator.db.get_track_by_id", side_effect=lambda i: tracks[i]): + result = _resolve_input(pipeline) + assert result["audio_path"] == "/app/data/compiles/5.mp3" + # 누적 = 180+200+150 - 2*3(crossfade pair gaps) = 524 + assert result["duration_sec"] == 524 + assert len(result["tracks"]) == 3 + assert result["tracks"][0]["start_offset_sec"] == 0 + assert result["tracks"][1]["start_offset_sec"] == 177 # 180 - 3 + assert result["tracks"][2]["start_offset_sec"] == 374 # 177 + 200 - 3 + assert result["title"] == "Chill Mix" + assert result["genre"] == "mix" + + +def test_resolve_input_compile_not_ready(): + pipeline = {"id": 3, "track_id": None, "compile_job_id": 6} + job = {"id": 6, "status": "rendering"} + with patch("app.pipeline.orchestrator.db.get_compile_job", return_value=job): + with pytest.raises(ValueError, match="not ready"): + _resolve_input(pipeline) + + +def test_resolve_input_neither(): + pipeline = {"id": 4, "track_id": None, "compile_job_id": None} + with pytest.raises(ValueError): + _resolve_input(pipeline)