Files
gahusb 7ea1a21487 refactor: web-ai V1 assets → signal_v1/ (graduation prep)
Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:00:11 +09:00

209 lines
7.1 KiB
Python

"""
프로세스 간 통신 (IPC) - Shared Memory 기반
텔레그램 봇과 메인 봇 간 데이터 공유 + 양방향 명령 채널
"""
import json
import struct
import time
from contextlib import contextmanager
from multiprocessing.shared_memory import SharedMemory
from queue import Empty

from modules.config import Config
class SharedIPC:
    """IPC helper: a shared-memory status block plus a command queue.

    Status channel (main bot -> telegram bot): one SharedMemory block named
    ``Config.SHM_NAME`` laid out as ``[native unsigned int length][UTF-8 JSON]``.
    Command channel (telegram bot -> main bot): the queue object passed in as
    ``command_queue``.

    Args:
        lock: Optional object exposing ``acquire()``/``release()``; when given
            it is held around every read/write of the shared block.
        command_queue: Optional queue exposing ``put_nowait()``/``get_nowait()``.
    """

    # Length prefix stored at offset 0 of the shared block (same 'I' format
    # the payload has always used, so the wire layout is unchanged).
    _HEADER = struct.Struct('I')

    def __init__(self, lock=None, command_queue=None):
        self.lock = lock
        self.command_queue = command_queue
        self._shm = None
        # True only while this instance holds a block that it created itself;
        # decides whether cleanup() may unlink the block.
        self._is_creator = False

    @contextmanager
    def _locked(self):
        """Hold ``self.lock`` (if one was supplied) for the ``with`` body."""
        if self.lock is None:
            yield
            return
        self.lock.acquire()
        try:
            yield
        finally:
            self.lock.release()

    def _ensure_shm(self):
        """Return the SharedMemory block, attaching to it or creating it.

        Returns:
            The cached ``SharedMemory`` instance.
        """
        if self._shm is not None:
            return self._shm
        try:
            self._shm = SharedMemory(name=Config.SHM_NAME, create=False)
        except FileNotFoundError:
            try:
                self._shm = SharedMemory(
                    name=Config.SHM_NAME, create=True, size=Config.SHM_SIZE
                )
            except FileExistsError:
                # Another process created the block between our attach attempt
                # and our create attempt; it owns the block, we just attach.
                self._shm = SharedMemory(name=Config.SHM_NAME, create=False)
            else:
                self._is_creator = True
                # Initialise the length field to 0 (= "no payload yet").
                self._HEADER.pack_into(self._shm.buf, 0, 0)
        return self._shm

    def write_status(self, data):
        """Publish ``data`` to shared memory (called by the main bot).

        The payload is ``{"timestamp": <time.time()>, "data": data}`` encoded
        as UTF-8 JSON. Oversized payloads and any other failure are reported
        with ``print`` and otherwise ignored; nothing is raised.

        Args:
            data: JSON-serialisable status object.
        """
        try:
            shm = self._ensure_shm()
            payload = json.dumps(
                {'timestamp': time.time(), 'data': data},
                ensure_ascii=False,
            ).encode('utf-8')
            offset = self._HEADER.size
            if len(payload) + offset > Config.SHM_SIZE:
                print(f"[IPC] Data too large: {len(payload)} bytes")
                return
            with self._locked():
                # Layout: [length header][JSON payload]
                self._HEADER.pack_into(shm.buf, 0, len(payload))
                shm.buf[offset:offset + len(payload)] = payload
        except Exception as e:
            print(f"[IPC] Write failed: {e}")

    def read_status(self):
        """Read the latest status from shared memory (called by the telegram bot).

        Returns:
            The ``data`` value last written by ``write_status``, or ``None``
            when nothing has been written, the stored length is out of range,
            the entry is older than ``Config.IPC_STALENESS`` seconds, or any
            error occurs (errors are printed, not raised).
        """
        try:
            shm = self._ensure_shm()
            offset = self._HEADER.size
            raw = None
            with self._locked():
                (length,) = self._HEADER.unpack_from(shm.buf, 0)
                if 0 < length <= Config.SHM_SIZE - offset:
                    # Copy out under the lock; parse after releasing it.
                    raw = bytes(shm.buf[offset:offset + length])
            if not raw:
                return None
            ipc_data = json.loads(raw.decode('utf-8'))
            age = time.time() - ipc_data.get('timestamp', 0)
            if age > Config.IPC_STALENESS:
                print(f"[IPC] Data too old: {age:.1f}s")
                return None
            return ipc_data.get('data')
        except Exception as e:
            print(f"[IPC] Read failed: {e}")
            return None

    # --- Command channel (telegram bot -> main bot) ---

    def send_command(self, command, **kwargs):
        """Enqueue a command for the main bot without blocking.

        Args:
            command: Command identifier stored under the ``'command'`` key.
            **kwargs: Extra fields merged into the message (they may override
                ``'timestamp'``).

        Returns:
            ``True`` if the message was queued; ``False`` if there is no queue
            or the put failed (e.g. the queue is full).
        """
        # Compare with None: a queue type that defines __len__ is falsy while
        # empty, which would otherwise block the very first command.
        if self.command_queue is None:
            return False
        message = {
            'command': command,
            'timestamp': time.time(),
            **kwargs,
        }
        try:
            self.command_queue.put_nowait(message)
        except Exception as e:
            print(f"[IPC] Command send failed: {e}")
            return False
        return True

    def poll_commands(self):
        """Drain every command currently available on the queue (main bot side).

        Returns:
            List of command dicts in arrival order; empty when there is no
            queue or nothing is pending.
        """
        commands = []
        if self.command_queue is None:
            return commands
        try:
            # get_nowait() until Empty instead of polling empty(), which is
            # not a reliable signal on multiprocessing queues.
            while True:
                commands.append(self.command_queue.get_nowait())
        except Empty:
            pass
        except Exception as e:
            # Best effort: keep whatever was drained before the failure.
            print(f"[IPC] Command poll failed: {e}")
        return commands

    # --- FakeBot instance (kept for compatibility) ---

    def get_bot_instance_data(self):
        """Build a read-only stand-in for the bot from the shared status.

        Returns:
            A ``FakeBotInstance`` populated from ``read_status()``, or ``None``
            when no (non-empty) status is available.
        """
        status = self.read_status()
        if not status:
            return None

        class FakeBotInstance:
            """Exposes the status dict through bot-like attributes."""

            def __init__(self, data):
                self.kis = FakeKIS(
                    data.get('balance', {}), data.get('macro_indices', {})
                )
                self.ollama_monitor = FakeOllama(data.get('gpu', {}))
                self.theme_manager = FakeThemeManager(data.get('themes', {}))
                self.discovered_stocks = set(data.get('discovered_stocks', []))
                self.is_macro_warning_sent = data.get('is_macro_warning', False)
                self.watchlist_manager = FakeWatchlistManager(
                    data.get('watchlist', {})
                )
                self.load_watchlist = lambda: data.get('watchlist', {})

        class FakeKIS:
            """Snapshot-backed broker stub; trading calls always fail."""

            def __init__(self, balance_data, macro_indices):
                self._balance = balance_data if balance_data else {
                    'total_eval': 0, 'deposit': 0, 'holdings': []
                }
                self._macro_indices = macro_indices if macro_indices else {}

            def get_balance(self):
                return self._balance

            def get_current_index(self, ticker):
                # Falls back to a fixed placeholder when the ticker is unknown.
                if ticker in self._macro_indices:
                    return self._macro_indices[ticker]
                return {'price': 2500.0, 'change': 0.0}

            def get_daily_index_price(self, ticker, period="D"):
                # Returns 20 synthetic values within +/-2% of the snapshot
                # price; this is generated noise, not real price history.
                base_price = 2500.0
                if ticker in self._macro_indices:
                    base_price = self._macro_indices[ticker].get('price', 2500.0)
                import random
                return [
                    base_price * (1 + random.uniform(-0.02, 0.02))
                    for _ in range(20)
                ]

            def get_current_price(self, ticker):
                return None

            def get_daily_price(self, ticker, period="D"):
                return []

            def get_volume_rank(self, market="0"):
                return []

            def buy_stock(self, ticker, qty):
                return {"success": False, "msg": "IPC mode"}

            def sell_stock(self, ticker, qty):
                return {"success": False, "msg": "IPC mode"}

        class FakeOllama:
            """Returns the GPU status captured in the snapshot."""

            def __init__(self, gpu_data):
                self._gpu = gpu_data if gpu_data else {
                    'name': 'N/A', 'temp': 0, 'vram_used': 0,
                    'vram_total': 0, 'load': 0
                }

            def get_gpu_status(self):
                return self._gpu

        class FakeThemeManager:
            """Looks up themes per ticker from the snapshot."""

            def __init__(self, themes_data):
                self._themes = themes_data if themes_data else {}

            def get_themes(self, ticker):
                return self._themes.get(ticker, [])

        class FakeWatchlistManager:
            """Holds the snapshot watchlist; updating is not supported."""

            def __init__(self, watchlist_data):
                self._watchlist = watchlist_data if watchlist_data else {}

            def update_watchlist_daily(self):
                return "Watchlist update not available in IPC mode"

        return FakeBotInstance(status)

    def cleanup(self):
        """Release the shared-memory handle; unlink the block if we created it.

        Never raises. Close and unlink are attempted independently so a failed
        close does not leave a block we own behind.
        """
        shm, self._shm = self._shm, None
        # Reset ownership so a later re-attach to a block created by another
        # process is not unlinked by the next cleanup().
        was_creator, self._is_creator = self._is_creator, False
        if shm is None:
            return
        try:
            shm.close()
        except Exception as e:
            print(f"[IPC] Cleanup close failed: {e}")
        if was_creator:
            try:
                shm.unlink()
            except FileNotFoundError:
                # Already removed elsewhere; nothing left to do.
                pass
            except Exception as e:
                print(f"[IPC] Cleanup unlink failed: {e}")