# ============================================================================
# File: modules/analysis/backtest.py
# ============================================================================
"""
Backtesting framework (Phase 3-1).

- Simulates a strategy over historical OHLCV data.
- Reports Sharpe ratio, max drawdown (MDD), win rate and profit factor.
- Intended for before/after comparison when the Phase 2 models change.
"""

import json
import logging
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable

logger = logging.getLogger(__name__)


@dataclass
class Trade:
    """One closed round-trip trade produced by the backtester."""

    ticker: str
    entry_date: int  # bar index into the OHLCV arrays (not a calendar date)
    entry_price: float
    exit_date: int  # bar index into the OHLCV arrays
    exit_price: float
    qty: int
    direction: str = "LONG"  # "LONG"; any other value is treated as SHORT

    @property
    def pnl(self) -> float:
        """Absolute profit/loss in currency units, direction-aware."""
        if self.direction == "LONG":
            return (self.exit_price - self.entry_price) * self.qty
        return (self.entry_price - self.exit_price) * self.qty

    @property
    def pnl_pct(self) -> float:
        """Profit/loss as a percentage of the entry price, direction-aware.

        The sign always agrees with :attr:`pnl` (a falling price is a gain
        for a non-LONG trade). Returns 0.0 when ``entry_price`` is zero
        instead of raising ``ZeroDivisionError``.
        """
        if not self.entry_price:
            return 0.0
        change = (self.exit_price - self.entry_price) / self.entry_price * 100
        return change if self.direction == "LONG" else -change


@dataclass
class BacktestResult:
    """Aggregate performance metrics of one backtest run."""

    total_return_pct: float
    sharpe_ratio: float
    max_drawdown_pct: float
    win_rate: float
    avg_win_pct: float
    avg_loss_pct: float
    profit_factor: float
    total_trades: int
    winning_trades: int
    losing_trades: int
    trades: List[Trade] = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report of the metrics."""
        lines = [
            "=" * 50,
            "📊 백테스팅 결과",
            "=" * 50,
            f"총 수익률: {self.total_return_pct:+.2f}%",
            f"Sharpe Ratio: {self.sharpe_ratio:.3f}",
            f"Max Drawdown: {self.max_drawdown_pct:.2f}%",
            f"승률: {self.win_rate:.1f}% ({self.winning_trades}/{self.total_trades})",
            f"평균 수익: {self.avg_win_pct:+.2f}%",
            f"평균 손실: {self.avg_loss_pct:.2f}%",
            f"손익비(PF): {self.profit_factor:.2f}",
            "=" * 50,
        ]
        return "\n".join(lines)


class Backtester:
    """
    OHLCV-based, long-only, single-position strategy backtester.

    Example:
        bt = Backtester(initial_capital=10_000_000)
        result = bt.run(
            ohlcv_data={"close": [...], "high": [...], "low": [...], "volume": [...]},
            strategy_fn=my_strategy,
            ticker="005930"
        )
        print(result.summary())
    """

    def __init__(self, initial_capital: float = 10_000_000,
                 commission_rate: float = 0.00015,  # 0.015% per side
                 slippage_rate: float = 0.001):     # 0.1% slippage per side
        self.initial_capital = initial_capital
        self.commission_rate = commission_rate
        self.slippage_rate = slippage_rate

    def run(self, ohlcv_data: Dict, strategy_fn: Callable,
            ticker: str = "UNKNOWN", warmup: int = 60) -> BacktestResult:
        """
        Backtest a single ticker.

        Args:
            ohlcv_data: {'close': [], 'high': [], 'low': [], 'open': [], 'volume': []}.
                Only 'close' is required; 'high'/'low'/'open' fall back to the
                close series and 'volume' falls back to zeros.
            strategy_fn: Called once per bar with a dict holding every series
                truncated to the current bar (inclusive). Must return
                "BUY", "SELL" or "HOLD"; any other value is ignored.
            ticker: Ticker code stored on the generated trades.
            warmup: Number of leading bars skipped before trading starts
                (lets technical indicators stabilise). Negative values are
                treated as 0.

        Returns:
            BacktestResult. An all-zero result is returned when fewer than
            ``warmup + 10`` bars are available.
        """
        closes = np.array(ohlcv_data.get('close', []), dtype=float)
        highs = np.array(ohlcv_data.get('high', closes), dtype=float)
        lows = np.array(ohlcv_data.get('low', closes), dtype=float)
        opens = np.array(ohlcv_data.get('open', closes), dtype=float)
        volumes = np.array(ohlcv_data.get('volume', np.zeros_like(closes)), dtype=float)

        n = len(closes)
        if n < warmup + 10:
            return self._empty_result()

        # Convert to plain lists once; each bar then only needs a cheap list
        # slice instead of an ndarray -> list conversion of the whole prefix.
        series = {
            'close': closes.tolist(),
            'high': highs.tolist(),
            'low': lows.tolist(),
            'open': opens.tolist(),
            'volume': volumes.tolist(),
        }
        close_list = series['close']

        capital = self.initial_capital
        position = 0  # shares currently held
        entry_price = 0.0
        entry_idx = 0
        equity_curve = [capital]
        trades: List[Trade] = []
        strategy_errors = 0

        for i in range(max(warmup, 0), n):
            # The strategy only ever sees data up to and including bar i.
            window = {key: values[:i + 1] for key, values in series.items()}
            try:
                signal = strategy_fn(window)
            except Exception as exc:
                # Best effort: a failing strategy call counts as HOLD, but the
                # first failure is reported so it is not silently hidden.
                signal = "HOLD"
                strategy_errors += 1
                if strategy_errors == 1:
                    logger.warning(
                        "[%s] strategy_fn raised at bar %d; treated as HOLD: %r",
                        ticker, i, exc)

            price = close_list[i]
            buy_price = price * (1 + self.slippage_rate)   # fill price incl. slippage
            sell_price = price * (1 - self.slippage_rate)  # fill price incl. slippage

            if signal == "BUY" and position == 0:
                # Go all-in: largest share count affordable incl. commission.
                qty = int(capital / (buy_price * (1 + self.commission_rate)))
                if qty > 0:
                    cost = qty * buy_price * (1 + self.commission_rate)
                    capital -= cost
                    position = qty
                    entry_price = buy_price
                    entry_idx = i

            elif signal == "SELL" and position > 0:
                proceeds = position * sell_price * (1 - self.commission_rate)
                capital += proceeds
                trades.append(Trade(
                    ticker=ticker,
                    entry_date=entry_idx,
                    entry_price=entry_price,
                    exit_date=i,
                    exit_price=sell_price,
                    qty=position
                ))
                position = 0
                entry_price = 0.0

            # Mark-to-market equity: cash plus open position at the close.
            equity_curve.append(capital + position * price)

        if strategy_errors > 1:
            logger.warning("[%s] strategy_fn raised on %d bars in total",
                           ticker, strategy_errors)

        # Force-close any position still open on the last bar.
        if position > 0:
            last_price = close_list[-1] * (1 - self.slippage_rate)
            proceeds = position * last_price * (1 - self.commission_rate)
            capital += proceeds
            trades.append(Trade(
                ticker=ticker,
                entry_date=entry_idx,
                entry_price=entry_price,
                exit_date=n - 1,
                exit_price=last_price,
                qty=position
            ))
            # Replace the last mark-to-market value with the realised one.
            equity_curve[-1] = capital

        return self._compute_metrics(equity_curve, trades)

    def run_multi(self, ohlcv_dict: Dict[str, Dict], strategy_fn: Callable,
                  warmup: int = 60) -> Dict[str, BacktestResult]:
        """Backtest several tickers independently; returns {ticker: result}."""
        results = {}
        for ticker, ohlcv_data in ohlcv_dict.items():
            results[ticker] = self.run(ohlcv_data, strategy_fn, ticker, warmup)
        return results

    def _compute_metrics(self, equity_curve: List[float], trades: List[Trade]) -> BacktestResult:
        """Derive the performance metrics from an equity curve and its trades.

        All numeric fields of the result are plain Python numbers.
        """
        equity = np.array(equity_curve, dtype=float)
        total_return_pct = (equity[-1] / equity[0] - 1) * 100

        # Sharpe ratio from per-bar returns, annualised with 252 periods.
        daily_returns = np.diff(equity) / equity[:-1]
        if daily_returns.size and daily_returns.std() > 0:
            sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252)
        else:
            sharpe = 0.0

        # Max drawdown: deepest percentage drop below the running peak.
        peak = np.maximum.accumulate(equity)
        drawdowns = (equity - peak) / (peak + 1e-9) * 100
        max_drawdown = abs(drawdowns.min())

        # Win rate / profit factor; a break-even trade counts as a loss.
        wins = [t for t in trades if t.pnl_pct > 0]
        losses = [t for t in trades if t.pnl_pct <= 0]

        win_rate = len(wins) / len(trades) * 100 if trades else 0
        avg_win = np.mean([t.pnl_pct for t in wins]) if wins else 0
        avg_loss = np.mean([t.pnl_pct for t in losses]) if losses else 0

        total_win = sum(t.pnl for t in wins)
        total_loss = abs(sum(t.pnl for t in losses))
        # The epsilon avoids division by zero when there are no losses.
        profit_factor = total_win / (total_loss + 1e-9)

        return BacktestResult(
            total_return_pct=round(float(total_return_pct), 2),
            sharpe_ratio=round(float(sharpe), 3),
            max_drawdown_pct=round(float(max_drawdown), 2),
            win_rate=round(float(win_rate), 1),
            avg_win_pct=round(float(avg_win), 2),
            avg_loss_pct=round(float(avg_loss), 2),
            profit_factor=round(float(profit_factor), 3),
            total_trades=len(trades),
            winning_trades=len(wins),
            losing_trades=len(losses),
            trades=trades
        )

    def _empty_result(self) -> BacktestResult:
        """Return an all-zero result (used when there is too little data)."""
        return BacktestResult(
            total_return_pct=0.0, sharpe_ratio=0.0, max_drawdown_pct=0.0,
            win_rate=0.0, avg_win_pct=0.0, avg_loss_pct=0.0,
            profit_factor=0.0, total_trades=0, winning_trades=0, losing_trades=0
        )


def compare_strategies(ohlcv_data: Dict, strategies: Dict[str, Callable],
                       initial_capital: float = 10_000_000,
                       warmup: int = 60) -> Dict[str, BacktestResult]:
    """
    Backtest several strategies on the same data and print each summary.

    Args:
        ohlcv_data: Same structure as accepted by ``Backtester.run``.
        strategies: {"strategy name": strategy_fn, ...}
        initial_capital: Starting cash for every strategy.
        warmup: Warm-up bars forwarded to ``Backtester.run``.

    Returns:
        {"strategy name": BacktestResult, ...}
    """
    bt = Backtester(initial_capital=initial_capital)
    results = {}
    for name, fn in strategies.items():
        results[name] = bt.run(ohlcv_data, fn, warmup=warmup)
        print(f"\n[{name}]")
        print(results[name].summary())
    return results


# ============================================================================
# File: modules/analysis/ensemble.py
#
# Ensemble prediction module (Phase 3-2)
# - LSTM + technical indicators + LLM sentiment -> adaptive weights
# - Weights are adjusted from past trade outcomes
# - Replaces the hard-coded w_tech / w_news / w_ai in process.py
# ============================================================================

import os
import json
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Optional

# NOTE: `modules.config.Config` is imported inside AdaptiveEnsemble.__init__,
# and only when no explicit history_file is given, so this module can be
# imported without the project configuration being available.


@dataclass
class SignalWeights:
    """Ensemble weights for the three signal sources."""
    tech: float = 0.35
    sentiment: float = 0.30
    lstm: float = 0.35

    def normalize(self):
        """Scale the weights in place so they sum to 1; returns ``self``.

        Left untouched when the current sum is not positive.
        """
        total = self.tech + self.sentiment + self.lstm
        if total > 0:
            self.tech /= total
            self.sentiment /= total
            self.lstm /= total
        return self

    def to_dict(self):
        """Return the weights as a JSON-serialisable dict."""
        return {"tech": self.tech, "sentiment": self.sentiment, "lstm": self.lstm}

    @classmethod
    def from_dict(cls, d):
        """Build weights from a dict; missing keys use the class defaults."""
        return cls(tech=d.get("tech", 0.35),
                   sentiment=d.get("sentiment", 0.30),
                   lstm=d.get("lstm", 0.35))


class AdaptiveEnsemble:
    """
    Adaptive ensemble weight manager.

    What the code does:
    1. Keeps the most recent ``max_history`` trade records per ticker.
    2. Scores, after the fact, how often each signal agreed with the outcome.
    3. Nudges the weight of each signal by its hit rate (per ticker).
    4. Keeps separate baseline weights per market context (ADX / macro state).
    """

    # Keys every history record must carry to be usable for scoring.
    _RECORD_KEYS = ("tech_score", "sentiment_score", "lstm_score", "outcome")

    def __init__(self, history_file=None, max_history=50):
        self.max_history = max_history
        if not history_file:
            # Deferred project import: only needed for the default location.
            from modules.config import Config
            history_file = os.path.join(Config.DATA_DIR, "ensemble_history.json")
        self.history_file = history_file
        # {ticker: [{"tech_score": f, "sentiment_score": f, "lstm_score": f,
        #            "decision": str, "outcome": float}, ...]}
        self._trade_history: Dict[str, list] = {}
        # {context: SignalWeights}
        # context: "strong_trend" | "sideways" | "danger" | "default"
        self._context_weights: Dict[str, SignalWeights] = {
            "strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30),
            "sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
            "danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
            "default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
        }
        self._load()

    def _load(self):
        """Load history and weights from ``history_file`` if it exists.

        Best effort: on any failure a message is printed and the in-memory
        state is left exactly as it was (nothing is partially applied).
        """
        if not os.path.exists(self.history_file):
            return
        try:
            with open(self.history_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            history = data.get("history", {})
            loaded_weights = {
                ctx: SignalWeights.from_dict(w)
                for ctx, w in data.get("weights", {}).items()
            }
        except Exception as e:
            print(f"[Ensemble] Load failed: {e}")
            return
        self._trade_history = history
        self._context_weights.update(loaded_weights)

    def _save(self):
        """Persist history (trimmed to ``max_history``) and weights as JSON.

        Best effort: failures are printed, never raised. The target directory
        is created if needed and the file is replaced atomically so that an
        interrupted write cannot leave a truncated history file behind.
        """
        try:
            data = {
                "history": {k: v[-self.max_history:] for k, v in self._trade_history.items()},
                "weights": {ctx: w.to_dict() for ctx, w in self._context_weights.items()}
            }
            directory = os.path.dirname(self.history_file)
            if directory:
                os.makedirs(directory, exist_ok=True)
            tmp_path = f"{self.history_file}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.history_file)
        except Exception as e:
            print(f"[Ensemble] Save failed: {e}")

    @classmethod
    def _usable_records(cls, records: list) -> list:
        """Return only the records that carry every key needed for scoring."""
        return [r for r in records
                if isinstance(r, dict) and all(k in r for k in cls._RECORD_KEYS)]

    def get_context(self, adx: float, macro_state: str) -> str:
        """Classify the market context.

        "danger" when ``macro_state == "DANGER"`` (takes precedence), else
        "strong_trend" for ADX >= 25, "sideways" for ADX < 20, and "default"
        for the band in between.
        """
        if macro_state == "DANGER":
            return "danger"
        if adx >= 25:
            return "strong_trend"
        if adx < 20:
            return "sideways"
        return "default"

    def get_weights(self, ticker: str, adx: float = 20.0,
                    macro_state: str = "SAFE",
                    ai_confidence: float = 0.5) -> SignalWeights:
        """
        Return normalised weights for a ticker in the current market context.

        Steps, in order:
        1. Start from the baseline weights of the context.
        2. If the ticker has at least 5 recorded trades, nudge each weight by
           the hit rate of its signal over the last (up to) 10 trades.
        3. Scale the LSTM weight up (confidence >= 0.85) or down (< 0.5).
        4. Normalise so the three weights sum to 1.

        The returned object is a fresh copy; stored weights are not mutated.
        """
        context = self.get_context(adx, macro_state)
        base = self._context_weights.get(context, self._context_weights["default"])
        adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm)

        ticker_history = self._trade_history.get(ticker, [])
        if len(ticker_history) >= 5:
            # Malformed records (e.g. from an older file) are skipped rather
            # than raising KeyError in the middle of live scoring.
            recent = self._usable_records(ticker_history[-10:])
            outcomes = [h["outcome"] for h in recent]
            tech_acc = self._accuracy([h["tech_score"] for h in recent], outcomes)
            news_acc = self._accuracy([h["sentiment_score"] for h in recent], outcomes)
            lstm_acc = self._accuracy([h["lstm_score"] for h in recent], outcomes)

            # Hit rate is in [0, 1], so each shift is at most alpha * 0.5
            # (= 0.025); the result is clamped to [0.1, 0.6].
            alpha = 0.05
            adjusted.tech = max(0.1, min(0.6, base.tech + alpha * (tech_acc - 0.5)))
            adjusted.sentiment = max(0.1, min(0.6, base.sentiment + alpha * (news_acc - 0.5)))
            adjusted.lstm = max(0.1, min(0.6, base.lstm + alpha * (lstm_acc - 0.5)))

        # AI confidence correction of the LSTM weight.
        if ai_confidence >= 0.85:
            adjusted.lstm = min(0.70, adjusted.lstm * 1.3)
        elif ai_confidence < 0.5:
            adjusted.lstm = max(0.10, adjusted.lstm * 0.7)

        return adjusted.normalize()

    def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
                     lstm_score: float, decision: str, outcome_pct: float):
        """
        Record a trade result (training data for the weights) and persist it.

        outcome_pct: realised return in percent; positive = profit,
            negative = loss.
        """
        if ticker not in self._trade_history:
            self._trade_history[ticker] = []

        record = {
            "tech_score": tech_score,
            "sentiment_score": sentiment_score,
            "lstm_score": lstm_score,
            "decision": decision,
            "outcome": outcome_pct
        }
        self._trade_history[ticker].append(record)
        # Cap the per-ticker history size.
        if len(self._trade_history[ticker]) > self.max_history:
            self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]

        self._update_weights(ticker)
        self._save()

    def _update_weights(self, ticker: str):
        """Reset the context weights after a losing streak on ``ticker``.

        With at least 5 recorded trades, if the mean outcome of the last
        (up to) 10 usable records is below -2.0 (%), every context is reset
        to the generic 0.35 / 0.30 / 0.35 weights. Otherwise nothing changes.
        """
        history = self._trade_history.get(ticker, [])
        if len(history) < 5:
            return

        outcomes = [h["outcome"] for h in self._usable_records(history[-10:])]
        if not outcomes:
            return

        if np.mean(outcomes) < -2.0:
            # NOTE(review): this flattens *all* contexts (including the
            # strong_trend / sideways / danger baselines) to the generic
            # weights, globally and persistently, based on one ticker's
            # results. Confirm this is intended rather than restoring each
            # context's own baseline.
            for ctx in self._context_weights:
                self._context_weights[ctx] = SignalWeights(
                    tech=0.35, sentiment=0.30, lstm=0.35)

    def compute_ensemble_score(self, tech_score: float, sentiment_score: float,
                               lstm_score: float, investor_score: float = 0.0,
                               weights: Optional[SignalWeights] = None) -> float:
        """
        Compute the combined ensemble score, clamped to [0.0, 1.0].

        Args:
            tech_score, sentiment_score, lstm_score: Individual signal scores.
            investor_score: Supply/demand bonus; its contribution is capped at
                +0.15 (values below the cap are added as-is).
            weights: Weights to apply (defaults to ``SignalWeights()``).
        """
        if weights is None:
            weights = SignalWeights()

        total = (weights.tech * tech_score
                 + weights.sentiment * sentiment_score
                 + weights.lstm * lstm_score)

        total += min(investor_score, 0.15)
        return min(1.0, max(0.0, total))

    @staticmethod
    def _accuracy(scores: list, outcomes: list) -> float:
        """Hit rate of a signal against the realised outcomes.

        A record is a hit when the score is >= 0.5 and the outcome is a
        profit, or the score is < 0.5 and the outcome is not a profit.
        Returns the neutral value 0.5 when fewer than 3 scores are given.
        """
        if len(scores) < 3:
            return 0.5
        correct = sum(
            1 for s, o in zip(scores, outcomes)
            if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0)
        )
        return correct / len(scores)


# Module-level singleton
_ensemble_instance: Optional[AdaptiveEnsemble] = None


def get_ensemble() -> AdaptiveEnsemble:
    """Return the process-wide AdaptiveEnsemble, creating it on first use."""
    global _ensemble_instance
    if _ensemble_instance is None:
        _ensemble_instance = AdaptiveEnsemble()
    return _ensemble_instance


# ============================================================================
# File: warmup_and_restart.py
#
# LSTM pre-training + automatic bot start, in one script
# - Runs in the background after the old bot has been stopped
# - Pre-trains the LSTM for every watchlist ticker, then starts main_server.py
# ============================================================================
import sys
import os
import time
import subprocess
import json

# Make the project root importable and the working directory.
ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT)
os.chdir(ROOT)

LOG_FILE = os.path.join(ROOT, "warmup.log")


def log(msg):
    """Print a timestamped line and append it to ``LOG_FILE``."""
    ts = time.strftime("%H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def _notify(messenger, text):
    """Send a Telegram message; a failed send is logged, never raised.

    Notifications are informational only, so they must not be able to turn
    a successful warmup into a failed one.
    """
    try:
        messenger.send_message(text)
    except Exception as e:
        log(f"⚠️ Telegram 알림 실패: {e}")


def run_warmup():
    """Pre-train the LSTM for every ticker in the watchlist.

    Returns:
        True when the warmup loop ran to completion (individual tickers may
        still have failed and are only counted), False when setup failed
        (imports, client construction, reading the watchlist, ...).
    """
    log("=" * 50)
    log("LSTM 사전학습 시작 (Warmup)")
    log("=" * 50)

    try:
        from modules.config import Config
        from modules.services.kis import KISClient
        from modules.analysis.deep_learning import PricePredictor
        from modules.services.telegram import TelegramMessenger

        messenger = TelegramMessenger()
        _notify(
            messenger,
            "🔄 [Bot Restarting]\n"
            "LSTM 사전학습 중... 완료 후 자동 시작됩니다.\n"
            "대상: Watchlist 전체 종목"
        )

        kis = KISClient()
        with open(Config.WATCHLIST_FILE, "r", encoding="utf-8") as f:
            watchlist = json.load(f)

        log(f"대상 종목: {len(watchlist)}개")

        predictor = PricePredictor()
        total = len(watchlist)
        success = 0
        failed = 0
        total_time = 0.0

        for i, (ticker, name) in enumerate(watchlist.items(), 1):
            log(f"[{i}/{total}] {name} ({ticker}) 학습 중...")
            try:
                # Train on 100 daily bars; at least 70 are required.
                prices = kis.get_daily_price(ticker, count=100)
                if not prices or len(prices) < 70:
                    log(f" ⚠️ 데이터 부족 ({len(prices) if prices else 0}개)")
                    failed += 1
                    continue

                t0 = time.time()
                result = predictor.train_and_predict(prices, ticker=ticker)
                elapsed = time.time() - t0
                total_time += elapsed

                if result:
                    log(f" ✅ {result['epochs']}에포크 | loss={result['loss']:.6f} | "
                        f"{result['change_rate']:+.2f}% | {elapsed:.1f}초")
                    success += 1
                else:
                    log(f" ⚠️ 결과 없음 ({elapsed:.1f}초)")
                    failed += 1

            except Exception as e:
                log(f" ❌ 오류: {e}")
                failed += 1
            finally:
                # Throttle KIS API calls on every path, including the
                # insufficient-data and error paths.
                time.sleep(0.5)

        log("=" * 50)
        log(f"Warmup 완료: 성공 {success}개 / 실패 {failed}개 / 총 {total_time:.0f}초")
        log("=" * 50)

        _notify(
            messenger,
            f"✅ [Warmup 완료]\n"
            f"성공: {success}종목 / 실패: {failed}종목\n"
            f"소요: {total_time/60:.1f}분\n"
            f"→ 봇 시작 중..."
        )
        return True

    except Exception as e:
        log(f"❌ Warmup 실패: {e}")
        import traceback
        log(traceback.format_exc())
        return False


def start_bot():
    """Launch ``main_server.py`` as a detached child process.

    Returns:
        True if the process was spawned, False if spawning raised.
    """
    log("봇 시작: main_server.py")
    try:
        if sys.platform == "win32":
            # Run the bot in its own console window on Windows.
            subprocess.Popen(
                [sys.executable, "main_server.py"],
                creationflags=subprocess.CREATE_NEW_CONSOLE,
                cwd=ROOT
            )
        else:
            subprocess.Popen(
                [sys.executable, "main_server.py"],
                cwd=ROOT
            )
        log("✅ 봇 프로세스 시작 완료")
        return True
    except Exception as e:
        log(f"❌ 봇 시작 실패: {e}")
        return False


def main():
    """Reset the log, run the warmup and start the bot if it succeeded."""
    # Truncate warmup.log for this run.
    with open(LOG_FILE, "w", encoding="utf-8") as f:
        f.write(f"Warmup 시작: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")

    warmup_ok = run_warmup()

    if warmup_ok:
        time.sleep(2)
        start_bot()
    else:
        log("⚠️ Warmup 실패. 봇을 수동으로 시작해주세요: python main_server.py")


if __name__ == "__main__":
    main()