import pandas as pd
import numpy as np


class TechnicalAnalyzer:
    """Technical-indicator calculations built on pandas.

    All methods are static and take plain price/volume sequences (lists or
    numpy arrays, oldest first, most recent last).

    Indicators provided:
    - RSI, moving average, MACD, Bollinger Bands, Stochastic oscillator
    - ATR (Average True Range): volatility used for dynamic stop-loss /
      take-profit levels
    - ADX (Average Directional Index): trend *strength*, not direction
    - OBV (On Balance Volume): volume-based accumulation / distribution
    - Multi-timeframe (MTF) trend: agreement of the 5/20/60-bar trends
    - ``get_technical_score``: weighted combination of the above
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _optional_series(values, expected_len):
        """Return ``values`` as a Series, or None when it is unusable.

        ``values`` is unusable when it is None or its length differs from
        ``expected_len``. An explicit ``is None`` test is used instead of
        truthiness so that numpy arrays do not raise ``ValueError``.
        """
        if values is None or len(values) != expected_len:
            return None
        return pd.Series(values)

    @staticmethod
    def _last_valid(series, default):
        """Return the last element of ``series`` as float, or ``default`` if NaN."""
        last = series.iloc[-1]
        return default if pd.isna(last) else float(last)

    @staticmethod
    def _wilder(series, period):
        """Wilder smoothing: an EMA with alpha = 1/period, NaN until ``period`` samples."""
        return series.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

    @staticmethod
    def _true_range(high, low, close):
        """True Range = max(H - L, |H - C_prev|, |L - C_prev|) per bar."""
        prev_close = close.shift(1)
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        # max() skips NaN, so the first bar (no previous close) uses H - L.
        return candidates.max(axis=1)

    # ------------------------------------------------------------------
    # Indicators
    # ------------------------------------------------------------------
    @staticmethod
    def calculate_rsi(prices, period=14):
        """Compute the latest RSI using Wilder smoothing.

        Args:
            prices: closing prices, oldest first.
            period: RSI look-back length.

        Returns:
            float: RSI in [0, 100]. Returns the neutral value 50.0 when fewer
            than ``period`` prices are given or when the series had no price
            movement at all (previously a flat series produced 0, which
            downstream scoring treated as "extremely oversold").
        """
        if len(prices) < period:
            return 50.0

        delta = pd.Series(prices).diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)

        avg_gain = TechnicalAnalyzer._wilder(gain, period).iloc[-1]
        avg_loss = TechnicalAnalyzer._wilder(loss, period).iloc[-1]

        if pd.isna(avg_gain) or pd.isna(avg_loss):
            return 50.0
        if avg_gain == 0 and avg_loss == 0:
            # No gains and no losses: RSI is undefined, report neutral.
            return 50.0

        # The epsilon keeps the division finite when there were no losses.
        rs = avg_gain / (avg_loss + 1e-9)
        return float(100 - (100 / (1 + rs)))

    @staticmethod
    def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
        """Compute the latest ATR (Average True Range).

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs, same length as ``prices``.
            low_prices: optional lows, same length as ``prices``.
            period: Wilder smoothing length.

        Returns:
            float: ATR in price units; 0.0 when there are fewer than
            ``period + 1`` prices or the value cannot be computed.

        When highs/lows are missing, or either has a different length than
        ``prices``, both are estimated from the close as close * 1.008 and
        close * 0.992 (an assumed 1.6% daily range).
        """
        if len(prices) < period + 1:
            return 0.0

        close = pd.Series(prices)
        high = TechnicalAnalyzer._optional_series(high_prices, len(close))
        low = TechnicalAnalyzer._optional_series(low_prices, len(close))
        if high is None or low is None:
            high = close * 1.008
            low = close * 0.992

        tr = TechnicalAnalyzer._true_range(high, low, close)
        atr = TechnicalAnalyzer._wilder(tr, period)
        return TechnicalAnalyzer._last_valid(atr, 0.0)

    @staticmethod
    def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
        """Compute ADX (trend strength) together with +DI and -DI.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs, same length as ``prices``.
            low_prices: optional lows, same length as ``prices``.
            period: Wilder smoothing length.

        Returns:
            tuple: ``(adx, plus_di, minus_di)``.
            - ADX > 25: strong trend; ADX < 20: sideways / no trend.
            - +DI > -DI: up-trend; -DI > +DI: down-trend.
            With fewer than ``2 * period`` prices the neutral triple
            ``(20.0, 50.0, 50.0)`` is returned; any component that cannot be
            computed falls back to its neutral value.
        """
        if len(prices) < period * 2:
            return 20.0, 50.0, 50.0  # neutral

        close = pd.Series(prices)
        high = TechnicalAnalyzer._optional_series(high_prices, len(close))
        low = TechnicalAnalyzer._optional_series(low_prices, len(close))
        if high is None or low is None:
            # Estimate a symmetric bar around the close from the average
            # absolute 5-bar return (1% while that average is unavailable).
            daily_range = (
                close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
            )
            high = close + daily_range * 0.5
            low = close - daily_range * 0.5

        # Directional movement. Both conditions must be evaluated on the RAW
        # moves; comparing -DM against the already-masked +DM wrongly kept
        # -DM when the up-move and down-move were equal.
        up_move = high.diff()
        down_move = -low.diff()
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

        atr = TechnicalAnalyzer._wilder(
            TechnicalAnalyzer._true_range(high, low, close), period
        )

        plus_di = 100 * TechnicalAnalyzer._wilder(plus_dm, period) / (atr + 1e-9)
        minus_di = 100 * TechnicalAnalyzer._wilder(minus_dm, period) / (atr + 1e-9)

        # DX -> ADX
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
        adx = TechnicalAnalyzer._wilder(dx, period)

        return (
            TechnicalAnalyzer._last_valid(adx, 20.0),
            TechnicalAnalyzer._last_valid(plus_di, 50.0),
            TechnicalAnalyzer._last_valid(minus_di, 50.0),
        )

    @staticmethod
    def calculate_obv(prices, volume_history):
        """Classify the On-Balance-Volume trend and detect price/OBV divergence.

        Args:
            prices: closing prices, oldest first.
            volume_history: traded volumes aligned with ``prices``.

        Returns:
            dict with keys:
            - ``'obv_trend'``: ``'ACCUMULATING'`` | ``'DISTRIBUTING'`` |
              ``'NEUTRAL'`` (OBV more than 5% above / below its 20-bar mean).
            - ``'obv_divergence'``: True when price and OBV moved in opposite
              directions between the 10th-from-last bar and the last bar.
            - ``'score'``: +/-0.1 for the trend, +/-0.05 for a divergence.
            A neutral result is returned when either series has fewer than
            20 entries or the OBV average is unavailable.
        """
        neutral = {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
        if volume_history is None or len(volume_history) < 20 or len(prices) < 20:
            return neutral

        close = pd.Series(prices)
        volume = pd.Series(volume_history)

        # +1 on an up-close, -1 on a down-close, 0 otherwise (incl. first bar).
        direction = np.sign(close.diff()).fillna(0)
        obv = (direction * volume).cumsum()

        obv_current = obv.iloc[-1]
        obv_ma_current = obv.rolling(20).mean().iloc[-1]
        if pd.isna(obv_ma_current):
            return neutral

        # 5% band around the average. OBV can be negative, in which case
        # ma * 1.05 is the LOWER edge, so order the edges explicitly.
        edges = (obv_ma_current * 0.95, obv_ma_current * 1.05)
        lower, upper = min(edges), max(edges)

        obv_trend = 'NEUTRAL'
        score = 0.0
        if obv_current > upper:
            obv_trend = 'ACCUMULATING'
            score = 0.1
        elif obv_current < lower:
            obv_trend = 'DISTRIBUTING'
            score = -0.1

        # Divergence over the most recent 10 bars.
        price_up = close.iloc[-1] > close.iloc[-10]
        obv_up = obv.iloc[-1] > obv.iloc[-10]

        divergence = False
        if price_up and not obv_up:
            divergence = True  # bearish divergence: price up, OBV not up
            score -= 0.05
        elif not price_up and obv_up:
            divergence = True  # bullish divergence: price not up, OBV up
            score += 0.05

        return {
            'obv_trend': obv_trend,
            'obv_divergence': divergence,
            'score': round(score, 3)
        }

    @staticmethod
    def get_multi_timeframe_trend(prices):
        """Check whether the 5-, 20- and 60-bar trends agree.

        Three comparisons are made (price vs MA5, MA5 vs MA20, MA20 vs MA60);
        each contributes +1 when the faster value is above the slower one and
        -1 otherwise.

        Args:
            prices: closing prices, oldest first.

        Returns:
            dict with keys:
            - ``'alignment'``: ``'STRONG_BULL'`` (all three up), ``'BULL'``,
              ``'BEAR'``, ``'STRONG_BEAR'`` (all three down), or
              ``'NEUTRAL'`` when fewer than 60 prices are available.
            - ``'score'``: +/-0.15 for strong, +/-0.05 otherwise, 0.0 neutral.
            - ``'details'``: rounded MA values and price position vs MA5
              (empty when data is insufficient).
        """
        if len(prices) < 60:
            return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}

        p = pd.Series(prices)
        current = p.iloc[-1]
        ma5 = p.rolling(5).mean().iloc[-1]
        ma20 = p.rolling(20).mean().iloc[-1]
        ma60 = p.rolling(60).mean().iloc[-1]

        comparisons = (current > ma5, ma5 > ma20, ma20 > ma60)
        total = sum(1 if is_up else -1 for is_up in comparisons)

        # Three +/-1 votes can only sum to 3, 1, -1 or -3.
        if total == 3:
            alignment, score = 'STRONG_BULL', 0.15
        elif total >= 1:
            alignment, score = 'BULL', 0.05
        elif total == -3:
            alignment, score = 'STRONG_BEAR', -0.15
        else:
            alignment, score = 'BEAR', -0.05

        return {
            'alignment': alignment,
            'score': score,
            'details': {
                'ma5': round(ma5, 1),
                'ma20': round(ma20, 1),
                'ma60': round(ma60, 1),
                'price_vs_ma5': 'above' if current > ma5 else 'below'
            }
        }

    @staticmethod
    def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None,
                                atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
        """Derive volatility-adaptive stop-loss / take-profit percentages from ATR.

        Volatile instruments get a wider stop (fewer whipsaws); quiet ones get
        a tighter stop.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs forwarded to ``calculate_atr``.
            low_prices: optional lows forwarded to ``calculate_atr``.
            atr_multiplier_sl: ATR% multiple used for the stop-loss.
            atr_multiplier_tp: ATR% multiple used for the take-profit.

        Returns:
            dict with keys ``'atr'``, ``'atr_pct'`` (ATR as % of the last
            price), ``'stop_loss_pct'`` (clamped to [-10, -3]),
            ``'take_profit_pct'`` (clamped to [5, 25]) and
            ``'trailing_stop_pct'`` (1.5 x ATR%, clamped to [2, 8]).
            With fewer than 15 prices or a non-positive last price, fixed
            defaults (-5% / +8% / 3%) are returned with ATR 0.
        """
        defaults = {
            'atr': 0, 'atr_pct': 0,
            'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
            'trailing_stop_pct': 3.0
        }
        if len(prices) < 15:
            return defaults

        atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
        current_price = prices[-1]
        if current_price <= 0:
            return defaults

        atr_pct = (atr / current_price) * 100

        sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
        tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
        trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))

        return {
            'atr': round(atr, 1),
            'atr_pct': round(atr_pct, 2),
            'stop_loss_pct': round(sl_pct, 2),
            'take_profit_pct': round(tp_pct, 2),
            'trailing_stop_pct': round(trailing_pct, 2)
        }

    @staticmethod
    def calculate_ma(prices, period=20):
        """Return the simple moving average of the last ``period`` prices.

        With fewer than ``period`` prices the last price is returned instead
        (0 for an empty sequence). The emptiness test uses ``len`` so numpy
        arrays are accepted.
        """
        if len(prices) < period:
            return prices[-1] if len(prices) > 0 else 0
        return pd.Series(prices).rolling(window=period).mean().iloc[-1]

    @staticmethod
    def calculate_macd(prices, fast=12, slow=26, signal=9):
        """Compute MACD (Moving Average Convergence Divergence).

        Returns:
            tuple: ``(macd, signal_line, histogram)`` for the latest bar, or
            ``(0, 0, 0)`` when fewer than ``slow + signal`` prices are given.
        """
        if len(prices) < slow + signal:
            return 0, 0, 0  # not enough data

        s = pd.Series(prices)
        ema_fast = s.ewm(span=fast, adjust=False).mean()
        ema_slow = s.ewm(span=slow, adjust=False).mean()
        macd = ema_fast - ema_slow
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        histogram = macd - signal_line

        return macd.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1]

    @staticmethod
    def calculate_bollinger_bands(prices, period=20, num_std=2):
        """Compute Bollinger Bands.

        Returns:
            tuple: ``(upper, middle, lower)`` for the latest bar, or
            ``(0, 0, 0)`` when fewer than ``period`` prices are given.
        """
        if len(prices) < period:
            return 0, 0, 0

        s = pd.Series(prices)
        sma = s.rolling(window=period).mean()
        std = s.rolling(window=period).std()
        upper = sma + (std * num_std)
        lower = sma - (std * num_std)

        return upper.iloc[-1], sma.iloc[-1], lower.iloc[-1]

    @staticmethod
    def calculate_stochastic(prices, high_prices=None, low_prices=None,
                             n=14, k=3, d=3):
        """Compute the slow Stochastic oscillator.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs; the close is used when missing or
                when the length differs from ``prices`` (less accurate).
            low_prices: optional lows; same fallback as ``high_prices``.
            n: look-back for the highest high / lowest low.
            k: smoothing window turning fast %K into slow %K.
            d: smoothing window turning slow %K into slow %D.

        Returns:
            tuple: ``(slow_k, slow_d)``. Each value falls back to the neutral
            50 when it cannot be computed, which includes the case where
            there are at least ``n`` prices but not enough for the ``k``/``d``
            smoothing windows (previously NaN was returned there).
        """
        if len(prices) < n:
            return 50, 50

        close = pd.Series(prices)
        high = TechnicalAnalyzer._optional_series(high_prices, len(close))
        low = TechnicalAnalyzer._optional_series(low_prices, len(close))
        if high is None:
            high = close
        if low is None:
            low = close

        highest_high = high.rolling(window=n).max()
        lowest_low = low.rolling(window=n).min()

        # The epsilon avoids division by zero when the n-bar range is flat.
        fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100
        slow_k = fast_k.rolling(window=k).mean()  # slow %K (= fast %D)
        slow_d = slow_k.rolling(window=d).mean()  # slow %D

        return (
            TechnicalAnalyzer._last_valid(slow_k, 50.0),
            TechnicalAnalyzer._last_valid(slow_d, 50.0),
        )

    @staticmethod
    def get_technical_score(current_price, prices_history, volume_history=None):
        """Combine the indicators into a single score in [0.0, 1.0].

        Weights (sum to 1.0): RSI 25%, MA20 disparity 15%, MACD 15%,
        Bollinger 10%, Stochastic 10%, ADX 15%, multi-timeframe trend 10%.
        Bonuses applied afterwards: +0.08 when the latest volume is at least
        3x the prior 5-bar average, the OBV score, and +/-0.05 for a
        STRONG_BULL / STRONG_BEAR multi-timeframe alignment. The result is
        clamped to [0, 1].

        Args:
            current_price: latest price used for the position checks.
            prices_history: closing prices, oldest first.
            volume_history: optional volumes aligned with ``prices_history``.

        Returns:
            tuple: ``(score, rsi, volatility_pct, volume_ratio, ma_info)``.
            With fewer than 30 prices the neutral tuple
            ``(0.5, 50.0, 0.0, 1.0, ma_info)`` is returned, where ``ma_info``
            carries the same keys as the normal result with placeholder
            values.
        """
        if prices_history is None or len(prices_history) < 30:
            return 0.5, 50.0, 0.0, 1.0, {
                "ma5": 0, "ma20": 0, "ma60": 0, "ma114": 0,
                "trend": "Unknown", "position": "Unknown",
                "adx": 20.0, "adx_trend": "Unknown",
                "mtf_alignment": "NEUTRAL",
            }

        scores = []

        # 1. RSI (25%): oversold scores high, overbought scores low.
        rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
        if rsi <= 30:
            rsi_score = 1.0
        elif rsi >= 70:
            rsi_score = 0.0
        else:
            rsi_score = 1.0 - ((rsi - 30) / 40.0)
        scores.append(rsi_score * 0.25)

        # 2. Disparity from MA20 (15%): more than 5% below scores 1.0.
        ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
        disparity = (current_price - ma20) / (ma20 + 1e-9)
        if disparity < -0.05:
            disp_score = 1.0
        elif disparity > 0.05:
            disp_score = 0.0
        else:
            disp_score = 0.5 - (disparity * 10)
        scores.append(disp_score * 0.15)

        # 3. MACD (15%).
        macd, _signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
        if hist > 0:
            # Positive histogram below the zero line = early golden cross.
            macd_score = 0.8 if macd > 0 else 0.65
        elif hist < 0:
            # Negative histogram above the zero line = early dead cross.
            macd_score = 0.35 if macd > 0 else 0.2
        else:
            # A zero histogram covers the (0, 0, 0) "not enough data"
            # sentinel; treat it as neutral rather than bearish.
            macd_score = 0.5
        scores.append(macd_score * 0.15)

        # 4. Bollinger Bands (10%): position inside the band, inverted.
        up, _mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
        if current_price <= low:
            bb_score = 1.0
        elif current_price >= up:
            bb_score = 0.0
        else:
            bb_score = 1.0 - (current_price - low) / (up - low + 1e-9)
        scores.append(bb_score * 0.10)

        # 5. Stochastic (10%). The former golden/dead-cross adjustments only
        # applied where the score was already pinned at 1.0 / 0.0, so they
        # never changed the result and have been dropped.
        slow_k, _slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
        if slow_k < 20:
            st_score = 1.0
        elif slow_k > 80:
            st_score = 0.0
        else:
            st_score = 1.0 - (slow_k / 100.0)
        scores.append(st_score * 0.10)

        # 6. ADX trend strength (15%): only a strong trend moves the score.
        adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
        if adx >= 25:
            strength = min(0.2, (adx - 25) / 50)
            adx_score = 0.8 + strength if plus_di > minus_di else 0.2 - strength
        else:
            adx_score = 0.5
        adx_score = max(0.0, min(1.0, adx_score))
        scores.append(adx_score * 0.15)

        # 7. Multi-timeframe trend (10%): map -0.15..+0.15 onto 0.35..0.65.
        mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
        mtf_score = max(0.0, min(1.0, 0.5 + mtf['score']))
        scores.append(mtf_score * 0.10)

        total_score = sum(scores)

        # Bonus: volume surge relative to the 5 bars before the latest one.
        volume_ratio = 1.0
        if volume_history is not None and len(volume_history) >= 5:
            # iloc[-2] is the 5-bar mean that ends just before the latest
            # bar; it is NaN (ratio stays 1.0) when only 5 volumes exist.
            avg_vol = pd.Series(volume_history).rolling(window=5).mean().iloc[-2]
            if avg_vol > 0:
                volume_ratio = volume_history[-1] / avg_vol
            if volume_ratio >= 3.0:
                total_score += 0.08

        # Bonus: OBV (returns score 0.0 when volume data is missing/short).
        obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
        total_score += obv_result['score']

        # Bonus: fully aligned multi-timeframe trend, on top of its weight.
        if mtf['alignment'] == 'STRONG_BULL':
            total_score += 0.05
        elif mtf['alignment'] == 'STRONG_BEAR':
            total_score -= 0.05

        total_score = max(0.0, min(1.0, total_score))

        # Volatility: standard deviation of bar-to-bar returns, in percent.
        prices_np = np.array(prices_history)
        changes = np.diff(prices_np) / prices_np[:-1]
        volatility = np.std(changes) * 100

        # Moving-average structure (5 / 20 / 60 / 114 bars).
        ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
        ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
        ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)

        if ma5 > ma20 > ma60:
            ma_trend = "Bullish (Golden Alignment)"
        elif ma5 < ma20 < ma60:
            ma_trend = "Bearish (Dead Alignment)"
        elif ma20 > ma114:
            ma_trend = "Moderate Bullish"
        else:
            ma_trend = "Moderate Bearish"

        price_pos = "Above MA20" if current_price > ma20 else "Below MA20"

        ma_info = {
            "ma5": round(ma5, 1),
            "ma20": round(ma20, 1),
            "ma60": round(ma60, 1),
            "ma114": round(ma114, 1),
            "trend": ma_trend,
            "position": price_pos,
            "adx": round(adx, 1),
            "adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
            "mtf_alignment": mtf['alignment']
        }

        return (
            round(total_score, 4),
            round(rsi, 2),
            round(volatility, 2),
            round(volume_ratio, 1),
            ma_info,
        )