147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
# agent-office/app/curator/signals.py
|
|
"""LottoAgent 능동 모니터링 — 시그널 평가 & adaptive baseline (순수 함수).
|
|
|
|
DB I/O 없음. 입력은 모두 dict/list, 출력도 dict/list.
|
|
signal_runner.py에서 DB 연동 + cron 진입점 담당.
|
|
"""
|
|
from __future__ import annotations
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from statistics import mean, pstdev
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
# ---------- Metric: Sim Consensus ----------
|
|
|
|
def _normalize_columns(picks: List[Dict[str, Any]]) -> List[List[float]]:
|
|
"""20개 후보의 5종 점수 컬럼별 min-max normalize → 후보별 5종 정규화 점수."""
|
|
if not picks:
|
|
return []
|
|
n_metrics = len(picks[0]["scores"])
|
|
columns = [[p["scores"][k] for p in picks] for k in range(n_metrics)]
|
|
norms_per_col = []
|
|
for col in columns:
|
|
lo, hi = min(col), max(col)
|
|
rng = hi - lo
|
|
if rng == 0:
|
|
# 모두 0이면 0.0(기하평균 페널티), 모두 동일한 양수면 0.5(타이 처리)
|
|
fallback = 0.0 if lo == 0 else 0.5
|
|
norms_per_col.append([fallback] * len(col))
|
|
else:
|
|
norms_per_col.append([(v - lo) / rng for v in col])
|
|
return [
|
|
[norms_per_col[k][i] for k in range(n_metrics)]
|
|
for i in range(len(picks))
|
|
]
|
|
|
|
|
|
def _geomean(values: List[float]) -> float:
|
|
"""기하평균. 0이 하나라도 있으면 0 (한 차원이 0인 후보 강하게 페널티)."""
|
|
if not values:
|
|
return 0.0
|
|
if any(v <= 0 for v in values):
|
|
return 0.0
|
|
log_sum = sum(math.log(v) for v in values)
|
|
return math.exp(log_sum / len(values))
|
|
|
|
|
|
def sim_consensus_score(best_picks: List[Dict[str, Any]]) -> float:
    """Average the strongest per-candidate consensus values.

    Each candidate's scores are min-max normalized per column with
    ``_normalize_columns`` and collapsed into a single consensus value with
    ``_geomean``. The ten highest consensus values (or all of them when there
    are fewer than ten) are then averaged.

    Returns:
        The mean of the selected consensus values, or ``0.0`` when
        ``best_picks`` is empty.
    """
    if not best_picks:
        return 0.0

    ranked = sorted(
        (_geomean(row) for row in _normalize_columns(best_picks)),
        reverse=True,
    )
    # Slicing already copes with fewer than ten entries.
    leaders = ranked[:10]
    if not leaders:
        return 0.0
    return mean(leaders)
|
|
|
|
|
|
# ---------- Metric: Strategy Drift ----------
|
|
|
|
def strategy_drift_score(prev: Dict[str, float], curr: Dict[str, float]) -> float:
    """Return the total absolute change in strategy weights.

    A strategy present in only one of the two mappings is compared against a
    weight of ``0.0``, so newly added and removed strategies contribute their
    full weight to the score.

    Args:
        prev: Strategy name -> weight before the change.
        curr: Strategy name -> weight after the change.

    Returns:
        Sum of ``|curr[k] - prev[k]|`` over the union of keys, always a float
        (``0.0`` when both mappings are empty).
    """
    keys = set(prev) | set(curr)
    # The keys come from a set, whose iteration order is not fixed; fsum gives
    # a correctly rounded total that does not depend on the addition order.
    return math.fsum(abs(curr.get(k, 0.0) - prev.get(k, 0.0)) for k in keys)
|
|
|
|
|
|
# ---------- Metric: Confidence ----------
|
|
|
|
def confidence_score(curate_result: Dict[str, Any]) -> Optional[float]:
    """Extract the curation confidence, clamped to the range 0.0 to 1.0.

    Args:
        curate_result: Curation result dict that may contain a
            ``"confidence"`` entry.

    Returns:
        The confidence as a float clamped to ``[0.0, 1.0]``, or ``None`` when
        the entry is missing, is ``None``, or converts to NaN.

    Raises:
        TypeError, ValueError: If the entry cannot be converted with
            ``float()`` (for example a non-numeric string).
    """
    raw = curate_result.get("confidence")
    if raw is None:
        return None

    value = float(raw)
    # NaN would otherwise slip through the clamp as 1.0, because
    # min(1.0, nan) evaluates to 1.0; report it as "no confidence" instead.
    if math.isnan(value):
        return None
    return max(0.0, min(1.0, value))
|
|
|
|
|
|
# ---------- Adaptive Baseline ----------
|
|
|
|
@dataclass
class AdaptiveBaseline:
    """Rolling window of recent metric values used as an adaptive baseline.

    New observations are appended with ``push`` and compared against the
    window's mean and population standard deviation with ``evaluate``.
    """

    # Retained observations, oldest first.
    window: List[float] = field(default_factory=list)
    # Maximum number of observations kept after a push.
    window_max: int = 8
    # draw_no given to the most recent push() call that supplied one.
    last_pushed_draw_no: Optional[int] = None

    @property
    def size(self) -> int:
        """Number of observations currently in the window."""
        return len(self.window)

    @property
    def mu(self) -> float:
        """Mean of the window, or 0.0 when the window is empty."""
        return mean(self.window) if self.window else 0.0

    @property
    def sigma(self) -> float:
        """Population standard deviation of the window, or 0.0 below two values."""
        return pstdev(self.window) if len(self.window) >= 2 else 0.0

    def push(self, value: float, draw_no: Optional[int] = None) -> None:
        """Append ``value`` and drop the oldest entries beyond ``window_max``.

        Args:
            value: Observation to record; stored as ``float(value)``.
            draw_no: When not None, remembered in ``last_pushed_draw_no``.
        """
        self.window.append(float(value))
        # Trim by the number of surplus entries rather than slicing with
        # ``[-window_max:]``: that slice keeps the whole list when
        # window_max is 0, so the window would never be trimmed.
        overflow = len(self.window) - self.window_max
        if overflow > 0:
            self.window = self.window[overflow:]
        if draw_no is not None:
            self.last_pushed_draw_no = draw_no

    def evaluate(self, value: float, z_normal: float, z_urgent: float) -> Tuple[Optional[float], str]:
        """Compute the z-score of ``value`` against the window and pick a fire level.

        Args:
            value: Observation to evaluate; it is not added to the window.
            z_normal: z-score threshold for 'normal'. Only applied once the
                window holds at least ``window_max`` values; until then a
                fixed threshold of 2.0 is used.
            z_urgent: z-score threshold for 'urgent'.

        Returns:
            ``(z_score, fire_level)`` with fire_level in
            {'warmup', 'noop', 'normal', 'urgent'}. ``z_score`` is None during
            warmup (fewer than 4 values in the window) and when the window has
            zero spread; in the zero-spread case the level is 'urgent' if
            ``value`` exceeds the mean and 'noop' otherwise.
        """
        if self.size < 4:
            return None, "warmup"

        normal_threshold = 2.0 if self.size < self.window_max else z_normal

        # Read the statistics once; each property access recomputes them.
        mu = self.mu
        sigma = self.sigma

        if sigma == 0:
            # No spread, so a z-score is undefined; any rise above the
            # constant baseline is reported as urgent.
            return (None, "urgent") if value > mu else (None, "noop")

        z = (value - mu) / sigma
        if z >= z_urgent:
            return z, "urgent"
        if z >= normal_threshold:
            return z, "normal"
        return z, "noop"
|
|
|
|
|
|
# ---------- Combined fire decision ----------
|
|
|
|
def decide_overall_fire(signal_results: List[Dict[str, Any]]) -> str:
    """Combine per-signal fire levels into one overall fire level.

    Args:
        signal_results: [{"metric": str, "z": float|None, "fire": str}, ...];
            only the "fire" entry of each item is read.

    Returns:
        'urgent' when any signal is urgent or when two or more signals fired,
        'normal' when exactly one signal fired at the normal level,
        'noop' otherwise.
    """
    levels = [signal["fire"] for signal in signal_results]

    if "urgent" in levels:
        return "urgent"

    # With no urgent signal present, the fired signals are exactly the
    # 'normal' ones.
    fired = levels.count("normal")
    if fired >= 2:
        return "urgent"
    return "normal" if fired == 1 else "noop"
|