""" 프로세스 간 통신 (IPC) - Shared Memory 기반 텔레그램 봇과 메인 봇 간 데이터 공유 + 양방향 명령 채널 """ import json import time import struct from multiprocessing.shared_memory import SharedMemory from modules.config import Config class SharedIPC: """Shared Memory + Command Queue 기반 IPC""" def __init__(self, lock=None, command_queue=None): self.lock = lock self.command_queue = command_queue self._shm = None self._is_creator = False def _ensure_shm(self): """SharedMemory 블록에 연결 (없으면 생성)""" if self._shm is not None: return self._shm try: self._shm = SharedMemory(name=Config.SHM_NAME, create=False) except FileNotFoundError: self._shm = SharedMemory(name=Config.SHM_NAME, create=True, size=Config.SHM_SIZE) self._is_creator = True # 초기화: 길이 필드를 0으로 설정 struct.pack_into('I', self._shm.buf, 0, 0) return self._shm def write_status(self, data): """메인 봇이 상태를 shared memory에 기록""" try: shm = self._ensure_shm() payload = json.dumps({ 'timestamp': time.time(), 'data': data }, ensure_ascii=False).encode('utf-8') if len(payload) + 4 > Config.SHM_SIZE: print(f"[IPC] Data too large: {len(payload)} bytes") return if self.lock: self.lock.acquire() try: # [4바이트 길이][JSON 페이로드] struct.pack_into('I', shm.buf, 0, len(payload)) shm.buf[4:4 + len(payload)] = payload finally: if self.lock: self.lock.release() except Exception as e: print(f"[IPC] Write failed: {e}") def read_status(self): """텔레그램 봇이 상태를 shared memory에서 읽기""" raw = None try: shm = self._ensure_shm() if self.lock: self.lock.acquire() try: length = struct.unpack_from('I', shm.buf, 0)[0] if length > 0 and length <= Config.SHM_SIZE - 4: raw = bytes(shm.buf[4:4 + length]) finally: if self.lock: self.lock.release() if not raw: return None ipc_data = json.loads(raw.decode('utf-8')) age = time.time() - ipc_data.get('timestamp', 0) if age > Config.IPC_STALENESS: print(f"[IPC] Data too old: {age:.1f}s") return None return ipc_data.get('data') except Exception as e: print(f"[IPC] Read failed: {e}") return None # --- 명령 채널 (텔레그램 → 메인 봇) --- def send_command(self, command, **kwargs): """텔레그램 → 메인 봇 명령 전송""" if self.command_queue: try: self.command_queue.put_nowait({ 'command': command, 'timestamp': time.time(), **kwargs }) return True except Exception as e: print(f"[IPC] Command send failed: {e}") return False def poll_commands(self): """메인 봇이 명령 큐를 폴링""" commands = [] if self.command_queue: try: while not self.command_queue.empty(): cmd = self.command_queue.get_nowait() commands.append(cmd) except Exception: pass return commands # --- FakeBot 인스턴스 (호환성 유지) --- def get_bot_instance_data(self): """봇 인스턴스 데이터 가져오기 (텔레그램 봇용)""" status = self.read_status() if not status: return None class FakeBotInstance: def __init__(self, data): self.kis = FakeKIS(data.get('balance', {}), data.get('macro_indices', {})) self.ollama_monitor = FakeOllama(data.get('gpu', {})) self.theme_manager = FakeThemeManager(data.get('themes', {})) self.discovered_stocks = set(data.get('discovered_stocks', [])) self.is_macro_warning_sent = data.get('is_macro_warning', False) self.watchlist_manager = FakeWatchlistManager(data.get('watchlist', {})) self.load_watchlist = lambda: data.get('watchlist', {}) class FakeKIS: def __init__(self, balance_data, macro_indices): self._balance = balance_data if balance_data else { 'total_eval': 0, 'deposit': 0, 'holdings': [] } self._macro_indices = macro_indices if macro_indices else {} def get_balance(self): return self._balance def get_current_index(self, ticker): if ticker in self._macro_indices: return self._macro_indices[ticker] return {'price': 2500.0, 'change': 0.0} def get_daily_index_price(self, ticker, period="D"): base_price = 2500.0 if ticker in self._macro_indices: base_price = self._macro_indices[ticker].get('price', 2500.0) import random return [base_price * (1 + random.uniform(-0.02, 0.02)) for _ in range(20)] def get_current_price(self, ticker): return None def get_daily_price(self, ticker, period="D"): return [] def get_volume_rank(self, market="0"): return [] def buy_stock(self, ticker, qty): return {"success": False, "msg": "IPC mode"} def sell_stock(self, ticker, qty): return {"success": False, "msg": "IPC mode"} class FakeOllama: def __init__(self, gpu_data): self._gpu = gpu_data if gpu_data else { 'name': 'N/A', 'temp': 0, 'vram_used': 0, 'vram_total': 0, 'load': 0 } def get_gpu_status(self): return self._gpu class FakeThemeManager: def __init__(self, themes_data): self._themes = themes_data if themes_data else {} def get_themes(self, ticker): return self._themes.get(ticker, []) class FakeWatchlistManager: def __init__(self, watchlist_data): self._watchlist = watchlist_data if watchlist_data else {} def update_watchlist_daily(self): return "Watchlist update not available in IPC mode" return FakeBotInstance(status) def cleanup(self): """리소스 정리""" if self._shm: try: self._shm.close() if self._is_creator: self._shm.unlink() except Exception: pass self._shm = None