web-page/docs/superpowers/plans/2026-05-13-ai-news-sentiment-node.md
gahusb 7ebeba2f3d docs(screener): AI news sentiment node implementation plan (15 tasks)
15-task TDD plan for 8th score node ai_news. backend (scraper + analyzer +
pipeline + telegram + node + router) + agent-office (service_proxy + cron
handler + scheduler) + frontend (canvasLayout 1 file). 22 unit tests
(scraper 4, analyzer 4, pipeline 3, telegram 4, node 5, router 2).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 23:18:00 +09:00

# AI News Sentiment Node Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
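**Goal:** Use Claude Haiku to classify Naver news for the top 100 stocks by market cap as bullish or bearish, and integrate the result into the screener's weighted sum as the 8th score node, `ai_news`. A job runs automatically at 08:00 KST on weekdays and sends a Telegram alert with the top 5 bullish and top 5 bearish tickers.
**Architecture:** Add a new `screener/ai_news/` module inside stock-lab (scraper + analyzer + pipeline + telegram). The new `nodes/ai_news.py` converts the `news_sentiment` table into a percentile_rank so that it contributes to the weighted sum. agent-office handles only the cron trigger and Telegram delivery. The only frontend change is in canvasLayout.js.
**Tech Stack:** Python 3.11 / FastAPI / SQLite (WAL+busy_timeout) / `anthropic` SDK (async) / `httpx` (async) / `BeautifulSoup4` / APScheduler / React 18.
**Prerequisite spec**: `web-ui/docs/superpowers/specs/2026-05-13-ai-news-sentiment-node-design.md`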
---
## File Structure
New files (backend):
```
web-backend/stock-lab/app/screener/
  ai_news/
    __init__.py
    scraper.py     # scrape Naver per-ticker news
    analyzer.py    # Claude Haiku bullish/bearish analysis
    pipeline.py    # refresh_daily() (scraping + parallel LLM + DB)
    telegram.py    # Top 5/5 message builder
  nodes/
    ai_news.py     # 8th ScoreNode
web-backend/stock-lab/tests/
  test_ai_news_scraper.py
  test_ai_news_analyzer.py
  test_ai_news_pipeline.py
  test_ai_news_telegram.py
  test_ai_news_node.py
```
Modified files (backend):
```
web-backend/stock-lab/app/screener/
  schema.py      # news_sentiment DDL + DEFAULT_WEIGHTS/PARAMS additions
  registry.py    # register NODE_REGISTRY["ai_news"]
  engine.py      # news_sentiment field on ScreenContext + load update
  router.py      # POST /snapshot/refresh-news-sentiment route
web-backend/stock-lab/requirements.txt    # add anthropic
web-backend/agent-office/app/
  service_proxy.py    # refresh_ai_news_sentiment() helper
  agents/stock.py     # on_ai_news_schedule method
  scheduler.py        # register cron mon-fri 08:00
```
Modified files (frontend):
```
web-ui/src/pages/stock/screener/components/canvas/constants/
  canvasLayout.js         # add the AI node
  canvasLayout.test.js    # update counts
```
---
### Task 1: Dependencies + DB schema + default settings
**Files:**
- Modify: `web-backend/stock-lab/requirements.txt`
- Modify: `web-backend/stock-lab/app/screener/schema.py`
- [ ] **Step 1: Add the anthropic SDK to `requirements.txt`**
Add the single line `anthropic==0.39.0` at the appropriate (alphabetical) position.
- [ ] **Step 2: Add ai_news to `DEFAULT_WEIGHTS` / `DEFAULT_NODE_PARAMS` in `schema.py`**
At the end of the `DEFAULT_WEIGHTS` dict:
```python
"ai_news": 0.8,
```
At the end of the `DEFAULT_NODE_PARAMS` dict:
```python
"ai_news": {"min_news_count": 1},
```
- [ ] **Step 3: Add the `news_sentiment` table to the `DDL` string in `schema.py`**
Add after the existing `screener_results` table definition:
```sql
CREATE TABLE IF NOT EXISTS news_sentiment (
    ticker        TEXT    NOT NULL,
    date          TEXT    NOT NULL,
    score_raw     REAL    NOT NULL,
    reason        TEXT    NOT NULL DEFAULT '',
    news_count    INTEGER NOT NULL DEFAULT 0,
    tokens_input  INTEGER NOT NULL DEFAULT 0,
    tokens_output INTEGER NOT NULL DEFAULT 0,
    model         TEXT    NOT NULL DEFAULT 'claude-haiku-4-5-20251001',
    created_at    TEXT    NOT NULL DEFAULT (datetime('now','localtime')),
    PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_news_sentiment_date ON news_sentiment(date DESC);
```
- [ ] **Step 4: Add a one-time migration to `ensure_screener_schema` that backfills ai_news when it is missing from the stored settings**
In the existing function body, add the following block right after `conn.executescript(DDL)` and before `existing = ...` (it relies on `json` being imported in `schema.py`):
```python
    # Backfill the ai_news key once if it is missing (for environments already in production)
    row = conn.execute(
        "SELECT weights_json, node_params_json FROM screener_settings WHERE id=1"
    ).fetchone()
    if row is not None:
        w = json.loads(row[0])
        p = json.loads(row[1])
        changed = False
        if "ai_news" not in w:
            w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
            changed = True
        if "ai_news" not in p:
            p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
            changed = True
        if changed:
            conn.execute(
                "UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
                (json.dumps(w), json.dumps(p)),
            )
```
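The backfill rule in Step 4 can also be read as a pure function over the two stored JSON blobs. The following is a minimal sketch under that reading; `backfill_ai_news` and the sample defaults are hypothetical names used only for illustration and are not part of `schema.py`.

```python
import json

# Illustrative defaults; the real values live in schema.py.
DEFAULT_WEIGHTS = {"ai_news": 0.8}
DEFAULT_NODE_PARAMS = {"ai_news": {"min_news_count": 1}}


def backfill_ai_news(weights_json: str, params_json: str):
    """Return (weights_json, params_json, changed) with ai_news added where missing."""
    w = json.loads(weights_json)
    p = json.loads(params_json)
    changed = False
    if "ai_news" not in w:
        w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
        changed = True
    if "ai_news" not in p:
        p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
        changed = True
    return json.dumps(w), json.dumps(p), changed
```

Because the keys are added only when absent, a second run reports `changed` as false and any weight a user has already tuned is left alone.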
- [ ] **Step 5: Run the existing tests and confirm there is no regression**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest app/test_screener_schema.py -v
```
Expected: all existing tests PASS (the added DDL is idempotent, so there are no conflicts).
- [ ] **Step 6: Commit**
```bash
git add app/screener/schema.py requirements.txt
git commit -m "feat(screener): add news_sentiment table + ai_news defaults + migration"
```
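The idempotency claim in Step 5 rests on `CREATE TABLE IF NOT EXISTS` and `CREATE INDEX IF NOT EXISTS`. A minimal sketch that checks this against an in-memory database follows; it uses a trimmed-down version of the table (three columns only) and a hypothetical helper name, `apply_twice`.

```python
import sqlite3

# Trimmed-down DDL for illustration; the full column list is in Step 3.
DDL = """
CREATE TABLE IF NOT EXISTS news_sentiment (
    ticker    TEXT NOT NULL,
    date      TEXT NOT NULL,
    score_raw REAL NOT NULL,
    PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_news_sentiment_date ON news_sentiment(date DESC);
"""


def apply_twice() -> int:
    """Apply the DDL, insert one row, apply the DDL again, and return the row count."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(DDL)
    conn.execute("INSERT INTO news_sentiment VALUES ('005930', '2026-05-13', 7.5)")
    conn.executescript(DDL)  # the second run must neither fail nor drop data
    count = conn.execute("SELECT COUNT(*) FROM news_sentiment").fetchone()[0]
    conn.close()
    return count
```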
---
### Task 2: `ai_news/scraper.py`: Naver per-ticker news scraping + tests
**Files:**
- Create: `web-backend/stock-lab/app/screener/ai_news/__init__.py`
- Create: `web-backend/stock-lab/app/screener/ai_news/scraper.py`
- Test: `web-backend/stock-lab/tests/test_ai_news_scraper.py`
- [ ] **Step 1: Create an empty `__init__.py`**
No content; the file only needs to exist.
- [ ] **Step 2: Write the failing tests**
`tests/test_ai_news_scraper.py`:
```python
import pytest
from unittest.mock import AsyncMock
from app.screener.ai_news import scraper

SAMPLE_HTML = """
<html><body>
<table class="type5"><tbody>
<tr><td class="title"><a href="/news1">삼성전자, HBM 양산 가시화</a></td><td class="date">2026.05.13 07:30</td></tr>
<tr><td class="title"><a href="/news2">삼성, 4분기 어닝 쇼크 우려</a></td><td class="date">2026.05.13 06:00</td></tr>
<tr><td class="title"><a href="/news3">메모리 시장 회복세</a></td><td class="date">2026.05.12 18:00</td></tr>
</tbody></table>
</body></html>
"""
EMPTY_HTML = "<html><body><table class='type5'><tbody></tbody></table></body></html>"


def _mk_client(status_code=200, text=SAMPLE_HTML):
    client = AsyncMock()
    resp = AsyncMock()
    resp.status_code = status_code
    resp.text = text
    client.get = AsyncMock(return_value=resp)
    return client


@pytest.mark.asyncio
async def test_fetch_news_success_returns_n_items():
    client = _mk_client()
    out = await scraper.fetch_news(client, "005930", n=2)
    assert len(out) == 2
    assert out[0]["title"] == "삼성전자, HBM 양산 가시화"
    assert out[0]["date"] == "2026.05.13 07:30"


@pytest.mark.asyncio
async def test_fetch_news_404_returns_empty():
    client = _mk_client(status_code=404, text="")
    out = await scraper.fetch_news(client, "999999", n=5)
    assert out == []


@pytest.mark.asyncio
async def test_fetch_news_empty_table_returns_empty():
    client = _mk_client(text=EMPTY_HTML)
    out = await scraper.fetch_news(client, "005930", n=5)
    assert out == []


@pytest.mark.asyncio
async def test_fetch_news_n_caps_results():
    client = _mk_client()
    out = await scraper.fetch_news(client, "005930", n=2)
    assert len(out) == 2  # the sample has 3 rows, but n=2 truncates them
```
- [ ] **Step 3: Confirm the tests fail**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest tests/test_ai_news_scraper.py -v
```
Expected: FAIL with an import error, because `app.screener.ai_news.scraper` does not exist yet.
- [ ] **Step 4: Implement `scraper.py`**
```python
"""Scrape per-ticker news from Naver Finance."""
from __future__ import annotations
import logging
from typing import Any, Dict, List
from bs4 import BeautifulSoup

log = logging.getLogger(__name__)

NAVER_NEWS_URL = "https://finance.naver.com/item/news_news.naver"
NAVER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Referer": "https://finance.naver.com/",
}


async def fetch_news(client, ticker: str, n: int = 5) -> List[Dict[str, Any]]:
    """Scrape top N news headlines for a ticker. Returns [] on any failure."""
    try:
        r = await client.get(NAVER_NEWS_URL, params={"code": ticker, "page": 1})
    except Exception as e:
        log.warning("ai_news scrape http error for %s: %s", ticker, e)
        return []
    if r.status_code != 200:
        return []
    # The "lxml" parser needs the lxml package installed alongside BeautifulSoup4.
    soup = BeautifulSoup(r.text, "lxml")
    out: List[Dict[str, Any]] = []
    for row in soup.select("table.type5 tbody tr"):
        title_el = row.select_one("td.title a")
        date_el = row.select_one("td.date")
        if not title_el or not date_el:
            continue  # skip rows without a headline or date
        out.append({
            "title": title_el.get_text(strip=True),
            "date": date_el.get_text(strip=True),
        })
        # Cap after filtering so skipped rows do not reduce the result below n.
        if len(out) >= n:
            break
    return out
```
- [ ] **Step 5: Confirm the tests pass**
```bash
python -m pytest tests/test_ai_news_scraper.py -v
```
Expected: PASS (4 tests passed).
- [ ] **Step 6: Commit**
```bash
git add app/screener/ai_news/__init__.py app/screener/ai_news/scraper.py tests/test_ai_news_scraper.py
git commit -m "feat(screener): ai_news scraper (naver finance ticker news)"
```
---
### Task 3: `ai_news/analyzer.py`: Claude Haiku analysis + tests
**Files:**
- Create: `web-backend/stock-lab/app/screener/ai_news/analyzer.py`
- Test: `web-backend/stock-lab/tests/test_ai_news_analyzer.py`
- [ ] **Step 1: Write the failing tests**
`tests/test_ai_news_analyzer.py`:
```python
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.screener.ai_news import analyzer


def _mk_llm(content_text: str, in_tokens: int = 100, out_tokens: int = 20):
    llm = AsyncMock()
    resp = MagicMock()
    block = MagicMock()
    block.text = content_text
    resp.content = [block]
    resp.usage = MagicMock(input_tokens=in_tokens, output_tokens=out_tokens)
    llm.messages = MagicMock()
    llm.messages.create = AsyncMock(return_value=resp)
    return llm


NEWS = [{"title": "삼성전자, HBM 양산"}, {"title": "메모리 가격 반등"}]


@pytest.mark.asyncio
async def test_score_sentiment_success_parses_json():
    llm = _mk_llm(json.dumps({"score": 7.5, "reason": "HBM 호재"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS, name="삼성전자")
    assert out["ticker"] == "005930"
    assert out["score_raw"] == 7.5
    assert out["reason"] == "HBM 호재"
    assert out["news_count"] == 2
    assert out["tokens_input"] == 100
    assert out["tokens_output"] == 20


@pytest.mark.asyncio
async def test_score_sentiment_json_parse_fail_returns_zero():
    llm = _mk_llm("not valid json")
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == 0.0
    assert "parse fail" in out["reason"]
    assert out["tokens_input"] == 100  # the call itself still happened


@pytest.mark.asyncio
async def test_score_sentiment_clamps_out_of_range():
    llm = _mk_llm(json.dumps({"score": 15.0, "reason": "초강세"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == 10.0  # clamped to +10


@pytest.mark.asyncio
async def test_score_sentiment_clamps_negative_out_of_range():
    llm = _mk_llm(json.dumps({"score": -42.0, "reason": "초악재"}))
    out = await analyzer.score_sentiment(llm, "005930", NEWS)
    assert out["score_raw"] == -10.0
```
- [ ] **Step 2: Confirm the tests fail**
```bash
python -m pytest tests/test_ai_news_analyzer.py -v
```
Expected: FAIL with an import error, because `app.screener.ai_news.analyzer` does not exist yet.
- [ ] **Step 3: Implement `analyzer.py`**
```python
"""Bullish/bearish analysis of per-ticker news using Claude Haiku."""
from __future__ import annotations
import json
import logging
import os
from typing import Any, Dict, List

log = logging.getLogger(__name__)

DEFAULT_MODEL = os.getenv("AI_NEWS_MODEL", "claude-haiku-4-5-20251001")

# The prompt stays in Korean because the headlines it wraps are Korean.
PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {n}개의 헤드라인입니다.
{news_block}
이 뉴스들이 종목에 호재인지 악재인지 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거.
JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}"""


def _clamp(x: float, lo: float = -10.0, hi: float = 10.0) -> float:
    return max(lo, min(hi, x))


async def score_sentiment(
    llm,
    ticker: str,
    news: List[Dict[str, Any]],
    *,
    name: str | None = None,
    model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
    news_block = "\n".join(f"- {n['title']}" for n in news)
    prompt = PROMPT_TEMPLATE.format(
        name=name or ticker, ticker=ticker,
        n=len(news), news_block=news_block,
    )
    resp = await llm.messages.create(
        model=model,
        max_tokens=200,
        messages=[{"role": "user", "content": prompt}],
    )
    text = resp.content[0].text if resp.content else ""
    in_tokens = int(getattr(resp.usage, "input_tokens", 0) or 0)
    out_tokens = int(getattr(resp.usage, "output_tokens", 0) or 0)
    try:
        data = json.loads(text)
        score = _clamp(float(data["score"]))
        reason = str(data["reason"])[:200]
        return {
            "ticker": ticker,
            "score_raw": score,
            "reason": reason,
            "news_count": len(news),
            "tokens_input": in_tokens,
            "tokens_output": out_tokens,
            "model": model,
        }
    except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
        # Fail soft: a malformed reply becomes a neutral score, but token usage is still recorded.
        log.warning("ai_news parse fail for %s: %s (raw=%r)", ticker, e, text[:100])
        return {
            "ticker": ticker,
            "score_raw": 0.0,
            "reason": f"parse fail: {e!s}"[:200],
            "news_count": len(news),
            "tokens_input": in_tokens,
            "tokens_output": out_tokens,
            "model": model,
        }
```
- [ ] **Step 4: Confirm the tests pass**
```bash
python -m pytest tests/test_ai_news_analyzer.py -v
```
Expected: PASS (4 tests passed).
- [ ] **Step 5: Commit**
```bash
git add app/screener/ai_news/analyzer.py tests/test_ai_news_analyzer.py
git commit -m "feat(screener): ai_news Claude Haiku analyzer (-10~+10 + clamp + JSON-fail soft)"
```
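The analyzer above hands the raw reply straight to `json.loads`, so a reply wrapped in a Markdown code fence or preceded by stray text falls back to a score of 0.0. If that turns out to happen in practice, one possible hardening is to pull out the first `{...}` span before parsing. The sketch below is an assumption-laden illustration, not part of this plan; `parse_score_reply` is a hypothetical helper name.

```python
import json
import re


def parse_score_reply(text: str) -> dict:
    """Parse {"score": ..., "reason": ...} from a reply, tolerating surrounding text."""
    # Greedy match from the first "{" to the last "}" so fences or prose around it are ignored.
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object in reply")
    data = json.loads(match.group(0))
    # Same clamp range as the analyzer: -10 to +10.
    score = max(-10.0, min(10.0, float(data["score"])))
    return {"score_raw": score, "reason": str(data["reason"])[:200]}
```

A caller would keep the existing `except` clause: `json.JSONDecodeError` is a subclass of `ValueError`, so both failure modes land in the same fail-soft branch.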
---
### Task 4: `ai_news/pipeline.py`: refresh_daily integration + tests
**Files:**
- Create: `web-backend/stock-lab/app/screener/ai_news/pipeline.py`
- Test: `web-backend/stock-lab/tests/test_ai_news_pipeline.py`
- [ ] **Step 1: Write the failing tests**
`tests/test_ai_news_pipeline.py`:
```python
import datetime as dt
import sqlite3
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from app.screener.ai_news import pipeline
from app.screener.schema import ensure_screener_schema


@pytest.fixture
def conn():
    c = sqlite3.connect(":memory:")
    c.row_factory = sqlite3.Row
    ensure_screener_schema(c)
    # Seed the top 3 tickers by market cap
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("005930", "삼성전자", 9_000_000))
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("000660", "SK하이닉스", 8_000_000))
    c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
              "VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("373220", "LG에너지솔루션", 7_000_000))
    c.commit()
    yield c
    c.close()


@pytest.mark.asyncio
async def test_refresh_daily_happy_path(conn):
    """3-ticker mini integration: scraper/analyzer are mocked per ticker."""
    asof = dt.date(2026, 5, 13)
    fake_news = [{"title": "헤드라인"}]

    async def fake_fetch(client, ticker, n):
        return fake_news

    scores_by_ticker = {
        "005930": 7.5, "000660": 4.0, "373220": -6.0,
    }

    async def fake_score(llm, ticker, news, *, name=None, model="m"):
        return {
            "ticker": ticker, "score_raw": scores_by_ticker[ticker],
            "reason": f"r{ticker}", "news_count": 1,
            "tokens_input": 100, "tokens_output": 20, "model": model,
        }

    with patch.object(pipeline, "_scraper") as ms, \
         patch.object(pipeline, "_analyzer") as ma, \
         patch.object(pipeline, "_make_llm") as ml, \
         patch.object(pipeline, "_make_http") as mh:
        ms.fetch_news = fake_fetch
        ma.score_sentiment = fake_score
        ml.return_value.__aenter__.return_value = AsyncMock()
        ml.return_value.__aexit__.return_value = None
        mh.return_value.__aenter__.return_value = AsyncMock()
        mh.return_value.__aexit__.return_value = None
        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)

    assert result["asof"] == "2026-05-13"
    assert result["updated"] == 3
    assert result["failures"] == []
    assert len(result["top_pos"]) == 3
    assert result["top_pos"][0]["ticker"] == "005930"  # highest score
    assert result["top_neg"][0]["ticker"] == "373220"  # lowest score
    assert result["tokens_input"] == 300
    assert result["tokens_output"] == 60
    # Verify the DB upsert
    rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
                        ("2026-05-13",)).fetchall()
    assert len(rows) == 3
    by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
    assert by_ticker["005930"] == 7.5
    assert by_ticker["373220"] == -6.0


@pytest.mark.asyncio
async def test_refresh_daily_failures_isolated(conn):
    """If one ticker raises, the remaining tickers are still processed normally."""
    asof = dt.date(2026, 5, 13)

    async def fake_fetch(client, ticker, n):
        return [{"title": "h"}]

    async def fake_score(llm, ticker, news, *, name=None, model="m"):
        if ticker == "000660":
            raise RuntimeError("llm exploded")
        return {
            "ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1,
            "tokens_input": 100, "tokens_output": 20, "model": model,
        }

    with patch.object(pipeline, "_scraper") as ms, \
         patch.object(pipeline, "_analyzer") as ma, \
         patch.object(pipeline, "_make_llm") as ml, \
         patch.object(pipeline, "_make_http") as mh:
        ms.fetch_news = fake_fetch
        ma.score_sentiment = fake_score
        ml.return_value.__aenter__.return_value = AsyncMock()
        ml.return_value.__aexit__.return_value = None
        mh.return_value.__aenter__.return_value = AsyncMock()
        mh.return_value.__aexit__.return_value = None
        result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)

    assert result["updated"] == 2
    assert len(result["failures"]) == 1


def test_top_market_cap_tickers(conn):
    out = pipeline._top_market_cap_tickers(conn, n=2)
    assert out == ["005930", "000660"]
```
- [ ] **Step 2: Confirm the tests fail**
```bash
python -m pytest tests/test_ai_news_pipeline.py -v
```
Expected: FAIL with an import error, because `app.screener.ai_news.pipeline` does not exist yet.
- [ ] **Step 3: Implement `pipeline.py`**
```python
"""ai_news refresh pipeline: process the top N tickers by market cap in parallel."""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional
import httpx
from . import scraper as _scraper
from . import analyzer as _analyzer

log = logging.getLogger(__name__)

DEFAULT_TOP_N = 100
DEFAULT_CONCURRENCY = 10
DEFAULT_NEWS_PER_TICKER = 5
DEFAULT_RATE_LIMIT_SEC = 0.2


def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
    rows = conn.execute(
        "SELECT ticker FROM krx_master "
        "WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
        "ORDER BY market_cap DESC LIMIT ?",
        (n,),
    ).fetchall()
    return [r[0] for r in rows]


def _make_http():
    return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)


def _make_llm():
    """Anthropic async client: ANTHROPIC_API_KEY must be set in the environment."""
    from anthropic import AsyncAnthropic
    return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


async def _process_one(
    ticker: str, name: str, sem: asyncio.Semaphore,
    http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
) -> Dict[str, Any]:
    async with sem:
        if rate_limit_sec > 0:
            await asyncio.sleep(rate_limit_sec)
        news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
        if not news:
            # No headlines: record a neutral row without calling the LLM.
            return {
                "ticker": ticker, "score_raw": 0.0, "reason": "no news",
                "news_count": 0, "tokens_input": 0, "tokens_output": 0,
                "model": model,
            }
        return await _analyzer.score_sentiment(
            llm, ticker, news, name=name, model=model,
        )


def _upsert_news_sentiment(
    conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
) -> None:
    iso = asof.isoformat()
    data = [
        (
            r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
            r["tokens_input"], r["tokens_output"], r["model"],
        )
        for r in rows
    ]
    conn.executemany(
        """INSERT INTO news_sentiment
           (ticker, date, score_raw, reason, news_count,
            tokens_input, tokens_output, model)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)
           ON CONFLICT(ticker, date) DO UPDATE SET
               score_raw=excluded.score_raw,
               reason=excluded.reason,
               news_count=excluded.news_count,
               tokens_input=excluded.tokens_input,
               tokens_output=excluded.tokens_output,
               model=excluded.model
        """,
        data,
    )
    conn.commit()


async def refresh_daily(
    conn: sqlite3.Connection,
    asof: dt.date,
    *,
    top_n: int = DEFAULT_TOP_N,
    concurrency: int = DEFAULT_CONCURRENCY,
    news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
    rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
    model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
    """Returns summary dict with top_pos/top_neg/token totals/failures."""
    started = time.time()
    tickers = _top_market_cap_tickers(conn, n=top_n)
    name_map = {
        r[0]: r[1] for r in conn.execute(
            f"SELECT ticker, name FROM krx_master WHERE ticker IN "
            f"({','.join('?' * len(tickers))})", tickers,
        ).fetchall()
    } if tickers else {}
    sem = asyncio.Semaphore(concurrency)
    async with _make_http() as http_client, _make_llm() as llm:
        tasks = [
            _process_one(
                t, name_map.get(t, t), sem, http_client, llm,
                news_per_ticker, rate_limit_sec, model,
            )
            for t in tickers
        ]
        # return_exceptions=True keeps one failing ticker from cancelling the rest.
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)
    successes: List[Dict[str, Any]] = []
    failures: List[str] = []
    for r in raw_results:
        if isinstance(r, BaseException):
            failures.append(repr(r))
        elif isinstance(r, dict):
            successes.append(r)
    if successes:
        _upsert_news_sentiment(conn, asof, successes)
    top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
    top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
    return {
        "asof": asof.isoformat(),
        "updated": len(successes),
        "failures": failures,
        "duration_sec": round(time.time() - started, 2),
        "tokens_input": sum(r["tokens_input"] for r in successes),
        "tokens_output": sum(r["tokens_output"] for r in successes),
        "top_pos": top_pos,
        "top_neg": top_neg,
        "model": model,
    }
```
- [ ] **Step 4: Confirm the tests pass**
```bash
python -m pytest tests/test_ai_news_pipeline.py -v
```
Expected: PASS (3 tests passed).
- [ ] **Step 5: Commit**
```bash
git add app/screener/ai_news/pipeline.py tests/test_ai_news_pipeline.py
git commit -m "feat(screener): ai_news pipeline (top-100 parallel, fail-soft, upsert)"
```
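The fail-soft behaviour of `refresh_daily` comes from two pieces working together: a semaphore that bounds how many tickers are in flight, and `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as values instead of cancelling sibling tasks. The sketch below isolates that pattern using only the standard library; `run_bounded` and `demo_worker` are hypothetical names, and the worker is a stand-in for the real scrape-and-score step.

```python
import asyncio


async def run_bounded(items, worker, concurrency: int):
    """Run worker(item) for every item, at most `concurrency` at a time."""
    sem = asyncio.Semaphore(concurrency)

    async def guarded(item):
        async with sem:
            return await worker(item)

    # Exceptions come back as values, in the same order as the inputs.
    results = await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [repr(r) for r in results if isinstance(r, BaseException)]
    return successes, failures


async def demo_worker(ticker: str) -> dict:
    # Hypothetical stand-in for scraping plus LLM scoring.
    await asyncio.sleep(0)
    if ticker == "000660":
        raise RuntimeError("llm exploded")
    return {"ticker": ticker, "score_raw": 5.0}
```

Running `asyncio.run(run_bounded(["005930", "000660", "373220"], demo_worker, 2))` yields two successes and one failure string, mirroring what `test_refresh_daily_failures_isolated` checks.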
---
### Task 5: `ai_news/telegram.py`: Top 5/5 message builder + tests
**Files:**
- Create: `web-backend/stock-lab/app/screener/ai_news/telegram.py`
- Test: `web-backend/stock-lab/tests/test_ai_news_telegram.py`
- [ ] **Step 1: Write the failing tests**
```python
from app.screener.ai_news import telegram as tg


def _row(ticker, score, reason="r"):
    return {"ticker": ticker, "score_raw": score, "reason": reason,
            "news_count": 5, "tokens_input": 100, "tokens_output": 20,
            "model": "m"}


def test_build_message_includes_top_sections():
    msg = tg.build_message(
        asof="2026-05-13",
        top_pos=[_row("005930", 8.5, "HBM 호재")],
        top_neg=[_row("373220", -6.3, "수주 지연")],
        tokens_input=10000, tokens_output=2000,
    )
    assert "AI 뉴스 분석" in msg
    assert "호재 Top" in msg
    assert "악재 Top" in msg
    assert "005930" in msg
    assert r"8\.5" in msg  # "." is a MarkdownV2 special, so the score is escaped
    assert "HBM" in msg
    assert "373220" in msg


def test_build_message_escapes_markdownv2_specials():
    msg = tg.build_message(
        asof="2026-05-13",
        top_pos=[_row("005930", 3.0, "테스트(괄호) [대괄호]")],
        top_neg=[],
        tokens_input=100, tokens_output=20,
    )
    # The MarkdownV2 special characters ( ) [ ] must be escaped
    assert r"\(" in msg or r"\)" in msg
    assert r"\[" in msg or r"\]" in msg


def test_build_message_cost_won_line():
    msg = tg.build_message(
        asof="2026-05-13", top_pos=[], top_neg=[],
        tokens_input=10000, tokens_output=2000,
    )
    # tokens_input × 0.0013 + tokens_output × 0.0065 = 13 + 13 = ₩26
    assert "₩26" in msg


def test_build_message_empty_lists():
    msg = tg.build_message(
        asof="2026-05-13", top_pos=[], top_neg=[],
        tokens_input=0, tokens_output=0,
    )
    # The headers must be present even when the lists are empty
    assert "호재 Top" in msg
    assert "악재 Top" in msg
```
- [ ] **Step 2: Confirm the tests fail**
```bash
python -m pytest tests/test_ai_news_telegram.py -v
```
Expected: FAIL with an import error, because `app.screener.ai_news.telegram` does not exist yet.
- [ ] **Step 3: Implement `telegram.py`**
```python
"""ai_news Top 5/5 Telegram message builder (MarkdownV2)."""
from __future__ import annotations
from typing import Any, Dict, List

_MD_SPECIAL = r"_*[]()~`>#+-=|{}.!\\"


def _escape(text: str) -> str:
    return "".join("\\" + c if c in _MD_SPECIAL else c for c in str(text))


def _cost_won(tokens_input: int, tokens_output: int) -> int:
    """Approximate Claude Haiku cost in won: in $1/M × ₩1300, out $5/M × ₩1300."""
    # round() rather than int() so float error cannot truncate 25.999... down to 25.
    return round(tokens_input * 0.0013 + tokens_output * 0.0065)


def _row_line(idx: int, r: Dict[str, Any]) -> str:
    # "+", "-" and "." are MarkdownV2 specials, so the formatted score is escaped too.
    score_text = _escape(f"{r['score_raw']:+.1f}")
    return (
        f"{idx}\\. {_escape(r['ticker'])} \\({score_text}\\) — "
        f"{_escape(r['reason'])}"
    )


def build_message(
    *,
    asof: str,
    top_pos: List[Dict[str, Any]],
    top_neg: List[Dict[str, Any]],
    tokens_input: int,
    tokens_output: int,
) -> str:
    lines: List[str] = [
        f"🌅 *AI 뉴스 분석* \\({_escape(asof)} 08:00\\)",
        "",
        "📈 *호재 Top 5*",
    ]
    if top_pos:
        for i, r in enumerate(top_pos, 1):
            lines.append(_row_line(i, r))
    else:
        lines.append(_escape("- (없음)"))
    lines += ["", "📉 *악재 Top 5*"]
    if top_neg:
        for i, r in enumerate(top_neg, 1):
            lines.append(_row_line(i, r))
    else:
        lines.append(_escape("- (없음)"))
    cost = _cost_won(tokens_input, tokens_output)
    lines += [
        "",
        f"_분석: 시총 상위 100종목 · 토큰 {tokens_input:,} in / {tokens_output:,} out · "
        f"약 ₩{cost:,}_",
    ]
    return "\n".join(lines)
```
- [ ] **Step 4: Confirm the tests pass**
```bash
python -m pytest tests/test_ai_news_telegram.py -v
```
Expected: PASS (4 tests passed).
- [ ] **Step 5: Commit**
```bash
git add app/screener/ai_news/telegram.py tests/test_ai_news_telegram.py
git commit -m "feat(screener): ai_news telegram message builder (MarkdownV2 + cost line)"
```
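Two details of the message builder are easy to get wrong: MarkdownV2 requires a backslash before every reserved character (including the `+`, `-` and `.` that appear in scores and dates), and the cost line is plain arithmetic on the token counts. The sketch below works through both in isolation. `escape_md` and `cost_won` are hypothetical names, and the $1/M input, $5/M output and ₩1,300 per dollar figures are the approximations stated in the `_cost_won` docstring, not authoritative pricing.

```python
MD_SPECIAL = set("_*[]()~`>#+-=|{}.!\\")


def escape_md(text: str) -> str:
    """Backslash-escape every MarkdownV2 reserved character."""
    return "".join("\\" + c if c in MD_SPECIAL else c for c in str(text))


def cost_won(tokens_input: int, tokens_output: int) -> int:
    """Approximate cost in won from token counts (assumed rates, see lead-in)."""
    usd = tokens_input * 1 / 1_000_000 + tokens_output * 5 / 1_000_000
    return round(usd * 1300)
```

For 10,000 input and 2,000 output tokens this gives $0.01 + $0.01 = $0.02, or about ₩26, which matches the figure used in `test_build_message_cost_won_line`.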
---
### Task 6: `nodes/ai_news.py`: ScoreNode integration + tests
**Files:**
- Create: `web-backend/stock-lab/app/screener/nodes/ai_news.py`
- Test: `web-backend/stock-lab/tests/test_ai_news_node.py`
- [ ] **Step 1: Write the failing tests**
```python
import datetime as dt
import pandas as pd
import pytest
from app.screener.nodes.ai_news import AiNewsSentiment


class FakeCtx:
    def __init__(self, df=None):
        self.news_sentiment = df
        self.asof = dt.date(2026, 5, 13)


def test_compute_empty_context():
    out = AiNewsSentiment().compute(FakeCtx(None), {"min_news_count": 1})
    assert out.empty


def test_compute_with_data_percentile_ranks():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": -5.0, "news_count": 3},
        {"ticker": "B", "score_raw": 0.0, "news_count": 3},
        {"ticker": "C", "score_raw": 8.0, "news_count": 3},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert len(out) == 3
    # percentile rank: A (lowest) < B < C (highest)
    assert out.loc["A"] < out.loc["B"] < out.loc["C"]
    # all within [0, 100]
    assert (out >= 0).all() and (out <= 100).all()


def test_compute_filters_by_min_news_count():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": -5.0, "news_count": 0},  # filtered out
        {"ticker": "B", "score_raw": 0.0, "news_count": 2},
        {"ticker": "C", "score_raw": 8.0, "news_count": 5},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert "A" not in out.index
    assert "B" in out.index
    assert "C" in out.index


def test_compute_all_filtered_returns_empty():
    df = pd.DataFrame([
        {"ticker": "A", "score_raw": 5.0, "news_count": 0},
    ])
    out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
    assert out.empty


def test_metadata():
    n = AiNewsSentiment()
    assert n.name == "ai_news"
    assert "AI" in n.label or "뉴스" in n.label
    assert n.default_params == {"min_news_count": 1}
    assert "min_news_count" in n.param_schema["properties"]
```
- [ ] **Step 2: Confirm the tests fail**
```bash
python -m pytest tests/test_ai_news_node.py -v
```
Expected: FAIL — "No module named 'app.screener.nodes.ai_news'".
- [ ] **Step 3: Implement `nodes/ai_news.py`**
```python
"""AI news bullish/bearish score node.

Filters ScreenContext.news_sentiment (DataFrame: ticker, score_raw, news_count)
by min_news_count, then converts it to 0-100 with percentile_rank.
"""
from __future__ import annotations
import pandas as pd
from .base import ScoreNode, percentile_rank


class AiNewsSentiment(ScoreNode):
    name = "ai_news"
    label = "AI 뉴스 호재/악재"
    default_params = {"min_news_count": 1}
    param_schema = {
        "type": "object",
        "properties": {
            "min_news_count": {
                "type": "integer", "minimum": 0, "default": 1,
                "description": "Minimum number of analyzed news items; below this, no score is produced.",
            },
        },
    }

    def compute(self, ctx, params: dict) -> pd.Series:
        df = getattr(ctx, "news_sentiment", None)
        if df is None or df.empty:
            return pd.Series(dtype=float)
        min_news = int(params.get("min_news_count", 1))
        df = df[df["news_count"] >= min_news]
        if df.empty:
            return pd.Series(dtype=float)
        return percentile_rank(df.set_index("ticker")["score_raw"])
```
- [ ] **Step 4: Confirm the tests pass**
```bash
python -m pytest tests/test_ai_news_node.py -v
```
Expected: PASS (5 tests passed).
- [ ] **Step 5: Commit**
```bash
git add app/screener/nodes/ai_news.py tests/test_ai_news_node.py
git commit -m "feat(screener): AiNewsSentiment ScoreNode (percentile_rank + min_news_count)"
```
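The node's two steps, filtering by `min_news_count` and then ranking, can be illustrated without pandas. The plan does not show how `percentile_rank` in `nodes/base.py` is defined, so the sketch below assumes one common definition (the share of tickers scoring at or below a given ticker, times 100). Both function names are hypothetical, and the real helper may handle ties or scaling differently.

```python
def percentile_rank_sketch(scores: dict) -> dict:
    """Map each ticker to the percentage of tickers scoring at or below it."""
    values = list(scores.values())
    n = len(values)
    out = {}
    for ticker, s in scores.items():
        at_or_below = sum(1 for v in values if v <= s)
        out[ticker] = 100.0 * at_or_below / n
    return out


def score_node_sketch(rows: list, min_news_count: int = 1) -> dict:
    """Drop tickers with too few news items, then rank the remaining raw scores."""
    eligible = {
        r["ticker"]: r["score_raw"]
        for r in rows
        if r["news_count"] >= min_news_count
    }
    return percentile_rank_sketch(eligible)
```

Because the output is a rank, it is relative to that day's universe: the least negative ticker still receives the top rank on a day when every score is negative.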
---
### Task 7: `engine.py` ScreenContext + load update
**Files:**
- Modify: `web-backend/stock-lab/app/screener/engine.py`
- [ ] **Step 1: Add a `news_sentiment` field to the `ScreenContext` dataclass**
Add as the last line of the `@dataclass(frozen=True) class ScreenContext:` body (right after `asof: dt.date`):
```python
    news_sentiment: "pd.DataFrame | None" = None
```
A dataclass field with a default value must come after every field without one. This is a general dataclass rule rather than a consequence of `frozen`, so the new field goes after asof.
- [ ] **Step 2: Load news_sentiment inside the `load` method**
In the `load` method, add the following after `flow = pd.read_sql_query(...)` and before the KOSPI handling:
```python
        news_sentiment = pd.read_sql_query(
            "SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
            conn, params=(asof_iso,),
        )
```
Add the `news_sentiment=news_sentiment` argument to the `return cls(...)` call:
```python
        return cls(
            master=master, prices=prices, flow=flow,
            kospi=kospi, asof=asof, news_sentiment=news_sentiment,
        )
```
- [ ] **Step 3: Verify the existing integration tests for regressions**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest app/test_screener_context.py -v
```
Expected: all existing tests PASS (the default of None keeps existing callers compatible).
- [ ] **Step 4: Commit**
```bash
git add app/screener/engine.py
git commit -m "feat(screener): ScreenContext.news_sentiment field + load query"
```
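The field-ordering constraint in Step 1 is enforced when the class is defined, not when an instance is created. A minimal sketch follows; `Ctx` and `bad_definition_error` are hypothetical names, and a plain list stands in for the DataFrame so the example needs only the standard library.

```python
import datetime as dt
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Ctx:
    asof: dt.date
    news_sentiment: Optional[list] = None  # the defaulted field goes last


def bad_definition_error() -> str:
    """Define a dataclass with the fields in the wrong order and return the error text."""
    try:
        @dataclass
        class Bad:
            news_sentiment: Optional[list] = None
            asof: dt.date  # a field without a default after one with a default
    except TypeError as exc:
        return str(exc)
    return ""
```

Existing call sites that construct the context without `news_sentiment` keep working, because the new field falls back to `None`.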
---
### Task 8: Register ai_news in `registry.py`
**Files:**
- Modify: `web-backend/stock-lab/app/screener/registry.py`
- [ ] **Step 1: Add the import and the NODE_REGISTRY entry**
Add to the imports at the top:
```python
from .nodes.ai_news import AiNewsSentiment
```
Add after the last entry of the `NODE_REGISTRY` dict:
```python
    "ai_news": AiNewsSentiment,
```
Full result:
```python
"""Registry of node classes (single source of truth for /nodes endpoint)."""
from .nodes.hygiene import HygieneGate
from .nodes.foreign_buy import ForeignBuy
from .nodes.volume_surge import VolumeSurge
from .nodes.momentum import Momentum20
from .nodes.high52w import High52WProximity
from .nodes.rs_rating import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite import VcpLite
from .nodes.ai_news import AiNewsSentiment

NODE_REGISTRY: dict = {
    "foreign_buy": ForeignBuy,
    "volume_surge": VolumeSurge,
    "momentum": Momentum20,
    "high52w": High52WProximity,
    "rs_rating": RsRating,
    "ma_alignment": MaAlignment,
    "vcp_lite": VcpLite,
    "ai_news": AiNewsSentiment,
}

GATE_REGISTRY: dict = {
    "hygiene": HygieneGate,
}
```
- [ ] **Step 2: Sanity-check the import**
```bash
python -c "from app.screener.registry import NODE_REGISTRY; print(list(NODE_REGISTRY))"
```
Expected: `['foreign_buy', 'volume_surge', 'momentum', 'high52w', 'rs_rating', 'ma_alignment', 'vcp_lite', 'ai_news']`
- [ ] **Step 3: Commit**
```bash
git add app/screener/registry.py
git commit -m "feat(screener): register ai_news in NODE_REGISTRY"
```
---
### Task 9: Add `/snapshot/refresh-news-sentiment` to `router.py`
**Files:**
- Modify: `web-backend/stock-lab/app/screener/router.py`
- [ ] **Step 1: Add the route**
Add at the end of `router.py` (below the other routes):
```python
# ---------- /snapshot/refresh-news-sentiment ----------
from . import ai_news as _ai_news_pkg
# ai_news/__init__.py is empty, so load the submodule explicitly; otherwise
# _ai_news_pkg.pipeline raises AttributeError at request time.
from .ai_news import pipeline as _ai_news_pipeline  # noqa: F401


@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
    asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
    if asof_date.weekday() >= 5:
        return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
    if _is_holiday(asof_date):
        return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
    with _conn() as c:
        summary = await _ai_news_pkg.pipeline.refresh_daily(c, asof_date)
    return summary
```
- [ ] **Step 2: Verify that the ai_news package can be imported from the router**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -c "from app.screener.router import router; print([r.path for r in router.routes])"
```
Expected: the output includes `/api/stock/screener/snapshot/refresh-news-sentiment`.
- [ ] **Step 3: Router tests (mocked pipeline)**
Create `tests/test_ai_news_router.py`:
```python
import datetime as dt
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app


def test_refresh_news_sentiment_weekend_skip():
    # 2026-05-16 = Saturday
    client = TestClient(app)
    resp = client.post(
        "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-16"
    )
    assert resp.status_code == 200
    assert resp.json()["status"] == "skipped_weekend"


def test_refresh_news_sentiment_weekday_invokes_pipeline():
    fake_summary = {
        "asof": "2026-05-13", "updated": 3, "failures": [],
        "duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
        "top_pos": [], "top_neg": [], "model": "m",
    }
    with patch("app.screener.router._ai_news_pkg") as m:
        m.pipeline.refresh_daily = AsyncMock(return_value=fake_summary)
        client = TestClient(app)
        resp = client.post(
            "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
        )
    assert resp.status_code == 200
    body = resp.json()
    assert body["asof"] == "2026-05-13"
    assert body["updated"] == 3
```
```bash
python -m pytest tests/test_ai_news_router.py -v
```
Expected: PASS — 2 tests passed.
- [ ] **Step 4: Commit**
```bash
git add app/screener/router.py tests/test_ai_news_router.py
git commit -m "feat(screener): POST /snapshot/refresh-news-sentiment endpoint"
```
---
### Task 10: agent-office `service_proxy.refresh_ai_news_sentiment()`
**Files:**
- Modify: `web-backend/agent-office/app/service_proxy.py`
- [ ] **Step 1: Add the helper function**
Add it right after the definition of `refresh_screener_snapshot` (around line 44):
```python
async def refresh_ai_news_sentiment() -> Dict[str, Any]:
    """Trigger stock-lab's AI news sentiment analysis (08:00 cron).

    Scraping Naver for 100 tickers plus 100 parallel Claude Haiku calls takes
    roughly 30-60 seconds, so the 240 s timeout leaves generous headroom.
    """
    async with httpx.AsyncClient(timeout=240.0) as client:
        resp = await client.post(
            f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh-news-sentiment"
        )
        resp.raise_for_status()
        return resp.json()
```
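
Passing a single float to `httpx.AsyncClient(timeout=...)` applies that value to each phase (connect, read, write, pool) rather than to the call as a whole. If a hard end-to-end budget is wanted, one option is to wrap the helper in `asyncio.wait_for`. The sketch below uses a hypothetical `fake_refresh` coroutine in place of the real HTTP call, and `call_with_deadline` is an illustrative name that is not part of this plan.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def fake_refresh(delay_sec: float) -> Dict[str, Any]:
    """Hypothetical stand-in for refresh_ai_news_sentiment(); sleeps instead of calling HTTP."""
    await asyncio.sleep(delay_sec)
    return {"updated": 100, "failures": []}


async def call_with_deadline(
    make_call: Callable[[], Awaitable[Dict[str, Any]]],
    deadline_sec: float,
) -> Dict[str, Any]:
    """Run the call under one end-to-end deadline; on expiry return a marker dict."""
    try:
        return await asyncio.wait_for(make_call(), timeout=deadline_sec)
    except asyncio.TimeoutError:
        return {"status": "timeout", "deadline_sec": deadline_sec}


# A fast call completes; a slow call is cut off at the deadline.
fast = asyncio.run(call_with_deadline(lambda: fake_refresh(0.01), deadline_sec=1.0))
slow = asyncio.run(call_with_deadline(lambda: fake_refresh(0.5), deadline_sec=0.05))
```
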
- [ ] **Step 2: Verify the import**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.service_proxy import refresh_ai_news_sentiment; print('ok')"
```
Expected: `ok`.
- [ ] **Step 3: Commit**
```bash
git add app/service_proxy.py
git commit -m "feat(agent-office): refresh_ai_news_sentiment service helper"
```
---
### Task 11: agent-office `StockAgent.on_ai_news_schedule`
**Files:**
- Modify: `web-backend/agent-office/app/agents/stock.py`
- [ ] **Step 1: Add the method**
Add the following method just before the definition of `on_command`. The alert code uses `html.escape`, so `import html` must be present at the top of the module:
```python
    async def on_ai_news_schedule(self) -> None:
        """Scheduled AI news sentiment job (weekdays 08:00 KST).

        Flow:
          1) Call stock-lab /snapshot/refresh-news-sentiment
          2) status='skipped_weekend'/'skipped_holiday' -> stop (no Telegram message)
          3) updated=0 -> operator alert (HTML)
          4) failures > 30% -> warning alert, then send the main message
          5) normal -> send the Top 5 positive/negative message (MarkdownV2)
        """
        if self.state not in ("idle", "break"):
            return
        task_id = create_task(self.agent_id, "ai_news_sentiment", {})
        await self.transition("working", "AI 뉴스 분석 중...", task_id)
        try:
            result = await service_proxy.refresh_ai_news_sentiment()
        except Exception as e:
            err_msg = str(e)
            add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id)
            update_task_status(task_id, "failed", {"error": err_msg})
            try:
                from ..telegram.messaging import send_raw
                # Truncate before escaping so an HTML entity is never cut in half.
                await send_raw(
                    f"⚠️ <b>AI 뉴스 분석 실패</b>\n"
                    f"<code>{html.escape(err_msg[:500])}</code>"
                )
            except Exception as notify_err:
                add_log(
                    self.agent_id,
                    f"operator notify failed: {notify_err}",
                    "warning", task_id,
                )
            await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}")
            return

        status = result.get("status")
        if status in ("skipped_weekend", "skipped_holiday"):
            update_task_status(task_id, "succeeded", {"status": status})
            add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id)
            await self.transition("idle", "휴일/주말 — 건너뜀")
            return

        updated = int(result.get("updated", 0))
        failures = result.get("failures", []) or []
        if updated == 0:
            update_task_status(task_id, "failed", {"reason": "0 tickers updated"})
            try:
                from ..telegram.messaging import send_raw
                await send_raw(
                    "⚠️ <b>AI 뉴스 분석 0종목</b>\n"
                    "스크래핑/LLM 전체 실패 — 어제 데이터 사용"
                )
            except Exception:
                pass
            await self.transition("idle", "AI 뉴스 0건")
            return

        # Failure-rate warning (separate alert; the main message is still sent)
        failure_rate = len(failures) / max(1, updated + len(failures))
        if failure_rate > 0.3:
            try:
                from ..telegram.messaging import send_raw
                await send_raw(
                    f"⚠️ <b>AI 뉴스 실패율 {failure_rate:.0%}</b>\n"
                    f"updated={updated}, failures={len(failures)}"
                )
            except Exception:
                pass

        # Normal path: Top 5 message (built by stock-lab, carried in telegram_text)
        text = result.get("telegram_text") or ""
        if not text:
            # Fail the task and return to idle; raising here would leave the
            # agent in "working" and block every later scheduled run.
            add_log(self.agent_id, "telegram_text 누락", "error", task_id)
            update_task_status(task_id, "failed", {"reason": "telegram_text missing"})
            await self.transition("idle", "AI 뉴스 오류: telegram_text 누락")
            return
        await self.transition("reporting", "AI 뉴스 알림 전송 중...")
        from ..telegram.messaging import send_raw
        tg = await send_raw(text, parse_mode="MarkdownV2")
        update_task_status(task_id, "succeeded", {
            "asof": result["asof"],
            "updated": updated,
            "failures": len(failures),
            "tokens_input": int(result.get("tokens_input", 0)),
            "tokens_output": int(result.get("tokens_output", 0)),
            "telegram_sent": tg.get("ok", False),
        })
        if not tg.get("ok"):
            desc = tg.get("description") or "unknown"
            code = tg.get("error_code")
            add_log(
                self.agent_id,
                f"AI news telegram send failed: [{code}] {desc}",
                "warning", task_id,
            )
        await self.transition("idle", "AI 뉴스 완료")
```
**Design decision**: agent-office cannot import stock-lab modules directly because the two run in separate containers. The stock-lab API response therefore carries a `telegram_text` field, and agent-office simply forwards it.
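
The branching in `on_ai_news_schedule` reduces to a small decision over the response payload. As a minimal sketch (the function `classify_refresh_result` and its action labels are hypothetical, chosen only for illustration), the same rules can be written as a pure function and unit-tested without the agent or Telegram:

```python
from typing import Any, Dict, List


def classify_refresh_result(result: Dict[str, Any], warn_threshold: float = 0.3) -> List[str]:
    """Map a refresh response to the ordered actions the handler would take.

    Action labels are hypothetical; the rules mirror the handler's branches.
    """
    if result.get("status") in ("skipped_weekend", "skipped_holiday"):
        return ["skip"]
    updated = int(result.get("updated", 0))
    failures = result.get("failures", []) or []
    if updated == 0:
        return ["alert_zero_updated"]
    actions: List[str] = []
    # Same denominator guard as the handler: never divide by zero.
    failure_rate = len(failures) / max(1, updated + len(failures))
    if failure_rate > warn_threshold:
        actions.append("warn_failure_rate")
    actions.append("send_top5")
    return actions


# 60 updated and 40 failed gives a 40% failure rate, so a warning precedes the main message.
degraded = classify_refresh_result({"updated": 60, "failures": ["x"] * 40})
```
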
- [ ] **Step 2: Update the stock-lab router so the response includes `telegram_text`**
In `web-backend/stock-lab/app/screener/router.py`, modify `post_refresh_news_sentiment`:
```python
@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
    asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
    if asof_date.weekday() >= 5:
        return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
    if _is_holiday(asof_date):
        return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
    with _conn() as c:
        summary = await _ai_news_pkg.pipeline.refresh_daily(c, asof_date)
    # Attach the Telegram text to the response
    summary["telegram_text"] = _ai_news_pkg.telegram.build_message(
        asof=summary["asof"],
        top_pos=summary["top_pos"], top_neg=summary["top_neg"],
        tokens_input=summary["tokens_input"],
        tokens_output=summary["tokens_output"],
    )
    return summary
```
On the agent-office side, `on_ai_news_schedule` must take the message from this response field instead of calling `build_message(...)` locally. The Step 1 listing already does so; the line to confirm is:
```python
        # Normal path: Top 5 message (built by stock-lab, carried in the response)
        text = result.get("telegram_text") or ""
```
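
Because the handler sends `telegram_text` with `parse_mode="MarkdownV2"`, any dynamic text placed in the message (ticker names, headlines) needs Telegram's reserved characters escaped, or the Bot API rejects the message. The helper below is a sketch of that escaping; `escape_markdown_v2` is a hypothetical name, and whether `build_message` already escapes its inputs is not shown in this section.

```python
import re

# Characters the Telegram Bot API reserves in MarkdownV2, plus the backslash itself.
_MDV2_SPECIAL = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2-reserved character with a backslash."""
    return _MDV2_SPECIAL.sub(r"\\\1", text)


# Sample fragment only; parentheses, plus sign, and dot are all reserved.
escaped = escape_markdown_v2("삼성전자 (005930) +3.2%")
```

Only the dynamic fragments should pass through the helper; intentional formatting markers in the message template stay unescaped.
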
- [ ] **Step 3: Verify the import**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.agents.stock import StockAgent; print(hasattr(StockAgent, 'on_ai_news_schedule'))"
```
Expected: `True`.
- [ ] **Step 4: Re-run the Task 9 router test (verify `telegram_text` in the response)**
Extend `test_refresh_news_sentiment_weekday_invokes_pipeline` in `tests/test_ai_news_router.py`:
```python
def test_refresh_news_sentiment_weekday_invokes_pipeline():
    fake_summary = {
        "asof": "2026-05-13", "updated": 3, "failures": [],
        "duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
        "top_pos": [], "top_neg": [], "model": "m",
    }
    with patch("app.screener.router._ai_news_pkg") as m:
        m.pipeline.refresh_daily = AsyncMock(return_value=fake_summary)
        m.telegram.build_message = lambda **kw: "BUILT_TEXT"
        client = TestClient(app)
        resp = client.post(
            "/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
        )
    assert resp.status_code == 200
    body = resp.json()
    assert body["telegram_text"] == "BUILT_TEXT"
```
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest tests/test_ai_news_router.py -v
```
Expected: PASS.
- [ ] **Step 5: Commit (both packages)**
```bash
# stock-lab
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
git add app/screener/router.py tests/test_ai_news_router.py
git commit -m "feat(screener): include telegram_text in refresh-news-sentiment response"
# agent-office
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
git add app/agents/stock.py
git commit -m "feat(agent-office): on_ai_news_schedule (cron handler + telegram dispatch)"
```
**Note**: From this task on, both directories change, so work inside the web-backend monorepo and use `cd` to move between sub-packages.

---
### Task 12: Register the cron job in scheduler.py
**Files:**
- Modify: `web-backend/agent-office/app/scheduler.py`
- [ ] **Step 1: Check how the existing scheduler.py registers on_screener_schedule**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
grep -nE "on_screener_schedule|add_job.*stock" app/scheduler.py
```
Mirror the existing registration exactly: the variable name (`stock_agent`, `agents["stock"]`, etc.), the indentation, and the id naming convention. Expected pattern:
```python
scheduler.add_job(
    stock_agent.on_screener_schedule,
    "cron", day_of_week="mon-fri", hour=16, minute=30,
    id="stock_screener", timezone="Asia/Seoul",
    replace_existing=True,
)
```
- [ ] **Step 2: Register the ai_news cron job with the same pattern**
Add it immediately after the existing `on_screener_schedule` registration:
```python
scheduler.add_job(
    stock_agent.on_ai_news_schedule,
    "cron", day_of_week="mon-fri", hour=8, minute=0,
    id="stock_ai_news_sentiment",
    timezone="Asia/Seoul",
    replace_existing=True,
)
```
Reference the agent with exactly the same expression as the existing line (`stock_agent` or `agents["stock"]`).
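
To make the trigger semantics concrete, the sketch below computes the next firing time of a weekdays 08:00 KST schedule using only the standard library. It is not how APScheduler is implemented; `next_weekday_run` is a hypothetical helper, and KST is modeled as a fixed UTC+9 offset (Korea observes no daylight saving time).

```python
import datetime as dt

KST = dt.timezone(dt.timedelta(hours=9), name="KST")  # fixed offset; no DST in Korea


def next_weekday_run(now: dt.datetime, hour: int = 8, minute: int = 0) -> dt.datetime:
    """Return the first Mon-Fri occurrence of hour:minute KST strictly after `now`.

    `now` must be timezone-aware.
    """
    local = now.astimezone(KST)
    candidate = local.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local:
        candidate += dt.timedelta(days=1)
    while candidate.weekday() >= 5:  # skip Saturday (5) and Sunday (6)
        candidate += dt.timedelta(days=1)
    return candidate


# Friday 08:00 KST exactly: the next run is the following Monday.
after_friday = next_weekday_run(dt.datetime(2026, 5, 15, 8, 0, tzinfo=KST))
```

Public holidays are not filtered at this level; the route's `skipped_holiday` status covers them.
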
- [ ] **Step 3: Verify the scheduler import**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -c "from app.scheduler import scheduler; print([j.id for j in scheduler.get_jobs()])"
```
Expected: the output includes `stock_ai_news_sentiment`.
(Note: if the scheduler is only imported and never started, the list may be empty. In that case, verify by reading the code instead.)
- [ ] **Step 4: Commit**
```bash
git add app/scheduler.py
git commit -m "feat(agent-office): cron mon-fri 08:00 ai_news sentiment job"
```
---
### Task 13: Add the AI node to frontend canvasLayout.js
**Files:**
- Modify: `web-ui/src/pages/stock/screener/components/canvas/constants/canvasLayout.js`
- Modify: `web-ui/src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js`
- [ ] **Step 1: Update the tests first (so that they fail)**
Update the following cases in `canvasLayout.test.js`:
```javascript
it('NODE_IDS — 12개 키, 모두 unique', () => {
  const ids = Object.values(NODE_IDS);
  expect(ids).toHaveLength(12); // 11 → 12
  expect(new Set(ids).size).toBe(12);
});
it('EDGES — 18개, source/target이 모두 NODE_IDS 안에 존재', () => {
  expect(EDGES).toHaveLength(18); // 16 → 18 (1+8+8+1)
  ...
});
it('EDGES — 8개 점수 노드는 모두 gate 입력 + combine 출력을 가짐', () => {
  const SCORE_IDS = [
    NODE_IDS.FOREIGN, NODE_IDS.VOLUME, NODE_IDS.MOMENTUM,
    NODE_IDS.HIGH52W, NODE_IDS.RS, NODE_IDS.MA, NODE_IDS.VCP,
    NODE_IDS.AI_NEWS, // added
  ];
  ...
});
it('SCORE_NODE_NAME_MAP — 8개 점수 노드 ID → backend node name', () => {
  expect(Object.keys(SCORE_NODE_NAME_MAP)).toHaveLength(8); // 7 → 8
  expect(SCORE_NODE_NAME_MAP[NODE_IDS.FOREIGN]).toBe('foreign_buy');
  expect(SCORE_NODE_NAME_MAP[NODE_IDS.VOLUME]).toBe('volume_surge');
  expect(SCORE_NODE_NAME_MAP[NODE_IDS.AI_NEWS]).toBe('ai_news'); // added
});
```
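
The expected counts in these tests follow from the canvas topology: one edge into the gate, one edge from the gate to each score node, one edge from each score node to combine, and one edge out of combine (1+8+8+1). A small Python sketch reproduces the arithmetic; the non-score node names (`universe`, `gate`, `combine`, `result`) are placeholders assumed for illustration, not the actual `NODE_IDS` values.

```python
from typing import List, Tuple

SCORE_NODES = [
    "foreign_buy", "volume_surge", "momentum", "high52w",
    "rs_rating", "ma_alignment", "vcp_lite", "ai_news",
]


def build_edges(score_nodes: List[str]) -> List[Tuple[str, str]]:
    """Build universe -> gate -> each score -> combine -> result edges (placeholder names)."""
    edges = [("universe", "gate")]
    edges += [("gate", s) for s in score_nodes]
    edges += [(s, "combine") for s in score_nodes]
    edges.append(("combine", "result"))
    return edges


edges = build_edges(SCORE_NODES)
# Every node appears as an endpoint of at least one edge, so this counts all nodes.
node_count = len({node for edge in edges for node in edge})
```

With seven score nodes the same function yields the previous totals of 16 edges and 11 nodes, which is the 16 → 18 and 11 → 12 change noted in the test comments.
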
- [ ] **Step 2: Confirm the tests fail**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npx vitest run src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js
```
Expected: FAIL, for example because `NODE_IDS.AI_NEWS` is undefined.
- [ ] **Step 3: Update `canvasLayout.js`**
Add to the `NODE_IDS` object (just before COMBINE):
```javascript
  AI_NEWS: 'score-ai-news',
```
Add to `NODE_KIND_MAP`:
```javascript
  [NODE_IDS.AI_NEWS]: 'score',
```
Add to `SCORE_NODE_NAME_MAP`:
```javascript
  [NODE_IDS.AI_NEWS]: 'ai_news',
```
Add to `INITIAL_NODE_POSITIONS` (below VCP, at y=630):
```javascript
  [NODE_IDS.AI_NEWS]: { x: 480, y: 630 },
```
Append to the end of the `SCORE_KEYS` array:
```javascript
const SCORE_KEYS = ['FOREIGN','VOLUME','MOMENTUM','HIGH52W','RS','MA','VCP','AI_NEWS'];
```
Add to `SCORE_NODE_LABEL`:
```javascript
  [NODE_IDS.AI_NEWS]: { icon: '🤖', title: 'AI 뉴스' },
```
- [ ] **Step 4: Confirm the tests pass**
```bash
npx vitest run src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js
```
Expected: PASS, 6 tests passed (all based on 8 score nodes / 12 nodes / 18 edges).
- [ ] **Step 5: Run the full frontend test suite for regressions**
```bash
npx vitest run
```
Expected: 4 files, 20 tests pass (nothing changes outside canvasLayout).
- [ ] **Step 6: Commit**
```bash
git add src/pages/stock/screener/components/canvas/constants/canvasLayout.js src/pages/stock/screener/components/canvas/constants/canvasLayout.test.js
git commit -m "feat(screener): canvas adds AI news node (12 nodes, 18 edges)"
```
---
### Task 14: Full backend tests + frontend build
**Files:**
- (run tests and builds only)
- [ ] **Step 1: Full stock-lab test suite**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\stock-lab
python -m pytest -v
```
Expected: all new tests pass and the 71 existing tests show no regressions.
- [ ] **Step 2: Full agent-office test suite**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend\agent-office
python -m pytest -v 2>&1 | tail -20
```
Expected: no regressions in existing tests. Adding `on_ai_news_schedule` is a purely additive change, so it should not conflict with existing tests.
- [ ] **Step 3: Frontend build**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npm run build
```
Expected: the build succeeds. The `CanvasLayout-*.js` chunk size barely changes (canvasLayout.js grows by about 30 lines).
- [ ] **Step 4: Frontend lint (regression)**
```bash
npx eslint src/pages/stock/screener/components/canvas/constants/canvasLayout.js
```
Expected: 0 errors.
---
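### Task 15: Manual verification + deployment
**Files:**
- (execution and manual verification only)
- [ ] **Step 1: Push the backend (requires Gitea credentials; done manually by the user)**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-backend
git push origin main
```
On failure, ask the user to enter credentials. After a successful push, the Gitea webhook triggers the deployer rsync and then docker compose build automatically.
- [ ] **Step 2: Confirm the NAS docker containers were updated and check environment variables**
From the NAS console:
```bash
docker logs stock-lab --tail 30
docker logs agent-office --tail 30
```
- Confirm that `ANTHROPIC_API_KEY` is non-empty in the stock-lab environment (stock-lab environment variables in `docker-compose.yml`, line 40)
- Check the logs to confirm that the `stock_ai_news_sentiment` job was registered when the agent-office scheduler started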
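
One way to check the first bullet without printing the secret is a small script that reports only which required variables are missing. This is a sketch: `missing_env_vars` is a hypothetical helper, and it would need to run inside the stock-lab container to see that container's environment.

```python
import os
from typing import Iterable, List, Mapping


def missing_env_vars(names: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or blank, without revealing any values."""
    return [name for name in names if not env.get(name, "").strip()]


missing = missing_env_vars(["ANTHROPIC_API_KEY"])
print("missing:", missing or "none")
```
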
- [ ] **Step 3: Trigger the job manually (operator privileges)**
Manual run through the agent-office API (for testing):
```bash
curl -X POST "https://gahusb.synology.me/api/agent-office/command" \
  -H "Content-Type: application/json" \
  -d '{"agent_id":"stock","command":"run_ai_news"}'
```
(Note: if `on_command` does not support the `run_ai_news` command, the cron handler has to be called directly. This plan does not include adding that `on_command` branch, so verify on the NAS as follows. Adjust `stock_agent` to whatever name scheduler.py actually exposes, as checked in Task 12.)
```bash
docker exec agent-office python -c "
import asyncio
from app.scheduler import stock_agent
asyncio.run(stock_agent.on_ai_news_schedule())
"
```
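- [ ] **Step 4: Verify the data**
```bash
docker exec stock-lab sqlite3 /app/data/stock.db "
SELECT ticker, score_raw, news_count, model FROM news_sentiment
WHERE date = date('now', '+9 hours') ORDER BY score_raw DESC LIMIT 10;
"
```
Expected: about 100 ticker rows exist for today, with `score_raw` values between -10 and +10. SQLite's `date('now')` is evaluated in UTC, so the `'+9 hours'` modifier shifts it to the KST calendar date; this assumes rows are keyed by the KST date. Without the modifier, a check run before 09:00 KST would query the previous day.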
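
The same check can be scripted. The sketch below runs against an in-memory SQLite database with a guessed minimal schema (only the columns named in the query above; the real `news_sentiment` table may differ) and a hypothetical `check_sentiment_rows` helper that reports the row count and whether every `score_raw` lies within the expected -10 to +10 range.

```python
import sqlite3
from typing import Dict


def check_sentiment_rows(conn: sqlite3.Connection, asof: str,
                         lo: float = -10.0, hi: float = 10.0) -> Dict[str, object]:
    """Count rows for `asof` and flag scores outside [lo, hi]."""
    count, min_score, max_score = conn.execute(
        "SELECT COUNT(*), MIN(score_raw), MAX(score_raw) "
        "FROM news_sentiment WHERE date = ?",
        (asof,),
    ).fetchone()
    # With zero rows MIN/MAX are NULL, so short-circuit before comparing.
    in_range = count > 0 and lo <= min_score and max_score <= hi
    return {"rows": count, "in_range": in_range}


# Assumed schema and sample rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE news_sentiment ("
    "ticker TEXT, date TEXT, score_raw REAL, news_count INTEGER, model TEXT, "
    "PRIMARY KEY (ticker, date))"
)
conn.executemany(
    "INSERT INTO news_sentiment VALUES (?, ?, ?, ?, ?)",
    [("005930", "2026-05-13", 6.5, 8, "m"), ("000660", "2026-05-13", -3.0, 5, "m")],
)
report = check_sentiment_rows(conn, "2026-05-13")
```
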
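- [ ] **Step 5: Check the Telegram message**
At 08:00 KST, or after the manual trigger in Step 3, confirm that the message arrives in Telegram. Format:
- 🌅 AI 뉴스 분석 (YYYY-MM-DD 08:00) (AI news analysis header)
- 📈 호재 Top 5 (top 5 positive)
- 📉 악재 Top 5 (top 5 negative)
- Token/cost line
- [ ] **Step 6: Confirm the ai_news score enters the weighted sum the next time the 16:30 screener job runs**
After the screener runs, check the Telegram result or run:
```bash
docker exec stock-lab sqlite3 /app/data/stock.db "
SELECT rank, ticker, name, total_score, scores_json
FROM screener_results sr JOIN screener_runs r ON sr.run_id=r.id
WHERE r.asof=date('now') ORDER BY rank LIMIT 5;
"
```
Confirm that `scores_json` contains an `ai_news` key with a score between 0 and 100.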
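
As an illustration of that final check, the snippet below parses a `scores_json` value and validates the `ai_news` entry. The sample payload and the `ai_news_score_ok` helper are made up for this sketch; only the key name and the 0 to 100 range come from the plan.

```python
import json


def ai_news_score_ok(scores_json: str) -> bool:
    """True if the JSON object has a numeric `ai_news` score within 0..100."""
    scores = json.loads(scores_json)
    value = scores.get("ai_news")
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return 0 <= value <= 100


sample = '{"foreign_buy": 71.0, "momentum": 55.5, "ai_news": 83.2}'  # fabricated sample row
ok = ai_news_score_ok(sample)
```
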
- [ ] **Step 7: Confirm the 8th node appears on the frontend canvas**
```bash
cd C:\Users\jaeoh\Desktop\workspace\web-ui
npm run release:nas
```
After deploying to the NAS, open https://gahusb.synology.me/stock/screener and enter canvas mode:
- The 🤖 AI 뉴스 node appears as the 8th score node
- The gate → AI 뉴스 → combine edges are drawn
- Toggling weight=0 dims the node and switches its edges to dashed lines
- [ ] **Step 8: Update web-ui CLAUDE.md**
Add the AI news node to the `Stock Screener` row or the spec index (optional):
```bash
git add CLAUDE.md
git commit -m "docs(screener): note ai_news 8th score node"
git push origin main
```
- [ ] **Step 9: Update memory (controller task)**
Update `project_stock_screener.md`:
- Add the commit SHA that completes the ai_news node
- Remove "AI 뉴스 호재/악재 노드" (the AI news sentiment node) from the follow-up slices in §14 and update the remaining count to 8
---
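## Post-completion verification checklist
When this plan is complete:
- [ ] backend: all 5 new stock-lab modules and 5 test files PASS
- [ ] backend: agent-office `on_ai_news_schedule` exists and the scheduler cron job is registered
- [ ] frontend: the 🤖 AI 뉴스 node appears in canvas mode and its weight slider works
- [ ] DB: the news_sentiment table gains 100 ticker rows per day
- [ ] Telegram: the Top 5/5 message is sent automatically on weekdays at 08:00 KST
- [ ] 16:30 screener: the ai_news score is reflected in the weighted sum (inside scores_json)
- [ ] LLM cost: ~$0.075/day (shown in ₩ at the bottom of the Telegram message)
- [ ] fail-soft: the overall job succeeds even if the LLM call fails for a single ticker
- [ ] localStorage / cron / DB migrations are all idempotent (safe to re-run)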
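
The fail-soft item can be pictured with `asyncio.gather(..., return_exceptions=True)`, which collects per-ticker exceptions instead of aborting the batch. This is only a sketch of the pattern; the actual `refresh_daily` implementation is defined elsewhere in the plan, and `analyze_ticker` here is a hypothetical stub.

```python
import asyncio
from typing import Any, Dict, List


async def analyze_ticker(ticker: str) -> Dict[str, Any]:
    """Hypothetical per-ticker analysis; fails for one ticker to simulate an LLM error."""
    await asyncio.sleep(0)
    if ticker == "000660":
        raise RuntimeError("simulated LLM failure")
    return {"ticker": ticker, "score_raw": 1.0}


async def refresh_all(tickers: List[str]) -> Dict[str, Any]:
    """Run all analyses concurrently and separate successes from failures."""
    results = await asyncio.gather(
        *(analyze_ticker(t) for t in tickers), return_exceptions=True
    )
    # gather preserves input order, so results line up with tickers.
    ok = [r for r in results if not isinstance(r, BaseException)]
    failures = [t for t, r in zip(tickers, results) if isinstance(r, BaseException)]
    return {"updated": len(ok), "failures": failures}


summary = asyncio.run(refresh_all(["005930", "000660", "035420"]))
```
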
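## Follow-up slices (after this plan is complete)
The 8 items listed in spec §18:
1. URL-level caching (about 70% cost reduction)
2. Additional sentiment cron at 16:00
3. Sentiment trend chart
4. Expansion to market-cap top 200~500
5. Backtesting
6. Multilingual / macro news
7. Telegram alert toggle
8. Per-ticker sentiment page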