chore: initial backend + travel-proxy dockerized setup for NAS deployment

2026-01-25 11:27:18 +09:00
parent 0fa163c00a
commit 9c9968b9a7
15 changed files with 1219 additions and 0 deletions

backend/Dockerfile (new file, +18 lines)

@@ -0,0 +1,18 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*
COPY app/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY app /app/app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

backend/app/collector.py (new file, +55 lines)

@@ -0,0 +1,55 @@
import requests
from typing import Dict, Any
from .db import get_draw, upsert_draw
def _normalize_item(item: dict) -> dict:
# structure of smok95 all.json / latest.json:
# - draw_no: int
# - numbers: [n1..n6]
# - bonus_no: int
# - date: "YYYY-MM-DD ..."
numbers = item["numbers"]
return {
"drw_no": int(item["draw_no"]),
"drw_date": (item.get("date") or "")[:10],
"n1": int(numbers[0]),
"n2": int(numbers[1]),
"n3": int(numbers[2]),
"n4": int(numbers[3]),
"n5": int(numbers[4]),
"n6": int(numbers[5]),
"bonus": int(item["bonus_no"]),
}
def sync_all_from_json(all_url: str) -> Dict[str, Any]:
r = requests.get(all_url, timeout=60)
r.raise_for_status()
data = r.json() # list[dict]
inserted = 0
skipped = 0
for item in data:
row = _normalize_item(item)
if get_draw(row["drw_no"]):
skipped += 1
continue
upsert_draw(row)
inserted += 1
return {"mode": "all_json", "url": all_url, "inserted": inserted, "skipped": skipped, "total": len(data)}
def sync_latest(latest_url: str) -> Dict[str, Any]:
r = requests.get(latest_url, timeout=30)
r.raise_for_status()
item = r.json()
row = _normalize_item(item)
before = get_draw(row["drw_no"])
upsert_draw(row)
return {"mode": "latest_json", "url": latest_url, "was_new": (before is None), "drawNo": row["drw_no"]}

backend/app/db.py (new file, +239 lines)

@@ -0,0 +1,239 @@
# backend/app/db.py
import os
import sqlite3
import json
import hashlib
from typing import Any, Dict, Optional, List
DB_PATH = "/app/data/lotto.db"
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:
cols = {r["name"] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()}
if col not in cols:
conn.execute(ddl)
def init_db() -> None:
with _conn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS draws (
drw_no INTEGER PRIMARY KEY,
drw_date TEXT NOT NULL,
n1 INTEGER NOT NULL,
n2 INTEGER NOT NULL,
n3 INTEGER NOT NULL,
n4 INTEGER NOT NULL,
n5 INTEGER NOT NULL,
n6 INTEGER NOT NULL,
bonus INTEGER NOT NULL,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_draws_date ON draws(drw_date);")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS recommendations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
based_on_draw INTEGER,
numbers TEXT NOT NULL,
params TEXT NOT NULL
);
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_reco_created ON recommendations(created_at DESC);")
# ✅ extension columns (added automatically to existing databases as well)
_ensure_column(conn, "recommendations", "numbers_sorted",
"ALTER TABLE recommendations ADD COLUMN numbers_sorted TEXT;")
_ensure_column(conn, "recommendations", "dedup_hash",
"ALTER TABLE recommendations ADD COLUMN dedup_hash TEXT;")
_ensure_column(conn, "recommendations", "favorite",
"ALTER TABLE recommendations ADD COLUMN favorite INTEGER NOT NULL DEFAULT 0;")
_ensure_column(conn, "recommendations", "note",
"ALTER TABLE recommendations ADD COLUMN note TEXT NOT NULL DEFAULT '';")
_ensure_column(conn, "recommendations", "tags",
"ALTER TABLE recommendations ADD COLUMN tags TEXT NOT NULL DEFAULT '[]';")
# ✅ UNIQUE index to prevent duplicate saves
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);")
def upsert_draw(row: Dict[str, Any]) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO draws (drw_no, drw_date, n1, n2, n3, n4, n5, n6, bonus)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(drw_no) DO UPDATE SET
drw_date=excluded.drw_date,
n1=excluded.n1, n2=excluded.n2, n3=excluded.n3,
n4=excluded.n4, n5=excluded.n5, n6=excluded.n6,
bonus=excluded.bonus,
updated_at=datetime('now')
""",
(
int(row["drw_no"]),
str(row["drw_date"]),
int(row["n1"]), int(row["n2"]), int(row["n3"]),
int(row["n4"]), int(row["n5"]), int(row["n6"]),
int(row["bonus"]),
),
)
def get_latest_draw() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM draws ORDER BY drw_no DESC LIMIT 1").fetchone()
return dict(r) if r else None
def get_draw(drw_no: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM draws WHERE drw_no = ?", (drw_no,)).fetchone()
return dict(r) if r else None
def count_draws() -> int:
with _conn() as conn:
r = conn.execute("SELECT COUNT(*) AS c FROM draws").fetchone()
return int(r["c"])
def get_all_draw_numbers():
with _conn() as conn:
rows = conn.execute(
"SELECT drw_no, n1, n2, n3, n4, n5, n6 FROM draws ORDER BY drw_no ASC"
).fetchall()
return [(int(r["drw_no"]), [int(r["n1"]), int(r["n2"]), int(r["n3"]), int(r["n4"]), int(r["n5"]), int(r["n6"])]) for r in rows]
# ---------- ✅ recommendation helpers ----------
def _canonical_params(params: dict) -> str:
return json.dumps(params, sort_keys=True, separators=(",", ":"))
def _numbers_sorted_str(numbers: List[int]) -> str:
return ",".join(str(x) for x in sorted(numbers))
def _dedup_hash(based_on_draw: Optional[int], numbers: List[int], params: dict) -> str:
s = f"{based_on_draw or ''}|{_numbers_sorted_str(numbers)}|{_canonical_params(params)}"
return hashlib.sha1(s.encode("utf-8")).hexdigest()
def save_recommendation_dedup(based_on_draw: Optional[int], numbers: List[int], params: dict) -> Dict[str, Any]:
"""
✅ If an identical recommendation (numbers + params + based_on_draw) already exists, return the existing id without saving a duplicate.
"""
ns = _numbers_sorted_str(numbers)
h = _dedup_hash(based_on_draw, numbers, params)
with _conn() as conn:
# return the existing row if one is already stored
r = conn.execute("SELECT id FROM recommendations WHERE dedup_hash = ?", (h,)).fetchone()
if r:
return {"id": int(r["id"]), "saved": False, "deduped": True}
cur = conn.execute(
"""
INSERT INTO recommendations (based_on_draw, numbers, params, numbers_sorted, dedup_hash)
VALUES (?, ?, ?, ?, ?)
""",
(based_on_draw, json.dumps(numbers), json.dumps(params), ns, h),
)
return {"id": int(cur.lastrowid), "saved": True, "deduped": False}
def list_recommendations_ex(
limit: int = 30,
offset: int = 0,
favorite: Optional[bool] = None,
tag: Optional[str] = None,
q: Optional[str] = None,
sort: str = "id_desc", # id_desc|created_desc|favorite_desc
) -> List[Dict[str, Any]]:
where = []
args: list[Any] = []
if favorite is not None:
where.append("favorite = ?")
args.append(1 if favorite else 0)
if q:
where.append("note LIKE ?")
args.append(f"%{q}%")
# tags is stored as a JSON string, so use a simple LIKE match for now (keeping it lightweight)
if tag:
where.append("tags LIKE ?")
args.append(f"%{tag}%")
where_sql = ("WHERE " + " AND ".join(where)) if where else ""
if sort == "created_desc":
order = "created_at DESC"
elif sort == "favorite_desc":
# favorites (favorite = 1) first, then newest
order = "favorite DESC, id DESC"
else:
order = "id DESC"
sql = f"""
SELECT id, created_at, based_on_draw, numbers, params, favorite, note, tags
FROM recommendations
{where_sql}
ORDER BY {order}
LIMIT ? OFFSET ?
"""
args.extend([int(limit), int(offset)])
with _conn() as conn:
rows = conn.execute(sql, args).fetchall()
out = []
for r in rows:
out.append({
"id": int(r["id"]),
"created_at": r["created_at"],
"based_on_draw": r["based_on_draw"],
"numbers": json.loads(r["numbers"]),
"params": json.loads(r["params"]),
"favorite": bool(r["favorite"]) if r["favorite"] is not None else False,
"note": r["note"],
"tags": json.loads(r["tags"]) if r["tags"] else [],
})
return out
def update_recommendation(rec_id: int, favorite: Optional[bool] = None, note: Optional[str] = None, tags: Optional[List[str]] = None) -> bool:
fields = []
args: list[Any] = []
if favorite is not None:
fields.append("favorite = ?")
args.append(1 if favorite else 0)
if note is not None:
fields.append("note = ?")
args.append(note)
if tags is not None:
fields.append("tags = ?")
args.append(json.dumps(tags))
if not fields:
return False
args.append(rec_id)
with _conn() as conn:
cur = conn.execute(
f"UPDATE recommendations SET {', '.join(fields)} WHERE id = ?",
args,
)
return cur.rowcount > 0
def delete_recommendation(rec_id: int) -> bool:
with _conn() as conn:
cur = conn.execute("DELETE FROM recommendations WHERE id = ?", (rec_id,))
return cur.rowcount > 0
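The dedup helper above can be exercised directly: two calls with the same number set (in any order), the same params, and the same based_on_draw should produce a single row and then return the existing id. A minimal sketch, assuming it runs inside the container where DB_PATH (/app/data/lotto.db) is writable; the draw number 1150 and the parameter values are made up for illustration.

from app.db import init_db, save_recommendation_dedup

init_db()
params = {"recent_window": 200, "recent_weight": 2.0, "avoid_recent_k": 5}
first = save_recommendation_dedup(1150, [3, 11, 22, 28, 34, 45], params)
# the same set in a different order hashes identically because numbers are sorted first
second = save_recommendation_dedup(1150, [45, 34, 28, 22, 11, 3], params)
assert second["id"] == first["id"] and second["deduped"] is True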

backend/app/main.py (new file, +344 lines)

@@ -0,0 +1,344 @@
import os
from typing import Optional, List, Dict, Any, Tuple
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from apscheduler.schedulers.background import BackgroundScheduler
from .db import (
init_db, get_draw, get_latest_draw, get_all_draw_numbers,
save_recommendation_dedup, list_recommendations_ex, delete_recommendation,
update_recommendation,
)
from .recommender import recommend_numbers
from .collector import sync_latest
app = FastAPI()
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
LATEST_URL = os.getenv("LOTTO_LATEST_URL", "https://smok95.github.io/lotto/results/latest.json")
def calc_metrics(numbers: List[int]) -> Dict[str, Any]:
nums = sorted(numbers)
s = sum(nums)
odd = sum(1 for x in nums if x % 2 == 1)
even = len(nums) - odd
mn, mx = nums[0], nums[-1]
rng = mx - mn
# 1-10, 11-20, 21-30, 31-40, 41-45
buckets = {
"1-10": 0,
"11-20": 0,
"21-30": 0,
"31-40": 0,
"41-45": 0,
}
for x in nums:
if 1 <= x <= 10:
buckets["1-10"] += 1
elif 11 <= x <= 20:
buckets["11-20"] += 1
elif 21 <= x <= 30:
buckets["21-30"] += 1
elif 31 <= x <= 40:
buckets["31-40"] += 1
else:
buckets["41-45"] += 1
return {
"sum": s,
"odd": odd,
"even": even,
"min": mn,
"max": mx,
"range": rng,
"buckets": buckets,
}
def calc_recent_overlap(numbers: List[int], draws: List[Tuple[int, List[int]]], last_k: int) -> Dict[str, Any]:
"""
draws: [(drw_no, [n1..n6]), ...] in ascending draw order
last_k: overlap is measured against the most recent k draws
"""
if last_k <= 0:
return {"last_k": 0, "repeats": 0, "repeated_numbers": []}
recent = draws[-last_k:] if len(draws) >= last_k else draws
recent_set = set()
for _, nums in recent:
recent_set.update(nums)
repeated = sorted(set(numbers) & recent_set)
return {
"last_k": len(recent),
"repeats": len(repeated),
"repeated_numbers": repeated,
}
@app.on_event("startup")
def on_startup():
init_db()
scheduler.add_job(lambda: sync_latest(LATEST_URL), "cron", hour="9,21", minute=10)
scheduler.start()
@app.get("/health")
def health():
return {"ok": True}
@app.get("/api/lotto/latest")
def api_latest():
row = get_latest_draw()
if not row:
raise HTTPException(status_code=404, detail="No data yet")
return {
"drawNo": row["drw_no"],
"date": row["drw_date"],
"numbers": [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]],
"bonus": row["bonus"],
}
@app.get("/api/lotto/{drw_no:int}")
def api_draw(drw_no: int):
row = get_draw(drw_no)
if not row:
raise HTTPException(status_code=404, detail="Not found")
return {
"drwNo": row["drw_no"],
"date": row["drw_date"],
"numbers": [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]],
"bonus": row["bonus"],
}
@app.post("/api/admin/sync_latest")
def admin_sync_latest():
return sync_latest(LATEST_URL)
# ---------- ✅ recommend (dedup save) ----------
@app.get("/api/lotto/recommend")
def api_recommend(
recent_window: int = 200,
recent_weight: float = 2.0,
avoid_recent_k: int = 5,
# ---- optional constraints (Lotto Lab) ----
sum_min: Optional[int] = None,
sum_max: Optional[int] = None,
odd_min: Optional[int] = None,
odd_max: Optional[int] = None,
range_min: Optional[int] = None,
range_max: Optional[int] = None,
max_overlap_latest: Optional[int] = None, # allowed overlap with the last avoid_recent_k draws
max_try: int = 200, # retries to find a combination that satisfies the constraints
):
draws = get_all_draw_numbers()
if not draws:
raise HTTPException(status_code=404, detail="No data yet")
latest = get_latest_draw()
params = {
"recent_window": recent_window,
"recent_weight": float(recent_weight),
"avoid_recent_k": avoid_recent_k,
"sum_min": sum_min,
"sum_max": sum_max,
"odd_min": odd_min,
"odd_max": odd_max,
"range_min": range_min,
"range_max": range_max,
"max_overlap_latest": max_overlap_latest,
"max_try": int(max_try),
}
def _accept(nums: List[int]) -> bool:
m = calc_metrics(nums)
if sum_min is not None and m["sum"] < sum_min:
return False
if sum_max is not None and m["sum"] > sum_max:
return False
if odd_min is not None and m["odd"] < odd_min:
return False
if odd_max is not None and m["odd"] > odd_max:
return False
if range_min is not None and m["range"] < range_min:
return False
if range_max is not None and m["range"] > range_max:
return False
if max_overlap_latest is not None:
ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k)
if ov["repeats"] > max_overlap_latest:
return False
return True
chosen = None
explain = None
tries = 0
while tries < max_try:
tries += 1
result = recommend_numbers(
draws,
recent_window=recent_window,
recent_weight=recent_weight,
avoid_recent_k=avoid_recent_k,
)
nums = result["numbers"]
if _accept(nums):
chosen = nums
explain = result["explain"]
break
if chosen is None:
raise HTTPException(
status_code=400,
detail=f"Constraints too strict. No valid set found in max_try={max_try}. "
f"Try relaxing sum/odd/range/overlap constraints.",
)
# ✅ dedup save
saved = save_recommendation_dedup(
latest["drw_no"] if latest else None,
chosen,
params,
)
metrics = calc_metrics(chosen)
overlap = calc_recent_overlap(chosen, draws, last_k=avoid_recent_k)
return {
"id": saved["id"],
"saved": saved["saved"],
"deduped": saved["deduped"],
"based_on_latest_draw": latest["drw_no"] if latest else None,
"numbers": chosen,
"explain": explain,
"params": params,
"metrics": metrics,
"recent_overlap": overlap,
"tries": tries,
}
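A constrained request against this endpoint might look like the sketch below. It is illustrative only (not part of main.py) and assumes the backend is reachable on localhost:8000, the port exposed by the Dockerfile; the constraint values are arbitrary examples.

import requests

resp = requests.get(
    "http://localhost:8000/api/lotto/recommend",
    params={
        "recent_window": 200,
        "recent_weight": 2.0,
        "avoid_recent_k": 5,
        "sum_min": 100, "sum_max": 180,  # example constraint band
        "odd_min": 2, "odd_max": 4,
        "max_overlap_latest": 1,
    },
    timeout=10,
)
resp.raise_for_status()
data = resp.json()
print(data["numbers"], data["metrics"]["sum"], data["recent_overlap"])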
# ---------- ✅ history list (filter/paging) ----------
@app.get("/api/history")
def api_history(
limit: int = 30,
offset: int = 0,
favorite: Optional[bool] = None,
tag: Optional[str] = None,
q: Optional[str] = None,
sort: str = "id_desc",
):
items = list_recommendations_ex(
limit=limit,
offset=offset,
favorite=favorite,
tag=tag,
q=q,
sort=sort,
)
draws = get_all_draw_numbers()
out = []
for it in items:
nums = it["numbers"]
out.append({
**it,
"metrics": calc_metrics(nums),
"recent_overlap": calc_recent_overlap(
nums, draws, last_k=int(it["params"].get("avoid_recent_k", 0) or 0)
),
})
return {
"items": out,
"limit": limit,
"offset": offset,
"filters": {"favorite": favorite, "tag": tag, "q": q, "sort": sort},
}
@app.delete("/api/history/{rec_id:int}")
def api_history_delete(rec_id: int):
ok = delete_recommendation(rec_id)
if not ok:
raise HTTPException(status_code=404, detail="Not found")
return {"deleted": True, "id": rec_id}
# ---------- ✅ history update (favorite/note/tags) ----------
class HistoryUpdate(BaseModel):
favorite: Optional[bool] = None
note: Optional[str] = None
tags: Optional[List[str]] = None
@app.patch("/api/history/{rec_id:int}")
def api_history_patch(rec_id: int, body: HistoryUpdate):
ok = update_recommendation(rec_id, favorite=body.favorite, note=body.note, tags=body.tags)
if not ok:
raise HTTPException(status_code=404, detail="Not found or no changes")
return {"updated": True, "id": rec_id}
# ---------- ✅ batch recommend ----------
def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, avoid_recent_k: int, max_try: int = 200):
items = []
seen = set()
tries = 0
while len(items) < count and tries < max_try:
tries += 1
r = recommend_numbers(draws, recent_window=recent_window, recent_weight=recent_weight, avoid_recent_k=avoid_recent_k)
key = tuple(sorted(r["numbers"]))
if key in seen:
continue
seen.add(key)
items.append(r)
return items
@app.get("/api/lotto/recommend/batch")
def api_recommend_batch(
count: int = 5,
recent_window: int = 200,
recent_weight: float = 2.0,
avoid_recent_k: int = 5,
):
count = max(1, min(count, 20))
draws = get_all_draw_numbers()
if not draws:
raise HTTPException(status_code=404, detail="No data yet")
latest = get_latest_draw()
params = {
"recent_window": recent_window,
"recent_weight": float(recent_weight),
"avoid_recent_k": avoid_recent_k,
"count": count,
}
items = _batch_unique(draws, count, recent_window, float(recent_weight), avoid_recent_k)
return {
"based_on_latest_draw": latest["drw_no"] if latest else None,
"count": count,
"items": [{"numbers": it["numbers"], "explain": it["explain"]} for it in items],
"params": params,
}
class BatchSave(BaseModel):
items: List[List[int]]
params: dict
@app.post("/api/lotto/recommend/batch")
def api_recommend_batch_save(body: BatchSave):
latest = get_latest_draw()
based = latest["drw_no"] if latest else None
created, deduped = [], []
for nums in body.items:
saved = save_recommendation_dedup(based, nums, body.params)
(created if saved["saved"] else deduped).append(saved["id"])
return {"saved": True, "created_ids": created, "deduped_ids": deduped}

backend/app/recommender.py (new file, +68 lines)

@@ -0,0 +1,68 @@
import random
from collections import Counter
from typing import Dict, Any, List, Tuple
def recommend_numbers(
draws: List[Tuple[int, List[int]]],
*,
recent_window: int = 200,
recent_weight: float = 2.0,
avoid_recent_k: int = 5,
seed: int | None = None,
) -> Dict[str, Any]:
"""
Lightweight statistics-based recommendation:
- weighted sampling over overall frequency plus extra weight for recent (recent_window) frequency
- numbers drawn in the last avoid_recent_k draws get a reduced probability (not fully excluded)
"""
if seed is not None:
random.seed(seed)
# overall frequency across all draws
all_nums = [n for _, nums in draws for n in nums]
freq_all = Counter(all_nums)
# frequency within the recent window
recent = draws[-recent_window:] if len(draws) >= recent_window else draws
recent_nums = [n for _, nums in recent for n in nums]
freq_recent = Counter(recent_nums)
# numbers from the last k draws (to be penalized)
last_k = draws[-avoid_recent_k:] if len(draws) >= avoid_recent_k else draws
last_k_nums = set(n for _, nums in last_k for n in nums)
# build per-number weights
weights = {}
for n in range(1, 46):
w = freq_all[n] + recent_weight * freq_recent[n]
if n in last_k_nums:
w *= 0.6 # make numbers that appeared in the very latest draws slightly less likely to be picked
weights[n] = max(w, 0.1)
# pick 6 distinct numbers via weighted sampling without replacement
chosen = []
pool = list(range(1, 46))
for _ in range(6):
total = sum(weights[n] for n in pool)
r = random.random() * total
acc = 0.0
for n in pool:
acc += weights[n]
if acc >= r:
chosen.append(n)
pool.remove(n)
break
chosen_sorted = sorted(chosen)
explain = {
"recent_window": recent_window,
"recent_weight": recent_weight,
"avoid_recent_k": avoid_recent_k,
"top_all": [n for n, _ in freq_all.most_common(10)],
"top_recent": [n for n, _ in freq_recent.most_common(10)],
"last_k_draws": [d for d, _ in last_k],
}
return {"numbers": chosen_sorted, "explain": explain}

backend/app/requirements.txt (new file, +5 lines)

@@ -0,0 +1,5 @@
fastapi==0.115.6
uvicorn[standard]==0.30.6
requests==2.32.3
beautifulsoup4==4.12.3
APScheduler==3.10.4