feat(insta-lab): trend_collector with NAVER popular fetcher

This commit is contained in:
2026-05-16 17:47:17 +09:00
parent b3982c8f72
commit 685320f3cf
2 changed files with 301 additions and 0 deletions

View File

@@ -0,0 +1,180 @@
"""외부 트렌드 수집 — NAVER 인기 + Google Trends + LLM 카테고리 분류."""
import json
import logging
import re
import time
from typing import Any, Dict, List, Optional
import requests
from anthropic import Anthropic
from pytrends.request import TrendReq
from .config import (
NAVER_CLIENT_ID, NAVER_CLIENT_SECRET, DEFAULT_CATEGORY_SEEDS,
ANTHROPIC_API_KEY, ANTHROPIC_MODEL_HAIKU,
)
from . import db
from .news_collector import _clean
from .keyword_extractor import _count_nouns, _top_candidates
logger = logging.getLogger(__name__)
# NAVER Open API news-search endpoint (JSON responses).
NEWS_URL = "https://openapi.naver.com/v1/search/news.json"
# Auth headers required by every NAVER Open API call; credentials come from .config.
_NAVER_HEADERS = {
    "X-Naver-Client-Id": NAVER_CLIENT_ID,
    "X-Naver-Client-Secret": NAVER_CLIENT_SECRET,
}
def _seeds_for(category: str) -> List[str]:
    """Return the seed keyword list for *category*.

    Prefers the JSON "category_seeds" prompt template stored in the DB;
    falls back to DEFAULT_CATEGORY_SEEDS on any miss or parse failure.
    """
    record = db.get_prompt_template("category_seeds")
    if record and record.get("template"):
        try:
            seed_map = json.loads(record["template"])
            if category in seed_map:
                return list(seed_map[category])
        except Exception:
            pass  # malformed template → fall back to defaults
    return list(DEFAULT_CATEGORY_SEEDS.get(category, []))
def fetch_naver_popular(category: str, per_seed: int = 30, top_n: int = 10) -> List[Dict[str, Any]]:
    """Query NAVER news.json (``sort=sim``) for each seed keyword of *category*
    and return the top-N frequent terms extracted from the pooled articles.

    Each result dict carries keyword / category / source / score / articles_count,
    where score is the term count normalized by the top term's count (0..1).
    Failed seed requests are logged and skipped; returns [] when the category
    has no seeds or no candidate terms survive extraction.
    """
    seed_list = _seeds_for(category)
    if not seed_list:
        return []
    pieces: List[str] = []
    for kw in seed_list:
        try:
            response = requests.get(
                NEWS_URL,
                headers=_NAVER_HEADERS,
                params={"query": kw, "display": per_seed, "sort": "sim"},
                timeout=10,
            )
            response.raise_for_status()
            for article in response.json().get("items", []):
                pieces.append(_clean(article.get("title", "")))
                pieces.append(_clean(article.get("description", "")))
        except Exception as exc:
            logger.warning("fetch_naver_popular seed=%s err=%s", kw, exc)
            continue
    counts = _count_nouns("\n".join(pieces))
    ranked = _top_candidates(counts, n=top_n)
    if not ranked:
        return []
    # Assumes _top_candidates returns (term, count) pairs sorted by count
    # descending, so ranked[0] holds the max — TODO confirm in keyword_extractor.
    top_count = ranked[0][1] or 1  # guard against a zero max count
    results: List[Dict[str, Any]] = []
    for term, cnt in ranked:
        results.append({
            "keyword": term,
            "category": category,
            "source": "naver_popular",
            "score": round(min(1.0, cnt / top_count), 4),
            "articles_count": cnt,
        })
    return results
def collect_naver_popular_for(categories: List[str]) -> int:
    """Fetch NAVER popular terms for every category and persist each trend.

    Returns the total number of trend rows written to the DB.
    """
    written = 0
    for category in categories:
        for trend in fetch_naver_popular(category):
            db.add_external_trend(trend)
            written += 1
    return written
# ── LLM classification cache ─────────────────────────────────────────────────
_CACHE_TTL_SEC = 24 * 3600  # classification results are reused for 24h
_category_cache: Dict[str, tuple] = {}  # keyword -> (category, expires_ts)
def _llm_classify_one(keyword: str) -> str:
    """Classify a single keyword into one category via one Claude Haiku call.

    Returns a category name from the allowed set, or "uncategorized" when the
    API key is missing, the API call fails, or the model's reply matches no
    category. Allowed categories come from the DB "category_seeds" template
    when present, else from DEFAULT_CATEGORY_SEEDS.
    """
    if not ANTHROPIC_API_KEY:
        return "uncategorized"
    allowed: List[str]
    seeds_template = db.get_prompt_template("category_seeds")
    if seeds_template and seeds_template.get("template"):
        try:
            allowed = sorted(json.loads(seeds_template["template"]).keys())
        except Exception:
            allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
    else:
        allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
    allowed.append("uncategorized")
    try:
        client = Anthropic(api_key=ANTHROPIC_API_KEY)
        msg = client.messages.create(
            model=ANTHROPIC_MODEL_HAIKU,
            max_tokens=20,
            messages=[{
                "role": "user",
                "content": (
                    f"다음 한국어 트렌딩 키워드를 카테고리 중 하나로 분류해라. "
                    f"카테고리: {allowed}. 키워드: '{keyword}'. "
                    f"카테고리명 한 단어만 출력. 다른 텍스트 금지."
                ),
            }],
        )
        raw = msg.content[0].text.strip().lower()
    except Exception as e:
        # One flaky API call must not abort an entire collection run:
        # fetch_google_trends classifies per keyword with no per-item guard,
        # so a propagated exception here would kill the whole batch.
        logger.warning("_llm_classify_one keyword=%s err=%s", keyword, e)
        return "uncategorized"
    # Substring match tolerates extra model chatter around the category name.
    # NOTE(review): a category name contained in another word would also match;
    # acceptable while category names are distinct words.
    for cat in allowed:
        if cat.lower() in raw:
            return cat
    return "uncategorized"
def classify_keyword(keyword: str) -> str:
    """Classify *keyword*, consulting a 24h in-process cache before the LLM."""
    now = time.time()
    entry = _category_cache.get(keyword)
    if entry is not None and now < entry[1]:
        return entry[0]  # still fresh
    category = _llm_classify_one(keyword)
    _category_cache[keyword] = (category, now + _CACHE_TTL_SEC)
    return category
# ── Google Trends ─────────────────────────────────────────────────────────────
def fetch_google_trends() -> List[Dict[str, Any]]:
    """Fetch Korea's daily trending searches via pytrends.

    Each keyword is classified with the LLM cache; score decreases linearly
    with rank (top entry ≈ 1.0). Returns [] with a warning on pytrends failure.
    """
    try:
        frame = TrendReq(hl="ko-KR", tz=540).trending_searches(pn="south_korea")
    except Exception as exc:
        logger.warning("Google Trends fetch failed: %s", exc)
        return []
    total = max(1, len(frame))  # loop-invariant divisor, hoisted
    results: List[Dict[str, Any]] = []
    for position, row in frame.iterrows():
        keyword = str(row.iloc[0]).strip()
        if not keyword:
            continue
        results.append({
            "keyword": keyword,
            "category": classify_keyword(keyword),
            "source": "google_trends",
            "score": round(max(0.0, 1.0 - (position / total)), 4),
            "articles_count": 0,
        })
    return results
def collect_google_trends() -> int:
    """Persist today's Google Trends keywords; return how many were stored."""
    fetched = fetch_google_trends()
    for trend in fetched:
        db.add_external_trend(trend)
    return len(fetched)
def collect_all(categories: List[str]) -> Dict[str, int]:
    """Run both collectors (NAVER first, then Google) and report per-source counts."""
    return {
        "naver_popular": collect_naver_popular_for(categories),
        "google_trends": collect_google_trends(),
    }

View File

@@ -0,0 +1,121 @@
import os
import gc
import tempfile
from unittest.mock import patch, MagicMock
import pytest
from app import db as db_module
from app import trend_collector
@pytest.fixture
def tmp_db(monkeypatch):
    """Point the app at a throwaway SQLite file; remove it (and WAL/SHM) after."""
    handle, db_path = tempfile.mkstemp(suffix=".db")
    os.close(handle)
    monkeypatch.setattr(db_module, "DB_PATH", db_path)
    db_module.init_db()
    yield db_path
    # presumably forces lingering connection objects to close before unlink — verify
    gc.collect()
    for suffix in ("", "-wal", "-shm"):
        try:
            os.remove(db_path + suffix)
        except OSError:
            pass
# Canned NAVER news.json payload: duplicate "기준금리" mentions should dominate
# frequency counts, and the <b> markup must be stripped by the collector.
NAVER_RESPONSE = {
    "items": [
        {"title": "<b>기준금리</b> 인상", "link": "https://n.news.naver.com/a/1", "description": "한국은행 발표"},
        {"title": "환율 급등", "link": "https://n.news.naver.com/a/2", "description": "달러 강세"},
        {"title": "기준금리 추가 인상", "link": "https://n.news.naver.com/a/3", "description": "추가 발표"},
    ],
}
def test_fetch_naver_popular_extracts_top_terms(tmp_db, monkeypatch):
    """Top terms come back tagged with category/source and a score in [0, 1]."""
    stub = MagicMock()
    stub.json.return_value = NAVER_RESPONSE
    stub.raise_for_status.return_value = None
    with patch.object(trend_collector.requests, "get", return_value=stub):
        result = trend_collector.fetch_naver_popular("economy", per_seed=10, top_n=5)
        assert "기준금리" in [item["keyword"] for item in result]
        for item in result:
            assert item["category"] == "economy"
            assert item["source"] == "naver_popular"
            assert 0.0 <= item["score"] <= 1.0
def test_collect_naver_writes_to_db(tmp_db, monkeypatch):
    """collect_naver_popular_for persists rows tagged as naver_popular."""
    stub = MagicMock()
    stub.json.return_value = NAVER_RESPONSE
    stub.raise_for_status.return_value = None
    with patch.object(trend_collector.requests, "get", return_value=stub):
        stored = trend_collector.collect_naver_popular_for(["economy"])
        assert stored > 0
        rows = db_module.list_trends(source="naver_popular")
        assert rows
        assert all(row["source"] == "naver_popular" for row in rows)
def test_classify_keyword_with_cache(monkeypatch):
    """Second lookup of the same keyword must hit the cache, not the LLM."""
    calls = {"n": 0}
    def fake_claude(keyword: str) -> str:
        calls["n"] += 1
        return "economy"
    monkeypatch.setattr(trend_collector, "_llm_classify_one", fake_claude)
    # Swap in a fresh cache dict instead of clearing the shared one: monkeypatch
    # restores the original automatically, so this test no longer leaks a 24h
    # "기준금리"→"economy" entry into the rest of the session.
    monkeypatch.setattr(trend_collector, "_category_cache", {})
    c1 = trend_collector.classify_keyword("기준금리")
    c2 = trend_collector.classify_keyword("기준금리")
    assert c1 == c2 == "economy"
    assert calls["n"] == 1
def test_fetch_google_trends_parses_and_classifies(tmp_db, monkeypatch):
    """Keywords from pytrends are classified and tagged as google_trends."""
    expected = {"기준금리": "economy", "BTS 컴백": "celebrity",
                "스트레스 관리": "psychology"}

    class StubPyTrends:
        def __init__(self, *args, **kwargs):
            pass
        def trending_searches(self, pn="south_korea"):
            import pandas as pd
            return pd.DataFrame({"0": list(expected)})

    monkeypatch.setattr(trend_collector, "TrendReq", StubPyTrends)
    monkeypatch.setattr(trend_collector, "classify_keyword",
                        lambda kw: expected.get(kw, "uncategorized"))
    trends = trend_collector.fetch_google_trends()
    by_keyword = {t["keyword"]: t for t in trends}
    for kw, category in expected.items():
        assert by_keyword[kw]["category"] == category
    assert all(t["source"] == "google_trends" for t in trends)
def test_collect_all_invokes_both_sources(tmp_db, monkeypatch):
    """collect_all aggregates the per-source counts from both collectors."""
    monkeypatch.setattr(trend_collector, "collect_naver_popular_for", lambda cats: 5)
    monkeypatch.setattr(trend_collector, "collect_google_trends", lambda: 3)
    assert trend_collector.collect_all(["economy"]) == {
        "naver_popular": 5,
        "google_trends": 3,
    }
def test_fetch_google_trends_graceful_on_pytrends_failure(monkeypatch):
    """A pytrends error must yield [] instead of propagating."""
    class ExplodingPyTrends:
        def __init__(self, *args, **kwargs):
            pass
        def trending_searches(self, pn="south_korea"):
            raise RuntimeError("rate limited")

    monkeypatch.setattr(trend_collector, "TrendReq", ExplodingPyTrends)
    assert trend_collector.fetch_google_trends() == []