191 lines
7.8 KiB
Python
191 lines
7.8 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
|
|
class TechnicalAnalyzer:
|
|
"""
|
|
Pandas를 활용한 기술적 지표 계산 모듈
|
|
CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리
|
|
"""
|
|
|
|
@staticmethod
|
|
def calculate_rsi(prices, period=14):
|
|
"""RSI(Relative Strength Index) 계산"""
|
|
if len(prices) < period:
|
|
return 50.0 # 데이터 부족 시 중립
|
|
|
|
delta = pd.Series(prices).diff()
|
|
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
|
|
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
|
|
|
|
rs = gain / loss
|
|
rsi = 100 - (100 / (1 + rs))
|
|
return rsi.iloc[-1]
|
|
|
|
@staticmethod
|
|
def calculate_ma(prices, period=20):
|
|
"""이동평균선(Moving Average) 계산"""
|
|
if len(prices) < period:
|
|
return prices[-1] if prices else 0
|
|
return pd.Series(prices).rolling(window=period).mean().iloc[-1]
|
|
|
|
@staticmethod
|
|
def calculate_macd(prices, fast=12, slow=26, signal=9):
|
|
"""MACD (Moving Average Convergence Divergence) 계산"""
|
|
if len(prices) < slow + signal:
|
|
return 0, 0, 0 # 데이터 부족
|
|
|
|
s = pd.Series(prices)
|
|
ema_fast = s.ewm(span=fast, adjust=False).mean()
|
|
ema_slow = s.ewm(span=slow, adjust=False).mean()
|
|
macd = ema_fast - ema_slow
|
|
signal_line = macd.ewm(span=signal, adjust=False).mean()
|
|
histogram = macd - signal_line
|
|
|
|
return macd.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1]
|
|
|
|
@staticmethod
|
|
def calculate_bollinger_bands(prices, period=20, num_std=2):
|
|
"""Bollinger Bands 계산 (상단, 중단, 하단)"""
|
|
if len(prices) < period:
|
|
return 0, 0, 0
|
|
|
|
s = pd.Series(prices)
|
|
sma = s.rolling(window=period).mean()
|
|
std = s.rolling(window=period).std()
|
|
|
|
upper = sma + (std * num_std)
|
|
lower = sma - (std * num_std)
|
|
|
|
return upper.iloc[-1], sma.iloc[-1], lower.iloc[-1]
|
|
|
|
@staticmethod
|
|
def calculate_stochastic(prices, high_prices=None, low_prices=None, n=14, k=3, d=3):
|
|
"""Stochastic Oscillator (Fast/Slow)
|
|
고가/저가 데이터가 없으면 종가(prices)로 추정 계산
|
|
"""
|
|
if len(prices) < n:
|
|
return 50, 50
|
|
|
|
close = pd.Series(prices)
|
|
# 고가/저가 데이터가 별도로 없으면 종가로 대체 (정확도는 떨어짐)
|
|
high = pd.Series(high_prices) if high_prices else close
|
|
low = pd.Series(low_prices) if low_prices else close
|
|
|
|
# 최근 n일간 최고가/최저가
|
|
highest_high = high.rolling(window=n).max()
|
|
lowest_low = low.rolling(window=n).min()
|
|
|
|
# Fast %K
|
|
fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100
|
|
# Slow %K (= Fast %D)
|
|
slow_k = fast_k.rolling(window=k).mean()
|
|
# Slow %D
|
|
slow_d = slow_k.rolling(window=d).mean()
|
|
|
|
return slow_k.iloc[-1], slow_d.iloc[-1]
|
|
|
|
@staticmethod
|
|
def get_technical_score(current_price, prices_history, volume_history=None):
|
|
"""
|
|
기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (고도화됨)
|
|
- RSI, 이격도, MACD, Bollinger Bands, Stochastic 종합
|
|
- [New] Volume Analysis (Whale Activity)
|
|
"""
|
|
if not prices_history or len(prices_history) < 30:
|
|
return 0.5, 50.0 # 데이터 부족 시 중립
|
|
|
|
scores = []
|
|
|
|
# 1. RSI (비중 30%)
|
|
# 30 이하(과매도) -> 1.0, 70 이상(과매수) -> 0.0
|
|
rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
|
|
if rsi <= 30: rsi_score = 1.0
|
|
elif rsi >= 70: rsi_score = 0.0
|
|
else: rsi_score = 1.0 - ((rsi - 30) / 40.0) # 선형 보간
|
|
scores.append(rsi_score * 0.3)
|
|
|
|
# 2. 이격도 (비중 20%)
|
|
ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
|
|
disparity = (current_price - ma20) / ma20
|
|
# 이격도가 마이너스일수록(저평가) 점수 높음
|
|
if disparity < -0.05: disp_score = 1.0 # -5% 이상 하락
|
|
elif disparity > 0.05: disp_score = 0.0 # +5% 이상 상승
|
|
else: disp_score = 0.5 - (disparity * 10) # -0.05~0.05 사이
|
|
scores.append(disp_score * 0.2)
|
|
|
|
# 3. MACD (비중 20%)
|
|
# MACD가 Signal선 위에 있으면 상승세 (매수)
|
|
macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
|
|
if hist > 0 and macd > 0: macd_score = 0.8 # 상승 추세 가속
|
|
elif hist > 0 and macd <= 0: macd_score = 0.6 # 상승 반전 초기
|
|
elif hist < 0 and macd > 0: macd_score = 0.4 # 하락 반전 초기
|
|
else: macd_score = 0.2 # 하락 추세
|
|
scores.append(macd_score * 0.2)
|
|
|
|
# 4. Bollinger Bands (비중 15%)
|
|
# 하단 밴드 근처 -> 매수(1.0), 상단 밴드 근처 -> 매도(0.0)
|
|
up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
|
|
if current_price <= low: bb_score = 1.0
|
|
bb_score_base = 0.0
|
|
if current_price <= low: bb_score_base = 1.0
|
|
elif current_price >= up: bb_score_base = 0.0
|
|
else:
|
|
# 밴드 내 위치 비율 (Position %B) 유사 계산
|
|
# 하단(0) ~ 상단(1) -> 점수는 1 ~ 0 역순
|
|
pos = (current_price - low) / (up - low + 1e-9)
|
|
bb_score_base = 1.0 - pos
|
|
|
|
# 추가 점수 로직 (기존 tech_score += 0.2를 bb_score에 반영)
|
|
if current_price < low: # 과매도 (저점 매수 기회)
|
|
bb_score = min(1.0, bb_score_base + 0.2) # 최대 1.0
|
|
else:
|
|
bb_score = bb_score_base
|
|
scores.append(bb_score * 0.15)
|
|
|
|
# 5. Stochastic (비중 15%)
|
|
# K가 20 미만 -> 과매도(매수), 80 이상 -> 과매수(매도)
|
|
slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
|
|
st_score_base = 0.0
|
|
if slow_k < 20: st_score_base = 1.0
|
|
elif slow_k > 80: st_score_base = 0.0
|
|
else: st_score_base = 1.0 - (slow_k / 100.0)
|
|
|
|
# 추가 점수 로직 (기존 tech_score += 0.2 / -= 0.1를 st_score에 반영)
|
|
if slow_k < 20: # 과매도
|
|
st_score = min(1.0, st_score_base + 0.2)
|
|
elif slow_k > 80: # 과매수
|
|
st_score = max(0.0, st_score_base - 0.1)
|
|
else:
|
|
st_score = st_score_base
|
|
scores.append(st_score * 0.15)
|
|
|
|
total_score = sum(scores)
|
|
|
|
# [신규] 거래량 폭증 분석 (Whale Tracking)
|
|
volume_ratio = 1.0
|
|
if volume_history and len(volume_history) >= 5:
|
|
vol_s = pd.Series(volume_history)
|
|
avg_vol = vol_s.rolling(window=5).mean().iloc[-2] # 어제까지의 5일 평균
|
|
current_vol = volume_history[-1]
|
|
if avg_vol > 0:
|
|
volume_ratio = current_vol / avg_vol
|
|
|
|
# 평소 거래량의 3배(300%) 이상 터지면 세력 유입 가능성 높음 -> 가산점
|
|
if volume_ratio >= 3.0:
|
|
total_score += 0.1 # 강력한 매수 신호
|
|
|
|
# 0.0 ~ 1.0 클리핑
|
|
total_score = max(0.0, min(1.0, total_score))
|
|
|
|
# [신규] 변동성(Volatility) 계산
|
|
# 최근 20일간 일일 변동폭의 표준편차를 평균 가격으로 나눔
|
|
if len(prices_history) > 1:
|
|
# list 입력 대응
|
|
prices_np = np.array(prices_history)
|
|
changes = np.diff(prices_np) / prices_np[:-1]
|
|
volatility = np.std(changes) * 100 # 퍼센트 단위
|
|
else:
|
|
volatility = 0.0
|
|
|
|
return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1)
|