18 Commits

Author SHA1 Message Date
f509339cbb fix(lotto-signals): draw_no 모든 source에 전달 (drift baseline 회차 가드 활성화)
light/sim source에서도 current_draw_no를 항상 fetch해 drift/confidence
메트릭의 회차 단위 중복 push 가드가 올바르게 동작하도록 수정.
lotto_latest_draw() 헬퍼를 service_proxy에 추가하고 run_signal_check에서
source에 무관하게 최신 회차를 먼저 조회; deep_check는 curate_weekly
반환값을 우선 사용.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 08:24:02 +09:00
e72a52a950 feat(lotto): /api/lotto/best에 5종 점수 array 노출 (agent-office sim_consensus 입력)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 08:21:48 +09:00
eecaefc26d docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:21:51 +09:00
b3c0683364 feat(lotto-signals): GET signals/baselines + POST signal-check endpoint
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:20:08 +09:00
17321d948e feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 발송 + baseline_mu/sigma 노출
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:13:29 +09:00
8552cbc184 feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷 2026-05-20 03:07:30 +09:00
b1c786e59d feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:04:14 +09:00
b885d02ac4 fix(tests): test_lotto_signal_runner DB_PATH 패치 (import order 안전)
db.DB_PATH = _TMP를 from app import db 직후에 주입해
타 테스트 파일이 app.db를 먼저 import해 DB_PATH가 동결된 경우에도
올바른 임시 경로를 사용하도록 수정.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 03:02:40 +09:00
b35fab777e feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)
Phase 2: on_command에 signal_check/light_check/sim_check/deep_check/daily_digest 액션 추가.
run_signal_check는 lotto_signals DB INSERT만, run_daily_digest는 24h 발화 카운트 반환.
텔레그램 발송은 Task 9 (Phase 3)에서 추가 예정.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:54:09 +09:00
43081bea0e feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle) 2026-05-20 02:51:28 +09:00
bebe5797e7 feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:48:12 +09:00
9e1001b935 feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD
agent-office DB에 lotto_signals, lotto_baselines 테이블 추가 및
insert/mark/query/upsert CRUD 헬퍼 함수 구현 (throttle, z-score, baseline 관리)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:43:27 +09:00
e5465ad136 fix(lotto-signals): pstdev→stdev (ddof=1 sample) + z=None contract 문서화
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:41:09 +09:00
21d46d95dd feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:38:33 +09:00
ac4a574ef2 test(lotto-signals): floating-point 임계치 보정 + import 정리 + decide_fire 분리
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:36:32 +09:00
c985d2c605 test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 02:32:10 +09:00
b4e873b5b0 docs(plan): LottoAgent 능동성 확장 구현 plan (12 tasks, Phase 1-3)
Why: spec (2026-05-20-lotto-active-agent-design.md)을 12개 atomic task
(TDD: 테스트→fail→구현→pass→commit)로 분해. 24h 가동 검증 task 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:26:49 +09:00
6c5e93f64e docs(spec): LottoAgent 능동성 확장 설계 (능동 시그널·일일 요약)
Why: 매주 1회 무조건 큐레이션만 있는 현 구조를 다중 트리거+적응형
시그널 모니터링으로 확장. 좋은 수치(z≥1.5) 일 때만 텔레그램 보고.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:07:39 +09:00
17 changed files with 3131 additions and 10 deletions

View File

@@ -553,6 +553,11 @@ docker compose up -d
- `LOTTO_BACKEND_URL`: 기본 `http://lotto:8000`
- `LOTTO_CURATOR_MODEL`: 기본 `claude-sonnet-4-5`
- `YOUTUBE_DATA_API_KEY`: YouTube Data API v3 키 (미설정 시 YouTube trending 수집 skip)
- `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8)
- `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5)
- `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5)
- `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간)
- `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통)
**YouTubeResearchAgent (`agents/youtube.py`)**
- `agent_id = "youtube"` — AGENT_REGISTRY에 등록
@@ -577,6 +582,10 @@ docker compose up -d
- ~~09:15 매일 — 청약 매칭 데일리 리포트~~ (Task 2026-04-28에서 폐기. realestate-lab의 push 트리거로 전환)
- 09:00 매일 — YouTube 트렌드 수집 (`youtube_research`) → music-lab `/api/music/market/ingest` push
- 매주 월요일 08:00 — YouTube 주간 리포트 텔레그램 발송 (`youtube_weekly_report`)
- 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가)
- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시)
- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가)
- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통)
**RealestateAgent (`agents/realestate.py`)**
- 진입점: `on_new_matches(matches: list[dict]) -> {sent, sent_ids, message_id}`
@@ -608,6 +617,9 @@ docker compose up -d
| GET | `/api/agent-office/conversation/stats` | 텔레그램 자연어 대화 토큰·캐시 통계 (`days` 필터) |
| POST | `/api/agent-office/youtube/research` | YouTube 트렌드 수집 수동 트리거 (body: `{countries: []}`) |
| GET | `/api/agent-office/youtube/research/status` | 마지막 수집 작업 상태 |
| GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) |
| GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 |
| POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) |
### personal (personal/)
- 개인 서비스 (포트폴리오 + 블로그 + 투두 통합)

View File

@@ -17,11 +17,136 @@ class LottoAgent(BaseAgent):
return await self._run(source="manual")
if action == "status":
return {"ok": True, "message": f"{self.state}: {self.state_detail}"}
if action in ("signal_check", "light_check", "sim_check", "deep_check"):
source = action.replace("_check", "") if action != "signal_check" else "light"
return await self.run_signal_check(source=source)
if action == "daily_digest":
return await self.run_daily_digest()
return {"ok": False, "message": f"unknown action: {action}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
pass
async def run_signal_check(self, source: str = "light") -> dict:
"""비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후).
Phase 3 (Task 9): urgent 시그널 텔레그램 발송 + throttle/daily-cap 추가.
"""
from ..curator.signal_runner import run_signal_check
from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT
from ..db import add_log
if self.state not in ("idle", "reporting"):
return {"ok": False, "message": f"busy ({self.state})"}
try:
curate_result = None
# 회차 단위 메트릭(drift/confidence) 가드를 위해 항상 최신 회차 가져옴
from ..service_proxy import lotto_latest_draw
current_draw_no = await lotto_latest_draw()
if source == "deep":
from ..curator.pipeline import curate_weekly
cw = await curate_weekly(source="signal_deep")
# curate_weekly returns {"ok", "draw_no", "confidence", "tokens", "payload"}
curate_result = {"confidence": cw.get("confidence")}
# deep_check 시 curate_weekly가 반환하는 draw_no를 우선 사용 (직접 수집)
if cw.get("draw_no"):
current_draw_no = cw.get("draw_no")
outcome = await run_signal_check(
source=source,
z_normal=LOTTO_Z_NORMAL,
z_urgent=LOTTO_Z_URGENT,
curate_result=curate_result,
current_draw_no=current_draw_no,
)
add_log(
self.agent_id,
f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
)
# --- Throttle + 텔레그램 urgent 발송 ---
from ..config import LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX
from ..db import (
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
if outcome["overall_fire"] == "urgent":
if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
add_log(
self.agent_id,
"urgent daily cap 도달 → normal로 강등 (digest 합류)",
level="warning",
)
else:
blocked = False
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
last = get_last_signal_notification(
metric=r["metric"], fire_level=r["fire_level"],
hours=LOTTO_THROTTLE_HOURS,
)
if last:
blocked = True
break
if not blocked:
from datetime import datetime, timezone
event = {
"fire_level": "urgent",
"triggered_at": datetime.now(timezone.utc).isoformat(),
"results": outcome["results"],
}
await send_urgent_signal(event)
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
mark_signal_notified(r["signal_id"])
add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)")
return {"ok": True, **outcome}
except Exception as e:
add_log(self.agent_id, f"signal_check 예외: {e}", level="error")
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_daily_digest(self) -> dict:
"""일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통."""
from ..db import (
get_recent_lotto_signals, get_signals_history, add_log,
get_baseline,
)
from ..notifiers.telegram_lotto import send_signal_summary
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
total_24h = get_signals_history(days=1)
evaluated = len(total_24h)
# weights_trend: drift_weights_cache의 prev/curr 차이
trend = {}
try:
cache = get_baseline("drift_weights_cache")
if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
prev_w = cache["window_values"][-2]
curr_w = cache["window_values"][-1]
trend = {
k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
for k in (set(prev_w) | set(curr_w))
}
except Exception as e:
add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning")
digest = {
"evaluated": evaluated,
"fired": len(sigs),
"signals": sigs,
"weights_trend": trend,
}
await send_signal_summary(digest)
add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}")
return {"ok": True, **digest}
async def _run(self, source: str) -> dict:
task_id = create_task(self.agent_id, "curate_weekly", {"source": source})
await self.transition("working", "후보 수집 및 AI 큐레이션 중...", task_id)

View File

@@ -29,3 +29,12 @@ CORS_ALLOW_ORIGINS = os.getenv(
# Lotto Curator
LOTTO_BACKEND_URL = os.getenv("LOTTO_BACKEND_URL", "http://lotto:8000")
LOTTO_CURATOR_MODEL = os.getenv("LOTTO_CURATOR_MODEL", "claude-sonnet-4-5")
# Lotto Active Signals
LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8"))
LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5"))
LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5"))
LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))

View File

@@ -0,0 +1,185 @@
"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from .. import db
from .. import service_proxy
from . import signals
logger = logging.getLogger("agent-office.lotto-signals")
# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교)
DRAW_SCOPED_METRICS = {"drift", "confidence"}
def _load_baseline(metric: str) -> signals.AdaptiveBaseline:
row = db.get_baseline(metric)
if row is None:
return signals.AdaptiveBaseline(window=[], window_max=8)
return signals.AdaptiveBaseline(
window=list(row["window_values"]),
window_max=8,
last_pushed_draw_no=row.get("last_pushed_draw_no"),
)
def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None:
db.upsert_baseline(
metric=metric,
window_values=bl.window,
mu=bl.mu,
sigma=bl.sigma,
last_pushed_draw_no=bl.last_pushed_draw_no,
)
def evaluate_metric_and_persist(
source: str,
metric: str,
value: float,
draw_no: Optional[int],
z_normal: float,
z_urgent: float,
push_to_window: bool,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신.
회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략.
"""
bl = _load_baseline(metric)
# 회차 가드
do_push = push_to_window
if metric in DRAW_SCOPED_METRICS and draw_no is not None:
if bl.last_pushed_draw_no == draw_no:
do_push = False
# 평가는 push 전 baseline 기준
z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent)
if do_push:
bl.push(value=value, draw_no=draw_no)
_save_baseline(metric, bl)
else:
# cold start에서도 baseline row를 만들어 두려면 upsert 필요
_save_baseline(metric, bl)
sid = db.insert_lotto_signal(
source=source,
metric=metric,
value=value,
baseline_mu=bl.mu if bl.size > 0 else None,
baseline_sigma=bl.sigma if bl.size >= 2 else None,
z_score=z,
fire_level=fire,
payload=payload,
)
return {
"signal_id": sid,
"metric": metric,
"value": value,
"baseline_mu": bl.mu if bl.size > 0 else None,
"baseline_sigma": bl.sigma if bl.size >= 2 else None,
"z_score": z,
"fire_level": fire,
"payload": payload or {},
}
# ---------- Service proxy thin wrappers (monkeypatch 대상) ----------
async def _fetch_best_picks() -> List[Dict[str, Any]]:
return await service_proxy.lotto_best()
async def _fetch_strategy_weights() -> Dict[str, float]:
return await service_proxy.lotto_strategy_weights()
# ---------- Orchestrator ----------
async def run_signal_check(
source: str,
z_normal: float = 1.5,
z_urgent: float = 2.5,
curate_result: Optional[Dict[str, Any]] = None,
current_draw_no: Optional[int] = None,
) -> Dict[str, Any]:
"""cron 진입점. source ∈ {'light', 'sim', 'deep'}.
light/sim: Sim Consensus + Strategy Drift 평가
deep: 위 2종 + Confidence (curate_result 필요)
"""
results: List[Dict[str, Any]] = []
# --- Sim Consensus ---
try:
best = await _fetch_best_picks()
v = signals.sim_consensus_score(best)
results.append(
evaluate_metric_and_persist(
source=source, metric="sim_signal",
value=v, draw_no=None,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"top_count": min(len(best), 10)},
)
)
except Exception as e:
logger.warning(f"sim_consensus 평가 실패: {e}")
# --- Strategy Drift (회차 단위) ---
try:
w_curr = await _fetch_strategy_weights()
# weights 캐시: lotto_baselines의 별도 metric 'drift_weights_cache'에 prev/curr 2개 보관
prev_payload_row = db.get_baseline("drift_weights_cache")
w_prev = prev_payload_row["window_values"] if prev_payload_row else None
if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict):
prev_dict = w_prev[-1]
drift_value = signals.strategy_drift_score(prev_dict, w_curr)
results.append(
evaluate_metric_and_persist(
source=source, metric="drift",
value=drift_value, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"weights_now": w_curr, "weights_prev": prev_dict},
)
)
# weights 캐시 갱신 (최대 2개 FIFO)
cache_window = (w_prev or []) + [w_curr]
if len(cache_window) > 2:
cache_window = cache_window[-2:]
db.upsert_baseline(
metric="drift_weights_cache",
window_values=cache_window,
mu=0.0, sigma=0.0,
last_pushed_draw_no=current_draw_no,
)
except Exception as e:
logger.warning(f"strategy_drift 평가 실패: {e}")
# --- Confidence (deep_check + curate_result 필수) ---
if source == "deep" and curate_result is not None:
try:
cv = signals.confidence_score(curate_result)
if cv is not None:
results.append(
evaluate_metric_and_persist(
source=source, metric="confidence",
value=cv, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"draw_no": current_draw_no},
)
)
except Exception as e:
logger.warning(f"confidence 평가 실패: {e}")
overall = signals.decide_overall_fire(
[{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results]
)
return {"overall_fire": overall, "results": results}

View File

@@ -0,0 +1,150 @@
# agent-office/app/curator/signals.py
"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수).
DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list.
signal_runner.py에서 DB 연동 + cron 진입점 담당.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from statistics import mean, stdev
from typing import Any, Dict, List, Optional, Tuple
# ---------- Metric: Sim Consensus ----------
def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
"""20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수."""
if not picks:
return []
n_metrics = len(picks[0]["scores"])
columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)]
norms_per_col = []
for col in columns:
lo, hi = min(col), max(col)
rng = hi - lo
if rng == 0:
# 모두 0이면 0.0(기하평균 페널티), 모두 동일한 양수면 0.5(타이 처리)
fallback = 0.0 if lo == 0 else 0.5
norms_per_col.append([fallback] * len(col))
else:
norms_per_col.append([(v - lo) / rng for v in col])
return [
[norms_per_col[k][i] for k in range(n_metrics)]
for i in range(len(picks))
]
def _geomean(values: List[float]) -> float:
"""기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티)."""
if not values:
return 0.0
if any(v <= 0 for v in values):
return 0.0
log_sum = sum(math.log(v) for v in values)
return math.exp(log_sum / len(values))
def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
"""top-10 후보의 기하평균 consensus 평균."""
if not best_picks:
return 0.0
normalized = _normalize_columns(best_picks)
consensus = [_geomean(scores) for scores in normalized]
consensus.sort(reverse=True)
top = consensus[:10] if len(consensus) >= 10 else consensus
return mean(top) if top else 0.0
# ---------- Metric: Strategy Drift ----------
def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
"""가중치 변화 절댓값 합. 신규/소멸 전략도 가산."""
keys = set(prev) | set(curr)
return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)
# ---------- Metric: Confidence ----------
def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
"""큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None."""
if "confidence" not in curate_result:
return None
v = float(curate_result["confidence"])
return max(0.0, min(1.0, v))
# ---------- Adaptive Baseline ----------
@dataclass
class AdaptiveBaseline:
window: List[float] = field(default_factory=list)
window_max: int = 8
last_pushed_draw_no: Optional[int] = None
@property
def size(self) -> int:
return len(self.window)
@property
def mu(self) -> float:
return mean(self.window) if self.window else 0.0
@property
def sigma(self) -> float:
return stdev(self.window) if len(self.window) >= 2 else 0.0
def push(self, value: float, draw_no: Optional[int] = None) -> None:
"""FIFO push. window_max 초과 시 가장 오래된 값 제거."""
self.window.append(float(value))
if len(self.window) > self.window_max:
self.window = self.window[-self.window_max:]
if draw_no is not None:
self.last_pushed_draw_no = draw_no
def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
"""z-score 계산 + fire_level 판정.
Returns:
(z_score, fire_level) — z_score는 cold start/warmup이면 None.
fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'}
NOTE: z_score is None when sigma==0 (degenerate window) or warmup.
Callers must treat None as "signal present but unquantified" — do not
compare None with thresholds directly.
"""
if self.size < 4:
return None, "warmup"
z_normal_eff = 2.0 if self.size < self.window_max else z_normal
z_urgent_eff = z_urgent
if self.sigma == 0:
return (None, "urgent") if value > self.mu else (None, "noop")
z = (value - self.mu) / self.sigma
if z >= z_urgent_eff:
return z, "urgent"
if z >= z_normal_eff:
return z, "normal"
return z, "noop"
# ---------- Combined fire decision ----------
def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
"""3종 시그널을 종합해 전체 fire_level 결정.
Args:
signal_results: [{"metric": str, "z": float|None, "fire": str}, ...]
Returns:
'noop' | 'normal' | 'urgent'
"""
fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")]
if any(s["fire"] == "urgent" for s in fires):
return "urgent"
if len(fires) >= 2:
return "urgent"
if len(fires) == 1:
return "normal"
return "noop"

View File

@@ -98,6 +98,39 @@ def init_db() -> None:
completed_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
source TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
baseline_mu REAL,
baseline_sigma REAL,
z_score REAL,
fire_level TEXT NOT NULL,
notified_at TEXT,
payload TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_triggered
ON lotto_signals(triggered_at DESC)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_fire
ON lotto_signals(fire_level, notified_at)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_baselines (
metric TEXT PRIMARY KEY,
window_values TEXT NOT NULL DEFAULT '[]',
mu REAL NOT NULL DEFAULT 0.0,
sigma REAL NOT NULL DEFAULT 0.0,
last_pushed_draw_no INTEGER,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# Seed default agent configs
for agent_id, name in [
("stock", "주식 트레이더"),
@@ -556,3 +589,153 @@ def get_latest_youtube_research_job() -> Optional[Dict[str, Any]]:
"started_at": row["started_at"],
"completed_at": row["completed_at"],
}
# --- lotto_signals / lotto_baselines CRUD ---
def insert_lotto_signal(
source: str,
metric: str,
value: float,
baseline_mu: Optional[float],
baseline_sigma: Optional[float],
z_score: Optional[float],
fire_level: str,
payload: Optional[Dict[str, Any]] = None,
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO lotto_signals
(source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
source, metric, value,
baseline_mu, baseline_sigma, z_score, fire_level,
json.dumps(payload or {}, ensure_ascii=False),
),
)
return cur.lastrowid
def mark_signal_notified(signal_id: int) -> None:
with _conn() as conn:
conn.execute(
"UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
(signal_id,),
)
def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
"""지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent."""
levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
placeholders = ",".join("?" * len(levels))
with _conn() as conn:
rows = conn.execute(
f"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level IN ({placeholders})
ORDER BY triggered_at DESC
""",
(f"-{int(hours)} hours", *levels),
).fetchall()
return [dict(r) for r in rows]
def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
"""차트/이력 페이지용 — 모든 fire_level 포함."""
with _conn() as conn:
rows = conn.execute(
"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
ORDER BY triggered_at DESC
""",
(f"-{int(days)} days",),
).fetchall()
return [dict(r) for r in rows]
def get_recent_urgent_count(hours: int = 24) -> int:
with _conn() as conn:
row = conn.execute(
"""
SELECT COUNT(*) AS c FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level = 'urgent'
AND notified_at IS NOT NULL
""",
(f"-{int(hours)} hours",),
).fetchone()
return int(row["c"]) if row else 0
def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
"""같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용."""
with _conn() as conn:
row = conn.execute(
"""
SELECT notified_at FROM lotto_signals
WHERE metric = ?
AND fire_level = ?
AND notified_at IS NOT NULL
AND notified_at >= datetime('now', ?)
ORDER BY notified_at DESC LIMIT 1
""",
(metric, fire_level, f"-{int(hours)} hours"),
).fetchone()
return row["notified_at"] if row else None
def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM lotto_baselines WHERE metric = ?",
(metric,),
).fetchone()
if not row:
return None
d = dict(row)
d["window_values"] = json.loads(d["window_values"])
return d
def upsert_baseline(
metric: str,
window_values: List[float],
mu: float,
sigma: float,
last_pushed_draw_no: Optional[int],
) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO lotto_baselines
(metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
ON CONFLICT(metric) DO UPDATE SET
window_values = excluded.window_values,
mu = excluded.mu,
sigma = excluded.sigma,
last_pushed_draw_no = excluded.last_pushed_draw_no,
updated_at = excluded.updated_at
""",
(
metric,
json.dumps(window_values),
mu, sigma, last_pushed_draw_no,
),
)
def get_all_baselines() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
out = []
for r in rows:
d = dict(r)
d["window_values"] = json.loads(d["window_values"])
out.append(d)
return out

View File

@@ -227,3 +227,30 @@ def youtube_research_status():
if not job:
return {"status": "never_run"}
return job
# --- Lotto Signal Endpoints ---
@app.get("/api/agent-office/lotto/signals")
async def list_lotto_signals(days: int = 7):
"""시그널 이력 (모든 fire_level)."""
from .db import get_signals_history
return {"items": get_signals_history(days=days)}
@app.get("/api/agent-office/lotto/baselines")
async def list_lotto_baselines():
"""현재 baseline μ/σ + window 상태."""
from .db import get_all_baselines
return {"items": get_all_baselines()}
@app.post("/api/agent-office/lotto/signal-check")
async def trigger_signal_check(source: str = "light"):
"""수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}."""
if source not in ("light", "sim", "deep"):
raise HTTPException(status_code=400, detail="source must be light/sim/deep")
agent = AGENT_REGISTRY.get("lotto")
if not agent:
raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source)

View File

@@ -59,3 +59,103 @@ async def send_prize_alert(event: Dict[str, Any]) -> None:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] prize alert send failed: {e}")
# ---------- 능동 시그널 알림 (urgent + digest) ----------
_METRIC_LABEL = {
"sim_signal": "Sim Consensus",
"drift": "Strategy Drift",
"confidence": "Confidence",
}
def _format_urgent_signal(event: Dict[str, Any]) -> str:
"""긴급 시그널 텔레그램 메시지 포맷."""
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
results = event.get("results", [])
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
lines = [
"🚨 로또 능동 신호",
"",
f"[{triggered}]",
f"강한 시그널 {len(fired)}종 발화:",
]
for r in fired:
label = _METRIC_LABEL.get(r["metric"], r["metric"])
v = r.get("value")
mu = r.get("baseline_mu")
sigma = r.get("baseline_sigma")
z = r.get("z_score")
v_text = f"{v:.2f}" if v is not None else "N/A"
if mu is not None and sigma is not None and z is not None:
lines.append(f"{label} {v_text} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
else:
lines.append(f"{label} {v_text}")
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
for r in fired:
if r["metric"] == "drift":
wn = (r.get("payload") or {}).get("weights_now") or {}
wp = (r.get("payload") or {}).get("weights_prev") or {}
if wn and wp:
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
lines.append("")
lines.append(f"요인: {detail}")
break
lines.append("")
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
return "\n".join(lines)
def _format_signal_digest(digest: Dict[str, Any]) -> str:
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
fired = int(digest.get("fired", 0))
if fired == 0:
return ""
signals_list = digest.get("signals", [])
evaluated = digest.get("evaluated", 0)
lines = [
"📊 로또 일일 요약 (지난 24h)",
"",
f"평가 {evaluated}회 / 발화 {fired}",
]
for s in signals_list:
label = _METRIC_LABEL.get(s["metric"], s["metric"])
z = s.get("z_score")
when = (s.get("triggered_at") or "")[11:16] # HH:MM
z_text = f"z={z:.1f}" if z is not None else "z=-"
lines.append(f"{label:14s} {s['fire_level']:6s} {z_text} ({when})")
weights_trend = digest.get("weights_trend") or {}
if weights_trend:
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
arrow = "" if delta > 0.01 else ("" if delta < -0.01 else "")
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
return "\n".join(lines)
async def send_urgent_signal(event: Dict[str, Any]) -> None:
text = _format_urgent_signal(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
async def send_signal_summary(digest: Dict[str, Any]) -> None:
text = _format_signal_digest(digest)
if not text:
return # 발화 0건이면 발송 skip
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")

View File

@@ -36,6 +36,26 @@ async def _run_lotto_schedule():
if agent:
await agent.on_schedule()
async def _run_lotto_light_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="light")
async def _run_lotto_sim_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="sim")
async def _run_lotto_deep_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="deep")
async def _run_lotto_daily_digest():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_daily_digest()
async def _run_youtube_research():
agent = AGENT_REGISTRY.get("youtube")
if agent:
@@ -73,6 +93,10 @@ def init_scheduler():
# 09:00 cron 스태거링 — Celeron 2C/2.0GHz에서 동시 실행 시 CPU 폭주 (CHECK_POINT FU-A)
scheduler.add_job(_run_insta_trends_collect, "cron", hour=9, minute=0, id="insta_trends_collect")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=5, id="lotto_curate")
scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check")
scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=10, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")
scheduler.add_job(_poll_pipelines, "interval", seconds=30, id="pipeline_poll")

View File

@@ -338,3 +338,42 @@ async def lookup_pipeline_by_msg(msg_id: int) -> Optional[dict]:
if resp.status_code == 200:
return resp.json()
return None
async def lotto_best() -> List[Dict[str, Any]]:
"""GET /api/lotto/best — best_picks 20개 (numbers + scores 5종)."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best")
resp.raise_for_status()
data = resp.json()
items = data.get("items") if isinstance(data, dict) else data
return items or []
async def lotto_strategy_weights() -> Dict[str, float]:
"""GET /api/lotto/strategy/weights — 전략별 가중치 dict."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights")
resp.raise_for_status()
data = resp.json()
weights = data.get("weights") if isinstance(data, dict) else data
if isinstance(weights, list):
return {item["strategy"]: float(item["weight"]) for item in weights}
return {k: float(v) for k, v in (weights or {}).items()}
async def lotto_latest_draw() -> Optional[int]:
"""GET /api/lotto/latest — 최신 회차 번호만 반환."""
from .config import LOTTO_BACKEND_URL
try:
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/latest")
resp.raise_for_status()
data = resp.json()
# /api/lotto/latest 응답 키: {"drawNo": N, ...}
# 하위 호환을 위해 drawNo, draw_no, drwNo, draw 순서로 시도
for key in ("drawNo", "draw_no", "drwNo", "draw"):
if isinstance(data, dict) and data.get(key):
return int(data[key])
return None
except Exception:
return None

View File

@@ -0,0 +1,116 @@
import gc
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from app.curator import signal_runner
from app import db
db.DB_PATH = _TMP # patch frozen module-level DB_PATH (import order safety)
@pytest.fixture(autouse=True)
def fresh_db():
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
db.init_db()
yield
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass # Windows: WAL-mode file locked; DB is ephemeral anyway
def test_evaluate_and_persist_cold_start():
"""첫 호출은 warmup으로 기록되고 baseline에 값이 들어간다."""
result = signal_runner.evaluate_metric_and_persist(
source="light",
metric="sim_signal",
value=1.5,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] == "warmup"
assert result["z_score"] is None
bl = db.get_baseline("sim_signal")
assert bl is not None
assert bl["window_values"] == [1.5]
def test_evaluate_after_window_filled_normal_fire():
"""8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal."""
for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]:
signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=v,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
result = signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=1.12,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] in ("normal", "urgent")
assert result["z_score"] is not None and result["z_score"] >= 1.5
def test_evaluate_drift_skips_same_draw_push():
"""drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X."""
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.05, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_before = db.get_baseline("drift")
assert bl_before["window_values"] == [0.05]
assert bl_before["last_pushed_draw_no"] == 1100
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.08, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_after = db.get_baseline("drift")
assert bl_after["window_values"] == [0.05]
@pytest.mark.asyncio
async def test_run_signal_check_aggregates_three_metrics(monkeypatch):
"""run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환."""
async def fake_lotto_best():
return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20
async def fake_lotto_strategy_weights():
return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3}
monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best)
monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights)
out = await signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101)
assert "overall_fire" in out
assert "results" in out
assert any(r["metric"] == "sim_signal" for r in out["results"])
# light_check는 confidence 평가 안 함
assert not any(r["metric"] == "confidence" for r in out["results"])

View File

@@ -0,0 +1,130 @@
# agent-office/tests/test_lotto_signals.py
import pytest
from app.curator import signals
def test_sim_consensus_top10_geomean():
"""top-10 consensus 평균이 기하평균 기반인지."""
best_picks = [
{"scores": [10, 10, 10, 10, 10]}, # high & uniform
{"scores": [9, 9, 9, 9, 9]},
{"scores": [8, 8, 8, 8, 8]},
{"scores": [7, 7, 7, 7, 7]},
{"scores": [6, 6, 6, 6, 6]},
{"scores": [5, 5, 5, 5, 5]},
{"scores": [4, 4, 4, 4, 4]},
{"scores": [3, 3, 3, 3, 3]},
{"scores": [2, 2, 2, 2, 2]},
{"scores": [1, 1, 1, 1, 1]}, # top 10
{"scores": [0, 0, 0, 0, 0]}, # bottom 10
] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10
result = signals.sim_consensus_score(best_picks)
assert 0.0 <= result <= 1.0
assert result > 0.4
def test_sim_consensus_geomean_penalizes_imbalance():
"""5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다."""
balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20
imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20
s_balanced = signals.sim_consensus_score(balanced)
s_imbalanced = signals.sim_consensus_score(imbalanced)
assert s_imbalanced < s_balanced
def test_strategy_drift_score():
"""drift = 전략별 가중치 변화 절댓값 합."""
w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45}
w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40}
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.20) < 1e-9
def test_strategy_drift_new_strategy_appears():
"""이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산."""
w_prev = {"gap_focus": 0.5, "hot_focus": 0.5}
w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2}
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.4) < 1e-9
def test_confidence_score_passthrough():
"""confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인)."""
assert signals.confidence_score({"confidence": 0.85}) == 0.85
assert signals.confidence_score({"confidence": 1.2}) == 1.0
assert signals.confidence_score({"confidence": -0.1}) == 0.0
assert signals.confidence_score({}) is None
def test_adaptive_baseline_cold_start():
"""window 크기 < 4 → warmup, z=None."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8)
z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5)
assert fire == "warmup"
assert z is None
def test_adaptive_baseline_preparing():
"""window 4~7 → 보수적 임계치 z=2.0."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8)
z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5)
assert fire in ("normal", "urgent")
def test_adaptive_baseline_normal_window_full():
"""window 8 풀, value가 평균보다 1.5σ 이상이면 normal."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
z, fire = bl.evaluate(value=1.12, z_normal=1.5, z_urgent=2.5)
assert fire == "normal"
assert z is not None and z >= 1.5
def test_adaptive_baseline_urgent():
"""z >= 2.5 → urgent."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5)
assert fire == "urgent"
def test_adaptive_baseline_push_updates_window():
"""push 시 FIFO 동작."""
bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8)
bl.push(9.0)
assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0]
def test_decide_fire_level_two_normals_escalate():
sigs = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 1.7, "fire": "normal"},
{"metric": "conf", "z": 0.5, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs) == "urgent"
def test_decide_fire_level_single_normal():
sigs = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 0.3, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs) == "normal"
def test_decide_fire_level_single_urgent():
sigs = [
{"metric": "sim", "z": 3.0, "fire": "urgent"},
{"metric": "drift", "z": 0.2, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs) == "urgent"
def test_decide_fire_level_all_noop():
sigs = [{"metric": "sim", "z": 0.5, "fire": "noop"}]
assert signals.decide_overall_fire(sigs) == "noop"

View File

@@ -0,0 +1,49 @@
from app.notifiers.telegram_lotto import (
_format_urgent_signal,
_format_signal_digest,
)
def test_urgent_signal_format_basic():
event = {
"fire_level": "urgent",
"triggered_at": "2026-05-20T07:18:00.000Z",
"results": [
{"metric": "sim_signal", "value": 1.84, "z_score": 3.9,
"baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {},
"fire_level": "urgent"},
{"metric": "drift", "value": 0.18, "z_score": 3.0,
"baseline_mu": 0.06, "baseline_sigma": 0.04, "fire_level": "normal",
"payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5},
"weights_prev": {"gap_focus": 0.3, "hot_focus": 0.7}}},
],
}
text = _format_urgent_signal(event)
assert "🚨" in text
assert "Sim Consensus" in text
assert "z=3.9" in text
assert "Strategy Drift" in text
def test_signal_digest_format_with_signals():
digest = {
"evaluated": 6,
"fired": 2,
"signals": [
{"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7,
"triggered_at": "2026-05-20T16:18:00Z", "payload": {}},
{"metric": "confidence", "fire_level": "normal", "z_score": 1.6,
"triggered_at": "2026-05-20T09:05:00Z", "payload": {}},
],
"weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08},
}
text = _format_signal_digest(digest)
assert "📊" in text
assert "지난 24h" in text
assert "z=1.7" in text
def test_signal_digest_empty_returns_empty_string():
"""발화 0건이면 빈 문자열 → 발송 자체 skip 가능."""
text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}})
assert text == ""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,301 @@
# LottoAgent 능동성 확장 설계
- **상태**: Draft (사용자 리뷰 대기)
- **작성일**: 2026-05-20
- **대상 컨테이너**: agent-office
- **영향 외부 도메인**: lotto-lab (read-only API 소비만)
---
## 1. 문제 정의
현재 LottoAgent는 매주 월요일 09:05 cron으로 무조건 큐레이션을 1회 실행하고 헤드라인을 텔레그램으로 푸시한다. "결과가 좋지 않은 회차"도 동일하게 발화되며, **정량적 시그널이 평소보다 강할 때 별도로 알리는 능동성**이 없다.
사용자 의도: 통계·시뮬레이션·전략 가중치를 에이전트가 스스로 모니터링하다가 "좋은 수치"가 나오면 능동적으로 보고하는 패턴.
## 2. 의사결정 요약
| 결정 사항 | 선택 | 비고 |
|---|---|---|
| 분석 주기 | 다중 트리거 혼합 | 매일 정기 + 시뮬레이션 후 + 회차 후 |
| 시그널 종류 | 3종 — Sim Consensus / Strategy Drift / Confidence | Hot/Cold 변화는 제외 (노이즈) |
| 알림 정책 | 일일 요약 + 긴급 즉시 | 2개 동시 발화 OR 단일 z≥2.5 → 긴급 |
| 임계치 전략 | 적응형 (최근 8회 μ + σ) | warmup·보수적 단계 포함 |
| 시뮬 강도 조절 (Layer B) | v1 미포함 | 운영 검증 후 v2에서 도입 검토 |
## 3. 아키텍처
### 3.1 컴포넌트 다이어그램
```
┌─────────────────────────────────────────────────────────────┐
│ agent-office │
│ │
│ cron (scheduler.py) │
│ ├─ lotto_light_check 매일 09:15 │
│ ├─ lotto_sim_check 4시간마다 :15 │
│ ├─ lotto_deep_check 일/수 21:15 │
│ ├─ lotto_daily_digest 매일 09:25 │
│ └─ lotto_curate 월요일 09:05 (기존 유지) │
│ ↓ │
│ curator/signals.py (신규) │
│ ├─ evaluate_sim_consensus() ← lotto_best API │
│ ├─ evaluate_strategy_drift() ← strategy/weights API │
│ ├─ evaluate_confidence() ← deep_check 시 큐레이션 결과 │
│ └─ adaptive_baseline() ← μ, σ 갱신 │
│ ↓ │
│ agent_office.db │
│ ├─ lotto_signals (이벤트 이력) │
│ └─ lotto_baselines (롤링 8회 윈도우) │
│ ↓ │
│ notifiers/telegram_lotto.py │
│ ├─ send_urgent_signal() ← 긴급 │
│ └─ send_signal_summary() ← 일일 요약 │
└─────────────────────────────────────────────────────────────┘
↑ (HTTP GET, 기존 lotto-lab API 재사용, 변경 없음)
lotto:8000
├─ /api/lotto/best
├─ /api/lotto/strategy/weights
└─ /api/lotto/curator/*
```
### 3.2 책임 경계
- **lotto-lab**: 변경 없음. 기존 GET API만 소비.
- **agent-office**: 능동 모니터링 layer 전부 담당. DB도 `agent_office.db` 안에 분리해서 lotto.db와 결합 없음.
- **프론트엔드**: Phase 4 별도 (web-ui repo). 본 spec 범위 밖.
## 4. 시그널 평가 로직
### 4.1 Sim Consensus Score
```
best_picks 20개의 점수 5종 (s1..s5) 사용
normalize(s_k) = (s_k - min_k) / (max_k - min_k) per metric across 20 picks
consensus_i = geomean( normalize(s1_i), ..., normalize(s5_i) )
sim_signal = mean( sorted(consensus_i, desc)[:10] )
```
- 기하평균: 5종 점수가 **동시에** 높을 때만 강한 시그널. 단일 폭주는 감쇠.
- top-10 평균: 전체 20개 분포에서 강한 후보군의 농도 측정.
### 4.2 Strategy Drift Score
```
drift_t = Σ | w_strategy_t - w_strategy_{t-1} | for each strategy in strategy_weights
```
- 회차 단위로 비교. 한 전략이 EMA로 큰 폭 이동했을 때 누적값이 큼.
- 시스템이 "지난 회차에서 의미 있게 학습한" 시그널.
### 4.3 Confidence Score
`curator.pipeline.curate_weekly()` 반환의 `validated.confidence` (0~1) 그대로.
- light_check / sim_check: N/A (LLM 호출 없음)
- deep_check: 직전 큐레이션 confidence를 baseline 윈도우에 push
### 4.4 Adaptive Baseline
```
lotto_baselines.window_values = [v_{t-7}, v_{t-6}, ..., v_t] (FIFO 8)
mu = mean(window_values)
sigma = stddev(window_values, ddof=1)
z_now = (v_now - mu) / sigma
```
- **Cold start**: window 크기 < 4 → fire_level='warmup', 발화 X
- **준비 단계**: window 4~7 → 임계치 z=2.0 (false positive 줄임)
- **정상 운영**: window 8 풀 → z_normal=1.5, z_urgent=2.5
### 4.5 Trigger × Metric 매트릭스
| Trigger | Sim Consensus | Strategy Drift | Confidence |
|---|---|---|---|
| `light_check` (매일 09:15) | ✓ 평가 | ✓ 회차 변경 시만 | — |
| `sim_check` (4h마다) | ✓ 평가 | ✓ 회차 변경 시만 | — |
| `deep_check` (일/수 21:15) | ✓ 평가 | ✓ 회차 변경 시만 | ✓ (큐레이션 후) |
| `lotto_curate` (월 09:05) | — | — | ✓ 큐레이션 결과 직접 push |
**회차 변경 가드**: Strategy Drift / Confidence는 **회차 단위 메트릭**. baseline 윈도우에 push할 때 `last_pushed_draw_no`를 비교, 동일 회차면 skip. 같은 회차 내에서 값 비교는 가능하지만 baseline 갱신은 회차당 1회만.
```
if metric in ('drift', 'confidence'):
if current_draw_no == baselines[metric].last_pushed_draw_no:
# baseline 윈도우는 그대로, z-score만 현재값으로 비교
skip_window_update = True
```
Sim Consensus는 회차 무관 (4시간마다 시뮬 자체가 갱신) → 매 평가 시 window push.
### 4.6 Fire 결정
```
fires = [m for m in [sim, drift, conf] if m.z >= LOTTO_Z_NORMAL]
if len(fires) >= 2 or any(m.z >= LOTTO_Z_URGENT for m in fires):
fire_level = 'urgent'
elif len(fires) == 1:
fire_level = 'normal'
else:
fire_level = 'noop'
```
## 5. 알림 흐름
### 5.1 트리거→발송 다이어그램
```
cron / signal_check
signals.evaluate_all()
lotto_signals INSERT (all results)
fire_level == 'urgent' → send_urgent_signal() → 텔레그램 즉시
fire_level == 'normal' → 09:25 digest 합류
fire_level == 'noop' → 기록만
```
### 5.2 텔레그램 메시지 폼
**Urgent**:
```
🚨 로또 능동 신호
[2026-05-20 16:18]
강한 시그널 2종 동시 발화:
• Sim Consensus 1.84 (μ=1.02, σ=0.21) z=3.9
• Strategy Drift 0.18 (μ=0.06, σ=0.04) z=3.0
요인: gap_focus 전략이 지난 3회차 EMA +22%p
다음 시뮬: 20:05
[자세히 보기] (→ /lotto/agent)
```
**Daily digest** (09:25):
```
📊 로또 일일 요약 (지난 24h)
평가 6회 / 발화 2회
• Sim Consensus normal z=1.7 (16:18)
• Confidence normal z=1.6 (월 09:05)
전략 가중치 추세 (최근 8회 baseline):
gap_focus ↑ +12%
hot_focus → -2%
pair_bias ↓ -8%
```
- 24h 내 발화 0건이면 digest 자체 skip (조용한 날 강제 알림 없음).
### 5.3 Throttle 규칙
| 규칙 | 동작 |
|---|---|
| 같은 metric + 같은 fire_level이 6시간 이내 재발화 | 두 번째는 DB 기록만, 텔레그램 skip |
| urgent 누적 ≥ 3통/day | 4번째부터 normal로 강등 → digest 합류 |
| digest 24h 발화 0건 | digest skip |
| Anthropic / 텔레그램 실패 | 평가는 success로 기록, 메시지만 60초 후 1회 retry |
## 6. 데이터 모델
### 6.1 lotto_signals
```sql
CREATE TABLE IF NOT EXISTS lotto_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
triggered_at TEXT NOT NULL, -- ISO8601 UTC
source TEXT NOT NULL, -- 'light' | 'sim' | 'deep'
metric TEXT NOT NULL, -- 'sim_signal' | 'drift' | 'confidence'
value REAL NOT NULL,
baseline_mu REAL,
baseline_sigma REAL,
z_score REAL,
fire_level TEXT NOT NULL, -- 'noop' | 'warmup' | 'normal' | 'urgent'
notified_at TEXT, -- 텔레그램 발송 시각 (NULL=미발송)
payload TEXT -- JSON 부가 정보
);
CREATE INDEX idx_ls_triggered ON lotto_signals(triggered_at DESC);
CREATE INDEX idx_ls_fire ON lotto_signals(fire_level, notified_at);
```
### 6.2 lotto_baselines
```sql
CREATE TABLE IF NOT EXISTS lotto_baselines (
metric TEXT PRIMARY KEY,
window_values TEXT NOT NULL, -- JSON: [v1..v8]
mu REAL NOT NULL,
sigma REAL NOT NULL,
last_pushed_draw_no INTEGER, -- 회차 단위 메트릭의 중복 push 방지 (drift, confidence)
updated_at TEXT NOT NULL
);
```
마이그레이션: `agent-office/app/db.py``init_db()``CREATE TABLE IF NOT EXISTS` 추가만으로 idempotent. 기존 테이블 영향 없음.
## 7. API 추가
| 메서드 | 경로 | 설명 |
|---|---|---|
| GET | `/api/agent-office/lotto/signals?days=7` | 시그널 이력 (timeline, 차트용) |
| GET | `/api/agent-office/lotto/baselines` | 현재 baseline μ/σ 조회 |
| POST | `/api/agent-office/lotto/signal-check` | 수동 트리거 (디버깅·테스트용) |
## 8. 환경 변수
```bash
LOTTO_SIGNAL_WINDOW=8 # baseline 윈도우 크기
LOTTO_Z_NORMAL=1.5 # normal fire 임계치
LOTTO_Z_URGENT=2.5 # urgent fire 임계치
LOTTO_DIGEST_HOUR=9 # digest cron hour (KST)
LOTTO_DIGEST_MIN=25
LOTTO_THROTTLE_HOURS=6 # 같은 메트릭 재발화 throttle
LOTTO_URGENT_DAILY_MAX=3 # urgent 하루 cap
```
모두 default 있음. `.env` 미설정 시 default로 동작.
## 9. 스케줄러 cron
```python
scheduler.add_job(lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check")
scheduler.add_job(lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
scheduler.add_job(lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
scheduler.add_job(lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
# 기존: lotto_curate (월 09:05) 유지
```
## 10. 구현 Phase
| Phase | 범위 | 검증 |
|---|---|---|
| 1 | DB 마이그레이션 + `signals.py` (순수 함수, LLM X) | `POST /lotto/signal-check`로 수동 호출 → z-score 계산 확인 |
| 2 | cron 4개 + `lotto_signals` INSERT (텔레그램 X) | 24h 가동 → DB에 시그널 누적 |
| 3 | 텔레그램 urgent / digest + throttle | dry-run env로 메시지 검증 후 실제 발송 |
| 4 | 프론트 (web-ui) — `/lotto/agent` 시그널 timeline UI | 별도 PR (본 spec 범위 외) |
Phase 1~3이 백엔드 능동성 완성. 각 Phase 끝에 commit + 자동 배포.
## 11. 비기능 요구
- **백워드 호환**: 기존 월요일 큐레이션 cron 변경 없음
- **장애 격리**: signals 평가 실패해도 LottoAgent.state는 idle 유지 (try/except + add_log warning)
- **테스트**: `signals.py`의 메트릭 함수는 input/output 순수형 → 단위 테스트 작성 가능
- **관측**: `agent_logs` 테이블 그대로 활용 (별도 로깅 추가 없음)
## 12. 비목표 (Out of scope)
- 자동 구매·자동 픽 갱신 (보고만)
- 시뮬레이션 강도 자동 조절 (Layer B는 v2)
- 텔레그램 인라인 키보드 (v2에서 자동 액션 도입 시 함께)
- 핫넘버/콜드넘버 시그널 (노이즈 위험, v1 제외)
- 프론트 UI (별도 PR)
## 13. v2 후속 검토
- Layer B 시뮬 강도 조절 (모호 시그널 시 deep_simulate)
- 사용자 피드백 루프 (텔레그램 [좋아요/별로] 버튼 → 임계치 가중 조정)
- 회차 retrospective 자동 분석 (당첨번호 vs 추천번호 패턴 학습)

View File

@@ -645,30 +645,49 @@ def replace_best_picks(
def get_best_picks(limit: int = 20) -> List[Dict[str, Any]]:
"""현재 활성화된 best_picks 조회 (점수 내림차순)"""
"""현재 활성화된 best_picks 조회 (점수 내림차순).
simulation_candidates와 LEFT JOIN하여 5종 점수 배열(scores)을 포함.
매칭 키: sc.run_id = bp.source_run_id AND sc.numbers = bp.numbers
LEFT JOIN 미매칭(NULL) 시 scores는 [0.0, 0.0, 0.0, 0.0, 0.0] 반환.
"""
with _conn() as conn:
rows = conn.execute(
"""
SELECT id, numbers, score_total, rank_in_run, source_run_id, based_on_draw, created_at
FROM best_picks
WHERE is_active = 1
ORDER BY score_total DESC
SELECT bp.id, bp.numbers, bp.score_total, bp.rank_in_run,
bp.source_run_id, bp.based_on_draw, bp.created_at,
sc.score_frequency, sc.score_fingerprint,
sc.score_gap, sc.score_cooccur, sc.score_diversity
FROM best_picks bp
LEFT JOIN simulation_candidates sc
ON sc.run_id = bp.source_run_id
AND sc.numbers = bp.numbers
WHERE bp.is_active = 1
ORDER BY bp.score_total DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [
{
result = []
for r in rows:
scores = [
float(r["score_frequency"] or 0.0),
float(r["score_fingerprint"] or 0.0),
float(r["score_gap"] or 0.0),
float(r["score_cooccur"] or 0.0),
float(r["score_diversity"] or 0.0),
]
result.append({
"id": int(r["id"]),
"numbers": json.loads(r["numbers"]),
"score_total": r["score_total"],
"scores": scores,
"rank_in_run": r["rank_in_run"],
"source_run_id": r["source_run_id"],
"based_on_draw": r["based_on_draw"],
"created_at": r["created_at"],
}
for r in rows
]
})
return result
def get_simulation_runs(limit: int = 10) -> List[Dict[str, Any]]:

View File

@@ -435,6 +435,7 @@ def api_best_picks(limit: int = 20):
"rank": p["rank_in_run"],
"numbers": nums,
"score_total": p["score_total"],
"scores": p["scores"],
"based_on_draw": p["based_on_draw"],
"simulation_run_id": p["source_run_id"],
"created_at": p["created_at"],