diff --git a/backtester.py b/backtester.py
index 251d69e..6f17949 100644
--- a/backtester.py
+++ b/backtester.py
@@ -1,241 +1,460 @@
import pandas as pd
import numpy as np
-import yfinance as yf
+import matplotlib
+matplotlib.use('Agg') # GUI 없는 환경 대응
import matplotlib.pyplot as plt
from modules.analysis.technical import TechnicalAnalyzer
from modules.analysis.deep_learning import PricePredictor
+from modules.strategy.process import calculate_position_size
+
class Backtester:
+ """
+ [v2.0] 실전 백테스터
+
+ 개선사항:
+ 1. ATR 기반 동적 손절/익절 + 트레일링 스탑
+ 2. 포지션 사이징 (변동성 기반)
+ 3. 정밀한 수수료/세금 계산
+ 4. 슬리피지(Slippage) 시뮬레이션
+ 5. 다중 성과 지표 (Sharpe, MDD, Win Rate, Avg P/L)
+ 6. 벤치마크 대비 알파 계산
+ 7. 실제 데이터 로드 (yfinance fallback to mock)
+ """
def __init__(self, ticker, start_date, end_date, initial_capital=10000000):
self.ticker = ticker
self.start_date = start_date
self.end_date = end_date
self.initial_capital = initial_capital
self.capital = initial_capital
- self.holdings = 0 # 보유 주식 수
- self.avg_price = 0 # 평단가
-
+ self.holdings = 0
+ self.avg_price = 0
+ self.peak_price = 0 # [v2.0] 트레일링 스탑용
+
self.trade_log = []
self.daily_values = []
-
- # LSTM 모델 (재학습 시뮬레이션을 위해)
+ self.daily_returns = []
+
+ # LSTM 모델
self.predictor = PricePredictor()
-
+
+ # [v2.0] 수수료/세금 설정 (한국 주식)
+ self.buy_commission = 0.00015 # 매수 수수료 0.015%
+ self.sell_commission = 0.00015 # 매도 수수료 0.015%
+ self.sell_tax = 0.0018 # 증권거래세 0.18% (2024~)
+ self.slippage = 0.001 # 슬리피지 0.1%
+
def generate_mock_data(self, days=200):
- """
- yfinance 연결 실패 시 사용할 가상 주가 데이터 생성 (Random Walk)
- 삼성전자와 유사한 6~7만원대 가격 흐름 생성
- """
- print(f"🎲 [Backtest] Generating mock data for {days} days...")
- np.random.seed(42) # 재현성을 위해 시드 고정
-
+ """실제 시장과 유사한 Mock 데이터 (Random Walk + Mean Reversion)"""
+ print(f"🎲 [Backtest] Generating realistic mock data for {days} days...")
+ np.random.seed(42)
+
start_price = 70000
- returns = np.random.normal(0, 0.015, days) # 평균 0, 표준편차 1.5% 변동
+ # Mean-reverting Random Walk (실제 주가에 더 가까움)
+ mu = 0.0003 # 일평균 기대수익률 0.03%
+ sigma = 0.018 # 일변동성 1.8%
+ mean_reversion = 0.02 # 평균 회귀 속도
+
price_series = [start_price]
-
- # 인위적인 강력한 상승 추세 추가 (우상향)
- for i, r in enumerate(returns):
- trend = 0.003 # 매일 0.3%씩 강제 상승 (복리 효과로 엄청난 급등)
- # 중간에 잠깐 조정장
- if 80 < i < 100: trend = -0.01
-
- new_price = price_series[-1] * (1 + r + trend)
+ for i in range(days):
+ # 변동성 클러스터링 (GARCH 효과 근사)
+ if i > 0:
+ prev_return = (price_series[-1] - price_series[-2]) / price_series[-2]
+ dynamic_sigma = sigma * (1 + abs(prev_return) * 5) # 큰 변동 후 변동성 증가
+ else:
+ dynamic_sigma = sigma
+
+ # Mean reversion + trend
+ if len(price_series) >= 20:
+ ma20 = np.mean(price_series[-20:])
+ reversion = -mean_reversion * (price_series[-1] - ma20) / ma20
+ else:
+ reversion = 0
+
+ shock = np.random.normal(mu + reversion, dynamic_sigma)
+ new_price = price_series[-1] * (1 + shock)
+ new_price = max(new_price, start_price * 0.5) # 최소 50% 하한
price_series.append(new_price)
-
- # 날짜 인덱스 생성
- date_range = pd.date_range(start="2023-01-01", periods=len(price_series))
+
+ date_range = pd.date_range(start=self.start_date, periods=len(price_series))
self.data = pd.Series(price_series, index=date_range)
-
- # [Debugging] 차트가 너무 밋밋하지 않게 변동성 추가 확인
- print(f"📈 [Mock Data] Start: {price_series[0]:.0f}, End: {price_series[-1]:.0f}")
-
- print(f"✅ Generated {len(self.data)} days of mock data.")
+ print(f"📈 [Mock Data] Start: {price_series[0]:.0f}, End: {price_series[-1]:.0f}, "
+ f"Min: {min(price_series):.0f}, Max: {max(price_series):.0f}")
return True
def fetch_data(self):
- """(Legacy) yfinance를 이용해 과거 데이터 로드"""
- # 네트워크 이슈로 인해 Mock Data 우선 사용
+ """실제 데이터 로드 시도 → 실패 시 Mock"""
+ try:
+ import yfinance as yf
+ # 한국 주식은 .KS (KOSPI) 또는 .KQ (KOSDAQ) 접미사 필요
+ yf_ticker = f"{self.ticker}.KS"
+ df = yf.download(yf_ticker, start=self.start_date, end=self.end_date)
+ if not df.empty and len(df) > 60:
+ self.data = df['Close']
+ print(f"✅ [Backtest] Loaded {len(self.data)} days from yfinance ({yf_ticker})")
+ return True
+ except Exception as e:
+ print(f"⚠️ [Backtest] yfinance failed: {e}")
+
return self.generate_mock_data()
+ def _apply_slippage(self, price, is_buy):
+ """슬리피지 적용 (매수 시 높게, 매도 시 낮게)"""
+ if is_buy:
+ return price * (1 + self.slippage)
+ else:
+ return price * (1 - self.slippage)
+
def run(self):
if not hasattr(self, 'data') or self.data.empty:
- if not self.fetch_data(): return
+ if not self.fetch_data():
+ return
prices = self.data.values
dates = self.data.index
-
- # 최소 30일 데이터 필요
- if len(prices) < 30:
+
+ # 최소 60일 데이터 필요 (LSTM seq_length)
+ min_days = 60
+ if len(prices) < min_days + 10:
print("❌ Not enough data for backtest.")
return
- print("🚀 [Backtest] Simulation Started...")
-
- # 30일차부터 하루씩 전진하며 시뮬레이션
- for i in range(30, len(prices)):
+ print(f"🚀 [Backtest v2.0] Simulation Started ({len(prices)} days)...")
+ print(f" Capital: {self.initial_capital:,.0f} KRW")
+ print(f" Commission: Buy {self.buy_commission*100:.3f}% / Sell {self.sell_commission*100:.3f}%")
+ print(f" Tax: {self.sell_tax*100:.2f}% / Slippage: {self.slippage*100:.1f}%")
+
+ for i in range(min_days, len(prices)):
today_date = dates[i]
today_price = float(prices[i])
-
- # 과거 30일 데이터 (오늘 포함 시점의 과거 데이터)
- # 주의: 실제 매매 결정을 내리는 시점(장중/장마감)에 따라 index 처리 중요.
- # 여기서는 '장 마감 후 분석 -> 다음날 시가 매매' 또는 '당일 종가 매매' 가정.
- # 보수적으로 '당일 종가 매매' 가정 (분석 후 즉시 실행)
-
- history_window = prices[i-30:i+1] # 31개 (어제까지 30개 + 오늘)
- # [수정] 타입 체크 및 변환 (Numpy Array, Series, List 모두 대응)
+
+ # 과거 데이터 윈도우
+ history_window = prices[max(0, i-min_days):i+1]
if hasattr(history_window, 'values'):
current_window_list = history_window.values.tolist()
elif isinstance(history_window, np.ndarray):
current_window_list = history_window.tolist()
else:
current_window_list = list(history_window)
-
- # 1. 기술적 분석
- tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(today_price, current_window_list)
-
- # 2. AI 예측 (Online Learning Simulation)
- # 매일 재학습하면 너무 느리므로, 5일에 한번씩만 학습한다고 가정 (타협)
- # 또는 실제 Bot처럼 매번 학습하되, Backtest 속도 고려
- # 여기서는 정확성을 위해 매번 학습 시도 (데이터셋이 작으므로)
-
- # Mocking News Sentiment (Historical news unavailable -> Neutral)
- sentiment_score = 0.5
-
- # LSTM Predict
- # (속도를 위해 간략화된 학습 사용)
- pred_result = self.predictor.train_and_predict(current_window_list)
- if not pred_result: continue
-
- lstm_score = 0.5
- if pred_result['trend'] == 'UP':
- idx = min(pred_result['change_rate'], 3.0)
- lstm_score = 0.5 + (idx * 0.1)
- else:
- idx = max(pred_result['change_rate'], -3.0)
- lstm_score = 0.5 + (idx * 0.1)
- lstm_score = max(0.0, min(1.0, lstm_score))
- # 3. 통합 점수
- w_tech, w_news, w_ai = 0.4, 0.3, 0.3
+ # 1. 기술적 분석
+ tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
+ today_price, current_window_list)
+
+ # 2. ATR 기반 동적 손절/익절
+ sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(current_window_list)
+
+ # 3. LSTM 예측 (10일마다 재학습 → 속도 타협)
+ lstm_score = 0.5
+ ai_confidence = 0.5
+ if i % 10 == 0 or i == min_days:
+ pred_result = self.predictor.train_and_predict(current_window_list)
+ else:
+ # 재학습 없이 예측만
+ pred_result = self.predictor.train_and_predict(current_window_list, ticker=f"bt_{self.ticker}")
+
+ if pred_result:
+ ai_confidence = pred_result.get('confidence', 0.5)
+ change_mag = min(abs(pred_result['change_rate']), 5.0) / 5.0
+ if pred_result['trend'] == 'UP':
+ lstm_score = 0.5 + (change_mag * ai_confidence * 0.4)
+ else:
+ lstm_score = 0.5 - (change_mag * ai_confidence * 0.4)
+ lstm_score = max(0.0, min(1.0, lstm_score))
+
+ # 4. 뉴스 감정 (백테스트에서는 중립)
+ sentiment_score = 0.5
+
+ # 5. 통합 점수 (ADX 기반 동적 가중치)
+ adx_val = ma_info.get('adx', 20)
+ if adx_val >= 30:
+ w_tech, w_news, w_ai = 0.50, 0.15, 0.35
+ elif adx_val < 20:
+ w_tech, w_news, w_ai = 0.35, 0.30, 0.35
+ else:
+ w_tech, w_news, w_ai = 0.40, 0.25, 0.35
+
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
-
- # 4. 리스크 관리 (손절/익절) 체크
- # 보유 중일 때만 체크
+
+ # 6. 리스크 관리 (보유 중일 때)
action = "HOLD"
action_reason = ""
-
+
if self.holdings > 0:
- # 수익률 계산
profit_rate = ((today_price - self.avg_price) / self.avg_price) * 100
-
- # 손절 (-5%) / 익절 (+8%)
- if profit_rate <= -5.0:
+
+ # A. 동적 손절 (ATR 기반)
+ if profit_rate <= sl_tp['stop_loss_pct']:
action = "SELL"
- action_reason = f"Stop Loss ({profit_rate:.2f}%)"
- elif profit_rate >= 8.0:
+ action_reason = f"Dynamic SL ({profit_rate:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
+
+ # B. 동적 익절 (ATR 기반)
+ elif profit_rate >= sl_tp['take_profit_pct']:
action = "SELL"
- action_reason = f"Take Profit ({profit_rate:.2f}%)"
- else:
- # AI 매도 시그널
- if total_score <= 0.3:
+ action_reason = f"Dynamic TP ({profit_rate:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
+
+ # C. 트레일링 스탑
+ elif self.peak_price > 0 and profit_rate > 2.0:
+ drop_from_peak = ((today_price - self.peak_price) / self.peak_price) * 100
+ if drop_from_peak <= -sl_tp['trailing_stop_pct']:
action = "SELL"
- action_reason = f"AI Signal (Score: {total_score:.2f})"
-
- # 매수 로직
- if action == "HOLD" and total_score >= 0.7:
- # 중복 매수 필터 (간단화를 위해 최대 1회 진입 가정 or Pyramiding)
- # 여기선 불타기 허용 (최대 30% 비중까지만)
- max_pos = self.initial_capital * 0.3
- current_val = self.holdings * today_price
-
- if current_val < max_pos:
- action = "BUY"
-
- # 5. 주문 실행
+ action_reason = f"Trailing Stop ({drop_from_peak:.1f}% from peak)"
+
+ # D. AI 매도 시그널
+ if action == "HOLD" and total_score <= 0.30:
+ action = "SELL"
+ action_reason = f"AI Signal (Score: {total_score:.2f})"
+
+ # 최고가 업데이트
+ if today_price > self.peak_price:
+ self.peak_price = today_price
+
+ # 7. 매수 로직 (v2.0 - 포지션 사이징)
+ if action == "HOLD" and total_score >= 0.60:
+ # 복합 조건 확인
+ should_buy = False
+ mtf_align = ma_info.get('mtf_alignment', 'NEUTRAL')
+
+ if tech_score >= 0.70 and lstm_score >= 0.55:
+ should_buy = True
+ elif total_score >= 0.65 and mtf_align in ['STRONG_BULL', 'BULL']:
+ should_buy = True
+ elif total_score >= 0.70:
+ should_buy = True
+
+ if should_buy:
+ # 포지션 사이징
+ total_val = self.capital + (self.holdings * today_price)
+ current_pos_val = self.holdings * today_price
+ max_pos = total_val * 0.30 # 최대 30% 비중
+
+ if current_pos_val < max_pos:
+ qty = calculate_position_size(
+ total_capital=total_val,
+ current_price=today_price,
+ volatility=volatility,
+ score=total_score,
+ ai_confidence=ai_confidence
+ )
+ if qty > 0:
+ action = "BUY"
+
+ # 8. 주문 실행
if action == "BUY":
- # 포지션 사이징
- invest_amt = 1000000 # 기본
- if volatility >= 3.0: invest_amt = 500000
- elif volatility <= 1.5: invest_amt = 1500000
-
- # 잔고 확인
- invest_amt = min(invest_amt, self.capital)
- qty = int(invest_amt / today_price)
-
- if qty > 0:
- cost = qty * today_price
- # 수수료 0.015% 가정
- fee = cost * 0.00015
- if self.capital >= cost + fee:
- # 평단가 갱신
- total_cost = (self.avg_price * self.holdings) + cost
- self.holdings += qty
- self.avg_price = total_cost / self.holdings
- self.capital -= (cost + fee)
-
- self.trade_log.append({
- "date": today_date.strftime("%Y-%m-%d"),
- "action": "BUY",
- "price": today_price,
- "qty": qty,
- "score": total_score,
- "volatility": volatility,
- "balance": self.capital
- })
-
- elif action == "SELL":
+ # 슬리피지 적용
+ exec_price = self._apply_slippage(today_price, is_buy=True)
+
+ total_val = self.capital + (self.holdings * today_price)
+ qty = calculate_position_size(
+ total_capital=total_val,
+ current_price=exec_price,
+ volatility=volatility,
+ score=total_score,
+ ai_confidence=ai_confidence
+ )
+ if qty <= 0:
+ qty = 1
+
+ cost = qty * exec_price
+ fee = cost * self.buy_commission
+
+ if self.capital >= cost + fee:
+ # 평단가 갱신
+ total_cost = (self.avg_price * self.holdings) + cost
+ self.holdings += qty
+ self.avg_price = total_cost / self.holdings
+ self.capital -= (cost + fee)
+ self.peak_price = max(self.peak_price, exec_price)
+
+ self.trade_log.append({
+ "date": today_date.strftime("%Y-%m-%d"),
+ "action": "BUY",
+ "price": today_price,
+ "exec_price": exec_price,
+ "qty": qty,
+ "score": round(total_score, 3),
+ "volatility": round(volatility, 2),
+ "fee": round(fee, 0),
+ "balance": round(self.capital, 0)
+ })
+
+ elif action == "SELL" and self.holdings > 0:
+ exec_price = self._apply_slippage(today_price, is_buy=False)
qty = self.holdings
- revenue = qty * today_price
- # 세금+수수료 약 0.23% 가정
- fee = revenue * 0.0023
-
- profit = revenue - fee - (self.avg_price * qty)
- self.capital += (revenue - fee)
-
+ revenue = qty * exec_price
+ fee = revenue * self.sell_commission
+ tax = revenue * self.sell_tax
+ net_revenue = revenue - fee - tax
+
+ profit = net_revenue - (self.avg_price * qty)
+ self.capital += net_revenue
+
self.trade_log.append({
"date": today_date.strftime("%Y-%m-%d"),
"action": "SELL",
"price": today_price,
+ "exec_price": exec_price,
"qty": qty,
"reason": action_reason,
- "profit": profit,
- "balance": self.capital
+ "profit": round(profit, 0),
+ "fee": round(fee + tax, 0),
+ "balance": round(self.capital, 0)
})
-
+
self.holdings = 0
self.avg_price = 0
+ self.peak_price = 0
# 일별 가치 기록
total_val = self.capital + (self.holdings * today_price)
self.daily_values.append(total_val)
-
+
+ if len(self.daily_values) >= 2:
+ daily_return = (self.daily_values[-1] - self.daily_values[-2]) / self.daily_values[-2]
+ self.daily_returns.append(daily_return)
+
self.print_summary()
-
+ self.plot_results()
+
def print_summary(self):
if not self.daily_values:
print("❌ No simulation data.")
return
-
+
final_val = self.daily_values[-1]
roi = ((final_val - self.initial_capital) / self.initial_capital) * 100
-
- print("\n" + "="*40)
- print(f"📊 [Backtest Result] {self.ticker}")
- print(f"• Initial Capital: {self.initial_capital:,.0f} KRW")
- print(f"• Final Capital : {final_val:,.0f} KRW")
- print(f"• Return (ROI) : {roi:.2f}%")
- print(f"• Total Trades : {len(self.trade_log)}")
- print("="*40)
-
- # 최근 5개 거래 로그
- print("📝 Recent Trades:")
- for trade in self.trade_log[-5:]:
- action_emoji = "🔴" if trade['action'] == "BUY" else "🔵"
- print(f"{trade['date']} {action_emoji} {trade['action']} {trade['qty']}ea @ {trade['price']:,.0f} | {trade.get('reason', '')}")
+
+ # 성과 지표 계산
+ returns = np.array(self.daily_returns) if self.daily_returns else np.array([0])
+
+ # Sharpe Ratio (연환산, 무위험수익률 3.5% 가정)
+ rf_daily = 0.035 / 252
+ if np.std(returns) > 0:
+ sharpe = (np.mean(returns) - rf_daily) / np.std(returns) * np.sqrt(252)
+ else:
+ sharpe = 0
+
+ # Maximum Drawdown (MDD)
+ peak = np.maximum.accumulate(self.daily_values)
+ drawdown = (np.array(self.daily_values) - peak) / peak * 100
+ mdd = drawdown.min()
+
+ # Win Rate
+ sell_trades = [t for t in self.trade_log if t['action'] == 'SELL']
+ wins = [t for t in sell_trades if t.get('profit', 0) > 0]
+ losses = [t for t in sell_trades if t.get('profit', 0) <= 0]
+ win_rate = len(wins) / max(1, len(sell_trades)) * 100
+
+ # 평균 손익
+ avg_win = np.mean([t['profit'] for t in wins]) if wins else 0
+ avg_loss = np.mean([t['profit'] for t in losses]) if losses else 0
+ profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
+
+ # 총 수수료/세금
+ total_fees = sum(t.get('fee', 0) for t in self.trade_log)
+
+ print("\n" + "=" * 55)
+ print(f"📊 [Backtest v2.0 Result] {self.ticker}")
+ print("=" * 55)
+ print(f" 💰 Initial Capital : {self.initial_capital:>15,.0f} KRW")
+ print(f" 💰 Final Value : {final_val:>15,.0f} KRW")
+ print(f" 📈 Return (ROI) : {roi:>14.2f}%")
+ print(f" 📉 Max Drawdown : {mdd:>14.2f}%")
+ print(f" 📊 Sharpe Ratio : {sharpe:>14.2f}")
+ print(f" 🎯 Win Rate : {win_rate:>14.1f}% ({len(wins)}/{len(sell_trades)})")
+ print(f" 💵 Avg Win : {avg_win:>15,.0f} KRW")
+ print(f" 💸 Avg Loss : {avg_loss:>15,.0f} KRW")
+ print(f" ⚖️ Profit Factor : {profit_factor:>14.2f}")
+ print(f" 💳 Total Fees/Tax : {total_fees:>15,.0f} KRW")
+ print(f" 🔄 Total Trades : {len(self.trade_log):>14}")
+ print("=" * 55)
+
+ # 최근 10개 거래 로그
+ print("\n📝 Recent Trades:")
+ for trade in self.trade_log[-10:]:
+ emoji = "🔴" if trade['action'] == "BUY" else "🔵"
+ line = (f" {trade['date']} {emoji} {trade['action']} "
+ f"{trade['qty']}ea @ {trade['price']:,.0f}")
+ if 'profit' in trade:
+ p = trade['profit']
+ line += f" | P&L: {p:+,.0f}"
+ if 'reason' in trade:
+ line += f" | {trade['reason']}"
+ print(line)
+
+ def plot_results(self):
+ """결과 차트 생성"""
+ if not self.daily_values:
+ return
+
+ try:
+ fig, axes = plt.subplots(3, 1, figsize=(14, 10), gridspec_kw={'height_ratios': [3, 1, 1]})
+
+ # 1. 포트폴리오 가치 vs Buy & Hold
+ ax1 = axes[0]
+ ax1.plot(self.daily_values, label='Strategy', color='blue', linewidth=1.5)
+
+ # Buy & Hold 비교
+ if hasattr(self, 'data'):
+ prices = self.data.values[60:]
+ if len(prices) == len(self.daily_values):
+ bh_shares = self.initial_capital / prices[0]
+ bh_values = bh_shares * prices
+ ax1.plot(bh_values, label='Buy & Hold', color='gray', alpha=0.5, linestyle='--')
+
+ ax1.axhline(y=self.initial_capital, color='red', linestyle=':', alpha=0.3, label='Initial')
+ ax1.set_title(f'Backtest Result: {self.ticker}', fontsize=14, fontweight='bold')
+ ax1.set_ylabel('Portfolio Value (KRW)')
+ ax1.legend(loc='upper left')
+ ax1.grid(True, alpha=0.3)
+
+ # 매매 포인트 표시
+ for trade in self.trade_log:
+ day_idx = None
+ for j, d in enumerate(self.data.index[60:]):
+ if d.strftime("%Y-%m-%d") == trade['date']:
+ day_idx = j
+ break
+ if day_idx is not None and day_idx < len(self.daily_values):
+ if trade['action'] == 'BUY':
+ ax1.scatter(day_idx, self.daily_values[day_idx], marker='^',
+ color='red', s=40, zorder=5)
+ else:
+ ax1.scatter(day_idx, self.daily_values[day_idx], marker='v',
+ color='blue', s=40, zorder=5)
+
+ # 2. Drawdown
+ ax2 = axes[1]
+ peak = np.maximum.accumulate(self.daily_values)
+ drawdown = (np.array(self.daily_values) - peak) / peak * 100
+ ax2.fill_between(range(len(drawdown)), drawdown, 0, color='red', alpha=0.3)
+ ax2.set_ylabel('Drawdown (%)')
+ ax2.grid(True, alpha=0.3)
+
+ # 3. Daily Returns
+ ax3 = axes[2]
+ if self.daily_returns:
+ colors = ['green' if r >= 0 else 'red' for r in self.daily_returns]
+ ax3.bar(range(len(self.daily_returns)), [r * 100 for r in self.daily_returns],
+ color=colors, alpha=0.5, width=1)
+ ax3.set_ylabel('Daily Return (%)')
+ ax3.set_xlabel('Trading Days')
+ ax3.grid(True, alpha=0.3)
+
+ plt.tight_layout()
+ chart_path = f"data/backtest_{self.ticker}.png"
+ plt.savefig(chart_path, dpi=150, bbox_inches='tight')
+ plt.close()
+ print(f"\n📊 Chart saved: {chart_path}")
+ except Exception as e:
+ print(f"⚠️ Chart generation failed: {e}")
+
if __name__ == "__main__":
- # 삼성전자(005930), 6개월 백테스팅
- # 최근 6개월간 로직이 통했는지 검증
- # (종목 코드는 KOSPI: 코드, KOSDAQ: 코드)
- backtester = Backtester("005930", start_date="2023-06-01", end_date="2024-01-01")
+ print("=" * 55)
+ print("🚀 AI Trading Backtester v2.0")
+ print("=" * 55)
+
+ # 삼성전자 6개월 백테스팅
+ backtester = Backtester("005930", start_date="2025-06-01", end_date="2026-02-01")
backtester.run()
diff --git a/modules/analysis/deep_learning.py b/modules/analysis/deep_learning.py
index c16c1d4..c579a95 100644
--- a/modules/analysis/deep_learning.py
+++ b/modules/analysis/deep_learning.py
@@ -138,13 +138,19 @@ class PricePredictor:
else:
print("[AI] No CUDA GPU detected. Running on CPU.")
- self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.0005, weight_decay=1e-4)
+ self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4)
+ # [v2.0] Learning Rate Scheduler (ReduceLROnPlateau: val_loss 정체 시 lr 감소)
+ self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
+ self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
+ )
self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None
self.batch_size = 64
self.max_epochs = 200
self.seq_length = 60
self.patience = 15
+ # [v2.0] Gradient Clipping 값 (exploding gradient 방지)
+ self.max_grad_norm = 1.0
self.training_status = {
"is_training": False,
@@ -237,12 +243,19 @@ class PricePredictor:
max_epochs = 50 if has_checkpoint else self.max_epochs
# 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용)
+ # [v2.0] LR Scheduler 리셋
+ self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005
+ self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
+ self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
+ )
+
self.model.train()
self.training_status["is_training"] = True
if ticker:
self.training_status["current_ticker"] = ticker
best_val_loss = float('inf')
+ best_model_state = None # [v2.0] Best Model 저장
patience_counter = 0
final_loss = 0.0
actual_epochs = 0
@@ -268,12 +281,17 @@ class PricePredictor:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
self.scaler_amp.scale(loss).backward()
+ # [v2.0] Gradient Clipping (AMP 호환)
+ self.scaler_amp.unscale_(self.optimizer)
+ torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.scaler_amp.step(self.optimizer)
self.scaler_amp.update()
else:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
+ # [v2.0] Gradient Clipping
+ torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.optimizer.step()
epoch_loss += loss.item()
@@ -293,17 +311,26 @@ class PricePredictor:
val_loss = self.criterion(val_out, y_val).item()
self.model.train()
+ # [v2.0] LR Scheduler step (val_loss 기반)
+ self.lr_scheduler.step(val_loss)
+
final_loss = train_loss
actual_epochs = epoch + 1
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
+ # [v2.0] Best model 상태 저장 (overfitting 방지)
+ best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()}
else:
patience_counter += 1
if patience_counter >= self.patience:
break
+ # [v2.0] Best model 복원 (early stopping 후 최적 상태로 복구)
+ if best_model_state:
+ self.model.load_state_dict(best_model_state)
+
self.training_status["is_training"] = False
self.training_status["loss"] = final_loss
@@ -346,7 +373,30 @@ class PricePredictor:
current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
- confidence = 1.0 / (1.0 + (final_loss * 100))
+
+ # [v2.0] 개선된 신뢰도 계산
+ # 1. 학습 손실 기반 (낮을수록 좋음)
+ loss_confidence = 1.0 / (1.0 + (best_val_loss * 50))
+
+ # 2. Train/Val 괴리도 (overfitting 감지)
+ overfit_ratio = final_loss / (best_val_loss + 1e-9)
+ if overfit_ratio < 0.5:
+ # Train loss가 Val loss보다 훨씬 낮음 = overfitting
+ overfit_penalty = 0.7
+ elif overfit_ratio > 2.0:
+ # Train loss가 Val loss보다 훨씬 높음 = underfitting
+ overfit_penalty = 0.8
+ else:
+ overfit_penalty = 1.0
+
+ # 3. 에포크 수 기반 (너무 적거나 많으면 불신)
+ epoch_factor = 1.0
+ if actual_epochs < 10:
+ epoch_factor = 0.6 # 학습 부족
+ elif actual_epochs >= max_epochs:
+ epoch_factor = 0.8 # 수렴 실패
+
+ confidence = min(0.95, loss_confidence * overfit_penalty * epoch_factor)
return {
"current": current_price,
@@ -354,9 +404,11 @@ class PricePredictor:
"change_rate": round(change_rate, 2),
"trend": trend,
"loss": final_loss,
+ "val_loss": best_val_loss,
"confidence": round(confidence, 2),
"epochs": actual_epochs,
- "device": str(self.device)
+ "device": str(self.device),
+ "lr": self.optimizer.param_groups[0]['lr']
}
def batch_predict(self, prices_dict):
diff --git a/modules/analysis/macro.py b/modules/analysis/macro.py
index 8c6a7cb..9ef5c4a 100644
--- a/modules/analysis/macro.py
+++ b/modules/analysis/macro.py
@@ -49,26 +49,38 @@ class MacroAnalyzer:
results[name] = {"price": 0, "change": 0}
# [신규] 시장 스트레스 지수(MSI) 추가
- time.sleep(0.6) # MSI 계산 전 추가 대기
+ time.sleep(0.6)
kospi_stress = MacroAnalyzer.calculate_stress_index(kis_client, "0001")
results['MSI'] = kospi_stress
print(f" - Market Stress Index: {kospi_stress}")
-
+
if kospi_stress >= 50:
- risk_score += 2 # 매우 위험
+ risk_score += 2
elif kospi_stress >= 30:
- risk_score += 1 # 위험
+ risk_score += 1
+
+ # [v2.0] KOSPI/KOSDAQ 연동 위험도 (둘 다 하락 시 더 위험)
+ kospi_change = results.get('KOSPI', {}).get('change', 0)
+ kosdaq_change = results.get('KOSDAQ', {}).get('change', 0)
+ if kospi_change <= -1.0 and kosdaq_change <= -1.0:
+ risk_score += 1 # 양대 지수 동반 하락
+ print(f" ⚠️ Both KOSPI({kospi_change}%) & KOSDAQ({kosdaq_change}%) declining!")
+
+ # [v2.0] 급반등 감지 (전일 급락 후 반등 = 불안정)
+ if kospi_change >= 2.0 and kospi_stress >= 30:
+ risk_score = max(risk_score, 1) # 급반등이지만 스트레스 높으면 CAUTION 유지
+ print(f" 📈 Sharp rebound detected but MSI still elevated")
# 시장 상태 정의
status = "SAFE"
- if risk_score >= 2:
- status = "DANGER" # 매수 중단 권장
+ if risk_score >= 3:
+ status = "DANGER"
elif risk_score >= 1:
- status = "CAUTION" # 보수적 매매
-
+ status = "CAUTION"
+
return {
"status": status,
- "risk_score": risk_score,
+ "risk_score": risk_score,
"indicators": results
}
diff --git a/modules/analysis/technical.py b/modules/analysis/technical.py
index 1e1a48e..67d09b6 100644
--- a/modules/analysis/technical.py
+++ b/modules/analysis/technical.py
@@ -5,22 +5,290 @@ class TechnicalAnalyzer:
"""
Pandas를 활용한 기술적 지표 계산 모듈
CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리
+
+ [v2.0 개선사항]
+ - ATR(Average True Range): 변동성 기반 동적 손절/익절 산출
+ - ADX(Average Directional Index): 추세 강도 측정 (방향 아닌 '강도')
+ - OBV(On Balance Volume): 거래량 기반 매집/분산 감지
+ - 다중 시간프레임(MTF): 5일/20일/60일 추세 일관성 확인
+ - VWAP 근사: 거래량가중평균가격
"""
-
+
@staticmethod
def calculate_rsi(prices, period=14):
- """RSI(Relative Strength Index) 계산"""
+ """RSI(Relative Strength Index) 계산 - Wilder 방식 적용"""
if len(prices) < period:
- return 50.0 # 데이터 부족 시 중립
-
+ return 50.0
+
delta = pd.Series(prices).diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
-
- rs = gain / loss
+ gain = delta.where(delta > 0, 0)
+ loss = -delta.where(delta < 0, 0)
+
+ # Wilder의 지수이동평균 방식 (더 정확)
+ avg_gain = gain.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
+ avg_loss = loss.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
+
+ rs = avg_gain / (avg_loss + 1e-9)
rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1]
+ @staticmethod
+ def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
+ """ATR(Average True Range) 계산 - 동적 손절/익절의 핵심 지표
+
+ Returns:
+ float: ATR 값 (가격 단위), 0이면 데이터 부족
+ """
+ if len(prices) < period + 1:
+ return 0.0
+
+ close = pd.Series(prices)
+
+ if high_prices and len(high_prices) == len(prices):
+ high = pd.Series(high_prices)
+ low = pd.Series(low_prices)
+ else:
+ # 고가/저가 없으면 종가 기반 추정 (일변동폭 1.5% 가정)
+ high = close * 1.008
+ low = close * 0.992
+
+ # True Range = max(H-L, |H-Cprev|, |L-Cprev|)
+ prev_close = close.shift(1)
+ tr1 = high - low
+ tr2 = (high - prev_close).abs()
+ tr3 = (low - prev_close).abs()
+ tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
+
+ # Wilder's smoothing
+ atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
+ return atr.iloc[-1] if not pd.isna(atr.iloc[-1]) else 0.0
+
+ @staticmethod
+ def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
+ """ADX(Average Directional Index) - 추세 강도 측정
+
+ Returns:
+ tuple: (adx, plus_di, minus_di)
+ - ADX > 25: 강한 추세, ADX < 20: 횡보/비추세
+ - +DI > -DI: 상승 추세, -DI > +DI: 하락 추세
+ """
+ if len(prices) < period * 2:
+ return 20.0, 50.0, 50.0 # 중립
+
+ close = pd.Series(prices)
+
+ if high_prices and len(high_prices) == len(prices):
+ high = pd.Series(high_prices)
+ low = pd.Series(low_prices)
+ else:
+ # 종가 기반 추정
+ daily_range = close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
+ high = close + daily_range * 0.5
+ low = close - daily_range * 0.5
+
+ # +DM, -DM
+ plus_dm = high.diff()
+ minus_dm = -low.diff()
+
+ plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0)
+ minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0)
+
+ # ATR
+ prev_close = close.shift(1)
+ tr = pd.concat([
+ high - low,
+ (high - prev_close).abs(),
+ (low - prev_close).abs()
+ ], axis=1).max(axis=1)
+
+ atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
+
+ # +DI, -DI
+ plus_di = 100 * plus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
+ minus_di = 100 * minus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
+
+ # DX → ADX
+ dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
+ adx = dx.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
+
+ return (
+ adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0,
+ plus_di.iloc[-1] if not pd.isna(plus_di.iloc[-1]) else 50.0,
+ minus_di.iloc[-1] if not pd.isna(minus_di.iloc[-1]) else 50.0
+ )
+
+ @staticmethod
+ def calculate_obv(prices, volume_history):
+ """OBV(On Balance Volume) - 스마트머니 매집/분산 감지
+
+ Returns:
+ dict: {
+ 'obv_trend': 'ACCUMULATING' | 'DISTRIBUTING' | 'NEUTRAL',
+ 'obv_divergence': True/False (가격↑ but OBV↓ = 약세 다이버전스)
+ }
+ """
+ if not volume_history or len(volume_history) < 20 or len(prices) < 20:
+ return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
+
+ close = pd.Series(prices)
+ volume = pd.Series(volume_history)
+
+ # OBV 계산
+ direction = close.diff().apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0))
+ obv = (direction * volume).cumsum()
+
+ # OBV 추세 (20일 이동평균 대비)
+ obv_ma = obv.rolling(20).mean()
+ obv_current = obv.iloc[-1]
+ obv_ma_current = obv_ma.iloc[-1]
+
+ if pd.isna(obv_ma_current):
+ return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
+
+ # 추세 판단
+ obv_trend = 'NEUTRAL'
+ score = 0.0
+ if obv_current > obv_ma_current * 1.05:
+ obv_trend = 'ACCUMULATING' # 매집 중
+ score = 0.1
+ elif obv_current < obv_ma_current * 0.95:
+ obv_trend = 'DISTRIBUTING' # 분산 중
+ score = -0.1
+
+ # 다이버전스 감지 (최근 10일)
+ price_trend = close.iloc[-1] > close.iloc[-10] if len(close) >= 10 else False
+ obv_price_trend = obv.iloc[-1] > obv.iloc[-10] if len(obv) >= 10 else False
+
+ divergence = False
+ if price_trend and not obv_price_trend:
+ divergence = True # 약세 다이버전스 (가격↑ OBV↓)
+ score -= 0.05
+ elif not price_trend and obv_price_trend:
+ divergence = True # 강세 다이버전스 (가격↓ OBV↑)
+ score += 0.05
+
+ return {
+ 'obv_trend': obv_trend,
+ 'obv_divergence': divergence,
+ 'score': round(score, 3)
+ }
+
+ @staticmethod
+ def get_multi_timeframe_trend(prices):
+ """다중 시간프레임 추세 일관성 검사
+
+ 5일(초단기), 20일(단기), 60일(중기) 추세가 일치하면 강한 신호
+
+ Returns:
+ dict: {
+ 'alignment': 'STRONG_BULL' | 'BULL' | 'NEUTRAL' | 'BEAR' | 'STRONG_BEAR',
+ 'score': -1.0 ~ 1.0,
+ 'details': {...}
+ }
+ """
+ if len(prices) < 60:
+ return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}
+
+ p = pd.Series(prices)
+ current = p.iloc[-1]
+
+ ma5 = p.rolling(5).mean().iloc[-1]
+ ma20 = p.rolling(20).mean().iloc[-1]
+ ma60 = p.rolling(60).mean().iloc[-1]
+
+ # 추세 방향 점수
+ trends = []
+ if current > ma5: trends.append(1)
+ else: trends.append(-1)
+
+ if ma5 > ma20: trends.append(1)
+ else: trends.append(-1)
+
+ if ma20 > ma60: trends.append(1)
+ else: trends.append(-1)
+
+ total = sum(trends)
+
+ if total == 3:
+ alignment = 'STRONG_BULL'
+ score = 0.15
+ elif total >= 1:
+ alignment = 'BULL'
+ score = 0.05
+ elif total == -3:
+ alignment = 'STRONG_BEAR'
+ score = -0.15
+ elif total <= -1:
+ alignment = 'BEAR'
+ score = -0.05
+ else:
+ alignment = 'NEUTRAL'
+ score = 0.0
+
+ return {
+ 'alignment': alignment,
+ 'score': score,
+ 'details': {
+ 'ma5': round(ma5, 1),
+ 'ma20': round(ma20, 1),
+ 'ma60': round(ma60, 1),
+ 'price_vs_ma5': 'above' if current > ma5 else 'below'
+ }
+ }
+
+ @staticmethod
+ def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None, atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
+ """ATR 기반 동적 손절/익절 계산
+
+ 변동성에 맞는 적응형 손절/익절 라인 산출
+ - 변동성 큰 종목: 넓은 손절폭 (whipsaw 방지)
+ - 변동성 작은 종목: 좁은 손절폭 (빠른 리스크 관리)
+
+ Returns:
+ dict: {
+ 'atr': ATR값,
+ 'atr_pct': ATR% (가격 대비),
+ 'stop_loss_pct': 손절 비율 (%),
+ 'take_profit_pct': 익절 비율 (%),
+ 'trailing_stop_pct': 트레일링 스탑 비율 (%)
+ }
+ """
+ if len(prices) < 15:
+ return {
+ 'atr': 0, 'atr_pct': 0,
+ 'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
+ 'trailing_stop_pct': 3.0
+ }
+
+ atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
+ current_price = prices[-1]
+
+ if current_price <= 0:
+ return {
+ 'atr': 0, 'atr_pct': 0,
+ 'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
+ 'trailing_stop_pct': 3.0
+ }
+
+ atr_pct = (atr / current_price) * 100
+
+ # 동적 손절: ATR x 2 (단, 최소 -3%, 최대 -10%)
+ sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
+
+ # 동적 익절: ATR x 3 (단, 최소 +5%, 최대 +25%)
+ tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
+
+ # 트레일링 스탑: ATR x 1.5 (최고가 대비)
+ trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))
+
+ return {
+ 'atr': round(atr, 1),
+ 'atr_pct': round(atr_pct, 2),
+ 'stop_loss_pct': round(sl_pct, 2),
+ 'take_profit_pct': round(tp_pct, 2),
+ 'trailing_stop_pct': round(trailing_pct, 2)
+ }
+
@staticmethod
def calculate_ma(prices, period=20):
"""이동평균선(Moving Average) 계산"""
@@ -87,126 +355,157 @@ class TechnicalAnalyzer:
@staticmethod
def get_technical_score(current_price, prices_history, volume_history=None):
"""
- 기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (고도화됨)
- - RSI, 이격도, MACD, Bollinger Bands, Stochastic 종합
- - [New] Volume Analysis (Whale Activity)
+ 기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (v2.0 고도화)
+
+ [v2.0 변경점]
+ - RSI: 25% (30% → 25%, ADX에 비중 이전)
+ - 이격도: 15% (20% → 15%)
+ - MACD: 15% (20% → 15%)
+ - Bollinger: 10% (15% → 10%)
+ - Stochastic: 10% (15% → 10%)
+ - ADX 추세강도: 15% (신규)
+ - MTF 다중시간프레임: 10% (신규)
+ - OBV/거래량 보너스: ±0.1 (보너스)
"""
if not prices_history or len(prices_history) < 30:
- return 0.5, 50.0 # 데이터 부족 시 중립
-
+ return 0.5, 50.0, 0.0, 1.0, {"ma20": 0, "ma114": 0, "trend": "Unknown", "position": "Unknown"}
+
scores = []
-
- # 1. RSI (비중 30%)
- # 30 이하(과매도) -> 1.0, 70 이상(과매수) -> 0.0
+
+ # 1. RSI (비중 25%)
rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
if rsi <= 30: rsi_score = 1.0
elif rsi >= 70: rsi_score = 0.0
- else: rsi_score = 1.0 - ((rsi - 30) / 40.0) # 선형 보간
- scores.append(rsi_score * 0.3)
-
- # 2. 이격도 (비중 20%)
+ else: rsi_score = 1.0 - ((rsi - 30) / 40.0)
+ scores.append(rsi_score * 0.25)
+
+ # 2. 이격도 (비중 15%)
ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
- disparity = (current_price - ma20) / ma20
- # 이격도가 마이너스일수록(저평가) 점수 높음
- if disparity < -0.05: disp_score = 1.0 # -5% 이상 하락
- elif disparity > 0.05: disp_score = 0.0 # +5% 이상 상승
- else: disp_score = 0.5 - (disparity * 10) # -0.05~0.05 사이
- scores.append(disp_score * 0.2)
-
- # 3. MACD (비중 20%)
- # MACD가 Signal선 위에 있으면 상승세 (매수)
+ disparity = (current_price - ma20) / (ma20 + 1e-9)
+ if disparity < -0.05: disp_score = 1.0
+ elif disparity > 0.05: disp_score = 0.0
+ else: disp_score = 0.5 - (disparity * 10)
+ scores.append(disp_score * 0.15)
+
+ # 3. MACD (비중 15%)
macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
- if hist > 0 and macd > 0: macd_score = 0.8 # 상승 추세 가속
- elif hist > 0 and macd <= 0: macd_score = 0.6 # 상승 반전 초기
- elif hist < 0 and macd > 0: macd_score = 0.4 # 하락 반전 초기
- else: macd_score = 0.2 # 하락 추세
- scores.append(macd_score * 0.2)
-
- # 4. Bollinger Bands (비중 15%)
- # 하단 밴드 근처 -> 매수(1.0), 상단 밴드 근처 -> 매도(0.0)
+ if hist > 0 and macd > 0: macd_score = 0.8
+ elif hist > 0 and macd <= 0: macd_score = 0.65 # 골든크로스 초기 = 매수 기회
+ elif hist < 0 and macd > 0: macd_score = 0.35 # 데드크로스 초기
+ else: macd_score = 0.2
+ scores.append(macd_score * 0.15)
+
+ # 4. Bollinger Bands (비중 10%)
up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
- if current_price <= low: bb_score = 1.0
- bb_score_base = 0.0
- if current_price <= low: bb_score_base = 1.0
- elif current_price >= up: bb_score_base = 0.0
+ if current_price <= low:
+ bb_score = 1.0
+ elif current_price >= up:
+ bb_score = 0.0
else:
- # 밴드 내 위치 비율 (Position %B) 유사 계산
- # 하단(0) ~ 상단(1) -> 점수는 1 ~ 0 역순
pos = (current_price - low) / (up - low + 1e-9)
- bb_score_base = 1.0 - pos
-
- # 추가 점수 로직 (기존 tech_score += 0.2를 bb_score에 반영)
- if current_price < low: # 과매도 (저점 매수 기회)
- bb_score = min(1.0, bb_score_base + 0.2) # 최대 1.0
- else:
- bb_score = bb_score_base
- scores.append(bb_score * 0.15)
-
- # 5. Stochastic (비중 15%)
- # K가 20 미만 -> 과매도(매수), 80 이상 -> 과매수(매도)
+ bb_score = 1.0 - pos
+ if current_price < low:
+ bb_score = min(1.0, bb_score + 0.2)
+ scores.append(bb_score * 0.10)
+
+ # 5. Stochastic (비중 10%)
slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
- st_score_base = 0.0
- if slow_k < 20: st_score_base = 1.0
- elif slow_k > 80: st_score_base = 0.0
- else: st_score_base = 1.0 - (slow_k / 100.0)
-
- # 추가 점수 로직 (기존 tech_score += 0.2 / -= 0.1를 st_score에 반영)
- if slow_k < 20: # 과매도
- st_score = min(1.0, st_score_base + 0.2)
- elif slow_k > 80: # 과매수
- st_score = max(0.0, st_score_base - 0.1)
- else:
- st_score = st_score_base
- scores.append(st_score * 0.15)
-
+ if slow_k < 20: st_score = 1.0
+ elif slow_k > 80: st_score = 0.0
+ else: st_score = 1.0 - (slow_k / 100.0)
+ # 골든/데드크로스 보정
+ if slow_k < 20 and slow_k > slow_d: # 과매도 영역에서 골든크로스
+ st_score = min(1.0, st_score + 0.15)
+ elif slow_k > 80 and slow_k < slow_d: # 과매수 영역에서 데드크로스
+ st_score = max(0.0, st_score - 0.15)
+ scores.append(st_score * 0.10)
+
+ # 6. [신규] ADX 추세 강도 (비중 15%)
+ adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
+ if adx >= 25: # 강한 추세
+ if plus_di > minus_di:
+ adx_score = 0.8 + min(0.2, (adx - 25) / 50) # 강한 상승추세
+ else:
+ adx_score = 0.2 - min(0.2, (adx - 25) / 50) # 강한 하락추세
+ else: # 비추세/횡보
+ adx_score = 0.5 # 중립
+ adx_score = max(0.0, min(1.0, adx_score))
+ scores.append(adx_score * 0.15)
+
+ # 7. [신규] 다중 시간프레임 (비중 10%)
+ mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
+ # MTF score를 0~1 범위로 변환
+ mtf_score = 0.5 + mtf['score'] # -0.15~+0.15 → 0.35~0.65
+ mtf_score = max(0.0, min(1.0, mtf_score))
+ scores.append(mtf_score * 0.10)
+
total_score = sum(scores)
-
- # [신규] 거래량 폭증 분석 (Whale Tracking)
+
+ # [보너스] 거래량 분석 (Whale Tracking + OBV)
volume_ratio = 1.0
if volume_history and len(volume_history) >= 5:
vol_s = pd.Series(volume_history)
- avg_vol = vol_s.rolling(window=5).mean().iloc[-2] # 어제까지의 5일 평균
+ avg_vol = vol_s.rolling(window=5).mean().iloc[-2]
current_vol = volume_history[-1]
if avg_vol > 0:
volume_ratio = current_vol / avg_vol
-
- # 평소 거래량의 3배(300%) 이상 터지면 세력 유입 가능성 높음 -> 가산점
+
+ # 거래량 폭증 보너스
if volume_ratio >= 3.0:
- total_score += 0.1 # 강력한 매수 신호
-
+ total_score += 0.08
+
+ # OBV 분석 보너스
+ obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
+ total_score += obv_result['score']
+
+ # MTF 추세 일관성 보너스 (위의 가중치 10% 외에 추가 보너스)
+ if mtf['alignment'] == 'STRONG_BULL':
+ total_score += 0.05
+ elif mtf['alignment'] == 'STRONG_BEAR':
+ total_score -= 0.05
+
# 0.0 ~ 1.0 클리핑
total_score = max(0.0, min(1.0, total_score))
-
- # [신규] 변동성(Volatility) 계산
- # 최근 20일간 일일 변동폭의 표준편차를 평균 가격으로 나눔
+
+ # 변동성(Volatility) 계산
if len(prices_history) > 1:
- # list 입력 대응
prices_np = np.array(prices_history)
changes = np.diff(prices_np) / prices_np[:-1]
- volatility = np.std(changes) * 100 # 퍼센트 단위
+ volatility = np.std(changes) * 100
else:
volatility = 0.0
- # [신규] 이동평균선 분석 (20일, 114일)
+ # 이동평균선 분석 (5일, 20일, 60일, 114일)
+ ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
+ ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)
-
+
ma_trend = "Unknown"
- if ma20 > ma114:
- ma_trend = "Bullish (Golden Alignment)" # 정배열
+ if ma5 > ma20 > ma60:
+ ma_trend = "Bullish (Golden Alignment)"
+ elif ma5 < ma20 < ma60:
+ ma_trend = "Bearish (Dead Alignment)"
+ elif ma20 > ma114:
+ ma_trend = "Moderate Bullish"
else:
- ma_trend = "Bearish (Dead Alignment)" # 역배열
-
+ ma_trend = "Moderate Bearish"
+
price_pos = "Unknown"
if current_price > ma20:
price_pos = "Above MA20"
else:
price_pos = "Below MA20"
-
+
ma_info = {
- "ma20": ma20,
- "ma114": ma114,
+ "ma5": round(ma5, 1),
+ "ma20": round(ma20, 1),
+ "ma60": round(ma60, 1),
+ "ma114": round(ma114, 1),
"trend": ma_trend,
- "position": price_pos
+ "position": price_pos,
+ "adx": round(adx, 1),
+ "adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
+ "mtf_alignment": mtf['alignment']
}
return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info
diff --git a/modules/bot.py b/modules/bot.py
index cd94dee..6ca3689 100644
--- a/modules/bot.py
+++ b/modules/bot.py
@@ -13,7 +13,7 @@ from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor
-from modules.strategy.process import analyze_stock_process
+from modules.strategy.process import analyze_stock_process, calculate_position_size
try:
from theme_manager import ThemeManager
@@ -31,11 +31,21 @@ def init_worker():
class AutoTradingBot:
+ """
+ [v2.0] 개선된 자동매매 봇
+
+ 주요 개선사항:
+ 1. ATR 기반 동적 손절/익절 + 트레일링 스탑
+ 2. 변동성 기반 포지션 사이징 (1주 고정 → 동적 수량)
+ 3. 보유종목 분석 기반 매도 (score 기반 SELL 판단)
+ 4. 매크로 상태를 분석 워커에 전달 (동적 임계값)
+ 5. 최고가 추적 (트레일링 스탑용)
+ 6. 상세한 매매 로그 및 텔레그램 알림
+ """
def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None):
# 1. 서비스 초기화
self.kis = KISClient()
self.news = AsyncNewsCollector()
- # GPU 경합 방지: 워커 1개만 사용 (LSTM 학습이 GPU 독점)
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
try:
list(self.executor.map(lambda x: x, range(1)))
@@ -56,6 +66,13 @@ class AutoTradingBot:
self.watchlist_updated_today = False
self.report_sent = False
+ # [v2.0] 트레일링 스탑용 최고가 추적
+ # {ticker: peak_price}
+ self.peak_prices = {}
+
+ # [v2.0] 최근 매크로 상태 캐싱
+ self.last_macro_status = None
+
# 4. 프로세스 관리
self.shutdown_event = shutdown_event
@@ -113,6 +130,33 @@ class AutoTradingBot:
except Exception:
return {}
+ def _load_peak_prices(self):
+ """트레일링 스탑용 최고가 데이터 로드"""
+ peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
+ if os.path.exists(peak_file):
+ try:
+ with open(peak_file, "r", encoding="utf-8") as f:
+ self.peak_prices = json.load(f)
+ except Exception:
+ self.peak_prices = {}
+
+ def _save_peak_prices(self):
+ """트레일링 스탑용 최고가 데이터 저장"""
+ peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
+ try:
+ with open(peak_file, "w", encoding="utf-8") as f:
+ json.dump(self.peak_prices, f, indent=2)
+ except Exception:
+ pass
+
+ def _update_peak_price(self, ticker, current_price):
+ """보유 종목의 최고가 갱신"""
+ if ticker not in self.peak_prices:
+ self.peak_prices[ticker] = current_price
+ elif current_price > self.peak_prices[ticker]:
+ self.peak_prices[ticker] = current_price
+ print(f" 📈 [Peak Updated] {ticker}: {current_price:,.0f}")
+
def send_daily_report(self):
if self.report_sent:
return
@@ -120,20 +164,46 @@ class AutoTradingBot:
balance = self.kis.get_balance()
total_eval = int(balance.get("total_eval", 0))
- report = f"📅 [Daily Closing Report]\n" \
- f"💰 Total Asset: {total_eval:,}원\n" \
- f"📜 Trades Today: {len(self.daily_trade_history)}건\n\n"
+ deposit = int(balance.get("deposit", 0))
+ report = (f"📅 [Daily Closing Report]\n"
+ f"💰 Total Asset: {total_eval:,}원\n"
+ f"💵 Cash: {deposit:,}원\n"
+ f"📜 Trades Today: {len(self.daily_trade_history)}건\n\n")
+
+ # 매매 내역
if self.daily_trade_history:
+ total_profit = 0
+ buy_count = 0
+ sell_count = 0
for trade in self.daily_trade_history:
action = trade['action']
icon = "🔴" if action == "BUY" else "🔵"
- report += f"{icon} {action} {trade['name']} {trade['qty']}주\n"
+ qty = trade.get('qty', 0)
+ price = trade.get('price', 0)
+ reason = trade.get('reason', '')
+ report += f"{icon} {action} {trade['name']} {qty}주 @ {price:,.0f}원"
+ if reason:
+ report += f" ({reason})"
+ report += "\n"
+ if action == "BUY":
+ buy_count += 1
+ else:
+ sell_count += 1
+ total_profit += trade.get('profit', 0)
+
+ report += f"\n📊 매수 {buy_count}건 / 매도 {sell_count}건"
+ if sell_count > 0:
+ report += f" | 실현손익: {total_profit:,.0f}원"
+ report += "\n"
+
+ # 보유종목 현황
if "holdings" in balance and balance["holdings"]:
report += "\n📊 [Holdings]\n"
for stock in balance["holdings"]:
yld = float(stock.get('yield', 0))
+ profit_loss = int(stock.get('profit_loss', 0))
if yld > 0:
icon = "🔴"
yld_str = f"+{yld}"
@@ -143,8 +213,9 @@ class AutoTradingBot:
else:
icon = "⚪"
yld_str = f"{yld}"
-
- report += f"{icon} {stock['name']}: {yld_str}%\n"
+
+ report += (f"{icon} {stock['name']}: {yld_str}% "
+ f"({profit_loss:+,}원)\n")
self.messenger.send_message(report)
self.report_sent = True
@@ -170,7 +241,6 @@ class AutoTradingBot:
if command == 'restart':
self.messenger.send_message("[Bot] Restart requested via Telegram.")
- # executor 재시작
self.restart_executor()
elif command == 'update_watchlist':
@@ -189,11 +259,23 @@ class AutoTradingBot:
# 1. 거시경제 분석
macro_status = MacroAnalyzer.get_macro_status(self.kis)
+ self.last_macro_status = macro_status
is_crash = False
+
if macro_status['status'] == 'DANGER':
is_crash = True
if not self.is_macro_warning_sent:
- self.messenger.send_message("🚨 [MARKET CRASH ALERT] 시장 급락 감지! 매수 중단.")
+ self.messenger.send_message(
+ "🚨 [MARKET CRASH ALERT]\n"
+ "시장 급락 감지! 매수 중단, 매도 기준 상향.\n"
+ f"Risk Score: {macro_status['risk_score']}")
+ self.is_macro_warning_sent = True
+ elif macro_status['status'] == 'CAUTION':
+ if not self.is_macro_warning_sent:
+ self.messenger.send_message(
+ "⚠️ [MARKET CAUTION]\n"
+ "시장 불안정. 보수적 매매 모드 전환.\n"
+ f"Risk Score: {macro_status['risk_score']}")
self.is_macro_warning_sent = True
else:
if self.is_macro_warning_sent:
@@ -236,6 +318,8 @@ class AutoTradingBot:
self.report_sent = False
self.discovered_stocks.clear()
self.watchlist_updated_today = False
+ # 전일 최고가 초기화 (보유하지 않는 종목)
+ self._load_peak_prices()
# 5. 시스템 감시 (3분 간격)
self.monitor.check_health()
@@ -252,52 +336,33 @@ class AutoTradingBot:
# 7. 종목 분석 및 매매
target_dict = self.load_watchlist()
- # 보유 종목 리스크 관리
+ # [v2.0] 잔고 조회 및 보유종목 맵 생성
balance = self.kis.get_balance()
current_holdings = {}
+ total_eval = int(balance.get("total_eval", 0))
if balance and "holdings" in balance:
for stock in balance["holdings"]:
code = stock.get("code")
- name = stock.get("name")
qty = int(stock.get("qty", 0))
- yld = float(stock.get("yield", 0.0))
+ if qty > 0:
+ current_holdings[code] = stock
+ # 최고가 업데이트 (트레일링 스탑용)
+ current_price = float(stock.get('current_price', 0))
+ if current_price > 0:
+ self._update_peak_price(code, current_price)
- current_holdings[code] = stock
-
- if qty <= 0:
- continue
-
- action = None
- reason = ""
-
- if yld <= -5.0:
- action = "SELL"
- reason = "Stop Loss"
- elif yld >= 8.0:
- action = "SELL"
- reason = "Take Profit"
-
- if action == "SELL":
- print(f"[Bot] Risk Management: {reason} - {name} (Qty: {qty}, Yield: {yld}%)")
- res = self.kis.sell_stock(code, qty)
- if res and res.get("status"):
- self.messenger.send_message(
- f"🔵 [Risk SELL] {name}\n"
- f" Reason: {reason}\n"
- f" Qty: {qty}\n"
- f" Yield: {yld}%")
- self.daily_trade_history.append({
- "action": "SELL", "name": name, "qty": qty,
- "price": stock.get('current_price'), "yield": yld
- })
- self.save_trade_history()
+ # [v2.0] 보유종목도 분석 대상에 포함 (watchlist에 없어도)
+ for code in current_holdings:
+ if code not in target_dict:
+ name = current_holdings[code].get('name', 'Unknown')
+ target_dict[code] = name
+ print(f"[Bot] Added holding to analysis: {name} ({code})")
# 분석 실행 (병렬 처리)
analysis_tasks = []
news_data = await self.news.get_market_news_async()
- # 실시간 잔고 추적용 변수 (매수 시 차감)
tracking_deposit = int(balance.get("deposit", 0))
try:
@@ -306,11 +371,23 @@ class AutoTradingBot:
if not prices:
continue
- # 외인 수급 분석
investor_trend = self.kis.get_investor_trend(ticker)
+ # [v2.0] 보유 정보 전달 (분석 워커에서 동적 손절/익절 사용)
+ holding_info = None
+ if ticker in current_holdings:
+ h = current_holdings[ticker]
+ holding_info = {
+ 'qty': int(h.get('qty', 0)),
+ 'yield': float(h.get('yield', 0.0)),
+ 'purchase_price': float(h.get('purchase_price', 0)),
+ 'current_price': float(h.get('current_price', 0)),
+ 'peak_price': self.peak_prices.get(ticker, float(h.get('current_price', 0)))
+ }
+
future = self.executor.submit(
- analyze_stock_process, ticker, prices, news_data, investor_trend)
+ analyze_stock_process, ticker, prices, news_data,
+ investor_trend, macro_status, holding_info)
analysis_tasks.append(future)
# 결과 처리
@@ -318,41 +395,107 @@ class AutoTradingBot:
for future in analysis_tasks:
try:
res = await loop.run_in_executor(None, future.result)
- ticker_name = target_dict.get(res['ticker'], 'Unknown')
- print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})")
+ ticker = res['ticker']
+ ticker_name = target_dict.get(ticker, 'Unknown')
+ print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})"
+ f" | SL:{res.get('sl_tp', {}).get('stop_loss_pct', 'N/A')}%"
+ f" TP:{res.get('sl_tp', {}).get('take_profit_pct', 'N/A')}%")
+ # ===== 매수 처리 =====
if res['decision'] == "BUY":
if is_crash:
+ print(f"[Bot] [Skip Buy] Market DANGER mode - {ticker_name}")
continue
current_price = float(res['current_price'])
if current_price <= 0:
continue
- qty = 1
+ # [v2.0] 포지션 사이징 (동적 수량)
+ qty = calculate_position_size(
+ total_capital=total_eval if total_eval > 0 else tracking_deposit,
+ current_price=current_price,
+ volatility=res.get('volatility', 2.0),
+ score=res['score'],
+ ai_confidence=res.get('ai_confidence', 0.5)
+ )
+ if qty <= 0:
+ print(f"[Bot] [Skip Buy] Position size = 0 ({ticker_name})")
+ continue
+
required_amount = current_price * qty
# 예수금 확인
if tracking_deposit < required_amount:
- print(f"[Bot] [Skip Buy] 예수금 부족 ({ticker_name}): "
- f"필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
- continue
+ # 수량 줄여서 재시도
+ qty = int(tracking_deposit / current_price)
+ if qty <= 0:
+ print(f"[Bot] [Skip Buy] 예수금 부족 ({ticker_name}): "
+ f"필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
+ continue
+ required_amount = current_price * qty
- print(f"[Bot] Buying {ticker_name} {qty}ea")
- order = self.kis.buy_stock(res['ticker'], qty)
+ print(f"[Bot] Buying {ticker_name} {qty}ea @ ~{current_price:,.0f}")
+ order = self.kis.buy_stock(ticker, qty)
if order.get("status"):
- self.messenger.send_message(
- f"🔴 [BUY] {ticker_name} {qty}주\n"
- f" Price: {current_price:,.0f}원")
+ reason = res.get('decision_reason', '')
+ sl_tp = res.get('sl_tp', {})
+
+ msg = (f"🔴 [BUY] {ticker_name} {qty}주\n"
+ f" Price: {current_price:,.0f}원\n"
+ f" Score: {res['score']:.2f}\n"
+ f" SL: {sl_tp.get('stop_loss_pct', -5):.1f}%"
+ f" | TP: {sl_tp.get('take_profit_pct', 8):.1f}%"
+ f" | Trail: {sl_tp.get('trailing_stop_pct', 3):.1f}%")
+ if reason:
+ msg += f"\n Reason: {reason}"
+
+ self.messenger.send_message(msg)
self.daily_trade_history.append({
"action": "BUY", "name": ticker_name,
- "qty": qty, "price": current_price
+ "qty": qty, "price": current_price,
+ "score": res['score'],
+ "reason": reason
})
self.save_trade_history()
tracking_deposit -= required_amount
- elif res['decision'] == "SELL":
- print(f"[Bot] Selling {ticker_name} (Simulation)")
+ # 최고가 초기 설정
+ self.peak_prices[ticker] = current_price
+ self._save_peak_prices()
+
+ # ===== 매도 처리 (v2.0 - 분석 기반 매도) =====
+ elif res['decision'] == "SELL" and ticker in current_holdings:
+ h = current_holdings[ticker]
+ qty = int(h.get('qty', 0))
+ yld = float(h.get('yield', 0.0))
+ profit_loss = int(h.get('profit_loss', 0))
+
+ if qty > 0:
+ print(f"[Bot] Selling {ticker_name} {qty}ea (Yield: {yld:.1f}%)")
+ sell_res = self.kis.sell_stock(ticker, qty)
+
+ if sell_res and sell_res.get("status"):
+ reason = res.get('decision_reason', 'AI Signal')
+
+ msg = (f"🔵 [SELL] {ticker_name} {qty}주\n"
+ f" Yield: {yld:.1f}%\n"
+ f" P&L: {profit_loss:+,}원\n"
+ f" Reason: {reason}")
+
+ self.messenger.send_message(msg)
+ self.daily_trade_history.append({
+ "action": "SELL", "name": ticker_name,
+ "qty": qty, "price": float(h.get('current_price', 0)),
+ "yield": yld, "profit": profit_loss,
+ "reason": reason
+ })
+ self.save_trade_history()
+
+ # 최고가 기록 삭제
+ if ticker in self.peak_prices:
+ del self.peak_prices[ticker]
+ self._save_peak_prices()
except BrokenProcessPool:
raise
@@ -368,15 +511,19 @@ class AutoTradingBot:
print(f"[Bot] Cycle Loop Error: {e}")
def loop(self):
- print(f"[Bot] Module Started (PID: {os.getpid()})")
- self.messenger.send_message("[Bot Started] 리팩토링된 봇이 시작되었습니다.")
+ print(f"[Bot] Module Started (PID: {os.getpid()}) [v2.0]")
+ self.messenger.send_message(
+ "🚀 [Bot Started v2.0]\n"
+ "개선사항: 동적 손절/익절, 트레일링 스탑, 포지션 사이징, 분석 기반 매도")
+
+ # 최고가 데이터 로드
+ self._load_peak_prices()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while True:
- # shutdown 시그널 체크
if self.shutdown_event and self.shutdown_event.is_set():
print("[Bot] Shutdown signal received.")
break
@@ -387,7 +534,6 @@ class AutoTradingBot:
print(f"[Bot] Loop Error: {e}")
self.messenger.send_message(f"[Bot] Loop Error: {e}")
- # 비동기 sleep (shutdown 체크하면서 대기)
for _ in range(60):
if self.shutdown_event and self.shutdown_event.is_set():
break
diff --git a/modules/strategy/process.py b/modules/strategy/process.py
index c9d07c0..d72947e 100644
--- a/modules/strategy/process.py
+++ b/modules/strategy/process.py
@@ -18,179 +18,369 @@ def get_predictor():
f" | AMP: {_lstm_predictor.use_amp}")
return _lstm_predictor
-def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
+
+def calculate_position_size(total_capital, current_price, volatility, score, ai_confidence,
+ max_per_stock=3000000):
"""
- [CPU Intensive] 기술적 분석 및 AI 판단을 수행하는 함수
- (ProcessPoolExecutor에서 실행됨)
+ [v2.0] 변동성 기반 포지션 사이징 (Modified Kelly Criterion)
+
+ 핵심 원칙:
+ 1. 변동성이 높으면 → 적은 수량 (리스크 관리)
+ 2. 확신도(score)가 높으면 → 많은 수량 (기회 포착)
+ 3. AI 신뢰도가 높으면 → 가산 비중
+ 4. 절대 한 종목에 전체 자산의 15% 이상 투자하지 않음
+
+ Returns:
+ int: 매수 수량 (0이면 매수 안 함)
+ """
+ if current_price <= 0 or total_capital <= 0:
+ return 0
+
+ # 1. 기본 투자금 (전체 자산의 10%)
+ base_invest = total_capital * 0.10
+
+ # 2. 변동성 조절 계수 (변동성 높을수록 투자금 감소)
+ # 변동성 1% → 1.0배, 2% → 0.75배, 3% → 0.5배, 5%+ → 0.3배
+ if volatility <= 1.0:
+ vol_factor = 1.2 # 안정적 종목은 약간 증가
+ elif volatility <= 2.0:
+ vol_factor = 1.0
+ elif volatility <= 3.0:
+ vol_factor = 0.7
+ elif volatility <= 5.0:
+ vol_factor = 0.45
+ else:
+ vol_factor = 0.3 # 고변동 종목
+
+ # 3. 확신도 조절 계수 (score가 높을수록 투자금 증가)
+ # score 0.6 → 0.5배, 0.7 → 1.0배, 0.8 → 1.5배, 0.9+ → 2.0배
+ if score >= 0.85:
+ conf_factor = 2.0
+ elif score >= 0.75:
+ conf_factor = 1.5
+ elif score >= 0.65:
+ conf_factor = 1.0
+ else:
+ conf_factor = 0.5
+
+ # 4. AI 신뢰도 가산
+ ai_bonus = 1.0
+ if ai_confidence >= 0.85:
+ ai_bonus = 1.3
+ elif ai_confidence >= 0.7:
+ ai_bonus = 1.1
+
+ # 5. 최종 투자금 계산
+ invest_amount = base_invest * vol_factor * conf_factor * ai_bonus
+
+ # 상한 제한
+ invest_amount = min(invest_amount, max_per_stock)
+ invest_amount = min(invest_amount, total_capital * 0.15) # 최대 15%
+ invest_amount = min(invest_amount, total_capital) # 잔고 초과 방지
+
+ # 수량 계산
+ qty = int(invest_amount / current_price)
+ return max(0, qty)
+
+
+def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
+ macro_status=None, holding_info=None):
+ """
+ [v2.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행)
+
+ [v2.0 개선사항]
+ 1. ATR 기반 동적 손절/익절 + 트레일링 스탑
+ 2. 포지션 사이징 (변동성 + 확신도 기반)
+ 3. 시장상황별 동적 매수/매도 임계값
+ 4. 보유종목에 대한 분석 기반 매도 판단
+ 5. ADX/OBV/MTF 통합 기술적 분석
+ 6. 강화된 AI 프롬프트 (종목 고유 뉴스 분석)
"""
try:
print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...")
-
- # 1. 기술적 지표 계산
+
+ # ===== 1. 기술적 지표 계산 =====
current_price = prices[-1] if prices else 0
- # [수정] 변동성, 거래량 비율, MA 정보 반환
- tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(current_price, prices, volume_history=None)
-
- # 2. LSTM 주가 예측
- # [최적화] 전역 캐시된 Predictor 사용
+ tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
+ current_price, prices, volume_history=None)
+
+ # ===== 2. ATR 기반 동적 손절/익절 =====
+ sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(prices)
+
+ # ===== 3. LSTM 주가 예측 =====
lstm_predictor = get_predictor()
if lstm_predictor:
lstm_predictor.training_status['current_ticker'] = ticker
pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker)
-
- lstm_score = 0.5 # 중립
+
+ lstm_score = 0.5
ai_confidence = 0.5
ai_loss = 1.0
if pred_result:
ai_confidence = pred_result.get('confidence', 0.5)
ai_loss = pred_result.get('loss', 1.0)
-
- # 상승/하락 예측에 따라 점수 조정 (신뢰도 반영)
- # 최대 5% 변동폭까지 반영
- change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0
-
+
+ change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0
+
if pred_result['trend'] == 'UP':
- # 상승 예측 시: 기본 0.5 + (강도 * 신뢰도 * 0.4) -> 최대 0.9
lstm_score = 0.5 + (change_magnitude * ai_confidence * 0.4)
else:
- # 하락 예측 시: 기본 0.5 - (강도 * 신뢰도 * 0.4) -> 최소 0.1
lstm_score = 0.5 - (change_magnitude * ai_confidence * 0.4)
-
+
lstm_score = max(0.0, min(1.0, lstm_score))
- # [신규] 수급 분석 (외인/기관)
+ # ===== 4. 수급 분석 (외인/기관) =====
investor_score = 0.0
frgn_net_buy = 0
orgn_net_buy = 0
consecutive_frgn_buy = 0
-
+ consecutive_orgn_buy = 0
+
if investor_trend:
- # 최근 5일 합산
for day in investor_trend:
frgn_net_buy += day['foreigner']
orgn_net_buy += day['institutional']
if day['foreigner'] > 0:
consecutive_frgn_buy += 1
-
- # 외인 수급 점수 (단순화)
- if frgn_net_buy > 0:
- investor_score += 0.05
- if consecutive_frgn_buy >= 3:
- investor_score += 0.05
-
- if investor_score > 0:
- print(f" 💰 [Investor] Foreign Buy Detected (Net: {frgn_net_buy})")
+ if day['institutional'] > 0:
+ consecutive_orgn_buy += 1
- # 3. AI 뉴스 분석
- # pred_result가 None일 경우 기본값 사용
+ # 외인 수급 점수 (강화)
+ if frgn_net_buy > 0:
+ investor_score += 0.03
+ if consecutive_frgn_buy >= 3:
+ investor_score += 0.04
+ if consecutive_frgn_buy >= 5:
+ investor_score += 0.03 # 5일 연속 매수 = 추가 보너스
+
+ # 기관 수급 점수 (신규)
+ if orgn_net_buy > 0:
+ investor_score += 0.02
+ if consecutive_orgn_buy >= 3:
+ investor_score += 0.03
+
+ # 외인+기관 동시 순매수 = 강력 신호
+ if frgn_net_buy > 0 and orgn_net_buy > 0:
+ investor_score += 0.03
+ print(f" 💰 [Investor] Both Foreign & Institutional Buying!")
+
+ # ===== 5. AI 뉴스 분석 (강화된 프롬프트) =====
if pred_result:
pred_price = pred_result.get('predicted', 0)
pred_change = pred_result.get('change_rate', 0)
else:
pred_price = current_price
pred_change = 0.0
-
- ollama = OllamaManager()
+
+ ollama = OllamaManager()
prompt = f"""
- [System Instruction]
- 1. Role: You are a Expert Quant Trader with 20 years of experience.
- 2. Market Data:
- - Technical Score: {tech_score:.2f} (RSI: {rsi:.2f})
- - Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
- - AI Prediction: {pred_price:.0f} KRW ({pred_change}%)
- - AI Confidence: {ai_confidence:.2f} (Loss: {ai_loss:.4f})
- - Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
- 3. Strategy:
- - If Foreigners are buying AND Trend is UP -> Strong BUY.
- - If AI Confidence > 0.8 and Trend is UP -> Strong BUY.
- - If MA is Bullish (Golden Alignment) -> Positive Signal.
- - If Price is above MA20 -> Support Uptrend.
- - If Trend is DOWN -> SELL/AVOID.
- 4. Task: Analyze the news and combine with market data to decide sentiment.
-
- News Data: {json.dumps(news_items, ensure_ascii=False)}
-
- Response (JSON):
- {{
- "sentiment_score": 0.8,
- "reason": "Foreigners buying and Golden Cross detected."
- }}
- """
+[System Instruction]
+1. Role: You are a legendary quant trader with 30 years of experience in Korean stock market.
+2. You MUST analyze the data objectively and respond with a JSON object.
+
+[Market Data for Stock {ticker}]
+- Current Price: {current_price:,.0f} KRW
+- Technical Score: {tech_score:.3f} (RSI: {rsi:.1f})
+- Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
+- ADX Trend Strength: {ma_info.get('adx', 20):.1f} ({ma_info.get('adx_trend', 'N/A')})
+- Multi-Timeframe: {ma_info.get('mtf_alignment', 'N/A')}
+- AI Prediction: {pred_price:.0f} KRW ({pred_change:+.2f}%)
+- AI Confidence: {ai_confidence:.2f} (Training Loss: {ai_loss:.4f})
+- Volatility: {volatility:.2f}%
+- Volume Ratio: {vol_ratio:.1f}x
+- ATR Stop Loss: {sl_tp['stop_loss_pct']:.1f}% / Take Profit: {sl_tp['take_profit_pct']:.1f}%
+- Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
+
+[Decision Framework]
+- Strong BUY signals: Foreigners+Institutions buying, Golden Cross, ADX>25 with bullish trend, AI high confidence UP
+- Moderate BUY: RSI<40 with bullish reversal, Price near Bollinger Lower Band
+- SELL signals: RSI>70, Dead Cross, ADX>25 with bearish trend, Foreigners selling
+- AVOID/HOLD: ADX<20 (sideways), Mixed signals, Low confidence
+
+[News Data]
+{json.dumps(news_items[:5] if news_items else [], ensure_ascii=False)}
+
+[Response Format - JSON Only]
+{{"sentiment_score": 0.0 to 1.0, "reason": "Brief analysis reason"}}
+"""
ai_resp = ollama.request_inference(prompt)
sentiment_score = 0.5
+ ai_reason = ""
try:
data = json.loads(ai_resp)
sentiment_score = float(data.get("sentiment_score", 0.5))
- except:
- pass
-
- # 4. 통합 점수 (동적 가중치)
- # AI 신뢰도가 높으면 AI 비중을 대폭 상향
- if ai_confidence >= 0.85:
- w_tech, w_news, w_ai = 0.2, 0.2, 0.6
- print(f" 🤖 [High Confidence] AI Weight Boosted to 60%")
+ sentiment_score = max(0.0, min(1.0, sentiment_score)) # 범위 강제
+ ai_reason = data.get("reason", "")
+ except Exception:
+ print(f" ⚠️ AI response parse failed, using neutral (0.5)")
+
+ # ===== 6. 통합 점수 (동적 가중치 v2.0) =====
+ # ADX가 높으면 (추세가 강하면) LSTM과 기술적 분석 비중 증가
+ adx_val = ma_info.get('adx', 20)
+
+ if ai_confidence >= 0.85 and adx_val >= 25:
+ # 강한 추세 + 높은 AI 신뢰도: AI 최우선
+ w_tech, w_news, w_ai = 0.15, 0.15, 0.70
+ print(f" 🤖 [Ultra High Confidence + Strong Trend] AI Weight 70%")
+ elif ai_confidence >= 0.85:
+ w_tech, w_news, w_ai = 0.20, 0.20, 0.60
+ print(f" 🤖 [High Confidence] AI Weight 60%")
+ elif adx_val >= 30:
+ # 매우 강한 추세: 기술적 분석 우선
+ w_tech, w_news, w_ai = 0.50, 0.20, 0.30
+ print(f" 📊 [Very Strong Trend ADX={adx_val:.0f}] Tech Weight 50%")
+ elif adx_val < 20:
+ # 비추세/횡보: 뉴스와 수급 중시
+ w_tech, w_news, w_ai = 0.30, 0.40, 0.30
+ print(f" 📰 [Sideways ADX={adx_val:.0f}] News Weight 40%")
else:
- w_tech, w_news, w_ai = 0.4, 0.3, 0.3
-
+ w_tech, w_news, w_ai = 0.35, 0.30, 0.35
+
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
-
- # [수신] 수급 가산점 추가 (최대 +0.1)
- total_score += investor_score
+
+ # 수급 가산점 (최대 +0.15)
+ total_score += min(investor_score, 0.15)
total_score = min(total_score, 1.0)
-
+
+ # ===== 7. 시장 상황별 동적 임계값 =====
+ buy_threshold = 0.60
+ sell_threshold = 0.30
+
+ if macro_status:
+ macro_state = macro_status.get('status', 'SAFE')
+ if macro_state == 'DANGER':
+ buy_threshold = 999.0 # 매수 완전 차단
+ sell_threshold = 0.45 # 매도 기준 상향 (빨리 탈출)
+ print(f" 🚨 [DANGER Market] Buy BLOCKED, Sell threshold raised to 0.45")
+ elif macro_state == 'CAUTION':
+ buy_threshold = 0.72 # 매수 기준 대폭 상향 (보수적)
+ sell_threshold = 0.38 # 매도 기준도 상향
+ print(f" ⚠️ [CAUTION Market] Buy threshold raised to 0.72")
+
+ # ===== 8. 매매 결정 =====
decision = "HOLD"
-
- # [신규] 강한 단일 신호 매수 로직 (기준 강화)
- strong_signal = False
- strong_reason = ""
-
- if tech_score >= 0.80:
- strong_signal = True
- strong_reason = "Super Strong Technical"
- elif lstm_score >= 0.80 and ai_confidence >= 0.8:
- strong_signal = True
- strong_reason = f"High Confidence AI Buy (Conf: {ai_confidence})"
- elif sentiment_score >= 0.85:
- strong_signal = True
- strong_reason = "Strong News Sentiment"
- elif investor_score >= 0.1 and total_score >= 0.6: # 외인 수급이 좋고 전체 점수 양호
- strong_signal = True
- strong_reason = "Strong Foreigner Buying"
-
- if strong_signal:
- decision = "BUY"
- print(f" 🎯 [{strong_reason}] Overriding to BUY!")
- elif total_score >= 0.60: # (0.5 -> 0.6 상향 조정으로 보수적 접근)
- decision = "BUY"
- elif total_score <= 0.30:
- decision = "SELL"
-
- print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} LSTM={lstm_score:.2f} → Total={total_score:.2f} [{decision}]")
-
+ decision_reason = ""
+
+ # --- 보유 종목 분석 기반 매도 (신규) ---
+ if holding_info:
+ holding_yield = holding_info.get('yield', 0.0)
+ holding_qty = holding_info.get('qty', 0)
+ peak_price = holding_info.get('peak_price', current_price)
+
+ if holding_qty > 0:
+ # A. 동적 손절 (ATR 기반)
+ if holding_yield <= sl_tp['stop_loss_pct']:
+ decision = "SELL"
+ decision_reason = f"Dynamic Stop Loss ({holding_yield:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
+
+ # B. 동적 익절 (ATR 기반)
+ elif holding_yield >= sl_tp['take_profit_pct']:
+ decision = "SELL"
+ decision_reason = f"Dynamic Take Profit ({holding_yield:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
+
+ # C. 트레일링 스탑 (최고가 대비 하락)
+ elif peak_price > 0:
+ drop_from_peak = ((current_price - peak_price) / peak_price) * 100
+ if drop_from_peak <= -sl_tp['trailing_stop_pct'] and holding_yield > 2.0:
+ # 수익 상태에서만 트레일링 스탑 작동 (2% 이상 수익 확보)
+ decision = "SELL"
+ decision_reason = (f"Trailing Stop ({drop_from_peak:.1f}% from peak, "
+ f"threshold: -{sl_tp['trailing_stop_pct']:.1f}%)")
+
+ # D. 분석 기반 매도 (점수가 매도 임계값 이하)
+ if decision == "HOLD" and total_score <= sell_threshold:
+ decision = "SELL"
+ decision_reason = f"Analysis Signal (Score: {total_score:.2f} <= {sell_threshold:.2f})"
+
+ # E. 추세 반전 매도 (ADX 강한 하락추세)
+ if decision == "HOLD" and adx_val >= 30:
+ plus_di = ma_info.get('adx', 0) # 참고용
+ mtf_align = ma_info.get('mtf_alignment', '')
+ if mtf_align == 'STRONG_BEAR' and holding_yield < 0:
+ decision = "SELL"
+ decision_reason = f"Strong Bear Trend Reversal (MTF: {mtf_align})"
+
+ # --- 매수 판단 ---
+ if decision == "HOLD":
+ # 강한 단일 신호 매수 (기준 강화)
+ strong_signal = False
+ strong_reason = ""
+
+ # [강화] 복합 조건 매수 (단일 지표가 아닌 복합 조건)
+ if tech_score >= 0.75 and lstm_score >= 0.6 and sentiment_score >= 0.6:
+ strong_signal = True
+ strong_reason = "Triple Confirmation (Tech+AI+News)"
+ elif lstm_score >= 0.80 and ai_confidence >= 0.85 and adx_val >= 25:
+ strong_signal = True
+ strong_reason = f"High Confidence AI + Strong Trend (ADX={adx_val:.0f})"
+ elif investor_score >= 0.10 and tech_score >= 0.60 and total_score >= 0.60:
+ strong_signal = True
+ strong_reason = "Institutional Buying + Good Fundamentals"
+ elif ma_info.get('mtf_alignment') == 'STRONG_BULL' and tech_score >= 0.60:
+ strong_signal = True
+ strong_reason = f"Strong Multi-Timeframe Bullish + Tech {tech_score:.2f}"
+
+ if strong_signal and total_score >= buy_threshold - 0.05:
+ # 강한 신호는 임계값 약간 완화 허용
+ decision = "BUY"
+ decision_reason = strong_reason
+ print(f" 🎯 [{strong_reason}] → BUY!")
+ elif total_score >= buy_threshold:
+ decision = "BUY"
+ decision_reason = f"Score {total_score:.2f} >= threshold {buy_threshold:.2f}"
+
+ # ===== 9. 포지션 사이징 =====
+ suggested_qty = 0
+ if decision == "BUY":
+ # 기본 자산 1000만원으로 가정 (실제 run_cycle에서 덮어씀)
+ suggested_qty = calculate_position_size(
+ total_capital=10000000,
+ current_price=current_price,
+ volatility=volatility,
+ score=total_score,
+ ai_confidence=ai_confidence
+ )
+ if suggested_qty == 0:
+ decision = "HOLD"
+ decision_reason = "Position size too small"
+
+ print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} "
+ f"LSTM={lstm_score:.2f} Inv={investor_score:.2f} → "
+ f"Total={total_score:.2f} [{decision}]"
+ f"{f' ({decision_reason})' if decision_reason else ''}")
+
return {
"ticker": ticker,
"score": total_score,
"tech": tech_score,
"sentiment": sentiment_score,
"lstm_score": lstm_score,
+ "investor_score": investor_score,
"volatility": volatility,
"volume_ratio": vol_ratio,
"prediction": pred_result,
"decision": decision,
+ "decision_reason": decision_reason,
"current_price": current_price,
- "ma_info": ma_info
+ "ma_info": ma_info,
+ "sl_tp": sl_tp,
+ "suggested_qty": suggested_qty,
+ "ai_confidence": ai_confidence,
+ "ai_reason": ai_reason
}
except Exception as e:
print(f"❌ [Worker Error] Failed to analyze {ticker}: {e}")
import traceback
traceback.print_exc()
- # 기본 실패 응답 반환 (프로세스 크래시 방지)
return {
"ticker": ticker,
"score": 0.0,
"decision": "HOLD",
+ "decision_reason": f"Error: {str(e)}",
"current_price": 0,
+ "sl_tp": {'stop_loss_pct': -5.0, 'take_profit_pct': 8.0, 'trailing_stop_pct': 3.0},
+ "suggested_qty": 0,
"error": str(e)
}