17 Commits

Author SHA1 Message Date
134e628e5e Merge feature/lotto-curator-evolution: Lotto Curator Evolution
16 commits across Phase A-E + H:
- weekly_review 테이블 + grade_weekly_review 잡 (일 03:00 KST)
- review/bulk/briefing 4계층 라우터
- 큐레이터 4계층 스키마 + retrospective + N=30
- 텔레그램 큐레이션·당첨 알림 + lotto_agent 월 09:00 KST
- 1주차 운영 점검 체크리스트

자세한 컨셉/계획: web-ui/docs/superpowers/{specs,plans}/2026-05-11-*.md
2026-05-11 09:38:31 +09:00
ce3a734e81 docs(lotto): 1주차 운영 점검 체크리스트 2026-05-11 09:08:05 +09:00
fb81c51dc8 feat(curator): 큐레이션 후 텔레그램 자동 푸시 + cron 09:00 변경 2026-05-11 08:55:12 +09:00
715e1598ce feat(agent-office): /api/agent-office/notify/lotto-prize 웹훅 2026-05-11 08:54:19 +09:00
57a4a72ff1 feat(curator): 텔레그램 큐레이션·당첨 알림 포맷터 2026-05-11 08:53:10 +09:00
e14278ec69 feat(curator): pipeline 4계층 직렬화 + retrospective 컨텍스트 + N=30 2026-05-11 08:51:07 +09:00
ff3134b838 feat(curator): build_retrospective + lotto review service proxy 2026-05-11 08:49:58 +09:00
95c5dc4217 feat(curator): SYSTEM_PROMPT 회고 + 4계층 규칙 2026-05-11 08:48:06 +09:00
9fb1c37eae feat(curator): 4계층 picks + tier_rationale + narrative.retrospective 스키마 2026-05-11 08:46:50 +09:00
3bd819b5e2 feat(lotto): briefing API 4계층 picks + tier_rationale 수용 2026-05-11 08:45:21 +09:00
b936233e7c feat(lotto): POST /api/lotto/purchase/bulk — 결정카드 원클릭 기록 2026-05-11 08:42:27 +09:00
4f85496fe5 feat(lotto): review 라우터 — latest/history/by-draw 2026-05-11 08:39:01 +09:00
2a2209a86c feat(lotto): 일 03:00 KST 채점 잡 APScheduler 등록 2026-05-11 08:37:08 +09:00
30bc627ae7 feat(lotto): grade_weekly_review 통합 잡 — 큐레이터 자기평가 + 패턴 갭 2026-05-11 08:33:51 +09:00
d972ea66c3 feat(lotto): 채점 보조 함수 — 일치 수·패턴 요약·델타 2026-05-11 08:29:46 +09:00
66165ebb88 feat(lotto): lotto_briefings.picks 4계층 객체로 마이그레이션 + tier_rationale 컬럼 2026-05-11 08:25:23 +09:00
5621cc7687 feat(lotto): weekly_review 테이블 + CRUD 헬퍼 2026-05-11 08:21:44 +09:00
27 changed files with 1123 additions and 93 deletions

View File

@@ -27,11 +27,21 @@ class LottoAgent(BaseAgent):
await self.transition("working", "후보 수집 및 AI 큐레이션 중...", task_id) await self.transition("working", "후보 수집 및 AI 큐레이션 중...", task_id)
try: try:
result = await curate_weekly(source=source) result = await curate_weekly(source=source)
update_task_status(task_id, "succeeded", result_data=result) update_task_status(task_id, "succeeded", result_data={
k: v for k, v in result.items() if k != "payload"
})
await self.transition("reporting", f"#{result['draw_no']} 브리핑 저장 완료") await self.transition("reporting", f"#{result['draw_no']} 브리핑 저장 완료")
add_log(self.agent_id, f"큐레이션 완료: #{result['draw_no']} conf={result['confidence']}", task_id=task_id) add_log(self.agent_id, f"큐레이션 완료: #{result['draw_no']} conf={result['confidence']}", task_id=task_id)
# 텔레그램 헤드라인 푸시 (실패해도 큐레이션은 성공으로 마감)
try:
from ..notifiers.telegram_lotto import send_curator_briefing
await send_curator_briefing(result["payload"])
except Exception as e:
add_log(self.agent_id, f"텔레그램 알림 실패: {e}", level="warning", task_id=task_id)
await self.transition("idle", "대기 중") await self.transition("idle", "대기 중")
return {"ok": True, **result} return {"ok": True, **{k: v for k, v in result.items() if k != "payload"}}
except CuratorError as e: except CuratorError as e:
update_task_status(task_id, "failed", result_data={"error": str(e)}) update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log(self.agent_id, f"큐레이션 실패: {e}", level="error", task_id=task_id) add_log(self.agent_id, f"큐레이션 실패: {e}", level="error", task_id=task_id)

View File

@@ -9,6 +9,7 @@ from ..config import ANTHROPIC_API_KEY, LOTTO_CURATOR_MODEL
from .. import service_proxy from .. import service_proxy
from .prompt import SYSTEM_PROMPT, build_user_message from .prompt import SYSTEM_PROMPT, build_user_message
from .schema import validate_response from .schema import validate_response
from .retrospective import build_retrospective
API_URL = "https://api.anthropic.com/v1/messages" API_URL = "https://api.anthropic.com/v1/messages"
@@ -36,12 +37,12 @@ async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict]:
user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마로 다시 응답.\n\n{user_text}" user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마로 다시 응답.\n\n{user_text}"
payload = { payload = {
"model": LOTTO_CURATOR_MODEL, "model": LOTTO_CURATOR_MODEL,
"max_tokens": 4096, "max_tokens": 8192, # 4계층 20세트 + narrative + retrospective 수용
"system": system_blocks, "system": system_blocks,
"messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}], "messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
} }
started = time.monotonic() started = time.monotonic()
async with httpx.AsyncClient(timeout=120) as client: async with httpx.AsyncClient(timeout=180) as client: # 큰 응답 → 시간 여유
r = await client.post(API_URL, headers=headers, json=payload) r = await client.post(API_URL, headers=headers, json=payload)
r.raise_for_status() r.raise_for_status()
resp = r.json() resp = r.json()
@@ -68,16 +69,19 @@ async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict]:
async def curate_weekly(source: str = "auto") -> Dict[str, Any]: async def curate_weekly(source: str = "auto") -> Dict[str, Any]:
cand_resp = await service_proxy.lotto_candidates(n=20) cand_resp = await service_proxy.lotto_candidates(n=30) # ← 30 으로 확장
draw_no = cand_resp["draw_no"] draw_no = cand_resp["draw_no"]
candidates = cand_resp["candidates"] candidates = cand_resp["candidates"]
context = await service_proxy.lotto_context() context = await service_proxy.lotto_context()
retrospective = await build_retrospective(draw_no)
user_text = build_user_message(draw_no, candidates, { user_text = build_user_message(draw_no, candidates, {
"hot_numbers": context.get("hot_numbers", []), "hot_numbers": context.get("hot_numbers", []),
"cold_numbers": context.get("cold_numbers", []), "cold_numbers": context.get("cold_numbers", []),
"last_draw_summary": context.get("last_draw_summary", ""), "last_draw_summary": context.get("last_draw_summary", ""),
"my_recent_performance": context.get("my_recent_performance", []), "my_recent_performance": context.get("my_recent_performance", []),
"retrospective": retrospective,
}) })
candidate_numbers = [c["numbers"] for c in candidates] candidate_numbers = [c["numbers"] for c in candidates]
@@ -101,8 +105,14 @@ async def curate_weekly(source: str = "auto") -> Dict[str, Any]:
payload = { payload = {
"draw_no": draw_no, "draw_no": draw_no,
"picks": [p.model_dump() for p in validated.picks], "picks": {
"core": [p.model_dump() for p in validated.core_picks],
"bonus": [p.model_dump() for p in validated.bonus_picks],
"extended": [p.model_dump() for p in validated.extended_picks],
"pool": [p.model_dump() for p in validated.pool_picks],
},
"narrative": validated.narrative.model_dump(), "narrative": validated.narrative.model_dump(),
"tier_rationale": validated.tier_rationale.model_dump(),
"confidence": validated.confidence, "confidence": validated.confidence,
"model": LOTTO_CURATOR_MODEL, "model": LOTTO_CURATOR_MODEL,
"tokens_input": usage_total["input"], "tokens_input": usage_total["input"],
@@ -118,4 +128,5 @@ async def curate_weekly(source: str = "auto") -> Dict[str, Any]:
"draw_no": draw_no, "draw_no": draw_no,
"confidence": validated.confidence, "confidence": validated.confidence,
"tokens": {"input": usage_total["input"], "output": usage_total["output"]}, "tokens": {"input": usage_total["input"], "output": usage_total["output"]},
"payload": payload, # 텔레그램 알림용
} }

View File

@@ -2,31 +2,49 @@
import json import json
SYSTEM_PROMPT = """당신은 로또 번호 큐레이터입니다. 주어진 후보 20세트 중 5세트를 다음 규칙으로 선별합니다. SYSTEM_PROMPT = """당신은 로또 번호 큐레이터입니다.
주어진 후보 30세트 중 4계층(코어 5, 보너스 5, 확장 5, 풀 5) 총 20세트를 선별합니다.
선별 규칙: 계층별 큐레이션 규칙:
- 5세트의 리스크 분포는 안정 2 · 균형 2 · 공격 1 을 권장(유연 ±1). - core_picks (5): 안정 2 / 균형 2 / 공격 1. 그 주 주축. 홀짝·저고·구간 분포가 세트끼리 겹치지 않게.
- 홀짝 비율, 저/고 구간, 연속번호 포함 여부가 세트끼리 겹치지 않도록 다양성을 확보. - bonus_picks (5): 코어 분배의 공백을 메우는 5세트. 코어가 공격 1뿐이면 보너스에 공격 +2 식.
- hot_number_count=0 이고 cold_number_count=0 인 '중립형' 세트를 최소 1개 포함. - extended_picks (5): 코어·보너스에 없는 시각 — 합계 극단(80↓ / 180↑) / 콜드 4주 누적 / 4주 미등장 번호 노출.
- 후보에 없는 번호 조합은 절대 사용 금지. numbers 필드는 반드시 candidates 중 하나와 정확히 일치해야 함. - pool_picks (5): 이번 주 한 번도 누르지 않은 패턴 — 연속 3개 / 동일 끝자리 / 5수 균등(각 끝자리 5개씩) 등.
- 각 세트 reason은 한국어 40자 이내 한 줄. 해당 세트의 features 값과 context 값만 근거로. - tier_rationale 의 3개 키(bonus·extended·pool)에 각각 30자 이내 한국어 사유.
공통 규칙:
- 후보에 없는 번호 조합은 절대 사용 금지. 모든 픽은 candidates 중 하나와 정확히 일치해야 함.
- 4계층 사이에 중복 픽 금지 (총 20세트는 모두 서로 달라야 함).
- 각 픽 reason 은 한국어 40자 이내. 해당 픽의 features 와 context 만 근거로.
- 중립형(hot_number_count=0 이고 cold_number_count=0) 세트를 코어에 최소 1개 포함.
회고 규칙:
- context.retrospective 가 있으면 narrative.retrospective 에 한 줄(60자 이내)로 작성.
- 회고는 큐레이터 자기 결과(curator_avg, best_tier) + 사용자 결과(user_avg, pattern_delta) 둘 다 짚을 것.
- 이번 주 코어 분배는 회고에 근거해 조정. 조정 사유는 narrative.headline 에 한 줄로.
예: "지난 주 너 저번호 편향 → 보너스 고번호 보강"
- context.retrospective 가 없으면 narrative.retrospective 는 빈 문자열.
narrative 규칙: narrative 규칙:
- headline: 한 줄, 이번 주 추첨 전망 요약. - headline: 한 줄, 이번 주 추첨 전망 + 조정 사유.
- summary_3lines: 정확히 3개 항목의 배열. - summary_3lines: 정확히 3개 항목.
- hot_cold_comment: hot/cold 번호에 대한 한 줄 논평. - hot_cold_comment: hot/cold 번호 한 줄 논평.
- warnings: 특별한 주의사항 없으면 빈 문자열. - warnings: 주의사항 없으면 빈 문자열.
- retrospective: 회고 한 줄 또는 빈 문자열.
출력은 반드시 JSON 하나, 그 외 어떤 텍스트도 금지. 스키마: 출력은 반드시 JSON 하나, 그 외 어떤 텍스트도 금지. 스키마:
{ {
"picks": [ "core_picks": [{"numbers":[...], "risk_tag":"안정"|"균형"|"공격", "reason": str}, ...5개],
{"numbers":[int,int,int,int,int,int], "risk_tag":"안정"|"균형"|"공격", "reason": str} "bonus_picks": [...5개],
], "extended_picks": [...5개],
"pool_picks": [...5개],
"tier_rationale": {"bonus": str, "extended": str, "pool": str},
"narrative": { "narrative": {
"headline": str, "headline": str,
"summary_3lines": [str, str, str], "summary_3lines": [str, str, str],
"hot_cold_comment": str, "hot_cold_comment": str,
"warnings": str "warnings": str,
"retrospective": str
}, },
"confidence": int (0~100) "confidence": int (0~100)
} }
@@ -36,11 +54,11 @@ narrative 규칙:
def build_user_message(draw_no: int, candidates: list, context: dict) -> str: def build_user_message(draw_no: int, candidates: list, context: dict) -> str:
payload = { payload = {
"draw_no": draw_no, "draw_no": draw_no,
"context": context, "context": context, # hot_numbers, cold_numbers, last_draw_summary, my_recent_performance, retrospective
"candidates": candidates, "candidates": candidates,
} }
return ( return (
f"이번 회차: {draw_no}\n" f"이번 회차: {draw_no}\n"
f"아래 데이터로 5세트를 큐레이션하고 위 스키마로만 응답하세요.\n\n" f"아래 데이터로 4계층 20세트를 큐레이션하고 위 스키마로만 응답하세요.\n\n"
f"```json\n{json.dumps(payload, ensure_ascii=False)}\n```" f"```json\n{json.dumps(payload, ensure_ascii=False)}\n```"
) )

View File

@@ -0,0 +1,50 @@
"""큐레이션 직전 호출 — review 1건 + 추세 3건 → 컨텍스트 dict."""
import json
from typing import Optional, Dict, Any
from .. import service_proxy
def _detect_bias(reviews: list) -> str:
"""3주↑ 같은 방향 패턴 편향이 유지되면 한 줄로."""
deltas = [r.get("pattern_delta") or "" for r in reviews if r.get("pattern_delta")]
if len(deltas) < 2:
return ""
# 단순 휴리스틱 — 같은 키워드("저번호" 등)가 2회 이상이면 지속 편향
keywords = ["저번호", "고번호", "합계", "홀짝"]
persistent = []
for kw in keywords:
cnt = sum(1 for d in deltas if kw in d)
if cnt >= max(2, len(deltas) - 1):
persistent.append(kw)
return " · ".join(persistent)
async def build_retrospective(target_draw_no: int) -> Optional[Dict[str, Any]]:
"""target_draw_no(이번 주) 직전 회차의 review + 그 앞 3회 추세."""
last = await service_proxy.lotto_review_by_draw(target_draw_no - 1)
if not last:
return None
history = await service_proxy.lotto_reviews_history(limit=4)
# history 는 desc 정렬 → last 와 그 이전 3건 분리
others = [r for r in history if r["draw_no"] < target_draw_no - 1][:3]
series = [last] + others
cur_avgs = [r["curator_avg_match"] for r in series if r.get("curator_avg_match") is not None]
usr_avgs = [r["user_avg_match"] for r in series if r.get("user_avg_match") is not None]
return {
"last_draw": {
"draw_no": last["draw_no"],
"curator_avg": last.get("curator_avg_match"),
"curator_best_tier": last.get("curator_best_tier"),
"user_avg": last.get("user_avg_match"),
"user_5plus": last.get("user_5plus_prizes"),
"pattern_delta": last.get("pattern_delta") or "",
},
"trend_4w": {
"curator_avg_4w": round(sum(cur_avgs) / len(cur_avgs), 2) if cur_avgs else None,
"user_avg_4w": round(sum(usr_avgs) / len(usr_avgs), 2) if usr_avgs else None,
"user_persistent_bias": _detect_bias(series),
},
}

View File

@@ -17,25 +17,42 @@ class Pick(BaseModel):
return sorted(v) return sorted(v)
class TierRationale(BaseModel):
bonus: str = Field(max_length=40)
extended: str = Field(max_length=40)
pool: str = Field(max_length=40)
class Narrative(BaseModel): class Narrative(BaseModel):
headline: str headline: str
summary_3lines: List[str] = Field(min_length=3, max_length=3) summary_3lines: List[str] = Field(min_length=3, max_length=3)
hot_cold_comment: str = "" hot_cold_comment: str = ""
warnings: str = "" warnings: str = ""
retrospective: str = Field(default="", max_length=80)
class CuratorOutput(BaseModel): class CuratorOutput(BaseModel):
picks: List[Pick] core_picks: List[Pick] = Field(min_length=5, max_length=5)
bonus_picks: List[Pick] = Field(min_length=5, max_length=5)
extended_picks: List[Pick] = Field(min_length=5, max_length=5)
pool_picks: List[Pick] = Field(min_length=5, max_length=5)
tier_rationale: TierRationale
narrative: Narrative narrative: Narrative
confidence: int = Field(ge=0, le=100) confidence: int = Field(ge=0, le=100)
def validate_response(data: dict, candidate_numbers: List[List[int]]) -> CuratorOutput: def validate_response(data: dict, candidate_numbers: List[List[int]]) -> CuratorOutput:
out = CuratorOutput.model_validate(data) out = CuratorOutput.model_validate(data)
if len(out.picks) != 5:
raise ValueError("picks must have exactly 5 sets")
candidate_set = {tuple(sorted(c)) for c in candidate_numbers} candidate_set = {tuple(sorted(c)) for c in candidate_numbers}
for p in out.picks: all_picks = (
out.core_picks + out.bonus_picks + out.extended_picks + out.pool_picks
)
# 중복 픽 검증
pick_keys = [tuple(p.numbers) for p in all_picks]
if len(pick_keys) != len(set(pick_keys)):
raise ValueError("duplicate picks across tiers")
# 후보에 없는 번호 조합 금지
for p in all_picks:
if tuple(p.numbers) not in candidate_set: if tuple(p.numbers) not in candidate_set:
raise ValueError(f"pick {p.numbers} not in candidates") raise ValueError(f"pick {p.numbers} not in candidates")
return out return out

View File

@@ -10,8 +10,10 @@ from .websocket_manager import ws_manager
from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY from .agents import init_agents, get_agent, get_all_agent_states, AGENT_REGISTRY
from .scheduler import init_scheduler from .scheduler import init_scheduler
from . import telegram_bot from . import telegram_bot
from .routers import notify as notify_router
app = FastAPI() app = FastAPI()
app.include_router(notify_router.router)
_cors_origins = CORS_ALLOW_ORIGINS.split(",") _cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware( app.add_middleware(

View File

View File

@@ -0,0 +1,61 @@
"""로또 큐레이션·당첨 알림 — 텔레그램 푸시."""
import logging
from typing import Dict, Any
# 기존 에이전트들과 동일한 패턴: send_raw(text, reply_markup=None, chat_id=None)
# chat_id 생략 시 기본 TELEGRAM_CHAT_ID로 자동 발송.
from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office")
LOTTO_URL = "https://gahusb.synology.me/lotto"
def _format_briefing(payload: Dict[str, Any]) -> str:
draw_no = payload["draw_no"]
nar = payload["narrative"]
conf = payload["confidence"]
# 분배 칩 — core 5세트의 risk_tag 빈도
core = payload["picks"]["core"]
role_count = {"안정": 0, "균형": 0, "공격": 0}
for p in core:
role_count[p["risk_tag"]] = role_count.get(p["risk_tag"], 0) + 1
chip = " · ".join(f"{k} {v}" for k, v in role_count.items() if v)
msg = [
f"🎟 {draw_no}회 · 큐레이션 떴음",
"",
f"\"{nar['headline']}\"",
f"신뢰도 {conf} · 분배 {chip}",
]
retro = nar.get("retrospective") or ""
if retro:
msg += ["", f"▸ 회고: {retro}"]
msg += ["", f"👉 결정 카드 보러가기 ({LOTTO_URL})"]
return "\n".join(msg)
def _format_prize_alert(event: Dict[str, Any]) -> str:
return (
"🚨 로또 당첨 가능성!\n"
f"{event['draw_no']}회 — {event['match_count']}개 일치\n"
f"번호: {', '.join(str(n) for n in event['numbers'])}\n"
"동행복권에서 즉시 확인하세요."
)
async def send_curator_briefing(payload: Dict[str, Any]) -> None:
text = _format_briefing(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] briefing send failed: {e}")
async def send_prize_alert(event: Dict[str, Any]) -> None:
text = _format_prize_alert(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] prize alert send failed: {e}")

View File

View File

@@ -0,0 +1,20 @@
"""다른 서비스가 트리거하는 웹훅 — 현재 lotto-backend → 텔레그램 푸시."""
from typing import List
from fastapi import APIRouter
from pydantic import BaseModel
from ..notifiers.telegram_lotto import send_prize_alert
router = APIRouter(prefix="/api/agent-office/notify")
class LottoPrizeEvent(BaseModel):
draw_no: int
match_count: int
numbers: List[int]
purchase_id: int
@router.post("/lotto-prize")
async def lotto_prize(body: LottoPrizeEvent):
await send_prize_alert(body.model_dump())
return {"ok": True}

View File

@@ -42,7 +42,7 @@ async def _poll_pipelines():
def init_scheduler(): def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate") scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research") scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report") scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check") scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")

View File

@@ -180,6 +180,34 @@ async def lotto_save_briefing(payload: dict) -> Dict[str, Any]:
return resp.json() return resp.json()
async def lotto_review_latest() -> Optional[Dict[str, Any]]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/review/latest")
if resp.status_code == 404:
return None
resp.raise_for_status()
return resp.json()
async def lotto_review_by_draw(draw_no: int) -> Optional[Dict[str, Any]]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/review/{draw_no}")
if resp.status_code == 404:
return None
resp.raise_for_status()
return resp.json()
async def lotto_reviews_history(limit: int = 10) -> List[Dict[str, Any]]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(
f"{LOTTO_BACKEND_URL}/api/lotto/review/history",
params={"limit": limit},
)
resp.raise_for_status()
return resp.json().get("reviews", [])
# --- music-lab pipeline (YouTube publisher orchestration) --- # --- music-lab pipeline (YouTube publisher orchestration) ---
async def list_active_pipelines() -> list[dict]: async def list_active_pipelines() -> list[dict]:

View File

@@ -1,60 +1,55 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
import pytest import pytest
from app.curator.schema import validate_response, CuratorOutput from app.curator.schema import validate_response
CANDIDATE_NUMBERS = [ def _pick(nums, role="안정"):
[1, 2, 3, 4, 5, 6], return {"numbers": nums, "risk_tag": role, "reason": "x"}
[7, 8, 9, 10, 11, 12],
[13, 14, 15, 16, 17, 18],
[19, 20, 21, 22, 23, 24],
[25, 26, 27, 28, 29, 30],
[31, 32, 33, 34, 35, 36],
]
def _valid_payload(): def _make_payload(core, bonus, ext, pool):
return { return {
"picks": [ "core_picks": core, "bonus_picks": bonus,
{"numbers": s, "risk_tag": "안정", "reason": "test"} "extended_picks": ext, "pool_picks": pool,
for s in CANDIDATE_NUMBERS[:5] "tier_rationale": {"bonus": "a", "extended": "b", "pool": "c"},
],
"narrative": { "narrative": {
"headline": "h", "summary_3lines": ["a", "b", "c"], "headline": "h",
"hot_cold_comment": "hc", "warnings": "", "summary_3lines": ["1", "2", "3"],
"retrospective": "지난주 평균 1.8",
}, },
"confidence": 80, "confidence": 70,
} }
def test_valid_payload_passes(): def test_valid_4tier():
result = validate_response(_valid_payload(), CANDIDATE_NUMBERS) pool = [[i, i+1, i+2, i+3, i+4, i+5] for i in range(1, 21)]
assert isinstance(result, CuratorOutput) cores = [_pick(pool[i]) for i in range(5)]
assert len(result.picks) == 5 bonus = [_pick(pool[i]) for i in range(5, 10)]
ext = [_pick(pool[i]) for i in range(10, 15)]
pl = [_pick(pool[i]) for i in range(15, 20)]
out = validate_response(_make_payload(cores, bonus, ext, pl), pool)
assert len(out.core_picks) == 5
assert out.narrative.retrospective.startswith("지난주")
def test_rejects_number_out_of_candidates(): def test_duplicate_pick_rejected():
bad = _valid_payload() pool = [[i, i+1, i+2, i+3, i+4, i+5] for i in range(1, 21)]
bad["picks"][0]["numbers"] = [40, 41, 42, 43, 44, 45] # valid numbers but not in candidates cores = [_pick(pool[0])] * 5 # 중복
bonus = [_pick(pool[i]) for i in range(5, 10)]
ext = [_pick(pool[i]) for i in range(10, 15)]
pl = [_pick(pool[i]) for i in range(15, 20)]
with pytest.raises(ValueError, match="duplicate"):
validate_response(_make_payload(cores, bonus, ext, pl), pool)
def test_pick_not_in_candidates_rejected():
pool = [[i, i+1, i+2, i+3, i+4, i+5] for i in range(1, 21)]
foreign = [40, 41, 42, 43, 44, 45]
cores = [_pick(foreign)] + [_pick(pool[i]) for i in range(1, 5)]
bonus = [_pick(pool[i]) for i in range(5, 10)]
ext = [_pick(pool[i]) for i in range(10, 15)]
pl = [_pick(pool[i]) for i in range(15, 20)]
with pytest.raises(ValueError, match="not in candidates"): with pytest.raises(ValueError, match="not in candidates"):
validate_response(bad, CANDIDATE_NUMBERS) validate_response(_make_payload(cores, bonus, ext, pl), pool)
def test_rejects_wrong_pick_count():
bad = _valid_payload()
bad["picks"] = bad["picks"][:3]
with pytest.raises(ValueError, match="exactly 5"):
validate_response(bad, CANDIDATE_NUMBERS)
def test_rejects_duplicate_numbers_within_set():
bad = _valid_payload()
bad["picks"][0]["numbers"] = [1, 1, 2, 3, 4, 5]
with pytest.raises(ValueError):
validate_response(bad, CANDIDATE_NUMBERS)
def test_rejects_invalid_risk_tag():
bad = _valid_payload()
bad["picks"][0]["risk_tag"] = "미친"
with pytest.raises(ValueError):
validate_response(bad, CANDIDATE_NUMBERS)

View File

@@ -0,0 +1,47 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
import pytest
from unittest.mock import AsyncMock, patch
from app.curator.retrospective import build_retrospective, _detect_bias
def test_detect_bias_persistent_low():
reviews = [
{"pattern_delta": "저번호 편향 +1.2 / 합계 -18"},
{"pattern_delta": "저번호 편향 +0.8"},
{"pattern_delta": "저번호 편향 +1.0 / 홀짝 +0.5"},
]
assert "저번호" in _detect_bias(reviews)
def test_detect_bias_no_persistence():
reviews = [
{"pattern_delta": "저번호 편향 +1.2"},
{"pattern_delta": "고번호 편향 +0.8"},
]
assert _detect_bias(reviews) == ""
@pytest.mark.asyncio
async def test_build_retrospective_with_data():
with patch("app.service_proxy.lotto_review_by_draw", new=AsyncMock(return_value={
"draw_no": 1153, "curator_avg_match": 1.8, "curator_best_tier": "안정",
"user_avg_match": 2.0, "user_5plus_prizes": 1, "pattern_delta": "저번호 편향 +1.2",
})), patch("app.service_proxy.lotto_reviews_history", new=AsyncMock(return_value=[
{"draw_no": 1153, "curator_avg_match": 1.8, "user_avg_match": 2.0, "pattern_delta": "저번호 편향 +1.2"},
{"draw_no": 1152, "curator_avg_match": 1.6, "user_avg_match": 1.5, "pattern_delta": "저번호 편향 +0.8"},
{"draw_no": 1151, "curator_avg_match": 1.7, "user_avg_match": 1.8, "pattern_delta": "저번호 편향 +1.0"},
{"draw_no": 1150, "curator_avg_match": 1.9, "user_avg_match": 2.2, "pattern_delta": ""},
])):
out = await build_retrospective(1154)
assert out["last_draw"]["draw_no"] == 1153
assert out["trend_4w"]["curator_avg_4w"] == round((1.8+1.6+1.7+1.9)/4, 2)
assert "저번호" in out["trend_4w"]["user_persistent_bias"]
@pytest.mark.asyncio
async def test_build_retrospective_no_review():
with patch("app.service_proxy.lotto_review_by_draw", new=AsyncMock(return_value=None)):
out = await build_retrospective(1154)
assert out is None

View File

@@ -0,0 +1,44 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.notifiers.telegram_lotto import _format_briefing, _format_prize_alert
def test_briefing_with_retrospective():
payload = {
"draw_no": 1154,
"confidence": 72,
"narrative": {
"headline": "안정 +1, 콜드 누적 보강",
"summary_3lines": ["a", "b", "c"],
"retrospective": "너 2.0 / 나 1.8 — 저번호 편향",
},
"picks": {
"core": [
{"risk_tag": "안정"}, {"risk_tag": "안정"}, {"risk_tag": "안정"},
{"risk_tag": "균형"}, {"risk_tag": "공격"},
],
"bonus": [], "extended": [], "pool": [],
},
}
text = _format_briefing(payload)
assert "1154회" in text
assert "신뢰도 72" in text
assert "안정 3" in text
assert "회고: 너 2.0" in text
def test_briefing_without_retrospective():
payload = {
"draw_no": 1, "confidence": 50,
"narrative": {"headline": "h", "summary_3lines": ["a","b","c"], "retrospective": ""},
"picks": {"core": [{"risk_tag":"안정"}]*5, "bonus":[],"extended":[],"pool":[]},
}
text = _format_briefing(payload)
assert "회고" not in text
def test_prize_alert():
text = _format_prize_alert({"draw_no": 1154, "match_count": 5, "numbers": [3,11,17,25,33,8]})
assert "5개 일치" in text
assert "3, 11, 17, 25, 33, 8" in text

View File

@@ -259,6 +259,45 @@ def init_db() -> None:
""") """)
conn.execute("CREATE INDEX IF NOT EXISTS idx_briefings_draw ON lotto_briefings(draw_no DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_briefings_draw ON lotto_briefings(draw_no DESC)")
# ── weekly_review 테이블 (큐레이터 자기 평가 + 사용자 패턴 갭) ────────
conn.execute("""
CREATE TABLE IF NOT EXISTS weekly_review (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draw_no INTEGER UNIQUE NOT NULL,
curator_avg_match REAL,
curator_best_tier TEXT,
curator_best_match INTEGER,
curator_5plus_prizes INTEGER,
user_avg_match REAL,
user_best_match INTEGER,
user_5plus_prizes INTEGER,
user_pattern_summary TEXT,
draw_pattern_summary TEXT,
pattern_delta TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_review_draw ON weekly_review(draw_no DESC)")
# ── lotto_briefings.picks 4계층 마이그레이션 (1회 변환) ───────────────
# 기존: picks가 JSON 리스트 [{numbers,risk_tag,reason}]
# 신규: picks가 JSON 객체 {core:[...], bonus:[], extended:[], pool:[]}
rows = conn.execute("SELECT id, picks FROM lotto_briefings").fetchall()
for r in rows:
try:
p = json.loads(r["picks"])
if isinstance(p, list):
new_picks = {"core": p, "bonus": [], "extended": [], "pool": []}
conn.execute(
"UPDATE lotto_briefings SET picks=? WHERE id=?",
(json.dumps(new_picks, ensure_ascii=False), r["id"]),
)
except (json.JSONDecodeError, TypeError):
continue
_ensure_column(conn, "lotto_briefings", "tier_rationale",
"ALTER TABLE lotto_briefings ADD COLUMN tier_rationale TEXT NOT NULL DEFAULT '{}'")
@@ -952,39 +991,88 @@ def update_purchase_results(purchase_id: int, results: list, total_prize: int) -
) )
def bulk_insert_purchases_from_briefing(draw_no: int, tier_mode: str, amount: int) -> Dict[str, Any]:
"""tier_mode 에 해당하는 큐레이터 picks 를 purchase_history 에 일괄 INSERT.
tier_mode: "core" | "core_bonus" | "core_bonus_extended" | "full"
"""
briefing = get_briefing(draw_no)
if not briefing:
return {"ok": False, "reason": "briefing not found"}
picks = briefing.get("picks") or {}
if isinstance(picks, list):
# 마이그레이션 이전 형태
picks = {"core": picks, "bonus": [], "extended": [], "pool": []}
tier_chain = {
"core": ["core"],
"core_bonus": ["core", "bonus"],
"core_bonus_extended": ["core", "bonus", "extended"],
"full": ["core", "bonus", "extended", "pool"],
}.get(tier_mode)
if not tier_chain:
return {"ok": False, "reason": f"unknown tier_mode: {tier_mode}"}
inserted_ids = []
with _conn() as conn:
for tier in tier_chain:
for idx, pick in enumerate(picks.get(tier) or []):
source_strategy = f"curator_{tier}"
source_detail = json.dumps({
"tier": tier,
"role": pick.get("risk_tag"),
"set_index": idx,
"draw_no": draw_no,
}, ensure_ascii=False)
numbers_json = json.dumps([pick.get("numbers")], ensure_ascii=False)
cur = conn.execute(
"""INSERT INTO purchase_history
(draw_no, amount, sets, prize, note, numbers, is_real, source_strategy, source_detail)
VALUES (?, ?, 1, 0, '', ?, 1, ?, ?)""",
(draw_no, 1000, numbers_json, source_strategy, source_detail),
)
inserted_ids.append(cur.lastrowid)
return {"ok": True, "inserted_ids": inserted_ids, "sets": len(inserted_ids)}
# --- Lotto Briefings --- # --- Lotto Briefings ---
def save_briefing(data: Dict[str, Any]) -> int: def save_briefing(data: Dict[str, Any]) -> int:
picks_json = json.dumps(data["picks"], ensure_ascii=False)
narrative_json = json.dumps(data["narrative"], ensure_ascii=False)
tier_rationale_json = json.dumps(data.get("tier_rationale") or {}, ensure_ascii=False)
with _conn() as conn: with _conn() as conn:
cur = conn.execute(""" cur = conn.execute(
"""
INSERT INTO lotto_briefings INSERT INTO lotto_briefings
(draw_no, picks, narrative, confidence, model, (draw_no, picks, narrative, confidence, model,
tokens_input, tokens_output, cache_read, cache_write, tokens_input, tokens_output, cache_read, cache_write,
latency_ms, source) latency_ms, source, tier_rationale)
VALUES (?,?,?,?,?,?,?,?,?,?,?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(draw_no) DO UPDATE SET ON CONFLICT(draw_no) DO UPDATE SET
picks=excluded.picks, narrative=excluded.narrative, picks=excluded.picks,
confidence=excluded.confidence, model=excluded.model, narrative=excluded.narrative,
confidence=excluded.confidence,
model=excluded.model,
tokens_input=excluded.tokens_input, tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output, tokens_output=excluded.tokens_output,
cache_read=excluded.cache_read, cache_read=excluded.cache_read,
cache_write=excluded.cache_write, cache_write=excluded.cache_write,
latency_ms=excluded.latency_ms, latency_ms=excluded.latency_ms,
source=excluded.source, source=excluded.source,
tier_rationale=excluded.tier_rationale,
generated_at=datetime('now','localtime') generated_at=datetime('now','localtime')
""", ( """,
data["draw_no"], (
json.dumps(data["picks"], ensure_ascii=False), data["draw_no"], picks_json, narrative_json,
json.dumps(data["narrative"], ensure_ascii=False), data["confidence"], data["model"],
int(data["confidence"]), data.get("tokens_input", 0), data.get("tokens_output", 0),
data["model"], data.get("cache_read", 0), data.get("cache_write", 0),
int(data.get("tokens_input", 0)), data.get("latency_ms", 0), data.get("source", "auto"),
int(data.get("tokens_output", 0)), tier_rationale_json,
int(data.get("cache_read", 0)), ),
int(data.get("cache_write", 0)), )
int(data.get("latency_ms", 0)),
data.get("source", "auto"),
))
return cur.lastrowid return cur.lastrowid
@@ -994,6 +1082,7 @@ def _briefing_row(r) -> Dict[str, Any]:
"draw_no": r["draw_no"], "draw_no": r["draw_no"],
"picks": json.loads(r["picks"]), "picks": json.loads(r["picks"]),
"narrative": json.loads(r["narrative"]), "narrative": json.loads(r["narrative"]),
"tier_rationale": json.loads(r["tier_rationale"]) if r["tier_rationale"] else {},
"confidence": r["confidence"], "confidence": r["confidence"],
"model": r["model"], "model": r["model"],
"tokens_input": r["tokens_input"], "tokens_input": r["tokens_input"],
@@ -1052,3 +1141,88 @@ def get_curator_usage(days: int = 30) -> Dict[str, Any]:
"avg_latency_ms": round(float(r["avg_latency"] or 0), 1), "avg_latency_ms": round(float(r["avg_latency"] or 0), 1),
} }
def save_review(data: Dict[str, Any]) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO weekly_review (
draw_no,
curator_avg_match, curator_best_tier, curator_best_match, curator_5plus_prizes,
user_avg_match, user_best_match, user_5plus_prizes,
user_pattern_summary, draw_pattern_summary, pattern_delta
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(draw_no) DO UPDATE SET
curator_avg_match=excluded.curator_avg_match,
curator_best_tier=excluded.curator_best_tier,
curator_best_match=excluded.curator_best_match,
curator_5plus_prizes=excluded.curator_5plus_prizes,
user_avg_match=excluded.user_avg_match,
user_best_match=excluded.user_best_match,
user_5plus_prizes=excluded.user_5plus_prizes,
user_pattern_summary=excluded.user_pattern_summary,
draw_pattern_summary=excluded.draw_pattern_summary,
pattern_delta=excluded.pattern_delta
""",
(
data["draw_no"],
data.get("curator_avg_match"), data.get("curator_best_tier"),
data.get("curator_best_match"), data.get("curator_5plus_prizes"),
data.get("user_avg_match"), data.get("user_best_match"),
data.get("user_5plus_prizes"),
data.get("user_pattern_summary"), data.get("draw_pattern_summary"),
data.get("pattern_delta"),
),
)
return cur.lastrowid
def _review_row(r) -> Optional[Dict[str, Any]]:
if not r:
return None
return {
"id": r["id"],
"draw_no": r["draw_no"],
"curator_avg_match": r["curator_avg_match"],
"curator_best_tier": r["curator_best_tier"],
"curator_best_match": r["curator_best_match"],
"curator_5plus_prizes": r["curator_5plus_prizes"],
"user_avg_match": r["user_avg_match"],
"user_best_match": r["user_best_match"],
"user_5plus_prizes": r["user_5plus_prizes"],
"user_pattern_summary": r["user_pattern_summary"],
"draw_pattern_summary": r["draw_pattern_summary"],
"pattern_delta": r["pattern_delta"],
"created_at": r["created_at"],
}
def get_review(draw_no: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM weekly_review WHERE draw_no=?", (draw_no,)).fetchone()
return _review_row(r)
def get_latest_review() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM weekly_review ORDER BY draw_no DESC LIMIT 1").fetchone()
return _review_row(r)
def get_reviews_range(start_drw: int, end_drw: int) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM weekly_review WHERE draw_no BETWEEN ? AND ? ORDER BY draw_no ASC",
(start_drw, end_drw),
).fetchall()
return [_review_row(r) for r in rows]
def list_reviews(limit: int = 10) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM weekly_review ORDER BY draw_no DESC LIMIT ?",
(limit,),
).fetchall()
return [_review_row(r) for r in rows]

View File

View File

@@ -0,0 +1,154 @@
"""주간 회고 채점 통합 잡 — 일요일 03:00 KST 실행.
1) 기존 purchase_manager.check_purchases_for_draw() 로 사용자 구매 자동 채점
2) 큐레이터 4계층 picks vs 추첨 결과 비교
3) 패턴 요약·갭 계산
4) weekly_review UPSERT
5) 4등 이상 발견 시 agent-office webhook 호출
"""
import json
import logging
import os
from typing import Optional
import httpx
from .. import db
from ..purchase_manager import check_purchases_for_draw
from .grading_helpers import (
score_picks_against_draw,
summarize_pattern,
aggregate_pattern_summaries,
compute_pattern_delta,
)
logger = logging.getLogger("lotto-backend")
AGENT_OFFICE_URL = os.environ.get("AGENT_OFFICE_URL", "http://agent-office:8000")
def _flatten_curator_picks(briefing: dict) -> list:
"""4계층 picks 를 모두 합쳐 단일 리스트(score 계산용)."""
picks = briefing.get("picks") or {}
if isinstance(picks, list):
return picks
out = []
for tier in ("core", "bonus", "extended", "pool"):
out.extend(picks.get(tier) or [])
return out
def _curator_score(briefing: dict, win_nums: list, bonus: int) -> dict:
if not briefing:
return {}
flat = _flatten_curator_picks(briefing)
if not flat:
return {}
return score_picks_against_draw(flat, win_nums, bonus)
def _user_score(drw_no: int, win_nums: list) -> dict:
purchases = db.get_purchases(draw_no=drw_no)
if not purchases:
return {}
matches = []
win_set = set(win_nums)
pattern_summaries = []
for p in purchases:
for nums in (p.get("numbers") or []):
if not nums:
continue
m = len(set(nums) & win_set)
matches.append(m)
pattern_summaries.append(summarize_pattern(nums))
if not matches:
return {}
return {
"avg_match": round(sum(matches) / len(matches), 2),
"best_match": max(matches),
"five_plus_prizes": sum(1 for m in matches if m >= 3),
"pattern_avg": aggregate_pattern_summaries(pattern_summaries),
}
def _trigger_prize_alert(drw_no: int, match_count: int, numbers: list, purchase_id: int) -> None:
try:
with httpx.Client(timeout=10) as client:
client.post(
f"{AGENT_OFFICE_URL}/api/agent-office/notify/lotto-prize",
json={
"draw_no": drw_no,
"match_count": match_count,
"numbers": numbers,
"purchase_id": purchase_id,
},
)
except Exception as e:
logger.warning(f"[grade_weekly_review] prize alert webhook failed: {e}")
def run_weekly_grading(drw_no: int) -> dict:
"""주어진 회차에 대해 채점 잡 1회 실행. 멱등."""
draw = db.get_draw(drw_no)
if not draw:
logger.warning(f"[grade_weekly_review] draw {drw_no} not found, skip")
return {"ok": False, "reason": "no draw"}
win_nums = [draw["n1"], draw["n2"], draw["n3"], draw["n4"], draw["n5"], draw["n6"]]
bonus = draw["bonus"]
# 1) 사용자 구매 자동 채점 (기존 인프라)
try:
check_purchases_for_draw(drw_no)
except Exception as e:
logger.warning(f"[grade_weekly_review] check_purchases_for_draw failed: {e}")
# 2) 4등 이상 발견 시 webhook
purchases = db.get_purchases(draw_no=drw_no, checked=True)
for p in purchases:
for r in (p.get("results") or []):
if r.get("correct", 0) >= 4:
_trigger_prize_alert(drw_no, r["correct"], r["numbers"], p["id"])
# 3) 큐레이터 자기 평가
briefing = db.get_briefing(drw_no)
cur = _curator_score(briefing, win_nums, bonus)
# 4) 사용자 평가 (재로드, 구매가 다 채점된 후 패턴 계산)
usr = _user_score(drw_no, win_nums)
# 5) 추첨 패턴 요약 + 델타
draw_summary = summarize_pattern(win_nums)
draw_pattern = {
"low_avg": draw_summary["low_count"],
"odd_avg": draw_summary["odd_count"],
"sum_avg": draw_summary["sum"],
}
user_pattern = usr.get("pattern_avg", {})
delta = compute_pattern_delta(user_pattern, draw_pattern) if user_pattern else ""
# 6) UPSERT
payload = {
"draw_no": drw_no,
"curator_avg_match": cur.get("avg_match"),
"curator_best_tier": cur.get("best_tier"),
"curator_best_match": cur.get("best_match"),
"curator_5plus_prizes": cur.get("five_plus_prizes"),
"user_avg_match": usr.get("avg_match"),
"user_best_match": usr.get("best_match"),
"user_5plus_prizes": usr.get("five_plus_prizes"),
"user_pattern_summary": json.dumps(user_pattern, ensure_ascii=False) if user_pattern else None,
"draw_pattern_summary": json.dumps(draw_pattern, ensure_ascii=False),
"pattern_delta": delta,
}
rid = db.save_review(payload)
logger.info(f"[grade_weekly_review] saved review id={rid} for draw {drw_no}")
return {"ok": True, "review_id": rid}
def run_for_latest() -> dict:
"""가장 최근 sync된 추첨 회차로 채점 — cron 진입점."""
latest = db.get_latest_draw()
if not latest:
return {"ok": False, "reason": "no draws"}
return run_weekly_grading(latest["drw_no"])

View File

@@ -0,0 +1,93 @@
"""채점 보조 — 일치 수 계산, 패턴 요약, 패턴 갭."""
from typing import List, Dict, Any
LOW_HIGH_CUT = 22 # curator_helpers.py 와 동일
def score_picks_against_draw(picks: List[Dict[str, Any]],
win_nums: List[int],
bonus: int) -> Dict[str, Any]:
"""4계층 중 한 그룹(예: core_picks 5세트) vs 추첨 결과 채점.
picks 는 [{numbers, risk_tag, reason}] 리스트.
"""
if not picks:
return {"avg_match": None, "best_match": 0, "five_plus_prizes": 0, "best_tier": None}
win_set = set(win_nums)
matches = []
for p in picks:
nums = p.get("numbers") or []
m = len(set(nums) & win_set)
matches.append((m, p.get("risk_tag")))
avg = sum(m for m, _ in matches) / len(matches)
best_match, best_tier = max(matches, key=lambda x: x[0])
five_plus = sum(1 for m, _ in matches if m >= 3) # 5등 이상
# tier별 평균 → 가장 잘 맞은 risk_tag
tier_scores: Dict[str, List[int]] = {}
for m, t in matches:
if t:
tier_scores.setdefault(t, []).append(m)
if tier_scores:
best_tier = max(tier_scores.items(),
key=lambda kv: sum(kv[1]) / len(kv[1]))[0]
return {
"avg_match": round(avg, 2),
"best_match": best_match,
"five_plus_prizes": five_plus,
"best_tier": best_tier,
}
def summarize_pattern(nums: List[int]) -> Dict[str, int]:
"""한 세트의 패턴 요약 — 저/고, 홀/짝, 합계."""
nums = sorted(nums)
odd = sum(1 for n in nums if n % 2 == 1)
low = sum(1 for n in nums if n <= LOW_HIGH_CUT)
return {
"odd_count": odd,
"even_count": 6 - odd,
"low_count": low,
"high_count": 6 - low,
"sum": sum(nums),
}
def aggregate_pattern_summaries(summaries: List[Dict[str, int]]) -> Dict[str, float]:
"""여러 세트의 패턴 요약 → 평균(low_avg, odd_avg, sum_avg)."""
if not summaries:
return {"low_avg": None, "odd_avg": None, "sum_avg": None}
n = len(summaries)
return {
"low_avg": round(sum(s["low_count"] for s in summaries) / n, 2),
"odd_avg": round(sum(s["odd_count"] for s in summaries) / n, 2),
"sum_avg": round(sum(s["sum"] for s in summaries) / n, 1),
}
def compute_pattern_delta(user_summary: Dict[str, float],
draw_summary: Dict[str, float]) -> str:
"""사용자 평균 vs 추첨 패턴의 가장 큰 격차 1~2개를 한 줄로."""
if not user_summary or user_summary.get("low_avg") is None:
return ""
deltas = []
if user_summary.get("low_avg") is not None and draw_summary.get("low_avg") is not None:
d = round(user_summary["low_avg"] - draw_summary["low_avg"], 2)
if abs(d) >= 0.5:
sign = "+" if d > 0 else ""
deltas.append(("저번호", d, f"저번호 편향 {sign}{d}"))
if user_summary.get("sum_avg") is not None and draw_summary.get("sum_avg") is not None:
d = round(user_summary["sum_avg"] - draw_summary["sum_avg"], 1)
if abs(d) >= 10:
sign = "+" if d > 0 else ""
deltas.append(("합계", d, f"합계 {sign}{d}"))
if user_summary.get("odd_avg") is not None and draw_summary.get("odd_avg") is not None:
d = round(user_summary["odd_avg"] - draw_summary["odd_avg"], 2)
if abs(d) >= 0.5:
sign = "+" if d > 0 else ""
deltas.append(("홀짝", d, f"홀짝 {sign}{d}"))
deltas.sort(key=lambda x: -abs(x[1]))
return " / ".join(d[2] for d in deltas[:2])

View File

@@ -19,6 +19,7 @@ from .db import (
get_recommendation_performance, get_recommendation_performance,
# Phase 2: 구매 이력 # Phase 2: 구매 이력
add_purchase, get_purchases, update_purchase, delete_purchase, get_purchase_stats, add_purchase, get_purchases, update_purchase, delete_purchase, get_purchase_stats,
bulk_insert_purchases_from_briefing,
# Phase 2: 주간 리포트 캐시 # Phase 2: 주간 리포트 캐시
save_weekly_report, get_weekly_report_list, get_weekly_report, save_weekly_report, get_weekly_report_list, get_weekly_report,
# Phase 2: 개인 패턴 분석 # Phase 2: 개인 패턴 분석
@@ -39,10 +40,13 @@ from .strategy_evolver import (
) )
from .routers import curator as curator_router from .routers import curator as curator_router
from .routers import briefing as briefing_router from .routers import briefing as briefing_router
from .routers import review as review_router
from .jobs.grade_weekly_review import run_for_latest as grade_run_for_latest
app = FastAPI() app = FastAPI()
app.include_router(curator_router.router) app.include_router(curator_router.router)
app.include_router(briefing_router.router) app.include_router(briefing_router.router)
app.include_router(review_router.router)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul")) scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json") ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")
@@ -95,6 +99,17 @@ def on_startup():
scheduler.add_job(_save_weekly_report_job, "cron", day_of_week="sat", hour=9, minute=0) scheduler.add_job(_save_weekly_report_job, "cron", day_of_week="sat", hour=9, minute=0)
# 4. 주간 채점 (매주 일요일 03:00 KST — 토요일 추첨 다음날 새벽)
# 당첨번호 sync 이후 추천 vs 실제 결과 비교 → reviews 테이블 저장
scheduler.add_job(
grade_run_for_latest,
"cron",
day_of_week="sun",
hour=3,
minute=0,
id="grade_weekly_review",
)
scheduler.start() scheduler.start()
@@ -329,6 +344,22 @@ def api_purchase_delete(purchase_id: int):
return {"ok": True} return {"ok": True}
class BulkPurchaseRequest(BaseModel):
draw_no: int
tier_mode: str # core | core_bonus | core_bonus_extended | full
sets: int # 검증용 — 실제 INSERT는 briefing 기준
amount: int # 검증용
@app.post("/api/lotto/purchase/bulk", status_code=201)
def api_purchase_bulk(body: BulkPurchaseRequest):
"""결정카드 원클릭 기록 — 큐레이터 브리핑 picks 를 tier_mode 기준으로 일괄 기록."""
result = bulk_insert_purchases_from_briefing(body.draw_no, body.tier_mode, body.amount)
if not result["ok"]:
raise HTTPException(status_code=400, detail=result["reason"])
return result
# ── 전략 진화 API ────────────────────────────────────────────────────────── # ── 전략 진화 API ──────────────────────────────────────────────────────────
@app.get("/api/lotto/strategy/weights") @app.get("/api/lotto/strategy/weights")

View File

@@ -7,10 +7,24 @@ from .. import db
router = APIRouter(prefix="/api/lotto") router = APIRouter(prefix="/api/lotto")
class TierRationale(BaseModel):
bonus: str = ""
extended: str = ""
pool: str = ""
class BriefingPicks(BaseModel):
core: List[Dict[str, Any]] = Field(default_factory=list)
bonus: List[Dict[str, Any]] = Field(default_factory=list)
extended: List[Dict[str, Any]] = Field(default_factory=list)
pool: List[Dict[str, Any]] = Field(default_factory=list)
class BriefingRequest(BaseModel): class BriefingRequest(BaseModel):
draw_no: int draw_no: int
picks: List[Dict[str, Any]] picks: BriefingPicks
narrative: Dict[str, Any] narrative: Dict[str, Any]
tier_rationale: TierRationale = Field(default_factory=TierRationale)
confidence: int = Field(ge=0, le=100) confidence: int = Field(ge=0, le=100)
model: str model: str
tokens_input: int = 0 tokens_input: int = 0

View File

@@ -0,0 +1,26 @@
"""주간 회고(weekly_review) 조회 엔드포인트."""
from fastapi import APIRouter, HTTPException
from .. import db
router = APIRouter(prefix="/api/lotto/review")
@router.get("/latest")
def latest():
r = db.get_latest_review()
if not r:
raise HTTPException(404, "no review yet")
return r
@router.get("/history")
def history(limit: int = 10):
return {"reviews": db.list_reviews(limit)}
@router.get("/{draw_no}")
def get_one(draw_no: int):
r = db.get_review(draw_no)
if not r:
raise HTTPException(404, f"no review for draw {draw_no}")
return r

View File

@@ -0,0 +1,28 @@
# Lotto Curator Evolution — 1주차 운영 점검
## 일요일 (추첨 다음날)
- [ ] 03:05 KST: lotto-backend 로그에 `[grade_weekly_review] saved review id=N` 출력 확인
- [ ] `curl http://localhost:18000/api/lotto/review/latest` → JSON 정상
- [ ] purchase_history 의 직전 회차 행이 `checked=1`, `total_prize` 채워졌는지
## 월요일
- [ ] 09:05 KST: agent-office 로그에 `큐레이션 완료: #NNNN` + `[telegram_lotto] briefing` 출력
- [ ] 텔레그램 봇 채팅에 헤드라인 알림 도착 (회고 단락 포함/생략 정확)
- [ ] `curl http://localhost:18000/api/lotto/briefing/latest` → 4계층 picks(core/bonus/extended/pool 각 5세트) + tier_rationale + narrative.retrospective
## 사이트 확인
- [ ] http://localhost:3007/lotto 브리핑 탭 결정 카드 정상 렌더
- [ ] 모드 토글 4단계 동작 (5/10/15/20 펼침/접힘)
- [ ] localStorage `lotto.tier_mode` 마지막 선택 기억 (새로고침 후 유지)
- [ ] "이대로 N세트 구매" 클릭 → 토스트 + 구매탭 갱신
- [ ] 자료실 탭 첫 진입 시 모든 패널 접힘
- [ ] 구매탭 추세 차트 1주차에는 점 1개, 2주차부터 라인 형성
## 실패 케이스
- [ ] 큐레이션 실패(Anthropic API 다운): agent-office 로그 + lotto_agent state=idle, 에러 텔레그램
- [ ] 4등 이상 발견: 별도 텔레그램 푸시 도착 (3개 이하만 있으면 미발송)
- [ ] briefing 없는 회차에 bulk purchase 시도: 400 응답, 토스트 표시
## cron 시간 조정 (필요 시)
- 채점 잡: `lotto/app/main.py``scheduler.add_job(grade_run_for_latest, "cron", day_of_week="sun", hour=3, minute=0)`
- 큐레이션: `agent-office/app/scheduler.py` `add_job(_run_lotto_schedule, ..., hour=9, minute=0)`

View File

@@ -0,0 +1,52 @@
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
import pytest
from app import db
@pytest.fixture(autouse=True)
def setup_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
monkeypatch.setattr(db, "DB_PATH", str(test_db))
db.init_db()
yield
def test_save_briefing_4tier_roundtrip():
payload = {
"draw_no": 9999,
"picks": {"core":[{"numbers":[1,2,3,4,5,6],"risk_tag":"안정","reason":"x"}],
"bonus":[], "extended":[], "pool":[]},
"narrative": {"headline":"H","summary_3lines":["a","b","c"],"retrospective":"r"},
"tier_rationale": {"bonus":"b1","extended":"e1","pool":"p1"},
"confidence": 70,
"model": "test",
}
bid = db.save_briefing(payload)
assert bid > 0
got = db.get_briefing(9999)
assert got["picks"]["core"][0]["numbers"] == [1,2,3,4,5,6]
assert got["tier_rationale"]["bonus"] == "b1"
assert got["narrative"]["retrospective"] == "r"
def test_save_briefing_upsert_overwrites():
db.save_briefing({
"draw_no": 8888,
"picks": {"core":[], "bonus":[], "extended":[], "pool":[]},
"narrative": {"headline":"old","summary_3lines":["a","b","c"]},
"confidence": 50, "model": "v1",
})
db.save_briefing({
"draw_no": 8888,
"picks": {"core":[{"numbers":[10,20,30,40,41,42],"risk_tag":"공격","reason":"y"}],
"bonus":[], "extended":[], "pool":[]},
"narrative": {"headline":"new","summary_3lines":["x","y","z"]},
"tier_rationale": {"bonus":"","extended":"","pool":""},
"confidence": 90, "model": "v2",
})
got = db.get_briefing(8888)
assert got["narrative"]["headline"] == "new"
assert got["confidence"] == 90
assert got["picks"]["core"][0]["risk_tag"] == "공격"

View File

@@ -0,0 +1,53 @@
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from app import db
@pytest.fixture(autouse=True)
def setup_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
monkeypatch.setattr(db, "DB_PATH", str(test_db))
db.init_db()
yield
def _seed_briefing(drw=1153):
picks = {
"core": [{"numbers": [1, 2, 3, 4, 5, 6], "risk_tag": "안정", "reason": "x"}] * 5,
"bonus": [{"numbers": [7, 8, 9, 10, 11, 12], "risk_tag": "균형", "reason": "x"}] * 5,
"extended": [{"numbers": [13, 14, 15, 16, 17, 18], "risk_tag": "공격", "reason": "x"}] * 5,
"pool": [{"numbers": [19, 20, 21, 22, 23, 24], "risk_tag": "안정", "reason": "x"}] * 5,
}
db.save_briefing({
"draw_no": drw, "picks": picks,
"narrative": {"headline": "h", "summary_3lines": ["a", "b", "c"]},
"confidence": 70, "model": "test",
})
def test_bulk_core_inserts_5():
_seed_briefing()
r = db.bulk_insert_purchases_from_briefing(1153, "core", 5000)
assert r["ok"] and r["sets"] == 5
rows = db.get_purchases(draw_no=1153)
assert len(rows) == 5
assert all(row["source_strategy"] == "curator_core" for row in rows)
def test_bulk_full_inserts_20():
_seed_briefing()
r = db.bulk_insert_purchases_from_briefing(1153, "full", 20000)
assert r["ok"] and r["sets"] == 20
def test_bulk_unknown_tier_mode():
_seed_briefing()
r = db.bulk_insert_purchases_from_briefing(1153, "garbage", 1000)
assert r["ok"] is False and "garbage" in r["reason"]
def test_bulk_no_briefing():
r = db.bulk_insert_purchases_from_briefing(9999, "core", 5000)
assert r["ok"] is False and "not found" in r["reason"]

View File

@@ -0,0 +1,60 @@
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import json
import pytest
from app import db
from app.jobs.grade_weekly_review import run_weekly_grading
@pytest.fixture(autouse=True)
def setup_db(tmp_path, monkeypatch):
test_db = tmp_path / "test.db"
monkeypatch.setattr(db, "DB_PATH", str(test_db))
db.init_db()
yield
def _seed_draw(drw_no=1153):
db.upsert_draw({
"drw_no": drw_no, "drw_date": "2026-05-09",
"n1": 3, "n2": 11, "n3": 17, "n4": 25, "n5": 33, "n6": 41, "bonus": 8,
})
def _seed_briefing(drw_no=1153):
picks = {
"core": [
{"numbers": [3, 11, 17, 25, 33, 41], "risk_tag": "안정", "reason": "x"}, # 6
{"numbers": [1, 2, 3, 4, 5, 6], "risk_tag": "안정", "reason": "x"}, # 1
{"numbers": [3, 11, 17, 4, 5, 6], "risk_tag": "균형", "reason": "x"}, # 3
{"numbers": [11, 25, 33, 7, 8, 9], "risk_tag": "균형", "reason": "x"}, # 3
{"numbers": [3, 11, 17, 25, 33, 9], "risk_tag": "공격", "reason": "x"}, # 5
],
"bonus": [], "extended": [], "pool": [],
}
db.save_briefing({
"draw_no": drw_no, "picks": picks,
"narrative": {"headline": "h", "summary_3lines": ["a", "b", "c"], "retrospective": ""},
"confidence": 70, "model": "test",
})
def test_grade_with_curator_only_no_purchase():
_seed_draw()
_seed_briefing()
run_weekly_grading(1153)
rev = db.get_review(1153)
assert rev is not None
assert rev["curator_avg_match"] == round((6+1+3+3+5)/5, 2)
assert rev["curator_best_match"] == 6
assert rev["curator_5plus_prizes"] == 4 # 6,3,3,5 ≥3 (네 개)
assert rev["user_avg_match"] is None # 구매 없음
def test_grade_with_no_briefing():
_seed_draw()
run_weekly_grading(1153)
rev = db.get_review(1153)
assert rev is not None
assert rev["curator_avg_match"] is None

View File

@@ -0,0 +1,42 @@
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from app.jobs.grading_helpers import (
score_picks_against_draw,
summarize_pattern,
compute_pattern_delta,
)
def test_score_picks_against_draw_basic():
win_nums = [3, 11, 17, 25, 33, 41]
bonus = 8
picks = [
{"numbers": [3, 11, 17, 25, 33, 41], "risk_tag": "안정"}, # 6 일치
{"numbers": [1, 2, 3, 4, 5, 6], "risk_tag": "공격"}, # 1 일치
{"numbers": [3, 11, 17, 4, 5, 6], "risk_tag": "안정"}, # 3 일치 → 5등
]
out = score_picks_against_draw(picks, win_nums, bonus)
# 함수가 round(avg, 2) 로 반환하므로 rounded 비교
assert out["avg_match"] == 3.33
assert out["best_match"] == 6
assert out["five_plus_prizes"] == 2 # 3개 이상 카운트(5등 이상)
assert out["best_tier"] == "안정"
def test_summarize_pattern():
nums = [3, 11, 17, 25, 33, 41]
s = summarize_pattern(nums)
# 저번호(<=22) 3개, 고번호 3개, 모두 홀수이므로 홀:짝 = 6:0
assert s["low_count"] == 3
assert s["odd_count"] == 6
assert s["sum"] == 130
def test_compute_pattern_delta_picks_dominant_axis():
# 사용자가 평균 저번호 4.2개 / 추첨 평균 3 → 저번호 편향 +1.2
user = {"low_avg": 4.2, "odd_avg": 3.4, "sum_avg": 124}
draw = {"low_avg": 3.0, "odd_avg": 3.0, "sum_avg": 142}
delta = compute_pattern_delta(user, draw)
assert "저번호" in delta or "low" in delta
assert "+1.2" in delta or "1.2" in delta