512 lines
18 KiB
Python
512 lines
18 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
|
|
class TechnicalAnalyzer:
    """Technical-indicator calculations built on pandas/numpy.

    All methods are static and accept plain sequences of numbers ordered
    oldest -> newest (lists are the expected input; numpy arrays also work).

    Indicators added in v2.0:
    - ATR (Average True Range): volatility-based dynamic stop-loss/take-profit
    - ADX (Average Directional Index): trend *strength* (not direction)
    - OBV (On Balance Volume): volume-based accumulation/distribution
    - Multi-timeframe (MTF): agreement of the 5/20/60-period trends
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _series_if_usable(values, expected_len):
        """Return ``values`` as a Series if present with ``expected_len`` items, else None."""
        if values is None:
            return None
        try:
            if len(values) != expected_len:
                return None
        except TypeError:
            # Not a sized sequence: treat as "not provided".
            return None
        return pd.Series(values)

    @staticmethod
    def _true_range(high, low, close):
        """True Range = max(H - L, |H - prev C|, |L - prev C|), row by row."""
        prev_close = close.shift(1)
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        # max() skips NaN, so the first row (no previous close) is H - L.
        return candidates.max(axis=1)

    # ------------------------------------------------------------------
    # Single indicators
    # ------------------------------------------------------------------
    @staticmethod
    def calculate_rsi(prices, period=14):
        """Return the latest RSI (Relative Strength Index), Wilder smoothing.

        Args:
            prices: closing prices, oldest first.
            period: smoothing period.

        Returns:
            float in [0, 100]. 50.0 (neutral) when there are fewer than
            ``period`` prices or when the series has had no movement at all.
        """
        if len(prices) < period:
            return 50.0

        delta = pd.Series(prices).diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)

        # Wilder's smoothing is an EMA with alpha = 1 / period.
        alpha = 1 / period
        avg_gain = gain.ewm(alpha=alpha, min_periods=period, adjust=False).mean().iloc[-1]
        avg_loss = loss.ewm(alpha=alpha, min_periods=period, adjust=False).mean().iloc[-1]

        if pd.isna(avg_gain) or pd.isna(avg_loss):
            return 50.0
        if avg_gain == 0 and avg_loss == 0:
            # No movement at all: without this guard 0 / eps == 0 would
            # report RSI 0, i.e. "maximally oversold", for a flat series.
            return 50.0

        rs = avg_gain / (avg_loss + 1e-9)
        return float(100 - (100 / (1 + rs)))

    @staticmethod
    def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
        """Return the latest ATR (Average True Range) in price units.

        Args:
            prices: closing prices, oldest first.
            high_prices / low_prices: optional highs/lows. They are used only
                when BOTH are given and have the same length as ``prices``;
                otherwise highs/lows are estimated as close +/- 0.8%
                (an assumed 1.6% daily range).
            period: Wilder smoothing period.

        Returns:
            float: ATR, or 0.0 when there is not enough data.
        """
        n_obs = len(prices)
        if n_obs < period + 1:
            return 0.0

        close = pd.Series(prices)
        high = TechnicalAnalyzer._series_if_usable(high_prices, n_obs)
        low = TechnicalAnalyzer._series_if_usable(low_prices, n_obs)
        if high is None or low is None:
            high = close * 1.008
            low = close * 0.992

        tr = TechnicalAnalyzer._true_range(high, low, close)
        atr = tr.ewm(alpha=1 / period, min_periods=period, adjust=False).mean().iloc[-1]
        return 0.0 if pd.isna(atr) else float(atr)

    @staticmethod
    def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
        """Return ``(adx, plus_di, minus_di)`` for the latest bar.

        Reading the result:
        - ADX > 25: strong trend, ADX < 20: sideways / no trend
        - +DI > -DI: up-trend, -DI > +DI: down-trend

        Args:
            prices: closing prices, oldest first.
            high_prices / low_prices: optional highs/lows, used only when both
                are given with the same length as ``prices``; otherwise they
                are estimated from the recent average absolute close change.
            period: Wilder smoothing period.

        Returns:
            tuple of floats. ``(20.0, 50.0, 50.0)`` (neutral) when fewer than
            ``period * 2`` prices are available.
        """
        n_obs = len(prices)
        if n_obs < period * 2:
            return 20.0, 50.0, 50.0

        close = pd.Series(prices)
        high = TechnicalAnalyzer._series_if_usable(high_prices, n_obs)
        low = TechnicalAnalyzer._series_if_usable(low_prices, n_obs)
        if high is None or low is None:
            # Close-based estimate: range = 5-bar mean |pct change| (1% until
            # enough bars exist), centred on the close.
            daily_range = close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
            high = close + daily_range * 0.5
            low = close - daily_range * 0.5

        # Directional movement. Both filters must compare the RAW moves;
        # filtering -DM against the already-zeroed +DM wrongly keeps -DM on
        # ties (up_move == down_move > 0), where both must be zero.
        up_move = high.diff()
        down_move = -low.diff()
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

        alpha = 1 / period
        tr = TechnicalAnalyzer._true_range(high, low, close)
        atr = tr.ewm(alpha=alpha, min_periods=period, adjust=False).mean()

        plus_di = 100 * plus_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-9)
        minus_di = 100 * minus_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-9)

        # DX -> ADX
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
        adx = dx.ewm(alpha=alpha, min_periods=period, adjust=False).mean()

        def _last_or(series, fallback):
            value = series.iloc[-1]
            return fallback if pd.isna(value) else float(value)

        return _last_or(adx, 20.0), _last_or(plus_di, 50.0), _last_or(minus_di, 50.0)

    @staticmethod
    def calculate_obv(prices, volume_history):
        """Classify the OBV (On Balance Volume) trend and price/OBV divergence.

        Args:
            prices: closing prices, oldest first.
            volume_history: volumes, oldest first. If the two inputs differ in
                length they are aligned on their most recent samples.

        Returns:
            dict: {
                'obv_trend': 'ACCUMULATING' | 'DISTRIBUTING' | 'NEUTRAL',
                'obv_divergence': bool (price and OBV moved in opposite
                    directions over the last 10 samples),
                'score': float bonus in [-0.15, 0.15]
            }
            The neutral dict is returned when either input has fewer than 20
            samples.
        """
        neutral = {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
        if volume_history is None or len(volume_history) < 20 or len(prices) < 20:
            return neutral

        # Align on the newest samples so a length mismatch cannot inject NaN
        # into the tail of the OBV series.
        n_obs = min(len(prices), len(volume_history))
        close = pd.Series(prices).iloc[-n_obs:].reset_index(drop=True)
        volume = pd.Series(volume_history).iloc[-n_obs:].reset_index(drop=True)

        # +volume on up days, -volume on down days, 0 when unchanged.
        direction = np.sign(close.diff()).fillna(0)
        obv = (direction * volume).cumsum()

        obv_current = obv.iloc[-1]
        obv_ma_current = obv.rolling(20).mean().iloc[-1]
        if pd.isna(obv_ma_current) or pd.isna(obv_current):
            return neutral

        # Trend: OBV more than 5% away from its 20-sample average. The band
        # uses |MA| because OBV is often negative and "ma * 1.05" would then
        # move the threshold in the wrong direction.
        band = abs(obv_ma_current) * 0.05
        obv_trend = 'NEUTRAL'
        score = 0.0
        if obv_current > obv_ma_current + band:
            obv_trend = 'ACCUMULATING'
            score = 0.1
        elif obv_current < obv_ma_current - band:
            obv_trend = 'DISTRIBUTING'
            score = -0.1

        # Divergence over the last 10 samples.
        price_up = bool(close.iloc[-1] > close.iloc[-10])
        obv_up = bool(obv.iloc[-1] > obv.iloc[-10])

        divergence = False
        if price_up and not obv_up:
            divergence = True  # bearish divergence: price up, OBV not up
            score -= 0.05
        elif not price_up and obv_up:
            divergence = True  # bullish divergence: price not up, OBV up
            score += 0.05

        return {
            'obv_trend': obv_trend,
            'obv_divergence': divergence,
            'score': round(score, 3)
        }

    @staticmethod
    def get_multi_timeframe_trend(prices):
        """Check whether the 5/20/60-period trends agree.

        Three comparisons are made (price vs MA5, MA5 vs MA20, MA20 vs MA60);
        each contributes +1 (above), -1 (below) or 0 (exactly equal).

        Returns:
            dict: {
                'alignment': 'STRONG_BULL' | 'BULL' | 'NEUTRAL' | 'BEAR' | 'STRONG_BEAR',
                'score': one of 0.15, 0.05, 0.0, -0.05, -0.15,
                'details': {'ma5', 'ma20', 'ma60', 'price_vs_ma5'}
            }
            Neutral with empty details when fewer than 60 prices are given.
        """
        if len(prices) < 60:
            return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}

        tail = pd.Series(prices)
        current = tail.iloc[-1]
        ma5 = float(tail.iloc[-5:].mean())
        ma20 = float(tail.iloc[-20:].mean())
        ma60 = float(tail.iloc[-60:].mean())

        def _direction(a, b):
            # Ties are neutral; counting them as bearish made a perfectly
            # flat series report STRONG_BEAR.
            return int(a > b) - int(a < b)

        total = _direction(current, ma5) + _direction(ma5, ma20) + _direction(ma20, ma60)

        if total == 3:
            alignment, score = 'STRONG_BULL', 0.15
        elif total >= 1:
            alignment, score = 'BULL', 0.05
        elif total == -3:
            alignment, score = 'STRONG_BEAR', -0.15
        elif total <= -1:
            alignment, score = 'BEAR', -0.05
        else:
            alignment, score = 'NEUTRAL', 0.0

        return {
            'alignment': alignment,
            'score': score,
            'details': {
                'ma5': round(ma5, 1),
                'ma20': round(ma20, 1),
                'ma60': round(ma60, 1),
                'price_vs_ma5': 'above' if current > ma5 else 'below'
            }
        }

    @staticmethod
    def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None, atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
        """Derive volatility-adaptive stop-loss / take-profit levels from ATR.

        Volatile instruments get a wider stop (fewer whipsaws); quiet ones get
        a tighter stop.

        Returns:
            dict: {
                'atr': ATR in price units,
                'atr_pct': ATR as % of the latest price,
                'stop_loss_pct': ATR% x ``atr_multiplier_sl``, clamped to [-10, -3],
                'take_profit_pct': ATR% x ``atr_multiplier_tp``, clamped to [5, 25],
                'trailing_stop_pct': ATR% x 1.5, clamped to [2, 8]
            }
            Fixed defaults (-5 / +8 / 3) are returned when fewer than 15
            prices are given or the latest price is not positive.
        """
        def _defaults():
            return {
                'atr': 0, 'atr_pct': 0,
                'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
                'trailing_stop_pct': 3.0
            }

        if len(prices) < 15:
            return _defaults()

        atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
        current_price = prices[-1]
        if current_price <= 0:
            return _defaults()

        atr_pct = (atr / current_price) * 100

        sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
        tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
        # Trailing stop is measured from the highest price reached.
        trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))

        return {
            'atr': round(atr, 1),
            'atr_pct': round(atr_pct, 2),
            'stop_loss_pct': round(sl_pct, 2),
            'take_profit_pct': round(tp_pct, 2),
            'trailing_stop_pct': round(trailing_pct, 2)
        }

    @staticmethod
    def calculate_ma(prices, period=20):
        """Return the simple moving average of the last ``period`` prices.

        With fewer than ``period`` prices the latest price is returned
        (0 for an empty input).
        """
        n_obs = len(prices)
        if n_obs < period:
            return prices[-1] if n_obs else 0
        return float(pd.Series(prices).iloc[-period:].mean())

    @staticmethod
    def calculate_macd(prices, fast=12, slow=26, signal=9):
        """Return the latest ``(macd, signal_line, histogram)``.

        ``(0, 0, 0)`` is returned when fewer than ``slow + signal`` prices
        are available.
        """
        if len(prices) < slow + signal:
            return 0, 0, 0

        series = pd.Series(prices)
        macd = series.ewm(span=fast, adjust=False).mean() - series.ewm(span=slow, adjust=False).mean()
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        histogram = macd - signal_line

        return float(macd.iloc[-1]), float(signal_line.iloc[-1]), float(histogram.iloc[-1])

    @staticmethod
    def calculate_bollinger_bands(prices, period=20, num_std=2):
        """Return the latest Bollinger Bands as ``(upper, middle, lower)``.

        Uses the sample standard deviation of the last ``period`` prices.
        ``(0, 0, 0)`` is returned when fewer than ``period`` prices exist.
        """
        if len(prices) < period:
            return 0, 0, 0

        window = pd.Series(prices).iloc[-period:]
        middle = float(window.mean())
        spread = float(window.std()) * num_std

        return middle + spread, middle, middle - spread

    @staticmethod
    def calculate_stochastic(prices, high_prices=None, low_prices=None, n=14, k=3, d=3):
        """Return the latest slow stochastic ``(slow %K, slow %D)``.

        Highs/lows are used when given with the same length as ``prices``;
        a missing or mismatched series falls back to the closes (less
        accurate). Any value that cannot be computed yet (fewer than
        ``n + k + d - 2`` prices) is reported as the neutral 50.
        """
        n_obs = len(prices)
        if n_obs < n:
            return 50, 50

        close = pd.Series(prices)
        high = TechnicalAnalyzer._series_if_usable(high_prices, n_obs)
        low = TechnicalAnalyzer._series_if_usable(low_prices, n_obs)
        if high is None:
            high = close
        if low is None:
            low = close

        highest_high = high.rolling(window=n).max()
        lowest_low = low.rolling(window=n).min()

        fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100
        slow_k = fast_k.rolling(window=k).mean()  # slow %K == fast %D
        slow_d = slow_k.rolling(window=d).mean()

        k_last = slow_k.iloc[-1]
        d_last = slow_d.iloc[-1]
        # NaN here would poison every comparison made by callers.
        return (
            50.0 if pd.isna(k_last) else float(k_last),
            50.0 if pd.isna(d_last) else float(d_last),
        )

    # ------------------------------------------------------------------
    # Composite score
    # ------------------------------------------------------------------
    @staticmethod
    def get_technical_score(current_price, prices_history, volume_history=None):
        """Combine the indicators into one score in [0.0, 1.0] (v2.0).

        Higher means a more attractive buy set-up. Weights:
        - RSI 25%, MA20 disparity 15%, MACD 15%, Bollinger 10%,
          Stochastic 10%, ADX trend strength 15%, multi-timeframe 10%
        - bonuses: volume spike (+0.08), OBV (up to +/-0.15),
          full MTF alignment (+/-0.05)

        Returns:
            tuple: ``(score, rsi, volatility_pct, volume_ratio, ma_info)``.
            With fewer than 30 prices a neutral tuple with a reduced
            ``ma_info`` dict is returned.
        """
        if not prices_history or len(prices_history) < 30:
            return 0.5, 50.0, 0.0, 1.0, {"ma20": 0, "ma114": 0, "trend": "Unknown", "position": "Unknown"}

        scores = []

        # 1. RSI (25%): oversold is good, overbought is bad.
        rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
        if rsi <= 30:
            rsi_score = 1.0
        elif rsi >= 70:
            rsi_score = 0.0
        else:
            rsi_score = 1.0 - ((rsi - 30) / 40.0)
        scores.append(rsi_score * 0.25)

        # 2. Disparity from MA20 (15%).
        ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
        disparity = (current_price - ma20) / (ma20 + 1e-9)
        if disparity < -0.05:
            disp_score = 1.0
        elif disparity > 0.05:
            disp_score = 0.0
        else:
            disp_score = 0.5 - (disparity * 10)
        scores.append(disp_score * 0.15)

        # 3. MACD (15%).
        macd, _signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
        if hist > 0:
            # Positive histogram below zero line = early golden cross.
            macd_score = 0.8 if macd > 0 else 0.65
        elif hist < 0:
            # Negative histogram above zero line = early dead cross.
            macd_score = 0.35 if macd > 0 else 0.2
        else:
            # Exactly zero means "no data yet" (or a flat series): neutral,
            # not bearish.
            macd_score = 0.5
        scores.append(macd_score * 0.15)

        # 4. Bollinger Bands (10%): position inside the band, lower = better.
        upper, _mid, lower = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
        if current_price <= lower:
            bb_score = 1.0
        elif current_price >= upper:
            bb_score = 0.0
        else:
            bb_score = 1.0 - (current_price - lower) / (upper - lower + 1e-9)
        scores.append(bb_score * 0.10)

        # 5. Stochastic (10%). The score already saturates at 1.0 / 0.0 in
        # the oversold / overbought zones, so no cross adjustment applies.
        slow_k, _slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
        if slow_k < 20:
            st_score = 1.0
        elif slow_k > 80:
            st_score = 0.0
        else:
            st_score = 1.0 - (slow_k / 100.0)
        scores.append(st_score * 0.10)

        # 6. ADX trend strength (15%): only a strong trend moves the score.
        adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
        if adx >= 25:
            strength = min(0.2, (adx - 25) / 50)
            adx_score = 0.8 + strength if plus_di > minus_di else 0.2 - strength
        else:
            adx_score = 0.5
        adx_score = max(0.0, min(1.0, adx_score))
        scores.append(adx_score * 0.15)

        # 7. Multi-timeframe (10%): -0.15..+0.15 mapped to 0.35..0.65.
        mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
        mtf_score = max(0.0, min(1.0, 0.5 + mtf['score']))
        scores.append(mtf_score * 0.10)

        total_score = sum(scores)

        # Bonus: volume analysis (spike detection + OBV).
        volume_ratio = 1.0
        if volume_history is not None and len(volume_history) >= 5:
            # Average of the 5 samples BEFORE the latest one; NaN (and thus
            # no ratio) when only 5 samples exist.
            avg_vol = pd.Series(volume_history).rolling(window=5).mean().iloc[-2]
            current_vol = volume_history[-1]
            if avg_vol > 0:
                volume_ratio = current_vol / avg_vol

            if volume_ratio >= 3.0:
                total_score += 0.08

            obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
            total_score += obv_result['score']

        # Bonus on top of the 10% weight when all three timeframes agree.
        if mtf['alignment'] == 'STRONG_BULL':
            total_score += 0.05
        elif mtf['alignment'] == 'STRONG_BEAR':
            total_score -= 0.05

        total_score = max(0.0, min(1.0, total_score))

        # Volatility: std-dev of simple returns, in percent. Returns whose
        # base price is zero are skipped to avoid division by zero.
        prices_np = np.asarray(prices_history, dtype=float)
        base = prices_np[:-1]
        usable = base != 0
        changes = np.diff(prices_np)[usable] / base[usable]
        volatility = float(np.std(changes) * 100) if changes.size else 0.0

        # Moving-average structure (5 / 20 / 60 / 114).
        ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
        ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
        ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)

        if ma5 > ma20 > ma60:
            ma_trend = "Bullish (Golden Alignment)"
        elif ma5 < ma20 < ma60:
            ma_trend = "Bearish (Dead Alignment)"
        elif ma20 > ma114:
            ma_trend = "Moderate Bullish"
        else:
            ma_trend = "Moderate Bearish"

        price_pos = "Above MA20" if current_price > ma20 else "Below MA20"

        ma_info = {
            "ma5": round(ma5, 1),
            "ma20": round(ma20, 1),
            "ma60": round(ma60, 1),
            "ma114": round(ma114, 1),
            "trend": ma_trend,
            "position": price_pos,
            "adx": round(adx, 1),
            "adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
            "mtf_alignment": mtf['alignment']
        }

        return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info