"""saju.db SQLite — saju_records + compat_records CRUD.

Every public function opens its own short-lived connection, runs inside a
single transaction, and closes the connection before returning.
"""
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List, Optional

from .config import DB_PATH

# Table names are internal constants; they are the only values ever
# interpolated into SQL text. All caller-supplied values go through "?"
# placeholders.
_SAJU_TABLE = "saju_records"
_COMPAT_TABLE = "compat_records"


def _conn() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    Creates the parent directory if needed, sets ``sqlite3.Row`` as the row
    factory, and applies the WAL journal mode and a 120 s busy timeout.

    The caller owns the returned connection and must close it. Prefer
    ``_session()``, which does that automatically.
    """
    db_dir = os.path.dirname(DB_PATH)
    # dirname is "" when DB_PATH is a bare filename; makedirs("") would raise.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, timeout=120.0)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=120000")
    except Exception:
        # Do not leak the handle if configuring the connection fails.
        conn.close()
        raise
    return conn


@contextmanager
def _session() -> Iterator[sqlite3.Connection]:
    """Yield a connection that is committed/rolled back and then closed.

    ``sqlite3.Connection`` used as a context manager only ends the
    transaction (commit on success, rollback on exception); it does not
    close the connection. This wrapper adds the missing ``close()``.
    """
    conn = _conn()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def _dumps(value: Any) -> str:
    """Serialize *value* to JSON text, keeping non-ASCII characters as-is."""
    return json.dumps(value, ensure_ascii=False)


def _dumps_or_none(value: Any) -> Optional[str]:
    """Serialize *value* to JSON text, or return ``None`` if it is falsy."""
    return _dumps(value) if value else None


def _loads_or_none(text: Optional[str]) -> Any:
    """Parse JSON *text*, or return ``None`` if it is ``None`` or empty."""
    return json.loads(text) if text else None


def init_db() -> None:
    """Create the ``saju_records`` and ``compat_records`` tables if missing.

    Also creates the ``idx_saju_created`` index on
    ``saju_records(created_at DESC)``.
    """
    with _session() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS saju_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                birth_year INTEGER NOT NULL,
                birth_month INTEGER NOT NULL,
                birth_day INTEGER NOT NULL,
                birth_hour INTEGER,
                gender TEXT NOT NULL,
                calendar_type TEXT DEFAULT 'solar',
                saju_data TEXT NOT NULL,
                analysis_data TEXT NOT NULL,
                daeun_data TEXT NOT NULL,
                interpretation_json TEXT,
                model TEXT,
                tokens_in INTEGER DEFAULT 0,
                tokens_out INTEGER DEFAULT 0,
                cost_usd REAL DEFAULT 0,
                latency_ms INTEGER DEFAULT 0,
                reroll_count INTEGER DEFAULT 0,
                favorite INTEGER DEFAULT 0,
                memo TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_saju_created
            ON saju_records(created_at DESC)
        """)
        # NOTE(review): compat_records has no created_at index although
        # list_compat_records() sorts by it — consider adding one.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS compat_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                person_a TEXT NOT NULL,
                person_b TEXT NOT NULL,
                saju_a TEXT NOT NULL,
                saju_b TEXT NOT NULL,
                score INTEGER NOT NULL,
                breakdown TEXT NOT NULL,
                interpretation_json TEXT,
                model TEXT,
                tokens_in INTEGER DEFAULT 0,
                tokens_out INTEGER DEFAULT 0,
                cost_usd REAL DEFAULT 0,
                latency_ms INTEGER DEFAULT 0,
                reroll_count INTEGER DEFAULT 0,
                favorite INTEGER DEFAULT 0,
                memo TEXT,
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)


# Shared helpers for both tables


def _get_record(
    table: str,
    row_to_dict: Callable[[sqlite3.Row], Dict[str, Any]],
    record_id: int,
) -> Optional[Dict[str, Any]]:
    """Fetch one row of *table* by primary key, or ``None`` if absent."""
    with _session() as conn:
        row = conn.execute(
            f"SELECT * FROM {table} WHERE id=?", (record_id,)
        ).fetchone()
    return row_to_dict(row) if row else None


def _list_records(
    table: str,
    row_to_dict: Callable[[sqlite3.Row], Dict[str, Any]],
    page: int,
    size: int,
    favorite: Optional[bool],
) -> Dict[str, Any]:
    """Return one page of *table*, newest ``created_at`` first.

    *page* is 1-based. When *favorite* is not ``None`` only rows whose
    ``favorite`` flag matches are counted and returned.

    Returns a dict with ``items``, ``page``, ``size`` and ``total`` (the
    number of rows matching the filter, across all pages).
    """
    where_sql = ""
    params: List[Any] = []
    if favorite is not None:
        where_sql = "WHERE favorite=?"
        params.append(1 if favorite else 0)
    offset = (page - 1) * size
    with _session() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) c FROM {table} {where_sql}", params
        ).fetchone()["c"]
        rows = conn.execute(
            f"SELECT * FROM {table} {where_sql} "
            "ORDER BY created_at DESC LIMIT ? OFFSET ?",
            params + [size, offset],
        ).fetchall()
    return {
        "items": [row_to_dict(r) for r in rows],
        "page": page,
        "size": size,
        "total": int(total),
    }


def _update_record(table: str, record_id: int, changes: Dict[str, Any]) -> None:
    """Apply the ``favorite`` and/or ``memo`` entries of *changes* to a row.

    Entries that are missing or ``None`` are skipped, and any other keys are
    ignored. If nothing remains to update, no connection is opened.
    """
    sets: List[str] = []
    vals: List[Any] = []
    favorite = changes.get("favorite")
    if favorite is not None:
        sets.append("favorite=?")
        vals.append(1 if favorite else 0)
    memo = changes.get("memo")
    if memo is not None:
        sets.append("memo=?")
        vals.append(memo)
    if not sets:
        return
    vals.append(record_id)
    with _session() as conn:
        conn.execute(f"UPDATE {table} SET {','.join(sets)} WHERE id=?", vals)


def _delete_record(table: str, record_id: int) -> None:
    """Delete the row of *table* with the given primary key, if any."""
    with _session() as conn:
        conn.execute(f"DELETE FROM {table} WHERE id=?", (record_id,))


# saju_records CRUD


def save_saju_record(data: Dict[str, Any]) -> int:
    """Insert a row into ``saju_records`` and return its new id.

    Required keys of *data*: ``birth_year``, ``birth_month``, ``birth_day``,
    ``gender``, ``saju_data``, ``analysis_data``, ``daeun_data``. The three
    ``*_data`` values are stored as JSON text.

    Optional keys: ``birth_hour``, ``calendar_type`` (default ``"solar"``),
    ``interpretation_json`` (stored as JSON, or NULL when falsy), ``model``,
    ``tokens_in``, ``tokens_out``, ``cost_usd``, ``latency_ms``,
    ``reroll_count`` (numeric fields default to 0).

    Raises:
        KeyError: if a required key is missing.
    """
    with _session() as conn:
        cur = conn.execute(
            """INSERT INTO saju_records
               (birth_year, birth_month, birth_day, birth_hour, gender, calendar_type,
                saju_data, analysis_data, daeun_data, interpretation_json,
                model, tokens_in, tokens_out, cost_usd, latency_ms, reroll_count)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                data["birth_year"],
                data["birth_month"],
                data["birth_day"],
                data.get("birth_hour"),
                data["gender"],
                data.get("calendar_type", "solar"),
                _dumps(data["saju_data"]),
                _dumps(data["analysis_data"]),
                _dumps(data["daeun_data"]),
                _dumps_or_none(data.get("interpretation_json")),
                data.get("model"),
                data.get("tokens_in", 0),
                data.get("tokens_out", 0),
                data.get("cost_usd", 0.0),
                data.get("latency_ms", 0),
                data.get("reroll_count", 0),
            ),
        )
        return int(cur.lastrowid)


def get_saju_record(record_id: int) -> Optional[Dict[str, Any]]:
    """Return the ``saju_records`` row with *record_id*, or ``None``."""
    return _get_record(_SAJU_TABLE, _saju_row_to_dict, record_id)


def list_saju_records(
    page: int = 1, size: int = 20, favorite: Optional[bool] = None
) -> Dict[str, Any]:
    """Return one page of ``saju_records``, newest first.

    *page* is 1-based; *favorite* optionally filters on the favorite flag.
    The result has keys ``items``, ``page``, ``size`` and ``total``.
    """
    return _list_records(_SAJU_TABLE, _saju_row_to_dict, page, size, favorite)


def update_saju_record(record_id: int, **kwargs) -> None:
    """Update ``favorite`` and/or ``memo`` on a ``saju_records`` row.

    Keyword arguments that are absent or ``None`` are left unchanged; other
    keyword arguments are ignored.
    """
    _update_record(_SAJU_TABLE, record_id, kwargs)


def delete_saju_record(record_id: int) -> None:
    """Delete the ``saju_records`` row with *record_id*, if it exists."""
    _delete_record(_SAJU_TABLE, record_id)


def _saju_row_to_dict(r) -> Dict[str, Any]:
    """Convert a ``saju_records`` row to a dict, decoding the JSON columns."""
    return {
        "id": r["id"],
        "created_at": r["created_at"],
        "birth_year": r["birth_year"],
        "birth_month": r["birth_month"],
        "birth_day": r["birth_day"],
        "birth_hour": r["birth_hour"],
        "gender": r["gender"],
        "calendar_type": r["calendar_type"],
        "saju_data": _loads_or_none(r["saju_data"]),
        "analysis_data": _loads_or_none(r["analysis_data"]),
        "daeun_data": _loads_or_none(r["daeun_data"]),
        "interpretation_json": _loads_or_none(r["interpretation_json"]),
        "model": r["model"],
        "tokens_in": r["tokens_in"],
        "tokens_out": r["tokens_out"],
        "cost_usd": r["cost_usd"],
        "latency_ms": r["latency_ms"],
        "reroll_count": r["reroll_count"],
        "favorite": int(r["favorite"]),
        "memo": r["memo"],
    }


# compat_records CRUD


def save_compat_record(data: Dict[str, Any]) -> int:
    """Insert a row into ``compat_records`` and return its new id.

    Required keys of *data*: ``person_a``, ``person_b``, ``saju_a``,
    ``saju_b``, ``breakdown`` (all stored as JSON text) and ``score``.

    Optional keys: ``interpretation_json`` (stored as JSON, or NULL when
    falsy), ``model``, ``tokens_in``, ``tokens_out``, ``cost_usd``,
    ``latency_ms``, ``reroll_count`` (numeric fields default to 0).

    Raises:
        KeyError: if a required key is missing.
    """
    with _session() as conn:
        cur = conn.execute(
            """INSERT INTO compat_records
               (person_a, person_b, saju_a, saju_b, score, breakdown,
                interpretation_json, model, tokens_in, tokens_out, cost_usd,
                latency_ms, reroll_count)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                _dumps(data["person_a"]),
                _dumps(data["person_b"]),
                _dumps(data["saju_a"]),
                _dumps(data["saju_b"]),
                data["score"],
                _dumps(data["breakdown"]),
                _dumps_or_none(data.get("interpretation_json")),
                data.get("model"),
                data.get("tokens_in", 0),
                data.get("tokens_out", 0),
                data.get("cost_usd", 0.0),
                data.get("latency_ms", 0),
                data.get("reroll_count", 0),
            ),
        )
        return int(cur.lastrowid)


def get_compat_record(record_id: int) -> Optional[Dict[str, Any]]:
    """Return the ``compat_records`` row with *record_id*, or ``None``."""
    return _get_record(_COMPAT_TABLE, _compat_row_to_dict, record_id)


def list_compat_records(
    page: int = 1, size: int = 20, favorite: Optional[bool] = None
) -> Dict[str, Any]:
    """Return one page of ``compat_records``, newest first.

    *page* is 1-based; *favorite* optionally filters on the favorite flag.
    The result has keys ``items``, ``page``, ``size`` and ``total``.
    """
    return _list_records(_COMPAT_TABLE, _compat_row_to_dict, page, size, favorite)


def update_compat_record(record_id: int, **kwargs) -> None:
    """Update ``favorite`` and/or ``memo`` on a ``compat_records`` row.

    Keyword arguments that are absent or ``None`` are left unchanged; other
    keyword arguments are ignored.
    """
    _update_record(_COMPAT_TABLE, record_id, kwargs)


def delete_compat_record(record_id: int) -> None:
    """Delete the ``compat_records`` row with *record_id*, if it exists."""
    _delete_record(_COMPAT_TABLE, record_id)


def _compat_row_to_dict(r) -> Dict[str, Any]:
    """Convert a ``compat_records`` row to a dict, decoding the JSON columns."""
    return {
        "id": r["id"],
        "created_at": r["created_at"],
        # These five columns are declared NOT NULL, so they are decoded
        # without a None guard.
        "person_a": json.loads(r["person_a"]),
        "person_b": json.loads(r["person_b"]),
        "saju_a": json.loads(r["saju_a"]),
        "saju_b": json.loads(r["saju_b"]),
        "score": r["score"],
        "breakdown": json.loads(r["breakdown"]),
        "interpretation_json": _loads_or_none(r["interpretation_json"]),
        "model": r["model"],
        "tokens_in": r["tokens_in"],
        "tokens_out": r["tokens_out"],
        "cost_usd": r["cost_usd"],
        "latency_ms": r["latency_ms"],
        "reroll_count": r["reroll_count"],
        "favorite": int(r["favorite"]),
        "memo": r["memo"],
    }