8 Commits

Author SHA1 Message Date
262366bc1e test(music-lab): compile_job 기반 happy path 통합 테스트 2026-05-09 13:27:47 +09:00
5fc914cd8f feat(music-lab): POST /pipeline에 compile_job_id + visual_style/background 옵션 2026-05-09 13:20:38 +09:00
8f859274c4 feat(music-lab): video.py — Windows에 style/background_mode/tracks 전달 + orchestrator 파라미터 wiring 2026-05-09 13:17:49 +09:00
a347da075c feat(music-lab): metadata tracks 옵션 + YouTube 챕터 자동 형식 2026-05-09 13:15:30 +09:00
e754fb30f5 feat(music-lab): background.py — Pexels Video API + orchestrator video_loop 분기 2026-05-09 13:13:42 +09:00
f0c0c18beb feat(music-lab): cover.py Pexels 이미지 검색 분기 (image_source=pexels) 2026-05-09 13:10:49 +09:00
d11023decb feat(music-lab): orchestrator _resolve_input — track/compile_job 통합 입력 2026-05-09 13:08:53 +09:00
70a256bbe4 feat(music-lab): video_pipelines 4 컬럼 추가 + compile_jobs JOIN
- _add_column_if_missing 헬퍼 추가 (idempotent ALTER TABLE)
- video_pipelines에 compile_job_id, visual_style, background_mode, background_keyword 컬럼 추가
- track_id를 nullable로 변경 (compile_job_id 입력 모드 지원)
- create_pipeline에 compile_job_id XOR track_id 검증 추가
- get_pipeline / list_pipelines에 compile_jobs LEFT JOIN — compile_title 노출

Task 1 of 17: Essential Mix pipeline DB migration

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 13:04:23 +09:00
15 changed files with 966 additions and 50 deletions

View File

@@ -14,6 +14,85 @@ def _conn() -> sqlite3.Connection:
return conn return conn
def _add_column_if_missing(cursor, table: str, column: str, ddl: str) -> None:
"""SQLite-safe ALTER TABLE ADD COLUMN — idempotent.
SQLite의 ALTER TABLE은 컬럼 존재 시 에러. PRAGMA로 미리 확인.
"""
cursor.execute(f"PRAGMA table_info({table})")
existing = {row[1] for row in cursor.fetchall()}
if column not in existing:
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
def _is_column_not_null(cursor, table: str, column: str) -> bool:
"""PRAGMA table_info row format: (cid, name, type, notnull, dflt_value, pk)."""
cursor.execute(f"PRAGMA table_info({table})")
for row in cursor.fetchall():
if row[1] == column:
return row[3] == 1
return False
def _relax_video_pipelines_track_id_nullable(cursor) -> None:
    """Make video_pipelines.track_id nullable (compile_job_id-only pipelines).

    SQLite does not support ALTER COLUMN, so the standard rebuild pattern is
    used: create a new table with the final schema, copy all rows, drop the
    old table, rename the new one into place.
    Idempotent: no-op when track_id is already nullable.

    NOTE(review): DROP TABLE also discards any indexes/triggers defined on
    video_pipelines — none are visible in this file, but confirm before
    relying on this migration.
    """
    if not _is_column_not_null(cursor, "video_pipelines", "track_id"):
        return  # already nullable — nothing to do
    # New table carries the final schema, including the 4 columns added for
    # the essential-mix mode.
    cursor.execute("""
        CREATE TABLE video_pipelines_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT,
            compile_job_id INTEGER,
            visual_style TEXT NOT NULL DEFAULT 'essential',
            background_mode TEXT NOT NULL DEFAULT 'static',
            background_keyword TEXT
        )
    """)
    # Copy every column explicitly. COALESCE backfills the NOT NULL columns
    # in case migrated rows hold NULL; the nullable columns are selected
    # directly — the original COALESCE(col, NULL) was a no-op.
    cursor.execute("""
        INSERT INTO video_pipelines_new
            (id, track_id, state, state_started_at, cover_url, video_url,
             thumbnail_url, metadata_json, review_json, youtube_video_id,
             feedback_count_per_step, last_telegram_msg_ids,
             created_at, updated_at, cancelled_at, failed_reason,
             compile_job_id, visual_style, background_mode, background_keyword)
        SELECT
            id, track_id, state, state_started_at, cover_url, video_url,
            thumbnail_url, metadata_json, review_json, youtube_video_id,
            feedback_count_per_step, last_telegram_msg_ids,
            created_at, updated_at, cancelled_at, failed_reason,
            compile_job_id,
            COALESCE(visual_style, 'essential'),
            COALESCE(background_mode, 'static'),
            background_keyword
        FROM video_pipelines
    """)
    cursor.execute("DROP TABLE video_pipelines")
    cursor.execute("ALTER TABLE video_pipelines_new RENAME TO video_pipelines")
def init_db() -> None: def init_db() -> None:
with _conn() as conn: with _conn() as conn:
conn.execute(""" conn.execute("""
@@ -186,10 +265,11 @@ def init_db() -> None:
""") """)
# ── YouTube pipeline 테이블 (5개) ───────────────────────────────── # ── YouTube pipeline 테이블 (5개) ─────────────────────────────────
# track_id는 nullable: compile_job_id로 입력하는 essential mix 모드 지원
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS video_pipelines ( CREATE TABLE IF NOT EXISTS video_pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
track_id INTEGER NOT NULL, track_id INTEGER,
state TEXT NOT NULL DEFAULT 'created', state TEXT NOT NULL DEFAULT 'created',
state_started_at TEXT NOT NULL, state_started_at TEXT NOT NULL,
cover_url TEXT, cover_url TEXT,
@@ -206,6 +286,13 @@ def init_db() -> None:
failed_reason TEXT failed_reason TEXT
) )
""") """)
# Migration for essential mix pipeline (task 2026-05-09)
cur = conn.cursor()
_add_column_if_missing(cur, "video_pipelines", "compile_job_id", "INTEGER")
_add_column_if_missing(cur, "video_pipelines", "visual_style", "TEXT NOT NULL DEFAULT 'essential'")
_add_column_if_missing(cur, "video_pipelines", "background_mode", "TEXT NOT NULL DEFAULT 'static'")
_add_column_if_missing(cur, "video_pipelines", "background_keyword", "TEXT")
_relax_video_pipelines_track_id_nullable(cur)
conn.execute(""" conn.execute("""
CREATE TABLE IF NOT EXISTS pipeline_jobs ( CREATE TABLE IF NOT EXISTS pipeline_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -946,22 +1033,34 @@ def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]:
return d return d
def create_pipeline(track_id: int) -> int: def create_pipeline(track_id: Optional[int] = None, *,
compile_job_id: Optional[int] = None,
visual_style: str = "essential",
background_mode: str = "static",
background_keyword: Optional[str] = None) -> int:
"""track_id XOR compile_job_id 검증."""
if (track_id is None) == (compile_job_id is None):
raise ValueError("track_id와 compile_job_id 중 정확히 하나만 지정")
with _conn() as conn: with _conn() as conn:
cur = conn.cursor()
now = _now() now = _now()
cur = conn.execute(""" cur.execute("""
INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at) INSERT INTO video_pipelines
VALUES (?, 'created', ?, ?, ?) (track_id, compile_job_id, visual_style, background_mode, background_keyword,
""", (track_id, now, now, now)) state, state_started_at, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 'created', ?, ?, ?)
""", (track_id, compile_job_id, visual_style, background_mode,
background_keyword, now, now, now))
return cur.lastrowid return cur.lastrowid
def get_pipeline(pid: int) -> Optional[Dict[str, Any]]: def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
with _conn() as conn: with _conn() as conn:
row = conn.execute(""" row = conn.execute("""
SELECT vp.*, ml.title AS track_title SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
FROM video_pipelines vp FROM video_pipelines vp
LEFT JOIN music_library ml ON ml.id = vp.track_id LEFT JOIN music_library ml ON ml.id = vp.track_id
LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
WHERE vp.id = ? WHERE vp.id = ?
""", (pid,)).fetchone() """, (pid,)).fetchone()
if not row: if not row:
@@ -991,9 +1090,10 @@ def update_pipeline_state(pid: int, state: str, **fields) -> None:
def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]: def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
sql = """ sql = """
SELECT vp.*, ml.title AS track_title SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
FROM video_pipelines vp FROM video_pipelines vp
LEFT JOIN music_library ml ON ml.id = vp.track_id LEFT JOIN music_library ml ON ml.id = vp.track_id
LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
""" """
if active_only: if active_only:
sql += " WHERE vp.state NOT IN ('published','cancelled','failed','awaiting_manual')" sql += " WHERE vp.state NOT IN ('published','cancelled','failed','awaiting_manual')"

View File

@@ -929,7 +929,11 @@ def market_suggest(limit: int = 5):
# ── Pipeline endpoints ──────────────────────────────────────────────────────── # ── Pipeline endpoints ────────────────────────────────────────────────────────
class PipelineCreate(BaseModel): class PipelineCreate(BaseModel):
track_id: int track_id: int | None = None
compile_job_id: int | None = None
visual_style: str | None = None # single | essential
background_mode: str | None = None # static | video_loop
background_keyword: str | None = None
class FeedbackRequest(BaseModel): class FeedbackRequest(BaseModel):
@@ -940,10 +944,34 @@ class FeedbackRequest(BaseModel):
@app.post("/api/music/pipeline", status_code=201) @app.post("/api/music/pipeline", status_code=201)
def create_pipeline(req: PipelineCreate): def create_pipeline(req: PipelineCreate):
# XOR 검증
if (req.track_id is None) == (req.compile_job_id is None):
raise HTTPException(400, "track_id 또는 compile_job_id 중 정확히 하나를 지정")
# compile_job 상태 확인
if req.compile_job_id is not None:
job = _db_module.get_compile_job(req.compile_job_id)
if not job:
raise HTTPException(404, f"compile job {req.compile_job_id} 없음")
if job.get("status") != "succeeded":
raise HTTPException(400, f"compile job {req.compile_job_id} not ready (status={job.get('status')})")
# 동일 입력으로 이미 active 파이프라인 있으면 409
actives = _db_module.list_pipelines(active_only=True) actives = _db_module.list_pipelines(active_only=True)
if any(p["track_id"] == req.track_id for p in actives): for p in actives:
raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다") if (req.track_id and p.get("track_id") == req.track_id) or \
pid = _db_module.create_pipeline(req.track_id) (req.compile_job_id and p.get("compile_job_id") == req.compile_job_id):
raise HTTPException(409, "이미 진행 중인 파이프라인이 있습니다")
setup = _db_module.get_youtube_setup()
vd = setup["visual_defaults"]
pid = _db_module.create_pipeline(
track_id=req.track_id,
compile_job_id=req.compile_job_id,
visual_style=req.visual_style or vd.get("default_visual_style", "essential"),
background_mode=req.background_mode or vd.get("default_background_mode", "static"),
background_keyword=req.background_keyword or vd.get("default_background_keyword") or None,
)
return _db_module.get_pipeline(pid) return _db_module.get_pipeline(pid)

View File

@@ -0,0 +1,60 @@
"""Pexels Video API로 background loop 영상 받아오기 (video_loop 모드용)."""
import os
import logging
import httpx
from . import storage
logger = logging.getLogger("music-lab.background")
TIMEOUT_S = 60
async def fetch_video_loop(pipeline_id: int, keyword: str) -> dict:
    """Fetch a background loop video from the Pexels Video API (video_loop mode).

    Searches for *keyword* (a generic query is used when empty), prefers an
    HD 720p/1080p mp4, and saves it to <pipeline storage dir>/loop.mp4.

    Returns {"path": str | None, "used_fallback": bool, "error": str | None};
    used_fallback=True tells the caller to render a static background instead.
    """
    api_key = os.getenv("PEXELS_API_KEY", "")
    if not api_key:
        return {"path": None, "used_fallback": True, "error": "PEXELS_API_KEY 미설정"}
    out_dir = storage.pipeline_dir(pipeline_id)
    out_path = os.path.join(out_dir, "loop.mp4")
    try:
        async with httpx.AsyncClient(timeout=TIMEOUT_S) as client:
            resp = await client.get(
                "https://api.pexels.com/videos/search",
                headers={"Authorization": api_key},
                params={"query": keyword or "ambient calm", "per_page": 5,
                        "orientation": "landscape"},
            )
            resp.raise_for_status()
            videos = resp.json().get("videos", [])
            if not videos:
                return {"path": None, "used_fallback": True,
                        "error": f"Pexels 결과 없음: {keyword}"}
            chosen = _pick_video_file(videos)
            if chosen is None:
                # Bug fix: the previous fallback indexed
                # videos[0]["video_files"][0], which raised an uncaught
                # IndexError when "video_files" was an empty list.
                return {"path": None, "used_fallback": True,
                        "error": f"Pexels video_files 없음: {keyword}"}
            vid_resp = await client.get(chosen["link"])
            vid_resp.raise_for_status()
            # The whole file is buffered in memory before writing — fine for
            # short loops; consider streaming for long clips.
            with open(out_path, "wb") as out_file:
                out_file.write(vid_resp.content)
            return {"path": out_path, "used_fallback": False, "error": None}
    except (httpx.HTTPError, httpx.TimeoutException, KeyError, ValueError, OSError) as e:
        logger.warning("Pexels video fetch 실패: %s", e)
        return {"path": None, "used_fallback": True, "error": str(e)}


def _pick_video_file(videos: list) -> dict | None:
    """Pick a downloadable file: HD at 1280/1920 width preferred; otherwise
    the first file of the first video that has any; None when nothing usable."""
    for video in videos:
        for vf in video.get("video_files", []):
            if vf.get("quality") == "hd" and vf.get("width") in (1280, 1920):
                return vf
    for video in videos:
        files = video.get("video_files", [])
        if files:
            return files[0]
    return None

View File

@@ -13,6 +13,7 @@ from .gradient import make_gradient_with_title
logger = logging.getLogger("music-lab.cover") logger = logging.getLogger("music-lab.cover")
DALLE_TIMEOUT_S = 90 DALLE_TIMEOUT_S = 90
PEXELS_IMG_TIMEOUT_S = 30
def _get_api_key() -> str: def _get_api_key() -> str:
@@ -23,13 +24,68 @@ def _get_model() -> str:
return os.getenv("OPENAI_IMAGE_MODEL", "gpt-image-1") return os.getenv("OPENAI_IMAGE_MODEL", "gpt-image-1")
def _get_pexels_key() -> str:
return os.getenv("PEXELS_API_KEY", "")
async def _generate_with_pexels(genre: str, mood: str, track_title: str,
                                out_path: str, keyword_override: str = "") -> bool:
    """Search Pexels photos and save the first hit to *out_path* as JPEG.

    Returns True on success; False when the API key is missing, the search
    yields no photos, no usable image URL is present, or any request/decode
    step fails — the caller then falls back to the gradient cover.

    NOTE(review): *mood* and *track_title* are currently unused — kept for
    signature parity with the AI cover path; confirm before removing.
    """
    api_key = _get_pexels_key()
    if not api_key:
        return False
    keyword = keyword_override or f"{genre} aesthetic background"
    try:
        async with httpx.AsyncClient(timeout=PEXELS_IMG_TIMEOUT_S) as client:
            resp = await client.get(
                "https://api.pexels.com/v1/search",
                headers={"Authorization": api_key},
                params={"query": keyword, "per_page": 5, "orientation": "landscape"},
            )
            resp.raise_for_status()
            photos = resp.json().get("photos", [])
            if not photos:
                return False
            src = photos[0].get("src", {})
            img_url = src.get("large2x") or src.get("original")
            if not img_url:
                # Bug fix: without this guard a missing large2x/original key
                # passed None to client.get(), raising outside the except tuple.
                return False
            img_resp = await client.get(img_url)
            img_resp.raise_for_status()
            with Image.open(BytesIO(img_resp.content)) as src_img:
                src_img.convert("RGB").save(out_path, "JPEG", quality=92)
            return True
    except (httpx.HTTPError, httpx.TimeoutException, KeyError, ValueError, OSError) as e:
        logger.warning("Pexels 이미지 검색 실패: %s", e)
        return False
async def generate(*, pipeline_id: int, genre: str, prompt_template: str, async def generate(*, pipeline_id: int, genre: str, prompt_template: str,
mood: str = "", track_title: str = "", feedback: str = "") -> dict: mood: str = "", track_title: str = "", feedback: str = "",
image_source: str = "ai",
background_keyword: str = "") -> dict:
"""커버 아트 생성. 성공 시 jpg 저장 + URL 반환. 실패 시 그라데이션 폴백. """커버 아트 생성. 성공 시 jpg 저장 + URL 반환. 실패 시 그라데이션 폴백.
image_source: 'ai' (DALL·E 기본) | 'pexels' (스톡 사진).
반환: {"url": str, "used_fallback": bool, "error": str | None} 반환: {"url": str, "used_fallback": bool, "error": str | None}
""" """
out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg") out_path = os.path.join(storage.pipeline_dir(pipeline_id), "cover.jpg")
if image_source == "pexels":
ok = await _generate_with_pexels(genre, mood, track_title, out_path, background_keyword)
if ok:
return {
"url": storage.media_url(pipeline_id, "cover.jpg"),
"used_fallback": False,
"error": None,
}
# Pexels 실패 → 그라데이션 폴백
make_gradient_with_title(genre, track_title, out_path)
return {
"url": storage.media_url(pipeline_id, "cover.jpg"),
"used_fallback": True,
"error": "Pexels 검색 실패 또는 API 키 없음",
}
used_fallback = False used_fallback = False
error = None error = None

View File

@@ -19,26 +19,46 @@ def _get_model() -> str:
return os.getenv("CLAUDE_HAIKU_MODEL", CLAUDE_HAIKU_MODEL_DEFAULT) return os.getenv("CLAUDE_HAIKU_MODEL", CLAUDE_HAIKU_MODEL_DEFAULT)
def _format_chapters(tracks: list[dict]) -> str:
"""YouTube 챕터 자동 인식 형식: '[mm:ss] 제목' 한 줄씩.
1시간 이상이면 hh:mm:ss 형식.
"""
if not tracks:
return ""
lines = []
for t in tracks:
offset = int(t.get("start_offset_sec", 0))
m, s = divmod(offset, 60)
h, m = divmod(m, 60)
if h > 0:
ts = f"{h:02d}:{m:02d}:{s:02d}"
else:
ts = f"{m:02d}:{s:02d}"
lines.append(f"{ts} {t.get('title', '')}")
return "\n".join(lines)
async def generate(*, track: dict, template: dict, trend_keywords: list[str], async def generate(*, track: dict, template: dict, trend_keywords: list[str],
feedback: str = "") -> dict: feedback: str = "", tracks: list[dict] | None = None) -> dict:
"""메타데이터 생성. 성공 시 LLM, 실패/미설정 시 템플릿 치환 폴백. """메타데이터 생성. 성공 시 LLM, 실패/미설정 시 템플릿 치환 폴백.
반환: {"title", "description", "tags", "category_id", "used_fallback", "error"} 반환: {"title", "description", "tags", "category_id", "used_fallback", "error"}
""" """
api_key = _get_api_key() api_key = _get_api_key()
if not api_key: if not api_key:
return {**_fallback_template(track, template), "used_fallback": True, "error": "no api key"} return {**_fallback_template(track, template, tracks), "used_fallback": True, "error": "no api key"}
try: try:
result = await _call_claude(track, template, trend_keywords, feedback, result = await _call_claude(track, template, trend_keywords, feedback, tracks,
api_key=api_key, model=_get_model()) api_key=api_key, model=_get_model())
return {**result, "used_fallback": False, "error": None} return {**result, "used_fallback": False, "error": None}
except (httpx.HTTPError, httpx.TimeoutException, KeyError, ValueError, json.JSONDecodeError) as e: except (httpx.HTTPError, httpx.TimeoutException, KeyError, ValueError, json.JSONDecodeError) as e:
logger.warning("메타데이터 LLM 실패 — 폴백: %s", e) logger.warning("메타데이터 LLM 실패 — 폴백: %s", e)
return {**_fallback_template(track, template), "used_fallback": True, "error": str(e)} return {**_fallback_template(track, template, tracks), "used_fallback": True, "error": str(e)}
def _fallback_template(track: dict, template: dict) -> dict: def _fallback_template(track: dict, template: dict, tracks: list[dict] | None = None) -> dict:
fmt_vars = { fmt_vars = {
"title": track.get("title", ""), "title": track.get("title", ""),
"genre": track.get("genre", ""), "genre": track.get("genre", ""),
@@ -48,6 +68,8 @@ def _fallback_template(track: dict, template: dict) -> dict:
} }
title = template.get("title", "{title}").format(**fmt_vars) title = template.get("title", "{title}").format(**fmt_vars)
description = template.get("description", "{title}").format(**fmt_vars) description = template.get("description", "{title}").format(**fmt_vars)
if tracks and len(tracks) > 1:
description = description + "\n\n" + _format_chapters(tracks)
return { return {
"title": title[:100], "title": title[:100],
"description": description[:5000], "description": description[:5000],
@@ -57,17 +79,24 @@ def _fallback_template(track: dict, template: dict) -> dict:
async def _call_claude(track: dict, template: dict, trend_keywords: list[str], async def _call_claude(track: dict, template: dict, trend_keywords: list[str],
feedback: str, *, api_key: str, model: str) -> dict: feedback: str, tracks: list[dict] | None,
*, api_key: str, model: str) -> dict:
user_prompt = ( user_prompt = (
"다음 트랙의 YouTube 메타데이터를 생성하세요. JSON으로만 응답.\n\n" "다음 트랙의 YouTube 메타데이터를 생성하세요. JSON으로만 응답.\n\n"
f"트랙: {json.dumps(track, ensure_ascii=False)}\n" f"트랙: {json.dumps(track, ensure_ascii=False)}\n"
f"템플릿: {json.dumps(template, ensure_ascii=False)}\n" f"템플릿: {json.dumps(template, ensure_ascii=False)}\n"
f"트렌드 키워드: {', '.join(trend_keywords)}\n" f"트렌드 키워드: {', '.join(trend_keywords)}\n"
) )
if tracks and len(tracks) > 1:
chapters = _format_chapters(tracks)
user_prompt += (
f"\n이 영상은 {len(tracks)}개 트랙의 mix입니다. "
f"description에 다음 챕터 리스트를 그대로 포함하세요 (YouTube 자동 챕터 인식용):\n{chapters}\n"
)
if feedback: if feedback:
user_prompt += f"\n사용자 피드백: {feedback}\n" user_prompt += f"\n사용자 피드백: {feedback}\n"
user_prompt += ( user_prompt += (
'\n출력 JSON: {"title": "60자 이내", "description": "1000자 이내, 3-5문단",' '\n출력 JSON: {"title": "60자 이내", "description": "1000자 이내",'
' "tags": ["15개 이내"], "category_id": 10}' ' "tags": ["15개 이내"], "category_id": 10}'
) )
@@ -81,7 +110,7 @@ async def _call_claude(track: dict, template: dict, trend_keywords: list[str],
}, },
json={ json={
"model": model, "model": model,
"max_tokens": 1024, "max_tokens": 2048, # mix 더 길어서
"messages": [{"role": "user", "content": user_prompt}], "messages": [{"role": "user", "content": user_prompt}],
}, },
) )

View File

@@ -6,7 +6,8 @@ import os
import sqlite3 import sqlite3
from app import db from app import db
from . import cover, video, thumb, metadata, review, youtube from . import cover, video, thumb, metadata, review, youtube, background, storage
from .gradient import make_gradient_with_title
logger = logging.getLogger("music-lab.orchestrator") logger = logging.getLogger("music-lab.orchestrator")
@@ -20,21 +21,26 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
job_id = db.create_pipeline_job(pipeline_id, step) job_id = db.create_pipeline_job(pipeline_id, step)
db.update_pipeline_job(job_id, status="running") db.update_pipeline_job(job_id, status="running")
p = db.get_pipeline(pipeline_id) p = db.get_pipeline(pipeline_id)
track = _get_track(p["track_id"]) try:
ctx = _resolve_input(p)
except ValueError as e:
db.update_pipeline_job(job_id, status="failed", error=str(e))
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
return
try: try:
if step == "cover": if step == "cover":
result = await _run_cover(p, track, feedback) result = await _run_cover(p, ctx, feedback)
elif step == "video": elif step == "video":
result = await _run_video(p, track) result = await _run_video(p, ctx)
elif step == "thumb": elif step == "thumb":
result = await _run_thumb(p, track, feedback) result = await _run_thumb(p, ctx, feedback)
elif step == "meta": elif step == "meta":
result = await _run_meta(p, track, feedback) result = await _run_meta(p, ctx, feedback)
elif step == "review": elif step == "review":
result = await _run_review(p, track) result = await _run_review(p, ctx)
elif step == "publish": elif step == "publish":
result = await _run_publish(p, track) result = await _run_publish(p, ctx)
else: else:
raise ValueError(f"unknown step: {step}") raise ValueError(f"unknown step: {step}")
db.update_pipeline_job(job_id, status="succeeded") db.update_pipeline_job(job_id, status="succeeded")
@@ -45,6 +51,78 @@ async def run_step(pipeline_id: int, step: str, feedback: str = "") -> None:
db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}") db.update_pipeline_state(pipeline_id, "failed", failed_reason=f"{step}: {e}")
def _resolve_input(p: dict) -> dict:
    """Resolve the pipeline's input: a single track OR a finished compile job.

    Returns: {
        "audio_path": str,   # absolute path inside the container
        "duration_sec": int,
        "tracks": list[{"id", "title", "start_offset_sec", "duration_sec"}],
        "title": str,
        "genre": str,        # "mix" for compile jobs
        "moods": list[str],
    }

    Raises ValueError when neither input id is set, the compile job is not
    in 'succeeded' state, or the referenced track does not exist.
    """
    track_id = p.get("track_id")
    compile_id = p.get("compile_job_id")
    if track_id is None and compile_id is None:
        raise ValueError("track_id 또는 compile_job_id 중 하나는 필요")
    if compile_id is not None:
        job = db.get_compile_job(compile_id)
        if not job or job.get("status") != "succeeded":
            raise ValueError(
                f"compile job {compile_id} not ready "
                f"(status={job.get('status') if job else None})"
            )
        tracks = []
        offset = 0.0
        crossfade = job.get("crossfade_sec", 0) or 0
        track_ids = job.get("track_ids") or []
        for tid in track_ids:
            t = db.get_track_by_id(tid)
            if not t:
                continue  # skip deleted/unknown tracks instead of failing the mix
            # `or 0`: a stored NULL duration must not break the offset arithmetic.
            dur = t.get("duration_sec") or 0
            tracks.append({
                "id": tid,
                "title": t.get("title", ""),
                "start_offset_sec": int(offset),
                "duration_sec": dur,
            })
            offset += dur - crossfade
        # Each iteration subtracted one crossfade; the last track plays out in
        # full, so add a single crossfade back to get the real total length.
        total = int(offset + crossfade) if tracks else 0
        return {
            "audio_path": job.get("audio_path") or job.get("output_path") or "",
            "duration_sec": total,
            "tracks": tracks,
            "title": job.get("title") or "Mix",
            "genre": "mix",
            "moods": [],
        }
    # Single-track mode
    t = db.get_track_by_id(track_id)
    if not t:
        raise ValueError(f"track {track_id} 없음")
    dur = t.get("duration_sec") or 0  # same NULL guard as the mix branch
    return {
        "audio_path": t.get("file_path") or _local_path(t.get("audio_url", "")),
        "duration_sec": dur,
        "tracks": [{
            "id": t["id"],
            "title": t.get("title", ""),
            "start_offset_sec": 0,
            "duration_sec": dur,
        }],
        "title": t.get("title", ""),
        "genre": t.get("genre", "default"),
        "moods": t.get("moods", []) or [],
    }
def _get_track(track_id: int) -> dict: def _get_track(track_id: int) -> dict:
# tracks 테이블 헬퍼 — 기존 db에 있는 함수 사용 # tracks 테이블 헬퍼 — 기존 db에 있는 함수 사용
t = None t = None
@@ -88,62 +166,95 @@ def _fetch_track_fallback(track_id: int) -> dict | None:
return None return None
async def _run_cover(p, track, feedback): async def _run_cover(p, ctx, feedback):
setup = db.get_youtube_setup() setup = db.get_youtube_setup()
vd = setup["visual_defaults"]
bg_mode = p.get("background_mode") or vd.get("default_background_mode", "static")
keyword = p.get("background_keyword") or vd.get("default_background_keyword", "")
if bg_mode == "video_loop":
# Pexels 영상 다운로드 시도 — 성공 여부와 무관하게 cover.jpg는 그라데이션으로 별도 생성
# (실패 시 video.py가 cover.jpg를 fallback 배경으로 사용 가능)
await background.fetch_video_loop(p["id"], keyword)
out_path = os.path.join(storage.pipeline_dir(p["id"]), "cover.jpg")
make_gradient_with_title(ctx["genre"], ctx["title"], out_path)
return {"next_state": "cover_pending",
"fields": {"cover_url": storage.media_url(p["id"], "cover.jpg")}}
# 정적 모드 — 기존 cover.generate 흐름
prompts = setup["cover_prompts"] prompts = setup["cover_prompts"]
template = prompts.get(track.get("genre", "default").lower(), prompts.get("default", "")) template = prompts.get(ctx["genre"].lower(), prompts.get("default", ""))
image_source = vd.get("background_image_source", "ai")
out = await cover.generate( out = await cover.generate(
pipeline_id=p["id"], genre=track.get("genre", "default"), pipeline_id=p["id"], genre=ctx["genre"],
prompt_template=template, prompt_template=template,
mood=", ".join(track.get("moods", []) or []), mood=", ".join(ctx["moods"] or []),
track_title=track.get("title", ""), track_title=ctx["title"], feedback=feedback,
feedback=feedback, image_source=image_source,
background_keyword=keyword,
) )
return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}} return {"next_state": "cover_pending", "fields": {"cover_url": out["url"]}}
async def _run_video(p, track): async def _run_video(p, ctx):
setup = db.get_youtube_setup() setup = db.get_youtube_setup()
vd = setup["visual_defaults"] vd = setup["visual_defaults"]
audio_path = track.get("file_path") or _local_path(track.get("audio_url", "")) audio_path = ctx["audio_path"]
cover_path = _local_path(p["cover_url"]) cover_path = _local_path(p["cover_url"])
style = p.get("visual_style") or vd.get("default_visual_style", "essential")
bg_mode = p.get("background_mode") or vd.get("default_background_mode", "static")
bg_path = None
if bg_mode == "video_loop":
loop_local = os.path.join(storage.pipeline_dir(p["id"]), "loop.mp4")
bg_path = loop_local if os.path.isfile(loop_local) else None
out = await asyncio.to_thread( out = await asyncio.to_thread(
video.generate, video.generate,
pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path, pipeline_id=p["id"], audio_path=audio_path, cover_path=cover_path,
genre=track.get("genre", "default"), genre=ctx["genre"],
duration_sec=track.get("duration_sec", 120), duration_sec=ctx["duration_sec"],
resolution=vd["resolution"], style=vd["style"], resolution=vd.get("resolution", "1920x1080"),
style=style,
background_mode=bg_mode,
background_path=bg_path,
tracks=ctx["tracks"] if len(ctx["tracks"]) > 1 else None,
) )
return {"next_state": "video_pending", "fields": {"video_url": out["url"]}} return {"next_state": "video_pending", "fields": {"video_url": out["url"]}}
async def _run_thumb(p, track, feedback): async def _run_thumb(p, ctx, feedback):
video_path = _local_path(p["video_url"]) video_path = _local_path(p["video_url"])
out = await asyncio.to_thread( out = await asyncio.to_thread(
thumb.generate, thumb.generate,
pipeline_id=p["id"], video_path=video_path, pipeline_id=p["id"], video_path=video_path,
track_title=track.get("title", ""), overlay_text=True, track_title=ctx["title"], overlay_text=True,
) )
return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}} return {"next_state": "thumb_pending", "fields": {"thumbnail_url": out["url"]}}
async def _run_meta(p, track, feedback): async def _run_meta(p, ctx, feedback):
setup = db.get_youtube_setup() setup = db.get_youtube_setup()
trend_top = _get_trend_top() trend_top = _get_trend_top()
out = await metadata.generate( out = await metadata.generate(
track=track, template=setup["metadata_template"], track={"title": ctx["title"], "genre": ctx["genre"],
"duration_sec": ctx["duration_sec"], "moods": ctx["moods"]},
template=setup["metadata_template"],
trend_keywords=trend_top, feedback=feedback, trend_keywords=trend_top, feedback=feedback,
tracks=ctx["tracks"] if len(ctx["tracks"]) > 1 else None,
) )
return {"next_state": "meta_pending", return {"next_state": "meta_pending",
"fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}} "fields": {"metadata_json": json.dumps(out, ensure_ascii=False)}}
async def _run_review(p, track): async def _run_review(p, ctx):
setup = db.get_youtube_setup() setup = db.get_youtube_setup()
meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {}
result = await review.run_4_axis( result = await review.run_4_axis(
pipeline=p, track=track, pipeline=p,
video_meta={"length_sec": track.get("duration_sec", 120), track={"title": ctx["title"], "genre": ctx["genre"], "duration_sec": ctx["duration_sec"]},
video_meta={"length_sec": ctx["duration_sec"],
"resolution": setup["visual_defaults"]["resolution"]}, "resolution": setup["visual_defaults"]["resolution"]},
metadata=meta, thumbnail_url=p.get("thumbnail_url", ""), metadata=meta, thumbnail_url=p.get("thumbnail_url", ""),
trend_top=_get_trend_top(), trend_top=_get_trend_top(),
@@ -153,7 +264,7 @@ async def _run_review(p, track):
"fields": {"review_json": json.dumps(result, ensure_ascii=False)}} "fields": {"review_json": json.dumps(result, ensure_ascii=False)}}
async def _run_publish(p, track): async def _run_publish(p, ctx):
setup = db.get_youtube_setup() setup = db.get_youtube_setup()
meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {} meta = json.loads(p["metadata_json"]) if p.get("metadata_json") else {}
privacy = setup["publish_policy"].get("privacy", "private") privacy = setup["publish_policy"].get("privacy", "private")

View File

@@ -24,7 +24,10 @@ class VideoGenerationError(Exception):
def generate(*, pipeline_id: int, audio_path: str, cover_path: str, def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
genre: str, duration_sec: int, resolution: str = "1920x1080", genre: str, duration_sec: int, resolution: str = "1920x1080",
style: str = "visualizer") -> dict: style: str = "essential",
background_mode: str = "static",
background_path: str | None = None,
tracks: list[dict] | None = None) -> dict:
"""원격 Windows GPU 서버 호출. 다운/실패 시 즉시 예외.""" """원격 Windows GPU 서버 호출. 다운/실패 시 즉시 예외."""
if not ENCODER_URL: if not ENCODER_URL:
raise VideoGenerationError( raise VideoGenerationError(
@@ -35,6 +38,7 @@ def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
nas_audio = _container_to_nas(audio_path) nas_audio = _container_to_nas(audio_path)
nas_cover = _container_to_nas(cover_path) nas_cover = _container_to_nas(cover_path)
nas_output = _container_to_nas(out_path) nas_output = _container_to_nas(out_path)
nas_bg = _container_to_nas(background_path) if background_path else None
payload = { payload = {
"cover_path_nas": nas_cover, "cover_path_nas": nas_cover,
@@ -43,9 +47,13 @@ def generate(*, pipeline_id: int, audio_path: str, cover_path: str,
"resolution": resolution, "resolution": resolution,
"duration_sec": duration_sec, "duration_sec": duration_sec,
"style": style, "style": style,
"background_mode": background_mode,
"background_path_nas": nas_bg,
"tracks": tracks or [],
} }
logger.info("Windows 인코더 호출: pipeline=%d audio=%s", pipeline_id, audio_path) logger.info("Windows 인코더 호출: pipeline=%d audio=%s style=%s bg_mode=%s",
pipeline_id, audio_path, style, background_mode)
try: try:
with httpx.Client(timeout=ENCODER_TIMEOUT_S) as client: with httpx.Client(timeout=ENCODER_TIMEOUT_S) as client:
resp = client.post(f"{ENCODER_URL}/encode_video", json=payload) resp = client.post(f"{ENCODER_URL}/encode_video", json=payload)

View File

@@ -0,0 +1,51 @@
import os
import pytest
import respx
from httpx import Response
from app.pipeline import background, storage
@pytest.fixture
def tmp_storage(monkeypatch, tmp_path):
    # Redirect pipeline storage to a pytest tmp dir so tests never write
    # to the real data directory; returns the tmp root for assertions.
    monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
    return tmp_path
@pytest.mark.asyncio
@respx.mock
async def test_fetch_video_loop_success(tmp_storage, monkeypatch):
    """Happy path: Pexels search hit + download -> loop.mp4 saved, no fallback."""
    monkeypatch.setenv("PEXELS_API_KEY", "k")
    mp4_url = "https://videos.pexels.com/video-files/123/sample.mp4"
    search_payload = {
        "videos": [
            {
                "id": 123,
                "duration": 10,
                "video_files": [
                    {"quality": "hd", "width": 1920, "link": mp4_url},
                ],
            }
        ],
    }
    respx.get("https://api.pexels.com/videos/search").mock(
        return_value=Response(200, json=search_payload)
    )
    fake_mp4 = b"\x00" * 4096
    respx.get(mp4_url).mock(return_value=Response(200, content=fake_mp4))

    result = await background.fetch_video_loop(pipeline_id=10, keyword="rainy window")

    assert result["used_fallback"] is False
    assert (tmp_storage / "10" / "loop.mp4").exists()
@pytest.mark.asyncio
async def test_fetch_video_loop_no_api_key(tmp_storage, monkeypatch):
    """With no PEXELS_API_KEY the helper must fall back instead of calling out."""
    monkeypatch.delenv("PEXELS_API_KEY", raising=False)
    outcome = await background.fetch_video_loop(pipeline_id=11, keyword="rain")
    assert outcome["used_fallback"] is True
@pytest.mark.asyncio
@respx.mock
async def test_fetch_video_loop_zero_results(tmp_storage, monkeypatch):
    """An empty Pexels result set triggers a graceful fallback."""
    monkeypatch.setenv("PEXELS_API_KEY", "k")
    empty_search = Response(200, json={"videos": []})
    respx.get("https://api.pexels.com/videos/search").mock(return_value=empty_search)
    outcome = await background.fetch_video_loop(pipeline_id=12, keyword="impossible-keyword")
    assert outcome["used_fallback"] is True

View File

@@ -91,3 +91,59 @@ async def test_dalle_b64_response_handled(tmp_storage, monkeypatch):
prompt_template="x", mood="", track_title="X") prompt_template="x", mood="", track_title="X")
assert out["used_fallback"] is False assert out["used_fallback"] is False
assert (tmp_storage / "46" / "cover.jpg").exists() assert (tmp_storage / "46" / "cover.jpg").exists()
@pytest.mark.asyncio
@respx.mock
async def test_pexels_image_source(tmp_storage, monkeypatch):
    """image_source="pexels": cover is fetched from Pexels photo search and saved."""
    monkeypatch.setenv("PEXELS_API_KEY", "test-pexels-key")
    img_url = "https://images.pexels.com/photos/123/photo.jpg"
    # Search returns a single photo whose src variants both point at img_url.
    respx.get("https://api.pexels.com/v1/search").mock(
        return_value=Response(200, json={
            "photos": [{
                "id": 123,
                "src": {"large2x": img_url, "original": img_url},
            }],
        })
    )
    # Tiny but structurally valid PNG (signature + IHDR for a 1x1 image),
    # so the download path has real image bytes to write to disk.
    png_bytes = bytes.fromhex(
        "89504e470d0a1a0a0000000d49484452000000010000000108020000009077"
        "53de0000000c4944415478da6300010000050001"
        "0d0a2db40000000049454e44ae426082"
    )
    respx.get(img_url).mock(return_value=Response(200, content=png_bytes))
    out = await cover.generate(
        pipeline_id=99, genre="lo-fi", prompt_template="ignored",
        mood="chill", track_title="Mix",
        image_source="pexels",
    )
    # Pexels path succeeded: no fallback, cover saved under the pipeline dir.
    assert out["used_fallback"] is False
    assert out["url"].endswith("/cover.jpg")
    assert (tmp_storage / "99" / "cover.jpg").exists()
@pytest.mark.asyncio
async def test_pexels_no_api_key_falls_back(tmp_storage, monkeypatch):
    """Missing PEXELS_API_KEY -> cover generation reports used_fallback."""
    monkeypatch.delenv("PEXELS_API_KEY", raising=False)
    result = await cover.generate(
        pipeline_id=98,
        genre="lo-fi",
        prompt_template="x",
        mood="",
        track_title="Test",
        image_source="pexels",
    )
    assert result["used_fallback"] is True
@pytest.mark.asyncio
@respx.mock
async def test_pexels_zero_results_falls_back(tmp_storage, monkeypatch):
    """Zero Pexels photo hits -> cover generation falls back."""
    monkeypatch.setenv("PEXELS_API_KEY", "test-key")
    no_hits = Response(200, json={"photos": []})
    respx.get("https://api.pexels.com/v1/search").mock(return_value=no_hits)
    result = await cover.generate(
        pipeline_id=97,
        genre="lo-fi",
        prompt_template="x",
        mood="",
        track_title="Test",
        image_source="pexels",
    )
    assert result["used_fallback"] is True

View File

@@ -80,3 +80,75 @@ async def test_metadata_falls_back_on_api_error(monkeypatch):
) )
assert result["used_fallback"] is True assert result["used_fallback"] is True
assert "Drive" in result["title"] assert "Drive" in result["title"]
@pytest.mark.asyncio
@respx.mock
async def test_metadata_with_tracks_includes_chapter_format(monkeypatch):
    """With `tracks` supplied, the prompt sent to the Anthropic API must contain
    every track title and at least one 00:00-style chapter timestamp."""
    monkeypatch.setenv("ANTHROPIC_API_KEY", "k")
    captured = {}
    def hook(req):
        # Capture the outgoing request body so we can inspect the prompt below.
        import json as _json
        captured["body"] = _json.loads(req.content)
        # Canned Claude reply: a JSON document embedded in a text content block.
        return Response(200, json={"content": [{"type": "text", "text":
            '{"title":"Lo-Fi Mix 3 Tracks","description":"Track 1: [00:00] T1\\nTrack 2: [03:00] T2",'
            '"tags":["lofi","mix"],"category_id":10}'}]})
    respx.post("https://api.anthropic.com/v1/messages").mock(side_effect=hook)
    result = await metadata.generate(
        track={"title": "Mix", "genre": "mix", "duration_sec": 600,
               "moods": []},
        template={"title": "{title}", "description": "{title}",
                  "tags": [], "category_id": 10},
        trend_keywords=[],
        tracks=[
            {"id": 1, "title": "T1", "start_offset_sec": 0, "duration_sec": 180},
            {"id": 2, "title": "T2", "start_offset_sec": 180, "duration_sec": 200},
            {"id": 3, "title": "T3", "start_offset_sec": 380, "duration_sec": 220},
        ],
    )
    # All three titles plus a chapter timestamp must have reached the API prompt.
    body_str = str(captured["body"])
    assert "T1" in body_str and "T2" in body_str and "T3" in body_str
    assert "00:00" in body_str
    assert result["used_fallback"] is False
@pytest.mark.asyncio
async def test_metadata_fallback_with_tracks(monkeypatch):
    """Even the no-API-key fallback description must include track chapters."""
    monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
    chapter_tracks = [
        {"id": 1, "title": "T1", "start_offset_sec": 0, "duration_sec": 180},
        {"id": 2, "title": "T2", "start_offset_sec": 180, "duration_sec": 200},
    ]
    result = await metadata.generate(
        track={"title": "Mix", "genre": "mix", "duration_sec": 600, "moods": []},
        template={"title": "{title}", "description": "{title}",
                  "tags": [], "category_id": 10},
        trend_keywords=[],
        tracks=chapter_tracks,
    )
    assert result["used_fallback"] is True
    for expected in ("00:00", "T1", "T2"):
        assert expected in result["description"]
def test_format_chapters_under_hour():
    """Offsets below one hour render in MM:SS form."""
    from app.pipeline.metadata import _format_chapters
    rendered = _format_chapters([
        {"start_offset_sec": 0, "title": "T1"},
        {"start_offset_sec": 180, "title": "T2"},
    ])
    assert "00:00 T1" in rendered
    assert "03:00 T2" in rendered
def test_format_chapters_over_hour():
    """Offsets of an hour or more grow an hours field (H:MM:SS)."""
    from app.pipeline.metadata import _format_chapters
    rendered = _format_chapters([
        {"start_offset_sec": 0, "title": "T1"},
        {"start_offset_sec": 3700, "title": "T2"},  # 3700s == 1:01:40
    ])
    assert "00:00 T1" in rendered
    assert "01:01:40 T2" in rendered

View File

@@ -0,0 +1,61 @@
import pytest
from unittest.mock import patch, MagicMock
from app.pipeline.orchestrator import _resolve_input
def test_resolve_input_track():
    """track_id mode: a single track becomes a one-entry chapter list at offset 0."""
    pipeline = {"id": 1, "track_id": 13, "compile_job_id": None}
    track_row = {
        "id": 13,
        "title": "Lo-Fi Drive",
        "genre": "lo-fi",
        "moods": ["chill"],
        "duration_sec": 176,
        "file_path": "/app/data/x.mp3",
        "audio_url": "/media/music/x.mp3",
    }
    with patch("app.pipeline.orchestrator.db.get_track_by_id", return_value=track_row):
        resolved = _resolve_input(pipeline)
    assert resolved["audio_path"] == "/app/data/x.mp3"
    assert resolved["duration_sec"] == 176
    assert len(resolved["tracks"]) == 1
    assert resolved["tracks"][0]["start_offset_sec"] == 0
    assert resolved["title"] == "Lo-Fi Drive"
    assert resolved["genre"] == "lo-fi"
def test_resolve_input_compile_job():
    """compile_job mode: offsets accumulate with the crossfade overlap subtracted."""
    pipeline = {"id": 2, "track_id": None, "compile_job_id": 5}
    job = {
        "id": 5,
        "status": "succeeded",
        "title": "Chill Mix",
        "audio_path": "/app/data/compiles/5.mp3",
        "track_ids": [13, 14, 15],
        "crossfade_sec": 3,
    }
    track_rows = {
        13: {"id": 13, "title": "T1", "duration_sec": 180},
        14: {"id": 14, "title": "T2", "duration_sec": 200},
        15: {"id": 15, "title": "T3", "duration_sec": 150},
    }
    with patch("app.pipeline.orchestrator.db.get_compile_job", return_value=job), \
         patch("app.pipeline.orchestrator.db.get_track_by_id",
               side_effect=track_rows.__getitem__):
        resolved = _resolve_input(pipeline)
    assert resolved["audio_path"] == "/app/data/compiles/5.mp3"
    # Total = 180 + 200 + 150 minus 3s crossfade per adjacent pair (2 pairs) = 524.
    assert resolved["duration_sec"] == 524
    assert len(resolved["tracks"]) == 3
    offsets = [t["start_offset_sec"] for t in resolved["tracks"]]
    assert offsets == [0, 177, 374]  # 0, 180-3, 177+200-3
    assert resolved["title"] == "Chill Mix"
    assert resolved["genre"] == "mix"
def test_resolve_input_compile_not_ready():
    """A compile job that is still rendering must be rejected as input."""
    pipeline = {"id": 3, "track_id": None, "compile_job_id": 6}
    pending_job = {"id": 6, "status": "rendering"}
    with patch("app.pipeline.orchestrator.db.get_compile_job", return_value=pending_job):
        with pytest.raises(ValueError, match="not ready"):
            _resolve_input(pipeline)
def test_resolve_input_neither():
    """Neither track_id nor compile_job_id set -> ValueError."""
    empty_pipeline = {"id": 4, "track_id": None, "compile_job_id": None}
    with pytest.raises(ValueError):
        _resolve_input(empty_pipeline)

View File

@@ -94,3 +94,129 @@ def test_update_pipeline_job_rejects_unknown_column(fresh_db):
job_id = db.create_pipeline_job(pid, "cover") job_id = db.create_pipeline_job(pid, "cover")
with pytest.raises(ValueError): with pytest.raises(ValueError):
db.update_pipeline_job(job_id, evil_col="x") db.update_pipeline_job(job_id, evil_col="x")
def test_create_pipeline_with_compile_job(fresh_db):
    """compile_job_id mode persists the new visual columns; track_id stays NULL."""
    pid = db.create_pipeline(
        track_id=None,
        compile_job_id=42,
        visual_style="essential",
        background_mode="static",
        background_keyword="rainy cafe",
    )
    row = db.get_pipeline(pid)
    assert row["track_id"] is None
    assert row["compile_job_id"] == 42
    assert row["visual_style"] == "essential"
    assert row["background_mode"] == "static"
    assert row["background_keyword"] == "rainy cafe"
def test_create_pipeline_with_track_keeps_defaults(fresh_db):
    """track_id mode: the new columns fall back to their schema defaults."""
    row = db.get_pipeline(db.create_pipeline(track_id=1))
    assert row["track_id"] == 1
    assert row["compile_job_id"] is None
    assert row["visual_style"] == "essential"   # schema default
    assert row["background_mode"] == "static"   # schema default
    assert row["background_keyword"] is None
def test_create_pipeline_rejects_neither(fresh_db):
    """XOR validation: at least one of track_id / compile_job_id is required."""
    import pytest
    with pytest.raises(ValueError):
        db.create_pipeline()
def test_create_pipeline_rejects_both(fresh_db):
    """XOR validation: track_id and compile_job_id together are rejected."""
    import pytest
    with pytest.raises(ValueError):
        db.create_pipeline(track_id=1, compile_job_id=2)
def test_migration_idempotent(monkeypatch, tmp_path):
    """Calling init_db twice must not raise on already-added columns."""
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    db.init_db()
    db.init_db()  # second run: ALTER TABLE must be skipped for existing columns
    import sqlite3
    conn = sqlite3.connect(str(db_path))
    try:
        rows = conn.execute("PRAGMA table_info(video_pipelines)").fetchall()
    finally:
        conn.close()
    column_names = {r[1] for r in rows}
    for expected in ("compile_job_id", "visual_style",
                     "background_mode", "background_keyword"):
        assert expected in column_names
def test_pipeline_response_includes_compile_title(fresh_db):
    """LEFT JOIN on compile_jobs surfaces compile_title in the pipeline payload."""
    import sqlite3
    conn = sqlite3.connect(db.DB_PATH)
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS compile_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, status TEXT,
            track_ids_json TEXT, crossfade_sec INTEGER, audio_path TEXT, created_at TEXT)""")
        conn.execute("INSERT INTO compile_jobs (id, title, status) VALUES (1, 'My Mix', 'succeeded')")
        conn.commit()
    finally:
        conn.close()
    pid = db.create_pipeline(compile_job_id=1)
    assert db.get_pipeline(pid).get("compile_title") == "My Mix"
def test_migration_relaxes_existing_not_null_track_id(monkeypatch, tmp_path):
    """Migrate a production-like DB whose track_id is NOT NULL to nullable.

    Phases: build the legacy schema by hand, run init_db to trigger the
    migration, then verify constraint relaxation, new columns, data
    preservation, and that a compile_job_id-only row can now be inserted.
    """
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    # 1) Create the legacy schema directly (track_id NOT NULL).
    import sqlite3
    conn = sqlite3.connect(str(db_path))
    conn.execute("""
        CREATE TABLE video_pipelines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER NOT NULL,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT
        )
    """)
    # One row of legacy data that the migration must preserve.
    conn.execute("""
        INSERT INTO video_pipelines (track_id, state_started_at, created_at, updated_at)
        VALUES (1, '2026-05-01T00:00:00', '2026-05-01T00:00:00', '2026-05-01T00:00:00')
    """)
    conn.commit()
    conn.close()
    # 2) Run init_db (triggers the migration).
    db.init_db()
    # 3) Confirm the NOT NULL constraint was lifted.
    conn = sqlite3.connect(str(db_path))
    cur = conn.cursor()
    cur.execute("PRAGMA table_info(video_pipelines)")
    cols = {r[1]: r[3] for r in cur.fetchall()}  # name -> notnull flag
    assert cols["track_id"] == 0  # NOT NULL released
    # The new columns also exist.
    assert "compile_job_id" in cols
    assert "visual_style" in cols
    # Legacy data preserved.
    cur.execute("SELECT track_id FROM video_pipelines WHERE id=1")
    assert cur.fetchone()[0] == 1
    conn.close()
    # 4) A compile_job_id-only INSERT now succeeds.
    pid = db.create_pipeline(compile_job_id=99)
    p = db.get_pipeline(pid)
    assert p["track_id"] is None
    assert p["compile_job_id"] == 99

View File

@@ -108,3 +108,69 @@ def test_youtube_status_when_disconnected(client):
r = client.get("/api/music/youtube/status") r = client.get("/api/music/youtube/status")
assert r.status_code == 200 assert r.status_code == 200
assert r.json() == {"connected": False} assert r.json() == {"connected": False}
def test_create_pipeline_with_compile_job(client, monkeypatch):
    """POST /pipeline with compile_job_id -> 201, NULL track_id, default style."""
    import sqlite3
    conn = sqlite3.connect(db.DB_PATH)
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO compile_jobs (title, track_ids_json, crossfade_sec,
                                      audio_path, status, created_at)
            VALUES ('Test Mix', '[1,2,3]', 3, '/app/data/compiles/9.mp3',
                    'succeeded', datetime())
        """)
    except sqlite3.OperationalError:
        pytest.skip("compile_jobs schema mismatch")
    conn.commit()
    compile_id = cur.lastrowid
    conn.close()
    resp = client.post("/api/music/pipeline", json={"compile_job_id": compile_id})
    assert resp.status_code == 201
    payload = resp.json()
    assert payload["track_id"] is None
    assert payload["compile_job_id"] == compile_id
    assert payload["visual_style"] == "essential"
def test_create_pipeline_rejects_both_inputs(client):
    """Supplying track_id and compile_job_id together is a 400."""
    resp = client.post("/api/music/pipeline",
                       json={"track_id": 1, "compile_job_id": 1})
    assert resp.status_code == 400
def test_create_pipeline_rejects_neither(client):
    """An empty body (no track_id, no compile_job_id) is a 400."""
    resp = client.post("/api/music/pipeline", json={})
    assert resp.status_code == 400
def test_create_pipeline_rejects_compile_not_ready(client):
    """A compile job that has not succeeded yet cannot back a pipeline."""
    import sqlite3
    conn = sqlite3.connect(db.DB_PATH)
    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO compile_jobs (title, status, created_at)
            VALUES ('Pending', 'rendering', datetime())
        """)
    except sqlite3.OperationalError:
        pytest.skip("compile_jobs schema mismatch")
    conn.commit()
    pending_id = cur.lastrowid
    conn.close()
    resp = client.post("/api/music/pipeline", json={"compile_job_id": pending_id})
    assert resp.status_code == 400
def test_create_pipeline_with_visual_options(client):
    """visual_style / background_mode / background_keyword round-trip via POST."""
    options = {
        "track_id": 1,
        "visual_style": "single",
        "background_mode": "video_loop",
        "background_keyword": "rain",
    }
    resp = client.post("/api/music/pipeline", json=options)
    assert resp.status_code == 201
    body = resp.json()
    for key in ("visual_style", "background_mode", "background_keyword"):
        assert body[key] == options[key]

View File

@@ -111,3 +111,65 @@ def test_pipeline_reject_and_regenerate(client):
assert p["feedback_count_per_step"]["cover"] == 1 assert p["feedback_count_per_step"]["cover"] == 1
history = db.get_feedback_history(pid) history = db.get_feedback_history(pid)
assert history[0]["feedback_text"] == "더 어둡게" assert history[0]["feedback_text"] == "더 어둡게"
@patch("app.pipeline.youtube.upload_video", return_value={"video_id": "MIX_VID"})
@patch("app.pipeline.review.run_4_axis", new=AsyncMock(return_value={
"metadata_quality": {"score": 90, "notes": ""},
"policy_compliance": {"score": 95, "issues": []},
"viewer_experience": {"score": 85, "notes": ""},
"trend_alignment": {"score": 70, "matched_keywords": []},
"weighted_total": 87.0, "verdict": "pass", "summary": "ok",
"used_fallback": False,
}))
@patch("app.pipeline.metadata.generate", new=AsyncMock(return_value={
"title": "Mix", "description": "Track desc",
"tags": ["lofi"], "category_id": 10,
"used_fallback": False, "error": None,
}))
@patch("app.pipeline.thumb.generate", return_value={
"url": "/media/videos/X/thumbnail.jpg", "used_fallback": False,
})
@patch("app.pipeline.video.generate", return_value={
"url": "/media/videos/X/video.mp4", "used_fallback": False, "duration_sec": 600,
})
@patch("app.pipeline.cover.generate", new=AsyncMock(return_value={
"url": "/media/videos/X/cover.jpg", "used_fallback": False, "error": None,
}))
def test_full_pipeline_compile_job_happy_path(mock_video, mock_thumb, mock_yt, client):
# compile_job 1개 추가 (succeeded)
conn = sqlite3.connect(db.DB_PATH)
cur = conn.cursor()
try:
cur.execute("""
INSERT INTO compile_jobs (title, track_ids, crossfade_sec, output_path,
status, created_at)
VALUES ('Test Mix', '[1]', 3, '/app/data/compiles/1.mp3', 'succeeded', datetime())
""")
except sqlite3.OperationalError:
pytest.skip("compile_jobs schema mismatch — skip integration test")
conn.commit()
cid = cur.lastrowid
conn.close()
pid = client.post("/api/music/pipeline", json={"compile_job_id": cid}).json()["id"]
assert db.get_pipeline(pid)["state"] == "created"
assert db.get_pipeline(pid)["compile_job_id"] == cid
assert db.get_pipeline(pid)["track_id"] is None
client.post(f"/api/music/pipeline/{pid}/start")
p = db.get_pipeline(pid)
assert p["state"] == "cover_pending"
for step in ["cover", "video", "thumb", "meta"]:
r = client.post(f"/api/music/pipeline/{pid}/feedback",
json={"step": step, "intent": "approve"})
assert r.status_code == 202
p = db.get_pipeline(pid)
assert p["state"] == "publish_pending"
client.post(f"/api/music/pipeline/{pid}/publish")
p = db.get_pipeline(pid)
assert p["state"] == "published"
assert p["youtube_video_id"] == "MIX_VID"

View File

@@ -130,3 +130,33 @@ def test_container_to_nas_music_path(monkeypatch):
monkeypatch.setattr(video, "NAS_VIDEOS_ROOT", "/volume1/docker/webpage/data/videos") monkeypatch.setattr(video, "NAS_VIDEOS_ROOT", "/volume1/docker/webpage/data/videos")
monkeypatch.setattr(video, "NAS_MUSIC_ROOT", "/volume1/docker/webpage/data/music") monkeypatch.setattr(video, "NAS_MUSIC_ROOT", "/volume1/docker/webpage/data/music")
assert video._container_to_nas("/app/data/abc.mp3") == "/volume1/docker/webpage/data/music/abc.mp3" assert video._container_to_nas("/app/data/abc.mp3") == "/volume1/docker/webpage/data/music/abc.mp3"
@respx.mock
def test_generate_video_passes_essential_params(encoder_env, tmp_path, monkeypatch):
monkeypatch.setattr(storage, "VIDEO_DATA_DIR", str(tmp_path))
captured = {}
def hook(req):
import json as _json
captured["body"] = _json.loads(req.content)
return Response(200, json={"ok": True, "duration_ms": 5000,
"output_path_nas": "/v/3/video.mp4",
"output_bytes": 10_000_000,
"encoder": "h264_nvenc", "preset": "p4"})
respx.post("http://192.168.45.59:8765/encode_video").mock(side_effect=hook)
out = video.generate(
pipeline_id=3, audio_path="/app/data/x.mp3",
cover_path="/app/data/videos/3/cover.jpg",
genre="mix", duration_sec=3600, resolution="1920x1080",
style="essential", background_mode="video_loop",
background_path="/app/data/videos/3/loop.mp4",
tracks=[{"id": 1, "title": "T1", "start_offset_sec": 0}],
)
body = captured["body"]
assert body["style"] == "essential"
assert body["background_mode"] == "video_loop"
assert body["background_path_nas"] == "/volume1/docker/webpage/data/videos/3/loop.mp4"
assert body["tracks"][0]["title"] == "T1"
assert out["url"].endswith("/3/video.mp4")