refactor: web-ai V1 assets → signal_v1/ (graduation prep)

Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-16 03:00:11 +09:00
parent 42b91d03cf
commit 7ea1a21487
39 changed files with 722 additions and 691 deletions

View File

@@ -0,0 +1,445 @@
"""
AI 전문가 회의 시스템 (Multi-Agent Council)
- 4명의 전문가 에이전트가 독립 분석 후 의장 AI가 최종 결정
- 코스피 레짐 기반 모델 교체 권고
- process.py 분석 결과를 입력받아 심층 검토 수행
흐름:
전문가 1~4 (각 역할별 Ollama 호출)
의장 AI (전문가 의견 취합 + 최종 결정 + 모델 건전성 평가)
CouncilDecision (결정 + 모델 교체 권고 + 회의록)
"""
import json
import time
from dataclasses import dataclass, field
from typing import Optional, List, Any
from modules.analysis.market_regime import MarketRegimeDetector, MarketRegime, RegimeAnalysis
@dataclass
class ExpertOpinion:
"""개별 전문가 의견"""
expert_name: str
role: str
decision: str # BUY / SELL / HOLD
confidence: float # 0~1
reasoning: str
key_concern: str
model_feedback: str # 현재 AI 모델 적합성 평가
@dataclass
class CouncilDecision:
"""회의 최종 결정"""
final_decision: str # BUY / SELL / HOLD
consensus_score: float # 0~1 (1 = 만장일치)
confidence: float # 0~1
majority_reasoning: str # 주요 결정 근거
dissenting_views: str # 소수 의견
model_health_score: float # 0~1 (현재 모델 신뢰도)
model_replacement_recommended: bool # 모델 교체 필요 여부
recommended_model: str # 교체 권고 모델명
council_summary: str # 회의 전체 요약
expert_opinions: List[dict] = field(default_factory=list)
# 전문가 페르소나 정의
_EXPERTS = [
{
"name": "기술분석가",
"role": "technical",
"persona": (
"20년 경력의 코스피 전문 기술분석가. "
"RSI, MACD, 볼린저밴드, 추세선, 거래량 분석을 주로 사용. "
"단기 가격 모멘텀과 지지/저항 구간을 중시함."
),
"focus": (
"RSI 과매수/과매도, 볼린저밴드 위치, ADX 추세 강도, "
"거래량 급증, 멀티타임프레임 정렬 여부를 핵심 근거로 사용하세요."
),
},
{
"name": "퀀트전문가",
"role": "quant",
"persona": (
"AI/ML 기반 퀀트 투자 전문가. "
"LSTM 예측 신뢰도, 통계적 유의성, 백테스트 성과를 중시. "
"모델의 현재 시장 환경 적합성을 항상 평가함."
),
"focus": (
"LSTM 신뢰도와 예측 방향을 중심으로 분석하세요. "
"현재 코스피 레짐에서 LSTM v3 모델이 적합한지 반드시 평가하고, "
"더 나은 대안 모델이 있으면 구체적으로 제안하세요."
),
},
{
"name": "리스크관리자",
"role": "risk",
"persona": (
"글로벌 헤지펀드 리스크 관리 전문가. "
"포지션 사이징, 최대 낙폭(MDD), VaR, 손절 기준을 최우선으로 고려. "
"수익보다 손실 방어를 먼저 생각함."
),
"focus": (
"변동성 대비 포지션 크기 적절성, 손절 기준 타당성, "
"현재 보유 중이라면 추가 하락 리스크를 집중 평가하세요."
),
},
{
"name": "거시경제분석가",
"role": "macro",
"persona": (
"글로벌 매크로 및 한국 증시 전문가. "
"코스피 지수 수준, 원/달러 환율, 미국 금리, 외국인 수급을 중시. "
"현재 시장이 역사적으로 어떤 위치인지 판단함."
),
"focus": (
"코스피 지수 현재 수준이 역사적으로 어떤 의미인지, "
"이 가격대에서 매수/보유가 타당한지 거시경제 관점에서 평가하세요."
),
},
]
def _build_expert_prompt(expert: dict, ticker: str, data: dict) -> str:
"""전문가 역할에 맞는 분석 프롬프트 생성"""
kospi = data.get("kospi_price", 2500)
regime_label = MarketRegimeDetector.get_regime_label(kospi)
base = (
f"종목: {ticker} | 현재가: {data.get('current_price', 0):,.0f}\n"
f"코스피: {kospi:.0f} [{regime_label}]\n"
f"시장상태: {data.get('macro_state', 'SAFE')}\n"
f"---기술지표---\n"
f"기술점수: {data.get('tech_score', 0.5):.3f} | "
f"RSI: {data.get('rsi', 50):.1f} | ADX: {data.get('adx', 20):.1f}\n"
f"변동성: {data.get('volatility', 2.0):.2f}% | BB위치: {data.get('bb_zone', '중간')}\n"
f"MTF정렬: {data.get('mtf_alignment', 'N/A')}\n"
f"---AI모델---\n"
f"LSTM예측: {data.get('lstm_predicted', 0):,.0f}"
f"(변화율: {data.get('lstm_change_rate', 0):+.2f}%)\n"
f"LSTM신뢰도: {data.get('ai_confidence', 0.5):.2f} | "
f"LSTM점수: {data.get('lstm_score', 0.5):.3f}\n"
f"---수급/감성---\n"
f"감성점수: {data.get('sentiment_score', 0.5):.3f} | "
f"수급점수: {data.get('investor_score', 0):.3f}\n"
f"외인순매수: {data.get('frgn_net_buy', 0):+,} "
f"({data.get('consecutive_frgn_buy', 0)}일 연속)\n"
f"---포지션---\n"
f"보유중: {data.get('is_holding', False)} | "
f"보유수익률: {data.get('holding_yield', 0):+.2f}%\n"
f"통합점수: {data.get('total_score', 0.5):.3f}\n"
)
role_addition = (
f"\n당신은 {expert['persona']}\n"
f"분석 초점: {expert['focus']}\n"
)
output_format = (
"\n반드시 아래 JSON 형식으로만 응답하세요 (다른 텍스트 금지):\n"
"{\n"
' "decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "confidence": 0.0~1.0,\n'
' "reasoning": "주요 판단 근거 (1~2문장, 한국어)",\n'
' "key_concern": "가장 우려되는 리스크 (1문장, 한국어)",\n'
' "model_feedback": "현재 LSTM v3 모델이 이 시장 환경에서 적합한지 평가 (1문장)"\n'
"}"
)
return base + role_addition + output_format
def _build_chairman_prompt(
ticker: str,
opinions: List[ExpertOpinion],
data: dict,
regime: RegimeAnalysis,
) -> str:
"""의장 AI 최종 결정 프롬프트"""
opinions_text = "\n".join([
f"[{op.expert_name}] {op.decision} (확신도: {op.confidence:.2f})\n"
f" 근거: {op.reasoning}\n"
f" 우려: {op.key_concern}\n"
f" 모델평가: {op.model_feedback}"
for op in opinions
])
votes = [op.decision for op in opinions]
buy_n = votes.count("BUY")
sell_n = votes.count("SELL")
hold_n = votes.count("HOLD")
avg_conf = sum(op.confidence for op in opinions) / max(len(opinions), 1)
return (
"당신은 AI 투자 전문가 회의를 주재하는 의장입니다.\n\n"
f"=== 종목: {ticker} ===\n"
f"현재가: {data.get('current_price', 0):,.0f}원 | "
f"코스피: {data.get('kospi_price', 2500):.0f}\n"
f"시장 레짐: {regime.regime.value} ({regime.description})\n"
f"레짐 권고: {regime.model_recommendation}\n\n"
f"=== 전문가 의견 ===\n{opinions_text}\n\n"
f"=== 투표: 매수 {buy_n} / 매도 {sell_n} / 보유 {hold_n} "
f"(평균 확신도: {avg_conf:.2f}) ===\n\n"
"당신의 임무:\n"
"1. 4명 의견을 종합하여 최종 매매 결정\n"
f"2. LSTM v3 모델이 코스피 {data.get('kospi_price', 2500):.0f} 레짐에서 적합한지 평가\n"
"3. 필요 시 대안 모델 구체적으로 권고\n\n"
"반드시 아래 JSON 형식으로만 응답하세요:\n"
"{\n"
' "final_decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "consensus_score": 0.0~1.0,\n'
' "confidence": 0.0~1.0,\n'
' "majority_reasoning": "최종 결정 근거 2~3문장 (한국어)",\n'
' "dissenting_views": "소수 의견 요약 (없으면 빈 문자열)",\n'
' "model_health_score": 0.0~1.0,\n'
' "model_replacement_recommended": true 또는 false,\n'
' "recommended_model": "교체 권고 모델명 (없으면 \'현재 모델 유지\')",\n'
' "council_summary": "회의 전체 요약 3~4문장 (한국어)"\n'
"}"
)
def _parse_json_response(raw: Optional[str]) -> Optional[dict]:
"""LLM 응답에서 JSON 추출 (폴백 포함)"""
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
import re
match = re.search(r'\{[\s\S]*\}', raw)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return None
def _vote_fallback(opinions: List[ExpertOpinion]) -> CouncilDecision:
"""의장 AI 실패 시 단순 다수결 폴백"""
from collections import Counter
if not opinions:
return CouncilDecision(
final_decision="HOLD", consensus_score=0.5, confidence=0.5,
majority_reasoning="분석 데이터 부족", dissenting_views="",
model_health_score=0.5, model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="전문가 의견 수집 실패로 HOLD 처리",
)
votes = [op.decision for op in opinions]
final = Counter(votes).most_common(1)[0][0]
avg_conf = sum(op.confidence for op in opinions) / len(opinions)
vote_counts = Counter(votes)
consensus = vote_counts[final] / len(votes)
return CouncilDecision(
final_decision=final,
consensus_score=round(consensus, 3),
confidence=round(avg_conf, 3),
majority_reasoning=f"전문가 {vote_counts[final]}/{len(votes)} 다수결 결과",
dissenting_views="",
model_health_score=0.5,
model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="의장 AI 오류 - 전문가 투표로 대체",
expert_opinions=[
{"name": op.expert_name, "decision": op.decision,
"confidence": op.confidence, "reasoning": op.reasoning}
for op in opinions
],
)
class AICouncil:
"""
AI 전문가 회의 시스템
사용 방법:
council = AICouncil(llm_client)
decision = council.convene(ticker, analysis_data, regime_analysis)
fast_mode=True 시 전문가 생략, 의장 AI 단독 판단 (속도 약 4배 향상)
llm_client: GeminiLLMClient 또는 OllamaManager (request_inference 인터페이스 공용)
"""
def __init__(self, llm_client: Any = None):
self._ollama = llm_client # 내부 변수명 유지 (하위호환)
def _get_ollama(self) -> Any:
if self._ollama is None:
from modules.services.llm_client import get_llm_client
self._ollama = get_llm_client()
return self._ollama
def _ask_expert(self, expert: dict, ticker: str, data: dict) -> ExpertOpinion:
"""단일 전문가 의견 수집"""
prompt = _build_expert_prompt(expert, ticker, data)
raw = self._get_ollama().request_inference(prompt)
parsed = _parse_json_response(raw)
if parsed:
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision=str(parsed.get("decision", "HOLD")).upper(),
confidence=float(parsed.get("confidence", 0.5)),
reasoning=str(parsed.get("reasoning", "")),
key_concern=str(parsed.get("key_concern", "")),
model_feedback=str(parsed.get("model_feedback", "")),
)
# 파싱 실패 → 중립
print(f"[Council] {expert['name']} 응답 파싱 실패 → HOLD 처리")
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision="HOLD",
confidence=0.5,
reasoning="응답 파싱 실패",
key_concern="",
model_feedback="",
)
def convene(
self,
ticker: str,
analysis_data: dict,
regime_analysis: Optional[RegimeAnalysis] = None,
fast_mode: bool = True,
) -> CouncilDecision:
"""
전문가 회의 소집 및 최종 결정
Args:
ticker: 종목 코드
analysis_data: process.py 분석 결과 딕셔너리
regime_analysis: MarketRegimeDetector.detect() 결과
fast_mode: True=의장 AI 단독(빠름), False=전문가 4명+의장(심층)
Returns:
CouncilDecision
"""
# 레짐 기본값
if regime_analysis is None:
kospi = analysis_data.get("kospi_price", 2500)
regime_analysis = MarketRegimeDetector.detect(kospi)
expert_opinions: List[ExpertOpinion] = []
if not fast_mode:
print(f"[Council] {ticker} - 전문가 회의 시작 (4명)")
for expert in _EXPERTS:
print(f"[Council] {expert['name']} 분석 중...")
opinion = self._ask_expert(expert, ticker, analysis_data)
expert_opinions.append(opinion)
time.sleep(0.3) # Ollama 연속 요청 간격
else:
print(f"[Council] {ticker} - Fast mode (의장 단독)")
# 의장 AI 취합
chairman_prompt = _build_chairman_prompt(
ticker, expert_opinions, analysis_data, regime_analysis
)
raw_chairman = self._get_ollama().request_inference(chairman_prompt)
parsed_chairman = _parse_json_response(raw_chairman)
if parsed_chairman:
decision = CouncilDecision(
final_decision=str(parsed_chairman.get("final_decision", "HOLD")).upper(),
consensus_score=float(parsed_chairman.get("consensus_score", 0.5)),
confidence=float(parsed_chairman.get("confidence", 0.5)),
majority_reasoning=str(parsed_chairman.get("majority_reasoning", "")),
dissenting_views=str(parsed_chairman.get("dissenting_views", "")),
model_health_score=float(parsed_chairman.get("model_health_score", 0.7)),
model_replacement_recommended=bool(
parsed_chairman.get("model_replacement_recommended", False)
),
recommended_model=str(
parsed_chairman.get("recommended_model", "현재 모델 유지")
),
council_summary=str(parsed_chairman.get("council_summary", "")),
expert_opinions=[
{
"name": op.expert_name,
"decision": op.decision,
"confidence": op.confidence,
"reasoning": op.reasoning,
}
for op in expert_opinions
],
)
status_icon = "⚠️" if decision.model_replacement_recommended else ""
print(
f"[Council] {ticker}{decision.final_decision} "
f"(합의율: {decision.consensus_score:.0%}, "
f"모델건전성: {decision.model_health_score:.0%}) "
f"{status_icon}"
)
if decision.model_replacement_recommended:
print(f"[Council] 모델 교체 권고: {decision.recommended_model}")
return decision
# 의장 실패 → 투표 폴백
print(f"[Council] {ticker} - 의장 AI 실패, 투표 폴백 사용")
return _vote_fallback(expert_opinions)
def quick_validate(
self,
ticker: str,
kospi_price: float,
ai_confidence: float,
backtest_sharpe: Optional[float] = None,
) -> dict:
"""
LLM 호출 없이 규칙 기반 빠른 모델 검증
Returns:
{
"regime": str,
"model_ok": bool,
"score": float,
"recommendation": str,
"should_replace": bool,
}
"""
regime_analysis = MarketRegimeDetector.detect(kospi_price)
validation = MarketRegimeDetector.validate_model_for_regime(
regime_analysis.regime,
backtest_sharpe=backtest_sharpe,
)
# AI 신뢰도 하락 시 추가 감점
score = validation["confidence_score"]
if ai_confidence < 0.4:
score *= 0.8
return {
"regime": regime_analysis.regime.value,
"regime_description": regime_analysis.description,
"model_ok": score >= 0.5 and not validation["should_replace"],
"score": round(score, 3),
"recommendation": validation["recommendation"],
"should_replace": validation["should_replace"],
"alternative_models": validation["alternative_models"],
}
# 전역 싱글톤
_council_instance: Optional[AICouncil] = None
def get_council(llm_client: Any = None) -> AICouncil:
"""워커 프로세스 내 AICouncil 싱글톤 반환 (GeminiLLMClient 또는 OllamaManager 수용)"""
global _council_instance
if _council_instance is None:
_council_instance = AICouncil(llm_client)
return _council_instance

View File

@@ -0,0 +1,274 @@
"""
백테스팅 프레임워크 (v3.2 — Realism 보강)
개선 사항 (v3.2):
1. 다음 봉 시가 체결 옵션 (look-ahead bias 제거)
2. 증권거래세 (매도 시 0.2%, 수수료와 별개 부과)
3. 거래량 기반 부분 체결 (한 봉 거래량의 N% 상한)
4. Calmar, Payoff, Turnover 지표 추가
"""
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
@dataclass
class Trade:
ticker: str
entry_date: int
entry_price: float
exit_date: int
exit_price: float
qty: int
direction: str = "LONG"
@property
def pnl(self):
if self.direction == "LONG":
return (self.exit_price - self.entry_price) * self.qty
return (self.entry_price - self.exit_price) * self.qty
@property
def pnl_pct(self):
return (self.exit_price - self.entry_price) / self.entry_price * 100
@dataclass
class BacktestResult:
total_return_pct: float
sharpe_ratio: float
max_drawdown_pct: float
win_rate: float
avg_win_pct: float
avg_loss_pct: float
profit_factor: float
total_trades: int
winning_trades: int
losing_trades: int
calmar_ratio: float = 0.0
payoff_ratio: float = 0.0 # 평균수익 / |평균손실|
turnover_ratio: float = 0.0 # 총 매매대금 / 초기자본
trades: List[Trade] = field(default_factory=list)
def summary(self) -> str:
lines = [
"=" * 50,
"📊 백테스팅 결과 (v3.2)",
"=" * 50,
f"총 수익률: {self.total_return_pct:+.2f}%",
f"Sharpe Ratio: {self.sharpe_ratio:.3f}",
f"Calmar Ratio: {self.calmar_ratio:.3f}",
f"Max Drawdown: {self.max_drawdown_pct:.2f}%",
f"승률: {self.win_rate:.1f}% ({self.winning_trades}/{self.total_trades})",
f"평균 수익: {self.avg_win_pct:+.2f}%",
f"평균 손실: {self.avg_loss_pct:.2f}%",
f"손익비(PF): {self.profit_factor:.2f}",
f"Payoff Ratio: {self.payoff_ratio:.2f}",
f"Turnover: {self.turnover_ratio:.2f}x",
"=" * 50,
]
return "\n".join(lines)
class Backtester:
"""
OHLCV 기반 전략 백테스터.
체결 모델 (v3.2):
- next_bar_open=True: 신호 발생 다음 봉 시가로 체결 (look-ahead 제거)
- slippage: 체결가에 ±slippage_rate 적용
- commission_rate: 매수/매도 양쪽에 부과 (증권사 수수료)
- sell_tax_rate: 매도 시에만 부과 (증권거래세 0.2%)
- max_volume_participation: 봉 거래량의 N% 이하로 체결 제한
"""
def __init__(self,
initial_capital: float = 10_000_000,
commission_rate: float = 0.00015,
slippage_rate: float = 0.001,
sell_tax_rate: float = 0.002,
next_bar_open: bool = True,
max_volume_participation: float = 0.01):
self.initial_capital = initial_capital
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
self.sell_tax_rate = sell_tax_rate
self.next_bar_open = next_bar_open
self.max_volume_participation = max_volume_participation
# ──────────────────────────────────────────────
# 단일 종목
# ──────────────────────────────────────────────
def run(self, ohlcv_data: Dict, strategy_fn: Callable,
ticker: str = "UNKNOWN", warmup: int = 60) -> BacktestResult:
closes = np.array(ohlcv_data.get('close', []), dtype=float)
opens = np.array(ohlcv_data.get('open', closes), dtype=float)
highs = np.array(ohlcv_data.get('high', closes), dtype=float)
lows = np.array(ohlcv_data.get('low', closes), dtype=float)
volumes = np.array(ohlcv_data.get('volume', np.zeros_like(closes)), dtype=float)
n = len(closes)
if n < warmup + 10:
return self._empty_result()
capital = self.initial_capital
position = 0
entry_price = 0.0
entry_idx = 0
equity_curve = [capital]
trades: List[Trade] = []
total_turnover = 0.0 # 누적 매매대금
# 마지막 인덱스는 next-bar 체결 시 여유 필요
last_signal_idx = n - 2 if self.next_bar_open else n - 1
for i in range(warmup, last_signal_idx + 1):
slice_data = {
'close': closes[:i+1].tolist(),
'open': opens[:i+1].tolist(),
'high': highs[:i+1].tolist(),
'low': lows[:i+1].tolist(),
'volume': volumes[:i+1].tolist(),
}
signal = "HOLD"
try:
signal = strategy_fn(slice_data)
except Exception:
pass
# 체결가 산출 — next_bar_open이면 i+1 시가, 아니면 i 종가
fill_idx = i + 1 if self.next_bar_open and i + 1 < n else i
base_price = opens[fill_idx] if self.next_bar_open else closes[fill_idx]
fill_volume = volumes[fill_idx]
buy_price = base_price * (1 + self.slippage_rate)
sell_price = base_price * (1 - self.slippage_rate)
if signal == "BUY" and position == 0:
# 전액 투자 (수수료 포함 총비용 기준)
raw_qty = int(capital / (buy_price * (1 + self.commission_rate)))
# 거래량 상한 — 봉 거래량의 N%까지만 체결
vol_cap = int(fill_volume * self.max_volume_participation)
qty = min(raw_qty, vol_cap) if vol_cap > 0 else raw_qty
if qty > 0:
cost = qty * buy_price * (1 + self.commission_rate)
capital -= cost
position = qty
entry_price = buy_price
entry_idx = fill_idx
total_turnover += qty * buy_price
elif signal == "SELL" and position > 0:
# 매도: 수수료 + 증권거래세
sell_cost_rate = self.commission_rate + self.sell_tax_rate
vol_cap = int(fill_volume * self.max_volume_participation) if fill_volume > 0 else position
exec_qty = min(position, vol_cap) if vol_cap > 0 else position
proceeds = exec_qty * sell_price * (1 - sell_cost_rate)
capital += proceeds
total_turnover += exec_qty * sell_price
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
entry_price=entry_price,
exit_date=fill_idx,
exit_price=sell_price,
qty=exec_qty
))
position -= exec_qty
if position == 0:
entry_price = 0.0
current_equity = capital + (position * closes[i] if position > 0 else 0)
equity_curve.append(current_equity)
# 미청산 포지션: 마지막 종가 기준 강제 청산 (수수료+세금 반영)
if position > 0:
last_price = closes[-1] * (1 - self.slippage_rate)
sell_cost_rate = self.commission_rate + self.sell_tax_rate
proceeds = position * last_price * (1 - sell_cost_rate)
capital += proceeds
total_turnover += position * last_price
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
entry_price=entry_price,
exit_date=n - 1,
exit_price=last_price,
qty=position
))
equity_curve[-1] = capital
position = 0
return self._compute_metrics(equity_curve, trades, total_turnover)
def run_multi(self, ohlcv_dict: Dict[str, Dict], strategy_fn: Callable,
warmup: int = 60) -> Dict[str, BacktestResult]:
return {t: self.run(d, strategy_fn, t, warmup) for t, d in ohlcv_dict.items()}
# ──────────────────────────────────────────────
# 지표 계산
# ──────────────────────────────────────────────
def _compute_metrics(self, equity_curve: List[float], trades: List[Trade],
total_turnover: float) -> BacktestResult:
equity = np.array(equity_curve, dtype=float)
total_return_pct = (equity[-1] / equity[0] - 1) * 100
daily_returns = np.diff(equity) / (equity[:-1] + 1e-9)
sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252) \
if daily_returns.std() > 0 else 0.0
peak = np.maximum.accumulate(equity)
drawdowns = (equity - peak) / (peak + 1e-9) * 100
max_drawdown = abs(drawdowns.min())
wins = [t for t in trades if t.pnl_pct > 0]
losses = [t for t in trades if t.pnl_pct <= 0]
win_rate = len(wins) / len(trades) * 100 if trades else 0
avg_win = float(np.mean([t.pnl_pct for t in wins])) if wins else 0.0
avg_loss = float(np.mean([t.pnl_pct for t in losses])) if losses else 0.0
total_win = sum(t.pnl for t in wins)
total_loss = abs(sum(t.pnl for t in losses))
profit_factor = total_win / (total_loss + 1e-9)
# 신규 지표
calmar = (total_return_pct / max_drawdown) if max_drawdown > 0 else 0.0
payoff = (avg_win / abs(avg_loss)) if avg_loss != 0 else 0.0
turnover_ratio = total_turnover / (self.initial_capital + 1e-9)
return BacktestResult(
total_return_pct=round(total_return_pct, 2),
sharpe_ratio=round(sharpe, 3),
max_drawdown_pct=round(max_drawdown, 2),
win_rate=round(win_rate, 1),
avg_win_pct=round(avg_win, 2),
avg_loss_pct=round(avg_loss, 2),
profit_factor=round(profit_factor, 3),
total_trades=len(trades),
winning_trades=len(wins),
losing_trades=len(losses),
calmar_ratio=round(calmar, 3),
payoff_ratio=round(payoff, 3),
turnover_ratio=round(turnover_ratio, 3),
trades=trades,
)
def _empty_result(self) -> BacktestResult:
return BacktestResult(
total_return_pct=0.0, sharpe_ratio=0.0, max_drawdown_pct=0.0,
win_rate=0.0, avg_win_pct=0.0, avg_loss_pct=0.0,
profit_factor=0.0, total_trades=0, winning_trades=0, losing_trades=0
)
def compare_strategies(ohlcv_data: Dict, strategies: Dict[str, Callable],
initial_capital: float = 10_000_000) -> Dict[str, BacktestResult]:
bt = Backtester(initial_capital=initial_capital)
results = {}
for name, fn in strategies.items():
results[name] = bt.run(ohlcv_data, fn)
print(f"\n[{name}]")
print(results[name].summary())
return results

View File

@@ -0,0 +1,728 @@
import os
import time
import pickle
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from collections import OrderedDict
from sklearn.preprocessing import MinMaxScaler
from modules.config import Config
# cuDNN 벤치마크 활성화 (고정 입력 크기에 대해 최적 커널 자동 선택)
torch.backends.cudnn.benchmark = True
# 체크포인트 버전 (피처 수 변경 시 기존 모델 자동 재학습)
CHECKPOINT_VERSION = "v3"
INPUT_SIZE = 7 # close, open, high, low, volume_norm, rsi_14, macd_hist
class Attention(nn.Module):
def __init__(self, hidden_size):
super(Attention, self).__init__()
self.attn = nn.Linear(hidden_size, 1)
def forward(self, lstm_output):
attn_weights = torch.softmax(self.attn(lstm_output), dim=1)
context = torch.sum(attn_weights * lstm_output, dim=1)
return context, attn_weights
class AdvancedLSTM(nn.Module):
def __init__(self, input_size=INPUT_SIZE, hidden_size=512, num_layers=4, output_size=1, dropout=0.3):
super(AdvancedLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
batch_first=True, dropout=dropout)
self.attention = Attention(hidden_size)
self.fc = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, hidden_size // 4),
nn.ReLU(),
nn.Linear(hidden_size // 4, output_size)
)
def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
lstm_out, _ = self.lstm(x, (h0, c0))
context, _ = self.attention(lstm_out)
out = self.fc(context)
return out
def _get_free_vram_gb():
"""현재 GPU VRAM 여유량(GB) 반환"""
try:
if torch.cuda.is_available():
total = torch.cuda.get_device_properties(0).total_memory / 1024**3
reserved = torch.cuda.memory_reserved(0) / 1024**3
return total - reserved
except Exception:
pass
return 99.0 # CUDA 없으면 언로드 불필요
def _unload_ollama():
"""LSTM 학습 전 Ollama 모델 언로드 (VRAM < 2GB 여유일 때만)"""
free_vram = _get_free_vram_gb()
if free_vram >= 2.0:
print(f"[AI] Ollama 언로드 생략 (VRAM 여유 {free_vram:.1f}GB >= 2GB)")
return
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
requests.post(url, json={
"model": Config.OLLAMA_MODEL,
"keep_alive": 0
}, timeout=5)
print(f"[AI] Ollama 언로드 (VRAM 여유 {free_vram:.1f}GB)")
time.sleep(1)
except Exception:
pass
def _preload_ollama():
"""LSTM 학습 후 Ollama 모델 리로드 (언로드했던 경우만)"""
free_vram = _get_free_vram_gb()
if free_vram >= 2.0:
return # 언로드하지 않았으니 리로드도 불필요
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
requests.post(url, json={
"model": Config.OLLAMA_MODEL,
"prompt": "",
"keep_alive": "10m"
}, timeout=10)
except Exception:
pass
def _log_gpu_memory(tag=""):
"""GPU 메모리 사용량 로깅"""
if torch.cuda.is_available():
allocated = torch.cuda.memory_allocated(0) / 1024**3
reserved = torch.cuda.memory_reserved(0) / 1024**3
print(f"[AI GPU {tag}] Allocated: {allocated:.2f}GB / Reserved: {reserved:.2f}GB")
def _compute_rsi(close_arr, period=14):
"""RSI 계산 (numpy 기반)"""
if len(close_arr) < period + 1:
return np.full(len(close_arr), 50.0)
delta = np.diff(close_arr, prepend=close_arr[0])
gain = np.where(delta > 0, delta, 0.0)
loss = np.where(delta < 0, -delta, 0.0)
alpha = 1.0 / period
rsi_arr = np.zeros(len(close_arr))
avg_gain = gain[0]
avg_loss = loss[0]
for i in range(1, len(close_arr)):
avg_gain = alpha * gain[i] + (1 - alpha) * avg_gain
avg_loss = alpha * loss[i] + (1 - alpha) * avg_loss
rs = avg_gain / (avg_loss + 1e-9)
rsi_arr[i] = 100 - (100 / (1 + rs))
return rsi_arr
def _compute_macd_hist(close_arr, fast=12, slow=26, signal=9):
"""MACD Histogram 계산 (numpy 기반)"""
if len(close_arr) < slow + signal:
return np.zeros(len(close_arr))
ema_fast = np.zeros(len(close_arr))
ema_slow = np.zeros(len(close_arr))
alpha_f = 2 / (fast + 1)
alpha_s = 2 / (slow + 1)
ema_fast[0] = close_arr[0]
ema_slow[0] = close_arr[0]
for i in range(1, len(close_arr)):
ema_fast[i] = alpha_f * close_arr[i] + (1 - alpha_f) * ema_fast[i - 1]
ema_slow[i] = alpha_s * close_arr[i] + (1 - alpha_s) * ema_slow[i - 1]
macd = ema_fast - ema_slow
sig = np.zeros(len(close_arr))
alpha_sig = 2 / (signal + 1)
sig[0] = macd[0]
for i in range(1, len(close_arr)):
sig[i] = alpha_sig * macd[i] + (1 - alpha_sig) * sig[i - 1]
return macd - sig
def _build_feature_matrix(ohlcv_data):
"""
OHLCV 딕셔너리 → 7차원 numpy 피처 행렬 생성
피처: [close, open, high, low, volume_norm, rsi_14, macd_hist]
"""
close = np.array(ohlcv_data.get('close', []), dtype=np.float64)
open_ = np.array(ohlcv_data.get('open', close), dtype=np.float64)
high = np.array(ohlcv_data.get('high', close), dtype=np.float64)
low = np.array(ohlcv_data.get('low', close), dtype=np.float64)
volume = np.array(ohlcv_data.get('volume', []), dtype=np.float64)
n = len(close)
_degraded = []
if len(open_) != n: open_ = close.copy(); _degraded.append('open')
if len(high) != n: high = close.copy(); _degraded.append('high')
if len(low) != n: low = close.copy(); _degraded.append('low')
if _degraded:
print(f"[LSTM] ⚠️ OHLCV 피처 불완전 ({', '.join(_degraded)} → close 대체). 예측 신뢰도 저하 가능")
# 거래량 정규화 (20일 이동평균 대비 비율, max 기준보다 정보량이 높음)
if len(volume) == n and volume.max() > 0:
vol_series = pd.Series(volume)
vol_ma20 = vol_series.rolling(20, min_periods=1).mean().values
volume_norm = volume / (vol_ma20 + 1e-9)
volume_norm = np.clip(volume_norm, 0.0, 5.0) / 5.0 # 0~5배 → 0~1 정규화
else:
volume_norm = np.full(n, 0.2) # 데이터 없으면 중립값
rsi = _compute_rsi(close, period=14)
rsi_norm = rsi / 100.0 # 0~1 정규화
macd_hist = _compute_macd_hist(close)
# 7차원 피처 스택 (n x 7)
features = np.column_stack([close, open_, high, low, volume_norm, rsi_norm, macd_hist])
return features # shape: (n, 7)
class PricePredictor:
"""
[v3.0] 주가 예측 Deep Learning 모델 (GPU 최적화)
- 7차원 멀티피처 LSTM (close/open/high/low/vol_norm/rsi/macd_hist)
- feature_scaler(6개) + target_scaler(1개) 분리
- 데이터 누수 수정: train 데이터로만 fit
- 체크포인트에 scaler 상태 저장/로드
- VRAM 여유량 기반 Ollama 언로드 (충분하면 생략)
"""
def __init__(self):
self.feature_scaler = MinMaxScaler(feature_range=(0, 1)) # 입력 6개 피처
self.target_scaler = MinMaxScaler(feature_range=(0, 1)) # 타겟: close 가격
self.hidden_size = 512
self.num_layers = 4
self.model = AdvancedLSTM(input_size=INPUT_SIZE, hidden_size=self.hidden_size,
num_layers=self.num_layers, dropout=0.3)
self.criterion = nn.MSELoss()
self.device = torch.device('cpu')
self.use_amp = False
if torch.cuda.is_available():
try:
gpu_name = torch.cuda.get_device_name(0)
vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
self.device = torch.device('cuda')
self.model.to(self.device)
if torch.cuda.get_device_capability(0)[0] >= 7:
self.use_amp = True
# Warm-up
dummy = torch.zeros(1, 60, INPUT_SIZE, device=self.device)
with torch.no_grad():
_ = self.model(dummy)
torch.cuda.synchronize()
print(f"[AI] GPU Mode: {gpu_name} ({vram_gb:.1f}GB)"
f" | FP16={'ON' if self.use_amp else 'OFF'}"
f" | Features={INPUT_SIZE} | cuDNN Benchmark=ON")
_log_gpu_memory("init")
except Exception as e:
print(f"[AI] GPU Init Failed ({e}), falling back to CPU")
self.device = torch.device('cpu')
self.model.to(self.device)
else:
print("[AI] No CUDA GPU detected. Running on CPU.")
self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.001, weight_decay=1e-4)
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6
)
self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None
self.batch_size = 64
self.max_epochs = 200
self.seq_length = 60
self.patience = 15
self.max_grad_norm = 1.0
self.training_status = {
"is_training": False,
"loss": 0.0,
"current_ticker": None
}
@staticmethod
def verify_hardware():
if torch.cuda.is_available():
try:
gpu_name = torch.cuda.get_device_name(0)
vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}"
f" | Features={INPUT_SIZE}")
return True
except Exception as e:
print(f"[AI Check] GPU Error: {e}")
return False
print("[AI Check] No GPU. CPU Mode.")
return False
def _get_checkpoint_path(self, ticker):
return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm_{CHECKPOINT_VERSION}.pt")
def _load_checkpoint(self, ticker):
path = self._get_checkpoint_path(ticker)
if os.path.exists(path):
try:
checkpoint = torch.load(path, map_location=self.device, weights_only=False)
# 버전 체크 (v3 이전 체크포인트는 재학습)
if checkpoint.get('version', '') != CHECKPOINT_VERSION:
print(f"[AI] Checkpoint version mismatch ({ticker}): 재학습 필요")
return False
self.model.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# scaler 복원
if 'feature_scaler' in checkpoint:
self.feature_scaler = pickle.loads(checkpoint['feature_scaler'])
if 'target_scaler' in checkpoint:
self.target_scaler = pickle.loads(checkpoint['target_scaler'])
print(f"[AI] Checkpoint loaded: {ticker} (v3, 7-features)")
return True
except Exception as e:
print(f"[AI] Checkpoint load failed ({ticker}): {e}")
return False
def _save_checkpoint(self, ticker, epoch, loss):
path = self._get_checkpoint_path(ticker)
try:
torch.save({
'version': CHECKPOINT_VERSION,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'epoch': epoch,
'loss': loss,
'feature_scaler': pickle.dumps(self.feature_scaler),
'target_scaler': pickle.dumps(self.target_scaler)
}, path)
except Exception as e:
print(f"[AI] Checkpoint save failed ({ticker}): {e}")
def _is_checkpoint_fresh(self, ticker, max_age=None):
"""체크포인트가 최근에 학습된 것인지 확인 (쿨다운 판단)"""
if not ticker:
return False
path = self._get_checkpoint_path(ticker)
if not os.path.exists(path):
return False
age = time.time() - os.path.getmtime(path)
threshold = max_age if max_age is not None else Config.LSTM_COOLDOWN
return age < threshold
def _prepare_scaled_features(self, features, split_point):
"""
피처 스케일링 (누수 방지: train split으로만 fit)
features: (n, 7) numpy array
split_point: train/val 분리 인덱스
Returns:
scaled_features: (n, 7) 스케일된 전체 피처
scaled_close: (n, 1) 스케일된 close (타겟용)
"""
# 6개 입력 피처 (close 포함 open/high/low/vol_norm/rsi/macd_hist)
# + 타겟은 close만 별도 scaler
input_features = features[:, :] # (n, 7) 전체 7개 피처 입력용
target_close = features[:, 0:1] # (n, 1) close만 타겟용
# train 데이터로만 fit (데이터 누수 방지)
self.feature_scaler.fit(input_features[:split_point])
self.target_scaler.fit(target_close[:split_point])
scaled_features = self.feature_scaler.transform(input_features)
scaled_close = self.target_scaler.transform(target_close)
return scaled_features, scaled_close
def _predict_only(self, ohlcv_data, ticker=None):
"""학습 없이 현재 체크포인트로만 빠른 예측 (쿨다운 중 사용)"""
prices = ohlcv_data.get('close', []) if isinstance(ohlcv_data, dict) else ohlcv_data
if len(prices) < self.seq_length:
return None
try:
features = _build_feature_matrix(
ohlcv_data if isinstance(ohlcv_data, dict) else {'close': prices}
)
if len(features) < self.seq_length:
return None
scaled = self.feature_scaler.transform(features)
last_seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0).to(self.device)
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
pred_scaled = self.model(last_seq)
else:
pred_scaled = self.model(last_seq)
predicted_price = self.target_scaler.inverse_transform(
pred_scaled.cpu().float().numpy())[0][0]
current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
cached_loss = self.training_status.get("loss", 0.5)
# 캐시 신뢰도: 마지막 학습 loss 기반 동적 계산 (고정값 제거)
cached_conf = min(0.70, 1.0 / (1.0 + (cached_loss * 200)))
print(f"[AI] {ticker or '?'}: 쿨다운 중 → 캐시 예측 사용 "
f"({predicted_price:.0f} / {change_rate:+.2f}% / conf={cached_conf:.2f})")
return {
"current": current_price,
"predicted": float(predicted_price),
"change_rate": round(change_rate, 2),
"trend": trend,
"loss": cached_loss,
"val_loss": cached_loss,
"confidence": round(cached_conf, 2),
"epochs": 0,
"device": str(self.device),
"lr": self.optimizer.param_groups[0]['lr'],
"cached": True
}
except Exception as e:
print(f"[AI] _predict_only 실패 ({ticker}): {e}")
return None
def train_and_predict(self, ohlcv_data, forecast_days=1, ticker=None):
"""
[v3.0] 7차원 멀티피처 LSTM 학습 + 예측
ohlcv_data: dict {'close':[], 'open':[], 'high':[], 'low':[], 'volume':[]}
또는 list (하위 호환: close 리스트)
"""
# 하위 호환: list 형태
if isinstance(ohlcv_data, list):
ohlcv_data = {'close': ohlcv_data}
prices = ohlcv_data.get('close', [])
if len(prices) < (self.seq_length + 10):
return None
# ===== 쿨다운 체크 =====
if self._is_checkpoint_fresh(ticker):
has_ckpt = self._load_checkpoint(ticker)
if has_ckpt:
result = self._predict_only(ohlcv_data, ticker)
if result:
return result
is_gpu = self.device.type == 'cuda'
# VRAM 여유량 기반 Ollama 언로드
if is_gpu:
_unload_ollama()
torch.cuda.empty_cache()
_log_gpu_memory("pre-train")
t_start = time.time()
# 1. 피처 행렬 구성 (n, 7)
features = _build_feature_matrix(ohlcv_data)
if len(features) < (self.seq_length + 10):
return None
n = len(features)
split_point = int(n * 0.8)
# 2. 스케일링 (train 데이터로만 fit → 누수 방지)
scaled_features, scaled_close = self._prepare_scaled_features(features, split_point)
# 3. 시퀀스 생성
x_seqs, y_seqs = [], []
for i in range(n - self.seq_length):
x_seqs.append(scaled_features[i:i + self.seq_length]) # (seq, 7)
y_seqs.append(scaled_close[i + self.seq_length]) # (1,)
x_all = torch.FloatTensor(np.array(x_seqs)).to(self.device)
y_all = torch.FloatTensor(np.array(y_seqs)).to(self.device)
# validation split (80/20)
seq_split = int(len(x_all) * 0.8)
x_train, y_train = x_all[:seq_split], y_all[:seq_split]
x_val, y_val = x_all[seq_split:], y_all[seq_split:]
dataset_size = len(x_train)
# 4. 체크포인트 로드
has_checkpoint = self._load_checkpoint(ticker) if ticker else False
max_epochs = Config.LSTM_FAST_EPOCHS if has_checkpoint else self.max_epochs
self.optimizer.param_groups[0]['lr'] = 0.001 if not has_checkpoint else 0.0005
self.lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6
)
# 5. 학습
self.model.train()
self.training_status["is_training"] = True
if ticker:
self.training_status["current_ticker"] = ticker
best_val_loss = float('inf')
best_model_state = None
patience_counter = 0
final_loss = 0.0
actual_epochs = 0
for epoch in range(max_epochs):
perm = torch.randperm(dataset_size, device=self.device)
x_shuffled = x_train[perm]
y_shuffled = y_train[perm]
epoch_loss = 0.0
steps = 0
for i in range(0, dataset_size, self.batch_size):
end = min(i + self.batch_size, dataset_size)
batch_x = x_shuffled[i:end]
batch_y = y_shuffled[i:end]
self.optimizer.zero_grad(set_to_none=True)
if self.use_amp:
with torch.amp.autocast('cuda'):
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
self.scaler_amp.scale(loss).backward()
self.scaler_amp.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.scaler_amp.step(self.optimizer)
self.scaler_amp.update()
else:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)
self.optimizer.step()
epoch_loss += loss.item()
steps += 1
train_loss = epoch_loss / max(1, steps)
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
val_out = self.model(x_val)
val_loss = self.criterion(val_out, y_val).item()
else:
val_out = self.model(x_val)
val_loss = self.criterion(val_out, y_val).item()
self.model.train()
self.lr_scheduler.step(val_loss)
final_loss = train_loss
actual_epochs = epoch + 1
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
best_model_state = {k: v.clone() for k, v in self.model.state_dict().items()}
else:
patience_counter += 1
if patience_counter >= self.patience:
break
if best_model_state:
self.model.load_state_dict(best_model_state)
self.training_status["is_training"] = False
self.training_status["loss"] = final_loss
if is_gpu:
torch.cuda.synchronize()
elapsed = time.time() - t_start
print(f"[AI] {ticker or '?'}: {actual_epochs} epochs in {elapsed:.1f}s"
f" | loss={final_loss:.6f} val={best_val_loss:.6f}"
f" | device={self.device} | features={INPUT_SIZE}")
# 6. 체크포인트 저장 (scaler 포함)
if ticker:
self._save_checkpoint(ticker, actual_epochs, final_loss)
# 7. 예측
self.model.eval()
with torch.no_grad():
last_seq = torch.FloatTensor(
scaled_features[-self.seq_length:]
).unsqueeze(0).to(self.device)
if self.use_amp:
with torch.amp.autocast('cuda'):
predicted_scaled = self.model(last_seq)
else:
predicted_scaled = self.model(last_seq)
predicted_price = self.target_scaler.inverse_transform(
predicted_scaled.cpu().float().numpy())[0][0]
# 8. GPU 정리 + Ollama 리로드
if is_gpu:
del x_all, y_all, x_train, y_train, x_val, y_val
torch.cuda.empty_cache()
_log_gpu_memory("post-train")
_preload_ollama()
current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
# ── 신뢰도 계산 (보수적 버전) ──────────────────────────────
# val_loss 기반: 0.001→0.74, 0.003→0.62, 0.01→0.50 (이전보다 보수적)
loss_confidence = 1.0 / (1.0 + (best_val_loss * 200))
# 오버피팅 페널티
overfit_ratio = final_loss / (best_val_loss + 1e-9)
if overfit_ratio < 0.5:
overfit_penalty = 0.65 # 심각한 언더피팅
elif overfit_ratio > 2.5:
overfit_penalty = 0.75 # 오버피팅
else:
overfit_penalty = 1.0
# 에포크 수 기반 수렴 판단
epoch_factor = 1.0
if actual_epochs < 10:
epoch_factor = 0.55 # 너무 이른 수렴 → 불신뢰
elif actual_epochs >= max_epochs:
epoch_factor = 0.80 # 미수렴 → 부분 신뢰
# 최종 상한: 0.80 (이전 0.95보다 보수적 — LSTM 70% 가중치 남발 방지)
confidence = min(0.80, loss_confidence * overfit_penalty * epoch_factor)
return {
"current": current_price,
"predicted": float(predicted_price),
"change_rate": round(change_rate, 2),
"trend": trend,
"loss": final_loss,
"val_loss": best_val_loss,
"confidence": round(confidence, 2),
"epochs": actual_epochs,
"device": str(self.device),
"lr": self.optimizer.param_groups[0]['lr']
}
def batch_predict(self, ohlcv_dict):
"""여러 종목을 배치로 예측 (체크포인트 있는 종목만)"""
results = {}
seqs = []
metas = []
for ticker, ohlcv_data in ohlcv_dict.items():
if isinstance(ohlcv_data, list):
ohlcv_data = {'close': ohlcv_data}
prices = ohlcv_data.get('close', [])
if len(prices) < (self.seq_length + 10):
results[ticker] = None
continue
try:
features = _build_feature_matrix(ohlcv_data)
scaled = self.feature_scaler.transform(features)
seq = torch.FloatTensor(scaled[-self.seq_length:]).unsqueeze(0)
seqs.append(seq)
metas.append((ticker, prices[-1]))
except Exception:
results[ticker] = None
if not seqs:
return results
batch = torch.cat(seqs, dim=0).to(self.device)
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
preds = self.model(batch)
else:
preds = self.model(batch)
preds_cpu = preds.cpu().float().numpy()
for i, (ticker, current_price) in enumerate(metas):
predicted_price = self.target_scaler.inverse_transform(preds_cpu[i:i+1])[0][0]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
results[ticker] = {
"current": current_price,
"predicted": float(predicted_price),
"change_rate": round(change_rate, 2),
"trend": trend
}
if self.device.type == 'cuda':
torch.cuda.empty_cache()
return results
class ModelRegistry:
"""
[v3.0] 종목별 LSTM 모델 격리 (LRU 퇴출, max_models=5)
- 싱글톤 패턴: 워커 프로세스마다 하나의 Registry 유지
- 16GB VRAM에서 LSTM 5개(~250MB) + Ollama 7B(~4GB) 동시 적재 가능
"""
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self, max_models=5):
self.max_models = max_models
self._predictors = OrderedDict() # ticker -> PricePredictor (LRU 순서)
print(f"[ModelRegistry] Initialized (max_models={max_models})")
def get_predictor(self, ticker):
"""종목별 PricePredictor 반환 (없으면 생성, LRU 관리)"""
if ticker in self._predictors:
# LRU: 접근 시 맨 뒤로 이동
self._predictors.move_to_end(ticker)
return self._predictors[ticker]
# 용량 초과 시 가장 오래된 것 퇴출
if len(self._predictors) >= self.max_models:
oldest_ticker, oldest_pred = self._predictors.popitem(last=False)
print(f"[ModelRegistry] Evicted {oldest_ticker} (LRU, {len(self._predictors)}/{self.max_models})")
del oldest_pred
if torch.cuda.is_available():
torch.cuda.empty_cache()
predictor = PricePredictor()
self._predictors[ticker] = predictor
print(f"[ModelRegistry] Created predictor for {ticker} ({len(self._predictors)}/{self.max_models})")
return predictor
def has_predictor(self, ticker):
return ticker in self._predictors
def clear(self):
"""모든 모델 해제"""
self._predictors.clear()
if torch.cuda.is_available():
torch.cuda.empty_cache()

View File

@@ -0,0 +1,416 @@
"""
앙상블 예측 모듈 (Phase 3-3)
- LSTM + 기술지표 + LLM 감성 → 적응형 가중치
- 과거 매매 결과 기반 가중치 자동 조정
- Kelly Criterion 기반 포지션 비중 계산
- process.py의 하드코딩된 w_tech/w_news/w_ai 대체
- 파일 mtime 기반 cross-process 동기화 (워커 ↔ 메인 프로세스)
"""
import os
import json
import time
import numpy as np
from dataclasses import dataclass
from typing import Dict, Optional
from modules.config import Config
@dataclass
class SignalWeights:
"""앙상블 가중치"""
tech: float = 0.35
sentiment: float = 0.30
lstm: float = 0.35
# 각 신호의 허용 범위
MIN_WEIGHT = 0.10
MAX_WEIGHT = 0.65
def normalize(self):
"""
경계 보존 정규화 (합=1, MIN≤각값≤MAX 동시 보장)
단순 1/2차 정규화는 경계 위반을 반복 유발하므로
반복 배분 알고리즘(Water-Filling) 사용:
1. 단순 정규화 (비율 유지)
2. 경계 위반 값 → 경계에 고정, 나머지에 잔여 비중 비례 배분
3. 모든 값이 경계 내에 들 때까지 반복 (최대 10회)
"""
MIN, MAX = self.MIN_WEIGHT, self.MAX_WEIGHT
vals = [max(MIN * 0.1, self.tech),
max(MIN * 0.1, self.sentiment),
max(MIN * 0.1, self.lstm)]
for _ in range(10):
total = sum(vals)
if total > 0:
vals = [v / total for v in vals]
fixed = [None, None, None]
has_violation = False
for i, v in enumerate(vals):
if v < MIN:
fixed[i] = MIN
has_violation = True
elif v > MAX:
fixed[i] = MAX
has_violation = True
if not has_violation:
break
fixed_sum = sum(f for f in fixed if f is not None)
remaining = 1.0 - fixed_sum
free = [(i, vals[i]) for i, f in enumerate(fixed) if f is None]
free_sum = sum(v for _, v in free)
new_vals = list(fixed)
if free and free_sum > 0:
factor = remaining / free_sum
for i, v in free:
new_vals[i] = v * factor
elif free:
per = remaining / len(free)
for i, _ in free:
new_vals[i] = per
vals = [v if v is not None else 0.0 for v in new_vals]
self.tech, self.sentiment, self.lstm = vals
return self
def to_dict(self):
return {"tech": self.tech, "sentiment": self.sentiment, "lstm": self.lstm}
@classmethod
def from_dict(cls, d):
return cls(tech=d.get("tech", 0.35),
sentiment=d.get("sentiment", 0.30),
lstm=d.get("lstm", 0.35))
class AdaptiveEnsemble:
"""
적응형 앙상블 가중치 관리자
핵심 로직:
1. 종목별 최근 N 매매의 결과를 추적
2. 어떤 신호가 정확했는지 소급 평가 (크기 가중 정확도)
3. 정확도가 높은 신호의 가중치를 점진적으로 증가
4. 시장 상황(ADX, 거시경제) 반영한 컨텍스트별 가중치 분리
5. Kelly Criterion 기반 최적 포지션 비중 제공
6. 파일 mtime 기반 cross-process 동기화 (워커 프로세스 갱신)
"""
def __init__(self, history_file=None, max_history=50):
self.max_history = max_history
self.history_file = history_file or os.path.join(
Config.DATA_DIR, "ensemble_history.json"
)
# {ticker: [{"tech_score": f, "sentiment_score": f, "lstm_score": f,
# "decision": str, "outcome": float}, ...]}
self._trade_history: Dict[str, list] = {}
# {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger" | "default"
self._context_weights: Dict[str, SignalWeights] = {
"strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30),
"sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
"danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
"default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
}
self._load_mtime: float = 0.0 # 마지막 파일 로드 시각
self._load()
# ──────────────────────────────────────────────
# 파일 I/O
# ──────────────────────────────────────────────
def _load(self):
if os.path.exists(self.history_file):
try:
with open(self.history_file, "r", encoding="utf-8") as f:
data = json.load(f)
self._trade_history = data.get("history", {})
weights_raw = data.get("weights", {})
for ctx, w in weights_raw.items():
self._context_weights[ctx] = SignalWeights.from_dict(w)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Load failed: {e}")
def _save(self):
try:
data = {
"history": {k: v[-self.max_history:] for k, v in self._trade_history.items()},
"weights": {ctx: w.to_dict() for ctx, w in self._context_weights.items()}
}
with open(self.history_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Save failed: {e}")
def reload_if_stale(self):
"""
파일이 마지막 로드 이후 수정되었으면 재로드.
워커 프로세스가 메인 프로세스의 record_trade 결과를 반영하기 위해 사용.
"""
if not os.path.exists(self.history_file):
return
try:
mtime = os.path.getmtime(self.history_file)
if mtime > self._load_mtime:
self._load()
print("[Ensemble] 파일 변경 감지, 가중치 재로드")
except Exception:
pass
# ──────────────────────────────────────────────
# 컨텍스트 & 가중치
# ──────────────────────────────────────────────
def get_context(self, adx: float, macro_state: str) -> str:
"""현재 시장 컨텍스트 결정"""
if macro_state == "DANGER":
return "danger"
if adx >= 25:
return "strong_trend"
if adx < 20:
return "sideways"
return "default"
def get_weights(self, ticker: str, adx: float = 20.0,
macro_state: str = "SAFE",
ai_confidence: float = 0.5) -> SignalWeights:
"""
종목 + 시장 컨텍스트에 맞는 가중치 반환
1. 컨텍스트별 기준 가중치 선택
2. AI 신뢰도 높으면 lstm 가중치 보정
3. 종목별 학습 결과 반영 (크기 가중 정확도 사용)
"""
context = self.get_context(adx, macro_state)
base = self._context_weights.get(context, self._context_weights["default"])
ticker_history = self._trade_history.get(ticker, [])
adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm)
if len(ticker_history) >= 5:
recent = ticker_history[-10:]
# _accuracy_weighted: 방향 일치 + 수익 크기 가중 반영 (단순 binary X)
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
alpha = 0.05 # 미세 조정폭 (±0.1 범위)
adjusted.tech = max(0.10, min(0.60, base.tech + alpha * (tech_acc - 0.5)))
adjusted.sentiment = max(0.10, min(0.60, base.sentiment + alpha * (news_acc - 0.5)))
adjusted.lstm = max(0.10, min(0.60, base.lstm + alpha * (lstm_acc - 0.5)))
# AI 신뢰도 보정 (LSTM confidence 상한 0.80 기준 조정)
if ai_confidence >= 0.75:
adjusted.lstm = min(0.65, adjusted.lstm * 1.25)
elif ai_confidence < 0.5:
adjusted.lstm = max(0.10, adjusted.lstm * 0.75)
return adjusted.normalize()
# ──────────────────────────────────────────────
# 앙상블 점수
# ──────────────────────────────────────────────
def compute_ensemble_score(self, tech_score: float, sentiment_score: float,
lstm_score: float, investor_score: float = 0.0,
weights: Optional[SignalWeights] = None) -> float:
"""
앙상블 통합 점수 계산
Args:
weights: 가중치 (None이면 기본값 사용)
"""
if weights is None:
weights = SignalWeights()
total = (weights.tech * tech_score
+ weights.sentiment * sentiment_score
+ weights.lstm * lstm_score)
# 수급 가산점 (최대 +0.15)
total += min(investor_score, 0.15)
return min(1.0, max(0.0, total))
# ──────────────────────────────────────────────
# Kelly Criterion
# ──────────────────────────────────────────────
def get_kelly_fraction(self, ticker: str = None, half_kelly: bool = True) -> float:
"""
Modified Kelly Criterion 기반 최적 투자 비중 계산
f* = (p * b - q) / b
where:
p = 과거 승리 거래 비율 (win rate)
q = 1 - p
b = 평균이익 / 평균손실 비율 (avg profit / avg loss, Risk-Reward)
Returns:
0.03 ~ 0.25 범위의 Kelly 분수
- half_kelly=True: 변동성 과대추정 보완을 위해 1/2 적용
- 거래 데이터 < 10건: 보수적 기본값 0.08 반환
"""
# 해당 종목 우선, 없으면 전체 통합 히스토리 사용
if ticker and ticker in self._trade_history:
outcomes = [h["outcome"] for h in self._trade_history[ticker]
if h.get("outcome") is not None]
else:
# 전체 종목 결과 통합 (시장 전반 win rate)
outcomes = [
h["outcome"]
for records in self._trade_history.values()
for h in records
if h.get("outcome") is not None
]
if len(outcomes) < 10:
return 0.08 # 데이터 부족 → 보수적 8%
wins = [o for o in outcomes if o > 0]
losses = [abs(o) for o in outcomes if o <= 0]
if not wins:
return 0.03 # 승리 거래 없음 → 최소 비중
if not losses:
return 0.20 # 손실 거래 없음 → 낙관적이나 상한 제한
p = len(wins) / len(outcomes)
q = 1.0 - p
avg_win = sum(wins) / len(wins)
avg_loss = sum(losses) / len(losses)
if avg_loss == 0:
return 0.20
b = avg_win / avg_loss # Risk-Reward ratio
kelly = (p * b - q) / b
if half_kelly:
kelly /= 2.0 # Half-Kelly: 실제 활용 시 표준
result = max(0.03, min(0.25, kelly)) # 3% ~ 25% 범위 제한
return result
# ──────────────────────────────────────────────
# 거래 결과 기록 & 가중치 학습
# ──────────────────────────────────────────────
def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
lstm_score: float, decision: str, outcome_pct: float):
"""
매매 결과 기록 → 가중치 학습 데이터 축적
Args:
outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실
"""
if ticker not in self._trade_history:
self._trade_history[ticker] = []
record = {
"tech_score": tech_score,
"sentiment_score": sentiment_score,
"lstm_score": lstm_score,
"decision": decision,
"outcome": outcome_pct
}
self._trade_history[ticker].append(record)
if len(self._trade_history[ticker]) > self.max_history:
self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]
self._update_weights(ticker)
self._save()
def _update_weights(self, ticker: str):
"""
종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트.
- 크기 가중 정확도(accuracy_weighted) 사용 → 큰 손실에 강한 패널티
- 지수이동평균(alpha=0.10)으로 점진 반영 → 급격한 가중치 전환 방지
- normalize() 후 재경계 적용 → 경계값 위반 방지
"""
history = self._trade_history.get(ticker, [])
if len(history) < 5:
return
recent = history[-10:]
outcomes = [h["outcome"] for h in recent]
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent], outcomes)
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent], outcomes)
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent], outcomes)
alpha = 0.10 # EMA 계수 (10회 거래 후 완전 반영)
for ctx, w in self._context_weights.items():
delta_tech = alpha * (tech_acc - 0.5) * 0.4 # 최대 ±0.02
delta_news = alpha * (news_acc - 0.5) * 0.4
delta_lstm = alpha * (lstm_acc - 0.5) * 0.4
# 경계 적용 → normalize (경계 재반영) → normalize (합=1 보장)
w.tech = max(0.10, min(0.65, w.tech + delta_tech))
w.sentiment = max(0.10, min(0.65, w.sentiment + delta_news))
w.lstm = max(0.10, min(0.65, w.lstm + delta_lstm))
w.normalize() # normalize() 내부에서 경계 재클램핑 + 2차 정규화 수행
print(f"[Ensemble] {ctx} tech={w.tech:.2f} news={w.sentiment:.2f} lstm={w.lstm:.2f} "
f"(acc T={tech_acc:.2f} N={news_acc:.2f} L={lstm_acc:.2f})")
# ──────────────────────────────────────────────
# 정확도 지표
# ──────────────────────────────────────────────
@staticmethod
def _accuracy_weighted(scores: list, outcomes: list) -> float:
"""
신호-결과 크기 가중 정확도 (0.0~1.0, 0.5=무관)
- 단순 방향 일치(0/1)가 아닌 수익률 절댓값으로 가중
- 큰 손실 예측 실패는 작은 이익 예측 성공보다 강하게 패널티
"""
if len(scores) < 3:
return 0.5
total_weight = 0.0
weighted_correct = 0.0
for s, o in zip(scores, outcomes):
weight = max(1.0, abs(o)) # 수익률 절댓값 기반 가중치 (최소 1.0)
total_weight += weight
if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0):
weighted_correct += weight
if total_weight == 0:
return 0.5
return weighted_correct / total_weight
# ──────────────────────────────────────────────
# 전역 싱글톤 (프로세스별)
# ──────────────────────────────────────────────
_ensemble_instance: Optional[AdaptiveEnsemble] = None
def get_ensemble() -> AdaptiveEnsemble:
"""프로세스 내 싱글톤 앙상블 관리자 반환 (워커/메인 각각 독립 인스턴스)"""
global _ensemble_instance
if _ensemble_instance is None:
_ensemble_instance = AdaptiveEnsemble()
return _ensemble_instance

View File

@@ -0,0 +1,421 @@
"""
성과 평가 엔진 - PerformanceEvaluator
기능:
1. compute_metrics() - 핵심 성과 지표 계산
2. get_grade() - 지표별 S/A/B/C/D/F 등급 산출
3. generate_expert_panel() - Ollama LLM 5명 전문가 의견
4. generate_weekly_report() - 텔레그램 HTML 주간 보고서
"""
import json
import math
from datetime import datetime, timedelta
from modules.utils.performance_db import PerformanceDB
class PerformanceEvaluator:
def __init__(self):
self.perf_db = PerformanceDB()
# ─────────────────────────────────────────
# 1. 핵심 지표 계산
# ─────────────────────────────────────────
def compute_metrics(self, snapshots, trades):
"""성과 지표를 딕셔너리로 반환.
Args:
snapshots (list): daily_snapshots 리스트
trades (list): trade_records 리스트
Returns:
dict: 지표 딕셔너리 (또는 {"error": ...})
"""
if not snapshots:
return {"error": "스냅샷 데이터 없음 (운영 시작 후 첫 영업일까지 대기)"}
metrics = {}
# ── 수익률 ──────────────────────────────
initial = snapshots[0].get("total_eval", 0)
current = snapshots[-1].get("total_eval", 0)
metrics["total_return_pct"] = round(
(current - initial) / initial * 100, 2) if initial > 0 else 0.0
cutoff_7 = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
recent_snaps = [s for s in snapshots if s.get("date", "") >= cutoff_7]
if len(recent_snaps) >= 2:
w_init = recent_snaps[0].get("total_eval", 0)
w_curr = recent_snaps[-1].get("total_eval", 0)
metrics["weekly_return_pct"] = round(
(w_curr - w_init) / w_init * 100, 2) if w_init > 0 else 0.0
else:
metrics["weekly_return_pct"] = 0.0
# ── 리스크 지표 ──────────────────────────
daily_returns = [s.get("daily_return_pct", 0.0) / 100.0 for s in snapshots]
if len(daily_returns) >= 2:
mean_daily = sum(daily_returns) / len(daily_returns)
variance = sum((r - mean_daily) ** 2 for r in daily_returns) / len(daily_returns)
std_daily = math.sqrt(variance) if variance > 0 else 0.0
annual_return = mean_daily * 252
# Sharpe Ratio
if std_daily > 0:
metrics["sharpe_ratio"] = round(
annual_return / (std_daily * math.sqrt(252)), 3)
else:
metrics["sharpe_ratio"] = 0.0
# Sortino Ratio (하방 편차만 사용)
downside = [r for r in daily_returns if r < 0]
if downside:
dv = sum(r ** 2 for r in downside) / len(downside)
ds = math.sqrt(dv)
metrics["sortino_ratio"] = round(
annual_return / (ds * math.sqrt(252)), 3) if ds > 0 else 0.0
else:
metrics["sortino_ratio"] = 10.0 # 손실 없음
# Max Drawdown
peak = snapshots[0].get("total_eval", 0)
max_dd = 0.0
for snap in snapshots:
ev = snap.get("total_eval", 0)
if ev > peak:
peak = ev
if peak > 0:
dd = (peak - ev) / peak * 100
if dd > max_dd:
max_dd = dd
metrics["max_drawdown_pct"] = round(max_dd, 2)
# Calmar Ratio
ann_pct = annual_return * 100
metrics["calmar_ratio"] = round(
ann_pct / max_dd, 3) if max_dd > 0 else 0.0
else:
metrics["sharpe_ratio"] = 0.0
metrics["sortino_ratio"] = 0.0
metrics["max_drawdown_pct"] = 0.0
metrics["calmar_ratio"] = 0.0
# ── 매매 지표 ─────────────────────────────
closed = [t for t in trades
if t.get("action") == "BUY" and t.get("outcome_return_pct") is not None]
if closed:
wins = [t for t in closed if t.get("outcome_return_pct", 0) > 0]
losses = [t for t in closed if t.get("outcome_return_pct", 0) <= 0]
metrics["win_rate_pct"] = round(
len(wins) / len(closed) * 100, 1)
total_profit = sum(t["outcome_return_pct"] for t in wins)
total_loss = abs(sum(t["outcome_return_pct"] for t in losses))
metrics["profit_factor"] = round(
total_profit / total_loss, 3) if total_loss > 0 else 10.0
hd_list = [t["holding_days"] for t in closed if t.get("holding_days") is not None]
metrics["avg_holding_days"] = round(
sum(hd_list) / len(hd_list), 1) if hd_list else 0.0
else:
metrics["win_rate_pct"] = 0.0
metrics["profit_factor"] = 0.0
metrics["avg_holding_days"] = 0.0
metrics["total_trades"] = len(closed)
# ── 벤치마크 Alpha ────────────────────────
kospi_vals = [s.get("benchmark_kospi_close") for s in snapshots]
kospi_valid = [k for k in kospi_vals if k is not None]
if len(kospi_valid) >= 2:
kospi_ret = (kospi_valid[-1] - kospi_valid[0]) / kospi_valid[0] * 100
metrics["alpha"] = round(metrics["total_return_pct"] - kospi_ret, 2)
metrics["kospi_return_pct"] = round(kospi_ret, 2)
else:
metrics["alpha"] = 0.0
metrics["kospi_return_pct"] = 0.0
# ── AI 품질 지표 ──────────────────────────
if closed:
# LSTM 방향 정확도
correct = 0
direction_n = 0
for t in closed:
pred = t.get("ai_prediction_change")
outcome = t.get("outcome_return_pct")
if pred is not None and outcome is not None:
if (pred > 0) == (outcome > 0):
correct += 1
direction_n += 1
metrics["lstm_direction_accuracy"] = round(
correct / direction_n * 100, 1) if direction_n > 0 else 0.0
# 신호별 수익 상관도
outcomes = [t.get("outcome_return_pct", 0) for t in closed]
def pearson(xs, ys):
n = len(xs)
if n < 2:
return 0.0
mx = sum(xs) / n
my = sum(ys) / n
num = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
denom_x = sum((x - mx) ** 2 for x in xs)
denom_y = sum((y - my) ** 2 for y in ys)
denom = math.sqrt(denom_x * denom_y)
return num / denom if denom > 0 else 0.0
corr_tech = pearson([t.get("tech_score", 0) for t in closed], outcomes)
corr_sent = pearson([t.get("sentiment_score", 0) for t in closed], outcomes)
corr_lstm = pearson([t.get("lstm_score", 0) for t in closed], outcomes)
metrics["signal_correlation"] = {
"tech": round(corr_tech, 3),
"sentiment": round(corr_sent, 3),
"lstm": round(corr_lstm, 3)
}
metrics["best_signal_source"] = max(
["tech", "sentiment", "lstm"],
key=lambda k: abs(metrics["signal_correlation"][k])
)
else:
metrics["lstm_direction_accuracy"] = 0.0
metrics["signal_correlation"] = {"tech": 0.0, "sentiment": 0.0, "lstm": 0.0}
metrics["best_signal_source"] = "unknown"
metrics["snapshot_count"] = len(snapshots)
return metrics
# ─────────────────────────────────────────
# 2. 등급 산출
# ─────────────────────────────────────────
def get_grade(self, metric, value):
"""지표 이름과 값으로 S/A/B/C/D/F 등급 반환."""
# MDD는 낮을수록 좋음
if metric == "max_drawdown_pct":
thresholds = [(5, "S"), (10, "A"), (15, "B"), (20, "C"), (30, "D")]
for threshold, grade in thresholds:
if value < threshold:
return grade
return "F"
grade_rules = {
"sharpe_ratio": [(2.0, "S"), (1.5, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
"sortino_ratio": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.0, "D")],
"win_rate_pct": [(70, "S"), (60, "A"), (50, "B"), (40, "C"), (30, "D")],
"profit_factor": [(3.0, "S"), (2.0, "A"), (1.5, "B"), (1.0, "C"), (0.5, "D")],
"alpha": [(15, "S"), (10, "A"), (5, "B"), (0, "C"), (-5, "D")],
"total_return_pct": [(30, "S"), (20, "A"), (10, "B"), (0, "C"), (-10, "D")],
"weekly_return_pct": [(5, "S"), (3, "A"), (1, "B"), (0, "C"), (-1, "D")],
"lstm_direction_accuracy":[(70, "S"), (60, "A"), (55, "B"), (50, "C"), (40, "D")],
"calmar_ratio": [(3.0, "S"), (2.0, "A"), (1.0, "B"), (0.5, "C"), (0.0, "D")],
}
thresholds = grade_rules.get(metric, [])
for threshold, grade in thresholds:
if value >= threshold:
return grade
return "F"
# ─────────────────────────────────────────
# 3. 전문가 패널 (Ollama LLM)
# ─────────────────────────────────────────
def generate_expert_panel(self, metrics):
"""5명의 전문가 역할로 Ollama에 평가를 요청.
Returns:
list[dict]: [{role, grade, comment, suggestion}, ...]
"""
from modules.services.ollama import OllamaManager
ollama = OllamaManager()
sig_corr = metrics.get("signal_correlation", {})
experts = [
{
"role": "Risk Manager",
"focus": "risk level assessment and bankruptcy risk",
"data": (
f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
f"Sortino={metrics.get('sortino_ratio', 0):.2f}, "
f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%, "
f"Calmar={metrics.get('calmar_ratio', 0):.2f}"
)
},
{
"role": "Fund Manager",
"focus": "alpha generation vs market benchmark",
"data": (
f"TotalReturn={metrics.get('total_return_pct', 0):.2f}%, "
f"Alpha={metrics.get('alpha', 0):.2f}%, "
f"KOSPI={metrics.get('kospi_return_pct', 0):.2f}%, "
f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%"
)
},
{
"role": "Quant Analyst",
"focus": "AI model validity and signal quality",
"data": (
f"LSTM_Accuracy={metrics.get('lstm_direction_accuracy', 0):.1f}%, "
f"TechCorr={sig_corr.get('tech', 0):.3f}, "
f"SentCorr={sig_corr.get('sentiment', 0):.3f}, "
f"LSTMCorr={sig_corr.get('lstm', 0):.3f}, "
f"BestSignal={metrics.get('best_signal_source', 'N/A')}"
)
},
{
"role": "Trader",
"focus": "trading strategy effectiveness",
"data": (
f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
f"ProfitFactor={metrics.get('profit_factor', 0):.2f}, "
f"AvgHolding={metrics.get('avg_holding_days', 0):.1f}days, "
f"TotalTrades={metrics.get('total_trades', 0)}"
)
},
{
"role": "Portfolio PM",
"focus": "overall strategy direction and sustainability",
"data": (
f"WeeklyReturn={metrics.get('weekly_return_pct', 0):.2f}%, "
f"Sharpe={metrics.get('sharpe_ratio', 0):.2f}, "
f"WinRate={metrics.get('win_rate_pct', 0):.1f}%, "
f"Alpha={metrics.get('alpha', 0):.2f}%, "
f"MDD={metrics.get('max_drawdown_pct', 0):.1f}%"
)
}
]
results = []
for exp in experts:
prompt = (
f"You are a professional {exp['role']} evaluating an AI stock trading bot. "
f"Your focus: {exp['focus']}. "
f"Performance data: {exp['data']}. "
f"Respond ONLY with valid JSON (no markdown, no extra text): "
f"{{\"grade\":\"S|A|B|C|D|F\","
f"\"comment\":\"1 sentence evaluation in Korean\","
f"\"suggestion\":\"1 sentence improvement tip in Korean\"}}"
)
try:
resp = ollama.request_inference(prompt)
if not resp:
raise ValueError("Empty response from Ollama")
data = json.loads(resp)
results.append({
"role": exp["role"],
"grade": data.get("grade", "C"),
"comment": data.get("comment", "(응답 없음)"),
"suggestion": data.get("suggestion", "데이터 축적 필요")
})
except Exception as e:
print(f"[Evaluator] Expert panel [{exp['role']}] error: {e}")
results.append({
"role": exp["role"],
"grade": "C",
"comment": "평가 데이터가 부족합니다.",
"suggestion": "더 많은 거래 데이터 축적 후 재평가를 권장합니다."
})
return results
# ─────────────────────────────────────────
# 4. 주간 보고서 생성
# ─────────────────────────────────────────
def generate_weekly_report(self):
"""주간 성과 보고서 (텔레그램 HTML 형식) 반환."""
snapshots = self.perf_db.load_snapshots(days=7)
# 매매 완료 건은 30일치 사용 (주간 거래 수가 적을 수 있음)
trades = self.perf_db.load_trades(days=30)
metrics = self.compute_metrics(snapshots, trades)
if "error" in metrics:
return (
f"<b>[주간 성과 평가 보고서]</b>\n"
f"⚠️ {metrics['error']}\n"
f"<i>매일 오전 09:05~09:15에 스냅샷이 저장됩니다.</i>"
)
# 등급 계산
g_sharpe = self.get_grade("sharpe_ratio", metrics.get("sharpe_ratio", 0))
g_win = self.get_grade("win_rate_pct", metrics.get("win_rate_pct", 0))
g_mdd = self.get_grade("max_drawdown_pct", metrics.get("max_drawdown_pct", 0))
g_alpha = self.get_grade("alpha", metrics.get("alpha", 0))
g_weekly = self.get_grade("weekly_return_pct", metrics.get("weekly_return_pct", 0))
g_lstm = self.get_grade("lstm_direction_accuracy",
metrics.get("lstm_direction_accuracy", 0))
# 종합 등급 (Sharpe/Win/MDD/Alpha 평균)
grade_map = {"S": 5, "A": 4, "B": 3, "C": 2, "D": 1, "F": 0}
grade_rev = {v: k for k, v in grade_map.items()}
key_grades = [grade_map[g] for g in [g_sharpe, g_win, g_mdd, g_alpha]]
overall_grade = grade_rev[round(sum(key_grades) / len(key_grades))]
# 전문가 패널 (Ollama 호출)
try:
experts = self.generate_expert_panel(metrics)
except Exception as e:
print(f"[Evaluator] Expert panel skipped: {e}")
experts = []
now_str = datetime.now().strftime("%Y/%m/%d %H:%M")
corr = metrics.get("signal_correlation", {})
report = (
f"📊 <b>[주간 성과 평가 보고서]</b> <code>{now_str}</code>\n"
f"━━━━━━━━━━━━━━━━━━━━━━\n"
f"\n<b>■ 수익률</b>\n"
f" 주간: <code>{metrics.get('weekly_return_pct', 0):+.2f}%</code> [{g_weekly}]"
f" 누적: <code>{metrics.get('total_return_pct', 0):+.2f}%</code>\n"
f" Alpha: <code>{metrics.get('alpha', 0):+.2f}%</code> [{g_alpha}]"
f" vs KOSPI <code>{metrics.get('kospi_return_pct', 0):+.2f}%</code>\n"
f"\n<b>■ 리스크</b>\n"
f" Sharpe: <code>{metrics.get('sharpe_ratio', 0):.2f}</code> [{g_sharpe}]"
f" Sortino: <code>{metrics.get('sortino_ratio', 0):.2f}</code>\n"
f" MDD: <code>{metrics.get('max_drawdown_pct', 0):.1f}%</code> [{g_mdd}]"
f" Calmar: <code>{metrics.get('calmar_ratio', 0):.2f}</code>\n"
f"\n<b>■ 매매 통계</b>\n"
f" 승률: <code>{metrics.get('win_rate_pct', 0):.1f}%</code> [{g_win}]"
f" PF: <code>{metrics.get('profit_factor', 0):.2f}</code>\n"
f" 평균보유: <code>{metrics.get('avg_holding_days', 0):.1f}일</code>"
f" 완료매매: <code>{metrics.get('total_trades', 0)}건</code>\n"
f"\n<b>■ AI 품질</b>\n"
f" LSTM 방향정확도: <code>{metrics.get('lstm_direction_accuracy', 0):.1f}%</code>"
f" [{g_lstm}]\n"
f" 신호 상관도 — Tech: <code>{corr.get('tech', 0):.3f}</code>"
f" Sent: <code>{corr.get('sentiment', 0):.3f}</code>"
f" LSTM: <code>{corr.get('lstm', 0):.3f}</code>\n"
f" 최고기여 신호: <code>{metrics.get('best_signal_source', 'N/A')}</code>\n"
)
if experts:
role_icons = {
"Risk Manager": "🛡",
"Fund Manager": "💼",
"Quant Analyst": "🧮",
"Trader": "📈",
"Portfolio PM": "🏦"
}
report += "\n<b>■ 전문가 패널 의견</b>\n"
for exp in experts:
icon = role_icons.get(exp["role"], "👤")
report += (
f"{icon} <b>{exp['role']}</b> [{exp['grade']}]\n"
f" {exp['comment']}\n"
f" 💡 {exp['suggestion']}\n"
)
report += (
f"\n━━━━━━━━━━━━━━━━━━━━━━\n"
f"🏆 <b>종합 등급: [{overall_grade}]</b>\n"
f"<i>스냅샷 {metrics.get('snapshot_count', 0)}일 | 완료매매 {metrics.get('total_trades', 0)}건 기준</i>"
)
return report

View File

@@ -0,0 +1,157 @@
from datetime import datetime
import time
import os
from pathlib import Path
from dotenv import load_dotenv
from modules.services.kis import KISClient
class MacroAnalyzer:
"""
KIS API를 활용한 거시경제(시장 지수) 분석 모듈
yfinance 대신 한국투자증권 API를 사용하여 안정적인 KOSPI, KOSDAQ 데이터를 수집함.
"""
@staticmethod
def get_macro_status(kis_client):
"""
시장 주요 지수(KOSPI, KOSDAQ)를 조회하여 시장 위험도를 평가함.
Args:
kis_client (KISClient): 인증된 KIS API 클라이언트 인스턴스
Returns:
dict: 시장 상태 (SAFE, CAUTION, DANGER) 및 지표 데이터
"""
indicators = {
"KOSPI": "0001",
"KOSDAQ": "1001",
"KOSPI200": "0028",
}
results = {}
risk_score = 0
print("🌍 [Macro] Fetching market indices via KIS API...")
for name, code in indicators.items():
data = kis_client.get_current_index(code)
time.sleep(0.6) # Rate Limit 방지 (초당 2회 제한)
if data and data.get('price', 0) != 0:
results[name] = data
print(f" - {name}: {data['price']} ({data['change']}%)")
# 리스크 평가 로직 (2% 이상 폭락 장이면 위험)
change = data['change']
if change <= -2.0:
risk_score += 2 # 패닉 상태
elif change <= -1.0:
risk_score += 1 # 주의 상태
else:
results[name] = {"price": 0, "change": 0, "high": 0, "low": 0,
"prev_close": 0, "volume": 0, "trade_value": 0}
# [신규] 시장 스트레스 지수(MSI) 추가
time.sleep(0.6)
kospi_stress = MacroAnalyzer.calculate_stress_index(kis_client, "0001")
results['MSI'] = kospi_stress
print(f" - Market Stress Index: {kospi_stress}")
if kospi_stress >= 50:
risk_score += 2
elif kospi_stress >= 30:
risk_score += 1
# [v2.0] KOSPI/KOSDAQ 연동 위험도 (둘 다 하락 시 더 위험)
kospi_change = results.get('KOSPI', {}).get('change', 0)
kosdaq_change = results.get('KOSDAQ', {}).get('change', 0)
if kospi_change <= -1.0 and kosdaq_change <= -1.0:
risk_score += 1 # 양대 지수 동반 하락
print(f" ⚠️ Both KOSPI({kospi_change}%) & KOSDAQ({kosdaq_change}%) declining!")
# [v2.0] 급반등 감지 (전일 급락 후 반등 = 불안정)
if kospi_change >= 2.0 and kospi_stress >= 30:
risk_score = max(risk_score, 1) # 급반등이지만 스트레스 높으면 CAUTION 유지
print(f" 📈 Sharp rebound detected but MSI still elevated")
# 시장 상태 정의
status = "SAFE"
if risk_score >= 3:
status = "DANGER"
elif risk_score >= 1:
status = "CAUTION"
return {
"status": status,
"risk_score": risk_score,
"indicators": results
}
@staticmethod
def calculate_stress_index(kis_client, market_code="0001"):
"""
시장 스트레스 지수(MSI) 계산
- 0~100 사이의 값 (높을수록 위험)
- 요소: 변동성(Volatility), 추세 이격도(MA Divergence)
"""
import numpy as np
# 일봉 데이터 조회 (약 3개월치 = 60일 이상)
prices = kis_client.get_daily_index_price(market_code, period="D")
if not prices or len(prices) < 20:
return 0
prices = np.array(prices)
# 1. 역사적 변동성 (20일)
# 로그 수익률 계산
returns = np.diff(np.log(prices))
# 연환산 변동성 (Trading days = 252)
volatility = np.std(returns[-20:]) * np.sqrt(252) * 100
# 2. 이동평균 이격도
ma20 = np.mean(prices[-20:])
current_price = prices[-1]
disparity = (current_price - ma20) / ma20 * 100
# 3. 스트레스 점수 산출
# 변동성이 20% 넘어가면 위험, 이격도가 -5% 이하면 위험
stress_score = 0
# 변동성 기여 (평소 10~15%, 30% 이상 공포)
# 10 이하면 0점, 40 이상이면 60점 만점
v_score = min(max((volatility - 10) * 2, 0), 60)
# 하락 추세 기여 (-10% 이격이면 +40점)
d_score = 0
if disparity < 0:
d_score = min(abs(disparity) * 4, 40)
total_stress = v_score + d_score
return round(total_stress, 2)
if __name__ == "__main__":
# 테스트를 위한 코드
load_dotenv(Path(__file__).parent.parent.parent / ".env")
# 환경변수 로딩 및 클라이언트 초기화
if os.getenv("KIS_ENV_TYPE") == "real":
app_key = os.getenv("KIS_REAL_APP_KEY")
app_secret = os.getenv("KIS_REAL_APP_SECRET")
account = os.getenv("KIS_REAL_ACCOUNT")
is_virtual = False
else:
app_key = os.getenv("KIS_VIRTUAL_APP_KEY")
app_secret = os.getenv("KIS_VIRTUAL_APP_SECRET")
account = os.getenv("KIS_VIRTUAL_ACCOUNT")
is_virtual = True
kis = KISClient(app_key, app_secret, account, is_virtual)
# 토큰 발급 (필요 시)
kis.ensure_token()
# 분석 실행
report = MacroAnalyzer.get_macro_status(kis)
print("\n📊 [Macro Report]")
print(f"Status: {report['status']}")
print(f"Data: {report['indicators']}")

View File

@@ -0,0 +1,279 @@
"""
시장 레짐 감지 모듈
- 코스피 지수 수준에 따른 시장 레짐 분류
- 코스피 6300 목표 수준에서의 모델 적합성 평가
- 레짐별 전략 파라미터 자동 조정
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict
class MarketRegime(Enum):
BULL_EXTREME = "bull_extreme" # 코스피 5000+ (역사적 극고점, 6300 시나리오)
BULL_STRONG = "bull_strong" # 코스피 3500~5000 (강한 상승장)
BULL_NORMAL = "bull_normal" # 코스피 2500~3500 (정상 상승장)
SIDEWAYS = "sideways" # 코스피 2000~2500 (횡보)
BEAR_MILD = "bear_mild" # 코스피 1500~2000 (약세)
BEAR_SEVERE = "bear_severe" # 코스피 1500 미만 (심각한 약세)
@dataclass
class RegimeAnalysis:
"""레짐 분석 결과"""
regime: MarketRegime
kospi_level: float
description: str
recommended_strategy: str
buy_threshold_adj: float # 매수 임계값 조정치 (+: 더 엄격, -: 완화)
position_size_adj: float # 포지션 크기 조정 배수 (1.0 = 기본)
lstm_weight_adj: float # LSTM 앙상블 가중치 조정 (+0.1 = 10% 증가)
model_recommendation: str # 모델 유지/교체 권고
risk_level: str # LOW / MEDIUM / HIGH / EXTREME
class MarketRegimeDetector:
"""
코스피 지수 수준 기반 시장 레짐 감지기
코스피 6300 시나리오:
- 현재 한국 증시 역대 최고점(2021년 3300) 대비 약 2배 수준
- BULL_EXTREME 레짐에 해당 → LSTM 단독 의존 지양, Transformer/Mamba 검토 필요
- 추세 추종 강화 + 고점 리스크 관리 병행
"""
# 레짐별 상세 파라미터
_REGIME_PARAMS: Dict[MarketRegime, dict] = {
MarketRegime.BULL_EXTREME: {
"description": "코스피 극강세장 5000+ (6300 시나리오)",
"recommended_strategy": (
"추세 추종 극대화, 트레일링 스탑 확대(ATR×4), "
"고점 과열 구간으로 포지션 축소 병행"
),
"buy_threshold_adj": -0.04, # 강세 모멘텀 → 진입 소폭 완화
"position_size_adj": 0.75, # 고점 리스크로 포지션 축소
"lstm_weight_adj": -0.12, # LSTM 비중 축소 (비선형 가격 동작)
"model_recommendation": (
"Temporal Fusion Transformer(TFT) 또는 Mamba(SSM) 교체 권고 - "
"LSTM은 극강세 과열 구간에서 비선형 가격 동작 포착 한계"
),
"risk_level": "EXTREME",
},
MarketRegime.BULL_STRONG: {
"description": "코스피 강상승장 3500~5000",
"recommended_strategy": "추세 추종, 모멘텀 강화, 손절 완화(ATR×2.5)",
"buy_threshold_adj": -0.03,
"position_size_adj": 1.1,
"lstm_weight_adj": 0.05,
"model_recommendation": "현재 LSTM v3 적합 - 성능 모니터링 유지",
"risk_level": "MEDIUM",
},
MarketRegime.BULL_NORMAL: {
"description": "코스피 정상 상승장 2500~3500",
"recommended_strategy": "기본 전략 유지 (기술+LSTM+LLM 균형)",
"buy_threshold_adj": 0.0,
"position_size_adj": 1.0,
"lstm_weight_adj": 0.0,
"model_recommendation": "현재 LSTM v3 최적 환경",
"risk_level": "LOW",
},
MarketRegime.SIDEWAYS: {
"description": "코스피 횡보장 2000~2500",
"recommended_strategy": "박스권 매매, LLM 감성 비중 확대, 빠른 익절",
"buy_threshold_adj": 0.03,
"position_size_adj": 0.85,
"lstm_weight_adj": -0.05,
"model_recommendation": "현재 LSTM v3 적합 - 감성 분석 가중치 강화",
"risk_level": "LOW",
},
MarketRegime.BEAR_MILD: {
"description": "코스피 약세장 1500~2000",
"recommended_strategy": "현금 비중 확대(50%+), 방어주 선별 매수",
"buy_threshold_adj": 0.08,
"position_size_adj": 0.5,
"lstm_weight_adj": 0.0,
"model_recommendation": "현재 LSTM v3 적합 - 리스크 관리 파라미터 강화",
"risk_level": "HIGH",
},
MarketRegime.BEAR_SEVERE: {
"description": "코스피 극약세장 1500 미만",
"recommended_strategy": "전면 현금화, 매수 중단",
"buy_threshold_adj": 0.20,
"position_size_adj": 0.2,
"lstm_weight_adj": 0.0,
"model_recommendation": "매크로 팩터 기반 방어 모델 전환 필요",
"risk_level": "EXTREME",
},
}
@classmethod
def detect(
cls,
kospi_price: float,
kospi_change_pct: float = 0.0,
volatility_20d: float = 0.0,
) -> RegimeAnalysis:
"""
코스피 지수 수준 + 변동성으로 시장 레짐 감지
Args:
kospi_price: 현재 코스피 지수 (예: 2600, 6300)
kospi_change_pct: 전일 대비 등락률 (%)
volatility_20d: 20일 변동성 (선택, 0이면 무시)
Returns:
RegimeAnalysis: 레짐 분석 결과 및 전략 파라미터
"""
# 1. 지수 수준으로 기본 레짐 결정
if kospi_price >= 5000:
regime = MarketRegime.BULL_EXTREME
elif kospi_price >= 3500:
regime = MarketRegime.BULL_STRONG
elif kospi_price >= 2500:
regime = MarketRegime.BULL_NORMAL
elif kospi_price >= 2000:
regime = MarketRegime.SIDEWAYS
elif kospi_price >= 1500:
regime = MarketRegime.BEAR_MILD
else:
regime = MarketRegime.BEAR_SEVERE
params = cls._REGIME_PARAMS[regime]
# 2. 변동성 기반 포지션 사이징 추가 조정
position_adj = params["position_size_adj"]
if volatility_20d > 30:
position_adj *= 0.6 # 극단적 변동성 → 추가 50% 축소
elif volatility_20d > 20:
position_adj *= 0.8 # 높은 변동성 → 20% 축소
# 3. 급락 중 레짐 하향 조정 (패닉 감지)
if kospi_change_pct <= -3.0:
# 극단적 일일 급락 → 포지션 추가 축소
position_adj *= 0.5
print(f"[Regime] PANIC DETECTED (일일 {kospi_change_pct:.1f}%) → 포지션 50% 추가 축소")
return RegimeAnalysis(
regime=regime,
kospi_level=kospi_price,
description=params["description"],
recommended_strategy=params["recommended_strategy"],
buy_threshold_adj=params["buy_threshold_adj"],
position_size_adj=round(position_adj, 3),
lstm_weight_adj=params["lstm_weight_adj"],
model_recommendation=params["model_recommendation"],
risk_level=params["risk_level"],
)
@classmethod
def validate_model_for_regime(
cls,
regime: MarketRegime,
backtest_sharpe: Optional[float] = None,
backtest_winrate: Optional[float] = None,
backtest_mdd: Optional[float] = None,
) -> dict:
"""
현재 LSTM v3 모델이 해당 레짐에서 적합한지 검증
Returns:
{
"is_suitable": bool,
"confidence_score": float (0~1),
"recommendation": str,
"should_replace": bool,
"alternative_models": list[str],
"reason": str,
}
"""
result = {
"is_suitable": True,
"confidence_score": 0.75,
"recommendation": "현재 LSTM v3 모델 유지",
"should_replace": False,
"alternative_models": [],
"reason": "정상 상승장 구간 - LSTM v3 최적 환경",
}
# 레짐 기반 기본 평가
if regime == MarketRegime.BULL_EXTREME:
result.update({
"is_suitable": False,
"confidence_score": 0.38,
"recommendation": "Transformer 계열 모델 교체 강력 권고",
"should_replace": True,
"alternative_models": [
"Temporal Fusion Transformer (TFT) - 장기 시계열 최강",
"Mamba (SSM) - 초고속 추론 + 긴 컨텍스트",
"PatchTST - Transformer 기반 주가 예측 특화",
"TimesNet - 2D 시계열 변환 + CNN",
"N-BEATS / N-HiTS - 해석 가능 딥러닝",
],
"reason": (
"코스피 5000+ 극강세장에서 LSTM은 비선형적 가격 급등 패턴을 "
"충분히 학습하지 못함. Attention 메커니즘만으로는 장기 상승 추세의 "
"복잡한 의존성 포착에 한계 존재."
),
})
elif regime == MarketRegime.BEAR_SEVERE:
result.update({
"is_suitable": False,
"confidence_score": 0.30,
"recommendation": "매크로 팩터 + Regime-Switching 모델 교체 권고",
"should_replace": True,
"alternative_models": [
"Regime-Switching LSTM (HMM + LSTM)",
"매크로 멀티팩터 모델 (환율, 금리, VIX 통합)",
"GRU + Attention (LSTM 경량 대안)",
],
"reason": "극약세장에서는 기술적 지표보다 거시경제 팩터가 지배적",
})
elif regime == MarketRegime.BULL_STRONG:
result.update({
"confidence_score": 0.72,
"reason": "강상승장 - LSTM 추세 학습 양호하나 성능 모니터링 필요",
})
elif regime == MarketRegime.SIDEWAYS:
result.update({
"confidence_score": 0.68,
"reason": "횡보장 - LSTM 예측력 저하, LLM 감성 보완 필수",
"recommendation": "현재 LSTM v3 유지 + LLM 감성 가중치 상향",
})
# 백테스트 결과 반영
if backtest_sharpe is not None:
if backtest_sharpe < 0:
result["confidence_score"] *= 0.5
result["should_replace"] = True
result["recommendation"] += " ⚠️ Sharpe < 0 → 즉시 교체 검토"
elif backtest_sharpe < 0.5:
result["confidence_score"] *= 0.75
result["recommendation"] += f" (Sharpe={backtest_sharpe:.2f} 미흡)"
if backtest_winrate is not None and backtest_winrate < 45:
result["confidence_score"] *= 0.8
result["recommendation"] += f" (승률={backtest_winrate:.1f}% 미흡)"
if backtest_mdd is not None and backtest_mdd < -25:
result["confidence_score"] *= 0.7
result["should_replace"] = True
result["recommendation"] += f" ⚠️ MDD={backtest_mdd:.1f}% 과다"
result["confidence_score"] = round(max(0.0, min(1.0, result["confidence_score"])), 3)
return result
@staticmethod
def get_regime_label(kospi_price: float) -> str:
"""간략 레짐 라벨 반환 (로그/UI 표시용)"""
if kospi_price >= 5000:
return f"BULL_EXTREME({kospi_price:.0f})"
elif kospi_price >= 3500:
return f"BULL_STRONG({kospi_price:.0f})"
elif kospi_price >= 2500:
return f"BULL_NORMAL({kospi_price:.0f})"
elif kospi_price >= 2000:
return f"SIDEWAYS({kospi_price:.0f})"
elif kospi_price >= 1500:
return f"BEAR_MILD({kospi_price:.0f})"
return f"BEAR_SEVERE({kospi_price:.0f})"

View File

@@ -0,0 +1,348 @@
"""
모델 검증 시스템 (Market-Regime Aware Model Validator)
- 백테스트 기반 현재 LSTM v3 성능 검증
- 코스피 레짐별 모델 적합성 평가
- 코스피 6300 강세장 시나리오 대응 점검
- 모델 교체 권고 보고서 생성
사용법:
validator = ModelValidator()
report = validator.validate(ticker, ohlcv_data, strategy_fn, kospi_price=2600)
print(report.summary())
validator.send_alert(report) # 텔레그램 알림 (심각한 경우만)
"""
import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional, List
from modules.config import Config
from modules.analysis.backtest import Backtester, BacktestResult
from modules.analysis.market_regime import MarketRegimeDetector, MarketRegime, RegimeAnalysis
# 모델 적합성 최소 기준
_MIN_SHARPE = 0.5
_MIN_WIN_RATE = 50.0 # %
_MAX_MDD = -20.0 # % (초과 시 문제)
_MIN_PROFIT_FACTOR = 1.2
_CACHE_TTL_SECONDS = 86400 # 24시간
@dataclass
class ValidationReport:
"""모델 검증 보고서"""
ticker: str
kospi_level: float
regime: str
regime_description: str
backtest_result: Optional[BacktestResult]
model_suitable: bool
suitability_score: float # 0~1
issues: List[str] = field(default_factory=list)
recommendations: List[str] = field(default_factory=list)
alternative_models: List[str] = field(default_factory=list)
regime_strategy_hint: str = ""
risk_level: str = "LOW"
def summary(self) -> str:
lines = [
"=" * 55,
f"🔍 모델 검증 보고서 [{self.ticker}]",
"=" * 55,
f"코스피 수준 : {self.kospi_level:.0f} ({self.regime_description})",
f"시장 레짐 : {self.regime} [리스크: {self.risk_level}]",
f"모델 적합성 : {'✅ 적합' if self.model_suitable else '⚠️ 부적합'} "
f"({self.suitability_score:.0%})",
]
if self.backtest_result:
bt = self.backtest_result
lines += [
"",
"📊 백테스트 성과",
f" 총 수익률 : {bt.total_return_pct:+.2f}%",
f" Sharpe Ratio : {bt.sharpe_ratio:.3f}",
f" Max Drawdown : {bt.max_drawdown_pct:.2f}%",
f" 승률 : {bt.win_rate:.1f}% ({bt.winning_trades}/{bt.total_trades})",
f" 손익비(PF) : {bt.profit_factor:.2f}",
]
if self.issues:
lines.append("")
lines.append(f"⚠️ 발견된 문제 ({len(self.issues)}건)")
for issue in self.issues:
lines.append(f" - {issue}")
if self.recommendations:
lines.append("")
lines.append("💡 권고사항")
for rec in self.recommendations:
lines.append(f"{rec}")
if self.alternative_models:
lines.append("")
lines.append("🔄 대안 모델 목록")
for model in self.alternative_models:
lines.append(f"{model}")
if self.regime_strategy_hint:
lines.append("")
lines.append(f"📌 레짐 전략: {self.regime_strategy_hint}")
lines.append("=" * 55)
return "\n".join(lines)
def is_critical(self) -> bool:
"""즉각적인 조치가 필요한 수준인지 (텔레그램 알림 기준)"""
if not self.model_suitable and self.suitability_score < 0.4:
return True
if self.backtest_result and self.backtest_result.sharpe_ratio < 0:
return True
if self.backtest_result and self.backtest_result.max_drawdown_pct < -30:
return True
return False
class ModelValidator:
"""
LSTM v3 모델 검증기
검증 흐름:
1. 시장 레짐 감지 (코스피 수준)
2. 백테스트 실행 (선택)
3. 레짐별 모델 적합성 평가
4. 종합 보고서 생성
5. 심각한 경우 텔레그램 알림
"""
_CACHE_FILE = "model_validation_cache.json"
def __init__(self):
self._cache_path = os.path.join(Config.DATA_DIR, self._CACHE_FILE)
self._cache: dict = self._load_cache()
def _load_cache(self) -> dict:
if os.path.exists(self._cache_path):
try:
with open(self._cache_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {}
def _save_cache(self):
try:
with open(self._cache_path, "w", encoding="utf-8") as f:
json.dump(self._cache, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[Validator] 캐시 저장 실패: {e}")
def validate(
self,
ticker: str,
ohlcv_data: dict,
strategy_fn=None,
kospi_price: float = 2500.0,
kospi_change_pct: float = 0.0,
run_backtest: bool = True,
) -> ValidationReport:
"""
모델 검증 실행
Args:
ticker: 종목 코드
ohlcv_data: OHLCV 딕셔너리
strategy_fn: 백테스트용 전략 함수 (None이면 백테스트 생략)
kospi_price: 현재 코스피 지수
kospi_change_pct: 코스피 당일 등락률
run_backtest: 백테스트 실행 여부
Returns:
ValidationReport
"""
issues: List[str] = []
recommendations: List[str] = []
# ── 1. 시장 레짐 감지 ────────────────────────────────
regime_analysis: RegimeAnalysis = MarketRegimeDetector.detect(
kospi_price, kospi_change_pct
)
# ── 2. 백테스트 (선택) ───────────────────────────────
backtest_result: Optional[BacktestResult] = None
if run_backtest and strategy_fn is not None:
try:
backtester = Backtester()
backtest_result = backtester.run(ohlcv_data, strategy_fn, ticker)
except Exception as e:
issues.append(f"백테스트 실행 오류: {e}")
# ── 3. 백테스트 결과 기준 위반 체크 ─────────────────
bt_sharpe = backtest_result.sharpe_ratio if backtest_result else None
bt_winrate = backtest_result.win_rate if backtest_result else None
bt_mdd = backtest_result.max_drawdown_pct if backtest_result else None
bt_pf = backtest_result.profit_factor if backtest_result else None
if backtest_result:
if bt_sharpe < _MIN_SHARPE:
issues.append(
f"Sharpe Ratio 미흡: {bt_sharpe:.3f} (최소 {_MIN_SHARPE})"
)
recommendations.append("LSTM 피처 확장 또는 모델 아키텍처 재검토")
if bt_winrate < _MIN_WIN_RATE:
issues.append(
f"승률 미흡: {bt_winrate:.1f}% (최소 {_MIN_WIN_RATE:.0f}%)"
)
recommendations.append("매수 진입 임계값 상향 조정 (+0.05)")
if bt_mdd < _MAX_MDD:
issues.append(
f"MDD 과다: {bt_mdd:.2f}% (허용 {_MAX_MDD:.0f}%)"
)
recommendations.append("ATR 손절 배수 축소 (ATR×2 → ATR×1.5)")
if bt_pf < _MIN_PROFIT_FACTOR:
issues.append(
f"손익비 미흡: {bt_pf:.2f} (최소 {_MIN_PROFIT_FACTOR})"
)
recommendations.append("익절 배수 확대 (ATR×3 → ATR×4)")
# ── 4. 레짐 기반 모델 적합성 평가 ───────────────────
regime_validation = MarketRegimeDetector.validate_model_for_regime(
regime_analysis.regime,
backtest_sharpe=bt_sharpe,
backtest_winrate=bt_winrate,
backtest_mdd=bt_mdd,
)
if not regime_validation["is_suitable"]:
issues.append(
f"레짐 부적합: {regime_analysis.regime.value} 환경에서 "
f"LSTM v3 한계 감지"
)
recommendations.append(regime_validation["recommendation"])
# 코스피 6300 특별 경고
if kospi_price >= 5000:
issues.append(
f"⚠️ 코스피 {kospi_price:.0f} - 역사적 극고점 수준 "
"LSTM 비선형 패턴 포착 한계 주의"
)
recommendations.append(
"Temporal Fusion Transformer(TFT) 또는 Mamba 모델 전환 검토"
)
# ── 5. 종합 적합성 점수 ──────────────────────────────
suitability_score = regime_validation["confidence_score"]
# 문제 건수에 따라 감점 (건당 10%, 최대 50% 감점)
penalty = min(len(issues) * 0.10, 0.50)
suitability_score = max(0.0, suitability_score - penalty)
suitability_score = round(suitability_score, 3)
# ── 6. 보고서 생성 ───────────────────────────────────
report = ValidationReport(
ticker=ticker,
kospi_level=kospi_price,
regime=regime_analysis.regime.value,
regime_description=regime_analysis.description,
backtest_result=backtest_result,
model_suitable=(suitability_score >= 0.5 and not regime_validation["should_replace"]),
suitability_score=suitability_score,
issues=issues,
recommendations=list(set(recommendations)), # 중복 제거
alternative_models=regime_validation.get("alternative_models", []),
regime_strategy_hint=regime_analysis.recommended_strategy,
risk_level=regime_analysis.risk_level,
)
# ── 7. 캐시 저장 ─────────────────────────────────────
self._cache[ticker] = {
"timestamp": time.time(),
"kospi_level": kospi_price,
"regime": regime_analysis.regime.value,
"suitability_score": suitability_score,
"should_replace": regime_validation["should_replace"],
"issue_count": len(issues),
}
self._save_cache()
return report
def get_cached(self, ticker: str) -> Optional[dict]:
"""캐시된 검증 결과 반환 (24시간 이내)"""
cached = self._cache.get(ticker)
if not cached:
return None
if time.time() - cached.get("timestamp", 0) > _CACHE_TTL_SECONDS:
return None
return cached
def send_alert(self, report: ValidationReport):
"""심각한 검증 결과 텔레그램 알림"""
if not report.is_critical():
return
try:
from modules.services.telegram import TelegramMessenger
msg = (
f"🚨 [모델 경고] {report.ticker}\n"
f"코스피 {report.kospi_level:.0f} | 레짐: {report.regime}\n"
f"적합성: {report.suitability_score:.0%}\n"
)
if report.issues:
msg += "문제:\n" + "\n".join(f"{i}" for i in report.issues[:3])
if report.alternative_models:
msg += f"\n권고 모델: {report.alternative_models[0]}"
TelegramMessenger().send_message(msg)
except Exception:
pass
def generate_regime_report(self, kospi_price: float) -> str:
"""코스피 수준만으로 빠른 레짐 보고서 생성 (백테스트 없음)"""
regime_analysis = MarketRegimeDetector.detect(kospi_price)
validation = MarketRegimeDetector.validate_model_for_regime(regime_analysis.regime)
lines = [
"=" * 55,
f"📈 코스피 {kospi_price:.0f} 레짐 분석",
"=" * 55,
f"레짐 : {regime_analysis.regime.value}",
f"설명 : {regime_analysis.description}",
f"리스크 수준 : {regime_analysis.risk_level}",
"",
"─ 전략 파라미터 조정 ─",
f"매수 임계값 : {'+' if regime_analysis.buy_threshold_adj >= 0 else ''}"
f"{regime_analysis.buy_threshold_adj:+.2f} 조정",
f"포지션 크기 : x{regime_analysis.position_size_adj:.2f}",
f"LSTM 가중치 : {'+' if regime_analysis.lstm_weight_adj >= 0 else ''}"
f"{regime_analysis.lstm_weight_adj:+.2f}",
"",
"─ 모델 평가 ─",
f"현재 모델 적합: {'' if validation['is_suitable'] else '⚠️'} "
f"(신뢰도 {validation['confidence_score']:.0%})",
f"교체 필요 : {'' if validation['should_replace'] else '아니오'}",
f"권고사항 : {validation['recommendation']}",
]
if validation["alternative_models"]:
lines.append("")
lines.append("대안 모델 목록:")
for model in validation["alternative_models"]:
lines.append(f"{model}")
lines.append("")
lines.append(f"📌 전략: {regime_analysis.recommended_strategy}")
lines.append("=" * 55)
return "\n".join(lines)
# 전역 싱글톤
_validator_instance: Optional[ModelValidator] = None
def get_validator() -> ModelValidator:
"""ModelValidator 싱글톤 반환"""
global _validator_instance
if _validator_instance is None:
_validator_instance = ModelValidator()
return _validator_instance

View File

@@ -0,0 +1,511 @@
import pandas as pd
import numpy as np
class TechnicalAnalyzer:
"""
Pandas를 활용한 기술적 지표 계산 모듈
CPU 멀티코어 성능(9800X3D)을 십분 활용하기 위해 복잡한 연산은 여기서 처리
[v2.0 개선사항]
- ATR(Average True Range): 변동성 기반 동적 손절/익절 산출
- ADX(Average Directional Index): 추세 강도 측정 (방향 아닌 '강도')
- OBV(On Balance Volume): 거래량 기반 매집/분산 감지
- 다중 시간프레임(MTF): 5일/20일/60일 추세 일관성 확인
- VWAP 근사: 거래량가중평균가격
"""
@staticmethod
def calculate_rsi(prices, period=14):
"""RSI(Relative Strength Index) 계산 - Wilder 방식 적용"""
if len(prices) < period:
return 50.0
delta = pd.Series(prices).diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
# Wilder의 지수이동평균 방식 (더 정확)
avg_gain = gain.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
rs = avg_gain / (avg_loss + 1e-9)
rsi = 100 - (100 / (1 + rs))
return rsi.iloc[-1]
@staticmethod
def calculate_atr(prices, high_prices=None, low_prices=None, period=14):
"""ATR(Average True Range) 계산 - 동적 손절/익절의 핵심 지표
Returns:
float: ATR 값 (가격 단위), 0이면 데이터 부족
"""
if len(prices) < period + 1:
return 0.0
close = pd.Series(prices)
if high_prices and len(high_prices) == len(prices):
high = pd.Series(high_prices)
low = pd.Series(low_prices)
else:
# 고가/저가 없으면 종가 기반 추정 (일변동폭 1.5% 가정)
high = close * 1.008
low = close * 0.992
# True Range = max(H-L, |H-Cprev|, |L-Cprev|)
prev_close = close.shift(1)
tr1 = high - low
tr2 = (high - prev_close).abs()
tr3 = (low - prev_close).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
# Wilder's smoothing
atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
return atr.iloc[-1] if not pd.isna(atr.iloc[-1]) else 0.0
@staticmethod
def calculate_adx(prices, high_prices=None, low_prices=None, period=14):
"""ADX(Average Directional Index) - 추세 강도 측정
Returns:
tuple: (adx, plus_di, minus_di)
- ADX > 25: 강한 추세, ADX < 20: 횡보/비추세
- +DI > -DI: 상승 추세, -DI > +DI: 하락 추세
"""
if len(prices) < period * 2:
return 20.0, 50.0, 50.0 # 중립
close = pd.Series(prices)
if high_prices and len(high_prices) == len(prices):
high = pd.Series(high_prices)
low = pd.Series(low_prices)
else:
# 종가 기반 추정
daily_range = close.pct_change().abs().rolling(5).mean().fillna(0.01) * close
high = close + daily_range * 0.5
low = close - daily_range * 0.5
# +DM, -DM
plus_dm = high.diff()
minus_dm = -low.diff()
plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0)
minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0)
# ATR
prev_close = close.shift(1)
tr = pd.concat([
high - low,
(high - prev_close).abs(),
(low - prev_close).abs()
], axis=1).max(axis=1)
atr = tr.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
# +DI, -DI
plus_di = 100 * plus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
minus_di = 100 * minus_dm.ewm(alpha=1/period, min_periods=period, adjust=False).mean() / (atr + 1e-9)
# DX → ADX
dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
adx = dx.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
return (
adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0,
plus_di.iloc[-1] if not pd.isna(plus_di.iloc[-1]) else 50.0,
minus_di.iloc[-1] if not pd.isna(minus_di.iloc[-1]) else 50.0
)
@staticmethod
def calculate_obv(prices, volume_history):
"""OBV(On Balance Volume) - 스마트머니 매집/분산 감지
Returns:
dict: {
'obv_trend': 'ACCUMULATING' | 'DISTRIBUTING' | 'NEUTRAL',
'obv_divergence': True/False (가격↑ but OBV↓ = 약세 다이버전스)
}
"""
if not volume_history or len(volume_history) < 20 or len(prices) < 20:
return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
close = pd.Series(prices)
volume = pd.Series(volume_history)
# OBV 계산
direction = close.diff().apply(lambda x: 1 if x > 0 else (-1 if x < 0 else 0))
obv = (direction * volume).cumsum()
# OBV 추세 (20일 이동평균 대비)
obv_ma = obv.rolling(20).mean()
obv_current = obv.iloc[-1]
obv_ma_current = obv_ma.iloc[-1]
if pd.isna(obv_ma_current):
return {'obv_trend': 'NEUTRAL', 'obv_divergence': False, 'score': 0.0}
# 추세 판단
obv_trend = 'NEUTRAL'
score = 0.0
if obv_current > obv_ma_current * 1.05:
obv_trend = 'ACCUMULATING' # 매집 중
score = 0.1
elif obv_current < obv_ma_current * 0.95:
obv_trend = 'DISTRIBUTING' # 분산 중
score = -0.1
# 다이버전스 감지 (최근 10일)
price_trend = close.iloc[-1] > close.iloc[-10] if len(close) >= 10 else False
obv_price_trend = obv.iloc[-1] > obv.iloc[-10] if len(obv) >= 10 else False
divergence = False
if price_trend and not obv_price_trend:
divergence = True # 약세 다이버전스 (가격↑ OBV↓)
score -= 0.05
elif not price_trend and obv_price_trend:
divergence = True # 강세 다이버전스 (가격↓ OBV↑)
score += 0.05
return {
'obv_trend': obv_trend,
'obv_divergence': divergence,
'score': round(score, 3)
}
@staticmethod
def get_multi_timeframe_trend(prices):
"""다중 시간프레임 추세 일관성 검사
5일(초단기), 20일(단기), 60일(중기) 추세가 일치하면 강한 신호
Returns:
dict: {
'alignment': 'STRONG_BULL' | 'BULL' | 'NEUTRAL' | 'BEAR' | 'STRONG_BEAR',
'score': -1.0 ~ 1.0,
'details': {...}
}
"""
if len(prices) < 60:
return {'alignment': 'NEUTRAL', 'score': 0.0, 'details': {}}
p = pd.Series(prices)
current = p.iloc[-1]
ma5 = p.rolling(5).mean().iloc[-1]
ma20 = p.rolling(20).mean().iloc[-1]
ma60 = p.rolling(60).mean().iloc[-1]
# 추세 방향 점수
trends = []
if current > ma5: trends.append(1)
else: trends.append(-1)
if ma5 > ma20: trends.append(1)
else: trends.append(-1)
if ma20 > ma60: trends.append(1)
else: trends.append(-1)
total = sum(trends)
if total == 3:
alignment = 'STRONG_BULL'
score = 0.15
elif total >= 1:
alignment = 'BULL'
score = 0.05
elif total == -3:
alignment = 'STRONG_BEAR'
score = -0.15
elif total <= -1:
alignment = 'BEAR'
score = -0.05
else:
alignment = 'NEUTRAL'
score = 0.0
return {
'alignment': alignment,
'score': score,
'details': {
'ma5': round(ma5, 1),
'ma20': round(ma20, 1),
'ma60': round(ma60, 1),
'price_vs_ma5': 'above' if current > ma5 else 'below'
}
}
@staticmethod
def calculate_dynamic_sl_tp(prices, high_prices=None, low_prices=None, atr_multiplier_sl=2.0, atr_multiplier_tp=3.0):
"""ATR 기반 동적 손절/익절 계산
변동성에 맞는 적응형 손절/익절 라인 산출
- 변동성 큰 종목: 넓은 손절폭 (whipsaw 방지)
- 변동성 작은 종목: 좁은 손절폭 (빠른 리스크 관리)
Returns:
dict: {
'atr': ATR값,
'atr_pct': ATR% (가격 대비),
'stop_loss_pct': 손절 비율 (%),
'take_profit_pct': 익절 비율 (%),
'trailing_stop_pct': 트레일링 스탑 비율 (%)
}
"""
if len(prices) < 15:
return {
'atr': 0, 'atr_pct': 0,
'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
'trailing_stop_pct': 3.0
}
atr = TechnicalAnalyzer.calculate_atr(prices, high_prices, low_prices)
current_price = prices[-1]
if current_price <= 0:
return {
'atr': 0, 'atr_pct': 0,
'stop_loss_pct': -5.0, 'take_profit_pct': 8.0,
'trailing_stop_pct': 3.0
}
atr_pct = (atr / current_price) * 100
# 동적 손절: ATR x 2 (단, 최소 -3%, 최대 -10%)
sl_pct = max(-10.0, min(-3.0, -atr_pct * atr_multiplier_sl))
# 동적 익절: ATR x 3 (단, 최소 +5%, 최대 +25%)
tp_pct = max(5.0, min(25.0, atr_pct * atr_multiplier_tp))
# 트레일링 스탑: ATR x 1.5 (최고가 대비)
trailing_pct = max(2.0, min(8.0, atr_pct * 1.5))
return {
'atr': round(atr, 1),
'atr_pct': round(atr_pct, 2),
'stop_loss_pct': round(sl_pct, 2),
'take_profit_pct': round(tp_pct, 2),
'trailing_stop_pct': round(trailing_pct, 2)
}
@staticmethod
def calculate_ma(prices, period=20):
"""이동평균선(Moving Average) 계산"""
if len(prices) < period:
return prices[-1] if prices else 0
return pd.Series(prices).rolling(window=period).mean().iloc[-1]
@staticmethod
def calculate_macd(prices, fast=12, slow=26, signal=9):
"""MACD (Moving Average Convergence Divergence) 계산"""
if len(prices) < slow + signal:
return 0, 0, 0 # 데이터 부족
s = pd.Series(prices)
ema_fast = s.ewm(span=fast, adjust=False).mean()
ema_slow = s.ewm(span=slow, adjust=False).mean()
macd = ema_fast - ema_slow
signal_line = macd.ewm(span=signal, adjust=False).mean()
histogram = macd - signal_line
return macd.iloc[-1], signal_line.iloc[-1], histogram.iloc[-1]
@staticmethod
def calculate_bollinger_bands(prices, period=20, num_std=2):
"""Bollinger Bands 계산 (상단, 중단, 하단)"""
if len(prices) < period:
return 0, 0, 0
s = pd.Series(prices)
sma = s.rolling(window=period).mean()
std = s.rolling(window=period).std()
upper = sma + (std * num_std)
lower = sma - (std * num_std)
return upper.iloc[-1], sma.iloc[-1], lower.iloc[-1]
@staticmethod
def calculate_stochastic(prices, high_prices=None, low_prices=None, n=14, k=3, d=3):
"""Stochastic Oscillator (Fast/Slow)
고가/저가 데이터가 없으면 종가(prices)로 추정 계산
"""
if len(prices) < n:
return 50, 50
close = pd.Series(prices)
# 고가/저가 데이터가 별도로 없으면 종가로 대체 (정확도는 떨어짐)
high = pd.Series(high_prices) if high_prices else close
low = pd.Series(low_prices) if low_prices else close
# 최근 n일간 최고가/최저가
highest_high = high.rolling(window=n).max()
lowest_low = low.rolling(window=n).min()
# Fast %K
fast_k = ((close - lowest_low) / (highest_high - lowest_low + 1e-9)) * 100
# Slow %K (= Fast %D)
slow_k = fast_k.rolling(window=k).mean()
# Slow %D
slow_d = slow_k.rolling(window=d).mean()
return slow_k.iloc[-1], slow_d.iloc[-1]
@staticmethod
def get_technical_score(current_price, prices_history, volume_history=None):
"""
기술적 지표 통합 점수(0.0 ~ 1.0) 계산 (v2.0 고도화)
[v2.0 변경점]
- RSI: 25% (30% → 25%, ADX에 비중 이전)
- 이격도: 15% (20% → 15%)
- MACD: 15% (20% → 15%)
- Bollinger: 10% (15% → 10%)
- Stochastic: 10% (15% → 10%)
- ADX 추세강도: 15% (신규)
- MTF 다중시간프레임: 10% (신규)
- OBV/거래량 보너스: ±0.1 (보너스)
"""
if not prices_history or len(prices_history) < 30:
return 0.5, 50.0, 0.0, 1.0, {"ma20": 0, "ma114": 0, "trend": "Unknown", "position": "Unknown"}
scores = []
# 1. RSI (비중 25%)
rsi = TechnicalAnalyzer.calculate_rsi(prices_history)
if rsi <= 30: rsi_score = 1.0
elif rsi >= 70: rsi_score = 0.0
else: rsi_score = 1.0 - ((rsi - 30) / 40.0)
scores.append(rsi_score * 0.25)
# 2. 이격도 (비중 15%)
ma20 = TechnicalAnalyzer.calculate_ma(prices_history, 20)
disparity = (current_price - ma20) / (ma20 + 1e-9)
if disparity < -0.05: disp_score = 1.0
elif disparity > 0.05: disp_score = 0.0
else: disp_score = 0.5 - (disparity * 10)
scores.append(disp_score * 0.15)
# 3. MACD (비중 15%)
macd, signal, hist = TechnicalAnalyzer.calculate_macd(prices_history)
if hist > 0 and macd > 0: macd_score = 0.8
elif hist > 0 and macd <= 0: macd_score = 0.65 # 골든크로스 초기 = 매수 기회
elif hist < 0 and macd > 0: macd_score = 0.35 # 데드크로스 초기
else: macd_score = 0.2
scores.append(macd_score * 0.15)
# 4. Bollinger Bands (비중 10%)
up, mid, low = TechnicalAnalyzer.calculate_bollinger_bands(prices_history)
if current_price <= low:
bb_score = 1.0
elif current_price >= up:
bb_score = 0.0
else:
pos = (current_price - low) / (up - low + 1e-9)
bb_score = 1.0 - pos
if current_price < low:
bb_score = min(1.0, bb_score + 0.2)
scores.append(bb_score * 0.10)
# 5. Stochastic (비중 10%)
slow_k, slow_d = TechnicalAnalyzer.calculate_stochastic(prices_history)
if slow_k < 20: st_score = 1.0
elif slow_k > 80: st_score = 0.0
else: st_score = 1.0 - (slow_k / 100.0)
# 골든/데드크로스 보정
if slow_k < 20 and slow_k > slow_d: # 과매도 영역에서 골든크로스
st_score = min(1.0, st_score + 0.15)
elif slow_k > 80 and slow_k < slow_d: # 과매수 영역에서 데드크로스
st_score = max(0.0, st_score - 0.15)
scores.append(st_score * 0.10)
# 6. [신규] ADX 추세 강도 (비중 15%)
adx, plus_di, minus_di = TechnicalAnalyzer.calculate_adx(prices_history)
if adx >= 25: # 강한 추세
if plus_di > minus_di:
adx_score = 0.8 + min(0.2, (adx - 25) / 50) # 강한 상승추세
else:
adx_score = 0.2 - min(0.2, (adx - 25) / 50) # 강한 하락추세
else: # 비추세/횡보
adx_score = 0.5 # 중립
adx_score = max(0.0, min(1.0, adx_score))
scores.append(adx_score * 0.15)
# 7. [신규] 다중 시간프레임 (비중 10%)
mtf = TechnicalAnalyzer.get_multi_timeframe_trend(prices_history)
# MTF score를 0~1 범위로 변환
mtf_score = 0.5 + mtf['score'] # -0.15~+0.15 → 0.35~0.65
mtf_score = max(0.0, min(1.0, mtf_score))
scores.append(mtf_score * 0.10)
total_score = sum(scores)
# [보너스] 거래량 분석 (Whale Tracking + OBV)
volume_ratio = 1.0
if volume_history and len(volume_history) >= 5:
vol_s = pd.Series(volume_history)
avg_vol = vol_s.rolling(window=5).mean().iloc[-2]
current_vol = volume_history[-1]
if avg_vol > 0:
volume_ratio = current_vol / avg_vol
# 거래량 폭증 보너스
if volume_ratio >= 3.0:
total_score += 0.08
# OBV 분석 보너스
obv_result = TechnicalAnalyzer.calculate_obv(prices_history, volume_history)
total_score += obv_result['score']
# MTF 추세 일관성 보너스 (위의 가중치 10% 외에 추가 보너스)
if mtf['alignment'] == 'STRONG_BULL':
total_score += 0.05
elif mtf['alignment'] == 'STRONG_BEAR':
total_score -= 0.05
# 0.0 ~ 1.0 클리핑
total_score = max(0.0, min(1.0, total_score))
# 변동성(Volatility) 계산
if len(prices_history) > 1:
prices_np = np.array(prices_history)
changes = np.diff(prices_np) / prices_np[:-1]
volatility = np.std(changes) * 100
else:
volatility = 0.0
# 이동평균선 분석 (5일, 20일, 60일, 114일)
ma5 = TechnicalAnalyzer.calculate_ma(prices_history, 5)
ma60 = TechnicalAnalyzer.calculate_ma(prices_history, 60)
ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)
ma_trend = "Unknown"
if ma5 > ma20 > ma60:
ma_trend = "Bullish (Golden Alignment)"
elif ma5 < ma20 < ma60:
ma_trend = "Bearish (Dead Alignment)"
elif ma20 > ma114:
ma_trend = "Moderate Bullish"
else:
ma_trend = "Moderate Bearish"
price_pos = "Unknown"
if current_price > ma20:
price_pos = "Above MA20"
else:
price_pos = "Below MA20"
ma_info = {
"ma5": round(ma5, 1),
"ma20": round(ma20, 1),
"ma60": round(ma60, 1),
"ma114": round(ma114, 1),
"trend": ma_trend,
"position": price_pos,
"adx": round(adx, 1),
"adx_trend": "Strong" if adx >= 25 else "Weak/Sideways",
"mtf_alignment": mtf['alignment']
}
return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info