# agent-office/app/curator/signals.py
"""LottoAgent active monitoring — signal evaluation & adaptive baseline.

Pure functions only: no DB I/O happens in this module. Inputs are plain
dicts/lists and outputs are plain Python values. Per the original module
notes, signal_runner.py owns the DB integration and the cron entry point.
"""
from __future__ import annotations

import math
from dataclasses import dataclass, field
from statistics import mean, stdev
from typing import Any, Dict, List, Optional, Tuple

# Number of best consensus values averaged by sim_consensus_score().
_TOP_K = 10
# Minimum number of baseline samples before evaluate() leaves "warmup".
_MIN_SAMPLES = 4
# "normal" z threshold used while the window holds at least _MIN_SAMPLES
# values but is not yet full (size < window_max); it replaces the
# caller-supplied z_normal in that phase.
_PARTIAL_WINDOW_Z_NORMAL = 2.0


# ---------- Metric: Sim Consensus ----------


def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
    """Min-max normalize every score column across all candidates.

    Each pick must carry an index-addressable ``"scores"`` sequence; the
    number of columns is taken from the first pick. (The original notes
    describe 20 candidates with 5 scores each, but nothing here enforces
    those sizes.)

    Returns:
        One list per candidate with that candidate's normalized scores, in
        column order. Empty input yields an empty list.
    """
    if not picks:
        return []
    n_metrics = len(picks[0]["scores"])
    normalized_columns: List[List[float]] = []
    for k in range(n_metrics):
        column = [p["scores"][k] for p in picks]
        lo, hi = min(column), max(column)
        span = hi - lo
        if span == 0:
            # Constant column: a column of zeros maps to 0.0 (which zeroes
            # the geometric mean downstream); any other constant value is
            # treated as a tie and maps to 0.5.
            fallback = 0.0 if lo == 0 else 0.5
            normalized_columns.append([fallback] * len(column))
        else:
            normalized_columns.append([(v - lo) / span for v in column])
    # Transpose column-major data back to one row per candidate.
    return [
        [col[i] for col in normalized_columns]
        for i in range(len(picks))
    ]


def _geomean(values: List[float]) -> float:
    """Return the geometric mean of *values*.

    Returns 0.0 for empty input or when any value is <= 0, so a candidate
    that scores zero on a single dimension is penalized hard.
    """
    if not values:
        return 0.0
    if any(v <= 0 for v in values):
        return 0.0
    log_sum = sum(math.log(v) for v in values)
    return math.exp(log_sum / len(values))


def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
    """Return the mean of the top-10 per-candidate geometric-mean scores.

    Every candidate's normalized scores are collapsed with a geometric
    mean; the highest ``_TOP_K`` of those values (or all of them when there
    are fewer) are averaged. Empty input yields 0.0.
    """
    if not best_picks:
        return 0.0
    consensus = sorted(
        (_geomean(row) for row in _normalize_columns(best_picks)),
        reverse=True,
    )
    # Slicing already copes with fewer than _TOP_K entries.
    top = consensus[:_TOP_K]
    return mean(top) if top else 0.0


# ---------- Metric: Strategy Drift ----------


def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
    """Return the sum of absolute weight changes between two weight maps.

    Strategies present on only one side count too: the missing weight is
    treated as 0.0.

    math.fsum is used instead of sum() because the keys are iterated in
    set order, which is not stable across processes for strings; fsum's
    result does not depend on summation order, and it always returns a
    float (including 0.0 for empty input).
    """
    keys = set(prev) | set(curr)
    return math.fsum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)


# ---------- Metric: Confidence ----------


def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
    """Return the curation result's confidence clamped to [0, 1].

    Returns None when the ``"confidence"`` key is absent, when its value is
    None, or when it converts to NaN. Without the NaN guard the clamp would
    silently turn NaN into 1.0, because ``min(1.0, nan)`` evaluates to 1.0.

    Raises:
        TypeError / ValueError: if the value cannot be converted by float().
    """
    raw = curate_result.get("confidence")
    if raw is None:
        return None
    v = float(raw)
    if math.isnan(v):
        return None
    return max(0.0, min(1.0, v))


# ---------- Adaptive Baseline ----------


@dataclass
class AdaptiveBaseline:
    """Rolling window of recent metric values used as a z-score baseline."""

    # Recent values, oldest first.
    window: List[float] = field(default_factory=list)
    # Maximum number of values retained by push().
    window_max: int = 8
    # draw_no passed to the most recent push() that supplied one.
    last_pushed_draw_no: Optional[int] = None

    @property
    def size(self) -> int:
        """Number of values currently in the window."""
        return len(self.window)

    @property
    def mu(self) -> float:
        """Mean of the window, or 0.0 when the window is empty."""
        return mean(self.window) if self.window else 0.0

    @property
    def sigma(self) -> float:
        """Sample standard deviation, or 0.0 with fewer than two values."""
        return stdev(self.window) if len(self.window) >= 2 else 0.0

    def push(self, value: float, draw_no: Optional[int] = None) -> None:
        """Append *value* (FIFO) and drop the oldest entries beyond window_max.

        Args:
            value: New observation; stored as float.
            draw_no: When not None, recorded as last_pushed_draw_no.
        """
        self.window.append(float(value))
        # Count the overflow explicitly: slicing with ``[-window_max:]``
        # keeps the whole list when window_max is 0 and misbehaves for
        # negative values. A non-positive window_max retains nothing.
        overflow = len(self.window) - max(self.window_max, 0)
        if overflow > 0:
            self.window = self.window[overflow:]
        if draw_no is not None:
            self.last_pushed_draw_no = draw_no

    def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
        """Compute the z-score of *value* against the window and classify it.

        Args:
            value: Observation to score.
            z_normal: z threshold for 'normal'. Only applied once the window
                is full; while size < window_max a fixed threshold of
                ``_PARTIAL_WINDOW_Z_NORMAL`` is used instead.
            z_urgent: z threshold for 'urgent' (always applied as given).

        Returns:
            (z_score, fire_level) with fire_level in
            {'warmup', 'noop', 'normal', 'urgent'}.

            z_score is None in two cases: during warmup (fewer than
            ``_MIN_SAMPLES`` values), and when sigma == 0 (degenerate
            window), where the level is 'urgent' if value > mu and 'noop'
            otherwise. Callers must rely on fire_level in those cases and
            never compare a None z_score against thresholds.
        """
        size = self.size
        if size < _MIN_SAMPLES:
            return None, "warmup"

        # mu/sigma are recomputed on every property access, so read once.
        mu, sigma = self.mu, self.sigma
        if sigma == 0:
            return None, ("urgent" if value > mu else "noop")

        normal_threshold = (
            _PARTIAL_WINDOW_Z_NORMAL if size < self.window_max else z_normal
        )
        z = (value - mu) / sigma
        if z >= z_urgent:
            return z, "urgent"
        if z >= normal_threshold:
            return z, "normal"
        return z, "noop"


# ---------- Combined fire decision ----------


def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
    """Combine per-metric signal results into one overall fire level.

    Args:
        signal_results: [{"metric": str, "z": float|None, "fire": str}, ...]
            Only the "fire" entry is read here.

    Returns:
        'urgent' when any signal is urgent or at least two signals fired
        ('normal' or 'urgent'); 'normal' when exactly one signal fired at
        'normal'; 'noop' otherwise.
    """
    fired = [
        s["fire"] for s in signal_results if s["fire"] in ("normal", "urgent")
    ]
    if "urgent" in fired or len(fired) >= 2:
        return "urgent"
    if fired:
        return "normal"
    return "noop"