- realestate_complexes 테이블 추가 (lotto.db) - CRUD 엔드포인트 4개: GET/POST /api/realestate/complexes, PUT/DELETE /api/realestate/complexes/:id - status: 청약예정|청약중|결과발표|완료, priority: high|normal|low 검증 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
831 lines
30 KiB
Python
831 lines
30 KiB
Python
# backend/app/db.py
|
|
import os
|
|
import sqlite3
|
|
import json
|
|
import hashlib
|
|
from typing import Any, Dict, Optional, List
|
|
|
|
DB_PATH = "/app/data/lotto.db"
|
|
|
|
def _conn() -> sqlite3.Connection:
    """Open a connection to the app's SQLite database.

    Creates the parent directory on first use and enables name-based
    column access via ``sqlite3.Row``.

    NOTE(review): callers use ``with _conn() as conn``, which commits or
    rolls back on exit but does NOT close the connection — closing is
    left to garbage collection.
    """
    db_dir = os.path.dirname(DB_PATH)
    os.makedirs(db_dir, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:
|
|
cols = {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
|
if col not in cols:
|
|
conn.execute(ddl)
|
|
|
|
def init_db() -> None:
    """Create all tables and indexes if missing; migrate older DBs in place.

    Idempotent: uses CREATE ... IF NOT EXISTS plus _ensure_column() for
    additive ALTER TABLE migrations, so it is safe to call on every startup.
    """
    with _conn() as conn:
        # Historical lotto draws, one row per drawing.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS draws (
                drw_no INTEGER PRIMARY KEY,
                drw_date TEXT NOT NULL,
                n1 INTEGER NOT NULL,
                n2 INTEGER NOT NULL,
                n3 INTEGER NOT NULL,
                n4 INTEGER NOT NULL,
                n5 INTEGER NOT NULL,
                n6 INTEGER NOT NULL,
                bonus INTEGER NOT NULL,
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_draws_date ON draws(drw_date);")

        # Saved number recommendations (numbers/params stored as JSON text).
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                based_on_draw INTEGER,
                numbers TEXT NOT NULL,
                params TEXT NOT NULL
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_created ON recommendations(created_at DESC);")

        # Extension columns (auto-added to pre-existing DBs as well).
        _ensure_column(conn, "recommendations", "numbers_sorted",
                       "ALTER TABLE recommendations ADD COLUMN numbers_sorted TEXT;")
        _ensure_column(conn, "recommendations", "dedup_hash",
                       "ALTER TABLE recommendations ADD COLUMN dedup_hash TEXT;")
        _ensure_column(conn, "recommendations", "favorite",
                       "ALTER TABLE recommendations ADD COLUMN favorite INTEGER NOT NULL DEFAULT 0;")
        _ensure_column(conn, "recommendations", "note",
                       "ALTER TABLE recommendations ADD COLUMN note TEXT NOT NULL DEFAULT '';")
        _ensure_column(conn, "recommendations", "tags",
                       "ALTER TABLE recommendations ADD COLUMN tags TEXT NOT NULL DEFAULT '[]';")

        # Columns for grading recommendations against actual draw results.
        _ensure_column(conn, "recommendations", "rank",
                       "ALTER TABLE recommendations ADD COLUMN rank INTEGER;")
        _ensure_column(conn, "recommendations", "correct_count",
                       "ALTER TABLE recommendations ADD COLUMN correct_count INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "has_bonus",
                       "ALTER TABLE recommendations ADD COLUMN has_bonus INTEGER DEFAULT 0;")
        _ensure_column(conn, "recommendations", "checked",
                       "ALTER TABLE recommendations ADD COLUMN checked INTEGER DEFAULT 0;")

        # UNIQUE index: prevents saving duplicate recommendations.
        conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);")

        # ── simulation tables ────────────────────────────────────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_at TEXT NOT NULL DEFAULT (datetime('now')),
                strategy TEXT NOT NULL DEFAULT 'monte_carlo',
                total_generated INTEGER NOT NULL DEFAULT 0,
                top_k_selected INTEGER NOT NULL DEFAULT 0,
                avg_score REAL,
                notes TEXT DEFAULT ''
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);"
        )

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS simulation_candidates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                run_id INTEGER NOT NULL,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                score_frequency REAL,
                score_fingerprint REAL,
                score_gap REAL,
                score_cooccur REAL,
                score_diversity REAL,
                is_best INTEGER DEFAULT 0,
                based_on_draw INTEGER,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_run "
            "ON simulation_candidates(run_id, score_total DESC);"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_simcand_best "
            "ON simulation_candidates(is_best, score_total DESC);"
        )

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS best_picks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                numbers TEXT NOT NULL,
                score_total REAL NOT NULL,
                rank_in_run INTEGER,
                source_run_id INTEGER,
                based_on_draw INTEGER,
                is_active INTEGER DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id)
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_bestpicks_active "
            "ON best_picks(is_active, score_total DESC);"
        )

        # ── todos table ──────────────────────────────────────────────────────
        # Note: id is a random short hex string generated by SQLite itself.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS todos (
                id TEXT PRIMARY KEY
                    DEFAULT (lower(hex(randomblob(4))) || '-' || lower(hex(randomblob(2)))),
                title TEXT NOT NULL,
                description TEXT,
                status TEXT NOT NULL DEFAULT 'todo'
                    CHECK(status IN ('todo','in_progress','done')),
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_todos_created ON todos(created_at DESC);"
        )

        # ── blog_posts table ─────────────────────────────────────────────────
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS blog_posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                body TEXT NOT NULL DEFAULT '',
                excerpt TEXT NOT NULL DEFAULT '',
                tags TEXT NOT NULL DEFAULT '[]',
                date TEXT NOT NULL DEFAULT (date('now','localtime')),
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_blog_date ON blog_posts(date DESC);"
        )

        # ── realestate_complexes table ───────────────────────────────────────
        # status/priority values are constrained at the DB level (CHECK).
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS realestate_complexes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                address TEXT NOT NULL DEFAULT '',
                lat REAL,
                lng REAL,
                units INTEGER,
                types TEXT NOT NULL DEFAULT '[]',
                avg_price_per_pyeong INTEGER,
                subscription_start TEXT,
                subscription_end TEXT,
                result_date TEXT,
                status TEXT NOT NULL DEFAULT '청약예정'
                    CHECK(status IN ('청약예정','청약중','결과발표','완료')),
                priority TEXT NOT NULL DEFAULT 'normal'
                    CHECK(priority IN ('high','normal','low')),
                tags TEXT NOT NULL DEFAULT '[]',
                naver_url TEXT NOT NULL DEFAULT '',
                floor_plan_url TEXT NOT NULL DEFAULT '',
                memo TEXT NOT NULL DEFAULT '',
                created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            );
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_realestate_status ON realestate_complexes(status);"
        )
|
|
|
|
|
|
# ── todos CRUD ───────────────────────────────────────────────────────────────
|
|
|
|
def _todo_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"description": r["description"],
|
|
"status": r["status"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_todos() -> List[Dict[str, Any]]:
    """Return every todo as a dict, newest first."""
    with _conn() as conn:
        cursor = conn.execute("SELECT * FROM todos ORDER BY created_at DESC")
        return [_todo_row_to_dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def create_todo(title: str, description: Optional[str], status: str) -> Dict[str, Any]:
    """Insert a new todo and return the stored row (id generated by SQLite)."""
    with _conn() as conn:
        conn.execute(
            "INSERT INTO todos (title, description, status) VALUES (?, ?, ?)",
            (title, description, status),
        )
        # Re-read the row so DB-side defaults (id, timestamps) are included.
        created = conn.execute(
            "SELECT * FROM todos WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _todo_row_to_dict(created)
|
|
|
|
|
|
def update_todo(todo_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply a PATCH-style partial update; ``updated_at`` is bumped automatically.

    Unknown keys in *fields* are ignored. Returns the row as a dict,
    or None when *todo_id* does not exist.
    """
    allowed = {"title", "description", "status"}
    updates = {k: v for k, v in fields.items() if k in allowed}

    if not updates:
        # Nothing to change — just return the current state.
        with _conn() as conn:
            row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
            return _todo_row_to_dict(row) if row else None

    assignments = ", ".join(f"{column} = ?" for column in updates)
    assignments += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    params = [*updates.values(), todo_id]

    with _conn() as conn:
        conn.execute(
            f"UPDATE todos SET {assignments} WHERE id = ?",
            params,
        )
        row = conn.execute("SELECT * FROM todos WHERE id = ?", (todo_id,)).fetchone()
        return _todo_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_todo(todo_id: str) -> bool:
    """Delete one todo; True when a row was actually removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,)).rowcount
        return removed > 0
|
|
|
|
|
|
def delete_done_todos() -> int:
    """Purge every todo whose status is 'done'; returns the number removed."""
    with _conn() as conn:
        return conn.execute("DELETE FROM todos WHERE status = 'done'").rowcount
|
|
|
|
|
|
# ── blog_posts CRUD ──────────────────────────────────────────────────────────
|
|
|
|
def _post_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"title": r["title"],
|
|
"body": r["body"],
|
|
"excerpt": r["excerpt"],
|
|
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
|
"date": r["date"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_posts() -> List[Dict[str, Any]]:
    """All blog posts, newest date first (id as tie-breaker)."""
    with _conn() as conn:
        rows = conn.execute(
            "SELECT * FROM blog_posts ORDER BY date DESC, id DESC"
        ).fetchall()
    return [_post_row_to_dict(row) for row in rows]
|
|
|
|
|
|
def create_post(title: str, body: str, excerpt: str, tags: List[str], date: str) -> Dict[str, Any]:
    """Insert a blog post (tags JSON-encoded) and return the stored row."""
    record = (title, body, excerpt, json.dumps(tags), date)
    with _conn() as conn:
        conn.execute(
            "INSERT INTO blog_posts (title, body, excerpt, tags, date) VALUES (?, ?, ?, ?, ?)",
            record,
        )
        stored = conn.execute(
            "SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _post_row_to_dict(stored)
|
|
|
|
|
|
def update_post(post_id: int, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Partially update a blog post; tags are JSON-encoded, ``updated_at`` is bumped.

    Unknown keys are ignored. Returns the row as a dict, or None when absent.
    """
    allowed = {"title", "body", "excerpt", "tags", "date"}
    updates = {k: v for k, v in fields.items() if k in allowed}

    if not updates:
        with _conn() as conn:
            row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
            return _post_row_to_dict(row) if row else None

    # tags live in the DB as a JSON string.
    if "tags" in updates:
        updates["tags"] = json.dumps(updates["tags"])

    assignments = ", ".join(f"{column} = ?" for column in updates)
    assignments += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    params = [*updates.values(), post_id]

    with _conn() as conn:
        conn.execute(f"UPDATE blog_posts SET {assignments} WHERE id = ?", params)
        row = conn.execute("SELECT * FROM blog_posts WHERE id = ?", (post_id,)).fetchone()
        return _post_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_post(post_id: int) -> bool:
    """Delete one blog post; True when a row was removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,)).rowcount
        return removed > 0
|
|
|
|
|
|
def upsert_draw(row: Dict[str, Any]) -> None:
    """Insert or refresh a single draw keyed by drw_no (updated_at bumped on update)."""
    values = (
        int(row["drw_no"]),
        str(row["drw_date"]),
        *(int(row[key]) for key in ("n1", "n2", "n3", "n4", "n5", "n6")),
        int(row["bonus"]),
    )
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
                drw_date=excluded.drw_date,
                n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
                n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
                bonus=excluded.bonus,
                updated_at=datetime('now')
            """,
            values,
        )
|
|
|
|
def upsert_many_draws(rows: List[Dict[str, Any]]) -> None:
    """Bulk insert-or-update of draws keyed by drw_no."""
    def _to_params(r: Dict[str, Any]):
        # One parameter tuple per draw, coerced to the schema's types.
        return (
            int(r["drw_no"]), str(r["drw_date"]),
            int(r["n1"]), int(r["n2"]), int(r["n3"]),
            int(r["n4"]), int(r["n5"]), int(r["n6"]),
            int(r["bonus"]),
        )

    with _conn() as conn:
        conn.executemany(
            """
            INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(drw_no) DO UPDATE SET
                drw_date=excluded.drw_date,
                n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
                n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
                bonus=excluded.bonus,
                updated_at=datetime('now')
            """,
            [_to_params(r) for r in rows],
        )
|
|
|
|
def get_latest_draw() -> Optional[Dict[str, Any]]:
    """Return the most recent draw (highest drw_no), or None when empty."""
    with _conn() as conn:
        latest = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone()
        return dict(latest) if latest else None
|
|
|
|
def get_draw(drw_no: int) -> Optional[Dict[str, Any]]:
    """Fetch one draw by its number; None when absent."""
    with _conn() as conn:
        found = conn.execute("SELECT * FROM draws WHERE drw_no = ?", (drw_no,)).fetchone()
        return dict(found) if found else None
|
|
|
|
def count_draws() -> int:
    """Total number of stored draws."""
    with _conn() as conn:
        row = conn.execute("SELECT COUNT(*) AS c FROM draws").fetchone()
        return int(row["c"])
|
|
|
|
def get_all_draw_numbers() -> list[tuple[int, list[int]]]:
    """Return ``[(drw_no, [n1..n6]), ...]`` in ascending draw order."""
    with _conn() as conn:
        rows = conn.execute(
            "SELECT drw_no, n1, n2, n3, n4, n5, n6 FROM draws ORDER BY drw_no ASC"
        ).fetchall()
    result = []
    for r in rows:
        numbers = [int(r[col]) for col in ("n1", "n2", "n3", "n4", "n5", "n6")]
        result.append((int(r["drw_no"]), numbers))
    return result
|
|
|
|
# ---------- ✅ recommendation helpers ----------
|
|
|
|
def _canonical_params(params: dict) -> str:
|
|
return json.dumps(params, sort_keys=True, separators=(",", ":"))
|
|
|
|
def _numbers_sorted_str(numbers: List[int]) -> str:
|
|
return ",".join(str(x) for x in sorted(numbers))
|
|
|
|
def _dedup_hash(based_on_draw: Optional[int], numbers: List[int], params: dict) -> str:
    """SHA-1 over (draw, sorted numbers, canonical params) — duplicate-detection key."""
    parts = (
        str(based_on_draw) if based_on_draw else "",
        _numbers_sorted_str(numbers),
        _canonical_params(params),
    )
    return hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()
|
|
|
|
def save_recommendation_dedup(based_on_draw: Optional[int], numbers: List[int], params: dict) -> Dict[str, Any]:
    """Save a recommendation unless an identical one already exists.

    Identity is the (based_on_draw, sorted numbers, canonical params) hash
    enforced by the uq_reco_dedup unique index.

    Returns:
        {"id": int, "saved": bool, "deduped": bool} — "deduped" is True when
        an existing row was reused instead of inserting a new one.
    """
    ns = _numbers_sorted_str(numbers)
    h = _dedup_hash(based_on_draw, numbers, params)

    with _conn() as conn:
        # Fast path: an identical recommendation is already stored.
        r = conn.execute("SELECT id FROM recommendations WHERE dedup_hash = ?", (h,)).fetchone()
        if r:
            return {"id": int(r["id"]), "saved": False, "deduped": True}

        try:
            cur = conn.execute(
                """
                INSERT INTO recommendations (based_on_draw, numbers, params, numbers_sorted, dedup_hash)
                VALUES (?, ?, ?, ?, ?)
                """,
                (based_on_draw, json.dumps(numbers), json.dumps(params), ns, h),
            )
        except sqlite3.IntegrityError:
            # Lost a race with a concurrent writer on uq_reco_dedup: the row
            # exists now, so treat it as a dedup hit instead of crashing.
            r = conn.execute("SELECT id FROM recommendations WHERE dedup_hash = ?", (h,)).fetchone()
            return {"id": int(r["id"]), "saved": False, "deduped": True}
        return {"id": int(cur.lastrowid), "saved": True, "deduped": False}
|
|
|
|
def list_recommendations_ex(
    limit: int = 30,
    offset: int = 0,
    favorite: Optional[bool] = None,
    tag: Optional[str] = None,
    q: Optional[str] = None,
    sort: str = "id_desc",  # id_desc|created_desc|favorite_desc
) -> List[Dict[str, Any]]:
    """List saved recommendations with optional filters and paging.

    Args:
        limit/offset: paging window.
        favorite: when not None, filter on the favorite flag.
        tag: substring match against the JSON-encoded tags column.
        q: substring match against the note column.
        sort: one of id_desc | created_desc | favorite_desc.

    Returns:
        Rows as dicts with numbers/params/tags decoded from JSON.
    """
    # (fix: removed a redundant function-local `import json` that shadowed
    # the module-level import.)
    where = []
    args: list[Any] = []

    if favorite is not None:
        where.append("favorite = ?")
        args.append(1 if favorite else 0)

    if q:
        where.append("note LIKE ?")
        args.append(f"%{q}%")

    # tags is a JSON string, so a plain LIKE is a lightweight first pass
    if tag:
        where.append("tags LIKE ?")
        args.append(f"%{tag}%")

    where_sql = ("WHERE " + " AND ".join(where)) if where else ""

    if sort == "created_desc":
        order = "created_at DESC"
    elif sort == "favorite_desc":
        # favorites (1) first, newest within each group
        order = "favorite DESC, id DESC"
    else:
        order = "id DESC"

    sql = f"""
    SELECT id, created_at, based_on_draw, numbers, params, favorite, note, tags
    FROM recommendations
    {where_sql}
    ORDER BY {order}
    LIMIT ? OFFSET ?
    """
    args.extend([int(limit), int(offset)])

    with _conn() as conn:
        rows = conn.execute(sql, args).fetchall()

    out = []
    for r in rows:
        out.append({
            "id": int(r["id"]),
            "created_at": r["created_at"],
            "based_on_draw": r["based_on_draw"],
            "numbers": json.loads(r["numbers"]),
            "params": json.loads(r["params"]),
            "favorite": bool(r["favorite"]) if r["favorite"] is not None else False,
            "note": r["note"],
            "tags": json.loads(r["tags"]) if r["tags"] else [],
        })
    return out
|
|
|
|
def update_recommendation(rec_id: int, favorite: Optional[bool] = None, note: Optional[str] = None, tags: Optional[List[str]] = None) -> bool:
    """Update any subset of favorite/note/tags on one recommendation.

    Returns False when nothing was supplied or the id does not exist.
    """
    assignments: list[str] = []
    values: list[Any] = []

    if favorite is not None:
        assignments.append("favorite = ?")
        values.append(1 if favorite else 0)
    if note is not None:
        assignments.append("note = ?")
        values.append(note)
    if tags is not None:
        assignments.append("tags = ?")
        values.append(json.dumps(tags))

    if not assignments:
        return False

    values.append(rec_id)

    with _conn() as conn:
        cur = conn.execute(
            f"UPDATE recommendations SET {', '.join(assignments)} WHERE id = ?",
            values,
        )
        return cur.rowcount > 0
|
|
|
|
def delete_recommendation(rec_id: int) -> bool:
    """Delete one recommendation; True when a row was removed."""
    with _conn() as conn:
        removed = conn.execute("DELETE FROM recommendations WHERE id = ?", (rec_id,)).rowcount
        return removed > 0
|
|
|
|
def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has_bonus: bool) -> bool:
    """Record grading results for one recommendation and mark it checked."""
    params = (rank, correct_count, 1 if has_bonus else 0, rec_id)
    with _conn() as conn:
        cur = conn.execute(
            """
            UPDATE recommendations
            SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1
            WHERE id = ?
            """,
            params,
        )
        return cur.rowcount > 0
|
|
|
|
|
|
# ── 시뮬레이션 CRUD ─────────────────────────────────────────────────────────
|
|
|
|
def save_simulation_run(
    strategy: str,
    total_generated: int,
    top_k_selected: int,
    avg_score: float,
    notes: str = "",
) -> int:
    """Persist one simulation run record; returns the generated row id."""
    record = (strategy, total_generated, top_k_selected, round(avg_score, 6), notes)
    with _conn() as conn:
        cur = conn.execute(
            """
            INSERT INTO simulation_runs (strategy, total_generated, top_k_selected, avg_score, notes)
            VALUES (?, ?, ?, ?, ?)
            """,
            record,
        )
        return int(cur.lastrowid)
|
|
|
|
|
|
def save_simulation_candidates_bulk(
    run_id: int,
    candidates: List[Dict[str, Any]],
    based_on_draw: Optional[int],
) -> None:
    """Bulk-save top candidates into simulation_candidates.

    Each candidate: {"numbers": [...], "score_total": ..., "score_*": ...,
    "is_best": bool}; numbers are stored sorted as JSON.
    """
    def _as_row(c: Dict[str, Any]):
        return (
            run_id,
            json.dumps(sorted(c["numbers"])),
            c["score_total"],
            c.get("score_frequency"),
            c.get("score_fingerprint"),
            c.get("score_gap"),
            c.get("score_cooccur"),
            c.get("score_diversity"),
            1 if c.get("is_best") else 0,
            based_on_draw,
        )

    with _conn() as conn:
        conn.executemany(
            """
            INSERT INTO simulation_candidates
            (run_id, numbers, score_total, score_frequency, score_fingerprint,
             score_gap, score_cooccur, score_diversity, is_best, based_on_draw)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            [_as_row(c) for c in candidates],
        )
|
|
|
|
|
|
def replace_best_picks(
    picks: List[Dict[str, Any]],
    run_id: int,
    based_on_draw: Optional[int],
) -> None:
    """Deactivate the current best_picks and insert *picks* as the new active set.

    Each pick: {"numbers": [...], "score_total": ..., "rank_in_run": int}.
    Old picks are kept (flagged inactive) so history is preserved.
    """
    def _as_row(p: Dict[str, Any]):
        return (
            json.dumps(sorted(p["numbers"])),
            p["score_total"],
            p.get("rank_in_run"),
            run_id,
            based_on_draw,
        )

    with _conn() as conn:
        conn.execute("UPDATE best_picks SET is_active = 0 WHERE is_active = 1")
        conn.executemany(
            """
            INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active)
            VALUES (?, ?, ?, ?, ?, 1)
            """,
            [_as_row(p) for p in picks],
        )
|
|
|
|
|
|
def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
    """Currently-active best picks, highest score first (at most *limit*)."""
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at
            FROM best_picks
            WHERE is_active = 1
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    picks = []
    for r in rows:
        item = dict(r)
        item["id"] = int(item["id"])
        item["numbers"] = json.loads(item["numbers"])
        picks.append(item)
    return picks
|
|
|
|
|
|
def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:
    """Most recent simulation runs, newest first."""
    with _conn() as conn:
        cursor = conn.execute(
            """
            SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes
            FROM simulation_runs
            ORDER BY id DESC
            LIMIT ?
            """,
            (limit,),
        )
        return [dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Candidates of one simulation run, best score first, numbers decoded from JSON."""
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, score_frequency, score_fingerprint,
                   score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at
            FROM simulation_candidates
            WHERE run_id = ?
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (run_id, limit),
        ).fetchall()
    results = []
    for r in rows:
        record = dict(r)
        record["numbers"] = json.loads(record["numbers"])
        results.append(record)
    return results
|
|
|
|
|
|
# ── realestate_complexes CRUD ─────────────────────────────────────────────────
|
|
|
|
def _complex_row_to_dict(r) -> Dict[str, Any]:
|
|
return {
|
|
"id": r["id"],
|
|
"name": r["name"],
|
|
"address": r["address"],
|
|
"lat": r["lat"],
|
|
"lng": r["lng"],
|
|
"units": r["units"],
|
|
"types": json.loads(r["types"]) if r["types"] else [],
|
|
"avgPricePerPyeong": r["avg_price_per_pyeong"],
|
|
"subscriptionStart": r["subscription_start"],
|
|
"subscriptionEnd": r["subscription_end"],
|
|
"resultDate": r["result_date"],
|
|
"status": r["status"],
|
|
"priority": r["priority"],
|
|
"tags": json.loads(r["tags"]) if r["tags"] else [],
|
|
"naverUrl": r["naver_url"],
|
|
"floorPlanUrl": r["floor_plan_url"],
|
|
"memo": r["memo"],
|
|
"created_at": r["created_at"],
|
|
"updated_at": r["updated_at"],
|
|
}
|
|
|
|
|
|
def get_all_complexes() -> List[Dict[str, Any]]:
    """All realestate complexes, newest first (by id)."""
    with _conn() as conn:
        rows = conn.execute(
            "SELECT * FROM realestate_complexes ORDER BY id DESC"
        ).fetchall()
    return [_complex_row_to_dict(row) for row in rows]
|
|
|
|
|
|
def get_complex(complex_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one complex by id; None when absent."""
    with _conn() as conn:
        found = conn.execute(
            "SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).fetchone()
        return _complex_row_to_dict(found) if found else None
|
|
|
|
|
|
def create_complex(data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert a complex from an API payload (camelCase keys) and return the stored row.

    Only "name" is required; every other field falls back to the schema default.
    """
    values = (
        data["name"],
        data.get("address", ""),
        data.get("lat"),
        data.get("lng"),
        data.get("units"),
        json.dumps(data.get("types", [])),
        data.get("avgPricePerPyeong"),
        data.get("subscriptionStart"),
        data.get("subscriptionEnd"),
        data.get("resultDate"),
        data.get("status", "청약예정"),
        data.get("priority", "normal"),
        json.dumps(data.get("tags", [])),
        data.get("naverUrl", ""),
        data.get("floorPlanUrl", ""),
        data.get("memo", ""),
    )
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO realestate_complexes
            (name, address, lat, lng, units, types, avg_price_per_pyeong,
             subscription_start, subscription_end, result_date,
             status, priority, tags, naver_url, floor_plan_url, memo)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
        stored = conn.execute(
            "SELECT * FROM realestate_complexes WHERE rowid = last_insert_rowid()"
        ).fetchone()
        return _complex_row_to_dict(stored)
|
|
|
|
|
|
def update_complex(complex_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Partially update a complex from an API payload.

    camelCase payload keys map to snake_case columns; types/tags are
    JSON-encoded; ``updated_at`` is bumped. Returns the row or None.
    """
    camel_to_col = {
        "name": "name",
        "address": "address",
        "lat": "lat",
        "lng": "lng",
        "units": "units",
        "avgPricePerPyeong": "avg_price_per_pyeong",
        "subscriptionStart": "subscription_start",
        "subscriptionEnd": "subscription_end",
        "resultDate": "result_date",
        "status": "status",
        "priority": "priority",
        "naverUrl": "naver_url",
        "floorPlanUrl": "floor_plan_url",
        "memo": "memo",
    }
    json_cols = {"types", "tags"}

    updates: Dict[str, Any] = {}
    for api_key, column in camel_to_col.items():
        if api_key in data:
            updates[column] = data[api_key]
    for column in json_cols:
        if column in data:
            updates[column] = json.dumps(data[column])

    if not updates:
        return get_complex(complex_id)

    assignments = ", ".join(f"{column} = ?" for column in updates)
    assignments += ", updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')"
    params = [*updates.values(), complex_id]

    with _conn() as conn:
        conn.execute(
            f"UPDATE realestate_complexes SET {assignments} WHERE id = ?", params
        )
        row = conn.execute(
            "SELECT * FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).fetchone()
        return _complex_row_to_dict(row) if row else None
|
|
|
|
|
|
def delete_complex(complex_id: int) -> bool:
    """Delete one complex; True when a row was removed."""
    with _conn() as conn:
        removed = conn.execute(
            "DELETE FROM realestate_complexes WHERE id = ?", (complex_id,)
        ).rowcount
        return removed > 0
|
|
|