""" 통계 분석 엔진 - lotto-lab 고도화 [팀 회의 합의 기반 5가지 통계 기법] 1. 빈도 Z-score 분석: 각 번호의 출현 빈도가 기댓값에서 얼마나 벗어났는지 2. 조합 지문(Fingerprint): 조합의 합계, 홀짝 비율, 구간 분포가 역대 당첨번호와 유사한지 3. 갭 분석(Gap): 각 번호의 마지막 출현으로부터 경과 회차 수 기반 점수 4. 공동 출현 행렬(Co-occurrence): 번호 쌍이 역대에 함께 나온 빈도 기반 점수 5. 다양성(Diversity): 연속 번호, 범위, 구간 분포 다양성 [통계 근거] - 1~45번 각각의 이론적 출현 확률: 6/45 ≈ 13.33% per draw - 기댓값 합계: E[sum] = 6 × E[1..45] = 6 × 23 = 138 - 표준편차 합계: std ≈ sqrt(6 × Var[uniform 1..45]) ≈ 31 - 홀수 23개 (1,3,...,45), 짝수 22개 (2,4,...,44) - 번호 쌍 공동 출현 확률: C(43,4)/C(45,6) ≈ 1.516% per draw """ import math from collections import Counter, defaultdict from datetime import datetime, timezone from typing import List, Tuple, Dict, Any, Optional # 구간 정의: (시작, 끝) 포함 ZONE_RANGES: List[Tuple[int, int]] = [ (1, 9), (10, 19), (20, 29), (30, 39), (40, 45), ] def _get_zone(n: int) -> int: """번호가 속하는 구간 인덱스 (0-4)""" for z, (lo, hi) in enumerate(ZONE_RANGES): if lo <= n <= hi: return z return 4 def build_analysis_cache(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]: """ 역대 당첨번호 데이터 기반 통계 분석 캐시 구성. 시뮬레이션 실행 시 한 번만 호출하여 재사용 (성능 최적화). Args: draws: [(drw_no, [n1,n2,n3,n4,n5,n6]), ...] 오름차순 Returns: 통계 캐시 딕셔너리 """ if not draws: return {} total_draws = len(draws) all_nums_list = [n for _, nums in draws for n in nums] freq_all = Counter(all_nums_list) # ── 1. 빈도 Z-score ────────────────────────────────────────────────────── freq_values = [freq_all.get(n, 0) for n in range(1, 46)] mean_freq = sum(freq_values) / 45.0 variance_freq = sum((f - mean_freq) ** 2 for f in freq_values) / 45.0 std_freq = math.sqrt(variance_freq) z_scores: Dict[int, float] = {} for n in range(1, 46): z_scores[n] = (freq_all.get(n, 0) - mean_freq) / max(std_freq, 0.001) # ── 2. 갭 분석: 마지막 출현 이후 경과 회차 ────────────────────────────── # gap = 0: 가장 최근 회차에 출현, gap = k: k회 전에 마지막 출현 last_seen_gap: Dict[int, int] = {} for gap_idx, (_, nums) in enumerate(reversed(draws)): for n in nums: if n not in last_seen_gap: last_seen_gap[n] = gap_idx for n in range(1, 46): if n not in last_seen_gap: last_seen_gap[n] = total_draws # 한 번도 안 나옴 (이론상 거의 불가) # ── 3. 공동 출현 행렬 ──────────────────────────────────────────────────── # cooccur[(i,j)] = 번호 i와 j가 같은 회차에 함께 출현한 횟수 (i < j) cooccur: Dict[Tuple[int, int], int] = defaultdict(int) for _, nums in draws: s = sorted(nums) for i in range(len(s)): for j in range(i + 1, len(s)): cooccur[(s[i], s[j])] += 1 # 번호 쌍 공동 출현 기댓값: C(43,4)/C(45,6) × total_draws # C(43,4) = 123,410 / C(45,6) = 8,145,060 expected_cooccur = total_draws * 123410.0 / 8145060.0 # ── 4. 역대 조합 통계 (합계, 홀수 개수) ────────────────────────────────── historical_sums = [sum(nums) for _, nums in draws] mean_sum = sum(historical_sums) / total_draws std_sum = math.sqrt( sum((s - mean_sum) ** 2 for s in historical_sums) / total_draws ) std_sum = max(std_sum, 1.0) # 0 나누기 방지 historical_odds = [sum(1 for n in nums if n % 2 == 1) for _, nums in draws] odd_dist = Counter(historical_odds) odd_prob: Dict[int, float] = {k: v / total_draws for k, v in odd_dist.items()} max_odd_prob = max(odd_prob.values()) if odd_prob else 1.0 # ── 5. 구간별 분포 통계 ─────────────────────────────────────────────────── # 각 구간에 몇 개 포함되는지의 역대 분포 zone_counts = [Counter() for _ in ZONE_RANGES] for _, nums in draws: for z_idx, (lo, hi) in enumerate(ZONE_RANGES): cnt = sum(1 for n in nums if lo <= n <= hi) zone_counts[z_idx][cnt] += 1 zone_probs: List[Dict[int, float]] = [] for zc in zone_counts: total_z = sum(zc.values()) zone_probs.append({k: v / total_z for k, v in zc.items()}) max_zone_probs = [max(zp.values()) if zp else 1.0 for zp in zone_probs] # ── 6. 최근 빈도 (후보 생성 가중치용) ──────────────────────────────────── recent_100 = draws[-100:] if len(draws) >= 100 else draws freq_recent = Counter(n for _, nums in recent_100 for n in nums) return { "total_draws": total_draws, "freq_all": freq_all, "z_scores": z_scores, "last_seen_gap": last_seen_gap, "cooccur": dict(cooccur), "expected_cooccur": expected_cooccur, "mean_sum": mean_sum, "std_sum": std_sum, "odd_prob": odd_prob, "max_odd_prob": max_odd_prob, "zone_probs": zone_probs, "max_zone_probs": max_zone_probs, "freq_recent": freq_recent, } def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]: """ 몬테카를로 시뮬레이션의 후보 생성에 사용할 번호별 샘플링 가중치. 빈도 + 최근 빈도 + 갭 분석을 반영하여 '좋은' 번호가 더 자주 선택되도록 유도. """ freq_all = cache["freq_all"] last_seen_gap = cache["last_seen_gap"] freq_recent = cache["freq_recent"] weights: Dict[int, float] = {} for n in range(1, 46): w = freq_all.get(n, 0) + 1.5 * freq_recent.get(n, 0) gap = last_seen_gap.get(n, 0) if gap <= 1: gap_factor = 0.50 # 바로 직전 등장 → 패널티 elif gap <= 3: gap_factor = 0.75 elif gap <= 12: gap_factor = 1.00 # 적정 범위 elif gap <= 25: gap_factor = 1.10 # 약간 오래된 번호 → 소폭 보너스 else: gap_factor = 1.20 # 오래된 번호 → 보너스 weights[n] = max(w * gap_factor, 0.5) return weights def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]: """ 6개 번호 조합의 통계적 품질 점수 계산 (0~1 범위 정규화). 5가지 기법별 점수: - score_frequency (25%): 빈도 Z-score - score_fingerprint(30%): 조합의 통계적 지문 (합계, 홀짝, 구간) - score_gap (20%): 갭 분석 - score_cooccur (15%): 공동 출현 기댓값 대비 - score_diversity (10%): 연속번호, 범위, 구간 다양성 Returns: {"score_total": ..., "score_frequency": ..., ...} """ nums = sorted(numbers) # ── 1. 빈도 점수 (Frequency Score) ──────────────────────────────────────── z_scores = cache["z_scores"] avg_z = sum(z_scores.get(n, 0.0) for n in nums) / 6.0 # Sigmoid 정규화: avg_z > 0이면 0.5 이상 score_frequency = 1.0 / (1.0 + math.exp(-avg_z / 1.5)) # ── 2. 조합 지문 점수 (Fingerprint Score) ───────────────────────────────── # 2a. 합계 정규분포 점수 total = sum(nums) mean_sum = cache["mean_sum"] std_sum = cache["std_sum"] z_sum = (total - mean_sum) / std_sum sum_score = math.exp(-0.5 * z_sum ** 2) # 정규분포 밀도 (peak=1 at mean) # 2b. 홀짝 비율 점수 odd_count = sum(1 for n in nums if n % 2 == 1) odd_prob = cache["odd_prob"] max_odd_prob = cache["max_odd_prob"] odd_score = odd_prob.get(odd_count, 0.01) / max_odd_prob # 2c. 구간 분포 점수 zone_probs = cache["zone_probs"] max_zone_probs = cache["max_zone_probs"] zone_score = 0.0 for z_idx, (lo, hi) in enumerate(ZONE_RANGES): cnt = sum(1 for n in nums if lo <= n <= hi) zp = zone_probs[z_idx] mzp = max_zone_probs[z_idx] zone_score += zp.get(cnt, 0.01) / mzp zone_score /= len(ZONE_RANGES) score_fingerprint = sum_score * 0.50 + odd_score * 0.30 + zone_score * 0.20 # ── 3. 갭 점수 (Gap Score) ──────────────────────────────────────────────── last_seen_gap = cache["last_seen_gap"] gap_scores: List[float] = [] for n in nums: gap = last_seen_gap.get(n, 0) if gap <= 1: gs = 0.20 # 직전 등장 번호 - 강한 패널티 elif gap <= 3: gs = 0.55 elif gap <= 7: gs = 0.85 elif gap <= 15: gs = 1.00 # 최적 범위 elif gap <= 25: gs = 0.90 else: gs = 0.75 # 오래된 번호 - 여전히 양호 gap_scores.append(gs) score_gap = sum(gap_scores) / 6.0 # ── 4. 공동 출현 점수 (Co-occurrence Score) ─────────────────────────────── cooccur = cache["cooccur"] expected_cooccur = cache["expected_cooccur"] pair_scores: List[float] = [] for i in range(len(nums)): for j in range(i + 1, len(nums)): actual = cooccur.get((nums[i], nums[j]), 0) ratio = actual / max(expected_cooccur, 0.001) # Sigmoid: ratio = 1에서 0.5, ratio > 1이면 > 0.5 ps = 1.0 / (1.0 + math.exp(-2.0 * (ratio - 1.0))) pair_scores.append(ps) score_cooccur = sum(pair_scores) / max(len(pair_scores), 1) # ── 5. 다양성 점수 (Diversity Score) ───────────────────────────────────── # 5a. 연속 번호 포함 여부 (역대 당첨번호 약 52%에 최소 1쌍 포함) has_consecutive = any(nums[i + 1] - nums[i] == 1 for i in range(len(nums) - 1)) consecutive_score = 0.65 if has_consecutive else 0.40 # 5b. 범위 점수 (최소~최대 차이) num_range = nums[-1] - nums[0] if 28 <= num_range <= 43: spread_score = 1.00 elif 20 <= num_range < 28: spread_score = 0.85 elif 13 <= num_range < 20: spread_score = 0.65 elif num_range < 13: spread_score = 0.25 else: # > 43 (최대 44: 1~45) spread_score = 0.95 # 5c. 구간 커버리지 (몇 개 구간에 걸쳐 있는가) zones_used = set(_get_zone(n) for n in nums) zone_coverage = (len(zones_used) - 1) / 4.0 # 0~1 score_diversity = ( consecutive_score * 0.35 + spread_score * 0.35 + zone_coverage * 0.30 ) # ── 최종 가중 합산 ──────────────────────────────────────────────────────── score_total = ( score_frequency * 0.25 + score_fingerprint * 0.30 + score_gap * 0.20 + score_cooccur * 0.15 + score_diversity * 0.10 ) return { "score_total": round(score_total, 6), "score_frequency": round(score_frequency, 6), "score_fingerprint": round(score_fingerprint, 6), "score_gap": round(score_gap, 6), "score_cooccur": round(score_cooccur, 6), "score_diversity": round(score_diversity, 6), } def get_statistical_report(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]: """ 통계 분석 리포트 생성 (GET /api/lotto/analysis 응답용). 각 번호의 빈도, Z-score, 갭, 히트/콜드/오버듀 분류를 반환. """ if not draws: return {"error": "데이터 없음"} cache = build_analysis_cache(draws) total_draws = cache["total_draws"] freq_all = cache["freq_all"] z_scores = cache["z_scores"] last_seen_gap = cache["last_seen_gap"] number_stats = [] for n in range(1, 46): freq = freq_all.get(n, 0) expected = total_draws * 6.0 / 45.0 number_stats.append({ "number": n, "frequency": freq, "expected": round(expected, 1), "frequency_pct": round(freq / (total_draws * 6) * 100, 2), "z_score": round(z_scores.get(n, 0.0), 3), "gap": last_seen_gap.get(n, total_draws), "zone": _get_zone(n), }) sorted_by_freq = sorted(number_stats, key=lambda x: -x["frequency"]) sorted_by_gap = sorted(number_stats, key=lambda x: -x["gap"]) # 역대 합계 분포 요약 hist_sums = [sum(nums) for _, nums in draws] sum_buckets: Dict[str, int] = {} for lo in range(21, 256, 20): hi = lo + 19 key = f"{lo}-{hi}" sum_buckets[key] = sum(1 for s in hist_sums if lo <= s <= hi) return { "total_draws": total_draws, "mean_sum": round(cache["mean_sum"], 2), "std_sum": round(cache["std_sum"], 2), "odd_distribution": { str(k): round(v * 100, 1) for k, v in sorted(cache["odd_prob"].items()) }, "number_stats": number_stats, "hot_numbers": [x["number"] for x in sorted_by_freq[:10]], "cold_numbers": [x["number"] for x in sorted_by_freq[-10:]], "overdue_numbers": [x["number"] for x in sorted_by_gap[:10]], "sum_distribution": sum_buckets, } def generate_weekly_report(draws: List[Tuple[int, List[int]]], target_drw_no: int) -> Dict[str, Any]: """ 특정 회차 공략 리포트 생성. target_drw_no: 공략 대상 회차 (아직 추첨 안 된 회차) draws: target_drw_no 이전까지의 당첨번호 (오름차순) """ if not draws: return {"error": "데이터 없음"} cache = build_analysis_cache(draws) total_draws = cache["total_draws"] freq_all = cache["freq_all"] last_seen_gap = cache["last_seen_gap"] recent_10 = draws[-10:] if len(draws) >= 10 else draws recent_3 = draws[-3:] if len(draws) >= 3 else draws # 과출현: 최근 10회에 2회 이상 출현 번호 (출현 많은 순) r10_nums = [n for _, nums in recent_10 for n in nums] r10_freq = Counter(r10_nums) hot_numbers = [n for n, _ in sorted(r10_freq.items(), key=lambda x: -x[1]) if r10_freq[n] >= 2] # 냉각: 역대 출현 빈도 낮은 번호 cold_numbers = sorted(range(1, 46), key=lambda n: freq_all.get(n, 0))[:10] # 오버듀: 가장 오래 미출현 번호 overdue_numbers = sorted(range(1, 46), key=lambda n: -last_seen_gap.get(n, 0))[:10] # 최근 3회 연속 출현 (2회 이상) r3_nums = [n for _, nums in recent_3 for n in nums] r3_freq = Counter(r3_nums) triple_appear = sorted(n for n, cnt in r3_freq.items() if cnt >= 2) recent_sums = [sum(nums) for _, nums in recent_10] recent_odd = [sum(1 for n in nums if n % 2 == 1) for _, nums in recent_10] # 갭 기반 가중치 (오래된 번호일수록 높음) gap_w = {n: last_seen_gap.get(n, 0) for n in range(1, 46)} def _pick(exclude=None, prefer=None, n=6): ex = set(exclude or []) chosen = [] # prefer에서 최대 3개 우선 선택 for p in (prefer or []): if p not in ex and len(chosen) < 3: chosen.append(p) # 구간별 1개씩 (갭 우선) for lo, hi in [(1, 9), (10, 19), (20, 29), (30, 39), (40, 45)]: if len(chosen) >= n: break cands = [x for x in range(lo, hi + 1) if x not in ex and x not in chosen] if cands: chosen.append(max(cands, key=lambda x: gap_w.get(x, 0))) # 부족하면 나머지에서 갭 순 rest = sorted( [x for x in range(1, 46) if x not in ex and x not in chosen], key=lambda x: -gap_w.get(x, 0), ) while len(chosen) < n and rest: chosen.append(rest.pop(0)) return sorted(chosen[:n]) set1 = _pick(exclude=hot_numbers[:5], prefer=overdue_numbers[:5]) set2 = _pick() set3 = _pick(exclude=hot_numbers) # 신뢰도 점수 data_vol = min(total_draws / 500, 1.0) if len(recent_sums) > 1: avg_s = sum(recent_sums) / len(recent_sums) std_s = (sum((s - avg_s) ** 2 for s in recent_sums) / len(recent_sums)) ** 0.5 pattern = max(0.0, 1.0 - std_s / 60.0) else: pattern = 0.5 trend = max(0.0, 1.0 - len(hot_numbers) / max(len(r10_nums), 1)) confidence = round((data_vol * 0.4 + pattern * 0.35 + trend * 0.25) * 100) return { "target_drw_no": target_drw_no, "based_on_draw": draws[-1][0], "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "hot_numbers": hot_numbers[:8], "cold_numbers": cold_numbers, "overdue_numbers": overdue_numbers, "recent_pattern": { "last3_numbers": sorted(set(r3_nums)), "triple_appear": triple_appear, "recent_sum_avg": round(sum(recent_sums) / len(recent_sums), 1) if recent_sums else 0, "recent_odd_avg": round(sum(recent_odd) / len(recent_odd), 1) if recent_odd else 0, }, "recommended_sets": [ {"strategy": "냉각번호 중심", "numbers": set1, "description": "오랫동안 미출현 번호 위주 + 과출현 제외"}, {"strategy": "균형형", "numbers": set2, "description": "구간 균형 + 갭 최적화"}, {"strategy": "과출현 피하기", "numbers": set3, "description": "최근 자주 나온 번호 완전 제외"}, ], "confidence_score": confidence, "confidence_factors": { "data_volume": round(data_vol * 100), "pattern_consistency": round(pattern * 100), "recent_trend": round(trend * 100), }, }