- orchestrator._run_video: track.file_path 우선 사용 (audio_url 변환 불필요) - _local_path: /media/music/ → /app/data/ (마운트가 /app/data 직접이라 music 서브디렉토리 없음) - video.py/thumb.py: stderr truncation [-800:]/[-500:] — 진짜 에러 보이게
187 lines
7.3 KiB
Python
187 lines
7.3 KiB
Python
"""파이프라인 오케스트레이터 — 단계별 BackgroundTask 등록 및 산출물 → DB 반영."""
|
|
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
|
|
from app import db
|
|
from . import cover, video, thumb, metadata, review, youtube
|
|
|
|
logger = logging.getLogger("music-lab.orchestrator")
|
|
|
|
|
|
async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
|
|
"""단계 실행 → 결과를 DB에 반영하고 *_pending 또는 다음 단계로 전이.
|
|
|
|
호출 직후 _running 상태로 전환, 끝나면 _pending(사용자 게이트) 또는 자동 다음.
|
|
실패 시 failed 상태 + reason.
|
|
"""
|
|
job_id = db.create_pipeline_job(pipeline_id, step)
|
|
db.update_pipeline_job(job_id, status="running")
|
|
p = db.get_pipeline(pipeline_id)
|
|
track = _get_track(p["track_id"])
|
|
|
|
try:
|
|
if step == "cover":
|
|
result = await _run_cover(p, track, feedback)
|
|
elif step == "video":
|
|
result = await _run_video(p, track)
|
|
elif step == "thumb":
|
|
result = await _run_thumb(p, track, feedback)
|
|
elif step == "meta":
|
|
result = await _run_meta(p, track, feedback)
|
|
elif step == "review":
|
|
result = await _run_review(p, track)
|
|
elif step == "publish":
|
|
result = await _run_publish(p, track)
|
|
else:
|
|
raise ValueError(f"unknown step: {step}")
|
|
db.update_pipeline_job(job_id, status="succeeded")
|
|
db.update_pipeline_state(pipeline_id, result["next_state"], **result.get("fields", {}))
|
|
except Exception as e:
|
|
logger.exception("step %s failed for pipeline %s", step, pipeline_id)
|
|
db.update_pipeline_job(job_id, status="failed", error=str(e))
|
|
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
|
|
|
|
|
|
def _get_track(track_id: int) -> dict:
|
|
# tracks 테이블 헬퍼 — 기존 db에 있는 함수 사용
|
|
t = None
|
|
if hasattr(db, "get_track_by_id"):
|
|
t = db.get_track_by_id(track_id)
|
|
elif hasattr(db, "get_track"):
|
|
t = db.get_track(track_id)
|
|
if not t:
|
|
# 폴백: music_library 테이블에서 직접 (스키마 확인 필요)
|
|
t = _fetch_track_fallback(track_id)
|
|
if not t:
|
|
raise ValueError(f"트랙 {track_id} 없음")
|
|
return t
|
|
|
|
|
|
def _fetch_track_fallback(track_id: int) -> dict | None:
|
|
"""db 모듈에 get_track이 없을 때 대비 — music_library 테이블 직접 조회."""
|
|
try:
|
|
conn = sqlite3.connect(db.DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
# 가능한 테이블/컬럼 시도 (music_library 또는 tracks)
|
|
for table in ("music_library", "tracks"):
|
|
try:
|
|
row = conn.execute(f"SELECT * FROM {table} WHERE id = ?", (track_id,)).fetchone()
|
|
if row:
|
|
d = dict(row)
|
|
# JSON 컬럼 파싱 (있으면)
|
|
for k in ("moods", "instruments"):
|
|
if k in d and isinstance(d[k], str):
|
|
try:
|
|
d[k] = json.loads(d[k])
|
|
except (json.JSONDecodeError, TypeError):
|
|
d[k] = []
|
|
conn.close()
|
|
return d
|
|
except sqlite3.OperationalError:
|
|
continue
|
|
conn.close()
|
|
except Exception as e:
|
|
logger.warning("track fallback fetch 실패: %s", e)
|
|
return None
|
|
|
|
|
|
async def _run_cover(p, track, feedback):
|
|
setup = db.get_youtube_setup()
|
|
prompts = setup["cover_prompts"]
|
|
template = prompts.get(track.get("genre", "default").lower(), prompts.get("default", ""))
|
|
out = await cover.generate(
|
|
pipeline_id=p["id"], genre=track.get("genre", "default"),
|
|
prompt_template=template,
|
|
mood=", ".join(track.get("moods", []) or []),
|
|
track_title=track.get("title", ""),
|
|
feedback=feedback,
|
|
)
|
|
return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}}
|
|
|
|
|
|
async def _run_video(p, track):
|
|
setup = db.get_youtube_setup()
|
|
vd = setup["visual_defaults"]
|
|
audio_path = track.get("file_path") or _local_path(track.get("audio_url", ""))
|
|
cover_path = _local_path(p["cover_url"])
|
|
out = video.generate(
|
|
pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path,
|
|
genre=track.get("genre", "default"),
|
|
duration_sec=track.get("duration_sec", 120),
|
|
resolution=vd["resolution"], style=vd["style"],
|
|
)
|
|
return {"next_state": "video_pending", "fields": {"video_url": out["url"]}}
|
|
|
|
|
|
async def _run_thumb(p, track, feedback):
|
|
video_path = _local_path(p["video_url"])
|
|
out = thumb.generate(pipeline_id=p["id"], video_path=video_path,
|
|
track_title=track.get("title", ""), overlay_text=True)
|
|
return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}}
|
|
|
|
|
|
async def _run_meta(p, track, feedback):
|
|
setup = db.get_youtube_setup()
|
|
trend_top = _get_trend_top()
|
|
out = await metadata.generate(
|
|
track=track, template=setup["metadata_template"],
|
|
trend_keywords=trend_top, feedback=feedback,
|
|
)
|
|
return {"next_state": "meta_pending",
|
|
"fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}}
|
|
|
|
|
|
async def _run_review(p, track):
|
|
setup = db.get_youtube_setup()
|
|
meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {}
|
|
result = await review.run_4_axis(
|
|
pipeline=p, track=track,
|
|
video_meta={"length_sec": track.get("duration_sec", 120),
|
|
"resolution": setup["visual_defaults"]["resolution"]},
|
|
metadata=meta, thumbnail_url=p.get("thumbnail_url", ""),
|
|
trend_top=_get_trend_top(),
|
|
weights=setup["review_weights"], threshold=setup["review_threshold"],
|
|
)
|
|
return {"next_state": "publish_pending",
|
|
"fields": {"review_json": json.dumps(result, ensure_ascii=False)}}
|
|
|
|
|
|
async def _run_publish(p, track):
|
|
setup = db.get_youtube_setup()
|
|
meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {}
|
|
privacy = setup["publish_policy"].get("privacy", "private")
|
|
result = youtube.upload_video(
|
|
video_path=_local_path(p["video_url"]),
|
|
thumbnail_path=_local_path(p["thumbnail_url"]) if p.get("thumbnail_url") else None,
|
|
metadata=meta, privacy=privacy,
|
|
)
|
|
return {"next_state": "published",
|
|
"fields": {"youtube_video_id": result["video_id"]}}
|
|
|
|
|
|
def _local_path(media_url: str) -> str:
|
|
""" /media/videos/123/cover.jpg → /app/data/videos/123/cover.jpg
|
|
/media/music/abc.mp3 → /app/data/abc.mp3 (music mount at /app/data, no subdir)
|
|
"""
|
|
if not media_url:
|
|
return ""
|
|
base_media = os.getenv("VIDEO_MEDIA_BASE", "/media/videos")
|
|
base_data = os.getenv("VIDEO_DATA_DIR", "/app/data/videos")
|
|
if media_url.startswith(base_media):
|
|
return media_url.replace(base_media, base_data, 1)
|
|
if media_url.startswith("/media/music/"):
|
|
return media_url.replace("/media/music/", "/app/data/", 1)
|
|
return media_url.replace("/media/", "/app/data/", 1)
|
|
|
|
|
|
def _get_trend_top(n: int = 10) -> list[str]:
|
|
try:
|
|
if hasattr(db, "get_market_trends"):
|
|
rows = db.get_market_trends(days=7)
|
|
return [r.get("genre", "") for r in rows[:n] if r.get("genre")]
|
|
except Exception:
|
|
pass
|
|
return []
|