diff --git a/docs/superpowers/plans/2026-05-20-lotto-active-agent.md b/docs/superpowers/plans/2026-05-20-lotto-active-agent.md new file mode 100644 index 0000000..6262492 --- /dev/null +++ b/docs/superpowers/plans/2026-05-20-lotto-active-agent.md @@ -0,0 +1,1651 @@ +# LottoAgent 능동성 확장 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** LottoAgent에 다중 트리거 + 적응형 시그널 모니터링 + 텔레그램 능동 알림(긴급/일일 요약) 기능 추가 + +**Architecture:** agent-office 단일 컨테이너 안에 `curator/signals.py` 모듈 신설 + agent_office.db에 2개 테이블 추가. lotto-lab은 read-only GET API만 소비, 변경 없음. cron 4개로 light / sim / deep / digest 분리. + +**Tech Stack:** Python 3.12, FastAPI, APScheduler, SQLite, httpx, pytest, pytest-asyncio + +**Spec:** `docs/superpowers/specs/2026-05-20-lotto-active-agent-design.md` + +--- + +## File Structure + +| 경로 | 작업 | 책임 | +|---|---|---| +| `agent-office/app/curator/signals.py` | Create | 3종 메트릭 평가 + adaptive baseline (순수 함수) | +| `agent-office/app/curator/signal_runner.py` | Create | DB I/O + cron 진입점 + 알림 결정 | +| `agent-office/app/db.py` | Modify | `lotto_signals`, `lotto_baselines` 테이블 + CRUD | +| `agent-office/app/service_proxy.py` | Modify | `lotto_best()`, `lotto_strategy_weights()` 추가 | +| `agent-office/app/agents/lotto.py` | Modify | `on_signal_check()`, `on_daily_digest()` 추가 | +| `agent-office/app/scheduler.py` | Modify | cron 4개 등록 | +| `agent-office/app/notifiers/telegram_lotto.py` | Modify | `send_urgent_signal()`, `send_signal_summary()` | +| `agent-office/app/main.py` | Modify | endpoint 3개 | +| `agent-office/app/config.py` | Modify | env vars 7개 | +| `agent-office/tests/test_lotto_signals.py` | Create | signals.py 단위 테스트 | +| `agent-office/tests/test_lotto_signal_runner.py` | Create | signal_runner DB I/O 통합 테스트 | +| `agent-office/tests/test_lotto_telegram_signal.py` | Create | 텔레그램 메시지 포맷 테스트 | +| `web-backend/CLAUDE.md` | Modify | agent-office 섹션 갱신 | + +--- + +# Phase 1 — 순수 함수 + DB 마이그레이션 + +## Task 1: signals.py 메트릭 함수 단위 테스트 + +**Files:** +- Create: `agent-office/tests/test_lotto_signals.py` + +- [ ] **Step 1: Write failing tests for Sim Consensus** + +```python +# agent-office/tests/test_lotto_signals.py +import math +import pytest + +from app.curator import signals + + +def test_sim_consensus_top10_geomean(): + """top-10 consensus 평균이 기하평균 기반인지.""" + best_picks = [ + {"scores": [10, 10, 10, 10, 10]}, # high & uniform + {"scores": [9, 9, 9, 9, 9]}, + {"scores": [8, 8, 8, 8, 8]}, + {"scores": [7, 7, 7, 7, 7]}, + {"scores": [6, 6, 6, 6, 6]}, + {"scores": [5, 5, 5, 5, 5]}, + {"scores": [4, 4, 4, 4, 4]}, + {"scores": [3, 3, 3, 3, 3]}, + {"scores": [2, 2, 2, 2, 2]}, + {"scores": [1, 1, 1, 1, 1]}, # top 10 + {"scores": [0, 0, 0, 0, 0]}, # bottom 10 + ] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10 + # 처음 10개가 top, 다 normalize 후 consensus 동일하니까 top10 평균은 명확 + result = signals.sim_consensus_score(best_picks) + assert 0.0 <= result <= 1.0 + # uniform 분포에선 top10 평균이 단조 감소 분포의 상반부 평균과 같아야 함 + assert result > 0.4 + + +def test_sim_consensus_geomean_penalizes_imbalance(): + """5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다.""" + balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20 + imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20 # 평균은 같지만 한 점수만 폭주 + s_balanced = signals.sim_consensus_score(balanced) + s_imbalanced = signals.sim_consensus_score(imbalanced) + # imbalanced는 normalize 후 한 차원만 1.0, 나머지 0.0 → 기하평균 0 + assert s_imbalanced < s_balanced + + +def test_strategy_drift_score(): + """drift = 전략별 가중치 변화 절댓값 합.""" + w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45} + w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40} + # |0.10| + |-0.05| + |-0.05| = 0.20 + result = signals.strategy_drift_score(w_prev, w_curr) + assert abs(result - 0.20) < 1e-9 + + +def test_strategy_drift_new_strategy_appears(): + """이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산.""" + w_prev = {"gap_focus": 0.5, "hot_focus": 0.5} + w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2} + # |0.5-0.4| + |0.5-0.4| + |0-0.2| = 0.4 + result = signals.strategy_drift_score(w_prev, w_curr) + assert abs(result - 0.4) < 1e-9 + + +def test_confidence_score_passthrough(): + """confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인).""" + assert signals.confidence_score({"confidence": 0.85}) == 0.85 + assert signals.confidence_score({"confidence": 1.2}) == 1.0 # clamp + assert signals.confidence_score({"confidence": -0.1}) == 0.0 + assert signals.confidence_score({}) is None + + +def test_adaptive_baseline_cold_start(): + """window 크기 < 4 → warmup, z=None.""" + bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8) + z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5) + assert fire == "warmup" + assert z is None + + +def test_adaptive_baseline_preparing(): + """window 4~7 → 보수적 임계치 z=2.0.""" + bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8) + # value=1.0 → mu=1.0, sigma=0 → div0 처리 필요. preparing 단계라 임계치는 보수적. + z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5) + # sigma=0인 경우 처리 — value > mu면 무한대로 보고 urgent? 또는 z계산 불가 → warmup? + # 결정: sigma == 0이면 (window 분산 없음) z=inf 취급, value > mu면 urgent로 + assert fire in ("normal", "urgent") + + +def test_adaptive_baseline_normal_window_full(): + """window 8 풀, value가 평균보다 1.5σ 이상이면 normal.""" + bl = signals.AdaptiveBaseline( + window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0], + window_max=8, + ) + # μ ≈ 1.0, σ ≈ 0.08 → z=1.5는 value ≈ 1.12 + z, fire = bl.evaluate(value=1.20, z_normal=1.5, z_urgent=2.5) + assert fire == "normal" + assert z is not None and z >= 1.5 + + +def test_adaptive_baseline_urgent(): + """z >= 2.5 → urgent.""" + bl = signals.AdaptiveBaseline( + window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0], + window_max=8, + ) + z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5) + assert fire == "urgent" + + +def test_adaptive_baseline_push_updates_window(): + """push 시 FIFO 동작.""" + bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8) + bl.push(9.0) + assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0] + + +def test_decide_fire_level_combination(): + """2개 이상 normal 발화 → urgent.""" + # 2개 normal z ≥ 1.5 + sigs = [ + {"metric": "sim", "z": 1.6, "fire": "normal"}, + {"metric": "drift", "z": 1.7, "fire": "normal"}, + {"metric": "conf", "z": 0.5, "fire": "noop"}, + ] + assert signals.decide_overall_fire(sigs) == "urgent" + + # 1개 normal + sigs2 = [ + {"metric": "sim", "z": 1.6, "fire": "normal"}, + {"metric": "drift", "z": 0.3, "fire": "noop"}, + ] + assert signals.decide_overall_fire(sigs2) == "normal" + + # 단일 극값 z ≥ 2.5 → urgent + sigs3 = [ + {"metric": "sim", "z": 3.0, "fire": "urgent"}, + {"metric": "drift", "z": 0.2, "fire": "noop"}, + ] + assert signals.decide_overall_fire(sigs3) == "urgent" + + # 전부 noop + sigs4 = [{"metric": "sim", "z": 0.5, "fire": "noop"}] + assert signals.decide_overall_fire(sigs4) == "noop" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd agent-office && pytest tests/test_lotto_signals.py -v` +Expected: FAIL with `ModuleNotFoundError: No module named 'app.curator.signals'` + +- [ ] **Step 3: Commit failing tests** + +```bash +git add agent-office/tests/test_lotto_signals.py +git commit -m "test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트" +``` + +--- + +## Task 2: signals.py 구현 + +**Files:** +- Create: `agent-office/app/curator/signals.py` + +- [ ] **Step 1: Implement signals module** + +```python +# agent-office/app/curator/signals.py +"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수). + +DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list. +signal_runner.py에서 DB 연동 + cron 진입점 담당. +""" +from __future__ import annotations +import math +from dataclasses import dataclass, field +from statistics import mean, pstdev +from typing import Any, Dict, List, Optional, Tuple + + +# ---------- Metric: Sim Consensus ---------- + +def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]: + """20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수.""" + if not picks: + return [] + n_metrics = len(picks[0]["scores"]) + columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)] + norms_per_col = [] + for col in columns: + lo, hi = min(col), max(col) + rng = hi - lo + if rng == 0: + norms_per_col.append([0.5] * len(col)) # 모두 동일하면 중립 0.5 + else: + norms_per_col.append([(v - lo) / rng for v in col]) + # 전치: 후보별 정규화 점수 리스트 + return [ + [norms_per_col[k][i] for k in range(n_metrics)] + for i in range(len(picks)) + ] + + +def _geomean(values: List[float]) -> float: + """기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티).""" + if not values: + return 0.0 + if any(v <= 0 for v in values): + return 0.0 + log_sum = sum(math.log(v) for v in values) + return math.exp(log_sum / len(values)) + + +def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float: + """top-10 후보의 기하평균 consensus 평균. + + Args: + best_picks: [{"numbers": [...], "scores": [s1, s2, s3, s4, s5]}, ...] + """ + if not best_picks: + return 0.0 + normalized = _normalize_columns(best_picks) + consensus = [_geomean(scores) for scores in normalized] + consensus.sort(reverse=True) + top = consensus[:10] if len(consensus) >= 10 else consensus + return mean(top) if top else 0.0 + + +# ---------- Metric: Strategy Drift ---------- + +def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float: + """가중치 변화 절댓값 합. 신규/소멸 전략도 가산.""" + keys = set(prev) | set(curr) + return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys) + + +# ---------- Metric: Confidence ---------- + +def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]: + """큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None.""" + if "confidence" not in curate_result: + return None + v = float(curate_result["confidence"]) + return max(0.0, min(1.0, v)) + + +# ---------- Adaptive Baseline ---------- + +@dataclass +class AdaptiveBaseline: + window: List[float] = field(default_factory=list) + window_max: int = 8 + last_pushed_draw_no: Optional[int] = None # 회차 단위 메트릭 중복 push 방지 + + @property + def size(self) -> int: + return len(self.window) + + @property + def mu(self) -> float: + return mean(self.window) if self.window else 0.0 + + @property + def sigma(self) -> float: + # population stdev (8개 윈도우라 ddof=0이 안정) + return pstdev(self.window) if len(self.window) >= 2 else 0.0 + + def push(self, value: float, draw_no: Optional[int] = None) -> None: + """FIFO push. window_max 초과 시 가장 오래된 값 제거.""" + self.window.append(float(value)) + if len(self.window) > self.window_max: + self.window = self.window[-self.window_max:] + if draw_no is not None: + self.last_pushed_draw_no = draw_no + + def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]: + """z-score 계산 + fire_level 판정. + + Returns: + (z_score, fire_level) — z_score는 cold start/warmup이면 None. + fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'} + """ + if self.size < 4: + return None, "warmup" + + # 보수적 임계치 (window 4~7) + z_normal_eff = 2.0 if self.size < self.window_max else z_normal + z_urgent_eff = z_urgent + + if self.sigma == 0: + # 분산 없음 — value가 평균을 크게 벗어나면 urgent로 분류, 아니면 noop + return (None, "urgent") if value > self.mu else (None, "noop") + + z = (value - self.mu) / self.sigma + if z >= z_urgent_eff: + return z, "urgent" + if z >= z_normal_eff: + return z, "normal" + return z, "noop" + + +# ---------- Combined fire decision ---------- + +def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str: + """3종 시그널을 종합해 전체 fire_level 결정. + + Args: + signal_results: [{"metric": str, "z": float|None, "fire": str}, ...] + Returns: + 'noop' | 'normal' | 'urgent' + """ + fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")] + if any(s["fire"] == "urgent" for s in fires): + return "urgent" + if len(fires) >= 2: + return "urgent" # 2개 이상 normal → urgent로 승격 + if len(fires) == 1: + return "normal" + return "noop" +``` + +- [ ] **Step 2: Run tests to verify they pass** + +Run: `cd agent-office && pytest tests/test_lotto_signals.py -v` +Expected: All 9 tests PASS + +- [ ] **Step 3: Commit** + +```bash +git add agent-office/app/curator/signals.py +git commit -m "feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현" +``` + +--- + +## Task 3: DB 마이그레이션 (lotto_signals + lotto_baselines) + +**Files:** +- Modify: `agent-office/app/db.py:19` (init_db 함수 내) + +- [ ] **Step 1: Add table creation to init_db** + +`init_db()` 함수의 마지막 `INSERT OR IGNORE` 루프 직전(line 100, `youtube_research_jobs` 테이블 뒤)에 다음 추가: + +```python + conn.execute(""" + CREATE TABLE IF NOT EXISTS lotto_signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), + source TEXT NOT NULL, + metric TEXT NOT NULL, + value REAL NOT NULL, + baseline_mu REAL, + baseline_sigma REAL, + z_score REAL, + fire_level TEXT NOT NULL, + notified_at TEXT, + payload TEXT + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_ls_triggered + ON lotto_signals(triggered_at DESC) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_ls_fire + ON lotto_signals(fire_level, notified_at) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS lotto_baselines ( + metric TEXT PRIMARY KEY, + window_values TEXT NOT NULL DEFAULT '[]', + mu REAL NOT NULL DEFAULT 0.0, + sigma REAL NOT NULL DEFAULT 0.0, + last_pushed_draw_no INTEGER, + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ) + """) +``` + +- [ ] **Step 2: Add CRUD helpers at end of db.py** + +`db.py` 파일 맨 아래에 추가: + +```python +# --- lotto_signals / lotto_baselines CRUD --- + +def insert_lotto_signal( + source: str, + metric: str, + value: float, + baseline_mu: Optional[float], + baseline_sigma: Optional[float], + z_score: Optional[float], + fire_level: str, + payload: Optional[Dict[str, Any]] = None, +) -> int: + with _conn() as conn: + cur = conn.execute( + """ + INSERT INTO lotto_signals + (source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + source, metric, value, + baseline_mu, baseline_sigma, z_score, fire_level, + json.dumps(payload or {}, ensure_ascii=False), + ), + ) + return cur.lastrowid + + +def mark_signal_notified(signal_id: int) -> None: + with _conn() as conn: + conn.execute( + "UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?", + (signal_id,), + ) + + +def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]: + """지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent.""" + levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent") + placeholders = ",".join("?" * len(levels)) + with _conn() as conn: + rows = conn.execute( + f""" + SELECT * FROM lotto_signals + WHERE triggered_at >= datetime('now', ?) + AND fire_level IN ({placeholders}) + ORDER BY triggered_at DESC + """, + (f"-{int(hours)} hours", *levels), + ).fetchall() + return [dict(r) for r in rows] + + +def get_signals_history(days: int = 7) -> List[Dict[str, Any]]: + """차트/이력 페이지용 — 모든 fire_level 포함.""" + with _conn() as conn: + rows = conn.execute( + """ + SELECT * FROM lotto_signals + WHERE triggered_at >= datetime('now', ?) + ORDER BY triggered_at DESC + """, + (f"-{int(days)} days",), + ).fetchall() + return [dict(r) for r in rows] + + +def get_recent_urgent_count(hours: int = 24) -> int: + with _conn() as conn: + row = conn.execute( + """ + SELECT COUNT(*) AS c FROM lotto_signals + WHERE triggered_at >= datetime('now', ?) + AND fire_level = 'urgent' + AND notified_at IS NOT NULL + """, + (f"-{int(hours)} hours",), + ).fetchone() + return int(row["c"]) if row else 0 + + +def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]: + """같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용.""" + with _conn() as conn: + row = conn.execute( + """ + SELECT notified_at FROM lotto_signals + WHERE metric = ? + AND fire_level = ? + AND notified_at IS NOT NULL + AND notified_at >= datetime('now', ?) + ORDER BY notified_at DESC LIMIT 1 + """, + (metric, fire_level, f"-{int(hours)} hours"), + ).fetchone() + return row["notified_at"] if row else None + + +def get_baseline(metric: str) -> Optional[Dict[str, Any]]: + with _conn() as conn: + row = conn.execute( + "SELECT * FROM lotto_baselines WHERE metric = ?", + (metric,), + ).fetchone() + if not row: + return None + d = dict(row) + d["window_values"] = json.loads(d["window_values"]) + return d + + +def upsert_baseline( + metric: str, + window_values: List[float], + mu: float, + sigma: float, + last_pushed_draw_no: Optional[int], +) -> None: + with _conn() as conn: + conn.execute( + """ + INSERT INTO lotto_baselines + (metric, window_values, mu, sigma, last_pushed_draw_no, updated_at) + VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now')) + ON CONFLICT(metric) DO UPDATE SET + window_values = excluded.window_values, + mu = excluded.mu, + sigma = excluded.sigma, + last_pushed_draw_no = excluded.last_pushed_draw_no, + updated_at = excluded.updated_at + """, + ( + metric, + json.dumps(window_values), + mu, sigma, last_pushed_draw_no, + ), + ) + + +def get_all_baselines() -> List[Dict[str, Any]]: + with _conn() as conn: + rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall() + out = [] + for r in rows: + d = dict(r) + d["window_values"] = json.loads(d["window_values"]) + out.append(d) + return out +``` + +- [ ] **Step 3: Smoke test the migration** + +새 임시 DB로 마이그레이션이 동작하는지: + +```bash +cd agent-office && python -c " +import os +os.environ['AGENT_OFFICE_DB_PATH'] = '/tmp/test_migration.db' +from app.db import init_db, insert_lotto_signal, get_baseline, upsert_baseline +init_db() +sid = insert_lotto_signal('light','sim_signal', 1.5, 1.0, 0.2, 2.5, 'urgent', {'top':[1,2,3]}) +print('signal id:', sid) +upsert_baseline('sim_signal', [1.0, 1.1, 0.9], 1.0, 0.1, None) +print('baseline:', get_baseline('sim_signal')) +" +rm /tmp/test_migration.db +``` +Expected: signal id가 1 출력, baseline dict 출력 (window_values는 list) + +- [ ] **Step 4: Commit** + +```bash +git add agent-office/app/db.py +git commit -m "feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD" +``` + +--- + +## Task 4: signal_runner.py — DB I/O + 평가 orchestration + +**Files:** +- Create: `agent-office/app/curator/signal_runner.py` +- Create: `agent-office/tests/test_lotto_signal_runner.py` + +- [ ] **Step 1: Write failing test for signal_runner** + +```python +# agent-office/tests/test_lotto_signal_runner.py +import os +import pytest + +os.environ["AGENT_OFFICE_DB_PATH"] = "/tmp/test_signal_runner.db" + +from app.curator import signal_runner +from app import db + + +@pytest.fixture(autouse=True) +def fresh_db(): + if os.path.exists("/tmp/test_signal_runner.db"): + os.remove("/tmp/test_signal_runner.db") + db.init_db() + yield + if os.path.exists("/tmp/test_signal_runner.db"): + os.remove("/tmp/test_signal_runner.db") + + +def test_evaluate_and_persist_cold_start(): + """첫 호출은 warmup으로 기록되고 baseline은 비어있다.""" + result = signal_runner.evaluate_metric_and_persist( + source="light", + metric="sim_signal", + value=1.5, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + assert result["fire_level"] == "warmup" + assert result["z_score"] is None + + bl = db.get_baseline("sim_signal") + assert bl is not None + assert bl["window_values"] == [1.5] + + +def test_evaluate_after_window_filled_normal_fire(): + """8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal.""" + for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]: + signal_runner.evaluate_metric_and_persist( + source="sim", + metric="sim_signal", + value=v, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + + # 9번째: window는 [v2..v8, 1.0]이지만 push 후 평가. 평균≈1.0, σ≈0.08 → z 큰 값 + result = signal_runner.evaluate_metric_and_persist( + source="sim", + metric="sim_signal", + value=1.20, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + assert result["fire_level"] in ("normal", "urgent") + assert result["z_score"] is not None and result["z_score"] >= 1.5 + + +def test_evaluate_drift_skips_same_draw_push(): + """drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X.""" + # 첫 push (draw 1100) + signal_runner.evaluate_metric_and_persist( + source="sim", metric="drift", value=0.05, draw_no=1100, + z_normal=1.5, z_urgent=2.5, push_to_window=True, + ) + bl_before = db.get_baseline("drift") + assert bl_before["window_values"] == [0.05] + assert bl_before["last_pushed_draw_no"] == 1100 + + # 같은 회차 두 번째 호출 — window push X + signal_runner.evaluate_metric_and_persist( + source="sim", metric="drift", value=0.08, draw_no=1100, + z_normal=1.5, z_urgent=2.5, push_to_window=True, + ) + bl_after = db.get_baseline("drift") + assert bl_after["window_values"] == [0.05] # 변경 없음 + + +def test_run_signal_check_aggregates_three_metrics(monkeypatch): + """run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환.""" + # mock service_proxy + async def fake_lotto_best(): + # 동일한 5종 점수의 균등 후보 20개 + return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20 + + async def fake_lotto_strategy_weights(): + return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3} + + monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best) + monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights) + + import asyncio + out = asyncio.get_event_loop().run_until_complete( + signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101) + ) + assert "overall_fire" in out + assert "results" in out + assert any(r["metric"] == "sim_signal" for r in out["results"]) + assert any(r["metric"] == "drift" for r in out["results"]) + # light_check는 confidence 평가 안 함 + assert not any(r["metric"] == "confidence" for r in out["results"]) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v` +Expected: FAIL `ModuleNotFoundError: No module named 'app.curator.signal_runner'` + +- [ ] **Step 3: Implement signal_runner.py** + +```python +# agent-office/app/curator/signal_runner.py +"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration.""" +from __future__ import annotations +import logging +from typing import Any, Dict, List, Optional + +from .. import db +from .. import service_proxy +from . import signals + +logger = logging.getLogger("agent-office.lotto-signals") + +# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교) +DRAW_SCOPED_METRICS = {"drift", "confidence"} + + +def _load_baseline(metric: str) -> signals.AdaptiveBaseline: + row = db.get_baseline(metric) + if row is None: + return signals.AdaptiveBaseline(window=[], window_max=8) + return signals.AdaptiveBaseline( + window=list(row["window_values"]), + window_max=8, + last_pushed_draw_no=row.get("last_pushed_draw_no"), + ) + + +def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None: + db.upsert_baseline( + metric=metric, + window_values=bl.window, + mu=bl.mu, + sigma=bl.sigma, + last_pushed_draw_no=bl.last_pushed_draw_no, + ) + + +def evaluate_metric_and_persist( + source: str, + metric: str, + value: float, + draw_no: Optional[int], + z_normal: float, + z_urgent: float, + push_to_window: bool, + payload: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신. + + 회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략. + """ + bl = _load_baseline(metric) + + # 회차 가드: 같은 회차 재평가 시 window push 생략 + do_push = push_to_window + if metric in DRAW_SCOPED_METRICS and draw_no is not None: + if bl.last_pushed_draw_no == draw_no: + do_push = False + + # 평가는 push 전 baseline 기준 (현재값 vs 과거 분포) + z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent) + + if do_push: + bl.push(value=value, draw_no=draw_no) + _save_baseline(metric, bl) + + sid = db.insert_lotto_signal( + source=source, + metric=metric, + value=value, + baseline_mu=bl.mu if bl.size > 0 else None, + baseline_sigma=bl.sigma if bl.size >= 2 else None, + z_score=z, + fire_level=fire, + payload=payload, + ) + return { + "signal_id": sid, + "metric": metric, + "value": value, + "z_score": z, + "fire_level": fire, + "payload": payload or {}, + } + + +# ---------- Service proxy thin wrappers (monkeypatch 대상) ---------- + +async def _fetch_best_picks() -> List[Dict[str, Any]]: + return await service_proxy.lotto_best() + + +async def _fetch_strategy_weights() -> Dict[str, float]: + return await service_proxy.lotto_strategy_weights() + + +# ---------- Orchestrator ---------- + +async def run_signal_check( + source: str, + z_normal: float = 1.5, + z_urgent: float = 2.5, + curate_result: Optional[Dict[str, Any]] = None, + current_draw_no: Optional[int] = None, +) -> Dict[str, Any]: + """cron 진입점. source ∈ {'light', 'sim', 'deep'}. + + light/sim: Sim Consensus + Strategy Drift 평가 + deep: 위 2종 + Confidence (curate_result 필요) + """ + results: List[Dict[str, Any]] = [] + + # --- Sim Consensus --- + try: + best = await _fetch_best_picks() + v = signals.sim_consensus_score(best) + results.append( + evaluate_metric_and_persist( + source=source, metric="sim_signal", + value=v, draw_no=None, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"top_count": min(len(best), 10)}, + ) + ) + except Exception as e: + logger.warning(f"sim_consensus 평가 실패: {e}") + + # --- Strategy Drift (회차 단위) --- + try: + w_curr = await _fetch_strategy_weights() + bl_drift = _load_baseline("drift") + # drift는 prev → curr 비교가 필요. baseline window의 마지막 entry는 이전 drift값. + # prev_weights는 별도 저장이 필요한데, 간단히 lotto_baselines.payload에 안 두고 + # 매 호출마다 GET 후 캐시되지 않은 직전 값과 비교 불가 → drift = 1회차 단위. + # 단순화: 이번 호출의 가중치를 baseline에 저장하기 전 직전 저장값을 prev로 사용. + prev_payload_row = db.get_baseline("drift_weights_cache") + w_prev = prev_payload_row["window_values"] if prev_payload_row else None + + if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict): + # last entry is dict of weights + prev_dict = w_prev[-1] + drift_value = signals.strategy_drift_score(prev_dict, w_curr) + results.append( + evaluate_metric_and_persist( + source=source, metric="drift", + value=drift_value, draw_no=current_draw_no, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"weights_now": w_curr, "weights_prev": prev_dict}, + ) + ) + # weights 캐시 갱신 (FIFO 2개만 보관) + cache_window = (w_prev or []) + [w_curr] + if len(cache_window) > 2: + cache_window = cache_window[-2:] + db.upsert_baseline( + metric="drift_weights_cache", + window_values=cache_window, + mu=0.0, sigma=0.0, + last_pushed_draw_no=current_draw_no, + ) + except Exception as e: + logger.warning(f"strategy_drift 평가 실패: {e}") + + # --- Confidence (deep_check + curate_result 필수) --- + if source == "deep" and curate_result is not None: + try: + cv = signals.confidence_score(curate_result) + if cv is not None: + results.append( + evaluate_metric_and_persist( + source=source, metric="confidence", + value=cv, draw_no=current_draw_no, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"draw_no": current_draw_no}, + ) + ) + except Exception as e: + logger.warning(f"confidence 평가 실패: {e}") + + overall = signals.decide_overall_fire( + [{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results] + ) + return {"overall_fire": overall, "results": results} +``` + +- [ ] **Step 4: Add lotto_best + lotto_strategy_weights to service_proxy.py** + +`agent-office/app/service_proxy.py` 파일 맨 아래 추가: + +```python +async def lotto_best() -> List[Dict[str, Any]]: + """GET /api/lotto/best — best_picks 20개 (numbers + scores 5종).""" + from .config import LOTTO_BACKEND_URL + resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best") + resp.raise_for_status() + data = resp.json() + items = data.get("items") if isinstance(data, dict) else data + return items or [] + + +async def lotto_strategy_weights() -> Dict[str, float]: + """GET /api/lotto/strategy/weights — 전략별 가중치 dict.""" + from .config import LOTTO_BACKEND_URL + resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights") + resp.raise_for_status() + data = resp.json() + weights = data.get("weights") if isinstance(data, dict) else data + # API 응답 형태가 [{strategy, weight, ...}] 일 수 있어 dict로 정규화 + if isinstance(weights, list): + return {item["strategy"]: float(item["weight"]) for item in weights} + return {k: float(v) for k, v in (weights or {}).items()} +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v` +Expected: All 4 tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add agent-office/app/curator/signal_runner.py \ + agent-office/app/service_proxy.py \ + agent-office/tests/test_lotto_signal_runner.py +git commit -m "feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers" +``` + +--- + +# Phase 2 — cron 통합 (텔레그램 X, DB 기록만) + +## Task 5: config.py env vars 7개 추가 + +**Files:** +- Modify: `agent-office/app/config.py` (파일 끝) + +- [ ] **Step 1: Append env var section** + +`agent-office/app/config.py` 끝에 추가: + +```python + +# Lotto Active Signals +LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8")) +LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5")) +LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5")) +LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9")) +LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25")) +LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6")) +LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3")) +``` + +- [ ] **Step 2: Smoke check** + +```bash +cd agent-office && python -c "from app.config import LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR; print(LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR)" +``` +Expected: `1.5 9` + +- [ ] **Step 3: Commit** + +```bash +git add agent-office/app/config.py +git commit -m "feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle)" +``` + +--- + +## Task 6: LottoAgent에 on_signal_check / on_daily_digest 추가 (텔레그램 발송 X) + +**Files:** +- Modify: `agent-office/app/agents/lotto.py` + +- [ ] **Step 1: Extend LottoAgent class** + +`agent-office/app/agents/lotto.py`의 `on_command()`를 다음으로 교체 (signal check 액션 추가): + +```python + async def on_command(self, action: str, params: dict) -> dict: + if action in ("curate_now", "curate_weekly"): + return await self._run(source="manual") + if action == "status": + return {"ok": True, "message": f"{self.state}: {self.state_detail}"} + if action in ("signal_check", "light_check", "sim_check", "deep_check"): + return await self.run_signal_check(source=action.replace("_check", "") if action != "signal_check" else "light") + if action == "daily_digest": + return await self.run_daily_digest() + return {"ok": False, "message": f"unknown action: {action}"} +``` + +그리고 클래스 끝에 신규 메서드 추가: + +```python + async def run_signal_check(self, source: str = "light") -> dict: + """비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후). + + Phase 2에선 텔레그램 발송 안 함. lotto_signals INSERT만. + Phase 3에서 텔레그램 트리거 추가. + """ + from ..curator.signal_runner import run_signal_check + from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT + from ..db import add_log + + if self.state not in ("idle", "reporting"): + return {"ok": False, "message": f"busy ({self.state})"} + + try: + curate_result: dict | None = None + current_draw_no: int | None = None + + if source == "deep": + # deep_check은 큐레이션을 먼저 수행 (LLM 호출) + from ..curator.pipeline import curate_weekly + cw = await curate_weekly(source="signal_deep") + curate_result = cw.get("payload") or cw # confidence 키 필요 + current_draw_no = cw.get("draw_no") + + outcome = await run_signal_check( + source=source, + z_normal=LOTTO_Z_NORMAL, + z_urgent=LOTTO_Z_URGENT, + curate_result=curate_result, + current_draw_no=current_draw_no, + ) + add_log( + self.agent_id, + f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}", + ) + return {"ok": True, **outcome} + except Exception as e: + add_log(self.agent_id, f"signal_check 예외: {e}", level="error") + return {"ok": False, "message": f"{type(e).__name__}: {e}"} + + async def run_daily_digest(self) -> dict: + """Phase 2: 발화 카운트만 반환. Phase 3에서 텔레그램 발송 추가.""" + from ..db import get_recent_lotto_signals, add_log + sigs = get_recent_lotto_signals(hours=24, min_fire="normal") + add_log(self.agent_id, f"daily_digest: 지난 24h 발화 {len(sigs)}건") + return {"ok": True, "count": len(sigs), "signals": sigs} +``` + +- [ ] **Step 2: Smoke import check** + +```bash +cd agent-office && python -c "from app.agents.lotto import LottoAgent; print(LottoAgent.agent_id)" +``` +Expected: `lotto` + +- [ ] **Step 3: Commit** + +```bash +git add agent-office/app/agents/lotto.py +git commit -m "feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)" +``` + +--- + +## Task 7: scheduler.py에 cron 4개 추가 + +**Files:** +- Modify: `agent-office/app/scheduler.py` + +- [ ] **Step 1: Add cron job functions + registrations** + +기존 `_run_lotto_schedule` 근처에 함수 4개 추가: + +```python +async def _run_lotto_light_check(): + agent = AGENT_REGISTRY.get("lotto") + if agent: + await agent.run_signal_check(source="light") + +async def _run_lotto_sim_check(): + agent = AGENT_REGISTRY.get("lotto") + if agent: + await agent.run_signal_check(source="sim") + +async def _run_lotto_deep_check(): + agent = AGENT_REGISTRY.get("lotto") + if agent: + await agent.run_signal_check(source="deep") + +async def _run_lotto_daily_digest(): + agent = AGENT_REGISTRY.get("lotto") + if agent: + await agent.run_daily_digest() +``` + +기존 `lotto_curate` cron 등록(line 75) 직후에 4개 추가: + +```python + scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check") + scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check") + scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check") + # digest hour/min은 동적 (env)이지만 cron 등록은 상수로 — 9:25 고정. 변경 필요 시 컨테이너 재시작. + scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest") +``` + +- [ ] **Step 2: Verify import + cron registration** + +```bash +cd agent-office && python -c " +from app.scheduler import _run_lotto_light_check, _run_lotto_sim_check, _run_lotto_deep_check, _run_lotto_daily_digest +print('all imports ok') +" +``` +Expected: `all imports ok` + +- [ ] **Step 3: Commit** + +```bash +git add agent-office/app/scheduler.py +git commit -m "feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)" +``` + +--- + +# Phase 3 — 텔레그램 알림 + Throttle + API endpoints + +## Task 8: 텔레그램 메시지 폼 단위 테스트 + +**Files:** +- Create: `agent-office/tests/test_lotto_telegram_signal.py` + +- [ ] **Step 1: Write failing tests** + +```python +# agent-office/tests/test_lotto_telegram_signal.py +from app.notifiers.telegram_lotto import ( + _format_urgent_signal, + _format_signal_digest, +) + + +def test_urgent_signal_format_basic(): + event = { + "fire_level": "urgent", + "triggered_at": "2026-05-20T07:18:00.000Z", + "results": [ + {"metric": "sim_signal", "value": 1.84, "z_score": 3.9, + "baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {}}, + {"metric": "drift", "value": 0.18, "z_score": 3.0, + "baseline_mu": 0.06, "baseline_sigma": 0.04, + "payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5}}}, + ], + } + text = _format_urgent_signal(event) + assert "🚨" in text + assert "Sim Consensus" in text + assert "z=3.9" in text + assert "Strategy Drift" in text or "drift" in text.lower() + + +def test_signal_digest_format_with_signals(): + digest = { + "evaluated": 6, + "fired": 2, + "signals": [ + {"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7, + "triggered_at": "2026-05-20T16:18:00Z", "payload": {}}, + {"metric": "confidence", "fire_level": "normal", "z_score": 1.6, + "triggered_at": "2026-05-20T09:05:00Z", "payload": {}}, + ], + "weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08}, + } + text = _format_signal_digest(digest) + assert "📊" in text + assert "지난 24h" in text + assert "z=1.7" in text + + +def test_signal_digest_empty_returns_empty_string(): + """발화 0건이면 빈 문자열 → 발송 자체 skip 가능.""" + text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}}) + assert text == "" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v` +Expected: FAIL `ImportError: cannot import name '_format_urgent_signal'` + +- [ ] **Step 3: Implement telegram_lotto.py additions** + +`agent-office/app/notifiers/telegram_lotto.py`에 추가 (파일 끝): + +```python + + +# ---------- 능동 시그널 알림 (urgent + digest) ---------- + +_METRIC_LABEL = { + "sim_signal": "Sim Consensus", + "drift": "Strategy Drift", + "confidence": "Confidence", +} + + +def _format_urgent_signal(event: Dict[str, Any]) -> str: + """긴급 시그널 텔레그램 메시지 포맷.""" + triggered = event.get("triggered_at", "")[:19].replace("T", " ") + results = event.get("results", []) + fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")] + + lines = [ + "🚨 로또 능동 신호", + "", + f"[{triggered}]", + f"강한 시그널 {len(fired)}종 발화:", + ] + for r in fired: + label = _METRIC_LABEL.get(r["metric"], r["metric"]) + v = r.get("value") + mu = r.get("baseline_mu") + sigma = r.get("baseline_sigma") + z = r.get("z_score") + if mu is not None and sigma is not None: + lines.append(f"• {label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}") + else: + lines.append(f"• {label} {v:.2f}") + + # drift 페이로드 — 어떤 전략이 변동했는지 한 줄 + for r in fired: + if r["metric"] == "drift": + wn = (r.get("payload") or {}).get("weights_now") or {} + wp = (r.get("payload") or {}).get("weights_prev") or {} + if wn and wp: + diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))} + top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2] + detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top) + lines.append("") + lines.append(f"요인: {detail}") + break + + lines.append("") + lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)") + return "\n".join(lines) + + +def _format_signal_digest(digest: Dict[str, Any]) -> str: + """일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호).""" + fired = int(digest.get("fired", 0)) + if fired == 0: + return "" + + signals_list = digest.get("signals", []) + evaluated = digest.get("evaluated", 0) + + lines = [ + "📊 로또 일일 요약 (지난 24h)", + "", + f"평가 {evaluated}회 / 발화 {fired}회", + ] + for s in signals_list: + label = _METRIC_LABEL.get(s["metric"], s["metric"]) + z = s.get("z_score") + when = (s.get("triggered_at") or "")[11:16] # HH:MM + z_text = f"z={z:.1f}" if z is not None else "z=-" + lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text} ({when})") + + weights_trend = digest.get("weights_trend") or {} + if weights_trend: + lines += ["", "전략 가중치 추세 (최근 8회 baseline):"] + for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])): + arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→") + lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%") + + return "\n".join(lines) + + +async def send_urgent_signal(event: Dict[str, Any]) -> None: + text = _format_urgent_signal(event) + try: + await send_raw(text) + except Exception as e: + logger.warning(f"[telegram_lotto] urgent signal send failed: {e}") + + +async def send_signal_summary(digest: Dict[str, Any]) -> None: + text = _format_signal_digest(digest) + if not text: + return # 발화 0건이면 발송 skip + try: + await send_raw(text) + except Exception as e: + logger.warning(f"[telegram_lotto] digest send failed: {e}") +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v` +Expected: All 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add agent-office/app/notifiers/telegram_lotto.py \ + agent-office/tests/test_lotto_telegram_signal.py +git commit -m "feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷" +``` + +--- + +## Task 9: signal_check이 텔레그램 발송 트리거 + throttle 통합 + +**Files:** +- Modify: `agent-office/app/agents/lotto.py` + +- [ ] **Step 1: Update run_signal_check to fire telegram on urgent** + +`run_signal_check()`의 마지막 `return {"ok": True, **outcome}` 직전에 추가: + +```python + # --- Throttle + 텔레그램 urgent 발송 --- + from ..config import ( + LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX, + ) + from ..db import ( + get_last_signal_notification, get_recent_urgent_count, + mark_signal_notified, + ) + from ..notifiers.telegram_lotto import send_urgent_signal + + if outcome["overall_fire"] == "urgent": + # daily cap 체크 + if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX: + add_log( + self.agent_id, + f"urgent daily cap 도달 → normal로 강등 (digest 합류)", + level="warning", + ) + else: + # throttle: 어느 메트릭이라도 6시간 이내 같은 fire_level 알림이 있으면 skip + blocked = False + for r in outcome["results"]: + if r["fire_level"] in ("normal", "urgent"): + last = get_last_signal_notification( + metric=r["metric"], fire_level=r["fire_level"], + hours=LOTTO_THROTTLE_HOURS, + ) + if last: + blocked = True + break + if not blocked: + from datetime import datetime, timezone + event = { + "fire_level": "urgent", + "triggered_at": datetime.now(timezone.utc).isoformat(), + "results": outcome["results"], + } + await send_urgent_signal(event) + # 모든 발화 시그널에 대해 notified_at 마킹 + for r in outcome["results"]: + if r["fire_level"] in ("normal", "urgent"): + mark_signal_notified(r["signal_id"]) + add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)") +``` + +- [ ] **Step 2: Update run_daily_digest to actually send telegram** + +`run_daily_digest()`를 다음으로 교체: + +```python + async def run_daily_digest(self) -> dict: + """일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통.""" + from ..db import get_recent_lotto_signals, get_signals_history, add_log, get_all_baselines + from ..notifiers.telegram_lotto import send_signal_summary + + sigs = get_recent_lotto_signals(hours=24, min_fire="normal") + # 전체 평가 횟수 (noop+warmup 포함) + total_24h = get_signals_history(days=1) + evaluated = len(total_24h) + + # weights_trend: drift_weights_cache에서 prev/curr 차이 계산 + trend = {} + try: + from ..db import get_baseline + cache = get_baseline("drift_weights_cache") + if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2: + prev_w = cache["window_values"][-2] + curr_w = cache["window_values"][-1] + trend = { + k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0) + for k in (set(prev_w) | set(curr_w)) + } + except Exception as e: + add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning") + + digest = { + "evaluated": evaluated, + "fired": len(sigs), + "signals": sigs, + "weights_trend": trend, + } + await send_signal_summary(digest) + add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}") + return {"ok": True, **digest} +``` + +- [ ] **Step 3: Smoke import check** + +```bash +cd agent-office && python -c " +import asyncio +from app.agents.lotto import LottoAgent +agent = LottoAgent() +print('methods:', [m for m in dir(agent) if 'signal' in m or 'digest' in m]) +" +``` +Expected: `methods: ['run_daily_digest', 'run_signal_check']` + +- [ ] **Step 4: Commit** + +```bash +git add agent-office/app/agents/lotto.py +git commit -m "feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 전송" +``` + +--- + +## Task 10: FastAPI endpoint 3개 추가 + +**Files:** +- Modify: `agent-office/app/main.py` + +- [ ] **Step 1: Add endpoints** + +`agent-office/app/main.py`에서 다른 lotto 관련 endpoint 근처에 추가: + +```python +@app.get("/api/agent-office/lotto/signals") +async def list_lotto_signals(days: int = 7): + """시그널 이력 (모든 fire_level).""" + from .db import get_signals_history + return {"items": get_signals_history(days=days)} + + +@app.get("/api/agent-office/lotto/baselines") +async def list_lotto_baselines(): + """현재 baseline μ/σ + window 상태.""" + from .db import get_all_baselines + return {"items": get_all_baselines()} + + +@app.post("/api/agent-office/lotto/signal-check") +async def trigger_signal_check(source: str = "light"): + """수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}.""" + if source not in ("light", "sim", "deep"): + from fastapi import HTTPException + raise HTTPException(status_code=400, detail="source must be light/sim/deep") + agent = AGENT_REGISTRY.get("lotto") + if not agent: + from fastapi import HTTPException + raise HTTPException(status_code=503, detail="lotto agent not registered") + return await agent.run_signal_check(source=source) +``` + +- [ ] **Step 2: Verify endpoints** + +```bash +cd agent-office && python -c " +from app.main import app +routes = [r.path for r in app.routes if 'lotto' in r.path and 'signal' in r.path or 'baseline' in r.path] +print(routes) +" +``` +Expected: 3 routes listed including `/api/agent-office/lotto/signals`, `/baselines`, `/signal-check` + +- [ ] **Step 3: Commit** + +```bash +git add agent-office/app/main.py +git commit -m "feat(lotto-signals): GET signals/baselines + POST signal-check endpoint" +``` + +--- + +## Task 11: CLAUDE.md 업데이트 (agent-office 섹션) + +**Files:** +- Modify: `web-backend/CLAUDE.md` + +- [ ] **Step 1: Update agent-office API & scheduler section** + +`web-backend/CLAUDE.md`의 agent-office API 표에 다음 행 추가 (적절한 위치): + +```markdown +| GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) | +| GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 | +| POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) | +``` + +그리고 "스케줄러 job" 항목에 추가: + +```markdown +- 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가) +- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시) +- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가) +- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통) +``` + +또한 새 환경변수 명시: + +```markdown +- `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8) +- `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5) +- `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5) +- `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간) +- `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통) +``` + +- [ ] **Step 2: Verify** + +```bash +git -C C:/Users/jaeoh/Desktop/workspace/web-backend diff CLAUDE.md | head -50 +``` + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가" +``` + +--- + +## Task 12: NAS 배포 + 24h 가동 검증 (수동) + +- [ ] **Step 1: git push로 자동 배포** + +```bash +git push +``` +Expected: Gitea webhook → deployer가 `docker compose up -d --build` 실행 → agent-office 재시작. + +- [ ] **Step 2: 컨테이너 상태 확인** + +```bash +ssh nas "docker logs agent-office --tail 50" +``` +Expected: +- `init_db` 로그에서 `lotto_signals`, `lotto_baselines` 테이블 생성 성공 메시지 +- scheduler 로그에서 `lotto_light_check`, `lotto_sim_check`, `lotto_deep_check`, `lotto_digest` 4개 job 등록 확인 + +- [ ] **Step 3: 수동 트리거로 즉시 검증** + +```bash +curl -X POST "https://gahusb.synology.me/api/agent-office/lotto/signal-check?source=light" +``` +Expected: `{"ok": true, "overall_fire": "warmup", "results": [...]}` +(첫 호출은 baseline 비어있어 warmup) + +- [ ] **Step 4: DB에 시그널 들어갔는지 확인** + +```bash +curl "https://gahusb.synology.me/api/agent-office/lotto/signals?days=1" +``` +Expected: `{"items": [{"id": 1, "source": "light", "metric": "sim_signal", "fire_level": "warmup", ...}]}` + +- [ ] **Step 5: 24시간 가동 후 baseline 누적 확인** + +24h 후: +```bash +curl "https://gahusb.synology.me/api/agent-office/lotto/baselines" +``` +Expected: `sim_signal` 메트릭의 `window_values` 길이가 6~7개 (4h 시뮬 6회 + 1회 light) + +- [ ] **Step 6: 텔레그램 발송 검증 (자연 발화 또는 강제)** + +24h 가동 후에도 자연 발화 없으면 강제 테스트: +- `LOTTO_Z_NORMAL=0.1`로 일시 낮춤 → 재배포 → 다음 sim_check에서 발화 +- 텔레그램으로 🚨 메시지 도착 확인 후 원복 + +--- + +# Self-Review (수행 완료) + +**1. Spec coverage check** + +| Spec 섹션 | 구현 task | +|---|---| +| 4.1 Sim Consensus | Task 1, 2 | +| 4.2 Strategy Drift | Task 1, 2 | +| 4.3 Confidence | Task 1, 2 | +| 4.4 Adaptive Baseline | Task 1, 2 | +| 4.5 Trigger × Metric matrix | Task 4 (DRAW_SCOPED_METRICS + source 분기) | +| 4.6 Fire 결정 | Task 1 (`decide_overall_fire`) | +| 5.1 알림 흐름 | Task 9 | +| 5.2 메시지 폼 | Task 8 | +| 5.3 Throttle | Task 9 | +| 6.1 lotto_signals 테이블 | Task 3 | +| 6.2 lotto_baselines 테이블 | Task 3 | +| 7. API 3종 | Task 10 | +| 8. env vars 7종 | Task 5 | +| 9. cron 4종 | Task 7 | + +모두 매핑됨. + +**2. Placeholder scan**: TBD/TODO 없음. 모든 step에 실제 코드 포함. + +**3. Type consistency**: +- `evaluate_metric_and_persist` 시그니처는 Task 4 정의 그대로 Task 6에서 호출 +- `_format_urgent_signal(event)` event 구조: `fire_level`, `triggered_at`, `results[]` → Task 8 테스트 & Task 9 호출 일치 +- `digest` 구조: `evaluated`, `fired`, `signals`, `weights_trend` → Task 8 테스트 & Task 9 호출 일치 +- DB CRUD 함수명 (`insert_lotto_signal`, `get_recent_lotto_signals`, `mark_signal_notified` 등) — Task 3 정의와 Task 9 호출 일치 + +이슈 없음. + +--- + +# 비목표 (Out of scope) + +- Layer B 자동 시뮬 강도 조절 (v2) +- 텔레그램 인라인 키보드 / 사용자 피드백 루프 (v2) +- 핫넘버/콜드넘버 시그널 (노이즈 위험으로 v1 제외) +- 프론트 `/lotto/agent` UI (web-ui 별도 PR) +- Hot/Cold rotation 메트릭 추가