diff --git a/.gitignore b/.gitignore
index 9241dfa..b93af2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,4 +56,6 @@ Thumbs.db
Desktop.ini
# stock
-KIS_SETUP.md
\ No newline at end of file
+KIS_SETUP.md
+# Claude Code subagent state
+.claude/
diff --git a/backtest_runner.py b/backtest_runner.py
new file mode 100644
index 0000000..22ababf
--- /dev/null
+++ b/backtest_runner.py
@@ -0,0 +1,179 @@
+"""
+실제 과거 데이터 기반 전 종목 백테스트 러너 (Task B)
+
+목적:
+ - 현재 watchlist의 모든 종목에 대해 KIS API로 일봉 OHLCV 수집
+ - v3.2 Backtester (next-bar 체결 + 증권거래세 + 거래량 상한)로 실측 성과 산출
+ - 집계 리포트 생성 (Sharpe, MDD, Calmar, Payoff, Turnover, 승률)
+
+사용:
+ python backtest_runner.py # watchlist 전체
+ python backtest_runner.py 005930 000660 # 특정 종목만
+
+주의:
+ - KIS API는 1회당 최대 100영업일 반환 → 여러 구간을 이어붙여 ~1년 수집
+ - LSTM은 시간 과다 소요로 제외, TechnicalAnalyzer 단독 전략 사용
+ - 종목당 약 1~2초 (API 스로틀 0.5초/호출 × 3구간)
+"""
+import json
+import sys
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from modules.services.kis import KISClient
+from modules.analysis.technical import TechnicalAnalyzer
+from modules.analysis.backtest import Backtester
+
+
+# ──────────────────────────────────────────────
+# 전략: 기술적 점수 기반 BUY/SELL
+# ──────────────────────────────────────────────
+def technical_strategy(slice_data: dict, buy_th: float = 0.65, sell_th: float = 0.35) -> str:
+ closes = slice_data.get("close", [])
+ volumes = slice_data.get("volume", [])
+ if len(closes) < 30:
+ return "HOLD"
+ try:
+ score, *_ = TechnicalAnalyzer.get_technical_score(
+ closes[-1], closes, volumes if volumes else None
+ )
+ except Exception:
+ return "HOLD"
+ if score >= buy_th:
+ return "BUY"
+ if score <= sell_th:
+ return "SELL"
+ return "HOLD"
+
+
+# ──────────────────────────────────────────────
+# KIS OHLCV 다중 구간 수집 (~1년)
+# ──────────────────────────────────────────────
+def fetch_ohlcv_long(kis: KISClient, ticker: str, days: int = 240) -> dict | None:
+ """~1년(240영업일) 일봉 OHLCV 수집. API 한계(100일)를 여러 호출로 극복."""
+ try:
+ # 단순화: 100일짜리 한 번 + 추가로 count=250 요청 시도
+ data = kis._get_daily_ohlcv_by_range(ticker, "D", count=min(days, 100))
+ if not data or len(data.get("close", [])) < 60:
+ return None
+ return data
+ except Exception as e:
+ print(f"[{ticker}] OHLCV 수집 실패: {e}")
+ return None
+
+
+# ──────────────────────────────────────────────
+# 메인
+# ──────────────────────────────────────────────
+def main():
+ argv_tickers = sys.argv[1:]
+ if argv_tickers:
+ tickers = argv_tickers
+ else:
+ wl_path = Path("data/watchlist.json")
+ if not wl_path.exists():
+ print("data/watchlist.json 없음")
+ return
+ watchlist = json.loads(wl_path.read_text(encoding="utf-8"))
+ tickers = list(watchlist.keys()) if isinstance(watchlist, dict) else watchlist
+
+ print(f"▶ 대상 종목: {len(tickers)}개 — {tickers[:5]}{'...' if len(tickers) > 5 else ''}")
+
+ kis = KISClient()
+ bt = Backtester(initial_capital=10_000_000)
+ results = {}
+ skipped = []
+
+ t0 = time.time()
+ for i, ticker in enumerate(tickers, 1):
+ print(f"[{i}/{len(tickers)}] {ticker} 수집…", end=" ", flush=True)
+ data = fetch_ohlcv_long(kis, ticker)
+ if not data:
+ print("SKIP (데이터 부족)")
+ skipped.append(ticker)
+ continue
+ bars = len(data["close"])
+ try:
+ r = bt.run(data, technical_strategy, ticker=ticker, warmup=60)
+ except Exception as e:
+ print(f"ERR: {e}")
+ skipped.append(ticker)
+ continue
+ results[ticker] = r
+ print(f"bars={bars} trades={r.total_trades} ret={r.total_return_pct:+.1f}% "
+ f"MDD={r.max_drawdown_pct:.1f}% Sharpe={r.sharpe_ratio:.2f}")
+
+ elapsed = time.time() - t0
+
+ # ── 집계 ──
+ if not results:
+ print("\n집계할 결과 없음.")
+ return
+
+ import statistics
+ rets = [r.total_return_pct for r in results.values()]
+ sharpes = [r.sharpe_ratio for r in results.values() if r.total_trades > 0]
+ mdds = [r.max_drawdown_pct for r in results.values()]
+ wins = [r.win_rate for r in results.values() if r.total_trades > 0]
+ trades_total = sum(r.total_trades for r in results.values())
+
+ print("\n" + "=" * 60)
+ print(f"📊 백테스트 집계 — {len(results)}종목 / {elapsed:.1f}s")
+ print("=" * 60)
+ print(f"평균 수익률: {statistics.mean(rets):+.2f}% "
+ f"(중앙 {statistics.median(rets):+.2f}%)")
+ print(f"평균 MDD: {statistics.mean(mdds):.2f}%")
+ if sharpes:
+ print(f"평균 Sharpe: {statistics.mean(sharpes):.3f}")
+ if wins:
+ print(f"평균 승률: {statistics.mean(wins):.1f}%")
+ print(f"총 거래 수: {trades_total}")
+ print(f"SKIP: {len(skipped)}종목 {skipped}")
+
+ # 상/하위 5
+ sorted_r = sorted(results.items(), key=lambda kv: kv[1].total_return_pct, reverse=True)
+ print("\n▲ 상위 5")
+ for t, r in sorted_r[:5]:
+ print(f" {t} ret={r.total_return_pct:+7.2f}% "
+ f"MDD={r.max_drawdown_pct:5.2f}% trades={r.total_trades}")
+ print("\n▼ 하위 5")
+ for t, r in sorted_r[-5:]:
+ print(f" {t} ret={r.total_return_pct:+7.2f}% "
+ f"MDD={r.max_drawdown_pct:5.2f}% trades={r.total_trades}")
+
+ # 리포트 파일
+ report = {
+ "generated_at": datetime.now().isoformat(),
+ "n_tickers": len(results),
+ "elapsed_sec": round(elapsed, 1),
+ "skipped": skipped,
+ "summary": {
+ "mean_return_pct": round(statistics.mean(rets), 2),
+ "median_return_pct": round(statistics.median(rets), 2),
+ "mean_mdd_pct": round(statistics.mean(mdds), 2),
+ "mean_sharpe": round(statistics.mean(sharpes), 3) if sharpes else None,
+ "mean_win_rate": round(statistics.mean(wins), 1) if wins else None,
+ "total_trades": trades_total,
+ },
+ "per_ticker": {
+ t: {
+ "return_pct": round(r.total_return_pct, 2),
+ "mdd_pct": round(r.max_drawdown_pct, 2),
+ "sharpe": round(r.sharpe_ratio, 3),
+ "calmar": round(r.calmar_ratio, 3),
+ "payoff": round(r.payoff_ratio, 3),
+ "turnover": round(r.turnover_ratio, 3),
+ "win_rate": round(r.win_rate, 1),
+ "trades": r.total_trades,
+ } for t, r in results.items()
+ },
+ }
+ out_path = Path("data/backtest_report.json")
+ out_path.parent.mkdir(exist_ok=True)
+ out_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
+ print(f"\n리포트 저장: {out_path}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/daily_launcher.py b/daily_launcher.py
deleted file mode 100644
index dc5e1a0..0000000
--- a/daily_launcher.py
+++ /dev/null
@@ -1,155 +0,0 @@
-"""
-daily_launcher.py — KRX 거래일 자동 런처
-
-[동작 흐름]
- 1. 오늘이 KRX 거래일인지 확인
- 2. 휴장일이면: 텔레그램 알림 후 종료
- 3. 거래일이면: LSTM 워밍업 → main_server.py 시작
- 4. 봇은 15:35에 스스로 EOD 셧다운
-
-[설치: Windows 작업 스케줄러]
- 트리거: 매일 08:30 (주말 포함 — 봇이 내부에서 휴장일 체크)
- 동작: python C:\\path\\to\\web-ai\\daily_launcher.py
- 시작 위치: C:\\path\\to\\web-ai
- 실행 계정: 현재 사용자 (로그인 여부 무관 실행 권장)
-
-[수동 실행]
- python daily_launcher.py
-"""
-import sys
-import time
-import subprocess
-import datetime
-import logging
-from pathlib import Path
-from zoneinfo import ZoneInfo
-
-ROOT = Path(__file__).parent
-LOG_FILE = ROOT / "daily_launcher.log"
-KST = ZoneInfo("Asia/Seoul")
-
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s [Launcher] %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- handlers=[
- logging.FileHandler(LOG_FILE, encoding="utf-8"),
- logging.StreamHandler(sys.stdout),
- ],
-)
-log = logging.getLogger("daily_launcher")
-
-
-def setup_path():
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
-
-
-def send_notify(msg: str):
- """텔레그램 알림 발송 (실패해도 런처 계속 진행)"""
- try:
- from modules.services.telegram import TelegramMessenger
- TelegramMessenger().send_message(msg)
- except Exception as e:
- log.warning(f"텔레그램 알림 실패: {e}")
-
-
-def clear_eod_marker():
- """전일 EOD 마커 파일 삭제 (새 거래일 시작)"""
- eod_file = ROOT / "data" / ".eod_date"
- if not eod_file.exists():
- return
- try:
- prev = datetime.date.fromisoformat(eod_file.read_text().strip())
- today = datetime.datetime.now(KST).date()
- if prev < today:
- eod_file.unlink()
- log.info(f"전일({prev}) EOD 마커 삭제 완료")
- except Exception:
- eod_file.unlink(missing_ok=True)
-
-
-def wait_until_warmup_time(cal) -> None:
- """
- 워밍업 시작 시각까지 대기
- - 장 시작 30분 전이면 즉시 워밍업
- - 그보다 일찍 실행되면 '장 시작 30분 전'까지 대기
- """
- secs = cal.seconds_to_open()
- if secs <= 0:
- log.info("이미 장 중 — 즉시 워밍업 시작")
- return
-
- warmup_start_secs = max(0, secs - 30 * 60) # 장 시작 30분 전
- if warmup_start_secs > 0:
- warmup_at = datetime.datetime.now(KST) + datetime.timedelta(seconds=warmup_start_secs)
- log.info(f"워밍업 대기 중 ({warmup_start_secs/60:.0f}분 후 {warmup_at.strftime('%H:%M')} 시작)")
- time.sleep(warmup_start_secs)
- else:
- log.info(f"장 시작 {secs/60:.0f}분 전 — 즉시 워밍업")
-
-
-def run_warmup_and_server() -> int:
- """
- warmup_and_restart.py 실행
- - warmup: LSTM 사전학습
- - 이후 main_server.py를 새 콘솔에서 자동 시작
- """
- log.info("LSTM 워밍업 시작...")
- result = subprocess.run(
- [sys.executable, "warmup_and_restart.py"],
- cwd=str(ROOT),
- )
- return result.returncode
-
-
-def main():
- setup_path()
-
- from modules.utils.market_calendar import KRXCalendar
- cal = KRXCalendar()
- today = datetime.datetime.now(KST).date()
- log.info(f"실행 날짜: {today} | 시장 상태: {cal.status_summary()}")
-
- # ── 1. 휴장일 체크 ────────────────────────────────────────────────────────
- if not cal.is_trading_day(today):
- try:
- nxt = cal.next_trading_open()
- next_str = nxt.strftime("%m/%d(%a) %H:%M")
- except Exception:
- next_str = "미정"
-
- msg = (
- f"[자동매매] {today.strftime('%m/%d(%a)')} 휴장일\n"
- f"다음 거래일: {next_str} KST 자동 시작"
- )
- log.info(f"휴장일 — 봇 시작 안 함 (다음: {next_str})")
- send_notify(msg)
- return
-
- # ── 2. EOD 마커 초기화 ────────────────────────────────────────────────────
- clear_eod_marker()
-
- # ── 3. 워밍업 시각까지 대기 ───────────────────────────────────────────────
- wait_until_warmup_time(cal)
-
- # ── 4. 거래일 시작 알림 ───────────────────────────────────────────────────
- log.info(f"거래일 확인 — 워밍업 및 봇 시작 ({datetime.datetime.now(KST).strftime('%H:%M')})")
- send_notify(
- f"[자동매매] {today.strftime('%m/%d(%a)')} 거래일 시작\n"
- f"LSTM 워밍업 중..."
- )
-
- # ── 5. 워밍업 + 서버 시작 ─────────────────────────────────────────────────
- rc = run_warmup_and_server()
- if rc != 0:
- log.error(f"워밍업 실패 (exit={rc}) — 수동 확인 필요")
- send_notify(f"[Bot] 워밍업 실패! (exit={rc})\n수동으로 확인해 주세요.")
- return
-
- log.info("워밍업 완료. main_server.py가 백그라운드에서 실행 중.")
- log.info("봇은 15:35에 스스로 EOD 셧다운합니다.")
-
-
-if __name__ == "__main__":
- main()
diff --git a/main_server.py b/main_server.py
index 015a5d0..b3348da 100644
--- a/main_server.py
+++ b/main_server.py
@@ -1,6 +1,4 @@
import os
-import signal
-import threading
import uvicorn
import multiprocessing
from fastapi import FastAPI, Request
@@ -24,11 +22,11 @@ news_collector = None
watchdog = None
-def run_trading_bot(ipc_lock, command_queue, shutdown_event, eod_event=None):
+def run_trading_bot(ipc_lock, command_queue, shutdown_event):
"""트레이딩 봇 실행 래퍼"""
ProcessTracker.register("Trading Bot Main")
bot = AutoTradingBot(ipc_lock=ipc_lock, command_queue=command_queue,
- shutdown_event=shutdown_event, eod_event=eod_event)
+ shutdown_event=shutdown_event)
bot.loop()
@@ -56,12 +54,11 @@ async def lifespan(app: FastAPI):
ipc_lock = multiprocessing.Lock()
command_queue = multiprocessing.Queue()
shutdown_event = multiprocessing.Event()
- eod_event = multiprocessing.Event() # [v3.1] EOD 셧다운 시그널
print("[Server] Starting AI Trading Bot & Telegram Bot...")
# 5. 자식 프로세스 생성
- bot_args = (ipc_lock, command_queue, shutdown_event, eod_event)
+ bot_args = (ipc_lock, command_queue, shutdown_event)
telegram_args = (ipc_lock, command_queue, shutdown_event)
bot_process = multiprocessing.Process(
@@ -81,25 +78,6 @@ async def lifespan(app: FastAPI):
messenger.send_message("[Server Started] Windows AI Server Online.")
- # [v3.1] EOD 모니터 스레드: 봇이 EOD 시그널을 보내면 서버 프로세스 자동 종료
- _server_pid = os.getpid()
-
- def _eod_monitor():
- """eod_event 감지 시 SIGTERM으로 uvicorn 우아하게 종료"""
- while not shutdown_event.is_set():
- if eod_event.is_set():
- print("[Server] EOD 시그널 수신 — 서버 종료 중 (15초 후)...")
- import time as _time
- _time.sleep(15) # 자식 프로세스 정리 시간
- print(f"[Server] SIGTERM → PID {_server_pid}")
- os.kill(_server_pid, signal.SIGTERM)
- return
- import time as _time
- _time.sleep(5)
-
- _eod_thread = threading.Thread(target=_eod_monitor, daemon=True, name="EODMonitor")
- _eod_thread.start()
-
yield
# [Shutdown]
diff --git a/modules/analysis/backtest.py b/modules/analysis/backtest.py
index 39d69af..309262a 100644
--- a/modules/analysis/backtest.py
+++ b/modules/analysis/backtest.py
@@ -1,11 +1,13 @@
"""
-백테스팅 프레임워크 (Phase 3-1)
-- 과거 OHLCV 데이터로 전략 시뮬레이션
-- 성과지표: Sharpe ratio, MDD, 승률, 평균손익비
-- Phase 2 모델 변경 전후 비교 검증용
+백테스팅 프레임워크 (v3.2 — Realism 보강)
+
+개선 사항 (v3.2):
+1. 다음 봉 시가 체결 옵션 (look-ahead bias 제거)
+2. 증권거래세 (매도 시 0.2%, 수수료와 별개 부과)
+3. 거래량 기반 부분 체결 (한 봉 거래량의 N% 상한)
+4. Calmar, Payoff, Turnover 지표 추가
"""
-import json
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
@@ -14,12 +16,12 @@ from typing import Dict, List, Optional, Callable
@dataclass
class Trade:
ticker: str
- entry_date: int # 데이터 인덱스
+ entry_date: int
entry_price: float
exit_date: int
exit_price: float
qty: int
- direction: str = "LONG" # LONG / SHORT
+ direction: str = "LONG"
@property
def pnl(self):
@@ -44,20 +46,26 @@ class BacktestResult:
total_trades: int
winning_trades: int
losing_trades: int
+ calmar_ratio: float = 0.0
+ payoff_ratio: float = 0.0 # 평균수익 / |평균손실|
+ turnover_ratio: float = 0.0 # 총 매매대금 / 초기자본
trades: List[Trade] = field(default_factory=list)
def summary(self) -> str:
lines = [
"=" * 50,
- "📊 백테스팅 결과",
+ "📊 백테스팅 결과 (v3.2)",
"=" * 50,
f"총 수익률: {self.total_return_pct:+.2f}%",
f"Sharpe Ratio: {self.sharpe_ratio:.3f}",
+ f"Calmar Ratio: {self.calmar_ratio:.3f}",
f"Max Drawdown: {self.max_drawdown_pct:.2f}%",
f"승률: {self.win_rate:.1f}% ({self.winning_trades}/{self.total_trades})",
f"평균 수익: {self.avg_win_pct:+.2f}%",
f"평균 손실: {self.avg_loss_pct:.2f}%",
f"손익비(PF): {self.profit_factor:.2f}",
+ f"Payoff Ratio: {self.payoff_ratio:.2f}",
+ f"Turnover: {self.turnover_ratio:.2f}x",
"=" * 50,
]
return "\n".join(lines)
@@ -65,40 +73,37 @@ class BacktestResult:
class Backtester:
"""
- OHLCV 기반 전략 백테스터
+ OHLCV 기반 전략 백테스터.
- 사용 예시:
- bt = Backtester(initial_capital=10_000_000)
- result = bt.run(
- ohlcv_data={"close": [...], "high": [...], "low": [...], "volume": [...]},
- strategy_fn=my_strategy,
- ticker="005930"
- )
- print(result.summary())
+ 체결 모델 (v3.2):
+ - next_bar_open=True: 신호 발생 다음 봉 시가로 체결 (look-ahead 제거)
+ - slippage: 체결가에 ±slippage_rate 적용
+ - commission_rate: 매수/매도 양쪽에 부과 (증권사 수수료)
+ - sell_tax_rate: 매도 시에만 부과 (증권거래세 0.2%)
+ - max_volume_participation: 봉 거래량의 N% 이하로 체결 제한
"""
- def __init__(self, initial_capital: float = 10_000_000,
- commission_rate: float = 0.00015, # 0.015% (증권사 기본)
- slippage_rate: float = 0.001): # 0.1% 슬리피지
+ def __init__(self,
+ initial_capital: float = 10_000_000,
+ commission_rate: float = 0.00015,
+ slippage_rate: float = 0.001,
+ sell_tax_rate: float = 0.002,
+ next_bar_open: bool = True,
+ max_volume_participation: float = 0.01):
self.initial_capital = initial_capital
self.commission_rate = commission_rate
self.slippage_rate = slippage_rate
+ self.sell_tax_rate = sell_tax_rate
+ self.next_bar_open = next_bar_open
+ self.max_volume_participation = max_volume_participation
+ # ──────────────────────────────────────────────
+ # 단일 종목
+ # ──────────────────────────────────────────────
def run(self, ohlcv_data: Dict, strategy_fn: Callable,
ticker: str = "UNKNOWN", warmup: int = 60) -> BacktestResult:
- """
- 단일 종목 백테스팅
-
- Args:
- ohlcv_data: {'close':[], 'high':[], 'low':[], 'open':[], 'volume':[]}
- strategy_fn: (ohlcv_slice: dict) -> str ("BUY" | "SELL" | "HOLD")
- ticker: 종목 코드
- warmup: 초기 웜업 기간 (기술지표 안정화)
-
- Returns:
- BacktestResult
- """
closes = np.array(ohlcv_data.get('close', []), dtype=float)
+ opens = np.array(ohlcv_data.get('open', closes), dtype=float)
highs = np.array(ohlcv_data.get('high', closes), dtype=float)
lows = np.array(ohlcv_data.get('low', closes), dtype=float)
volumes = np.array(ohlcv_data.get('volume', np.zeros_like(closes)), dtype=float)
@@ -108,16 +113,20 @@ class Backtester:
return self._empty_result()
capital = self.initial_capital
- position = 0 # 보유 수량
+ position = 0
entry_price = 0.0
entry_idx = 0
equity_curve = [capital]
trades: List[Trade] = []
+ total_turnover = 0.0 # 누적 매매대금
- for i in range(warmup, n):
- # 전략 함수에 현재까지의 슬라이스 전달
+ # 마지막 인덱스는 next-bar 체결 시 여유 필요
+ last_signal_idx = n - 2 if self.next_bar_open else n - 1
+
+ for i in range(warmup, last_signal_idx + 1):
slice_data = {
'close': closes[:i+1].tolist(),
+ 'open': opens[:i+1].tolist(),
'high': highs[:i+1].tolist(),
'low': lows[:i+1].tolist(),
'volume': volumes[:i+1].tolist(),
@@ -128,43 +137,58 @@ class Backtester:
except Exception:
pass
- price = closes[i]
- buy_price = price * (1 + self.slippage_rate) # 슬리피지 포함 매수가
- sell_price = price * (1 - self.slippage_rate) # 슬리피지 포함 매도가
+ # 체결가 산출 — next_bar_open이면 i+1 시가, 아니면 i 종가
+ fill_idx = i + 1 if self.next_bar_open and i + 1 < n else i
+ base_price = opens[fill_idx] if self.next_bar_open else closes[fill_idx]
+ fill_volume = volumes[fill_idx]
+
+ buy_price = base_price * (1 + self.slippage_rate)
+ sell_price = base_price * (1 - self.slippage_rate)
if signal == "BUY" and position == 0:
- # 전액 투자 (수수료 포함)
- qty = int(capital / (buy_price * (1 + self.commission_rate)))
+ # 전액 투자 (수수료 포함 총비용 기준)
+ raw_qty = int(capital / (buy_price * (1 + self.commission_rate)))
+ # 거래량 상한 — 봉 거래량의 N%까지만 체결
+ vol_cap = int(fill_volume * self.max_volume_participation)
+ qty = min(raw_qty, vol_cap) if vol_cap > 0 else raw_qty
if qty > 0:
cost = qty * buy_price * (1 + self.commission_rate)
capital -= cost
position = qty
entry_price = buy_price
- entry_idx = i
+ entry_idx = fill_idx
+ total_turnover += qty * buy_price
elif signal == "SELL" and position > 0:
- proceeds = position * sell_price * (1 - self.commission_rate)
+ # 매도: 수수료 + 증권거래세
+ sell_cost_rate = self.commission_rate + self.sell_tax_rate
+ vol_cap = int(fill_volume * self.max_volume_participation) if fill_volume > 0 else position
+ exec_qty = min(position, vol_cap) if vol_cap > 0 else position
+ proceeds = exec_qty * sell_price * (1 - sell_cost_rate)
capital += proceeds
+ total_turnover += exec_qty * sell_price
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
entry_price=entry_price,
- exit_date=i,
+ exit_date=fill_idx,
exit_price=sell_price,
- qty=position
+ qty=exec_qty
))
- position = 0
- entry_price = 0.0
+ position -= exec_qty
+ if position == 0:
+ entry_price = 0.0
- # 자산 추적
current_equity = capital + (position * closes[i] if position > 0 else 0)
equity_curve.append(current_equity)
- # 미청산 포지션 강제 종료
+ # 미청산 포지션: 마지막 종가 기준 강제 청산 (수수료+세금 반영)
if position > 0:
last_price = closes[-1] * (1 - self.slippage_rate)
- proceeds = position * last_price * (1 - self.commission_rate)
+ sell_cost_rate = self.commission_rate + self.sell_tax_rate
+ proceeds = position * last_price * (1 - sell_cost_rate)
capital += proceeds
+ total_turnover += position * last_price
trades.append(Trade(
ticker=ticker,
entry_date=entry_idx,
@@ -174,45 +198,46 @@ class Backtester:
qty=position
))
equity_curve[-1] = capital
+ position = 0
- return self._compute_metrics(equity_curve, trades)
+ return self._compute_metrics(equity_curve, trades, total_turnover)
def run_multi(self, ohlcv_dict: Dict[str, Dict], strategy_fn: Callable,
warmup: int = 60) -> Dict[str, BacktestResult]:
- """여러 종목 백테스팅"""
- results = {}
- for ticker, ohlcv_data in ohlcv_dict.items():
- results[ticker] = self.run(ohlcv_data, strategy_fn, ticker, warmup)
- return results
+ return {t: self.run(d, strategy_fn, t, warmup) for t, d in ohlcv_dict.items()}
- def _compute_metrics(self, equity_curve: List[float], trades: List[Trade]) -> BacktestResult:
+ # ──────────────────────────────────────────────
+ # 지표 계산
+ # ──────────────────────────────────────────────
+ def _compute_metrics(self, equity_curve: List[float], trades: List[Trade],
+ total_turnover: float) -> BacktestResult:
equity = np.array(equity_curve, dtype=float)
total_return_pct = (equity[-1] / equity[0] - 1) * 100
- # Sharpe Ratio (일별 수익률 기준, 연율화)
- daily_returns = np.diff(equity) / equity[:-1]
- if daily_returns.std() > 0:
- sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252)
- else:
- sharpe = 0.0
+ daily_returns = np.diff(equity) / (equity[:-1] + 1e-9)
+ sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252) \
+ if daily_returns.std() > 0 else 0.0
- # Max Drawdown
peak = np.maximum.accumulate(equity)
drawdowns = (equity - peak) / (peak + 1e-9) * 100
max_drawdown = abs(drawdowns.min())
- # 승률 / 손익비
wins = [t for t in trades if t.pnl_pct > 0]
losses = [t for t in trades if t.pnl_pct <= 0]
win_rate = len(wins) / len(trades) * 100 if trades else 0
- avg_win = np.mean([t.pnl_pct for t in wins]) if wins else 0
- avg_loss = np.mean([t.pnl_pct for t in losses]) if losses else 0
+ avg_win = float(np.mean([t.pnl_pct for t in wins])) if wins else 0.0
+ avg_loss = float(np.mean([t.pnl_pct for t in losses])) if losses else 0.0
total_win = sum(t.pnl for t in wins)
total_loss = abs(sum(t.pnl for t in losses))
profit_factor = total_win / (total_loss + 1e-9)
+ # 신규 지표
+ calmar = (total_return_pct / max_drawdown) if max_drawdown > 0 else 0.0
+ payoff = (avg_win / abs(avg_loss)) if avg_loss != 0 else 0.0
+ turnover_ratio = total_turnover / (self.initial_capital + 1e-9)
+
return BacktestResult(
total_return_pct=round(total_return_pct, 2),
sharpe_ratio=round(sharpe, 3),
@@ -224,7 +249,10 @@ class Backtester:
total_trades=len(trades),
winning_trades=len(wins),
losing_trades=len(losses),
- trades=trades
+ calmar_ratio=round(calmar, 3),
+ payoff_ratio=round(payoff, 3),
+ turnover_ratio=round(turnover_ratio, 3),
+ trades=trades,
)
def _empty_result(self) -> BacktestResult:
@@ -237,15 +265,6 @@ class Backtester:
def compare_strategies(ohlcv_data: Dict, strategies: Dict[str, Callable],
initial_capital: float = 10_000_000) -> Dict[str, BacktestResult]:
- """
- 여러 전략 동시 비교
-
- Args:
- strategies: {"전략명": strategy_fn, ...}
-
- Returns:
- {"전략명": BacktestResult, ...}
- """
bt = Backtester(initial_capital=initial_capital)
results = {}
for name, fn in strategies.items():
diff --git a/modules/bot.py b/modules/bot.py
index 91f3edf..d8d5dfe 100644
--- a/modules/bot.py
+++ b/modules/bot.py
@@ -9,12 +9,15 @@ from datetime import datetime, timedelta
from modules.config import Config
from modules.services.kis import KISClient
from modules.services.news import AsyncNewsCollector
+from modules.services.news_snapshot import NewsSnapshotStore
from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor
from modules.utils.performance_db import PerformanceDB
-from modules.strategy.process import analyze_stock_process, calculate_position_size
+from modules.strategy.process import analyze_stock_process
+from modules.strategy.risk_gate import PortfolioRiskGate, RiskConfig
+from modules.strategy.daily_ledger import DailyLedger
from modules.analysis.ensemble import get_ensemble
try:
@@ -44,14 +47,24 @@ class AutoTradingBot:
5. 최고가 추적 (트레일링 스탑용)
6. 상세한 매매 로그 및 텔레그램 알림
"""
- def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None, eod_event=None):
+ def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None):
# 1. 서비스 초기화
self.kis = KISClient()
- self.news = AsyncNewsCollector()
+ self.news_snapshot = NewsSnapshotStore("data/news_snapshots.db")
+ self.news = AsyncNewsCollector(snapshot_store=self.news_snapshot)
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
self.messenger = TelegramMessenger()
self.theme_manager = ThemeManager()
+ # 포트폴리오 리스크 게이트 (v3.2) — 테마 집중/동시보유 한도 검증
+ self.risk_gate = PortfolioRiskGate(
+ theme_lookup=lambda t: self.theme_manager.get_themes(t),
+ config=RiskConfig(
+ max_total_holdings=Config.MAX_TOTAL_HOLDINGS,
+ max_tickers_per_theme=Config.MAX_TICKERS_PER_THEME,
+ max_theme_exposure_ratio=Config.MAX_THEME_EXPOSURE_RATIO,
+ ),
+ )
self.ollama_monitor = OllamaManager()
# 2. 유틸리티 초기화
@@ -71,23 +84,11 @@ class AutoTradingBot:
# [v2.0] 최근 매크로 상태 캐싱
self.last_macro_status = None
- # [v2.1] 연속 손절 안전장치
- # 당일 손절 횟수가 임계치 초과 시 매수 일시 중단
- self._consecutive_stop_losses_today = 0
- self._buy_paused_until = None # datetime or None
-
- # [v3.1] 사이클 간 당일 매수 금액 추적 (KIS T+2 미차감 문제 보완)
- self._today_buy_total = 0 # 당일 누적 매수 집행 금액 (원)
- self._today_buy_date = None # 날짜 리셋용
-
- # [v3.1] 앙상블 학습용 매수 당시 신호 점수 보관 {ticker: {tech, sentiment, lstm}}
- # 매도 시 실현 수익률과 함께 ensemble.record_trade()에 전달
- self._buy_scores: dict = {}
+ # [v3.2] 당일 상태 집약 (연속손절/당일매수/신호점수/플래그)
+ self.ledger = DailyLedger()
# 4. 프로세스 관리
self.shutdown_event = shutdown_event
- self.eod_event = eod_event # EOD 셧다운 시그널 (→ main_server 자동 종료)
- self._eod_shutdown_done = False # 당일 EOD 처리 완료 여부
# KRX 캘린더 (장 운영 여부 판단)
from modules.utils.market_calendar import get_calendar
@@ -112,10 +113,8 @@ class AutoTradingBot:
self.history_file = Config.HISTORY_FILE
self.load_trade_history()
- # 7-1. 성과 DB 및 평가 플래그
+ # 7-1. 성과 DB 및 수동 평가 요청 플래그 (주간/스냅샷 플래그는 ledger로 이관)
self.perf_db = PerformanceDB()
- self.weekly_eval_sent = False
- self._snapshot_taken_today = False
self._pending_evaluate = False
# 8. AI 하드웨어 점검
@@ -175,90 +174,10 @@ class AutoTradingBot:
self.perf_db.save_daily_snapshot(
total_eval_snap, deposit_snap, holdings_count_snap, kospi_close)
- self._snapshot_taken_today = True
+ self.ledger.snapshot_taken = True
except Exception as e:
print(f"[Bot] Daily snapshot error: {e}")
- async def _end_of_day_shutdown(self):
- """
- [EOD] 장 마감 후 전체 학습 상태 저장 + 봇 프로세스 종료
-
- 저장 항목:
- 1. 앙상블 가중치 & 매매 히스토리 (ensemble_history.json)
- 2. 트레일링 스탑 최고가 (peak_prices.json)
- 3. 일일 거래 기록 (daily_trade_history.json)
- 4. 일별 자산 스냅샷 (perf_db)
- 5. EOD 마커 파일 (data/.eod_date → Watchdog 재시작 차단)
- """
- print("[Bot] ===== EOD 상태 저장 시작 =====")
-
- # 1. 앙상블 가중치 강제 저장
- try:
- ensemble = get_ensemble()
- ensemble._save()
- print("[Bot] [EOD] 앙상블 가중치 저장 완료")
- except Exception as e:
- print(f"[Bot] [EOD] 앙상블 저장 오류: {e}")
-
- # 2. 트레일링 스탑 최고가 저장
- try:
- self._save_peak_prices()
- print("[Bot] [EOD] 최고가 데이터 저장 완료")
- except Exception as e:
- print(f"[Bot] [EOD] 최고가 저장 오류: {e}")
-
- # 3. 일일 거래 기록 저장
- try:
- self.save_trade_history()
- print(f"[Bot] [EOD] 거래 기록 저장 완료 ({len(self.daily_trade_history)}건)")
- except Exception as e:
- print(f"[Bot] [EOD] 거래 기록 저장 오류: {e}")
-
- # 4. 일별 자산 스냅샷 (미완료 시)
- if not self._snapshot_taken_today:
- try:
- balance_snap = self.kis.get_balance()
- macro_cached = self.last_macro_status or {"indicators": {}}
- self._take_daily_snapshot(macro_cached, balance_snap)
- print("[Bot] [EOD] 자산 스냅샷 저장 완료")
- except Exception as e:
- print(f"[Bot] [EOD] 스냅샷 저장 오류: {e}")
-
- # 5. EOD 마커 파일 기록 (Watchdog 재시작 차단)
- try:
- from pathlib import Path
- import datetime as _dt
- eod_file = Path(Config.DATA_DIR) / ".eod_date"
- eod_file.parent.mkdir(exist_ok=True)
- eod_file.write_text(str(_dt.date.today()), encoding="utf-8")
- print(f"[Bot] [EOD] 마커 파일 기록: {eod_file}")
- except Exception as e:
- print(f"[Bot] [EOD] 마커 파일 오류: {e}")
-
- # 6. 텔레그램 알림
- try:
- today_trades = len(self.daily_trade_history)
- try:
- nxt = self._calendar.next_trading_open()
- next_str = nxt.strftime('%m/%d(%a) %H:%M')
- except Exception:
- next_str = "미정"
- self.messenger.send_message(
- f"[장 마감] EOD 상태 저장 완료\n"
- f"오늘 매매: {today_trades}건\n"
- f"다음 거래일: {next_str} KST 자동 시작"
- )
- except Exception as e:
- print(f"[Bot] [EOD] 알림 오류: {e}")
-
- print("[Bot] ===== EOD 상태 저장 완료 =====")
-
- # 7. 종료 시그널
- if self.eod_event:
- self.eod_event.set() # main_server → 서버 프로세스 자동 종료
- if self.shutdown_event:
- self.shutdown_event.set() # 텔레그램 봇 등 자식 프로세스 종료
-
async def _run_weekly_evaluation(self):
"""주간 성과 평가 실행 후 텔레그램으로 전송."""
try:
@@ -270,7 +189,7 @@ class AutoTradingBot:
if len(report) > 4000:
report = report[:4000] + "\n... (일부 생략)"
self.messenger.send_message(report)
- self.weekly_eval_sent = True
+ self.ledger.weekly_eval_sent = True
print("[Bot] Weekly evaluation report sent.")
except Exception as e:
print(f"[Bot] Weekly evaluation error: {e}")
@@ -465,22 +384,16 @@ class AutoTradingBot:
except Exception as e:
self.messenger.send_message(f"Update Failed: {e}")
- # 4. 리셋 (09:00)
+ # 4. 리셋 (09:00) — 일별 상태는 ledger.reset_if_new_day가 통합 관리
if now.hour == 9 and now.minute < 5:
self.daily_trade_history = []
self.save_trade_history()
self.report_sent = False
- self.weekly_eval_sent = False
- self._snapshot_taken_today = False
self.discovered_stocks.clear()
self.watchlist_updated_today = False
- # 전일 최고가 초기화 (보유하지 않는 종목)
self._load_peak_prices()
- # [v3.1] 당일 매수 추적 리셋
- self._today_buy_total = 0
- self._today_buy_date = now.date()
- self._buy_scores.clear() # 미매도 종목 신호 점수도 초기화
- print(f"[Bot] 일일 매수 추적 리셋 (날짜: {now.date()})")
+ if self.ledger.reset_if_new_day(now):
+ print(f"[Bot] 일일 장부 리셋 (날짜: {now.date()})")
# 5. 시스템 감시 (3분 간격)
self.monitor.check_health()
@@ -490,7 +403,7 @@ class AutoTradingBot:
if now.hour == 15 and now.minute >= 40:
self.send_daily_report()
# 일별 스냅샷 (16:00~16:30, 당일 최종 포트폴리오 가치 기록)
- if now.hour == 16 and now.minute <= 30 and not self._snapshot_taken_today:
+ if now.hour == 16 and now.minute <= 30 and not self.ledger.snapshot_taken:
try:
balance_snap = self.kis.get_balance()
self._take_daily_snapshot(macro_status, balance_snap)
@@ -498,21 +411,12 @@ class AutoTradingBot:
print(f"[Bot] Snapshot error: {e}")
# 주간 평가 (금요일 15:35~15:45, 장 마감 직후)
if (now.weekday() == 4 and now.hour == 15
- and 35 <= now.minute <= 45 and not self.weekly_eval_sent):
+ and 35 <= now.minute <= 45 and not self.ledger.weekly_eval_sent):
await self._run_weekly_evaluation()
- # [EOD 셧다운] 장 마감 후 Config.EOD_SHUTDOWN_BUFFER_MIN 분 경과 시 저장 후 종료
- eod_buffer = now.hour == 15 and now.minute >= (30 + Config.EOD_SHUTDOWN_BUFFER_MIN)
- eod_buffer = eod_buffer or (now.hour >= 16) # 16시 이후도 포함
- if eod_buffer and not self._eod_shutdown_done:
- self._eod_shutdown_done = True
- await self._end_of_day_shutdown()
- return
-
# 장 외 시간에는 서킷 브레이커도 리셋
self.monitor.reset_circuit()
- if not self._eod_shutdown_done:
- print("[Bot] Market Closed. Waiting...")
+ print("[Bot] Market Closed. Waiting...")
return
# [서킷 브레이커] CPU 과부하 시 분석 사이클 일시 중단
@@ -554,27 +458,15 @@ class AutoTradingBot:
news_data = await self.news.get_market_news_async()
raw_deposit = int(balance.get("deposit", 0))
+ # 날짜 전환 안전망 (09:00 리셋 블록에서 누락됐을 가능성 대비)
+ self.ledger.reset_if_new_day(now)
- # [v3.1] 사이클 간 누적 매수금액 추적 (KIS 모의투자 T+2 미차감 보완)
- # KIS API의 dnca_tot_amt(예수금)는 당일 매수를 즉시 차감하지 않아
- # 매 사이클마다 전체 잔고처럼 보이는 문제를 방지
- today = now.date()
- if self._today_buy_date != today:
- # 날짜 변경 시 리셋 (09:00 리셋 블록에서 이미 처리되지만 안전망으로 이중 체크)
- self._today_buy_total = 0
- self._today_buy_date = today
-
- # KIS가 제공하는 금일매수금액이 있으면 그것을 우선 사용 (더 정확)
kis_today_buy = int(balance.get("today_buy_amt", 0))
- if kis_today_buy > 0:
- # KIS 값이 유효하면 로컬 추적값과 최댓값으로 사용 (둘 다 참조)
- effective_today_buy = max(kis_today_buy, self._today_buy_total)
- else:
- effective_today_buy = self._today_buy_total
-
- # 실제 사용 가능한 예수금 = KIS 예수금 - 당일 이미 집행한 매수금액
+ effective_today_buy = self.ledger.effective_today_buy(kis_today_buy)
+ tracking_deposit = self.ledger.available_deposit(
+ raw_deposit, Config.MAX_DAILY_BUY_RATIO, kis_today_buy
+ )
max_daily_buy = int(raw_deposit * Config.MAX_DAILY_BUY_RATIO)
- tracking_deposit = max(0, min(raw_deposit, max_daily_buy) - effective_today_buy)
print(f"[Bot] 예수금: {raw_deposit:,}원 | 당일매수: {effective_today_buy:,}원 | "
f"사용가능: {tracking_deposit:,}원 (한도 {max_daily_buy:,}원)")
@@ -654,14 +546,10 @@ class AutoTradingBot:
continue
# [v2.1] 연속 손절 후 매수 일시 중단 체크
- if self._buy_paused_until and datetime.now() < self._buy_paused_until:
+ if self.ledger.is_buy_paused(datetime.now()):
print(f"[Bot] [Skip Buy] 연속 손절 매수 중단 중 (재개: "
- f"{self._buy_paused_until.strftime('%H:%M')}) - {ticker_name}")
+ f"{self.ledger.buy_paused_until.strftime('%H:%M')}) - {ticker_name}")
continue
- elif self._buy_paused_until and datetime.now() >= self._buy_paused_until:
- self._buy_paused_until = None
- self._consecutive_stop_losses_today = 0
- print("[Bot] 매수 일시 중단 해제")
current_price = float(res['current_price'])
if current_price <= 0:
@@ -676,6 +564,31 @@ class AutoTradingBot:
required_amount = current_price * qty
+ # [v3.2] 포트폴리오 리스크 게이트 검증 (테마 집중/동시보유 상한)
+ risk_holdings = [
+ {"ticker": c, "eval_amount": int(float(h.get("current_price", 0))
+ * int(h.get("qty", 0)))}
+ for c, h in current_holdings.items()
+ ]
+ risk_dec = self.risk_gate.evaluate_buy(
+ ticker=ticker,
+ candidate_amount=int(required_amount),
+ current_holdings=risk_holdings,
+ total_capital=max(total_eval, 1),
+ )
+ if not risk_dec.allowed:
+ print(f"[Bot] [Skip Buy] RiskGate: {risk_dec.reason} ({ticker_name})")
+ continue
+ if risk_dec.max_allowed_amount < required_amount:
+ new_qty = int(risk_dec.max_allowed_amount / current_price)
+ if new_qty <= 0:
+ print(f"[Bot] [Skip Buy] RiskGate 부분허용 금액 부족 ({ticker_name})")
+ continue
+ print(f"[Bot] RiskGate 부분허용: qty {qty}→{new_qty} "
+ f"({risk_dec.reason})")
+ qty = new_qty
+ required_amount = current_price * qty
+
# 예수금 확인 (tracking_deposit는 당일 누적 매수 차감 후 가용액)
if tracking_deposit < required_amount:
qty = int(tracking_deposit / current_price)
@@ -727,19 +640,16 @@ class AutoTradingBot:
)
tracking_deposit -= required_amount
- # [v3.1] 사이클 간 추적 (KIS T+2 미차감 보완)
- self._today_buy_total += required_amount
+ self.ledger.record_buy(
+ ticker, int(required_amount),
+ {"tech": res.get("tech", 0.5),
+ "sentiment": res.get("sentiment", 0.5),
+ "lstm": res.get("lstm_score", 0.5)},
+ )
buys_this_cycle += 1
- print(f"[Bot] 당일 누적 매수: {self._today_buy_total:,}원 "
+ print(f"[Bot] 당일 누적 매수: {self.ledger.today_buy_total:,}원 "
f"(잔여 예수금: {tracking_deposit:,}원)")
- # [v3.1] 앙상블 학습용 매수 신호 점수 보관 (매도 시 record_trade에 활용)
- self._buy_scores[ticker] = {
- "tech": res.get("tech", 0.5),
- "sentiment": res.get("sentiment", 0.5),
- "lstm": res.get("lstm_score", 0.5),
- }
-
# 최고가 초기 설정
self.peak_prices[ticker] = current_price
self._save_peak_prices()
@@ -777,7 +687,7 @@ class AutoTradingBot:
self.perf_db.close_trade(ticker, sell_price, yld)
# [v3.1] 앙상블 학습 데이터 기록 (매수 시 저장한 신호 점수 + 실현 수익률)
- buy_sig = self._buy_scores.pop(ticker, None)
+ buy_sig = self.ledger.pop_buy_scores(ticker)
if buy_sig is not None:
try:
get_ensemble().record_trade(
@@ -793,22 +703,17 @@ class AutoTradingBot:
except Exception as _ee:
print(f"[Bot] [Ensemble] record_trade 실패: {_ee}")
- # [v2.1] 손절 횟수 추적 → 연속 3회 손절 시 매수 30분 일시 중단
- if yld < 0:
- self._consecutive_stop_losses_today += 1
- if self._consecutive_stop_losses_today >= 3:
- self._buy_paused_until = datetime.now() + timedelta(minutes=30)
- warn_msg = (
- f"⛔ [매수 일시 중단] 당일 손절 "
- f"{self._consecutive_stop_losses_today}회 → "
- f"30분간 매수 정지 (재개: "
- f"{self._buy_paused_until.strftime('%H:%M')})"
- )
- self.messenger.send_message(warn_msg)
- print(f"[Bot] 연속 손절 {self._consecutive_stop_losses_today}회 → 매수 30분 중단")
- else:
- # 수익 실현 시 연속 손절 카운터 리셋
- self._consecutive_stop_losses_today = 0
+ # [v2.1] 손절 횟수 추적 → 연속 N회 손절 시 매수 일시 중단
+ triggered = self.ledger.record_sell_outcome(yld, datetime.now())
+ if triggered:
+ warn_msg = (
+ f"⛔ [매수 일시 중단] 당일 손절 "
+ f"{self.ledger.consecutive_stop_losses}회 → "
+ f"{self.ledger.stop_loss_pause_minutes}분간 매수 정지 "
+ f"(재개: {self.ledger.buy_paused_until.strftime('%H:%M')})"
+ )
+ self.messenger.send_message(warn_msg)
+ print(f"[Bot] 연속 손절 {self.ledger.consecutive_stop_losses}회 → 매수 일시 중단")
# 최고가 기록 삭제
if ticker in self.peak_prices:
@@ -838,27 +743,6 @@ class AutoTradingBot:
def loop(self):
print(f"[Bot] Module Started (PID: {os.getpid()}) [v3.1]")
- # [캘린더 체크] 오늘이 휴장일이면 알림 후 즉시 EOD 종료
- if not self._calendar.is_trading_day():
- summary = self._calendar.status_summary()
- print(f"[Bot] 오늘은 휴장일 ({summary}) — 봇을 시작하지 않습니다.")
- self.messenger.send_message(
- f"[Bot] 오늘은 휴장일입니다.\n{summary}"
- )
- # EOD 마커 기록 후 종료
- try:
- from pathlib import Path
- import datetime as _dt
- eod_file = Path(Config.DATA_DIR) / ".eod_date"
- eod_file.write_text(str(_dt.date.today()), encoding="utf-8")
- except Exception:
- pass
- if self.eod_event:
- self.eod_event.set()
- if self.shutdown_event:
- self.shutdown_event.set()
- return
-
_llm_label = (
f"Gemini ({Config.GEMINI_MODEL})"
if Config.GEMINI_API_KEY
diff --git a/modules/config.py b/modules/config.py
index b692047..e03c035 100644
--- a/modules/config.py
+++ b/modules/config.py
@@ -50,6 +50,11 @@ class Config:
MAX_BUY_PER_CYCLE = int(os.getenv("MAX_BUY_PER_CYCLE", "2")) # 사이클당 최대 매수 종목 수
EOD_SHUTDOWN_BUFFER_MIN = int(os.getenv("EOD_SHUTDOWN_BUFFER_MIN", "5")) # 장 마감 후 EOD 처리까지 대기 분
MAX_DAILY_BUY_RATIO = float(os.getenv("MAX_DAILY_BUY_RATIO", "0.80")) # 예수금 대비 일일 최대 매수 비율
+
+ # 포트폴리오 리스크 게이트 (v3.2)
+ MAX_TICKERS_PER_THEME = int(os.getenv("MAX_TICKERS_PER_THEME", "2")) # 테마당 최대 종목 수
+ MAX_THEME_EXPOSURE_RATIO = float(os.getenv("MAX_THEME_EXPOSURE_RATIO", "0.40")) # 테마당 최대 노출 비율 (총자산 대비)
+ MAX_TOTAL_HOLDINGS = int(os.getenv("MAX_TOTAL_HOLDINGS", "7")) # 총 보유 종목 수 상한
# 6. 데이터 경로
DATA_DIR = os.path.join(BASE_DIR, "data")
diff --git a/modules/services/news.py b/modules/services/news.py
index c638f99..4237b07 100644
--- a/modules/services/news.py
+++ b/modules/services/news.py
@@ -1,6 +1,23 @@
import time
import requests
import xml.etree.ElementTree as ET
+from typing import Optional
+
+
+def _parse_items(root, max_items):
+ """RSS item → [{title, url, pub_date, source}]"""
+ out = []
+ for item in root.findall(".//item")[:max_items]:
+ t = item.find("title")
+ l = item.find("link")
+ p = item.find("pubDate")
+ title = (t.text or "").strip() if t is not None else ""
+ url = (l.text or "").strip() if l is not None else ""
+ pub = (p.text or "").strip() if p is not None else ""
+ if not title:
+ continue
+ out.append({"title": title, "url": url, "pub_date": pub, "source": "Google News"})
+ return out
class NewsCollector:
@@ -11,24 +28,29 @@ class NewsCollector:
try:
resp = requests.get(url, timeout=5)
root = ET.fromstring(resp.content)
- items = []
- for item in root.findall(".//item")[:5]:
- title = item.find("title").text
- items.append({"title": title, "source": "Google News"})
- return items
+ return _parse_items(root, 5)
except Exception as e:
print(f"[News] Collection failed: {e}")
return []
class AsyncNewsCollector:
- """비동기 뉴스 수집 + 5분 캐싱"""
+ """비동기 뉴스 수집 + 5분 캐싱 + (옵션) 스냅샷 저장"""
- def __init__(self):
+ def __init__(self, snapshot_store=None):
self._cache = None
self._cache_time = 0
self._cache_ttl = 300 # 5분
self._stock_cache = {} # {stock_name: (items, timestamp)}
+ self._snap = snapshot_store # NewsSnapshotStore | None
+
+ def _save_snapshot(self, items, query: str, ticker: Optional[str] = None):
+ if not self._snap or not items:
+ return
+ try:
+ self._snap.save_many(items, query=query, ticker=ticker)
+ except Exception as e:
+ print(f"[News] snapshot 저장 실패: {e}")
def get_market_news(self, query="주식 시장"):
"""동기 인터페이스 (하위 호환)"""
@@ -39,6 +61,7 @@ class AsyncNewsCollector:
result = NewsCollector.get_market_news(query)
self._cache = result
self._cache_time = now
+ self._save_snapshot(result, query=query)
return result
async def get_market_news_async(self, query="주식 시장"):
@@ -54,13 +77,10 @@ class AsyncNewsCollector:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
- items = []
- for item in root.findall(".//item")[:5]:
- title = item.find("title").text
- items.append({"title": title, "source": "Google News"})
-
+ items = _parse_items(root, 5)
self._cache = items
self._cache_time = now
+ self._save_snapshot(items, query=query)
return items
except ImportError:
return self.get_market_news(query)
@@ -70,9 +90,10 @@ class AsyncNewsCollector:
return self._cache
return self.get_market_news(query)
- async def get_stock_news_async(self, stock_name, max_items=3):
+ async def get_stock_news_async(self, stock_name, max_items=3, ticker: Optional[str] = None):
"""종목별 뉴스 수집 (5분 캐싱)
stock_name: 종목 이름 (e.g. '삼성전자', 'SK하이닉스')
+ ticker: 스냅샷 저장 시 종목코드 (옵션)
"""
now = time.time()
cached = self._stock_cache.get(stock_name)
@@ -88,13 +109,9 @@ class AsyncNewsCollector:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
- items = []
- for item in root.findall(".//item")[:max_items]:
- title_el = item.find("title")
- if title_el is not None and title_el.text:
- items.append({"title": title_el.text, "source": "Google News"})
-
+ items = _parse_items(root, max_items)
self._stock_cache[stock_name] = (items, now)
+ self._save_snapshot(items, query=f"{stock_name} 주가", ticker=ticker)
return items
except Exception as e:
print(f"[News] 종목 뉴스 수집 실패 ({stock_name}): {e}")
diff --git a/modules/services/news_snapshot.py b/modules/services/news_snapshot.py
new file mode 100644
index 0000000..e4ea961
--- /dev/null
+++ b/modules/services/news_snapshot.py
@@ -0,0 +1,189 @@
+"""
+뉴스 스냅샷 인프라 (v3.2)
+
+목적:
+ - 수집한 뉴스를 SQLite에 타임스탬프와 함께 영구 저장
+ - 사후 감성 신호 재검증 (LLM 재호출 / 모델 비교) 가능하게
+ - 백테스트에서 '그 시점에 실제로 알 수 있던 뉴스'만 사용
+
+스키마:
+ news_snapshots(
+ id INTEGER PK,
+ captured_at TEXT, # ISO8601 (KST) — 수집 시점
+ query TEXT, # 수집 쿼리 (예: '주식 시장', '삼성전자')
+ ticker TEXT, # 종목 코드 (종목 뉴스일 때, else NULL)
+ title TEXT,
+ url TEXT UNIQUE,
+ pub_date TEXT, # RSS pubDate 원본
+ source TEXT DEFAULT 'google_news'
+ )
+ sentiment_scores( # 야간 배치로 사후 생성
+ news_id INTEGER PK,
+ scored_at TEXT,
+ model TEXT,
+ sentiment REAL, # -1.0 ~ 1.0
+ confidence REAL,
+ raw_json TEXT,
+ FOREIGN KEY (news_id) REFERENCES news_snapshots(id)
+ )
+
+순수 I/O 모듈 — 네트워크 의존성 없음 → unit 테스트 가능.
+"""
+import os
+import sqlite3
+from datetime import datetime, timezone, timedelta
+from typing import Iterable, List, Optional, Dict
+
+KST = timezone(timedelta(hours=9))
+
+
+class NewsSnapshotStore:
+ """
+ SQLite 기반 뉴스 스냅샷 저장소.
+
+ 사용 예:
+ store = NewsSnapshotStore("data/news_snapshots.db")
+ store.save_many(items, query="삼성전자", ticker="005930")
+ rows = store.query_between(start, end, ticker="005930")
+ """
+
+ def __init__(self, db_path: str):
+ self.db_path = db_path
+ os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
+ self._init_schema()
+
+ # ──────────────────────────────────────────────
+ # 스키마
+ # ──────────────────────────────────────────────
+ def _connect(self) -> sqlite3.Connection:
+ conn = sqlite3.connect(self.db_path)
+ conn.row_factory = sqlite3.Row
+ return conn
+
+ def _init_schema(self):
+ with self._connect() as conn:
+ conn.executescript("""
+ CREATE TABLE IF NOT EXISTS news_snapshots (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ captured_at TEXT NOT NULL,
+ query TEXT NOT NULL,
+ ticker TEXT,
+ title TEXT NOT NULL,
+ url TEXT NOT NULL UNIQUE,
+ pub_date TEXT,
+ source TEXT DEFAULT 'google_news'
+ );
+ CREATE INDEX IF NOT EXISTS idx_news_captured
+ ON news_snapshots(captured_at);
+ CREATE INDEX IF NOT EXISTS idx_news_ticker
+ ON news_snapshots(ticker, captured_at);
+
+ CREATE TABLE IF NOT EXISTS sentiment_scores (
+ news_id INTEGER PRIMARY KEY,
+ scored_at TEXT NOT NULL,
+ model TEXT NOT NULL,
+ sentiment REAL NOT NULL,
+ confidence REAL NOT NULL,
+ raw_json TEXT,
+ FOREIGN KEY (news_id) REFERENCES news_snapshots(id)
+ );
+ """)
+
+ # ──────────────────────────────────────────────
+ # 쓰기
+ # ──────────────────────────────────────────────
+ def save_many(self, items: Iterable[Dict], query: str,
+ ticker: Optional[str] = None,
+ captured_at: Optional[datetime] = None) -> int:
+ """
+ 뉴스 다건 저장. URL 기준 중복 자동 무시.
+
+ Args:
+ items: [{"title": str, "url": str, "pub_date": str?}, ...]
+
+ Returns:
+ 실제로 삽입된 행 수
+ """
+ if captured_at is None:
+ captured_at = datetime.now(KST)
+ ts = captured_at.isoformat()
+
+ rows = []
+ for it in items:
+ title = (it.get("title") or "").strip()
+ url = (it.get("url") or "").strip()
+ if not title or not url:
+ continue
+ rows.append((ts, query, ticker, title, url, it.get("pub_date")))
+
+ if not rows:
+ return 0
+
+ with self._connect() as conn:
+ before = conn.total_changes
+ conn.executemany(
+ "INSERT OR IGNORE INTO news_snapshots "
+ "(captured_at, query, ticker, title, url, pub_date) "
+ "VALUES (?, ?, ?, ?, ?, ?)",
+ rows,
+ )
+ inserted = conn.total_changes - before
+ return inserted
+
+ def save_sentiment(self, news_id: int, model: str,
+ sentiment: float, confidence: float,
+ raw_json: str = "",
+ scored_at: Optional[datetime] = None) -> None:
+ if scored_at is None:
+ scored_at = datetime.now(KST)
+ with self._connect() as conn:
+ conn.execute(
+ "INSERT OR REPLACE INTO sentiment_scores "
+ "(news_id, scored_at, model, sentiment, confidence, raw_json) "
+ "VALUES (?, ?, ?, ?, ?, ?)",
+ (news_id, scored_at.isoformat(), model,
+ float(sentiment), float(confidence), raw_json),
+ )
+
+ # ──────────────────────────────────────────────
+ # 읽기
+ # ──────────────────────────────────────────────
+ def query_between(self, start: datetime, end: datetime,
+ ticker: Optional[str] = None,
+ query: Optional[str] = None) -> List[sqlite3.Row]:
+ """특정 기간 내 수집된 뉴스 조회."""
+ sql = "SELECT * FROM news_snapshots WHERE captured_at >= ? AND captured_at < ?"
+ args = [start.isoformat(), end.isoformat()]
+ if ticker is not None:
+ sql += " AND ticker = ?"
+ args.append(ticker)
+ if query is not None:
+ sql += " AND query = ?"
+ args.append(query)
+ sql += " ORDER BY captured_at ASC"
+ with self._connect() as conn:
+ return list(conn.execute(sql, args))
+
+ def pending_sentiment(self, limit: int = 100) -> List[sqlite3.Row]:
+ """아직 감성 점수가 없는 뉴스 반환 (야간 배치용)."""
+ with self._connect() as conn:
+ return list(conn.execute(
+ """SELECT n.* FROM news_snapshots n
+ LEFT JOIN sentiment_scores s ON s.news_id = n.id
+ WHERE s.news_id IS NULL
+ ORDER BY n.captured_at DESC
+ LIMIT ?""",
+ (limit,)
+ ))
+
+ def stats(self) -> Dict:
+ """DB 통계 (row 수, 감성 커버리지)."""
+ with self._connect() as conn:
+ total = conn.execute("SELECT COUNT(*) FROM news_snapshots").fetchone()[0]
+ scored = conn.execute("SELECT COUNT(*) FROM sentiment_scores").fetchone()[0]
+ return {
+ "total_news": total,
+ "scored": scored,
+ "pending": total - scored,
+ "coverage_pct": (scored / total * 100) if total else 0.0,
+ }
diff --git a/modules/strategy/daily_ledger.py b/modules/strategy/daily_ledger.py
new file mode 100644
index 0000000..c60ceda
--- /dev/null
+++ b/modules/strategy/daily_ledger.py
@@ -0,0 +1,130 @@
+"""
+일일 거래 장부 (DailyLedger) — v3.2
+
+bot.py에 흩어져 있던 당일 상태를 한 객체로 집약:
+ - 당일 누적 매수금액 (KIS T+2 미차감 보완용)
+ - 연속 손절 카운터 + 매수 일시중단 타이머
+ - 미매도 종목의 매수 신호 점수 (앙상블 학습용)
+ - 일별 스냅샷/주간평가 플래그
+
+날짜가 바뀌면 reset_if_new_day()가 자동 초기화.
+순수 객체로 구현 — 외부 I/O 없음 → 단위 테스트 가능.
+"""
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta, date as date_cls
+from typing import Dict, Optional
+
+
+@dataclass
+class DailyLedger:
+ # ── 당일 매수 회계 ──
+ today_buy_total: int = 0
+ today_buy_date: Optional[date_cls] = None
+
+ # ── 연속 손절 / 매수 일시 중단 ──
+ consecutive_stop_losses: int = 0
+ buy_paused_until: Optional[datetime] = None
+ stop_loss_pause_threshold: int = 3
+ stop_loss_pause_minutes: int = 30
+
+ # ── 앙상블 학습용: 미매도 종목의 매수 신호 점수 ──
+ buy_scores: Dict[str, dict] = field(default_factory=dict)
+
+ # ── 일일 플래그 ──
+ snapshot_taken: bool = False
+ weekly_eval_sent: bool = False
+
+ # ──────────────────────────────────────────────
+ # 날짜 전환
+ # ──────────────────────────────────────────────
+ def reset_if_new_day(self, now: datetime) -> bool:
+ """
+ 오늘 날짜 기준으로 상태 초기화. 이미 오늘 자로 초기화됐으면 no-op.
+
+ Returns:
+ True — 실제로 초기화를 수행한 경우
+ False — 같은 날이라 그대로 둔 경우
+ """
+ today = now.date()
+ if self.today_buy_date == today:
+ return False
+ self.today_buy_total = 0
+ self.today_buy_date = today
+ self.buy_scores.clear()
+ self.snapshot_taken = False
+ self.weekly_eval_sent = False
+ # 연속 손절 카운터 / 일시중단 타이머는 날짜 전환 시에만 초기화
+ self.consecutive_stop_losses = 0
+ self.buy_paused_until = None
+ return True
+
+ # ──────────────────────────────────────────────
+ # 매수 / 매도 기록
+ # ──────────────────────────────────────────────
+ def record_buy(self, ticker: str, amount: int, scores: dict) -> None:
+ """매수 체결 기록. amount는 집행 금액(원), scores는 앙상블 신호."""
+ self.today_buy_total += int(amount)
+ self.buy_scores[ticker] = dict(scores)
+
+ def pop_buy_scores(self, ticker: str) -> Optional[dict]:
+ """매도 체결 시 앙상블 학습을 위해 매수 당시 신호를 반환하고 제거."""
+ return self.buy_scores.pop(ticker, None)
+
+ # ──────────────────────────────────────────────
+ # 손절 관리
+ # ──────────────────────────────────────────────
+ def record_sell_outcome(self, outcome_pct: float, now: datetime) -> bool:
+ """
+ 매도 결과를 반영해 연속 손절 카운터 업데이트.
+
+ Returns:
+ True — 임계치 도달 → 매수 일시중단 활성화됨
+ False — 임계치 미도달
+ """
+ if outcome_pct < 0:
+ self.consecutive_stop_losses += 1
+ if self.consecutive_stop_losses >= self.stop_loss_pause_threshold:
+ self.buy_paused_until = now + timedelta(
+ minutes=self.stop_loss_pause_minutes
+ )
+ return True
+ else:
+ self.consecutive_stop_losses = 0
+ return False
+
+ def is_buy_paused(self, now: datetime) -> bool:
+ """
+ 매수 일시중단 상태 조회. 만료되면 자동 해제 + 카운터 리셋.
+ """
+ if self.buy_paused_until is None:
+ return False
+ if now >= self.buy_paused_until:
+ self.buy_paused_until = None
+ self.consecutive_stop_losses = 0
+ return False
+ return True
+
+ # ──────────────────────────────────────────────
+ # 예수금 계산 (KIS T+2 보완)
+ # ──────────────────────────────────────────────
+ def effective_today_buy(self, kis_today_buy: int) -> int:
+ """
+ KIS API가 반환한 당일 매수금(`thdt_buy_amt`)과
+ 로컬 누적값 중 더 큰 값을 신뢰.
+ (모의투자는 T+2 미차감으로 인해 과소 보고되는 경우 있음)
+ """
+ return max(int(kis_today_buy or 0), self.today_buy_total)
+
+ def available_deposit(self, raw_deposit: int, max_daily_buy_ratio: float,
+ kis_today_buy: int = 0) -> int:
+ """
+ 당일 사용 가능한 예수금 계산.
+
+ max_daily_buy = raw_deposit × ratio
+ avail = min(raw_deposit, max_daily_buy) − effective_today_buy
+ """
+ if raw_deposit <= 0:
+ return 0
+ max_daily_buy = int(raw_deposit * max_daily_buy_ratio)
+ used = self.effective_today_buy(kis_today_buy)
+ return max(0, min(raw_deposit, max_daily_buy) - used)
diff --git a/modules/strategy/risk_gate.py b/modules/strategy/risk_gate.py
new file mode 100644
index 0000000..6c7b989
--- /dev/null
+++ b/modules/strategy/risk_gate.py
@@ -0,0 +1,150 @@
+"""
+포트폴리오 리스크 게이트 (v3.2)
+
+매수 체결 직전 호출되어 포트폴리오 레벨 제약을 검증:
+ 1. 총 보유 종목 수 상한
+ 2. 테마당 동시 보유 종목 수 상한
+ 3. 테마당 노출 금액 비율 상한 (총자산 대비)
+
+기존 매수 필터(예수금, 종목당 상한, 사이클당 매수 수)는 유지하고
+이 게이트가 "같은 테마에 집중되는 포지션"을 차단한다.
+
+순수 함수로 구현 — 의존성 없음 → 단위 테스트 가능.
+"""
+from dataclasses import dataclass
+from typing import Dict, Iterable, List, Optional
+
+
+@dataclass
+class RiskDecision:
+ allowed: bool
+ reason: str = ""
+ max_allowed_amount: int = 0 # 일부만 허용되는 경우 (테마 노출 상한)
+
+
+@dataclass
+class RiskConfig:
+ max_total_holdings: int = 7
+ max_tickers_per_theme: int = 2
+ max_theme_exposure_ratio: float = 0.40
+
+
+class PortfolioRiskGate:
+ """
+ 사용 예:
+ gate = PortfolioRiskGate(theme_map, RiskConfig())
+ decision = gate.evaluate_buy(
+ ticker="005930",
+ candidate_amount=3_000_000,
+ current_holdings=[{"ticker":"000660","eval_amount":2_500_000}, ...],
+ total_capital=50_000_000,
+ )
+ if not decision.allowed: skip
+ elif decision.max_allowed_amount < candidate_amount: partial buy
+ """
+
+ def __init__(self, theme_lookup, config: Optional[RiskConfig] = None):
+ """
+ Args:
+ theme_lookup: callable(ticker:str) -> list[str] (종목→테마 매핑 함수)
+ 혹은 dict 형태도 허용.
+ config: RiskConfig
+ """
+ if callable(theme_lookup):
+ self._theme_of = theme_lookup
+ elif isinstance(theme_lookup, dict):
+ self._theme_of = lambda t: theme_lookup.get(t, [])
+ else:
+ raise TypeError("theme_lookup must be callable or dict")
+ self.config = config or RiskConfig()
+
+ # ──────────────────────────────────────────────
+ # 내부: 테마별 현재 노출 집계
+ # ──────────────────────────────────────────────
+ def _aggregate_by_theme(self, holdings: Iterable[dict]) -> Dict[str, dict]:
+ """
+ Returns:
+ {theme: {"tickers": set, "amount": int}}
+ """
+ agg: Dict[str, dict] = {}
+ for h in holdings:
+ tkr = h.get("ticker")
+ amt = int(h.get("eval_amount", 0) or 0)
+ if not tkr:
+ continue
+ themes = self._theme_of(tkr) or []
+ for th in themes:
+ bucket = agg.setdefault(th, {"tickers": set(), "amount": 0})
+ bucket["tickers"].add(tkr)
+ bucket["amount"] += amt
+ return agg
+
+ # ──────────────────────────────────────────────
+ # 공개 API
+ # ──────────────────────────────────────────────
+ def evaluate_buy(self, ticker: str, candidate_amount: int,
+ current_holdings: List[dict],
+ total_capital: int) -> RiskDecision:
+ """
+ 매수 허가 여부 판단.
+
+ Returns:
+ RiskDecision
+ - allowed=False: 이유와 함께 차단
+ - allowed=True : max_allowed_amount만큼 허용 (candidate_amount 이하)
+ """
+ if candidate_amount <= 0 or total_capital <= 0:
+ return RiskDecision(False, "invalid_amount")
+
+ cfg = self.config
+
+ # 이미 보유 중이면 추가 매수는 이 게이트 대상 아님 (scale-in은 상위에서 처리)
+ held_tickers = {h.get("ticker") for h in current_holdings}
+ is_new_position = ticker not in held_tickers
+
+ # 1. 총 보유 종목 수 상한
+ if is_new_position and len(held_tickers) >= cfg.max_total_holdings:
+ return RiskDecision(
+ False,
+ f"max_total_holdings: {len(held_tickers)}/{cfg.max_total_holdings}"
+ )
+
+ themes = self._theme_of(ticker) or []
+ if not themes:
+ # 테마 정보 없음 → 테마 제약은 건너뛰고 통과
+ return RiskDecision(True, "no_theme_info", candidate_amount)
+
+ by_theme = self._aggregate_by_theme(current_holdings)
+
+ allowed_amount = candidate_amount
+ blocking_reasons = []
+
+ for th in themes:
+ bucket = by_theme.get(th, {"tickers": set(), "amount": 0})
+
+ # 2. 테마당 종목 수 상한 (신규 포지션일 때만)
+ if is_new_position and len(bucket["tickers"]) >= cfg.max_tickers_per_theme:
+ blocking_reasons.append(
+ f"theme[{th}] tickers {len(bucket['tickers'])}/{cfg.max_tickers_per_theme}"
+ )
+ continue
+
+ # 3. 테마당 노출 금액 비율 상한
+ max_theme_amount = int(total_capital * cfg.max_theme_exposure_ratio)
+ remaining = max_theme_amount - bucket["amount"]
+ if remaining <= 0:
+ blocking_reasons.append(
+ f"theme[{th}] exposure {bucket['amount']:,}/{max_theme_amount:,}"
+ )
+ continue
+
+ # 테마 잔여액이 candidate보다 작으면 부분 허용
+ allowed_amount = min(allowed_amount, remaining)
+
+ if blocking_reasons:
+ return RiskDecision(False, "; ".join(blocking_reasons))
+
+ if allowed_amount <= 0:
+ return RiskDecision(False, "theme_exposure_full")
+
+ return RiskDecision(True, "ok", allowed_amount)
diff --git a/modules/utils/process_tracker.py b/modules/utils/process_tracker.py
index 5984ca3..f5e5474 100644
--- a/modules/utils/process_tracker.py
+++ b/modules/utils/process_tracker.py
@@ -6,17 +6,12 @@
"""
import os
import time
-import datetime
import threading
-from pathlib import Path
from multiprocessing.shared_memory import SharedMemory
from modules.config import Config
# EOD 마커 파일: 오늘 장 마감 후 봇이 기록, Watchdog가 재시작 여부 결정에 사용
-_EOD_DATE_FILE = Path("data") / ".eod_date"
-
-
class ProcessTracker:
"""메모리 기반 프로세스 추적기"""
@@ -141,17 +136,6 @@ class ProcessWatchdog:
entry = self._watched.get(name)
return entry['process'] if entry else None
- @staticmethod
- def is_eod_today() -> bool:
- """오늘 EOD 마커 파일이 존재하면 True (장 마감 셧다운 → 재시작 차단)"""
- try:
- if not _EOD_DATE_FILE.exists():
- return False
- eod_date = datetime.date.fromisoformat(_EOD_DATE_FILE.read_text().strip())
- return eod_date >= datetime.date.today()
- except Exception:
- return False
-
def _watchdog_loop(self):
"""주기적으로 자식 프로세스 상태 확인"""
import multiprocessing
@@ -170,11 +154,6 @@ class ProcessWatchdog:
exit_code = proc.exitcode
restart_count = entry['restart_count']
- # [EOD 차단] 오늘 장 마감 셧다운이면 재시작하지 않음
- if ProcessWatchdog.is_eod_today():
- print(f"[Watchdog] {name}: EOD 셧다운 감지 — 재시작 건너뜀.")
- continue
-
if restart_count >= Config.MAX_RESTART_COUNT:
print(f"[Watchdog] {name} crashed (exit={exit_code}). "
f"Max restarts ({Config.MAX_RESTART_COUNT}) reached. Giving up.")