129 lines
4.8 KiB
Python
129 lines
4.8 KiB
Python
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
|
||
import logging
|
||
import random
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .analyzer import build_analysis_cache, build_number_weights, score_combination
|
||
from .utils import weighted_sample_6
|
||
|
||
|
||
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
|
||
"""티켓 묶음을 당첨번호로 채점 → 매칭 히스토그램 + 보너스 + best_match.
|
||
2등 판정: 5일치 AND 보너스 번호를 티켓이 포함."""
|
||
win = set(winning6)
|
||
hist = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
|
||
best = 0
|
||
for t in tickets:
|
||
c = len(set(t) & win)
|
||
if c > best:
|
||
best = c
|
||
if c == 6:
|
||
hist["m6"] += 1
|
||
elif c == 5:
|
||
hist["m5"] += 1
|
||
if bonus in t:
|
||
hist["bonus_hits"] += 1
|
||
elif c == 4:
|
||
hist["m4"] += 1
|
||
elif c == 3:
|
||
hist["m3"] += 1
|
||
return {**hist, "best_match": best}
|
||
|
||
|
||
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
|
||
"""매칭 히스토그램 → 등수 카운트.
|
||
1등=m6, 2등=bonus_hits, 3등=m5−bonus_hits, 4등=m4, 5등=m3."""
|
||
return {
|
||
"1st": hist.get("m6", 0),
|
||
"2nd": hist.get("bonus_hits", 0),
|
||
"3rd": hist.get("m5", 0) - hist.get("bonus_hits", 0),
|
||
"4th": hist.get("m4", 0),
|
||
"5th": hist.get("m3", 0),
|
||
}
|
||
|
||
|
||
def generate_pool(cache, number_weights, n: int = 20000,
|
||
seed: Optional[int] = None) -> List[List[int]]:
|
||
"""가중 샘플링으로 distinct 후보 풀 생성."""
|
||
rng = random.Random(seed)
|
||
seen, pool = set(), []
|
||
attempts, cap = 0, n * 4
|
||
while len(pool) < n and attempts < cap:
|
||
attempts += 1
|
||
nums = tuple(sorted(weighted_sample_6(number_weights)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
pool.append(list(nums))
|
||
if len(pool) < n:
|
||
logging.getLogger(__name__).warning(
|
||
"generate_pool: requested %d, got %d", n, len(pool)
|
||
)
|
||
return pool
|
||
|
||
|
||
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
|
||
"""풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
|
||
if k > len(pool):
|
||
raise ValueError(f"k={k} exceeds pool size {len(pool)}")
|
||
ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
|
||
return ranked[:k]
|
||
|
||
|
||
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
|
||
"""무작위 distinct 티켓 k장 (null-model 대조군)."""
|
||
rng = random.Random(seed)
|
||
seen, out = set(), []
|
||
guard = 0
|
||
while len(out) < k and guard < k * 200:
|
||
guard += 1
|
||
nums = tuple(sorted(rng.sample(range(1, 46), 6)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
out.append(list(nums))
|
||
return out
|
||
|
||
|
||
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
|
||
target_draw_no: int) -> List[Tuple[int, List[int]]]:
|
||
"""target 회차 추첨 '직전' 시점의 데이터 — target_draw_no 미만만."""
|
||
return [(d, nums) for d, nums in draws if d < target_draw_no]
|
||
|
||
|
||
def calibrate_winner_compute(draws, target_draw_no, winning6,
|
||
sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
|
||
"""순수 연산: point-in-time 캐시로 당첨조합 채점 + 무작위 M표본 percentile."""
|
||
pit = point_in_time_draws(draws, target_draw_no)
|
||
cache = build_analysis_cache(pit)
|
||
scores = score_combination(sorted(winning6), cache)
|
||
win_total = scores["score_total"]
|
||
samples = random_null_tickets(sample_m, seed=seed)
|
||
le = sum(1 for t in samples
|
||
if score_combination(t, cache)["score_total"] <= win_total)
|
||
percentile = le / max(len(samples), 1)
|
||
return {"scores": scores, "percentile": percentile, "cache_draws": len(pit)}
|
||
|
||
|
||
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
|
||
"""greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
|
||
(휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
|
||
rng = random.Random(seed)
|
||
usage = {n: 0 for n in range(1, 46)}
|
||
seen, out = set(), []
|
||
guard = 0
|
||
while len(out) < k and guard < k * 50:
|
||
guard += 1
|
||
ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
|
||
nums = tuple(sorted(ranked[:6]))
|
||
if nums in seen:
|
||
# 동점 흔들기: top-6과 disjoint한 영역에서 샘플
|
||
nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
|
||
if nums in seen:
|
||
continue
|
||
seen.add(nums)
|
||
out.append(list(nums))
|
||
for n in nums:
|
||
usage[n] += 1
|
||
return out
|