# LottoAgent 능동성 확장 Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** LottoAgent에 다중 트리거 + 적응형 시그널 모니터링 + 텔레그램 능동 알림(긴급/일일 요약) 기능 추가 **Architecture:** agent-office 단일 컨테이너 안에 `curator/signals.py` 모듈 신설 + agent_office.db에 2개 테이블 추가. lotto-lab은 read-only GET API만 소비, 변경 없음. cron 4개로 light / sim / deep / digest 분리. **Tech Stack:** Python 3.12, FastAPI, APScheduler, SQLite, httpx, pytest, pytest-asyncio **Spec:** `docs/superpowers/specs/2026-05-20-lotto-active-agent-design.md` --- ## File Structure | 경로 | 작업 | 책임 | |---|---|---| | `agent-office/app/curator/signals.py` | Create | 3종 메트릭 평가 + adaptive baseline (순수 함수) | | `agent-office/app/curator/signal_runner.py` | Create | DB I/O + cron 진입점 + 알림 결정 | | `agent-office/app/db.py` | Modify | `lotto_signals`, `lotto_baselines` 테이블 + CRUD | | `agent-office/app/service_proxy.py` | Modify | `lotto_best()`, `lotto_strategy_weights()` 추가 | | `agent-office/app/agents/lotto.py` | Modify | `on_signal_check()`, `on_daily_digest()` 추가 | | `agent-office/app/scheduler.py` | Modify | cron 4개 등록 | | `agent-office/app/notifiers/telegram_lotto.py` | Modify | `send_urgent_signal()`, `send_signal_summary()` | | `agent-office/app/main.py` | Modify | endpoint 3개 | | `agent-office/app/config.py` | Modify | env vars 7개 | | `agent-office/tests/test_lotto_signals.py` | Create | signals.py 단위 테스트 | | `agent-office/tests/test_lotto_signal_runner.py` | Create | signal_runner DB I/O 통합 테스트 | | `agent-office/tests/test_lotto_telegram_signal.py` | Create | 텔레그램 메시지 포맷 테스트 | | `web-backend/CLAUDE.md` | Modify | agent-office 섹션 갱신 | --- # Phase 1 — 순수 함수 + DB 마이그레이션 ## Task 1: signals.py 메트릭 함수 단위 테스트 **Files:** - Create: `agent-office/tests/test_lotto_signals.py` - [ ] **Step 1: Write failing tests for Sim Consensus** ```python # agent-office/tests/test_lotto_signals.py import math import pytest from app.curator import signals def test_sim_consensus_top10_geomean(): """top-10 consensus 평균이 기하평균 기반인지.""" best_picks = [ {"scores": [10, 10, 10, 10, 10]}, # high & uniform {"scores": [9, 9, 9, 9, 9]}, {"scores": [8, 8, 8, 8, 8]}, {"scores": [7, 7, 7, 7, 7]}, {"scores": [6, 6, 6, 6, 6]}, {"scores": [5, 5, 5, 5, 5]}, {"scores": [4, 4, 4, 4, 4]}, {"scores": [3, 3, 3, 3, 3]}, {"scores": [2, 2, 2, 2, 2]}, {"scores": [1, 1, 1, 1, 1]}, # top 10 {"scores": [0, 0, 0, 0, 0]}, # bottom 10 ] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10 # 처음 10개가 top, 다 normalize 후 consensus 동일하니까 top10 평균은 명확 result = signals.sim_consensus_score(best_picks) assert 0.0 <= result <= 1.0 # uniform 분포에선 top10 평균이 단조 감소 분포의 상반부 평균과 같아야 함 assert result > 0.4 def test_sim_consensus_geomean_penalizes_imbalance(): """5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다.""" balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20 imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20 # 평균은 같지만 한 점수만 폭주 s_balanced = signals.sim_consensus_score(balanced) s_imbalanced = signals.sim_consensus_score(imbalanced) # imbalanced는 normalize 후 한 차원만 1.0, 나머지 0.0 → 기하평균 0 assert s_imbalanced < s_balanced def test_strategy_drift_score(): """drift = 전략별 가중치 변화 절댓값 합.""" w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45} w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40} # |0.10| + |-0.05| + |-0.05| = 0.20 result = signals.strategy_drift_score(w_prev, w_curr) assert abs(result - 0.20) < 1e-9 def test_strategy_drift_new_strategy_appears(): """이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산.""" w_prev = {"gap_focus": 0.5, "hot_focus": 0.5} w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2} # |0.5-0.4| + |0.5-0.4| + |0-0.2| = 0.4 result = signals.strategy_drift_score(w_prev, w_curr) assert abs(result - 0.4) < 1e-9 def test_confidence_score_passthrough(): """confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인).""" assert signals.confidence_score({"confidence": 0.85}) == 0.85 assert signals.confidence_score({"confidence": 1.2}) == 1.0 # clamp assert signals.confidence_score({"confidence": -0.1}) == 0.0 assert signals.confidence_score({}) is None def test_adaptive_baseline_cold_start(): """window 크기 < 4 → warmup, z=None.""" bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8) z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5) assert fire == "warmup" assert z is None def test_adaptive_baseline_preparing(): """window 4~7 → 보수적 임계치 z=2.0.""" bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8) # value=1.0 → mu=1.0, sigma=0 → div0 처리 필요. preparing 단계라 임계치는 보수적. z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5) # sigma=0인 경우 처리 — value > mu면 무한대로 보고 urgent? 또는 z계산 불가 → warmup? # 결정: sigma == 0이면 (window 분산 없음) z=inf 취급, value > mu면 urgent로 assert fire in ("normal", "urgent") def test_adaptive_baseline_normal_window_full(): """window 8 풀, value가 평균보다 1.5σ 이상이면 normal.""" bl = signals.AdaptiveBaseline( window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0], window_max=8, ) # μ ≈ 1.0, σ ≈ 0.08 → z=1.5는 value ≈ 1.12 z, fire = bl.evaluate(value=1.20, z_normal=1.5, z_urgent=2.5) assert fire == "normal" assert z is not None and z >= 1.5 def test_adaptive_baseline_urgent(): """z >= 2.5 → urgent.""" bl = signals.AdaptiveBaseline( window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0], window_max=8, ) z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5) assert fire == "urgent" def test_adaptive_baseline_push_updates_window(): """push 시 FIFO 동작.""" bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8) bl.push(9.0) assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0] def test_decide_fire_level_combination(): """2개 이상 normal 발화 → urgent.""" # 2개 normal z ≥ 1.5 sigs = [ {"metric": "sim", "z": 1.6, "fire": "normal"}, {"metric": "drift", "z": 1.7, "fire": "normal"}, {"metric": "conf", "z": 0.5, "fire": "noop"}, ] assert signals.decide_overall_fire(sigs) == "urgent" # 1개 normal sigs2 = [ {"metric": "sim", "z": 1.6, "fire": "normal"}, {"metric": "drift", "z": 0.3, "fire": "noop"}, ] assert signals.decide_overall_fire(sigs2) == "normal" # 단일 극값 z ≥ 2.5 → urgent sigs3 = [ {"metric": "sim", "z": 3.0, "fire": "urgent"}, {"metric": "drift", "z": 0.2, "fire": "noop"}, ] assert signals.decide_overall_fire(sigs3) == "urgent" # 전부 noop sigs4 = [{"metric": "sim", "z": 0.5, "fire": "noop"}] assert signals.decide_overall_fire(sigs4) == "noop" ``` - [ ] **Step 2: Run test to verify it fails** Run: `cd agent-office && pytest tests/test_lotto_signals.py -v` Expected: FAIL with `ModuleNotFoundError: No module named 'app.curator.signals'` - [ ] **Step 3: Commit failing tests** ```bash git add agent-office/tests/test_lotto_signals.py git commit -m "test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트" ``` --- ## Task 2: signals.py 구현 **Files:** - Create: `agent-office/app/curator/signals.py` - [ ] **Step 1: Implement signals module** ```python # agent-office/app/curator/signals.py """LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수). DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list. signal_runner.py에서 DB 연동 + cron 진입점 담당. """ from __future__ import annotations import math from dataclasses import dataclass, field from statistics import mean, pstdev from typing import Any, Dict, List, Optional, Tuple # ---------- Metric: Sim Consensus ---------- def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]: """20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수.""" if not picks: return [] n_metrics = len(picks[0]["scores"]) columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)] norms_per_col = [] for col in columns: lo, hi = min(col), max(col) rng = hi - lo if rng == 0: norms_per_col.append([0.5] * len(col)) # 모두 동일하면 중립 0.5 else: norms_per_col.append([(v - lo) / rng for v in col]) # 전치: 후보별 정규화 점수 리스트 return [ [norms_per_col[k][i] for k in range(n_metrics)] for i in range(len(picks)) ] def _geomean(values: List[float]) -> float: """기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티).""" if not values: return 0.0 if any(v <= 0 for v in values): return 0.0 log_sum = sum(math.log(v) for v in values) return math.exp(log_sum / len(values)) def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float: """top-10 후보의 기하평균 consensus 평균. Args: best_picks: [{"numbers": [...], "scores": [s1, s2, s3, s4, s5]}, ...] """ if not best_picks: return 0.0 normalized = _normalize_columns(best_picks) consensus = [_geomean(scores) for scores in normalized] consensus.sort(reverse=True) top = consensus[:10] if len(consensus) >= 10 else consensus return mean(top) if top else 0.0 # ---------- Metric: Strategy Drift ---------- def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float: """가중치 변화 절댓값 합. 신규/소멸 전략도 가산.""" keys = set(prev) | set(curr) return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys) # ---------- Metric: Confidence ---------- def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]: """큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None.""" if "confidence" not in curate_result: return None v = float(curate_result["confidence"]) return max(0.0, min(1.0, v)) # ---------- Adaptive Baseline ---------- @dataclass class AdaptiveBaseline: window: List[float] = field(default_factory=list) window_max: int = 8 last_pushed_draw_no: Optional[int] = None # 회차 단위 메트릭 중복 push 방지 @property def size(self) -> int: return len(self.window) @property def mu(self) -> float: return mean(self.window) if self.window else 0.0 @property def sigma(self) -> float: # population stdev (8개 윈도우라 ddof=0이 안정) return pstdev(self.window) if len(self.window) >= 2 else 0.0 def push(self, value: float, draw_no: Optional[int] = None) -> None: """FIFO push. window_max 초과 시 가장 오래된 값 제거.""" self.window.append(float(value)) if len(self.window) > self.window_max: self.window = self.window[-self.window_max:] if draw_no is not None: self.last_pushed_draw_no = draw_no def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]: """z-score 계산 + fire_level 판정. Returns: (z_score, fire_level) — z_score는 cold start/warmup이면 None. fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'} """ if self.size < 4: return None, "warmup" # 보수적 임계치 (window 4~7) z_normal_eff = 2.0 if self.size < self.window_max else z_normal z_urgent_eff = z_urgent if self.sigma == 0: # 분산 없음 — value가 평균을 크게 벗어나면 urgent로 분류, 아니면 noop return (None, "urgent") if value > self.mu else (None, "noop") z = (value - self.mu) / self.sigma if z >= z_urgent_eff: return z, "urgent" if z >= z_normal_eff: return z, "normal" return z, "noop" # ---------- Combined fire decision ---------- def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str: """3종 시그널을 종합해 전체 fire_level 결정. Args: signal_results: [{"metric": str, "z": float|None, "fire": str}, ...] Returns: 'noop' | 'normal' | 'urgent' """ fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")] if any(s["fire"] == "urgent" for s in fires): return "urgent" if len(fires) >= 2: return "urgent" # 2개 이상 normal → urgent로 승격 if len(fires) == 1: return "normal" return "noop" ``` - [ ] **Step 2: Run tests to verify they pass** Run: `cd agent-office && pytest tests/test_lotto_signals.py -v` Expected: All 9 tests PASS - [ ] **Step 3: Commit** ```bash git add agent-office/app/curator/signals.py git commit -m "feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현" ``` --- ## Task 3: DB 마이그레이션 (lotto_signals + lotto_baselines) **Files:** - Modify: `agent-office/app/db.py:19` (init_db 함수 내) - [ ] **Step 1: Add table creation to init_db** `init_db()` 함수의 마지막 `INSERT OR IGNORE` 루프 직전(line 100, `youtube_research_jobs` 테이블 뒤)에 다음 추가: ```python conn.execute(""" CREATE TABLE IF NOT EXISTS lotto_signals ( id INTEGER PRIMARY KEY AUTOINCREMENT, triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), source TEXT NOT NULL, metric TEXT NOT NULL, value REAL NOT NULL, baseline_mu REAL, baseline_sigma REAL, z_score REAL, fire_level TEXT NOT NULL, notified_at TEXT, payload TEXT ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_ls_triggered ON lotto_signals(triggered_at DESC) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_ls_fire ON lotto_signals(fire_level, notified_at) """) conn.execute(""" CREATE TABLE IF NOT EXISTS lotto_baselines ( metric TEXT PRIMARY KEY, window_values TEXT NOT NULL DEFAULT '[]', mu REAL NOT NULL DEFAULT 0.0, sigma REAL NOT NULL DEFAULT 0.0, last_pushed_draw_no INTEGER, updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) ``` - [ ] **Step 2: Add CRUD helpers at end of db.py** `db.py` 파일 맨 아래에 추가: ```python # --- lotto_signals / lotto_baselines CRUD --- def insert_lotto_signal( source: str, metric: str, value: float, baseline_mu: Optional[float], baseline_sigma: Optional[float], z_score: Optional[float], fire_level: str, payload: Optional[Dict[str, Any]] = None, ) -> int: with _conn() as conn: cur = conn.execute( """ INSERT INTO lotto_signals (source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, json.dumps(payload or {}, ensure_ascii=False), ), ) return cur.lastrowid def mark_signal_notified(signal_id: int) -> None: with _conn() as conn: conn.execute( "UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?", (signal_id,), ) def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]: """지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent.""" levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent") placeholders = ",".join("?" * len(levels)) with _conn() as conn: rows = conn.execute( f""" SELECT * FROM lotto_signals WHERE triggered_at >= datetime('now', ?) AND fire_level IN ({placeholders}) ORDER BY triggered_at DESC """, (f"-{int(hours)} hours", *levels), ).fetchall() return [dict(r) for r in rows] def get_signals_history(days: int = 7) -> List[Dict[str, Any]]: """차트/이력 페이지용 — 모든 fire_level 포함.""" with _conn() as conn: rows = conn.execute( """ SELECT * FROM lotto_signals WHERE triggered_at >= datetime('now', ?) ORDER BY triggered_at DESC """, (f"-{int(days)} days",), ).fetchall() return [dict(r) for r in rows] def get_recent_urgent_count(hours: int = 24) -> int: with _conn() as conn: row = conn.execute( """ SELECT COUNT(*) AS c FROM lotto_signals WHERE triggered_at >= datetime('now', ?) AND fire_level = 'urgent' AND notified_at IS NOT NULL """, (f"-{int(hours)} hours",), ).fetchone() return int(row["c"]) if row else 0 def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]: """같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용.""" with _conn() as conn: row = conn.execute( """ SELECT notified_at FROM lotto_signals WHERE metric = ? AND fire_level = ? AND notified_at IS NOT NULL AND notified_at >= datetime('now', ?) ORDER BY notified_at DESC LIMIT 1 """, (metric, fire_level, f"-{int(hours)} hours"), ).fetchone() return row["notified_at"] if row else None def get_baseline(metric: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM lotto_baselines WHERE metric = ?", (metric,), ).fetchone() if not row: return None d = dict(row) d["window_values"] = json.loads(d["window_values"]) return d def upsert_baseline( metric: str, window_values: List[float], mu: float, sigma: float, last_pushed_draw_no: Optional[int], ) -> None: with _conn() as conn: conn.execute( """ INSERT INTO lotto_baselines (metric, window_values, mu, sigma, last_pushed_draw_no, updated_at) VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now')) ON CONFLICT(metric) DO UPDATE SET window_values = excluded.window_values, mu = excluded.mu, sigma = excluded.sigma, last_pushed_draw_no = excluded.last_pushed_draw_no, updated_at = excluded.updated_at """, ( metric, json.dumps(window_values), mu, sigma, last_pushed_draw_no, ), ) def get_all_baselines() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall() out = [] for r in rows: d = dict(r) d["window_values"] = json.loads(d["window_values"]) out.append(d) return out ``` - [ ] **Step 3: Smoke test the migration** 새 임시 DB로 마이그레이션이 동작하는지: ```bash cd agent-office && python -c " import os os.environ['AGENT_OFFICE_DB_PATH'] = '/tmp/test_migration.db' from app.db import init_db, insert_lotto_signal, get_baseline, upsert_baseline init_db() sid = insert_lotto_signal('light','sim_signal', 1.5, 1.0, 0.2, 2.5, 'urgent', {'top':[1,2,3]}) print('signal id:', sid) upsert_baseline('sim_signal', [1.0, 1.1, 0.9], 1.0, 0.1, None) print('baseline:', get_baseline('sim_signal')) " rm /tmp/test_migration.db ``` Expected: signal id가 1 출력, baseline dict 출력 (window_values는 list) - [ ] **Step 4: Commit** ```bash git add agent-office/app/db.py git commit -m "feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD" ``` --- ## Task 4: signal_runner.py — DB I/O + 평가 orchestration **Files:** - Create: `agent-office/app/curator/signal_runner.py` - Create: `agent-office/tests/test_lotto_signal_runner.py` - [ ] **Step 1: Write failing test for signal_runner** ```python # agent-office/tests/test_lotto_signal_runner.py import os import pytest os.environ["AGENT_OFFICE_DB_PATH"] = "/tmp/test_signal_runner.db" from app.curator import signal_runner from app import db @pytest.fixture(autouse=True) def fresh_db(): if os.path.exists("/tmp/test_signal_runner.db"): os.remove("/tmp/test_signal_runner.db") db.init_db() yield if os.path.exists("/tmp/test_signal_runner.db"): os.remove("/tmp/test_signal_runner.db") def test_evaluate_and_persist_cold_start(): """첫 호출은 warmup으로 기록되고 baseline은 비어있다.""" result = signal_runner.evaluate_metric_and_persist( source="light", metric="sim_signal", value=1.5, draw_no=None, z_normal=1.5, z_urgent=2.5, push_to_window=True, ) assert result["fire_level"] == "warmup" assert result["z_score"] is None bl = db.get_baseline("sim_signal") assert bl is not None assert bl["window_values"] == [1.5] def test_evaluate_after_window_filled_normal_fire(): """8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal.""" for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]: signal_runner.evaluate_metric_and_persist( source="sim", metric="sim_signal", value=v, draw_no=None, z_normal=1.5, z_urgent=2.5, push_to_window=True, ) # 9번째: window는 [v2..v8, 1.0]이지만 push 후 평가. 평균≈1.0, σ≈0.08 → z 큰 값 result = signal_runner.evaluate_metric_and_persist( source="sim", metric="sim_signal", value=1.20, draw_no=None, z_normal=1.5, z_urgent=2.5, push_to_window=True, ) assert result["fire_level"] in ("normal", "urgent") assert result["z_score"] is not None and result["z_score"] >= 1.5 def test_evaluate_drift_skips_same_draw_push(): """drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X.""" # 첫 push (draw 1100) signal_runner.evaluate_metric_and_persist( source="sim", metric="drift", value=0.05, draw_no=1100, z_normal=1.5, z_urgent=2.5, push_to_window=True, ) bl_before = db.get_baseline("drift") assert bl_before["window_values"] == [0.05] assert bl_before["last_pushed_draw_no"] == 1100 # 같은 회차 두 번째 호출 — window push X signal_runner.evaluate_metric_and_persist( source="sim", metric="drift", value=0.08, draw_no=1100, z_normal=1.5, z_urgent=2.5, push_to_window=True, ) bl_after = db.get_baseline("drift") assert bl_after["window_values"] == [0.05] # 변경 없음 def test_run_signal_check_aggregates_three_metrics(monkeypatch): """run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환.""" # mock service_proxy async def fake_lotto_best(): # 동일한 5종 점수의 균등 후보 20개 return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20 async def fake_lotto_strategy_weights(): return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3} monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best) monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights) import asyncio out = asyncio.get_event_loop().run_until_complete( signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101) ) assert "overall_fire" in out assert "results" in out assert any(r["metric"] == "sim_signal" for r in out["results"]) assert any(r["metric"] == "drift" for r in out["results"]) # light_check는 confidence 평가 안 함 assert not any(r["metric"] == "confidence" for r in out["results"]) ``` - [ ] **Step 2: Run test to verify it fails** Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v` Expected: FAIL `ModuleNotFoundError: No module named 'app.curator.signal_runner'` - [ ] **Step 3: Implement signal_runner.py** ```python # agent-office/app/curator/signal_runner.py """LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration.""" from __future__ import annotations import logging from typing import Any, Dict, List, Optional from .. import db from .. import service_proxy from . import signals logger = logging.getLogger("agent-office.lotto-signals") # 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교) DRAW_SCOPED_METRICS = {"drift", "confidence"} def _load_baseline(metric: str) -> signals.AdaptiveBaseline: row = db.get_baseline(metric) if row is None: return signals.AdaptiveBaseline(window=[], window_max=8) return signals.AdaptiveBaseline( window=list(row["window_values"]), window_max=8, last_pushed_draw_no=row.get("last_pushed_draw_no"), ) def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None: db.upsert_baseline( metric=metric, window_values=bl.window, mu=bl.mu, sigma=bl.sigma, last_pushed_draw_no=bl.last_pushed_draw_no, ) def evaluate_metric_and_persist( source: str, metric: str, value: float, draw_no: Optional[int], z_normal: float, z_urgent: float, push_to_window: bool, payload: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신. 회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략. """ bl = _load_baseline(metric) # 회차 가드: 같은 회차 재평가 시 window push 생략 do_push = push_to_window if metric in DRAW_SCOPED_METRICS and draw_no is not None: if bl.last_pushed_draw_no == draw_no: do_push = False # 평가는 push 전 baseline 기준 (현재값 vs 과거 분포) z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent) if do_push: bl.push(value=value, draw_no=draw_no) _save_baseline(metric, bl) sid = db.insert_lotto_signal( source=source, metric=metric, value=value, baseline_mu=bl.mu if bl.size > 0 else None, baseline_sigma=bl.sigma if bl.size >= 2 else None, z_score=z, fire_level=fire, payload=payload, ) return { "signal_id": sid, "metric": metric, "value": value, "z_score": z, "fire_level": fire, "payload": payload or {}, } # ---------- Service proxy thin wrappers (monkeypatch 대상) ---------- async def _fetch_best_picks() -> List[Dict[str, Any]]: return await service_proxy.lotto_best() async def _fetch_strategy_weights() -> Dict[str, float]: return await service_proxy.lotto_strategy_weights() # ---------- Orchestrator ---------- async def run_signal_check( source: str, z_normal: float = 1.5, z_urgent: float = 2.5, curate_result: Optional[Dict[str, Any]] = None, current_draw_no: Optional[int] = None, ) -> Dict[str, Any]: """cron 진입점. source ∈ {'light', 'sim', 'deep'}. light/sim: Sim Consensus + Strategy Drift 평가 deep: 위 2종 + Confidence (curate_result 필요) """ results: List[Dict[str, Any]] = [] # --- Sim Consensus --- try: best = await _fetch_best_picks() v = signals.sim_consensus_score(best) results.append( evaluate_metric_and_persist( source=source, metric="sim_signal", value=v, draw_no=None, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"top_count": min(len(best), 10)}, ) ) except Exception as e: logger.warning(f"sim_consensus 평가 실패: {e}") # --- Strategy Drift (회차 단위) --- try: w_curr = await _fetch_strategy_weights() bl_drift = _load_baseline("drift") # drift는 prev → curr 비교가 필요. baseline window의 마지막 entry는 이전 drift값. # prev_weights는 별도 저장이 필요한데, 간단히 lotto_baselines.payload에 안 두고 # 매 호출마다 GET 후 캐시되지 않은 직전 값과 비교 불가 → drift = 1회차 단위. # 단순화: 이번 호출의 가중치를 baseline에 저장하기 전 직전 저장값을 prev로 사용. prev_payload_row = db.get_baseline("drift_weights_cache") w_prev = prev_payload_row["window_values"] if prev_payload_row else None if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict): # last entry is dict of weights prev_dict = w_prev[-1] drift_value = signals.strategy_drift_score(prev_dict, w_curr) results.append( evaluate_metric_and_persist( source=source, metric="drift", value=drift_value, draw_no=current_draw_no, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"weights_now": w_curr, "weights_prev": prev_dict}, ) ) # weights 캐시 갱신 (FIFO 2개만 보관) cache_window = (w_prev or []) + [w_curr] if len(cache_window) > 2: cache_window = cache_window[-2:] db.upsert_baseline( metric="drift_weights_cache", window_values=cache_window, mu=0.0, sigma=0.0, last_pushed_draw_no=current_draw_no, ) except Exception as e: logger.warning(f"strategy_drift 평가 실패: {e}") # --- Confidence (deep_check + curate_result 필수) --- if source == "deep" and curate_result is not None: try: cv = signals.confidence_score(curate_result) if cv is not None: results.append( evaluate_metric_and_persist( source=source, metric="confidence", value=cv, draw_no=current_draw_no, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"draw_no": current_draw_no}, ) ) except Exception as e: logger.warning(f"confidence 평가 실패: {e}") overall = signals.decide_overall_fire( [{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results] ) return {"overall_fire": overall, "results": results} ``` - [ ] **Step 4: Add lotto_best + lotto_strategy_weights to service_proxy.py** `agent-office/app/service_proxy.py` 파일 맨 아래 추가: ```python async def lotto_best() -> List[Dict[str, Any]]: """GET /api/lotto/best — best_picks 20개 (numbers + scores 5종).""" from .config import LOTTO_BACKEND_URL resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best") resp.raise_for_status() data = resp.json() items = data.get("items") if isinstance(data, dict) else data return items or [] async def lotto_strategy_weights() -> Dict[str, float]: """GET /api/lotto/strategy/weights — 전략별 가중치 dict.""" from .config import LOTTO_BACKEND_URL resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights") resp.raise_for_status() data = resp.json() weights = data.get("weights") if isinstance(data, dict) else data # API 응답 형태가 [{strategy, weight, ...}] 일 수 있어 dict로 정규화 if isinstance(weights, list): return {item["strategy"]: float(item["weight"]) for item in weights} return {k: float(v) for k, v in (weights or {}).items()} ``` - [ ] **Step 5: Run tests to verify they pass** Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v` Expected: All 4 tests PASS - [ ] **Step 6: Commit** ```bash git add agent-office/app/curator/signal_runner.py \ agent-office/app/service_proxy.py \ agent-office/tests/test_lotto_signal_runner.py git commit -m "feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers" ``` --- # Phase 2 — cron 통합 (텔레그램 X, DB 기록만) ## Task 5: config.py env vars 7개 추가 **Files:** - Modify: `agent-office/app/config.py` (파일 끝) - [ ] **Step 1: Append env var section** `agent-office/app/config.py` 끝에 추가: ```python # Lotto Active Signals LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8")) LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5")) LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5")) LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9")) LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25")) LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6")) LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3")) ``` - [ ] **Step 2: Smoke check** ```bash cd agent-office && python -c "from app.config import LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR; print(LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR)" ``` Expected: `1.5 9` - [ ] **Step 3: Commit** ```bash git add agent-office/app/config.py git commit -m "feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle)" ``` --- ## Task 6: LottoAgent에 on_signal_check / on_daily_digest 추가 (텔레그램 발송 X) **Files:** - Modify: `agent-office/app/agents/lotto.py` - [ ] **Step 1: Extend LottoAgent class** `agent-office/app/agents/lotto.py`의 `on_command()`를 다음으로 교체 (signal check 액션 추가): ```python async def on_command(self, action: str, params: dict) -> dict: if action in ("curate_now", "curate_weekly"): return await self._run(source="manual") if action == "status": return {"ok": True, "message": f"{self.state}: {self.state_detail}"} if action in ("signal_check", "light_check", "sim_check", "deep_check"): return await self.run_signal_check(source=action.replace("_check", "") if action != "signal_check" else "light") if action == "daily_digest": return await self.run_daily_digest() return {"ok": False, "message": f"unknown action: {action}"} ``` 그리고 클래스 끝에 신규 메서드 추가: ```python async def run_signal_check(self, source: str = "light") -> dict: """비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후). Phase 2에선 텔레그램 발송 안 함. lotto_signals INSERT만. Phase 3에서 텔레그램 트리거 추가. """ from ..curator.signal_runner import run_signal_check from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT from ..db import add_log if self.state not in ("idle", "reporting"): return {"ok": False, "message": f"busy ({self.state})"} try: curate_result: dict | None = None current_draw_no: int | None = None if source == "deep": # deep_check은 큐레이션을 먼저 수행 (LLM 호출) from ..curator.pipeline import curate_weekly cw = await curate_weekly(source="signal_deep") curate_result = cw.get("payload") or cw # confidence 키 필요 current_draw_no = cw.get("draw_no") outcome = await run_signal_check( source=source, z_normal=LOTTO_Z_NORMAL, z_urgent=LOTTO_Z_URGENT, curate_result=curate_result, current_draw_no=current_draw_no, ) add_log( self.agent_id, f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}", ) return {"ok": True, **outcome} except Exception as e: add_log(self.agent_id, f"signal_check 예외: {e}", level="error") return {"ok": False, "message": f"{type(e).__name__}: {e}"} async def run_daily_digest(self) -> dict: """Phase 2: 발화 카운트만 반환. Phase 3에서 텔레그램 발송 추가.""" from ..db import get_recent_lotto_signals, add_log sigs = get_recent_lotto_signals(hours=24, min_fire="normal") add_log(self.agent_id, f"daily_digest: 지난 24h 발화 {len(sigs)}건") return {"ok": True, "count": len(sigs), "signals": sigs} ``` - [ ] **Step 2: Smoke import check** ```bash cd agent-office && python -c "from app.agents.lotto import LottoAgent; print(LottoAgent.agent_id)" ``` Expected: `lotto` - [ ] **Step 3: Commit** ```bash git add agent-office/app/agents/lotto.py git commit -m "feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)" ``` --- ## Task 7: scheduler.py에 cron 4개 추가 **Files:** - Modify: `agent-office/app/scheduler.py` - [ ] **Step 1: Add cron job functions + registrations** 기존 `_run_lotto_schedule` 근처에 함수 4개 추가: ```python async def _run_lotto_light_check(): agent = AGENT_REGISTRY.get("lotto") if agent: await agent.run_signal_check(source="light") async def _run_lotto_sim_check(): agent = AGENT_REGISTRY.get("lotto") if agent: await agent.run_signal_check(source="sim") async def _run_lotto_deep_check(): agent = AGENT_REGISTRY.get("lotto") if agent: await agent.run_signal_check(source="deep") async def _run_lotto_daily_digest(): agent = AGENT_REGISTRY.get("lotto") if agent: await agent.run_daily_digest() ``` 기존 `lotto_curate` cron 등록(line 75) 직후에 4개 추가: ```python scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check") scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check") scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check") # digest hour/min은 동적 (env)이지만 cron 등록은 상수로 — 9:25 고정. 변경 필요 시 컨테이너 재시작. scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest") ``` - [ ] **Step 2: Verify import + cron registration** ```bash cd agent-office && python -c " from app.scheduler import _run_lotto_light_check, _run_lotto_sim_check, _run_lotto_deep_check, _run_lotto_daily_digest print('all imports ok') " ``` Expected: `all imports ok` - [ ] **Step 3: Commit** ```bash git add agent-office/app/scheduler.py git commit -m "feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)" ``` --- # Phase 3 — 텔레그램 알림 + Throttle + API endpoints ## Task 8: 텔레그램 메시지 폼 단위 테스트 **Files:** - Create: `agent-office/tests/test_lotto_telegram_signal.py` - [ ] **Step 1: Write failing tests** ```python # agent-office/tests/test_lotto_telegram_signal.py from app.notifiers.telegram_lotto import ( _format_urgent_signal, _format_signal_digest, ) def test_urgent_signal_format_basic(): event = { "fire_level": "urgent", "triggered_at": "2026-05-20T07:18:00.000Z", "results": [ {"metric": "sim_signal", "value": 1.84, "z_score": 3.9, "baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {}}, {"metric": "drift", "value": 0.18, "z_score": 3.0, "baseline_mu": 0.06, "baseline_sigma": 0.04, "payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5}}}, ], } text = _format_urgent_signal(event) assert "🚨" in text assert "Sim Consensus" in text assert "z=3.9" in text assert "Strategy Drift" in text or "drift" in text.lower() def test_signal_digest_format_with_signals(): digest = { "evaluated": 6, "fired": 2, "signals": [ {"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7, "triggered_at": "2026-05-20T16:18:00Z", "payload": {}}, {"metric": "confidence", "fire_level": "normal", "z_score": 1.6, "triggered_at": "2026-05-20T09:05:00Z", "payload": {}}, ], "weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08}, } text = _format_signal_digest(digest) assert "📊" in text assert "지난 24h" in text assert "z=1.7" in text def test_signal_digest_empty_returns_empty_string(): """발화 0건이면 빈 문자열 → 발송 자체 skip 가능.""" text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}}) assert text == "" ``` - [ ] **Step 2: Run test to verify it fails** Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v` Expected: FAIL `ImportError: cannot import name '_format_urgent_signal'` - [ ] **Step 3: Implement telegram_lotto.py additions** `agent-office/app/notifiers/telegram_lotto.py`에 추가 (파일 끝): ```python # ---------- 능동 시그널 알림 (urgent + digest) ---------- _METRIC_LABEL = { "sim_signal": "Sim Consensus", "drift": "Strategy Drift", "confidence": "Confidence", } def _format_urgent_signal(event: Dict[str, Any]) -> str: """긴급 시그널 텔레그램 메시지 포맷.""" triggered = event.get("triggered_at", "")[:19].replace("T", " ") results = event.get("results", []) fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")] lines = [ "🚨 로또 능동 신호", "", f"[{triggered}]", f"강한 시그널 {len(fired)}종 발화:", ] for r in fired: label = _METRIC_LABEL.get(r["metric"], r["metric"]) v = r.get("value") mu = r.get("baseline_mu") sigma = r.get("baseline_sigma") z = r.get("z_score") if mu is not None and sigma is not None: lines.append(f"• {label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}") else: lines.append(f"• {label} {v:.2f}") # drift 페이로드 — 어떤 전략이 변동했는지 한 줄 for r in fired: if r["metric"] == "drift": wn = (r.get("payload") or {}).get("weights_now") or {} wp = (r.get("payload") or {}).get("weights_prev") or {} if wn and wp: diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))} top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2] detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top) lines.append("") lines.append(f"요인: {detail}") break lines.append("") lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)") return "\n".join(lines) def _format_signal_digest(digest: Dict[str, Any]) -> str: """일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호).""" fired = int(digest.get("fired", 0)) if fired == 0: return "" signals_list = digest.get("signals", []) evaluated = digest.get("evaluated", 0) lines = [ "📊 로또 일일 요약 (지난 24h)", "", f"평가 {evaluated}회 / 발화 {fired}회", ] for s in signals_list: label = _METRIC_LABEL.get(s["metric"], s["metric"]) z = s.get("z_score") when = (s.get("triggered_at") or "")[11:16] # HH:MM z_text = f"z={z:.1f}" if z is not None else "z=-" lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text} ({when})") weights_trend = digest.get("weights_trend") or {} if weights_trend: lines += ["", "전략 가중치 추세 (최근 8회 baseline):"] for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])): arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→") lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%") return "\n".join(lines) async def send_urgent_signal(event: Dict[str, Any]) -> None: text = _format_urgent_signal(event) try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] urgent signal send failed: {e}") async def send_signal_summary(digest: Dict[str, Any]) -> None: text = _format_signal_digest(digest) if not text: return # 발화 0건이면 발송 skip try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] digest send failed: {e}") ``` - [ ] **Step 4: Run tests to verify they pass** Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v` Expected: All 3 tests PASS - [ ] **Step 5: Commit** ```bash git add agent-office/app/notifiers/telegram_lotto.py \ agent-office/tests/test_lotto_telegram_signal.py git commit -m "feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷" ``` --- ## Task 9: signal_check이 텔레그램 발송 트리거 + throttle 통합 **Files:** - Modify: `agent-office/app/agents/lotto.py` - [ ] **Step 1: Update run_signal_check to fire telegram on urgent** `run_signal_check()`의 마지막 `return {"ok": True, **outcome}` 직전에 추가: ```python # --- Throttle + 텔레그램 urgent 발송 --- from ..config import ( LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX, ) from ..db import ( get_last_signal_notification, get_recent_urgent_count, mark_signal_notified, ) from ..notifiers.telegram_lotto import send_urgent_signal if outcome["overall_fire"] == "urgent": # daily cap 체크 if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX: add_log( self.agent_id, f"urgent daily cap 도달 → normal로 강등 (digest 합류)", level="warning", ) else: # throttle: 어느 메트릭이라도 6시간 이내 같은 fire_level 알림이 있으면 skip blocked = False for r in outcome["results"]: if r["fire_level"] in ("normal", "urgent"): last = get_last_signal_notification( metric=r["metric"], fire_level=r["fire_level"], hours=LOTTO_THROTTLE_HOURS, ) if last: blocked = True break if not blocked: from datetime import datetime, timezone event = { "fire_level": "urgent", "triggered_at": datetime.now(timezone.utc).isoformat(), "results": outcome["results"], } await send_urgent_signal(event) # 모든 발화 시그널에 대해 notified_at 마킹 for r in outcome["results"]: if r["fire_level"] in ("normal", "urgent"): mark_signal_notified(r["signal_id"]) add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)") ``` - [ ] **Step 2: Update run_daily_digest to actually send telegram** `run_daily_digest()`를 다음으로 교체: ```python async def run_daily_digest(self) -> dict: """일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통.""" from ..db import get_recent_lotto_signals, get_signals_history, add_log, get_all_baselines from ..notifiers.telegram_lotto import send_signal_summary sigs = get_recent_lotto_signals(hours=24, min_fire="normal") # 전체 평가 횟수 (noop+warmup 포함) total_24h = get_signals_history(days=1) evaluated = len(total_24h) # weights_trend: drift_weights_cache에서 prev/curr 차이 계산 trend = {} try: from ..db import get_baseline cache = get_baseline("drift_weights_cache") if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2: prev_w = cache["window_values"][-2] curr_w = cache["window_values"][-1] trend = { k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0) for k in (set(prev_w) | set(curr_w)) } except Exception as e: add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning") digest = { "evaluated": evaluated, "fired": len(sigs), "signals": sigs, "weights_trend": trend, } await send_signal_summary(digest) add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}") return {"ok": True, **digest} ``` - [ ] **Step 3: Smoke import check** ```bash cd agent-office && python -c " import asyncio from app.agents.lotto import LottoAgent agent = LottoAgent() print('methods:', [m for m in dir(agent) if 'signal' in m or 'digest' in m]) " ``` Expected: `methods: ['run_daily_digest', 'run_signal_check']` - [ ] **Step 4: Commit** ```bash git add agent-office/app/agents/lotto.py git commit -m "feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 전송" ``` --- ## Task 10: FastAPI endpoint 3개 추가 **Files:** - Modify: `agent-office/app/main.py` - [ ] **Step 1: Add endpoints** `agent-office/app/main.py`에서 다른 lotto 관련 endpoint 근처에 추가: ```python @app.get("/api/agent-office/lotto/signals") async def list_lotto_signals(days: int = 7): """시그널 이력 (모든 fire_level).""" from .db import get_signals_history return {"items": get_signals_history(days=days)} @app.get("/api/agent-office/lotto/baselines") async def list_lotto_baselines(): """현재 baseline μ/σ + window 상태.""" from .db import get_all_baselines return {"items": get_all_baselines()} @app.post("/api/agent-office/lotto/signal-check") async def trigger_signal_check(source: str = "light"): """수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}.""" if source not in ("light", "sim", "deep"): from fastapi import HTTPException raise HTTPException(status_code=400, detail="source must be light/sim/deep") agent = AGENT_REGISTRY.get("lotto") if not agent: from fastapi import HTTPException raise HTTPException(status_code=503, detail="lotto agent not registered") return await agent.run_signal_check(source=source) ``` - [ ] **Step 2: Verify endpoints** ```bash cd agent-office && python -c " from app.main import app routes = [r.path for r in app.routes if 'lotto' in r.path and 'signal' in r.path or 'baseline' in r.path] print(routes) " ``` Expected: 3 routes listed including `/api/agent-office/lotto/signals`, `/baselines`, `/signal-check` - [ ] **Step 3: Commit** ```bash git add agent-office/app/main.py git commit -m "feat(lotto-signals): GET signals/baselines + POST signal-check endpoint" ``` --- ## Task 11: CLAUDE.md 업데이트 (agent-office 섹션) **Files:** - Modify: `web-backend/CLAUDE.md` - [ ] **Step 1: Update agent-office API & scheduler section** `web-backend/CLAUDE.md`의 agent-office API 표에 다음 행 추가 (적절한 위치): ```markdown | GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) | | GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 | | POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) | ``` 그리고 "스케줄러 job" 항목에 추가: ```markdown - 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가) - 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시) - 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가) - 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통) ``` 또한 새 환경변수 명시: ```markdown - `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8) - `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5) - `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5) - `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간) - `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통) ``` - [ ] **Step 2: Verify** ```bash git -C C:/Users/jaeoh/Desktop/workspace/web-backend diff CLAUDE.md | head -50 ``` - [ ] **Step 3: Commit** ```bash git add CLAUDE.md git commit -m "docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가" ``` --- ## Task 12: NAS 배포 + 24h 가동 검증 (수동) - [ ] **Step 1: git push로 자동 배포** ```bash git push ``` Expected: Gitea webhook → deployer가 `docker compose up -d --build` 실행 → agent-office 재시작. - [ ] **Step 2: 컨테이너 상태 확인** ```bash ssh nas "docker logs agent-office --tail 50" ``` Expected: - `init_db` 로그에서 `lotto_signals`, `lotto_baselines` 테이블 생성 성공 메시지 - scheduler 로그에서 `lotto_light_check`, `lotto_sim_check`, `lotto_deep_check`, `lotto_digest` 4개 job 등록 확인 - [ ] **Step 3: 수동 트리거로 즉시 검증** ```bash curl -X POST "https://gahusb.synology.me/api/agent-office/lotto/signal-check?source=light" ``` Expected: `{"ok": true, "overall_fire": "warmup", "results": [...]}` (첫 호출은 baseline 비어있어 warmup) - [ ] **Step 4: DB에 시그널 들어갔는지 확인** ```bash curl "https://gahusb.synology.me/api/agent-office/lotto/signals?days=1" ``` Expected: `{"items": [{"id": 1, "source": "light", "metric": "sim_signal", "fire_level": "warmup", ...}]}` - [ ] **Step 5: 24시간 가동 후 baseline 누적 확인** 24h 후: ```bash curl "https://gahusb.synology.me/api/agent-office/lotto/baselines" ``` Expected: `sim_signal` 메트릭의 `window_values` 길이가 6~7개 (4h 시뮬 6회 + 1회 light) - [ ] **Step 6: 텔레그램 발송 검증 (자연 발화 또는 강제)** 24h 가동 후에도 자연 발화 없으면 강제 테스트: - `LOTTO_Z_NORMAL=0.1`로 일시 낮춤 → 재배포 → 다음 sim_check에서 발화 - 텔레그램으로 🚨 메시지 도착 확인 후 원복 --- # Self-Review (수행 완료) **1. Spec coverage check** | Spec 섹션 | 구현 task | |---|---| | 4.1 Sim Consensus | Task 1, 2 | | 4.2 Strategy Drift | Task 1, 2 | | 4.3 Confidence | Task 1, 2 | | 4.4 Adaptive Baseline | Task 1, 2 | | 4.5 Trigger × Metric matrix | Task 4 (DRAW_SCOPED_METRICS + source 분기) | | 4.6 Fire 결정 | Task 1 (`decide_overall_fire`) | | 5.1 알림 흐름 | Task 9 | | 5.2 메시지 폼 | Task 8 | | 5.3 Throttle | Task 9 | | 6.1 lotto_signals 테이블 | Task 3 | | 6.2 lotto_baselines 테이블 | Task 3 | | 7. API 3종 | Task 10 | | 8. env vars 7종 | Task 5 | | 9. cron 4종 | Task 7 | 모두 매핑됨. **2. Placeholder scan**: TBD/TODO 없음. 모든 step에 실제 코드 포함. **3. Type consistency**: - `evaluate_metric_and_persist` 시그니처는 Task 4 정의 그대로 Task 6에서 호출 - `_format_urgent_signal(event)` event 구조: `fire_level`, `triggered_at`, `results[]` → Task 8 테스트 & Task 9 호출 일치 - `digest` 구조: `evaluated`, `fired`, `signals`, `weights_trend` → Task 8 테스트 & Task 9 호출 일치 - DB CRUD 함수명 (`insert_lotto_signal`, `get_recent_lotto_signals`, `mark_signal_notified` 등) — Task 3 정의와 Task 9 호출 일치 이슈 없음. --- # 비목표 (Out of scope) - Layer B 자동 시뮬 강도 조절 (v2) - 텔레그램 인라인 키보드 / 사용자 피드백 루프 (v2) - 핫넘버/콜드넘버 시그널 (노이즈 위험으로 v1 제외) - 프론트 `/lotto/agent` UI (web-ui 별도 PR) - Hot/Cold rotation 메트릭 추가