diff --git a/modules/analysis/evaluator.py b/modules/analysis/evaluator.py
new file mode 100644
index 0000000..a99c7e7
--- /dev/null
+++ b/modules/analysis/evaluator.py
@@ -0,0 +1,421 @@
+"""
+성과 평가 엔진 - PerformanceEvaluator
+
+기능:
+ 1. compute_metrics() - 핵심 성과 지표 계산
+ 2. get_grade() - 지표별 S/A/B/C/D/F 등급 산출
+ 3. generate_expert_panel() - Ollama LLM 5명 전문가 의견
+ 4. generate_weekly_report() - 텔레그램 HTML 주간 보고서
+"""
+import json
+import math
+from datetime import datetime, timedelta
+
+from modules.utils.performance_db import PerformanceDB
+
+
+class PerformanceEvaluator:
+ def __init__(self):
+ self.perf_db = PerformanceDB()
+
+ # ─────────────────────────────────────────
+ # 1. 핵심 지표 계산
+ # ─────────────────────────────────────────
+
+ def compute_metrics(self, snapshots, trades):
+ """성과 지표를 딕셔너리로 반환.
+
+ Args:
+ snapshots (list): daily_snapshots 리스트
+ trades (list): trade_records 리스트
+
+ Returns:
+ dict: 지표 딕셔너리 (또는 {"error": ...})
+ """
+ if not snapshots:
+ return {"error": "스냅샷 데이터 없음 (운영 시작 후 첫 영업일까지 대기)"}
+
+ metrics = {}
+
+ # ── 수익률 ──────────────────────────────
+ initial = snapshots[0].get("total_eval", 0)
+ current = snapshots[-1].get("total_eval", 0)
+ metrics["total_return_pct"] = round(
+ (current - initial) / initial * 100, 2) if initial > 0 else 0.0
+
+ cutoff_7 = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
+ recent_snaps = [s for s in snapshots if s.get("date", "") >= cutoff_7]
+ if len(recent_snaps) >= 2:
+ w_init = recent_snaps[0].get("total_eval", 0)
+ w_curr = recent_snaps[-1].get("total_eval", 0)
+ metrics["weekly_return_pct"] = round(
+ (w_curr - w_init) / w_init * 100, 2) if w_init > 0 else 0.0
+ else:
+ metrics["weekly_return_pct"] = 0.0
+
+ # ── 리스크 지표 ──────────────────────────
+ daily_returns = [s.get("daily_return_pct", 0.0) / 100.0 for s in snapshots]
+
+ if len(daily_returns) >= 2:
+ mean_daily = sum(daily_returns) / len(daily_returns)
+ variance = sum((r - mean_daily) ** 2 for r in daily_returns) / len(daily_returns)
+ std_daily = math.sqrt(variance) if variance > 0 else 0.0
+ annual_return = mean_daily * 252
+
+ # Sharpe Ratio
+ if std_daily > 0:
+ metrics["sharpe_ratio"] = round(
+ annual_return / (std_daily * math.sqrt(252)), 3)
+ else:
+ metrics["sharpe_ratio"] = 0.0
+
+ # Sortino Ratio (하방 편차만 사용)
+ downside = [r for r in daily_returns if r < 0]
+ if downside:
+ dv = sum(r ** 2 for r in downside) / len(downside)
+ ds = math.sqrt(dv)
+ metrics["sortino_ratio"] = round(
+ annual_return / (ds * math.sqrt(252)), 3) if ds > 0 else 0.0
+ else:
+ metrics["sortino_ratio"] = 10.0 # 손실 없음
+
+ # Max Drawdown
+ peak = snapshots[0].get("total_eval", 0)
+ max_dd = 0.0
+ for snap in snapshots:
+ ev = snap.get("total_eval", 0)
+ if ev > peak:
+ peak = ev
+ if peak > 0:
+ dd = (peak - ev) / peak * 100
+ if dd > max_dd:
+ max_dd = dd
+ metrics["max_drawdown_pct"] = round(max_dd, 2)
+
+ # Calmar Ratio
+ ann_pct = annual_return * 100
+ metrics["calmar_ratio"] = round(
+ ann_pct / max_dd, 3) if max_dd > 0 else 0.0
+ else:
+ metrics["sharpe_ratio"] = 0.0
+ metrics["sortino_ratio"] = 0.0
+ metrics["max_drawdown_pct"] = 0.0
+ metrics["calmar_ratio"] = 0.0
+
+ # ── 매매 지표 ─────────────────────────────
+ closed = [t for t in trades
+ if t.get("action") == "BUY" and t.get("outcome_return_pct") is not None]
+
+ if closed:
+ wins = [t for t in closed if t.get("outcome_return_pct", 0) > 0]
+ losses = [t for t in closed if t.get("outcome_return_pct", 0) <= 0]
+
+ metrics["win_rate_pct"] = round(
+ len(wins) / len(closed) * 100, 1)
+
+ total_profit = sum(t["outcome_return_pct"] for t in wins)
+ total_loss = abs(sum(t["outcome_return_pct"] for t in losses))
+ metrics["profit_factor"] = round(
+ total_profit / total_loss, 3) if total_loss > 0 else 10.0
+
+ hd_list = [t["holding_days"] for t in closed if t.get("holding_days") is not None]
+ metrics["avg_holding_days"] = round(
+ sum(hd_list) / len(hd_list), 1) if hd_list else 0.0
+ else:
+ metrics["win_rate_pct"] = 0.0
+ metrics["profit_factor"] = 0.0
+ metrics["avg_holding_days"] = 0.0
+
+ metrics["total_trades"] = len(closed)
+
+ # ── 벤치마크 Alpha ────────────────────────
+ kospi_vals = [s.get("benchmark_kospi_close") for s in snapshots]
+ kospi_valid = [k for k in kospi_vals if k is not None]
+ if len(kospi_valid) >= 2:
+ kospi_ret = (kospi_valid[-1] - kospi_valid[0]) / kospi_valid[0] * 100
+ metrics["alpha"] = round(metrics["total_return_pct"] - kospi_ret, 2)
+ metrics["kospi_return_pct"] = round(kospi_ret, 2)
+ else:
+ metrics["alpha"] = 0.0
+ metrics["kospi_return_pct"] = 0.0
+
+ # ── AI 품질 지표 ──────────────────────────
+ if closed:
+ # LSTM 방향 정확도
+ correct = 0
+ direction_n = 0
+ for t in closed:
+ pred = t.get("ai_prediction_change")
+ outcome = t.get("outcome_return_pct")
+ if pred is not None and outcome is not None:
+ if (pred > 0) == (outcome > 0):
+ correct += 1
+ direction_n += 1
+ metrics["lstm_direction_accuracy"] = round(
+ correct / direction_n * 100, 1) if direction_n > 0 else 0.0
+
+ # 신호별 수익 상관도
+ outcomes = [t.get("outcome_return_pct", 0) for t in closed]
+
+ def pearson(xs, ys):
+ n = len(xs)
+ if n < 2:
+ return 0.0
+ mx = sum(xs) / n
+ my = sum(ys) / n
+ num = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
+ denom_x = sum((x - mx) ** 2 for x in xs)
+ denom_y = sum((y - my) ** 2 for y in ys)
+ denom = math.sqrt(denom_x * denom_y)
+ return num / denom if denom > 0 else 0.0
+
+ corr_tech = pearson([t.get("tech_score", 0) for t in closed], outcomes)
+ corr_sent = pearson([t.get("sentiment_score", 0) for t in closed], outcomes)
+ corr_lstm = pearson([t.get("lstm_score", 0) for t in closed], outcomes)
+
+ metrics["signal_correlation"] = {
+ "tech": round(corr_tech, 3),
+ "sentiment": round(corr_sent, 3),
+ "lstm": round(corr_lstm, 3)
+ }
+ metrics["best_signal_source"] = max(
+ ["tech", "sentiment", "lstm"],
+ key=lambda k: abs(metrics["signal_correlation"][k])
+ )
+ else:
+ metrics["lstm_direction_accuracy"] = 0.0
+ metrics["signal_correlation"] = {"tech": 0.0, "sentiment": 0.0, "lstm": 0.0}
+ metrics["best_signal_source"] = "unknown"
+
+ metrics["snapshot_count"] = len(snapshots)
+ return metrics
+
+ # ─────────────────────────────────────────
+ # 2. 등급 산출
+ # ─────────────────────────────────────────
+
+ def get_grade(self, metric, value):
+ """지표 이름과 값으로 S/A/B/C/D/F 등급 반환."""
+ # MDD는 낮을수록 좋음
+ if metric == "max_drawdown_pct":
+ thresholds = [(5, "S"), (10, "A"), (15, "B"), (20, "C"), (30, "D")]
+ for threshold, grade in thresholds:
+ if value < threshold:
+ return grade
+ return "F"
+
+ grade_rules = {
+ "sharpe_ratio": [(2.0, "S"), (1.5, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
+ "sortino_ratio": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.0, "D")],
+ "win_rate_pct": [(70, "S"), (60, "A"), (50, "B"), (40, "C"), (30, "D")],
+ "profit_factor": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.5, "D")],
+ "alpha": [(15, "S"), (10, "A"), (5, "B"), (0, "C"), (-5, "D")],
+ "total_return_pct": [(30, "S"), (20, "A"), (10, "B"), (0, "C"), (-10, "D")],
+ "weekly_return_pct": [(5, "S"), (3, "A"), (1, "B"), (0, "C"), (-1, "D")],
+ "lstm_direction_accuracy":[(70, "S"), (60, "A"), (55, "B"), (50, "C"), (40, "D")],
+ "calmar_ratio": [(3.0, "S"), (2.0, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
+ }
+
+ thresholds = grade_rules.get(metric, [])
+ for threshold, grade in thresholds:
+ if value >= threshold:
+ return grade
+ return "F"
+
+ # ─────────────────────────────────────────
+ # 3. 전문가 패널 (Ollama LLM)
+ # ─────────────────────────────────────────
+
+ def generate_expert_panel(self, metrics):
+ """5명의 전문가 역할로 Ollama에 평가를 요청.
+
+ Returns:
+ list[dict]: [{role, grade, comment, suggestion}, ...]
+ """
+ from modules.services.ollama import OllamaManager
+ ollama = OllamaManager()
+
+ sig_corr = metrics.get("signal_correlation", {})
+ experts = [
+ {
+ "role": "Risk Manager",
+ "focus": "risk level assessment and bankruptcy risk",
+ "data": (
+ f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
+ f"Sortino={metrics.get('sortino_ratio', 0):.2f}, "
+ f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%, "
+ f"Calmar={metrics.get('calmar_ratio', 0):.2f}"
+ )
+ },
+ {
+ "role": "Fund Manager",
+ "focus": "alpha generation vs market benchmark",
+ "data": (
+ f"TotalReturn={metrics.get('total_return_pct', 0):.2f}%, "
+ f"Alpha={metrics.get('alpha', 0):.2f}%, "
+ f"KOSPI={metrics.get('kospi_return_pct', 0):.2f}%, "
+ f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%"
+ )
+ },
+ {
+ "role": "Quant Analyst",
+ "focus": "AI model validity and signal quality",
+ "data": (
+ f"LSTM_Accuracy={metrics.get('lstm_direction_accuracy', 0):.1f}%, "
+ f"TechCorr={sig_corr.get('tech', 0):.3f}, "
+ f"SentCorr={sig_corr.get('sentiment', 0):.3f}, "
+ f"LSTMCorr={sig_corr.get('lstm', 0):.3f}, "
+ f"BestSignal={metrics.get('best_signal_source', 'N/A')}"
+ )
+ },
+ {
+ "role": "Trader",
+ "focus": "trading strategy effectiveness",
+ "data": (
+ f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
+ f"ProfitFactor={metrics.get('profit_factor', 0):.2f}, "
+ f"AvgHolding={metrics.get('avg_holding_days', 0):.1f}days, "
+ f"TotalTrades={metrics.get('total_trades', 0)}"
+ )
+ },
+ {
+ "role": "Portfolio PM",
+ "focus": "overall strategy direction and sustainability",
+ "data": (
+ f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%, "
+ f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
+ f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
+ f"Alpha={metrics.get('alpha', 0):.2f}%, "
+ f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%"
+ )
+ }
+ ]
+
+ results = []
+ for exp in experts:
+ prompt = (
+ f"You are a professional {exp['role']} evaluating an AI stock trading bot. "
+ f"Your focus: {exp['focus']}. "
+ f"Performance data: {exp['data']}. "
+ f"Respond ONLY with valid JSON (no markdown, no extra text): "
+ f"{{\"grade\":\"S|A|B|C|D|F\","
+ f"\"comment\":\"1 sentence evaluation in Korean\","
+ f"\"suggestion\":\"1 sentence improvement tip in Korean\"}}"
+ )
+ try:
+ resp = ollama.request_inference(prompt)
+ if not resp:
+ raise ValueError("Empty response from Ollama")
+ data = json.loads(resp)
+ results.append({
+ "role": exp["role"],
+ "grade": data.get("grade", "C"),
+ "comment": data.get("comment", "(응답 없음)"),
+ "suggestion": data.get("suggestion", "데이터 축적 필요")
+ })
+ except Exception as e:
+ print(f"[Evaluator] Expert panel [{exp['role']}] error: {e}")
+ results.append({
+ "role": exp["role"],
+ "grade": "C",
+ "comment": "평가 데이터가 부족합니다.",
+ "suggestion": "더 많은 거래 데이터 축적 후 재평가를 권장합니다."
+ })
+
+ return results
+
+ # ─────────────────────────────────────────
+ # 4. 주간 보고서 생성
+ # ─────────────────────────────────────────
+
+ def generate_weekly_report(self):
+ """주간 성과 보고서 (텔레그램 HTML 형식) 반환."""
+ snapshots = self.perf_db.load_snapshots(days=7)
+ # 매매 완료 건은 30일치 사용 (주간 거래 수가 적을 수 있음)
+ trades = self.perf_db.load_trades(days=30)
+
+ metrics = self.compute_metrics(snapshots, trades)
+
+ if "error" in metrics:
+ return (
+ f"[주간 성과 평가 보고서]\n"
+ f"⚠️ {metrics['error']}\n"
+ f"매일 오전 09:05~09:15에 스냅샷이 저장됩니다."
+ )
+
+ # 등급 계산
+ g_sharpe = self.get_grade("sharpe_ratio", metrics.get("sharpe_ratio", 0))
+ g_win = self.get_grade("win_rate_pct", metrics.get("win_rate_pct", 0))
+ g_mdd = self.get_grade("max_drawdown_pct", metrics.get("max_drawdown_pct", 0))
+ g_alpha = self.get_grade("alpha", metrics.get("alpha", 0))
+ g_weekly = self.get_grade("weekly_return_pct", metrics.get("weekly_return_pct", 0))
+ g_lstm = self.get_grade("lstm_direction_accuracy",
+ metrics.get("lstm_direction_accuracy", 0))
+
+ # 종합 등급 (Sharpe/Win/MDD/Alpha 평균)
+ grade_map = {"S": 5, "A": 4, "B": 3, "C": 2, "D": 1, "F": 0}
+ grade_rev = {v: k for k, v in grade_map.items()}
+ key_grades = [grade_map[g] for g in [g_sharpe, g_win, g_mdd, g_alpha]]
+ overall_grade = grade_rev[round(sum(key_grades) / len(key_grades))]
+
+ # 전문가 패널 (Ollama 호출)
+ try:
+ experts = self.generate_expert_panel(metrics)
+ except Exception as e:
+ print(f"[Evaluator] Expert panel skipped: {e}")
+ experts = []
+
+ now_str = datetime.now().strftime("%Y/%m/%d %H:%M")
+ corr = metrics.get("signal_correlation", {})
+
+ report = (
+ f"📊 [주간 성과 평가 보고서] {now_str}\n"
+ f"━━━━━━━━━━━━━━━━━━━━━━\n"
+ f"\n■ 수익률\n"
+ f" 주간: {metrics.get('weekly_return_pct', 0):+.2f}% [{g_weekly}]"
+ f" 누적: {metrics.get('total_return_pct', 0):+.2f}%\n"
+ f" Alpha: {metrics.get('alpha', 0):+.2f}% [{g_alpha}]"
+ f" vs KOSPI {metrics.get('kospi_return_pct', 0):+.2f}%\n"
+ f"\n■ 리스크\n"
+ f" Sharpe: {metrics.get('sharpe_ratio', 0):.2f} [{g_sharpe}]"
+ f" Sortino: {metrics.get('sortino_ratio', 0):.2f}\n"
+ f" MDD: {metrics.get('max_drawdown_pct', 0):.1f}% [{g_mdd}]"
+ f" Calmar: {metrics.get('calmar_ratio', 0):.2f}\n"
+ f"\n■ 매매 통계\n"
+ f" 승률: {metrics.get('win_rate_pct', 0):.1f}% [{g_win}]"
+ f" PF: {metrics.get('profit_factor', 0):.2f}\n"
+ f" 평균보유: {metrics.get('avg_holding_days', 0):.1f}일"
+ f" 완료매매: {metrics.get('total_trades', 0)}건\n"
+ f"\n■ AI 품질\n"
+ f" LSTM 방향정확도: {metrics.get('lstm_direction_accuracy', 0):.1f}%"
+ f" [{g_lstm}]\n"
+ f" 신호 상관도 — Tech: {corr.get('tech', 0):.3f}"
+ f" Sent: {corr.get('sentiment', 0):.3f}"
+ f" LSTM: {corr.get('lstm', 0):.3f}\n"
+ f" 최고기여 신호: {metrics.get('best_signal_source', 'N/A')}\n"
+ )
+
+ if experts:
+ role_icons = {
+ "Risk Manager": "🛡",
+ "Fund Manager": "💼",
+ "Quant Analyst": "🧮",
+ "Trader": "📈",
+ "Portfolio PM": "🏦"
+ }
+ report += "\n■ 전문가 패널 의견\n"
+ for exp in experts:
+ icon = role_icons.get(exp["role"], "👤")
+ report += (
+ f"{icon} {exp['role']} [{exp['grade']}]\n"
+ f" {exp['comment']}\n"
+ f" 💡 {exp['suggestion']}\n"
+ )
+
+ report += (
+ f"\n━━━━━━━━━━━━━━━━━━━━━━\n"
+ f"🏆 종합 등급: [{overall_grade}]\n"
+ f"스냅샷 {metrics.get('snapshot_count', 0)}일 | 완료매매 {metrics.get('total_trades', 0)}건 기준"
+ )
+
+ return report
diff --git a/modules/bot.py b/modules/bot.py
index 6ca3689..e2825a1 100644
--- a/modules/bot.py
+++ b/modules/bot.py
@@ -13,6 +13,7 @@ from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor
+from modules.utils.performance_db import PerformanceDB
from modules.strategy.process import analyze_stock_process, calculate_position_size
try:
@@ -47,10 +48,6 @@ class AutoTradingBot:
self.kis = KISClient()
self.news = AsyncNewsCollector()
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
- try:
- list(self.executor.map(lambda x: x, range(1)))
- except Exception:
- pass
self.messenger = TelegramMessenger()
self.theme_manager = ThemeManager()
@@ -95,6 +92,12 @@ class AutoTradingBot:
self.history_file = Config.HISTORY_FILE
self.load_trade_history()
+ # 7-1. 성과 DB 및 평가 플래그
+ self.perf_db = PerformanceDB()
+ self.weekly_eval_sent = False
+ self._snapshot_taken_today = False
+ self._pending_evaluate = False
+
# 8. AI 하드웨어 점검
from modules.analysis.deep_learning import PricePredictor
PricePredictor.verify_hardware()
@@ -130,6 +133,49 @@ class AutoTradingBot:
except Exception:
return {}
+ def _take_daily_snapshot(self, macro_status, balance):
+ """일별 자산 스냅샷을 perf_db에 저장 (09:05~09:15 호출)."""
+ try:
+ total_eval_snap = int(balance.get("total_eval", 0))
+ deposit_snap = int(balance.get("deposit", 0))
+ holdings_count_snap = len([
+ h for h in balance.get("holdings", [])
+ if int(h.get("qty", 0)) > 0
+ ])
+
+ # KOSPI 현재가 (macro_status 지표에서 추출)
+ kospi_close = None
+ try:
+ indicators = macro_status.get("indicators", {})
+ kospi_price = float(indicators.get("KOSPI", {}).get("price", 0))
+ if kospi_price > 0:
+ kospi_close = kospi_price
+ except Exception:
+ pass
+
+ self.perf_db.save_daily_snapshot(
+ total_eval_snap, deposit_snap, holdings_count_snap, kospi_close)
+ self._snapshot_taken_today = True
+ except Exception as e:
+ print(f"[Bot] Daily snapshot error: {e}")
+
+ async def _run_weekly_evaluation(self):
+ """주간 성과 평가 실행 후 텔레그램으로 전송."""
+ try:
+ from modules.analysis.evaluator import PerformanceEvaluator
+ evaluator = PerformanceEvaluator()
+ loop = asyncio.get_running_loop()
+ # Ollama 호출이 동기 블로킹이므로 executor에서 실행
+ report = await loop.run_in_executor(None, evaluator.generate_weekly_report)
+ if len(report) > 4000:
+ report = report[:4000] + "\n... (일부 생략)"
+ self.messenger.send_message(report)
+ self.weekly_eval_sent = True
+ print("[Bot] Weekly evaluation report sent.")
+ except Exception as e:
+ print(f"[Bot] Weekly evaluation error: {e}")
+ self.messenger.send_message(f"[Bot] 주간 평가 오류: {e}")
+
def _load_peak_prices(self):
"""트레일링 스탑용 최고가 데이터 로드"""
peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
@@ -251,12 +297,20 @@ class AutoTradingBot:
except Exception as e:
self.messenger.send_message(f"Watchlist update failed: {e}")
+ elif command == 'evaluate':
+ self._pending_evaluate = True
+
async def run_cycle(self):
now = datetime.now()
# 0. 명령 큐 폴링
self._process_commands()
+ # 0-1. 즉시 평가 요청 처리 (IPC 'evaluate' 명령)
+ if self._pending_evaluate:
+ self._pending_evaluate = False
+ await self._run_weekly_evaluation()
+
# 1. 거시경제 분석
macro_status = MacroAnalyzer.get_macro_status(self.kis)
self.last_macro_status = macro_status
@@ -316,6 +370,8 @@ class AutoTradingBot:
self.daily_trade_history = []
self.save_trade_history()
self.report_sent = False
+ self.weekly_eval_sent = False
+ self._snapshot_taken_today = False
self.discovered_stocks.clear()
self.watchlist_updated_today = False
# 전일 최고가 초기화 (보유하지 않는 종목)
@@ -328,9 +384,28 @@ class AutoTradingBot:
if not (9 <= now.hour < 15 or (now.hour == 15 and now.minute < 30)):
if now.hour == 15 and now.minute >= 40:
self.send_daily_report()
+ # 일별 스냅샷 (16:00~16:30, 당일 최종 포트폴리오 가치 기록)
+ if now.hour == 16 and now.minute <= 30 and not self._snapshot_taken_today:
+ try:
+ balance_snap = self.kis.get_balance()
+ self._take_daily_snapshot(macro_status, balance_snap)
+ except Exception as e:
+ print(f"[Bot] Snapshot error: {e}")
+ # 주간 평가 (금요일 15:35~15:45, 장 마감 직후)
+ if (now.weekday() == 4 and now.hour == 15
+ and 35 <= now.minute <= 45 and not self.weekly_eval_sent):
+ await self._run_weekly_evaluation()
+ # 장 외 시간에는 서킷 브레이커도 리셋
+ self.monitor.reset_circuit()
print("[Bot] Market Closed. Waiting...")
return
+ # [서킷 브레이커] CPU 과부하 시 분석 사이클 일시 중단
+ if self.monitor.is_cpu_critical():
+ print("[Bot] ⛔ CPU Circuit Breaker 발동 중. 분석 사이클 스킵.")
+ return
+
+ cycle_start_time = time.time()
print(f"[Bot] Cycle Start: {now.strftime('%H:%M:%S')}")
# 7. 종목 분석 및 매매
@@ -365,14 +440,30 @@ class AutoTradingBot:
tracking_deposit = int(balance.get("deposit", 0))
+ # [v3.0] 비동기 OHLCV + 투자자 동향 배치 조회
+ tickers_list = list(target_dict.keys())
+ ohlcv_batch = {}
+ investor_batch = {}
+
+ if self.kis_async and tickers_list:
+ try:
+ print(f"[Bot] 비동기 OHLCV 배치 조회: {len(tickers_list)}종목")
+ ohlcv_batch = await self.kis_async.get_daily_ohlcv_batch(tickers_list)
+ investor_batch = await self.kis_async.get_investor_trends_batch(tickers_list)
+ except Exception as e:
+ print(f"[Bot] 비동기 배치 조회 실패: {e} -> 동기 fallback")
+ ohlcv_batch = {}
+ investor_batch = {}
+
try:
for ticker, name in target_dict.items():
- prices = self.kis.get_daily_price(ticker)
- if not prices:
+ # OHLCV 데이터 획득 (배치 결과 우선, 실패 시 동기 fallback)
+ ohlcv_data = ohlcv_batch.get(ticker)
+ if not ohlcv_data or not ohlcv_data.get('close'):
+ ohlcv_data = self.kis.get_daily_ohlcv(ticker)
+ if not ohlcv_data or not ohlcv_data.get('close'):
continue
- investor_trend = self.kis.get_investor_trend(ticker)
-
# [v2.0] 보유 정보 전달 (분석 워커에서 동적 손절/익절 사용)
holding_info = None
if ticker in current_holdings:
@@ -385,16 +476,22 @@ class AutoTradingBot:
'peak_price': self.peak_prices.get(ticker, float(h.get('current_price', 0)))
}
+ # investor_trend fallback
+ investor_trend = investor_batch.get(ticker)
+ if investor_trend is None:
+ investor_trend = self.kis.get_investor_trend(ticker)
+
future = self.executor.submit(
- analyze_stock_process, ticker, prices, news_data,
+ analyze_stock_process, ticker, ohlcv_data, news_data,
investor_trend, macro_status, holding_info)
analysis_tasks.append(future)
# 결과 처리
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
for future in analysis_tasks:
try:
- res = await loop.run_in_executor(None, future.result)
+ # 240초 타임아웃: LSTM 학습 + Ollama 추론 시간 고려
+ res = await loop.run_in_executor(None, lambda f=future: f.result(240))
ticker = res['ticker']
ticker_name = target_dict.get(ticker, 'Unknown')
print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})"
@@ -458,6 +555,24 @@ class AutoTradingBot:
"reason": reason
})
self.save_trade_history()
+
+ # 성과 DB 기록
+ pred = res.get("prediction") or {}
+ self.perf_db.save_trade_record(
+ action="BUY", ticker=ticker, name=ticker_name,
+ qty=qty, price=current_price,
+ scores_dict={
+ "tech": res.get("tech", 0.0),
+ "sentiment": res.get("sentiment", 0.0),
+ "lstm_score": res.get("lstm_score", 0.0),
+ "score": res.get("score", 0.0),
+ "ai_confidence": res.get("ai_confidence", 0.5),
+ "prediction_change": pred.get("change_rate", 0.0)
+ },
+ reason=reason,
+ macro_state=macro_status.get("status", "SAFE")
+ )
+
tracking_deposit -= required_amount
# 최고가 초기 설정
@@ -484,14 +599,18 @@ class AutoTradingBot:
f" Reason: {reason}")
self.messenger.send_message(msg)
+ sell_price = float(h.get('current_price', 0))
self.daily_trade_history.append({
"action": "SELL", "name": ticker_name,
- "qty": qty, "price": float(h.get('current_price', 0)),
+ "qty": qty, "price": sell_price,
"yield": yld, "profit": profit_loss,
"reason": reason
})
self.save_trade_history()
+ # 성과 DB 매도 결과 기록
+ self.perf_db.close_trade(ticker, sell_price, yld)
+
# 최고가 기록 삭제
if ticker in self.peak_prices:
del self.peak_prices[ticker]
@@ -510,11 +629,21 @@ class AutoTradingBot:
except Exception as e:
print(f"[Bot] Cycle Loop Error: {e}")
+ # 사이클 소요시간 로깅 (120초 초과 시 경고)
+ cycle_elapsed = time.time() - cycle_start_time
+ if cycle_elapsed > 120:
+ print(f"[Bot] ⚠️ 사이클 소요 {cycle_elapsed:.0f}초 (120초 초과) → LSTM 쿨다운 활성화 권장")
+ else:
+ print(f"[Bot] Cycle Done: {cycle_elapsed:.1f}초")
+
def loop(self):
- print(f"[Bot] Module Started (PID: {os.getpid()}) [v2.0]")
+ print(f"[Bot] Module Started (PID: {os.getpid()}) [v3.0]")
self.messenger.send_message(
- "🚀 [Bot Started v2.0]\n"
- "개선사항: 동적 손절/익절, 트레일링 스탑, 포지션 사이징, 분석 기반 매도")
+ "🚀 [Bot Started v3.0]\n"
+ f"✅ LSTM 쿨다운: {Config.LSTM_COOLDOWN//60}분\n"
+ f"✅ AI 모델: {Config.OLLAMA_MODEL}\n"
+ f"✅ CPU 서킷브레이커: {Config.CPU_CIRCUIT_BREAKER_THRESHOLD}% 기준\n"
+ "✅ 동적 손절/익절, 트레일링 스탑, 포지션 사이징")
# 최고가 데이터 로드
self._load_peak_prices()
diff --git a/modules/services/telegram_bot/server.py b/modules/services/telegram_bot/server.py
index d4c1397..5953c1c 100644
--- a/modules/services/telegram_bot/server.py
+++ b/modules/services/telegram_bot/server.py
@@ -7,9 +7,17 @@ import logging
import subprocess
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
+
+# [디버깅] 파일 로깅 추가
+log_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
+ "telegram_bot.log")
+file_handler = logging.FileHandler(log_file, encoding='utf-8')
+file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- level=logging.INFO
+ level=logging.INFO,
+ handlers=[logging.StreamHandler(), file_handler]
)
logging.getLogger("httpx").setLevel(logging.WARNING)
@@ -42,6 +50,7 @@ class TelegramBotServer:
return self.bot_instance is not None
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ logging.info(f"[Command] /start from user {update.effective_user.id}")
await update.message.reply_text(
"AI Trading Bot Command Center\n"
"명령어 목록:\n"
@@ -51,7 +60,8 @@ class TelegramBotServer:
"/update_watchlist - Watchlist 즉시 업데이트\n"
"/macro - 거시경제 지표 및 시장 위험도\n"
"/system - PC 리소스(CPU/GPU) 상태\n"
- "/ai - AI 모델 학습 상태 조회\n\n"
+ "/ai - AI 모델 학습 상태 조회\n"
+ "/evaluate - 즉시 성과 평가 보고서 생성\n\n"
"[관리 명령어]\n"
"/restart - 메인 봇 재시작 요청\n"
"/exec 명령어 - 원격 명령어 실행\n"
@@ -60,6 +70,7 @@ class TelegramBotServer:
)
async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ logging.info(f"[Command] /status from user {update.effective_user.id}")
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
@@ -172,34 +183,67 @@ class TelegramBotServer:
await update.message.reply_text("데이터가 아직 수집되지 않았습니다.")
return
- status = "SAFE"
- msi = indices.get('MSI', 0)
+ msi = float(indices.get('MSI', 0))
if msi >= 50:
- status = "DANGER"
+ risk_status = "🔴 DANGER"
+ risk_desc = "시장 극도 불안정 - 매수 중단 권고"
elif msi >= 30:
- status = "CAUTION"
+ risk_status = "🟡 CAUTION"
+ risk_desc = "시장 불안정 - 보수적 매매 권고"
+ else:
+ risk_status = "🟢 SAFE"
+ risk_desc = "시장 안정 - 정상 매매 가능"
- msg = f"Market Risk: {status}\n\n"
+ from datetime import datetime
+ now_str = datetime.now().strftime("%m/%d %H:%M")
- if 'MSI' in indices:
- msg += f"Stress Index: {indices['MSI']}\n"
+ msg = f"거시경제 지표 {now_str}\n"
+ msg += f"━━━━━━━━━━━━━━━━━━\n"
+ msg += f"Market Risk: {risk_status}\n"
+ msg += f"{risk_desc}\n\n"
- for k, v in indices.items():
- if k != "MSI":
- change = float(v.get('change', 0))
- price = v.get('price', 0)
-
- if change > 0:
- icon = "🔴"
- chg_str = f"+{change}"
- elif change < 0:
- icon = "🔵"
- chg_str = f"{change}"
- else:
- icon = "⚪"
- chg_str = f"{change}"
-
- msg += f"{icon} {k}: {price} ({chg_str}%)\n"
+ # MSI 상세
+ msi_bar = "█" * int(msi / 10) + "░" * (10 - int(msi / 10))
+ msg += f"Stress Index (MSI): {msi:.1f}/100\n"
+ msg += f"[{msi_bar}]\n\n"
+
+ # 지수 상세
+ index_order = ["KOSPI", "KOSDAQ", "KOSPI200"]
+ for k in index_order:
+ if k not in indices:
+ continue
+ v = indices[k]
+ price = float(v.get('price', 0))
+ change = float(v.get('change', 0))
+ change_val = float(v.get('change_val', 0))
+ high = float(v.get('high', 0))
+ low = float(v.get('low', 0))
+ prev_close = float(v.get('prev_close', 0))
+ volume = int(v.get('volume', 0))
+
+ if price == 0:
+ msg += f"⚫ {k}: 데이터 없음 (장 마감 후)\n\n"
+ continue
+
+ if change > 0:
+ icon = "🔴"
+ chg_str = f"+{change:.2f}% (+{change_val:.2f}pt)"
+ elif change < 0:
+ icon = "🔵"
+ chg_str = f"{change:.2f}% ({change_val:.2f}pt)"
+ else:
+ icon = "⚪"
+ chg_str = f"{change:.2f}%"
+
+ msg += f"{icon} {k}: {price:,.2f} {chg_str}\n"
+ if high and low:
+ msg += f" 고: {high:,.2f} 저: {low:,.2f}"
+ if prev_close:
+ msg += f" 전일종가: {prev_close:,.2f}"
+ msg += "\n"
+ if volume:
+ msg += f" 거래량: {volume:,}천주\n"
+ msg += "\n"
await update.message.reply_text(msg, parse_mode="HTML")
@@ -256,10 +300,11 @@ class TelegramBotServer:
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
+ from modules.config import Config
gpu = self.bot_instance.ollama_monitor.get_gpu_status()
msg = "AI Model Status\n"
- msg += "* LLM Engine: Ollama (Llama 3.1)\n"
+ msg += f"* LLM Engine: Ollama ({Config.OLLAMA_MODEL})\n"
msg += f"* Device: {gpu.get('name', 'GPU')}\n"
if gpu:
@@ -349,6 +394,29 @@ class TelegramBotServer:
except Exception as e:
await update.message.reply_text(f"실행 오류: {e}")
+ async def evaluate_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """/evaluate: 즉시 성과 평가 보고서 생성 (LLM 분석 포함)"""
+ await update.message.reply_text(
+ "📊 성과 평가를 실행합니다...\n"
+ "LLM 전문가 패널 분석 포함 시 30초~1분 소요됩니다.",
+ parse_mode="HTML"
+ )
+ try:
+ from modules.utils.performance_db import PerformanceDB
+ from modules.analysis.evaluator import PerformanceEvaluator
+
+ evaluator = PerformanceEvaluator()
+ loop = asyncio.get_running_loop()
+ report = await loop.run_in_executor(None, evaluator.generate_weekly_report)
+
+ if len(report) > 4000:
+ report = report[:4000] + "\n... (일부 생략)"
+
+ await update.message.reply_text(report, parse_mode="HTML")
+ except Exception as e:
+ logging.error(f"[Command] /evaluate error: {e}")
+ await update.message.reply_text(f"평가 오류: {e}")
+
def run(self):
handlers = [
("start", self.start_command),
@@ -359,6 +427,7 @@ class TelegramBotServer:
("macro", self.macro_command),
("system", self.system_command),
("ai", self.ai_status_command),
+ ("evaluate", self.evaluate_command),
("restart", self.restart_command),
("stop", self.stop_command),
("exec", self.exec_command)
@@ -377,6 +446,7 @@ class TelegramBotServer:
self.application.add_error_handler(error_handler)
+ logging.info("[Telegram] Command Server Started (Shared Memory IPC Mode).")
print("[Telegram] Command Server Started (Shared Memory IPC Mode).")
try:
diff --git a/modules/utils/performance_db.py b/modules/utils/performance_db.py
new file mode 100644
index 0000000..f1cf3c0
--- /dev/null
+++ b/modules/utils/performance_db.py
@@ -0,0 +1,211 @@
+"""
+성과 데이터 영구 저장 - PerformanceDB
+
+데이터 파일:
+ data/performance/daily_snapshots.json - 일별 자산 스냅샷
+ data/performance/trade_records.json - 강화 매매 기록 (영구 보관)
+"""
+import os
+import json
+from datetime import datetime, timedelta
+
+from modules.config import Config
+
+PERF_DIR = os.path.join(Config.DATA_DIR, "performance")
+SNAPSHOTS_FILE = os.path.join(PERF_DIR, "daily_snapshots.json")
+TRADES_FILE = os.path.join(PERF_DIR, "trade_records.json")
+
+
+class PerformanceDB:
+ def __init__(self):
+ os.makedirs(PERF_DIR, exist_ok=True)
+ self._snapshots = self._load_json(SNAPSHOTS_FILE, [])
+ self._trades = self._load_json(TRADES_FILE, [])
+
+ def _load_json(self, path, default):
+ if os.path.exists(path):
+ try:
+ with open(path, "r", encoding="utf-8") as f:
+ return json.load(f)
+ except Exception as e:
+ print(f"[PerformanceDB] Load failed {path}: {e}")
+ return default
+ return default
+
+ def _save_json(self, path, data):
+ try:
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, ensure_ascii=False, indent=2)
+ except Exception as e:
+ print(f"[PerformanceDB] Save failed {path}: {e}")
+
+ # ─────────────────────────────────────────
+ # 일별 스냅샷
+ # ─────────────────────────────────────────
+
+ def save_daily_snapshot(self, total_eval, deposit, holdings_count, benchmark_close=None):
+ """일별 자산 스냅샷 저장 (하루 1회 호출 권장).
+
+ Args:
+ total_eval (int): 총 평가액 (원)
+ deposit (int): 예수금 (원)
+ holdings_count (int): 보유 종목 수
+ benchmark_close (float|None): KOSPI 현재가 (벤치마크 비교용)
+ """
+ today = datetime.now().strftime("%Y-%m-%d")
+
+ # 오늘 이미 저장된 스냅샷이 있으면 업데이트
+ for snap in self._snapshots:
+ if snap.get("date") == today:
+ snap["total_eval"] = total_eval
+ snap["deposit"] = deposit
+ snap["holdings_count"] = holdings_count
+ if benchmark_close is not None:
+ snap["benchmark_kospi_close"] = benchmark_close
+ self._save_json(SNAPSHOTS_FILE, self._snapshots)
+ return
+
+ # 일별/누적 수익률 계산
+ daily_return_pct = 0.0
+ cumulative_return_pct = 0.0
+
+ if self._snapshots:
+ prev_eval = self._snapshots[-1].get("total_eval", 0)
+ if prev_eval > 0:
+ daily_return_pct = (total_eval - prev_eval) / prev_eval * 100
+
+ initial_capital = self.get_initial_capital()
+ if initial_capital and initial_capital > 0:
+ cumulative_return_pct = (total_eval - initial_capital) / initial_capital * 100
+
+ snap = {
+ "date": today,
+ "total_eval": total_eval,
+ "deposit": deposit,
+ "holdings_count": holdings_count,
+ "benchmark_kospi_close": benchmark_close,
+ "daily_return_pct": round(daily_return_pct, 4),
+ "cumulative_return_pct": round(cumulative_return_pct, 4)
+ }
+ self._snapshots.append(snap)
+ self._save_json(SNAPSHOTS_FILE, self._snapshots)
+ print(f"[PerformanceDB] Snapshot saved: {today} "
+ f"total={total_eval:,}원 daily={daily_return_pct:+.2f}%")
+
+ # ─────────────────────────────────────────
+ # 매매 기록
+ # ─────────────────────────────────────────
+
+ def save_trade_record(self, action, ticker, name, qty, price,
+ scores_dict=None, reason="", macro_state="SAFE"):
+ """매수/매도 기록 저장.
+
+ Args:
+ action (str): "BUY" | "SELL"
+ ticker (str): 종목 코드
+ name (str): 종목명
+ qty (int): 수량
+ price (float): 체결가
+ scores_dict (dict|None): 분석 점수 딕셔너리
+ {tech, sentiment, lstm_score, score, ai_confidence, prediction_change}
+ reason (str): 매매 사유
+ macro_state (str): 매크로 상태 ("SAFE"/"CAUTION"/"DANGER")
+ """
+ sd = scores_dict or {}
+ now_iso = datetime.now().isoformat()
+
+ trade = {
+ "id": f"{ticker}_{now_iso}",
+ "action": action,
+ "ticker": ticker,
+ "name": name,
+ "qty": qty,
+ "price": price,
+ "timestamp": now_iso,
+ "reason": reason,
+ "macro_state": macro_state,
+ # 점수 (BUY 시에만 의미 있음)
+ "tech_score": float(sd.get("tech", 0.0)),
+ "sentiment_score": float(sd.get("sentiment", 0.0)),
+ "lstm_score": float(sd.get("lstm_score", 0.0)),
+ "total_score": float(sd.get("score", 0.0)),
+ "ai_confidence": float(sd.get("ai_confidence", 0.5)),
+ "ai_prediction_change": float(sd.get("prediction_change", 0.0)),
+ # 매도 후 채워지는 결과 필드
+ "outcome_return_pct": None,
+ "holding_days": None,
+ "closed_at": None
+ }
+ self._trades.append(trade)
+ self._save_json(TRADES_FILE, self._trades)
+
+ def close_trade(self, ticker, sell_price, sell_yield_pct=None):
+ """가장 최근 미체결 BUY를 찾아 매도 결과를 기록.
+
+ Args:
+ ticker (str): 종목 코드
+ sell_price (float): 매도 체결가
+ sell_yield_pct (float|None): KIS에서 받은 수익률 (보조용)
+ """
+ for trade in reversed(self._trades):
+ if (trade.get("ticker") == ticker
+ and trade.get("action") == "BUY"
+ and trade.get("outcome_return_pct") is None):
+
+ buy_price = trade.get("price", 0)
+ if buy_price and buy_price > 0:
+ outcome_return_pct = (sell_price - buy_price) / buy_price * 100
+ elif sell_yield_pct is not None:
+ outcome_return_pct = sell_yield_pct
+ else:
+ outcome_return_pct = 0.0
+
+ # 보유일 계산
+ holding_days = 0
+ buy_ts = trade.get("timestamp", "")
+ if buy_ts:
+ try:
+ buy_dt = datetime.fromisoformat(buy_ts)
+ holding_days = (datetime.now() - buy_dt).days
+ except Exception:
+ pass
+
+ trade["outcome_return_pct"] = round(outcome_return_pct, 4)
+ trade["holding_days"] = holding_days
+ trade["closed_at"] = datetime.now().isoformat()
+ self._save_json(TRADES_FILE, self._trades)
+ print(f"[PerformanceDB] Trade closed: {ticker} "
+ f"return={outcome_return_pct:.2f}% holding={holding_days}d")
+ return
+
+ print(f"[PerformanceDB] No open BUY found for {ticker}")
+
+ # ─────────────────────────────────────────
+ # 조회
+ # ─────────────────────────────────────────
+
+ def load_snapshots(self, days=90):
+ """최근 N일 스냅샷 반환."""
+ cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
+ return [s for s in self._snapshots if s.get("date", "") >= cutoff]
+
+ def load_trades(self, days=90):
+ """최근 N일 매매 기록 반환."""
+ cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
+ return [t for t in self._trades if t.get("timestamp", "")[:10] >= cutoff]
+
+ def get_initial_capital(self):
+ """첫 스냅샷 기준 초기 자본 반환."""
+ if self._snapshots:
+ return self._snapshots[0].get("total_eval", 0)
+ return 0
+
+ def get_summary(self):
+ """간단한 현황 딕셔너리 반환 (디버깅용)."""
+ return {
+ "total_snapshots": len(self._snapshots),
+ "total_trades": len(self._trades),
+ "closed_trades": sum(1 for t in self._trades
+ if t.get("outcome_return_pct") is not None),
+ "initial_capital": self.get_initial_capital()
+ }