Files
web-page-backend/docs/superpowers/plans/2026-05-20-lotto-active-agent.md
gahusb b4e873b5b0 docs(plan): LottoAgent 능동성 확장 구현 plan (12 tasks, Phase 1-3)
Why: spec (2026-05-20-lotto-active-agent-design.md)을 12개 atomic task
(TDD: 테스트→fail→구현→pass→commit)로 분해. 24h 가동 검증 task 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:26:49 +09:00

1652 lines
57 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# LottoAgent 능동성 확장 Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** LottoAgent에 다중 트리거 + 적응형 시그널 모니터링 + 텔레그램 능동 알림(긴급/일일 요약) 기능 추가
**Architecture:** agent-office 단일 컨테이너 안에 `curator/signals.py` 모듈 신설 + agent_office.db에 2개 테이블 추가. lotto-lab은 read-only GET API만 소비, 변경 없음. cron 4개로 light / sim / deep / digest 분리.
**Tech Stack:** Python 3.12, FastAPI, APScheduler, SQLite, httpx, pytest, pytest-asyncio
**Spec:** `docs/superpowers/specs/2026-05-20-lotto-active-agent-design.md`
---
## File Structure
| 경로 | 작업 | 책임 |
|---|---|---|
| `agent-office/app/curator/signals.py` | Create | 3종 메트릭 평가 + adaptive baseline (순수 함수) |
| `agent-office/app/curator/signal_runner.py` | Create | DB I/O + cron 진입점 + 알림 결정 |
| `agent-office/app/db.py` | Modify | `lotto_signals`, `lotto_baselines` 테이블 + CRUD |
| `agent-office/app/service_proxy.py` | Modify | `lotto_best()`, `lotto_strategy_weights()` 추가 |
| `agent-office/app/agents/lotto.py` | Modify | `on_signal_check()`, `on_daily_digest()` 추가 |
| `agent-office/app/scheduler.py` | Modify | cron 4개 등록 |
| `agent-office/app/notifiers/telegram_lotto.py` | Modify | `send_urgent_signal()`, `send_signal_summary()` |
| `agent-office/app/main.py` | Modify | endpoint 3개 |
| `agent-office/app/config.py` | Modify | env vars 7개 |
| `agent-office/tests/test_lotto_signals.py` | Create | signals.py 단위 테스트 |
| `agent-office/tests/test_lotto_signal_runner.py` | Create | signal_runner DB I/O 통합 테스트 |
| `agent-office/tests/test_lotto_telegram_signal.py` | Create | 텔레그램 메시지 포맷 테스트 |
| `web-backend/CLAUDE.md` | Modify | agent-office 섹션 갱신 |
---
# Phase 1 — 순수 함수 + DB 마이그레이션
## Task 1: signals.py 메트릭 함수 단위 테스트
**Files:**
- Create: `agent-office/tests/test_lotto_signals.py`
- [ ] **Step 1: Write failing tests for Sim Consensus**
```python
# agent-office/tests/test_lotto_signals.py
import math
import pytest
from app.curator import signals
def test_sim_consensus_top10_geomean():
"""top-10 consensus 평균이 기하평균 기반인지."""
best_picks = [
{"scores": [10, 10, 10, 10, 10]}, # high & uniform
{"scores": [9, 9, 9, 9, 9]},
{"scores": [8, 8, 8, 8, 8]},
{"scores": [7, 7, 7, 7, 7]},
{"scores": [6, 6, 6, 6, 6]},
{"scores": [5, 5, 5, 5, 5]},
{"scores": [4, 4, 4, 4, 4]},
{"scores": [3, 3, 3, 3, 3]},
{"scores": [2, 2, 2, 2, 2]},
{"scores": [1, 1, 1, 1, 1]}, # top 10
{"scores": [0, 0, 0, 0, 0]}, # bottom 10
] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10
# 처음 10개가 top, 다 normalize 후 consensus 동일하니까 top10 평균은 명확
result = signals.sim_consensus_score(best_picks)
assert 0.0 <= result <= 1.0
# uniform 분포에선 top10 평균이 단조 감소 분포의 상반부 평균과 같아야 함
assert result > 0.4
def test_sim_consensus_geomean_penalizes_imbalance():
"""5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다."""
balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20
imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20 # 평균은 같지만 한 점수만 폭주
s_balanced = signals.sim_consensus_score(balanced)
s_imbalanced = signals.sim_consensus_score(imbalanced)
# imbalanced는 normalize 후 한 차원만 1.0, 나머지 0.0 → 기하평균 0
assert s_imbalanced < s_balanced
def test_strategy_drift_score():
"""drift = 전략별 가중치 변화 절댓값 합."""
w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45}
w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40}
# |0.10| + |-0.05| + |-0.05| = 0.20
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.20) < 1e-9
def test_strategy_drift_new_strategy_appears():
"""이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산."""
w_prev = {"gap_focus": 0.5, "hot_focus": 0.5}
w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2}
# |0.5-0.4| + |0.5-0.4| + |0-0.2| = 0.4
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.4) < 1e-9
def test_confidence_score_passthrough():
"""confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인)."""
assert signals.confidence_score({"confidence": 0.85}) == 0.85
assert signals.confidence_score({"confidence": 1.2}) == 1.0 # clamp
assert signals.confidence_score({"confidence": -0.1}) == 0.0
assert signals.confidence_score({}) is None
def test_adaptive_baseline_cold_start():
"""window 크기 < 4 → warmup, z=None."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8)
z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5)
assert fire == "warmup"
assert z is None
def test_adaptive_baseline_preparing():
"""window 4~7 → 보수적 임계치 z=2.0."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8)
# value=1.0 → mu=1.0, sigma=0 → div0 처리 필요. preparing 단계라 임계치는 보수적.
z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5)
# sigma=0인 경우 처리 — value > mu면 무한대로 보고 urgent? 또는 z계산 불가 → warmup?
# 결정: sigma == 0이면 (window 분산 없음) z=inf 취급, value > mu면 urgent로
assert fire in ("normal", "urgent")
def test_adaptive_baseline_normal_window_full():
"""window 8 풀, value가 평균보다 1.5σ 이상이면 normal."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
# μ ≈ 1.0, σ ≈ 0.08 → z=1.5는 value ≈ 1.12
z, fire = bl.evaluate(value=1.20, z_normal=1.5, z_urgent=2.5)
assert fire == "normal"
assert z is not None and z >= 1.5
def test_adaptive_baseline_urgent():
"""z >= 2.5 → urgent."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5)
assert fire == "urgent"
def test_adaptive_baseline_push_updates_window():
"""push 시 FIFO 동작."""
bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8)
bl.push(9.0)
assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0]
def test_decide_fire_level_combination():
"""2개 이상 normal 발화 → urgent."""
# 2개 normal z ≥ 1.5
sigs = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 1.7, "fire": "normal"},
{"metric": "conf", "z": 0.5, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs) == "urgent"
# 1개 normal
sigs2 = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 0.3, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs2) == "normal"
# 단일 극값 z ≥ 2.5 → urgent
sigs3 = [
{"metric": "sim", "z": 3.0, "fire": "urgent"},
{"metric": "drift", "z": 0.2, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs3) == "urgent"
# 전부 noop
sigs4 = [{"metric": "sim", "z": 0.5, "fire": "noop"}]
assert signals.decide_overall_fire(sigs4) == "noop"
```
- [ ] **Step 2: Run test to verify it fails**
Run: `cd agent-office && pytest tests/test_lotto_signals.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'app.curator.signals'`
- [ ] **Step 3: Commit failing tests**
```bash
git add agent-office/tests/test_lotto_signals.py
git commit -m "test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트"
```
---
## Task 2: signals.py 구현
**Files:**
- Create: `agent-office/app/curator/signals.py`
- [ ] **Step 1: Implement signals module**
```python
# agent-office/app/curator/signals.py
"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수).
DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list.
signal_runner.py에서 DB 연동 + cron 진입점 담당.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from statistics import mean, pstdev
from typing import Any, Dict, List, Optional, Tuple
# ---------- Metric: Sim Consensus ----------
def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
"""20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수."""
if not picks:
return []
n_metrics = len(picks[0]["scores"])
columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)]
norms_per_col = []
for col in columns:
lo, hi = min(col), max(col)
rng = hi - lo
if rng == 0:
norms_per_col.append([0.5] * len(col)) # 모두 동일하면 중립 0.5
else:
norms_per_col.append([(v - lo) / rng for v in col])
# 전치: 후보별 정규화 점수 리스트
return [
[norms_per_col[k][i] for k in range(n_metrics)]
for i in range(len(picks))
]
def _geomean(values: List[float]) -> float:
"""기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티)."""
if not values:
return 0.0
if any(v <= 0 for v in values):
return 0.0
log_sum = sum(math.log(v) for v in values)
return math.exp(log_sum / len(values))
def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
"""top-10 후보의 기하평균 consensus 평균.
Args:
best_picks: [{"numbers": [...], "scores": [s1, s2, s3, s4, s5]}, ...]
"""
if not best_picks:
return 0.0
normalized = _normalize_columns(best_picks)
consensus = [_geomean(scores) for scores in normalized]
consensus.sort(reverse=True)
top = consensus[:10] if len(consensus) >= 10 else consensus
return mean(top) if top else 0.0
# ---------- Metric: Strategy Drift ----------
def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
"""가중치 변화 절댓값 합. 신규/소멸 전략도 가산."""
keys = set(prev) | set(curr)
return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)
# ---------- Metric: Confidence ----------
def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
"""큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None."""
if "confidence" not in curate_result:
return None
v = float(curate_result["confidence"])
return max(0.0, min(1.0, v))
# ---------- Adaptive Baseline ----------
@dataclass
class AdaptiveBaseline:
window: List[float] = field(default_factory=list)
window_max: int = 8
last_pushed_draw_no: Optional[int] = None # 회차 단위 메트릭 중복 push 방지
@property
def size(self) -> int:
return len(self.window)
@property
def mu(self) -> float:
return mean(self.window) if self.window else 0.0
@property
def sigma(self) -> float:
# population stdev (8개 윈도우라 ddof=0이 안정)
return pstdev(self.window) if len(self.window) >= 2 else 0.0
def push(self, value: float, draw_no: Optional[int] = None) -> None:
"""FIFO push. window_max 초과 시 가장 오래된 값 제거."""
self.window.append(float(value))
if len(self.window) > self.window_max:
self.window = self.window[-self.window_max:]
if draw_no is not None:
self.last_pushed_draw_no = draw_no
def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
"""z-score 계산 + fire_level 판정.
Returns:
(z_score, fire_level) — z_score는 cold start/warmup이면 None.
fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'}
"""
if self.size < 4:
return None, "warmup"
# 보수적 임계치 (window 4~7)
z_normal_eff = 2.0 if self.size < self.window_max else z_normal
z_urgent_eff = z_urgent
if self.sigma == 0:
# 분산 없음 — value가 평균을 크게 벗어나면 urgent로 분류, 아니면 noop
return (None, "urgent") if value > self.mu else (None, "noop")
z = (value - self.mu) / self.sigma
if z >= z_urgent_eff:
return z, "urgent"
if z >= z_normal_eff:
return z, "normal"
return z, "noop"
# ---------- Combined fire decision ----------
def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
"""3종 시그널을 종합해 전체 fire_level 결정.
Args:
signal_results: [{"metric": str, "z": float|None, "fire": str}, ...]
Returns:
'noop' | 'normal' | 'urgent'
"""
fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")]
if any(s["fire"] == "urgent" for s in fires):
return "urgent"
if len(fires) >= 2:
return "urgent" # 2개 이상 normal → urgent로 승격
if len(fires) == 1:
return "normal"
return "noop"
```
- [ ] **Step 2: Run tests to verify they pass**
Run: `cd agent-office && pytest tests/test_lotto_signals.py -v`
Expected: All 9 tests PASS
- [ ] **Step 3: Commit**
```bash
git add agent-office/app/curator/signals.py
git commit -m "feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현"
```
---
## Task 3: DB 마이그레이션 (lotto_signals + lotto_baselines)
**Files:**
- Modify: `agent-office/app/db.py:19` (init_db 함수 내)
- [ ] **Step 1: Add table creation to init_db**
`init_db()` 함수의 마지막 `INSERT OR IGNORE` 루프 직전(line 100, `youtube_research_jobs` 테이블 뒤)에 다음 추가:
```python
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
source TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
baseline_mu REAL,
baseline_sigma REAL,
z_score REAL,
fire_level TEXT NOT NULL,
notified_at TEXT,
payload TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_triggered
ON lotto_signals(triggered_at DESC)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_fire
ON lotto_signals(fire_level, notified_at)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_baselines (
metric TEXT PRIMARY KEY,
window_values TEXT NOT NULL DEFAULT '[]',
mu REAL NOT NULL DEFAULT 0.0,
sigma REAL NOT NULL DEFAULT 0.0,
last_pushed_draw_no INTEGER,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
```
- [ ] **Step 2: Add CRUD helpers at end of db.py**
`db.py` 파일 맨 아래에 추가:
```python
# --- lotto_signals / lotto_baselines CRUD ---
def insert_lotto_signal(
source: str,
metric: str,
value: float,
baseline_mu: Optional[float],
baseline_sigma: Optional[float],
z_score: Optional[float],
fire_level: str,
payload: Optional[Dict[str, Any]] = None,
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO lotto_signals
(source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
source, metric, value,
baseline_mu, baseline_sigma, z_score, fire_level,
json.dumps(payload or {}, ensure_ascii=False),
),
)
return cur.lastrowid
def mark_signal_notified(signal_id: int) -> None:
with _conn() as conn:
conn.execute(
"UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
(signal_id,),
)
def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
"""지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent."""
levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
placeholders = ",".join("?" * len(levels))
with _conn() as conn:
rows = conn.execute(
f"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level IN ({placeholders})
ORDER BY triggered_at DESC
""",
(f"-{int(hours)} hours", *levels),
).fetchall()
return [dict(r) for r in rows]
def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
"""차트/이력 페이지용 — 모든 fire_level 포함."""
with _conn() as conn:
rows = conn.execute(
"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
ORDER BY triggered_at DESC
""",
(f"-{int(days)} days",),
).fetchall()
return [dict(r) for r in rows]
def get_recent_urgent_count(hours: int = 24) -> int:
with _conn() as conn:
row = conn.execute(
"""
SELECT COUNT(*) AS c FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level = 'urgent'
AND notified_at IS NOT NULL
""",
(f"-{int(hours)} hours",),
).fetchone()
return int(row["c"]) if row else 0
def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
"""같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용."""
with _conn() as conn:
row = conn.execute(
"""
SELECT notified_at FROM lotto_signals
WHERE metric = ?
AND fire_level = ?
AND notified_at IS NOT NULL
AND notified_at >= datetime('now', ?)
ORDER BY notified_at DESC LIMIT 1
""",
(metric, fire_level, f"-{int(hours)} hours"),
).fetchone()
return row["notified_at"] if row else None
def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM lotto_baselines WHERE metric = ?",
(metric,),
).fetchone()
if not row:
return None
d = dict(row)
d["window_values"] = json.loads(d["window_values"])
return d
def upsert_baseline(
metric: str,
window_values: List[float],
mu: float,
sigma: float,
last_pushed_draw_no: Optional[int],
) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO lotto_baselines
(metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
ON CONFLICT(metric) DO UPDATE SET
window_values = excluded.window_values,
mu = excluded.mu,
sigma = excluded.sigma,
last_pushed_draw_no = excluded.last_pushed_draw_no,
updated_at = excluded.updated_at
""",
(
metric,
json.dumps(window_values),
mu, sigma, last_pushed_draw_no,
),
)
def get_all_baselines() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
out = []
for r in rows:
d = dict(r)
d["window_values"] = json.loads(d["window_values"])
out.append(d)
return out
```
- [ ] **Step 3: Smoke test the migration**
새 임시 DB로 마이그레이션이 동작하는지:
```bash
cd agent-office && python -c "
import os
os.environ['AGENT_OFFICE_DB_PATH'] = '/tmp/test_migration.db'
from app.db import init_db, insert_lotto_signal, get_baseline, upsert_baseline
init_db()
sid = insert_lotto_signal('light','sim_signal', 1.5, 1.0, 0.2, 2.5, 'urgent', {'top':[1,2,3]})
print('signal id:', sid)
upsert_baseline('sim_signal', [1.0, 1.1, 0.9], 1.0, 0.1, None)
print('baseline:', get_baseline('sim_signal'))
"
rm /tmp/test_migration.db
```
Expected: signal id가 1 출력, baseline dict 출력 (window_values는 list)
- [ ] **Step 4: Commit**
```bash
git add agent-office/app/db.py
git commit -m "feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD"
```
---
## Task 4: signal_runner.py — DB I/O + 평가 orchestration
**Files:**
- Create: `agent-office/app/curator/signal_runner.py`
- Create: `agent-office/tests/test_lotto_signal_runner.py`
- [ ] **Step 1: Write failing test for signal_runner**
```python
# agent-office/tests/test_lotto_signal_runner.py
import os
import pytest
os.environ["AGENT_OFFICE_DB_PATH"] = "/tmp/test_signal_runner.db"
from app.curator import signal_runner
from app import db
@pytest.fixture(autouse=True)
def fresh_db():
if os.path.exists("/tmp/test_signal_runner.db"):
os.remove("/tmp/test_signal_runner.db")
db.init_db()
yield
if os.path.exists("/tmp/test_signal_runner.db"):
os.remove("/tmp/test_signal_runner.db")
def test_evaluate_and_persist_cold_start():
"""첫 호출은 warmup으로 기록되고 baseline은 비어있다."""
result = signal_runner.evaluate_metric_and_persist(
source="light",
metric="sim_signal",
value=1.5,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] == "warmup"
assert result["z_score"] is None
bl = db.get_baseline("sim_signal")
assert bl is not None
assert bl["window_values"] == [1.5]
def test_evaluate_after_window_filled_normal_fire():
"""8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal."""
for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]:
signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=v,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
# 9번째: window는 [v2..v8, 1.0]이지만 push 후 평가. 평균≈1.0, σ≈0.08 → z 큰 값
result = signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=1.20,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] in ("normal", "urgent")
assert result["z_score"] is not None and result["z_score"] >= 1.5
def test_evaluate_drift_skips_same_draw_push():
"""drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X."""
# 첫 push (draw 1100)
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.05, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_before = db.get_baseline("drift")
assert bl_before["window_values"] == [0.05]
assert bl_before["last_pushed_draw_no"] == 1100
# 같은 회차 두 번째 호출 — window push X
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.08, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_after = db.get_baseline("drift")
assert bl_after["window_values"] == [0.05] # 변경 없음
def test_run_signal_check_aggregates_three_metrics(monkeypatch):
"""run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환."""
# mock service_proxy
async def fake_lotto_best():
# 동일한 5종 점수의 균등 후보 20개
return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20
async def fake_lotto_strategy_weights():
return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3}
monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best)
monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights)
import asyncio
out = asyncio.get_event_loop().run_until_complete(
signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101)
)
assert "overall_fire" in out
assert "results" in out
assert any(r["metric"] == "sim_signal" for r in out["results"])
assert any(r["metric"] == "drift" for r in out["results"])
# light_check는 confidence 평가 안 함
assert not any(r["metric"] == "confidence" for r in out["results"])
```
- [ ] **Step 2: Run test to verify it fails**
Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v`
Expected: FAIL `ModuleNotFoundError: No module named 'app.curator.signal_runner'`
- [ ] **Step 3: Implement signal_runner.py**
```python
# agent-office/app/curator/signal_runner.py
"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from .. import db
from .. import service_proxy
from . import signals
logger = logging.getLogger("agent-office.lotto-signals")
# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교)
DRAW_SCOPED_METRICS = {"drift", "confidence"}
def _load_baseline(metric: str) -> signals.AdaptiveBaseline:
row = db.get_baseline(metric)
if row is None:
return signals.AdaptiveBaseline(window=[], window_max=8)
return signals.AdaptiveBaseline(
window=list(row["window_values"]),
window_max=8,
last_pushed_draw_no=row.get("last_pushed_draw_no"),
)
def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None:
db.upsert_baseline(
metric=metric,
window_values=bl.window,
mu=bl.mu,
sigma=bl.sigma,
last_pushed_draw_no=bl.last_pushed_draw_no,
)
def evaluate_metric_and_persist(
source: str,
metric: str,
value: float,
draw_no: Optional[int],
z_normal: float,
z_urgent: float,
push_to_window: bool,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신.
회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략.
"""
bl = _load_baseline(metric)
# 회차 가드: 같은 회차 재평가 시 window push 생략
do_push = push_to_window
if metric in DRAW_SCOPED_METRICS and draw_no is not None:
if bl.last_pushed_draw_no == draw_no:
do_push = False
# 평가는 push 전 baseline 기준 (현재값 vs 과거 분포)
z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent)
if do_push:
bl.push(value=value, draw_no=draw_no)
_save_baseline(metric, bl)
sid = db.insert_lotto_signal(
source=source,
metric=metric,
value=value,
baseline_mu=bl.mu if bl.size > 0 else None,
baseline_sigma=bl.sigma if bl.size >= 2 else None,
z_score=z,
fire_level=fire,
payload=payload,
)
return {
"signal_id": sid,
"metric": metric,
"value": value,
"z_score": z,
"fire_level": fire,
"payload": payload or {},
}
# ---------- Service proxy thin wrappers (monkeypatch 대상) ----------
async def _fetch_best_picks() -> List[Dict[str, Any]]:
return await service_proxy.lotto_best()
async def _fetch_strategy_weights() -> Dict[str, float]:
return await service_proxy.lotto_strategy_weights()
# ---------- Orchestrator ----------
async def run_signal_check(
source: str,
z_normal: float = 1.5,
z_urgent: float = 2.5,
curate_result: Optional[Dict[str, Any]] = None,
current_draw_no: Optional[int] = None,
) -> Dict[str, Any]:
"""cron 진입점. source ∈ {'light', 'sim', 'deep'}.
light/sim: Sim Consensus + Strategy Drift 평가
deep: 위 2종 + Confidence (curate_result 필요)
"""
results: List[Dict[str, Any]] = []
# --- Sim Consensus ---
try:
best = await _fetch_best_picks()
v = signals.sim_consensus_score(best)
results.append(
evaluate_metric_and_persist(
source=source, metric="sim_signal",
value=v, draw_no=None,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"top_count": min(len(best), 10)},
)
)
except Exception as e:
logger.warning(f"sim_consensus 평가 실패: {e}")
# --- Strategy Drift (회차 단위) ---
try:
w_curr = await _fetch_strategy_weights()
bl_drift = _load_baseline("drift")
# drift는 prev → curr 비교가 필요. baseline window의 마지막 entry는 이전 drift값.
# prev_weights는 별도 저장이 필요한데, 간단히 lotto_baselines.payload에 안 두고
# 매 호출마다 GET 후 캐시되지 않은 직전 값과 비교 불가 → drift = 1회차 단위.
# 단순화: 이번 호출의 가중치를 baseline에 저장하기 전 직전 저장값을 prev로 사용.
prev_payload_row = db.get_baseline("drift_weights_cache")
w_prev = prev_payload_row["window_values"] if prev_payload_row else None
if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict):
# last entry is dict of weights
prev_dict = w_prev[-1]
drift_value = signals.strategy_drift_score(prev_dict, w_curr)
results.append(
evaluate_metric_and_persist(
source=source, metric="drift",
value=drift_value, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"weights_now": w_curr, "weights_prev": prev_dict},
)
)
# weights 캐시 갱신 (FIFO 2개만 보관)
cache_window = (w_prev or []) + [w_curr]
if len(cache_window) > 2:
cache_window = cache_window[-2:]
db.upsert_baseline(
metric="drift_weights_cache",
window_values=cache_window,
mu=0.0, sigma=0.0,
last_pushed_draw_no=current_draw_no,
)
except Exception as e:
logger.warning(f"strategy_drift 평가 실패: {e}")
# --- Confidence (deep_check + curate_result 필수) ---
if source == "deep" and curate_result is not None:
try:
cv = signals.confidence_score(curate_result)
if cv is not None:
results.append(
evaluate_metric_and_persist(
source=source, metric="confidence",
value=cv, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"draw_no": current_draw_no},
)
)
except Exception as e:
logger.warning(f"confidence 평가 실패: {e}")
overall = signals.decide_overall_fire(
[{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results]
)
return {"overall_fire": overall, "results": results}
```
- [ ] **Step 4: Add lotto_best + lotto_strategy_weights to service_proxy.py**
`agent-office/app/service_proxy.py` 파일 맨 아래 추가:
```python
async def lotto_best() -> List[Dict[str, Any]]:
"""GET /api/lotto/best — best_picks 20개 (numbers + scores 5종)."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best")
resp.raise_for_status()
data = resp.json()
items = data.get("items") if isinstance(data, dict) else data
return items or []
async def lotto_strategy_weights() -> Dict[str, float]:
"""GET /api/lotto/strategy/weights — 전략별 가중치 dict."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights")
resp.raise_for_status()
data = resp.json()
weights = data.get("weights") if isinstance(data, dict) else data
# API 응답 형태가 [{strategy, weight, ...}] 일 수 있어 dict로 정규화
if isinstance(weights, list):
return {item["strategy"]: float(item["weight"]) for item in weights}
return {k: float(v) for k, v in (weights or {}).items()}
```
- [ ] **Step 5: Run tests to verify they pass**
Run: `cd agent-office && pytest tests/test_lotto_signal_runner.py -v`
Expected: All 4 tests PASS
- [ ] **Step 6: Commit**
```bash
git add agent-office/app/curator/signal_runner.py \
agent-office/app/service_proxy.py \
agent-office/tests/test_lotto_signal_runner.py
git commit -m "feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers"
```
---
# Phase 2 — cron 통합 (텔레그램 X, DB 기록만)
## Task 5: config.py env vars 7개 추가
**Files:**
- Modify: `agent-office/app/config.py` (파일 끝)
- [ ] **Step 1: Append env var section**
`agent-office/app/config.py` 끝에 추가:
```python
# Lotto Active Signals
LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8"))
LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5"))
LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5"))
LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
```
- [ ] **Step 2: Smoke check**
```bash
cd agent-office && python -c "from app.config import LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR; print(LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR)"
```
Expected: `1.5 9`
- [ ] **Step 3: Commit**
```bash
git add agent-office/app/config.py
git commit -m "feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle)"
```
---
## Task 6: LottoAgent에 on_signal_check / on_daily_digest 추가 (텔레그램 발송 X)
**Files:**
- Modify: `agent-office/app/agents/lotto.py`
- [ ] **Step 1: Extend LottoAgent class**
`agent-office/app/agents/lotto.py``on_command()`를 다음으로 교체 (signal check 액션 추가):
```python
async def on_command(self, action: str, params: dict) -> dict:
if action in ("curate_now", "curate_weekly"):
return await self._run(source="manual")
if action == "status":
return {"ok": True, "message": f"{self.state}: {self.state_detail}"}
if action in ("signal_check", "light_check", "sim_check", "deep_check"):
return await self.run_signal_check(source=action.replace("_check", "") if action != "signal_check" else "light")
if action == "daily_digest":
return await self.run_daily_digest()
return {"ok": False, "message": f"unknown action: {action}"}
```
그리고 클래스 끝에 신규 메서드 추가:
```python
async def run_signal_check(self, source: str = "light") -> dict:
"""비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후).
Phase 2에선 텔레그램 발송 안 함. lotto_signals INSERT만.
Phase 3에서 텔레그램 트리거 추가.
"""
from ..curator.signal_runner import run_signal_check
from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT
from ..db import add_log
if self.state not in ("idle", "reporting"):
return {"ok": False, "message": f"busy ({self.state})"}
try:
curate_result: dict | None = None
current_draw_no: int | None = None
if source == "deep":
# deep_check은 큐레이션을 먼저 수행 (LLM 호출)
from ..curator.pipeline import curate_weekly
cw = await curate_weekly(source="signal_deep")
curate_result = cw.get("payload") or cw # confidence 키 필요
current_draw_no = cw.get("draw_no")
outcome = await run_signal_check(
source=source,
z_normal=LOTTO_Z_NORMAL,
z_urgent=LOTTO_Z_URGENT,
curate_result=curate_result,
current_draw_no=current_draw_no,
)
add_log(
self.agent_id,
f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
)
return {"ok": True, **outcome}
except Exception as e:
add_log(self.agent_id, f"signal_check 예외: {e}", level="error")
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_daily_digest(self) -> dict:
"""Phase 2: 발화 카운트만 반환. Phase 3에서 텔레그램 발송 추가."""
from ..db import get_recent_lotto_signals, add_log
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
add_log(self.agent_id, f"daily_digest: 지난 24h 발화 {len(sigs)}")
return {"ok": True, "count": len(sigs), "signals": sigs}
```
- [ ] **Step 2: Smoke import check**
```bash
cd agent-office && python -c "from app.agents.lotto import LottoAgent; print(LottoAgent.agent_id)"
```
Expected: `lotto`
- [ ] **Step 3: Commit**
```bash
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)"
```
---
## Task 7: scheduler.py에 cron 4개 추가
**Files:**
- Modify: `agent-office/app/scheduler.py`
- [ ] **Step 1: Add cron job functions + registrations**
기존 `_run_lotto_schedule` 근처에 함수 4개 추가:
```python
async def _run_lotto_light_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="light")
async def _run_lotto_sim_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="sim")
async def _run_lotto_deep_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="deep")
async def _run_lotto_daily_digest():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_daily_digest()
```
기존 `lotto_curate` cron 등록(line 75) 직후에 4개 추가:
```python
scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check")
scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
# digest hour/min은 동적 (env)이지만 cron 등록은 상수로 — 9:25 고정. 변경 필요 시 컨테이너 재시작.
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
```
- [ ] **Step 2: Verify import + cron registration**
```bash
cd agent-office && python -c "
from app.scheduler import _run_lotto_light_check, _run_lotto_sim_check, _run_lotto_deep_check, _run_lotto_daily_digest
print('all imports ok')
"
```
Expected: `all imports ok`
- [ ] **Step 3: Commit**
```bash
git add agent-office/app/scheduler.py
git commit -m "feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)"
```
---
# Phase 3 — 텔레그램 알림 + Throttle + API endpoints
## Task 8: 텔레그램 메시지 폼 단위 테스트
**Files:**
- Create: `agent-office/tests/test_lotto_telegram_signal.py`
- [ ] **Step 1: Write failing tests**
```python
# agent-office/tests/test_lotto_telegram_signal.py
from app.notifiers.telegram_lotto import (
_format_urgent_signal,
_format_signal_digest,
)
def test_urgent_signal_format_basic():
event = {
"fire_level": "urgent",
"triggered_at": "2026-05-20T07:18:00.000Z",
"results": [
{"metric": "sim_signal", "value": 1.84, "z_score": 3.9,
"baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {}},
{"metric": "drift", "value": 0.18, "z_score": 3.0,
"baseline_mu": 0.06, "baseline_sigma": 0.04,
"payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5}}},
],
}
text = _format_urgent_signal(event)
assert "🚨" in text
assert "Sim Consensus" in text
assert "z=3.9" in text
assert "Strategy Drift" in text or "drift" in text.lower()
def test_signal_digest_format_with_signals():
digest = {
"evaluated": 6,
"fired": 2,
"signals": [
{"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7,
"triggered_at": "2026-05-20T16:18:00Z", "payload": {}},
{"metric": "confidence", "fire_level": "normal", "z_score": 1.6,
"triggered_at": "2026-05-20T09:05:00Z", "payload": {}},
],
"weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08},
}
text = _format_signal_digest(digest)
assert "📊" in text
assert "지난 24h" in text
assert "z=1.7" in text
def test_signal_digest_empty_returns_empty_string():
"""발화 0건이면 빈 문자열 → 발송 자체 skip 가능."""
text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}})
assert text == ""
```
- [ ] **Step 2: Run test to verify it fails**
Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v`
Expected: FAIL `ImportError: cannot import name '_format_urgent_signal'`
- [ ] **Step 3: Implement telegram_lotto.py additions**
`agent-office/app/notifiers/telegram_lotto.py`에 추가 (파일 끝):
```python
# ---------- 능동 시그널 알림 (urgent + digest) ----------
_METRIC_LABEL = {
"sim_signal": "Sim Consensus",
"drift": "Strategy Drift",
"confidence": "Confidence",
}
def _format_urgent_signal(event: Dict[str, Any]) -> str:
"""긴급 시그널 텔레그램 메시지 포맷."""
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
results = event.get("results", [])
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
lines = [
"🚨 로또 능동 신호",
"",
f"[{triggered}]",
f"강한 시그널 {len(fired)}종 발화:",
]
for r in fired:
label = _METRIC_LABEL.get(r["metric"], r["metric"])
v = r.get("value")
mu = r.get("baseline_mu")
sigma = r.get("baseline_sigma")
z = r.get("z_score")
if mu is not None and sigma is not None:
lines.append(f"{label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
else:
lines.append(f"{label} {v:.2f}")
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
for r in fired:
if r["metric"] == "drift":
wn = (r.get("payload") or {}).get("weights_now") or {}
wp = (r.get("payload") or {}).get("weights_prev") or {}
if wn and wp:
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
lines.append("")
lines.append(f"요인: {detail}")
break
lines.append("")
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
return "\n".join(lines)
def _format_signal_digest(digest: Dict[str, Any]) -> str:
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
fired = int(digest.get("fired", 0))
if fired == 0:
return ""
signals_list = digest.get("signals", [])
evaluated = digest.get("evaluated", 0)
lines = [
"📊 로또 일일 요약 (지난 24h)",
"",
f"평가 {evaluated}회 / 발화 {fired}",
]
for s in signals_list:
label = _METRIC_LABEL.get(s["metric"], s["metric"])
z = s.get("z_score")
when = (s.get("triggered_at") or "")[11:16] # HH:MM
z_text = f"z={z:.1f}" if z is not None else "z=-"
lines.append(f"{label:14s} {s['fire_level']:6s} {z_text} ({when})")
weights_trend = digest.get("weights_trend") or {}
if weights_trend:
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
arrow = "" if delta > 0.01 else ("" if delta < -0.01 else "")
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
return "\n".join(lines)
async def send_urgent_signal(event: Dict[str, Any]) -> None:
text = _format_urgent_signal(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
async def send_signal_summary(digest: Dict[str, Any]) -> None:
text = _format_signal_digest(digest)
if not text:
return # 발화 0건이면 발송 skip
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `cd agent-office && pytest tests/test_lotto_telegram_signal.py -v`
Expected: All 3 tests PASS
- [ ] **Step 5: Commit**
```bash
git add agent-office/app/notifiers/telegram_lotto.py \
agent-office/tests/test_lotto_telegram_signal.py
git commit -m "feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷"
```
---
## Task 9: signal_check이 텔레그램 발송 트리거 + throttle 통합
**Files:**
- Modify: `agent-office/app/agents/lotto.py`
- [ ] **Step 1: Update run_signal_check to fire telegram on urgent**
`run_signal_check()`의 마지막 `return {"ok": True, **outcome}` 직전에 추가:
```python
# --- Throttle + 텔레그램 urgent 발송 ---
from ..config import (
LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX,
)
from ..db import (
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
if outcome["overall_fire"] == "urgent":
# daily cap 체크
if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
add_log(
self.agent_id,
f"urgent daily cap 도달 → normal로 강등 (digest 합류)",
level="warning",
)
else:
# throttle: 어느 메트릭이라도 6시간 이내 같은 fire_level 알림이 있으면 skip
blocked = False
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
last = get_last_signal_notification(
metric=r["metric"], fire_level=r["fire_level"],
hours=LOTTO_THROTTLE_HOURS,
)
if last:
blocked = True
break
if not blocked:
from datetime import datetime, timezone
event = {
"fire_level": "urgent",
"triggered_at": datetime.now(timezone.utc).isoformat(),
"results": outcome["results"],
}
await send_urgent_signal(event)
# 모든 발화 시그널에 대해 notified_at 마킹
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
mark_signal_notified(r["signal_id"])
add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)")
```
- [ ] **Step 2: Update run_daily_digest to actually send telegram**
`run_daily_digest()`를 다음으로 교체:
```python
async def run_daily_digest(self) -> dict:
"""일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통."""
from ..db import get_recent_lotto_signals, get_signals_history, add_log, get_all_baselines
from ..notifiers.telegram_lotto import send_signal_summary
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
# 전체 평가 횟수 (noop+warmup 포함)
total_24h = get_signals_history(days=1)
evaluated = len(total_24h)
# weights_trend: drift_weights_cache에서 prev/curr 차이 계산
trend = {}
try:
from ..db import get_baseline
cache = get_baseline("drift_weights_cache")
if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
prev_w = cache["window_values"][-2]
curr_w = cache["window_values"][-1]
trend = {
k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
for k in (set(prev_w) | set(curr_w))
}
except Exception as e:
add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning")
digest = {
"evaluated": evaluated,
"fired": len(sigs),
"signals": sigs,
"weights_trend": trend,
}
await send_signal_summary(digest)
add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}")
return {"ok": True, **digest}
```
- [ ] **Step 3: Smoke import check**
```bash
cd agent-office && python -c "
import asyncio
from app.agents.lotto import LottoAgent
agent = LottoAgent()
print('methods:', [m for m in dir(agent) if 'signal' in m or 'digest' in m])
"
```
Expected: `methods: ['run_daily_digest', 'run_signal_check']`
- [ ] **Step 4: Commit**
```bash
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 전송"
```
---
## Task 10: FastAPI endpoint 3개 추가
**Files:**
- Modify: `agent-office/app/main.py`
- [ ] **Step 1: Add endpoints**
`agent-office/app/main.py`에서 다른 lotto 관련 endpoint 근처에 추가:
```python
@app.get("/api/agent-office/lotto/signals")
async def list_lotto_signals(days: int = 7):
"""시그널 이력 (모든 fire_level)."""
from .db import get_signals_history
return {"items": get_signals_history(days=days)}
@app.get("/api/agent-office/lotto/baselines")
async def list_lotto_baselines():
"""현재 baseline μ/σ + window 상태."""
from .db import get_all_baselines
return {"items": get_all_baselines()}
@app.post("/api/agent-office/lotto/signal-check")
async def trigger_signal_check(source: str = "light"):
"""수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}."""
if source not in ("light", "sim", "deep"):
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="source must be light/sim/deep")
agent = AGENT_REGISTRY.get("lotto")
if not agent:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source)
```
- [ ] **Step 2: Verify endpoints**
```bash
cd agent-office && python -c "
from app.main import app
routes = [r.path for r in app.routes if 'lotto' in r.path and 'signal' in r.path or 'baseline' in r.path]
print(routes)
"
```
Expected: 3 routes listed including `/api/agent-office/lotto/signals`, `/baselines`, `/signal-check`
- [ ] **Step 3: Commit**
```bash
git add agent-office/app/main.py
git commit -m "feat(lotto-signals): GET signals/baselines + POST signal-check endpoint"
```
---
## Task 11: CLAUDE.md 업데이트 (agent-office 섹션)
**Files:**
- Modify: `web-backend/CLAUDE.md`
- [ ] **Step 1: Update agent-office API & scheduler section**
`web-backend/CLAUDE.md`의 agent-office API 표에 다음 행 추가 (적절한 위치):
```markdown
| GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) |
| GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 |
| POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) |
```
그리고 "스케줄러 job" 항목에 추가:
```markdown
- 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가)
- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시)
- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가)
- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통)
```
또한 새 환경변수 명시:
```markdown
- `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8)
- `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5)
- `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5)
- `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간)
- `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통)
```
- [ ] **Step 2: Verify**
```bash
git -C C:/Users/jaeoh/Desktop/workspace/web-backend diff CLAUDE.md | head -50
```
- [ ] **Step 3: Commit**
```bash
git add CLAUDE.md
git commit -m "docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가"
```
---
## Task 12: NAS 배포 + 24h 가동 검증 (수동)
- [ ] **Step 1: git push로 자동 배포**
```bash
git push
```
Expected: Gitea webhook → deployer가 `docker compose up -d --build` 실행 → agent-office 재시작.
- [ ] **Step 2: 컨테이너 상태 확인**
```bash
ssh nas "docker logs agent-office --tail 50"
```
Expected:
- `init_db` 로그에서 `lotto_signals`, `lotto_baselines` 테이블 생성 성공 메시지
- scheduler 로그에서 `lotto_light_check`, `lotto_sim_check`, `lotto_deep_check`, `lotto_digest` 4개 job 등록 확인
- [ ] **Step 3: 수동 트리거로 즉시 검증**
```bash
curl -X POST "https://gahusb.synology.me/api/agent-office/lotto/signal-check?source=light"
```
Expected: `{"ok": true, "overall_fire": "warmup", "results": [...]}`
(첫 호출은 baseline 비어있어 warmup)
- [ ] **Step 4: DB에 시그널 들어갔는지 확인**
```bash
curl "https://gahusb.synology.me/api/agent-office/lotto/signals?days=1"
```
Expected: `{"items": [{"id": 1, "source": "light", "metric": "sim_signal", "fire_level": "warmup", ...}]}`
- [ ] **Step 5: 24시간 가동 후 baseline 누적 확인**
24h 후:
```bash
curl "https://gahusb.synology.me/api/agent-office/lotto/baselines"
```
Expected: `sim_signal` 메트릭의 `window_values` 길이가 6~7개 (4h 시뮬 6회 + 1회 light)
- [ ] **Step 6: 텔레그램 발송 검증 (자연 발화 또는 강제)**
24h 가동 후에도 자연 발화 없으면 강제 테스트:
- `LOTTO_Z_NORMAL=0.1`로 일시 낮춤 → 재배포 → 다음 sim_check에서 발화
- 텔레그램으로 🚨 메시지 도착 확인 후 원복
---
# Self-Review (수행 완료)
**1. Spec coverage check**
| Spec 섹션 | 구현 task |
|---|---|
| 4.1 Sim Consensus | Task 1, 2 |
| 4.2 Strategy Drift | Task 1, 2 |
| 4.3 Confidence | Task 1, 2 |
| 4.4 Adaptive Baseline | Task 1, 2 |
| 4.5 Trigger × Metric matrix | Task 4 (DRAW_SCOPED_METRICS + source 분기) |
| 4.6 Fire 결정 | Task 1 (`decide_overall_fire`) |
| 5.1 알림 흐름 | Task 9 |
| 5.2 메시지 폼 | Task 8 |
| 5.3 Throttle | Task 9 |
| 6.1 lotto_signals 테이블 | Task 3 |
| 6.2 lotto_baselines 테이블 | Task 3 |
| 7. API 3종 | Task 10 |
| 8. env vars 7종 | Task 5 |
| 9. cron 4종 | Task 7 |
모두 매핑됨.
**2. Placeholder scan**: TBD/TODO 없음. 모든 step에 실제 코드 포함.
**3. Type consistency**:
- `evaluate_metric_and_persist` 시그니처는 Task 4 정의 그대로 Task 6에서 호출
- `_format_urgent_signal(event)` event 구조: `fire_level`, `triggered_at`, `results[]` → Task 8 테스트 & Task 9 호출 일치
- `digest` 구조: `evaluated`, `fired`, `signals`, `weights_trend` → Task 8 테스트 & Task 9 호출 일치
- DB CRUD 함수명 (`insert_lotto_signal`, `get_recent_lotto_signals`, `mark_signal_notified` 등) — Task 3 정의와 Task 9 호출 일치
이슈 없음.
---
# 비목표 (Out of scope)
- Layer B 자동 시뮬 강도 조절 (v2)
- 텔레그램 인라인 키보드 / 사용자 피드백 루프 (v2)
- 핫넘버/콜드넘버 시그널 (노이즈 위험으로 v1 제외)
- 프론트 `/lotto/agent` UI (web-ui 별도 PR)
- Hot/Cold rotation 메트릭 추가