46 lines
1.7 KiB
Python
46 lines
1.7 KiB
Python
"""매매 알람 텔레그램 포맷+전송 (본인+아내 각각)."""
|
|
import logging
|
|
from typing import Any, Dict, List
|
|
|
|
from ..telegram.messaging import send_raw
|
|
from ..config import TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID
|
|
|
|
# Shared application logger; the name is fixed rather than derived from __name__.
logger = logging.getLogger("agent-office")

# Message heading shown for each alert kind (buy / sell).
_KIND_LABEL = {
    "buy": "🟢 매수",
    "sell": "🔴 매도",
}

# Human-readable label for each trigger condition, keyed by condition id.
_COND_LABEL = {
    # Buy-side conditions.
    "buy_ma20_pullback": "지지선 되돌림",
    "buy_breakout": "돌파",
    "buy_rsi_bounce": "RSI 과매도 반등",
    # Sell-side conditions.
    "sell_stop_loss": "손절",
    "sell_ma_break": "이평 이탈",
    "sell_take_profit": "익절",
    "sell_climax": "급등 소진",
    "sell_trailing_stop": "트레일링 스톱",
}
|
|
|
|
|
|
def format_trade_alert(a: Dict[str, Any]) -> str:
    """Render one trade alert dict as the text of a Telegram message.

    Args:
        a: Alert record. ``kind``, ``condition`` and ``ticker`` are required
            keys; ``name`` and ``price`` are optional.

    Returns:
        A four-line message: the kind heading, the bolded name followed by
        the ticker in parentheses, the condition label, and the current
        price. Unknown kinds/conditions fall back to their raw value, a
        missing or empty name falls back to the ticker, and a missing, zero
        or non-numeric price is shown as ``-``.

    Raises:
        KeyError: If ``kind``, ``condition`` or ``ticker`` is missing.
    """
    import html  # local import keeps this block independent of the file header

    def _esc(value: Any) -> str:
        # The message embeds <b> markup, so alert-supplied text must not be
        # able to introduce markup of its own ("S&P500", "<...>").
        # NOTE(review): assumes send_raw uses Telegram's HTML parse mode -- confirm.
        return html.escape(str(value), quote=False)

    kind = _esc(_KIND_LABEL.get(a["kind"], a["kind"]))
    cond = _esc(_COND_LABEL.get(a["condition"], a["condition"]))
    ticker = _esc(a["ticker"])
    name = _esc(a.get("name") or a["ticker"])

    price = a.get("price")
    price_s = "-"
    if price:
        try:
            price_s = f"{int(price):,}원"
        except (TypeError, ValueError, OverflowError):
            # NaN, infinity or a non-numeric value: show the placeholder
            # instead of failing the whole alert.
            price_s = "-"

    return f"{kind} 알람\n<b>{name}</b> ({ticker})\n조건: {cond}\n현재가: {price_s}"
|
|
|
|
|
|
async def send_trade_alerts(alerts: List[Dict[str, Any]]) -> dict:
    """Send each alert to every configured chat id, continuing past failures.

    Each alert is formatted once and then sent separately to
    ``TELEGRAM_CHAT_ID`` and ``TELEGRAM_WIFE_CHAT_ID`` (whichever are set).
    A failure to format or send one message is logged and recorded, and the
    remaining messages are still attempted.

    Args:
        alerts: Alert dicts accepted by ``format_trade_alert``.

    Returns:
        ``{"sent": <number of sends whose response had a truthy "ok">,
        "ok": <False if any format or send step failed, else True>}``.
        When no chat id is configured nothing is sent and ``ok`` stays True.
    """
    sent = 0
    all_ok = True
    chat_ids = [c for c in (TELEGRAM_CHAT_ID, TELEGRAM_WIFE_CHAT_ID) if c]

    for a in alerts:
        # A malformed alert (e.g. a missing key) must not abort the batch.
        try:
            text = format_trade_alert(a)
        except Exception as e:
            logger.warning("[telegram_trade] format failed (alert=%r): %s", a, e)
            all_ok = False
            continue

        for cid in chat_ids:
            try:
                r = await send_raw(text, chat_id=cid)
            except Exception as e:
                logger.warning("[telegram_trade] send failed (chat_id=%s): %s", cid, e)
                all_ok = False
                continue

            # Treat a missing or non-dict response as a failed send rather
            # than raising AttributeError on ``.get``.
            if isinstance(r, dict) and r.get("ok"):
                sent += 1
            else:
                all_ok = False

    return {"sent": sent, "ok": all_ok}
|