feat(lotto-agent): run_signal_check task_id wrap + 단위 테스트

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-23 01:55:20 +09:00
parent 4b6996b0f7
commit e47ccdb762
2 changed files with 125 additions and 33 deletions

View File

@@ -28,30 +28,32 @@ class LottoAgent(BaseAgent):
pass pass
async def run_signal_check(self, source: str = "light") -> dict: async def run_signal_check(self, source: str = "light") -> dict:
"""비-LLM 시그널 평가 (light/sim) 또는 deep_check (LLM 호출 후). """비-LLM 시그널 평가. task_id wrap 적용."""
Phase 3 (Task 9): urgent 시그널 텔레그램 발송 + throttle/daily-cap 추가.
"""
from ..curator.signal_runner import run_signal_check from ..curator.signal_runner import run_signal_check
from ..config import LOTTO_Z_NORMAL, LOTTO_Z_URGENT from ..config import (
from ..db import add_log LOTTO_Z_NORMAL, LOTTO_Z_URGENT,
LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX,
)
from ..db import (
create_task, update_task_status, add_log,
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
from ..service_proxy import lotto_latest_draw
if self.state not in ("idle", "reporting"): if self.state not in ("idle", "reporting"):
return {"ok": False, "message": f"busy ({self.state})"} return {"ok": False, "message": f"busy ({self.state})"}
task_id = create_task("lotto", "signal_check", {"source": source})
try: try:
curate_result = None curate_result = None
# 회차 단위 메트릭(drift/confidence) 가드를 위해 항상 최신 회차 가져옴
from ..service_proxy import lotto_latest_draw
current_draw_no = await lotto_latest_draw() current_draw_no = await lotto_latest_draw()
if source == "deep": if source == "deep":
from ..curator.pipeline import curate_weekly from ..curator.pipeline import curate_weekly
cw = await curate_weekly(source="signal_deep") cw = await curate_weekly(source="signal_deep")
# curate_weekly returns {"ok", "draw_no", "confidence", "tokens", "payload"}
curate_result = {"confidence": cw.get("confidence")} curate_result = {"confidence": cw.get("confidence")}
# deep_check 시 curate_weekly가 반환하는 draw_no를 우선 사용 (직접 수집)
if cw.get("draw_no"): if cw.get("draw_no"):
current_draw_no = cw.get("draw_no") current_draw_no = cw.get("draw_no")
@@ -62,35 +64,19 @@ class LottoAgent(BaseAgent):
curate_result=curate_result, curate_result=curate_result,
current_draw_no=current_draw_no, current_draw_no=current_draw_no,
) )
add_log(
self.agent_id,
f"signal_check({source}) → overall={outcome['overall_fire']} results={len(outcome['results'])}",
)
# --- Throttle + 텔레그램 urgent 발송 ---
from ..config import LOTTO_THROTTLE_HOURS, LOTTO_URGENT_DAILY_MAX
from ..db import (
get_last_signal_notification, get_recent_urgent_count,
mark_signal_notified,
)
from ..notifiers.telegram_lotto import send_urgent_signal
# urgent 텔레그램 + throttle (기존 동작 유지)
if outcome["overall_fire"] == "urgent": if outcome["overall_fire"] == "urgent":
if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX: if get_recent_urgent_count(hours=24) >= LOTTO_URGENT_DAILY_MAX:
add_log( add_log("lotto", "urgent daily cap 도달 → normal로 강등", level="warning", task_id=task_id)
self.agent_id,
"urgent daily cap 도달 → normal로 강등 (digest 합류)",
level="warning",
)
else: else:
blocked = False blocked = False
for r in outcome["results"]: for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"): if r["fire_level"] in ("normal", "urgent"):
last = get_last_signal_notification( if get_last_signal_notification(
metric=r["metric"], fire_level=r["fire_level"], metric=r["metric"], fire_level=r["fire_level"],
hours=LOTTO_THROTTLE_HOURS, hours=LOTTO_THROTTLE_HOURS,
) ):
if last:
blocked = True blocked = True
break break
if not blocked: if not blocked:
@@ -104,11 +90,23 @@ class LottoAgent(BaseAgent):
for r in outcome["results"]: for r in outcome["results"]:
if r["fire_level"] in ("normal", "urgent"): if r["fire_level"] in ("normal", "urgent"):
mark_signal_notified(r["signal_id"]) mark_signal_notified(r["signal_id"])
add_log(self.agent_id, f"urgent 텔레그램 발송 완료 (시그널 {len(outcome['results'])}마킹)") add_log("lotto", f"urgent 텔레그램 발송 ({len(outcome['results'])}시그널)", task_id=task_id)
fired_metrics = [
r["metric"] for r in outcome["results"]
if r["fire_level"] not in ("noop", "warmup")
]
update_task_status(task_id, "succeeded", result_data={
"source": source,
"overall_fire": outcome["overall_fire"],
"n_results": len(outcome["results"]),
"fired_metrics": fired_metrics,
})
add_log("lotto", f"signal_check({source}) → {outcome['overall_fire']} results={len(outcome['results'])}", task_id=task_id)
return {"ok": True, **outcome} return {"ok": True, **outcome}
except Exception as e: except Exception as e:
add_log(self.agent_id, f"signal_check 예외: {e}", level="error") update_task_status(task_id, "failed", result_data={"error": str(e)})
add_log("lotto", f"signal_check 예외: {e}", level="error", task_id=task_id)
return {"ok": False, "message": f"{type(e).__name__}: {e}"} return {"ok": False, "message": f"{type(e).__name__}: {e}"}
async def run_daily_digest(self) -> dict: async def run_daily_digest(self) -> dict:

View File

@@ -0,0 +1,94 @@
# agent-office/tests/test_lotto_task_wrap.py
import os
import sys
import tempfile
import gc
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import pytest
from app import db
db.DB_PATH = _TMP
@pytest.fixture(autouse=True)
def fresh_db():
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
db.init_db()
yield
gc.collect()
if os.path.exists(_TMP):
try:
os.remove(_TMP)
except PermissionError:
pass
@pytest.mark.asyncio
async def test_run_signal_check_creates_task_row(monkeypatch):
"""run_signal_check이 agent_tasks에 row를 만들고 result_data를 저장."""
from app.agents.lotto import LottoAgent
from app.curator import signal_runner
async def fake_run_signal_check(**kwargs):
return {
"overall_fire": "normal",
"results": [
{"signal_id": 1, "metric": "sim_signal",
"value": 0.6, "z_score": 1.7, "fire_level": "normal",
"baseline_mu": 0.5, "baseline_sigma": 0.05, "payload": {}},
],
}
monkeypatch.setattr(signal_runner, "run_signal_check", fake_run_signal_check)
from app import service_proxy
async def fake_latest():
return 1226
monkeypatch.setattr(service_proxy, "lotto_latest_draw", fake_latest)
from app.notifiers import telegram_lotto
async def fake_send(_event): pass
monkeypatch.setattr(telegram_lotto, "send_urgent_signal", fake_send)
agent = LottoAgent()
result = await agent.run_signal_check(source="light")
assert result["ok"] is True
tasks = db.get_agent_tasks("lotto", task_type="signal_check", days=1)
assert len(tasks) == 1
t = tasks[0]
assert t["status"] == "succeeded"
assert t["result_data"]["source"] == "light"
assert t["result_data"]["overall_fire"] == "normal"
assert "sim_signal" in t["result_data"]["fired_metrics"]
@pytest.mark.asyncio
async def test_run_signal_check_failure_marks_task_failed(monkeypatch):
from app.agents.lotto import LottoAgent
from app.curator import signal_runner
from app import service_proxy
async def boom(**kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(signal_runner, "run_signal_check", boom)
async def fake_latest():
return 1226
monkeypatch.setattr(service_proxy, "lotto_latest_draw", fake_latest)
agent = LottoAgent()
result = await agent.run_signal_check(source="sim")
assert result["ok"] is False
tasks = db.get_agent_tasks("lotto", task_type="signal_check", days=1)
assert len(tasks) == 1
assert tasks[0]["status"] == "failed"
assert "boom" in tasks[0]["result_data"]["error"]