diff --git a/lotto/app/backtest.py b/lotto/app/backtest.py
index 2f0305e..132ebe5 100644
--- a/lotto/app/backtest.py
+++ b/lotto/app/backtest.py
@@ -1,10 +1,10 @@
 """로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
+import logging
 import random
 from typing import Any, Dict, List, Optional, Tuple
 
 from .analyzer import build_analysis_cache, build_number_weights, score_combination
 from .utils import weighted_sample_6
-from .weight_evolver import count_match
 
 
 def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
@@ -45,6 +45,8 @@ def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
 def generate_pool(cache, number_weights, n: int = 20000,
                   seed: Optional[int] = None) -> List[List[int]]:
     """가중 샘플링으로 distinct 후보 풀 생성."""
+    # Keep seeding the module-level RNG: weighted_sample_6 is called without an rng argument,
+    # so a local random.Random(seed) would go unused (assumes it draws from `random`; confirm).
     if seed is not None:
         random.seed(seed)
     seen, pool = set(), []
@@ -56,22 +58,29 @@ def generate_pool(cache, number_weights, n: int = 20000,
             continue
         seen.add(nums)
         pool.append(list(nums))
+    if len(pool) < n:
+        logging.getLogger(__name__).warning(
+            "generate_pool: requested %d, got %d", n, len(pool)
+        )
     return pool
 
 
 def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
     """풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
+    if k > len(pool):
+        raise ValueError(f"k={k} exceeds pool size {len(pool)}")
     ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
     return ranked[:k]
 
 
 def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
     """무작위 distinct 티켓 k장 (null-model 대조군)."""
-    if seed is not None:
-        random.seed(seed)
+    rng = random.Random(seed)
     seen, out = set(), []
-    while len(out) < k:
-        nums = tuple(sorted(random.sample(range(1, 46), 6)))
+    guard = 0
+    while len(out) < k and guard < k * 200:
+        guard += 1
+        nums = tuple(sorted(rng.sample(range(1, 46), 6)))
         if nums in seen:
             continue
         seen.add(nums)
@@ -82,18 +91,17 @@ def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
 def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
     """greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
     (휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
-    if seed is not None:
-        random.seed(seed)
+    rng = random.Random(seed)
     usage = {n: 0 for n in range(1, 46)}
     seen, out = set(), []
     guard = 0
     while len(out) < k and guard < k * 50:
         guard += 1
-        ranked = sorted(range(1, 46), key=lambda n: (usage[n], random.random()))
+        ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
         nums = tuple(sorted(ranked[:6]))
         if nums in seen:
-            # 동점 흔들기: 약간 더 깊은 풀에서 샘플
-            nums = tuple(sorted(random.sample(ranked[:12], 6)))
+            # Tie-break shake-up: fall back to ranks 7-12 (disjoint from the top 6); sampling 6 of 6 takes the whole slice
+            nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
             if nums in seen:
                 continue
         seen.add(nums)
diff --git a/lotto/app/db.py b/lotto/app/db.py
index bc58cf2..a85a53e 100644
--- a/lotto/app/db.py
+++ b/lotto/app/db.py
@@ -1501,10 +1501,10 @@ def save_backtest_run(draw_no, strategy, weight_label, weight_json, trial_id,
               weight_json=excluded.weight_json, trial_id=excluded.trial_id,
               n_tickets=excluded.n_tickets, m3=excluded.m3, m4=excluded.m4,
               m5=excluded.m5, m6=excluded.m6, bonus_hits=excluded.bonus_hits,
-              best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score,
-              created_at=datetime('now')
+              best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score
         """,
         (draw_no, strategy, weight_label,
+         # weight_json must be a dict/list (not a pre-serialized string) to avoid double-encoding
          json.dumps(weight_json) if weight_json is not None else None,
          trial_id, n_tickets,
          hist.get("m3",0), hist.get("m4",0), hist.get("m5",0), hist.get("m6",0),
@@ -1536,8 +1536,7 @@ def save_winner_calibration(draw_no, winning, scores, percentile,
               score_frequency=excluded.score_frequency, score_fingerprint=excluded.score_fingerprint,
               score_gap=excluded.score_gap, score_cooccur=excluded.score_cooccur,
               score_diversity=excluded.score_diversity, percentile=excluded.percentile,
-              my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws,
-              created_at=datetime('now')
+              my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws
         """,
         (draw_no, json.dumps(winning), scores["score_total"], scores["score_frequency"],
          scores["score_fingerprint"], scores["score_gap"], scores["score_cooccur"],
@@ -1557,7 +1556,7 @@ def get_calibration_history(limit: int = 52) -> List[Dict[str, Any]]:
                             (limit,)).fetchall()
         return [dict(r) for r in rows]
 
-def get_calibrated_draw_nos() -> set:
+def get_calibrated_draw_nos() -> set[int]:
     with _conn() as conn:
         return {r["draw_no"] for r in conn.execute("SELECT draw_no FROM winner_calibration").fetchall()}
 
diff --git a/lotto/tests/test_backtest.py b/lotto/tests/test_backtest.py
index 5fb19f6..9eee498 100644
--- a/lotto/tests/test_backtest.py
+++ b/lotto/tests/test_backtest.py
@@ -1,5 +1,5 @@
 from app import backtest as bt
-from app.analyzer import build_analysis_cache, score_combination
+from app.analyzer import build_analysis_cache, build_number_weights, score_combination
 
 
 def _toy_draws(n=120):
@@ -38,16 +38,17 @@ def test_grade_tickets_histogram_and_prizes():
 
 def test_purchase_tickets_distinct_and_count():
     draws = _toy_draws()
-    cache = bt.build_analysis_cache(draws)
-    nw = bt.build_number_weights(cache)
+    cache = build_analysis_cache(draws)
+    nw = build_number_weights(cache)
     pool = bt.generate_pool(cache, nw, n=2000, seed=7)
     W = [0.25, 0.30, 0.20, 0.15, 0.10]
     bought = bt.purchase_tickets(pool, cache, W, k=50)
     assert len(bought) == 50
     assert len({tuple(t) for t in bought}) == 50  # distinct
-    # W로 랭킹된 상위 → 평균 분석치가 풀 평균보다 높아야
+    # Top-k ranked by W → their mean score must be at least the whole-pool mean
     avg_bought = sum(score_combination(t, cache, W)["score_total"] for t in bought) / 50
-    assert avg_bought > 0
+    avg_pool = sum(score_combination(t, cache, W)["score_total"] for t in pool) / len(pool)
+    assert avg_bought >= avg_pool
 
 
 def test_random_null_and_coverage_distinct():
@@ -57,3 +58,24 @@ def test_random_null_and_coverage_distinct():
     flat = {n for t in cov for n in t}
     assert len(cov) == 9 and len({tuple(t) for t in cov}) == 9
     assert len(flat) >= 40  # 커버리지 전략은 번호를 넓게 퍼뜨림
+
+
+def test_generate_pool_partial_fill(monkeypatch):
+    """Patch weighted_sample_6 to always return the same combination → the attempt cap is hit first, so len < n and the pool is returned without raising."""
+    import random as _r
+    _r.seed(42)
+    tiny_draws = [(i, sorted(_r.sample(range(1, 46), 6))) for i in range(1, 10)]
+    cache = build_analysis_cache(tiny_draws)
+    nw = build_number_weights(cache)
+
+    # Patch weighted_sample_6 so it always returns one identical combination
+    # → from the second attempt on `seen` rejects it, so n is never reached and the loop stops at cap=n*4
+    import app.backtest as _bt_mod
+    monkeypatch.setattr(_bt_mod, "weighted_sample_6", lambda _w: [1, 2, 3, 4, 5, 6])
+
+    n = 50
+    pool = bt.generate_pool(cache, nw, n=n, seed=0)
+    # Must return without raising, with fewer than n entries, all distinct
+    assert isinstance(pool, list)
+    assert len(pool) < n
+    assert len({tuple(t) for t in pool}) == len(pool)
diff --git a/lotto/tests/test_backtest_db.py b/lotto/tests/test_backtest_db.py
index 5f47975..00123b7 100644
--- a/lotto/tests/test_backtest_db.py
+++ b/lotto/tests/test_backtest_db.py
@@ -29,3 +29,50 @@ def test_backtest_runs_unique(monkeypatch):
     rows = db.get_backtest_runs(draw_no=100)
     assert len(rows) == 1
     assert rows[0]["m3"] == 2  # 마지막 값으로 갱신
+
+
+_SCORES = {
+    "score_total": 1.23,
+    "score_frequency": 0.30,
+    "score_fingerprint": 0.25,
+    "score_gap": 0.20,
+    "score_cooccur": 0.28,
+    "score_diversity": 0.20,
+}
+
+
+def test_winner_calibration_upsert(monkeypatch):
+    """Calling save_winner_calibration twice upserts — one row, holding the last values."""
+    db = _fresh_db(monkeypatch)
+    winning = [3, 7, 15, 22, 33, 41]
+    db.save_winner_calibration(draw_no=200, winning=winning,
+                               scores=_SCORES, percentile=75.0,
+                               my_pick_avg=0.9, cache_draws=100)
+    # Second save — updates percentile and my_pick_avg
+    scores2 = {**_SCORES, "score_total": 2.00}
+    db.save_winner_calibration(draw_no=200, winning=winning,
+                               scores=scores2, percentile=80.0,
+                               my_pick_avg=1.1, cache_draws=110)
+    row = db.get_winner_calibration(200)
+    assert row is not None
+    # Verify that exactly one row exists
+    with db._conn() as conn:
+        cnt = conn.execute(
+            "SELECT COUNT(*) AS c FROM winner_calibration WHERE draw_no=200"
+        ).fetchone()["c"]
+    assert cnt == 1
+    assert row["percentile"] == 80.0
+    assert row["score_total"] == 2.00
+
+
+def test_get_calibrated_draw_nos(monkeypatch):
+    """Every saved draw_no must appear in the set returned by get_calibrated_draw_nos."""
+    db = _fresh_db(monkeypatch)
+    winning = [1, 2, 3, 4, 5, 6]
+    for draw_no in (301, 302, 303):
+        db.save_winner_calibration(draw_no=draw_no, winning=winning,
+                                   scores=_SCORES, percentile=50.0,
+                                   my_pick_avg=0.5, cache_draws=50)
+    nos = db.get_calibrated_draw_nos()
+    assert isinstance(nos, set)
+    assert {301, 302, 303}.issubset(nos)