355 lines
14 KiB
Python
355 lines
14 KiB
Python
"""
|
||
통계 분석 엔진 - lotto-lab 고도화
|
||
|
||
[팀 회의 합의 기반 5가지 통계 기법]
|
||
1. 빈도 Z-score 분석: 각 번호의 출현 빈도가 기댓값에서 얼마나 벗어났는지
|
||
2. 조합 지문(Fingerprint): 조합의 합계, 홀짝 비율, 구간 분포가 역대 당첨번호와 유사한지
|
||
3. 갭 분석(Gap): 각 번호의 마지막 출현으로부터 경과 회차 수 기반 점수
|
||
4. 공동 출현 행렬(Co-occurrence): 번호 쌍이 역대에 함께 나온 빈도 기반 점수
|
||
5. 다양성(Diversity): 연속 번호, 범위, 구간 분포 다양성
|
||
|
||
[통계 근거]
|
||
- 1~45번 각각의 이론적 출현 확률: 6/45 ≈ 13.33% per draw
|
||
- 기댓값 합계: E[sum] = 6 × E[1..45] = 6 × 23 = 138
|
||
- 표준편차 합계: std ≈ sqrt(6 × Var[uniform 1..45]) ≈ 31
|
||
- 홀수 23개 (1,3,...,45), 짝수 22개 (2,4,...,44)
|
||
- 번호 쌍 공동 출현 확률: C(43,4)/C(45,6) ≈ 1.516% per draw
|
||
"""
|
||
|
||
import math
|
||
from collections import Counter, defaultdict
|
||
from typing import List, Tuple, Dict, Any, Optional
|
||
|
||
# 구간 정의: (시작, 끝) 포함
|
||
ZONE_RANGES: List[Tuple[int, int]] = [
|
||
(1, 9),
|
||
(10, 19),
|
||
(20, 29),
|
||
(30, 39),
|
||
(40, 45),
|
||
]
|
||
|
||
|
||
def _get_zone(n: int) -> int:
|
||
"""번호가 속하는 구간 인덱스 (0-4)"""
|
||
for z, (lo, hi) in enumerate(ZONE_RANGES):
|
||
if lo <= n <= hi:
|
||
return z
|
||
return 4
|
||
|
||
|
||
def build_analysis_cache(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
|
||
"""
|
||
역대 당첨번호 데이터 기반 통계 분석 캐시 구성.
|
||
시뮬레이션 실행 시 한 번만 호출하여 재사용 (성능 최적화).
|
||
|
||
Args:
|
||
draws: [(drw_no, [n1,n2,n3,n4,n5,n6]), ...] 오름차순
|
||
|
||
Returns:
|
||
통계 캐시 딕셔너리
|
||
"""
|
||
if not draws:
|
||
return {}
|
||
|
||
total_draws = len(draws)
|
||
all_nums_list = [n for _, nums in draws for n in nums]
|
||
freq_all = Counter(all_nums_list)
|
||
|
||
# ── 1. 빈도 Z-score ──────────────────────────────────────────────────────
|
||
freq_values = [freq_all.get(n, 0) for n in range(1, 46)]
|
||
mean_freq = sum(freq_values) / 45.0
|
||
variance_freq = sum((f - mean_freq) ** 2 for f in freq_values) / 45.0
|
||
std_freq = math.sqrt(variance_freq)
|
||
|
||
z_scores: Dict[int, float] = {}
|
||
for n in range(1, 46):
|
||
z_scores[n] = (freq_all.get(n, 0) - mean_freq) / max(std_freq, 0.001)
|
||
|
||
# ── 2. 갭 분석: 마지막 출현 이후 경과 회차 ──────────────────────────────
|
||
# gap = 0: 가장 최근 회차에 출현, gap = k: k회 전에 마지막 출현
|
||
last_seen_gap: Dict[int, int] = {}
|
||
for gap_idx, (_, nums) in enumerate(reversed(draws)):
|
||
for n in nums:
|
||
if n not in last_seen_gap:
|
||
last_seen_gap[n] = gap_idx
|
||
for n in range(1, 46):
|
||
if n not in last_seen_gap:
|
||
last_seen_gap[n] = total_draws # 한 번도 안 나옴 (이론상 거의 불가)
|
||
|
||
# ── 3. 공동 출현 행렬 ────────────────────────────────────────────────────
|
||
# cooccur[(i,j)] = 번호 i와 j가 같은 회차에 함께 출현한 횟수 (i < j)
|
||
cooccur: Dict[Tuple[int, int], int] = defaultdict(int)
|
||
for _, nums in draws:
|
||
s = sorted(nums)
|
||
for i in range(len(s)):
|
||
for j in range(i + 1, len(s)):
|
||
cooccur[(s[i], s[j])] += 1
|
||
|
||
# 번호 쌍 공동 출현 기댓값: C(43,4)/C(45,6) × total_draws
|
||
# C(43,4) = 123,410 / C(45,6) = 8,145,060
|
||
expected_cooccur = total_draws * 123410.0 / 8145060.0
|
||
|
||
# ── 4. 역대 조합 통계 (합계, 홀수 개수) ──────────────────────────────────
|
||
historical_sums = [sum(nums) for _, nums in draws]
|
||
mean_sum = sum(historical_sums) / total_draws
|
||
std_sum = math.sqrt(
|
||
sum((s - mean_sum) ** 2 for s in historical_sums) / total_draws
|
||
)
|
||
std_sum = max(std_sum, 1.0) # 0 나누기 방지
|
||
|
||
historical_odds = [sum(1 for n in nums if n % 2 == 1) for _, nums in draws]
|
||
odd_dist = Counter(historical_odds)
|
||
odd_prob: Dict[int, float] = {k: v / total_draws for k, v in odd_dist.items()}
|
||
max_odd_prob = max(odd_prob.values()) if odd_prob else 1.0
|
||
|
||
# ── 5. 구간별 분포 통계 ───────────────────────────────────────────────────
|
||
# 각 구간에 몇 개 포함되는지의 역대 분포
|
||
zone_counts = [Counter() for _ in ZONE_RANGES]
|
||
for _, nums in draws:
|
||
for z_idx, (lo, hi) in enumerate(ZONE_RANGES):
|
||
cnt = sum(1 for n in nums if lo <= n <= hi)
|
||
zone_counts[z_idx][cnt] += 1
|
||
|
||
zone_probs: List[Dict[int, float]] = []
|
||
for zc in zone_counts:
|
||
total_z = sum(zc.values())
|
||
zone_probs.append({k: v / total_z for k, v in zc.items()})
|
||
|
||
max_zone_probs = [max(zp.values()) if zp else 1.0 for zp in zone_probs]
|
||
|
||
# ── 6. 최근 빈도 (후보 생성 가중치용) ────────────────────────────────────
|
||
recent_100 = draws[-100:] if len(draws) >= 100 else draws
|
||
freq_recent = Counter(n for _, nums in recent_100 for n in nums)
|
||
|
||
return {
|
||
"total_draws": total_draws,
|
||
"freq_all": freq_all,
|
||
"z_scores": z_scores,
|
||
"last_seen_gap": last_seen_gap,
|
||
"cooccur": dict(cooccur),
|
||
"expected_cooccur": expected_cooccur,
|
||
"mean_sum": mean_sum,
|
||
"std_sum": std_sum,
|
||
"odd_prob": odd_prob,
|
||
"max_odd_prob": max_odd_prob,
|
||
"zone_probs": zone_probs,
|
||
"max_zone_probs": max_zone_probs,
|
||
"freq_recent": freq_recent,
|
||
}
|
||
|
||
|
||
def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]:
|
||
"""
|
||
몬테카를로 시뮬레이션의 후보 생성에 사용할 번호별 샘플링 가중치.
|
||
빈도 + 최근 빈도 + 갭 분석을 반영하여 '좋은' 번호가 더 자주 선택되도록 유도.
|
||
"""
|
||
freq_all = cache["freq_all"]
|
||
last_seen_gap = cache["last_seen_gap"]
|
||
freq_recent = cache["freq_recent"]
|
||
|
||
weights: Dict[int, float] = {}
|
||
for n in range(1, 46):
|
||
w = freq_all.get(n, 0) + 1.5 * freq_recent.get(n, 0)
|
||
|
||
gap = last_seen_gap.get(n, 0)
|
||
if gap <= 1:
|
||
gap_factor = 0.50 # 바로 직전 등장 → 패널티
|
||
elif gap <= 3:
|
||
gap_factor = 0.75
|
||
elif gap <= 12:
|
||
gap_factor = 1.00 # 적정 범위
|
||
elif gap <= 25:
|
||
gap_factor = 1.10 # 약간 오래된 번호 → 소폭 보너스
|
||
else:
|
||
gap_factor = 1.20 # 오래된 번호 → 보너스
|
||
|
||
weights[n] = max(w * gap_factor, 0.5)
|
||
|
||
return weights
|
||
|
||
|
||
def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]:
|
||
"""
|
||
6개 번호 조합의 통계적 품질 점수 계산 (0~1 범위 정규화).
|
||
|
||
5가지 기법별 점수:
|
||
- score_frequency (25%): 빈도 Z-score
|
||
- score_fingerprint(30%): 조합의 통계적 지문 (합계, 홀짝, 구간)
|
||
- score_gap (20%): 갭 분석
|
||
- score_cooccur (15%): 공동 출현 기댓값 대비
|
||
- score_diversity (10%): 연속번호, 범위, 구간 다양성
|
||
|
||
Returns:
|
||
{"score_total": ..., "score_frequency": ..., ...}
|
||
"""
|
||
nums = sorted(numbers)
|
||
|
||
# ── 1. 빈도 점수 (Frequency Score) ────────────────────────────────────────
|
||
z_scores = cache["z_scores"]
|
||
avg_z = sum(z_scores.get(n, 0.0) for n in nums) / 6.0
|
||
# Sigmoid 정규화: avg_z > 0이면 0.5 이상
|
||
score_frequency = 1.0 / (1.0 + math.exp(-avg_z / 1.5))
|
||
|
||
# ── 2. 조합 지문 점수 (Fingerprint Score) ─────────────────────────────────
|
||
# 2a. 합계 정규분포 점수
|
||
total = sum(nums)
|
||
mean_sum = cache["mean_sum"]
|
||
std_sum = cache["std_sum"]
|
||
z_sum = (total - mean_sum) / std_sum
|
||
sum_score = math.exp(-0.5 * z_sum ** 2) # 정규분포 밀도 (peak=1 at mean)
|
||
|
||
# 2b. 홀짝 비율 점수
|
||
odd_count = sum(1 for n in nums if n % 2 == 1)
|
||
odd_prob = cache["odd_prob"]
|
||
max_odd_prob = cache["max_odd_prob"]
|
||
odd_score = odd_prob.get(odd_count, 0.01) / max_odd_prob
|
||
|
||
# 2c. 구간 분포 점수
|
||
zone_probs = cache["zone_probs"]
|
||
max_zone_probs = cache["max_zone_probs"]
|
||
zone_score = 0.0
|
||
for z_idx, (lo, hi) in enumerate(ZONE_RANGES):
|
||
cnt = sum(1 for n in nums if lo <= n <= hi)
|
||
zp = zone_probs[z_idx]
|
||
mzp = max_zone_probs[z_idx]
|
||
zone_score += zp.get(cnt, 0.01) / mzp
|
||
zone_score /= len(ZONE_RANGES)
|
||
|
||
score_fingerprint = sum_score * 0.50 + odd_score * 0.30 + zone_score * 0.20
|
||
|
||
# ── 3. 갭 점수 (Gap Score) ────────────────────────────────────────────────
|
||
last_seen_gap = cache["last_seen_gap"]
|
||
gap_scores: List[float] = []
|
||
for n in nums:
|
||
gap = last_seen_gap.get(n, 0)
|
||
if gap <= 1:
|
||
gs = 0.20 # 직전 등장 번호 - 강한 패널티
|
||
elif gap <= 3:
|
||
gs = 0.55
|
||
elif gap <= 7:
|
||
gs = 0.85
|
||
elif gap <= 15:
|
||
gs = 1.00 # 최적 범위
|
||
elif gap <= 25:
|
||
gs = 0.90
|
||
else:
|
||
gs = 0.75 # 오래된 번호 - 여전히 양호
|
||
gap_scores.append(gs)
|
||
score_gap = sum(gap_scores) / 6.0
|
||
|
||
# ── 4. 공동 출현 점수 (Co-occurrence Score) ───────────────────────────────
|
||
cooccur = cache["cooccur"]
|
||
expected_cooccur = cache["expected_cooccur"]
|
||
|
||
pair_scores: List[float] = []
|
||
for i in range(len(nums)):
|
||
for j in range(i + 1, len(nums)):
|
||
actual = cooccur.get((nums[i], nums[j]), 0)
|
||
ratio = actual / max(expected_cooccur, 0.001)
|
||
# Sigmoid: ratio = 1에서 0.5, ratio > 1이면 > 0.5
|
||
ps = 1.0 / (1.0 + math.exp(-2.0 * (ratio - 1.0)))
|
||
pair_scores.append(ps)
|
||
score_cooccur = sum(pair_scores) / max(len(pair_scores), 1)
|
||
|
||
# ── 5. 다양성 점수 (Diversity Score) ─────────────────────────────────────
|
||
# 5a. 연속 번호 포함 여부 (역대 당첨번호 약 52%에 최소 1쌍 포함)
|
||
has_consecutive = any(nums[i + 1] - nums[i] == 1 for i in range(len(nums) - 1))
|
||
consecutive_score = 0.65 if has_consecutive else 0.40
|
||
|
||
# 5b. 범위 점수 (최소~최대 차이)
|
||
num_range = nums[-1] - nums[0]
|
||
if 28 <= num_range <= 43:
|
||
spread_score = 1.00
|
||
elif 20 <= num_range < 28:
|
||
spread_score = 0.85
|
||
elif 13 <= num_range < 20:
|
||
spread_score = 0.65
|
||
elif num_range < 13:
|
||
spread_score = 0.25
|
||
else: # > 43 (최대 44: 1~45)
|
||
spread_score = 0.95
|
||
|
||
# 5c. 구간 커버리지 (몇 개 구간에 걸쳐 있는가)
|
||
zones_used = set(_get_zone(n) for n in nums)
|
||
zone_coverage = (len(zones_used) - 1) / 4.0 # 0~1
|
||
|
||
score_diversity = (
|
||
consecutive_score * 0.35
|
||
+ spread_score * 0.35
|
||
+ zone_coverage * 0.30
|
||
)
|
||
|
||
# ── 최종 가중 합산 ────────────────────────────────────────────────────────
|
||
score_total = (
|
||
score_frequency * 0.25
|
||
+ score_fingerprint * 0.30
|
||
+ score_gap * 0.20
|
||
+ score_cooccur * 0.15
|
||
+ score_diversity * 0.10
|
||
)
|
||
|
||
return {
|
||
"score_total": round(score_total, 6),
|
||
"score_frequency": round(score_frequency, 6),
|
||
"score_fingerprint": round(score_fingerprint, 6),
|
||
"score_gap": round(score_gap, 6),
|
||
"score_cooccur": round(score_cooccur, 6),
|
||
"score_diversity": round(score_diversity, 6),
|
||
}
|
||
|
||
|
||
def get_statistical_report(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
|
||
"""
|
||
통계 분석 리포트 생성 (GET /api/lotto/analysis 응답용).
|
||
각 번호의 빈도, Z-score, 갭, 히트/콜드/오버듀 분류를 반환.
|
||
"""
|
||
if not draws:
|
||
return {"error": "데이터 없음"}
|
||
|
||
cache = build_analysis_cache(draws)
|
||
total_draws = cache["total_draws"]
|
||
freq_all = cache["freq_all"]
|
||
z_scores = cache["z_scores"]
|
||
last_seen_gap = cache["last_seen_gap"]
|
||
|
||
number_stats = []
|
||
for n in range(1, 46):
|
||
freq = freq_all.get(n, 0)
|
||
expected = total_draws * 6.0 / 45.0
|
||
number_stats.append({
|
||
"number": n,
|
||
"frequency": freq,
|
||
"expected": round(expected, 1),
|
||
"frequency_pct": round(freq / (total_draws * 6) * 100, 2),
|
||
"z_score": round(z_scores.get(n, 0.0), 3),
|
||
"gap": last_seen_gap.get(n, total_draws),
|
||
"zone": _get_zone(n),
|
||
})
|
||
|
||
sorted_by_freq = sorted(number_stats, key=lambda x: -x["frequency"])
|
||
sorted_by_gap = sorted(number_stats, key=lambda x: -x["gap"])
|
||
|
||
# 역대 합계 분포 요약
|
||
hist_sums = [sum(nums) for _, nums in draws]
|
||
sum_buckets: Dict[str, int] = {}
|
||
for lo in range(21, 256, 20):
|
||
hi = lo + 19
|
||
key = f"{lo}-{hi}"
|
||
sum_buckets[key] = sum(1 for s in hist_sums if lo <= s <= hi)
|
||
|
||
return {
|
||
"total_draws": total_draws,
|
||
"mean_sum": round(cache["mean_sum"], 2),
|
||
"std_sum": round(cache["std_sum"], 2),
|
||
"odd_distribution": {
|
||
str(k): round(v * 100, 1)
|
||
for k, v in sorted(cache["odd_prob"].items())
|
||
},
|
||
"number_stats": number_stats,
|
||
"hot_numbers": [x["number"] for x in sorted_by_freq[:10]],
|
||
"cold_numbers": [x["number"] for x in sorted_by_freq[-10:]],
|
||
"overdue_numbers": [x["number"] for x in sorted_by_gap[:10]],
|
||
"sum_distribution": sum_buckets,
|
||
}
|