Files
web-page-backend/image-lab/app/db.py

84 lines
2.8 KiB
Python

"""SQLite persistence for image_tasks. Single table — task-level tracking only."""
from __future__ import annotations
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Optional
# Path of the SQLite database file. The directory is taken from the
# IMAGE_DATA_DIR environment variable and falls back to /app/data.
DB_PATH = os.path.join(os.environ.get("IMAGE_DATA_DIR", "/app/data"), "image.db")
@contextmanager
def _conn():
    """Open a SQLite connection to ``DB_PATH`` and commit if the block succeeds.

    The parent directory of ``DB_PATH`` is created on demand. Rows are returned
    as ``sqlite3.Row`` so columns can be read by name. The connection is
    switched to WAL journaling and given a 5000 ms busy timeout.

    Yields:
        sqlite3.Connection: an open connection. ``commit()`` is called only
        when the ``with`` body finishes without raising.

    The connection is always closed, including when a PRAGMA or the ``with``
    body raises; in that case ``commit()`` is never called.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename has no directory component, and os.makedirs("") raises.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        # Configure inside the try so a failing PRAGMA cannot leak the handle.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
        yield conn
        conn.commit()
    finally:
        conn.close()
def init_db() -> None:
    """Create the ``image_tasks`` table if it is not there yet.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this again
    on an existing database does not recreate the table. ``status``,
    ``progress``, ``message`` and both timestamp columns carry SQL defaults.
    """
    # The SQL stays flush with the margin so re-indenting the code does not
    # alter the text of the literal.
    schema_sql = """
CREATE TABLE IF NOT EXISTS image_tasks (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
params TEXT NOT NULL,
status TEXT DEFAULT 'queued',
progress INTEGER DEFAULT 0,
message TEXT DEFAULT '',
image_url TEXT,
error TEXT,
created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
"""
    with _conn() as db:
        db.execute(schema_sql)
def _row_to_dict(row) -> Dict[str, Any]:
    """Copy the ``image_tasks`` columns out of *row* into a plain dict.

    Args:
        row: Any object that supports lookup by column name, such as a
            ``sqlite3.Row``.

    Returns:
        A new dict holding exactly the ten task columns, in a fixed order.
        Values are copied as-is; ``params`` is not decoded.
    """
    columns = (
        "id", "provider", "params",
        "status", "progress", "message",
        "image_url", "error",
        "created_at", "updated_at",
    )
    return {name: row[name] for name in columns}
def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a new task row and return it as a plain dict.

    Args:
        task_id: Value for the ``id`` primary-key column.
        provider: Value for the ``provider`` column.
        params: Serialised with ``json.dumps`` and stored as text.

    Returns:
        The row read back right after the insert, so columns that were not
        supplied are present too. ``params`` is the stored JSON string.
    """
    with _conn() as db:
        insert_args = (task_id, provider, json.dumps(params))
        db.execute(
            "INSERT INTO image_tasks (id, provider, params) VALUES (?, ?, ?)",
            insert_args,
        )
        cursor = db.execute("SELECT * FROM image_tasks WHERE id = ?", (task_id,))
        inserted = cursor.fetchone()
    return _row_to_dict(inserted)
def update_task(task_id: str, status: str, progress: int, message: str = "",
                image_url: Optional[str] = None, error: Optional[str] = None) -> None:
    """Overwrite the mutable columns of one task and refresh ``updated_at``.

    Every call writes all five fields, so ``message``, ``image_url`` and
    ``error`` are set to their argument defaults (``""`` / ``None``) whenever
    the caller omits them.

    Args:
        task_id: ``id`` of the row to update.
        status: New value for ``status``.
        progress: New value for ``progress``.
        message: New value for ``message``.
        image_url: New value for ``image_url``.
        error: New value for ``error``.
    """
    # The SQL stays flush with the margin so re-indenting the code does not
    # alter the text of the literal.
    statement = """
UPDATE image_tasks
SET status = ?, progress = ?, message = ?, image_url = ?, error = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id = ?
"""
    bindings = (status, progress, message, image_url, error, task_id)
    with _conn() as db:
        db.execute(statement, bindings)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single task by its id.

    Args:
        task_id: Value to match against the ``id`` column.

    Returns:
        The matching row as a dict, or ``None`` when no row has that id.
    """
    with _conn() as db:
        found = db.execute(
            "SELECT * FROM image_tasks WHERE id = ?", (task_id,)
        ).fetchone()
    if not found:
        return None
    return _row_to_dict(found)