import pandas as pd import numpy as np class TechnicalAnalyzer: """ Pandas를 활용한 기술적 지표 계산 모듈 CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리 """ @staticmethod def calculate_rsi(prices, period=14): """RSI(Relative Strength Index) 계산""" if len(prices) < period: return 50.0 # 데이터 부족 시 중립 delta = pd.Series(prices).diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi.iloc[-1] @staticmethod def calculate_ma(prices, period=20): """이동평균선(Moving Average) 계산""" if len(prices) < period: return prices[-1] if prices else 0 return pd.Series(prices).rolling(window=period).mean().iloc[-1] @staticmethod def calculate_macd(prices, fast=12, slow=26, signal=9): """MACD (Moving Average Convergence Divergence) 계산""" if len(prices) < slow + signal: return 0, 0, 0 # 데이터 부족 s = pd.Series(prices) ema_fast = s.ewm(span=fast, adjust=False).mean() ema_slow = s.ewm(span=slow, adjust=False).mean() macd = ema_fast - ema_slow signal_line = macd.ewm(span=signal, adjust=False).mean() histogram = macd - signal_line return macd.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1] @staticmethod def calculate_bollinger_bands(prices, period=20, num_std=2): """Bollinger Bands 계산 (상단, 중단, 하단)""" if len(prices) < period: return 0, 0, 0 s = pd.Series(prices) sma = s.rolling(window=period).mean() std = s.rolling(window=period).std() upper = sma + (std * num_std) lower = sma - (std * num_std) return upper.iloc[-1], sma.iloc[-1], lower.iloc[-1] @staticmethod def calculate_stochastic(prices, high_prices=None, low_prices=None, n=14, k=3, d=3): """Stochastic Oscillator (Fast/Slow) 고가/저가 데이터가 없으면 종가(prices)로 추정 계산 """ if len(prices) < n: return 50, 50 close = pd.Series(prices) # 고가/저가 데이터가 별도로 없으면 종가로 대체 (정확도는 떨어짐) high = pd.Series(high_prices) if high_prices else close low = pd.Series(low_prices) if low_prices else close # 최근 n일간 최고가/최저가 highest_high = high.rolling(window=n).max() lowest_low = low.rolling(window=n).min() # Fast %K fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100 # Slow %K (= Fast %D) slow_k = fast_k.rolling(window=k).mean() # Slow %D slow_d = slow_k.rolling(window=d).mean() return slow_k.iloc[-1], slow_d.iloc[-1] @staticmethod def get_technical_score(current_price, prices_history, volume_history=None): """ 기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (고도화됨) - RSI, 이격도, MACD, Bollinger Bands, Stochastic 종합 - [New] Volume Analysis (Whale Activity) """ if not prices_history or len(prices_history) < 30: return 0.5, 50.0 # 데이터 부족 시 중립 scores = [] # 1. RSI (비중 30%) # 30 이하(과매도) -> 1.0, 70 이상(과매수) -> 0.0 rsi = TechnicalAnalyzer.calculate_rsi(prices_history) if rsi <= 30: rsi_score = 1.0 elif rsi >= 70: rsi_score = 0.0 else: rsi_score = 1.0 - ((rsi - 30) / 40.0) # 선형 보간 scores.append(rsi_score * 0.3) # 2. 이격도 (비중 20%) ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20) disparity = (current_price - ma20) / ma20 # 이격도가 마이너스일수록(저평가) 점수 높음 if disparity < -0.05: disp_score = 1.0 # -5% 이상 하락 elif disparity > 0.05: disp_score = 0.0 # +5% 이상 상승 else: disp_score = 0.5 - (disparity * 10) # -0.05~0.05 사이 scores.append(disp_score * 0.2) # 3. MACD (비중 20%) # MACD가 Signal선 위에 있으면 상승세 (매수) macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history) if hist > 0 and macd > 0: macd_score = 0.8 # 상승 추세 가속 elif hist > 0 and macd <= 0: macd_score = 0.6 # 상승 반전 초기 elif hist < 0 and macd > 0: macd_score = 0.4 # 하락 반전 초기 else: macd_score = 0.2 # 하락 추세 scores.append(macd_score * 0.2) # 4. Bollinger Bands (비중 15%) # 하단 밴드 근처 -> 매수(1.0), 상단 밴드 근처 -> 매도(0.0) up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history) if current_price <= low: bb_score = 1.0 bb_score_base = 0.0 if current_price <= low: bb_score_base = 1.0 elif current_price >= up: bb_score_base = 0.0 else: # 밴드 내 위치 비율 (Position %B) 유사 계산 # 하단(0) ~ 상단(1) -> 점수는 1 ~ 0 역순 pos = (current_price - low) / (up - low + 1e-9) bb_score_base = 1.0 - pos # 추가 점수 로직 (기존 tech_score += 0.2를 bb_score에 반영) if current_price < low: # 과매도 (저점 매수 기회) bb_score = min(1.0, bb_score_base + 0.2) # 최대 1.0 else: bb_score = bb_score_base scores.append(bb_score * 0.15) # 5. Stochastic (비중 15%) # K가 20 미만 -> 과매도(매수), 80 이상 -> 과매수(매도) slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history) st_score_base = 0.0 if slow_k < 20: st_score_base = 1.0 elif slow_k > 80: st_score_base = 0.0 else: st_score_base = 1.0 - (slow_k / 100.0) # 추가 점수 로직 (기존 tech_score += 0.2 / -= 0.1를 st_score에 반영) if slow_k < 20: # 과매도 st_score = min(1.0, st_score_base + 0.2) elif slow_k > 80: # 과매수 st_score = max(0.0, st_score_base - 0.1) else: st_score = st_score_base scores.append(st_score * 0.15) total_score = sum(scores) # [신규] 거래량 폭증 분석 (Whale Tracking) volume_ratio = 1.0 if volume_history and len(volume_history) >= 5: vol_s = pd.Series(volume_history) avg_vol = vol_s.rolling(window=5).mean().iloc[-2] # 어제까지의 5일 평균 current_vol = volume_history[-1] if avg_vol > 0: volume_ratio = current_vol / avg_vol # 평소 거래량의 3배(300%) 이상 터지면 세력 유입 가능성 높음 -> 가산점 if volume_ratio >= 3.0: total_score += 0.1 # 강력한 매수 신호 # 0.0 ~ 1.0 클리핑 total_score = max(0.0, min(1.0, total_score)) # [신규] 변동성(Volatility) 계산 # 최근 20일간 일일 변동폭의 표준편차를 평균 가격으로 나눔 if len(prices_history) > 1: # list 입력 대응 prices_np = np.array(prices_history) changes = np.diff(prices_np) / prices_np[:-1] volatility = np.std(changes) * 100 # 퍼센트 단위 else: volatility = 0.0 return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1)