16 Commits

Author SHA1 Message Date
af3df87672 docs: lotto 큐레이터 API·테이블·스케줄 반영
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 08:36:05 +09:00
c6de615271 feat(agent-office): lotto 큐레이터 월요일 07:00 스케줄 2026-04-15 08:28:22 +09:00
7c4d7b4534 feat(agent-office): LottoAgent 등록 + seed + 텔레그램 메타 2026-04-15 08:28:10 +09:00
cc17c29266 feat(agent-office): 큐레이터 파이프라인(fetch→claude→validate→save) 2026-04-15 08:27:43 +09:00
889dc417a9 feat(agent-office): 큐레이터 system 프롬프트 2026-04-15 08:27:23 +09:00
e16cf8f817 feat(agent-office): 큐레이터 응답 검증 스키마 + 테스트 2026-04-15 08:27:07 +09:00
d4a4849943 feat(agent-office): service_proxy lotto 메서드 2026-04-15 08:26:37 +09:00
21721d34a0 feat(agent-office): lotto 큐레이터 환경변수 2026-04-15 08:26:29 +09:00
86be8c2a53 feat(lotto): curator/briefing 라우터 마운트 2026-04-15 08:24:14 +09:00
753ecdbbf2 feat(lotto): briefing CRUD + 큐레이터 사용량 라우터 2026-04-15 08:24:06 +09:00
1ec45acb95 feat(lotto): curator candidates/context 라우터 2026-04-15 08:23:53 +09:00
d1fec71bdc fix(lotto): curator_helpers 시그니처 정합 (recommender/analyzer/strategy_evolver 실제 시그니처에 맞춤)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 08:22:56 +09:00
4a8b0092d7 feat(lotto): curator_helpers — 후보 병합·피처·맥락
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 08:20:51 +09:00
e1ae0f7501 feat(lotto): lotto_briefings 테이블 + CRUD 함수
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 08:18:20 +09:00
adb5cdb54e docs: lotto AI curator 설계/구현 계획 추가
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 03:45:14 +09:00
e691ed9a7d docs(lotto): AI 큐레이터 설계 스펙 추가
- 주간 AI 큐레이터: 월요일 07:00 자동 생성, Claude Sonnet 4.5
- lotto-backend = 엔진·저장소, agent-office = AI 판단 분리
- 브리핑 중심 프론트 재배치(3탭), 토큰·비용 노출
- 최종 미사용 DB/코드 정리 패스 포함
2026-04-15 03:35:29 +09:00
22 changed files with 2936 additions and 0 deletions

View File

@@ -153,6 +153,7 @@ docker compose up -d
| `strategy_performance` | 전략별 회차 성과 (EMA 입력 데이터) |
| `strategy_weights` | 메타 전략 가중치 (EMA + Softmax) |
| `weekly_reports` | 주간 공략 리포트 캐시 |
| `lotto_briefings` | AI 큐레이터 주간 브리핑 (5세트 + 내러티브 + 토큰·비용 집계) |
| `todos` | 투두리스트 (UUID PK) |
| `blog_posts` | 블로그 글 (tags: JSON 배열) |
@@ -197,6 +198,13 @@ docker compose up -d
| POST | `/api/blog/posts` | 블로그 글 생성 (date 미입력 시 오늘) |
| PUT | `/api/blog/posts/{id}` | 블로그 글 수정 |
| DELETE | `/api/blog/posts/{id}` | 블로그 글 삭제 |
| GET | `/api/lotto/curator/candidates` | 큐레이터용 후보 N세트 + 피처 |
| GET | `/api/lotto/curator/context` | 주간 맥락(핫/콜드·직전 회차) |
| GET | `/api/lotto/curator/usage` | 큐레이터 토큰·비용 집계 |
| POST | `/api/lotto/briefing` | AI 브리핑 저장 |
| GET | `/api/lotto/briefing/latest` | 최신 브리핑 |
| GET | `/api/lotto/briefing/{draw_no}` | 특정 회차 브리핑 |
| GET | `/api/lotto/briefing` | 브리핑 이력 |
### stock-lab (stock-lab/)
- Windows AI 서버 연동: `WINDOWS_AI_SERVER_URL=http://192.168.45.59:8000`
@@ -430,6 +438,8 @@ docker compose up -d
- `CONVERSATION_MODEL`: 대화 모델 (기본 `claude-haiku-4-5-20251001`)
- `CONVERSATION_HISTORY_LIMIT`: 이력 주입 수 (기본 20)
- `CONVERSATION_RATE_PER_MIN`: 채팅당 분당 최대 메시지 (기본 6)
- `LOTTO_BACKEND_URL`: 기본 `http://lotto-backend:8000`
- `LOTTO_CURATOR_MODEL`: 기본 `claude-sonnet-4-5`
**텔레그램 자연어 대화 (옵션 B)**
- 슬래시 명령이 아닌 일반 문장을 보내면 Claude Haiku 4.5가 응답
@@ -440,6 +450,7 @@ docker compose up -d
**스케줄러 job**
- 07:30 매일 — 주식 뉴스 요약 (`stock_news_job`)
- 매주 월요일 07:00 — 로또 큐레이터 브리핑 (`lotto_curate`)
- 60초 간격 — 유휴 에이전트 휴식 체크 (`idle_check_job`)
**agent-office API 목록**

View File

@@ -2,6 +2,7 @@ from .stock import StockAgent
from .music import MusicAgent
from .blog import BlogAgent
from .realestate import RealestateAgent
from .lotto import LottoAgent
AGENT_REGISTRY = {}
@@ -10,6 +11,7 @@ def init_agents():
AGENT_REGISTRY["music"] = MusicAgent()
AGENT_REGISTRY["blog"] = BlogAgent()
AGENT_REGISTRY["realestate"] = RealestateAgent()
AGENT_REGISTRY["lotto"] = LottoAgent()
def get_agent(agent_id: str):
return AGENT_REGISTRY.get(agent_id)

View File

@@ -0,0 +1,44 @@
from .base import BaseAgent
from ..db import create_task, update_task_status, add_log
from ..curator.pipeline import curate_weekly, CuratorError
class LottoAgent(BaseAgent):
agent_id = "lotto"
display_name = "로또 큐레이터"
async def on_schedule(self) -> None:
if self.state not in ("idle", "break"):
return
await self._run(source="auto")
async def on_command(self, action: str, params: dict) -> dict:
if action in ("curate_now", "curate_weekly"):
return await self._run(source="manual")
if action == "status":
return {"ok": True, "message": f"{self.state}: {self.state_detail}"}
return {"ok": False, "message": f"unknown action: {action}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
pass
async def _run(self, source: str) -> dict:
task_id = create_task(self.agent_id, "curate_weekly", {"source": source})
await self.transition("working", "후보 수집 및 AI 큐레이션 중...", task_id)
try:
result = await curate_weekly(source=source)
update_task_status(task_id, "succeeded", result_data=result)
await self.transition("reporting", f"#{result['draw_no']} 브리핑 저장 완료")
add_log(self.agent_id, f"큐레이션 완료: #{result['draw_no']} conf={result['confidence']}", task_id=task_id)
await self.transition("idle", "대기 중")
return {"ok": True, **result}
except CuratorError as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log(self.agent_id, f"큐레이션 실패: {e}", level="error", task_id=task_id)
await self.transition("idle", "오류")
return {"ok": False, "message": str(e)}
except Exception as e:
update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log(self.agent_id, f"큐레이션 예외: {e}", level="error", task_id=task_id)
await self.transition("idle", "오류")
return {"ok": False, "message": f"{type(e).__name__}: {e}"}

View File

@@ -30,3 +30,7 @@ CORS_ALLOW_ORIGINS = os.getenv(
IDLE_BREAK_THRESHOLD = int(os.getenv("IDLE_BREAK_THRESHOLD", "300")) # 5 min
BREAK_DURATION_MIN = int(os.getenv("BREAK_DURATION_MIN", "60")) # 1 min
BREAK_DURATION_MAX = int(os.getenv("BREAK_DURATION_MAX", "180")) # 3 min
# Lotto Curator
LOTTO_BACKEND_URL = os.getenv("LOTTO_BACKEND_URL", "http://lotto-backend:8000")
LOTTO_CURATOR_MODEL = os.getenv("LOTTO_CURATOR_MODEL", "claude-sonnet-4-5")

View File

View File

@@ -0,0 +1,121 @@
"""큐레이터 파이프라인 — fetch → claude → validate → save."""
import json
import time
from typing import Any, Dict
import httpx
from ..config import ANTHROPIC_API_KEY, LOTTO_CURATOR_MODEL
from .. import service_proxy
from .prompt import SYSTEM_PROMPT, build_user_message
from .schema import validate_response
API_URL = "https://api.anthropic.com/v1/messages"
class CuratorError(Exception):
pass
async def _call_claude(user_text: str, feedback: str = "") -> tuple[dict, dict]:
if not ANTHROPIC_API_KEY:
raise CuratorError("ANTHROPIC_API_KEY missing")
headers = {
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"anthropic-beta": "prompt-caching-2024-07-31",
"content-type": "application/json",
}
system_blocks = [{
"type": "text",
"text": SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"},
}]
if feedback:
user_text = f"이전 응답이 다음 이유로 거절됨: {feedback}\n올바른 스키마로 다시 응답.\n\n{user_text}"
payload = {
"model": LOTTO_CURATOR_MODEL,
"max_tokens": 4096,
"system": system_blocks,
"messages": [{"role": "user", "content": [{"type": "text", "text": user_text}]}],
}
started = time.monotonic()
async with httpx.AsyncClient(timeout=120) as client:
r = await client.post(API_URL, headers=headers, json=payload)
r.raise_for_status()
resp = r.json()
latency_ms = int((time.monotonic() - started) * 1000)
text = "".join(
b.get("text", "") for b in resp.get("content", []) if b.get("type") == "text"
).strip()
if text.startswith("```"):
text = text.strip("`")
if text.startswith("json"):
text = text[4:]
text = text.strip()
parsed = json.loads(text)
usage = resp.get("usage", {}) or {}
return parsed, {
"input": int(usage.get("input_tokens", 0) or 0),
"output": int(usage.get("output_tokens", 0) or 0),
"cache_read": int(usage.get("cache_read_input_tokens", 0) or 0),
"cache_write": int(usage.get("cache_creation_input_tokens", 0) or 0),
"latency_ms": latency_ms,
}
async def curate_weekly(source: str = "auto") -> Dict[str, Any]:
cand_resp = await service_proxy.lotto_candidates(n=20)
draw_no = cand_resp["draw_no"]
candidates = cand_resp["candidates"]
context = await service_proxy.lotto_context()
user_text = build_user_message(draw_no, candidates, {
"hot_numbers": context.get("hot_numbers", []),
"cold_numbers": context.get("cold_numbers", []),
"last_draw_summary": context.get("last_draw_summary", ""),
"my_recent_performance": context.get("my_recent_performance", []),
})
candidate_numbers = [c["numbers"] for c in candidates]
usage_total = {"input": 0, "output": 0, "cache_read": 0, "cache_write": 0, "latency_ms": 0}
last_error = None
validated = None
for attempt in (0, 1):
try:
raw, usage = await _call_claude(user_text, feedback=last_error or "")
for k in usage_total:
usage_total[k] += usage[k]
validated = validate_response(raw, candidate_numbers)
break
except Exception as e:
last_error = f"{type(e).__name__}: {e}"
if validated is None:
raise CuratorError(f"schema validation failed after retry: {last_error}")
payload = {
"draw_no": draw_no,
"picks": [p.model_dump() for p in validated.picks],
"narrative": validated.narrative.model_dump(),
"confidence": validated.confidence,
"model": LOTTO_CURATOR_MODEL,
"tokens_input": usage_total["input"],
"tokens_output": usage_total["output"],
"cache_read": usage_total["cache_read"],
"cache_write": usage_total["cache_write"],
"latency_ms": usage_total["latency_ms"],
"source": source,
}
await service_proxy.lotto_save_briefing(payload)
return {
"ok": True,
"draw_no": draw_no,
"confidence": validated.confidence,
"tokens": {"input": usage_total["input"], "output": usage_total["output"]},
}

View File

@@ -0,0 +1,46 @@
"""큐레이터 system/user 프롬프트. system은 정적이므로 캐시 대상."""
import json
SYSTEM_PROMPT = """당신은 로또 번호 큐레이터입니다. 주어진 후보 20세트 중 5세트를 다음 규칙으로 선별합니다.
선별 규칙:
- 5세트의 리스크 분포는 안정 2 · 균형 2 · 공격 1 을 권장(유연 ±1).
- 홀짝 비율, 저/고 구간, 연속번호 포함 여부가 세트끼리 겹치지 않도록 다양성을 확보.
- hot_number_count=0 이고 cold_number_count=0 인 '중립형' 세트를 최소 1개 포함.
- 후보에 없는 번호 조합은 절대 사용 금지. numbers 필드는 반드시 candidates 중 하나와 정확히 일치해야 함.
- 각 세트 reason은 한국어 40자 이내 한 줄. 해당 세트의 features 값과 context 값만 근거로.
narrative 규칙:
- headline: 한 줄, 이번 주 추첨 전망 요약.
- summary_3lines: 정확히 3개 항목의 배열.
- hot_cold_comment: hot/cold 번호에 대한 한 줄 논평.
- warnings: 특별한 주의사항 없으면 빈 문자열.
출력은 반드시 JSON 하나, 그 외 어떤 텍스트도 금지. 스키마:
{
"picks": [
{"numbers":[int,int,int,int,int,int], "risk_tag":"안정"|"균형"|"공격", "reason": str}
],
"narrative": {
"headline": str,
"summary_3lines": [str, str, str],
"hot_cold_comment": str,
"warnings": str
},
"confidence": int (0~100)
}
"""
def build_user_message(draw_no: int, candidates: list, context: dict) -> str:
payload = {
"draw_no": draw_no,
"context": context,
"candidates": candidates,
}
return (
f"이번 회차: {draw_no}\n"
f"아래 데이터로 5세트를 큐레이션하고 위 스키마로만 응답하세요.\n\n"
f"```json\n{json.dumps(payload, ensure_ascii=False)}\n```"
)

View File

@@ -0,0 +1,41 @@
from typing import List, Literal
from pydantic import BaseModel, Field, field_validator
class Pick(BaseModel):
numbers: List[int] = Field(min_length=6, max_length=6)
risk_tag: Literal["안정", "균형", "공격"]
reason: str = Field(max_length=80)
@field_validator("numbers")
@classmethod
def _check_numbers(cls, v):
if len(set(v)) != 6:
raise ValueError("numbers must be 6 unique integers")
if any(n < 1 or n > 45 for n in v):
raise ValueError("numbers must be within 1..45")
return sorted(v)
class Narrative(BaseModel):
headline: str
summary_3lines: List[str] = Field(min_length=3, max_length=3)
hot_cold_comment: str = ""
warnings: str = ""
class CuratorOutput(BaseModel):
picks: List[Pick]
narrative: Narrative
confidence: int = Field(ge=0, le=100)
def validate_response(data: dict, candidate_numbers: List[List[int]]) -> CuratorOutput:
out = CuratorOutput.model_validate(data)
if len(out.picks) != 5:
raise ValueError("picks must have exactly 5 sets")
candidate_set = {tuple(sorted(c)) for c in candidate_numbers}
for p in out.picks:
if tuple(p.numbers) not in candidate_set:
raise ValueError(f"pick {p.numbers} not in candidates")
return out

View File

@@ -92,6 +92,7 @@ def init_db() -> None:
("music", "음악 프로듀서"),
("blog", "블로그 마케터"),
("realestate", "청약 애널리스트"),
("lotto", "로또 큐레이터"),
]:
conn.execute(
"INSERT OR IGNORE INTO agent_config(agent_id, display_name) VALUES(?,?)",

View File

@@ -24,9 +24,15 @@ async def _run_blog_schedule():
if agent:
await agent.on_schedule()
async def _run_lotto_schedule():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.on_schedule()
def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(_run_realestate_schedule, "cron", hour=9, minute=15, id="realestate_report")
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
scheduler.add_job(_check_idle_breaks, "interval", seconds=60, id="idle_check")
scheduler.start()

View File

@@ -131,3 +131,26 @@ async def realestate_mark_read(match_id: int) -> Dict[str, Any]:
resp = await _client.patch(f"{REALESTATE_LAB_URL}/api/realestate/matches/{match_id}/read")
resp.raise_for_status()
return resp.json()
# --- lotto-backend ---
async def lotto_candidates(n: int = 20) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/curator/candidates", params={"n": n})
resp.raise_for_status()
return resp.json()
async def lotto_context() -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/curator/context")
resp.raise_for_status()
return resp.json()
async def lotto_save_briefing(payload: dict) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.post(f"{LOTTO_BACKEND_URL}/api/lotto/briefing", json=payload)
resp.raise_for_status()
return resp.json()

View File

@@ -11,6 +11,10 @@ AGENT_META = {
"emoji": "🎵",
"color": "#44aa88",
},
"lotto": {
"emoji": "🎱",
"display_name": "로또 큐레이터",
},
}

View File

@@ -0,0 +1,60 @@
import pytest
from app.curator.schema import validate_response, CuratorOutput
CANDIDATE_NUMBERS = [
[1, 2, 3, 4, 5, 6],
[7, 8, 9, 10, 11, 12],
[13, 14, 15, 16, 17, 18],
[19, 20, 21, 22, 23, 24],
[25, 26, 27, 28, 29, 30],
[31, 32, 33, 34, 35, 36],
]
def _valid_payload():
return {
"picks": [
{"numbers": s, "risk_tag": "안정", "reason": "test"}
for s in CANDIDATE_NUMBERS[:5]
],
"narrative": {
"headline": "h", "summary_3lines": ["a", "b", "c"],
"hot_cold_comment": "hc", "warnings": "",
},
"confidence": 80,
}
def test_valid_payload_passes():
result = validate_response(_valid_payload(), CANDIDATE_NUMBERS)
assert isinstance(result, CuratorOutput)
assert len(result.picks) == 5
def test_rejects_number_out_of_candidates():
bad = _valid_payload()
bad["picks"][0]["numbers"] = [40, 41, 42, 43, 44, 45] # valid numbers but not in candidates
with pytest.raises(ValueError, match="not in candidates"):
validate_response(bad, CANDIDATE_NUMBERS)
def test_rejects_wrong_pick_count():
bad = _valid_payload()
bad["picks"] = bad["picks"][:3]
with pytest.raises(ValueError, match="exactly 5"):
validate_response(bad, CANDIDATE_NUMBERS)
def test_rejects_duplicate_numbers_within_set():
bad = _valid_payload()
bad["picks"][0]["numbers"] = [1, 1, 2, 3, 4, 5]
with pytest.raises(ValueError):
validate_response(bad, CANDIDATE_NUMBERS)
def test_rejects_invalid_risk_tag():
bad = _valid_payload()
bad["picks"][0]["risk_tag"] = "미친"
with pytest.raises(ValueError):
validate_response(bad, CANDIDATE_NUMBERS)

View File

@@ -0,0 +1,151 @@
"""큐레이터용 후보 가공 — 여러 엔진 결과를 하나로 병합, 중복 제거, 피처 계산."""
from typing import Dict, List, Any, Set
from . import db
from .recommender import recommend_numbers, recommend_with_heatmap
from .analyzer import get_statistical_report
from .strategy_evolver import generate_smart_recommendation
LOW_HIGH_CUT = 22
def compute_features(numbers: List[int], hot: Set[int], cold: Set[int]) -> Dict[str, Any]:
nums = sorted(numbers)
odd = sum(1 for n in nums if n % 2 == 1)
low = sum(1 for n in nums if n <= LOW_HIGH_CUT)
buckets = [0, 0, 0, 0, 0]
for n in nums:
if n <= 10: buckets[0] += 1
elif n <= 20: buckets[1] += 1
elif n <= 30: buckets[2] += 1
elif n <= 40: buckets[3] += 1
else: buckets[4] += 1
consecutive = any(nums[i+1] - nums[i] == 1 for i in range(len(nums) - 1))
return {
"odd_count": odd,
"even_count": 6 - odd,
"low_count": low,
"high_count": 6 - low,
"range_distribution": buckets,
"has_consecutive": consecutive,
"hot_number_count": len(set(nums) & hot),
"cold_number_count": len(set(nums) & cold),
"sum": sum(nums),
}
def _key(numbers: List[int]) -> str:
return ",".join(str(n) for n in sorted(numbers))
def collect_candidates(n: int, hot: Set[int], cold: Set[int]) -> List[Dict[str, Any]]:
"""우선순위: simulation best_picks → meta → heatmap → statistics. 중복 제거 후 최대 n세트."""
seen: Dict[str, Dict[str, Any]] = {}
order: List[str] = []
def _add(numbers: List[int], source: str) -> None:
if not numbers:
return
k = _key(numbers)
if k in seen:
return
seen[k] = {"numbers": sorted(numbers), "source": source}
order.append(k)
# 1. simulation best_picks
try:
for row in db.get_best_picks(limit=n):
_add(row.get("numbers") or [], "simulation")
except Exception:
pass
# draws는 한 번만 로드
draws = []
try:
draws = db.get_all_draw_numbers()
except Exception:
pass
# 2. meta-strategy (smart)
try:
meta = generate_smart_recommendation(sets=n)
for s in meta.get("sets", []):
_add(s.get("numbers") or [], "meta")
except Exception:
pass
# 3. heatmap (n번 호출, 중복 회피)
if draws:
try:
for _ in range(n * 2):
if len(order) >= n * 2:
break
r = recommend_with_heatmap(draws, [])
_add(r.get("numbers") or [], "heatmap")
except Exception:
pass
# 4. statistics
if draws:
try:
for _ in range(n * 2):
if len(order) >= n * 2:
break
r = recommend_numbers(draws)
_add(r.get("numbers") or [], "statistics")
except Exception:
pass
out = []
for k in order[:n]:
item = seen[k]
item["features"] = compute_features(item["numbers"], hot, cold)
out.append(item)
return out
def build_context(hot_limit: int = 10, cold_limit: int = 10) -> Dict[str, Any]:
"""주간 맥락 패키지 — get_statistical_report가 이미 hot/cold를 제공."""
hot: List[int] = []
cold: List[int] = []
last_summary = ""
try:
draws = db.get_all_draw_numbers()
except Exception:
draws = []
if draws:
try:
report = get_statistical_report(draws)
hot = list(report.get("hot_numbers", []))[:hot_limit]
cold = list(report.get("cold_numbers", []))[:cold_limit]
except Exception:
pass
try:
latest = db.get_latest_draw()
except Exception:
latest = None
if latest:
nums = [latest.get(f"n{i}") for i in range(1, 7)]
nums = [n for n in nums if n is not None]
if nums:
odd = sum(1 for n in nums if n % 2 == 1)
low = sum(1 for n in nums if n <= LOW_HIGH_CUT)
last_summary = f"{latest.get('drw_no')}회: {', '.join(str(n) for n in nums)} (홀{odd}{6-odd}, 저{low}{6-low})"
my_perf: List[Dict[str, Any]] = []
try:
from .purchase_manager import get_recent_performance
my_perf = get_recent_performance(limit=3)
except Exception:
my_perf = []
return {
"hot_numbers": hot,
"cold_numbers": cold,
"last_draw_summary": last_summary,
"my_recent_performance": my_perf,
}

View File

@@ -277,6 +277,26 @@ def init_db() -> None:
conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_strategy ON purchase_history(source_strategy)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_purchase_checked ON purchase_history(draw_no, checked)")
# ── lotto_briefings 테이블 ─────────────────────────────────────────────
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_briefings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draw_no INTEGER UNIQUE NOT NULL,
picks TEXT NOT NULL,
narrative TEXT NOT NULL,
confidence INTEGER NOT NULL,
model TEXT NOT NULL,
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
cache_read INTEGER NOT NULL DEFAULT 0,
cache_write INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER NOT NULL DEFAULT 0,
source TEXT NOT NULL DEFAULT 'auto',
generated_at TEXT NOT NULL DEFAULT (datetime('now','localtime'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_briefings_draw ON lotto_briefings(draw_no DESC)")
# ── todos CRUD ───────────────────────────────────────────────────────────────
@@ -1096,3 +1116,104 @@ def update_purchase_results(purchase_id: int, results: list, total_prize: int) -
(json.dumps(results, ensure_ascii=False), total_prize, purchase_id),
)
# --- Lotto Briefings ---
def save_briefing(data: Dict[str, Any]) -> int:
with _conn() as conn:
cur = conn.execute("""
INSERT INTO lotto_briefings
(draw_no, picks, narrative, confidence, model,
tokens_input, tokens_output, cache_read, cache_write,
latency_ms, source)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(draw_no) DO UPDATE SET
picks=excluded.picks, narrative=excluded.narrative,
confidence=excluded.confidence, model=excluded.model,
tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output,
cache_read=excluded.cache_read,
cache_write=excluded.cache_write,
latency_ms=excluded.latency_ms,
source=excluded.source,
generated_at=datetime('now','localtime')
""", (
data["draw_no"],
json.dumps(data["picks"], ensure_ascii=False),
json.dumps(data["narrative"], ensure_ascii=False),
int(data["confidence"]),
data["model"],
int(data.get("tokens_input", 0)),
int(data.get("tokens_output", 0)),
int(data.get("cache_read", 0)),
int(data.get("cache_write", 0)),
int(data.get("latency_ms", 0)),
data.get("source", "auto"),
))
return cur.lastrowid
def _briefing_row(r) -> Dict[str, Any]:
return {
"id": r["id"],
"draw_no": r["draw_no"],
"picks": json.loads(r["picks"]),
"narrative": json.loads(r["narrative"]),
"confidence": r["confidence"],
"model": r["model"],
"tokens_input": r["tokens_input"],
"tokens_output": r["tokens_output"],
"cache_read": r["cache_read"],
"cache_write": r["cache_write"],
"latency_ms": r["latency_ms"],
"source": r["source"],
"generated_at": r["generated_at"],
}
def get_latest_briefing() -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM lotto_briefings ORDER BY draw_no DESC LIMIT 1").fetchone()
return _briefing_row(r) if r else None
def get_briefing(draw_no: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
r = conn.execute("SELECT * FROM lotto_briefings WHERE draw_no=?", (draw_no,)).fetchone()
return _briefing_row(r) if r else None
def list_briefings(limit: int = 10) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM lotto_briefings ORDER BY draw_no DESC LIMIT ?",
(limit,),
).fetchall()
return [_briefing_row(r) for r in rows]
def get_curator_usage(days: int = 30) -> Dict[str, Any]:
with _conn() as conn:
r = conn.execute("""
SELECT COUNT(*) AS calls,
SUM(tokens_input) AS in_tokens,
SUM(tokens_output) AS out_tokens,
SUM(cache_read) AS cache_read,
SUM(cache_write) AS cache_write,
AVG(latency_ms) AS avg_latency
FROM lotto_briefings
WHERE generated_at >= datetime('now', ?, 'localtime')
""", (f"-{int(days)} days",)).fetchone()
cr = int(r["cache_read"] or 0)
cw = int(r["cache_write"] or 0)
return {
"days": days,
"calls": int(r["calls"] or 0),
"tokens_input": int(r["in_tokens"] or 0),
"tokens_output": int(r["out_tokens"] or 0),
"cache_read": cr,
"cache_write": cw,
"cache_hit_rate": round(cr / (cr + cw), 3) if (cr + cw) > 0 else 0.0,
"avg_latency_ms": round(float(r["avg_latency"] or 0), 1),
}

View File

@@ -41,8 +41,12 @@ from .strategy_evolver import (
get_weights_with_trend, recalculate_weights,
generate_smart_recommendation,
)
from .routers import curator as curator_router
from .routers import briefing as briefing_router
app = FastAPI()
app.include_router(curator_router.router)
app.include_router(briefing_router.router)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
ALL_URL = os.getenv("LOTTO_ALL_URL", "https://smok95.github.io/lotto/results/all.json")

View File

@@ -97,3 +97,20 @@ def check_purchases_for_draw(drw_no: int) -> int:
logger.info(f"[purchase_manager] {drw_no}회차 구매 {count}건 체크 완료")
return count
def get_recent_performance(limit: int = 3) -> list:
"""최근 N회차 내 구매 성과 요약. 없으면 빈 리스트."""
from . import db
purchases = db.get_purchases() or []
by_draw: dict = {}
for p in purchases:
d = p.get("draw_no")
if not d:
continue
results = p.get("results") or []
max_correct = max((int(r.get("correct") or 0) for r in results), default=0)
slot = by_draw.setdefault(d, {"draw_no": d, "purchased_sets": 0, "best_match": 0})
slot["purchased_sets"] += int(p.get("sets") or 1)
slot["best_match"] = max(slot["best_match"], max_correct)
return sorted(by_draw.values(), key=lambda x: -x["draw_no"])[:limit]

View File

View File

@@ -0,0 +1,53 @@
"""브리핑 저장/조회 + 큐레이터 사용량 엔드포인트."""
from typing import Any, Dict, List
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from .. import db
router = APIRouter(prefix="/api/lotto")
class BriefingRequest(BaseModel):
draw_no: int
picks: List[Dict[str, Any]]
narrative: Dict[str, Any]
confidence: int = Field(ge=0, le=100)
model: str
tokens_input: int = 0
tokens_output: int = 0
cache_read: int = 0
cache_write: int = 0
latency_ms: int = 0
source: str = "auto"
@router.post("/briefing", status_code=201)
def save_briefing(body: BriefingRequest):
bid = db.save_briefing(body.model_dump())
return {"ok": True, "id": bid}
@router.get("/briefing/latest")
def latest():
b = db.get_latest_briefing()
if not b:
raise HTTPException(404, "no briefing yet")
return b
@router.get("/briefing/{draw_no}")
def get_one(draw_no: int):
b = db.get_briefing(draw_no)
if not b:
raise HTTPException(404, f"no briefing for draw {draw_no}")
return b
@router.get("/briefing")
def history(limit: int = 10):
return {"briefings": db.list_briefings(limit)}
@router.get("/curator/usage")
def usage(days: int = 30):
return db.get_curator_usage(days)

View File

@@ -0,0 +1,24 @@
"""큐레이터 입력 엔드포인트 — agent-office에서만 호출."""
from fastapi import APIRouter
from ..curator_helpers import collect_candidates, build_context
from .. import db
router = APIRouter(prefix="/api/lotto/curator")
@router.get("/candidates")
def candidates(n: int = 20):
ctx = build_context()
hot = set(ctx["hot_numbers"])
cold = set(ctx["cold_numbers"])
latest = db.get_latest_draw()
draw_no = (latest["drw_no"] + 1) if latest else 0
items = collect_candidates(n, hot, cold)
return {"draw_no": draw_no, "candidates": items}
@router.get("/context")
def context():
latest = db.get_latest_draw()
draw_no = (latest["drw_no"] + 1) if latest else 0
return {"draw_no": draw_no, **build_context()}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,350 @@
# Lotto AI 큐레이터 — 설계 문서
> 작성일: 2026-04-15
> 목표: 난잡한 lotto 랩을 **주간 AI 브리핑**을 축으로 재정리. 매주 월요일 아침 자동으로 "이번 주 5세트 + 내러티브 리포트"를 생성해 구매 의사결정 참고.
---
## 1. 배경
- 현재 lotto 랩은 분석(5가지)·추천(통계/히트맵/메타)·시뮬레이션·전략진화 등 기능이 풍부하지만 출력이 분산되어 "결국 뭘 사야 하지"가 한눈에 들어오지 않음.
- `docs/lotto-premium-roadmap.md` Phase 1 방향(신뢰 기반 + 주간 리포트)을 AI 활용으로 압축 실행.
## 2. 핵심 결정사항
| 항목 | 결정 |
|------|------|
| AI 역할 | **큐레이터(Curator)** — 숫자 생성 X, 기존 엔진 후보 중 5세트 선별 + 내러티브 작성 |
| 브리핑 형식 | **A+B 조합** — 리포트형 내러티브 + 최종 5세트 카드 |
| 트리거 | **매주 월요일 07:00 자동 생성** (웹 UI 전용, 텔레그램 미전송) |
| 로직 위치 | **agent-office `lotto` 에이전트** (lotto-backend는 엔진·저장소 역할만) |
| 모델 | `claude-sonnet-4-5` (주 1회 호출, 품질 우선) — 환경변수 `LOTTO_CURATOR_MODEL` |
| 사용량 노출 | 브리핑 카드 + 큐레이터 사용량 API(월간 집계) |
---
## 3. 아키텍처
```
┌──────────────────────────────────────────────────────────────┐
│ 월요일 07:00 APScheduler (agent-office) │
│ → lotto 에이전트 curate_weekly 태스크 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. GET /api/lotto/curator/candidates?n=20 │ │
│ │ 2. GET /api/lotto/curator/context │ │
│ │ 3. Claude Sonnet 4.5 호출 (strict JSON out) │ │
│ │ 4. 스키마·번호 검증 + 1회 재시도 │ │
│ │ 5. POST /api/lotto/briefing (저장) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 사용자는 웹에서: │
│ GET /api/lotto/briefing/latest (최신 표시) │
│ POST /api/agent-office/command {agent:"lotto", …} (수동) │
└──────────────────────────────────────────────────────────────┘
```
서비스 경계: **lotto-backend = 데이터·엔진 / agent-office = AI 판단**.
---
## 4. Backend (lotto-backend)
### 4.1 신규 API
| 메서드 | 경로 | 설명 |
|--------|------|------|
| GET | `/api/lotto/curator/candidates` | 큐레이터용 후보 N세트 + 세트별 피처 |
| GET | `/api/lotto/curator/context` | 주간 맥락(핫/콜드·직전 회차 분석·내 최근 성과) |
| POST | `/api/lotto/briefing` | 큐레이터 결과 저장 |
| GET | `/api/lotto/briefing/latest` | 최신 브리핑 |
| GET | `/api/lotto/briefing/{draw_no}` | 특정 회차 브리핑 |
| GET | `/api/lotto/briefing?limit=10` | 브리핑 이력 |
| GET | `/api/lotto/curator/usage?days=30` | 큐레이터 토큰·비용 집계 |
### 4.2 `GET /curator/candidates` 응답 구조
```json
{
"draw_no": 1180,
"generated_at": "2026-04-13T07:00:00Z",
"candidates": [
{
"numbers": [3, 14, 22, 29, 35, 41],
"source": "simulation" | "meta" | "heatmap" | "statistics",
"features": {
"odd_count": 3,
"even_count": 3,
"low_count": 3, // 1~22
"high_count": 3, // 23~45
"range_distribution": [1,1,1,1,1,1], // 1-10,11-20,...,41-45
"has_consecutive": true,
"hot_number_count": 1, // context.hot_numbers 교집합
"cold_number_count": 2, // context.cold_numbers 교집합
"sum": 144,
"historical_match_avg": 2.3 // 이 세트가 과거 실제 회차와 평균 몇 개 일치
}
}
]
}
```
중복 제거: 6숫자 정렬 튜플 기준 set 해시. 각 세트의 `source`는 가장 먼저 포함시킨 엔진.
### 4.3 `GET /curator/context` 응답 구조
```json
{
"draw_no": 1180,
"hot_numbers": [3, 17, 28], // 최근 10회 과출현 top
"cold_numbers": [7, 22, 41], // 최근 30회 미출현 top
"last_draw_summary": "1179회: 7, 12, 18, 24, 31, 40 (홀4짝2, 저4고2)",
"recent_analysis": {
"avg_sum": 138,
"avg_odd_count": 2.8
},
"my_recent_performance": [
{ "draw_no": 1177, "purchased_sets": 5, "best_match": 3 },
{ "draw_no": 1178, "purchased_sets": 5, "best_match": 2 },
{ "draw_no": 1179, "purchased_sets": 5, "best_match": 4 }
]
}
```
### 4.4 신규 테이블 `lotto_briefings`
```sql
CREATE TABLE lotto_briefings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
draw_no INTEGER UNIQUE NOT NULL,
picks TEXT NOT NULL, -- JSON: 5세트 + reason + risk_tag
narrative TEXT NOT NULL, -- JSON: headline/summary_3lines/hot_cold/warnings
confidence INTEGER NOT NULL, -- 0~100
model TEXT NOT NULL,
tokens_input INTEGER DEFAULT 0,
tokens_output INTEGER DEFAULT 0,
cache_read INTEGER DEFAULT 0,
cache_write INTEGER DEFAULT 0,
latency_ms INTEGER DEFAULT 0,
source TEXT NOT NULL DEFAULT 'auto', -- 'auto' | 'manual'
generated_at TEXT NOT NULL DEFAULT (datetime('now','localtime'))
);
CREATE INDEX idx_briefings_draw ON lotto_briefings(draw_no DESC);
```
### 4.5 파일 구조 정리
`backend/app/main.py` 933줄 → 라우터 분리:
- `backend/app/routers/briefing.py` — briefing CRUD + curator usage
- `backend/app/routers/curator.py` — candidates / context
- `backend/app/curator_helpers.py` — 후보 중복 제거, 피처 계산, 맥락 추출
기존 `main.py`는 라우터 등록과 앱 조립만 담당(목표 ~300줄).
---
## 5. agent-office `lotto` 에이전트
### 5.1 파일 구조
```
agent-office/app/
agents/lotto.py # LottoAgent (BaseAgent 상속)
curator/
__init__.py
pipeline.py # curate_weekly() 메인 플로우
prompt.py # system prompt + 출력 스키마 정의
schema.py # pydantic 응답 모델 + 검증
service.py # lotto-backend 호출 래퍼 (httpx)
```
`service_proxy.py``lotto_candidates()`, `lotto_context()`, `lotto_save_briefing()` 메서드 추가.
### 5.2 태스크 타입
- `curate_weekly` — 자동/수동 공통. 파라미터 없음(draw_no 자동 계산).
### 5.3 큐레이터 규칙 (system prompt 요지)
```
당신은 로또 번호 큐레이터입니다. 후보 20세트 중 5세트를 다음 규칙으로 선별합니다.
선별 규칙:
- 5세트의 리스크 분포: 안정 2 · 균형 2 · 공격 1 (유연 ±1)
- 홀짝 비율, 저/고 구간, 연속번호 포함 여부가 세트끼리 겹치지 않도록 다양성 확보
- hot_number_count와 cold_number_count 모두 0인 세트는 최소 1개
- 후보 외 번호 사용 절대 금지
- 각 세트 reason은 40자 이내 한 줄 (해당 세트 피처와 context 값만 근거)
출력은 반드시 아래 JSON 스키마로만:
{
"picks": [
{"numbers":[...], "risk_tag":"안정"|"균형"|"공격", "reason":"..."}
],
"narrative": {
"headline": "...",
"summary_3lines": ["...","...","..."],
"hot_cold_comment": "...",
"warnings": "..." // 없으면 빈 문자열
},
"confidence": 0-100
}
```
### 5.4 파이프라인 의사코드
```python
async def curate_weekly(draw_no: int) -> dict:
candidates = await service.lotto_candidates(n=20)
context = await service.lotto_context()
prompt = build_prompt(candidates, context, draw_no)
result, usage = await call_claude(prompt, model=LOTTO_CURATOR_MODEL)
parsed = validate(result) # 실패 시 1회 재시도
if parsed is None:
raise CuratorError("schema validation failed after retry")
await service.lotto_save_briefing({
"draw_no": draw_no,
"picks": parsed.picks,
"narrative": parsed.narrative,
"confidence": parsed.confidence,
"model": LOTTO_CURATOR_MODEL,
"tokens_input": usage.input,
"tokens_output": usage.output,
"cache_read": usage.cache_read,
"cache_write": usage.cache_write,
"latency_ms": usage.latency_ms,
"source": "auto" | "manual",
})
return {"ok": True, "draw_no": draw_no, ...}
```
### 5.5 검증 로직 (`schema.py`)
- pydantic 모델로 형식 검증
- 번호 제약: 각 세트 정확히 6개 · 중복 없음 · 1~45 범위
- 세트 수: 정확히 5
- 번호가 **candidates 내에 존재하는 조합인지** 대조 (환각 차단)
- risk_tag 분포가 규칙에서 ±1 이상 벗어나면 경고 로그(차단은 안 함)
- 실패 시 errors 리스트 담아 1회 재시도(프롬프트에 에러 피드백 포함)
### 5.6 스케줄러
`scheduler.py`에 추가:
```python
scheduler.add_job(_run_lotto_curate, "cron", day_of_week="mon", hour=7, minute=0, id="lotto_curate")
```
### 5.7 상태 표시
agent-office 메인 UI에 lotto 에이전트 카드가 추가되어 `idle` / `working` / `error` 상태 실시간 표시(기존 BaseAgent 패턴).
---
## 6. Frontend (web-ui)
### 6.1 새 탭 구조
```
Lotto
├─ 🗓 이번 주 브리핑 (기본)
├─ 📊 분석·통계
└─ 💰 구매·성과
```
`Functions.jsx` 460줄 → 탭 라우터 ~80줄로 축소. 각 탭은 `pages/lotto/tabs/BriefingTab.jsx`, `AnalysisTab.jsx`, `PurchaseTab.jsx`.
### 6.2 신규 컴포넌트 (`components/briefing/`)
- **BriefingHeader.jsx** — 회차 번호, 생성 시각, 신뢰도 바, 재생성 버튼, **사용 토큰 칩**(`42K in · 1.2K out · $0.18`)
- **BriefingSummary.jsx** — 3줄 요약 + 핫/콜드 블록 + 주의사항
- **PickSetCard.jsx** — 6볼 + risk 뱃지(🟢안정/🟡균형/🔴공격) + reason + "구매 기록" CTA
- **BriefingEmpty.jsx** — 브리핑 없을 때 placeholder + "지금 생성" 버튼
- **CuratorUsageFooter.jsx** — 페이지 하단 mini 카드. 최근 30일 호출 수·토큰·추정 비용·캐시 히트율
### 6.3 훅
- **useBriefing.js**
- `GET /api/lotto/briefing/latest`
- `regenerate()`: `POST /api/agent-office/command {agent:"lotto", action:"curate_now"}` → 3초 간격 최대 40회(=2분) 폴링으로 신규 briefing 확인
- 로딩/에러 상태 분리, 월요일 07:00 이후인데 브리핑 없으면 빈 상태 CTA
- **useCuratorUsage.js** — `GET /api/lotto/curator/usage?days=30`
### 6.4 기존 컴포넌트 처리
| 컴포넌트 | 조치 |
|---------|------|
| `FrequencyChart`, `MetricBlock`, `PersonalAnalysisPanel`, `ReportPanel` | 분석 탭으로 이동 |
| `PurchasePanel`, `PerformanceBanner` | 구매 탭으로 이동 |
| `CombinedRecommendPanel`, `ConfidenceRing` | 제거 후보 — 정리 패스에서 실제 참조 없으면 삭제 |
### 6.5 토큰·비용 노출 정책
- **브리핑 카드 헤더**: 이번 브리핑 1건의 in/out 토큰 + 추정 비용 (Sonnet 4.5 단가 기준 계산 — 상수로 프론트에 보유, `$3/$15 per 1M tokens`)
- **페이지 하단 푸터**: 최근 30일 누적 — 호출 수, 총 토큰, 추정 비용, 캐시 히트율
- **Agent Office 사이드**: 기존 `GET /api/agent-office/agents/lotto/token-usage` 자동 상속
### 6.6 모바일
브리핑 탭 세로 스택 기본. PickSetCard는 한 행 1카드 + 6볼 flex-wrap. 헤더 토큰 칩은 768px 이하에서 축약 표시(`$0.18`만).
---
## 7. 환경변수
| 변수 | 기본값 | 위치 |
|------|--------|------|
| `ANTHROPIC_API_KEY` | (없음) | agent-office (이미 존재) |
| `LOTTO_CURATOR_MODEL` | `claude-sonnet-4-5` | agent-office |
| `LOTTO_BACKEND_URL` | `http://lotto-backend:8000` | agent-office (service_proxy) |
---
## 8. 에러·폴백
| 상황 | 처리 |
|------|------|
| lotto-backend 후보 API 실패 | 에이전트 상태 `error` + 로그 + 슬랙/알림 없음(주 1회라 로그 충분) |
| Claude 호출 실패 | 1회 재시도 후 실패 시 error 저장, 기존 최신 브리핑 유지 |
| JSON 스키마 검증 실패 | 피드백 포함 1회 재시도 → 실패 시 error |
| 월요일 생성 자체가 누락 | 사용자가 웹에서 수동 재생성 버튼으로 보완 가능 |
---
## 9. 구현 순서
1. **Backend**: curator 엔드포인트 + briefing CRUD + 라우터 분리
2. **Agent-office**: lotto 에이전트 + curator pipeline + 월요일 스케줄러
3. **Frontend**: BriefingTab + 컴포넌트 + 훅 + 탭 재배치
4. **미사용 정리 패스**: 아래 "10. 정리 대상" 후보를 실제 참조 grep → 제거
---
## 10. 정리 대상 (최종 패스에서 검증 후 제거)
### Frontend
- `components/CombinedRecommendPanel.jsx`
- `components/ConfidenceRing.jsx`
- `Functions.jsx` 내 인라인 레이아웃 로직 (탭 분리 후 잔재)
### Backend
- `strategy_evolver.py` 중 실제 사용되지 않는 EMA 서브 함수
- 주간 리포트 관련 `weekly_reports` 테이블 — 브리핑이 대체하므로 드롭 후보
- `best_picks` 교체 로직 중 큐레이터 전환 후 사용 안 되는 경로
### DB 드롭 후보
- `weekly_reports` (브리핑이 대체)
- `simulation_candidates` (best_picks만 있으면 충분한지 사용처 grep 후 결정)
정리 패스는 **실제 import/참조 grep → 없으면 제거 → 테스트 → 커밋** 순서로 별도 커밋 분리.
---
## 11. 성공 기준
- 월요일 07:00 브리핑이 자동 생성되고, 웹 페이지 진입 1초 안에 5세트 + 3줄 요약이 보인다.
- 큐레이터는 candidates 내 세트만 선택한다(환각 0건).
- 브리핑 카드에 이번 건 토큰/비용, 페이지 하단에 30일 누적 사용량이 표시된다.
- 기존 난잡한 패널이 분석/구매 탭으로 정돈되고 브리핑 탭이 기본 진입점이다.
- 미사용 테이블·컴포넌트가 최종 정리 패스에서 제거된다.