merge: 로또 자가학습 백테스트 & 캘리브레이션 (Phase 1-5)

forward 가상구매(6 engine_w + 6 random_null + coverage) + winner 캘리브레이션
+ evolver lift 학습신호(best-vs-best, ε게이팅) + 일요 회고 텔레그램.
null-model 베이스라인으로 무작위 대비 우위를 정직하게 측정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-31 21:04:40 +09:00
16 changed files with 2719 additions and 0 deletions

View File

@@ -22,6 +22,8 @@ class LottoAgent(BaseAgent):
return await self.run_signal_check(source=source)
if action == "daily_digest":
return await self.run_daily_digest()
if action == "sunday_review":
return await self.run_sunday_review()
return {"ok": False, "message": f"unknown action: {action}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
@@ -155,6 +157,29 @@ class LottoAgent(BaseAgent):
add_log("lotto", f"daily_digest 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_sunday_review(self) -> dict:
"""일 09:00 — 최신 회차 forward+calibration 보장 후 회고 텔레그램."""
from ..service_proxy import lotto_latest_draw, lotto_backtest_review
from ..notifiers.telegram_lotto import send_sunday_review
from ..db import create_task, update_task_status, add_log
task_id = create_task("lotto", "sunday_review", {})
try:
draw_no = await lotto_latest_draw()
if not draw_no:
update_task_status(task_id, "failed", result_data={"reason": "no_draw"})
return {"ok": False, "message": "no latest draw"}
# forward는 lotto cron이 이미 돌렸을 수 있으나 멱등이라 안전 — review만 호출
payload = await lotto_backtest_review(draw_no)
await send_sunday_review(payload)
update_task_status(task_id, "succeeded", result_data={"draw_no": draw_no})
add_log("lotto", f"sunday_review 발송: #{draw_no}", task_id=task_id)
return {"ok": True, "draw_no": draw_no}
except Exception as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"sunday_review 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_weekly_evolution_report(self) -> dict:
"""토 22:15 — lotto-lab evaluate-now 트리거 후 텔레그램 리포트. task_id wrap."""
from ..service_proxy import lotto_evolver_evaluate, lotto_evolver_status

View File

@@ -225,3 +225,42 @@ async def send_evolution_report(eval_result: Dict[str, Any], current_base: List[
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] evolution report send failed: {e}")
# ---------- 일요 회고 브리핑 ----------
def format_sunday_review(payload: Dict[str, Any]) -> str:
"""일요 회고 브리핑 텍스트 (HTML parse_mode)."""
wa = payload.get("winner_analysis") or {}
draw_no = payload.get("draw_no") or "?"
pct = wa.get("percentile")
pct_txt = f"{pct*100:.0f}%" if pct is not None else ""
lines = [f"🔍 <b>로또 #{draw_no} 일요 회고</b>", ""]
if wa:
lines.append(f"이번 당첨조합 분석치: <b>{wa.get('score_total',0):.2f}</b> "
f"(무작위 분포 상위 {pct_txt})")
lines.append(f" 빈도 {wa.get('score_frequency',0):.2f} · 지문 {wa.get('score_fingerprint',0):.2f} "
f"· 갭 {wa.get('score_gap',0):.2f} · 공동출현 {wa.get('score_cooccur',0):.2f} "
f"· 다양성 {wa.get('score_diversity',0):.2f}")
lines.append("")
if payload.get("forward"):
lines.append("📊 <b>이번 회차 가상구매 성적</b>")
for f in payload.get("forward", []):
p = f.get("prizes") or {}
name = {"engine_w": f"엔진({f.get('label','')})", "random_null": "무작위", "coverage": "커버리지"}.get(
f.get("strategy", ""), f.get("strategy", "?"))
lines.append(f" {name}: 최고 {f.get('best_match','?')}일치 / "
f"4등 {p.get('4th', 0)} · 5등 {p.get('5th', 0)}")
else:
lines.append("📊 <b>이번 회차 가상구매 성적</b>: 데이터 없음 (아직 집계 전)")
lines.append("")
lines.append(" 무작위 대비 우위가 통계적으로 의미있을 때만 가중치가 진화합니다.")
return "\n".join(lines)
async def send_sunday_review(payload: Dict[str, Any]) -> None:
text = format_sunday_review(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] sunday review send failed: {e}")

View File

@@ -68,6 +68,11 @@ async def _run_lotto_sync_evolver_activity():
if agent:
await agent.sync_evolver_activity()
async def _run_lotto_sunday_review():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_sunday_review()
async def _run_youtube_research():
agent = AGENT_REGISTRY.get("youtube")
if agent:
@@ -116,6 +121,7 @@ def init_scheduler():
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
scheduler.add_job(_run_lotto_weekly_evolution_report, "cron", day_of_week="sat", hour=22, minute=15, id="lotto_evolution_weekly")
scheduler.add_job(_run_lotto_sunday_review, "cron", day_of_week="sun", hour=9, minute=0, id="lotto_sunday_review")
scheduler.add_job(
_run_lotto_sync_evolver_activity,
"cron", hour=9, minute=30,

View File

@@ -399,6 +399,14 @@ async def lotto_evolver_evaluate() -> Dict[str, Any]:
return resp.json()
async def lotto_backtest_review(draw_no: int) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/backtest/review/{draw_no}")
resp.raise_for_status()
return resp.json()
from .config import AGENT_CONTAINER_MAP

View File

@@ -0,0 +1,38 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers import telegram_lotto as tl
def test_format_sunday_review_text():
payload = {
"draw_no": 1170,
"winner_analysis": {"score_total": 0.41, "percentile": 0.33,
"score_frequency": 0.4, "score_fingerprint": 0.5, "score_gap": 0.3,
"score_cooccur": 0.45, "score_diversity": 0.6},
"forward": [
{"strategy": "engine_w", "label": "w1", "prizes": {"1st":0,"2nd":0,"3rd":0,"4th":1,"5th":12}, "best_match": 4, "avg_meta_score": 0.55},
{"strategy": "random_null", "label": "-", "prizes": {"1st":0,"2nd":0,"3rd":0,"4th":0,"5th":10}, "best_match": 3, "avg_meta_score": 0.33},
],
"track_record": {},
"calibration_trend": [{"draw_no":1170,"score_total":0.41,"percentile":0.33}],
}
txt = tl.format_sunday_review(payload)
assert "1170" in txt
assert "%" in txt # percentile 표기
assert "engine" in txt.lower() or "엔진" in txt
def test_format_sunday_review_no_calibration():
payload = {"draw_no": 1171, "winner_analysis": None, "forward": []}
txt = tl.format_sunday_review(payload)
assert "1171" in txt
assert "%" not in txt # no percentile section when calibration absent
assert "데이터 없음" in txt
def test_format_sunday_review_missing_prizes_no_crash():
payload = {"draw_no": 1171, "winner_analysis": None,
"forward": [{"strategy": "engine_w", "label": "w1", "best_match": 3}]} # no 'prizes'
txt = tl.format_sunday_review(payload) # must NOT raise
assert "1171" in txt

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,191 @@
# 로또 자가학습 백테스트 & 캘리브레이션 — 설계 Spec
- **작성일**: 2026-05-31
- **상태**: 설계 승인 (구현 plan 대기)
- **대상 서비스**: `lotto` (lotto-lab) + `agent-office` (LottoAgent) + `web-ui` (/lotto 자율학습 탭)
- **사이클**: 스마트 에이전트 고도화 3종(로또/주식/인스타) 중 **1번 로또**. 주식·인스타는 후속 사이클.
---
## 1. 배경 & 목표
사용자(CEO)는 로또 에이전트를 "분석 번호를 계속 가상구매해 시도횟수를 늘리고, 실제 당첨조합을 역분석해 스스로 학습·디벨롭하며 일요일에 회고 브리핑하는 스마트 에이전트"로 고도화하길 원한다. 명시 목표는 "로또 1등".
### ⚠️ 정직성 전제 (설계의 토대)
로또는 매 회차 균등·독립 추첨이다. C(45,6)=8,145,060 조합이 전부 동일 확률이며 회차 간 독립이다. 따라서:
- **과거 데이터(빈도·갭·공동출현)의 미래 예측력은 수학적으로 0.** 통계 분석으로 1등 확률을 올릴 수 없다.
- 고정 예산 N장으로 1등 확률을 최대화하는 유일한 방법은 **서로 다른(distinct) 조합 N개**를 사는 것이다.
이 사실을 부정하지 않고 **시스템에 내장**한다. 본 프로젝트의 가치는 "예측"이 아니라:
1. **정직한 측정** — "내 분석 엔진이 무작위를 이기는가?"를 null-model 대조군으로 매번 엄밀히 검정.
2. **자가학습 엔진 인프라** — 측정→학습→회고 루프 자체의 엔지니어링.
3. **커버리지 최적화** — 1등이 목표라면 distinct 조합 커버리지 최대화가 수학적 최적.
→ 사용자 결정(2026-05-31): **"정직한 측정 + 커버리지 최적"** 프레이밍 채택. 패턴 학습은 계속하되 모든 백테스트에 null-model 베이스라인을 내장한다.
### 기존 자산 (100% 재활용, 신규 ML 없음)
- `analyzer.build_analysis_cache(draws)` / `score_combination(numbers, cache, weights)` — 임의 조합의 5개 sub-score + 종합점수(0~1) = **"분석치"**.
- `analyzer.build_number_weights` + `utils.weighted_sample_6` — 가중 후보 생성.
- `generator.run_simulation` — 20k 후보를 `score_combination(·, active_weights)`로 랭킹→best_picks. **W가 선택을 바꾸는 경로가 이미 존재.**
- `weight_evolver` — 토 22:00 주간 6 가중치 후보 채점→base 갱신.
### 발견된 잠재 결함 (본 작업으로 수정)
`weight_evolver.apply_today_and_pick``recommend_numbers(draws)`(W 미사용)로 픽을 뽑은 뒤 W로 점수만 매긴다. 즉 **현재 daily 픽은 W와 무관**하고, evolver가 평가하는 매칭 결과도 W-독립이라 가중치 진화가 픽 품질에 연결돼 있지 않다. → forward 가상구매를 **시뮬레이션 선택 경로(풀 생성→W 랭킹→상위 K 구매)**로 구현하면 W가 결과를 실제로 바꿔 가중치 학습이 비로소 의미를 갖고 이 결함도 해소된다.
---
## 2. 핵심 개념 — Self-Learning Backtest Loop
세 축으로 구성:
### 축 A — Forward 가상구매 (매주, 회차당 수천 장)
매 회차 추첨 후, 각 전략별로 대량 후보를 생성·랭킹해 상위 K장을 "구매"로 간주 → 실제 당첨번호로 채점 → **회차별 집계 1행만 영구 저장**. 개별 티켓 미저장.
- 전략: `engine_w`(6개 trial 가중치 각각) / `random_null`(무작위 대조군) / `coverage`(distinct 최대화).
- 이 매칭 결과가 evolver의 학습 신호가 된다.
### 축 B — Winner 캘리브레이션 (역대 전체 백필 + 매주 증분)
각 회차의 **실제 당첨조합을 그 시점 이전 데이터로 만든 캐시(point-in-time)에 넣어** 5개 분석치 + 종합점수 + percentile을 기록.
- percentile = 당첨조합 score_total이 그 시점 무작위 M개 표본 분포에서 차지하는 위치.
- "내 엔진이 실제 당첨번호에 높은 점수를 주는가?"의 가장 정직한 신호. 당첨조합이 일관되게 낮은 percentile이면 엔진은 헛다리.
### 축 C — 일요일 회고 브리핑
토 추첨(20:45)→동기화(21:10)→기존 evolver 리포트(토 22:15) 이후, **일 09:00**에 차분히 회고. 이번 회차 forward 성적 + 당첨조합 역분석 + 내 추천과 비교 + 캘리브레이션 추세 + 가중치 진화를 텔레그램 1통 + UI.
---
## 3. 데이터 모델 (lotto.db 신규)
집계 전용 — row 수 ≈ 회차 × 전략 (수천 규모, 무시 가능).
### `backtest_runs` — forward 가상구매 집계
```
id INTEGER PK
draw_no INTEGER NOT NULL -- 채점 대상(당첨 확정된) 회차
strategy TEXT NOT NULL -- 'engine_w' | 'random_null' | 'coverage'
weight_label TEXT NOT NULL -- engine_w는 trial day_of_week('w0'..'w5'), 그 외 '-'
weight_json TEXT -- 사용한 W (random/coverage는 NULL)
trial_id INTEGER -- FK weight_trials (engine_w만, nullable)
n_tickets INTEGER NOT NULL -- 구매(채점) 장수
m3 INTEGER NOT NULL DEFAULT 0 -- 3개 일치 장수
m4 INTEGER NOT NULL DEFAULT 0
m5 INTEGER NOT NULL DEFAULT 0
m6 INTEGER NOT NULL DEFAULT 0
bonus_hits INTEGER NOT NULL DEFAULT 0 -- 5+보너스(2등) 장수
best_match INTEGER NOT NULL DEFAULT 0
avg_meta_score REAL -- 구매 티켓 평균 분석치
created_at TEXT NOT NULL
UNIQUE(draw_no, strategy, weight_label) -- 멱등
```
- 등수 매핑: 1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3.
### `winner_calibration` — 회차별 당첨조합 역분석
```
draw_no INTEGER PK -- 멱등
winning_json TEXT NOT NULL -- [n1..n6] (보너스 별도 보관 안 함)
score_total REAL NOT NULL
score_frequency REAL NOT NULL
score_fingerprint REAL NOT NULL
score_gap REAL NOT NULL
score_cooccur REAL NOT NULL
score_diversity REAL NOT NULL
percentile REAL -- 0~1, 무작위 M표본 대비 당첨조합 점수 위치
my_pick_avg REAL -- 그 회차 engine 추천 평균 분석치(있으면)
cache_draws INTEGER NOT NULL -- point-in-time 캐시에 쓰인 회차 수
created_at TEXT NOT NULL
```
> 누적 성적표(track record)는 `backtest_runs` SUM 집계로 on-the-fly 계산 — 별도 테이블 불필요.
---
## 4. 컴포넌트
### 4.1 lotto-lab `app/backtest.py` (순수 연산 — FastAPI 의존성 0, Windows 이전 대비)
- `generate_pool(cache, number_weights, n) -> list[tuple]``weighted_sample_6` 반복으로 distinct 후보 풀.
- `purchase_tickets(pool, cache, W, k) -> list[dict]` — 풀을 `score_combination(·, W)`로 랭킹→상위 k장 distinct.
- `coverage_select(pool, k) -> list` — distinct 보장 상위 커버리지(초기엔 단순 distinct, 휠링은 향후).
- `grade_tickets(tickets, winning6, bonus) -> dict` — 매칭 히스토그램 + 등수 카운트 + best_match + avg_meta. `bonus`는 draws 레코드에서 가져옴(2등=5일치+보너스 판정용).
- `run_forward_purchase(draw_no, k=5000, pool_n=20000) -> dict` — engine(6 W)+random_null+coverage 각각 **전략당 k=5000장(수천 장)** 구매·채점·`backtest_runs` 저장(멱등). 풀 pool_n=20000에서 랭킹.
- `calibrate_winner(draw_no, sample_m=2000) -> dict``draws[:idx]`(대상 회차 제외) 캐시로 당첨조합 채점 + 무작위 sample_m 표본 percentile → `winner_calibration` 저장(멱등).
- `backfill_calibration(batch=50) -> dict` — 미처리 회차만 청크 처리, 재개 가능.
- `build_review_payload(draw_no) -> dict` — 회고 브리핑용 조립(당첨조합 분해 + 내 추천 비교 + forward 성적 + 캘리브레이션 추세 + 진화 결과).
### 4.2 lotto-lab `app/routers/backtest.py`
| 메서드 | 경로 | 설명 |
|--------|------|------|
| GET | `/api/lotto/backtest/track-record` | 누적 성적표(전략별 등수 카운트, engine vs random) |
| GET | `/api/lotto/backtest/calibration?weeks=N` | 캘리브레이션 이력 + 추세 |
| GET | `/api/lotto/backtest/review/{draw_no}` | 회고 payload |
| POST | `/api/lotto/backtest/run-forward?draw_no=` | forward 수동 트리거 |
| POST | `/api/lotto/backtest/backfill` | 캘리브레이션 백필(백그라운드) |
### 4.3 weight_evolver 업그레이드
- `evaluate_weekly`: 학습 신호를 N=5(W-무관)에서 **forward 가상구매(engine_w 6전략) + null-model 대비 lift**로 승격.
- lift = engine_w 등수 점수 random_null 등수 점수(동일 회차).
- 승자 = lift 최대 trial. **모든 W의 lift가 노이즈 범위(±ε) 내면 base `unchanged`** → 노이즈 과적합 방지.
- `decide_base_update` 규칙은 유지하되 입력(winner)을 backtest 기반으로 교체.
- 기존 `auto_picks` 경로는 하위호환·일일 활동표시용으로 유지(evolver 결정에는 미사용).
---
## 5. 플로우
1. **캘리브레이션 백필 (1회)**: `POST /backtest/backfill` → 백그라운드 청크(50회차/배치, 멱등 재개). 이후 회차마다 증분.
2. **주간 forward**: 당첨번호 동기화 직후 `run_forward_purchase(latest)`. 참고: 6 W × 20k 풀은 기존 시뮬이 **하루 6회** 돌리는 부하보다 가벼움 → NAS 부담 작음.
3. **일 09:00 회고 (agent-office 신규 cron)**: `LottoAgent.run_sunday_review()` → forward+calibration 보장 → `GET /backtest/review/{latest}` → 텔레그램 1통.
4. **evolver (토 22:00, 기존 cron)**: backtest 집계를 학습 신호로 소비.
### Windows 이전 경로 (NAS 부하 측정 후 필요시)
`backtest.py`가 순수 함수라, lotto-lab은 system-of-record 유지 + 무거운 연산만 Windows WSL docker 워커에 위임(`/api/internal/lotto/*` webhook, 기존 music/video/image 워커 패턴 재활용) + agent 폴링. 코드 경계가 깨끗해 마이그레이션 비용 최소. **초기 구현은 NAS-first**, 측정 후 결정.
---
## 6. 출력
### 6.1 텔레그램 (일 09:00, `notifiers/telegram_lotto.py` 신규 섹션)
이번 당첨조합 5분석치 분해 + 내 추천 평균과 비교 + 이번주 forward 성적(등수 카운트, **무작위 대비 lift**) + 캘리브레이션 percentile 추세 + 가중치 진화 결과.
### 6.2 web-ui `/lotto` "자율 학습" 탭 확장 (`.lotto-evolver-*` 다크 네임스페이스 재활용)
- **TrackRecordCard**: 누적 "매주 전략당 5,000장 샀다면" 등수 — engine vs random_null 나란히 + 총지출 대비 당첨금(정직하게 적자 표시).
- **CalibrationChart**: 당첨조합 score_total 추세 + 내 추천 평균 오버레이 + percentile 밴드 → "우위 없음"을 시각화.
- **WinnerAnalysisCard**: 이번 회차 당첨조합 5분석치 레이더 + 내 추천 비교.
---
## 7. 에러·성능·멱등
- **멱등성**: `winner_calibration` UNIQUE(draw_no), `backtest_runs` UNIQUE(draw_no,strategy,weight_label) → 재실행 skip.
- **NAS 성능**: 주간 forward는 기존 시뮬보다 가벼움. 백필만 1회 무거움(≈1100 point-in-time 캐시 재구성) → 청크+백그라운드+멱등 재개. 야간/유휴 트리거 권장.
- **텔레그램 실패**: 로그만 남기고 job은 성공 처리(기존 패턴). 회고 데이터는 이미 DB에 있어 UI는 영향 없음.
## 8. 테스트 전략
- 등수 매핑(m3~m6/bonus → 1~5등) 단위 테스트.
- null-model 기대값 + lift 계산.
- percentile 계산 정확성.
- **point-in-time 캐시가 대상 회차를 제외하는지** (calibrate_winner 정직성 핵심).
- 멱등 백필(재실행 시 중복 row 없음, 중단 후 재개).
- evolver의 lift-over-random 승자 선택 + ε-게이팅(노이즈 시 unchanged).
- 기존 `count_match`/`calc_pick_score` 테스트 유지.
## 9. 리스크 & 완화
| 리스크 | 완화 |
|--------|------|
| 무작위성 → 실제 우위 없음 | null-model 정직 프레이밍, 우위 없음을 데이터로 보고하는 게 목표 |
| Celeron 백필 부하 | 청크+1회성+멱등 재개, 필요시 Windows 이전 |
| evolver 노이즈 추종 | lift-over-random + ε-게이팅으로 unchanged 처리 |
| DB 증가 | 집계 전용, row 수 무시 가능 |
| forward 풀 중복으로 커버리지 손실 | distinct 강제 + coverage 전략 별도 측정 |
## 10. 결정 로그 (2026-05-31 brainstorming)
1. 3종 중 **로또 먼저**, 주식·인스타는 후속 사이클.
2. 회고 브리핑 = **토 추첨 직후 일 09:00**.
3. 시도 규모 = **수천 장/회차 + 집계만 저장**.
4. 자율성 = **가중치 자동튜닝 강화**(산식 구조 고정).
5. 백테스트 범위 = **캘리브레이션 전체 백필 + 가상구매 forward**.
6. 출력 = **텔레그램 + 기존 자율학습 탭 확장**.
7. 프레이밍 = **정직한 측정(null-model) + 커버리지 최적**.
8. 연산 위치 = **NAS-first, 필요시 Windows WSL 이전**.
## 11. 스코프 밖 / 향후
- 주식 에이전트(보유종목 집중 분석+차트 매수/매도 시그널), 인스타 에이전트(자율 카드 발급) — 별도 사이클.
- 휠링/커버링 디자인(하위 등수 최소 보장) — coverage 전략 고도화로 향후.
- Windows WSL 워커 분리 — NAS 부하 측정 후.

272
lotto/app/backtest.py Normal file
View File

@@ -0,0 +1,272 @@
"""로또 자가학습 백테스트 — 순수 연산 (FastAPI 의존성 0, Windows 이전 대비)."""
import logging
import random
from typing import Any, Dict, List, Optional, Tuple
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
# engine_w trials 수와 동일하게 맞춰 selection bias를 상쇄한다.
N_NULL_TRIALS = 6
def grade_tickets(tickets: List[List[int]], winning6: List[int], bonus: int) -> Dict[str, Any]:
"""티켓 묶음을 당첨번호로 채점 → 매칭 히스토그램 + 보너스 + best_match.
2등 판정: 5일치 AND 보너스 번호를 티켓이 포함."""
win = set(winning6)
hist = {"m3": 0, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0}
best = 0
for t in tickets:
c = len(set(t) & win)
if c > best:
best = c
if c == 6:
hist["m6"] += 1
elif c == 5:
hist["m5"] += 1
if bonus in t:
hist["bonus_hits"] += 1
elif c == 4:
hist["m4"] += 1
elif c == 3:
hist["m3"] += 1
return {**hist, "best_match": best}
def prize_counts(hist: Dict[str, Any]) -> Dict[str, int]:
"""매칭 히스토그램 → 등수 카운트.
1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3."""
return {
"1st": hist.get("m6", 0),
"2nd": hist.get("bonus_hits", 0),
"3rd": hist.get("m5", 0) - hist.get("bonus_hits", 0),
"4th": hist.get("m4", 0),
"5th": hist.get("m3", 0),
}
def generate_pool(cache, number_weights, n: int = 20000,
seed: Optional[int] = None) -> List[List[int]]:
"""가중 샘플링으로 distinct 후보 풀 생성."""
rng = random.Random(seed)
seen, pool = set(), []
attempts, cap = 0, n * 4
while len(pool) < n and attempts < cap:
attempts += 1
nums = tuple(sorted(weighted_sample_6(number_weights)))
if nums in seen:
continue
seen.add(nums)
pool.append(list(nums))
if len(pool) < n:
logging.getLogger(__name__).warning(
"generate_pool: requested %d, got %d", n, len(pool)
)
return pool
def purchase_tickets(pool, cache, W: List[float], k: int) -> List[List[int]]:
"""풀을 score_combination(·, W)로 랭킹 → 상위 k장 distinct."""
if k > len(pool):
raise ValueError(f"k={k} exceeds pool size {len(pool)}")
ranked = sorted(pool, key=lambda t: -score_combination(t, cache, W)["score_total"])
return ranked[:k]
def random_null_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""무작위 distinct 티켓 k장 (null-model 대조군)."""
rng = random.Random(seed)
seen, out = set(), []
guard = 0
while len(out) < k and guard < k * 200:
guard += 1
nums = tuple(sorted(rng.sample(range(1, 46), 6)))
if nums in seen:
continue
seen.add(nums)
out.append(list(nums))
return out
def point_in_time_draws(draws: List[Tuple[int, List[int]]],
target_draw_no: int) -> List[Tuple[int, List[int]]]:
"""target 회차 추첨 '직전' 시점의 데이터 — target_draw_no 미만만."""
return [(d, nums) for d, nums in draws if d < target_draw_no]
def calibrate_winner_compute(draws, target_draw_no, winning6,
sample_m: int = 2000, seed: Optional[int] = None) -> Dict[str, Any]:
"""순수 연산: point-in-time 캐시로 당첨조합 채점 + 무작위 M표본 percentile."""
pit = point_in_time_draws(draws, target_draw_no)
cache = build_analysis_cache(pit)
scores = score_combination(sorted(winning6), cache)
win_total = scores["score_total"]
samples = random_null_tickets(sample_m, seed=seed)
le = sum(1 for t in samples
if score_combination(t, cache)["score_total"] <= win_total)
percentile = le / max(len(samples), 1)
return {"scores": scores, "percentile": percentile, "cache_draws": len(pit)}
MIN_HISTORY = 30 # point-in-time 캐시 최소 회차 (이 미만은 캘리브레이션 skip)
def _db():
from . import db as _db_mod
return _db_mod
def calibrate_winner(draw_no: int, sample_m: int = 2000, draws=None) -> Dict[str, Any]:
"""DB 진입점: 회차 1개 캘리브레이션 후 저장 (멱등).
draws를 외부에서 전달하면 N+1 조회를 방지한다."""
db = _db()
if draws is None:
draws = db.get_all_draw_numbers()
row = db.get_draw(draw_no)
if row is None:
return {"ok": False, "reason": "no_draw"}
pit = point_in_time_draws(draws, draw_no)
if len(pit) < MIN_HISTORY:
return {"ok": False, "reason": "insufficient_history"}
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
res = calibrate_winner_compute(draws, draw_no, winning6, sample_m=sample_m)
db.save_winner_calibration(
draw_no=draw_no, winning=winning6, scores=res["scores"],
percentile=res["percentile"], my_pick_avg=None,
cache_draws=res["cache_draws"],
)
return {"ok": True, "draw_no": draw_no, **res}
def backfill_calibration(batch: int = 50, sample_m: int = 2000) -> Dict[str, Any]:
"""미처리 회차만 batch개 캘리브레이션 (멱등·재개 가능)."""
db = _db()
draws = db.get_all_draw_numbers()
done = db.get_calibrated_draw_nos()
todo = [d for d, _ in draws if d not in done and d > MIN_HISTORY]
todo.sort()
n = 0
for draw_no in todo[:batch]:
r = calibrate_winner(draw_no, sample_m=sample_m, draws=draws)
if r.get("ok"):
n += 1
return {"calibrated": n, "remaining": max(0, len(todo) - batch)}
def run_forward_purchase(draw_no: int, k: int = 5000, pool_n: int = 20000,
sample_seed: Optional[int] = None) -> Dict[str, Any]:
"""회차 추첨 '직전' 시점 데이터로 3전략 구매 → 당첨번호로 채점 → 저장(멱등).
engine_w: 그 주 weight_trials 6개(없으면 current_base 1개)로 각각 구매."""
db = _db()
draws = db.get_all_draw_numbers()
row = db.get_draw(draw_no)
if row is None:
return {"ok": False, "reason": "no_draw"}
pit = point_in_time_draws(draws, draw_no)
if len(pit) < MIN_HISTORY:
return {"ok": False, "reason": "insufficient_history"}
winning6 = [row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]
bonus = row["bonus"]
cache = build_analysis_cache(pit)
nw = build_number_weights(cache)
pool = generate_pool(cache, nw, n=pool_n, seed=sample_seed)
def _store(strategy, label, weight_json, trial_id, tickets):
graded = grade_tickets(tickets, winning6, bonus)
avg_meta = (sum(score_combination(t, cache)["score_total"] for t in tickets)
/ max(len(tickets), 1))
db.save_backtest_run(
draw_no=draw_no, strategy=strategy, weight_label=label,
weight_json=weight_json, trial_id=trial_id, n_tickets=len(tickets),
hist=graded, best_match=graded["best_match"], avg_meta_score=avg_meta,
)
# 1) engine_w — 그 주 trials(있으면) 아니면 uniform fallback (leak-free)
from datetime import date as _date
from . import weight_evolver as we
draw_date = _date.fromisoformat(row["drw_date"])
week_start = we.get_week_start(draw_date)
trials = db.get_weekly_trials(week_start)
if trials:
for t in trials:
bought = purchase_tickets(pool, cache, t["weight"], k)
_store("engine_w", f"w{t['day_of_week']}", t["weight"], t["id"], bought)
else:
base = [0.2] * 5
bought = purchase_tickets(pool, cache, base, k)
_store("engine_w", "base", base, None, bought)
# 2) random_null — N_NULL_TRIALS 개 (engine_w 수와 동일해 selection bias 상쇄)
for _i in range(N_NULL_TRIALS):
seed_i = None if sample_seed is None else sample_seed + 100 + _i
_store("random_null", f"r{_i}", None, None, random_null_tickets(k, seed=seed_i))
# 3) coverage
_store("coverage", "-", None, None, coverage_tickets(k, seed=sample_seed))
return {"ok": True, "draw_no": draw_no}
def track_record() -> Dict[str, Any]:
"""전략별 누적 등수 집계 (engine_w는 라벨 합산)."""
db = _db()
rows = db.get_backtest_runs()
agg: Dict[str, Dict[str, int]] = {}
draw_sets: Dict[str, set] = {}
for r in rows:
a = agg.setdefault(r["strategy"], {
"n_tickets": 0, "1st": 0, "2nd": 0, "3rd": 0, "4th": 0, "5th": 0, "draws": 0})
p = prize_counts(r)
a["n_tickets"] += r["n_tickets"]
for tier in ("1st", "2nd", "3rd", "4th", "5th"):
a[tier] += p[tier]
draw_sets.setdefault(r["strategy"], set()).add(r["draw_no"])
for strat, s in draw_sets.items():
agg[strat]["draws"] = len(s)
return {"by_strategy": agg}
def build_review_payload(draw_no: int) -> Dict[str, Any]:
"""일요 회고 브리핑용 조립."""
db = _db()
cal = db.get_winner_calibration(draw_no)
runs = db.get_backtest_runs(draw_no=draw_no)
hist = db.get_calibration_history(limit=12)
forward = []
for r in runs:
forward.append({"strategy": r["strategy"], "label": r["weight_label"],
"prizes": prize_counts(r), "best_match": r["best_match"],
"avg_meta_score": r["avg_meta_score"]})
return {
"draw_no": draw_no,
"winner_analysis": cal, # score_* + percentile
"forward": forward,
"track_record": track_record()["by_strategy"],
"calibration_trend": [
{"draw_no": h["draw_no"], "score_total": h["score_total"],
"percentile": h["percentile"]} for h in hist
],
}
def coverage_tickets(k: int, seed: Optional[int] = None) -> List[List[int]]:
"""greedy 커버리지 — 아직 덜 쓰인 번호를 우선 배치해 번호를 넓게 분산.
(휠링/보장설계는 향후. 현재는 distinct + 번호 사용 균등화)"""
rng = random.Random(seed)
usage = {n: 0 for n in range(1, 46)}
seen, out = set(), []
guard = 0
while len(out) < k and guard < k * 50:
guard += 1
ranked = sorted(range(1, 46), key=lambda n: (usage[n], rng.random()))
nums = tuple(sorted(ranked[:6]))
if nums in seen:
# 동점 흔들기: top-6과 disjoint한 영역에서 샘플
nums = tuple(sorted(rng.sample(ranked[6:12], 6)))
if nums in seen:
continue
seen.add(nums)
out.append(list(nums))
for n in nums:
usage[n] += 1
return out

View File

@@ -125,6 +125,48 @@ def init_db() -> None:
"ON simulation_candidates(is_best, score_total DESC);"
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS backtest_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draw_no INTEGER NOT NULL,
strategy TEXT NOT NULL,
weight_label TEXT NOT NULL DEFAULT '-',
weight_json TEXT,
trial_id INTEGER,
n_tickets INTEGER NOT NULL,
m3 INTEGER NOT NULL DEFAULT 0,
m4 INTEGER NOT NULL DEFAULT 0,
m5 INTEGER NOT NULL DEFAULT 0,
m6 INTEGER NOT NULL DEFAULT 0,
bonus_hits INTEGER NOT NULL DEFAULT 0,
best_match INTEGER NOT NULL DEFAULT 0,
avg_meta_score REAL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_backtest_run "
"ON backtest_runs(draw_no, strategy, weight_label);")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS winner_calibration (
draw_no INTEGER PRIMARY KEY,
winning_json TEXT NOT NULL,
score_total REAL NOT NULL,
score_frequency REAL NOT NULL,
score_fingerprint REAL NOT NULL,
score_gap REAL NOT NULL,
score_cooccur REAL NOT NULL,
score_diversity REAL NOT NULL,
percentile REAL,
my_pick_avg REAL,
cache_draws INTEGER NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS best_picks (
@@ -1443,3 +1485,79 @@ def get_base_history(limit: int = 12) -> List[Dict[str, Any]]:
out.append(d)
return out
# ── backtest_runs / winner_calibration CRUD ───────────────────────────────────
def save_backtest_run(draw_no, strategy, weight_label, weight_json, trial_id,
n_tickets, hist, best_match, avg_meta_score) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO backtest_runs
(draw_no, strategy, weight_label, weight_json, trial_id, n_tickets,
m3, m4, m5, m6, bonus_hits, best_match, avg_meta_score)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(draw_no, strategy, weight_label) DO UPDATE SET
weight_json=excluded.weight_json, trial_id=excluded.trial_id,
n_tickets=excluded.n_tickets, m3=excluded.m3, m4=excluded.m4,
m5=excluded.m5, m6=excluded.m6, bonus_hits=excluded.bonus_hits,
best_match=excluded.best_match, avg_meta_score=excluded.avg_meta_score
""",
(draw_no, strategy, weight_label,
# weight_json must be a dict/list (not a pre-serialized string) to avoid double-encoding
json.dumps(weight_json) if weight_json is not None else None,
trial_id, n_tickets,
hist.get("m3",0), hist.get("m4",0), hist.get("m5",0), hist.get("m6",0),
hist.get("bonus_hits",0), best_match, avg_meta_score),
)
def get_backtest_runs(draw_no=None, strategy=None) -> List[Dict[str, Any]]:
q = "SELECT * FROM backtest_runs WHERE 1=1"
args = []
if draw_no is not None:
q += " AND draw_no=?"; args.append(draw_no)
if strategy is not None:
q += " AND strategy=?"; args.append(strategy)
q += " ORDER BY draw_no DESC, strategy, weight_label"
with _conn() as conn:
return [dict(r) for r in conn.execute(q, args).fetchall()]
def save_winner_calibration(draw_no, winning, scores, percentile,
my_pick_avg, cache_draws) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO winner_calibration
(draw_no, winning_json, score_total, score_frequency, score_fingerprint,
score_gap, score_cooccur, score_diversity, percentile, my_pick_avg, cache_draws)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(draw_no) DO UPDATE SET
winning_json=excluded.winning_json, score_total=excluded.score_total,
score_frequency=excluded.score_frequency, score_fingerprint=excluded.score_fingerprint,
score_gap=excluded.score_gap, score_cooccur=excluded.score_cooccur,
score_diversity=excluded.score_diversity, percentile=excluded.percentile,
my_pick_avg=excluded.my_pick_avg, cache_draws=excluded.cache_draws
""",
(draw_no, json.dumps(winning), scores["score_total"], scores["score_frequency"],
scores["score_fingerprint"], scores["score_gap"], scores["score_cooccur"],
scores["score_diversity"], percentile, my_pick_avg, cache_draws),
)
def get_winner_calibration(draw_no: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM winner_calibration WHERE draw_no=?",
(draw_no,)).fetchone()
return dict(r) if r else None
def get_calibration_history(limit: int = 52) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM winner_calibration ORDER BY draw_no DESC LIMIT ?",
(limit,)).fetchall()
return [dict(r) for r in rows]
def get_calibrated_draw_nos() -> set[int]:
with _conn() as conn:
return {r["draw_no"] for r in
conn.execute("SELECT draw_no FROM winner_calibration").fetchall()}

View File

@@ -47,13 +47,16 @@ from .weight_evolver import (
from .routers import curator as curator_router
from .routers import briefing as briefing_router
from .routers import review as review_router
from .routers import backtest as backtest_router
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
from . import backtest
app = FastAPI()
install_access_log(app)
app.include_router(curator_router.router)
app.include_router(briefing_router.router)
app.include_router(review_router.router)
app.include_router(backtest_router.router)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
@@ -82,6 +85,12 @@ def on_startup():
if res["was_new"]:
check_results_for_draw(res["drawNo"])
_refresh_perf_cache() # 새 채점 결과 반영 → 즉시 갱신
# 자가학습 백테스트 — 새 회차 forward 구매 + 당첨조합 캘리브레이션
try:
backtest.run_forward_purchase(draw_no=res["drawNo"])
backtest.calibrate_winner(res["drawNo"])
except Exception as e:
logger.warning(f"backtest 갱신 실패: {e}")
scheduler.add_job(_sync_and_check, "cron", hour="9,21", minute=10)

View File

@@ -0,0 +1,39 @@
from fastapi import APIRouter, BackgroundTasks, Query
from .. import backtest, db
router = APIRouter(prefix="/api/lotto/backtest", tags=["backtest"])
@router.get("/track-record")
def track_record():
return backtest.track_record()
@router.get("/calibration")
def calibration(weeks: int = Query(52, ge=1, le=520)):
return {"history": db.get_calibration_history(limit=weeks)}
@router.get("/review/{draw_no}")
def review(draw_no: int):
if db.get_draw(draw_no) is None:
from fastapi import HTTPException
raise HTTPException(404, f"no draw {draw_no}")
return backtest.build_review_payload(draw_no)
@router.post("/run-forward")
def run_forward(
background_tasks: BackgroundTasks,
draw_no: int = Query(...),
k: int = Query(5000, ge=1, le=5000),
pool_n: int = Query(20000, ge=1000, le=20000),
):
background_tasks.add_task(backtest.run_forward_purchase, draw_no, k, pool_n)
return {"ok": True, "queued": True, "draw_no": draw_no}
@router.post("/backfill")
def backfill(background_tasks: BackgroundTasks, batch: int = 50, sample_m: int = 2000):
background_tasks.add_task(backtest.backfill_calibration, batch, sample_m)
return {"ok": True, "message": f"backfill 시작 (batch={batch})"}

View File

@@ -4,6 +4,7 @@
순수 함수 (clamp/perturb/Dirichlet/score/base-rule) + DB 진입점은 별도 섹션.
"""
from __future__ import annotations
import json
import math
import random
from datetime import datetime, timedelta, timezone
@@ -18,6 +19,34 @@ DEFAULT_UNIFORM = [0.2] * N_METRICS # cold start
RANK_BY_CORRECT = {6: 1, 5: 3, 4: 4, 3: 5}
RANK_BONUS = {1: 1.0, 2: 0.8, 3: 0.6, 4: 0.3, 5: 0.1}
LIFT_EPSILON = 10.0 # best-of-engine vs best-of-random margin;
# selection bias already cancelled by equal group sizes (N_NULL_TRIALS == engine trial count);
# tune as needed.
PRIZE_WEIGHTS = {"m6": 1000.0, "bonus_hits": 50.0, "m5": 30.0, "m4": 4.0, "m3": 1.0}
def select_winner_by_lift(per_w: List[Dict[str, Any]], random_score: float,
epsilon: float = LIFT_EPSILON) -> Dict[str, Any]:
"""engine_w 후보들 중 random 대비 lift 최대 선택.
최대 lift가 epsilon 미만이면 gated=True (노이즈 → base 유지 권고)."""
scored = [{**w, "lift": w["prize_score"] - random_score} for w in per_w]
best = max(scored, key=lambda w: w["lift"])
return {**best, "gated": best["lift"] < epsilon}
def prize_score_from_hist(hist: Dict[str, int]) -> float:
"""매칭 히스토그램 → 등수 가중 합산 점수.
1등=m6, 2등=bonus_hits, 3등=m5bonus_hits, 4등=m4, 5등=m3.
m3/m4/m5/m6/bonus_hits 키만 읽으며 나머지는 무시하므로
DB 전체 행(backtest_runs row)을 그대로 넘겨도 안전하다."""
third = max(0, hist.get("m5", 0) - hist.get("bonus_hits", 0))
return (hist.get("m6", 0) * PRIZE_WEIGHTS["m6"]
+ hist.get("bonus_hits", 0) * PRIZE_WEIGHTS["bonus_hits"]
+ third * PRIZE_WEIGHTS["m5"]
+ hist.get("m4", 0) * PRIZE_WEIGHTS["m4"]
+ hist.get("m3", 0) * PRIZE_WEIGHTS["m3"])
def clamp_and_normalize(W: List[float], min_w: float = MIN_WEIGHT) -> List[float]:
"""각 값 ≥ min_w + 합=1.0. 보장 안 되면 raise."""
@@ -269,6 +298,47 @@ def evaluate_weekly() -> Dict[str, Any]:
winner = max(per_day, key=lambda d: d["avg_score"])
# 자가학습 강화: backtest forward 등수점수 lift로 winner 재선정.
# best-of-engine vs best-of-random 비교 — 동등 그룹 크기로 selection bias 상쇄.
latest_no = latest["drw_no"]
runs = db.get_backtest_runs(draw_no=latest_no)
engine_runs = [r for r in runs if r["strategy"] == "engine_w"]
null_runs = [r for r in runs if r["strategy"] == "random_null"]
gated = False # 이후 decide_base_update override에 사용
if engine_runs and null_runs:
# base 단독 행이 있고 w* 행도 있으면 base 행 제외 (identity collision 방지)
has_w_trials = any(r["weight_label"].startswith("w") for r in engine_runs)
if has_w_trials:
engine_runs = [r for r in engine_runs if r["weight_label"] != "base"]
# best-of-random: 동등 그룹의 최댓값 (selection bias 상쇄)
random_best = max(prize_score_from_hist(r) for r in null_runs)
per_w = []
for r in engine_runs:
per_w.append({
"trial_id": r["trial_id"],
"weight_label": r["weight_label"],
"weight": json.loads(r["weight_json"]) if r["weight_json"] else DEFAULT_UNIFORM[:],
"prize_score": prize_score_from_hist(r),
"best_match": r["best_match"],
})
lift_winner = select_winner_by_lift(per_w, random_score=random_best)
if not lift_winner["gated"]:
# lift winner의 정체성과 채점값을 일관되게 사용
winner = {
"trial_id": lift_winner["trial_id"],
"weight": lift_winner["weight"],
"max_correct": lift_winner["best_match"], # 이 trial의 실제값
"avg_score": lift_winner["prize_score"], # lift winner의 prize score
"lift": lift_winner["lift"],
}
else:
# 노이즈 → gated 플래그 설정; decide_base_update 이후 명시적으로 override
gated = True
winner = {**winner, "lift": lift_winner["lift"]}
current_base = db.get_current_base()
new_base, reason = decide_base_update(
winner_max_correct=winner["max_correct"],
@@ -276,6 +346,11 @@ def evaluate_weekly() -> Dict[str, Any]:
current_base=current_base,
)
# gated path: decide_base_update 결과와 무관하게 base 유지 강제
if gated:
new_base = list(current_base) if current_base is not None else DEFAULT_UNIFORM[:]
reason = "unchanged_gated"
next_monday = today + timedelta(days=(7 - today.weekday()) % 7 or 7)
next_monday_iso = next_monday.isoformat()

View File

@@ -0,0 +1,100 @@
from app import backtest as bt
from app.analyzer import build_analysis_cache, build_number_weights, score_combination
def _toy_draws(n=120):
# 결정적 가짜 회차: 분석 캐시 구성용 (오름차순 (drw_no, [6 nums]))
import random as _r
_r.seed(1)
out = []
for i in range(1, n + 1):
nums = sorted(_r.sample(range(1, 46), 6))
out.append((i, nums))
return out
def test_grade_tickets_histogram_and_prizes():
winning6 = [1, 2, 3, 4, 5, 6]
bonus = 7
tickets = [
[1, 2, 3, 4, 5, 6], # 6일치 = 1등
[1, 2, 3, 4, 5, 7], # 5일치 + 보너스 = 2등
[1, 2, 3, 4, 5, 8], # 5일치 = 3등
[1, 2, 3, 4, 9, 10], # 4일치 = 4등
[1, 2, 3, 11, 12, 13], # 3일치 = 5등
[40, 41, 42, 43, 44, 45], # 0일치
]
r = bt.grade_tickets(tickets, winning6, bonus)
assert r["m6"] == 1
assert r["m5"] == 2 # 5일치 총 2장(보너스 포함)
assert r["bonus_hits"] == 1 # 그 중 보너스 1장
assert r["m4"] == 1
assert r["m3"] == 1
assert r["best_match"] == 6
# 등수 매핑 헬퍼
prizes = bt.prize_counts(r)
assert prizes == {"1st": 1, "2nd": 1, "3rd": 1, "4th": 1, "5th": 1}
def test_purchase_tickets_distinct_and_count():
draws = _toy_draws()
cache = build_analysis_cache(draws)
nw = build_number_weights(cache)
pool = bt.generate_pool(cache, nw, n=2000, seed=7)
W = [0.25, 0.30, 0.20, 0.15, 0.10]
bought = bt.purchase_tickets(pool, cache, W, k=50)
assert len(bought) == 50
assert len({tuple(t) for t in bought}) == 50 # distinct
# W로 랭킹된 상위 k → 평균 점수가 풀 전체 평균 이상이어야
avg_bought = sum(score_combination(t, cache, W)["score_total"] for t in bought) / 50
avg_pool = sum(score_combination(t, cache, W)["score_total"] for t in pool) / len(pool)
assert avg_bought >= avg_pool
def test_random_null_and_coverage_distinct():
rnd = bt.random_null_tickets(k=50, seed=3)
assert len(rnd) == 50 and len({tuple(t) for t in rnd}) == 50
cov = bt.coverage_tickets(k=9, seed=3) # 9장 = 54슬롯 ≥ 45번호 전수 커버 가능
flat = {n for t in cov for n in t}
assert len(cov) == 9 and len({tuple(t) for t in cov}) == 9
assert len(flat) >= 40 # 커버리지 전략은 번호를 넓게 퍼뜨림
def test_point_in_time_excludes_target_draw():
draws = _toy_draws(50) # drw_no 1..50
pit = bt.point_in_time_draws(draws, target_draw_no=30)
assert all(d < 30 for d, _ in pit) # 30 이상 제외
assert max(d for d, _ in pit) == 29
assert len(pit) == 29
def test_calibrate_winner_scores_and_percentile():
draws = _toy_draws(60)
winning6 = [3, 11, 19, 27, 35, 44]
res = bt.calibrate_winner_compute(draws, target_draw_no=60,
winning6=winning6, sample_m=500, seed=9)
assert set(res["scores"].keys()) >= {"score_total", "score_frequency",
"score_fingerprint", "score_gap", "score_cooccur", "score_diversity"}
assert 0.0 <= res["percentile"] <= 1.0
assert res["cache_draws"] == 59 # 1..59
def test_generate_pool_partial_fill(monkeypatch):
"""weighted_sample_6이 항상 같은 조합만 반환하도록 패치 → cap에 먼저 걸려 len < n — 예외 없이 반환."""
import random as _r
_r.seed(42)
tiny_draws = [(i, sorted(_r.sample(range(1, 46), 6))) for i in range(1, 10)]
cache = build_analysis_cache(tiny_draws)
nw = build_number_weights(cache)
# weighted_sample_6을 항상 동일한 하나의 조합만 반환하도록 패치
# → 두 번째 시도부터 seen에 막혀 n개를 채울 수 없고 cap=n*4 이후 종료
import app.backtest as _bt_mod
monkeypatch.setattr(_bt_mod, "weighted_sample_6", lambda _w: [1, 2, 3, 4, 5, 6])
n = 50
pool = bt.generate_pool(cache, nw, n=n, seed=0)
# 예외 없이 반환해야 하고, 결과는 n 미만이어야 하며 모두 distinct
assert isinstance(pool, list)
assert len(pool) < n
assert len({tuple(t) for t in pool}) == len(pool)

View File

@@ -0,0 +1,75 @@
import os, sys, tempfile, random as _r
# _shared lives in web-backend/_shared; add the parent dir so it can be found
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from fastapi.testclient import TestClient
def _client(monkeypatch):
tmp = tempfile.mkdtemp()
from app import db
monkeypatch.setattr(db, "DB_PATH", os.path.join(tmp, "lotto.db"))
db.init_db()
from app.main import app
return TestClient(app), db
def _seed_draws(db, n=40):
rows = []
_r.seed(2)
for i in range(1, n + 1):
s = sorted(_r.sample(range(1, 46), 6))
rows.append({"drw_no": i, "drw_date": f"2020-01-{(i % 28) + 1:02d}",
"n1": s[0], "n2": s[1], "n3": s[2], "n4": s[3],
"n5": s[4], "n6": s[5], "bonus": ((s[5] % 45) + 1)})
db.upsert_many_draws(rows)
def test_backtest_endpoints(monkeypatch):
client, db = _client(monkeypatch)
r = client.get("/api/lotto/backtest/track-record")
assert r.status_code == 200
assert "by_strategy" in r.json()
r2 = client.get("/api/lotto/backtest/calibration?weeks=4")
assert r2.status_code == 200
assert isinstance(r2.json().get("history"), list)
def test_track_record_with_data(monkeypatch):
"""seed 40 draws + forward run → track-record contains random_null."""
client, db_mod = _client(monkeypatch)
_seed_draws(db_mod, 40)
from app import backtest as bt
bt.run_forward_purchase(40, k=20, pool_n=500, sample_seed=5)
r = client.get("/api/lotto/backtest/track-record")
assert r.status_code == 200
body = r.json()
assert "by_strategy" in body
assert "random_null" in body["by_strategy"]
def test_review_known_and_unknown(monkeypatch):
"""Known draw with calibration → 200 + non-null winner_analysis; unknown → 404."""
client, db_mod = _client(monkeypatch)
_seed_draws(db_mod, 40)
from app import backtest as bt
bt.run_forward_purchase(40, k=20, pool_n=500, sample_seed=5)
bt.calibrate_winner(40, sample_m=200)
r = client.get("/api/lotto/backtest/review/40")
assert r.status_code == 200
body = r.json()
assert body["winner_analysis"] is not None
assert "score_total" in body["winner_analysis"]
r2 = client.get("/api/lotto/backtest/review/99999")
assert r2.status_code == 404
def test_calibration_weeks_bounds(monkeypatch):
"""weeks=0 and weeks=521 should be rejected with 422."""
client, _ = _client(monkeypatch)
r0 = client.get("/api/lotto/backtest/calibration?weeks=0")
assert r0.status_code == 422
r521 = client.get("/api/lotto/backtest/calibration?weeks=521")
assert r521.status_code == 422

View File

@@ -0,0 +1,320 @@
import os, tempfile
def _fresh_db(monkeypatch):
tmp = tempfile.mkdtemp()
path = os.path.join(tmp, "lotto.db")
from app import db
monkeypatch.setattr(db, "DB_PATH", path)
db.init_db()
return db
def test_backtest_tables_exist(monkeypatch):
db = _fresh_db(monkeypatch)
with db._conn() as conn:
tables = {r["name"] for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
assert "backtest_runs" in tables
assert "winner_calibration" in tables
def test_backtest_runs_unique(monkeypatch):
db = _fresh_db(monkeypatch)
db.save_backtest_run(draw_no=100, strategy="random_null", weight_label="-",
weight_json=None, trial_id=None, n_tickets=10,
hist={"m3":1,"m4":0,"m5":0,"m6":0,"bonus_hits":0},
best_match=3, avg_meta_score=0.5)
db.save_backtest_run(draw_no=100, strategy="random_null", weight_label="-",
weight_json=None, trial_id=None, n_tickets=10,
hist={"m3":2,"m4":0,"m5":0,"m6":0,"bonus_hits":0},
best_match=3, avg_meta_score=0.6) # 멱등 upsert
rows = db.get_backtest_runs(draw_no=100)
assert len(rows) == 1
assert rows[0]["m3"] == 2 # 마지막 값으로 갱신
_SCORES = {
"score_total": 1.23,
"score_frequency": 0.30,
"score_fingerprint": 0.25,
"score_gap": 0.20,
"score_cooccur": 0.28,
"score_diversity": 0.20,
}
def test_winner_calibration_upsert(monkeypatch):
"""save_winner_calibration 두 번 호출 시 upsert — 행 1개, 값은 마지막 것."""
db = _fresh_db(monkeypatch)
winning = [3, 7, 15, 22, 33, 41]
db.save_winner_calibration(draw_no=200, winning=winning,
scores=_SCORES, percentile=75.0,
my_pick_avg=0.9, cache_draws=100)
# 두 번째 저장 — percentile, my_pick_avg 업데이트
scores2 = {**_SCORES, "score_total": 2.00}
db.save_winner_calibration(draw_no=200, winning=winning,
scores=scores2, percentile=80.0,
my_pick_avg=1.1, cache_draws=110)
row = db.get_winner_calibration(200)
assert row is not None
# 행이 1개만 존재하는지 확인
with db._conn() as conn:
cnt = conn.execute(
"SELECT COUNT(*) AS c FROM winner_calibration WHERE draw_no=200"
).fetchone()["c"]
assert cnt == 1
assert row["percentile"] == 80.0
assert row["score_total"] == 2.00
def _seed_draws(db, n=40):
rows = []
import random as _r; _r.seed(2)
for i in range(1, n + 1):
s = sorted(_r.sample(range(1, 46), 6))
rows.append({"drw_no": i, "drw_date": f"2020-01-{(i%28)+1:02d}",
"n1": s[0], "n2": s[1], "n3": s[2], "n4": s[3],
"n5": s[4], "n6": s[5], "bonus": ((s[5] % 45) + 1)})
db.upsert_many_draws(rows)
def test_backfill_calibration_idempotent(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r1 = bt.backfill_calibration(batch=15, sample_m=200)
# 첫 회차는 point-in-time 데이터가 빈약 → min_history 이후만 처리
done1 = len(db.get_calibrated_draw_nos())
assert done1 > 0
r2 = bt.backfill_calibration(batch=100, sample_m=200) # 나머지
done2 = len(db.get_calibrated_draw_nos())
assert done2 >= done1
r3 = bt.backfill_calibration(batch=100, sample_m=200) # 재실행 → 추가 0
assert r3["calibrated"] == 0
def test_run_forward_purchase_persists_all_strategies(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
# 작은 규모로 빠르게
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
strategies = {r["strategy"] for r in rows}
assert "random_null" in strategies
assert "coverage" in strategies
assert "engine_w" in strategies # base 가중치로 최소 1건
for r in rows:
assert r["n_tickets"] == 20
def test_calibrate_winner_no_draw(monkeypatch):
"""DB에 없는 회차 번호 → ok=False, reason='no_draw'."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r = bt.calibrate_winner(99999)
assert r["ok"] is False
assert r["reason"] == "no_draw"
def test_calibrate_winner_insufficient_history(monkeypatch):
"""point-in-time 이력이 MIN_HISTORY(30) 미만인 회차 → reason='insufficient_history'.
draw_no=20이면 PIT 이력이 19개(draws 1~19)로 30 미만."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
r = bt.calibrate_winner(20)
assert r["ok"] is False
assert r["reason"] == "insufficient_history"
def test_run_forward_purchase_with_trials(monkeypatch):
"""그 주 weight_trials가 존재하면 engine_w 행의 weight_label이 'w0'..'w5' 형식이어야 한다."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
# draw 40: drw_date='2020-01-13' → week_start='2020-01-13'
from datetime import date, timedelta
draw_date = date.fromisoformat("2020-01-13")
ws = (draw_date - timedelta(days=draw_date.weekday())).isoformat()
# 해당 주에 trial 2개 심기 (day_of_week 0, 1)
db.save_weight_trial(ws, 0, [0.1, 0.3, 0.2, 0.2, 0.2], "perturb")
db.save_weight_trial(ws, 1, [0.25, 0.25, 0.25, 0.15, 0.1], "perturb")
from app import backtest as bt
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
engine_w_labels = {r["weight_label"] for r in rows if r["strategy"] == "engine_w"}
# trials가 있으므로 'base'가 아닌 'w0', 'w1' 형식이어야 한다
assert "base" not in engine_w_labels
assert any(lbl.startswith("w") for lbl in engine_w_labels)
def test_run_forward_purchase_idempotent(monkeypatch):
"""run_forward_purchase 두 번 호출 시 upsert — 행 수 변화 없음."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
count_after_first = len(db.get_backtest_runs(draw_no=40))
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
count_after_second = len(db.get_backtest_runs(draw_no=40))
assert count_after_second == count_after_first
def test_get_calibrated_draw_nos(monkeypatch):
"""저장된 draw_no 집합이 get_calibrated_draw_nos에 포함되어야 한다."""
db = _fresh_db(monkeypatch)
winning = [1, 2, 3, 4, 5, 6]
for draw_no in (301, 302, 303):
db.save_winner_calibration(draw_no=draw_no, winning=winning,
scores=_SCORES, percentile=50.0,
my_pick_avg=0.5, cache_draws=50)
nos = db.get_calibrated_draw_nos()
assert isinstance(nos, set)
assert {301, 302, 303}.issubset(nos)
def test_track_record_and_review_payload(monkeypatch):
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=5)
bt.calibrate_winner(40, sample_m=200)
tr = bt.track_record()
assert "random_null" in tr["by_strategy"]
# 이제 random_null은 N_NULL_TRIALS=6 행이므로 6*20=120장
assert tr["by_strategy"]["random_null"]["n_tickets"] >= 20
payload = bt.build_review_payload(40)
assert payload["draw_no"] == 40
assert "winner_analysis" in payload # 당첨조합 5분석치
assert "forward" in payload # 이번 회차 전략별 성적
assert "calibration_trend" in payload
assert payload["winner_analysis"] is not None
assert "score_total" in payload["winner_analysis"]
def test_run_forward_purchase_random_null_count(monkeypatch):
"""run_forward_purchase는 random_null을 N_NULL_TRIALS=6개 저장해야 한다."""
db = _fresh_db(monkeypatch)
_seed_draws(db, 40)
from app import backtest as bt
res = bt.run_forward_purchase(draw_no=40, k=20, pool_n=500, sample_seed=7)
assert res["ok"] is True
rows = db.get_backtest_runs(draw_no=40)
null_rows = [r for r in rows if r["strategy"] == "random_null"]
assert len(null_rows) == bt.N_NULL_TRIALS # 6개
null_labels = {r["weight_label"] for r in null_rows}
assert null_labels == {f"r{i}" for i in range(bt.N_NULL_TRIALS)}
for r in null_rows:
assert r["n_tickets"] == 20
def test_evaluate_weekly_gated_keeps_base_unchanged(monkeypatch):
"""Fix 5 통합 테스트 (end-to-end gated path).
접근: DB에 draws, weight_trials, auto_picks, backtest_runs, base_history를 직접 심어
evaluate_weekly()의 gated 분기가 base를 바꾸지 않음을 검증한다.
gated 조건: engine_w 최고 prize_score random_best < LIFT_EPSILON(10.0).
engine_best=5, random_best=20 → lift=-15 → gated.
evaluate_weekly 내부 흐름:
- get_weekly_trials(week_start) : _today_kst() 기준 week_start 사용
- get_latest_draw() : draws 테이블에서 max(drw_no) 반환
두 참조가 같은 날짜 기준이어야 하므로 _today_kst를 monkeypatch로 고정하고
draws의 최신 회차 날짜(drw_date)를 해당 주의 날짜로 맞춘다.
"""
import json as _json
from datetime import date, timedelta, datetime as _dt, timezone as _tz, timedelta as _td
db = _fresh_db(monkeypatch)
# KST 오늘 날짜 — evaluate_weekly가 이 날짜를 기준으로 week_start 계산
KST = _tz(_td(hours=9))
today_kst = _dt.now(KST).date()
from app import weight_evolver as we
week_start = we.get_week_start(today_kst)
# 1) draws 심기 — 최신 회차의 drw_date를 week_start 주 안의 날짜로 맞춤
import random as _r; _r.seed(99)
rows = []
for i in range(1, 41):
s = sorted(_r.sample(range(1, 46), 6))
# 마지막 회차(40)는 오늘 날짜 사용 (week_start 주 내)
if i == 40:
drw_date = today_kst.isoformat()
else:
drw_date = f"2020-01-{(i % 28) + 1:02d}"
rows.append({
"drw_no": i, "drw_date": drw_date,
"n1": s[0], "n2": s[1], "n3": s[2],
"n4": s[3], "n5": s[4], "n6": s[5],
"bonus": (s[5] % 45) + 1,
})
db.upsert_many_draws(rows)
latest = db.get_latest_draw()
assert latest is not None
assert latest["drw_date"] == today_kst.isoformat()
# 2) weight trial 1개 심기 (day_of_week=0, week_start=오늘 주)
trial_w = [0.2, 0.2, 0.2, 0.2, 0.2]
db.save_weight_trial(week_start, 0, trial_w, "perturb")
trial_rows = db.get_weekly_trials(week_start)
assert len(trial_rows) == 1
trial_id = trial_rows[0]["id"]
# 3) auto_picks 1개 심기 (winning 번호와 2개 일치 → max_correct=2)
winning6 = [latest["n1"], latest["n2"], latest["n3"],
latest["n4"], latest["n5"], latest["n6"]]
pick = winning6[:2] + [40, 41, 42, 43]
db.save_auto_pick(trial_id, 1, pick, meta_score=0.5)
# 4) backtest_runs: engine_w prize_score=5, random_null 6개 prize_score=20 (gated 확실)
LOW_HIST = {"m3": 5, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0} # prize=5
HIGH_HIST = {"m3": 20, "m4": 0, "m5": 0, "m6": 0, "bonus_hits": 0} # prize=20
draw_no = latest["drw_no"]
db.save_backtest_run(
draw_no=draw_no, strategy="engine_w", weight_label="w0",
weight_json=_json.dumps(trial_w), trial_id=trial_id, n_tickets=20,
hist=LOW_HIST, best_match=2, avg_meta_score=0.5,
)
from app import backtest as bt
for i in range(bt.N_NULL_TRIALS):
db.save_backtest_run(
draw_no=draw_no, strategy="random_null", weight_label=f"r{i}",
weight_json=None, trial_id=None, n_tickets=20,
hist=HIGH_HIST, best_match=3, avg_meta_score=0.5,
)
# 5) current base 저장 (이전 주 월요일 effective_from)
base_w = [0.2, 0.2, 0.2, 0.2, 0.2]
prev_monday = (today_kst - timedelta(weeks=1, days=today_kst.weekday())).isoformat()
db.save_base_history(
effective_from=prev_monday,
weight=base_w,
source_trial_id=None,
update_reason="cold_start",
winner_score=None,
winner_max_correct=None,
)
assert db.get_current_base() == base_w
# 6) evaluate_weekly 호출 — _today_kst()를 monkeypatch로 오늘 날짜 고정
monkeypatch.setattr(we, "_today_kst", lambda: today_kst)
result = we.evaluate_weekly()
assert result.get("ok") is True, f"evaluate_weekly 실패: {result}"
# gated path 검증
update_reason = result.get("update_reason", "")
assert update_reason in ("unchanged_gated", "idempotent_skip"), (
f"gated여야 하는데 reason='{update_reason}' — 게이팅 로직 깨짐"
)
# base가 바뀌지 않았는지 검증
new_base = result.get("new_base")
assert new_base == base_w, (
f"gated인데 base가 변경됨: {new_base} != {base_w}"
)

View File

@@ -120,3 +120,79 @@ def test_decide_base_update_cold_start_returns_default():
)
assert new_base == winner_W
assert reason == "winner_4plus"
def test_select_winner_by_lift_gating():
# engine_w 3개 + random_null 기준. lift = engine 등수점수 random 등수점수
per_w = [
{"trial_id": 1, "day_of_week": 0, "weight": [0.2]*5, "prize_score": 5.0},
{"trial_id": 2, "day_of_week": 1, "weight": [0.3,0.2,0.2,0.2,0.1], "prize_score": 9.0},
{"trial_id": 3, "day_of_week": 2, "weight": [0.1,0.3,0.2,0.2,0.2], "prize_score": 4.0},
]
# random baseline이 8.0이면 lift는 -3, +1, -4 → 최대 lift(+1) < ε(2) → 게이팅
winner = we.select_winner_by_lift(per_w, random_score=8.0, epsilon=2.0)
assert winner["gated"] is True # 최대 lift(+1) < ε(2) → 게이팅
winner2 = we.select_winner_by_lift(per_w, random_score=3.0, epsilon=2.0)
assert winner2["gated"] is False
assert winner2["trial_id"] == 2 # prize 9 → lift +6
def test_prize_score_from_hist():
# 등수 가중치: 1등 매우 큼, 하위는 작게
s = we.prize_score_from_hist({"m3": 10, "m4": 2, "m5": 0, "m6": 0, "bonus_hits": 0})
s_big = we.prize_score_from_hist({"m3": 0, "m4": 0, "m5": 0, "m6": 1, "bonus_hits": 0})
assert s_big > s # 1등 1장이 5등 다수보다 큼
def test_select_winner_by_lift_preserves_all_keys():
"""select_winner_by_lift는 per_w 항목의 모든 키를 보존해야 한다.
best_match, weight_label 등 identity 필드가 누락되면 evaluate_weekly가 깨진다."""
per_w = [
{
"trial_id": 10,
"weight_label": "w0",
"weight": [0.2] * 5,
"prize_score": 3.0,
"best_match": 3,
},
{
"trial_id": 11,
"weight_label": "w1",
"weight": [0.3, 0.2, 0.2, 0.2, 0.1],
"prize_score": 20.0,
"best_match": 4,
},
]
result = we.select_winner_by_lift(per_w, random_score=5.0, epsilon=2.0)
assert result["gated"] is False
assert result["trial_id"] == 11
assert result["weight_label"] == "w1" # identity 키 보존
assert result["best_match"] == 4 # best_match 키 보존
assert "lift" in result # lift 추가됨
assert result["lift"] == pytest.approx(15.0)
def test_gated_path_keeps_base_via_select_winner():
"""gated=True일 때 select_winner_by_lift의 반환값 검증.
evaluate_weekly 내의 gated 분기가 올바른 값에 의존함을 확인한다."""
per_w = [
{"trial_id": 1, "weight_label": "w0", "weight": [0.2]*5,
"prize_score": 5.0, "best_match": 2},
{"trial_id": 2, "weight_label": "w1", "weight": [0.3,0.2,0.2,0.2,0.1],
"prize_score": 7.0, "best_match": 3},
]
# random_best=8.0 → 최대 engine lift=7-8=-1 → gated
result = we.select_winner_by_lift(per_w, random_score=8.0, epsilon=we.LIFT_EPSILON)
assert result["gated"] is True
assert result["lift"] < 0
# decide_base_update를 통해 gated가 unchanged를 유도하는지 확인
# (gated override가 없더라도, 현재 LIFT_EPSILON=10.0 하에서 lift<0이면 항상 gated)
current = [0.2, 0.2, 0.2, 0.2, 0.2]
# gated이면 evaluate_weekly가 current_base를 그대로 유지해야 함
# 여기서는 override 로직을 직접 재현해 검증한다
gated = result["gated"]
new_base_override = list(current) if gated else None
reason_override = "unchanged_gated" if gated else "should_not_reach"
assert new_base_override == current
assert reason_override == "unchanged_gated"