import requests import json import time import os from datetime import datetime, timedelta from modules.config import Config class KISClient: """ 한국투자증권 (Korea Investment & Securities) REST API Client """ def __init__(self, is_virtual=None): # Config에서 설정 로드 self.app_key = Config.KIS_APP_KEY self.app_secret = Config.KIS_APP_SECRET self.cano = Config.KIS_ACCOUNT[:8] self.acnt_prdt_cd = Config.KIS_ACCOUNT[-2:] # "01" 등 # 가상/실전 모드 설정 if is_virtual is None: self.is_virtual = Config.KIS_IS_VIRTUAL else: self.is_virtual = is_virtual self.base_url = Config.KIS_BASE_URL self.access_token = None self.token_expired = None self.last_req_time = 0 # 토큰 파일 경로 (영구 저장용) self.token_file = os.path.join(Config.DATA_DIR, "kis_token.json") self.load_token() # 초기화 시 토큰 로드 시도 def _safe_int(self, val): """안전한 int 변환""" try: if not val: return 0 return int(str(val).strip()) except: return 0 def _throttle(self): """API 요청 속도 제한 (초당 2회 이하로 제한)""" # 모의투자는 Rate Limit이 매우 엄격함 (초당 2~3회 권장) min_interval = 0.5 # 0.5초 대기 (초당 2회) now = time.time() elapsed = now - self.last_req_time if elapsed < min_interval: time.sleep(min_interval - elapsed) self.last_req_time = time.time() def load_token(self): """파일에서 토큰 로드""" if os.path.exists(self.token_file): try: with open(self.token_file, "r", encoding="utf-8") as f: data = json.load(f) # 만료 시간 체크 expire_str = data.get("expired_at") if expire_str: expire_dt = datetime.strptime(expire_str, "%Y-%m-%d %H:%M:%S") if datetime.now() < expire_dt: self.access_token = data.get("access_token") self.token_expired = expire_dt print(f"📂 [KIS] Saved Token Loaded (Expires: {expire_str})") except Exception as e: print(f"⚠️ Failed to load token file: {e}") def save_token(self): """토큰 파일 저장""" if not self.access_token or not self.token_expired: return try: data = { "access_token": self.access_token, "expired_at": self.token_expired.strftime("%Y-%m-%d %H:%M:%S") } with open(self.token_file, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) except Exception as e: print(f"⚠️ Failed to save token file: {e}") def _get_headers(self, tr_id=None): """공통 헤더 생성""" headers = { "Content-Type": "application/json; charset=utf-8", "authorization": f"Bearer {self.access_token}", "appkey": self.app_key, "appsecret": self.app_secret, } if tr_id: headers["tr_id"] = tr_id return headers def ensure_token(self, force=False): """접근 토큰 발급 (OAuth 2.0) 및 유효성 관리""" # 토큰이 있고, 만료 시간이 아직 안 지났으면 재사용 if not force and self.access_token and self.token_expired: if datetime.now() < self.token_expired: return # 앱키 확인 if not self.app_key or not self.app_secret: print("❌ [KIS] App Key or Secret is missing!") return url = f"{self.base_url}/oauth2/tokenP" payload = { "grant_type": "client_credentials", "appkey": self.app_key, "appsecret": self.app_secret } try: print(f"🔑 [KIS] 토큰 발급 요청: {url}") res = requests.post(url, json=payload) res.raise_for_status() data = res.json() self.access_token = data.get('access_token') # 만료 시간 설정 expires_in = int(data.get('expires_in', 86400)) self.token_expired = datetime.now() + timedelta(seconds=expires_in - 60) # 파일 저장 self.save_token() print(f"✅ [KIS] 토큰 발급 성공 (만료: {self.token_expired.strftime('%Y-%m-%d %H:%M:%S')})") except Exception as e: # 1분 제한 에러 핸들링 (EGW00133) retry = False if isinstance(e, requests.exceptions.RequestException) and e.response is not None: err_text = e.response.text print(f"📄 [KIS Error]: {err_text}") if "EGW00133" in err_text: print("⏳ [KIS] Rate Limit Hit (1 min). Waiting 65s...") time.sleep(65) # 1분 대기 retry = True if retry: # 재귀 호출 (한 번만) self.ensure_token() return print(f"❌ [KIS] 토큰 발급 실패: {e}") self.access_token = None raise e def get_hash_key(self, datas): """주문 시 필요한 Hash Key 생성 (Koreainvestment header 특화)""" url = f"{self.base_url}/uapi/hashkey" headers = { "content-type": "application/json; charset=utf-8", "appkey": self.app_key, "appsecret": self.app_secret } try: res = requests.post(url, headers=headers, json=datas) return res.json()["HASH"] except Exception as e: print(f"❌ Hash Key 생성 실패: {e}") return None def _request_api(self, method, endpoint, tr_id, params=None, data=None, use_hash=False): """API 요청 공통 핸들러 (토큰 만료 시 자동 갱신)""" self._throttle() self.ensure_token() url = f"{self.base_url}/{endpoint}" headers = self._get_headers(tr_id) if use_hash and data: hash_key = self.get_hash_key(data) if hash_key: headers["hashkey"] = hash_key try: if method == "GET": res = requests.get(url, headers=headers, params=params) else: res = requests.post(url, headers=headers, json=data) # 토큰 만료 체크 (500 에러 or msg_cd 확인) is_token_error = False try: # KIS는 토큰 만료 시 500을 주거나 200/403 등과 함께 msg_cd로 알려줌 if res.status_code == 500 or res.status_code == 401 or res.status_code == 403: err_data = res.json() # EGW00121: 유효하지 않은 토큰, EGW00123: 만료된 토큰 if err_data.get('msg_cd') in ['EGW00121', 'EGW00123']: is_token_error = True except: pass if is_token_error: print("🔄 [KIS] Token expired (caught). Refreshing...") self.ensure_token(force=True) headers = self._get_headers(tr_id) if use_hash and data and "hashkey" in headers: pass # Hash 재활용 if method == "GET": res = requests.get(url, headers=headers, params=params) else: res = requests.post(url, headers=headers, json=data) res.raise_for_status() return res.json() except Exception as e: print(f"❌ [KIS] API Request Failed: {url} | {e}") if isinstance(e, requests.exceptions.RequestException) and e.response is not None: print(f"📄 [KIS Error Body]: {e.response.text}") raise e def get_balance(self): """주식 잔고 조회""" tr_id = "VTTC8434R" if self.is_virtual else "TTTC8434R" endpoint = "uapi/domestic-stock/v1/trading/inquire-balance" # 쿼리 파라미터 params = { "CANO": self.cano, "ACNT_PRDT_CD": self.acnt_prdt_cd, "AFHR_FLPR_YN": "N", "OFL_YN": "", "INQR_DVSN": "02", "UNPR_DVSN": "01", "FUND_STTL_ICLD_YN": "N", "FNCG_AMT_AUTO_RDPT_YN": "N", "PRCS_DVSN": "00", "CTX_AREA_FK100": "", "CTX_AREA_NK100": "" } try: data = self._request_api("GET", endpoint, tr_id, params=params) # 응답 정리 if data['rt_cd'] != '0': return {"error": data['msg1']} holdings = [] for item in data['output1']: if int(item['hldg_qty']) > 0: holdings.append({ "code": item['pdno'], "name": item['prdt_name'], "qty": int(item['hldg_qty']), "yield": float(item['evlu_pfls_rt']), "purchase_price": float(item['pchs_avg_pric']), # 매입평균가 "current_price": float(item['prpr']), # 현재가 "profit_loss": int(item['evlu_pfls_amt']) # 평가손익 }) summary = data['output2'][0] return { "holdings": holdings, "total_eval": int(summary['tot_evlu_amt']), "deposit": int(summary['dnca_tot_amt']) } except Exception as e: return {"error": str(e)} def order(self, ticker, qty, buy_sell, price=0): """주문 (시장가) buy_sell: 'BUY' or 'SELL' """ self._throttle() self.ensure_token() # 모의투자/실전 TR ID 구분 # 매수: VTTC0802U / TTTC0802U # 매도: VTTC0801U / TTTC0801U if buy_sell == 'BUY': tr_id = "VTTC0802U" if self.is_virtual else "TTTC0802U" else: tr_id = "VTTC0801U" if self.is_virtual else "TTTC0801U" url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash" # 주문 파라미터 datas = { "CANO": self.cano, "ACNT_PRDT_CD": self.acnt_prdt_cd, "PDNO": ticker, "ORD_DVSN": "01", # 01: 시장가 "ORD_QTY": str(qty), "ORD_UNPR": "0" # 시장가는 0 } # 헤더 준비 headers = self._get_headers(tr_id=tr_id) # [중요] POST 요청(주문 등) 시 Hash Key 필수 # 단, 모의투자의 경우 일부 상황에서 생략 가능할 수 있으나, 정석대로 포함 hash_key = self.get_hash_key(datas) if hash_key: headers["hashkey"] = hash_key else: print("⚠️ [KIS] Hash Key 생성 실패 (주문 전송 시도)") try: print(f"📤 [KIS] 주문 전송: {buy_sell} {ticker} {qty}ea (시장가)") res = requests.post(url, headers=headers, json=datas) res.raise_for_status() data = res.json() print(f"📥 [KIS] 주문 응답 코드(rt_cd): {data['rt_cd']}") print(f"📥 [KIS] 주문 응답 메시지(msg1): {data['msg1']}") if data['rt_cd'] != '0': return {"status": False, "msg": data['msg1'], "rt_cd": data['rt_cd']} return {"status": True, "msg": "주문 전송 완료", "order_no": data['output']['ODNO'], "rt_cd": data['rt_cd']} except Exception as e: return {"status": False, "msg": str(e), "rt_cd": "EXCEPTION"} def get_current_price(self, ticker): """현재가 조회""" self._throttle() self.ensure_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price" headers = self._get_headers(tr_id="FHKST01010100") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker } try: res = requests.get(url, headers=headers, params=params) res.raise_for_status() data = res.json() if data['rt_cd'] != '0': return None return int(data['output']['stck_prpr']) # 현재가 except Exception as e: print(f"❌ 현재가 조회 실패: {e}") return None def get_daily_price(self, ticker, period="D"): """일별 시세 조회 (기술적 분석용)""" self._throttle() self.ensure_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-price" headers = self._get_headers(tr_id="FHKST01010400") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker, "FID_PERIOD_DIV_CODE": period, "FID_ORG_ADJ_PRC": "1" # 수정주가 } try: res = requests.get(url, headers=headers, params=params) res.raise_for_status() data = res.json() if data['rt_cd'] != '0': return [] # 과거 데이터부터 오도록 정렬 필요할 수 있음 (API는 최신순) # output 리스트: [ {stck_clpr: 종가, ...}, ... ] prices = [int(item['stck_clpr']) for item in data['output']] prices.reverse() # 과거 -> 현재 순으로 정렬 return prices except Exception as e: print(f"❌ 일별 시세 조회 실패: {e}") return [] def get_volume_rank(self, limit=5): """거래량 상위 종목 조회""" self._throttle() self.ensure_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/volume-rank" headers = self._get_headers(tr_id="FHPST01710000") params = { "FID_COND_MRKT_DIV_CODE": "J", # 주식, ETF, ETN 전체 "FID_COND_SCR_RSLT_GD_CD": "20171", # 전체 "FID_INPUT_ISCD": "0000", # 전체 "FID_DIV_CLS_CODE": "0", # 0: 전체 "FID_BLNG_CLS_CODE": "0", # 0: 전체 "FID_TRGT_CLS_CODE": "111111111", # 필터링 조건 (이대로 두면 됨) "FID_TRGT_EXCLS_CLS_CODE": "0000000000", # 제외 조건 "FID_INPUT_PRICE_1": "", "FID_INPUT_PRICE_2": "", "FID_VOL_CNT": "", "FID_INPUT_DATE_1": "" } try: res = requests.get(url, headers=headers, params=params) res.raise_for_status() data = res.json() if data['rt_cd'] != '0': return [] results = [] for item in data['output'][:limit]: # 코드는 shtn_iscd, 이름은 hts_kor_isnm results.append({ "code": item['mksc_shrn_iscd'], "name": item['hts_kor_isnm'], "volume": int(item['acml_vol']), "price": int(item['stck_prpr']) }) return results except Exception as e: print(f"❌ 거래량 순위 조회 실패: {e}") return [] def buy_stock(self, ticker, qty): return self.order(ticker, qty, 'BUY') def get_current_index(self, ticker): """지수 현재가 조회 (업종/지수) ticker: 0001 (KOSPI), 1001 (KOSDAQ), etc. """ endpoint = "uapi/domestic-stock/v1/quotations/inquire-index-price" params = { "FID_COND_MRKT_DIV_CODE": "U", # U: 업종/지수 "FID_INPUT_ISCD": ticker } try: data = self._request_api("GET", endpoint, "FHKUP03500100", params=params) if data['rt_cd'] != '0': return None return { "price": float(data['output']['bstp_nmix_prpr']), # 현재지수 "change": float(data['output']['bstp_nmix_prdy_ctrt']) # 등락률(%) } except Exception as e: print(f"❌ 지수 조회 실패({ticker}): {e}") return None def sell_stock(self, ticker, qty): return self.order(ticker, qty, 'SELL') def get_daily_index_price(self, ticker, period="D"): """지수 일별 시세 조회 (Market Stress Index용)""" endpoint = "uapi/domestic-stock/v1/quotations/inquire-daily-indexchartprice" # 날짜 계산 (최근 100일) end_dt = datetime.now().strftime("%Y%m%d") start_dt = (datetime.now() - timedelta(days=100)).strftime("%Y%m%d") params = { "FID_COND_MRKT_DIV_CODE": "U", # U: 업종/지수 "FID_INPUT_ISCD": ticker, "FID_INPUT_DATE_1": start_dt, # 시작일 "FID_INPUT_DATE_2": end_dt, # 종료일 "FID_PERIOD_DIV_CODE": period, "FID_ORG_ADJ_PRC": "0" # 수정주가 반영 여부 } try: data = self._request_api("GET", endpoint, "FHKUP03500200", params=params) if data['rt_cd'] != '0': return [] # output 리스트: [ {bstp_nmix_prpr: 지수, ...}, ... ] prices = [float(item['bstp_nmix_prpr']) for item in data['output']] prices.reverse() # 과거 -> 현재 return prices except Exception as e: print(f"❌ 지수 일별 시세 조회 실패({ticker}): {e}") return [] def get_investor_trend(self, ticker): """종목별 투자자(외인/기관) 매매동향 조회""" self._throttle() self.ensure_token() url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-investor" headers = self._get_headers(tr_id="FHKST01010900") params = { "FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker } try: res = requests.get(url, headers=headers, params=params) res.raise_for_status() data = res.json() if data['rt_cd'] != '0': return None # output 리스트: [ {stck_bsop_date: 날짜, frgn_ntby_qty: 외인순매수, orgn_ntby_qty: 기관순매수, ...}, ... ] trends = [] for item in data['output'][:5]: # 최근 5일치만 trends.append({ "date": item['stck_bsop_date'], "foreigner": self._safe_int(item.get('frgn_ntby_qty')), # 외인 순매수량 "institutional": self._safe_int(item.get('orgn_ntby_qty')), # 기관 순매수량 "price_change": float(item['prdy_vrss']) # 전일대비 등락금액 }) # 최근일이 0번 인덱스임 return trends except Exception as e: print(f"❌ 투자자 동향 조회 실패({ticker}): {e}") return None