Files
gahusb 20f83cee33 feat(video-lab): app/db.py — video_tasks 테이블 + CRUD (SP-8)
WAL + busy_timeout 표준 fix. create_task / update_task / get_task.
provider 컬럼 추가(Sora/Veo/Kling/Seedance 구분). video_url 필드.
Plan-B-Video Phase 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 08:26:19 +09:00

96 lines
2.8 KiB
Python

"""SQLite persistence for video_tasks. Single table — task 단위 추적만."""
from __future__ import annotations
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Optional
DB_PATH = os.path.join(os.getenv("VIDEO_DATA_DIR", "/app/data"), "video.db")
@contextmanager
def _conn():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db() -> None:
with _conn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS video_tasks (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
params TEXT NOT NULL,
status TEXT DEFAULT 'queued',
progress INTEGER DEFAULT 0,
message TEXT DEFAULT '',
video_url TEXT,
error TEXT,
created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
"""
)
def _row_to_dict(row) -> Dict[str, Any]:
return {
"id": row["id"],
"provider": row["provider"],
"params": row["params"],
"status": row["status"],
"progress": row["progress"],
"message": row["message"],
"video_url": row["video_url"],
"error": row["error"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO video_tasks (id, provider, params) VALUES (?, ?, ?)",
(task_id, provider, json.dumps(params)),
)
row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone()
return _row_to_dict(row)
def update_task(
task_id: str,
status: str,
progress: int,
message: str = "",
video_url: Optional[str] = None,
error: Optional[str] = None,
) -> None:
with _conn() as conn:
conn.execute(
"""
UPDATE video_tasks
SET status = ?, progress = ?, message = ?, video_url = ?, error = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id = ?
""",
(status, progress, message, video_url, error, task_id),
)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,)).fetchone()
return _row_to_dict(row) if row else None