Why: spec (2026-05-20-lotto-active-agent-design.md)을 12개 atomic task (TDD: 테스트→fail→구현→pass→commit)로 분해. 24h 가동 검증 task 포함. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
57 KiB
LottoAgent 능동성 확장 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: LottoAgent에 다중 트리거 + 적응형 시그널 모니터링 + 텔레그램 능동 알림(긴급/일일 요약) 기능 추가
Architecture: agent-office 단일 컨테이너 안에 curator/signals.py 모듈 신설 + agent_office.db에 2개 테이블 추가. lotto-lab은 read-only GET API만 소비, 변경 없음. cron 4개로 light / sim / deep / digest 분리.
Tech Stack: Python 3.12, FastAPI, APScheduler, SQLite, httpx, pytest, pytest-asyncio
Spec: docs/superpowers/specs/2026-05-20-lotto-active-agent-design.md
File Structure
| 경로 | 작업 | 책임 |
|---|---|---|
agent-office/app/curator/signals.py |
Create | 3종 메트릭 평가 + adaptive baseline (순수 함수) |
agent-office/app/curator/signal_runner.py |
Create | DB I/O + cron 진입점 + 알림 결정 |
agent-office/app/db.py |
Modify | lotto_signals, lotto_baselines 테이블 + CRUD |
agent-office/app/service_proxy.py |
Modify | lotto_best(), lotto_strategy_weights() 추가 |
agent-office/app/agents/lotto.py |
Modify | on_signal_check(), on_daily_digest() 추가 |
agent-office/app/scheduler.py |
Modify | cron 4개 등록 |
agent-office/app/notifiers/telegram_lotto.py |
Modify | send_urgent_signal(), send_signal_summary() |
agent-office/app/main.py |
Modify | endpoint 3개 |
agent-office/app/config.py |
Modify | env vars 7개 |
agent-office/tests/test_lotto_signals.py |
Create | signals.py 단위 테스트 |
agent-office/tests/test_lotto_signal_runner.py |
Create | signal_runner DB I/O 통합 테스트 |
agent-office/tests/test_lotto_telegram_signal.py |
Create | 텔레그램 메시지 포맷 테스트 |
web-backend/CLAUDE.md |
Modify | agent-office 섹션 갱신 |
Phase 1 — 순수 함수 + DB 마이그레이션
Task 1: signals.py 메트릭 함수 단위 테스트
Files:
-
Create:
agent-office/tests/test_lotto_signals.py -
Step 1: Write failing tests for Sim Consensus
# agent-office/tests/test_lotto_signals.py
import math
import pytest
from app.curator import signals
def test_sim_consensus_top10_geomean():
"""top-10 consensus 평균이 기하평균 기반인지."""
best_picks = [
{"scores": [10, 10, 10, 10, 10]}, # high & uniform
{"scores": [9, 9, 9, 9, 9]},
{"scores": [8, 8, 8, 8, 8]},
{"scores": [7, 7, 7, 7, 7]},
{"scores": [6, 6, 6, 6, 6]},
{"scores": [5, 5, 5, 5, 5]},
{"scores": [4, 4, 4, 4, 4]},
{"scores": [3, 3, 3, 3, 3]},
{"scores": [2, 2, 2, 2, 2]},
{"scores": [1, 1, 1, 1, 1]}, # top 10
{"scores": [0, 0, 0, 0, 0]}, # bottom 10
] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10
# 처음 10개가 top, 다 normalize 후 consensus 동일하니까 top10 평균은 명확
result = signals.sim_consensus_score(best_picks)
assert 0.0 <= result <= 1.0
# uniform 분포에선 top10 평균이 단조 감소 분포의 상반부 평균과 같아야 함
assert result > 0.4
def test_sim_consensus_geomean_penalizes_imbalance():
"""5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다."""
balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20
imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20 # 평균은 같지만 한 점수만 폭주
s_balanced = signals.sim_consensus_score(balanced)
s_imbalanced = signals.sim_consensus_score(imbalanced)
# imbalanced는 normalize 후 한 차원만 1.0, 나머지 0.0 → 기하평균 0
assert s_imbalanced < s_balanced
def test_strategy_drift_score():
"""drift = 전략별 가중치 변화 절댓값 합."""
w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45}
w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40}
# |0.10| + |-0.05| + |-0.05| = 0.20
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.20) < 1e-9
def test_strategy_drift_new_strategy_appears():
"""이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산."""
w_prev = {"gap_focus": 0.5, "hot_focus": 0.5}
w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2}
# |0.5-0.4| + |0.5-0.4| + |0-0.2| = 0.4
result = signals.strategy_drift_score(w_prev, w_curr)
assert abs(result - 0.4) < 1e-9
def test_confidence_score_passthrough():
"""confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인)."""
assert signals.confidence_score({"confidence": 0.85}) == 0.85
assert signals.confidence_score({"confidence": 1.2}) == 1.0 # clamp
assert signals.confidence_score({"confidence": -0.1}) == 0.0
assert signals.confidence_score({}) is None
def test_adaptive_baseline_cold_start():
"""window 크기 < 4 → warmup, z=None."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8)
z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5)
assert fire == "warmup"
assert z is None
def test_adaptive_baseline_preparing():
"""window 4~7 → 보수적 임계치 z=2.0."""
bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8)
# value=1.0 → mu=1.0, sigma=0 → div0 처리 필요. preparing 단계라 임계치는 보수적.
z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5)
# sigma=0인 경우 처리 — value > mu면 무한대로 보고 urgent? 또는 z계산 불가 → warmup?
# 결정: sigma == 0이면 (window 분산 없음) z=inf 취급, value > mu면 urgent로
assert fire in ("normal", "urgent")
def test_adaptive_baseline_normal_window_full():
"""window 8 풀, value가 평균보다 1.5σ 이상이면 normal."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
# μ ≈ 1.0, σ ≈ 0.08 → z=1.5는 value ≈ 1.12
z, fire = bl.evaluate(value=1.20, z_normal=1.5, z_urgent=2.5)
assert fire == "normal"
assert z is not None and z >= 1.5
def test_adaptive_baseline_urgent():
"""z >= 2.5 → urgent."""
bl = signals.AdaptiveBaseline(
window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
window_max=8,
)
z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5)
assert fire == "urgent"
def test_adaptive_baseline_push_updates_window():
"""push 시 FIFO 동작."""
bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8)
bl.push(9.0)
assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0]
def test_decide_fire_level_combination():
"""2개 이상 normal 발화 → urgent."""
# 2개 normal z ≥ 1.5
sigs = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 1.7, "fire": "normal"},
{"metric": "conf", "z": 0.5, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs) == "urgent"
# 1개 normal
sigs2 = [
{"metric": "sim", "z": 1.6, "fire": "normal"},
{"metric": "drift", "z": 0.3, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs2) == "normal"
# 단일 극값 z ≥ 2.5 → urgent
sigs3 = [
{"metric": "sim", "z": 3.0, "fire": "urgent"},
{"metric": "drift", "z": 0.2, "fire": "noop"},
]
assert signals.decide_overall_fire(sigs3) == "urgent"
# 전부 noop
sigs4 = [{"metric": "sim", "z": 0.5, "fire": "noop"}]
assert signals.decide_overall_fire(sigs4) == "noop"
- Step 2: Run test to verify it fails
Run: cd agent-office && pytest tests/test_lotto_signals.py -v
Expected: FAIL with ModuleNotFoundError: No module named 'app.curator.signals'
- Step 3: Commit failing tests
git add agent-office/tests/test_lotto_signals.py
git commit -m "test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트"
Task 2: signals.py 구현
Files:
-
Create:
agent-office/app/curator/signals.py -
Step 1: Implement signals module
# agent-office/app/curator/signals.py
"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수).
DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list.
signal_runner.py에서 DB 연동 + cron 진입점 담당.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from statistics import mean, pstdev
from typing import Any, Dict, List, Optional, Tuple
# ---------- Metric: Sim Consensus ----------
def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
"""20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수."""
if not picks:
return []
n_metrics = len(picks[0]["scores"])
columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)]
norms_per_col = []
for col in columns:
lo, hi = min(col), max(col)
rng = hi - lo
if rng == 0:
norms_per_col.append([0.5] * len(col)) # 모두 동일하면 중립 0.5
else:
norms_per_col.append([(v - lo) / rng for v in col])
# 전치: 후보별 정규화 점수 리스트
return [
[norms_per_col[k][i] for k in range(n_metrics)]
for i in range(len(picks))
]
def _geomean(values: List[float]) -> float:
"""기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티)."""
if not values:
return 0.0
if any(v <= 0 for v in values):
return 0.0
log_sum = sum(math.log(v) for v in values)
return math.exp(log_sum / len(values))
def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
"""top-10 후보의 기하평균 consensus 평균.
Args:
best_picks: [{"numbers": [...], "scores": [s1, s2, s3, s4, s5]}, ...]
"""
if not best_picks:
return 0.0
normalized = _normalize_columns(best_picks)
consensus = [_geomean(scores) for scores in normalized]
consensus.sort(reverse=True)
top = consensus[:10] if len(consensus) >= 10 else consensus
return mean(top) if top else 0.0
# ---------- Metric: Strategy Drift ----------
def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
"""가중치 변화 절댓값 합. 신규/소멸 전략도 가산."""
keys = set(prev) | set(curr)
return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)
# ---------- Metric: Confidence ----------
def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
"""큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None."""
if "confidence" not in curate_result:
return None
v = float(curate_result["confidence"])
return max(0.0, min(1.0, v))
# ---------- Adaptive Baseline ----------
@dataclass
class AdaptiveBaseline:
window: List[float] = field(default_factory=list)
window_max: int = 8
last_pushed_draw_no: Optional[int] = None # 회차 단위 메트릭 중복 push 방지
@property
def size(self) -> int:
return len(self.window)
@property
def mu(self) -> float:
return mean(self.window) if self.window else 0.0
@property
def sigma(self) -> float:
# population stdev (8개 윈도우라 ddof=0이 안정)
return pstdev(self.window) if len(self.window) >= 2 else 0.0
def push(self, value: float, draw_no: Optional[int] = None) -> None:
"""FIFO push. window_max 초과 시 가장 오래된 값 제거."""
self.window.append(float(value))
if len(self.window) > self.window_max:
self.window = self.window[-self.window_max:]
if draw_no is not None:
self.last_pushed_draw_no = draw_no
def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
"""z-score 계산 + fire_level 판정.
Returns:
(z_score, fire_level) — z_score는 cold start/warmup이면 None.
fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'}
"""
if self.size < 4:
return None, "warmup"
# 보수적 임계치 (window 4~7)
z_normal_eff = 2.0 if self.size < self.window_max else z_normal
z_urgent_eff = z_urgent
if self.sigma == 0:
# 분산 없음 — value가 평균을 크게 벗어나면 urgent로 분류, 아니면 noop
return (None, "urgent") if value > self.mu else (None, "noop")
z = (value - self.mu) / self.sigma
if z >= z_urgent_eff:
return z, "urgent"
if z >= z_normal_eff:
return z, "normal"
return z, "noop"
# ---------- Combined fire decision ----------
def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
"""3종 시그널을 종합해 전체 fire_level 결정.
Args:
signal_results: [{"metric": str, "z": float|None, "fire": str}, ...]
Returns:
'noop' | 'normal' | 'urgent'
"""
fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")]
if any(s["fire"] == "urgent" for s in fires):
return "urgent"
if len(fires) >= 2:
return "urgent" # 2개 이상 normal → urgent로 승격
if len(fires) == 1:
return "normal"
return "noop"
- Step 2: Run tests to verify they pass
Run: cd agent-office && pytest tests/test_lotto_signals.py -v
Expected: All 9 tests PASS
- Step 3: Commit
git add agent-office/app/curator/signals.py
git commit -m "feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현"
Task 3: DB 마이그레이션 (lotto_signals + lotto_baselines)
Files:
-
Modify:
agent-office/app/db.py:19(init_db 함수 내) -
Step 1: Add table creation to init_db
init_db() 함수의 마지막 INSERT OR IGNORE 루프 직전(line 100, youtube_research_jobs 테이블 뒤)에 다음 추가:
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
triggered_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
source TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
baseline_mu REAL,
baseline_sigma REAL,
z_score REAL,
fire_level TEXT NOT NULL,
notified_at TEXT,
payload TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_triggered
ON lotto_signals(triggered_at DESC)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ls_fire
ON lotto_signals(fire_level, notified_at)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS lotto_baselines (
metric TEXT PRIMARY KEY,
window_values TEXT NOT NULL DEFAULT '[]',
mu REAL NOT NULL DEFAULT 0.0,
sigma REAL NOT NULL DEFAULT 0.0,
last_pushed_draw_no INTEGER,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
- Step 2: Add CRUD helpers at end of db.py
db.py 파일 맨 아래에 추가:
# --- lotto_signals / lotto_baselines CRUD ---
def insert_lotto_signal(
source: str,
metric: str,
value: float,
baseline_mu: Optional[float],
baseline_sigma: Optional[float],
z_score: Optional[float],
fire_level: str,
payload: Optional[Dict[str, Any]] = None,
) -> int:
with _conn() as conn:
cur = conn.execute(
"""
INSERT INTO lotto_signals
(source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
source, metric, value,
baseline_mu, baseline_sigma, z_score, fire_level,
json.dumps(payload or {}, ensure_ascii=False),
),
)
return cur.lastrowid
def mark_signal_notified(signal_id: int) -> None:
with _conn() as conn:
conn.execute(
"UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
(signal_id,),
)
def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
"""지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent."""
levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
placeholders = ",".join("?" * len(levels))
with _conn() as conn:
rows = conn.execute(
f"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level IN ({placeholders})
ORDER BY triggered_at DESC
""",
(f"-{int(hours)} hours", *levels),
).fetchall()
return [dict(r) for r in rows]
def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
"""차트/이력 페이지용 — 모든 fire_level 포함."""
with _conn() as conn:
rows = conn.execute(
"""
SELECT * FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
ORDER BY triggered_at DESC
""",
(f"-{int(days)} days",),
).fetchall()
return [dict(r) for r in rows]
def get_recent_urgent_count(hours: int = 24) -> int:
with _conn() as conn:
row = conn.execute(
"""
SELECT COUNT(*) AS c FROM lotto_signals
WHERE triggered_at >= datetime('now', ?)
AND fire_level = 'urgent'
AND notified_at IS NOT NULL
""",
(f"-{int(hours)} hours",),
).fetchone()
return int(row["c"]) if row else 0
def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
"""같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용."""
with _conn() as conn:
row = conn.execute(
"""
SELECT notified_at FROM lotto_signals
WHERE metric = ?
AND fire_level = ?
AND notified_at IS NOT NULL
AND notified_at >= datetime('now', ?)
ORDER BY notified_at DESC LIMIT 1
""",
(metric, fire_level, f"-{int(hours)} hours"),
).fetchone()
return row["notified_at"] if row else None
def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM lotto_baselines WHERE metric = ?",
(metric,),
).fetchone()
if not row:
return None
d = dict(row)
d["window_values"] = json.loads(d["window_values"])
return d
def upsert_baseline(
metric: str,
window_values: List[float],
mu: float,
sigma: float,
last_pushed_draw_no: Optional[int],
) -> None:
with _conn() as conn:
conn.execute(
"""
INSERT INTO lotto_baselines
(metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
ON CONFLICT(metric) DO UPDATE SET
window_values = excluded.window_values,
mu = excluded.mu,
sigma = excluded.sigma,
last_pushed_draw_no = excluded.last_pushed_draw_no,
updated_at = excluded.updated_at
""",
(
metric,
json.dumps(window_values),
mu, sigma, last_pushed_draw_no,
),
)
def get_all_baselines() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
out = []
for r in rows:
d = dict(r)
d["window_values"] = json.loads(d["window_values"])
out.append(d)
return out
- Step 3: Smoke test the migration
새 임시 DB로 마이그레이션이 동작하는지:
cd agent-office && python -c "
import os
os.environ['AGENT_OFFICE_DB_PATH'] = '/tmp/test_migration.db'
from app.db import init_db, insert_lotto_signal, get_baseline, upsert_baseline
init_db()
sid = insert_lotto_signal('light','sim_signal', 1.5, 1.0, 0.2, 2.5, 'urgent', {'top':[1,2,3]})
print('signal id:', sid)
upsert_baseline('sim_signal', [1.0, 1.1, 0.9], 1.0, 0.1, None)
print('baseline:', get_baseline('sim_signal'))
"
rm /tmp/test_migration.db
Expected: signal id가 1 출력, baseline dict 출력 (window_values는 list)
- Step 4: Commit
git add agent-office/app/db.py
git commit -m "feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD"
Task 4: signal_runner.py — DB I/O + 평가 orchestration
Files:
-
Create:
agent-office/app/curator/signal_runner.py -
Create:
agent-office/tests/test_lotto_signal_runner.py -
Step 1: Write failing test for signal_runner
# agent-office/tests/test_lotto_signal_runner.py
import os
import pytest
os.environ["AGENT_OFFICE_DB_PATH"] = "/tmp/test_signal_runner.db"
from app.curator import signal_runner
from app import db
@pytest.fixture(autouse=True)
def fresh_db():
if os.path.exists("/tmp/test_signal_runner.db"):
os.remove("/tmp/test_signal_runner.db")
db.init_db()
yield
if os.path.exists("/tmp/test_signal_runner.db"):
os.remove("/tmp/test_signal_runner.db")
def test_evaluate_and_persist_cold_start():
"""첫 호출은 warmup으로 기록되고 baseline은 비어있다."""
result = signal_runner.evaluate_metric_and_persist(
source="light",
metric="sim_signal",
value=1.5,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] == "warmup"
assert result["z_score"] is None
bl = db.get_baseline("sim_signal")
assert bl is not None
assert bl["window_values"] == [1.5]
def test_evaluate_after_window_filled_normal_fire():
"""8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal."""
for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]:
signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=v,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
# 9번째: window는 [v2..v8, 1.0]이지만 push 후 평가. 평균≈1.0, σ≈0.08 → z 큰 값
result = signal_runner.evaluate_metric_and_persist(
source="sim",
metric="sim_signal",
value=1.20,
draw_no=None,
z_normal=1.5,
z_urgent=2.5,
push_to_window=True,
)
assert result["fire_level"] in ("normal", "urgent")
assert result["z_score"] is not None and result["z_score"] >= 1.5
def test_evaluate_drift_skips_same_draw_push():
"""drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X."""
# 첫 push (draw 1100)
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.05, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_before = db.get_baseline("drift")
assert bl_before["window_values"] == [0.05]
assert bl_before["last_pushed_draw_no"] == 1100
# 같은 회차 두 번째 호출 — window push X
signal_runner.evaluate_metric_and_persist(
source="sim", metric="drift", value=0.08, draw_no=1100,
z_normal=1.5, z_urgent=2.5, push_to_window=True,
)
bl_after = db.get_baseline("drift")
assert bl_after["window_values"] == [0.05] # 변경 없음
def test_run_signal_check_aggregates_three_metrics(monkeypatch):
"""run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환."""
# mock service_proxy
async def fake_lotto_best():
# 동일한 5종 점수의 균등 후보 20개
return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20
async def fake_lotto_strategy_weights():
return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3}
monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best)
monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights)
import asyncio
out = asyncio.get_event_loop().run_until_complete(
signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101)
)
assert "overall_fire" in out
assert "results" in out
assert any(r["metric"] == "sim_signal" for r in out["results"])
assert any(r["metric"] == "drift" for r in out["results"])
# light_check는 confidence 평가 안 함
assert not any(r["metric"] == "confidence" for r in out["results"])
- Step 2: Run test to verify it fails
Run: cd agent-office && pytest tests/test_lotto_signal_runner.py -v
Expected: FAIL ModuleNotFoundError: No module named 'app.curator.signal_runner'
- Step 3: Implement signal_runner.py
# agent-office/app/curator/signal_runner.py
"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from .. import db
from .. import service_proxy
from . import signals
logger = logging.getLogger("agent-office.lotto-signals")
# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교)
DRAW_SCOPED_METRICS = {"drift", "confidence"}
def _load_baseline(metric: str) -> signals.AdaptiveBaseline:
row = db.get_baseline(metric)
if row is None:
return signals.AdaptiveBaseline(window=[], window_max=8)
return signals.AdaptiveBaseline(
window=list(row["window_values"]),
window_max=8,
last_pushed_draw_no=row.get("last_pushed_draw_no"),
)
def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None:
db.upsert_baseline(
metric=metric,
window_values=bl.window,
mu=bl.mu,
sigma=bl.sigma,
last_pushed_draw_no=bl.last_pushed_draw_no,
)
def evaluate_metric_and_persist(
source: str,
metric: str,
value: float,
draw_no: Optional[int],
z_normal: float,
z_urgent: float,
push_to_window: bool,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신.
회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략.
"""
bl = _load_baseline(metric)
# 회차 가드: 같은 회차 재평가 시 window push 생략
do_push = push_to_window
if metric in DRAW_SCOPED_METRICS and draw_no is not None:
if bl.last_pushed_draw_no == draw_no:
do_push = False
# 평가는 push 전 baseline 기준 (현재값 vs 과거 분포)
z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent)
if do_push:
bl.push(value=value, draw_no=draw_no)
_save_baseline(metric, bl)
sid = db.insert_lotto_signal(
source=source,
metric=metric,
value=value,
baseline_mu=bl.mu if bl.size > 0 else None,
baseline_sigma=bl.sigma if bl.size >= 2 else None,
z_score=z,
fire_level=fire,
payload=payload,
)
return {
"signal_id": sid,
"metric": metric,
"value": value,
"z_score": z,
"fire_level": fire,
"payload": payload or {},
}
# ---------- Service proxy thin wrappers (monkeypatch 대상) ----------
async def _fetch_best_picks() -> List[Dict[str, Any]]:
return await service_proxy.lotto_best()
async def _fetch_strategy_weights() -> Dict[str, float]:
return await service_proxy.lotto_strategy_weights()
# ---------- Orchestrator ----------
async def run_signal_check(
source: str,
z_normal: float = 1.5,
z_urgent: float = 2.5,
curate_result: Optional[Dict[str, Any]] = None,
current_draw_no: Optional[int] = None,
) -> Dict[str, Any]:
"""cron 진입점. source ∈ {'light', 'sim', 'deep'}.
light/sim: Sim Consensus + Strategy Drift 평가
deep: 위 2종 + Confidence (curate_result 필요)
"""
results: List[Dict[str, Any]] = []
# --- Sim Consensus ---
try:
best = await _fetch_best_picks()
v = signals.sim_consensus_score(best)
results.append(
evaluate_metric_and_persist(
source=source, metric="sim_signal",
value=v, draw_no=None,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"top_count": min(len(best), 10)},
)
)
except Exception as e:
logger.warning(f"sim_consensus 평가 실패: {e}")
# --- Strategy Drift (회차 단위) ---
try:
w_curr = await _fetch_strategy_weights()
bl_drift = _load_baseline("drift")
# drift는 prev → curr 비교가 필요. baseline window의 마지막 entry는 이전 drift값.
# prev_weights는 별도 저장이 필요한데, 간단히 lotto_baselines.payload에 안 두고
# 매 호출마다 GET 후 캐시되지 않은 직전 값과 비교 불가 → drift = 1회차 단위.
# 단순화: 이번 호출의 가중치를 baseline에 저장하기 전 직전 저장값을 prev로 사용.
prev_payload_row = db.get_baseline("drift_weights_cache")
w_prev = prev_payload_row["window_values"] if prev_payload_row else None
if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict):
# last entry is dict of weights
prev_dict = w_prev[-1]
drift_value = signals.strategy_drift_score(prev_dict, w_curr)
results.append(
evaluate_metric_and_persist(
source=source, metric="drift",
value=drift_value, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"weights_now": w_curr, "weights_prev": prev_dict},
)
)
# weights 캐시 갱신 (FIFO 2개만 보관)
cache_window = (w_prev or []) + [w_curr]
if len(cache_window) > 2:
cache_window = cache_window[-2:]
db.upsert_baseline(
metric="drift_weights_cache",
window_values=cache_window,
mu=0.0, sigma=0.0,
last_pushed_draw_no=current_draw_no,
)
except Exception as e:
logger.warning(f"strategy_drift 평가 실패: {e}")
# --- Confidence (deep_check + curate_result 필수) ---
if source == "deep" and curate_result is not None:
try:
cv = signals.confidence_score(curate_result)
if cv is not None:
results.append(
evaluate_metric_and_persist(
source=source, metric="confidence",
value=cv, draw_no=current_draw_no,
z_normal=z_normal, z_urgent=z_urgent,
push_to_window=True,
payload={"draw_no": current_draw_no},
)
)
except Exception as e:
logger.warning(f"confidence 평가 실패: {e}")
overall = signals.decide_overall_fire(
[{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results]
)
return {"overall_fire": overall, "results": results}
- Step 4: Add lotto_best + lotto_strategy_weights to service_proxy.py
agent-office/app/service_proxy.py 파일 맨 아래 추가:
async def lotto_best() -> List[Dict[str, Any]]:
"""GET /api/lotto/best — best_picks 20개 (numbers + scores 5종)."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best")
resp.raise_for_status()
data = resp.json()
items = data.get("items") if isinstance(data, dict) else data
return items or []
async def lotto_strategy_weights() -> Dict[str, float]:
"""GET /api/lotto/strategy/weights — 전략별 가중치 dict."""
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights")
resp.raise_for_status()
data = resp.json()
weights = data.get("weights") if isinstance(data, dict) else data
# API 응답 형태가 [{strategy, weight, ...}] 일 수 있어 dict로 정규화
if isinstance(weights, list):
return {item["strategy"]: float(item["weight"]) for item in weights}
return {k: float(v) for k, v in (weights or {}).items()}
- Step 5: Run tests to verify they pass
Run: cd agent-office && pytest tests/test_lotto_signal_runner.py -v
Expected: All 4 tests PASS
- Step 6: Commit
git add agent-office/app/curator/signal_runner.py \
agent-office/app/service_proxy.py \
agent-office/tests/test_lotto_signal_runner.py
git commit -m "feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers"
Phase 2 — cron 통합 (텔레그램 X, DB 기록만)
Task 5: config.py env vars 7개 추가
Files:
-
Modify:
agent-office/app/config.py(파일 끝) -
Step 1: Append env var section
agent-office/app/config.py 끝에 추가:
# Lotto Active Signals
LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8"))
LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5"))
LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5"))
LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
- Step 2: Smoke check
cd agent-office && python -c "from app.config import LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR; print(LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR)"
Expected: 1.5 9
- Step 3: Commit
git add agent-office/app/config.py
git commit -m "feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle)"
Task 6: LottoAgent에 on_signal_check / on_daily_digest 추가 (텔레그램 발송 X)
Files:
-
Modify:
agent-office/app/agents/lotto.py -
Step 1: Extend LottoAgent class
agent-office/app/agents/lotto.py의 on_command()를 다음으로 교체 (signal check 액션 추가):
async def on_command(self, action: str, params: dict) -> dict:
if action in ("curate_now", "curate_weekly"):
return await self._run(source="manual")
if action == "status":
return {"ok": True, "message": f"{self.state}: {self.state_detail}"}
if action in ("signal_check", "light_check", "sim_check", "deep_check"):
return await self.run_signal_check(source=action.replace("_check", "") if action != "signal_check" else "light")
if action == "daily_digest":
return await self.run_daily_digest()
return {"ok": False, "message": f"unknown action: {action}"}
그리고 클래스 끝에 신규 메서드 추가:
async def run_signal_check(self, source: str = "light") -> dict:
"""비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후).
Phase 2에선 텔레그램 발송 안 함. lotto_signals INSERT만.
Phase 3에서 텔레그램 트리거 추가.
"""
from ..curator.signal_runner import run_signal_check
from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT
from ..db import add_log
if self.state not in ("idle", "reporting"):
return {"ok": False, "message": f"busy ({self.state})"}
try:
curate_result: dict | None = None
current_draw_no: int | None = None
if source == "deep":
# deep_check은 큐레이션을 먼저 수행 (LLM 호출)
from ..curator.pipeline import curate_weekly
cw = await curate_weekly(source="signal_deep")
curate_result = cw.get("payload") or cw # confidence 키 필요
current_draw_no = cw.get("draw_no")
outcome = await run_signal_check(
source=source,
z_normal=LOTTO_Z_NORMAL,
z_urgent=LOTTO_Z_URGENT,
curate_result=curate_result,
current_draw_no=current_draw_no,
)
add_log(
self.agent_id,
f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
)
return {"ok": True, **outcome}
except Exception as e:
add_log(self.agent_id, f"signal_check 예외: {e}", level="error")
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_daily_digest(self) -> dict:
"""Phase 2: 발화 카운트만 반환. Phase 3에서 텔레그램 발송 추가."""
from ..db import get_recent_lotto_signals, add_log
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
add_log(self.agent_id, f"daily_digest: 지난 24h 발화 {len(sigs)}건")
return {"ok": True, "count": len(sigs), "signals": sigs}
- Step 2: Smoke import check
cd agent-office && python -c "from app.agents.lotto import LottoAgent; print(LottoAgent.agent_id)"
Expected: lotto
- Step 3: Commit
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)"
Task 7: scheduler.py에 cron 4개 추가
Files:
-
Modify:
agent-office/app/scheduler.py -
Step 1: Add cron job functions + registrations
기존 _run_lotto_schedule 근처에 함수 4개 추가:
async def _run_lotto_light_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="light")
async def _run_lotto_sim_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="sim")
async def _run_lotto_deep_check():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_signal_check(source="deep")
async def _run_lotto_daily_digest():
agent = AGENT_REGISTRY.get("lotto")
if agent:
await agent.run_daily_digest()
기존 lotto_curate cron 등록(line 75) 직후에 4개 추가:
scheduler.add_job(_run_lotto_light_check, "cron", hour=9, minute=15, id="lotto_light_check")
scheduler.add_job(_run_lotto_sim_check, "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
scheduler.add_job(_run_lotto_deep_check, "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
# digest hour/min은 동적 (env)이지만 cron 등록은 상수로 — 9:25 고정. 변경 필요 시 컨테이너 재시작.
scheduler.add_job(_run_lotto_daily_digest, "cron", hour=9, minute=25, id="lotto_digest")
- Step 2: Verify import + cron registration
cd agent-office && python -c "
from app.scheduler import _run_lotto_light_check, _run_lotto_sim_check, _run_lotto_deep_check, _run_lotto_daily_digest
print('all imports ok')
"
Expected: all imports ok
- Step 3: Commit
git add agent-office/app/scheduler.py
git commit -m "feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)"
Phase 3 — 텔레그램 알림 + Throttle + API endpoints
Task 8: 텔레그램 메시지 폼 단위 테스트
Files:
-
Create:
agent-office/tests/test_lotto_telegram_signal.py -
Step 1: Write failing tests
# agent-office/tests/test_lotto_telegram_signal.py
from app.notifiers.telegram_lotto import (
_format_urgent_signal,
_format_signal_digest,
)
def test_urgent_signal_format_basic():
event = {
"fire_level": "urgent",
"triggered_at": "2026-05-20T07:18:00.000Z",
"results": [
{"metric": "sim_signal", "value": 1.84, "z_score": 3.9,
"baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {}},
{"metric": "drift", "value": 0.18, "z_score": 3.0,
"baseline_mu": 0.06, "baseline_sigma": 0.04,
"payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5}}},
],
}
text = _format_urgent_signal(event)
assert "🚨" in text
assert "Sim Consensus" in text
assert "z=3.9" in text
assert "Strategy Drift" in text or "drift" in text.lower()
def test_signal_digest_format_with_signals():
digest = {
"evaluated": 6,
"fired": 2,
"signals": [
{"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7,
"triggered_at": "2026-05-20T16:18:00Z", "payload": {}},
{"metric": "confidence", "fire_level": "normal", "z_score": 1.6,
"triggered_at": "2026-05-20T09:05:00Z", "payload": {}},
],
"weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08},
}
text = _format_signal_digest(digest)
assert "📊" in text
assert "지난 24h" in text
assert "z=1.7" in text
def test_signal_digest_empty_returns_empty_string():
"""발화 0건이면 빈 문자열 → 발송 자체 skip 가능."""
text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}})
assert text == ""
- Step 2: Run test to verify it fails
Run: cd agent-office && pytest tests/test_lotto_telegram_signal.py -v
Expected: FAIL ImportError: cannot import name '_format_urgent_signal'
- Step 3: Implement telegram_lotto.py additions
agent-office/app/notifiers/telegram_lotto.py에 추가 (파일 끝):
# ---------- 능동 시그널 알림 (urgent + digest) ----------
_METRIC_LABEL = {
"sim_signal": "Sim Consensus",
"drift": "Strategy Drift",
"confidence": "Confidence",
}
def _format_urgent_signal(event: Dict[str, Any]) -> str:
"""긴급 시그널 텔레그램 메시지 포맷."""
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
results = event.get("results", [])
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
lines = [
"🚨 로또 능동 신호",
"",
f"[{triggered}]",
f"강한 시그널 {len(fired)}종 발화:",
]
for r in fired:
label = _METRIC_LABEL.get(r["metric"], r["metric"])
v = r.get("value")
mu = r.get("baseline_mu")
sigma = r.get("baseline_sigma")
z = r.get("z_score")
if mu is not None and sigma is not None:
lines.append(f"• {label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
else:
lines.append(f"• {label} {v:.2f}")
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
for r in fired:
if r["metric"] == "drift":
wn = (r.get("payload") or {}).get("weights_now") or {}
wp = (r.get("payload") or {}).get("weights_prev") or {}
if wn and wp:
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
lines.append("")
lines.append(f"요인: {detail}")
break
lines.append("")
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
return "\n".join(lines)
def _format_signal_digest(digest: Dict[str, Any]) -> str:
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
fired = int(digest.get("fired", 0))
if fired == 0:
return ""
signals_list = digest.get("signals", [])
evaluated = digest.get("evaluated", 0)
lines = [
"📊 로또 일일 요약 (지난 24h)",
"",
f"평가 {evaluated}회 / 발화 {fired}회",
]
for s in signals_list:
label = _METRIC_LABEL.get(s["metric"], s["metric"])
z = s.get("z_score")
when = (s.get("triggered_at") or "")[11:16] # HH:MM
z_text = f"z={z:.1f}" if z is not None else "z=-"
lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text} ({when})")
weights_trend = digest.get("weights_trend") or {}
if weights_trend:
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→")
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
return "\n".join(lines)
async def send_urgent_signal(event: Dict[str, Any]) -> None:
text = _format_urgent_signal(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
async def send_signal_summary(digest: Dict[str, Any]) -> None:
text = _format_signal_digest(digest)
if not text:
return # 발화 0건이면 발송 skip
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")
- Step 4: Run tests to verify they pass
Run: cd agent-office && pytest tests/test_lotto_telegram_signal.py -v
Expected: All 3 tests PASS
- Step 5: Commit
git add agent-office/app/notifiers/telegram_lotto.py \
agent-office/tests/test_lotto_telegram_signal.py
git commit -m "feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷"
Task 9: signal_check이 텔레그램 발송 트리거 + throttle 통합
Files:
-
Modify:
agent-office/app/agents/lotto.py -
Step 1: Update run_signal_check to fire telegram on urgent
run_signal_check()의 마지막 return {"ok": True, **outcome} 직전에 추가:
# --- Throttle + 텔레그램 urgent 발송 ---
from ..config import (
LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX,
)
from ..db import (
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
if outcome["overall_fire"] == "urgent":
# daily cap 체크
if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
add_log(
self.agent_id,
f"urgent daily cap 도달 → normal로 강등 (digest 합류)",
level="warning",
)
else:
# throttle: 어느 메트릭이라도 6시간 이내 같은 fire_level 알림이 있으면 skip
blocked = False
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
last = get_last_signal_notification(
metric=r["metric"], fire_level=r["fire_level"],
hours=LOTTO_THROTTLE_HOURS,
)
if last:
blocked = True
break
if not blocked:
from datetime import datetime, timezone
event = {
"fire_level": "urgent",
"triggered_at": datetime.now(timezone.utc).isoformat(),
"results": outcome["results"],
}
await send_urgent_signal(event)
# 모든 발화 시그널에 대해 notified_at 마킹
for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"):
mark_signal_notified(r["signal_id"])
add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)")
- Step 2: Update run_daily_digest to actually send telegram
run_daily_digest()를 다음으로 교체:
async def run_daily_digest(self) -> dict:
"""일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통."""
from ..db import get_recent_lotto_signals, get_signals_history, add_log, get_all_baselines
from ..notifiers.telegram_lotto import send_signal_summary
sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
# 전체 평가 횟수 (noop+warmup 포함)
total_24h = get_signals_history(days=1)
evaluated = len(total_24h)
# weights_trend: drift_weights_cache에서 prev/curr 차이 계산
trend = {}
try:
from ..db import get_baseline
cache = get_baseline("drift_weights_cache")
if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
prev_w = cache["window_values"][-2]
curr_w = cache["window_values"][-1]
trend = {
k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
for k in (set(prev_w) | set(curr_w))
}
except Exception as e:
add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning")
digest = {
"evaluated": evaluated,
"fired": len(sigs),
"signals": sigs,
"weights_trend": trend,
}
await send_signal_summary(digest)
add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}")
return {"ok": True, **digest}
- Step 3: Smoke import check
cd agent-office && python -c "
import asyncio
from app.agents.lotto import LottoAgent
agent = LottoAgent()
print('methods:', [m for m in dir(agent) if 'signal' in m or 'digest' in m])
"
Expected: methods: ['run_daily_digest', 'run_signal_check']
- Step 4: Commit
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 전송"
Task 10: FastAPI endpoint 3개 추가
Files:
-
Modify:
agent-office/app/main.py -
Step 1: Add endpoints
agent-office/app/main.py에서 다른 lotto 관련 endpoint 근처에 추가:
@app.get("/api/agent-office/lotto/signals")
async def list_lotto_signals(days: int = 7):
"""시그널 이력 (모든 fire_level)."""
from .db import get_signals_history
return {"items": get_signals_history(days=days)}
@app.get("/api/agent-office/lotto/baselines")
async def list_lotto_baselines():
"""현재 baseline μ/σ + window 상태."""
from .db import get_all_baselines
return {"items": get_all_baselines()}
@app.post("/api/agent-office/lotto/signal-check")
async def trigger_signal_check(source: str = "light"):
"""수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}."""
if source not in ("light", "sim", "deep"):
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="source must be light/sim/deep")
agent = AGENT_REGISTRY.get("lotto")
if not agent:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="lotto agent not registered")
return await agent.run_signal_check(source=source)
- Step 2: Verify endpoints
cd agent-office && python -c "
from app.main import app
routes = [r.path for r in app.routes if 'lotto' in r.path and 'signal' in r.path or 'baseline' in r.path]
print(routes)
"
Expected: 3 routes listed including /api/agent-office/lotto/signals, /baselines, /signal-check
- Step 3: Commit
git add agent-office/app/main.py
git commit -m "feat(lotto-signals): GET signals/baselines + POST signal-check endpoint"
Task 11: CLAUDE.md 업데이트 (agent-office 섹션)
Files:
-
Modify:
web-backend/CLAUDE.md -
Step 1: Update agent-office API & scheduler section
web-backend/CLAUDE.md의 agent-office API 표에 다음 행 추가 (적절한 위치):
| GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) |
| GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 |
| POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) |
그리고 "스케줄러 job" 항목에 추가:
- 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가)
- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시)
- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가)
- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통)
또한 새 환경변수 명시:
- `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8)
- `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5)
- `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5)
- `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간)
- `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통)
- Step 2: Verify
git -C C:/Users/jaeoh/Desktop/workspace/web-backend diff CLAUDE.md | head -50
- Step 3: Commit
git add CLAUDE.md
git commit -m "docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가"
Task 12: NAS 배포 + 24h 가동 검증 (수동)
- Step 1: git push로 자동 배포
git push
Expected: Gitea webhook → deployer가 docker compose up -d --build 실행 → agent-office 재시작.
- Step 2: 컨테이너 상태 확인
ssh nas "docker logs agent-office --tail 50"
Expected:
-
init_db로그에서lotto_signals,lotto_baselines테이블 생성 성공 메시지 -
scheduler 로그에서
lotto_light_check,lotto_sim_check,lotto_deep_check,lotto_digest4개 job 등록 확인 -
Step 3: 수동 트리거로 즉시 검증
curl -X POST "https://gahusb.synology.me/api/agent-office/lotto/signal-check?source=light"
Expected: {"ok": true, "overall_fire": "warmup", "results": [...]}
(첫 호출은 baseline 비어있어 warmup)
- Step 4: DB에 시그널 들어갔는지 확인
curl "https://gahusb.synology.me/api/agent-office/lotto/signals?days=1"
Expected: {"items": [{"id": 1, "source": "light", "metric": "sim_signal", "fire_level": "warmup", ...}]}
- Step 5: 24시간 가동 후 baseline 누적 확인
24h 후:
curl "https://gahusb.synology.me/api/agent-office/lotto/baselines"
Expected: sim_signal 메트릭의 window_values 길이가 6~7개 (4h 시뮬 6회 + 1회 light)
- Step 6: 텔레그램 발송 검증 (자연 발화 또는 강제)
24h 가동 후에도 자연 발화 없으면 강제 테스트:
LOTTO_Z_NORMAL=0.1로 일시 낮춤 → 재배포 → 다음 sim_check에서 발화- 텔레그램으로 🚨 메시지 도착 확인 후 원복
Self-Review (수행 완료)
1. Spec coverage check
| Spec 섹션 | 구현 task |
|---|---|
| 4.1 Sim Consensus | Task 1, 2 |
| 4.2 Strategy Drift | Task 1, 2 |
| 4.3 Confidence | Task 1, 2 |
| 4.4 Adaptive Baseline | Task 1, 2 |
| 4.5 Trigger × Metric matrix | Task 4 (DRAW_SCOPED_METRICS + source 분기) |
| 4.6 Fire 결정 | Task 1 (decide_overall_fire) |
| 5.1 알림 흐름 | Task 9 |
| 5.2 메시지 폼 | Task 8 |
| 5.3 Throttle | Task 9 |
| 6.1 lotto_signals 테이블 | Task 3 |
| 6.2 lotto_baselines 테이블 | Task 3 |
| 7. API 3종 | Task 10 |
| 8. env vars 7종 | Task 5 |
| 9. cron 4종 | Task 7 |
모두 매핑됨.
2. Placeholder scan: TBD/TODO 없음. 모든 step에 실제 코드 포함.
3. Type consistency:
evaluate_metric_and_persist시그니처는 Task 4 정의 그대로 Task 6에서 호출_format_urgent_signal(event)event 구조:fire_level,triggered_at,results[]→ Task 8 테스트 & Task 9 호출 일치digest구조:evaluated,fired,signals,weights_trend→ Task 8 테스트 & Task 9 호출 일치- DB CRUD 함수명 (
insert_lotto_signal,get_recent_lotto_signals,mark_signal_notified등) — Task 3 정의와 Task 9 호출 일치
이슈 없음.
비목표 (Out of scope)
- Layer B 자동 시뮬 강도 조절 (v2)
- 텔레그램 인라인 키보드 / 사용자 피드백 루프 (v2)
- 핫넘버/콜드넘버 시그널 (노이즈 위험으로 v1 제외)
- 프론트
/lotto/agentUI (web-ui 별도 PR) - Hot/Cold rotation 메트릭 추가