"""SQLite persistence for video_tasks. Single table — tracks state per task only."""

from __future__ import annotations

import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional

DB_PATH = os.path.join(os.getenv("VIDEO_DATA_DIR", "/app/data"), "video.db")

# Columns of video_tasks, in the order they are exposed to callers.
_TASK_COLUMNS = (
    "id",
    "provider",
    "params",
    "status",
    "progress",
    "message",
    "video_url",
    "error",
    "created_at",
    "updated_at",
)


@contextmanager
def _conn() -> Iterator[sqlite3.Connection]:
    """Yield a configured connection to ``DB_PATH``.

    The transaction is committed when the ``with`` body finishes normally.
    If the body (or connection setup) raises, nothing is committed and the
    exception propagates. The connection is closed in every case.
    """
    # A bare filename has an empty directory part; os.makedirs("") would
    # raise, so only create the directory when there is one.
    data_dir = os.path.dirname(DB_PATH)
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    # Everything after connect() lives inside try/finally so that a failing
    # PRAGMA cannot leak the open connection.
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
        yield conn
        conn.commit()
    finally:
        conn.close()


def init_db() -> None:
    """Create the ``video_tasks`` table if it does not exist yet."""
    with _conn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS video_tasks (
                id TEXT PRIMARY KEY,
                provider TEXT NOT NULL,
                params TEXT NOT NULL,
                status TEXT DEFAULT 'queued',
                progress INTEGER DEFAULT 0,
                message TEXT DEFAULT '',
                video_url TEXT,
                error TEXT,
                created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
            """
        )


def _row_to_dict(row) -> Dict[str, Any]:
    """Convert a ``video_tasks`` row into a plain dict.

    ``params`` is returned exactly as stored, i.e. as the JSON text written
    by ``create_task``; it is not decoded here.
    """
    return {column: row[column] for column in _TASK_COLUMNS}


def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a new task and return the stored row as a dict.

    Args:
        task_id: Primary key of the new task.
        provider: Provider name stored with the task.
        params: Task parameters; serialized with ``json.dumps`` before storage.

    Returns:
        The freshly inserted row, including the column defaults
        (``status='queued'``, ``progress=0``, ``message=''``, timestamps).

    Raises:
        sqlite3.IntegrityError: If a task with ``task_id`` already exists.
        TypeError: If ``params`` is not JSON-serializable.
    """
    with _conn() as conn:
        conn.execute(
            "INSERT INTO video_tasks (id, provider, params) VALUES (?, ?, ?)",
            (task_id, provider, json.dumps(params)),
        )
        # Read back on the same connection so the defaults filled in by
        # SQLite are part of the returned dict.
        row = conn.execute(
            "SELECT * FROM video_tasks WHERE id = ?", (task_id,)
        ).fetchone()
        return _row_to_dict(row)


def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    video_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite the mutable fields of a task and bump ``updated_at``.

    All of ``status``, ``progress``, ``message``, ``video_url`` and ``error``
    are written on every call, so omitting ``video_url`` or ``error`` resets
    the stored value to NULL. If no row has ``task_id`` the statement matches
    nothing and the call has no effect.
    """
    with _conn() as conn:
        conn.execute(
            """
            UPDATE video_tasks
            SET status = ?, progress = ?, message = ?, video_url = ?, error = ?,
                updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            WHERE id = ?
            """,
            (status, progress, message, video_url, error, task_id),
        )


def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the task with ``task_id`` as a dict, or ``None`` if absent."""
    with _conn() as conn:
        row = conn.execute(
            "SELECT * FROM video_tasks WHERE id = ?", (task_id,)
        ).fetchone()
        if row is None:
            return None
        return _row_to_dict(row)