web-page/docs/superpowers/plans/2026-05-13-ai-news-sentiment-node.md
gahusb 7ebeba2f3d docs(screener): AI news sentiment node implementation plan (15 tasks)
15-task TDD plan for 8th score node ai_news. backend (scraper + analyzer +
pipeline + telegram + node + router) + agent-office (service_proxy + cron
handler + scheduler) + frontend (canvasLayout 1 file). 22 unit tests
(scraper 4, analyzer 4, pipeline 3, telegram 4, node 5, router 2).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 23:18:00 +09:00


AI News Sentiment Node Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

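Goal: Analyze Naver news for the top 100 stocks by market cap with Claude Haiku, rating each as bullish or bearish, and integrate the result into the screener's weighted sum as the 8th score node, ai_news. A job runs automatically on weekdays at 08:00 KST and sends a Telegram alert with the top 5 bullish and top 5 bearish tickers.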

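Architecture: Add a new screener/ai_news/ module inside stock-lab (scraper + analyzer + pipeline + telegram). The new nodes/ai_news.py converts the news_sentiment table to a percentile_rank and contributes it to the weighted sum. agent-office handles only the cron trigger and Telegram delivery. The frontend change is limited to canvasLayout.js and its test.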

Tech Stack: Python 3.11 / FastAPI / SQLite (WAL+busy_timeout) / anthropic SDK (async) / httpx (async) / BeautifulSoup4 / APScheduler / React 18.

Prerequisite spec: web-ui/docs/superpowers/specs/2026-05-13-ai-news-sentiment-node-design.md


File structure

New files (backend):

web-backend/stock-lab/app/screener/
  ai_news/
    __init__.py
    scraper.py             - Naver ticker news scraping
    analyzer.py            - Claude Haiku bullish/bearish analysis
    pipeline.py            - refresh_daily() (scraping + parallel LLM + DB)
    telegram.py            - Top 5/5 message builder
  nodes/
    ai_news.py             - 8th ScoreNode

web-backend/stock-lab/tests/
  test_ai_news_scraper.py
  test_ai_news_analyzer.py
  test_ai_news_pipeline.py
  test_ai_news_telegram.py
  test_ai_news_node.py

Modified files (backend):

web-backend/stock-lab/app/screener/
  schema.py              - news_sentiment DDL + DEFAULT_WEIGHTS/PARAMS additions
  registry.py            - register NODE_REGISTRY["ai_news"]
  engine.py              - news_sentiment field on ScreenContext + load update
  router.py              - POST /snapshot/refresh-news-sentiment route
web-backend/stock-lab/requirements.txt   - add anthropic

web-backend/agent-office/app/
  service_proxy.py       - refresh_ai_news_sentiment() helper
  agents/stock.py        - on_ai_news_schedule method
  scheduler.py           - register cron mon-fri 08:00

Modified files (frontend):

web-ui/src/pages/stock/screener/components/canvas/constants/
  canvasLayout.js        - add AI node
  canvasLayout.test.js   - update counts

Task 1: Dependency + DB schema + default settings

Files:

  • Modify: web-backend/stock-lab/requirements.txt

  • Modify: web-backend/stock-lab/app/screener/schema.py

  • Step 1: Add the anthropic SDK to requirements.txt

Add the line anthropic==0.39.0 at a suitable position (alphabetical order).

  • Step 2: Add ai_news to DEFAULT_WEIGHTS / DEFAULT_NODE_PARAMS in schema.py

At the end of the DEFAULT_WEIGHTS dictionary:

    "ai_news":      0.8,

At the end of the DEFAULT_NODE_PARAMS dictionary:

    "ai_news":      {"min_news_count": 1},
  • Step 3: Add the news_sentiment table to the DDL string in schema.py

Add it after the existing screener_results table definition:

CREATE TABLE IF NOT EXISTS news_sentiment (
  ticker          TEXT NOT NULL,
  date            TEXT NOT NULL,
  score_raw       REAL NOT NULL,
  reason          TEXT NOT NULL DEFAULT '',
  news_count      INTEGER NOT NULL DEFAULT 0,
  tokens_input    INTEGER NOT NULL DEFAULT 0,
  tokens_output   INTEGER NOT NULL DEFAULT 0,
  model           TEXT NOT NULL DEFAULT 'claude-haiku-4-5-20251001',
  created_at      TEXT NOT NULL DEFAULT (datetime('now','localtime')),
  PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_news_sentiment_date ON news_sentiment(date DESC);
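  • Step 4: Add a one-time migration to ensure_screener_schema that backfills ai_news when it is missing from the stored settings

In the existing function body, add the following block immediately after conn.executescript(DDL) and before existing = ...:

    # One-time backfill when the ai_news key is missing (for environments already in production)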
    row = conn.execute(
        "SELECT weights_json, node_params_json FROM screener_settings WHERE id=1"
    ).fetchone()
    if row is not None:
        w = json.loads(row[0])
        p = json.loads(row[1])
        changed = False
        if "ai_news" not in w:
            w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
            changed = True
        if "ai_news" not in p:
            p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
            changed = True
        if changed:
            conn.execute(
                "UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
                (json.dumps(w), json.dumps(p)),
            )
  • Step 5: Run the existing tests to confirm there are no regressions
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest app/test_screener_schema.py -v

Expected: all existing tests PASS (the added DDL is idempotent, so it causes no conflicts).

  • Step 6: Commit
git add app/screener/schema.py requirements.txt
git commit -m "feat(screener): add news_sentiment table + ai_news defaults + migration"
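The idempotency claim in Steps 3 to 5 can be checked in isolation. The sketch below is a simplified stand-in, not the real schema.py: the trimmed DDL, the momentum entries, and the ensure_schema / demo names are assumptions made for illustration. It applies the DDL twice against an in-memory SQLite database and shows that the ai_news backfill rewrites the settings row only on the first call.

```python
import json
import sqlite3

# Trimmed stand-ins for the real DDL and defaults (assumptions for illustration).
DDL = """
CREATE TABLE IF NOT EXISTS screener_settings (
  id               INTEGER PRIMARY KEY,
  weights_json     TEXT NOT NULL,
  node_params_json TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS news_sentiment (
  ticker    TEXT NOT NULL,
  date      TEXT NOT NULL,
  score_raw REAL NOT NULL,
  PRIMARY KEY (ticker, date)
);
"""
DEFAULT_WEIGHTS = {"momentum": 1.0, "ai_news": 0.8}
DEFAULT_NODE_PARAMS = {"momentum": {}, "ai_news": {"min_news_count": 1}}


def ensure_schema(conn: sqlite3.Connection) -> bool:
    """Apply the DDL, then backfill ai_news into an existing settings row.

    Returns True when the settings row was rewritten.
    """
    conn.executescript(DDL)  # IF NOT EXISTS makes repeated calls harmless
    row = conn.execute(
        "SELECT weights_json, node_params_json FROM screener_settings WHERE id=1"
    ).fetchone()
    if row is None:
        return False
    weights, params = json.loads(row[0]), json.loads(row[1])
    changed = False
    if "ai_news" not in weights:
        weights["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
        changed = True
    if "ai_news" not in params:
        params["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
        changed = True
    if changed:
        conn.execute(
            "UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
            (json.dumps(weights), json.dumps(params)),
        )
    return changed


def demo() -> tuple:
    conn = sqlite3.connect(":memory:")
    conn.executescript(DDL)
    # Simulate a deployment whose stored settings predate ai_news.
    conn.execute(
        "INSERT INTO screener_settings VALUES (1, ?, ?)",
        (json.dumps({"momentum": 1.0}), json.dumps({"momentum": {}})),
    )
    first = ensure_schema(conn)   # backfills ai_news
    second = ensure_schema(conn)  # nothing left to do
    weights = json.loads(
        conn.execute("SELECT weights_json FROM screener_settings WHERE id=1").fetchone()[0]
    )
    conn.close()
    return first, second, weights
```

Calling demo() returns whether each pass changed the row, plus the final weights, which is a convenient shape for a regression test of the migration.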

Task 2: ai_news/scraper.py (Naver ticker news scraping + tests)

Files:

  • Create: web-backend/stock-lab/app/screener/ai_news/__init__.py

  • Create: web-backend/stock-lab/app/screener/ai_news/scraper.py

  • Test: web-backend/stock-lab/tests/test_ai_news_scraper.py

  • Step 1: Create an empty __init__.py

No content; the file only needs to exist.

  • Step 2: Write a failing test

tests/test_ai_news_scraper.py:

import pytest
from unittest.mock import AsyncMock
from app.screener.ai_news import scraper


SAMPLE_HTML = """
<html><body>
<table class="type5"><tbody>
<tr><td class="title"><a href="/news1">삼성전자, HBM 양산 가시화</a></td><td class="date">2026.05.13 07:30</td></tr>
<tr><td class="title"><a href="/news2">삼성, 4분기 어닝 쇼크 우려</a></td><td class="date">2026.05.13 06:00</td></tr>
<tr><td class="title"><a href="/news3">메모리 시장 회복세</a></td><td class="date">2026.05.12 18:00</td></tr>
</tbody></table>
</body></html>
"""

EMPTY_HTML = "<html><body><table class='type5'><tbody></tbody></table></body></html>"


def _mk_client(status_code=200, text=SAMPLE_HTML):
    client = AsyncMock()
    resp = AsyncMock()
    resp.status_code = status_code
    resp.text = text
    client.get = AsyncMock(return_value=resp)
    return client


@pytest.mark.asyncio
async def test_fetch_news_success_returns_n_items():
    client = _mk_client()
    out = await scraper.fetch_news(client, "005930", n=2)
    assert len(out) == 2
    assert out[0]["title"] == "삼성전자, HBM 양산 가시화"
    assert out[0]["date"] == "2026.05.13 07:30"


@pytest.mark.asyncio
async def test_fetch_news_404_returns_empty():
    client = _mk_client(status_code=404, text="")
    out = await scraper.fetch_news(client, "999999", n=5)
    assert out == []


@pytest.mark.asyncio
async def test_fetch_news_empty_table_returns_empty():
    client = _mk_client(text=EMPTY_HTML)
    out = await scraper.fetch_news(client, "005930", n=5)
    assert out == []


@pytest.mark.asyncio
async def test_fetch_news_n_caps_results():
    client = _mk_client()
    out = await scraper.fetch_news(client, "005930", n=2)
    assert len(out) == 2  # the sample has 3 rows, but n=2 truncates the result
  • Step 3: Confirm the test fails
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest tests/test_ai_news_scraper.py -v

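Expected: FAIL with ImportError ("cannot import name 'scraper' from 'app.screener.ai_news'"), because the package exists but the module does not yet.

  • Step 4: Implement scraper.py
"""Scrape ticker news from Naver Finance."""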

from __future__ import annotations

import logging
from typing import Any, Dict, List

from bs4 import BeautifulSoup

log = logging.getLogger(__name__)

NAVER_NEWS_URL = "https://finance.naver.com/item/news_news.naver"
NAVER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": "https://finance.naver.com/",
}


async def fetch_news(client, ticker: str, n: int = 5) -> List[Dict[str, Any]]:
    """Scrape top N news headlines for a ticker. Returns [] on any failure."""
    try:
        r = await client.get(NAVER_NEWS_URL, params={"code": ticker, "page": 1})
    except Exception as e:
        log.warning("ai_news scrape http error for %s: %s", ticker, e)
        return []
    if r.status_code != 200:
        return []
    soup = BeautifulSoup(r.text, "lxml")
    out: List[Dict[str, Any]] = []
    for row in soup.select("table.type5 tbody tr")[:n]:
        title_el = row.select_one("td.title a")
        date_el = row.select_one("td.date")
        if not title_el or not date_el:
            continue
        out.append({
            "title": title_el.get_text(strip=True),
            "date": date_el.get_text(strip=True),
        })
    return out
  • Step 5: Confirm the tests pass
python -m pytest tests/test_ai_news_scraper.py -v

Expected: PASS — 4 tests passed.

  • Step 6: Commit
git add app/screener/ai_news/__init__.py app/screener/ai_news/scraper.py tests/test_ai_news_scraper.py
git commit -m "feat(screener): ai_news scraper (naver finance ticker news)"
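The four tests above cover the 404 and empty-table paths but not the branch where client.get itself raises. A minimal sketch of that case follows; fetch_failsoft is a hypothetical stand-in that mirrors only the error handling of fetch_news (no HTML parsing), and the URL is a placeholder. In the real test file the same side_effect could be set on the client returned by _mk_client().

```python
import asyncio
from unittest.mock import AsyncMock


async def fetch_failsoft(client, url: str) -> list:
    """Hypothetical stand-in mirroring only fetch_news's error handling."""
    try:
        resp = await client.get(url)
    except Exception:
        return []              # transport error: empty result instead of raising
    if resp.status_code != 200:
        return []              # non-200: also empty
    return [resp.text]


def run_demo() -> list:
    failing = AsyncMock()
    # Awaiting client.get(...) raises the configured exception.
    failing.get = AsyncMock(side_effect=TimeoutError("simulated timeout"))
    return asyncio.run(fetch_failsoft(failing, "https://example.invalid/"))
```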

Task 3: ai_news/analyzer.py (Claude Haiku analysis + tests)

Files:

  • Create: web-backend/stock-lab/app/screener/ai_news/analyzer.py

  • Test: web-backend/stock-lab/tests/test_ai_news_analyzer.py

  • Step 1: Write a failing test

tests/test_ai_news_analyzer.py:

import json
import pytest
from unittest.mock import AsyncMock, MagicMock

from app.screener.ai_news import analyzer


def _mk_llm(content_text: str, in_tokens: int = 100, out_tokens: int = 20):
    llm = AsyncMock()
    resp = MagicMock()
    block = MagicMock()
    block.text = content_text
    resp.content = [block]
    resp.usage = MagicMock(input_tokens=in_tokens, output_tokens=out_tokens)
    llm.messages = MagicMock()
    llm.messages.create = AsyncMock(return_value=resp)
    return llm


NEWS = [{"title": "삼성전자, HBM 양산"}, {"title": "메모리 가격 반등"}]


@pytest.mark.asyncio
async def test_score_sentiment_success_parses_json():
    llm = _mk_llm(json.dumps({"score": 7.5, "reason": "HBM 호재"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS, name="삼성전자")
    assert out["ticker"] == "005930"
    assert out["score_raw"] == 7.5
    assert out["reason"] == "HBM 호재"
    assert out["news_count"] == 2
    assert out["tokens_input"] == 100
    assert out["tokens_output"] == 20


@pytest.mark.asyncio
async def test_score_sentiment_json_parse_fail_returns_zero():
    llm = _mk_llm("not valid json")
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == 0.0
    assert "parse fail" in out["reason"]
    assert out["tokens_input"] == 100  # the call still happened


@pytest.mark.asyncio
async def test_score_sentiment_clamps_out_of_range():
    llm = _mk_llm(json.dumps({"score": 15.0, "reason": "초강세"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == 10.0  # clamped to +10


@pytest.mark.asyncio
async def test_score_sentiment_clamps_negative_out_of_range():
    llm = _mk_llm(json.dumps({"score": -42.0, "reason": "초악재"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == -10.0
  • Step 2: Confirm the test fails
python -m pytest tests/test_ai_news_analyzer.py -v

Expected: FAIL with ImportError ("cannot import name 'analyzer' from 'app.screener.ai_news'").

  • Step 3: Implement analyzer.py
"""Claude Haiku-based bullish/bearish analysis of ticker news."""

from __future__ import annotations

import json
import logging
import os
from typing import Any, Dict, List

log = logging.getLogger(__name__)

DEFAULT_MODEL = os.getenv("AI_NEWS_MODEL", "claude-haiku-4-5-20251001")

PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {n}개의 헤드라인입니다.

{news_block}

이 뉴스들이 종목에 호재인지 악재인지 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거.

JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}"""


def _clamp(x: float, lo: float = -10.0, hi: float = 10.0) -> float:
    return max(lo, min(hi, x))


async def score_sentiment(
    llm,
    ticker: str,
    news: List[Dict[str, Any]],
    *,
    name: str | None = None,
    model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
    news_block = "\n".join(f"- {n['title']}" for n in news)
    prompt = PROMPT_TEMPLATE.format(
        name=name or ticker, ticker=ticker,
        n=len(news), news_block=news_block,
    )
    resp = await llm.messages.create(
        model=model,
        max_tokens=200,
        messages=[{"role": "user", "content": prompt}],
    )
    text = resp.content[0].text if resp.content else ""
    in_tokens = int(getattr(resp.usage, "input_tokens", 0) or 0)
    out_tokens = int(getattr(resp.usage, "output_tokens", 0) or 0)

    try:
        data = json.loads(text)
        score = _clamp(float(data["score"]))
        reason = str(data["reason"])[:200]
        return {
            "ticker": ticker,
            "score_raw": score,
            "reason": reason,
            "news_count": len(news),
            "tokens_input": in_tokens,
            "tokens_output": out_tokens,
            "model": model,
        }
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
        log.warning("ai_news parse fail for %s: %s (raw=%r)", ticker, e, text[:100])
        return {
            "ticker": ticker,
            "score_raw": 0.0,
            "reason": f"parse fail: {e!s}"[:200],
            "news_count": len(news),
            "tokens_input": in_tokens,
            "tokens_output": out_tokens,
            "model": model,
        }
  • Step 4: Confirm the tests pass
python -m pytest tests/test_ai_news_analyzer.py -v

Expected: PASS — 4 tests passed.

  • Step 5: Commit
git add app/screener/ai_news/analyzer.py tests/test_ai_news_analyzer.py
git commit -m "feat(screener): ai_news Claude Haiku analyzer (-10~+10 + clamp + JSON-fail soft)"
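The parse, clamp, and fallback contract of score_sentiment can be exercised without an LLM. The helper below, parse_score, is a hypothetical extraction of that logic for illustration and is not part of the plan's module. It also shows that a reply wrapped in a code fence is treated as a parse failure and scored 0.0, which is indistinguishable from a neutral score unless the reason is inspected.

```python
import json


def parse_score(text: str) -> tuple:
    """Return (score, reason) with the same clamp and fallback rules as the analyzer."""
    try:
        data = json.loads(text)
        score = max(-10.0, min(10.0, float(data["score"])))  # clamp to [-10, +10]
        return score, str(data["reason"])[:200]
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as exc:
        # Any malformed reply degrades to a neutral score with a diagnostic reason.
        return 0.0, f"parse fail: {exc!s}"[:200]


in_range = parse_score('{"score": 7.5, "reason": "ok"}')
too_high = parse_score('{"score": 15, "reason": "strong"}')
fenced = parse_score('```json\n{"score": 3, "reason": "x"}\n```')
missing_key = parse_score('{"score": 3}')
```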

Task 4: ai_news/pipeline.py (refresh_daily integration + tests)

Files:

  • Create: web-backend/stock-lab/app/screener/ai_news/pipeline.py

  • Test: web-backend/stock-lab/tests/test_ai_news_pipeline.py

  • Step 1: Write a failing test

tests/test_ai_news_pipeline.py:

import datetime as dt
import sqlite3
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from app.screener.ai_news import pipeline
from app.screener.schema import ensure_screener_schema


@pytest.fixture
def conn():
    c = sqlite3.connect(":memory:")
    c.row_factory = sqlite3.Row
    ensure_screener_schema(c)
    # Seed the top 3 tickers by market cap
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("005930", "삼성전자", 9_000_000))
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("000660", "SK하이닉스", 8_000_000))
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("373220", "LG에너지솔루션", 7_000_000))
    c.commit()
    yield c
    c.close()


@pytest.mark.asyncio
async def test_refresh_daily_happy_path(conn):
    """3-ticker mini integration: scraper/analyzer mocked per ticker."""
    asof = dt.date(2026, 5, 13)
    fake_news = [{"title": "헤드라인"}]
    
    async def fake_fetch(client, ticker, n):
        return fake_news
    
    scores_by_ticker = {
        "005930": 7.5, "000660": 4.0, "373220": -6.0,
    }
    async def fake_score(llm, ticker, news, *, name=None, model="m"):
        return {
            "ticker": ticker, "score_raw": scores_by_ticker[ticker],
            "reason": f"r{ticker}", "news_count": 1,
            "tokens_input": 100, "tokens_output": 20, "model": model,
        }
    
    with patch.object(pipeline, "_scraper") as ms, \
         patch.object(pipeline, "_analyzer") as ma, \
         patch.object(pipeline, "_make_llm") as ml, \
         patch.object(pipeline, "_make_http") as mh:
        ms.fetch_news = fake_fetch
        ma.score_sentiment = fake_score
        ml.return_value.__aenter__.return_value = AsyncMock()
        ml.return_value.__aexit__.return_value = None
        mh.return_value.__aenter__.return_value = AsyncMock()
        mh.return_value.__aexit__.return_value = None
        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
    
    assert result["asof"] == "2026-05-13"
    assert result["updated"] == 3
    assert result["failures"] == []
    assert len(result["top_pos"]) == 3
    assert result["top_pos"][0]["ticker"] == "005930"  # highest score
    assert result["top_neg"][0]["ticker"] == "373220"  # lowest score
    assert result["tokens_input"] == 300
    assert result["tokens_output"] == 60
    
    # Verify the DB upsert
    rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
                        ("2026-05-13",)).fetchall()
    assert len(rows) == 3
    by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
    assert by_ticker["005930"] == 7.5
    assert by_ticker["373220"] == -6.0


@pytest.mark.asyncio
async def test_refresh_daily_failures_isolated(conn):
    """One ticker raising an exception does not stop the others from being processed."""
    asof = dt.date(2026, 5, 13)
    
    async def fake_fetch(client, ticker, n):
        return [{"title": "h"}]
    
    async def fake_score(llm, ticker, news, *, name=None, model="m"):
        if ticker == "000660":
            raise RuntimeError("llm exploded")
        return {
            "ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1,
            "tokens_input": 100, "tokens_output": 20, "model": model,
        }
    
    with patch.object(pipeline, "_scraper") as ms, \
         patch.object(pipeline, "_analyzer") as ma, \
         patch.object(pipeline, "_make_llm") as ml, \
         patch.object(pipeline, "_make_http") as mh:
        ms.fetch_news = fake_fetch
        ma.score_sentiment = fake_score
        ml.return_value.__aenter__.return_value = AsyncMock()
        ml.return_value.__aexit__.return_value = None
        mh.return_value.__aenter__.return_value = AsyncMock()
        mh.return_value.__aexit__.return_value = None
        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
    
    assert result["updated"] == 2
    assert len(result["failures"]) == 1


def test_top_market_cap_tickers(conn):
    out = pipeline._top_market_cap_tickers(conn, n=2)
    assert out == ["005930", "000660"]
  • Step 2: Confirm the test fails
python -m pytest tests/test_ai_news_pipeline.py -v

Expected: FAIL with ImportError ("cannot import name 'pipeline' from 'app.screener.ai_news'").

  • Step 3: Implement pipeline.py
"""ai_news refresh pipeline: parallel processing of the top N tickers by market cap."""

from __future__ import annotations

import asyncio
import datetime as dt
import logging
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional

import httpx

from . import scraper as _scraper
from . import analyzer as _analyzer

log = logging.getLogger(__name__)

DEFAULT_TOP_N = 100
DEFAULT_CONCURRENCY = 10
DEFAULT_NEWS_PER_TICKER = 5
DEFAULT_RATE_LIMIT_SEC = 0.2


def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
    rows = conn.execute(
        "SELECT ticker FROM krx_master "
        "WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
        "ORDER BY market_cap DESC LIMIT ?",
        (n,),
    ).fetchall()
    return [r[0] for r in rows]


def _make_http():
    return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)


def _make_llm():
    """Anthropic async client; requires ANTHROPIC_API_KEY in the environment."""
    from anthropic import AsyncAnthropic
    return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


async def _process_one(
    ticker: str, name: str, sem: asyncio.Semaphore,
    http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
) -> Dict[str, Any]:
    async with sem:
        if rate_limit_sec > 0:
            await asyncio.sleep(rate_limit_sec)
        news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
        if not news:
            return {
                "ticker": ticker, "score_raw": 0.0, "reason": "no news",
                "news_count": 0, "tokens_input": 0, "tokens_output": 0,
                "model": model,
            }
        return await _analyzer.score_sentiment(
            llm, ticker, news, name=name, model=model,
        )


def _upsert_news_sentiment(
    conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
) -> None:
    iso = asof.isoformat()
    data = [
        (
            r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
            r["tokens_input"], r["tokens_output"], r["model"],
        )
        for r in rows
    ]
    conn.executemany(
        """INSERT INTO news_sentiment
             (ticker, date, score_raw, reason, news_count,
              tokens_input, tokens_output, model)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)
           ON CONFLICT(ticker, date) DO UPDATE SET
             score_raw=excluded.score_raw,
             reason=excluded.reason,
             news_count=excluded.news_count,
             tokens_input=excluded.tokens_input,
             tokens_output=excluded.tokens_output,
             model=excluded.model
        """,
        data,
    )
    conn.commit()


async def refresh_daily(
    conn: sqlite3.Connection,
    asof: dt.date,
    *,
    top_n: int = DEFAULT_TOP_N,
    concurrency: int = DEFAULT_CONCURRENCY,
    news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
    rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
    model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Returns summary dict with top_pos/top_neg/token totals/failures."""
    started = time.time()
    tickers = _top_market_cap_tickers(conn, n=top_n)
    name_map = {
        r[0]: r[1] for r in conn.execute(
            f"SELECT ticker, name FROM krx_master WHERE ticker IN "
            f"({','.join('?' * len(tickers))})", tickers,
        ).fetchall()
    } if tickers else {}

    sem = asyncio.Semaphore(concurrency)

    async with _make_http() as http_client, _make_llm() as llm:
        tasks = [
            _process_one(
                t, name_map.get(t, t), sem, http_client, llm,
                news_per_ticker, rate_limit_sec, model,
            )
            for t in tickers
        ]
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)

    successes: List[Dict[str, Any]] = []
    failures: List[str] = []
    for r in raw_results:
        if isinstance(r, BaseException):
            failures.append(repr(r))
        elif isinstance(r, dict):
            successes.append(r)

    if successes:
        _upsert_news_sentiment(conn, asof, successes)

    top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
    top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]

    return {
        "asof": asof.isoformat(),
        "updated": len(successes),
        "failures": failures,
        "duration_sec": round(time.time() - started, 2),
        "tokens_input": sum(r["tokens_input"] for r in successes),
        "tokens_output": sum(r["tokens_output"] for r in successes),
        "top_pos": top_pos,
        "top_neg": top_neg,
        "model": model,
    }
  • Step 4: Confirm the tests pass
python -m pytest tests/test_ai_news_pipeline.py -v

Expected: PASS — 3 tests passed.

  • Step 5: Commit
git add app/screener/ai_news/pipeline.py tests/test_ai_news_pipeline.py
git commit -m "feat(screener): ai_news pipeline (top-100 parallel, fail-soft, upsert)"
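The failure isolation in refresh_daily rests on two standard asyncio pieces: a Semaphore that bounds concurrency, and gather(..., return_exceptions=True), which returns exceptions as values instead of cancelling sibling tasks. A stripped-down sketch follows; _work and run_all are hypothetical names, and the sleep stands in for the scrape and LLM call.

```python
import asyncio


async def _work(ticker: str, sem: asyncio.Semaphore) -> dict:
    async with sem:                 # at most `concurrency` bodies run at once
        await asyncio.sleep(0)      # stand-in for the scrape + LLM call
        if ticker == "000660":
            raise RuntimeError("llm exploded")
        return {"ticker": ticker, "score_raw": 5.0}


async def run_all(tickers: list, concurrency: int = 3) -> tuple:
    sem = asyncio.Semaphore(concurrency)
    raw = await asyncio.gather(
        *(_work(t, sem) for t in tickers),
        return_exceptions=True,     # exceptions come back as results, in input order
    )
    successes = [r for r in raw if isinstance(r, dict)]
    failures = [repr(r) for r in raw if isinstance(r, BaseException)]
    return successes, failures


successes, failures = asyncio.run(run_all(["005930", "000660", "373220"]))
```

Because gather preserves input order, the results line up with the ticker list, although the pipeline only needs the success/failure split.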

Task 5: ai_news/telegram.py (Top 5/5 message builder + tests)

Files:

  • Create: web-backend/stock-lab/app/screener/ai_news/telegram.py

  • Test: web-backend/stock-lab/tests/test_ai_news_telegram.py

  • Step 1: Write a failing test

from app.screener.ai_news import telegram as tg


def _row(ticker, score, reason="r"):
    return {"ticker": ticker, "score_raw": score, "reason": reason,
            "news_count": 5, "tokens_input": 100, "tokens_output": 20,
            "model": "m"}


def test_build_message_includes_top_sections():
    msg = tg.build_message(
        asof="2026-05-13",
        top_pos=[_row("005930", 8.5, "HBM 호재")],
        top_neg=[_row("373220", -6.3, "수주 지연")],
        tokens_input=10000, tokens_output=2000,
    )
    assert "AI 뉴스 분석" in msg
    assert "호재 Top" in msg
    assert "악재 Top" in msg
    assert "005930" in msg
    assert "8.5" in msg
    assert "HBM" in msg
    assert "373220" in msg


def test_build_message_escapes_markdownv2_specials():
    msg = tg.build_message(
        asof="2026-05-13",
        top_pos=[_row("005930", 3.0, "테스트(괄호) [대괄호]")],
        top_neg=[],
        tokens_input=100, tokens_output=20,
    )
    # MarkdownV2 special characters ( ) [ ] must be escaped
    assert r"\(" in msg or r"\)" in msg
    assert r"\[" in msg or r"\]" in msg


def test_build_message_cost_won_line():
    msg = tg.build_message(
        asof="2026-05-13", top_pos=[], top_neg=[],
        tokens_input=10000, tokens_output=2000,
    )
    # tokens_input × 0.0013 + tokens_output × 0.0065 = 13 + 13 = ₩26
    assert "₩26" in msg or "₩ 26" in msg or "₩" in msg


def test_build_message_empty_lists():
    msg = tg.build_message(
        asof="2026-05-13", top_pos=[], top_neg=[],
        tokens_input=0, tokens_output=0,
    )
    # The headers must be present even when the lists are empty
    assert "호재 Top" in msg
    assert "악재 Top" in msg
  • Step 2: Confirm the test fails
python -m pytest tests/test_ai_news_telegram.py -v

Expected: FAIL with ImportError ("cannot import name 'telegram' from 'app.screener.ai_news'").

  • Step 3: Implement telegram.py
"""ai_news Top 5/5 Telegram message builder (MarkdownV2)."""

from __future__ import annotations

from typing import Any, Dict, List


_MD_SPECIAL = r"_*[]()~`>#+-=|{}.!\\"


def _escape(text: str) -> str:
    return "".join("\\" + c if c in _MD_SPECIAL else c for c in str(text))


def _cost_won(tokens_input: int, tokens_output: int) -> int:
    """Approximate Claude Haiku cost in KRW: input $1/M × ₩1300, output $5/M × ₩1300."""
    return int(tokens_input * 0.0013 + tokens_output * 0.0065)


def _row_line(idx: int, r: Dict[str, Any]) -> str:
    score = r["score_raw"]
    sign = "+" if score >= 0 else ""
    return (
        f"{idx}\\. {_escape(r['ticker'])} \\({sign}{score:.1f}\\) — "
        f"{_escape(r['reason'])}"
    )


def build_message(
    *,
    asof: str,
    top_pos: List[Dict[str, Any]],
    top_neg: List[Dict[str, Any]],
    tokens_input: int,
    tokens_output: int,
) -> str:
    lines: List[str] = [
        f"🌅 *AI 뉴스 분석* \\({_escape(asof)} 08:00\\)",
        "",
        "📈 *호재 Top 5*",
    ]
    if top_pos:
        for i, r in enumerate(top_pos, 1):
            lines.append(_row_line(i, r))
    else:
        lines.append(_escape("- (없음)"))

    lines += ["", "📉 *악재 Top 5*"]
    if top_neg:
        for i, r in enumerate(top_neg, 1):
            lines.append(_row_line(i, r))
    else:
        lines.append(_escape("- (없음)"))

    cost = _cost_won(tokens_input, tokens_output)
    lines += [
        "",
        f"_분석: 시총 상위 100종목 · 토큰 {tokens_input:,} in / {tokens_output:,} out · "
        f"약 ₩{cost:,}_",
    ]
    return "\n".join(lines)
  • Step 4: Confirm the tests pass
python -m pytest tests/test_ai_news_telegram.py -v

Expected: PASS — 4 tests passed.

  • Step 5: Commit
git add app/screener/ai_news/telegram.py tests/test_ai_news_telegram.py
git commit -m "feat(screener): ai_news telegram message builder (MarkdownV2 + cost line)"
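One detail worth checking before the message is sent: Telegram's MarkdownV2 mode reserves '.', '+', and '-' along with the bracket characters, and _row_line interpolates the formatted score without passing it through _escape. If agent-office sends with parse_mode set to MarkdownV2, a score such as +8.5 may be rejected. The sketch below shows one way to escape the numeric cell as well; escape_md2 and score_cell are hypothetical names, and adopting this would also require the test that asserts "8.5" in msg to expect the escaped form.

```python
# Characters that MarkdownV2 requires to be backslash-escaped outside code spans.
MD_SPECIAL = set("_*[]()~`>#+-=|{}.!\\")


def escape_md2(text: str) -> str:
    """Prefix every reserved MarkdownV2 character with a backslash."""
    return "".join("\\" + c if c in MD_SPECIAL else c for c in str(text))


def score_cell(score: float) -> str:
    """Format a signed score to one decimal place and escape it for MarkdownV2."""
    sign = "+" if score >= 0 else ""
    return escape_md2(f"{sign}{score:.1f}")


positive = score_cell(8.5)    # backslash before '+' and '.'
negative = score_cell(-6.3)   # backslash before '-' and '.'
```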

Task 6: nodes/ai_news.py (ScoreNode integration + tests)

Files:

  • Create: web-backend/stock-lab/app/screener/nodes/ai_news.py

  • Test: web-backend/stock-lab/tests/test_ai_news_node.py

  • Step 1: Write a failing test

import datetime as dt
import pandas as pd
import pytest
from app.screener.nodes.ai_news import AiNewsSentiment


class FakeCtx:
    def __init__(self, df=None):
        self.news_sentiment = df
        self.asof = dt.date(2026, 5, 13)


def test_compute_empty_context():
    out = AiNewsSentiment().compute(FakeCtx(None), {"min_news_count": 1})
    assert out.empty


def test_compute_with_data_percentile_ranks():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": -5.0, "news_count": 3},
        {"ticker": "B", "score_raw":  0.0, "news_count": 3},
        {"ticker": "C", "score_raw":  8.0, "news_count": 3},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert len(out) == 3
    # percentile rank: A (lowest) < B < C (highest)
    assert out.loc["A"] < out.loc["B"] < out.loc["C"]
    # all within [0, 100]
    assert (out >= 0).all() and (out <= 100).all()


def test_compute_filters_by_min_news_count():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": -5.0, "news_count": 0},  # filtered out
        {"ticker": "B", "score_raw":  0.0, "news_count": 2},
        {"ticker": "C", "score_raw":  8.0, "news_count": 5},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert "A" not in out.index
    assert "B" in out.index
    assert "C" in out.index


def test_compute_all_filtered_returns_empty():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": 5.0, "news_count": 0},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert out.empty


def test_metadata():
    n = AiNewsSentiment()
    assert n.name == "ai_news"
    assert "AI" in n.label or "뉴스" in n.label
    assert n.default_params == {"min_news_count": 1}
    assert "min_news_count" in n.param_schema["properties"]
  • Step 2: Confirm the test fails
python -m pytest tests/test_ai_news_node.py -v

Expected: FAIL with "No module named 'app.screener.nodes.ai_news'".

  • Step 3: Implement nodes/ai_news.py
"""AI news bullish/bearish score node.

Filters ScreenContext.news_sentiment (DataFrame: ticker, score_raw, news_count)
by min_news_count, then converts it to 0-100 with percentile_rank.
"""

from __future__ import annotations

import pandas as pd

from .base import ScoreNode, percentile_rank


class AiNewsSentiment(ScoreNode):
    name  = "ai_news"
    label = "AI 뉴스 호재/악재"
    default_params = {"min_news_count": 1}
    param_schema = {
        "type": "object",
        "properties": {
            "min_news_count": {
                "type": "integer", "minimum": 0, "default": 1,
                "description": "최소 분석 뉴스 수. 미만이면 점수 미산출.",
            },
        },
    }

    def compute(self, ctx, params: dict) -> pd.Series:
        df = getattr(ctx, "news_sentiment", None)
        if df is None or df.empty:
            return pd.Series(dtype=float)
        min_news = int(params.get("min_news_count", 1))
        df = df[df["news_count"] >= min_news]
        if df.empty:
            return pd.Series(dtype=float)
        return percentile_rank(df.set_index("ticker")["score_raw"])
  • Step 4: Confirm the tests pass
python -m pytest tests/test_ai_news_node.py -v

Expected: PASS — 5 tests passed.

  • Step 5: Commit
git add app/screener/nodes/ai_news.py tests/test_ai_news_node.py
git commit -m "feat(screener): AiNewsSentiment ScoreNode (percentile_rank + min_news_count)"
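The node delegates the 0-100 conversion to percentile_rank from .base, whose implementation is not shown in this plan. As a rough mental model only, the pure-Python sketch below assumes an average-rank definition (ties share a value, the highest score maps to 100); the real helper may differ in tie handling or scaling.

```python
def percentile_rank(scores: dict) -> dict:
    """Map raw scores to (0, 100] by average rank; ties share the same value.

    Assumed definition for illustration, not the project's .base helper.
    """
    values = list(scores.values())
    n = len(values)
    out = {}
    for key, v in scores.items():
        below = sum(1 for x in values if x < v)
        equal = sum(1 for x in values if x == v)
        avg_rank = below + (equal + 1) / 2   # 1-based average rank among ties
        out[key] = 100.0 * avg_rank / n
    return out


ranks = percentile_rank({"A": -5.0, "B": 0.0, "C": 8.0})
tied = percentile_rank({"X": 1.0, "Y": 1.0})
```

Under this definition a lone ticker always maps to 100, so the min_news_count filter matters most on days when few tickers have news.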

Task 7: Update ScreenContext + load in engine.py

Files:

  • Modify: web-backend/stock-lab/app/screener/engine.py

  • Step 1: Add a news_sentiment field to the ScreenContext dataclass

Add as the last line of the @dataclass(frozen=True) class ScreenContext: body (the line after asof: dt.date):

    news_sentiment: "pd.DataFrame | None" = None

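Dataclass fields with default values must come after fields without defaults (this is a general dataclass rule, independent of frozen), so place it last, after asof.

  • Step 2: Load news_sentiment inside the load method

In load, add the following after flow = pd.read_sql_query(...) and before the KOSPI handling: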

        news_sentiment = pd.read_sql_query(
            "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
            conn, params=(asof_iso,),
        )

Add the news_sentiment=news_sentiment argument to the return cls(...) call:

        return cls(
            master=master, prices=prices, flow=flow,
            kospi=kospi, asof=asof, news_sentiment=news_sentiment,
        )
  • Step 3: Run the existing integration tests as a regression check
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest app/test_screener_context.py -v

Expected: all existing tests PASS (default=None keeps existing callers compatible).

  • Step 4: Commit
git add app/screener/engine.py
git commit -m "feat(screener): ScreenContext.news_sentiment field + load query"
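The field-ordering constraint from Step 1 can be reproduced with a toy class. MiniContext and try_bad_order below are hypothetical names used only for this sketch; the real ScreenContext has more fields and uses a DataFrame rather than a list.

```python
import dataclasses
import datetime as dt
from typing import Optional


@dataclasses.dataclass(frozen=True)
class MiniContext:
    asof: dt.date                          # required field, no default
    news_sentiment: Optional[list] = None  # defaulted field goes after required ones


def try_bad_order() -> str:
    """Declaring a required field after a defaulted one fails at class creation."""
    try:
        dataclasses.make_dataclass(
            "Bad",
            [
                ("news_sentiment", Optional[list], dataclasses.field(default=None)),
                ("asof", dt.date),
            ],
            frozen=True,
        )
    except TypeError as exc:
        return type(exc).__name__
    return "ok"


ctx = MiniContext(asof=dt.date(2026, 5, 13))
```

Existing call sites that construct the context without news_sentiment keep working because the new field falls back to None.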

Task 8: Register ai_news in registry.py

Files:

  • Modify: web-backend/stock-lab/app/screener/registry.py

  • Step 1: Add the import + NODE_REGISTRY entry

Add to the imports at the top:

from .nodes.ai_news      import AiNewsSentiment

Add after the last entry of the NODE_REGISTRY dictionary:

    "ai_news":      AiNewsSentiment,

Full result:

"""Registry of node classes (single source of truth for /nodes endpoint)."""

from .nodes.hygiene      import HygieneGate
from .nodes.foreign_buy  import ForeignBuy
from .nodes.volume_surge import VolumeSurge
from .nodes.momentum     import Momentum20
from .nodes.high52w      import High52WProximity
from .nodes.rs_rating    import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite     import VcpLite
from .nodes.ai_news      import AiNewsSentiment

NODE_REGISTRY: dict = {
    "foreign_buy":  ForeignBuy,
    "volume_surge": VolumeSurge,
    "momentum":     Momentum20,
    "high52w":      High52WProximity,
    "rs_rating":    RsRating,
    "ma_alignment": MaAlignment,
    "vcp_lite":     VcpLite,
    "ai_news":      AiNewsSentiment,
}

GATE_REGISTRY: dict = {
    "hygiene": HygieneGate,
}
  • Step 2: Sanity-check the import
python -c "from app.screener.registry import NODE_REGISTRY; print(list(NODE_REGISTRY))"

Expected: ['foreign_buy', 'volume_surge', 'momentum', 'high52w', 'rs_rating', 'ma_alignment', 'vcp_lite', 'ai_news']

  • Step 3: Commit
git add app/screener/registry.py
git commit -m "feat(screener): register ai_news in NODE_REGISTRY"

Task 9: Add /snapshot/refresh-news-sentiment to router.py

Files:

  • Modify: web-backend/stock-lab/app/screener/router.py

  • Step 1: Add the route

Append to the end of router.py (below the other routes):

# ---------- /snapshot/refresh-news-sentiment ----------

from . import ai_news as _ai_news_pkg


@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
    asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
    if asof_date.weekday() >= 5:
        return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
    if _is_holiday(asof_date):
        return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
    with _conn() as c:
        summary = await _ai_news_pkg.pipeline.refresh_daily(c, asof_date)
    return summary
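
The weekend/holiday guard is easy to check in isolation. In this sketch `HOLIDAYS` is a hypothetical one-entry table standing in for the `_is_holiday` helper, which this plan does not show.

```python
import datetime as dt
from typing import Optional

# Hypothetical holiday table (Children's Day); the real _is_holiday is not shown here.
HOLIDAYS = {dt.date(2026, 5, 5)}


def skip_reason(asof: dt.date, holidays: set = HOLIDAYS) -> Optional[str]:
    """Return the status string the route would respond with, or None to proceed."""
    if asof.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return "skipped_weekend"
    if asof in holidays:
        return "skipped_holiday"
    return None


saturday = skip_reason(dt.date(2026, 5, 16))
holiday = skip_reason(dt.date(2026, 5, 5))
weekday = skip_reason(dt.date(2026, 5, 13))
```

Note that `dt.date.today()` follows the container's local time zone, so the guard only lines up with the 08:00 KST cron if the stock-lab container runs in KST (an assumption worth confirming at deploy time).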
  • Step 2: Verify the ai_news package is importable from the router
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -c "from app.screener.router import router; print([r.path for r in router.routes])"

Expected: the output includes /api/stock/screener/snapshot/refresh-news-sentiment.

  • Step 3: Router tests (mocked pipeline)

Create tests/test_ai_news_router.py:

import datetime as dt
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient

from app.main import app


def test_refresh_news_sentiment_weekend_skip():
    # 2026-05-16 = Saturday
    client = TestClient(app)
    resp = client.post(
        "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-16"
    )
    assert resp.status_code == 200
    assert resp.json()["status"] == "skipped_weekend"


def test_refresh_news_sentiment_weekday_invokes_pipeline():
    fake_summary = {
        "asof": "2026-05-13", "updated": 3, "failures": [],
        "duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
        "top_pos": [], "top_neg": [], "model": "m",
    }
    with patch("app.screener.router._ai_news_pkg") as m:
        m.pipeline.refresh_daily = AsyncMock(return_value=fake_summary)
        client = TestClient(app)
        resp = client.post(
            "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
        )
    assert resp.status_code == 200
    body = resp.json()
    assert body["asof"] == "2026-05-13"
    assert body["updated"] == 3
python -m pytest tests/test_ai_news_router.py -v

Expected: PASS — 2 tests passed.

  • Step 4: Commit
git add app/screener/router.py tests/test_ai_news_router.py
git commit -m "feat(screener): POST /snapshot/refresh-news-sentiment endpoint"

Task 10: agent-office service_proxy.refresh_ai_news_sentiment()

Files:

  • Modify: web-backend/agent-office/app/service_proxy.py

  • Step 1: Add the helper function

Add right after the refresh_screener_snapshot function definition (around line 44):

async def refresh_ai_news_sentiment() -> Dict[str, Any]:
    """Trigger stock-lab's AI news sentiment analysis (08:00 cron).

    Scraping Naver for 100 tickers plus 100 parallel Claude Haiku calls takes
    about 30-60 s, so the 240 s timeout leaves generous headroom.
    """
    async with httpx.AsyncClient(timeout=240.0) as client:
        resp = await client.post(
            f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh-news-sentiment"
        )
        resp.raise_for_status()
        return resp.json()
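
The docstring above budgets for 100 parallel LLM calls. The pipeline itself is defined in an earlier task, so the following is only a sketch of one common way to run such calls with a concurrency cap while collecting per-ticker failures instead of aborting. `analyze_all`, `fake_analyze` and the cap of 10 are hypothetical names and values.

```python
import asyncio


async def analyze_all(tickers, analyze_one, max_concurrency: int = 10):
    """Run analyze_one per ticker under a concurrency cap; never raise per-ticker errors."""
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(ticker):
        async with sem:
            return await analyze_one(ticker)

    # return_exceptions=True turns each failure into a result value.
    results = await asyncio.gather(
        *(guarded(t) for t in tickers), return_exceptions=True
    )
    updated, failures = {}, []
    for ticker, res in zip(tickers, results):
        if isinstance(res, Exception):
            failures.append({"ticker": ticker, "error": str(res)})
        else:
            updated[ticker] = res
    return updated, failures


async def fake_analyze(ticker: str) -> float:
    # Stand-in for one LLM call; "BAD" simulates a failing ticker.
    await asyncio.sleep(0)
    if ticker == "BAD":
        raise RuntimeError("LLM timeout")
    return 1.0


updated, failures = asyncio.run(analyze_all(["A", "BAD", "B"], fake_analyze))
```

With this shape a single failing ticker shows up in `failures` while the others still land in `updated`, which matches the `updated` / `failures` fields the agent-office handler reads from the response.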
  • Step 2: Verify the import
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.service_proxy import refresh_ai_news_sentiment; print('ok')"

Expected: ok.

  • Step 3: Commit
git add app/service_proxy.py
git commit -m "feat(agent-office): refresh_ai_news_sentiment service helper"

Task 11: agent-office StockAgent.on_ai_news_schedule

Files:

  • Modify: web-backend/agent-office/app/agents/stock.py

  • Step 1: Add the method

Add the following method right before the on_command method definition:

    async def on_ai_news_schedule(self) -> None:
        """Scheduled AI news sentiment job (weekdays 08:00 KST).

        Flow:
          1) Call stock-lab /snapshot/refresh-news-sentiment
          2) status='skipped_weekend'/'skipped_holiday' → stop (no Telegram message)
          3) updated=0 → operator alert (HTML)
          4) failure rate > 30% → warning alert, then the main message is still sent
          5) normal → send the Top 5 positive/negative message (MarkdownV2)
        """
        if self.state not in ("idle", "break"):
            return

        task_id = create_task(self.agent_id, "ai_news_sentiment", {})
        await self.transition("working", "AI 뉴스 분석 중...", task_id)

        try:
            result = await service_proxy.refresh_ai_news_sentiment()
        except Exception as e:
            err_msg = str(e)
            add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id)
            update_task_status(task_id, "failed", {"error": err_msg})
            try:
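                from ..telegram.messaging import send_raw
                # Truncate before escaping so an HTML entity is never cut in half
                # (requires `import html` at the top of stock.py).
                await send_raw(
                    f"⚠️ <b>AI 뉴스 분석 실패</b>\n"
                    f"<code>{html.escape(err_msg[:500])}</code>"
                )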
            except Exception as notify_err:
                add_log(
                    self.agent_id,
                    f"operator notify failed: {notify_err}",
                    "warning", task_id,
                )
            await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}")
            return

        status = result.get("status")
        if status in ("skipped_weekend", "skipped_holiday"):
            update_task_status(task_id, "succeeded", {"status": status})
            add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id)
            await self.transition("idle", "휴일/주말 — 건너뜀")
            return

        updated = int(result.get("updated", 0))
        failures = result.get("failures", []) or []
        if updated == 0:
            update_task_status(task_id, "failed", {"reason": "0 tickers updated"})
            try:
                from ..telegram.messaging import send_raw
                await send_raw(
                    "⚠️ <b>AI 뉴스 분석 0종목</b>\n"
                    "스크래핑/LLM 전체 실패 — 어제 데이터 사용"
                )
            except Exception:
                pass
            await self.transition("idle", "AI 뉴스 0건")
            return

        # Failure-rate warning (separate alert; the main message is still sent)
        failure_rate = len(failures) / max(1, updated + len(failures))
        if failure_rate > 0.3:
            try:
                from ..telegram.messaging import send_raw
                await send_raw(
                    f"⚠️ <b>AI 뉴스 실패율 {failure_rate:.0%}</b>\n"
                    f"updated={updated}, failures={len(failures)}"
                )
            except Exception:
                pass

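        # Normal path: Top 5 message (built by stock-lab, shipped as telegram_text)
        text = result.get("telegram_text") or ""
        if not text:
            # Fail the task instead of raising, so the agent is not left stuck in
            # "working" (which would make the next cron run return early).
            update_task_status(task_id, "failed", {"reason": "telegram_text missing"})
            add_log(self.agent_id, "telegram_text missing in response", "error", task_id)
            await self.transition("idle", "AI 뉴스 telegram_text 누락")
            return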

        await self.transition("reporting", "AI 뉴스 알림 전송 중...")
        from ..telegram.messaging import send_raw
        tg = await send_raw(text, parse_mode="MarkdownV2")

        update_task_status(task_id, "succeeded", {
            "asof": result["asof"],
            "updated": updated,
            "failures": len(failures),
            "tokens_input": int(result.get("tokens_input", 0)),
            "tokens_output": int(result.get("tokens_output", 0)),
            "telegram_sent": tg.get("ok", False),
        })

        if not tg.get("ok"):
            desc = tg.get("description") or "unknown"
            code = tg.get("error_code")
            add_log(
                self.agent_id,
                f"AI news telegram send failed: [{code}] {desc}",
                "warning", task_id,
            )

        await self.transition("idle", "AI 뉴스 완료")

Design decision: agent-office cannot import stock-lab modules directly (they run in separate containers). stock-lab therefore ships a telegram_text field in its API response, and agent-office simply forwards it.
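
The branching in `on_ai_news_schedule` can be condensed into a pure function, which makes the 30% threshold easy to check. This is an illustrative sketch, not part of the plan's code; `classify_result` and the action labels are hypothetical.

```python
def classify_result(result: dict, warn_threshold: float = 0.3) -> list[str]:
    """Return the ordered actions the handler takes for a stock-lab response."""
    status = result.get("status")
    if status in ("skipped_weekend", "skipped_holiday"):
        return ["skip"]
    updated = int(result.get("updated", 0))
    failures = result.get("failures") or []
    if updated == 0:
        return ["alert_zero"]
    actions = []
    # Same formula as the handler: failures over all attempted tickers.
    failure_rate = len(failures) / max(1, updated + len(failures))
    if failure_rate > warn_threshold:
        actions.append("warn_failure_rate")
    actions.append("send_top5")
    return actions


skipped = classify_result({"status": "skipped_weekend"})
zero = classify_result({"updated": 0, "failures": ["x"]})
noisy = classify_result({"updated": 60, "failures": ["x"] * 40})
borderline = classify_result({"updated": 70, "failures": ["x"] * 30})
```

The comparison is strict, so a run with exactly 30% failures sends only the main message without the warning.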

  • Step 2: Make the stock-lab router include telegram_text in its response

Modify post_refresh_news_sentiment in web-backend/stock-lab/app/screener/router.py:

@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
    asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
    if asof_date.weekday() >= 5:
        return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
    if _is_holiday(asof_date):
        return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
    with _conn() as c:
        summary = await _ai_news_pkg.pipeline.refresh_daily(c, asof_date)
    # Attach the Telegram text
    summary["telegram_text"] = _ai_news_pkg.telegram.build_message(
        asof=summary["asof"],
        top_pos=summary["top_pos"], top_neg=summary["top_neg"],
        tokens_input=summary["tokens_input"],
        tokens_output=summary["tokens_output"],
    )
    return summary

The on_ai_news_schedule code from Step 1 already reads the text from result["telegram_text"] instead of calling build_message(...) locally, so agent-office needs no further change here. The relevant lines:

        # Normal path: Top 5 message (built by stock-lab and shipped in the response)
        text = result.get("telegram_text") or ""
  • Step 3: Verify the import
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.agents.stock import StockAgent; print(hasattr(StockAgent, 'on_ai_news_schedule'))"

Expected: True.

  • Step 4: Re-run the Task 9 router test (verify telegram_text in the response)

Extend test_refresh_news_sentiment_weekday_invokes_pipeline in tests/test_ai_news_router.py:

def test_refresh_news_sentiment_weekday_invokes_pipeline():
    fake_summary = {
        "asof": "2026-05-13", "updated": 3, "failures": [],
        "duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
        "top_pos": [], "top_neg": [], "model": "m",
    }
    with patch("app.screener.router._ai_news_pkg") as m:
        m.pipeline.refresh_daily = AsyncMock(return_value=fake_summary)
        m.telegram.build_message = lambda **kw: "BUILT_TEXT"
        client = TestClient(app)
        resp = client.post(
            "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
        )
    assert resp.status_code == 200
    body = resp.json()
    assert body["telegram_text"] == "BUILT_TEXT"
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest tests/test_ai_news_router.py -v

Expected: PASS.

  • Step 5: Commit (both sub-packages)
# stock-lab
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
git add app/screener/router.py tests/test_ai_news_router.py
git commit -m "feat(screener): include telegram_text in refresh-news-sentiment response"

# agent-office
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
git add app/agents/stock.py
git commit -m "feat(agent-office): on_ai_news_schedule (cron handler + telegram dispatch)"

Note: from this task on, both directories change, so work inside the web-backend monorepo (use cd to move between sub-packages).


Task 12: Register the cron job in scheduler.py

Files:

  • Modify: web-backend/agent-office/app/scheduler.py

  • Step 1: Check how on_screener_schedule is registered in the existing scheduler.py

cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
grep -nE "on_screener_schedule|add_job.*stock" app/scheduler.py

Mirror the existing registration exactly: variable name (stock_agent, agents["stock"], etc.), indentation, and id naming convention. Expected pattern:

scheduler.add_job(
    stock_agent.on_screener_schedule,
    "cron", day_of_week="mon-fri", hour=16, minute=30,
    id="stock_screener", timezone="Asia/Seoul",
    replace_existing=True,
)
  • Step 2: Register the ai_news cron job using the same pattern

Add right after the existing on_screener_schedule registration:

scheduler.add_job(
    stock_agent.on_ai_news_schedule,
    "cron", day_of_week="mon-fri", hour=8, minute=0,
    id="stock_ai_news_sentiment",
    timezone="Asia/Seoul",
    replace_existing=True,
)

Reference the variable exactly as the existing line does (stock_agent or agents["stock"]).
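
To reason about when the `mon-fri 08:00 Asia/Seoul` trigger fires, the schedule can be modelled with the standard library alone. This sketch does not use APScheduler; `next_weekday_0800` is a hypothetical helper, and it relies on KST being a fixed UTC+9 offset with no daylight saving.

```python
import datetime as dt

KST = dt.timezone(dt.timedelta(hours=9))  # Korea has no DST, so a fixed offset suffices


def next_weekday_0800(now: dt.datetime) -> dt.datetime:
    """Next Mon-Fri 08:00 KST strictly after `now` (holidays are not considered)."""
    local = now.astimezone(KST)
    candidate = local.replace(hour=8, minute=0, second=0, microsecond=0)
    if candidate <= local:
        candidate += dt.timedelta(days=1)
    while candidate.weekday() >= 5:  # skip Saturday and Sunday
        candidate += dt.timedelta(days=1)
    return candidate


# Friday 2026-05-15 09:00 KST: the next run is Monday 2026-05-18 08:00 KST.
after_friday_run = next_weekday_0800(dt.datetime(2026, 5, 15, 9, 0, tzinfo=KST))
# Tuesday 23:30 UTC is already Wednesday 08:30 KST, so the next run is Thursday.
from_utc = next_weekday_0800(
    dt.datetime(2026, 5, 12, 23, 30, tzinfo=dt.timezone.utc)
)
```

The cron trigger only skips weekends. Public holidays are handled later, by the `skipped_holiday` response from stock-lab.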

  • Step 3: Verify the scheduler import
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.scheduler import scheduler; print([j.id for j in scheduler.get_jobs()])"

Expected: the output includes stock_ai_news_sentiment.

(Note: a plain import that never starts the scheduler may print an empty list. In that case, fall back to reviewing the code by eye.)

  • Step 4: Commit
git add app/scheduler.py
git commit -m "feat(agent-office): cron mon-fri 08:00 ai_news sentiment job"

Task 13: Add the AI node to frontend canvasLayout.js

Files:

  • Modify: web-ui/src/pages/stock/screener/components/canvas/constants/canvasLayout.js

  • Modify: web-ui/src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js

  • Step 1: Update the tests first (so that they fail)

Update the following cases in canvasLayout.test.js:

  it('NODE_IDS — 12개 키, 모두 unique', () => {
    const ids = Object.values(NODE_IDS);
    expect(ids).toHaveLength(12);    // 11 → 12
    expect(new Set(ids).size).toBe(12);
  });

  it('EDGES — 18개, source/target이 모두 NODE_IDS 안에 존재', () => {
    expect(EDGES).toHaveLength(18);  // 16 → 18 (1+8+8+1)
    ...
  });

  it('EDGES — 8개 점수 노드는 모두 gate 입력 + combine 출력을 가짐', () => {
    const SCORE_IDS = [
      NODE_IDS.FOREIGN, NODE_IDS.VOLUME, NODE_IDS.MOMENTUM,
      NODE_IDS.HIGH52W, NODE_IDS.RS, NODE_IDS.MA, NODE_IDS.VCP,
      NODE_IDS.AI_NEWS,                  // added
    ];
    ...
  });

  it('SCORE_NODE_NAME_MAP — 8개 점수 노드 ID → backend node name', () => {
    expect(Object.keys(SCORE_NODE_NAME_MAP)).toHaveLength(8);  // 7 → 8
    expect(SCORE_NODE_NAME_MAP[NODE_IDS.FOREIGN]).toBe('foreign_buy');
    expect(SCORE_NODE_NAME_MAP[NODE_IDS.VOLUME]).toBe('volume_surge');
    expect(SCORE_NODE_NAME_MAP[NODE_IDS.AI_NEWS]).toBe('ai_news');  // added
  });
  • Step 2: Confirm the tests fail
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npx vitest run src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js

Expected: FAIL (NODE_IDS.AI_NEWS is undefined, among other errors).

  • Step 3: Update canvasLayout.js

Add to the NODE_IDS object (right before COMBINE):

  AI_NEWS:  'score-ai-news',

Add to NODE_KIND_MAP:

  [NODE_IDS.AI_NEWS]:  'score',

Add to SCORE_NODE_NAME_MAP:

  [NODE_IDS.AI_NEWS]:  'ai_news',

Add to INITIAL_NODE_POSITIONS (after VCP, at y=630):

  [NODE_IDS.AI_NEWS]:  { x:  480, y: 630 },

Append to the end of the SCORE_KEYS array:

const SCORE_KEYS = ['FOREIGN','VOLUME','MOMENTUM','HIGH52W','RS','MA','VCP','AI_NEWS'];

Add to SCORE_NODE_LABEL:

  [NODE_IDS.AI_NEWS]:  { icon: '🤖', title: 'AI 뉴스' },
  • Step 4: Confirm the tests pass
npx vitest run src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js

Expected: PASS, 6 tests passed (all based on 8 score nodes / 12 nodes / 18 edges).
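
The 12-node / 18-edge totals follow from the 1+8+8+1 breakdown noted in the test. The arithmetic is modelled below in Python for consistency with the backend examples. The plan states only the totals, so the four non-score node names (`SOURCE`, `GATE`, `COMBINE`, `RESULT`) are assumptions for illustration.

```python
# Score keys as listed in canvasLayout.js after this task.
SCORE_KEYS = ["FOREIGN", "VOLUME", "MOMENTUM", "HIGH52W", "RS", "MA", "VCP", "AI_NEWS"]
# Hypothetical names for the remaining four nodes.
FIXED_NODES = ["SOURCE", "GATE", "COMBINE", "RESULT"]

nodes = FIXED_NODES + SCORE_KEYS
edges = (
    [("SOURCE", "GATE")]                          # 1 edge into the gate
    + [("GATE", key) for key in SCORE_KEYS]       # 8 gate -> score edges
    + [(key, "COMBINE") for key in SCORE_KEYS]    # 8 score -> combine edges
    + [("COMBINE", "RESULT")]                     # 1 edge out of combine
)
```

Each new score node therefore adds one node and two edges, which is why the expectations move from 11 to 12 nodes and from 16 to 18 edges.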

  • Step 5: Full frontend test regression
npx vitest run

Expected: 4 files, 20 tests pass (no changes outside canvasLayout).

  • Step 6: Commit
git add src/pages/stock/screener/components/canvas/constants/canvasLayout.js src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js
git commit -m "feat(screener): canvas adds AI news node (12 nodes, 18 edges)"

Task 14: Full backend tests + frontend build

Files:

  • (run tests/build only)

  • Step 1: Run the full stock-lab test suite

cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest -v

Expected: all new tests pass and the 71 existing tests show no regressions.

  • Step 2: Run the full agent-office test suite
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -m pytest -v 2>&1 | tail -20

Expected: no regressions in existing tests. Adding on_ai_news_schedule is a purely additive change, so it should not conflict with them.

  • Step 3: Frontend build
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npm run build

Expected: the build succeeds. The CanvasLayout-*.js chunk size barely changes (canvasLayout.js grows by about 30 lines).

  • Step 4: Frontend lint (regression)
npx eslint src/pages/stock/screener/components/canvas/constants/canvasLayout.js

Expected: 0 errors.


Task 15: Manual verification + deployment

Files:

  • (run only, manual verification)

  • Step 1: Push the backend (needs Gitea credentials; done manually by the user)

cd C:\Users\jaeoh\Desktop\workspace\web-backend
git push origin main

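On failure: ask the user to enter credentials. After a successful push, the Gitea webhook → deployer rsync → docker compose build chain runs automatically.

  • Step 2: Confirm the NAS docker containers were refreshed and check environment variables

On the NAS console: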

docker logs stock-lab --tail 30
docker logs agent-office --tail 30
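  • Confirm ANTHROPIC_API_KEY is non-empty in the stock-lab environment (stock-lab environment variables in docker-compose.yml, line 40)

  • Check the logs to confirm the stock_ai_news_sentiment job was registered when the agent-office scheduler started

  • Step 3: Trigger the cron job manually (operator privileges)

Manual run through the agent-office API (test):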

curl -X POST "https://gahusb.synology.me/api/agent-office/command" \
  -H "Content-Type: application/json" \
  -d '{"agent_id":"stock","command":"run_ai_news"}'

(Note: if on_command does not support the run_ai_news command, the cron handler has to be called directly. This plan does not include adding an on_command branch, so verify on the NAS as follows:)

docker exec agent-office python -c "
import asyncio
from app.agents.stock import StockAgent
from app.scheduler import stock_agent
asyncio.run(stock_agent.on_ai_news_schedule())
"
  • Step 4: Verify the data
docker exec stock-lab sqlite3 /app/data/stock.db "
  SELECT ticker, score_raw, news_count, model FROM news_sentiment
  WHERE date = date('now', 'localtime') ORDER BY score_raw DESC LIMIT 10;
"

Expected: about 100 ticker rows created, with score_raw values distributed between -10 and +10.
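
One detail to keep in mind when filtering on "today" in SQLite: a bare `date('now')` is evaluated in UTC, and 08:00 KST is still the previous calendar day in UTC. The sketch below shows the shift with a fixed instant. It uses the `'+9 hours'` modifier to stay deterministic, and it assumes rows are keyed by the KST calendar date.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# 08:00 KST on 2026-05-13 is 23:00 UTC on 2026-05-12.
utc_instant = "2026-05-12 23:00:00"
utc_date, kst_date = conn.execute(
    "SELECT date(?), date(?, '+9 hours')", (utc_instant, utc_instant)
).fetchone()
```

A verification query run right after the 08:00 KST job therefore needs a local-time or offset-adjusted date to match that morning's rows.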

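  • Step 5: Check the Telegram message

Once it is 08:00 KST, or after the manual trigger in Step 3, confirm that the message arrives in Telegram. Format:

  • 🌅 AI 뉴스 분석 (YYYY-MM-DD 08:00)

  • 📈 호재 Top 5 (positive news)

  • 📉 악재 Top 5 (negative news)

  • Token/cost line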
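
Because the message is sent with `parse_mode="MarkdownV2"`, literal characters such as `(`, `)`, `-` and `.` in the header date must be backslash-escaped or Telegram rejects the message. The plan's `build_message` is defined in an earlier task, so this is only a sketch of the escaping step; `escape_mdv2` and `build_header` are hypothetical helpers.

```python
import re

# Characters Telegram's MarkdownV2 mode requires escaping in plain text,
# plus the backslash itself.
_MDV2_SPECIAL = r"_*[]()~`>#+-=|{}.!"
_MDV2_RE = re.compile("([" + re.escape(_MDV2_SPECIAL) + r"\\])")


def escape_mdv2(text: str) -> str:
    """Prefix every MarkdownV2 special character with a backslash."""
    return _MDV2_RE.sub(r"\\\1", text)


def build_header(asof: str) -> str:
    """Hypothetical header line in the format listed above."""
    return "🌅 " + escape_mdv2(f"AI 뉴스 분석 ({asof} 08:00)")


header = build_header("2026-05-13")
```

Ticker names and LLM-written reasons would need the same treatment, since any of them may contain a `.` or `-`.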

  • Step 6: Confirm the ai_news weight is reflected the next time the 16:30 screener job runs

After the screener runs, check the Telegram result or run:

docker exec stock-lab sqlite3 /app/data/stock.db "
  SELECT rank, ticker, name, total_score, scores_json
  FROM screener_results sr JOIN screener_runs r ON sr.run_id=r.id
  WHERE r.asof=date('now') ORDER BY rank LIMIT 5;
"

Confirm that scores_json contains an ai_news key with a score in the 0~100 range.
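
The same check can be scripted. This sketch assumes scores_json is a flat JSON object keyed by node name, which this plan implies but does not spell out; `ai_news_score_ok` is a hypothetical helper.

```python
import json


def ai_news_score_ok(scores_json: str) -> bool:
    """True when scores_json has a numeric ai_news entry within 0..100."""
    scores = json.loads(scores_json)
    value = scores.get("ai_news")
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return 0 <= value <= 100


present = ai_news_score_ok('{"momentum": 80, "ai_news": 62.5}')
missing = ai_news_score_ok('{"momentum": 80}')
out_of_range = ai_news_score_ok('{"ai_news": 140}')
```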

  • Step 7: Confirm the 8th node appears on the frontend canvas
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npm run release:nas

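After the NAS deployment, open canvas mode at https://gahusb.synology.me/stock/screener:

  • The 🤖 AI 뉴스 node appears as the 8th score node

  • The gate → AI 뉴스 → combine edges are drawn

  • Toggling weight=0 dims the node and renders its edges as dashed lines

  • Step 8: Update web-ui CLAUDE.md

Add the AI news node to the Stock Screener row or the spec index (optional):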

git add CLAUDE.md
git commit -m "docs(screener): note ai_news 8th score node"
git push origin main
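  • Step 9: Update memory (controller task)

Update project_stock_screener.md:

  • Add the commit SHA that completes the ai_news node
  • Remove "AI 뉴스 호재/악재 노드" (AI news positive/negative node) from follow-up slices §14 and update the remaining count to 8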

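Post-completion verification checklist

When this plan is complete:

  • backend: stock-lab's 5 new modules and 6 test files (including test_ai_news_router.py from Task 9) all PASS
  • backend: agent-office on_ai_news_schedule + scheduler cron registered
  • frontend: 🤖 AI 뉴스 node shown in canvas mode, weight slider works
  • DB: 100 ticker rows created per day in the news_sentiment table
  • Telegram: Top 5/5 message sent automatically on weekdays at 08:00 KST
  • 16:30 screener: ai_news score reflected in the weighted sum (inside scores_json)
  • LLM cost: ~$0.075/day (shown in ₩ at the bottom of the Telegram message)
  • fail-soft: the whole job still succeeds when the LLM call fails for a single ticker
  • localStorage / cron / DB migration are all idempotent (safe to re-run)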
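
For the DB side of the idempotency item, one way to make a same-day re-run safe is an upsert keyed on (date, ticker). This is a sketch under an assumed primary key and column set; the pipeline's actual write path is defined in an earlier task and may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed minimal schema with a (date, ticker) primary key.
conn.execute(
    "CREATE TABLE news_sentiment ("
    " date TEXT, ticker TEXT, score_raw REAL, news_count INTEGER,"
    " PRIMARY KEY (date, ticker))"
)

UPSERT = (
    "INSERT INTO news_sentiment (date, ticker, score_raw, news_count)"
    " VALUES (?, ?, ?, ?)"
    " ON CONFLICT(date, ticker) DO UPDATE SET"
    " score_raw = excluded.score_raw, news_count = excluded.news_count"
)

# Running the job twice for the same day overwrites instead of duplicating.
conn.execute(UPSERT, ("2026-05-13", "005930", 4.0, 3))
conn.execute(UPSERT, ("2026-05-13", "005930", 6.5, 5))
rows = conn.execute("SELECT * FROM news_sentiment").fetchall()
```

The `ON CONFLICT ... DO UPDATE` form needs SQLite 3.24 or newer.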

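Follow-up slices (after this plan is complete)

The 8 items listed in spec §18:

  1. Per-URL caching (~70% cost reduction)
  2. Additional sentiment cron at 16:00
  3. Sentiment trend chart
  4. Expand coverage to the top 200~500 by market cap
  5. Backtesting
  6. Multilingual / macro news
  7. Telegram alert toggle
  8. Per-ticker sentiment page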