- purchase_history 테이블 추가 (draw_no, amount, sets, prize, note) - weekly_reports 캐시 테이블 추가 (drw_no UNIQUE, report JSON) - GET /api/lotto/purchase 구매 이력 조회 (draw_no, days 필터) - POST /api/lotto/purchase 구매 이력 추가 - PUT /api/lotto/purchase/:id 구매 이력 수정 (당첨금 업데이트) - DELETE /api/lotto/purchase/:id 구매 이력 삭제 - GET /api/lotto/purchase/stats 투자 수익률 통계 - GET /api/lotto/analysis/personal 개인 패턴 분석 (top/least picks, 홀짝/구간/연속번호) - GET /api/lotto/report/history 저장된 주간 리포트 목록 - GET /api/lotto/report/:drw_no 캐시 우선 조회 + cached 플래그 - 스케줄러: 토요일 09:00 주간 리포트 자동 생성 및 DB 캐싱 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1329 lines
49 KiB
Python
1329 lines
49 KiB
Python
# backend/app/db.py
|
|
import os
|
|
import sqlite3
|
|
import json
|
|
import hashlib
|
|
from typing import Any, Dict, Optional, List
|
|
|
|
# Absolute path of the SQLite database file; parent dir is created on first connect.
DB_PATH = "/app/data/lotto.db"
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a connection to DB_PATH (creating its directory if needed).

    Rows are returned as sqlite3.Row so columns are addressable by name.
    """
    parent_dir = os.path.dirname(DB_PATH)
    os.makedirs(parent_dir, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:
|
|
cols = {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
|
if col not in cols:
|
|
conn.execute(ddl)
|
|
|
|
def init_db() -> None:
    """Create every application table/index if missing and apply in-place
    column migrations (ALTER TABLE ADD COLUMN via _ensure_column), so an
    existing database is brought up to the current schema. Safe to call
    repeatedly at process startup.
    """
    with _conn() as conn:
        # Core lotto results: one row per draw, keyed by draw number.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS draws (
                drw_no INTEGER PRIMARY KEY,
                drw_date TEXT NOT NULL,
                n1 INTEGER NOT NULL,
                n2 INTEGER NOT NULL,
                n3 INTEGER NOT NULL,
                n4 INTEGER NOT NULL,
                n5 INTEGER NOT NULL,
                n6 INTEGER NOT NULL,
                bonus INTEGER NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_draws_date ON draws(drw_date);")

        # Saved number recommendations; numbers/params are JSON-encoded TEXT.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                based_on_draw INTEGER,
                numbers TEXT NOT NULL,
                params TEXT NOT NULL
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_created ON recommendations(created_at DESC);")

        # Extension columns (auto-added to pre-existing databases as well).
        _ensure_column(conn, "recommendations", "numbers_sorted",
                       "ALTER TABLE recommendations ADD COLUMN numbers_sorted TEXT;")
        _ensure_column(conn, "recommendations", "dedup_hash",
                       "ALTER TABLE recommendations ADD COLUMN dedup_hash TEXT;")
        _ensure_column(conn, "recommendations", "favorite",
                       "ALTER TABLE recommendations ADD COLUMN favorite INTEGER NOT NULL DEFAULT 0;")
        _ensure_column(conn, "recommendations", "note",
                       "ALTER TABLE recommendations ADD COLUMN note TEXT NOT NULL DEFAULT '';")
        _ensure_column(conn, "recommendations", "tags",
                       "ALTER TABLE recommendations ADD COLUMN tags TEXT NOT NULL DEFAULT '[]';")

        # Columns for grading recommendations against real draw results.
        _ensure_column(conn, "recommendations", "rank",
                       "ALTER TABLE recommendations ADD COLUMN rank INTEGER;")
        _ensure_column(conn, "recommendations", "correct_count",
                       "ALTER TABLE recommendations ADD COLUMN correct_count INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "has_bonus",
                       "ALTER TABLE recommendations ADD COLUMN has_bonus INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "checked",
                       "ALTER TABLE recommendations ADD COLUMN checked INTEGER DEFAULT 0;")

        # UNIQUE index prevents storing the same recommendation twice.
        conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);")

        # ── simulation tables ─────────────────────────────────────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_at TEXT NOT NULL DEFAULT (datetime('now')),
                strategy TEXT NOT NULL DEFAULT 'monte_carlo',
                total_generated INTEGER NOT NULL DEFAULT 0,
                top_k_selected INTEGER NOT NULL DEFAULT 0,
                avg_score REAL,
                notes TEXT DEFAULT ''
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);"
        )

        # Candidate combinations produced by a run, with per-component scores.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_candidates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id INTEGER NOT NULL,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                score_frequency REAL,
                score_fingerprint REAL,
                score_gap REAL,
                score_cooccur REAL,
                score_diversity REAL,
                is_best INTEGER DEFAULT 0,
                based_on_draw INTEGER,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_run "
            "ON simulation_candidates(run_id, score_total DESC);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_best "
            "ON simulation_candidates(is_best, score_total DESC);"
        )

        # Currently promoted picks; only rows with is_active=1 are served.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS best_picks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                rank_in_run INTEGER,
                source_run_id INTEGER,
                based_on_draw INTEGER,
                is_active INTEGER DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_bestpicks_active "
            "ON best_picks(is_active, score_total DESC);"
        )

        # ── todos table (random short-hex text ids generated by SQLite) ──────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS todos (
                id TEXT PRIMARY KEY
                    DEFAULT (lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2)))),
                title TEXT NOT NULL,
                description TEXT,
                status TEXT NOT NULL DEFAULT 'todo'
                    CHECK(status IN ('todo','in_progress','done')),
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_todos_created ON todos(created_at DESC);"
        )

        # ── blog_posts table ──────────────────────────────────────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS blog_posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                body TEXT NOT NULL DEFAULT '',
                excerpt TEXT NOT NULL DEFAULT '',
                tags TEXT NOT NULL DEFAULT '[]',
                date TEXT NOT NULL DEFAULT (date('now','localtime')),
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_blog_date ON blog_posts(date DESC);"
        )

        # ── realestate_complexes table (statuses/priorities are Korean enums) ─
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS realestate_complexes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                address TEXT NOT NULL DEFAULT '',
                lat REAL,
                lng REAL,
                units INTEGER,
                types TEXT NOT NULL DEFAULT '[]',
                avg_price_per_pyeong INTEGER,
                subscription_start TEXT,
                subscription_end TEXT,
                result_date TEXT,
                status TEXT NOT NULL DEFAULT '청약예정'
                    CHECK(status IN ('청약예정','청약중','결과발표','완료')),
                priority TEXT NOT NULL DEFAULT 'normal'
                    CHECK(priority IN ('high','normal','low')),
                tags TEXT NOT NULL DEFAULT '[]',
                naver_url TEXT NOT NULL DEFAULT '',
                floor_plan_url TEXT NOT NULL DEFAULT '',
                memo TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_realestate_status ON realestate_complexes(status);"
        )

        # ── subscription_items table ──────────────────────────────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS subscription_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                complex_name TEXT NOT NULL,
                address TEXT NOT NULL DEFAULT '',
                pyeong TEXT,
                total_price INTEGER,
                type TEXT,
                special_type TEXT,
                supply_type TEXT,
                status TEXT NOT NULL DEFAULT '검토중',
                min_score INTEGER,
                max_income INTEGER,
                homeless_required INTEGER,
                subscription_start TEXT,
                subscription_end TEXT,
                contract_date TEXT,
                interim_date TEXT,
                balance_date TEXT,
                result_date TEXT,
                deposit_rate INTEGER DEFAULT 10,
                interim_rate INTEGER DEFAULT 60,
                balance_rate INTEGER DEFAULT 30,
                loan_type TEXT,
                loan_rate REAL,
                memo TEXT NOT NULL DEFAULT '',
                naver_url TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_sub_items_created ON subscription_items(created_at DESC);"
        )

        # ── purchase_history table (lotto spend / prize tracking) ─────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS purchase_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                draw_no INTEGER NOT NULL,
                amount INTEGER NOT NULL,
                sets INTEGER NOT NULL DEFAULT 1,
                prize INTEGER NOT NULL DEFAULT 0,
                note TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_draw ON purchase_history(draw_no DESC);")

        # ── weekly_reports cache table (one JSON report per draw) ─────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS weekly_reports (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                drw_no INTEGER UNIQUE NOT NULL,
                report TEXT NOT NULL,
                generated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )

        # ── subscription_profile table (singleton row, id=1) ──────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS subscription_profile (
                id INTEGER PRIMARY KEY DEFAULT 1,
                is_household_head INTEGER DEFAULT 1,
                is_homeless INTEGER DEFAULT 1,
                homeless_period INTEGER,
                savings_months INTEGER,
                savings_count INTEGER,
                dependents INTEGER DEFAULT 0,
                residency_area TEXT,
                is_married INTEGER,
                marriage_months INTEGER,
                monthly_income INTEGER,
                special_quals TEXT NOT NULL DEFAULT '[]'
            );
            """
        )
|
|
|
|
|
|
# ── todos CRUD ───────────────────────────────────────────────────────────────
|
|
|
|
def _todo_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"description": r["description"],
|
|
"status": r["status"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_todos() -> List[Dict[str, Any]]:
    """Return every todo as a dict, newest first."""
    with _conn() as conn:
        cursor = conn.execute("SELECT * FROM todos ORDER BY created_at DESC")
        return [_todo_row_to_dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def create_todo(title: str, description: Optional[str], status: str) -> Dict[str, Any]:
    """Insert a todo and return the stored row (id/timestamps from SQLite defaults)."""
    with _conn() as conn:
        conn.execute(
            "INSERT INTO todos (title, description, status) VALUES (?, ?, ?)",
            (title, description, status),
        )
        inserted = conn.execute(
            "SELECT * FROM todos WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _todo_row_to_dict(inserted)
|
|
|
|
|
|
def update_todo(todo_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """PATCH-style update: apply only whitelisted keys, auto-bump updated_at.

    Returns the (possibly unchanged) row as a dict, or None when the id
    does not exist.
    """
    updates = {k: fields[k] for k in ("title", "description", "status") if k in fields}

    with _conn() as conn:
        if updates:
            assignments = [f"{col} = ?" for col in updates]
            assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
            conn.execute(
                f"UPDATE todos SET {', '.join(assignments)} WHERE id = ?",
                [*updates.values(), todo_id],
            )
        row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
        return _todo_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_todo(todo_id: str) -> bool:
    """Delete one todo; True iff a row was actually removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,)).rowcount
        return removed > 0
|
|
|
|
|
|
def delete_done_todos() -> int:
    """Bulk-delete all todos in 'done' status; returns how many were removed."""
    with _conn() as conn:
        return conn.execute("DELETE FROM todos WHERE status = 'done'").rowcount
|
|
|
|
|
|
# ── blog_posts CRUD ──────────────────────────────────────────────────────────
|
|
|
|
def _post_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"body": r["body"],
|
|
"excerpt": r["excerpt"],
|
|
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
|
"date": r["date"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_posts() -> List[Dict[str, Any]]:
    """Return all blog posts, most recent date first (id breaks ties)."""
    with _conn() as conn:
        cursor = conn.execute("SELECT * FROM blog_posts ORDER BY date DESC, id DESC")
        return [_post_row_to_dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def create_post(title: str, body: str, excerpt: str, tags: List[str], date: str) -> Dict[str, Any]:
    """Insert a blog post (tags stored as JSON text) and return the stored row."""
    with _conn() as conn:
        conn.execute(
            "INSERT INTO blog_posts (title, body, excerpt, tags, date) VALUES (?, ?, ?, ?, ?)",
            (title, body, excerpt, json.dumps(tags), date),
        )
        stored = conn.execute(
            "SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _post_row_to_dict(stored)
|
|
|
|
|
|
def update_post(post_id: int, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """PATCH-style blog-post update; tags are JSON-encoded, updated_at auto-bumped.

    Returns the row as a dict (unchanged when no allowed keys are present),
    or None when the id does not exist.
    """
    updates = {k: fields[k] for k in ("title", "body", "excerpt", "tags", "date") if k in fields}

    with _conn() as conn:
        if updates:
            if "tags" in updates:
                updates["tags"] = json.dumps(updates["tags"])
            assignments = [f"{col} = ?" for col in updates]
            assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
            conn.execute(
                f"UPDATE blog_posts SET {', '.join(assignments)} WHERE id = ?",
                [*updates.values(), post_id],
            )
        row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
        return _post_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_post(post_id: int) -> bool:
    """Delete one blog post; True iff a row was actually removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,)).rowcount
        return removed > 0
|
|
|
|
|
|
def upsert_draw(row: Dict[str, Any]) -> None:
    """Insert or refresh a single draw keyed by drw_no (updated_at is bumped)."""
    values = (
        int(row["drw_no"]),
        str(row["drw_date"]),
        int(row["n1"]), int(row["n2"]), int(row["n3"]),
        int(row["n4"]), int(row["n5"]), int(row["n6"]),
        int(row["bonus"]),
    )
    upsert_sql = """
        INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(drw_no) DO UPDATE SET
          drw_date=excluded.drw_date,
          n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
          n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
          bonus=excluded.bonus,
          updated_at=datetime('now')
    """
    with _conn() as conn:
        conn.execute(upsert_sql, values)
|
|
|
|
def upsert_many_draws(rows: List[Dict[str, Any]]) -> None:
    """Bulk insert-or-refresh of draws keyed by drw_no (see upsert_draw)."""
    def _as_tuple(r: Dict[str, Any]):
        # Coerce every field so inconsistent upstream types can't leak into the DB.
        return (
            int(r["drw_no"]), str(r["drw_date"]),
            int(r["n1"]), int(r["n2"]), int(r["n3"]),
            int(r["n4"]), int(r["n5"]), int(r["n6"]),
            int(r["bonus"]),
        )

    with _conn() as conn:
        conn.executemany(
            """
            INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
              drw_date=excluded.drw_date,
              n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
              n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
              bonus=excluded.bonus,
              updated_at=datetime('now')
            """,
            [_as_tuple(r) for r in rows],
        )
|
|
|
|
def get_latest_draw() -> Optional[Dict[str, Any]]:
    """Return the highest-numbered draw as a dict, or None when no draws exist."""
    with _conn() as conn:
        row = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone()
        return None if row is None else dict(row)
|
|
|
|
def get_draw(drw_no: int) -> Optional[Dict[str, Any]]:
    """Fetch one draw by its draw number; None when it is not stored."""
    with _conn() as conn:
        row = conn.execute("SELECT * FROM draws WHERE drw_no = ?", (drw_no,)).fetchone()
        return None if row is None else dict(row)
|
|
|
|
def count_draws() -> int:
    """Total number of stored draws."""
    with _conn() as conn:
        row = conn.execute("SELECT COUNT(*) AS c FROM draws").fetchone()
        return int(row["c"])
|
|
|
|
def get_all_draw_numbers():
    """Return [(drw_no, [n1..n6]), ...] for every draw, ascending by draw number."""
    with _conn() as conn:
        cursor = conn.execute(
            "SELECT drw_no, n1, n2, n3, n4, n5, n6 FROM draws ORDER BY drw_no ASC"
        )
        result = []
        for row in cursor.fetchall():
            numbers = [int(row[f"n{i}"]) for i in range(1, 7)]
            result.append((int(row["drw_no"]), numbers))
        return result
|
|
|
|
# ---------- ✅ recommendation helpers ----------
|
|
|
|
def _canonical_params(params: dict) -> str:
|
|
return json.dumps(params, sort_keys=True, separators=(",", ":"))
|
|
|
|
def _numbers_sorted_str(numbers: List[int]) -> str:
|
|
return ",".join(str(x) for x in sorted(numbers))
|
|
|
|
def _dedup_hash(based_on_draw: Optional[int], numbers: List[int], params: dict) -> str:
    """Stable SHA-1 dedup key over (source draw, sorted numbers, canonical params)."""
    key = "|".join([
        str(based_on_draw) if based_on_draw else "",
        _numbers_sorted_str(numbers),
        _canonical_params(params),
    ])
    return hashlib.sha1(key.encode("utf-8")).hexdigest()
|
|
|
|
def save_recommendation_dedup(based_on_draw: Optional[int], numbers: List[int], params: dict) -> Dict[str, Any]:
    """Save a recommendation unless an identical one already exists.

    Identity = (numbers + params + based_on_draw) via dedup_hash. Returns
    {"id", "saved", "deduped"}; an existing row is returned without inserting.
    """
    sorted_str = _numbers_sorted_str(numbers)
    digest = _dedup_hash(based_on_draw, numbers, params)

    with _conn() as conn:
        existing = conn.execute(
            "SELECT id FROM recommendations WHERE dedup_hash = ?", (digest,)
        ).fetchone()
        if existing is not None:
            return {"id": int(existing["id"]), "saved": False, "deduped": True}

        cursor = conn.execute(
            """
            INSERT INTO recommendations (based_on_draw, numbers, params, numbers_sorted, dedup_hash)
            VALUES (?, ?, ?, ?, ?)
            """,
            (based_on_draw, json.dumps(numbers), json.dumps(params), sorted_str, digest),
        )
        return {"id": int(cursor.lastrowid), "saved": True, "deduped": False}
|
|
|
|
def list_recommendations_ex(
    limit: int = 30,
    offset: int = 0,
    favorite: Optional[bool] = None,
    tag: Optional[str] = None,
    q: Optional[str] = None,
    sort: str = "id_desc",  # id_desc|created_desc|favorite_desc
) -> List[Dict[str, Any]]:
    """List saved recommendations with optional filters and paging.

    Args:
        limit, offset: paging window.
        favorite: when not None, filter on the favorite flag.
        q: substring match against the note column.
        tag: substring match against the JSON-encoded tags column.
        sort: "id_desc" (default), "created_desc", or "favorite_desc"
              (favorites first, newest next).

    Returns:
        A list of dicts with numbers/params/tags decoded from JSON.
    """
    # Fix: the original re-imported json locally, needlessly shadowing the
    # module-level import; removed.
    where = []
    args: list[Any] = []

    if favorite is not None:
        where.append("favorite = ?")
        args.append(1 if favorite else 0)

    if q:
        where.append("note LIKE ?")
        args.append(f"%{q}%")

    # tags is stored as a JSON string, so a plain LIKE is a cheap first-pass filter.
    if tag:
        where.append("tags LIKE ?")
        args.append(f"%{tag}%")

    where_sql = ("WHERE " + " AND ".join(where)) if where else ""

    if sort == "created_desc":
        order = "created_at DESC"
    elif sort == "favorite_desc":
        # favorite(1) first, then newest
        order = "favorite DESC, id DESC"
    else:
        order = "id DESC"

    # Column names and ORDER BY come from whitelists above, never user input;
    # all values go through placeholders.
    sql = f"""
        SELECT id, created_at, based_on_draw, numbers, params, favorite, note, tags
        FROM recommendations
        {where_sql}
        ORDER BY {order}
        LIMIT ? OFFSET ?
    """
    args.extend([int(limit), int(offset)])

    with _conn() as conn:
        rows = conn.execute(sql, args).fetchall()

    out = []
    for r in rows:
        out.append({
            "id": int(r["id"]),
            "created_at": r["created_at"],
            "based_on_draw": r["based_on_draw"],
            "numbers": json.loads(r["numbers"]),
            "params": json.loads(r["params"]),
            "favorite": bool(r["favorite"]) if r["favorite"] is not None else False,
            "note": r["note"],
            "tags": json.loads(r["tags"]) if r["tags"] else [],
        })
    return out
|
|
|
|
def update_recommendation(rec_id: int, favorite: Optional[bool] = None, note: Optional[str] = None, tags: Optional[List[str]] = None) -> bool:
    """Update favorite/note/tags on one recommendation.

    Only non-None arguments are written; returns False when nothing was
    provided or when the id does not exist.
    """
    assignments = []
    params: list[Any] = []

    if favorite is not None:
        assignments.append("favorite = ?")
        params.append(1 if favorite else 0)
    if note is not None:
        assignments.append("note = ?")
        params.append(note)
    if tags is not None:
        assignments.append("tags = ?")
        params.append(json.dumps(tags))

    if not assignments:
        return False

    params.append(rec_id)
    with _conn() as conn:
        cursor = conn.execute(
            f"UPDATE recommendations SET {', '.join(assignments)} WHERE id = ?",
            params,
        )
        return cursor.rowcount > 0
|
|
|
|
def delete_recommendation(rec_id: int) -> bool:
    """Delete one recommendation; True iff a row was actually removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM recommendations WHERE id = ?", (rec_id,)).rowcount
        return removed > 0
|
|
|
|
def get_recommendation_performance() -> Dict[str, Any]:
    """Aggregate performance stats over graded (checked = 1) recommendations.

    Returns counts per correct-count bucket, hit rates for 3+/4+ matches,
    per-rank totals, and a comparison against the theoretical random average.
    """
    RANDOM_AVG = 0.8  # theoretical expectation: 6 * (6/45)

    with _conn() as conn:
        rows = conn.execute(
            "SELECT correct_count, rank FROM recommendations WHERE checked = 1"
        ).fetchall()

    if not rows:
        # Empty baseline so callers always get the full shape.
        return {
            "total_checked": 0,
            "avg_correct": 0.0,
            "distribution": {str(i): 0 for i in range(7)},
            "rate_3plus": 0.0,
            "rate_4plus": 0.0,
            "by_rank": {"rank_1": 0, "rank_2": 0, "rank_3": 0, "rank_4": 0, "rank_5": 0, "no_prize": 0},
            "vs_random": {"our_avg": 0.0, "random_avg": RANDOM_AVG, "improvement_pct": 0.0},
        }

    total = len(rows)
    corrects = [row["correct_count"] or 0 for row in rows]
    ranks = [row["rank"] or 0 for row in rows]
    avg_correct = sum(corrects) / total
    improvement = (avg_correct - RANDOM_AVG) / RANDOM_AVG * 100

    rank_counts = {f"rank_{n}": ranks.count(n) for n in range(1, 6)}
    rank_counts["no_prize"] = ranks.count(0)

    return {
        "total_checked": total,
        "avg_correct": round(avg_correct, 3),
        "distribution": {str(i): corrects.count(i) for i in range(7)},
        "rate_3plus": round(sum(1 for c in corrects if c >= 3) / total, 4),
        "rate_4plus": round(sum(1 for c in corrects if c >= 4) / total, 4),
        "by_rank": rank_counts,
        "vs_random": {
            "our_avg": round(avg_correct, 3),
            "random_avg": RANDOM_AVG,
            "improvement_pct": round(improvement, 1),
        },
    }
|
|
|
|
|
|
def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has_bonus: bool) -> bool:
    """Record grading results for one recommendation and mark it checked."""
    with _conn() as conn:
        cursor = conn.execute(
            "UPDATE recommendations "
            "SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1 "
            "WHERE id = ?",
            (rank, correct_count, 1 if has_bonus else 0, rec_id),
        )
        return cursor.rowcount > 0
|
|
|
|
|
|
# ── 시뮬레이션 CRUD ─────────────────────────────────────────────────────────
|
|
|
|
def save_simulation_run(
    strategy: str,
    total_generated: int,
    top_k_selected: int,
    avg_score: float,
    notes: str = "",
) -> int:
    """Persist one simulation run record; returns the new row id."""
    record = (strategy, total_generated, top_k_selected, round(avg_score, 6), notes)
    with _conn() as conn:
        cursor = conn.execute(
            "INSERT INTO simulation_runs "
            "(strategy, total_generated, top_k_selected, avg_score, notes) "
            "VALUES (?, ?, ?, ?, ?)",
            record,
        )
        return int(cursor.lastrowid)
|
|
|
|
|
|
def save_simulation_candidates_bulk(
    run_id: int,
    candidates: List[Dict[str, Any]],
    based_on_draw: Optional[int],
) -> None:
    """Bulk-insert top candidates of a simulation run.

    Each candidate dict: {"numbers": [...], "score_total": ..., optional
    per-component "score_*" values, optional "is_best" flag}.
    """
    def _as_row(cand: Dict[str, Any]):
        return (
            run_id,
            json.dumps(sorted(cand["numbers"])),  # numbers stored sorted + JSON-encoded
            cand["score_total"],
            cand.get("score_frequency"),
            cand.get("score_fingerprint"),
            cand.get("score_gap"),
            cand.get("score_cooccur"),
            cand.get("score_diversity"),
            1 if cand.get("is_best") else 0,
            based_on_draw,
        )

    with _conn() as conn:
        conn.executemany(
            """
            INSERT INTO simulation_candidates
            (run_id, numbers, score_total, score_frequency, score_fingerprint,
             score_gap, score_cooccur, score_diversity, is_best, based_on_draw)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            [_as_row(c) for c in candidates],
        )
|
|
|
|
|
|
def replace_best_picks(
    picks: List[Dict[str, Any]],
    run_id: int,
    based_on_draw: Optional[int],
) -> None:
    """Swap the active best-pick set: deactivate current rows, insert *picks* as active.

    Each pick dict: {"numbers": [...], "score_total": ..., optional "rank_in_run"}.
    """
    new_rows = [
        (
            json.dumps(sorted(pick["numbers"])),
            pick["score_total"],
            pick.get("rank_in_run"),
            run_id,
            based_on_draw,
        )
        for pick in picks
    ]
    with _conn() as conn:
        # Deactivate-then-insert inside one connection so the swap commits together.
        conn.execute("UPDATE best_picks SET is_active = 0 WHERE is_active = 1")
        conn.executemany(
            """
            INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active)
            VALUES (?, ?, ?, ?, ?, 1)
            """,
            new_rows,
        )
|
|
|
|
|
|
def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
    """Fetch the currently-active best picks, highest score first."""
    query = (
        "SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at "
        "FROM best_picks WHERE is_active = 1 ORDER BY score_total DESC LIMIT ?"
    )
    with _conn() as conn:
        rows = conn.execute(query, (limit,)).fetchall()

    picks = []
    for row in rows:
        item = dict(row)
        item["id"] = int(item["id"])
        item["numbers"] = json.loads(item["numbers"])  # decode JSON-encoded numbers
        picks.append(item)
    return picks
|
|
|
|
|
|
def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:
    """Most recent simulation run records (newest id first)."""
    with _conn() as conn:
        cursor = conn.execute(
            "SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes "
            "FROM simulation_runs ORDER BY id DESC LIMIT ?",
            (limit,),
        )
        return [dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Candidates of one simulation run, best score first, numbers decoded from JSON."""
    query = (
        "SELECT id, numbers, score_total, score_frequency, score_fingerprint, "
        "score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at "
        "FROM simulation_candidates WHERE run_id = ? ORDER BY score_total DESC LIMIT ?"
    )
    with _conn() as conn:
        rows = conn.execute(query, (run_id, limit)).fetchall()

    result = []
    for row in rows:
        item = dict(row)
        item["numbers"] = json.loads(item["numbers"])
        result.append(item)
    return result
|
|
|
|
|
|
# ── realestate_complexes CRUD ─────────────────────────────────────────────────
|
|
|
|
def _complex_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"name": r["name"],
|
|
"address": r["address"],
|
|
"lat": r["lat"],
|
|
"lng": r["lng"],
|
|
"units": r["units"],
|
|
"types": json.loads(r["types"]) if r["types"] else [],
|
|
"avgPricePerPyeong": r["avg_price_per_pyeong"],
|
|
"subscriptionStart": r["subscription_start"],
|
|
"subscriptionEnd": r["subscription_end"],
|
|
"resultDate": r["result_date"],
|
|
"status": r["status"],
|
|
"priority": r["priority"],
|
|
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
|
"naverUrl": r["naver_url"],
|
|
"floorPlanUrl": r["floor_plan_url"],
|
|
"memo": r["memo"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_complexes() -> List[Dict[str, Any]]:
    """Return every real-estate complex, newest id first."""
    with _conn() as conn:
        cursor = conn.execute("SELECT * FROM realestate_complexes ORDER BY id DESC")
        return [_complex_row_to_dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def get_complex(complex_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one complex by id; None when it does not exist."""
    with _conn() as conn:
        row = conn.execute(
            "SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).fetchone()
        return None if row is None else _complex_row_to_dict(row)
|
|
|
|
|
|
def create_complex(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a complex from camelCase input and return the stored row as a dict.

    Missing optional fields fall back to the same defaults the schema uses.
    """
    values = (
        data["name"],
        data.get("address", ""),
        data.get("lat"),
        data.get("lng"),
        data.get("units"),
        json.dumps(data.get("types", [])),
        data.get("avgPricePerPyeong"),
        data.get("subscriptionStart"),
        data.get("subscriptionEnd"),
        data.get("resultDate"),
        data.get("status", "청약예정"),
        data.get("priority", "normal"),
        json.dumps(data.get("tags", [])),
        data.get("naverUrl", ""),
        data.get("floorPlanUrl", ""),
        data.get("memo", ""),
    )
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO realestate_complexes
            (name, address, lat, lng, units, types, avg_price_per_pyeong,
             subscription_start, subscription_end, result_date,
             status, priority, tags, naver_url, floor_plan_url, memo)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
        stored = conn.execute(
            "SELECT * FROM realestate_complexes WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _complex_row_to_dict(stored)
|
|
|
|
|
|
def update_complex(complex_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """PATCH-style update of a complex: camelCase keys mapped to columns,
    JSON-array fields encoded, updated_at auto-bumped.

    Returns the (possibly unchanged) row as a dict, or None when the id
    does not exist.
    """
    camel_to_snake = {
        "name": "name",
        "address": "address",
        "lat": "lat",
        "lng": "lng",
        "units": "units",
        "avgPricePerPyeong": "avg_price_per_pyeong",
        "subscriptionStart": "subscription_start",
        "subscriptionEnd": "subscription_end",
        "resultDate": "result_date",
        "status": "status",
        "priority": "priority",
        "naverUrl": "naver_url",
        "floorPlanUrl": "floor_plan_url",
        "memo": "memo",
    }

    updates: Dict[str, Any] = {
        snake: data[camel] for camel, snake in camel_to_snake.items() if camel in data
    }
    for json_field in ("types", "tags"):
        if json_field in data:
            updates[json_field] = json.dumps(data[json_field])

    if not updates:
        return get_complex(complex_id)

    assignments = [f"{col} = ?" for col in updates]
    assignments.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")

    with _conn() as conn:
        conn.execute(
            f"UPDATE realestate_complexes SET {', '.join(assignments)} WHERE id = ?",
            [*updates.values(), complex_id],
        )
        row = conn.execute(
            "SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).fetchone()
        return _complex_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_complex(complex_id: int) -> bool:
    """Delete one complex; True iff a row was actually removed."""
    with _conn() as conn:
        removed = conn.execute(
            "DELETE FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).rowcount
        return removed > 0
|
|
|
|
|
|
# ── subscription_items CRUD ───────────────────────────────────────────────────
|
|
|
|
# Maps the camelCase API field names of subscription items to their
# snake_case subscription_items column names (used by update paths).
_SUB_ITEM_FIELD_MAP = {
    "complexName": "complex_name",
    "address": "address",
    "pyeong": "pyeong",
    "totalPrice": "total_price",
    "type": "type",
    "specialType": "special_type",
    "supplyType": "supply_type",
    "status": "status",
    "minScore": "min_score",
    "maxIncome": "max_income",
    "homelessRequired": "homeless_required",
    "subscriptionStart": "subscription_start",
    "subscriptionEnd": "subscription_end",
    "contractDate": "contract_date",
    "interimDate": "interim_date",
    "balanceDate": "balance_date",
    "resultDate": "result_date",
    "depositRate": "deposit_rate",
    "interimRate": "interim_rate",
    "balanceRate": "balance_rate",
    "loanType": "loan_type",
    "loanRate": "loan_rate",
    "memo": "memo",
    "naverUrl": "naver_url",
}
|
|
|
|
|
|
def _sub_item_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"complexName": r["complex_name"],
|
|
"address": r["address"],
|
|
"pyeong": r["pyeong"],
|
|
"totalPrice": r["total_price"],
|
|
"type": r["type"],
|
|
"specialType": r["special_type"],
|
|
"supplyType": r["supply_type"],
|
|
"status": r["status"],
|
|
"minScore": r["min_score"],
|
|
"maxIncome": r["max_income"],
|
|
"homelessRequired": r["homeless_required"],
|
|
"subscriptionStart": r["subscription_start"],
|
|
"subscriptionEnd": r["subscription_end"],
|
|
"contractDate": r["contract_date"],
|
|
"interimDate": r["interim_date"],
|
|
"balanceDate": r["balance_date"],
|
|
"resultDate": r["result_date"],
|
|
"depositRate": r["deposit_rate"],
|
|
"interimRate": r["interim_rate"],
|
|
"balanceRate": r["balance_rate"],
|
|
"loanType": r["loan_type"],
|
|
"loanRate": r["loan_rate"],
|
|
"memo": r["memo"],
|
|
"naverUrl": r["naver_url"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_subscription_items() -> List[Dict[str, Any]]:
    """Return every subscription item as an API dict, newest first."""
    with _conn() as conn:
        cursor = conn.execute(
            "SELECT * FROM subscription_items ORDER BY created_at DESC"
        )
        return [_sub_item_row_to_dict(record) for record in cursor.fetchall()]
|
|
|
|
|
|
def create_subscription_item(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a subscription item from an API payload and return the stored row.

    `complexName` is required (KeyError if absent); everything else falls back
    to a default. Payment-schedule rates default to the common 10/60/30 split.
    """
    values = (
        data["complexName"],
        data.get("address", ""),
        data.get("pyeong"),
        data.get("totalPrice"),
        data.get("type"),
        data.get("specialType"),
        data.get("supplyType"),
        data.get("status", "검토중"),
        data.get("minScore"),
        data.get("maxIncome"),
        data.get("homelessRequired"),
        data.get("subscriptionStart"),
        data.get("subscriptionEnd"),
        data.get("contractDate"),
        data.get("interimDate"),
        data.get("balanceDate"),
        data.get("resultDate"),
        data.get("depositRate", 10),
        data.get("interimRate", 60),
        data.get("balanceRate", 30),
        data.get("loanType"),
        data.get("loanRate"),
        data.get("memo", ""),
        data.get("naverUrl", ""),
    )
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO subscription_items
            (complex_name, address, pyeong, total_price, type, special_type, supply_type,
             status, min_score, max_income, homeless_required,
             subscription_start, subscription_end, contract_date, interim_date,
             balance_date, result_date, deposit_rate, interim_rate, balance_rate,
             loan_type, loan_rate, memo, naver_url)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
        # Re-read the freshly inserted row (same connection, so
        # last_insert_rowid() is the row we just wrote).
        inserted = conn.execute(
            "SELECT * FROM subscription_items WHERE rowid = last_insert_rowid()"
        ).fetchone()
    return _sub_item_row_to_dict(inserted)
|
|
|
|
|
|
def update_subscription_item(item_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Patch any recognized camelCase fields of an item; return it, or None if absent."""
    changes = {
        snake: data[camel]
        for camel, snake in _SUB_ITEM_FIELD_MAP.items()
        if camel in data
    }

    if not changes:
        # Nothing to write — just echo the current state.
        with _conn() as conn:
            row = conn.execute(
                "SELECT * FROM subscription_items WHERE id = ?", (item_id,)
            ).fetchone()
            return _sub_item_row_to_dict(row) if row else None

    # Column names come from the _SUB_ITEM_FIELD_MAP whitelist, so the
    # f-string SET clause cannot be injected through user input.
    set_clauses = ", ".join(f"{k} = ?" for k in changes)
    set_clauses += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    params = [*changes.values(), item_id]

    with _conn() as conn:
        conn.execute(
            f"UPDATE subscription_items SET {set_clauses} WHERE id = ?", params
        )
        row = conn.execute(
            "SELECT * FROM subscription_items WHERE id = ?", (item_id,)
        ).fetchone()
        return _sub_item_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_subscription_item(item_id: int) -> bool:
    """Delete one subscription item; True iff a row was removed."""
    with _conn() as conn:
        result = conn.execute(
            "DELETE FROM subscription_items WHERE id = ?", (item_id,)
        )
    return result.rowcount > 0
|
|
|
|
|
|
# ── subscription_profile CRUD (싱글톤) ────────────────────────────────────────
|
|
|
|
def _profile_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"isHouseholdHead": bool(r["is_household_head"]) if r["is_household_head"] is not None else None,
|
|
"isHomeless": bool(r["is_homeless"]) if r["is_homeless"] is not None else None,
|
|
"homelessPeriod": r["homeless_period"],
|
|
"savingsMonths": r["savings_months"],
|
|
"savingsCount": r["savings_count"],
|
|
"dependents": r["dependents"],
|
|
"residencyArea": r["residency_area"],
|
|
"isMarried": bool(r["is_married"]) if r["is_married"] is not None else None,
|
|
"marriageMonths": r["marriage_months"],
|
|
"monthlyIncome": r["monthly_income"],
|
|
"specialQuals": json.loads(r["special_quals"]) if r["special_quals"] else [],
|
|
}
|
|
|
|
|
|
def get_subscription_profile() -> Optional[Dict[str, Any]]:
    """Fetch the singleton profile row (id = 1); None if it was never saved."""
    with _conn() as conn:
        record = conn.execute(
            "SELECT * FROM subscription_profile WHERE id = 1"
        ).fetchone()
    return _profile_row_to_dict(record) if record else None
|
|
|
|
|
|
# ── purchase_history CRUD ─────────────────────────────────────────────────────
|
|
|
|
def _purchase_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"draw_no": r["draw_no"],
|
|
"amount": r["amount"],
|
|
"sets": r["sets"],
|
|
"prize": r["prize"],
|
|
"note": r["note"],
|
|
"created_at": r["created_at"],
|
|
}
|
|
|
|
|
|
def add_purchase(draw_no: int, amount: int, sets: int, prize: int = 0, note: str = "") -> Dict[str, Any]:
    """Insert one purchase record and return it as an API dict.

    `prize` defaults to 0 (unknown until the draw); update it later via
    update_purchase once results are in.
    """
    with _conn() as conn:
        conn.execute(
            "INSERT INTO purchase_history (draw_no, amount, sets, prize, note) VALUES (?, ?, ?, ?, ?)",
            (draw_no, amount, sets, prize, note),
        )
        inserted = conn.execute(
            "SELECT * FROM purchase_history WHERE rowid = last_insert_rowid()"
        ).fetchone()
    return _purchase_row_to_dict(inserted)
|
|
|
|
|
|
def get_purchases(draw_no: Optional[int] = None, days: Optional[int] = None) -> List[Dict[str, Any]]:
    """Return purchase records, optionally filtered.

    Args:
        draw_no: restrict results to a single draw number when given.
        days: restrict to records created within the last ``days`` days;
              None (or 0, which is falsy) disables the time filter.

    Returns:
        Rows as API dicts, ordered by draw_no then id, newest first.
    """
    conditions: List[str] = []
    params: List[Any] = []
    if draw_no is not None:
        conditions.append("draw_no = ?")
        params.append(draw_no)
    if days:  # falsy days (None/0) means "no time window"
        # SQLite datetime modifier: "-N days" relative to now.
        conditions.append("created_at >= datetime('now', ? || ' days')")
        params.append(f"-{days}")
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    with _conn() as conn:
        rows = conn.execute(
            f"SELECT * FROM purchase_history {where} ORDER BY draw_no DESC, id DESC",
            params,
        ).fetchall()
        return [_purchase_row_to_dict(r) for r in rows]
|
|
|
|
|
|
def update_purchase(purchase_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update whitelisted fields of a purchase record.

    Returns the (possibly unchanged) record, or None when no row matches
    purchase_id. Unknown keys in `data` are silently ignored.
    """
    allowed = {"draw_no", "amount", "sets", "prize", "note"}
    changes = {field: data[field] for field in data if field in allowed}
    if not changes:
        with _conn() as conn:
            row = conn.execute("SELECT * FROM purchase_history WHERE id = ?", (purchase_id,)).fetchone()
            return _purchase_row_to_dict(row) if row else None
    # Column names are drawn from the `allowed` whitelist — safe to format in.
    set_clause = ", ".join(f"{k} = ?" for k in changes)
    with _conn() as conn:
        result = conn.execute(
            f"UPDATE purchase_history SET {set_clause} WHERE id = ?",
            [*changes.values(), purchase_id],
        )
        if result.rowcount == 0:
            return None
        row = conn.execute("SELECT * FROM purchase_history WHERE id = ?", (purchase_id,)).fetchone()
        return _purchase_row_to_dict(row)
|
|
|
|
|
|
def delete_purchase(purchase_id: int) -> bool:
    """Delete one purchase record; True iff a row was removed."""
    with _conn() as conn:
        result = conn.execute("DELETE FROM purchase_history WHERE id = ?", (purchase_id,))
    return result.rowcount > 0
|
|
|
|
|
|
def get_purchase_stats() -> Dict[str, Any]:
    """Aggregate investment/return statistics over all purchase records.

    Returns total invested, total prize money, net result, return rate as a
    percentage (rounded to 2 decimals), number of winning records, and the
    single largest prize. All-zero stats when there are no records.
    """
    with _conn() as conn:
        records = conn.execute("SELECT amount, prize FROM purchase_history").fetchall()
    if not records:
        return {
            "total_records": 0,
            "total_invested": 0,
            "total_prize": 0,
            "net": 0,
            "return_rate": 0.0,
            "prize_count": 0,
            "max_prize": 0,
        }
    prize_values = [rec["prize"] for rec in records]
    invested = sum(rec["amount"] for rec in records)
    won = sum(prize_values)
    # Guard against zero-amount records making the rate undefined.
    rate = (won / invested * 100) if invested else 0.0
    return {
        "total_records": len(records),
        "total_invested": invested,
        "total_prize": won,
        "net": won - invested,
        "return_rate": round(rate, 2),
        "prize_count": sum(1 for p in prize_values if p > 0),
        "max_prize": max(prize_values),
    }
|
|
|
|
|
|
# ── weekly_reports CRUD ───────────────────────────────────────────────────────
|
|
|
|
def save_weekly_report(drw_no: int, report_json: str) -> None:
    """Upsert the cached weekly report JSON for a draw number.

    Re-saving the same drw_no overwrites the payload and refreshes
    generated_at (drw_no is UNIQUE in weekly_reports).
    """
    upsert_sql = """
        INSERT INTO weekly_reports (drw_no, report)
        VALUES (?, ?)
        ON CONFLICT(drw_no) DO UPDATE SET
            report = excluded.report,
            generated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
        """
    with _conn() as conn:
        conn.execute(upsert_sql, (drw_no, report_json))
|
|
|
|
|
|
def get_weekly_report_list(limit: int = 10) -> List[Dict[str, Any]]:
    """Return {drw_no, generated_at} metadata for the newest cached reports."""
    query = "SELECT drw_no, generated_at FROM weekly_reports ORDER BY drw_no DESC LIMIT ?"
    with _conn() as conn:
        return [dict(record) for record in conn.execute(query, (limit,)).fetchall()]
|
|
|
|
|
|
def get_weekly_report(drw_no: int) -> Optional[Dict[str, Any]]:
    """Return the cached weekly report for a draw, or None if not cached.

    The stored JSON payload is decoded and flattened into the result
    alongside the drw_no and generated_at metadata.
    """
    with _conn() as conn:
        row = conn.execute(
            "SELECT drw_no, report, generated_at FROM weekly_reports WHERE drw_no = ?",
            (drw_no,),
        ).fetchone()
    if not row:
        return None
    # Use the module-level `json` import (the old local `import json as _json`
    # was redundant — json is already imported at the top of this file).
    return {"drw_no": row["drw_no"], "generated_at": row["generated_at"], **json.loads(row["report"])}
|
|
|
|
|
|
def get_all_recommendation_numbers() -> List[List[int]]:
    """Return every stored recommendation's number set, newest first (for personal pattern analysis)."""
    with _conn() as conn:
        cursor = conn.execute("SELECT numbers FROM recommendations ORDER BY id DESC")
        return [json.loads(record["numbers"]) for record in cursor.fetchall()]
|
|
|
|
|
|
def upsert_subscription_profile(data: Dict[str, Any]) -> Dict[str, Any]:
    """Create or update the singleton (id = 1) subscription profile.

    Only recognized camelCase keys present in `data` are written; booleans
    are stored as 0/1 and specialQuals is serialized to JSON. Returns the
    full profile after the write.
    """
    camel_to_snake = {
        "isHouseholdHead": "is_household_head",
        "isHomeless": "is_homeless",
        "homelessPeriod": "homeless_period",
        "savingsMonths": "savings_months",
        "savingsCount": "savings_count",
        "dependents": "dependents",
        "residencyArea": "residency_area",
        "isMarried": "is_married",
        "marriageMonths": "marriage_months",
        "monthlyIncome": "monthly_income",
    }

    updates: Dict[str, Any] = {}
    for api_key, column in camel_to_snake.items():
        if api_key not in data:
            continue
        value = data[api_key]
        if isinstance(value, bool):
            value = int(value)  # SQLite has no bool type; store 0/1
        updates[column] = value
    if "specialQuals" in data:
        updates["special_quals"] = json.dumps(data["specialQuals"])

    with _conn() as conn:
        already_there = conn.execute(
            "SELECT id FROM subscription_profile WHERE id = 1"
        ).fetchone() is not None

        if already_there:
            if updates:
                # Column names come from the fixed map above — safe to format in.
                assignments = ", ".join(f"{col} = ?" for col in updates)
                conn.execute(
                    f"UPDATE subscription_profile SET {assignments} WHERE id = 1",
                    list(updates.values()),
                )
        else:
            columns = ["id", *updates]
            values = [1, *updates.values()]
            placeholders = ", ".join(["?"] * len(values))
            conn.execute(
                f"INSERT INTO subscription_profile ({', '.join(columns)}) VALUES ({placeholders})",
                values,
            )

        row = conn.execute(
            "SELECT * FROM subscription_profile WHERE id = 1"
        ).fetchone()
    return _profile_row_to_dict(row)
|
|
|