feat(lotto-signals): 텔레그램 urgent/digest 메시지 포맷

This commit is contained in:
2026-05-20 03:07:30 +09:00
parent b1c786e59d
commit 8552cbc184
2 changed files with 148 additions and 0 deletions

View File

@@ -59,3 +59,102 @@ async def send_prize_alert(event: Dict[str, Any]) -> None:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] prize alert send failed: {e}")
# ---------- 능동 시그널 알림 (urgent + digest) ----------
_METRIC_LABEL = {
"sim_signal": "Sim Consensus",
"drift": "Strategy Drift",
"confidence": "Confidence",
}
def _format_urgent_signal(event: Dict[str, Any]) -> str:
"""긴급 시그널 텔레그램 메시지 포맷."""
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
results = event.get("results", [])
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
lines = [
"🚨 로또 능동 신호",
"",
f"[{triggered}]",
f"강한 시그널 {len(fired)}종 발화:",
]
for r in fired:
label = _METRIC_LABEL.get(r["metric"], r["metric"])
v = r.get("value")
mu = r.get("baseline_mu")
sigma = r.get("baseline_sigma")
z = r.get("z_score")
if mu is not None and sigma is not None and z is not None:
lines.append(f"{label} {v:.2f} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
else:
lines.append(f"{label} {v:.2f}")
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
for r in fired:
if r["metric"] == "drift":
wn = (r.get("payload") or {}).get("weights_now") or {}
wp = (r.get("payload") or {}).get("weights_prev") or {}
if wn and wp:
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
lines.append("")
lines.append(f"요인: {detail}")
break
lines.append("")
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
return "\n".join(lines)
def _format_signal_digest(digest: Dict[str, Any]) -> str:
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
fired = int(digest.get("fired", 0))
if fired == 0:
return ""
signals_list = digest.get("signals", [])
evaluated = digest.get("evaluated", 0)
lines = [
"📊 로또 일일 요약 (지난 24h)",
"",
f"평가 {evaluated}회 / 발화 {fired}",
]
for s in signals_list:
label = _METRIC_LABEL.get(s["metric"], s["metric"])
z = s.get("z_score")
when = (s.get("triggered_at") or "")[11:16] # HH:MM
z_text = f"z={z:.1f}" if z is not None else "z=-"
lines.append(f"{label:14s} {s['fire_level']:6s} {z_text} ({when})")
weights_trend = digest.get("weights_trend") or {}
if weights_trend:
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
arrow = "" if delta > 0.01 else ("" if delta < -0.01 else "")
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
return "\n".join(lines)
async def send_urgent_signal(event: Dict[str, Any]) -> None:
text = _format_urgent_signal(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
async def send_signal_summary(digest: Dict[str, Any]) -> None:
text = _format_signal_digest(digest)
if not text:
return # 발화 0건이면 발송 skip
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")