# backend/app/db.py import os import sqlite3 import json import hashlib from typing import Any, Dict, Optional, List DB_PATH = "/app/data/lotto.db" def _conn() -> sqlite3.Connection: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None: cols = {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()} if col not in cols: conn.execute(ddl) def init_db() -> None: with _conn() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS draws ( drw_no INTEGER PRIMARY KEY, drw_date TEXT NOT NULL, n1 INTEGER NOT NULL, n2 INTEGER NOT NULL, n3 INTEGER NOT NULL, n4 INTEGER NOT NULL, n5 INTEGER NOT NULL, n6 INTEGER NOT NULL, bonus INTEGER NOT NULL, updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_draws_date ON draws(drw_date);") conn.execute( """ CREATE TABLE IF NOT EXISTS recommendations ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL DEFAULT (datetime('now')), based_on_draw INTEGER, numbers TEXT NOT NULL, params TEXT NOT NULL ); """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_created ON recommendations(created_at DESC);") # ✅ 확장 컬럼들(기존 DB에도 자동 추가) _ensure_column(conn, "recommendations", "numbers_sorted", "ALTER TABLE recommendations ADD COLUMN numbers_sorted TEXT;") _ensure_column(conn, "recommendations", "dedup_hash", "ALTER TABLE recommendations ADD COLUMN dedup_hash TEXT;") _ensure_column(conn, "recommendations", "favorite", "ALTER TABLE recommendations ADD COLUMN favorite INTEGER NOT NULL DEFAULT 0;") _ensure_column(conn, "recommendations", "note", "ALTER TABLE recommendations ADD COLUMN note TEXT NOT NULL DEFAULT '';") _ensure_column(conn, "recommendations", "tags", "ALTER TABLE recommendations ADD COLUMN tags TEXT NOT NULL DEFAULT '[]';") # ✅ 결과 채점용 컬럼 추가 _ensure_column(conn, "recommendations", "rank", "ALTER TABLE recommendations ADD COLUMN rank INTEGER;") _ensure_column(conn, "recommendations", "correct_count", "ALTER TABLE recommendations ADD COLUMN correct_count INTEGER DEFAULT 0;") _ensure_column(conn, "recommendations", "has_bonus", "ALTER TABLE recommendations ADD COLUMN has_bonus INTEGER DEFAULT 0;") _ensure_column(conn, "recommendations", "checked", "ALTER TABLE recommendations ADD COLUMN checked INTEGER DEFAULT 0;") # ✅ UNIQUE 인덱스(중복 저장 방지) conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);") # ── 시뮬레이션 테이블 ───────────────────────────────────────────────── conn.execute( """ CREATE TABLE IF NOT EXISTS simulation_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_at TEXT NOT NULL DEFAULT (datetime('now')), strategy TEXT NOT NULL DEFAULT 'monte_carlo', total_generated INTEGER NOT NULL DEFAULT 0, top_k_selected INTEGER NOT NULL DEFAULT 0, avg_score REAL, notes TEXT DEFAULT '' ); """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);" ) conn.execute( """ CREATE TABLE IF NOT EXISTS simulation_candidates ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER NOT NULL, numbers TEXT NOT NULL, score_total REAL NOT NULL, score_frequency REAL, score_fingerprint REAL, score_gap REAL, score_cooccur REAL, score_diversity REAL, is_best INTEGER DEFAULT 0, based_on_draw INTEGER, created_at TEXT NOT NULL DEFAULT (datetime('now')), FOREIGN KEY(run_id) REFERENCES simulation_runs(id) ); """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_simcand_run " "ON simulation_candidates(run_id, score_total DESC);" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_simcand_best " "ON simulation_candidates(is_best, score_total DESC);" ) conn.execute( """ CREATE TABLE IF NOT EXISTS best_picks ( id INTEGER PRIMARY KEY AUTOINCREMENT, numbers TEXT NOT NULL, score_total REAL NOT NULL, rank_in_run INTEGER, source_run_id INTEGER, based_on_draw INTEGER, is_active INTEGER DEFAULT 1, created_at TEXT NOT NULL DEFAULT (datetime('now')), FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id) ); """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_bestpicks_active " "ON best_picks(is_active, score_total DESC);" ) # ── todos 테이블 ─────────────────────────────────────────────────────── conn.execute( """ CREATE TABLE IF NOT EXISTS todos ( id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2)))), title TEXT NOT NULL, description TEXT, status TEXT NOT NULL DEFAULT 'todo' CHECK(status IN ('todo','in_progress','done')), created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ); """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_todos_created ON todos(created_at DESC);" ) # ── blog_posts 테이블 ────────────────────────────────────────────────── conn.execute( """ CREATE TABLE IF NOT EXISTS blog_posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, body TEXT NOT NULL DEFAULT '', excerpt TEXT NOT NULL DEFAULT '', tags TEXT NOT NULL DEFAULT '[]', date TEXT NOT NULL DEFAULT (date('now','localtime')), created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ); """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_blog_date ON blog_posts(date DESC);" ) # ── todos CRUD ─────────────────────────────────────────────────────────────── def _todo_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "title": r["title"], "description": r["description"], "status": r["status"], "created_at": r["created_at"], "updated_at": r["updated_at"], } def get_all_todos() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM todos ORDER BY created_at DESC" ).fetchall() return [_todo_row_to_dict(r) for r in rows] def create_todo(title: str, description: Optional[str], status: str) -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO todos (title, description, status) VALUES (?, ?, ?)", (title, description, status), ) row = conn.execute( "SELECT * FROM todos WHERE rowid = last_insert_rowid()" ).fetchone() return _todo_row_to_dict(row) def update_todo(todo_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]: """fields에 있는 항목만 업데이트 (PATCH 방식), updated_at 자동 갱신""" allowed = {"title", "description", "status"} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: with _conn() as conn: row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone() return _todo_row_to_dict(row) if row else None set_clauses = ", ".join(f"{k} = ?" for k in updates) set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')" args = list(updates.values()) + [todo_id] with _conn() as conn: conn.execute( f"UPDATE todos SET {set_clauses} WHERE id = ?", args, ) row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone() return _todo_row_to_dict(row) if row else None def delete_todo(todo_id: str) -> bool: with _conn() as conn: cur = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,)) return cur.rowcount > 0 def delete_done_todos() -> int: with _conn() as conn: cur = conn.execute("DELETE FROM todos WHERE status = 'done'") return cur.rowcount # ── blog_posts CRUD ────────────────────────────────────────────────────────── def _post_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "title": r["title"], "body": r["body"], "excerpt": r["excerpt"], "tags": json.loads(r["tags"]) if r["tags"] else [], "date": r["date"], "created_at": r["created_at"], "updated_at": r["updated_at"], } def get_all_posts() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM blog_posts ORDER BY date DESC, id DESC" ).fetchall() return [_post_row_to_dict(r) for r in rows] def create_post(title: str, body: str, excerpt: str, tags: List[str], date: str) -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO blog_posts (title, body, excerpt, tags, date) VALUES (?, ?, ?, ?, ?)", (title, body, excerpt, json.dumps(tags), date), ) row = conn.execute( "SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()" ).fetchone() return _post_row_to_dict(row) def update_post(post_id: int, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]: allowed = {"title", "body", "excerpt", "tags", "date"} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: with _conn() as conn: row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone() return _post_row_to_dict(row) if row else None if "tags" in updates: updates["tags"] = json.dumps(updates["tags"]) set_clauses = ", ".join(f"{k} = ?" for k in updates) set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')" args = list(updates.values()) + [post_id] with _conn() as conn: conn.execute(f"UPDATE blog_posts SET {set_clauses} WHERE id = ?", args) row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone() return _post_row_to_dict(row) if row else None def delete_post(post_id: int) -> bool: with _conn() as conn: cur = conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,)) return cur.rowcount > 0 def upsert_draw(row: Dict[str, Any]) -> None: with _conn() as conn: conn.execute( """ INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(drw_no) DO UPDATE SET drw_date=excluded.drw_date, n1=excluded.n1, n2=excluded.n2, n3=excluded.n3, n4=excluded.n4, n5=excluded.n5, n6=excluded.n6, bonus=excluded.bonus, updated_at=datetime('now') """, ( int(row["drw_no"]), str(row["drw_date"]), int(row["n1"]), int(row["n2"]), int(row["n3"]), int(row["n4"]), int(row["n5"]), int(row["n6"]), int(row["bonus"]), ), ) def upsert_many_draws(rows: List[Dict[str, Any]]) -> None: data = [ ( int(r["drw_no"]), str(r["drw_date"]), int(r["n1"]), int(r["n2"]), int(r["n3"]), int(r["n4"]), int(r["n5"]), int(r["n6"]), int(r["bonus"]) ) for r in rows ] with _conn() as conn: conn.executemany( """ INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(drw_no) DO UPDATE SET drw_date=excluded.drw_date, n1=excluded.n1, n2=excluded.n2, n3=excluded.n3, n4=excluded.n4, n5=excluded.n5, n6=excluded.n6, bonus=excluded.bonus, updated_at=datetime('now') """, data ) def get_latest_draw() -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone() return dict(r) if r else None def get_draw(drw_no: int) -> Optional[Dict[str, Any]]: with _conn() as conn: r = conn.execute("SELECT * FROM draws WHERE drw_no = ?", (drw_no,)).fetchone() return dict(r) if r else None def count_draws() -> int: with _conn() as conn: r = conn.execute("SELECT COUNT(*) AS c FROM draws").fetchone() return int(r["c"]) def get_all_draw_numbers(): with _conn() as conn: rows = conn.execute( "SELECT drw_no, n1, n2, n3, n4, n5, n6 FROM draws ORDER BY drw_no ASC" ).fetchall() return [(int(r["drw_no"]), [int(r["n1"]), int(r["n2"]), int(r["n3"]), int(r["n4"]), int(r["n5"]), int(r["n6"])]) for r in rows] # ---------- ✅ recommendation helpers ---------- def _canonical_params(params: dict) -> str: return json.dumps(params, sort_keys=True, separators=(",", ":")) def _numbers_sorted_str(numbers: List[int]) -> str: return ",".join(str(x) for x in sorted(numbers)) def _dedup_hash(based_on_draw: Optional[int], numbers: List[int], params: dict) -> str: s = f"{based_on_draw or ''}|{_numbers_sorted_str(numbers)}|{_canonical_params(params)}" return hashlib.sha1(s.encode("utf-8")).hexdigest() def save_recommendation_dedup(based_on_draw: Optional[int], numbers: List[int], params: dict) -> Dict[str, Any]: """ ✅ 동일 추천(번호+params+based_on_draw)이면 중복 저장 없이 기존 id 반환 """ ns = _numbers_sorted_str(numbers) h = _dedup_hash(based_on_draw, numbers, params) with _conn() as conn: # 이미 있으면 반환 r = conn.execute("SELECT id FROM recommendations WHERE dedup_hash = ?", (h,)).fetchone() if r: return {"id": int(r["id"]), "saved": False, "deduped": True} cur = conn.execute( """ INSERT INTO recommendations (based_on_draw, numbers, params, numbers_sorted, dedup_hash) VALUES (?, ?, ?, ?, ?) """, (based_on_draw, json.dumps(numbers), json.dumps(params), ns, h), ) return {"id": int(cur.lastrowid), "saved": True, "deduped": False} def list_recommendations_ex( limit: int = 30, offset: int = 0, favorite: Optional[bool] = None, tag: Optional[str] = None, q: Optional[str] = None, sort: str = "id_desc", # id_desc|created_desc|favorite_desc ) -> List[Dict[str, Any]]: import json where = [] args: list[Any] = [] if favorite is not None: where.append("favorite = ?") args.append(1 if favorite else 0) if q: where.append("note LIKE ?") args.append(f"%{q}%") # tags는 JSON 문자열이므로 단순 LIKE로 처리(가볍게 시작) if tag: where.append("tags LIKE ?") args.append(f"%{tag}%") where_sql = ("WHERE " + " AND ".join(where)) if where else "" if sort == "created_desc": order = "created_at DESC" elif sort == "favorite_desc": # favorite(1)이 먼저, 그 다음 최신 order = "favorite DESC, id DESC" else: order = "id DESC" sql = f""" SELECT id, created_at, based_on_draw, numbers, params, favorite, note, tags FROM recommendations {where_sql} ORDER BY {order} LIMIT ? OFFSET ? """ args.extend([int(limit), int(offset)]) with _conn() as conn: rows = conn.execute(sql, args).fetchall() out = [] for r in rows: out.append({ "id": int(r["id"]), "created_at": r["created_at"], "based_on_draw": r["based_on_draw"], "numbers": json.loads(r["numbers"]), "params": json.loads(r["params"]), "favorite": bool(r["favorite"]) if r["favorite"] is not None else False, "note": r["note"], "tags": json.loads(r["tags"]) if r["tags"] else [], }) return out def update_recommendation(rec_id: int, favorite: Optional[bool] = None, note: Optional[str] = None, tags: Optional[List[str]] = None) -> bool: fields = [] args: list[Any] = [] if favorite is not None: fields.append("favorite = ?") args.append(1 if favorite else 0) if note is not None: fields.append("note = ?") args.append(note) if tags is not None: fields.append("tags = ?") args.append(json.dumps(tags)) if not fields: return False args.append(rec_id) with _conn() as conn: cur = conn.execute( f"UPDATE recommendations SET {', '.join(fields)} WHERE id = ?", args, ) return cur.rowcount > 0 def delete_recommendation(rec_id: int) -> bool: with _conn() as conn: cur = conn.execute("DELETE FROM recommendations WHERE id = ?", (rec_id,)) return cur.rowcount > 0 def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has_bonus: bool) -> bool: with _conn() as conn: cur = conn.execute( """ UPDATE recommendations SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1 WHERE id = ? """, (rank, correct_count, 1 if has_bonus else 0, rec_id) ) return cur.rowcount > 0 # ── 시뮬레이션 CRUD ───────────────────────────────────────────────────────── def save_simulation_run( strategy: str, total_generated: int, top_k_selected: int, avg_score: float, notes: str = "", ) -> int: """시뮬레이션 실행 기록 저장, 생성된 ID 반환""" with _conn() as conn: cur = conn.execute( """ INSERT INTO simulation_runs (strategy, total_generated, top_k_selected, avg_score, notes) VALUES (?, ?, ?, ?, ?) """, (strategy, total_generated, top_k_selected, round(avg_score, 6), notes), ) return int(cur.lastrowid) def save_simulation_candidates_bulk( run_id: int, candidates: List[Dict[str, Any]], based_on_draw: Optional[int], ) -> None: """ 상위 후보들을 simulation_candidates 테이블에 일괄 저장. candidates 각 항목: {"numbers": [...], "score_total": ..., "score_*": ..., "is_best": bool} """ data = [ ( run_id, json.dumps(sorted(c["numbers"])), c["score_total"], c.get("score_frequency"), c.get("score_fingerprint"), c.get("score_gap"), c.get("score_cooccur"), c.get("score_diversity"), 1 if c.get("is_best") else 0, based_on_draw, ) for c in candidates ] with _conn() as conn: conn.executemany( """ INSERT INTO simulation_candidates (run_id, numbers, score_total, score_frequency, score_fingerprint, score_gap, score_cooccur, score_diversity, is_best, based_on_draw) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, data, ) def replace_best_picks( picks: List[Dict[str, Any]], run_id: int, based_on_draw: Optional[int], ) -> None: """ 기존 활성 best_picks를 비활성화하고 새 picks로 교체. picks 각 항목: {"numbers": [...], "score_total": ..., "rank_in_run": int} """ with _conn() as conn: conn.execute("UPDATE best_picks SET is_active = 0 WHERE is_active = 1") data = [ ( json.dumps(sorted(p["numbers"])), p["score_total"], p.get("rank_in_run"), run_id, based_on_draw, ) for p in picks ] conn.executemany( """ INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active) VALUES (?, ?, ?, ?, ?, 1) """, data, ) def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]: """현재 활성화된 best_picks 조회 (점수 내림차순)""" with _conn() as conn: rows = conn.execute( """ SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at FROM best_picks WHERE is_active = 1 ORDER BY score_total DESC LIMIT ? """, (limit,), ).fetchall() return [ { "id": int(r["id"]), "numbers": json.loads(r["numbers"]), "score_total": r["score_total"], "rank_in_run": r["rank_in_run"], "source_run_id": r["source_run_id"], "based_on_draw": r["based_on_draw"], "created_at": r["created_at"], } for r in rows ] def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]: """최근 시뮬레이션 실행 기록 조회""" with _conn() as conn: rows = conn.execute( """ SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes FROM simulation_runs ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() return [dict(r) for r in rows] def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]: """특정 시뮬레이션 실행의 후보 목록 조회 (점수 내림차순)""" with _conn() as conn: rows = conn.execute( """ SELECT id, numbers, score_total, score_frequency, score_fingerprint, score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at FROM simulation_candidates WHERE run_id = ? ORDER BY score_total DESC LIMIT ? """, (run_id, limit), ).fetchall() return [ {**dict(r), "numbers": json.loads(r["numbers"])} for r in rows ]