docs(ai_news): Phase 1 design — articles table reuse (replace Naver scraper)

기존 articles 테이블 재사용 + 종목명 substring 매핑. Naver 차단/중복
인프라 해소, LLM 입력에 summary 포함. weight=0 유지 (검증 전 차단),
news_sentiment.source 컬럼 추가 (Phase 2 비교 baseline).

선행 review (adversarial)에서 가장 강한 비판: "이미 매일 수집 중인
articles 테이블을 무시하고 Naver를 100번 더 긁는 중복" → 해소.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 01:47:25 +09:00
parent ec3ca5fcfa
commit cdfa31b0c1

View File

@@ -0,0 +1,422 @@
# AI News Phase 1 — `articles` Source Integration Design
**작성일**: 2026-05-14
**작성자**: gahusb
**상태**: Approved for implementation
**선행 spec**: `2026-05-13-ai-news-sentiment-node-design.md`
**선행 review**: adversarial review (Claude general-purpose, codex CLI ENOENT fallback)
---
## 1. 목표
`ai_news` 파이프라인의 데이터 소스를 **Naver 종목 뉴스 스크래핑 → 기존 `articles` 테이블 재사용** 으로 교체한다. 인프라 중복 제거(이미 매일 cron으로 수집 중) + Naver 차단 회피 + LLM 입력 풍부화(summary 포함).
본 슬라이스는 **Phase 1** 전략의 일부. 4주 IC 측정 결과를 보고 (a) IC < 0.05 → 노드 폐기, (b) IC ≥ 0.05 → Phase 2 (DART OpenAPI 추가) 결정.
**Why**: adversarial review에서 가장 강한 비판이 **"이미 매일 수집 중인 `articles` 테이블을 무시하고 Naver를 100번 더 긁는 중복 인프라"**였음. weight=0 차단(이전 슬라이스 `943f676`)과 짝을 이루어 본 슬라이스로 인프라 중복 해소.
---
## 2. 범위
**포함 (Phase 1)**:
- 신규 모듈 `ai_news/articles_source.py` — 기존 articles 테이블 조회 + 종목명 substring 매핑
- `news_sentiment` 테이블에 `source TEXT NOT NULL DEFAULT 'articles'` 컬럼 추가
- `pipeline.py` 가 articles_source 사용 (Naver scraper 호출 제거)
- `analyzer.py` 가 LLM 입력에 `summary` 추가 (제목 + 요약)
- 텔레그램 메시지에 매핑 hit-rate 표시 (e.g., "matched 42/100")
- 단위 테스트 — articles_source 6개, pipeline 통합 회귀
**범위 외 (NOT)**:
- DART OpenAPI 통합 (Phase 2, IC 검증 후)
- alias dict / LLM ticker 추출 (Phase 1.5, hit-rate 낮을 시)
- failure taxonomy (별도 슬라이스)
- legacy `scraper.py` 삭제 (Phase 2 결정 후)
- 환경변수로 source 토글 fallback (YAGNI)
- weight 변경 (여전히 0.0 유지)
- 매핑 정확도 자동 alarm/threshold
---
## 3. 아키텍처
```
┌──────────────────────────────┐
[08:00 KST 평일] │ agent-office on_ai_news_ │
│ schedule (변경 없음) │
└──────────┬───────────────────┘
│ HTTP POST
┌────────────────────────────────────────────────────────┐
│ stock-lab /snapshot/refresh-news-sentiment (변경 없음) │
│ │
│ ai_news/pipeline.refresh_daily(asof): │
│ 1. top-100 tickers by market_cap (그대로) │
│ 2. articles_source.gather_articles_for_tickers(...) │
│ - SELECT * FROM articles WHERE crawled_at >= asof-1d│
│ - 각 article (title+summary) ∋ ticker.name 매칭 │
│ - {ticker: [article_dict, ...]} 반환 │
│ 3. asyncio.gather (매핑된 ticker만): │
│ a. analyzer.score_sentiment(llm, ticker, articles) │
│ (Naver scraper 호출 없음 — articles 그대로 전달) │
│ 4. news_sentiment upsert with source='articles' │
│ 5. 텔레그램 페이로드: matched_count / total_count 추가 │
└────────────────────────────────────────────────────────┘
```
**의존성 변경 없음**: anthropic SDK 유지, httpx/BeautifulSoup 제거하지 않음 (legacy scraper에서 import 유지).
---
## 4. 파일 변경
### 4.1 신규
```
web-backend/stock-lab/app/screener/ai_news/
articles_source.py ← DB articles 조회 + 종목 매핑
web-backend/stock-lab/tests/
test_ai_news_articles_source.py ← 6 tests
```
### 4.2 수정
```
web-backend/stock-lab/app/screener/
schema.py ← news_sentiment.source 컬럼 + migration
ai_news/pipeline.py ← scraper 호출 제거, articles_source 사용
ai_news/analyzer.py ← summary 활용
```
### 4.3 변경 없음
- `ai_news/scraper.py` (deprecate 주석만, 다음 슬라이스에서 삭제 결정)
- `ai_news/telegram.py` (매핑 통계는 router 에서 처리하거나 telegram 빌더에 인자 추가)
- `ai_news/validation.py` (IC 측정은 데이터 소스 무관)
- `nodes/ai_news.py`
- `engine.py`
- `router.py` (응답 구조는 동일, 새 통계 필드만 추가)
- agent-office 전체
- 프론트엔드
---
## 5. DB 스키마 변경
```sql
ALTER TABLE news_sentiment ADD COLUMN source TEXT NOT NULL DEFAULT 'articles';
```
`schema.py``ensure_screener_schema(conn)` 에 migration block:
```python
cols = {r[1] for r in conn.execute("PRAGMA table_info(news_sentiment)").fetchall()}
if "source" not in cols:
conn.execute(
"ALTER TABLE news_sentiment ADD COLUMN source TEXT NOT NULL DEFAULT 'articles'"
)
```
기존 운영 row (Naver 출처)는 default `'articles'` 로 채워짐 — 이는 의미적으로 부정확하지만 다음 cron부터 실제 articles 출처로 upsert되어 덮어쓰여짐. 24시간 내 정확화. Phase 2 비교 시점(4주 후)에는 충분히 cleared.
---
## 6. `articles_source.py` 구현
```python
"""기존 articles 테이블에서 종목별 뉴스 매핑."""
from __future__ import annotations
import datetime as dt
import logging
import sqlite3
from typing import Any, Dict, List, Tuple
log = logging.getLogger(__name__)
def gather_articles_for_tickers(
conn: sqlite3.Connection,
tickers: List[str],
asof: dt.date,
*,
window_days: int = 1,
max_per_ticker: int = 5,
) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, int]]:
"""Returns ({ticker: [article, ...]}, stats)."""
cutoff = (asof - dt.timedelta(days=window_days)).isoformat()
# 1. tickers 의 회사명 조회
if not tickers:
return {}, {"total_articles": 0, "matched_pairs": 0, "hit_tickers": 0}
placeholders = ",".join("?" * len(tickers))
name_rows = conn.execute(
f"SELECT ticker, name FROM krx_master WHERE ticker IN ({placeholders})",
tickers,
).fetchall()
name_map = {r[0]: r[1] for r in name_rows if r[1]}
# 2. 최근 articles 조회
articles = conn.execute(
"SELECT title, summary, press, pub_date, crawled_at "
"FROM articles WHERE crawled_at >= ? ORDER BY crawled_at DESC",
(cutoff,),
).fetchall()
# 3. 매핑
out: Dict[str, List[Dict[str, Any]]] = {t: [] for t in tickers}
matched_pairs = 0
for a in articles:
title = (a[0] or "").strip()
summary = (a[1] or "").strip()
haystack = title + " " + summary
for ticker, name in name_map.items():
if not name or len(name) < 2:
continue
if name in haystack:
if len(out[ticker]) >= max_per_ticker:
continue
out[ticker].append({
"title": title,
"summary": summary,
"press": a[2] or "",
"pub_date": a[3] or "",
})
matched_pairs += 1
hit_tickers = sum(1 for arts in out.values() if arts)
stats = {
"total_articles": len(articles),
"matched_pairs": matched_pairs,
"hit_tickers": hit_tickers,
}
return out, stats
```
---
## 7. `pipeline.py` 변경
`refresh_daily()``_make_http()` / `asyncio.Semaphore(rate_limit)` / scraper 호출 부분 교체:
```python
async def refresh_daily(conn, asof, *, top_n=100, concurrency=10,
max_news_per_ticker=5, model=_analyzer.DEFAULT_MODEL):
started = time.time()
tickers = _top_market_cap_tickers(conn, n=top_n)
name_map = {...} # 기존 그대로
# 새: articles 매핑
articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
conn, tickers, asof, window_days=1, max_per_ticker=max_news_per_ticker,
)
sem = asyncio.Semaphore(concurrency)
async with _make_llm() as llm:
tasks = []
for t in tickers:
articles = articles_by_ticker.get(t, [])
if not articles:
continue # 매핑 0 — score 미생성
tasks.append(_process_one_articles(
t, name_map.get(t, t), articles, sem, llm, model
))
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes, failures = _split_results(raw_results)
if successes:
_upsert_news_sentiment(conn, asof, successes, source="articles")
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
return {
"asof": asof.isoformat(),
"updated": len(successes),
"failures": [str(f) for f in failures],
"duration_sec": round(time.time() - started, 2),
"tokens_input": sum(r["tokens_input"] for r in successes),
"tokens_output": sum(r["tokens_output"] for r in successes),
"top_pos": top_pos, "top_neg": top_neg, "model": model,
"mapping": mapping_stats, # 신규
}
async def _process_one_articles(ticker, name, articles, sem, llm, model):
async with sem:
return await _analyzer.score_sentiment(llm, ticker, articles, name=name, model=model)
```
`_make_http()` 제거. legacy scraper 의존 없음.
`_upsert_news_sentiment``source` 인자 추가:
```python
def _upsert_news_sentiment(conn, asof, rows, *, source="articles"):
iso = asof.isoformat()
data = [(
r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
r["tokens_input"], r["tokens_output"], r["model"], source,
) for r in rows]
conn.executemany(
"""INSERT INTO news_sentiment
(ticker, date, score_raw, reason, news_count,
tokens_input, tokens_output, model, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker, date) DO UPDATE SET
score_raw=excluded.score_raw, reason=excluded.reason,
news_count=excluded.news_count, tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output, model=excluded.model,
source=excluded.source
""", data,
)
conn.commit()
```
---
## 8. `analyzer.py` 변경 (미세)
`news_block` 빌더만:
```python
def _format_news_block(news: List[Dict[str, Any]]) -> str:
lines = []
for n in news:
date = n.get("pub_date", "")
title = n["title"]
summary = (n.get("summary") or "").strip()
if summary:
lines.append(f"- [{date}] {title}\n {summary[:200]}")
else:
lines.append(f"- [{date}] {title}")
return "\n".join(lines)
```
`score_sentiment()` 의 prompt 빌드 부분:
```python
news_block = _format_news_block(news)
```
LLM 입력 토큰 ~2-3배 (summary 200자 cap). 매핑 수가 감소(예상 100 → 30-60)하므로 총 토큰 비용은 비슷하거나 약간 감소.
---
## 9. 텔레그램 매핑 통계 표시
`telegram.build_message()``mapping` 인자 추가:
```python
def build_message(*, asof, top_pos, top_neg, tokens_input, tokens_output,
mapping=None):
...
cost = _cost_won(tokens_input, tokens_output)
mapping_line = ""
if mapping:
mapping_line = (
f"매핑: {mapping['hit_tickers']}/100 ticker "
f"\\({mapping['matched_pairs']}쌍 / articles {mapping['total_articles']}\\) · "
)
lines += [
"",
f"_분석: 시총 상위 100종목 · {mapping_line}"
f"토큰 {tokens_input:,} in / {tokens_output:,} out · 약 ₩{cost:,}_",
]
return "\n".join(lines)
```
`router.py` 에서 `mapping=summary.get('mapping')` 전달.
---
## 10. 테스트 전략
### 10.1 신규 `test_ai_news_articles_source.py` (6 tests)
1. **single_ticker_match_in_title** — title 에 회사명 → 매핑 hit
2. **single_ticker_match_in_summary** — summary 에 회사명 → 매핑 hit
3. **multi_ticker_match** — 한 article 이 두 회사명 포함 → 두 ticker 모두 매핑
4. **no_match_returns_empty_list** — 회사명 미포함 article → 빈 리스트
5. **max_per_ticker_caps_results** — 6개 매핑 가능한 articles 중 max=5
6. **window_days_filters_old_articles** — crawled_at < cutoff 인 article 제외
### 10.2 갱신 `test_ai_news_pipeline.py`
기존 `patch.object(pipeline, "_scraper")` 패턴을 `patch.object(pipeline, "articles_source")` 로 교체. 시나리오:
- happy path: 3 ticker × 1 article each
- failures isolated: 한 ticker LLM error
- 매핑 0 ticker (skip 검증)
### 10.3 갱신 `test_ai_news_analyzer.py`
- `news` 입력에 `summary` 가 있을 때 prompt 에 포함되는지
- summary 없을 때 title 만 사용
- pub_date 표시
### 10.4 갱신 `test_ai_news_telegram.py`
- `mapping` 인자 있을 때 매핑 라인 포함
- `mapping=None` 일 때 기존 동작
### 10.5 갱신 `test_ai_news_router.py`
- response 에 `mapping` 필드 포함
### 10.6 갱신 `test_screener_schema.py`
- migration 시 `source` 컬럼 생성
- 기존 row 의 source default 검증
---
## 11. 운영 가정 + 모니터링
| 가정 | 모니터링 |
|------|----------|
| 기존 `stock_news` cron (7:30 KST)이 articles 매일 수집 | 그게 깨지면 ai_news 도 0 결과 — articles 일별 count 별도 모니터링 권장 (이번 슬라이스 외) |
| 시장 뉴스에 시총 상위 100종목 회사명이 자주 등장 | hit-rate 텔레그램 라인으로 일별 확인. <30% 면 alias dict 추가 검토 |
| 회사명 substring match가 false positive 적음 | 4주 IC 결과로 검증 (positive면 매핑 정확도 OK 추정) |
---
## 12. 에러 처리
| 상황 | 처리 |
|------|------|
| articles 테이블 비어 있음 | gather() 반환 = `{}`, stats `total=0`. 모든 ticker skip, news_sentiment 0 row 추가, telegram에 "매핑 0/100" 표시 |
| 시총 상위 ticker 모두 매핑 0 | `updated=0` → on_ai_news_schedule 의 운영자 알림 분기 (기존 그대로) |
| krx_master 비어 있음 | gather() 가 빈 결과, 위와 동일 |
| LLM 실패 (특정 ticker) | 기존 fail-soft 그대로. failures 리스트에 추가, 다른 ticker 영향 없음 |
| migration 실행 실패 (예: 이미 컬럼 존재) | PRAGMA table_info 체크로 idempotent. ALTER 안 실행 |
---
## 13. 비용 / 성능 비교
| 항목 | 현재 (Naver) | Phase 1 (articles) |
|------|--------------|-------------------|
| 외부 HTTP | 100건/일 (Naver) | 0건 |
| 실패율 | 30%+ (Naver 차단) | 0% (DB 조회) |
| LLM calls | 100 | hit_tickers 수 (예상 30-60) |
| LLM input tokens | ~25K | ~30-50K (summary 포함) |
| 일 비용 | ~$0.075 | ~$0.05-0.10 (실측 후) |
| 처리 시간 | 30-60초 | 5-15초 (DB + LLM) |
---
## 14. Rollback
- 데이터: `news_sentiment.source` 컬럼으로 Phase 1 데이터와 이전 Naver 데이터 구분 가능
- 코드: `git revert` 만으로 가능. legacy `scraper.py` 유지로 코드 회복 즉시
- 환경변수 토글: **미포함** (YAGNI)
---
## 15. 후속 슬라이스 (Phase 1 이후 결정)
- **Phase 1.5** — 매핑 hit-rate < 30% 면 alias dict 추가 (50-100개)
- **Phase 2** — 4주 IC ≥ 0.05 시 DART OpenAPI 추가 (하이브리드 점수)
- **Phase X** — IC < 0.05 시 노드 deprecate 후 삭제 (scraper + analyzer + pipeline + node + DB cleanup)
---
## 16. 완료 조건 (Definition of Done)
- [ ] `articles_source.py` + 6개 단위 테스트
- [ ] `news_sentiment.source` 컬럼 추가 + migration
- [ ] `pipeline.py` 가 articles_source 사용 (scraper 호출 없음)
- [ ] `analyzer.py` 가 summary 포함 prompt
- [ ] `telegram.py` 에 매핑 통계 라인
- [ ] `router.py` 응답에 `mapping` 필드
- [ ] 기존 76 단위 테스트 + 갱신/신규 테스트 모두 통과
- [ ] 운영 환경 트리거 시 텔레그램에 "매핑 N/100" 표시 + news_sentiment 행에 source='articles'
- [ ] LLM 비용이 일 ~$0.05-0.10 범위로 감소 (텔레그램 ₩ 라인으로 확인)
- [ ] 첫 실행 후 매핑 hit-rate 메모리 기록 (1.5/2 결정 baseline)