import os import time import pickle import torch import torch.nn as nn import numpy as np import pandas as pd from collections import OrderedDict from sklearn.preprocessing import MinMaxScaler from modules.config import Config # cuDNN 벤치마크 활성화 (고정 입력 크기에 대해 최적 커널 자동 선택) torch.backends.cudnn.benchmark = True # 체크포인트 버전 (피처 수 변경 시 기존 모델 자동 재학습) CHECKPOINT_VERSION = "v3" INPUT_SIZE = 7 # close, open, high, low, volume_norm, rsi_14, macd_hist class Attention(nn.Module): def __init__(self, hidden_size): super(Attention, self).__init__() self.attn = nn.Linear(hidden_size, 1) def forward(self, lstm_output): attn_weights = torch.softmax(self.attn(lstm_output), dim=1) context = torch.sum(attn_weights * lstm_output, dim=1) return context, attn_weights class AdvancedLSTM(nn.Module): def __init__(self, input_size=INPUT_SIZE, hidden_size=512, num_layers=4, output_size=1, dropout=0.3): super(AdvancedLSTM, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout) self.attention = Attention(hidden_size) self.fc = nn.Sequential( nn.Linear(hidden_size, hidden_size // 2), nn.ReLU(), nn.Dropout(dropout), nn.Linear(hidden_size // 2, hidden_size // 4), nn.ReLU(), nn.Linear(hidden_size // 4, output_size) ) def forward(self, x): h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device) lstm_out, _ = self.lstm(x, (h0, c0)) context, _ = self.attention(lstm_out) out = self.fc(context) return out def _get_free_vram_gb(): """현재 GPU VRAM 여유량(GB) 반환""" try: if torch.cuda.is_available(): total = torch.cuda.get_device_properties(0).total_memory / 1024**3 reserved = torch.cuda.memory_reserved(0) / 1024**3 return total - reserved except Exception: pass return 99.0 # CUDA 없으면 언로드 불필요 def _unload_ollama(): """LSTM 학습 전 Ollama 모델 언로드 (VRAM < 2GB 여유일 때만)""" free_vram = _get_free_vram_gb() if free_vram >= 2.0: print(f"[AI] Ollama 언로드 생략 (VRAM 여유 {free_vram:.1f}GB >= 2GB)") return try: import requests url = f"{Config.OLLAMA_API_URL}/api/generate" requests.post(url, json={ "model": Config.OLLAMA_MODEL, "keep_alive": 0 }, timeout=5) print(f"[AI] Ollama 언로드 (VRAM 여유 {free_vram:.1f}GB)") time.sleep(1) except Exception: pass def _preload_ollama(): """LSTM 학습 후 Ollama 모델 리로드 (언로드했던 경우만)""" free_vram = _get_free_vram_gb() if free_vram >= 2.0: return # 언로드하지 않았으니 리로드도 불필요 try: import requests url = f"{Config.OLLAMA_API_URL}/api/generate" requests.post(url, json={ "model": Config.OLLAMA_MODEL, "prompt": "", "keep_alive": "10m" }, timeout=10) except Exception: pass def _log_gpu_memory(tag=""): """GPU 메모리 사용량 로깅""" if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated(0) / 1024**3 reserved = torch.cuda.memory_reserved(0) / 1024**3 print(f"[AI GPU {tag}] Allocated: {allocated:.2f}GB / Reserved: {reserved:.2f}GB") def _compute_rsi(close_arr, period=14): """RSI 계산 (numpy 기반)""" if len(close_arr) < period + 1: return np.full(len(close_arr), 50.0) delta = np.diff(close_arr, prepend=close_arr[0]) gain = np.where(delta > 0, delta, 0.0) loss = np.where(delta < 0, -delta, 0.0) alpha = 1.0 / period rsi_arr = np.zeros(len(close_arr)) avg_gain = gain[0] avg_loss = loss[0] for i in range(1, len(close_arr)): avg_gain = alpha * gain[i] + (1 - alpha) * avg_gain avg_loss = alpha * loss[i] + (1 - alpha) * avg_loss rs = avg_gain / (avg_loss + 1e-9) rsi_arr[i] = 100 - (100 / (1 + rs)) return rsi_arr def _compute_macd_hist(close_arr, fast=12, slow=26, signal=9): """MACD Histogram 계산 (numpy 기반)""" if len(close_arr) < slow + signal: return np.zeros(len(close_arr)) ema_fast = np.zeros(len(close_arr)) ema_slow = np.zeros(len(close_arr)) alpha_f = 2 / (fast + 1) alpha_s = 2 / (slow + 1) ema_fast[0] = close_arr[0] ema_slow[0] = close_arr[0] for i in range(1, len(close_arr)): ema_fast[i] = alpha_f * close_arr[i] + (1 - alpha_f) * ema_fast[i - 1] ema_slow[i] = alpha_s * close_arr[i] + (1 - alpha_s) * ema_slow[i - 1] macd = ema_fast - ema_slow sig = np.zeros(len(close_arr)) alpha_sig = 2 / (signal + 1) sig[0] = macd[0] for i in range(1, len(close_arr)): sig[i] = alpha_sig * macd[i] + (1 - alpha_sig) * sig[i - 1] return macd - sig def _build_feature_matrix(ohlcv_data): """ OHLCV 딕셔너리 → 7차원 numpy 피처 행렬 생성 피처: [close, open, high, low, volume_norm, rsi_14, macd_hist] """ close = np.array(ohlcv_data.get('close', []), dtype=np.float64) open_ = np.array(ohlcv_data.get('open', close), dtype=np.float64) high = np.array(ohlcv_data.get('high', close), dtype=np.float64) low = np.array(ohlcv_data.get('low', close), dtype=np.float64) volume = np.array(ohlcv_data.get('volume', []), dtype=np.float64) n = len(close) _degraded = [] if len(open_) != n: open_ = close.copy(); _degraded.append('open') if len(high) != n: high = close.copy(); _degraded.append('high') if len(low) != n: low = close.copy(); _degraded.append('low') if _degraded: print(f"[LSTM] ⚠️ OHLCV 피처 불완전 ({', '.join(_degraded)} → close 대체). 예측 신뢰도 저하 가능") # 거래량 정규화 (20일 이동평균 대비 비율, max 기준보다 정보량이 높음) if len(volume) == n and volume.max() > 0: vol_series = pd.Series(volume) vol_ma20 = vol_series.rolling(20, min_periods=1).mean().values volume_norm = volume / (vol_ma20 + 1e-9) volume_norm = np.clip(volume_norm, 0.0, 5.0) / 5.0 # 0~5배 → 0~1 정규화 else: volume_norm = np.full(n, 0.2) # 데이터 없으면 중립값 rsi = _compute_rsi(close, period=14) rsi_norm = rsi / 100.0 # 0~1 정규화 macd_hist = _compute_macd_hist(close) # 7차원 피처 스택 (n x 7) features = np.column_stack([close, open_, high, low, volume_norm, rsi_norm, macd_hist]) return features # shape: (n, 7) class PricePredictor: """ [v3.0] 주가 예측 Deep Learning 모델 (GPU 최적화) - 7차원 멀티피처 LSTM (close/open/high/low/vol_norm/rsi/macd_hist) - feature_scaler(6개) + target_scaler(1개) 분리 - 데이터 누수 수정: train 데이터로만 fit - 체크포인트에 scaler 상태 저장/로드 - VRAM 여유량 기반 Ollama 언로드 (충분하면 생략) """ def __init__(self): self.feature_scaler = MinMaxScaler(feature_range=(0, 1)) # 입력 6개 피처 self.target_scaler = MinMaxScaler(feature_range=(0, 1)) # 타겟: close 가격 self.hidden_size = 512 self.num_layers = 4 self.model = AdvancedLSTM(input_size=INPUT_SIZE, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=0.3) self.criterion = nn.MSELoss() self.device = torch.device('cpu') self.use_amp = False if torch.cuda.is_available(): try: gpu_name = torch.cuda.get_device_name(0) vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3 self.device = torch.device('cuda') self.model.to(self.device) if torch.cuda.get_device_capability(0)[0] >= 7: self.use_amp = True # Warm-up dummy = torch.zeros(1, 60, INPUT_SIZE, device=self.device) with torch.no_grad(): _ = self.model(dummy) torch.cuda.synchronize() print(f"[AI] GPU Mode: {gpu_name} ({vram_gb:.1f}GB)" f" | FP16={'ON' if self.use_amp else 'OFF'}" f" | Features={INPUT_SIZE} | cuDNN Benchmark=ON") _log_gpu_memory("init") except Exception as e: print(f"[AI] GPU Init Failed ({e}), falling back to CPU") self.device = torch.device('cpu') self.model.to(self.device) else: print("[AI] No CUDA GPU detected. Running on CPU.") self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4) self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6 ) self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None self.batch_size = 64 self.max_epochs = 200 self.seq_length = 60 self.patience = 15 self.max_grad_norm = 1.0 self.training_status = { "is_training": False, "loss": 0.0, "current_ticker": None } @staticmethod def verify_hardware(): if torch.cuda.is_available(): try: gpu_name = torch.cuda.get_device_name(0) vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3 print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}" f" | Features={INPUT_SIZE}") return True except Exception as e: print(f"[AI Check] GPU Error: {e}") return False print("[AI Check] No GPU. CPU Mode.") return False def _get_checkpoint_path(self, ticker): return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm_{CHECKPOINT_VERSION}.pt") def _load_checkpoint(self, ticker): path = self._get_checkpoint_path(ticker) if os.path.exists(path): try: checkpoint = torch.load(path, map_location=self.device, weights_only=False) # 버전 체크 (v3 이전 체크포인트는 재학습) if checkpoint.get('version', '') != CHECKPOINT_VERSION: print(f"[AI] Checkpoint version mismatch ({ticker}): 재학습 필요") return False self.model.load_state_dict(checkpoint['model_state_dict']) self.optimizer.load_state_dict(checkpoint['optimizer_state_dict']) # scaler 복원 if 'feature_scaler' in checkpoint: self.feature_scaler = pickle.loads(checkpoint['feature_scaler']) if 'target_scaler' in checkpoint: self.target_scaler = pickle.loads(checkpoint['target_scaler']) print(f"[AI] Checkpoint loaded: {ticker} (v3, 7-features)") return True except Exception as e: print(f"[AI] Checkpoint load failed ({ticker}): {e}") return False def _save_checkpoint(self, ticker, epoch, loss): path = self._get_checkpoint_path(ticker) try: torch.save({ 'version': CHECKPOINT_VERSION, 'model_state_dict': self.model.state_dict(), 'optimizer_state_dict': self.optimizer.state_dict(), 'epoch': epoch, 'loss': loss, 'feature_scaler': pickle.dumps(self.feature_scaler), 'target_scaler': pickle.dumps(self.target_scaler) }, path) except Exception as e: print(f"[AI] Checkpoint save failed ({ticker}): {e}") def _is_checkpoint_fresh(self, ticker, max_age=None): """체크포인트가 최근에 학습된 것인지 확인 (쿨다운 판단)""" if not ticker: return False path = self._get_checkpoint_path(ticker) if not os.path.exists(path): return False age = time.time() - os.path.getmtime(path) threshold = max_age if max_age is not None else Config.LSTM_COOLDOWN return age < threshold def _prepare_scaled_features(self, features, split_point): """ 피처 스케일링 (누수 방지: train split으로만 fit) features: (n, 7) numpy array split_point: train/val 분리 인덱스 Returns: scaled_features: (n, 7) 스케일된 전체 피처 scaled_close: (n, 1) 스케일된 close (타겟용) """ # 6개 입력 피처 (close 포함 open/high/low/vol_norm/rsi/macd_hist) # + 타겟은 close만 별도 scaler input_features = features[:, :] # (n, 7) 전체 7개 피처 입력용 target_close = features[:, 0:1] # (n, 1) close만 타겟용 # train 데이터로만 fit (데이터 누수 방지) self.feature_scaler.fit(input_features[:split_point]) self.target_scaler.fit(target_close[:split_point]) scaled_features = self.feature_scaler.transform(input_features) scaled_close = self.target_scaler.transform(target_close) return scaled_features, scaled_close def _predict_only(self, ohlcv_data, ticker=None): """학습 없이 현재 체크포인트로만 빠른 예측 (쿨다운 중 사용)""" prices = ohlcv_data.get('close', []) if isinstance(ohlcv_data, dict) else ohlcv_data if len(prices) < self.seq_length: return None try: features = _build_feature_matrix( ohlcv_data if isinstance(ohlcv_data, dict) else {'close': prices} ) if len(features) < self.seq_length: return None scaled = self.feature_scaler.transform(features) last_seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0).to(self.device) self.model.eval() with torch.no_grad(): if self.use_amp: with torch.amp.autocast('cuda'): pred_scaled = self.model(last_seq) else: pred_scaled = self.model(last_seq) predicted_price = self.target_scaler.inverse_transform( pred_scaled.cpu().float().numpy())[0][0] current_price = prices[-1] trend = "UP" if predicted_price > current_price else "DOWN" change_rate = ((predicted_price - current_price) / current_price) * 100 cached_loss = self.training_status.get("loss", 0.5) # 캐시 신뢰도: 마지막 학습 loss 기반 동적 계산 (고정값 제거) cached_conf = min(0.70, 1.0 / (1.0 + (cached_loss * 200))) print(f"[AI] {ticker or '?'}: 쿨다운 중 → 캐시 예측 사용 " f"({predicted_price:.0f} / {change_rate:+.2f}% / conf={cached_conf:.2f})") return { "current": current_price, "predicted": float(predicted_price), "change_rate": round(change_rate, 2), "trend": trend, "loss": cached_loss, "val_loss": cached_loss, "confidence": round(cached_conf, 2), "epochs": 0, "device": str(self.device), "lr": self.optimizer.param_groups[0]['lr'], "cached": True } except Exception as e: print(f"[AI] _predict_only 실패 ({ticker}): {e}") return None def train_and_predict(self, ohlcv_data, forecast_days=1, ticker=None): """ [v3.0] 7차원 멀티피처 LSTM 학습 + 예측 ohlcv_data: dict {'close':[], 'open':[], 'high':[], 'low':[], 'volume':[]} 또는 list (하위 호환: close 리스트) """ # 하위 호환: list 형태 if isinstance(ohlcv_data, list): ohlcv_data = {'close': ohlcv_data} prices = ohlcv_data.get('close', []) if len(prices) < (self.seq_length + 10): return None # ===== 쿨다운 체크 ===== if self._is_checkpoint_fresh(ticker): has_ckpt = self._load_checkpoint(ticker) if has_ckpt: result = self._predict_only(ohlcv_data, ticker) if result: return result is_gpu = self.device.type == 'cuda' # VRAM 여유량 기반 Ollama 언로드 if is_gpu: _unload_ollama() torch.cuda.empty_cache() _log_gpu_memory("pre-train") t_start = time.time() # 1. 피처 행렬 구성 (n, 7) features = _build_feature_matrix(ohlcv_data) if len(features) < (self.seq_length + 10): return None n = len(features) split_point = int(n * 0.8) # 2. 스케일링 (train 데이터로만 fit → 누수 방지) scaled_features, scaled_close = self._prepare_scaled_features(features, split_point) # 3. 시퀀스 생성 x_seqs, y_seqs = [], [] for i in range(n - self.seq_length): x_seqs.append(scaled_features[i:i + self.seq_length]) # (seq, 7) y_seqs.append(scaled_close[i + self.seq_length]) # (1,) x_all = torch.FloatTensor(np.array(x_seqs)).to(self.device) y_all = torch.FloatTensor(np.array(y_seqs)).to(self.device) # validation split (80/20) seq_split = int(len(x_all) * 0.8) x_train, y_train = x_all[:seq_split], y_all[:seq_split] x_val, y_val = x_all[seq_split:], y_all[seq_split:] dataset_size = len(x_train) # 4. 체크포인트 로드 has_checkpoint = self._load_checkpoint(ticker) if ticker else False max_epochs = Config.LSTM_FAST_EPOCHS if has_checkpoint else self.max_epochs self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005 self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6 ) # 5. 학습 self.model.train() self.training_status["is_training"] = True if ticker: self.training_status["current_ticker"] = ticker best_val_loss = float('inf') best_model_state = None patience_counter = 0 final_loss = 0.0 actual_epochs = 0 for epoch in range(max_epochs): perm = torch.randperm(dataset_size, device=self.device) x_shuffled = x_train[perm] y_shuffled = y_train[perm] epoch_loss = 0.0 steps = 0 for i in range(0, dataset_size, self.batch_size): end = min(i + self.batch_size, dataset_size) batch_x = x_shuffled[i:end] batch_y = y_shuffled[i:end] self.optimizer.zero_grad(set_to_none=True) if self.use_amp: with torch.amp.autocast('cuda'): outputs = self.model(batch_x) loss = self.criterion(outputs, batch_y) self.scaler_amp.scale(loss).backward() self.scaler_amp.unscale_(self.optimizer) torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm) self.scaler_amp.step(self.optimizer) self.scaler_amp.update() else: outputs = self.model(batch_x) loss = self.criterion(outputs, batch_y) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm) self.optimizer.step() epoch_loss += loss.item() steps += 1 train_loss = epoch_loss / max(1, steps) self.model.eval() with torch.no_grad(): if self.use_amp: with torch.amp.autocast('cuda'): val_out = self.model(x_val) val_loss = self.criterion(val_out, y_val).item() else: val_out = self.model(x_val) val_loss = self.criterion(val_out, y_val).item() self.model.train() self.lr_scheduler.step(val_loss) final_loss = train_loss actual_epochs = epoch + 1 if val_loss < best_val_loss: best_val_loss = val_loss patience_counter = 0 best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()} else: patience_counter += 1 if patience_counter >= self.patience: break if best_model_state: self.model.load_state_dict(best_model_state) self.training_status["is_training"] = False self.training_status["loss"] = final_loss if is_gpu: torch.cuda.synchronize() elapsed = time.time() - t_start print(f"[AI] {ticker or '?'}: {actual_epochs} epochs in {elapsed:.1f}s" f" | loss={final_loss:.6f} val={best_val_loss:.6f}" f" | device={self.device} | features={INPUT_SIZE}") # 6. 체크포인트 저장 (scaler 포함) if ticker: self._save_checkpoint(ticker, actual_epochs, final_loss) # 7. 예측 self.model.eval() with torch.no_grad(): last_seq = torch.FloatTensor( scaled_features[-self.seq_length:] ).unsqueeze(0).to(self.device) if self.use_amp: with torch.amp.autocast('cuda'): predicted_scaled = self.model(last_seq) else: predicted_scaled = self.model(last_seq) predicted_price = self.target_scaler.inverse_transform( predicted_scaled.cpu().float().numpy())[0][0] # 8. GPU 정리 + Ollama 리로드 if is_gpu: del x_all, y_all, x_train, y_train, x_val, y_val torch.cuda.empty_cache() _log_gpu_memory("post-train") _preload_ollama() current_price = prices[-1] trend = "UP" if predicted_price > current_price else "DOWN" change_rate = ((predicted_price - current_price) / current_price) * 100 # ── 신뢰도 계산 (보수적 버전) ────────────────────────────── # val_loss 기반: 0.001→0.74, 0.003→0.62, 0.01→0.50 (이전보다 보수적) loss_confidence = 1.0 / (1.0 + (best_val_loss * 200)) # 오버피팅 페널티 overfit_ratio = final_loss / (best_val_loss + 1e-9) if overfit_ratio < 0.5: overfit_penalty = 0.65 # 심각한 언더피팅 elif overfit_ratio > 2.5: overfit_penalty = 0.75 # 오버피팅 else: overfit_penalty = 1.0 # 에포크 수 기반 수렴 판단 epoch_factor = 1.0 if actual_epochs < 10: epoch_factor = 0.55 # 너무 이른 수렴 → 불신뢰 elif actual_epochs >= max_epochs: epoch_factor = 0.80 # 미수렴 → 부분 신뢰 # 최종 상한: 0.80 (이전 0.95보다 보수적 — LSTM 70% 가중치 남발 방지) confidence = min(0.80, loss_confidence * overfit_penalty * epoch_factor) return { "current": current_price, "predicted": float(predicted_price), "change_rate": round(change_rate, 2), "trend": trend, "loss": final_loss, "val_loss": best_val_loss, "confidence": round(confidence, 2), "epochs": actual_epochs, "device": str(self.device), "lr": self.optimizer.param_groups[0]['lr'] } def batch_predict(self, ohlcv_dict): """여러 종목을 배치로 예측 (체크포인트 있는 종목만)""" results = {} seqs = [] metas = [] for ticker, ohlcv_data in ohlcv_dict.items(): if isinstance(ohlcv_data, list): ohlcv_data = {'close': ohlcv_data} prices = ohlcv_data.get('close', []) if len(prices) < (self.seq_length + 10): results[ticker] = None continue try: features = _build_feature_matrix(ohlcv_data) scaled = self.feature_scaler.transform(features) seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0) seqs.append(seq) metas.append((ticker, prices[-1])) except Exception: results[ticker] = None if not seqs: return results batch = torch.cat(seqs, dim=0).to(self.device) self.model.eval() with torch.no_grad(): if self.use_amp: with torch.amp.autocast('cuda'): preds = self.model(batch) else: preds = self.model(batch) preds_cpu = preds.cpu().float().numpy() for i, (ticker, current_price) in enumerate(metas): predicted_price = self.target_scaler.inverse_transform(preds_cpu[i:i+1])[0][0] trend = "UP" if predicted_price > current_price else "DOWN" change_rate = ((predicted_price - current_price) / current_price) * 100 results[ticker] = { "current": current_price, "predicted": float(predicted_price), "change_rate": round(change_rate, 2), "trend": trend } if self.device.type == 'cuda': torch.cuda.empty_cache() return results class ModelRegistry: """ [v3.0] 종목별 LSTM 모델 격리 (LRU 퇴출, max_models=5) - 싱글톤 패턴: 워커 프로세스마다 하나의 Registry 유지 - 16GB VRAM에서 LSTM 5개(~250MB) + Ollama 7B(~4GB) 동시 적재 가능 """ _instance = None @classmethod def get_instance(cls): if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self, max_models=5): self.max_models = max_models self._predictors = OrderedDict() # ticker -> PricePredictor (LRU 순서) print(f"[ModelRegistry] Initialized (max_models={max_models})") def get_predictor(self, ticker): """종목별 PricePredictor 반환 (없으면 생성, LRU 관리)""" if ticker in self._predictors: # LRU: 접근 시 맨 뒤로 이동 self._predictors.move_to_end(ticker) return self._predictors[ticker] # 용량 초과 시 가장 오래된 것 퇴출 if len(self._predictors) >= self.max_models: oldest_ticker, oldest_pred = self._predictors.popitem(last=False) print(f"[ModelRegistry] Evicted {oldest_ticker} (LRU, {len(self._predictors)}/{self.max_models})") del oldest_pred if torch.cuda.is_available(): torch.cuda.empty_cache() predictor = PricePredictor() self._predictors[ticker] = predictor print(f"[ModelRegistry] Created predictor for {ticker} ({len(self._predictors)}/{self.max_models})") return predictor def has_predictor(self, ticker): return ticker in self._predictors def clear(self): """모든 모델 해제""" self._predictors.clear() if torch.cuda.is_available(): torch.cuda.empty_cache()