239 lines
9.8 KiB
Python
239 lines
9.8 KiB
Python
"""saju.db SQLite — saju_records + compat_records CRUD."""
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from typing import Any, Dict, Optional
|
|
|
|
from .config import DB_PATH
|
|
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a configured SQLite connection to ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created when it is missing, rows
    are returned as ``sqlite3.Row``, and WAL journaling plus a 120 second
    busy timeout are enabled on the connection.

    Returns:
        An open connection. The caller owns it and must close it; using the
        connection as a context manager only commits or rolls back.
    """
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, timeout=120.0)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=120000")
    except Exception:
        # Do not leak the handle when the PRAGMA setup fails.
        conn.close()
        raise
    return conn
|
|
|
|
|
|
def init_db() -> None:
    """Create the ``saju_records`` and ``compat_records`` tables and indexes.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    safe. Both tables get an index on ``created_at DESC`` because the list
    queries in this module order by that column.
    """
    conn = _conn()
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS saju_records (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    birth_year INTEGER NOT NULL,
                    birth_month INTEGER NOT NULL,
                    birth_day INTEGER NOT NULL,
                    birth_hour INTEGER,
                    gender TEXT NOT NULL,
                    calendar_type TEXT DEFAULT 'solar',
                    saju_data TEXT NOT NULL,
                    analysis_data TEXT NOT NULL,
                    daeun_data TEXT NOT NULL,
                    interpretation_json TEXT,
                    model TEXT,
                    tokens_in INTEGER DEFAULT 0,
                    tokens_out INTEGER DEFAULT 0,
                    cost_usd REAL DEFAULT 0,
                    latency_ms INTEGER DEFAULT 0,
                    reroll_count INTEGER DEFAULT 0,
                    favorite INTEGER DEFAULT 0,
                    memo TEXT,
                    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_saju_created
                ON saju_records(created_at DESC)
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS compat_records (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    person_a TEXT NOT NULL,
                    person_b TEXT NOT NULL,
                    saju_a TEXT NOT NULL,
                    saju_b TEXT NOT NULL,
                    score INTEGER NOT NULL,
                    breakdown TEXT NOT NULL,
                    interpretation_json TEXT,
                    model TEXT,
                    tokens_in INTEGER DEFAULT 0,
                    tokens_out INTEGER DEFAULT 0,
                    cost_usd REAL DEFAULT 0,
                    latency_ms INTEGER DEFAULT 0,
                    reroll_count INTEGER DEFAULT 0,
                    favorite INTEGER DEFAULT 0,
                    memo TEXT,
                    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
                )
            """)
            # Mirrors idx_saju_created for the compat list query.
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_compat_created
                ON compat_records(created_at DESC)
            """)
    finally:
        conn.close()
|
|
|
|
|
|
# saju_records CRUD
|
|
def save_saju_record(data: Dict[str, Any]) -> int:
    """Insert one row into ``saju_records`` and return its id.

    Args:
        data: Record fields. ``birth_year``, ``birth_month``, ``birth_day``,
            ``gender``, ``saju_data``, ``analysis_data`` and ``daeun_data``
            are required; the remaining keys fall back to defaults. The
            ``*_data`` values and ``interpretation_json`` are stored as JSON
            text; a falsy ``interpretation_json`` is stored as NULL.

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    interpretation = data.get("interpretation_json")
    # Build the parameters first so a bad payload never opens a connection.
    params = (
        data["birth_year"],
        data["birth_month"],
        data["birth_day"],
        data.get("birth_hour"),
        data["gender"],
        data.get("calendar_type", "solar"),
        json.dumps(data["saju_data"], ensure_ascii=False),
        json.dumps(data["analysis_data"], ensure_ascii=False),
        json.dumps(data["daeun_data"], ensure_ascii=False),
        json.dumps(interpretation, ensure_ascii=False) if interpretation else None,
        data.get("model"),
        data.get("tokens_in", 0),
        data.get("tokens_out", 0),
        data.get("cost_usd", 0.0),
        data.get("latency_ms", 0),
        data.get("reroll_count", 0),
    )
    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            cur = conn.execute(
                """INSERT INTO saju_records
                (birth_year, birth_month, birth_day, birth_hour, gender, calendar_type,
                 saju_data, analysis_data, daeun_data, interpretation_json,
                 model, tokens_in, tokens_out, cost_usd, latency_ms, reroll_count)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                params,
            )
        return int(cur.lastrowid)
    finally:
        conn.close()
|
|
|
|
|
|
def get_saju_record(record_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a single ``saju_records`` row by id.

    Args:
        record_id: Primary key of the row to load.

    Returns:
        The decoded record, or ``None`` when no row has that id.
    """
    conn = _conn()
    try:
        row = conn.execute(
            "SELECT * FROM saju_records WHERE id=?", (record_id,)
        ).fetchone()
    finally:
        # The sqlite3 context manager never closes the connection itself.
        conn.close()
    return _saju_row_to_dict(row) if row else None
|
|
|
|
|
|
def list_saju_records(page: int = 1, size: int = 20, favorite: Optional[bool] = None) -> Dict[str, Any]:
    """Return one page of ``saju_records``, newest first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        size: Page length; negative values are treated as 0 so that SQLite
            never receives a negative LIMIT, which it reads as "no limit".
        favorite: When not ``None``, keep only rows whose ``favorite`` flag
            matches.

    Returns:
        A dict with ``items`` (decoded records), the effective ``page`` and
        ``size``, and ``total`` (row count matching the filter).
    """
    page = max(page, 1)
    size = max(size, 0)

    wheres, params = [], []
    if favorite is not None:
        wheres.append("favorite=?")
        params.append(1 if favorite else 0)
    # Only fixed column fragments are interpolated; values stay bound.
    where_sql = ("WHERE " + " AND ".join(wheres)) if wheres else ""
    offset = (page - 1) * size

    conn = _conn()
    try:
        total = conn.execute(
            f"SELECT COUNT(*) c FROM saju_records {where_sql}", params
        ).fetchone()["c"]
        rows = conn.execute(
            f"SELECT * FROM saju_records {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ?",
            params + [size, offset],
        ).fetchall()
    finally:
        # The sqlite3 context manager never closes the connection itself.
        conn.close()

    return {
        "items": [_saju_row_to_dict(r) for r in rows],
        "page": page,
        "size": size,
        "total": int(total),
    }
|
|
|
|
|
|
def update_saju_record(record_id: int, **kwargs) -> None:
    """Update the ``favorite`` and/or ``memo`` columns of a saju record.

    Args:
        record_id: Primary key of the row to update.
        **kwargs: ``favorite`` (stored as 0/1) and ``memo``. A key that is
            absent or ``None`` is left untouched; other keys are ignored.

    Returns nothing; when there is nothing to change no connection is opened.
    """
    sets, vals = [], []
    favorite = kwargs.get("favorite")
    if favorite is not None:
        sets.append("favorite=?")
        vals.append(1 if favorite else 0)
    memo = kwargs.get("memo")
    if memo is not None:
        sets.append("memo=?")
        vals.append(memo)
    if not sets:
        return
    vals.append(record_id)

    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            conn.execute(f"UPDATE saju_records SET {','.join(sets)} WHERE id=?", vals)
    finally:
        conn.close()
|
|
|
|
|
|
def delete_saju_record(record_id: int) -> None:
    """Delete the ``saju_records`` row with the given id, if it exists.

    Args:
        record_id: Primary key of the row to remove.
    """
    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            conn.execute("DELETE FROM saju_records WHERE id=?", (record_id,))
    finally:
        conn.close()
|
|
|
|
|
|
def _saju_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"created_at": r["created_at"],
|
|
"birth_year": r["birth_year"], "birth_month": r["birth_month"], "birth_day": r["birth_day"],
|
|
"birth_hour": r["birth_hour"], "gender": r["gender"], "calendar_type": r["calendar_type"],
|
|
"saju_data": json.loads(r["saju_data"]) if r["saju_data"] else None,
|
|
"analysis_data": json.loads(r["analysis_data"]) if r["analysis_data"] else None,
|
|
"daeun_data": json.loads(r["daeun_data"]) if r["daeun_data"] else None,
|
|
"interpretation_json": json.loads(r["interpretation_json"]) if r["interpretation_json"] else None,
|
|
"model": r["model"], "tokens_in": r["tokens_in"], "tokens_out": r["tokens_out"],
|
|
"cost_usd": r["cost_usd"], "latency_ms": r["latency_ms"], "reroll_count": r["reroll_count"],
|
|
"favorite": int(r["favorite"]), "memo": r["memo"],
|
|
}
|
|
|
|
|
|
# compat_records CRUD
|
|
def save_compat_record(data: Dict[str, Any]) -> int:
    """Insert one row into ``compat_records`` and return its id.

    Args:
        data: Record fields. ``person_a``, ``person_b``, ``saju_a``,
            ``saju_b``, ``score`` and ``breakdown`` are required; the
            remaining keys fall back to defaults. Everything except ``score``
            among the required keys is stored as JSON text, as is
            ``interpretation_json`` (a falsy value is stored as NULL).

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    interpretation = data.get("interpretation_json")
    # Build the parameters first so a bad payload never opens a connection.
    params = (
        json.dumps(data["person_a"], ensure_ascii=False),
        json.dumps(data["person_b"], ensure_ascii=False),
        json.dumps(data["saju_a"], ensure_ascii=False),
        json.dumps(data["saju_b"], ensure_ascii=False),
        data["score"],
        json.dumps(data["breakdown"], ensure_ascii=False),
        json.dumps(interpretation, ensure_ascii=False) if interpretation else None,
        data.get("model"),
        data.get("tokens_in", 0),
        data.get("tokens_out", 0),
        data.get("cost_usd", 0.0),
        data.get("latency_ms", 0),
        data.get("reroll_count", 0),
    )
    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            cur = conn.execute(
                """INSERT INTO compat_records
                (person_a, person_b, saju_a, saju_b, score, breakdown,
                 interpretation_json, model, tokens_in, tokens_out,
                 cost_usd, latency_ms, reroll_count)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                params,
            )
        return int(cur.lastrowid)
    finally:
        conn.close()
|
|
|
|
|
|
def get_compat_record(record_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a single ``compat_records`` row by id.

    Args:
        record_id: Primary key of the row to load.

    Returns:
        The decoded record, or ``None`` when no row has that id.
    """
    conn = _conn()
    try:
        row = conn.execute(
            "SELECT * FROM compat_records WHERE id=?", (record_id,)
        ).fetchone()
    finally:
        # The sqlite3 context manager never closes the connection itself.
        conn.close()
    return _compat_row_to_dict(row) if row else None
|
|
|
|
|
|
def list_compat_records(page: int = 1, size: int = 20, favorite: Optional[bool] = None) -> Dict[str, Any]:
    """Return one page of ``compat_records``, newest first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        size: Page length; negative values are treated as 0 so that SQLite
            never receives a negative LIMIT, which it reads as "no limit".
        favorite: When not ``None``, keep only rows whose ``favorite`` flag
            matches.

    Returns:
        A dict with ``items`` (decoded records), the effective ``page`` and
        ``size``, and ``total`` (row count matching the filter).
    """
    page = max(page, 1)
    size = max(size, 0)

    wheres, params = [], []
    if favorite is not None:
        wheres.append("favorite=?")
        params.append(1 if favorite else 0)
    # Only fixed column fragments are interpolated; values stay bound.
    where_sql = ("WHERE " + " AND ".join(wheres)) if wheres else ""
    offset = (page - 1) * size

    conn = _conn()
    try:
        total = conn.execute(
            f"SELECT COUNT(*) c FROM compat_records {where_sql}", params
        ).fetchone()["c"]
        rows = conn.execute(
            f"SELECT * FROM compat_records {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ?",
            params + [size, offset],
        ).fetchall()
    finally:
        # The sqlite3 context manager never closes the connection itself.
        conn.close()

    return {
        "items": [_compat_row_to_dict(r) for r in rows],
        "page": page,
        "size": size,
        "total": int(total),
    }
|
|
|
|
|
|
def update_compat_record(record_id: int, **kwargs) -> None:
    """Update the ``favorite`` and/or ``memo`` columns of a compat record.

    Args:
        record_id: Primary key of the row to update.
        **kwargs: ``favorite`` (stored as 0/1) and ``memo``. A key that is
            absent or ``None`` is left untouched; other keys are ignored.

    Returns nothing; when there is nothing to change no connection is opened.
    """
    sets, vals = [], []
    favorite = kwargs.get("favorite")
    if favorite is not None:
        sets.append("favorite=?")
        vals.append(1 if favorite else 0)
    memo = kwargs.get("memo")
    if memo is not None:
        sets.append("memo=?")
        vals.append(memo)
    if not sets:
        return
    vals.append(record_id)

    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            conn.execute(f"UPDATE compat_records SET {','.join(sets)} WHERE id=?", vals)
    finally:
        conn.close()
|
|
|
|
|
|
def delete_compat_record(record_id: int) -> None:
    """Delete the ``compat_records`` row with the given id, if it exists.

    Args:
        record_id: Primary key of the row to remove.
    """
    conn = _conn()
    try:
        # ``with conn`` only commits/rolls back; closing happens in finally.
        with conn:
            conn.execute("DELETE FROM compat_records WHERE id=?", (record_id,))
    finally:
        conn.close()
|
|
|
|
|
|
def _compat_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"created_at": r["created_at"],
|
|
"person_a": json.loads(r["person_a"]),
|
|
"person_b": json.loads(r["person_b"]),
|
|
"saju_a": json.loads(r["saju_a"]),
|
|
"saju_b": json.loads(r["saju_b"]),
|
|
"score": r["score"],
|
|
"breakdown": json.loads(r["breakdown"]),
|
|
"interpretation_json": json.loads(r["interpretation_json"]) if r["interpretation_json"] else None,
|
|
"model": r["model"], "tokens_in": r["tokens_in"], "tokens_out": r["tokens_out"],
|
|
"cost_usd": r["cost_usd"], "latency_ms": r["latency_ms"], "reroll_count": r["reroll_count"],
|
|
"favorite": int(r["favorite"]), "memo": r["memo"],
|
|
}
|