refactor: backend→lotto 서비스 리네이밍 + lotto.db 레거시 테이블 스키마 제거

- backend/ → lotto/ 디렉토리 이동
- docker-compose: lotto-backend→lotto, lotto-frontend→frontend
- deploy scripts, nginx, agent-office config 네이밍 일괄 반영
- lotto/app/db.py에서 todos·blog_posts CREATE TABLE 제거 (personal로 이관 완료)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-27 17:29:13 +09:00
parent 6c46759848
commit 2a8635e9ed
26 changed files with 18 additions and 56 deletions

655
lotto/app/analyzer.py Normal file
View File

@@ -0,0 +1,655 @@
"""
통계 분석 엔진 - lotto-lab 고도화
[팀 회의 합의 기반 5가지 통계 기법]
1. 빈도 Z-score 분석: 각 번호의 출현 빈도가 기댓값에서 얼마나 벗어났는지
2. 조합 지문(Fingerprint): 조합의 합계, 홀짝 비율, 구간 분포가 역대 당첨번호와 유사한지
3. 갭 분석(Gap): 각 번호의 마지막 출현으로부터 경과 회차 수 기반 점수
4. 공동 출현 행렬(Co-occurrence): 번호 쌍이 역대에 함께 나온 빈도 기반 점수
5. 다양성(Diversity): 연속 번호, 범위, 구간 분포 다양성
[통계 근거]
- 1~45번 각각의 이론적 출현 확률: 6/45 ≈ 13.33% per draw
- 기댓값 합계: E[sum] = 6 × E[1..45] = 6 × 23 = 138
- 표준편차 합계: std ≈ sqrt(6 × Var[uniform 1..45]) ≈ 31
- 홀수 23개 (1,3,...,45), 짝수 22개 (2,4,...,44)
- 번호 쌍 공동 출현 확률: C(43,4)/C(45,6) ≈ 1.516% per draw
"""
import math
from collections import Counter, defaultdict
from datetime import datetime, timezone
from typing import List, Tuple, Dict, Any, Optional
# 구간 정의: (시작, 끝) 포함
ZONE_RANGES: List[Tuple[int, int]] = [
(1, 9),
(10, 19),
(20, 29),
(30, 39),
(40, 45),
]
def _get_zone(n: int) -> int:
"""번호가 속하는 구간 인덱스 (0-4)"""
for z, (lo, hi) in enumerate(ZONE_RANGES):
if lo <= n <= hi:
return z
return 4
def build_analysis_cache(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
    """
    Build the statistical analysis cache from the historical winning numbers.

    Called once per simulation run and then reused (performance optimization).

    Args:
        draws: [(drw_no, [n1,n2,n3,n4,n5,n6]), ...] in ascending draw order
    Returns:
        Cache dict with frequency, z-score, gap, co-occurrence, sum,
        odd-count and zone-distribution statistics; empty dict when no data.
    """
    if not draws:
        return {}
    total_draws = len(draws)
    all_nums_list = [n for _, nums in draws for n in nums]
    freq_all = Counter(all_nums_list)
    # ── 1. Frequency z-score ─────────────────────────────────────────────────
    freq_values = [freq_all.get(n, 0) for n in range(1, 46)]
    mean_freq = sum(freq_values) / 45.0
    variance_freq = sum((f - mean_freq) ** 2 for f in freq_values) / 45.0
    std_freq = math.sqrt(variance_freq)
    z_scores: Dict[int, float] = {}
    for n in range(1, 46):
        # max(std, 0.001) guards against a zero std when all counts are equal
        z_scores[n] = (freq_all.get(n, 0) - mean_freq) / max(std_freq, 0.001)
    # ── 2. Gap analysis: draws elapsed since the last appearance ─────────────
    # gap = 0: appeared in the most recent draw, gap = k: last seen k draws ago
    last_seen_gap: Dict[int, int] = {}
    for gap_idx, (_, nums) in enumerate(reversed(draws)):
        for n in nums:
            if n not in last_seen_gap:
                last_seen_gap[n] = gap_idx
    for n in range(1, 46):
        if n not in last_seen_gap:
            last_seen_gap[n] = total_draws  # never drawn (practically impossible)
    # ── 3. Co-occurrence matrix ──────────────────────────────────────────────
    # cooccur[(i,j)] = times numbers i and j appeared in the same draw (i < j)
    cooccur: Dict[Tuple[int, int], int] = defaultdict(int)
    for _, nums in draws:
        s = sorted(nums)
        for i in range(len(s)):
            for j in range(i + 1, len(s)):
                cooccur[(s[i], s[j])] += 1
    # Expected pair co-occurrence: C(43,4)/C(45,6) × total_draws
    # C(43,4) = 123,410 / C(45,6) = 8,145,060
    expected_cooccur = total_draws * 123410.0 / 8145060.0
    # ── 4. Historical combination stats (sum, odd count) ─────────────────────
    historical_sums = [sum(nums) for _, nums in draws]
    mean_sum = sum(historical_sums) / total_draws
    std_sum = math.sqrt(
        sum((s - mean_sum) ** 2 for s in historical_sums) / total_draws
    )
    std_sum = max(std_sum, 1.0)  # avoid division by zero
    historical_odds = [sum(1 for n in nums if n % 2 == 1) for _, nums in draws]
    odd_dist = Counter(historical_odds)
    odd_prob: Dict[int, float] = {k: v / total_draws for k, v in odd_dist.items()}
    max_odd_prob = max(odd_prob.values()) if odd_prob else 1.0
    # ── 5. Zone distribution stats ───────────────────────────────────────────
    # Historical distribution of how many of the 6 numbers fall in each zone
    zone_counts = [Counter() for _ in ZONE_RANGES]
    for _, nums in draws:
        for z_idx, (lo, hi) in enumerate(ZONE_RANGES):
            cnt = sum(1 for n in nums if lo <= n <= hi)
            zone_counts[z_idx][cnt] += 1
    zone_probs: List[Dict[int, float]] = []
    for zc in zone_counts:
        total_z = sum(zc.values())
        zone_probs.append({k: v / total_z for k, v in zc.items()})
    max_zone_probs = [max(zp.values()) if zp else 1.0 for zp in zone_probs]
    # ── 6. Recent frequency (used as candidate-generation weighting) ─────────
    recent_100 = draws[-100:] if len(draws) >= 100 else draws
    freq_recent = Counter(n for _, nums in recent_100 for n in nums)
    return {
        "total_draws": total_draws,
        "freq_all": freq_all,
        "z_scores": z_scores,
        "last_seen_gap": last_seen_gap,
        "cooccur": dict(cooccur),
        "expected_cooccur": expected_cooccur,
        "mean_sum": mean_sum,
        "std_sum": std_sum,
        "odd_prob": odd_prob,
        "max_odd_prob": max_odd_prob,
        "zone_probs": zone_probs,
        "max_zone_probs": max_zone_probs,
        "freq_recent": freq_recent,
    }
def build_number_weights(cache: Dict[str, Any]) -> Dict[int, float]:
    """
    Per-number sampling weights for Monte Carlo candidate generation.

    Combines overall frequency, recent frequency and the gap analysis so
    that statistically favorable numbers are drawn more often.
    """
    freq_all = cache["freq_all"]
    freq_recent = cache["freq_recent"]
    last_seen_gap = cache["last_seen_gap"]

    def _gap_factor(gap: int) -> float:
        # Penalize numbers that just appeared; small bonus for overdue ones.
        if gap <= 1:
            return 0.50   # appeared in the very last draw → penalty
        if gap <= 3:
            return 0.75
        if gap <= 12:
            return 1.00   # sweet spot
        if gap <= 25:
            return 1.10   # slightly stale → small bonus
        return 1.20       # long absent → bonus

    result: Dict[int, float] = {}
    for num in range(1, 46):
        base = freq_all.get(num, 0) + 1.5 * freq_recent.get(num, 0)
        # Floor at 0.5 so every number keeps a nonzero sampling chance.
        result[num] = max(base * _gap_factor(last_seen_gap.get(num, 0)), 0.5)
    return result
def score_combination(numbers: List[int], cache: Dict[str, Any]) -> Dict[str, float]:
    """
    Compute the statistical quality scores of a 6-number combination
    (each score normalized to the 0-1 range).

    Five per-technique scores:
      - score_frequency (25%): frequency z-score
      - score_fingerprint(30%): statistical fingerprint (sum, odd/even, zones)
      - score_gap       (20%): gap analysis
      - score_cooccur   (15%): pair co-occurrence vs. expectation
      - score_diversity (10%): consecutive numbers, spread, zone diversity
    Returns:
        {"score_total": ..., "score_frequency": ..., ...}
    """
    nums = sorted(numbers)
    # ── 1. Frequency score ───────────────────────────────────────────────────
    z_scores = cache["z_scores"]
    avg_z = sum(z_scores.get(n, 0.0) for n in nums) / 6.0
    # Sigmoid normalization: avg_z > 0 maps above 0.5
    score_frequency = 1.0 / (1.0 + math.exp(-avg_z / 1.5))
    # ── 2. Fingerprint score ─────────────────────────────────────────────────
    # 2a. Sum score under a normal-distribution model
    total = sum(nums)
    mean_sum = cache["mean_sum"]
    std_sum = cache["std_sum"]
    z_sum = (total - mean_sum) / std_sum
    sum_score = math.exp(-0.5 * z_sum ** 2)  # Gaussian density shape (peak=1 at mean)
    # 2b. Odd/even ratio score
    odd_count = sum(1 for n in nums if n % 2 == 1)
    odd_prob = cache["odd_prob"]
    max_odd_prob = cache["max_odd_prob"]
    odd_score = odd_prob.get(odd_count, 0.01) / max_odd_prob
    # 2c. Zone distribution score
    zone_probs = cache["zone_probs"]
    max_zone_probs = cache["max_zone_probs"]
    zone_score = 0.0
    for z_idx, (lo, hi) in enumerate(ZONE_RANGES):
        cnt = sum(1 for n in nums if lo <= n <= hi)
        zp = zone_probs[z_idx]
        mzp = max_zone_probs[z_idx]
        zone_score += zp.get(cnt, 0.01) / mzp
    zone_score /= len(ZONE_RANGES)
    score_fingerprint = sum_score * 0.50 + odd_score * 0.30 + zone_score * 0.20
    # ── 3. Gap score ─────────────────────────────────────────────────────────
    last_seen_gap = cache["last_seen_gap"]
    gap_scores: List[float] = []
    for n in nums:
        gap = last_seen_gap.get(n, 0)
        if gap <= 1:
            gs = 0.20  # appeared in the previous draw — strong penalty
        elif gap <= 3:
            gs = 0.55
        elif gap <= 7:
            gs = 0.85
        elif gap <= 15:
            gs = 1.00  # optimal range
        elif gap <= 25:
            gs = 0.90
        else:
            gs = 0.75  # long absent — still decent
        gap_scores.append(gs)
    score_gap = sum(gap_scores) / 6.0
    # ── 4. Co-occurrence score ───────────────────────────────────────────────
    cooccur = cache["cooccur"]
    expected_cooccur = cache["expected_cooccur"]
    pair_scores: List[float] = []
    for i in range(len(nums)):
        for j in range(i + 1, len(nums)):
            actual = cooccur.get((nums[i], nums[j]), 0)
            ratio = actual / max(expected_cooccur, 0.001)
            # Sigmoid: 0.5 at ratio = 1, above 0.5 when ratio > 1
            ps = 1.0 / (1.0 + math.exp(-2.0 * (ratio - 1.0)))
            pair_scores.append(ps)
    score_cooccur = sum(pair_scores) / max(len(pair_scores), 1)
    # ── 5. Diversity score ───────────────────────────────────────────────────
    # 5a. Consecutive pair present? (per the comment baseline, ~52% of
    #     historical draws contain at least one — TODO confirm figure)
    has_consecutive = any(nums[i + 1] - nums[i] == 1 for i in range(len(nums) - 1))
    consecutive_score = 0.65 if has_consecutive else 0.40
    # 5b. Spread score (max - min)
    num_range = nums[-1] - nums[0]
    if 28 <= num_range <= 43:
        spread_score = 1.00
    elif 20 <= num_range < 28:
        spread_score = 0.85
    elif 13 <= num_range < 20:
        spread_score = 0.65
    elif num_range < 13:
        spread_score = 0.25
    else:  # > 43 (max possible is 44: 1~45)
        spread_score = 0.95
    # 5c. Zone coverage (how many zones are represented)
    zones_used = set(_get_zone(n) for n in nums)
    zone_coverage = (len(zones_used) - 1) / 4.0  # 0~1
    score_diversity = (
        consecutive_score * 0.35
        + spread_score * 0.35
        + zone_coverage * 0.30
    )
    # ── Final weighted sum ───────────────────────────────────────────────────
    score_total = (
        score_frequency * 0.25
        + score_fingerprint * 0.30
        + score_gap * 0.20
        + score_cooccur * 0.15
        + score_diversity * 0.10
    )
    return {
        "score_total": round(score_total, 6),
        "score_frequency": round(score_frequency, 6),
        "score_fingerprint": round(score_fingerprint, 6),
        "score_gap": round(score_gap, 6),
        "score_cooccur": round(score_cooccur, 6),
        "score_diversity": round(score_diversity, 6),
    }
def get_statistical_report(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
    """
    Build the statistical analysis report (for GET /api/lotto/analysis).

    Returns per-number frequency, z-score, gap and zone, plus hot/cold/
    overdue top-10 lists and a bucketed distribution of historical sums.
    """
    if not draws:
        return {"error": "데이터 없음"}
    cache = build_analysis_cache(draws)
    total_draws = cache["total_draws"]
    freq_all = cache["freq_all"]
    z_scores = cache["z_scores"]
    last_seen_gap = cache["last_seen_gap"]
    number_stats = []
    for n in range(1, 46):
        freq = freq_all.get(n, 0)
        # Expected appearances: 6 numbers out of 45 per draw
        expected = total_draws * 6.0 / 45.0
        number_stats.append({
            "number": n,
            "frequency": freq,
            "expected": round(expected, 1),
            "frequency_pct": round(freq / (total_draws * 6) * 100, 2),
            "z_score": round(z_scores.get(n, 0.0), 3),
            "gap": last_seen_gap.get(n, total_draws),
            "zone": _get_zone(n),
        })
    sorted_by_freq = sorted(number_stats, key=lambda x: -x["frequency"])
    sorted_by_gap = sorted(number_stats, key=lambda x: -x["gap"])
    # Historical sum distribution in 20-wide buckets (21-40, 41-60, ...)
    hist_sums = [sum(nums) for _, nums in draws]
    sum_buckets: Dict[str, int] = {}
    for lo in range(21, 256, 20):
        hi = lo + 19
        key = f"{lo}-{hi}"
        sum_buckets[key] = sum(1 for s in hist_sums if lo <= s <= hi)
    return {
        "total_draws": total_draws,
        "mean_sum": round(cache["mean_sum"], 2),
        "std_sum": round(cache["std_sum"], 2),
        "odd_distribution": {
            str(k): round(v * 100, 1)
            for k, v in sorted(cache["odd_prob"].items())
        },
        "number_stats": number_stats,
        "hot_numbers": [x["number"] for x in sorted_by_freq[:10]],
        "cold_numbers": [x["number"] for x in sorted_by_freq[-10:]],
        "overdue_numbers": [x["number"] for x in sorted_by_gap[:10]],
        "sum_distribution": sum_buckets,
    }
def analyze_personal_patterns(
    all_numbers: List[List[int]],
    draws: List[Tuple[int, List[int]]],
) -> Dict[str, Any]:
    """
    Analyze personal picking patterns from the user's saved recommendations.

    Args:
        all_numbers: every saved recommendation (each item is a list of numbers,
            normally 6; sets of other lengths are now handled as well)
        draws: historical winning numbers, used as the odd/sum baseline
    Returns:
        Summary dict; only a message when there is no history.
    """
    if not all_numbers:
        return {"total_analyzed": 0, "message": "추천 이력이 없습니다"}
    total = len(all_numbers)
    flat = [n for nums in all_numbers for n in nums]
    freq = Counter(flat)
    # Per-number pick frequency across all saved sets
    number_frequency = {n: freq.get(n, 0) for n in range(1, 46)}
    top_picks = sorted(range(1, 46), key=lambda n: -freq.get(n, 0))[:10]
    least_picks = [n for n in sorted(range(1, 46), key=lambda n: freq.get(n, 0)) if freq.get(n, 0) == 0][:10]
    # Pattern metrics
    odd_counts = [sum(1 for n in nums if n % 2 == 1) for nums in all_numbers]
    sums = [sum(nums) for nums in all_numbers]
    ranges = [max(nums) - min(nums) for nums in all_numbers]
    # Fix: the original re-sorted each set twice per comparison inside the
    # generator and hard-coded range(5), which assumed exactly 6 numbers.
    # Sort once per set and iterate over actual adjacent pairs.
    consecutive_count = 0
    for nums in all_numbers:
        s = sorted(nums)
        if any(s[i + 1] - s[i] == 1 for i in range(len(s) - 1)):
            consecutive_count += 1
    zone_totals = {k: 0 for k in ["1-9", "10-19", "20-29", "30-39", "40-45"]}
    zone_ranges = [("1-9", 1, 9), ("10-19", 10, 19), ("20-29", 20, 29), ("30-39", 30, 39), ("40-45", 40, 45)]
    for nums in all_numbers:
        for label, lo, hi in zone_ranges:
            zone_totals[label] += sum(1 for n in nums if lo <= n <= hi)
    zone_avg = {k: round(v / total, 2) for k, v in zone_totals.items()}
    avg_odd = sum(odd_counts) / total
    avg_sum = sum(sums) / total
    avg_range = sum(ranges) / total
    # Compare against the historical winning-number averages
    if draws:
        draw_odd_avg = sum(sum(1 for n in nums if n % 2 == 1) for _, nums in draws) / len(draws)
        draw_sum_avg = sum(sum(nums) for _, nums in draws) / len(draws)
    else:
        # Theoretical fallbacks when no draw data is available
        draw_odd_avg = 3.0
        draw_sum_avg = 138.0
    return {
        "total_analyzed": total,
        "number_frequency": number_frequency,
        "top_picks": top_picks,
        "least_picks": least_picks,
        "pattern": {
            "avg_odd_count": round(avg_odd, 2),
            "avg_sum": round(avg_sum, 1),
            "avg_range": round(avg_range, 1),
            "consecutive_rate": round(consecutive_count / total, 3),
            "zone_avg": zone_avg,
        },
        "vs_draw_avg": {
            "odd_diff": round(avg_odd - draw_odd_avg, 2),
            "sum_diff": round(avg_sum - draw_sum_avg, 1),
            "odd_tendency": "홀수 선호" if avg_odd > draw_odd_avg + 0.2 else ("짝수 선호" if avg_odd < draw_odd_avg - 0.2 else "균형"),
            "sum_tendency": "고합계 선호" if avg_sum > draw_sum_avg + 5 else ("저합계 선호" if avg_sum < draw_sum_avg - 5 else "균형"),
        },
    }
def generate_combined_recommendation(draws: List[Tuple[int, List[int]]]) -> Dict[str, Any]:
    """
    Recommend numbers by combining the five statistical techniques.

    Each technique nominates its top-6 numbers; votes are tallied using the
    same weights as score_combination and the final 6 numbers are selected.
    Weights: frequency-Z (25%) · fingerprint (30%) · gap (20%) ·
    co-occurrence (15%) · diversity (10%)
    """
    if not draws:
        return {"error": "데이터 없음"}
    cache = build_analysis_cache(draws)
    z = cache["z_scores"]
    gap = cache["last_seen_gap"]
    freq = cache["freq_all"]
    cooccur = cache["cooccur"]
    zone_probs = cache["zone_probs"]
    # ── Method 1: frequency z-score ──────────────────────────────────────────
    # Top 6 by descending z-score (numbers appearing above expectation)
    m_frequency = sorted(range(1, 46), key=lambda n: -z.get(n, 0))[:6]
    # ── Method 2: gap analysis ───────────────────────────────────────────────
    # The 6 longest-absent numbers (overdue)
    m_gap = sorted(range(1, 46), key=lambda n: -gap.get(n, 0))[:6]
    # ── Method 3: co-occurrence ──────────────────────────────────────────────
    # Top 6 by each number's summed co-appearance count
    cooccur_total: Dict[int, float] = defaultdict(float)
    for (a, b), cnt in cooccur.items():
        cooccur_total[a] += cnt
        cooccur_total[b] += cnt
    m_cooccur = sorted(range(1, 46), key=lambda n: -cooccur_total.get(n, 0))[:6]
    # ── Method 4: combination fingerprint ────────────────────────────────────
    # Frequency leaders per zone, matching each zone's historical modal count
    zone_targets: List[int] = []
    for zp in zone_probs:
        zone_targets.append(max(zp, key=zp.get) if zp else 1)
    # Adjust the per-zone targets so they sum to exactly 6
    diff = sum(zone_targets) - 6
    if diff > 0:
        idxs = sorted(range(5), key=lambda i: -zone_targets[i])
        for i in idxs:
            if diff <= 0:
                break
            zone_targets[i] = max(0, zone_targets[i] - 1)
            diff -= 1
    elif diff < 0:
        idxs = sorted(range(5), key=lambda i: zone_targets[i])
        for i in idxs:
            if diff >= 0:
                break
            zone_targets[i] += 1
            diff += 1
    m_fingerprint: List[int] = []
    for (lo, hi), tgt in zip(ZONE_RANGES, zone_targets):
        zone_nums = sorted(range(lo, hi + 1), key=lambda x: -freq.get(x, 0))
        m_fingerprint.extend(zone_nums[:tgt])
    m_fingerprint = sorted(m_fingerprint[:6])
    # ── Method 5: diversity ──────────────────────────────────────────────────
    # Largest-gap number from each zone (5), then global gap leaders to reach 6
    m_diversity: List[int] = []
    for lo, hi in ZONE_RANGES:
        zone_nums = sorted(range(lo, hi + 1), key=lambda n: -gap.get(n, 0))
        if zone_nums:
            m_diversity.append(zone_nums[0])
    if len(m_diversity) < 6:
        rest = sorted(
            [x for x in range(1, 46) if x not in m_diversity],
            key=lambda n: -gap.get(n, 0),
        )
        m_diversity.extend(rest[: 6 - len(m_diversity)])
    m_diversity = sorted(m_diversity[:6])
    # ── Weighted vote tally ──────────────────────────────────────────────────
    # Same weights as score_combination: freq 25, fingerprint 30, gap 20,
    # co-occurrence 15, diversity 10
    method_entries = [
        (m_frequency, 25, "frequency", "빈도 Z-score"),
        (m_fingerprint, 30, "fingerprint", "조합 지문"),
        (m_gap, 20, "gap", "갭 분석"),
        (m_cooccur, 15, "cooccur", "공동 출현"),
        (m_diversity, 10, "diversity", "다양성"),
    ]
    vote_scores: Dict[int, float] = {n: 0.0 for n in range(1, 46)}
    for method_nums, weight, _, _ in method_entries:
        for rank, n in enumerate(method_nums):
            # rank 0 = 1st: (6-0)×weight = 6w, rank 5 = 6th: (6-5)×weight = w
            vote_scores[n] += (6 - rank) * weight
    # Top 6 — ties broken by z-score
    final_numbers = sorted(
        sorted(range(1, 46), key=lambda n: (-vote_scores[n], -z.get(n, 0)))[:6]
    )
    scores = score_combination(final_numbers, cache)
    # How many of the five methods nominated each number
    vote_counts: Dict[str, int] = {
        str(n): sum(1 for nums, _, _, _ in method_entries if n in nums)
        for n in range(1, 46)
    }
    methods_result: Dict[str, Any] = {}
    for nums, weight, key, label in method_entries:
        methods_result[key] = {
            "label": label,
            "weight_pct": weight,
            "numbers": sorted(nums),
        }
    return {
        "methods": methods_result,
        "final_numbers": final_numbers,
        "scores": scores,
        "vote_scores": {str(n): round(vote_scores[n], 1) for n in range(1, 46)},
        "vote_counts": vote_counts,
        "total_draws": cache["total_draws"],
    }
def generate_weekly_report(draws: List[Tuple[int, List[int]]], target_drw_no: int) -> Dict[str, Any]:
    """
    Build the attack report for one specific upcoming draw.

    Args:
        draws: winning numbers up to (excluding) the target, ascending order
        target_drw_no: the draw being targeted (not yet drawn)
    Returns:
        Report dict with hot/cold/overdue lists, recent-pattern summary,
        three recommended sets and a confidence score.
    """
    if not draws:
        return {"error": "데이터 없음"}
    cache = build_analysis_cache(draws)
    total_draws = cache["total_draws"]
    freq_all = cache["freq_all"]
    last_seen_gap = cache["last_seen_gap"]
    recent_10 = draws[-10:] if len(draws) >= 10 else draws
    recent_3 = draws[-3:] if len(draws) >= 3 else draws
    # Overheated: 2+ appearances within the last 10 draws (most frequent first)
    r10_nums = [n for _, nums in recent_10 for n in nums]
    r10_freq = Counter(r10_nums)
    hot_numbers = [n for n, _ in sorted(r10_freq.items(), key=lambda x: -x[1]) if r10_freq[n] >= 2]
    # Cold: lowest all-time frequency
    cold_numbers = sorted(range(1, 46), key=lambda n: freq_all.get(n, 0))[:10]
    # Overdue: longest absence
    overdue_numbers = sorted(range(1, 46), key=lambda n: -last_seen_gap.get(n, 0))[:10]
    # Repeats within the last 3 draws (2+ appearances)
    r3_nums = [n for _, nums in recent_3 for n in nums]
    r3_freq = Counter(r3_nums)
    triple_appear = sorted(n for n, cnt in r3_freq.items() if cnt >= 2)
    recent_sums = [sum(nums) for _, nums in recent_10]
    recent_odd = [sum(1 for n in nums if n % 2 == 1) for _, nums in recent_10]
    # Gap-based weight (older numbers score higher)
    gap_w = {n: last_seen_gap.get(n, 0) for n in range(1, 46)}
    def _pick(exclude=None, prefer=None, n=6):
        # Build one n-number set: up to 3 preferred numbers first, then one
        # gap-leader per zone, then global gap-leaders as filler.
        ex = set(exclude or [])
        chosen = []
        # Up to 3 preferred numbers first
        for p in (prefer or []):
            if p not in ex and len(chosen) < 3:
                chosen.append(p)
        # One per zone (largest gap wins)
        for lo, hi in [(1, 9), (10, 19), (20, 29), (30, 39), (40, 45)]:
            if len(chosen) >= n:
                break
            cands = [x for x in range(lo, hi + 1) if x not in ex and x not in chosen]
            if cands:
                chosen.append(max(cands, key=lambda x: gap_w.get(x, 0)))
        # Fill remaining slots by descending gap
        rest = sorted(
            [x for x in range(1, 46) if x not in ex and x not in chosen],
            key=lambda x: -gap_w.get(x, 0),
        )
        while len(chosen) < n and rest:
            chosen.append(rest.pop(0))
        return sorted(chosen[:n])
    set1 = _pick(exclude=hot_numbers[:5], prefer=overdue_numbers[:5])
    set2 = _pick()
    set3 = _pick(exclude=hot_numbers)
    # Confidence score: data volume + recent-sum consistency + trend calmness
    data_vol = min(total_draws / 500, 1.0)
    if len(recent_sums) > 1:
        avg_s = sum(recent_sums) / len(recent_sums)
        std_s = (sum((s - avg_s) ** 2 for s in recent_sums) / len(recent_sums)) ** 0.5
        pattern = max(0.0, 1.0 - std_s / 60.0)
    else:
        pattern = 0.5
    trend = max(0.0, 1.0 - len(hot_numbers) / max(len(r10_nums), 1))
    confidence = round((data_vol * 0.4 + pattern * 0.35 + trend * 0.25) * 100)
    return {
        "target_drw_no": target_drw_no,
        "based_on_draw": draws[-1][0],
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "hot_numbers": hot_numbers[:8],
        "cold_numbers": cold_numbers,
        "overdue_numbers": overdue_numbers,
        "recent_pattern": {
            "last3_numbers": sorted(set(r3_nums)),
            "triple_appear": triple_appear,
            "recent_sum_avg": round(sum(recent_sums) / len(recent_sums), 1) if recent_sums else 0,
            "recent_odd_avg": round(sum(recent_odd) / len(recent_odd), 1) if recent_odd else 0,
        },
        "recommended_sets": [
            {"strategy": "냉각번호 중심", "numbers": set1, "description": "오랫동안 미출현 번호 위주 + 과출현 제외"},
            {"strategy": "균형형", "numbers": set2, "description": "구간 균형 + 갭 최적화"},
            {"strategy": "과출현 피하기", "numbers": set3, "description": "최근 자주 나온 번호 완전 제외"},
        ],
        "confidence_score": confidence,
        "confidence_factors": {
            "data_volume": round(data_vol * 100),
            "pattern_consistency": round(pattern * 100),
            "recent_trend": round(trend * 100),
        },
    }

73
lotto/app/checker.py Normal file
View File

@@ -0,0 +1,73 @@
import json
from .db import (
_conn, get_draw, update_recommendation_result
)
def _calc_rank(my_nums: list[int], win_nums: list[int], bonus: int) -> tuple[int, int, bool]:
"""
(rank, correct_cnt, has_bonus) 반환
rank: 1~5 (1등~5등), 0 (낙첨)
"""
matched = set(my_nums) & set(win_nums)
cnt = len(matched)
has_bonus = bonus in my_nums
if cnt == 6:
return 1, cnt, has_bonus
if cnt == 5 and has_bonus:
return 2, cnt, has_bonus
if cnt == 5:
return 3, cnt, has_bonus
if cnt == 4:
return 4, cnt, has_bonus
if cnt == 3:
return 5, cnt, has_bonus
return 0, cnt, has_bonus
def check_results_for_draw(drw_no: int) -> int:
    """
    Score the recommendations that targeted draw *drw_no* once its result is
    available, i.e. rows saved with based_on_draw == drw_no - 1.

    Returns:
        Number of recommendations scored (0 when the draw is unknown).
    """
    win_row = get_draw(drw_no)
    if not win_row:
        return 0
    win_nums = [
        win_row["n1"], win_row["n2"], win_row["n3"],
        win_row["n4"], win_row["n5"], win_row["n6"]
    ]
    bonus = win_row["bonus"]
    # Select rows whose based_on_draw equals (this draw - 1): e.g. draw 1000's
    # result scores predictions for 1000 that were based on draw 999 data.
    target_based_on = drw_no - 1
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT id, numbers
            FROM recommendations
            WHERE based_on_draw = ? AND checked = 0
            """,
            (target_based_on,)
        ).fetchall()
    count = 0
    for r in rows:
        my_nums = json.loads(r["numbers"])
        rank, correct, has_bonus = _calc_rank(my_nums, win_nums, bonus)
        update_recommendation_result(r["id"], rank, correct, has_bonus)
        count += 1
    # ── Purchase-history scoring hook ────────────────────────────────────────
    try:
        from .purchase_manager import check_purchases_for_draw as _check_purchases
        _check_purchases(drw_no)  # internally runs evolve_after_check → recalculate_weights
    except ImportError:
        pass  # ignore when purchase_manager is absent (backward compatibility)
    return count

85
lotto/app/collector.py Normal file
View File

@@ -0,0 +1,85 @@
import requests
from typing import Dict, Any
from .db import get_draw, upsert_draw, upsert_many_draws, get_latest_draw, count_draws
def _normalize_item(item: dict) -> dict:
# smok95 all.json / latest.json 구조
# - draw_no: int
# - numbers: [n1..n6]
# - bonus_no: int
# - date: "YYYY-MM-DD ..."
numbers = item["numbers"]
return {
"drw_no": int(item["draw_no"]),
"drw_date": (item.get("date") or "")[:10],
"n1": int(numbers[0]),
"n2": int(numbers[1]),
"n3": int(numbers[2]),
"n4": int(numbers[3]),
"n5": int(numbers[4]),
"n6": int(numbers[5]),
"bonus": int(item["bonus_no"]),
}
def sync_all_from_json(all_url: str) -> Dict[str, Any]:
    """Fetch the full draw-history JSON and bulk-upsert every draw.

    Raises requests.HTTPError on a non-2xx response.
    """
    r = requests.get(all_url, timeout=60)
    r.raise_for_status()
    data = r.json()  # list[dict]
    # Normalize into the local row schema
    rows = [_normalize_item(item) for item in data]
    # Bulk insert (performance)
    upsert_many_draws(rows)
    return {"mode": "all_json", "url": all_url, "total": len(rows)}
def sync_latest(latest_url: str) -> Dict[str, Any]:
    """Fetch only the latest draw and upsert it; reports whether it was new.

    Raises requests.HTTPError on a non-2xx response.
    """
    r = requests.get(latest_url, timeout=30)
    r.raise_for_status()
    item = r.json()
    row = _normalize_item(item)
    # Look up the row before upserting so "was_new" can be reported
    before = get_draw(row["drw_no"])
    upsert_draw(row)
    return {"mode": "latest_json", "url": latest_url, "was_new": (before is None), "drawNo": row["drw_no"]}
def sync_ensure_all(latest_url: str, all_url: str) -> Dict[str, Any]:
    """
    Make sure every draw from #1 to the newest exists locally; run a full
    sync when anything is missing or the local data is behind.

    Returns:
        {"synced": bool, "reason"/"error"/"detail": ...} describing the outcome.
    """
    # 1. Check the newest remote draw number
    try:
        r = requests.get(latest_url, timeout=10)
        r.raise_for_status()
        remote_item = r.json()
        remote_no = int(remote_item["draw_no"])
    except Exception as e:
        # On remote failure, return False rather than raising so callers can
        # decide whether to continue with local data only.
        return {"synced": False, "error": str(e)}
    # 2. Inspect local state
    local_latest_row = get_latest_draw()
    local_no = local_latest_row["drw_no"] if local_latest_row else 0
    local_cnt = count_draws()
    # 3. Decide whether a sync is needed:
    #    - fewer rows than the local max draw number means holes (draws start at 1)
    #    - a lower local max than remote means we are behind
    need_sync = (local_no < remote_no) or (local_cnt < local_no)
    if not need_sync:
        return {"synced": True, "updated": False, "local_no": local_no}
    # 4. Run the full sync. A latest-only sync cannot fill holes in the middle,
    #    so for simplicity always run the bulk all-sync (fast via bulk insert).
    res = sync_all_from_json(all_url)
    return {"synced": True, "updated": True, "detail": res}

View File

@@ -0,0 +1,151 @@
"""큐레이터용 후보 가공 — 여러 엔진 결과를 하나로 병합, 중복 제거, 피처 계산."""
from typing import Dict, List, Any, Set
from . import db
from .recommender import recommend_numbers, recommend_with_heatmap
from .analyzer import get_statistical_report
from .strategy_evolver import generate_smart_recommendation
# Boundary between "low" and "high" numbers for the low/high feature split.
LOW_HIGH_CUT = 22


def compute_features(numbers: List[int], hot: Set[int], cold: Set[int]) -> Dict[str, Any]:
    """Derive the curator feature vector for one 6-number candidate set."""
    nums = sorted(numbers)
    odd_count = sum(n & 1 for n in nums)
    low_count = sum(1 for n in nums if n <= LOW_HIGH_CUT)
    # Decade buckets: 1-10, 11-20, 21-30, 31-40, 41-45
    buckets = [0] * 5
    for n in nums:
        buckets[min((n - 1) // 10, 4)] += 1
    has_run = any(b - a == 1 for a, b in zip(nums, nums[1:]))
    picked = set(nums)
    return {
        "odd_count": odd_count,
        "even_count": 6 - odd_count,
        "low_count": low_count,
        "high_count": 6 - low_count,
        "range_distribution": buckets,
        "has_consecutive": has_run,
        "hot_number_count": len(picked & hot),
        "cold_number_count": len(picked & cold),
        "sum": sum(nums),
    }
def _key(numbers: List[int]) -> str:
return ",".join(str(n) for n in sorted(numbers))
def collect_candidates(n: int, hot: Set[int], cold: Set[int]) -> List[Dict[str, Any]]:
    """
    Gather up to *n* candidate sets from the engines, deduplicated, in
    priority order: simulation best_picks → meta → heatmap → statistics.
    Each returned candidate carries its computed feature vector.
    """
    seen: Dict[str, Dict[str, Any]] = {}
    order: List[str] = []
    def _add(numbers: List[int], source: str) -> None:
        # Record a candidate once; the first source to propose it wins.
        if not numbers:
            return
        k = _key(numbers)
        if k in seen:
            return
        seen[k] = {"numbers": sorted(numbers), "source": source}
        order.append(k)
    # 1. simulation best_picks (engine failures are tolerated best-effort)
    try:
        for row in db.get_best_picks(limit=n):
            _add(row.get("numbers") or [], "simulation")
    except Exception:
        pass
    # Load draws once and share them between the engines below
    draws = []
    try:
        draws = db.get_all_draw_numbers()
    except Exception:
        pass
    # 2. meta-strategy (smart)
    try:
        meta = generate_smart_recommendation(sets=n)
        for s in meta.get("sets", []):
            _add(s.get("numbers") or [], "meta")
    except Exception:
        pass
    # 3. heatmap (repeated calls; duplicates are skipped by _add)
    if draws:
        try:
            for _ in range(n * 2):
                if len(order) >= n * 2:
                    break
                r = recommend_with_heatmap(draws, [])
                _add(r.get("numbers") or [], "heatmap")
        except Exception:
            pass
    # 4. statistics
    if draws:
        try:
            for _ in range(n * 2):
                if len(order) >= n * 2:
                    break
                r = recommend_numbers(draws)
                _add(r.get("numbers") or [], "statistics")
        except Exception:
            pass
    # Keep the first n in arrival order and attach features
    out = []
    for k in order[:n]:
        item = seen[k]
        item["features"] = compute_features(item["numbers"], hot, cold)
        out.append(item)
    return out
def build_context(hot_limit: int = 10, cold_limit: int = 10) -> Dict[str, Any]:
    """
    Weekly context package for the curator — get_statistical_report already
    supplies the hot/cold lists; this adds a last-draw summary string and
    the user's recent purchase performance. All lookups are best-effort.
    """
    hot: List[int] = []
    cold: List[int] = []
    last_summary = ""
    try:
        draws = db.get_all_draw_numbers()
    except Exception:
        draws = []
    if draws:
        try:
            report = get_statistical_report(draws)
            hot = list(report.get("hot_numbers", []))[:hot_limit]
            cold = list(report.get("cold_numbers", []))[:cold_limit]
        except Exception:
            pass
    try:
        latest = db.get_latest_draw()
    except Exception:
        latest = None
    if latest:
        nums = [latest.get(f"n{i}") for i in range(1, 7)]
        nums = [n for n in nums if n is not None]
        if nums:
            odd = sum(1 for n in nums if n % 2 == 1)
            low = sum(1 for n in nums if n <= LOW_HIGH_CUT)
            # NOTE(review): the label renders the two counts back-to-back
            # ("홀{odd}{6-odd}") with no separator character between them —
            # confirm this compact format is intended before changing it.
            last_summary = f"{latest.get('drw_no')}회: {', '.join(str(n) for n in nums)} (홀{odd}{6-odd}, 저{low}{6-low})"
    my_perf: List[Dict[str, Any]] = []
    try:
        from .purchase_manager import get_recent_performance
        my_perf = get_recent_performance(limit=3)
    except Exception:
        my_perf = []
    return {
        "hot_numbers": hot,
        "cold_numbers": cold,
        "last_draw_summary": last_summary,
        "my_recent_performance": my_perf,
    }

1054
lotto/app/db.py Normal file

File diff suppressed because it is too large Load Diff

135
lotto/app/generator.py Normal file
View File

@@ -0,0 +1,135 @@
"""
시뮬레이션 엔진 - lotto-lab 고도화
[몬테카를로 시뮬레이션 흐름]
1. 역대 당첨번호 기반 통계 캐시 구성 (build_analysis_cache)
2. 통계 가중치로 N개 후보 조합 생성 (weighted sampling)
3. 5가지 기법으로 각 후보 스코어링 (score_combination)
4. 상위 top_k개 선별하여 DB 저장 (simulation_candidates, best_picks 교체)
[시뮬레이션 파라미터]
- n_candidates: 1회 시뮬레이션당 생성 후보 수 (기본 20,000)
- top_k: 선별 및 저장할 상위 개수 (기본 100)
- best_n: best_picks에 올릴 최상위 개수 (기본 20)
"""
import random
from typing import Dict, Any, List, Optional
from .db import (
get_latest_draw,
get_all_draw_numbers,
save_simulation_run,
save_simulation_candidates_bulk,
replace_best_picks,
)
from .analyzer import build_analysis_cache, build_number_weights, score_combination
from .utils import weighted_sample_6
def run_simulation(
    n_candidates: int = 20000,
    top_k: int = 100,
    best_n: int = 20,
) -> Dict[str, Any]:
    """
    Main entry point of the Monte Carlo simulation.

    Args:
        n_candidates: number of candidate combinations to generate (default 20,000)
        top_k: number of top candidates persisted to the DB (default 100)
        best_n: number of top candidates promoted to best_picks (default 20)
    Returns:
        {run_id, total_generated, top_k_selected, avg_score, best_score, based_on_draw}
        or {"error": ...} when no draw data exists.
    """
    draws = get_all_draw_numbers()
    if not draws:
        return {"error": "당첨번호 데이터가 없습니다. 먼저 동기화를 실행하세요."}
    latest = get_latest_draw()
    based_on_draw = latest["drw_no"] if latest else None
    # ── 1. Build the statistics cache and weights (reused for the whole run) ─
    cache = build_analysis_cache(draws)
    weights = build_number_weights(cache)
    # ── 2. Generate and score candidates ─────────────────────────────────────
    candidates: List[Dict[str, Any]] = []
    seen_keys: set = set()
    max_attempts = n_candidates * 3  # headroom for duplicate rejection
    attempts = 0
    while len(candidates) < n_candidates and attempts < max_attempts:
        attempts += 1
        nums = weighted_sample_6(weights)
        key = tuple(sorted(nums))
        if key in seen_keys:
            continue
        seen_keys.add(key)
        scores = score_combination(nums, cache)
        candidates.append({
            "numbers": sorted(nums),
            **scores,
        })
    # ── 3. Sort by score and keep the top slice ──────────────────────────────
    candidates.sort(key=lambda x: -x["score_total"])
    top_candidates = candidates[:top_k]
    # Flag the best_n with is_best
    best_keys = {tuple(c["numbers"]) for c in top_candidates[:best_n]}
    for c in top_candidates:
        c["is_best"] = tuple(c["numbers"]) in best_keys
    avg_score = (
        sum(c["score_total"] for c in top_candidates) / len(top_candidates)
        if top_candidates else 0.0
    )
    best_score = top_candidates[0]["score_total"] if top_candidates else 0.0
    # ── 4. Persist ───────────────────────────────────────────────────────────
    run_id = save_simulation_run(
        strategy="monte_carlo",
        total_generated=len(candidates),
        top_k_selected=len(top_candidates),
        avg_score=avg_score,
        notes=f"based_on_draw={based_on_draw}, history={len(draws)}",
    )
    # Only the top_k go to the DB (the full candidate list stays in memory)
    save_simulation_candidates_bulk(run_id, top_candidates, based_on_draw)
    # Replace best_picks with the new top best_n
    best_picks_data = [
        {
            "numbers": c["numbers"],
            "score_total": c["score_total"],
            "rank_in_run": i + 1,
        }
        for i, c in enumerate(top_candidates[:best_n])
    ]
    replace_best_picks(best_picks_data, run_id, based_on_draw)
    return {
        "run_id": run_id,
        "total_generated": len(candidates),
        "top_k_selected": len(top_candidates),
        "best_n_saved": len(best_picks_data),
        "avg_score": round(avg_score, 6),
        "best_score": round(best_score, 6),
        "based_on_draw": based_on_draw,
    }
def generate_smart_recommendations(count: int = 10) -> int:
    """
    Backward-compatibility wrapper around run_simulation.

    Kept so legacy callers (e.g. /api/admin/auto_gen) keep working.
    Returns the number of best picks saved, or 0 on error.
    """
    outcome = run_simulation(n_candidates=5000, top_k=count, best_n=count)
    return 0 if "error" in outcome else outcome.get("best_n_saved", 0)

837
lotto/app/main.py Normal file
View File

@@ -0,0 +1,837 @@
import os
import time
import logging
from typing import Optional, List, Dict, Any, Tuple
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from apscheduler.schedulers.background import BackgroundScheduler
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("lotto-backend")
from .db import (
init_db, get_draw, get_latest_draw, get_all_draw_numbers,
save_recommendation_dedup, list_recommendations_ex, delete_recommendation,
update_recommendation,
# 시뮬레이션 관련
get_best_picks, get_simulation_runs, get_simulation_candidates,
# 성과 통계
get_recommendation_performance,
# Phase 2: 구매 이력
add_purchase, get_purchases, update_purchase, delete_purchase, get_purchase_stats,
# Phase 2: 주간 리포트 캐시
save_weekly_report, get_weekly_report_list, get_weekly_report,
# Phase 2: 개인 패턴 분석
get_all_recommendation_numbers,
# Phase 3: 전략 관련
get_strategy_performance as db_get_strategy_performance,
)
from .recommender import recommend_numbers, recommend_with_heatmap
from .collector import sync_latest, sync_ensure_all
from .generator import run_simulation, generate_smart_recommendations
from .checker import check_results_for_draw
from .utils import calc_metrics, calc_recent_overlap
from .analyzer import get_statistical_report, generate_weekly_report, analyze_personal_patterns, generate_combined_recommendation
from .purchase_manager import check_purchases_for_draw
from .strategy_evolver import (
get_weights_with_trend, recalculate_weights,
generate_smart_recommendation,
)
from .routers import curator as curator_router
from .routers import briefing as briefing_router
app = FastAPI()
app.include_router(curator_router.router)
app.include_router(briefing_router.router)
# Background scheduler runs in the container's timezone (defaults to Asia/Seoul).
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
# Upstream JSON mirrors of the official lottery results (full history / latest draw).
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
LATEST_URL = os.getenv("LOTTO_LATEST_URL", "https://smok95.github.io/lotto/results/latest.json")
# ── In-memory cache for performance stats ────────────────────────────────────
# Scored data only changes when the scheduler runs (twice a day), so a simple
# module-level dict is sufficient. NOTE(review): not thread-safe — presumably
# fine for a single-process deployment; confirm worker count.
_PERF_CACHE: Dict[str, Any] = {"data": None, "at": 0.0}
_PERF_CACHE_TTL = 3600  # 1 hour fallback TTL in case the scheduler never fires
def _refresh_perf_cache() -> None:
    """Rebuild the in-memory performance-stats cache and stamp the refresh time."""
    _PERF_CACHE.update(data=get_recommendation_performance(), at=time.time())
    logger.info("성과 통계 캐시 갱신")
@app.on_event("startup")
def on_startup():
    """Initialize the DB, register the background jobs, and start the scheduler."""
    init_db()
    # 1. Sync winning numbers (daily at 09:10 and 21:10).
    #    When a new draw arrives, score pending recommendations right away.
    def _sync_and_check():
        res = sync_latest(LATEST_URL)
        if res["was_new"]:
            check_results_for_draw(res["drawNo"])
            _refresh_perf_cache()  # new scoring results → refresh the cache immediately
    scheduler.add_job(_sync_and_check, "cron", hour="9,21", minute=10)
    # 2. Monte-Carlo simulation (6x per day: 00, 04, 08, 12, 16, 20h).
    #    Generates 20,000 candidates → scores them → keeps top 100 → replaces best_picks.
    def _run_simulation_job():
        run_simulation(n_candidates=20000, top_k=100, best_n=20)
    scheduler.add_job(_run_simulation_job, "cron", hour="0,4,8,12,16,20", minute=5)
    # 3. Saturday 09:00 — auto-cache the strategy report for the next draw.
    def _save_weekly_report_job():
        import json as _json
        draws = get_all_draw_numbers()
        latest = get_latest_draw()
        if not draws or not latest:
            return
        target = latest["drw_no"] + 1  # the upcoming draw number
        report = generate_weekly_report(draws, target)
        save_weekly_report(target, _json.dumps(report, ensure_ascii=False))
        logger.info(f"{target}회차 리포트 저장 완료")
    scheduler.add_job(_save_weekly_report_job, "cron", day_of_week="sat", hour=9, minute=0)
    scheduler.start()
@app.get("/health")
def health():
    """Liveness probe: always reports OK."""
    status = {"ok": True}
    return status
@app.get("/api/lotto/latest")
def api_latest():
    """Most recent draw: numbers, bonus ball and derived metrics; 404 when no data."""
    row = get_latest_draw()
    if not row:
        raise HTTPException(status_code=404, detail="No data yet")
    numbers = [row[key] for key in ("n1", "n2", "n3", "n4", "n5", "n6")]
    return {
        "drawNo": row["drw_no"],
        "date": row["drw_date"],
        "numbers": numbers,
        "bonus": row["bonus"],
        "metrics": calc_metrics(numbers),
    }
@app.get("/api/lotto/{drw_no:int}")
def api_draw(drw_no: int):
    """
    One specific draw by number; 404 when unknown.

    NOTE(review): this endpoint returns the key "drwNo" while /api/lotto/latest
    returns "drawNo" — inconsistent, but changing either would break clients.
    """
    row = get_draw(drw_no)
    if not row:
        raise HTTPException(status_code=404, detail="Not found")
    numbers = [row[key] for key in ("n1", "n2", "n3", "n4", "n5", "n6")]
    return {
        "drwNo": row["drw_no"],
        "date": row["drw_date"],
        "numbers": numbers,
        "bonus": row["bonus"],
        "metrics": calc_metrics(numbers),
    }
@app.post("/api/admin/sync_latest")
def admin_sync_latest():
    """Manually pull the latest draw; score recommendations when a new one arrived."""
    outcome = sync_latest(LATEST_URL)
    if outcome["was_new"]:
        check_results_for_draw(outcome["drawNo"])
    return outcome
@app.post("/api/admin/auto_gen")
def admin_auto_gen(count: int = 10):
    """Legacy manual trigger: small simulation, reports how many picks were saved."""
    return {"generated": generate_smart_recommendations(count)}
@app.post("/api/admin/simulate")
def admin_simulate(n_candidates: int = 20000, top_k: int = 100, best_n: int = 20):
    """
    Manually trigger a Monte-Carlo simulation run.

    Same work as the scheduled background job, with the requested sizes
    clamped to safe ranges.
    """
    def _clamp(value: int, lo: int, hi: int) -> int:
        # Bound user input to the same limits the original enforced inline.
        return max(lo, min(value, hi))

    outcome = run_simulation(
        n_candidates=_clamp(n_candidates, 1000, 50000),
        top_k=_clamp(top_k, 10, 500),
        best_n=_clamp(best_n, 10, 50),
    )
    if "error" in outcome:
        raise HTTPException(status_code=500, detail=outcome["error"])
    return outcome
@app.get("/api/lotto/stats")
def api_stats():
    """
    Per-number frequency over all synced draws.

    NOTE(review): sync_ensure_all runs on every request — presumably cheap
    when already synced; confirm it short-circuits.
    """
    sync_ensure_all(LATEST_URL, ALL_URL)
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    tally = dict.fromkeys(range(1, 46), 0)
    for _, nums in draws:
        for n in nums:
            tally[n] += 1
    return {
        "total_draws": len(draws),
        "frequency": [{"number": n, "count": tally[n]} for n in range(1, 46)],
    }
# ── 추천 성과 통계 (Phase 1, 인메모리 캐시) ──────────────────────────────────
@app.get("/api/lotto/stats/performance")
def api_performance_stats():
    """
    Performance stats for scored recommendations, served from the in-memory cache.

    Refreshed right after new draws are scored, or lazily here on first
    access / TTL expiry.
    """
    stale = _PERF_CACHE["data"] is None or time.time() - _PERF_CACHE["at"] > _PERF_CACHE_TTL
    if stale:
        _refresh_perf_cache()
    return _PERF_CACHE["data"]
# ── 회차 공략 리포트 (Phase 1) ────────────────────────────────────────────────
@app.get("/api/lotto/report/latest")
def api_report_latest():
    """
    Strategy report for the upcoming draw (latest draw number + 1).

    Includes hot/cold/overdue numbers, recent-3-draw patterns, per-strategy
    picks and an AI confidence score (computed by generate_weekly_report).

    Raises:
        HTTPException 404: when no draw data has been synced yet.
    """
    draws = get_all_draw_numbers()
    latest = get_latest_draw()
    # Fix: previously only `draws` was checked; an absent latest row then
    # crashed on latest["drw_no"] (HTTP 500). Guard both and return a clean 404.
    if not draws or not latest:
        raise HTTPException(status_code=404, detail="No data yet")
    return generate_weekly_report(draws, latest["drw_no"] + 1)
@app.get("/api/lotto/report/history")
def api_report_history(limit: int = 10):
    """List stored weekly reports (auto-saved ones only); at most 52."""
    capped = min(limit, 52)
    return {"reports": get_weekly_report_list(limit=capped)}
@app.get("/api/lotto/report/{drw_no}")
def api_report_by_draw(drw_no: int):
    """
    Strategy report for a specific draw: cached copy when available,
    otherwise computed live from the draws before that draw.
    """
    cached = get_weekly_report(drw_no)
    if cached:
        return {**cached, "cached": True}
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    history = [(no, nums) for no, nums in draws if no < drw_no]
    if not history:
        raise HTTPException(status_code=400, detail=f"{drw_no}회차 이전 데이터가 없습니다")
    report = generate_weekly_report(history, drw_no)
    return {**report, "cached": False}
# ── 개인 패턴 분석 (Phase 2) ─────────────────────────────────────────────────
@app.get("/api/lotto/analysis/personal")
def api_personal_analysis():
    """
    Personal pattern analysis over the stored recommendation history:
    top-10 / never-picked numbers, odd/even ratio, sums, ranges, consecutive
    numbers, zone distribution, compared to historical winning averages.
    """
    picked_history = get_all_recommendation_numbers()
    winning_history = get_all_draw_numbers()
    return analyze_personal_patterns(picked_history, winning_history)
# ── 구매 이력 API (Phase 2) ───────────────────────────────────────────────────
class PurchaseCreate(BaseModel):
    # Request body for POST /api/lotto/purchase.
    draw_no: int                    # target draw number
    amount: int                     # total spend (KRW); <=0 → endpoint derives sets * 1000
    sets: int = 1                   # number of tickets; <=0 → derived from len(numbers)
    prize: int = 0                  # winnings, usually filled in later
    note: str = ""                  # free-form user note
    numbers: List[List[int]] = []   # one inner list per ticket (pydantic copies the default)
    is_real: bool = True            # real purchase vs virtual/simulated
    source_strategy: str = "manual" # which strategy produced the numbers
    source_detail: dict = {}        # strategy-specific metadata (pydantic copies the default)
class PurchaseUpdate(BaseModel):
    # Partial update for PUT /api/lotto/purchase/{id}. The endpoint applies
    # model_dump(exclude_none=True), so None fields are skipped — which also
    # means a value can never be cleared back to null through this model.
    draw_no: Optional[int] = None
    amount: Optional[int] = None
    sets: Optional[int] = None
    prize: Optional[int] = None
    note: Optional[str] = None
    numbers: Optional[List[List[int]]] = None
    is_real: Optional[bool] = None
    source_strategy: Optional[str] = None
@app.get("/api/lotto/purchase/stats")
def api_purchase_stats():
    """Investment/return statistics (total spend, total prizes, ROI, ...)."""
    stats = get_purchase_stats()
    return stats
@app.get("/api/lotto/purchase")
def api_purchase_list(draw_no: Optional[int] = None, days: Optional[int] = None,
                      is_real: Optional[bool] = None, strategy: Optional[str] = None):
    """Purchase history, optionally filtered by draw, recency, realness or strategy."""
    rows = get_purchases(draw_no=draw_no, days=days, is_real=is_real, strategy=strategy)
    return {"records": rows}
@app.post("/api/lotto/purchase", status_code=201)
def api_purchase_create(body: PurchaseCreate):
    """Record a purchase (real or virtual); derives sets/amount when not positive."""
    effective_sets = body.sets if body.sets > 0 else max(len(body.numbers), 1)
    effective_amount = body.amount if body.amount > 0 else effective_sets * 1000
    return add_purchase(
        draw_no=body.draw_no,
        amount=effective_amount,
        sets=effective_sets,
        prize=body.prize,
        note=body.note,
        numbers=body.numbers,
        is_real=body.is_real,
        source_strategy=body.source_strategy,
        source_detail=body.source_detail,
    )
@app.put("/api/lotto/purchase/{purchase_id}")
def api_purchase_update(purchase_id: int, body: PurchaseUpdate):
    """Update a purchase record (e.g. fill in the prize); 404 when missing."""
    changes = body.model_dump(exclude_none=True)
    updated = update_purchase(purchase_id, changes)
    if updated is None:
        raise HTTPException(status_code=404, detail="Purchase not found")
    return updated
@app.delete("/api/lotto/purchase/{purchase_id}")
def api_purchase_delete(purchase_id: int):
    """Delete a purchase record; 404 when the id does not exist."""
    removed = delete_purchase(purchase_id)
    if not removed:
        raise HTTPException(status_code=404, detail="Purchase not found")
    return {"ok": True}
# ── 전략 진화 API ──────────────────────────────────────────────────────────
@app.get("/api/lotto/strategy/weights")
def api_strategy_weights():
    """Current per-strategy weights plus performance summary and trend."""
    snapshot = get_weights_with_trend()
    return snapshot
@app.get("/api/lotto/strategy/performance")
def api_strategy_performance(strategy: Optional[str] = None, days: Optional[int] = None):
    """Per-draw performance history for strategies (chart data)."""
    records = db_get_strategy_performance(strategy=strategy, days=days)
    return {"records": records}
@app.post("/api/lotto/strategy/evolve")
def api_strategy_evolve():
    """Manually trigger a recalculation of the strategy weights."""
    recalculated = recalculate_weights()
    return {"ok": True, "weights": recalculated}
# ── 스마트 추천 API ────────────────────────────────────────────────────────
@app.get("/api/lotto/recommend/smart")
def api_recommend_smart(sets: int = 5):
    """Meta-strategy recommendation weighted by per-strategy performance (1-10 sets)."""
    clamped = min(max(sets, 1), 10)
    outcome = generate_smart_recommendation(sets=clamped)
    if "error" in outcome:
        raise HTTPException(status_code=500, detail=outcome["error"])
    return outcome
# ── 통계 분석 리포트 ────────────────────────────────────────────────────────
@app.get("/api/lotto/analysis")
def api_analysis():
    """
    Statistical report from the five analysis techniques:
    per-number frequency / Z-score / gap, hot-cold-overdue numbers, and the
    historical sum and odd/even distributions.
    """
    history = get_all_draw_numbers()
    if not history:
        raise HTTPException(status_code=404, detail="No data yet")
    return get_statistical_report(history)
# ── 시뮬레이션 best_picks (메인 추천 엔드포인트) ────────────────────────────
@app.get("/api/lotto/best")
def api_best_picks(limit: int = 20):
    """
    Best number combinations selected by the simulation (default 20 sets).

    Automatically refreshed six times a day by the scheduled simulation job.
    Each combination carries its score and derived metrics.

    Raises:
        HTTPException 404: when no simulation has been run yet.
    """
    limit = max(1, min(limit, 50))
    picks = get_best_picks(limit=limit)
    if not picks:
        raise HTTPException(
            status_code=404,
            detail="시뮬레이션 결과가 없습니다. /api/admin/simulate로 먼저 실행하세요.",
        )
    # Fix: removed an unused get_all_draw_numbers() call here — it performed a
    # full-history DB read on every request and its result was never used.
    result = []
    for p in picks:
        nums = p["numbers"]
        result.append({
            "rank": p["rank_in_run"],
            "numbers": nums,
            "score_total": p["score_total"],
            "based_on_draw": p["based_on_draw"],
            "simulation_run_id": p["source_run_id"],
            "created_at": p["created_at"],
            "metrics": calc_metrics(nums),
        })
    latest = get_latest_draw()
    return {
        "based_on_draw": latest["drw_no"] if latest else None,
        "count": len(result),
        "items": result,
    }
# ── 시뮬레이션 전체 결과 조회 (상세 API) ────────────────────────────────────
@app.get("/api/lotto/simulation")
def api_simulation(run_id: Optional[int] = None, runs_limit: int = 5):
    """
    Simulation run history plus top candidates.

    Without run_id: latest `runs_limit` runs plus the most recent run's
    candidates. With run_id: candidates of that run only.
    """
    runs = get_simulation_runs(limit=runs_limit)
    if not runs:
        raise HTTPException(status_code=404, detail="시뮬레이션 기록이 없습니다.")
    selected = runs[0]["id"] if run_id is None else run_id
    # Attach derived metrics to each candidate row.
    enriched = [
        {**cand, "metrics": calc_metrics(cand["numbers"])}
        for cand in get_simulation_candidates(selected, limit=100)
    ]
    return {
        "runs": runs,
        "selected_run_id": selected,
        "candidates_count": len(enriched),
        "candidates": enriched,
    }
# ── 종합 추론 추천 ───────────────────────────────────────────────────────────
@app.get("/api/lotto/recommend/combined")
def api_recommend_combined():
    """Combined-inference recommendation (5 statistical techniques); persists the result."""
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data")
    latest = get_latest_draw()
    result = generate_combined_recommendation(draws)
    if "error" in result:
        raise HTTPException(status_code=500, detail=result["error"])
    based = latest["drw_no"] if latest else None
    # Persist to history (deduplicated) and tag it for the history endpoint.
    saved = save_recommendation_dedup(based, result["final_numbers"], {"method": "combined"})
    if saved["saved"]:
        update_recommendation(saved["id"], tags=["종합추론"])
    return {
        **result,
        "id": saved["id"],
        "saved": saved["saved"],
        "deduped": saved["deduped"],
        "based_on_latest_draw": based,
    }
@app.get("/api/lotto/recommend/combined/history")
def api_combined_history(limit: int = 30):
    """History of combined-inference recommendations (tag-filtered)."""
    records = list_recommendations_ex(limit=limit, tag="종합추론", sort="id_desc")
    return {"items": records, "total": len(records)}
# ── 기존 수동 추천 API (하위 호환 유지) ─────────────────────────────────────
@app.get("/api/lotto/recommend")
def api_recommend(
    recent_window: int = 200,
    recent_weight: float = 2.0,
    avoid_recent_k: int = 5,
    sum_min: Optional[int] = None,
    sum_max: Optional[int] = None,
    odd_min: Optional[int] = None,
    odd_max: Optional[int] = None,
    range_min: Optional[int] = None,
    range_max: Optional[int] = None,
    max_overlap_latest: Optional[int] = None,
    max_try: int = 200,
):
    """
    Constraint-filtered statistical recommendation (legacy endpoint).

    Repeatedly samples from recommend_numbers (up to max_try attempts) until a
    set satisfies the optional sum/odd/range/overlap constraints. The first
    accepted set is persisted (deduplicated) and returned with its metrics.

    Raises:
        HTTPException 404: no draw data synced yet.
        HTTPException 400: constraints rejected every sampled set.
    """
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    latest = get_latest_draw()
    # Echoed back to the client and stored alongside the saved recommendation.
    params = {
        "recent_window": recent_window,
        "recent_weight": float(recent_weight),
        "avoid_recent_k": avoid_recent_k,
        "sum_min": sum_min,
        "sum_max": sum_max,
        "odd_min": odd_min,
        "odd_max": odd_max,
        "range_min": range_min,
        "range_max": range_max,
        "max_overlap_latest": max_overlap_latest,
        "max_try": int(max_try),
    }
    def _accept(nums: List[int]) -> bool:
        # Rejection-sampling filter: every constraint left at None is skipped.
        m = calc_metrics(nums)
        if sum_min is not None and m["sum"] < sum_min:
            return False
        if sum_max is not None and m["sum"] > sum_max:
            return False
        if odd_min is not None and m["odd"] < odd_min:
            return False
        if odd_max is not None and m["odd"] > odd_max:
            return False
        if range_min is not None and m["range"] < range_min:
            return False
        if range_max is not None and m["range"] > range_max:
            return False
        if max_overlap_latest is not None:
            ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k)
            if ov["repeats"] > max_overlap_latest:
                return False
        return True
    chosen = None
    explain = None
    tries = 0
    while tries < max_try:
        tries += 1
        result = recommend_numbers(
            draws,
            recent_window=recent_window,
            recent_weight=recent_weight,
            avoid_recent_k=avoid_recent_k,
        )
        nums = result["numbers"]
        if _accept(nums):
            chosen = nums
            explain = result["explain"]
            break
    if chosen is None:
        raise HTTPException(
            status_code=400,
            detail=f"Constraints too strict. No valid set found in max_try={max_try}.",
        )
    # Persist the accepted set keyed on the latest known draw (dedup-aware).
    saved = save_recommendation_dedup(
        latest["drw_no"] if latest else None,
        chosen,
        params,
    )
    metrics = calc_metrics(chosen)
    overlap = calc_recent_overlap(chosen, draws, last_k=avoid_recent_k)
    return {
        "id": saved["id"],
        "saved": saved["saved"],
        "deduped": saved["deduped"],
        "based_on_latest_draw": latest["drw_no"] if latest else None,
        "numbers": chosen,
        "explain": explain,
        "params": params,
        "metrics": metrics,
        "recent_overlap": overlap,
        "tries": tries,
    }
# ── 히트맵 기반 추천 (하위 호환 유지) ────────────────────────────────────────
@app.get("/api/lotto/recommend/heatmap")
def api_recommend_heatmap(
    heatmap_window: int = 20,
    heatmap_weight: float = 1.5,
    recent_window: int = 200,
    recent_weight: float = 2.0,
    avoid_recent_k: int = 5,
    sum_min: Optional[int] = None,
    sum_max: Optional[int] = None,
    odd_min: Optional[int] = None,
    odd_max: Optional[int] = None,
    range_min: Optional[int] = None,
    range_max: Optional[int] = None,
    max_overlap_latest: Optional[int] = None,
    max_try: int = 200,
):
    """
    Heatmap-weighted recommendation (legacy endpoint).

    Same rejection-sampling scheme as /api/lotto/recommend, but the sampler
    additionally boosts numbers that hit in the last 100 stored
    recommendations (see recommend_with_heatmap).

    Raises:
        HTTPException 404: no draw data synced yet.
        HTTPException 400: constraints rejected every sampled set.
    """
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    # Most recent stored recommendations feed the hit heatmap.
    past_recs = list_recommendations_ex(limit=100, sort="id_desc")
    latest = get_latest_draw()
    # Echoed back to the client and stored alongside the saved recommendation.
    params = {
        "heatmap_window": heatmap_window,
        "heatmap_weight": float(heatmap_weight),
        "recent_window": recent_window,
        "recent_weight": float(recent_weight),
        "avoid_recent_k": avoid_recent_k,
        "sum_min": sum_min,
        "sum_max": sum_max,
        "odd_min": odd_min,
        "odd_max": odd_max,
        "range_min": range_min,
        "range_max": range_max,
        "max_overlap_latest": max_overlap_latest,
        "max_try": int(max_try),
    }
    def _accept(nums: List[int]) -> bool:
        # Rejection-sampling filter: every constraint left at None is skipped.
        m = calc_metrics(nums)
        if sum_min is not None and m["sum"] < sum_min:
            return False
        if sum_max is not None and m["sum"] > sum_max:
            return False
        if odd_min is not None and m["odd"] < odd_min:
            return False
        if odd_max is not None and m["odd"] > odd_max:
            return False
        if range_min is not None and m["range"] < range_min:
            return False
        if range_max is not None and m["range"] > range_max:
            return False
        if max_overlap_latest is not None:
            ov = calc_recent_overlap(nums, draws, last_k=avoid_recent_k)
            if ov["repeats"] > max_overlap_latest:
                return False
        return True
    chosen = None
    explain = None
    tries = 0
    while tries < max_try:
        tries += 1
        result = recommend_with_heatmap(
            draws,
            past_recs,
            heatmap_window=heatmap_window,
            heatmap_weight=heatmap_weight,
            recent_window=recent_window,
            recent_weight=recent_weight,
            avoid_recent_k=avoid_recent_k,
        )
        nums = result["numbers"]
        if _accept(nums):
            chosen = nums
            explain = result["explain"]
            break
    if chosen is None:
        raise HTTPException(
            status_code=400,
            detail=f"Constraints too strict. No valid set found in max_try={max_try}.",
        )
    # Persist the accepted set keyed on the latest known draw (dedup-aware).
    saved = save_recommendation_dedup(
        latest["drw_no"] if latest else None,
        chosen,
        params,
    )
    metrics = calc_metrics(chosen)
    overlap = calc_recent_overlap(chosen, draws, last_k=avoid_recent_k)
    return {
        "id": saved["id"],
        "saved": saved["saved"],
        "deduped": saved["deduped"],
        "based_on_latest_draw": latest["drw_no"] if latest else None,
        "numbers": chosen,
        "explain": explain,
        "params": params,
        "metrics": metrics,
        "recent_overlap": overlap,
        "tries": tries,
    }
# ── 추천 이력 ────────────────────────────────────────────────────────────────
@app.get("/api/history")
def api_history(
    limit: int = 30,
    offset: int = 0,
    favorite: Optional[bool] = None,
    tag: Optional[str] = None,
    q: Optional[str] = None,
    sort: str = "id_desc",
):
    """Paginated recommendation history, enriched with metrics and recent-draw overlap."""
    records = list_recommendations_ex(
        limit=limit,
        offset=offset,
        favorite=favorite,
        tag=tag,
        q=q,
        sort=sort,
    )
    draws = get_all_draw_numbers()
    enriched = []
    for rec in records:
        nums = rec["numbers"]
        # Overlap window mirrors the avoid_recent_k each record was generated with.
        k = int(rec["params"].get("avoid_recent_k", 0) or 0)
        enriched.append({
            **rec,
            "metrics": calc_metrics(nums),
            "recent_overlap": calc_recent_overlap(nums, draws, last_k=k),
        })
    return {
        "items": enriched,
        "limit": limit,
        "offset": offset,
        "filters": {"favorite": favorite, "tag": tag, "q": q, "sort": sort},
    }
@app.delete("/api/history/{rec_id:int}")
def api_history_delete(rec_id: int):
    """Delete one saved recommendation; 404 when the id does not exist."""
    if not delete_recommendation(rec_id):
        raise HTTPException(status_code=404, detail="Not found")
    return {"deleted": True, "id": rec_id}
class HistoryUpdate(BaseModel):
    # Partial update for PATCH /api/history/{id}; None fields are left unchanged.
    favorite: Optional[bool] = None  # toggle favorite flag
    note: Optional[str] = None       # free-form user note
    tags: Optional[List[str]] = None # tag list passed through to update_recommendation
@app.patch("/api/history/{rec_id:int}")
def api_history_patch(rec_id: int, body: HistoryUpdate):
    """Partially update a saved recommendation (favorite / note / tags)."""
    changed = update_recommendation(rec_id, favorite=body.favorite, note=body.note, tags=body.tags)
    if not changed:
        raise HTTPException(status_code=404, detail="Not found or no changes")
    return {"updated": True, "id": rec_id}
# ── 배치 추천 (하위 호환 유지) ───────────────────────────────────────────────
def _batch_unique(draws, count: int, recent_window: int, recent_weight: float, avoid_recent_k: int, max_try: int = 200):
    """Collect up to `count` distinct recommendations, bounded by max_try attempts."""
    unique_items = []
    seen_keys = set()
    attempts = 0
    while len(unique_items) < count and attempts < max_try:
        attempts += 1
        rec = recommend_numbers(draws, recent_window=recent_window, recent_weight=recent_weight, avoid_recent_k=avoid_recent_k)
        key = tuple(sorted(rec["numbers"]))
        if key not in seen_keys:
            seen_keys.add(key)
            unique_items.append(rec)
    return unique_items
@app.get("/api/lotto/recommend/batch")
def api_recommend_batch(
    count: int = 5,
    recent_window: int = 200,
    recent_weight: float = 2.0,
    avoid_recent_k: int = 5,
):
    """Generate up to `count` distinct recommendations (preview only; nothing saved)."""
    count = max(1, min(count, 20))
    draws = get_all_draw_numbers()
    if not draws:
        raise HTTPException(status_code=404, detail="No data yet")
    latest = get_latest_draw()
    params = {
        "recent_window": recent_window,
        "recent_weight": float(recent_weight),
        "avoid_recent_k": avoid_recent_k,
        "count": count,
    }
    generated = _batch_unique(draws, count, recent_window, float(recent_weight), avoid_recent_k)
    items = [
        {
            "numbers": g["numbers"],
            "explain": g["explain"],
            "metrics": calc_metrics(g["numbers"]),
        }
        for g in generated
    ]
    return {
        "based_on_latest_draw": latest["drw_no"] if latest else None,
        "count": count,
        "items": items,
        "params": params,
    }
class BatchSave(BaseModel):
    # Payload for POST /api/lotto/recommend/batch: number sets plus the params used.
    items: List[List[int]]  # each inner list is one set of numbers (not validated here)
    params: dict            # generation parameters stored with every saved set
@app.post("/api/lotto/recommend/batch")
def api_recommend_batch_save(body: BatchSave):
    """Persist a batch of number sets; reports which ids were new vs deduplicated."""
    latest = get_latest_draw()
    based = latest["drw_no"] if latest else None
    created: List[int] = []
    deduped: List[int] = []
    for nums in body.items:
        saved = save_recommendation_dedup(based, nums, body.params)
        bucket = created if saved["saved"] else deduped
        bucket.append(saved["id"])
    return {"saved": True, "created_ids": created, "deduped_ids": deduped}
@app.get("/api/version")
def version():
    """Deployed application version (APP_VERSION env var, 'dev' fallback)."""
    app_version = os.getenv("APP_VERSION", "dev")
    return {"version": app_version}

View File

@@ -0,0 +1,116 @@
"""
구매 이력 관리 + 결과 체크 모듈.
- check_purchases_for_draw(): 특정 회차 구매 건들의 결과를 자동 체크
- 체커의 _calc_rank 재사용
- 결과 체크 후 strategy_performance 자동 갱신
"""
import logging
from .db import (
get_draw, get_purchases, update_purchase_results,
upsert_strategy_performance,
)
from .checker import _calc_rank
logger = logging.getLogger("lotto-backend")
# Prize (KRW) credited per rank when scoring purchases; unknown ranks map to 0.
# Ranks 1-2 are recorded as 0 — presumably because those prizes vary per draw
# and are entered manually later; TODO confirm intended behavior.
RANK_PRIZE = {1: 0, 2: 0, 3: 1_500_000, 4: 50_000, 5: 5_000}
def check_purchases_for_draw(drw_no: int) -> int:
    """
    Score all unchecked purchases for a draw against its winning numbers.

    Side effects: persists per-set results and total prize on each purchase,
    upserts per-strategy aggregate performance rows, and (best-effort) feeds
    the EMA strategy-weight evolution.

    Returns:
        Number of purchase records scored (0 when the draw is unknown).
    """
    win_row = get_draw(drw_no)
    if not win_row:
        return 0
    win_nums = [win_row["n1"], win_row["n2"], win_row["n3"],
                win_row["n4"], win_row["n5"], win_row["n6"]]
    bonus = win_row["bonus"]
    unchecked = get_purchases(draw_no=drw_no, checked=False)
    # Per-strategy aggregates accumulated across all purchases of this draw.
    strategy_agg = {}
    count = 0
    for purchase in unchecked:
        numbers_list = purchase["numbers"]
        if not numbers_list:
            continue  # purchases without stored number sets cannot be scored
        results = []
        for nums in numbers_list:
            rank, correct, has_bonus = _calc_rank(nums, win_nums, bonus)
            prize = RANK_PRIZE.get(rank, 0)
            results.append({
                "numbers": nums,
                "rank": rank,
                "correct": correct,
                "has_bonus": has_bonus,
                "prize": prize,
            })
        total_prize = sum(r["prize"] for r in results)
        update_purchase_results(purchase["id"], results, total_prize)
        strat = purchase["source_strategy"]
        if strat not in strategy_agg:
            strategy_agg[strat] = {
                "sets_count": 0,
                "total_correct": 0,
                "max_correct": 0,
                "prize_total": 0,
                "scores": [],
                "_results": [],  # raw results kept for the EMA feedback below
            }
        agg = strategy_agg[strat]
        agg["_results"].extend(results)
        for r in results:
            agg["sets_count"] += 1
            agg["total_correct"] += r["correct"]
            agg["max_correct"] = max(agg["max_correct"], r["correct"])
            agg["prize_total"] += r["prize"]
            agg["scores"].append(r["correct"] / 6.0)  # fraction of the 6 numbers hit
        count += 1
    for strat, agg in strategy_agg.items():
        avg_score = sum(agg["scores"]) / len(agg["scores"]) if agg["scores"] else 0.0
        upsert_strategy_performance(
            strategy=strat,
            draw_no=drw_no,
            sets_count=agg["sets_count"],
            total_correct=agg["total_correct"],
            max_correct=agg["max_correct"],
            prize_total=agg["prize_total"],
            avg_score=round(avg_score, 4),
        )
        # EMA feedback loop: evolve strategy weights. Best effort — a failure
        # here must not block scoring, hence the broad catch with debug log.
        try:
            from .strategy_evolver import evolve_after_check
            evolve_after_check(strat, drw_no, agg["_results"])
        except Exception:
            logger.debug(f"[purchase_manager] evolve_after_check 건너뜀: {strat}")
    logger.info(f"[purchase_manager] {drw_no}회차 구매 {count}건 체크 완료")
    return count
def get_recent_performance(limit: int = 3) -> list:
    """Summarize purchase results for the most recent draws; [] when none exist."""
    from . import db
    rows = db.get_purchases() or []
    per_draw: dict = {}
    for row in rows:
        draw = row.get("draw_no")
        if not draw:
            continue
        scored = row.get("results") or []
        best = max((int(r.get("correct") or 0) for r in scored), default=0)
        entry = per_draw.setdefault(draw, {"draw_no": draw, "purchased_sets": 0, "best_match": 0})
        entry["purchased_sets"] += int(row.get("sets") or 1)
        entry["best_match"] = max(entry["best_match"], best)
    ordered = sorted(per_draw.values(), key=lambda e: e["draw_no"], reverse=True)
    return ordered[:limit]

139
lotto/app/recommender.py Normal file
View File

@@ -0,0 +1,139 @@
import random
from collections import Counter
from typing import Dict, Any, List, Tuple
from .utils import weighted_sample_6
def recommend_numbers(
    draws: List[Tuple[int, List[int]]],
    *,
    recent_window: int = 200,
    recent_weight: float = 2.0,
    avoid_recent_k: int = 5,
    seed: int | None = None,
) -> Dict[str, Any]:
    """
    Lightweight statistics-driven recommendation.

    Weighted sampling over overall frequency plus boosted recent-window
    frequency; numbers that appeared in the last `avoid_recent_k` draws are
    dampened (not excluded).
    """
    if seed is not None:
        random.seed(seed)

    # Frequency over the full history.
    overall = Counter(n for _, nums in draws for n in nums)

    # Frequency over the trailing window only.
    window = draws[-recent_window:] if len(draws) >= recent_window else draws
    recent_counts = Counter(n for _, nums in window for n in nums)

    # Numbers drawn in the last k draws get a penalty.
    tail = draws[-avoid_recent_k:] if len(draws) >= avoid_recent_k else draws
    just_drawn = {n for _, nums in tail for n in nums}

    weights: Dict[int, float] = {}
    for n in range(1, 46):
        w = overall[n] + recent_weight * recent_counts[n]
        if n in just_drawn:
            w *= 0.6  # numbers that only just appeared are a bit less likely
        weights[n] = max(w, 0.1)

    # Draw 6 distinct numbers via weighted sampling.
    picked = sorted(weighted_sample_6(weights))
    explain = {
        "recent_window": recent_window,
        "recent_weight": recent_weight,
        "avoid_recent_k": avoid_recent_k,
        "top_all": [n for n, _ in overall.most_common(10)],
        "top_recent": [n for n, _ in recent_counts.most_common(10)],
        "last_k_draws": [d for d, _ in tail],
    }
    return {"numbers": picked, "explain": explain}
def recommend_with_heatmap(
    draws: List[Tuple[int, List[int]]],
    past_recommendations: List[Dict[str, Any]],
    *,
    heatmap_window: int = 10,
    heatmap_weight: float = 1.5,
    recent_window: int = 200,
    recent_weight: float = 2.0,
    avoid_recent_k: int = 5,
    seed: int | None = None,
) -> Dict[str, Any]:
    """
    Heatmap-weighted recommendation.

    Analyzes the hit rate of past recommendations and boosts numbers that
    previously hit, combined with the base statistical weighting used by
    recommend_numbers.

    Args:
        draws: actual winning numbers as [(draw_no, [numbers]), ...]
        past_recommendations: past picks as
            [{"numbers": [...], "correct_count": N, "based_on_draw": M}, ...]
        heatmap_window: how many recent recommendations feed the heatmap
        heatmap_weight: heatmap multiplier (higher → favor past hits more)
    """
    if seed is not None:
        random.seed(seed)
    # 1. Base statistical weights (overall + recent-window frequency).
    all_nums = [n for _, nums in draws for n in nums]
    freq_all = Counter(all_nums)
    recent = draws[-recent_window:] if len(draws) >= recent_window else draws
    recent_nums = [n for _, nums in recent for n in nums]
    freq_recent = Counter(recent_nums)
    last_k = draws[-avoid_recent_k:] if len(draws) >= avoid_recent_k else draws
    last_k_nums = set(n for _, nums in last_k for n in nums)
    # 2. Build the heatmap: frequency of numbers from past recommendations that hit.
    heatmap = Counter()
    recent_recs = past_recommendations[-heatmap_window:] if len(past_recommendations) >= heatmap_window else past_recommendations
    for rec in recent_recs:
        if rec.get("correct_count", 0) > 0:  # only recommendations that actually hit
            # Weight grows super-linearly with the hit count, so recommendations
            # that matched more numbers contribute disproportionately more.
            weight = rec["correct_count"] ** 1.5  # exponent 1.5 emphasizes better hits
            for num in rec["numbers"]:
                heatmap[num] += weight
    # 3. Final weight = base statistics + heatmap contribution.
    weights = {}
    for n in range(1, 46):
        w = freq_all[n] + recent_weight * freq_recent[n]
        # Heatmap bonus for previously-hitting numbers.
        if n in heatmap:
            w += heatmap_weight * heatmap[n]
        # Penalty for numbers drawn in the last k draws.
        if n in last_k_nums:
            w *= 0.6
        weights[n] = max(w, 0.1)
    # 4. Draw 6 distinct numbers via weighted sampling.
    chosen_sorted = sorted(weighted_sample_6(weights))
    # 5. Explanation payload returned alongside the numbers.
    explain = {
        "recent_window": recent_window,
        "recent_weight": recent_weight,
        "avoid_recent_k": avoid_recent_k,
        "heatmap_window": heatmap_window,
        "heatmap_weight": heatmap_weight,
        "top_all": [n for n, _ in freq_all.most_common(10)],
        "top_recent": [n for n, _ in freq_recent.most_common(10)],
        "top_heatmap": [n for n, _ in heatmap.most_common(10)],
        "last_k_draws": [d for d, _ in last_k],
        "analyzed_recommendations": len(recent_recs),
    }
    return {"numbers": chosen_sorted, "explain": explain}

View File

@@ -0,0 +1,5 @@
fastapi==0.115.6
uvicorn[standard]==0.30.6
requests==2.32.3
beautifulsoup4==4.12.3
APScheduler==3.10.4

View File

View File

@@ -0,0 +1,53 @@
"""브리핑 저장/조회 + 큐레이터 사용량 엔드포인트."""
from typing import Any, Dict, List
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from .. import db
router = APIRouter(prefix="/api/lotto")  # mounted by the main FastAPI app
class BriefingRequest(BaseModel):
    """Payload for POST /api/lotto/briefing — one curated briefing per draw."""
    draw_no: int                 # draw the briefing targets
    picks: List[Dict[str, Any]]  # recommended sets; structure defined by the caller
    narrative: Dict[str, Any]    # free-form narrative sections
    confidence: int = Field(ge=0, le=100)  # confidence score, clamped to 0-100
    model: str                   # identifier of the model that produced the briefing
    # Token/latency accounting for curator usage tracking.
    tokens_input: int = 0
    tokens_output: int = 0
    cache_read: int = 0
    cache_write: int = 0
    latency_ms: int = 0
    source: str = "auto"         # origin of the briefing; "auto" presumably means scheduled — TODO confirm
@router.post("/briefing", status_code=201)
def save_briefing(body: BriefingRequest):
    """Persist a briefing and echo its new id."""
    new_id = db.save_briefing(body.model_dump())
    return {"ok": True, "id": new_id}
@router.get("/briefing/latest")
def latest():
    """Most recently stored briefing; 404 when none exist."""
    briefing = db.get_latest_briefing()
    if not briefing:
        raise HTTPException(404, "no briefing yet")
    return briefing
@router.get("/briefing/{draw_no}")
def get_one(draw_no: int):
    """Briefing for one specific draw; 404 when missing."""
    briefing = db.get_briefing(draw_no)
    if not briefing:
        raise HTTPException(404, f"no briefing for draw {draw_no}")
    return briefing
@router.get("/briefing")
def history(limit: int = 10):
    """List stored briefings, at most `limit` entries."""
    stored = db.list_briefings(limit)
    return {"briefings": stored}
@router.get("/curator/usage")
def usage(days: int = 30):
    """Curator usage/cost stats over the last `days` days."""
    report = db.get_curator_usage(days)
    return report

View File

@@ -0,0 +1,24 @@
"""큐레이터 입력 엔드포인트 — agent-office에서만 호출."""
from fastapi import APIRouter
from ..curator_helpers import collect_candidates, build_context
from .. import db
router = APIRouter(prefix="/api/lotto/curator")
@router.get("/candidates")
def candidates(n: int = 20):
    """Candidate number sets for the upcoming draw, flavored by hot/cold context."""
    ctx = build_context()
    hot_numbers = set(ctx["hot_numbers"])
    cold_numbers = set(ctx["cold_numbers"])
    latest = db.get_latest_draw()
    upcoming = latest["drw_no"] + 1 if latest else 0
    return {"draw_no": upcoming, "candidates": collect_candidates(n, hot_numbers, cold_numbers)}
@router.get("/context")
def context():
    """Statistical context keyed to the upcoming draw number."""
    latest = db.get_latest_draw()
    upcoming = latest["drw_no"] + 1 if latest else 0
    return {"draw_no": upcoming, **build_context()}

View File

@@ -0,0 +1,277 @@
"""
전략 진화 엔진 — EMA + Softmax 기반 적응형 가중치 관리.
"""
import math
import json
import logging
from typing import Dict, List, Any
logger = logging.getLogger("lotto-backend")
# ── Constants (importable without DB) ─────────────────────────────────────────
ALPHA = 0.3 # EMA 감쇠율
TEMPERATURE = 2.0 # Softmax 온도
MIN_WEIGHT = 0.05 # 최소 가중치
INITIAL_EMA = 0.15 # 콜드스타트 초기값
MIN_DATA_DRAWS = 10 # 학습 최소 회차
STRATEGIES = ["combined", "simulation", "heatmap", "manual", "custom"]
RANK_BONUS = {5: 0.1, 4: 0.3, 3: 0.6, 2: 0.8, 1: 1.0}
# ── Pure functions (no DB dependency) ─────────────────────────────────────────
def calc_draw_score(results: List[Dict]) -> float:
"""구매 결과 리스트 → 평균 성과 점수"""
if not results:
return 0.0
scores = []
for r in results:
s = r.get("correct", 0) / 6.0
s += RANK_BONUS.get(r.get("rank", 0), 0)
scores.append(s)
return sum(scores) / len(scores)
def _softmax_weights(ema_scores: Dict[str, float]) -> Dict[str, float]:
"""EMA 점수 → Softmax → 최소 가중치 보장 → 정규화"""
raw = {s: math.exp(ema / TEMPERATURE) for s, ema in ema_scores.items()}
total = sum(raw.values())
weights = {s: v / total for s, v in raw.items()}
clamped = {}
surplus = 0.0
unclamped = []
for s, w in weights.items():
if w < MIN_WEIGHT:
clamped[s] = MIN_WEIGHT
surplus += MIN_WEIGHT - w
else:
unclamped.append(s)
clamped[s] = w
if surplus > 0 and unclamped:
unclamped_total = sum(clamped[s] for s in unclamped)
for s in unclamped:
clamped[s] -= surplus * (clamped[s] / unclamped_total)
final_total = sum(clamped.values())
return {s: round(v / final_total, 4) for s, v in clamped.items()}
# ── DB-dependent functions (use lazy imports) ─────────────────────────────────
def _db():
    """Lazy import to avoid circular/relative import issues in tests"""
    from . import db as _db_mod
    return _db_mod
def _recommender():
    # Lazy accessor for the recommender module (same rationale as _db).
    from . import recommender as _rec_mod
    return _rec_mod
def _analyzer():
    # Lazy accessor for the analyzer module (same rationale as _db).
    from . import analyzer as _ana_mod
    return _ana_mod
def update_ema_for_strategy(strategy: str, draw_score: float) -> float:
    """Compute (but do not persist) the new EMA score for *strategy*.

    Looks up the strategy's stored EMA — falling back to INITIAL_EMA for an
    unseen strategy — and blends in draw_score with decay factor ALPHA.
    """
    rows = _db().get_strategy_weights()
    prev_ema = INITIAL_EMA
    for row in rows:
        if row["strategy"] == strategy:
            prev_ema = row["ema_score"]
            break
    return ALPHA * draw_score + (1 - ALPHA) * prev_ema
def recalculate_weights() -> Dict[str, float]:
    """Recompute softmax weights from stored EMA scores and persist them.

    Strategies missing from the DB are seeded with INITIAL_EMA so every
    entry in STRATEGIES always receives a weight. Returns the freshly
    persisted weight map.
    """
    db = _db()
    stored = db.get_strategy_weights()
    by_name = {row["strategy"]: row for row in stored}
    ema_scores = {name: row["ema_score"] for name, row in by_name.items()}
    for name in STRATEGIES:
        ema_scores.setdefault(name, INITIAL_EMA)
    new_weights = _softmax_weights(ema_scores)
    for name, weight in new_weights.items():
        row = by_name.get(name)
        db.update_strategy_weight(
            strategy=name,
            weight=weight,
            ema_score=ema_scores[name],
            total_sets=row["total_sets"] if row else 0,
            total_hits_3plus=row["total_hits_3plus"] if row else 0,
        )
    logger.info(f"[strategy_evolver] 가중치 재계산: {new_weights}")
    return new_weights
def evolve_after_check(strategy: str, draw_no: int, results: List[Dict]) -> None:
    """Record one draw's purchase results for *strategy*, then refresh weights.

    Persists the updated EMA plus cumulative set/3+-hit counters for the
    strategy before triggering a global weight recalculation. Note that
    draw_no is accepted for interface symmetry but is not used here.
    """
    db = _db()
    score = calc_draw_score(results)
    new_ema = update_ema_for_strategy(strategy, score)
    rows = db.get_strategy_weights()
    row = next((r for r in rows if r["strategy"] == strategy), None)
    good_hits = sum(1 for r in results if r.get("correct", 0) >= 3)
    prev_sets = row["total_sets"] if row else 0
    prev_hits = row["total_hits_3plus"] if row else 0
    db.update_strategy_weight(
        strategy=strategy,
        weight=row["weight"] if row else 0.2,
        ema_score=new_ema,
        total_sets=prev_sets + len(results),
        total_hits_3plus=prev_hits + good_hits,
    )
    recalculate_weights()
def get_weights_with_trend() -> Dict[str, Any]:
    """Return current strategy weights annotated with a recent-performance trend.

    Once a strategy has at least 5 performance rows, the trend compares the
    mean avg_score of the last 3 rows against rows -5..-3 (the windows
    overlap at index -3 — presumably intentional smoothing; verify).
    Otherwise the trend is reported as "stable".
    """
    db = _db()
    weight_rows = db.get_strategy_weights()
    perf_rows = db.get_strategy_performance()

    # Group performance rows by strategy, preserving DB order.
    perf_by_strategy: Dict[str, list] = {}
    for perf in perf_rows:
        perf_by_strategy.setdefault(perf["strategy"], []).append(perf)

    annotated = []
    for row in weight_rows:
        history = perf_by_strategy.get(row["strategy"], [])
        if len(history) >= 5:
            recent_avg = sum(p["avg_score"] for p in history[-3:]) / 3
            older_avg = sum(p["avg_score"] for p in history[-5:-2]) / 3
            delta = recent_avg - older_avg
            if delta > 0.02:
                trend = "up"
            elif delta < -0.02:
                trend = "down"
            else:
                trend = "stable"
        else:
            trend = "stable"
        annotated.append({
            "strategy": row["strategy"],
            "weight": row["weight"],
            "ema_score": row["ema_score"],
            "total_sets": row["total_sets"],
            "hits_3plus": row["total_hits_3plus"],
            "trend": trend,
        })

    # Learning progress = number of distinct draws with recorded performance.
    distinct_draws = {p["draw_no"] for p in perf_rows}
    learned = len(distinct_draws)
    return {
        "weights": annotated,
        "last_evolved": weight_rows[0]["updated_at"] if weight_rows else None,
        "min_data_draws": MIN_DATA_DRAWS,
        "current_data_draws": learned,
        "status": "active" if learned >= MIN_DATA_DRAWS else "learning",
    }
def generate_smart_recommendation(sets: int = 5) -> Dict[str, Any]:
    """Generate *sets* recommendation sets by pooling candidates from every
    strategy and ranking them with the learned strategy weights.

    Candidates come from four sources (combined / simulation / heatmap /
    manual). Each candidate is scored with the analyzer, weighted by its
    source strategy's current weight (meta_score), deduplicated, and the
    top *sets* combinations are returned along with learning status.
    """
    db = _db()
    rec = _recommender()
    ana = _analyzer()
    weights_data = db.get_strategy_weights()
    weight_map = {w["strategy"]: w["weight"] for w in weights_data}
    draws = db.get_all_draw_numbers()
    if not draws:
        return {"error": "No draw data"}
    latest = db.get_latest_draw()
    # Build the analysis cache once and reuse it for every candidate score.
    cache = ana.build_analysis_cache(draws)
    past_recs = db.list_recommendations_ex(limit=100, sort="id_desc")
    candidates = []
    seen_keys = set()  # dedupes identical combinations across strategies
    def _add_candidate(nums: list, strategy: str, raw_score: float = None):
        # raw_score=None means "score it now with the shared cache";
        # a provided value (e.g. from stored best_picks) is trusted as-is.
        key = tuple(sorted(nums))
        if key in seen_keys:
            return
        seen_keys.add(key)
        if raw_score is None:
            sc = ana.score_combination(nums, cache)
            raw_score = sc["score_total"]
        # meta score = analyzer score x the source strategy's learned weight
        # (strategies missing from the weight table fall back to 0.1).
        meta = raw_score * weight_map.get(strategy, 0.1)
        candidates.append({
            "numbers": sorted(nums),
            "raw_score": round(raw_score, 4),
            "strategy": strategy,
            "meta_score": round(meta, 4),
        })
    # combined strategy: 10 candidate sets (best-effort; generator may raise)
    for _ in range(10):
        try:
            r = ana.generate_combined_recommendation(draws)
            if "final_numbers" in r:
                _add_candidate(r["final_numbers"], "combined")
        except Exception:
            pass
    # simulation strategy: top 10 stored best picks
    best = db.get_best_picks(limit=10)
    for b in best:
        nums = json.loads(b["numbers"]) if isinstance(b["numbers"], str) else b["numbers"]
        _add_candidate(nums, "simulation", b.get("score_total"))
    # heatmap strategy: 10 candidate sets
    for _ in range(10):
        try:
            r = rec.recommend_with_heatmap(draws, past_recs)
            _add_candidate(r["numbers"], "heatmap")
        except Exception:
            pass
    # manual strategy: 10 candidate sets
    for _ in range(10):
        try:
            r = rec.recommend_numbers(draws)
            _add_candidate(r["numbers"], "manual")
        except Exception:
            pass
    # Rank by weighted meta score (highest first) and keep the top *sets*.
    candidates.sort(key=lambda c: -c["meta_score"])
    top = candidates[:sets]
    result_sets = []
    for c in top:
        sc = ana.score_combination(c["numbers"], cache)
        # NOTE(review): every strategy's contribution uses the same
        # score_total, so after normalization this reduces to the weight
        # map itself — confirm whether per-strategy scores were intended.
        contributions = {}
        for strat in STRATEGIES:
            contributions[strat] = round(weight_map.get(strat, 0) * sc["score_total"], 4)
        contrib_total = sum(contributions.values()) or 1
        contributions = {s: round(v / contrib_total, 3) for s, v in contributions.items()}
        result_sets.append({
            "numbers": c["numbers"],
            "meta_score": c["meta_score"],
            "source_strategy": c["strategy"],
            "contribution": contributions,
            "individual_scores": {k: round(v, 4) for k, v in sc.items()},
        })
    # Learning status: number of distinct draws with recorded performance.
    perfs = db.get_strategy_performance()
    data_draws = len(set(p["draw_no"] for p in perfs))
    status = "active" if data_draws >= MIN_DATA_DRAWS else "learning"
    return {
        "sets": result_sets,
        "strategy_weights_used": weight_map,
        "learning_status": {
            "draws_learned": data_draws,
            "status": status,
            "message": "" if status == "active" else f"{MIN_DATA_DRAWS}회차 이상 데이터 필요 (현재 {data_draws}회차)",
        },
        "based_on_latest_draw": latest["drw_no"] if latest else None,
    }

80
lotto/app/utils.py Normal file
View File

@@ -0,0 +1,80 @@
import random
from typing import List, Dict, Any, Tuple
def weighted_sample_6(weights: Dict[int, float]) -> List[int]:
    """Draw 6 distinct numbers from 1..45 by weighted sampling without replacement.

    Args:
        weights: mapping {1: w1, 2: w2, ..., 45: w45}; weights need not sum to 1.

    Returns:
        List of 6 distinct numbers in the order they were drawn (not sorted).

    Uses cumulative roulette-wheel selection. Floating-point rounding can
    leave the accumulated sum fractionally below the target r so the inner
    loop never breaks; the original could then silently return fewer than
    6 numbers. The for/else fallback picks the last remaining candidate in
    that case so exactly 6 numbers are always returned.
    """
    pool = list(range(1, 46))
    chosen: List[int] = []
    for _ in range(6):
        total = sum(weights[n] for n in pool)
        r = random.random() * total
        acc = 0.0
        for n in pool:
            acc += weights[n]
            if acc >= r:
                chosen.append(n)
                pool.remove(n)
                break
        else:
            # Float-rounding fallback: acc ended just below r — take the
            # final candidate instead of dropping a pick.
            chosen.append(pool.pop())
    return chosen
def calc_metrics(numbers: List[int]) -> Dict[str, Any]:
    """Summary statistics for a number combination.

    Returns the sum, odd/even counts, min/max/range, and per-zone bucket
    counts (1-10, 11-20, 21-30, 31-40, 41-45).
    """
    ordered = sorted(numbers)
    odd_count = sum(1 for x in ordered if x % 2 == 1)
    lowest, highest = ordered[0], ordered[-1]
    buckets = dict.fromkeys(["1-10", "11-20", "21-30", "31-40", "41-45"], 0)
    for x in ordered:
        if 1 <= x <= 10:
            buckets["1-10"] += 1
        elif 11 <= x <= 20:
            buckets["11-20"] += 1
        elif 21 <= x <= 30:
            buckets["21-30"] += 1
        elif 31 <= x <= 40:
            buckets["31-40"] += 1
        else:
            buckets["41-45"] += 1
    return {
        "sum": sum(ordered),
        "odd": odd_count,
        "even": len(ordered) - odd_count,
        "min": lowest,
        "max": highest,
        "range": highest - lowest,
        "buckets": buckets,
    }
def calc_recent_overlap(numbers: List[int], draws: List[Tuple[int, List[int]]], last_k: int) -> Dict[str, Any]:
    """Overlap between *numbers* and everything drawn in the last *last_k* draws.

    Args:
        numbers: candidate combination.
        draws: [(drw_no, [n1..n6]), ...] in ascending draw order.
        last_k: how many most-recent draws to compare against; <= 0 means none.

    Returns:
        Dict with the effective window size ("last_k"), the repeat count,
        and the sorted list of repeated numbers.
    """
    if last_k <= 0:
        return {"last_k": 0, "repeats": 0, "repeated_numbers": []}
    window = draws if len(draws) < last_k else draws[-last_k:]
    seen: set = set()
    for _, drawn in window:
        seen.update(drawn)
    overlap = sorted(set(numbers) & seen)
    return {
        "last_k": len(window),
        "repeats": len(overlap),
        "repeated_numbers": overlap,
    }