Compare commits

...

3 Commits

Author SHA1 Message Date
760d1906ed 백테스팅, 앙상블, 워밍업 재시작 스크립트 추가
- analysis/backtest.py: 백테스팅 프레임워크 신규 추가
- analysis/ensemble.py: 적응형 앙상블 가중치 신규 추가
- warmup_and_restart.py: 봇 워밍업 및 재시작 스크립트 신규 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 23:09:32 +09:00
4e77a1acf1 LSTM v3 멀티피처, KIS OHLCV 배치, 동적 전략 강화
- deep_learning.py: INPUT_SIZE=7 (close/open/high/low/volume/rsi/macd),
  feature_scaler/target_scaler 분리, ModelRegistry LRU 종목별 격리 (v3 체크포인트)
- kis.py: get_daily_ohlcv() OHLCV 전체 반환, KISAsyncClient 비동기 배치 조회 추가,
  order() 지정가/조건부 주문 지원
- strategy/process.py: ATR/ADX 기반 동적 손절익절, 트레일링 스탑, 포지션 사이징 강화
- config.py: OLLAMA_NUM_THREAD=8 (9800X3D 최적화), LSTM_COOLDOWN/FAST_EPOCHS 환경변수화
- macro.py: 거시경제 지표 계산 개선
- ollama.py: VRAM 여유량 기반 선택적 언로드
- monitor.py: CPU 서킷 브레이커 연속 횟수 조건 추가
- ipc.py: IPC_STALENESS 600초로 확대
- news.py: 비동기 뉴스 수집 개선
- telegram.py, runner.py: 안정성 개선

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 23:08:33 +09:00
37f6d87bec 매매 성과 평가지표 시스템 구현
- modules/utils/performance_db.py 신규: 일별 자산 스냅샷(16:00~16:30) 및
  매매 기록 영구 저장 (PerformanceDB 클래스)
- modules/analysis/evaluator.py 신규: Sharpe/Sortino/MDD/Alpha 등 16개 지표 산출,
  S~F 등급 시스템, Ollama 5명 전문가 패널, 텔레그램 HTML 주간 보고서 (PerformanceEvaluator 클래스)
- modules/bot.py 수정: BUY/SELL 시 perf_db 기록 강화, 금요일 15:35 주간 평가 자동 실행,
  IPC 'evaluate' 명령 처리
- modules/services/telegram_bot/server.py 수정: /evaluate 명령어 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 23:07:34 +09:00
18 changed files with 2435 additions and 309 deletions

View File

@@ -0,0 +1,255 @@
"""
백테스팅 프레임워크 (Phase 3-1)
- 과거 OHLCV 데이터로 전략 시뮬레이션
- 성과지표: Sharpe ratio, MDD, 승률, 평균손익비
- Phase 2 모델 변경 전후 비교 검증용
"""
import json
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
@dataclass
class Trade:
ticker: str
entry_date: int # 데이터 인덱스
entry_price: float
exit_date: int
exit_price: float
qty: int
direction: str = "LONG" # LONG / SHORT
@property
def pnl(self):
if self.direction == "LONG":
return (self.exit_price - self.entry_price) * self.qty
return (self.entry_price - self.exit_price) * self.qty
@property
def pnl_pct(self):
return (self.exit_price - self.entry_price) / self.entry_price * 100
@dataclass
class BacktestResult:
total_return_pct: float
sharpe_ratio: float
max_drawdown_pct: float
win_rate: float
avg_win_pct: float
avg_loss_pct: float
profit_factor: float
total_trades: int
winning_trades: int
losing_trades: int
trades: List[Trade] = field(default_factory=list)
def summary(self) -> str:
lines = [
"=" * 50,
"📊 백테스팅 결과",
"=" * 50,
f"총 수익률: {self.total_return_pct:+.2f}%",
f"Sharpe Ratio: {self.sharpe_ratio:.3f}",
f"Max Drawdown: {self.max_drawdown_pct:.2f}%",
f"승률: {self.win_rate:.1f}% ({self.winning_trades}/{self.total_trades})",
f"평균 수익: {self.avg_win_pct:+.2f}%",
f"평균 손실: {self.avg_loss_pct:.2f}%",
f"손익비(PF): {self.profit_factor:.2f}",
"=" * 50,
]
return "\n".join(lines)
class Backtester:
"""
OHLCV 기반 전략 백테스터
사용 예시:
bt = Backtester(initial_capital=10_000_000)
result = bt.run(
ohlcv_data={"close": [...], "high": [...], "low": [...], "volume": [...]},
strategy_fn=my_strategy,
ticker="005930"
)
print(result.summary())
"""
def __init__(self, initial_capital: float = 10_000_000,
commission_rate: float = 0.00015, # 0.015% (증권사 기본)
slippage_rate: float = 0.001): # 0.1% 슬리피지
self.initial_capital = initial_capital
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
def run(self, ohlcv_data: Dict, strategy_fn: Callable,
ticker: str = "UNKNOWN", warmup: int = 60) -> BacktestResult:
"""
단일 종목 백테스팅
Args:
ohlcv_data: {'close':[], 'high':[], 'low':[], 'open':[], 'volume':[]}
strategy_fn: (ohlcv_slice: dict) -> str ("BUY" | "SELL" | "HOLD")
ticker: 종목 코드
warmup: 초기 웜업 기간 (기술지표 안정화)
Returns:
BacktestResult
"""
closes = np.array(ohlcv_data.get('close', []), dtype=float)
highs = np.array(ohlcv_data.get('high', closes), dtype=float)
lows = np.array(ohlcv_data.get('low', closes), dtype=float)
volumes = np.array(ohlcv_data.get('volume', np.zeros_like(closes)), dtype=float)
n = len(closes)
if n < warmup + 10:
return self._empty_result()
capital = self.initial_capital
position = 0 # 보유 수량
entry_price = 0.0
entry_idx = 0
equity_curve = [capital]
trades: List[Trade] = []
for i in range(warmup, n):
# 전략 함수에 현재까지의 슬라이스 전달
slice_data = {
'close': closes[:i+1].tolist(),
'high': highs[:i+1].tolist(),
'low': lows[:i+1].tolist(),
'volume': volumes[:i+1].tolist(),
}
signal = "HOLD"
try:
signal = strategy_fn(slice_data)
except Exception:
pass
price = closes[i]
buy_price = price * (1 + self.slippage_rate) # 슬리피지 포함 매수가
sell_price = price * (1 - self.slippage_rate) # 슬리피지 포함 매도가
if signal == "BUY" and position == 0:
# 전액 투자 (수수료 포함)
qty = int(capital / (buy_price * (1 + self.commission_rate)))
if qty > 0:
cost = qty * buy_price * (1 + self.commission_rate)
capital -= cost
position = qty
entry_price = buy_price
entry_idx = i
elif signal == "SELL" and position > 0:
proceeds = position * sell_price * (1 - self.commission_rate)
capital += proceeds
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
entry_price=entry_price,
exit_date=i,
exit_price=sell_price,
qty=position
))
position = 0
entry_price = 0.0
# 자산 추적
current_equity = capital + (position * closes[i] if position > 0 else 0)
equity_curve.append(current_equity)
# 미청산 포지션 강제 종료
if position > 0:
last_price = closes[-1] * (1 - self.slippage_rate)
proceeds = position * last_price * (1 - self.commission_rate)
capital += proceeds
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
entry_price=entry_price,
exit_date=n - 1,
exit_price=last_price,
qty=position
))
equity_curve[-1] = capital
return self._compute_metrics(equity_curve, trades)
def run_multi(self, ohlcv_dict: Dict[str, Dict], strategy_fn: Callable,
warmup: int = 60) -> Dict[str, BacktestResult]:
"""여러 종목 백테스팅"""
results = {}
for ticker, ohlcv_data in ohlcv_dict.items():
results[ticker] = self.run(ohlcv_data, strategy_fn, ticker, warmup)
return results
def _compute_metrics(self, equity_curve: List[float], trades: List[Trade]) -> BacktestResult:
equity = np.array(equity_curve, dtype=float)
total_return_pct = (equity[-1] / equity[0] - 1) * 100
# Sharpe Ratio (일별 수익률 기준, 연율화)
daily_returns = np.diff(equity) / equity[:-1]
if daily_returns.std() > 0:
sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252)
else:
sharpe = 0.0
# Max Drawdown
peak = np.maximum.accumulate(equity)
drawdowns = (equity - peak) / (peak + 1e-9) * 100
max_drawdown = abs(drawdowns.min())
# 승률 / 손익비
wins = [t for t in trades if t.pnl_pct > 0]
losses = [t for t in trades if t.pnl_pct <= 0]
win_rate = len(wins) / len(trades) * 100 if trades else 0
avg_win = np.mean([t.pnl_pct for t in wins]) if wins else 0
avg_loss = np.mean([t.pnl_pct for t in losses]) if losses else 0
total_win = sum(t.pnl for t in wins)
total_loss = abs(sum(t.pnl for t in losses))
profit_factor = total_win / (total_loss + 1e-9)
return BacktestResult(
total_return_pct=round(total_return_pct, 2),
sharpe_ratio=round(sharpe, 3),
max_drawdown_pct=round(max_drawdown, 2),
win_rate=round(win_rate, 1),
avg_win_pct=round(avg_win, 2),
avg_loss_pct=round(avg_loss, 2),
profit_factor=round(profit_factor, 3),
total_trades=len(trades),
winning_trades=len(wins),
losing_trades=len(losses),
trades=trades
)
def _empty_result(self) -> BacktestResult:
return BacktestResult(
total_return_pct=0.0, sharpe_ratio=0.0, max_drawdown_pct=0.0,
win_rate=0.0, avg_win_pct=0.0, avg_loss_pct=0.0,
profit_factor=0.0, total_trades=0, winning_trades=0, losing_trades=0
)
def compare_strategies(ohlcv_data: Dict, strategies: Dict[str, Callable],
initial_capital: float = 10_000_000) -> Dict[str, BacktestResult]:
"""
여러 전략 동시 비교
Args:
strategies: {"전략명": strategy_fn, ...}
Returns:
{"전략명": BacktestResult, ...}
"""
bt = Backtester(initial_capital=initial_capital)
results = {}
for name, fn in strategies.items():
results[name] = bt.run(ohlcv_data, fn)
print(f"\n[{name}]")
print(results[name].summary())
return results

View File

@@ -1,8 +1,10 @@
import os
import time
import pickle
import torch
import torch.nn as nn
import numpy as np
from collections import OrderedDict
from sklearn.preprocessing import MinMaxScaler
from modules.config import Config
@@ -10,6 +12,10 @@ from modules.config import Config
# cuDNN 벤치마크 활성화 (고정 입력 크기에 대해 최적 커널 자동 선택)
torch.backends.cudnn.benchmark = True
# 체크포인트 버전 (피처 수 변경 시 기존 모델 자동 재학습)
CHECKPOINT_VERSION = "v3"
INPUT_SIZE = 7 # close, open, high, low, volume_norm, rsi_14, macd_hist
class Attention(nn.Module):
def __init__(self, hidden_size):
@@ -23,7 +29,7 @@ class Attention(nn.Module):
class AdvancedLSTM(nn.Module):
def __init__(self, input_size=1, hidden_size=512, num_layers=4, output_size=1, dropout=0.3):
def __init__(self, input_size=INPUT_SIZE, hidden_size=512, num_layers=4, output_size=1, dropout=0.3):
super(AdvancedLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
@@ -49,8 +55,24 @@ class AdvancedLSTM(nn.Module):
return out
def _get_free_vram_gb():
"""현재 GPU VRAM 여유량(GB) 반환"""
try:
if torch.cuda.is_available():
total = torch.cuda.get_device_properties(0).total_memory / 1024**3
reserved = torch.cuda.memory_reserved(0) / 1024**3
return total - reserved
except Exception:
pass
return 99.0 # CUDA 없으면 언로드 불필요
def _unload_ollama():
"""LSTM 학습 전 Ollama 모델 언로드하여 GPU 메모리 확보"""
"""LSTM 학습 전 Ollama 모델 언로드 (VRAM < 2GB 여유일 때만)"""
free_vram = _get_free_vram_gb()
if free_vram >= 2.0:
print(f"[AI] Ollama 언로드 생략 (VRAM 여유 {free_vram:.1f}GB >= 2GB)")
return
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
@@ -58,14 +80,17 @@ def _unload_ollama():
"model": Config.OLLAMA_MODEL,
"keep_alive": 0
}, timeout=5)
print("[AI] Ollama model unloaded (GPU memory freed)")
time.sleep(1) # 메모리 해제 대기
print(f"[AI] Ollama 언로드 (VRAM 여유 {free_vram:.1f}GB)")
time.sleep(1)
except Exception:
pass
def _preload_ollama():
"""LSTM 학습 후 Ollama 모델 다시 로드"""
"""LSTM 학습 후 Ollama 모델 리로드 (언로드했던 경우만)"""
free_vram = _get_free_vram_gb()
if free_vram >= 2.0:
return # 언로드하지 않았으니 리로드도 불필요
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
@@ -86,25 +111,99 @@ def _log_gpu_memory(tag=""):
print(f"[AI GPU {tag}] Allocated: {allocated:.2f}GB / Reserved: {reserved:.2f}GB")
def _compute_rsi(close_arr, period=14):
"""RSI 계산 (numpy 기반)"""
if len(close_arr) < period + 1:
return np.full(len(close_arr), 50.0)
delta = np.diff(close_arr, prepend=close_arr[0])
gain = np.where(delta > 0, delta, 0.0)
loss = np.where(delta < 0, -delta, 0.0)
alpha = 1.0 / period
rsi_arr = np.zeros(len(close_arr))
avg_gain = gain[0]
avg_loss = loss[0]
for i in range(1, len(close_arr)):
avg_gain = alpha * gain[i] + (1 - alpha) * avg_gain
avg_loss = alpha * loss[i] + (1 - alpha) * avg_loss
rs = avg_gain / (avg_loss + 1e-9)
rsi_arr[i] = 100 - (100 / (1 + rs))
return rsi_arr
def _compute_macd_hist(close_arr, fast=12, slow=26, signal=9):
"""MACD Histogram 계산 (numpy 기반)"""
if len(close_arr) < slow + signal:
return np.zeros(len(close_arr))
ema_fast = np.zeros(len(close_arr))
ema_slow = np.zeros(len(close_arr))
alpha_f = 2 / (fast + 1)
alpha_s = 2 / (slow + 1)
ema_fast[0] = close_arr[0]
ema_slow[0] = close_arr[0]
for i in range(1, len(close_arr)):
ema_fast[i] = alpha_f * close_arr[i] + (1 - alpha_f) * ema_fast[i - 1]
ema_slow[i] = alpha_s * close_arr[i] + (1 - alpha_s) * ema_slow[i - 1]
macd = ema_fast - ema_slow
sig = np.zeros(len(close_arr))
alpha_sig = 2 / (signal + 1)
sig[0] = macd[0]
for i in range(1, len(close_arr)):
sig[i] = alpha_sig * macd[i] + (1 - alpha_sig) * sig[i - 1]
return macd - sig
def _build_feature_matrix(ohlcv_data):
"""
OHLCV 딕셔너리 → 7차원 numpy 피처 행렬 생성
피처: [close, open, high, low, volume_norm, rsi_14, macd_hist]
"""
close = np.array(ohlcv_data.get('close', []), dtype=np.float64)
open_ = np.array(ohlcv_data.get('open', close), dtype=np.float64)
high = np.array(ohlcv_data.get('high', close), dtype=np.float64)
low = np.array(ohlcv_data.get('low', close), dtype=np.float64)
volume = np.array(ohlcv_data.get('volume', []), dtype=np.float64)
n = len(close)
if len(open_) != n: open_ = close.copy()
if len(high) != n: high = close.copy()
if len(low) != n: low = close.copy()
# 거래량 정규화 (최대값 기준, 0이면 0)
if len(volume) == n and volume.max() > 0:
volume_norm = volume / (volume.max() + 1e-9)
else:
volume_norm = np.zeros(n)
rsi = _compute_rsi(close, period=14)
rsi_norm = rsi / 100.0 # 0~1 정규화
macd_hist = _compute_macd_hist(close)
# 7차원 피처 스택 (n x 7)
features = np.column_stack([close, open_, high, low, volume_norm, rsi_norm, macd_hist])
return features # shape: (n, 7)
class PricePredictor:
"""
주가 예측 Deep Learning 모델 (GPU 최적화)
- 전체 학습 데이터를 GPU에 상주 (CPU↔GPU 전송 최소화)
- Ollama 모델 언로드/리로드로 GPU 메모리 확보
- Early Stopping + Mixed Precision (FP16)
- 종목별 모델 체크포인트
[v3.0] 주가 예측 Deep Learning 모델 (GPU 최적화)
- 7차원 멀티피처 LSTM (close/open/high/low/vol_norm/rsi/macd_hist)
- feature_scaler(6개) + target_scaler(1개) 분리
- 데이터 누수 수정: train 데이터로만 fit
- 체크포인트에 scaler 상태 저장/로드
- VRAM 여유량 기반 Ollama 언로드 (충분하면 생략)
"""
def __init__(self):
self.scaler = MinMaxScaler(feature_range=(0, 1))
self.feature_scaler = MinMaxScaler(feature_range=(0, 1)) # 입력 6개 피처
self.target_scaler = MinMaxScaler(feature_range=(0, 1)) # 타겟: close 가격
self.hidden_size = 512
self.num_layers = 4
self.model = AdvancedLSTM(input_size=1, hidden_size=self.hidden_size,
self.model = AdvancedLSTM(input_size=INPUT_SIZE, hidden_size=self.hidden_size,
num_layers=self.num_layers, dropout=0.3)
self.criterion = nn.MSELoss()
# CUDA 설정
self.device = torch.device('cpu')
self.use_amp = False
@@ -116,19 +215,18 @@ class PricePredictor:
self.device = torch.device('cuda')
self.model.to(self.device)
# Mixed Precision (Compute Capability >= 7.0: Volta 이상)
if torch.cuda.get_device_capability(0)[0] >= 7:
self.use_amp = True
# Warm-up: CUDA 커널 컴파일 유도
dummy = torch.zeros(1, 60, 1, device=self.device)
# Warm-up
dummy = torch.zeros(1, 60, INPUT_SIZE, device=self.device)
with torch.no_grad():
_ = self.model(dummy)
torch.cuda.synchronize()
print(f"[AI] GPU Mode: {gpu_name} ({vram_gb:.1f}GB)"
f" | FP16={'ON' if self.use_amp else 'OFF'}"
f" | cuDNN Benchmark=ON")
f" | Features={INPUT_SIZE} | cuDNN Benchmark=ON")
_log_gpu_memory("init")
except Exception as e:
@@ -139,9 +237,8 @@ class PricePredictor:
print("[AI] No CUDA GPU detected. Running on CPU.")
self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4)
# [v2.0] Learning Rate Scheduler (ReduceLROnPlateau: val_loss 정체 시 lr 감소)
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6
)
self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None
@@ -149,7 +246,6 @@ class PricePredictor:
self.max_epochs = 200
self.seq_length = 60
self.patience = 15
# [v2.0] Gradient Clipping 값 (exploding gradient 방지)
self.max_grad_norm = 1.0
self.training_status = {
@@ -164,7 +260,8 @@ class PricePredictor:
try:
gpu_name = torch.cuda.get_device_name(0)
vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}")
print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}"
f" | Features={INPUT_SIZE}")
return True
except Exception as e:
print(f"[AI Check] GPU Error: {e}")
@@ -173,16 +270,25 @@ class PricePredictor:
return False
def _get_checkpoint_path(self, ticker):
return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm.pt")
return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm_{CHECKPOINT_VERSION}.pt")
def _load_checkpoint(self, ticker):
path = self._get_checkpoint_path(ticker)
if os.path.exists(path):
try:
checkpoint = torch.load(path, map_location=self.device, weights_only=True)
checkpoint = torch.load(path, map_location=self.device, weights_only=False)
# 버전 체크 (v3 이전 체크포인트는 재학습)
if checkpoint.get('version', '') != CHECKPOINT_VERSION:
print(f"[AI] Checkpoint version mismatch ({ticker}): 재학습 필요")
return False
self.model.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
print(f"[AI] Checkpoint loaded: {ticker}")
# scaler 복원
if 'feature_scaler' in checkpoint:
self.feature_scaler = pickle.loads(checkpoint['feature_scaler'])
if 'target_scaler' in checkpoint:
self.target_scaler = pickle.loads(checkpoint['target_scaler'])
print(f"[AI] Checkpoint loaded: {ticker} (v3, 7-features)")
return True
except Exception as e:
print(f"[AI] Checkpoint load failed ({ticker}): {e}")
@@ -192,21 +298,127 @@ class PricePredictor:
path = self._get_checkpoint_path(ticker)
try:
torch.save({
'version': CHECKPOINT_VERSION,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'epoch': epoch,
'loss': loss
'loss': loss,
'feature_scaler': pickle.dumps(self.feature_scaler),
'target_scaler': pickle.dumps(self.target_scaler)
}, path)
except Exception as e:
print(f"[AI] Checkpoint save failed ({ticker}): {e}")
def train_and_predict(self, prices, forecast_days=1, ticker=None):
def _is_checkpoint_fresh(self, ticker, max_age=None):
"""체크포인트가 최근에 학습된 것인지 확인 (쿨다운 판단)"""
if not ticker:
return False
path = self._get_checkpoint_path(ticker)
if not os.path.exists(path):
return False
age = time.time() - os.path.getmtime(path)
threshold = max_age if max_age is not None else Config.LSTM_COOLDOWN
return age < threshold
def _prepare_scaled_features(self, features, split_point):
"""
피처 스케일링 (누수 방지: train split으로만 fit)
features: (n, 7) numpy array
split_point: train/val 분리 인덱스
Returns:
scaled_features: (n, 7) 스케일된 전체 피처
scaled_close: (n, 1) 스케일된 close (타겟용)
"""
# 6개 입력 피처 (close 포함 open/high/low/vol_norm/rsi/macd_hist)
# + 타겟은 close만 별도 scaler
input_features = features[:, :] # (n, 7) 전체 7개 피처 입력용
target_close = features[:, 0:1] # (n, 1) close만 타겟용
# train 데이터로만 fit (데이터 누수 방지)
self.feature_scaler.fit(input_features[:split_point])
self.target_scaler.fit(target_close[:split_point])
scaled_features = self.feature_scaler.transform(input_features)
scaled_close = self.target_scaler.transform(target_close)
return scaled_features, scaled_close
def _predict_only(self, ohlcv_data, ticker=None):
"""학습 없이 현재 체크포인트로만 빠른 예측 (쿨다운 중 사용)"""
prices = ohlcv_data.get('close', []) if isinstance(ohlcv_data, dict) else ohlcv_data
if len(prices) < self.seq_length:
return None
try:
features = _build_feature_matrix(
ohlcv_data if isinstance(ohlcv_data, dict) else {'close': prices}
)
if len(features) < self.seq_length:
return None
scaled = self.feature_scaler.transform(features)
last_seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0).to(self.device)
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
pred_scaled = self.model(last_seq)
else:
pred_scaled = self.model(last_seq)
predicted_price = self.target_scaler.inverse_transform(
pred_scaled.cpu().float().numpy())[0][0]
current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
cached_loss = self.training_status.get("loss", 0.5)
print(f"[AI] {ticker or '?'}: 쿨다운 중 → 캐시 예측 사용 "
f"({predicted_price:.0f} / {change_rate:+.2f}%)")
return {
"current": current_price,
"predicted": float(predicted_price),
"change_rate": round(change_rate, 2),
"trend": trend,
"loss": cached_loss,
"val_loss": cached_loss,
"confidence": 0.62,
"epochs": 0,
"device": str(self.device),
"lr": self.optimizer.param_groups[0]['lr'],
"cached": True
}
except Exception as e:
print(f"[AI] _predict_only 실패 ({ticker}): {e}")
return None
def train_and_predict(self, ohlcv_data, forecast_days=1, ticker=None):
"""
[v3.0] 7차원 멀티피처 LSTM 학습 + 예측
ohlcv_data: dict {'close':[], 'open':[], 'high':[], 'low':[], 'volume':[]}
또는 list (하위 호환: close 리스트)
"""
# 하위 호환: list 형태
if isinstance(ohlcv_data, list):
ohlcv_data = {'close': ohlcv_data}
prices = ohlcv_data.get('close', [])
if len(prices) < (self.seq_length + 10):
return None
# ===== 쿨다운 체크 =====
if self._is_checkpoint_fresh(ticker):
has_ckpt = self._load_checkpoint(ticker)
if has_ckpt:
result = self._predict_only(ohlcv_data, ticker)
if result:
return result
is_gpu = self.device.type == 'cuda'
# --- Ollama 모델 언로드 (GPU 메모리 확보) ---
# VRAM 여유량 기반 Ollama 언로드
if is_gpu:
_unload_ollama()
torch.cuda.empty_cache()
@@ -214,54 +426,54 @@ class PricePredictor:
t_start = time.time()
# 1. 데이터 전처리 (CPU에서 numpy 작업)
data = np.array(prices).reshape(-1, 1)
scaled_data = self.scaler.fit_transform(data)
# 1. 피처 행렬 구성 (n, 7)
features = _build_feature_matrix(ohlcv_data)
if len(features) < (self.seq_length + 10):
return None
n = len(features)
split_point = int(n * 0.8)
# 2. 스케일링 (train 데이터로만 fit → 누수 방지)
scaled_features, scaled_close = self._prepare_scaled_features(features, split_point)
# 3. 시퀀스 생성
x_seqs, y_seqs = [], []
for i in range(len(scaled_data) - self.seq_length):
x_seqs.append(scaled_data[i:i + self.seq_length])
y_seqs.append(scaled_data[i + self.seq_length])
for i in range(n - self.seq_length):
x_seqs.append(scaled_features[i:i + self.seq_length]) # (seq, 7)
y_seqs.append(scaled_close[i + self.seq_length]) # (1,)
# 2. 텐서 생성 → 즉시 GPU로 이동 (이후 CPU↔GPU 전송 없음)
x_all = torch.FloatTensor(np.array(x_seqs)).to(self.device)
y_all = torch.FloatTensor(np.array(y_seqs)).to(self.device)
# Validation split (80/20)
split_idx = int(len(x_all) * 0.8)
x_train = x_all[:split_idx]
y_train = y_all[:split_idx]
x_val = x_all[split_idx:]
y_val = y_all[split_idx:]
# validation split (80/20)
seq_split = int(len(x_all) * 0.8)
x_train, y_train = x_all[:seq_split], y_all[:seq_split]
x_val, y_val = x_all[seq_split:], y_all[seq_split:]
dataset_size = len(x_train)
# 3. 체크포인트 로드
has_checkpoint = False
if ticker:
has_checkpoint = self._load_checkpoint(ticker)
max_epochs = 50 if has_checkpoint else self.max_epochs
# 4. 체크포인트 로드
has_checkpoint = self._load_checkpoint(ticker) if ticker else False
max_epochs = Config.LSTM_FAST_EPOCHS if has_checkpoint else self.max_epochs
# 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용)
# [v2.0] LR Scheduler 리셋
self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6
)
# 5. 학습
self.model.train()
self.training_status["is_training"] = True
if ticker:
self.training_status["current_ticker"] = ticker
best_val_loss = float('inf')
best_model_state = None # [v2.0] Best Model 저장
best_model_state = None
patience_counter = 0
final_loss = 0.0
actual_epochs = 0
for epoch in range(max_epochs):
# --- Training (GPU 내에서 셔플 + 미니배치) ---
perm = torch.randperm(dataset_size, device=self.device)
x_shuffled = x_train[perm]
y_shuffled = y_train[perm]
@@ -281,7 +493,6 @@ class PricePredictor:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
self.scaler_amp.scale(loss).backward()
# [v2.0] Gradient Clipping (AMP 호환)
self.scaler_amp.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.scaler_amp.step(self.optimizer)
@@ -290,7 +501,6 @@ class PricePredictor:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
# [v2.0] Gradient Clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.optimizer.step()
@@ -299,7 +509,6 @@ class PricePredictor:
train_loss = epoch_loss / max(1, steps)
# --- Validation (GPU에서 직접 수행) ---
self.model.eval()
with torch.no_grad():
if self.use_amp:
@@ -311,23 +520,19 @@ class PricePredictor:
val_loss = self.criterion(val_out, y_val).item()
self.model.train()
# [v2.0] LR Scheduler step (val_loss 기반)
self.lr_scheduler.step(val_loss)
final_loss = train_loss
actual_epochs = epoch + 1
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
# [v2.0] Best model 상태 저장 (overfitting 방지)
best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()}
else:
patience_counter += 1
if patience_counter >= self.patience:
break
# [v2.0] Best model 복원 (early stopping 후 최적 상태로 복구)
if best_model_state:
self.model.load_state_dict(best_model_state)
@@ -340,17 +545,17 @@ class PricePredictor:
elapsed = time.time() - t_start
print(f"[AI] {ticker or '?'}: {actual_epochs} epochs in {elapsed:.1f}s"
f" | loss={final_loss:.6f} val={best_val_loss:.6f}"
f" | device={self.device}")
f" | device={self.device} | features={INPUT_SIZE}")
# 5. 체크포인트 저장
# 6. 체크포인트 저장 (scaler 포함)
if ticker:
self._save_checkpoint(ticker, actual_epochs, final_loss)
# 6. 예측
# 7. 예측
self.model.eval()
with torch.no_grad():
last_seq = torch.FloatTensor(
scaled_data[-self.seq_length:]
scaled_features[-self.seq_length:]
).unsqueeze(0).to(self.device)
if self.use_amp:
@@ -359,12 +564,11 @@ class PricePredictor:
else:
predicted_scaled = self.model(last_seq)
predicted_price = self.scaler.inverse_transform(
predicted_price = self.target_scaler.inverse_transform(
predicted_scaled.cpu().float().numpy())[0][0]
# 7. GPU 메모리 정리 + Ollama 리로드
# 8. GPU 정리 + Ollama 리로드
if is_gpu:
# 학습 중간 텐서 해제
del x_all, y_all, x_train, y_train, x_val, y_val
torch.cuda.empty_cache()
_log_gpu_memory("post-train")
@@ -374,27 +578,22 @@ class PricePredictor:
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
# [v2.0] 개선된 신뢰도 계산
# 1. 학습 손실 기반 (낮을수록 좋음)
# 신뢰도 계산
loss_confidence = 1.0 / (1.0 + (best_val_loss * 50))
# 2. Train/Val 괴리도 (overfitting 감지)
overfit_ratio = final_loss / (best_val_loss + 1e-9)
if overfit_ratio < 0.5:
# Train loss가 Val loss보다 훨씬 낮음 = overfitting
overfit_penalty = 0.7
elif overfit_ratio > 2.0:
# Train loss가 Val loss보다 훨씬 높음 = underfitting
overfit_penalty = 0.8
else:
overfit_penalty = 1.0
# 3. 에포크 수 기반 (너무 적거나 많으면 불신)
epoch_factor = 1.0
if actual_epochs < 10:
epoch_factor = 0.6 # 학습 부족
epoch_factor = 0.6
elif actual_epochs >= max_epochs:
epoch_factor = 0.8 # 수렴 실패
epoch_factor = 0.8
confidence = min(0.95, loss_confidence * overfit_penalty * epoch_factor)
@@ -411,28 +610,32 @@ class PricePredictor:
"lr": self.optimizer.param_groups[0]['lr']
}
def batch_predict(self, prices_dict):
def batch_predict(self, ohlcv_dict):
"""여러 종목을 배치로 예측 (체크포인트 있는 종목만)"""
results = {}
seqs = []
metas = []
for ticker, prices in prices_dict.items():
for ticker, ohlcv_data in ohlcv_dict.items():
if isinstance(ohlcv_data, list):
ohlcv_data = {'close': ohlcv_data}
prices = ohlcv_data.get('close', [])
if len(prices) < (self.seq_length + 10):
results[ticker] = None
continue
data = np.array(prices).reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
seq = torch.FloatTensor(scaled_data[-self.seq_length:]).unsqueeze(0)
seqs.append(seq)
metas.append((ticker, scaler, prices[-1]))
try:
features = _build_feature_matrix(ohlcv_data)
scaled = self.feature_scaler.transform(features)
seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0)
seqs.append(seq)
metas.append((ticker, prices[-1]))
except Exception:
results[ticker] = None
if not seqs:
return results
# 배치로 합쳐서 한번에 GPU 추론
batch = torch.cat(seqs, dim=0).to(self.device)
self.model.eval()
@@ -445,8 +648,8 @@ class PricePredictor:
preds_cpu = preds.cpu().float().numpy()
for i, (ticker, scaler, current_price) in enumerate(metas):
predicted_price = scaler.inverse_transform(preds_cpu[i:i+1])[0][0]
for i, (ticker, current_price) in enumerate(metas):
predicted_price = self.target_scaler.inverse_transform(preds_cpu[i:i+1])[0][0]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
@@ -461,3 +664,52 @@ class PricePredictor:
torch.cuda.empty_cache()
return results
class ModelRegistry:
"""
[v3.0] 종목별 LSTM 모델 격리 (LRU 퇴출, max_models=5)
- 싱글톤 패턴: 워커 프로세스마다 하나의 Registry 유지
- 16GB VRAM에서 LSTM 5개(~250MB) + Ollama 7B(~4GB) 동시 적재 가능
"""
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self, max_models=5):
self.max_models = max_models
self._predictors = OrderedDict() # ticker -> PricePredictor (LRU 순서)
print(f"[ModelRegistry] Initialized (max_models={max_models})")
def get_predictor(self, ticker):
"""종목별 PricePredictor 반환 (없으면 생성, LRU 관리)"""
if ticker in self._predictors:
# LRU: 접근 시 맨 뒤로 이동
self._predictors.move_to_end(ticker)
return self._predictors[ticker]
# 용량 초과 시 가장 오래된 것 퇴출
if len(self._predictors) >= self.max_models:
oldest_ticker, oldest_pred = self._predictors.popitem(last=False)
print(f"[ModelRegistry] Evicted {oldest_ticker} (LRU, {len(self._predictors)}/{self.max_models})")
del oldest_pred
if torch.cuda.is_available():
torch.cuda.empty_cache()
predictor = PricePredictor()
self._predictors[ticker] = predictor
print(f"[ModelRegistry] Created predictor for {ticker} ({len(self._predictors)}/{self.max_models})")
return predictor
def has_predictor(self, ticker):
return ticker in self._predictors
def clear(self):
"""모든 모델 해제"""
self._predictors.clear()
if torch.cuda.is_available():
torch.cuda.empty_cache()

View File

@@ -0,0 +1,230 @@
"""
앙상블 예측 모듈 (Phase 3-2)
- LSTM + 기술지표 + LLM 감성 → 적응형 가중치
- 과거 매매 결과 기반 가중치 자동 조정
- process.py의 하드코딩된 w_tech/w_news/w_ai 대체
"""
import os
import json
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, Optional
from modules.config import Config
@dataclass
class SignalWeights:
"""앙상블 가중치"""
tech: float = 0.35
sentiment: float = 0.30
lstm: float = 0.35
def normalize(self):
total = self.tech + self.sentiment + self.lstm
if total > 0:
self.tech /= total
self.sentiment /= total
self.lstm /= total
return self
def to_dict(self):
return {"tech": self.tech, "sentiment": self.sentiment, "lstm": self.lstm}
@classmethod
def from_dict(cls, d):
return cls(tech=d.get("tech", 0.35),
sentiment=d.get("sentiment", 0.30),
lstm=d.get("lstm", 0.35))
class AdaptiveEnsemble:
"""
적응형 앙상블 가중치 관리자
핵심 로직:
1. 종목별 최근 N 매매의 결과를 추적
2. 어떤 신호가 정확했는지 소급 평가
3. 정확도가 높은 신호의 가중치를 점진적으로 증가
4. 시장 상황(ADX, 거시경제) 반영한 컨텍스트별 가중치 분리
"""
def __init__(self, history_file=None, max_history=50):
self.max_history = max_history
self.history_file = history_file or os.path.join(
Config.DATA_DIR, "ensemble_history.json"
)
# {ticker: [{"tech": f, "sentiment": f, "lstm": f, "decision": str, "outcome": float}, ...]}
self._trade_history: Dict[str, list] = {}
# {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger"
self._context_weights: Dict[str, SignalWeights] = {
"strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30),
"sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
"danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
"default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
}
self._load()
def _load(self):
if os.path.exists(self.history_file):
try:
with open(self.history_file, "r", encoding="utf-8") as f:
data = json.load(f)
self._trade_history = data.get("history", {})
weights_raw = data.get("weights", {})
for ctx, w in weights_raw.items():
self._context_weights[ctx] = SignalWeights.from_dict(w)
except Exception as e:
print(f"[Ensemble] Load failed: {e}")
def _save(self):
try:
data = {
"history": {k: v[-self.max_history:] for k, v in self._trade_history.items()},
"weights": {ctx: w.to_dict() for ctx, w in self._context_weights.items()}
}
with open(self.history_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[Ensemble] Save failed: {e}")
def get_context(self, adx: float, macro_state: str) -> str:
"""현재 시장 컨텍스트 결정"""
if macro_state == "DANGER":
return "danger"
if adx >= 25:
return "strong_trend"
if adx < 20:
return "sideways"
return "default"
def get_weights(self, ticker: str, adx: float = 20.0,
macro_state: str = "SAFE",
ai_confidence: float = 0.5) -> SignalWeights:
"""
종목 + 시장 컨텍스트에 맞는 가중치 반환
1. 기본: 컨텍스트별 기준 가중치
2. AI 신뢰도 높으면 lstm 가중치 보정
3. 종목별 학습 결과 반영
"""
context = self.get_context(adx, macro_state)
base = self._context_weights.get(context, self._context_weights["default"])
# 적응형 조정: 해당 종목의 과거 성과 반영
ticker_history = self._trade_history.get(ticker, [])
adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm)
if len(ticker_history) >= 5:
# 최근 5회 신호별 정확도 평가
recent = ticker_history[-10:]
tech_acc = self._accuracy([h["tech_score"] for h in recent],
[h["outcome"] for h in recent])
news_acc = self._accuracy([h["sentiment_score"] for h in recent],
[h["outcome"] for h in recent])
lstm_acc = self._accuracy([h["lstm_score"] for h in recent],
[h["outcome"] for h in recent])
# 정확도 기반 가중치 미세 조정 (±0.1 범위)
alpha = 0.05
adjusted.tech = max(0.1, min(0.6, base.tech + alpha * (tech_acc - 0.5)))
adjusted.sentiment = max(0.1, min(0.6, base.sentiment + alpha * (news_acc - 0.5)))
adjusted.lstm = max(0.1, min(0.6, base.lstm + alpha * (lstm_acc - 0.5)))
# AI 신뢰도 보정
if ai_confidence >= 0.85:
adjusted.lstm = min(0.70, adjusted.lstm * 1.3)
elif ai_confidence < 0.5:
adjusted.lstm = max(0.10, adjusted.lstm * 0.7)
return adjusted.normalize()
def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
lstm_score: float, decision: str, outcome_pct: float):
"""
매매 결과 기록 (가중치 학습 데이터)
outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실
"""
if ticker not in self._trade_history:
self._trade_history[ticker] = []
record = {
"tech_score": tech_score,
"sentiment_score": sentiment_score,
"lstm_score": lstm_score,
"decision": decision,
"outcome": outcome_pct
}
self._trade_history[ticker].append(record)
# 히스토리 크기 제한
if len(self._trade_history[ticker]) > self.max_history:
self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]
# 가중치 점진적 업데이트
self._update_weights(ticker)
self._save()
def _update_weights(self, ticker: str):
"""종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트"""
history = self._trade_history.get(ticker, [])
if len(history) < 5:
return
recent = history[-10:]
outcomes = [h["outcome"] for h in recent]
mean_outcome = np.mean(outcomes)
if mean_outcome > 0:
# 전략이 효과적 → 현재 가중치 유지 (강화)
pass
elif mean_outcome < -2.0:
# 손실이 큰 경우 → 기본값으로 리셋
for ctx in self._context_weights:
self._context_weights[ctx] = SignalWeights(
tech=0.35, sentiment=0.30, lstm=0.35)
def compute_ensemble_score(self, tech_score: float, sentiment_score: float,
lstm_score: float, investor_score: float = 0.0,
weights: Optional[SignalWeights] = None) -> float:
"""
앙상블 통합 점수 계산
Args:
weights: 가중치 (None이면 기본값 사용)
"""
if weights is None:
weights = SignalWeights()
total = (weights.tech * tech_score
+ weights.sentiment * sentiment_score
+ weights.lstm * lstm_score)
# 수급 가산점 (최대 +0.15)
total += min(investor_score, 0.15)
return min(1.0, max(0.0, total))
@staticmethod
def _accuracy(scores: list, outcomes: list) -> float:
"""신호와 결과의 상관도 계산 (0.5 = 무관, 1.0 = 완전 일치)"""
if len(scores) < 3:
return 0.5
# 신호가 높을 때 수익, 낮을 때 손실이면 정확
correct = sum(
1 for s, o in zip(scores, outcomes)
if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0)
)
return correct / len(scores)
# 전역 싱글톤
_ensemble_instance: Optional[AdaptiveEnsemble] = None
def get_ensemble() -> AdaptiveEnsemble:
"""워커 프로세스 내 싱글톤 앙상블 관리자"""
global _ensemble_instance
if _ensemble_instance is None:
_ensemble_instance = AdaptiveEnsemble()
return _ensemble_instance

View File

@@ -0,0 +1,421 @@
"""
성과 평가 엔진 - PerformanceEvaluator
기능:
1. compute_metrics() - 핵심 성과 지표 계산
2. get_grade() - 지표별 S/A/B/C/D/F 등급 산출
3. generate_expert_panel() - Ollama LLM 5명 전문가 의견
4. generate_weekly_report() - 텔레그램 HTML 주간 보고서
"""
import json
import math
from datetime import datetime, timedelta
from modules.utils.performance_db import PerformanceDB
class PerformanceEvaluator:
def __init__(self):
self.perf_db = PerformanceDB()
# ─────────────────────────────────────────
# 1. 핵심 지표 계산
# ─────────────────────────────────────────
def compute_metrics(self, snapshots, trades):
"""성과 지표를 딕셔너리로 반환.
Args:
snapshots (list): daily_snapshots 리스트
trades (list): trade_records 리스트
Returns:
dict: 지표 딕셔너리 (또는 {"error": ...})
"""
if not snapshots:
return {"error": "스냅샷 데이터 없음 (운영 시작 후 첫 영업일까지 대기)"}
metrics = {}
# ── 수익률 ──────────────────────────────
initial = snapshots[0].get("total_eval", 0)
current = snapshots[-1].get("total_eval", 0)
metrics["total_return_pct"] = round(
(current - initial) / initial * 100, 2) if initial > 0 else 0.0
cutoff_7 = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
recent_snaps = [s for s in snapshots if s.get("date", "") >= cutoff_7]
if len(recent_snaps) >= 2:
w_init = recent_snaps[0].get("total_eval", 0)
w_curr = recent_snaps[-1].get("total_eval", 0)
metrics["weekly_return_pct"] = round(
(w_curr - w_init) / w_init * 100, 2) if w_init > 0 else 0.0
else:
metrics["weekly_return_pct"] = 0.0
# ── 리스크 지표 ──────────────────────────
daily_returns = [s.get("daily_return_pct", 0.0) / 100.0 for s in snapshots]
if len(daily_returns) >= 2:
mean_daily = sum(daily_returns) / len(daily_returns)
variance = sum((r - mean_daily) ** 2 for r in daily_returns) / len(daily_returns)
std_daily = math.sqrt(variance) if variance > 0 else 0.0
annual_return = mean_daily * 252
# Sharpe Ratio
if std_daily > 0:
metrics["sharpe_ratio"] = round(
annual_return / (std_daily * math.sqrt(252)), 3)
else:
metrics["sharpe_ratio"] = 0.0
# Sortino Ratio (하방 편차만 사용)
downside = [r for r in daily_returns if r < 0]
if downside:
dv = sum(r ** 2 for r in downside) / len(downside)
ds = math.sqrt(dv)
metrics["sortino_ratio"] = round(
annual_return / (ds * math.sqrt(252)), 3) if ds > 0 else 0.0
else:
metrics["sortino_ratio"] = 10.0 # 손실 없음
# Max Drawdown
peak = snapshots[0].get("total_eval", 0)
max_dd = 0.0
for snap in snapshots:
ev = snap.get("total_eval", 0)
if ev > peak:
peak = ev
if peak > 0:
dd = (peak - ev) / peak * 100
if dd > max_dd:
max_dd = dd
metrics["max_drawdown_pct"] = round(max_dd, 2)
# Calmar Ratio
ann_pct = annual_return * 100
metrics["calmar_ratio"] = round(
ann_pct / max_dd, 3) if max_dd > 0 else 0.0
else:
metrics["sharpe_ratio"] = 0.0
metrics["sortino_ratio"] = 0.0
metrics["max_drawdown_pct"] = 0.0
metrics["calmar_ratio"] = 0.0
# ── 매매 지표 ─────────────────────────────
closed = [t for t in trades
if t.get("action") == "BUY" and t.get("outcome_return_pct") is not None]
if closed:
wins = [t for t in closed if t.get("outcome_return_pct", 0) > 0]
losses = [t for t in closed if t.get("outcome_return_pct", 0) <= 0]
metrics["win_rate_pct"] = round(
len(wins) / len(closed) * 100, 1)
total_profit = sum(t["outcome_return_pct"] for t in wins)
total_loss = abs(sum(t["outcome_return_pct"] for t in losses))
metrics["profit_factor"] = round(
total_profit / total_loss, 3) if total_loss > 0 else 10.0
hd_list = [t["holding_days"] for t in closed if t.get("holding_days") is not None]
metrics["avg_holding_days"] = round(
sum(hd_list) / len(hd_list), 1) if hd_list else 0.0
else:
metrics["win_rate_pct"] = 0.0
metrics["profit_factor"] = 0.0
metrics["avg_holding_days"] = 0.0
metrics["total_trades"] = len(closed)
# ── 벤치마크 Alpha ────────────────────────
kospi_vals = [s.get("benchmark_kospi_close") for s in snapshots]
kospi_valid = [k for k in kospi_vals if k is not None]
if len(kospi_valid) >= 2:
kospi_ret = (kospi_valid[-1] - kospi_valid[0]) / kospi_valid[0] * 100
metrics["alpha"] = round(metrics["total_return_pct"] - kospi_ret, 2)
metrics["kospi_return_pct"] = round(kospi_ret, 2)
else:
metrics["alpha"] = 0.0
metrics["kospi_return_pct"] = 0.0
# ── AI 품질 지표 ──────────────────────────
if closed:
# LSTM 방향 정확도
correct = 0
direction_n = 0
for t in closed:
pred = t.get("ai_prediction_change")
outcome = t.get("outcome_return_pct")
if pred is not None and outcome is not None:
if (pred > 0) == (outcome > 0):
correct += 1
direction_n += 1
metrics["lstm_direction_accuracy"] = round(
correct / direction_n * 100, 1) if direction_n > 0 else 0.0
# 신호별 수익 상관도
outcomes = [t.get("outcome_return_pct", 0) for t in closed]
def pearson(xs, ys):
n = len(xs)
if n < 2:
return 0.0
mx = sum(xs) / n
my = sum(ys) / n
num = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
denom_x = sum((x - mx) ** 2 for x in xs)
denom_y = sum((y - my) ** 2 for y in ys)
denom = math.sqrt(denom_x * denom_y)
return num / denom if denom > 0 else 0.0
corr_tech = pearson([t.get("tech_score", 0) for t in closed], outcomes)
corr_sent = pearson([t.get("sentiment_score", 0) for t in closed], outcomes)
corr_lstm = pearson([t.get("lstm_score", 0) for t in closed], outcomes)
metrics["signal_correlation"] = {
"tech": round(corr_tech, 3),
"sentiment": round(corr_sent, 3),
"lstm": round(corr_lstm, 3)
}
metrics["best_signal_source"] = max(
["tech", "sentiment", "lstm"],
key=lambda k: abs(metrics["signal_correlation"][k])
)
else:
metrics["lstm_direction_accuracy"] = 0.0
metrics["signal_correlation"] = {"tech": 0.0, "sentiment": 0.0, "lstm": 0.0}
metrics["best_signal_source"] = "unknown"
metrics["snapshot_count"] = len(snapshots)
return metrics
# ─────────────────────────────────────────
# 2. 등급 산출
# ─────────────────────────────────────────
def get_grade(self, metric, value):
"""지표 이름과 값으로 S/A/B/C/D/F 등급 반환."""
# MDD는 낮을수록 좋음
if metric == "max_drawdown_pct":
thresholds = [(5, "S"), (10, "A"), (15, "B"), (20, "C"), (30, "D")]
for threshold, grade in thresholds:
if value < threshold:
return grade
return "F"
grade_rules = {
"sharpe_ratio": [(2.0, "S"), (1.5, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
"sortino_ratio": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.0, "D")],
"win_rate_pct": [(70, "S"), (60, "A"), (50, "B"), (40, "C"), (30, "D")],
"profit_factor": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.5, "D")],
"alpha": [(15, "S"), (10, "A"), (5, "B"), (0, "C"), (-5, "D")],
"total_return_pct": [(30, "S"), (20, "A"), (10, "B"), (0, "C"), (-10, "D")],
"weekly_return_pct": [(5, "S"), (3, "A"), (1, "B"), (0, "C"), (-1, "D")],
"lstm_direction_accuracy":[(70, "S"), (60, "A"), (55, "B"), (50, "C"), (40, "D")],
"calmar_ratio": [(3.0, "S"), (2.0, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
}
thresholds = grade_rules.get(metric, [])
for threshold, grade in thresholds:
if value >= threshold:
return grade
return "F"
# ─────────────────────────────────────────
# 3. 전문가 패널 (Ollama LLM)
# ─────────────────────────────────────────
def generate_expert_panel(self, metrics):
"""5명의 전문가 역할로 Ollama에 평가를 요청.
Returns:
list[dict]: [{role, grade, comment, suggestion}, ...]
"""
from modules.services.ollama import OllamaManager
ollama = OllamaManager()
sig_corr = metrics.get("signal_correlation", {})
experts = [
{
"role": "Risk Manager",
"focus": "risk level assessment and bankruptcy risk",
"data": (
f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
f"Sortino={metrics.get('sortino_ratio', 0):.2f}, "
f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%, "
f"Calmar={metrics.get('calmar_ratio', 0):.2f}"
)
},
{
"role": "Fund Manager",
"focus": "alpha generation vs market benchmark",
"data": (
f"TotalReturn={metrics.get('total_return_pct', 0):.2f}%, "
f"Alpha={metrics.get('alpha', 0):.2f}%, "
f"KOSPI={metrics.get('kospi_return_pct', 0):.2f}%, "
f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%"
)
},
{
"role": "Quant Analyst",
"focus": "AI model validity and signal quality",
"data": (
f"LSTM_Accuracy={metrics.get('lstm_direction_accuracy', 0):.1f}%, "
f"TechCorr={sig_corr.get('tech', 0):.3f}, "
f"SentCorr={sig_corr.get('sentiment', 0):.3f}, "
f"LSTMCorr={sig_corr.get('lstm', 0):.3f}, "
f"BestSignal={metrics.get('best_signal_source', 'N/A')}"
)
},
{
"role": "Trader",
"focus": "trading strategy effectiveness",
"data": (
f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
f"ProfitFactor={metrics.get('profit_factor', 0):.2f}, "
f"AvgHolding={metrics.get('avg_holding_days', 0):.1f}days, "
f"TotalTrades={metrics.get('total_trades', 0)}"
)
},
{
"role": "Portfolio PM",
"focus": "overall strategy direction and sustainability",
"data": (
f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%, "
f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
f"Alpha={metrics.get('alpha', 0):.2f}%, "
f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%"
)
}
]
results = []
for exp in experts:
prompt = (
f"You are a professional {exp['role']} evaluating an AI stock trading bot. "
f"Your focus: {exp['focus']}. "
f"Performance data: {exp['data']}. "
f"Respond ONLY with valid JSON (no markdown, no extra text): "
f"{{\"grade\":\"S|A|B|C|D|F\","
f"\"comment\":\"1 sentence evaluation in Korean\","
f"\"suggestion\":\"1 sentence improvement tip in Korean\"}}"
)
try:
resp = ollama.request_inference(prompt)
if not resp:
raise ValueError("Empty response from Ollama")
data = json.loads(resp)
results.append({
"role": exp["role"],
"grade": data.get("grade", "C"),
"comment": data.get("comment", "(응답 없음)"),
"suggestion": data.get("suggestion", "데이터 축적 필요")
})
except Exception as e:
print(f"[Evaluator] Expert panel [{exp['role']}] error: {e}")
results.append({
"role": exp["role"],
"grade": "C",
"comment": "평가 데이터가 부족합니다.",
"suggestion": "더 많은 거래 데이터 축적 후 재평가를 권장합니다."
})
return results
# ─────────────────────────────────────────
# 4. 주간 보고서 생성
# ─────────────────────────────────────────
def generate_weekly_report(self):
"""주간 성과 보고서 (텔레그램 HTML 형식) 반환."""
snapshots = self.perf_db.load_snapshots(days=7)
# 매매 완료 건은 30일치 사용 (주간 거래 수가 적을 수 있음)
trades = self.perf_db.load_trades(days=30)
metrics = self.compute_metrics(snapshots, trades)
if "error" in metrics:
return (
f"<b>[주간 성과 평가 보고서]</b>\n"
f"⚠️ {metrics['error']}\n"
f"<i>매일 오전 09:05~09:15에 스냅샷이 저장됩니다.</i>"
)
# 등급 계산
g_sharpe = self.get_grade("sharpe_ratio", metrics.get("sharpe_ratio", 0))
g_win = self.get_grade("win_rate_pct", metrics.get("win_rate_pct", 0))
g_mdd = self.get_grade("max_drawdown_pct", metrics.get("max_drawdown_pct", 0))
g_alpha = self.get_grade("alpha", metrics.get("alpha", 0))
g_weekly = self.get_grade("weekly_return_pct", metrics.get("weekly_return_pct", 0))
g_lstm = self.get_grade("lstm_direction_accuracy",
metrics.get("lstm_direction_accuracy", 0))
# 종합 등급 (Sharpe/Win/MDD/Alpha 평균)
grade_map = {"S": 5, "A": 4, "B": 3, "C": 2, "D": 1, "F": 0}
grade_rev = {v: k for k, v in grade_map.items()}
key_grades = [grade_map[g] for g in [g_sharpe, g_win, g_mdd, g_alpha]]
overall_grade = grade_rev[round(sum(key_grades) / len(key_grades))]
# 전문가 패널 (Ollama 호출)
try:
experts = self.generate_expert_panel(metrics)
except Exception as e:
print(f"[Evaluator] Expert panel skipped: {e}")
experts = []
now_str = datetime.now().strftime("%Y/%m/%d %H:%M")
corr = metrics.get("signal_correlation", {})
report = (
f"📊 <b>[주간 성과 평가 보고서]</b> <code>{now_str}</code>\n"
f"━━━━━━━━━━━━━━━━━━━━━━\n"
f"\n<b>■ 수익률</b>\n"
f" 주간: <code>{metrics.get('weekly_return_pct', 0):+.2f}%</code> [{g_weekly}]"
f" 누적: <code>{metrics.get('total_return_pct', 0):+.2f}%</code>\n"
f" Alpha: <code>{metrics.get('alpha', 0):+.2f}%</code> [{g_alpha}]"
f" vs KOSPI <code>{metrics.get('kospi_return_pct', 0):+.2f}%</code>\n"
f"\n<b>■ 리스크</b>\n"
f" Sharpe: <code>{metrics.get('sharpe_ratio', 0):.2f}</code> [{g_sharpe}]"
f" Sortino: <code>{metrics.get('sortino_ratio', 0):.2f}</code>\n"
f" MDD: <code>{metrics.get('max_drawdown_pct', 0):.1f}%</code> [{g_mdd}]"
f" Calmar: <code>{metrics.get('calmar_ratio', 0):.2f}</code>\n"
f"\n<b>■ 매매 통계</b>\n"
f" 승률: <code>{metrics.get('win_rate_pct', 0):.1f}%</code> [{g_win}]"
f" PF: <code>{metrics.get('profit_factor', 0):.2f}</code>\n"
f" 평균보유: <code>{metrics.get('avg_holding_days', 0):.1f}일</code>"
f" 완료매매: <code>{metrics.get('total_trades', 0)}건</code>\n"
f"\n<b>■ AI 품질</b>\n"
f" LSTM 방향정확도: <code>{metrics.get('lstm_direction_accuracy', 0):.1f}%</code>"
f" [{g_lstm}]\n"
f" 신호 상관도 — Tech: <code>{corr.get('tech', 0):.3f}</code>"
f" Sent: <code>{corr.get('sentiment', 0):.3f}</code>"
f" LSTM: <code>{corr.get('lstm', 0):.3f}</code>\n"
f" 최고기여 신호: <code>{metrics.get('best_signal_source', 'N/A')}</code>\n"
)
if experts:
role_icons = {
"Risk Manager": "🛡",
"Fund Manager": "💼",
"Quant Analyst": "🧮",
"Trader": "📈",
"Portfolio PM": "🏦"
}
report += "\n<b>■ 전문가 패널 의견</b>\n"
for exp in experts:
icon = role_icons.get(exp["role"], "👤")
report += (
f"{icon} <b>{exp['role']}</b> [{exp['grade']}]\n"
f" {exp['comment']}\n"
f" 💡 {exp['suggestion']}\n"
)
report += (
f"\n━━━━━━━━━━━━━━━━━━━━━━\n"
f"🏆 <b>종합 등급: [{overall_grade}]</b>\n"
f"<i>스냅샷 {metrics.get('snapshot_count', 0)}일 | 완료매매 {metrics.get('total_trades', 0)}건 기준</i>"
)
return report

View File

@@ -22,31 +22,32 @@ class MacroAnalyzer:
"""
indicators = {
"KOSPI": "0001",
"KOSDAQ": "1001"
"KOSDAQ": "1001",
"KOSPI200": "0028",
}
results = {}
risk_score = 0
print("🌍 [Macro] Fetching market indices via KIS API...")
for name, code in indicators.items():
data = kis_client.get_current_index(code)
time.sleep(0.6) # Rate Limit 방지 (초당 2회 제한)
if data:
price = data['price']
if data and data.get('price', 0) != 0:
results[name] = data
print(f" - {name}: {data['price']} ({data['change']}%)")
# 리스크 평가 로직 (2% 이상 폭락 장이면 위험)
change = data['change']
results[name] = {"price": price, "change": change}
print(f" - {name}: {price} ({change}%)")
# 리스크 평가 로직 (단순화: 2% 이상 폭락 장이면 위험)
if change <= -2.0:
risk_score += 2 # 패닉 상태
elif change <= -1.0:
risk_score += 1 # 주의 상태
else:
results[name] = {"price": 0, "change": 0}
results[name] = {"price": 0, "change": 0, "high": 0, "low": 0,
"prev_close": 0, "volume": 0, "trade_value": 0}
# [신규] 시장 스트레스 지수(MSI) 추가
time.sleep(0.6)

View File

@@ -13,6 +13,7 @@ from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor
from modules.utils.performance_db import PerformanceDB
from modules.strategy.process import analyze_stock_process, calculate_position_size
try:
@@ -47,10 +48,6 @@ class AutoTradingBot:
self.kis = KISClient()
self.news = AsyncNewsCollector()
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
try:
list(self.executor.map(lambda x: x, range(1)))
except Exception:
pass
self.messenger = TelegramMessenger()
self.theme_manager = ThemeManager()
@@ -95,6 +92,12 @@ class AutoTradingBot:
self.history_file = Config.HISTORY_FILE
self.load_trade_history()
# 7-1. 성과 DB 및 평가 플래그
self.perf_db = PerformanceDB()
self.weekly_eval_sent = False
self._snapshot_taken_today = False
self._pending_evaluate = False
# 8. AI 하드웨어 점검
from modules.analysis.deep_learning import PricePredictor
PricePredictor.verify_hardware()
@@ -130,6 +133,49 @@ class AutoTradingBot:
except Exception:
return {}
def _take_daily_snapshot(self, macro_status, balance):
"""일별 자산 스냅샷을 perf_db에 저장 (09:05~09:15 호출)."""
try:
total_eval_snap = int(balance.get("total_eval", 0))
deposit_snap = int(balance.get("deposit", 0))
holdings_count_snap = len([
h for h in balance.get("holdings", [])
if int(h.get("qty", 0)) > 0
])
# KOSPI 현재가 (macro_status 지표에서 추출)
kospi_close = None
try:
indicators = macro_status.get("indicators", {})
kospi_price = float(indicators.get("KOSPI", {}).get("price", 0))
if kospi_price > 0:
kospi_close = kospi_price
except Exception:
pass
self.perf_db.save_daily_snapshot(
total_eval_snap, deposit_snap, holdings_count_snap, kospi_close)
self._snapshot_taken_today = True
except Exception as e:
print(f"[Bot] Daily snapshot error: {e}")
async def _run_weekly_evaluation(self):
"""주간 성과 평가 실행 후 텔레그램으로 전송."""
try:
from modules.analysis.evaluator import PerformanceEvaluator
evaluator = PerformanceEvaluator()
loop = asyncio.get_running_loop()
# Ollama 호출이 동기 블로킹이므로 executor에서 실행
report = await loop.run_in_executor(None, evaluator.generate_weekly_report)
if len(report) > 4000:
report = report[:4000] + "\n... (일부 생략)"
self.messenger.send_message(report)
self.weekly_eval_sent = True
print("[Bot] Weekly evaluation report sent.")
except Exception as e:
print(f"[Bot] Weekly evaluation error: {e}")
self.messenger.send_message(f"[Bot] 주간 평가 오류: {e}")
def _load_peak_prices(self):
"""트레일링 스탑용 최고가 데이터 로드"""
peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
@@ -251,12 +297,20 @@ class AutoTradingBot:
except Exception as e:
self.messenger.send_message(f"Watchlist update failed: {e}")
elif command == 'evaluate':
self._pending_evaluate = True
async def run_cycle(self):
now = datetime.now()
# 0. 명령 큐 폴링
self._process_commands()
# 0-1. 즉시 평가 요청 처리 (IPC 'evaluate' 명령)
if self._pending_evaluate:
self._pending_evaluate = False
await self._run_weekly_evaluation()
# 1. 거시경제 분석
macro_status = MacroAnalyzer.get_macro_status(self.kis)
self.last_macro_status = macro_status
@@ -316,6 +370,8 @@ class AutoTradingBot:
self.daily_trade_history = []
self.save_trade_history()
self.report_sent = False
self.weekly_eval_sent = False
self._snapshot_taken_today = False
self.discovered_stocks.clear()
self.watchlist_updated_today = False
# 전일 최고가 초기화 (보유하지 않는 종목)
@@ -328,9 +384,28 @@ class AutoTradingBot:
if not (9 <= now.hour < 15 or (now.hour == 15 and now.minute < 30)):
if now.hour == 15 and now.minute >= 40:
self.send_daily_report()
# 일별 스냅샷 (16:00~16:30, 당일 최종 포트폴리오 가치 기록)
if now.hour == 16 and now.minute <= 30 and not self._snapshot_taken_today:
try:
balance_snap = self.kis.get_balance()
self._take_daily_snapshot(macro_status, balance_snap)
except Exception as e:
print(f"[Bot] Snapshot error: {e}")
# 주간 평가 (금요일 15:35~15:45, 장 마감 직후)
if (now.weekday() == 4 and now.hour == 15
and 35 <= now.minute <= 45 and not self.weekly_eval_sent):
await self._run_weekly_evaluation()
# 장 외 시간에는 서킷 브레이커도 리셋
self.monitor.reset_circuit()
print("[Bot] Market Closed. Waiting...")
return
# [서킷 브레이커] CPU 과부하 시 분석 사이클 일시 중단
if self.monitor.is_cpu_critical():
print("[Bot] ⛔ CPU Circuit Breaker 발동 중. 분석 사이클 스킵.")
return
cycle_start_time = time.time()
print(f"[Bot] Cycle Start: {now.strftime('%H:%M:%S')}")
# 7. 종목 분석 및 매매
@@ -365,14 +440,30 @@ class AutoTradingBot:
tracking_deposit = int(balance.get("deposit", 0))
# [v3.0] 비동기 OHLCV + 투자자 동향 배치 조회
tickers_list = list(target_dict.keys())
ohlcv_batch = {}
investor_batch = {}
if self.kis_async and tickers_list:
try:
print(f"[Bot] 비동기 OHLCV 배치 조회: {len(tickers_list)}종목")
ohlcv_batch = await self.kis_async.get_daily_ohlcv_batch(tickers_list)
investor_batch = await self.kis_async.get_investor_trends_batch(tickers_list)
except Exception as e:
print(f"[Bot] 비동기 배치 조회 실패: {e} -> 동기 fallback")
ohlcv_batch = {}
investor_batch = {}
try:
for ticker, name in target_dict.items():
prices = self.kis.get_daily_price(ticker)
if not prices:
# OHLCV 데이터 획득 (배치 결과 우선, 실패 시 동기 fallback)
ohlcv_data = ohlcv_batch.get(ticker)
if not ohlcv_data or not ohlcv_data.get('close'):
ohlcv_data = self.kis.get_daily_ohlcv(ticker)
if not ohlcv_data or not ohlcv_data.get('close'):
continue
investor_trend = self.kis.get_investor_trend(ticker)
# [v2.0] 보유 정보 전달 (분석 워커에서 동적 손절/익절 사용)
holding_info = None
if ticker in current_holdings:
@@ -385,16 +476,22 @@ class AutoTradingBot:
'peak_price': self.peak_prices.get(ticker, float(h.get('current_price', 0)))
}
# investor_trend fallback
investor_trend = investor_batch.get(ticker)
if investor_trend is None:
investor_trend = self.kis.get_investor_trend(ticker)
future = self.executor.submit(
analyze_stock_process, ticker, prices, news_data,
analyze_stock_process, ticker, ohlcv_data, news_data,
investor_trend, macro_status, holding_info)
analysis_tasks.append(future)
# 결과 처리
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
for future in analysis_tasks:
try:
res = await loop.run_in_executor(None, future.result)
# 240초 타임아웃: LSTM 학습 + Ollama 추론 시간 고려
res = await loop.run_in_executor(None, lambda f=future: f.result(240))
ticker = res['ticker']
ticker_name = target_dict.get(ticker, 'Unknown')
print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})"
@@ -458,6 +555,24 @@ class AutoTradingBot:
"reason": reason
})
self.save_trade_history()
# 성과 DB 기록
pred = res.get("prediction") or {}
self.perf_db.save_trade_record(
action="BUY", ticker=ticker, name=ticker_name,
qty=qty, price=current_price,
scores_dict={
"tech": res.get("tech", 0.0),
"sentiment": res.get("sentiment", 0.0),
"lstm_score": res.get("lstm_score", 0.0),
"score": res.get("score", 0.0),
"ai_confidence": res.get("ai_confidence", 0.5),
"prediction_change": pred.get("change_rate", 0.0)
},
reason=reason,
macro_state=macro_status.get("status", "SAFE")
)
tracking_deposit -= required_amount
# 최고가 초기 설정
@@ -484,14 +599,18 @@ class AutoTradingBot:
f" Reason: {reason}")
self.messenger.send_message(msg)
sell_price = float(h.get('current_price', 0))
self.daily_trade_history.append({
"action": "SELL", "name": ticker_name,
"qty": qty, "price": float(h.get('current_price', 0)),
"qty": qty, "price": sell_price,
"yield": yld, "profit": profit_loss,
"reason": reason
})
self.save_trade_history()
# 성과 DB 매도 결과 기록
self.perf_db.close_trade(ticker, sell_price, yld)
# 최고가 기록 삭제
if ticker in self.peak_prices:
del self.peak_prices[ticker]
@@ -510,11 +629,21 @@ class AutoTradingBot:
except Exception as e:
print(f"[Bot] Cycle Loop Error: {e}")
# 사이클 소요시간 로깅 (120초 초과 시 경고)
cycle_elapsed = time.time() - cycle_start_time
if cycle_elapsed > 120:
print(f"[Bot] ⚠️ 사이클 소요 {cycle_elapsed:.0f}초 (120초 초과) → LSTM 쿨다운 활성화 권장")
else:
print(f"[Bot] Cycle Done: {cycle_elapsed:.1f}")
def loop(self):
print(f"[Bot] Module Started (PID: {os.getpid()}) [v2.0]")
print(f"[Bot] Module Started (PID: {os.getpid()}) [v3.0]")
self.messenger.send_message(
"🚀 [Bot Started v2.0]\n"
"개선사항: 동적 손절/익절, 트레일링 스탑, 포지션 사이징, 분석 기반 매도")
"🚀 <b>[Bot Started v3.0]</b>\n"
f"✅ LSTM 쿨다운: {Config.LSTM_COOLDOWN//60}\n"
f"✅ AI 모델: {Config.OLLAMA_MODEL}\n"
f"✅ CPU 서킷브레이커: {Config.CPU_CIRCUIT_BREAKER_THRESHOLD}% 기준\n"
"✅ 동적 손절/익절, 트레일링 스탑, 포지션 사이징")
# 최고가 데이터 로드
self._load_peak_prices()

View File

@@ -12,7 +12,12 @@ class Config:
# 2. NAS 및 AI 서버
NAS_API_URL = os.getenv("NAS_API_URL", "http://192.168.45.54:18500")
OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b-instruct-q8_0")
# [최적화] qwen2.5:7b-instruct-q4_K_M: JSON 정확도↑, 속도↑, VRAM 4GB
# 14B 원하면: qwen2.5:14b-instruct-q4_K_M (VRAM ~9GB, 품질 더 좋음)
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:7b-instruct-q4_K_M")
OLLAMA_NUM_CTX = int(os.getenv("OLLAMA_NUM_CTX", "4096")) # 8192→4096 (2배 속도)
OLLAMA_NUM_PREDICT = int(os.getenv("OLLAMA_NUM_PREDICT", "200")) # 응답 토큰 제한
OLLAMA_NUM_THREAD = int(os.getenv("OLLAMA_NUM_THREAD", "8")) # CPU 스레드 (9800X3D 최적화)
# 3. KIS 한국투자증권
KIS_ENV_TYPE = os.getenv("KIS_ENV_TYPE", "virtual").lower()
@@ -53,7 +58,7 @@ class Config:
# 7. IPC 설정
SHM_NAME = "web_ai_bot_ipc"
SHM_SIZE = 131072 # 128KB
IPC_STALENESS = 120 # 120초 (메인 봇 사이클 60초 + 여유)
IPC_STALENESS = 600 # 600초 (LSTM 분석 사이클이 길어도 portfolio 명령어 정상 작동)
# 8. GPU 설정
VRAM_WARNING_THRESHOLD = 12.0 # GB (14 → 12로 조기 경고)
@@ -65,6 +70,16 @@ class Config:
# 10. 타임아웃 등
HTTP_TIMEOUT = 10
# 11. LSTM 학습 최적화
# 동일 종목을 이 시간(초) 내에 재학습하지 않음 → CPU/GPU 절약
LSTM_COOLDOWN = int(os.getenv("LSTM_COOLDOWN", "1200")) # 20분
# 체크포인트가 있을 때 빠른 재학습 에포크 수 (기존 50 → 30)
LSTM_FAST_EPOCHS = int(os.getenv("LSTM_FAST_EPOCHS", "30"))
# 12. CPU 서킷 브레이커
CPU_CIRCUIT_BREAKER_THRESHOLD = 92 # CPU% 이상 시 분석 스킵
CPU_CIRCUIT_BREAKER_CONSECUTIVE = 2 # 연속 N회 초과 시 발동
@staticmethod
def validate():
"""필수 설정 검증"""

View File

@@ -271,38 +271,48 @@ class KISClient:
except Exception as e:
return {"error": str(e)}
def order(self, ticker, qty, buy_sell, price=0):
"""주문 (시장가)
def order(self, ticker, qty, buy_sell, price=0, order_type="market"):
"""주문
buy_sell: 'BUY' or 'SELL'
order_type: 'market'(시장가), 'limit'(지정가), 'conditional'(조건부지정가)
price: 지정가일 때 주문 가격 (market이면 무시)
"""
self._throttle()
self.ensure_token()
# 모의투자/실전 TR ID 구분
# 매수: VTTC0802U / TTTC0802U
# 매도: VTTC0801U / TTTC0801U
if buy_sell == 'BUY':
tr_id = "VTTC0802U" if self.is_virtual else "TTTC0802U"
else:
tr_id = "VTTC0801U" if self.is_virtual else "TTTC0801U"
# 주문 구분 코드
# 00: 지정가, 01: 시장가, 03: 최유리지정가, 05: 장전시간외, 06: 장후시간외
if order_type == "limit" and price > 0:
ord_dvsn = "00"
ord_unpr = str(int(price))
order_type_str = f"지정가({price:,.0f})"
elif order_type == "conditional" and price > 0:
ord_dvsn = "03" # 최유리지정가
ord_unpr = str(int(price))
order_type_str = f"조건부({price:,.0f})"
else:
ord_dvsn = "01" # 시장가
ord_unpr = "0"
order_type_str = "시장가"
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash"
# 주문 파라미터
datas = {
"CANO": self.cano,
"ACNT_PRDT_CD": self.acnt_prdt_cd,
"PDNO": ticker,
"ORD_DVSN": "01", # 01: 시장가
"ORD_DVSN": ord_dvsn,
"ORD_QTY": str(qty),
"ORD_UNPR": "0" # 시장가는 0
"ORD_UNPR": ord_unpr
}
# 헤더 준비
headers = self._get_headers(tr_id=tr_id)
# [중요] POST 요청(주문 등) 시 Hash Key 필수
# 단, 모의투자의 경우 일부 상황에서 생략 가능할 수 있으나, 정석대로 포함
hash_key = self.get_hash_key(datas)
if hash_key:
headers["hashkey"] = hash_key
@@ -310,17 +320,17 @@ class KISClient:
print("⚠️ [KIS] Hash Key 생성 실패 (주문 전송 시도)")
try:
print(f"📤 [KIS] 주문 전송: {buy_sell} {ticker} {qty}ea (시장가)")
print(f"📤 [KIS] 주문 전송: {buy_sell} {ticker} {qty}ea ({order_type_str})")
res = requests.post(url, headers=headers, json=datas)
res.raise_for_status()
data = res.json()
print(f"📥 [KIS] 주문 응답 코드(rt_cd): {data['rt_cd']}")
print(f"📥 [KIS] 주문 응답 메시지(msg1): {data['msg1']}")
if data['rt_cd'] != '0':
return {"status": False, "msg": data['msg1'], "rt_cd": data['rt_cd']}
return {"status": True, "msg": "주문 전송 완료", "order_no": data['output']['ODNO'], "rt_cd": data['rt_cd']}
except Exception as e:
return {"status": False, "msg": str(e), "rt_cd": "EXCEPTION"}
@@ -348,34 +358,188 @@ class KISClient:
print(f"❌ 현재가 조회 실패: {e}")
return None
def get_daily_price(self, ticker, period="D"):
"""일별 시세 조회 (기술적 분석용)"""
def _get_daily_ohlcv_by_range(self, ticker, period="D", count=100):
"""기간별시세 API (FHKST03010100) - OHLCV 전체 반환
output2에서 stck_oprc, stck_hgpr, stck_lwpr, stck_clpr, acml_vol 파싱
"""
self._throttle()
self.ensure_token()
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d")
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
headers = self._get_headers(tr_id="FHKST03010100")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": period,
"FID_ORG_ADJ_PRC": "1"
}
try:
res = requests.get(url, headers=headers, params=params,
timeout=Config.HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if data.get('rt_cd') != '0':
return None
output = data.get('output2', [])
if not output:
return None
opens, highs, lows, closes, volumes = [], [], [], [], []
for item in output:
try:
c = int(item.get('stck_clpr', 0) or 0)
o = int(item.get('stck_oprc', 0) or 0)
h = int(item.get('stck_hgpr', 0) or 0)
l = int(item.get('stck_lwpr', 0) or 0)
v = int(item.get('acml_vol', 0) or 0)
if c > 0:
opens.append(o if o > 0 else c)
highs.append(h if h > 0 else c)
lows.append(l if l > 0 else c)
closes.append(c)
volumes.append(v)
except (ValueError, TypeError):
pass
if not closes:
return None
# API는 최신순 → 과거→현재 순으로 변환
opens.reverse(); highs.reverse(); lows.reverse()
closes.reverse(); volumes.reverse()
result = {
'open': opens[-count:],
'high': highs[-count:],
'low': lows[-count:],
'close': closes[-count:],
'volume': volumes[-count:]
}
print(f"[KIS] {ticker} OHLCV: {len(result['close'])}개 ({start_date}~{end_date})")
return result
except Exception as e:
print(f"⚠️ [KIS] OHLCV 조회 실패 ({ticker}): {e}")
return None
def get_daily_ohlcv(self, ticker, period="D", count=100):
"""일별 OHLCV 시세 조회 (기술적 분석 + LSTM 7차원 입력용)
1차: 기간별시세 API OHLCV 파싱 (100일)
2차: 기존 close-only fallback
"""
ohlcv = self._get_daily_ohlcv_by_range(ticker, period, count)
if ohlcv and len(ohlcv['close']) >= 30:
return ohlcv
# fallback: close만 반환 (가짜 OHLCV)
print(f"[KIS] {ticker} OHLCV 실패 → close-only fallback")
prices = self._get_daily_price_by_range(ticker, period, count)
if not prices:
return None
return {
'open': prices, 'high': prices, 'low': prices,
'close': prices, 'volume': []
}
def _get_daily_price_by_range(self, ticker, period="D", count=100):
"""기간별시세 API (FHKST03010100) - 날짜 범위로 최대 100일 데이터 반환
inquire-daily-price(FHKST01010400)가 30일만 반환하는 한계 극복"""
self._throttle()
self.ensure_token()
end_date = datetime.now().strftime("%Y%m%d")
# 영업일 count개 확보를 위해 역일 1.6배 요청 (주말/공휴일 여유)
start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d")
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
headers = self._get_headers(tr_id="FHKST03010100")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": period,
"FID_ORG_ADJ_PRC": "1"
}
try:
res = requests.get(url, headers=headers, params=params,
timeout=Config.HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if data.get('rt_cd') != '0':
return []
# 기간별시세는 output2에 배열로 반환
output = data.get('output2', [])
if not output:
return []
prices = []
for item in output:
clpr = item.get('stck_clpr', '')
if clpr and clpr != '0':
try:
prices.append(int(clpr))
except ValueError:
pass
prices.reverse() # API는 최신순 → 과거→현재 순으로 변환
result = prices[-count:]
print(f"[KIS] {ticker} 기간별시세: {len(result)}"
f"({start_date}~{end_date})")
return result
except Exception as e:
print(f"⚠️ [KIS] 기간별시세 조회 실패 ({ticker}): {e}")
return []
def get_daily_price(self, ticker, period="D", count=100):
"""일별 시세 조회 (기술적 분석 + LSTM용)
1차: 기간별시세 API (100일, LSTM 학습 가능)
2차: 구형 API fallback (30일)
"""
# 1차: 기간별시세 API (FHKST03010100) - 100일
prices = self._get_daily_price_by_range(ticker, period, count)
if prices and len(prices) >= 30:
return prices
# 2차: 구형 API fallback (FHKST01010400) - 30일
print(f"[KIS] {ticker} 기간별시세 실패 → 구형 API(30일) fallback")
self._throttle()
self.ensure_token()
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price"
headers = self._get_headers(tr_id="FHKST01010400")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_PERIOD_DIV_CODE": period,
"FID_ORG_ADJ_PRC": "1" # 수정주가
"FID_ORG_ADJ_PRC": "1"
}
try:
res = requests.get(url, headers=headers, params=params)
res = requests.get(url, headers=headers, params=params,
timeout=Config.HTTP_TIMEOUT)
res.raise_for_status()
data = res.json()
if data['rt_cd'] != '0':
if data.get('rt_cd') != '0':
return []
# 과거 데이터부터 오도록 정렬 필요할 수 있음 (API는 최신순)
# output 리스트: [ {stck_clpr: 종가, ...}, ... ]
prices = [int(item['stck_clpr']) for item in data['output']]
prices.reverse() # 과거 -> 현재 순으로 정렬
prices = [int(item['stck_clpr']) for item in data['output']
if item.get('stck_clpr')]
prices.reverse()
return prices
except Exception as e:
print(f"❌ 일별 시세 조회 실패: {e}")
print(f"❌ 일별 시세 조회 실패 ({ticker}): {e}")
return []
def get_volume_rank(self, limit=5):
@@ -437,9 +601,18 @@ class KISClient:
data = self._request_api("GET", endpoint, "FHKUP03500100", params=params)
if data['rt_cd'] != '0':
return None
o = data['output']
def _f(val): return float(val) if val else 0.0
def _i(val): return int(float(val)) if val else 0
return {
"price": float(data['output']['bstp_nmix_prpr']), # 현재지수
"change": float(data['output']['bstp_nmix_prdy_ctrt']) # 등락률(%)
"price": _f(o.get('bstp_nmix_prpr')), # 현재지수
"change": _f(o.get('bstp_nmix_prdy_ctrt')), # 등락률(%)
"change_val": _f(o.get('bstp_nmix_prdy_vrss')), # 전일 대비 포인트
"high": _f(o.get('bstp_nmix_hgpr')), # 장중 고가
"low": _f(o.get('bstp_nmix_lwpr')), # 장중 저가
"prev_close": _f(o.get('prdy_nmix')), # 전일 종가
"volume": _i(o.get('acml_vol')), # 누적 거래량(천주)
"trade_value": _i(o.get('acml_tr_pbmn')), # 누적 거래대금(백만원)
}
except Exception as e:
print(f"❌ 지수 조회 실패({ticker}): {e}")
@@ -533,9 +706,8 @@ class KISAsyncClient:
return None
async def get_daily_price_async(self, ticker):
"""비동기 일별 시세 조회"""
"""비동기 일별 시세 조회 (close only, 하위 호환)"""
import aiohttp
import asyncio
self.sync.ensure_token()
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price"
@@ -555,6 +727,52 @@ class KISAsyncClient:
return prices
return []
async def get_daily_ohlcv_async(self, ticker, count=100):
"""비동기 OHLCV 조회 (기간별시세 API 사용)"""
import aiohttp
from datetime import datetime, timedelta
self.sync.ensure_token()
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d")
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
headers = self.sync._get_headers(tr_id="FHKST03010100")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1"
}
async with aiohttp.ClientSession() as session:
data = await self._async_get(session, url, headers, params)
if data and data.get('rt_cd') == '0':
output = data.get('output2', [])
opens, highs, lows, closes, volumes = [], [], [], [], []
for item in output:
try:
c = int(item.get('stck_clpr', 0) or 0)
if c > 0:
opens.append(int(item.get('stck_oprc', 0) or c))
highs.append(int(item.get('stck_hgpr', 0) or c))
lows.append(int(item.get('stck_lwpr', 0) or c))
closes.append(c)
volumes.append(int(item.get('acml_vol', 0) or 0))
except (ValueError, TypeError):
pass
if closes:
opens.reverse(); highs.reverse(); lows.reverse()
closes.reverse(); volumes.reverse()
return {
'open': opens[-count:], 'high': highs[-count:],
'low': lows[-count:], 'close': closes[-count:],
'volume': volumes[-count:]
}
return None
async def get_investor_trend_async(self, ticker):
"""비동기 투자자 동향 조회"""
import aiohttp
@@ -582,7 +800,7 @@ class KISAsyncClient:
return None
async def get_daily_prices_batch(self, tickers):
"""여러 종목의 일별 시세를 병렬로 조회"""
"""여러 종목의 일별 시세(close only)를 병렬로 조회 (하위 호환)"""
import aiohttp
import asyncio
@@ -592,7 +810,6 @@ class KISAsyncClient:
async with aiohttp.ClientSession() as session:
tasks = []
for i, ticker in enumerate(tickers):
# rate limit: 0.5초 간격으로 요청 생성
if i > 0:
await asyncio.sleep(self.min_interval)
@@ -617,6 +834,67 @@ class KISAsyncClient:
return results
async def get_daily_ohlcv_batch(self, tickers, count=100):
"""여러 종목의 OHLCV를 병렬로 조회 (기간별시세 API)"""
import aiohttp
import asyncio
from datetime import datetime, timedelta
self.sync.ensure_token()
results = {}
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=int(count * 1.6))).strftime("%Y%m%d")
async with aiohttp.ClientSession() as session:
tasks = []
for i, ticker in enumerate(tickers):
if i > 0:
await asyncio.sleep(self.min_interval)
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
headers = self.sync._get_headers(tr_id="FHKST03010100")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1"
}
tasks.append((ticker, self._async_get(session, url, headers, params)))
for ticker, task in tasks:
data = await task
if data and data.get('rt_cd') == '0':
output = data.get('output2', [])
opens, highs, lows, closes, volumes = [], [], [], [], []
for item in output:
try:
c = int(item.get('stck_clpr', 0) or 0)
if c > 0:
opens.append(int(item.get('stck_oprc', 0) or c))
highs.append(int(item.get('stck_hgpr', 0) or c))
lows.append(int(item.get('stck_lwpr', 0) or c))
closes.append(c)
volumes.append(int(item.get('acml_vol', 0) or 0))
except (ValueError, TypeError):
pass
if closes:
opens.reverse(); highs.reverse(); lows.reverse()
closes.reverse(); volumes.reverse()
results[ticker] = {
'open': opens[-count:], 'high': highs[-count:],
'low': lows[-count:], 'close': closes[-count:],
'volume': volumes[-count:]
}
else:
results[ticker] = None
else:
results[ticker] = None
return results
async def get_investor_trends_batch(self, tickers):
"""여러 종목의 투자자 동향을 병렬로 조회"""
import aiohttp

View File

@@ -28,6 +28,7 @@ class AsyncNewsCollector:
self._cache = None
self._cache_time = 0
self._cache_ttl = 300 # 5분
self._stock_cache = {} # {stock_name: (items, timestamp)}
def get_market_news(self, query="주식 시장"):
"""동기 인터페이스 (하위 호환)"""
@@ -62,11 +63,43 @@ class AsyncNewsCollector:
self._cache_time = now
return items
except ImportError:
# aiohttp 미설치 시 동기 fallback
return self.get_market_news(query)
except Exception as e:
print(f"[News Async] Collection failed: {e}")
# 캐시가 있으면 반환, 없으면 동기 fallback
if self._cache:
return self._cache
return self.get_market_news(query)
async def get_stock_news_async(self, stock_name, max_items=3):
"""종목별 뉴스 수집 (5분 캐싱)
stock_name: 종목 이름 (e.g. '삼성전자', 'SK하이닉스')
"""
now = time.time()
cached = self._stock_cache.get(stock_name)
if cached and (now - cached[1]) < self._cache_ttl:
return cached[0]
try:
import aiohttp
import urllib.parse
query = urllib.parse.quote(f"{stock_name} 주가")
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
items = []
for item in root.findall(".//item")[:max_items]:
title_el = item.find("title")
if title_el is not None and title_el.text:
items.append({"title": title_el.text, "source": "Google News"})
self._stock_cache[stock_name] = (items, now)
return items
except Exception as e:
print(f"[News] 종목 뉴스 수집 실패 ({stock_name}): {e}")
return []
def clear_stock_cache(self):
"""종목 뉴스 캐시 전체 초기화"""
self._stock_cache.clear()

View File

@@ -113,20 +113,24 @@ class OllamaManager:
"model": self.model_name,
"prompt": prompt,
"stream": False,
"format": "json", # JSON 강제
"format": "json",
"options": {
"num_ctx": 8192, # [5070Ti 최적화] 컨텍스트 크기 2배 증가 (4096 -> 8192)
"temperature": 0.2, # 분석 일관성 유지
"num_gpu": 1, # GPU 사용 명시
"num_thread": 8 # CPU 스레드 수 (9800X3D 활용)
"num_ctx": Config.OLLAMA_NUM_CTX, # 4096 (속도 2배)
"num_predict": Config.OLLAMA_NUM_PREDICT, # 응답 토큰 제한
"temperature": 0.1, # 더 결정론적 (JSON 파싱 안정성)
"num_gpu": 1,
"num_thread": Config.OLLAMA_NUM_THREAD # Config 설정값 (기본 8)
},
"keep_alive": "10m" # [5070Ti 최적화] 10분간 유지 (메모리 여유 있음)
"keep_alive": "5m" # 5분 유지 (불필요한 VRAM 점유 줄임)
}
try:
response = requests.post(self.generate_url, json=payload, timeout=180) # 타임아웃 증가
response = requests.post(self.generate_url, json=payload, timeout=90) # 180→90초
response.raise_for_status()
return response.json().get('response')
except requests.exceptions.Timeout:
print(f"❌ Inference Timeout (90s): {self.model_name}")
return None
except Exception as e:
print(f"❌ Inference Error: {e}")
return None

View File

@@ -23,7 +23,7 @@ class TelegramMessenger:
payload = {
"chat_id": self.chat_id,
"text": message,
"parse_mode": "Markdown"
"parse_mode": "HTML"
}
try:
requests.post(url, json=payload, timeout=5)

View File

@@ -30,6 +30,9 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even
# IPC 초기화 (shared memory + command queue)
ipc = SharedIPC(lock=ipc_lock, command_queue=command_queue)
conflict_retries = 0
MAX_CONFLICT_RETRIES = 10
while True:
# shutdown 체크
if shutdown_event and shutdown_event.is_set():
@@ -51,6 +54,7 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even
if bot_server.should_restart:
print("[Telegram Bot] Restarting instance...")
conflict_retries = 0 # 정상 재시작 시 카운터 리셋
time.sleep(1)
continue
else:
@@ -61,11 +65,21 @@ def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_even
print("[Telegram Bot] Stopped by user")
break
except Exception as e:
if "Conflict" not in str(e):
if "Conflict" in str(e):
conflict_retries += 1
if conflict_retries >= MAX_CONFLICT_RETRIES:
print(f"[Telegram Bot] Conflict max retries ({MAX_CONFLICT_RETRIES}) reached. Exiting.")
break
wait_secs = min(5 * conflict_retries, 30)
print(f"[Telegram Bot] Conflict detected. Waiting {wait_secs}s before retry "
f"({conflict_retries}/{MAX_CONFLICT_RETRIES})...")
time.sleep(wait_secs)
continue
else:
print(f"[Telegram Bot] Error: {e}")
import traceback
traceback.print_exc()
break
break
# 정리
ipc.cleanup()

View File

@@ -7,9 +7,17 @@ import logging
import subprocess
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# [디버깅] 파일 로깅 추가
log_file = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
"telegram_bot.log")
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
level=logging.INFO,
handlers=[logging.StreamHandler(), file_handler]
)
logging.getLogger("httpx").setLevel(logging.WARNING)
@@ -42,6 +50,7 @@ class TelegramBotServer:
return self.bot_instance is not None
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
logging.info(f"[Command] /start from user {update.effective_user.id}")
await update.message.reply_text(
"<b>AI Trading Bot Command Center</b>\n"
"명령어 목록:\n"
@@ -51,7 +60,8 @@ class TelegramBotServer:
"/update_watchlist - Watchlist 즉시 업데이트\n"
"/macro - 거시경제 지표 및 시장 위험도\n"
"/system - PC 리소스(CPU/GPU) 상태\n"
"/ai - AI 모델 학습 상태 조회\n\n"
"/ai - AI 모델 학습 상태 조회\n"
"/evaluate - 즉시 성과 평가 보고서 생성\n\n"
"<b>[관리 명령어]</b>\n"
"/restart - 메인 봇 재시작 요청\n"
"/exec <code>명령어</code> - 원격 명령어 실행\n"
@@ -60,6 +70,7 @@ class TelegramBotServer:
)
async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
logging.info(f"[Command] /status from user {update.effective_user.id}")
if not self.refresh_bot_instance():
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
@@ -172,34 +183,67 @@ class TelegramBotServer:
await update.message.reply_text("데이터가 아직 수집되지 않았습니다.")
return
status = "SAFE"
msi = indices.get('MSI', 0)
msi = float(indices.get('MSI', 0))
if msi >= 50:
status = "DANGER"
risk_status = "🔴 DANGER"
risk_desc = "시장 극도 불안정 - 매수 중단 권고"
elif msi >= 30:
status = "CAUTION"
risk_status = "🟡 CAUTION"
risk_desc = "시장 불안정 - 보수적 매매 권고"
else:
risk_status = "🟢 SAFE"
risk_desc = "시장 안정 - 정상 매매 가능"
msg = f"<b>Market Risk: {status}</b>\n\n"
from datetime import datetime
now_str = datetime.now().strftime("%m/%d %H:%M")
if 'MSI' in indices:
msg += f"<b>Stress Index:</b> <code>{indices['MSI']}</code>\n"
msg = f"<b>거시경제 지표</b> <code>{now_str}</code>\n"
msg += f"━━━━━━━━━━━━━━━━━━\n"
msg += f"<b>Market Risk:</b> {risk_status}\n"
msg += f"<i>{risk_desc}</i>\n\n"
for k, v in indices.items():
if k != "MSI":
change = float(v.get('change', 0))
price = v.get('price', 0)
if change > 0:
icon = "🔴"
chg_str = f"+{change}"
elif change < 0:
icon = "🔵"
chg_str = f"{change}"
else:
icon = ""
chg_str = f"{change}"
msg += f"{icon} <b>{k}</b>: {price} ({chg_str}%)\n"
# MSI 상세
msi_bar = "" * int(msi / 10) + "" * (10 - int(msi / 10))
msg += f"<b>Stress Index (MSI):</b> <code>{msi:.1f}/100</code>\n"
msg += f"<code>[{msi_bar}]</code>\n\n"
# 지수 상세
index_order = ["KOSPI", "KOSDAQ", "KOSPI200"]
for k in index_order:
if k not in indices:
continue
v = indices[k]
price = float(v.get('price', 0))
change = float(v.get('change', 0))
change_val = float(v.get('change_val', 0))
high = float(v.get('high', 0))
low = float(v.get('low', 0))
prev_close = float(v.get('prev_close', 0))
volume = int(v.get('volume', 0))
if price == 0:
msg += f"⚫ <b>{k}:</b> <i>데이터 없음 (장 마감 후)</i>\n\n"
continue
if change > 0:
icon = "🔴"
chg_str = f"+{change:.2f}% (+{change_val:.2f}pt)"
elif change < 0:
icon = "🔵"
chg_str = f"{change:.2f}% ({change_val:.2f}pt)"
else:
icon = ""
chg_str = f"{change:.2f}%"
msg += f"{icon} <b>{k}:</b> <code>{price:,.2f}</code> {chg_str}\n"
if high and low:
msg += f" 고: <code>{high:,.2f}</code> 저: <code>{low:,.2f}</code>"
if prev_close:
msg += f" 전일종가: <code>{prev_close:,.2f}</code>"
msg += "\n"
if volume:
msg += f" 거래량: <code>{volume:,}천주</code>\n"
msg += "\n"
await update.message.reply_text(msg, parse_mode="HTML")
@@ -256,10 +300,11 @@ class TelegramBotServer:
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
from modules.config import Config
gpu = self.bot_instance.ollama_monitor.get_gpu_status()
msg = "<b>AI Model Status</b>\n"
msg += "* <b>LLM Engine:</b> Ollama (Llama 3.1)\n"
msg += f"* <b>LLM Engine:</b> Ollama ({Config.OLLAMA_MODEL})\n"
msg += f"* <b>Device:</b> {gpu.get('name', 'GPU')}\n"
if gpu:
@@ -349,6 +394,29 @@ class TelegramBotServer:
except Exception as e:
await update.message.reply_text(f"실행 오류: {e}")
async def evaluate_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/evaluate: 즉시 성과 평가 보고서 생성 (LLM 분석 포함)"""
await update.message.reply_text(
"📊 성과 평가를 실행합니다...\n"
"<i>LLM 전문가 패널 분석 포함 시 30초~1분 소요됩니다.</i>",
parse_mode="HTML"
)
try:
from modules.utils.performance_db import PerformanceDB
from modules.analysis.evaluator import PerformanceEvaluator
evaluator = PerformanceEvaluator()
loop = asyncio.get_running_loop()
report = await loop.run_in_executor(None, evaluator.generate_weekly_report)
if len(report) > 4000:
report = report[:4000] + "\n... (일부 생략)"
await update.message.reply_text(report, parse_mode="HTML")
except Exception as e:
logging.error(f"[Command] /evaluate error: {e}")
await update.message.reply_text(f"평가 오류: {e}")
def run(self):
handlers = [
("start", self.start_command),
@@ -359,6 +427,7 @@ class TelegramBotServer:
("macro", self.macro_command),
("system", self.system_command),
("ai", self.ai_status_command),
("evaluate", self.evaluate_command),
("restart", self.restart_command),
("stop", self.stop_command),
("exec", self.exec_command)
@@ -377,6 +446,7 @@ class TelegramBotServer:
self.application.add_error_handler(error_handler)
logging.info("[Telegram] Command Server Started (Shared Memory IPC Mode).")
print("[Telegram] Command Server Started (Shared Memory IPC Mode).")
try:

View File

@@ -3,20 +3,25 @@ import json
import numpy as np
from modules.services.ollama import OllamaManager
from modules.analysis.technical import TechnicalAnalyzer
from modules.analysis.deep_learning import PricePredictor
from modules.analysis.deep_learning import ModelRegistry
# [최적화] 워커 프로세스별 전역 변수 (LSTM 모델 캐싱)
_lstm_predictor = None
# [최적화] 워커 프로세스별 전역 변수 (Ollama 캐싱)
_ollama_manager = None
def get_predictor():
"""워커 프로세스 내에서 PricePredictor 인스턴스를 싱글톤으로 관리"""
global _lstm_predictor
if _lstm_predictor is None:
print(f"[Worker {os.getpid()}] Initializing LSTM Predictor...")
_lstm_predictor = PricePredictor()
print(f"[Worker {os.getpid()}] LSTM Device: {_lstm_predictor.device}"
f" | AMP: {_lstm_predictor.use_amp}")
return _lstm_predictor
def get_predictor(ticker=None):
"""워커 프로세스 내에서 ModelRegistry로 종목별 PricePredictor 관리"""
registry = ModelRegistry.get_instance()
return registry.get_predictor(ticker or "default")
def get_ollama():
"""워커 프로세스 내에서 OllamaManager 인스턴스를 싱글톤으로 관리
- 종목마다 새 인스턴스를 만들면 Ollama에 동시 요청이 폭주해 데드락 발생"""
global _ollama_manager
if _ollama_manager is None:
_ollama_manager = OllamaManager()
return _ollama_manager
def calculate_position_size(total_capital, current_price, volatility, score, ai_confidence,
@@ -40,9 +45,8 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_
base_invest = total_capital * 0.10
# 2. 변동성 조절 계수 (변동성 높을수록 투자금 감소)
# 변동성 1% → 1.0배, 2% → 0.75배, 3% → 0.5배, 5%+ → 0.3배
if volatility <= 1.0:
vol_factor = 1.2 # 안정적 종목은 약간 증가
vol_factor = 1.2
elif volatility <= 2.0:
vol_factor = 1.0
elif volatility <= 3.0:
@@ -50,10 +54,9 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_
elif volatility <= 5.0:
vol_factor = 0.45
else:
vol_factor = 0.3 # 고변동 종목
vol_factor = 0.3
# 3. 확신도 조절 계수 (score가 높을수록 투자금 증가)
# score 0.6 → 0.5배, 0.7 → 1.0배, 0.8 → 1.5배, 0.9+ → 2.0배
# 3. 확신도 조절 계수
if score >= 0.85:
conf_factor = 2.0
elif score >= 0.75:
@@ -73,45 +76,85 @@ def calculate_position_size(total_capital, current_price, volatility, score, ai_
# 5. 최종 투자금 계산
invest_amount = base_invest * vol_factor * conf_factor * ai_bonus
# 상한 제한
invest_amount = min(invest_amount, max_per_stock)
invest_amount = min(invest_amount, total_capital * 0.15) # 최대 15%
invest_amount = min(invest_amount, total_capital) # 잔고 초과 방지
invest_amount = min(invest_amount, total_capital * 0.15)
invest_amount = min(invest_amount, total_capital)
# 수량 계산
qty = int(invest_amount / current_price)
return max(0, qty)
def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
def analyze_stock_process(ticker, ohlcv_data, news_items, investor_trend=None,
macro_status=None, holding_info=None):
"""
[v2.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행)
[v3.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행)
[v2.0 개선사항]
1. ATR 기반 동적 손절/익절 + 트레일링 스탑
2. 포지션 사이징 (변동성 + 확신도 기반)
3. 시장상황별 동적 매수/매도 임계값
4. 보유종목에 대한 분석 기반 매도 판단
5. ADX/OBV/MTF 통합 기술적 분석
6. 강화된 AI 프롬프트 (종목 고유 뉴스 분석)
[v3.0 개선사항]
1. OHLCV 전체 수신 (실제 고가/저가/거래량 사용)
2. 종목별 ModelRegistry (가중치 덮어쓰기 방지)
3. 강화된 LLM 프롬프트 (거시경제 상태, 볼린저밴드, 거래량 급증, 보유 수익률)
"""
try:
print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...")
# OHLCV 데이터 분리 (하위호환: list 형태도 허용)
if isinstance(ohlcv_data, dict):
prices = ohlcv_data.get('close', [])
high_prices = ohlcv_data.get('high') or None
low_prices = ohlcv_data.get('low') or None
volume_history = ohlcv_data.get('volume') or None
open_prices = ohlcv_data.get('open') or None
else:
# 하위 호환: 기존 close 리스트
prices = ohlcv_data if isinstance(ohlcv_data, list) else []
high_prices = None
low_prices = None
volume_history = None
open_prices = None
# volume이 모두 0이거나 비어있으면 None 처리
if volume_history and all(v == 0 for v in volume_history):
volume_history = None
print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles, "
f"OHLCV={'yes' if high_prices else 'close-only'}, "
f"Vol={'yes' if volume_history else 'no'})...")
# ===== 1. 기술적 지표 계산 =====
current_price = prices[-1] if prices else 0
tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
current_price, prices, volume_history=None)
current_price, prices, volume_history=volume_history)
# ===== 2. ATR 기반 동적 손절/익절 =====
sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(prices)
# ===== 2. ATR 기반 동적 손절/익절 (실제 고가/저가 사용) =====
sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(
prices, high_prices=high_prices, low_prices=low_prices)
# ===== 3. LSTM 주가 예측 =====
lstm_predictor = get_predictor()
# ===== 3. 볼린저밴드 위치 계산 =====
bb_upper, bb_mid, bb_lower = TechnicalAnalyzer.calculate_bollinger_bands(prices)
if bb_upper > bb_lower:
bb_pos = (current_price - bb_lower) / (bb_upper - bb_lower) # 0=하단, 1=상단
if bb_pos <= 0.2:
bb_zone = "하단(과매도)"
elif bb_pos >= 0.8:
bb_zone = "상단(과매수)"
else:
bb_zone = f"중간({bb_pos:.0%})"
else:
bb_pos = 0.5
bb_zone = "중간"
# ===== 4. LSTM 주가 예측 (ModelRegistry 사용) =====
lstm_predictor = get_predictor(ticker)
if lstm_predictor:
lstm_predictor.training_status['current_ticker'] = ticker
pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker)
# LSTM에 전달할 OHLCV 딕셔너리 구성
lstm_ohlcv = {
'close': prices,
'open': open_prices or prices,
'high': high_prices or prices,
'low': low_prices or prices,
'volume': volume_history or []
}
pred_result = lstm_predictor.train_and_predict(lstm_ohlcv, ticker=ticker)
lstm_score = 0.5
ai_confidence = 0.5
@@ -130,7 +173,7 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
lstm_score = max(0.0, min(1.0, lstm_score))
# ===== 4. 수급 분석 (외인/기관) =====
# ===== 5. 수급 분석 (외인/기관) =====
investor_score = 0.0
frgn_net_buy = 0
orgn_net_buy = 0
@@ -146,26 +189,23 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
if day['institutional'] > 0:
consecutive_orgn_buy += 1
# 외인 수급 점수 (강화)
if frgn_net_buy > 0:
investor_score += 0.03
if consecutive_frgn_buy >= 3:
investor_score += 0.04
if consecutive_frgn_buy >= 5:
investor_score += 0.03 # 5일 연속 매수 = 추가 보너스
investor_score += 0.03
# 기관 수급 점수 (신규)
if orgn_net_buy > 0:
investor_score += 0.02
if consecutive_orgn_buy >= 3:
investor_score += 0.03
# 외인+기관 동시 순매수 = 강력 신호
if frgn_net_buy > 0 and orgn_net_buy > 0:
investor_score += 0.03
print(f" 💰 [Investor] Both Foreign & Institutional Buying!")
# ===== 5. AI 뉴스 분석 (강화된 프롬프트) =====
# ===== 6. AI 뉴스 분석 (강화된 프롬프트) =====
if pred_result:
pred_price = pred_result.get('predicted', 0)
pred_change = pred_result.get('change_rate', 0)
@@ -173,65 +213,59 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
pred_price = current_price
pred_change = 0.0
ollama = OllamaManager()
prompt = f"""
[System Instruction]
1. Role: You are a legendary quant trader with 30 years of experience in Korean stock market.
2. You MUST analyze the data objectively and respond with a JSON object.
news_summary = "; ".join(
[n.get('title', '') for n in (news_items or [])[:3] if n.get('title')]
) or "뉴스 없음"
[Market Data for Stock {ticker}]
- Current Price: {current_price:,.0f} KRW
- Technical Score: {tech_score:.3f} (RSI: {rsi:.1f})
- Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
- ADX Trend Strength: {ma_info.get('adx', 20):.1f} ({ma_info.get('adx_trend', 'N/A')})
- Multi-Timeframe: {ma_info.get('mtf_alignment', 'N/A')}
- AI Prediction: {pred_price:.0f} KRW ({pred_change:+.2f}%)
- AI Confidence: {ai_confidence:.2f} (Training Loss: {ai_loss:.4f})
- Volatility: {volatility:.2f}%
- Volume Ratio: {vol_ratio:.1f}x
- ATR Stop Loss: {sl_tp['stop_loss_pct']:.1f}% / Take Profit: {sl_tp['take_profit_pct']:.1f}%
- Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
# 거시경제 상태
macro_state = macro_status.get('status', 'SAFE') if macro_status else 'SAFE'
[Decision Framework]
- Strong BUY signals: Foreigners+Institutions buying, Golden Cross, ADX>25 with bullish trend, AI high confidence UP
- Moderate BUY: RSI<40 with bullish reversal, Price near Bollinger Lower Band
- SELL signals: RSI>70, Dead Cross, ADX>25 with bearish trend, Foreigners selling
- AVOID/HOLD: ADX<20 (sideways), Mixed signals, Low confidence
# 거래량 급증 여부
vol_surge = "급증(x{:.1f})".format(vol_ratio) if vol_ratio >= 2.0 else "정상"
[News Data]
{json.dumps(news_items[:5] if news_items else [], ensure_ascii=False)}
# 보유종목 수익률
holding_yield_str = ""
if holding_info and holding_info.get('qty', 0) > 0:
yld = holding_info.get('yield', 0.0)
holding_yield_str = f" | 보유수익률={yld:+.1f}%"
[Response Format - JSON Only]
{{"sentiment_score": 0.0 to 1.0, "reason": "Brief analysis reason"}}
"""
ollama = get_ollama()
prompt = (
f"Korean stock analyst. JSON only: {{\"sentiment_score\":0.0-1.0,\"reason\":\"1 sentence\"}}\n"
f"Stock {ticker}{current_price:,.0f}{holding_yield_str}\n"
f"Market={macro_state} | "
f"Tech={tech_score:.2f} RSI={rsi:.1f} MA={ma_info['trend']} ADX={ma_info.get('adx',20):.0f} "
f"MTF={ma_info.get('mtf_alignment','N/A')}\n"
f"BB={bb_zone} | AI={pred_change:+.2f}% conf={ai_confidence:.0%} | "
f"Vol={volatility:.1f}% VolRatio={vol_surge}\n"
f"Flow: Frgn={frgn_net_buy:+,}({consecutive_frgn_buy}d) "
f"Inst={orgn_net_buy:+,}({consecutive_orgn_buy}d)\n"
f"News: {news_summary}"
)
ai_resp = ollama.request_inference(prompt)
sentiment_score = 0.5
ai_reason = ""
try:
data = json.loads(ai_resp)
sentiment_score = float(data.get("sentiment_score", 0.5))
sentiment_score = max(0.0, min(1.0, sentiment_score)) # 범위 강제
sentiment_score = max(0.0, min(1.0, sentiment_score))
ai_reason = data.get("reason", "")
except Exception:
print(f" ⚠️ AI response parse failed, using neutral (0.5)")
# ===== 6. 통합 점수 (동적 가중치 v2.0) =====
# ADX가 높으면 (추세가 강하면) LSTM과 기술적 분석 비중 증가
# ===== 7. 통합 점수 (동적 가중치 v2.0) =====
adx_val = ma_info.get('adx', 20)
if ai_confidence >= 0.85 and adx_val >= 25:
# 강한 추세 + 높은 AI 신뢰도: AI 최우선
w_tech, w_news, w_ai = 0.15, 0.15, 0.70
print(f" 🤖 [Ultra High Confidence + Strong Trend] AI Weight 70%")
elif ai_confidence >= 0.85:
w_tech, w_news, w_ai = 0.20, 0.20, 0.60
print(f" 🤖 [High Confidence] AI Weight 60%")
elif adx_val >= 30:
# 매우 강한 추세: 기술적 분석 우선
w_tech, w_news, w_ai = 0.50, 0.20, 0.30
print(f" 📊 [Very Strong Trend ADX={adx_val:.0f}] Tech Weight 50%")
elif adx_val < 20:
# 비추세/횡보: 뉴스와 수급 중시
w_tech, w_news, w_ai = 0.30, 0.40, 0.30
print(f" 📰 [Sideways ADX={adx_val:.0f}] News Weight 40%")
else:
@@ -239,63 +273,53 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
# 수급 가산점 (최대 +0.15)
total_score += min(investor_score, 0.15)
total_score = min(total_score, 1.0)
# ===== 7. 시장 상황별 동적 임계값 =====
# ===== 8. 시장 상황별 동적 임계값 =====
buy_threshold = 0.60
sell_threshold = 0.30
if macro_status:
macro_state = macro_status.get('status', 'SAFE')
if macro_state == 'DANGER':
buy_threshold = 999.0 # 매수 완전 차단
sell_threshold = 0.45 # 매도 기준 상향 (빨리 탈출)
buy_threshold = 999.0
sell_threshold = 0.45
print(f" 🚨 [DANGER Market] Buy BLOCKED, Sell threshold raised to 0.45")
elif macro_state == 'CAUTION':
buy_threshold = 0.72 # 매수 기준 대폭 상향 (보수적)
sell_threshold = 0.38 # 매도 기준도 상향
buy_threshold = 0.72
sell_threshold = 0.38
print(f" ⚠️ [CAUTION Market] Buy threshold raised to 0.72")
# ===== 8. 매매 결정 =====
# ===== 9. 매매 결정 =====
decision = "HOLD"
decision_reason = ""
# --- 보유 종목 분석 기반 매도 (신규) ---
if holding_info:
holding_yield = holding_info.get('yield', 0.0)
holding_qty = holding_info.get('qty', 0)
peak_price = holding_info.get('peak_price', current_price)
if holding_qty > 0:
# A. 동적 손절 (ATR 기반)
if holding_yield <= sl_tp['stop_loss_pct']:
decision = "SELL"
decision_reason = f"Dynamic Stop Loss ({holding_yield:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
# B. 동적 익절 (ATR 기반)
elif holding_yield >= sl_tp['take_profit_pct']:
decision = "SELL"
decision_reason = f"Dynamic Take Profit ({holding_yield:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
# C. 트레일링 스탑 (최고가 대비 하락)
elif peak_price > 0:
drop_from_peak = ((current_price - peak_price) / peak_price) * 100
if drop_from_peak <= -sl_tp['trailing_stop_pct'] and holding_yield > 2.0:
# 수익 상태에서만 트레일링 스탑 작동 (2% 이상 수익 확보)
decision = "SELL"
decision_reason = (f"Trailing Stop ({drop_from_peak:.1f}% from peak, "
f"threshold: -{sl_tp['trailing_stop_pct']:.1f}%)")
# D. 분석 기반 매도 (점수가 매도 임계값 이하)
if decision == "HOLD" and total_score <= sell_threshold:
decision = "SELL"
decision_reason = f"Analysis Signal (Score: {total_score:.2f} <= {sell_threshold:.2f})"
# E. 추세 반전 매도 (ADX 강한 하락추세)
if decision == "HOLD" and adx_val >= 30:
plus_di = ma_info.get('adx', 0) # 참고용
mtf_align = ma_info.get('mtf_alignment', '')
if mtf_align == 'STRONG_BEAR' and holding_yield < 0:
decision = "SELL"
@@ -303,11 +327,9 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
# --- 매수 판단 ---
if decision == "HOLD":
# 강한 단일 신호 매수 (기준 강화)
strong_signal = False
strong_reason = ""
# [강화] 복합 조건 매수 (단일 지표가 아닌 복합 조건)
if tech_score >= 0.75 and lstm_score >= 0.6 and sentiment_score >= 0.6:
strong_signal = True
strong_reason = "Triple Confirmation (Tech+AI+News)"
@@ -322,7 +344,6 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
strong_reason = f"Strong Multi-Timeframe Bullish + Tech {tech_score:.2f}"
if strong_signal and total_score >= buy_threshold - 0.05:
# 강한 신호는 임계값 약간 완화 허용
decision = "BUY"
decision_reason = strong_reason
print(f" 🎯 [{strong_reason}] → BUY!")
@@ -330,10 +351,9 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
decision = "BUY"
decision_reason = f"Score {total_score:.2f} >= threshold {buy_threshold:.2f}"
# ===== 9. 포지션 사이징 =====
# ===== 10. 포지션 사이징 =====
suggested_qty = 0
if decision == "BUY":
# 기본 자산 1000만원으로 가정 (실제 run_cycle에서 덮어씀)
suggested_qty = calculate_position_size(
total_capital=10000000,
current_price=current_price,

View File

@@ -59,6 +59,7 @@ class SharedIPC:
def read_status(self):
"""텔레그램 봇이 상태를 shared memory에서 읽기"""
raw = None
try:
shm = self._ensure_shm()
@@ -66,13 +67,15 @@ class SharedIPC:
self.lock.acquire()
try:
length = struct.unpack_from('I', shm.buf, 0)[0]
if length == 0 or length > Config.SHM_SIZE - 4:
return None
raw = bytes(shm.buf[4:4 + length])
if length > 0 and length <= Config.SHM_SIZE - 4:
raw = bytes(shm.buf[4:4 + length])
finally:
if self.lock:
self.lock.release()
if not raw:
return None
ipc_data = json.loads(raw.decode('utf-8'))
age = time.time() - ipc_data.get('timestamp', 0)

View File

@@ -1,6 +1,8 @@
import psutil
from datetime import datetime
from modules.config import Config
class SystemMonitor:
def __init__(self, messenger, ollama_manager):
@@ -8,52 +10,101 @@ class SystemMonitor:
self.ollama_monitor = ollama_manager
self.last_health_check = datetime.now()
# CPU 서킷 브레이커 상태
self._cpu_overload_count = 0 # 연속 과부하 횟수
self._circuit_open = False # 서킷 브레이커 발동 여부
self._circuit_open_since = None
def is_cpu_critical(self):
"""서킷 브레이커가 발동 상태인지 반환 (True이면 분석 사이클 스킵)"""
return self._circuit_open
def reset_circuit(self):
"""서킷 브레이커 수동 리셋"""
if self._circuit_open:
print("[Monitor] CPU Circuit Breaker RESET")
self._circuit_open = False
self._cpu_overload_count = 0
self._circuit_open_since = None
def check_health(self):
"""시스템 상태 점검 및 알림 (CPU, RAM, GPU) - 3분마다 실행"""
now = datetime.now()
if (now - self.last_health_check).total_seconds() < 180: # 5분 → 3분
if (now - self.last_health_check).total_seconds() < 180:
return
self.last_health_check = now
alerts = []
# 1. CPU Check (non-blocking 측정)
cpu_usage = psutil.cpu_percent(interval=0)
# 1. CPU Check
cpu_usage = psutil.cpu_percent(interval=1) # 1초 측정 (더 정확)
if cpu_usage > 90:
# 검증: 상위 프로세스 CPU 합계 확인
if cpu_usage > Config.CPU_CIRCUIT_BREAKER_THRESHOLD:
self._cpu_overload_count += 1
# 상위 프로세스 조회
top_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
if proc.info['name'] == 'System Idle Process':
if proc.info['name'] in ('System Idle Process', 'Idle'):
continue
top_processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
top_processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
total_top_cpu = sum(p['cpu_percent'] for p in top_processes[:3])
top_3_str = ""
for p in top_processes[:3]:
top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)"
if total_top_cpu >= 30.0:
top_3_str = ""
for p in top_processes[:3]:
top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)"
alerts.append(f"[CPU Overload] Usage: {cpu_usage}%\nTop Processes:{top_3_str}")
# 서킷 브레이커 발동 조건
if self._cpu_overload_count >= Config.CPU_CIRCUIT_BREAKER_CONSECUTIVE:
if not self._circuit_open:
self._circuit_open = True
self._circuit_open_since = now
alerts.append(
f"🔴 [CPU Circuit Breaker OPEN] {cpu_usage}% × {self._cpu_overload_count}회 연속\n"
f"⛔ 분석 사이클 일시 중단 (5분 후 자동 복구)\nTop Processes:{top_3_str}"
)
print(f"[Monitor] CPU Circuit Breaker OPEN! CPU={cpu_usage}%")
else:
alerts.append(
f"⚠️ [CPU Overload] Usage: {cpu_usage}% ({self._cpu_overload_count}회)\nTop Processes:{top_3_str}"
)
else:
# CPU 정상 → 카운터 리셋
if self._cpu_overload_count > 0:
print(f"[Monitor] CPU 정상화 ({cpu_usage}%). 카운터 리셋.")
self._cpu_overload_count = 0
# 서킷 브레이커가 열린 후 5분 경과 시 자동 복구
if self._circuit_open and self._circuit_open_since:
elapsed = (now - self._circuit_open_since).total_seconds()
if elapsed >= 300: # 5분
self._circuit_open = False
self._circuit_open_since = None
alerts.append("✅ [CPU Circuit Breaker CLOSED] 시스템 안정화. 분석 재개.")
print("[Monitor] CPU Circuit Breaker CLOSED. 분석 재개.")
# 2. RAM Check
ram = psutil.virtual_memory()
if ram.percent > 90:
alerts.append(f"[RAM High] Usage: {ram.percent}% (Free: {ram.available / 1024**3:.1f}GB)")
alerts.append(f"⚠️ [RAM High] Usage: {ram.percent}% (Free: {ram.available / 1024**3:.1f}GB)")
# 3. GPU Check
if self.ollama_monitor:
gpu_status = self.ollama_monitor.get_gpu_status()
temp = gpu_status.get('temp', 0)
if temp > 80:
alerts.append(f"[GPU Overheat] Temp: {temp}C")
alerts.append(f"🔥 [GPU Overheat] Temp: {temp}°C")
# 알림 전송
# 알림 전송 (텔레그램 비활성화 - 콘솔 로그만 사용)
if alerts:
msg = "[System Health Alert]\n" + "\n".join(alerts)
if self.messenger:
self.messenger.send_message(msg)
# 콘솔에만 출력
for alert in alerts:
print(f"[Monitor] {alert}")
# [비활성화] 텔레그램 알림 - 필요시 재활성화
# msg = "🔔 <b>[System Health Alert]</b>\n" + "\n".join(alerts)
# if self.messenger:
# self.messenger.send_message(msg)

View File

@@ -0,0 +1,211 @@
"""
성과 데이터 영구 저장 - PerformanceDB
데이터 파일:
data/performance/daily_snapshots.json - 일별 자산 스냅샷
data/performance/trade_records.json - 강화 매매 기록 (영구 보관)
"""
import os
import json
from datetime import datetime, timedelta
from modules.config import Config
PERF_DIR = os.path.join(Config.DATA_DIR, "performance")
SNAPSHOTS_FILE = os.path.join(PERF_DIR, "daily_snapshots.json")
TRADES_FILE = os.path.join(PERF_DIR, "trade_records.json")
class PerformanceDB:
def __init__(self):
os.makedirs(PERF_DIR, exist_ok=True)
self._snapshots = self._load_json(SNAPSHOTS_FILE, [])
self._trades = self._load_json(TRADES_FILE, [])
def _load_json(self, path, default):
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"[PerformanceDB] Load failed {path}: {e}")
return default
return default
def _save_json(self, path, data):
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[PerformanceDB] Save failed {path}: {e}")
# ─────────────────────────────────────────
# 일별 스냅샷
# ─────────────────────────────────────────
def save_daily_snapshot(self, total_eval, deposit, holdings_count, benchmark_close=None):
"""일별 자산 스냅샷 저장 (하루 1회 호출 권장).
Args:
total_eval (int): 총 평가액 (원)
deposit (int): 예수금 (원)
holdings_count (int): 보유 종목 수
benchmark_close (float|None): KOSPI 현재가 (벤치마크 비교용)
"""
today = datetime.now().strftime("%Y-%m-%d")
# 오늘 이미 저장된 스냅샷이 있으면 업데이트
for snap in self._snapshots:
if snap.get("date") == today:
snap["total_eval"] = total_eval
snap["deposit"] = deposit
snap["holdings_count"] = holdings_count
if benchmark_close is not None:
snap["benchmark_kospi_close"] = benchmark_close
self._save_json(SNAPSHOTS_FILE, self._snapshots)
return
# 일별/누적 수익률 계산
daily_return_pct = 0.0
cumulative_return_pct = 0.0
if self._snapshots:
prev_eval = self._snapshots[-1].get("total_eval", 0)
if prev_eval > 0:
daily_return_pct = (total_eval - prev_eval) / prev_eval * 100
initial_capital = self.get_initial_capital()
if initial_capital and initial_capital > 0:
cumulative_return_pct = (total_eval - initial_capital) / initial_capital * 100
snap = {
"date": today,
"total_eval": total_eval,
"deposit": deposit,
"holdings_count": holdings_count,
"benchmark_kospi_close": benchmark_close,
"daily_return_pct": round(daily_return_pct, 4),
"cumulative_return_pct": round(cumulative_return_pct, 4)
}
self._snapshots.append(snap)
self._save_json(SNAPSHOTS_FILE, self._snapshots)
print(f"[PerformanceDB] Snapshot saved: {today} "
f"total={total_eval:,}원 daily={daily_return_pct:+.2f}%")
# ─────────────────────────────────────────
# 매매 기록
# ─────────────────────────────────────────
def save_trade_record(self, action, ticker, name, qty, price,
scores_dict=None, reason="", macro_state="SAFE"):
"""매수/매도 기록 저장.
Args:
action (str): "BUY" | "SELL"
ticker (str): 종목 코드
name (str): 종목명
qty (int): 수량
price (float): 체결가
scores_dict (dict|None): 분석 점수 딕셔너리
{tech, sentiment, lstm_score, score, ai_confidence, prediction_change}
reason (str): 매매 사유
macro_state (str): 매크로 상태 ("SAFE"/"CAUTION"/"DANGER")
"""
sd = scores_dict or {}
now_iso = datetime.now().isoformat()
trade = {
"id": f"{ticker}_{now_iso}",
"action": action,
"ticker": ticker,
"name": name,
"qty": qty,
"price": price,
"timestamp": now_iso,
"reason": reason,
"macro_state": macro_state,
# 점수 (BUY 시에만 의미 있음)
"tech_score": float(sd.get("tech", 0.0)),
"sentiment_score": float(sd.get("sentiment", 0.0)),
"lstm_score": float(sd.get("lstm_score", 0.0)),
"total_score": float(sd.get("score", 0.0)),
"ai_confidence": float(sd.get("ai_confidence", 0.5)),
"ai_prediction_change": float(sd.get("prediction_change", 0.0)),
# 매도 후 채워지는 결과 필드
"outcome_return_pct": None,
"holding_days": None,
"closed_at": None
}
self._trades.append(trade)
self._save_json(TRADES_FILE, self._trades)
def close_trade(self, ticker, sell_price, sell_yield_pct=None):
"""가장 최근 미체결 BUY를 찾아 매도 결과를 기록.
Args:
ticker (str): 종목 코드
sell_price (float): 매도 체결가
sell_yield_pct (float|None): KIS에서 받은 수익률 (보조용)
"""
for trade in reversed(self._trades):
if (trade.get("ticker") == ticker
and trade.get("action") == "BUY"
and trade.get("outcome_return_pct") is None):
buy_price = trade.get("price", 0)
if buy_price and buy_price > 0:
outcome_return_pct = (sell_price - buy_price) / buy_price * 100
elif sell_yield_pct is not None:
outcome_return_pct = sell_yield_pct
else:
outcome_return_pct = 0.0
# 보유일 계산
holding_days = 0
buy_ts = trade.get("timestamp", "")
if buy_ts:
try:
buy_dt = datetime.fromisoformat(buy_ts)
holding_days = (datetime.now() - buy_dt).days
except Exception:
pass
trade["outcome_return_pct"] = round(outcome_return_pct, 4)
trade["holding_days"] = holding_days
trade["closed_at"] = datetime.now().isoformat()
self._save_json(TRADES_FILE, self._trades)
print(f"[PerformanceDB] Trade closed: {ticker} "
f"return={outcome_return_pct:.2f}% holding={holding_days}d")
return
print(f"[PerformanceDB] No open BUY found for {ticker}")
# ─────────────────────────────────────────
# 조회
# ─────────────────────────────────────────
def load_snapshots(self, days=90):
"""최근 N일 스냅샷 반환."""
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
return [s for s in self._snapshots if s.get("date", "") >= cutoff]
def load_trades(self, days=90):
"""최근 N일 매매 기록 반환."""
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
return [t for t in self._trades if t.get("timestamp", "")[:10] >= cutoff]
def get_initial_capital(self):
"""첫 스냅샷 기준 초기 자본 반환."""
if self._snapshots:
return self._snapshots[0].get("total_eval", 0)
return 0
def get_summary(self):
"""간단한 현황 딕셔너리 반환 (디버깅용)."""
return {
"total_snapshots": len(self._snapshots),
"total_trades": len(self._trades),
"closed_trades": sum(1 for t in self._trades
if t.get("outcome_return_pct") is not None),
"initial_capital": self.get_initial_capital()
}

139
warmup_and_restart.py Normal file
View File

@@ -0,0 +1,139 @@
"""
LSTM 사전학습 + 봇 자동 시작 통합 스크립트
- 구 봇 종료 후 이 스크립트가 백그라운드로 실행됨
- 12개 종목 LSTM 사전학습 → 완료 시 main_server.py 자동 시작
"""
import sys
import os
import time
import subprocess
import json
# 프로젝트 루트를 path에 추가
ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT)
os.chdir(ROOT)
LOG_FILE = os.path.join(ROOT, "warmup.log")
def log(msg):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
print(line, flush=True)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
def run_warmup():
log("=" * 50)
log("LSTM 사전학습 시작 (Warmup)")
log("=" * 50)
try:
from modules.config import Config
from modules.services.kis import KISClient
from modules.analysis.deep_learning import PricePredictor
from modules.services.telegram import TelegramMessenger
messenger = TelegramMessenger()
messenger.send_message(
"🔄 <b>[Bot Restarting]</b>\n"
"LSTM 사전학습 중... 완료 후 자동 시작됩니다.\n"
f"대상: Watchlist 전체 종목"
)
kis = KISClient()
with open(Config.WATCHLIST_FILE, "r", encoding="utf-8") as f:
watchlist = json.load(f)
log(f"대상 종목: {len(watchlist)}")
predictor = PricePredictor()
total = len(watchlist)
success = 0
failed = 0
total_time = 0.0
for i, (ticker, name) in enumerate(watchlist.items(), 1):
log(f"[{i}/{total}] {name} ({ticker}) 학습 중...")
try:
# 100일 데이터로 LSTM 학습 (기간별시세 API)
prices = kis.get_daily_price(ticker, count=100)
if not prices or len(prices) < 70:
log(f" ⚠️ 데이터 부족 ({len(prices) if prices else 0}개)")
failed += 1
continue
t0 = time.time()
result = predictor.train_and_predict(prices, ticker=ticker)
elapsed = time.time() - t0
total_time += elapsed
if result:
log(f"{result['epochs']}에포크 | loss={result['loss']:.6f} | "
f"{result['change_rate']:+.2f}% | {elapsed:.1f}")
success += 1
else:
log(f" ⚠️ 결과 없음 ({elapsed:.1f}초)")
failed += 1
# KIS API 과호출 방지
time.sleep(0.5)
except Exception as e:
log(f" ❌ 오류: {e}")
failed += 1
log("=" * 50)
log(f"Warmup 완료: 성공 {success}개 / 실패 {failed}개 / 총 {total_time:.0f}")
log("=" * 50)
messenger.send_message(
f"✅ <b>[Warmup 완료]</b>\n"
f"성공: {success}종목 / 실패: {failed}종목\n"
f"소요: {total_time/60:.1f}\n"
f"→ 봇 시작 중..."
)
return True
except Exception as e:
log(f"❌ Warmup 실패: {e}")
import traceback
log(traceback.format_exc())
return False
def start_bot():
log("봇 시작: main_server.py")
try:
# 새 콘솔 창에서 봇 실행 (Windows)
if sys.platform == "win32":
subprocess.Popen(
[sys.executable, "main_server.py"],
creationflags=subprocess.CREATE_NEW_CONSOLE,
cwd=ROOT
)
else:
subprocess.Popen(
[sys.executable, "main_server.py"],
cwd=ROOT
)
log("✅ 봇 프로세스 시작 완료")
return True
except Exception as e:
log(f"❌ 봇 시작 실패: {e}")
return False
if __name__ == "__main__":
# warmup.log 초기화
with open(LOG_FILE, "w", encoding="utf-8") as f:
f.write(f"Warmup 시작: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
warmup_ok = run_warmup()
if warmup_ok:
time.sleep(2)
start_bot()
else:
log("⚠️ Warmup 실패. 봇을 수동으로 시작해주세요: python main_server.py")