GET /api/music/market/report 응답에 top_genres, recommended_styles를 포함해 히스토리 리포트 클릭 시 장르 차트와 Suno 프롬프트가 비어 보이는 버그 수정. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
710 lines
29 KiB
Python
710 lines
29 KiB
Python
import os
|
|
import sqlite3
|
|
import json
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# Filesystem path of the SQLite database file; _conn() creates its parent
# directory and opens the database at this location.
DB_PATH = "/app/data/music.db"
|
|
|
|
|
|
class _ClosingConnection(sqlite3.Connection):
    """``sqlite3.Connection`` whose ``with`` block also closes the connection.

    The stock context manager only commits (or rolls back on error) and leaves
    the connection open, so ``with _conn() as conn:`` would otherwise leak one
    connection per call.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit or roll back exactly as the base class does.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def _conn() -> sqlite3.Connection:
    """Open a connection to the database at ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created when missing. Rows are
    returned as ``sqlite3.Row`` so columns can be read by name. When used as a
    context manager the connection commits/rolls back and is then closed.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:  # a bare filename has no directory part to create
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
|
|
|
def _add_text_column(conn: sqlite3.Connection, table: str, column: str, default_sql: str) -> None:
    """Add a ``TEXT NOT NULL`` column to ``table`` unless it already exists.

    ``default_sql`` is the SQL literal used as the column default. Only
    SQLite's "duplicate column name" error is treated as "already migrated";
    any other ``OperationalError`` is re-raised instead of being hidden.
    """
    try:
        conn.execute(
            f"ALTER TABLE {table} ADD COLUMN {column} TEXT NOT NULL DEFAULT {default_sql}"
        )
    except sqlite3.OperationalError as exc:
        if "duplicate column name" not in str(exc).lower():
            raise


def init_db() -> None:
    """Create all tables and indexes and apply column migrations.

    Every statement uses ``IF NOT EXISTS`` (or the duplicate-column check in
    ``_add_text_column``), so the function can be called repeatedly.
    """
    with _conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_tasks (
                id          TEXT PRIMARY KEY,
                status      TEXT NOT NULL DEFAULT 'queued',
                progress    INTEGER NOT NULL DEFAULT 0,
                message     TEXT NOT NULL DEFAULT '',
                audio_url   TEXT,
                error       TEXT,
                params      TEXT NOT NULL DEFAULT '{}',
                provider    TEXT NOT NULL DEFAULT 'local',
                created_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created ON music_tasks(created_at DESC)")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS music_library (
                id            INTEGER PRIMARY KEY AUTOINCREMENT,
                title         TEXT NOT NULL DEFAULT '',
                genre         TEXT NOT NULL DEFAULT '',
                moods         TEXT NOT NULL DEFAULT '[]',
                instruments   TEXT NOT NULL DEFAULT '[]',
                duration_sec  INTEGER,
                bpm           INTEGER,
                key           TEXT NOT NULL DEFAULT '',
                scale         TEXT NOT NULL DEFAULT '',
                prompt        TEXT NOT NULL DEFAULT '',
                audio_url     TEXT NOT NULL DEFAULT '',
                file_path     TEXT NOT NULL DEFAULT '',
                task_id       TEXT,
                tags          TEXT NOT NULL DEFAULT '[]',
                provider      TEXT NOT NULL DEFAULT 'local',
                lyrics        TEXT NOT NULL DEFAULT '',
                image_url     TEXT NOT NULL DEFAULT '',
                suno_id       TEXT NOT NULL DEFAULT '',
                created_at    TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_library_created ON music_library(created_at DESC)")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS saved_lyrics (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                title       TEXT NOT NULL DEFAULT '',
                text        TEXT NOT NULL DEFAULT '',
                prompt      TEXT NOT NULL DEFAULT '',
                created_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_lyrics_created ON saved_lyrics(created_at DESC)")

        # Migrate pre-existing tables (add the columns that are missing).
        for col, default in [
            ("provider", "'local'"), ("lyrics", "''"),
            ("image_url", "''"), ("suno_id", "''"),
            ("file_hash", "''"),
        ]:
            _add_text_column(conn, "music_library", col, default)
        _add_text_column(conn, "music_tasks", "provider", "'local'")

        # Phase 1-3: migration for the newly introduced columns.
        for col, default in [
            ("cover_images", "'[]'"),
            ("wav_url", "''"),
            ("video_url", "''"),
            ("stem_urls", "'{}'"),
        ]:
            _add_text_column(conn, "music_library", col, default)

        # ── video_projects table ─────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS video_projects (
                id                INTEGER PRIMARY KEY AUTOINCREMENT,
                track_id          INTEGER,
                format            TEXT NOT NULL DEFAULT 'visualizer',
                status            TEXT NOT NULL DEFAULT 'pending',
                output_path       TEXT NOT NULL DEFAULT '',
                output_url        TEXT NOT NULL DEFAULT '',
                thumbnail_path    TEXT NOT NULL DEFAULT '',
                target_countries  TEXT NOT NULL DEFAULT '[]',
                yt_title          TEXT NOT NULL DEFAULT '',
                yt_description    TEXT NOT NULL DEFAULT '',
                yt_tags           TEXT NOT NULL DEFAULT '[]',
                render_params     TEXT NOT NULL DEFAULT '{}',
                error             TEXT,
                created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                completed_at      TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)")

        # ── revenue_records table ────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS revenue_records (
                id                INTEGER PRIMARY KEY AUTOINCREMENT,
                video_project_id  INTEGER,
                yt_video_id       TEXT NOT NULL DEFAULT '',
                record_month      TEXT NOT NULL DEFAULT '',
                views             INTEGER NOT NULL DEFAULT 0,
                watch_hours       REAL NOT NULL DEFAULT 0.0,
                revenue_usd       REAL NOT NULL DEFAULT 0.0,
                rpm_usd           REAL NOT NULL DEFAULT 0.0,
                country           TEXT NOT NULL DEFAULT '',
                source            TEXT NOT NULL DEFAULT 'manual',
                created_at        TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(yt_video_id, record_month, country)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)")

        # ── market_trends table ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS market_trends (
                id            INTEGER PRIMARY KEY AUTOINCREMENT,
                source        TEXT NOT NULL DEFAULT '',
                country       TEXT NOT NULL DEFAULT '',
                genre         TEXT NOT NULL DEFAULT '',
                keyword       TEXT NOT NULL DEFAULT '',
                score         REAL NOT NULL DEFAULT 0.0,
                rank          INTEGER,
                metadata      TEXT NOT NULL DEFAULT '{}',
                collected_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_mt_country_source "
            "ON market_trends(country, source, collected_at DESC)"
        )

        # ── trend_reports table ──────────────────────────────────────────
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trend_reports (
                id                  INTEGER PRIMARY KEY AUTOINCREMENT,
                report_date         TEXT UNIQUE NOT NULL DEFAULT '',
                top_genres          TEXT NOT NULL DEFAULT '[]',
                top_keywords        TEXT NOT NULL DEFAULT '[]',
                recommended_styles  TEXT NOT NULL DEFAULT '[]',
                insights            TEXT NOT NULL DEFAULT '',
                created_at          TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
|
|
|
|
|
|
# ── music_tasks CRUD ──────────────────────────────────────────────────────────
|
|
|
|
def _task_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"task_id": r["id"],
|
|
"status": r["status"],
|
|
"progress": r["progress"],
|
|
"message": r["message"],
|
|
"audio_url": r["audio_url"],
|
|
"error": r["error"],
|
|
"params": json.loads(r["params"]),
|
|
"provider": r["provider"] if "provider" in r.keys() else "local",
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def create_task(task_id: str, params: Dict[str, Any], provider: str = "local") -> Dict[str, Any]:
    """Insert a new ``music_tasks`` row and return it as a task dict.

    ``params`` is stored as JSON text; all other columns take their table
    defaults. The freshly inserted row is read back so the returned dict
    includes those defaults.
    """
    insert_sql = "INSERT INTO music_tasks (id, params, provider) VALUES (?, ?, ?)"
    select_sql = "SELECT * FROM music_tasks WHERE id = ?"
    with _conn() as conn:
        encoded_params = json.dumps(params)
        conn.execute(insert_sql, (task_id, encoded_params, provider))
        created = conn.execute(select_sql, (task_id,)).fetchone()
        return _task_row_to_dict(created)
|
|
|
|
|
|
def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str,
    audio_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite the mutable columns of one ``music_tasks`` row.

    Every column named in the UPDATE is written on each call, so leaving
    ``audio_url`` or ``error`` at ``None`` stores NULL in that column.
    ``updated_at`` is refreshed by SQLite. A ``task_id`` that matches no row
    changes nothing.
    """
    with _conn() as conn:
        conn.execute(
            """
            UPDATE music_tasks
            SET status = ?, progress = ?, message = ?, audio_url = ?, error = ?,
                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            WHERE id = ?
            """,
            (status, progress, message, audio_url, error, task_id),
        )
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the task dict for ``task_id``, or ``None`` when no row matches."""
    lookup_sql = "SELECT * FROM music_tasks WHERE id = ?"
    with _conn() as conn:
        found = conn.execute(lookup_sql, (task_id,)).fetchone()
        if not found:
            return None
        return _task_row_to_dict(found)
|
|
|
|
|
|
# ── music_library CRUD ────────────────────────────────────────────────────────
|
|
|
|
def _track_row_to_dict(r) -> Dict[str, Any]:
|
|
keys = r.keys()
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"genre": r["genre"],
|
|
"moods": json.loads(r["moods"]) if r["moods"] else [],
|
|
"instruments": json.loads(r["instruments"]) if r["instruments"] else [],
|
|
"duration_sec": r["duration_sec"],
|
|
"bpm": r["bpm"],
|
|
"key": r["key"],
|
|
"scale": r["scale"],
|
|
"prompt": r["prompt"],
|
|
"audio_url": r["audio_url"],
|
|
"file_path": r["file_path"],
|
|
"task_id": r["task_id"],
|
|
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
|
"provider": r["provider"] if "provider" in keys else "local",
|
|
"lyrics": r["lyrics"] if "lyrics" in keys else "",
|
|
"image_url": r["image_url"] if "image_url" in keys else "",
|
|
"suno_id": r["suno_id"] if "suno_id" in keys else "",
|
|
"file_hash": r["file_hash"] if "file_hash" in keys else "",
|
|
"cover_images": json.loads(r["cover_images"]) if "cover_images" in keys and r["cover_images"] else [],
|
|
"wav_url": r["wav_url"] if "wav_url" in keys else "",
|
|
"video_url": r["video_url"] if "video_url" in keys else "",
|
|
"stem_urls": json.loads(r["stem_urls"]) if "stem_urls" in keys and r["stem_urls"] else {},
|
|
"created_at": r["created_at"],
|
|
}
|
|
|
|
|
|
def get_all_tracks() -> List[Dict[str, Any]]:
    """Return every library track as a dict, newest ``created_at`` first."""
    listing_sql = "SELECT * FROM music_library ORDER BY created_at DESC"
    with _conn() as conn:
        tracks = []
        for record in conn.execute(listing_sql).fetchall():
            tracks.append(_track_row_to_dict(record))
        return tracks
|
|
|
|
|
|
def add_track(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a track into ``music_library`` and return the stored row as a dict.

    Missing keys in ``data`` fall back to the defaults shown below; the list
    fields (``moods``, ``instruments``, ``tags``) are stored as JSON text.
    """
    insert_sql = """
        INSERT INTO music_library
            (title, genre, moods, instruments, duration_sec, bpm, key, scale,
             prompt, audio_url, file_path, task_id, tags,
             provider, lyrics, image_url, suno_id, file_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    def text(field: str, fallback: str = "") -> Any:
        return data.get(field, fallback)

    def json_list(field: str) -> str:
        return json.dumps(data.get(field, []))

    with _conn() as conn:
        values = (
            text("title"),
            text("genre"),
            json_list("moods"),
            json_list("instruments"),
            data.get("duration_sec"),
            data.get("bpm"),
            text("key"),
            text("scale"),
            text("prompt"),
            text("audio_url"),
            text("file_path"),
            data.get("task_id"),
            json_list("tags"),
            text("provider", "local"),
            text("lyrics"),
            text("image_url"),
            text("suno_id"),
            text("file_hash"),
        )
        conn.execute(insert_sql, values)
        # Read the row back on the same connection that performed the insert.
        inserted = conn.execute(
            "SELECT * FROM music_library WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _track_row_to_dict(inserted)
|
|
|
|
|
|
def delete_track(track_id: int) -> bool:
    """Delete a library track; return ``False`` when the id does not exist."""
    with _conn() as conn:
        # Look the row up first; within this function the result only serves
        # as an existence check before deleting.
        existing = conn.execute(
            "SELECT file_path FROM music_library WHERE id = ?", (track_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM music_library WHERE id = ?", (track_id,))
            return True
        return False
|
|
|
|
|
|
def get_track_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the first library track linked to ``task_id``, or ``None``."""
    lookup_sql = "SELECT * FROM music_library WHERE task_id = ?"
    with _conn() as conn:
        match = conn.execute(lookup_sql, (task_id,)).fetchone()
        if not match:
            return None
        return _track_row_to_dict(match)
|
|
|
|
|
|
def update_track_duration(track_id: int, duration_sec: int) -> None:
    """Store ``duration_sec`` for a track, but only if none is recorded yet.

    The ``duration_sec IS NULL`` condition leaves an existing value untouched.
    """
    fill_sql = "UPDATE music_library SET duration_sec = ? WHERE id = ? AND duration_sec IS NULL"
    with _conn() as conn:
        conn.execute(fill_sql, (duration_sec, track_id))
|
|
|
|
|
|
def update_track_file_info(track_id: int, title: str, audio_url: str, file_path: str) -> None:
    """Update only the file-related fields of a track (e.g. after a rename).

    Only ``title``, ``audio_url`` and ``file_path`` are written; tags and the
    other metadata columns are preserved.
    """
    rename_sql = "UPDATE music_library SET title=?, audio_url=?, file_path=? WHERE id=?"
    bindings = (title, audio_url, file_path, track_id)
    with _conn() as conn:
        conn.execute(rename_sql, bindings)
|
|
|
|
|
|
def update_track_hash(track_id: int, file_hash: str) -> None:
    """Set the ``file_hash`` column of the given track."""
    hash_sql = "UPDATE music_library SET file_hash=? WHERE id=?"
    with _conn() as conn:
        conn.execute(hash_sql, (file_hash, track_id))
|
|
|
|
|
|
def get_track_file_path(track_id: int) -> Optional[str]:
    """Return the stored ``file_path`` of a track, or ``None`` if the id is unknown."""
    path_sql = "SELECT file_path FROM music_library WHERE id = ?"
    with _conn() as conn:
        hit = conn.execute(path_sql, (track_id,)).fetchone()
        if not hit:
            return None
        return hit["file_path"]
|
|
|
|
|
|
# ── saved_lyrics CRUD ────────────────────────────────────────────────────────
|
|
|
|
def _lyrics_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"text": r["text"],
|
|
"prompt": r["prompt"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_lyrics() -> List[Dict[str, Any]]:
    """Return every saved lyrics entry as a dict, newest ``created_at`` first."""
    listing_sql = "SELECT * FROM saved_lyrics ORDER BY created_at DESC"
    with _conn() as conn:
        entries = []
        for record in conn.execute(listing_sql).fetchall():
            entries.append(_lyrics_row_to_dict(record))
        return entries
|
|
|
|
|
|
def add_lyrics(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a lyrics entry and return the stored row as a dict.

    ``title``, ``text`` and ``prompt`` default to the empty string when they
    are absent from ``data``.
    """
    insert_sql = "INSERT INTO saved_lyrics (title, text, prompt) VALUES (?, ?, ?)"
    with _conn() as conn:
        bindings = tuple(data.get(field, "") for field in ("title", "text", "prompt"))
        conn.execute(insert_sql, bindings)
        # Read the row back on the same connection that performed the insert.
        inserted = conn.execute(
            "SELECT * FROM saved_lyrics WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _lyrics_row_to_dict(inserted)
|
|
|
|
|
|
def update_lyrics(lyrics_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update the given fields of a lyrics entry and return the refreshed row.

    Only ``title``, ``text`` and ``prompt`` keys present in ``data`` are
    written; ``updated_at`` is refreshed alongside them. Returns ``None`` when
    ``data`` contains none of those keys or when no row has ``lyrics_id``.
    """
    with _conn() as conn:
        # Column names come from this fixed whitelist, never from the caller.
        changed = [k for k in ("title", "text", "prompt") if k in data]
        if not changed:
            return None
        assignments = [f"{k} = ?" for k in changed]
        assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
        bindings = [data[k] for k in changed]
        bindings.append(lyrics_id)
        conn.execute(f"UPDATE saved_lyrics SET {', '.join(assignments)} WHERE id = ?", bindings)
        refreshed = conn.execute("SELECT * FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone()
        if not refreshed:
            return None
        return _lyrics_row_to_dict(refreshed)
|
|
|
|
|
|
def update_track_cover_images(track_id: int, images: list) -> None:
    """Store ``images`` as JSON text in the track's ``cover_images`` column."""
    cover_sql = "UPDATE music_library SET cover_images=? WHERE id=?"
    with _conn() as conn:
        encoded = json.dumps(images)
        conn.execute(cover_sql, (encoded, track_id))
|
|
|
|
|
|
def update_track_wav_url(track_id: int, wav_url: str) -> None:
    """Set the ``wav_url`` column of the given track."""
    wav_sql = "UPDATE music_library SET wav_url=? WHERE id=?"
    with _conn() as conn:
        conn.execute(wav_sql, (wav_url, track_id))
|
|
|
|
|
|
def update_track_video_url(track_id: int, video_url: str) -> None:
    """Set the ``video_url`` column of the given track."""
    video_sql = "UPDATE music_library SET video_url=? WHERE id=?"
    with _conn() as conn:
        conn.execute(video_sql, (video_url, track_id))
|
|
|
|
|
|
def update_track_stem_urls(track_id: int, stems: dict) -> None:
    """Store ``stems`` as JSON text in the track's ``stem_urls`` column."""
    stems_sql = "UPDATE music_library SET stem_urls=? WHERE id=?"
    with _conn() as conn:
        encoded = json.dumps(stems)
        conn.execute(stems_sql, (encoded, track_id))
|
|
|
|
|
|
def delete_lyrics(lyrics_id: int) -> bool:
    """Delete a saved lyrics entry; return ``False`` when the id does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM saved_lyrics WHERE id = ?", (lyrics_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,))
            return True
        return False
|
|
|
|
|
|
# ── video_projects CRUD ───────────────────────────────────────────────────────
|
|
|
|
def _vp_row_to_dict(r) -> dict:
|
|
return {
|
|
"id": r["id"],
|
|
"track_id": r["track_id"],
|
|
"format": r["format"],
|
|
"status": r["status"],
|
|
"output_path": r["output_path"],
|
|
"output_url": r["output_url"],
|
|
"thumbnail_path": r["thumbnail_path"],
|
|
"target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [],
|
|
"yt_title": r["yt_title"],
|
|
"yt_description": r["yt_description"],
|
|
"yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [],
|
|
"render_params": json.loads(r["render_params"]) if r["render_params"] else {},
|
|
"error": r["error"],
|
|
"created_at": r["created_at"],
|
|
"completed_at": r["completed_at"],
|
|
}
|
|
|
|
|
|
def create_video_project(data: dict) -> dict:
    """Insert a video project and return the stored row as a dict.

    Reads ``track_id``, ``format`` (default ``"visualizer"``),
    ``target_countries`` and ``render_params`` from ``data``; the latter two
    are stored as JSON text. Remaining columns take their table defaults.
    """
    insert_sql = """
        INSERT INTO video_projects (track_id, format, target_countries, render_params)
        VALUES (?, ?, ?, ?)
    """
    with _conn() as conn:
        track_id = data.get("track_id")
        video_format = data.get("format", "visualizer")
        countries_json = json.dumps(data.get("target_countries", []))
        render_json = json.dumps(data.get("render_params", {}))
        conn.execute(insert_sql, (track_id, video_format, countries_json, render_json))
        # Read the row back on the same connection that performed the insert.
        inserted = conn.execute(
            "SELECT * FROM video_projects WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _vp_row_to_dict(inserted)
|
|
|
|
|
|
def get_video_project(project_id: int) -> Optional[Dict[str, Any]]:
    """Return the video project with ``project_id``, or ``None`` if absent."""
    lookup_sql = "SELECT * FROM video_projects WHERE id = ?"
    with _conn() as conn:
        found = conn.execute(lookup_sql, (project_id,)).fetchone()
        if not found:
            return None
        return _vp_row_to_dict(found)
|
|
|
|
|
|
def get_all_video_projects() -> list:
    """Return every video project as a dict, newest ``created_at`` first."""
    listing_sql = "SELECT * FROM video_projects ORDER BY created_at DESC"
    with _conn() as conn:
        projects = []
        for record in conn.execute(listing_sql).fetchall():
            projects.append(_vp_row_to_dict(record))
        return projects
|
|
|
|
|
|
def update_video_project_status(
    project_id: int,
    status: str,
    output_path: str = "",
    output_url: str = "",
    thumbnail_path: str = "",
    yt_title: str = "",
    yt_description: str = "",
    yt_tags: Optional[list] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite the status and output fields of one video project.

    Every column named in the UPDATE is written on each call, so arguments
    left at their defaults store ``''`` / ``'[]'`` / NULL in the row.
    ``completed_at`` is set to the current time when ``status`` is ``"done"``
    or ``"failed"`` and reset to NULL for any other status. A ``project_id``
    that matches no row changes nothing.
    """
    # Chosen from two fixed SQL fragments; no caller-supplied text is
    # interpolated into the statement.
    if status in ("done", "failed"):
        completed_at_expr = "strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    else:
        completed_at_expr = "NULL"
    with _conn() as conn:
        conn.execute(
            f"""UPDATE video_projects
                SET status=?, output_path=?, output_url=?, thumbnail_path=?,
                    yt_title=?, yt_description=?, yt_tags=?, error=?,
                    completed_at={completed_at_expr}
                WHERE id=?""",
            (status, output_path, output_url, thumbnail_path,
             yt_title, yt_description, json.dumps(yt_tags or []), error, project_id),
        )
|
|
|
|
|
|
def delete_video_project(project_id: int) -> bool:
    """Delete a video project; return ``False`` when the id does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM video_projects WHERE id = ?", (project_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,))
            return True
        return False
|
|
|
|
|
|
def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]:
    """Return the library track with ``track_id``, or ``None`` if absent."""
    lookup_sql = "SELECT * FROM music_library WHERE id = ?"
    with _conn() as conn:
        found = conn.execute(lookup_sql, (track_id,)).fetchone()
        if not found:
            return None
        return _track_row_to_dict(found)
|
|
|
|
|
|
# ── revenue_records CRUD ──────────────────────────────────────────────────────
|
|
|
|
def _rr_row_to_dict(r) -> dict:
|
|
return {
|
|
"id": r["id"],
|
|
"video_project_id": r["video_project_id"],
|
|
"yt_video_id": r["yt_video_id"],
|
|
"record_month": r["record_month"],
|
|
"views": r["views"],
|
|
"watch_hours": r["watch_hours"],
|
|
"revenue_usd": r["revenue_usd"],
|
|
"rpm_usd": r["rpm_usd"],
|
|
"country": r["country"],
|
|
"source": r["source"],
|
|
"created_at": r["created_at"],
|
|
}
|
|
|
|
|
|
def create_revenue_record(data: dict) -> dict:
    """Insert a revenue record and return the stored row as a dict.

    ``rpm_usd`` is derived as revenue per 1000 views (rounded to 4 decimals)
    and is ``0.0`` when ``views`` is not positive. Other missing keys fall
    back to the defaults shown below.
    """
    views = data.get("views", 0)
    revenue = data.get("revenue_usd", 0.0)
    if views > 0:
        rpm = round(revenue / views * 1000, 4)
    else:
        rpm = 0.0
    insert_sql = """
        INSERT INTO revenue_records
            (video_project_id, yt_video_id, record_month, views, watch_hours,
             revenue_usd, rpm_usd, country, source)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    with _conn() as conn:
        bindings = (
            data.get("video_project_id"),
            data.get("yt_video_id", ""),
            data.get("record_month", ""),
            views,
            data.get("watch_hours", 0.0),
            revenue,
            rpm,
            data.get("country", ""),
            data.get("source", "manual"),
        )
        conn.execute(insert_sql, bindings)
        # Read the row back on the same connection that performed the insert.
        inserted = conn.execute(
            "SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _rr_row_to_dict(inserted)
|
|
|
|
|
|
def get_all_revenue_records(
    yt_video_id: Optional[str] = None, year_month: Optional[str] = None
) -> list:
    """Return revenue records as dicts, newest ``record_month`` first.

    Args:
        yt_video_id: When truthy, keep only rows with this ``yt_video_id``.
        year_month: When truthy, keep only rows whose ``record_month`` equals
            this value.

    A falsy filter (``None`` or ``""``) is ignored.
    """
    conditions = ["1=1"]
    params: list = []
    if yt_video_id:
        conditions.append("yt_video_id=?")
        params.append(yt_video_id)
    if year_month:
        conditions.append("record_month=?")
        params.append(year_month)
    query = (
        "SELECT * FROM revenue_records WHERE "
        + " AND ".join(conditions)
        + " ORDER BY record_month DESC"
    )
    with _conn() as conn:
        rows = conn.execute(query, params).fetchall()
        return [_rr_row_to_dict(r) for r in rows]
|
|
|
|
|
|
def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]:
    """Merge ``data`` into an existing revenue record and return the result.

    Keys absent from ``data`` keep their stored value. ``rpm_usd`` is always
    recomputed from the resulting views and revenue (``0.0`` when views is not
    positive). Returns ``None`` when no row has ``record_id``.
    """
    select_sql = "SELECT * FROM revenue_records WHERE id = ?"
    update_sql = """
        UPDATE revenue_records
        SET yt_video_id=?, record_month=?, views=?, watch_hours=?,
            revenue_usd=?, rpm_usd=?, country=?, source=?
        WHERE id=?
    """
    with _conn() as conn:
        existing = conn.execute(select_sql, (record_id,)).fetchone()
        if not existing:
            return None
        current = _rr_row_to_dict(existing)

        def merged(field: str) -> Any:
            # Caller-supplied value wins; otherwise keep what is stored.
            return data.get(field, current[field])

        views = merged("views")
        revenue = merged("revenue_usd")
        rpm = 0.0
        if views > 0:
            rpm = round(revenue / views * 1000, 4)
        bindings = (
            merged("yt_video_id"),
            merged("record_month"),
            views,
            merged("watch_hours"),
            revenue,
            rpm,
            merged("country"),
            merged("source"),
            record_id,
        )
        conn.execute(update_sql, bindings)
        refreshed = conn.execute(select_sql, (record_id,)).fetchone()
        return _rr_row_to_dict(refreshed)
|
|
|
|
|
|
def delete_revenue_record(record_id: int) -> bool:
    """Delete a revenue record; return ``False`` when the id does not exist."""
    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM revenue_records WHERE id = ?", (record_id,)
        ).fetchone()
        if existing:
            conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,))
            return True
        return False
|
|
|
|
|
|
def get_revenue_dashboard() -> dict:
    """Aggregate ``revenue_records`` into dashboard figures.

    Returns overall totals, per-month totals with average RPM (the 12 most
    recent ``record_month`` values) and the 10 countries with the highest
    revenue (rows with an empty country are skipped). Totals are zero when the
    table is empty.
    """
    totals_sql = (
        "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records"
    )
    monthly_sql = """
        SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views,
               CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm
        FROM revenue_records
        GROUP BY record_month ORDER BY record_month DESC LIMIT 12
    """
    country_sql = """
        SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views
        FROM revenue_records WHERE country != ''
        GROUP BY country ORDER BY revenue DESC LIMIT 10
    """
    with _conn() as conn:
        totals = conn.execute(totals_sql).fetchone()
        monthly_rows = conn.execute(monthly_sql).fetchall()
        country_rows = conn.execute(country_sql).fetchall()
        dashboard = {
            # SUM() over zero rows is NULL, hence the `or` fallbacks.
            "total_revenue_usd": totals["total"] or 0.0,
            "total_views": totals["views"] or 0,
            "total_watch_hours": totals["hours"] or 0.0,
        }
        dashboard["by_month"] = list(map(dict, monthly_rows))
        dashboard["by_country"] = list(map(dict, country_rows))
        return dashboard
|
|
|
|
|
|
# ── market_trends CRUD ────────────────────────────────────────────────────────
|
|
|
|
def insert_market_trends(trends: list) -> None:
    """Bulk-insert trend dicts into ``market_trends``.

    Each item supplies ``source``, ``country``, ``genre``, ``keyword``,
    ``score``, ``rank`` and ``metadata``; missing keys fall back to the
    defaults below and ``metadata`` is stored as JSON text.
    """
    insert_sql = """
        INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """

    def as_bindings(trend) -> tuple:
        return (
            trend.get("source", ""),
            trend.get("country", ""),
            trend.get("genre", ""),
            trend.get("keyword", ""),
            trend.get("score", 0.0),
            trend.get("rank"),
            json.dumps(trend.get("metadata", {})),
        )

    with _conn() as conn:
        batch = [as_bindings(trend) for trend in trends]
        conn.executemany(insert_sql, batch)
|
|
|
|
|
|
def get_market_trends(
    country: Optional[str] = None,
    genre: Optional[str] = None,
    source: Optional[str] = None,
    days: int = 7,
) -> list:
    """Return trends collected within the last ``days`` days, newest first.

    Args:
        country: When truthy, keep only rows with this country.
        genre: When truthy, keep only rows with this genre.
        source: When truthy, keep only rows with this source.
        days: Size of the look-back window in days.

    Returns:
        At most 500 dicts with ``metadata`` decoded from JSON.
    """
    # `collected_at` is stored as 'YYYY-MM-DDTHH:MM:SS.sssZ', so the cutoff is
    # rendered in the same format. Comparing against datetime()'s
    # 'YYYY-MM-DD HH:MM:SS' output is a text comparison in which 'T' > ' ',
    # which let every row from the cutoff day through regardless of its time.
    q = (
        "SELECT * FROM market_trends "
        "WHERE collected_at >= strftime('%Y-%m-%dT%H:%M:%fZ', 'now', ?)"
    )
    params: list = [f"-{days} days"]
    for column, value in (("country", country), ("genre", genre), ("source", source)):
        if value:
            q += f" AND {column}=?"
            params.append(value)
    q += " ORDER BY collected_at DESC LIMIT 500"
    with _conn() as conn:
        rows = conn.execute(q, params).fetchall()
        return [
            {
                "id": r["id"],
                "source": r["source"],
                "country": r["country"],
                "genre": r["genre"],
                "keyword": r["keyword"],
                "score": r["score"],
                "rank": r["rank"],
                "metadata": json.loads(r["metadata"]),
                "collected_at": r["collected_at"],
            }
            for r in rows
        ]
|
|
|
|
|
|
# ── trend_reports CRUD ────────────────────────────────────────────────────────
|
|
|
|
def upsert_trend_report(data: dict) -> None:
    """Insert the report for ``data["report_date"]`` or replace its contents.

    ``top_genres``, ``top_keywords`` and ``recommended_styles`` are stored as
    JSON text. When a report for the same date exists, those fields and
    ``insights`` are overwritten. All five keys are required in ``data``.
    """
    upsert_sql = """
        INSERT INTO trend_reports
            (report_date, top_genres, top_keywords, recommended_styles, insights)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(report_date) DO UPDATE SET
            top_genres=excluded.top_genres,
            top_keywords=excluded.top_keywords,
            recommended_styles=excluded.recommended_styles,
            insights=excluded.insights
    """
    with _conn() as conn:
        bindings = (
            data["report_date"],
            json.dumps(data["top_genres"]),
            json.dumps(data["top_keywords"]),
            json.dumps(data["recommended_styles"]),
            data["insights"],
        )
        conn.execute(upsert_sql, bindings)
|
|
|
|
|
|
def get_latest_trend_report() -> Optional[Dict[str, Any]]:
    """Return the report with the greatest ``report_date``, or ``None``.

    The JSON columns are decoded to Python objects. An empty stored value is
    read as an empty list, matching how ``get_trend_reports`` treats the same
    columns, instead of failing inside ``json.loads``.
    """
    with _conn() as conn:
        row = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1"
        ).fetchone()
        if not row:
            return None
        return {
            "id": row["id"],
            "report_date": row["report_date"],
            "top_genres": json.loads(row["top_genres"] or "[]"),
            "top_keywords": json.loads(row["top_keywords"] or "[]"),
            "recommended_styles": json.loads(row["recommended_styles"] or "[]"),
            "insights": row["insights"],
            "created_at": row["created_at"],
        }
|
|
|
|
|
|
def get_trend_reports(limit: int = 10) -> list:
    """Return summaries of the most recent trend reports, newest date first.

    Each summary carries the decoded ``top_genres`` and ``recommended_styles``
    lists and only the first 100 characters of ``insights``; ``top_keywords``
    is not included.
    """
    def summarize(record) -> dict:
        summary = {"id": record["id"], "report_date": record["report_date"]}
        for column in ("top_genres", "recommended_styles"):
            summary[column] = json.loads(record[column] or "[]")
        preview = record["insights"] or ""
        summary["insights"] = preview[:100]
        summary["created_at"] = record["created_at"]
        return summary

    with _conn() as conn:
        recent = conn.execute(
            "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT ?", (limit,)
        ).fetchall()
        return list(map(summarize, recent))
|