Files
ai-trade/signal_v1/modules/analysis/ai_council.py
gahusb 7ea1a21487 refactor: web-ai V1 assets → signal_v1/ (graduation prep)
Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:00:11 +09:00

446 lines
17 KiB
Python

"""
AI 전문가 회의 시스템 (Multi-Agent Council)
- 4명의 전문가 에이전트가 독립 분석 후 의장 AI가 최종 결정
- 코스피 레짐 기반 모델 교체 권고
- process.py 분석 결과를 입력받아 심층 검토 수행
흐름:
전문가 1~4 (각 역할별 Ollama 호출)
의장 AI (전문가 의견 취합 + 최종 결정 + 모델 건전성 평가)
CouncilDecision (결정 + 모델 교체 권고 + 회의록)
"""
import json
import time
from dataclasses import dataclass, field
from typing import Optional, List, Any
from modules.analysis.market_regime import MarketRegimeDetector, MarketRegime, RegimeAnalysis
@dataclass
class ExpertOpinion:
"""개별 전문가 의견"""
expert_name: str
role: str
decision: str # BUY / SELL / HOLD
confidence: float # 0~1
reasoning: str
key_concern: str
model_feedback: str # 현재 AI 모델 적합성 평가
@dataclass
class CouncilDecision:
"""회의 최종 결정"""
final_decision: str # BUY / SELL / HOLD
consensus_score: float # 0~1 (1 = 만장일치)
confidence: float # 0~1
majority_reasoning: str # 주요 결정 근거
dissenting_views: str # 소수 의견
model_health_score: float # 0~1 (현재 모델 신뢰도)
model_replacement_recommended: bool # 모델 교체 필요 여부
recommended_model: str # 교체 권고 모델명
council_summary: str # 회의 전체 요약
expert_opinions: List[dict] = field(default_factory=list)
# 전문가 페르소나 정의
_EXPERTS = [
{
"name": "기술분석가",
"role": "technical",
"persona": (
"20년 경력의 코스피 전문 기술분석가. "
"RSI, MACD, 볼린저밴드, 추세선, 거래량 분석을 주로 사용. "
"단기 가격 모멘텀과 지지/저항 구간을 중시함."
),
"focus": (
"RSI 과매수/과매도, 볼린저밴드 위치, ADX 추세 강도, "
"거래량 급증, 멀티타임프레임 정렬 여부를 핵심 근거로 사용하세요."
),
},
{
"name": "퀀트전문가",
"role": "quant",
"persona": (
"AI/ML 기반 퀀트 투자 전문가. "
"LSTM 예측 신뢰도, 통계적 유의성, 백테스트 성과를 중시. "
"모델의 현재 시장 환경 적합성을 항상 평가함."
),
"focus": (
"LSTM 신뢰도와 예측 방향을 중심으로 분석하세요. "
"현재 코스피 레짐에서 LSTM v3 모델이 적합한지 반드시 평가하고, "
"더 나은 대안 모델이 있으면 구체적으로 제안하세요."
),
},
{
"name": "리스크관리자",
"role": "risk",
"persona": (
"글로벌 헤지펀드 리스크 관리 전문가. "
"포지션 사이징, 최대 낙폭(MDD), VaR, 손절 기준을 최우선으로 고려. "
"수익보다 손실 방어를 먼저 생각함."
),
"focus": (
"변동성 대비 포지션 크기 적절성, 손절 기준 타당성, "
"현재 보유 중이라면 추가 하락 리스크를 집중 평가하세요."
),
},
{
"name": "거시경제분석가",
"role": "macro",
"persona": (
"글로벌 매크로 및 한국 증시 전문가. "
"코스피 지수 수준, 원/달러 환율, 미국 금리, 외국인 수급을 중시. "
"현재 시장이 역사적으로 어떤 위치인지 판단함."
),
"focus": (
"코스피 지수 현재 수준이 역사적으로 어떤 의미인지, "
"이 가격대에서 매수/보유가 타당한지 거시경제 관점에서 평가하세요."
),
},
]
def _build_expert_prompt(expert: dict, ticker: str, data: dict) -> str:
"""전문가 역할에 맞는 분석 프롬프트 생성"""
kospi = data.get("kospi_price", 2500)
regime_label = MarketRegimeDetector.get_regime_label(kospi)
base = (
f"종목: {ticker} | 현재가: {data.get('current_price', 0):,.0f}\n"
f"코스피: {kospi:.0f} [{regime_label}]\n"
f"시장상태: {data.get('macro_state', 'SAFE')}\n"
f"---기술지표---\n"
f"기술점수: {data.get('tech_score', 0.5):.3f} | "
f"RSI: {data.get('rsi', 50):.1f} | ADX: {data.get('adx', 20):.1f}\n"
f"변동성: {data.get('volatility', 2.0):.2f}% | BB위치: {data.get('bb_zone', '중간')}\n"
f"MTF정렬: {data.get('mtf_alignment', 'N/A')}\n"
f"---AI모델---\n"
f"LSTM예측: {data.get('lstm_predicted', 0):,.0f}"
f"(변화율: {data.get('lstm_change_rate', 0):+.2f}%)\n"
f"LSTM신뢰도: {data.get('ai_confidence', 0.5):.2f} | "
f"LSTM점수: {data.get('lstm_score', 0.5):.3f}\n"
f"---수급/감성---\n"
f"감성점수: {data.get('sentiment_score', 0.5):.3f} | "
f"수급점수: {data.get('investor_score', 0):.3f}\n"
f"외인순매수: {data.get('frgn_net_buy', 0):+,} "
f"({data.get('consecutive_frgn_buy', 0)}일 연속)\n"
f"---포지션---\n"
f"보유중: {data.get('is_holding', False)} | "
f"보유수익률: {data.get('holding_yield', 0):+.2f}%\n"
f"통합점수: {data.get('total_score', 0.5):.3f}\n"
)
role_addition = (
f"\n당신은 {expert['persona']}\n"
f"분석 초점: {expert['focus']}\n"
)
output_format = (
"\n반드시 아래 JSON 형식으로만 응답하세요 (다른 텍스트 금지):\n"
"{\n"
' "decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "confidence": 0.0~1.0,\n'
' "reasoning": "주요 판단 근거 (1~2문장, 한국어)",\n'
' "key_concern": "가장 우려되는 리스크 (1문장, 한국어)",\n'
' "model_feedback": "현재 LSTM v3 모델이 이 시장 환경에서 적합한지 평가 (1문장)"\n'
"}"
)
return base + role_addition + output_format
def _build_chairman_prompt(
ticker: str,
opinions: List[ExpertOpinion],
data: dict,
regime: RegimeAnalysis,
) -> str:
"""의장 AI 최종 결정 프롬프트"""
opinions_text = "\n".join([
f"[{op.expert_name}] {op.decision} (확신도: {op.confidence:.2f})\n"
f" 근거: {op.reasoning}\n"
f" 우려: {op.key_concern}\n"
f" 모델평가: {op.model_feedback}"
for op in opinions
])
votes = [op.decision for op in opinions]
buy_n = votes.count("BUY")
sell_n = votes.count("SELL")
hold_n = votes.count("HOLD")
avg_conf = sum(op.confidence for op in opinions) / max(len(opinions), 1)
return (
"당신은 AI 투자 전문가 회의를 주재하는 의장입니다.\n\n"
f"=== 종목: {ticker} ===\n"
f"현재가: {data.get('current_price', 0):,.0f}원 | "
f"코스피: {data.get('kospi_price', 2500):.0f}\n"
f"시장 레짐: {regime.regime.value} ({regime.description})\n"
f"레짐 권고: {regime.model_recommendation}\n\n"
f"=== 전문가 의견 ===\n{opinions_text}\n\n"
f"=== 투표: 매수 {buy_n} / 매도 {sell_n} / 보유 {hold_n} "
f"(평균 확신도: {avg_conf:.2f}) ===\n\n"
"당신의 임무:\n"
"1. 4명 의견을 종합하여 최종 매매 결정\n"
f"2. LSTM v3 모델이 코스피 {data.get('kospi_price', 2500):.0f} 레짐에서 적합한지 평가\n"
"3. 필요 시 대안 모델 구체적으로 권고\n\n"
"반드시 아래 JSON 형식으로만 응답하세요:\n"
"{\n"
' "final_decision": "BUY" 또는 "SELL" 또는 "HOLD",\n'
' "consensus_score": 0.0~1.0,\n'
' "confidence": 0.0~1.0,\n'
' "majority_reasoning": "최종 결정 근거 2~3문장 (한국어)",\n'
' "dissenting_views": "소수 의견 요약 (없으면 빈 문자열)",\n'
' "model_health_score": 0.0~1.0,\n'
' "model_replacement_recommended": true 또는 false,\n'
' "recommended_model": "교체 권고 모델명 (없으면 \'현재 모델 유지\')",\n'
' "council_summary": "회의 전체 요약 3~4문장 (한국어)"\n'
"}"
)
def _parse_json_response(raw: Optional[str]) -> Optional[dict]:
"""LLM 응답에서 JSON 추출 (폴백 포함)"""
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
import re
match = re.search(r'\{[\s\S]*\}', raw)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
return None
def _vote_fallback(opinions: List[ExpertOpinion]) -> CouncilDecision:
"""의장 AI 실패 시 단순 다수결 폴백"""
from collections import Counter
if not opinions:
return CouncilDecision(
final_decision="HOLD", consensus_score=0.5, confidence=0.5,
majority_reasoning="분석 데이터 부족", dissenting_views="",
model_health_score=0.5, model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="전문가 의견 수집 실패로 HOLD 처리",
)
votes = [op.decision for op in opinions]
final = Counter(votes).most_common(1)[0][0]
avg_conf = sum(op.confidence for op in opinions) / len(opinions)
vote_counts = Counter(votes)
consensus = vote_counts[final] / len(votes)
return CouncilDecision(
final_decision=final,
consensus_score=round(consensus, 3),
confidence=round(avg_conf, 3),
majority_reasoning=f"전문가 {vote_counts[final]}/{len(votes)} 다수결 결과",
dissenting_views="",
model_health_score=0.5,
model_replacement_recommended=False,
recommended_model="현재 모델 유지",
council_summary="의장 AI 오류 - 전문가 투표로 대체",
expert_opinions=[
{"name": op.expert_name, "decision": op.decision,
"confidence": op.confidence, "reasoning": op.reasoning}
for op in opinions
],
)
class AICouncil:
"""
AI 전문가 회의 시스템
사용 방법:
council = AICouncil(llm_client)
decision = council.convene(ticker, analysis_data, regime_analysis)
fast_mode=True 시 전문가 생략, 의장 AI 단독 판단 (속도 약 4배 향상)
llm_client: GeminiLLMClient 또는 OllamaManager (request_inference 인터페이스 공용)
"""
def __init__(self, llm_client: Any = None):
self._ollama = llm_client # 내부 변수명 유지 (하위호환)
def _get_ollama(self) -> Any:
if self._ollama is None:
from modules.services.llm_client import get_llm_client
self._ollama = get_llm_client()
return self._ollama
def _ask_expert(self, expert: dict, ticker: str, data: dict) -> ExpertOpinion:
"""단일 전문가 의견 수집"""
prompt = _build_expert_prompt(expert, ticker, data)
raw = self._get_ollama().request_inference(prompt)
parsed = _parse_json_response(raw)
if parsed:
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision=str(parsed.get("decision", "HOLD")).upper(),
confidence=float(parsed.get("confidence", 0.5)),
reasoning=str(parsed.get("reasoning", "")),
key_concern=str(parsed.get("key_concern", "")),
model_feedback=str(parsed.get("model_feedback", "")),
)
# 파싱 실패 → 중립
print(f"[Council] {expert['name']} 응답 파싱 실패 → HOLD 처리")
return ExpertOpinion(
expert_name=expert["name"],
role=expert["role"],
decision="HOLD",
confidence=0.5,
reasoning="응답 파싱 실패",
key_concern="",
model_feedback="",
)
def convene(
self,
ticker: str,
analysis_data: dict,
regime_analysis: Optional[RegimeAnalysis] = None,
fast_mode: bool = True,
) -> CouncilDecision:
"""
전문가 회의 소집 및 최종 결정
Args:
ticker: 종목 코드
analysis_data: process.py 분석 결과 딕셔너리
regime_analysis: MarketRegimeDetector.detect() 결과
fast_mode: True=의장 AI 단독(빠름), False=전문가 4명+의장(심층)
Returns:
CouncilDecision
"""
# 레짐 기본값
if regime_analysis is None:
kospi = analysis_data.get("kospi_price", 2500)
regime_analysis = MarketRegimeDetector.detect(kospi)
expert_opinions: List[ExpertOpinion] = []
if not fast_mode:
print(f"[Council] {ticker} - 전문가 회의 시작 (4명)")
for expert in _EXPERTS:
print(f"[Council] {expert['name']} 분석 중...")
opinion = self._ask_expert(expert, ticker, analysis_data)
expert_opinions.append(opinion)
time.sleep(0.3) # Ollama 연속 요청 간격
else:
print(f"[Council] {ticker} - Fast mode (의장 단독)")
# 의장 AI 취합
chairman_prompt = _build_chairman_prompt(
ticker, expert_opinions, analysis_data, regime_analysis
)
raw_chairman = self._get_ollama().request_inference(chairman_prompt)
parsed_chairman = _parse_json_response(raw_chairman)
if parsed_chairman:
decision = CouncilDecision(
final_decision=str(parsed_chairman.get("final_decision", "HOLD")).upper(),
consensus_score=float(parsed_chairman.get("consensus_score", 0.5)),
confidence=float(parsed_chairman.get("confidence", 0.5)),
majority_reasoning=str(parsed_chairman.get("majority_reasoning", "")),
dissenting_views=str(parsed_chairman.get("dissenting_views", "")),
model_health_score=float(parsed_chairman.get("model_health_score", 0.7)),
model_replacement_recommended=bool(
parsed_chairman.get("model_replacement_recommended", False)
),
recommended_model=str(
parsed_chairman.get("recommended_model", "현재 모델 유지")
),
council_summary=str(parsed_chairman.get("council_summary", "")),
expert_opinions=[
{
"name": op.expert_name,
"decision": op.decision,
"confidence": op.confidence,
"reasoning": op.reasoning,
}
for op in expert_opinions
],
)
status_icon = "⚠️" if decision.model_replacement_recommended else ""
print(
f"[Council] {ticker}{decision.final_decision} "
f"(합의율: {decision.consensus_score:.0%}, "
f"모델건전성: {decision.model_health_score:.0%}) "
f"{status_icon}"
)
if decision.model_replacement_recommended:
print(f"[Council] 모델 교체 권고: {decision.recommended_model}")
return decision
# 의장 실패 → 투표 폴백
print(f"[Council] {ticker} - 의장 AI 실패, 투표 폴백 사용")
return _vote_fallback(expert_opinions)
def quick_validate(
self,
ticker: str,
kospi_price: float,
ai_confidence: float,
backtest_sharpe: Optional[float] = None,
) -> dict:
"""
LLM 호출 없이 규칙 기반 빠른 모델 검증
Returns:
{
"regime": str,
"model_ok": bool,
"score": float,
"recommendation": str,
"should_replace": bool,
}
"""
regime_analysis = MarketRegimeDetector.detect(kospi_price)
validation = MarketRegimeDetector.validate_model_for_regime(
regime_analysis.regime,
backtest_sharpe=backtest_sharpe,
)
# AI 신뢰도 하락 시 추가 감점
score = validation["confidence_score"]
if ai_confidence < 0.4:
score *= 0.8
return {
"regime": regime_analysis.regime.value,
"regime_description": regime_analysis.description,
"model_ok": score >= 0.5 and not validation["should_replace"],
"score": round(score, 3),
"recommendation": validation["recommendation"],
"should_replace": validation["should_replace"],
"alternative_models": validation["alternative_models"],
}
# 전역 싱글톤
_council_instance: Optional[AICouncil] = None
def get_council(llm_client: Any = None) -> AICouncil:
"""워커 프로세스 내 AICouncil 싱글톤 반환 (GeminiLLMClient 또는 OllamaManager 수용)"""
global _council_instance
if _council_instance is None:
_council_instance = AICouncil(llm_client)
return _council_instance