lotto lab 추천 알고리즘 및 시뮬레이션 강화

This commit is contained in:
2026-02-23 22:32:14 +09:00
parent c96815c2e3
commit 71d9d7a571
4 changed files with 868 additions and 147 deletions

354
backend/app/analyzer.py Normal file
View File

@@ -0,0 +1,354 @@
"""
통계 분석 엔진 - lotto-lab 고도화
[팀 회의 합의 기반 5가지 통계 기법]
1. 빈도 Z-score 분석: 각 번호의 출현 빈도가 기댓값에서 얼마나 벗어났는지
2. 조합 지문(Fingerprint): 조합의 합계, 홀짝 비율, 구간 분포가 역대 당첨번호와 유사한지
3. 갭 분석(Gap): 각 번호의 마지막 출현으로부터 경과 회차 수 기반 점수
4. 공동 출현 행렬(Co-occurrence): 번호 쌍이 역대에 함께 나온 빈도 기반 점수
5. 다양성(Diversity): 연속 번호, 범위, 구간 분포 다양성
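[Statistical basis]
- Theoretical probability that a given number (1-45) appears in a draw: 6/45 ≈ 13.33%
- Expected sum of a draw: E[sum] = 6 × E[1..45] = 6 × 23 = 138
- Standard deviation of the sum: sqrt(6 × Var[uniform 1..45]) = sqrt(1012) ≈ 31.8 if the
  six numbers were independent; because they are drawn without replacement the variance
  is scaled by (45-6)/(45-1), giving std ≈ sqrt(1012 × 39/44) ≈ 30
- 23 odd numbers (1,3,...,45) and 22 even numbers (2,4,...,44)
- Probability that a given pair appears together in a draw: C(43,4)/C(45,6) = 1/66 ≈ 1.515%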
"""
import math
from collections import Counter, defaultdict
from typing import List, Tuple, Dict, Any, Optional
# Zone definitions as (start, end) pairs; both ends are inclusive.
# Together the five zones cover 1..45 with no overlap.
ZONE_RANGES: List[Tuple[int, int]] = [
    (1, 9),
    (10, 19),
    (20, 29),
    (30, 39),
    (40, 45),
]
def _get_zone(n: int) -> int:
    """Return the 0-based index (0-4) of the ZONE_RANGES entry that contains ``n``.

    A number that falls in none of the zones maps to index 4.
    """
    matching = (
        idx for idx, (low, high) in enumerate(ZONE_RANGES) if low <= n <= high
    )
    return next(matching, 4)
def build_analysis_cache(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
    """Build the statistics cache derived from the historical winning numbers.

    Intended to be computed once per simulation run and then reused for
    weighting and scoring.

    Args:
        draws: ``[(drw_no, [n1, ..., n6]), ...]``, expected in ascending draw
            order (the gap computation treats the last element as the most
            recent draw).

    Returns:
        A dict of statistics, or an empty dict when ``draws`` is empty.
    """
    if not draws:
        return {}

    total_draws = len(draws)
    number_rows = [nums for _, nums in draws]

    # -- 1. Frequency z-scores ------------------------------------------------
    freq_all = Counter(n for nums in number_rows for n in nums)
    freq_values = [freq_all.get(n, 0) for n in range(1, 46)]
    mean_freq = sum(freq_values) / 45.0
    variance_freq = sum((f - mean_freq) ** 2 for f in freq_values) / 45.0
    std_freq = math.sqrt(variance_freq)
    # Floor the divisor so a perfectly flat distribution cannot divide by zero.
    z_divisor = max(std_freq, 0.001)
    z_scores: Dict[int, float] = {
        n: (freq_all.get(n, 0) - mean_freq) / z_divisor for n in range(1, 46)
    }

    # -- 2. Gap: draws elapsed since each number last appeared ---------------
    # 0 means the number was in the most recent draw, k means k draws ago.
    last_seen_gap: Dict[int, int] = {}
    for age, nums in enumerate(reversed(number_rows)):
        for n in nums:
            last_seen_gap.setdefault(n, age)
    for n in range(1, 46):
        # Numbers that never appeared get the full history length.
        last_seen_gap.setdefault(n, total_draws)

    # -- 3. Co-occurrence counts, keyed by (i, j) with i < j ------------------
    pair_counts: Dict[Tuple[int, int], int] = defaultdict(int)
    for nums in number_rows:
        ordered = sorted(nums)
        for pos, low in enumerate(ordered):
            for high in ordered[pos + 1:]:
                pair_counts[(low, high)] += 1
    # Expected count per pair: total_draws * C(43,4) / C(45,6)
    # with C(43,4) = 123,410 and C(45,6) = 8,145,060.
    expected_cooccur = total_draws * 123410.0 / 8145060.0

    # -- 4. Historical sum and odd-count statistics ---------------------------
    historical_sums = [sum(nums) for nums in number_rows]
    mean_sum = sum(historical_sums) / total_draws
    std_sum = math.sqrt(
        sum((s - mean_sum) ** 2 for s in historical_sums) / total_draws
    )
    std_sum = max(std_sum, 1.0)  # guards later divisions by std_sum

    odd_dist = Counter(
        sum(1 for n in nums if n % 2 == 1) for nums in number_rows
    )
    odd_prob: Dict[int, float] = {
        odd_count: hits / total_draws for odd_count, hits in odd_dist.items()
    }
    max_odd_prob = max(odd_prob.values()) if odd_prob else 1.0

    # -- 5. Per-zone distribution of "how many numbers fall in this zone" -----
    zone_probs: List[Dict[int, float]] = []
    for lo, hi in ZONE_RANGES:
        zone_hits = Counter(
            sum(1 for n in nums if lo <= n <= hi) for nums in number_rows
        )
        zone_total = sum(zone_hits.values())
        zone_probs.append(
            {cnt: hits / zone_total for cnt, hits in zone_hits.items()}
        )
    max_zone_probs = [max(zp.values()) if zp else 1.0 for zp in zone_probs]

    # -- 6. Frequency over the most recent 100 draws (all draws if fewer) -----
    freq_recent = Counter(n for nums in number_rows[-100:] for n in nums)

    return {
        "total_draws": total_draws,
        "freq_all": freq_all,
        "z_scores": z_scores,
        "last_seen_gap": last_seen_gap,
        "cooccur": dict(pair_counts),
        "expected_cooccur": expected_cooccur,
        "mean_sum": mean_sum,
        "std_sum": std_sum,
        "odd_prob": odd_prob,
        "max_odd_prob": max_odd_prob,
        "zone_probs": zone_probs,
        "max_zone_probs": max_zone_probs,
        "freq_recent": freq_recent,
    }
def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]:
    """Compute a sampling weight for each number 1..45 from the analysis cache.

    The base weight is the all-time frequency plus 1.5 times the recent
    frequency. It is then scaled by a factor chosen from the number's gap
    (draws since it last appeared) and floored at 0.5.

    Args:
        cache: dict providing ``freq_all``, ``last_seen_gap`` and
            ``freq_recent`` mappings keyed by number.

    Returns:
        ``{number: weight}`` for every number from 1 to 45.
    """
    overall = cache["freq_all"]
    gaps = cache["last_seen_gap"]
    recent = cache["freq_recent"]

    # (inclusive upper bound on gap, multiplier); gaps above 25 use 1.20.
    gap_tiers = (
        (1, 0.50),   # appeared in the last draw or the one before: penalty
        (3, 0.75),
        (12, 1.00),  # neutral range
        (25, 1.10),  # somewhat overdue: small bonus
    )

    result: Dict[int, float] = {}
    for number in range(1, 46):
        base = overall.get(number, 0) + 1.5 * recent.get(number, 0)
        elapsed = gaps.get(number, 0)
        factor = 1.20  # long-absent numbers: bonus
        for upper, tier_factor in gap_tiers:
            if elapsed <= upper:
                factor = tier_factor
                break
        result[number] = max(base * factor, 0.5)
    return result
def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]:
    """Score a number combination against the statistics held in ``cache``.

    Five component scores (each nominally in the 0-1 range) are blended into
    ``score_total``:
      - score_frequency   (25%): sigmoid of the mean frequency z-score
      - score_fingerprint (30%): similarity of sum, odd count and zone spread
                                 to the historical draws
      - score_gap         (20%): tiered score from draws since last appearance
      - score_cooccur     (15%): pair co-occurrence relative to expectation
      - score_diversity   (10%): consecutive pair, spread and zone coverage

    Per-number averages are normalized by the actual size of ``numbers``
    rather than a hard-coded 6, so the result is identical for standard
    6-number combinations and remains a true average for any other size.

    Args:
        numbers: the combination to score (order does not matter).
        cache: statistics dict as produced by ``build_analysis_cache``.

    Returns:
        Dict with ``score_total`` and the five component scores, each rounded
        to 6 decimals.

    Raises:
        ValueError: if ``numbers`` is empty.
        KeyError: if ``cache`` lacks a required entry (e.g. an empty cache).
    """
    nums = sorted(numbers)
    if not nums:
        raise ValueError("numbers must not be empty")
    size = len(nums)

    # -- 1. Frequency score ---------------------------------------------------
    z_scores = cache["z_scores"]
    avg_z = sum(z_scores.get(n, 0.0) for n in nums) / size
    # Sigmoid squashing: a positive mean z-score yields a score above 0.5.
    score_frequency = 1.0 / (1.0 + math.exp(-avg_z / 1.5))

    # -- 2. Fingerprint score -------------------------------------------------
    # 2a. Sum: Gaussian-shaped score that peaks at 1.0 on the historical mean.
    z_sum = (sum(nums) - cache["mean_sum"]) / cache["std_sum"]
    sum_score = math.exp(-0.5 * z_sum ** 2)

    # 2b. Odd count: historical probability relative to the most common count.
    odd_count = sum(1 for n in nums if n % 2 == 1)
    odd_score = cache["odd_prob"].get(odd_count, 0.01) / cache["max_odd_prob"]

    # 2c. Zone distribution: same idea, averaged over the zones.
    zone_probs = cache["zone_probs"]
    max_zone_probs = cache["max_zone_probs"]
    zone_score = 0.0
    for z_idx, (lo, hi) in enumerate(ZONE_RANGES):
        in_zone = sum(1 for n in nums if lo <= n <= hi)
        zone_score += zone_probs[z_idx].get(in_zone, 0.01) / max_zone_probs[z_idx]
    zone_score /= len(ZONE_RANGES)

    score_fingerprint = sum_score * 0.50 + odd_score * 0.30 + zone_score * 0.20

    # -- 3. Gap score ---------------------------------------------------------
    # (inclusive upper bound on gap, score); gaps above 25 score 0.75.
    gap_tiers = (
        (1, 0.20),   # appeared in the last draw or the one before: strong penalty
        (3, 0.55),
        (7, 0.85),
        (15, 1.00),  # best range
        (25, 0.90),
    )
    last_seen_gap = cache["last_seen_gap"]
    gap_total = 0.0
    for n in nums:
        gap = last_seen_gap.get(n, 0)
        gap_total += next(
            (tier_score for upper, tier_score in gap_tiers if gap <= upper),
            0.75,
        )
    score_gap = gap_total / size

    # -- 4. Co-occurrence score -----------------------------------------------
    cooccur = cache["cooccur"]
    expected_cooccur = max(cache["expected_cooccur"], 0.001)
    pair_scores: List[float] = []
    for i, low in enumerate(nums):
        for high in nums[i + 1:]:
            ratio = cooccur.get((low, high), 0) / expected_cooccur
            # Sigmoid centred on ratio == 1 (observed equals expected -> 0.5).
            pair_scores.append(1.0 / (1.0 + math.exp(-2.0 * (ratio - 1.0))))
    score_cooccur = sum(pair_scores) / max(len(pair_scores), 1)

    # -- 5. Diversity score ---------------------------------------------------
    # 5a. Whether the sorted combination contains at least one adjacent pair.
    has_consecutive = any(b - a == 1 for a, b in zip(nums, nums[1:]))
    consecutive_score = 0.65 if has_consecutive else 0.40

    # 5b. Spread between the smallest and largest number.
    num_range = nums[-1] - nums[0]
    if num_range < 13:
        spread_score = 0.25
    elif num_range < 20:
        spread_score = 0.65
    elif num_range < 28:
        spread_score = 0.85
    elif num_range <= 43:
        spread_score = 1.00
    else:  # only 44 is possible for 1..45 (contains both 1 and 45)
        spread_score = 0.95

    # 5c. Zone coverage: 1 zone -> 0.0, all 5 zones -> 1.0.
    zones_used = {_get_zone(n) for n in nums}
    zone_coverage = (len(zones_used) - 1) / 4.0

    score_diversity = (
        consecutive_score * 0.35
        + spread_score * 0.35
        + zone_coverage * 0.30
    )

    # -- Weighted total -------------------------------------------------------
    score_total = (
        score_frequency * 0.25
        + score_fingerprint * 0.30
        + score_gap * 0.20
        + score_cooccur * 0.15
        + score_diversity * 0.10
    )
    return {
        "score_total": round(score_total, 6),
        "score_frequency": round(score_frequency, 6),
        "score_fingerprint": round(score_fingerprint, 6),
        "score_gap": round(score_gap, 6),
        "score_cooccur": round(score_cooccur, 6),
        "score_diversity": round(score_diversity, 6),
    }
def get_statistical_report(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
    """Assemble the statistics report served by GET /api/lotto/analysis.

    Args:
        draws: ``[(drw_no, [n1, ..., n6]), ...]``.

    Returns:
        A dict with per-number stats (frequency, expected count, share,
        z-score, gap, zone), the ten most and least frequent numbers, the ten
        numbers with the largest gap, the odd-count distribution in percent,
        and a histogram of draw sums in buckets of 20. When ``draws`` is empty
        an ``{"error": ...}`` dict is returned instead.
    """
    if not draws:
        return {"error": "데이터 없음"}

    cache = build_analysis_cache(draws)
    total_draws = cache["total_draws"]
    freq_all = cache["freq_all"]
    z_scores = cache["z_scores"]
    last_seen_gap = cache["last_seen_gap"]

    # Same for every number, so computed once.
    expected = total_draws * 6.0 / 45.0

    number_stats = [
        {
            "number": n,
            "frequency": freq_all.get(n, 0),
            "expected": round(expected, 1),
            "frequency_pct": round(freq_all.get(n, 0) / (total_draws * 6) * 100, 2),
            "z_score": round(z_scores.get(n, 0.0), 3),
            "gap": last_seen_gap.get(n, total_draws),
            "zone": _get_zone(n),
        }
        for n in range(1, 46)
    ]

    # Stable descending sorts: ties keep ascending number order.
    by_frequency = sorted(number_stats, key=lambda s: s["frequency"], reverse=True)
    by_gap = sorted(number_stats, key=lambda s: s["gap"], reverse=True)

    # Histogram of historical sums: 21-40, 41-60, ..., 241-260.
    hist_sums = [sum(nums) for _, nums in draws]
    sum_buckets: Dict[str, int] = {
        f"{lo}-{lo + 19}": sum(1 for s in hist_sums if lo <= s <= lo + 19)
        for lo in range(21, 256, 20)
    }

    odd_distribution = {
        str(odd_count): round(prob * 100, 1)
        for odd_count, prob in sorted(cache["odd_prob"].items())
    }

    return {
        "total_draws": total_draws,
        "mean_sum": round(cache["mean_sum"], 2),
        "std_sum": round(cache["std_sum"], 2),
        "odd_distribution": odd_distribution,
        "number_stats": number_stats,
        "hot_numbers": [s["number"] for s in by_frequency[:10]],
        "cold_numbers": [s["number"] for s in by_frequency[-10:]],
        "overdue_numbers": [s["number"] for s in by_gap[:10]],
        "sum_distribution": sum_buckets,
    }

View File

@@ -77,6 +77,72 @@ def init_db() -> None:
# ✅ UNIQUE 인덱스(중복 저장 방지)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);")
# ── 시뮬레이션 테이블 ─────────────────────────────────────────────────
conn.execute(
"""
CREATE TABLE IF NOT EXISTS simulation_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_at TEXT NOT NULL DEFAULT (datetime('now')),
strategy TEXT NOT NULL DEFAULT 'monte_carlo',
total_generated INTEGER NOT NULL DEFAULT 0,
top_k_selected INTEGER NOT NULL DEFAULT 0,
avg_score REAL,
notes TEXT DEFAULT ''
);
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);"
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS simulation_candidates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER NOT NULL,
numbers TEXT NOT NULL,
score_total REAL NOT NULL,
score_frequency REAL,
score_fingerprint REAL,
score_gap REAL,
score_cooccur REAL,
score_diversity REAL,
is_best INTEGER DEFAULT 0,
based_on_draw INTEGER,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY(run_id) REFERENCES simulation_runs(id)
);
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_simcand_run "
"ON simulation_candidates(run_id, score_total DESC);"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_simcand_best "
"ON simulation_candidates(is_best, score_total DESC);"
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS best_picks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
numbers TEXT NOT NULL,
score_total REAL NOT NULL,
rank_in_run INTEGER,
source_run_id INTEGER,
based_on_draw INTEGER,
is_active INTEGER DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id)
);
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_bestpicks_active "
"ON best_picks(is_active, score_total DESC);"
)
def upsert_draw(row: Dict[str, Any]) -> None:
with _conn() as conn:
conn.execute(
@@ -276,11 +342,160 @@ def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has
with _conn() as conn:
cur = conn.execute(
"""
UPDATE recommendations
SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1
UPDATE recommendations
SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1
WHERE id = ?
""",
(rank, correct_count, 1 if has_bonus else 0, rec_id)
)
return cur.rowcount > 0
# ── 시뮬레이션 CRUD ─────────────────────────────────────────────────────────
def save_simulation_run(
    strategy: str,
    total_generated: int,
    top_k_selected: int,
    avg_score: float,
    notes: str = "",
) -> int:
    """Insert one row into simulation_runs and return its generated id.

    ``avg_score`` is rounded to 6 decimals before it is stored.
    """
    insert_sql = """
            INSERT INTO simulation_runs (strategy, total_generated, top_k_selected, avg_score, notes)
            VALUES (?, ?, ?, ?, ?)
            """
    values = (strategy, total_generated, top_k_selected, round(avg_score, 6), notes)
    with _conn() as conn:
        cursor = conn.execute(insert_sql, values)
        return int(cursor.lastrowid)
def save_simulation_candidates_bulk(
    run_id: int,
    candidates: List[Dict[str, Any]],
    based_on_draw: Optional[int],
) -> None:
    """Bulk-insert scored candidates into simulation_candidates.

    Each candidate is a dict with ``numbers`` and ``score_total`` (required),
    optional ``score_*`` component scores, and an optional truthy ``is_best``.
    Numbers are stored as a JSON array in ascending order.
    """
    component_fields = (
        "score_frequency",
        "score_fingerprint",
        "score_gap",
        "score_cooccur",
        "score_diversity",
    )
    rows = []
    for cand in candidates:
        rows.append(
            (
                run_id,
                json.dumps(sorted(cand["numbers"])),
                cand["score_total"],
                *(cand.get(field) for field in component_fields),
                1 if cand.get("is_best") else 0,
                based_on_draw,
            )
        )

    insert_sql = """
            INSERT INTO simulation_candidates
            (run_id, numbers, score_total, score_frequency, score_fingerprint,
            score_gap, score_cooccur, score_diversity, is_best, based_on_draw)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
    with _conn() as conn:
        conn.executemany(insert_sql, rows)
def replace_best_picks(
    picks: List[Dict[str, Any]],
    run_id: int,
    based_on_draw: Optional[int],
) -> None:
    """Deactivate the currently active best_picks rows and insert ``picks`` as active.

    Each pick is a dict with ``numbers`` and ``score_total`` (required) and an
    optional ``rank_in_run``. Both statements are issued on the same
    connection inside a single ``with _conn()`` block.
    """
    deactivate_sql = "UPDATE best_picks SET is_active = 0 WHERE is_active = 1"
    insert_sql = """
            INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active)
            VALUES (?, ?, ?, ?, ?, 1)
            """
    with _conn() as conn:
        conn.execute(deactivate_sql)
        rows = []
        for pick in picks:
            rows.append(
                (
                    json.dumps(sorted(pick["numbers"])),
                    pick["score_total"],
                    pick.get("rank_in_run"),
                    run_id,
                    based_on_draw,
                )
            )
        conn.executemany(insert_sql, rows)
def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to ``limit`` active best_picks rows, highest score first.

    The stored JSON ``numbers`` column is decoded into a list.
    """
    query = """
            SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at
            FROM best_picks
            WHERE is_active = 1
            ORDER BY score_total DESC
            LIMIT ?
            """
    passthrough = (
        "score_total",
        "rank_in_run",
        "source_run_id",
        "based_on_draw",
        "created_at",
    )
    with _conn() as conn:
        fetched = conn.execute(query, (limit,)).fetchall()
        picks: List[Dict[str, Any]] = []
        for row in fetched:
            item: Dict[str, Any] = {
                "id": int(row["id"]),
                "numbers": json.loads(row["numbers"]),
            }
            for column in passthrough:
                item[column] = row[column]
            picks.append(item)
        return picks
def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` simulation_runs rows as dicts, newest id first."""
    query = """
            SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes
            FROM simulation_runs
            ORDER BY id DESC
            LIMIT ?
            """
    with _conn() as conn:
        fetched = conn.execute(query, (limit,)).fetchall()
        return list(map(dict, fetched))
def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]:
    """Return up to ``limit`` candidates of one simulation run, highest score first.

    Each row is returned as a dict with the JSON ``numbers`` column decoded
    into a list.
    """
    query = """
            SELECT id, numbers, score_total, score_frequency, score_fingerprint,
            score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at
            FROM simulation_candidates
            WHERE run_id = ?
            ORDER BY score_total DESC
            LIMIT ?
            """
    with _conn() as conn:
        fetched = conn.execute(query, (run_id, limit)).fetchall()
        records: List[Dict[str, Any]] = []
        for row in fetched:
            record = dict(row)
            record["numbers"] = json.loads(row["numbers"])
            records.append(record)
        return records

View File

@@ -1,100 +1,154 @@
"""
시뮬레이션 엔진 - lotto-lab 고도화
[몬테카를로 시뮬레이션 흐름]
1. 역대 당첨번호 기반 통계 캐시 구성 (build_analysis_cache)
2. 통계 가중치로 N개 후보 조합 생성 (weighted sampling)
3. 5가지 기법으로 각 후보 스코어링 (score_combination)
4. 상위 top_k개 선별하여 DB 저장 (simulation_candidates, best_picks 교체)
[시뮬레이션 파라미터]
- n_candidates: 1회 시뮬레이션당 생성 후보 수 (기본 20,000)
- top_k: 선별 및 저장할 상위 개수 (기본 100)
- best_n: best_picks에 올릴 최상위 개수 (기본 20)
"""
import random
import json
from typing import Dict, Any, List, Optional
from .db import _conn, save_recommendation_dedup, get_latest_draw, get_all_draw_numbers
from .recommender import recommend_numbers
from .utils import calc_metrics, calc_recent_overlap
from .db import (
get_latest_draw,
get_all_draw_numbers,
save_simulation_run,
save_simulation_candidates_bulk,
replace_best_picks,
)
from .analyzer import build_analysis_cache, build_number_weights, score_combination
# 순환 참조 방지를 위해 main.py의 calc_metrics 등을 utils.py가 아닌 여기서 재정의하거나
# main.py에서 generator를 import할 때 함수 내부에서 하도록 처리.
# 여기서는 코드가 중복되더라도 안전하게 독립적으로 구현하거나, db/collector만 import.
def _get_top_performing_params(limit: int = 20) -> List[Dict[str, Any]]:
def _weighted_sample_6(weights: Dict[int, float]) -> List[int]:
"""
최근 1~5등에 당첨된 추천들의 파라미터 조회
가중 확률 샘플링으로 중복 없이 6개 번호 추출.
weights: {1: w1, 2: w2, ..., 45: w45}
"""
sql = """
SELECT params
FROM recommendations
WHERE rank > 0 AND rank <= 5
ORDER BY id DESC
LIMIT ?
"""
with _conn() as conn:
rows = conn.execute(sql, (limit,)).fetchall()
return [json.loads(r["params"]) for r in rows]
pool = list(range(1, 46))
chosen: List[int] = []
for _ in range(6):
total = sum(weights[n] for n in pool)
r = random.random() * total
acc = 0.0
for n in pool:
acc += weights[n]
if acc >= r:
chosen.append(n)
pool.remove(n)
break
return chosen
def _perturb_param(val: float, delta: float, min_val: float, max_val: float, is_int: bool = False) -> float:
change = random.uniform(-delta, delta)
new_val = val + change
new_val = max(min_val, min(new_val, max_val))
return int(round(new_val)) if is_int else round(new_val, 2)
def generate_smart_recommendations(count: int = 10) -> int:
def run_simulation(
n_candidates: int = 20000,
top_k: int = 100,
best_n: int = 20,
) -> Dict[str, Any]:
"""
지능형 자동 생성: 과거 성적 우수 파라미터 기반으로 생성
몬테카를로 시뮬레이션 실행 메인 함수.
Args:
n_candidates: 생성할 후보 조합 수 (기본 20,000)
top_k: DB에 저장할 상위 후보 수 (기본 100)
best_n: best_picks에 올릴 최상위 수 (기본 20)
Returns:
{run_id, total_generated, top_k_selected, avg_score, best_score, based_on_draw}
또는 {"error": ...}
"""
draws = get_all_draw_numbers()
if not draws:
return 0
return {"error": "당첨번호 데이터가 없습니다. 먼저 동기화를 실행하세요."}
latest = get_latest_draw()
based_on = latest["drw_no"] if latest else None
# 1. 성공 사례 조회 (Feedback)
top_params = _get_top_performing_params()
generated_count = 0
for _ in range(count):
# 전략 선택: 이력이 있으면 70% 확률로 모방(Exploitation), 30%는 랜덤(Exploration)
use_history = (len(top_params) > 0) and (random.random() < 0.7)
if use_history:
# 과거 우수 파라미터 중 하나 선택하여 변형
base = random.choice(top_params)
# 파라미터 변형 (유전 알고리즘과 유사)
p_window = _perturb_param(base.get("recent_window", 200), 50, 10, 500, True)
p_weight = _perturb_param(base.get("recent_weight", 2.0), 1.0, 0.1, 10.0, False)
p_avoid = _perturb_param(base.get("avoid_recent_k", 5), 2, 0, 20, True)
# Constraints 로직은 복잡하니 일단 랜덤성 부여하거나 유지
# (여기서는 기본 파라미터 위주로 튜닝)
params = {
"recent_window": p_window,
"recent_weight": p_weight,
"avoid_recent_k": p_avoid,
"strategy": "smart_feedback"
}
else:
# 완전 랜덤 탐색
params = {
"recent_window": random.randint(50, 400),
"recent_weight": round(random.uniform(0.5, 5.0), 2),
"avoid_recent_k": random.randint(0, 10),
"strategy": "random_exploration"
}
# 생성 시도
try:
# recommend_numbers는 db.py/main.py 로직과 독립적이므로 여기서 사용 가능
# 단, recommend_numbers 함수가 어디 있는지 확인 (recommender.py)
res = recommend_numbers(
draws,
recent_window=params["recent_window"],
recent_weight=params["recent_weight"],
avoid_recent_k=params["avoid_recent_k"]
)
save_recommendation_dedup(based_on, res["numbers"], params)
generated_count += 1
except Exception as e:
print(f"Gen Error: {e}")
based_on_draw = latest["drw_no"] if latest else None
# ── 1. 통계 캐시 및 가중치 구성 (시뮬레이션 전체에서 재사용) ────────────
cache = build_analysis_cache(draws)
weights = build_number_weights(cache)
# ── 2. 후보 생성 및 스코어링 ──────────────────────────────────────────────
candidates: List[Dict[str, Any]] = []
seen_keys: set = set()
max_attempts = n_candidates * 3 # 중복 제거 여유분
attempts = 0
while len(candidates) < n_candidates and attempts < max_attempts:
attempts += 1
nums = _weighted_sample_6(weights)
key = tuple(sorted(nums))
if key in seen_keys:
continue
return generated_count
seen_keys.add(key)
scores = score_combination(nums, cache)
candidates.append({
"numbers": sorted(nums),
**scores,
})
# ── 3. 점수 내림차순 정렬 및 상위 선별 ──────────────────────────────────
candidates.sort(key=lambda x: -x["score_total"])
top_candidates = candidates[:top_k]
# is_best 플래그 표시
best_keys = {tuple(c["numbers"]) for c in top_candidates[:best_n]}
for c in top_candidates:
c["is_best"] = tuple(c["numbers"]) in best_keys
avg_score = (
sum(c["score_total"] for c in top_candidates) / len(top_candidates)
if top_candidates else 0.0
)
best_score = top_candidates[0]["score_total"] if top_candidates else 0.0
# ── 4. DB 저장 ────────────────────────────────────────────────────────────
run_id = save_simulation_run(
strategy="monte_carlo",
total_generated=len(candidates),
top_k_selected=len(top_candidates),
avg_score=avg_score,
notes=f"based_on_draw={based_on_draw}, history={len(draws)}",
)
# 상위 top_k개만 DB에 저장 (전체 20,000개는 메모리에서만 처리)
save_simulation_candidates_bulk(run_id, top_candidates, based_on_draw)
# best_picks 교체 (상위 best_n개)
best_picks_data = [
{
"numbers": c["numbers"],
"score_total": c["score_total"],
"rank_in_run": i + 1,
}
for i, c in enumerate(top_candidates[:best_n])
]
replace_best_picks(best_picks_data, run_id, based_on_draw)
return {
"run_id": run_id,
"total_generated": len(candidates),
"top_k_selected": len(top_candidates),
"best_n_saved": len(best_picks_data),
"avg_score": round(avg_score, 6),
"best_score": round(best_score, 6),
"based_on_draw": based_on_draw,
}
def generate_smart_recommendations(count: int = 10) -> int:
    """Backward-compatibility wrapper around ``run_simulation``.

    Runs a 5,000-candidate simulation that keeps and publishes the top
    ``count`` combinations, so existing callers of this function keep working.

    Args:
        count: number of top combinations to keep as best picks.

    Returns:
        The number of best picks saved, or 0 when ``count`` is not positive
        or the simulation reports an error.
    """
    if count <= 0:
        # Nothing was requested. Forwarding a non-positive count would be
        # harmful: run_simulation slices candidates[:top_k], so 0 selects
        # nothing (and the best_picks replacement would deactivate the active
        # picks while inserting none) and a negative value selects almost
        # every candidate.
        return 0

    result = run_simulation(n_candidates=5000, top_k=count, best_n=count)
    if "error" in result:
        return 0
    return result.get("best_n_saved", 0)

View File

@@ -8,13 +8,15 @@ from .db import (
init_db, get_draw, get_latest_draw, get_all_draw_numbers,
save_recommendation_dedup, list_recommendations_ex, delete_recommendation,
update_recommendation,
# 시뮬레이션 관련
get_best_picks, get_simulation_runs, get_simulation_candidates,
)
from .recommender import recommend_numbers, recommend_with_heatmap
from .collector import sync_latest, sync_ensure_all
from .generator import generate_smart_recommendations
from .generator import generate_smart_recommendations
from .generator import run_simulation, generate_smart_recommendations
from .checker import check_results_for_draw
from .utils import calc_metrics, calc_recent_overlap
from .analyzer import get_statistical_report
app = FastAPI()
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
@@ -22,29 +24,35 @@ scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
LATEST_URL = os.getenv("LOTTO_LATEST_URL", "https://smok95.github.io/lotto/results/latest.json")
@app.on_event("startup")
def on_startup():
init_db()
# 1. 로또 당첨번호 동기화 (매일 9시, 21시 10분)
# 동기화 후 새로운 회차가 있으면 채점(check)까지 수행
def _sync_and_check():
res = sync_latest(LATEST_URL)
if res["was_new"]:
# 새로운 회차(예: 1000회)가 나오면, 999회차 기반 추천들을 채점
check_results_for_draw(res["drawNo"])
scheduler.add_job(_sync_and_check, "cron", hour="9,21", minute=10)
# 2. 매일 아침 8시: 지능형 자동 추천 (10개씩)
scheduler.add_job(lambda: generate_smart_recommendations(10), "cron", hour="8", minute=0)
# 2. 몬테카를로 시뮬레이션 (하루 6회: 0, 4, 8, 12, 16, 20시)
# 20,000개 후보 생성 → 스코어링 → 상위 100개 저장 → best_picks 교체
def _run_simulation_job():
run_simulation(n_candidates=20000, top_k=100, best_n=20)
scheduler.add_job(_run_simulation_job, "cron", hour="0,4,8,12,16,20", minute=5)
scheduler.start()
@app.get("/health")
def health():
    """Health-check endpoint; returns a constant ``{"ok": True}`` payload."""
    return {"ok": True}
@app.get("/api/lotto/latest")
def api_latest():
row = get_latest_draw()
@@ -58,6 +66,7 @@ def api_latest():
"metrics": calc_metrics([row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]),
}
@app.get("/api/lotto/{drw_no:int}")
def api_draw(drw_no: int):
row = get_draw(drw_no)
@@ -71,67 +80,163 @@ def api_draw(drw_no: int):
"metrics": calc_metrics([row["n1"], row["n2"], row["n3"], row["n4"], row["n5"], row["n6"]]),
}
@app.post("/api/admin/sync_latest")
def admin_sync_latest():
    """Manually sync the latest draw and grade recommendations if it is new.

    Returns the result dict produced by ``sync_latest``.
    """
    outcome = sync_latest(LATEST_URL)
    if not outcome["was_new"]:
        return outcome
    # A manual sync grades results too whenever a new draw was fetched.
    check_results_for_draw(outcome["drawNo"])
    return outcome
@app.post("/api/admin/auto_gen")
def admin_auto_gen(count: int = 10):
"""지능형 자동 생성 수동 트리거"""
"""기존 호환 유지: 소규모 시뮬레이션 수동 트리거"""
n = generate_smart_recommendations(count)
return {"generated": n}
@app.post("/api/admin/simulate")
def admin_simulate(n_candidates: int = 20000, top_k: int = 100, best_n: int = 20):
    """Run a Monte Carlo simulation immediately (manual trigger).

    Parameters are clamped before use: ``n_candidates`` to 1000..50000,
    ``top_k`` to 10..500 and ``best_n`` to 10..50. Responds with the
    ``run_simulation`` result, or HTTP 500 carrying its error message.
    """
    bounded = {
        "n_candidates": max(1000, min(n_candidates, 50000)),
        "top_k": max(10, min(top_k, 500)),
        "best_n": max(10, min(best_n, 50)),
    }
    outcome = run_simulation(**bounded)
    if "error" not in outcome:
        return outcome
    raise HTTPException(status_code=500, detail=outcome["error"])
@app.get("/api/lotto/stats")
def api_stats():
# 1. 데이터 완전성 보장 (없으면 가져옴)
sync_ensure_all(LATEST_URL, ALL_URL)
# 2. 전체 데이터 조회
draws = get_all_draw_numbers()
if not draws:
raise HTTPException(status_code=404, detail="No data yet")
# 1~45번 빈도 초기화
frequency = {n: 0 for n in range(1, 46)}
total_draws = len(draws)
for _, nums in draws:
for n in nums:
frequency[n] += 1
# 리스트 형태로 변환 (프론트엔드 차트용)
# x: 번호, y: 횟수
stats = [
{"number": n, "count": frequency[n]}
{"number": n, "count": frequency[n]}
for n in range(1, 46)
]
return {
"total_draws": total_draws,
"frequency": stats
"frequency": stats,
}
# ---------- ✅ recommend (dedup save) ----------
# ── 통계 분석 리포트 ────────────────────────────────────────────────────────
@app.get("/api/lotto/analysis")
def api_analysis():
    """Return the statistical analysis report built from all stored draws.

    The report comes from ``get_statistical_report``. Responds with HTTP 404
    when no draws are stored yet.
    """
    history = get_all_draw_numbers()
    if history:
        return get_statistical_report(history)
    raise HTTPException(status_code=404, detail="No data yet")
# ── 시뮬레이션 best_picks (메인 추천 엔드포인트) ────────────────────────────
@app.get("/api/lotto/best")
def api_best_picks(limit: int = 20):
    """Return the currently active best picks selected by the simulation.

    ``limit`` is clamped to 1..50. Each item carries the stored score and
    provenance fields plus metrics computed by ``calc_metrics``. Responds with
    HTTP 404 when there are no active picks.
    """
    limit = max(1, min(limit, 50))
    picks = get_best_picks(limit=limit)
    if not picks:
        raise HTTPException(
            status_code=404,
            detail="시뮬레이션 결과가 없습니다. /api/admin/simulate로 먼저 실행하세요.",
        )

    # The full draw history is not needed here; only the picks and the latest
    # draw number are used.
    items = [
        {
            "rank": p["rank_in_run"],
            "numbers": p["numbers"],
            "score_total": p["score_total"],
            "based_on_draw": p["based_on_draw"],
            "simulation_run_id": p["source_run_id"],
            "created_at": p["created_at"],
            "metrics": calc_metrics(p["numbers"]),
        }
        for p in picks
    ]

    latest = get_latest_draw()
    return {
        "based_on_draw": latest["drw_no"] if latest else None,
        "count": len(items),
        "items": items,
    }
# ── 시뮬레이션 전체 결과 조회 (상세 API) ────────────────────────────────────
@app.get("/api/lotto/simulation")
def api_simulation(run_id: Optional[int] = None, runs_limit: int = 5):
    """Return recent simulation runs plus the top candidates of one run.

    The most recent ``runs_limit`` runs (clamped to 1..100) are always
    returned. Candidates (up to 100, each enriched with ``calc_metrics``) come
    from ``run_id`` when given, otherwise from the most recent run. Responds
    with HTTP 404 when no simulation run exists.
    """
    # Clamp before querying: 0 would yield an empty list and a misleading
    # "no records" 404, and a negative value passed to SQL LIMIT removes the
    # bound entirely.
    runs_limit = max(1, min(runs_limit, 100))
    runs = get_simulation_runs(limit=runs_limit)
    if not runs:
        raise HTTPException(status_code=404, detail="시뮬레이션 기록이 없습니다.")

    target_run_id = run_id if run_id is not None else runs[0]["id"]
    candidates = get_simulation_candidates(target_run_id, limit=100)

    # Attach metrics to every candidate.
    enriched = [
        {**c, "metrics": calc_metrics(c["numbers"])}
        for c in candidates
    ]
    return {
        "runs": runs,
        "selected_run_id": target_run_id,
        "candidates_count": len(enriched),
        "candidates": enriched,
    }
# ── 기존 수동 추천 API (하위 호환 유지) ─────────────────────────────────────
@app.get("/api/lotto/recommend")
def api_recommend(
recent_window: int = 200,
recent_weight: float = 2.0,
avoid_recent_k: int = 5,
# ---- optional constraints (Lotto Lab) ----
sum_min: Optional[int] = None,
sum_max: Optional[int] = None,
odd_min: Optional[int] = None,
odd_max: Optional[int] = None,
range_min: Optional[int] = None,
range_max: Optional[int] = None,
max_overlap_latest: Optional[int] = None, # 최근 avoid_recent_k 회차와 중복 허용 개수
max_try: int = 200, # 조건 맞는 조합 찾기 재시도
max_overlap_latest: Optional[int] = None,
max_try: int = 200,
):
draws = get_all_draw_numbers()
if not draws:
@@ -143,7 +248,6 @@ def api_recommend(
"recent_window": recent_window,
"recent_weight": float(recent_weight),
"avoid_recent_k": avoid_recent_k,
"sum_min": sum_min,
"sum_max": sum_max,
"odd_min": odd_min,
@@ -168,7 +272,6 @@ def api_recommend(
return False
if range_max is not None and m["range"] > range_max:
return False
if max_overlap_latest is not None:
ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k)
if ov["repeats"] > max_overlap_latest:
@@ -196,11 +299,9 @@ def api_recommend(
if chosen is None:
raise HTTPException(
status_code=400,
detail=f"Constraints too strict. No valid set found in max_try={max_try}. "
f"Try relaxing sum/odd/range/overlap constraints.",
detail=f"Constraints too strict. No valid set found in max_try={max_try}.",
)
# ✅ dedup save
saved = save_recommendation_dedup(
latest["drw_no"] if latest else None,
chosen,
@@ -220,10 +321,11 @@ def api_recommend(
"params": params,
"metrics": metrics,
"recent_overlap": overlap,
"tries": tries,
"tries": tries,
}
# ---------- ✅ heatmap-based recommend ----------
# ── 히트맵 기반 추천 (하위 호환 유지) ────────────────────────────────────────
@app.get("/api/lotto/recommend/heatmap")
def api_recommend_heatmap(
heatmap_window: int = 20,
@@ -231,8 +333,6 @@ def api_recommend_heatmap(
recent_window: int = 200,
recent_weight: float = 2.0,
avoid_recent_k: int = 5,
# ---- optional constraints ----
sum_min: Optional[int] = None,
sum_max: Optional[int] = None,
odd_min: Optional[int] = None,
@@ -242,18 +342,13 @@ def api_recommend_heatmap(
max_overlap_latest: Optional[int] = None,
max_try: int = 200,
):
"""
히트맵 기반 추천: 과거 추천 번호들의 적중률을 분석하여 가중치 부여
"""
draws = get_all_draw_numbers()
if not draws:
raise HTTPException(status_code=404, detail="No data yet")
# 과거 추천 데이터 가져오기 (적중 결과가 있는 것만)
past_recs = list_recommendations_ex(limit=100, sort="id_desc")
latest = get_latest_draw()
params = {
"heatmap_window": heatmap_window,
"heatmap_weight": float(heatmap_weight),
@@ -269,7 +364,7 @@ def api_recommend_heatmap(
"max_overlap_latest": max_overlap_latest,
"max_try": int(max_try),
}
def _accept(nums: List[int]) -> bool:
m = calc_metrics(nums)
if sum_min is not None and m["sum"] < sum_min:
@@ -284,16 +379,15 @@ def api_recommend_heatmap(
return False
if range_max is not None and m["range"] > range_max:
return False
if max_overlap_latest is not None:
ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k)
if ov["repeats"] > max_overlap_latest:
return False
return True
chosen = None
explain = None
tries = 0
while tries < max_try:
tries += 1
@@ -311,23 +405,22 @@ def api_recommend_heatmap(
chosen = nums
explain = result["explain"]
break
if chosen is None:
raise HTTPException(
status_code=400,
detail=f"Constraints too strict. No valid set found in max_try={max_try}.",
)
# ✅ dedup save
saved = save_recommendation_dedup(
latest["drw_no"] if latest else None,
chosen,
params,
)
metrics = calc_metrics(chosen)
overlap = calc_recent_overlap(chosen, draws, last_k=avoid_recent_k)
return {
"id": saved["id"],
"saved": saved["saved"],
@@ -341,7 +434,8 @@ def api_recommend_heatmap(
"tries": tries,
}
# ---------- ✅ history list (filter/paging) ----------
# ── 추천 이력 ────────────────────────────────────────────────────────────────
@app.get("/api/history")
def api_history(
limit: int = 30,
@@ -380,6 +474,7 @@ def api_history(
"filters": {"favorite": favorite, "tag": tag, "q": q, "sort": sort},
}
@app.delete("/api/history/{rec_id:int}")
def api_history_delete(rec_id: int):
ok = delete_recommendation(rec_id)
@@ -387,12 +482,13 @@ def api_history_delete(rec_id: int):
raise HTTPException(status_code=404, detail="Not found")
return {"deleted": True, "id": rec_id}
# ---------- ✅ history update (favorite/note/tags) ----------
# Request body for PATCH /api/history/{rec_id}. Every field is optional and
# is forwarded as-is to update_recommendation.
# NOTE(review): a field left as None presumably means "leave unchanged" —
# confirm against update_recommendation.
class HistoryUpdate(BaseModel):
    favorite: Optional[bool] = None  # favorite flag
    note: Optional[str] = None  # free-text note
    tags: Optional[List[str]] = None  # list of tag strings
@app.patch("/api/history/{rec_id:int}")
def api_history_patch(rec_id: int, body: HistoryUpdate):
ok = update_recommendation(rec_id, favorite=body.favorite, note=body.note, tags=body.tags)
@@ -400,11 +496,11 @@ def api_history_patch(rec_id: int, body: HistoryUpdate):
raise HTTPException(status_code=404, detail="Not found or no changes")
return {"updated": True, "id": rec_id}
# ---------- ✅ batch recommend ----------
# ── 배치 추천 (하위 호환 유지) ───────────────────────────────────────────────
def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, avoid_recent_k: int, max_try: int = 200):
items = []
seen = set()
tries = 0
while len(items) < count and tries < max_try:
tries += 1
@@ -414,9 +510,9 @@ def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, a
continue
seen.add(key)
items.append(r)
return items
@app.get("/api/lotto/recommend/batch")
def api_recommend_batch(
count: int = 5,
@@ -443,17 +539,19 @@ def api_recommend_batch(
"based_on_latest_draw": latest["drw_no"] if latest else None,
"count": count,
"items": [{
"numbers": it["numbers"],
"numbers": it["numbers"],
"explain": it["explain"],
"metrics": calc_metrics(it["numbers"]),
} for it in items],
"params": params,
}
# Request body for POST /api/lotto/recommend/batch.
class BatchSave(BaseModel):
    items: List[List[int]]  # one inner list of numbers per combination
    params: dict  # NOTE(review): presumably the generation params stored with each saved item — confirm in the handler
@app.post("/api/lotto/recommend/batch")
def api_recommend_batch_save(body: BatchSave):
latest = get_latest_draw()
@@ -466,7 +564,7 @@ def api_recommend_batch_save(body: BatchSave):
return {"saved": True, "created_ids": created, "deduped_ids": deduped}
@app.get("/api/version")
def version():
    """Report the application version from APP_VERSION (``"dev"`` when unset)."""
    app_version = os.getenv("APP_VERSION", "dev")
    return {"version": app_version}