diff --git a/backend/app/analyzer.py b/backend/app/analyzer.py new file mode 100644 index 0000000..ab987a8 --- /dev/null +++ b/backend/app/analyzer.py @@ -0,0 +1,354 @@ +""" +통계 분석 엔진 - lotto-lab 고도화 + +[팀 회의 합의 기반 5가지 통계 기법] +1. 빈도 Z-score 분석: 각 번호의 출현 빈도가 기댓값에서 얼마나 벗어났는지 +2. 조합 지문(Fingerprint): 조합의 합계, 홀짝 비율, 구간 분포가 역대 당첨번호와 유사한지 +3. 갭 분석(Gap): 각 번호의 마지막 출현으로부터 경과 회차 수 기반 점수 +4. 공동 출현 행렬(Co-occurrence): 번호 쌍이 역대에 함께 나온 빈도 기반 점수 +5. 다양성(Diversity): 연속 번호, 범위, 구간 분포 다양성 + +[통계 근거] +- 1~45번 각각의 이론적 출현 확률: 6/45 ≈ 13.33% per draw +- 기댓값 합계: E[sum] = 6 × E[1..45] = 6 × 23 = 138 +- 표준편차 합계: std ≈ sqrt(6 × Var[uniform 1..45]) ≈ 31 +- 홀수 23개 (1,3,...,45), 짝수 22개 (2,4,...,44) +- 번호 쌍 공동 출현 확률: C(43,4)/C(45,6) ≈ 1.516% per draw +""" + +import math +from collections import Counter, defaultdict +from typing import List, Tuple, Dict, Any, Optional + +# 구간 정의: (시작, 끝) 포함 +ZONE_RANGES: List[Tuple[int, int]] = [ + (1, 9), + (10, 19), + (20, 29), + (30, 39), + (40, 45), +] + + +def _get_zone(n: int) -> int: + """번호가 속하는 구간 인덱스 (0-4)""" + for z, (lo, hi) in enumerate(ZONE_RANGES): + if lo <= n <= hi: + return z + return 4 + + +def build_analysis_cache(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]: + """ + 역대 당첨번호 데이터 기반 통계 분석 캐시 구성. + 시뮬레이션 실행 시 한 번만 호출하여 재사용 (성능 최적화). + + Args: + draws: [(drw_no, [n1,n2,n3,n4,n5,n6]), ...] 오름차순 + + Returns: + 통계 캐시 딕셔너리 + """ + if not draws: + return {} + + total_draws = len(draws) + all_nums_list = [n for _, nums in draws for n in nums] + freq_all = Counter(all_nums_list) + + # ── 1. 빈도 Z-score ────────────────────────────────────────────────────── + freq_values = [freq_all.get(n, 0) for n in range(1, 46)] + mean_freq = sum(freq_values) / 45.0 + variance_freq = sum((f - mean_freq) ** 2 for f in freq_values) / 45.0 + std_freq = math.sqrt(variance_freq) + + z_scores: Dict[int, float] = {} + for n in range(1, 46): + z_scores[n] = (freq_all.get(n, 0) - mean_freq) / max(std_freq, 0.001) + + # ── 2. 
갭 분석: 마지막 출현 이후 경과 회차 ────────────────────────────── + # gap = 0: 가장 최근 회차에 출현, gap = k: k회 전에 마지막 출현 + last_seen_gap: Dict[int, int] = {} + for gap_idx, (_, nums) in enumerate(reversed(draws)): + for n in nums: + if n not in last_seen_gap: + last_seen_gap[n] = gap_idx + for n in range(1, 46): + if n not in last_seen_gap: + last_seen_gap[n] = total_draws # 한 번도 안 나옴 (이론상 거의 불가) + + # ── 3. 공동 출현 행렬 ──────────────────────────────────────────────────── + # cooccur[(i,j)] = 번호 i와 j가 같은 회차에 함께 출현한 횟수 (i < j) + cooccur: Dict[Tuple[int, int], int] = defaultdict(int) + for _, nums in draws: + s = sorted(nums) + for i in range(len(s)): + for j in range(i + 1, len(s)): + cooccur[(s[i], s[j])] += 1 + + # 번호 쌍 공동 출현 기댓값: C(43,4)/C(45,6) × total_draws + # C(43,4) = 123,410 / C(45,6) = 8,145,060 + expected_cooccur = total_draws * 123410.0 / 8145060.0 + + # ── 4. 역대 조합 통계 (합계, 홀수 개수) ────────────────────────────────── + historical_sums = [sum(nums) for _, nums in draws] + mean_sum = sum(historical_sums) / total_draws + std_sum = math.sqrt( + sum((s - mean_sum) ** 2 for s in historical_sums) / total_draws + ) + std_sum = max(std_sum, 1.0) # 0 나누기 방지 + + historical_odds = [sum(1 for n in nums if n % 2 == 1) for _, nums in draws] + odd_dist = Counter(historical_odds) + odd_prob: Dict[int, float] = {k: v / total_draws for k, v in odd_dist.items()} + max_odd_prob = max(odd_prob.values()) if odd_prob else 1.0 + + # ── 5. 구간별 분포 통계 ─────────────────────────────────────────────────── + # 각 구간에 몇 개 포함되는지의 역대 분포 + zone_counts = [Counter() for _ in ZONE_RANGES] + for _, nums in draws: + for z_idx, (lo, hi) in enumerate(ZONE_RANGES): + cnt = sum(1 for n in nums if lo <= n <= hi) + zone_counts[z_idx][cnt] += 1 + + zone_probs: List[Dict[int, float]] = [] + for zc in zone_counts: + total_z = sum(zc.values()) + zone_probs.append({k: v / total_z for k, v in zc.items()}) + + max_zone_probs = [max(zp.values()) if zp else 1.0 for zp in zone_probs] + + # ── 6. 
최근 빈도 (후보 생성 가중치용) ──────────────────────────────────── + recent_100 = draws[-100:] if len(draws) >= 100 else draws + freq_recent = Counter(n for _, nums in recent_100 for n in nums) + + return { + "total_draws": total_draws, + "freq_all": freq_all, + "z_scores": z_scores, + "last_seen_gap": last_seen_gap, + "cooccur": dict(cooccur), + "expected_cooccur": expected_cooccur, + "mean_sum": mean_sum, + "std_sum": std_sum, + "odd_prob": odd_prob, + "max_odd_prob": max_odd_prob, + "zone_probs": zone_probs, + "max_zone_probs": max_zone_probs, + "freq_recent": freq_recent, + } + + +def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]: + """ + 몬테카를로 시뮬레이션의 후보 생성에 사용할 번호별 샘플링 가중치. + 빈도 + 최근 빈도 + 갭 분석을 반영하여 '좋은' 번호가 더 자주 선택되도록 유도. + """ + freq_all = cache["freq_all"] + last_seen_gap = cache["last_seen_gap"] + freq_recent = cache["freq_recent"] + + weights: Dict[int, float] = {} + for n in range(1, 46): + w = freq_all.get(n, 0) + 1.5 * freq_recent.get(n, 0) + + gap = last_seen_gap.get(n, 0) + if gap <= 1: + gap_factor = 0.50 # 바로 직전 등장 → 패널티 + elif gap <= 3: + gap_factor = 0.75 + elif gap <= 12: + gap_factor = 1.00 # 적정 범위 + elif gap <= 25: + gap_factor = 1.10 # 약간 오래된 번호 → 소폭 보너스 + else: + gap_factor = 1.20 # 오래된 번호 → 보너스 + + weights[n] = max(w * gap_factor, 0.5) + + return weights + + +def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]: + """ + 6개 번호 조합의 통계적 품질 점수 계산 (0~1 범위 정규화). + + 5가지 기법별 점수: + - score_frequency (25%): 빈도 Z-score + - score_fingerprint(30%): 조합의 통계적 지문 (합계, 홀짝, 구간) + - score_gap (20%): 갭 분석 + - score_cooccur (15%): 공동 출현 기댓값 대비 + - score_diversity (10%): 연속번호, 범위, 구간 다양성 + + Returns: + {"score_total": ..., "score_frequency": ..., ...} + """ + nums = sorted(numbers) + + # ── 1. 
빈도 점수 (Frequency Score) ──────────────────────────────────────── + z_scores = cache["z_scores"] + avg_z = sum(z_scores.get(n, 0.0) for n in nums) / 6.0 + # Sigmoid 정규화: avg_z > 0이면 0.5 이상 + score_frequency = 1.0 / (1.0 + math.exp(-avg_z / 1.5)) + + # ── 2. 조합 지문 점수 (Fingerprint Score) ───────────────────────────────── + # 2a. 합계 정규분포 점수 + total = sum(nums) + mean_sum = cache["mean_sum"] + std_sum = cache["std_sum"] + z_sum = (total - mean_sum) / std_sum + sum_score = math.exp(-0.5 * z_sum ** 2) # 정규분포 밀도 (peak=1 at mean) + + # 2b. 홀짝 비율 점수 + odd_count = sum(1 for n in nums if n % 2 == 1) + odd_prob = cache["odd_prob"] + max_odd_prob = cache["max_odd_prob"] + odd_score = odd_prob.get(odd_count, 0.01) / max_odd_prob + + # 2c. 구간 분포 점수 + zone_probs = cache["zone_probs"] + max_zone_probs = cache["max_zone_probs"] + zone_score = 0.0 + for z_idx, (lo, hi) in enumerate(ZONE_RANGES): + cnt = sum(1 for n in nums if lo <= n <= hi) + zp = zone_probs[z_idx] + mzp = max_zone_probs[z_idx] + zone_score += zp.get(cnt, 0.01) / mzp + zone_score /= len(ZONE_RANGES) + + score_fingerprint = sum_score * 0.50 + odd_score * 0.30 + zone_score * 0.20 + + # ── 3. 갭 점수 (Gap Score) ──────────────────────────────────────────────── + last_seen_gap = cache["last_seen_gap"] + gap_scores: List[float] = [] + for n in nums: + gap = last_seen_gap.get(n, 0) + if gap <= 1: + gs = 0.20 # 직전 등장 번호 - 강한 패널티 + elif gap <= 3: + gs = 0.55 + elif gap <= 7: + gs = 0.85 + elif gap <= 15: + gs = 1.00 # 최적 범위 + elif gap <= 25: + gs = 0.90 + else: + gs = 0.75 # 오래된 번호 - 여전히 양호 + gap_scores.append(gs) + score_gap = sum(gap_scores) / 6.0 + + # ── 4. 
공동 출현 점수 (Co-occurrence Score) ─────────────────────────────── + cooccur = cache["cooccur"] + expected_cooccur = cache["expected_cooccur"] + + pair_scores: List[float] = [] + for i in range(len(nums)): + for j in range(i + 1, len(nums)): + actual = cooccur.get((nums[i], nums[j]), 0) + ratio = actual / max(expected_cooccur, 0.001) + # Sigmoid: ratio = 1에서 0.5, ratio > 1이면 > 0.5 + ps = 1.0 / (1.0 + math.exp(-2.0 * (ratio - 1.0))) + pair_scores.append(ps) + score_cooccur = sum(pair_scores) / max(len(pair_scores), 1) + + # ── 5. 다양성 점수 (Diversity Score) ───────────────────────────────────── + # 5a. 연속 번호 포함 여부 (역대 당첨번호 약 52%에 최소 1쌍 포함) + has_consecutive = any(nums[i + 1] - nums[i] == 1 for i in range(len(nums) - 1)) + consecutive_score = 0.65 if has_consecutive else 0.40 + + # 5b. 범위 점수 (최소~최대 차이) + num_range = nums[-1] - nums[0] + if 28 <= num_range <= 43: + spread_score = 1.00 + elif 20 <= num_range < 28: + spread_score = 0.85 + elif 13 <= num_range < 20: + spread_score = 0.65 + elif num_range < 13: + spread_score = 0.25 + else: # > 43 (최대 44: 1~45) + spread_score = 0.95 + + # 5c. 구간 커버리지 (몇 개 구간에 걸쳐 있는가) + zones_used = set(_get_zone(n) for n in nums) + zone_coverage = (len(zones_used) - 1) / 4.0 # 0~1 + + score_diversity = ( + consecutive_score * 0.35 + + spread_score * 0.35 + + zone_coverage * 0.30 + ) + + # ── 최종 가중 합산 ──────────────────────────────────────────────────────── + score_total = ( + score_frequency * 0.25 + + score_fingerprint * 0.30 + + score_gap * 0.20 + + score_cooccur * 0.15 + + score_diversity * 0.10 + ) + + return { + "score_total": round(score_total, 6), + "score_frequency": round(score_frequency, 6), + "score_fingerprint": round(score_fingerprint, 6), + "score_gap": round(score_gap, 6), + "score_cooccur": round(score_cooccur, 6), + "score_diversity": round(score_diversity, 6), + } + + +def get_statistical_report(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]: + """ + 통계 분석 리포트 생성 (GET /api/lotto/analysis 응답용). 
+ 각 번호의 빈도, Z-score, 갭, 히트/콜드/오버듀 분류를 반환. + """ + if not draws: + return {"error": "데이터 없음"} + + cache = build_analysis_cache(draws) + total_draws = cache["total_draws"] + freq_all = cache["freq_all"] + z_scores = cache["z_scores"] + last_seen_gap = cache["last_seen_gap"] + + number_stats = [] + for n in range(1, 46): + freq = freq_all.get(n, 0) + expected = total_draws * 6.0 / 45.0 + number_stats.append({ + "number": n, + "frequency": freq, + "expected": round(expected, 1), + "frequency_pct": round(freq / (total_draws * 6) * 100, 2), + "z_score": round(z_scores.get(n, 0.0), 3), + "gap": last_seen_gap.get(n, total_draws), + "zone": _get_zone(n), + }) + + sorted_by_freq = sorted(number_stats, key=lambda x: -x["frequency"]) + sorted_by_gap = sorted(number_stats, key=lambda x: -x["gap"]) + + # 역대 합계 분포 요약 + hist_sums = [sum(nums) for _, nums in draws] + sum_buckets: Dict[str, int] = {} + for lo in range(21, 256, 20): + hi = lo + 19 + key = f"{lo}-{hi}" + sum_buckets[key] = sum(1 for s in hist_sums if lo <= s <= hi) + + return { + "total_draws": total_draws, + "mean_sum": round(cache["mean_sum"], 2), + "std_sum": round(cache["std_sum"], 2), + "odd_distribution": { + str(k): round(v * 100, 1) + for k, v in sorted(cache["odd_prob"].items()) + }, + "number_stats": number_stats, + "hot_numbers": [x["number"] for x in sorted_by_freq[:10]], + "cold_numbers": [x["number"] for x in sorted_by_freq[-10:]], + "overdue_numbers": [x["number"] for x in sorted_by_gap[:10]], + "sum_distribution": sum_buckets, + } diff --git a/backend/app/db.py b/backend/app/db.py index ed510d8..3a306d9 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -77,6 +77,72 @@ def init_db() -> None: # ✅ UNIQUE 인덱스(중복 저장 방지) conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_reco_dedup ON recommendations(dedup_hash);") + # ── 시뮬레이션 테이블 ───────────────────────────────────────────────── + conn.execute( + """ + CREATE TABLE IF NOT EXISTS simulation_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_at 
TEXT NOT NULL DEFAULT (datetime('now')), + strategy TEXT NOT NULL DEFAULT 'monte_carlo', + total_generated INTEGER NOT NULL DEFAULT 0, + top_k_selected INTEGER NOT NULL DEFAULT 0, + avg_score REAL, + notes TEXT DEFAULT '' + ); + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_simrun_at ON simulation_runs(run_at DESC);" + ) + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS simulation_candidates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id INTEGER NOT NULL, + numbers TEXT NOT NULL, + score_total REAL NOT NULL, + score_frequency REAL, + score_fingerprint REAL, + score_gap REAL, + score_cooccur REAL, + score_diversity REAL, + is_best INTEGER DEFAULT 0, + based_on_draw INTEGER, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY(run_id) REFERENCES simulation_runs(id) + ); + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_simcand_run " + "ON simulation_candidates(run_id, score_total DESC);" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_simcand_best " + "ON simulation_candidates(is_best, score_total DESC);" + ) + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS best_picks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + numbers TEXT NOT NULL, + score_total REAL NOT NULL, + rank_in_run INTEGER, + source_run_id INTEGER, + based_on_draw INTEGER, + is_active INTEGER DEFAULT 1, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + FOREIGN KEY(source_run_id) REFERENCES simulation_runs(id) + ); + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_bestpicks_active " + "ON best_picks(is_active, score_total DESC);" + ) + def upsert_draw(row: Dict[str, Any]) -> None: with _conn() as conn: conn.execute( @@ -276,11 +342,160 @@ def update_recommendation_result(rec_id: int, rank: int, correct_count: int, has with _conn() as conn: cur = conn.execute( """ - UPDATE recommendations - SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1 + UPDATE recommendations + SET rank = ?, correct_count = ?, has_bonus = ?, checked = 1 
# ── Simulation CRUD ─────────────────────────────────────────────────────────

def save_simulation_run(
    strategy: str,
    total_generated: int,
    top_k_selected: int,
    avg_score: float,
    notes: str = "",
) -> int:
    """
    Insert one row into ``simulation_runs`` and return its generated id.

    ``avg_score`` is rounded to 6 places; ``None`` is stored as NULL (the
    column is declared nullable).
    """
    stored_avg = None if avg_score is None else round(avg_score, 6)
    with _conn() as conn:
        cur = conn.execute(
            """
            INSERT INTO simulation_runs (strategy, total_generated, top_k_selected, avg_score, notes)
            VALUES (?, ?, ?, ?, ?)
            """,
            (strategy, total_generated, top_k_selected, stored_avg, notes),
        )
        return int(cur.lastrowid)


def save_simulation_candidates_bulk(
    run_id: int,
    candidates: List[Dict[str, Any]],
    based_on_draw: Optional[int],
) -> None:
    """
    Bulk-insert candidates into ``simulation_candidates``.

    Each candidate is ``{"numbers": [...], "score_total": ..., "score_*": ...,
    "is_best": bool}``; numbers are stored as a sorted JSON array and missing
    component scores as NULL. Nothing is written for an empty list.
    """
    if not candidates:
        return

    rows = [
        (
            run_id,
            json.dumps(sorted(c["numbers"])),
            c["score_total"],
            c.get("score_frequency"),
            c.get("score_fingerprint"),
            c.get("score_gap"),
            c.get("score_cooccur"),
            c.get("score_diversity"),
            1 if c.get("is_best") else 0,
            based_on_draw,
        )
        for c in candidates
    ]
    with _conn() as conn:
        conn.executemany(
            """
            INSERT INTO simulation_candidates
            (run_id, numbers, score_total, score_frequency, score_fingerprint,
             score_gap, score_cooccur, score_diversity, is_best, based_on_draw)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )


def replace_best_picks(
    picks: List[Dict[str, Any]],
    run_id: int,
    based_on_draw: Optional[int],
) -> None:
    """
    Deactivate the currently active ``best_picks`` and insert ``picks`` as active.

    Each pick is ``{"numbers": [...], "score_total": ..., "rank_in_run": int}``.
    An empty ``picks`` list is ignored so a run that produced nothing cannot
    leave the table without any active pick.
    """
    if not picks:
        return

    rows = [
        (
            json.dumps(sorted(p["numbers"])),
            p["score_total"],
            p.get("rank_in_run"),
            run_id,
            based_on_draw,
        )
        for p in picks
    ]
    # NOTE(review): both statements share one _conn() block; this presumably
    # makes the swap a single transaction — confirm against _conn().
    with _conn() as conn:
        conn.execute("UPDATE best_picks SET is_active = 0 WHERE is_active = 1")
        conn.executemany(
            """
            INSERT INTO best_picks (numbers, score_total, rank_in_run, source_run_id, based_on_draw, is_active)
            VALUES (?, ?, ?, ?, ?, 1)
            """,
            rows,
        )


def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to ``limit`` active best picks, highest ``score_total`` first."""
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at
            FROM best_picks
            WHERE is_active = 1
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [
        {
            "id": int(r["id"]),
            "numbers": json.loads(r["numbers"]),
            "score_total": r["score_total"],
            "rank_in_run": r["rank_in_run"],
            "source_run_id": r["source_run_id"],
            "based_on_draw": r["based_on_draw"],
            "created_at": r["created_at"],
        }
        for r in rows
    ]


def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` simulation runs, newest (highest id) first."""
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, run_at, strategy, total_generated, top_k_selected, avg_score, notes
            FROM simulation_runs
            ORDER BY id DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [dict(r) for r in rows]


def get_simulation_candidates(run_id: int, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return up to ``limit`` candidates of run ``run_id``, highest score first.

    ``numbers`` is decoded from its JSON text into a list.
    """
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, numbers, score_total, score_frequency, score_fingerprint,
                   score_gap, score_cooccur, score_diversity, is_best, based_on_draw, created_at
            FROM simulation_candidates
            WHERE run_id = ?
            ORDER BY score_total DESC
            LIMIT ?
            """,
            (run_id, limit),
        ).fetchall()
    return [
        {**dict(r), "numbers": json.loads(r["numbers"])}
        for r in rows
    ]
"""
Simulation engine for lotto-lab.

Monte Carlo flow:
1. Build the statistics cache from historical draws (build_analysis_cache).
2. Generate candidate combinations by weighted sampling.
3. Score every candidate with the five techniques (score_combination).
4. Keep the top ``top_k`` and store them (simulation_candidates), then replace
   best_picks with the top ``best_n``.

Parameters:
- n_candidates: candidates generated per simulation (default 20,000)
- top_k: number of top candidates stored (default 100)
- best_n: number of candidates promoted to best_picks (default 20)
"""

import random
from typing import Dict, Any, List, Optional

from .db import (
    get_latest_draw,
    get_all_draw_numbers,
    save_simulation_run,
    save_simulation_candidates_bulk,
    replace_best_picks,
)
from .analyzer import build_analysis_cache, build_number_weights, score_combination


def _weighted_sample_6(weights: Dict[int, float]) -> List[int]:
    """
    Draw six distinct numbers from 1..45 by weighted sampling without replacement.

    Args:
        weights: ``{1: w1, ..., 45: w45}``; every number 1..45 must be present.

    Returns:
        The six chosen numbers in draw order (not sorted).
    """
    pool = list(range(1, 46))
    chosen: List[int] = []
    for _ in range(6):
        # Roulette-wheel selection over the numbers still in the pool.
        total = sum(weights[n] for n in pool)
        threshold = random.random() * total
        acc = 0.0
        for n in pool:
            acc += weights[n]
            if acc >= threshold:
                chosen.append(n)
                pool.remove(n)
                break
    return chosen


def run_simulation(
    n_candidates: int = 20000,
    top_k: int = 100,
    best_n: int = 20,
) -> Dict[str, Any]:
    """
    Run one Monte Carlo simulation and persist its results.

    Args:
        n_candidates: number of distinct candidate combinations to generate.
        top_k: number of top-scoring candidates to store.
        best_n: number of top candidates to publish as best_picks.

    Returns:
        ``{run_id, total_generated, top_k_selected, best_n_saved, avg_score,
        best_score, based_on_draw}``, or ``{"error": ...}`` when there is no
        draw history or a parameter is not positive. Nothing is written to the
        database in the error case.
    """
    if n_candidates <= 0 or top_k <= 0 or best_n <= 0:
        # An empty run would otherwise be recorded and best_picks replaced
        # with nothing.
        return {"error": "n_candidates, top_k, best_n은 모두 1 이상이어야 합니다."}

    draws = get_all_draw_numbers()
    if not draws:
        return {"error": "당첨번호 데이터가 없습니다. 먼저 동기화를 실행하세요."}

    latest = get_latest_draw()
    based_on_draw = latest["drw_no"] if latest else None

    # 1. Statistics cache and weights, built once and reused for every candidate.
    cache = build_analysis_cache(draws)
    weights = build_number_weights(cache)

    # 2. Generate and score distinct candidates; the attempt cap leaves room
    #    for duplicates without risking an endless loop.
    candidates: List[Dict[str, Any]] = []
    seen_keys: set = set()
    max_attempts = n_candidates * 3
    attempts = 0
    while len(candidates) < n_candidates and attempts < max_attempts:
        attempts += 1
        nums = sorted(_weighted_sample_6(weights))
        key = tuple(nums)
        if key in seen_keys:
            continue
        seen_keys.add(key)
        candidates.append({"numbers": nums, **score_combination(nums, cache)})

    # 3. Rank by total score and keep the top_k; the first best_n are "best".
    candidates.sort(key=lambda c: -c["score_total"])
    top_candidates = candidates[:top_k]
    for rank, cand in enumerate(top_candidates):
        cand["is_best"] = rank < best_n

    avg_score = (
        sum(c["score_total"] for c in top_candidates) / len(top_candidates)
        if top_candidates else 0.0
    )
    best_score = top_candidates[0]["score_total"] if top_candidates else 0.0

    # 4. Persist: only the top_k rows are stored, the rest stay in memory.
    run_id = save_simulation_run(
        strategy="monte_carlo",
        total_generated=len(candidates),
        top_k_selected=len(top_candidates),
        avg_score=avg_score,
        notes=f"based_on_draw={based_on_draw}, history={len(draws)}회",
    )
    save_simulation_candidates_bulk(run_id, top_candidates, based_on_draw)

    best_picks_data = [
        {
            "numbers": c["numbers"],
            "score_total": c["score_total"],
            "rank_in_run": i + 1,
        }
        for i, c in enumerate(top_candidates[:best_n])
    ]
    replace_best_picks(best_picks_data, run_id, based_on_draw)

    return {
        "run_id": run_id,
        "total_generated": len(candidates),
        "top_k_selected": len(top_candidates),
        "best_n_saved": len(best_picks_data),
        "avg_score": round(avg_score, 6),
        "best_score": round(best_score, 6),
        "based_on_draw": based_on_draw,
    }


def generate_smart_recommendations(count: int = 10) -> int:
    """
    Backward-compatible wrapper around ``run_simulation``.

    Runs a smaller simulation (5,000 candidates) that keeps and publishes the
    top ``count`` combinations, and returns how many best picks were saved.
    Returns 0 without touching the database when ``count`` is not positive or
    the simulation reports an error.
    """
    if count <= 0:
        return 0
    result = run_simulation(n_candidates=5000, top_k=count, best_n=count)
    if "error" in result:
        return 0
    return result.get("best_n_saved", 0)
@app.on_event("startup")
def on_startup():
    """Initialise the database and register the scheduled jobs."""
    init_db()

    # 1. Sync winning numbers at 09:10 and 21:10; when a new draw arrives,
    #    grade the stored recommendations against it.
    def _sync_and_check():
        res = sync_latest(LATEST_URL)
        if res["was_new"]:
            check_results_for_draw(res["drawNo"])

    scheduler.add_job(_sync_and_check, "cron", hour="9,21", minute=10)

    # 2. Monte Carlo simulation six times a day (00:05, 04:05, ..., 20:05):
    #    20,000 candidates -> scoring -> top 100 stored -> best_picks replaced.
    def _run_simulation_job():
        run_simulation(n_candidates=20000, top_k=100, best_n=20)

    scheduler.add_job(_run_simulation_job, "cron", hour="0,4,8,12,16,20", minute=5)

    scheduler.start()


@app.get("/health")
def health():
    """Liveness probe."""
    return {"ok": True}


@app.post("/api/admin/sync_latest")
def admin_sync_latest():
    """Sync the latest draw now; grade recommendations if the draw is new."""
    res = sync_latest(LATEST_URL)
    if res["was_new"]:
        check_results_for_draw(res["drawNo"])
    return res


@app.post("/api/admin/auto_gen")
def admin_auto_gen(count: int = 10):
    """Kept for compatibility: manually trigger a small simulation."""
    n = generate_smart_recommendations(count)
    return {"generated": n}


@app.post("/api/admin/simulate")
def admin_simulate(n_candidates: int = 20000, top_k: int = 100, best_n: int = 20):
    """
    Manually trigger a Monte Carlo simulation (same work as the scheduled job).

    Parameters are clamped to n_candidates 1,000-50,000, top_k 10-500 and
    best_n 10-50. Responds with 500 when the simulation reports an error.
    """
    result = run_simulation(
        n_candidates=max(1000, min(n_candidates, 50000)),
        top_k=max(10, min(top_k, 500)),
        best_n=max(10, min(best_n, 50)),
    )
    if "error" in result:
        raise HTTPException(status_code=500, detail=result["error"])
    return result


@app.get("/api/lotto/stats")
def api_stats():
    """Return the hit count of every number 1..45 over all stored draws."""
    # Make sure the full history is present before counting.
    sync_ensure_all(LATEST_URL, ALL_URL)

    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")

    frequency = {n: 0 for n in range(1, 46)}
    for _, nums in draws:
        for n in nums:
            frequency[n] += 1

    stats = [
        {"number": n, "count": frequency[n]}
        for n in range(1, 46)
    ]

    return {
        # Computed here so the response does not rely on a separate local.
        "total_draws": len(draws),
        "frequency": stats,
    }


# ── Statistical analysis report ─────────────────────────────────────────────
@app.get("/api/lotto/analysis")
def api_analysis():
    """
    Analysis report: per-number frequency, Z-score and gap, hot / cold /
    overdue numbers, and the historical sum and odd-count distributions.
    """
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    return get_statistical_report(draws)


# ── Simulation best_picks (main recommendation endpoint) ────────────────────
@app.get("/api/lotto/best")
def api_best_picks(limit: int = 20):
    """
    Return the active best picks produced by the simulation (default 20),
    each with its score and metrics. ``limit`` is clamped to 1-50.
    Responds with 404 when no simulation result exists yet.
    """
    limit = max(1, min(limit, 50))
    picks = get_best_picks(limit=limit)
    if not picks:
        raise HTTPException(
            status_code=404,
            detail="시뮬레이션 결과가 없습니다. /api/admin/simulate로 먼저 실행하세요.",
        )

    items = [
        {
            "rank": p["rank_in_run"],
            "numbers": p["numbers"],
            "score_total": p["score_total"],
            "based_on_draw": p["based_on_draw"],
            "simulation_run_id": p["source_run_id"],
            "created_at": p["created_at"],
            "metrics": calc_metrics(p["numbers"]),
        }
        for p in picks
    ]

    latest = get_latest_draw()
    return {
        "based_on_draw": latest["drw_no"] if latest else None,
        "count": len(items),
        "items": items,
    }


# ── Simulation runs and candidates (detail API) ─────────────────────────────
@app.get("/api/lotto/simulation")
def api_simulation(run_id: Optional[int] = None, runs_limit: int = 5):
    """
    Return recent simulation runs plus the top candidates of one run.

    Without ``run_id`` the most recent run is selected; with ``run_id`` the
    candidates of that run are returned. ``runs_limit`` is clamped to 1-50.
    Responds with 404 when no run has been recorded.
    """
    runs = get_simulation_runs(limit=max(1, min(runs_limit, 50)))
    if not runs:
        raise HTTPException(status_code=404, detail="시뮬레이션 기록이 없습니다.")

    target_run_id = run_id if run_id is not None else runs[0]["id"]
    candidates = get_simulation_candidates(target_run_id, limit=100)

    enriched = [
        {**c, "metrics": calc_metrics(c["numbers"])}
        for c in candidates
    ]

    return {
        "runs": runs,
        "selected_run_id": target_run_id,
        "candidates_count": len(enriched),
        "candidates": enriched,
    }
get_all_draw_numbers() if not draws: @@ -143,7 +248,6 @@ def api_recommend( "recent_window": recent_window, "recent_weight": float(recent_weight), "avoid_recent_k": avoid_recent_k, - "sum_min": sum_min, "sum_max": sum_max, "odd_min": odd_min, @@ -168,7 +272,6 @@ def api_recommend( return False if range_max is not None and m["range"] > range_max: return False - if max_overlap_latest is not None: ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k) if ov["repeats"] > max_overlap_latest: @@ -196,11 +299,9 @@ def api_recommend( if chosen is None: raise HTTPException( status_code=400, - detail=f"Constraints too strict. No valid set found in max_try={max_try}. " - f"Try relaxing sum/odd/range/overlap constraints.", + detail=f"Constraints too strict. No valid set found in max_try={max_try}.", ) - # ✅ dedup save saved = save_recommendation_dedup( latest["drw_no"] if latest else None, chosen, @@ -220,10 +321,11 @@ def api_recommend( "params": params, "metrics": metrics, "recent_overlap": overlap, - "tries": tries, + "tries": tries, } -# ---------- ✅ heatmap-based recommend ---------- + +# ── 히트맵 기반 추천 (하위 호환 유지) ──────────────────────────────────────── @app.get("/api/lotto/recommend/heatmap") def api_recommend_heatmap( heatmap_window: int = 20, @@ -231,8 +333,6 @@ def api_recommend_heatmap( recent_window: int = 200, recent_weight: float = 2.0, avoid_recent_k: int = 5, - - # ---- optional constraints ---- sum_min: Optional[int] = None, sum_max: Optional[int] = None, odd_min: Optional[int] = None, @@ -242,18 +342,13 @@ def api_recommend_heatmap( max_overlap_latest: Optional[int] = None, max_try: int = 200, ): - """ - 히트맵 기반 추천: 과거 추천 번호들의 적중률을 분석하여 가중치 부여 - """ draws = get_all_draw_numbers() if not draws: raise HTTPException(status_code=404, detail="No data yet") - - # 과거 추천 데이터 가져오기 (적중 결과가 있는 것만) + past_recs = list_recommendations_ex(limit=100, sort="id_desc") - latest = get_latest_draw() - + params = { "heatmap_window": heatmap_window, "heatmap_weight": 
float(heatmap_weight), @@ -269,7 +364,7 @@ def api_recommend_heatmap( "max_overlap_latest": max_overlap_latest, "max_try": int(max_try), } - + def _accept(nums: List[int]) -> bool: m = calc_metrics(nums) if sum_min is not None and m["sum"] < sum_min: @@ -284,16 +379,15 @@ def api_recommend_heatmap( return False if range_max is not None and m["range"] > range_max: return False - if max_overlap_latest is not None: ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k) if ov["repeats"] > max_overlap_latest: return False return True - + chosen = None explain = None - + tries = 0 while tries < max_try: tries += 1 @@ -311,23 +405,22 @@ def api_recommend_heatmap( chosen = nums explain = result["explain"] break - + if chosen is None: raise HTTPException( status_code=400, detail=f"Constraints too strict. No valid set found in max_try={max_try}.", ) - - # ✅ dedup save + saved = save_recommendation_dedup( latest["drw_no"] if latest else None, chosen, params, ) - + metrics = calc_metrics(chosen) overlap = calc_recent_overlap(chosen, draws, last_k=avoid_recent_k) - + return { "id": saved["id"], "saved": saved["saved"], @@ -341,7 +434,8 @@ def api_recommend_heatmap( "tries": tries, } -# ---------- ✅ history list (filter/paging) ---------- + +# ── 추천 이력 ──────────────────────────────────────────────────────────────── @app.get("/api/history") def api_history( limit: int = 30, @@ -380,6 +474,7 @@ def api_history( "filters": {"favorite": favorite, "tag": tag, "q": q, "sort": sort}, } + @app.delete("/api/history/{rec_id:int}") def api_history_delete(rec_id: int): ok = delete_recommendation(rec_id) @@ -387,12 +482,13 @@ def api_history_delete(rec_id: int): raise HTTPException(status_code=404, detail="Not found") return {"deleted": True, "id": rec_id} -# ---------- ✅ history update (favorite/note/tags) ---------- + class HistoryUpdate(BaseModel): favorite: Optional[bool] = None note: Optional[str] = None tags: Optional[List[str]] = None + @app.patch("/api/history/{rec_id:int}") 
def api_history_patch(rec_id: int, body: HistoryUpdate): ok = update_recommendation(rec_id, favorite=body.favorite, note=body.note, tags=body.tags) @@ -400,11 +496,11 @@ def api_history_patch(rec_id: int, body: HistoryUpdate): raise HTTPException(status_code=404, detail="Not found or no changes") return {"updated": True, "id": rec_id} -# ---------- ✅ batch recommend ---------- + +# ── 배치 추천 (하위 호환 유지) ─────────────────────────────────────────────── def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, avoid_recent_k: int, max_try: int = 200): items = [] seen = set() - tries = 0 while len(items) < count and tries < max_try: tries += 1 @@ -414,9 +510,9 @@ def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, a continue seen.add(key) items.append(r) - return items + @app.get("/api/lotto/recommend/batch") def api_recommend_batch( count: int = 5, @@ -443,17 +539,19 @@ def api_recommend_batch( "based_on_latest_draw": latest["drw_no"] if latest else None, "count": count, "items": [{ - "numbers": it["numbers"], + "numbers": it["numbers"], "explain": it["explain"], "metrics": calc_metrics(it["numbers"]), } for it in items], "params": params, } + class BatchSave(BaseModel): items: List[List[int]] params: dict + @app.post("/api/lotto/recommend/batch") def api_recommend_batch_save(body: BatchSave): latest = get_latest_draw() @@ -466,7 +564,7 @@ def api_recommend_batch_save(body: BatchSave): return {"saved": True, "created_ids": created, "deduped_ids": deduped} + @app.get("/api/version") def version(): - import os return {"version": os.getenv("APP_VERSION", "dev")}