diff --git a/music-lab/app/db.py b/music-lab/app/db.py index 3a1be4f..b60199e 100644 --- a/music-lab/app/db.py +++ b/music-lab/app/db.py @@ -1,6 +1,7 @@ import os import sqlite3 import json +from datetime import datetime from typing import Any, Dict, List, Optional DB_PATH = "/app/data/music.db" @@ -184,6 +185,112 @@ def init_db() -> None: ) """) + # ── YouTube pipeline 테이블 (5개) ───────────────────────────────── + conn.execute(""" + CREATE TABLE IF NOT EXISTS video_pipelines ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + track_id INTEGER NOT NULL, + state TEXT NOT NULL DEFAULT 'created', + state_started_at TEXT NOT NULL, + cover_url TEXT, + video_url TEXT, + thumbnail_url TEXT, + metadata_json TEXT, + review_json TEXT, + youtube_video_id TEXT, + feedback_count_per_step TEXT NOT NULL DEFAULT '{}', + last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + cancelled_at TEXT, + failed_reason TEXT + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS pipeline_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL, + step TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'queued', + error TEXT, + started_at TEXT, + finished_at TEXT, + duration_ms INTEGER, + FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS pipeline_feedback ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline_id INTEGER NOT NULL, + step TEXT NOT NULL, + feedback_text TEXT NOT NULL, + received_at TEXT NOT NULL, + FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS youtube_oauth_tokens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + channel_id TEXT NOT NULL, + channel_title TEXT, + avatar_url TEXT, + refresh_token TEXT NOT NULL, + access_token TEXT, + expires_at TEXT, + created_at TEXT NOT NULL + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS youtube_setup ( + id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1), + metadata_template_json TEXT NOT NULL, + cover_prompts_json TEXT NOT NULL, + review_weights_json TEXT NOT NULL, + review_threshold INTEGER NOT NULL DEFAULT 60, + visual_defaults_json TEXT NOT NULL, + publish_policy_json TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + + # 기본 setup 1행 보장 + cnt_row = conn.execute("SELECT COUNT(*) AS c FROM youtube_setup").fetchone() + if cnt_row["c"] == 0: + _seed_default_youtube_setup(conn) + + +def _seed_default_youtube_setup(conn: sqlite3.Connection) -> None: + """youtube_setup 테이블에 기본 1행을 삽입한다. + + init_db()와 get_youtube_setup() (행이 사라진 경우 self-heal)가 공유한다. + """ + defaults = ( + json.dumps({ + "title": "[{genre}] {title} | {bpm}BPM", + "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n", + "tags": ["lofi", "chill", "instrumental"], + "category_id": 10, + }), + json.dumps({ + "lo-fi": "moody anime cityscape at dusk, lofi aesthetic", + "phonk": "dark drift car aesthetic, neon, phonk vibe", + "ambient": "ethereal mountain landscape, ambient mood", + "default": "abstract music album cover art", + }), + json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}), + 60, + json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}), + json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}), + datetime.utcnow().isoformat(timespec="seconds"), + ) + conn.execute(""" + INSERT INTO youtube_setup + (metadata_template_json, cover_prompts_json, review_weights_json, + review_threshold, visual_defaults_json, publish_policy_json, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, defaults) + # ── music_tasks CRUD ────────────────────────────────────────────────────────── @@ -791,3 +898,247 @@ def update_compile_job(job_id: int, **kwargs) -> None: def delete_compile_job(job_id: int) -> None: with _conn() as conn: conn.execute("DELETE FROM compile_jobs WHERE id = ?", (job_id,)) + + +# ── YouTube pipeline helpers ────────────────────────────────────────────────── + +# update_pipeline_state: state/state_started_at/updated_at은 자동, 그 외 허용 컬럼 화이트리스트 +_PIPELINE_STATE_EXTRA_COLS = frozenset({ + "cover_url", + "video_url", + "thumbnail_url", + "metadata_json", + "review_json", + "youtube_video_id", + "cancelled_at", + "failed_reason", + "last_telegram_msg_ids", + "feedback_count_per_step", +}) + +# update_pipeline_job 허용 컬럼 화이트리스트 +_PIPELINE_JOB_COLS = frozenset({ + "status", + "error", + "duration_ms", + "started_at", + "finished_at", +}) + + +def _now() -> str: + return datetime.utcnow().isoformat(timespec="seconds") + + +def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]: + """video_pipelines의 sqlite3.Row를 dict로 파싱. + + JSON 컬럼을 디코드하고, metadata/review를 호환을 위해 추가로 노출한다. + get_pipeline / list_pipelines가 공유. + """ + d = dict(row) + d["feedback_count_per_step"] = json.loads(d["feedback_count_per_step"] or "{}") + d["last_telegram_msg_ids"] = json.loads(d["last_telegram_msg_ids"] or "{}") + if d.get("metadata_json"): + d["metadata"] = json.loads(d["metadata_json"]) + if d.get("review_json"): + d["review"] = json.loads(d["review_json"]) + return d + + +def create_pipeline(track_id: int) -> int: + with _conn() as conn: + now = _now() + cur = conn.execute(""" + INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at) + VALUES (?, 'created', ?, ?, ?) + """, (track_id, now, now, now)) + return cur.lastrowid + + +def get_pipeline(pid: int) -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone() + if not row: + return None + return _parse_pipeline_row(row) + + +def update_pipeline_state(pid: int, state: str, **fields) -> None: + """파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다. + + 허용 컬럼 화이트리스트(_PIPELINE_STATE_EXTRA_COLS)에 없는 키는 ValueError. + """ + unknown = set(fields) - _PIPELINE_STATE_EXTRA_COLS + if unknown: + raise ValueError(f"unknown columns for update_pipeline_state: {sorted(unknown)}") + + now = _now() + cols = ["state = ?", "state_started_at = ?", "updated_at = ?"] + vals: List[Any] = [state, now, now] + for k, v in fields.items(): + cols.append(f"{k} = ?") + vals.append(v) + vals.append(pid) + with _conn() as conn: + conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals) + + +def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]: + with _conn() as conn: + if active_only: + rows = conn.execute(""" + SELECT * FROM video_pipelines + WHERE state NOT IN ('published','cancelled','failed','awaiting_manual') + ORDER BY created_at DESC + """).fetchall() + else: + rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall() + return [_parse_pipeline_row(r) for r in rows] + + +def increment_feedback_count(pid: int, step: str) -> int: + """원자적으로 feedback_count_per_step.를 +1 한 뒤 새 값을 반환. + + json1 확장(SQLite 3.38+)을 사용해 read-modify-write 경합을 제거한다. + """ + now = _now() + with _conn() as conn: + conn.execute( + """ + UPDATE video_pipelines + SET feedback_count_per_step = json_set( + feedback_count_per_step, + '$.' || ?, + COALESCE(json_extract(feedback_count_per_step, '$.' || ?), 0) + 1 + ), + updated_at = ? + WHERE id = ? + """, + (step, step, now, pid), + ) + row = conn.execute( + "SELECT json_extract(feedback_count_per_step, '$.' || ?) AS c " + "FROM video_pipelines WHERE id = ?", + (step, pid), + ).fetchone() + return int(row["c"]) if row and row["c"] is not None else 0 + + +def record_feedback(pid: int, step: str, feedback_text: str) -> None: + with _conn() as conn: + conn.execute(""" + INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at) + VALUES (?, ?, ?, ?) + """, (pid, step, feedback_text, _now())) + + +def get_feedback_history(pid: int) -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute(""" + SELECT * FROM pipeline_feedback + WHERE pipeline_id = ? ORDER BY id DESC + """, (pid,)).fetchall() + return [dict(r) for r in rows] + + +def create_pipeline_job(pid: int, step: str) -> int: + with _conn() as conn: + cur = conn.execute(""" + INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at) + VALUES (?, ?, 'queued', ?) + """, (pid, step, _now())) + return cur.lastrowid + + +def update_pipeline_job(job_id: int, **fields) -> None: + """pipeline_jobs 행을 갱신. 허용 컬럼 화이트리스트 외 키는 ValueError. + + status가 succeeded/failed로 바뀌면 finished_at을 자동 설정 (호출자가 미지정 시). + """ + unknown = set(fields) - _PIPELINE_JOB_COLS + if unknown: + raise ValueError(f"unknown columns for update_pipeline_job: {sorted(unknown)}") + if not fields: + return + + if ( + fields.get("status") in ("succeeded", "failed") + and "finished_at" not in fields + ): + fields["finished_at"] = _now() + + cols = ", ".join(f"{k} = ?" for k in fields) + vals = list(fields.values()) + [job_id] + with _conn() as conn: + conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals) + + +def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute(""" + SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC + """, (pid,)).fetchall() + return [dict(r) for r in rows] + + +def get_youtube_setup() -> Dict[str, Any]: + """youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회.""" + with _conn() as conn: + row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone() + if row is None: + _seed_default_youtube_setup(conn) + row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone() + d = dict(row) + for k in ("metadata_template_json", "cover_prompts_json", + "review_weights_json", "visual_defaults_json", "publish_policy_json"): + d[k.replace("_json", "")] = json.loads(d[k]) + return d + + +def update_youtube_setup(**kwargs) -> None: + field_map = { + "metadata_template": "metadata_template_json", + "cover_prompts": "cover_prompts_json", + "review_weights": "review_weights_json", + "visual_defaults": "visual_defaults_json", + "publish_policy": "publish_policy_json", + } + cols = [] + vals: List[Any] = [] + for k, v in kwargs.items(): + if k in field_map: + cols.append(f"{field_map[k]} = ?") + vals.append(json.dumps(v)) + elif k == "review_threshold": + cols.append("review_threshold = ?") + vals.append(int(v)) + if not cols: + return + cols.append("updated_at = ?") + vals.append(_now()) + with _conn() as conn: + conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals) + + +def upsert_oauth_token(channel_id: str, channel_title: Optional[str], + avatar_url: Optional[str], refresh_token: str, + access_token: Optional[str], expires_at: Optional[str]) -> None: + with _conn() as conn: + conn.execute("DELETE FROM youtube_oauth_tokens") + conn.execute(""" + INSERT INTO youtube_oauth_tokens + (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now())) + + +def get_oauth_token() -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone() + return dict(row) if row else None + + +def delete_oauth_token() -> None: + with _conn() as conn: + conn.execute("DELETE FROM youtube_oauth_tokens") diff --git a/music-lab/requirements.txt b/music-lab/requirements.txt index 92fb320..4ce3db6 100644 --- a/music-lab/requirements.txt +++ b/music-lab/requirements.txt @@ -7,3 +7,4 @@ anthropic>=0.40.0 Pillow>=11.0.0 pytest>=8.0.0 httpx>=0.27.0 +freezegun>=1.4 diff --git a/music-lab/tests/conftest.py b/music-lab/tests/conftest.py index a19432e..cd3167d 100644 --- a/music-lab/tests/conftest.py +++ b/music-lab/tests/conftest.py @@ -1,7 +1,36 @@ import pytest + @pytest.fixture def tmp_db(tmp_path, monkeypatch): db_path = str(tmp_path / "test_music.db") monkeypatch.setattr("app.db.DB_PATH", db_path) return db_path + + +@pytest.fixture +def freezer(): + """Minimal freezegun-based fixture providing `move_to(time)` to mimic + pytest-freezer's `freezer` fixture using only the `freezegun` package.""" + from freezegun import freeze_time + + class _Freezer: + def __init__(self): + self._ctx = None + + def move_to(self, target): + if self._ctx is not None: + self._ctx.stop() + self._ctx = freeze_time(target) + self._ctx.start() + + def stop(self): + if self._ctx is not None: + self._ctx.stop() + self._ctx = None + + f = _Freezer() + try: + yield f + finally: + f.stop() diff --git a/music-lab/tests/test_pipeline_db.py b/music-lab/tests/test_pipeline_db.py new file mode 100644 index 0000000..96cb5a6 --- /dev/null +++ b/music-lab/tests/test_pipeline_db.py @@ -0,0 +1,96 @@ +import os +import tempfile +import pytest +from app import db + + +@pytest.fixture +def fresh_db(monkeypatch, tmp_path): + db_path = tmp_path / "music.db" + monkeypatch.setattr(db, "DB_PATH", str(db_path)) + db.init_db() + return db_path + + +def test_create_pipeline_inserts_row(fresh_db): + pid = db.create_pipeline(track_id=1) + row = db.get_pipeline(pid) + assert row["id"] == pid + assert row["state"] == "created" + assert row["track_id"] == 1 + assert row["feedback_count_per_step"] == {} + + +def test_update_pipeline_state_records_started_at(fresh_db, freezer): + pid = db.create_pipeline(track_id=1) + freezer.move_to("2026-05-07T08:00:00") + db.update_pipeline_state(pid, "cover_pending") + row = db.get_pipeline(pid) + assert row["state"] == "cover_pending" + assert row["state_started_at"] == "2026-05-07T08:00:00" + + +def test_increment_feedback_count(fresh_db): + pid = db.create_pipeline(track_id=1) + db.increment_feedback_count(pid, "cover") + db.increment_feedback_count(pid, "cover") + row = db.get_pipeline(pid) + assert row["feedback_count_per_step"] == {"cover": 2} + + +def test_record_feedback(fresh_db): + pid = db.create_pipeline(track_id=1) + db.record_feedback(pid, "cover", "더 어둡게") + rows = db.get_feedback_history(pid) + assert len(rows) == 1 + assert rows[0]["feedback_text"] == "더 어둡게" + + +def test_create_pipeline_job_lifecycle(fresh_db): + pid = db.create_pipeline(track_id=1) + job_id = db.create_pipeline_job(pid, "cover") + db.update_pipeline_job(job_id, status="running") + db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234) + jobs = db.list_pipeline_jobs(pid) + assert jobs[0]["status"] == "succeeded" + assert jobs[0]["duration_ms"] == 1234 + + +def test_youtube_setup_default_row_created_on_init(fresh_db): + setup = db.get_youtube_setup() + assert setup["review_threshold"] == 60 + assert "metadata_template_json" in setup + + +def test_youtube_oauth_token_upsert(fresh_db): + db.upsert_oauth_token( + channel_id="UC123", + channel_title="My Channel", + avatar_url="https://...", + refresh_token="r1", + access_token="a1", + expires_at="2026-05-07T09:00:00", + ) + tok = db.get_oauth_token() + assert tok["channel_id"] == "UC123" + assert tok["refresh_token"] == "r1" + db.upsert_oauth_token( + channel_id="UC123", channel_title="My Channel", + avatar_url=None, refresh_token="r2", + access_token="a2", expires_at="2026-05-07T10:00:00", + ) + tok = db.get_oauth_token() + assert tok["refresh_token"] == "r2" # upsert + + +def test_update_pipeline_state_rejects_unknown_column(fresh_db): + pid = db.create_pipeline(track_id=1) + with pytest.raises(ValueError): + db.update_pipeline_state(pid, "cover_pending", evil_col="x; DROP TABLE") + + +def test_update_pipeline_job_rejects_unknown_column(fresh_db): + pid = db.create_pipeline(track_id=1) + job_id = db.create_pipeline_job(pid, "cover") + with pytest.raises(ValueError): + db.update_pipeline_job(job_id, evil_col="x")