diff --git a/agent-office/app/agents/stock.py b/agent-office/app/agents/stock.py index eaa1405..629440a 100644 --- a/agent-office/app/agents/stock.py +++ b/agent-office/app/agents/stock.py @@ -119,7 +119,125 @@ class StockAgent(BaseAgent): update_task_status(task_id, "failed", {"error": str(e)}) await self.transition("idle", f"오류: {e}") + async def on_screener_schedule(self) -> None: + """KRX 강세주 스크리너 자동 잡 (평일 16:30 KST). + + 흐름: + 1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그) + 2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답 + 3) status=='skipped_holiday' → 종료 (텔레그램 미발신) + 4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송 + 5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML) + """ + if self.state not in ("idle", "break"): + return + + task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"}) + await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id) + + try: + # 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행 + try: + snap = await service_proxy.refresh_screener_snapshot() + add_log( + self.agent_id, + f"snapshot refreshed: status={snap.get('status', '?')}", + "info", task_id, + ) + except Exception as e: + add_log( + self.agent_id, + f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}", + "warning", task_id, + ) + + await self.transition("working", "스크리너 실행 중...") + + # 2) 스크리너 실행 + body = await service_proxy.run_stock_screener(mode="auto") + status = body.get("status") + asof = body.get("asof") + + # 3) 공휴일 — 종료 + if status == "skipped_holiday": + update_task_status(task_id, "succeeded", { + "status": status, + "asof": asof, + "telegram_sent": False, + }) + add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id) + await self.transition("idle", "휴일 — 스크리너 건너뜀") + return + + # 4) 성공 → 텔레그램 전송 + if status == "success": + payload = body.get("telegram_payload") or {} + text = payload.get("text") or "" + parse_mode = payload.get("parse_mode", "MarkdownV2") + + if not text: + raise RuntimeError("telegram_payload.text 누락") + + await self.transition("reporting", "스크리너 결과 전송 중...") + + from ..telegram.messaging import send_raw + tg = await send_raw(text, parse_mode=parse_mode) + + update_task_status(task_id, "succeeded", { + "status": status, + "asof": asof, + "run_id": body.get("run_id"), + "survivors_count": body.get("survivors_count"), + "telegram_sent": tg.get("ok", False), + "telegram_message_id": tg.get("message_id"), + }) + + if not tg.get("ok"): + desc = tg.get("description") or "unknown" + code = tg.get("error_code") + add_log( + self.agent_id, + f"Screener telegram send failed: [{code}] {desc}", + "warning", task_id, + ) + if self._ws_manager: + await self._ws_manager.send_notification( + self.agent_id, "telegram_failed", task_id, + "스크리너 텔레그램 전송 실패", + ) + + await self.transition("idle", "스크리너 완료") + return + + # 5) 기타 status — failed 취급 + raise RuntimeError(f"unexpected screener status: {status}") + + except Exception as e: + err_msg = str(e) + add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id) + update_task_status(task_id, "failed", {"error": err_msg}) + + # 운영자 알림 — 기본 HTML parse_mode 사용 + try: + from ..telegram.messaging import send_raw + await send_raw( + f"⚠️ KRX 스크리너 실패\n" + f"{html.escape(err_msg)[:500]}" + ) + except Exception as notify_err: + add_log( + self.agent_id, + f"operator notify failed: {notify_err}", + "warning", task_id, + ) + + await self.transition("idle", f"스크리너 오류: {err_msg[:80]}") + async def on_command(self, command: str, params: dict) -> dict: + if command == "run_screener": + await self.on_screener_schedule() + return {"ok": True, "message": "스크리너 실행 트리거 완료"} + if command == "test_telegram": from ..telegram import send_agent_message result = await send_agent_message( diff --git a/agent-office/app/scheduler.py b/agent-office/app/scheduler.py index c1c265c..37d51f2 100644 --- a/agent-office/app/scheduler.py +++ b/agent-office/app/scheduler.py @@ -14,6 +14,11 @@ async def _run_stock_schedule(): if agent: await agent.on_schedule() +async def _run_stock_screener(): + agent = AGENT_REGISTRY.get("stock") + if agent: + await agent.on_screener_schedule() + async def _run_blog_schedule(): agent = AGENT_REGISTRY.get("blog") if agent: @@ -41,6 +46,14 @@ async def _poll_pipelines(): def init_scheduler(): scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") + scheduler.add_job( + _run_stock_screener, + "cron", + day_of_week="mon-fri", + hour=16, + minute=30, + id="stock_screener", + ) scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate") scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research") diff --git a/agent-office/app/service_proxy.py b/agent-office/app/service_proxy.py index 4ff6dc0..31a1d54 100644 --- a/agent-office/app/service_proxy.py +++ b/agent-office/app/service_proxy.py @@ -32,6 +32,34 @@ async def summarize_stock_news(limit: int = 15) -> Dict[str, Any]: return resp.json() +async def refresh_screener_snapshot() -> Dict[str, Any]: + """stock-lab의 KRX 일봉 스냅샷 갱신 (스크리너 실행 전 호출). + + 네이버 금융 일괄 다운로드라 보통 30~120s, 여유있게 180s. + """ + async with httpx.AsyncClient(timeout=180.0) as client: + resp = await client.post(f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh") + resp.raise_for_status() + return resp.json() + + +async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]: + """stock-lab의 스크리너 실행. + + 반환 status: + - 'skipped_holiday': 공휴일/주말 — telegram_payload 없음 + - 'success': telegram_payload 동봉 + 엔진 자체는 수 초 내 끝나지만, 컨텍스트 로드+200종목 처리 여유 180s. + """ + async with httpx.AsyncClient(timeout=180.0) as client: + resp = await client.post( + f"{STOCK_LAB_URL}/api/stock/screener/run", + json={"mode": mode}, + ) + resp.raise_for_status() + return resp.json() + + async def scrape_stock_news() -> Dict[str, Any]: """stock-lab의 수동 뉴스 스크랩 트리거 — DB에 최신 뉴스 저장. diff --git a/agent-office/app/telegram/messaging.py b/agent-office/app/telegram/messaging.py index 969bd28..7f962ed 100644 --- a/agent-office/app/telegram/messaging.py +++ b/agent-office/app/telegram/messaging.py @@ -8,14 +8,22 @@ from .client import _enabled, api_call from .formatter import MessageKind, format_agent_message -async def send_raw(text: str, reply_markup: Optional[dict] = None, chat_id: Optional[str] = None) -> dict: - """가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로.""" +async def send_raw( + text: str, + reply_markup: Optional[dict] = None, + chat_id: Optional[str] = None, + parse_mode: str = "HTML", +) -> dict: + """가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로. + + parse_mode: 기본 'HTML'. MarkdownV2 페이로드(예: 스크리너) 전송 시 명시 지정. + """ if not _enabled(): return {"ok": False, "message_id": None} payload = { "chat_id": chat_id or TELEGRAM_CHAT_ID, "text": text, - "parse_mode": "HTML", + "parse_mode": parse_mode, } if reply_markup: payload["reply_markup"] = reply_markup diff --git a/agent-office/tests/test_stock_screener_job.py b/agent-office/tests/test_stock_screener_job.py new file mode 100644 index 0000000..1e33402 --- /dev/null +++ b/agent-office/tests/test_stock_screener_job.py @@ -0,0 +1,177 @@ +"""StockAgent.on_screener_schedule — 평일 16:30 KST 자동 잡 단위 테스트. + +stock-lab HTTP 호출은 service_proxy mock, 텔레그램은 messaging.send_raw mock. +""" +import os +import sys +import tempfile + +_fd, _TMP = tempfile.mkstemp(suffix=".db") +os.close(_fd) +os.unlink(_TMP) +os.environ["AGENT_OFFICE_DB_PATH"] = _TMP + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +import asyncio +from unittest.mock import AsyncMock, patch +import pytest + + +@pytest.fixture(autouse=True) +def _init_db(): + import gc + gc.collect() + if os.path.exists(_TMP): + os.remove(_TMP) + from app.db import init_db + init_db() + yield + gc.collect() + + +def _success_body(asof="2026-05-12"): + return { + "asof": asof, + "mode": "auto", + "status": "success", + "run_id": 42, + "survivors_count": 600, + "top_n": 20, + "results": [], + "telegram_payload": { + "chat_target": "default", + "parse_mode": "MarkdownV2", + "text": "*KRX 강세주 스크리너* test body", + }, + "warnings": [], + } + + +def _holiday_body(asof="2026-05-05"): + return { + "asof": asof, + "mode": "auto", + "status": "skipped_holiday", + "run_id": None, + "survivors_count": None, + "top_n": 0, + "results": [], + "telegram_payload": None, + "warnings": [f"{asof} is a holiday — skipped"], + } + + +def test_screener_success_sends_markdownv2_telegram(): + from app.agents.stock import StockAgent + from app import service_proxy + from app.telegram import messaging + + fake_snap = AsyncMock(return_value={"status": "ok"}) + fake_run = AsyncMock(return_value=_success_body()) + fake_send = AsyncMock(return_value={"ok": True, "message_id": 7777}) + + with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \ + patch.object(service_proxy, "run_stock_screener", fake_run), \ + patch.object(messaging, "send_raw", fake_send): + agent = StockAgent() + asyncio.run(agent.on_screener_schedule()) + + fake_snap.assert_awaited_once() + fake_run.assert_awaited_once_with(mode="auto") + fake_send.assert_awaited_once() + args, kwargs = fake_send.call_args + # 첫 인자(text) 또는 kwargs로 전달 + text = args[0] if args else kwargs.get("text") + assert "KRX 강세주 스크리너" in text + assert kwargs.get("parse_mode") == "MarkdownV2" + assert agent.state == "idle" + + +def test_screener_holiday_skips_telegram(): + from app.agents.stock import StockAgent + from app import service_proxy + from app.telegram import messaging + + fake_snap = AsyncMock(return_value={"status": "skipped_weekend"}) + fake_run = AsyncMock(return_value=_holiday_body()) + fake_send = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \ + patch.object(service_proxy, "run_stock_screener", fake_run), \ + patch.object(messaging, "send_raw", fake_send): + agent = StockAgent() + asyncio.run(agent.on_screener_schedule()) + + fake_run.assert_awaited_once() + # 휴일이면 텔레그램 미발신 + fake_send.assert_not_awaited() + assert agent.state == "idle" + + +def test_screener_snapshot_failure_still_runs_screener(): + """스냅샷 실패는 경고만 남기고 screener 호출은 계속됨.""" + from app.agents.stock import StockAgent + from app import service_proxy + from app.telegram import messaging + + fake_snap = AsyncMock(side_effect=RuntimeError("snapshot upstream down")) + fake_run = AsyncMock(return_value=_success_body()) + fake_send = AsyncMock(return_value={"ok": True, "message_id": 8888}) + + with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \ + patch.object(service_proxy, "run_stock_screener", fake_run), \ + patch.object(messaging, "send_raw", fake_send): + agent = StockAgent() + asyncio.run(agent.on_screener_schedule()) + + fake_snap.assert_awaited_once() + fake_run.assert_awaited_once_with(mode="auto") + fake_send.assert_awaited_once() + + +def test_screener_run_failure_notifies_operator(): + """screener/run 실패 시 운영자 알림 텔레그램 발송.""" + from app.agents.stock import StockAgent + from app import service_proxy + from app.telegram import messaging + + fake_snap = AsyncMock(return_value={"status": "ok"}) + fake_run = AsyncMock(side_effect=RuntimeError("stock-lab 500")) + fake_send = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \ + patch.object(service_proxy, "run_stock_screener", fake_run), \ + patch.object(messaging, "send_raw", fake_send): + agent = StockAgent() + asyncio.run(agent.on_screener_schedule()) + + # 운영자 알림 1회는 호출 + assert fake_send.await_count == 1 + args, kwargs = fake_send.call_args + text = args[0] if args else kwargs.get("text") + assert "스크리너 실패" in text + assert agent.state == "idle" + + +def test_screener_unexpected_status_treated_as_failure(): + from app.agents.stock import StockAgent + from app import service_proxy + from app.telegram import messaging + + fake_snap = AsyncMock(return_value={"status": "ok"}) + fake_run = AsyncMock(return_value={"status": "weird", "asof": "2026-05-12"}) + fake_send = AsyncMock(return_value={"ok": True, "message_id": 1}) + + with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \ + patch.object(service_proxy, "run_stock_screener", fake_run), \ + patch.object(messaging, "send_raw", fake_send): + agent = StockAgent() + asyncio.run(agent.on_screener_schedule()) + + # 운영자 알림 1회 + screener payload 미발송 + assert fake_send.await_count == 1 + args, kwargs = fake_send.call_args + text = args[0] if args else kwargs.get("text") + assert "스크리너 실패" in text