- _add_column_if_missing 헬퍼 추가 (idempotent ALTER TABLE) - video_pipelines에 compile_job_id, visual_style, background_mode, background_keyword 컬럼 추가 - track_id를 nullable로 변경 (compile_job_id 입력 모드 지원) - create_pipeline에 compile_job_id XOR track_id 검증 추가 - get_pipeline / list_pipelines에 compile_jobs LEFT JOIN — compile_title 노출 Task 1 of 17: Essential Mix pipeline DB migration Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
223 lines
7.6 KiB
Python
223 lines
7.6 KiB
Python
import os
|
|
import tempfile
|
|
import pytest
|
|
from app import db
|
|
|
|
|
|
@pytest.fixture
|
|
def fresh_db(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "music.db"
|
|
monkeypatch.setattr(db, "DB_PATH", str(db_path))
|
|
db.init_db()
|
|
return db_path
|
|
|
|
|
|
def test_create_pipeline_inserts_row(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
row = db.get_pipeline(pid)
|
|
assert row["id"] == pid
|
|
assert row["state"] == "created"
|
|
assert row["track_id"] == 1
|
|
assert row["feedback_count_per_step"] == {}
|
|
|
|
|
|
def test_update_pipeline_state_records_started_at(fresh_db, freezer):
|
|
pid = db.create_pipeline(track_id=1)
|
|
freezer.move_to("2026-05-07T08:00:00")
|
|
db.update_pipeline_state(pid, "cover_pending")
|
|
row = db.get_pipeline(pid)
|
|
assert row["state"] == "cover_pending"
|
|
assert row["state_started_at"] == "2026-05-07T08:00:00"
|
|
|
|
|
|
def test_increment_feedback_count(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
db.increment_feedback_count(pid, "cover")
|
|
db.increment_feedback_count(pid, "cover")
|
|
row = db.get_pipeline(pid)
|
|
assert row["feedback_count_per_step"] == {"cover": 2}
|
|
|
|
|
|
def test_record_feedback(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
db.record_feedback(pid, "cover", "더 어둡게")
|
|
rows = db.get_feedback_history(pid)
|
|
assert len(rows) == 1
|
|
assert rows[0]["feedback_text"] == "더 어둡게"
|
|
|
|
|
|
def test_create_pipeline_job_lifecycle(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
job_id = db.create_pipeline_job(pid, "cover")
|
|
db.update_pipeline_job(job_id, status="running")
|
|
db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234)
|
|
jobs = db.list_pipeline_jobs(pid)
|
|
assert jobs[0]["status"] == "succeeded"
|
|
assert jobs[0]["duration_ms"] == 1234
|
|
|
|
|
|
def test_youtube_setup_default_row_created_on_init(fresh_db):
|
|
setup = db.get_youtube_setup()
|
|
assert setup["review_threshold"] == 60
|
|
assert "metadata_template_json" in setup
|
|
|
|
|
|
def test_youtube_oauth_token_upsert(fresh_db):
|
|
db.upsert_oauth_token(
|
|
channel_id="UC123",
|
|
channel_title="My Channel",
|
|
avatar_url="https://...",
|
|
refresh_token="r1",
|
|
access_token="a1",
|
|
expires_at="2026-05-07T09:00:00",
|
|
)
|
|
tok = db.get_oauth_token()
|
|
assert tok["channel_id"] == "UC123"
|
|
assert tok["refresh_token"] == "r1"
|
|
db.upsert_oauth_token(
|
|
channel_id="UC123", channel_title="My Channel",
|
|
avatar_url=None, refresh_token="r2",
|
|
access_token="a2", expires_at="2026-05-07T10:00:00",
|
|
)
|
|
tok = db.get_oauth_token()
|
|
assert tok["refresh_token"] == "r2" # upsert
|
|
|
|
|
|
def test_update_pipeline_state_rejects_unknown_column(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
with pytest.raises(ValueError):
|
|
db.update_pipeline_state(pid, "cover_pending", evil_col="x; DROP TABLE")
|
|
|
|
|
|
def test_update_pipeline_job_rejects_unknown_column(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
job_id = db.create_pipeline_job(pid, "cover")
|
|
with pytest.raises(ValueError):
|
|
db.update_pipeline_job(job_id, evil_col="x")
|
|
|
|
|
|
def test_create_pipeline_with_compile_job(fresh_db):
|
|
pid = db.create_pipeline(track_id=None, compile_job_id=42,
|
|
visual_style="essential", background_mode="static",
|
|
background_keyword="rainy cafe")
|
|
row = db.get_pipeline(pid)
|
|
assert row["track_id"] is None
|
|
assert row["compile_job_id"] == 42
|
|
assert row["visual_style"] == "essential"
|
|
assert row["background_mode"] == "static"
|
|
assert row["background_keyword"] == "rainy cafe"
|
|
|
|
|
|
def test_create_pipeline_with_track_keeps_defaults(fresh_db):
|
|
pid = db.create_pipeline(track_id=1)
|
|
row = db.get_pipeline(pid)
|
|
assert row["track_id"] == 1
|
|
assert row["compile_job_id"] is None
|
|
assert row["visual_style"] == "essential" # default
|
|
assert row["background_mode"] == "static" # default
|
|
assert row["background_keyword"] is None
|
|
|
|
|
|
def test_create_pipeline_rejects_neither(fresh_db):
|
|
import pytest
|
|
with pytest.raises(ValueError):
|
|
db.create_pipeline()
|
|
|
|
|
|
def test_create_pipeline_rejects_both(fresh_db):
|
|
import pytest
|
|
with pytest.raises(ValueError):
|
|
db.create_pipeline(track_id=1, compile_job_id=2)
|
|
|
|
|
|
def test_migration_idempotent(monkeypatch, tmp_path):
|
|
"""init_db 두 번 호출해도 ALTER TABLE 에러 없이 통과."""
|
|
db_path = tmp_path / "music.db"
|
|
monkeypatch.setattr(db, "DB_PATH", str(db_path))
|
|
db.init_db()
|
|
db.init_db() # 두 번째 — 컬럼 이미 존재해도 OK여야
|
|
import sqlite3
|
|
conn = sqlite3.connect(str(db_path))
|
|
cols = [r[1] for r in conn.execute("PRAGMA table_info(video_pipelines)").fetchall()]
|
|
assert "compile_job_id" in cols
|
|
assert "visual_style" in cols
|
|
assert "background_mode" in cols
|
|
assert "background_keyword" in cols
|
|
conn.close()
|
|
|
|
|
|
def test_pipeline_response_includes_compile_title(fresh_db):
|
|
"""compile_jobs LEFT JOIN — pipeline 응답에 compile_title 포함."""
|
|
import sqlite3
|
|
conn = sqlite3.connect(db.DB_PATH)
|
|
cur = conn.cursor()
|
|
cur.execute("""CREATE TABLE IF NOT EXISTS compile_jobs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, status TEXT,
|
|
track_ids_json TEXT, crossfade_sec INTEGER, audio_path TEXT, created_at TEXT)""")
|
|
cur.execute("INSERT INTO compile_jobs (id, title, status) VALUES (1, 'My Mix', 'succeeded')")
|
|
conn.commit()
|
|
conn.close()
|
|
pid = db.create_pipeline(compile_job_id=1)
|
|
p = db.get_pipeline(pid)
|
|
assert p.get("compile_title") == "My Mix"
|
|
|
|
|
|
def test_migration_relaxes_existing_not_null_track_id(monkeypatch, tmp_path):
|
|
"""기존 production-like DB(track_id NOT NULL)를 nullable로 마이그레이션."""
|
|
db_path = tmp_path / "music.db"
|
|
monkeypatch.setattr(db, "DB_PATH", str(db_path))
|
|
|
|
# 1) 옛 스키마(track_id NOT NULL)로 직접 생성
|
|
import sqlite3
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.execute("""
|
|
CREATE TABLE video_pipelines (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
track_id INTEGER NOT NULL,
|
|
state TEXT NOT NULL DEFAULT 'created',
|
|
state_started_at TEXT NOT NULL,
|
|
cover_url TEXT,
|
|
video_url TEXT,
|
|
thumbnail_url TEXT,
|
|
metadata_json TEXT,
|
|
review_json TEXT,
|
|
youtube_video_id TEXT,
|
|
feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
|
|
last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
cancelled_at TEXT,
|
|
failed_reason TEXT
|
|
)
|
|
""")
|
|
# 옛 데이터 1행
|
|
conn.execute("""
|
|
INSERT INTO video_pipelines (track_id, state_started_at, created_at, updated_at)
|
|
VALUES (1, '2026-05-01T00:00:00', '2026-05-01T00:00:00', '2026-05-01T00:00:00')
|
|
""")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# 2) init_db 실행 (마이그레이션 트리거)
|
|
db.init_db()
|
|
|
|
# 3) NOT NULL 제약 해제 확인
|
|
conn = sqlite3.connect(str(db_path))
|
|
cur = conn.cursor()
|
|
cur.execute("PRAGMA table_info(video_pipelines)")
|
|
cols = {r[1]: r[3] for r in cur.fetchall()} # name → notnull
|
|
assert cols["track_id"] == 0 # not null released
|
|
# 새 컬럼들도 존재
|
|
assert "compile_job_id" in cols
|
|
assert "visual_style" in cols
|
|
# 기존 데이터 보존
|
|
cur.execute("SELECT track_id FROM video_pipelines WHERE id=1")
|
|
assert cur.fetchone()[0] == 1
|
|
conn.close()
|
|
|
|
# 4) compile_job_id-only INSERT 가능 확인
|
|
pid = db.create_pipeline(compile_job_id=99)
|
|
p = db.get_pipeline(pid)
|
|
assert p["track_id"] is None
|
|
assert p["compile_job_id"] == 99
|