diff --git a/music-lab/app/db.py b/music-lab/app/db.py index c8504db..559d7fe 100644 --- a/music-lab/app/db.py +++ b/music-lab/app/db.py @@ -95,6 +95,47 @@ def init_db() -> None: except sqlite3.OperationalError: pass + # ── video_projects 테이블 ───────────────────────────────────────── + conn.execute(""" + CREATE TABLE IF NOT EXISTS video_projects ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + track_id INTEGER, + format TEXT NOT NULL DEFAULT 'visualizer', + status TEXT NOT NULL DEFAULT 'pending', + output_path TEXT NOT NULL DEFAULT '', + output_url TEXT NOT NULL DEFAULT '', + thumbnail_path TEXT NOT NULL DEFAULT '', + target_countries TEXT NOT NULL DEFAULT '[]', + yt_title TEXT NOT NULL DEFAULT '', + yt_description TEXT NOT NULL DEFAULT '', + yt_tags TEXT NOT NULL DEFAULT '[]', + render_params TEXT NOT NULL DEFAULT '{}', + error TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), + completed_at TEXT + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)") + + # ── revenue_records 테이블 ──────────────────────────────────────── + conn.execute(""" + CREATE TABLE IF NOT EXISTS revenue_records ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + video_project_id INTEGER, + yt_video_id TEXT NOT NULL DEFAULT '', + record_month TEXT NOT NULL DEFAULT '', + views INTEGER NOT NULL DEFAULT 0, + watch_hours REAL NOT NULL DEFAULT 0.0, + revenue_usd REAL NOT NULL DEFAULT 0.0, + rpm_usd REAL NOT NULL DEFAULT 0.0, + country TEXT NOT NULL DEFAULT '', + source TEXT NOT NULL DEFAULT 'manual', + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)") + # ── music_tasks CRUD ────────────────────────────────────────────────────────── @@ -343,3 +384,201 @@ def delete_lyrics(lyrics_id: int) -> bool: return False conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,)) return True + + +# ── video_projects CRUD ─────────────────────────────────────────────────────── + +def _vp_row_to_dict(r) -> dict: + return { + "id": r["id"], + "track_id": r["track_id"], + "format": r["format"], + "status": r["status"], + "output_path": r["output_path"], + "output_url": r["output_url"], + "thumbnail_path": r["thumbnail_path"], + "target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [], + "yt_title": r["yt_title"], + "yt_description": r["yt_description"], + "yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [], + "render_params": json.loads(r["render_params"]) if r["render_params"] else {}, + "error": r["error"], + "created_at": r["created_at"], + "completed_at": r["completed_at"], + } + + +def create_video_project(data: dict) -> dict: + with _conn() as conn: + conn.execute( + """INSERT INTO video_projects (track_id, format, target_countries, render_params) + VALUES (?, ?, ?, ?)""", + (data.get("track_id"), data.get("format", "visualizer"), + json.dumps(data.get("target_countries", [])), + json.dumps(data.get("render_params", {}))), + ) + row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone() + return _vp_row_to_dict(row) + + +def get_video_project(project_id: int) -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone() + return _vp_row_to_dict(row) if row else None + + +def get_all_video_projects() -> list: + with _conn() as conn: + rows = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall() + return [_vp_row_to_dict(r) for r in rows] + + +def update_video_project_status( + project_id: int, + status: str, + output_path: str = "", + output_url: str = "", + thumbnail_path: str = "", + yt_title: str = "", + yt_description: str = "", + yt_tags: list = None, + error: str = None, +) -> None: + completed_at_expr = ( + "strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL" + ) + with _conn() as conn: + conn.execute( + f"""UPDATE video_projects + SET status=?, output_path=?, output_url=?, thumbnail_path=?, + yt_title=?, yt_description=?, yt_tags=?, error=?, + completed_at={completed_at_expr} + WHERE id=?""", + (status, output_path, output_url, thumbnail_path, + yt_title, yt_description, json.dumps(yt_tags or []), error, project_id), + ) + + +def delete_video_project(project_id: int) -> bool: + with _conn() as conn: + row = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone() + if not row: + return False + conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,)) + return True + + +def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone() + return _track_row_to_dict(row) if row else None + + +# ── revenue_records CRUD ────────────────────────────────────────────────────── + +def _rr_row_to_dict(r) -> dict: + return { + "id": r["id"], + "video_project_id": r["video_project_id"], + "yt_video_id": r["yt_video_id"], + "record_month": r["record_month"], + "views": r["views"], + "watch_hours": r["watch_hours"], + "revenue_usd": r["revenue_usd"], + "rpm_usd": r["rpm_usd"], + "country": r["country"], + "source": r["source"], + "created_at": r["created_at"], + } + + +def create_revenue_record(data: dict) -> dict: + views = data.get("views", 0) + revenue = data.get("revenue_usd", 0.0) + rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 + with _conn() as conn: + conn.execute( + """INSERT INTO revenue_records + (video_project_id, yt_video_id, record_month, views, watch_hours, + revenue_usd, rpm_usd, country, source) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (data.get("video_project_id"), data.get("yt_video_id", ""), + data.get("record_month", ""), views, data.get("watch_hours", 0.0), + revenue, rpm, data.get("country", ""), data.get("source", "manual")), + ) + row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone() + return _rr_row_to_dict(row) + + +def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list: + with _conn() as conn: + q = "SELECT * FROM revenue_records WHERE 1=1" + params: list = [] + if yt_video_id: + q += " AND yt_video_id=?" + params.append(yt_video_id) + if year_month: + q += " AND record_month=?" + params.append(year_month) + q += " ORDER BY record_month DESC" + rows = conn.execute(q, params).fetchall() + return [_rr_row_to_dict(r) for r in rows] + + +def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() + if not row: + return None + cur = _rr_row_to_dict(row) + views = data.get("views", cur["views"]) + revenue = data.get("revenue_usd", cur["revenue_usd"]) + rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0 + conn.execute( + """UPDATE revenue_records + SET yt_video_id=?, record_month=?, views=?, watch_hours=?, + revenue_usd=?, rpm_usd=?, country=?, source=? + WHERE id=?""", + (data.get("yt_video_id", cur["yt_video_id"]), + data.get("record_month", cur["record_month"]), + views, data.get("watch_hours", cur["watch_hours"]), + revenue, rpm, + data.get("country", cur["country"]), + data.get("source", cur["source"]), + record_id), + ) + row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone() + return _rr_row_to_dict(row) + + +def delete_revenue_record(record_id: int) -> bool: + with _conn() as conn: + row = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone() + if not row: + return False + conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,)) + return True + + +def get_revenue_dashboard() -> dict: + with _conn() as conn: + total = conn.execute( + "SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records" + ).fetchone() + by_month = conn.execute( + """SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views, + AVG(rpm_usd) as avg_rpm FROM revenue_records + GROUP BY record_month ORDER BY record_month DESC LIMIT 12""" + ).fetchall() + by_country = conn.execute( + """SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views + FROM revenue_records WHERE country != '' + GROUP BY country ORDER BY revenue DESC LIMIT 10""" + ).fetchall() + return { + "total_revenue_usd": total["total"] or 0.0, + "total_views": total["views"] or 0, + "total_watch_hours": total["hours"] or 0.0, + "by_month": [dict(r) for r in by_month], + "by_country": [dict(r) for r in by_country], + } diff --git a/music-lab/pytest.ini b/music-lab/pytest.ini new file mode 100644 index 0000000..4584de7 --- /dev/null +++ b/music-lab/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = tests +pythonpath = . diff --git a/music-lab/requirements.txt b/music-lab/requirements.txt index 80e5c9b..92fb320 100644 --- a/music-lab/requirements.txt +++ b/music-lab/requirements.txt @@ -3,3 +3,7 @@ uvicorn[standard]==0.30.6 requests==2.32.3 python-multipart==0.0.12 mutagen==1.47.0 +anthropic>=0.40.0 +Pillow>=11.0.0 +pytest>=8.0.0 +httpx>=0.27.0 diff --git a/music-lab/tests/conftest.py b/music-lab/tests/conftest.py new file mode 100644 index 0000000..a19432e --- /dev/null +++ b/music-lab/tests/conftest.py @@ -0,0 +1,7 @@ +import pytest + +@pytest.fixture +def tmp_db(tmp_path, monkeypatch): + db_path = str(tmp_path / "test_music.db") + monkeypatch.setattr("app.db.DB_PATH", db_path) + return db_path diff --git a/music-lab/tests/test_db_video.py b/music-lab/tests/test_db_video.py new file mode 100644 index 0000000..d32f75c --- /dev/null +++ b/music-lab/tests/test_db_video.py @@ -0,0 +1,72 @@ +def test_create_and_get_video_project(tmp_db): + from app.db import init_db, create_video_project, get_video_project + init_db() + proj = create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR", "ID"]}) + assert proj["id"] == 1 + assert proj["format"] == "visualizer" + assert proj["status"] == "pending" + assert "BR" in proj["target_countries"] + + fetched = get_video_project(1) + assert fetched["id"] == 1 + assert fetched["track_id"] == 1 + + +def test_update_video_project_status(tmp_db): + from app.db import init_db, create_video_project, update_video_project_status, get_video_project + init_db() + create_video_project({"track_id": 2, "format": "slideshow"}) + update_video_project_status( + 1, "done", + output_path="/data/videos/1/output.mp4", + output_url="/media/videos/1/output.mp4", + thumbnail_path="/data/videos/1/thumbnail.jpg", + yt_title="Chill Beats Brazil", + yt_description="relaxing lofi", + yt_tags=["lofi", "chill"], + ) + proj = get_video_project(1) + assert proj["status"] == "done" + assert proj["yt_title"] == "Chill Beats Brazil" + assert "lofi" in proj["yt_tags"] + assert proj["completed_at"] is not None + + +def test_delete_video_project(tmp_db): + from app.db import init_db, create_video_project, delete_video_project, get_video_project + init_db() + create_video_project({"track_id": 1, "format": "visualizer"}) + assert delete_video_project(1) is True + assert get_video_project(1) is None + assert delete_video_project(99) is False + + +def test_create_revenue_record(tmp_db): + from app.db import init_db, create_revenue_record, get_all_revenue_records + import pytest + init_db() + rec = create_revenue_record({ + "yt_video_id": "abc123", + "record_month": "2026-04", + "views": 10000, + "watch_hours": 500.0, + "revenue_usd": 25.0, + "country": "BR", + }) + assert rec["id"] == 1 + assert rec["rpm_usd"] == pytest.approx(2.5) + + records = get_all_revenue_records(yt_video_id="abc123") + assert len(records) == 1 + + +def test_revenue_dashboard(tmp_db): + from app.db import init_db, create_revenue_record, get_revenue_dashboard + import pytest + init_db() + create_revenue_record({"yt_video_id": "v1", "record_month": "2026-04", "views": 5000, "revenue_usd": 10.0}) + create_revenue_record({"yt_video_id": "v2", "record_month": "2026-04", "views": 5000, "revenue_usd": 15.0}) + dash = get_revenue_dashboard() + assert dash["total_revenue_usd"] == pytest.approx(25.0) + assert dash["total_views"] == 10000 + assert len(dash["by_month"]) == 1