222 lines
8.5 KiB
Python
222 lines
8.5 KiB
Python
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
|
||
import logging
|
||
import random
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .analyzer import build_analysis_cache, build_number_weights, score_combination
|
||
from .utils import weighted_sample_6
|
||
|
||
|
||
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
|
||
"""티켓 묶음을 당첨번호로 채점 → 매칭 히스토그램 + 보너스 + best_match.
|
||
2등 판정: 5일치 AND 보너스 번호를 티켓이 포함."""
|
||
win = set(winning6)
|
||
hist = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
|
||
best = 0
|
||
for t in tickets:
|
||
c = len(set(t) & win)
|
||
if c > best:
|
||
best = c
|
||
if c == 6:
|
||
hist["m6"] += 1
|
||
elif c == 5:
|
||
hist["m5"] += 1
|
||
if bonus in t:
|
||
hist["bonus_hits"] += 1
|
||
elif c == 4:
|
||
hist["m4"] += 1
|
||
elif c == 3:
|
||
hist["m3"] += 1
|
||
return {**hist, "best_match": best}
|
||
|
||
|
||
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
|
||
"""매칭 히스토그램 → 등수 카운트.
|
||
1등=m6, 2등=bonus_hits, 3등=m5−bonus_hits, 4등=m4, 5등=m3."""
|
||
return {
|
||
"1st": hist.get("m6", 0),
|
||
"2nd": hist.get("bonus_hits", 0),
|
||
"3rd": hist.get("m5", 0) - hist.get("bonus_hits", 0),
|
||
"4th": hist.get("m4", 0),
|
||
"5th": hist.get("m3", 0),
|
||
}
|
||
|
||
|
||
def generate_pool(cache, number_weights, n: int = 20000,
|
||
seed: Optional[int] = None) -> List[List[int]]:
|
||
"""가중 샘플링으로 distinct 후보 풀 생성."""
|
||
rng = random.Random(seed)
|
||
seen, pool = set(), []
|
||
attempts, cap = 0, n * 4
|
||
while len(pool) < n and attempts < cap:
|
||
attempts += 1
|
||
nums = tuple(sorted(weighted_sample_6(number_weights)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
pool.append(list(nums))
|
||
if len(pool) < n:
|
||
logging.getLogger(__name__).warning(
|
||
"generate_pool: requested %d, got %d", n, len(pool)
|
||
)
|
||
return pool
|
||
|
||
|
||
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
|
||
"""풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
|
||
if k > len(pool):
|
||
raise ValueError(f"k={k} exceeds pool size {len(pool)}")
|
||
ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
|
||
return ranked[:k]
|
||
|
||
|
||
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
|
||
"""무작위 distinct 티켓 k장 (null-model 대조군)."""
|
||
rng = random.Random(seed)
|
||
seen, out = set(), []
|
||
guard = 0
|
||
while len(out) < k and guard < k * 200:
|
||
guard += 1
|
||
nums = tuple(sorted(rng.sample(range(1, 46), 6)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
out.append(list(nums))
|
||
return out
|
||
|
||
|
||
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
|
||
target_draw_no: int) -> List[Tuple[int, List[int]]]:
|
||
"""target 회차 추첨 '직전' 시점의 데이터 — target_draw_no 미만만."""
|
||
return [(d, nums) for d, nums in draws if d < target_draw_no]
|
||
|
||
|
||
def calibrate_winner_compute(draws, target_draw_no, winning6,
|
||
sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
|
||
"""순수 연산: point-in-time 캐시로 당첨조합 채점 + 무작위 M표본 percentile."""
|
||
pit = point_in_time_draws(draws, target_draw_no)
|
||
cache = build_analysis_cache(pit)
|
||
scores = score_combination(sorted(winning6), cache)
|
||
win_total = scores["score_total"]
|
||
samples = random_null_tickets(sample_m, seed=seed)
|
||
le = sum(1 for t in samples
|
||
if score_combination(t, cache)["score_total"] <= win_total)
|
||
percentile = le / max(len(samples), 1)
|
||
return {"scores": scores, "percentile": percentile, "cache_draws": len(pit)}
|
||
|
||
|
||
MIN_HISTORY = 30 # point-in-time 캐시 최소 회차 (이 미만은 캘리브레이션 skip)
|
||
|
||
|
||
def _db():
|
||
from . import db as _db_mod
|
||
return _db_mod
|
||
|
||
|
||
def calibrate_winner(draw_no: int, sample_m: int = 2000) -> Dict[str, Any]:
|
||
"""DB 진입점: 회차 1개 캘리브레이션 후 저장 (멱등)."""
|
||
db = _db()
|
||
draws = db.get_all_draw_numbers()
|
||
row = db.get_draw(draw_no)
|
||
if row is None:
|
||
return {"ok": False, "reason": "no_draw"}
|
||
pit = point_in_time_draws(draws, draw_no)
|
||
if len(pit) < MIN_HISTORY:
|
||
return {"ok": False, "reason": "insufficient_history"}
|
||
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
|
||
res = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m)
|
||
db.save_winner_calibration(
|
||
draw_no=draw_no, winning=winning6, scores=res["scores"],
|
||
percentile=res["percentile"], my_pick_avg=None,
|
||
cache_draws=res["cache_draws"],
|
||
)
|
||
return {"ok": True, "draw_no": draw_no, **res}
|
||
|
||
|
||
def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]:
|
||
"""미처리 회차만 batch개 캘리브레이션 (멱등·재개 가능)."""
|
||
db = _db()
|
||
draws = db.get_all_draw_numbers()
|
||
done = db.get_calibrated_draw_nos()
|
||
todo = [d for d, _ in draws if d not in done and d > MIN_HISTORY]
|
||
todo.sort()
|
||
n = 0
|
||
for draw_no in todo[:batch]:
|
||
r = calibrate_winner(draw_no, sample_m=sample_m)
|
||
if r.get("ok"):
|
||
n += 1
|
||
return {"calibrated": n, "remaining": max(0, len(todo) - batch)}
|
||
|
||
|
||
def run_forward_purchase(draw_no: int, k: int = 5000, pool_n: int = 20000,
|
||
sample_seed: Optional[int] = None) -> Dict[str, Any]:
|
||
"""회차 추첨 '직전' 시점 데이터로 3전략 구매 → 당첨번호로 채점 → 저장(멱등).
|
||
engine_w: 그 주 weight_trials 6개(없으면 current_base 1개)로 각각 구매."""
|
||
db = _db()
|
||
draws = db.get_all_draw_numbers()
|
||
row = db.get_draw(draw_no)
|
||
if row is None:
|
||
return {"ok": False, "reason": "no_draw"}
|
||
pit = point_in_time_draws(draws, draw_no)
|
||
if len(pit) < MIN_HISTORY:
|
||
return {"ok": False, "reason": "insufficient_history"}
|
||
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
|
||
bonus = row["bonus"]
|
||
|
||
cache = build_analysis_cache(pit)
|
||
nw = build_number_weights(cache)
|
||
pool = generate_pool(cache, nw, n=pool_n, seed=sample_seed)
|
||
|
||
def _store(strategy, label, weight_json, trial_id, tickets):
|
||
graded = grade_tickets(tickets, winning6, bonus)
|
||
avg_meta = (sum(score_combination(t, cache)["score_total"] for t in tickets)
|
||
/ max(len(tickets), 1))
|
||
db.save_backtest_run(
|
||
draw_no=draw_no, strategy=strategy, weight_label=label,
|
||
weight_json=weight_json, trial_id=trial_id, n_tickets=len(tickets),
|
||
hist=graded, best_match=graded["best_match"], avg_meta_score=avg_meta,
|
||
)
|
||
|
||
# 1) engine_w — 그 주 trials(있으면) 아니면 current_base
|
||
from . import weight_evolver as we
|
||
week_start = we.get_week_start()
|
||
trials = db.get_weekly_trials(week_start) if hasattr(db, "get_weekly_trials") else []
|
||
if trials:
|
||
for t in trials:
|
||
bought = purchase_tickets(pool, cache, t["weight"], k)
|
||
_store("engine_w", f"w{t['day_of_week']}", t["weight"], t["id"], bought)
|
||
else:
|
||
base = db.get_current_base() or [0.2] * 5
|
||
bought = purchase_tickets(pool, cache, base, k)
|
||
_store("engine_w", "base", base, None, bought)
|
||
|
||
# 2) random_null
|
||
_store("random_null", "-", None, None, random_null_tickets(k, seed=sample_seed))
|
||
# 3) coverage
|
||
_store("coverage", "-", None, None, coverage_tickets(k, seed=sample_seed))
|
||
|
||
return {"ok": True, "draw_no": draw_no}
|
||
|
||
|
||
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
|
||
"""greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
|
||
(휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
|
||
rng = random.Random(seed)
|
||
usage = {n: 0 for n in range(1, 46)}
|
||
seen, out = set(), []
|
||
guard = 0
|
||
while len(out) < k and guard < k * 50:
|
||
guard += 1
|
||
ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
|
||
nums = tuple(sorted(ranked[:6]))
|
||
if nums in seen:
|
||
# 동점 흔들기: top-6과 disjoint한 영역에서 샘플
|
||
nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
out.append(list(nums))
|
||
for n in nums:
|
||
usage[n] += 1
|
||
return out
|