Files
ai-trade/signal_v1/modules/analysis/technical.py
gahusb 7ea1a21487 refactor: web-ai V1 assets → signal_v1/ (graduation prep)
Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:00:11 +09:00

512 lines
18 KiB
Python

import pandas as pd
import numpy as np
class TechnicalAnalyzer:
    """Technical-indicator calculations built on pandas / NumPy.

    Every method is a ``@staticmethod`` that takes plain price (and optionally
    high / low / volume) sequences ordered oldest -> newest and returns scalars
    or small dicts.

    v2.0 additions:
    - ATR (Average True Range): volatility-based dynamic stop-loss / take-profit
    - ADX (Average Directional Index): trend *strength* (not direction)
    - OBV (On Balance Volume): volume-based accumulation / distribution
    - Multi-timeframe (MTF): agreement of the 5 / 20 / 60-bar trends

    NOTE(review): the original notes also listed a VWAP approximation, but no
    VWAP method is defined among the methods of this class.
    """

    @staticmethod
    def _has_high_low(prices, high_prices, low_prices):
        """Return True when both high and low series can be used with ``prices``.

        Both must be supplied and have the same length as ``prices``; otherwise
        callers fall back to a close-based estimate.
        """
        if high_prices is None or low_prices is None:
            return False
        return len(high_prices) == len(low_prices) == len(prices)

    @staticmethod
    def calculate_rsi(prices, period=14):
        """Return the latest RSI (Relative Strength Index) using Wilder smoothing.

        Args:
            prices: closing prices, oldest first.
            period: smoothing length.

        Returns:
            RSI in the 0-100 range, or the neutral value 50.0 when fewer than
            ``period`` prices are available or the result is NaN.

        NOTE(review): for a perfectly flat series both averages are zero, so the
        formula below evaluates to 0 (not 50).
        """
        if len(prices) < period:
            return 50.0

        delta = pd.Series(prices).diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)

        # Wilder's smoothing is an EMA with alpha = 1 / period.
        avg_gain = gain.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
        avg_loss = loss.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

        # The epsilon avoids division by zero when there were no losses.
        rs = avg_gain / (avg_loss + 1e-9)
        rsi = 100 - (100 / (1 + rs))

        latest = rsi.iloc[-1]
        return 50.0 if pd.isna(latest) else latest

    @staticmethod
    def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
        """Return the latest ATR (Average True Range), in price units.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs, same length as ``prices``.
            low_prices: optional lows, same length as ``prices``.
            period: Wilder smoothing length.

        Returns:
            The ATR value, or 0.0 when fewer than ``period + 1`` prices are
            available or the result is NaN.
        """
        if len(prices) < period + 1:
            return 0.0

        close = pd.Series(prices)
        if TechnicalAnalyzer._has_high_low(prices, high_prices, low_prices):
            high = pd.Series(high_prices)
            low = pd.Series(low_prices)
        else:
            # No usable high/low data: assume a +/-0.8% band around the close
            # (i.e. a 1.6% daily range).
            high = close * 1.008
            low = close * 0.992

        # True Range = max(H - L, |H - C_prev|, |L - C_prev|)
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)

        # Wilder's smoothing
        atr = true_range.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
        latest = atr.iloc[-1]
        return 0.0 if pd.isna(latest) else latest

    @staticmethod
    def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
        """Return the latest ADX with its directional indicators.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs, same length as ``prices``.
            low_prices: optional lows, same length as ``prices``.
            period: Wilder smoothing length.

        Returns:
            tuple ``(adx, plus_di, minus_di)``.
            - ADX > 25: strong trend, ADX < 20: sideways / no trend
            - +DI > -DI: up-trend, -DI > +DI: down-trend
            With fewer than ``2 * period`` prices the neutral triple
            ``(20.0, 50.0, 50.0)`` is returned; NaN components fall back to the
            same neutral values.
        """
        if len(prices) < period * 2:
            return 20.0, 50.0, 50.0  # neutral

        close = pd.Series(prices)
        if TechnicalAnalyzer._has_high_low(prices, high_prices, low_prices):
            high = pd.Series(high_prices)
            low = pd.Series(low_prices)
        else:
            # Close-based estimate: the range is the 5-bar mean absolute return
            # (1% until that window is filled) centred on the close.
            daily_range = close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
            high = close + daily_range * 0.5
            low = close - daily_range * 0.5

        # +DM / -DM: keep only the dominant, positive directional move per bar.
        up_move = high.diff()
        down_move = -low.diff()
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0)
        # The comparison uses the already-filtered plus_dm, as in the original.
        minus_dm = down_move.where((down_move > plus_dm) & (down_move > 0), 0)

        # ATR
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr = true_range.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

        # +DI, -DI
        plus_di = 100 * plus_dm.ewm(alpha=1 / period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
        minus_di = 100 * minus_dm.ewm(alpha=1 / period, min_periods=period, adjust=False).mean() / (atr + 1e-9)

        # DX -> ADX
        dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
        adx = dx.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

        adx_last = adx.iloc[-1]
        plus_last = plus_di.iloc[-1]
        minus_last = minus_di.iloc[-1]
        return (
            20.0 if pd.isna(adx_last) else adx_last,
            50.0 if pd.isna(plus_last) else plus_last,
            50.0 if pd.isna(minus_last) else minus_last,
        )

    @staticmethod
    def calculate_obv(prices, volume_history):
        """Classify the OBV (On Balance Volume) trend and detect divergence.

        Args:
            prices: closing prices, oldest first.
            volume_history: volumes for the same bars.

        Returns:
            dict with
            - ``'obv_trend'``: ``'ACCUMULATING'`` | ``'DISTRIBUTING'`` | ``'NEUTRAL'``
            - ``'obv_divergence'``: True when price and OBV moved in opposite
              directions over the last 10 bars (bearish: price up / OBV not up;
              bullish: price not up / OBV up)
            - ``'score'``: bonus in [-0.15, 0.15], rounded to 3 decimals
            A neutral dict is returned when either series has fewer than 20
            entries or the 20-bar OBV average is undefined.
        """
        neutral = {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
        if volume_history is None or len(volume_history) < 20 or len(prices) < 20:
            return neutral

        close = pd.Series(prices)
        volume = pd.Series(volume_history)

        # OBV: cumulative volume signed by the direction of the close change
        # (the first bar has no previous close and contributes 0).
        direction = np.sign(close.diff()).fillna(0)
        obv = (direction * volume).cumsum()

        # OBV trend relative to its 20-bar moving average
        obv_current = obv.iloc[-1]
        obv_ma_current = obv.rolling(20).mean().iloc[-1]
        if pd.isna(obv_ma_current):
            return neutral

        # A +/-5% band around the average. It is expressed with abs() so the
        # band stays on the correct side when the average is negative;
        # multiplying a negative average by 1.05 / 0.95 would invert the test.
        band = abs(obv_ma_current) * 0.05
        obv_trend = 'NEUTRAL'
        score = 0.0
        if obv_current > obv_ma_current + band:
            obv_trend = 'ACCUMULATING'
            score = 0.1
        elif obv_current < obv_ma_current - band:
            obv_trend = 'DISTRIBUTING'
            score = -0.1

        # Divergence over the last 10 bars (both series have >= 20 entries here)
        price_up = close.iloc[-1] > close.iloc[-10]
        obv_up = obv.iloc[-1] > obv.iloc[-10]
        divergence = False
        if price_up and not obv_up:
            divergence = True  # bearish divergence (price up, OBV not up)
            score -= 0.05
        elif not price_up and obv_up:
            divergence = True  # bullish divergence (price not up, OBV up)
            score += 0.05

        return {
            'obv_trend': obv_trend,
            'obv_divergence': divergence,
            'score': round(score, 3)
        }

    @staticmethod
    def get_multi_timeframe_trend(prices):
        """Check whether the 5 / 20 / 60-bar trends agree.

        Three comparisons each vote +1 (bullish) or -1 (bearish):
        price vs MA5, MA5 vs MA20, MA20 vs MA60. Ties vote bearish.

        Returns:
            dict with
            - ``'alignment'``: ``'STRONG_BULL'`` | ``'BULL'`` | ``'NEUTRAL'`` |
              ``'BEAR'`` | ``'STRONG_BEAR'``
            - ``'score'``: one of -0.15, -0.05, 0.0, 0.05, 0.15
            - ``'details'``: rounded moving averages and price position
            With fewer than 60 prices a neutral dict with empty details is
            returned.
        """
        if len(prices) < 60:
            return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}

        series = pd.Series(prices)
        current = series.iloc[-1]
        ma5 = series.rolling(5).mean().iloc[-1]
        ma20 = series.rolling(20).mean().iloc[-1]
        ma60 = series.rolling(60).mean().iloc[-1]

        votes = [
            1 if current > ma5 else -1,
            1 if ma5 > ma20 else -1,
            1 if ma20 > ma60 else -1,
        ]
        total = sum(votes)

        if total == 3:
            alignment, score = 'STRONG_BULL', 0.15
        elif total >= 1:
            alignment, score = 'BULL', 0.05
        elif total == -3:
            alignment, score = 'STRONG_BEAR', -0.15
        elif total <= -1:
            alignment, score = 'BEAR', -0.05
        else:
            # Not reachable with three +/-1 votes (the sum is always odd);
            # kept as a safe default.
            alignment, score = 'NEUTRAL', 0.0

        return {
            'alignment': alignment,
            'score': score,
            'details': {
                'ma5': round(ma5, 1),
                'ma20': round(ma20, 1),
                'ma60': round(ma60, 1),
                'price_vs_ma5': 'above' if current > ma5 else 'below'
            }
        }

    @staticmethod
    def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None, atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
        """Derive stop-loss / take-profit percentages from the ATR.

        Volatile instruments get wider stops (fewer whipsaws); quiet ones get
        tighter stops.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs forwarded to ``calculate_atr``.
            low_prices: optional lows forwarded to ``calculate_atr``.
            atr_multiplier_sl: ATR% multiple used for the stop-loss.
            atr_multiplier_tp: ATR% multiple used for the take-profit.

        Returns:
            dict with
            - ``'atr'``: ATR value (1 decimal)
            - ``'atr_pct'``: ATR as a percentage of the latest price
            - ``'stop_loss_pct'``: clamped to [-10, -3]
            - ``'take_profit_pct'``: clamped to [5, 25]
            - ``'trailing_stop_pct'``: 1.5 x ATR%, clamped to [2, 8]
            Fixed defaults (-5 / 8 / 3) are returned when fewer than 15 prices
            are available or the latest price is not positive.
        """
        def _defaults():
            return {
                'atr': 0, 'atr_pct': 0,
                'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
                'trailing_stop_pct': 3.0
            }

        if len(prices) < 15:
            return _defaults()

        atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
        current_price = prices[-1]
        if current_price <= 0:
            return _defaults()

        atr_pct = (atr / current_price) * 100

        # Stop-loss: ATR% x multiplier, no tighter than -3% and no wider than -10%
        sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
        # Take-profit: ATR% x multiplier, between +5% and +25%
        tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
        # Trailing stop: ATR% x 1.5, between 2% and 8%
        trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))

        return {
            'atr': round(atr, 1),
            'atr_pct': round(atr_pct, 2),
            'stop_loss_pct': round(sl_pct, 2),
            'take_profit_pct': round(tp_pct, 2),
            'trailing_stop_pct': round(trailing_pct, 2)
        }

    @staticmethod
    def calculate_ma(prices, period=20):
        """Return the latest simple moving average over ``period`` bars.

        With fewer than ``period`` prices the latest price is returned instead
        (0 for an empty sequence).
        """
        if len(prices) < period:
            return prices[-1] if len(prices) > 0 else 0
        return pd.Series(prices).rolling(window=period).mean().iloc[-1]

    @staticmethod
    def calculate_macd(prices, fast=12, slow=26, signal=9):
        """Return the latest ``(macd, signal_line, histogram)``.

        ``(0, 0, 0)`` is returned when fewer than ``slow + signal`` prices are
        available.
        """
        if len(prices) < slow + signal:
            return 0, 0, 0  # not enough data

        series = pd.Series(prices)
        ema_fast = series.ewm(span=fast, adjust=False).mean()
        ema_slow = series.ewm(span=slow, adjust=False).mean()
        macd = ema_fast - ema_slow
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        histogram = macd - signal_line
        return macd.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1]

    @staticmethod
    def calculate_bollinger_bands(prices, period=20, num_std=2):
        """Return the latest Bollinger Bands as ``(upper, middle, lower)``.

        ``(0, 0, 0)`` is returned when fewer than ``period`` prices are
        available.
        """
        if len(prices) < period:
            return 0, 0, 0

        series = pd.Series(prices)
        window = series.rolling(window=period)
        sma = window.mean()
        spread = window.std() * num_std
        upper = sma + spread
        lower = sma - spread
        return upper.iloc[-1], sma.iloc[-1], lower.iloc[-1]

    @staticmethod
    def calculate_stochastic(prices, high_prices=None, low_prices=None, n=14, k=3, d=3):
        """Return the latest slow stochastic as ``(slow_k, slow_d)``.

        When high / low data is missing (or its length does not match
        ``prices``) the close is used in its place, which is less accurate.

        Args:
            prices: closing prices, oldest first.
            high_prices: optional highs.
            low_prices: optional lows.
            n: look-back window for the highest high / lowest low.
            k: smoothing window for slow %K.
            d: smoothing window for slow %D.

        Returns:
            ``(slow_k, slow_d)``; the neutral ``(50, 50)`` when there is not
            enough data for both smoothed lines to be defined.
        """
        # slow %D needs n + k + d - 2 bars; with fewer, the rolling means are NaN.
        required = max(n, n + k + d - 2)
        if len(prices) < required:
            return 50, 50

        close = pd.Series(prices)
        if high_prices is not None and len(high_prices) == len(prices):
            high = pd.Series(high_prices)
        else:
            high = close
        if low_prices is not None and len(low_prices) == len(prices):
            low = pd.Series(low_prices)
        else:
            low = close

        # Highest high / lowest low over the last n bars
        highest_high = high.rolling(window=n).max()
        lowest_low = low.rolling(window=n).min()

        # Fast %K (epsilon guards a zero range)
        fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100
        # Slow %K (= Fast %D)
        slow_k = fast_k.rolling(window=k).mean()
        # Slow %D
        slow_d = slow_k.rolling(window=d).mean()

        k_last = slow_k.iloc[-1]
        d_last = slow_d.iloc[-1]
        if pd.isna(k_last) or pd.isna(d_last):
            return 50, 50
        return k_last, d_last

    @staticmethod
    def get_technical_score(current_price, prices_history, volume_history=None):
        """Combine the indicators into one score in [0.0, 1.0].

        Weights (v2.0):
        - RSI: 25%
        - Disparity from MA20: 15%
        - MACD: 15%
        - Bollinger Bands: 10%
        - Stochastic: 10%
        - ADX trend strength: 15%
        - Multi-timeframe alignment: 10%
        - Volume surge / OBV / strong MTF alignment: additive bonuses

        Args:
            current_price: latest price to evaluate.
            prices_history: closing prices, oldest first.
            volume_history: optional volumes for the same bars.

        Returns:
            tuple ``(score, rsi, volatility_pct, volume_ratio, ma_info)``.
            With fewer than 30 prices the neutral tuple
            ``(0.5, 50.0, 0.0, 1.0, ma_info)`` is returned, where ``ma_info``
            carries the same keys as the normal result with placeholder values.
        """
        if prices_history is None or len(prices_history) < 30:
            return 0.5, 50.0, 0.0, 1.0, {
                "ma5": 0,
                "ma20": 0,
                "ma60": 0,
                "ma114": 0,
                "trend": "Unknown",
                "position": "Unknown",
                "adx": 0,
                "adx_trend": "Unknown",
                "mtf_alignment": "NEUTRAL"
            }

        scores = []

        # 1. RSI (25%): <= 30 scores 1.0, >= 70 scores 0.0, linear in between
        rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
        if rsi <= 30:
            rsi_score = 1.0
        elif rsi >= 70:
            rsi_score = 0.0
        else:
            rsi_score = 1.0 - ((rsi - 30) / 40.0)
        scores.append(rsi_score * 0.25)

        # 2. Disparity from MA20 (15%): below the average scores higher
        ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
        disparity = (current_price - ma20) / (ma20 + 1e-9)
        if disparity < -0.05:
            disp_score = 1.0
        elif disparity > 0.05:
            disp_score = 0.0
        else:
            disp_score = 0.5 - (disparity * 10)
        scores.append(disp_score * 0.15)

        # 3. MACD (15%)
        macd, _signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
        if hist > 0 and macd > 0:
            macd_score = 0.8
        elif hist > 0 and macd <= 0:
            macd_score = 0.65  # early golden cross = buying opportunity
        elif hist < 0 and macd > 0:
            macd_score = 0.35  # early dead cross
        else:
            macd_score = 0.2
        scores.append(macd_score * 0.15)

        # 4. Bollinger Bands (10%): position inside the band, lower = better.
        # At or below the lower band the score is already the maximum 1.0.
        upper_band, _mid_band, lower_band = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
        if current_price <= lower_band:
            bb_score = 1.0
        elif current_price >= upper_band:
            bb_score = 0.0
        else:
            bb_score = 1.0 - (current_price - lower_band) / (upper_band - lower_band + 1e-9)
        scores.append(bb_score * 0.10)

        # 5. Stochastic (10%). The oversold (< 20) and overbought (> 80) zones
        # are already pinned to 1.0 / 0.0, so a %K/%D cross adjustment inside
        # those zones cannot change the score and is not applied.
        slow_k, _slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
        if slow_k < 20:
            st_score = 1.0
        elif slow_k > 80:
            st_score = 0.0
        else:
            st_score = 1.0 - (slow_k / 100.0)
        scores.append(st_score * 0.10)

        # 6. ADX trend strength (15%)
        adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
        if adx >= 25:  # strong trend
            strength = min(0.2, (adx - 25) / 50)
            if plus_di > minus_di:
                adx_score = 0.8 + strength  # strong up-trend
            else:
                adx_score = 0.2 - strength  # strong down-trend
        else:  # no trend / sideways
            adx_score = 0.5
        adx_score = max(0.0, min(1.0, adx_score))
        scores.append(adx_score * 0.15)

        # 7. Multi-timeframe alignment (10%): -0.15..+0.15 mapped to 0.35..0.65
        mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
        mtf_score = max(0.0, min(1.0, 0.5 + mtf['score']))
        scores.append(mtf_score * 0.10)

        total_score = sum(scores)

        # [Bonus] Volume analysis: latest volume vs the mean of the five bars
        # before it. Six samples are needed for that mean to exist.
        volume_ratio = 1.0
        if volume_history is not None and len(volume_history) >= 6:
            avg_vol = pd.Series(volume_history).rolling(window=5).mean().iloc[-2]
            current_vol = volume_history[-1]
            if avg_vol > 0:
                volume_ratio = current_vol / avg_vol

        # Volume-surge bonus
        if volume_ratio >= 3.0:
            total_score += 0.08

        # OBV bonus (calculate_obv returns score 0.0 when volume data is short)
        obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
        total_score += obv_result['score']

        # Extra bonus for fully aligned trends, on top of the 10% weight above
        if mtf['alignment'] == 'STRONG_BULL':
            total_score += 0.05
        elif mtf['alignment'] == 'STRONG_BEAR':
            total_score -= 0.05

        # Clip to 0.0 ~ 1.0
        total_score = max(0.0, min(1.0, total_score))

        # Volatility: standard deviation of bar-to-bar returns, in percent
        prices_np = np.array(prices_history)
        changes = np.diff(prices_np) / prices_np[:-1]
        volatility = np.std(changes) * 100

        # Moving-average analysis (5, 20, 60, 114 bars). calculate_ma returns
        # the latest price when the history is shorter than the period.
        ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
        ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
        ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)

        if ma5 > ma20 > ma60:
            ma_trend = "Bullish (Golden Alignment)"
        elif ma5 < ma20 < ma60:
            ma_trend = "Bearish (Dead Alignment)"
        elif ma20 > ma114:
            ma_trend = "Moderate Bullish"
        else:
            ma_trend = "Moderate Bearish"

        price_pos = "Above MA20" if current_price > ma20 else "Below MA20"

        ma_info = {
            "ma5": round(ma5, 1),
            "ma20": round(ma20, 1),
            "ma60": round(ma60, 1),
            "ma114": round(ma114, 1),
            "trend": ma_trend,
            "position": price_pos,
            "adx": round(adx, 1),
            "adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
            "mtf_alignment": mtf['alignment']
        }

        return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info