14 Commits

Author SHA1 Message Date
42bd53ee7b feat(insta): _bg_extract uses preferences + 09:00 trends_collect cron 2026-05-16 17:58:52 +09:00
86694ae4fe feat(agent-office): InstaAgent collect_trends action + preferences-aware on_schedule 2026-05-16 17:57:44 +09:00
41225b3337 feat(insta-lab): main.py — trends + preferences endpoints
- POST /api/insta/trends/collect — background trend collection via trend_collector.collect_all
- GET /api/insta/trends — list external trends with source/category/days filters
- GET /api/insta/preferences — return category weights (defaults seeded on init_db)
- PUT /api/insta/preferences — upsert category weights
- Modified GET /api/insta/keywords to accept source= filter (source present → list_trends, else existing list_trending_keywords, backward compatible)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 17:54:09 +09:00
6bb5c2fb40 feat(insta-lab): keyword_extractor.extract_with_weights for category proportions 2026-05-16 17:51:16 +09:00
bd1773e29e feat(insta-lab): trend_collector adds Google Trends + LLM category classification 2026-05-16 17:48:26 +09:00
685320f3cf feat(insta-lab): trend_collector with NAVER popular fetcher 2026-05-16 17:47:17 +09:00
b3982c8f72 feat(insta-lab): db migration — trending_keywords.source + account_preferences + CRUD
- Idempotent ALTER TABLE adds source column (default 'manual') + idx_tk_source index
- New account_preferences table seeded with economy/psychology/celebrity at weight=1.0
- add_trending_keyword now accepts optional source param
- New helpers: add_external_trend, list_trends, get_preferences, upsert_preferences
- test_db updated: six→seven tables; test_preferences_crud.py (7 new tests, all pass)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-16 17:44:01 +09:00
002c0893f8 chore(insta-lab): add pytrends>=4.9 dependency 2026-05-16 17:41:30 +09:00
d6081ba2d3 docs(insta-trends): implementation plan (10 TDD-grouped tasks)
trend_collector NAVER+Google+LLM 분류, db migration + preferences CRUD,
extract_with_weights, 4 endpoints + keywords source 필터, InstaAgent
collect_trends action + preferences-aware schedule, web-ui 탭 + 3 패널,
스모크 매트릭스.
2026-05-16 17:39:19 +09:00
10cb3ae1df docs(insta-trends): 셀프 리뷰 보강 — LLM 분류 캐시 위치, days 쿼리 의미 명시 2026-05-16 17:31:22 +09:00
e3348da642 docs(insta-trends): 외부 트렌드 + 카테고리 가중치 설계
NAVER 인기 + Google Trends 두 source 수집, account_preferences로 카테고리
가중치 모델, 가중치 기반 키워드 추출 알고리즘, Insta 페이지 Cards/Trends
탭 분리.
2026-05-16 17:30:45 +09:00
088bbaa097 fix(deploy): use docker inspect for healthcheck (호스트/컨테이너 둘 다 동작)
기존 curl http://lotto:8000/health은 deployer 컨테이너 내부에서만
Docker DNS가 'lotto'를 해석. 호스트 셸에서 sudo bash로 직접 실행 시
DNS 해석 실패해 모든 서비스가 HEALTH_FAIL로 오판정. docker inspect로
이미 정의된 compose healthcheck 결과를 직접 조회하도록 변경. starting
상태는 최대 60초 대기 후 최종 판정.
2026-05-16 02:11:38 +09:00
be322557ee fix(insta-lab): pin to bookworm + manual Chromium deps (drop --with-deps)
python:3.12-slim이 trixie(Debian 13)로 옮겨가면서 Playwright 1.48의
--with-deps가 ttf-ubuntu-font-family / ttf-unifont 등 ubuntu20.04
fallback 패키지를 시도하다 apt 실패 → Docker build exit 100.

해결: python:3.12-slim-bookworm 명시(Debian 12, Playwright 공식 지원)
+ Chromium 런타임 라이브러리 직접 apt 설치 + --with-deps 제거.
2026-05-16 01:58:53 +09:00
70438caa1f fix(scripts): blog-lab → insta-lab in deploy/healthcheck service lists
배포 스크립트 hardcoded 서비스 리스트가 blog-lab을 참조해 머지 후
첫 webhook 배포가 rsync(/repo/blog-lab 없음) + docker compose
(서비스 미정의) 양쪽에서 실패. SERVICES/BUILD_TARGETS/HEALTH_ENDPOINTS/
DATA_DIRS를 insta-lab 기준으로 갱신. CONTAINER_NAMES는 blog-lab 고아
정리용으로 유지(다음번 docker rm -f가 안전 실행).
2026-05-16 01:51:45 +09:00
20 changed files with 2884 additions and 22 deletions

View File

@@ -56,6 +56,8 @@ class InstaAgent(BaseAgent):
requires_approval=False)
await self.transition("working", "뉴스 수집·키워드 추출", task_id)
try:
prefs = await service_proxy.insta_get_preferences()
add_log(self.agent_id, f"insta preferences: {prefs}", "info", task_id)
await self._run_collect_and_extract()
kws = await service_proxy.insta_list_keywords(used=False)
if auto_select:
@@ -147,6 +149,12 @@ class InstaAgent(BaseAgent):
return {"ok": False, "message": "keyword_id 필수"}
await self._render_and_push(kid)
return {"ok": True}
if command == "collect_trends":
await messaging.send_raw("🌐 외부 트렌드 수집 시작")
created = await service_proxy.insta_collect_trends()
st = await self._wait_task(created["task_id"], step="trends_collect", timeout_sec=300)
await messaging.send_raw(f"✅ 트렌드 수집 완료: {st.get('message', '')}")
return {"ok": True, "result": st}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_callback(self, action: str, params: dict) -> dict:

View File

@@ -29,6 +29,12 @@ async def _run_insta_schedule():
if agent:
await agent.on_schedule()
async def _run_insta_trends_collect():
agent = AGENT_REGISTRY.get("insta")
if agent:
await agent.on_command("collect_trends", {})
async def _run_lotto_schedule():
agent = AGENT_REGISTRY.get("lotto")
if agent:
@@ -68,6 +74,7 @@ def init_scheduler():
id="stock_ai_news_sentiment",
)
scheduler.add_job(_run_insta_schedule, "cron", hour=9, minute=30, id="insta_pipeline")
scheduler.add_job(_run_insta_trends_collect, "cron", hour=9, minute=0, id="insta_trends_collect")
scheduler.add_job(_run_lotto_schedule, "cron", day_of_week="mon", hour=9, minute=0, id="lotto_curate")
scheduler.add_job(_run_youtube_research, "cron", hour=9, minute=0, id="youtube_research")
scheduler.add_job(_send_youtube_weekly_report, "cron", day_of_week="mon", hour=8, minute=0, id="youtube_weekly_report")

View File

@@ -167,6 +167,41 @@ async def insta_get_asset_bytes(slate_id: int, page: int) -> bytes:
return resp.content
async def insta_collect_trends(categories: Optional[list] = None) -> Dict[str, Any]:
payload = {"categories": categories} if categories else {}
resp = await _client.post(f"{INSTA_LAB_URL}/api/insta/trends/collect", json=payload)
resp.raise_for_status()
return resp.json()
async def insta_list_trends(source: Optional[str] = None,
category: Optional[str] = None,
days: int = 1) -> List[Dict[str, Any]]:
params: Dict[str, Any] = {"days": days}
if source:
params["source"] = source
if category:
params["category"] = category
resp = await _client.get(f"{INSTA_LAB_URL}/api/insta/trends", params=params)
resp.raise_for_status()
return resp.json().get("items", [])
async def insta_get_preferences() -> Dict[str, float]:
resp = await _client.get(f"{INSTA_LAB_URL}/api/insta/preferences")
resp.raise_for_status()
return {p["category"]: p["weight"] for p in resp.json().get("categories", [])}
async def insta_put_preferences(weights: Dict[str, float]) -> Dict[str, Any]:
resp = await _client.put(
f"{INSTA_LAB_URL}/api/insta/preferences",
json={"categories": weights},
)
resp.raise_for_status()
return resp.json()
# --- realestate-lab ---
async def realestate_collect() -> Dict[str, Any]:

View File

@@ -0,0 +1,73 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import AsyncMock
import pytest
from app.agents.insta import InstaAgent
@pytest.fixture(autouse=True)
def _init_db():
import gc
gc.collect()
if os.path.exists(_TMP):
os.remove(_TMP)
from app.db import init_db
init_db()
yield
gc.collect()
@pytest.mark.asyncio
async def test_on_command_collect_trends_dispatches(monkeypatch):
agent = InstaAgent()
fake_collect = AsyncMock(return_value={"task_id": "tcollect"})
fake_status = AsyncMock(return_value={"status": "succeeded", "result_id": 8,
"message": "naver:5, google:3"})
monkeypatch.setattr("app.agents.insta.service_proxy.insta_collect_trends", fake_collect)
monkeypatch.setattr("app.agents.insta.service_proxy.insta_task_status", fake_status)
monkeypatch.setattr("app.agents.insta.messaging.send_raw", AsyncMock(return_value={"ok": True}))
result = await agent.on_command("collect_trends", {})
assert result["ok"] is True
fake_collect.assert_awaited()
@pytest.mark.asyncio
async def test_on_schedule_loads_preferences(monkeypatch):
"""on_schedule이 preferences를 가져오는지 확인."""
agent = InstaAgent()
fake_collect = AsyncMock(return_value={"task_id": "t1"})
fake_extract = AsyncMock(return_value={"task_id": "t2"})
fake_status = AsyncMock(side_effect=[
{"status": "succeeded", "result_id": 0},
{"status": "succeeded", "result_id": 0},
])
fake_keywords = AsyncMock(return_value=[
{"id": 1, "keyword": "K", "category": "economy", "score": 0.9},
])
fake_prefs = AsyncMock(return_value={"economy": 0.6, "psychology": 0.4})
monkeypatch.setattr("app.agents.insta.service_proxy.insta_collect", fake_collect)
monkeypatch.setattr("app.agents.insta.service_proxy.insta_extract", fake_extract)
monkeypatch.setattr("app.agents.insta.service_proxy.insta_task_status", fake_status)
monkeypatch.setattr("app.agents.insta.service_proxy.insta_list_keywords", fake_keywords)
monkeypatch.setattr("app.agents.insta.service_proxy.insta_get_preferences", fake_prefs)
monkeypatch.setattr("app.agents.insta.messaging.send_raw", AsyncMock(return_value={"ok": True}))
agent.state = "idle"
await agent.on_schedule()
fake_prefs.assert_awaited()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,247 @@
# insta-lab Trends 탭 설계 — 외부 트렌드 수집 + 카테고리 가중치
작성일: 2026-05-16
상태: 사용자 승인 대기 → writing-plans 진입 예정
연관 문서: `2026-05-15-insta-agent-design.md` (insta-lab 기본 설계)
---
## 1. 목적·배경
insta-lab 운영 첫 사이클(2026-05-16 머지·배포 완료)에서 다음 두 가지 한계가 드러남:
1. **키워드 발견 소스가 사용자 시드 키워드에만 의존** — 진짜 "지금 뜨고 있는" 화제를 잡지 못함. 카테고리당 5개 시드를 고정해두고 거기에 매칭되는 기사만 모음.
2. **계정 정체성을 시스템이 모름** — 사용자가 "내 인스타 계정은 경제 위주"라고 정해도 시스템은 모든 카테고리를 균등하게 처리.
이 spec은 두 한계를 해소하기 위해:
- 외부 트렌드 소스(NAVER 인기 + Google Trends)를 추가해 "발견" 단계를 보강
- 계정 카테고리 가중치 모델을 도입해 자동 추출 알고리즘이 계정 정체성을 반영
---
## 2. 스코프
### 포함
- 신규 백엔드 모듈 `trend_collector.py` (NAVER 인기 + Google Trends 두 source)
- 신규 백엔드 모듈 변경: `keyword_extractor.py`에 가중치 기반 `extract_with_weights()` 추가
- DB 마이그레이션: `trending_keywords` 테이블에 `source` 컬럼 추가, `account_preferences` 신규 테이블
- 신규 API 4개 (`POST /trends/collect`, `GET /trends`, `GET/PUT /preferences`)
- 09:00 매일 cron 추가 (트렌드 수집), 09:30 cron 가중치 적용
- 프론트엔드: InstaCards 페이지에 탭 네비게이션 추가, Trends 탭 신규 3개 패널
### 제외
- pytrends 외 외부 SaaS 트렌드 API (BuzzSumo 등)
- 트렌드 시계열 차트
- 카테고리 자동 학습 (사용자 카드 생성 이력에서 선호도 추론)
- 트렌드 알림 (특정 키워드 등장 시 push)
---
## 3. 데이터 소스
### 3-1. NAVER 인기 (source = 'naver_popular')
- NAVER news.json API 재사용. 카테고리당 시드 키워드로 `sort=sim` (정확도 정렬 = 인기 시그널) 30건 수집
- 응답 기사 묶음에서 빈도어 추출 → 카테고리 매핑 (기존 keyword_extractor의 `_count_nouns` + `_top_candidates` 재사용)
- 상위 N개를 `trending_keywords` 테이블에 source='naver_popular'로 저장
### 3-2. Google Trends (source = 'google_trends')
- 라이브러리: `pytrends` (PyPI, MIT)
- `TrendReq(hl='ko-KR', tz=540).trending_searches(pn='south_korea')` 호출 → 일일 트렌딩 키워드 리스트
- 각 키워드에 대해 Claude Haiku 1회 호출로 카테고리 분류 (`economy` / `psychology` / `celebrity` / 사용자 추가 카테고리 / `uncategorized`)
- LLM 분류 비용 절감을 위해 분류 결과를 1일 캐시 — `trend_collector` 모듈 레벨 `_category_cache: dict[str, tuple[str, float]]` (keyword → (category, expires_ts)), 컨테이너 lifetime 동안 유효. 같은 키워드 재요청 시 cache hit. 캐시는 영속화하지 않음 (재시작 시 첫 호출은 LLM 재분류)
- `trending_keywords` 테이블에 source='google_trends', score=traffic 정규화값
### 3-3. 통합 저장
기존 `trending_keywords` 스키마에 한 컬럼 추가:
```sql
ALTER TABLE trending_keywords ADD COLUMN source TEXT NOT NULL DEFAULT 'manual';
-- 기존 row 모두 'manual'로 마킹됨 (시드 키워드에서 추출된 것)
-- 신규 source: 'naver_popular' | 'google_trends'
```
`source`별 추가 인덱스:
```sql
CREATE INDEX idx_tk_source ON trending_keywords(source, suggested_at DESC);
```
---
## 4. 카테고리 가중치 모델
### 4-1. 신규 테이블 `account_preferences`
```sql
CREATE TABLE account_preferences (
category TEXT PRIMARY KEY,
weight REAL NOT NULL DEFAULT 1.0,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
```
- 초기 시드: `economy=1.0`, `psychology=1.0`, `celebrity=1.0` (균등)
- 사용자는 0~10 자유 범위 (UI는 0~100 정수%로 노출, 백엔드에서 0~1 정규화)
- 합계 강제 없음. 알고리즘 내부에서 비율 정규화
- 카테고리 추가 자유. 단 추가 시 `prompt_templates.category_seeds`에도 시드 키워드 함께 정의해야 자동 추출에 반영됨 (UI에서 안내)
### 4-2. 가중치 기반 추출 알고리즘
기존 `keyword_extractor.extract_for_category(category, limit)` 유지. 신규:
```python
def extract_with_weights(weights: dict[str, float], total_limit: int) -> list[Keyword]:
"""카테고리 가중치 비율대로 키워드를 분배 추출."""
if not weights or sum(weights.values()) == 0:
# fallback: 균등 가중치
cats = list(DEFAULT_CATEGORY_SEEDS.keys())
weights = {c: 1.0 for c in cats}
total_weight = sum(weights.values())
saved = []
for category, w in weights.items():
if w <= 0:
continue
per_cat = round(total_limit * w / total_weight)
if per_cat <= 0:
continue
saved.extend(extract_for_category(category, limit=per_cat))
return saved
```
- `total_limit` 기본 15 (3 카테고리 × 5 시드 시절 합계와 동일)
- weight=0 카테고리는 skip (분류는 유지하되 자동 추출에서 제외하고 싶을 때)
---
## 5. API (insta-lab)
| 메서드 | 경로 | 설명 |
|--------|------|------|
| POST | `/api/insta/trends/collect` | 두 source 모두 수집 (BackgroundTask) → `{task_id}` |
| GET | `/api/insta/trends` | 트렌드 조회. query: `source` (`naver_popular`/`google_trends`/`all`), `category`, `days` (default 1, 의미: `suggested_at >= now() - days*24h`). 정렬 `suggested_at DESC, score DESC` |
| GET | `/api/insta/preferences` | 가중치 조회 → `{categories: [{category, weight, updated_at}]}` |
| PUT | `/api/insta/preferences` | body `{categories: {economy: 0.6, ...}}` → upsert |
기존 `/api/insta/keywords`는 source 필터 추가 (`?source=manual` 등). 미지정 시 모든 source 반환 (default behavior 유지).
---
## 6. 스케줄러 변경 (agent-office InstaAgent)
기존:
- 09:30 — 키워드 추출 → 텔레그램 푸시
신규:
- **09:00 — 외부 트렌드 수집** (NAVER 인기 + Google Trends) — `_run_insta_trends_collect()` 신규 cron
- **09:30 — 키워드 추출** (기존 + 가중치 적용) — InstaAgent가 `get_preferences()` 호출 후 `extract_with_weights()` 사용
수동 트리거: InstaAgent에 `on_command("collect_trends", {})` 신규 액션. 텔레그램에서 `/insta collect_trends` 슬래시 명령 또는 Insta 페이지 버튼에서 호출.
---
## 7. 프론트엔드 변경 (web-ui InstaCards.jsx)
### 7-1. 탭 네비게이션
기존 5개 패널을 두 탭으로 재구성:
| 탭 | 패널 |
|----|------|
| **Cards** (기본) | Trigger, Trending Keywords, Slates, SlateDetail, PromptEditor (기존 그대로) |
| **Trends** (신규) | AccountFocusPanel, ExternalTrendsPanel, PreferenceImpactPanel |
탭 컴포넌트: `<TabBar>` 단순 buttons (`activeTab` state), URL에 `?tab=trends` 쿼리로 deep-link 지원.
### 7-2. AccountFocusPanel
- 카테고리별 가중치 슬라이더 (0~100 정수%) + 우측 막대 차트 (분포 시각화)
- **+ 카테고리 추가** 버튼 → 모달로 카테고리명 + 시드 키워드 N개 입력 (시드는 category_seeds 프롬프트 템플릿에 머지)
- **저장** 버튼 → `PUT /preferences` (debounce 1초)
### 7-3. ExternalTrendsPanel
- 상단: **🔄 수동 수집** 버튼 + "마지막 수집: HH:MM" 라벨 + 진행 task box
- 두 컬럼 (반응형 → 모바일은 세로):
- **🔥 NAVER 인기** — 카테고리별 그룹핑, 각 카드: keyword + score + 카테고리 배지
- **🌐 Google Trends** — 단순 리스트, 각 카드: keyword + 카테고리 배지 + traffic
- 각 카드 우측에 **🎴** 버튼 → 즉시 `POST /slates` (기존 흐름)
- 색상 매핑: economy=#0F62FE, psychology=#A66CFF, celebrity=#FF5C8A, custom=#6B7280
### 7-4. PreferenceImpactPanel (작은 박스)
- "현재 가중치 기준 다음 자동 추출 결과 미리보기: economy 3 / psychology 2 / celebrity 0"
- 가중치 슬라이더 변경 시 즉시 클라이언트에서 계산해 갱신
- 컴팩트 1줄 표시
### 7-5. 신규 API 헬퍼 (src/api.js)
```js
export function getInstaTrends({ source, category, days = 1 } = {}) { ... }
export function instaCollectTrends() { ... }
export function getInstaPreferences() { ... }
export function putInstaPreferences(categories) { ... }
```
---
## 8. 에러 처리
| 상황 | 처리 |
|------|------|
| pytrends rate limit / 차단 | try/except → 빈 결과로 graceful degrade. NAVER 인기는 정상 수집 |
| LLM 분류 실패 | `uncategorized` 카테고리로 폴백, 사용자가 UI에서 수동 재분류 가능 |
| 가중치 합계 0 | 균등 가중치 (1/N)로 폴백, 로그 warning |
| 카테고리 추가했는데 시드 없음 | 자동 추출에서 자연스럽게 skip (NAVER 검색에 시드 필요), UI에서 "시드 키워드 추가 필요" 경고 |
| Google Trends 한국 region 부재 | hl='ko-KR' + pn='south_korea' 명시. 실패 시 빈 결과 |
---
## 9. 테스트
### insta-lab pytest
- `test_trend_collector.py` (4): `fetch_naver_popular` mocked, `fetch_google_trends` pytrends mocked, 카테고리 매핑, 캐시 hit
- `test_extract_with_weights.py` (3): 균등 가중치, 한쪽 0 가중치, fallback 빈 가중치
- `test_preferences_crud.py` (2): GET 기본값, PUT upsert
- `test_main_trends.py` (3): 신규 4개 엔드포인트 통합
### agent-office pytest
- `test_insta_agent_trends.py` (2): `on_schedule_trends` mocked, weight-applied extract
---
## 10. 마이그레이션 절차
1. `db.init_db()``ALTER TABLE trending_keywords ADD COLUMN source ...` 추가 — `PRAGMA table_info`로 컬럼 존재 여부 확인 후 idempotent하게 실행
2. `account_preferences` 테이블 신규 생성
3. 초기 시드: 기존 카테고리 economy/psychology/celebrity 모두 weight=1.0
4. 기존 `trending_keywords` row는 자동으로 source='manual' (컬럼 DEFAULT)
5. `requirements.txt``pytrends>=4.9` 추가
6. 배포 후 사용자가 Trends 탭에서 가중치 조정 (필수 아님, 균등이 디폴트 동작)
---
## 11. 운영 영향
| 항목 | 영향 |
|------|------|
| Anthropic 토큰 비용 | +미미 (Google Trends 1회당 ~20 키워드 × Haiku 분류 1콜 ≈ 600 토큰/일) |
| DB 크기 | +미미 (트렌드 row 일일 ~50개, 카테고리당 30 + Google 20) |
| NAS CPU | +낮음 (pytrends + NAVER API 호출만, LLM은 외부) |
| 카드 생성 흐름 | 변경 없음. 트렌드는 "발견" 단계만 보강 |
---
## 12. 완료 정의
- [ ] `trending_keywords.source` 컬럼 마이그레이션 적용, 기존 row 모두 'manual'로 표시됨
- [ ] `account_preferences` 테이블 생성, 초기 3개 카테고리 weight=1.0
- [ ] `POST /api/insta/trends/collect` 호출 시 NAVER 인기 + Google Trends 모두 수집되어 DB 저장
- [ ] `GET /api/insta/trends?source=google_trends` 결과 카테고리 분류됨
- [ ] `PUT /api/insta/preferences` 후 09:30 cron이 가중치 비율대로 추출
- [ ] 09:00 cron 등록, 매일 자동 트렌드 수집
- [ ] Insta 페이지에 Cards/Trends 탭 전환 작동
- [ ] Trends 탭의 AccountFocusPanel에서 가중치 변경·저장 가능
- [ ] ExternalTrendsPanel에서 NAVER 인기 + Google Trends 한 눈에 표시, 각 카드 생성 트리거 작동
- [ ] PreferenceImpactPanel 미리보기 갱신
- [ ] insta-lab pytest 전체 통과 (기존 21 + 신규 12 = 33)
- [ ] agent-office pytest 전체 통과

View File

@@ -1,15 +1,23 @@
FROM python:3.12-slim
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Korean fonts + Chromium runtime deps (Debian 12 / bookworm)
# `playwright install --with-deps`를 쓰지 않는 이유: 그 명령은 Ubuntu 패키지명을
# 사용해 Debian에서 ttf-ubuntu-font-family / ttf-unifont 등 없는 패키지를 시도
# → apt 실패. 대신 Chromium이 실제 필요로 하는 라이브러리만 명시 설치.
RUN apt-get update && apt-get install -y --no-install-recommends \
fonts-noto-cjk fonts-noto-cjk-extra \
libnss3 libnspr4 libdbus-1-3 libatk1.0-0 libatk-bridge2.0-0 \
libcups2 libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 \
libxfixes3 libxrandr2 libgbm1 libxshmfence1 libpango-1.0-0 \
libcairo2 libasound2 libatspi2.0-0 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN playwright install --with-deps chromium
RUN playwright install chromium
COPY . .

View File

@@ -101,6 +101,29 @@ def init_db() -> None:
)
""")
# source column for trending_keywords (idempotent ALTER)
cols = [r[1] for r in conn.execute("PRAGMA table_info(trending_keywords)").fetchall()]
if "source" not in cols:
conn.execute("ALTER TABLE trending_keywords ADD COLUMN source TEXT NOT NULL DEFAULT 'manual'")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tk_source ON trending_keywords(source, suggested_at DESC)")
# account_preferences — 카테고리 가중치
conn.execute("""
CREATE TABLE IF NOT EXISTS account_preferences (
category TEXT PRIMARY KEY,
weight REAL NOT NULL DEFAULT 1.0,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# seed defaults if table empty
existing = conn.execute("SELECT COUNT(*) FROM account_preferences").fetchone()[0]
if existing == 0:
for cat in ("economy", "psychology", "celebrity"):
conn.execute(
"INSERT INTO account_preferences(category, weight) VALUES(?,?)",
(cat, 1.0),
)
# ── news_articles ────────────────────────────────────────────────
def add_news_article(row: Dict[str, Any]) -> int:
@@ -132,8 +155,12 @@ def list_news_articles(category: Optional[str] = None, days: int = 1) -> List[Di
def add_trending_keyword(row: Dict[str, Any]) -> int:
with _conn() as conn:
cur = conn.execute(
"INSERT INTO trending_keywords(keyword, category, score, articles_count) VALUES(?,?,?,?)",
(row["keyword"], row["category"], float(row.get("score", 0.0)), int(row.get("articles_count", 0))),
"INSERT INTO trending_keywords(keyword, category, score, articles_count, source) VALUES(?,?,?,?,?)",
(
row["keyword"], row["category"],
float(row.get("score", 0.0)), int(row.get("articles_count", 0)),
row.get("source", "manual"),
),
)
return cur.lastrowid
@@ -276,3 +303,50 @@ def get_prompt_template(name: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM prompt_templates WHERE name=?", (name,)).fetchone()
return dict(row) if row else None
# ── external trends ─────────────────────────────────────────────
def add_external_trend(row: Dict[str, Any]) -> int:
"""`source` 필수 — naver_popular | google_trends. trending_keywords에 인서트."""
if "source" not in row:
raise ValueError("add_external_trend requires 'source' field")
return add_trending_keyword(row)
def list_trends(source: Optional[str] = None, category: Optional[str] = None,
days: int = 1) -> List[Dict[str, Any]]:
sql = "SELECT * FROM trending_keywords WHERE suggested_at >= datetime('now', ?)"
params: List[Any] = [f"-{int(days)} days"]
if source and source != "all":
sql += " AND source=?"
params.append(source)
if category:
sql += " AND category=?"
params.append(category)
sql += " ORDER BY suggested_at DESC, score DESC"
with _conn() as conn:
rows = conn.execute(sql, params).fetchall()
return [dict(r) for r in rows]
# ── account_preferences ─────────────────────────────────────────
def get_preferences() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT category, weight, updated_at FROM account_preferences ORDER BY category ASC"
).fetchall()
return [dict(r) for r in rows]
def upsert_preferences(weights: Dict[str, float]) -> None:
"""전체 upsert. 기존에 있던 카테고리는 weight 갱신, 신규는 INSERT.
명시되지 않은 기존 카테고리는 그대로 둔다 (삭제 X). 삭제 필요 시 별도 API로."""
with _conn() as conn:
for cat, w in weights.items():
conn.execute("""
INSERT INTO account_preferences(category, weight)
VALUES(?,?)
ON CONFLICT(category) DO UPDATE SET
weight=excluded.weight,
updated_at=strftime('%Y-%m-%dT%H:%M:%fZ','now')
""", (cat, float(w)))

View File

@@ -81,3 +81,22 @@ def extract_for_category(category: str, limit: int = KEYWORDS_PER_CATEGORY) -> L
})
saved.append({"id": kid, **kw, "category": category})
return saved
def extract_with_weights(weights: Dict[str, float], total_limit: int) -> List[Dict[str, Any]]:
"""카테고리 가중치 비율대로 키워드를 분배 추출."""
from .config import DEFAULT_CATEGORY_SEEDS
if not weights or sum(weights.values()) == 0:
cats = list(DEFAULT_CATEGORY_SEEDS.keys())
weights = {c: 1.0 for c in cats}
total_weight = sum(weights.values())
out: List[Dict[str, Any]] = []
for category, w in weights.items():
if w <= 0:
continue
per_cat = round(total_limit * w / total_weight)
if per_cat <= 0:
continue
out.extend(extract_for_category(category, limit=per_cat))
return out

View File

@@ -15,7 +15,7 @@ from .config import (
CORS_ALLOW_ORIGINS, NAVER_CLIENT_ID, ANTHROPIC_API_KEY,
INSTA_DATA_PATH, DB_PATH, DEFAULT_CATEGORY_SEEDS, KEYWORDS_PER_CATEGORY,
)
from . import db, news_collector, keyword_extractor, card_writer, card_renderer
from . import db, news_collector, keyword_extractor, card_writer, card_renderer, trend_collector
logger = logging.getLogger(__name__)
app = FastAPI()
@@ -99,11 +99,16 @@ class ExtractRequest(BaseModel):
categories: Optional[list[str]] = None
async def _bg_extract(task_id: str, categories: list[str]):
async def _bg_extract(task_id: str, categories: Optional[list[str]] = None):
try:
db.update_task(task_id, "processing", 10, "추출 중")
for cat in categories:
keyword_extractor.extract_for_category(cat, limit=KEYWORDS_PER_CATEGORY)
prefs_rows = db.get_preferences()
weights = {p["category"]: p["weight"] for p in prefs_rows}
if categories:
# 사용자가 카테고리 명시한 경우만 그 서브셋으로 균등 가중치 (override)
weights = {c: 1.0 for c in categories}
total = KEYWORDS_PER_CATEGORY * max(1, len([w for w in weights.values() if w > 0]))
keyword_extractor.extract_with_weights(weights, total_limit=total)
db.update_task(task_id, "succeeded", 100, "완료", result_id=0)
except Exception as e:
logger.exception("extract failed")
@@ -119,7 +124,13 @@ def extract_keywords(req: ExtractRequest, bg: BackgroundTasks):
@app.get("/api/insta/keywords")
def list_keywords(category: Optional[str] = None, used: Optional[bool] = None):
def list_keywords(
category: Optional[str] = None,
used: Optional[bool] = None,
source: Optional[str] = None,
):
if source:
return {"items": db.list_trends(source=source, category=category, days=30)}
return {"items": db.list_trending_keywords(category=category, used=used)}
@@ -243,3 +254,52 @@ def get_prompt(name: str):
def upsert_prompt(name: str, body: TemplateBody):
db.upsert_prompt_template(name, body.template, body.description)
return db.get_prompt_template(name)
# ── Trends ───────────────────────────────────────────────────────
class TrendsCollectRequest(BaseModel):
categories: Optional[list[str]] = None
async def _bg_collect_trends(task_id: str, categories: list[str]):
try:
db.update_task(task_id, "processing", 10, "외부 트렌드 수집 중")
result = trend_collector.collect_all(categories)
msg = f"naver:{result['naver_popular']}, google:{result['google_trends']}"
db.update_task(task_id, "succeeded", 100, msg, result_id=sum(result.values()))
except Exception as e:
logger.exception("trends collect failed")
db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/trends/collect")
def collect_trends(req: TrendsCollectRequest, bg: BackgroundTasks):
cats = req.categories or list(DEFAULT_CATEGORY_SEEDS.keys())
tid = db.create_task("trends_collect", {"categories": cats})
bg.add_task(_bg_collect_trends, tid, cats)
return {"task_id": tid, "categories": cats}
@app.get("/api/insta/trends")
def list_trends_endpoint(
source: Optional[str] = None,
category: Optional[str] = None,
days: int = Query(1, ge=1, le=90),
):
return {"items": db.list_trends(source=source, category=category, days=days)}
# ── Preferences ──────────────────────────────────────────────────
class PreferencesBody(BaseModel):
categories: dict[str, float]
@app.get("/api/insta/preferences")
def get_preferences_endpoint():
return {"categories": db.get_preferences()}
@app.put("/api/insta/preferences")
def put_preferences_endpoint(body: PreferencesBody):
db.upsert_preferences(body.categories)
return {"categories": db.get_preferences()}

View File

@@ -0,0 +1,183 @@
"""외부 트렌드 수집 — NAVER 인기 + Google Trends + LLM 카테고리 분류.
Phase B Task 3: Google Trends integration via pytrends + Anthropic Haiku 분류 캐시 (24h TTL).
"""
import json
import logging
import re
import time
from typing import Any, Dict, List, Optional
import requests
from anthropic import Anthropic
from pytrends.request import TrendReq
from .config import (
NAVER_CLIENT_ID, NAVER_CLIENT_SECRET, DEFAULT_CATEGORY_SEEDS,
ANTHROPIC_API_KEY, ANTHROPIC_MODEL_HAIKU,
)
from . import db
from .news_collector import _clean
from .keyword_extractor import _count_nouns, _top_candidates
logger = logging.getLogger(__name__)
NEWS_URL = "https://openapi.naver.com/v1/search/news.json"
_NAVER_HEADERS = {
"X-Naver-Client-Id": NAVER_CLIENT_ID,
"X-Naver-Client-Secret": NAVER_CLIENT_SECRET,
}
def _seeds_for(category: str) -> List[str]:
pt = db.get_prompt_template("category_seeds")
if pt and pt.get("template"):
try:
data = json.loads(pt["template"])
if category in data:
return list(data[category])
except Exception:
pass
return list(DEFAULT_CATEGORY_SEEDS.get(category, []))
def fetch_naver_popular(category: str, per_seed: int = 30, top_n: int = 10) -> List[Dict[str, Any]]:
"""카테고리 시드 키워드들로 NAVER news.json `sort=sim` 호출,
응답 기사 묶음에서 빈도어 추출 후 상위 N개 반환."""
seeds = _seeds_for(category)
if not seeds:
return []
blob_parts: List[str] = []
for seed in seeds:
try:
resp = requests.get(
NEWS_URL,
headers=_NAVER_HEADERS,
params={"query": seed, "display": per_seed, "sort": "sim"},
timeout=10,
)
resp.raise_for_status()
for item in resp.json().get("items", []):
blob_parts.append(_clean(item.get("title", "")))
blob_parts.append(_clean(item.get("description", "")))
except Exception as e:
logger.warning("fetch_naver_popular seed=%s err=%s", seed, e)
continue
text = "\n".join(blob_parts)
counts = _count_nouns(text)
candidates = _top_candidates(counts, n=top_n)
if not candidates:
return []
max_count = candidates[0][1] or 1
return [
{
"keyword": k,
"category": category,
"source": "naver_popular",
"score": round(min(1.0, c / max_count), 4),
"articles_count": c,
}
for k, c in candidates
]
def collect_naver_popular_for(categories: List[str]) -> int:
total = 0
for cat in categories:
trends = fetch_naver_popular(cat)
for t in trends:
db.add_external_trend(t)
total += 1
return total
# ── LLM 분류 캐시 ────────────────────────────────────────────────────────────
_CACHE_TTL_SEC = 24 * 3600
_category_cache: Dict[str, tuple] = {} # keyword -> (category, expires_ts)
def _llm_classify_one(keyword: str) -> str:
"""Claude Haiku 1회 호출로 단일 키워드 분류."""
if not ANTHROPIC_API_KEY:
return "uncategorized"
seeds_template = db.get_prompt_template("category_seeds")
if seeds_template and seeds_template.get("template"):
try:
allowed = sorted(json.loads(seeds_template["template"]).keys())
except Exception:
allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
else:
allowed = sorted(DEFAULT_CATEGORY_SEEDS.keys())
allowed.append("uncategorized")
client = Anthropic(api_key=ANTHROPIC_API_KEY)
msg = client.messages.create(
model=ANTHROPIC_MODEL_HAIKU,
max_tokens=20,
messages=[{
"role": "user",
"content": (
f"다음 한국어 트렌딩 키워드를 카테고리 중 하나로 분류해라. "
f"카테고리: {allowed}. 키워드: '{keyword}'. "
f"카테고리명 한 단어만 출력. 다른 텍스트 금지."
),
}],
)
raw = msg.content[0].text.strip().lower()
for cat in allowed:
if cat.lower() in raw:
return cat
return "uncategorized"
def classify_keyword(keyword: str) -> str:
now = time.time()
cached = _category_cache.get(keyword)
if cached and cached[1] > now:
return cached[0]
cat = _llm_classify_one(keyword)
_category_cache[keyword] = (cat, now + _CACHE_TTL_SEC)
return cat
# ── Google Trends ─────────────────────────────────────────────────────────────
def fetch_google_trends() -> List[Dict[str, Any]]:
"""pytrends 한국 daily trending searches. 실패 시 빈 리스트."""
try:
pytrends = TrendReq(hl="ko-KR", tz=540)
df = pytrends.trending_searches(pn="south_korea")
except Exception as e:
logger.warning("Google Trends fetch failed: %s", e)
return []
items: List[Dict[str, Any]] = []
for idx, row in df.iterrows():
kw = str(row.iloc[0]).strip()
if not kw:
continue
cat = classify_keyword(kw)
rank_score = round(max(0.0, 1.0 - (idx / max(1, len(df)))), 4)
items.append({
"keyword": kw,
"category": cat,
"source": "google_trends",
"score": rank_score,
"articles_count": 0,
})
return items
def collect_google_trends() -> int:
items = fetch_google_trends()
for it in items:
db.add_external_trend(it)
return len(items)
def collect_all(categories: List[str]) -> Dict[str, int]:
naver_n = collect_naver_popular_for(categories)
google_n = collect_google_trends()
return {"naver_popular": naver_n, "google_trends": google_n}

View File

@@ -7,3 +7,4 @@ jinja2>=3.1.4
playwright==1.48.0
pytest>=8.0
pytest-asyncio>=0.24
pytrends>=4.9

View File

@@ -24,7 +24,7 @@ def tmp_db(monkeypatch):
pass
def test_init_db_creates_six_tables(tmp_db):
def test_init_db_creates_seven_tables(tmp_db):
with db_module._conn() as conn:
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
@@ -33,6 +33,7 @@ def test_init_db_creates_six_tables(tmp_db):
assert names == sorted([
"news_articles", "trending_keywords", "card_slates",
"card_assets", "generation_tasks", "prompt_templates",
"account_preferences",
])

View File

@@ -0,0 +1,71 @@
import os
import gc
import tempfile
from unittest.mock import patch
import pytest
from app import db as db_module
from app import keyword_extractor
@pytest.fixture
def tmp_db(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
yield path
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
def test_extract_with_weights_proportional(tmp_db, monkeypatch):
calls = []
def fake_extract(category, limit):
calls.append((category, limit))
return [{"id": i, "keyword": f"{category}{i}", "category": category, "score": 0.5}
for i in range(limit)]
monkeypatch.setattr(keyword_extractor, "extract_for_category", fake_extract)
out = keyword_extractor.extract_with_weights(
{"economy": 0.6, "psychology": 0.3, "celebrity": 0.1}, total_limit=10,
)
by_cat = {c: l for c, l in calls}
assert by_cat == {"economy": 6, "psychology": 3, "celebrity": 1}
assert len(out) == 10
def test_extract_with_weights_skips_zero(tmp_db, monkeypatch):
calls = []
def fake_extract(category, limit):
calls.append((category, limit))
return []
monkeypatch.setattr(keyword_extractor, "extract_for_category", fake_extract)
keyword_extractor.extract_with_weights(
{"economy": 1.0, "celebrity": 0.0}, total_limit=10,
)
cats_called = [c for c, _ in calls]
assert "celebrity" not in cats_called
assert "economy" in cats_called
def test_extract_with_weights_fallback_to_equal(tmp_db, monkeypatch):
calls = []
def fake_extract(category, limit):
calls.append((category, limit))
return []
monkeypatch.setattr(keyword_extractor, "extract_for_category", fake_extract)
keyword_extractor.extract_with_weights({}, total_limit=9)
by_cat = {c: l for c, l in calls}
assert set(by_cat.keys()) == {"economy", "psychology", "celebrity"}
assert all(l == 3 for l in by_cat.values())

View File

@@ -0,0 +1,83 @@
import os
import gc
import tempfile
import pytest
from fastapi.testclient import TestClient
from app import db as db_module
@pytest.fixture
def client(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
from app import main
monkeypatch.setattr(main, "DB_PATH", path)
with TestClient(main.app) as c:
yield c
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
def test_get_preferences_returns_defaults(client):
resp = client.get("/api/insta/preferences")
assert resp.status_code == 200
cats = {p["category"]: p["weight"] for p in resp.json()["categories"]}
assert cats == {"economy": 1.0, "psychology": 1.0, "celebrity": 1.0}
def test_put_preferences_upsert(client):
resp = client.put("/api/insta/preferences",
json={"categories": {"economy": 0.7, "psychology": 0.2, "tech": 0.5}})
assert resp.status_code == 200
cats = {p["category"]: p["weight"] for p in resp.json()["categories"]}
assert cats["economy"] == 0.7
assert cats["tech"] == 0.5
def test_list_trends_filter(client):
db_module.add_external_trend({"keyword": "A", "category": "economy",
"source": "naver_popular", "score": 1.0})
db_module.add_external_trend({"keyword": "B", "category": "celebrity",
"source": "google_trends", "score": 0.8})
resp = client.get("/api/insta/trends?source=naver_popular")
items = resp.json()["items"]
assert {it["keyword"] for it in items} == {"A"}
def test_collect_trends_kicks_background(client, monkeypatch):
from app import main, trend_collector
captured = {"called": False}
def fake_collect_all(cats):
captured["called"] = True
return {"naver_popular": 3, "google_trends": 2}
monkeypatch.setattr(trend_collector, "collect_all", fake_collect_all)
resp = client.post("/api/insta/trends/collect", json={})
assert resp.status_code == 200
task_id = resp.json()["task_id"]
for _ in range(20):
st = client.get(f"/api/insta/tasks/{task_id}").json()
if st["status"] in ("succeeded", "failed"):
break
assert st["status"] == "succeeded"
assert captured["called"] is True
def test_list_keywords_filters_by_source(client):
db_module.add_trending_keyword({"keyword": "M", "category": "economy",
"score": 0.4, "articles_count": 1, "source": "manual"})
db_module.add_external_trend({"keyword": "N", "category": "economy",
"source": "naver_popular", "score": 0.9})
resp = client.get("/api/insta/keywords?source=manual")
items = resp.json()["items"]
assert {it["keyword"] for it in items} == {"M"}

View File

@@ -0,0 +1,77 @@
import os
import gc
import tempfile
import pytest
from app import db as db_module
@pytest.fixture
def tmp_db(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
yield path
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
def test_init_db_creates_account_preferences(tmp_db):
with db_module._conn() as conn:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = {r[0] for r in rows}
assert "account_preferences" in names
def test_init_db_seeds_default_weights(tmp_db):
prefs = db_module.get_preferences()
cats = {p["category"]: p["weight"] for p in prefs}
assert cats["economy"] == pytest.approx(1.0)
assert cats["psychology"] == pytest.approx(1.0)
assert cats["celebrity"] == pytest.approx(1.0)
def test_upsert_preferences_replaces_weights(tmp_db):
db_module.upsert_preferences({"economy": 0.6, "psychology": 0.3, "celebrity": 0.1, "tech": 0.5})
prefs = {p["category"]: p["weight"] for p in db_module.get_preferences()}
assert prefs["economy"] == pytest.approx(0.6)
assert prefs["tech"] == pytest.approx(0.5)
assert "celebrity" in prefs and prefs["celebrity"] == pytest.approx(0.1)
def test_trending_keywords_source_column_exists(tmp_db):
with db_module._conn() as conn:
cols = [r[1] for r in conn.execute("PRAGMA table_info(trending_keywords)").fetchall()]
assert "source" in cols
def test_add_trending_keyword_default_source(tmp_db):
kid = db_module.add_trending_keyword({
"keyword": "K", "category": "economy", "score": 0.5, "articles_count": 3,
})
with db_module._conn() as conn:
row = conn.execute("SELECT source FROM trending_keywords WHERE id=?", (kid,)).fetchone()
assert row[0] == "manual"
def test_add_external_trend_stores_source(tmp_db):
tid = db_module.add_external_trend({
"keyword": "급등주", "category": "economy", "source": "naver_popular", "score": 0.9,
})
rows = db_module.list_trends(source="naver_popular")
assert any(r["id"] == tid and r["keyword"] == "급등주" for r in rows)
def test_list_trends_filters_by_source_and_category(tmp_db):
db_module.add_external_trend({"keyword": "A", "category": "economy", "source": "naver_popular", "score": 1.0})
db_module.add_external_trend({"keyword": "B", "category": "celebrity", "source": "google_trends", "score": 1.0})
only_naver = db_module.list_trends(source="naver_popular")
assert {r["keyword"] for r in only_naver} == {"A"}
only_celeb_google = db_module.list_trends(source="google_trends", category="celebrity")
assert {r["keyword"] for r in only_celeb_google} == {"B"}

View File

@@ -0,0 +1,121 @@
import os
import gc
import tempfile
from unittest.mock import patch, MagicMock
import pytest
from app import db as db_module
from app import trend_collector
@pytest.fixture
def tmp_db(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
yield path
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
NAVER_RESPONSE = {
"items": [
{"title": "<b>기준금리</b> 인상", "link": "https://n.news.naver.com/a/1", "description": "한국은행 발표"},
{"title": "환율 급등", "link": "https://n.news.naver.com/a/2", "description": "달러 강세"},
{"title": "기준금리 추가 인상", "link": "https://n.news.naver.com/a/3", "description": "추가 발표"},
],
}
def test_fetch_naver_popular_extracts_top_terms(tmp_db, monkeypatch):
fake_resp = MagicMock()
fake_resp.json.return_value = NAVER_RESPONSE
fake_resp.raise_for_status.return_value = None
with patch.object(trend_collector.requests, "get", return_value=fake_resp):
trends = trend_collector.fetch_naver_popular("economy", per_seed=10, top_n=5)
keywords = [t["keyword"] for t in trends]
assert "기준금리" in keywords
for t in trends:
assert t["category"] == "economy"
assert t["source"] == "naver_popular"
assert 0.0 <= t["score"] <= 1.0
def test_collect_naver_writes_to_db(tmp_db, monkeypatch):
fake_resp = MagicMock()
fake_resp.json.return_value = NAVER_RESPONSE
fake_resp.raise_for_status.return_value = None
with patch.object(trend_collector.requests, "get", return_value=fake_resp):
n = trend_collector.collect_naver_popular_for(["economy"])
assert n > 0
rows = db_module.list_trends(source="naver_popular")
assert len(rows) > 0
assert all(r["source"] == "naver_popular" for r in rows)
def test_classify_keyword_with_cache(monkeypatch):
calls = {"n": 0}
def fake_claude(keyword: str) -> str:
calls["n"] += 1
return "economy"
monkeypatch.setattr(trend_collector, "_llm_classify_one", fake_claude)
trend_collector._category_cache.clear()
c1 = trend_collector.classify_keyword("기준금리")
c2 = trend_collector.classify_keyword("기준금리")
assert c1 == c2 == "economy"
assert calls["n"] == 1
def test_fetch_google_trends_parses_and_classifies(tmp_db, monkeypatch):
class FakePyTrends:
def __init__(self, *_a, **_kw):
pass
def trending_searches(self, pn="south_korea"):
import pandas as pd
return pd.DataFrame({"0": ["기준금리", "BTS 컴백", "스트레스 관리"]})
monkeypatch.setattr(trend_collector, "TrendReq", FakePyTrends)
monkeypatch.setattr(trend_collector, "classify_keyword",
lambda kw: {"기준금리": "economy", "BTS 컴백": "celebrity",
"스트레스 관리": "psychology"}.get(kw, "uncategorized"))
trends = trend_collector.fetch_google_trends()
by_kw = {t["keyword"]: t for t in trends}
assert by_kw["기준금리"]["category"] == "economy"
assert by_kw["BTS 컴백"]["category"] == "celebrity"
assert by_kw["스트레스 관리"]["category"] == "psychology"
assert all(t["source"] == "google_trends" for t in trends)
def test_collect_all_invokes_both_sources(tmp_db, monkeypatch):
monkeypatch.setattr(trend_collector, "collect_naver_popular_for",
lambda cats: 5)
monkeypatch.setattr(trend_collector, "collect_google_trends",
lambda: 3)
out = trend_collector.collect_all(["economy"])
assert out == {"naver_popular": 5, "google_trends": 3}
def test_fetch_google_trends_graceful_on_pytrends_failure(monkeypatch):
class FakePyTrends:
def __init__(self, *_a, **_kw):
pass
def trending_searches(self, pn="south_korea"):
raise RuntimeError("rate limited")
monkeypatch.setattr(trend_collector, "TrendReq", FakePyTrends)
out = trend_collector.fetch_google_trends()
assert out == []

View File

@@ -2,7 +2,7 @@
set -euo pipefail
# ── 서비스 목록 (한 곳에서만 관리) ──
SERVICES="lotto travel-proxy deployer stock music-lab blog-lab realestate-lab agent-office personal packs-lab nginx scripts"
SERVICES="lotto travel-proxy deployer stock music-lab insta-lab realestate-lab agent-office personal packs-lab nginx scripts"
# 1. 자동 감지: Docker 컨테이너 내부인가?
if [ -d "/repo" ] && [ -d "/runtime" ]; then

View File

@@ -7,13 +7,13 @@ flock -n 200 || { echo "Deploy already running, skipping"; exit 0; }
# ── 서비스 목록 (한 곳에서만 관리) ──
# docker compose 서비스명 (deployer 제외 — 자기 자신을 재빌드하면 스크립트 중단)
BUILD_TARGETS="lotto travel-proxy stock music-lab blog-lab realestate-lab agent-office personal packs-lab frontend"
# 컨테이너 이름 (고아 정리용)
CONTAINER_NAMES="lotto stock music-lab blog-lab realestate-lab agent-office personal packs-lab travel-proxy frontend"
BUILD_TARGETS="lotto travel-proxy stock music-lab insta-lab realestate-lab agent-office personal packs-lab frontend"
# 컨테이너 이름 (고아 정리용 — blog-lab은 폐기 대상으로 정리 리스트에 유지)
CONTAINER_NAMES="lotto stock music-lab insta-lab blog-lab realestate-lab agent-office personal packs-lab travel-proxy frontend"
# 헬스체크 대상
HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab blog-lab realestate-lab agent-office personal packs-lab"
HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab insta-lab realestate-lab agent-office personal packs-lab"
# data 디렉토리 (packs-lab은 별도 media/packs 사용)
DATA_DIRS="music stock blog realestate agent-office personal"
DATA_DIRS="music stock insta realestate agent-office personal"
# 1. 자동 감지: Docker 컨테이너 내부인가?
if [ -d "/repo" ] && [ -d "/runtime" ]; then
@@ -96,13 +96,25 @@ docker compose up -d --build $BUILD_TARGETS
docker exec frontend nginx -s reload 2>/dev/null || true
# ── 배포 후 헬스체크 ──
echo "Waiting for services to start..."
sleep 5
# Docker compose의 healthcheck 블록이 이미 모든 컨테이너에 정의되어 있으므로
# `docker inspect`로 컨테이너 health state를 직접 조회. 이 방식은
# (a) deployer 컨테이너 내부에서도 호스트에서도 동일하게 동작
# (b) 호스트네임 DNS 해석에 의존하지 않음 (호스트 셸에서는 'lotto' 등 미해석)
echo "Waiting for services to become healthy..."
HEALTH_OK=true
for svc in $HEALTH_ENDPOINTS; do
if ! curl -sf --max-time 10 --retry 2 --retry-delay 3 "http://$svc:8000/health" > /dev/null 2>&1; then
echo "HEALTH_FAIL: http://$svc:8000/health"
health="starting"
# 최대 60초 (5초×12) 동안 starting → healthy 전이 대기
for _ in $(seq 1 12); do
health=$(docker inspect --format='{{.State.Health.Status}}' "$svc" 2>/dev/null || echo "missing")
if [ "$health" = "healthy" ] || [ "$health" = "unhealthy" ] || [ "$health" = "missing" ]; then
break
fi
sleep 5
done
if [ "$health" != "healthy" ]; then
echo "HEALTH_FAIL: $svc (state=$health)"
HEALTH_OK=false
fi
done

View File

@@ -44,8 +44,9 @@ check_url "Music Health" "http://localhost:18600/health"
check_url "Music Providers" "http://localhost:18600/api/music/providers"
echo ""
echo "--- 4. Blog Lab ---"
check_url "Blog Health" "http://localhost:18700/health"
echo "--- 4. Insta Lab ---"
check_url "Insta Health" "http://localhost:18700/health"
check_url "Insta Status" "http://localhost:18700/api/insta/status"
echo ""
echo "--- 5. Realestate Lab ---"