Files
web-page-backend/insta-lab/app/trend_collector.py
gahusb cfbb72051f fix(insta-lab): Google Trends — RSS endpoint도 404 폐기, dailytrends JSON API로 교체
Google이 /trends/trendingsearches/daily/rss?geo=KR도 404로 폐기 (직전
fix에서 RSS로 교체했으나 NAS에서 실제 호출 시 404 확인). 대안으로 비공식
/trends/api/dailytrends?hl=ko&tz=-540&geo=KR&ns=15 JSON API로 교체.
응답 앞 `)]}'` XSSI 보호 prefix는 정규식으로 자르고 JSON 파싱.
중복 키워드 제거 + 등장 순서 보존.
2026-05-17 09:30:40 +09:00

240 lines
8.3 KiB
Python

"""외부 트렌드 수집 — NAVER 인기 + Google Trends + LLM 카테고리 분류.
NAVER: 카테고리별 시드 키워드로 인기 검색 → 빈도 상위 추출.
Google Trends: pytrends 4.x + daily RSS endpoint 모두 폐기/404로 깨진 상태라
'/trends/api/dailytrends' JSON API를 직접 호출 (응답 앞 `)]}'` XSSI 접두사 자름).
LLM 분류 결과는 24h in-memory 캐시.
"""
import json
import logging
import re
import time
from typing import Any, Dict, List, Optional
import requests
from anthropic import Anthropic
from .config import (
NAVER_CLIENT_ID, NAVER_CLIENT_SECRET, DEFAULT_CATEGORY_SEEDS,
ANTHROPIC_API_KEY, ANTHROPIC_MODEL_HAIKU,
)
from . import db
from .news_collector import _clean
from .keyword_extractor import _count_nouns, _top_candidates
logger = logging.getLogger(__name__)
NEWS_URL = "https://openapi.naver.com/v1/search/news.json"
_NAVER_HEADERS = {
"X-Naver-Client-Id": NAVER_CLIENT_ID,
"X-Naver-Client-Secret": NAVER_CLIENT_SECRET,
}
GOOGLE_TRENDS_DAILY_URL = (
"https://trends.google.com/trends/api/dailytrends"
"?hl=ko&tz=-540&geo=KR&ns=15"
)
_PLACEHOLDER_SEEDS = {"...", "", "tbd", "todo", "placeholder", "example"}
def _is_valid_seed(s: str) -> bool:
"""프롬프트 템플릿에 placeholder/빈 값이 들어가 NAVER에 400을 유발하는 일을 막는 가드."""
if not s:
return False
s = s.strip()
if len(s) < 2:
return False
if s.lower() in _PLACEHOLDER_SEEDS:
return False
return True
def _seeds_for(category: str) -> List[str]:
"""category_seeds 프롬프트 템플릿이 있으면 사용, 없거나 모두 invalid면 config DEFAULT 폴백."""
pt = db.get_prompt_template("category_seeds")
if pt and pt.get("template"):
try:
data = json.loads(pt["template"])
if category in data:
filtered = [s for s in (data[category] or []) if _is_valid_seed(s)]
if filtered:
return filtered
logger.warning("category_seeds[%s]에 유효한 시드 없음 → DEFAULT 폴백", category)
except Exception as e:
logger.warning("category_seeds JSON 파싱 실패 → DEFAULT 폴백: %s", e)
return list(DEFAULT_CATEGORY_SEEDS.get(category, []))
def fetch_naver_popular(category: str, per_seed: int = 30, top_n: int = 10) -> List[Dict[str, Any]]:
"""카테고리 시드 키워드들로 NAVER news.json `sort=sim` 호출,
응답 기사 묶음에서 빈도어 추출 후 상위 N개 반환."""
seeds = _seeds_for(category)
if not seeds:
return []
blob_parts: List[str] = []
for seed in seeds:
try:
resp = requests.get(
NEWS_URL,
headers=_NAVER_HEADERS,
params={"query": seed, "display": per_seed, "sort": "sim"},
timeout=10,
)
resp.raise_for_status()
for item in resp.json().get("items", []):
blob_parts.append(_clean(item.get("title", "")))
blob_parts.append(_clean(item.get("description", "")))
except Exception as e:
logger.warning("fetch_naver_popular seed=%s err=%s", seed, e)
continue
text = "\n".join(blob_parts)
counts = _count_nouns(text)
candidates = _top_candidates(counts, n=top_n)
if not candidates:
return []
max_count = candidates[0][1] or 1
return [
{
"keyword": k,
"category": category,
"source": "naver_popular",
"score": round(min(1.0, c / max_count), 4),
"articles_count": c,
}
for k, c in candidates
]
def collect_naver_popular_for(categories: List[str]) -> int:
total = 0
for cat in categories:
trends = fetch_naver_popular(cat)
for t in trends:
db.add_external_trend(t)
total += 1
return total
# ── LLM 분류 캐시 ────────────────────────────────────────────────────────────
_CACHE_TTL_SEC = 24 * 3600
_category_cache: Dict[str, tuple] = {} # keyword -> (category, expires_ts)
def _llm_classify_one(keyword: str) -> str:
"""Claude Haiku 1회 호출로 단일 키워드 분류."""
if not ANTHROPIC_API_KEY:
return "uncategorized"
seeds_template = db.get_prompt_template("category_seeds")
if seeds_template and seeds_template.get("template"):
try:
allowed = sorted(json.loads(seeds_template["template"]).keys())
except Exception:
allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
else:
allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
allowed.append("uncategorized")
client = Anthropic(api_key=ANTHROPIC_API_KEY)
msg = client.messages.create(
model=ANTHROPIC_MODEL_HAIKU,
max_tokens=20,
messages=[{
"role": "user",
"content": (
f"다음 한국어 트렌딩 키워드를 카테고리 중 하나로 분류해라. "
f"카테고리: {allowed}. 키워드: '{keyword}'. "
f"카테고리명 한 단어만 출력. 다른 텍스트 금지."
),
}],
)
raw = msg.content[0].text.strip().lower()
for cat in allowed:
if cat.lower() in raw:
return cat
return "uncategorized"
def classify_keyword(keyword: str) -> str:
now = time.time()
cached = _category_cache.get(keyword)
if cached and cached[1] > now:
return cached[0]
cat = _llm_classify_one(keyword)
_category_cache[keyword] = (cat, now + _CACHE_TTL_SEC)
return cat
# ── Google Trends ─────────────────────────────────────────────────────────────
# pytrends 4.x + daily RSS endpoint(`/trends/trendingsearches/daily/rss`) 모두
# 폐기/404 상태라 Google Trends 비공식 JSON API `/trends/api/dailytrends`를 직접
# 호출. 응답 앞에 `)]}'` XSSI 보호 prefix가 붙어있어 잘라낸 후 JSON 파싱.
# 응답 구조: default.trendingSearchesDays[].trendingSearches[].title.query
_XSSI_PREFIX_RE = re.compile(r"^[\s\)\]\}',\n]+")
def fetch_google_trends() -> List[Dict[str, Any]]:
"""Google Trends Daily JSON API (한국) 직접 호출. 실패 시 빈 리스트."""
try:
resp = requests.get(
GOOGLE_TRENDS_DAILY_URL,
timeout=15,
headers={"User-Agent": "Mozilla/5.0 (insta-lab trend collector)"},
)
resp.raise_for_status()
body = _XSSI_PREFIX_RE.sub("", resp.text, count=1)
data = json.loads(body)
days = data.get("default", {}).get("trendingSearchesDays", []) or []
raw_titles: List[str] = []
for day in days:
for ts in day.get("trendingSearches", []) or []:
q = (ts.get("title") or {}).get("query", "")
if isinstance(q, str):
q = q.strip()
if q:
raw_titles.append(q)
# 중복 제거 (등장 순서 유지)
seen = set()
titles: List[str] = []
for t in raw_titles:
if t not in seen:
seen.add(t)
titles.append(t)
except Exception as e:
logger.warning("Google Trends daily fetch failed: %s", e)
return []
items: List[Dict[str, Any]] = []
total = max(1, len(titles))
for idx, kw in enumerate(titles):
try:
cat = classify_keyword(kw)
except Exception as e:
logger.warning("classify_keyword(%s) 실패: %s", kw, e)
cat = "uncategorized"
rank_score = round(max(0.0, 1.0 - (idx / total)), 4)
items.append({
"keyword": kw,
"category": cat,
"source": "google_trends",
"score": rank_score,
"articles_count": 0,
})
return items
def collect_google_trends() -> int:
items = fetch_google_trends()
for it in items:
db.add_external_trend(it)
return len(items)
def collect_all(categories: List[str]) -> Dict[str, int]:
naver_n = collect_naver_popular_for(categories)
google_n = collect_google_trends()
return {"naver_popular": naver_n, "google_trends": google_n}