feat(music-lab): video_pipelines 4 컬럼 추가 + compile_jobs JOIN

- _add_column_if_missing 헬퍼 추가 (idempotent ALTER TABLE)
- video_pipelines에 compile_job_id, visual_style, background_mode, background_keyword 컬럼 추가
- track_id를 nullable로 변경 (compile_job_id 입력 모드 지원)
- create_pipeline에 compile_job_id XOR track_id 검증 추가
- get_pipeline / list_pipelines에 compile_jobs LEFT JOIN — compile_title 노출

Task 1 of 17: Essential Mix pipeline DB migration

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-09 13:01:40 +09:00
parent ebbfa6299a
commit 70a256bbe4
2 changed files with 234 additions and 8 deletions

View File

@@ -14,6 +14,85 @@ def _conn() -> sqlite3.Connection:
return conn
def _add_column_if_missing(cursor, table: str, column: str, ddl: str) -> None:
"""SQLite-safe ALTER TABLE ADD COLUMN — idempotent.
SQLite의 ALTER TABLE은 컬럼 존재 시 에러. PRAGMA로 미리 확인.
"""
cursor.execute(f"PRAGMA table_info({table})")
existing = {row[1] for row in cursor.fetchall()}
if column not in existing:
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
def _is_column_not_null(cursor, table: str, column: str) -> bool:
"""PRAGMA table_info row format: (cid, name, type, notnull, dflt_value, pk)."""
cursor.execute(f"PRAGMA table_info({table})")
for row in cursor.fetchall():
if row[1] == column:
return row[3] == 1
return False
def _relax_video_pipelines_track_id_nullable(cursor) -> None:
    """Drop the NOT NULL constraint on video_pipelines.track_id.

    Pipelines may now be keyed by compile_job_id alone, so track_id must
    accept NULL. SQLite does not support ALTER COLUMN, so this uses the
    standard rewrite pattern: create a new table with the final schema,
    copy all rows, drop the old table, rename the new one into place.

    Idempotent: no-op when track_id is already nullable. Must run AFTER
    _add_column_if_missing has added the four essential-mix columns,
    because the row copy SELECTs them from the old table.

    NOTE(review): no foreign keys referencing video_pipelines are visible
    here; if any exist elsewhere in the schema, the DROP/RENAME would need
    PRAGMA foreign_keys handling — confirm against the full schema.
    """
    if not _is_column_not_null(cursor, "video_pipelines", "track_id"):
        return  # already nullable — nothing to migrate

    # Final schema: identical to the old table except track_id is nullable
    # and the four new essential-mix columns are part of the definition.
    cursor.execute("""
        CREATE TABLE video_pipelines_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT,
            compile_job_id INTEGER,
            visual_style TEXT NOT NULL DEFAULT 'essential',
            background_mode TEXT NOT NULL DEFAULT 'static',
            background_keyword TEXT
        )
    """)
    # Copy every row, naming columns explicitly on both sides so the
    # statement stays valid if either table gains columns later.
    # (The original wrapped the last four columns in COALESCE, but
    # COALESCE(x, NULL) is a no-op by definition, and the NOT NULL columns
    # already carry their defaults after ADD COLUMN — plain references
    # are equivalent.)
    cursor.execute("""
        INSERT INTO video_pipelines_new
            (id, track_id, state, state_started_at, cover_url, video_url,
             thumbnail_url, metadata_json, review_json, youtube_video_id,
             feedback_count_per_step, last_telegram_msg_ids,
             created_at, updated_at, cancelled_at, failed_reason,
             compile_job_id, visual_style, background_mode, background_keyword)
        SELECT
            id, track_id, state, state_started_at, cover_url, video_url,
            thumbnail_url, metadata_json, review_json, youtube_video_id,
            feedback_count_per_step, last_telegram_msg_ids,
            created_at, updated_at, cancelled_at, failed_reason,
            compile_job_id, visual_style, background_mode, background_keyword
        FROM video_pipelines
    """)
    cursor.execute("DROP TABLE video_pipelines")
    cursor.execute("ALTER TABLE video_pipelines_new RENAME TO video_pipelines")
def init_db() -> None:
with _conn() as conn:
conn.execute("""
@@ -186,10 +265,11 @@ def init_db() -> None:
""")
# ── YouTube pipeline 테이블 (5개) ─────────────────────────────────
# track_id는 nullable: compile_job_id로 입력하는 essential mix 모드 지원
conn.execute("""
CREATE TABLE IF NOT EXISTS video_pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_id INTEGER NOT NULL,
track_id INTEGER,
state TEXT NOT NULL DEFAULT 'created',
state_started_at TEXT NOT NULL,
cover_url TEXT,
@@ -206,6 +286,13 @@ def init_db() -> None:
failed_reason TEXT
)
""")
# Migration for essential mix pipeline (task 2026-05-09)
cur = conn.cursor()
_add_column_if_missing(cur, "video_pipelines", "compile_job_id", "INTEGER")
_add_column_if_missing(cur, "video_pipelines", "visual_style", "TEXT NOT NULL DEFAULT 'essential'")
_add_column_if_missing(cur, "video_pipelines", "background_mode", "TEXT NOT NULL DEFAULT 'static'")
_add_column_if_missing(cur, "video_pipelines", "background_keyword", "TEXT")
_relax_video_pipelines_track_id_nullable(cur)
conn.execute("""
CREATE TABLE IF NOT EXISTS pipeline_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -946,22 +1033,34 @@ def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]:
return d
def create_pipeline(track_id: Optional[int] = None, *,
                    compile_job_id: Optional[int] = None,
                    visual_style: str = "essential",
                    background_mode: str = "static",
                    background_keyword: Optional[str] = None) -> int:
    """Insert a new video pipeline row and return its id.

    Exactly one of *track_id* / *compile_job_id* must be supplied:
    single-track mode vs. essential-mix (compile job) mode.
    """
    has_track = track_id is not None
    has_compile = compile_job_id is not None
    if has_track == has_compile:  # neither or both → invalid
        raise ValueError("track_id와 compile_job_id 중 정확히 하나만 지정")
    now = _now()
    with _conn() as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO video_pipelines
                (track_id, compile_job_id, visual_style, background_mode, background_keyword,
                 state, state_started_at, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, 'created', ?, ?, ?)
        """, (track_id, compile_job_id, visual_style, background_mode,
              background_keyword, now, now, now))
        return cur.lastrowid
def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("""
SELECT vp.*, ml.title AS track_title
SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
FROM video_pipelines vp
LEFT JOIN music_library ml ON ml.id = vp.track_id
LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
WHERE vp.id = ?
""", (pid,)).fetchone()
if not row:
@@ -991,9 +1090,10 @@ def update_pipeline_state(pid: int, state: str, **fields) -> None:
def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
sql = """
SELECT vp.*, ml.title AS track_title
SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
FROM video_pipelines vp
LEFT JOIN music_library ml ON ml.id = vp.track_id
LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
"""
if active_only:
sql += " WHERE vp.state NOT IN ('published','cancelled','failed','awaiting_manual')"

View File

@@ -94,3 +94,129 @@ def test_update_pipeline_job_rejects_unknown_column(fresh_db):
job_id = db.create_pipeline_job(pid, "cover")
with pytest.raises(ValueError):
db.update_pipeline_job(job_id, evil_col="x")
def test_create_pipeline_with_compile_job(fresh_db):
    """compile_job_id input mode: all four new columns round-trip."""
    pipeline_id = db.create_pipeline(track_id=None, compile_job_id=42,
                                     visual_style="essential", background_mode="static",
                                     background_keyword="rainy cafe")
    fetched = db.get_pipeline(pipeline_id)
    subset = {key: fetched[key] for key in (
        "track_id", "compile_job_id", "visual_style",
        "background_mode", "background_keyword")}
    assert subset == {
        "track_id": None,
        "compile_job_id": 42,
        "visual_style": "essential",
        "background_mode": "static",
        "background_keyword": "rainy cafe",
    }
def test_create_pipeline_with_track_keeps_defaults(fresh_db):
    """track_id input mode: the four new columns fall back to their defaults."""
    row = db.get_pipeline(db.create_pipeline(track_id=1))
    assert row["track_id"] == 1
    assert row["compile_job_id"] is None
    # schema defaults
    assert row["visual_style"] == "essential"
    assert row["background_mode"] == "static"
    assert row["background_keyword"] is None
def test_create_pipeline_rejects_neither(fresh_db):
    """XOR validation: supplying no source id at all must raise."""
    import pytest
    with pytest.raises(ValueError):
        db.create_pipeline(track_id=None, compile_job_id=None)
def test_create_pipeline_rejects_both(fresh_db):
    """XOR validation: supplying both track_id and compile_job_id must raise."""
    import pytest
    with pytest.raises(ValueError):
        db.create_pipeline(track_id=1, compile_job_id=2)
def test_migration_idempotent(monkeypatch, tmp_path):
    """Calling init_db twice must not raise ALTER TABLE errors."""
    import sqlite3
    target = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(target))
    for _ in range(2):  # second run hits the already-migrated schema
        db.init_db()
    conn = sqlite3.connect(str(target))
    names = {row[1] for row in conn.execute("PRAGMA table_info(video_pipelines)")}
    conn.close()
    for column in ("compile_job_id", "visual_style",
                   "background_mode", "background_keyword"):
        assert column in names
def test_pipeline_response_includes_compile_title(fresh_db):
    """compile_jobs LEFT JOIN — the pipeline response carries compile_title."""
    import sqlite3
    setup = sqlite3.connect(db.DB_PATH)
    try:
        setup.execute("""CREATE TABLE IF NOT EXISTS compile_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, status TEXT,
            track_ids_json TEXT, crossfade_sec INTEGER, audio_path TEXT, created_at TEXT)""")
        setup.execute("INSERT INTO compile_jobs (id, title, status) VALUES (1, 'My Mix', 'succeeded')")
        setup.commit()
    finally:
        setup.close()
    pipeline = db.get_pipeline(db.create_pipeline(compile_job_id=1))
    assert pipeline.get("compile_title") == "My Mix"
def test_migration_relaxes_existing_not_null_track_id(monkeypatch, tmp_path):
    """Migrate a production-like DB (track_id NOT NULL) to nullable track_id."""
    db_path = tmp_path / "music.db"
    monkeypatch.setattr(db, "DB_PATH", str(db_path))
    # 1) Create the legacy schema directly (track_id NOT NULL)
    import sqlite3
    conn = sqlite3.connect(str(db_path))
    conn.execute("""
        CREATE TABLE video_pipelines (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER NOT NULL,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT
        )
    """)
    # One row of legacy data
    conn.execute("""
        INSERT INTO video_pipelines (track_id, state_started_at, created_at, updated_at)
        VALUES (1, '2026-05-01T00:00:00', '2026-05-01T00:00:00', '2026-05-01T00:00:00')
    """)
    conn.commit()
    conn.close()
    # 2) Run init_db (triggers the migration)
    db.init_db()
    # 3) Confirm the NOT NULL constraint was dropped
    conn = sqlite3.connect(str(db_path))
    cur = conn.cursor()
    cur.execute("PRAGMA table_info(video_pipelines)")
    cols = {r[1]: r[3] for r in cur.fetchall()}  # name → notnull
    assert cols["track_id"] == 0  # not null released
    # The new columns exist as well
    assert "compile_job_id" in cols
    assert "visual_style" in cols
    # Legacy data preserved through the table rewrite
    cur.execute("SELECT track_id FROM video_pipelines WHERE id=1")
    assert cur.fetchone()[0] == 1
    conn.close()
    # 4) A compile_job_id-only INSERT now succeeds
    pid = db.create_pipeline(compile_job_id=99)
    p = db.get_pipeline(pid)
    assert p["track_id"] is None
    assert p["compile_job_id"] == 99