diff --git a/agent-office/app/curator/signals.py b/agent-office/app/curator/signals.py new file mode 100644 index 0000000..0c63b67 --- /dev/null +++ b/agent-office/app/curator/signals.py @@ -0,0 +1,146 @@ +# agent-office/app/curator/signals.py +"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수). + +DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list. +signal_runner.py에서 DB 연동 + cron 진입점 담당. +""" +from __future__ import annotations +import math +from dataclasses import dataclass, field +from statistics import mean, pstdev +from typing import Any, Dict, List, Optional, Tuple + + +# ---------- Metric: Sim Consensus ---------- + +def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]: + """20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수.""" + if not picks: + return [] + n_metrics = len(picks[0]["scores"]) + columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)] + norms_per_col = [] + for col in columns: + lo, hi = min(col), max(col) + rng = hi - lo + if rng == 0: + # 모두 0이면 0.0(기하평균 페널티), 모두 동일한 양수면 0.5(타이 처리) + fallback = 0.0 if lo == 0 else 0.5 + norms_per_col.append([fallback] * len(col)) + else: + norms_per_col.append([(v - lo) / rng for v in col]) + return [ + [norms_per_col[k][i] for k in range(n_metrics)] + for i in range(len(picks)) + ] + + +def _geomean(values: List[float]) -> float: + """기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티).""" + if not values: + return 0.0 + if any(v <= 0 for v in values): + return 0.0 + log_sum = sum(math.log(v) for v in values) + return math.exp(log_sum / len(values)) + + +def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float: + """top-10 후보의 기하평균 consensus 평균.""" + if not best_picks: + return 0.0 + normalized = _normalize_columns(best_picks) + consensus = [_geomean(scores) for scores in normalized] + consensus.sort(reverse=True) + top = consensus[:10] if len(consensus) >= 10 else consensus + return mean(top) if top else 0.0 + + +# ---------- Metric: Strategy Drift ---------- + +def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float: + """가중치 변화 절댓값 합. 신규/소멸 전략도 가산.""" + keys = set(prev) | set(curr) + return sum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys) + + +# ---------- Metric: Confidence ---------- + +def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]: + """큐레이션 결과의 confidence를 0~1로 clamp. 없으면 None.""" + if "confidence" not in curate_result: + return None + v = float(curate_result["confidence"]) + return max(0.0, min(1.0, v)) + + +# ---------- Adaptive Baseline ---------- + +@dataclass +class AdaptiveBaseline: + window: List[float] = field(default_factory=list) + window_max: int = 8 + last_pushed_draw_no: Optional[int] = None + + @property + def size(self) -> int: + return len(self.window) + + @property + def mu(self) -> float: + return mean(self.window) if self.window else 0.0 + + @property + def sigma(self) -> float: + return pstdev(self.window) if len(self.window) >= 2 else 0.0 + + def push(self, value: float, draw_no: Optional[int] = None) -> None: + """FIFO push. window_max 초과 시 가장 오래된 값 제거.""" + self.window.append(float(value)) + if len(self.window) > self.window_max: + self.window = self.window[-self.window_max:] + if draw_no is not None: + self.last_pushed_draw_no = draw_no + + def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]: + """z-score 계산 + fire_level 판정. + + Returns: + (z_score, fire_level) — z_score는 cold start/warmup이면 None. + fire_level ∈ {'warmup', 'noop', 'normal', 'urgent'} + """ + if self.size < 4: + return None, "warmup" + + z_normal_eff = 2.0 if self.size < self.window_max else z_normal + z_urgent_eff = z_urgent + + if self.sigma == 0: + return (None, "urgent") if value > self.mu else (None, "noop") + + z = (value - self.mu) / self.sigma + if z >= z_urgent_eff: + return z, "urgent" + if z >= z_normal_eff: + return z, "normal" + return z, "noop" + + +# ---------- Combined fire decision ---------- + +def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str: + """3종 시그널을 종합해 전체 fire_level 결정. + + Args: + signal_results: [{"metric": str, "z": float|None, "fire": str}, ...] + Returns: + 'noop' | 'normal' | 'urgent' + """ + fires = [s for s in signal_results if s["fire"] in ("normal", "urgent")] + if any(s["fire"] == "urgent" for s in fires): + return "urgent" + if len(fires) >= 2: + return "urgent" + if len(fires) == 1: + return "normal" + return "noop"