Files
web-page-backend/lotto/app/db.py
gahusb 2b5009f864 fix(sqlite): WAL + busy_timeout 120s standardized across all labs
8개 lab의 _conn() 함수에 표준 동시성 패턴 통일:
- timeout=120.0 (connection 획득)
- PRAGMA journal_mode=WAL (reader/writer 분리)
- PRAGMA busy_timeout=120000 (트랜잭션 충돌 시 120초 대기)

stock-lab/screener/router.py 의 검증된 패턴(d9b6122) 을 lotto, stock-lab(메인),
music-lab, blog-lab, realestate-lab, agent-office, personal, travel-proxy 로 확산.
기존 'database is locked' 오류 윈도우를 흡수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:01 +09:00

1231 lines
49 KiB
Python

# backend/app/db.py
import hashlib
import json
import os
import sqlite3
from contextlib import closing
from typing import Any, Dict, List, Optional
# Filesystem path of the SQLite database file that _conn() opens.
DB_PATH = "/app/data/lotto.db"
def _conn() -> sqlite3.Connection:
    """Open a connection to the SQLite database at ``DB_PATH``.

    The parent directory of ``DB_PATH`` is created when missing. The
    connection yields ``sqlite3.Row`` rows, uses WAL journaling and waits up
    to 120 seconds on a busy database.

    Returns:
        An open connection. The caller is responsible for closing it.
    """
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH, timeout=120.0)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=120000")
    except Exception:
        # Do not leak the handle when a PRAGMA fails (e.g. database is locked).
        conn.close()
        raise
    return conn
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:
cols = {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
if col not in cols:
conn.execute(ddl)
def init_db() -> None:
    """Create or migrate every table and index used by this module.

    All statements are guarded (``IF NOT EXISTS``, ``INSERT OR IGNORE``,
    column-existence checks, a type check before rewriting ``picks``), so the
    function can be called on both a fresh and an existing database. The work
    runs in one transaction and the connection is closed afterwards.
    """
    with closing(_conn()) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS draws (
                drw_no INTEGER PRIMARY KEY,
                drw_date TEXT NOT NULL,
                n1 INTEGER NOT NULL,
                n2 INTEGER NOT NULL,
                n3 INTEGER NOT NULL,
                n4 INTEGER NOT NULL,
                n5 INTEGER NOT NULL,
                n6 INTEGER NOT NULL,
                bonus INTEGER NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_draws_date ON draws(drw_date);")

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                based_on_draw INTEGER,
                numbers TEXT NOT NULL,
                params TEXT NOT NULL
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_created ON recommendations(created_at DESC);")

        # Extension columns (added automatically to pre-existing databases).
        _ensure_column(conn, "recommendations", "numbers_sorted",
                       "ALTER TABLE recommendations ADD COLUMN numbers_sorted TEXT;")
        _ensure_column(conn, "recommendations", "dedup_hash",
                       "ALTER TABLE recommendations ADD COLUMN dedup_hash TEXT;")
        _ensure_column(conn, "recommendations", "favorite",
                       "ALTER TABLE recommendations ADD COLUMN favorite INTEGER NOT NULL DEFAULT 0;")
        _ensure_column(conn, "recommendations", "note",
                       "ALTER TABLE recommendations ADD COLUMN note TEXT NOT NULL DEFAULT '';")
        _ensure_column(conn, "recommendations", "tags",
                       "ALTER TABLE recommendations ADD COLUMN tags TEXT NOT NULL DEFAULT '[]';")

        # Columns used when scoring a recommendation against a draw result.
        _ensure_column(conn, "recommendations", "rank",
                       "ALTER TABLE recommendations ADD COLUMN rank INTEGER;")
        _ensure_column(conn, "recommendations", "correct_count",
                       "ALTER TABLE recommendations ADD COLUMN correct_count INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "has_bonus",
                       "ALTER TABLE recommendations ADD COLUMN has_bonus INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "checked",
                       "ALTER TABLE recommendations ADD COLUMN checked INTEGER DEFAULT 0;")

        # UNIQUE index that prevents storing the same recommendation twice.
        conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);")

        # -- simulation tables ------------------------------------------------
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_at TEXT NOT NULL DEFAULT (datetime('now')),
                strategy TEXT NOT NULL DEFAULT 'monte_carlo',
                total_generated INTEGER NOT NULL DEFAULT 0,
                top_k_selected INTEGER NOT NULL DEFAULT 0,
                avg_score REAL,
                notes TEXT DEFAULT ''
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);"
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_candidates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id INTEGER NOT NULL,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                score_frequency REAL,
                score_fingerprint REAL,
                score_gap REAL,
                score_cooccur REAL,
                score_diversity REAL,
                is_best INTEGER DEFAULT 0,
                based_on_draw INTEGER,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_run "
            "ON simulation_candidates(run_id, score_total DESC);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_best "
            "ON simulation_candidates(is_best, score_total DESC);"
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS best_picks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                rank_in_run INTEGER,
                source_run_id INTEGER,
                based_on_draw INTEGER,
                is_active INTEGER DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_bestpicks_active "
            "ON best_picks(is_active, score_total DESC);"
        )

        # -- purchase_history table -------------------------------------------
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS purchase_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                draw_no INTEGER NOT NULL,
                amount INTEGER NOT NULL,
                sets INTEGER NOT NULL DEFAULT 1,
                prize INTEGER NOT NULL DEFAULT 0,
                note TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_draw ON purchase_history(draw_no DESC);")

        # -- purchase_history column extensions (existing data is preserved) --
        _ensure_column(conn, "purchase_history", "numbers",
                       "ALTER TABLE purchase_history ADD COLUMN numbers TEXT NOT NULL DEFAULT '[]'")
        _ensure_column(conn, "purchase_history", "is_real",
                       "ALTER TABLE purchase_history ADD COLUMN is_real INTEGER NOT NULL DEFAULT 1")
        _ensure_column(conn, "purchase_history", "source_strategy",
                       "ALTER TABLE purchase_history ADD COLUMN source_strategy TEXT NOT NULL DEFAULT 'manual'")
        _ensure_column(conn, "purchase_history", "source_detail",
                       "ALTER TABLE purchase_history ADD COLUMN source_detail TEXT NOT NULL DEFAULT '{}'")
        _ensure_column(conn, "purchase_history", "checked",
                       "ALTER TABLE purchase_history ADD COLUMN checked INTEGER NOT NULL DEFAULT 0")
        _ensure_column(conn, "purchase_history", "results",
                       "ALTER TABLE purchase_history ADD COLUMN results TEXT NOT NULL DEFAULT '[]'")
        _ensure_column(conn, "purchase_history", "total_prize",
                       "ALTER TABLE purchase_history ADD COLUMN total_prize INTEGER NOT NULL DEFAULT 0")

        # -- strategy_performance table ---------------------------------------
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS strategy_performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                strategy TEXT NOT NULL,
                draw_no INTEGER NOT NULL,
                sets_count INTEGER NOT NULL DEFAULT 0,
                total_correct INTEGER NOT NULL DEFAULT 0,
                max_correct INTEGER NOT NULL DEFAULT 0,
                prize_total INTEGER NOT NULL DEFAULT 0,
                avg_score REAL NOT NULL DEFAULT 0.0,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                UNIQUE(strategy, draw_no)
            );
            """
        )

        # -- strategy_weights table -------------------------------------------
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS strategy_weights (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                strategy TEXT NOT NULL UNIQUE,
                weight REAL NOT NULL DEFAULT 0.2,
                ema_score REAL NOT NULL DEFAULT 0.15,
                total_sets INTEGER NOT NULL DEFAULT 0,
                total_hits_3plus INTEGER NOT NULL DEFAULT 0,
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        # Seed the initial strategy weights (rows that already exist are kept).
        initial_weights = [
            ("combined", 0.30, 0.15),
            ("simulation", 0.25, 0.15),
            ("heatmap", 0.20, 0.15),
            ("manual", 0.15, 0.15),
            ("custom", 0.10, 0.15),
        ]
        for strat, w, ema in initial_weights:
            conn.execute(
                "INSERT OR IGNORE INTO strategy_weights (strategy, weight, ema_score) VALUES (?, ?, ?)",
                (strat, w, ema),
            )

        # -- weekly_reports cache table ---------------------------------------
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS weekly_reports (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                drw_no INTEGER UNIQUE NOT NULL,
                report TEXT NOT NULL,
                generated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )

        # -- additional indexes -----------------------------------------------
        conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_based_checked ON recommendations(based_on_draw, checked)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_strategy ON purchase_history(source_strategy)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_checked ON purchase_history(draw_no, checked)")

        # -- lotto_briefings table --------------------------------------------
        conn.execute("""
            CREATE TABLE IF NOT EXISTS lotto_briefings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                draw_no INTEGER UNIQUE NOT NULL,
                picks TEXT NOT NULL,
                narrative TEXT NOT NULL,
                confidence INTEGER NOT NULL,
                model TEXT NOT NULL,
                tokens_input INTEGER NOT NULL DEFAULT 0,
                tokens_output INTEGER NOT NULL DEFAULT 0,
                cache_read INTEGER NOT NULL DEFAULT 0,
                cache_write INTEGER NOT NULL DEFAULT 0,
                latency_ms INTEGER NOT NULL DEFAULT 0,
                source TEXT NOT NULL DEFAULT 'auto',
                generated_at TEXT NOT NULL DEFAULT (datetime('now','localtime'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_briefings_draw ON lotto_briefings(draw_no DESC)")

        # -- weekly_review table (curator self-review + user pattern gap) -----
        conn.execute("""
            CREATE TABLE IF NOT EXISTS weekly_review (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                draw_no INTEGER UNIQUE NOT NULL,
                curator_avg_match REAL,
                curator_best_tier TEXT,
                curator_best_match INTEGER,
                curator_5plus_prizes INTEGER,
                user_avg_match REAL,
                user_best_match INTEGER,
                user_5plus_prizes INTEGER,
                user_pattern_summary TEXT,
                draw_pattern_summary TEXT,
                pattern_delta TEXT,
                created_at TEXT NOT NULL DEFAULT (datetime('now','localtime'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_review_draw ON weekly_review(draw_no DESC)")

        # -- lotto_briefings.picks four-tier migration (one-off conversion) ---
        # Old: picks is a JSON list   [{numbers, risk_tag, reason}]
        # New: picks is a JSON object {core: [...], bonus: [], extended: [], pool: []}
        rows = conn.execute("SELECT id, picks FROM lotto_briefings").fetchall()
        for r in rows:
            try:
                p = json.loads(r["picks"])
            except (json.JSONDecodeError, TypeError):
                # Leave rows whose picks cannot be decoded untouched.
                continue
            if isinstance(p, list):
                new_picks = {"core": p, "bonus": [], "extended": [], "pool": []}
                conn.execute(
                    "UPDATE lotto_briefings SET picks=? WHERE id=?",
                    (json.dumps(new_picks, ensure_ascii=False), r["id"]),
                )

        _ensure_column(conn, "lotto_briefings", "tier_rationale",
                       "ALTER TABLE lotto_briefings ADD COLUMN tier_rationale TEXT NOT NULL DEFAULT '{}'")
def upsert_draw(row: Dict[str, Any]) -> None:
    """Insert one draw, or overwrite the stored draw with the same ``drw_no``.

    Args:
        row: Mapping with keys ``drw_no``, ``drw_date``, ``n1``..``n6`` and
            ``bonus``. Numbers are coerced with ``int`` and the date with
            ``str``.
    """
    values = (
        int(row["drw_no"]),
        str(row["drw_date"]),
        int(row["n1"]), int(row["n2"]), int(row["n3"]),
        int(row["n4"]), int(row["n5"]), int(row["n6"]),
        int(row["bonus"]),
    )
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute(
            """
            INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
                drw_date=excluded.drw_date,
                n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
                n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
                bonus=excluded.bonus,
                updated_at=datetime('now')
            """,
            values,
        )
def upsert_many_draws(rows: List[Dict[str, Any]]) -> None:
    """Insert or overwrite several draws in a single transaction.

    Args:
        rows: Mappings with keys ``drw_no``, ``drw_date``, ``n1``..``n6`` and
            ``bonus``; values are coerced the same way as in ``upsert_draw``.
    """
    data = [
        (
            int(r["drw_no"]), str(r["drw_date"]),
            int(r["n1"]), int(r["n2"]), int(r["n3"]),
            int(r["n4"]), int(r["n5"]), int(r["n6"]),
            int(r["bonus"]),
        )
        for r in rows
    ]
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.executemany(
            """
            INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
                drw_date=excluded.drw_date,
                n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
                n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
                bonus=excluded.bonus,
                updated_at=datetime('now')
            """,
            data,
        )
def get_latest_draw() -> Optional[Dict[str, Any]]:
    """Return the draw with the highest ``drw_no`` as a dict, or ``None`` if the table is empty."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        r = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone()
    return dict(r) if r else None
def get_draw(drw_no: int) -> Optional[Dict[str, Any]]:
    """Return the draw numbered ``drw_no`` as a dict, or ``None`` when it is not stored."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        r = conn.execute("SELECT * FROM draws WHERE drw_no = ?", (drw_no,)).fetchone()
    return dict(r) if r else None
def count_draws() -> int:
    """Return the number of rows in the ``draws`` table."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        r = conn.execute("SELECT COUNT(*) AS c FROM draws").fetchone()
    return int(r["c"])
def get_all_draw_numbers():
    """Return ``(drw_no, [n1, ..., n6])`` tuples for every draw, ordered by ``drw_no`` ascending."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            "SELECT drw_no, n1, n2, n3, n4, n5, n6 FROM draws ORDER BY drw_no ASC"
        ).fetchall()
    number_columns = ("n1", "n2", "n3", "n4", "n5", "n6")
    return [
        (int(r["drw_no"]), [int(r[column]) for column in number_columns])
        for r in rows
    ]
# ---------- ✅ recommendation helpers ----------
def _canonical_params(params: dict) -> str:
return json.dumps(params, sort_keys=True, separators=(",", ":"))
def _numbers_sorted_str(numbers: List[int]) -> str:
return ",".join(str(x) for x in sorted(numbers))
def _dedup_hash(based_on_draw: Optional[int], numbers: List[int], params: dict) -> str:
    """Return the SHA-1 hex digest identifying a recommendation.

    The digest covers the draw number, the sorted numbers and the canonical
    params, joined by ``|``. A falsy ``based_on_draw`` (``None`` or ``0``)
    contributes an empty field.
    """
    draw_field = based_on_draw or ""
    payload = "|".join(
        (str(draw_field), _numbers_sorted_str(numbers), _canonical_params(params))
    )
    digest = hashlib.sha1(payload.encode("utf-8"))
    return digest.hexdigest()
def save_recommendation_dedup(based_on_draw: Optional[int], numbers: List[int], params: dict) -> Dict[str, Any]:
    """Store a recommendation unless an identical one already exists.

    Two recommendations are identical when numbers, params and
    ``based_on_draw`` produce the same dedup hash.

    Returns:
        ``{"id", "saved", "deduped"}``: ``saved`` is True for a new row;
        ``deduped`` is True when the id of an existing row is returned.

    Raises:
        sqlite3.IntegrityError: If the insert violates a constraint and no
            row with the same dedup hash exists.
    """
    ns = _numbers_sorted_str(numbers)
    h = _dedup_hash(based_on_draw, numbers, params)
    lookup_sql = "SELECT id FROM recommendations WHERE dedup_hash = ?"
    with closing(_conn()) as conn:
        existing = conn.execute(lookup_sql, (h,)).fetchone()
        if existing is None:
            try:
                with conn:
                    cur = conn.execute(
                        """
                        INSERT INTO recommendations (based_on_draw, numbers, params, numbers_sorted, dedup_hash)
                        VALUES (?, ?, ?, ?, ?)
                        """,
                        (based_on_draw, json.dumps(numbers), json.dumps(params), ns, h),
                    )
                return {"id": int(cur.lastrowid), "saved": True, "deduped": False}
            except sqlite3.IntegrityError:
                # Another writer may have inserted the same hash between the
                # lookup and the insert; fall back to returning that row.
                existing = conn.execute(lookup_sql, (h,)).fetchone()
                if existing is None:
                    raise
        return {"id": int(existing["id"]), "saved": False, "deduped": True}
def list_recommendations_ex(
    limit: int = 30,
    offset: int = 0,
    favorite: Optional[bool] = None,
    tag: Optional[str] = None,
    q: Optional[str] = None,
    sort: str = "id_desc",  # id_desc|created_desc|favorite_desc
) -> List[Dict[str, Any]]:
    """List stored recommendations with optional filters, sorting and paging.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.
        favorite: When not None, keep only rows whose favorite flag matches.
        tag: When truthy, keep rows whose stored ``tags`` text contains it.
        q: When truthy, keep rows whose ``note`` contains it.
        sort: ``created_desc``, ``favorite_desc`` or anything else for id descending.

    Returns:
        Dicts with ``numbers``, ``params`` and ``tags`` decoded from JSON.
    """
    where = []
    args: List[Any] = []
    if favorite is not None:
        where.append("favorite = ?")
        args.append(1 if favorite else 0)
    if q:
        where.append("note LIKE ?")
        args.append(f"%{q}%")
    # tags is a JSON string, so a plain LIKE is used (deliberately lightweight).
    if tag:
        patterns = [f"%{tag}%"]
        # update_recommendation stores tags with json.dumps, which escapes
        # non-ASCII characters; also match the escaped spelling so such tags
        # can be found.
        escaped = json.dumps(tag)[1:-1]
        if escaped != tag:
            patterns.append(f"%{escaped}%")
        where.append("(" + " OR ".join("tags LIKE ?" for _ in patterns) + ")")
        args.extend(patterns)
    where_sql = ("WHERE " + " AND ".join(where)) if where else ""

    if sort == "created_desc":
        order = "created_at DESC"
    elif sort == "favorite_desc":
        # favorite(1) rows first, newest first within each group.
        order = "favorite DESC, id DESC"
    else:
        order = "id DESC"

    sql = f"""
        SELECT id, created_at, based_on_draw, numbers, params, favorite, note, tags
        FROM recommendations
        {where_sql}
        ORDER BY {order}
        LIMIT ? OFFSET ?
    """
    args.extend([int(limit), int(offset)])
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(sql, args).fetchall()
    return [
        {
            "id": int(r["id"]),
            "created_at": r["created_at"],
            "based_on_draw": r["based_on_draw"],
            "numbers": json.loads(r["numbers"]),
            "params": json.loads(r["params"]),
            "favorite": bool(r["favorite"]) if r["favorite"] is not None else False,
            "note": r["note"],
            "tags": json.loads(r["tags"]) if r["tags"] else [],
        }
        for r in rows
    ]
def update_recommendation(rec_id: int, favorite: Optional[bool] = None, note: Optional[str] = None, tags: Optional[List[str]] = None) -> bool:
    """Update the favorite flag, note and/or tags of one recommendation.

    Arguments left as ``None`` are not modified.

    Returns:
        True when a row was updated; False when nothing was requested or
        ``rec_id`` matched no row.
    """
    fields = []
    args: List[Any] = []
    if favorite is not None:
        fields.append("favorite = ?")
        args.append(1 if favorite else 0)
    if note is not None:
        fields.append("note = ?")
        args.append(note)
    if tags is not None:
        fields.append("tags = ?")
        args.append(json.dumps(tags))
    if not fields:
        return False
    args.append(rec_id)
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute(
            f"UPDATE recommendations SET {', '.join(fields)} WHERE id = ?",
            args,
        )
        return cur.rowcount > 0
def delete_recommendation(rec_id: int) -> bool:
    """Delete the recommendation with id ``rec_id``; return True when a row was removed."""
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute("DELETE FROM recommendations WHERE id = ?", (rec_id,))
        return cur.rowcount > 0
def get_recommendation_performance() -> Dict[str, Any]:
    """Summarise the results of recommendations that have been scored (``checked = 1``).

    Returns:
        Dict with the number of checked rows, the mean match count, the
        distribution of match counts 0..6, the share of rows with 3+ and 4+
        matches, counts per prize rank, and a comparison against the random
        expectation of 0.8 matches. All figures are zero when nothing has
        been checked yet.
    """
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            "SELECT correct_count, rank FROM recommendations WHERE checked = 1"
        ).fetchall()

    if not rows:
        return {
            "total_checked": 0,
            "avg_correct": 0.0,
            "distribution": {str(i): 0 for i in range(7)},
            "rate_3plus": 0.0,
            "rate_4plus": 0.0,
            "by_rank": {"rank_1": 0, "rank_2": 0, "rank_3": 0, "rank_4": 0, "rank_5": 0, "no_prize": 0},
            "vs_random": {"our_avg": 0.0, "random_avg": 0.8, "improvement_pct": 0.0},
        }

    total = len(rows)
    # NULL columns count as zero matches / no prize.
    corrects = [r["correct_count"] or 0 for r in rows]
    ranks = [r["rank"] or 0 for r in rows]
    avg_correct = sum(corrects) / total
    RANDOM_AVG = 0.8  # theoretical expectation: 6 * (6/45)
    improvement = (avg_correct - RANDOM_AVG) / RANDOM_AVG * 100

    by_rank = {f"rank_{n}": ranks.count(n) for n in range(1, 6)}
    by_rank["no_prize"] = ranks.count(0)

    return {
        "total_checked": total,
        "avg_correct": round(avg_correct, 3),
        "distribution": {str(i): corrects.count(i) for i in range(7)},
        "rate_3plus": round(sum(1 for c in corrects if c >= 3) / total, 4),
        "rate_4plus": round(sum(1 for c in corrects if c >= 4) / total, 4),
        "by_rank": by_rank,
        "vs_random": {
            "our_avg": round(avg_correct, 3),
            "random_avg": RANDOM_AVG,
            "improvement_pct": round(improvement, 1),
        },
    }
def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has_bonus: bool) -> bool:
    """Store the scoring result of one recommendation and mark it as checked.

    Returns:
        True when a row with id ``rec_id`` was updated.
    """
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute(
            """
            UPDATE recommendations
            SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1
            WHERE id = ?
            """,
            (rank, correct_count, 1 if has_bonus else 0, rec_id),
        )
        return cur.rowcount > 0
# ── 시뮬레이션 CRUD ─────────────────────────────────────────────────────────
def save_simulation_run(
    strategy: str,
    total_generated: int,
    top_k_selected: int,
    avg_score: float,
    notes: str = "",
) -> int:
    """Record one simulation run and return the id of the new row.

    ``avg_score`` is rounded to six decimal places before it is stored.
    """
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute(
            """
            INSERT INTO simulation_runs (strategy, total_generated, top_k_selected, avg_score, notes)
            VALUES (?, ?, ?, ?, ?)
            """,
            (strategy, total_generated, top_k_selected, round(avg_score, 6), notes),
        )
        return int(cur.lastrowid)
def save_simulation_candidates_bulk(
    run_id: int,
    candidates: List[Dict[str, Any]],
    based_on_draw: Optional[int],
) -> None:
    """Bulk-insert the top candidates of a run into ``simulation_candidates``.

    Args:
        run_id: Id of the owning ``simulation_runs`` row.
        candidates: Items shaped like
            ``{"numbers": [...], "score_total": ..., "score_*": ..., "is_best": bool}``.
            Numbers are stored sorted; missing ``score_*`` keys become NULL.
        based_on_draw: Draw number stored with every candidate.
    """
    data = [
        (
            run_id,
            json.dumps(sorted(c["numbers"])),
            c["score_total"],
            c.get("score_frequency"),
            c.get("score_fingerprint"),
            c.get("score_gap"),
            c.get("score_cooccur"),
            c.get("score_diversity"),
            1 if c.get("is_best") else 0,
            based_on_draw,
        )
        for c in candidates
    ]
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.executemany(
            """
            INSERT INTO simulation_candidates
                (run_id, numbers, score_total, score_frequency, score_fingerprint,
                 score_gap, score_cooccur, score_diversity, is_best, based_on_draw)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            data,
        )
def replace_best_picks(
    picks: List[Dict[str, Any]],
    run_id: int,
    based_on_draw: Optional[int],
) -> None:
    """Deactivate the currently active best picks and insert ``picks`` as the new active set.

    Args:
        picks: Items shaped like
            ``{"numbers": [...], "score_total": ..., "rank_in_run": int}``.
        run_id: Stored as ``source_run_id`` on every new row.
        based_on_draw: Draw number stored with every new row.
    """
    data = [
        (
            json.dumps(sorted(p["numbers"])),
            p["score_total"],
            p.get("rank_in_run"),
            run_id,
            based_on_draw,
        )
        for p in picks
    ]
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute("UPDATE best_picks SET is_active = 0 WHERE is_active = 1")
        conn.executemany(
            """
            INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active)
            VALUES (?, ?, ?, ?, ?, 1)
            """,
            data,
        )
def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to ``limit`` active best picks, highest ``score_total`` first.

    ``numbers`` is decoded from its stored JSON text.
    """
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at
            FROM best_picks
            WHERE is_active = 1
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [
        {
            "id": int(r["id"]),
            "numbers": json.loads(r["numbers"]),
            "score_total": r["score_total"],
            "rank_in_run": r["rank_in_run"],
            "source_run_id": r["source_run_id"],
            "based_on_draw": r["based_on_draw"],
            "created_at": r["created_at"],
        }
        for r in rows
    ]
def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` simulation runs as dicts, newest id first."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            """
            SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes
            FROM simulation_runs
            ORDER BY id DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [dict(r) for r in rows]
def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Return up to ``limit`` candidates of run ``run_id``, highest ``score_total`` first.

    ``numbers`` is decoded from its stored JSON text.
    """
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, score_frequency, score_fingerprint,
                   score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at
            FROM simulation_candidates
            WHERE run_id = ?
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (run_id, limit),
        ).fetchall()
    return [
        {**dict(r), "numbers": json.loads(r["numbers"])}
        for r in rows
    ]
# ── purchase_history CRUD ─────────────────────────────────────────────────────
def _purchase_row_to_dict(r) -> Dict[str, Any]:
keys = r.keys()
numbers_raw = r["numbers"] if "numbers" in keys else "[]"
detail_raw = r["source_detail"] if "source_detail" in keys else "{}"
results_raw = r["results"] if "results" in keys else "[]"
return {
"id": r["id"],
"draw_no": r["draw_no"],
"amount": r["amount"],
"sets": r["sets"],
"prize": r["prize"],
"note": r["note"],
"created_at": r["created_at"],
"numbers": json.loads(numbers_raw) if numbers_raw else [],
"is_real": r["is_real"] if "is_real" in keys else 1,
"source_strategy": r["source_strategy"] if "source_strategy" in keys else "manual",
"source_detail": json.loads(detail_raw) if detail_raw else {},
"checked": r["checked"] if "checked" in keys else 0,
"results": json.loads(results_raw) if results_raw else [],
"total_prize": r["total_prize"] if "total_prize" in keys else 0,
}
def add_purchase(draw_no: int, amount: int, sets: int, prize: int = 0, note: str = "",
                 numbers: Optional[list] = None, is_real: bool = True,
                 source_strategy: str = "manual", source_detail: Optional[dict] = None) -> Dict[str, Any]:
    """Insert one purchase record and return it as a dict.

    ``numbers`` and ``source_detail`` are stored as JSON text (``[]`` / ``{}``
    when omitted) and ``is_real`` as 0/1.
    """
    numbers_json = json.dumps(numbers or [], ensure_ascii=False)
    detail_json = json.dumps(source_detail or {}, ensure_ascii=False)
    is_real_int = 1 if is_real else 0
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute(
            """INSERT INTO purchase_history
               (draw_no, amount, sets, prize, note, numbers, is_real, source_strategy, source_detail)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (draw_no, amount, sets, prize, note, numbers_json, is_real_int, source_strategy, detail_json),
        )
        # Read the row back so DB-generated columns (id, created_at, ...) are included.
        row = conn.execute(
            "SELECT * FROM purchase_history WHERE rowid = ?", (cur.lastrowid,)
        ).fetchone()
    return _purchase_row_to_dict(row)
def get_purchases(draw_no: Optional[int] = None, days: Optional[int] = None,
                  is_real: Optional[bool] = None, strategy: Optional[str] = None,
                  checked: Optional[bool] = None) -> List[Dict[str, Any]]:
    """Return purchase records matching the given filters.

    Filters left as ``None`` (or ``days`` left falsy) are not applied.
    Results are ordered by ``draw_no`` then ``id``, both descending.

    Args:
        draw_no: Keep only purchases for this draw.
        days: Keep only purchases created within the last ``days`` days.
        is_real: Keep only real (True) or virtual (False) purchases.
        strategy: Keep only purchases with this ``source_strategy``.
        checked: Keep only purchases whose checked flag matches.
    """
    conditions, params = [], []
    if draw_no is not None:
        conditions.append("draw_no = ?")
        params.append(draw_no)
    if days:
        # created_at defaults to an ISO-8601 string with a 'T' separator while
        # datetime('now', ...) yields a space-separated one; comparing the raw
        # strings lets every row dated on the cutoff day through. Normalise
        # created_at with datetime() so the cutoff respects the time of day.
        conditions.append("datetime(created_at) >= datetime('now', ? || ' days')")
        params.append(f"-{days}")
    if is_real is not None:
        conditions.append("is_real = ?")
        params.append(1 if is_real else 0)
    if strategy is not None:
        conditions.append("source_strategy = ?")
        params.append(strategy)
    if checked is not None:
        conditions.append("checked = ?")
        params.append(1 if checked else 0)
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            f"SELECT * FROM purchase_history {where} ORDER BY draw_no DESC, id DESC",
            params,
        ).fetchall()
    return [_purchase_row_to_dict(r) for r in rows]
def update_purchase(purchase_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update the whitelisted fields of one purchase and return the resulting record.

    Keys of ``data`` outside the whitelist are ignored. When nothing is left
    to update the current record is returned unchanged.

    Returns:
        The purchase as a dict, or ``None`` when ``purchase_id`` does not exist.
    """
    allowed = {"draw_no", "amount", "sets", "prize", "note", "numbers", "is_real", "source_strategy"}
    updates = {k: v for k, v in data.items() if k in allowed}

    # Convert to types SQLite can store.
    if "numbers" in updates:
        updates["numbers"] = json.dumps(updates["numbers"], ensure_ascii=False)
    if "is_real" in updates:
        updates["is_real"] = 1 if updates["is_real"] else 0

    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        if updates:
            # Column names come from the whitelist above, never from the caller.
            set_clause = ", ".join(f"{k} = ?" for k in updates)
            cur = conn.execute(
                f"UPDATE purchase_history SET {set_clause} WHERE id = ?",
                list(updates.values()) + [purchase_id],
            )
            if cur.rowcount == 0:
                return None
        row = conn.execute("SELECT * FROM purchase_history WHERE id = ?", (purchase_id,)).fetchone()
    return _purchase_row_to_dict(row) if row else None
def delete_purchase(purchase_id: int) -> bool:
    """Delete the purchase with id ``purchase_id``; return True when a row was removed."""
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        cur = conn.execute("DELETE FROM purchase_history WHERE id = ?", (purchase_id,))
        return cur.rowcount > 0
def get_purchase_stats() -> Dict[str, Any]:
    """Aggregate investment and prize statistics over all purchase records.

    Returns:
        Dict with ``total`` / ``real`` / ``virtual`` group summaries, a
        per-strategy summary under ``by_strategy`` (including
        ``avg_correct`` and ``hits_3plus`` taken from each row's stored
        ``results``), and flat legacy keys kept for backward compatibility.
    """

    def _prize_of(r):
        # Prefer total_prize; fall back to prize when it is zero or missing.
        return r.get("total_prize") or r["prize"]

    def _calc_group(rows):
        # Summary of one group of purchase rows.
        if not rows:
            return {"sets": 0, "invested": 0, "prize": 0, "roi": 0.0, "win_rate": 0.0}
        invested = sum(r["amount"] for r in rows)
        prize = sum(_prize_of(r) for r in rows)
        wins = sum(1 for r in rows if _prize_of(r) > 0)
        return {
            "sets": sum(r["sets"] for r in rows),
            "invested": invested,
            "prize": prize,
            "roi": round((prize / invested * 100 - 100) if invested else 0.0, 2),
            "win_rate": round(wins / len(rows) * 100, 2),
        }

    def _parse_results(raw):
        # Decode the stored results; undecodable text counts as no results.
        if not isinstance(raw, str):
            return raw or []
        try:
            return json.loads(raw)
        except (ValueError, TypeError):
            return []

    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        all_rows = [dict(r) for r in conn.execute("SELECT * FROM purchase_history").fetchall()]

    real_rows = [r for r in all_rows if r.get("is_real", 1) == 1]
    virtual_rows = [r for r in all_rows if r.get("is_real", 1) == 0]

    # Group rows per strategy.
    by_strategy: Dict[str, list] = {}
    for r in all_rows:
        by_strategy.setdefault(r.get("source_strategy", "manual"), []).append(r)

    strategy_stats: Dict[str, Any] = {}
    for strat, srows in by_strategy.items():
        s = _calc_group(srows)
        total_correct = 0
        count_sets = 0
        hits_3plus = 0
        for r in srows:
            for res in _parse_results(r.get("results", "[]")):
                count_sets += 1
                c = res.get("correct", 0)
                total_correct += c
                if c >= 3:
                    hits_3plus += 1
        s["avg_correct"] = round(total_correct / count_sets, 2) if count_sets else 0.0
        s["hits_3plus"] = hits_3plus
        strategy_stats[strat] = s

    total_invested = sum(r["amount"] for r in all_rows)
    total_prize_sum = sum(_prize_of(r) for r in all_rows)

    return {
        "total": _calc_group(all_rows),
        "real": _calc_group(real_rows),
        "virtual": _calc_group(virtual_rows),
        "by_strategy": strategy_stats,
        # Backward-compatible flat keys.
        "total_records": len(all_rows),
        "total_invested": total_invested,
        "total_prize": total_prize_sum,
        "net": total_prize_sum - total_invested,
        "return_rate": round((total_prize_sum / total_invested * 100) if total_invested else 0.0, 2),
        "prize_count": sum(1 for r in all_rows if _prize_of(r) > 0),
        "max_prize": max((_prize_of(r) for r in all_rows), default=0),
    }
# ── weekly_reports CRUD ───────────────────────────────────────────────────────
def save_weekly_report(drw_no: int, report_json: str) -> None:
    """Store the report text for draw ``drw_no``, replacing any existing report and refreshing ``generated_at``."""
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute(
            """
            INSERT INTO weekly_reports (drw_no, report)
            VALUES (?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
                report = excluded.report,
                generated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
            """,
            (drw_no, report_json),
        )
def get_weekly_report_list(limit: int = 10) -> List[Dict[str, Any]]:
    """Return ``drw_no`` and ``generated_at`` of up to ``limit`` reports, highest draw number first."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            "SELECT drw_no, generated_at FROM weekly_reports ORDER BY drw_no DESC LIMIT ?",
            (limit,),
        ).fetchall()
    return [dict(r) for r in rows]
def get_weekly_report(drw_no: int) -> Optional[Dict[str, Any]]:
    """Return the stored report for draw ``drw_no``, or ``None`` when there is none.

    The decoded report is merged over ``drw_no`` and ``generated_at``, so
    keys of the same name inside the report take precedence.
    """
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        row = conn.execute(
            "SELECT drw_no, report, generated_at FROM weekly_reports WHERE drw_no = ?",
            (drw_no,),
        ).fetchone()
    if not row:
        return None
    return {"drw_no": row["drw_no"], "generated_at": row["generated_at"], **json.loads(row["report"])}
def get_all_recommendation_numbers() -> List[List[int]]:
    """Return the numbers of every stored recommendation, newest id first (used for personal pattern analysis)."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute("SELECT numbers FROM recommendations ORDER BY id DESC").fetchall()
    return [json.loads(r["numbers"]) for r in rows]
# ── strategy_performance CRUD ─────────────────────────────────────────────────
def upsert_strategy_performance(strategy: str, draw_no: int, sets_count: int = 0,
                                total_correct: int = 0, max_correct: int = 0,
                                prize_total: int = 0, avg_score: float = 0.0) -> None:
    """Insert or overwrite the performance row for ``(strategy, draw_no)`` and refresh ``updated_at``."""
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute(
            """INSERT INTO strategy_performance (strategy, draw_no, sets_count, total_correct, max_correct, prize_total, avg_score)
               VALUES (?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(strategy, draw_no) DO UPDATE SET
                   sets_count=excluded.sets_count, total_correct=excluded.total_correct,
                   max_correct=excluded.max_correct, prize_total=excluded.prize_total,
                   avg_score=excluded.avg_score,
                   updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')""",
            (strategy, draw_no, sets_count, total_correct, max_correct, prize_total, avg_score),
        )
def get_strategy_performance(strategy: Optional[str] = None, days: Optional[int] = None) -> List[Dict[str, Any]]:
    """Return strategy performance rows ordered by ``draw_no`` ascending.

    Args:
        strategy: When truthy, keep only rows of this strategy.
        days: When truthy, keep only rows updated within the last ``days`` days.
    """
    conditions, params = [], []
    if strategy:
        conditions.append("strategy = ?")
        params.append(strategy)
    if days:
        # updated_at is written as an ISO-8601 string with a 'T' separator
        # while datetime('now', ...) yields a space-separated one; normalise
        # with datetime() so the cutoff respects the time of day.
        conditions.append("datetime(updated_at) >= datetime('now', ? || ' days')")
        params.append(f"-{days}")
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute(
            f"SELECT * FROM strategy_performance {where} ORDER BY draw_no ASC",
            params,
        ).fetchall()
    return [dict(r) for r in rows]
# ── strategy_weights CRUD ─────────────────────────────────────────────────────
def get_strategy_weights() -> List[Dict[str, Any]]:
    """Return every ``strategy_weights`` row as a dict, highest weight first."""
    # closing() makes sure the connection is released; `with conn` alone would not.
    with closing(_conn()) as conn:
        rows = conn.execute("SELECT * FROM strategy_weights ORDER BY weight DESC").fetchall()
    return [dict(r) for r in rows]
def update_strategy_weight(strategy: str, weight: float, ema_score: float,
                           total_sets: Optional[int] = None, total_hits_3plus: Optional[int] = None) -> None:
    """Update weight and EMA score of ``strategy`` and refresh ``updated_at``.

    ``total_sets`` and ``total_hits_3plus`` are only written when given.
    Nothing happens when no row exists for ``strategy``.
    """
    assignments = ["weight=?", "ema_score=?", "updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')"]
    params: List[Any] = [weight, ema_score]
    if total_sets is not None:
        assignments.append("total_sets=?")
        params.append(total_sets)
    if total_hits_3plus is not None:
        assignments.append("total_hits_3plus=?")
        params.append(total_hits_3plus)
    params.append(strategy)
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute(
            f"UPDATE strategy_weights SET {', '.join(assignments)} WHERE strategy=?",
            params,
        )
def update_purchase_results(purchase_id: int, results: list, total_prize: int) -> None:
    """Store the checked results and total prize of one purchase and mark it as checked.

    ``results`` is stored as JSON text.
    """
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        conn.execute(
            "UPDATE purchase_history SET results=?, total_prize=?, checked=1 WHERE id=?",
            (json.dumps(results, ensure_ascii=False), total_prize, purchase_id),
        )
def bulk_insert_purchases_from_briefing(draw_no: int, tier_mode: str, amount: int) -> Dict[str, Any]:
    """Insert the curator picks selected by ``tier_mode`` into ``purchase_history``.

    One row is inserted per pick, with one set each, marked as a real
    purchase and tagged ``curator_<tier>``.

    Args:
        draw_no: Draw whose briefing supplies the picks.
        tier_mode: ``"core"`` | ``"core_bonus"`` | ``"core_bonus_extended"`` | ``"full"``.
        amount: NOTE(review): accepted but not used; every row is stored with
            a fixed amount of 1000. Confirm the intent with the callers.

    Returns:
        ``{"ok": True, "inserted_ids": [...], "sets": n}`` on success, or
        ``{"ok": False, "reason": ...}`` when the briefing is missing or
        ``tier_mode`` is unknown.
    """
    briefing = get_briefing(draw_no)
    if not briefing:
        return {"ok": False, "reason": "briefing not found"}

    picks = briefing.get("picks") or {}
    if isinstance(picks, list):
        # Shape used before the four-tier migration.
        picks = {"core": picks, "bonus": [], "extended": [], "pool": []}

    tier_chain = {
        "core": ["core"],
        "core_bonus": ["core", "bonus"],
        "core_bonus_extended": ["core", "bonus", "extended"],
        "full": ["core", "bonus", "extended", "pool"],
    }.get(tier_mode)
    if not tier_chain:
        return {"ok": False, "reason": f"unknown tier_mode: {tier_mode}"}

    inserted_ids = []
    # closing() releases the handle; the inner `conn` context commits or rolls back.
    with closing(_conn()) as conn, conn:
        for tier in tier_chain:
            for idx, pick in enumerate(picks.get(tier) or []):
                source_strategy = f"curator_{tier}"
                source_detail = json.dumps({
                    "tier": tier,
                    "role": pick.get("risk_tag"),
                    "set_index": idx,
                    "draw_no": draw_no,
                }, ensure_ascii=False)
                # The numbers column holds a list of sets; this row has one set.
                numbers_json = json.dumps([pick.get("numbers")], ensure_ascii=False)
                cur = conn.execute(
                    """INSERT INTO purchase_history
                       (draw_no, amount, sets, prize, note, numbers, is_real, source_strategy, source_detail)
                       VALUES (?, ?, 1, 0, '', ?, 1, ?, ?)""",
                    (draw_no, 1000, numbers_json, source_strategy, source_detail),
                )
                inserted_ids.append(cur.lastrowid)
    return {"ok": True, "inserted_ids": inserted_ids, "sets": len(inserted_ids)}
# --- Lotto Briefings ---
def save_briefing(data: Dict[str, Any]) -> int:
    """Insert or update the briefing for ``data["draw_no"]`` and return its row id.

    Required keys: ``draw_no``, ``picks``, ``narrative``, ``confidence``,
    ``model``. Optional keys: ``tokens_input``, ``tokens_output``,
    ``cache_read``, ``cache_write``, ``latency_ms`` (default 0), ``source``
    (default ``"auto"``) and ``tier_rationale`` (default ``{}``).

    ``picks``, ``narrative`` and ``tier_rationale`` are stored as JSON text.
    When a briefing for the same ``draw_no`` already exists, its columns are
    overwritten and ``generated_at`` is reset to the current local time.

    Returns:
        The ``id`` of the inserted or updated ``lotto_briefings`` row.

    Raises:
        KeyError: If a required key is missing from ``data``.
    """
    picks_json = json.dumps(data["picks"], ensure_ascii=False)
    narrative_json = json.dumps(data["narrative"], ensure_ascii=False)
    tier_rationale_json = json.dumps(data.get("tier_rationale") or {}, ensure_ascii=False)
    with _conn() as conn:
        cur = conn.execute(
            """
            INSERT INTO lotto_briefings
                (draw_no, picks, narrative, confidence, model,
                 tokens_input, tokens_output, cache_read, cache_write,
                 latency_ms, source, tier_rationale)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(draw_no) DO UPDATE SET
                picks=excluded.picks,
                narrative=excluded.narrative,
                confidence=excluded.confidence,
                model=excluded.model,
                tokens_input=excluded.tokens_input,
                tokens_output=excluded.tokens_output,
                cache_read=excluded.cache_read,
                cache_write=excluded.cache_write,
                latency_ms=excluded.latency_ms,
                source=excluded.source,
                tier_rationale=excluded.tier_rationale,
                generated_at=datetime('now','localtime')
            """,
            (
                data["draw_no"], picks_json, narrative_json,
                data["confidence"], data["model"],
                data.get("tokens_input", 0), data.get("tokens_output", 0),
                data.get("cache_read", 0), data.get("cache_write", 0),
                data.get("latency_ms", 0), data.get("source", "auto"),
                tier_rationale_json,
            ),
        )
        # When the upsert takes the UPDATE path no row is inserted, so
        # lastrowid does not identify the briefing (it is 0/stale on a fresh
        # connection). Read the id back by its conflict key instead.
        id_row = conn.execute(
            "SELECT id FROM lotto_briefings WHERE draw_no=?",
            (data["draw_no"],),
        ).fetchone()
        # Fall back to lastrowid only if the lookup unexpectedly finds nothing.
        return id_row[0] if id_row is not None else cur.lastrowid
def _briefing_row(r) -> Dict[str, Any]:
return {
"id": r["id"],
"draw_no": r["draw_no"],
"picks": json.loads(r["picks"]),
"narrative": json.loads(r["narrative"]),
"tier_rationale": json.loads(r["tier_rationale"]) if r["tier_rationale"] else {},
"confidence": r["confidence"],
"model": r["model"],
"tokens_input": r["tokens_input"],
"tokens_output": r["tokens_output"],
"cache_read": r["cache_read"],
"cache_write": r["cache_write"],
"latency_ms": r["latency_ms"],
"source": r["source"],
"generated_at": r["generated_at"],
}
def get_latest_briefing() -> Optional[Dict[str, Any]]:
    """Return the briefing with the highest ``draw_no``, or None if there is none."""
    latest_sql = "SELECT * FROM lotto_briefings ORDER BY draw_no DESC LIMIT 1"
    with _conn() as conn:
        row = conn.execute(latest_sql).fetchone()
        if not row:
            return None
        return _briefing_row(row)
def get_briefing(draw_no: int) -> Optional[Dict[str, Any]]:
    """Return the briefing stored for ``draw_no``, or None when no row matches."""
    lookup_sql = "SELECT * FROM lotto_briefings WHERE draw_no=?"
    with _conn() as conn:
        row = conn.execute(lookup_sql, (draw_no,)).fetchone()
        if not row:
            return None
        return _briefing_row(row)
def list_briefings(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` briefings, ordered by ``draw_no`` descending."""
    listing_sql = "SELECT * FROM lotto_briefings ORDER BY draw_no DESC LIMIT ?"
    with _conn() as conn:
        fetched = conn.execute(listing_sql, (limit,)).fetchall()
        return list(map(_briefing_row, fetched))
def get_curator_usage(days: int = 30) -> Dict[str, Any]:
    """Aggregate briefing token and latency usage over the last ``days`` days.

    Rows are selected by comparing ``generated_at`` against "now minus
    ``days`` days", converted to local time by SQLite.

    Returns:
        A dict with ``days`` (echoed back as given), ``calls``,
        ``tokens_input``, ``tokens_output``, ``cache_read``, ``cache_write``,
        ``cache_hit_rate`` (cache_read / (cache_read + cache_write), rounded
        to 3 decimals; 0.0 when the sum is not positive) and
        ``avg_latency_ms`` (rounded to 1 decimal). NULL aggregates count as 0.
    """
    def _int_or_zero(value) -> int:
        # SUM()/AVG() yield NULL when no rows match.
        return int(value or 0)

    usage_sql = """
        SELECT COUNT(*) AS calls,
               SUM(tokens_input) AS in_tokens,
               SUM(tokens_output) AS out_tokens,
               SUM(cache_read) AS cache_read,
               SUM(cache_write) AS cache_write,
               AVG(latency_ms) AS avg_latency
        FROM lotto_briefings
        WHERE generated_at >= datetime('now', ?, 'localtime')
    """
    with _conn() as conn:
        window = f"-{int(days)} days"
        totals = conn.execute(usage_sql, (window,)).fetchone()

        reads = _int_or_zero(totals["cache_read"])
        writes = _int_or_zero(totals["cache_write"])
        cache_total = reads + writes
        if cache_total > 0:
            hit_rate = round(reads / cache_total, 3)
        else:
            hit_rate = 0.0

        return {
            "days": days,
            "calls": _int_or_zero(totals["calls"]),
            "tokens_input": _int_or_zero(totals["in_tokens"]),
            "tokens_output": _int_or_zero(totals["out_tokens"]),
            "cache_read": reads,
            "cache_write": writes,
            "cache_hit_rate": hit_rate,
            "avg_latency_ms": round(float(totals["avg_latency"] or 0), 1),
        }
def save_review(data: Dict[str, Any]) -> int:
    """Insert or update the weekly review for ``data["draw_no"]`` and return its row id.

    ``draw_no`` is required. Every other column (``curator_avg_match``,
    ``curator_best_tier``, ``curator_best_match``, ``curator_5plus_prizes``,
    ``user_avg_match``, ``user_best_match``, ``user_5plus_prizes``,
    ``user_pattern_summary``, ``draw_pattern_summary``, ``pattern_delta``) is
    read with ``data.get`` and stored as NULL when absent. When a review for
    the same ``draw_no`` already exists, those columns are overwritten.

    Returns:
        The ``id`` of the inserted or updated ``weekly_review`` row.

    Raises:
        KeyError: If ``draw_no`` is missing from ``data``.
    """
    with _conn() as conn:
        cur = conn.execute(
            """
            INSERT INTO weekly_review (
                draw_no,
                curator_avg_match, curator_best_tier, curator_best_match, curator_5plus_prizes,
                user_avg_match, user_best_match, user_5plus_prizes,
                user_pattern_summary, draw_pattern_summary, pattern_delta
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(draw_no) DO UPDATE SET
                curator_avg_match=excluded.curator_avg_match,
                curator_best_tier=excluded.curator_best_tier,
                curator_best_match=excluded.curator_best_match,
                curator_5plus_prizes=excluded.curator_5plus_prizes,
                user_avg_match=excluded.user_avg_match,
                user_best_match=excluded.user_best_match,
                user_5plus_prizes=excluded.user_5plus_prizes,
                user_pattern_summary=excluded.user_pattern_summary,
                draw_pattern_summary=excluded.draw_pattern_summary,
                pattern_delta=excluded.pattern_delta
            """,
            (
                data["draw_no"],
                data.get("curator_avg_match"), data.get("curator_best_tier"),
                data.get("curator_best_match"), data.get("curator_5plus_prizes"),
                data.get("user_avg_match"), data.get("user_best_match"),
                data.get("user_5plus_prizes"),
                data.get("user_pattern_summary"), data.get("draw_pattern_summary"),
                data.get("pattern_delta"),
            ),
        )
        # When the upsert takes the UPDATE path no row is inserted, so
        # lastrowid does not identify the review (it is 0/stale on a fresh
        # connection). Read the id back by its conflict key instead.
        id_row = conn.execute(
            "SELECT id FROM weekly_review WHERE draw_no=?",
            (data["draw_no"],),
        ).fetchone()
        # Fall back to lastrowid only if the lookup unexpectedly finds nothing.
        return id_row[0] if id_row is not None else cur.lastrowid
def _review_row(r) -> Optional[Dict[str, Any]]:
if not r:
return None
return {
"id": r["id"],
"draw_no": r["draw_no"],
"curator_avg_match": r["curator_avg_match"],
"curator_best_tier": r["curator_best_tier"],
"curator_best_match": r["curator_best_match"],
"curator_5plus_prizes": r["curator_5plus_prizes"],
"user_avg_match": r["user_avg_match"],
"user_best_match": r["user_best_match"],
"user_5plus_prizes": r["user_5plus_prizes"],
"user_pattern_summary": r["user_pattern_summary"],
"draw_pattern_summary": r["draw_pattern_summary"],
"pattern_delta": r["pattern_delta"],
"created_at": r["created_at"],
}
def get_review(draw_no: int) -> Optional[Dict[str, Any]]:
    """Return the weekly review stored for ``draw_no``, or None when no row matches."""
    lookup_sql = "SELECT * FROM weekly_review WHERE draw_no=?"
    with _conn() as conn:
        found = conn.execute(lookup_sql, (draw_no,)).fetchone()
        return _review_row(found)
def get_latest_review() -> Optional[Dict[str, Any]]:
    """Return the weekly review with the highest ``draw_no``, or None if there is none."""
    latest_sql = "SELECT * FROM weekly_review ORDER BY draw_no DESC LIMIT 1"
    with _conn() as conn:
        newest = conn.execute(latest_sql).fetchone()
        return _review_row(newest)
def get_reviews_range(start_drw: int, end_drw: int) -> List[Dict[str, Any]]:
    """Return weekly reviews with ``start_drw <= draw_no <= end_drw``, oldest draw first."""
    range_sql = "SELECT * FROM weekly_review WHERE draw_no BETWEEN ? AND ? ORDER BY draw_no ASC"
    with _conn() as conn:
        fetched = conn.execute(range_sql, (start_drw, end_drw)).fetchall()
        return list(map(_review_row, fetched))
def list_reviews(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` weekly reviews, ordered by ``draw_no`` descending."""
    listing_sql = "SELECT * FROM weekly_review ORDER BY draw_no DESC LIMIT ?"
    with _conn() as conn:
        fetched = conn.execute(listing_sql, (limit,)).fetchall()
        return list(map(_review_row, fetched))