"""주간 회고 채점 통합 잡 — 일요일 03:00 KST 실행. 1) 기존 purchase_manager.check_purchases_for_draw() 로 사용자 구매 자동 채점 2) 큐레이터 4계층 picks vs 추첨 결과 비교 3) 패턴 요약·갭 계산 4) weekly_review UPSERT 5) 4등 이상 발견 시 agent-office webhook 호출 """ import json import logging import os from typing import Optional import httpx from .. import db from ..purchase_manager import check_purchases_for_draw from .grading_helpers import ( score_picks_against_draw, summarize_pattern, aggregate_pattern_summaries, compute_pattern_delta, ) logger = logging.getLogger("lotto-backend") AGENT_OFFICE_URL = os.environ.get("AGENT_OFFICE_URL", "http://agent-office:8000") def _flatten_curator_picks(briefing: dict) -> list: """4계층 picks 를 모두 합쳐 단일 리스트(score 계산용).""" picks = briefing.get("picks") or {} if isinstance(picks, list): return picks out = [] for tier in ("core", "bonus", "extended", "pool"): out.extend(picks.get(tier) or []) return out def _curator_score(briefing: dict, win_nums: list, bonus: int) -> dict: if not briefing: return {} flat = _flatten_curator_picks(briefing) if not flat: return {} return score_picks_against_draw(flat, win_nums, bonus) def _user_score(drw_no: int, win_nums: list) -> dict: purchases = db.get_purchases(draw_no=drw_no) if not purchases: return {} matches = [] win_set = set(win_nums) pattern_summaries = [] for p in purchases: for nums in (p.get("numbers") or []): if not nums: continue m = len(set(nums) & win_set) matches.append(m) pattern_summaries.append(summarize_pattern(nums)) if not matches: return {} return { "avg_match": round(sum(matches) / len(matches), 2), "best_match": max(matches), "five_plus_prizes": sum(1 for m in matches if m >= 3), "pattern_avg": aggregate_pattern_summaries(pattern_summaries), } def _trigger_prize_alert(drw_no: int, match_count: int, numbers: list, purchase_id: int) -> None: try: with httpx.Client(timeout=10) as client: client.post( f"{AGENT_OFFICE_URL}/api/agent-office/notify/lotto-prize", json={ "draw_no": drw_no, "match_count": match_count, "numbers": numbers, "purchase_id": purchase_id, }, ) except Exception as e: logger.warning(f"[grade_weekly_review] prize alert webhook failed: {e}") def run_weekly_grading(drw_no: int) -> dict: """주어진 회차에 대해 채점 잡 1회 실행. 멱등.""" draw = db.get_draw(drw_no) if not draw: logger.warning(f"[grade_weekly_review] draw {drw_no} not found, skip") return {"ok": False, "reason": "no draw"} win_nums = [draw["n1"], draw["n2"], draw["n3"], draw["n4"], draw["n5"], draw["n6"]] bonus = draw["bonus"] # 1) 사용자 구매 자동 채점 (기존 인프라) try: check_purchases_for_draw(drw_no) except Exception as e: logger.warning(f"[grade_weekly_review] check_purchases_for_draw failed: {e}") # 2) 4등 이상 발견 시 webhook purchases = db.get_purchases(draw_no=drw_no, checked=True) for p in purchases: for r in (p.get("results") or []): if r.get("correct", 0) >= 4: _trigger_prize_alert(drw_no, r["correct"], r["numbers"], p["id"]) # 3) 큐레이터 자기 평가 briefing = db.get_briefing(drw_no) cur = _curator_score(briefing, win_nums, bonus) # 4) 사용자 평가 (재로드, 구매가 다 채점된 후 패턴 계산) usr = _user_score(drw_no, win_nums) # 5) 추첨 패턴 요약 + 델타 draw_summary = summarize_pattern(win_nums) draw_pattern = { "low_avg": draw_summary["low_count"], "odd_avg": draw_summary["odd_count"], "sum_avg": draw_summary["sum"], } user_pattern = usr.get("pattern_avg", {}) delta = compute_pattern_delta(user_pattern, draw_pattern) if user_pattern else "" # 6) UPSERT payload = { "draw_no": drw_no, "curator_avg_match": cur.get("avg_match"), "curator_best_tier": cur.get("best_tier"), "curator_best_match": cur.get("best_match"), "curator_5plus_prizes": cur.get("five_plus_prizes"), "user_avg_match": usr.get("avg_match"), "user_best_match": usr.get("best_match"), "user_5plus_prizes": usr.get("five_plus_prizes"), "user_pattern_summary": json.dumps(user_pattern, ensure_ascii=False) if user_pattern else None, "draw_pattern_summary": json.dumps(draw_pattern, ensure_ascii=False), "pattern_delta": delta, } rid = db.save_review(payload) logger.info(f"[grade_weekly_review] saved review id={rid} for draw {drw_no}") return {"ok": True, "review_id": rid} def run_for_latest() -> dict: """가장 최근 sync된 추첨 회차로 채점 — cron 진입점.""" latest = db.get_latest_draw() if not latest: return {"ok": False, "reason": "no draws"} return run_weekly_grading(latest["drw_no"])