Files
web-page-backend/insta-lab/app/trend_collector.py
gahusb bf5897fc85 fix(insta-lab): trend_collector — Google Trends RSS + seed placeholder filter
(1) pytrends 4.x가 Google API 변경으로 trending_searches(pn='south_korea')
가 404 반환 → daily trending searches RSS endpoint를 requests로 직접 호출
하도록 교체. pytrends 의존성 제거.

(2) category_seeds 프롬프트 템플릿에 placeholder ('...', 'TBD' 등) 또는
2자 미만 값이 들어가면 NAVER가 400 Bad Request 반환 → _seeds_for에
_is_valid_seed 가드 추가, 모두 invalid면 DEFAULT_CATEGORY_SEEDS 폴백.

테스트 8/8 PASS (기존 6 + placeholder/fallback 2 신규).
2026-05-17 09:21:38 +09:00

221 lines
7.6 KiB
Python

"""외부 트렌드 수집 — NAVER 인기 + Google Trends RSS + LLM 카테고리 분류.
NAVER: 카테고리별 시드 키워드로 인기 검색 → 빈도 상위 추출.
Google Trends: pytrends 4.x가 Google API 변경으로 깨진 상태라 daily RSS endpoint 직접 호출.
LLM 분류 결과는 24h in-memory 캐시.
"""
import json
import logging
import re
import time
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional
import requests
from anthropic import Anthropic
from .config import (
NAVER_CLIENT_ID, NAVER_CLIENT_SECRET, DEFAULT_CATEGORY_SEEDS,
ANTHROPIC_API_KEY, ANTHROPIC_MODEL_HAIKU,
)
from . import db
from .news_collector import _clean
from .keyword_extractor import _count_nouns, _top_candidates
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# NAVER Open API news-search endpoint (JSON variant).
NEWS_URL = "https://openapi.naver.com/v1/search/news.json"

# Credential headers sent with every NAVER request; both values come from .config.
_NAVER_HEADERS = {
    "X-Naver-Client-Id": NAVER_CLIENT_ID,
    "X-Naver-Client-Secret": NAVER_CLIENT_SECRET,
}

# Google Trends daily trending-searches RSS feed, requested with geo=KR.
# NOTE(review): Google appears to have moved this feed to
# https://trends.google.com/trending/rss?geo=KR — confirm this path still responds.
GOOGLE_TRENDS_RSS_URL = "https://trends.google.com/trends/trendingsearches/daily/rss?geo=KR"
# Template leftovers that must never be sent as a search query.
# Candidates are compared against this set after strip() + lower().
_PLACEHOLDER_SEEDS = {"...", "", "tbd", "todo", "placeholder", "example"}


def _is_valid_seed(s: str) -> bool:
    """Return True when *s* is usable as a NAVER search query.

    Guards against placeholder or empty values from the prompt template
    reaching NAVER, which answers them with 400 Bad Request.

    A seed is rejected when it is not a string, is shorter than two
    characters after stripping whitespace, or equals (case-insensitively)
    one of ``_PLACEHOLDER_SEEDS``.
    """
    # The template is hand-edited JSON, so numbers, nulls, or nested lists can
    # appear where a string is expected; reject them instead of raising.
    if not isinstance(s, str):
        return False
    s = s.strip()
    if len(s) < 2:
        return False
    return s.lower() not in _PLACEHOLDER_SEEDS
def _seeds_for(category: str) -> List[str]:
    """Return the seed keywords to search for *category*.

    The ``category_seeds`` prompt template, when present, is parsed as JSON
    and expected to be an object mapping category name to a list of seed
    strings. Seeds are stripped and filtered through ``_is_valid_seed``.

    Falls back to ``DEFAULT_CATEGORY_SEEDS[category]`` (or an empty list) when
    the template is missing, is not valid JSON, is not a JSON object, does not
    contain *category*, or contains no valid seed for it.
    """
    fallback = list(DEFAULT_CATEGORY_SEEDS.get(category, []))

    pt = db.get_prompt_template("category_seeds")
    if not (pt and pt.get("template")):
        return fallback

    # Only the JSON decoding is guarded, so that shape problems are reported
    # for what they are rather than as a parse failure.
    try:
        data = json.loads(pt["template"])
    except (TypeError, ValueError) as e:
        logger.warning("category_seeds JSON 파싱 실패 → DEFAULT 폴백: %s", e)
        return fallback

    if not isinstance(data, dict):
        logger.warning("category_seeds JSON이 객체(dict)가 아님 → DEFAULT 폴백")
        return fallback
    if category not in data:
        return fallback

    raw_seeds = data[category]
    if not isinstance(raw_seeds, list):
        # null, a bare string, a number, ... — treat as "no seeds configured".
        raw_seeds = []

    # A single malformed entry must not discard the valid seeds next to it, and
    # the value sent to NAVER should be the same stripped text that was validated.
    seeds = [s.strip() for s in raw_seeds if isinstance(s, str) and _is_valid_seed(s)]
    if seeds:
        return seeds

    logger.warning("category_seeds[%s]에 유효한 시드 없음 → DEFAULT 폴백", category)
    return fallback
def fetch_naver_popular(category: str, per_seed: int = 30, top_n: int = 10) -> List[Dict[str, Any]]:
    """Derive popular keywords for *category* from NAVER news search results.

    Each seed keyword of the category is sent to the NAVER ``news.json``
    endpoint with ``sort=sim``, requesting ``per_seed`` articles. The cleaned
    titles and descriptions of all returned articles are joined into one text,
    noun frequencies are counted, and the top candidates (``n=top_n``) are
    returned as trend records.

    A seed whose request or response handling fails is logged and skipped.
    Returns an empty list when the category has no seeds or no candidate
    keyword is found. Each record's ``score`` is its count divided by the
    first candidate's count, capped at 1.0 and rounded to 4 decimals.
    """
    seed_list = _seeds_for(category)
    if not seed_list:
        return []

    fragments: List[str] = []
    for seed in seed_list:
        query = {"query": seed, "display": per_seed, "sort": "sim"}
        try:
            response = requests.get(
                NEWS_URL, headers=_NAVER_HEADERS, params=query, timeout=10
            )
            response.raise_for_status()
            for article in response.json().get("items", []):
                # Title first, then description, for every article.
                for field in ("title", "description"):
                    fragments.append(_clean(article.get(field, "")))
        except Exception as e:
            # Best effort: one failing seed must not abort the whole category.
            logger.warning("fetch_naver_popular seed=%s err=%s", seed, e)

    ranked = _top_candidates(_count_nouns("\n".join(fragments)), n=top_n)
    if not ranked:
        return []

    # The first candidate's count is the normalisation base; guard against zero.
    top_count = ranked[0][1] or 1
    records: List[Dict[str, Any]] = []
    for word, freq in ranked:
        records.append({
            "keyword": word,
            "category": category,
            "source": "naver_popular",
            "score": round(min(1.0, freq / top_count), 4),
            "articles_count": freq,
        })
    return records
def collect_naver_popular_for(categories: List[str]) -> int:
    """Fetch NAVER popular keywords for every category and store them.

    Returns the number of trend records passed to ``db.add_external_trend``.
    """
    stored = 0
    for category in categories:
        for record in fetch_naver_popular(category):
            db.add_external_trend(record)
            stored += 1
    return stored
# ── LLM classification cache ─────────────────────────────────────────────────
# Lifetime of one cached classification: 24 hours, in seconds.
_CACHE_TTL_SEC = 24 * 60 * 60
# keyword -> (category, expiry timestamp)
_category_cache: Dict[str, tuple] = {}
def _llm_classify_one(keyword: str) -> str:
    """Classify a single keyword with one Claude Haiku call.

    The allowed categories are the keys of the ``category_seeds`` prompt
    template (parsed as JSON) or, when that is missing or unusable, the keys
    of ``DEFAULT_CATEGORY_SEEDS``; ``"uncategorized"`` is always allowed.

    Returns the matched category name, or ``"uncategorized"`` when no API key
    is configured, the reply is empty, or the reply names no allowed category.
    Errors raised by the Anthropic client propagate to the caller.
    """
    if not ANTHROPIC_API_KEY:
        return "uncategorized"

    seeds_template = db.get_prompt_template("category_seeds")
    if seeds_template and seeds_template.get("template"):
        try:
            allowed = sorted(json.loads(seeds_template["template"]).keys())
        except Exception:
            allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
    else:
        allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
    # Avoid listing the fallback twice when the template already defines it.
    if "uncategorized" not in allowed:
        allowed.append("uncategorized")

    client = Anthropic(api_key=ANTHROPIC_API_KEY)
    msg = client.messages.create(
        model=ANTHROPIC_MODEL_HAIKU,
        max_tokens=20,
        messages=[{
            "role": "user",
            "content": (
                f"다음 한국어 트렌딩 키워드를 카테고리 중 하나로 분류해라. "
                f"카테고리: {allowed}. 키워드: '{keyword}'. "
                f"카테고리명 한 단어만 출력. 다른 텍스트 금지."
            ),
        }],
    )

    # An empty reply, or a first block without text, counts as "no answer".
    if not msg.content:
        return "uncategorized"
    raw = (getattr(msg.content[0], "text", "") or "").strip().lower()

    # 1) Prefer an exact answer, ignoring quotes/brackets/punctuation the model
    #    may wrap around the category name.
    by_lower = {cat.lower(): cat for cat in allowed if cat}
    exact = by_lower.get(raw.strip(" \t\r\n\"'`.[]"))
    if exact is not None:
        return exact

    # 2) Otherwise fall back to substring search, longest name first, so that a
    #    short category that is a substring of a longer one (or of
    #    "uncategorized") cannot shadow it. Empty names are skipped because ""
    #    is a substring of every reply.
    for cat in sorted(allowed, key=len, reverse=True):
        if cat and cat.lower() in raw:
            return cat
    return "uncategorized"
def classify_keyword(keyword: str) -> str:
    """Return the category for *keyword*, using the in-memory cache.

    A cached result is reused until its expiry timestamp has passed;
    otherwise the keyword is classified via ``_llm_classify_one`` and the
    result is cached for ``_CACHE_TTL_SEC`` seconds.
    """
    current_ts = time.time()
    entry = _category_cache.get(keyword)
    if not entry or entry[1] <= current_ts:
        # Cache miss or expired entry: classify again and refresh the cache.
        category = _llm_classify_one(keyword)
        _category_cache[keyword] = (category, current_ts + _CACHE_TTL_SEC)
        return category
    return entry[0]
# ── Google Trends ─────────────────────────────────────────────────────────────
# pytrends 4.x frequently breaks (404) after Google API changes, so the daily
# trending-searches RSS endpoint is called directly instead. The RSS feed is
# served by the official Google Trends service, and the region is selected
# with the `geo` query parameter.
def _parse_rss_titles(xml_payload: bytes) -> List[str]:
    """Return the stripped, non-empty <title> text of every <item> in an RSS document."""
    # NOTE(security): xml.etree.ElementTree is not hardened against maliciously
    # constructed XML. The payload here comes from a fixed Google endpoint; if
    # the source ever becomes configurable, switch to a hardened parser.
    root = ET.fromstring(xml_payload)
    stripped = ((item.findtext("title") or "").strip() for item in root.iter("item"))
    return [title for title in stripped if title]


def fetch_google_trends() -> List[Dict[str, Any]]:
    """Fetch the Google Trends daily RSS feed (Korea) and classify each keyword.

    Returns one trend record per feed item, in feed order. ``score`` is
    rank-based: ``1 - idx / total`` rounded to 4 decimals, so the first item
    scores 1.0. A keyword whose classification raises is stored as
    ``"uncategorized"``.

    Degrades gracefully: any failure while fetching or parsing the feed is
    logged and an empty list is returned.
    """
    try:
        resp = requests.get(
            GOOGLE_TRENDS_RSS_URL,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0 (insta-lab trend collector)"},
        )
        resp.raise_for_status()
        # Parse the raw bytes rather than resp.text: the XML parser then honours
        # the encoding declared in the document itself, instead of the charset
        # requests guesses from HTTP headers, which can garble Korean titles.
        titles = _parse_rss_titles(resp.content)
    except Exception as e:
        logger.warning("Google Trends RSS fetch failed: %s", e)
        return []

    items: List[Dict[str, Any]] = []
    total = len(titles)  # the loop below only runs when total >= 1
    for idx, kw in enumerate(titles):
        try:
            cat = classify_keyword(kw)
        except Exception as e:
            # One failed LLM call must not drop the keyword.
            logger.warning("classify_keyword(%s) 실패: %s", kw, e)
            cat = "uncategorized"
        items.append({
            "keyword": kw,
            "category": cat,
            "source": "google_trends",
            "score": round(1.0 - (idx / total), 4),
            "articles_count": 0,
        })
    return items
def collect_google_trends() -> int:
    """Fetch Google Trends records and store each one.

    Returns the number of records passed to ``db.add_external_trend``.
    """
    stored = 0
    for record in fetch_google_trends():
        db.add_external_trend(record)
        stored += 1
    return stored
def collect_all(categories: List[str]) -> Dict[str, int]:
    """Run both collectors and report how many records each one stored.

    NAVER popular keywords are collected first (for the given categories),
    then Google Trends. The result maps source name to stored-record count.
    """
    # Dict displays evaluate left to right, so NAVER still runs before Google.
    return {
        "naver_popular": collect_naver_popular_for(categories),
        "google_trends": collect_google_trends(),
    }