Files
web-page-backend/docs/superpowers/plans/2026-05-20-lotto-active-agent.md
gahusb b4e873b5b0 docs(plan): LottoAgent 능동성 확장 구현 plan (12 tasks, Phase 1-3)
Why: spec (2026-05-20-lotto-active-agent-design.md)을 12개 atomic task
(TDD: 테스트→fail→구현→pass→commit)로 분해. 24h 가동 검증 task 포함.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:26:49 +09:00

57 KiB
Raw Permalink Blame History

LottoAgent 능동성 확장 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: LottoAgent에 다중 트리거 + 적응형 시그널 모니터링 + 텔레그램 능동 알림(긴급/일일 요약) 기능 추가

Architecture: agent-office 단일 컨테이너 안에 curator/signals.py 모듈 신설 + agent_office.db에 2개 테이블 추가. lotto-lab은 read-only GET API만 소비, 변경 없음. cron 4개로 light / sim / deep / digest 분리.

Tech Stack: Python 3.12, FastAPI, APScheduler, SQLite, httpx, pytest, pytest-asyncio

Spec: docs/superpowers/specs/2026-05-20-lotto-active-agent-design.md


File Structure

경로 작업 책임
agent-office/app/curator/signals.py Create 3종 메트릭 평가 + adaptive baseline (순수 함수)
agent-office/app/curator/signal_runner.py Create DB I/O + cron 진입점 + 알림 결정
agent-office/app/db.py Modify lotto_signals, lotto_baselines 테이블 + CRUD
agent-office/app/service_proxy.py Modify lotto_best(), lotto_strategy_weights() 추가
agent-office/app/agents/lotto.py Modify on_signal_check(), on_daily_digest() 추가
agent-office/app/scheduler.py Modify cron 4개 등록
agent-office/app/notifiers/telegram_lotto.py Modify send_urgent_signal(), send_signal_summary()
agent-office/app/main.py Modify endpoint 3개
agent-office/app/config.py Modify env vars 7개
agent-office/tests/test_lotto_signals.py Create signals.py 단위 테스트
agent-office/tests/test_lotto_signal_runner.py Create signal_runner DB I/O 통합 테스트
agent-office/tests/test_lotto_telegram_signal.py Create 텔레그램 메시지 포맷 테스트
web-backend/CLAUDE.md Modify agent-office 섹션 갱신

Phase 1 — 순수 함수 + DB 마이그레이션

Task 1: signals.py 메트릭 함수 단위 테스트

Files:

  • Create: agent-office/tests/test_lotto_signals.py

  • Step 1: Write failing tests for Sim Consensus

# agent-office/tests/test_lotto_signals.py
import math
import pytest

from app.curator import signals


def test_sim_consensus_top10_geomean():
    """top-10 consensus 평균이 기하평균 기반인지."""
    best_picks = [
        {"scores": [10, 10, 10, 10, 10]},  # high & uniform
        {"scores": [9,  9,  9,  9,  9]},
        {"scores": [8,  8,  8,  8,  8]},
        {"scores": [7,  7,  7,  7,  7]},
        {"scores": [6,  6,  6,  6,  6]},
        {"scores": [5,  5,  5,  5,  5]},
        {"scores": [4,  4,  4,  4,  4]},
        {"scores": [3,  3,  3,  3,  3]},
        {"scores": [2,  2,  2,  2,  2]},
        {"scores": [1,  1,  1,  1,  1]},  # top 10
        {"scores": [0,  0,  0,  0,  0]},  # bottom 10
    ] * 1 + [{"scores": [0, 0, 0, 0, 0]}] * 10
    # 처음 10개가 top, 다 normalize 후 consensus 동일하니까 top10 평균은 명확
    result = signals.sim_consensus_score(best_picks)
    assert 0.0 <= result <= 1.0
    # uniform 분포에선 top10 평균이 단조 감소 분포의 상반부 평균과 같아야 함
    assert result > 0.4


def test_sim_consensus_geomean_penalizes_imbalance():
    """5종 중 한 종만 폭주하는 outlier 후보는 균형 후보보다 작아야 한다."""
    balanced = [{"scores": [5, 5, 5, 5, 5]}] * 20
    imbalanced = [{"scores": [25, 0, 0, 0, 0]}] * 20  # 평균은 같지만 한 점수만 폭주
    s_balanced = signals.sim_consensus_score(balanced)
    s_imbalanced = signals.sim_consensus_score(imbalanced)
    # imbalanced는 normalize 후 한 차원만 1.0, 나머지 0.0 → 기하평균 0
    assert s_imbalanced < s_balanced


def test_strategy_drift_score():
    """drift = 전략별 가중치 변화 절댓값 합."""
    w_prev = {"gap_focus": 0.30, "hot_focus": 0.25, "pair_bias": 0.45}
    w_curr = {"gap_focus": 0.40, "hot_focus": 0.20, "pair_bias": 0.40}
    # |0.10| + |-0.05| + |-0.05| = 0.20
    result = signals.strategy_drift_score(w_prev, w_curr)
    assert abs(result - 0.20) < 1e-9


def test_strategy_drift_new_strategy_appears():
    """이전에 없던 전략이 등장하면 그 가중치 전체가 drift에 가산."""
    w_prev = {"gap_focus": 0.5, "hot_focus": 0.5}
    w_curr = {"gap_focus": 0.4, "hot_focus": 0.4, "newbie": 0.2}
    # |0.5-0.4| + |0.5-0.4| + |0-0.2| = 0.4
    result = signals.strategy_drift_score(w_prev, w_curr)
    assert abs(result - 0.4) < 1e-9


def test_confidence_score_passthrough():
    """confidence는 큐레이션 결과의 값 그대로 (0~1 clamp 확인)."""
    assert signals.confidence_score({"confidence": 0.85}) == 0.85
    assert signals.confidence_score({"confidence": 1.2}) == 1.0   # clamp
    assert signals.confidence_score({"confidence": -0.1}) == 0.0
    assert signals.confidence_score({}) is None


def test_adaptive_baseline_cold_start():
    """window 크기 < 4 → warmup, z=None."""
    bl = signals.AdaptiveBaseline(window=[1.0, 1.1, 0.9], window_max=8)
    z, fire = bl.evaluate(value=1.5, z_normal=1.5, z_urgent=2.5)
    assert fire == "warmup"
    assert z is None


def test_adaptive_baseline_preparing():
    """window 4~7 → 보수적 임계치 z=2.0."""
    bl = signals.AdaptiveBaseline(window=[1.0, 1.0, 1.0, 1.0], window_max=8)
    # value=1.0 → mu=1.0, sigma=0 → div0 처리 필요. preparing 단계라 임계치는 보수적.
    z, fire = bl.evaluate(value=3.0, z_normal=1.5, z_urgent=2.5)
    # sigma=0인 경우 처리 — value > mu면 무한대로 보고 urgent? 또는 z계산 불가 → warmup?
    # 결정: sigma == 0이면 (window 분산 없음) z=inf 취급, value > mu면 urgent로
    assert fire in ("normal", "urgent")


def test_adaptive_baseline_normal_window_full():
    """window 8 풀, value가 평균보다 1.5σ 이상이면 normal."""
    bl = signals.AdaptiveBaseline(
        window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
        window_max=8,
    )
    # μ ≈ 1.0, σ ≈ 0.08 → z=1.5는 value ≈ 1.12
    z, fire = bl.evaluate(value=1.20, z_normal=1.5, z_urgent=2.5)
    assert fire == "normal"
    assert z is not None and z >= 1.5


def test_adaptive_baseline_urgent():
    """z >= 2.5 → urgent."""
    bl = signals.AdaptiveBaseline(
        window=[1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0],
        window_max=8,
    )
    z, fire = bl.evaluate(value=2.0, z_normal=1.5, z_urgent=2.5)
    assert fire == "urgent"


def test_adaptive_baseline_push_updates_window():
    """push 시 FIFO 동작."""
    bl = signals.AdaptiveBaseline(window=[1, 2, 3, 4, 5, 6, 7, 8], window_max=8)
    bl.push(9.0)
    assert bl.window == [2, 3, 4, 5, 6, 7, 8, 9.0]


def test_decide_fire_level_combination():
    """2개 이상 normal 발화 → urgent."""
    # 2개 normal z ≥ 1.5
    sigs = [
        {"metric": "sim", "z": 1.6, "fire": "normal"},
        {"metric": "drift", "z": 1.7, "fire": "normal"},
        {"metric": "conf", "z": 0.5, "fire": "noop"},
    ]
    assert signals.decide_overall_fire(sigs) == "urgent"

    # 1개 normal
    sigs2 = [
        {"metric": "sim", "z": 1.6, "fire": "normal"},
        {"metric": "drift", "z": 0.3, "fire": "noop"},
    ]
    assert signals.decide_overall_fire(sigs2) == "normal"

    # 단일 극값 z ≥ 2.5 → urgent
    sigs3 = [
        {"metric": "sim", "z": 3.0, "fire": "urgent"},
        {"metric": "drift", "z": 0.2, "fire": "noop"},
    ]
    assert signals.decide_overall_fire(sigs3) == "urgent"

    # 전부 noop
    sigs4 = [{"metric": "sim", "z": 0.5, "fire": "noop"}]
    assert signals.decide_overall_fire(sigs4) == "noop"
  • Step 2: Run test to verify it fails

Run: cd agent-office && pytest tests/test_lotto_signals.py -v Expected: FAIL with ModuleNotFoundError: No module named 'app.curator.signals'

  • Step 3: Commit failing tests
git add agent-office/tests/test_lotto_signals.py
git commit -m "test(lotto-signals): 메트릭 함수·adaptive baseline 단위 테스트"

Task 2: signals.py 구현

Files:

  • Create: agent-office/app/curator/signals.py

  • Step 1: Implement signals module

# agent-office/app/curator/signals.py
"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수).

DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list.
signal_runner.py에서 DB 연동 + cron 진입점 담당.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from statistics import mean, pstdev
from typing import Any, Dict, List, Optional, Tuple


# ---------- Metric: Sim Consensus ----------

def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
    """20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수."""
    if not picks:
        return []
    n_metrics = len(picks[0]["scores"])
    columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)]
    norms_per_col = []
    for col in columns:
        lo, hi = min(col), max(col)
        rng = hi - lo
        if rng == 0:
            norms_per_col.append([0.5] * len(col))  # 모두 동일하면 중립 0.5
        else:
            norms_per_col.append([(v - lo) / rng for v in col])
    # 전치: 후보별 정규화 점수 리스트
    return [
        [norms_per_col[k][i] for k in range(n_metrics)]
        for i in range(len(picks))
    ]


def _geomean(values: List[float]) -> float:
    """기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티)."""
    if not values:
        return 0.0
    if any(v <= 0 for v in values):
        return 0.0
    log_sum = sum(math.log(v) for v in values)
    return math.exp(log_sum / len(values))


def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
    """top-10 후보의 기하평균 consensus 평균.

    Args:
        best_picks: [{"numbers": [...], "scores": [s1, s2, s3, s4, s5]}, ...]
    """
    if not best_picks:
        return 0.0
    normalized = _normalize_columns(best_picks)
    consensus = [_geomean(scores) for scores in normalized]
    consensus.sort(reverse=True)
    top = consensus[:10] if len(consensus) >= 10 else consensus
    return mean(top) if top else 0.0


# ---------- Metric: Strategy Drift ----------

def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
    """가중치 변화 절댓값 합. 신규/소멸 전략도 가산."""
    keys = set(prev) | set(curr)
    return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)


# ---------- Metric: Confidence ----------

def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
    """큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None."""
    if "confidence" not in curate_result:
        return None
    v = float(curate_result["confidence"])
    return max(0.0, min(1.0, v))


# ---------- Adaptive Baseline ----------

@dataclass
class AdaptiveBaseline:
    window: List[float] = field(default_factory=list)
    window_max: int = 8
    last_pushed_draw_no: Optional[int] = None  # 회차 단위 메트릭 중복 push 방지

    @property
    def size(self) -> int:
        return len(self.window)

    @property
    def mu(self) -> float:
        return mean(self.window) if self.window else 0.0

    @property
    def sigma(self) -> float:
        # population stdev (8개 윈도우라 ddof=0이 안정)
        return pstdev(self.window) if len(self.window) >= 2 else 0.0

    def push(self, value: float, draw_no: Optional[int] = None) -> None:
        """FIFO push. window_max 초과 시 가장 오래된 값 제거."""
        self.window.append(float(value))
        if len(self.window) > self.window_max:
            self.window = self.window[-self.window_max:]
        if draw_no is not None:
            self.last_pushed_draw_no = draw_no

    def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
        """z-score 계산 + fire_level 판정.

        Returns:
            (z_score, fire_level) — z_score는 cold start/warmup이면 None.
            fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'}
        """
        if self.size < 4:
            return None, "warmup"

        # 보수적 임계치 (window 4~7)
        z_normal_eff = 2.0 if self.size < self.window_max else z_normal
        z_urgent_eff = z_urgent

        if self.sigma == 0:
            # 분산 없음 — value가 평균을 크게 벗어나면 urgent로 분류, 아니면 noop
            return (None, "urgent") if value > self.mu else (None, "noop")

        z = (value - self.mu) / self.sigma
        if z >= z_urgent_eff:
            return z, "urgent"
        if z >= z_normal_eff:
            return z, "normal"
        return z, "noop"


# ---------- Combined fire decision ----------

def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
    """3종 시그널을 종합해 전체 fire_level 결정.

    Args:
        signal_results: [{"metric": str, "z": float|None, "fire": str}, ...]
    Returns:
        'noop' | 'normal' | 'urgent'
    """
    fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")]
    if any(s["fire"] == "urgent" for s in fires):
        return "urgent"
    if len(fires) >= 2:
        return "urgent"  # 2개 이상 normal → urgent로 승격
    if len(fires) == 1:
        return "normal"
    return "noop"
  • Step 2: Run tests to verify they pass

Run: cd agent-office && pytest tests/test_lotto_signals.py -v Expected: All 9 tests PASS

  • Step 3: Commit
git add agent-office/app/curator/signals.py
git commit -m "feat(lotto-signals): 메트릭 함수·adaptive baseline 순수함수 구현"

Task 3: DB 마이그레이션 (lotto_signals + lotto_baselines)

Files:

  • Modify: agent-office/app/db.py:19 (init_db 함수 내)

  • Step 1: Add table creation to init_db

init_db() 함수의 마지막 INSERT OR IGNORE 루프 직전(line 100, youtube_research_jobs 테이블 뒤)에 다음 추가:

        conn.execute("""
            CREATE TABLE IF NOT EXISTS lotto_signals (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                triggered_at    TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
                source          TEXT NOT NULL,
                metric          TEXT NOT NULL,
                value           REAL NOT NULL,
                baseline_mu     REAL,
                baseline_sigma  REAL,
                z_score         REAL,
                fire_level      TEXT NOT NULL,
                notified_at     TEXT,
                payload         TEXT
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_ls_triggered
            ON lotto_signals(triggered_at DESC)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_ls_fire
            ON lotto_signals(fire_level, notified_at)
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS lotto_baselines (
                metric              TEXT PRIMARY KEY,
                window_values       TEXT NOT NULL DEFAULT '[]',
                mu                  REAL NOT NULL DEFAULT 0.0,
                sigma               REAL NOT NULL DEFAULT 0.0,
                last_pushed_draw_no INTEGER,
                updated_at          TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            )
        """)
  • Step 2: Add CRUD helpers at end of db.py

db.py 파일 맨 아래에 추가:

# --- lotto_signals / lotto_baselines CRUD ---

def insert_lotto_signal(
    source: str,
    metric: str,
    value: float,
    baseline_mu: Optional[float],
    baseline_sigma: Optional[float],
    z_score: Optional[float],
    fire_level: str,
    payload: Optional[Dict[str, Any]] = None,
) -> int:
    with _conn() as conn:
        cur = conn.execute(
            """
            INSERT INTO lotto_signals
              (source, metric, value, baseline_mu, baseline_sigma, z_score, fire_level, payload)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                source, metric, value,
                baseline_mu, baseline_sigma, z_score, fire_level,
                json.dumps(payload or {}, ensure_ascii=False),
            ),
        )
        return cur.lastrowid


def mark_signal_notified(signal_id: int) -> None:
    with _conn() as conn:
        conn.execute(
            "UPDATE lotto_signals SET notified_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?",
            (signal_id,),
        )


def get_recent_lotto_signals(hours: int = 24, min_fire: str = "normal") -> List[Dict[str, Any]]:
    """지난 N시간 발화 시그널. min_fire='normal'이면 normal+urgent."""
    levels = ("urgent",) if min_fire == "urgent" else ("normal", "urgent")
    placeholders = ",".join("?" * len(levels))
    with _conn() as conn:
        rows = conn.execute(
            f"""
            SELECT * FROM lotto_signals
            WHERE triggered_at >= datetime('now', ?)
              AND fire_level IN ({placeholders})
            ORDER BY triggered_at DESC
            """,
            (f"-{int(hours)} hours", *levels),
        ).fetchall()
    return [dict(r) for r in rows]


def get_signals_history(days: int = 7) -> List[Dict[str, Any]]:
    """차트/이력 페이지용 — 모든 fire_level 포함."""
    with _conn() as conn:
        rows = conn.execute(
            """
            SELECT * FROM lotto_signals
            WHERE triggered_at >= datetime('now', ?)
            ORDER BY triggered_at DESC
            """,
            (f"-{int(days)} days",),
        ).fetchall()
    return [dict(r) for r in rows]


def get_recent_urgent_count(hours: int = 24) -> int:
    with _conn() as conn:
        row = conn.execute(
            """
            SELECT COUNT(*) AS c FROM lotto_signals
            WHERE triggered_at >= datetime('now', ?)
              AND fire_level = 'urgent'
              AND notified_at IS NOT NULL
            """,
            (f"-{int(hours)} hours",),
        ).fetchone()
    return int(row["c"]) if row else 0


def get_last_signal_notification(metric: str, fire_level: str, hours: int) -> Optional[str]:
    """같은 metric+fire_level이 hours 내에 알림 발송된 마지막 시각. throttle용."""
    with _conn() as conn:
        row = conn.execute(
            """
            SELECT notified_at FROM lotto_signals
            WHERE metric = ?
              AND fire_level = ?
              AND notified_at IS NOT NULL
              AND notified_at >= datetime('now', ?)
            ORDER BY notified_at DESC LIMIT 1
            """,
            (metric, fire_level, f"-{int(hours)} hours"),
        ).fetchone()
    return row["notified_at"] if row else None


def get_baseline(metric: str) -> Optional[Dict[str, Any]]:
    with _conn() as conn:
        row = conn.execute(
            "SELECT * FROM lotto_baselines WHERE metric = ?",
            (metric,),
        ).fetchone()
    if not row:
        return None
    d = dict(row)
    d["window_values"] = json.loads(d["window_values"])
    return d


def upsert_baseline(
    metric: str,
    window_values: List[float],
    mu: float,
    sigma: float,
    last_pushed_draw_no: Optional[int],
) -> None:
    with _conn() as conn:
        conn.execute(
            """
            INSERT INTO lotto_baselines
              (metric, window_values, mu, sigma, last_pushed_draw_no, updated_at)
            VALUES (?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%fZ','now'))
            ON CONFLICT(metric) DO UPDATE SET
              window_values = excluded.window_values,
              mu = excluded.mu,
              sigma = excluded.sigma,
              last_pushed_draw_no = excluded.last_pushed_draw_no,
              updated_at = excluded.updated_at
            """,
            (
                metric,
                json.dumps(window_values),
                mu, sigma, last_pushed_draw_no,
            ),
        )


def get_all_baselines() -> List[Dict[str, Any]]:
    with _conn() as conn:
        rows = conn.execute("SELECT * FROM lotto_baselines ORDER BY metric").fetchall()
    out = []
    for r in rows:
        d = dict(r)
        d["window_values"] = json.loads(d["window_values"])
        out.append(d)
    return out
  • Step 3: Smoke test the migration

새 임시 DB로 마이그레이션이 동작하는지:

cd agent-office && python -c "
import os
os.environ['AGENT_OFFICE_DB_PATH'] = '/tmp/test_migration.db'
from app.db import init_db, insert_lotto_signal, get_baseline, upsert_baseline
init_db()
sid = insert_lotto_signal('light','sim_signal', 1.5, 1.0, 0.2, 2.5, 'urgent', {'top':[1,2,3]})
print('signal id:', sid)
upsert_baseline('sim_signal', [1.0, 1.1, 0.9], 1.0, 0.1, None)
print('baseline:', get_baseline('sim_signal'))
"
rm /tmp/test_migration.db

Expected: signal id가 1 출력, baseline dict 출력 (window_values는 list)

  • Step 4: Commit
git add agent-office/app/db.py
git commit -m "feat(lotto-signals): lotto_signals/lotto_baselines 테이블 + CRUD"

Task 4: signal_runner.py — DB I/O + 평가 orchestration

Files:

  • Create: agent-office/app/curator/signal_runner.py

  • Create: agent-office/tests/test_lotto_signal_runner.py

  • Step 1: Write failing test for signal_runner

# agent-office/tests/test_lotto_signal_runner.py
import os
import pytest

os.environ["AGENT_OFFICE_DB_PATH"] = "/tmp/test_signal_runner.db"

from app.curator import signal_runner
from app import db


@pytest.fixture(autouse=True)
def fresh_db():
    if os.path.exists("/tmp/test_signal_runner.db"):
        os.remove("/tmp/test_signal_runner.db")
    db.init_db()
    yield
    if os.path.exists("/tmp/test_signal_runner.db"):
        os.remove("/tmp/test_signal_runner.db")


def test_evaluate_and_persist_cold_start():
    """첫 호출은 warmup으로 기록되고 baseline은 비어있다."""
    result = signal_runner.evaluate_metric_and_persist(
        source="light",
        metric="sim_signal",
        value=1.5,
        draw_no=None,
        z_normal=1.5,
        z_urgent=2.5,
        push_to_window=True,
    )
    assert result["fire_level"] == "warmup"
    assert result["z_score"] is None

    bl = db.get_baseline("sim_signal")
    assert bl is not None
    assert bl["window_values"] == [1.5]


def test_evaluate_after_window_filled_normal_fire():
    """8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal."""
    for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]:
        signal_runner.evaluate_metric_and_persist(
            source="sim",
            metric="sim_signal",
            value=v,
            draw_no=None,
            z_normal=1.5,
            z_urgent=2.5,
            push_to_window=True,
        )

    # 9번째: window는 [v2..v8, 1.0]이지만 push 후 평가. 평균≈1.0, σ≈0.08 → z 큰 값
    result = signal_runner.evaluate_metric_and_persist(
        source="sim",
        metric="sim_signal",
        value=1.20,
        draw_no=None,
        z_normal=1.5,
        z_urgent=2.5,
        push_to_window=True,
    )
    assert result["fire_level"] in ("normal", "urgent")
    assert result["z_score"] is not None and result["z_score"] >= 1.5


def test_evaluate_drift_skips_same_draw_push():
    """drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X."""
    # 첫 push (draw 1100)
    signal_runner.evaluate_metric_and_persist(
        source="sim", metric="drift", value=0.05, draw_no=1100,
        z_normal=1.5, z_urgent=2.5, push_to_window=True,
    )
    bl_before = db.get_baseline("drift")
    assert bl_before["window_values"] == [0.05]
    assert bl_before["last_pushed_draw_no"] == 1100

    # 같은 회차 두 번째 호출 — window push X
    signal_runner.evaluate_metric_and_persist(
        source="sim", metric="drift", value=0.08, draw_no=1100,
        z_normal=1.5, z_urgent=2.5, push_to_window=True,
    )
    bl_after = db.get_baseline("drift")
    assert bl_after["window_values"] == [0.05]  # 변경 없음


def test_run_signal_check_aggregates_three_metrics(monkeypatch):
    """run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환."""
    # mock service_proxy
    async def fake_lotto_best():
        # 동일한 5종 점수의 균등 후보 20개
        return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20

    async def fake_lotto_strategy_weights():
        return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3}

    monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best)
    monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights)

    import asyncio
    out = asyncio.get_event_loop().run_until_complete(
        signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101)
    )
    assert "overall_fire" in out
    assert "results" in out
    assert any(r["metric"] == "sim_signal" for r in out["results"])
    assert any(r["metric"] == "drift" for r in out["results"])
    # light_check는 confidence 평가 안 함
    assert not any(r["metric"] == "confidence" for r in out["results"])
  • Step 2: Run test to verify it fails

Run: cd agent-office && pytest tests/test_lotto_signal_runner.py -v Expected: FAIL ModuleNotFoundError: No module named 'app.curator.signal_runner'

  • Step 3: Implement signal_runner.py
# agent-office/app/curator/signal_runner.py
"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration."""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional

from .. import db
from .. import service_proxy
from . import signals

logger = logging.getLogger("agent-office.lotto-signals")

# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교)
DRAW_SCOPED_METRICS = {"drift", "confidence"}


def _load_baseline(metric: str) -> signals.AdaptiveBaseline:
    row = db.get_baseline(metric)
    if row is None:
        return signals.AdaptiveBaseline(window=[], window_max=8)
    return signals.AdaptiveBaseline(
        window=list(row["window_values"]),
        window_max=8,
        last_pushed_draw_no=row.get("last_pushed_draw_no"),
    )


def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None:
    db.upsert_baseline(
        metric=metric,
        window_values=bl.window,
        mu=bl.mu,
        sigma=bl.sigma,
        last_pushed_draw_no=bl.last_pushed_draw_no,
    )


def evaluate_metric_and_persist(
    source: str,
    metric: str,
    value: float,
    draw_no: Optional[int],
    z_normal: float,
    z_urgent: float,
    push_to_window: bool,
    payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신.

    회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략.
    """
    bl = _load_baseline(metric)

    # 회차 가드: 같은 회차 재평가 시 window push 생략
    do_push = push_to_window
    if metric in DRAW_SCOPED_METRICS and draw_no is not None:
        if bl.last_pushed_draw_no == draw_no:
            do_push = False

    # 평가는 push 전 baseline 기준 (현재값 vs 과거 분포)
    z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent)

    if do_push:
        bl.push(value=value, draw_no=draw_no)
        _save_baseline(metric, bl)

    sid = db.insert_lotto_signal(
        source=source,
        metric=metric,
        value=value,
        baseline_mu=bl.mu if bl.size > 0 else None,
        baseline_sigma=bl.sigma if bl.size >= 2 else None,
        z_score=z,
        fire_level=fire,
        payload=payload,
    )
    return {
        "signal_id": sid,
        "metric": metric,
        "value": value,
        "z_score": z,
        "fire_level": fire,
        "payload": payload or {},
    }


# ---------- Service proxy thin wrappers (monkeypatch 대상) ----------

async def _fetch_best_picks() -> List[Dict[str, Any]]:
    return await service_proxy.lotto_best()


async def _fetch_strategy_weights() -> Dict[str, float]:
    return await service_proxy.lotto_strategy_weights()


# ---------- Orchestrator ----------

async def run_signal_check(
    source: str,
    z_normal: float = 1.5,
    z_urgent: float = 2.5,
    curate_result: Optional[Dict[str, Any]] = None,
    current_draw_no: Optional[int] = None,
) -> Dict[str, Any]:
    """cron 진입점. source ∈ {'light', 'sim', 'deep'}.

    light/sim: Sim Consensus + Strategy Drift 평가
    deep: 위 2종 + Confidence (curate_result 필요)
    """
    results: List[Dict[str, Any]] = []

    # --- Sim Consensus ---
    try:
        best = await _fetch_best_picks()
        v = signals.sim_consensus_score(best)
        results.append(
            evaluate_metric_and_persist(
                source=source, metric="sim_signal",
                value=v, draw_no=None,
                z_normal=z_normal, z_urgent=z_urgent,
                push_to_window=True,
                payload={"top_count": min(len(best), 10)},
            )
        )
    except Exception as e:
        logger.warning(f"sim_consensus 평가 실패: {e}")

    # --- Strategy Drift (회차 단위) ---
    try:
        w_curr = await _fetch_strategy_weights()
        bl_drift = _load_baseline("drift")
        # drift는 prev → curr 비교가 필요. baseline window의 마지막 entry는 이전 drift값.
        # prev_weights는 별도 저장이 필요한데, 간단히 lotto_baselines.payload에 안 두고
        # 매 호출마다 GET 후 캐시되지 않은 직전 값과 비교 불가 → drift = 1회차 단위.
        # 단순화: 이번 호출의 가중치를 baseline에 저장하기 전 직전 저장값을 prev로 사용.
        prev_payload_row = db.get_baseline("drift_weights_cache")
        w_prev = prev_payload_row["window_values"] if prev_payload_row else None

        if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict):
            # last entry is dict of weights
            prev_dict = w_prev[-1]
            drift_value = signals.strategy_drift_score(prev_dict, w_curr)
            results.append(
                evaluate_metric_and_persist(
                    source=source, metric="drift",
                    value=drift_value, draw_no=current_draw_no,
                    z_normal=z_normal, z_urgent=z_urgent,
                    push_to_window=True,
                    payload={"weights_now": w_curr, "weights_prev": prev_dict},
                )
            )
        # weights 캐시 갱신 (FIFO 2개만 보관)
        cache_window = (w_prev or []) + [w_curr]
        if len(cache_window) > 2:
            cache_window = cache_window[-2:]
        db.upsert_baseline(
            metric="drift_weights_cache",
            window_values=cache_window,
            mu=0.0, sigma=0.0,
            last_pushed_draw_no=current_draw_no,
        )
    except Exception as e:
        logger.warning(f"strategy_drift 평가 실패: {e}")

    # --- Confidence (deep_check + curate_result 필수) ---
    if source == "deep" and curate_result is not None:
        try:
            cv = signals.confidence_score(curate_result)
            if cv is not None:
                results.append(
                    evaluate_metric_and_persist(
                        source=source, metric="confidence",
                        value=cv, draw_no=current_draw_no,
                        z_normal=z_normal, z_urgent=z_urgent,
                        push_to_window=True,
                        payload={"draw_no": current_draw_no},
                    )
                )
        except Exception as e:
            logger.warning(f"confidence 평가 실패: {e}")

    overall = signals.decide_overall_fire(
        [{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results]
    )
    return {"overall_fire": overall, "results": results}
  • Step 4: Add lotto_best + lotto_strategy_weights to service_proxy.py

agent-office/app/service_proxy.py 파일 맨 아래 추가:

async def lotto_best() -> List[Dict[str, Any]]:
    """GET /api/lotto/best — best_picks 20개 (numbers + scores 5종)."""
    from .config import LOTTO_BACKEND_URL
    resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best")
    resp.raise_for_status()
    data = resp.json()
    items = data.get("items") if isinstance(data, dict) else data
    return items or []


async def lotto_strategy_weights() -> Dict[str, float]:
    """GET /api/lotto/strategy/weights — 전략별 가중치 dict."""
    from .config import LOTTO_BACKEND_URL
    resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights")
    resp.raise_for_status()
    data = resp.json()
    weights = data.get("weights") if isinstance(data, dict) else data
    # API 응답 형태가 [{strategy, weight, ...}] 일 수 있어 dict로 정규화
    if isinstance(weights, list):
        return {item["strategy"]: float(item["weight"]) for item in weights}
    return {k: float(v) for k, v in (weights or {}).items()}
  • Step 5: Run tests to verify they pass

Run: cd agent-office && pytest tests/test_lotto_signal_runner.py -v Expected: All 4 tests PASS

  • Step 6: Commit
git add agent-office/app/curator/signal_runner.py \
        agent-office/app/service_proxy.py \
        agent-office/tests/test_lotto_signal_runner.py
git commit -m "feat(lotto-signals): signal_runner orchestrator + service_proxy GET helpers"

Phase 2 — cron 통합 (텔레그램 X, DB 기록만)

Task 5: config.py env vars 7개 추가

Files:

  • Modify: agent-office/app/config.py (파일 끝)

  • Step 1: Append env var section

agent-office/app/config.py 끝에 추가:


# Lotto Active Signals
LOTTO_SIGNAL_WINDOW = int(os.getenv("LOTTO_SIGNAL_WINDOW", "8"))
LOTTO_Z_NORMAL = float(os.getenv("LOTTO_Z_NORMAL", "1.5"))
LOTTO_Z_URGENT = float(os.getenv("LOTTO_Z_URGENT", "2.5"))
LOTTO_DIGEST_HOUR = int(os.getenv("LOTTO_DIGEST_HOUR", "9"))
LOTTO_DIGEST_MIN = int(os.getenv("LOTTO_DIGEST_MIN", "25"))
LOTTO_THROTTLE_HOURS = int(os.getenv("LOTTO_THROTTLE_HOURS", "6"))
LOTTO_URGENT_DAILY_MAX = int(os.getenv("LOTTO_URGENT_DAILY_MAX", "3"))
  • Step 2: Smoke check
cd agent-office && python -c "from app.config import LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR; print(LOTTO_Z_NORMAL, LOTTO_DIGEST_HOUR)"

Expected: 1.5 9

  • Step 3: Commit
git add agent-office/app/config.py
git commit -m "feat(lotto-signals): config env vars 7종 추가 (window/임계치/digest/throttle)"

Task 6: LottoAgent에 on_signal_check / on_daily_digest 추가 (텔레그램 발송 X)

Files:

  • Modify: agent-office/app/agents/lotto.py

  • Step 1: Extend LottoAgent class

agent-office/app/agents/lotto.pyon_command()를 다음으로 교체 (signal check 액션 추가):

    async def on_command(self, action: str, params: dict) -> dict:
        if action in ("curate_now", "curate_weekly"):
            return await self._run(source="manual")
        if action == "status":
            return {"ok": True, "message": f"{self.state}: {self.state_detail}"}
        if action in ("signal_check", "light_check", "sim_check", "deep_check"):
            return await self.run_signal_check(source=action.replace("_check", "") if action != "signal_check" else "light")
        if action == "daily_digest":
            return await self.run_daily_digest()
        return {"ok": False, "message": f"unknown action: {action}"}

그리고 클래스 끝에 신규 메서드 추가:

    async def run_signal_check(self, source: str = "light") -> dict:
        """비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후).

        Phase 2에선 텔레그램 발송 안 함. lotto_signals INSERT만.
        Phase 3에서 텔레그램 트리거 추가.
        """
        from ..curator.signal_runner import run_signal_check
        from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT
        from ..db import add_log

        if self.state not in ("idle", "reporting"):
            return {"ok": False, "message": f"busy ({self.state})"}

        try:
            curate_result: dict | None = None
            current_draw_no: int | None = None

            if source == "deep":
                # deep_check은 큐레이션을 먼저 수행 (LLM 호출)
                from ..curator.pipeline import curate_weekly
                cw = await curate_weekly(source="signal_deep")
                curate_result = cw.get("payload") or cw  # confidence 키 필요
                current_draw_no = cw.get("draw_no")

            outcome = await run_signal_check(
                source=source,
                z_normal=LOTTO_Z_NORMAL,
                z_urgent=LOTTO_Z_URGENT,
                curate_result=curate_result,
                current_draw_no=current_draw_no,
            )
            add_log(
                self.agent_id,
                f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
            )
            return {"ok": True, **outcome}
        except Exception as e:
            add_log(self.agent_id, f"signal_check 예외: {e}", level="error")
            return {"ok": False, "message": f"{type(e).__name__}: {e}"}

    async def run_daily_digest(self) -> dict:
        """Phase 2: 발화 카운트만 반환. Phase 3에서 텔레그램 발송 추가."""
        from ..db import get_recent_lotto_signals, add_log
        sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
        add_log(self.agent_id, f"daily_digest: 지난 24h 발화 {len(sigs)}건")
        return {"ok": True, "count": len(sigs), "signals": sigs}
  • Step 2: Smoke import check
cd agent-office && python -c "from app.agents.lotto import LottoAgent; print(LottoAgent.agent_id)"

Expected: lotto

  • Step 3: Commit
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): LottoAgent.run_signal_check/run_daily_digest (텔레그램 X)"

Task 7: scheduler.py에 cron 4개 추가

Files:

  • Modify: agent-office/app/scheduler.py

  • Step 1: Add cron job functions + registrations

기존 _run_lotto_schedule 근처에 함수 4개 추가:

async def _run_lotto_light_check():
    agent = AGENT_REGISTRY.get("lotto")
    if agent:
        await agent.run_signal_check(source="light")

async def _run_lotto_sim_check():
    agent = AGENT_REGISTRY.get("lotto")
    if agent:
        await agent.run_signal_check(source="sim")

async def _run_lotto_deep_check():
    agent = AGENT_REGISTRY.get("lotto")
    if agent:
        await agent.run_signal_check(source="deep")

async def _run_lotto_daily_digest():
    agent = AGENT_REGISTRY.get("lotto")
    if agent:
        await agent.run_daily_digest()

기존 lotto_curate cron 등록(line 75) 직후에 4개 추가:

    scheduler.add_job(_run_lotto_light_check,   "cron", hour=9, minute=15, id="lotto_light_check")
    scheduler.add_job(_run_lotto_sim_check,     "cron", minute=15, hour="0,4,8,12,16,20", id="lotto_sim_check")
    scheduler.add_job(_run_lotto_deep_check,    "cron", day_of_week="sun,wed", hour=21, minute=15, id="lotto_deep_check")
    # digest hour/min은 동적 (env)이지만 cron 등록은 상수로 — 9:25 고정. 변경 필요 시 컨테이너 재시작.
    scheduler.add_job(_run_lotto_daily_digest,  "cron", hour=9, minute=25, id="lotto_digest")
  • Step 2: Verify import + cron registration
cd agent-office && python -c "
from app.scheduler import _run_lotto_light_check, _run_lotto_sim_check, _run_lotto_deep_check, _run_lotto_daily_digest
print('all imports ok')
"

Expected: all imports ok

  • Step 3: Commit
git add agent-office/app/scheduler.py
git commit -m "feat(lotto-signals): scheduler cron 4종 등록 (light/sim/deep/digest)"

Phase 3 — 텔레그램 알림 + Throttle + API endpoints

Task 8: 텔레그램 메시지 폼 단위 테스트

Files:

  • Create: agent-office/tests/test_lotto_telegram_signal.py

  • Step 1: Write failing tests

# agent-office/tests/test_lotto_telegram_signal.py
from app.notifiers.telegram_lotto import (
    _format_urgent_signal,
    _format_signal_digest,
)


def test_urgent_signal_format_basic():
    event = {
        "fire_level": "urgent",
        "triggered_at": "2026-05-20T07:18:00.000Z",
        "results": [
            {"metric": "sim_signal", "value": 1.84, "z_score": 3.9,
             "baseline_mu": 1.02, "baseline_sigma": 0.21, "payload": {}},
            {"metric": "drift", "value": 0.18, "z_score": 3.0,
             "baseline_mu": 0.06, "baseline_sigma": 0.04,
             "payload": {"weights_now": {"gap_focus": 0.5, "hot_focus": 0.5}}},
        ],
    }
    text = _format_urgent_signal(event)
    assert "🚨" in text
    assert "Sim Consensus" in text
    assert "z=3.9" in text
    assert "Strategy Drift" in text or "drift" in text.lower()


def test_signal_digest_format_with_signals():
    digest = {
        "evaluated": 6,
        "fired": 2,
        "signals": [
            {"metric": "sim_signal", "fire_level": "normal", "z_score": 1.7,
             "triggered_at": "2026-05-20T16:18:00Z", "payload": {}},
            {"metric": "confidence", "fire_level": "normal", "z_score": 1.6,
             "triggered_at": "2026-05-20T09:05:00Z", "payload": {}},
        ],
        "weights_trend": {"gap_focus": +0.12, "hot_focus": -0.02, "pair_bias": -0.08},
    }
    text = _format_signal_digest(digest)
    assert "📊" in text
    assert "지난 24h" in text
    assert "z=1.7" in text


def test_signal_digest_empty_returns_empty_string():
    """발화 0건이면 빈 문자열 → 발송 자체 skip 가능."""
    text = _format_signal_digest({"evaluated": 6, "fired": 0, "signals": [], "weights_trend": {}})
    assert text == ""
  • Step 2: Run test to verify it fails

Run: cd agent-office && pytest tests/test_lotto_telegram_signal.py -v Expected: FAIL ImportError: cannot import name '_format_urgent_signal'

  • Step 3: Implement telegram_lotto.py additions

agent-office/app/notifiers/telegram_lotto.py에 추가 (파일 끝):



# ---------- 능동 시그널 알림 (urgent + digest) ----------

_METRIC_LABEL = {
    "sim_signal": "Sim Consensus",
    "drift": "Strategy Drift",
    "confidence": "Confidence",
}


def _format_urgent_signal(event: Dict[str, Any]) -> str:
    """긴급 시그널 텔레그램 메시지 포맷."""
    triggered = event.get("triggered_at", "")[:19].replace("T", " ")
    results = event.get("results", [])
    fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]

    lines = [
        "🚨 로또 능동 신호",
        "",
        f"[{triggered}]",
        f"강한 시그널 {len(fired)}종 발화:",
    ]
    for r in fired:
        label = _METRIC_LABEL.get(r["metric"], r["metric"])
        v = r.get("value")
        mu = r.get("baseline_mu")
        sigma = r.get("baseline_sigma")
        z = r.get("z_score")
        if mu is not None and sigma is not None:
            lines.append(f"• {label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
        else:
            lines.append(f"• {label} {v:.2f}")

    # drift 페이로드 — 어떤 전략이 변동했는지 한 줄
    for r in fired:
        if r["metric"] == "drift":
            wn = (r.get("payload") or {}).get("weights_now") or {}
            wp = (r.get("payload") or {}).get("weights_prev") or {}
            if wn and wp:
                diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
                top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
                detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
                lines.append("")
                lines.append(f"요인: {detail}")
            break

    lines.append("")
    lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
    return "\n".join(lines)


def _format_signal_digest(digest: Dict[str, Any]) -> str:
    """일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
    fired = int(digest.get("fired", 0))
    if fired == 0:
        return ""

    signals_list = digest.get("signals", [])
    evaluated = digest.get("evaluated", 0)

    lines = [
        "📊 로또 일일 요약 (지난 24h)",
        "",
        f"평가 {evaluated}회 / 발화 {fired}회",
    ]
    for s in signals_list:
        label = _METRIC_LABEL.get(s["metric"], s["metric"])
        z = s.get("z_score")
        when = (s.get("triggered_at") or "")[11:16]  # HH:MM
        z_text = f"z={z:.1f}" if z is not None else "z=-"
        lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text}  ({when})")

    weights_trend = digest.get("weights_trend") or {}
    if weights_trend:
        lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
        for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
            arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→")
            lines.append(f"  {strategy:12s} {arrow} {delta*100:+.0f}%")

    return "\n".join(lines)


async def send_urgent_signal(event: Dict[str, Any]) -> None:
    text = _format_urgent_signal(event)
    try:
        await send_raw(text)
    except Exception as e:
        logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")


async def send_signal_summary(digest: Dict[str, Any]) -> None:
    text = _format_signal_digest(digest)
    if not text:
        return  # 발화 0건이면 발송 skip
    try:
        await send_raw(text)
    except Exception as e:
        logger.warning(f"[telegram_lotto] digest send failed: {e}")
  • Step 4: Run tests to verify they pass

Run: cd agent-office && pytest tests/test_lotto_telegram_signal.py -v Expected: All 3 tests PASS

  • Step 5: Commit
git add agent-office/app/notifiers/telegram_lotto.py \
        agent-office/tests/test_lotto_telegram_signal.py
git commit -m "feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷"

Task 9: signal_check이 텔레그램 발송 트리거 + throttle 통합

Files:

  • Modify: agent-office/app/agents/lotto.py

  • Step 1: Update run_signal_check to fire telegram on urgent

run_signal_check()의 마지막 return {"ok": True, **outcome} 직전에 추가:

            # --- Throttle + 텔레그램 urgent 발송 ---
            from ..config import (
                LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX,
            )
            from ..db import (
                get_last_signal_notification, get_recent_urgent_count,
                mark_signal_notified,
            )
            from ..notifiers.telegram_lotto import send_urgent_signal

            if outcome["overall_fire"] == "urgent":
                # daily cap 체크
                if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
                    add_log(
                        self.agent_id,
                        f"urgent daily cap 도달 → normal로 강등 (digest 합류)",
                        level="warning",
                    )
                else:
                    # throttle: 어느 메트릭이라도 6시간 이내 같은 fire_level 알림이 있으면 skip
                    blocked = False
                    for r in outcome["results"]:
                        if r["fire_level"] in ("normal", "urgent"):
                            last = get_last_signal_notification(
                                metric=r["metric"], fire_level=r["fire_level"],
                                hours=LOTTO_THROTTLE_HOURS,
                            )
                            if last:
                                blocked = True
                                break
                    if not blocked:
                        from datetime import datetime, timezone
                        event = {
                            "fire_level": "urgent",
                            "triggered_at": datetime.now(timezone.utc).isoformat(),
                            "results": outcome["results"],
                        }
                        await send_urgent_signal(event)
                        # 모든 발화 시그널에 대해 notified_at 마킹
                        for r in outcome["results"]:
                            if r["fire_level"] in ("normal", "urgent"):
                                mark_signal_notified(r["signal_id"])
                        add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}개 마킹)")
  • Step 2: Update run_daily_digest to actually send telegram

run_daily_digest()를 다음으로 교체:

    async def run_daily_digest(self) -> dict:
        """일일 요약 — 지난 24h normal/urgent 발화를 묶어 텔레그램 1통."""
        from ..db import get_recent_lotto_signals, get_signals_history, add_log, get_all_baselines
        from ..notifiers.telegram_lotto import send_signal_summary

        sigs = get_recent_lotto_signals(hours=24, min_fire="normal")
        # 전체 평가 횟수 (noop+warmup 포함)
        total_24h = get_signals_history(days=1)
        evaluated = len(total_24h)

        # weights_trend: drift_weights_cache에서 prev/curr 차이 계산
        trend = {}
        try:
            from ..db import get_baseline
            cache = get_baseline("drift_weights_cache")
            if cache and isinstance(cache["window_values"], list) and len(cache["window_values"]) >= 2:
                prev_w = cache["window_values"][-2]
                curr_w = cache["window_values"][-1]
                trend = {
                    k: curr_w.get(k, 0.0) - prev_w.get(k, 0.0)
                    for k in (set(prev_w) | set(curr_w))
                }
        except Exception as e:
            add_log(self.agent_id, f"weights_trend 계산 실패: {e}", level="warning")

        digest = {
            "evaluated": evaluated,
            "fired": len(sigs),
            "signals": sigs,
            "weights_trend": trend,
        }
        await send_signal_summary(digest)
        add_log(self.agent_id, f"daily_digest 발송: 평가 {evaluated} / 발화 {len(sigs)}")
        return {"ok": True, **digest}
  • Step 3: Smoke import check
cd agent-office && python -c "
import asyncio
from app.agents.lotto import LottoAgent
agent = LottoAgent()
print('methods:', [m for m in dir(agent) if 'signal' in m or 'digest' in m])
"

Expected: methods: ['run_daily_digest', 'run_signal_check']

  • Step 4: Commit
git add agent-office/app/agents/lotto.py
git commit -m "feat(lotto-signals): urgent 텔레그램 발송 + throttle/cap + daily digest 전송"

Task 10: FastAPI endpoint 3개 추가

Files:

  • Modify: agent-office/app/main.py

  • Step 1: Add endpoints

agent-office/app/main.py에서 다른 lotto 관련 endpoint 근처에 추가:

@app.get("/api/agent-office/lotto/signals")
async def list_lotto_signals(days: int = 7):
    """시그널 이력 (모든 fire_level)."""
    from .db import get_signals_history
    return {"items": get_signals_history(days=days)}


@app.get("/api/agent-office/lotto/baselines")
async def list_lotto_baselines():
    """현재 baseline μ/σ + window 상태."""
    from .db import get_all_baselines
    return {"items": get_all_baselines()}


@app.post("/api/agent-office/lotto/signal-check")
async def trigger_signal_check(source: str = "light"):
    """수동 트리거 (디버그·테스트용). source ∈ {light, sim, deep}."""
    if source not in ("light", "sim", "deep"):
        from fastapi import HTTPException
        raise HTTPException(status_code=400, detail="source must be light/sim/deep")
    agent = AGENT_REGISTRY.get("lotto")
    if not agent:
        from fastapi import HTTPException
        raise HTTPException(status_code=503, detail="lotto agent not registered")
    return await agent.run_signal_check(source=source)
  • Step 2: Verify endpoints
cd agent-office && python -c "
from app.main import app
routes = [r.path for r in app.routes if 'lotto' in r.path and 'signal' in r.path or 'baseline' in r.path]
print(routes)
"

Expected: 3 routes listed including /api/agent-office/lotto/signals, /baselines, /signal-check

  • Step 3: Commit
git add agent-office/app/main.py
git commit -m "feat(lotto-signals): GET signals/baselines + POST signal-check endpoint"

Task 11: CLAUDE.md 업데이트 (agent-office 섹션)

Files:

  • Modify: web-backend/CLAUDE.md

  • Step 1: Update agent-office API & scheduler section

web-backend/CLAUDE.md의 agent-office API 표에 다음 행 추가 (적절한 위치):

| GET | `/api/agent-office/lotto/signals?days=7` | 로또 능동 시그널 이력 (모든 fire_level) |
| GET | `/api/agent-office/lotto/baselines` | 로또 메트릭별 baseline μ/σ + 윈도우 상태 |
| POST | `/api/agent-office/lotto/signal-check?source=light` | 로또 시그널 평가 수동 트리거 (light/sim/deep) |

그리고 "스케줄러 job" 항목에 추가:

- 09:15 매일 — 로또 light_check (시뮬·전략 가중치 평가)
- 매 4시간 :15 — 로또 sim_check (00/04/08/12/16/20시)
- 일/수 21:15 — 로또 deep_check (큐레이션 후 confidence 포함 평가)
- 09:25 매일 — 로또 daily_digest (지난 24h 발화 텔레그램 1통)

또한 새 환경변수 명시:

- `LOTTO_SIGNAL_WINDOW`: baseline 윈도우 크기 (기본 8)
- `LOTTO_Z_NORMAL`: normal fire 임계치 (기본 1.5)
- `LOTTO_Z_URGENT`: urgent fire 임계치 (기본 2.5)
- `LOTTO_THROTTLE_HOURS`: 같은 메트릭 재발화 throttle (기본 6시간)
- `LOTTO_URGENT_DAILY_MAX`: urgent 하루 cap (기본 3통)
  • Step 2: Verify
git -C C:/Users/jaeoh/Desktop/workspace/web-backend diff CLAUDE.md | head -50
  • Step 3: Commit
git add CLAUDE.md
git commit -m "docs(CLAUDE): agent-office 로또 능동 시그널 API/스케줄러/env 추가"

Task 12: NAS 배포 + 24h 가동 검증 (수동)

  • Step 1: git push로 자동 배포
git push

Expected: Gitea webhook → deployer가 docker compose up -d --build 실행 → agent-office 재시작.

  • Step 2: 컨테이너 상태 확인
ssh nas "docker logs agent-office --tail 50"

Expected:

  • init_db 로그에서 lotto_signals, lotto_baselines 테이블 생성 성공 메시지

  • scheduler 로그에서 lotto_light_check, lotto_sim_check, lotto_deep_check, lotto_digest 4개 job 등록 확인

  • Step 3: 수동 트리거로 즉시 검증

curl -X POST "https://gahusb.synology.me/api/agent-office/lotto/signal-check?source=light"

Expected: {"ok": true, "overall_fire": "warmup", "results": [...]} (첫 호출은 baseline 비어있어 warmup)

  • Step 4: DB에 시그널 들어갔는지 확인
curl "https://gahusb.synology.me/api/agent-office/lotto/signals?days=1"

Expected: {"items": [{"id": 1, "source": "light", "metric": "sim_signal", "fire_level": "warmup", ...}]}

  • Step 5: 24시간 가동 후 baseline 누적 확인

24h 후:

curl "https://gahusb.synology.me/api/agent-office/lotto/baselines"

Expected: sim_signal 메트릭의 window_values 길이가 6~7개 (4h 시뮬 6회 + 1회 light)

  • Step 6: 텔레그램 발송 검증 (자연 발화 또는 강제)

24h 가동 후에도 자연 발화 없으면 강제 테스트:

  • LOTTO_Z_NORMAL=0.1로 일시 낮춤 → 재배포 → 다음 sim_check에서 발화
  • 텔레그램으로 🚨 메시지 도착 확인 후 원복

Self-Review (수행 완료)

1. Spec coverage check

Spec 섹션 구현 task
4.1 Sim Consensus Task 1, 2
4.2 Strategy Drift Task 1, 2
4.3 Confidence Task 1, 2
4.4 Adaptive Baseline Task 1, 2
4.5 Trigger × Metric matrix Task 4 (DRAW_SCOPED_METRICS + source 분기)
4.6 Fire 결정 Task 1 (decide_overall_fire)
5.1 알림 흐름 Task 9
5.2 메시지 폼 Task 8
5.3 Throttle Task 9
6.1 lotto_signals 테이블 Task 3
6.2 lotto_baselines 테이블 Task 3
7. API 3종 Task 10
8. env vars 7종 Task 5
9. cron 4종 Task 7

모두 매핑됨.

2. Placeholder scan: TBD/TODO 없음. 모든 step에 실제 코드 포함.

3. Type consistency:

  • evaluate_metric_and_persist 시그니처는 Task 4 정의 그대로 Task 6에서 호출
  • _format_urgent_signal(event) event 구조: fire_level, triggered_at, results[] → Task 8 테스트 & Task 9 호출 일치
  • digest 구조: evaluated, fired, signals, weights_trend → Task 8 테스트 & Task 9 호출 일치
  • DB CRUD 함수명 (insert_lotto_signal, get_recent_lotto_signals, mark_signal_notified 등) — Task 3 정의와 Task 9 호출 일치

이슈 없음.


비목표 (Out of scope)

  • Layer B 자동 시뮬 강도 조절 (v2)
  • 텔레그램 인라인 키보드 / 사용자 피드백 루프 (v2)
  • 핫넘버/콜드넘버 시그널 (노이즈 위험으로 v1 제외)
  • 프론트 /lotto/agent UI (web-ui 별도 PR)
  • Hot/Cold rotation 메트릭 추가