- git mv stock-lab/ → stock/ - docker-compose.yml: 서비스 키 + container_name + build.context + frontend.depends_on + agent-office STOCK_LAB_URL → STOCK_URL - agent-office/app: config.py, service_proxy.py, agents/stock.py, tests/ STOCK_LAB_URL → STOCK_URL - nginx/default.conf: proxy_pass http://stock-lab → http://stock (3 lines) - CLAUDE.md / README.md / STATUS.md / scripts/ 문구 갱신 - stock/ 내부 자기 참조 갱신 lab 네이밍 정책 (feedback_lab_naming.md) graduation. API URL / Python import / DB 파일명 변경 없음.
142 lines
4.3 KiB
Python
142 lines
4.3 KiB
Python
"""ai_news refresh pipeline — 시총 상위 N종목 병렬 처리."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime as dt
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from typing import Any, Dict, List
|
|
|
|
from . import scraper as _scraper # legacy, kept for backward import
|
|
from . import analyzer as _analyzer
|
|
from . import articles_source # 신규
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
DEFAULT_TOP_N = 100
|
|
DEFAULT_CONCURRENCY = 10
|
|
DEFAULT_NEWS_PER_TICKER = 5
|
|
|
|
|
|
def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
|
|
rows = conn.execute(
|
|
"SELECT ticker FROM krx_master "
|
|
"WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
|
|
"ORDER BY market_cap DESC LIMIT ?",
|
|
(n,),
|
|
).fetchall()
|
|
return [r[0] for r in rows]
|
|
|
|
|
|
def _make_llm():
|
|
"""Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수."""
|
|
from anthropic import AsyncAnthropic
|
|
return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
|
|
|
|
|
|
async def _process_one(
|
|
ticker: str, name: str, articles: List[Dict[str, Any]],
|
|
sem: asyncio.Semaphore, llm, model: str,
|
|
) -> Dict[str, Any]:
|
|
async with sem:
|
|
return await _analyzer.score_sentiment(
|
|
llm, ticker, articles, name=name, model=model,
|
|
)
|
|
|
|
|
|
def _upsert_news_sentiment(
|
|
conn: sqlite3.Connection, asof: dt.date,
|
|
rows: List[Dict[str, Any]], *, source: str = "articles",
|
|
) -> None:
|
|
iso = asof.isoformat()
|
|
data = [
|
|
(
|
|
r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
|
|
r["tokens_input"], r["tokens_output"], r["model"], source,
|
|
)
|
|
for r in rows
|
|
]
|
|
conn.executemany(
|
|
"""INSERT INTO news_sentiment
|
|
(ticker, date, score_raw, reason, news_count,
|
|
tokens_input, tokens_output, model, source)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(ticker, date) DO UPDATE SET
|
|
score_raw=excluded.score_raw,
|
|
reason=excluded.reason,
|
|
news_count=excluded.news_count,
|
|
tokens_input=excluded.tokens_input,
|
|
tokens_output=excluded.tokens_output,
|
|
model=excluded.model,
|
|
source=excluded.source
|
|
""",
|
|
data,
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
async def refresh_daily(
|
|
conn: sqlite3.Connection,
|
|
asof: dt.date,
|
|
*,
|
|
top_n: int = DEFAULT_TOP_N,
|
|
concurrency: int = DEFAULT_CONCURRENCY,
|
|
max_news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
|
|
window_days: int = 1,
|
|
model: str = _analyzer.DEFAULT_MODEL,
|
|
) -> Dict[str, Any]:
|
|
started = time.time()
|
|
tickers = _top_market_cap_tickers(conn, n=top_n)
|
|
name_map = {
|
|
r[0]: r[1] for r in conn.execute(
|
|
f"SELECT ticker, name FROM krx_master WHERE ticker IN "
|
|
f"({','.join('?' * len(tickers))})", tickers,
|
|
).fetchall()
|
|
} if tickers else {}
|
|
|
|
articles_by_ticker, mapping_stats = articles_source.gather_articles_for_tickers(
|
|
conn, tickers, asof,
|
|
window_days=window_days,
|
|
max_per_ticker=max_news_per_ticker,
|
|
)
|
|
|
|
sem = asyncio.Semaphore(concurrency)
|
|
async with _make_llm() as llm:
|
|
tasks = []
|
|
for t in tickers:
|
|
arts = articles_by_ticker.get(t, [])
|
|
if not arts:
|
|
continue # 매핑 0 — score 미생성
|
|
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
|
|
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
successes: List[Dict[str, Any]] = []
|
|
failures: List[str] = []
|
|
for r in raw_results:
|
|
if isinstance(r, BaseException):
|
|
failures.append(repr(r))
|
|
elif isinstance(r, dict):
|
|
successes.append(r)
|
|
|
|
if successes:
|
|
_upsert_news_sentiment(conn, asof, successes, source="articles")
|
|
|
|
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
|
|
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
|
|
|
|
return {
|
|
"asof": asof.isoformat(),
|
|
"updated": len(successes),
|
|
"failures": failures,
|
|
"duration_sec": round(time.time() - started, 2),
|
|
"tokens_input": sum(r["tokens_input"] for r in successes),
|
|
"tokens_output": sum(r["tokens_output"] for r in successes),
|
|
"top_pos": top_pos,
|
|
"top_neg": top_neg,
|
|
"model": model,
|
|
"mapping": mapping_stats,
|
|
}
|