주식 트레이드 강화 전략 추가

This commit is contained in:
2026-02-17 01:38:35 +09:00
parent 9dbf6e6791
commit 4d41405ac4
6 changed files with 1352 additions and 434 deletions

View File

@@ -1,88 +1,135 @@
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import yfinance as yf import matplotlib
matplotlib.use('Agg') # GUI 없는 환경 대응
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from modules.analysis.technical import TechnicalAnalyzer from modules.analysis.technical import TechnicalAnalyzer
from modules.analysis.deep_learning import PricePredictor from modules.analysis.deep_learning import PricePredictor
from modules.strategy.process import calculate_position_size
class Backtester: class Backtester:
"""
[v2.0] 실전 백테스터
개선사항:
1. ATR 기반 동적 손절/익절 + 트레일링 스탑
2. 포지션 사이징 (변동성 기반)
3. 정밀한 수수료/세금 계산
4. 슬리피지(Slippage) 시뮬레이션
5. 다중 성과 지표 (Sharpe, MDD, Win Rate, Avg P/L)
6. 벤치마크 대비 알파 계산
7. 실제 데이터 로드 (yfinance fallback to mock)
"""
def __init__(self, ticker, start_date, end_date, initial_capital=10000000): def __init__(self, ticker, start_date, end_date, initial_capital=10000000):
self.ticker = ticker self.ticker = ticker
self.start_date = start_date self.start_date = start_date
self.end_date = end_date self.end_date = end_date
self.initial_capital = initial_capital self.initial_capital = initial_capital
self.capital = initial_capital self.capital = initial_capital
self.holdings = 0 # 보유 주식 수 self.holdings = 0
self.avg_price = 0 # 평단가 self.avg_price = 0
self.peak_price = 0 # [v2.0] 트레일링 스탑용
self.trade_log = [] self.trade_log = []
self.daily_values = [] self.daily_values = []
self.daily_returns = []
# LSTM 모델 (재학습 시뮬레이션을 위해) # LSTM 모델
self.predictor = PricePredictor() self.predictor = PricePredictor()
# [v2.0] 수수료/세금 설정 (한국 주식)
self.buy_commission = 0.00015 # 매수 수수료 0.015%
self.sell_commission = 0.00015 # 매도 수수료 0.015%
self.sell_tax = 0.0018 # 증권거래세 0.18% (2024~)
self.slippage = 0.001 # 슬리피지 0.1%
def generate_mock_data(self, days=200): def generate_mock_data(self, days=200):
""" """실제 시장과 유사한 Mock 데이터 (Random Walk + Mean Reversion)"""
yfinance 연결 실패 시 사용할 가상 주가 데이터 생성 (Random Walk) print(f"🎲 [Backtest] Generating realistic mock data for {days} days...")
삼성전자와 유사한 6~7만원대 가격 흐름 생성 np.random.seed(42)
"""
print(f"🎲 [Backtest] Generating mock data for {days} days...")
np.random.seed(42) # 재현성을 위해 시드 고정
start_price = 70000 start_price = 70000
returns = np.random.normal(0, 0.015, days) # 평균 0, 표준편차 1.5% 변동 # Mean-reverting Random Walk (실제 주가에 더 가까움)
mu = 0.0003 # 일평균 기대수익률 0.03%
sigma = 0.018 # 일변동성 1.8%
mean_reversion = 0.02 # 평균 회귀 속도
price_series = [start_price] price_series = [start_price]
for i in range(days):
# 변동성 클러스터링 (GARCH 효과 근사)
if i > 0:
prev_return = (price_series[-1] - price_series[-2]) / price_series[-2]
dynamic_sigma = sigma * (1 + abs(prev_return) * 5) # 큰 변동 후 변동성 증가
else:
dynamic_sigma = sigma
# 인위적인 강력한 상승 추세 추가 (우상향) # Mean reversion + trend
for i, r in enumerate(returns): if len(price_series) >= 20:
trend = 0.003 # 매일 0.3%씩 강제 상승 (복리 효과로 엄청난 급등) ma20 = np.mean(price_series[-20:])
# 중간에 잠깐 조정장 reversion = -mean_reversion * (price_series[-1] - ma20) / ma20
if 80 < i < 100: trend = -0.01 else:
reversion = 0
new_price = price_series[-1] * (1 + r + trend) shock = np.random.normal(mu + reversion, dynamic_sigma)
new_price = price_series[-1] * (1 + shock)
new_price = max(new_price, start_price * 0.5) # 최소 50% 하한
price_series.append(new_price) price_series.append(new_price)
# 날짜 인덱스 생성 date_range = pd.date_range(start=self.start_date, periods=len(price_series))
date_range = pd.date_range(start="2023-01-01", periods=len(price_series))
self.data = pd.Series(price_series, index=date_range) self.data = pd.Series(price_series, index=date_range)
print(f"📈 [Mock Data] Start: {price_series[0]:.0f}, End: {price_series[-1]:.0f}, "
# [Debugging] 차트가 너무 밋밋하지 않게 변동성 추가 확인 f"Min: {min(price_series):.0f}, Max: {max(price_series):.0f}")
print(f"📈 [Mock Data] Start: {price_series[0]:.0f}, End: {price_series[-1]:.0f}")
print(f"✅ Generated {len(self.data)} days of mock data.")
return True return True
def fetch_data(self): def fetch_data(self):
"""(Legacy) yfinance를 이용해 과거 데이터 로드""" """실제 데이터 로드 시도 → 실패 시 Mock"""
# 네트워크 이슈로 인해 Mock Data 우선 사용 try:
import yfinance as yf
# 한국 주식은 .KS (KOSPI) 또는 .KQ (KOSDAQ) 접미사 필요
yf_ticker = f"{self.ticker}.KS"
df = yf.download(yf_ticker, start=self.start_date, end=self.end_date)
if not df.empty and len(df) > 60:
self.data = df['Close']
print(f"✅ [Backtest] Loaded {len(self.data)} days from yfinance ({yf_ticker})")
return True
except Exception as e:
print(f"⚠️ [Backtest] yfinance failed: {e}")
return self.generate_mock_data() return self.generate_mock_data()
def _apply_slippage(self, price, is_buy):
"""슬리피지 적용 (매수 시 높게, 매도 시 낮게)"""
if is_buy:
return price * (1 + self.slippage)
else:
return price * (1 - self.slippage)
def run(self): def run(self):
if not hasattr(self, 'data') or self.data.empty: if not hasattr(self, 'data') or self.data.empty:
if not self.fetch_data(): return if not self.fetch_data():
return
prices = self.data.values prices = self.data.values
dates = self.data.index dates = self.data.index
# 최소 30일 데이터 필요 # 최소 60일 데이터 필요 (LSTM seq_length)
if len(prices) < 30: min_days = 60
if len(prices) < min_days + 10:
print("❌ Not enough data for backtest.") print("❌ Not enough data for backtest.")
return return
print("🚀 [Backtest] Simulation Started...") print(f"🚀 [Backtest v2.0] Simulation Started ({len(prices)} days)...")
print(f" Capital: {self.initial_capital:,.0f} KRW")
print(f" Commission: Buy {self.buy_commission*100:.3f}% / Sell {self.sell_commission*100:.3f}%")
print(f" Tax: {self.sell_tax*100:.2f}% / Slippage: {self.slippage*100:.1f}%")
# 30일차부터 하루씩 전진하며 시뮬레이션 for i in range(min_days, len(prices)):
for i in range(30, len(prices)):
today_date = dates[i] today_date = dates[i]
today_price = float(prices[i]) today_price = float(prices[i])
# 과거 30일 데이터 (오늘 포함 시점의 과거 데이터) # 과거 데이터 윈도우
# 주의: 실제 매매 결정을 내리는 시점(장중/장마감)에 따라 index 처리 중요. history_window = prices[max(0, i-min_days):i+1]
# 여기서는 '장 마감 후 분석 -> 다음날 시가 매매' 또는 '당일 종가 매매' 가정.
# 보수적으로 '당일 종가 매매' 가정 (분석 후 즉시 실행)
history_window = prices[i-30:i+1] # 31개 (어제까지 30개 + 오늘)
# [수정] 타입 체크 및 변환 (Numpy Array, Series, List 모두 대응)
if hasattr(history_window, 'values'): if hasattr(history_window, 'values'):
current_window_list = history_window.values.tolist() current_window_list = history_window.values.tolist()
elif isinstance(history_window, np.ndarray): elif isinstance(history_window, np.ndarray):
@@ -91,125 +138,183 @@ class Backtester:
current_window_list = list(history_window) current_window_list = list(history_window)
# 1. 기술적 분석 # 1. 기술적 분석
tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(today_price, current_window_list) tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
today_price, current_window_list)
# 2. AI 예측 (Online Learning Simulation) # 2. ATR 기반 동적 손절/익절
# 매일 재학습하면 너무 느리므로, 5일에 한번씩만 학습한다고 가정 (타협) sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(current_window_list)
# 또는 실제 Bot처럼 매번 학습하되, Backtest 속도 고려
# 여기서는 정확성을 위해 매번 학습 시도 (데이터셋이 작으므로)
# Mocking News Sentiment (Historical news unavailable -> Neutral) # 3. LSTM 예측 (10일마다 재학습 → 속도 타협)
lstm_score = 0.5
ai_confidence = 0.5
if i % 10 == 0 or i == min_days:
pred_result = self.predictor.train_and_predict(current_window_list)
else:
# 재학습 없이 예측만
pred_result = self.predictor.train_and_predict(current_window_list, ticker=f"bt_{self.ticker}")
if pred_result:
ai_confidence = pred_result.get('confidence', 0.5)
change_mag = min(abs(pred_result['change_rate']), 5.0) / 5.0
if pred_result['trend'] == 'UP':
lstm_score = 0.5 + (change_mag * ai_confidence * 0.4)
else:
lstm_score = 0.5 - (change_mag * ai_confidence * 0.4)
lstm_score = max(0.0, min(1.0, lstm_score))
# 4. 뉴스 감정 (백테스트에서는 중립)
sentiment_score = 0.5 sentiment_score = 0.5
# LSTM Predict # 5. 통합 점수 (ADX 기반 동적 가중치)
# (속도를 위해 간략화된 학습 사용) adx_val = ma_info.get('adx', 20)
pred_result = self.predictor.train_and_predict(current_window_list) if adx_val >= 30:
if not pred_result: continue w_tech, w_news, w_ai = 0.50, 0.15, 0.35
elif adx_val < 20:
lstm_score = 0.5 w_tech, w_news, w_ai = 0.35, 0.30, 0.35
if pred_result['trend'] == 'UP':
idx = min(pred_result['change_rate'], 3.0)
lstm_score = 0.5 + (idx * 0.1)
else: else:
idx = max(pred_result['change_rate'], -3.0) w_tech, w_news, w_ai = 0.40, 0.25, 0.35
lstm_score = 0.5 + (idx * 0.1)
lstm_score = max(0.0, min(1.0, lstm_score))
# 3. 통합 점수
w_tech, w_news, w_ai = 0.4, 0.3, 0.3
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score) total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
# 4. 리스크 관리 (손절/익절) 체크 # 6. 리스크 관리 (보유 중일 때)
# 보유 중일 때만 체크
action = "HOLD" action = "HOLD"
action_reason = "" action_reason = ""
if self.holdings > 0: if self.holdings > 0:
# 수익률 계산
profit_rate = ((today_price - self.avg_price) / self.avg_price) * 100 profit_rate = ((today_price - self.avg_price) / self.avg_price) * 100
# 손절 (-5%) / 익절 (+8%) # A. 동적 손절 (ATR 기반)
if profit_rate <= -5.0: if profit_rate <= sl_tp['stop_loss_pct']:
action = "SELL" action = "SELL"
action_reason = f"Stop Loss ({profit_rate:.2f}%)" action_reason = f"Dynamic SL ({profit_rate:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
elif profit_rate >= 8.0:
# B. 동적 익절 (ATR 기반)
elif profit_rate >= sl_tp['take_profit_pct']:
action = "SELL" action = "SELL"
action_reason = f"Take Profit ({profit_rate:.2f}%)" action_reason = f"Dynamic TP ({profit_rate:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
else:
# AI 매도 시그널 # C. 트레일링 스탑
if total_score <= 0.3: elif self.peak_price > 0 and profit_rate > 2.0:
drop_from_peak = ((today_price - self.peak_price) / self.peak_price) * 100
if drop_from_peak <= -sl_tp['trailing_stop_pct']:
action = "SELL" action = "SELL"
action_reason = f"AI Signal (Score: {total_score:.2f})" action_reason = f"Trailing Stop ({drop_from_peak:.1f}% from peak)"
# 매수 로직 # D. AI 매도 시그널
if action == "HOLD" and total_score >= 0.7: if action == "HOLD" and total_score <= 0.30:
# 중복 매수 필터 (간단화를 위해 최대 1회 진입 가정 or Pyramiding) action = "SELL"
# 여기선 불타기 허용 (최대 30% 비중까지만) action_reason = f"AI Signal (Score: {total_score:.2f})"
max_pos = self.initial_capital * 0.3
current_val = self.holdings * today_price
if current_val < max_pos: # 최고가 업데이트
action = "BUY" if today_price > self.peak_price:
self.peak_price = today_price
# 5. 주문 실행 # 7. 매수 로직 (v2.0 - 포지션 사이징)
if action == "HOLD" and total_score >= 0.60:
# 복합 조건 확인
should_buy = False
mtf_align = ma_info.get('mtf_alignment', 'NEUTRAL')
if tech_score >= 0.70 and lstm_score >= 0.55:
should_buy = True
elif total_score >= 0.65 and mtf_align in ['STRONG_BULL', 'BULL']:
should_buy = True
elif total_score >= 0.70:
should_buy = True
if should_buy:
# 포지션 사이징
total_val = self.capital + (self.holdings * today_price)
current_pos_val = self.holdings * today_price
max_pos = total_val * 0.30 # 최대 30% 비중
if current_pos_val < max_pos:
qty = calculate_position_size(
total_capital=total_val,
current_price=today_price,
volatility=volatility,
score=total_score,
ai_confidence=ai_confidence
)
if qty > 0:
action = "BUY"
# 8. 주문 실행
if action == "BUY": if action == "BUY":
# 포지션 사이징 # 슬리피지 적용
invest_amt = 1000000 # 기본 exec_price = self._apply_slippage(today_price, is_buy=True)
if volatility >= 3.0: invest_amt = 500000
elif volatility <= 1.5: invest_amt = 1500000
# 잔고 확인 total_val = self.capital + (self.holdings * today_price)
invest_amt = min(invest_amt, self.capital) qty = calculate_position_size(
qty = int(invest_amt / today_price) total_capital=total_val,
current_price=exec_price,
volatility=volatility,
score=total_score,
ai_confidence=ai_confidence
)
if qty <= 0:
qty = 1
if qty > 0: cost = qty * exec_price
cost = qty * today_price fee = cost * self.buy_commission
# 수수료 0.015% 가정
fee = cost * 0.00015
if self.capital >= cost + fee:
# 평단가 갱신
total_cost = (self.avg_price * self.holdings) + cost
self.holdings += qty
self.avg_price = total_cost / self.holdings
self.capital -= (cost + fee)
self.trade_log.append({ if self.capital >= cost + fee:
"date": today_date.strftime("%Y-%m-%d"), # 평단가 갱신
"action": "BUY", total_cost = (self.avg_price * self.holdings) + cost
"price": today_price, self.holdings += qty
"qty": qty, self.avg_price = total_cost / self.holdings
"score": total_score, self.capital -= (cost + fee)
"volatility": volatility, self.peak_price = max(self.peak_price, exec_price)
"balance": self.capital
})
elif action == "SELL": self.trade_log.append({
"date": today_date.strftime("%Y-%m-%d"),
"action": "BUY",
"price": today_price,
"exec_price": exec_price,
"qty": qty,
"score": round(total_score, 3),
"volatility": round(volatility, 2),
"fee": round(fee, 0),
"balance": round(self.capital, 0)
})
elif action == "SELL" and self.holdings > 0:
exec_price = self._apply_slippage(today_price, is_buy=False)
qty = self.holdings qty = self.holdings
revenue = qty * today_price revenue = qty * exec_price
# 세금+수수료 약 0.23% 가정 fee = revenue * self.sell_commission
fee = revenue * 0.0023 tax = revenue * self.sell_tax
net_revenue = revenue - fee - tax
profit = revenue - fee - (self.avg_price * qty) profit = net_revenue - (self.avg_price * qty)
self.capital += (revenue - fee) self.capital += net_revenue
self.trade_log.append({ self.trade_log.append({
"date": today_date.strftime("%Y-%m-%d"), "date": today_date.strftime("%Y-%m-%d"),
"action": "SELL", "action": "SELL",
"price": today_price, "price": today_price,
"exec_price": exec_price,
"qty": qty, "qty": qty,
"reason": action_reason, "reason": action_reason,
"profit": profit, "profit": round(profit, 0),
"balance": self.capital "fee": round(fee + tax, 0),
"balance": round(self.capital, 0)
}) })
self.holdings = 0 self.holdings = 0
self.avg_price = 0 self.avg_price = 0
self.peak_price = 0
# 일별 가치 기록 # 일별 가치 기록
total_val = self.capital + (self.holdings * today_price) total_val = self.capital + (self.holdings * today_price)
self.daily_values.append(total_val) self.daily_values.append(total_val)
if len(self.daily_values) >= 2:
daily_return = (self.daily_values[-1] - self.daily_values[-2]) / self.daily_values[-2]
self.daily_returns.append(daily_return)
self.print_summary() self.print_summary()
self.plot_results()
def print_summary(self): def print_summary(self):
if not self.daily_values: if not self.daily_values:
@@ -219,23 +324,137 @@ class Backtester:
final_val = self.daily_values[-1] final_val = self.daily_values[-1]
roi = ((final_val - self.initial_capital) / self.initial_capital) * 100 roi = ((final_val - self.initial_capital) / self.initial_capital) * 100
print("\n" + "="*40) # 성과 지표 계산
print(f"📊 [Backtest Result] {self.ticker}") returns = np.array(self.daily_returns) if self.daily_returns else np.array([0])
print(f"• Initial Capital: {self.initial_capital:,.0f} KRW")
print(f"• Final Capital : {final_val:,.0f} KRW") # Sharpe Ratio (연환산, 무위험수익률 3.5% 가정)
print(f"• Return (ROI) : {roi:.2f}%") rf_daily = 0.035 / 252
print(f"• Total Trades : {len(self.trade_log)}") if np.std(returns) > 0:
print("="*40) sharpe = (np.mean(returns) - rf_daily) / np.std(returns) * np.sqrt(252)
else:
sharpe = 0
# Maximum Drawdown (MDD)
peak = np.maximum.accumulate(self.daily_values)
drawdown = (np.array(self.daily_values) - peak) / peak * 100
mdd = drawdown.min()
# Win Rate
sell_trades = [t for t in self.trade_log if t['action'] == 'SELL']
wins = [t for t in sell_trades if t.get('profit', 0) > 0]
losses = [t for t in sell_trades if t.get('profit', 0) <= 0]
win_rate = len(wins) / max(1, len(sell_trades)) * 100
# 평균 손익
avg_win = np.mean([t['profit'] for t in wins]) if wins else 0
avg_loss = np.mean([t['profit'] for t in losses]) if losses else 0
profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
# 총 수수료/세금
total_fees = sum(t.get('fee', 0) for t in self.trade_log)
print("\n" + "=" * 55)
print(f"📊 [Backtest v2.0 Result] {self.ticker}")
print("=" * 55)
print(f" 💰 Initial Capital : {self.initial_capital:>15,.0f} KRW")
print(f" 💰 Final Value : {final_val:>15,.0f} KRW")
print(f" 📈 Return (ROI) : {roi:>14.2f}%")
print(f" 📉 Max Drawdown : {mdd:>14.2f}%")
print(f" 📊 Sharpe Ratio : {sharpe:>14.2f}")
print(f" 🎯 Win Rate : {win_rate:>14.1f}% ({len(wins)}/{len(sell_trades)})")
print(f" 💵 Avg Win : {avg_win:>15,.0f} KRW")
print(f" 💸 Avg Loss : {avg_loss:>15,.0f} KRW")
print(f" ⚖️ Profit Factor : {profit_factor:>14.2f}")
print(f" 💳 Total Fees/Tax : {total_fees:>15,.0f} KRW")
print(f" 🔄 Total Trades : {len(self.trade_log):>14}")
print("=" * 55)
# 최근 10개 거래 로그
print("\n📝 Recent Trades:")
for trade in self.trade_log[-10:]:
emoji = "🔴" if trade['action'] == "BUY" else "🔵"
line = (f" {trade['date']} {emoji} {trade['action']} "
f"{trade['qty']}ea @ {trade['price']:,.0f}")
if 'profit' in trade:
p = trade['profit']
line += f" | P&L: {p:+,.0f}"
if 'reason' in trade:
line += f" | {trade['reason']}"
print(line)
def plot_results(self):
"""결과 차트 생성"""
if not self.daily_values:
return
try:
fig, axes = plt.subplots(3, 1, figsize=(14, 10), gridspec_kw={'height_ratios': [3, 1, 1]})
# 1. 포트폴리오 가치 vs Buy & Hold
ax1 = axes[0]
ax1.plot(self.daily_values, label='Strategy', color='blue', linewidth=1.5)
# Buy & Hold 비교
if hasattr(self, 'data'):
prices = self.data.values[60:]
if len(prices) == len(self.daily_values):
bh_shares = self.initial_capital / prices[0]
bh_values = bh_shares * prices
ax1.plot(bh_values, label='Buy & Hold', color='gray', alpha=0.5, linestyle='--')
ax1.axhline(y=self.initial_capital, color='red', linestyle=':', alpha=0.3, label='Initial')
ax1.set_title(f'Backtest Result: {self.ticker}', fontsize=14, fontweight='bold')
ax1.set_ylabel('Portfolio Value (KRW)')
ax1.legend(loc='upper left')
ax1.grid(True, alpha=0.3)
# 매매 포인트 표시
for trade in self.trade_log:
day_idx = None
for j, d in enumerate(self.data.index[60:]):
if d.strftime("%Y-%m-%d") == trade['date']:
day_idx = j
break
if day_idx is not None and day_idx < len(self.daily_values):
if trade['action'] == 'BUY':
ax1.scatter(day_idx, self.daily_values[day_idx], marker='^',
color='red', s=40, zorder=5)
else:
ax1.scatter(day_idx, self.daily_values[day_idx], marker='v',
color='blue', s=40, zorder=5)
# 2. Drawdown
ax2 = axes[1]
peak = np.maximum.accumulate(self.daily_values)
drawdown = (np.array(self.daily_values) - peak) / peak * 100
ax2.fill_between(range(len(drawdown)), drawdown, 0, color='red', alpha=0.3)
ax2.set_ylabel('Drawdown (%)')
ax2.grid(True, alpha=0.3)
# 3. Daily Returns
ax3 = axes[2]
if self.daily_returns:
colors = ['green' if r >= 0 else 'red' for r in self.daily_returns]
ax3.bar(range(len(self.daily_returns)), [r * 100 for r in self.daily_returns],
color=colors, alpha=0.5, width=1)
ax3.set_ylabel('Daily Return (%)')
ax3.set_xlabel('Trading Days')
ax3.grid(True, alpha=0.3)
plt.tight_layout()
chart_path = f"data/backtest_{self.ticker}.png"
plt.savefig(chart_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"\n📊 Chart saved: {chart_path}")
except Exception as e:
print(f"⚠️ Chart generation failed: {e}")
# 최근 5개 거래 로그
print("📝 Recent Trades:")
for trade in self.trade_log[-5:]:
action_emoji = "🔴" if trade['action'] == "BUY" else "🔵"
print(f"{trade['date']} {action_emoji} {trade['action']} {trade['qty']}ea @ {trade['price']:,.0f} | {trade.get('reason', '')}")
if __name__ == "__main__": if __name__ == "__main__":
# 삼성전자(005930), 6개월 백테스팅 print("=" * 55)
# 최근 6개월간 로직이 통했는지 검증 print("🚀 AI Trading Backtester v2.0")
# (종목 코드는 KOSPI: 코드, KOSDAQ: 코드) print("=" * 55)
backtester = Backtester("005930", start_date="2023-06-01", end_date="2024-01-01")
# 삼성전자 6개월 백테스팅
backtester = Backtester("005930", start_date="2025-06-01", end_date="2026-02-01")
backtester.run() backtester.run()

View File

@@ -138,13 +138,19 @@ class PricePredictor:
else: else:
print("[AI] No CUDA GPU detected. Running on CPU.") print("[AI] No CUDA GPU detected. Running on CPU.")
self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.0005, weight_decay=1e-4) self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4)
# [v2.0] Learning Rate Scheduler (ReduceLROnPlateau: val_loss 정체 시 lr 감소)
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
)
self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None
self.batch_size = 64 self.batch_size = 64
self.max_epochs = 200 self.max_epochs = 200
self.seq_length = 60 self.seq_length = 60
self.patience = 15 self.patience = 15
# [v2.0] Gradient Clipping 값 (exploding gradient 방지)
self.max_grad_norm = 1.0
self.training_status = { self.training_status = {
"is_training": False, "is_training": False,
@@ -237,12 +243,19 @@ class PricePredictor:
max_epochs = 50 if has_checkpoint else self.max_epochs max_epochs = 50 if has_checkpoint else self.max_epochs
# 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용) # 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용)
# [v2.0] LR Scheduler 리셋
self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=False
)
self.model.train() self.model.train()
self.training_status["is_training"] = True self.training_status["is_training"] = True
if ticker: if ticker:
self.training_status["current_ticker"] = ticker self.training_status["current_ticker"] = ticker
best_val_loss = float('inf') best_val_loss = float('inf')
best_model_state = None # [v2.0] Best Model 저장
patience_counter = 0 patience_counter = 0
final_loss = 0.0 final_loss = 0.0
actual_epochs = 0 actual_epochs = 0
@@ -268,12 +281,17 @@ class PricePredictor:
outputs = self.model(batch_x) outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y) loss = self.criterion(outputs, batch_y)
self.scaler_amp.scale(loss).backward() self.scaler_amp.scale(loss).backward()
# [v2.0] Gradient Clipping (AMP 호환)
self.scaler_amp.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.scaler_amp.step(self.optimizer) self.scaler_amp.step(self.optimizer)
self.scaler_amp.update() self.scaler_amp.update()
else: else:
outputs = self.model(batch_x) outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y) loss = self.criterion(outputs, batch_y)
loss.backward() loss.backward()
# [v2.0] Gradient Clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.optimizer.step() self.optimizer.step()
epoch_loss += loss.item() epoch_loss += loss.item()
@@ -293,17 +311,26 @@ class PricePredictor:
val_loss = self.criterion(val_out, y_val).item() val_loss = self.criterion(val_out, y_val).item()
self.model.train() self.model.train()
# [v2.0] LR Scheduler step (val_loss 기반)
self.lr_scheduler.step(val_loss)
final_loss = train_loss final_loss = train_loss
actual_epochs = epoch + 1 actual_epochs = epoch + 1
if val_loss < best_val_loss: if val_loss < best_val_loss:
best_val_loss = val_loss best_val_loss = val_loss
patience_counter = 0 patience_counter = 0
# [v2.0] Best model 상태 저장 (overfitting 방지)
best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()}
else: else:
patience_counter += 1 patience_counter += 1
if patience_counter >= self.patience: if patience_counter >= self.patience:
break break
# [v2.0] Best model 복원 (early stopping 후 최적 상태로 복구)
if best_model_state:
self.model.load_state_dict(best_model_state)
self.training_status["is_training"] = False self.training_status["is_training"] = False
self.training_status["loss"] = final_loss self.training_status["loss"] = final_loss
@@ -346,7 +373,30 @@ class PricePredictor:
current_price = prices[-1] current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN" trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100 change_rate = ((predicted_price - current_price) / current_price) * 100
confidence = 1.0 / (1.0 + (final_loss * 100))
# [v2.0] 개선된 신뢰도 계산
# 1. 학습 손실 기반 (낮을수록 좋음)
loss_confidence = 1.0 / (1.0 + (best_val_loss * 50))
# 2. Train/Val 괴리도 (overfitting 감지)
overfit_ratio = final_loss / (best_val_loss + 1e-9)
if overfit_ratio < 0.5:
# Train loss가 Val loss보다 훨씬 낮음 = overfitting
overfit_penalty = 0.7
elif overfit_ratio > 2.0:
# Train loss가 Val loss보다 훨씬 높음 = underfitting
overfit_penalty = 0.8
else:
overfit_penalty = 1.0
# 3. 에포크 수 기반 (너무 적거나 많으면 불신)
epoch_factor = 1.0
if actual_epochs < 10:
epoch_factor = 0.6 # 학습 부족
elif actual_epochs >= max_epochs:
epoch_factor = 0.8 # 수렴 실패
confidence = min(0.95, loss_confidence * overfit_penalty * epoch_factor)
return { return {
"current": current_price, "current": current_price,
@@ -354,9 +404,11 @@ class PricePredictor:
"change_rate": round(change_rate, 2), "change_rate": round(change_rate, 2),
"trend": trend, "trend": trend,
"loss": final_loss, "loss": final_loss,
"val_loss": best_val_loss,
"confidence": round(confidence, 2), "confidence": round(confidence, 2),
"epochs": actual_epochs, "epochs": actual_epochs,
"device": str(self.device) "device": str(self.device),
"lr": self.optimizer.param_groups[0]['lr']
} }
def batch_predict(self, prices_dict): def batch_predict(self, prices_dict):

View File

@@ -49,22 +49,34 @@ class MacroAnalyzer:
results[name] = {"price": 0, "change": 0} results[name] = {"price": 0, "change": 0}
# [신규] 시장 스트레스 지수(MSI) 추가 # [신규] 시장 스트레스 지수(MSI) 추가
time.sleep(0.6) # MSI 계산 전 추가 대기 time.sleep(0.6)
kospi_stress = MacroAnalyzer.calculate_stress_index(kis_client, "0001") kospi_stress = MacroAnalyzer.calculate_stress_index(kis_client, "0001")
results['MSI'] = kospi_stress results['MSI'] = kospi_stress
print(f" - Market Stress Index: {kospi_stress}") print(f" - Market Stress Index: {kospi_stress}")
if kospi_stress >= 50: if kospi_stress >= 50:
risk_score += 2 # 매우 위험 risk_score += 2
elif kospi_stress >= 30: elif kospi_stress >= 30:
risk_score += 1 # 위험 risk_score += 1
# [v2.0] KOSPI/KOSDAQ 연동 위험도 (둘 다 하락 시 더 위험)
kospi_change = results.get('KOSPI', {}).get('change', 0)
kosdaq_change = results.get('KOSDAQ', {}).get('change', 0)
if kospi_change <= -1.0 and kosdaq_change <= -1.0:
risk_score += 1 # 양대 지수 동반 하락
print(f" ⚠️ Both KOSPI({kospi_change}%) & KOSDAQ({kosdaq_change}%) declining!")
# [v2.0] 급반등 감지 (전일 급락 후 반등 = 불안정)
if kospi_change >= 2.0 and kospi_stress >= 30:
risk_score = max(risk_score, 1) # 급반등이지만 스트레스 높으면 CAUTION 유지
print(f" 📈 Sharp rebound detected but MSI still elevated")
# 시장 상태 정의 # 시장 상태 정의
status = "SAFE" status = "SAFE"
if risk_score >= 2: if risk_score >= 3:
status = "DANGER" # 매수 중단 권장 status = "DANGER"
elif risk_score >= 1: elif risk_score >= 1:
status = "CAUTION" # 보수적 매매 status = "CAUTION"
return { return {
"status": status, "status": status,

View File

@@ -5,22 +5,290 @@ class TechnicalAnalyzer:
""" """
Pandas를 활용한 기술적 지표 계산 모듈 Pandas를 활용한 기술적 지표 계산 모듈
CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리 CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리
[v2.0 개선사항]
- ATR(Average True Range): 변동성 기반 동적 손절/익절 산출
- ADX(Average Directional Index): 추세 강도 측정 (방향 아닌 '강도')
- OBV(On Balance Volume): 거래량 기반 매집/분산 감지
- 다중 시간프레임(MTF): 5일/20일/60일 추세 일관성 확인
- VWAP 근사: 거래량가중평균가격
""" """
@staticmethod @staticmethod
def calculate_rsi(prices, period=14): def calculate_rsi(prices, period=14):
"""RSI(Relative Strength Index) 계산""" """RSI(Relative Strength Index) 계산 - Wilder 방식 적용"""
if len(prices) < period: if len(prices) < period:
return 50.0 # 데이터 부족 시 중립 return 50.0
delta = pd.Series(prices).diff() delta = pd.Series(prices).diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() gain = delta.where(delta > 0, 0)
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() loss = -delta.where(delta < 0, 0)
rs = gain / loss # Wilder의 지수이동평균 방식 (더 정확)
avg_gain = gain.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
rs = avg_gain / (avg_loss + 1e-9)
rsi = 100 - (100 / (1 + rs)) rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1] return rsi.iloc[-1]
@staticmethod
def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
"""ATR(Average True Range) 계산 - 동적 손절/익절의 핵심 지표
Returns:
float: ATR 값 (가격 단위), 0이면 데이터 부족
"""
if len(prices) < period + 1:
return 0.0
close = pd.Series(prices)
if high_prices and len(high_prices) == len(prices):
high = pd.Series(high_prices)
low = pd.Series(low_prices)
else:
# 고가/저가 없으면 종가 기반 추정 (일변동폭 1.5% 가정)
high = close * 1.008
low = close * 0.992
# True Range = max(H-L, |H-Cprev|, |L-Cprev|)
prev_close = close.shift(1)
tr1 = high - low
tr2 = (high - prev_close).abs()
tr3 = (low - prev_close).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
# Wilder's smoothing
atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
return atr.iloc[-1] if not pd.isna(atr.iloc[-1]) else 0.0
@staticmethod
def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
"""ADX(Average Directional Index) - 추세 강도 측정
Returns:
tuple: (adx, plus_di, minus_di)
- ADX > 25: 강한 추세, ADX < 20: 횡보/비추세
- +DI > -DI: 상승 추세, -DI > +DI: 하락 추세
"""
if len(prices) < period * 2:
return 20.0, 50.0, 50.0 # 중립
close = pd.Series(prices)
if high_prices and len(high_prices) == len(prices):
high = pd.Series(high_prices)
low = pd.Series(low_prices)
else:
# 종가 기반 추정
daily_range = close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
high = close + daily_range * 0.5
low = close - daily_range * 0.5
# +DM, -DM
plus_dm = high.diff()
minus_dm = -low.diff()
plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0)
minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0)
# ATR
prev_close = close.shift(1)
tr = pd.concat([
high - low,
(high - prev_close).abs(),
(low - prev_close).abs()
], axis=1).max(axis=1)
atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
# +DI, -DI
plus_di = 100 * plus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
minus_di = 100 * minus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
# DX → ADX
dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
adx = dx.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
return (
adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0,
plus_di.iloc[-1] if not pd.isna(plus_di.iloc[-1]) else 50.0,
minus_di.iloc[-1] if not pd.isna(minus_di.iloc[-1]) else 50.0
)
@staticmethod
def calculate_obv(prices, volume_history):
"""OBV(On Balance Volume) - 스마트머니 매집/분산 감지
Returns:
dict: {
'obv_trend': 'ACCUMULATING' | 'DISTRIBUTING' | 'NEUTRAL',
'obv_divergence': True/False (가격↑ but OBV↓ = 약세 다이버전스)
}
"""
if not volume_history or len(volume_history) < 20 or len(prices) < 20:
return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
close = pd.Series(prices)
volume = pd.Series(volume_history)
# OBV 계산
direction = close.diff().apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0))
obv = (direction * volume).cumsum()
# OBV 추세 (20일 이동평균 대비)
obv_ma = obv.rolling(20).mean()
obv_current = obv.iloc[-1]
obv_ma_current = obv_ma.iloc[-1]
if pd.isna(obv_ma_current):
return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
# 추세 판단
obv_trend = 'NEUTRAL'
score = 0.0
if obv_current > obv_ma_current * 1.05:
obv_trend = 'ACCUMULATING' # 매집 중
score = 0.1
elif obv_current < obv_ma_current * 0.95:
obv_trend = 'DISTRIBUTING' # 분산 중
score = -0.1
# 다이버전스 감지 (최근 10일)
price_trend = close.iloc[-1] > close.iloc[-10] if len(close) >= 10 else False
obv_price_trend = obv.iloc[-1] > obv.iloc[-10] if len(obv) >= 10 else False
divergence = False
if price_trend and not obv_price_trend:
divergence = True # 약세 다이버전스 (가격↑ OBV↓)
score -= 0.05
elif not price_trend and obv_price_trend:
divergence = True # 강세 다이버전스 (가격↓ OBV↑)
score += 0.05
return {
'obv_trend': obv_trend,
'obv_divergence': divergence,
'score': round(score, 3)
}
@staticmethod
def get_multi_timeframe_trend(prices):
"""다중 시간프레임 추세 일관성 검사
5일(초단기), 20일(단기), 60일(중기) 추세가 일치하면 강한 신호
Returns:
dict: {
'alignment': 'STRONG_BULL' | 'BULL' | 'NEUTRAL' | 'BEAR' | 'STRONG_BEAR',
'score': -1.0 ~ 1.0,
'details': {...}
}
"""
if len(prices) < 60:
return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}
p = pd.Series(prices)
current = p.iloc[-1]
ma5 = p.rolling(5).mean().iloc[-1]
ma20 = p.rolling(20).mean().iloc[-1]
ma60 = p.rolling(60).mean().iloc[-1]
# 추세 방향 점수
trends = []
if current > ma5: trends.append(1)
else: trends.append(-1)
if ma5 > ma20: trends.append(1)
else: trends.append(-1)
if ma20 > ma60: trends.append(1)
else: trends.append(-1)
total = sum(trends)
if total == 3:
alignment = 'STRONG_BULL'
score = 0.15
elif total >= 1:
alignment = 'BULL'
score = 0.05
elif total == -3:
alignment = 'STRONG_BEAR'
score = -0.15
elif total <= -1:
alignment = 'BEAR'
score = -0.05
else:
alignment = 'NEUTRAL'
score = 0.0
return {
'alignment': alignment,
'score': score,
'details': {
'ma5': round(ma5, 1),
'ma20': round(ma20, 1),
'ma60': round(ma60, 1),
'price_vs_ma5': 'above' if current > ma5 else 'below'
}
}
@staticmethod
def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None, atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
"""ATR 기반 동적 손절/익절 계산
변동성에 맞는 적응형 손절/익절 라인 산출
- 변동성 큰 종목: 넓은 손절폭 (whipsaw 방지)
- 변동성 작은 종목: 좁은 손절폭 (빠른 리스크 관리)
Returns:
dict: {
'atr': ATR값,
'atr_pct': ATR% (가격 대비),
'stop_loss_pct': 손절 비율 (%),
'take_profit_pct': 익절 비율 (%),
'trailing_stop_pct': 트레일링 스탑 비율 (%)
}
"""
if len(prices) < 15:
return {
'atr': 0, 'atr_pct': 0,
'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
'trailing_stop_pct': 3.0
}
atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
current_price = prices[-1]
if current_price <= 0:
return {
'atr': 0, 'atr_pct': 0,
'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
'trailing_stop_pct': 3.0
}
atr_pct = (atr / current_price) * 100
# 동적 손절: ATR x 2 (단, 최소 -3%, 최대 -10%)
sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
# 동적 익절: ATR x 3 (단, 최소 +5%, 최대 +25%)
tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
# 트레일링 스탑: ATR x 1.5 (최고가 대비)
trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))
return {
'atr': round(atr, 1),
'atr_pct': round(atr_pct, 2),
'stop_loss_pct': round(sl_pct, 2),
'take_profit_pct': round(tp_pct, 2),
'trailing_stop_pct': round(trailing_pct, 2)
}
@staticmethod @staticmethod
def calculate_ma(prices, period=20): def calculate_ma(prices, period=20):
"""이동평균선(Moving Average) 계산""" """이동평균선(Moving Average) 계산"""
@@ -87,114 +355,140 @@ class TechnicalAnalyzer:
@staticmethod @staticmethod
def get_technical_score(current_price, prices_history, volume_history=None): def get_technical_score(current_price, prices_history, volume_history=None):
""" """
기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (고도화) 기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (v2.0 고도화)
- RSI, 이격도, MACD, Bollinger Bands, Stochastic 종합
- [New] Volume Analysis (Whale Activity) [v2.0 변경점]
- RSI: 25% (30% → 25%, ADX에 비중 이전)
- 이격도: 15% (20% → 15%)
- MACD: 15% (20% → 15%)
- Bollinger: 10% (15% → 10%)
- Stochastic: 10% (15% → 10%)
- ADX 추세강도: 15% (신규)
- MTF 다중시간프레임: 10% (신규)
- OBV/거래량 보너스: ±0.1 (보너스)
""" """
if not prices_history or len(prices_history) < 30: if not prices_history or len(prices_history) < 30:
return 0.5, 50.0 # 데이터 부족 시 중립 return 0.5, 50.0, 0.0, 1.0, {"ma20": 0, "ma114": 0, "trend": "Unknown", "position": "Unknown"}
scores = [] scores = []
# 1. RSI (비중 30%) # 1. RSI (비중 25%)
# 30 이하(과매도) -> 1.0, 70 이상(과매수) -> 0.0
rsi = TechnicalAnalyzer.calculate_rsi(prices_history) rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
if rsi <= 30: rsi_score = 1.0 if rsi <= 30: rsi_score = 1.0
elif rsi >= 70: rsi_score = 0.0 elif rsi >= 70: rsi_score = 0.0
else: rsi_score = 1.0 - ((rsi - 30) / 40.0) # 선형 보간 else: rsi_score = 1.0 - ((rsi - 30) / 40.0)
scores.append(rsi_score * 0.3) scores.append(rsi_score * 0.25)
# 2. 이격도 (비중 20%) # 2. 이격도 (비중 15%)
ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20) ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
disparity = (current_price - ma20) / ma20 disparity = (current_price - ma20) / (ma20 + 1e-9)
# 이격도가 마이너스일수록(저평가) 점수 높음 if disparity < -0.05: disp_score = 1.0
if disparity < -0.05: disp_score = 1.0 # -5% 이상 하락 elif disparity > 0.05: disp_score = 0.0
elif disparity > 0.05: disp_score = 0.0 # +5% 이상 상승 else: disp_score = 0.5 - (disparity * 10)
else: disp_score = 0.5 - (disparity * 10) # -0.05~0.05 사이 scores.append(disp_score * 0.15)
scores.append(disp_score * 0.2)
# 3. MACD (비중 20%) # 3. MACD (비중 15%)
# MACD가 Signal선 위에 있으면 상승세 (매수)
macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history) macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
if hist > 0 and macd > 0: macd_score = 0.8 # 상승 추세 가속 if hist > 0 and macd > 0: macd_score = 0.8
elif hist > 0 and macd <= 0: macd_score = 0.6 # 상승 반전 초기 elif hist > 0 and macd <= 0: macd_score = 0.65 # 골든크로스 초기 = 매수 기회
elif hist < 0 and macd > 0: macd_score = 0.4 # 하락 반전 초기 elif hist < 0 and macd > 0: macd_score = 0.35 # 데드크로스 초기
else: macd_score = 0.2 # 하락 추세 else: macd_score = 0.2
scores.append(macd_score * 0.2) scores.append(macd_score * 0.15)
# 4. Bollinger Bands (비중 15%) # 4. Bollinger Bands (비중 10%)
# 하단 밴드 근처 -> 매수(1.0), 상단 밴드 근처 -> 매도(0.0)
up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history) up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
if current_price <= low: bb_score = 1.0 if current_price <= low:
bb_score_base = 0.0 bb_score = 1.0
if current_price <= low: bb_score_base = 1.0 elif current_price >= up:
elif current_price >= up: bb_score_base = 0.0 bb_score = 0.0
else: else:
# 밴드 내 위치 비율 (Position %B) 유사 계산
# 하단(0) ~ 상단(1) -> 점수는 1 ~ 0 역순
pos = (current_price - low) / (up - low + 1e-9) pos = (current_price - low) / (up - low + 1e-9)
bb_score_base = 1.0 - pos bb_score = 1.0 - pos
if current_price < low:
bb_score = min(1.0, bb_score + 0.2)
scores.append(bb_score * 0.10)
# 추가 점수 로직 (기존 tech_score += 0.2를 bb_score에 반영) # 5. Stochastic (비중 10%)
if current_price < low: # 과매도 (저점 매수 기회)
bb_score = min(1.0, bb_score_base + 0.2) # 최대 1.0
else:
bb_score = bb_score_base
scores.append(bb_score * 0.15)
# 5. Stochastic (비중 15%)
# K가 20 미만 -> 과매도(매수), 80 이상 -> 과매수(매도)
slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history) slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
st_score_base = 0.0 if slow_k < 20: st_score = 1.0
if slow_k < 20: st_score_base = 1.0 elif slow_k > 80: st_score = 0.0
elif slow_k > 80: st_score_base = 0.0 else: st_score = 1.0 - (slow_k / 100.0)
else: st_score_base = 1.0 - (slow_k / 100.0) # 골든/데드크로스 보정
if slow_k < 20 and slow_k > slow_d: # 과매도 영역에서 골든크로스
st_score = min(1.0, st_score + 0.15)
elif slow_k > 80 and slow_k < slow_d: # 과매수 영역에서 데드크로스
st_score = max(0.0, st_score - 0.15)
scores.append(st_score * 0.10)
# 추가 점수 로직 (기존 tech_score += 0.2 / -= 0.1를 st_score에 반영) # 6. [신규] ADX 추세 강도 (비중 15%)
if slow_k < 20: # 과매도 adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
st_score = min(1.0, st_score_base + 0.2) if adx >= 25: # 강한 추세
elif slow_k > 80: # 과매수 if plus_di > minus_di:
st_score = max(0.0, st_score_base - 0.1) adx_score = 0.8 + min(0.2, (adx - 25) / 50) # 강한 상승추세
else: else:
st_score = st_score_base adx_score = 0.2 - min(0.2, (adx - 25) / 50) # 강한 하락추세
scores.append(st_score * 0.15) else: # 비추세/횡보
adx_score = 0.5 # 중립
adx_score = max(0.0, min(1.0, adx_score))
scores.append(adx_score * 0.15)
# 7. [신규] 다중 시간프레임 (비중 10%)
mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
# MTF score를 0~1 범위로 변환
mtf_score = 0.5 + mtf['score'] # -0.15~+0.15 → 0.35~0.65
mtf_score = max(0.0, min(1.0, mtf_score))
scores.append(mtf_score * 0.10)
total_score = sum(scores) total_score = sum(scores)
# [신규] 거래량 폭증 분석 (Whale Tracking) # [보너스] 거래량 분석 (Whale Tracking + OBV)
volume_ratio = 1.0 volume_ratio = 1.0
if volume_history and len(volume_history) >= 5: if volume_history and len(volume_history) >= 5:
vol_s = pd.Series(volume_history) vol_s = pd.Series(volume_history)
avg_vol = vol_s.rolling(window=5).mean().iloc[-2] # 어제까지의 5일 평균 avg_vol = vol_s.rolling(window=5).mean().iloc[-2]
current_vol = volume_history[-1] current_vol = volume_history[-1]
if avg_vol > 0: if avg_vol > 0:
volume_ratio = current_vol / avg_vol volume_ratio = current_vol / avg_vol
# 평소 거래량의 3배(300%) 이상 터지면 세력 유입 가능성 높음 -> 가산점 # 거래량 폭증 보너스
if volume_ratio >= 3.0: if volume_ratio >= 3.0:
total_score += 0.1 # 강력한 매수 신호 total_score += 0.08
# OBV 분석 보너스
obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
total_score += obv_result['score']
# MTF 추세 일관성 보너스 (위의 가중치 10% 외에 추가 보너스)
if mtf['alignment'] == 'STRONG_BULL':
total_score += 0.05
elif mtf['alignment'] == 'STRONG_BEAR':
total_score -= 0.05
# 0.0 ~ 1.0 클리핑 # 0.0 ~ 1.0 클리핑
total_score = max(0.0, min(1.0, total_score)) total_score = max(0.0, min(1.0, total_score))
# [신규] 변동성(Volatility) 계산 # 변동성(Volatility) 계산
# 최근 20일간 일일 변동폭의 표준편차를 평균 가격으로 나눔
if len(prices_history) > 1: if len(prices_history) > 1:
# list 입력 대응
prices_np = np.array(prices_history) prices_np = np.array(prices_history)
changes = np.diff(prices_np) / prices_np[:-1] changes = np.diff(prices_np) / prices_np[:-1]
volatility = np.std(changes) * 100 # 퍼센트 단위 volatility = np.std(changes) * 100
else: else:
volatility = 0.0 volatility = 0.0
# [신규] 이동평균선 분석 (20일, 114일) # 이동평균선 분석 (5일, 20일, 60일, 114일)
ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114) ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)
ma_trend = "Unknown" ma_trend = "Unknown"
if ma20 > ma114: if ma5 > ma20 > ma60:
ma_trend = "Bullish (Golden Alignment)" # 정배열 ma_trend = "Bullish (Golden Alignment)"
elif ma5 < ma20 < ma60:
ma_trend = "Bearish (Dead Alignment)"
elif ma20 > ma114:
ma_trend = "Moderate Bullish"
else: else:
ma_trend = "Bearish (Dead Alignment)" # 역배열 ma_trend = "Moderate Bearish"
price_pos = "Unknown" price_pos = "Unknown"
if current_price > ma20: if current_price > ma20:
@@ -203,10 +497,15 @@ class TechnicalAnalyzer:
price_pos = "Below MA20" price_pos = "Below MA20"
ma_info = { ma_info = {
"ma20": ma20, "ma5": round(ma5, 1),
"ma114": ma114, "ma20": round(ma20, 1),
"ma60": round(ma60, 1),
"ma114": round(ma114, 1),
"trend": ma_trend, "trend": ma_trend,
"position": price_pos "position": price_pos,
"adx": round(adx, 1),
"adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
"mtf_alignment": mtf['alignment']
} }
return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info

View File

@@ -13,7 +13,7 @@ from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor from modules.utils.monitor import SystemMonitor
from modules.strategy.process import analyze_stock_process from modules.strategy.process import analyze_stock_process, calculate_position_size
try: try:
from theme_manager import ThemeManager from theme_manager import ThemeManager
@@ -31,11 +31,21 @@ def init_worker():
class AutoTradingBot: class AutoTradingBot:
"""
[v2.0] 개선된 자동매매 봇
주요 개선사항:
1. ATR 기반 동적 손절/익절 + 트레일링 스탑
2. 변동성 기반 포지션 사이징 (1주 고정 → 동적 수량)
3. 보유종목 분석 기반 매도 (score 기반 SELL 판단)
4. 매크로 상태를 분석 워커에 전달 (동적 임계값)
5. 최고가 추적 (트레일링 스탑용)
6. 상세한 매매 로그 및 텔레그램 알림
"""
def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None): def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None):
# 1. 서비스 초기화 # 1. 서비스 초기화
self.kis = KISClient() self.kis = KISClient()
self.news = AsyncNewsCollector() self.news = AsyncNewsCollector()
# GPU 경합 방지: 워커 1개만 사용 (LSTM 학습이 GPU 독점)
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker) self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
try: try:
list(self.executor.map(lambda x: x, range(1))) list(self.executor.map(lambda x: x, range(1)))
@@ -56,6 +66,13 @@ class AutoTradingBot:
self.watchlist_updated_today = False self.watchlist_updated_today = False
self.report_sent = False self.report_sent = False
# [v2.0] 트레일링 스탑용 최고가 추적
# {ticker: peak_price}
self.peak_prices = {}
# [v2.0] 최근 매크로 상태 캐싱
self.last_macro_status = None
# 4. 프로세스 관리 # 4. 프로세스 관리
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
@@ -113,6 +130,33 @@ class AutoTradingBot:
except Exception: except Exception:
return {} return {}
def _load_peak_prices(self):
"""트레일링 스탑용 최고가 데이터 로드"""
peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
if os.path.exists(peak_file):
try:
with open(peak_file, "r", encoding="utf-8") as f:
self.peak_prices = json.load(f)
except Exception:
self.peak_prices = {}
def _save_peak_prices(self):
"""트레일링 스탑용 최고가 데이터 저장"""
peak_file = os.path.join(Config.DATA_DIR, "peak_prices.json")
try:
with open(peak_file, "w", encoding="utf-8") as f:
json.dump(self.peak_prices, f, indent=2)
except Exception:
pass
def _update_peak_price(self, ticker, current_price):
"""보유 종목의 최고가 갱신"""
if ticker not in self.peak_prices:
self.peak_prices[ticker] = current_price
elif current_price > self.peak_prices[ticker]:
self.peak_prices[ticker] = current_price
print(f" 📈 [Peak Updated] {ticker}: {current_price:,.0f}")
def send_daily_report(self): def send_daily_report(self):
if self.report_sent: if self.report_sent:
return return
@@ -120,20 +164,46 @@ class AutoTradingBot:
balance = self.kis.get_balance() balance = self.kis.get_balance()
total_eval = int(balance.get("total_eval", 0)) total_eval = int(balance.get("total_eval", 0))
report = f"📅 <b>[Daily Closing Report]</b>\n" \ deposit = int(balance.get("deposit", 0))
f"💰 <b>Total Asset:</b> <code>{total_eval:,}원</code>\n" \
f"📜 <b>Trades Today:</b> <code>{len(self.daily_trade_history)}건</code>\n\n"
report = (f"📅 <b>[Daily Closing Report]</b>\n"
f"💰 <b>Total Asset:</b> <code>{total_eval:,}원</code>\n"
f"💵 <b>Cash:</b> <code>{deposit:,}원</code>\n"
f"📜 <b>Trades Today:</b> <code>{len(self.daily_trade_history)}건</code>\n\n")
# 매매 내역
if self.daily_trade_history: if self.daily_trade_history:
total_profit = 0
buy_count = 0
sell_count = 0
for trade in self.daily_trade_history: for trade in self.daily_trade_history:
action = trade['action'] action = trade['action']
icon = "🔴" if action == "BUY" else "🔵" icon = "🔴" if action == "BUY" else "🔵"
report += f"{icon} <b>{action}</b> {trade['name']} {trade['qty']}\n" qty = trade.get('qty', 0)
price = trade.get('price', 0)
reason = trade.get('reason', '')
report += f"{icon} <b>{action}</b> {trade['name']} {qty}주 @ {price:,.0f}"
if reason:
report += f" ({reason})"
report += "\n"
if action == "BUY":
buy_count += 1
else:
sell_count += 1
total_profit += trade.get('profit', 0)
report += f"\n📊 매수 {buy_count}건 / 매도 {sell_count}"
if sell_count > 0:
report += f" | 실현손익: <code>{total_profit:,.0f}원</code>"
report += "\n"
# 보유종목 현황
if "holdings" in balance and balance["holdings"]: if "holdings" in balance and balance["holdings"]:
report += "\n📊 <b>[Holdings]</b>\n" report += "\n📊 <b>[Holdings]</b>\n"
for stock in balance["holdings"]: for stock in balance["holdings"]:
yld = float(stock.get('yield', 0)) yld = float(stock.get('yield', 0))
profit_loss = int(stock.get('profit_loss', 0))
if yld > 0: if yld > 0:
icon = "🔴" icon = "🔴"
yld_str = f"+{yld}" yld_str = f"+{yld}"
@@ -144,7 +214,8 @@ class AutoTradingBot:
icon = "" icon = ""
yld_str = f"{yld}" yld_str = f"{yld}"
report += f"{icon} {stock['name']}: <code>{yld_str}%</code>\n" report += (f"{icon} {stock['name']}: <code>{yld_str}%</code> "
f"(<code>{profit_loss:+,}원</code>)\n")
self.messenger.send_message(report) self.messenger.send_message(report)
self.report_sent = True self.report_sent = True
@@ -170,7 +241,6 @@ class AutoTradingBot:
if command == 'restart': if command == 'restart':
self.messenger.send_message("[Bot] Restart requested via Telegram.") self.messenger.send_message("[Bot] Restart requested via Telegram.")
# executor 재시작
self.restart_executor() self.restart_executor()
elif command == 'update_watchlist': elif command == 'update_watchlist':
@@ -189,11 +259,23 @@ class AutoTradingBot:
# 1. 거시경제 분석 # 1. 거시경제 분석
macro_status = MacroAnalyzer.get_macro_status(self.kis) macro_status = MacroAnalyzer.get_macro_status(self.kis)
self.last_macro_status = macro_status
is_crash = False is_crash = False
if macro_status['status'] == 'DANGER': if macro_status['status'] == 'DANGER':
is_crash = True is_crash = True
if not self.is_macro_warning_sent: if not self.is_macro_warning_sent:
self.messenger.send_message("🚨 <b>[MARKET CRASH ALERT]</b> 시장 급락 감지! 매수 중단.") self.messenger.send_message(
"🚨 <b>[MARKET CRASH ALERT]</b>\n"
"시장 급락 감지! 매수 중단, 매도 기준 상향.\n"
f"Risk Score: {macro_status['risk_score']}")
self.is_macro_warning_sent = True
elif macro_status['status'] == 'CAUTION':
if not self.is_macro_warning_sent:
self.messenger.send_message(
"⚠️ <b>[MARKET CAUTION]</b>\n"
"시장 불안정. 보수적 매매 모드 전환.\n"
f"Risk Score: {macro_status['risk_score']}")
self.is_macro_warning_sent = True self.is_macro_warning_sent = True
else: else:
if self.is_macro_warning_sent: if self.is_macro_warning_sent:
@@ -236,6 +318,8 @@ class AutoTradingBot:
self.report_sent = False self.report_sent = False
self.discovered_stocks.clear() self.discovered_stocks.clear()
self.watchlist_updated_today = False self.watchlist_updated_today = False
# 전일 최고가 초기화 (보유하지 않는 종목)
self._load_peak_prices()
# 5. 시스템 감시 (3분 간격) # 5. 시스템 감시 (3분 간격)
self.monitor.check_health() self.monitor.check_health()
@@ -252,52 +336,33 @@ class AutoTradingBot:
# 7. 종목 분석 및 매매 # 7. 종목 분석 및 매매
target_dict = self.load_watchlist() target_dict = self.load_watchlist()
# 보유 종목 리스크 관리 # [v2.0] 잔고 조회 및 보유종목 맵 생성
balance = self.kis.get_balance() balance = self.kis.get_balance()
current_holdings = {} current_holdings = {}
total_eval = int(balance.get("total_eval", 0))
if balance and "holdings" in balance: if balance and "holdings" in balance:
for stock in balance["holdings"]: for stock in balance["holdings"]:
code = stock.get("code") code = stock.get("code")
name = stock.get("name")
qty = int(stock.get("qty", 0)) qty = int(stock.get("qty", 0))
yld = float(stock.get("yield", 0.0)) if qty > 0:
current_holdings[code] = stock
# 최고가 업데이트 (트레일링 스탑용)
current_price = float(stock.get('current_price', 0))
if current_price > 0:
self._update_peak_price(code, current_price)
current_holdings[code] = stock # [v2.0] 보유종목도 분석 대상에 포함 (watchlist에 없어도)
for code in current_holdings:
if qty <= 0: if code not in target_dict:
continue name = current_holdings[code].get('name', 'Unknown')
target_dict[code] = name
action = None print(f"[Bot] Added holding to analysis: {name} ({code})")
reason = ""
if yld <= -5.0:
action = "SELL"
reason = "Stop Loss"
elif yld >= 8.0:
action = "SELL"
reason = "Take Profit"
if action == "SELL":
print(f"[Bot] Risk Management: {reason} - {name} (Qty: {qty}, Yield: {yld}%)")
res = self.kis.sell_stock(code, qty)
if res and res.get("status"):
self.messenger.send_message(
f"🔵 <b>[Risk SELL]</b> {name}\n"
f" Reason: {reason}\n"
f" Qty: {qty}\n"
f" Yield: <code>{yld}%</code>")
self.daily_trade_history.append({
"action": "SELL", "name": name, "qty": qty,
"price": stock.get('current_price'), "yield": yld
})
self.save_trade_history()
# 분석 실행 (병렬 처리) # 분석 실행 (병렬 처리)
analysis_tasks = [] analysis_tasks = []
news_data = await self.news.get_market_news_async() news_data = await self.news.get_market_news_async()
# 실시간 잔고 추적용 변수 (매수 시 차감)
tracking_deposit = int(balance.get("deposit", 0)) tracking_deposit = int(balance.get("deposit", 0))
try: try:
@@ -306,11 +371,23 @@ class AutoTradingBot:
if not prices: if not prices:
continue continue
# 외인 수급 분석
investor_trend = self.kis.get_investor_trend(ticker) investor_trend = self.kis.get_investor_trend(ticker)
# [v2.0] 보유 정보 전달 (분석 워커에서 동적 손절/익절 사용)
holding_info = None
if ticker in current_holdings:
h = current_holdings[ticker]
holding_info = {
'qty': int(h.get('qty', 0)),
'yield': float(h.get('yield', 0.0)),
'purchase_price': float(h.get('purchase_price', 0)),
'current_price': float(h.get('current_price', 0)),
'peak_price': self.peak_prices.get(ticker, float(h.get('current_price', 0)))
}
future = self.executor.submit( future = self.executor.submit(
analyze_stock_process, ticker, prices, news_data, investor_trend) analyze_stock_process, ticker, prices, news_data,
investor_trend, macro_status, holding_info)
analysis_tasks.append(future) analysis_tasks.append(future)
# 결과 처리 # 결과 처리
@@ -318,41 +395,107 @@ class AutoTradingBot:
for future in analysis_tasks: for future in analysis_tasks:
try: try:
res = await loop.run_in_executor(None, future.result) res = await loop.run_in_executor(None, future.result)
ticker_name = target_dict.get(res['ticker'], 'Unknown') ticker = res['ticker']
print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})") ticker_name = target_dict.get(ticker, 'Unknown')
print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})"
f" | SL:{res.get('sl_tp', {}).get('stop_loss_pct', 'N/A')}%"
f" TP:{res.get('sl_tp', {}).get('take_profit_pct', 'N/A')}%")
# ===== 매수 처리 =====
if res['decision'] == "BUY": if res['decision'] == "BUY":
if is_crash: if is_crash:
print(f"[Bot] [Skip Buy] Market DANGER mode - {ticker_name}")
continue continue
current_price = float(res['current_price']) current_price = float(res['current_price'])
if current_price <= 0: if current_price <= 0:
continue continue
qty = 1 # [v2.0] 포지션 사이징 (동적 수량)
qty = calculate_position_size(
total_capital=total_eval if total_eval > 0 else tracking_deposit,
current_price=current_price,
volatility=res.get('volatility', 2.0),
score=res['score'],
ai_confidence=res.get('ai_confidence', 0.5)
)
if qty <= 0:
print(f"[Bot] [Skip Buy] Position size = 0 ({ticker_name})")
continue
required_amount = current_price * qty required_amount = current_price * qty
# 예수금 확인 # 예수금 확인
if tracking_deposit < required_amount: if tracking_deposit < required_amount:
print(f"[Bot] [Skip Buy] 예수금 부족 ({ticker_name}): " # 수량 줄여서 재시도
f"필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}") qty = int(tracking_deposit / current_price)
continue if qty <= 0:
print(f"[Bot] [Skip Buy] 예수금 부족 ({ticker_name}): "
f"필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
continue
required_amount = current_price * qty
print(f"[Bot] Buying {ticker_name} {qty}ea") print(f"[Bot] Buying {ticker_name} {qty}ea @ ~{current_price:,.0f}")
order = self.kis.buy_stock(res['ticker'], qty) order = self.kis.buy_stock(ticker, qty)
if order.get("status"): if order.get("status"):
self.messenger.send_message( reason = res.get('decision_reason', '')
f"🔴 <b>[BUY]</b> {ticker_name} {qty}\n" sl_tp = res.get('sl_tp', {})
f" Price: <code>{current_price:,.0f}원</code>")
msg = (f"🔴 <b>[BUY]</b> {ticker_name} {qty}\n"
f" Price: <code>{current_price:,.0f}원</code>\n"
f" Score: <code>{res['score']:.2f}</code>\n"
f" SL: <code>{sl_tp.get('stop_loss_pct', -5):.1f}%</code>"
f" | TP: <code>{sl_tp.get('take_profit_pct', 8):.1f}%</code>"
f" | Trail: <code>{sl_tp.get('trailing_stop_pct', 3):.1f}%</code>")
if reason:
msg += f"\n Reason: {reason}"
self.messenger.send_message(msg)
self.daily_trade_history.append({ self.daily_trade_history.append({
"action": "BUY", "name": ticker_name, "action": "BUY", "name": ticker_name,
"qty": qty, "price": current_price "qty": qty, "price": current_price,
"score": res['score'],
"reason": reason
}) })
self.save_trade_history() self.save_trade_history()
tracking_deposit -= required_amount tracking_deposit -= required_amount
elif res['decision'] == "SELL": # 최고가 초기 설정
print(f"[Bot] Selling {ticker_name} (Simulation)") self.peak_prices[ticker] = current_price
self._save_peak_prices()
# ===== 매도 처리 (v2.0 - 분석 기반 매도) =====
elif res['decision'] == "SELL" and ticker in current_holdings:
h = current_holdings[ticker]
qty = int(h.get('qty', 0))
yld = float(h.get('yield', 0.0))
profit_loss = int(h.get('profit_loss', 0))
if qty > 0:
print(f"[Bot] Selling {ticker_name} {qty}ea (Yield: {yld:.1f}%)")
sell_res = self.kis.sell_stock(ticker, qty)
if sell_res and sell_res.get("status"):
reason = res.get('decision_reason', 'AI Signal')
msg = (f"🔵 <b>[SELL]</b> {ticker_name} {qty}\n"
f" Yield: <code>{yld:.1f}%</code>\n"
f" P&L: <code>{profit_loss:+,}원</code>\n"
f" Reason: {reason}")
self.messenger.send_message(msg)
self.daily_trade_history.append({
"action": "SELL", "name": ticker_name,
"qty": qty, "price": float(h.get('current_price', 0)),
"yield": yld, "profit": profit_loss,
"reason": reason
})
self.save_trade_history()
# 최고가 기록 삭제
if ticker in self.peak_prices:
del self.peak_prices[ticker]
self._save_peak_prices()
except BrokenProcessPool: except BrokenProcessPool:
raise raise
@@ -368,15 +511,19 @@ class AutoTradingBot:
print(f"[Bot] Cycle Loop Error: {e}") print(f"[Bot] Cycle Loop Error: {e}")
def loop(self): def loop(self):
print(f"[Bot] Module Started (PID: {os.getpid()})") print(f"[Bot] Module Started (PID: {os.getpid()}) [v2.0]")
self.messenger.send_message("[Bot Started] 리팩토링된 봇이 시작되었습니다.") self.messenger.send_message(
"🚀 [Bot Started v2.0]\n"
"개선사항: 동적 손절/익절, 트레일링 스탑, 포지션 사이징, 분석 기반 매도")
# 최고가 데이터 로드
self._load_peak_prices()
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
while True: while True:
# shutdown 시그널 체크
if self.shutdown_event and self.shutdown_event.is_set(): if self.shutdown_event and self.shutdown_event.is_set():
print("[Bot] Shutdown signal received.") print("[Bot] Shutdown signal received.")
break break
@@ -387,7 +534,6 @@ class AutoTradingBot:
print(f"[Bot] Loop Error: {e}") print(f"[Bot] Loop Error: {e}")
self.messenger.send_message(f"[Bot] Loop Error: {e}") self.messenger.send_message(f"[Bot] Loop Error: {e}")
# 비동기 sleep (shutdown 체크하면서 대기)
for _ in range(60): for _ in range(60):
if self.shutdown_event and self.shutdown_event.is_set(): if self.shutdown_event and self.shutdown_event.is_set():
break break

View File

@@ -18,27 +18,102 @@ def get_predictor():
f" | AMP: {_lstm_predictor.use_amp}") f" | AMP: {_lstm_predictor.use_amp}")
return _lstm_predictor return _lstm_predictor
def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
def calculate_position_size(total_capital, current_price, volatility, score, ai_confidence,
max_per_stock=3000000):
""" """
[CPU Intensive] 기술적 분석 및 AI 판단을 수행하는 함수 [v2.0] 변동성 기반 포지션 사이징 (Modified Kelly Criterion)
(ProcessPoolExecutor에서 실행됨)
핵심 원칙:
1. 변동성이 높으면 → 적은 수량 (리스크 관리)
2. 확신도(score)가 높으면 → 많은 수량 (기회 포착)
3. AI 신뢰도가 높으면 → 가산 비중
4. 절대 한 종목에 전체 자산의 15% 이상 투자하지 않음
Returns:
int: 매수 수량 (0이면 매수 안 함)
"""
if current_price <= 0 or total_capital <= 0:
return 0
# 1. 기본 투자금 (전체 자산의 10%)
base_invest = total_capital * 0.10
# 2. 변동성 조절 계수 (변동성 높을수록 투자금 감소)
# 변동성 1% → 1.0배, 2% → 0.75배, 3% → 0.5배, 5%+ → 0.3배
if volatility <= 1.0:
vol_factor = 1.2 # 안정적 종목은 약간 증가
elif volatility <= 2.0:
vol_factor = 1.0
elif volatility <= 3.0:
vol_factor = 0.7
elif volatility <= 5.0:
vol_factor = 0.45
else:
vol_factor = 0.3 # 고변동 종목
# 3. 확신도 조절 계수 (score가 높을수록 투자금 증가)
# score 0.6 → 0.5배, 0.7 → 1.0배, 0.8 → 1.5배, 0.9+ → 2.0배
if score >= 0.85:
conf_factor = 2.0
elif score >= 0.75:
conf_factor = 1.5
elif score >= 0.65:
conf_factor = 1.0
else:
conf_factor = 0.5
# 4. AI 신뢰도 가산
ai_bonus = 1.0
if ai_confidence >= 0.85:
ai_bonus = 1.3
elif ai_confidence >= 0.7:
ai_bonus = 1.1
# 5. 최종 투자금 계산
invest_amount = base_invest * vol_factor * conf_factor * ai_bonus
# 상한 제한
invest_amount = min(invest_amount, max_per_stock)
invest_amount = min(invest_amount, total_capital * 0.15) # 최대 15%
invest_amount = min(invest_amount, total_capital) # 잔고 초과 방지
# 수량 계산
qty = int(invest_amount / current_price)
return max(0, qty)
def analyze_stock_process(ticker, prices, news_items, investor_trend=None,
macro_status=None, holding_info=None):
"""
[v2.0] 종목 분석 + 매매 판단 (ProcessPoolExecutor에서 실행)
[v2.0 개선사항]
1. ATR 기반 동적 손절/익절 + 트레일링 스탑
2. 포지션 사이징 (변동성 + 확신도 기반)
3. 시장상황별 동적 매수/매도 임계값
4. 보유종목에 대한 분석 기반 매도 판단
5. ADX/OBV/MTF 통합 기술적 분석
6. 강화된 AI 프롬프트 (종목 고유 뉴스 분석)
""" """
try: try:
print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...") print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...")
# 1. 기술적 지표 계산 # ===== 1. 기술적 지표 계산 =====
current_price = prices[-1] if prices else 0 current_price = prices[-1] if prices else 0
# [수정] 변동성, 거래량 비율, MA 정보 반환 tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(
tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(current_price, prices, volume_history=None) current_price, prices, volume_history=None)
# 2. LSTM 주가 예측 # ===== 2. ATR 기반 동적 손절/익절 =====
# [최적화] 전역 캐시된 Predictor 사용 sl_tp = TechnicalAnalyzer.calculate_dynamic_sl_tp(prices)
# ===== 3. LSTM 주가 예측 =====
lstm_predictor = get_predictor() lstm_predictor = get_predictor()
if lstm_predictor: if lstm_predictor:
lstm_predictor.training_status['current_ticker'] = ticker lstm_predictor.training_status['current_ticker'] = ticker
pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker) pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker)
lstm_score = 0.5 # 중립 lstm_score = 0.5
ai_confidence = 0.5 ai_confidence = 0.5
ai_loss = 1.0 ai_loss = 1.0
@@ -46,44 +121,51 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
ai_confidence = pred_result.get('confidence', 0.5) ai_confidence = pred_result.get('confidence', 0.5)
ai_loss = pred_result.get('loss', 1.0) ai_loss = pred_result.get('loss', 1.0)
# 상승/하락 예측에 따라 점수 조정 (신뢰도 반영)
# 최대 5% 변동폭까지 반영
change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0 change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0
if pred_result['trend'] == 'UP': if pred_result['trend'] == 'UP':
# 상승 예측 시: 기본 0.5 + (강도 * 신뢰도 * 0.4) -> 최대 0.9
lstm_score = 0.5 + (change_magnitude * ai_confidence * 0.4) lstm_score = 0.5 + (change_magnitude * ai_confidence * 0.4)
else: else:
# 하락 예측 시: 기본 0.5 - (강도 * 신뢰도 * 0.4) -> 최소 0.1
lstm_score = 0.5 - (change_magnitude * ai_confidence * 0.4) lstm_score = 0.5 - (change_magnitude * ai_confidence * 0.4)
lstm_score = max(0.0, min(1.0, lstm_score)) lstm_score = max(0.0, min(1.0, lstm_score))
# [신규] 수급 분석 (외인/기관) # ===== 4. 수급 분석 (외인/기관) =====
investor_score = 0.0 investor_score = 0.0
frgn_net_buy = 0 frgn_net_buy = 0
orgn_net_buy = 0 orgn_net_buy = 0
consecutive_frgn_buy = 0 consecutive_frgn_buy = 0
consecutive_orgn_buy = 0
if investor_trend: if investor_trend:
# 최근 5일 합산
for day in investor_trend: for day in investor_trend:
frgn_net_buy += day['foreigner'] frgn_net_buy += day['foreigner']
orgn_net_buy += day['institutional'] orgn_net_buy += day['institutional']
if day['foreigner'] > 0: if day['foreigner'] > 0:
consecutive_frgn_buy += 1 consecutive_frgn_buy += 1
if day['institutional'] > 0:
consecutive_orgn_buy += 1
# 외인 수급 점수 (단순화) # 외인 수급 점수 (화)
if frgn_net_buy > 0: if frgn_net_buy > 0:
investor_score += 0.05 investor_score += 0.03
if consecutive_frgn_buy >= 3: if consecutive_frgn_buy >= 3:
investor_score += 0.05 investor_score += 0.04
if consecutive_frgn_buy >= 5:
investor_score += 0.03 # 5일 연속 매수 = 추가 보너스
if investor_score > 0: # 기관 수급 점수 (신규)
print(f" 💰 [Investor] Foreign Buy Detected (Net: {frgn_net_buy})") if orgn_net_buy > 0:
investor_score += 0.02
if consecutive_orgn_buy >= 3:
investor_score += 0.03
# 3. AI 뉴스 분석 # 외인+기관 동시 순매수 = 강력 신호
# pred_result가 None일 경우 기본값 사용 if frgn_net_buy > 0 and orgn_net_buy > 0:
investor_score += 0.03
print(f" 💰 [Investor] Both Foreign & Institutional Buying!")
# ===== 5. AI 뉴스 분석 (강화된 프롬프트) =====
if pred_result: if pred_result:
pred_price = pred_result.get('predicted', 0) pred_price = pred_result.get('predicted', 0)
pred_change = pred_result.get('change_rate', 0) pred_change = pred_result.get('change_rate', 0)
@@ -93,80 +175,180 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
ollama = OllamaManager() ollama = OllamaManager()
prompt = f""" prompt = f"""
[System Instruction] [System Instruction]
1. Role: You are a Expert Quant Trader with 20 years of experience. 1. Role: You are a legendary quant trader with 30 years of experience in Korean stock market.
2. Market Data: 2. You MUST analyze the data objectively and respond with a JSON object.
- Technical Score: {tech_score:.2f} (RSI: {rsi:.2f})
- Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
- AI Prediction: {pred_price:.0f} KRW ({pred_change}%)
- AI Confidence: {ai_confidence:.2f} (Loss: {ai_loss:.4f})
- Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
3. Strategy:
- If Foreigners are buying AND Trend is UP -> Strong BUY.
- If AI Confidence > 0.8 and Trend is UP -> Strong BUY.
- If MA is Bullish (Golden Alignment) -> Positive Signal.
- If Price is above MA20 -> Support Uptrend.
- If Trend is DOWN -> SELL/AVOID.
4. Task: Analyze the news and combine with market data to decide sentiment.
News Data: {json.dumps(news_items, ensure_ascii=False)} [Market Data for Stock {ticker}]
- Current Price: {current_price:,.0f} KRW
- Technical Score: {tech_score:.3f} (RSI: {rsi:.1f})
- Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
- ADX Trend Strength: {ma_info.get('adx', 20):.1f} ({ma_info.get('adx_trend', 'N/A')})
- Multi-Timeframe: {ma_info.get('mtf_alignment', 'N/A')}
- AI Prediction: {pred_price:.0f} KRW ({pred_change:+.2f}%)
- AI Confidence: {ai_confidence:.2f} (Training Loss: {ai_loss:.4f})
- Volatility: {volatility:.2f}%
- Volume Ratio: {vol_ratio:.1f}x
- ATR Stop Loss: {sl_tp['stop_loss_pct']:.1f}% / Take Profit: {sl_tp['take_profit_pct']:.1f}%
- Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
Response (JSON): [Decision Framework]
{{ - Strong BUY signals: Foreigners+Institutions buying, Golden Cross, ADX>25 with bullish trend, AI high confidence UP
"sentiment_score": 0.8, - Moderate BUY: RSI<40 with bullish reversal, Price near Bollinger Lower Band
"reason": "Foreigners buying and Golden Cross detected." - SELL signals: RSI>70, Dead Cross, ADX>25 with bearish trend, Foreigners selling
}} - AVOID/HOLD: ADX<20 (sideways), Mixed signals, Low confidence
"""
[News Data]
{json.dumps(news_items[:5] if news_items else [], ensure_ascii=False)}
[Response Format - JSON Only]
{{"sentiment_score": 0.0 to 1.0, "reason": "Brief analysis reason"}}
"""
ai_resp = ollama.request_inference(prompt) ai_resp = ollama.request_inference(prompt)
sentiment_score = 0.5 sentiment_score = 0.5
ai_reason = ""
try: try:
data = json.loads(ai_resp) data = json.loads(ai_resp)
sentiment_score = float(data.get("sentiment_score", 0.5)) sentiment_score = float(data.get("sentiment_score", 0.5))
except: sentiment_score = max(0.0, min(1.0, sentiment_score)) # 범위 강제
pass ai_reason = data.get("reason", "")
except Exception:
print(f" ⚠️ AI response parse failed, using neutral (0.5)")
# 4. 통합 점수 (동적 가중치) # ===== 6. 통합 점수 (동적 가중치 v2.0) =====
# AI 신뢰도가 높으면 AI 비중을 대폭 상향 # ADX가 높으면 (추세가 강하면) LSTM과 기술적 분석 비중 증가
if ai_confidence >= 0.85: adx_val = ma_info.get('adx', 20)
w_tech, w_news, w_ai = 0.2, 0.2, 0.6
print(f" 🤖 [High Confidence] AI Weight Boosted to 60%") if ai_confidence >= 0.85 and adx_val >= 25:
# 강한 추세 + 높은 AI 신뢰도: AI 최우선
w_tech, w_news, w_ai = 0.15, 0.15, 0.70
print(f" 🤖 [Ultra High Confidence + Strong Trend] AI Weight 70%")
elif ai_confidence >= 0.85:
w_tech, w_news, w_ai = 0.20, 0.20, 0.60
print(f" 🤖 [High Confidence] AI Weight 60%")
elif adx_val >= 30:
# 매우 강한 추세: 기술적 분석 우선
w_tech, w_news, w_ai = 0.50, 0.20, 0.30
print(f" 📊 [Very Strong Trend ADX={adx_val:.0f}] Tech Weight 50%")
elif adx_val < 20:
# 비추세/횡보: 뉴스와 수급 중시
w_tech, w_news, w_ai = 0.30, 0.40, 0.30
print(f" 📰 [Sideways ADX={adx_val:.0f}] News Weight 40%")
else: else:
w_tech, w_news, w_ai = 0.4, 0.3, 0.3 w_tech, w_news, w_ai = 0.35, 0.30, 0.35
total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score) total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
# [수신] 수급 가산점 추가 (최대 +0.1) # 수급 가산점 (최대 +0.15)
total_score += investor_score total_score += min(investor_score, 0.15)
total_score = min(total_score, 1.0) total_score = min(total_score, 1.0)
# ===== 7. 시장 상황별 동적 임계값 =====
buy_threshold = 0.60
sell_threshold = 0.30
if macro_status:
macro_state = macro_status.get('status', 'SAFE')
if macro_state == 'DANGER':
buy_threshold = 999.0 # 매수 완전 차단
sell_threshold = 0.45 # 매도 기준 상향 (빨리 탈출)
print(f" 🚨 [DANGER Market] Buy BLOCKED, Sell threshold raised to 0.45")
elif macro_state == 'CAUTION':
buy_threshold = 0.72 # 매수 기준 대폭 상향 (보수적)
sell_threshold = 0.38 # 매도 기준도 상향
print(f" ⚠️ [CAUTION Market] Buy threshold raised to 0.72")
# ===== 8. 매매 결정 =====
decision = "HOLD" decision = "HOLD"
decision_reason = ""
# [신규] 강한 단일 신호 매수 로직 (기준 강화) # --- 보유 종목 분석 기반 매도 (신규) ---
strong_signal = False if holding_info:
strong_reason = "" holding_yield = holding_info.get('yield', 0.0)
holding_qty = holding_info.get('qty', 0)
peak_price = holding_info.get('peak_price', current_price)
if tech_score >= 0.80: if holding_qty > 0:
strong_signal = True # A. 동적 손절 (ATR 기반)
strong_reason = "Super Strong Technical" if holding_yield <= sl_tp['stop_loss_pct']:
elif lstm_score >= 0.80 and ai_confidence >= 0.8: decision = "SELL"
strong_signal = True decision_reason = f"Dynamic Stop Loss ({holding_yield:.1f}% <= {sl_tp['stop_loss_pct']:.1f}%)"
strong_reason = f"High Confidence AI Buy (Conf: {ai_confidence})"
elif sentiment_score >= 0.85:
strong_signal = True
strong_reason = "Strong News Sentiment"
elif investor_score >= 0.1 and total_score >= 0.6: # 외인 수급이 좋고 전체 점수 양호
strong_signal = True
strong_reason = "Strong Foreigner Buying"
if strong_signal: # B. 동적 익절 (ATR 기반)
decision = "BUY" elif holding_yield >= sl_tp['take_profit_pct']:
print(f" 🎯 [{strong_reason}] Overriding to BUY!") decision = "SELL"
elif total_score >= 0.60: # (0.5 -> 0.6 상향 조정으로 보수적 접근) decision_reason = f"Dynamic Take Profit ({holding_yield:.1f}% >= {sl_tp['take_profit_pct']:.1f}%)"
decision = "BUY"
elif total_score <= 0.30:
decision = "SELL"
print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} LSTM={lstm_score:.2f} → Total={total_score:.2f} [{decision}]") # C. 트레일링 스탑 (최고가 대비 하락)
elif peak_price > 0:
drop_from_peak = ((current_price - peak_price) / peak_price) * 100
if drop_from_peak <= -sl_tp['trailing_stop_pct'] and holding_yield > 2.0:
# 수익 상태에서만 트레일링 스탑 작동 (2% 이상 수익 확보)
decision = "SELL"
decision_reason = (f"Trailing Stop ({drop_from_peak:.1f}% from peak, "
f"threshold: -{sl_tp['trailing_stop_pct']:.1f}%)")
# D. 분석 기반 매도 (점수가 매도 임계값 이하)
if decision == "HOLD" and total_score <= sell_threshold:
decision = "SELL"
decision_reason = f"Analysis Signal (Score: {total_score:.2f} <= {sell_threshold:.2f})"
# E. 추세 반전 매도 (ADX 강한 하락추세)
if decision == "HOLD" and adx_val >= 30:
plus_di = ma_info.get('adx', 0) # 참고용
mtf_align = ma_info.get('mtf_alignment', '')
if mtf_align == 'STRONG_BEAR' and holding_yield < 0:
decision = "SELL"
decision_reason = f"Strong Bear Trend Reversal (MTF: {mtf_align})"
# --- 매수 판단 ---
if decision == "HOLD":
# 강한 단일 신호 매수 (기준 강화)
strong_signal = False
strong_reason = ""
# [강화] 복합 조건 매수 (단일 지표가 아닌 복합 조건)
if tech_score >= 0.75 and lstm_score >= 0.6 and sentiment_score >= 0.6:
strong_signal = True
strong_reason = "Triple Confirmation (Tech+AI+News)"
elif lstm_score >= 0.80 and ai_confidence >= 0.85 and adx_val >= 25:
strong_signal = True
strong_reason = f"High Confidence AI + Strong Trend (ADX={adx_val:.0f})"
elif investor_score >= 0.10 and tech_score >= 0.60 and total_score >= 0.60:
strong_signal = True
strong_reason = "Institutional Buying + Good Fundamentals"
elif ma_info.get('mtf_alignment') == 'STRONG_BULL' and tech_score >= 0.60:
strong_signal = True
strong_reason = f"Strong Multi-Timeframe Bullish + Tech {tech_score:.2f}"
if strong_signal and total_score >= buy_threshold - 0.05:
# 강한 신호는 임계값 약간 완화 허용
decision = "BUY"
decision_reason = strong_reason
print(f" 🎯 [{strong_reason}] → BUY!")
elif total_score >= buy_threshold:
decision = "BUY"
decision_reason = f"Score {total_score:.2f} >= threshold {buy_threshold:.2f}"
# ===== 9. 포지션 사이징 =====
suggested_qty = 0
if decision == "BUY":
# 기본 자산 1000만원으로 가정 (실제 run_cycle에서 덮어씀)
suggested_qty = calculate_position_size(
total_capital=10000000,
current_price=current_price,
volatility=volatility,
score=total_score,
ai_confidence=ai_confidence
)
if suggested_qty == 0:
decision = "HOLD"
decision_reason = "Position size too small"
print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} "
f"LSTM={lstm_score:.2f} Inv={investor_score:.2f}"
f"Total={total_score:.2f} [{decision}]"
f"{f' ({decision_reason})' if decision_reason else ''}")
return { return {
"ticker": ticker, "ticker": ticker,
@@ -174,23 +356,31 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
"tech": tech_score, "tech": tech_score,
"sentiment": sentiment_score, "sentiment": sentiment_score,
"lstm_score": lstm_score, "lstm_score": lstm_score,
"investor_score": investor_score,
"volatility": volatility, "volatility": volatility,
"volume_ratio": vol_ratio, "volume_ratio": vol_ratio,
"prediction": pred_result, "prediction": pred_result,
"decision": decision, "decision": decision,
"decision_reason": decision_reason,
"current_price": current_price, "current_price": current_price,
"ma_info": ma_info "ma_info": ma_info,
"sl_tp": sl_tp,
"suggested_qty": suggested_qty,
"ai_confidence": ai_confidence,
"ai_reason": ai_reason
} }
except Exception as e: except Exception as e:
print(f"❌ [Worker Error] Failed to analyze {ticker}: {e}") print(f"❌ [Worker Error] Failed to analyze {ticker}: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
# 기본 실패 응답 반환 (프로세스 크래시 방지)
return { return {
"ticker": ticker, "ticker": ticker,
"score": 0.0, "score": 0.0,
"decision": "HOLD", "decision": "HOLD",
"decision_reason": f"Error: {str(e)}",
"current_price": 0, "current_price": 0,
"sl_tp": {'stop_loss_pct': -5.0, 'take_profit_pct': 8.0, 'trailing_stop_pct': 3.0},
"suggested_qty": 0,
"error": str(e) "error": str(e)
} }