feat(agent-office): stock screener 평일 16:30 KST 자동 잡 + 텔레그램 전송

- StockAgent.on_screener_schedule: snapshot/refresh → screener/run(mode=auto)
  → telegram_payload(MarkdownV2) 발송. skipped_holiday는 무발신,
  실패 시 운영자 HTML 알림.
- service_proxy: refresh_screener_snapshot, run_stock_screener 추가
  (각각 180s timeout, STOCK_LAB_URL 기존 env 재사용).
- telegram.messaging.send_raw: parse_mode 파라미터 추가
  (기본 HTML 유지, MarkdownV2 페이로드 직접 전달용).
- scheduler: cron day_of_week=mon-fri hour=16 minute=30 id=stock_screener
  (Asia/Seoul TZ).
- on_command 'run_screener' 수동 트리거 추가.
- tests: 성공/휴일/스냅샷실패/run실패/이상status 5케이스.
This commit is contained in:
2026-05-12 14:54:24 +09:00
parent c4cb18a25c
commit 119ac88e1e
5 changed files with 347 additions and 3 deletions

View File

@@ -119,7 +119,125 @@ class StockAgent(BaseAgent):
update_task_status(task_id, "failed", {"error": str(e)}) update_task_status(task_id, "failed", {"error": str(e)})
await self.transition("idle", f"오류: {e}") await self.transition("idle", f"오류: {e}")
async def on_screener_schedule(self) -> None:
"""KRX 강세주 스크리너 자동 잡 (평일 16:30 KST).
흐름:
1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그)
2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답
3) status=='skipped_holiday' → 종료 (텔레그램 미발신)
4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송
5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML)
"""
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"})
await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id)
try:
# 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행
try:
snap = await service_proxy.refresh_screener_snapshot()
add_log(
self.agent_id,
f"snapshot refreshed: status={snap.get('status', '?')}",
"info", task_id,
)
except Exception as e:
add_log(
self.agent_id,
f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}",
"warning", task_id,
)
await self.transition("working", "스크리너 실행 중...")
# 2) 스크리너 실행
body = await service_proxy.run_stock_screener(mode="auto")
status = body.get("status")
asof = body.get("asof")
# 3) 공휴일 — 종료
if status == "skipped_holiday":
update_task_status(task_id, "succeeded", {
"status": status,
"asof": asof,
"telegram_sent": False,
})
add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id)
await self.transition("idle", "휴일 — 스크리너 건너뜀")
return
# 4) 성공 → 텔레그램 전송
if status == "success":
payload = body.get("telegram_payload") or {}
text = payload.get("text") or ""
parse_mode = payload.get("parse_mode", "MarkdownV2")
if not text:
raise RuntimeError("telegram_payload.text 누락")
await self.transition("reporting", "스크리너 결과 전송 중...")
from ..telegram.messaging import send_raw
tg = await send_raw(text, parse_mode=parse_mode)
update_task_status(task_id, "succeeded", {
"status": status,
"asof": asof,
"run_id": body.get("run_id"),
"survivors_count": body.get("survivors_count"),
"telegram_sent": tg.get("ok", False),
"telegram_message_id": tg.get("message_id"),
})
if not tg.get("ok"):
desc = tg.get("description") or "unknown"
code = tg.get("error_code")
add_log(
self.agent_id,
f"Screener telegram send failed: [{code}] {desc}",
"warning", task_id,
)
if self._ws_manager:
await self._ws_manager.send_notification(
self.agent_id, "telegram_failed", task_id,
"스크리너 텔레그램 전송 실패",
)
await self.transition("idle", "스크리너 완료")
return
# 5) 기타 status — failed 취급
raise RuntimeError(f"unexpected screener status: {status}")
except Exception as e:
err_msg = str(e)
add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id)
update_task_status(task_id, "failed", {"error": err_msg})
# 운영자 알림 — 기본 HTML parse_mode 사용
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>KRX 스크리너 실패</b>\n"
f"<code>{html.escape(err_msg)[:500]}</code>"
)
except Exception as notify_err:
add_log(
self.agent_id,
f"operator notify failed: {notify_err}",
"warning", task_id,
)
await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
async def on_command(self, command: str, params: dict) -> dict: async def on_command(self, command: str, params: dict) -> dict:
if command == "run_screener":
await self.on_screener_schedule()
return {"ok": True, "message": "스크리너 실행 트리거 완료"}
if command == "test_telegram": if command == "test_telegram":
from ..telegram import send_agent_message from ..telegram import send_agent_message
result = await send_agent_message( result = await send_agent_message(

View File

@@ -14,6 +14,11 @@ async def _run_stock_schedule():
if agent: if agent:
await agent.on_schedule() await agent.on_schedule()
async def _run_stock_screener():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.on_screener_schedule()
async def _run_blog_schedule(): async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog") agent = AGENT_REGISTRY.get("blog")
if agent: if agent:
@@ -41,6 +46,14 @@ async def _poll_pipelines():
def init_scheduler(): def init_scheduler():
scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news") scheduler.add_job(_run_stock_schedule, "cron", hour=7, minute=30, id="stock_news")
scheduler.add_job(
_run_stock_screener,
"cron",
day_of_week="mon-fri",
hour=16,
minute=30,
id="stock_screener",
)
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline") scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate") scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research") scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")

View File

@@ -32,6 +32,34 @@ async def summarize_stock_news(limit: int = 15) -> Dict[str, Any]:
return resp.json() return resp.json()
async def refresh_screener_snapshot() -> Dict[str, Any]:
"""stock-lab의 KRX 일봉 스냅샷 갱신 (스크리너 실행 전 호출).
네이버 금융 일괄 다운로드라 보통 30~120s, 여유있게 180s.
"""
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh")
resp.raise_for_status()
return resp.json()
async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]:
"""stock-lab의 스크리너 실행.
반환 status:
- 'skipped_holiday': 공휴일/주말 — telegram_payload 없음
- 'success': telegram_payload 동봉
엔진 자체는 수 초 내 끝나지만, 컨텍스트 로드+200종목 처리 여유 180s.
"""
async with httpx.AsyncClient(timeout=180.0) as client:
resp = await client.post(
f"{STOCK_LAB_URL}/api/stock/screener/run",
json={"mode": mode},
)
resp.raise_for_status()
return resp.json()
async def scrape_stock_news() -> Dict[str, Any]: async def scrape_stock_news() -> Dict[str, Any]:
"""stock-lab의 수동 뉴스 스크랩 트리거 — DB에 최신 뉴스 저장. """stock-lab의 수동 뉴스 스크랩 트리거 — DB에 최신 뉴스 저장.

View File

@@ -8,14 +8,22 @@ from .client import _enabled, api_call
from .formatter import MessageKind, format_agent_message from .formatter import MessageKind, format_agent_message
async def send_raw(text: str, reply_markup: Optional[dict] = None, chat_id: Optional[str] = None) -> dict: async def send_raw(
"""가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로.""" text: str,
reply_markup: Optional[dict] = None,
chat_id: Optional[str] = None,
parse_mode: str = "HTML",
) -> dict:
"""가장 저수준. 원문 텍스트 그대로 전송. chat_id 생략 시 기본 TELEGRAM_CHAT_ID로.
parse_mode: 기본 'HTML'. MarkdownV2 페이로드(예: 스크리너) 전송 시 명시 지정.
"""
if not _enabled(): if not _enabled():
return {"ok": False, "message_id": None} return {"ok": False, "message_id": None}
payload = { payload = {
"chat_id": chat_id or TELEGRAM_CHAT_ID, "chat_id": chat_id or TELEGRAM_CHAT_ID,
"text": text, "text": text,
"parse_mode": "HTML", "parse_mode": parse_mode,
} }
if reply_markup: if reply_markup:
payload["reply_markup"] = reply_markup payload["reply_markup"] = reply_markup

View File

@@ -0,0 +1,177 @@
"""StockAgent.on_screener_schedule — 평일 16:30 KST 자동 잡 단위 테스트.
stock-lab HTTP 호출은 service_proxy mock, 텔레그램은 messaging.send_raw mock.
"""
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@pytest.fixture(autouse=True)
def _init_db():
import gc
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
from app.db import init_db
init_db()
yield
gc.collect()
def _success_body(asof="2026-05-12"):
return {
"asof": asof,
"mode": "auto",
"status": "success",
"run_id": 42,
"survivors_count": 600,
"top_n": 20,
"results": [],
"telegram_payload": {
"chat_target": "default",
"parse_mode": "MarkdownV2",
"text": "*KRX 강세주 스크리너* test body",
},
"warnings": [],
}
def _holiday_body(asof="2026-05-05"):
return {
"asof": asof,
"mode": "auto",
"status": "skipped_holiday",
"run_id": None,
"survivors_count": None,
"top_n": 0,
"results": [],
"telegram_payload": None,
"warnings": [f"{asof} is a holiday — skipped"],
}
def test_screener_success_sends_markdownv2_telegram():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(return_value=_success_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 7777})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_snap.assert_awaited_once()
fake_run.assert_awaited_once_with(mode="auto")
fake_send.assert_awaited_once()
args, kwargs = fake_send.call_args
# 첫 인자(text) 또는 kwargs로 전달
text = args[0] if args else kwargs.get("text")
assert "KRX 강세주 스크리너" in text
assert kwargs.get("parse_mode") == "MarkdownV2"
assert agent.state == "idle"
def test_screener_holiday_skips_telegram():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "skipped_weekend"})
fake_run = AsyncMock(return_value=_holiday_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_run.assert_awaited_once()
# 휴일이면 텔레그램 미발신
fake_send.assert_not_awaited()
assert agent.state == "idle"
def test_screener_snapshot_failure_still_runs_screener():
"""스냅샷 실패는 경고만 남기고 screener 호출은 계속됨."""
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(side_effect=RuntimeError("snapshot upstream down"))
fake_run = AsyncMock(return_value=_success_body())
fake_send = AsyncMock(return_value={"ok": True, "message_id": 8888})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
fake_snap.assert_awaited_once()
fake_run.assert_awaited_once_with(mode="auto")
fake_send.assert_awaited_once()
def test_screener_run_failure_notifies_operator():
"""screener/run 실패 시 운영자 알림 텔레그램 발송."""
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(side_effect=RuntimeError("stock-lab 500"))
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
# 운영자 알림 1회는 호출
assert fake_send.await_count == 1
args, kwargs = fake_send.call_args
text = args[0] if args else kwargs.get("text")
assert "스크리너 실패" in text
assert agent.state == "idle"
def test_screener_unexpected_status_treated_as_failure():
from app.agents.stock import StockAgent
from app import service_proxy
from app.telegram import messaging
fake_snap = AsyncMock(return_value={"status": "ok"})
fake_run = AsyncMock(return_value={"status": "weird", "asof": "2026-05-12"})
fake_send = AsyncMock(return_value={"ok": True, "message_id": 1})
with patch.object(service_proxy, "refresh_screener_snapshot", fake_snap), \
patch.object(service_proxy, "run_stock_screener", fake_run), \
patch.object(messaging, "send_raw", fake_send):
agent = StockAgent()
asyncio.run(agent.on_screener_schedule())
# 운영자 알림 1회 + screener payload 미발송
assert fake_send.await_count == 1
args, kwargs = fake_send.call_args
text = args[0] if args else kwargs.get("text")
assert "스크리너 실패" in text