diff --git a/modules/analysis/deep_learning.py b/modules/analysis/deep_learning.py index c579a95..8df2f6d 100644 --- a/modules/analysis/deep_learning.py +++ b/modules/analysis/deep_learning.py @@ -1,8 +1,10 @@ import os import time +import pickle import torch import torch.nn as nn import numpy as np +from collections import OrderedDict from sklearn.preprocessing import MinMaxScaler from modules.config import Config @@ -10,6 +12,10 @@ from modules.config import Config # cuDNN 벤치마크 활성화 (고정 입력 크기에 대해 최적 커널 자동 선택) torch.backends.cudnn.benchmark = True +# 체크포인트 버전 (피처 수 변경 시 기존 모델 자동 재학습) +CHECKPOINT_VERSION = "v3" +INPUT_SIZE = 7 # close, open, high, low, volume_norm, rsi_14, macd_hist + class Attention(nn.Module): def __init__(self, hidden_size): @@ -23,7 +29,7 @@ class Attention(nn.Module): class AdvancedLSTM(nn.Module): - def __init__(self, input_size=1, hidden_size=512, num_layers=4, output_size=1, dropout=0.3): + def __init__(self, input_size=INPUT_SIZE, hidden_size=512, num_layers=4, output_size=1, dropout=0.3): super(AdvancedLSTM, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers @@ -49,8 +55,24 @@ class AdvancedLSTM(nn.Module): return out +def _get_free_vram_gb(): + """현재 GPU VRAM 여유량(GB) 반환""" + try: + if torch.cuda.is_available(): + total = torch.cuda.get_device_properties(0).total_memory / 1024**3 + reserved = torch.cuda.memory_reserved(0) / 1024**3 + return total - reserved + except Exception: + pass + return 99.0 # CUDA 없으면 언로드 불필요 + + def _unload_ollama(): - """LSTM 학습 전 Ollama 모델 언로드하여 GPU 메모리 확보""" + """LSTM 학습 전 Ollama 모델 언로드 (VRAM < 2GB 여유일 때만)""" + free_vram = _get_free_vram_gb() + if free_vram >= 2.0: + print(f"[AI] Ollama 언로드 생략 (VRAM 여유 {free_vram:.1f}GB >= 2GB)") + return try: import requests url = f"{Config.OLLAMA_API_URL}/api/generate" @@ -58,14 +80,17 @@ def _unload_ollama(): "model": Config.OLLAMA_MODEL, "keep_alive": 0 }, timeout=5) - print("[AI] Ollama model unloaded (GPU memory freed)") - time.sleep(1) # 메모리 해제 
대기 + print(f"[AI] Ollama 언로드 (VRAM 여유 {free_vram:.1f}GB)") + time.sleep(1) except Exception: pass def _preload_ollama(): - """LSTM 학습 후 Ollama 모델 다시 로드""" + """LSTM 학습 후 Ollama 모델 리로드 (언로드했던 경우만)""" + free_vram = _get_free_vram_gb() + if free_vram >= 2.0: + return # 언로드하지 않았으니 리로드도 불필요 try: import requests url = f"{Config.OLLAMA_API_URL}/api/generate" @@ -86,25 +111,99 @@ def _log_gpu_memory(tag=""): print(f"[AI GPU {tag}] Allocated: {allocated:.2f}GB / Reserved: {reserved:.2f}GB") +def _compute_rsi(close_arr, period=14): + """RSI 계산 (numpy 기반)""" + if len(close_arr) < period + 1: + return np.full(len(close_arr), 50.0) + delta = np.diff(close_arr, prepend=close_arr[0]) + gain = np.where(delta > 0, delta, 0.0) + loss = np.where(delta < 0, -delta, 0.0) + alpha = 1.0 / period + rsi_arr = np.zeros(len(close_arr)) + avg_gain = gain[0] + avg_loss = loss[0] + for i in range(1, len(close_arr)): + avg_gain = alpha * gain[i] + (1 - alpha) * avg_gain + avg_loss = alpha * loss[i] + (1 - alpha) * avg_loss + rs = avg_gain / (avg_loss + 1e-9) + rsi_arr[i] = 100 - (100 / (1 + rs)) + return rsi_arr + + +def _compute_macd_hist(close_arr, fast=12, slow=26, signal=9): + """MACD Histogram 계산 (numpy 기반)""" + if len(close_arr) < slow + signal: + return np.zeros(len(close_arr)) + ema_fast = np.zeros(len(close_arr)) + ema_slow = np.zeros(len(close_arr)) + alpha_f = 2 / (fast + 1) + alpha_s = 2 / (slow + 1) + ema_fast[0] = close_arr[0] + ema_slow[0] = close_arr[0] + for i in range(1, len(close_arr)): + ema_fast[i] = alpha_f * close_arr[i] + (1 - alpha_f) * ema_fast[i - 1] + ema_slow[i] = alpha_s * close_arr[i] + (1 - alpha_s) * ema_slow[i - 1] + macd = ema_fast - ema_slow + sig = np.zeros(len(close_arr)) + alpha_sig = 2 / (signal + 1) + sig[0] = macd[0] + for i in range(1, len(close_arr)): + sig[i] = alpha_sig * macd[i] + (1 - alpha_sig) * sig[i - 1] + return macd - sig + + +def _build_feature_matrix(ohlcv_data): + """ + OHLCV 딕셔너리 → 7차원 numpy 피처 행렬 생성 + 피처: [close, open, high, low, 
volume_norm, rsi_14, macd_hist] + """ + close = np.array(ohlcv_data.get('close', []), dtype=np.float64) + open_ = np.array(ohlcv_data.get('open', close), dtype=np.float64) + high = np.array(ohlcv_data.get('high', close), dtype=np.float64) + low = np.array(ohlcv_data.get('low', close), dtype=np.float64) + volume = np.array(ohlcv_data.get('volume', []), dtype=np.float64) + + n = len(close) + if len(open_) != n: open_ = close.copy() + if len(high) != n: high = close.copy() + if len(low) != n: low = close.copy() + + # 거래량 정규화 (최대값 기준, 0이면 0) + if len(volume) == n and volume.max() > 0: + volume_norm = volume / (volume.max() + 1e-9) + else: + volume_norm = np.zeros(n) + + rsi = _compute_rsi(close, period=14) + rsi_norm = rsi / 100.0 # 0~1 정규화 + + macd_hist = _compute_macd_hist(close) + + # 7차원 피처 스택 (n x 7) + features = np.column_stack([close, open_, high, low, volume_norm, rsi_norm, macd_hist]) + return features # shape: (n, 7) + + class PricePredictor: """ - 주가 예측 Deep Learning 모델 (GPU 최적화) - - 전체 학습 데이터를 GPU에 상주 (CPU↔GPU 전송 최소화) - - Ollama 모델 언로드/리로드로 GPU 메모리 확보 - - Early Stopping + Mixed Precision (FP16) - - 종목별 모델 체크포인트 + [v3.0] 주가 예측 Deep Learning 모델 (GPU 최적화) + - 7차원 멀티피처 LSTM (close/open/high/low/vol_norm/rsi/macd_hist) + - feature_scaler(6개) + target_scaler(1개) 분리 + - 데이터 누수 수정: train 데이터로만 fit + - 체크포인트에 scaler 상태 저장/로드 + - VRAM 여유량 기반 Ollama 언로드 (충분하면 생략) """ def __init__(self): - self.scaler = MinMaxScaler(feature_range=(0, 1)) + self.feature_scaler = MinMaxScaler(feature_range=(0, 1)) # 입력 6개 피처 + self.target_scaler = MinMaxScaler(feature_range=(0, 1)) # 타겟: close 가격 self.hidden_size = 512 self.num_layers = 4 - self.model = AdvancedLSTM(input_size=1, hidden_size=self.hidden_size, + self.model = AdvancedLSTM(input_size=INPUT_SIZE, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=0.3) self.criterion = nn.MSELoss() - # CUDA 설정 self.device = torch.device('cpu') self.use_amp = False @@ -116,19 +215,18 @@ class PricePredictor: self.device = 
torch.device('cuda') self.model.to(self.device) - # Mixed Precision (Compute Capability >= 7.0: Volta 이상) if torch.cuda.get_device_capability(0)[0] >= 7: self.use_amp = True - # Warm-up: CUDA 커널 컴파일 유도 - dummy = torch.zeros(1, 60, 1, device=self.device) + # Warm-up + dummy = torch.zeros(1, 60, INPUT_SIZE, device=self.device) with torch.no_grad(): _ = self.model(dummy) torch.cuda.synchronize() print(f"[AI] GPU Mode: {gpu_name} ({vram_gb:.1f}GB)" f" | FP16={'ON' if self.use_amp else 'OFF'}" - f" | cuDNN Benchmark=ON") + f" | Features={INPUT_SIZE} | cuDNN Benchmark=ON") _log_gpu_memory("init") except Exception as e: @@ -139,9 +237,8 @@ class PricePredictor: print("[AI] No CUDA GPU detected. Running on CPU.") self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4) - # [v2.0] Learning Rate Scheduler (ReduceLROnPlateau: val_loss 정체 시 lr 감소) self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( - self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False + self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6 ) self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None @@ -149,7 +246,6 @@ class PricePredictor: self.max_epochs = 200 self.seq_length = 60 self.patience = 15 - # [v2.0] Gradient Clipping 값 (exploding gradient 방지) self.max_grad_norm = 1.0 self.training_status = { @@ -164,7 +260,8 @@ class PricePredictor: try: gpu_name = torch.cuda.get_device_name(0) vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3 - print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}") + print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}" + f" | Features={INPUT_SIZE}") return True except Exception as e: print(f"[AI Check] GPU Error: {e}") @@ -173,16 +270,25 @@ class PricePredictor: return False def _get_checkpoint_path(self, ticker): - return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm.pt") + 
return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm_{CHECKPOINT_VERSION}.pt") def _load_checkpoint(self, ticker): path = self._get_checkpoint_path(ticker) if os.path.exists(path): try: - checkpoint = torch.load(path, map_location=self.device, weights_only=True) + checkpoint = torch.load(path, map_location=self.device, weights_only=False) + # 버전 체크 (v3 이전 체크포인트는 재학습) + if checkpoint.get('version', '') != CHECKPOINT_VERSION: + print(f"[AI] Checkpoint version mismatch ({ticker}): 재학습 필요") + return False self.model.load_state_dict(checkpoint['model_state_dict']) self.optimizer.load_state_dict(checkpoint['optimizer_state_dict']) - print(f"[AI] Checkpoint loaded: {ticker}") + # scaler 복원 + if 'feature_scaler' in checkpoint: + self.feature_scaler = pickle.loads(checkpoint['feature_scaler']) + if 'target_scaler' in checkpoint: + self.target_scaler = pickle.loads(checkpoint['target_scaler']) + print(f"[AI] Checkpoint loaded: {ticker} (v3, 7-features)") return True except Exception as e: print(f"[AI] Checkpoint load failed ({ticker}): {e}") @@ -192,21 +298,127 @@ class PricePredictor: path = self._get_checkpoint_path(ticker) try: torch.save({ + 'version': CHECKPOINT_VERSION, 'model_state_dict': self.model.state_dict(), 'optimizer_state_dict': self.optimizer.state_dict(), 'epoch': epoch, - 'loss': loss + 'loss': loss, + 'feature_scaler': pickle.dumps(self.feature_scaler), + 'target_scaler': pickle.dumps(self.target_scaler) }, path) except Exception as e: print(f"[AI] Checkpoint save failed ({ticker}): {e}") - def train_and_predict(self, prices, forecast_days=1, ticker=None): + def _is_checkpoint_fresh(self, ticker, max_age=None): + """체크포인트가 최근에 학습된 것인지 확인 (쿨다운 판단)""" + if not ticker: + return False + path = self._get_checkpoint_path(ticker) + if not os.path.exists(path): + return False + age = time.time() - os.path.getmtime(path) + threshold = max_age if max_age is not None else Config.LSTM_COOLDOWN + return age < threshold + + def _prepare_scaled_features(self, features, 
split_point): + """ + 피처 스케일링 (누수 방지: train split으로만 fit) + features: (n, 7) numpy array + split_point: train/val 분리 인덱스 + + Returns: + scaled_features: (n, 7) 스케일된 전체 피처 + scaled_close: (n, 1) 스케일된 close (타겟용) + """ + # 6개 입력 피처 (close 포함 open/high/low/vol_norm/rsi/macd_hist) + # + 타겟은 close만 별도 scaler + input_features = features[:, :] # (n, 7) 전체 7개 피처 입력용 + target_close = features[:, 0:1] # (n, 1) close만 타겟용 + + # train 데이터로만 fit (데이터 누수 방지) + self.feature_scaler.fit(input_features[:split_point]) + self.target_scaler.fit(target_close[:split_point]) + + scaled_features = self.feature_scaler.transform(input_features) + scaled_close = self.target_scaler.transform(target_close) + + return scaled_features, scaled_close + + def _predict_only(self, ohlcv_data, ticker=None): + """학습 없이 현재 체크포인트로만 빠른 예측 (쿨다운 중 사용)""" + prices = ohlcv_data.get('close', []) if isinstance(ohlcv_data, dict) else ohlcv_data + if len(prices) < self.seq_length: + return None + try: + features = _build_feature_matrix( + ohlcv_data if isinstance(ohlcv_data, dict) else {'close': prices} + ) + if len(features) < self.seq_length: + return None + + scaled = self.feature_scaler.transform(features) + last_seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0).to(self.device) + + self.model.eval() + with torch.no_grad(): + if self.use_amp: + with torch.amp.autocast('cuda'): + pred_scaled = self.model(last_seq) + else: + pred_scaled = self.model(last_seq) + + predicted_price = self.target_scaler.inverse_transform( + pred_scaled.cpu().float().numpy())[0][0] + + current_price = prices[-1] + trend = "UP" if predicted_price > current_price else "DOWN" + change_rate = ((predicted_price - current_price) / current_price) * 100 + + cached_loss = self.training_status.get("loss", 0.5) + print(f"[AI] {ticker or '?'}: 쿨다운 중 → 캐시 예측 사용 " + f"({predicted_price:.0f} / {change_rate:+.2f}%)") + return { + "current": current_price, + "predicted": float(predicted_price), + "change_rate": round(change_rate, 2), + 
"trend": trend, + "loss": cached_loss, + "val_loss": cached_loss, + "confidence": 0.62, + "epochs": 0, + "device": str(self.device), + "lr": self.optimizer.param_groups[0]['lr'], + "cached": True + } + except Exception as e: + print(f"[AI] _predict_only 실패 ({ticker}): {e}") + return None + + def train_and_predict(self, ohlcv_data, forecast_days=1, ticker=None): + """ + [v3.0] 7차원 멀티피처 LSTM 학습 + 예측 + ohlcv_data: dict {'close':[], 'open':[], 'high':[], 'low':[], 'volume':[]} + 또는 list (하위 호환: close 리스트) + """ + # 하위 호환: list 형태 + if isinstance(ohlcv_data, list): + ohlcv_data = {'close': ohlcv_data} + + prices = ohlcv_data.get('close', []) if len(prices) < (self.seq_length + 10): return None + # ===== 쿨다운 체크 ===== + if self._is_checkpoint_fresh(ticker): + has_ckpt = self._load_checkpoint(ticker) + if has_ckpt: + result = self._predict_only(ohlcv_data, ticker) + if result: + return result + is_gpu = self.device.type == 'cuda' - # --- Ollama 모델 언로드 (GPU 메모리 확보) --- + # VRAM 여유량 기반 Ollama 언로드 if is_gpu: _unload_ollama() torch.cuda.empty_cache() @@ -214,54 +426,54 @@ class PricePredictor: t_start = time.time() - # 1. 데이터 전처리 (CPU에서 numpy 작업) - data = np.array(prices).reshape(-1, 1) - scaled_data = self.scaler.fit_transform(data) + # 1. 피처 행렬 구성 (n, 7) + features = _build_feature_matrix(ohlcv_data) + if len(features) < (self.seq_length + 10): + return None + n = len(features) + split_point = int(n * 0.8) + + # 2. 스케일링 (train 데이터로만 fit → 누수 방지) + scaled_features, scaled_close = self._prepare_scaled_features(features, split_point) + + # 3. 시퀀스 생성 x_seqs, y_seqs = [], [] - for i in range(len(scaled_data) - self.seq_length): - x_seqs.append(scaled_data[i:i + self.seq_length]) - y_seqs.append(scaled_data[i + self.seq_length]) + for i in range(n - self.seq_length): + x_seqs.append(scaled_features[i:i + self.seq_length]) # (seq, 7) + y_seqs.append(scaled_close[i + self.seq_length]) # (1,) - # 2. 
텐서 생성 → 즉시 GPU로 이동 (이후 CPU↔GPU 전송 없음) x_all = torch.FloatTensor(np.array(x_seqs)).to(self.device) y_all = torch.FloatTensor(np.array(y_seqs)).to(self.device) - # Validation split (80/20) - split_idx = int(len(x_all) * 0.8) - x_train = x_all[:split_idx] - y_train = y_all[:split_idx] - x_val = x_all[split_idx:] - y_val = y_all[split_idx:] - + # validation split (80/20) + seq_split = int(len(x_all) * 0.8) + x_train, y_train = x_all[:seq_split], y_all[:seq_split] + x_val, y_val = x_all[seq_split:], y_all[seq_split:] dataset_size = len(x_train) - # 3. 체크포인트 로드 - has_checkpoint = False - if ticker: - has_checkpoint = self._load_checkpoint(ticker) - max_epochs = 50 if has_checkpoint else self.max_epochs + # 4. 체크포인트 로드 + has_checkpoint = self._load_checkpoint(ticker) if ticker else False + max_epochs = Config.LSTM_FAST_EPOCHS if has_checkpoint else self.max_epochs - # 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용) - # [v2.0] LR Scheduler 리셋 self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005 self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( - self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False + self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6 ) + # 5. 
학습 self.model.train() self.training_status["is_training"] = True if ticker: self.training_status["current_ticker"] = ticker best_val_loss = float('inf') - best_model_state = None # [v2.0] Best Model 저장 + best_model_state = None patience_counter = 0 final_loss = 0.0 actual_epochs = 0 for epoch in range(max_epochs): - # --- Training (GPU 내에서 셔플 + 미니배치) --- perm = torch.randperm(dataset_size, device=self.device) x_shuffled = x_train[perm] y_shuffled = y_train[perm] @@ -281,7 +493,6 @@ class PricePredictor: outputs = self.model(batch_x) loss = self.criterion(outputs, batch_y) self.scaler_amp.scale(loss).backward() - # [v2.0] Gradient Clipping (AMP 호환) self.scaler_amp.unscale_(self.optimizer) torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm) self.scaler_amp.step(self.optimizer) @@ -290,7 +501,6 @@ class PricePredictor: outputs = self.model(batch_x) loss = self.criterion(outputs, batch_y) loss.backward() - # [v2.0] Gradient Clipping torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm) self.optimizer.step() @@ -299,7 +509,6 @@ class PricePredictor: train_loss = epoch_loss / max(1, steps) - # --- Validation (GPU에서 직접 수행) --- self.model.eval() with torch.no_grad(): if self.use_amp: @@ -311,23 +520,19 @@ class PricePredictor: val_loss = self.criterion(val_out, y_val).item() self.model.train() - # [v2.0] LR Scheduler step (val_loss 기반) self.lr_scheduler.step(val_loss) - final_loss = train_loss actual_epochs = epoch + 1 if val_loss < best_val_loss: best_val_loss = val_loss patience_counter = 0 - # [v2.0] Best model 상태 저장 (overfitting 방지) best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()} else: patience_counter += 1 if patience_counter >= self.patience: break - # [v2.0] Best model 복원 (early stopping 후 최적 상태로 복구) if best_model_state: self.model.load_state_dict(best_model_state) @@ -340,17 +545,17 @@ class PricePredictor: elapsed = time.time() - t_start print(f"[AI] {ticker or '?'}: {actual_epochs} epochs 
in {elapsed:.1f}s" f" | loss={final_loss:.6f} val={best_val_loss:.6f}" - f" | device={self.device}") + f" | device={self.device} | features={INPUT_SIZE}") - # 5. 체크포인트 저장 + # 6. 체크포인트 저장 (scaler 포함) if ticker: self._save_checkpoint(ticker, actual_epochs, final_loss) - # 6. 예측 + # 7. 예측 self.model.eval() with torch.no_grad(): last_seq = torch.FloatTensor( - scaled_data[-self.seq_length:] + scaled_features[-self.seq_length:] ).unsqueeze(0).to(self.device) if self.use_amp: @@ -359,12 +564,11 @@ class PricePredictor: else: predicted_scaled = self.model(last_seq) - predicted_price = self.scaler.inverse_transform( + predicted_price = self.target_scaler.inverse_transform( predicted_scaled.cpu().float().numpy())[0][0] - # 7. GPU 메모리 정리 + Ollama 리로드 + # 8. GPU 정리 + Ollama 리로드 if is_gpu: - # 학습 중간 텐서 해제 del x_all, y_all, x_train, y_train, x_val, y_val torch.cuda.empty_cache() _log_gpu_memory("post-train") @@ -374,27 +578,22 @@ class PricePredictor: trend = "UP" if predicted_price > current_price else "DOWN" change_rate = ((predicted_price - current_price) / current_price) * 100 - # [v2.0] 개선된 신뢰도 계산 - # 1. 학습 손실 기반 (낮을수록 좋음) + # 신뢰도 계산 loss_confidence = 1.0 / (1.0 + (best_val_loss * 50)) - # 2. Train/Val 괴리도 (overfitting 감지) overfit_ratio = final_loss / (best_val_loss + 1e-9) if overfit_ratio < 0.5: - # Train loss가 Val loss보다 훨씬 낮음 = overfitting overfit_penalty = 0.7 elif overfit_ratio > 2.0: - # Train loss가 Val loss보다 훨씬 높음 = underfitting overfit_penalty = 0.8 else: overfit_penalty = 1.0 - # 3. 
에포크 수 기반 (너무 적거나 많으면 불신) epoch_factor = 1.0 if actual_epochs < 10: - epoch_factor = 0.6 # 학습 부족 + epoch_factor = 0.6 elif actual_epochs >= max_epochs: - epoch_factor = 0.8 # 수렴 실패 + epoch_factor = 0.8 confidence = min(0.95, loss_confidence * overfit_penalty * epoch_factor) @@ -411,28 +610,32 @@ class PricePredictor: "lr": self.optimizer.param_groups[0]['lr'] } - def batch_predict(self, prices_dict): + def batch_predict(self, ohlcv_dict): + """여러 종목을 배치로 예측 (체크포인트 있는 종목만)""" results = {} seqs = [] metas = [] - for ticker, prices in prices_dict.items(): + for ticker, ohlcv_data in ohlcv_dict.items(): + if isinstance(ohlcv_data, list): + ohlcv_data = {'close': ohlcv_data} + prices = ohlcv_data.get('close', []) if len(prices) < (self.seq_length + 10): results[ticker] = None continue - data = np.array(prices).reshape(-1, 1) - scaler = MinMaxScaler(feature_range=(0, 1)) - scaled_data = scaler.fit_transform(data) - - seq = torch.FloatTensor(scaled_data[-self.seq_length:]).unsqueeze(0) - seqs.append(seq) - metas.append((ticker, scaler, prices[-1])) + try: + features = _build_feature_matrix(ohlcv_data) + scaled = self.feature_scaler.transform(features) + seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0) + seqs.append(seq) + metas.append((ticker, prices[-1])) + except Exception: + results[ticker] = None if not seqs: return results - # 배치로 합쳐서 한번에 GPU 추론 batch = torch.cat(seqs, dim=0).to(self.device) self.model.eval() @@ -445,8 +648,8 @@ class PricePredictor: preds_cpu = preds.cpu().float().numpy() - for i, (ticker, scaler, current_price) in enumerate(metas): - predicted_price = scaler.inverse_transform(preds_cpu[i:i+1])[0][0] + for i, (ticker, current_price) in enumerate(metas): + predicted_price = self.target_scaler.inverse_transform(preds_cpu[i:i+1])[0][0] trend = "UP" if predicted_price > current_price else "DOWN" change_rate = ((predicted_price - current_price) / current_price) * 100 @@ -461,3 +664,52 @@ class PricePredictor: torch.cuda.empty_cache() return 
class ModelRegistry:
    """
    [v3.0] Per-ticker LSTM model isolation with LRU eviction (max_models=5).
    - Singleton: one registry kept per worker process.
    - On 16GB VRAM, five LSTMs (~250MB) can sit alongside a 7B Ollama (~4GB).
    """
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the process-wide singleton, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self, max_models=5):
        self.max_models = max_models
        # ticker -> PricePredictor, maintained in LRU order (oldest first).
        self._predictors = OrderedDict()
        print(f"[ModelRegistry] Initialized (max_models={max_models})")

    def get_predictor(self, ticker):
        """Return the ticker's PricePredictor, creating it on demand and
        evicting the least-recently-used entry when over capacity."""
        existing = self._predictors.get(ticker)
        if existing is not None:
            self._predictors.move_to_end(ticker)  # mark as most recently used
            return existing

        # Make room before inserting a new predictor.
        while len(self._predictors) >= self.max_models:
            oldest_ticker, oldest_pred = self._predictors.popitem(last=False)
            print(f"[ModelRegistry] Evicted {oldest_ticker} (LRU, {len(self._predictors)}/{self.max_models})")
            del oldest_pred
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        predictor = PricePredictor()
        self._predictors[ticker] = predictor
        print(f"[ModelRegistry] Created predictor for {ticker} ({len(self._predictors)}/{self.max_models})")
        return predictor

    def has_predictor(self, ticker):
        """Return True when a predictor is already cached for the ticker."""
        return ticker in self._predictors

    def clear(self):
        """Drop every cached predictor and release GPU memory."""
        self._predictors.clear()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
data['price'] + + if data and data.get('price', 0) != 0: + results[name] = data + print(f" - {name}: {data['price']} ({data['change']}%)") + + # 리스크 평가 로직 (2% 이상 폭락 장이면 위험) change = data['change'] - results[name] = {"price": price, "change": change} - print(f" - {name}: {price} ({change}%)") - - # 리스크 평가 로직 (단순화: 2% 이상 폭락 장이면 위험) if change <= -2.0: risk_score += 2 # 패닉 상태 elif change <= -1.0: risk_score += 1 # 주의 상태 else: - results[name] = {"price": 0, "change": 0} + results[name] = {"price": 0, "change": 0, "high": 0, "low": 0, + "prev_close": 0, "volume": 0, "trade_value": 0} # [신규] 시장 스트레스 지수(MSI) 추가 time.sleep(0.6) diff --git a/modules/config.py b/modules/config.py index 103307f..8c57bbf 100644 --- a/modules/config.py +++ b/modules/config.py @@ -12,7 +12,12 @@ class Config: # 2. NAS 및 AI 서버 NAS_API_URL = os.getenv("NAS_API_URL", "http://192.168.45.54:18500") OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://localhost:11434") - OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b-instruct-q8_0") + # [최적화] qwen2.5:7b-instruct-q4_K_M: JSON 정확도↑, 속도↑, VRAM 4GB + # 14B 원하면: qwen2.5:14b-instruct-q4_K_M (VRAM ~9GB, 품질 더 좋음) + OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b-instruct-q4_K_M") + OLLAMA_NUM_CTX = int(os.getenv("OLLAMA_NUM_CTX", "4096")) # 8192→4096 (2배 속도) + OLLAMA_NUM_PREDICT = int(os.getenv("OLLAMA_NUM_PREDICT", "200")) # 응답 토큰 제한 + OLLAMA_NUM_THREAD = int(os.getenv("OLLAMA_NUM_THREAD", "8")) # CPU 스레드 (9800X3D 최적화) # 3. KIS 한국투자증권 KIS_ENV_TYPE = os.getenv("KIS_ENV_TYPE", "virtual").lower() @@ -53,7 +58,7 @@ class Config: # 7. IPC 설정 SHM_NAME = "web_ai_bot_ipc" SHM_SIZE = 131072 # 128KB - IPC_STALENESS = 120 # 120초 (메인 봇 사이클 60초 + 여유) + IPC_STALENESS = 600 # 600초 (LSTM 분석 사이클이 길어도 portfolio 명령어 정상 작동) # 8. GPU 설정 VRAM_WARNING_THRESHOLD = 12.0 # GB (14 → 12로 조기 경고) @@ -65,6 +70,16 @@ class Config: # 10. 타임아웃 등 HTTP_TIMEOUT = 10 + # 11. 
LSTM 학습 최적화 + # 동일 종목을 이 시간(초) 내에 재학습하지 않음 → CPU/GPU 절약 + LSTM_COOLDOWN = int(os.getenv("LSTM_COOLDOWN", "1200")) # 20분 + # 체크포인트가 있을 때 빠른 재학습 에포크 수 (기존 50 → 30) + LSTM_FAST_EPOCHS = int(os.getenv("LSTM_FAST_EPOCHS", "30")) + + # 12. CPU 서킷 브레이커 + CPU_CIRCUIT_BREAKER_THRESHOLD = 92 # CPU% 이상 시 분석 스킵 + CPU_CIRCUIT_BREAKER_CONSECUTIVE = 2 # 연속 N회 초과 시 발동 + @staticmethod def validate(): """필수 설정 검증""" diff --git a/modules/services/kis.py b/modules/services/kis.py index 05fdfac..e4faa6c 100644 --- a/modules/services/kis.py +++ b/modules/services/kis.py @@ -271,38 +271,48 @@ class KISClient: except Exception as e: return {"error": str(e)} - def order(self, ticker, qty, buy_sell, price=0): - """주문 (시장가) + def order(self, ticker, qty, buy_sell, price=0, order_type="market"): + """주문 buy_sell: 'BUY' or 'SELL' + order_type: 'market'(시장가), 'limit'(지정가), 'conditional'(조건부지정가) + price: 지정가일 때 주문 가격 (market이면 무시) """ self._throttle() self.ensure_token() - + # 모의투자/실전 TR ID 구분 - # 매수: VTTC0802U / TTTC0802U - # 매도: VTTC0801U / TTTC0801U if buy_sell == 'BUY': tr_id = "VTTC0802U" if self.is_virtual else "TTTC0802U" else: tr_id = "VTTC0801U" if self.is_virtual else "TTTC0801U" - + + # 주문 구분 코드 + # 00: 지정가, 01: 시장가, 03: 최유리지정가, 05: 장전시간외, 06: 장후시간외 + if order_type == "limit" and price > 0: + ord_dvsn = "00" + ord_unpr = str(int(price)) + order_type_str = f"지정가({price:,.0f})" + elif order_type == "conditional" and price > 0: + ord_dvsn = "03" # 최유리지정가 + ord_unpr = str(int(price)) + order_type_str = f"조건부({price:,.0f})" + else: + ord_dvsn = "01" # 시장가 + ord_unpr = "0" + order_type_str = "시장가" + url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash" - - # 주문 파라미터 + datas = { "CANO": self.cano, "ACNT_PRDT_CD": self.acnt_prdt_cd, "PDNO": ticker, - "ORD_DVSN": "01", # 01: 시장가 + "ORD_DVSN": ord_dvsn, "ORD_QTY": str(qty), - "ORD_UNPR": "0" # 시장가는 0 + "ORD_UNPR": ord_unpr } - - # 헤더 준비 + headers = self._get_headers(tr_id=tr_id) - - # [중요] POST 요청(주문 등) 시 Hash Key 필수 - # 단, 모의투자의 
경우 일부 상황에서 생략 가능할 수 있으나, 정석대로 포함 hash_key = self.get_hash_key(datas) if hash_key: headers["hashkey"] = hash_key @@ -310,17 +320,17 @@ class KISClient: print("⚠️ [KIS] Hash Key 생성 실패 (주문 전송 시도)") try: - print(f"📤 [KIS] 주문 전송: {buy_sell} {ticker} {qty}ea (시장가)") + print(f"📤 [KIS] 주문 전송: {buy_sell} {ticker} {qty}ea ({order_type_str})") res = requests.post(url, headers=headers, json=datas) res.raise_for_status() data = res.json() - + print(f"📥 [KIS] 주문 응답 코드(rt_cd): {data['rt_cd']}") print(f"📥 [KIS] 주문 응답 메시지(msg1): {data['msg1']}") - + if data['rt_cd'] != '0': return {"status": False, "msg": data['msg1'], "rt_cd": data['rt_cd']} - + return {"status": True, "msg": "주문 전송 완료", "order_no": data['output']['ODNO'], "rt_cd": data['rt_cd']} except Exception as e: return {"status": False, "msg": str(e), "rt_cd": "EXCEPTION"} @@ -348,34 +358,188 @@ class KISClient: print(f"❌ 현재가 조회 실패: {e}") return None - def get_daily_price(self, ticker, period="D"): - """일별 시세 조회 (기술적 분석용)""" + def _get_daily_ohlcv_by_range(self, ticker, period="D", count=100): + """기간별시세 API (FHKST03010100) - OHLCV 전체 반환 + output2에서 stck_oprc, stck_hgpr, stck_lwpr, stck_clpr, acml_vol 파싱 + """ + self._throttle() + self.ensure_token() + + end_date = datetime.now().strftime("%Y%m%d") + start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d") + + url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + headers = self._get_headers(tr_id="FHKST03010100") + + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start_date, + "FID_INPUT_DATE_2": end_date, + "FID_PERIOD_DIV_CODE": period, + "FID_ORG_ADJ_PRC": "1" + } + + try: + res = requests.get(url, headers=headers, params=params, + timeout=Config.HTTP_TIMEOUT) + res.raise_for_status() + data = res.json() + + if data.get('rt_cd') != '0': + return None + + output = data.get('output2', []) + if not output: + return None + + opens, highs, lows, closes, volumes = [], 
[], [], [], [] + for item in output: + try: + c = int(item.get('stck_clpr', 0) or 0) + o = int(item.get('stck_oprc', 0) or 0) + h = int(item.get('stck_hgpr', 0) or 0) + l = int(item.get('stck_lwpr', 0) or 0) + v = int(item.get('acml_vol', 0) or 0) + if c > 0: + opens.append(o if o > 0 else c) + highs.append(h if h > 0 else c) + lows.append(l if l > 0 else c) + closes.append(c) + volumes.append(v) + except (ValueError, TypeError): + pass + + if not closes: + return None + + # API는 최신순 → 과거→현재 순으로 변환 + opens.reverse(); highs.reverse(); lows.reverse() + closes.reverse(); volumes.reverse() + + result = { + 'open': opens[-count:], + 'high': highs[-count:], + 'low': lows[-count:], + 'close': closes[-count:], + 'volume': volumes[-count:] + } + print(f"[KIS] {ticker} OHLCV: {len(result['close'])}개 ({start_date}~{end_date})") + return result + + except Exception as e: + print(f"⚠️ [KIS] OHLCV 조회 실패 ({ticker}): {e}") + return None + + def get_daily_ohlcv(self, ticker, period="D", count=100): + """일별 OHLCV 시세 조회 (기술적 분석 + LSTM 7차원 입력용) + 1차: 기간별시세 API OHLCV 파싱 (100일) + 2차: 기존 close-only fallback + """ + ohlcv = self._get_daily_ohlcv_by_range(ticker, period, count) + if ohlcv and len(ohlcv['close']) >= 30: + return ohlcv + + # fallback: close만 반환 (가짜 OHLCV) + print(f"[KIS] {ticker} OHLCV 실패 → close-only fallback") + prices = self._get_daily_price_by_range(ticker, period, count) + if not prices: + return None + return { + 'open': prices, 'high': prices, 'low': prices, + 'close': prices, 'volume': [] + } + + def _get_daily_price_by_range(self, ticker, period="D", count=100): + """기간별시세 API (FHKST03010100) - 날짜 범위로 최대 100일 데이터 반환 + inquire-daily-price(FHKST01010400)가 30일만 반환하는 한계 극복""" + self._throttle() + self.ensure_token() + + end_date = datetime.now().strftime("%Y%m%d") + # 영업일 count개 확보를 위해 역일 1.6배 요청 (주말/공휴일 여유) + start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d") + + url = 
f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + headers = self._get_headers(tr_id="FHKST03010100") + + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start_date, + "FID_INPUT_DATE_2": end_date, + "FID_PERIOD_DIV_CODE": period, + "FID_ORG_ADJ_PRC": "1" + } + + try: + res = requests.get(url, headers=headers, params=params, + timeout=Config.HTTP_TIMEOUT) + res.raise_for_status() + data = res.json() + + if data.get('rt_cd') != '0': + return [] + + # 기간별시세는 output2에 배열로 반환 + output = data.get('output2', []) + if not output: + return [] + + prices = [] + for item in output: + clpr = item.get('stck_clpr', '') + if clpr and clpr != '0': + try: + prices.append(int(clpr)) + except ValueError: + pass + + prices.reverse() # API는 최신순 → 과거→현재 순으로 변환 + result = prices[-count:] + print(f"[KIS] {ticker} 기간별시세: {len(result)}개 " + f"({start_date}~{end_date})") + return result + + except Exception as e: + print(f"⚠️ [KIS] 기간별시세 조회 실패 ({ticker}): {e}") + return [] + + def get_daily_price(self, ticker, period="D", count=100): + """일별 시세 조회 (기술적 분석 + LSTM용) + 1차: 기간별시세 API (100일, LSTM 학습 가능) + 2차: 구형 API fallback (30일) + """ + # 1차: 기간별시세 API (FHKST03010100) - 100일 + prices = self._get_daily_price_by_range(ticker, period, count) + if prices and len(prices) >= 30: + return prices + + # 2차: 구형 API fallback (FHKST01010400) - 30일 + print(f"[KIS] {ticker} 기간별시세 실패 → 구형 API(30일) fallback") self._throttle() self.ensure_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price" headers = self._get_headers(tr_id="FHKST01010400") - params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker, "FID_PERIOD_DIV_CODE": period, - "FID_ORG_ADJ_PRC": "1" # 수정주가 + "FID_ORG_ADJ_PRC": "1" } - try: - res = requests.get(url, headers=headers, params=params) + res = requests.get(url, headers=headers, params=params, + timeout=Config.HTTP_TIMEOUT) res.raise_for_status() data = res.json() - if 
data['rt_cd'] != '0': + if data.get('rt_cd') != '0': return [] - - # 과거 데이터부터 오도록 정렬 필요할 수 있음 (API는 최신순) - # output 리스트: [ {stck_clpr: 종가, ...}, ... ] - prices = [int(item['stck_clpr']) for item in data['output']] - prices.reverse() # 과거 -> 현재 순으로 정렬 + prices = [int(item['stck_clpr']) for item in data['output'] + if item.get('stck_clpr')] + prices.reverse() return prices except Exception as e: - print(f"❌ 일별 시세 조회 실패: {e}") + print(f"❌ 일별 시세 조회 실패 ({ticker}): {e}") return [] def get_volume_rank(self, limit=5): @@ -437,9 +601,18 @@ class KISClient: data = self._request_api("GET", endpoint, "FHKUP03500100", params=params) if data['rt_cd'] != '0': return None + o = data['output'] + def _f(val): return float(val) if val else 0.0 + def _i(val): return int(float(val)) if val else 0 return { - "price": float(data['output']['bstp_nmix_prpr']), # 현재지수 - "change": float(data['output']['bstp_nmix_prdy_ctrt']) # 등락률(%) + "price": _f(o.get('bstp_nmix_prpr')), # 현재지수 + "change": _f(o.get('bstp_nmix_prdy_ctrt')), # 등락률(%) + "change_val": _f(o.get('bstp_nmix_prdy_vrss')), # 전일 대비 포인트 + "high": _f(o.get('bstp_nmix_hgpr')), # 장중 고가 + "low": _f(o.get('bstp_nmix_lwpr')), # 장중 저가 + "prev_close": _f(o.get('prdy_nmix')), # 전일 종가 + "volume": _i(o.get('acml_vol')), # 누적 거래량(천주) + "trade_value": _i(o.get('acml_tr_pbmn')), # 누적 거래대금(백만원) } except Exception as e: print(f"❌ 지수 조회 실패({ticker}): {e}") @@ -533,9 +706,8 @@ class KISAsyncClient: return None async def get_daily_price_async(self, ticker): - """비동기 일별 시세 조회""" + """비동기 일별 시세 조회 (close only, 하위 호환)""" import aiohttp - import asyncio self.sync.ensure_token() url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price" @@ -555,6 +727,52 @@ class KISAsyncClient: return prices return [] + async def get_daily_ohlcv_async(self, ticker, count=100): + """비동기 OHLCV 조회 (기간별시세 API 사용)""" + import aiohttp + from datetime import datetime, timedelta + + self.sync.ensure_token() + end_date = datetime.now().strftime("%Y%m%d") + 
start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d") + + url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + headers = self.sync._get_headers(tr_id="FHKST03010100") + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start_date, + "FID_INPUT_DATE_2": end_date, + "FID_PERIOD_DIV_CODE": "D", + "FID_ORG_ADJ_PRC": "1" + } + + async with aiohttp.ClientSession() as session: + data = await self._async_get(session, url, headers, params) + if data and data.get('rt_cd') == '0': + output = data.get('output2', []) + opens, highs, lows, closes, volumes = [], [], [], [], [] + for item in output: + try: + c = int(item.get('stck_clpr', 0) or 0) + if c > 0: + opens.append(int(item.get('stck_oprc', 0) or c)) + highs.append(int(item.get('stck_hgpr', 0) or c)) + lows.append(int(item.get('stck_lwpr', 0) or c)) + closes.append(c) + volumes.append(int(item.get('acml_vol', 0) or 0)) + except (ValueError, TypeError): + pass + if closes: + opens.reverse(); highs.reverse(); lows.reverse() + closes.reverse(); volumes.reverse() + return { + 'open': opens[-count:], 'high': highs[-count:], + 'low': lows[-count:], 'close': closes[-count:], + 'volume': volumes[-count:] + } + return None + async def get_investor_trend_async(self, ticker): """비동기 투자자 동향 조회""" import aiohttp @@ -582,7 +800,7 @@ class KISAsyncClient: return None async def get_daily_prices_batch(self, tickers): - """여러 종목의 일별 시세를 병렬로 조회""" + """여러 종목의 일별 시세(close only)를 병렬로 조회 (하위 호환)""" import aiohttp import asyncio @@ -592,7 +810,6 @@ class KISAsyncClient: async with aiohttp.ClientSession() as session: tasks = [] for i, ticker in enumerate(tickers): - # rate limit: 0.5초 간격으로 요청 생성 if i > 0: await asyncio.sleep(self.min_interval) @@ -617,6 +834,67 @@ class KISAsyncClient: return results + async def get_daily_ohlcv_batch(self, tickers, count=100): + """여러 종목의 OHLCV를 병렬로 조회 (기간별시세 API)""" + import aiohttp + import 
asyncio + from datetime import datetime, timedelta + + self.sync.ensure_token() + results = {} + + end_date = datetime.now().strftime("%Y%m%d") + start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d") + + async with aiohttp.ClientSession() as session: + tasks = [] + for i, ticker in enumerate(tickers): + if i > 0: + await asyncio.sleep(self.min_interval) + + url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice" + headers = self.sync._get_headers(tr_id="FHKST03010100") + params = { + "FID_COND_MRKT_DIV_CODE": "J", + "FID_INPUT_ISCD": ticker, + "FID_INPUT_DATE_1": start_date, + "FID_INPUT_DATE_2": end_date, + "FID_PERIOD_DIV_CODE": "D", + "FID_ORG_ADJ_PRC": "1" + } + tasks.append((ticker, self._async_get(session, url, headers, params))) + + for ticker, task in tasks: + data = await task + if data and data.get('rt_cd') == '0': + output = data.get('output2', []) + opens, highs, lows, closes, volumes = [], [], [], [], [] + for item in output: + try: + c = int(item.get('stck_clpr', 0) or 0) + if c > 0: + opens.append(int(item.get('stck_oprc', 0) or c)) + highs.append(int(item.get('stck_hgpr', 0) or c)) + lows.append(int(item.get('stck_lwpr', 0) or c)) + closes.append(c) + volumes.append(int(item.get('acml_vol', 0) or 0)) + except (ValueError, TypeError): + pass + if closes: + opens.reverse(); highs.reverse(); lows.reverse() + closes.reverse(); volumes.reverse() + results[ticker] = { + 'open': opens[-count:], 'high': highs[-count:], + 'low': lows[-count:], 'close': closes[-count:], + 'volume': volumes[-count:] + } + else: + results[ticker] = None + else: + results[ticker] = None + + return results + async def get_investor_trends_batch(self, tickers): """여러 종목의 투자자 동향을 병렬로 조회""" import aiohttp diff --git a/modules/services/news.py b/modules/services/news.py index 2cf6de4..c638f99 100644 --- a/modules/services/news.py +++ b/modules/services/news.py @@ -28,6 +28,7 @@ class AsyncNewsCollector: self._cache = 
None self._cache_time = 0 self._cache_ttl = 300 # 5분 + self._stock_cache = {} # {stock_name: (items, timestamp)} def get_market_news(self, query="주식 시장"): """동기 인터페이스 (하위 호환)""" @@ -62,11 +63,43 @@ class AsyncNewsCollector: self._cache_time = now return items except ImportError: - # aiohttp 미설치 시 동기 fallback return self.get_market_news(query) except Exception as e: print(f"[News Async] Collection failed: {e}") - # 캐시가 있으면 반환, 없으면 동기 fallback if self._cache: return self._cache return self.get_market_news(query) + + async def get_stock_news_async(self, stock_name, max_items=3): + """종목별 뉴스 수집 (5분 캐싱) + stock_name: 종목 이름 (e.g. '삼성전자', 'SK하이닉스') + """ + now = time.time() + cached = self._stock_cache.get(stock_name) + if cached and (now - cached[1]) < self._cache_ttl: + return cached[0] + + try: + import aiohttp + import urllib.parse + query = urllib.parse.quote(f"{stock_name} 주가") + url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko" + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp: + content = await resp.read() + root = ET.fromstring(content) + items = [] + for item in root.findall(".//item")[:max_items]: + title_el = item.find("title") + if title_el is not None and title_el.text: + items.append({"title": title_el.text, "source": "Google News"}) + + self._stock_cache[stock_name] = (items, now) + return items + except Exception as e: + print(f"[News] 종목 뉴스 수집 실패 ({stock_name}): {e}") + return [] + + def clear_stock_cache(self): + """종목 뉴스 캐시 전체 초기화""" + self._stock_cache.clear() diff --git a/modules/services/ollama.py b/modules/services/ollama.py index fe4fded..af6b3bd 100644 --- a/modules/services/ollama.py +++ b/modules/services/ollama.py @@ -113,20 +113,24 @@ class OllamaManager: "model": self.model_name, "prompt": prompt, "stream": False, - "format": "json", # JSON 강제 + "format": "json", "options": { - "num_ctx": 8192, # [5070Ti 최적화] 컨텍스트 크기 2배 증가 (4096 -> 8192) - 
"temperature": 0.2, # 분석 일관성 유지 - "num_gpu": 1, # GPU 사용 명시 - "num_thread": 8 # CPU 스레드 수 (9800X3D 활용) + "num_ctx": Config.OLLAMA_NUM_CTX, # 4096 (속도 2배) + "num_predict": Config.OLLAMA_NUM_PREDICT, # 응답 토큰 제한 + "temperature": 0.1, # 더 결정론적 (JSON 파싱 안정성) + "num_gpu": 1, + "num_thread": Config.OLLAMA_NUM_THREAD # Config 설정값 (기본 8) }, - "keep_alive": "10m" # [5070Ti 최적화] 10분간 유지 (메모리 여유 있음) + "keep_alive": "5m" # 5분 유지 (불필요한 VRAM 점유 줄임) } - + try: - response = requests.post(self.generate_url, json=payload, timeout=180) # 타임아웃 증가 + response = requests.post(self.generate_url, json=payload, timeout=90) # 180→90초 response.raise_for_status() return response.json().get('response') + except requests.exceptions.Timeout: + print(f"❌ Inference Timeout (90s): {self.model_name}") + return None except Exception as e: print(f"❌ Inference Error: {e}") return None diff --git a/modules/services/telegram.py b/modules/services/telegram.py index 732b6bf..b459f1a 100644 --- a/modules/services/telegram.py +++ b/modules/services/telegram.py @@ -23,7 +23,7 @@ class TelegramMessenger: payload = { "chat_id": self.chat_id, "text": message, - "parse_mode": "Markdown" + "parse_mode": "HTML" } try: requests.post(url, json=payload, timeout=5) diff --git a/modules/services/telegram_bot/runner.py b/modules/services/telegram_bot/runner.py index 21c1bbd..ddd8ab3 100644 --- a/modules/services/telegram_bot/runner.py +++ b/modules/services/telegram_bot/runner.py @@ -30,6 +30,9 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even # IPC 초기화 (shared memory + command queue) ipc = SharedIPC(lock=ipc_lock, command_queue=command_queue) + conflict_retries = 0 + MAX_CONFLICT_RETRIES = 10 + while True: # shutdown 체크 if shutdown_event and shutdown_event.is_set(): @@ -51,6 +54,7 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even if bot_server.should_restart: print("[Telegram Bot] Restarting instance...") + conflict_retries = 0 # 정상 재시작 시 카운터 리셋 
time.sleep(1) continue else: @@ -61,11 +65,21 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even print("[Telegram Bot] Stopped by user") break except Exception as e: - if "Conflict" not in str(e): + if "Conflict" in str(e): + conflict_retries += 1 + if conflict_retries >= MAX_CONFLICT_RETRIES: + print(f"[Telegram Bot] Conflict max retries ({MAX_CONFLICT_RETRIES}) reached. Exiting.") + break + wait_secs = min(5 * conflict_retries, 30) + print(f"[Telegram Bot] Conflict detected. Waiting {wait_secs}s before retry " + f"({conflict_retries}/{MAX_CONFLICT_RETRIES})...") + time.sleep(wait_secs) + continue + else: print(f"[Telegram Bot] Error: {e}") import traceback traceback.print_exc() - break + break # 정리 ipc.cleanup() diff --git a/modules/strategy/process.py b/modules/strategy/process.py index d72947e..217944a 100644 --- a/modules/strategy/process.py +++ b/modules/strategy/process.py @@ -3,20 +3,25 @@ import json import numpy as np from modules.services.ollama import OllamaManager from modules.analysis.technical import TechnicalAnalyzer -from modules.analysis.deep_learning import PricePredictor +from modules.analysis.deep_learning import ModelRegistry -# [최적화] 워커 프로세스별 전역 변수 (LSTM 모델 캐싱) -_lstm_predictor = None +# [최적화] 워커 프로세스별 전역 변수 (Ollama 캐싱) +_ollama_manager = None -def get_predictor(): - """워커 프로세스 내에서 PricePredictor 인스턴스를 싱글톤으로 관리""" - global _lstm_predictor - if _lstm_predictor is None: - print(f"[Worker {os.getpid()}] Initializing LSTM Predictor...") - _lstm_predictor = PricePredictor() - print(f"[Worker {os.getpid()}] LSTM Device: {_lstm_predictor.device}" - f" | AMP: {_lstm_predictor.use_amp}") - return _lstm_predictor + +def get_predictor(ticker=None): + """워커 프로세스 내에서 ModelRegistry로 종목별 PricePredictor 관리""" + registry = ModelRegistry.get_instance() + return registry.get_predictor(ticker or "default") + + +def get_ollama(): + """워커 프로세스 내에서 OllamaManager 인스턴스를 싱글톤으로 관리 + - 종목마다 새 인스턴스를 만들면 Ollama에 동시 요청이 폭주해 데드락 발생""" + 
global _ollama_manager + if _ollama_manager is None: + _ollama_manager = OllamaManager() + return _ollama_manager def calculate_position_size(total_capital, current_price, volatility, score, ai_confidence, @@ -40,9 +45,8 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_ base_invest = total_capital * 0.10 # 2. 변동성 조절 계수 (변동성 높을수록 투자금 감소) - # 변동성 1% → 1.0배, 2% → 0.75배, 3% → 0.5배, 5%+ → 0.3배 if volatility <= 1.0: - vol_factor = 1.2 # 안정적 종목은 약간 증가 + vol_factor = 1.2 elif volatility <= 2.0: vol_factor = 1.0 elif volatility <= 3.0: @@ -50,10 +54,9 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_ elif volatility <= 5.0: vol_factor = 0.45 else: - vol_factor = 0.3 # 고변동 종목 + vol_factor = 0.3 - # 3. 확신도 조절 계수 (score가 높을수록 투자금 증가) - # score 0.6 → 0.5배, 0.7 → 1.0배, 0.8 → 1.5배, 0.9+ → 2.0배 + # 3. 확신도 조절 계수 if score >= 0.85: conf_factor = 2.0 elif score >= 0.75: @@ -73,45 +76,85 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_ # 5. 최종 투자금 계산 invest_amount = base_invest * vol_factor * conf_factor * ai_bonus - # 상한 제한 invest_amount = min(invest_amount, max_per_stock) - invest_amount = min(invest_amount, total_capital * 0.15) # 최대 15% - invest_amount = min(invest_amount, total_capital) # 잔고 초과 방지 + invest_amount = min(invest_amount, total_capital * 0.15) + invest_amount = min(invest_amount, total_capital) - # 수량 계산 qty = int(invest_amount / current_price) return max(0, qty) -def analyze_stock_process(ticker, prices, news_items, investor_trend=None, +def analyze_stock_process(ticker, ohlcv_data, news_items, investor_trend=None, macro_status=None, holding_info=None): """ - [v2.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행) + [v3.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행) - [v2.0 개선사항] - 1. ATR 기반 동적 손절/익절 + 트레일링 스탑 - 2. 포지션 사이징 (변동성 + 확신도 기반) - 3. 시장상황별 동적 매수/매도 임계값 - 4. 보유종목에 대한 분석 기반 매도 판단 - 5. ADX/OBV/MTF 통합 기술적 분석 - 6. 강화된 AI 프롬프트 (종목 고유 뉴스 분석) + [v3.0 개선사항] + 1. 
OHLCV 전체 수신 (실제 고가/저가/거래량 사용) + 2. 종목별 ModelRegistry (가중치 덮어쓰기 방지) + 3. 강화된 LLM 프롬프트 (거시경제 상태, 볼린저밴드, 거래량 급증, 보유 수익률) """ try: - print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...") + # OHLCV 데이터 분리 (하위호환: list 형태도 허용) + if isinstance(ohlcv_data, dict): + prices = ohlcv_data.get('close', []) + high_prices = ohlcv_data.get('high') or None + low_prices = ohlcv_data.get('low') or None + volume_history = ohlcv_data.get('volume') or None + open_prices = ohlcv_data.get('open') or None + else: + # 하위 호환: 기존 close 리스트 + prices = ohlcv_data if isinstance(ohlcv_data, list) else [] + high_prices = None + low_prices = None + volume_history = None + open_prices = None + + # volume이 모두 0이거나 비어있으면 None 처리 + if volume_history and all(v == 0 for v in volume_history): + volume_history = None + + print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles, " + f"OHLCV={'yes' if high_prices else 'close-only'}, " + f"Vol={'yes' if volume_history else 'no'})...") # ===== 1. 기술적 지표 계산 ===== current_price = prices[-1] if prices else 0 tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score( - current_price, prices, volume_history=None) + current_price, prices, volume_history=volume_history) - # ===== 2. ATR 기반 동적 손절/익절 ===== - sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(prices) + # ===== 2. ATR 기반 동적 손절/익절 (실제 고가/저가 사용) ===== + sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp( + prices, high_prices=high_prices, low_prices=low_prices) - # ===== 3. LSTM 주가 예측 ===== - lstm_predictor = get_predictor() + # ===== 3. 볼린저밴드 위치 계산 ===== + bb_upper, bb_mid, bb_lower = TechnicalAnalyzer.calculate_bollinger_bands(prices) + if bb_upper > bb_lower: + bb_pos = (current_price - bb_lower) / (bb_upper - bb_lower) # 0=하단, 1=상단 + if bb_pos <= 0.2: + bb_zone = "하단(과매도)" + elif bb_pos >= 0.8: + bb_zone = "상단(과매수)" + else: + bb_zone = f"중간({bb_pos:.0%})" + else: + bb_pos = 0.5 + bb_zone = "중간" + + # ===== 4. 
LSTM 주가 예측 (ModelRegistry 사용) ===== + lstm_predictor = get_predictor(ticker) if lstm_predictor: lstm_predictor.training_status['current_ticker'] = ticker - pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker) + + # LSTM에 전달할 OHLCV 딕셔너리 구성 + lstm_ohlcv = { + 'close': prices, + 'open': open_prices or prices, + 'high': high_prices or prices, + 'low': low_prices or prices, + 'volume': volume_history or [] + } + pred_result = lstm_predictor.train_and_predict(lstm_ohlcv, ticker=ticker) lstm_score = 0.5 ai_confidence = 0.5 @@ -130,7 +173,7 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, lstm_score = max(0.0, min(1.0, lstm_score)) - # ===== 4. 수급 분석 (외인/기관) ===== + # ===== 5. 수급 분석 (외인/기관) ===== investor_score = 0.0 frgn_net_buy = 0 orgn_net_buy = 0 @@ -146,26 +189,23 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, if day['institutional'] > 0: consecutive_orgn_buy += 1 - # 외인 수급 점수 (강화) if frgn_net_buy > 0: investor_score += 0.03 if consecutive_frgn_buy >= 3: investor_score += 0.04 if consecutive_frgn_buy >= 5: - investor_score += 0.03 # 5일 연속 매수 = 추가 보너스 + investor_score += 0.03 - # 기관 수급 점수 (신규) if orgn_net_buy > 0: investor_score += 0.02 if consecutive_orgn_buy >= 3: investor_score += 0.03 - # 외인+기관 동시 순매수 = 강력 신호 if frgn_net_buy > 0 and orgn_net_buy > 0: investor_score += 0.03 print(f" 💰 [Investor] Both Foreign & Institutional Buying!") - # ===== 5. AI 뉴스 분석 (강화된 프롬프트) ===== + # ===== 6. AI 뉴스 분석 (강화된 프롬프트) ===== if pred_result: pred_price = pred_result.get('predicted', 0) pred_change = pred_result.get('change_rate', 0) @@ -173,65 +213,59 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, pred_price = current_price pred_change = 0.0 - ollama = OllamaManager() - prompt = f""" -[System Instruction] -1. Role: You are a legendary quant trader with 30 years of experience in Korean stock market. -2. You MUST analyze the data objectively and respond with a JSON object. 
+ news_summary = "; ".join( + [n.get('title', '') for n in (news_items or [])[:3] if n.get('title')] + ) or "뉴스 없음" -[Market Data for Stock {ticker}] -- Current Price: {current_price:,.0f} KRW -- Technical Score: {tech_score:.3f} (RSI: {rsi:.1f}) -- Moving Average: {ma_info['trend']} (Price is {ma_info['position']}) -- ADX Trend Strength: {ma_info.get('adx', 20):.1f} ({ma_info.get('adx_trend', 'N/A')}) -- Multi-Timeframe: {ma_info.get('mtf_alignment', 'N/A')} -- AI Prediction: {pred_price:.0f} KRW ({pred_change:+.2f}%) -- AI Confidence: {ai_confidence:.2f} (Training Loss: {ai_loss:.4f}) -- Volatility: {volatility:.2f}% -- Volume Ratio: {vol_ratio:.1f}x -- ATR Stop Loss: {sl_tp['stop_loss_pct']:.1f}% / Take Profit: {sl_tp['take_profit_pct']:.1f}% -- Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy} + # 거시경제 상태 + macro_state = macro_status.get('status', 'SAFE') if macro_status else 'SAFE' -[Decision Framework] -- Strong BUY signals: Foreigners+Institutions buying, Golden Cross, ADX>25 with bullish trend, AI high confidence UP -- Moderate BUY: RSI<40 with bullish reversal, Price near Bollinger Lower Band -- SELL signals: RSI>70, Dead Cross, ADX>25 with bearish trend, Foreigners selling -- AVOID/HOLD: ADX<20 (sideways), Mixed signals, Low confidence + # 거래량 급증 여부 + vol_surge = "급증(x{:.1f})".format(vol_ratio) if vol_ratio >= 2.0 else "정상" -[News Data] -{json.dumps(news_items[:5] if news_items else [], ensure_ascii=False)} + # 보유종목 수익률 + holding_yield_str = "" + if holding_info and holding_info.get('qty', 0) > 0: + yld = holding_info.get('yield', 0.0) + holding_yield_str = f" | 보유수익률={yld:+.1f}%" -[Response Format - JSON Only] -{{"sentiment_score": 0.0 to 1.0, "reason": "Brief analysis reason"}} -""" + ollama = get_ollama() + prompt = ( + f"Korean stock analyst. 
JSON only: {{\"sentiment_score\":0.0-1.0,\"reason\":\"1 sentence\"}}\n" + f"Stock {ticker} ₩{current_price:,.0f}{holding_yield_str}\n" + f"Market={macro_state} | " + f"Tech={tech_score:.2f} RSI={rsi:.1f} MA={ma_info['trend']} ADX={ma_info.get('adx',20):.0f} " + f"MTF={ma_info.get('mtf_alignment','N/A')}\n" + f"BB={bb_zone} | AI={pred_change:+.2f}% conf={ai_confidence:.0%} | " + f"Vol={volatility:.1f}% VolRatio={vol_surge}\n" + f"Flow: Frgn={frgn_net_buy:+,}({consecutive_frgn_buy}d) " + f"Inst={orgn_net_buy:+,}({consecutive_orgn_buy}d)\n" + f"News: {news_summary}" + ) ai_resp = ollama.request_inference(prompt) sentiment_score = 0.5 ai_reason = "" try: data = json.loads(ai_resp) sentiment_score = float(data.get("sentiment_score", 0.5)) - sentiment_score = max(0.0, min(1.0, sentiment_score)) # 범위 강제 + sentiment_score = max(0.0, min(1.0, sentiment_score)) ai_reason = data.get("reason", "") except Exception: print(f" ⚠️ AI response parse failed, using neutral (0.5)") - # ===== 6. 통합 점수 (동적 가중치 v2.0) ===== - # ADX가 높으면 (추세가 강하면) LSTM과 기술적 분석 비중 증가 + # ===== 7. 
통합 점수 (동적 가중치 v2.0) ===== adx_val = ma_info.get('adx', 20) if ai_confidence >= 0.85 and adx_val >= 25: - # 강한 추세 + 높은 AI 신뢰도: AI 최우선 w_tech, w_news, w_ai = 0.15, 0.15, 0.70 print(f" 🤖 [Ultra High Confidence + Strong Trend] AI Weight 70%") elif ai_confidence >= 0.85: w_tech, w_news, w_ai = 0.20, 0.20, 0.60 print(f" 🤖 [High Confidence] AI Weight 60%") elif adx_val >= 30: - # 매우 강한 추세: 기술적 분석 우선 w_tech, w_news, w_ai = 0.50, 0.20, 0.30 print(f" 📊 [Very Strong Trend ADX={adx_val:.0f}] Tech Weight 50%") elif adx_val < 20: - # 비추세/횡보: 뉴스와 수급 중시 w_tech, w_news, w_ai = 0.30, 0.40, 0.30 print(f" 📰 [Sideways ADX={adx_val:.0f}] News Weight 40%") else: @@ -239,63 +273,53 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score) - # 수급 가산점 (최대 +0.15) total_score += min(investor_score, 0.15) total_score = min(total_score, 1.0) - # ===== 7. 시장 상황별 동적 임계값 ===== + # ===== 8. 시장 상황별 동적 임계값 ===== buy_threshold = 0.60 sell_threshold = 0.30 if macro_status: - macro_state = macro_status.get('status', 'SAFE') if macro_state == 'DANGER': - buy_threshold = 999.0 # 매수 완전 차단 - sell_threshold = 0.45 # 매도 기준 상향 (빨리 탈출) + buy_threshold = 999.0 + sell_threshold = 0.45 print(f" 🚨 [DANGER Market] Buy BLOCKED, Sell threshold raised to 0.45") elif macro_state == 'CAUTION': - buy_threshold = 0.72 # 매수 기준 대폭 상향 (보수적) - sell_threshold = 0.38 # 매도 기준도 상향 + buy_threshold = 0.72 + sell_threshold = 0.38 print(f" ⚠️ [CAUTION Market] Buy threshold raised to 0.72") - # ===== 8. 매매 결정 ===== + # ===== 9. 매매 결정 ===== decision = "HOLD" decision_reason = "" - # --- 보유 종목 분석 기반 매도 (신규) --- if holding_info: holding_yield = holding_info.get('yield', 0.0) holding_qty = holding_info.get('qty', 0) peak_price = holding_info.get('peak_price', current_price) if holding_qty > 0: - # A. 
동적 손절 (ATR 기반) if holding_yield <= sl_tp['stop_loss_pct']: decision = "SELL" decision_reason = f"Dynamic Stop Loss ({holding_yield:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)" - # B. 동적 익절 (ATR 기반) elif holding_yield >= sl_tp['take_profit_pct']: decision = "SELL" decision_reason = f"Dynamic Take Profit ({holding_yield:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)" - # C. 트레일링 스탑 (최고가 대비 하락) elif peak_price > 0: drop_from_peak = ((current_price - peak_price) / peak_price) * 100 if drop_from_peak <= -sl_tp['trailing_stop_pct'] and holding_yield > 2.0: - # 수익 상태에서만 트레일링 스탑 작동 (2% 이상 수익 확보) decision = "SELL" decision_reason = (f"Trailing Stop ({drop_from_peak:.1f}% from peak, " f"threshold: -{sl_tp['trailing_stop_pct']:.1f}%)") - # D. 분석 기반 매도 (점수가 매도 임계값 이하) if decision == "HOLD" and total_score <= sell_threshold: decision = "SELL" decision_reason = f"Analysis Signal (Score: {total_score:.2f} <= {sell_threshold:.2f})" - # E. 추세 반전 매도 (ADX 강한 하락추세) if decision == "HOLD" and adx_val >= 30: - plus_di = ma_info.get('adx', 0) # 참고용 mtf_align = ma_info.get('mtf_alignment', '') if mtf_align == 'STRONG_BEAR' and holding_yield < 0: decision = "SELL" @@ -303,11 +327,9 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, # --- 매수 판단 --- if decision == "HOLD": - # 강한 단일 신호 매수 (기준 강화) strong_signal = False strong_reason = "" - # [강화] 복합 조건 매수 (단일 지표가 아닌 복합 조건) if tech_score >= 0.75 and lstm_score >= 0.6 and sentiment_score >= 0.6: strong_signal = True strong_reason = "Triple Confirmation (Tech+AI+News)" @@ -322,7 +344,6 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, strong_reason = f"Strong Multi-Timeframe Bullish + Tech {tech_score:.2f}" if strong_signal and total_score >= buy_threshold - 0.05: - # 강한 신호는 임계값 약간 완화 허용 decision = "BUY" decision_reason = strong_reason print(f" 🎯 [{strong_reason}] → BUY!") @@ -330,10 +351,9 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None, decision = "BUY" decision_reason = 
f"Score {total_score:.2f} >= threshold {buy_threshold:.2f}" - # ===== 9. 포지션 사이징 ===== + # ===== 10. 포지션 사이징 ===== suggested_qty = 0 if decision == "BUY": - # 기본 자산 1000만원으로 가정 (실제 run_cycle에서 덮어씀) suggested_qty = calculate_position_size( total_capital=10000000, current_price=current_price, diff --git a/modules/utils/ipc.py b/modules/utils/ipc.py index 294ca58..ed55522 100644 --- a/modules/utils/ipc.py +++ b/modules/utils/ipc.py @@ -59,6 +59,7 @@ class SharedIPC: def read_status(self): """텔레그램 봇이 상태를 shared memory에서 읽기""" + raw = None try: shm = self._ensure_shm() @@ -66,13 +67,15 @@ class SharedIPC: self.lock.acquire() try: length = struct.unpack_from('I', shm.buf, 0)[0] - if length == 0 or length > Config.SHM_SIZE - 4: - return None - raw = bytes(shm.buf[4:4 + length]) + if length > 0 and length <= Config.SHM_SIZE - 4: + raw = bytes(shm.buf[4:4 + length]) finally: if self.lock: self.lock.release() + if not raw: + return None + ipc_data = json.loads(raw.decode('utf-8')) age = time.time() - ipc_data.get('timestamp', 0) diff --git a/modules/utils/monitor.py b/modules/utils/monitor.py index fb724bc..97c9db0 100644 --- a/modules/utils/monitor.py +++ b/modules/utils/monitor.py @@ -1,6 +1,8 @@ import psutil from datetime import datetime +from modules.config import Config + class SystemMonitor: def __init__(self, messenger, ollama_manager): @@ -8,52 +10,101 @@ class SystemMonitor: self.ollama_monitor = ollama_manager self.last_health_check = datetime.now() + # CPU 서킷 브레이커 상태 + self._cpu_overload_count = 0 # 연속 과부하 횟수 + self._circuit_open = False # 서킷 브레이커 발동 여부 + self._circuit_open_since = None + + def is_cpu_critical(self): + """서킷 브레이커가 발동 상태인지 반환 (True이면 분석 사이클 스킵)""" + return self._circuit_open + + def reset_circuit(self): + """서킷 브레이커 수동 리셋""" + if self._circuit_open: + print("[Monitor] CPU Circuit Breaker RESET") + self._circuit_open = False + self._cpu_overload_count = 0 + self._circuit_open_since = None + def check_health(self): """시스템 상태 점검 및 알림 (CPU, RAM, GPU) 
- 3분마다 실행""" now = datetime.now() - if (now - self.last_health_check).total_seconds() < 180: # 5분 → 3분 + if (now - self.last_health_check).total_seconds() < 180: return self.last_health_check = now alerts = [] - # 1. CPU Check (non-blocking 측정) - cpu_usage = psutil.cpu_percent(interval=0) + # 1. CPU Check + cpu_usage = psutil.cpu_percent(interval=1) # 1초 측정 (더 정확) - if cpu_usage > 90: - # 검증: 상위 프로세스 CPU 합계 확인 + if cpu_usage > Config.CPU_CIRCUIT_BREAKER_THRESHOLD: + self._cpu_overload_count += 1 + + # 상위 프로세스 조회 top_processes = [] for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']): try: - if proc.info['name'] == 'System Idle Process': + if proc.info['name'] in ('System Idle Process', 'Idle'): continue top_processes.append(proc.info) except (psutil.NoSuchProcess, psutil.AccessDenied): pass top_processes.sort(key=lambda x: x['cpu_percent'], reverse=True) - total_top_cpu = sum(p['cpu_percent'] for p in top_processes[:3]) + top_3_str = "" + for p in top_processes[:3]: + top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)" - if total_top_cpu >= 30.0: - top_3_str = "" - for p in top_processes[:3]: - top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)" - alerts.append(f"[CPU Overload] Usage: {cpu_usage}%\nTop Processes:{top_3_str}") + # 서킷 브레이커 발동 조건 + if self._cpu_overload_count >= Config.CPU_CIRCUIT_BREAKER_CONSECUTIVE: + if not self._circuit_open: + self._circuit_open = True + self._circuit_open_since = now + alerts.append( + f"🔴 [CPU Circuit Breaker OPEN] {cpu_usage}% × {self._cpu_overload_count}회 연속\n" + f"⛔ 분석 사이클 일시 중단 (5분 후 자동 복구)\nTop Processes:{top_3_str}" + ) + print(f"[Monitor] CPU Circuit Breaker OPEN! CPU={cpu_usage}%") + else: + alerts.append( + f"⚠️ [CPU Overload] Usage: {cpu_usage}% ({self._cpu_overload_count}회)\nTop Processes:{top_3_str}" + ) + else: + # CPU 정상 → 카운터 리셋 + if self._cpu_overload_count > 0: + print(f"[Monitor] CPU 정상화 ({cpu_usage}%). 
카운터 리셋.") + self._cpu_overload_count = 0 + + # 서킷 브레이커가 열린 후 5분 경과 시 자동 복구 + if self._circuit_open and self._circuit_open_since: + elapsed = (now - self._circuit_open_since).total_seconds() + if elapsed >= 300: # 5분 + self._circuit_open = False + self._circuit_open_since = None + alerts.append("✅ [CPU Circuit Breaker CLOSED] 시스템 안정화. 분석 재개.") + print("[Monitor] CPU Circuit Breaker CLOSED. 분석 재개.") # 2. RAM Check ram = psutil.virtual_memory() if ram.percent > 90: - alerts.append(f"[RAM High] Usage: {ram.percent}% (Free: {ram.available / 1024**3:.1f}GB)") + alerts.append(f"⚠️ [RAM High] Usage: {ram.percent}% (Free: {ram.available / 1024**3:.1f}GB)") # 3. GPU Check if self.ollama_monitor: gpu_status = self.ollama_monitor.get_gpu_status() temp = gpu_status.get('temp', 0) if temp > 80: - alerts.append(f"[GPU Overheat] Temp: {temp}C") + alerts.append(f"🔥 [GPU Overheat] Temp: {temp}°C") - # 알림 전송 + # 알림 전송 (텔레그램 비활성화 - 콘솔 로그만 사용) if alerts: - msg = "[System Health Alert]\n" + "\n".join(alerts) - if self.messenger: - self.messenger.send_message(msg) + # 콘솔에만 출력 + for alert in alerts: + print(f"[Monitor] {alert}") + + # [비활성화] 텔레그램 알림 - 필요시 재활성화 + # msg = "🔔 [System Health Alert]\n" + "\n".join(alerts) + # if self.messenger: + # self.messenger.send_message(msg)