diff --git a/main_server.py b/main_server.py
index ee9d3ec..3c52119 100644
--- a/main_server.py
+++ b/main_server.py
@@ -24,11 +24,12 @@ news_collector = None
import multiprocessing
from modules.bot import AutoTradingBot
-
+from modules.utils.process_tracker import ProcessTracker
from modules.services.telegram_bot.runner import run_telegram_bot_standalone
# 봇 실행 래퍼 함수
def run_trading_bot():
+ ProcessTracker.register("Trading Bot Main")
bot = AutoTradingBot()
bot.loop()
@@ -41,6 +42,12 @@ async def lifespan(app: FastAPI):
Config.validate()
# 2. 전역 객체 초기화 (서버용)
+ # [Process Tracker] 초기화
+ try:
+ ProcessTracker.clear()
+ ProcessTracker.register("Main Server (Uvicorn Worker)")
+ except: pass
+
ai_agent = OllamaManager()
kis_client = KISClient()
news_collector = NewsCollector()
@@ -56,6 +63,13 @@ async def lifespan(app: FastAPI):
telegram_process = multiprocessing.Process(target=run_telegram_bot_standalone)
telegram_process.start()
+ # [Process Tracker] 자식 프로세스 PID 기록 (부모 관점)
+ try:
+ with open(ProcessTracker.FILE_PATH, "a", encoding="utf-8") as f:
+ f.write(f"{bot_process.pid}: Trading Bot Process (Parent View)\n")
+ f.write(f"{telegram_process.pid}: Telegram Bot Process (Parent View)\n")
+ except: pass
+
messenger.send_message("🖥️ **[Server Started]** Windows AI Server (Refactored) Online.")
yield
@@ -65,13 +79,17 @@ async def lifespan(app: FastAPI):
if telegram_process and telegram_process.is_alive():
print(" - Stopping Telegram Bot...")
- telegram_process.terminate()
- telegram_process.join()
+ telegram_process.join(timeout=5)
+ if telegram_process.is_alive():
+ telegram_process.terminate()
+ telegram_process.join()
if bot_process and bot_process.is_alive():
print(" - Stopping Trading Bot...")
- bot_process.terminate()
- bot_process.join()
+ bot_process.join(timeout=5)
+ if bot_process.is_alive():
+ bot_process.terminate()
+ bot_process.join()
messenger.send_message("🛑 **[Server Stopped]** Server Shutting Down.")
@@ -139,4 +157,12 @@ async def analyze_portfolio():
return {"analysis": analysis}
if __name__ == "__main__":
- uvicorn.run("main_server:app", host="0.0.0.0", port=8000, reload=True)
+ # [안정성] 서버 시작 시 이전 좀비 프로세스 정리
+ try:
+ from modules.utils.process_tracker import ProcessTracker
+ ProcessTracker.check_and_kill_zombies()
+ except: pass
+
+ # Reload=True는 멀티프로세싱 자식 프로세스 관리에 취약하므로 비활성화 권장
+ print("🚀 Starting Windows AI Server...")
+ uvicorn.run("main_server:app", host="0.0.0.0", port=8000, reload=False)
diff --git a/modules/analysis/technical.py b/modules/analysis/technical.py
index f13dc85..1e1a48e 100644
--- a/modules/analysis/technical.py
+++ b/modules/analysis/technical.py
@@ -187,4 +187,26 @@ class TechnicalAnalyzer:
else:
volatility = 0.0
- return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1)
+ # [신규] 이동평균선 분석 (20일, 114일)
+ ma114 = TechnicalAnalyzer.calculate_ma(prices_history, 114)
+
+ ma_trend = "Unknown"
+ if ma20 > ma114:
+ ma_trend = "Bullish (Golden Alignment)" # 정배열
+ else:
+ ma_trend = "Bearish (Dead Alignment)" # 역배열
+
+ price_pos = "Unknown"
+ if current_price > ma20:
+ price_pos = "Above MA20"
+ else:
+ price_pos = "Below MA20"
+
+ ma_info = {
+ "ma20": ma20,
+ "ma114": ma114,
+ "trend": ma_trend,
+ "position": price_pos
+ }
+
+ return round(total_score, 4), round(rsi, 2), round(volatility, 2), round(volume_ratio, 1), ma_info
diff --git a/modules/bot.py b/modules/bot.py
index 3b0fdd8..e5aa4b9 100644
--- a/modules/bot.py
+++ b/modules/bot.py
@@ -3,6 +3,7 @@ import os
import sys
import json
from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
from datetime import datetime
# 모듈 임포트
@@ -24,12 +25,26 @@ except ImportError:
class ThemeManager:
def get_themes(self, code): return []
+# 워커 프로세스 초기화 함수
+def init_worker():
+ try:
+ from modules.utils.process_tracker import ProcessTracker
+ ProcessTracker.register("Trading Bot Worker")
+ except: pass
+
class AutoTradingBot:
def __init__(self):
# 1. 서비스 초기화
self.kis = KISClient()
self.news = NewsCollector()
- self.executor = ProcessPoolExecutor(max_workers=4)
+ # [안정성] GPU OOM 방지를 위해 워커 수 축소 (4 -> 2)
+ # [식별] 워커 프로세스 이름 등록
+ self.executor = ProcessPoolExecutor(max_workers=2, initializer=init_worker)
+ # 워커 프로세스 워밍업 (PID 등록 유도)
+ try:
+ list(self.executor.map(lambda x: x, range(2)))
+ except: pass
+
self.messenger = TelegramMessenger()
self.theme_manager = ThemeManager()
self.ollama_monitor = OllamaManager() # GPU 모니터링용
@@ -67,8 +82,6 @@ class AutoTradingBot:
from modules.analysis.deep_learning import PricePredictor
PricePredictor.verify_hardware()
- # 텔레그램 명령 서버 시작 (Server에서 관리하도록 변경)
- # self.start_telegram_command_server()
pass
def load_trade_history(self):
@@ -129,6 +142,17 @@ class AutoTradingBot:
self.messenger.send_message(report)
self.report_sent = True
+ def restart_executor(self):
+ """프로세스 풀 복구"""
+ print("🔄 [Bot] Restarting Process Executor...")
+ try:
+ self.executor.shutdown(wait=False)
+ except:
+ pass
+ # 워커 재시작
+ self.executor = ProcessPoolExecutor(max_workers=2, initializer=init_worker)
+ print("✅ [Bot] Process Executor Restarted.")
+
def run_cycle(self):
now = datetime.now()
@@ -261,75 +285,96 @@ class AutoTradingBot:
# [수정] 실시간 잔고 추적용 변수 (매수 시 차감)
tracking_deposit = int(balance.get("deposit", 0))
- for ticker, name in target_dict.items():
- prices = self.kis.get_daily_price(ticker)
- if not prices: continue
-
- future = self.executor.submit(analyze_stock_process, ticker, prices, news_data)
- analysis_tasks.append(future)
-
- # 결과 처리
- for future in analysis_tasks:
- try:
- res = future.result()
- ticker_name = target_dict.get(res['ticker'], 'Unknown')
- print(f"📊 [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})")
+ try:
+ for ticker, name in target_dict.items():
+ prices = self.kis.get_daily_price(ticker)
+ if not prices: continue
- if res['decision'] == "BUY":
- if is_crash: continue
+ # [신규] 외인 수급 분석
+ investor_trend = self.kis.get_investor_trend(ticker)
+
+ future = self.executor.submit(analyze_stock_process, ticker, prices, news_data, investor_trend)
+ analysis_tasks.append(future)
+
+ # 결과 처리
+ for future in analysis_tasks:
+ try:
+ res = future.result()
+ ticker_name = target_dict.get(res['ticker'], 'Unknown')
+ print(f"📊 [{ticker_name}] Score: {res['score']:.2f} ({res['decision']})")
- # 매수 로직 (예수금 체크 추가)
- current_qty = 0
- if res['ticker'] in current_holdings:
- current_qty = current_holdings[res['ticker']]['qty']
+ if res['decision'] == "BUY":
+ if is_crash: continue
- current_price = float(res['current_price'])
- if current_price <= 0: continue
-
- # 매수 수량 결정 (기본 1주, 추후 금액 기반으로 변경 가능)
- qty = 1
- required_amount = current_price * qty
-
- # 예수금 확인
- if tracking_deposit < required_amount:
- print(f"💰 [Skip Buy] 예수금 부족 ({ticker_name}): 필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
- continue
+ # 매수 로직 (예수금 체크 추가)
+ current_qty = 0
+ if res['ticker'] in current_holdings:
+ current_qty = current_holdings[res['ticker']]['qty']
+
+ current_price = float(res['current_price'])
+ if current_price <= 0: continue
+
+ # 매수 수량 결정 (기본 1주, 추후 금액 기반으로 변경 가능)
+ qty = 1
+ required_amount = current_price * qty
+
+ # 예수금 확인
+ if tracking_deposit < required_amount:
+ print(f"💰 [Skip Buy] 예수금 부족 ({ticker_name}): 필요 {required_amount:,.0f} > 잔고 {tracking_deposit:,.0f}")
+ continue
- print(f"🚀 Buying {ticker_name} {qty}ea")
-
- # 실제 주문
- order = self.kis.buy_stock(res['ticker'], qty)
- if order.get("status"):
- self.messenger.send_message(f"🚀 **[BUY]** {ticker_name} {qty}주\n"
- f" • 가격: {current_price:,.0f}원")
+ print(f"🚀 Buying {ticker_name} {qty}ea")
- self.daily_trade_history.append({
- "action": "BUY",
- "name": ticker_name,
- "qty": qty,
- "price": current_price
- })
- self.save_trade_history()
-
- # [중요] 가상 잔고 차감 (연속 매수 시 초과 방지)
- tracking_deposit -= required_amount
-
- elif res['decision'] == "SELL":
- print(f"📉 Selling {ticker_name} (Simulation)")
- # 매도 로직 (필요 시 추가)
- except Exception as e:
- print(f"❌ Analysis Error: {e}")
+ # 실제 주문
+ order = self.kis.buy_stock(res['ticker'], qty)
+ if order.get("status"):
+ self.messenger.send_message(f"🚀 **[BUY]** {ticker_name} {qty}주\n"
+ f" • 가격: {current_price:,.0f}원")
+
+ self.daily_trade_history.append({
+ "action": "BUY",
+ "name": ticker_name,
+ "qty": qty,
+ "price": current_price
+ })
+ self.save_trade_history()
+
+ # [중요] 가상 잔고 차감 (연속 매수 시 초과 방지)
+ tracking_deposit -= required_amount
+
+ elif res['decision'] == "SELL":
+ print(f"📉 Selling {ticker_name} (Simulation)")
+ # 매도 로직 (필요 시 추가)
+ except BrokenProcessPool:
+ raise # 상위 레벨에서 처리
+ except Exception as e:
+ print(f"❌ Analysis Worker Error: {e}")
+
+ except BrokenProcessPool:
+ print("⚠️ [Bot] Worker Process Crashed (OOM, CUDA Error?). Restarting Executor...")
+ self.restart_executor()
+ except KeyboardInterrupt:
+ raise
+ except Exception as e:
+ print(f"❌ Cycle Loop Error: {e}")
def loop(self):
print(f"🤖 Bot Module Started (PID: {os.getpid()})")
self.messenger.send_message("🤖 **[Bot Started]** 리팩토링된 봇이 시작되었습니다.")
- while True:
- try:
- self.run_cycle()
- except Exception as e:
- print(f"⚠️ Loop Error: {e}")
- self.messenger.send_message(f"⚠️ Loop Error: {e}")
- time.sleep(60)
+ try:
+ while True:
+ try:
+ self.run_cycle()
+ except Exception as e:
+ print(f"⚠️ Loop Error: {e}")
+ self.messenger.send_message(f"⚠️ Loop Error: {e}")
+ time.sleep(60)
+ except KeyboardInterrupt:
+ print("🛑 [Bot] Stopped by User.")
+ finally:
+ print("🛑 [Bot] Shutting down executor...")
+ self.executor.shutdown(wait=False)
+ print("✅ [Bot] Executor shutdown complete.")
def start_telegram_command_server(self):
"""텔레그램 봇 프로세스 실행 (독립 프로세스)"""
diff --git a/modules/services/kis.py b/modules/services/kis.py
index b69f362..a79cf25 100644
--- a/modules/services/kis.py
+++ b/modules/services/kis.py
@@ -28,6 +28,19 @@ class KISClient:
self.access_token = None
self.token_expired = None
self.last_req_time = 0
+
+ # 토큰 파일 경로 (영구 저장용)
+ self.token_file = os.path.join(Config.DATA_DIR, "kis_token.json")
+ self.load_token() # 초기화 시 토큰 로드 시도
+
+ def _safe_int(self, val):
+ """안전한 int 변환"""
+ try:
+ if not val:
+ return 0
+ return int(str(val).strip())
+ except:
+ return 0
def _throttle(self):
"""API 요청 속도 제한 (초당 2회 이하로 제한)"""
@@ -41,6 +54,38 @@ class KISClient:
self.last_req_time = time.time()
+ def load_token(self):
+ """파일에서 토큰 로드"""
+ if os.path.exists(self.token_file):
+ try:
+ with open(self.token_file, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ # 만료 시간 체크
+ expire_str = data.get("expired_at")
+ if expire_str:
+ expire_dt = datetime.strptime(expire_str, "%Y-%m-%d %H:%M:%S")
+ if datetime.now() < expire_dt:
+ self.access_token = data.get("access_token")
+ self.token_expired = expire_dt
+ print(f"📂 [KIS] Saved Token Loaded (Expires: {expire_str})")
+ except Exception as e:
+ print(f"⚠️ Failed to load token file: {e}")
+
+ def save_token(self):
+ """토큰 파일 저장"""
+ if not self.access_token or not self.token_expired:
+ return
+
+ try:
+ data = {
+ "access_token": self.access_token,
+ "expired_at": self.token_expired.strftime("%Y-%m-%d %H:%M:%S")
+ }
+ with open(self.token_file, "w", encoding="utf-8") as f:
+ json.dump(data, f, indent=2)
+ except Exception as e:
+ print(f"⚠️ Failed to save token file: {e}")
+
def _get_headers(self, tr_id=None):
"""공통 헤더 생성"""
headers = {
@@ -54,10 +99,16 @@ class KISClient:
return headers
- def ensure_token(self):
- """접근 토큰 발급 (OAuth 2.0)"""
- # 토큰 유효성 체크 로직은 생략 (실제 운영 시 만료 시간 체크 필요)
- if self.access_token:
+ def ensure_token(self, force=False):
+ """접근 토큰 발급 (OAuth 2.0) 및 유효성 관리"""
+ # 토큰이 있고, 만료 시간이 아직 안 지났으면 재사용
+ if not force and self.access_token and self.token_expired:
+ if datetime.now() < self.token_expired:
+ return
+
+ # 앱키 확인
+ if not self.app_key or not self.app_secret:
+ print("❌ [KIS] App Key or Secret is missing!")
return
url = f"{self.base_url}/oauth2/tokenP"
@@ -68,16 +119,41 @@ class KISClient:
}
try:
- print("🔑 [KIS] 토큰 발급 요청...")
+ print(f"🔑 [KIS] 토큰 발급 요청: {url}")
res = requests.post(url, json=payload)
res.raise_for_status()
data = res.json()
+
self.access_token = data.get('access_token')
- print("✅ [KIS] 토큰 발급 성공")
+
+ # 만료 시간 설정
+ expires_in = int(data.get('expires_in', 86400))
+ self.token_expired = datetime.now() + timedelta(seconds=expires_in - 60)
+
+ # 파일 저장
+ self.save_token()
+
+ print(f"✅ [KIS] 토큰 발급 성공 (만료: {self.token_expired.strftime('%Y-%m-%d %H:%M:%S')})")
+
except Exception as e:
- print(f"❌ [KIS] 토큰 발급 실패: {e}")
+ # 1분 제한 에러 핸들링 (EGW00133)
+ retry = False
if isinstance(e, requests.exceptions.RequestException) and e.response is not None:
- print(f"📄 [KIS Token Error Body]: {e.response.text}")
+ err_text = e.response.text
+ print(f"📄 [KIS Error]: {err_text}")
+ if "EGW00133" in err_text:
+ print("⏳ [KIS] Rate Limit Hit (1 min). Waiting 65s...")
+ time.sleep(65) # 1분 대기
+ retry = True
+
+ if retry:
+ # 재귀 호출 (한 번만)
+ self.ensure_token()
+ return
+
+ print(f"❌ [KIS] 토큰 발급 실패: {e}")
+ self.access_token = None
+ raise e
def get_hash_key(self, datas):
"""주문 시 필요한 Hash Key 생성 (Koreainvestment header 특화)"""
@@ -94,16 +170,62 @@ class KISClient:
print(f"❌ Hash Key 생성 실패: {e}")
return None
- def get_balance(self):
- """주식 잔고 조회"""
+ def _request_api(self, method, endpoint, tr_id, params=None, data=None, use_hash=False):
+ """API 요청 공통 핸들러 (토큰 만료 시 자동 갱신)"""
self._throttle()
self.ensure_token()
- # 국내주식 잔고조회 TR ID: VTTC8434R (모의), TTTC8434R (실전)
- tr_id = "VTTC8434R" if self.is_virtual else "TTTC8434R"
- url = f"{self.base_url}/uapi/domestic-stock/v1/trading/inquire-balance"
+ url = f"{self.base_url}/{endpoint}"
+ headers = self._get_headers(tr_id)
- headers = self._get_headers(tr_id=tr_id)
+ if use_hash and data:
+ hash_key = self.get_hash_key(data)
+ if hash_key:
+ headers["hashkey"] = hash_key
+
+ try:
+ if method == "GET":
+ res = requests.get(url, headers=headers, params=params)
+ else:
+ res = requests.post(url, headers=headers, json=data)
+
+ # 토큰 만료 체크 (500 에러 or msg_cd 확인)
+ is_token_error = False
+ try:
+ # KIS는 토큰 만료 시 500을 주거나 200/403 등과 함께 msg_cd로 알려줌
+ if res.status_code == 500 or res.status_code == 401 or res.status_code == 403:
+ err_data = res.json()
+ # EGW00121: 유효하지 않은 토큰, EGW00123: 만료된 토큰
+ if err_data.get('msg_cd') in ['EGW00121', 'EGW00123']:
+ is_token_error = True
+ except:
+ pass
+
+ if is_token_error:
+ print("🔄 [KIS] Token expired (caught). Refreshing...")
+ self.ensure_token(force=True)
+ headers = self._get_headers(tr_id)
+ if use_hash and data and "hashkey" in headers:
+ pass # Hash 재활용
+
+ if method == "GET":
+ res = requests.get(url, headers=headers, params=params)
+ else:
+ res = requests.post(url, headers=headers, json=data)
+
+ res.raise_for_status()
+ return res.json()
+
+ except Exception as e:
+ print(f"❌ [KIS] API Request Failed: {url} | {e}")
+ if isinstance(e, requests.exceptions.RequestException) and e.response is not None:
+ print(f"📄 [KIS Error Body]: {e.response.text}")
+ raise e
+
+ def get_balance(self):
+ """주식 잔고 조회"""
+ tr_id = "VTTC8434R" if self.is_virtual else "TTTC8434R"
+ endpoint = "uapi/domestic-stock/v1/trading/inquire-balance"
# 쿼리 파라미터
params = {
@@ -121,9 +243,7 @@ class KISClient:
}
try:
- res = requests.get(url, headers=headers, params=params)
- res.raise_for_status()
- data = res.json()
+ data = self._request_api("GET", endpoint, tr_id, params=params)
# 응답 정리
if data['rt_cd'] != '0':
@@ -149,10 +269,6 @@ class KISClient:
"deposit": int(summary['dnca_tot_amt'])
}
except Exception as e:
- print(f"❌ [KIS] 잔고 조회 실패: {e}")
- # If it's a requests error, verify if there's a response body
- if isinstance(e, requests.exceptions.RequestException) and e.response is not None:
- print(f"📄 [KIS Error Body]: {e.response.text}")
return {"error": str(e)}
def order(self, ticker, qty, buy_sell, price=0):
@@ -311,20 +427,14 @@ class KISClient:
"""지수 현재가 조회 (업종/지수)
ticker: 0001 (KOSPI), 1001 (KOSDAQ), etc.
"""
- self._throttle()
- self.ensure_token()
- url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-index-price"
- headers = self._get_headers(tr_id="FHKUP03500100")
-
+ endpoint = "uapi/domestic-stock/v1/quotations/inquire-index-price"
params = {
"FID_COND_MRKT_DIV_CODE": "U", # U: 업종/지수
"FID_INPUT_ISCD": ticker
}
try:
- res = requests.get(url, headers=headers, params=params)
- res.raise_for_status()
- data = res.json()
+ data = self._request_api("GET", endpoint, "FHKUP03500100", params=params)
if data['rt_cd'] != '0':
return None
return {
@@ -340,10 +450,7 @@ class KISClient:
def get_daily_index_price(self, ticker, period="D"):
"""지수 일별 시세 조회 (Market Stress Index용)"""
- self._throttle()
- self.ensure_token()
- url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-indexchartprice"
- headers = self._get_headers(tr_id="FHKUP03500200")
+ endpoint = "uapi/domestic-stock/v1/quotations/inquire-daily-indexchartprice"
# 날짜 계산 (최근 100일)
end_dt = datetime.now().strftime("%Y%m%d")
@@ -358,11 +465,8 @@ class KISClient:
"FID_ORG_ADJ_PRC": "0" # 수정주가 반영 여부
}
-
try:
- res = requests.get(url, headers=headers, params=params)
- res.raise_for_status()
- data = res.json()
+ data = self._request_api("GET", endpoint, "FHKUP03500200", params=params)
if data['rt_cd'] != '0':
return []
@@ -373,3 +477,38 @@ class KISClient:
except Exception as e:
print(f"❌ 지수 일별 시세 조회 실패({ticker}): {e}")
return []
+
+ def get_investor_trend(self, ticker):
+ """종목별 투자자(외인/기관) 매매동향 조회"""
+ self._throttle()
+ self.ensure_token()
+ url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-investor"
+ headers = self._get_headers(tr_id="FHKST01010900")
+
+ params = {
+ "FID_COND_MRKT_DIV_CODE": "J",
+ "FID_INPUT_ISCD": ticker
+ }
+
+ try:
+ res = requests.get(url, headers=headers, params=params)
+ res.raise_for_status()
+ data = res.json()
+ if data['rt_cd'] != '0':
+ return None
+
+ # output 리스트: [ {stck_bsop_date: 날짜, frgn_ntby_qty: 외인순매수, orgn_ntby_qty: 기관순매수, ...}, ... ]
+ trends = []
+ for item in data['output'][:5]: # 최근 5일치만
+ trends.append({
+ "date": item['stck_bsop_date'],
+ "foreigner": self._safe_int(item.get('frgn_ntby_qty')), # 외인 순매수량
+ "institutional": self._safe_int(item.get('orgn_ntby_qty')), # 기관 순매수량
+ "price_change": float(item['prdy_vrss']) # 전일대비 등락금액
+ })
+
+ # 최근일이 0번 인덱스임
+ return trends
+ except Exception as e:
+ print(f"❌ 투자자 동향 조회 실패({ticker}): {e}")
+ return None
diff --git a/modules/services/telegram_bot/runner.py b/modules/services/telegram_bot/runner.py
index b0e1278..1314f54 100644
--- a/modules/services/telegram_bot/runner.py
+++ b/modules/services/telegram_bot/runner.py
@@ -18,12 +18,14 @@ def run_telegram_bot_standalone():
from modules.services.telegram_bot.server import TelegramBotServer
from modules.utils.ipc import BotIPC
from modules.config import Config
-
+ from modules.utils.process_tracker import ProcessTracker
+
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
print("❌ [Telegram] TELEGRAM_BOT_TOKEN not found in .env")
sys.exit(1)
+ ProcessTracker.register("Telegram Bot Standalone")
print(f"🤖 [Telegram Bot Process] Starting... (PID: {os.getpid()})")
print(f"🔗 [Telegram Bot] Standalone Process Mode (IPC Enabled)")
diff --git a/modules/services/telegram_bot/server.py b/modules/services/telegram_bot/server.py
index be65ae6..48b8070 100644
--- a/modules/services/telegram_bot/server.py
+++ b/modules/services/telegram_bot/server.py
@@ -11,7 +11,7 @@ import logging
import subprocess
import sys
from telegram import Update
-from telegram.ext import Application, CommandHandler, ContextTypes, TypeHandler
+from telegram.ext import Application, CommandHandler, ContextTypes
from dotenv import load_dotenv
# 로깅 설정
@@ -39,7 +39,6 @@ class TelegramBotServer:
def refresh_bot_instance(self):
"""IPC에서 최신 봇 인스턴스 데이터 읽기"""
- # [수정] 모듈 경로 변경
from modules.utils.ipc import BotIPC
ipc = BotIPC()
self.bot_instance = ipc.get_bot_instance_data()
@@ -47,8 +46,9 @@ class TelegramBotServer:
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start 명령어 핸들러"""
+ print(f"📨 [Telegram] /start received from user {update.effective_user.id}")
await update.message.reply_text(
- "🤖 **AI Trading Bot Command Center**\n"
+ "🤖 AI Trading Bot Command Center\n"
"명령어 목록:\n"
"/status - 현재 봇 및 시장 상태 조회\n"
"/portfolio - 현재 보유 종목 및 평가액\n"
@@ -57,11 +57,11 @@ class TelegramBotServer:
"/macro - 거시경제 지표 및 시장 위험도\n"
"/system - PC 리소스(CPU/GPU) 상태\n"
"/ai - AI 모델 학습 상태 조회\n\n"
- "**[관리 명령어]**\n"
+ "[관리 명령어]\n"
"/restart - 봇 재시작\n"
- "/exec - 원격 명령어 실행\n"
+ "/exec 명령어 - 원격 명령어 실행\n"
"/stop - 봇 종료",
- parse_mode="Markdown"
+ parse_mode="HTML"
)
async def status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
@@ -74,13 +74,13 @@ class TelegramBotServer:
now = datetime.now()
is_market_open = (9 <= now.hour < 15) or (now.hour == 15 and now.minute < 30)
- status_msg = "✅ **System Status: ONLINE**\n"
- status_msg += f"🕒 **Market:** {'OPEN 🟢' if is_market_open else 'CLOSED 🔴'}\n"
+ status_msg = "✅ System Status: ONLINE\n"
+ status_msg += f"🕒 Market: {'OPEN 🟢' if is_market_open else 'CLOSED 🔴'}\n"
macro_warn = self.bot_instance.is_macro_warning_sent
- status_msg += f"🌍 **Macro Filter:** {'DANGER 🚨 (Trading Halted)' if macro_warn else 'SAFE 🟢'}\n"
+ status_msg += f"🌍 Macro Filter: {'DANGER 🚨 (Trading Halted)' if macro_warn else 'SAFE 🟢'}\n"
- await update.message.reply_text(status_msg, parse_mode="Markdown")
+ await update.message.reply_text(status_msg, parse_mode="HTML")
async def portfolio_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/portfolio: 잔고 조회"""
@@ -96,19 +96,19 @@ class TelegramBotServer:
await update.message.reply_text(f"❌ 잔고 조회 실패: {balance['error']}")
return
- msg = f"💰 **Total Asset:** `{int(balance['total_eval']):,} KRW`\n" \
- f"💵 **Deposit:** `{int(balance['deposit']):,} KRW`\n\n"
+ msg = f"💰 Total Asset: {int(balance['total_eval']):,} KRW\n" \
+ f"💵 Deposit: {int(balance['deposit']):,} KRW\n\n"
if balance['holdings']:
- msg += "**[Holdings]**\n"
+ msg += "[Holdings]\n"
for stock in balance['holdings']:
icon = "🔴" if stock['yield'] > 0 else "🔵"
- msg += f"{icon} **{stock['name']}** `{stock['yield']}%`\n" \
+ msg += f"{icon} {stock['name']} {stock['yield']}%\n" \
f" (수량: {stock['qty']} / 평가손익: {stock['profit_loss']:,})\n"
else:
msg += "보유 중인 종목이 없습니다."
- await update.message.reply_text(msg, parse_mode="Markdown")
+ await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ Error: {str(e)}")
@@ -116,41 +116,41 @@ class TelegramBotServer:
async def watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/watchlist: 감시 대상 종목"""
if not self.refresh_bot_instance():
+ await update.message.reply_text("⚠️ 봇 인스턴스가 연결되지 않았습니다.")
return
target_dict = self.bot_instance.load_watchlist()
discovered = list(self.bot_instance.discovered_stocks)
- msg = f"👀 **Watchlist: {len(target_dict)} items**\n"
+ msg = f"👀 Watchlist: {len(target_dict)} items\n"
for code, name in target_dict.items():
themes = self.bot_instance.theme_manager.get_themes(code)
theme_str = f" ({', '.join(themes)})" if themes else ""
msg += f"- {name}{theme_str}\n"
if discovered:
- msg += f"\n✨ **Discovered Today ({len(discovered)}):**\n"
+ msg += f"\n✨ Discovered Today ({len(discovered)}):\n"
for code in discovered:
msg += f"- {code}\n"
- await update.message.reply_text(msg)
+ await update.message.reply_text(msg, parse_mode="HTML")
async def update_watchlist_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/update_watchlist: Watchlist 즉시 업데이트"""
await update.message.reply_text("🔄 Watchlist를 업데이트하고 있습니다... (30초 소요)")
try:
- # [수정] IPC 모드에서도 직접 수행하기 위해 새로운 인스턴스 생성
from modules.services.kis import KISClient
from watchlist_manager import WatchlistManager
-
- # 독립적인 KIS 클라이언트 생성
from modules.config import Config
+
temp_kis = KISClient()
mgr = WatchlistManager(temp_kis, watchlist_file=Config.WATCHLIST_FILE)
- # 업데이트 수행 (파일 쓰기)
summary = mgr.update_watchlist_daily()
- await update.message.reply_text(summary, parse_mode="Markdown")
+ # HTML 특수문자 이스케이프
+ summary = summary.replace("&", "&").replace("<", "<").replace(">", ">")
+ await update.message.reply_text(summary)
except Exception as e:
await update.message.reply_text(f"❌ 업데이트 실패: {e}")
@@ -164,32 +164,29 @@ class TelegramBotServer:
await update.message.reply_text("⏳ 거시경제 데이터를 불러옵니다...")
try:
- # [수정] IPC 데이터를 직접 사용하여 출력 (FakeKIS의 _macro_indices 활용)
- # FakeKIS는 bot_instance.kis에 할당되어 있음
indices = getattr(self.bot_instance.kis, '_macro_indices', {})
if not indices:
await update.message.reply_text("⚠️ 데이터가 아직 수집되지 않았습니다. 잠시 후 다시 시도하세요.")
return
- # 리스크 점수 계산 (간이)
status = "SAFE"
msi = indices.get('MSI', 0)
if msi >= 50: status = "DANGER"
elif msi >= 30: status = "CAUTION"
color = "🟢" if status == "SAFE" else "🔴" if status == "DANGER" else "🟡"
- msg = f"{color} **Market Risk: {status}**\n\n"
+ msg = f"{color} Market Risk: {status}\n\n"
if 'MSI' in indices:
- msg += f"🌡️ **Stress Index:** `{indices['MSI']}`\n"
+ msg += f"🌡️ Stress Index: {indices['MSI']}\n"
for k, v in indices.items():
if k != "MSI":
icon = "🔺" if v.get('change', 0) > 0 else "🔻"
- msg += f"{icon} **{k}**: {v.get('price', 0)} ({v.get('change', 0)}%)\n"
+ msg += f"{icon} {k}: {v.get('price', 0)} ({v.get('change', 0)}%)\n"
- await update.message.reply_text(msg, parse_mode="Markdown")
+ await update.message.reply_text(msg, parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ Error: {e}")
@@ -220,97 +217,101 @@ class TelegramBotServer:
top_3 = top_processes[:3]
gpu_status = self.bot_instance.ollama_monitor.get_gpu_status()
- gpu_msg = f"N/A"
+ gpu_msg = "N/A"
if gpu_status and gpu_status.get('name') != 'N/A':
gpu_name = gpu_status.get('name', 'GPU')
gpu_msg = f"{gpu_name}\n Temp: {gpu_status.get('temp', 0)}°C / VRAM: {gpu_status.get('vram_used', 0)}GB / {gpu_status.get('vram_total', 0)}GB"
- msg = "🖥️ **PC System Status**\n" \
- f"🧠 **CPU:** `{cpu}%`\n" \
- f"💾 **RAM:** `{ram}%`\n" \
- f"🎮 **GPU:** {gpu_msg}\n\n"
+ msg = "🖥️ PC System Status\n" \
+ f"🧠 CPU: {cpu}%\n" \
+ f"💾 RAM: {ram}%\n" \
+ f"🎮 GPU: {gpu_msg}\n\n"
if top_3:
- msg += "⚙️ **Top CPU Processes:**\n"
+ msg += "⚙️ Top CPU Processes:\n"
for i, proc in enumerate(top_3, 1):
proc_name = proc.get('name', 'Unknown')
proc_cpu = proc.get('cpu_percent', 0)
- msg += f" {i}. `{proc_name}` - {proc_cpu:.1f}%\n"
+ msg += f" {i}. {proc_name} - {proc_cpu:.1f}%\n"
- await update.message.reply_text(msg, parse_mode="Markdown")
+ await update.message.reply_text(msg, parse_mode="HTML")
async def ai_status_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/ai: AI 모델 학습 상태 조회"""
+ if not self.refresh_bot_instance():
+ await update.message.reply_text("⚠️ 메인 봇이 실행 중이 아닙니다.")
+ return
+
gpu = self.bot_instance.ollama_monitor.get_gpu_status()
- msg = "🧠 **AI Model Status**\n"
- msg += f"• **LLM Engine:** Ollama (Llama 3.1)\n"
+ msg = "🧠 AI Model Status\n"
+ msg += "• LLM Engine: Ollama (Llama 3.1)\n"
gpu_name = gpu.get('name', 'NVIDIA RTX 5070 Ti')
- msg += f"• **Device:** {gpu_name}\n"
+ msg += f"• Device: {gpu_name}\n"
if gpu:
- msg += f"• **GPU Load:** `{gpu.get('load', 0)}%`\n"
- msg += f"• **VRAM Usage:** `{gpu.get('vram_used', 0)}GB` / {gpu.get('vram_total', 0)}GB"
+ msg += f"• GPU Load: {gpu.get('load', 0)}%\n"
+ msg += f"• VRAM Usage: {gpu.get('vram_used', 0)}GB / {gpu.get('vram_total', 0)}GB"
- await update.message.reply_text(msg, parse_mode="Markdown")
+ await update.message.reply_text(msg, parse_mode="HTML")
async def restart_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/restart: 텔레그램 봇 모듈만 재시작"""
- await update.message.reply_text("🔄 **텔레그램 인터페이스를 재시작합니다...**")
+ await update.message.reply_text("🔄 텔레그램 인터페이스를 재시작합니다...", parse_mode="HTML")
- # 재시작 플래그 설정 (runner.py에서 감지하여 재시작)
self.should_restart = True
self.application.stop_running()
async def stop_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stop: 봇 종료"""
- await update.message.reply_text("🛑 **텔레그램 봇을 종료합니다.**")
+ await update.message.reply_text("🛑 텔레그램 봇을 종료합니다.", parse_mode="HTML")
- # 종료 플래그 설정 (runner.py에서 루프 탈출)
self.should_restart = False
self.application.stop_running()
async def exec_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
- """/exec: 원격 명령어 실행"""
- # ultimate_handler에서는 context.args가 비어있으므로 직접 파싱
+ """/exec: 원격 명령어 실행 (Non-blocking)"""
text = update.message.text.strip()
- parts = text.split(maxsplit=1) # "/exec" 와 나머지 명령어로 분리
+ parts = text.split(maxsplit=1)
if len(parts) < 2:
- await update.message.reply_text("❌ 사용법: /exec ")
+ await update.message.reply_text("❌ 사용법: /exec 명령어")
return
- command = parts[1] # "/exec" 이후의 모든 텍스트
- await update.message.reply_text(f"⚙️ 실행 중: `{command}`", parse_mode="Markdown")
+ command = parts[1]
+ await update.message.reply_text(f"⚙️ 실행 중: {command}", parse_mode="HTML")
try:
# 보안: 위험한 명령어 차단
- dangerous_keywords = ['rm', 'del', 'format', 'shutdown', 'reboot', 'ipconfig']
+ dangerous_keywords = ['rm', 'del', 'format', 'shutdown', 'reboot']
if any(keyword in command.lower() for keyword in dangerous_keywords):
await update.message.reply_text("⛔ 위험한 명령어는 실행할 수 없습니다.")
return
- # Windows에서는 PowerShell을 명시적으로 사용
import platform
- if platform.system() == 'Windows':
- exec_command = ['powershell', '-Command', command]
+ is_windows = platform.system() == 'Windows'
+
+ if is_windows:
+ exec_cmd = ['powershell', '-Command', command]
else:
- exec_command = command
+ exec_cmd = command
- # 명령어 실행 (타임아웃 30초)
- result = subprocess.run(
- exec_command,
- shell=False if platform.system() == 'Windows' else True,
- capture_output=True,
- text=True,
- encoding='utf-8',
- errors='replace', # 인코딩 오류 무시
- timeout=30,
- cwd=os.getcwd()
- )
+ def run_subprocess():
+ return subprocess.run(
+ exec_cmd,
+ shell=not is_windows,
+ capture_output=True,
+ text=True,
+ encoding='utf-8',
+ errors='replace',
+ timeout=30,
+ cwd=os.getcwd()
+ )
+
+ loop = asyncio.get_running_loop()
+ result = await loop.run_in_executor(None, run_subprocess)
- # stdout와 stderr 모두 확인
output = result.stdout.strip() if result.stdout else ""
error_output = result.stderr.strip() if result.stderr else ""
@@ -323,97 +324,53 @@ class TelegramBotServer:
else:
combined = "명령어 실행 완료 (출력 없음)"
- # 출력이 너무 길면 잘라내기
if len(combined) > 3000:
- combined = combined[:3000] + "\n... (출력이 너무 깁니다)"
+ combined = combined[:3000] + "\n... (Truncated)"
- await update.message.reply_text(f"```\n{combined}\n```", parse_mode="Markdown")
+ # HTML 특수문자 이스케이프
+ combined = combined.replace("&", "&").replace("<", "<").replace(">", ">")
+ await update.message.reply_text(f"{combined}", parse_mode="HTML")
- except subprocess.TimeoutExpired:
- await update.message.reply_text("⏱️ 명령어 실행 시간 초과 (30초)")
+ except asyncio.TimeoutError:
+ await update.message.reply_text("⏱️ 명령어 실행 시간 초과 (30초)")
except Exception as e:
- print(f"❌ [Telegram /exec] Error: {e}")
- import traceback
- traceback.print_exc()
- await update.message.reply_text(f"❌ 실행 오류: {str(e)}")
+ await update.message.reply_text(f"❌ 실행 오류: {e}")
def run(self):
- """봇 실행 (비동기 polling)"""
- async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
- """에러 핸들러"""
- import traceback
+ """봇 실행 (Handler 등록 및 Polling)"""
+ handlers = [
+ ("start", self.start_command),
+ ("status", self.status_command),
+ ("portfolio", self.portfolio_command),
+ ("watchlist", self.watchlist_command),
+ ("update_watchlist", self.update_watchlist_command),
+ ("macro", self.macro_command),
+ ("system", self.system_command),
+ ("ai", self.ai_status_command),
+ ("restart", self.restart_command),
+ ("stop", self.stop_command),
+ ("exec", self.exec_command)
+ ]
+
+ for cmd, func in handlers:
+ self.application.add_handler(CommandHandler(cmd, func))
- # Conflict 에러는 무시 (다른 봇 인스턴스 실행 중)
+ async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
if "Conflict" in str(context.error):
- print(f"⚠️ [Telegram] 다른 봇 인스턴스가 실행 중입니다. 이 인스턴스를 종료합니다.")
+ print(f"⚠️ [Telegram] Conflict detected. Stopping...")
if self.application.running:
await self.application.stop()
return
-
- tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
- tb_string = ''.join(tb_list)
- print(f"❌ [Telegram Error] {tb_string}")
-
- if isinstance(update, Update) and update.effective_message:
- try:
- await update.effective_message.reply_text(f"⚠️ 오류 발생: {context.error}")
- except:
- pass
+ print(f"❌ [Telegram Error] {context.error}")
- async def ultimate_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
- if not update.message or not update.message.text:
- return
-
- text = update.message.text.strip()
- print(f"📨 [Telegram] Command Received: {text}")
-
- try:
- if text.startswith("/start"):
- await self.start_command(update, context)
- elif text.startswith("/status"):
- await self.status_command(update, context)
- elif text.startswith("/portfolio"):
- await self.portfolio_command(update, context)
- elif text.startswith("/watchlist"):
- await self.watchlist_command(update, context)
- elif text.startswith("/update_watchlist"):
- await self.update_watchlist_command(update, context)
- elif text.startswith("/macro"):
- await self.macro_command(update, context)
- elif text.startswith("/system"):
- await self.system_command(update, context)
- elif text.startswith("/ai"):
- await self.ai_status_command(update, context)
- elif text.startswith("/restart"):
- await self.restart_command(update, context)
- elif text.startswith("/stop"):
- await self.stop_command(update, context)
- elif text.startswith("/exec"):
- await self.exec_command(update, context)
- except Exception as e:
- print(f"❌ Handle Error: {e}")
- await update.message.reply_text(f"⚠️ Error: {e}")
-
- # 에러 핸들러 등록
self.application.add_error_handler(error_handler)
- self.application.add_handler(TypeHandler(Update, ultimate_handler))
- # [최적화] Polling 설정 개선
- print("🤖 [Telegram] Command Server Started (Optimized Polling Mode).")
+ print("🤖 [Telegram] Command Server Started (Standard Polling Mode).")
try:
self.application.run_polling(
allowed_updates=Update.ALL_TYPES,
- stop_signals=None,
- poll_interval=1.0, # 1초마다 폴링 (기본값 0.0)
- timeout=10, # 타임아웃 10초
- drop_pending_updates=True # 대기 중인 업데이트 무시
+ drop_pending_updates=True
)
except Exception as e:
- if "Conflict" in str(e):
- print(f"⚠️ [Telegram] 다른 봇 인스턴스가 실행 중입니다.")
- print(f"⚠️ [Telegram] 기존 봇을 종료하고 다시 시도하세요.")
- else:
- print(f"❌ [Telegram] 봇 실행 오류: {e}")
- import traceback
- traceback.print_exc()
+ print(f"❌ [Telegram] Polling Error: {e}")
diff --git a/modules/strategy/process.py b/modules/strategy/process.py
index b77642d..ef39f68 100644
--- a/modules/strategy/process.py
+++ b/modules/strategy/process.py
@@ -16,130 +16,179 @@ def get_predictor():
_lstm_predictor = PricePredictor()
return _lstm_predictor
-def analyze_stock_process(ticker, prices, news_items):
+def analyze_stock_process(ticker, prices, news_items, investor_trend=None):
"""
[CPU Intensive] 기술적 분석 및 AI 판단을 수행하는 함수
(ProcessPoolExecutor에서 실행됨)
"""
- print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...")
-
- # 1. 기술적 지표 계산
- current_price = prices[-1] if prices else 0
- # [수정] 변동성, 거래량 비율 반환 (거래량 데이터가 없으면 None 전달)
- tech_score, rsi, volatility, vol_ratio = TechnicalAnalyzer.get_technical_score(current_price, prices, volume_history=None)
-
- # 2. LSTM 주가 예측
- # [최적화] 전역 캐시된 Predictor 사용
- lstm_predictor = get_predictor()
- if lstm_predictor:
- lstm_predictor.training_status['current_ticker'] = ticker
- pred_result = lstm_predictor.train_and_predict(prices)
-
- lstm_score = 0.5 # 중립
- ai_confidence = 0.5
- ai_loss = 1.0
-
- if pred_result:
- ai_confidence = pred_result.get('confidence', 0.5)
- ai_loss = pred_result.get('loss', 1.0)
-
- # 상승/하락 예측에 따라 점수 조정 (신뢰도 반영)
- # 최대 5% 변동폭까지 반영
- change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0
-
- if pred_result['trend'] == 'UP':
- # 상승 예측 시: 기본 0.5 + (강도 * 신뢰도 * 0.4) -> 최대 0.9
- lstm_score = 0.5 + (change_magnitude * ai_confidence * 0.4)
- else:
- # 하락 예측 시: 기본 0.5 - (강도 * 신뢰도 * 0.4) -> 최소 0.1
- lstm_score = 0.5 - (change_magnitude * ai_confidence * 0.4)
-
- lstm_score = max(0.0, min(1.0, lstm_score))
-
- # 3. AI 뉴스 분석
- ollama = OllamaManager()
- prompt = f"""
- [System Instruction]
- 1. Role: You are a Expert Quant Trader with 20 years of experience.
- 2. Market Data:
- - Technical Score: {tech_score:.2f} (RSI: {rsi:.2f})
- - AI Prediction: {pred_result['predicted']:.0f} KRW ({pred_result['change_rate']}%)
- - AI Confidence: {ai_confidence:.2f} (Loss: {ai_loss:.4f})
- 3. Strategy:
- - If AI Confidence > 0.8 and Trend is UP -> Strong BUY signal.
- - If Tech Score > 0.7 -> BUY signal.
- - If Trend is DOWN -> SELL/AVOID.
- 4. Task: Analyze the news and combine with market data to decide sentiment.
-
- News Data: {json.dumps(news_items, ensure_ascii=False)}
-
- Response (JSON):
- {{
- "sentiment_score": 0.8,
- "reason": "AI confidence is high and news supports the uptrend."
- }}
- """
- ai_resp = ollama.request_inference(prompt)
- sentiment_score = 0.5
try:
- data = json.loads(ai_resp)
- sentiment_score = float(data.get("sentiment_score", 0.5))
- except:
- pass
+ print(f"⚙️ [Bot Process] Analyzing {ticker} ({len(prices)} candles)...")
- # 4. 통합 점수 (동적 가중치)
- # AI 신뢰도가 높으면 AI 비중을 대폭 상향
- if ai_confidence >= 0.85:
- w_tech, w_news, w_ai = 0.2, 0.2, 0.6
- print(f" 🤖 [High Confidence] AI Weight Boosted to 60%")
- else:
- w_tech, w_news, w_ai = 0.4, 0.3, 0.3
+ # 1. 기술적 지표 계산
+ current_price = prices[-1] if prices else 0
+ # [수정] 변동성, 거래량 비율, MA 정보 반환
+ tech_score, rsi, volatility, vol_ratio, ma_info = TechnicalAnalyzer.get_technical_score(current_price, prices, volume_history=None)
- total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
-
- decision = "HOLD"
-
- # [신규] 강한 단일 신호 매수 로직 (기준 강화)
- strong_signal = False
- strong_reason = ""
-
- if tech_score >= 0.80:
- strong_signal = True
- strong_reason = "Super Strong Technical"
- elif lstm_score >= 0.80 and ai_confidence >= 0.8:
- strong_signal = True
- strong_reason = f"High Confidence AI Buy (Conf: {ai_confidence})"
- elif sentiment_score >= 0.85:
- strong_signal = True
- strong_reason = "Strong News Sentiment"
-
- if strong_signal:
- decision = "BUY"
- print(f" 🎯 [{strong_reason}] Overriding to BUY!")
- elif total_score >= 0.60: # (0.5 -> 0.6 상향 조정으로 보수적 접근)
- decision = "BUY"
- elif total_score <= 0.30:
- decision = "SELL"
-
- print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} LSTM={lstm_score:.2f}(Conf:{ai_confidence:.2f}) → Total={total_score:.2f} [{decision}]")
+ # 2. LSTM 주가 예측
+ # [최적화] 전역 캐시된 Predictor 사용
+ lstm_predictor = get_predictor()
+ if lstm_predictor:
+ lstm_predictor.training_status['current_ticker'] = ticker
+ pred_result = lstm_predictor.train_and_predict(prices)
- # [신규] 변동성(Volatility) 계산
- if len(prices) > 1:
- prices_np = np.array(prices)
- changes = np.diff(prices_np) / prices_np[:-1]
- volatility = np.std(changes) * 100 # 퍼센트 단위
- else:
- volatility = 0.0
+ lstm_score = 0.5 # 중립
+ ai_confidence = 0.5
+ ai_loss = 1.0
- return {
- "ticker": ticker,
- "score": total_score,
- "tech": tech_score,
- "sentiment": sentiment_score,
- "lstm_score": lstm_score,
- "volatility": volatility,
- "volume_ratio": vol_ratio,
- "prediction": pred_result,
- "decision": decision,
- "current_price": current_price
- }
+ if pred_result:
+ ai_confidence = pred_result.get('confidence', 0.5)
+ ai_loss = pred_result.get('loss', 1.0)
+
+ # 상승/하락 예측에 따라 점수 조정 (신뢰도 반영)
+ # 최대 5% 변동폭까지 반영
+ change_magnitude = min(abs(pred_result['change_rate']), 5.0) / 5.0
+
+ if pred_result['trend'] == 'UP':
+ # 상승 예측 시: 기본 0.5 + (강도 * 신뢰도 * 0.4) -> 최대 0.9
+ lstm_score = 0.5 + (change_magnitude * ai_confidence * 0.4)
+ else:
+ # 하락 예측 시: 기본 0.5 - (강도 * 신뢰도 * 0.4) -> 최소 0.1
+ lstm_score = 0.5 - (change_magnitude * ai_confidence * 0.4)
+
+ lstm_score = max(0.0, min(1.0, lstm_score))
+
+ # [신규] 수급 분석 (외인/기관)
+ investor_score = 0.0
+ frgn_net_buy = 0
+ orgn_net_buy = 0
+ consecutive_frgn_buy = 0
+
+ if investor_trend:
+ # 최근 5일 합산
+ for day in investor_trend:
+ frgn_net_buy += day['foreigner']
+ orgn_net_buy += day['institutional']
+ if day['foreigner'] > 0:
+ consecutive_frgn_buy += 1
+
+ # 외인 수급 점수 (단순화)
+ if frgn_net_buy > 0:
+ investor_score += 0.05
+ if consecutive_frgn_buy >= 3:
+ investor_score += 0.05
+
+ if investor_score > 0:
+ print(f" 💰 [Investor] Foreign Buy Detected (Net: {frgn_net_buy})")
+
+ # 3. AI 뉴스 분석
+ # pred_result가 None일 경우 기본값 사용
+ if pred_result:
+ pred_price = pred_result.get('predicted', 0)
+ pred_change = pred_result.get('change_rate', 0)
+ else:
+ pred_price = current_price
+ pred_change = 0.0
+
+ ollama = OllamaManager()
+ prompt = f"""
+ [System Instruction]
+ 1. Role: You are a Expert Quant Trader with 20 years of experience.
+ 2. Market Data:
+ - Technical Score: {tech_score:.2f} (RSI: {rsi:.2f})
+ - Moving Average: {ma_info['trend']} (Price is {ma_info['position']})
+ - AI Prediction: {pred_price:.0f} KRW ({pred_change}%)
+ - AI Confidence: {ai_confidence:.2f} (Loss: {ai_loss:.4f})
+ - Investor Trend (5 Days): Foreigner Net Buy {frgn_net_buy}, Institutional Net Buy {orgn_net_buy}
+ 3. Strategy:
+ - If Foreigners are buying AND Trend is UP -> Strong BUY.
+ - If AI Confidence > 0.8 and Trend is UP -> Strong BUY.
+ - If MA is Bullish (Golden Alignment) -> Positive Signal.
+ - If Price is above MA20 -> Support Uptrend.
+ - If Trend is DOWN -> SELL/AVOID.
+ 4. Task: Analyze the news and combine with market data to decide sentiment.
+
+ News Data: {json.dumps(news_items, ensure_ascii=False)}
+
+ Response (JSON):
+ {{
+ "sentiment_score": 0.8,
+ "reason": "Foreigners buying and Golden Cross detected."
+ }}
+ """
+ ai_resp = ollama.request_inference(prompt)
+ sentiment_score = 0.5
+ try:
+ data = json.loads(ai_resp)
+ sentiment_score = float(data.get("sentiment_score", 0.5))
+ except:
+ pass
+
+ # 4. 통합 점수 (동적 가중치)
+ # AI 신뢰도가 높으면 AI 비중을 대폭 상향
+ if ai_confidence >= 0.85:
+ w_tech, w_news, w_ai = 0.2, 0.2, 0.6
+ print(f" 🤖 [High Confidence] AI Weight Boosted to 60%")
+ else:
+ w_tech, w_news, w_ai = 0.4, 0.3, 0.3
+
+ total_score = (w_tech * tech_score) + (w_news * sentiment_score) + (w_ai * lstm_score)
+
+ # [수신] 수급 가산점 추가 (최대 +0.1)
+ total_score += investor_score
+ total_score = min(total_score, 1.0)
+
+ decision = "HOLD"
+
+ # [신규] 강한 단일 신호 매수 로직 (기준 강화)
+ strong_signal = False
+ strong_reason = ""
+
+ if tech_score >= 0.80:
+ strong_signal = True
+ strong_reason = "Super Strong Technical"
+ elif lstm_score >= 0.80 and ai_confidence >= 0.8:
+ strong_signal = True
+ strong_reason = f"High Confidence AI Buy (Conf: {ai_confidence})"
+ elif sentiment_score >= 0.85:
+ strong_signal = True
+ strong_reason = "Strong News Sentiment"
+ elif investor_score >= 0.1 and total_score >= 0.6: # 외인 수급이 좋고 전체 점수 양호
+ strong_signal = True
+ strong_reason = "Strong Foreigner Buying"
+
+ if strong_signal:
+ decision = "BUY"
+ print(f" 🎯 [{strong_reason}] Overriding to BUY!")
+ elif total_score >= 0.60: # (0.5 -> 0.6 상향 조정으로 보수적 접근)
+ decision = "BUY"
+ elif total_score <= 0.30:
+ decision = "SELL"
+
+ print(f" └─ Scores: Tech={tech_score:.2f} News={sentiment_score:.2f} LSTM={lstm_score:.2f} → Total={total_score:.2f} [{decision}]")
+
+ return {
+ "ticker": ticker,
+ "score": total_score,
+ "tech": tech_score,
+ "sentiment": sentiment_score,
+ "lstm_score": lstm_score,
+ "volatility": volatility,
+ "volume_ratio": vol_ratio,
+ "prediction": pred_result,
+ "decision": decision,
+ "current_price": current_price,
+ "ma_info": ma_info
+ }
+
+ except Exception as e:
+ print(f"❌ [Worker Error] Failed to analyze {ticker}: {e}")
+ import traceback
+ traceback.print_exc()
+ # 기본 실패 응답 반환 (프로세스 크래시 방지)
+ return {
+ "ticker": ticker,
+ "score": 0.0,
+ "decision": "HOLD",
+ "current_price": 0,
+ "error": str(e)
+ }
diff --git a/modules/utils/process_tracker.py b/modules/utils/process_tracker.py
new file mode 100644
index 0000000..55f7ce2
--- /dev/null
+++ b/modules/utils/process_tracker.py
@@ -0,0 +1,78 @@
+import os
+import time
+
+class ProcessTracker:
+ FILE_PATH = "pids.txt"
+
+ @staticmethod
+ def register(name):
+ """현재 프로세스의 PID와 이름을 기록"""
+ pid = os.getpid()
+ entry = f"{pid}: {name} (Started: {time.strftime('%Y-%m-%d %H:%M:%S')})\n"
+
+ try:
+ # 파일이 없으면 생성, 있으면 추가
+ # 단, main_server 시작 시 초기화하는 것이 좋음
+ with open(ProcessTracker.FILE_PATH, "a", encoding="utf-8") as f:
+ f.write(entry)
+ print(f"📌 Process Registered: {name} (PID: {pid})")
+ except Exception as e:
+ print(f"⚠️ Failed to register process: {e}")
+
+ @staticmethod
+ def check_and_kill_zombies():
+ """
+ pids.txt에 기록된 이전 프로세스들이 구동 중이라면 강제 종료.
+ 서버 시작 시 1회 호출하여 좀비 프로세스를 정리함.
+ """
+ if not os.path.exists(ProcessTracker.FILE_PATH):
+ return
+
+ print("🔍 Checking for zombie processes...")
+ try:
+ import psutil
+ current_pid = os.getpid()
+
+ with open(ProcessTracker.FILE_PATH, "r", encoding="utf-8") as f:
+ lines = f.readlines()
+
+ killed_count = 0
+ for line in lines:
+ if ":" not in line or "Running Processes" in line:
+ continue
+
+ try:
+ pid_str = line.split(":")[0].strip()
+ pid = int(pid_str)
+
+ if pid == current_pid:
+ continue
+
+ if psutil.pid_exists(pid):
+ proc = psutil.Process(pid)
+ proc_name = proc.name()
+
+ # Python 프로세스만 타겟
+ if "python" in proc_name.lower():
+ print(f"💀 Killing Zombie Process: {pid} ({line.strip()})")
+ proc.kill()
+ killed_count += 1
+ except (ValueError, psutil.NoSuchProcess, psutil.AccessDenied):
+ continue
+
+ if killed_count > 0:
+ print(f"✅ Cleaned up {killed_count} zombie processes.")
+ # 파일 초기화
+ ProcessTracker.clear()
+
+ except Exception as e:
+ print(f"⚠️ Failed to kill zombies: {e}")
+
+ @staticmethod
+ def clear():
+ """PID 파일 초기화"""
+ try:
+ with open(ProcessTracker.FILE_PATH, "w", encoding="utf-8") as f:
+ f.write(f"--- Running Processes (Last Update: {time.strftime('%Y-%m-%d %H:%M:%S')}) ---\n")
+ except:
+ pass