feat(music-lab): pipeline 5개 DB 테이블 + 헬퍼

YouTube 음악 파이프라인 Task 1 — 신규 5개 테이블과 헬퍼 함수 추가.

테이블:
- video_pipelines: 파이프라인 단위 상태 머신 + 메타/리뷰 JSON
- pipeline_jobs: 단계별 비동기 작업 상태/시간
- pipeline_feedback: 텔레그램 피드백 이력
- youtube_oauth_tokens: 채널 OAuth refresh/access 토큰
- youtube_setup: 단일 행 설정 (메타 템플릿/커버 프롬프트/리뷰 가중치/임계값/비주얼/공개정책)

헬퍼:
- create_pipeline / get_pipeline / update_pipeline_state / list_pipelines
- increment_feedback_count / record_feedback / get_feedback_history
- create_pipeline_job / update_pipeline_job / list_pipeline_jobs
- get_youtube_setup / update_youtube_setup
- upsert_oauth_token / get_oauth_token / delete_oauth_token

테스트:
- tests/test_pipeline_db.py: 7개 테스트 (생성/상태/피드백/잡/셋업/OAuth)
- tests/conftest.py: freezegun 기반 freezer fixture 추가
- requirements.txt: freezegun>=1.4 추가

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-07 16:30:16 +09:00
parent e03d074222
commit d66a321982
4 changed files with 477 additions and 0 deletions

View File

@@ -1,6 +1,7 @@
import os
import sqlite3
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
DB_PATH = "/app/data/music.db"
@@ -184,6 +185,112 @@ def init_db() -> None:
)
""")
# ── YouTube pipeline 테이블 (5개) ─────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS video_pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_id INTEGER NOT NULL,
state TEXT NOT NULL DEFAULT 'created',
state_started_at TEXT NOT NULL,
cover_url TEXT,
video_url TEXT,
thumbnail_url TEXT,
metadata_json TEXT,
review_json TEXT,
youtube_video_id TEXT,
feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
cancelled_at TEXT,
failed_reason TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS pipeline_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL,
step TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
error TEXT,
started_at TEXT,
finished_at TEXT,
duration_ms INTEGER,
FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS pipeline_feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER NOT NULL,
step TEXT NOT NULL,
feedback_text TEXT NOT NULL,
received_at TEXT NOT NULL,
FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id TEXT NOT NULL,
channel_title TEXT,
avatar_url TEXT,
refresh_token TEXT NOT NULL,
access_token TEXT,
expires_at TEXT,
created_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS youtube_setup (
id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
metadata_template_json TEXT NOT NULL,
cover_prompts_json TEXT NOT NULL,
review_weights_json TEXT NOT NULL,
review_threshold INTEGER NOT NULL DEFAULT 60,
visual_defaults_json TEXT NOT NULL,
publish_policy_json TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# 기본 setup 1행 보장
cnt_row = conn.execute("SELECT COUNT(*) AS c FROM youtube_setup").fetchone()
if cnt_row["c"] == 0:
_seed_default_youtube_setup(conn)
def _seed_default_youtube_setup(conn: sqlite3.Connection) -> None:
"""youtube_setup 테이블에 기본 1행을 삽입한다.
init_db()와 get_youtube_setup() (행이 사라진 경우 self-heal)가 공유한다.
"""
defaults = (
json.dumps({
"title": "[{genre}] {title} | {bpm}BPM",
"description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n",
"tags": ["lofi", "chill", "instrumental"],
"category_id": 10,
}),
json.dumps({
"lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
"phonk": "dark drift car aesthetic, neon, phonk vibe",
"ambient": "ethereal mountain landscape, ambient mood",
"default": "abstract music album cover art",
}),
json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
60,
json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
datetime.utcnow().isoformat(timespec="seconds"),
)
conn.execute("""
INSERT INTO youtube_setup
(metadata_template_json, cover_prompts_json, review_weights_json,
review_threshold, visual_defaults_json, publish_policy_json, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", defaults)
# ── music_tasks CRUD ──────────────────────────────────────────────────────────
@@ -791,3 +898,247 @@ def update_compile_job(job_id: int, **kwargs) -> None:
def delete_compile_job(job_id: int) -> None:
with _conn() as conn:
conn.execute("DELETE FROM compile_jobs WHERE id = ?", (job_id,))
# ── YouTube pipeline helpers ──────────────────────────────────────────────────
# update_pipeline_state: state/state_started_at/updated_at은 자동, 그 외 허용 컬럼 화이트리스트
_PIPELINE_STATE_EXTRA_COLS = frozenset({
"cover_url",
"video_url",
"thumbnail_url",
"metadata_json",
"review_json",
"youtube_video_id",
"cancelled_at",
"failed_reason",
"last_telegram_msg_ids",
"feedback_count_per_step",
})
# update_pipeline_job 허용 컬럼 화이트리스트
_PIPELINE_JOB_COLS = frozenset({
"status",
"error",
"duration_ms",
"started_at",
"finished_at",
})
def _now() -> str:
return datetime.utcnow().isoformat(timespec="seconds")
def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]:
"""video_pipelines의 sqlite3.Row를 dict로 파싱.
JSON 컬럼을 디코드하고, metadata/review를 호환을 위해 추가로 노출한다.
get_pipeline / list_pipelines가 공유.
"""
d = dict(row)
d["feedback_count_per_step"] = json.loads(d["feedback_count_per_step"] or "{}")
d["last_telegram_msg_ids"] = json.loads(d["last_telegram_msg_ids"] or "{}")
if d.get("metadata_json"):
d["metadata"] = json.loads(d["metadata_json"])
if d.get("review_json"):
d["review"] = json.loads(d["review_json"])
return d
def create_pipeline(track_id: int) -> int:
with _conn() as conn:
now = _now()
cur = conn.execute("""
INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at)
VALUES (?, 'created', ?, ?, ?)
""", (track_id, now, now, now))
return cur.lastrowid
def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone()
if not row:
return None
return _parse_pipeline_row(row)
def update_pipeline_state(pid: int, state: str, **fields) -> None:
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.
허용 컬럼 화이트리스트(_PIPELINE_STATE_EXTRA_COLS)에 없는 키는 ValueError.
"""
unknown = set(fields) - _PIPELINE_STATE_EXTRA_COLS
if unknown:
raise ValueError(f"unknown columns for update_pipeline_state: {sorted(unknown)}")
now = _now()
cols = ["state = ?", "state_started_at = ?", "updated_at = ?"]
vals: List[Any] = [state, now, now]
for k, v in fields.items():
cols.append(f"{k} = ?")
vals.append(v)
vals.append(pid)
with _conn() as conn:
conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals)
def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
with _conn() as conn:
if active_only:
rows = conn.execute("""
SELECT * FROM video_pipelines
WHERE state NOT IN ('published','cancelled','failed','awaiting_manual')
ORDER BY created_at DESC
""").fetchall()
else:
rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall()
return [_parse_pipeline_row(r) for r in rows]
def increment_feedback_count(pid: int, step: str) -> int:
"""원자적으로 feedback_count_per_step.<step>를 +1 한 뒤 새 값을 반환.
json1 확장(SQLite 3.38+)을 사용해 read-modify-write 경합을 제거한다.
"""
now = _now()
with _conn() as conn:
conn.execute(
"""
UPDATE video_pipelines
SET feedback_count_per_step = json_set(
feedback_count_per_step,
'$.' || ?,
COALESCE(json_extract(feedback_count_per_step, '$.' || ?), 0) + 1
),
updated_at = ?
WHERE id = ?
""",
(step, step, now, pid),
)
row = conn.execute(
"SELECT json_extract(feedback_count_per_step, '$.' || ?) AS c "
"FROM video_pipelines WHERE id = ?",
(step, pid),
).fetchone()
return int(row["c"]) if row and row["c"] is not None else 0
def record_feedback(pid: int, step: str, feedback_text: str) -> None:
with _conn() as conn:
conn.execute("""
INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
VALUES (?, ?, ?, ?)
""", (pid, step, feedback_text, _now()))
def get_feedback_history(pid: int) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("""
SELECT * FROM pipeline_feedback
WHERE pipeline_id = ? ORDER BY id DESC
""", (pid,)).fetchall()
return [dict(r) for r in rows]
def create_pipeline_job(pid: int, step: str) -> int:
with _conn() as conn:
cur = conn.execute("""
INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
VALUES (?, ?, 'queued', ?)
""", (pid, step, _now()))
return cur.lastrowid
def update_pipeline_job(job_id: int, **fields) -> None:
"""pipeline_jobs 행을 갱신. 허용 컬럼 화이트리스트 외 키는 ValueError.
status가 succeeded/failed로 바뀌면 finished_at을 자동 설정 (호출자가 미지정 시).
"""
unknown = set(fields) - _PIPELINE_JOB_COLS
if unknown:
raise ValueError(f"unknown columns for update_pipeline_job: {sorted(unknown)}")
if not fields:
return
if (
fields.get("status") in ("succeeded", "failed")
and "finished_at" not in fields
):
fields["finished_at"] = _now()
cols = ", ".join(f"{k} = ?" for k in fields)
vals = list(fields.values()) + [job_id]
with _conn() as conn:
conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals)
def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("""
SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC
""", (pid,)).fetchall()
return [dict(r) for r in rows]
def get_youtube_setup() -> Dict[str, Any]:
"""youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회."""
with _conn() as conn:
row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
if row is None:
_seed_default_youtube_setup(conn)
row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
d = dict(row)
for k in ("metadata_template_json", "cover_prompts_json",
"review_weights_json", "visual_defaults_json", "publish_policy_json"):
d[k.replace("_json", "")] = json.loads(d[k])
return d
def update_youtube_setup(**kwargs) -> None:
field_map = {
"metadata_template": "metadata_template_json",
"cover_prompts": "cover_prompts_json",
"review_weights": "review_weights_json",
"visual_defaults": "visual_defaults_json",
"publish_policy": "publish_policy_json",
}
cols = []
vals: List[Any] = []
for k, v in kwargs.items():
if k in field_map:
cols.append(f"{field_map[k]} = ?")
vals.append(json.dumps(v))
elif k == "review_threshold":
cols.append("review_threshold = ?")
vals.append(int(v))
if not cols:
return
cols.append("updated_at = ?")
vals.append(_now())
with _conn() as conn:
conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals)
def upsert_oauth_token(channel_id: str, channel_title: Optional[str],
avatar_url: Optional[str], refresh_token: str,
access_token: Optional[str], expires_at: Optional[str]) -> None:
with _conn() as conn:
conn.execute("DELETE FROM youtube_oauth_tokens")
conn.execute("""
INSERT INTO youtube_oauth_tokens
(channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now()))
def get_oauth_token() -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone()
return dict(row) if row else None
def delete_oauth_token() -> None:
with _conn() as conn:
conn.execute("DELETE FROM youtube_oauth_tokens")

View File

@@ -7,3 +7,4 @@ anthropic>=0.40.0
Pillow>=11.0.0
pytest>=8.0.0
httpx>=0.27.0
freezegun>=1.4

View File

@@ -1,7 +1,36 @@
import pytest
@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
db_path = str(tmp_path / "test_music.db")
monkeypatch.setattr("app.db.DB_PATH", db_path)
return db_path
@pytest.fixture
def freezer():
"""Minimal freezegun-based fixture providing `move_to(time)` to mimic
pytest-freezer's `freezer` fixture using only the `freezegun` package."""
from freezegun import freeze_time
class _Freezer:
def __init__(self):
self._ctx = None
def move_to(self, target):
if self._ctx is not None:
self._ctx.stop()
self._ctx = freeze_time(target)
self._ctx.start()
def stop(self):
if self._ctx is not None:
self._ctx.stop()
self._ctx = None
f = _Freezer()
try:
yield f
finally:
f.stop()

View File

@@ -0,0 +1,96 @@
import os
import tempfile
import pytest
from app import db
@pytest.fixture
def fresh_db(monkeypatch, tmp_path):
db_path = tmp_path / "music.db"
monkeypatch.setattr(db, "DB_PATH", str(db_path))
db.init_db()
return db_path
def test_create_pipeline_inserts_row(fresh_db):
pid = db.create_pipeline(track_id=1)
row = db.get_pipeline(pid)
assert row["id"] == pid
assert row["state"] == "created"
assert row["track_id"] == 1
assert row["feedback_count_per_step"] == {}
def test_update_pipeline_state_records_started_at(fresh_db, freezer):
pid = db.create_pipeline(track_id=1)
freezer.move_to("2026-05-07T08:00:00")
db.update_pipeline_state(pid, "cover_pending")
row = db.get_pipeline(pid)
assert row["state"] == "cover_pending"
assert row["state_started_at"] == "2026-05-07T08:00:00"
def test_increment_feedback_count(fresh_db):
pid = db.create_pipeline(track_id=1)
db.increment_feedback_count(pid, "cover")
db.increment_feedback_count(pid, "cover")
row = db.get_pipeline(pid)
assert row["feedback_count_per_step"] == {"cover": 2}
def test_record_feedback(fresh_db):
pid = db.create_pipeline(track_id=1)
db.record_feedback(pid, "cover", "더 어둡게")
rows = db.get_feedback_history(pid)
assert len(rows) == 1
assert rows[0]["feedback_text"] == "더 어둡게"
def test_create_pipeline_job_lifecycle(fresh_db):
pid = db.create_pipeline(track_id=1)
job_id = db.create_pipeline_job(pid, "cover")
db.update_pipeline_job(job_id, status="running")
db.update_pipeline_job(job_id, status="succeeded", duration_ms=1234)
jobs = db.list_pipeline_jobs(pid)
assert jobs[0]["status"] == "succeeded"
assert jobs[0]["duration_ms"] == 1234
def test_youtube_setup_default_row_created_on_init(fresh_db):
setup = db.get_youtube_setup()
assert setup["review_threshold"] == 60
assert "metadata_template_json" in setup
def test_youtube_oauth_token_upsert(fresh_db):
db.upsert_oauth_token(
channel_id="UC123",
channel_title="My Channel",
avatar_url="https://...",
refresh_token="r1",
access_token="a1",
expires_at="2026-05-07T09:00:00",
)
tok = db.get_oauth_token()
assert tok["channel_id"] == "UC123"
assert tok["refresh_token"] == "r1"
db.upsert_oauth_token(
channel_id="UC123", channel_title="My Channel",
avatar_url=None, refresh_token="r2",
access_token="a2", expires_at="2026-05-07T10:00:00",
)
tok = db.get_oauth_token()
assert tok["refresh_token"] == "r2" # upsert
def test_update_pipeline_state_rejects_unknown_column(fresh_db):
pid = db.create_pipeline(track_id=1)
with pytest.raises(ValueError):
db.update_pipeline_state(pid, "cover_pending", evil_col="x; DROP TABLE")
def test_update_pipeline_job_rejects_unknown_column(fresh_db):
pid = db.create_pipeline(track_id=1)
job_id = db.create_pipeline_job(pid, "cover")
with pytest.raises(ValueError):
db.update_pipeline_job(job_id, evil_col="x")