feat(signal_v2): stock_client + 6 integration tests
httpx async client with custom retry loop (max 3, exponential 1s/2s/4s), memory dict cache (portfolio 60s / news-sentiment 300s / screener 60s), X-WebAI-Key auth header injection. Stale fallback returns last successful response with logger.warning on persistent failures. 6 integration tests pass with respx httpx mock. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
128
signal_v2/stock_client.py
Normal file
128
signal_v2/stock_client.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Stock API HTTP client — async httpx + retry + memory cache."""
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Cache TTL by endpoint (seconds)
|
||||
_TTL = {
|
||||
"portfolio": 60.0,
|
||||
"news-sentiment": 300.0,
|
||||
"screener-preview": 60.0,
|
||||
}
|
||||
|
||||
# Retry policy
|
||||
_MAX_ATTEMPTS = 3
|
||||
_RETRY_STATUSES = {429, 500, 502, 503, 504}
|
||||
|
||||
|
||||
class StockClient:
|
||||
"""stock API wrapper. Async httpx + self-retry + memory cache."""
|
||||
|
||||
def __init__(self, base_url: str, api_key: str, timeout: float = 10.0):
|
||||
self._base_url = base_url.rstrip("/")
|
||||
self._api_key = api_key
|
||||
self._client = httpx.AsyncClient(timeout=timeout)
|
||||
# cache: key → (data, timestamp_monotonic)
|
||||
self._cache: dict[str, tuple[Any, float]] = {}
|
||||
|
||||
async def close(self) -> None:
|
||||
await self._client.aclose()
|
||||
|
||||
async def get_portfolio(self) -> dict:
|
||||
return await self._cached_request(
|
||||
"portfolio", "GET", "/api/webai/portfolio"
|
||||
)
|
||||
|
||||
async def get_news_sentiment(self, date: str | None = None) -> dict:
|
||||
path = "/api/webai/news-sentiment"
|
||||
if date is not None:
|
||||
path += f"?date={date}"
|
||||
cache_key = f"news-sentiment:{date or 'latest'}"
|
||||
return await self._cached_request(
|
||||
cache_key, "GET", path, _ttl_key="news-sentiment"
|
||||
)
|
||||
|
||||
async def run_screener_preview(
|
||||
self, weights: dict | None = None, top_n: int = 20
|
||||
) -> dict:
|
||||
body = {"mode": "preview", "top_n": top_n}
|
||||
if weights is not None:
|
||||
body["weights"] = weights
|
||||
return await self._cached_request(
|
||||
"screener-preview",
|
||||
"POST",
|
||||
"/api/stock/screener/run",
|
||||
json=body,
|
||||
_ttl_key="screener-preview",
|
||||
)
|
||||
|
||||
async def _cached_request(
|
||||
self,
|
||||
cache_key: str,
|
||||
method: str,
|
||||
path: str,
|
||||
*,
|
||||
_ttl_key: str | None = None,
|
||||
**kwargs,
|
||||
) -> dict:
|
||||
ttl_key = _ttl_key or cache_key
|
||||
ttl = _TTL.get(ttl_key, 60.0)
|
||||
# Fresh cache hit?
|
||||
if cache_key in self._cache:
|
||||
data, ts = self._cache[cache_key]
|
||||
if time.monotonic() - ts < ttl:
|
||||
return data
|
||||
|
||||
# Fetch (with retry)
|
||||
try:
|
||||
data = await self._request_with_retry(method, path, **kwargs)
|
||||
self._cache[cache_key] = (data, time.monotonic())
|
||||
return data
|
||||
except Exception:
|
||||
# Stale fallback: serve old cached value if exists
|
||||
if cache_key in self._cache:
|
||||
stale_data, stale_ts = self._cache[cache_key]
|
||||
age = time.monotonic() - stale_ts
|
||||
logger.warning(
|
||||
"serving stale cache for %s (age=%.1fs)", cache_key, age
|
||||
)
|
||||
return stale_data
|
||||
raise
|
||||
|
||||
async def _request_with_retry(self, method: str, path: str, **kwargs) -> dict:
|
||||
url = f"{self._base_url}{path}"
|
||||
headers = self._auth_headers()
|
||||
last_exc: Exception | None = None
|
||||
for attempt in range(_MAX_ATTEMPTS):
|
||||
try:
|
||||
response = await self._client.request(
|
||||
method, url, headers=headers, **kwargs
|
||||
)
|
||||
if response.status_code in _RETRY_STATUSES:
|
||||
if attempt < _MAX_ATTEMPTS - 1:
|
||||
await asyncio.sleep(2**attempt)
|
||||
continue
|
||||
response.raise_for_status()
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except httpx.TimeoutException as e:
|
||||
last_exc = e
|
||||
if attempt < _MAX_ATTEMPTS - 1:
|
||||
await asyncio.sleep(2**attempt)
|
||||
continue
|
||||
raise
|
||||
except httpx.HTTPStatusError as e:
|
||||
last_exc = e
|
||||
raise
|
||||
if last_exc is not None:
|
||||
raise last_exc
|
||||
raise RuntimeError("retry exhausted")
|
||||
|
||||
def _auth_headers(self) -> dict[str, str]:
|
||||
return {"X-WebAI-Key": self._api_key}
|
||||
Reference in New Issue
Block a user