"""KIS REST client — 자체 OAuth 토큰(TM_KIS_*) + quote + 일봉 + throttle.""" from __future__ import annotations import asyncio import logging import time from datetime import datetime, timedelta from zoneinfo import ZoneInfo import httpx logger = logging.getLogger(__name__) KST = ZoneInfo("Asia/Seoul") _MAX_ATTEMPTS = 3 _THROTTLE_INTERVAL = 0.5 # 초당 2회 _TOKEN_MARGIN = 600 # 만료 10분 전 재발급 class KISClient: def __init__(self, app_key, app_secret, account, is_virtual, timeout: float = 10.0): self._app_key = app_key self._app_secret = app_secret self._account = account self._base_url = ( "https://openapivts.koreainvestment.com:29443" if is_virtual else "https://openapi.koreainvestment.com:9443" ) self._client = httpx.AsyncClient(timeout=timeout) self._token: str | None = None self._token_exp: float = 0.0 self._last_throttle_at = 0.0 self._throttle_lock = asyncio.Lock() self._token_lock = asyncio.Lock() async def close(self) -> None: await self._client.aclose() async def _issue_token(self) -> str: async with self._token_lock: now = time.time() if self._token and now < self._token_exp - _TOKEN_MARGIN: return self._token r = await self._client.post( f"{self._base_url}/oauth2/tokenP", json={"grant_type": "client_credentials", "appkey": self._app_key, "appsecret": self._app_secret}, ) r.raise_for_status() data = r.json() self._token = data["access_token"] self._token_exp = now + int(data.get("expires_in", 86400)) return self._token async def _throttle(self) -> None: async with self._throttle_lock: elapsed = time.monotonic() - self._last_throttle_at if elapsed < _THROTTLE_INTERVAL: await asyncio.sleep(_THROTTLE_INTERVAL - elapsed) self._last_throttle_at = time.monotonic() async def _request(self, method: str, path: str, tr_id: str, **kwargs) -> dict: token = await self._issue_token() headers = { "authorization": f"Bearer {token}", "appkey": self._app_key, "appsecret": self._app_secret, "tr_id": tr_id, "custtype": "P", } url = f"{self._base_url}{path}" for attempt in range(_MAX_ATTEMPTS): await self._throttle() try: resp = await self._client.request(method, url, headers=headers, **kwargs) if resp.status_code == 429 and attempt < _MAX_ATTEMPTS - 1: await asyncio.sleep(2 ** attempt) continue resp.raise_for_status() return resp.json() except httpx.TimeoutException: if attempt < _MAX_ATTEMPTS - 1: await asyncio.sleep(2 ** attempt) continue raise raise RuntimeError("retry exhausted") async def get_quote(self, ticker: str) -> dict: raw = await self._request( "GET", "/uapi/domestic-stock/v1/quotations/inquire-price", tr_id="FHKST01010100", params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker}, ) o = raw.get("output", {}) return { "price": int(o["stck_prpr"]), "day_open": int(o["stck_oprc"]), "today_volume": int(o["acml_vol"]), "as_of": datetime.now(KST).isoformat(), } async def get_daily_ohlcv(self, ticker: str, days: int = 250) -> list[dict]: today = datetime.now(KST).strftime("%Y%m%d") start = (datetime.now(KST) - timedelta(days=days * 2)).strftime("%Y%m%d") raw = await self._request( "GET", "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", tr_id="FHKST03010100", params={"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": ticker, "FID_INPUT_DATE_1": start, "FID_INPUT_DATE_2": today, "FID_PERIOD_DIV_CODE": "D", "FID_ORG_ADJ_PRC": "1"}, ) bars = [] for row in raw.get("output2", []): try: d = row["stck_bsop_date"] bars.append({ "datetime": f"{d[:4]}-{d[4:6]}-{d[6:]}", "open": int(row["stck_oprc"]), "high": int(row["stck_hgpr"]), "low": int(row["stck_lwpr"]), "close": int(row["stck_clpr"]), "volume": int(row["acml_vol"]), }) except (KeyError, ValueError): continue bars.reverse() return bars[-days:]