17 Commits

Author SHA1 Message Date
2a552d3cc8 test(screener): update node count test to 8 (ai_news added) 2026-05-13 23:52:54 +09:00
f37b21a408 fix(agent-office): on_ai_news_schedule — graceful fail on missing telegram_text 2026-05-13 23:48:59 +09:00
df7a8d985e feat(agent-office): cron mon-fri 08:00 ai_news sentiment job 2026-05-13 23:46:37 +09:00
c5d0c84183 feat(agent-office): on_ai_news_schedule (cron handler + telegram dispatch) 2026-05-13 23:46:17 +09:00
53a78a1062 feat(agent-office): refresh_ai_news_sentiment service helper 2026-05-13 23:45:51 +09:00
ca8bcb3fed feat(screener): POST /snapshot/refresh-news-sentiment with telegram_text 2026-05-13 23:44:38 +09:00
4b4f91c052 feat(screener): register ai_news in NODE_REGISTRY 2026-05-13 23:41:21 +09:00
6c3a84b8ec feat(screener): ScreenContext.news_sentiment field + load query 2026-05-13 23:41:01 +09:00
2ff2645240 feat(screener): AiNewsSentiment ScoreNode (percentile_rank + min_news_count) 2026-05-13 23:39:42 +09:00
f2143b3889 feat(screener): ai_news telegram message builder (MarkdownV2 + cost line) 2026-05-13 23:38:07 +09:00
810cc76d40 feat(screener): ai_news pipeline (top-100 parallel, fail-soft, upsert) 2026-05-13 23:36:03 +09:00
0a91f43c46 feat(screener): ai_news Claude Haiku analyzer (-10~+10 + clamp + JSON-fail soft) 2026-05-13 23:33:20 +09:00
3d321f2b4b chore(stock-lab): add pytest + pytest-asyncio to requirements 2026-05-13 23:30:47 +09:00
6ba29599aa feat(screener): ai_news scraper (naver finance ticker news) 2026-05-13 23:29:52 +09:00
658ed13571 feat(screener): add news_sentiment table + ai_news defaults + migration 2026-05-13 23:26:38 +09:00
15ee3c3301 fix(compose): frontend.depends_on 누락된 6개 lab 추가
lotto, stock-lab, agent-office, personal, packs-lab, travel-proxy 가
누락되어 있어 한 컨테이너 다운 시 nginx upstream resolve 실패 위험.
이번 사이클에 lotto httpx 사고로 명시화된 risk 를 해소.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:07 +09:00
2b5009f864 fix(sqlite): WAL + busy_timeout 120s standardized across all labs
8개 lab의 _conn() 함수에 표준 동시성 패턴 통일:
- timeout=120.0 (connection 획득)
- PRAGMA journal_mode=WAL (reader/writer 분리)
- PRAGMA busy_timeout=120000 (트랜잭션 충돌 시 120초 대기)

stock-lab/screener/router.py 의 검증된 패턴(d9b6122) 을 lotto, stock-lab(메인),
music-lab, blog-lab, realestate-lab, agent-office, personal, travel-proxy 로 확산.
기존 'database is locked' 오류 윈도우를 흡수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:01 +09:00
30 changed files with 958 additions and 11 deletions

View File

@@ -233,6 +233,109 @@ class StockAgent(BaseAgent):
await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
async def on_ai_news_schedule(self) -> None:
"""AI 뉴스 sentiment 분석 자동 잡 (평일 08:00 KST).
흐름:
1) stock-lab /snapshot/refresh-news-sentiment 호출
2) status='skipped_weekend'/'skipped_holiday' → 종료 (텔레그램 미발신)
3) updated=0 → 운영자 알림 (HTML)
4) failures > 30% → 경고 알림 후 메인 메시지 발송
5) 정상 → Top 5 호재/악재 메시지 발송 (MarkdownV2)
"""
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "ai_news_sentiment", {})
await self.transition("working", "AI 뉴스 분석 중...", task_id)
try:
result = await service_proxy.refresh_ai_news_sentiment()
except Exception as e:
err_msg = str(e)
add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id)
update_task_status(task_id, "failed", {"error": err_msg})
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>AI 뉴스 분석 실패</b>\n"
f"<code>{html.escape(err_msg)[:500]}</code>"
)
except Exception as notify_err:
add_log(
self.agent_id,
f"operator notify failed: {notify_err}",
"warning", task_id,
)
await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}")
return
status = result.get("status")
if status in ("skipped_weekend", "skipped_holiday"):
update_task_status(task_id, "succeeded", {"status": status})
add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id)
await self.transition("idle", "휴일/주말 — 건너뜀")
return
updated = int(result.get("updated", 0))
failures = result.get("failures", []) or []
if updated == 0:
update_task_status(task_id, "failed", {"reason": "0 tickers updated"})
try:
from ..telegram.messaging import send_raw
await send_raw(
"⚠️ <b>AI 뉴스 분석 0종목</b>\n"
"스크래핑/LLM 전체 실패 — 어제 데이터 사용"
)
except Exception:
pass
await self.transition("idle", "AI 뉴스 0건")
return
# 실패율 경고 (별도 알림, 본 메시지는 계속 발송)
failure_rate = len(failures) / max(1, updated + len(failures))
if failure_rate > 0.3:
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>AI 뉴스 실패율 {failure_rate:.0%}</b>\n"
f"updated={updated}, failures={len(failures)}"
)
except Exception:
pass
# 정상 — Top 5 메시지 (stock-lab이 빌드해서 응답에 telegram_text 동봉)
text = result.get("telegram_text") or ""
if not text:
add_log(self.agent_id, "telegram_text 누락 — stock-lab 응답 결함", "error", task_id)
update_task_status(task_id, "failed", {"error": "telegram_text 누락"})
await self.transition("idle", "AI 뉴스 응답 결함")
return
await self.transition("reporting", "AI 뉴스 알림 전송 중...")
from ..telegram.messaging import send_raw
tg = await send_raw(text, parse_mode="MarkdownV2")
update_task_status(task_id, "succeeded", {
"asof": result["asof"],
"updated": updated,
"failures": len(failures),
"tokens_input": int(result.get("tokens_input", 0)),
"tokens_output": int(result.get("tokens_output", 0)),
"telegram_sent": tg.get("ok", False),
})
if not tg.get("ok"):
desc = tg.get("description") or "unknown"
code = tg.get("error_code")
add_log(
self.agent_id,
f"AI news telegram send failed: [{code}] {desc}",
"warning", task_id,
)
await self.transition("idle", "AI 뉴스 완료")
async def on_command(self, command: str, params: dict) -> dict:
if command == "run_screener":
await self.on_screener_schedule()

View File

@@ -9,9 +9,10 @@ from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=10)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -19,6 +19,11 @@ async def _run_stock_screener():
if agent:
await agent.on_screener_schedule()
async def _run_stock_ai_news():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.on_ai_news_schedule()
async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog")
if agent:
@@ -54,6 +59,14 @@ def init_scheduler():
minute=30,
id="stock_screener",
)
scheduler.add_job(
_run_stock_ai_news,
"cron",
day_of_week="mon-fri",
hour=8,
minute=0,
id="stock_ai_news_sentiment",
)
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")

View File

@@ -43,6 +43,20 @@ async def refresh_screener_snapshot() -> Dict[str, Any]:
return resp.json()
async def refresh_ai_news_sentiment() -> Dict[str, Any]:
"""stock-lab의 AI 뉴스 sentiment 분석 트리거 (08:00 cron).
네이버 100종목 스크래핑 + Claude Haiku 100콜 병렬 = 약 30-60초.
여유있게 240s timeout.
"""
async with httpx.AsyncClient(timeout=240.0) as client:
resp = await client.post(
f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh-news-sentiment"
)
resp.raise_for_status()
return resp.json()
async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]:
"""stock-lab의 스크리너 실행.

View File

@@ -8,9 +8,10 @@ from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -241,9 +241,15 @@ services:
container_name: frontend
restart: unless-stopped
depends_on:
- lotto
- stock-lab
- music-lab
- blog-lab
- realestate-lab
- agent-office
- personal
- packs-lab
- travel-proxy
ports:
- "8080:80"
volumes:

View File

@@ -9,8 +9,10 @@ DB_PATH = "/app/data/lotto.db"
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:

View File

@@ -9,8 +9,10 @@ DB_PATH = "/app/data/music.db"
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -9,9 +9,10 @@ DB_PATH = "/app/data/personal.db"
def _conn():
c = sqlite3.connect(DB_PATH, timeout=10)
c = sqlite3.connect(DB_PATH, timeout=120.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA busy_timeout=120000;")
c.execute("PRAGMA foreign_keys=ON;")
return c

View File

@@ -12,9 +12,10 @@ DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db")
def _conn():
c = sqlite3.connect(DB_PATH, timeout=10)
c = sqlite3.connect(DB_PATH, timeout=120.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA busy_timeout=120000;")
c.execute("PRAGMA foreign_keys=ON;")
return c

View File

@@ -12,8 +12,10 @@ def _conn() -> sqlite3.Connection:
parent = os.path.dirname(db_path)
if parent:
os.makedirs(parent, exist_ok=True)
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(db_path, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
def init_db():

View File

@@ -0,0 +1,76 @@
"""Claude Haiku 기반 종목 뉴스 호재/악재 분석."""
from __future__ import annotations
import json
import logging
import os
from typing import Any, Dict, List
log = logging.getLogger(__name__)
DEFAULT_MODEL = os.getenv("AI_NEWS_MODEL", "claude-haiku-4-5-20251001")
PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {n}개의 헤드라인입니다.
{news_block}
이 뉴스들이 종목에 호재인지 악재인지 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거.
JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}"""
def _clamp(x: float, lo: float = -10.0, hi: float = 10.0) -> float:
return max(lo, min(hi, x))
async def score_sentiment(
llm,
ticker: str,
news: List[Dict[str, Any]],
*,
name: str | None = None,
model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
news_block = "\n".join(f"- {n['title']}" for n in news)
prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block,
)
resp = await llm.messages.create(
model=model,
max_tokens=200,
messages=[{"role": "user", "content": prompt}],
)
text = resp.content[0].text if resp.content else ""
in_tokens = int(getattr(resp.usage, "input_tokens", 0) or 0)
out_tokens = int(getattr(resp.usage, "output_tokens", 0) or 0)
try:
data = json.loads(text)
score = _clamp(float(data["score"]))
reason = str(data["reason"])[:200]
return {
"ticker": ticker,
"score_raw": score,
"reason": reason,
"news_count": len(news),
"tokens_input": in_tokens,
"tokens_output": out_tokens,
"model": model,
}
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
log.warning("ai_news parse fail for %s: %s (raw=%r)", ticker, e, text[:100])
return {
"ticker": ticker,
"score_raw": 0.0,
"reason": f"parse fail: {e!s}"[:200],
"news_count": len(news),
"tokens_input": in_tokens,
"tokens_output": out_tokens,
"model": model,
}

View File

@@ -0,0 +1,150 @@
"""ai_news refresh pipeline — 시총 상위 N종목 병렬 처리."""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional
import httpx
from . import scraper as _scraper
from . import analyzer as _analyzer
log = logging.getLogger(__name__)
DEFAULT_TOP_N = 100
DEFAULT_CONCURRENCY = 10
DEFAULT_NEWS_PER_TICKER = 5
DEFAULT_RATE_LIMIT_SEC = 0.2
def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
rows = conn.execute(
"SELECT ticker FROM krx_master "
"WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
"ORDER BY market_cap DESC LIMIT ?",
(n,),
).fetchall()
return [r[0] for r in rows]
def _make_http():
return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)
def _make_llm():
"""Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수."""
from anthropic import AsyncAnthropic
return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
async def _process_one(
ticker: str, name: str, sem: asyncio.Semaphore,
http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
) -> Dict[str, Any]:
async with sem:
if rate_limit_sec > 0:
await asyncio.sleep(rate_limit_sec)
news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
if not news:
return {
"ticker": ticker, "score_raw": 0.0, "reason": "no news",
"news_count": 0, "tokens_input": 0, "tokens_output": 0,
"model": model,
}
return await _analyzer.score_sentiment(
llm, ticker, news, name=name, model=model,
)
def _upsert_news_sentiment(
conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
) -> None:
iso = asof.isoformat()
data = [
(
r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
r["tokens_input"], r["tokens_output"], r["model"],
)
for r in rows
]
conn.executemany(
"""INSERT INTO news_sentiment
(ticker, date, score_raw, reason, news_count,
tokens_input, tokens_output, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker, date) DO UPDATE SET
score_raw=excluded.score_raw,
reason=excluded.reason,
news_count=excluded.news_count,
tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output,
model=excluded.model
""",
data,
)
conn.commit()
async def refresh_daily(
conn: sqlite3.Connection,
asof: dt.date,
*,
top_n: int = DEFAULT_TOP_N,
concurrency: int = DEFAULT_CONCURRENCY,
news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns summary dict with top_pos/top_neg/token totals/failures."""
started = time.time()
tickers = _top_market_cap_tickers(conn, n=top_n)
name_map = {
r[0]: r[1] for r in conn.execute(
f"SELECT ticker, name FROM krx_master WHERE ticker IN "
f"({','.join('?' * len(tickers))})", tickers,
).fetchall()
} if tickers else {}
sem = asyncio.Semaphore(concurrency)
async with _make_http() as http_client, _make_llm() as llm:
tasks = [
_process_one(
t, name_map.get(t, t), sem, http_client, llm,
news_per_ticker, rate_limit_sec, model,
)
for t in tickers
]
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = []
failures: List[str] = []
for r in raw_results:
if isinstance(r, BaseException):
failures.append(repr(r))
elif isinstance(r, dict):
successes.append(r)
if successes:
_upsert_news_sentiment(conn, asof, successes)
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
return {
"asof": asof.isoformat(),
"updated": len(successes),
"failures": failures,
"duration_sec": round(time.time() - started, 2),
"tokens_input": sum(r["tokens_input"] for r in successes),
"tokens_output": sum(r["tokens_output"] for r in successes),
"top_pos": top_pos,
"top_neg": top_neg,
"model": model,
}

View File

@@ -0,0 +1,39 @@
"""네이버 finance 종목 뉴스 스크래핑."""
from __future__ import annotations
import logging
from typing import Any, Dict, List
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
NAVER_NEWS_URL = "https://finance.naver.com/item/news_news.naver"
NAVER_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": "https://finance.naver.com/",
}
async def fetch_news(client, ticker: str, n: int = 5) -> List[Dict[str, Any]]:
"""Scrape top N news headlines for a ticker. Returns [] on any failure."""
try:
r = await client.get(NAVER_NEWS_URL, params={"code": ticker, "page": 1})
except Exception as e:
log.warning("ai_news scrape http error for %s: %s", ticker, e)
return []
if r.status_code != 200:
return []
soup = BeautifulSoup(r.text, "lxml")
out: List[Dict[str, Any]] = []
for row in soup.select("table.type5 tbody tr")[:n]:
title_el = row.select_one("td.title a")
date_el = row.select_one("td.date")
if not title_el or not date_el:
continue
out.append({
"title": title_el.get_text(strip=True),
"date": date_el.get_text(strip=True),
})
return out

View File

@@ -0,0 +1,61 @@
"""ai_news Top 5/5 텔레그램 메시지 빌더 (MarkdownV2)."""
from __future__ import annotations
from typing import Any, Dict, List
_MD_SPECIAL = r"_*[]()~`>#+-=|{}.!\\"
def _escape(text: str) -> str:
return "".join("\\" + c if c in _MD_SPECIAL else c for c in str(text))
def _cost_won(tokens_input: int, tokens_output: int) -> int:
"""Claude Haiku 가격 환산 (대략): in $1/M × ₩1300, out $5/M × ₩1300."""
return int(tokens_input * 0.0013 + tokens_output * 0.0065)
def _row_line(idx: int, r: Dict[str, Any]) -> str:
score = r["score_raw"]
sign = "+" if score >= 0 else ""
return (
f"{idx}\\. {_escape(r['ticker'])} \\({sign}{score:.1f}\\) — "
f"{_escape(r['reason'])}"
)
def build_message(
*,
asof: str,
top_pos: List[Dict[str, Any]],
top_neg: List[Dict[str, Any]],
tokens_input: int,
tokens_output: int,
) -> str:
lines: List[str] = [
f"🌅 *AI 뉴스 분석* \\({_escape(asof)} 08:00\\)",
"",
"📈 *호재 Top 5*",
]
if top_pos:
for i, r in enumerate(top_pos, 1):
lines.append(_row_line(i, r))
else:
lines.append(_escape("- (없음)"))
lines += ["", "📉 *악재 Top 5*"]
if top_neg:
for i, r in enumerate(top_neg, 1):
lines.append(_row_line(i, r))
else:
lines.append(_escape("- (없음)"))
cost = _cost_won(tokens_input, tokens_output)
lines += [
"",
f"_분석: 시총 상위 100종목 · 토큰 {tokens_input:,} in / {tokens_output:,} out · "
f"약 ₩{cost:,}_",
]
return "\n".join(lines)

View File

@@ -17,6 +17,7 @@ class ScreenContext:
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
kospi: pd.Series # index=date(str), name="kospi"
asof: dt.date
news_sentiment: "pd.DataFrame | None" = None
@classmethod
def load(cls, conn: sqlite3.Connection, asof: dt.date,
@@ -38,6 +39,10 @@ class ScreenContext:
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
news_sentiment = pd.read_sql_query(
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
conn, params=(asof_iso,),
)
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
# 후속 슬라이스에서 ^KS11 별도 캐시.
@@ -47,7 +52,8 @@ class ScreenContext:
kospi = sub.copy()
kospi.name = "kospi"
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof,
news_sentiment=news_sentiment)
def restrict(self, tickers) -> "ScreenContext":
tickers = pd.Index(tickers)

View File

@@ -0,0 +1,36 @@
"""AI 뉴스 호재/악재 점수 노드.
ScreenContext.news_sentiment (DataFrame: ticker, score_raw, news_count) 를
min_news_count 로 필터한 뒤 percentile_rank 로 0~100 변환.
"""
from __future__ import annotations
import pandas as pd
from .base import ScoreNode, percentile_rank
class AiNewsSentiment(ScoreNode):
name = "ai_news"
label = "AI 뉴스 호재/악재"
default_params = {"min_news_count": 1}
param_schema = {
"type": "object",
"properties": {
"min_news_count": {
"type": "integer", "minimum": 0, "default": 1,
"description": "최소 분석 뉴스 수. 미만이면 점수 미산출.",
},
},
}
def compute(self, ctx, params: dict) -> pd.Series:
df = getattr(ctx, "news_sentiment", None)
if df is None or df.empty:
return pd.Series(dtype=float)
min_news = int(params.get("min_news_count", 1))
df = df[df["news_count"] >= min_news]
if df.empty:
return pd.Series(dtype=float)
return percentile_rank(df.set_index("ticker")["score_raw"])

View File

@@ -8,6 +8,7 @@ from .nodes.high52w import High52WProximity
from .nodes.rs_rating import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite import VcpLite
from .nodes.ai_news import AiNewsSentiment
NODE_REGISTRY: dict = {
"foreign_buy": ForeignBuy,
@@ -17,6 +18,7 @@ NODE_REGISTRY: dict = {
"rs_rating": RsRating,
"ma_alignment": MaAlignment,
"vcp_lite": VcpLite,
"ai_news": AiNewsSentiment,
}
GATE_REGISTRY: dict = {

View File

@@ -276,6 +276,30 @@ def list_runs(limit: int = 30):
]
# ---------- /snapshot/refresh-news-sentiment ----------
from .ai_news import pipeline as _ai_pipeline
from .ai_news import telegram as _ai_telegram
@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
if _is_holiday(asof_date):
return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
with _conn() as c:
summary = await _ai_pipeline.refresh_daily(c, asof_date)
summary["telegram_text"] = _ai_telegram.build_message(
asof=summary["asof"],
top_pos=summary["top_pos"], top_neg=summary["top_neg"],
tokens_input=summary["tokens_input"],
tokens_output=summary["tokens_output"],
)
return summary
@router.get("/runs/{run_id}")
def get_run(run_id: int):
with _conn() as c:

View File

@@ -12,6 +12,7 @@ DEFAULT_WEIGHTS = {
"rs_rating": 1.2,
"ma_alignment": 1.0,
"vcp_lite": 0.8,
"ai_news": 0.8,
}
DEFAULT_NODE_PARAMS = {
"foreign_buy": {"window_days": 5},
@@ -21,6 +22,7 @@ DEFAULT_NODE_PARAMS = {
"rs_rating": {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}},
"ma_alignment": {"ma_periods": [50, 150, 200]},
"vcp_lite": {"short_window": 40, "long_window": 252},
"ai_news": {"min_news_count": 1},
}
DEFAULT_GATE_PARAMS = {
"min_market_cap_won": 50_000_000_000,
@@ -110,12 +112,45 @@ CREATE TABLE IF NOT EXISTS screener_results (
FOREIGN KEY (run_id) REFERENCES screener_runs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_results_run_rank ON screener_results(run_id, rank);
CREATE TABLE IF NOT EXISTS news_sentiment (
ticker TEXT NOT NULL,
date TEXT NOT NULL,
score_raw REAL NOT NULL,
reason TEXT NOT NULL DEFAULT '',
news_count INTEGER NOT NULL DEFAULT 0,
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
model TEXT NOT NULL DEFAULT 'claude-haiku-4-5-20251001',
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')),
PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_news_sentiment_date ON news_sentiment(date DESC);
"""
def ensure_screener_schema(conn: sqlite3.Connection) -> None:
"""Create tables and seed default settings (idempotent)."""
conn.executescript(DDL)
# ai_news 키 누락 시 1회 보충 (이미 운영 중인 환경에 대해)
row = conn.execute(
"SELECT weights_json, node_params_json FROM screener_settings WHERE id=1"
).fetchone()
if row is not None:
w = json.loads(row[0])
p = json.loads(row[1])
changed = False
if "ai_news" not in w:
w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
changed = True
if "ai_news" not in p:
p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
changed = True
if changed:
conn.execute(
"UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
(json.dumps(w), json.dumps(p)),
)
existing = conn.execute("SELECT id FROM screener_settings WHERE id=1").fetchone()
if existing is None:
now = datetime.now(timezone.utc).isoformat()

View File

@@ -21,15 +21,16 @@ def client():
return TestClient(app)
def test_get_nodes_lists_7_score_and_1_gate(client):
def test_get_nodes_lists_8_score_and_1_gate(client):
r = client.get("/api/stock/screener/nodes")
assert r.status_code == 200
body = r.json()
assert len(body["score_nodes"]) == 7
assert len(body["score_nodes"]) == 8
assert len(body["gate_nodes"]) == 1
assert {n["name"] for n in body["score_nodes"]} == {
"foreign_buy", "volume_surge", "momentum",
"high52w", "rs_rating", "ma_alignment", "vcp_lite",
"ai_news",
}

View File

@@ -1,4 +1,5 @@
# 주식 서비스용 라이브러리
anthropic==0.39.0
requests==2.32.3
httpx==0.27.2
beautifulsoup4==4.12.3
@@ -8,4 +9,6 @@ apscheduler==3.10.4
python-dotenv==1.0.1
finance-datareader==0.9.110
lxml==6.1.0
pytest==8.3.2
pytest-asyncio==0.24.0

View File

@@ -0,0 +1,55 @@
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.screener.ai_news import analyzer
def _mk_llm(content_text: str, in_tokens: int = 100, out_tokens: int = 20):
llm = AsyncMock()
resp = MagicMock()
block = MagicMock()
block.text = content_text
resp.content = [block]
resp.usage = MagicMock(input_tokens=in_tokens, output_tokens=out_tokens)
llm.messages = MagicMock()
llm.messages.create = AsyncMock(return_value=resp)
return llm
NEWS = [{"title": "삼성전자, HBM 양산"}, {"title": "메모리 가격 반등"}]
@pytest.mark.asyncio
async def test_score_sentiment_success_parses_json():
llm = _mk_llm(json.dumps({"score": 7.5, "reason": "HBM 호재"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS, name="삼성전자")
assert out["ticker"] == "005930"
assert out["score_raw"] == 7.5
assert out["reason"] == "HBM 호재"
assert out["news_count"] == 2
assert out["tokens_input"] == 100
assert out["tokens_output"] == 20
@pytest.mark.asyncio
async def test_score_sentiment_json_parse_fail_returns_zero():
llm = _mk_llm("not valid json")
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == 0.0
assert "parse fail" in out["reason"]
assert out["tokens_input"] == 100 # 호출은 발생했음
@pytest.mark.asyncio
async def test_score_sentiment_clamps_out_of_range():
llm = _mk_llm(json.dumps({"score": 15.0, "reason": "초강세"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == 10.0 # +10 클램프
@pytest.mark.asyncio
async def test_score_sentiment_clamps_negative_out_of_range():
llm = _mk_llm(json.dumps({"score": -42.0, "reason": "초악재"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == -10.0

View File

@@ -0,0 +1,57 @@
import datetime as dt
import pandas as pd
import pytest
from app.screener.nodes.ai_news import AiNewsSentiment
class FakeCtx:
def __init__(self, df=None):
self.news_sentiment = df
self.asof = dt.date(2026, 5, 13)
def test_compute_empty_context():
out = AiNewsSentiment().compute(FakeCtx(None), {"min_news_count": 1})
assert out.empty
def test_compute_with_data_percentile_ranks():
df = pd.DataFrame([
{"ticker": "A", "score_raw": -5.0, "news_count": 3},
{"ticker": "B", "score_raw": 0.0, "news_count": 3},
{"ticker": "C", "score_raw": 8.0, "news_count": 3},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert len(out) == 3
# percentile rank: A (lowest) < B < C (highest)
assert out.loc["A"] < out.loc["B"] < out.loc["C"]
# all within [0, 100]
assert (out >= 0).all() and (out <= 100).all()
def test_compute_filters_by_min_news_count():
df = pd.DataFrame([
{"ticker": "A", "score_raw": -5.0, "news_count": 0}, # 필터됨
{"ticker": "B", "score_raw": 0.0, "news_count": 2},
{"ticker": "C", "score_raw": 8.0, "news_count": 5},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert "A" not in out.index
assert "B" in out.index
assert "C" in out.index
def test_compute_all_filtered_returns_empty():
df = pd.DataFrame([
{"ticker": "A", "score_raw": 5.0, "news_count": 0},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert out.empty
def test_metadata():
n = AiNewsSentiment()
assert n.name == "ai_news"
assert "AI" in n.label or "뉴스" in n.label
assert n.default_params == {"min_news_count": 1}
assert "min_news_count" in n.param_schema["properties"]

View File

@@ -0,0 +1,110 @@
import datetime as dt
import sqlite3
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from app.screener.ai_news import pipeline
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn():
c = sqlite3.connect(":memory:")
c.row_factory = sqlite3.Row
ensure_screener_schema(c)
# 시총 상위 3종목 시드
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("005930", "삼성전자", 9_000_000))
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("000660", "SK하이닉스", 8_000_000))
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("373220", "LG에너지솔루션", 7_000_000))
c.commit()
yield c
c.close()
@pytest.mark.asyncio
async def test_refresh_daily_happy_path(conn):
"""3종목 mini integration — 각 종목별로 scraper/analyzer mock."""
asof = dt.date(2026, 5, 13)
fake_news = [{"title": "헤드라인"}]
async def fake_fetch(client, ticker, n):
return fake_news
scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0,
}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores_by_ticker[ticker],
"reason": f"r{ticker}", "news_count": 1,
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
assert result["asof"] == "2026-05-13"
assert result["updated"] == 3
assert result["failures"] == []
assert len(result["top_pos"]) == 3
assert result["top_pos"][0]["ticker"] == "005930" # 가장 큰 점수
assert result["top_neg"][0]["ticker"] == "373220" # 가장 작은 점수
assert result["tokens_input"] == 300
assert result["tokens_output"] == 60
# DB upsert 확인
rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
("2026-05-13",)).fetchall()
assert len(rows) == 3
by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
assert by_ticker["005930"] == 7.5
assert by_ticker["373220"] == -6.0
@pytest.mark.asyncio
async def test_refresh_daily_failures_isolated(conn):
"""한 종목이 예외 던져도 나머지 종목은 정상 처리."""
asof = dt.date(2026, 5, 13)
async def fake_fetch(client, ticker, n):
return [{"title": "h"}]
async def fake_score(llm, ticker, news, *, name=None, model="m"):
if ticker == "000660":
raise RuntimeError("llm exploded")
return {
"ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1,
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
assert result["updated"] == 2
assert len(result["failures"]) == 1
def test_top_market_cap_tickers(conn):
out = pipeline._top_market_cap_tickers(conn, n=2)
assert out == ["005930", "000660"]

View File

@@ -0,0 +1,36 @@
import datetime as dt
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
def test_refresh_news_sentiment_weekend_skip():
# 2026-05-16 = Saturday
client = TestClient(app)
resp = client.post(
"/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-16"
)
assert resp.status_code == 200
assert resp.json()["status"] == "skipped_weekend"
def test_refresh_news_sentiment_weekday_invokes_pipeline():
fake_summary = {
"asof": "2026-05-13", "updated": 3, "failures": [],
"duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
"top_pos": [], "top_neg": [], "model": "m",
}
with patch("app.screener.router._ai_pipeline") as mp, \
patch("app.screener.router._ai_telegram") as mt:
mp.refresh_daily = AsyncMock(return_value=fake_summary)
mt.build_message = lambda **kw: "BUILT_TEXT"
client = TestClient(app)
resp = client.post(
"/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
)
assert resp.status_code == 200
body = resp.json()
assert body["asof"] == "2026-05-13"
assert body["updated"] == 3
assert body["telegram_text"] == "BUILT_TEXT"

View File

@@ -0,0 +1,55 @@
import pytest
from unittest.mock import AsyncMock
from app.screener.ai_news import scraper
SAMPLE_HTML = """
<html><body>
<table class="type5"><tbody>
<tr><td class="title"><a href="/news1">삼성전자, HBM 양산 가시화</a></td><td class="date">2026.05.13 07:30</td></tr>
<tr><td class="title"><a href="/news2">삼성, 4분기 어닝 쇼크 우려</a></td><td class="date">2026.05.13 06:00</td></tr>
<tr><td class="title"><a href="/news3">메모리 시장 회복세</a></td><td class="date">2026.05.12 18:00</td></tr>
</tbody></table>
</body></html>
"""
EMPTY_HTML = "<html><body><table class='type5'><tbody></tbody></table></body></html>"
def _mk_client(status_code=200, text=SAMPLE_HTML):
client = AsyncMock()
resp = AsyncMock()
resp.status_code = status_code
resp.text = text
client.get = AsyncMock(return_value=resp)
return client
@pytest.mark.asyncio
async def test_fetch_news_success_returns_n_items():
client = _mk_client()
out = await scraper.fetch_news(client, "005930", n=2)
assert len(out) == 2
assert out[0]["title"] == "삼성전자, HBM 양산 가시화"
assert out[0]["date"] == "2026.05.13 07:30"
@pytest.mark.asyncio
async def test_fetch_news_404_returns_empty():
client = _mk_client(status_code=404, text="")
out = await scraper.fetch_news(client, "999999", n=5)
assert out == []
@pytest.mark.asyncio
async def test_fetch_news_empty_table_returns_empty():
client = _mk_client(text=EMPTY_HTML)
out = await scraper.fetch_news(client, "005930", n=5)
assert out == []
@pytest.mark.asyncio
async def test_fetch_news_n_caps_results():
client = _mk_client()
out = await scraper.fetch_news(client, "005930", n=2)
assert len(out) == 2 # 샘플에 3개 있지만 n=2로 잘림

View File

@@ -0,0 +1,54 @@
from app.screener.ai_news import telegram as tg
def _row(ticker, score, reason="r"):
return {"ticker": ticker, "score_raw": score, "reason": reason,
"news_count": 5, "tokens_input": 100, "tokens_output": 20,
"model": "m"}
def test_build_message_includes_top_sections():
msg = tg.build_message(
asof="2026-05-13",
top_pos=[_row("005930", 8.5, "HBM 호재")],
top_neg=[_row("373220", -6.3, "수주 지연")],
tokens_input=10000, tokens_output=2000,
)
assert "AI 뉴스 분석" in msg
assert "호재 Top" in msg
assert "악재 Top" in msg
assert "005930" in msg
assert "8.5" in msg
assert "HBM" in msg
assert "373220" in msg
def test_build_message_escapes_markdownv2_specials():
msg = tg.build_message(
asof="2026-05-13",
top_pos=[_row("005930", 3.0, "테스트(괄호) [대괄호]")],
top_neg=[],
tokens_input=100, tokens_output=20,
)
# MarkdownV2 특수문자 ( ) [ ] 이 escape 되어야 함
assert r"\(" in msg or r"\)" in msg
assert r"\[" in msg or r"\]" in msg
def test_build_message_cost_won_line():
msg = tg.build_message(
asof="2026-05-13", top_pos=[], top_neg=[],
tokens_input=10000, tokens_output=2000,
)
# tokens_input × 0.0013 + tokens_output × 0.0065 = 13 + 13 = ₩26
assert "₩26" in msg or "₩ 26" in msg or "" in msg
def test_build_message_empty_lists():
msg = tg.build_message(
asof="2026-05-13", top_pos=[], top_neg=[],
tokens_input=0, tokens_output=0,
)
# 빈 리스트라도 헤더는 있어야 함
assert "호재 Top" in msg
assert "악재 Top" in msg

View File

@@ -7,9 +7,10 @@ DB_PATH = os.getenv("TRAVEL_DB_PATH", "/data/thumbs/travel.db")
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn