"""배치 음악 생성 + 자동 컴파일·영상 파이프라인.""" import asyncio import logging import uuid from . import db from .random_pools import randomize logger = logging.getLogger("music-lab.batch") POLL_INTERVAL_S = 5 TRACK_GEN_TIMEOUT_S = 240 async def run_batch(batch_id: int) -> None: """1) genre로 N트랙 순차 Suno 생성 2) 모두 완료 후 compile_job 자동 생성·실행 3) compile 완료 후 영상 파이프라인 시작 (cover step) """ job = db.get_batch_job(batch_id) if not job: return genre = job["genre"] count = job["count"] duration = job["target_duration_sec"] auto_pipe = bool(job["auto_pipeline"]) db.update_batch_job(batch_id, status="generating") track_ids: list[int] = [] for i in range(1, count + 1): title = f"{genre.title()} Mix Track {i}" params = randomize(genre) db.update_batch_job(batch_id, current_track_index=i, current_track_status="generating") track_id = await _generate_one_track( title=title, genre=genre, duration_sec=duration, params=params, ) if track_id: track_ids.append(track_id) db.append_batch_track(batch_id, track_id) db.update_batch_job(batch_id, current_track_status="succeeded") else: db.update_batch_job(batch_id, current_track_status="failed") logger.warning("배치 %d 트랙 %d 실패 — 계속 진행", batch_id, i) if not track_ids: db.update_batch_job(batch_id, status="failed", error="모든 트랙 생성 실패") return db.update_batch_job(batch_id, status="generated") if not auto_pipe: return # 자동 컴파일 db.update_batch_job(batch_id, status="compiling") try: compile_id = db.create_compile_job( title=f"{genre.title()} Mix", track_ids=track_ids, crossfade_sec=3.0, ) db.update_batch_job(batch_id, compile_job_id=compile_id) except Exception as e: logger.exception("compile create failed") db.update_batch_job(batch_id, status="failed", error=f"compile create: {e}") return from . import compiler try: await asyncio.to_thread(compiler.run_compile, compile_id) except Exception as e: logger.exception("compile run failed") db.update_batch_job(batch_id, status="failed", error=f"compile run: {e}") return job_after = db.get_compile_job(compile_id) status_after = job_after.get("status") if job_after else None if status_after not in ("done", "succeeded"): db.update_batch_job( batch_id, status="failed", error=f"compile not done (status={status_after})" ) return # 자동 영상 파이프라인 try: pipeline_id = db.create_pipeline(compile_job_id=compile_id) db.update_batch_job(batch_id, pipeline_id=pipeline_id, status="piped") from .pipeline import orchestrator await orchestrator.run_step(pipeline_id, "cover") except Exception as e: logger.exception("pipeline launch failed") db.update_batch_job(batch_id, status="failed", error=f"pipeline launch: {e}") async def _generate_one_track(*, title: str, genre: str, duration_sec: int, params: dict) -> int | None: """기존 Suno generate 호출 + 완료까지 polling. 성공 시 새 track id, 실패 시 None.""" from .suno_provider import run_suno_generation task_id = str(uuid.uuid4()) suno_params = { "title": title, "genre": genre, "moods": params["moods"], "instruments": params["instruments"], "duration_sec": duration_sec, "bpm": params["bpm"], "key": params["key"], "scale": params["scale"], "prompt": params.get("prompt_modifier", ""), } db.create_task(task_id, suno_params, provider="suno") # Suno background task — 우리가 await로 기다림 (BackgroundTasks 미사용) asyncio.create_task(asyncio.to_thread(run_suno_generation, task_id, suno_params)) waited = 0 while waited < TRACK_GEN_TIMEOUT_S: await asyncio.sleep(POLL_INTERVAL_S) waited += POLL_INTERVAL_S task = db.get_task(task_id) if not task: continue status = task.get("status") if status == "succeeded": # task["track"] 또는 task["result"]["track"] 형태 시도, 없으면 task_id로 조회 tr = task.get("track") if tr and isinstance(tr, dict): return tr.get("id") result = task.get("result", {}) or {} if isinstance(result, dict) and isinstance(result.get("track"), dict): return result["track"].get("id") # Fallback: music_library에서 task_id로 검색 track = db.get_track_by_task_id(task_id) if track: return track.get("id") return None if status == "failed": return None return None # timeout