"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비).""" import logging import random from typing import Any, Dict, List, Optional, Tuple from .analyzer import build_analysis_cache, build_number_weights, score_combination from .utils import weighted_sample_6 def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]: """티켓 묶음을 당첨번호로 채점 → 매칭 히스토그램 + 보너스 + best_match. 2등 판정: 5일치 AND 보너스 번호를 티켓이 포함.""" win = set(winning6) hist = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0} best = 0 for t in tickets: c = len(set(t) & win) if c > best: best = c if c == 6: hist["m6"] += 1 elif c == 5: hist["m5"] += 1 if bonus in t: hist["bonus_hits"] += 1 elif c == 4: hist["m4"] += 1 elif c == 3: hist["m3"] += 1 return {**hist, "best_match": best} def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]: """매칭 히스토그램 → 등수 카운트. 1등=m6, 2등=bonus_hits, 3등=m5−bonus_hits, 4등=m4, 5등=m3.""" return { "1st": hist.get("m6", 0), "2nd": hist.get("bonus_hits", 0), "3rd": hist.get("m5", 0) - hist.get("bonus_hits", 0), "4th": hist.get("m4", 0), "5th": hist.get("m3", 0), } def generate_pool(cache, number_weights, n: int = 20000, seed: Optional[int] = None) -> List[List[int]]: """가중 샘플링으로 distinct 후보 풀 생성.""" rng = random.Random(seed) seen, pool = set(), [] attempts, cap = 0, n * 4 while len(pool) < n and attempts < cap: attempts += 1 nums = tuple(sorted(weighted_sample_6(number_weights))) if nums in seen: continue seen.add(nums) pool.append(list(nums)) if len(pool) < n: logging.getLogger(__name__).warning( "generate_pool: requested %d, got %d", n, len(pool) ) return pool def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]: """풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct.""" if k > len(pool): raise ValueError(f"k={k} exceeds pool size {len(pool)}") ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"]) return ranked[:k] def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]: """무작위 distinct 티켓 k장 (null-model 대조군).""" rng = random.Random(seed) seen, out = set(), [] guard = 0 while len(out) < k and guard < k * 200: guard += 1 nums = tuple(sorted(rng.sample(range(1, 46), 6))) if nums in seen: continue seen.add(nums) out.append(list(nums)) return out def point_in_time_draws(draws: List[Tuple[int, List[int]]], target_draw_no: int) -> List[Tuple[int, List[int]]]: """target 회차 추첨 '직전' 시점의 데이터 — target_draw_no 미만만.""" return [(d, nums) for d, nums in draws if d < target_draw_no] def calibrate_winner_compute(draws, target_draw_no, winning6, sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]: """순수 연산: point-in-time 캐시로 당첨조합 채점 + 무작위 M표본 percentile.""" pit = point_in_time_draws(draws, target_draw_no) cache = build_analysis_cache(pit) scores = score_combination(sorted(winning6), cache) win_total = scores["score_total"] samples = random_null_tickets(sample_m, seed=seed) le = sum(1 for t in samples if score_combination(t, cache)["score_total"] <= win_total) percentile = le / max(len(samples), 1) return {"scores": scores, "percentile": percentile, "cache_draws": len(pit)} MIN_HISTORY = 30 # point-in-time 캐시 최소 회차 (이 미만은 캘리브레이션 skip) def _db(): from . import db as _db_mod return _db_mod def calibrate_winner(draw_no: int, sample_m: int = 2000) -> Dict[str, Any]: """DB 진입점: 회차 1개 캘리브레이션 후 저장 (멱등).""" db = _db() draws = db.get_all_draw_numbers() row = db.get_draw(draw_no) if row is None: return {"ok": False, "reason": "no_draw"} pit = point_in_time_draws(draws, draw_no) if len(pit) < MIN_HISTORY: return {"ok": False, "reason": "insufficient_history"} winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]] res = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m) db.save_winner_calibration( draw_no=draw_no, winning=winning6, scores=res["scores"], percentile=res["percentile"], my_pick_avg=None, cache_draws=res["cache_draws"], ) return {"ok": True, "draw_no": draw_no, **res} def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]: """미처리 회차만 batch개 캘리브레이션 (멱등·재개 가능).""" db = _db() draws = db.get_all_draw_numbers() done = db.get_calibrated_draw_nos() todo = [d for d, _ in draws if d not in done and d > MIN_HISTORY] todo.sort() n = 0 for draw_no in todo[:batch]: r = calibrate_winner(draw_no, sample_m=sample_m) if r.get("ok"): n += 1 return {"calibrated": n, "remaining": max(0, len(todo) - batch)} def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]: """greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산. (휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)""" rng = random.Random(seed) usage = {n: 0 for n in range(1, 46)} seen, out = set(), [] guard = 0 while len(out) < k and guard < k * 50: guard += 1 ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random())) nums = tuple(sorted(ranked[:6])) if nums in seen: # 동점 흔들기: top-6과 disjoint한 영역에서 샘플 nums = tuple(sorted(rng.sample(ranked[6:12], 6))) if nums in seen: continue seen.add(nums) out.append(list(nums)) for n in nums: usage[n] += 1 return out