"""LottoAgent 능동 시그널 — DB I/O + cron 진입점 + 평가 orchestration.""" from __future__ import annotations import logging from typing import Any, Dict, List, Optional from .. import db from .. import service_proxy from . import signals logger = logging.getLogger("agent-office.lotto-signals") # 회차 단위 메트릭 (window push 시 last_pushed_draw_no 비교) DRAW_SCOPED_METRICS = {"drift", "confidence"} def _load_baseline(metric: str) -> signals.AdaptiveBaseline: row = db.get_baseline(metric) if row is None: return signals.AdaptiveBaseline(window=[], window_max=8) return signals.AdaptiveBaseline( window=list(row["window_values"]), window_max=8, last_pushed_draw_no=row.get("last_pushed_draw_no"), ) def _save_baseline(metric: str, bl: signals.AdaptiveBaseline) -> None: db.upsert_baseline( metric=metric, window_values=bl.window, mu=bl.mu, sigma=bl.sigma, last_pushed_draw_no=bl.last_pushed_draw_no, ) def evaluate_metric_and_persist( source: str, metric: str, value: float, draw_no: Optional[int], z_normal: float, z_urgent: float, push_to_window: bool, payload: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """단일 메트릭 평가 → lotto_signals INSERT → baseline 갱신. 회차 단위 메트릭(drift, confidence)은 같은 draw_no에서 window push 생략. 
""" bl = _load_baseline(metric) # 회차 가드 do_push = push_to_window if metric in DRAW_SCOPED_METRICS and draw_no is not None: if bl.last_pushed_draw_no == draw_no: do_push = False # 평가는 push 전 baseline 기준 z, fire = bl.evaluate(value=value, z_normal=z_normal, z_urgent=z_urgent) if do_push: bl.push(value=value, draw_no=draw_no) _save_baseline(metric, bl) else: # cold start에서도 baseline row를 만들어 두려면 upsert 필요 _save_baseline(metric, bl) sid = db.insert_lotto_signal( source=source, metric=metric, value=value, baseline_mu=bl.mu if bl.size > 0 else None, baseline_sigma=bl.sigma if bl.size >= 2 else None, z_score=z, fire_level=fire, payload=payload, ) return { "signal_id": sid, "metric": metric, "value": value, "z_score": z, "fire_level": fire, "payload": payload or {}, } # ---------- Service proxy thin wrappers (monkeypatch 대상) ---------- async def _fetch_best_picks() -> List[Dict[str, Any]]: return await service_proxy.lotto_best() async def _fetch_strategy_weights() -> Dict[str, float]: return await service_proxy.lotto_strategy_weights() # ---------- Orchestrator ---------- async def run_signal_check( source: str, z_normal: float = 1.5, z_urgent: float = 2.5, curate_result: Optional[Dict[str, Any]] = None, current_draw_no: Optional[int] = None, ) -> Dict[str, Any]: """cron 진입점. source ∈ {'light', 'sim', 'deep'}. 
light/sim: Sim Consensus + Strategy Drift 평가 deep: 위 2종 + Confidence (curate_result 필요) """ results: List[Dict[str, Any]] = [] # --- Sim Consensus --- try: best = await _fetch_best_picks() v = signals.sim_consensus_score(best) results.append( evaluate_metric_and_persist( source=source, metric="sim_signal", value=v, draw_no=None, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"top_count": min(len(best), 10)}, ) ) except Exception as e: logger.warning(f"sim_consensus 평가 실패: {e}") # --- Strategy Drift (회차 단위) --- try: w_curr = await _fetch_strategy_weights() # weights 캐시: lotto_baselines의 별도 metric 'drift_weights_cache'에 prev/curr 2개 보관 prev_payload_row = db.get_baseline("drift_weights_cache") w_prev = prev_payload_row["window_values"] if prev_payload_row else None if w_prev and isinstance(w_prev, list) and len(w_prev) > 0 and isinstance(w_prev[0], dict): prev_dict = w_prev[-1] drift_value = signals.strategy_drift_score(prev_dict, w_curr) results.append( evaluate_metric_and_persist( source=source, metric="drift", value=drift_value, draw_no=current_draw_no, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"weights_now": w_curr, "weights_prev": prev_dict}, ) ) # weights 캐시 갱신 (최대 2개 FIFO) cache_window = (w_prev or []) + [w_curr] if len(cache_window) > 2: cache_window = cache_window[-2:] db.upsert_baseline( metric="drift_weights_cache", window_values=cache_window, mu=0.0, sigma=0.0, last_pushed_draw_no=current_draw_no, ) except Exception as e: logger.warning(f"strategy_drift 평가 실패: {e}") # --- Confidence (deep_check + curate_result 필수) --- if source == "deep" and curate_result is not None: try: cv = signals.confidence_score(curate_result) if cv is not None: results.append( evaluate_metric_and_persist( source=source, metric="confidence", value=cv, draw_no=current_draw_no, z_normal=z_normal, z_urgent=z_urgent, push_to_window=True, payload={"draw_no": current_draw_no}, ) ) except Exception as e: logger.warning(f"confidence 평가 실패: 
{e}") overall = signals.decide_overall_fire( [{"metric": r["metric"], "z": r["z_score"], "fire": r["fire_level"]} for r in results] ) return {"overall_fire": overall, "results": results}
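

# ---------- Illustrative sketch (not used by the production path) ----------
# A minimal, self-contained sketch of the order of operations in
# evaluate_metric_and_persist: score the value against the baseline *before*
# pushing it, and skip the push when a draw-scoped metric was already pushed
# for the same draw_no.
#
# Assumptions: _SketchBaseline and _sketch_evaluate are hypothetical stand-ins
# that exist only for illustration. The real signals.AdaptiveBaseline is not
# shown in this module and may compute mu, sigma, and fire levels differently;
# the population standard deviation and the "no z-score below two samples"
# rule used here are guesses, not the library's documented behavior.
import statistics


class _SketchBaseline:
    """Hypothetical stand-in for signals.AdaptiveBaseline (assumed behavior)."""

    def __init__(self, window_max=8):
        self.window = []
        self.window_max = window_max
        self.last_pushed_draw_no = None

    def z(self, value):
        # No z-score until there are two samples with a nonzero spread.
        if len(self.window) < 2:
            return None
        sigma = statistics.pstdev(self.window)
        if sigma == 0:
            return None
        return (value - statistics.fmean(self.window)) / sigma

    def push(self, value, draw_no):
        # Bounded FIFO window: keep only the newest window_max values.
        self.window = (self.window + [value])[-self.window_max:]
        if draw_no is not None:
            self.last_pushed_draw_no = draw_no


def _sketch_evaluate(bl, value, draw_no, draw_scoped):
    """Return (z, pushed); z uses the pre-push window, push honors the draw guard."""
    z = bl.z(value)
    guarded = draw_scoped and draw_no is not None and bl.last_pushed_draw_no == draw_no
    if not guarded:
        bl.push(value, draw_no)
    return z, not guarded


# Usage, under the assumptions above: a second evaluation for the same draw is
# still scored, but it leaves the window untouched.
#
#     bl = _SketchBaseline()
#     _sketch_evaluate(bl, 1.0, draw_no=100, draw_scoped=True)  # pushed
#     _sketch_evaluate(bl, 3.0, draw_no=100, draw_scoped=True)  # guarded, not pushed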