diff --git a/docs/superpowers/specs/2026-05-16-signal-v2-phase2-web-ai-pull-worker.md b/docs/superpowers/specs/2026-05-16-signal-v2-phase2-web-ai-pull-worker.md new file mode 100644 index 0000000..0d0a89d --- /dev/null +++ b/docs/superpowers/specs/2026-05-16-signal-v2-phase2-web-ai-pull-worker.md @@ -0,0 +1,436 @@ +# Confidence Signal Pipeline V2 — Phase 2: web-ai Pull Worker Design + +**작성일**: 2026-05-16 +**작성자**: gahusb +**상태**: Approved for implementation +**선행 spec**: +- Phase 0 architecture (`2026-05-15-confidence-signal-pipeline-v2-architecture.md`) +- Phase 1 stock WebAI API (`2026-05-15-signal-v2-phase1-webai-api.md`) +- signal_v1 rename (`2026-05-16-web-ai-v1-rename-to-signal-v1.md`) — 본 spec 부터 `web-ai/signal_v1/` + `web-ai/signal_v2/` 구조 사용 + +**브레인스토밍 결정 6개**: +- 배치 = A (별도 FastAPI app `:8001`, 새 디렉토리 `web-ai/signal_v2/`) +- Scope = A (client + scheduler + rate limit DB 3 항목) +- Scheduler = B (asyncio + 자체 cron loop, FastAPI lifespan) +- HTTP client = B (httpx async + 자체 retry loop + 메모리 cache) +- Rate limit DB = A (SQLite + WAL + busy_timeout) +- Test = B (pytest + pytest-asyncio + httpx mock + tmp_path sqlite) + +--- + +## 1. 목표 + +web-ai 머신에 V2 신호 파이프라인 인프라 구축. stock NAS 와 안정적으로 통신하는 client + 시간대별 polling scheduler + 24h dedup 인프라. + +Phase 3 (Chronos-2 추론) 이 이 위에 추론 코드를 얹는다. Phase 4 (signal generator) 가 rate limit DB 를 사용. Phase 5 에서 같은 FastAPI app 에 `POST /signal` endpoint 추가. + +**Why**: Phase 0 §3 책임 분리 — "web-ai = 시점 분석". web-ai 가 NAS DB 직접 접근 안 함, 모든 데이터는 stock API 경유. Phase 1 endpoint (X-WebAI-Key 인증) 가 입력 계약 = Phase 2 의 client 가 이 위에 동작. + +--- + +## 2. 범위 + +### 포함 + +- ① **StockClient 클래스** — httpx async + 자체 retry loop (max 3, exponential backoff 1s→2s→4s) + 메모리 dict cache (TTL: portfolio 60s / news-sentiment 300s / screener 60s) + 마지막 성공 응답 stale fallback +- ② **Polling scheduler** — asyncio cron loop (FastAPI lifespan + asyncio.create_task). 시간대별 분기 (장전 5분 / 장중 1분 / 장후 5분 / 야간·휴장 skip) +- ③ **Rate limit DB** — SQLite (WAL + busy_timeout=120000) `signal_dedup` 테이블. Phase 4 가 사용 +- ④ **FastAPI app** — 새 port `:8001`. `GET /health` endpoint + startup/shutdown lifespan +- ⑤ **PollState** — process-wide singleton (portfolio/news_sentiment/screener_preview + last_updated + fetch_errors) +- ⑥ **테스트 16 케이스** (stock_client 6 + scheduler 5 + rate_limit 3 + main 2) + +### 범위 외 (NOT) + +- Chronos-2 추론, KIS WebSocket, 분봉 (Phase 3) +- Signal generator 매수/매도 룰 (Phase 4) — rate limit DB 사용은 Phase 4 +- agent-office `POST /signal` 호출 (Phase 5) +- 기존 signal_v1 (V1 자동매매) 분리/정리/deprecation (Phase 6) +- Ollama Qwen3 호스팅 (Phase 5) +- ticker filter / 운영 모니터링 메트릭 (Phase 7) +- holidays.json 자동 동기화 (backlog — 일단 stock/app/holidays.json 의 manual copy) +- 메모리 cache TTL 만료 entry 명시 cleanup (YAGNI) + +--- + +## 3. 파일 구조 + +### 3.1 신규 디렉토리: `web-ai/signal_v2/` + +``` +web-ai/signal_v2/ +├── __init__.py +├── main.py # FastAPI app + lifespan + GET /health +├── config.py # env 로딩 (STOCK_API_URL, WEBAI_API_KEY, SIGNAL_V2_PORT) +├── stock_client.py # StockClient: httpx async + retry + cache + auth header +├── scheduler.py # poll_loop, _next_interval, _is_market_day, _seconds_until_next_market_open +├── pull_worker.py # _run_polling_cycle: 3 endpoint 병렬 fetch + state 갱신 +├── rate_limit.py # SignalDedup: is_recent + record (WAL + busy_timeout) +├── state.py # PollState dataclass (process-wide singleton) +├── holidays.json # 한국 휴장일 (stock/app/holidays.json 복사) +├── start.bat # uvicorn signal_v2.main:app --port 8001 +├── data/ +│ ├── .gitkeep +│ └── signal_v2.db # SQLite (gitignore) +└── tests/ + ├── __init__.py + ├── conftest.py # pytest-asyncio + fixtures + ├── test_stock_client.py # 6 케이스 + ├── test_scheduler.py # 5 케이스 + ├── test_rate_limit.py # 3 케이스 + └── test_main.py # 2 케이스 +``` + +### 3.2 변경 매트릭스 + +| 파일 | 작업 | +|------|------| +| `web-ai/signal_v2/` 전체 | 신규 디렉토리 | +| `web-ai/.env` | 3 줄 추가: `STOCK_API_URL=https://gahusb.synology.me`, `WEBAI_API_KEY=`, `SIGNAL_V2_PORT=8001` | +| `web-ai/.gitignore` | `signal_v2/data/*.db`, `signal_v2/data/*.db-*` (WAL/SHM) 추가 | +| `web-ai/CLAUDE.md` | `signal_v2/` 섹션은 이미 signal_v1 rename slice 에서 작성됨 — 무변경 | + +### 3.3 기존 파일 무변경 + +- `web-ai/signal_v1/` 전체 (V1 자동매매) +- `web-ai/start.bat` (V1 진입) +- 다른 lab / web-backend / web-ui 영향 0 + +--- + +## 4. API 계약 + +### 4.1 `StockClient` 클래스 (signal_v2/stock_client.py) + +```python +class StockClient: + """stock API 호출 wrapper. httpx async + 자체 retry + 메모리 cache.""" + + def __init__(self, base_url: str, api_key: str, timeout: float = 10.0): + self._base_url = base_url.rstrip("/") + self._api_key = api_key + self._client = httpx.AsyncClient(timeout=timeout) + self._cache: dict[str, tuple[Any, float]] = {} + + async def get_portfolio(self) -> dict: + """GET /api/webai/portfolio. cache TTL 60s.""" + + async def get_news_sentiment(self, date: str | None = None) -> dict: + """GET /api/webai/news-sentiment. cache TTL 300s.""" + + async def run_screener_preview( + self, weights: dict | None = None, top_n: int = 20 + ) -> dict: + """POST /api/stock/screener/run {mode:'preview', ...}. cache TTL 60s.""" + + async def close(self) -> None: ... + + # internal + async def _request_with_retry(self, method, path, **kwargs) -> dict: ... + def _cache_get(self, key: str) -> Any | None: ... + def _cache_set(self, key: str, data: Any) -> None: ... + def _auth_headers(self) -> dict[str, str]: ... # {"X-WebAI-Key": self._api_key} +``` + +retry 정책: +- max_attempts = 3 +- timeout = 10s +- 429 응답: exponential backoff (1s → 2s → 4s) +- 5xx 응답: 짧은 retry (max 3회) 후 raise +- 모든 retry 실패 + cache 에 이전 성공 응답 있음 → stale fallback + `logger.warning` + +cache TTL: +- portfolio: 60s +- news-sentiment: 300s (일별 갱신이라 TTL 길어도 무방) +- screener preview: 60s + +### 4.2 FastAPI app (signal_v2/main.py) + +```python +app = FastAPI(title="Signal V2 Pull Worker", version="0.1.0") + +@app.on_event("startup") +async def startup(): + # 1. config 로드 + # 2. SignalDedup DB 초기화 + # 3. StockClient 생성 (전역 상태) + # 4. asyncio.create_task(poll_loop(...)) + +@app.on_event("shutdown") +async def shutdown(): + # 1. shutdown_event.set() → poll_loop 종료 + # 2. StockClient.close() + +@app.get("/health") +async def health() -> dict: + return { + "status": "online", + "stock_api_url": settings.stock_api_url, + "last_poll": state.last_updated, + "cache_size": len(client._cache), + } +``` + +Phase 5 이후 추가될 endpoint (본 spec 외): `POST /signal` (agent-office 호출). + +### 4.3 PollState (signal_v2/state.py) + +```python +@dataclass +class PollState: + portfolio: dict | None = None + news_sentiment: dict | None = None + screener_preview: dict | None = None + last_updated: dict[str, str] = field(default_factory=dict) + fetch_errors: dict[str, int] = field(default_factory=dict) +``` + +단일 process-wide 인스턴스 (`state.py` 모듈 변수). Phase 3 가 `from signal_v2.state import state` 로 read-only 참조. + +--- + +## 5. Scheduler 구현 + +### 5.1 polling 주기 결정 (signal_v2/scheduler.py) + +```python +KST = ZoneInfo("Asia/Seoul") +_HOLIDAYS = set(json.loads((Path(__file__).parent / "holidays.json").read_text())) + +_PRE_MARKET = (time(7, 0), time(9, 0)) # 5분 +_MARKET = (time(9, 0), time(15, 30)) # 1분 +_POST_MARKET = (time(15, 30), time(20, 0)) # 5분 +# 그 외 야간 (20:00-07:00): polling 없음 + +def _is_market_day(now: datetime) -> bool: + if now.weekday() >= 5: return False + if now.strftime("%Y-%m-%d") in _HOLIDAYS: return False + return True + +def _next_interval(now: datetime) -> float: + """다음 폴링까지 sleep 초수.""" + if not _is_market_day(now): + return _seconds_until_next_market_open(now) + t = now.time() + if _PRE_MARKET[0] <= t < _PRE_MARKET[1]: return 300 + elif _MARKET[0] <= t < _MARKET[1]: return 60 + elif _POST_MARKET[0] <= t < _POST_MARKET[1]: return 300 + else: return _seconds_until_next_market_open(now) +``` + +### 5.2 polling loop + +```python +async def poll_loop(client: StockClient, state: PollState, shutdown: asyncio.Event) -> None: + logger.info("poll_loop started") + while not shutdown.is_set(): + now = datetime.now(KST) + if _is_market_day(now) and _is_polling_window(now): + try: + await _run_polling_cycle(client, state) + except Exception: + logger.exception("poll cycle failed") + interval = _next_interval(now) + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + break + except asyncio.TimeoutError: + continue + +async def _run_polling_cycle(client: StockClient, state: PollState) -> None: + """3 endpoint 병렬 fetch + state 갱신.""" + portfolio, sentiment, screener = await asyncio.gather( + client.get_portfolio(), + client.get_news_sentiment(), + client.run_screener_preview(), + return_exceptions=True, + ) + now_iso = datetime.now(KST).isoformat() + if isinstance(portfolio, dict): + state.portfolio = portfolio + state.last_updated["portfolio"] = now_iso + state.fetch_errors["portfolio"] = 0 + elif isinstance(portfolio, Exception): + state.fetch_errors["portfolio"] = state.fetch_errors.get("portfolio", 0) + 1 + # 동일 처리 for sentiment, screener +``` + +### 5.3 holidays.json + +`stock/app/holidays.json` 의 복사본을 `signal_v2/holidays.json` 으로 manual copy. 향후 backlog: 자동 동기화 또는 shared library. + +--- + +## 6. Rate Limit DB + +### 6.1 SQLite schema (signal_v2/rate_limit.py 의 startup 시 생성) + +```sql +CREATE TABLE IF NOT EXISTS signal_dedup ( + ticker TEXT NOT NULL, + action TEXT NOT NULL, -- 'buy' or 'sell' + last_sent TEXT NOT NULL, -- ISO timestamp KST + confidence REAL NOT NULL, + PRIMARY KEY (ticker, action) +); +CREATE INDEX IF NOT EXISTS idx_signal_dedup_last_sent ON signal_dedup(last_sent); +``` + +### 6.2 `SignalDedup` 클래스 + +```python +class SignalDedup: + """Phase 4 signal generator 가 사용. WAL + busy_timeout=120000.""" + + def __init__(self, db_path: Path): ... + + def _conn(self) -> sqlite3.Connection: + conn = sqlite3.connect(self._db_path, timeout=120.0) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=120000") + return conn + + def _init_schema(self) -> None: ... + + def is_recent(self, ticker: str, action: str, within_hours: int = 24) -> bool: + """True 면 24h 내 발송됨 → silent.""" + + def record(self, ticker: str, action: str, confidence: float) -> None: + """발송 직후 호출. PK 충돌 시 last_sent 갱신 (UPSERT).""" +``` + +Phase 2 에서는 인프라만 구축. Phase 4 가 매수/매도 결정 직전 `is_recent()` 체크 + 발송 후 `record()` 호출. + +--- + +## 7. 테스트 + +### 7.1 `test_stock_client.py` (6 케이스) + +| 케이스 | 검증 | +|--------|------| +| `test_get_portfolio_normal_returns_dict_with_pnl_pct` | 정상 200 + 응답 파싱 + cache 저장 | +| `test_get_portfolio_uses_cache_within_ttl` | 첫 호출 후 60s 내 두번째 호출 = mock httpx 콜 1회 | +| `test_get_portfolio_refetches_after_ttl_expiry` | frozen_time 으로 60s+1 진행 후 mock httpx 콜 2회 | +| `test_get_portfolio_retries_3_times_on_timeout` | mock 이 처음 2회 timeout → 3회차 200 → exponential sleep 검증 | +| `test_get_portfolio_429_triggers_backoff` | 429 응답 → 1s sleep → 재시도 → 200 | +| `test_get_portfolio_falls_back_to_stale_on_all_failures` | cache 에 이전 성공 + 모든 retry 5xx → stale 반환 + logger.warning | + +### 7.2 `test_scheduler.py` (5 케이스) + +| 케이스 | 검증 | +|--------|------| +| `test_next_interval_pre_market_5min` | now=08:30 평일 → 300 | +| `test_next_interval_market_open_1min` | now=10:00 평일 → 60 | +| `test_next_interval_post_market_5min` | now=17:00 평일 → 300 | +| `test_next_interval_overnight_skip_to_next_morning` | now=22:00 평일 → 다음날 07:00 까지 | +| `test_next_interval_holiday_skip` | now=2026-08-15 (공휴일) → 다음 영업일 07:00 까지 | + +### 7.3 `test_rate_limit.py` (3 케이스) + +| 케이스 | 검증 | +|--------|------| +| `test_is_recent_returns_false_for_new_ticker_action` | record 없음 → False | +| `test_is_recent_returns_true_within_24h` | record 호출 1초 후 → True | +| `test_is_recent_returns_false_after_24h` | record + 24h 1분 후 → False | + +### 7.4 `test_main.py` (2 케이스) + +| 케이스 | 검증 | +|--------|------| +| `test_health_endpoint_returns_status_online` | TestClient → GET /health → 200 + status online | +| `test_startup_warns_if_webai_api_key_missing` | env 미설정 + startup → logger.warning | + +**총 16 신규 테스트**. 외부 stock 호출 0 (전부 mock). + +### 7.5 conftest.py + +```python +import pytest +from pathlib import Path +import respx + +@pytest.fixture +def tmp_dedup_db(tmp_path) -> Path: + return tmp_path / "test_signal_v2.db" + +@pytest.fixture +async def mock_stock_api(): + async with respx.mock(base_url="https://test.stock.local") as mock: + yield mock + +@pytest.fixture +def frozen_now(monkeypatch): + """datetime.now(KST) 고정용 (freezegun 또는 monkeypatch).""" +``` + +pytest-asyncio mode = "auto" — `pyproject.toml` 또는 `pytest.ini` 에 명시. + +--- + +## 8. 위험 및 완화 + +| 위험 | 완화 | +|------|------| +| stock API 응답 지연 (NAS 부하 / 네트워크) | timeout 10s + retry 3회 + cache fallback (stale) | +| `.env` 의 WEBAI_API_KEY 미설정 → 모든 호출 401 | startup ERROR log + Phase 1 의 503 응답 fallback 활용 | +| Polling cycle 중 web-ai 종료 | shutdown.wait timeout 으로 즉시 break, asyncio cleanup | +| holidays.json 미동기화 → 휴일 폴링 시도 | stock 측 응답 정상 (데이터 stale). Phase 7 모니터링 | +| SQLite WAL lock (Phase 4 가 signal generator 동시 write) | busy_timeout=120000 + WAL → reader/writer 분리. Phase 4 단일 writer 직렬 보장 | +| 메모리 cache 누수 (장기 운영) | TTL 만료 entry 명시 cleanup 없음 (YAGNI). Phase 7 모니터링 | +| signal_v1 (port 8000) ↔ signal_v2 (port 8001) 충돌 | 다른 port. 같은 머신에서 동시 가동 가능 | +| 시간대 (KST) 계산 오류 (DST) | KST 는 DST 없음 (Asia/Seoul +09:00 고정). 안전 | +| asyncio + sqlite3 (sync) 혼합 | rate_limit 호출은 짧음. Phase 4 의 호출 패턴 결정 시 점검 | +| Phase 1 rate limit (60r/m) 초과 | polling 빈도 분당 3 → 20x 여유. 정상 동작 시 무관 | + +--- + +## 9. 운영 영향 + +| 항목 | 영향 | +|------|------| +| 다운타임 | 0 (V1 영향 없음, V2 신규 시작) | +| 사용자 영향 | 없음 (V2 silent, Phase 5 까지 신호 발송 없음) | +| `.env` 갱신 | 사용자 1회 (`WEBAI_API_KEY`, `STOCK_API_URL`, `SIGNAL_V2_PORT`) | +| V1 영향 | 0 (별도 process / port / 디렉토리) | +| stock NAS 부하 | 매우 작음 (장중 분당 3 call) | +| 외부 의존성 추가 | `httpx`, `pytest-asyncio`, `respx` | + +--- + +## 10. Phase 2 완료 조건 (DoD) + +- [ ] `web-ai/signal_v2/` 디렉토리 + 7 파이썬 파일 (main.py / config.py / stock_client.py / scheduler.py / pull_worker.py / rate_limit.py / state.py + __init__.py) +- [ ] `holidays.json` 복사 +- [ ] `tests/` 디렉토리 + conftest.py + 4 test 파일 + 16 케이스 모두 PASS +- [ ] `python -m uvicorn signal_v2.main:app --port 8001` 정상 시작 + `GET http://localhost:8001/health` 200 +- [ ] 1 회 polling cycle 완료 → `state.portfolio` + `state.news_sentiment` + `state.screener_preview` 갱신 확인 (수동 trigger 또는 첫 자연 cycle) +- [ ] rate_limit DB 파일 생성 + WAL + busy_timeout 설정 확인 +- [ ] `.env` 갱신 (사용자 1회): `STOCK_API_URL=https://gahusb.synology.me`, `WEBAI_API_KEY=`, `SIGNAL_V2_PORT=8001` +- [ ] web-ai V1 봇 무영향 검증 (`start.bat` 정상 시작) +- [ ] git push (web-ai repo) + +--- + +## 11. Phase 3 와의 관계 + +본 Phase 2 완료 후 즉시 **Phase 3 (KIS WebSocket + 분봉 + Chronos-2 추론)** spec → plan → 구현. 의존성: + +``` +[Phase 2 spec/plan/실행] → [Phase 3 spec/plan/실행] + 2주 2주 +``` + +Phase 3 의 입력 계약 = 본 spec 의 `PollState` (Phase 3 코드가 read-only 로 import). Phase 3 의 추론 결과 (Chronos-2 quantile 등) 는 별도 state 객체 또는 PollState 확장 — Phase 3 spec 에서 결정. + +--- + +## 12. Backlog (본 spec NOT) + +- ticker filter (news-sentiment `?tickers=` 옵션 활용) — V2 운영 후 종목 필터 시 +- 운영 메트릭 (응답시간 / 에러율 / 텔레그램 alert) — Phase 7 +- holidays.json 자동 동기화 (stock → web-ai) +- cache 만료 entry 명시 cleanup (장기 운영 시 메모리 누수 발견 시) +- Phase 5 `POST /signal` endpoint (agent-office 호출) — Phase 5 spec +- WebSocket-based polling (현재 HTTP polling, 향후 stock 측이 WebSocket push 도입 시) +- Phase 6 signal_v1 deprecation (V1 자동매매 정리) +- Phase 4 가 rate_limit 호출 시 asyncio.to_thread vs 직접 호출 결정