"""파이프라인 오케스트레이터 — 단계별 BackgroundTask 등록 및 산출물 → DB 반영.""" import asyncio import json import logging import os import sqlite3 from app import db from . import cover, video, thumb, metadata, review, youtube logger = logging.getLogger("music-lab.orchestrator") async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None: """단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이. 호출 직후 _running 상태로 전환, 끝나면 _pending(사용자 게이트) 또는 자동 다음. 실패 시 failed 상태 + reason. """ job_id = db.create_pipeline_job(pipeline_id, step) db.update_pipeline_job(job_id, status="running") p = db.get_pipeline(pipeline_id) track = _get_track(p["track_id"]) try: if step == "cover": result = await _run_cover(p, track, feedback) elif step == "video": result = await _run_video(p, track) elif step == "thumb": result = await _run_thumb(p, track, feedback) elif step == "meta": result = await _run_meta(p, track, feedback) elif step == "review": result = await _run_review(p, track) elif step == "publish": result = await _run_publish(p, track) else: raise ValueError(f"unknown step: {step}") db.update_pipeline_job(job_id, status="succeeded") db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {})) except Exception as e: logger.exception("step %s failed for pipeline %s", step, pipeline_id) db.update_pipeline_job(job_id, status="failed", error=str(e)) db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") def _get_track(track_id: int) -> dict: # tracks 테이블 헬퍼 — 기존 db에 있는 함수 사용 t = None if hasattr(db, "get_track_by_id"): t = db.get_track_by_id(track_id) elif hasattr(db, "get_track"): t = db.get_track(track_id) if not t: # 폴백: music_library 테이블에서 직접 (스키마 확인 필요) t = _fetch_track_fallback(track_id) if not t: raise ValueError(f"트랙 {track_id} 없음") return t def _fetch_track_fallback(track_id: int) -> dict | None: """db 모듈에 get_track이 없을 때 대비 — music_library 테이블 직접 조회.""" try: conn = sqlite3.connect(db.DB_PATH) conn.row_factory = sqlite3.Row # 가능한 테이블/컬럼 시도 (music_library 또는 tracks) for table in ("music_library", "tracks"): try: row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (track_id,)).fetchone() if row: d = dict(row) # JSON 컬럼 파싱 (있으면) for k in ("moods", "instruments"): if k in d and isinstance(d[k], str): try: d[k] = json.loads(d[k]) except (json.JSONDecodeError, TypeError): d[k] = [] conn.close() return d except sqlite3.OperationalError: continue conn.close() except Exception as e: logger.warning("track fallback fetch 실패: %s", e) return None async def _run_cover(p, track, feedback): setup = db.get_youtube_setup() prompts = setup["cover_prompts"] template = prompts.get(track.get("genre", "default").lower(), prompts.get("default", "")) out = await cover.generate( pipeline_id=p["id"], genre=track.get("genre", "default"), prompt_template=template, mood=", ".join(track.get("moods", []) or []), track_title=track.get("title", ""), feedback=feedback, ) return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}} async def _run_video(p, track): setup = db.get_youtube_setup() vd = setup["visual_defaults"] audio_path = track.get("file_path") or _local_path(track.get("audio_url", "")) cover_path = _local_path(p["cover_url"]) out = await asyncio.to_thread( video.generate, pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path, genre=track.get("genre", "default"), duration_sec=track.get("duration_sec", 120), resolution=vd["resolution"], style=vd["style"], ) return {"next_state": "video_pending", "fields": {"video_url": out["url"]}} async def _run_thumb(p, track, feedback): video_path = _local_path(p["video_url"]) out = await asyncio.to_thread( thumb.generate, pipeline_id=p["id"], video_path=video_path, track_title=track.get("title", ""), overlay_text=True, ) return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}} async def _run_meta(p, track, feedback): setup = db.get_youtube_setup() trend_top = _get_trend_top() out = await metadata.generate( track=track, template=setup["metadata_template"], trend_keywords=trend_top, feedback=feedback, ) return {"next_state": "meta_pending", "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}} async def _run_review(p, track): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} result = await review.run_4_axis( pipeline=p, track=track, video_meta={"length_sec": track.get("duration_sec", 120), "resolution": setup["visual_defaults"]["resolution"]}, metadata=meta, thumbnail_url=p.get("thumbnail_url", ""), trend_top=_get_trend_top(), weights=setup["review_weights"], threshold=setup["review_threshold"], ) return {"next_state": "publish_pending", "fields": {"review_json": json.dumps(result, ensure_ascii=False)}} async def _run_publish(p, track): setup = db.get_youtube_setup() meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} privacy = setup["publish_policy"].get("privacy", "private") result = await asyncio.to_thread( youtube.upload_video, video_path=_local_path(p["video_url"]), thumbnail_path=_local_path(p["thumbnail_url"]) if p.get("thumbnail_url") else None, metadata=meta, privacy=privacy, ) return {"next_state": "published", "fields": {"youtube_video_id": result["video_id"]}} def _local_path(media_url: str) -> str: """ /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg /media/music/abc.mp3 → /app/data/abc.mp3 (music mount at /app/data, no subdir) """ if not media_url: return "" base_media = os.getenv("VIDEO_MEDIA_BASE", "/media/videos") base_data = os.getenv("VIDEO_DATA_DIR", "/app/data/videos") if media_url.startswith(base_media): return media_url.replace(base_media, base_data, 1) if media_url.startswith("/media/music/"): return media_url.replace("/media/music/", "/app/data/", 1) return media_url.replace("/media/", "/app/data/", 1) def _get_trend_top(n: int = 10) -> list[str]: try: if hasattr(db, "get_market_trends"): rows = db.get_market_trends(days=7) return [r.get("genre", "") for r in rows[:n] if r.get("genre")] except Exception: pass return []