import os import sqlite3 import json from datetime import datetime from typing import Any, Dict, List, Optional DB_PATH = "/app/data/music.db" def _conn() -> sqlite3.Connection: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db() -> None: with _conn() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS music_tasks ( id TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'queued', progress INTEGER NOT NULL DEFAULT 0, message TEXT NOT NULL DEFAULT '', audio_url TEXT, error TEXT, params TEXT NOT NULL DEFAULT '{}', provider TEXT NOT NULL DEFAULT 'local', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created ON music_tasks(created_at DESC)") conn.execute(""" CREATE TABLE IF NOT EXISTS music_library ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL DEFAULT '', genre TEXT NOT NULL DEFAULT '', moods TEXT NOT NULL DEFAULT '[]', instruments TEXT NOT NULL DEFAULT '[]', duration_sec INTEGER, bpm INTEGER, key TEXT NOT NULL DEFAULT '', scale TEXT NOT NULL DEFAULT '', prompt TEXT NOT NULL DEFAULT '', audio_url TEXT NOT NULL DEFAULT '', file_path TEXT NOT NULL DEFAULT '', task_id TEXT, tags TEXT NOT NULL DEFAULT '[]', provider TEXT NOT NULL DEFAULT 'local', lyrics TEXT NOT NULL DEFAULT '', image_url TEXT NOT NULL DEFAULT '', suno_id TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_library_created ON music_library(created_at DESC)") conn.execute(""" CREATE TABLE IF NOT EXISTS saved_lyrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL DEFAULT '', text TEXT NOT NULL DEFAULT '', prompt TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_lyrics_created ON saved_lyrics(created_at DESC)") # 기존 테이블 마이그레이션 (컬럼 없으면 추가) for col, default in [ ("provider", "'local'"), ("lyrics", "''"), ("image_url", "''"), ("suno_id", "''"), ("file_hash", "''"), ]: try: conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}") except sqlite3.OperationalError: pass # 이미 존재 try: conn.execute("ALTER TABLE music_tasks ADD COLUMN provider TEXT NOT NULL DEFAULT 'local'") except sqlite3.OperationalError: pass # Phase 1~3 신규 컬럼 마이그레이션 for col, default in [ ("cover_images", "'[]'"), ("wav_url", "''"), ("video_url", "''"), ("stem_urls", "'{}'"), ]: try: conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}") except sqlite3.OperationalError: pass # ── video_projects 테이블 ───────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS video_projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, track_id INTEGER, format TEXT NOT NULL DEFAULT 'visualizer', status TEXT NOT NULL DEFAULT 'pending', output_path TEXT NOT NULL DEFAULT '', output_url TEXT NOT NULL DEFAULT '', thumbnail_path TEXT NOT NULL DEFAULT '', target_countries TEXT NOT NULL DEFAULT '[]', yt_title TEXT NOT NULL DEFAULT '', yt_description TEXT NOT NULL DEFAULT '', yt_tags TEXT NOT NULL DEFAULT '[]', render_params TEXT NOT NULL DEFAULT '{}', error TEXT, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), completed_at TEXT ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)") # ── revenue_records 테이블 ──────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS revenue_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_project_id INTEGER, yt_video_id TEXT NOT NULL DEFAULT '', record_month TEXT NOT NULL DEFAULT '', views INTEGER NOT NULL DEFAULT 0, watch_hours REAL NOT NULL DEFAULT 0.0, revenue_usd REAL NOT NULL DEFAULT 0.0, rpm_usd REAL NOT NULL DEFAULT 0.0, country TEXT NOT NULL DEFAULT '', source TEXT NOT NULL DEFAULT 'manual', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), UNIQUE(yt_video_id, record_month, country) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)") # ── market_trends 테이블 ────────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS market_trends ( id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL DEFAULT '', country TEXT NOT NULL DEFAULT '', genre TEXT NOT NULL DEFAULT '', keyword TEXT NOT NULL DEFAULT '', score REAL NOT NULL DEFAULT 0.0, rank INTEGER, metadata TEXT NOT NULL DEFAULT '{}', collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute( "CREATE INDEX IF NOT EXISTS idx_mt_country_source " "ON market_trends(country, source, collected_at DESC)" ) # ── trend_reports 테이블 ────────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS trend_reports ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_date TEXT UNIQUE NOT NULL DEFAULT '', top_genres TEXT NOT NULL DEFAULT '[]', top_keywords TEXT NOT NULL DEFAULT '[]', recommended_styles TEXT NOT NULL DEFAULT '[]', insights TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) # ── compile_jobs 테이블 ─────────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS compile_jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL DEFAULT '', track_ids TEXT NOT NULL DEFAULT '[]', crossfade_sec REAL NOT NULL DEFAULT 3.0, status TEXT NOT NULL DEFAULT 'pending', output_path TEXT, duration_sec REAL, error TEXT, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) # ── YouTube pipeline 테이블 (5개) ───────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS video_pipelines ( id INTEGER PRIMARY KEY AUTOINCREMENT, track_id INTEGER NOT NULL, state TEXT NOT NULL DEFAULT 'created', state_started_at TEXT NOT NULL, cover_url TEXT, video_url TEXT, thumbnail_url TEXT, metadata_json TEXT, review_json TEXT, youtube_video_id TEXT, feedback_count_per_step TEXT NOT NULL DEFAULT '{}', last_telegram_msg_ids TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, cancelled_at TEXT, failed_reason TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS pipeline_jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, pipeline_id INTEGER NOT NULL, step TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'queued', error TEXT, started_at TEXT, finished_at TEXT, duration_ms INTEGER, FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS pipeline_feedback ( id INTEGER PRIMARY KEY AUTOINCREMENT, pipeline_id INTEGER NOT NULL, step TEXT NOT NULL, feedback_text TEXT NOT NULL, received_at TEXT NOT NULL, FOREIGN KEY (pipeline_id) REFERENCES video_pipelines(id) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS youtube_oauth_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT NOT NULL, channel_title TEXT, avatar_url TEXT, refresh_token TEXT NOT NULL, access_token TEXT, expires_at TEXT, created_at TEXT NOT NULL ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS youtube_setup ( id INTEGER PRIMARY KEY AUTOINCREMENT CHECK (id = 1), metadata_template_json TEXT NOT NULL, cover_prompts_json TEXT NOT NULL, review_weights_json TEXT NOT NULL, review_threshold INTEGER NOT NULL DEFAULT 60, visual_defaults_json TEXT NOT NULL, publish_policy_json TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # 기본 setup 1행 보장 cnt_row = conn.execute("SELECT COUNT(*) AS c FROM youtube_setup").fetchone() if cnt_row["c"] == 0: _seed_default_youtube_setup(conn) def _seed_default_youtube_setup(conn: sqlite3.Connection) -> None: """youtube_setup 테이블에 기본 1행을 삽입한다. init_db()와 get_youtube_setup() (행이 사라진 경우 self-heal)가 공유한다. """ defaults = ( json.dumps({ "title": "[{genre}] {title} | {bpm}BPM", "description": "{title}\n\n장르: {genre}\nBPM: {bpm}\nKey: {key}\n", "tags": ["lofi", "chill", "instrumental"], "category_id": 10, }), json.dumps({ "lo-fi": "moody anime cityscape at dusk, lofi aesthetic", "phonk": "dark drift car aesthetic, neon, phonk vibe", "ambient": "ethereal mountain landscape, ambient mood", "default": "abstract music album cover art", }), json.dumps({"meta": 25, "policy": 30, "viewer": 25, "trend": 20}), 60, json.dumps({"resolution": "1920x1080", "style": "visualizer", "background": "ai_cover"}), json.dumps({"mode": "manual", "privacy": "private", "schedule_time": None}), datetime.utcnow().isoformat(timespec="seconds"), ) conn.execute(""" INSERT INTO youtube_setup (metadata_template_json, cover_prompts_json, review_weights_json, review_threshold, visual_defaults_json, publish_policy_json, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, defaults) # ── music_tasks CRUD ────────────────────────────────────────────────────────── def _task_row_to_dict(r) -> Dict[str, Any]: return { "task_id": r["id"], "status": r["status"], "progress": r["progress"], "message": r["message"], "audio_url": r["audio_url"], "error": r["error"], "params": json.loads(r["params"]), "provider": r["provider"] if "provider" in r.keys() else "local", "created_at": r["created_at"], "updated_at": r["updated_at"], } def create_task(task_id: str, params: Dict[str, Any], provider: str = "local") -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO music_tasks (id, params, provider) VALUES (?, ?, ?)", (task_id, json.dumps(params), provider), ) row = conn.execute("SELECT * FROM music_tasks WHERE id = ?", (task_id,)).fetchone() return _task_row_to_dict(row) def update_task( task_id: str, status: str, progress: int, message: str, audio_url: str = None, error: str = None, ) -> None: with _conn() as conn: conn.execute( """ UPDATE music_tasks SET status = ?, progress = ?, message = ?, audio_url = ?, error = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ? """, (status, progress, message, audio_url, error, task_id), ) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_tasks WHERE id = ?", (task_id,)).fetchone() return _task_row_to_dict(row) if row else None # ── music_library CRUD ──────────────────────────────────────────────────────── def _track_row_to_dict(r) -> Dict[str, Any]: keys = r.keys() return { "id": r["id"], "title": r["title"], "genre": r["genre"], "moods": json.loads(r["moods"]) if r["moods"] else [], "instruments": json.loads(r["instruments"]) if r["instruments"] else [], "duration_sec": r["duration_sec"], "bpm": r["bpm"], "key": r["key"], "scale": r["scale"], "prompt": r["prompt"], "audio_url": r["audio_url"], "file_path": r["file_path"], "task_id": r["task_id"], "tags": json.loads(r["tags"]) if r["tags"] else [], "provider": r["provider"] if "provider" in keys else "local", "lyrics": r["lyrics"] if "lyrics" in keys else "", "image_url": r["image_url"] if "image_url" in keys else "", "suno_id": r["suno_id"] if "suno_id" in keys else "", "file_hash": r["file_hash"] if "file_hash" in keys else "", "cover_images": json.loads(r["cover_images"]) if "cover_images" in keys and r["cover_images"] else [], "wav_url": r["wav_url"] if "wav_url" in keys else "", "video_url": r["video_url"] if "video_url" in keys else "", "stem_urls": json.loads(r["stem_urls"]) if "stem_urls" in keys and r["stem_urls"] else {}, "created_at": r["created_at"], } def get_all_tracks() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM music_library ORDER BY created_at DESC").fetchall() return [_track_row_to_dict(r) for r in rows] def add_track(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( """ INSERT INTO music_library (title, genre, moods, instruments, duration_sec, bpm, key, scale, prompt, audio_url, file_path, task_id, tags, provider, lyrics, image_url, suno_id, file_hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data.get("title", ""), data.get("genre", ""), json.dumps(data.get("moods", [])), json.dumps(data.get("instruments", [])), data.get("duration_sec"), data.get("bpm"), data.get("key", ""), data.get("scale", ""), data.get("prompt", ""), data.get("audio_url", ""), data.get("file_path", ""), data.get("task_id"), json.dumps(data.get("tags", [])), data.get("provider", "local"), data.get("lyrics", ""), data.get("image_url", ""), data.get("suno_id", ""), data.get("file_hash", ""), ), ) row = conn.execute("SELECT * FROM music_library WHERE rowid = last_insert_rowid()").fetchone() return _track_row_to_dict(row) def delete_track(track_id: int) -> bool: with _conn() as conn: # 파일 경로 먼저 조회 (삭제 후 파일도 지울 수 있도록) row = conn.execute("SELECT file_path FROM music_library WHERE id = ?", (track_id,)).fetchone() if not row: return False conn.execute("DELETE FROM music_library WHERE id = ?", (track_id,)) return True def get_track_by_task_id(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_library WHERE task_id = ?", (task_id,)).fetchone() return _track_row_to_dict(row) if row else None def update_track_duration(track_id: int, duration_sec: int) -> None: with _conn() as conn: conn.execute( "UPDATE music_library SET duration_sec = ? WHERE id = ? AND duration_sec IS NULL", (duration_sec, track_id), ) def update_track_file_info(track_id: int, title: str, audio_url: str, file_path: str) -> None: """파일 rename 시 파일 관련 정보만 업데이트 (태그 등 메타데이터 보존).""" with _conn() as conn: conn.execute( "UPDATE music_library SET title=?, audio_url=?, file_path=? WHERE id=?", (title, audio_url, file_path, track_id), ) def update_track_hash(track_id: int, file_hash: str) -> None: """트랙의 file_hash를 업데이트.""" with _conn() as conn: conn.execute( "UPDATE music_library SET file_hash=? WHERE id=?", (file_hash, track_id), ) def get_track_file_path(track_id: int) -> Optional[str]: with _conn() as conn: row = conn.execute("SELECT file_path FROM music_library WHERE id = ?", (track_id,)).fetchone() return row["file_path"] if row else None # ── saved_lyrics CRUD ──────────────────────────────────────────────────────── def _lyrics_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "title": r["title"], "text": r["text"], "prompt": r["prompt"], "created_at": r["created_at"], "updated_at": r["updated_at"], } def get_all_lyrics() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM saved_lyrics ORDER BY created_at DESC").fetchall() return [_lyrics_row_to_dict(r) for r in rows] def add_lyrics(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO saved_lyrics (title, text, prompt) VALUES (?, ?, ?)", (data.get("title", ""), data.get("text", ""), data.get("prompt", "")), ) row = conn.execute("SELECT * FROM saved_lyrics WHERE rowid = last_insert_rowid()").fetchone() return _lyrics_row_to_dict(row) def update_lyrics(lyrics_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: with _conn() as conn: fields = [] values = [] for k in ("title", "text", "prompt"): if k in data: fields.append(f"{k} = ?") values.append(data[k]) if not fields: return None fields.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')") values.append(lyrics_id) conn.execute(f"UPDATE saved_lyrics SET {', '.join(fields)} WHERE id = ?", values) row = conn.execute("SELECT * FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone() return _lyrics_row_to_dict(row) if row else None def update_track_cover_images(track_id: int, images: list) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET cover_images=? WHERE id=?", (json.dumps(images), track_id)) def update_track_wav_url(track_id: int, wav_url: str) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET wav_url=? WHERE id=?", (wav_url, track_id)) def update_track_video_url(track_id: int, video_url: str) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET video_url=? WHERE id=?", (video_url, track_id)) def update_track_stem_urls(track_id: int, stems: dict) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET stem_urls=? WHERE id=?", (json.dumps(stems), track_id)) def delete_lyrics(lyrics_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone() if not row: return False conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,)) return True # ── video_projects CRUD ─────────────────────────────────────────────────────── def _vp_row_to_dict(r) -> dict: return { "id": r["id"], "track_id": r["track_id"], "format": r["format"], "status": r["status"], "output_path": r["output_path"], "output_url": r["output_url"], "thumbnail_path": r["thumbnail_path"], "target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [], "yt_title": r["yt_title"], "yt_description": r["yt_description"], "yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [], "render_params": json.loads(r["render_params"]) if r["render_params"] else {}, "error": r["error"], "created_at": r["created_at"], "completed_at": r["completed_at"], } def create_video_project(data: dict) -> dict: with _conn() as conn: conn.execute( """INSERT INTO video_projects (track_id, format, target_countries, render_params) VALUES (?, ?, ?, ?)""", (data.get("track_id"), data.get("format", "visualizer"), json.dumps(data.get("target_countries", [])), json.dumps(data.get("render_params", {}))), ) row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone() return _vp_row_to_dict(row) def get_video_project(project_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone() return _vp_row_to_dict(row) if row else None def get_all_video_projects() -> list: with _conn() as conn: rows = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall() return [_vp_row_to_dict(r) for r in rows] def update_video_project_status( project_id: int, status: str, output_path: str = "", output_url: str = "", thumbnail_path: str = "", yt_title: str = "", yt_description: str = "", yt_tags: list = None, error: str = None, ) -> None: completed_at_expr = ( "strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL" ) with _conn() as conn: conn.execute( f"""UPDATE video_projects SET status=?, output_path=?, output_url=?, thumbnail_path=?, yt_title=?, yt_description=?, yt_tags=?, error=?, completed_at={completed_at_expr} WHERE id=?""", (status, output_path, output_url, thumbnail_path, yt_title, yt_description, json.dumps(yt_tags or []), error, project_id), ) def delete_video_project(project_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone() if not row: return False conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,)) return True def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone() return _track_row_to_dict(row) if row else None # ── revenue_records CRUD ────────────────────────────────────────────────────── def _rr_row_to_dict(r) -> dict: return { "id": r["id"], "video_project_id": r["video_project_id"], "yt_video_id": r["yt_video_id"], "record_month": r["record_month"], "views": r["views"], "watch_hours": r["watch_hours"], "revenue_usd": r["revenue_usd"], "rpm_usd": r["rpm_usd"], "country": r["country"], "source": r["source"], "created_at": r["created_at"], } def create_revenue_record(data: dict) -> dict: views = data.get("views", 0) revenue = data.get("revenue_usd", 0.0) rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 with _conn() as conn: conn.execute( """INSERT INTO revenue_records (video_project_id, yt_video_id, record_month, views, watch_hours, revenue_usd, rpm_usd, country, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (data.get("video_project_id"), data.get("yt_video_id", ""), data.get("record_month", ""), views, data.get("watch_hours", 0.0), revenue, rpm, data.get("country", ""), data.get("source", "manual")), ) row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone() return _rr_row_to_dict(row) def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list: with _conn() as conn: q = "SELECT * FROM revenue_records WHERE 1=1" params: list = [] if yt_video_id: q += " AND yt_video_id=?" params.append(yt_video_id) if year_month: q += " AND record_month=?" params.append(year_month) q += " ORDER BY record_month DESC" rows = conn.execute(q, params).fetchall() return [_rr_row_to_dict(r) for r in rows] def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() if not row: return None cur = _rr_row_to_dict(row) views = data.get("views", cur["views"]) revenue = data.get("revenue_usd", cur["revenue_usd"]) rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 conn.execute( """UPDATE revenue_records SET yt_video_id=?, record_month=?, views=?, watch_hours=?, revenue_usd=?, rpm_usd=?, country=?, source=? WHERE id=?""", (data.get("yt_video_id", cur["yt_video_id"]), data.get("record_month", cur["record_month"]), views, data.get("watch_hours", cur["watch_hours"]), revenue, rpm, data.get("country", cur["country"]), data.get("source", cur["source"]), record_id), ) row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() return _rr_row_to_dict(row) def delete_revenue_record(record_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone() if not row: return False conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,)) return True def get_revenue_dashboard() -> dict: with _conn() as conn: total = conn.execute( "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records" ).fetchone() by_month = conn.execute( """SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views, CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm FROM revenue_records GROUP BY record_month ORDER BY record_month DESC LIMIT 12""" ).fetchall() by_country = conn.execute( """SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views FROM revenue_records WHERE country != '' GROUP BY country ORDER BY revenue DESC LIMIT 10""" ).fetchall() return { "total_revenue_usd": total["total"] or 0.0, "total_views": total["views"] or 0, "total_watch_hours": total["hours"] or 0.0, "by_month": [dict(r) for r in by_month], "by_country": [dict(r) for r in by_country], } # ── market_trends CRUD ──────────────────────────────────────────────────────── def insert_market_trends(trends: list) -> None: with _conn() as conn: conn.executemany( """INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)""", [(t.get("source",""), t.get("country",""), t.get("genre",""), t.get("keyword",""), t.get("score", 0.0), t.get("rank"), json.dumps(t.get("metadata", {}))) for t in trends], ) def get_market_trends( country: str = None, genre: str = None, source: str = None, days: int = 7 ) -> list: with _conn() as conn: q = "SELECT * FROM market_trends WHERE collected_at >= datetime('now', ?)" params: list = [f"-{days} days"] if country: q += " AND country=?"; params.append(country) if genre: q += " AND genre=?"; params.append(genre) if source: q += " AND source=?"; params.append(source) q += " ORDER BY collected_at DESC LIMIT 500" rows = conn.execute(q, params).fetchall() return [ {"id": r["id"], "source": r["source"], "country": r["country"], "genre": r["genre"], "keyword": r["keyword"], "score": r["score"], "rank": r["rank"], "metadata": json.loads(r["metadata"]), "collected_at": r["collected_at"]} for r in rows ] # ── trend_reports CRUD ──────────────────────────────────────────────────────── def upsert_trend_report(data: dict) -> None: with _conn() as conn: conn.execute( """INSERT INTO trend_reports (report_date, top_genres, top_keywords, recommended_styles, insights) VALUES (?, ?, ?, ?, ?) ON CONFLICT(report_date) DO UPDATE SET top_genres=excluded.top_genres, top_keywords=excluded.top_keywords, recommended_styles=excluded.recommended_styles, insights=excluded.insights""", (data["report_date"], json.dumps(data["top_genres"]), json.dumps(data["top_keywords"]), json.dumps(data["recommended_styles"]), data["insights"]), ) def get_latest_trend_report() -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1" ).fetchone() if not row: return None return { "id": row["id"], "report_date": row["report_date"], "top_genres": json.loads(row["top_genres"]), "top_keywords": json.loads(row["top_keywords"]), "recommended_styles": json.loads(row["recommended_styles"]), "insights": row["insights"], "created_at": row["created_at"], } def get_trend_reports(limit: int = 10) -> list: with _conn() as conn: rows = conn.execute( "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT ?", (limit,) ).fetchall() return [ { "id": r["id"], "report_date": r["report_date"], "top_genres": json.loads(r["top_genres"] or "[]"), "recommended_styles": json.loads(r["recommended_styles"] or "[]"), "insights": (r["insights"] or "")[:100], "created_at": r["created_at"], } for r in rows ] # ── Compile Jobs ───────────────────────────────────────── def create_compile_job(title: str, track_ids: list, crossfade_sec: float) -> int: with _conn() as conn: cur = conn.execute( "INSERT INTO compile_jobs (title, track_ids, crossfade_sec) VALUES (?,?,?)", (title, json.dumps(track_ids), crossfade_sec), ) return cur.lastrowid def get_compile_jobs() -> list: with _conn() as conn: rows = conn.execute( "SELECT id, title, track_ids, crossfade_sec, status, duration_sec, created_at " "FROM compile_jobs ORDER BY created_at DESC LIMIT 50" ).fetchall() return [ { "id": r["id"], "title": r["title"], "track_ids": json.loads(r["track_ids"]), "crossfade_sec": r["crossfade_sec"], "status": r["status"], "duration_sec": r["duration_sec"], "created_at": r["created_at"], } for r in rows ] def get_compile_job(job_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute( "SELECT * FROM compile_jobs WHERE id = ?", (job_id,) ).fetchone() if not r: return None return { "id": r["id"], "title": r["title"], "track_ids": json.loads(r["track_ids"]), "crossfade_sec": r["crossfade_sec"], "status": r["status"], "output_path": r["output_path"], "duration_sec": r["duration_sec"], "error": r["error"], "created_at": r["created_at"], } def update_compile_job(job_id: int, **kwargs) -> None: allowed = {"status", "output_path", "duration_sec", "error"} fields = {k: v for k, v in kwargs.items() if k in allowed} if not fields: return set_clause = ", ".join(f"{k} = ?" for k in fields) with _conn() as conn: conn.execute( f"UPDATE compile_jobs SET {set_clause} WHERE id = ?", (*fields.values(), job_id), ) def delete_compile_job(job_id: int) -> None: with _conn() as conn: conn.execute("DELETE FROM compile_jobs WHERE id = ?", (job_id,)) # ── YouTube pipeline helpers ────────────────────────────────────────────────── # update_pipeline_state: state/state_started_at/updated_at은 자동, 그 외 허용 컬럼 화이트리스트 _PIPELINE_STATE_EXTRA_COLS = frozenset({ "cover_url", "video_url", "thumbnail_url", "metadata_json", "review_json", "youtube_video_id", "cancelled_at", "failed_reason", "last_telegram_msg_ids", "feedback_count_per_step", }) # update_pipeline_job 허용 컬럼 화이트리스트 _PIPELINE_JOB_COLS = frozenset({ "status", "error", "duration_ms", "started_at", "finished_at", }) def _now() -> str: return datetime.utcnow().isoformat(timespec="seconds") def _parse_pipeline_row(row: sqlite3.Row) -> Dict[str, Any]: """video_pipelines의 sqlite3.Row를 dict로 파싱. JSON 컬럼을 디코드하고, metadata/review를 호환을 위해 추가로 노출한다. get_pipeline / list_pipelines가 공유. """ d = dict(row) d["feedback_count_per_step"] = json.loads(d["feedback_count_per_step"] or "{}") d["last_telegram_msg_ids"] = json.loads(d["last_telegram_msg_ids"] or "{}") if d.get("metadata_json"): d["metadata"] = json.loads(d["metadata_json"]) if d.get("review_json"): d["review"] = json.loads(d["review_json"]) return d def create_pipeline(track_id: int) -> int: with _conn() as conn: now = _now() cur = conn.execute(""" INSERT INTO video_pipelines (track_id, state, state_started_at, created_at, updated_at) VALUES (?, 'created', ?, ?, ?) """, (track_id, now, now, now)) return cur.lastrowid def get_pipeline(pid: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM video_pipelines WHERE id = ?", (pid,)).fetchone() if not row: return None return _parse_pipeline_row(row) def update_pipeline_state(pid: int, state: str, **fields) -> None: """파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다. 허용 컬럼 화이트리스트(_PIPELINE_STATE_EXTRA_COLS)에 없는 키는 ValueError. """ unknown = set(fields) - _PIPELINE_STATE_EXTRA_COLS if unknown: raise ValueError(f"unknown columns for update_pipeline_state: {sorted(unknown)}") now = _now() cols = ["state = ?", "state_started_at = ?", "updated_at = ?"] vals: List[Any] = [state, now, now] for k, v in fields.items(): cols.append(f"{k} = ?") vals.append(v) vals.append(pid) with _conn() as conn: conn.execute(f"UPDATE video_pipelines SET {', '.join(cols)} WHERE id = ?", vals) def list_pipelines(active_only: bool = False) -> List[Dict[str, Any]]: with _conn() as conn: if active_only: rows = conn.execute(""" SELECT * FROM video_pipelines WHERE state NOT IN ('published','cancelled','failed','awaiting_manual') ORDER BY created_at DESC """).fetchall() else: rows = conn.execute("SELECT * FROM video_pipelines ORDER BY created_at DESC").fetchall() return [_parse_pipeline_row(r) for r in rows] def increment_feedback_count(pid: int, step: str) -> int: """원자적으로 feedback_count_per_step.를 +1 한 뒤 새 값을 반환. json1 확장(SQLite 3.38+)을 사용해 read-modify-write 경합을 제거한다. """ now = _now() with _conn() as conn: conn.execute( """ UPDATE video_pipelines SET feedback_count_per_step = json_set( feedback_count_per_step, '$.' || ?, COALESCE(json_extract(feedback_count_per_step, '$.' || ?), 0) + 1 ), updated_at = ? WHERE id = ? """, (step, step, now, pid), ) row = conn.execute( "SELECT json_extract(feedback_count_per_step, '$.' || ?) AS c " "FROM video_pipelines WHERE id = ?", (step, pid), ).fetchone() return int(row["c"]) if row and row["c"] is not None else 0 def record_feedback(pid: int, step: str, feedback_text: str) -> None: with _conn() as conn: conn.execute(""" INSERT INTO pipeline_feedback (pipeline_id, step, feedback_text, received_at) VALUES (?, ?, ?, ?) """, (pid, step, feedback_text, _now())) def get_feedback_history(pid: int) -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute(""" SELECT * FROM pipeline_feedback WHERE pipeline_id = ? ORDER BY id DESC """, (pid,)).fetchall() return [dict(r) for r in rows] def create_pipeline_job(pid: int, step: str) -> int: with _conn() as conn: cur = conn.execute(""" INSERT INTO pipeline_jobs (pipeline_id, step, status, started_at) VALUES (?, ?, 'queued', ?) """, (pid, step, _now())) return cur.lastrowid def update_pipeline_job(job_id: int, **fields) -> None: """pipeline_jobs 행을 갱신. 허용 컬럼 화이트리스트 외 키는 ValueError. status가 succeeded/failed로 바뀌면 finished_at을 자동 설정 (호출자가 미지정 시). """ unknown = set(fields) - _PIPELINE_JOB_COLS if unknown: raise ValueError(f"unknown columns for update_pipeline_job: {sorted(unknown)}") if not fields: return if ( fields.get("status") in ("succeeded", "failed") and "finished_at" not in fields ): fields["finished_at"] = _now() cols = ", ".join(f"{k} = ?" for k in fields) vals = list(fields.values()) + [job_id] with _conn() as conn: conn.execute(f"UPDATE pipeline_jobs SET {cols} WHERE id = ?", vals) def list_pipeline_jobs(pid: int) -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute(""" SELECT * FROM pipeline_jobs WHERE pipeline_id = ? ORDER BY id ASC """, (pid,)).fetchall() return [dict(r) for r in rows] def get_youtube_setup() -> Dict[str, Any]: """youtube_setup의 기본 1행을 반환. 누락 시 자동 시드 후 재조회.""" with _conn() as conn: row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone() if row is None: _seed_default_youtube_setup(conn) row = conn.execute("SELECT * FROM youtube_setup WHERE id = 1").fetchone() d = dict(row) for k in ("metadata_template_json", "cover_prompts_json", "review_weights_json", "visual_defaults_json", "publish_policy_json"): d[k.replace("_json", "")] = json.loads(d[k]) return d def update_youtube_setup(**kwargs) -> None: field_map = { "metadata_template": "metadata_template_json", "cover_prompts": "cover_prompts_json", "review_weights": "review_weights_json", "visual_defaults": "visual_defaults_json", "publish_policy": "publish_policy_json", } cols = [] vals: List[Any] = [] for k, v in kwargs.items(): if k in field_map: cols.append(f"{field_map[k]} = ?") vals.append(json.dumps(v)) elif k == "review_threshold": cols.append("review_threshold = ?") vals.append(int(v)) if not cols: return cols.append("updated_at = ?") vals.append(_now()) with _conn() as conn: conn.execute(f"UPDATE youtube_setup SET {', '.join(cols)} WHERE id = 1", vals) def upsert_oauth_token(channel_id: str, channel_title: Optional[str], avatar_url: Optional[str], refresh_token: str, access_token: Optional[str], expires_at: Optional[str]) -> None: with _conn() as conn: conn.execute("DELETE FROM youtube_oauth_tokens") conn.execute(""" INSERT INTO youtube_oauth_tokens (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (channel_id, channel_title, avatar_url, refresh_token, access_token, expires_at, _now())) def get_oauth_token() -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM youtube_oauth_tokens ORDER BY id DESC LIMIT 1").fetchone() return dict(row) if row else None def delete_oauth_token() -> None: with _conn() as conn: conn.execute("DELETE FROM youtube_oauth_tokens")