Files
web-page/docs/superpowers/specs/2026-05-16-signal-v2-phase2-web-ai-pull-worker.md
gahusb b9dabd07e0 docs(signal-v2): Phase 2 web-ai pull worker spec
stock pull worker + asyncio scheduler + rate limit SQLite + FastAPI
app (:8001). 16 tests planned. brainstorming 6 decisions:
batch=A(separate FastAPI :8001) / scope=A(3 items) / scheduler=B(asyncio
cron) / http=B(httpx + custom retry + memory cache) / rate-limit=A(SQLite
WAL) / test=B(pytest-asyncio + httpx mock).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:14:45 +09:00

17 KiB

Confidence Signal Pipeline V2 — Phase 2: web-ai Pull Worker Design

작성일: 2026-05-16 작성자: gahusb 상태: Approved for implementation 선행 spec:

  • Phase 0 architecture (2026-05-15-confidence-signal-pipeline-v2-architecture.md)
  • Phase 1 stock WebAI API (2026-05-15-signal-v2-phase1-webai-api.md)
  • signal_v1 rename (2026-05-16-web-ai-v1-rename-to-signal-v1.md) — 본 spec 부터 web-ai/signal_v1/ + web-ai/signal_v2/ 구조 사용

브레인스토밍 결정 6개:

  • 배치 = A (별도 FastAPI app :8001, 새 디렉토리 web-ai/signal_v2/)
  • Scope = A (client + scheduler + rate limit DB 3 항목)
  • Scheduler = B (asyncio + 자체 cron loop, FastAPI lifespan)
  • HTTP client = B (httpx async + 자체 retry loop + 메모리 cache)
  • Rate limit DB = A (SQLite + WAL + busy_timeout)
  • Test = B (pytest + pytest-asyncio + httpx mock + tmp_path sqlite)

1. 목표

web-ai 머신에 V2 신호 파이프라인 인프라 구축. stock NAS 와 안정적으로 통신하는 client + 시간대별 polling scheduler + 24h dedup 인프라.

Phase 3 (Chronos-2 추론) 이 이 위에 추론 코드를 얹는다. Phase 4 (signal generator) 가 rate limit DB 를 사용. Phase 5 에서 같은 FastAPI app 에 POST /signal endpoint 추가.

Why: Phase 0 §3 책임 분리 — "web-ai = 시점 분석". web-ai 가 NAS DB 직접 접근 안 함, 모든 데이터는 stock API 경유. Phase 1 endpoint (X-WebAI-Key 인증) 가 입력 계약 = Phase 2 의 client 가 이 위에 동작.


2. 범위

포함

  • StockClient 클래스 — httpx async + 자체 retry loop (max 3, exponential backoff 1s→2s→4s) + 메모리 dict cache (TTL: portfolio 60s / news-sentiment 300s / screener 60s) + 마지막 성공 응답 stale fallback
  • Polling scheduler — asyncio cron loop (FastAPI lifespan + asyncio.create_task). 시간대별 분기 (장전 5분 / 장중 1분 / 장후 5분 / 야간·휴장 skip)
  • Rate limit DB — SQLite (WAL + busy_timeout=120000) signal_dedup 테이블. Phase 4 가 사용
  • FastAPI app — 새 port :8001. GET /health endpoint + startup/shutdown lifespan
  • PollState — process-wide singleton (portfolio/news_sentiment/screener_preview + last_updated + fetch_errors)
  • 테스트 16 케이스 (stock_client 6 + scheduler 5 + rate_limit 3 + main 2)

범위 외 (NOT)

  • Chronos-2 추론, KIS WebSocket, 분봉 (Phase 3)
  • Signal generator 매수/매도 룰 (Phase 4) — rate limit DB 사용은 Phase 4
  • agent-office POST /signal 호출 (Phase 5)
  • 기존 signal_v1 (V1 자동매매) 분리/정리/deprecation (Phase 6)
  • Ollama Qwen3 호스팅 (Phase 5)
  • ticker filter / 운영 모니터링 메트릭 (Phase 7)
  • holidays.json 자동 동기화 (backlog — 일단 stock/app/holidays.json 의 manual copy)
  • 메모리 cache TTL 만료 entry 명시 cleanup (YAGNI)

3. 파일 구조

3.1 신규 디렉토리: web-ai/signal_v2/

web-ai/signal_v2/
├── __init__.py
├── main.py                  # FastAPI app + lifespan + GET /health
├── config.py                # env 로딩 (STOCK_API_URL, WEBAI_API_KEY, SIGNAL_V2_PORT)
├── stock_client.py          # StockClient: httpx async + retry + cache + auth header
├── scheduler.py             # poll_loop, _next_interval, _is_market_day, _seconds_until_next_market_open
├── pull_worker.py           # _run_polling_cycle: 3 endpoint 병렬 fetch + state 갱신
├── rate_limit.py            # SignalDedup: is_recent + record (WAL + busy_timeout)
├── state.py                 # PollState dataclass (process-wide singleton)
├── holidays.json            # 한국 휴장일 (stock/app/holidays.json 복사)
├── start.bat                # uvicorn signal_v2.main:app --port 8001
├── data/
│   ├── .gitkeep
│   └── signal_v2.db         # SQLite (gitignore)
└── tests/
    ├── __init__.py
    ├── conftest.py          # pytest-asyncio + fixtures
    ├── test_stock_client.py # 6 케이스
    ├── test_scheduler.py    # 5 케이스
    ├── test_rate_limit.py   # 3 케이스
    └── test_main.py         # 2 케이스

3.2 변경 매트릭스

파일 작업
web-ai/signal_v2/ 전체 신규 디렉토리
web-ai/.env 3 줄 추가: STOCK_API_URL=https://gahusb.synology.me, WEBAI_API_KEY=<Phase 1 동일 값>, SIGNAL_V2_PORT=8001
web-ai/.gitignore signal_v2/data/*.db, signal_v2/data/*.db-* (WAL/SHM) 추가
web-ai/CLAUDE.md signal_v2/ 섹션은 이미 signal_v1 rename slice 에서 작성됨 — 무변경

3.3 기존 파일 무변경

  • web-ai/signal_v1/ 전체 (V1 자동매매)
  • web-ai/start.bat (V1 진입)
  • 다른 lab / web-backend / web-ui 영향 0

4. API 계약

4.1 StockClient 클래스 (signal_v2/stock_client.py)

class StockClient:
    """stock API 호출 wrapper. httpx async + 자체 retry + 메모리 cache."""

    def __init__(self, base_url: str, api_key: str, timeout: float = 10.0):
        self._base_url = base_url.rstrip("/")
        self._api_key = api_key
        self._client = httpx.AsyncClient(timeout=timeout)
        self._cache: dict[str, tuple[Any, float]] = {}

    async def get_portfolio(self) -> dict:
        """GET /api/webai/portfolio. cache TTL 60s."""

    async def get_news_sentiment(self, date: str | None = None) -> dict:
        """GET /api/webai/news-sentiment. cache TTL 300s."""

    async def run_screener_preview(
        self, weights: dict | None = None, top_n: int = 20
    ) -> dict:
        """POST /api/stock/screener/run {mode:'preview', ...}. cache TTL 60s."""

    async def close(self) -> None: ...

    # internal
    async def _request_with_retry(self, method, path, **kwargs) -> dict: ...
    def _cache_get(self, key: str) -> Any | None: ...
    def _cache_set(self, key: str, data: Any) -> None: ...
    def _auth_headers(self) -> dict[str, str]: ...  # {"X-WebAI-Key": self._api_key}

retry 정책:

  • max_attempts = 3
  • timeout = 10s
  • 429 응답: exponential backoff (1s → 2s → 4s)
  • 5xx 응답: 짧은 retry (max 3회) 후 raise
  • 모든 retry 실패 + cache 에 이전 성공 응답 있음 → stale fallback + logger.warning

cache TTL:

  • portfolio: 60s
  • news-sentiment: 300s (일별 갱신이라 TTL 길어도 무방)
  • screener preview: 60s

4.2 FastAPI app (signal_v2/main.py)

app = FastAPI(title="Signal V2 Pull Worker", version="0.1.0")

@app.on_event("startup")
async def startup():
    # 1. config 로드
    # 2. SignalDedup DB 초기화
    # 3. StockClient 생성 (전역 상태)
    # 4. asyncio.create_task(poll_loop(...))

@app.on_event("shutdown")
async def shutdown():
    # 1. shutdown_event.set() → poll_loop 종료
    # 2. StockClient.close()

@app.get("/health")
async def health() -> dict:
    return {
        "status": "online",
        "stock_api_url": settings.stock_api_url,
        "last_poll": state.last_updated,
        "cache_size": len(client._cache),
    }

Phase 5 이후 추가될 endpoint (본 spec 외): POST /signal (agent-office 호출).

4.3 PollState (signal_v2/state.py)

@dataclass
class PollState:
    portfolio: dict | None = None
    news_sentiment: dict | None = None
    screener_preview: dict | None = None
    last_updated: dict[str, str] = field(default_factory=dict)
    fetch_errors: dict[str, int] = field(default_factory=dict)

단일 process-wide 인스턴스 (state.py 모듈 변수). Phase 3 가 from signal_v2.state import state 로 read-only 참조.


5. Scheduler 구현

5.1 polling 주기 결정 (signal_v2/scheduler.py)

KST = ZoneInfo("Asia/Seoul")
_HOLIDAYS = set(json.loads((Path(__file__).parent / "holidays.json").read_text()))

_PRE_MARKET = (time(7, 0), time(9, 0))       # 5분
_MARKET = (time(9, 0), time(15, 30))         # 1분
_POST_MARKET = (time(15, 30), time(20, 0))   # 5분
# 그 외 야간 (20:00-07:00): polling 없음

def _is_market_day(now: datetime) -> bool:
    if now.weekday() >= 5: return False
    if now.strftime("%Y-%m-%d") in _HOLIDAYS: return False
    return True

def _next_interval(now: datetime) -> float:
    """다음 폴링까지 sleep 초수."""
    if not _is_market_day(now):
        return _seconds_until_next_market_open(now)
    t = now.time()
    if _PRE_MARKET[0] <= t < _PRE_MARKET[1]: return 300
    elif _MARKET[0] <= t < _MARKET[1]: return 60
    elif _POST_MARKET[0] <= t < _POST_MARKET[1]: return 300
    else: return _seconds_until_next_market_open(now)

5.2 polling loop

async def poll_loop(client: StockClient, state: PollState, shutdown: asyncio.Event) -> None:
    logger.info("poll_loop started")
    while not shutdown.is_set():
        now = datetime.now(KST)
        if _is_market_day(now) and _is_polling_window(now):
            try:
                await _run_polling_cycle(client, state)
            except Exception:
                logger.exception("poll cycle failed")
        interval = _next_interval(now)
        try:
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
            break
        except asyncio.TimeoutError:
            continue

async def _run_polling_cycle(client: StockClient, state: PollState) -> None:
    """3 endpoint 병렬 fetch + state 갱신."""
    portfolio, sentiment, screener = await asyncio.gather(
        client.get_portfolio(),
        client.get_news_sentiment(),
        client.run_screener_preview(),
        return_exceptions=True,
    )
    now_iso = datetime.now(KST).isoformat()
    if isinstance(portfolio, dict):
        state.portfolio = portfolio
        state.last_updated["portfolio"] = now_iso
        state.fetch_errors["portfolio"] = 0
    elif isinstance(portfolio, Exception):
        state.fetch_errors["portfolio"] = state.fetch_errors.get("portfolio", 0) + 1
    # 동일 처리 for sentiment, screener

5.3 holidays.json

stock/app/holidays.json 의 복사본을 signal_v2/holidays.json 으로 manual copy. 향후 backlog: 자동 동기화 또는 shared library.


6. Rate Limit DB

6.1 SQLite schema (signal_v2/rate_limit.py 의 startup 시 생성)

CREATE TABLE IF NOT EXISTS signal_dedup (
    ticker      TEXT NOT NULL,
    action      TEXT NOT NULL,         -- 'buy' or 'sell'
    last_sent   TEXT NOT NULL,         -- ISO timestamp KST
    confidence  REAL NOT NULL,
    PRIMARY KEY (ticker, action)
);
CREATE INDEX IF NOT EXISTS idx_signal_dedup_last_sent ON signal_dedup(last_sent);

6.2 SignalDedup 클래스

class SignalDedup:
    """Phase 4 signal generator 가 사용. WAL + busy_timeout=120000."""

    def __init__(self, db_path: Path): ...

    def _conn(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self._db_path, timeout=120.0)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=120000")
        return conn

    def _init_schema(self) -> None: ...

    def is_recent(self, ticker: str, action: str, within_hours: int = 24) -> bool:
        """True 면 24h 내 발송됨 → silent."""

    def record(self, ticker: str, action: str, confidence: float) -> None:
        """발송 직후 호출. PK 충돌 시 last_sent 갱신 (UPSERT)."""

Phase 2 에서는 인프라만 구축. Phase 4 가 매수/매도 결정 직전 is_recent() 체크 + 발송 후 record() 호출.


7. 테스트

7.1 test_stock_client.py (6 케이스)

케이스 검증
test_get_portfolio_normal_returns_dict_with_pnl_pct 정상 200 + 응답 파싱 + cache 저장
test_get_portfolio_uses_cache_within_ttl 첫 호출 후 60s 내 두번째 호출 = mock httpx 콜 1회
test_get_portfolio_refetches_after_ttl_expiry frozen_time 으로 60s+1 진행 후 mock httpx 콜 2회
test_get_portfolio_retries_3_times_on_timeout mock 이 처음 2회 timeout → 3회차 200 → exponential sleep 검증
test_get_portfolio_429_triggers_backoff 429 응답 → 1s sleep → 재시도 → 200
test_get_portfolio_falls_back_to_stale_on_all_failures cache 에 이전 성공 + 모든 retry 5xx → stale 반환 + logger.warning

7.2 test_scheduler.py (5 케이스)

케이스 검증
test_next_interval_pre_market_5min now=08:30 평일 → 300
test_next_interval_market_open_1min now=10:00 평일 → 60
test_next_interval_post_market_5min now=17:00 평일 → 300
test_next_interval_overnight_skip_to_next_morning now=22:00 평일 → 다음날 07:00 까지
test_next_interval_holiday_skip now=2026-08-15 (공휴일) → 다음 영업일 07:00 까지

7.3 test_rate_limit.py (3 케이스)

케이스 검증
test_is_recent_returns_false_for_new_ticker_action record 없음 → False
test_is_recent_returns_true_within_24h record 호출 1초 후 → True
test_is_recent_returns_false_after_24h record + 24h 1분 후 → False

7.4 test_main.py (2 케이스)

케이스 검증
test_health_endpoint_returns_status_online TestClient → GET /health → 200 + status online
test_startup_warns_if_webai_api_key_missing env 미설정 + startup → logger.warning

총 16 신규 테스트. 외부 stock 호출 0 (전부 mock).

7.5 conftest.py

import pytest
from pathlib import Path
import respx

@pytest.fixture
def tmp_dedup_db(tmp_path) -> Path:
    return tmp_path / "test_signal_v2.db"

@pytest.fixture
async def mock_stock_api():
    async with respx.mock(base_url="https://test.stock.local") as mock:
        yield mock

@pytest.fixture
def frozen_now(monkeypatch):
    """datetime.now(KST) 고정용 (freezegun 또는 monkeypatch)."""

pytest-asyncio mode = "auto" — pyproject.toml 또는 pytest.ini 에 명시.


8. 위험 및 완화

위험 완화
stock API 응답 지연 (NAS 부하 / 네트워크) timeout 10s + retry 3회 + cache fallback (stale)
.env 의 WEBAI_API_KEY 미설정 → 모든 호출 401 startup ERROR log + Phase 1 의 503 응답 fallback 활용
Polling cycle 중 web-ai 종료 shutdown.wait timeout 으로 즉시 break, asyncio cleanup
holidays.json 미동기화 → 휴일 폴링 시도 stock 측 응답 정상 (데이터 stale). Phase 7 모니터링
SQLite WAL lock (Phase 4 가 signal generator 동시 write) busy_timeout=120000 + WAL → reader/writer 분리. Phase 4 단일 writer 직렬 보장
메모리 cache 누수 (장기 운영) TTL 만료 entry 명시 cleanup 없음 (YAGNI). Phase 7 모니터링
signal_v1 (port 8000) ↔ signal_v2 (port 8001) 충돌 다른 port. 같은 머신에서 동시 가동 가능
시간대 (KST) 계산 오류 (DST) KST 는 DST 없음 (Asia/Seoul +09:00 고정). 안전
asyncio + sqlite3 (sync) 혼합 rate_limit 호출은 짧음. Phase 4 의 호출 패턴 결정 시 점검
Phase 1 rate limit (60r/m) 초과 polling 빈도 분당 3 → 20x 여유. 정상 동작 시 무관

9. 운영 영향

항목 영향
다운타임 0 (V1 영향 없음, V2 신규 시작)
사용자 영향 없음 (V2 silent, Phase 5 까지 신호 발송 없음)
.env 갱신 사용자 1회 (WEBAI_API_KEY, STOCK_API_URL, SIGNAL_V2_PORT)
V1 영향 0 (별도 process / port / 디렉토리)
stock NAS 부하 매우 작음 (장중 분당 3 call)
외부 의존성 추가 httpx, pytest-asyncio, respx

10. Phase 2 완료 조건 (DoD)

  • web-ai/signal_v2/ 디렉토리 + 7 파이썬 파일 (main.py / config.py / stock_client.py / scheduler.py / pull_worker.py / rate_limit.py / state.py + init.py)
  • holidays.json 복사
  • tests/ 디렉토리 + conftest.py + 4 test 파일 + 16 케이스 모두 PASS
  • python -m uvicorn signal_v2.main:app --port 8001 정상 시작 + GET http://localhost:8001/health 200
  • 1 회 polling cycle 완료 → state.portfolio + state.news_sentiment + state.screener_preview 갱신 확인 (수동 trigger 또는 첫 자연 cycle)
  • rate_limit DB 파일 생성 + WAL + busy_timeout 설정 확인
  • .env 갱신 (사용자 1회): STOCK_API_URL=https://gahusb.synology.me, WEBAI_API_KEY=<Phase 1 동일>, SIGNAL_V2_PORT=8001
  • web-ai V1 봇 무영향 검증 (start.bat 정상 시작)
  • git push (web-ai repo)

11. Phase 3 와의 관계

본 Phase 2 완료 후 즉시 Phase 3 (KIS WebSocket + 분봉 + Chronos-2 추론) spec → plan → 구현. 의존성:

[Phase 2 spec/plan/실행]   →   [Phase 3 spec/plan/실행]
       2주                          2주

Phase 3 의 입력 계약 = 본 spec 의 PollState (Phase 3 코드가 read-only 로 import). Phase 3 의 추론 결과 (Chronos-2 quantile 등) 는 별도 state 객체 또는 PollState 확장 — Phase 3 spec 에서 결정.


12. Backlog (본 spec NOT)

  • ticker filter (news-sentiment ?tickers= 옵션 활용) — V2 운영 후 종목 필터 시
  • 운영 메트릭 (응답시간 / 에러율 / 텔레그램 alert) — Phase 7
  • holidays.json 자동 동기화 (stock → web-ai)
  • cache 만료 entry 명시 cleanup (장기 운영 시 메모리 누수 발견 시)
  • Phase 5 POST /signal endpoint (agent-office 호출) — Phase 5 spec
  • WebSocket-based polling (현재 HTTP polling, 향후 stock 측이 WebSocket push 도입 시)
  • Phase 6 signal_v1 deprecation (V1 자동매매 정리)
  • Phase 4 가 rate_limit 호출 시 asyncio.to_thread vs 직접 호출 결정