1151 lines
45 KiB
Python
1151 lines
45 KiB
Python
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
|
|
|
|
# Filesystem path of the SQLite database file; _conn() creates its parent directory on demand.
DB_PATH = "/app/data/music.db"
|
|
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a connection to the SQLite file at DB_PATH.

    The parent directory of DB_PATH is created first when it is missing.
    Rows are produced as sqlite3.Row so columns can be read by name.
    """
    parent_dir = os.path.dirname(DB_PATH)
    os.makedirs(parent_dir, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _add_missing_columns(conn: sqlite3.Connection, table: str, columns) -> None:
    """Add every ``(name, default_sql)`` TEXT column that *table* lacks.

    The existing columns are read with ``PRAGMA table_info`` so that only the
    genuinely missing ones are added. Unlike a blanket
    ``except sqlite3.OperationalError: pass``, real failures (locked database,
    I/O error, malformed statement) propagate to the caller instead of being
    mistaken for "column already exists".

    *table*, the column names and the default expressions are interpolated
    into SQL and must therefore be trusted, hard-coded values.
    """
    # Index 1 of each table_info row is the column name.
    existing = {info[1] for info in conn.execute(f"PRAGMA table_info({table})")}
    for col, default in columns:
        if col in existing:
            continue
        conn.execute(
            f"ALTER TABLE {table} ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}"
        )
        existing.add(col)


def init_db() -> None:
    """Create all tables and indexes and apply additive column migrations.

    Every statement uses IF NOT EXISTS (or an explicit column check), so the
    function can be called on every start-up. It also guarantees that the
    single-row ``youtube_setup`` table contains its default row.
    """
    with _conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_tasks (
                id TEXT PRIMARY KEY,
                status TEXT NOT NULL DEFAULT 'queued',
                progress INTEGER NOT NULL DEFAULT 0,
                message TEXT NOT NULL DEFAULT '',
                audio_url TEXT,
                error TEXT,
                params TEXT NOT NULL DEFAULT '{}',
                provider TEXT NOT NULL DEFAULT 'local',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created ON music_tasks(created_at DESC)")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_library (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                genre TEXT NOT NULL DEFAULT '',
                moods TEXT NOT NULL DEFAULT '[]',
                instruments TEXT NOT NULL DEFAULT '[]',
                duration_sec INTEGER,
                bpm INTEGER,
                key TEXT NOT NULL DEFAULT '',
                scale TEXT NOT NULL DEFAULT '',
                prompt TEXT NOT NULL DEFAULT '',
                audio_url TEXT NOT NULL DEFAULT '',
                file_path TEXT NOT NULL DEFAULT '',
                task_id TEXT,
                tags TEXT NOT NULL DEFAULT '[]',
                provider TEXT NOT NULL DEFAULT 'local',
                lyrics TEXT NOT NULL DEFAULT '',
                image_url TEXT NOT NULL DEFAULT '',
                suno_id TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_library_created ON music_library(created_at DESC)")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS saved_lyrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                text TEXT NOT NULL DEFAULT '',
                prompt TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_lyrics_created ON saved_lyrics(created_at DESC)")

        # Migrate pre-existing tables: add columns that older databases lack.
        _add_missing_columns(conn, "music_library", [
            ("provider", "'local'"), ("lyrics", "''"),
            ("image_url", "''"), ("suno_id", "''"),
            ("file_hash", "''"),
        ])
        _add_missing_columns(conn, "music_tasks", [("provider", "'local'")])

        # Phase 1-3: newer music_library columns.
        _add_missing_columns(conn, "music_library", [
            ("cover_images", "'[]'"),
            ("wav_url", "''"),
            ("video_url", "''"),
            ("stem_urls", "'{}'"),
        ])

        # ── video_projects table ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS video_projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id INTEGER,
                format TEXT NOT NULL DEFAULT 'visualizer',
                status TEXT NOT NULL DEFAULT 'pending',
                output_path TEXT NOT NULL DEFAULT '',
                output_url TEXT NOT NULL DEFAULT '',
                thumbnail_path TEXT NOT NULL DEFAULT '',
                target_countries TEXT NOT NULL DEFAULT '[]',
                yt_title TEXT NOT NULL DEFAULT '',
                yt_description TEXT NOT NULL DEFAULT '',
                yt_tags TEXT NOT NULL DEFAULT '[]',
                render_params TEXT NOT NULL DEFAULT '{}',
                error TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                completed_at TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)")

        # ── revenue_records table ─────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS revenue_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_project_id INTEGER,
                yt_video_id TEXT NOT NULL DEFAULT '',
                record_month TEXT NOT NULL DEFAULT '',
                views INTEGER NOT NULL DEFAULT 0,
                watch_hours REAL NOT NULL DEFAULT 0.0,
                revenue_usd REAL NOT NULL DEFAULT 0.0,
                rpm_usd REAL NOT NULL DEFAULT 0.0,
                country TEXT NOT NULL DEFAULT '',
                source TEXT NOT NULL DEFAULT 'manual',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(yt_video_id, record_month, country)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)")

        # ── market_trends table ───────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS market_trends (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL DEFAULT '',
                country TEXT NOT NULL DEFAULT '',
                genre TEXT NOT NULL DEFAULT '',
                keyword TEXT NOT NULL DEFAULT '',
                score REAL NOT NULL DEFAULT 0.0,
                rank INTEGER,
                metadata TEXT NOT NULL DEFAULT '{}',
                collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_mt_country_source "
            "ON market_trends(country, source, collected_at DESC)"
        )

        # ── trend_reports table ───────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trend_reports (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                report_date TEXT UNIQUE NOT NULL DEFAULT '',
                top_genres TEXT NOT NULL DEFAULT '[]',
                top_keywords TEXT NOT NULL DEFAULT '[]',
                recommended_styles TEXT NOT NULL DEFAULT '[]',
                insights TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)

        # ── compile_jobs table ────────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS compile_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL DEFAULT '',
                track_ids TEXT NOT NULL DEFAULT '[]',
                crossfade_sec REAL NOT NULL DEFAULT 3.0,
                status TEXT NOT NULL DEFAULT 'pending',
                output_path TEXT,
                duration_sec REAL,
                error TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)

        # ── YouTube pipeline tables (5) ───────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS video_pipelines (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id INTEGER NOT NULL,
                state TEXT NOT NULL DEFAULT 'created',
                state_started_at TEXT NOT NULL,
                cover_url TEXT,
                video_url TEXT,
                thumbnail_url TEXT,
                metadata_json TEXT,
                review_json TEXT,
                youtube_video_id TEXT,
                feedback_count_per_step TEXT NOT NULL DEFAULT '{}',
                last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}',
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                cancelled_at TEXT,
                failed_reason TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pipeline_id INTEGER NOT NULL,
                step TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'queued',
                error TEXT,
                started_at TEXT,
                finished_at TEXT,
                duration_ms INTEGER,
                FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_feedback (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                pipeline_id INTEGER NOT NULL,
                step TEXT NOT NULL,
                feedback_text TEXT NOT NULL,
                received_at TEXT NOT NULL,
                FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS youtube_oauth_tokens (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                channel_id TEXT NOT NULL,
                channel_title TEXT,
                avatar_url TEXT,
                refresh_token TEXT NOT NULL,
                access_token TEXT,
                expires_at TEXT,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS youtube_setup (
                id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1),
                metadata_template_json TEXT NOT NULL,
                cover_prompts_json TEXT NOT NULL,
                review_weights_json TEXT NOT NULL,
                review_threshold INTEGER NOT NULL DEFAULT 60,
                visual_defaults_json TEXT NOT NULL,
                publish_policy_json TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

        # Guarantee that the single default setup row exists.
        cnt_row = conn.execute("SELECT COUNT(*) AS c FROM youtube_setup").fetchone()
        if cnt_row["c"] == 0:
            _seed_default_youtube_setup(conn)
|
|
|
|
|
|
def _seed_default_youtube_setup(conn: sqlite3.Connection) -> None:
    """Insert the default row into the ``youtube_setup`` table.

    Shared by init_db() and, per the original note, by get_youtube_setup()
    to self-heal when the row has gone missing. The caller owns the
    transaction; nothing is committed here.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the stored text keeps the same naive
    # "YYYY-MM-DDTHH:MM:SS" shape as before.
    updated_at = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds")
    )
    defaults = (
        json.dumps({
            "title": "[{genre}] {title} | {bpm}BPM",
            "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n",
            "tags": ["lofi", "chill", "instrumental"],
            "category_id": 10,
        }),
        json.dumps({
            "lo-fi": "moody anime cityscape at dusk, lofi aesthetic",
            "phonk": "dark drift car aesthetic, neon, phonk vibe",
            "ambient": "ethereal mountain landscape, ambient mood",
            "default": "abstract music album cover art",
        }),
        json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}),
        60,
        json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}),
        json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}),
        updated_at,
    )
    conn.execute("""
        INSERT INTO youtube_setup
        (metadata_template_json, cover_prompts_json, review_weights_json,
        review_threshold, visual_defaults_json, publish_policy_json, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, defaults)
|
|
|
|
|
|
# ── music_tasks CRUD ──────────────────────────────────────────────────────────
|
|
|
|
def _task_row_to_dict(r) -> Dict[str, Any]:
    """Convert a ``music_tasks`` row into the task dict returned by the API.

    The row's ``id`` is exposed as ``task_id``, ``params`` is JSON-decoded,
    and ``provider`` falls back to "local" when the row has no such column.
    """
    provider = r["provider"] if "provider" in r.keys() else "local"
    task: Dict[str, Any] = {"task_id": r["id"]}
    for column in ("status", "progress", "message", "audio_url", "error"):
        task[column] = r[column]
    task["params"] = json.loads(r["params"])
    task["provider"] = provider
    for column in ("created_at", "updated_at"):
        task[column] = r[column]
    return task
|
|
|
|
|
|
def create_task(task_id: str, params: Dict[str, Any], provider: str = "local") -> Dict[str, Any]:
    """Insert a new task row and return it as a dict.

    ``params`` is stored as JSON text; all other columns take their table
    defaults. The freshly inserted row is read back so the defaults are
    included in the result.
    """
    new_row = (task_id, json.dumps(params), provider)
    with _conn() as conn:
        conn.execute(
            "INSERT INTO music_tasks (id, params, provider) VALUES (?, ?, ?)",
            new_row,
        )
        stored = conn.execute(
            "SELECT * FROM music_tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return _task_row_to_dict(stored)
|
|
|
|
|
|
def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str,
    audio_url: str = None,
    error: str = None,
) -> None:
    """Overwrite a task's status fields and refresh ``updated_at``.

    All five fields are written unconditionally, so leaving ``audio_url`` or
    ``error`` at their default stores NULL for that column.
    """
    sql = """
        UPDATE music_tasks
        SET status = ?, progress = ?, message = ?, audio_url = ?, error = ?,
        updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
        WHERE id = ?
    """
    bindings = (status, progress, message, audio_url, error, task_id)
    with _conn() as conn:
        conn.execute(sql, bindings)
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the task with the given id as a dict, or None if it does not exist."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM music_tasks WHERE id = ?", (task_id,)
        ).fetchone()
        if not found:
            return None
        return _task_row_to_dict(found)
|
|
|
|
|
|
# ── music_library CRUD ────────────────────────────────────────────────────────
|
|
|
|
def _track_row_to_dict(r) -> Dict[str, Any]:
    """Convert a ``music_library`` row into a track dict.

    JSON text columns are decoded (an empty value becomes the empty
    list/dict), and columns that were added by later migrations fall back to
    a default when the row does not carry them.
    """
    present = r.keys()

    def decoded(column, fallback):
        # Column is always present; an empty value decodes to the fallback.
        raw = r[column]
        return json.loads(raw) if raw else fallback

    def optional(column, fallback):
        # Column may be absent from the row.
        return r[column] if column in present else fallback

    def optional_decoded(column, fallback):
        if column in present and r[column]:
            return json.loads(r[column])
        return fallback

    track: Dict[str, Any] = {
        "id": r["id"],
        "title": r["title"],
        "genre": r["genre"],
        "moods": decoded("moods", []),
        "instruments": decoded("instruments", []),
    }
    for column in (
        "duration_sec", "bpm", "key", "scale", "prompt",
        "audio_url", "file_path", "task_id",
    ):
        track[column] = r[column]
    track["tags"] = decoded("tags", [])
    track["provider"] = optional("provider", "local")
    for column in ("lyrics", "image_url", "suno_id", "file_hash"):
        track[column] = optional(column, "")
    track["cover_images"] = optional_decoded("cover_images", [])
    track["wav_url"] = optional("wav_url", "")
    track["video_url"] = optional("video_url", "")
    track["stem_urls"] = optional_decoded("stem_urls", {})
    track["created_at"] = r["created_at"]
    return track
|
|
|
|
|
|
def get_all_tracks() -> List[Dict[str, Any]]:
    """Return every library track as a dict, newest first."""
    query = "SELECT * FROM music_library ORDER BY created_at DESC"
    with _conn() as conn:
        tracks = []
        for row in conn.execute(query).fetchall():
            tracks.append(_track_row_to_dict(row))
        return tracks
|
|
|
|
|
|
def add_track(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a track into ``music_library`` and return the stored row.

    Missing keys in ``data`` fall back to an empty string, an empty JSON
    list, "local" (provider) or NULL (duration_sec, bpm, task_id). List
    fields are stored as JSON text.
    """
    def text(name: str, fallback: str = "") -> Any:
        return data.get(name, fallback)

    def json_list(name: str) -> str:
        return json.dumps(data.get(name, []))

    values = (
        text("title"),
        text("genre"),
        json_list("moods"),
        json_list("instruments"),
        data.get("duration_sec"),
        data.get("bpm"),
        text("key"),
        text("scale"),
        text("prompt"),
        text("audio_url"),
        text("file_path"),
        data.get("task_id"),
        json_list("tags"),
        text("provider", "local"),
        text("lyrics"),
        text("image_url"),
        text("suno_id"),
        text("file_hash"),
    )
    insert_sql = """
        INSERT INTO music_library
        (title, genre, moods, instruments, duration_sec, bpm, key, scale,
        prompt, audio_url, file_path, task_id, tags,
        provider, lyrics, image_url, suno_id, file_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    with _conn() as conn:
        conn.execute(insert_sql, values)
        # Same connection, so last_insert_rowid() refers to the row just added.
        stored = conn.execute(
            "SELECT * FROM music_library WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _track_row_to_dict(stored)
|
|
|
|
|
|
def delete_track(track_id: int) -> bool:
    """Delete a library track; return False when no such track exists.

    Only the database row is removed here. The file path is looked up before
    the delete, but the file itself is not touched by this function.
    """
    with _conn() as conn:
        existing = conn.execute(
            "SELECT file_path FROM music_library WHERE id = ?", (track_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM music_library WHERE id = ?", (track_id,))
            return True
        return False
|
|
|
|
|
|
def get_track_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the first library track linked to ``task_id``, or None."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM music_library WHERE task_id = ?", (task_id,)
        ).fetchone()
        if not found:
            return None
        return _track_row_to_dict(found)
|
|
|
|
|
|
def update_track_duration(track_id: int, duration_sec: int) -> None:
    """Set a track's duration, but only if none has been recorded yet.

    The ``duration_sec IS NULL`` guard means an existing duration is never
    overwritten.
    """
    sql = "UPDATE music_library SET duration_sec = ? WHERE id = ? AND duration_sec IS NULL"
    with _conn() as conn:
        conn.execute(sql, (duration_sec, track_id))
|
|
|
|
|
|
def update_track_file_info(track_id: int, title: str, audio_url: str, file_path: str) -> None:
    """Update only the file-related fields of a track (used on file rename).

    Title, audio URL and file path are rewritten; tags and all other
    metadata columns are left as they are.
    """
    sql = "UPDATE music_library SET title=?, audio_url=?, file_path=? WHERE id=?"
    bindings = (title, audio_url, file_path, track_id)
    with _conn() as conn:
        conn.execute(sql, bindings)
|
|
|
|
|
|
def update_track_hash(track_id: int, file_hash: str) -> None:
    """Store ``file_hash`` on the given track."""
    sql = "UPDATE music_library SET file_hash=? WHERE id=?"
    with _conn() as conn:
        conn.execute(sql, (file_hash, track_id))
|
|
|
|
|
|
def get_track_file_path(track_id: int) -> Optional[str]:
    """Return the stored ``file_path`` of a track, or None if the track is unknown."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT file_path FROM music_library WHERE id = ?", (track_id,)
        ).fetchone()
        if not found:
            return None
        return found["file_path"]
|
|
|
|
|
|
# ── saved_lyrics CRUD ────────────────────────────────────────────────────────
|
|
|
|
def _lyrics_row_to_dict(r) -> Dict[str, Any]:
    """Copy the ``saved_lyrics`` columns of a row into a plain dict."""
    columns = ("id", "title", "text", "prompt", "created_at", "updated_at")
    return {column: r[column] for column in columns}
|
|
|
|
|
|
def get_all_lyrics() -> List[Dict[str, Any]]:
    """Return every saved lyrics entry as a dict, newest first."""
    query = "SELECT * FROM saved_lyrics ORDER BY created_at DESC"
    with _conn() as conn:
        entries = []
        for row in conn.execute(query).fetchall():
            entries.append(_lyrics_row_to_dict(row))
        return entries
|
|
|
|
|
|
def add_lyrics(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a lyrics entry and return the stored row as a dict.

    ``title``, ``text`` and ``prompt`` default to an empty string when they
    are missing from ``data``.
    """
    values = tuple(data.get(field, "") for field in ("title", "text", "prompt"))
    with _conn() as conn:
        conn.execute(
            "INSERT INTO saved_lyrics (title, text, prompt) VALUES (?, ?, ?)",
            values,
        )
        # Same connection, so last_insert_rowid() refers to the row just added.
        stored = conn.execute(
            "SELECT * FROM saved_lyrics WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _lyrics_row_to_dict(stored)
|
|
|
|
|
|
def update_lyrics(lyrics_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update the given fields of a lyrics entry and return the new row.

    Only ``title``, ``text`` and ``prompt`` are honoured. Returns None when
    ``data`` contains none of them, or when no row has ``lyrics_id``.
    """
    editable = ("title", "text", "prompt")
    with _conn() as conn:
        changed = [name for name in editable if name in data]
        if not changed:
            return None
        assignments = [f"{name} = ?" for name in changed]
        assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
        bindings = [data[name] for name in changed]
        bindings.append(lyrics_id)
        conn.execute(
            f"UPDATE saved_lyrics SET {', '.join(assignments)} WHERE id = ?",
            bindings,
        )
        updated = conn.execute(
            "SELECT * FROM saved_lyrics WHERE id = ?", (lyrics_id,)
        ).fetchone()
        if not updated:
            return None
        return _lyrics_row_to_dict(updated)
|
|
|
|
|
|
def update_track_cover_images(track_id: int, images: list) -> None:
    """Store ``images`` as JSON text in the track's ``cover_images`` column."""
    encoded = json.dumps(images)
    with _conn() as conn:
        conn.execute(
            "UPDATE music_library SET cover_images=? WHERE id=?",
            (encoded, track_id),
        )
|
|
|
|
|
|
def update_track_wav_url(track_id: int, wav_url: str) -> None:
    """Set the ``wav_url`` column of the given track."""
    bindings = (wav_url, track_id)
    with _conn() as conn:
        conn.execute("UPDATE music_library SET wav_url=? WHERE id=?", bindings)
|
|
|
|
|
|
def update_track_video_url(track_id: int, video_url: str) -> None:
    """Set the ``video_url`` column of the given track."""
    bindings = (video_url, track_id)
    with _conn() as conn:
        conn.execute("UPDATE music_library SET video_url=? WHERE id=?", bindings)
|
|
|
|
|
|
def update_track_stem_urls(track_id: int, stems: dict) -> None:
    """Store ``stems`` as JSON text in the track's ``stem_urls`` column."""
    encoded = json.dumps(stems)
    with _conn() as conn:
        conn.execute(
            "UPDATE music_library SET stem_urls=? WHERE id=?",
            (encoded, track_id),
        )
|
|
|
|
|
|
def delete_lyrics(lyrics_id: int) -> bool:
    """Delete a saved lyrics entry; return False when it does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM saved_lyrics WHERE id = ?", (lyrics_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,))
            return True
        return False
|
|
|
|
|
|
# ── video_projects CRUD ───────────────────────────────────────────────────────
|
|
|
|
def _vp_row_to_dict(r) -> dict:
    """Convert a ``video_projects`` row into a dict.

    ``target_countries``, ``yt_tags`` and ``render_params`` are JSON-decoded;
    an empty value becomes an empty list/dict respectively.
    """
    def decoded(column, fallback):
        raw = r[column]
        return json.loads(raw) if raw else fallback

    project = {}
    for column in ("id", "track_id", "format", "status",
                   "output_path", "output_url", "thumbnail_path"):
        project[column] = r[column]
    project["target_countries"] = decoded("target_countries", [])
    project["yt_title"] = r["yt_title"]
    project["yt_description"] = r["yt_description"]
    project["yt_tags"] = decoded("yt_tags", [])
    project["render_params"] = decoded("render_params", {})
    for column in ("error", "created_at", "completed_at"):
        project[column] = r[column]
    return project
|
|
|
|
|
|
def create_video_project(data: dict) -> dict:
    """Insert a video project and return the stored row as a dict.

    ``format`` defaults to "visualizer"; ``target_countries`` and
    ``render_params`` are stored as JSON text. Remaining columns take their
    table defaults.
    """
    track_id = data.get("track_id")
    video_format = data.get("format", "visualizer")
    countries_json = json.dumps(data.get("target_countries", []))
    render_json = json.dumps(data.get("render_params", {}))
    with _conn() as conn:
        conn.execute(
            """INSERT INTO video_projects (track_id, format, target_countries, render_params)
            VALUES (?, ?, ?, ?)""",
            (track_id, video_format, countries_json, render_json),
        )
        # Same connection, so last_insert_rowid() refers to the row just added.
        stored = conn.execute(
            "SELECT * FROM video_projects WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _vp_row_to_dict(stored)
|
|
|
|
|
|
def get_video_project(project_id: int) -> Optional[Dict[str, Any]]:
    """Return the video project with the given id, or None if it does not exist."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM video_projects WHERE id = ?", (project_id,)
        ).fetchone()
        if not found:
            return None
        return _vp_row_to_dict(found)
|
|
|
|
|
|
def get_all_video_projects() -> list:
    """Return every video project as a dict, newest first."""
    query = "SELECT * FROM video_projects ORDER BY created_at DESC"
    with _conn() as conn:
        projects = []
        for row in conn.execute(query).fetchall():
            projects.append(_vp_row_to_dict(row))
        return projects
|
|
|
|
|
|
def update_video_project_status(
    project_id: int,
    status: str,
    output_path: str = "",
    output_url: str = "",
    thumbnail_path: str = "",
    yt_title: str = "",
    yt_description: str = "",
    yt_tags: list = None,
    error: str = None,
) -> None:
    """Overwrite the status and output fields of a video project.

    Every listed column is written on each call, so arguments left at their
    defaults store an empty string, an empty JSON list, or NULL.
    ``completed_at`` is stamped with the current time when ``status`` is
    "done" or "failed" and reset to NULL for any other status.
    """
    # The expression is one of two fixed SQL fragments, never caller input.
    if status in ("done", "failed"):
        completed_at_expr = "strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    else:
        completed_at_expr = "NULL"
    tags_json = json.dumps(yt_tags or [])
    bindings = (
        status, output_path, output_url, thumbnail_path,
        yt_title, yt_description, tags_json, error, project_id,
    )
    sql = f"""UPDATE video_projects
        SET status=?, output_path=?, output_url=?, thumbnail_path=?,
        yt_title=?, yt_description=?, yt_tags=?, error=?,
        completed_at={completed_at_expr}
        WHERE id=?"""
    with _conn() as conn:
        conn.execute(sql, bindings)
|
|
|
|
|
|
def delete_video_project(project_id: int) -> bool:
    """Delete a video project; return False when it does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM video_projects WHERE id = ?", (project_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,))
            return True
        return False
|
|
|
|
|
|
def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]:
    """Return the library track with the given id, or None if it does not exist."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM music_library WHERE id = ?", (track_id,)
        ).fetchone()
        if not found:
            return None
        return _track_row_to_dict(found)
|
|
|
|
|
|
# ── revenue_records CRUD ──────────────────────────────────────────────────────
|
|
|
|
def _rr_row_to_dict(r) -> dict:
    """Copy the ``revenue_records`` columns of a row into a plain dict."""
    columns = (
        "id", "video_project_id", "yt_video_id", "record_month",
        "views", "watch_hours", "revenue_usd", "rpm_usd",
        "country", "source", "created_at",
    )
    return {column: r[column] for column in columns}
|
|
|
|
|
|
def create_revenue_record(data: dict) -> dict:
    """Insert a revenue record and return the stored row as a dict.

    ``rpm_usd`` is derived here as revenue per 1000 views, rounded to four
    decimals, and is 0.0 when there are no views. The table's UNIQUE
    constraint on (yt_video_id, record_month, country) applies.
    """
    views = data.get("views", 0)
    revenue = data.get("revenue_usd", 0.0)
    if views > 0:
        rpm = round(revenue / views * 1000, 4)
    else:
        rpm = 0.0
    values = (
        data.get("video_project_id"),
        data.get("yt_video_id", ""),
        data.get("record_month", ""),
        views,
        data.get("watch_hours", 0.0),
        revenue,
        rpm,
        data.get("country", ""),
        data.get("source", "manual"),
    )
    with _conn() as conn:
        conn.execute(
            """INSERT INTO revenue_records
            (video_project_id, yt_video_id, record_month, views, watch_hours,
            revenue_usd, rpm_usd, country, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            values,
        )
        # Same connection, so last_insert_rowid() refers to the row just added.
        stored = conn.execute(
            "SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _rr_row_to_dict(stored)
|
|
|
|
|
|
def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list:
    """Return revenue records, newest month first, optionally filtered.

    A falsy ``yt_video_id`` or ``year_month`` (None or "") disables that
    filter.
    """
    with _conn() as conn:
        filters = []
        params: list = []
        if yt_video_id:
            filters.append(" AND yt_video_id=?")
            params.append(yt_video_id)
        if year_month:
            filters.append(" AND record_month=?")
            params.append(year_month)
        # "WHERE 1=1" lets every optional filter be appended as " AND ...".
        query = (
            "SELECT * FROM revenue_records WHERE 1=1"
            + "".join(filters)
            + " ORDER BY record_month DESC"
        )
        records = []
        for row in conn.execute(query, params).fetchall():
            records.append(_rr_row_to_dict(row))
        return records
|
|
|
|
|
|
def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]:
    """Merge ``data`` into an existing revenue record and return the new row.

    Keys missing from ``data`` keep their stored values. ``rpm_usd`` is
    always recomputed from the resulting views and revenue (0.0 when views
    is not positive). Returns None when the record does not exist.
    """
    with _conn() as conn:
        existing = conn.execute(
            "SELECT * FROM revenue_records WHERE id = ?", (record_id,)
        ).fetchone()
        if not existing:
            return None
        cur = _rr_row_to_dict(existing)

        def merged(field):
            # Value from the caller if supplied, else the stored one.
            return data.get(field, cur[field])

        views = merged("views")
        revenue = merged("revenue_usd")
        if views > 0:
            rpm = round(revenue / views * 1000, 4)
        else:
            rpm = 0.0
        bindings = (
            merged("yt_video_id"),
            merged("record_month"),
            views,
            merged("watch_hours"),
            revenue,
            rpm,
            merged("country"),
            merged("source"),
            record_id,
        )
        conn.execute(
            """UPDATE revenue_records
            SET yt_video_id=?, record_month=?, views=?, watch_hours=?,
            revenue_usd=?, rpm_usd=?, country=?, source=?
            WHERE id=?""",
            bindings,
        )
        refreshed = conn.execute(
            "SELECT * FROM revenue_records WHERE id = ?", (record_id,)
        ).fetchone()
        return _rr_row_to_dict(refreshed)
|
|
|
|
|
|
def delete_revenue_record(record_id: int) -> bool:
    """Delete a revenue record; return False when it does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM revenue_records WHERE id = ?", (record_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,))
            return True
        return False
|
|
|
|
|
|
def get_revenue_dashboard() -> dict:
    """Aggregate revenue records for the dashboard.

    Returns overall totals, the 12 most recent months (revenue, views and
    revenue per 1000 views) and the 10 highest-earning countries. Records
    with an empty country are excluded from the country breakdown. Totals
    are 0 when the table is empty.
    """
    totals_sql = (
        "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records"
    )
    monthly_sql = """SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views,
        CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm FROM revenue_records
        GROUP BY record_month ORDER BY record_month DESC LIMIT 12"""
    country_sql = """SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views
        FROM revenue_records WHERE country != ''
        GROUP BY country ORDER BY revenue DESC LIMIT 10"""
    with _conn() as conn:
        total = conn.execute(totals_sql).fetchone()
        by_month = conn.execute(monthly_sql).fetchall()
        by_country = conn.execute(country_sql).fetchall()
        dashboard = {
            # SUM() over zero rows is NULL, hence the "or" fallbacks.
            "total_revenue_usd": total["total"] or 0.0,
            "total_views": total["views"] or 0,
            "total_watch_hours": total["hours"] or 0.0,
            "by_month": list(map(dict, by_month)),
            "by_country": list(map(dict, by_country)),
        }
        return dashboard
|
|
|
|
|
|
# ── market_trends CRUD ────────────────────────────────────────────────────────
|
|
|
|
def insert_market_trends(trends: list) -> None:
    """Bulk-insert trend dicts into ``market_trends``.

    Missing keys fall back to an empty string, 0.0 (score), NULL (rank) or
    an empty JSON object (metadata). ``collected_at`` takes its table default.
    """
    def as_row(trend):
        return (
            trend.get("source", ""),
            trend.get("country", ""),
            trend.get("genre", ""),
            trend.get("keyword", ""),
            trend.get("score", 0.0),
            trend.get("rank"),
            json.dumps(trend.get("metadata", {})),
        )

    with _conn() as conn:
        rows = list(map(as_row, trends))
        conn.executemany(
            """INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            rows,
        )
|
|
|
|
|
|
def get_market_trends(
    country: str = None, genre: str = None, source: str = None, days: int = 7
) -> list:
    """Return up to 500 trend rows collected within the last ``days`` days.

    Args:
        country: Only rows for this country; a falsy value disables the filter.
        genre: Only rows for this genre; a falsy value disables the filter.
        source: Only rows from this source; a falsy value disables the filter.
        days: Size of the look-back window in days.

    Returns:
        A list of dicts, newest first, with ``metadata`` JSON-decoded.
    """
    with _conn() as conn:
        # collected_at is stored as 'YYYY-MM-DDTHH:MM:SS.sssZ' and compared as
        # text, so the cutoff must use the same format. datetime('now', ?)
        # yields 'YYYY-MM-DD HH:MM:SS'; because 'T' sorts after ' ', every row
        # from the cutoff day used to match regardless of its time of day.
        q = (
            "SELECT * FROM market_trends "
            "WHERE collected_at >= strftime('%Y-%m-%dT%H:%M:%fZ', 'now', ?)"
        )
        params: list = [f"-{days} days"]
        for column, value in (("country", country), ("genre", genre), ("source", source)):
            if value:
                # column comes from the fixed tuple above, never from the caller.
                q += f" AND {column}=?"
                params.append(value)
        q += " ORDER BY collected_at DESC LIMIT 500"
        rows = conn.execute(q, params).fetchall()
        return [
            {
                "id": r["id"],
                "source": r["source"],
                "country": r["country"],
                "genre": r["genre"],
                "keyword": r["keyword"],
                "score": r["score"],
                "rank": r["rank"],
                "metadata": json.loads(r["metadata"]),
                "collected_at": r["collected_at"],
            }
            for r in rows
        ]
|
|
|
|
|
|
# ── trend_reports CRUD ────────────────────────────────────────────────────────
|
|
|
|
def upsert_trend_report(data: dict) -> None:
    """Insert the report for ``data["report_date"]`` or replace its content.

    ``data`` must contain ``report_date``, ``top_genres``, ``top_keywords``,
    ``recommended_styles`` and ``insights``; a missing key raises KeyError.
    The three list fields are stored as JSON text. On a date conflict the
    existing row keeps its id and ``created_at``.
    """
    values = (
        data["report_date"],
        json.dumps(data["top_genres"]),
        json.dumps(data["top_keywords"]),
        json.dumps(data["recommended_styles"]),
        data["insights"],
    )
    sql = """INSERT INTO trend_reports
        (report_date, top_genres, top_keywords, recommended_styles, insights)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(report_date) DO UPDATE SET
        top_genres=excluded.top_genres,
        top_keywords=excluded.top_keywords,
        recommended_styles=excluded.recommended_styles,
        insights=excluded.insights"""
    with _conn() as conn:
        conn.execute(sql, values)
|
|
|
|
|
|
def get_latest_trend_report() -> Optional[Dict[str, Any]]:
    """Return the report with the greatest ``report_date``, or None if there is none.

    The three list columns are JSON-decoded; ``insights`` is returned in full.
    """
    with _conn() as conn:
        latest = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1"
        ).fetchone()
        if not latest:
            return None
        report: Dict[str, Any] = {
            "id": latest["id"],
            "report_date": latest["report_date"],
        }
        for column in ("top_genres", "top_keywords", "recommended_styles"):
            report[column] = json.loads(latest[column])
        report["insights"] = latest["insights"]
        report["created_at"] = latest["created_at"]
        return report
|
|
|
|
|
|
def get_trend_reports(limit: int = 10) -> list:
    """Return summaries of the most recent trend reports.

    Each summary carries the decoded ``top_genres`` and
    ``recommended_styles`` plus the first 100 characters of ``insights``;
    ``top_keywords`` is not included.
    """
    def summarize(r):
        return {
            "id": r["id"],
            "report_date": r["report_date"],
            "top_genres": json.loads(r["top_genres"] or "[]"),
            "recommended_styles": json.loads(r["recommended_styles"] or "[]"),
            "insights": (r["insights"] or "")[:100],
            "created_at": r["created_at"],
        }

    with _conn() as conn:
        rows = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT ?", (limit,)
        ).fetchall()
        return list(map(summarize, rows))
|
|
|
|
|
|
# ── Compile Jobs ─────────────────────────────────────────
|
|
|
|
def create_compile_job(title: str, track_ids: list, crossfade_sec: float) -> int:
    """Insert a compile job and return its new id.

    ``track_ids`` is stored as JSON text; status and the remaining columns
    take their table defaults.
    """
    values = (title, json.dumps(track_ids), crossfade_sec)
    with _conn() as conn:
        cursor = conn.execute(
            "INSERT INTO compile_jobs (title, track_ids, crossfade_sec) VALUES (?,?,?)",
            values,
        )
        job_id = cursor.lastrowid
        return job_id
|
|
|
|
|
|
def get_compile_jobs() -> list:
    """Return the 50 most recent compile jobs, newest first.

    This is the list view: ``output_path`` and ``error`` are not selected.
    """
    def to_dict(r):
        job = {"id": r["id"], "title": r["title"]}
        job["track_ids"] = json.loads(r["track_ids"])
        for column in ("crossfade_sec", "status", "duration_sec", "created_at"):
            job[column] = r[column]
        return job

    with _conn() as conn:
        rows = conn.execute(
            "SELECT id, title, track_ids, crossfade_sec, status, duration_sec, created_at "
            "FROM compile_jobs ORDER BY created_at DESC LIMIT 50"
        ).fetchall()
        return [to_dict(r) for r in rows]
|
|
|
|
|
|
def get_compile_job(job_id: int) -> Optional[Dict[str, Any]]:
    """Return one compile job with all its columns, or None if it does not exist."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM compile_jobs WHERE id = ?", (job_id,)
        ).fetchone()
        if not found:
            return None
        job: Dict[str, Any] = {"id": found["id"], "title": found["title"]}
        job["track_ids"] = json.loads(found["track_ids"])
        for column in ("crossfade_sec", "status", "output_path",
                       "duration_sec", "error", "created_at"):
            job[column] = found[column]
        return job
|
|
|
|
|
|
def update_compile_job(job_id: int, **kwargs) -> None:
    """Update selected columns of a compile job.

    Only ``status``, ``output_path``, ``duration_sec`` and ``error`` are
    honoured; other keyword arguments are silently ignored. Nothing is
    executed when no honoured column is given.
    """
    permitted = {"status", "output_path", "duration_sec", "error"}
    assignments = []
    bindings = []
    for column, value in kwargs.items():
        if column in permitted:
            assignments.append(f"{column} = ?")
            bindings.append(value)
    if not assignments:
        return
    bindings.append(job_id)
    with _conn() as conn:
        conn.execute(
            f"UPDATE compile_jobs SET {', '.join(assignments)} WHERE id = ?",
            tuple(bindings),
        )
|
|
|
|
|
|
def delete_compile_job(job_id: int) -> None:
    """Delete the compile job with the given id; a missing id is not an error."""
    sql = "DELETE FROM compile_jobs WHERE id = ?"
    with _conn() as conn:
        conn.execute(sql, (job_id,))
|
|
|
|
|
|
# ── YouTube pipeline helpers ──────────────────────────────────────────────────
|
|
|
|
# update_pipeline_state: state/state_started_at/updated_at are set automatically;
# this is the whitelist of additional columns a caller may update.
_PIPELINE_STATE_EXTRA_COLS = frozenset((
    "cover_url",
    "video_url",
    "thumbnail_url",
    "metadata_json",
    "review_json",
    "youtube_video_id",
    "cancelled_at",
    "failed_reason",
    "last_telegram_msg_ids",
    "feedback_count_per_step",
))
|
|
|
|
# Whitelist of columns that update_pipeline_job may write.
_PIPELINE_JOB_COLS = frozenset((
    "status",
    "error",
    "duration_ms",
    "started_at",
    "finished_at",
))
|
|
|
|
|
|
def _now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS`` (no offset suffix).

    datetime.utcnow() is deprecated since Python 3.12, so an aware UTC "now"
    is taken and its tzinfo dropped; the resulting text is identical to what
    utcnow() produced, keeping stored timestamps comparable.
    """
    current = datetime.now(timezone.utc).replace(tzinfo=None)
    return current.isoformat(timespec="seconds")
|
|
|
|
|
|
def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a ``video_pipelines`` row into a plain dict.

    ``feedback_count_per_step`` and ``last_telegram_msg_ids`` are replaced by
    their decoded JSON values; a NULL or empty column decodes as ``{}``.
    For compatibility, ``metadata`` and ``review`` are added alongside the raw
    ``metadata_json`` / ``review_json`` text, but only when that text is
    non-empty. Shared by get_pipeline and list_pipelines.
    """
    parsed = dict(row)
    for column in ("feedback_count_per_step", "last_telegram_msg_ids"):
        parsed[column] = json.loads(parsed[column] or "{}")
    decoded_aliases = (("metadata_json", "metadata"), ("review_json", "review"))
    for raw_key, alias in decoded_aliases:
        raw_text = parsed.get(raw_key)
        if raw_text:
            parsed[alias] = json.loads(raw_text)
    return parsed
|
|
|
|
|
|
def create_pipeline(track_id: int) -> int:
    """Insert a new ``video_pipelines`` row for *track_id* in state 'created'.

    ``state_started_at``, ``created_at`` and ``updated_at`` all receive the
    same ``_now()`` timestamp.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    insert_sql = """
        INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at)
        VALUES (?, 'created', ?, ?, ?)
    """
    with _conn() as conn:
        stamp = _now()
        cursor = conn.execute(insert_sql, (track_id, stamp, stamp, stamp))
        return cursor.lastrowid
|
|
|
|
|
|
def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
    """Load one pipeline together with the title of its library track.

    The row is LEFT JOINed to ``music_library`` so ``track_title`` is present
    in the result even when no matching track exists.

    Returns:
        The row as parsed by ``_parse_pipeline_row``, or ``None`` when no
        pipeline has id *pid*.
    """
    query = """
        SELECT vp.*, ml.title AS track_title
        FROM video_pipelines vp
        LEFT JOIN music_library ml ON ml.id = vp.track_id
        WHERE vp.id = ?
    """
    with _conn() as conn:
        found = conn.execute(query, (pid,)).fetchone()
    return _parse_pipeline_row(found) if found else None
|
|
|
|
|
|
def update_pipeline_state(pid: int, state: str, **fields) -> None:
    """Set a pipeline's state and, optionally, extra columns in one UPDATE.

    ``state``, ``state_started_at`` and ``updated_at`` are always written; the
    two timestamps receive the current ``_now()`` value. Extra keyword
    arguments are written unchanged to the columns of the same name.

    Args:
        pid: ``video_pipelines.id`` of the row to update.
        state: New value for the ``state`` column.
        **fields: Additional column / value pairs.

    Raises:
        ValueError: If a keyword is not listed in
            ``_PIPELINE_STATE_EXTRA_COLS``.
    """
    rejected = sorted(name for name in fields if name not in _PIPELINE_STATE_EXTRA_COLS)
    if rejected:
        raise ValueError(f"unknown columns for update_pipeline_state: {rejected}")

    stamp = _now()
    assignments = ["state = ?", "state_started_at = ?", "updated_at = ?"]
    assignments.extend(f"{column} = ?" for column in fields)
    params: List[Any] = [state, stamp, stamp, *fields.values(), pid]
    statement = f"UPDATE video_pipelines SET {', '.join(assignments)} WHERE id = ?"
    with _conn() as conn:
        conn.execute(statement, params)
|
|
|
|
|
|
def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]:
    """List pipelines, newest ``created_at`` first, with their track titles.

    Args:
        active_only: When true, rows whose state is 'published', 'cancelled',
            'failed' or 'awaiting_manual' are left out.

    Returns:
        One dict per row, each produced by ``_parse_pipeline_row``.
    """
    pieces = ["""
        SELECT vp.*, ml.title AS track_title
        FROM video_pipelines vp
        LEFT JOIN music_library ml ON ml.id = vp.track_id
    """]
    if active_only:
        pieces.append(" WHERE vp.state NOT IN ('published','cancelled','failed','awaiting_manual')")
    pieces.append(" ORDER BY vp.created_at DESC")
    with _conn() as conn:
        fetched = conn.execute("".join(pieces)).fetchall()
    return list(map(_parse_pipeline_row, fetched))
|
|
|
|
|
|
def increment_feedback_count(pid: int, step: str) -> int:
    """Atomically add 1 to ``feedback_count_per_step.<step>`` and return it.

    The increment happens inside a single UPDATE that uses SQLite's JSON
    functions (json_set / json_extract, built in since SQLite 3.38), which
    removes the read-modify-write race a Python-side update would have.
    ``updated_at`` is refreshed by the same statement.

    A NULL or empty ``feedback_count_per_step`` is treated as ``{}``:
    json_set() on NULL yields NULL and on '' fails with "malformed JSON", so
    without that guard the counter could never start.

    Args:
        pid: ``video_pipelines.id`` of the pipeline to update.
        step: Key inside the JSON object; it is appended to ``'$.'`` to form
            the JSON path.

    Returns:
        The counter value after the increment, or 0 when no value could be
        read back (for example, no row has id *pid*).
    """
    stamp = _now()
    with _conn() as conn:
        conn.execute(
            """
            UPDATE video_pipelines
            SET feedback_count_per_step = json_set(
                    COALESCE(NULLIF(feedback_count_per_step, ''), '{}'),
                    '$.' || ?,
                    COALESCE(
                        json_extract(
                            COALESCE(NULLIF(feedback_count_per_step, ''), '{}'),
                            '$.' || ?
                        ),
                        0
                    ) + 1
                ),
                updated_at = ?
            WHERE id = ?
            """,
            (step, step, stamp, pid),
        )
        # Read the value back on the same connection, before the with-block
        # exits, so the result reflects this call's UPDATE.
        row = conn.execute(
            "SELECT json_extract(feedback_count_per_step, '$.' || ?) AS c "
            "FROM video_pipelines WHERE id = ?",
            (step, pid),
        ).fetchone()
        return int(row["c"]) if row and row["c"] is not None else 0
|
|
|
|
|
|
def record_feedback(pid: int, step: str, feedback_text: str) -> None:
    """Append one row to ``pipeline_feedback``.

    Args:
        pid: Stored in ``pipeline_id``.
        step: Stored in ``step``.
        feedback_text: Stored in ``feedback_text``.

    ``received_at`` is set to the current ``_now()`` timestamp.
    """
    insert_sql = """
        INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at)
        VALUES (?, ?, ?, ?)
    """
    with _conn() as conn:
        received_at = _now()
        conn.execute(insert_sql, (pid, step, feedback_text, received_at))
|
|
|
|
|
|
def get_feedback_history(pid: int) -> List[Dict[str, Any]]:
    """Return every ``pipeline_feedback`` row of a pipeline, highest id first.

    Args:
        pid: Value matched against ``pipeline_feedback.pipeline_id``.

    Returns:
        A list of plain dicts, one per row; empty when there is no feedback.
    """
    query = """
        SELECT * FROM pipeline_feedback
        WHERE pipeline_id = ? ORDER BY id DESC
    """
    with _conn() as conn:
        fetched = conn.execute(query, (pid,)).fetchall()
    return list(map(dict, fetched))
|
|
|
|
|
|
def create_pipeline_job(pid: int, step: str) -> int:
    """Insert a ``pipeline_jobs`` row with status 'queued'.

    ``started_at`` is set to the current ``_now()`` timestamp at insert time.

    Args:
        pid: Stored in ``pipeline_id``.
        step: Stored in ``step``.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    insert_sql = """
        INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at)
        VALUES (?, ?, 'queued', ?)
    """
    with _conn() as conn:
        started_at = _now()
        cursor = conn.execute(insert_sql, (pid, step, started_at))
        return cursor.lastrowid
|
|
|
|
|
|
def update_pipeline_job(job_id: int, **fields) -> None:
    """Update columns of one ``pipeline_jobs`` row.

    When ``status`` is set to 'succeeded' or 'failed' and the caller did not
    pass ``finished_at``, ``finished_at`` is filled with ``_now()``.
    Calling with no fields is a no-op.

    Args:
        job_id: Value matched against ``pipeline_jobs.id``.
        **fields: Column / value pairs to write.

    Raises:
        ValueError: If a keyword is not listed in ``_PIPELINE_JOB_COLS``.
    """
    rejected = sorted(name for name in fields if name not in _PIPELINE_JOB_COLS)
    if rejected:
        raise ValueError(f"unknown columns for update_pipeline_job: {rejected}")
    if not fields:
        return

    reaches_terminal_status = fields.get("status") in ("succeeded", "failed")
    if reaches_terminal_status and "finished_at" not in fields:
        fields["finished_at"] = _now()

    assignments = [f"{column} = ?" for column in fields]
    params = [*fields.values(), job_id]
    statement = f"UPDATE pipeline_jobs SET {', '.join(assignments)} WHERE id = ?"
    with _conn() as conn:
        conn.execute(statement, params)
|
|
|
|
|
|
def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]:
    """Return every ``pipeline_jobs`` row of a pipeline, lowest id first.

    Args:
        pid: Value matched against ``pipeline_jobs.pipeline_id``.

    Returns:
        A list of plain dicts, one per row; empty when the pipeline has no jobs.
    """
    query = """
        SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC
    """
    with _conn() as conn:
        fetched = conn.execute(query, (pid,)).fetchall()
    return list(map(dict, fetched))
|
|
|
|
|
|
def get_youtube_setup() -> Dict[str, Any]:
    """Return the single default ``youtube_setup`` row (id = 1).

    If the row is missing, ``_seed_default_youtube_setup`` is called on the
    open connection and the row is queried again.

    Returns:
        The row as a dict. Each ``*_json`` column is kept as raw text and its
        decoded value is added under the same name without the ``_json``
        suffix (for example ``metadata_template``).
    """
    select_sql = "SELECT * FROM youtube_setup WHERE id = 1"
    json_columns = (
        "metadata_template_json",
        "cover_prompts_json",
        "review_weights_json",
        "visual_defaults_json",
        "publish_policy_json",
    )
    with _conn() as conn:
        row = conn.execute(select_sql).fetchone()
        if row is None:
            _seed_default_youtube_setup(conn)
            row = conn.execute(select_sql).fetchone()
    setup = dict(row)
    for column in json_columns:
        decoded_key = column.replace("_json", "")
        setup[decoded_key] = json.loads(setup[column])
    return setup
|
|
|
|
|
|
def update_youtube_setup(**kwargs) -> None:
    """Update the ``youtube_setup`` row with id = 1.

    Recognised keywords:
        * ``metadata_template``, ``cover_prompts``, ``review_weights``,
          ``visual_defaults``, ``publish_policy``: JSON-encoded and written
          to the column of the same name plus ``_json``.
        * ``review_threshold``: converted with ``int()`` and written as-is.

    Any other keyword is ignored. When nothing recognised is passed, the
    function returns without opening a connection; otherwise ``updated_at``
    is also set to ``_now()``.
    """
    json_columns = {
        "metadata_template": "metadata_template_json",
        "cover_prompts": "cover_prompts_json",
        "review_weights": "review_weights_json",
        "visual_defaults": "visual_defaults_json",
        "publish_policy": "publish_policy_json",
    }
    assignments: List[str] = []
    params: List[Any] = []
    for name, value in kwargs.items():
        target_column = json_columns.get(name)
        if target_column is not None:
            assignments.append(f"{target_column} = ?")
            params.append(json.dumps(value))
            continue
        if name == "review_threshold":
            assignments.append("review_threshold = ?")
            params.append(int(value))
    if not assignments:
        return
    assignments.append("updated_at = ?")
    params.append(_now())
    statement = f"UPDATE youtube_setup SET {', '.join(assignments)} WHERE id = 1"
    with _conn() as conn:
        conn.execute(statement, params)
|
|
|
|
|
|
def upsert_oauth_token(channel_id: str, channel_title: Optional[str],
                       avatar_url: Optional[str], refresh_token: str,
                       access_token: Optional[str], expires_at: Optional[str]) -> None:
    """Replace the stored YouTube OAuth token with the given one.

    All existing ``youtube_oauth_tokens`` rows are deleted and a single new
    row is inserted, both inside the same ``with _conn()`` block.
    ``created_at`` is set to the current ``_now()`` timestamp.
    """
    insert_sql = """
        INSERT INTO youtube_oauth_tokens
        (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    with _conn() as conn:
        # Clear the table first so it only ever holds the newest token.
        conn.execute("DELETE FROM youtube_oauth_tokens")
        token_values = (
            channel_id,
            channel_title,
            avatar_url,
            refresh_token,
            access_token,
            expires_at,
            _now(),
        )
        conn.execute(insert_sql, token_values)
|
|
|
|
|
|
def get_oauth_token() -> Optional[Dict[str, Any]]:
    """Return the ``youtube_oauth_tokens`` row with the highest id.

    Returns:
        The row as a plain dict, or ``None`` when the table is empty.
    """
    query = "SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1"
    with _conn() as conn:
        latest = conn.execute(query).fetchone()
    if not latest:
        return None
    return dict(latest)
|
|
|
|
|
|
def delete_oauth_token() -> None:
    """Delete every row from ``youtube_oauth_tokens``."""
    statement = "DELETE FROM youtube_oauth_tokens"
    with _conn() as conn:
        conn.execute(statement)
|