WAL + busy_timeout 표준 fix. create_task / update_task / get_task. provider 컬럼 추가(Sora/Veo/Kling/Seedance 구분). video_url 필드. Plan-B-Video Phase 1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
96 lines
2.8 KiB
Python
96 lines
2.8 KiB
Python
"""SQLite persistence for video_tasks. Single table — task 단위 추적만."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from typing import Any, Dict, Optional
|
|
|
|
# SQLite file location: "video.db" inside $VIDEO_DATA_DIR (default "/app/data").
DB_PATH = os.path.join(os.environ.get("VIDEO_DATA_DIR", "/app/data"), "video.db")
|
|
|
|
|
|
@contextmanager
def _conn():
    """Yield a SQLite connection to ``DB_PATH``; commit on success, always close.

    The parent directory of ``DB_PATH`` is created on demand. Each connection
    uses ``sqlite3.Row`` as its row factory and is switched to WAL journaling
    with a 5000 ms busy timeout. The transaction is committed only when the
    ``with`` body finishes without raising; if it raises, the connection is
    closed without committing and the exception propagates.

    Yields:
        sqlite3.Connection: an open, configured connection.
    """
    # dirname is "" when DB_PATH has no directory component (for example when
    # VIDEO_DATA_DIR is set to an empty string); os.makedirs("") would raise
    # FileNotFoundError, so only create a directory when there is one to create.
    parent_dir = os.path.dirname(DB_PATH)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        # Configure inside the try block so a failing PRAGMA cannot leak the
        # open connection.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
        yield conn
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def init_db() -> None:
    """Create the ``video_tasks`` table if it does not already exist.

    Columns: ``id`` (primary key), ``provider`` and ``params`` (both NOT NULL),
    ``status`` (default ``'queued'``), ``progress`` (default 0), ``message``
    (default empty string), nullable ``video_url`` and ``error``, and
    ``created_at`` / ``updated_at``, which default to SQLite's ``'now'``
    rendered through ``strftime('%Y-%m-%dT%H:%M:%fZ', ...)``.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS video_tasks (
            id TEXT PRIMARY KEY,
            provider TEXT NOT NULL,
            params TEXT NOT NULL,
            status TEXT DEFAULT 'queued',
            progress INTEGER DEFAULT 0,
            message TEXT DEFAULT '',
            video_url TEXT,
            error TEXT,
            created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
            updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
        )
    """
    with _conn() as db:
        db.execute(schema)
|
|
|
|
|
|
def _row_to_dict(row) -> Dict[str, Any]:
    """Copy the ``video_tasks`` columns of *row* into a plain dict.

    *row* is indexed by column name. Values are copied unchanged, so
    ``params`` is returned exactly as stored (it is not decoded here).
    Keys appear in table-column order.
    """
    columns = (
        "id",
        "provider",
        "params",
        "status",
        "progress",
        "message",
        "video_url",
        "error",
        "created_at",
        "updated_at",
    )
    return {column: row[column] for column in columns}
|
|
|
|
|
|
def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a new task row and return it as a dict.

    Args:
        task_id: Primary key of the new row.
        provider: Value stored in the ``provider`` column.
        params: Mapping stored as ``json.dumps(params)`` in the ``params``
            column.

    Returns:
        The row read back after the insert, converted by ``_row_to_dict``.
        ``params`` in the result is the stored JSON text.
    """
    insert_sql = "INSERT INTO video_tasks (id, provider, params) VALUES (?, ?, ?)"
    select_sql = "SELECT * FROM video_tasks WHERE id = ?"
    with _conn() as db:
        db.execute(insert_sql, (task_id, provider, json.dumps(params)))
        # Read the row back so column defaults filled in by SQLite are included.
        inserted = db.execute(select_sql, (task_id,)).fetchone()
        return _row_to_dict(inserted)
|
|
|
|
|
|
def update_task(
    task_id: str,
    status: str,
    progress: int,
    message: str = "",
    video_url: Optional[str] = None,
    error: Optional[str] = None,
) -> None:
    """Overwrite the mutable fields of one task and refresh ``updated_at``.

    Every call writes all of ``status``, ``progress``, ``message``,
    ``video_url`` and ``error``. Arguments left at their defaults therefore
    replace whatever was stored: ``message`` becomes ``""`` and ``video_url``
    / ``error`` become NULL. The UPDATE simply affects no rows when
    ``task_id`` does not exist.

    Args:
        task_id: Id of the row to update.
        status: New value for the ``status`` column.
        progress: New value for the ``progress`` column.
        message: New value for the ``message`` column.
        video_url: New value for the ``video_url`` column (``None`` -> NULL).
        error: New value for the ``error`` column (``None`` -> NULL).
    """
    sql = """
        UPDATE video_tasks
        SET status = ?, progress = ?, message = ?, video_url = ?, error = ?,
            updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
        WHERE id = ?
    """
    # Order must match the placeholders above; task_id binds the WHERE clause.
    bindings = (status, progress, message, video_url, error, task_id)
    with _conn() as db:
        db.execute(sql, bindings)
|
|
|
|
|
|
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up a task by id.

    Args:
        task_id: Id of the row to fetch.

    Returns:
        The row converted by ``_row_to_dict``, or ``None`` when no row has
        that id.
    """
    with _conn() as db:
        cursor = db.execute("SELECT * FROM video_tasks WHERE id = ?", (task_id,))
        found = cursor.fetchone()
        if not found:
            return None
        return _row_to_dict(found)
|