feat(lotto): grade_weekly_review 통합 잡 — 큐레이터 자기평가 + 패턴 갭

This commit is contained in:
2026-05-11 08:33:51 +09:00
parent d972ea66c3
commit 30bc627ae7
2 changed files with 214 additions and 0 deletions

View File

@@ -0,0 +1,154 @@
"""주간 회고 채점 통합 잡 — 일요일 03:00 KST 실행.
1) 기존 purchase_manager.check_purchases_for_draw() 로 사용자 구매 자동 채점
2) 큐레이터 4계층 picks vs 추첨 결과 비교
3) 패턴 요약·갭 계산
4) weekly_review UPSERT
5) 4등 이상 발견 시 agent-office webhook 호출
"""
import json
import logging
import os
from typing import Optional
import httpx
from .. import db
from ..purchase_manager import check_purchases_for_draw
from .grading_helpers import (
score_picks_against_draw,
summarize_pattern,
aggregate_pattern_summaries,
compute_pattern_delta,
)
logger = logging.getLogger("lotto-backend")
AGENT_OFFICE_URL = os.environ.get("AGENT_OFFICE_URL", "http://agent-office:8000")
def _flatten_curator_picks(briefing: dict) -> list:
"""4계층 picks 를 모두 합쳐 단일 리스트(score 계산용)."""
picks = briefing.get("picks") or {}
if isinstance(picks, list):
return picks
out = []
for tier in ("core", "bonus", "extended", "pool"):
out.extend(picks.get(tier) or [])
return out
def _curator_score(briefing: dict, win_nums: list, bonus: int) -> dict:
if not briefing:
return {}
flat = _flatten_curator_picks(briefing)
if not flat:
return {}
return score_picks_against_draw(flat, win_nums, bonus)
def _user_score(drw_no: int, win_nums: list) -> dict:
purchases = db.get_purchases(draw_no=drw_no)
if not purchases:
return {}
matches = []
win_set = set(win_nums)
pattern_summaries = []
for p in purchases:
for nums in (p.get("numbers") or []):
if not nums:
continue
m = len(set(nums) & win_set)
matches.append(m)
pattern_summaries.append(summarize_pattern(nums))
if not matches:
return {}
return {
"avg_match": round(sum(matches) / len(matches), 2),
"best_match": max(matches),
"five_plus_prizes": sum(1 for m in matches if m >= 3),
"pattern_avg": aggregate_pattern_summaries(pattern_summaries),
}
def _trigger_prize_alert(drw_no: int, match_count: int, numbers: list, purchase_id: int) -> None:
try:
with httpx.Client(timeout=10) as client:
client.post(
f"{AGENT_OFFICE_URL}/api/agent-office/notify/lotto-prize",
json={
"draw_no": drw_no,
"match_count": match_count,
"numbers": numbers,
"purchase_id": purchase_id,
},
)
except Exception as e:
logger.warning(f"[grade_weekly_review] prize alert webhook failed: {e}")
def run_weekly_grading(drw_no: int) -> dict:
"""주어진 회차에 대해 채점 잡 1회 실행. 멱등."""
draw = db.get_draw(drw_no)
if not draw:
logger.warning(f"[grade_weekly_review] draw {drw_no} not found, skip")
return {"ok": False, "reason": "no draw"}
win_nums = [draw["n1"], draw["n2"], draw["n3"], draw["n4"], draw["n5"], draw["n6"]]
bonus = draw["bonus"]
# 1) 사용자 구매 자동 채점 (기존 인프라)
try:
check_purchases_for_draw(drw_no)
except Exception as e:
logger.warning(f"[grade_weekly_review] check_purchases_for_draw failed: {e}")
# 2) 4등 이상 발견 시 webhook
purchases = db.get_purchases(draw_no=drw_no, checked=True)
for p in purchases:
for r in (p.get("results") or []):
if r.get("correct", 0) >= 4:
_trigger_prize_alert(drw_no, r["correct"], r["numbers"], p["id"])
# 3) 큐레이터 자기 평가
briefing = db.get_briefing(drw_no)
cur = _curator_score(briefing, win_nums, bonus)
# 4) 사용자 평가 (재로드, 구매가 다 채점된 후 패턴 계산)
usr = _user_score(drw_no, win_nums)
# 5) 추첨 패턴 요약 + 델타
draw_summary = summarize_pattern(win_nums)
draw_pattern = {
"low_avg": draw_summary["low_count"],
"odd_avg": draw_summary["odd_count"],
"sum_avg": draw_summary["sum"],
}
user_pattern = usr.get("pattern_avg", {})
delta = compute_pattern_delta(user_pattern, draw_pattern) if user_pattern else ""
# 6) UPSERT
payload = {
"draw_no": drw_no,
"curator_avg_match": cur.get("avg_match"),
"curator_best_tier": cur.get("best_tier"),
"curator_best_match": cur.get("best_match"),
"curator_5plus_prizes": cur.get("five_plus_prizes"),
"user_avg_match": usr.get("avg_match"),
"user_best_match": usr.get("best_match"),
"user_5plus_prizes": usr.get("five_plus_prizes"),
"user_pattern_summary": json.dumps(user_pattern, ensure_ascii=False) if user_pattern else None,
"draw_pattern_summary": json.dumps(draw_pattern, ensure_ascii=False),
"pattern_delta": delta,
}
rid = db.save_review(payload)
logger.info(f"[grade_weekly_review] saved review id={rid} for draw {drw_no}")
return {"ok": True, "review_id": rid}
def run_for_latest() -> dict:
"""가장 최근 sync된 추첨 회차로 채점 — cron 진입점."""
latest = db.get_latest_draw()
if not latest:
return {"ok": False, "reason": "no draws"}
return run_weekly_grading(latest["drw_no"])