fix(agent-office/lotto): deep CuratorError fallthrough + urgent 발송 재시도
결함1: deep signal-check에서 curate_weekly가 CuratorError면 전체 check가 abort돼 sim/drift 시그널이 미평가되던 문제 → try/except로 confidence만 포기하고 sim/drift는 계속(curate_result=None fallthrough). 결함2: send_urgent_signal 실패가 outer except로 빠져 task 실패+미마킹이던 문제 → _send_urgent_with_retry(3회/60s) 추출, 최종 실패해도 raise 안 함(시그널 평가·태스크 보존), 성공 시에만 mark_signal_notified. TDD 3 신규 테스트. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,10 @@ from .base import BaseAgent
|
|||||||
from ..db import create_task, update_task_status, add_log
|
from ..db import create_task, update_task_status, add_log
|
||||||
from ..curator.pipeline import curate_weekly, CuratorError
|
from ..curator.pipeline import curate_weekly, CuratorError
|
||||||
|
|
||||||
|
# Retry settings for urgent Telegram delivery, so that a send failure does not
# abort signal evaluation or fail the task.
|
||||||
|
# Total number of send attempts made by _send_urgent_with_retry (the first try counts).
URGENT_SEND_MAX_ATTEMPTS = 3
|
||||||
|
# Seconds passed to asyncio.sleep between a failed attempt and the next one.
URGENT_SEND_RETRY_SEC = 60
|
||||||
|
|
||||||
|
|
||||||
class LottoAgent(BaseAgent):
|
class LottoAgent(BaseAgent):
|
||||||
agent_id = "lotto"
|
agent_id = "lotto"
|
||||||
@@ -54,10 +58,16 @@ class LottoAgent(BaseAgent):
|
|||||||
|
|
||||||
if source == "deep":
|
if source == "deep":
|
||||||
from ..curator.pipeline import curate_weekly
|
from ..curator.pipeline import curate_weekly
|
||||||
|
try:
|
||||||
cw = await curate_weekly(source="signal_deep")
|
cw = await curate_weekly(source="signal_deep")
|
||||||
curate_result = {"confidence": cw.get("confidence")}
|
curate_result = {"confidence": cw.get("confidence")}
|
||||||
if cw.get("draw_no"):
|
if cw.get("draw_no"):
|
||||||
current_draw_no = cw.get("draw_no")
|
current_draw_no = cw.get("draw_no")
|
||||||
|
except CuratorError as e:
|
||||||
|
# 큐레이션 실패는 confidence 시그널만 포기 — sim/drift 평가는 계속(fallthrough)
|
||||||
|
add_log("lotto", f"deep curate_weekly 실패 → sim/drift만 평가: {e}",
|
||||||
|
level="warning", task_id=task_id)
|
||||||
|
curate_result = None
|
||||||
|
|
||||||
outcome = await run_signal_check(
|
outcome = await run_signal_check(
|
||||||
source=source,
|
source=source,
|
||||||
@@ -88,11 +98,7 @@ class LottoAgent(BaseAgent):
|
|||||||
"triggered_at": datetime.now(timezone.utc).isoformat(),
|
"triggered_at": datetime.now(timezone.utc).isoformat(),
|
||||||
"results": outcome["results"],
|
"results": outcome["results"],
|
||||||
}
|
}
|
||||||
await send_urgent_signal(event)
|
await self._send_urgent_with_retry(event, outcome["results"], task_id)
|
||||||
for r in outcome["results"]:
|
|
||||||
if r["fire_level"] in ("normal", "urgent"):
|
|
||||||
mark_signal_notified(r["signal_id"])
|
|
||||||
add_log("lotto", f"urgent 텔레그램 발송 ({len(outcome['results'])}개 시그널)", task_id=task_id)
|
|
||||||
|
|
||||||
fired_metrics = [
|
fired_metrics = [
|
||||||
r["metric"] for r in outcome["results"]
|
r["metric"] for r in outcome["results"]
|
||||||
@@ -111,6 +117,31 @@ class LottoAgent(BaseAgent):
|
|||||||
add_log("lotto", f"signal_check 예외: {e}", level="error", task_id=task_id)
|
add_log("lotto", f"signal_check 예외: {e}", level="error", task_id=task_id)
|
||||||
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
|
return {"ok": False, "message": f"{type(e).__name__}: {e}"}
|
||||||
|
|
||||||
|
async def _send_urgent_with_retry(self, event: dict, results: list, task_id: str) -> bool:
    """Send the urgent Telegram notification, retrying the send on failure.

    The send is attempted up to ``URGENT_SEND_MAX_ATTEMPTS`` times, sleeping
    ``URGENT_SEND_RETRY_SEC`` seconds between attempts. This method never
    raises for a send or marking failure, so the caller's signal evaluation
    and task bookkeeping are preserved.

    Only the send itself is retried. Marking signals as notified happens
    after a successful send and outside the retry path: a failure while
    marking is logged but must not trigger another send, otherwise the same
    urgent message would be delivered more than once.

    Args:
        event: Payload handed unchanged to ``send_urgent_signal``.
        results: Signal result dicts; entries whose ``fire_level`` is
            ``"normal"`` or ``"urgent"`` are marked notified via their
            ``signal_id`` once the send succeeds.
        task_id: Task identifier attached to every log entry.

    Returns:
        True if the message was delivered, False if every attempt failed.
    """
    import asyncio
    from ..db import add_log, mark_signal_notified
    from ..notifiers.telegram_lotto import send_urgent_signal

    for attempt in range(1, URGENT_SEND_MAX_ATTEMPTS + 1):
        try:
            await send_urgent_signal(event)
        except Exception as e:
            # Deliberately broad: any delivery error is treated as retryable
            # and must not escape to the caller.
            if attempt < URGENT_SEND_MAX_ATTEMPTS:
                add_log("lotto", f"urgent 발송 실패(attempt {attempt}) → {URGENT_SEND_RETRY_SEC}s 후 재시도: {e}",
                        level="warning", task_id=task_id)
                await asyncio.sleep(URGENT_SEND_RETRY_SEC)
                continue
            add_log("lotto", f"urgent 발송 {URGENT_SEND_MAX_ATTEMPTS}회 실패 — 미발송: {e}",
                    level="error", task_id=task_id)
            return False

        # Delivered. Marking is best-effort and kept out of the retry loop so
        # a bookkeeping error cannot cause a duplicate Telegram message.
        try:
            for r in results:
                if r["fire_level"] in ("normal", "urgent"):
                    mark_signal_notified(r["signal_id"])
        except Exception as e:
            add_log("lotto", f"urgent 발송 후 notified 마킹 실패(재발송 안 함): {e}",
                    level="error", task_id=task_id)
        add_log("lotto", f"urgent 텔레그램 발송 ({len(results)}개 시그널, attempt {attempt})", task_id=task_id)
        return True

    # Only reachable if URGENT_SEND_MAX_ATTEMPTS is configured below 1.
    return False
|
||||||
|
|
||||||
async def run_daily_digest(self) -> dict:
|
async def run_daily_digest(self) -> dict:
|
||||||
"""일일 요약 — 지난 24h normal/urgent 발화 텔레그램 1통. task_id wrap."""
|
"""일일 요약 — 지난 24h normal/urgent 발화 텔레그램 1통. task_id wrap."""
|
||||||
from ..db import (
|
from ..db import (
|
||||||
|
|||||||
@@ -96,6 +96,81 @@ async def test_run_signal_check_failure_marks_task_failed(monkeypatch):
|
|||||||
assert "boom" in tasks[0]["result_data"]["error"]
|
assert "boom" in tasks[0]["result_data"]["error"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_deep_curate_error_still_evaluates_signals(monkeypatch):
    """Deep check: a CuratorError from curate_weekly must not abort signal evaluation.

    curate_weekly is stubbed to raise CuratorError; the test then checks that
    run_signal_check is still invoked, that it receives curate_result=None,
    and that the agent reports ok=True.
    """
    from app.agents.lotto import LottoAgent
    from app.curator import signal_runner, pipeline
    from app import service_proxy
    from app.notifiers import telegram_lotto

    # Records what the stubbed signal runner observed; "UNSET" distinguishes
    # "never called" from "called with curate_result=None".
    observed = {"signal": False, "curate_result": "UNSET"}

    async def failing_curate(**kwargs):
        raise pipeline.CuratorError("curation 실패")

    async def recording_signal_check(**kwargs):
        observed["signal"] = True
        observed["curate_result"] = kwargs.get("curate_result")
        fired = {
            "signal_id": 1,
            "metric": "sim_signal",
            "value": 0.6,
            "z_score": 1.7,
            "fire_level": "normal",
            "baseline_mu": 0.5,
            "baseline_sigma": 0.05,
            "payload": {},
        }
        return {"overall_fire": "normal", "results": [fired]}

    async def stub_latest_draw():
        return 1226

    async def stub_send(_event):
        return None

    monkeypatch.setattr(pipeline, "curate_weekly", failing_curate)
    monkeypatch.setattr(signal_runner, "run_signal_check", recording_signal_check)
    monkeypatch.setattr(service_proxy, "lotto_latest_draw", stub_latest_draw)
    monkeypatch.setattr(telegram_lotto, "send_urgent_signal", stub_send)

    outcome = await LottoAgent().run_signal_check(source="deep")

    assert outcome["ok"] is True  # not aborted by the CuratorError
    assert observed["signal"] is True  # signal evaluation still ran
    assert observed["curate_result"] is None  # confidence input fell through as None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_urgent_send_retries_then_succeeds(monkeypatch):
    """A failing urgent send is retried, and the helper returns True once it succeeds.

    The stubbed sender raises on its first two calls and succeeds on the
    third, so exactly three calls are expected.
    """
    import app.agents.lotto as lotto_mod
    from app.agents.lotto import LottoAgent
    from app.notifiers import telegram_lotto

    send_calls = 0

    async def flaky_send(_event):
        nonlocal send_calls
        send_calls += 1
        if send_calls < 3:
            raise RuntimeError("telegram down")

    # Zero the retry delay so the test does not actually wait.
    monkeypatch.setattr(lotto_mod, "URGENT_SEND_RETRY_SEC", 0)
    monkeypatch.setattr(telegram_lotto, "send_urgent_signal", flaky_send)

    fired = [{"signal_id": 1, "fire_level": "urgent"}]
    delivered = await LottoAgent()._send_urgent_with_retry({"x": 1}, fired, task_id="t1")

    assert delivered is True
    assert send_calls == 3
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_urgent_send_all_fail_returns_false_no_raise(monkeypatch):
    """When every urgent send attempt fails, the helper returns False instead of raising."""
    import app.agents.lotto as lotto_mod
    from app.agents.lotto import LottoAgent
    from app.notifiers import telegram_lotto

    async def always_fail(_event):
        raise RuntimeError("telegram down")

    # Zero the retry delay so the test does not actually wait.
    monkeypatch.setattr(lotto_mod, "URGENT_SEND_RETRY_SEC", 0)
    monkeypatch.setattr(telegram_lotto, "send_urgent_signal", always_fail)

    fired = [{"signal_id": 1, "fire_level": "urgent"}]
    delivered = await LottoAgent()._send_urgent_with_retry({"x": 1}, fired, task_id="t1")

    assert delivered is False
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_run_daily_digest_creates_task(monkeypatch):
|
async def test_run_daily_digest_creates_task(monkeypatch):
|
||||||
"""run_daily_digest이 agent_tasks에 task 생성 + result_data 저장."""
|
"""run_daily_digest이 agent_tasks에 task 생성 + result_data 저장."""
|
||||||
|
|||||||
Reference in New Issue
Block a user