22 Commits

Author SHA1 Message Date
2a552d3cc8 test(screener): update node count test to 8 (ai_news added) 2026-05-13 23:52:54 +09:00
f37b21a408 fix(agent-office): on_ai_news_schedule — graceful fail on missing telegram_text 2026-05-13 23:48:59 +09:00
df7a8d985e feat(agent-office): cron mon-fri 08:00 ai_news sentiment job 2026-05-13 23:46:37 +09:00
c5d0c84183 feat(agent-office): on_ai_news_schedule (cron handler + telegram dispatch) 2026-05-13 23:46:17 +09:00
53a78a1062 feat(agent-office): refresh_ai_news_sentiment service helper 2026-05-13 23:45:51 +09:00
ca8bcb3fed feat(screener): POST /snapshot/refresh-news-sentiment with telegram_text 2026-05-13 23:44:38 +09:00
4b4f91c052 feat(screener): register ai_news in NODE_REGISTRY 2026-05-13 23:41:21 +09:00
6c3a84b8ec feat(screener): ScreenContext.news_sentiment field + load query 2026-05-13 23:41:01 +09:00
2ff2645240 feat(screener): AiNewsSentiment ScoreNode (percentile_rank + min_news_count) 2026-05-13 23:39:42 +09:00
f2143b3889 feat(screener): ai_news telegram message builder (MarkdownV2 + cost line) 2026-05-13 23:38:07 +09:00
810cc76d40 feat(screener): ai_news pipeline (top-100 parallel, fail-soft, upsert) 2026-05-13 23:36:03 +09:00
0a91f43c46 feat(screener): ai_news Claude Haiku analyzer (-10~+10 + clamp + JSON-fail soft) 2026-05-13 23:33:20 +09:00
3d321f2b4b chore(stock-lab): add pytest + pytest-asyncio to requirements 2026-05-13 23:30:47 +09:00
6ba29599aa feat(screener): ai_news scraper (naver finance ticker news) 2026-05-13 23:29:52 +09:00
658ed13571 feat(screener): add news_sentiment table + ai_news defaults + migration 2026-05-13 23:26:38 +09:00
15ee3c3301 fix(compose): frontend.depends_on 누락된 6개 lab 추가
lotto, stock-lab, agent-office, personal, packs-lab, travel-proxy 가
누락되어 있어 한 컨테이너 다운 시 nginx upstream resolve 실패 위험.
이번 사이클에 lotto httpx 사고로 명시화된 risk 를 해소.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:07 +09:00
2b5009f864 fix(sqlite): WAL + busy_timeout 120s standardized across all labs
8개 lab의 _conn() 함수에 표준 동시성 패턴 통일:
- timeout=120.0 (connection 획득)
- PRAGMA journal_mode=WAL (reader/writer 분리)
- PRAGMA busy_timeout=120000 (트랜잭션 충돌 시 120초 대기)

stock-lab/screener/router.py 의 검증된 패턴(d9b6122) 을 lotto, stock-lab(메인),
music-lab, blog-lab, realestate-lab, agent-office, personal, travel-proxy 로 확산.
기존 'database is locked' 오류 윈도우를 흡수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 22:12:01 +09:00
d9b612253a fix(stock-lab): snapshot flow 범위 100종목 + busy_timeout 2분 (writer 충돌 완화)
자동 잡 16:30 KST 실패 원인:
- agent-office httpx timeout 180s
- 그러나 snapshot/refresh의 flow 스크래핑(500종목 × 0.2-0.5s) = 100~250s
- 180s 초과 시 client timeout → 서버 background 처리 계속
- 곧 /run 호출 → snapshot의 long write transaction과 INSERT 충돌
- WAL은 reader/writer 분리만, writer 두 명은 직렬 → busy_timeout 30s 초과 lock

Fix:
- DEFAULT_FLOW_TOP_N 500 → 100 (시총 상위 100종목 × 0.2s = ~20s)
- busy_timeout 30s → 120s (snapshot write 시간보다 충분히 김)
- connect timeout 30s → 120s

외국인 매수 시그널은 대형주에서 의미 큼. 상위 100종목으로 충분.
더 많은 커버리지 필요 시 별도 cron으로 snapshot/refresh와 /run 시간 분리.
2026-05-13 19:56:30 +09:00
db4322006d fix(stock-lab): screener DB connection WAL 모드 + busy_timeout 30s
snapshot/refresh 직후 /run mode=auto가 'database is locked'으로 500
실패하던 증상 fix. SQLite 기본 rollback journal 모드 + busy_timeout=0
조합에서 long write transaction과 read가 겹치면 즉시 OperationalError.

PRAGMA journal_mode=WAL: reader가 writer를 block 안 함
PRAGMA busy_timeout=30000: 30초 대기 후 timeout (즉시 실패 X)
sqlite3.connect timeout=30: connection 획득 자체에도 대기 적용

agent-office 자동 잡 16:30 KST 흐름 안정화.
2026-05-13 16:50:25 +09:00
a05e6ba8ca feat(stock-lab): 텔레그램 노드 풀 라벨 + 원 단위 표기
- 아이콘(👤외/🆙고/...) 제거하고 풀 한글 라벨로 변경
  (외국인/거래량급증/20일모멘텀/52주신고가/RS레이팅/이평선정배열/VCP수축)
- 가격은 "103,917원" 형태로 원 단위 명시
- 활성 노드 없을 때 fallback 문구
- 테스트도 새 포맷으로 갱신 + 원 단위 검증 신규 케이스
2026-05-13 07:52:17 +09:00
4a333434ac Merge feature/stock-screener-board: Stock Screener Board MVP (backend + agent-office)
stock-lab:
- pykrx→FDR/네이버 데이터 전환 (KRX 인증 회피)
- 스키마 7테이블 + 디폴트 시드
- snapshot.py (FDR 마스터·일봉 + 네이버 외국인 수급, 시총 상위 500종목)
- ScreenContext, ScoreNode/GateNode 추상, percentile_rank
- 게이트 1 (HygieneGate) + 점수 노드 7 (ForeignBuy/VolumeSurge/Momentum20/
  High52WProximity/RsRating/MaAlignment/VcpLite)
- Screener 엔진 + combine + position_sizer (ATR Wilder) + telegram 빌더
- FastAPI 라우터: /nodes, /settings, /run (preview/manual_save/auto),
  /snapshot/refresh, /runs (리스트·상세), 공휴일·주말 skipped_holiday

agent-office:
- StockAgent.on_screener_schedule + run_screener 명령
- 평일 16:30 KST APScheduler cron (Asia/Seoul)
- service_proxy 헬퍼, send_raw parse_mode 확장 (MarkdownV2 지원)
- 5 신규 테스트, 38 회귀 통과
2026-05-13 07:23:43 +09:00
204cee67d6 fix(lotto): grade_weekly_review import용 httpx 의존성 추가
운영 사이트 nginx emerg 'host not found in upstream lotto'의 진짜
원인은 lotto 컨테이너 자체가 ModuleNotFoundError: httpx로 시작 실패한
것이었음. grade_weekly_review.py가 httpx를 import하는데 requirements
에서 누락. 재빌드 시 컨테이너 정상 부팅 가능.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 08:03:34 +09:00
34 changed files with 1021 additions and 37 deletions

View File

@@ -233,6 +233,109 @@ class StockAgent(BaseAgent):
await self.transition("idle", f"스크리너 오류: {err_msg[:80]}")
async def on_ai_news_schedule(self) -> None:
"""AI 뉴스 sentiment 분석 자동 잡 (평일 08:00 KST).
흐름:
1) stock-lab /snapshot/refresh-news-sentiment 호출
2) status='skipped_weekend'/'skipped_holiday' → 종료 (텔레그램 미발신)
3) updated=0 → 운영자 알림 (HTML)
4) failures > 30% → 경고 알림 후 메인 메시지 발송
5) 정상 → Top 5 호재/악재 메시지 발송 (MarkdownV2)
"""
if self.state not in ("idle", "break"):
return
task_id = create_task(self.agent_id, "ai_news_sentiment", {})
await self.transition("working", "AI 뉴스 분석 중...", task_id)
try:
result = await service_proxy.refresh_ai_news_sentiment()
except Exception as e:
err_msg = str(e)
add_log(self.agent_id, f"AI 뉴스 분석 실패: {err_msg}", "error", task_id)
update_task_status(task_id, "failed", {"error": err_msg})
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>AI 뉴스 분석 실패</b>\n"
f"<code>{html.escape(err_msg)[:500]}</code>"
)
except Exception as notify_err:
add_log(
self.agent_id,
f"operator notify failed: {notify_err}",
"warning", task_id,
)
await self.transition("idle", f"AI 뉴스 오류: {err_msg[:80]}")
return
status = result.get("status")
if status in ("skipped_weekend", "skipped_holiday"):
update_task_status(task_id, "succeeded", {"status": status})
add_log(self.agent_id, f"AI 뉴스 건너뜀: {status}", "info", task_id)
await self.transition("idle", "휴일/주말 — 건너뜀")
return
updated = int(result.get("updated", 0))
failures = result.get("failures", []) or []
if updated == 0:
update_task_status(task_id, "failed", {"reason": "0 tickers updated"})
try:
from ..telegram.messaging import send_raw
await send_raw(
"⚠️ <b>AI 뉴스 분석 0종목</b>\n"
"스크래핑/LLM 전체 실패 — 어제 데이터 사용"
)
except Exception:
pass
await self.transition("idle", "AI 뉴스 0건")
return
# 실패율 경고 (별도 알림, 본 메시지는 계속 발송)
failure_rate = len(failures) / max(1, updated + len(failures))
if failure_rate > 0.3:
try:
from ..telegram.messaging import send_raw
await send_raw(
f"⚠️ <b>AI 뉴스 실패율 {failure_rate:.0%}</b>\n"
f"updated={updated}, failures={len(failures)}"
)
except Exception:
pass
# 정상 — Top 5 메시지 (stock-lab이 빌드해서 응답에 telegram_text 동봉)
text = result.get("telegram_text") or ""
if not text:
add_log(self.agent_id, "telegram_text 누락 — stock-lab 응답 결함", "error", task_id)
update_task_status(task_id, "failed", {"error": "telegram_text 누락"})
await self.transition("idle", "AI 뉴스 응답 결함")
return
await self.transition("reporting", "AI 뉴스 알림 전송 중...")
from ..telegram.messaging import send_raw
tg = await send_raw(text, parse_mode="MarkdownV2")
update_task_status(task_id, "succeeded", {
"asof": result["asof"],
"updated": updated,
"failures": len(failures),
"tokens_input": int(result.get("tokens_input", 0)),
"tokens_output": int(result.get("tokens_output", 0)),
"telegram_sent": tg.get("ok", False),
})
if not tg.get("ok"):
desc = tg.get("description") or "unknown"
code = tg.get("error_code")
add_log(
self.agent_id,
f"AI news telegram send failed: [{code}] {desc}",
"warning", task_id,
)
await self.transition("idle", "AI 뉴스 완료")
async def on_command(self, command: str, params: dict) -> dict:
if command == "run_screener":
await self.on_screener_schedule()

View File

@@ -9,9 +9,10 @@ from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=10)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -19,6 +19,11 @@ async def _run_stock_screener():
if agent:
await agent.on_screener_schedule()
async def _run_stock_ai_news():
agent = AGENT_REGISTRY.get("stock")
if agent:
await agent.on_ai_news_schedule()
async def _run_blog_schedule():
agent = AGENT_REGISTRY.get("blog")
if agent:
@@ -54,6 +59,14 @@ def init_scheduler():
minute=30,
id="stock_screener",
)
scheduler.add_job(
_run_stock_ai_news,
"cron",
day_of_week="mon-fri",
hour=8,
minute=0,
id="stock_ai_news_sentiment",
)
scheduler.add_job(_run_blog_schedule, "cron", hour=10, minute=0, id="blog_pipeline")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")

View File

@@ -43,6 +43,20 @@ async def refresh_screener_snapshot() -> Dict[str, Any]:
return resp.json()
async def refresh_ai_news_sentiment() -> Dict[str, Any]:
"""stock-lab의 AI 뉴스 sentiment 분석 트리거 (08:00 cron).
네이버 100종목 스크래핑 + Claude Haiku 100콜 병렬 = 약 30-60초.
여유있게 240s timeout.
"""
async with httpx.AsyncClient(timeout=240.0) as client:
resp = await client.post(
f"{STOCK_LAB_URL}/api/stock/screener/snapshot/refresh-news-sentiment"
)
resp.raise_for_status()
return resp.json()
async def run_stock_screener(mode: str = "auto") -> Dict[str, Any]:
"""stock-lab의 스크리너 실행.

View File

@@ -8,9 +8,10 @@ from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -241,9 +241,15 @@ services:
container_name: frontend
restart: unless-stopped
depends_on:
- lotto
- stock-lab
- music-lab
- blog-lab
- realestate-lab
- agent-office
- personal
- packs-lab
- travel-proxy
ports:
- "8080:80"
volumes:

View File

@@ -9,8 +9,10 @@ DB_PATH = "/app/data/lotto.db"
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
def _ensure_column(conn: sqlite3.Connection, table: str, col: str, ddl: str) -> None:

View File

@@ -1,5 +1,6 @@
fastapi==0.115.6
uvicorn[standard]==0.30.6
requests==2.32.3
httpx==0.27.2
beautifulsoup4==4.12.3
APScheduler==3.10.4

View File

@@ -9,8 +9,10 @@ DB_PATH = "/app/data/music.db"
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn

View File

@@ -9,9 +9,10 @@ DB_PATH = "/app/data/personal.db"
def _conn():
c = sqlite3.connect(DB_PATH, timeout=10)
c = sqlite3.connect(DB_PATH, timeout=120.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA busy_timeout=120000;")
c.execute("PRAGMA foreign_keys=ON;")
return c

View File

@@ -12,9 +12,10 @@ DB_PATH = os.getenv("REALESTATE_DB_PATH", "/app/data/realestate.db")
def _conn():
c = sqlite3.connect(DB_PATH, timeout=10)
c = sqlite3.connect(DB_PATH, timeout=120.0)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL;")
c.execute("PRAGMA busy_timeout=120000;")
c.execute("PRAGMA foreign_keys=ON;")
return c

View File

@@ -12,8 +12,10 @@ def _conn() -> sqlite3.Connection:
parent = os.path.dirname(db_path)
if parent:
os.makedirs(parent, exist_ok=True)
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(db_path, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
def init_db():

View File

@@ -0,0 +1,76 @@
"""Claude Haiku 기반 종목 뉴스 호재/악재 분석."""
from __future__ import annotations
import json
import logging
import os
from typing import Any, Dict, List
log = logging.getLogger(__name__)
DEFAULT_MODEL = os.getenv("AI_NEWS_MODEL", "claude-haiku-4-5-20251001")
PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {n}개의 헤드라인입니다.
{news_block}
이 뉴스들이 종목에 호재인지 악재인지 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거.
JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}"""
def _clamp(x: float, lo: float = -10.0, hi: float = 10.0) -> float:
return max(lo, min(hi, x))
async def score_sentiment(
llm,
ticker: str,
news: List[Dict[str, Any]],
*,
name: str | None = None,
model: str = DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
news_block = "\n".join(f"- {n['title']}" for n in news)
prompt = PROMPT_TEMPLATE.format(
name=name or ticker, ticker=ticker,
n=len(news), news_block=news_block,
)
resp = await llm.messages.create(
model=model,
max_tokens=200,
messages=[{"role": "user", "content": prompt}],
)
text = resp.content[0].text if resp.content else ""
in_tokens = int(getattr(resp.usage, "input_tokens", 0) or 0)
out_tokens = int(getattr(resp.usage, "output_tokens", 0) or 0)
try:
data = json.loads(text)
score = _clamp(float(data["score"]))
reason = str(data["reason"])[:200]
return {
"ticker": ticker,
"score_raw": score,
"reason": reason,
"news_count": len(news),
"tokens_input": in_tokens,
"tokens_output": out_tokens,
"model": model,
}
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
log.warning("ai_news parse fail for %s: %s (raw=%r)", ticker, e, text[:100])
return {
"ticker": ticker,
"score_raw": 0.0,
"reason": f"parse fail: {e!s}"[:200],
"news_count": len(news),
"tokens_input": in_tokens,
"tokens_output": out_tokens,
"model": model,
}

View File

@@ -0,0 +1,150 @@
"""ai_news refresh pipeline — 시총 상위 N종목 병렬 처리."""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional
import httpx
from . import scraper as _scraper
from . import analyzer as _analyzer
log = logging.getLogger(__name__)
DEFAULT_TOP_N = 100
DEFAULT_CONCURRENCY = 10
DEFAULT_NEWS_PER_TICKER = 5
DEFAULT_RATE_LIMIT_SEC = 0.2
def _top_market_cap_tickers(conn: sqlite3.Connection, n: int) -> List[str]:
rows = conn.execute(
"SELECT ticker FROM krx_master "
"WHERE market_cap IS NOT NULL AND is_preferred=0 AND is_spac=0 "
"ORDER BY market_cap DESC LIMIT ?",
(n,),
).fetchall()
return [r[0] for r in rows]
def _make_http():
return httpx.AsyncClient(timeout=10.0, headers=_scraper.NAVER_HEADERS)
def _make_llm():
"""Anthropic AsyncClient — env에 ANTHROPIC_API_KEY 필수."""
from anthropic import AsyncAnthropic
return AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
async def _process_one(
ticker: str, name: str, sem: asyncio.Semaphore,
http_client, llm, news_per_ticker: int, rate_limit_sec: float, model: str,
) -> Dict[str, Any]:
async with sem:
if rate_limit_sec > 0:
await asyncio.sleep(rate_limit_sec)
news = await _scraper.fetch_news(http_client, ticker, n=news_per_ticker)
if not news:
return {
"ticker": ticker, "score_raw": 0.0, "reason": "no news",
"news_count": 0, "tokens_input": 0, "tokens_output": 0,
"model": model,
}
return await _analyzer.score_sentiment(
llm, ticker, news, name=name, model=model,
)
def _upsert_news_sentiment(
conn: sqlite3.Connection, asof: dt.date, rows: List[Dict[str, Any]]
) -> None:
iso = asof.isoformat()
data = [
(
r["ticker"], iso, r["score_raw"], r["reason"], r["news_count"],
r["tokens_input"], r["tokens_output"], r["model"],
)
for r in rows
]
conn.executemany(
"""INSERT INTO news_sentiment
(ticker, date, score_raw, reason, news_count,
tokens_input, tokens_output, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker, date) DO UPDATE SET
score_raw=excluded.score_raw,
reason=excluded.reason,
news_count=excluded.news_count,
tokens_input=excluded.tokens_input,
tokens_output=excluded.tokens_output,
model=excluded.model
""",
data,
)
conn.commit()
async def refresh_daily(
conn: sqlite3.Connection,
asof: dt.date,
*,
top_n: int = DEFAULT_TOP_N,
concurrency: int = DEFAULT_CONCURRENCY,
news_per_ticker: int = DEFAULT_NEWS_PER_TICKER,
rate_limit_sec: float = DEFAULT_RATE_LIMIT_SEC,
model: str = _analyzer.DEFAULT_MODEL,
) -> Dict[str, Any]:
"""Returns summary dict with top_pos/top_neg/token totals/failures."""
started = time.time()
tickers = _top_market_cap_tickers(conn, n=top_n)
name_map = {
r[0]: r[1] for r in conn.execute(
f"SELECT ticker, name FROM krx_master WHERE ticker IN "
f"({','.join('?' * len(tickers))})", tickers,
).fetchall()
} if tickers else {}
sem = asyncio.Semaphore(concurrency)
async with _make_http() as http_client, _make_llm() as llm:
tasks = [
_process_one(
t, name_map.get(t, t), sem, http_client, llm,
news_per_ticker, rate_limit_sec, model,
)
for t in tickers
]
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
successes: List[Dict[str, Any]] = []
failures: List[str] = []
for r in raw_results:
if isinstance(r, BaseException):
failures.append(repr(r))
elif isinstance(r, dict):
successes.append(r)
if successes:
_upsert_news_sentiment(conn, asof, successes)
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5]
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5]
return {
"asof": asof.isoformat(),
"updated": len(successes),
"failures": failures,
"duration_sec": round(time.time() - started, 2),
"tokens_input": sum(r["tokens_input"] for r in successes),
"tokens_output": sum(r["tokens_output"] for r in successes),
"top_pos": top_pos,
"top_neg": top_neg,
"model": model,
}

View File

@@ -0,0 +1,39 @@
"""네이버 finance 종목 뉴스 스크래핑."""
from __future__ import annotations
import logging
from typing import Any, Dict, List
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
NAVER_NEWS_URL = "https://finance.naver.com/item/news_news.naver"
NAVER_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": "https://finance.naver.com/",
}
async def fetch_news(client, ticker: str, n: int = 5) -> List[Dict[str, Any]]:
"""Scrape top N news headlines for a ticker. Returns [] on any failure."""
try:
r = await client.get(NAVER_NEWS_URL, params={"code": ticker, "page": 1})
except Exception as e:
log.warning("ai_news scrape http error for %s: %s", ticker, e)
return []
if r.status_code != 200:
return []
soup = BeautifulSoup(r.text, "lxml")
out: List[Dict[str, Any]] = []
for row in soup.select("table.type5 tbody tr")[:n]:
title_el = row.select_one("td.title a")
date_el = row.select_one("td.date")
if not title_el or not date_el:
continue
out.append({
"title": title_el.get_text(strip=True),
"date": date_el.get_text(strip=True),
})
return out

View File

@@ -0,0 +1,61 @@
"""ai_news Top 5/5 텔레그램 메시지 빌더 (MarkdownV2)."""
from __future__ import annotations
from typing import Any, Dict, List
_MD_SPECIAL = r"_*[]()~`>#+-=|{}.!\\"
def _escape(text: str) -> str:
return "".join("\\" + c if c in _MD_SPECIAL else c for c in str(text))
def _cost_won(tokens_input: int, tokens_output: int) -> int:
"""Claude Haiku 가격 환산 (대략): in $1/M × ₩1300, out $5/M × ₩1300."""
return int(tokens_input * 0.0013 + tokens_output * 0.0065)
def _row_line(idx: int, r: Dict[str, Any]) -> str:
score = r["score_raw"]
sign = "+" if score >= 0 else ""
return (
f"{idx}\\. {_escape(r['ticker'])} \\({sign}{score:.1f}\\) — "
f"{_escape(r['reason'])}"
)
def build_message(
*,
asof: str,
top_pos: List[Dict[str, Any]],
top_neg: List[Dict[str, Any]],
tokens_input: int,
tokens_output: int,
) -> str:
lines: List[str] = [
f"🌅 *AI 뉴스 분석* \\({_escape(asof)} 08:00\\)",
"",
"📈 *호재 Top 5*",
]
if top_pos:
for i, r in enumerate(top_pos, 1):
lines.append(_row_line(i, r))
else:
lines.append(_escape("- (없음)"))
lines += ["", "📉 *악재 Top 5*"]
if top_neg:
for i, r in enumerate(top_neg, 1):
lines.append(_row_line(i, r))
else:
lines.append(_escape("- (없음)"))
cost = _cost_won(tokens_input, tokens_output)
lines += [
"",
f"_분석: 시총 상위 100종목 · 토큰 {tokens_input:,} in / {tokens_output:,} out · "
f"약 ₩{cost:,}_",
]
return "\n".join(lines)

View File

@@ -17,6 +17,7 @@ class ScreenContext:
flow: pd.DataFrame # cols: ticker,date,foreign_net,institution_net
kospi: pd.Series # index=date(str), name="kospi"
asof: dt.date
news_sentiment: "pd.DataFrame | None" = None
@classmethod
def load(cls, conn: sqlite3.Connection, asof: dt.date,
@@ -38,6 +39,10 @@ class ScreenContext:
"FROM krx_flow WHERE date BETWEEN ? AND ? ORDER BY date",
conn, params=(cutoff, asof_iso),
)
news_sentiment = pd.read_sql_query(
"SELECT ticker, score_raw, news_count FROM news_sentiment WHERE date = ?",
conn, params=(asof_iso,),
)
# KOSPI 지수: MVP에서는 005930(삼성전자) 종가를 시장 대용으로 사용.
# 후속 슬라이스에서 ^KS11 별도 캐시.
@@ -47,7 +52,8 @@ class ScreenContext:
kospi = sub.copy()
kospi.name = "kospi"
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof)
return cls(master=master, prices=prices, flow=flow, kospi=kospi, asof=asof,
news_sentiment=news_sentiment)
def restrict(self, tickers) -> "ScreenContext":
tickers = pd.Index(tickers)

View File

@@ -0,0 +1,36 @@
"""AI 뉴스 호재/악재 점수 노드.
ScreenContext.news_sentiment (DataFrame: ticker, score_raw, news_count) 를
min_news_count 로 필터한 뒤 percentile_rank 로 0~100 변환.
"""
from __future__ import annotations
import pandas as pd
from .base import ScoreNode, percentile_rank
class AiNewsSentiment(ScoreNode):
name = "ai_news"
label = "AI 뉴스 호재/악재"
default_params = {"min_news_count": 1}
param_schema = {
"type": "object",
"properties": {
"min_news_count": {
"type": "integer", "minimum": 0, "default": 1,
"description": "최소 분석 뉴스 수. 미만이면 점수 미산출.",
},
},
}
def compute(self, ctx, params: dict) -> pd.Series:
df = getattr(ctx, "news_sentiment", None)
if df is None or df.empty:
return pd.Series(dtype=float)
min_news = int(params.get("min_news_count", 1))
df = df[df["news_count"] >= min_news]
if df.empty:
return pd.Series(dtype=float)
return percentile_rank(df.set_index("ticker")["score_raw"])

View File

@@ -8,6 +8,7 @@ from .nodes.high52w import High52WProximity
from .nodes.rs_rating import RsRating
from .nodes.ma_alignment import MaAlignment
from .nodes.vcp_lite import VcpLite
from .nodes.ai_news import AiNewsSentiment
NODE_REGISTRY: dict = {
"foreign_buy": ForeignBuy,
@@ -17,6 +18,7 @@ NODE_REGISTRY: dict = {
"rs_rating": RsRating,
"ma_alignment": MaAlignment,
"vcp_lite": VcpLite,
"ai_news": AiNewsSentiment,
}
GATE_REGISTRY: dict = {

View File

@@ -45,7 +45,13 @@ def _db_path() -> str:
def _conn() -> sqlite3.Connection:
return sqlite3.connect(_db_path())
# WAL 모드 + busy_timeout으로 동시 read/write lock 회피
# WAL은 reader vs writer 동시성만 해결 — writer 두 명은 직렬이므로 busy_timeout이
# snapshot/refresh의 write 시간보다 길어야 함 (네이버 스크래핑 ~20초 + DB upsert).
conn = sqlite3.connect(_db_path(), timeout=120.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
# ---------- /nodes ----------
@@ -270,6 +276,30 @@ def list_runs(limit: int = 30):
]
# ---------- /snapshot/refresh-news-sentiment ----------
from .ai_news import pipeline as _ai_pipeline
from .ai_news import telegram as _ai_telegram
@router.post("/snapshot/refresh-news-sentiment")
async def post_refresh_news_sentiment(asof: Optional[str] = None):
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
if asof_date.weekday() >= 5:
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
if _is_holiday(asof_date):
return {"asof": asof_date.isoformat(), "status": "skipped_holiday"}
with _conn() as c:
summary = await _ai_pipeline.refresh_daily(c, asof_date)
summary["telegram_text"] = _ai_telegram.build_message(
asof=summary["asof"],
top_pos=summary["top_pos"], top_neg=summary["top_neg"],
tokens_input=summary["tokens_input"],
tokens_output=summary["tokens_output"],
)
return summary
@router.get("/runs/{run_id}")
def get_run(run_id: int):
with _conn() as c:

View File

@@ -12,6 +12,7 @@ DEFAULT_WEIGHTS = {
"rs_rating": 1.2,
"ma_alignment": 1.0,
"vcp_lite": 0.8,
"ai_news": 0.8,
}
DEFAULT_NODE_PARAMS = {
"foreign_buy": {"window_days": 5},
@@ -21,6 +22,7 @@ DEFAULT_NODE_PARAMS = {
"rs_rating": {"weights": {"3m": 2, "6m": 1, "9m": 1, "12m": 1}},
"ma_alignment": {"ma_periods": [50, 150, 200]},
"vcp_lite": {"short_window": 40, "long_window": 252},
"ai_news": {"min_news_count": 1},
}
DEFAULT_GATE_PARAMS = {
"min_market_cap_won": 50_000_000_000,
@@ -110,12 +112,45 @@ CREATE TABLE IF NOT EXISTS screener_results (
FOREIGN KEY (run_id) REFERENCES screener_runs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_results_run_rank ON screener_results(run_id, rank);
CREATE TABLE IF NOT EXISTS news_sentiment (
ticker TEXT NOT NULL,
date TEXT NOT NULL,
score_raw REAL NOT NULL,
reason TEXT NOT NULL DEFAULT '',
news_count INTEGER NOT NULL DEFAULT 0,
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
model TEXT NOT NULL DEFAULT 'claude-haiku-4-5-20251001',
created_at TEXT NOT NULL DEFAULT (datetime('now','localtime')),
PRIMARY KEY (ticker, date)
);
CREATE INDEX IF NOT EXISTS idx_news_sentiment_date ON news_sentiment(date DESC);
"""
def ensure_screener_schema(conn: sqlite3.Connection) -> None:
"""Create tables and seed default settings (idempotent)."""
conn.executescript(DDL)
# ai_news 키 누락 시 1회 보충 (이미 운영 중인 환경에 대해)
row = conn.execute(
"SELECT weights_json, node_params_json FROM screener_settings WHERE id=1"
).fetchone()
if row is not None:
w = json.loads(row[0])
p = json.loads(row[1])
changed = False
if "ai_news" not in w:
w["ai_news"] = DEFAULT_WEIGHTS["ai_news"]
changed = True
if "ai_news" not in p:
p["ai_news"] = DEFAULT_NODE_PARAMS["ai_news"]
changed = True
if changed:
conn.execute(
"UPDATE screener_settings SET weights_json=?, node_params_json=? WHERE id=1",
(json.dumps(w), json.dumps(p)),
)
existing = conn.execute("SELECT id FROM screener_settings WHERE id=1").fetchone()
if existing is None:
now = datetime.now(timezone.utc).isoformat()

View File

@@ -22,8 +22,11 @@ NAVER_HEADERS = {
"Referer": "https://finance.naver.com/",
}
DEFAULT_FLOW_TOP_N = 500
DEFAULT_FLOW_TOP_N = 100
DEFAULT_RATE_LIMIT_SEC = 0.2
# 시총 상위 100종목 × 0.2초 = ~20초 — agent-office httpx timeout(180s) 안에 여유롭게 완료
# 외국인 매수 시그널은 대형주에서 의미가 크므로 상위 100종목으로 충분.
# 더 많은 종목이 필요하면 별도 cron으로 분리 권장.
@dataclass

View File

@@ -4,14 +4,15 @@ from __future__ import annotations
import datetime as dt
NODE_ICONS = {
"foreign_buy": "👤외",
"volume_surge": "⚡거",
"momentum": "🚀모",
"high52w": "🆙고",
"rs_rating": "💪RS",
"ma_alignment": "📈MA",
"vcp_lite": "🌀VCP",
# 노드별 풀 라벨 (아이콘 대신 사용 — 사용자가 명확한 이름 선호)
NODE_LABELS = {
"foreign_buy": "외국인",
"volume_surge": "거래량급증",
"momentum": "20일모멘텀",
"high52w": "52주신고가",
"rs_rating": "RS레이팅",
"ma_alignment": "이평선정배열",
"vcp_lite": "VCP수축",
}
PAGE_BASE = "https://gahusb.synology.me/stock/screener"
@@ -25,9 +26,21 @@ def _escape_md(s: str) -> str:
def _format_won(n) -> str:
"""1,234,567원 형태 (None 시 '-')."""
if n is None:
return "-"
return f"{int(n):,}"
return "\\-"
return f"{int(n):,}"
def _format_active_nodes(scores: dict, threshold: int = 70) -> str:
"""70점 이상 노드를 '라벨 점수' 형태로 나열, 콤마 구분."""
active = []
for name, sc in scores.items():
label = NODE_LABELS.get(name)
if label is None or sc < threshold:
continue
active.append(f"{_escape_md(label)} {int(sc)}")
return " · ".join(active) if active else "\\(70점 이상 노드 없음\\)"
def build_telegram_payload(asof: dt.date, mode: str, survivors_count: int,
@@ -40,17 +53,14 @@ def build_telegram_payload(asof: dt.date, mode: str, survivors_count: int,
lines = []
for r in rows[:10]:
icons = " ".join(
NODE_ICONS[name] for name, sc in r["scores"].items()
if sc >= 70 and name in NODE_ICONS
)
nodes_str = _format_active_nodes(r.get("scores", {}))
score_str = f"{r['total_score']:.1f}"
r_pct = r.get("r_pct")
r_pct_str = f"{r_pct:.1f}" if r_pct is not None else "-"
lines.append(
f"{r['rank']}\\. *{_escape_md(r['name'])}* `{r['ticker']}` "
f"{_escape_md(score_str)}\n"
f" {icons}\n"
f" {nodes_str}\n"
f" 진입 {_format_won(r.get('entry_price'))} "
f"손절 {_format_won(r.get('stop_price'))} "
f"익절 {_format_won(r.get('target_price'))} "

View File

@@ -21,15 +21,16 @@ def client():
return TestClient(app)
def test_get_nodes_lists_7_score_and_1_gate(client):
def test_get_nodes_lists_8_score_and_1_gate(client):
r = client.get("/api/stock/screener/nodes")
assert r.status_code == 200
body = r.json()
assert len(body["score_nodes"]) == 7
assert len(body["score_nodes"]) == 8
assert len(body["gate_nodes"]) == 1
assert {n["name"] for n in body["score_nodes"]} == {
"foreign_buy", "volume_surge", "momentum",
"high52w", "rs_rating", "ma_alignment", "vcp_lite",
"ai_news",
}

View File

@@ -31,7 +31,7 @@ def test_build_payload_includes_top10_and_link():
assert "42" in text # run_id 링크
def test_score_threshold_filters_icons():
def test_score_threshold_filters_node_labels():
rows = [{
"rank": 1, "ticker": "A", "name": "A주",
"total_score": 80,
@@ -41,11 +41,28 @@ def test_score_threshold_filters_icons():
"target_price": 53750, "r_pct": 3.5,
}]
p = build_telegram_payload(dt.date(2026, 5, 12), "auto", 100, 1, rows, run_id=1)
# foreign_buy(90), momentum(70), rs_rating(80), ma_alignment(80) 만 표시 (≥70)
assert "👤외" in p["text"]
assert "🚀모" in p["text"]
assert "💪RS" in p["text"]
assert "📈MA" in p["text"]
assert "⚡거" not in p["text"]
assert "🆙고" not in p["text"]
assert "🌀VCP" not in p["text"]
text = p["text"]
# ≥70 노드만 풀 라벨로 표시 (foreign_buy=90, momentum=70, rs_rating=80, ma_alignment=80)
assert "외국인 90" in text
assert "20일모멘텀 70" in text
assert "RS레이팅 80" in text
assert "이평선정배열 80" in text
# <70 노드는 숨김 (volume_surge=50, high52w=30, vcp_lite=60)
assert "거래량급증" not in text
assert "52주신고가" not in text
assert "VCP수축" not in text
def test_prices_have_won_suffix():
rows = [{
"rank": 1, "ticker": "A", "name": "A주",
"total_score": 80,
"scores": {"foreign_buy": 80},
"close": 50000, "entry_price": 50250, "stop_price": 48500,
"target_price": 53750, "r_pct": 3.5,
}]
p = build_telegram_payload(dt.date(2026, 5, 12), "auto", 100, 1, rows, run_id=1)
text = p["text"]
assert "50,250원" in text
assert "48,500원" in text
assert "53,750원" in text

View File

@@ -1,4 +1,5 @@
# 주식 서비스용 라이브러리
anthropic==0.39.0
requests==2.32.3
httpx==0.27.2
beautifulsoup4==4.12.3
@@ -8,4 +9,6 @@ apscheduler==3.10.4
python-dotenv==1.0.1
finance-datareader==0.9.110
lxml==6.1.0
pytest==8.3.2
pytest-asyncio==0.24.0

View File

@@ -0,0 +1,55 @@
import json
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.screener.ai_news import analyzer
def _mk_llm(content_text: str, in_tokens: int = 100, out_tokens: int = 20):
llm = AsyncMock()
resp = MagicMock()
block = MagicMock()
block.text = content_text
resp.content = [block]
resp.usage = MagicMock(input_tokens=in_tokens, output_tokens=out_tokens)
llm.messages = MagicMock()
llm.messages.create = AsyncMock(return_value=resp)
return llm
NEWS = [{"title": "삼성전자, HBM 양산"}, {"title": "메모리 가격 반등"}]
@pytest.mark.asyncio
async def test_score_sentiment_success_parses_json():
llm = _mk_llm(json.dumps({"score": 7.5, "reason": "HBM 호재"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS, name="삼성전자")
assert out["ticker"] == "005930"
assert out["score_raw"] == 7.5
assert out["reason"] == "HBM 호재"
assert out["news_count"] == 2
assert out["tokens_input"] == 100
assert out["tokens_output"] == 20
@pytest.mark.asyncio
async def test_score_sentiment_json_parse_fail_returns_zero():
llm = _mk_llm("not valid json")
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == 0.0
assert "parse fail" in out["reason"]
assert out["tokens_input"] == 100 # 호출은 발생했음
@pytest.mark.asyncio
async def test_score_sentiment_clamps_out_of_range():
llm = _mk_llm(json.dumps({"score": 15.0, "reason": "초강세"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == 10.0 # +10 클램프
@pytest.mark.asyncio
async def test_score_sentiment_clamps_negative_out_of_range():
llm = _mk_llm(json.dumps({"score": -42.0, "reason": "초악재"}))
out = await analyzer.score_sentiment(llm, "005930", NEWS)
assert out["score_raw"] == -10.0

View File

@@ -0,0 +1,57 @@
import datetime as dt
import pandas as pd
import pytest
from app.screener.nodes.ai_news import AiNewsSentiment
class FakeCtx:
def __init__(self, df=None):
self.news_sentiment = df
self.asof = dt.date(2026, 5, 13)
def test_compute_empty_context():
out = AiNewsSentiment().compute(FakeCtx(None), {"min_news_count": 1})
assert out.empty
def test_compute_with_data_percentile_ranks():
df = pd.DataFrame([
{"ticker": "A", "score_raw": -5.0, "news_count": 3},
{"ticker": "B", "score_raw": 0.0, "news_count": 3},
{"ticker": "C", "score_raw": 8.0, "news_count": 3},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert len(out) == 3
# percentile rank: A (lowest) < B < C (highest)
assert out.loc["A"] < out.loc["B"] < out.loc["C"]
# all within [0, 100]
assert (out >= 0).all() and (out <= 100).all()
def test_compute_filters_by_min_news_count():
df = pd.DataFrame([
{"ticker": "A", "score_raw": -5.0, "news_count": 0}, # 필터됨
{"ticker": "B", "score_raw": 0.0, "news_count": 2},
{"ticker": "C", "score_raw": 8.0, "news_count": 5},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert "A" not in out.index
assert "B" in out.index
assert "C" in out.index
def test_compute_all_filtered_returns_empty():
df = pd.DataFrame([
{"ticker": "A", "score_raw": 5.0, "news_count": 0},
])
out = AiNewsSentiment().compute(FakeCtx(df), {"min_news_count": 1})
assert out.empty
def test_metadata():
n = AiNewsSentiment()
assert n.name == "ai_news"
assert "AI" in n.label or "뉴스" in n.label
assert n.default_params == {"min_news_count": 1}
assert "min_news_count" in n.param_schema["properties"]

View File

@@ -0,0 +1,110 @@
import datetime as dt
import sqlite3
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from app.screener.ai_news import pipeline
from app.screener.schema import ensure_screener_schema
@pytest.fixture
def conn():
c = sqlite3.connect(":memory:")
c.row_factory = sqlite3.Row
ensure_screener_schema(c)
# 시총 상위 3종목 시드
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("005930", "삼성전자", 9_000_000))
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("000660", "SK하이닉스", 8_000_000))
c.execute("INSERT INTO krx_master (ticker, name, market, market_cap, updated_at) "
"VALUES (?, ?, 'KOSPI', ?, datetime('now'))", ("373220", "LG에너지솔루션", 7_000_000))
c.commit()
yield c
c.close()
@pytest.mark.asyncio
async def test_refresh_daily_happy_path(conn):
"""3종목 mini integration — 각 종목별로 scraper/analyzer mock."""
asof = dt.date(2026, 5, 13)
fake_news = [{"title": "헤드라인"}]
async def fake_fetch(client, ticker, n):
return fake_news
scores_by_ticker = {
"005930": 7.5, "000660": 4.0, "373220": -6.0,
}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores_by_ticker[ticker],
"reason": f"r{ticker}", "news_count": 1,
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
assert result["asof"] == "2026-05-13"
assert result["updated"] == 3
assert result["failures"] == []
assert len(result["top_pos"]) == 3
assert result["top_pos"][0]["ticker"] == "005930" # 가장 큰 점수
assert result["top_neg"][0]["ticker"] == "373220" # 가장 작은 점수
assert result["tokens_input"] == 300
assert result["tokens_output"] == 60
# DB upsert 확인
rows = conn.execute("SELECT ticker, score_raw FROM news_sentiment WHERE date=?",
("2026-05-13",)).fetchall()
assert len(rows) == 3
by_ticker = {r["ticker"]: r["score_raw"] for r in rows}
assert by_ticker["005930"] == 7.5
assert by_ticker["373220"] == -6.0
@pytest.mark.asyncio
async def test_refresh_daily_failures_isolated(conn):
"""한 종목이 예외 던져도 나머지 종목은 정상 처리."""
asof = dt.date(2026, 5, 13)
async def fake_fetch(client, ticker, n):
return [{"title": "h"}]
async def fake_score(llm, ticker, news, *, name=None, model="m"):
if ticker == "000660":
raise RuntimeError("llm exploded")
return {
"ticker": ticker, "score_raw": 5.0, "reason": "r", "news_count": 1,
"tokens_input": 100, "tokens_output": 20, "model": model,
}
with patch.object(pipeline, "_scraper") as ms, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml, \
patch.object(pipeline, "_make_http") as mh:
ms.fetch_news = fake_fetch
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
mh.return_value.__aenter__.return_value = AsyncMock()
mh.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3, rate_limit_sec=0)
assert result["updated"] == 2
assert len(result["failures"]) == 1
def test_top_market_cap_tickers(conn):
out = pipeline._top_market_cap_tickers(conn, n=2)
assert out == ["005930", "000660"]

View File

@@ -0,0 +1,36 @@
import datetime as dt
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from app.main import app
def test_refresh_news_sentiment_weekend_skip():
# 2026-05-16 = Saturday
client = TestClient(app)
resp = client.post(
"/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-16"
)
assert resp.status_code == 200
assert resp.json()["status"] == "skipped_weekend"
def test_refresh_news_sentiment_weekday_invokes_pipeline():
fake_summary = {
"asof": "2026-05-13", "updated": 3, "failures": [],
"duration_sec": 1.0, "tokens_input": 100, "tokens_output": 20,
"top_pos": [], "top_neg": [], "model": "m",
}
with patch("app.screener.router._ai_pipeline") as mp, \
patch("app.screener.router._ai_telegram") as mt:
mp.refresh_daily = AsyncMock(return_value=fake_summary)
mt.build_message = lambda **kw: "BUILT_TEXT"
client = TestClient(app)
resp = client.post(
"/api/stock/screener/snapshot/refresh-news-sentiment?asof=2026-05-13"
)
assert resp.status_code == 200
body = resp.json()
assert body["asof"] == "2026-05-13"
assert body["updated"] == 3
assert body["telegram_text"] == "BUILT_TEXT"

View File

@@ -0,0 +1,55 @@
import pytest
from unittest.mock import AsyncMock
from app.screener.ai_news import scraper
SAMPLE_HTML = """
<html><body>
<table class="type5"><tbody>
<tr><td class="title"><a href="/news1">삼성전자, HBM 양산 가시화</a></td><td class="date">2026.05.13 07:30</td></tr>
<tr><td class="title"><a href="/news2">삼성, 4분기 어닝 쇼크 우려</a></td><td class="date">2026.05.13 06:00</td></tr>
<tr><td class="title"><a href="/news3">메모리 시장 회복세</a></td><td class="date">2026.05.12 18:00</td></tr>
</tbody></table>
</body></html>
"""
EMPTY_HTML = "<html><body><table class='type5'><tbody></tbody></table></body></html>"
def _mk_client(status_code=200, text=SAMPLE_HTML):
client = AsyncMock()
resp = AsyncMock()
resp.status_code = status_code
resp.text = text
client.get = AsyncMock(return_value=resp)
return client
@pytest.mark.asyncio
async def test_fetch_news_success_returns_n_items():
client = _mk_client()
out = await scraper.fetch_news(client, "005930", n=2)
assert len(out) == 2
assert out[0]["title"] == "삼성전자, HBM 양산 가시화"
assert out[0]["date"] == "2026.05.13 07:30"
@pytest.mark.asyncio
async def test_fetch_news_404_returns_empty():
client = _mk_client(status_code=404, text="")
out = await scraper.fetch_news(client, "999999", n=5)
assert out == []
@pytest.mark.asyncio
async def test_fetch_news_empty_table_returns_empty():
client = _mk_client(text=EMPTY_HTML)
out = await scraper.fetch_news(client, "005930", n=5)
assert out == []
@pytest.mark.asyncio
async def test_fetch_news_n_caps_results():
client = _mk_client()
out = await scraper.fetch_news(client, "005930", n=2)
assert len(out) == 2 # 샘플에 3개 있지만 n=2로 잘림

View File

@@ -0,0 +1,54 @@
from app.screener.ai_news import telegram as tg
def _row(ticker, score, reason="r"):
return {"ticker": ticker, "score_raw": score, "reason": reason,
"news_count": 5, "tokens_input": 100, "tokens_output": 20,
"model": "m"}
def test_build_message_includes_top_sections():
msg = tg.build_message(
asof="2026-05-13",
top_pos=[_row("005930", 8.5, "HBM 호재")],
top_neg=[_row("373220", -6.3, "수주 지연")],
tokens_input=10000, tokens_output=2000,
)
assert "AI 뉴스 분석" in msg
assert "호재 Top" in msg
assert "악재 Top" in msg
assert "005930" in msg
assert "8.5" in msg
assert "HBM" in msg
assert "373220" in msg
def test_build_message_escapes_markdownv2_specials():
msg = tg.build_message(
asof="2026-05-13",
top_pos=[_row("005930", 3.0, "테스트(괄호) [대괄호]")],
top_neg=[],
tokens_input=100, tokens_output=20,
)
# MarkdownV2 특수문자 ( ) [ ] 이 escape 되어야 함
assert r"\(" in msg or r"\)" in msg
assert r"\[" in msg or r"\]" in msg
def test_build_message_cost_won_line():
msg = tg.build_message(
asof="2026-05-13", top_pos=[], top_neg=[],
tokens_input=10000, tokens_output=2000,
)
# tokens_input × 0.0013 + tokens_output × 0.0065 = 13 + 13 = ₩26
assert "₩26" in msg or "₩ 26" in msg or "" in msg
def test_build_message_empty_lists():
msg = tg.build_message(
asof="2026-05-13", top_pos=[], top_neg=[],
tokens_input=0, tokens_output=0,
)
# 빈 리스트라도 헤더는 있어야 함
assert "호재 Top" in msg
assert "악재 Top" in msg

View File

@@ -7,9 +7,10 @@ DB_PATH = os.getenv("TRAVEL_DB_PATH", "/data/thumbs/travel.db")
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn