import os import sqlite3 import json from typing import Any, Dict, List, Optional DB_PATH = "/app/data/music.db" def _conn() -> sqlite3.Connection: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db() -> None: with _conn() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS music_tasks ( id TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'queued', progress INTEGER NOT NULL DEFAULT 0, message TEXT NOT NULL DEFAULT '', audio_url TEXT, error TEXT, params TEXT NOT NULL DEFAULT '{}', provider TEXT NOT NULL DEFAULT 'local', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created ON music_tasks(created_at DESC)") conn.execute(""" CREATE TABLE IF NOT EXISTS music_library ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL DEFAULT '', genre TEXT NOT NULL DEFAULT '', moods TEXT NOT NULL DEFAULT '[]', instruments TEXT NOT NULL DEFAULT '[]', duration_sec INTEGER, bpm INTEGER, key TEXT NOT NULL DEFAULT '', scale TEXT NOT NULL DEFAULT '', prompt TEXT NOT NULL DEFAULT '', audio_url TEXT NOT NULL DEFAULT '', file_path TEXT NOT NULL DEFAULT '', task_id TEXT, tags TEXT NOT NULL DEFAULT '[]', provider TEXT NOT NULL DEFAULT 'local', lyrics TEXT NOT NULL DEFAULT '', image_url TEXT NOT NULL DEFAULT '', suno_id TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_library_created ON music_library(created_at DESC)") conn.execute(""" CREATE TABLE IF NOT EXISTS saved_lyrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL DEFAULT '', text TEXT NOT NULL DEFAULT '', prompt TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_lyrics_created ON saved_lyrics(created_at DESC)") # 기존 테이블 마이그레이션 (컬럼 없으면 추가) for col, default in [ ("provider", "'local'"), ("lyrics", "''"), ("image_url", "''"), ("suno_id", "''"), ("file_hash", "''"), ]: try: conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}") except sqlite3.OperationalError: pass # 이미 존재 try: conn.execute("ALTER TABLE music_tasks ADD COLUMN provider TEXT NOT NULL DEFAULT 'local'") except sqlite3.OperationalError: pass # Phase 1~3 신규 컬럼 마이그레이션 for col, default in [ ("cover_images", "'[]'"), ("wav_url", "''"), ("video_url", "''"), ("stem_urls", "'{}'"), ]: try: conn.execute(f"ALTER TABLE music_library ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}") except sqlite3.OperationalError: pass # ── video_projects 테이블 ───────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS video_projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, track_id INTEGER, format TEXT NOT NULL DEFAULT 'visualizer', status TEXT NOT NULL DEFAULT 'pending', output_path TEXT NOT NULL DEFAULT '', output_url TEXT NOT NULL DEFAULT '', thumbnail_path TEXT NOT NULL DEFAULT '', target_countries TEXT NOT NULL DEFAULT '[]', yt_title TEXT NOT NULL DEFAULT '', yt_description TEXT NOT NULL DEFAULT '', yt_tags TEXT NOT NULL DEFAULT '[]', render_params TEXT NOT NULL DEFAULT '{}', error TEXT, created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), completed_at TEXT ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)") # ── revenue_records 테이블 ──────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS revenue_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_project_id INTEGER, yt_video_id TEXT NOT NULL DEFAULT '', record_month TEXT NOT NULL DEFAULT '', views INTEGER NOT NULL DEFAULT 0, watch_hours REAL NOT NULL DEFAULT 0.0, revenue_usd REAL NOT NULL DEFAULT 0.0, rpm_usd REAL NOT NULL DEFAULT 0.0, country TEXT NOT NULL DEFAULT '', source TEXT NOT NULL DEFAULT 'manual', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), UNIQUE(yt_video_id, record_month, country) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)") # ── market_trends 테이블 ────────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS market_trends ( id INTEGER PRIMARY KEY AUTOINCREMENT, source TEXT NOT NULL DEFAULT '', country TEXT NOT NULL DEFAULT '', genre TEXT NOT NULL DEFAULT '', keyword TEXT NOT NULL DEFAULT '', score REAL NOT NULL DEFAULT 0.0, rank INTEGER, metadata TEXT NOT NULL DEFAULT '{}', collected_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute( "CREATE INDEX IF NOT EXISTS idx_mt_country_source " "ON market_trends(country, source, collected_at DESC)" ) # ── trend_reports 테이블 ────────────────────────────────────────── conn.execute(""" CREATE TABLE IF NOT EXISTS trend_reports ( id INTEGER PRIMARY KEY AUTOINCREMENT, report_date TEXT UNIQUE NOT NULL DEFAULT '', top_genres TEXT NOT NULL DEFAULT '[]', top_keywords TEXT NOT NULL DEFAULT '[]', recommended_styles TEXT NOT NULL DEFAULT '[]', insights TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) # ── music_tasks CRUD ────────────────────────────────────────────────────────── def _task_row_to_dict(r) -> Dict[str, Any]: return { "task_id": r["id"], "status": r["status"], "progress": r["progress"], "message": r["message"], "audio_url": r["audio_url"], "error": r["error"], "params": json.loads(r["params"]), "provider": r["provider"] if "provider" in r.keys() else "local", "created_at": r["created_at"], "updated_at": r["updated_at"], } def create_task(task_id: str, params: Dict[str, Any], provider: str = "local") -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO music_tasks (id, params, provider) VALUES (?, ?, ?)", (task_id, json.dumps(params), provider), ) row = conn.execute("SELECT * FROM music_tasks WHERE id = ?", (task_id,)).fetchone() return _task_row_to_dict(row) def update_task( task_id: str, status: str, progress: int, message: str, audio_url: str = None, error: str = None, ) -> None: with _conn() as conn: conn.execute( """ UPDATE music_tasks SET status = ?, progress = ?, message = ?, audio_url = ?, error = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ? """, (status, progress, message, audio_url, error, task_id), ) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_tasks WHERE id = ?", (task_id,)).fetchone() return _task_row_to_dict(row) if row else None # ── music_library CRUD ──────────────────────────────────────────────────────── def _track_row_to_dict(r) -> Dict[str, Any]: keys = r.keys() return { "id": r["id"], "title": r["title"], "genre": r["genre"], "moods": json.loads(r["moods"]) if r["moods"] else [], "instruments": json.loads(r["instruments"]) if r["instruments"] else [], "duration_sec": r["duration_sec"], "bpm": r["bpm"], "key": r["key"], "scale": r["scale"], "prompt": r["prompt"], "audio_url": r["audio_url"], "file_path": r["file_path"], "task_id": r["task_id"], "tags": json.loads(r["tags"]) if r["tags"] else [], "provider": r["provider"] if "provider" in keys else "local", "lyrics": r["lyrics"] if "lyrics" in keys else "", "image_url": r["image_url"] if "image_url" in keys else "", "suno_id": r["suno_id"] if "suno_id" in keys else "", "file_hash": r["file_hash"] if "file_hash" in keys else "", "cover_images": json.loads(r["cover_images"]) if "cover_images" in keys and r["cover_images"] else [], "wav_url": r["wav_url"] if "wav_url" in keys else "", "video_url": r["video_url"] if "video_url" in keys else "", "stem_urls": json.loads(r["stem_urls"]) if "stem_urls" in keys and r["stem_urls"] else {}, "created_at": r["created_at"], } def get_all_tracks() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM music_library ORDER BY created_at DESC").fetchall() return [_track_row_to_dict(r) for r in rows] def add_track(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( """ INSERT INTO music_library (title, genre, moods, instruments, duration_sec, bpm, key, scale, prompt, audio_url, file_path, task_id, tags, provider, lyrics, image_url, suno_id, file_hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data.get("title", ""), data.get("genre", ""), json.dumps(data.get("moods", [])), json.dumps(data.get("instruments", [])), data.get("duration_sec"), data.get("bpm"), data.get("key", ""), data.get("scale", ""), data.get("prompt", ""), data.get("audio_url", ""), data.get("file_path", ""), data.get("task_id"), json.dumps(data.get("tags", [])), data.get("provider", "local"), data.get("lyrics", ""), data.get("image_url", ""), data.get("suno_id", ""), data.get("file_hash", ""), ), ) row = conn.execute("SELECT * FROM music_library WHERE rowid = last_insert_rowid()").fetchone() return _track_row_to_dict(row) def delete_track(track_id: int) -> bool: with _conn() as conn: # 파일 경로 먼저 조회 (삭제 후 파일도 지울 수 있도록) row = conn.execute("SELECT file_path FROM music_library WHERE id = ?", (track_id,)).fetchone() if not row: return False conn.execute("DELETE FROM music_library WHERE id = ?", (track_id,)) return True def get_track_by_task_id(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_library WHERE task_id = ?", (task_id,)).fetchone() return _track_row_to_dict(row) if row else None def update_track_duration(track_id: int, duration_sec: int) -> None: with _conn() as conn: conn.execute( "UPDATE music_library SET duration_sec = ? WHERE id = ? AND duration_sec IS NULL", (duration_sec, track_id), ) def update_track_file_info(track_id: int, title: str, audio_url: str, file_path: str) -> None: """파일 rename 시 파일 관련 정보만 업데이트 (태그 등 메타데이터 보존).""" with _conn() as conn: conn.execute( "UPDATE music_library SET title=?, audio_url=?, file_path=? WHERE id=?", (title, audio_url, file_path, track_id), ) def update_track_hash(track_id: int, file_hash: str) -> None: """트랙의 file_hash를 업데이트.""" with _conn() as conn: conn.execute( "UPDATE music_library SET file_hash=? WHERE id=?", (file_hash, track_id), ) def get_track_file_path(track_id: int) -> Optional[str]: with _conn() as conn: row = conn.execute("SELECT file_path FROM music_library WHERE id = ?", (track_id,)).fetchone() return row["file_path"] if row else None # ── saved_lyrics CRUD ──────────────────────────────────────────────────────── def _lyrics_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "title": r["title"], "text": r["text"], "prompt": r["prompt"], "created_at": r["created_at"], "updated_at": r["updated_at"], } def get_all_lyrics() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM saved_lyrics ORDER BY created_at DESC").fetchall() return [_lyrics_row_to_dict(r) for r in rows] def add_lyrics(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO saved_lyrics (title, text, prompt) VALUES (?, ?, ?)", (data.get("title", ""), data.get("text", ""), data.get("prompt", "")), ) row = conn.execute("SELECT * FROM saved_lyrics WHERE rowid = last_insert_rowid()").fetchone() return _lyrics_row_to_dict(row) def update_lyrics(lyrics_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: with _conn() as conn: fields = [] values = [] for k in ("title", "text", "prompt"): if k in data: fields.append(f"{k} = ?") values.append(data[k]) if not fields: return None fields.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')") values.append(lyrics_id) conn.execute(f"UPDATE saved_lyrics SET {', '.join(fields)} WHERE id = ?", values) row = conn.execute("SELECT * FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone() return _lyrics_row_to_dict(row) if row else None def update_track_cover_images(track_id: int, images: list) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET cover_images=? WHERE id=?", (json.dumps(images), track_id)) def update_track_wav_url(track_id: int, wav_url: str) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET wav_url=? WHERE id=?", (wav_url, track_id)) def update_track_video_url(track_id: int, video_url: str) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET video_url=? WHERE id=?", (video_url, track_id)) def update_track_stem_urls(track_id: int, stems: dict) -> None: with _conn() as conn: conn.execute("UPDATE music_library SET stem_urls=? WHERE id=?", (json.dumps(stems), track_id)) def delete_lyrics(lyrics_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM saved_lyrics WHERE id = ?", (lyrics_id,)).fetchone() if not row: return False conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,)) return True # ── video_projects CRUD ─────────────────────────────────────────────────────── def _vp_row_to_dict(r) -> dict: return { "id": r["id"], "track_id": r["track_id"], "format": r["format"], "status": r["status"], "output_path": r["output_path"], "output_url": r["output_url"], "thumbnail_path": r["thumbnail_path"], "target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [], "yt_title": r["yt_title"], "yt_description": r["yt_description"], "yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [], "render_params": json.loads(r["render_params"]) if r["render_params"] else {}, "error": r["error"], "created_at": r["created_at"], "completed_at": r["completed_at"], } def create_video_project(data: dict) -> dict: with _conn() as conn: conn.execute( """INSERT INTO video_projects (track_id, format, target_countries, render_params) VALUES (?, ?, ?, ?)""", (data.get("track_id"), data.get("format", "visualizer"), json.dumps(data.get("target_countries", [])), json.dumps(data.get("render_params", {}))), ) row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone() return _vp_row_to_dict(row) def get_video_project(project_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone() return _vp_row_to_dict(row) if row else None def get_all_video_projects() -> list: with _conn() as conn: rows = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall() return [_vp_row_to_dict(r) for r in rows] def update_video_project_status( project_id: int, status: str, output_path: str = "", output_url: str = "", thumbnail_path: str = "", yt_title: str = "", yt_description: str = "", yt_tags: list = None, error: str = None, ) -> None: completed_at_expr = ( "strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL" ) with _conn() as conn: conn.execute( f"""UPDATE video_projects SET status=?, output_path=?, output_url=?, thumbnail_path=?, yt_title=?, yt_description=?, yt_tags=?, error=?, completed_at={completed_at_expr} WHERE id=?""", (status, output_path, output_url, thumbnail_path, yt_title, yt_description, json.dumps(yt_tags or []), error, project_id), ) def delete_video_project(project_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone() if not row: return False conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,)) return True def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone() return _track_row_to_dict(row) if row else None # ── revenue_records CRUD ────────────────────────────────────────────────────── def _rr_row_to_dict(r) -> dict: return { "id": r["id"], "video_project_id": r["video_project_id"], "yt_video_id": r["yt_video_id"], "record_month": r["record_month"], "views": r["views"], "watch_hours": r["watch_hours"], "revenue_usd": r["revenue_usd"], "rpm_usd": r["rpm_usd"], "country": r["country"], "source": r["source"], "created_at": r["created_at"], } def create_revenue_record(data: dict) -> dict: views = data.get("views", 0) revenue = data.get("revenue_usd", 0.0) rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 with _conn() as conn: conn.execute( """INSERT INTO revenue_records (video_project_id, yt_video_id, record_month, views, watch_hours, revenue_usd, rpm_usd, country, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", (data.get("video_project_id"), data.get("yt_video_id", ""), data.get("record_month", ""), views, data.get("watch_hours", 0.0), revenue, rpm, data.get("country", ""), data.get("source", "manual")), ) row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone() return _rr_row_to_dict(row) def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list: with _conn() as conn: q = "SELECT * FROM revenue_records WHERE 1=1" params: list = [] if yt_video_id: q += " AND yt_video_id=?" params.append(yt_video_id) if year_month: q += " AND record_month=?" params.append(year_month) q += " ORDER BY record_month DESC" rows = conn.execute(q, params).fetchall() return [_rr_row_to_dict(r) for r in rows] def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() if not row: return None cur = _rr_row_to_dict(row) views = data.get("views", cur["views"]) revenue = data.get("revenue_usd", cur["revenue_usd"]) rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 conn.execute( """UPDATE revenue_records SET yt_video_id=?, record_month=?, views=?, watch_hours=?, revenue_usd=?, rpm_usd=?, country=?, source=? WHERE id=?""", (data.get("yt_video_id", cur["yt_video_id"]), data.get("record_month", cur["record_month"]), views, data.get("watch_hours", cur["watch_hours"]), revenue, rpm, data.get("country", cur["country"]), data.get("source", cur["source"]), record_id), ) row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() return _rr_row_to_dict(row) def delete_revenue_record(record_id: int) -> bool: with _conn() as conn: row = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone() if not row: return False conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,)) return True def get_revenue_dashboard() -> dict: with _conn() as conn: total = conn.execute( "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records" ).fetchone() by_month = conn.execute( """SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views, CASE WHEN SUM(views) > 0 THEN ROUND(SUM(revenue_usd) / SUM(views) * 1000, 4) ELSE 0.0 END as avg_rpm FROM revenue_records GROUP BY record_month ORDER BY record_month DESC LIMIT 12""" ).fetchall() by_country = conn.execute( """SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views FROM revenue_records WHERE country != '' GROUP BY country ORDER BY revenue DESC LIMIT 10""" ).fetchall() return { "total_revenue_usd": total["total"] or 0.0, "total_views": total["views"] or 0, "total_watch_hours": total["hours"] or 0.0, "by_month": [dict(r) for r in by_month], "by_country": [dict(r) for r in by_country], } # ── market_trends CRUD ──────────────────────────────────────────────────────── def insert_market_trends(trends: list) -> None: with _conn() as conn: conn.executemany( """INSERT INTO market_trends (source, country, genre, keyword, score, rank, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)""", [(t.get("source",""), t.get("country",""), t.get("genre",""), t.get("keyword",""), t.get("score", 0.0), t.get("rank"), json.dumps(t.get("metadata", {}))) for t in trends], ) def get_market_trends( country: str = None, genre: str = None, source: str = None, days: int = 7 ) -> list: with _conn() as conn: q = "SELECT * FROM market_trends WHERE collected_at >= datetime('now', ?)" params: list = [f"-{days} days"] if country: q += " AND country=?"; params.append(country) if genre: q += " AND genre=?"; params.append(genre) if source: q += " AND source=?"; params.append(source) q += " ORDER BY collected_at DESC LIMIT 500" rows = conn.execute(q, params).fetchall() return [ {"id": r["id"], "source": r["source"], "country": r["country"], "genre": r["genre"], "keyword": r["keyword"], "score": r["score"], "rank": r["rank"], "metadata": json.loads(r["metadata"]), "collected_at": r["collected_at"]} for r in rows ] # ── trend_reports CRUD ──────────────────────────────────────────────────────── def upsert_trend_report(data: dict) -> None: with _conn() as conn: conn.execute( """INSERT INTO trend_reports (report_date, top_genres, top_keywords, recommended_styles, insights) VALUES (?, ?, ?, ?, ?) ON CONFLICT(report_date) DO UPDATE SET top_genres=excluded.top_genres, top_keywords=excluded.top_keywords, recommended_styles=excluded.recommended_styles, insights=excluded.insights""", (data["report_date"], json.dumps(data["top_genres"]), json.dumps(data["top_keywords"]), json.dumps(data["recommended_styles"]), data["insights"]), ) def get_latest_trend_report() -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM trend_reports ORDER BY report_date DESC LIMIT 1" ).fetchone() if not row: return None return { "id": row["id"], "report_date": row["report_date"], "top_genres": json.loads(row["top_genres"]), "top_keywords": json.loads(row["top_keywords"]), "recommended_styles": json.loads(row["recommended_styles"]), "insights": row["insights"], "created_at": row["created_at"], } def get_trend_reports(limit: int = 10) -> list: with _conn() as conn: rows = conn.execute( "SELECT id, report_date, insights, created_at FROM trend_reports " "ORDER BY report_date DESC LIMIT ?", (limit,) ).fetchall() return [{"id": r["id"], "report_date": r["report_date"], "insights": r["insights"][:100], "created_at": r["created_at"]} for r in rows]