""" 앙상블 예측 모듈 (Phase 3-2) - LSTM + 기술지표 + LLM 감성 → 적응형 가중치 - 과거 매매 결과 기반 가중치 자동 조정 - process.py의 하드코딩된 w_tech/w_news/w_ai 대체 """ import os import json import numpy as np from dataclasses import dataclass, field from typing import Dict, Optional from modules.config import Config @dataclass class SignalWeights: """앙상블 가중치""" tech: float = 0.35 sentiment: float = 0.30 lstm: float = 0.35 def normalize(self): total = self.tech + self.sentiment + self.lstm if total > 0: self.tech /= total self.sentiment /= total self.lstm /= total return self def to_dict(self): return {"tech": self.tech, "sentiment": self.sentiment, "lstm": self.lstm} @classmethod def from_dict(cls, d): return cls(tech=d.get("tech", 0.35), sentiment=d.get("sentiment", 0.30), lstm=d.get("lstm", 0.35)) class AdaptiveEnsemble: """ 적응형 앙상블 가중치 관리자 핵심 로직: 1. 종목별 최근 N 매매의 결과를 추적 2. 어떤 신호가 정확했는지 소급 평가 3. 정확도가 높은 신호의 가중치를 점진적으로 증가 4. 시장 상황(ADX, 거시경제) 반영한 컨텍스트별 가중치 분리 """ def __init__(self, history_file=None, max_history=50): self.max_history = max_history self.history_file = history_file or os.path.join( Config.DATA_DIR, "ensemble_history.json" ) # {ticker: [{"tech": f, "sentiment": f, "lstm": f, "decision": str, "outcome": float}, ...]} self._trade_history: Dict[str, list] = {} # {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger" self._context_weights: Dict[str, SignalWeights] = { "strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30), "sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30), "danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30), "default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35), } self._load() def _load(self): if os.path.exists(self.history_file): try: with open(self.history_file, "r", encoding="utf-8") as f: data = json.load(f) self._trade_history = data.get("history", {}) weights_raw = data.get("weights", {}) for ctx, w in weights_raw.items(): self._context_weights[ctx] = SignalWeights.from_dict(w) except Exception as e: print(f"[Ensemble] Load failed: {e}") def _save(self): try: data = { "history": {k: v[-self.max_history:] for k, v in self._trade_history.items()}, "weights": {ctx: w.to_dict() for ctx, w in self._context_weights.items()} } with open(self.history_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"[Ensemble] Save failed: {e}") def get_context(self, adx: float, macro_state: str) -> str: """현재 시장 컨텍스트 결정""" if macro_state == "DANGER": return "danger" if adx >= 25: return "strong_trend" if adx < 20: return "sideways" return "default" def get_weights(self, ticker: str, adx: float = 20.0, macro_state: str = "SAFE", ai_confidence: float = 0.5) -> SignalWeights: """ 종목 + 시장 컨텍스트에 맞는 가중치 반환 1. 기본: 컨텍스트별 기준 가중치 2. AI 신뢰도 높으면 lstm 가중치 보정 3. 종목별 학습 결과 반영 """ context = self.get_context(adx, macro_state) base = self._context_weights.get(context, self._context_weights["default"]) # 적응형 조정: 해당 종목의 과거 성과 반영 ticker_history = self._trade_history.get(ticker, []) adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm) if len(ticker_history) >= 5: # 최근 5회 신호별 정확도 평가 recent = ticker_history[-10:] tech_acc = self._accuracy([h["tech_score"] for h in recent], [h["outcome"] for h in recent]) news_acc = self._accuracy([h["sentiment_score"] for h in recent], [h["outcome"] for h in recent]) lstm_acc = self._accuracy([h["lstm_score"] for h in recent], [h["outcome"] for h in recent]) # 정확도 기반 가중치 미세 조정 (±0.1 범위) alpha = 0.05 adjusted.tech = max(0.1, min(0.6, base.tech + alpha * (tech_acc - 0.5))) adjusted.sentiment = max(0.1, min(0.6, base.sentiment + alpha * (news_acc - 0.5))) adjusted.lstm = max(0.1, min(0.6, base.lstm + alpha * (lstm_acc - 0.5))) # AI 신뢰도 보정 if ai_confidence >= 0.85: adjusted.lstm = min(0.70, adjusted.lstm * 1.3) elif ai_confidence < 0.5: adjusted.lstm = max(0.10, adjusted.lstm * 0.7) return adjusted.normalize() def record_trade(self, ticker: str, tech_score: float, sentiment_score: float, lstm_score: float, decision: str, outcome_pct: float): """ 매매 결과 기록 (가중치 학습 데이터) outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실 """ if ticker not in self._trade_history: self._trade_history[ticker] = [] record = { "tech_score": tech_score, "sentiment_score": sentiment_score, "lstm_score": lstm_score, "decision": decision, "outcome": outcome_pct } self._trade_history[ticker].append(record) # 히스토리 크기 제한 if len(self._trade_history[ticker]) > self.max_history: self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:] # 가중치 점진적 업데이트 self._update_weights(ticker) self._save() def _update_weights(self, ticker: str): """종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트""" history = self._trade_history.get(ticker, []) if len(history) < 5: return recent = history[-10:] outcomes = [h["outcome"] for h in recent] mean_outcome = np.mean(outcomes) if mean_outcome > 0: # 전략이 효과적 → 현재 가중치 유지 (강화) pass elif mean_outcome < -2.0: # 손실이 큰 경우 → 기본값으로 리셋 for ctx in self._context_weights: self._context_weights[ctx] = SignalWeights( tech=0.35, sentiment=0.30, lstm=0.35) def compute_ensemble_score(self, tech_score: float, sentiment_score: float, lstm_score: float, investor_score: float = 0.0, weights: Optional[SignalWeights] = None) -> float: """ 앙상블 통합 점수 계산 Args: weights: 가중치 (None이면 기본값 사용) """ if weights is None: weights = SignalWeights() total = (weights.tech * tech_score + weights.sentiment * sentiment_score + weights.lstm * lstm_score) # 수급 가산점 (최대 +0.15) total += min(investor_score, 0.15) return min(1.0, max(0.0, total)) @staticmethod def _accuracy(scores: list, outcomes: list) -> float: """신호와 결과의 상관도 계산 (0.5 = 무관, 1.0 = 완전 일치)""" if len(scores) < 3: return 0.5 # 신호가 높을 때 수익, 낮을 때 손실이면 정확 correct = sum( 1 for s, o in zip(scores, outcomes) if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0) ) return correct / len(scores) # 전역 싱글톤 _ensemble_instance: Optional[AdaptiveEnsemble] = None def get_ensemble() -> AdaptiveEnsemble: """워커 프로세스 내 싱글톤 앙상블 관리자""" global _ensemble_instance if _ensemble_instance is None: _ensemble_instance = AdaptiveEnsemble() return _ensemble_instance