162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
"""로또 큐레이션·당첨 알림 — 텔레그램 푸시."""
|
||
import logging
|
||
from typing import Dict, Any
|
||
|
||
# 기존 에이전트들과 동일한 패턴: send_raw(text, reply_markup=None, chat_id=None)
|
||
# chat_id 생략 시 기본 TELEGRAM_CHAT_ID로 자동 발송.
|
||
from ..telegram.messaging import send_raw
|
||
|
||
logger = logging.getLogger("agent-office")
|
||
|
||
LOTTO_URL = "https://gahusb.synology.me/lotto"
|
||
|
||
|
||
def _format_briefing(payload: Dict[str, Any]) -> str:
|
||
draw_no = payload["draw_no"]
|
||
nar = payload["narrative"]
|
||
conf = payload["confidence"]
|
||
|
||
# 분배 칩 — core 5세트의 risk_tag 빈도
|
||
core = payload["picks"]["core"]
|
||
role_count = {"안정": 0, "균형": 0, "공격": 0}
|
||
for p in core:
|
||
role_count[p["risk_tag"]] = role_count.get(p["risk_tag"], 0) + 1
|
||
chip = " · ".join(f"{k} {v}" for k, v in role_count.items() if v)
|
||
|
||
msg = [
|
||
f"🎟 {draw_no}회 · 큐레이션 떴음",
|
||
"",
|
||
f"\"{nar['headline']}\"",
|
||
f"신뢰도 {conf} · 분배 {chip}",
|
||
]
|
||
retro = nar.get("retrospective") or ""
|
||
if retro:
|
||
msg += ["", f"▸ 회고: {retro}"]
|
||
msg += ["", f"👉 결정 카드 보러가기 ({LOTTO_URL})"]
|
||
return "\n".join(msg)
|
||
|
||
|
||
def _format_prize_alert(event: Dict[str, Any]) -> str:
|
||
return (
|
||
"🚨 로또 당첨 가능성!\n"
|
||
f"{event['draw_no']}회 — {event['match_count']}개 일치\n"
|
||
f"번호: {', '.join(str(n) for n in event['numbers'])}\n"
|
||
"동행복권에서 즉시 확인하세요."
|
||
)
|
||
|
||
|
||
async def send_curator_briefing(payload: Dict[str, Any]) -> None:
|
||
text = _format_briefing(payload)
|
||
try:
|
||
await send_raw(text)
|
||
except Exception as e:
|
||
logger.warning(f"[telegram_lotto] briefing send failed: {e}")
|
||
|
||
|
||
async def send_prize_alert(event: Dict[str, Any]) -> None:
|
||
text = _format_prize_alert(event)
|
||
try:
|
||
await send_raw(text)
|
||
except Exception as e:
|
||
logger.warning(f"[telegram_lotto] prize alert send failed: {e}")
|
||
|
||
|
||
# ---------- 능동 시그널 알림 (urgent + digest) ----------
|
||
|
||
_METRIC_LABEL = {
|
||
"sim_signal": "Sim Consensus",
|
||
"drift": "Strategy Drift",
|
||
"confidence": "Confidence",
|
||
}
|
||
|
||
|
||
def _format_urgent_signal(event: Dict[str, Any]) -> str:
|
||
"""긴급 시그널 텔레그램 메시지 포맷."""
|
||
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
|
||
results = event.get("results", [])
|
||
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
|
||
|
||
lines = [
|
||
"🚨 로또 능동 신호",
|
||
"",
|
||
f"[{triggered}]",
|
||
f"강한 시그널 {len(fired)}종 발화:",
|
||
]
|
||
for r in fired:
|
||
label = _METRIC_LABEL.get(r["metric"], r["metric"])
|
||
v = r.get("value")
|
||
mu = r.get("baseline_mu")
|
||
sigma = r.get("baseline_sigma")
|
||
z = r.get("z_score")
|
||
v_text = f"{v:.2f}" if v is not None else "N/A"
|
||
if mu is not None and sigma is not None and z is not None:
|
||
lines.append(f"• {label} {v_text} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
|
||
else:
|
||
lines.append(f"• {label} {v_text}")
|
||
|
||
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
|
||
for r in fired:
|
||
if r["metric"] == "drift":
|
||
wn = (r.get("payload") or {}).get("weights_now") or {}
|
||
wp = (r.get("payload") or {}).get("weights_prev") or {}
|
||
if wn and wp:
|
||
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
|
||
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
|
||
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
|
||
lines.append("")
|
||
lines.append(f"요인: {detail}")
|
||
break
|
||
|
||
lines.append("")
|
||
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _format_signal_digest(digest: Dict[str, Any]) -> str:
|
||
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
|
||
fired = int(digest.get("fired", 0))
|
||
if fired == 0:
|
||
return ""
|
||
|
||
signals_list = digest.get("signals", [])
|
||
evaluated = digest.get("evaluated", 0)
|
||
|
||
lines = [
|
||
"📊 로또 일일 요약 (지난 24h)",
|
||
"",
|
||
f"평가 {evaluated}회 / 발화 {fired}회",
|
||
]
|
||
for s in signals_list:
|
||
label = _METRIC_LABEL.get(s["metric"], s["metric"])
|
||
z = s.get("z_score")
|
||
when = (s.get("triggered_at") or "")[11:16] # HH:MM
|
||
z_text = f"z={z:.1f}" if z is not None else "z=-"
|
||
lines.append(f"• {label:14s} {s['fire_level']:6s} {z_text} ({when})")
|
||
|
||
weights_trend = digest.get("weights_trend") or {}
|
||
if weights_trend:
|
||
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
|
||
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
|
||
arrow = "↑" if delta > 0.01 else ("↓" if delta < -0.01 else "→")
|
||
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def send_urgent_signal(event: Dict[str, Any]) -> None:
|
||
text = _format_urgent_signal(event)
|
||
try:
|
||
await send_raw(text)
|
||
except Exception as e:
|
||
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
|
||
|
||
|
||
async def send_signal_summary(digest: Dict[str, Any]) -> None:
|
||
text = _format_signal_digest(digest)
|
||
if not text:
|
||
return # 발화 0건이면 발송 skip
|
||
try:
|
||
await send_raw(text)
|
||
except Exception as e:
|
||
logger.warning(f"[telegram_lotto] digest send failed: {e}")
|