diff --git a/agent-office/app/curator/signal_runner.py b/agent-office/app/curator/signal_runner.py new file mode 100644 index 0000000..013cca6 --- /dev/null +++ b/agent-office/app/curator/signal_runner.py @@ -0,0 +1,183 @@ +"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration.""" +from __future__ import annotations +import logging +from typing import Any, Dict, List, Optional + +from .. import db +from .. import service_proxy +from . import signals + +logger = logging.getLogger("agent-office.lotto-signals") + +# 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교) +DRAW_SCOPED_METRICS = {"drift", "confidence"} + + +def _load_baseline(metric: str) -> signals.AdaptiveBaseline: + row = db.get_baseline(metric) + if row is None: + return signals.AdaptiveBaseline(window=[], window_max=8) + return signals.AdaptiveBaseline( + window=list(row["window_values"]), + window_max=8, + last_pushed_draw_no=row.get("last_pushed_draw_no"), + ) + + +def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None: + db.upsert_baseline( + metric=metric, + window_values=bl.window, + mu=bl.mu, + sigma=bl.sigma, + last_pushed_draw_no=bl.last_pushed_draw_no, + ) + + +def evaluate_metric_and_persist( + source: str, + metric: str, + value: float, + draw_no: Optional[int], + z_normal: float, + z_urgent: float, + push_to_window: bool, + payload: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신. + + 회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략. + """ + bl = _load_baseline(metric) + + # 회차 가드 + do_push = push_to_window + if metric in DRAW_SCOPED_METRICS and draw_no is not None: + if bl.last_pushed_draw_no == draw_no: + do_push = False + + # 평가는 push 전 baseline 기준 + z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent) + + if do_push: + bl.push(value=value, draw_no=draw_no) + _save_baseline(metric, bl) + else: + # cold start에서도 baseline row를 만들어 두려면 upsert 필요 + _save_baseline(metric, bl) + + sid = db.insert_lotto_signal( + source=source, + metric=metric, + value=value, + baseline_mu=bl.mu if bl.size > 0 else None, + baseline_sigma=bl.sigma if bl.size >= 2 else None, + z_score=z, + fire_level=fire, + payload=payload, + ) + return { + "signal_id": sid, + "metric": metric, + "value": value, + "z_score": z, + "fire_level": fire, + "payload": payload or {}, + } + + +# ---------- Service proxy thin wrappers (monkeypatch 대상) ---------- + +async def _fetch_best_picks() -> List[Dict[str, Any]]: + return await service_proxy.lotto_best() + + +async def _fetch_strategy_weights() -> Dict[str, float]: + return await service_proxy.lotto_strategy_weights() + + +# ---------- Orchestrator ---------- + +async def run_signal_check( + source: str, + z_normal: float = 1.5, + z_urgent: float = 2.5, + curate_result: Optional[Dict[str, Any]] = None, + current_draw_no: Optional[int] = None, +) -> Dict[str, Any]: + """cron 진입점. source ∈ {'light', 'sim', 'deep'}. + + light/sim: Sim Consensus + Strategy Drift 평가 + deep: 위 2종 + Confidence (curate_result 필요) + """ + results: List[Dict[str, Any]] = [] + + # --- Sim Consensus --- + try: + best = await _fetch_best_picks() + v = signals.sim_consensus_score(best) + results.append( + evaluate_metric_and_persist( + source=source, metric="sim_signal", + value=v, draw_no=None, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"top_count": min(len(best), 10)}, + ) + ) + except Exception as e: + logger.warning(f"sim_consensus 평가 실패: {e}") + + # --- Strategy Drift (회차 단위) --- + try: + w_curr = await _fetch_strategy_weights() + # weights 캐시: lotto_baselines의 별도 metric 'drift_weights_cache'에 prev/curr 2개 보관 + prev_payload_row = db.get_baseline("drift_weights_cache") + w_prev = prev_payload_row["window_values"] if prev_payload_row else None + + if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict): + prev_dict = w_prev[-1] + drift_value = signals.strategy_drift_score(prev_dict, w_curr) + results.append( + evaluate_metric_and_persist( + source=source, metric="drift", + value=drift_value, draw_no=current_draw_no, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"weights_now": w_curr, "weights_prev": prev_dict}, + ) + ) + # weights 캐시 갱신 (최대 2개 FIFO) + cache_window = (w_prev or []) + [w_curr] + if len(cache_window) > 2: + cache_window = cache_window[-2:] + db.upsert_baseline( + metric="drift_weights_cache", + window_values=cache_window, + mu=0.0, sigma=0.0, + last_pushed_draw_no=current_draw_no, + ) + except Exception as e: + logger.warning(f"strategy_drift 평가 실패: {e}") + + # --- Confidence (deep_check + curate_result 필수) --- + if source == "deep" and curate_result is not None: + try: + cv = signals.confidence_score(curate_result) + if cv is not None: + results.append( + evaluate_metric_and_persist( + source=source, metric="confidence", + value=cv, draw_no=current_draw_no, + z_normal=z_normal, z_urgent=z_urgent, + push_to_window=True, + payload={"draw_no": current_draw_no}, + ) + ) + except Exception as e: + logger.warning(f"confidence 평가 실패: {e}") + + overall = signals.decide_overall_fire( + [{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results] + ) + return {"overall_fire": overall, "results": results} diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py index a9bb318..ae8afae 100644 --- a/agent-office/app/service_proxy.py +++ b/agent-office/app/service_proxy.py @@ -338,3 +338,25 @@ async def lookup_pipeline_by_msg(msg_id: int) -> Optional[dict]: if resp.status_code == 200: return resp.json() return None + + +async def lotto_best() -> List[Dict[str, Any]]: + """GET /api/lotto/best — best_picks 20개 (numbers + scores 5종).""" + from .config import LOTTO_BACKEND_URL + resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/best") + resp.raise_for_status() + data = resp.json() + items = data.get("items") if isinstance(data, dict) else data + return items or [] + + +async def lotto_strategy_weights() -> Dict[str, float]: + """GET /api/lotto/strategy/weights — 전략별 가중치 dict.""" + from .config import LOTTO_BACKEND_URL + resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/strategy/weights") + resp.raise_for_status() + data = resp.json() + weights = data.get("weights") if isinstance(data, dict) else data + if isinstance(weights, list): + return {item["strategy"]: float(item["weight"]) for item in weights} + return {k: float(v) for k, v in (weights or {}).items()} diff --git a/agent-office/tests/test_lotto_signal_runner.py b/agent-office/tests/test_lotto_signal_runner.py new file mode 100644 index 0000000..beb968e --- /dev/null +++ b/agent-office/tests/test_lotto_signal_runner.py @@ -0,0 +1,114 @@ +import gc +import os +import sys +import tempfile + +_fd, _TMP = tempfile.mkstemp(suffix=".db") +os.close(_fd) +os.unlink(_TMP) +os.environ["AGENT_OFFICE_DB_PATH"] = _TMP + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pytest + +from app.curator import signal_runner +from app import db + + +@pytest.fixture(autouse=True) +def fresh_db(): + gc.collect() + if os.path.exists(_TMP): + os.remove(_TMP) + db.init_db() + yield + gc.collect() + if os.path.exists(_TMP): + try: + os.remove(_TMP) + except PermissionError: + pass # Windows: WAL-mode file locked; DB is ephemeral anyway + + +def test_evaluate_and_persist_cold_start(): + """첫 호출은 warmup으로 기록되고 baseline에 값이 들어간다.""" + result = signal_runner.evaluate_metric_and_persist( + source="light", + metric="sim_signal", + value=1.5, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + assert result["fire_level"] == "warmup" + assert result["z_score"] is None + + bl = db.get_baseline("sim_signal") + assert bl is not None + assert bl["window_values"] == [1.5] + + +def test_evaluate_after_window_filled_normal_fire(): + """8회 push 후 정상 운영, 평균 대비 z≥1.5면 normal.""" + for v in [1.0, 1.1, 0.9, 1.0, 1.0, 1.1, 0.9, 1.0]: + signal_runner.evaluate_metric_and_persist( + source="sim", + metric="sim_signal", + value=v, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + + result = signal_runner.evaluate_metric_and_persist( + source="sim", + metric="sim_signal", + value=1.12, + draw_no=None, + z_normal=1.5, + z_urgent=2.5, + push_to_window=True, + ) + assert result["fire_level"] in ("normal", "urgent") + assert result["z_score"] is not None and result["z_score"] >= 1.5 + + +def test_evaluate_drift_skips_same_draw_push(): + """drift는 회차 단위. 같은 회차에서 두 번 호출하면 두 번째는 window push X.""" + signal_runner.evaluate_metric_and_persist( + source="sim", metric="drift", value=0.05, draw_no=1100, + z_normal=1.5, z_urgent=2.5, push_to_window=True, + ) + bl_before = db.get_baseline("drift") + assert bl_before["window_values"] == [0.05] + assert bl_before["last_pushed_draw_no"] == 1100 + + signal_runner.evaluate_metric_and_persist( + source="sim", metric="drift", value=0.08, draw_no=1100, + z_normal=1.5, z_urgent=2.5, push_to_window=True, + ) + bl_after = db.get_baseline("drift") + assert bl_after["window_values"] == [0.05] + + +@pytest.mark.asyncio +async def test_run_signal_check_aggregates_three_metrics(monkeypatch): + """run_signal_check이 3종 메트릭 모두 평가하고 overall fire를 반환.""" + async def fake_lotto_best(): + return [{"numbers": [1,2,3,4,5,6], "scores": [10,10,10,10,10]}] * 20 + + async def fake_lotto_strategy_weights(): + return {"gap_focus": 0.4, "hot_focus": 0.3, "pair_bias": 0.3} + + monkeypatch.setattr(signal_runner, "_fetch_best_picks", fake_lotto_best) + monkeypatch.setattr(signal_runner, "_fetch_strategy_weights", fake_lotto_strategy_weights) + + out = await signal_runner.run_signal_check(source="light", curate_result=None, current_draw_no=1101) + assert "overall_fire" in out + assert "results" in out + assert any(r["metric"] == "sim_signal" for r in out["results"]) + # light_check는 confidence 평가 안 함 + assert not any(r["metric"] == "confidence" for r in out["results"])