feat(music-lab): video_projects·revenue_records DB 마이그레이션 + CRUD

- init_db()에 video_projects, revenue_records 테이블 추가 (CREATE IF NOT EXISTS)
- video_projects CRUD: create/get/get_all/update_status/delete + get_track_by_id
- revenue_records CRUD: create/get_all/update/delete/get_revenue_dashboard (RPM 자동 계산)
- TDD: tests/test_db_video.py 5개 테스트 모두 PASSED
- pytest.ini 추가 (pythonpath=. 설정)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-01 11:41:07 +09:00
parent 88b5ea9ce2
commit a5495aeaa4
5 changed files with 325 additions and 0 deletions

View File

@@ -95,6 +95,47 @@ def init_db() -> None:
except sqlite3.OperationalError: except sqlite3.OperationalError:
pass pass
# ── video_projects 테이블 ─────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS video_projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_id INTEGER,
format TEXT NOT NULL DEFAULT 'visualizer',
status TEXT NOT NULL DEFAULT 'pending',
output_path TEXT NOT NULL DEFAULT '',
output_url TEXT NOT NULL DEFAULT '',
thumbnail_path TEXT NOT NULL DEFAULT '',
target_countries TEXT NOT NULL DEFAULT '[]',
yt_title TEXT NOT NULL DEFAULT '',
yt_description TEXT NOT NULL DEFAULT '',
yt_tags TEXT NOT NULL DEFAULT '[]',
render_params TEXT NOT NULL DEFAULT '{}',
error TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
completed_at TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_track ON video_projects(track_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_vp_status ON video_projects(status)")
# ── revenue_records 테이블 ────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS revenue_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
video_project_id INTEGER,
yt_video_id TEXT NOT NULL DEFAULT '',
record_month TEXT NOT NULL DEFAULT '',
views INTEGER NOT NULL DEFAULT 0,
watch_hours REAL NOT NULL DEFAULT 0.0,
revenue_usd REAL NOT NULL DEFAULT 0.0,
rpm_usd REAL NOT NULL DEFAULT 0.0,
country TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT 'manual',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_rr_month ON revenue_records(record_month DESC)")
# ── music_tasks CRUD ────────────────────────────────────────────────────────── # ── music_tasks CRUD ──────────────────────────────────────────────────────────
@@ -343,3 +384,201 @@ def delete_lyrics(lyrics_id: int) -> bool:
return False return False
conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,)) conn.execute("DELETE FROM saved_lyrics WHERE id = ?", (lyrics_id,))
return True return True
# ── video_projects CRUD ───────────────────────────────────────────────────────
def _vp_row_to_dict(r) -> dict:
return {
"id": r["id"],
"track_id": r["track_id"],
"format": r["format"],
"status": r["status"],
"output_path": r["output_path"],
"output_url": r["output_url"],
"thumbnail_path": r["thumbnail_path"],
"target_countries": json.loads(r["target_countries"]) if r["target_countries"] else [],
"yt_title": r["yt_title"],
"yt_description": r["yt_description"],
"yt_tags": json.loads(r["yt_tags"]) if r["yt_tags"] else [],
"render_params": json.loads(r["render_params"]) if r["render_params"] else {},
"error": r["error"],
"created_at": r["created_at"],
"completed_at": r["completed_at"],
}
def create_video_project(data: dict) -> dict:
with _conn() as conn:
conn.execute(
"""INSERT INTO video_projects (track_id, format, target_countries, render_params)
VALUES (?, ?, ?, ?)""",
(data.get("track_id"), data.get("format", "visualizer"),
json.dumps(data.get("target_countries", [])),
json.dumps(data.get("render_params", {}))),
)
row = conn.execute("SELECT * FROM video_projects WHERE rowid = last_insert_rowid()").fetchone()
return _vp_row_to_dict(row)
def get_video_project(project_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM video_projects WHERE id = ?", (project_id,)).fetchone()
return _vp_row_to_dict(row) if row else None
def get_all_video_projects() -> list:
with _conn() as conn:
rows = conn.execute("SELECT * FROM video_projects ORDER BY created_at DESC").fetchall()
return [_vp_row_to_dict(r) for r in rows]
def update_video_project_status(
project_id: int,
status: str,
output_path: str = "",
output_url: str = "",
thumbnail_path: str = "",
yt_title: str = "",
yt_description: str = "",
yt_tags: list = None,
error: str = None,
) -> None:
completed_at_expr = (
"strftime('%Y-%m-%dT%H:%M:%fZ','now')" if status in ("done", "failed") else "NULL"
)
with _conn() as conn:
conn.execute(
f"""UPDATE video_projects
SET status=?, output_path=?, output_url=?, thumbnail_path=?,
yt_title=?, yt_description=?, yt_tags=?, error=?,
completed_at={completed_at_expr}
WHERE id=?""",
(status, output_path, output_url, thumbnail_path,
yt_title, yt_description, json.dumps(yt_tags or []), error, project_id),
)
def delete_video_project(project_id: int) -> bool:
with _conn() as conn:
row = conn.execute("SELECT id FROM video_projects WHERE id = ?", (project_id,)).fetchone()
if not row:
return False
conn.execute("DELETE FROM video_projects WHERE id = ?", (project_id,))
return True
def get_track_by_id(track_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM music_library WHERE id = ?", (track_id,)).fetchone()
return _track_row_to_dict(row) if row else None
# ── revenue_records CRUD ──────────────────────────────────────────────────────
def _rr_row_to_dict(r) -> dict:
return {
"id": r["id"],
"video_project_id": r["video_project_id"],
"yt_video_id": r["yt_video_id"],
"record_month": r["record_month"],
"views": r["views"],
"watch_hours": r["watch_hours"],
"revenue_usd": r["revenue_usd"],
"rpm_usd": r["rpm_usd"],
"country": r["country"],
"source": r["source"],
"created_at": r["created_at"],
}
def create_revenue_record(data: dict) -> dict:
views = data.get("views", 0)
revenue = data.get("revenue_usd", 0.0)
rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
with _conn() as conn:
conn.execute(
"""INSERT INTO revenue_records
(video_project_id, yt_video_id, record_month, views, watch_hours,
revenue_usd, rpm_usd, country, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(data.get("video_project_id"), data.get("yt_video_id", ""),
data.get("record_month", ""), views, data.get("watch_hours", 0.0),
revenue, rpm, data.get("country", ""), data.get("source", "manual")),
)
row = conn.execute("SELECT * FROM revenue_records WHERE rowid = last_insert_rowid()").fetchone()
return _rr_row_to_dict(row)
def get_all_revenue_records(yt_video_id: str = None, year_month: str = None) -> list:
with _conn() as conn:
q = "SELECT * FROM revenue_records WHERE 1=1"
params: list = []
if yt_video_id:
q += " AND yt_video_id=?"
params.append(yt_video_id)
if year_month:
q += " AND record_month=?"
params.append(year_month)
q += " ORDER BY record_month DESC"
rows = conn.execute(q, params).fetchall()
return [_rr_row_to_dict(r) for r in rows]
def update_revenue_record(record_id: int, data: dict) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
if not row:
return None
cur = _rr_row_to_dict(row)
views = data.get("views", cur["views"])
revenue = data.get("revenue_usd", cur["revenue_usd"])
rpm = round(revenue / views * 1000, 4) if views > 0 else 0.0
conn.execute(
"""UPDATE revenue_records
SET yt_video_id=?, record_month=?, views=?, watch_hours=?,
revenue_usd=?, rpm_usd=?, country=?, source=?
WHERE id=?""",
(data.get("yt_video_id", cur["yt_video_id"]),
data.get("record_month", cur["record_month"]),
views, data.get("watch_hours", cur["watch_hours"]),
revenue, rpm,
data.get("country", cur["country"]),
data.get("source", cur["source"]),
record_id),
)
row = conn.execute("SELECT * FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
return _rr_row_to_dict(row)
def delete_revenue_record(record_id: int) -> bool:
with _conn() as conn:
row = conn.execute("SELECT id FROM revenue_records WHERE id = ?", (record_id,)).fetchone()
if not row:
return False
conn.execute("DELETE FROM revenue_records WHERE id = ?", (record_id,))
return True
def get_revenue_dashboard() -> dict:
with _conn() as conn:
total = conn.execute(
"SELECT SUM(revenue_usd) as total, SUM(views) as views, SUM(watch_hours) as hours FROM revenue_records"
).fetchone()
by_month = conn.execute(
"""SELECT record_month, SUM(revenue_usd) as revenue, SUM(views) as views,
AVG(rpm_usd) as avg_rpm FROM revenue_records
GROUP BY record_month ORDER BY record_month DESC LIMIT 12"""
).fetchall()
by_country = conn.execute(
"""SELECT country, SUM(revenue_usd) as revenue, SUM(views) as views
FROM revenue_records WHERE country != ''
GROUP BY country ORDER BY revenue DESC LIMIT 10"""
).fetchall()
return {
"total_revenue_usd": total["total"] or 0.0,
"total_views": total["views"] or 0,
"total_watch_hours": total["hours"] or 0.0,
"by_month": [dict(r) for r in by_month],
"by_country": [dict(r) for r in by_country],
}

3
music-lab/pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
testpaths = tests
pythonpath = .

View File

@@ -3,3 +3,7 @@ uvicorn[standard]==0.30.6
requests==2.32.3 requests==2.32.3
python-multipart==0.0.12 python-multipart==0.0.12
mutagen==1.47.0 mutagen==1.47.0
anthropic>=0.40.0
Pillow>=11.0.0
pytest>=8.0.0
httpx>=0.27.0

View File

@@ -0,0 +1,7 @@
import pytest
@pytest.fixture
def tmp_db(tmp_path, monkeypatch):
db_path = str(tmp_path / "test_music.db")
monkeypatch.setattr("app.db.DB_PATH", db_path)
return db_path

View File

@@ -0,0 +1,72 @@
def test_create_and_get_video_project(tmp_db):
from app.db import init_db, create_video_project, get_video_project
init_db()
proj = create_video_project({"track_id": 1, "format": "visualizer", "target_countries": ["BR", "ID"]})
assert proj["id"] == 1
assert proj["format"] == "visualizer"
assert proj["status"] == "pending"
assert "BR" in proj["target_countries"]
fetched = get_video_project(1)
assert fetched["id"] == 1
assert fetched["track_id"] == 1
def test_update_video_project_status(tmp_db):
from app.db import init_db, create_video_project, update_video_project_status, get_video_project
init_db()
create_video_project({"track_id": 2, "format": "slideshow"})
update_video_project_status(
1, "done",
output_path="/data/videos/1/output.mp4",
output_url="/media/videos/1/output.mp4",
thumbnail_path="/data/videos/1/thumbnail.jpg",
yt_title="Chill Beats Brazil",
yt_description="relaxing lofi",
yt_tags=["lofi", "chill"],
)
proj = get_video_project(1)
assert proj["status"] == "done"
assert proj["yt_title"] == "Chill Beats Brazil"
assert "lofi" in proj["yt_tags"]
assert proj["completed_at"] is not None
def test_delete_video_project(tmp_db):
from app.db import init_db, create_video_project, delete_video_project, get_video_project
init_db()
create_video_project({"track_id": 1, "format": "visualizer"})
assert delete_video_project(1) is True
assert get_video_project(1) is None
assert delete_video_project(99) is False
def test_create_revenue_record(tmp_db):
from app.db import init_db, create_revenue_record, get_all_revenue_records
import pytest
init_db()
rec = create_revenue_record({
"yt_video_id": "abc123",
"record_month": "2026-04",
"views": 10000,
"watch_hours": 500.0,
"revenue_usd": 25.0,
"country": "BR",
})
assert rec["id"] == 1
assert rec["rpm_usd"] == pytest.approx(2.5)
records = get_all_revenue_records(yt_video_id="abc123")
assert len(records) == 1
def test_revenue_dashboard(tmp_db):
from app.db import init_db, create_revenue_record, get_revenue_dashboard
import pytest
init_db()
create_revenue_record({"yt_video_id": "v1", "record_month": "2026-04", "views": 5000, "revenue_usd": 10.0})
create_revenue_record({"yt_video_id": "v2", "record_month": "2026-04", "views": 5000, "revenue_usd": 15.0})
dash = get_revenue_dashboard()
assert dash["total_revenue_usd"] == pytest.approx(25.0)
assert dash["total_views"] == 10000
assert len(dash["by_month"]) == 1