diff --git a/agent-office/app/main.py b/agent-office/app/main.py index 608b6c6..cf60454 100644 --- a/agent-office/app/main.py +++ b/agent-office/app/main.py @@ -278,3 +278,19 @@ async def trigger_signal_check(source: str = "light"): if not agent: raise HTTPException(status_code=503, detail="lotto agent not registered") return await agent.run_signal_check(source=source) + + +# --- Trade Alert Notify Endpoint --- + +class TradeAlertBody(BaseModel): + alerts: List[Dict[str, Any]] = [] + + +@app.post("/api/agent-office/stock/trade-alert") +async def stock_trade_alert(body: TradeAlertBody): + from .notifiers.telegram_trade import send_trade_alerts + from .db import add_log + res = await send_trade_alerts(body.alerts) + for a in body.alerts: + add_log("stock", f"매매알람 {a.get('kind')} {a.get('ticker')} {a.get('condition')}", "info") + return res diff --git a/agent-office/app/notifiers/telegram_trade.py b/agent-office/app/notifiers/telegram_trade.py new file mode 100644 index 0000000..898d9b9 --- /dev/null +++ b/agent-office/app/notifiers/telegram_trade.py @@ -0,0 +1,45 @@ +"""매매 알람 텔레그램 포맷+전송 (본인+아내 각각).""" +import logging +from typing import Any, Dict, List + +from ..telegram.messaging import send_raw +from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID + +logger = logging.getLogger("agent-office") + +_KIND_LABEL = {"buy": "🟢 매수", "sell": "🔴 매도"} +_COND_LABEL = { + "buy_ma20_pullback": "지지선 되돌림", "buy_breakout": "돌파", "buy_rsi_bounce": "RSI 과매도 반등", + "sell_stop_loss": "손절", "sell_ma_break": "이평 이탈", "sell_take_profit": "익절", + "sell_climax": "급등 소진", "sell_trailing_stop": "트레일링 스톱", +} + + +def format_trade_alert(a: Dict[str, Any]) -> str: + kind = _KIND_LABEL.get(a["kind"], a["kind"]) + cond = _COND_LABEL.get(a["condition"], a["condition"]) + name = a.get("name") or a["ticker"] + price = a.get("price") + price_s = f"{int(price):,}원" if price else "-" + return f"{kind} 알람\n{name} ({a['ticker']})\n조건: {cond}\n현재가: {price_s}" + + +async def send_trade_alerts(alerts: List[Dict[str, Any]]) -> dict: + """알람마다 본인+아내 chat_id 각각으로 send_raw. 실패해도 계속 진행.""" + sent = 0 + all_ok = True + chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c] + for a in alerts: + text = format_trade_alert(a) + for cid in chat_ids: + try: + r = await send_raw(text, chat_id=cid) + except Exception as e: + logger.warning(f"[telegram_trade] send failed (chat_id={cid}): {e}") + all_ok = False + continue + if r.get("ok"): + sent += 1 + else: + all_ok = False + return {"sent": sent, "ok": all_ok} diff --git a/agent-office/tests/test_trade_alert_notify.py b/agent-office/tests/test_trade_alert_notify.py new file mode 100644 index 0000000..cf6844d --- /dev/null +++ b/agent-office/tests/test_trade_alert_notify.py @@ -0,0 +1,55 @@ +import os +import sys +import tempfile + +_fd, _TMP = tempfile.mkstemp(suffix=".db") +os.close(_fd) +os.unlink(_TMP) +os.environ["AGENT_OFFICE_DB_PATH"] = _TMP + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import pytest +from unittest.mock import AsyncMock, patch + + +@pytest.fixture(autouse=True) +def _init_db(monkeypatch): + import gc + gc.collect() + # config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시 + # db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로 + # 강제해 영속 테이블의 테스트 간 누수를 결정적으로 차단. + import app.db as _db + monkeypatch.setattr(_db, "DB_PATH", _TMP) + # WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음 + for suffix in ("", "-wal", "-shm"): + p = _TMP + suffix + if os.path.exists(p): + os.remove(p) + _db.init_db() + yield + gc.collect() + + +@pytest.mark.asyncio +async def test_send_trade_alerts_to_user_and_wife(): + from app.notifiers import telegram_trade + alerts = [{"ticker": "005930", "name": "삼성전자", "kind": "buy", + "condition": "buy_breakout", "price": 71500, "detail": {}}] + with patch("app.notifiers.telegram_trade.send_raw", + new=AsyncMock(return_value={"ok": True})) as m, \ + patch("app.notifiers.telegram_trade.TELEGRAM_CHAT_ID", "U"), \ + patch("app.notifiers.telegram_trade.TELEGRAM_WIFE_CHAT_ID", "W"): + res = await telegram_trade.send_trade_alerts(alerts) + assert res["ok"] is True + chat_ids = {c.kwargs.get("chat_id") for c in m.await_args_list} + assert chat_ids == {"U", "W"} # 둘 다 발송 + + +@pytest.mark.asyncio +async def test_format_trade_alert_has_direction(): + from app.notifiers.telegram_trade import format_trade_alert + txt = format_trade_alert({"ticker": "005930", "name": "삼성전자", "kind": "sell", + "condition": "sell_stop_loss", "price": 60000, "detail": {}}) + assert "매도" in txt and "삼성전자" in txt