- git mv stock-lab/ → stock/ - docker-compose.yml: 서비스 키 + container_name + build.context + frontend.depends_on + agent-office STOCK_LAB_URL → STOCK_URL - agent-office/app: config.py, service_proxy.py, agents/stock.py, tests/ STOCK_LAB_URL → STOCK_URL - nginx/default.conf: proxy_pass http://stock-lab → http://stock (3 lines) - CLAUDE.md / README.md / STATUS.md / scripts/ 문구 갱신 - stock/ 내부 자기 참조 갱신 lab 네이밍 정책 (feedback_lab_naming.md) graduation. API URL / Python import / DB 파일명 변경 없음.
392 lines
16 KiB
Python
392 lines
16 KiB
Python
import asyncio
|
|
import html
|
|
from typing import Optional
|
|
|
|
from .base import BaseAgent
|
|
from ..db import create_task, update_task_status, get_agent_config, add_log
|
|
from .. import service_proxy
|
|
|
|
|
|
def _build_briefing_body(result: dict, max_headlines: int = 5) -> str:
|
|
"""아침 시장 브리핑 본문 조립.
|
|
|
|
LLM 요약 + 주요 뉴스 헤드라인(링크) 섹션을 합친다.
|
|
향후 본문 고도화 시 이 함수만 수정하면 됨 (텔레그램 HTML parse_mode).
|
|
"""
|
|
summary = (result.get("summary") or "").strip()
|
|
articles = result.get("articles") or []
|
|
|
|
# body_is_html=True 로 보낼 예정이므로 LLM 요약(plain text)도 escape
|
|
parts = [html.escape(summary)] if summary else []
|
|
|
|
headlines = []
|
|
for a in articles[:max_headlines]:
|
|
title = (a.get("title") or "").strip()
|
|
if not title:
|
|
continue
|
|
title_esc = html.escape(title)
|
|
link = (a.get("link") or "").strip()
|
|
press = (a.get("press") or "").strip()
|
|
press_suffix = f" — {html.escape(press)}" if press else ""
|
|
if link:
|
|
headlines.append(f'• <a href="{html.escape(link, quote=True)}">{title_esc}</a>{press_suffix}')
|
|
else:
|
|
headlines.append(f"• {title_esc}{press_suffix}")
|
|
|
|
if headlines:
|
|
parts.append("📰 <b>주요 뉴스</b>\n" + "\n".join(headlines))
|
|
|
|
return "\n\n".join(parts)
|
|
|
|
|
|
class StockAgent(BaseAgent):
|
|
agent_id = "stock"
|
|
display_name = "주식 트레이더"
|
|
|
|
async def on_schedule(self) -> None:
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
|
|
task_id = create_task(self.agent_id, "news_summary", {"limit": 15})
|
|
await self.transition("working", "최신 뉴스 수집 중...", task_id)
|
|
|
|
try:
|
|
# stock cron(매일 8:00)이 7:30 브리핑보다 늦게 돌아 어제 뉴스가
|
|
# 요약되던 문제 방지 — 요약 직전에 동기 스크랩으로 DB를 갱신한다.
|
|
try:
|
|
await service_proxy.scrape_stock_news()
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"뉴스 스크랩 실패 (이전 데이터로 진행): {e}", "warning", task_id)
|
|
|
|
await self.transition("working", "AI 뉴스 요약 생성 중...")
|
|
|
|
# AI 요약 호출 (LLM 처리는 stock이 담당)
|
|
result = await service_proxy.summarize_stock_news(limit=15)
|
|
|
|
await self.transition("reporting", "뉴스 요약 전송 중...")
|
|
|
|
body = _build_briefing_body(result)
|
|
|
|
# 새 통합 텔레그램 API 사용
|
|
from ..telegram import send_agent_message
|
|
tg_result = await send_agent_message(
|
|
agent_id=self.agent_id,
|
|
kind="report",
|
|
title="아침 시장 브리핑",
|
|
body=body,
|
|
body_is_html=True,
|
|
task_id=task_id,
|
|
metadata={
|
|
"tokens": result["tokens"]["total"],
|
|
"duration_ms": result["duration_ms"],
|
|
"model": result["model"],
|
|
},
|
|
)
|
|
|
|
# 아내 chat 추가 전송 (설정된 경우) — 제목 + 본문만 간결하게
|
|
from ..config import TELEGRAM_WIFE_CHAT_ID
|
|
if TELEGRAM_WIFE_CHAT_ID:
|
|
from ..telegram.messaging import send_raw
|
|
wife_text = f"📈 <b>아침 시장 브리핑</b>\n\n{body}"
|
|
wife_result = await send_raw(wife_text, chat_id=TELEGRAM_WIFE_CHAT_ID)
|
|
if not wife_result.get("ok"):
|
|
desc = wife_result.get("description") or "unknown"
|
|
add_log(self.agent_id, f"Wife telegram send failed: {desc}", "warning", task_id)
|
|
|
|
update_task_status(task_id, "succeeded", {
|
|
"summary": result["summary"],
|
|
"article_count": result.get("article_count", 0),
|
|
"tokens": result["tokens"],
|
|
"model": result["model"],
|
|
"duration_ms": result["duration_ms"],
|
|
"telegram_sent": tg_result.get("ok", False),
|
|
"telegram_message_id": tg_result.get("message_id"),
|
|
})
|
|
|
|
if not tg_result.get("ok"):
|
|
desc = tg_result.get("description") or "unknown"
|
|
code = tg_result.get("error_code")
|
|
add_log(self.agent_id, f"Telegram send failed: [{code}] {desc}", "warning", task_id)
|
|
if self._ws_manager:
|
|
await self._ws_manager.send_notification(
|
|
self.agent_id, "telegram_failed", task_id, "텔레그램 전송 실패"
|
|
)
|
|
|
|
await self.transition("idle", "뉴스 요약 완료")
|
|
|
|
except Exception as e:
|
|
add_log(self.agent_id, f"News summary failed: {e}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": str(e)})
|
|
await self.transition("idle", f"오류: {e}")
|
|
|
|
async def on_screener_schedule(self) -> None:
|
|
"""KRX 강세주 스크리너 자동 잡 (평일 16:30 KST).
|
|
|
|
흐름:
|
|
1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그)
|
|
2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답
|
|
3) status=='skipped_holiday' → 종료 (텔레그램 미발신)
|
|
4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송
|
|
5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML)
|
|
"""
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
|
|
task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"})
|
|
await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id)
|
|
|
|
try:
|
|
# 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행
|
|
try:
|
|
snap = await service_proxy.refresh_screener_snapshot()
|
|
add_log(
|
|
self.agent_id,
|
|
f"snapshot refreshed: status={snap.get('status', '?')}",
|
|
"info", task_id,
|
|
)
|
|
except Exception as e:
|
|
add_log(
|
|
self.agent_id,
|
|
f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}",
|
|
"warning", task_id,
|
|
)
|
|
|
|
await self.transition("working", "스크리너 실행 중...")
|
|
|
|
# 2) 스크리너 실행
|
|
body = await service_proxy.run_stock_screener(mode="auto")
|
|
status = body.get("status")
|
|
asof = body.get("asof")
|
|
|
|
# 3) 공휴일 — 종료
|
|
if status == "skipped_holiday":
|
|
update_task_status(task_id, "succeeded", {
|
|
"status": status,
|
|
"asof": asof,
|
|
"telegram_sent": False,
|
|
})
|
|
add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id)
|
|
await self.transition("idle", "휴일 — 스크리너 건너뜀")
|
|
return
|
|
|
|
# 4) 성공 → 텔레그램 전송
|
|
if status == "success":
|
|
payload = body.get("telegram_payload") or {}
|
|
text = payload.get("text") or ""
|
|
parse_mode = payload.get("parse_mode", "MarkdownV2")
|
|
|
|
if not text:
|
|
raise RuntimeError("telegram_payload.text 누락")
|
|
|
|
await self.transition("reporting", "스크리너 결과 전송 중...")
|
|
|
|
from ..telegram.messaging import send_raw
|
|
tg = await send_raw(text, parse_mode=parse_mode)
|
|
|
|
update_task_status(task_id, "succeeded", {
|
|
"status": status,
|
|
"asof": asof,
|
|
"run_id": body.get("run_id"),
|
|
"survivors_count": body.get("survivors_count"),
|
|
"telegram_sent": tg.get("ok", False),
|
|
"telegram_message_id": tg.get("message_id"),
|
|
})
|
|
|
|
if not tg.get("ok"):
|
|
desc = tg.get("description") or "unknown"
|
|
code = tg.get("error_code")
|
|
add_log(
|
|
self.agent_id,
|
|
f"Screener telegram send failed: [{code}] {desc}",
|
|
"warning", task_id,
|
|
)
|
|
if self._ws_manager:
|
|
await self._ws_manager.send_notification(
|
|
self.agent_id, "telegram_failed", task_id,
|
|
"스크리너 텔레그램 전송 실패",
|
|
)
|
|
|
|
await self.transition("idle", "스크리너 완료")
|
|
return
|
|
|
|
# 5) 기타 status — failed 취급
|
|
raise RuntimeError(f"unexpected screener status: {status}")
|
|
|
|
except Exception as e:
|
|
err_msg = str(e)
|
|
add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": err_msg})
|
|
|
|
# 운영자 알림 — 기본 HTML parse_mode 사용
|
|
try:
|
|
from ..telegram.messaging import send_raw
|
|
await send_raw(
|
|
f"⚠️ <b>KRX 스크리너 실패</b>\n"
|
|
f"<code>{html.escape(err_msg)[:500]}</code>"
|
|
)
|
|
except Exception as notify_err:
|
|
add_log(
|
|
self.agent_id,
|
|
f"operator notify failed: {notify_err}",
|
|
"warning", task_id,
|
|
)
|
|
|
|
await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
|
|
|
|
async def on_ai_news_schedule(self) -> None:
|
|
"""AI 뉴스 sentiment 분석 자동 잡 (평일 08:00 KST).
|
|
|
|
흐름:
|
|
1) stock /snapshot/refresh-news-sentiment 호출
|
|
2) status='skipped_weekend'/'skipped_holiday' → 종료 (텔레그램 미발신)
|
|
3) updated=0 → 운영자 알림 (HTML)
|
|
4) failures > 30% → 경고 알림 후 메인 메시지 발송
|
|
5) 정상 → Top 5 호재/악재 메시지 발송 (MarkdownV2)
|
|
"""
|
|
if self.state not in ("idle", "break"):
|
|
return
|
|
|
|
task_id = create_task(self.agent_id, "ai_news_sentiment", {})
|
|
await self.transition("working", "AI 뉴스 분석 중...", task_id)
|
|
|
|
try:
|
|
result = await service_proxy.refresh_ai_news_sentiment()
|
|
except Exception as e:
|
|
err_msg = str(e)
|
|
add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": err_msg})
|
|
try:
|
|
from ..telegram.messaging import send_raw
|
|
await send_raw(
|
|
f"⚠️ <b>AI 뉴스 분석 실패</b>\n"
|
|
f"<code>{html.escape(err_msg)[:500]}</code>"
|
|
)
|
|
except Exception as notify_err:
|
|
add_log(
|
|
self.agent_id,
|
|
f"operator notify failed: {notify_err}",
|
|
"warning", task_id,
|
|
)
|
|
await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}")
|
|
return
|
|
|
|
status = result.get("status")
|
|
if status in ("skipped_weekend", "skipped_holiday"):
|
|
update_task_status(task_id, "succeeded", {"status": status})
|
|
add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id)
|
|
await self.transition("idle", "휴일/주말 — 건너뜀")
|
|
return
|
|
|
|
updated = int(result.get("updated", 0))
|
|
failures = result.get("failures", []) or []
|
|
if updated == 0:
|
|
update_task_status(task_id, "failed", {"reason": "0 tickers updated"})
|
|
try:
|
|
from ..telegram.messaging import send_raw
|
|
await send_raw(
|
|
"⚠️ <b>AI 뉴스 분석 0종목</b>\n"
|
|
"스크래핑/LLM 전체 실패 — 어제 데이터 사용"
|
|
)
|
|
except Exception:
|
|
pass
|
|
await self.transition("idle", "AI 뉴스 0건")
|
|
return
|
|
|
|
# 실패율 경고 (별도 알림, 본 메시지는 계속 발송)
|
|
failure_rate = len(failures) / max(1, updated + len(failures))
|
|
if failure_rate > 0.3:
|
|
try:
|
|
from ..telegram.messaging import send_raw
|
|
await send_raw(
|
|
f"⚠️ <b>AI 뉴스 실패율 {failure_rate:.0%}</b>\n"
|
|
f"updated={updated}, failures={len(failures)}"
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# 정상 — Top 5 메시지 (stock이 빌드해서 응답에 telegram_text 동봉)
|
|
text = result.get("telegram_text") or ""
|
|
if not text:
|
|
add_log(self.agent_id, "telegram_text 누락 — stock 응답 결함", "error", task_id)
|
|
update_task_status(task_id, "failed", {"error": "telegram_text 누락"})
|
|
await self.transition("idle", "AI 뉴스 응답 결함")
|
|
return
|
|
|
|
await self.transition("reporting", "AI 뉴스 알림 전송 중...")
|
|
from ..telegram.messaging import send_raw
|
|
tg = await send_raw(text, parse_mode="MarkdownV2")
|
|
|
|
update_task_status(task_id, "succeeded", {
|
|
"asof": result["asof"],
|
|
"updated": updated,
|
|
"failures": len(failures),
|
|
"tokens_input": int(result.get("tokens_input", 0)),
|
|
"tokens_output": int(result.get("tokens_output", 0)),
|
|
"telegram_sent": tg.get("ok", False),
|
|
})
|
|
|
|
if not tg.get("ok"):
|
|
desc = tg.get("description") or "unknown"
|
|
code = tg.get("error_code")
|
|
add_log(
|
|
self.agent_id,
|
|
f"AI news telegram send failed: [{code}] {desc}",
|
|
"warning", task_id,
|
|
)
|
|
|
|
await self.transition("idle", "AI 뉴스 완료")
|
|
|
|
async def on_command(self, command: str, params: dict) -> dict:
|
|
if command == "run_screener":
|
|
await self.on_screener_schedule()
|
|
return {"ok": True, "message": "스크리너 실행 트리거 완료"}
|
|
|
|
if command == "run_ai_news":
|
|
await self.on_ai_news_schedule()
|
|
return {"ok": True, "message": "AI 뉴스 분석 트리거 완료"}
|
|
|
|
if command == "test_telegram":
|
|
from ..telegram import send_agent_message
|
|
result = await send_agent_message(
|
|
agent_id=self.agent_id,
|
|
kind="info",
|
|
title="연결 테스트",
|
|
body="텔레그램 연동이 정상적으로 동작합니다.",
|
|
)
|
|
return {
|
|
"ok": result.get("ok", False),
|
|
"message": "텔레그램 전송 성공" if result.get("ok") else "텔레그램 전송 실패",
|
|
"telegram_message_id": result.get("message_id"),
|
|
}
|
|
|
|
if command == "fetch_news":
|
|
await self.on_schedule()
|
|
return {"ok": True, "message": "뉴스 수집 시작"}
|
|
|
|
if command == "add_alert":
|
|
symbol = params.get("symbol")
|
|
target_price = params.get("target_price")
|
|
if not symbol or target_price is None:
|
|
return {"ok": False, "message": "symbol과 target_price는 필수입니다"}
|
|
config = get_agent_config(self.agent_id)
|
|
alerts = config["custom_config"].get("alerts", [])
|
|
alerts.append({
|
|
"symbol": symbol,
|
|
"name": params.get("name", symbol),
|
|
"target_price": target_price,
|
|
"direction": params.get("direction", "above"),
|
|
})
|
|
from ..db import update_agent_config
|
|
update_agent_config(self.agent_id, custom_config={**config["custom_config"], "alerts": alerts})
|
|
return {"ok": True, "message": f"알람 추가: {params['symbol']}"}
|
|
|
|
if command == "list_alerts":
|
|
config = get_agent_config(self.agent_id)
|
|
alerts = config["custom_config"].get("alerts", [])
|
|
return {"ok": True, "alerts": alerts}
|
|
|
|
return {"ok": False, "message": f"Unknown command: {command}"}
|
|
|
|
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
|
|
pass
|