반복적인 IPC 오류 해결, 봇 오류 해결, 인증 오류 해결, 서버 자원 할당 오류 해결, 코드 리팩토링

This commit is contained in:
2026-02-14 18:03:13 +09:00
parent 4fd0aa91bc
commit 9dbf6e6791
15 changed files with 1452 additions and 847 deletions

113
README.md
View File

@@ -1,14 +1,19 @@
# 🤖 AI Automated Trading System (Windows Server Edition)
이 프로젝트는 **Python, PyTorch (Deep Learning), Ollama (LLM)**을 활용하여 한국 주식 시장(KIS)에서 자동으로 매매를 수행하는 고성능 AI 트레이딩 봇입니다.
**ProcessPoolExecutor 기반의 병렬 처리**와 **독립된 텔레그램 봇 프로세스**를 통해 높은 안정성과 응답 속도를 보장합니다.
## 🚀 Key Features
* **Multi-Process Architecture**: 메인 서버(`main_server.py`)가 트레이딩 봇 텔레그램 봇 독립된 프로세스로 관리하여 안정성 확보.
* **Advanced AI Analysis**: **RTX 5070 Ti (16GB VRAM)** 하드웨어 가속을 활용한 고성능 예측 모델 탑재.
* **Hybrid Strategy**: 기술적 분석 + 뉴스 감성 분석(LLM) + 가격 예측(LSTM)을 결합한 복합 추론.
* **Telegram Control**: 실시간 상태 조회, 리포트 수신, 프로세스 제어(`/restart` 등) 지원.
* **Safe Trading**: 예수금 초과 매수 방지, 손절/익절 자동화, API 속도 제한(Throttling) 적용.
* **Multi-Process Architecture**: 메인 서버, 트레이딩 봇, 텔레그램 봇이 각각 독립된 프로세스로 실행되어 상호 간섭을 최소화합니다.
* **Advanced AI Analysis**: **RTX 5070 Ti (16GB VRAM)** 하드웨어 가속을 활용한 **Attention-LSTM** 모델이 주가를 예측합니다.
* **Process Management System**:
* **Zombie Killer**: 서버 시작 시 이전에 종료되지 않은 좀비 프로세스를 자동으로 감지하고 제거합니다.
* **PID Tracking**: 실행 중인 모든 프로세스의 ID를 `pids.txt` 파일에 실시간으로 기록하여 식별을 돕습니다.
* **Reliable Telegram Bot**:
* **HTML Parsing**: 마크다운 에러를 방지하기 위해 안정적인 HTML 포맷을 사용하여 메시지를 전송합니다.
* **Interactive Commands**: `/status`, `/portfolio`, `/exec` 등 다양한 명령어로 봇을 실시간 제어할 수 있습니다.
* **Auto-Recovery**: `ProcessPoolExecutor`의 워커 프로세스가 충돌(OOM 등)할 경우 자동으로 감지하고 재시작합니다.
---
@@ -16,28 +21,27 @@
```plaintext
/
├── main_server.py # [Entry Point] 프로세스 매니저 및 서버 실행
├── watchlist_manager.py # [Automation] 매일 아침 종목 선정 및 Watchlist 갱신
├── main_server.py # [Entry Point] 프로세스 매니저 및 FastAPI 서버
├── pids.txt # [Runtime] 실행 중인 프로세스 ID 목록 (자동 관리)
├── modules/
│ ├── bot.py # [Core] 메인 트레이딩 루프 및 사이클 관리
│ ├── bot.py # [Core] 메인 트레이딩 봇 (스케줄러 & 상태 머신)
│ ├── config.py # [Config] 환경 변수 및 상수 관리
│ ├── analysis/ # [Brain] 분석 모듈
│ ├── analysis/ # [Brain] AI 분석 모듈
│ │ ├── deep_learning.py # PyTorch 기반 Attention-LSTM 모델
│ │ ├── technical.py # RSI, 볼린저밴드 등 보조지표 계산
│ │ └── macro.py # 거시경제(환율, 유가, 지수) 분석
│ ├── services/ # [I/O] 외부 서비스 연동
│ │ ├── kis.py # 한국투자증권 API 클라이언트 (Throttling 적용)
│ │ ├── telegram.py # 텔레그램 메시지 발송
│ │ ├── kis.py # 한국투자증권 API (Throttling 적용)
│ │ ├── telegram_bot/ # [Independent] 독립 프로세스 텔레그램 봇
│ │ ├── news.py # 네이버 뉴스 크롤링
│ │ ── ollama.py # Local LLM (Llama 3) 인터페이스
│ └── telegram_bot/ # 텔레그램 봇 서버 (Interactive Mode)
├── strategy/ # [Logic] 매수/매도 의사결정 로직
│ └── utils/ # [Util] IPC, 시스템 모니터링 등
├── data/ # [Runtime Data] 실행 중 생성되는 데이터 (Git 제외)
├── bot_ipc.json # 프로세스 간 상태 공유
├── watchlist.json # 감시 대상 종목 리스트
│ └── daily_trade_history.json # 당일 매매 기록
└── tests/ # 테스트 스크립트
│ │ ── ollama.py # Local LLM (Llama 3) 인터페이스
├── strategy/ # [Logic] 매수/매도 의사결정 프로세스
│ └── process.py # 워커 프로세스용 분석 함수 (병렬 처리)
│ └── utils/ # [Util] 유틸리티
│ ├── process_tracker.py # PID 추적 및 좀비 프로세스 정리
├── ipc.py # 프로세스 간 통신 (IPC)
└── monitor.py # 시스템 리소스 모니터링
└── ...
```
---
@@ -56,60 +60,45 @@
### 2. Hardware Acceleration (RTX 5070 Ti)
* **CUDA Optimization**: PyTorch를 통해 GPU 가속 활성화.
* **Specs**:
* Batch Size: 64
* Epochs: 200
* Precision: FP32
* **Specs**: Batch Size 64, Epochs 200, Precision FP32.
* 서버 시작 시 `High Performance Mode`가 자동으로 감지 및 활성화됩니다.
### 3. Integrated Decision Making (Quant Strategy)
AI 모델의 예측 결과는 단독으로 쓰이지 않고, 다음 요소들과 결합되어 최종 매수 결정을 내립니다.
1. **Technical Score (40%)**: RSI, 거래량, 변동성 지표.
2. **News Sentiment (30%)**: Ollama(LLM)가 분석한 최신 뉴스 긍/부정 점수.
3. **AI Prediction (30% ~ 60%)**: LSTM 모델의 상승 예측 신뢰도.
* *AI Confidence*가 85% 이상일 경우, AI 비중을 **60%**까지 동적으로 상향 조정.
* **OOM Protection**: GPU 메모리 보호를 위해 병렬 워커 수를 2개로 제한하고, 워커 충돌 시 자동 재시작합니다.
---
## 🛠️ Setup & Installation
## 🛠️ Usage & Troubleshooting
### 1. Prerequisites
* Python 3.10+
* NVIDIA GPU + CUDA Toolkit (Recommended for AI performance)
* Ollama (Local LLM Server running on port 11434)
### 2. Installation
### 1. Installation
```bash
# Clone Repository
# Clone & Install
git clone <repository-url>
# Install Dependencies
pip install -r requirements.txt
```
### 3. Configuration (.env)
프로젝트 루트에 `.env` 파일을 생성하고 아래 정보를 입력하세요.
```ini
# 한국투자증권 (KIS)
KIS_APP_KEY=your_app_key
KIS_APP_SECRET=your_app_secret
KIS_ACCOUNT=12345678-01
KIS_IS_VIRTUAL=True # True: 모의투자, False: 실전투자
# Telegram
TELEGRAM_BOT_TOKEN=your_bot_token
TELEGRAM_CHAT_ID=your_chat_id
# AI Server
OLLAMA_API_URL=http://localhost:11434
```
### 4. Run
```bash
# Start Server
python main_server.py
```
### 2. Process Management (`pids.txt`)
서버가 실행되면 `pids.txt` 파일에 현재 실행 중인 프로세스 목록이 기록됩니다.
```text
58360: Main Server (Uvicorn Worker)
72028: Trading Bot Main
66488: Telegram Bot Standalone
16372: Trading Bot Worker
...
```
* **CPU 사용량이 비정상적으로 높을 때**: 작업 관리자나 `Get-Process python`으로 확인한 PID가 `pids.txt`에 없다면 **좀비 프로세스**입니다.
* **자동 정리**: `main_server.py`를 다시 실행하면 시작 시 자동으로 좀비 프로세스를 찾아 종료합니다.
### 3. Telegram Commands
* `/start`: 봇 시작 및 명령어 안내
* `/status`: 현재 봇 상태, 시장 지수, AI 모델 상태 조회
* `/portfolio`: 현재 보유 종목 및 수익률 조회
* `/system`: CPU/GPU 사용량 및 프로세스 상태 확인
* `/restart`: 봇 프로세스 재시작 (업데이트 반영 시 유용)
* `/stop`: 봇 종료
---
## ⚠️ Disclaimer
본 소프트웨어는 투자를 보조하는 도구이며, 투자의 결과에 대한 책임은 전적으로 사용자에게 있습니다. AI의 예측은 100% 정확하지 않으며, 시장 상황에 따라 손실이 발생할 수 있습니다.
본 소프트웨어는 투자를 보조하는 도구이며, 투자의 결과에 대한 책임은 전적으로 사용자에게 있습니다. AI의 예측은 100% 정확하지 않으며, 시장 상황에 따라 손실이 발생할 수 있습니다. 모의투자 환경에서 충분한 테스트 후 사용하시기 바랍니다.

View File

@@ -2,9 +2,8 @@ import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
from analysis_module import TechnicalAnalyzer
from ai_predictor import PricePredictor
import torch
from modules.analysis.technical import TechnicalAnalyzer
from modules.analysis.deep_learning import PricePredictor
class Backtester:
def __init__(self, ticker, start_date, end_date, initial_capital=10000000):
@@ -92,7 +91,7 @@ class Backtester:
current_window_list = list(history_window)
# 1. 기술적 분석
tech_score, rsi, volatility = TechnicalAnalyzer.get_technical_score(today_price, current_window_list)
tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(today_price, current_window_list)
# 2. AI 예측 (Online Learning Simulation)
# 매일 재학습하면 너무 느리므로, 5일에 한번씩만 학습한다고 가정 (타협)

View File

@@ -1,11 +1,7 @@
import os
import uvicorn
import subprocess
import sys
import multiprocessing
from fastapi import FastAPI, Request
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
from contextlib import asynccontextmanager
from modules.config import Config
@@ -13,100 +9,121 @@ from modules.services.ollama import OllamaManager
from modules.services.kis import KISClient
from modules.services.news import NewsCollector
from modules.services.telegram import TelegramMessenger
from modules.bot import AutoTradingBot
from modules.utils.process_tracker import ProcessTracker, ProcessWatchdog
from modules.services.telegram_bot.runner import run_telegram_bot_standalone
# 전역 객체
bot_process = None
telegram_process = None
messenger = TelegramMessenger()
ai_agent = None
kis_client = None
news_collector = None
watchdog = None
import multiprocessing
from modules.bot import AutoTradingBot
from modules.utils.process_tracker import ProcessTracker
from modules.services.telegram_bot.runner import run_telegram_bot_standalone
# 봇 실행 래퍼 함수
def run_trading_bot():
def run_trading_bot(ipc_lock, command_queue, shutdown_event):
"""트레이딩 봇 실행 래퍼"""
ProcessTracker.register("Trading Bot Main")
bot = AutoTradingBot()
bot = AutoTradingBot(ipc_lock=ipc_lock, command_queue=command_queue,
shutdown_event=shutdown_event)
bot.loop()
@asynccontextmanager
async def lifespan(app: FastAPI):
# [Startup]
global bot_process, telegram_process, messenger, ai_agent, kis_client, news_collector
global ai_agent, kis_client, news_collector, watchdog
# 1. 설정 검증
Config.validate()
# 2. 전역 객체 초기화 (서버용)
# [Process Tracker] 초기화
# 2. 좀비 프로세스 정리
try:
ProcessTracker.check_and_kill_zombies()
ProcessTracker.clear()
ProcessTracker.register("Main Server (Uvicorn Worker)")
except: pass
except Exception:
pass
# 3. 전역 객체 초기화
ai_agent = OllamaManager()
kis_client = KISClient()
news_collector = NewsCollector()
print("🤖 Starting AI Trading Bot & Telegram Bot (Multimedia Mode)...")
# 4. 공유 리소스 생성
ipc_lock = multiprocessing.Lock()
command_queue = multiprocessing.Queue()
shutdown_event = multiprocessing.Event()
# 3. 멀티프로세스 실행
# (1) 트레이딩 봇
bot_process = multiprocessing.Process(target=run_trading_bot)
print("[Server] Starting AI Trading Bot & Telegram Bot...")
# 5. 자식 프로세스 생성
bot_args = (ipc_lock, command_queue, shutdown_event)
telegram_args = (ipc_lock, command_queue, shutdown_event)
bot_process = multiprocessing.Process(
target=run_trading_bot, args=bot_args)
bot_process.start()
# (2) 텔레그램 봇 (Polling)
telegram_process = multiprocessing.Process(target=run_telegram_bot_standalone)
telegram_process = multiprocessing.Process(
target=run_telegram_bot_standalone, args=telegram_args)
telegram_process.start()
# [Process Tracker] 자식 프로세스 PID 기록 (부모 관점)
try:
with open(ProcessTracker.FILE_PATH, "a", encoding="utf-8") as f:
f.write(f"{bot_process.pid}: Trading Bot Process (Parent View)\n")
f.write(f"{telegram_process.pid}: Telegram Bot Process (Parent View)\n")
except: pass
# 6. Watchdog 시작
watchdog = ProcessWatchdog(shutdown_event=shutdown_event)
watchdog.watch("Trading Bot", bot_process, run_trading_bot, bot_args)
watchdog.watch("Telegram Bot", telegram_process,
run_telegram_bot_standalone, telegram_args)
watchdog.start()
messenger.send_message("🖥️ **[Server Started]** Windows AI Server (Refactored) Online.")
messenger.send_message("[Server Started] Windows AI Server Online.")
yield
# [Shutdown]
print("🛑 Shutting down processes...")
print("[Server] Shutting down...")
shutdown_event.set()
if telegram_process and telegram_process.is_alive():
print(" - Stopping Telegram Bot...")
telegram_process.join(timeout=5)
if telegram_process.is_alive():
telegram_process.terminate()
telegram_process.join()
if watchdog:
watchdog.stop()
if bot_process and bot_process.is_alive():
print(" - Stopping Trading Bot...")
bot_process.join(timeout=5)
if bot_process.is_alive():
bot_process.terminate()
bot_process.join()
# 자식 프로세스 종료
for name in ["Trading Bot", "Telegram Bot"]:
proc = watchdog.get_process(name) if watchdog else None
if proc and proc.is_alive():
print(f" - Stopping {name}...")
proc.join(timeout=5)
if proc.is_alive():
proc.terminate()
proc.join(timeout=3)
# SharedMemory 정리
try:
from multiprocessing.shared_memory import SharedMemory
shm = SharedMemory(name=Config.SHM_NAME, create=False)
shm.close()
shm.unlink()
except Exception:
pass
messenger.send_message("[Server Stopped] Server Shutting Down.")
messenger.send_message("🛑 **[Server Stopped]** Server Shutting Down.")
app = FastAPI(title="Windows AI Stock Server", lifespan=lifespan)
@app.middleware("http")
async def log_requests(request: Request, call_next):
print(f"📥 {request.method} {request.url}")
print(f"[HTTP] {request.method} {request.url}")
response = await call_next(request)
return response
# 모델 정의
class ManualOrderRequest(BaseModel):
ticker: str
action: str # BUY, SELL
action: str
quantity: int
@app.get("/")
def index():
vram = 0
@@ -118,6 +135,7 @@ def index():
"service": "Windows AI Server (Refactored)"
}
@app.get("/trade/balance")
@app.get("/api/trade/balance")
async def get_balance():
@@ -125,6 +143,7 @@ async def get_balance():
return {"error": "Server not initialized"}
return kis_client.get_balance()
@app.post("/trade/order")
@app.post("/api/trade/order")
async def manual_order(req: ManualOrderRequest):
@@ -140,10 +159,10 @@ async def manual_order(req: ManualOrderRequest):
return {"status": "executed", "kis_result": result}
@app.post("/analyze/portfolio")
@app.post("/api/analyze/portfolio")
async def analyze_portfolio():
# 간단화된 분석 로직
balance = kis_client.get_balance()
news = news_collector.get_market_news()
@@ -156,13 +175,13 @@ async def analyze_portfolio():
analysis = ai_agent.request_inference(prompt)
return {"analysis": analysis}
if __name__ == "__main__":
# [안정성] 서버 시작 시 이전 좀비 프로세스 정리
try:
from modules.utils.process_tracker import ProcessTracker
ProcessTracker.check_and_kill_zombies()
except: pass
# Reload=True는 멀티프로세싱 자식 프로세스 관리에 취약하므로 비활성화 권장
print("🚀 Starting Windows AI Server...")
if __name__ == "__main__":
# 서버 시작 시 좀비 프로세스 정리
try:
ProcessTracker.check_and_kill_zombies()
except Exception:
pass
print("[Server] Starting Windows AI Server...")
uvicorn.run("main_server:app", host="0.0.0.0", port=8000, reload=False)

View File

@@ -1,40 +1,36 @@
import os
import time
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from modules.config import Config
# cuDNN 벤치마크 활성화 (고정 입력 크기에 대해 최적 커널 자동 선택)
torch.backends.cudnn.benchmark = True
class Attention(nn.Module):
"""Attention Mechanism for LSTM"""
def __init__(self, hidden_size):
super(Attention, self).__init__()
self.hidden_size = hidden_size
self.attn = nn.Linear(hidden_size, 1)
def forward(self, lstm_output):
# lstm_output: [batch_size, seq_len, hidden_size]
# attn_weights: [batch_size, seq_len, 1]
attn_weights = torch.softmax(self.attn(lstm_output), dim=1)
# context: [batch_size, hidden_size]
context = torch.sum(attn_weights * lstm_output, dim=1)
return context, attn_weights
class AdvancedLSTM(nn.Module):
"""
[RTX 5070 Ti Optimized] High-Capacity LSTM with Attention
- Hidden Size: 512 (Rich Feature Extraction)
- Layers: 4 (Deep Reasoning)
- Attention: Focus on critical time steps
"""
def __init__(self, input_size=1, hidden_size=512, num_layers=4, output_size=1, dropout=0.3):
super(AdvancedLSTM, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
batch_first=True, dropout=dropout)
batch_first=True, dropout=dropout)
self.attention = Attention(hidden_size)
self.fc = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
@@ -45,155 +41,311 @@ class AdvancedLSTM(nn.Module):
)
def forward(self, x):
# x: [batch, seq, feature]
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
# LSTM Output
lstm_out, _ = self.lstm(x, (h0, c0)) # [batch, seq, hidden]
# Attention Mechanism
context, _ = self.attention(lstm_out) # [batch, hidden]
# Final Prediction
lstm_out, _ = self.lstm(x, (h0, c0))
context, _ = self.attention(lstm_out)
out = self.fc(context)
return out
def _unload_ollama():
"""LSTM 학습 전 Ollama 모델 언로드하여 GPU 메모리 확보"""
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
requests.post(url, json={
"model": Config.OLLAMA_MODEL,
"keep_alive": 0
}, timeout=5)
print("[AI] Ollama model unloaded (GPU memory freed)")
time.sleep(1) # 메모리 해제 대기
except Exception:
pass
def _preload_ollama():
"""LSTM 학습 후 Ollama 모델 다시 로드"""
try:
import requests
url = f"{Config.OLLAMA_API_URL}/api/generate"
requests.post(url, json={
"model": Config.OLLAMA_MODEL,
"prompt": "",
"keep_alive": "10m"
}, timeout=10)
except Exception:
pass
def _log_gpu_memory(tag=""):
"""GPU 메모리 사용량 로깅"""
if torch.cuda.is_available():
allocated = torch.cuda.memory_allocated(0) / 1024**3
reserved = torch.cuda.memory_reserved(0) / 1024**3
print(f"[AI GPU {tag}] Allocated: {allocated:.2f}GB / Reserved: {reserved:.2f}GB")
class PricePredictor:
"""
주가 예측을 위한 고성능 Deep Learning 모델 (RTX 5070 Ti Edition)
주가 예측 Deep Learning 모델 (GPU 최적화)
- 전체 학습 데이터를 GPU에 상주 (CPU↔GPU 전송 최소화)
- Ollama 모델 언로드/리로드로 GPU 메모리 확보
- Early Stopping + Mixed Precision (FP16)
- 종목별 모델 체크포인트
"""
def __init__(self):
self.scaler = MinMaxScaler(feature_range=(0, 1))
# [Hardware Spec] RTX 5070 Ti (16GB VRAM) 맞춤 설정
self.hidden_size = 512
self.num_layers = 4
self.model = AdvancedLSTM(input_size=1, hidden_size=self.hidden_size,
num_layers=self.num_layers, dropout=0.3)
num_layers=self.num_layers, dropout=0.3)
self.criterion = nn.MSELoss()
# CUDA 설정
self.device = torch.device('cpu')
self.use_amp = False
if torch.cuda.is_available():
try:
gpu_name = torch.cuda.get_device_name(0)
vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
# GPU 할당
self.device = torch.device('cuda')
self.model.to(self.device)
# Warm-up (컴파일 최적화 유도)
dummy = torch.zeros(1, 60, 1).to(self.device)
_ = self.model(dummy)
# Mixed Precision (Compute Capability >= 7.0: Volta 이상)
if torch.cuda.get_device_capability(0)[0] >= 7:
self.use_amp = True
print(f"🚀 [AI] Powered by {gpu_name} ({vram_gb:.1f}GB) - High Performance Mode On")
# Warm-up: CUDA 커널 컴파일 유도
dummy = torch.zeros(1, 60, 1, device=self.device)
with torch.no_grad():
_ = self.model(dummy)
torch.cuda.synchronize()
print(f"[AI] GPU Mode: {gpu_name} ({vram_gb:.1f}GB)"
f" | FP16={'ON' if self.use_amp else 'OFF'}"
f" | cuDNN Benchmark=ON")
_log_gpu_memory("init")
except Exception as e:
print(f"⚠️ [AI] GPU Init Failed: {e}")
print(f"[AI] GPU Init Failed ({e}), falling back to CPU")
self.device = torch.device('cpu')
self.model.to(self.device)
else:
print("⚠️ [AI] Running on CPU (Low Performance)")
print("[AI] No CUDA GPU detected. Running on CPU.")
# Optimizer 설정 (AdamW가 일반화 성능이 좀 더 좋음)
self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=0.0005, weight_decay=1e-4)
self.scaler_amp = torch.amp.GradScaler('cuda') if self.use_amp else None
# 학습 파라미터 강화
self.batch_size = 64
self.epochs = 200 # 충분한 학습
self.seq_length = 60 # 60일(약 3개월) 패턴 분석
self.max_epochs = 200
self.seq_length = 60
self.patience = 15
self.training_status = {
"is_training": False,
"loss": 0.0
"loss": 0.0,
"current_ticker": None
}
@staticmethod
def verify_hardware():
"""서버 시작 시 하드웨어 가속 여부 점검 및 로그 출력"""
if torch.cuda.is_available():
try:
gpu_name = torch.cuda.get_device_name(0)
vram_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
print(f"🚀 [AI Check] Hardware Detected: {gpu_name} ({vram_gb:.1f}GB VRAM)")
print(f" ✅ High Performance Mode is READY.")
print(f"[AI Check] {gpu_name} ({vram_gb:.1f}GB VRAM) | cuDNN={torch.backends.cudnn.is_available()}")
return True
except Exception as e:
print(f"⚠️ [AI Check] GPU Error: {e}")
print(f"[AI Check] GPU Error: {e}")
return False
else:
print("⚠️ [AI Check] No GPU Detected. Running in CPU Mode.")
return False
print("[AI Check] No GPU. CPU Mode.")
return False
def train_and_predict(self, prices, forecast_days=1):
"""
Online Learning & Prediction
"""
# 데이터가 최소 시퀀스 길이 + 여유분보다 적으면 예측 불가
def _get_checkpoint_path(self, ticker):
return os.path.join(Config.MODEL_DIR, f"{ticker}_lstm.pt")
def _load_checkpoint(self, ticker):
path = self._get_checkpoint_path(ticker)
if os.path.exists(path):
try:
checkpoint = torch.load(path, map_location=self.device, weights_only=True)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
print(f"[AI] Checkpoint loaded: {ticker}")
return True
except Exception as e:
print(f"[AI] Checkpoint load failed ({ticker}): {e}")
return False
def _save_checkpoint(self, ticker, epoch, loss):
path = self._get_checkpoint_path(ticker)
try:
torch.save({
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'epoch': epoch,
'loss': loss
}, path)
except Exception as e:
print(f"[AI] Checkpoint save failed ({ticker}): {e}")
def train_and_predict(self, prices, forecast_days=1, ticker=None):
if len(prices) < (self.seq_length + 10):
return None
# 1. 데이터 전처리
is_gpu = self.device.type == 'cuda'
# --- Ollama 모델 언로드 (GPU 메모리 확보) ---
if is_gpu:
_unload_ollama()
torch.cuda.empty_cache()
_log_gpu_memory("pre-train")
t_start = time.time()
# 1. 데이터 전처리 (CPU에서 numpy 작업)
data = np.array(prices).reshape(-1, 1)
scaled_data = self.scaler.fit_transform(data)
x_train, y_train = [], []
x_seqs, y_seqs = [], []
for i in range(len(scaled_data) - self.seq_length):
x_train.append(scaled_data[i:i+self.seq_length])
y_train.append(scaled_data[i+self.seq_length])
x_seqs.append(scaled_data[i:i + self.seq_length])
y_seqs.append(scaled_data[i + self.seq_length])
x_train_t = torch.FloatTensor(np.array(x_train)).to(self.device)
y_train_t = torch.FloatTensor(np.array(y_train)).to(self.device)
# 2. 텐서 생성 → 즉시 GPU로 이동 (이후 CPU↔GPU 전송 없음)
x_all = torch.FloatTensor(np.array(x_seqs)).to(self.device)
y_all = torch.FloatTensor(np.array(y_seqs)).to(self.device)
# 2. 학습
# Validation split (80/20)
split_idx = int(len(x_all) * 0.8)
x_train = x_all[:split_idx]
y_train = y_all[:split_idx]
x_val = x_all[split_idx:]
y_val = y_all[split_idx:]
dataset_size = len(x_train)
# 3. 체크포인트 로드
has_checkpoint = False
if ticker:
has_checkpoint = self._load_checkpoint(ticker)
max_epochs = 50 if has_checkpoint else self.max_epochs
# 4. 학습 (전체 데이터 GPU 상주, DataLoader 미사용)
self.model.train()
self.training_status["is_training"] = True
if ticker:
self.training_status["current_ticker"] = ticker
dataset_size = len(x_train_t)
best_val_loss = float('inf')
patience_counter = 0
final_loss = 0.0
actual_epochs = 0
for epoch in range(self.epochs):
perm = torch.randperm(dataset_size).to(self.device)
x_shuffled = x_train_t[perm]
y_shuffled = y_train_t[perm]
for epoch in range(max_epochs):
# --- Training (GPU 내에서 셔플 + 미니배치) ---
perm = torch.randperm(dataset_size, device=self.device)
x_shuffled = x_train[perm]
y_shuffled = y_train[perm]
epoch_loss = 0.0
steps = 0
for i in range(0, dataset_size, self.batch_size):
batch_x = x_shuffled[i:min(i+self.batch_size, dataset_size)]
batch_y = y_shuffled[i:min(i+self.batch_size, dataset_size)]
end = min(i + self.batch_size, dataset_size)
batch_x = x_shuffled[i:end]
batch_y = y_shuffled[i:end]
self.optimizer.zero_grad()
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad(set_to_none=True)
if self.use_amp:
with torch.amp.autocast('cuda'):
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
self.scaler_amp.scale(loss).backward()
self.scaler_amp.step(self.optimizer)
self.scaler_amp.update()
else:
outputs = self.model(batch_x)
loss = self.criterion(outputs, batch_y)
loss.backward()
self.optimizer.step()
epoch_loss += loss.item()
steps += 1
final_loss = epoch_loss / max(1, steps)
train_loss = epoch_loss / max(1, steps)
# --- Validation (GPU에서 직접 수행) ---
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
val_out = self.model(x_val)
val_loss = self.criterion(val_out, y_val).item()
else:
val_out = self.model(x_val)
val_loss = self.criterion(val_out, y_val).item()
self.model.train()
final_loss = train_loss
actual_epochs = epoch + 1
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= self.patience:
break
self.training_status["is_training"] = False
self.training_status["loss"] = final_loss
# 3. 예측
if is_gpu:
torch.cuda.synchronize()
elapsed = time.time() - t_start
print(f"[AI] {ticker or '?'}: {actual_epochs} epochs in {elapsed:.1f}s"
f" | loss={final_loss:.6f} val={best_val_loss:.6f}"
f" | device={self.device}")
# 5. 체크포인트 저장
if ticker:
self._save_checkpoint(ticker, actual_epochs, final_loss)
# 6. 예측
self.model.eval()
with torch.no_grad():
last_seq = torch.FloatTensor(scaled_data[-self.seq_length:]).unsqueeze(0).to(self.device)
predicted_scaled = self.model(last_seq)
predicted_price = self.scaler.inverse_transform(predicted_scaled.cpu().numpy())[0][0]
last_seq = torch.FloatTensor(
scaled_data[-self.seq_length:]
).unsqueeze(0).to(self.device)
if self.use_amp:
with torch.amp.autocast('cuda'):
predicted_scaled = self.model(last_seq)
else:
predicted_scaled = self.model(last_seq)
predicted_price = self.scaler.inverse_transform(
predicted_scaled.cpu().float().numpy())[0][0]
# 7. GPU 메모리 정리 + Ollama 리로드
if is_gpu:
# 학습 중간 텐서 해제
del x_all, y_all, x_train, y_train, x_val, y_val
torch.cuda.empty_cache()
_log_gpu_memory("post-train")
_preload_ollama()
current_price = prices[-1]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
# 신뢰도 점수 (Loss가 낮을수록 높음, 0~1)
# Loss가 0.001이면 0.99, 0.01이면 0.9 정도 나오게 조정
confidence = 1.0 / (1.0 + (final_loss * 100))
return {
@@ -202,5 +354,58 @@ class PricePredictor:
"change_rate": round(change_rate, 2),
"trend": trend,
"loss": final_loss,
"confidence": round(confidence, 2)
"confidence": round(confidence, 2),
"epochs": actual_epochs,
"device": str(self.device)
}
def batch_predict(self, prices_dict):
results = {}
seqs = []
metas = []
for ticker, prices in prices_dict.items():
if len(prices) < (self.seq_length + 10):
results[ticker] = None
continue
data = np.array(prices).reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)
seq = torch.FloatTensor(scaled_data[-self.seq_length:]).unsqueeze(0)
seqs.append(seq)
metas.append((ticker, scaler, prices[-1]))
if not seqs:
return results
# 배치로 합쳐서 한번에 GPU 추론
batch = torch.cat(seqs, dim=0).to(self.device)
self.model.eval()
with torch.no_grad():
if self.use_amp:
with torch.amp.autocast('cuda'):
preds = self.model(batch)
else:
preds = self.model(batch)
preds_cpu = preds.cpu().float().numpy()
for i, (ticker, scaler, current_price) in enumerate(metas):
predicted_price = scaler.inverse_transform(preds_cpu[i:i+1])[0][0]
trend = "UP" if predicted_price > current_price else "DOWN"
change_rate = ((predicted_price - current_price) / current_price) * 100
results[ticker] = {
"current": current_price,
"predicted": float(predicted_price),
"change_rate": round(change_rate, 2),
"trend": trend
}
if self.device.type == 'cuda':
torch.cuda.empty_cache()
return results

View File

@@ -1,53 +1,50 @@
import time
import asyncio
import os
import sys
import json
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from datetime import datetime
# 모듈 임포트
from modules.config import Config
from modules.services.kis import KISClient
from modules.services.news import NewsCollector
from modules.services.news import AsyncNewsCollector
from modules.services.ollama import OllamaManager
from modules.services.telegram import TelegramMessenger
from modules.analysis.macro import MacroAnalyzer
from modules.utils.monitor import SystemMonitor
from modules.strategy.process import analyze_stock_process
# 기존 코드와의 호환성을 위해 상대 경로나 절대 경로로 임포트
# (리팩토링 과도기에는 일부 파일은 그대로 있을 수 있음)
try:
from theme_manager import ThemeManager
except ImportError:
# 템플릿용 더미
class ThemeManager:
def get_themes(self, code): return []
# 워커 프로세스 초기화 함수
def init_worker():
try:
from modules.utils.process_tracker import ProcessTracker
ProcessTracker.register("Trading Bot Worker")
except: pass
except Exception:
pass
class AutoTradingBot:
def __init__(self):
def __init__(self, ipc_lock=None, command_queue=None, shutdown_event=None):
# 1. 서비스 초기화
self.kis = KISClient()
self.news = NewsCollector()
# [안정성] GPU OOM 방지를 위해 워커 수 축소 (4 -> 2)
# [식별] 워커 프로세스 이름 등록
self.executor = ProcessPoolExecutor(max_workers=2, initializer=init_worker)
# 워커 프로세스 워밍업 (PID 등록 유도)
self.news = AsyncNewsCollector()
# GPU 경합 방지: 워커 1개만 사용 (LSTM 학습이 GPU 독점)
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
try:
list(self.executor.map(lambda x: x, range(2)))
except: pass
list(self.executor.map(lambda x: x, range(1)))
except Exception:
pass
self.messenger = TelegramMessenger()
self.theme_manager = ThemeManager()
self.ollama_monitor = OllamaManager() # GPU 모니터링용
self.ollama_monitor = OllamaManager()
# 2. 유틸리티 초기화
self.monitor = SystemMonitor(self.messenger, self.ollama_monitor)
@@ -59,114 +56,148 @@ class AutoTradingBot:
self.watchlist_updated_today = False
self.report_sent = False
# [IPC] BotIPC
# 4. 프로세스 관리
self.shutdown_event = shutdown_event
# 5. IPC (Shared Memory)
try:
from modules.utils.ipc import BotIPC
self.ipc = BotIPC()
from modules.utils.ipc import SharedIPC
self.ipc = SharedIPC(lock=ipc_lock, command_queue=command_queue)
except ImportError:
print("⚠️ BotIPC module not found.")
print("[Bot] SharedIPC module not found.")
self.ipc = None
# [Watchlist Manager]
# 6. Watchlist Manager
try:
from watchlist_manager import WatchlistManager
self.watchlist_manager = WatchlistManager(self.kis, watchlist_file=Config.WATCHLIST_FILE)
except ImportError:
self.watchlist_manager = None
# 기록 로드
# 7. 기록 로드
self.history_file = Config.HISTORY_FILE
self.load_trade_history()
# AI 하드웨어 점검
# 8. AI 하드웨어 점검
from modules.analysis.deep_learning import PricePredictor
PricePredictor.verify_hardware()
pass
# 9. KIS 비동기 클라이언트
try:
from modules.services.kis import KISAsyncClient
self.kis_async = KISAsyncClient(self.kis)
except ImportError:
self.kis_async = None
def load_trade_history(self):
"""파일에서 금일 매매 기록 로드"""
if os.path.exists(self.history_file):
try:
with open(self.history_file, "r", encoding="utf-8") as f:
self.daily_trade_history = json.load(f)
except Exception as e:
print(f"⚠️ Failed to load history: {e}")
except Exception:
self.daily_trade_history = []
else:
self.daily_trade_history = []
def save_trade_history(self):
"""매매 기록 저장"""
try:
with open(self.history_file, "w", encoding="utf-8") as f:
json.dump(self.daily_trade_history, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"⚠️ Failed to save history: {e}")
print(f"[Bot] Failed to save history: {e}")
def load_watchlist(self):
try:
with open(Config.WATCHLIST_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except:
except Exception:
return {}
def send_daily_report(self):
"""장 마감 리포트"""
if self.report_sent: return
print("📝 [Bot] Generating Daily Report...")
if self.report_sent:
return
print("[Bot] Generating Daily Report...")
balance = self.kis.get_balance()
total_eval = 0
if "total_eval" in balance:
total_eval = int(balance.get("total_eval", 0))
report = f"📅 **[Daily Closing Report]**\n" \
f"────────────────\n" \
f"💰 **총 자산**: `{total_eval:,}원`\n" \
f"📜 **오늘의 매매**: `{len(self.daily_trade_history)}건`\n\n"
total_eval = int(balance.get("total_eval", 0))
report = f"📅 <b>[Daily Closing Report]</b>\n" \
f"💰 <b>Total Asset:</b> <code>{total_eval:,}원</code>\n" \
f"📜 <b>Trades Today:</b> <code>{len(self.daily_trade_history)}건</code>\n\n"
if self.daily_trade_history:
for trade in self.daily_trade_history:
icon = "🔴" if trade['action'] == "BUY" else "🔵"
report += f"{icon} {trade['name']} {trade['qty']}\n"
action = trade['action']
icon = "🔴" if action == "BUY" else "🔵"
report += f"{icon} <b>{action}</b> {trade['name']} {trade['qty']}\n"
if "holdings" in balance and balance["holdings"]:
report += "\n📊 **보유 현황**\n"
report += "\n📊 <b>[Holdings]</b>\n"
for stock in balance["holdings"]:
yld = stock.get('yield', 0)
icon = "🔺" if yld > 0 else "🔻"
report += f"{icon} {stock['name']}: `{yld}%`\n"
yld = float(stock.get('yield', 0))
if yld > 0:
icon = "🔴"
yld_str = f"+{yld}"
elif yld < 0:
icon = "🔵"
yld_str = f"{yld}"
else:
icon = ""
yld_str = f"{yld}"
report += f"{icon} {stock['name']}: <code>{yld_str}%</code>\n"
self.messenger.send_message(report)
self.report_sent = True
def restart_executor(self):
"""프로세스 풀 복구"""
print("🔄 [Bot] Restarting Process Executor...")
print("[Bot] Restarting Process Executor...")
try:
self.executor.shutdown(wait=False)
except:
except Exception:
pass
# 워커 재시작
self.executor = ProcessPoolExecutor(max_workers=2, initializer=init_worker)
print("✅ [Bot] Process Executor Restarted.")
self.executor = ProcessPoolExecutor(max_workers=1, initializer=init_worker)
print("[Bot] Process Executor Restarted.")
def run_cycle(self):
def _process_commands(self):
"""IPC command queue 폴링 및 처리"""
if not self.ipc:
return
commands = self.ipc.poll_commands()
for cmd in commands:
command = cmd.get('command', '')
print(f"[Bot] Received command: {command}")
if command == 'restart':
self.messenger.send_message("[Bot] Restart requested via Telegram.")
# executor 재시작
self.restart_executor()
elif command == 'update_watchlist':
if self.watchlist_manager:
try:
summary = self.watchlist_manager.update_watchlist_daily()
self.messenger.send_message(f"[Watchlist Updated]\n{summary}")
except Exception as e:
self.messenger.send_message(f"Watchlist update failed: {e}")
async def run_cycle(self):
now = datetime.now()
# 1. 거시경제 분석 (우선 순위 상향)
# 0. 명령 큐 폴링
self._process_commands()
# 1. 거시경제 분석
macro_status = MacroAnalyzer.get_macro_status(self.kis)
is_crash = False
if macro_status['status'] == 'DANGER':
is_crash = True
if not self.is_macro_warning_sent:
self.messenger.send_message("🚨 **[MARKET CRASH ALERT]** 시장 급락 감지! 매수 중단.")
self.messenger.send_message("🚨 <b>[MARKET CRASH ALERT]</b> 시장 급락 감지! 매수 중단.")
self.is_macro_warning_sent = True
else:
if self.is_macro_warning_sent:
self.messenger.send_message("🌤️ **[MARKET RECOVERY]** 시장 안정화.")
self.messenger.send_message("🌤️ <b>[MARKET RECOVERY]</b> 시장 안정화.")
self.is_macro_warning_sent = False
# 2. IPC 상태 업데이트
@@ -181,16 +212,16 @@ class AutoTradingBot:
'watchlist': watchlist,
'discovered_stocks': list(self.discovered_stocks),
'is_macro_warning': self.is_macro_warning_sent,
'macro_indices': macro_status['indicators'], # [수정] 거시경제 지표 전달
'macro_indices': macro_status['indicators'],
'themes': {}
})
except Exception:
pass
# 2. 아침 업데이트 (08:00)
# 3. 아침 업데이트 (08:00)
if now.hour == 8 and 0 <= now.minute < 5:
if not self.watchlist_updated_today and self.watchlist_manager:
print("🌅 Morning Update...")
print("[Bot] Morning Update...")
try:
summary = self.watchlist_manager.update_watchlist_daily()
self.messenger.send_message(summary)
@@ -198,7 +229,7 @@ class AutoTradingBot:
except Exception as e:
self.messenger.send_message(f"Update Failed: {e}")
# 3. 리셋 (09:00)
# 4. 리셋 (09:00)
if now.hour == 9 and now.minute < 5:
self.daily_trade_history = []
self.save_trade_history()
@@ -206,26 +237,20 @@ class AutoTradingBot:
self.discovered_stocks.clear()
self.watchlist_updated_today = False
# 4. 시스템 감시
# 5. 시스템 감시 (3분 간격)
self.monitor.check_health()
# 5. 장 운영 시간 체크
# 6. 장 운영 시간 체크
if not (9 <= now.hour < 15 or (now.hour == 15 and now.minute < 30)):
# 장 마감 리포트 (15:40)
if now.hour == 15 and now.minute >= 40:
self.send_daily_report()
print("💤 Market Closed. Waiting...")
print("[Bot] Market Closed. Waiting...")
return
print(f"[Bot] Cycle Start: {now.strftime('%H:%M:%S')}")
# 6. 거시경제 분석 (완료됨)
# macro_status = ... (Moved to top)
print(f"[Bot] Cycle Start: {now.strftime('%H:%M:%S')}")
# 7. 종목 분석 및 매매
target_dict = self.load_watchlist()
# (Discovery 로직 생략 - 필요시 추가)
# 보유 종목 리스크 관리
balance = self.kis.get_balance()
@@ -240,148 +265,140 @@ class AutoTradingBot:
current_holdings[code] = stock
# 보유 수량이 0 이하라면 스킵 (오류 방지)
if qty <= 0:
continue
action = None
reason = ""
# 손절/익절 로직
if yld <= -5.0:
action = "SELL"
reason = "Stop Loss 📉"
reason = "Stop Loss"
elif yld >= 8.0:
action = "SELL"
reason = "Take Profit 🚀"
reason = "Take Profit"
if action == "SELL":
print(f"🚨 Risk Management: {reason} - {name} (Qty: {qty}, Yield: {yld}%)")
# 전량 매도
print(f"[Bot] Risk Management: {reason} - {name} (Qty: {qty}, Yield: {yld}%)")
res = self.kis.sell_stock(code, qty)
if res and res.get("status"):
self.messenger.send_message(f"🛡️ **[Risk SELL]** {name}\n"
f" • 사유: {reason}\n"
f" • 수량: {qty}\n"
f" • 수익률: {yld}%")
self.messenger.send_message(
f"🔵 <b>[Risk SELL]</b> {name}\n"
f" Reason: {reason}\n"
f" Qty: {qty}\n"
f" Yield: <code>{yld}%</code>")
self.daily_trade_history.append({
"action": "SELL",
"name": name,
"qty": qty,
"price": stock.get('current_price'),
"yield": yld
"action": "SELL", "name": name, "qty": qty,
"price": stock.get('current_price'), "yield": yld
})
self.save_trade_history()
else:
print(f"❌ Sell Failed for {name}: {res}")
# 분석 실행 (병렬 처리)
analysis_tasks = []
news_data = self.news.get_market_news()
news_data = await self.news.get_market_news_async()
# [수정] 실시간 잔고 추적용 변수 (매수 시 차감)
# 실시간 잔고 추적용 변수 (매수 시 차감)
tracking_deposit = int(balance.get("deposit", 0))
try:
for ticker, name in target_dict.items():
prices = self.kis.get_daily_price(ticker)
if not prices: continue
if not prices:
continue
# [신규] 외인 수급 분석
# 외인 수급 분석
investor_trend = self.kis.get_investor_trend(ticker)
future = self.executor.submit(analyze_stock_process, ticker, prices, news_data, investor_trend)
future = self.executor.submit(
analyze_stock_process, ticker, prices, news_data, investor_trend)
analysis_tasks.append(future)
# 결과 처리
loop = asyncio.get_event_loop()
for future in analysis_tasks:
try:
res = future.result()
res = await loop.run_in_executor(None, future.result)
ticker_name = target_dict.get(res['ticker'], 'Unknown')
print(f"📊 [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})")
print(f"[Bot] [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})")
if res['decision'] == "BUY":
if is_crash: continue
# 매수 로직 (예수금 체크 추가)
current_qty = 0
if res['ticker'] in current_holdings:
current_qty = current_holdings[res['ticker']]['qty']
if is_crash:
continue
current_price = float(res['current_price'])
if current_price <= 0: continue
if current_price <= 0:
continue
# 매수 수량 결정 (기본 1주, 추후 금액 기반으로 변경 가능)
qty = 1
required_amount = current_price * qty
# 예수금 확인
if tracking_deposit < required_amount:
print(f"💰 [Skip Buy] 예수금 부족 ({ticker_name}): 필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
print(f"[Bot] [Skip Buy] 예수금 부족 ({ticker_name}): "
f"필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
continue
print(f"🚀 Buying {ticker_name} {qty}ea")
# 실제 주문
print(f"[Bot] Buying {ticker_name} {qty}ea")
order = self.kis.buy_stock(res['ticker'], qty)
if order.get("status"):
self.messenger.send_message(f"🚀 **[BUY]** {ticker_name} {qty}\n"
f" • 가격: {current_price:,.0f}")
self.messenger.send_message(
f"🔴 <b>[BUY]</b> {ticker_name} {qty}\n"
f" Price: <code>{current_price:,.0f}원</code>")
self.daily_trade_history.append({
"action": "BUY",
"name": ticker_name,
"qty": qty,
"price": current_price
"action": "BUY", "name": ticker_name,
"qty": qty, "price": current_price
})
self.save_trade_history()
# [중요] 가상 잔고 차감 (연속 매수 시 초과 방지)
tracking_deposit -= required_amount
elif res['decision'] == "SELL":
print(f"📉 Selling {ticker_name} (Simulation)")
# 매도 로직 (필요 시 추가)
print(f"[Bot] Selling {ticker_name} (Simulation)")
except BrokenProcessPool:
raise # 상위 레벨에서 처리
raise
except Exception as e:
print(f" Analysis Worker Error: {e}")
print(f"[Bot] Analysis Worker Error: {e}")
except BrokenProcessPool:
print("⚠️ [Bot] Worker Process Crashed (OOM, CUDA Error?). Restarting Executor...")
print("[Bot] Worker Process Crashed. Restarting Executor...")
self.restart_executor()
except KeyboardInterrupt:
raise
except Exception as e:
print(f" Cycle Loop Error: {e}")
print(f"[Bot] Cycle Loop Error: {e}")
def loop(self):
print(f"🤖 Bot Module Started (PID: {os.getpid()})")
self.messenger.send_message("🤖 **[Bot Started]** 리팩토링된 봇이 시작되었습니다.")
print(f"[Bot] Module Started (PID: {os.getpid()})")
self.messenger.send_message("[Bot Started] 리팩토링된 봇이 시작되었습니다.")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while True:
try:
self.run_cycle()
except Exception as e:
print(f"⚠️ Loop Error: {e}")
self.messenger.send_message(f"⚠️ Loop Error: {e}")
time.sleep(60)
except KeyboardInterrupt:
print("🛑 [Bot] Stopped by User.")
finally:
print("🛑 [Bot] Shutting down executor...")
self.executor.shutdown(wait=False)
print("✅ [Bot] Executor shutdown complete.")
# shutdown 시그널 체크
if self.shutdown_event and self.shutdown_event.is_set():
print("[Bot] Shutdown signal received.")
break
def start_telegram_command_server(self):
"""텔레그램 봇 프로세스 실행 (독립 프로세스)"""
script = os.path.join(os.getcwd(), "modules", "services", "telegram_bot", "runner.py")
if os.path.exists(script):
import subprocess
subprocess.Popen([sys.executable, script], creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
print("🚀 Telegram Command Server Started")
else:
print(f"⚠️ Telegram Bot Runner not found: {script}")
try:
loop.run_until_complete(self.run_cycle())
except Exception as e:
print(f"[Bot] Loop Error: {e}")
self.messenger.send_message(f"[Bot] Loop Error: {e}")
# 비동기 sleep (shutdown 체크하면서 대기)
for _ in range(60):
if self.shutdown_event and self.shutdown_event.is_set():
break
time.sleep(1)
except KeyboardInterrupt:
print("[Bot] Stopped by User.")
finally:
print("[Bot] Shutting down executor...")
self.executor.shutdown(wait=False)
if self.ipc:
self.ipc.cleanup()
loop.close()
print("[Bot] Executor shutdown complete.")

View File

@@ -42,11 +42,27 @@ class Config:
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR, exist_ok=True)
IPC_FILE = os.path.join(DATA_DIR, "bot_ipc.json")
HISTORY_FILE = os.path.join(DATA_DIR, "daily_trade_history.json")
WATCHLIST_FILE = os.path.join(DATA_DIR, "watchlist.json")
# 7. 타임아웃 등
# 모델 체크포인트 디렉토리
MODEL_DIR = os.path.join(DATA_DIR, "models")
if not os.path.exists(MODEL_DIR):
os.makedirs(MODEL_DIR, exist_ok=True)
# 7. IPC 설정
SHM_NAME = "web_ai_bot_ipc"
SHM_SIZE = 131072 # 128KB
IPC_STALENESS = 120 # 120초 (메인 봇 사이클 60초 + 여유)
# 8. GPU 설정
VRAM_WARNING_THRESHOLD = 12.0 # GB (14 → 12로 조기 경고)
# 9. 프로세스 관리
WATCHDOG_INTERVAL = 30 # 헬스체크 간격(초)
MAX_RESTART_COUNT = 3 # 최대 자동 재시작 횟수
# 10. 타임아웃 등
HTTP_TIMEOUT = 10
@staticmethod

View File

@@ -497,18 +497,161 @@ class KISClient:
if data['rt_cd'] != '0':
return None
# output 리스트: [ {stck_bsop_date: 날짜, frgn_ntby_qty: 외인순매수, orgn_ntby_qty: 기관순매수, ...}, ... ]
trends = []
for item in data['output'][:5]: # 최근 5일치만
for item in data['output'][:5]:
trends.append({
"date": item['stck_bsop_date'],
"foreigner": self._safe_int(item.get('frgn_ntby_qty')), # 외인 순매수량
"institutional": self._safe_int(item.get('orgn_ntby_qty')), # 기관 순매수량
"price_change": float(item['prdy_vrss']) # 전일대비 등락금액
"foreigner": self._safe_int(item.get('frgn_ntby_qty')),
"institutional": self._safe_int(item.get('orgn_ntby_qty')),
"price_change": float(item['prdy_vrss'])
})
# 최근일이 0번 인덱스임
return trends
except Exception as e:
print(f" 투자자 동향 조회 실패({ticker}): {e}")
print(f"[KIS] 투자자 동향 조회 실패({ticker}): {e}")
return None
class KISAsyncClient:
"""
비동기 KIS API 클라이언트
- aiohttp 기반 HTTP 호출
- 동기 KISClient의 토큰/설정을 공유
- 다중 종목 병렬 수집용
"""
def __init__(self, sync_client):
self.sync = sync_client
self.min_interval = 0.5 # 초당 2회 제한
async def _async_get(self, session, url, headers, params):
"""비동기 GET 요청"""
try:
async with session.get(url, headers=headers, params=params) as resp:
return await resp.json()
except Exception as e:
print(f"[KIS Async] Request failed: {e}")
return None
async def get_daily_price_async(self, ticker):
"""비동기 일별 시세 조회"""
import aiohttp
import asyncio
self.sync.ensure_token()
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price"
headers = self.sync._get_headers(tr_id="FHKST01010400")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1"
}
async with aiohttp.ClientSession() as session:
data = await self._async_get(session, url, headers, params)
if data and data.get('rt_cd') == '0':
prices = [int(item['stck_clpr']) for item in data['output']]
prices.reverse()
return prices
return []
async def get_investor_trend_async(self, ticker):
"""비동기 투자자 동향 조회"""
import aiohttp
self.sync.ensure_token()
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-investor"
headers = self.sync._get_headers(tr_id="FHKST01010900")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker
}
async with aiohttp.ClientSession() as session:
data = await self._async_get(session, url, headers, params)
if data and data.get('rt_cd') == '0':
trends = []
for item in data['output'][:5]:
trends.append({
"date": item['stck_bsop_date'],
"foreigner": self.sync._safe_int(item.get('frgn_ntby_qty')),
"institutional": self.sync._safe_int(item.get('orgn_ntby_qty')),
"price_change": float(item['prdy_vrss'])
})
return trends
return None
async def get_daily_prices_batch(self, tickers):
"""여러 종목의 일별 시세를 병렬로 조회"""
import aiohttp
import asyncio
self.sync.ensure_token()
results = {}
async with aiohttp.ClientSession() as session:
tasks = []
for i, ticker in enumerate(tickers):
# rate limit: 0.5초 간격으로 요청 생성
if i > 0:
await asyncio.sleep(self.min_interval)
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price"
headers = self.sync._get_headers(tr_id="FHKST01010400")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker,
"FID_PERIOD_DIV_CODE": "D",
"FID_ORG_ADJ_PRC": "1"
}
tasks.append((ticker, self._async_get(session, url, headers, params)))
for ticker, task in tasks:
data = await task
if data and data.get('rt_cd') == '0':
prices = [int(item['stck_clpr']) for item in data['output']]
prices.reverse()
results[ticker] = prices
else:
results[ticker] = []
return results
async def get_investor_trends_batch(self, tickers):
"""여러 종목의 투자자 동향을 병렬로 조회"""
import aiohttp
import asyncio
self.sync.ensure_token()
results = {}
async with aiohttp.ClientSession() as session:
tasks = []
for i, ticker in enumerate(tickers):
if i > 0:
await asyncio.sleep(self.min_interval)
url = f"{self.sync.base_url}/uapi/domestic-stock/v1/quotations/inquire-investor"
headers = self.sync._get_headers(tr_id="FHKST01010900")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": ticker
}
tasks.append((ticker, self._async_get(session, url, headers, params)))
for ticker, task in tasks:
data = await task
if data and data.get('rt_cd') == '0':
trends = []
for item in data['output'][:5]:
trends.append({
"date": item['stck_bsop_date'],
"foreigner": self.sync._safe_int(item.get('frgn_ntby_qty')),
"institutional": self.sync._safe_int(item.get('orgn_ntby_qty')),
"price_change": float(item['prdy_vrss'])
})
results[ticker] = trends
else:
results[ticker] = None
return results

View File

@@ -1,11 +1,10 @@
import time
import requests
import xml.etree.ElementTree as ET
class NewsCollector:
"""
NAS에서 뉴스를 받지 못할 경우, Windows 서버에서 직접 뉴스를 수집하는 모듈
(Google News RSS 활용)
"""
"""동기 뉴스 수집 (Google News RSS)"""
@staticmethod
def get_market_news(query="주식 시장"):
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
@@ -18,5 +17,56 @@ class NewsCollector:
items.append({"title": title, "source": "Google News"})
return items
except Exception as e:
print(f"❌ 뉴스 수집 실패: {e}")
print(f"[News] Collection failed: {e}")
return []
class AsyncNewsCollector:
"""비동기 뉴스 수집 + 5분 캐싱"""
def __init__(self):
self._cache = None
self._cache_time = 0
self._cache_ttl = 300 # 5분
def get_market_news(self, query="주식 시장"):
"""동기 인터페이스 (하위 호환)"""
now = time.time()
if self._cache and (now - self._cache_time) < self._cache_ttl:
return self._cache
result = NewsCollector.get_market_news(query)
self._cache = result
self._cache_time = now
return result
async def get_market_news_async(self, query="주식 시장"):
"""비동기 뉴스 수집 (aiohttp + 캐싱)"""
now = time.time()
if self._cache and (now - self._cache_time) < self._cache_ttl:
return self._cache
try:
import aiohttp
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
items = []
for item in root.findall(".//item")[:5]:
title = item.find("title").text
items.append({"title": title, "source": "Google News"})
self._cache = items
self._cache_time = now
return items
except ImportError:
# aiohttp 미설치 시 동기 fallback
return self.get_market_news(query)
except Exception as e:
print(f"[News Async] Collection failed: {e}")
# 캐시가 있으면 반환, 없으면 동기 fallback
if self._cache:
return self._cache
return self.get_market_news(query)

View File

@@ -23,7 +23,7 @@ class OllamaManager:
try:
if pynvml:
pynvml.nvmlInit()
self.handle = pynvml.nvmlDeviceGetHandleByIndex(0) # 0번 GPU (3070 Ti)
self.handle = pynvml.nvmlDeviceGetHandleByIndex(0) # 0번 GPU (5070 Ti)
self.gpu_available = True
print("✅ [OllamaManager] NVIDIA GPU Monitoring On")
else:
@@ -74,16 +74,34 @@ class OllamaManager:
print(f"⚠️ GPU Status Check Failed: {e}")
return {"name": "N/A", "temp": 0, "vram_used": 0, "vram_total": 0, "load": 0}
def is_training_active(self):
"""LSTM 학습 중인지 확인 (GPU 메모리 충돌 방지)"""
try:
import torch
if torch.cuda.is_available():
# VRAM 사용량으로 학습 여부 추정
vram = self.check_vram()
return vram > Config.VRAM_WARNING_THRESHOLD
except Exception:
pass
return False
def request_inference(self, prompt, context_data=None):
"""
Ollama에 추론 요청
:param prompt: 시스템 프롬프트 + 사용자 입력
:param context_data: (Optional) 이전 대화 컨텍스트
- LSTM 학습 중이면 대기 (GPU 메모리 충돌 방지)
"""
# [5070Ti 최적화] VRAM이 14GB 이상이면 모델 언로드 시도 (16GB 중 여유분 확보)
# LSTM 학습 중이면 최대 60초 대기
import time as _time
for _ in range(12):
if not self.is_training_active():
break
print("[Ollama] Waiting for LSTM training to finish...")
_time.sleep(5)
vram = self.check_vram()
if vram > 14.0:
print(f"⚠️ [OllamaManager] High VRAM Usage ({vram:.1f}GB). Requesting unload.")
if vram > Config.VRAM_WARNING_THRESHOLD:
print(f"[OllamaManager] High VRAM Usage ({vram:.1f}GB). Requesting unload.")
try:
# keep_alive=0으로 설정하여 모델 즉시 언로드
requests.post(self.generate_url,

View File

@@ -4,72 +4,73 @@
"""
import os
import sys
import time
import multiprocessing
from dotenv import load_dotenv
# 환경 변수 로드
load_dotenv()
def run_telegram_bot_standalone():
def run_telegram_bot_standalone(ipc_lock=None, command_queue=None, shutdown_event=None):
"""텔레그램 봇만 독립적으로 실행"""
# 경로 문제 해결을 위해 상위 디렉토리 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
from modules.services.telegram_bot.server import TelegramBotServer
from modules.utils.ipc import BotIPC
from modules.config import Config
from modules.utils.ipc import SharedIPC
from modules.utils.process_tracker import ProcessTracker
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
print("[Telegram] TELEGRAM_BOT_TOKEN not found in .env")
print("[Telegram] TELEGRAM_BOT_TOKEN not found in .env")
sys.exit(1)
ProcessTracker.register("Telegram Bot Standalone")
print(f"🤖 [Telegram Bot Process] Starting... (PID: {os.getpid()})")
print(f"🔗 [Telegram Bot] Standalone Process Mode (IPC Enabled)")
print(f"[Telegram Bot Process] Starting... (PID: {os.getpid()})")
# IPC 초기화
ipc = BotIPC()
# IPC 초기화 (shared memory + command queue)
ipc = SharedIPC(lock=ipc_lock, command_queue=command_queue)
# [최적화] 재시작 루프 구현
while True:
try:
# 봇 서버 생성 (매번 새로 생성)
bot_server = TelegramBotServer(token)
# shutdown 체크
if shutdown_event and shutdown_event.is_set():
print("[Telegram Bot] Shutdown signal received.")
break
# IPC를 통해 메인 봇 데이터 가져오기
try:
bot_server = TelegramBotServer(token, ipc=ipc, shutdown_event=shutdown_event)
# 초기 데이터 로드
try:
# 초기 연결 시도
instance_data = ipc.get_bot_instance_data()
if instance_data:
bot_server.set_bot_instance(instance_data)
except Exception:
pass # 연결 실패해도 일단 봇은 띄움
pass
# 봇 실행 (블로킹)
bot_server.run()
# 재시작 요청 확인
if bot_server.should_restart:
print("🔄 [Telegram Bot] Restarting instance...")
import time
time.sleep(1) # 잠시 대기
print("[Telegram Bot] Restarting instance...")
time.sleep(1)
continue
else:
print("🛑 [Telegram Bot] Process exiting.")
print("[Telegram Bot] Process exiting.")
break
except KeyboardInterrupt:
print("\n🛑 [Telegram Bot] Stopped by user")
print("[Telegram Bot] Stopped by user")
break
except Exception as e:
if "Conflict" not in str(e):
print(f"[Telegram Bot] Error: {e}")
print(f"[Telegram Bot] Error: {e}")
import traceback
traceback.print_exc()
break
# 정리
ipc.cleanup()
if __name__ == "__main__":
multiprocessing.freeze_support()
run_telegram_bot_standalone()

View File

@@ -1,54 +1,49 @@
"""
텔레그램 봇 최적화 버전
- Polling 최적화 (CPU 사용률 감소)
- 별도 프로세스로 분리
- 봇 재시작 명령어
- 원격 명령어 실행
텔레그램 봇 - Shared Memory IPC + 양방향 명령 채널
"""
import os
import asyncio
import logging
import subprocess
import sys
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from dotenv import load_dotenv
# 로깅 설정
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logging.getLogger("httpx").setLevel(logging.WARNING)
class TelegramBotServer:
def __init__(self, bot_token):
# [최적화] 연결 풀 설정 추가
def __init__(self, bot_token, ipc=None, shutdown_event=None):
self.application = Application.builder()\
.token(bot_token)\
.concurrent_updates(True)\
.build()
self.bot_instance = None
self.ipc = ipc
self.shutdown_event = shutdown_event
self.is_shutting_down = False
self.should_restart = False
def set_bot_instance(self, bot):
"""AutoTradingBot 인스턴스를 주입받음"""
self.bot_instance = bot
def refresh_bot_instance(self):
"""IPC에서 최신 봇 인스턴스 데이터 읽기"""
from modules.utils.ipc import BotIPC
ipc = BotIPC()
self.bot_instance = ipc.get_bot_instance_data()
if self.ipc:
self.bot_instance = self.ipc.get_bot_instance_data()
else:
# fallback: 새 IPC 인스턴스 생성
from modules.utils.ipc import SharedIPC
ipc = SharedIPC()
self.bot_instance = ipc.get_bot_instance_data()
return self.bot_instance is not None
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start 명령어 핸들러"""
print(f"📨 [Telegram] /start received from user {update.effective_user.id}")
await update.message.reply_text(
"🤖 <b>AI Trading Bot Command Center</b>\n"
"<b>AI Trading Bot Command Center</b>\n"
"명령어 목록:\n"
"/status - 현재 봇 및 시장 상태 조회\n"
"/portfolio - 현재 보유 종목 및 평가액\n"
@@ -58,151 +53,170 @@ class TelegramBotServer:
"/system - PC 리소스(CPU/GPU) 상태\n"
"/ai - AI 모델 학습 상태 조회\n\n"
"<b>[관리 명령어]</b>\n"
"/restart - 봇 재시작\n"
"/restart - 메인 봇 재시작 요청\n"
"/exec <code>명령어</code> - 원격 명령어 실행\n"
"/stop - 봇 종료",
parse_mode="HTML"
)
async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/status: 종합 상태 브리핑"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 메인 봇이 실행 중이 아닙니다.")
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
from datetime import datetime
now = datetime.now()
is_market_open = (9 <= now.hour < 15) or (now.hour == 15 and now.minute < 30)
status_msg = "<b>System Status: ONLINE</b>\n"
status_msg += f"🕒 <b>Market:</b> {'OPEN 🟢' if is_market_open else 'CLOSED 🔴'}\n"
status_msg = "<b>System Status: ONLINE</b>\n"
status_msg += f"<b>Market:</b> {'OPEN' if is_market_open else 'CLOSED'}\n"
macro_warn = self.bot_instance.is_macro_warning_sent
status_msg += f"🌍 <b>Macro Filter:</b> {'DANGER 🚨 (Trading Halted)' if macro_warn else 'SAFE 🟢'}\n"
status_msg += f"<b>Macro Filter:</b> {'DANGER (Trading Halted)' if macro_warn else 'SAFE'}\n"
await update.message.reply_text(status_msg, parse_mode="HTML")
async def portfolio_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/portfolio: 잔고 조회"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 봇 인스턴스가 연결되지 않았습니다.")
await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.")
return
await update.message.reply_text("잔고를 조회 중입니다...")
await update.message.reply_text("잔고를 조회 중입니다...")
try:
balance = self.bot_instance.kis.get_balance()
if "error" in balance:
await update.message.reply_text(f"잔고 조회 실패: {balance['error']}")
await update.message.reply_text(f"잔고 조회 실패: {balance['error']}")
return
msg = f"💰 <b>Total Asset:</b> <code>{int(balance['total_eval']):,} KRW</code>\n" \
f"💵 <b>Deposit:</b> <code>{int(balance['deposit']):,} KRW</code>\n\n"
msg = f"<b>Total Asset:</b> <code>{int(balance['total_eval']):,} KRW</code>\n" \
f"<b>Deposit:</b> <code>{int(balance['deposit']):,} KRW</code>\n\n"
if balance['holdings']:
msg += "<b>[Holdings]</b>\n"
for stock in balance['holdings']:
icon = "🔴" if stock['yield'] > 0 else "🔵"
msg += f"{icon} <b>{stock['name']}</b> <code>{stock['yield']}%</code>\n" \
f" (수량: {stock['qty']} / 평가손익: {stock['profit_loss']:,})\n"
yld = float(stock.get('yield', 0))
# 상승(빨강), 하락(파랑) 이모지 적용
if yld > 0:
icon = "🔴"
yld_str = f"+{yld}"
elif yld < 0:
icon = "🔵"
yld_str = f"{yld}"
else:
icon = ""
yld_str = f"{yld}"
msg += f"{icon} <b>{stock['name']}</b>: <code>{yld_str}%</code>\n" \
f" (수량: {stock['qty']} / 손익: {stock['profit_loss']:,})\n"
else:
msg += "보유 중인 종목이 없습니다."
await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"Error: {str(e)}")
await update.message.reply_text(f"Error: {str(e)}")
async def watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/watchlist: 감시 대상 종목"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 봇 인스턴스가 연결되지 않았습니다.")
await update.message.reply_text("봇 인스턴스가 연결되지 않았습니다.")
return
target_dict = self.bot_instance.load_watchlist()
discovered = list(self.bot_instance.discovered_stocks)
discovered = self.bot_instance.discovered_stocks
msg = f"👀 <b>Watchlist: {len(target_dict)} items</b>\n"
msg = f"<b>Watchlist: {len(target_dict)} items</b>\n"
for code, name in target_dict.items():
themes = self.bot_instance.theme_manager.get_themes(code)
theme_str = f" ({', '.join(themes)})" if themes else ""
msg += f"- {name}{theme_str}\n"
msg += f"• <b>{name}</b>{theme_str}\n"
if discovered:
msg += f"\n<b>Discovered Today ({len(discovered)}):</b>\n"
msg += f"\n<b>Discovered Today ({len(discovered)}):</b>\n"
for code in discovered:
msg += f"- {code}\n"
await update.message.reply_text(msg, parse_mode="HTML")
async def update_watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/update_watchlist: Watchlist 즉시 업데이트"""
await update.message.reply_text("🔄 Watchlist를 업데이트하고 있습니다... (30초 소요)")
"""Watchlist 업데이트 - command queue를 통해 메인 봇에 요청"""
if self.ipc and self.ipc.send_command('update_watchlist'):
await update.message.reply_text("Watchlist 업데이트를 메인 봇에 요청했습니다.")
else:
# fallback: 직접 업데이트
await update.message.reply_text("Watchlist를 업데이트하고 있습니다... (30초 소요)")
try:
from modules.services.kis import KISClient
from watchlist_manager import WatchlistManager
from modules.config import Config
try:
from modules.services.kis import KISClient
from watchlist_manager import WatchlistManager
from modules.config import Config
temp_kis = KISClient()
mgr = WatchlistManager(temp_kis, watchlist_file=Config.WATCHLIST_FILE)
summary = mgr.update_watchlist_daily()
# HTML 특수문자 이스케이프
summary = summary.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
await update.message.reply_text(summary)
except Exception as e:
await update.message.reply_text(f"❌ 업데이트 실패: {e}")
temp_kis = KISClient()
mgr = WatchlistManager(temp_kis, watchlist_file=Config.WATCHLIST_FILE)
summary = mgr.update_watchlist_daily()
summary = summary.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
await update.message.reply_text(summary)
except Exception as e:
await update.message.reply_text(f"업데이트 실패: {e}")
async def macro_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/macro: 거시경제 지표 조회 (IPC 데이터 사용)"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 메인 봇 연결 대기 중...")
await update.message.reply_text("메인 봇 연결 대기 중...")
return
await update.message.reply_text("거시경제 데이터를 불러옵니다...")
await update.message.reply_text("거시경제 데이터를 불러옵니다...")
try:
indices = getattr(self.bot_instance.kis, '_macro_indices', {})
if not indices:
await update.message.reply_text("⚠️ 데이터가 아직 수집되지 않았습니다. 잠시 후 다시 시도하세요.")
return
await update.message.reply_text("데이터가 아직 수집되지 않았습니다.")
return
status = "SAFE"
msi = indices.get('MSI', 0)
if msi >= 50: status = "DANGER"
elif msi >= 30: status = "CAUTION"
if msi >= 50:
status = "DANGER"
elif msi >= 30:
status = "CAUTION"
color = "🟢" if status == "SAFE" else "🔴" if status == "DANGER" else "🟡"
msg = f"{color} <b>Market Risk: {status}</b>\n\n"
msg = f"<b>Market Risk: {status}</b>\n\n"
if 'MSI' in indices:
msg += f"🌡️ <b>Stress Index:</b> <code>{indices['MSI']}</code>\n"
msg += f"<b>Stress Index:</b> <code>{indices['MSI']}</code>\n"
for k, v in indices.items():
if k != "MSI":
icon = "🔺" if v.get('change', 0) > 0 else "🔻"
msg += f"{icon} <b>{k}</b>: {v.get('price', 0)} ({v.get('change', 0)}%)\n"
change = float(v.get('change', 0))
price = v.get('price', 0)
if change > 0:
icon = "🔴"
chg_str = f"+{change}"
elif change < 0:
icon = "🔵"
chg_str = f"{change}"
else:
icon = ""
chg_str = f"{change}"
msg += f"{icon} <b>{k}</b>: {price} ({chg_str}%)\n"
await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"Error: {e}")
await update.message.reply_text(f"Error: {e}")
async def system_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/system: 시스템 리소스 상태"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 메인 봇이 실행 중이 아닙니다.")
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
import psutil
cpu = psutil.cpu_percent(interval=1)
# non-blocking CPU 측정
cpu = psutil.cpu_percent(interval=0)
ram = psutil.virtual_memory().percent
# CPU 점유율 상위 3개 프로세스 수집
top_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
@@ -220,15 +234,16 @@ class TelegramBotServer:
gpu_msg = "N/A"
if gpu_status and gpu_status.get('name') != 'N/A':
gpu_name = gpu_status.get('name', 'GPU')
gpu_msg = f"{gpu_name}\n Temp: {gpu_status.get('temp', 0)}°C / VRAM: {gpu_status.get('vram_used', 0)}GB / {gpu_status.get('vram_total', 0)}GB"
gpu_msg = f"{gpu_name}\n Temp: {gpu_status.get('temp', 0)}C / " \
f"VRAM: {gpu_status.get('vram_used', 0)}GB / {gpu_status.get('vram_total', 0)}GB"
msg = "🖥️ <b>PC System Status</b>\n" \
f"🧠 <b>CPU:</b> <code>{cpu}%</code>\n" \
f"💾 <b>RAM:</b> <code>{ram}%</code>\n" \
f"🎮 <b>GPU:</b> {gpu_msg}\n\n"
msg = "<b>PC System Status</b>\n" \
f"<b>CPU:</b> <code>{cpu}%</code>\n" \
f"<b>RAM:</b> <code>{ram}%</code>\n" \
f"<b>GPU:</b> {gpu_msg}\n\n"
if top_3:
msg += "⚙️ <b>Top CPU Processes:</b>\n"
msg += "<b>Top CPU Processes:</b>\n"
for i, proc in enumerate(top_3, 1):
proc_name = proc.get('name', 'Unknown')
proc_cpu = proc.get('cpu_percent', 0)
@@ -237,56 +252,55 @@ class TelegramBotServer:
await update.message.reply_text(msg, parse_mode="HTML")
async def ai_status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/ai: AI 모델 학습 상태 조회"""
if not self.refresh_bot_instance():
await update.message.reply_text("⚠️ 메인 봇이 실행 중이 아닙니다.")
await update.message.reply_text("메인 봇이 실행 중이 아닙니다.")
return
gpu = self.bot_instance.ollama_monitor.get_gpu_status()
msg = "🧠 <b>AI Model Status</b>\n"
msg += " <b>LLM Engine:</b> Ollama (Llama 3.1)\n"
gpu_name = gpu.get('name', 'NVIDIA RTX 5070 Ti')
msg += f"• <b>Device:</b> {gpu_name}\n"
msg = "<b>AI Model Status</b>\n"
msg += "* <b>LLM Engine:</b> Ollama (Llama 3.1)\n"
msg += f"* <b>Device:</b> {gpu.get('name', 'GPU')}\n"
if gpu:
msg += f" <b>GPU Load:</b> <code>{gpu.get('load', 0)}%</code>\n"
msg += f" <b>VRAM Usage:</b> <code>{gpu.get('vram_used', 0)}GB</code> / {gpu.get('vram_total', 0)}GB"
msg += f"* <b>GPU Load:</b> <code>{gpu.get('load', 0)}%</code>\n"
msg += f"* <b>VRAM Usage:</b> <code>{gpu.get('vram_used', 0)}GB</code> / {gpu.get('vram_total', 0)}GB"
await update.message.reply_text(msg, parse_mode="HTML")
async def restart_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/restart: 텔레그램 봇 모듈만 재시작"""
await update.message.reply_text("🔄 <b>텔레그램 인터페이스를 재시작합니다...</b>", parse_mode="HTML")
self.should_restart = True
self.application.stop_running()
"""/restart: 메인 봇에 재시작 명령 전달"""
if self.ipc and self.ipc.send_command('restart'):
await update.message.reply_text(
"<b>메인 봇에 재시작 요청을 전송했습니다.</b>", parse_mode="HTML")
else:
# IPC 명령 실패 시 텔레그램 봇만 재시작
await update.message.reply_text(
"<b>텔레그램 인터페이스를 재시작합니다...</b>", parse_mode="HTML")
self.should_restart = True
self.application.stop_running()
async def stop_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stop: 봇 종료"""
await update.message.reply_text("🛑 <b>텔레그램 봇을 종료합니다.</b>", parse_mode="HTML")
await update.message.reply_text(
"<b>텔레그램 봇을 종료합니다.</b>", parse_mode="HTML")
self.should_restart = False
self.application.stop_running()
async def exec_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/exec: 원격 명령어 실행 (Non-blocking)"""
text = update.message.text.strip()
parts = text.split(maxsplit=1)
if len(parts) < 2:
await update.message.reply_text("사용법: /exec 명령어")
await update.message.reply_text("사용법: /exec 명령어")
return
command = parts[1]
await update.message.reply_text(f"⚙️ 실행 중: <code>{command}</code>", parse_mode="HTML")
await update.message.reply_text(f"실행 중: <code>{command}</code>", parse_mode="HTML")
try:
# 보안: 위험한 명령어 차단
dangerous_keywords = ['rm', 'del', 'format', 'shutdown', 'reboot']
if any(keyword in command.lower() for keyword in dangerous_keywords):
await update.message.reply_text("위험한 명령어는 실행할 수 없습니다.")
await update.message.reply_text("위험한 명령어는 실행할 수 없습니다.")
return
import platform
@@ -327,17 +341,15 @@ class TelegramBotServer:
if len(combined) > 3000:
combined = combined[:3000] + "\n... (Truncated)"
# HTML 특수문자 이스케이프
combined = combined.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
await update.message.reply_text(f"<pre>{combined}</pre>", parse_mode="HTML")
except asyncio.TimeoutError:
await update.message.reply_text("⏱️ 명령어 실행 시간 초과 (30초)")
await update.message.reply_text("명령어 실행 시간 초과 (30초)")
except Exception as e:
await update.message.reply_text(f"실행 오류: {e}")
await update.message.reply_text(f"실행 오류: {e}")
def run(self):
"""봇 실행 (Handler 등록 및 Polling)"""
handlers = [
("start", self.start_command),
("status", self.status_command),
@@ -357,15 +369,15 @@ class TelegramBotServer:
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
if "Conflict" in str(context.error):
print(f"⚠️ [Telegram] Conflict detected. Stopping...")
print(f"[Telegram] Conflict detected. Stopping...")
if self.application.running:
await self.application.stop()
return
print(f"[Telegram Error] {context.error}")
print(f"[Telegram Error] {context.error}")
self.application.add_error_handler(error_handler)
print("🤖 [Telegram] Command Server Started (Standard Polling Mode).")
print("[Telegram] Command Server Started (Shared Memory IPC Mode).")
try:
self.application.run_polling(
@@ -373,4 +385,4 @@ class TelegramBotServer:
drop_pending_updates=True
)
except Exception as e:
print(f"[Telegram] Polling Error: {e}")
print(f"[Telegram] Polling Error: {e}")

View File

@@ -12,8 +12,10 @@ def get_predictor():
"""워커 프로세스 내에서 PricePredictor 인스턴스를 싱글톤으로 관리"""
global _lstm_predictor
if _lstm_predictor is None:
print(f"🧩 [Worker {os.getpid()}] Initializing LSTM Predictor...")
print(f"[Worker {os.getpid()}] Initializing LSTM Predictor...")
_lstm_predictor = PricePredictor()
print(f"[Worker {os.getpid()}] LSTM Device: {_lstm_predictor.device}"
f" | AMP: {_lstm_predictor.use_amp}")
return _lstm_predictor
def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
@@ -34,7 +36,7 @@ def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
lstm_predictor = get_predictor()
if lstm_predictor:
lstm_predictor.training_status['current_ticker'] = ticker
pred_result = lstm_predictor.train_and_predict(prices)
pred_result = lstm_predictor.train_and_predict(prices, ticker=ticker)
lstm_score = 0.5 # 중립
ai_confidence = 0.5

View File

@@ -1,62 +1,126 @@
"""
프로세스 간 통신 (IPC) - 파일 기반
텔레그램 봇과 메인 봇 간 데이터 공유
프로세스 간 통신 (IPC) - Shared Memory 기반
텔레그램 봇과 메인 봇 간 데이터 공유 + 양방향 명령 채널
"""
import os
import json
import time
from datetime import datetime
import struct
from multiprocessing.shared_memory import SharedMemory
from modules.config import Config
class BotIPC:
"""파일 기반 IPC (Inter-Process Communication)"""
def __init__(self, ipc_file=None):
self.ipc_file = ipc_file if ipc_file else Config.IPC_FILE
self.last_update = 0
class SharedIPC:
"""Shared Memory + Command Queue 기반 IPC"""
def __init__(self, lock=None, command_queue=None):
self.lock = lock
self.command_queue = command_queue
self._shm = None
self._is_creator = False
def _ensure_shm(self):
"""SharedMemory 블록에 연결 (없으면 생성)"""
if self._shm is not None:
return self._shm
try:
self._shm = SharedMemory(name=Config.SHM_NAME, create=False)
except FileNotFoundError:
self._shm = SharedMemory(name=Config.SHM_NAME, create=True, size=Config.SHM_SIZE)
self._is_creator = True
# 초기화: 길이 필드를 0으로 설정
struct.pack_into('I', self._shm.buf, 0, 0)
return self._shm
def write_status(self, data):
"""메인 봇이 상태를 파일에 기록"""
"""메인 봇이 상태를 shared memory에 기록"""
try:
with open(self.ipc_file, 'w', encoding='utf-8') as f:
json.dump({
'timestamp': time.time(),
'data': data
}, f, ensure_ascii=False, indent=2)
shm = self._ensure_shm()
payload = json.dumps({
'timestamp': time.time(),
'data': data
}, ensure_ascii=False).encode('utf-8')
if len(payload) + 4 > Config.SHM_SIZE:
print(f"[IPC] Data too large: {len(payload)} bytes")
return
if self.lock:
self.lock.acquire()
try:
# [4바이트 길이][JSON 페이로드]
struct.pack_into('I', shm.buf, 0, len(payload))
shm.buf[4:4 + len(payload)] = payload
finally:
if self.lock:
self.lock.release()
except Exception as e:
print(f"⚠️ [IPC] Write failed: {e}")
print(f"[IPC] Write failed: {e}")
def read_status(self):
"""텔레그램 봇이 상태를 파일에서 읽기"""
"""텔레그램 봇이 상태를 shared memory에서 읽기"""
try:
if not os.path.exists(self.ipc_file):
print(f"⚠️ [IPC] File not found: {self.ipc_file}")
shm = self._ensure_shm()
if self.lock:
self.lock.acquire()
try:
length = struct.unpack_from('I', shm.buf, 0)[0]
if length == 0 or length > Config.SHM_SIZE - 4:
return None
raw = bytes(shm.buf[4:4 + length])
finally:
if self.lock:
self.lock.release()
ipc_data = json.loads(raw.decode('utf-8'))
age = time.time() - ipc_data.get('timestamp', 0)
if age > Config.IPC_STALENESS:
print(f"[IPC] Data too old: {age:.1f}s")
return None
with open(self.ipc_file, 'r', encoding='utf-8') as f:
ipc_data = json.load(f)
# 60초 이상 오래된 데이터는 무시 (10초 → 60초로 완화)
timestamp = ipc_data.get('timestamp', 0)
age = time.time() - timestamp
if age > 60:
print(f"⚠️ [IPC] Data too old: {age:.1f}s")
return None
print(f"✅ [IPC] Data loaded (age: {age:.1f}s)")
return ipc_data.get('data')
except Exception as e:
print(f"⚠️ [IPC] Read failed: {e}")
print(f"[IPC] Read failed: {e}")
return None
# --- 명령 채널 (텔레그램 → 메인 봇) ---
def send_command(self, command, **kwargs):
"""텔레그램 → 메인 봇 명령 전송"""
if self.command_queue:
try:
self.command_queue.put_nowait({
'command': command,
'timestamp': time.time(),
**kwargs
})
return True
except Exception as e:
print(f"[IPC] Command send failed: {e}")
return False
def poll_commands(self):
"""메인 봇이 명령 큐를 폴링"""
commands = []
if self.command_queue:
try:
while not self.command_queue.empty():
cmd = self.command_queue.get_nowait()
commands.append(cmd)
except Exception:
pass
return commands
# --- FakeBot 인스턴스 (호환성 유지) ---
def get_bot_instance_data(self):
"""봇 인스턴스 데이터 가져오기 (호환성 유지)"""
"""봇 인스턴스 데이터 가져오기 (텔레그램 봇용)"""
status = self.read_status()
if not status:
return None
# 가짜 봇 인스턴스 객체 생성 (기존 코드 호환)
class FakeBotInstance:
def __init__(self, data):
self.kis = FakeKIS(data.get('balance', {}), data.get('macro_indices', {}))
@@ -70,9 +134,7 @@ class BotIPC:
class FakeKIS:
def __init__(self, balance_data, macro_indices):
self._balance = balance_data if balance_data else {
'total_eval': 0,
'deposit': 0,
'holdings': []
'total_eval': 0, 'deposit': 0, 'holdings': []
}
self._macro_indices = macro_indices if macro_indices else {}
@@ -80,60 +142,36 @@ class BotIPC:
return self._balance
def get_current_index(self, ticker):
"""지수 조회 - IPC에서 저장된 데이터 반환"""
if ticker in self._macro_indices:
return self._macro_indices[ticker]
# 데이터 없으면 기본값
return {
'price': 2500.0,
'change': 0.0
}
return {'price': 2500.0, 'change': 0.0}
def get_daily_index_price(self, ticker, period="D"):
"""지수 일별 시세 조회 - IPC 모드에서는 더미 데이터 반환"""
# MacroAnalyzer의 MSI 계산용
# 실제 데이터는 메인 봇에서만 조회 가능
# IPC 모드에서는 기본 더미 데이터 반환 (20일치)
base_price = 2500.0
if ticker in self._macro_indices:
base_price = self._macro_indices[ticker].get('price', 2500.0)
# 20일치 더미 데이터 (약간의 변동)
import random
prices = []
for i in range(20):
variation = random.uniform(-0.02, 0.02) # ±2% 변동
prices.append(base_price * (1 + variation))
return prices
return [base_price * (1 + random.uniform(-0.02, 0.02)) for _ in range(20)]
def get_current_price(self, ticker):
"""현재가 조회 - IPC 모드에서는 사용 불가"""
return None
def get_daily_price(self, ticker, period="D"):
"""일별 시세 조회 - IPC 모드에서는 사용 불가"""
return []
def get_volume_rank(self, market="0"):
"""거래량 순위 조회 - IPC 모드에서는 사용 불가"""
return []
def buy_stock(self, ticker, qty):
"""매수 주문 - IPC 모드에서는 사용 불가"""
return {"success": False, "msg": "IPC mode: buy not available"}
return {"success": False, "msg": "IPC mode"}
def sell_stock(self, ticker, qty):
"""매도 주문 - IPC 모드에서는 사용 불가"""
return {"success": False, "msg": "IPC mode: sell not available"}
return {"success": False, "msg": "IPC mode"}
class FakeOllama:
def __init__(self, gpu_data):
self._gpu = gpu_data if gpu_data else {
'name': 'N/A',
'temp': 0,
'vram_used': 0,
'vram_total': 0,
'load': 0
'name': 'N/A', 'temp': 0, 'vram_used': 0, 'vram_total': 0, 'load': 0
}
def get_gpu_status(self):
@@ -151,13 +189,17 @@ class BotIPC:
self._watchlist = watchlist_data if watchlist_data else {}
def update_watchlist_daily(self):
return "⚠️ Watchlist update not available in IPC mode"
return "Watchlist update not available in IPC mode"
return FakeBotInstance(status)
def load_watchlist(self):
"""Watchlist 로드"""
status = self.read_status()
if status:
return status.get('watchlist', {})
return {}
def cleanup(self):
"""리소스 정리"""
if self._shm:
try:
self._shm.close()
if self._is_creator:
self._shm.unlink()
except Exception:
pass
self._shm = None

View File

@@ -1,7 +1,7 @@
import psutil
import time
from datetime import datetime
class SystemMonitor:
def __init__(self, messenger, ollama_manager):
self.messenger = messenger
@@ -9,64 +9,51 @@ class SystemMonitor:
self.last_health_check = datetime.now()
def check_health(self):
"""시스템 상태 점검 및 알림 (CPU, RAM, GPU) - 5분마다 실행"""
"""시스템 상태 점검 및 알림 (CPU, RAM, GPU) - 3분마다 실행"""
now = datetime.now()
# 5분에 한 번씩만 체크
if (now - self.last_health_check).total_seconds() < 300:
if (now - self.last_health_check).total_seconds() < 180: # 5분 → 3분
return
self.last_health_check = now
alerts = []
# 1. CPU Check (Double Verify)
# 1초 간격으로 측정
cpu_usage = psutil.cpu_percent(interval=1)
# 1. CPU Check (non-blocking 측정)
cpu_usage = psutil.cpu_percent(interval=0)
if cpu_usage > 90:
# 일시적인 스파이크일 수 있으므로 3초 후 재측정
time.sleep(3)
cpu_usage_2nd = psutil.cpu_percent(interval=1)
# 검증: 상위 프로세스 CPU 합계 확인
top_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
if proc.info['name'] == 'System Idle Process':
continue
top_processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
if cpu_usage_2nd > 90:
# 과부하 시 원인 프로세스 추적
top_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
# Windows 유휴 프로세스 제외
if proc.info['name'] == 'System Idle Process':
continue
top_processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
top_processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
total_top_cpu = sum(p['cpu_percent'] for p in top_processes[:3])
# CPU 사용률 내림차순 정렬
top_processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
# 상위 프로세스들의 CPU 합계 검증 (측정 오류 필터링)
total_top_cpu = sum(p['cpu_percent'] for p in top_processes[:3])
if total_top_cpu < 30.0:
print(f"⚠️ [Monitor] Ignored CPU Alert: usage={cpu_usage_2nd}% but top3_sum={total_top_cpu}%")
else:
top_3_str = ""
for p in top_processes[:3]:
top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)"
alerts.append(f"🔥 **[CPU Overload]** Usage: `{cpu_usage_2nd}%`\n**Top Processes:**{top_3_str}")
if total_top_cpu >= 30.0:
top_3_str = ""
for p in top_processes[:3]:
top_3_str += f"\n- {p['name']} ({p['cpu_percent']}%)"
alerts.append(f"[CPU Overload] Usage: {cpu_usage}%\nTop Processes:{top_3_str}")
# 2. RAM Check
ram = psutil.virtual_memory()
if ram.percent > 90:
alerts.append(f"💾 **[RAM High]** Usage: `{ram.percent}%` (Free: {ram.available / 1024**3:.1f}GB)")
alerts.append(f"[RAM High] Usage: {ram.percent}% (Free: {ram.available / 1024**3:.1f}GB)")
# 3. GPU Check
if self.ollama_monitor:
gpu_status = self.ollama_monitor.get_gpu_status()
temp = gpu_status.get('temp', 0)
if temp > 80:
alerts.append(f"♨️ **[GPU Overheat]** Temp: `{temp}°C`")
alerts.append(f"[GPU Overheat] Temp: {temp}C")
# 알림 전송
if alerts:
msg = "⚠️ **[System Health Alert]**\n" + "\n".join(alerts)
msg = "[System Health Alert]\n" + "\n".join(alerts)
if self.messenger:
self.messenger.send_message(msg)

View File

@@ -1,78 +1,183 @@
"""
프로세스 생명주기 관리
- 메모리 기반 PID 관리 (pids.txt 폐기)
- Watchdog 헬스체크
- 자동 재시작 (최대 3회)
"""
import os
import time
import threading
from multiprocessing.shared_memory import SharedMemory
from modules.config import Config
class ProcessTracker:
"""메모리 기반 프로세스 추적기"""
# 클래스 변수: 등록된 프로세스 정보
_processes = {} # {name: pid}
_lock = threading.Lock()
# 하위 호환: 기존 pids.txt 정리용
FILE_PATH = "pids.txt"
@staticmethod
def register(name):
"""현재 프로세스의 PID와 이름을 기록"""
"""현재 프로세스 등록 (메모리 기반)"""
pid = os.getpid()
entry = f"{pid}: {name} (Started: {time.strftime('%Y-%m-%d %H:%M:%S')})\n"
with ProcessTracker._lock:
ProcessTracker._processes[name] = pid
print(f"[Process] Registered: {name} (PID: {pid})")
try:
# 파일이 없으면 생성, 있으면 추가
# 단, main_server 시작 시 초기화하는 것이 좋음
with open(ProcessTracker.FILE_PATH, "a", encoding="utf-8") as f:
f.write(entry)
print(f"📌 Process Registered: {name} (PID: {pid})")
except Exception as e:
print(f"⚠️ Failed to register process: {e}")
@staticmethod
def unregister(name):
"""프로세스 등록 해제"""
with ProcessTracker._lock:
ProcessTracker._processes.pop(name, None)
@staticmethod
def get_all():
"""등록된 모든 프로세스 반환"""
with ProcessTracker._lock:
return dict(ProcessTracker._processes)
@staticmethod
def check_and_kill_zombies():
"""
pids.txt록된 이전 프로세스들이 구동 중이라면 강제 종료.
서버 시작 시 1회 호출하여 좀비 프로세스를 정리함.
"""
if not os.path.exists(ProcessTracker.FILE_PATH):
return
"""이전 실행의 좀비 프로세스 정리 + stale SharedMemory 정리"""
# 1. pids.txt 기반 좀비 정리 (하위 호환)
if os.path.exists(ProcessTracker.FILE_PATH):
try:
import psutil
current_pid = os.getpid()
print("🔍 Checking for zombie processes...")
try:
import psutil
current_pid = os.getpid()
with open(ProcessTracker.FILE_PATH, "r", encoding="utf-8") as f:
lines = f.readlines()
with open(ProcessTracker.FILE_PATH, "r", encoding="utf-8") as f:
lines = f.readlines()
killed_count = 0
for line in lines:
if ":" not in line or "Running Processes" in line:
continue
try:
pid_str = line.split(":")[0].strip()
pid = int(pid_str)
if pid == current_pid:
killed_count = 0
for line in lines:
if ":" not in line or "Running Processes" in line:
continue
try:
pid = int(line.split(":")[0].strip())
if pid == current_pid:
continue
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
if "python" in proc.name().lower():
print(f"[Process] Killing zombie: PID {pid} ({line.strip()})")
proc.kill()
killed_count += 1
except (ValueError, psutil.NoSuchProcess, psutil.AccessDenied):
continue
if psutil.pid_exists(pid):
proc = psutil.Process(pid)
proc_name = proc.name()
if killed_count > 0:
print(f"[Process] Cleaned up {killed_count} zombie processes.")
except Exception as e:
print(f"[Process] Zombie cleanup failed: {e}")
# Python 프로세스만 타겟
if "python" in proc_name.lower():
print(f"💀 Killing Zombie Process: {pid} ({line.strip()})")
proc.kill()
killed_count += 1
except (ValueError, psutil.NoSuchProcess, psutil.AccessDenied):
continue
# pids.txt 삭제 (더 이상 사용하지 않음)
try:
os.remove(ProcessTracker.FILE_PATH)
except Exception:
pass
if killed_count > 0:
print(f"✅ Cleaned up {killed_count} zombie processes.")
# 파일 초기화
ProcessTracker.clear()
except Exception as e:
print(f"⚠️ Failed to kill zombies: {e}")
# 2. Stale SharedMemory 정리
try:
shm = SharedMemory(name=Config.SHM_NAME, create=False)
shm.close()
shm.unlink()
print(f"[Process] Cleaned stale SharedMemory: {Config.SHM_NAME}")
except FileNotFoundError:
pass
except Exception:
pass
@staticmethod
def clear():
"""PID 파일 초기화"""
try:
with open(ProcessTracker.FILE_PATH, "w", encoding="utf-8") as f:
f.write(f"--- Running Processes (Last Update: {time.strftime('%Y-%m-%d %H:%M:%S')}) ---\n")
except:
pass
"""등록 정보 초기화"""
with ProcessTracker._lock:
ProcessTracker._processes.clear()
class ProcessWatchdog:
"""자식 프로세스 감시 및 자동 재시작"""
def __init__(self, shutdown_event=None):
self.shutdown_event = shutdown_event
self._watched = {} # {name: {process, target, args, restart_count}}
self._thread = None
self._running = False
def watch(self, name, process, target, args=()):
"""프로세스를 감시 대상에 등록"""
self._watched[name] = {
'process': process,
'target': target,
'args': args,
'restart_count': 0
}
def start(self):
"""Watchdog 스레드 시작"""
self._running = True
self._thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self._thread.start()
print(f"[Watchdog] Started (interval: {Config.WATCHDOG_INTERVAL}s)")
def stop(self):
"""Watchdog 중지"""
self._running = False
if self._thread:
self._thread.join(timeout=5)
def get_process(self, name):
"""감시 중인 프로세스 반환"""
entry = self._watched.get(name)
return entry['process'] if entry else None
def _watchdog_loop(self):
"""주기적으로 자식 프로세스 상태 확인"""
import multiprocessing
while self._running:
if self.shutdown_event and self.shutdown_event.is_set():
break
for name, entry in list(self._watched.items()):
proc = entry['process']
if proc.is_alive():
continue
# 프로세스가 죽었음
exit_code = proc.exitcode
restart_count = entry['restart_count']
if restart_count >= Config.MAX_RESTART_COUNT:
print(f"[Watchdog] {name} crashed (exit={exit_code}). "
f"Max restarts ({Config.MAX_RESTART_COUNT}) reached. Giving up.")
continue
print(f"[Watchdog] {name} crashed (exit={exit_code}). "
f"Restarting... ({restart_count + 1}/{Config.MAX_RESTART_COUNT})")
try:
new_proc = multiprocessing.Process(
target=entry['target'],
args=entry['args']
)
new_proc.start()
entry['process'] = new_proc
entry['restart_count'] = restart_count + 1
print(f"[Watchdog] {name} restarted (new PID: {new_proc.pid})")
except Exception as e:
print(f"[Watchdog] Failed to restart {name}: {e}")
# 인터벌 대기 (shutdown_event 체크하면서)
for _ in range(Config.WATCHDOG_INTERVAL):
if not self._running:
break
if self.shutdown_event and self.shutdown_event.is_set():
break
time.sleep(1)