Files
web-page-backend/agent-office/app/notifiers/telegram_lotto.py
gahusb 0f8c71c552 fix(lotto-evolver): previous_base diff + 일요일 cron skip + idempotent evaluate
- weight_evolver.evaluate_weekly: save_base_history 직전에 current_base를
  previous_base로 캡처해 return dict에 포함 → formatter가 진짜 diff 표시 가능
- evaluate_weekly: same effective_from row 이미 존재 시 save skip + idempotent
  return (토 22:00 lotto cron과 agent-office 22:15 재호출 중복 row 방지)
- main._run_weight_evolver_daily: 일요일(weekday=6) 도 skip — 토요일 trial을
  INSERT OR REPLACE로 덮어쓰는 문제 방지
- telegram_lotto._format_evolution_report: eval_result.previous_base 우선
  사용 (없으면 current_base 폴백) → diff 자기 자신 비교 버그 수정
- test_lotto_evolution_format: previous_base 키 추가 + 새 diff 검증 테스트

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-22 03:35:20 +09:00

228 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""로또 큐레이션·당첨 알림 — 텔레그램 푸시."""
import logging
from typing import Dict, Any, List
# 기존 에이전트들과 동일한 패턴: send_raw(text, reply_markup=None, chat_id=None)
# chat_id 생략 시 기본 TELEGRAM_CHAT_ID로 자동 발송.
from ..telegram.messaging import send_raw
logger = logging.getLogger("agent-office")
LOTTO_URL = "https://gahusb.synology.me/lotto"
def _format_briefing(payload: Dict[str, Any]) -> str:
draw_no = payload["draw_no"]
nar = payload["narrative"]
conf = payload["confidence"]
# 분배 칩 — core 5세트의 risk_tag 빈도
core = payload["picks"]["core"]
role_count = {"안정": 0, "균형": 0, "공격": 0}
for p in core:
role_count[p["risk_tag"]] = role_count.get(p["risk_tag"], 0) + 1
chip = " · ".join(f"{k} {v}" for k, v in role_count.items() if v)
msg = [
f"🎟 {draw_no}회 · 큐레이션 떴음",
"",
f"\"{nar['headline']}\"",
f"신뢰도 {conf} · 분배 {chip}",
]
retro = nar.get("retrospective") or ""
if retro:
msg += ["", f"▸ 회고: {retro}"]
msg += ["", f"👉 결정 카드 보러가기 ({LOTTO_URL})"]
return "\n".join(msg)
def _format_prize_alert(event: Dict[str, Any]) -> str:
return (
"🚨 로또 당첨 가능성!\n"
f"{event['draw_no']}회 — {event['match_count']}개 일치\n"
f"번호: {', '.join(str(n) for n in event['numbers'])}\n"
"동행복권에서 즉시 확인하세요."
)
async def send_curator_briefing(payload: Dict[str, Any]) -> None:
text = _format_briefing(payload)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] briefing send failed: {e}")
async def send_prize_alert(event: Dict[str, Any]) -> None:
text = _format_prize_alert(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] prize alert send failed: {e}")
# ---------- 능동 시그널 알림 (urgent + digest) ----------
_METRIC_LABEL = {
"sim_signal": "Sim Consensus",
"drift": "Strategy Drift",
"confidence": "Confidence",
}
def _format_urgent_signal(event: Dict[str, Any]) -> str:
"""긴급 시그널 텔레그램 메시지 포맷."""
triggered = event.get("triggered_at", "")[:19].replace("T", " ")
results = event.get("results", [])
fired = [r for r in results if r.get("fire_level") in ("normal", "urgent")]
lines = [
"🚨 로또 능동 신호",
"",
f"[{triggered}]",
f"강한 시그널 {len(fired)}종 발화:",
]
for r in fired:
label = _METRIC_LABEL.get(r["metric"], r["metric"])
v = r.get("value")
mu = r.get("baseline_mu")
sigma = r.get("baseline_sigma")
z = r.get("z_score")
v_text = f"{v:.2f}" if v is not None else "N/A"
if mu is not None and sigma is not None and z is not None:
lines.append(f"{label} {v_text} (μ={mu:.2f}, σ={sigma:.2f}) z={z:.1f}")
else:
lines.append(f"{label} {v_text}")
# drift 페이로드 — 어떤 전략이 변동했는지 한 줄
for r in fired:
if r["metric"] == "drift":
wn = (r.get("payload") or {}).get("weights_now") or {}
wp = (r.get("payload") or {}).get("weights_prev") or {}
if wn and wp:
diffs = {k: wn.get(k, 0) - wp.get(k, 0) for k in (set(wn) | set(wp))}
top = sorted(diffs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:2]
detail = ", ".join(f"{k} {'+' if d>=0 else ''}{d*100:.0f}%p" for k, d in top)
lines.append("")
lines.append(f"요인: {detail}")
break
lines.append("")
lines.append(f"[자세히 보기] ({LOTTO_URL}/agent)")
return "\n".join(lines)
def _format_signal_digest(digest: Dict[str, Any]) -> str:
"""일일 요약 메시지. 발화 0건이면 빈 문자열 (발송 skip 신호)."""
fired = int(digest.get("fired", 0))
if fired == 0:
return ""
signals_list = digest.get("signals", [])
evaluated = digest.get("evaluated", 0)
lines = [
"📊 로또 일일 요약 (지난 24h)",
"",
f"평가 {evaluated}회 / 발화 {fired}",
]
for s in signals_list:
label = _METRIC_LABEL.get(s["metric"], s["metric"])
z = s.get("z_score")
when = (s.get("triggered_at") or "")[11:16] # HH:MM
z_text = f"z={z:.1f}" if z is not None else "z=-"
lines.append(f"{label:14s} {s['fire_level']:6s} {z_text} ({when})")
weights_trend = digest.get("weights_trend") or {}
if weights_trend:
lines += ["", "전략 가중치 추세 (최근 8회 baseline):"]
for strategy, delta in sorted(weights_trend.items(), key=lambda kv: -abs(kv[1])):
arrow = "" if delta > 0.01 else ("" if delta < -0.01 else "")
lines.append(f" {strategy:12s} {arrow} {delta*100:+.0f}%")
return "\n".join(lines)
async def send_urgent_signal(event: Dict[str, Any]) -> None:
text = _format_urgent_signal(event)
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] urgent signal send failed: {e}")
async def send_signal_summary(digest: Dict[str, Any]) -> None:
text = _format_signal_digest(digest)
if not text:
return # 발화 0건이면 발송 skip
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] digest send failed: {e}")
# ---------- Weight Evolver 주간 리포트 ----------
_DAY_NAMES = ["", "", "", "", "", ""]
_METRIC_NAMES = ["freq", "finger", "gap", "cooccur", "divers"]
_REASON_LABEL = {
"winner_4plus": "4개 이상 일치 → base 교체",
"ema_blend": "3개 일치 → EMA blend (0.3)",
"unchanged": "유효 성과 없음 → base 유지",
"cold_start": "초기 균등 적용",
}
def _format_evolution_report(eval_result: Dict[str, Any], current_base: List[float]) -> str:
"""주간 weight evolution 텔레그램 메시지. ok=False 또는 winner 없으면 빈 문자열."""
if not eval_result or "winner" not in eval_result:
return ""
draw_no = eval_result.get("draw_no", "?")
winner = eval_result["winner"]
new_base = eval_result.get("new_base") or [0.0] * 5
reason = eval_result.get("update_reason", "")
dow = winner.get("day_of_week", 0)
day_name = _DAY_NAMES[dow] if 0 <= dow < len(_DAY_NAMES) else "?"
lines = [
f"🧬 로또 학습 주간 리포트 ({draw_no}회차)",
"",
f"이번주 시도: 6일 × {winner.get('n_picks', 5)}세트",
"",
f"🏆 Winner: {day_name}요일",
f" W = [" + ", ".join(
f"{name} {w:.2f}" for name, w in zip(_METRIC_NAMES, winner["weight"])
) + "]",
f" 최고 적중: {winner.get('max_correct', 0)}개 일치 (max={winner.get('max_correct', 0)})",
f" 평균 점수: {winner.get('avg_score', 0):.2f}",
"",
f"📊 다음주 base 변경 ({reason}):",
]
# 우선순위: eval_result.previous_base > current_base (eval 직후 stale) > 균등 fallback
base_now = eval_result.get("previous_base") or current_base or [0.2] * 5
for i, (cur, new) in enumerate(zip(base_now, new_base)):
diff = new - cur
if abs(diff) < 0.005:
marker = "="
elif diff > 0:
marker = "+" if diff < 0.05 else "++"
else:
marker = "-" if diff > -0.05 else "--"
lines.append(f" {_METRIC_NAMES[i]:8s} {cur:.2f}{new:.2f} ({marker})")
lines.append("")
lines.append(f"{_REASON_LABEL.get(reason, reason)}")
lines.append("")
lines.append(f"[웹에서 차트 보기] ({LOTTO_URL}/evolver)")
return "\n".join(lines)
async def send_evolution_report(eval_result: Dict[str, Any], current_base: List[float]) -> None:
text = _format_evolution_report(eval_result, current_base)
if not text:
return
try:
await send_raw(text)
except Exception as e:
logger.warning(f"[telegram_lotto] evolution report send failed: {e}")