Files
web-page-backend/music-lab/app/db.py
gahusb 70a256bbe4 feat(music-lab): video_pipelines 4 컬럼 추가 + compile_jobs JOIN
- _add_column_if_missing 헬퍼 추가 (idempotent ALTER TABLE)
- video_pipelines에 compile_job_id, visual_style, background_mode, background_keyword 컬럼 추가
- track_id를 nullable로 변경 (compile_job_id 입력 모드 지원)
- create_pipeline에 compile_job_id XOR track_id 검증 추가
- get_pipeline / list_pipelines에 compile_jobs LEFT JOIN — compile_title 노출

Task 1 of 17: Essential Mix pipeline DB migration

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 13:04:23 +09:00

1251 lines
50 KiB
Python

import os
import sqlite3
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
# Absolute path of the SQLite database inside the container's data volume.
DB_PATH = "/app/data/music.db"
def _conn() -> sqlite3.Connection:
    """Open a connection to the app database, creating the data dir if needed.

    row_factory is set to sqlite3.Row so columns are addressable by name.
    NOTE(review): callers use ``with _conn() as conn`` throughout, which
    commits/rolls back but does NOT close the connection — it is only
    released on garbage collection. Confirm this is acceptable churn.
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
def _add_column_if_missing(cursor, table: str, column: str, ddl: str) -> None:
"""SQLite-safe ALTER TABLE ADD COLUMN — idempotent.
SQLite의 ALTER TABLE은 컬럼 존재 시 에러. PRAGMA로 미리 확인.
"""
cursor.execute(f"PRAGMA table_info({table})")
existing = {row[1] for row in cursor.fetchall()}
if column not in existing:
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
def _is_column_not_null(cursor, table: str, column: str) -> bool:
"""PRAGMA table_info row format: (cid, name, type, notnull, dflt_value, pk)."""
cursor.execute(f"PRAGMA table_info({table})")
for row in cursor.fetchall():
if row[1] == column:
return row[3] == 1
return False
def _relax_video_pipelines_track_id_nullable(cursor) -> None:
    """Relax video_pipelines.track_id from NOT NULL to nullable.

    Needed for pipelines that are driven by compile_job_id alone. SQLite has
    no ALTER COLUMN, so this follows the standard schema-change pattern:
    create a new table with the final schema, copy the data, then swap.
    Idempotent: no-op when track_id is already nullable.

    NOTE(review): this assumes the four essential-mix columns
    (compile_job_id, visual_style, background_mode, background_keyword)
    already exist on the old table — init_db() adds them via
    _add_column_if_missing BEFORE calling this; keep that ordering.
    """
    if not _is_column_not_null(cursor, "video_pipelines", "track_id"):
        return  # already nullable
    # New table is created with the final schema, including the 4 new columns.
    cursor.execute("""
        CREATE TABLE video_pipelines_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            track_id INTEGER,
            state TEXT NOT NULL DEFAULT 'created',
            state_started_at TEXT NOT NULL,
            cover_url TEXT,
            video_url TEXT,
            thumbnail_url TEXT,
            metadata_json TEXT,
            review_json TEXT,
            youtube_video_id TEXT,
            feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
            last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            cancelled_at TEXT,
            failed_reason TEXT,
            compile_job_id INTEGER,
            visual_style TEXT NOT NULL DEFAULT 'essential',
            background_mode TEXT NOT NULL DEFAULT 'static',
            background_keyword TEXT
        )
    """)
    # Copy all existing columns explicitly. COALESCE(col, NULL) is a
    # pass-through; COALESCE(col, <default>) backfills rows that predate the
    # NOT NULL DEFAULT columns so the copy cannot violate the new constraints.
    cursor.execute("""
        INSERT INTO video_pipelines_new
        (id, track_id, state, state_started_at, cover_url, video_url,
         thumbnail_url, metadata_json, review_json, youtube_video_id,
         feedback_count_per_step, last_telegram_msg_ids,
         created_at, updated_at, cancelled_at, failed_reason,
         compile_job_id, visual_style, background_mode, background_keyword)
        SELECT
            id, track_id, state, state_started_at, cover_url, video_url,
            thumbnail_url, metadata_json, review_json, youtube_video_id,
            feedback_count_per_step, last_telegram_msg_ids,
            created_at, updated_at, cancelled_at, failed_reason,
            COALESCE(compile_job_id, NULL),
            COALESCE(visual_style, 'essential'),
            COALESCE(background_mode, 'static'),
            COALESCE(background_keyword, NULL)
        FROM video_pipelines
    """)
    cursor.execute("DROP TABLE video_pipelines")
    cursor.execute("ALTER TABLE video_pipelines_new RENAME TO video_pipelines")
def init_db() -> None:
    """Create all tables/indexes if absent and run in-place migrations.

    Safe to call on every startup: uses CREATE TABLE IF NOT EXISTS plus
    idempotent ALTER TABLE migrations (duplicate-column errors swallowed,
    or pre-checked via _add_column_if_missing).
    """
    with _conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_tasks (
                id TEXT PRIMARY KEY,
                status TEXT NOT NULL DEFAULT 'queued',
                progress INTEGER NOT NULL DEFAULT 0,
                message TEXT NOT NULL DEFAULT '',
                audio_url TEXT,
                error TEXT,
                params TEXT NOT NULL DEFAULT '{}',
                provider TEXT NOT NULL DEFAULT 'local',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created ON music_tasks(created_at DESC)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_library (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                genre TEXT NOT NULL DEFAULT '',
                moods TEXT NOT NULL DEFAULT '[]',
                instruments TEXT NOT NULL DEFAULT '[]',
                duration_sec INTEGER,
                bpm INTEGER,
                key TEXT NOT NULL DEFAULT '',
                scale TEXT NOT NULL DEFAULT '',
                prompt TEXT NOT NULL DEFAULT '',
                audio_url TEXT NOT NULL DEFAULT '',
                file_path TEXT NOT NULL DEFAULT '',
                task_id TEXT,
                tags TEXT NOT NULL DEFAULT '[]',
                provider TEXT NOT NULL DEFAULT 'local',
                lyrics TEXT NOT NULL DEFAULT '',
                image_url TEXT NOT NULL DEFAULT '',
                suno_id TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_library_created ON music_library(created_at DESC)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS saved_lyrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                text TEXT NOT NULL DEFAULT '',
                prompt TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_lyrics_created ON saved_lyrics(created_at DESC)")
        # Legacy-table migration: add columns that predate this schema.
        for col, default in [
            ("provider", "'local'"), ("lyrics", "''"),
            ("image_url", "''"), ("suno_id", "''"),
            ("file_hash", "''"),
        ]:
            try:
                conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}")
            except sqlite3.OperationalError:
                pass  # column already exists
        try:
            conn.execute("ALTER TABLE music_tasks ADD COLUMN provider TEXT NOT NULL DEFAULT 'local'")
        except sqlite3.OperationalError:
            pass
        # Phase 1-3 new-column migration.
        for col, default in [
            ("cover_images", "'[]'"),
            ("wav_url", "''"),
            ("video_url", "''"),
            ("stem_urls", "'{}'"),
        ]:
            try:
                conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}")
            except sqlite3.OperationalError:
                pass
        # ── video_projects table ─────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS video_projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id INTEGER,
                format TEXT NOT NULL DEFAULT 'visualizer',
                status TEXT NOT NULL DEFAULT 'pending',
                output_path TEXT NOT NULL DEFAULT '',
                output_url TEXT NOT NULL DEFAULT '',
                thumbnail_path TEXT NOT NULL DEFAULT '',
                target_countries TEXT NOT NULL DEFAULT '[]',
                yt_title TEXT NOT NULL DEFAULT '',
                yt_description TEXT NOT NULL DEFAULT '',
                yt_tags TEXT NOT NULL DEFAULT '[]',
                render_params TEXT NOT NULL DEFAULT '{}',
                error TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                completed_at TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)")
        # ── revenue_records table ────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS revenue_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_project_id INTEGER,
                yt_video_id TEXT NOT NULL DEFAULT '',
                record_month TEXT NOT NULL DEFAULT '',
                views INTEGER NOT NULL DEFAULT 0,
                watch_hours REAL NOT NULL DEFAULT 0.0,
                revenue_usd REAL NOT NULL DEFAULT 0.0,
                rpm_usd REAL NOT NULL DEFAULT 0.0,
                country TEXT NOT NULL DEFAULT '',
                source TEXT NOT NULL DEFAULT 'manual',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(yt_video_id, record_month, country)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)")
        # ── market_trends table ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS market_trends (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL DEFAULT '',
                country TEXT NOT NULL DEFAULT '',
                genre TEXT NOT NULL DEFAULT '',
                keyword TEXT NOT NULL DEFAULT '',
                score REAL NOT NULL DEFAULT 0.0,
                rank INTEGER,
                metadata TEXT NOT NULL DEFAULT '{}',
                collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_mt_country_source "
            "ON market_trends(country, source, collected_at DESC)"
        )
        # ── trend_reports table ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trend_reports (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                report_date TEXT UNIQUE NOT NULL DEFAULT '',
                top_genres TEXT NOT NULL DEFAULT '[]',
                top_keywords TEXT NOT NULL DEFAULT '[]',
                recommended_styles TEXT NOT NULL DEFAULT '[]',
                insights TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        # ── compile_jobs table ───────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS compile_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                track_ids TEXT NOT NULL DEFAULT '[]',
                crossfade_sec REAL NOT NULL DEFAULT 3.0,
                status TEXT NOT NULL DEFAULT 'pending',
                output_path TEXT,
                duration_sec REAL,
                error TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        # ── YouTube pipeline tables (5) ──────────────────────────────────
        # track_id is nullable: supports essential-mix mode fed by compile_job_id.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS video_pipelines (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id INTEGER,
                state TEXT NOT NULL DEFAULT 'created',
                state_started_at TEXT NOT NULL,
                cover_url TEXT,
                video_url TEXT,
                thumbnail_url TEXT,
                metadata_json TEXT,
                review_json TEXT,
                youtube_video_id TEXT,
                feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
                last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                cancelled_at TEXT,
                failed_reason TEXT
            )
        """)
        # Migration for essential mix pipeline (task 2026-05-09).
        # Order matters: the column adds must precede the nullable rewrite,
        # which copies those columns into the replacement table.
        cur = conn.cursor()
        _add_column_if_missing(cur, "video_pipelines", "compile_job_id", "INTEGER")
        _add_column_if_missing(cur, "video_pipelines", "visual_style", "TEXT NOT NULL DEFAULT 'essential'")
        _add_column_if_missing(cur, "video_pipelines", "background_mode", "TEXT NOT NULL DEFAULT 'static'")
        _add_column_if_missing(cur, "video_pipelines", "background_keyword", "TEXT")
        _relax_video_pipelines_track_id_nullable(cur)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pipeline_id INTEGER NOT NULL,
                step TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'queued',
                error TEXT,
                started_at TEXT,
                finished_at TEXT,
                duration_ms INTEGER,
                FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_feedback (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pipeline_id INTEGER NOT NULL,
                step TEXT NOT NULL,
                feedback_text TEXT NOT NULL,
                received_at TEXT NOT NULL,
                FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                channel_id TEXT NOT NULL,
                channel_title TEXT,
                avatar_url TEXT,
                refresh_token TEXT NOT NULL,
                access_token TEXT,
                expires_at TEXT,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS youtube_setup (
                id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
                metadata_template_json TEXT NOT NULL,
                cover_prompts_json TEXT NOT NULL,
                review_weights_json TEXT NOT NULL,
                review_threshold INTEGER NOT NULL DEFAULT 60,
                visual_defaults_json TEXT NOT NULL,
                publish_policy_json TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        # Guarantee the single default setup row exists.
        cnt_row = conn.execute("SELECT COUNT(*) AS c FROM youtube_setup").fetchone()
        if cnt_row["c"] == 0:
            _seed_default_youtube_setup(conn)
def _seed_default_youtube_setup(conn: sqlite3.Connection) -> None:
    """Insert the single default row into youtube_setup.

    Shared by init_db() and get_youtube_setup() (self-heal when the row
    has disappeared). Column order matches the INSERT statement below.
    """
    defaults = (
        # metadata_template_json — placeholders filled per track at render time
        json.dumps({
            "title": "[{genre}] {title} | {bpm}BPM",
            "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n",
            "tags": ["lofi", "chill", "instrumental"],
            "category_id": 10,
        }),
        # cover_prompts_json — image prompt per genre, 'default' as fallback
        json.dumps({
            "lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
            "phonk": "dark drift car aesthetic, neon, phonk vibe",
            "ambient": "ethereal mountain landscape, ambient mood",
            "default": "abstract music album cover art",
        }),
        # review_weights_json (sums to 100) and review_threshold
        json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
        60,
        # visual_defaults_json / publish_policy_json
        json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
        json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
        datetime.utcnow().isoformat(timespec="seconds"),
    )
    conn.execute("""
        INSERT INTO youtube_setup
        (metadata_template_json, cover_prompts_json, review_weights_json,
         review_threshold, visual_defaults_json, publish_policy_json, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, defaults)
# ── music_tasks CRUD ──────────────────────────────────────────────────────────
def _task_row_to_dict(r) -> Dict[str, Any]:
return {
"task_id": r["id"],
"status": r["status"],
"progress": r["progress"],
"message": r["message"],
"audio_url": r["audio_url"],
"error": r["error"],
"params": json.loads(r["params"]),
"provider": r["provider"] if "provider" in r.keys() else "local",
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def create_task(task_id: str, params: Dict[str, Any], provider: str = "local") -> Dict[str, Any]:
    """Insert a new queued music task and return the stored row as a dict."""
    with _conn() as conn:
        conn.execute(
            "INSERT INTO music_tasks (id, params, provider) VALUES (?, ?, ?)",
            (task_id, json.dumps(params), provider),
        )
        inserted = conn.execute(
            "SELECT * FROM music_tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return _task_row_to_dict(inserted)
def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str,
    audio_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite a task's status fields and bump updated_at (UTC, ms precision).

    audio_url/error are written as given — passing None clears them.
    Silently does nothing when task_id does not exist.
    """
    with _conn() as conn:
        conn.execute(
            """
            UPDATE music_tasks
            SET status = ?, progress = ?, message = ?, audio_url = ?, error = ?,
                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            WHERE id = ?
            """,
            (status, progress, message, audio_url, error, task_id),
        )
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single task by id; None when absent."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM music_tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return _task_row_to_dict(found) if found else None
# ── music_library CRUD ────────────────────────────────────────────────────────
def _track_row_to_dict(r) -> Dict[str, Any]:
    """Convert a music_library row into the API dict.

    Columns added by later migrations (provider, lyrics, image_url, suno_id,
    file_hash, cover_images, wav_url, video_url, stem_urls) are guarded with
    ``in keys`` so rows fetched before the ALTER TABLE migrations ran still
    parse; JSON columns fall back to empty containers.
    """
    keys = r.keys()
    return {
        "id": r["id"],
        "title": r["title"],
        "genre": r["genre"],
        "moods": json.loads(r["moods"]) if r["moods"] else [],
        "instruments": json.loads(r["instruments"]) if r["instruments"] else [],
        "duration_sec": r["duration_sec"],
        "bpm": r["bpm"],
        "key": r["key"],
        "scale": r["scale"],
        "prompt": r["prompt"],
        "audio_url": r["audio_url"],
        "file_path": r["file_path"],
        "task_id": r["task_id"],
        "tags": json.loads(r["tags"]) if r["tags"] else [],
        "provider": r["provider"] if "provider" in keys else "local",
        "lyrics": r["lyrics"] if "lyrics" in keys else "",
        "image_url": r["image_url"] if "image_url" in keys else "",
        "suno_id": r["suno_id"] if "suno_id" in keys else "",
        "file_hash": r["file_hash"] if "file_hash" in keys else "",
        "cover_images": json.loads(r["cover_images"]) if "cover_images" in keys and r["cover_images"] else [],
        "wav_url": r["wav_url"] if "wav_url" in keys else "",
        "video_url": r["video_url"] if "video_url" in keys else "",
        "stem_urls": json.loads(r["stem_urls"]) if "stem_urls" in keys and r["stem_urls"] else {},
        "created_at": r["created_at"],
    }
def get_all_tracks() -> List[Dict[str, Any]]:
    """Return the whole music library, newest first."""
    with _conn() as conn:
        fetched = conn.execute(
            "SELECT * FROM music_library ORDER BY created_at DESC"
        ).fetchall()
    return [_track_row_to_dict(row) for row in fetched]
def add_track(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a library row from a partial dict and return the stored row.

    Missing keys fall back to empty/None defaults; list/dict fields
    (moods, instruments, tags) are JSON-encoded for storage.
    """
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO music_library
            (title, genre, moods, instruments, duration_sec, bpm, key, scale,
             prompt, audio_url, file_path, task_id, tags,
             provider, lyrics, image_url, suno_id, file_hash)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                data.get("title", ""),
                data.get("genre", ""),
                json.dumps(data.get("moods", [])),
                json.dumps(data.get("instruments", [])),
                data.get("duration_sec"),
                data.get("bpm"),
                data.get("key", ""),
                data.get("scale", ""),
                data.get("prompt", ""),
                data.get("audio_url", ""),
                data.get("file_path", ""),
                data.get("task_id"),
                json.dumps(data.get("tags", [])),
                data.get("provider", "local"),
                data.get("lyrics", ""),
                data.get("image_url", ""),
                data.get("suno_id", ""),
                data.get("file_hash", ""),
            ),
        )
        # Same connection, so last_insert_rowid() is the row just inserted.
        row = conn.execute("SELECT * FROM music_library WHERE rowid = last_insert_rowid()").fetchone()
        return _track_row_to_dict(row)
def delete_track(track_id: int) -> bool:
    """Delete a library row by id.

    Returns True when a row existed and was deleted, False otherwise.
    Note: only the DB row is removed — deleting the audio file on disk is
    the caller's responsibility (the old comment implied otherwise, but the
    fetched file_path was never used or returned).
    """
    with _conn() as conn:
        # Existence check mirrors delete_lyrics / delete_video_project.
        row = conn.execute("SELECT id FROM music_library WHERE id = ?", (track_id,)).fetchone()
        if not row:
            return False
        conn.execute("DELETE FROM music_library WHERE id = ?", (track_id,))
        return True
def get_track_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
    """Find the library entry created for a generation task, if any."""
    with _conn() as conn:
        match = conn.execute(
            "SELECT * FROM music_library WHERE task_id = ?", (task_id,)
        ).fetchone()
    return _track_row_to_dict(match) if match else None
def update_track_duration(track_id: int, duration_sec: int) -> None:
    """Backfill a track's duration — only when it is still NULL, so an
    already-recorded duration is never overwritten."""
    with _conn() as conn:
        conn.execute(
            "UPDATE music_library SET duration_sec = ? WHERE id = ? AND duration_sec IS NULL",
            (duration_sec, track_id),
        )
def update_track_file_info(track_id: int, title: str, audio_url: str, file_path: str) -> None:
    """Refresh only file-related fields after a rename; tags and other
    metadata are left untouched."""
    values = (title, audio_url, file_path, track_id)
    with _conn() as conn:
        conn.execute(
            "UPDATE music_library SET title=?, audio_url=?, file_path=? WHERE id=?",
            values,
        )
def update_track_hash(track_id: int, file_hash: str) -> None:
    """Persist a track's file_hash value."""
    with _conn() as conn:
        conn.execute(
            "UPDATE music_library SET file_hash=? WHERE id=?",
            (file_hash, track_id),
        )
def get_track_file_path(track_id: int) -> Optional[str]:
    """Return just the file_path column for a track, or None when missing."""
    with _conn() as conn:
        hit = conn.execute(
            "SELECT file_path FROM music_library WHERE id = ?", (track_id,)
        ).fetchone()
    return hit["file_path"] if hit else None
# ── saved_lyrics CRUD ────────────────────────────────────────────────────────
def _lyrics_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"title": r["title"],
"text": r["text"],
"prompt": r["prompt"],
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def get_all_lyrics() -> List[Dict[str, Any]]:
    """All saved lyric sheets, newest first."""
    with _conn() as conn:
        fetched = conn.execute("SELECT * FROM saved_lyrics ORDER BY created_at DESC").fetchall()
    return [_lyrics_row_to_dict(row) for row in fetched]
def add_lyrics(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a lyric sheet (missing keys default to '') and return the stored row."""
    payload = (data.get("title", ""), data.get("text", ""), data.get("prompt", ""))
    with _conn() as conn:
        conn.execute(
            "INSERT INTO saved_lyrics (title, text, prompt) VALUES (?, ?, ?)",
            payload,
        )
        stored = conn.execute(
            "SELECT * FROM saved_lyrics WHERE rowid = last_insert_rowid()"
        ).fetchone()
    return _lyrics_row_to_dict(stored)
def update_lyrics(lyrics_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Partially update a lyric sheet; updated_at is always refreshed.

    Returns the fresh row, or None when *data* contains no editable field
    or the id does not exist.
    """
    with _conn() as conn:
        editable = [k for k in ("title", "text", "prompt") if k in data]
        if not editable:
            return None
        assignments = [f"{k} = ?" for k in editable]
        assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
        params = [data[k] for k in editable]
        params.append(lyrics_id)
        conn.execute(f"UPDATE saved_lyrics SET {', '.join(assignments)} WHERE id = ?", params)
        refreshed = conn.execute("SELECT * FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone()
        return _lyrics_row_to_dict(refreshed) if refreshed else None
def update_track_cover_images(track_id: int, images: list) -> None:
    """Replace a track's cover_images JSON array."""
    encoded = json.dumps(images)
    with _conn() as conn:
        conn.execute("UPDATE music_library SET cover_images=? WHERE id=?", (encoded, track_id))
def update_track_wav_url(track_id: int, wav_url: str) -> None:
    """Store a track's WAV download URL."""
    with _conn() as conn:
        conn.execute("UPDATE music_library SET wav_url=? WHERE id=?", (wav_url, track_id))
def update_track_video_url(track_id: int, video_url: str) -> None:
    """Store a track's video URL."""
    with _conn() as conn:
        conn.execute("UPDATE music_library SET video_url=? WHERE id=?", (video_url, track_id))
def update_track_stem_urls(track_id: int, stems: dict) -> None:
    """Replace a track's stem_urls JSON object."""
    encoded = json.dumps(stems)
    with _conn() as conn:
        conn.execute("UPDATE music_library SET stem_urls=? WHERE id=?", (encoded, track_id))
def delete_lyrics(lyrics_id: int) -> bool:
    """Delete a lyric sheet; returns False when the id does not exist."""
    with _conn() as conn:
        exists = conn.execute("SELECT id FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone()
        if exists is None:
            return False
        conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,))
        return True
# ── video_projects CRUD ───────────────────────────────────────────────────────
def _vp_row_to_dict(r) -> dict:
return {
"id": r["id"],
"track_id": r["track_id"],
"format": r["format"],
"status": r["status"],
"output_path": r["output_path"],
"output_url": r["output_url"],
"thumbnail_path": r["thumbnail_path"],
"target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [],
"yt_title": r["yt_title"],
"yt_description": r["yt_description"],
"yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [],
"render_params": json.loads(r["render_params"]) if r["render_params"] else {},
"error": r["error"],
"created_at": r["created_at"],
"completed_at": r["completed_at"],
}
def create_video_project(data: dict) -> dict:
    """Insert a video project (status starts at its 'pending' default) and
    return the stored row. List/dict inputs are JSON-encoded."""
    with _conn() as conn:
        conn.execute(
            """INSERT INTO video_projects (track_id, format, target_countries, render_params)
            VALUES (?, ?, ?, ?)""",
            (data.get("track_id"), data.get("format", "visualizer"),
             json.dumps(data.get("target_countries", [])),
             json.dumps(data.get("render_params", {}))),
        )
        row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone()
        return _vp_row_to_dict(row)
def get_video_project(project_id: int) -> Optional[Dict[str, Any]]:
    """One video project by id, or None when absent."""
    with _conn() as conn:
        found = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone()
    return _vp_row_to_dict(found) if found else None
def get_all_video_projects() -> list:
    """Every video project, newest first."""
    with _conn() as conn:
        fetched = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall()
    return [_vp_row_to_dict(row) for row in fetched]
def update_video_project_status(
    project_id: int,
    status: str,
    output_path: str = "",
    output_url: str = "",
    thumbnail_path: str = "",
    yt_title: str = "",
    yt_description: str = "",
    yt_tags: Optional[list] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite a video project's status plus its output/YouTube fields.

    completed_at is stamped only for the terminal states 'done'/'failed';
    any other status resets it to NULL. Fields not passed are overwritten
    with their empty defaults — this is a full-row update, not a patch.
    """
    # Safe f-string interpolation: the expression is one of two internal
    # constants, never caller-supplied text.
    completed_at_expr = (
        "strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL"
    )
    with _conn() as conn:
        conn.execute(
            f"""UPDATE video_projects
            SET status=?, output_path=?, output_url=?, thumbnail_path=?,
                yt_title=?, yt_description=?, yt_tags=?, error=?,
                completed_at={completed_at_expr}
            WHERE id=?""",
            (status, output_path, output_url, thumbnail_path,
             yt_title, yt_description, json.dumps(yt_tags or []), error, project_id),
        )
def delete_video_project(project_id: int) -> bool:
    """Delete a video project row; returns False when absent."""
    with _conn() as conn:
        exists = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone()
        if exists is None:
            return False
        conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,))
        return True
def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]:
    """Library row by primary key, parsed; None when missing."""
    with _conn() as conn:
        found = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone()
    return _track_row_to_dict(found) if found else None
# ── revenue_records CRUD ──────────────────────────────────────────────────────
def _rr_row_to_dict(r) -> dict:
return {
"id": r["id"],
"video_project_id": r["video_project_id"],
"yt_video_id": r["yt_video_id"],
"record_month": r["record_month"],
"views": r["views"],
"watch_hours": r["watch_hours"],
"revenue_usd": r["revenue_usd"],
"rpm_usd": r["rpm_usd"],
"country": r["country"],
"source": r["source"],
"created_at": r["created_at"],
}
def create_revenue_record(data: dict) -> dict:
    """Insert a monthly revenue row and return it.

    rpm_usd is derived, never client-supplied:
    RPM = revenue / views * 1000, rounded to 4 decimals; 0.0 when views == 0.
    NOTE(review): the (yt_video_id, record_month, country) UNIQUE constraint
    will raise sqlite3.IntegrityError on duplicates — callers must handle it.
    """
    views = data.get("views", 0)
    revenue = data.get("revenue_usd", 0.0)
    rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
    with _conn() as conn:
        conn.execute(
            """INSERT INTO revenue_records
            (video_project_id, yt_video_id, record_month, views, watch_hours,
             revenue_usd, rpm_usd, country, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (data.get("video_project_id"), data.get("yt_video_id", ""),
             data.get("record_month", ""), views, data.get("watch_hours", 0.0),
             revenue, rpm, data.get("country", ""), data.get("source", "manual")),
        )
        row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone()
        return _rr_row_to_dict(row)
def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list:
    """List revenue rows, optionally filtered by video id and/or month."""
    filters = []
    params: list = []
    if yt_video_id:
        filters.append(" AND yt_video_id=?")
        params.append(yt_video_id)
    if year_month:
        filters.append(" AND record_month=?")
        params.append(year_month)
    query = (
        "SELECT * FROM revenue_records WHERE 1=1"
        + "".join(filters)
        + " ORDER BY record_month DESC"
    )
    with _conn() as conn:
        rows = conn.execute(query, params).fetchall()
    return [_rr_row_to_dict(row) for row in rows]
def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]:
    """Partially update a revenue row; returns the refreshed row or None.

    Keys absent from *data* keep their current value (merged from the
    existing row). rpm_usd is always recomputed from the effective
    views/revenue, never taken from *data*.
    """
    with _conn() as conn:
        row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
        if not row:
            return None
        cur = _rr_row_to_dict(row)
        views = data.get("views", cur["views"])
        revenue = data.get("revenue_usd", cur["revenue_usd"])
        rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
        conn.execute(
            """UPDATE revenue_records
            SET yt_video_id=?, record_month=?, views=?, watch_hours=?,
                revenue_usd=?, rpm_usd=?, country=?, source=?
            WHERE id=?""",
            (data.get("yt_video_id", cur["yt_video_id"]),
             data.get("record_month", cur["record_month"]),
             views, data.get("watch_hours", cur["watch_hours"]),
             revenue, rpm,
             data.get("country", cur["country"]),
             data.get("source", cur["source"]),
             record_id),
        )
        row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
        return _rr_row_to_dict(row)
def delete_revenue_record(record_id: int) -> bool:
    """Delete a revenue row; returns False when the id does not exist."""
    with _conn() as conn:
        exists = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
        if exists is None:
            return False
        conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,))
        return True
def get_revenue_dashboard() -> dict:
    """Aggregate revenue stats: grand totals, last 12 months, top-10 countries.

    avg_rpm is recomputed from the monthly SUMs (not averaged per-row) so it
    stays consistent with the aggregated revenue/views.
    """
    with _conn() as conn:
        total = conn.execute(
            "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records"
        ).fetchone()
        by_month = conn.execute(
            """SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views,
            CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm FROM revenue_records
            GROUP BY record_month ORDER BY record_month DESC LIMIT 12"""
        ).fetchall()
        by_country = conn.execute(
            """SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views
            FROM revenue_records WHERE country != ''
            GROUP BY country ORDER BY revenue DESC LIMIT 10"""
        ).fetchall()
        return {
            # SUM() yields NULL on an empty table — coalesce to zeros.
            "total_revenue_usd": total["total"] or 0.0,
            "total_views": total["views"] or 0,
            "total_watch_hours": total["hours"] or 0.0,
            "by_month": [dict(r) for r in by_month],
            "by_country": [dict(r) for r in by_country],
        }
# ── market_trends CRUD ────────────────────────────────────────────────────────
def insert_market_trends(trends: list) -> None:
    """Bulk-insert trend rows; missing keys fall back to neutral defaults."""
    payload = [
        (
            entry.get("source", ""),
            entry.get("country", ""),
            entry.get("genre", ""),
            entry.get("keyword", ""),
            entry.get("score", 0.0),
            entry.get("rank"),
            json.dumps(entry.get("metadata", {})),
        )
        for entry in trends
    ]
    with _conn() as conn:
        conn.executemany(
            """INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            payload,
        )
def get_market_trends(
    country: Optional[str] = None, genre: Optional[str] = None, source: Optional[str] = None, days: int = 7
) -> list:
    """Trend rows collected within the last *days* days, newest first.

    Optional equality filters on country/genre/source; capped at 500 rows.
    """
    with _conn() as conn:
        q = "SELECT * FROM market_trends WHERE collected_at >= datetime('now', ?)"
        # SQLite relative-time modifier, e.g. '-7 days'.
        params: list = [f"-{days} days"]
        if country:
            q += " AND country=?"; params.append(country)
        if genre:
            q += " AND genre=?"; params.append(genre)
        if source:
            q += " AND source=?"; params.append(source)
        q += " ORDER BY collected_at DESC LIMIT 500"
        rows = conn.execute(q, params).fetchall()
        return [
            {"id": r["id"], "source": r["source"], "country": r["country"],
             "genre": r["genre"], "keyword": r["keyword"], "score": r["score"],
             "rank": r["rank"], "metadata": json.loads(r["metadata"]),
             "collected_at": r["collected_at"]}
            for r in rows
        ]
# ── trend_reports CRUD ────────────────────────────────────────────────────────
def upsert_trend_report(data: dict) -> None:
    """Insert or replace the report for data['report_date'] (one row per date).

    On conflict all content columns are overwritten; created_at keeps its
    original value. Required keys: report_date, top_genres, top_keywords,
    recommended_styles, insights (list values are JSON-encoded here).
    """
    with _conn() as conn:
        conn.execute(
            """INSERT INTO trend_reports
            (report_date, top_genres, top_keywords, recommended_styles, insights)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(report_date) DO UPDATE SET
                top_genres=excluded.top_genres,
                top_keywords=excluded.top_keywords,
                recommended_styles=excluded.recommended_styles,
                insights=excluded.insights""",
            (data["report_date"], json.dumps(data["top_genres"]),
             json.dumps(data["top_keywords"]), json.dumps(data["recommended_styles"]),
             data["insights"]),
        )
def get_latest_trend_report() -> Optional[Dict[str, Any]]:
    """Most recent trend report with JSON columns decoded; None when empty."""
    with _conn() as conn:
        latest = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1"
        ).fetchone()
    if latest is None:
        return None
    return {
        "id": latest["id"],
        "report_date": latest["report_date"],
        "top_genres": json.loads(latest["top_genres"]),
        "top_keywords": json.loads(latest["top_keywords"]),
        "recommended_styles": json.loads(latest["recommended_styles"]),
        "insights": latest["insights"],
        "created_at": latest["created_at"],
    }
def get_trend_reports(limit: int = 10) -> list:
    """Compact report listing, newest first.

    insights is truncated to 100 chars and top_keywords is omitted from the
    output — use get_latest_trend_report() for a full record. JSON columns
    are null-guarded for legacy rows.
    """
    with _conn() as conn:
        rows = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT ?", (limit,)
        ).fetchall()
        return [
            {
                "id": r["id"],
                "report_date": r["report_date"],
                "top_genres": json.loads(r["top_genres"] or "[]"),
                "recommended_styles": json.loads(r["recommended_styles"] or "[]"),
                "insights": (r["insights"] or "")[:100],
                "created_at": r["created_at"],
            }
            for r in rows
        ]
# ── Compile Jobs ─────────────────────────────────────────
def create_compile_job(title: str, track_ids: list, crossfade_sec: float) -> int:
    """Queue a new compile job and return its row id."""
    with _conn() as conn:
        inserted = conn.execute(
            "INSERT INTO compile_jobs (title, track_ids, crossfade_sec) VALUES (?,?,?)",
            (title, json.dumps(track_ids), crossfade_sec),
        )
        return inserted.lastrowid
def get_compile_jobs() -> list:
    """Summary listing of the 50 most recent compile jobs, newest first.

    Omits output_path/error — use get_compile_job() for the full record.
    """
    with _conn() as conn:
        rows = conn.execute(
            "SELECT id, title, track_ids, crossfade_sec, status, duration_sec, created_at "
            "FROM compile_jobs ORDER BY created_at DESC LIMIT 50"
        ).fetchall()
        return [
            {
                "id": r["id"],
                "title": r["title"],
                "track_ids": json.loads(r["track_ids"]),
                "crossfade_sec": r["crossfade_sec"],
                "status": r["status"],
                "duration_sec": r["duration_sec"],
                "created_at": r["created_at"],
            }
            for r in rows
        ]
def get_compile_job(job_id: int) -> Optional[Dict[str, Any]]:
    """Full detail view of one compile job (includes output_path/error)."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM compile_jobs WHERE id = ?", (job_id,)
        ).fetchone()
    if found is None:
        return None
    return {
        "id": found["id"],
        "title": found["title"],
        "track_ids": json.loads(found["track_ids"]),
        "crossfade_sec": found["crossfade_sec"],
        "status": found["status"],
        "output_path": found["output_path"],
        "duration_sec": found["duration_sec"],
        "error": found["error"],
        "created_at": found["created_at"],
    }
def update_compile_job(job_id: int, **kwargs) -> None:
    """Patch a compile job.

    Only whitelisted columns are applied; anything else in kwargs is dropped
    silently (this guards the f-string SQL below against injection via keys).
    """
    permitted = ("status", "output_path", "duration_sec", "error")
    updates = {k: v for k, v in kwargs.items() if k in permitted}
    if not updates:
        return
    assignments = ", ".join(f"{col} = ?" for col in updates)
    with _conn() as conn:
        conn.execute(
            f"UPDATE compile_jobs SET {assignments} WHERE id = ?",
            (*updates.values(), job_id),
        )
def delete_compile_job(job_id: int) -> None:
    """Remove a compile job row (no-op when absent)."""
    with _conn() as conn:
        conn.execute("DELETE FROM compile_jobs WHERE id = ?", (job_id,))
# ── YouTube pipeline helpers ──────────────────────────────────────────────────
# update_pipeline_state sets state/state_started_at/updated_at automatically;
# any additional column passed as a keyword must appear in this whitelist.
_PIPELINE_STATE_EXTRA_COLS = frozenset({
    "cover_url",
    "video_url",
    "thumbnail_url",
    "metadata_json",
    "review_json",
    "youtube_video_id",
    "cancelled_at",
    "failed_reason",
    "last_telegram_msg_ids",
    "feedback_count_per_step",
})
# Whitelist of columns update_pipeline_job may set.
_PIPELINE_JOB_COLS = frozenset({
    "status",
    "error",
    "duration_ms",
    "started_at",
    "finished_at",
})
def _now() -> str:
return datetime.utcnow().isoformat(timespec="seconds")
def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]:
"""video_pipelines의 sqlite3.Row를 dict로 파싱.
JSON 컬럼을 디코드하고, metadata/review를 호환을 위해 추가로 노출한다.
get_pipeline / list_pipelines가 공유.
"""
d = dict(row)
d["feedback_count_per_step"] = json.loads(d["feedback_count_per_step"] or "{}")
d["last_telegram_msg_ids"] = json.loads(d["last_telegram_msg_ids"] or "{}")
if d.get("metadata_json"):
d["metadata"] = json.loads(d["metadata_json"])
if d.get("review_json"):
d["review"] = json.loads(d["review_json"])
return d
def create_pipeline(track_id: Optional[int] = None, *,
                    compile_job_id: Optional[int] = None,
                    visual_style: str = "essential",
                    background_mode: str = "static",
                    background_keyword: Optional[str] = None) -> int:
    """Insert a new video_pipelines row in state 'created' and return its id.

    Exactly one of track_id / compile_job_id must be supplied (XOR);
    otherwise a ValueError is raised before touching the database.
    """
    has_track = track_id is not None
    has_compile = compile_job_id is not None
    if has_track == has_compile:
        raise ValueError("track_id와 compile_job_id 중 정확히 하나만 지정")
    ts = _now()
    with _conn() as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO video_pipelines
            (track_id, compile_job_id, visual_style, background_mode, background_keyword,
             state, state_started_at, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, 'created', ?, ?, ?)
        """, (track_id, compile_job_id, visual_style, background_mode,
              background_keyword, ts, ts, ts))
        return cur.lastrowid
def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
    """Fetch one pipeline by id, with joined track/compile titles.

    Returns None when the id does not exist. Both JOINs are LEFT because a
    pipeline has either a track_id or a compile_job_id, never both.
    """
    with _conn() as conn:
        found = conn.execute("""
            SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
            FROM video_pipelines vp
            LEFT JOIN music_library ml ON ml.id = vp.track_id
            LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
            WHERE vp.id = ?
        """, (pid,)).fetchone()
    return _parse_pipeline_row(found) if found else None
def update_pipeline_state(pid: int, state: str, **fields) -> None:
    """Set a pipeline's state and optionally extra columns in one UPDATE.

    state, state_started_at and updated_at are always written. Every other
    key must be listed in _PIPELINE_STATE_EXTRA_COLS, else ValueError.
    """
    unknown = set(fields) - _PIPELINE_STATE_EXTRA_COLS
    if unknown:
        raise ValueError(f"unknown columns for update_pipeline_state: {sorted(unknown)}")
    ts = _now()
    assignments = ["state = ?", "state_started_at = ?", "updated_at = ?"]
    params: List[Any] = [state, ts, ts]
    for col, value in fields.items():
        assignments.append(f"{col} = ?")
        params.append(value)
    params.append(pid)
    with _conn() as conn:
        conn.execute(f"UPDATE video_pipelines SET {', '.join(assignments)} WHERE id = ?", params)
def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
    """Return all pipelines (newest first) with joined track/compile titles.

    active_only=True excludes terminal states
    (published / cancelled / failed / awaiting_manual).
    """
    where_clause = (
        " WHERE vp.state NOT IN ('published','cancelled','failed','awaiting_manual')"
        if active_only
        else ""
    )
    query = """
    SELECT vp.*, ml.title AS track_title, cj.title AS compile_title
    FROM video_pipelines vp
    LEFT JOIN music_library ml ON ml.id = vp.track_id
    LEFT JOIN compile_jobs cj ON cj.id = vp.compile_job_id
    """ + where_clause + " ORDER BY vp.created_at DESC"
    with _conn() as conn:
        return [_parse_pipeline_row(r) for r in conn.execute(query).fetchall()]
def increment_feedback_count(pid: int, step: str) -> int:
    """Atomically add 1 to feedback_count_per_step.<step> and return the new value.

    Uses the json1 functions json_set / json_extract (built into SQLite 3.38+)
    so the increment happens inside a single UPDATE statement, removing the
    read-modify-write race of doing it in Python. Returns 0 when the pipeline
    id does not exist.
    """
    now = _now()
    with _conn() as conn:
        # '$.' || ? builds the JSON path for the step key at the SQL level;
        # COALESCE seeds a missing key with 0 before the +1.
        conn.execute(
            """
            UPDATE video_pipelines
            SET feedback_count_per_step = json_set(
                feedback_count_per_step,
                '$.' || ?,
                COALESCE(json_extract(feedback_count_per_step, '$.' || ?), 0) + 1
            ),
            updated_at = ?
            WHERE id = ?
            """,
            (step, step, now, pid),
        )
        # Read back on the same connection/transaction, so this sees the
        # value just written above.
        row = conn.execute(
            "SELECT json_extract(feedback_count_per_step, '$.' || ?) AS c "
            "FROM video_pipelines WHERE id = ?",
            (step, pid),
        ).fetchone()
        # row is None when the id doesn't exist. NOTE(review): if the stored
        # column were NULL (not '{}'), json_set yields NULL and this also
        # returns 0 — presumably rows are seeded with '{}'; confirm at INSERT.
        return int(row["c"]) if row and row["c"] is not None else 0
def record_feedback(pid: int, step: str, feedback_text: str) -> None:
    """Append one feedback entry for a pipeline step, timestamped now."""
    params = (pid, step, feedback_text, _now())
    with _conn() as conn:
        conn.execute("""
            INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
            VALUES (?, ?, ?, ?)
        """, params)
def get_feedback_history(pid: int) -> List[Dict[str, Any]]:
    """Return every feedback row for a pipeline, most recent first."""
    with _conn() as conn:
        fetched = conn.execute("""
            SELECT * FROM pipeline_feedback
            WHERE pipeline_id = ? ORDER BY id DESC
        """, (pid,)).fetchall()
    return list(map(dict, fetched))
def create_pipeline_job(pid: int, step: str) -> int:
    """Insert a pipeline_jobs row in status 'queued' and return its id."""
    params = (pid, step, _now())
    with _conn() as conn:
        cursor = conn.execute("""
            INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
            VALUES (?, ?, 'queued', ?)
        """, params)
        return cursor.lastrowid
def update_pipeline_job(job_id: int, **fields) -> None:
    """Update a pipeline_jobs row; keys outside _PIPELINE_JOB_COLS raise ValueError.

    When status transitions to succeeded/failed and the caller did not pass
    finished_at explicitly, it is stamped automatically. Empty calls are no-ops.
    """
    updates = dict(fields)
    unknown = set(updates) - _PIPELINE_JOB_COLS
    if unknown:
        raise ValueError(f"unknown columns for update_pipeline_job: {sorted(unknown)}")
    if not updates:
        return
    is_terminal = updates.get("status") in ("succeeded", "failed")
    if is_terminal and "finished_at" not in updates:
        updates["finished_at"] = _now()
    assignments = ", ".join(f"{col} = ?" for col in updates)
    params = [*updates.values(), job_id]
    with _conn() as conn:
        conn.execute(f"UPDATE pipeline_jobs SET {assignments} WHERE id = ?", params)
def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]:
    """Return every job row for a pipeline in creation order (oldest first)."""
    with _conn() as conn:
        fetched = conn.execute("""
            SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC
        """, (pid,)).fetchall()
    return [dict(job) for job in fetched]
def get_youtube_setup() -> Dict[str, Any]:
    """Return the singleton youtube_setup row (id=1), seeding defaults if missing.

    Each JSON-encoded column is decoded and exposed under its un-suffixed
    name (e.g. metadata_template_json -> metadata_template) alongside the
    raw column value.
    """
    json_columns = ("metadata_template_json", "cover_prompts_json",
                    "review_weights_json", "visual_defaults_json", "publish_policy_json")
    with _conn() as conn:
        row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
        if row is None:
            _seed_default_youtube_setup(conn)
            row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone()
    setup = dict(row)
    for col in json_columns:
        setup[col[:-5]] = json.loads(setup[col])  # col[:-5] strips "_json"
    return setup
def update_youtube_setup(**kwargs) -> None:
    """Persist setup changes to the singleton youtube_setup row (id=1).

    Accepted keys: metadata_template, cover_prompts, review_weights,
    visual_defaults, publish_policy (JSON-serialized into *_json columns)
    and review_threshold (coerced to int). Unknown keys are ignored; a call
    with nothing recognized is a no-op and never touches the database.
    """
    json_fields = {
        "metadata_template": "metadata_template_json",
        "cover_prompts": "cover_prompts_json",
        "review_weights": "review_weights_json",
        "visual_defaults": "visual_defaults_json",
        "publish_policy": "publish_policy_json",
    }
    assignments: List[str] = []
    params: List[Any] = []
    for key, value in kwargs.items():
        if key in json_fields:
            assignments.append(f"{json_fields[key]} = ?")
            params.append(json.dumps(value))
        elif key == "review_threshold":
            assignments.append("review_threshold = ?")
            params.append(int(value))
    if not assignments:
        return
    assignments.append("updated_at = ?")
    params.append(_now())
    with _conn() as conn:
        conn.execute(f"UPDATE youtube_setup SET {', '.join(assignments)} WHERE id = 1", params)
def upsert_oauth_token(channel_id: str, channel_title: Optional[str],
                       avatar_url: Optional[str], refresh_token: str,
                       access_token: Optional[str], expires_at: Optional[str]) -> None:
    """Replace the stored YouTube OAuth token.

    The table is cleared first, so at most one token row ever exists
    (single-channel design).
    """
    values = (channel_id, channel_title, avatar_url, refresh_token,
              access_token, expires_at, _now())
    with _conn() as conn:
        conn.execute("DELETE FROM youtube_oauth_tokens")
        conn.execute("""
            INSERT INTO youtube_oauth_tokens
            (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, values)
def get_oauth_token() -> Optional[Dict[str, Any]]:
    """Return the most recently stored OAuth token row, or None if absent."""
    with _conn() as conn:
        latest = conn.execute(
            "SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1"
        ).fetchone()
    if latest is None:
        return None
    return dict(latest)
def delete_oauth_token() -> None:
    """Drop all stored OAuth tokens (disconnects the YouTube channel)."""
    with _conn() as conn:
        conn.execute("DELETE FROM youtube_oauth_tokens")