v3.1 과매수 방지, 앙상블 학습, KRX 캘린더 기반 장중 전용 운영 구현

[잔고 관리]
- _today_buy_total 인스턴스 변수로 당일 누적 매수 추적 (KIS T+2 미차감 보완)
- MAX_BUY_PER_CYCLE, MAX_DAILY_BUY_RATIO 설정 추가
- available_deposit = max_daily_buy - effective_today_buy 계산

[앙상블 & 포지션 사이징]
- AdaptiveEnsemble 실제 연동 (하드코딩 가중치 제거)
- Kelly Criterion Half-Kelly 포지션 비중 계산
- SignalWeights.normalize() Water-Filling 알고리즘으로 경계 위반 해결
- _accuracy_weighted() 크기 가중 정확도로 통일
- ensemble_weights.json → ensemble_history.json 통합

[LLM 클라이언트]
- GeminiLLMClient 추가 (Gemini → Ollama 폴백 체인)
- _class_last_call_ts 클래스 변수로 워커 재시작 후에도 스로틀 유지
- Ollama 미실행 조기 감지 및 명확한 오류 메시지

[KIS API]
- 모든 requests.get/post에 timeout=Config.HTTP_TIMEOUT 적용
- get_balance()에 today_buy_amt 필드 추가

[장중 전용 운영]
- KRXCalendar: exchange_calendars 기반, 2024~2026 공휴일 하드코딩 폴백
- EOD 셧다운: 15:35에 전체 상태 저장 후 서버 자동 종료
- Watchdog: .eod_date 마커로 EOD 후 재시작 차단
- daily_launcher.py: 매일 08:30 실행, 휴장일 감지 후 봇 미시작
- Windows 작업 스케줄러 WebAI_DailyLauncher 등록

[텔레그램 스킬 수정]
- PYTHONIOENCODING=utf-8 서브프로세스 환경 설정 (cp949 이모지 오류 해결)
- /regime: IPC macro_indices 파싱 구현, --json 모드 input() 블로킹 제거
- /weights: ensemble_history.json 형식 파싱 업데이트
- /model_health: glob 패턴 *_v3.pt 수정
- /postmortem: 거래 없을 때 빈 JSON 출력으로 Telegram 오류 해결
- /macro: price=0 시 prev_close 폴백 표시

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-29 05:21:23 +09:00
parent 760d1906ed
commit 0aebca7ff0
17 changed files with 3816 additions and 200 deletions

View File

@@ -0,0 +1,445 @@
"""
AI 전문가 회의 시스템 (Multi-Agent Council)
- 4명의 전문가 에이전트가 독립 분석 후 의장 AI가 최종 결정
- 코스피 레짐 기반 모델 교체 권고
- process.py 분석 결과를 입력받아 심층 검토 수행
흐름:
전문가 1~4 (각 역할별 Ollama 호출)
의장 AI (전문가 의견 취합 + 최종 결정 + 모델 건전성 평가)
CouncilDecision (결정 + 모델 교체 권고 + 회의록)
"""
import json
import time
from dataclasses import dataclass, field
from typing import Optional, List, Any
from modules.analysis.market_regime import MarketRegimeDetector, MarketRegime, RegimeAnalysis
@dataclass
class ExpertOpinion:
"""개별 전문가 의견"""
expert_name: str
role: str
decision: str # BUY / SELL / HOLD
confidence: float # 0~1
reasoning: str
key_concern: str
model_feedback: str # 현재 AI 모델 적합성 평가
@dataclass
class CouncilDecision:
"""회의 최종 결정"""
final_decision: str # BUY / SELL / HOLD
consensus_score: float # 0~1 (1 = 만장일치)
confidence: float # 0~1
majority_reasoning: str # 주요 결정 근거
dissenting_views: str # 소수 의견
model_health_score: float # 0~1 (현재 모델 신뢰도)
model_replacement_recommended: bool # 모델 교체 필요 여부
recommended_model: str # 교체 권고 모델명
council_summary: str # 회의 전체 요약
expert_opinions: List[dict] = field(default_factory=list)
# 전문가 페르소나 정의
_EXPERTS = [
{
"name": "기술분석가",
"role": "technical",
"persona": (
"20년 경력의 코스피 전문 기술분석가. "
"RSI, MACD, 볼린저밴드, 추세선, 거래량 분석을 주로 사용. "
"단기 가격 모멘텀과 지지/저항 구간을 중시함."
),
"focus": (
"RSI 과매수/과매도, 볼린저밴드 위치, ADX 추세 강도, "
"거래량 급증, 멀티타임프레임 정렬 여부를 핵심 근거로 사용하세요."
),
},
{
"name": "퀀트전문가",
"role": "quant",
"persona": (
"AI/ML 기반 퀀트 투자 전문가. "
"LSTM 예측 신뢰도, 통계적 유의성, 백테스트 성과를 중시. "
"모델의 현재 시장 환경 적합성을 항상 평가함."
),
"focus": (
"LSTM 신뢰도와 예측 방향을 중심으로 분석하세요. "
"현재 코스피 레짐에서 LSTM v3 모델이 적합한지 반드시 평가하고, "
"더 나은 대안 모델이 있으면 구체적으로 제안하세요."
),
},
{
"name": "리스크관리자",
"role": "risk",
"persona": (
"글로벌 헤지펀드 리스크 관리 전문가. "
"포지션 사이징, 최대 낙폭(MDD), VaR, 손절 기준을 최우선으로 고려. "
"수익보다 손실 방어를 먼저 생각함."
),
"focus": (
"변동성 대비 포지션 크기 적절성, 손절 기준 타당성, "
"현재 보유 중이라면 추가 하락 리스크를 집중 평가하세요."
),
},
{
"name": "거시경제분석가",
"role": "macro",
"persona": (
"글로벌 매크로 및 한국 증시 전문가. "
"코스피 지수 수준, 원/달러 환율, 미국 금리, 외국인 수급을 중시. "
"현재 시장이 역사적으로 어떤 위치인지 판단함."
),
"focus": (
"코스피 지수 현재 수준이 역사적으로 어떤 의미인지, "
"이 가격대에서 매수/보유가 타당한지 거시경제 관점에서 평가하세요."
),
},
]
def _build_expert_prompt(expert: dict, ticker: str, data: dict) -> str:
"""전문가 역할에 맞는 분석 프롬프트 생성"""
kospi = data.get("kospi_price", 2500)
regime_label = MarketRegimeDetector.get_regime_label(kospi)
base = (
f"종목: {ticker} | 현재가: {data.get('current_price', 0):,.0f}\n"
f"코스피: {kospi:.0f} [{regime_label}]\n"
f"시장상태: {data.get('macro_state', 'SAFE')}\n"
f"---기술지표---\n"
f"기술점수: {data.get('tech_score', 0.5):.3f} | "
f"RSI: {data.get('rsi', 50):.1f} | ADX: {data.get('adx', 20):.1f}\n"
f"변동성: {data.get('volatility', 2.0):.2f}% | BB위치: {data.get('bb_zone', '중간')}\n"
f"MTF정렬: {data.get('mtf_alignment', 'N/A')}\n"
f"---AI모델---\n"
f"LSTM예측: {data.get('lstm_predicted', 0):,.0f}"
f"(변화율: {data.get('lstm_change_rate', 0):+.2f}%)\n"
f"LSTM신뢰도: {data.get('ai_confidence', 0.5):.2f} | "
f"LSTM점수: {data.get('lstm_score', 0.5):.3f}\n"
f"---수급/감성---\n"
f"감성점수: {data.get('sentiment_score', 0.5):.3f} | "
f"수급점수: {data.get('investor_score', 0):.3f}\n"
f"외인순매수: {data.get('frgn_net_buy', 0):+,} "
f"({data.get('consecutive_frgn_buy', 0)}일 연속)\n"
f"---포지션---\n"
f"보유중: {data.get('is_holding', False)} | "
f"보유수익률: {data.get('holding_yield', 0):+.2f}%\n"
f"통합점수: {data.get('total_score', 0.5):.3f}\n"
)
role_addition = (
f"\n당신은 {expert['persona']}\n"
f"분석 초점: {expert['focus']}\n"
)
output_format = (
"\n반드시 아래 JSON 형식으로만 응답하세요 (다른 텍스트 금지):\n"
"{\n"
' "decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "confidence": 0.0~1.0,\n'
' "reasoning": "주요 판단 근거 (1~2문장, 한국어)",\n'
' "key_concern": "가장 우려되는 리스크 (1문장, 한국어)",\n'
' "model_feedback": "현재 LSTM v3 모델이 이 시장 환경에서 적합한지 평가 (1문장)"\n'
"}"
)
return base + role_addition + output_format
def _build_chairman_prompt(
ticker: str,
opinions: List[ExpertOpinion],
data: dict,
regime: RegimeAnalysis,
) -> str:
"""의장 AI 최종 결정 프롬프트"""
opinions_text = "\n".join([
f"[{op.expert_name}] {op.decision} (확신도: {op.confidence:.2f})\n"
f" 근거: {op.reasoning}\n"
f" 우려: {op.key_concern}\n"
f" 모델평가: {op.model_feedback}"
for op in opinions
])
votes = [op.decision for op in opinions]
buy_n = votes.count("BUY")
sell_n = votes.count("SELL")
hold_n = votes.count("HOLD")
avg_conf = sum(op.confidence for op in opinions) / max(len(opinions), 1)
return (
"당신은 AI 투자 전문가 회의를 주재하는 의장입니다.\n\n"
f"=== 종목: {ticker} ===\n"
f"현재가: {data.get('current_price', 0):,.0f}원 | "
f"코스피: {data.get('kospi_price', 2500):.0f}\n"
f"시장 레짐: {regime.regime.value} ({regime.description})\n"
f"레짐 권고: {regime.model_recommendation}\n\n"
f"=== 전문가 의견 ===\n{opinions_text}\n\n"
f"=== 투표: 매수 {buy_n} / 매도 {sell_n} / 보유 {hold_n} "
f"(평균 확신도: {avg_conf:.2f}) ===\n\n"
"당신의 임무:\n"
"1. 4명 의견을 종합하여 최종 매매 결정\n"
f"2. LSTM v3 모델이 코스피 {data.get('kospi_price', 2500):.0f} 레짐에서 적합한지 평가\n"
"3. 필요 시 대안 모델 구체적으로 권고\n\n"
"반드시 아래 JSON 형식으로만 응답하세요:\n"
"{\n"
' "final_decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "consensus_score": 0.0~1.0,\n'
' "confidence": 0.0~1.0,\n'
' "majority_reasoning": "최종 결정 근거 2~3문장 (한국어)",\n'
' "dissenting_views": "소수 의견 요약 (없으면 빈 문자열)",\n'
' "model_health_score": 0.0~1.0,\n'
' "model_replacement_recommended": true 또는 false,\n'
' "recommended_model": "교체 권고 모델명 (없으면 \'현재 모델 유지\')",\n'
' "council_summary": "회의 전체 요약 3~4문장 (한국어)"\n'
"}"
)
def _parse_json_response(raw: Optional[str]) -> Optional[dict]:
"""LLM 응답에서 JSON 추출 (폴백 포함)"""
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
import re
match = re.search(r'\{[\s\S]*\}', raw)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return None
def _vote_fallback(opinions: List[ExpertOpinion]) -> CouncilDecision:
"""의장 AI 실패 시 단순 다수결 폴백"""
from collections import Counter
if not opinions:
return CouncilDecision(
final_decision="HOLD", consensus_score=0.5, confidence=0.5,
majority_reasoning="분석 데이터 부족", dissenting_views="",
model_health_score=0.5, model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="전문가 의견 수집 실패로 HOLD 처리",
)
votes = [op.decision for op in opinions]
final = Counter(votes).most_common(1)[0][0]
avg_conf = sum(op.confidence for op in opinions) / len(opinions)
vote_counts = Counter(votes)
consensus = vote_counts[final] / len(votes)
return CouncilDecision(
final_decision=final,
consensus_score=round(consensus, 3),
confidence=round(avg_conf, 3),
majority_reasoning=f"전문가 {vote_counts[final]}/{len(votes)} 다수결 결과",
dissenting_views="",
model_health_score=0.5,
model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="의장 AI 오류 - 전문가 투표로 대체",
expert_opinions=[
{"name": op.expert_name, "decision": op.decision,
"confidence": op.confidence, "reasoning": op.reasoning}
for op in opinions
],
)
class AICouncil:
"""
AI 전문가 회의 시스템
사용 방법:
council = AICouncil(llm_client)
decision = council.convene(ticker, analysis_data, regime_analysis)
fast_mode=True 시 전문가 생략, 의장 AI 단독 판단 (속도 약 4배 향상)
llm_client: GeminiLLMClient 또는 OllamaManager (request_inference 인터페이스 공용)
"""
def __init__(self, llm_client: Any = None):
self._ollama = llm_client # 내부 변수명 유지 (하위호환)
def _get_ollama(self) -> Any:
if self._ollama is None:
from modules.services.llm_client import get_llm_client
self._ollama = get_llm_client()
return self._ollama
def _ask_expert(self, expert: dict, ticker: str, data: dict) -> ExpertOpinion:
"""단일 전문가 의견 수집"""
prompt = _build_expert_prompt(expert, ticker, data)
raw = self._get_ollama().request_inference(prompt)
parsed = _parse_json_response(raw)
if parsed:
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision=str(parsed.get("decision", "HOLD")).upper(),
confidence=float(parsed.get("confidence", 0.5)),
reasoning=str(parsed.get("reasoning", "")),
key_concern=str(parsed.get("key_concern", "")),
model_feedback=str(parsed.get("model_feedback", "")),
)
# 파싱 실패 → 중립
print(f"[Council] {expert['name']} 응답 파싱 실패 → HOLD 처리")
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision="HOLD",
confidence=0.5,
reasoning="응답 파싱 실패",
key_concern="",
model_feedback="",
)
def convene(
self,
ticker: str,
analysis_data: dict,
regime_analysis: Optional[RegimeAnalysis] = None,
fast_mode: bool = True,
) -> CouncilDecision:
"""
전문가 회의 소집 및 최종 결정
Args:
ticker: 종목 코드
analysis_data: process.py 분석 결과 딕셔너리
regime_analysis: MarketRegimeDetector.detect() 결과
fast_mode: True=의장 AI 단독(빠름), False=전문가 4명+의장(심층)
Returns:
CouncilDecision
"""
# 레짐 기본값
if regime_analysis is None:
kospi = analysis_data.get("kospi_price", 2500)
regime_analysis = MarketRegimeDetector.detect(kospi)
expert_opinions: List[ExpertOpinion] = []
if not fast_mode:
print(f"[Council] {ticker} - 전문가 회의 시작 (4명)")
for expert in _EXPERTS:
print(f"[Council] {expert['name']} 분석 중...")
opinion = self._ask_expert(expert, ticker, analysis_data)
expert_opinions.append(opinion)
time.sleep(0.3) # Ollama 연속 요청 간격
else:
print(f"[Council] {ticker} - Fast mode (의장 단독)")
# 의장 AI 취합
chairman_prompt = _build_chairman_prompt(
ticker, expert_opinions, analysis_data, regime_analysis
)
raw_chairman = self._get_ollama().request_inference(chairman_prompt)
parsed_chairman = _parse_json_response(raw_chairman)
if parsed_chairman:
decision = CouncilDecision(
final_decision=str(parsed_chairman.get("final_decision", "HOLD")).upper(),
consensus_score=float(parsed_chairman.get("consensus_score", 0.5)),
confidence=float(parsed_chairman.get("confidence", 0.5)),
majority_reasoning=str(parsed_chairman.get("majority_reasoning", "")),
dissenting_views=str(parsed_chairman.get("dissenting_views", "")),
model_health_score=float(parsed_chairman.get("model_health_score", 0.7)),
model_replacement_recommended=bool(
parsed_chairman.get("model_replacement_recommended", False)
),
recommended_model=str(
parsed_chairman.get("recommended_model", "현재 모델 유지")
),
council_summary=str(parsed_chairman.get("council_summary", "")),
expert_opinions=[
{
"name": op.expert_name,
"decision": op.decision,
"confidence": op.confidence,
"reasoning": op.reasoning,
}
for op in expert_opinions
],
)
status_icon = "⚠️" if decision.model_replacement_recommended else ""
print(
f"[Council] {ticker}{decision.final_decision} "
f"(합의율: {decision.consensus_score:.0%}, "
f"모델건전성: {decision.model_health_score:.0%}) "
f"{status_icon}"
)
if decision.model_replacement_recommended:
print(f"[Council] 모델 교체 권고: {decision.recommended_model}")
return decision
# 의장 실패 → 투표 폴백
print(f"[Council] {ticker} - 의장 AI 실패, 투표 폴백 사용")
return _vote_fallback(expert_opinions)
def quick_validate(
self,
ticker: str,
kospi_price: float,
ai_confidence: float,
backtest_sharpe: Optional[float] = None,
) -> dict:
"""
LLM 호출 없이 규칙 기반 빠른 모델 검증
Returns:
{
"regime": str,
"model_ok": bool,
"score": float,
"recommendation": str,
"should_replace": bool,
}
"""
regime_analysis = MarketRegimeDetector.detect(kospi_price)
validation = MarketRegimeDetector.validate_model_for_regime(
regime_analysis.regime,
backtest_sharpe=backtest_sharpe,
)
# AI 신뢰도 하락 시 추가 감점
score = validation["confidence_score"]
if ai_confidence < 0.4:
score *= 0.8
return {
"regime": regime_analysis.regime.value,
"regime_description": regime_analysis.description,
"model_ok": score >= 0.5 and not validation["should_replace"],
"score": round(score, 3),
"recommendation": validation["recommendation"],
"should_replace": validation["should_replace"],
"alternative_models": validation["alternative_models"],
}
# 전역 싱글톤
_council_instance: Optional[AICouncil] = None
def get_council(llm_client: Any = None) -> AICouncil:
"""워커 프로세스 내 AICouncil 싱글톤 반환 (GeminiLLMClient 또는 OllamaManager 수용)"""
global _council_instance
if _council_instance is None:
_council_instance = AICouncil(llm_client)
return _council_instance

View File

@@ -4,6 +4,7 @@ import pickle
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from collections import OrderedDict
from sklearn.preprocessing import MinMaxScaler
@@ -164,15 +165,21 @@ def _build_feature_matrix(ohlcv_data):
volume = np.array(ohlcv_data.get('volume', []), dtype=np.float64)
n = len(close)
if len(open_) != n: open_ = close.copy()
if len(high) != n: high = close.copy()
if len(low) != n: low = close.copy()
_degraded = []
if len(open_) != n: open_ = close.copy(); _degraded.append('open')
if len(high) != n: high = close.copy(); _degraded.append('high')
if len(low) != n: low = close.copy(); _degraded.append('low')
if _degraded:
print(f"[LSTM] ⚠️ OHLCV 피처 불완전 ({', '.join(_degraded)} → close 대체). 예측 신뢰도 저하 가능")
# 거래량 정규화 (최대값 기준, 0이면 0)
# 거래량 정규화 (20일 이동평균 대비 비율, max 기준보다 정보량이 높음)
if len(volume) == n and volume.max() > 0:
volume_norm = volume / (volume.max() + 1e-9)
vol_series = pd.Series(volume)
vol_ma20 = vol_series.rolling(20, min_periods=1).mean().values
volume_norm = volume / (vol_ma20 + 1e-9)
volume_norm = np.clip(volume_norm, 0.0, 5.0) / 5.0 # 0~5배 → 0~1 정규화
else:
volume_norm = np.zeros(n)
volume_norm = np.full(n, 0.2) # 데이터 없으면 중립값
rsi = _compute_rsi(close, period=14)
rsi_norm = rsi / 100.0 # 0~1 정규화
@@ -375,8 +382,10 @@ class PricePredictor:
change_rate = ((predicted_price - current_price) / current_price) * 100
cached_loss = self.training_status.get("loss", 0.5)
# 캐시 신뢰도: 마지막 학습 loss 기반 동적 계산 (고정값 제거)
cached_conf = min(0.70, 1.0 / (1.0 + (cached_loss * 200)))
print(f"[AI] {ticker or '?'}: 쿨다운 중 → 캐시 예측 사용 "
f"({predicted_price:.0f} / {change_rate:+.2f}%)")
f"({predicted_price:.0f} / {change_rate:+.2f}% / conf={cached_conf:.2f})")
return {
"current": current_price,
"predicted": float(predicted_price),
@@ -384,7 +393,7 @@ class PricePredictor:
"trend": trend,
"loss": cached_loss,
"val_loss": cached_loss,
"confidence": 0.62,
"confidence": round(cached_conf, 2),
"epochs": 0,
"device": str(self.device),
"lr": self.optimizer.param_groups[0]['lr'],
@@ -578,24 +587,28 @@ class PricePredictor:
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
# 신뢰도 계산
loss_confidence = 1.0 / (1.0 + (best_val_loss * 50))
# ── 신뢰도 계산 (보수적 버전) ──────────────────────────────
# val_loss 기반: 0.001→0.74, 0.003→0.62, 0.01→0.50 (이전보다 보수적)
loss_confidence = 1.0 / (1.0 + (best_val_loss * 200))
# 오버피팅 페널티
overfit_ratio = final_loss / (best_val_loss + 1e-9)
if overfit_ratio < 0.5:
overfit_penalty = 0.7
elif overfit_ratio > 2.0:
overfit_penalty = 0.8
overfit_penalty = 0.65 # 심각한 언더피팅
elif overfit_ratio > 2.5:
overfit_penalty = 0.75 # 오버피팅
else:
overfit_penalty = 1.0
# 에포크 수 기반 수렴 판단
epoch_factor = 1.0
if actual_epochs < 10:
epoch_factor = 0.6
epoch_factor = 0.55 # 너무 이른 수렴 → 불신뢰
elif actual_epochs >= max_epochs:
epoch_factor = 0.8
epoch_factor = 0.80 # 미수렴 → 부분 신뢰
confidence = min(0.95, loss_confidence * overfit_penalty * epoch_factor)
# 최종 상한: 0.80 (이전 0.95보다 보수적 — LSTM 70% 가중치 남발 방지)
confidence = min(0.80, loss_confidence * overfit_penalty * epoch_factor)
return {
"current": current_price,

View File

@@ -1,14 +1,17 @@
"""
앙상블 예측 모듈 (Phase 3-2)
앙상블 예측 모듈 (Phase 3-3)
- LSTM + 기술지표 + LLM 감성 → 적응형 가중치
- 과거 매매 결과 기반 가중치 자동 조정
- Kelly Criterion 기반 포지션 비중 계산
- process.py의 하드코딩된 w_tech/w_news/w_ai 대체
- 파일 mtime 기반 cross-process 동기화 (워커 ↔ 메인 프로세스)
"""
import os
import json
import time
import numpy as np
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Dict, Optional
from modules.config import Config
@@ -21,12 +24,61 @@ class SignalWeights:
sentiment: float = 0.30
lstm: float = 0.35
# 각 신호의 허용 범위
MIN_WEIGHT = 0.10
MAX_WEIGHT = 0.65
def normalize(self):
total = self.tech + self.sentiment + self.lstm
if total > 0:
self.tech /= total
self.sentiment /= total
self.lstm /= total
"""
경계 보존 정규화 (합=1, MIN≤각값≤MAX 동시 보장)
단순 1/2차 정규화는 경계 위반을 반복 유발하므로
반복 배분 알고리즘(Water-Filling) 사용:
1. 단순 정규화 (비율 유지)
2. 경계 위반 값 → 경계에 고정, 나머지에 잔여 비중 비례 배분
3. 모든 값이 경계 내에 들 때까지 반복 (최대 10회)
"""
MIN, MAX = self.MIN_WEIGHT, self.MAX_WEIGHT
vals = [max(MIN * 0.1, self.tech),
max(MIN * 0.1, self.sentiment),
max(MIN * 0.1, self.lstm)]
for _ in range(10):
total = sum(vals)
if total > 0:
vals = [v / total for v in vals]
fixed = [None, None, None]
has_violation = False
for i, v in enumerate(vals):
if v < MIN:
fixed[i] = MIN
has_violation = True
elif v > MAX:
fixed[i] = MAX
has_violation = True
if not has_violation:
break
fixed_sum = sum(f for f in fixed if f is not None)
remaining = 1.0 - fixed_sum
free = [(i, vals[i]) for i, f in enumerate(fixed) if f is None]
free_sum = sum(v for _, v in free)
new_vals = list(fixed)
if free and free_sum > 0:
factor = remaining / free_sum
for i, v in free:
new_vals[i] = v * factor
elif free:
per = remaining / len(free)
for i, _ in free:
new_vals[i] = per
vals = [v if v is not None else 0.0 for v in new_vals]
self.tech, self.sentiment, self.lstm = vals
return self
def to_dict(self):
@@ -45,9 +97,11 @@ class AdaptiveEnsemble:
핵심 로직:
1. 종목별 최근 N 매매의 결과를 추적
2. 어떤 신호가 정확했는지 소급 평가
2. 어떤 신호가 정확했는지 소급 평가 (크기 가중 정확도)
3. 정확도가 높은 신호의 가중치를 점진적으로 증가
4. 시장 상황(ADX, 거시경제) 반영한 컨텍스트별 가중치 분리
5. Kelly Criterion 기반 최적 포지션 비중 제공
6. 파일 mtime 기반 cross-process 동기화 (워커 프로세스 갱신)
"""
def __init__(self, history_file=None, max_history=50):
@@ -55,17 +109,23 @@ class AdaptiveEnsemble:
self.history_file = history_file or os.path.join(
Config.DATA_DIR, "ensemble_history.json"
)
# {ticker: [{"tech": f, "sentiment": f, "lstm": f, "decision": str, "outcome": float}, ...]}
# {ticker: [{"tech_score": f, "sentiment_score": f, "lstm_score": f,
# "decision": str, "outcome": float}, ...]}
self._trade_history: Dict[str, list] = {}
# {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger"
# {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger" | "default"
self._context_weights: Dict[str, SignalWeights] = {
"strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30),
"sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
"danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
"default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
"sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
"danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
"default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
}
self._load_mtime: float = 0.0 # 마지막 파일 로드 시각
self._load()
# ──────────────────────────────────────────────
# 파일 I/O
# ──────────────────────────────────────────────
def _load(self):
if os.path.exists(self.history_file):
try:
@@ -75,6 +135,7 @@ class AdaptiveEnsemble:
weights_raw = data.get("weights", {})
for ctx, w in weights_raw.items():
self._context_weights[ctx] = SignalWeights.from_dict(w)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Load failed: {e}")
@@ -86,9 +147,29 @@ class AdaptiveEnsemble:
}
with open(self.history_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Save failed: {e}")
def reload_if_stale(self):
"""
파일이 마지막 로드 이후 수정되었으면 재로드.
워커 프로세스가 메인 프로세스의 record_trade 결과를 반영하기 위해 사용.
"""
if not os.path.exists(self.history_file):
return
try:
mtime = os.path.getmtime(self.history_file)
if mtime > self._load_mtime:
self._load()
print("[Ensemble] 파일 변경 감지, 가중치 재로드")
except Exception:
pass
# ──────────────────────────────────────────────
# 컨텍스트 & 가중치
# ──────────────────────────────────────────────
def get_context(self, adx: float, macro_state: str) -> str:
"""현재 시장 컨텍스트 결정"""
if macro_state == "DANGER":
@@ -105,85 +186,45 @@ class AdaptiveEnsemble:
"""
종목 + 시장 컨텍스트에 맞는 가중치 반환
1. 기본: 컨텍스트별 기준 가중치
1. 컨텍스트별 기준 가중치 선택
2. AI 신뢰도 높으면 lstm 가중치 보정
3. 종목별 학습 결과 반영
3. 종목별 학습 결과 반영 (크기 가중 정확도 사용)
"""
context = self.get_context(adx, macro_state)
base = self._context_weights.get(context, self._context_weights["default"])
# 적응형 조정: 해당 종목의 과거 성과 반영
ticker_history = self._trade_history.get(ticker, [])
adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm)
if len(ticker_history) >= 5:
# 최근 5회 신호별 정확도 평가
recent = ticker_history[-10:]
tech_acc = self._accuracy([h["tech_score"] for h in recent],
[h["outcome"] for h in recent])
news_acc = self._accuracy([h["sentiment_score"] for h in recent],
[h["outcome"] for h in recent])
lstm_acc = self._accuracy([h["lstm_score"] for h in recent],
[h["outcome"] for h in recent])
# _accuracy_weighted: 방향 일치 + 수익 크기 가중 반영 (단순 binary X)
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
# 정확도 기반 가중치 미세 조정 (±0.1 범위)
alpha = 0.05
adjusted.tech = max(0.1, min(0.6, base.tech + alpha * (tech_acc - 0.5)))
adjusted.sentiment = max(0.1, min(0.6, base.sentiment + alpha * (news_acc - 0.5)))
adjusted.lstm = max(0.1, min(0.6, base.lstm + alpha * (lstm_acc - 0.5)))
alpha = 0.05 # 미세 조정 (±0.1 범위)
adjusted.tech = max(0.10, min(0.60, base.tech + alpha * (tech_acc - 0.5)))
adjusted.sentiment = max(0.10, min(0.60, base.sentiment + alpha * (news_acc - 0.5)))
adjusted.lstm = max(0.10, min(0.60, base.lstm + alpha * (lstm_acc - 0.5)))
# AI 신뢰도 보정
if ai_confidence >= 0.85:
adjusted.lstm = min(0.70, adjusted.lstm * 1.3)
# AI 신뢰도 보정 (LSTM confidence 상한 0.80 기준 조정)
if ai_confidence >= 0.75:
adjusted.lstm = min(0.65, adjusted.lstm * 1.25)
elif ai_confidence < 0.5:
adjusted.lstm = max(0.10, adjusted.lstm * 0.7)
adjusted.lstm = max(0.10, adjusted.lstm * 0.75)
return adjusted.normalize()
def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
lstm_score: float, decision: str, outcome_pct: float):
"""
매매 결과 기록 (가중치 학습 데이터)
outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실
"""
if ticker not in self._trade_history:
self._trade_history[ticker] = []
record = {
"tech_score": tech_score,
"sentiment_score": sentiment_score,
"lstm_score": lstm_score,
"decision": decision,
"outcome": outcome_pct
}
self._trade_history[ticker].append(record)
# 히스토리 크기 제한
if len(self._trade_history[ticker]) > self.max_history:
self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]
# 가중치 점진적 업데이트
self._update_weights(ticker)
self._save()
def _update_weights(self, ticker: str):
"""종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트"""
history = self._trade_history.get(ticker, [])
if len(history) < 5:
return
recent = history[-10:]
outcomes = [h["outcome"] for h in recent]
mean_outcome = np.mean(outcomes)
if mean_outcome > 0:
# 전략이 효과적 → 현재 가중치 유지 (강화)
pass
elif mean_outcome < -2.0:
# 손실이 큰 경우 → 기본값으로 리셋
for ctx in self._context_weights:
self._context_weights[ctx] = SignalWeights(
tech=0.35, sentiment=0.30, lstm=0.35)
# ──────────────────────────────────────────────
# 앙상블 점수
# ──────────────────────────────────────────────
def compute_ensemble_score(self, tech_score: float, sentiment_score: float,
lstm_score: float, investor_score: float = 0.0,
@@ -205,25 +246,170 @@ class AdaptiveEnsemble:
total += min(investor_score, 0.15)
return min(1.0, max(0.0, total))
# ──────────────────────────────────────────────
# Kelly Criterion
# ──────────────────────────────────────────────
def get_kelly_fraction(self, ticker: str = None, half_kelly: bool = True) -> float:
"""
Modified Kelly Criterion 기반 최적 투자 비중 계산
f* = (p * b - q) / b
where:
p = 과거 승리 거래 비율 (win rate)
q = 1 - p
b = 평균이익 / 평균손실 비율 (avg profit / avg loss, Risk-Reward)
Returns:
0.03 ~ 0.25 범위의 Kelly 분수
- half_kelly=True: 변동성 과대추정 보완을 위해 1/2 적용
- 거래 데이터 < 10건: 보수적 기본값 0.08 반환
"""
# 해당 종목 우선, 없으면 전체 통합 히스토리 사용
if ticker and ticker in self._trade_history:
outcomes = [h["outcome"] for h in self._trade_history[ticker]
if h.get("outcome") is not None]
else:
# 전체 종목 결과 통합 (시장 전반 win rate)
outcomes = [
h["outcome"]
for records in self._trade_history.values()
for h in records
if h.get("outcome") is not None
]
if len(outcomes) < 10:
return 0.08 # 데이터 부족 → 보수적 8%
wins = [o for o in outcomes if o > 0]
losses = [abs(o) for o in outcomes if o <= 0]
if not wins:
return 0.03 # 승리 거래 없음 → 최소 비중
if not losses:
return 0.20 # 손실 거래 없음 → 낙관적이나 상한 제한
p = len(wins) / len(outcomes)
q = 1.0 - p
avg_win = sum(wins) / len(wins)
avg_loss = sum(losses) / len(losses)
if avg_loss == 0:
return 0.20
b = avg_win / avg_loss # Risk-Reward ratio
kelly = (p * b - q) / b
if half_kelly:
kelly /= 2.0 # Half-Kelly: 실제 활용 시 표준
result = max(0.03, min(0.25, kelly)) # 3% ~ 25% 범위 제한
return result
# ──────────────────────────────────────────────
# 거래 결과 기록 & 가중치 학습
# ──────────────────────────────────────────────
def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
lstm_score: float, decision: str, outcome_pct: float):
"""
매매 결과 기록 → 가중치 학습 데이터 축적
Args:
outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실
"""
if ticker not in self._trade_history:
self._trade_history[ticker] = []
record = {
"tech_score": tech_score,
"sentiment_score": sentiment_score,
"lstm_score": lstm_score,
"decision": decision,
"outcome": outcome_pct
}
self._trade_history[ticker].append(record)
if len(self._trade_history[ticker]) > self.max_history:
self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]
self._update_weights(ticker)
self._save()
def _update_weights(self, ticker: str):
"""
종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트.
- 크기 가중 정확도(accuracy_weighted) 사용 → 큰 손실에 강한 패널티
- 지수이동평균(alpha=0.10)으로 점진 반영 → 급격한 가중치 전환 방지
- normalize() 후 재경계 적용 → 경계값 위반 방지
"""
history = self._trade_history.get(ticker, [])
if len(history) < 5:
return
recent = history[-10:]
outcomes = [h["outcome"] for h in recent]
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent], outcomes)
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent], outcomes)
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent], outcomes)
alpha = 0.10 # EMA 계수 (10회 거래 후 완전 반영)
for ctx, w in self._context_weights.items():
delta_tech = alpha * (tech_acc - 0.5) * 0.4 # 최대 ±0.02
delta_news = alpha * (news_acc - 0.5) * 0.4
delta_lstm = alpha * (lstm_acc - 0.5) * 0.4
# 경계 적용 → normalize (경계 재반영) → normalize (합=1 보장)
w.tech = max(0.10, min(0.65, w.tech + delta_tech))
w.sentiment = max(0.10, min(0.65, w.sentiment + delta_news))
w.lstm = max(0.10, min(0.65, w.lstm + delta_lstm))
w.normalize() # normalize() 내부에서 경계 재클램핑 + 2차 정규화 수행
print(f"[Ensemble] {ctx} tech={w.tech:.2f} news={w.sentiment:.2f} lstm={w.lstm:.2f} "
f"(acc T={tech_acc:.2f} N={news_acc:.2f} L={lstm_acc:.2f})")
# ──────────────────────────────────────────────
# 정확도 지표
# ──────────────────────────────────────────────
@staticmethod
def _accuracy(scores: list, outcomes: list) -> float:
"""신호와 결과의 상관도 계산 (0.5 = 무관, 1.0 = 완전 일치)"""
def _accuracy_weighted(scores: list, outcomes: list) -> float:
"""
신호-결과 크기 가중 정확도 (0.0~1.0, 0.5=무관)
- 단순 방향 일치(0/1)가 아닌 수익률 절댓값으로 가중
- 큰 손실 예측 실패는 작은 이익 예측 성공보다 강하게 패널티
"""
if len(scores) < 3:
return 0.5
# 신호가 높을 때 수익, 낮을 때 손실이면 정확
correct = sum(
1 for s, o in zip(scores, outcomes)
if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0)
)
return correct / len(scores)
total_weight = 0.0
weighted_correct = 0.0
for s, o in zip(scores, outcomes):
weight = max(1.0, abs(o)) # 수익률 절댓값 기반 가중치 (최소 1.0)
total_weight += weight
if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0):
weighted_correct += weight
if total_weight == 0:
return 0.5
return weighted_correct / total_weight
# 전역 싱글톤
# ──────────────────────────────────────────────
# 전역 싱글톤 (프로세스별)
# ──────────────────────────────────────────────
_ensemble_instance: Optional[AdaptiveEnsemble] = None
def get_ensemble() -> AdaptiveEnsemble:
"""워커 프로세스 내 싱글톤 앙상블 관리자"""
"""프로세스 내 싱글톤 앙상블 관리자 반환 (워커/메인 각각 독립 인스턴스)"""
global _ensemble_instance
if _ensemble_instance is None:
_ensemble_instance = AdaptiveEnsemble()

View File

@@ -0,0 +1,279 @@
"""
시장 레짐 감지 모듈
- 코스피 지수 수준에 따른 시장 레짐 분류
- 코스피 6300 목표 수준에서의 모델 적합성 평가
- 레짐별 전략 파라미터 자동 조정
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict
class MarketRegime(Enum):
BULL_EXTREME = "bull_extreme" # 코스피 5000+ (역사적 극고점, 6300 시나리오)
BULL_STRONG = "bull_strong" # 코스피 3500~5000 (강한 상승장)
BULL_NORMAL = "bull_normal" # 코스피 2500~3500 (정상 상승장)
SIDEWAYS = "sideways" # 코스피 2000~2500 (횡보)
BEAR_MILD = "bear_mild" # 코스피 1500~2000 (약세)
BEAR_SEVERE = "bear_severe" # 코스피 1500 미만 (심각한 약세)
@dataclass
class RegimeAnalysis:
"""레짐 분석 결과"""
regime: MarketRegime
kospi_level: float
description: str
recommended_strategy: str
buy_threshold_adj: float # 매수 임계값 조정치 (+: 더 엄격, -: 완화)
position_size_adj: float # 포지션 크기 조정 배수 (1.0 = 기본)
lstm_weight_adj: float # LSTM 앙상블 가중치 조정 (+0.1 = 10% 증가)
model_recommendation: str # 모델 유지/교체 권고
risk_level: str # LOW / MEDIUM / HIGH / EXTREME
class MarketRegimeDetector:
"""
코스피 지수 수준 기반 시장 레짐 감지기
코스피 6300 시나리오:
- 현재 한국 증시 역대 최고점(2021년 3300) 대비 약 2배 수준
- BULL_EXTREME 레짐에 해당 → LSTM 단독 의존 지양, Transformer/Mamba 검토 필요
- 추세 추종 강화 + 고점 리스크 관리 병행
"""
# 레짐별 상세 파라미터
_REGIME_PARAMS: Dict[MarketRegime, dict] = {
MarketRegime.BULL_EXTREME: {
"description": "코스피 극강세장 5000+ (6300 시나리오)",
"recommended_strategy": (
"추세 추종 극대화, 트레일링 스탑 확대(ATR×4), "
"고점 과열 구간으로 포지션 축소 병행"
),
"buy_threshold_adj": -0.04, # 강세 모멘텀 → 진입 소폭 완화
"position_size_adj": 0.75, # 고점 리스크로 포지션 축소
"lstm_weight_adj": -0.12, # LSTM 비중 축소 (비선형 가격 동작)
"model_recommendation": (
"Temporal Fusion Transformer(TFT) 또는 Mamba(SSM) 교체 권고 - "
"LSTM은 극강세 과열 구간에서 비선형 가격 동작 포착 한계"
),
"risk_level": "EXTREME",
},
MarketRegime.BULL_STRONG: {
"description": "코스피 강상승장 3500~5000",
"recommended_strategy": "추세 추종, 모멘텀 강화, 손절 완화(ATR×2.5)",
"buy_threshold_adj": -0.03,
"position_size_adj": 1.1,
"lstm_weight_adj": 0.05,
"model_recommendation": "현재 LSTM v3 적합 - 성능 모니터링 유지",
"risk_level": "MEDIUM",
},
MarketRegime.BULL_NORMAL: {
"description": "코스피 정상 상승장 2500~3500",
"recommended_strategy": "기본 전략 유지 (기술+LSTM+LLM 균형)",
"buy_threshold_adj": 0.0,
"position_size_adj": 1.0,
"lstm_weight_adj": 0.0,
"model_recommendation": "현재 LSTM v3 최적 환경",
"risk_level": "LOW",
},
MarketRegime.SIDEWAYS: {
"description": "코스피 횡보장 2000~2500",
"recommended_strategy": "박스권 매매, LLM 감성 비중 확대, 빠른 익절",
"buy_threshold_adj": 0.03,
"position_size_adj": 0.85,
"lstm_weight_adj": -0.05,
"model_recommendation": "현재 LSTM v3 적합 - 감성 분석 가중치 강화",
"risk_level": "LOW",
},
MarketRegime.BEAR_MILD: {
"description": "코스피 약세장 1500~2000",
"recommended_strategy": "현금 비중 확대(50%+), 방어주 선별 매수",
"buy_threshold_adj": 0.08,
"position_size_adj": 0.5,
"lstm_weight_adj": 0.0,
"model_recommendation": "현재 LSTM v3 적합 - 리스크 관리 파라미터 강화",
"risk_level": "HIGH",
},
MarketRegime.BEAR_SEVERE: {
"description": "코스피 극약세장 1500 미만",
"recommended_strategy": "전면 현금화, 매수 중단",
"buy_threshold_adj": 0.20,
"position_size_adj": 0.2,
"lstm_weight_adj": 0.0,
"model_recommendation": "매크로 팩터 기반 방어 모델 전환 필요",
"risk_level": "EXTREME",
},
}
@classmethod
def detect(
cls,
kospi_price: float,
kospi_change_pct: float = 0.0,
volatility_20d: float = 0.0,
) -> RegimeAnalysis:
"""
코스피 지수 수준 + 변동성으로 시장 레짐 감지
Args:
kospi_price: 현재 코스피 지수 (예: 2600, 6300)
kospi_change_pct: 전일 대비 등락률 (%)
volatility_20d: 20일 변동성 (선택, 0이면 무시)
Returns:
RegimeAnalysis: 레짐 분석 결과 및 전략 파라미터
"""
# 1. 지수 수준으로 기본 레짐 결정
if kospi_price >= 5000:
regime = MarketRegime.BULL_EXTREME
elif kospi_price >= 3500:
regime = MarketRegime.BULL_STRONG
elif kospi_price >= 2500:
regime = MarketRegime.BULL_NORMAL
elif kospi_price >= 2000:
regime = MarketRegime.SIDEWAYS
elif kospi_price >= 1500:
regime = MarketRegime.BEAR_MILD
else:
regime = MarketRegime.BEAR_SEVERE
params = cls._REGIME_PARAMS[regime]
# 2. 변동성 기반 포지션 사이징 추가 조정
position_adj = params["position_size_adj"]
if volatility_20d > 30:
position_adj *= 0.6 # 극단적 변동성 → 추가 50% 축소
elif volatility_20d > 20:
position_adj *= 0.8 # 높은 변동성 → 20% 축소
# 3. 급락 중 레짐 하향 조정 (패닉 감지)
if kospi_change_pct <= -3.0:
# 극단적 일일 급락 → 포지션 추가 축소
position_adj *= 0.5
print(f"[Regime] PANIC DETECTED (일일 {kospi_change_pct:.1f}%) → 포지션 50% 추가 축소")
return RegimeAnalysis(
regime=regime,
kospi_level=kospi_price,
description=params["description"],
recommended_strategy=params["recommended_strategy"],
buy_threshold_adj=params["buy_threshold_adj"],
position_size_adj=round(position_adj, 3),
lstm_weight_adj=params["lstm_weight_adj"],
model_recommendation=params["model_recommendation"],
risk_level=params["risk_level"],
)
@classmethod
def validate_model_for_regime(
cls,
regime: MarketRegime,
backtest_sharpe: Optional[float] = None,
backtest_winrate: Optional[float] = None,
backtest_mdd: Optional[float] = None,
) -> dict:
"""
현재 LSTM v3 모델이 해당 레짐에서 적합한지 검증
Returns:
{
"is_suitable": bool,
"confidence_score": float (0~1),
"recommendation": str,
"should_replace": bool,
"alternative_models": list[str],
"reason": str,
}
"""
result = {
"is_suitable": True,
"confidence_score": 0.75,
"recommendation": "현재 LSTM v3 모델 유지",
"should_replace": False,
"alternative_models": [],
"reason": "정상 상승장 구간 - LSTM v3 최적 환경",
}
# 레짐 기반 기본 평가
if regime == MarketRegime.BULL_EXTREME:
result.update({
"is_suitable": False,
"confidence_score": 0.38,
"recommendation": "Transformer 계열 모델 교체 강력 권고",
"should_replace": True,
"alternative_models": [
"Temporal Fusion Transformer (TFT) - 장기 시계열 최강",
"Mamba (SSM) - 초고속 추론 + 긴 컨텍스트",
"PatchTST - Transformer 기반 주가 예측 특화",
"TimesNet - 2D 시계열 변환 + CNN",
"N-BEATS / N-HiTS - 해석 가능 딥러닝",
],
"reason": (
"코스피 5000+ 극강세장에서 LSTM은 비선형적 가격 급등 패턴을 "
"충분히 학습하지 못함. Attention 메커니즘만으로는 장기 상승 추세의 "
"복잡한 의존성 포착에 한계 존재."
),
})
elif regime == MarketRegime.BEAR_SEVERE:
result.update({
"is_suitable": False,
"confidence_score": 0.30,
"recommendation": "매크로 팩터 + Regime-Switching 모델 교체 권고",
"should_replace": True,
"alternative_models": [
"Regime-Switching LSTM (HMM + LSTM)",
"매크로 멀티팩터 모델 (환율, 금리, VIX 통합)",
"GRU + Attention (LSTM 경량 대안)",
],
"reason": "극약세장에서는 기술적 지표보다 거시경제 팩터가 지배적",
})
elif regime == MarketRegime.BULL_STRONG:
result.update({
"confidence_score": 0.72,
"reason": "강상승장 - LSTM 추세 학습 양호하나 성능 모니터링 필요",
})
elif regime == MarketRegime.SIDEWAYS:
result.update({
"confidence_score": 0.68,
"reason": "횡보장 - LSTM 예측력 저하, LLM 감성 보완 필수",
"recommendation": "현재 LSTM v3 유지 + LLM 감성 가중치 상향",
})
# 백테스트 결과 반영
if backtest_sharpe is not None:
if backtest_sharpe < 0:
result["confidence_score"] *= 0.5
result["should_replace"] = True
result["recommendation"] += " ⚠️ Sharpe < 0 → 즉시 교체 검토"
elif backtest_sharpe < 0.5:
result["confidence_score"] *= 0.75
result["recommendation"] += f" (Sharpe={backtest_sharpe:.2f} 미흡)"
if backtest_winrate is not None and backtest_winrate < 45:
result["confidence_score"] *= 0.8
result["recommendation"] += f" (승률={backtest_winrate:.1f}% 미흡)"
if backtest_mdd is not None and backtest_mdd < -25:
result["confidence_score"] *= 0.7
result["should_replace"] = True
result["recommendation"] += f" ⚠️ MDD={backtest_mdd:.1f}% 과다"
result["confidence_score"] = round(max(0.0, min(1.0, result["confidence_score"])), 3)
return result
@staticmethod
def get_regime_label(kospi_price: float) -> str:
"""간략 레짐 라벨 반환 (로그/UI 표시용)"""
if kospi_price >= 5000:
return f"BULL_EXTREME({kospi_price:.0f})"
elif kospi_price >= 3500:
return f"BULL_STRONG({kospi_price:.0f})"
elif kospi_price >= 2500:
return f"BULL_NORMAL({kospi_price:.0f})"
elif kospi_price >= 2000:
return f"SIDEWAYS({kospi_price:.0f})"
elif kospi_price >= 1500:
return f"BEAR_MILD({kospi_price:.0f})"
return f"BEAR_SEVERE({kospi_price:.0f})"

View File

@@ -0,0 +1,348 @@
"""
모델 검증 시스템 (Market-Regime Aware Model Validator)
- 백테스트 기반 현재 LSTM v3 성능 검증
- 코스피 레짐별 모델 적합성 평가
- 코스피 6300 강세장 시나리오 대응 점검
- 모델 교체 권고 보고서 생성
사용법:
validator = ModelValidator()
report = validator.validate(ticker, ohlcv_data, strategy_fn, kospi_price=2600)
print(report.summary())
validator.send_alert(report) # 텔레그램 알림 (심각한 경우만)
"""
import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional, List
from modules.config import Config
from modules.analysis.backtest import Backtester, BacktestResult
from modules.analysis.market_regime import MarketRegimeDetector, MarketRegime, RegimeAnalysis
# 모델 적합성 최소 기준
_MIN_SHARPE = 0.5
_MIN_WIN_RATE = 50.0 # %
_MAX_MDD = -20.0 # % (초과 시 문제)
_MIN_PROFIT_FACTOR = 1.2
_CACHE_TTL_SECONDS = 86400 # 24시간
@dataclass
class ValidationReport:
"""모델 검증 보고서"""
ticker: str
kospi_level: float
regime: str
regime_description: str
backtest_result: Optional[BacktestResult]
model_suitable: bool
suitability_score: float # 0~1
issues: List[str] = field(default_factory=list)
recommendations: List[str] = field(default_factory=list)
alternative_models: List[str] = field(default_factory=list)
regime_strategy_hint: str = ""
risk_level: str = "LOW"
def summary(self) -> str:
lines = [
"=" * 55,
f"🔍 모델 검증 보고서 [{self.ticker}]",
"=" * 55,
f"코스피 수준 : {self.kospi_level:.0f} ({self.regime_description})",
f"시장 레짐 : {self.regime} [리스크: {self.risk_level}]",
f"모델 적합성 : {'✅ 적합' if self.model_suitable else '⚠️ 부적합'} "
f"({self.suitability_score:.0%})",
]
if self.backtest_result:
bt = self.backtest_result
lines += [
"",
"📊 백테스트 성과",
f" 총 수익률 : {bt.total_return_pct:+.2f}%",
f" Sharpe Ratio : {bt.sharpe_ratio:.3f}",
f" Max Drawdown : {bt.max_drawdown_pct:.2f}%",
f" 승률 : {bt.win_rate:.1f}% ({bt.winning_trades}/{bt.total_trades})",
f" 손익비(PF) : {bt.profit_factor:.2f}",
]
if self.issues:
lines.append("")
lines.append(f"⚠️ 발견된 문제 ({len(self.issues)}건)")
for issue in self.issues:
lines.append(f" - {issue}")
if self.recommendations:
lines.append("")
lines.append("💡 권고사항")
for rec in self.recommendations:
lines.append(f"{rec}")
if self.alternative_models:
lines.append("")
lines.append("🔄 대안 모델 목록")
for model in self.alternative_models:
lines.append(f"{model}")
if self.regime_strategy_hint:
lines.append("")
lines.append(f"📌 레짐 전략: {self.regime_strategy_hint}")
lines.append("=" * 55)
return "\n".join(lines)
def is_critical(self) -> bool:
"""즉각적인 조치가 필요한 수준인지 (텔레그램 알림 기준)"""
if not self.model_suitable and self.suitability_score < 0.4:
return True
if self.backtest_result and self.backtest_result.sharpe_ratio < 0:
return True
if self.backtest_result and self.backtest_result.max_drawdown_pct < -30:
return True
return False
class ModelValidator:
"""
LSTM v3 모델 검증기
검증 흐름:
1. 시장 레짐 감지 (코스피 수준)
2. 백테스트 실행 (선택)
3. 레짐별 모델 적합성 평가
4. 종합 보고서 생성
5. 심각한 경우 텔레그램 알림
"""
_CACHE_FILE = "model_validation_cache.json"
def __init__(self):
self._cache_path = os.path.join(Config.DATA_DIR, self._CACHE_FILE)
self._cache: dict = self._load_cache()
def _load_cache(self) -> dict:
if os.path.exists(self._cache_path):
try:
with open(self._cache_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {}
def _save_cache(self):
try:
with open(self._cache_path, "w", encoding="utf-8") as f:
json.dump(self._cache, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[Validator] 캐시 저장 실패: {e}")
def validate(
self,
ticker: str,
ohlcv_data: dict,
strategy_fn=None,
kospi_price: float = 2500.0,
kospi_change_pct: float = 0.0,
run_backtest: bool = True,
) -> ValidationReport:
"""
모델 검증 실행
Args:
ticker: 종목 코드
ohlcv_data: OHLCV 딕셔너리
strategy_fn: 백테스트용 전략 함수 (None이면 백테스트 생략)
kospi_price: 현재 코스피 지수
kospi_change_pct: 코스피 당일 등락률
run_backtest: 백테스트 실행 여부
Returns:
ValidationReport
"""
issues: List[str] = []
recommendations: List[str] = []
# ── 1. 시장 레짐 감지 ────────────────────────────────
regime_analysis: RegimeAnalysis = MarketRegimeDetector.detect(
kospi_price, kospi_change_pct
)
# ── 2. 백테스트 (선택) ───────────────────────────────
backtest_result: Optional[BacktestResult] = None
if run_backtest and strategy_fn is not None:
try:
backtester = Backtester()
backtest_result = backtester.run(ohlcv_data, strategy_fn, ticker)
except Exception as e:
issues.append(f"백테스트 실행 오류: {e}")
# ── 3. 백테스트 결과 기준 위반 체크 ─────────────────
bt_sharpe = backtest_result.sharpe_ratio if backtest_result else None
bt_winrate = backtest_result.win_rate if backtest_result else None
bt_mdd = backtest_result.max_drawdown_pct if backtest_result else None
bt_pf = backtest_result.profit_factor if backtest_result else None
if backtest_result:
if bt_sharpe < _MIN_SHARPE:
issues.append(
f"Sharpe Ratio 미흡: {bt_sharpe:.3f} (최소 {_MIN_SHARPE})"
)
recommendations.append("LSTM 피처 확장 또는 모델 아키텍처 재검토")
if bt_winrate < _MIN_WIN_RATE:
issues.append(
f"승률 미흡: {bt_winrate:.1f}% (최소 {_MIN_WIN_RATE:.0f}%)"
)
recommendations.append("매수 진입 임계값 상향 조정 (+0.05)")
if bt_mdd < _MAX_MDD:
issues.append(
f"MDD 과다: {bt_mdd:.2f}% (허용 {_MAX_MDD:.0f}%)"
)
recommendations.append("ATR 손절 배수 축소 (ATR×2 → ATR×1.5)")
if bt_pf < _MIN_PROFIT_FACTOR:
issues.append(
f"손익비 미흡: {bt_pf:.2f} (최소 {_MIN_PROFIT_FACTOR})"
)
recommendations.append("익절 배수 확대 (ATR×3 → ATR×4)")
# ── 4. 레짐 기반 모델 적합성 평가 ───────────────────
regime_validation = MarketRegimeDetector.validate_model_for_regime(
regime_analysis.regime,
backtest_sharpe=bt_sharpe,
backtest_winrate=bt_winrate,
backtest_mdd=bt_mdd,
)
if not regime_validation["is_suitable"]:
issues.append(
f"레짐 부적합: {regime_analysis.regime.value} 환경에서 "
f"LSTM v3 한계 감지"
)
recommendations.append(regime_validation["recommendation"])
# 코스피 6300 특별 경고
if kospi_price >= 5000:
issues.append(
f"⚠️ 코스피 {kospi_price:.0f} - 역사적 극고점 수준 "
"LSTM 비선형 패턴 포착 한계 주의"
)
recommendations.append(
"Temporal Fusion Transformer(TFT) 또는 Mamba 모델 전환 검토"
)
# ── 5. 종합 적합성 점수 ──────────────────────────────
suitability_score = regime_validation["confidence_score"]
# 문제 건수에 따라 감점 (건당 10%, 최대 50% 감점)
penalty = min(len(issues) * 0.10, 0.50)
suitability_score = max(0.0, suitability_score - penalty)
suitability_score = round(suitability_score, 3)
# ── 6. 보고서 생성 ───────────────────────────────────
report = ValidationReport(
ticker=ticker,
kospi_level=kospi_price,
regime=regime_analysis.regime.value,
regime_description=regime_analysis.description,
backtest_result=backtest_result,
model_suitable=(suitability_score >= 0.5 and not regime_validation["should_replace"]),
suitability_score=suitability_score,
issues=issues,
recommendations=list(set(recommendations)), # 중복 제거
alternative_models=regime_validation.get("alternative_models", []),
regime_strategy_hint=regime_analysis.recommended_strategy,
risk_level=regime_analysis.risk_level,
)
# ── 7. 캐시 저장 ─────────────────────────────────────
self._cache[ticker] = {
"timestamp": time.time(),
"kospi_level": kospi_price,
"regime": regime_analysis.regime.value,
"suitability_score": suitability_score,
"should_replace": regime_validation["should_replace"],
"issue_count": len(issues),
}
self._save_cache()
return report
def get_cached(self, ticker: str) -> Optional[dict]:
"""캐시된 검증 결과 반환 (24시간 이내)"""
cached = self._cache.get(ticker)
if not cached:
return None
if time.time() - cached.get("timestamp", 0) > _CACHE_TTL_SECONDS:
return None
return cached
def send_alert(self, report: ValidationReport):
"""심각한 검증 결과 텔레그램 알림"""
if not report.is_critical():
return
try:
from modules.services.telegram import TelegramMessenger
msg = (
f"🚨 [모델 경고] {report.ticker}\n"
f"코스피 {report.kospi_level:.0f} | 레짐: {report.regime}\n"
f"적합성: {report.suitability_score:.0%}\n"
)
if report.issues:
msg += "문제:\n" + "\n".join(f"{i}" for i in report.issues[:3])
if report.alternative_models:
msg += f"\n권고 모델: {report.alternative_models[0]}"
TelegramMessenger().send_message(msg)
except Exception:
pass
def generate_regime_report(self, kospi_price: float) -> str:
"""코스피 수준만으로 빠른 레짐 보고서 생성 (백테스트 없음)"""
regime_analysis = MarketRegimeDetector.detect(kospi_price)
validation = MarketRegimeDetector.validate_model_for_regime(regime_analysis.regime)
lines = [
"=" * 55,
f"📈 코스피 {kospi_price:.0f} 레짐 분석",
"=" * 55,
f"레짐 : {regime_analysis.regime.value}",
f"설명 : {regime_analysis.description}",
f"리스크 수준 : {regime_analysis.risk_level}",
"",
"─ 전략 파라미터 조정 ─",
f"매수 임계값 : {'+' if regime_analysis.buy_threshold_adj >= 0 else ''}"
f"{regime_analysis.buy_threshold_adj:+.2f} 조정",
f"포지션 크기 : x{regime_analysis.position_size_adj:.2f}",
f"LSTM 가중치 : {'+' if regime_analysis.lstm_weight_adj >= 0 else ''}"
f"{regime_analysis.lstm_weight_adj:+.2f}",
"",
"─ 모델 평가 ─",
f"현재 모델 적합: {'' if validation['is_suitable'] else '⚠️'} "
f"(신뢰도 {validation['confidence_score']:.0%})",
f"교체 필요 : {'' if validation['should_replace'] else '아니오'}",
f"권고사항 : {validation['recommendation']}",
]
if validation["alternative_models"]:
lines.append("")
lines.append("대안 모델 목록:")
for model in validation["alternative_models"]:
lines.append(f"{model}")
lines.append("")
lines.append(f"📌 전략: {regime_analysis.recommended_strategy}")
lines.append("=" * 55)
return "\n".join(lines)
# 전역 싱글톤
_validator_instance: Optional[ModelValidator] = None
def get_validator() -> ModelValidator:
"""ModelValidator 싱글톤 반환"""
global _validator_instance
if _validator_instance is None:
_validator_instance = ModelValidator()
return _validator_instance