273 lines
10 KiB
Python
273 lines
10 KiB
Python
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
|
||
import logging
|
||
import random
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .analyzer import build_analysis_cache, build_number_weights, score_combination
|
||
from .utils import weighted_sample_6
|
||
|
||
# Kept equal to the number of engine_w trials so that selection bias cancels out.
|
||
N_NULL_TRIALS = 6  # number of random_null control runs stored per draw by run_forward_purchase
|
||
|
||
|
||
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
    """Grade a batch of tickets against one draw.

    Args:
        tickets: Tickets to grade; each ticket is a list of numbers.
        winning6: The winning main numbers of the draw.
        bonus: The bonus number of the draw.

    Returns:
        A dict with the match histogram ``m3``/``m4``/``m5``/``m6`` (tickets
        matching exactly 3/4/5/6 winning numbers), ``bonus_hits`` (tickets
        matching exactly 5 winning numbers that also contain the bonus
        number, i.e. second prize) and ``best_match`` (the highest match
        count seen, 0 when there are no tickets).
    """
    winners = frozenset(winning6)
    # Match count -> histogram key; counts outside 3..6 are not tallied.
    bucket_for = {3: "m3", 4: "m4", 5: "m5", 6: "m6"}
    tally = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
    top = 0
    for ticket in tickets:
        matched = len(winners.intersection(ticket))
        top = max(top, matched)
        bucket = bucket_for.get(matched)
        if bucket is None:
            continue
        tally[bucket] += 1
        # Second prize: five main numbers plus the bonus number on the ticket.
        if matched == 5 and bonus in ticket:
            tally["bonus_hits"] += 1
    tally["best_match"] = top
    return tally
|
||
|
||
|
||
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
    """Convert a match histogram into per-tier prize counts.

    Missing histogram keys are treated as 0.

    Args:
        hist: Mapping that may contain ``m3``, ``m4``, ``m5``, ``m6`` and
            ``bonus_hits`` (the shape produced by ``grade_tickets``).

    Returns:
        ``{"1st": m6, "2nd": bonus_hits, "3rd": m5 - bonus_hits,
        "4th": m4, "5th": m3}``.
    """
    five_with_bonus = hist.get("bonus_hits", 0)
    five_matches = hist.get("m5", 0)
    return {
        "1st": hist.get("m6", 0),
        "2nd": five_with_bonus,
        # m5 counts every 5-match ticket, so the bonus hits are taken out.
        "3rd": five_matches - five_with_bonus,
        "4th": hist.get("m4", 0),
        "5th": hist.get("m3", 0),
    }
|
||
|
||
|
||
def generate_pool(cache, number_weights, n: int = 20000,
                  seed: Optional[int] = None) -> List[List[int]]:
    """Build a pool of distinct candidate tickets via weighted sampling.

    Tickets are drawn with ``weighted_sample_6(number_weights)``, sorted and
    de-duplicated until ``n`` distinct tickets exist or ``n * 4`` draws have
    been made. A warning is logged when fewer than ``n`` tickets result.

    Args:
        cache: Not used by this function.
        number_weights: Passed unchanged to ``weighted_sample_6``.
        n: Number of distinct tickets requested.
        seed: Seeds a local ``random.Random`` (see the review note below).

    Returns:
        Distinct tickets (each a sorted list) in the order first drawn.
    """
    # NOTE(review): this generator is created but never handed to
    # weighted_sample_6, so `seed` does not influence the pool as written.
    # Confirm whether weighted_sample_6 can accept an RNG before relying on
    # seeded reproducibility.
    rng = random.Random(seed)
    picked = {}  # insertion-ordered: de-duplicates while keeping draw order
    draws_left = n * 4
    while draws_left > 0 and len(picked) < n:
        draws_left -= 1
        combo = tuple(sorted(weighted_sample_6(number_weights)))
        if combo not in picked:
            picked[combo] = None
    pool = [list(combo) for combo in picked]
    if len(pool) < n:
        logging.getLogger(__name__).warning(
            "generate_pool: requested %d, got %d", n, len(pool)
        )
    return pool
|
||
|
||
|
||
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
    """Pick the ``k`` highest-scoring tickets from ``pool``.

    Tickets are ranked by ``score_combination(ticket, cache, W)["score_total"]``
    in descending order; tickets with equal scores keep their pool order.

    Args:
        pool: Candidate tickets.
        cache: Analysis cache forwarded to ``score_combination``.
        W: Weight vector forwarded to ``score_combination``.
        k: Number of tickets to buy.

    Returns:
        The first ``k`` tickets of the ranking.

    Raises:
        ValueError: If ``k`` is larger than the pool.
    """
    available = len(pool)
    if available < k:
        raise ValueError(f"k={k} exceeds pool size {available}")

    def _total(ticket):
        return score_combination(ticket, cache, W)["score_total"]

    # reverse=True keeps the sort stable, so ties stay in pool order.
    best_first = sorted(pool, key=_total, reverse=True)
    return best_first[:k]
|
||
|
||
|
||
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
    """Draw ``k`` distinct uniformly random tickets (null-model control group).

    Each ticket is six distinct numbers from 1..45, sorted ascending. At most
    ``k * 200`` draws are attempted, so fewer than ``k`` tickets are returned
    if that budget runs out.

    Args:
        k: Number of distinct tickets wanted.
        seed: Seed for the local ``random.Random``; ``None`` means unseeded.

    Returns:
        Distinct tickets in the order they were first drawn.
    """
    rng = random.Random(seed)
    numbers = range(1, 46)
    unique = {}  # insertion-ordered set of ticket tuples
    draws_left = k * 200
    while draws_left > 0 and len(unique) < k:
        draws_left -= 1
        combo = tuple(sorted(rng.sample(numbers, 6)))
        unique.setdefault(combo, None)
    return [list(combo) for combo in unique]
|
||
|
||
|
||
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
                        target_draw_no: int) -> List[Tuple[int, List[int]]]:
    """Return the draws known just before the target draw took place.

    Args:
        draws: ``(draw_no, numbers)`` pairs.
        target_draw_no: Draw number whose pre-draw view is wanted.

    Returns:
        The pairs whose draw number is strictly below ``target_draw_no``,
        in their original order.
    """
    earlier = []
    for draw_no, numbers in draws:
        if draw_no < target_draw_no:
            earlier.append((draw_no, numbers))
    return earlier
|
||
|
||
|
||
def calibrate_winner_compute(draws, target_draw_no, winning6,
                             sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
    """Score the winning combination against a point-in-time cache (no DB access).

    The analysis cache is built only from draws before ``target_draw_no``.
    The winner's ``score_total`` is then compared with the scores of
    ``sample_m`` random tickets.

    Args:
        draws: ``(draw_no, numbers)`` pairs for all known draws.
        target_draw_no: Draw being calibrated.
        winning6: Winning main numbers of that draw.
        sample_m: Number of random comparison tickets requested.
        seed: Seed forwarded to ``random_null_tickets``.

    Returns:
        ``{"scores": <score_combination result for the winner>,
        "percentile": <fraction of random tickets scoring <= the winner>,
        "cache_draws": <number of draws used for the cache>}``.
    """
    history = point_in_time_draws(draws, target_draw_no)
    cache = build_analysis_cache(history)
    winner_scores = score_combination(sorted(winning6), cache)
    threshold = winner_scores["score_total"]

    null_sample = random_null_tickets(sample_m, seed=seed)
    at_or_below = 0
    for ticket in null_sample:
        if score_combination(ticket, cache)["score_total"] <= threshold:
            at_or_below += 1

    return {
        "scores": winner_scores,
        # max(..., 1) avoids dividing by zero when the sample is empty.
        "percentile": at_or_below / max(len(null_sample), 1),
        "cache_draws": len(history),
    }
|
||
|
||
|
||
MIN_HISTORY = 30 # minimum number of point-in-time draws for the cache (below this, calibration is skipped)
|
||
|
||
|
||
def _db():
    """Return the package's ``db`` module, imported inside the function rather than at module load."""
    from . import db as database
    return database
|
||
|
||
|
||
def calibrate_winner(draw_no: int, sample_m: int = 2000, draws=None) -> Dict[str, Any]:
    """DB entry point: calibrate one draw and store the result.

    Persistence goes through ``db.save_winner_calibration``; the original
    docstring describes the operation as idempotent, which cannot be
    confirmed from this module alone.

    Args:
        draw_no: Draw to calibrate.
        sample_m: Random-sample size forwarded to ``calibrate_winner_compute``.
        draws: Optional pre-fetched draw list; passing it lets callers avoid
            one ``get_all_draw_numbers`` query per draw.

    Returns:
        ``{"ok": False, "reason": "no_draw"}`` when the draw is unknown,
        ``{"ok": False, "reason": "insufficient_history"}`` when fewer than
        ``MIN_HISTORY`` earlier draws exist, otherwise ``{"ok": True,
        "draw_no": draw_no}`` merged with the compute result.
    """
    db = _db()
    if draws is None:
        draws = db.get_all_draw_numbers()

    draw_row = db.get_draw(draw_no)
    if draw_row is None:
        return {"ok": False, "reason": "no_draw"}

    history = point_in_time_draws(draws, draw_no)
    if len(history) < MIN_HISTORY:
        return {"ok": False, "reason": "insufficient_history"}

    winning6 = [draw_row[col] for col in ("n1", "n2", "n3", "n4", "n5", "n6")]
    result = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m)
    db.save_winner_calibration(
        draw_no=draw_no,
        winning=winning6,
        scores=result["scores"],
        percentile=result["percentile"],
        my_pick_avg=None,
        cache_draws=result["cache_draws"],
    )
    return {"ok": True, "draw_no": draw_no, **result}
|
||
|
||
|
||
def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]:
    """Calibrate up to ``batch`` draws that have no calibration yet (resumable).

    Candidates are draws not reported by ``db.get_calibrated_draw_nos()``
    whose draw number is greater than ``MIN_HISTORY``, processed in
    ascending draw-number order.

    Args:
        batch: Maximum number of draws to process in this call.
        sample_m: Random-sample size forwarded to ``calibrate_winner``.

    Returns:
        ``{"calibrated": <draws calibrated successfully>,
        "remaining": <candidates beyond this batch>}``.
    """
    db = _db()
    draws = db.get_all_draw_numbers()
    done = db.get_calibrated_draw_nos()
    pending = sorted(no for no, _ in draws if no not in done and no > MIN_HISTORY)

    succeeded = 0
    for draw_no in pending[:batch]:
        outcome = calibrate_winner(draw_no, sample_m=sample_m, draws=draws)
        if outcome.get("ok"):
            succeeded += 1

    # NOTE(review): "remaining" is computed from the batch size, so draws
    # that failed inside this batch are not included in it.
    return {"calibrated": succeeded, "remaining": max(0, len(pending) - batch)}
|
||
|
||
|
||
def run_forward_purchase(draw_no: int, k: int = 5000, pool_n: int = 20000,
                         sample_seed: Optional[int] = None) -> Dict[str, Any]:
    """Buy tickets with three strategies from pre-draw data, grade and store them.

    Only draws before ``draw_no`` feed the analysis cache and candidate pool.
    Each strategy's tickets are graded against the actual result and saved
    through ``db.save_backtest_run`` (the original docstring calls the save
    idempotent; that depends on the db layer and is not visible here).

    Strategies:
        * ``engine_w``: one purchase per weekly weight trial of the draw's
          week; when there are none, a single purchase with uniform weights
          ``[0.2] * 5`` labelled ``"base"``.
        * ``random_null``: ``N_NULL_TRIALS`` purchases of random tickets.
        * ``coverage``: one purchase from ``coverage_tickets``.

    Args:
        draw_no: Draw to back-test.
        k: Tickets bought per purchase.
        pool_n: Candidate pool size requested from ``generate_pool``.
        sample_seed: Base seed for pool, null and coverage generation.

    Returns:
        ``{"ok": False, "reason": "no_draw"}`` or
        ``{"ok": False, "reason": "insufficient_history"}`` on early exit,
        otherwise ``{"ok": True, "draw_no": draw_no}``.
    """
    db = _db()
    all_draws = db.get_all_draw_numbers()
    draw_row = db.get_draw(draw_no)
    if draw_row is None:
        return {"ok": False, "reason": "no_draw"}
    history = point_in_time_draws(all_draws, draw_no)
    if len(history) < MIN_HISTORY:
        return {"ok": False, "reason": "insufficient_history"}

    winning6 = [draw_row[col] for col in ("n1", "n2", "n3", "n4", "n5", "n6")]
    bonus = draw_row["bonus"]

    cache = build_analysis_cache(history)
    number_weights = build_number_weights(cache)
    pool = generate_pool(cache, number_weights, n=pool_n, seed=sample_seed)

    def _store(strategy, label, weight_json, trial_id, tickets):
        # Grade one purchase and persist it together with its mean score_total.
        graded = grade_tickets(tickets, winning6, bonus)
        score_sum = 0
        for ticket in tickets:
            score_sum += score_combination(ticket, cache)["score_total"]
        mean_score = score_sum / max(len(tickets), 1)
        db.save_backtest_run(
            draw_no=draw_no,
            strategy=strategy,
            weight_label=label,
            weight_json=weight_json,
            trial_id=trial_id,
            n_tickets=len(tickets),
            hist=graded,
            best_match=graded["best_match"],
            avg_meta_score=mean_score,
        )

    # 1) engine_w: that week's trials if any, otherwise a uniform fallback
    #    (the original comment marks the fallback as leak-free).
    from datetime import date as _date
    from . import weight_evolver as we

    drawn_on = _date.fromisoformat(draw_row["drw_date"])
    trials = db.get_weekly_trials(we.get_week_start(drawn_on))
    if not trials:
        uniform = [0.2] * 5
        bought = purchase_tickets(pool, cache, uniform, k)
        _store("engine_w", "base", uniform, None, bought)
    else:
        for trial in trials:
            bought = purchase_tickets(pool, cache, trial["weight"], k)
            _store("engine_w", f"w{trial['day_of_week']}", trial["weight"], trial["id"], bought)

    # 2) random_null: N_NULL_TRIALS runs, each with its own seed offset when
    #    a base seed was given.
    for idx in range(N_NULL_TRIALS):
        if sample_seed is None:
            null_seed = None
        else:
            null_seed = sample_seed + 100 + idx
        _store("random_null", f"r{idx}", None, None, random_null_tickets(k, seed=null_seed))

    # 3) coverage
    _store("coverage", "-", None, None, coverage_tickets(k, seed=sample_seed))

    return {"ok": True, "draw_no": draw_no}
|
||
|
||
|
||
def track_record() -> Dict[str, Any]:
    """Aggregate cumulative prize counts per strategy over all stored runs.

    Runs are grouped by ``strategy`` only, so every ``engine_w`` weight label
    is summed into one entry.

    Returns:
        ``{"by_strategy": {strategy: {"n_tickets", "1st", "2nd", "3rd",
        "4th", "5th", "draws"}}}`` where ``draws`` is the number of distinct
        draw numbers seen for that strategy.
    """
    db = _db()
    tiers = ("1st", "2nd", "3rd", "4th", "5th")
    totals: Dict[str, Dict[str, int]] = {}
    draws_seen: Dict[str, set] = {}

    for run in db.get_backtest_runs():
        strategy = run["strategy"]
        if strategy not in totals:
            totals[strategy] = {
                "n_tickets": 0, "1st": 0, "2nd": 0, "3rd": 0, "4th": 0, "5th": 0, "draws": 0}
        bucket = totals[strategy]
        prizes = prize_counts(run)
        bucket["n_tickets"] += run["n_tickets"]
        for tier in tiers:
            bucket[tier] += prizes[tier]
        draws_seen.setdefault(strategy, set()).add(run["draw_no"])

    # A draw can have several runs per strategy, so distinct draws are counted.
    for strategy, draw_nos in draws_seen.items():
        totals[strategy]["draws"] = len(draw_nos)
    return {"by_strategy": totals}
|
||
|
||
|
||
def build_review_payload(draw_no: int) -> Dict[str, Any]:
    """Assemble the payload for the Sunday retrospective briefing.

    Args:
        draw_no: Draw being reviewed.

    Returns:
        A dict with ``draw_no``, ``winner_analysis`` (the stored calibration
        for the draw), ``forward`` (one entry per stored run of this draw
        with its prize counts), ``track_record`` (cumulative per-strategy
        totals) and ``calibration_trend`` (the last 12 calibration rows,
        reduced to draw number, total score and percentile).
    """
    db = _db()
    calibration = db.get_winner_calibration(draw_no)
    runs = db.get_backtest_runs(draw_no=draw_no)
    history = db.get_calibration_history(limit=12)

    forward = [
        {
            "strategy": run["strategy"],
            "label": run["weight_label"],
            "prizes": prize_counts(run),
            "best_match": run["best_match"],
            "avg_meta_score": run["avg_meta_score"],
        }
        for run in runs
    ]
    cumulative = track_record()["by_strategy"]
    trend = []
    for entry in history:
        trend.append({
            "draw_no": entry["draw_no"],
            "score_total": entry["score_total"],
            "percentile": entry["percentile"],
        })

    return {
        "draw_no": draw_no,
        "winner_analysis": calibration,  # score_* + percentile
        "forward": forward,
        "track_record": cumulative,
        "calibration_trend": trend,
    }
|
||
|
||
|
||
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
    """Greedy coverage: prefer the least-used numbers so usage stays spread out.

    Each round ranks 1..45 by (times used so far, random tie-breaker) and
    takes the six lowest as a ticket. If that ticket already exists, the
    numbers ranked 7th..12th are tried instead; if that is also a duplicate
    the round is skipped. At most ``k * 50`` rounds run, so fewer than ``k``
    tickets may be returned. (Per the original note: wheeling / guarantee
    designs are future work; for now this gives distinct tickets with
    balanced number usage.)

    Args:
        k: Number of distinct tickets wanted.
        seed: Seed for the local ``random.Random``; ``None`` means unseeded.

    Returns:
        Distinct tickets (each sorted ascending) in generation order.
    """
    rng = random.Random(seed)
    use_count = dict.fromkeys(range(1, 46), 0)
    emitted = set()
    tickets = []
    rounds_left = k * 50
    while rounds_left > 0 and len(tickets) < k:
        rounds_left -= 1
        # Decorate-sort: least used first, random order among equal usage.
        keyed = [(use_count[number], rng.random(), number) for number in range(1, 46)]
        keyed.sort()
        order = [number for _, _, number in keyed]
        combo = tuple(sorted(order[:6]))
        if combo in emitted:
            # Tie shake-up from the slice disjoint from the top six. Drawing
            # 6 out of 6 always yields the same set; the call still advances
            # the RNG, which keeps seeded output unchanged.
            combo = tuple(sorted(rng.sample(order[6:12], 6)))
            if combo in emitted:
                continue
        emitted.add(combo)
        tickets.append(list(combo))
        for number in combo:
            use_count[number] += 1
    return tickets
|