Files
web-page-backend/lotto/app/backtest.py

226 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
import logging
import random
from typing import Any, Dict, List, Optional, Tuple
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
    """Grade a batch of tickets against one draw's winning numbers.

    Each ticket is scored by how many of its numbers appear in ``winning6``.
    Tickets matching 3, 4, 5 or 6 numbers are tallied under ``m3``..``m6``.
    A 5-match ticket that also contains ``bonus`` additionally increments
    ``bonus_hits`` (the second-prize condition); it is still counted in ``m5``.

    Returns:
        The tallies plus ``best_match``, the highest match count seen over
        all tickets (0 when ``tickets`` is empty).
    """
    winners = frozenset(winning6)
    tally = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
    bucket_for = {3: "m3", 4: "m4", 5: "m5", 6: "m6"}
    top = 0
    for ticket in tickets:
        matched = len(winners.intersection(ticket))
        top = max(top, matched)
        bucket = bucket_for.get(matched)
        if bucket is None:
            # Fewer than three matches wins nothing and is not tallied.
            continue
        tally[bucket] += 1
        if matched == 5 and bonus in ticket:
            tally["bonus_hits"] += 1
    tally["best_match"] = top
    return tally
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
    """Convert a match histogram into per-rank prize counts.

    Mapping: 1st = ``m6``, 2nd = ``bonus_hits``, 3rd = ``m5 - bonus_hits``,
    4th = ``m4``, 5th = ``m3``. Keys missing from ``hist`` count as 0.
    """
    six = hist.get("m6", 0)
    five_plus_bonus = hist.get("bonus_hits", 0)
    # m5 includes the bonus hits, so subtract them to get plain 5-matches.
    five_only = hist.get("m5", 0) - hist.get("bonus_hits", 0)
    four = hist.get("m4", 0)
    three = hist.get("m3", 0)
    counts = {}
    counts["1st"] = six
    counts["2nd"] = five_plus_bonus
    counts["3rd"] = five_only
    counts["4th"] = four
    counts["5th"] = three
    return counts
def generate_pool(cache, number_weights, n: int = 20000,
                  seed: Optional[int] = None) -> List[List[int]]:
    """Build a pool of distinct candidate tickets via weighted sampling.

    Repeatedly calls ``weighted_sample_6(number_weights)``, sorts each draw
    and keeps it only if that combination has not been seen yet. Sampling
    stops once ``n`` distinct tickets are collected or after ``n * 4``
    attempts, whichever comes first; a warning is logged when the pool ends
    up smaller than requested.

    Args:
        cache: Accepted for interface compatibility; not used in this function.
        number_weights: Passed through unchanged to ``weighted_sample_6``.
        n: Number of distinct tickets requested.
        seed: Used only to construct a local ``random.Random`` (see note).

    Returns:
        A list of at most ``n`` distinct tickets, each a sorted list.
    """
    # NOTE(review): this generator is created but never handed to
    # weighted_sample_6, so as written `seed` does not appear to influence
    # the pool. Confirm whether weighted_sample_6 can accept an RNG.
    rng = random.Random(seed)
    unique = set()
    candidates = []
    max_attempts = n * 4
    for _ in range(max_attempts):
        if len(candidates) >= n:
            break
        combo = tuple(sorted(weighted_sample_6(number_weights)))
        if combo not in unique:
            unique.add(combo)
            candidates.append(list(combo))
    if len(candidates) < n:
        logging.getLogger(__name__).warning(
            "generate_pool: requested %d, got %d", n, len(candidates)
        )
    return candidates
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
    """Pick the ``k`` best tickets from ``pool`` under weight vector ``W``.

    Tickets are ordered by descending ``score_combination(t, cache, W)["score_total"]``
    and the first ``k`` of that ordering are returned.

    Raises:
        ValueError: if ``k`` is larger than the pool.
    """
    if len(pool) < k:
        raise ValueError(f"k={k} exceeds pool size {len(pool)}")

    def _negated_total(ticket):
        # Negate so that an ascending sort puts the highest score first.
        return -score_combination(ticket, cache, W)["score_total"]

    best_first = sorted(pool, key=_negated_total)
    return best_first[:k]
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
    """Draw ``k`` distinct uniformly random tickets (null-model control group).

    Each ticket is six distinct numbers from 1..45, returned sorted. A private
    ``random.Random(seed)`` is used, so a fixed ``seed`` gives a reproducible
    result. At most ``k * 200`` draws are attempted, so fewer than ``k``
    tickets may be returned if that budget runs out.
    """
    rng = random.Random(seed)
    numbers = range(1, 46)
    drawn = set()
    tickets = []
    for _ in range(k * 200):
        if len(tickets) >= k:
            break
        combo = tuple(sorted(rng.sample(numbers, 6)))
        if combo not in drawn:
            drawn.add(combo)
            tickets.append(list(combo))
    return tickets
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
                        target_draw_no: int) -> List[Tuple[int, List[int]]]:
    """Return the draws known just before ``target_draw_no`` was drawn.

    Keeps only entries whose draw number is strictly less than
    ``target_draw_no``, preserving input order.
    """
    earlier = []
    for draw_no, numbers in draws:
        if draw_no < target_draw_no:
            earlier.append((draw_no, numbers))
    return earlier
def calibrate_winner_compute(draws, target_draw_no, winning6,
                             sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
    """Score a winning combination against a point-in-time cache (no DB access).

    The analysis cache is built only from draws before ``target_draw_no``.
    The sorted winning combination is scored with ``score_combination``, and
    its ``score_total`` is compared against ``sample_m`` random tickets from
    ``random_null_tickets(sample_m, seed=seed)``.

    Returns:
        A dict with ``scores`` (the full ``score_combination`` result for the
        winner), ``percentile`` (fraction of random tickets whose total is
        less than or equal to the winner's) and ``cache_draws`` (number of
        draws used to build the cache).
    """
    history = point_in_time_draws(draws, target_draw_no)
    cache = build_analysis_cache(history)
    winner_scores = score_combination(sorted(winning6), cache)
    threshold = winner_scores["score_total"]
    null_sample = random_null_tickets(sample_m, seed=seed)
    at_or_below = 0
    for ticket in null_sample:
        if score_combination(ticket, cache)["score_total"] <= threshold:
            at_or_below += 1
    # max(..., 1) avoids dividing by zero when the sample is empty.
    fraction = at_or_below / max(len(null_sample), 1)
    return {
        "scores": winner_scores,
        "percentile": fraction,
        "cache_draws": len(history),
    }
MIN_HISTORY = 30 # minimum number of point-in-time draws needed; with fewer, calibration is skipped
def _db():
    """Return the package's ``db`` module, imported inside the function
    rather than at module import time."""
    from . import db
    return db
def calibrate_winner(draw_no: int, sample_m: int = 2000, draws=None) -> Dict[str, Any]:
    """DB entry point: calibrate a single draw and persist the result.

    Args:
        draw_no: Draw to calibrate.
        sample_m: Size of the random comparison sample.
        draws: Optional pre-loaded result of ``db.get_all_draw_numbers()``;
            pass it when calibrating many draws to avoid re-querying per draw.

    Returns:
        ``{"ok": False, "reason": "no_draw"}`` when the draw row is missing,
        ``{"ok": False, "reason": "insufficient_history"}`` when fewer than
        ``MIN_HISTORY`` earlier draws exist, otherwise ``{"ok": True,
        "draw_no": ...}`` merged with the ``calibrate_winner_compute`` result.

    NOTE(review): the original docstring calls this idempotent. That depends
    on ``save_winner_calibration`` overwriting an existing row (not visible
    here). The percentile also comes from an unseeded random sample, so
    stored values may differ between runs — confirm.
    """
    store = _db()
    if draws is None:
        draws = store.get_all_draw_numbers()
    record = store.get_draw(draw_no)
    if record is None:
        return {"ok": False, "reason": "no_draw"}
    if len(point_in_time_draws(draws, draw_no)) < MIN_HISTORY:
        return {"ok": False, "reason": "insufficient_history"}

    winning6 = [record[col] for col in ("n1", "n2", "n3", "n4", "n5", "n6")]
    result = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m)
    store.save_winner_calibration(
        draw_no=draw_no,
        winning=winning6,
        scores=result["scores"],
        percentile=result["percentile"],
        my_pick_avg=None,
        cache_draws=result["cache_draws"],
    )
    response = {"ok": True, "draw_no": draw_no}
    response.update(result)
    return response
def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]:
    """Calibrate up to ``batch`` draws that have no calibration yet.

    Candidates are draws not reported by ``db.get_calibrated_draw_nos()``
    whose draw number is greater than ``MIN_HISTORY``. They are processed in
    ascending draw-number order, so repeated calls resume where earlier ones
    stopped.

    Returns:
        ``calibrated``: how many draws in this batch returned ``ok``.
        ``remaining``: ``len(todo) - batch`` floored at 0.

    NOTE(review): ``remaining`` does not account for draws in the batch that
    failed. Those stay uncalibrated and are retried first on the next call.
    """
    store = _db()
    draws = store.get_all_draw_numbers()
    done = store.get_calibrated_draw_nos()
    pending = sorted(
        draw_no for draw_no, _ in draws
        if draw_no not in done and draw_no > MIN_HISTORY
    )
    succeeded = 0
    for draw_no in pending[:batch]:
        outcome = calibrate_winner(draw_no, sample_m=sample_m, draws=draws)
        if outcome.get("ok"):
            succeeded += 1
    left_over = max(0, len(pending) - batch)
    return {"calibrated": succeeded, "remaining": left_over}
def run_forward_purchase(draw_no: int, k: int = 5000, pool_n: int = 20000,
                         sample_seed: Optional[int] = None) -> Dict[str, Any]:
    """Simulate buying tickets for ``draw_no`` using only pre-draw data.

    Three strategies are bought, graded against the draw's winning numbers
    and saved via ``db.save_backtest_run``:

    1. ``engine_w``: top-``k`` tickets from a weighted candidate pool, once
       per trial returned by ``db.get_weekly_trials`` for the draw's week.
       When there are no trials, a single uniform weight vector
       ``[0.2] * 5`` is used instead (label ``"base"``).
    2. ``random_null``: ``k`` uniformly random tickets.
    3. ``coverage``: ``k`` tickets from ``coverage_tickets``.

    Returns:
        ``{"ok": False, "reason": ...}`` when the draw is missing or has fewer
        than ``MIN_HISTORY`` earlier draws, else ``{"ok": True, "draw_no": ...}``.

    NOTE(review): the original docstring calls the save idempotent; that
    depends on ``save_backtest_run``, which is not visible here.
    """
    store = _db()
    all_draws = store.get_all_draw_numbers()
    record = store.get_draw(draw_no)
    if record is None:
        return {"ok": False, "reason": "no_draw"}
    history = point_in_time_draws(all_draws, draw_no)
    if len(history) < MIN_HISTORY:
        return {"ok": False, "reason": "insufficient_history"}

    winning6 = [record[col] for col in ("n1", "n2", "n3", "n4", "n5", "n6")]
    bonus = record["bonus"]
    cache = build_analysis_cache(history)
    number_weights = build_number_weights(cache)
    pool = generate_pool(cache, number_weights, n=pool_n, seed=sample_seed)

    def _grade_and_save(strategy, label, weight_json, trial_id, tickets):
        # Grade one strategy's tickets and persist the run, together with
        # the mean default-weight score of those tickets.
        graded = grade_tickets(tickets, winning6, bonus)
        meta_sum = 0
        for ticket in tickets:
            meta_sum += score_combination(ticket, cache)["score_total"]
        mean_meta = meta_sum / max(len(tickets), 1)
        store.save_backtest_run(
            draw_no=draw_no,
            strategy=strategy,
            weight_label=label,
            weight_json=weight_json,
            trial_id=trial_id,
            n_tickets=len(tickets),
            hist=graded,
            best_match=graded["best_match"],
            avg_meta_score=mean_meta,
        )

    # 1) engine_w: that week's trials if any, otherwise the uniform fallback.
    from datetime import date
    from . import weight_evolver

    drawn_on = date.fromisoformat(record["drw_date"])
    week_start = weight_evolver.get_week_start(drawn_on)
    weekly_trials = store.get_weekly_trials(week_start)
    if not weekly_trials:
        uniform = [0.2] * 5
        picked = purchase_tickets(pool, cache, uniform, k)
        _grade_and_save("engine_w", "base", uniform, None, picked)
    else:
        for trial in weekly_trials:
            picked = purchase_tickets(pool, cache, trial["weight"], k)
            _grade_and_save(
                "engine_w", f"w{trial['day_of_week']}",
                trial["weight"], trial["id"], picked,
            )

    # 2) random_null
    _grade_and_save("random_null", "-", None, None,
                    random_null_tickets(k, seed=sample_seed))

    # 3) coverage
    _grade_and_save("coverage", "-", None, None,
                    coverage_tickets(k, seed=sample_seed))

    return {"ok": True, "draw_no": draw_no}
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
    """Greedily generate distinct tickets that spread usage across 1..45.

    Each round ranks the numbers by how often they have already been used
    (ties broken randomly) and takes the six least-used as the next ticket.
    If that combination was already issued, the next six in the ranking
    (positions 6..11) are tried instead; if that is also a duplicate the
    round is skipped. At most ``k * 50`` rounds are run, so fewer than ``k``
    tickets may be returned.

    This only balances number usage; it is not a wheeling / guarantee design.
    """
    rng = random.Random(seed)
    use_count = dict.fromkeys(range(1, 46), 0)
    issued = set()
    tickets = []
    for _ in range(k * 50):
        if len(tickets) >= k:
            break
        # One rng.random() per number serves as the random tie-breaker.
        order = sorted(range(1, 46), key=lambda num: (use_count[num], rng.random()))
        combo = tuple(sorted(order[:6]))
        if combo in issued:
            # Fallback: sampling 6 of the 6 next-ranked numbers always
            # yields exactly those numbers.
            combo = tuple(sorted(rng.sample(order[6:12], 6)))
            if combo in issued:
                continue
        issued.add(combo)
        tickets.append(list(combo))
        for num in combo:
            use_count[num] += 1
    return tickets