Files
ai-trade/backtester.py

461 lines
19 KiB
Python

import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg') # GUI 없는 환경 대응
import matplotlib.pyplot as plt
from modules.analysis.technical import TechnicalAnalyzer
from modules.analysis.deep_learning import PricePredictor
from modules.strategy.process import calculate_position_size
class Backtester:
"""
[v2.0] 실전 백테스터
개선사항:
1. ATR 기반 동적 손절/익절 + 트레일링 스탑
2. 포지션 사이징 (변동성 기반)
3. 정밀한 수수료/세금 계산
4. 슬리피지(Slippage) 시뮬레이션
5. 다중 성과 지표 (Sharpe, MDD, Win Rate, Avg P/L)
6. 벤치마크 대비 알파 계산
7. 실제 데이터 로드 (yfinance fallback to mock)
"""
def __init__(self, ticker, start_date, end_date, initial_capital=10000000):
self.ticker = ticker
self.start_date = start_date
self.end_date = end_date
self.initial_capital = initial_capital
self.capital = initial_capital
self.holdings = 0
self.avg_price = 0
self.peak_price = 0 # [v2.0] 트레일링 스탑용
self.trade_log = []
self.daily_values = []
self.daily_returns = []
# LSTM 모델
self.predictor = PricePredictor()
# [v2.0] 수수료/세금 설정 (한국 주식)
self.buy_commission = 0.00015 # 매수 수수료 0.015%
self.sell_commission = 0.00015 # 매도 수수료 0.015%
self.sell_tax = 0.0018 # 증권거래세 0.18% (2024~)
self.slippage = 0.001 # 슬리피지 0.1%
def generate_mock_data(self, days=200):
"""실제 시장과 유사한 Mock 데이터 (Random Walk + Mean Reversion)"""
print(f"🎲 [Backtest] Generating realistic mock data for {days} days...")
np.random.seed(42)
start_price = 70000
# Mean-reverting Random Walk (실제 주가에 더 가까움)
mu = 0.0003 # 일평균 기대수익률 0.03%
sigma = 0.018 # 일변동성 1.8%
mean_reversion = 0.02 # 평균 회귀 속도
price_series = [start_price]
for i in range(days):
# 변동성 클러스터링 (GARCH 효과 근사)
if i > 0:
prev_return = (price_series[-1] - price_series[-2]) / price_series[-2]
dynamic_sigma = sigma * (1 + abs(prev_return) * 5) # 큰 변동 후 변동성 증가
else:
dynamic_sigma = sigma
# Mean reversion + trend
if len(price_series) >= 20:
ma20 = np.mean(price_series[-20:])
reversion = -mean_reversion * (price_series[-1] - ma20) / ma20
else:
reversion = 0
shock = np.random.normal(mu + reversion, dynamic_sigma)
new_price = price_series[-1] * (1 + shock)
new_price = max(new_price, start_price * 0.5) # 최소 50% 하한
price_series.append(new_price)
date_range = pd.date_range(start=self.start_date, periods=len(price_series))
self.data = pd.Series(price_series, index=date_range)
print(f"📈 [Mock Data] Start: {price_series[0]:.0f}, End: {price_series[-1]:.0f}, "
f"Min: {min(price_series):.0f}, Max: {max(price_series):.0f}")
return True
def fetch_data(self):
"""실제 데이터 로드 시도 → 실패 시 Mock"""
try:
import yfinance as yf
# 한국 주식은 .KS (KOSPI) 또는 .KQ (KOSDAQ) 접미사 필요
yf_ticker = f"{self.ticker}.KS"
df = yf.download(yf_ticker, start=self.start_date, end=self.end_date)
if not df.empty and len(df) > 60:
self.data = df['Close']
print(f"✅ [Backtest] Loaded {len(self.data)} days from yfinance ({yf_ticker})")
return True
except Exception as e:
print(f"⚠️ [Backtest] yfinance failed: {e}")
return self.generate_mock_data()
def _apply_slippage(self, price, is_buy):
"""슬리피지 적용 (매수 시 높게, 매도 시 낮게)"""
if is_buy:
return price * (1 + self.slippage)
else:
return price * (1 - self.slippage)
def run(self):
if not hasattr(self, 'data') or self.data.empty:
if not self.fetch_data():
return
prices = self.data.values
dates = self.data.index
# 최소 60일 데이터 필요 (LSTM seq_length)
min_days = 60
if len(prices) < min_days + 10:
print("❌ Not enough data for backtest.")
return
print(f"🚀 [Backtest v2.0] Simulation Started ({len(prices)} days)...")
print(f" Capital: {self.initial_capital:,.0f} KRW")
print(f" Commission: Buy {self.buy_commission*100:.3f}% / Sell {self.sell_commission*100:.3f}%")
print(f" Tax: {self.sell_tax*100:.2f}% / Slippage: {self.slippage*100:.1f}%")
for i in range(min_days, len(prices)):
today_date = dates[i]
today_price = float(prices[i])
# 과거 데이터 윈도우
history_window = prices[max(0, i-min_days):i+1]
if hasattr(history_window, 'values'):
current_window_list = history_window.values.tolist()
elif isinstance(history_window, np.ndarray):
current_window_list = history_window.tolist()
else:
current_window_list = list(history_window)
# 1. 기술적 분석
tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
today_price, current_window_list)
# 2. ATR 기반 동적 손절/익절
sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(current_window_list)
# 3. LSTM 예측 (10일마다 재학습 → 속도 타협)
lstm_score = 0.5
ai_confidence = 0.5
if i % 10 == 0 or i == min_days:
pred_result = self.predictor.train_and_predict(current_window_list)
else:
# 재학습 없이 예측만
pred_result = self.predictor.train_and_predict(current_window_list, ticker=f"bt_{self.ticker}")
if pred_result:
ai_confidence = pred_result.get('confidence', 0.5)
change_mag = min(abs(pred_result['change_rate']), 5.0) / 5.0
if pred_result['trend'] == 'UP':
lstm_score = 0.5 + (change_mag * ai_confidence * 0.4)
else:
lstm_score = 0.5 - (change_mag * ai_confidence * 0.4)
lstm_score = max(0.0, min(1.0, lstm_score))
# 4. 뉴스 감정 (백테스트에서는 중립)
sentiment_score = 0.5
# 5. 통합 점수 (ADX 기반 동적 가중치)
adx_val = ma_info.get('adx', 20)
if adx_val >= 30:
w_tech, w_news, w_ai = 0.50, 0.15, 0.35
elif adx_val < 20:
w_tech, w_news, w_ai = 0.35, 0.30, 0.35
else:
w_tech, w_news, w_ai = 0.40, 0.25, 0.35
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
# 6. 리스크 관리 (보유 중일 때)
action = "HOLD"
action_reason = ""
if self.holdings > 0:
profit_rate = ((today_price - self.avg_price) / self.avg_price) * 100
# A. 동적 손절 (ATR 기반)
if profit_rate <= sl_tp['stop_loss_pct']:
action = "SELL"
action_reason = f"Dynamic SL ({profit_rate:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
# B. 동적 익절 (ATR 기반)
elif profit_rate >= sl_tp['take_profit_pct']:
action = "SELL"
action_reason = f"Dynamic TP ({profit_rate:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
# C. 트레일링 스탑
elif self.peak_price > 0 and profit_rate > 2.0:
drop_from_peak = ((today_price - self.peak_price) / self.peak_price) * 100
if drop_from_peak <= -sl_tp['trailing_stop_pct']:
action = "SELL"
action_reason = f"Trailing Stop ({drop_from_peak:.1f}% from peak)"
# D. AI 매도 시그널
if action == "HOLD" and total_score <= 0.30:
action = "SELL"
action_reason = f"AI Signal (Score: {total_score:.2f})"
# 최고가 업데이트
if today_price > self.peak_price:
self.peak_price = today_price
# 7. 매수 로직 (v2.0 - 포지션 사이징)
if action == "HOLD" and total_score >= 0.60:
# 복합 조건 확인
should_buy = False
mtf_align = ma_info.get('mtf_alignment', 'NEUTRAL')
if tech_score >= 0.70 and lstm_score >= 0.55:
should_buy = True
elif total_score >= 0.65 and mtf_align in ['STRONG_BULL', 'BULL']:
should_buy = True
elif total_score >= 0.70:
should_buy = True
if should_buy:
# 포지션 사이징
total_val = self.capital + (self.holdings * today_price)
current_pos_val = self.holdings * today_price
max_pos = total_val * 0.30 # 최대 30% 비중
if current_pos_val < max_pos:
qty = calculate_position_size(
total_capital=total_val,
current_price=today_price,
volatility=volatility,
score=total_score,
ai_confidence=ai_confidence
)
if qty > 0:
action = "BUY"
# 8. 주문 실행
if action == "BUY":
# 슬리피지 적용
exec_price = self._apply_slippage(today_price, is_buy=True)
total_val = self.capital + (self.holdings * today_price)
qty = calculate_position_size(
total_capital=total_val,
current_price=exec_price,
volatility=volatility,
score=total_score,
ai_confidence=ai_confidence
)
if qty <= 0:
qty = 1
cost = qty * exec_price
fee = cost * self.buy_commission
if self.capital >= cost + fee:
# 평단가 갱신
total_cost = (self.avg_price * self.holdings) + cost
self.holdings += qty
self.avg_price = total_cost / self.holdings
self.capital -= (cost + fee)
self.peak_price = max(self.peak_price, exec_price)
self.trade_log.append({
"date": today_date.strftime("%Y-%m-%d"),
"action": "BUY",
"price": today_price,
"exec_price": exec_price,
"qty": qty,
"score": round(total_score, 3),
"volatility": round(volatility, 2),
"fee": round(fee, 0),
"balance": round(self.capital, 0)
})
elif action == "SELL" and self.holdings > 0:
exec_price = self._apply_slippage(today_price, is_buy=False)
qty = self.holdings
revenue = qty * exec_price
fee = revenue * self.sell_commission
tax = revenue * self.sell_tax
net_revenue = revenue - fee - tax
profit = net_revenue - (self.avg_price * qty)
self.capital += net_revenue
self.trade_log.append({
"date": today_date.strftime("%Y-%m-%d"),
"action": "SELL",
"price": today_price,
"exec_price": exec_price,
"qty": qty,
"reason": action_reason,
"profit": round(profit, 0),
"fee": round(fee + tax, 0),
"balance": round(self.capital, 0)
})
self.holdings = 0
self.avg_price = 0
self.peak_price = 0
# 일별 가치 기록
total_val = self.capital + (self.holdings * today_price)
self.daily_values.append(total_val)
if len(self.daily_values) >= 2:
daily_return = (self.daily_values[-1] - self.daily_values[-2]) / self.daily_values[-2]
self.daily_returns.append(daily_return)
self.print_summary()
self.plot_results()
def print_summary(self):
if not self.daily_values:
print("❌ No simulation data.")
return
final_val = self.daily_values[-1]
roi = ((final_val - self.initial_capital) / self.initial_capital) * 100
# 성과 지표 계산
returns = np.array(self.daily_returns) if self.daily_returns else np.array([0])
# Sharpe Ratio (연환산, 무위험수익률 3.5% 가정)
rf_daily = 0.035 / 252
if np.std(returns) > 0:
sharpe = (np.mean(returns) - rf_daily) / np.std(returns) * np.sqrt(252)
else:
sharpe = 0
# Maximum Drawdown (MDD)
peak = np.maximum.accumulate(self.daily_values)
drawdown = (np.array(self.daily_values) - peak) / peak * 100
mdd = drawdown.min()
# Win Rate
sell_trades = [t for t in self.trade_log if t['action'] == 'SELL']
wins = [t for t in sell_trades if t.get('profit', 0) > 0]
losses = [t for t in sell_trades if t.get('profit', 0) <= 0]
win_rate = len(wins) / max(1, len(sell_trades)) * 100
# 평균 손익
avg_win = np.mean([t['profit'] for t in wins]) if wins else 0
avg_loss = np.mean([t['profit'] for t in losses]) if losses else 0
profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
# 총 수수료/세금
total_fees = sum(t.get('fee', 0) for t in self.trade_log)
print("\n" + "=" * 55)
print(f"📊 [Backtest v2.0 Result] {self.ticker}")
print("=" * 55)
print(f" 💰 Initial Capital : {self.initial_capital:>15,.0f} KRW")
print(f" 💰 Final Value : {final_val:>15,.0f} KRW")
print(f" 📈 Return (ROI) : {roi:>14.2f}%")
print(f" 📉 Max Drawdown : {mdd:>14.2f}%")
print(f" 📊 Sharpe Ratio : {sharpe:>14.2f}")
print(f" 🎯 Win Rate : {win_rate:>14.1f}% ({len(wins)}/{len(sell_trades)})")
print(f" 💵 Avg Win : {avg_win:>15,.0f} KRW")
print(f" 💸 Avg Loss : {avg_loss:>15,.0f} KRW")
print(f" ⚖️ Profit Factor : {profit_factor:>14.2f}")
print(f" 💳 Total Fees/Tax : {total_fees:>15,.0f} KRW")
print(f" 🔄 Total Trades : {len(self.trade_log):>14}")
print("=" * 55)
# 최근 10개 거래 로그
print("\n📝 Recent Trades:")
for trade in self.trade_log[-10:]:
emoji = "🔴" if trade['action'] == "BUY" else "🔵"
line = (f" {trade['date']} {emoji} {trade['action']} "
f"{trade['qty']}ea @ {trade['price']:,.0f}")
if 'profit' in trade:
p = trade['profit']
line += f" | P&L: {p:+,.0f}"
if 'reason' in trade:
line += f" | {trade['reason']}"
print(line)
def plot_results(self):
"""결과 차트 생성"""
if not self.daily_values:
return
try:
fig, axes = plt.subplots(3, 1, figsize=(14, 10), gridspec_kw={'height_ratios': [3, 1, 1]})
# 1. 포트폴리오 가치 vs Buy & Hold
ax1 = axes[0]
ax1.plot(self.daily_values, label='Strategy', color='blue', linewidth=1.5)
# Buy & Hold 비교
if hasattr(self, 'data'):
prices = self.data.values[60:]
if len(prices) == len(self.daily_values):
bh_shares = self.initial_capital / prices[0]
bh_values = bh_shares * prices
ax1.plot(bh_values, label='Buy & Hold', color='gray', alpha=0.5, linestyle='--')
ax1.axhline(y=self.initial_capital, color='red', linestyle=':', alpha=0.3, label='Initial')
ax1.set_title(f'Backtest Result: {self.ticker}', fontsize=14, fontweight='bold')
ax1.set_ylabel('Portfolio Value (KRW)')
ax1.legend(loc='upper left')
ax1.grid(True, alpha=0.3)
# 매매 포인트 표시
for trade in self.trade_log:
day_idx = None
for j, d in enumerate(self.data.index[60:]):
if d.strftime("%Y-%m-%d") == trade['date']:
day_idx = j
break
if day_idx is not None and day_idx < len(self.daily_values):
if trade['action'] == 'BUY':
ax1.scatter(day_idx, self.daily_values[day_idx], marker='^',
color='red', s=40, zorder=5)
else:
ax1.scatter(day_idx, self.daily_values[day_idx], marker='v',
color='blue', s=40, zorder=5)
# 2. Drawdown
ax2 = axes[1]
peak = np.maximum.accumulate(self.daily_values)
drawdown = (np.array(self.daily_values) - peak) / peak * 100
ax2.fill_between(range(len(drawdown)), drawdown, 0, color='red', alpha=0.3)
ax2.set_ylabel('Drawdown (%)')
ax2.grid(True, alpha=0.3)
# 3. Daily Returns
ax3 = axes[2]
if self.daily_returns:
colors = ['green' if r >= 0 else 'red' for r in self.daily_returns]
ax3.bar(range(len(self.daily_returns)), [r * 100 for r in self.daily_returns],
color=colors, alpha=0.5, width=1)
ax3.set_ylabel('Daily Return (%)')
ax3.set_xlabel('Trading Days')
ax3.grid(True, alpha=0.3)
plt.tight_layout()
chart_path = f"data/backtest_{self.ticker}.png"
plt.savefig(chart_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"\n📊 Chart saved: {chart_path}")
except Exception as e:
print(f"⚠️ Chart generation failed: {e}")
if __name__ == "__main__":
print("=" * 55)
print("🚀 AI Trading Backtester v2.0")
print("=" * 55)
# 삼성전자 6개월 백테스팅
backtester = Backtester("005930", start_date="2025-06-01", end_date="2026-02-01")
backtester.run()