refactor(lotto): Phase 1 코드리뷰 반영 (로컬 RNG·write-once·가드·테스트 보강)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 17:02:16 +09:00
parent 8dbb1abaeb
commit 77efa9b653
4 changed files with 95 additions and 22 deletions

View File

@@ -1,10 +1,10 @@
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
import logging
import random
from typing import Any, Dict, List, Optional, Tuple
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
from .weight_evolver import count_match
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
@@ -45,8 +45,7 @@ def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
def generate_pool(cache, number_weights, n: int = 20000,
seed: Optional[int] = None) -> List[List[int]]:
"""가중 샘플링으로 distinct 후보 풀 생성."""
if seed is not None:
random.seed(seed)
rng = random.Random(seed)
seen, pool = set(), []
attempts, cap = 0, n * 4
while len(pool) < n and attempts < cap:
@@ -56,22 +55,29 @@ def generate_pool(cache, number_weights, n: int = 20000,
continue
seen.add(nums)
pool.append(list(nums))
if len(pool) < n:
logging.getLogger(__name__).warning(
"generate_pool: requested %d, got %d", n, len(pool)
)
return pool
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
"""풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
if k > len(pool):
raise ValueError(f"k={k} exceeds pool size {len(pool)}")
ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
return ranked[:k]
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""무작위 distinct 티켓 k장 (null-model 대조군)."""
if seed is not None:
random.seed(seed)
rng = random.Random(seed)
seen, out = set(), []
while len(out) < k:
nums = tuple(sorted(random.sample(range(1, 46), 6)))
guard = 0
while len(out) < k and guard < k * 200:
guard += 1
nums = tuple(sorted(rng.sample(range(1, 46), 6)))
if nums in seen:
continue
seen.add(nums)
@@ -82,18 +88,17 @@ def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
(휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
if seed is not None:
random.seed(seed)
rng = random.Random(seed)
usage = {n: 0 for n in range(1, 46)}
seen, out = set(), []
guard = 0
while len(out) < k and guard < k * 50:
guard += 1
ranked = sorted(range(1, 46), key=lambda n: (usage[n], random.random()))
ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
nums = tuple(sorted(ranked[:6]))
if nums in seen:
# 동점 흔들기: 약간 더 깊은 풀에서 샘플
nums = tuple(sorted(random.sample(ranked[:12], 6)))
# 동점 흔들기: top-6과 disjoint한 영역에서 샘플
nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
if nums in seen:
continue
seen.add(nums)

View File

@@ -1501,10 +1501,10 @@ def save_backtest_run(draw_no, strategy, weight_label, weight_json, trial_id,
weight_json=excluded.weight_json, trial_id=excluded.trial_id,
n_tickets=excluded.n_tickets, m3=excluded.m3, m4=excluded.m4,
m5=excluded.m5, m6=excluded.m6, bonus_hits=excluded.bonus_hits,
best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score,
created_at=datetime('now')
best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score
""",
(draw_no, strategy, weight_label,
# weight_json must be a dict/list (not a pre-serialized string) to avoid double-encoding
json.dumps(weight_json) if weight_json is not None else None,
trial_id, n_tickets,
hist.get("m3",0), hist.get("m4",0), hist.get("m5",0), hist.get("m6",0),
@@ -1536,8 +1536,7 @@ def save_winner_calibration(draw_no, winning, scores, percentile,
score_frequency=excluded.score_frequency, score_fingerprint=excluded.score_fingerprint,
score_gap=excluded.score_gap, score_cooccur=excluded.score_cooccur,
score_diversity=excluded.score_diversity, percentile=excluded.percentile,
my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws,
created_at=datetime('now')
my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws
""",
(draw_no, json.dumps(winning), scores["score_total"], scores["score_frequency"],
scores["score_fingerprint"], scores["score_gap"], scores["score_cooccur"],
@@ -1557,7 +1556,7 @@ def get_calibration_history(limit: int = 52) -> List[Dict[str, Any]]:
(limit,)).fetchall()
return [dict(r) for r in rows]
def get_calibrated_draw_nos() -> set:
def get_calibrated_draw_nos() -> set[int]:
with _conn() as conn:
return {r["draw_no"] for r in
conn.execute("SELECT draw_no FROM winner_calibration").fetchall()}

View File

@@ -1,5 +1,5 @@
from app import backtest as bt
from app.analyzer import build_analysis_cache, score_combination
from app.analyzer import build_analysis_cache, build_number_weights, score_combination
def _toy_draws(n=120):
@@ -38,16 +38,17 @@ def test_grade_tickets_histogram_and_prizes():
def test_purchase_tickets_distinct_and_count():
draws = _toy_draws()
cache = bt.build_analysis_cache(draws)
nw = bt.build_number_weights(cache)
cache = build_analysis_cache(draws)
nw = build_number_weights(cache)
pool = bt.generate_pool(cache, nw, n=2000, seed=7)
W = [0.25, 0.30, 0.20, 0.15, 0.10]
bought = bt.purchase_tickets(pool, cache, W, k=50)
assert len(bought) == 50
assert len({tuple(t) for t in bought}) == 50 # distinct
# W로 랭킹된 상위 → 평균 분석치가 풀 평균보다 높아
# W로 랭킹된 상위 k → 평균 점수가 풀 전체 평균 이상이어
avg_bought = sum(score_combination(t, cache, W)["score_total"] for t in bought) / 50
assert avg_bought > 0
avg_pool = sum(score_combination(t, cache, W)["score_total"] for t in pool) / len(pool)
assert avg_bought >= avg_pool
def test_random_null_and_coverage_distinct():
@@ -57,3 +58,24 @@ def test_random_null_and_coverage_distinct():
flat = {n for t in cov for n in t}
assert len(cov) == 9 and len({tuple(t) for t in cov}) == 9
assert len(flat) >= 40 # 커버리지 전략은 번호를 넓게 퍼뜨림
def test_generate_pool_partial_fill(monkeypatch):
"""weighted_sample_6이 항상 같은 조합만 반환하도록 패치 → cap에 먼저 걸려 len < n — 예외 없이 반환."""
import random as _r
_r.seed(42)
tiny_draws = [(i, sorted(_r.sample(range(1, 46), 6))) for i in range(1, 10)]
cache = build_analysis_cache(tiny_draws)
nw = build_number_weights(cache)
# weighted_sample_6을 항상 동일한 하나의 조합만 반환하도록 패치
# → 두 번째 시도부터 seen에 막혀 n개를 채울 수 없고 cap=n*4 이후 종료
import app.backtest as _bt_mod
monkeypatch.setattr(_bt_mod, "weighted_sample_6", lambda _w: [1, 2, 3, 4, 5, 6])
n = 50
pool = bt.generate_pool(cache, nw, n=n, seed=0)
# 예외 없이 반환해야 하고, 결과는 n 미만이어야 하며 모두 distinct
assert isinstance(pool, list)
assert len(pool) < n
assert len({tuple(t) for t in pool}) == len(pool)

View File

@@ -29,3 +29,50 @@ def test_backtest_runs_unique(monkeypatch):
rows = db.get_backtest_runs(draw_no=100)
assert len(rows) == 1
assert rows[0]["m3"] == 2 # 마지막 값으로 갱신
_SCORES = {
"score_total": 1.23,
"score_frequency": 0.30,
"score_fingerprint": 0.25,
"score_gap": 0.20,
"score_cooccur": 0.28,
"score_diversity": 0.20,
}
def test_winner_calibration_upsert(monkeypatch):
"""save_winner_calibration 두 번 호출 시 upsert — 행 1개, 값은 마지막 것."""
db = _fresh_db(monkeypatch)
winning = [3, 7, 15, 22, 33, 41]
db.save_winner_calibration(draw_no=200, winning=winning,
scores=_SCORES, percentile=75.0,
my_pick_avg=0.9, cache_draws=100)
# 두 번째 저장 — percentile, my_pick_avg 업데이트
scores2 = {**_SCORES, "score_total": 2.00}
db.save_winner_calibration(draw_no=200, winning=winning,
scores=scores2, percentile=80.0,
my_pick_avg=1.1, cache_draws=110)
row = db.get_winner_calibration(200)
assert row is not None
# 행이 1개만 존재하는지 확인
with db._conn() as conn:
cnt = conn.execute(
"SELECT COUNT(*) AS c FROM winner_calibration WHERE draw_no=200"
).fetchone()["c"]
assert cnt == 1
assert row["percentile"] == 80.0
assert row["score_total"] == 2.00
def test_get_calibrated_draw_nos(monkeypatch):
"""저장된 draw_no 집합이 get_calibrated_draw_nos에 포함되어야 한다."""
db = _fresh_db(monkeypatch)
winning = [1, 2, 3, 4, 5, 6]
for draw_no in (301, 302, 303):
db.save_winner_calibration(draw_no=draw_no, winning=winning,
scores=_SCORES, percentile=50.0,
my_pick_avg=0.5, cache_draws=50)
nos = db.get_calibrated_draw_nos()
assert isinstance(nos, set)
assert {301, 302, 303}.issubset(nos)