import asyncio import html from typing import Optional from .base import BaseAgent from ..db import create_task, update_task_status, get_agent_config, add_log from .. import service_proxy def _build_briefing_body(result: dict, max_headlines: int = 5) -> str: """아침 시장 브리핑 본문 조립. LLM 요약 + 주요 뉴스 헤드라인(링크) 섹션을 합친다. 향후 본문 고도화 시 이 함수만 수정하면 됨 (텔레그램 HTML parse_mode). """ summary = (result.get("summary") or "").strip() articles = result.get("articles") or [] # body_is_html=True 로 보낼 예정이므로 LLM 요약(plain text)도 escape parts = [html.escape(summary)] if summary else [] headlines = [] for a in articles[:max_headlines]: title = (a.get("title") or "").strip() if not title: continue title_esc = html.escape(title) link = (a.get("link") or "").strip() press = (a.get("press") or "").strip() press_suffix = f" — {html.escape(press)}" if press else "" if link: headlines.append(f'• {title_esc}{press_suffix}') else: headlines.append(f"• {title_esc}{press_suffix}") if headlines: parts.append("📰 주요 뉴스\n" + "\n".join(headlines)) return "\n\n".join(parts) class StockAgent(BaseAgent): agent_id = "stock" display_name = "주식 트레이더" async def on_schedule(self) -> None: if self.state not in ("idle", "break"): return task_id = create_task(self.agent_id, "news_summary", {"limit": 15}) await self.transition("working", "최신 뉴스 수집 중...", task_id) try: # stock cron(매일 8:00)이 7:30 브리핑보다 늦게 돌아 어제 뉴스가 # 요약되던 문제 방지 — 요약 직전에 동기 스크랩으로 DB를 갱신한다. try: await service_proxy.scrape_stock_news() except Exception as e: add_log(self.agent_id, f"뉴스 스크랩 실패 (이전 데이터로 진행): {e}", "warning", task_id) await self.transition("working", "AI 뉴스 요약 생성 중...") # AI 요약 호출 (LLM 처리는 stock이 담당) result = await service_proxy.summarize_stock_news(limit=15) await self.transition("reporting", "뉴스 요약 전송 중...") body = _build_briefing_body(result) # 새 통합 텔레그램 API 사용 from ..telegram import send_agent_message tg_result = await send_agent_message( agent_id=self.agent_id, kind="report", title="아침 시장 브리핑", body=body, body_is_html=True, task_id=task_id, metadata={ "tokens": result["tokens"]["total"], "duration_ms": result["duration_ms"], "model": result["model"], }, ) # 아내 chat 추가 전송 (설정된 경우) — 제목 + 본문만 간결하게 from ..config import TELEGRAM_WIFE_CHAT_ID if TELEGRAM_WIFE_CHAT_ID: from ..telegram.messaging import send_raw wife_text = f"📈 아침 시장 브리핑\n\n{body}" wife_result = await send_raw(wife_text, chat_id=TELEGRAM_WIFE_CHAT_ID) if not wife_result.get("ok"): desc = wife_result.get("description") or "unknown" add_log(self.agent_id, f"Wife telegram send failed: {desc}", "warning", task_id) update_task_status(task_id, "succeeded", { "summary": result["summary"], "article_count": result.get("article_count", 0), "tokens": result["tokens"], "model": result["model"], "duration_ms": result["duration_ms"], "telegram_sent": tg_result.get("ok", False), "telegram_message_id": tg_result.get("message_id"), }) if not tg_result.get("ok"): desc = tg_result.get("description") or "unknown" code = tg_result.get("error_code") add_log(self.agent_id, f"Telegram send failed: [{code}] {desc}", "warning", task_id) if self._ws_manager: await self._ws_manager.send_notification( self.agent_id, "telegram_failed", task_id, "텔레그램 전송 실패" ) await self.transition("idle", "뉴스 요약 완료") except Exception as e: add_log(self.agent_id, f"News summary failed: {e}", "error", task_id) update_task_status(task_id, "failed", {"error": str(e)}) await self.transition("idle", f"오류: {e}") async def on_screener_schedule(self) -> None: """KRX 강세주 스크리너 자동 잡 (평일 16:30 KST). 흐름: 1) snapshot/refresh — 일봉 갱신 (실패해도 진행, 경고 로그) 2) screener/run mode='auto' — 실행 + 결과 영구화 + telegram_payload 응답 3) status=='skipped_holiday' → 종료 (텔레그램 미발신) 4) status=='success' → telegram_payload.text 를 parse_mode 그대로 전송 5) 예외/실패 → 운영자에게 별도 텔레그램 알림 (HTML) """ if self.state not in ("idle", "break"): return task_id = create_task(self.agent_id, "screener_run", {"mode": "auto"}) await self.transition("working", "스크리너 스냅샷 갱신 중...", task_id) try: # 1) 스냅샷 갱신 — 실패해도 기존 일봉 데이터로 진행 try: snap = await service_proxy.refresh_screener_snapshot() add_log( self.agent_id, f"snapshot refreshed: status={snap.get('status', '?')}", "info", task_id, ) except Exception as e: add_log( self.agent_id, f"스냅샷 갱신 실패 (기존 데이터로 진행): {e}", "warning", task_id, ) await self.transition("working", "스크리너 실행 중...") # 2) 스크리너 실행 body = await service_proxy.run_stock_screener(mode="auto") status = body.get("status") asof = body.get("asof") # 3) 공휴일 — 종료 if status == "skipped_holiday": update_task_status(task_id, "succeeded", { "status": status, "asof": asof, "telegram_sent": False, }) add_log(self.agent_id, f"스크리너 건너뜀 (휴일): {asof}", "info", task_id) await self.transition("idle", "휴일 — 스크리너 건너뜀") return # 4) 성공 → 텔레그램 전송 if status == "success": payload = body.get("telegram_payload") or {} text = payload.get("text") or "" parse_mode = payload.get("parse_mode", "MarkdownV2") if not text: raise RuntimeError("telegram_payload.text 누락") await self.transition("reporting", "스크리너 결과 전송 중...") from ..telegram.messaging import send_raw tg = await send_raw(text, parse_mode=parse_mode) update_task_status(task_id, "succeeded", { "status": status, "asof": asof, "run_id": body.get("run_id"), "survivors_count": body.get("survivors_count"), "telegram_sent": tg.get("ok", False), "telegram_message_id": tg.get("message_id"), }) if not tg.get("ok"): desc = tg.get("description") or "unknown" code = tg.get("error_code") add_log( self.agent_id, f"Screener telegram send failed: [{code}] {desc}", "warning", task_id, ) if self._ws_manager: await self._ws_manager.send_notification( self.agent_id, "telegram_failed", task_id, "스크리너 텔레그램 전송 실패", ) await self.transition("idle", "스크리너 완료") return # 5) 기타 status — failed 취급 raise RuntimeError(f"unexpected screener status: {status}") except Exception as e: err_msg = str(e) add_log(self.agent_id, f"Screener job failed: {err_msg}", "error", task_id) update_task_status(task_id, "failed", {"error": err_msg}) # 운영자 알림 — 기본 HTML parse_mode 사용 try: from ..telegram.messaging import send_raw await send_raw( f"⚠️ KRX 스크리너 실패\n" f"{html.escape(err_msg)[:500]}" ) except Exception as notify_err: add_log( self.agent_id, f"operator notify failed: {notify_err}", "warning", task_id, ) await self.transition("idle", f"스크리너 오류: {err_msg[:80]}") async def on_ai_news_schedule(self) -> None: """AI 뉴스 sentiment 분석 자동 잡 (평일 08:00 KST). 흐름: 1) stock /snapshot/refresh-news-sentiment 호출 2) status='skipped_weekend'/'skipped_holiday' → 종료 (텔레그램 미발신) 3) updated=0 → 운영자 알림 (HTML) 4) failures > 30% → 경고 알림 후 메인 메시지 발송 5) 정상 → Top 5 호재/악재 메시지 발송 (MarkdownV2) """ if self.state not in ("idle", "break"): return task_id = create_task(self.agent_id, "ai_news_sentiment", {}) await self.transition("working", "AI 뉴스 분석 중...", task_id) try: result = await service_proxy.refresh_ai_news_sentiment() except Exception as e: err_msg = str(e) add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id) update_task_status(task_id, "failed", {"error": err_msg}) try: from ..telegram.messaging import send_raw await send_raw( f"⚠️ AI 뉴스 분석 실패\n" f"{html.escape(err_msg)[:500]}" ) except Exception as notify_err: add_log( self.agent_id, f"operator notify failed: {notify_err}", "warning", task_id, ) await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}") return status = result.get("status") if status in ("skipped_weekend", "skipped_holiday"): update_task_status(task_id, "succeeded", {"status": status}) add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id) await self.transition("idle", "휴일/주말 — 건너뜀") return updated = int(result.get("updated", 0)) failures = result.get("failures", []) or [] if updated == 0: update_task_status(task_id, "failed", {"reason": "0 tickers updated"}) try: from ..telegram.messaging import send_raw await send_raw( "⚠️ AI 뉴스 분석 0종목\n" "스크래핑/LLM 전체 실패 — 어제 데이터 사용" ) except Exception: pass await self.transition("idle", "AI 뉴스 0건") return # 실패율 경고 (별도 알림, 본 메시지는 계속 발송) failure_rate = len(failures) / max(1, updated + len(failures)) if failure_rate > 0.3: try: from ..telegram.messaging import send_raw await send_raw( f"⚠️ AI 뉴스 실패율 {failure_rate:.0%}\n" f"updated={updated}, failures={len(failures)}" ) except Exception: pass # 정상 — Top 5 메시지 (stock이 빌드해서 응답에 telegram_text 동봉) text = result.get("telegram_text") or "" if not text: add_log(self.agent_id, "telegram_text 누락 — stock 응답 결함", "error", task_id) update_task_status(task_id, "failed", {"error": "telegram_text 누락"}) await self.transition("idle", "AI 뉴스 응답 결함") return await self.transition("reporting", "AI 뉴스 알림 전송 중...") from ..telegram.messaging import send_raw tg = await send_raw(text, parse_mode="MarkdownV2") update_task_status(task_id, "succeeded", { "asof": result["asof"], "updated": updated, "failures": len(failures), "tokens_input": int(result.get("tokens_input", 0)), "tokens_output": int(result.get("tokens_output", 0)), "telegram_sent": tg.get("ok", False), }) if not tg.get("ok"): desc = tg.get("description") or "unknown" code = tg.get("error_code") add_log( self.agent_id, f"AI news telegram send failed: [{code}] {desc}", "warning", task_id, ) await self.transition("idle", "AI 뉴스 완료") async def on_command(self, command: str, params: dict) -> dict: if command == "run_screener": await self.on_screener_schedule() return {"ok": True, "message": "스크리너 실행 트리거 완료"} if command == "run_ai_news": await self.on_ai_news_schedule() return {"ok": True, "message": "AI 뉴스 분석 트리거 완료"} if command == "test_telegram": from ..telegram import send_agent_message result = await send_agent_message( agent_id=self.agent_id, kind="info", title="연결 테스트", body="텔레그램 연동이 정상적으로 동작합니다.", ) return { "ok": result.get("ok", False), "message": "텔레그램 전송 성공" if result.get("ok") else "텔레그램 전송 실패", "telegram_message_id": result.get("message_id"), } if command == "fetch_news": await self.on_schedule() return {"ok": True, "message": "뉴스 수집 시작"} if command == "add_alert": symbol = params.get("symbol") target_price = params.get("target_price") if not symbol or target_price is None: return {"ok": False, "message": "symbol과 target_price는 필수입니다"} config = get_agent_config(self.agent_id) alerts = config["custom_config"].get("alerts", []) alerts.append({ "symbol": symbol, "name": params.get("name", symbol), "target_price": target_price, "direction": params.get("direction", "above"), }) from ..db import update_agent_config update_agent_config(self.agent_id, custom_config={**config["custom_config"], "alerts": alerts}) return {"ok": True, "message": f"알람 추가: {params['symbol']}"} if command == "list_alerts": config = get_agent_config(self.agent_id) alerts = config["custom_config"].get("alerts", []) return {"ok": True, "alerts": alerts} return {"ok": False, "message": f"Unknown command: {command}"} async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None: pass