Files
gahusb 7ea1a21487 refactor: web-ai V1 assets → signal_v1/ (graduation prep)
Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:00:11 +09:00

417 lines
17 KiB
Python

"""
앙상블 예측 모듈 (Phase 3-3)
- LSTM + 기술지표 + LLM 감성 → 적응형 가중치
- 과거 매매 결과 기반 가중치 자동 조정
- Kelly Criterion 기반 포지션 비중 계산
- process.py의 하드코딩된 w_tech/w_news/w_ai 대체
- 파일 mtime 기반 cross-process 동기화 (워커 ↔ 메인 프로세스)
"""
import os
import json
import time
import numpy as np
from dataclasses import dataclass
from typing import Dict, Optional
from modules.config import Config
@dataclass
class SignalWeights:
"""앙상블 가중치"""
tech: float = 0.35
sentiment: float = 0.30
lstm: float = 0.35
# 각 신호의 허용 범위
MIN_WEIGHT = 0.10
MAX_WEIGHT = 0.65
def normalize(self):
"""
경계 보존 정규화 (합=1, MIN≤각값≤MAX 동시 보장)
단순 1/2차 정규화는 경계 위반을 반복 유발하므로
반복 배분 알고리즘(Water-Filling) 사용:
1. 단순 정규화 (비율 유지)
2. 경계 위반 값 → 경계에 고정, 나머지에 잔여 비중 비례 배분
3. 모든 값이 경계 내에 들 때까지 반복 (최대 10회)
"""
MIN, MAX = self.MIN_WEIGHT, self.MAX_WEIGHT
vals = [max(MIN * 0.1, self.tech),
max(MIN * 0.1, self.sentiment),
max(MIN * 0.1, self.lstm)]
for _ in range(10):
total = sum(vals)
if total > 0:
vals = [v / total for v in vals]
fixed = [None, None, None]
has_violation = False
for i, v in enumerate(vals):
if v < MIN:
fixed[i] = MIN
has_violation = True
elif v > MAX:
fixed[i] = MAX
has_violation = True
if not has_violation:
break
fixed_sum = sum(f for f in fixed if f is not None)
remaining = 1.0 - fixed_sum
free = [(i, vals[i]) for i, f in enumerate(fixed) if f is None]
free_sum = sum(v for _, v in free)
new_vals = list(fixed)
if free and free_sum > 0:
factor = remaining / free_sum
for i, v in free:
new_vals[i] = v * factor
elif free:
per = remaining / len(free)
for i, _ in free:
new_vals[i] = per
vals = [v if v is not None else 0.0 for v in new_vals]
self.tech, self.sentiment, self.lstm = vals
return self
def to_dict(self):
return {"tech": self.tech, "sentiment": self.sentiment, "lstm": self.lstm}
@classmethod
def from_dict(cls, d):
return cls(tech=d.get("tech", 0.35),
sentiment=d.get("sentiment", 0.30),
lstm=d.get("lstm", 0.35))
class AdaptiveEnsemble:
"""
적응형 앙상블 가중치 관리자
핵심 로직:
1. 종목별 최근 N 매매의 결과를 추적
2. 어떤 신호가 정확했는지 소급 평가 (크기 가중 정확도)
3. 정확도가 높은 신호의 가중치를 점진적으로 증가
4. 시장 상황(ADX, 거시경제) 반영한 컨텍스트별 가중치 분리
5. Kelly Criterion 기반 최적 포지션 비중 제공
6. 파일 mtime 기반 cross-process 동기화 (워커 프로세스 갱신)
"""
def __init__(self, history_file=None, max_history=50):
self.max_history = max_history
self.history_file = history_file or os.path.join(
Config.DATA_DIR, "ensemble_history.json"
)
# {ticker: [{"tech_score": f, "sentiment_score": f, "lstm_score": f,
# "decision": str, "outcome": float}, ...]}
self._trade_history: Dict[str, list] = {}
# {context: SignalWeights} - context: "strong_trend" | "sideways" | "danger" | "default"
self._context_weights: Dict[str, SignalWeights] = {
"strong_trend": SignalWeights(tech=0.50, sentiment=0.20, lstm=0.30),
"sideways": SignalWeights(tech=0.30, sentiment=0.40, lstm=0.30),
"danger": SignalWeights(tech=0.20, sentiment=0.50, lstm=0.30),
"default": SignalWeights(tech=0.35, sentiment=0.30, lstm=0.35),
}
self._load_mtime: float = 0.0 # 마지막 파일 로드 시각
self._load()
# ──────────────────────────────────────────────
# 파일 I/O
# ──────────────────────────────────────────────
def _load(self):
if os.path.exists(self.history_file):
try:
with open(self.history_file, "r", encoding="utf-8") as f:
data = json.load(f)
self._trade_history = data.get("history", {})
weights_raw = data.get("weights", {})
for ctx, w in weights_raw.items():
self._context_weights[ctx] = SignalWeights.from_dict(w)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Load failed: {e}")
def _save(self):
try:
data = {
"history": {k: v[-self.max_history:] for k, v in self._trade_history.items()},
"weights": {ctx: w.to_dict() for ctx, w in self._context_weights.items()}
}
with open(self.history_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self._load_mtime = os.path.getmtime(self.history_file)
except Exception as e:
print(f"[Ensemble] Save failed: {e}")
def reload_if_stale(self):
"""
파일이 마지막 로드 이후 수정되었으면 재로드.
워커 프로세스가 메인 프로세스의 record_trade 결과를 반영하기 위해 사용.
"""
if not os.path.exists(self.history_file):
return
try:
mtime = os.path.getmtime(self.history_file)
if mtime > self._load_mtime:
self._load()
print("[Ensemble] 파일 변경 감지, 가중치 재로드")
except Exception:
pass
# ──────────────────────────────────────────────
# 컨텍스트 & 가중치
# ──────────────────────────────────────────────
def get_context(self, adx: float, macro_state: str) -> str:
"""현재 시장 컨텍스트 결정"""
if macro_state == "DANGER":
return "danger"
if adx >= 25:
return "strong_trend"
if adx < 20:
return "sideways"
return "default"
def get_weights(self, ticker: str, adx: float = 20.0,
macro_state: str = "SAFE",
ai_confidence: float = 0.5) -> SignalWeights:
"""
종목 + 시장 컨텍스트에 맞는 가중치 반환
1. 컨텍스트별 기준 가중치 선택
2. AI 신뢰도 높으면 lstm 가중치 보정
3. 종목별 학습 결과 반영 (크기 가중 정확도 사용)
"""
context = self.get_context(adx, macro_state)
base = self._context_weights.get(context, self._context_weights["default"])
ticker_history = self._trade_history.get(ticker, [])
adjusted = SignalWeights(tech=base.tech, sentiment=base.sentiment, lstm=base.lstm)
if len(ticker_history) >= 5:
recent = ticker_history[-10:]
# _accuracy_weighted: 방향 일치 + 수익 크기 가중 반영 (단순 binary X)
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent],
[h["outcome"] for h in recent])
alpha = 0.05 # 미세 조정폭 (±0.1 범위)
adjusted.tech = max(0.10, min(0.60, base.tech + alpha * (tech_acc - 0.5)))
adjusted.sentiment = max(0.10, min(0.60, base.sentiment + alpha * (news_acc - 0.5)))
adjusted.lstm = max(0.10, min(0.60, base.lstm + alpha * (lstm_acc - 0.5)))
# AI 신뢰도 보정 (LSTM confidence 상한 0.80 기준 조정)
if ai_confidence >= 0.75:
adjusted.lstm = min(0.65, adjusted.lstm * 1.25)
elif ai_confidence < 0.5:
adjusted.lstm = max(0.10, adjusted.lstm * 0.75)
return adjusted.normalize()
# ──────────────────────────────────────────────
# 앙상블 점수
# ──────────────────────────────────────────────
def compute_ensemble_score(self, tech_score: float, sentiment_score: float,
lstm_score: float, investor_score: float = 0.0,
weights: Optional[SignalWeights] = None) -> float:
"""
앙상블 통합 점수 계산
Args:
weights: 가중치 (None이면 기본값 사용)
"""
if weights is None:
weights = SignalWeights()
total = (weights.tech * tech_score
+ weights.sentiment * sentiment_score
+ weights.lstm * lstm_score)
# 수급 가산점 (최대 +0.15)
total += min(investor_score, 0.15)
return min(1.0, max(0.0, total))
# ──────────────────────────────────────────────
# Kelly Criterion
# ──────────────────────────────────────────────
def get_kelly_fraction(self, ticker: str = None, half_kelly: bool = True) -> float:
"""
Modified Kelly Criterion 기반 최적 투자 비중 계산
f* = (p * b - q) / b
where:
p = 과거 승리 거래 비율 (win rate)
q = 1 - p
b = 평균이익 / 평균손실 비율 (avg profit / avg loss, Risk-Reward)
Returns:
0.03 ~ 0.25 범위의 Kelly 분수
- half_kelly=True: 변동성 과대추정 보완을 위해 1/2 적용
- 거래 데이터 < 10건: 보수적 기본값 0.08 반환
"""
# 해당 종목 우선, 없으면 전체 통합 히스토리 사용
if ticker and ticker in self._trade_history:
outcomes = [h["outcome"] for h in self._trade_history[ticker]
if h.get("outcome") is not None]
else:
# 전체 종목 결과 통합 (시장 전반 win rate)
outcomes = [
h["outcome"]
for records in self._trade_history.values()
for h in records
if h.get("outcome") is not None
]
if len(outcomes) < 10:
return 0.08 # 데이터 부족 → 보수적 8%
wins = [o for o in outcomes if o > 0]
losses = [abs(o) for o in outcomes if o <= 0]
if not wins:
return 0.03 # 승리 거래 없음 → 최소 비중
if not losses:
return 0.20 # 손실 거래 없음 → 낙관적이나 상한 제한
p = len(wins) / len(outcomes)
q = 1.0 - p
avg_win = sum(wins) / len(wins)
avg_loss = sum(losses) / len(losses)
if avg_loss == 0:
return 0.20
b = avg_win / avg_loss # Risk-Reward ratio
kelly = (p * b - q) / b
if half_kelly:
kelly /= 2.0 # Half-Kelly: 실제 활용 시 표준
result = max(0.03, min(0.25, kelly)) # 3% ~ 25% 범위 제한
return result
# ──────────────────────────────────────────────
# 거래 결과 기록 & 가중치 학습
# ──────────────────────────────────────────────
def record_trade(self, ticker: str, tech_score: float, sentiment_score: float,
lstm_score: float, decision: str, outcome_pct: float):
"""
매매 결과 기록 → 가중치 학습 데이터 축적
Args:
outcome_pct: 실현 수익률 (%). 양수=이익, 음수=손실
"""
if ticker not in self._trade_history:
self._trade_history[ticker] = []
record = {
"tech_score": tech_score,
"sentiment_score": sentiment_score,
"lstm_score": lstm_score,
"decision": decision,
"outcome": outcome_pct
}
self._trade_history[ticker].append(record)
if len(self._trade_history[ticker]) > self.max_history:
self._trade_history[ticker] = self._trade_history[ticker][-self.max_history:]
self._update_weights(ticker)
self._save()
def _update_weights(self, ticker: str):
"""
종목별 성과를 반영해 컨텍스트 가중치 점진적 업데이트.
- 크기 가중 정확도(accuracy_weighted) 사용 → 큰 손실에 강한 패널티
- 지수이동평균(alpha=0.10)으로 점진 반영 → 급격한 가중치 전환 방지
- normalize() 후 재경계 적용 → 경계값 위반 방지
"""
history = self._trade_history.get(ticker, [])
if len(history) < 5:
return
recent = history[-10:]
outcomes = [h["outcome"] for h in recent]
tech_acc = self._accuracy_weighted(
[h.get("tech_score", 0.5) for h in recent], outcomes)
news_acc = self._accuracy_weighted(
[h.get("sentiment_score", 0.5) for h in recent], outcomes)
lstm_acc = self._accuracy_weighted(
[h.get("lstm_score", 0.5) for h in recent], outcomes)
alpha = 0.10 # EMA 계수 (10회 거래 후 완전 반영)
for ctx, w in self._context_weights.items():
delta_tech = alpha * (tech_acc - 0.5) * 0.4 # 최대 ±0.02
delta_news = alpha * (news_acc - 0.5) * 0.4
delta_lstm = alpha * (lstm_acc - 0.5) * 0.4
# 경계 적용 → normalize (경계 재반영) → normalize (합=1 보장)
w.tech = max(0.10, min(0.65, w.tech + delta_tech))
w.sentiment = max(0.10, min(0.65, w.sentiment + delta_news))
w.lstm = max(0.10, min(0.65, w.lstm + delta_lstm))
w.normalize() # normalize() 내부에서 경계 재클램핑 + 2차 정규화 수행
print(f"[Ensemble] {ctx} tech={w.tech:.2f} news={w.sentiment:.2f} lstm={w.lstm:.2f} "
f"(acc T={tech_acc:.2f} N={news_acc:.2f} L={lstm_acc:.2f})")
# ──────────────────────────────────────────────
# 정확도 지표
# ──────────────────────────────────────────────
@staticmethod
def _accuracy_weighted(scores: list, outcomes: list) -> float:
"""
신호-결과 크기 가중 정확도 (0.0~1.0, 0.5=무관)
- 단순 방향 일치(0/1)가 아닌 수익률 절댓값으로 가중
- 큰 손실 예측 실패는 작은 이익 예측 성공보다 강하게 패널티
"""
if len(scores) < 3:
return 0.5
total_weight = 0.0
weighted_correct = 0.0
for s, o in zip(scores, outcomes):
weight = max(1.0, abs(o)) # 수익률 절댓값 기반 가중치 (최소 1.0)
total_weight += weight
if (s >= 0.5 and o > 0) or (s < 0.5 and o <= 0):
weighted_correct += weight
if total_weight == 0:
return 0.5
return weighted_correct / total_weight
# ──────────────────────────────────────────────
# 전역 싱글톤 (프로세스별)
# ──────────────────────────────────────────────
_ensemble_instance: Optional[AdaptiveEnsemble] = None
def get_ensemble() -> AdaptiveEnsemble:
"""프로세스 내 싱글톤 앙상블 관리자 반환 (워커/메인 각각 독립 인스턴스)"""
global _ensemble_instance
if _ensemble_instance is None:
_ensemble_instance = AdaptiveEnsemble()
return _ensemble_instance