"""로또 큐레이션·당첨 알림 — 텔레그램 푸시.""" import logging from typing import Dict, Any # 기존 에이전트들과 동일한 패턴: send_raw(text, reply_markup=None, chat_id=None) # chat_id 생략 시 기본 TELEGRAM_CHAT_ID로 자동 발송. from ..telegram.messaging import send_raw logger = logging.getLogger("agent-office") LOTTO_URL = "https://gahusb.synology.me/lotto" def _format_briefing(payload: Dict[str, Any]) -> str: draw_no = payload["draw_no"] nar = payload["narrative"] conf = payload["confidence"] # 분배 칩 — core 5세트의 risk_tag 빈도 core = payload["picks"]["core"] role_count = {"안정": 0, "균형": 0, "공격": 0} for p in core: role_count[p["risk_tag"]] = role_count.get(p["risk_tag"], 0) + 1 chip = " · ".join(f"{k} {v}" for k, v in role_count.items() if v) msg = [ f"🎟 {draw_no}회 · 큐레이션 떴음", "", f"\"{nar['headline']}\"", f"신뢰도 {conf} · 분배 {chip}", ] retro = nar.get("retrospective") or "" if retro: msg += ["", f"▸ 회고: {retro}"] msg += ["", f"👉 결정 카드 보러가기 ({LOTTO_URL})"] return "\n".join(msg) def _format_prize_alert(event: Dict[str, Any]) -> str: return ( "🚨 로또 당첨 가능성!\n" f"{event['draw_no']}회 — {event['match_count']}개 일치\n" f"번호: {', '.join(str(n) for n in event['numbers'])}\n" "동행복권에서 즉시 확인하세요." ) async def send_curator_briefing(payload: Dict[str, Any]) -> None: text = _format_briefing(payload) try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] briefing send failed: {e}") async def send_prize_alert(event: Dict[str, Any]) -> None: text = _format_prize_alert(event) try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] prize alert send failed: {e}") # ---------- 능동 시그널 알림 (urgent + digest) ---------- _METRIC_LABEL = { "sim_signal": "Sim Consensus", "drift": "Strategy Drift", "confidence": "Confidence", } def _format_urgent_signal(event: Dict[str, Any]) -> str: """긴급 시그널 텔레그램 메시지 포맷.""" triggered = event.get("triggered_at", "")[:19].replace("T", " ") results = event.get("results", []) fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")] lines = [ "🚨 로또 능동 신호", "", f"[{triggered}]", f"강한 시그널 {len(fired)}종 발화:", ] for r in fired: label = _METRIC_LABEL.get(r["metric"], r["metric"]) v = r.get("value") mu = r.get("baseline_mu") sigma = r.get("baseline_sigma") z = r.get("z_score") v_text = f"{v:.2f}" if v is not None else "N/A" if mu is not None and sigma is not None and z is not None: lines.append(f"• {label} {v_text} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}") else: lines.append(f"• {label} {v_text}") # drift 페이로드 — 어떤 전략이 변동했는지 한 줄 for r in fired: if r["metric"] == "drift": wn = (r.get("payload") or {}).get("weights_now") or {} wp = (r.get("payload") or {}).get("weights_prev") or {} if wn and wp: diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))} top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2] detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top) lines.append("") lines.append(f"요인: {detail}") break lines.append("") lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)") return "\n".join(lines) def _format_signal_digest(digest: Dict[str, Any]) -> str: """일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호).""" fired = int(digest.get("fired", 0)) if fired == 0: return "" signals_list = digest.get("signals", []) evaluated = digest.get("evaluated", 0) lines = [ "📊 로또 일일 요약 (지난 24h)", "", f"평가 {evaluated}회 / 발화 {fired}회", ] for s in signals_list: label = _METRIC_LABEL.get(s["metric"], s["metric"]) z = s.get("z_score") when = (s.get("triggered_at") or "")[11:16] # HH:MM z_text = f"z={z:.1f}" if z is not None else "z=-" lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text} ({when})") weights_trend = digest.get("weights_trend") or {} if weights_trend: lines += ["", "전략 가중치 추세 (최근 8회 baseline):"] for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])): arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→") lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%") return "\n".join(lines) async def send_urgent_signal(event: Dict[str, Any]) -> None: text = _format_urgent_signal(event) try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] urgent signal send failed: {e}") async def send_signal_summary(digest: Dict[str, Any]) -> None: text = _format_signal_digest(digest) if not text: return # 발화 0건이면 발송 skip try: await send_raw(text) except Exception as e: logger.warning(f"[telegram_lotto] digest send failed: {e}")