feat/insta-agent #3

Merged
gahusb merged 17 commits from feat/insta-agent into main 2026-05-16 01:43:21 +09:00
24 changed files with 0 additions and 2812 deletions
Showing only changes of commit 5485d4858a - Show all commits

View File

@@ -1,192 +0,0 @@
import asyncio
from typing import Optional
from .base import BaseAgent
from ..db import (
create_task, update_task_status, approve_task, reject_task,
get_task, get_agent_config, add_log,
)
from .. import service_proxy
from .. import telegram_bot
DEFAULT_TREND_KEYWORDS = [
"다이어트 식단", "재택근무 꿀템", "캠핑 장비 추천",
"홈트레이닝", "제주도 여행", "에어프라이어 레시피",
]
class BlogAgent(BaseAgent):
"""블로그 마케팅 에이전트.
매일 10:00 자동 실행: 키워드 1개 리서치 → 글 생성 → 마케터 → 평가자
→ 평가 점수와 요약을 텔레그램 승인 요청으로 푸시
→ 승인 시 `published` 상태로 전환, 거절 시 재생성
"""
agent_id = "blog"
display_name = "블로그 마케터"
async def on_schedule(self) -> None:
if self.state not in ("idle", "break"):
return
config = get_agent_config(self.agent_id) or {}
custom = config.get("custom_config", {}) or {}
keywords = custom.get("trend_keywords") or DEFAULT_TREND_KEYWORDS
if not keywords:
return
import random
keyword = random.choice(keywords)
task_id = create_task(
self.agent_id,
"auto_blog_pipeline",
{"keyword": keyword},
requires_approval=True,
)
await self.transition("working", f"리서치: {keyword}", task_id)
asyncio.create_task(self._run_pipeline(task_id, keyword))
async def _await_task(self, step: str, task_id: str, timeout_sec: int = 240) -> Optional[int]:
"""blog-lab BackgroundTask 완료 폴링. 완료 시 result_id 반환."""
attempts = max(1, timeout_sec // 5)
for _ in range(attempts):
await asyncio.sleep(5)
status = await service_proxy.blog_task_status(task_id)
s = status.get("status")
if s == "succeeded":
return status.get("result_id")
if s == "failed":
raise Exception(f"{step} failed: {status.get('error')}")
raise Exception(f"{step} timeout ({timeout_sec}s 내 완료되지 않음)")
async def _run_pipeline(self, task_id: str, keyword: str) -> None:
try:
# 1) 리서치
research = await service_proxy.blog_research(keyword)
keyword_id = await self._await_task("research", research.get("task_id"), 180)
if not keyword_id:
raise Exception("research succeeded but result_id missing")
# 2) 작가 단계 (비동기)
await self.transition("working", f"글 생성: {keyword}", task_id)
gen = await service_proxy.blog_generate(keyword_id)
post_id = await self._await_task("generate", gen.get("task_id"), 300)
if not post_id:
raise Exception("generate succeeded but post_id missing")
# 3) 마케터 단계 (비동기)
await self.transition("working", "링크 삽입 중", task_id)
mkt = await service_proxy.blog_market(post_id)
await self._await_task("market", mkt.get("task_id"), 180)
# 4) 평가자 단계 (비동기)
await self.transition("working", "품질 리뷰 중", task_id)
rev = await service_proxy.blog_review(post_id)
await self._await_task("review", rev.get("task_id"), 180)
post_after = await service_proxy.blog_get_post(post_id)
score = post_after.get("review_score")
passed = (score or 0) >= 42
title = post_after.get("title", "(제목 없음)")
excerpt = (post_after.get("body") or "")[:300]
update_task_status(task_id, "pending", {
"keyword": keyword,
"post_id": post_id,
"score": score,
"passed": passed,
"title": title,
})
await self.transition("waiting", f"승인 대기 · {score}/60", task_id)
detail = (
f"키워드: {keyword}\n"
f"제목: {title}\n"
f"평가 점수: {score}/60 ({'통과' if passed else '미통과'})\n\n"
f"{excerpt}..."
)
await telegram_bot.send_approval_request(
self.agent_id, task_id,
"✍️ [블로그 에이전트] 발행 승인 요청", detail,
)
except Exception as e:
add_log(self.agent_id, f"Blog pipeline failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {"error": str(e), "keyword": keyword})
await self.transition("idle", f"오류: {e}")
await telegram_bot.send_task_result(
self.agent_id, "✍️ [블로그 에이전트] 파이프라인 실패",
f"키워드: {keyword}\n오류: {e}",
)
async def on_command(self, command: str, params: dict) -> dict:
if command == "research":
keyword = (params.get("keyword") or "").strip()
if not keyword:
return {"ok": False, "message": "keyword 필수"}
task_id = create_task(
self.agent_id, "auto_blog_pipeline",
{"keyword": keyword}, requires_approval=True,
)
await self.transition("working", f"리서치: {keyword}", task_id)
asyncio.create_task(self._run_pipeline(task_id, keyword))
return {"ok": True, "task_id": task_id, "message": f"파이프라인 시작: {keyword}"}
if command == "add_trend_keyword":
keyword = (params.get("keyword") or "").strip()
if not keyword:
return {"ok": False, "message": "keyword 필수"}
config = get_agent_config(self.agent_id) or {}
custom = config.get("custom_config", {}) or {}
kws = list(custom.get("trend_keywords") or [])
if keyword not in kws:
kws.append(keyword)
from ..db import update_agent_config
update_agent_config(self.agent_id, custom_config={**custom, "trend_keywords": kws})
return {"ok": True, "keywords": kws}
if command == "list_trend_keywords":
config = get_agent_config(self.agent_id) or {}
custom = config.get("custom_config", {}) or {}
return {"ok": True, "keywords": custom.get("trend_keywords") or DEFAULT_TREND_KEYWORDS}
return {"ok": False, "message": f"Unknown command: {command}"}
async def on_approval(self, task_id: str, approved: bool, feedback: str = "") -> None:
task = get_task(task_id)
if not task:
return
result = task.get("result_data") or {}
post_id = result.get("post_id")
if not approved:
reject_task(task_id)
await self.transition("idle", "발행 거절됨")
await telegram_bot.send_task_result(
self.agent_id, "✍️ [블로그 에이전트] 발행 취소",
f"키워드: {result.get('keyword', '')}\n사용자가 거절했습니다.",
)
return
approve_task(task_id, via="telegram")
await self.transition("reporting", "발행 중...", task_id)
try:
if post_id:
await service_proxy.blog_publish(int(post_id))
update_task_status(task_id, "succeeded", {**result, "published": True})
await telegram_bot.send_task_result(
self.agent_id, "✍️ [블로그 에이전트] 발행 완료",
f"키워드: {result.get('keyword', '')}\n제목: {result.get('title', '')}\n"
f"점수: {result.get('score')}/60",
)
await self.transition("idle", "발행 완료")
except Exception as e:
add_log(self.agent_id, f"Blog publish failed: {e}", "error", task_id)
update_task_status(task_id, "failed", {**result, "publish_error": str(e)})
await self.transition("idle", f"발행 오류: {e}")

View File

@@ -1,4 +0,0 @@
__pycache__
*.pyc
.env
data/

View File

@@ -1,15 +0,0 @@
FROM python:3.12-alpine
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apk add --no-cache gcc musl-dev
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -1,15 +0,0 @@
import os
# Anthropic Claude API
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-sonnet-4-20250514")
# Naver Search API
NAVER_CLIENT_ID = os.getenv("NAVER_CLIENT_ID", "")
NAVER_CLIENT_SECRET = os.getenv("NAVER_CLIENT_SECRET", "")
# Database
DB_PATH = os.getenv("BLOG_DB_PATH", "/app/data/blog_marketing.db")
# CORS
CORS_ALLOW_ORIGINS = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080")

View File

@@ -1,172 +0,0 @@
"""Claude API 기반 콘텐츠 생성 — 트렌드 브리프 + 블로그 글 작성."""
import json
import logging
from datetime import date
from typing import Any, Dict, Optional
import anthropic
from .config import ANTHROPIC_API_KEY, CLAUDE_MODEL
from .db import get_template
logger = logging.getLogger(__name__)
_client: Optional[anthropic.Anthropic] = None
def _get_client() -> anthropic.Anthropic:
global _client
if _client is None:
_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
return _client
def _call_claude(prompt: str, max_tokens: int = 4096) -> str:
"""Claude API 호출. 단일 user 메시지. 현재 날짜 시스템 프롬프트 포함."""
client = _get_client()
today = date.today().isoformat()
resp = client.messages.create(
model=CLAUDE_MODEL,
max_tokens=max_tokens,
system=f"현재 날짜는 {today}입니다. 모든 콘텐츠는 이 날짜 기준으로 작성하세요.",
messages=[{"role": "user", "content": prompt}],
)
return resp.content[0].text
def generate_trend_brief(analysis: Dict[str, Any]) -> str:
"""키워드 분석 데이터를 바탕으로 트렌드 브리프 생성."""
template = get_template("trend_brief")
if not template:
raise RuntimeError("trend_brief 템플릿이 없습니다")
top_blogs_text = "\n".join(
f"- {b.get('title', '')}" for b in analysis.get("top_blogs", [])
) or "없음"
top_products_text = "\n".join(
f"- {p.get('title', '')} ({p.get('lprice', '?')}원, {p.get('mallName', '')})"
for p in analysis.get("top_products", [])
) or "없음"
prompt = template.format(
keyword=analysis.get("keyword", ""),
competition=analysis.get("competition", 0),
opportunity=analysis.get("opportunity", 0),
top_blogs=top_blogs_text,
top_products=top_products_text,
)
return _call_claude(prompt)
def _parse_blog_json(raw: str, keyword: str) -> Dict[str, str]:
"""Claude 응답에서 블로그 JSON을 파싱."""
try:
text = raw.strip()
if text.startswith("```"):
lines = text.split("\n")
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines)
result = json.loads(text)
return {
"title": result.get("title", ""),
"body": result.get("body", ""),
"excerpt": result.get("excerpt", ""),
"tags": result.get("tags", []),
}
except (json.JSONDecodeError, KeyError):
logger.warning("Blog post JSON parse failed, using raw text")
return {
"title": f"{keyword} 추천 리뷰",
"body": raw,
"excerpt": raw[:200],
"tags": [keyword],
}
def generate_blog_post(
analysis: Dict[str, Any],
trend_brief: str,
brand_links: Optional[list] = None,
) -> Dict[str, str]:
"""트렌드 브리프를 바탕으로 블로그 글 작성.
Returns:
{"title": str, "body": str, "excerpt": str, "tags": [...]}
"""
template = get_template("blog_write")
if not template:
raise RuntimeError("blog_write 템플릿이 없습니다")
top_products_text = "\n".join(
f"- {p.get('title', '')} ({p.get('lprice', '?')}원, {p.get('mallName', '')})"
for p in analysis.get("top_products", [])
) or "없음"
# 크롤링된 블로그 본문 참고 자료
reference_blogs_text = ""
for blog in analysis.get("top_blogs", []):
content = blog.get("content", "")
if content:
reference_blogs_text += f"\n### {blog.get('title', '제목 없음')}\n{content}\n"
if not reference_blogs_text:
reference_blogs_text = "없음"
# 브랜드커넥트 링크 정보
brand_products_text = ""
if brand_links:
for link in brand_links:
brand_products_text += (
f"- 상품명: {link.get('product_name', '')}\n"
f" 설명: {link.get('description', '')}\n"
f" 링크: {link.get('url', '')}\n"
f" 배치 힌트: {link.get('placement_hint', '자연스럽게')}\n"
)
if not brand_products_text:
brand_products_text = "없음 (제휴 링크 없이 일반 리뷰로 작성)"
prompt = template.format(
keyword=analysis.get("keyword", ""),
trend_brief=trend_brief,
top_products=top_products_text,
reference_blogs=reference_blogs_text,
brand_products=brand_products_text,
)
# 구조화된 응답을 위한 추가 지시
prompt += (
"\n\n---\n"
"응답은 반드시 아래 JSON 형식으로 해주세요 (JSON만 출력, 다른 텍스트 없이):\n"
'{"title": "블로그 제목", "body": "HTML 본문", "excerpt": "2줄 요약", '
'"tags": ["태그1", "태그2", ...]}'
)
raw = _call_claude(prompt, max_tokens=8192)
return _parse_blog_json(raw, analysis.get("keyword", ""))
def regenerate_blog_post(
analysis: Dict[str, Any],
trend_brief: str,
previous_body: str,
feedback: str,
) -> Dict[str, str]:
"""피드백을 반영하여 블로그 글 재생성."""
prompt = (
"당신은 네이버 블로그에서 월 100만 이상 수익을 올리는 전문 블로거입니다.\n"
f"키워드: {analysis.get('keyword', '')}\n\n"
f"이전에 작성한 글:\n{previous_body[:3000]}\n\n"
f"리뷰어 피드백:\n{feedback}\n\n"
"위 피드백을 반영하여 글을 개선해주세요.\n"
"작성 규칙: 1인칭 체험기, 2,000자 이상, 자연스러운 구어체, "
"제품 비교표 포함, 광고 고지 문구 포함.\n"
"HTML 형식으로 작성하되, 네이버 블로그에서 바로 붙여넣기 가능한 형태로.\n\n"
"---\n"
"응답은 반드시 아래 JSON 형식으로 해주세요 (JSON만 출력):\n"
'{"title": "블로그 제목", "body": "HTML 본문", "excerpt": "2줄 요약", '
'"tags": ["태그1", "태그2", ...]}'
)
raw = _call_claude(prompt, max_tokens=8192)
return _parse_blog_json(raw, analysis.get("keyword", ""))

View File

@@ -1,790 +0,0 @@
import os
import sqlite3
import json
from typing import Any, Dict, List, Optional
from .config import DB_PATH
def _conn() -> sqlite3.Connection:
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH, timeout=120.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=120000")
return conn
def init_db() -> None:
with _conn() as conn:
# 키워드/상품 분석 결과
conn.execute("""
CREATE TABLE IF NOT EXISTS keyword_analyses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword TEXT NOT NULL,
blog_total INTEGER NOT NULL DEFAULT 0,
shop_total INTEGER NOT NULL DEFAULT 0,
competition REAL NOT NULL DEFAULT 0,
opportunity REAL NOT NULL DEFAULT 0,
avg_price INTEGER,
min_price INTEGER,
max_price INTEGER,
top_products TEXT NOT NULL DEFAULT '[]',
top_blogs TEXT NOT NULL DEFAULT '[]',
ai_summary TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ka_created ON keyword_analyses(created_at DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ka_keyword ON keyword_analyses(keyword)")
# 블로그 포스트
conn.execute("""
CREATE TABLE IF NOT EXISTS blog_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
keyword_id INTEGER REFERENCES keyword_analyses(id),
title TEXT NOT NULL DEFAULT '',
body TEXT NOT NULL DEFAULT '',
excerpt TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
status TEXT NOT NULL DEFAULT 'draft',
review_score INTEGER,
review_detail TEXT NOT NULL DEFAULT '{}',
naver_url TEXT NOT NULL DEFAULT '',
trend_brief TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_bp_created ON blog_posts(created_at DESC)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_bp_status ON blog_posts(status)")
# 수익(커미션) 추적
conn.execute("""
CREATE TABLE IF NOT EXISTS commissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER REFERENCES blog_posts(id),
month TEXT NOT NULL,
clicks INTEGER NOT NULL DEFAULT 0,
purchases INTEGER NOT NULL DEFAULT 0,
revenue INTEGER NOT NULL DEFAULT 0,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_comm_month ON commissions(month)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_comm_post ON commissions(post_id)")
# 비동기 작업 상태 (research / generate / review)
conn.execute("""
CREATE TABLE IF NOT EXISTS generation_tasks (
id TEXT PRIMARY KEY,
type TEXT NOT NULL DEFAULT 'research',
status TEXT NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
message TEXT NOT NULL DEFAULT '',
result_id INTEGER,
error TEXT,
params TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_gt_created ON generation_tasks(created_at DESC)")
# AI 프롬프트 템플릿
conn.execute("""
CREATE TABLE IF NOT EXISTS prompt_templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT NOT NULL DEFAULT '',
template TEXT NOT NULL DEFAULT '',
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
# 브랜드커넥트 제휴 링크
conn.execute("""
CREATE TABLE IF NOT EXISTS brand_links (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER REFERENCES blog_posts(id),
keyword_id INTEGER REFERENCES keyword_analyses(id),
url TEXT NOT NULL,
product_name TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
placement_hint TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_bl_post ON brand_links(post_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_bl_keyword ON brand_links(keyword_id)")
# 기본 프롬프트 템플릿 시딩 (존재하지 않을 때만)
_seed_templates(conn)
_migrate_templates(conn)
def _seed_templates(conn: sqlite3.Connection) -> None:
"""기본 프롬프트 템플릿을 DB에 시딩."""
templates = [
{
"name": "trend_brief",
"description": "네이버 블로그 트렌드 분석 + 제목/훅 전략 브리프",
"template": (
"당신은 네이버 블로그 마케팅 전문가입니다.\n"
"아래 키워드 분석 데이터를 바탕으로 블로그 포스팅 전략 브리프를 작성하세요.\n\n"
"키워드: {keyword}\n"
"블로그 경쟁도: {competition} (0-100, 높을수록 경쟁 치열)\n"
"쇼핑 기회 점수: {opportunity} (0-100, 높을수록 기회 큼)\n"
"상위 블로그 제목들: {top_blogs}\n"
"상위 상품들: {top_products}\n\n"
"다음을 포함해주세요:\n"
"1. 클릭을 유도하는 제목 공식 3가지\n"
"2. 도입부 훅 전략 (공감형, 질문형, 충격형 중 추천)\n"
"3. 추천 해시태그 5-10개\n"
"4. 경쟁 분석 요약 (기존 글 대비 차별화 포인트)\n"
"5. SEO 키워드 배치 전략"
),
},
{
"name": "blog_write",
"description": "공감형 1인칭 체험기 블로그 글 작성",
"template": (
"당신은 네이버 블로그에서 월 100만 이상 수익을 올리는 전문 블로거입니다.\n"
"아래 브리프를 바탕으로 블로그 글을 작성하세요.\n\n"
"키워드: {keyword}\n"
"트렌드 브리프: {trend_brief}\n"
"상위 상품 정보: {top_products}\n\n"
"작성 규칙:\n"
"- 1인칭 체험기 형식 (\"제가 직접 써봤는데요\")\n"
"- 1,500자 이상\n"
"- 자연스러운 구어체 (네이버 블로그 톤)\n"
"- 제품 비교표 포함 (마크다운 테이블)\n"
"- 장단점 솔직하게 작성\n"
"- 광고 고지 문구 포함: \"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.\"\n"
"- 추천 매트릭스 (가성비/품질/디자인 기준)\n"
"- 자연스러운 CTA (구매 링크 유도)\n\n"
"HTML 형식으로 작성하되, 네이버 블로그에서 바로 붙여넣기 가능한 형태로 만들어주세요."
),
},
{
"name": "quality_review",
"description": "블로그 글 품질 리뷰 (6기준 × 10점)",
"template": (
"당신은 블로그 콘텐츠 품질 평가 전문가입니다.\n"
"아래 블로그 글을 6가지 기준으로 평가해주세요.\n\n"
"제목: {title}\n"
"본문: {body}\n\n"
"평가 기준 (각 1-10점):\n"
"1. 독자 공감도 (empathy): 1인칭 체험기가 자연스럽고 공감되는가?\n"
"2. 제목 클릭 유도력 (click_appeal): 검색 결과에서 클릭하고 싶은 제목인가?\n"
"3. 구매 전환력 (conversion): 읽고 나서 제품을 사고 싶어지는가?\n"
"4. SEO 최적화 (seo): 키워드 배치, 소제목, 길이가 적절한가?\n"
"5. 형식 완성도 (format): 비교표, 이미지 설명, 단락 구성이 잘 되어있는가?\n"
"6. 링크 자연스러움 (link_natural): 제휴 링크가 광고처럼 느껴지지 않고 자연스럽게 녹아있는가? (링크가 없으면 5점 기본)\n\n"
"JSON 형식으로 응답:\n"
"{{\n"
" \"scores\": {{\n"
" \"empathy\": N,\n"
" \"click_appeal\": N,\n"
" \"conversion\": N,\n"
" \"seo\": N,\n"
" \"format\": N,\n"
" \"link_natural\": N\n"
" }},\n"
" \"total\": N,\n"
" \"pass\": true/false,\n"
" \"feedback\": \"개선 사항 설명\"\n"
"}}"
),
},
{
"name": "marketer_enhance",
"description": "마케터 전환율 강화 + 제휴 링크 삽입",
"template": (
"당신은 네이버 블로그 수익화 전문 마케터입니다.\n"
"아래 블로그 초안에 제휴 링크를 자연스럽게 삽입하고 전환율을 강화하세요.\n\n"
"=== 블로그 초안 ===\n{draft_body}\n\n"
"=== 타겟 키워드 ===\n{keyword}\n\n"
"=== 삽입할 제휴 링크 ===\n{brand_links_info}\n\n"
"작업 규칙:\n"
"- 제휴 링크를 <a href=\"URL\" target=\"_blank\">상품명</a> 형태로 본문 흐름에 맞게 2~3곳 삽입\n"
"- 결론에 CTA(Call-to-Action) 블록 추가 (\"지금 확인하기\" 등)\n"
"- 글 맨 아래에 광고 고지 문구 자동 삽입: \"이 포스팅은 브랜드로부터 소정의 수수료를 받을 수 있습니다\"\n"
"- 작가의 1인칭 톤과 구어체를 유지\n"
"- 과도한 광고 느낌 없이 자연스러운 추천 흐름 유지\n"
"- 구매 심리를 자극하는 표현 강화 (한정 수량, 가격 비교, 실사용 만족도 등)\n"
"- 배치 힌트가 있으면 참고하되, 문맥이 더 자연스러운 위치 우선\n"
"- 기존 본문의 구조와 길이를 크게 변경하지 않음"
),
},
]
for t in templates:
existing = conn.execute(
"SELECT id FROM prompt_templates WHERE name = ?", (t["name"],)
).fetchone()
if not existing:
conn.execute(
"INSERT INTO prompt_templates (name, description, template) VALUES (?, ?, ?)",
(t["name"], t["description"], t["template"]),
)
def _migrate_templates(conn: sqlite3.Connection) -> None:
"""기존 템플릿을 최신 버전으로 업데이트."""
new_blog_write = (
"당신은 네이버 블로그에서 월 100만 이상 수익을 올리는 전문 블로거입니다.\n"
"아래 브리프와 참고 자료를 바탕으로 블로그 글을 작성하세요.\n\n"
"키워드: {keyword}\n"
"트렌드 브리프: {trend_brief}\n\n"
"=== 상위 블로그 참고 자료 ===\n"
"{reference_blogs}\n\n"
"=== 상위 상품 정보 ===\n"
"{top_products}\n\n"
"=== 제휴 상품 (브랜드커넥트 링크) ===\n"
"{brand_products}\n\n"
"작성 규칙:\n"
"- 1인칭 체험기 형식 (\"제가 직접 써봤는데요\")\n"
"- 2,000자 이상\n"
"- 자연스러운 구어체 (네이버 블로그 톤)\n"
"- 상위 블로그 참고하되 표절 금지 (자신만의 시각으로 재구성)\n"
"- 제품 비교표 포함 (HTML 테이블)\n"
"- 장단점 솔직하게 작성\n"
"- 제휴 상품이 있으면 자연스럽게 체험 맥락에 녹여서 작성\n"
"- 제휴 링크는 <a> 태그로 자연스럽게 삽입\n"
"- 추천 매트릭스 (가성비/품질/디자인 기준)\n"
"- 자연스러운 CTA (구매 링크 유도)\n\n"
"HTML 형식으로 작성하되, 네이버 블로그에서 바로 붙여넣기 가능한 형태로 만들어주세요."
)
conn.execute(
"UPDATE prompt_templates SET template = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE name = 'blog_write'",
(new_blog_write,),
)
new_quality_review = (
"당신은 블로그 콘텐츠 품질 평가 전문가입니다.\n"
"아래 블로그 글을 6가지 기준으로 평가해주세요.\n\n"
"제목: {title}\n"
"본문: {body}\n\n"
"평가 기준 (각 1-10점):\n"
"1. 독자 공감도 (empathy): 1인칭 체험기가 자연스럽고 공감되는가?\n"
"2. 제목 클릭 유도력 (click_appeal): 검색 결과에서 클릭하고 싶은 제목인가?\n"
"3. 구매 전환력 (conversion): 읽고 나서 제품을 사고 싶어지는가?\n"
"4. SEO 최적화 (seo): 키워드 배치, 소제목, 길이가 적절한가?\n"
"5. 형식 완성도 (format): 비교표, 이미지 설명, 단락 구성이 잘 되어있는가?\n"
"6. 링크 자연스러움 (link_natural): 제휴 링크가 광고처럼 느껴지지 않고 자연스럽게 녹아있는가? (링크가 없으면 5점 기본)\n\n"
"JSON 형식으로 응답:\n"
"{{\n"
" \"scores\": {{\n"
" \"empathy\": N,\n"
" \"click_appeal\": N,\n"
" \"conversion\": N,\n"
" \"seo\": N,\n"
" \"format\": N,\n"
" \"link_natural\": N\n"
" }},\n"
" \"total\": N,\n"
" \"pass\": true/false,\n"
" \"feedback\": \"개선 사항 설명\"\n"
"}}"
)
conn.execute(
"UPDATE prompt_templates SET template = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE name = 'quality_review'",
(new_quality_review,),
)
# marketer_enhance가 없으면 추가
existing = conn.execute("SELECT id FROM prompt_templates WHERE name = 'marketer_enhance'").fetchone()
if not existing:
conn.execute(
"INSERT INTO prompt_templates (name, description, template) VALUES (?, ?, ?)",
("marketer_enhance", "마케터 전환율 강화 + 제휴 링크 삽입",
"당신은 네이버 블로그 수익화 전문 마케터입니다.\n"
"아래 블로그 초안에 제휴 링크를 자연스럽게 삽입하고 전환율을 강화하세요.\n\n"
"=== 블로그 초안 ===\n{draft_body}\n\n"
"=== 타겟 키워드 ===\n{keyword}\n\n"
"=== 삽입할 제휴 링크 ===\n{brand_links_info}\n\n"
"작업 규칙:\n"
"- 제휴 링크를 <a href=\"URL\" target=\"_blank\">상품명</a> 형태로 본문 흐름에 맞게 2~3곳 삽입\n"
"- 결론에 CTA(Call-to-Action) 블록 추가\n"
"- 글 맨 아래에 광고 고지 문구 자동 삽입\n"
"- 작가의 1인칭 톤과 구어체를 유지\n"
"- 과도한 광고 느낌 없이 자연스러운 추천 흐름 유지"),
)
# ── keyword_analyses CRUD ────────────────────────────────────────────────────
def _ka_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"keyword": r["keyword"],
"blog_total": r["blog_total"],
"shop_total": r["shop_total"],
"competition": r["competition"],
"opportunity": r["opportunity"],
"avg_price": r["avg_price"],
"min_price": r["min_price"],
"max_price": r["max_price"],
"top_products": json.loads(r["top_products"]) if r["top_products"] else [],
"top_blogs": json.loads(r["top_blogs"]) if r["top_blogs"] else [],
"ai_summary": r["ai_summary"],
"created_at": r["created_at"],
}
def add_keyword_analysis(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""INSERT INTO keyword_analyses
(keyword, blog_total, shop_total, competition, opportunity,
avg_price, min_price, max_price, top_products, top_blogs, ai_summary)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
data.get("keyword", ""),
data.get("blog_total", 0),
data.get("shop_total", 0),
data.get("competition", 0),
data.get("opportunity", 0),
data.get("avg_price"),
data.get("min_price"),
data.get("max_price"),
json.dumps(data.get("top_products", []), ensure_ascii=False),
json.dumps(data.get("top_blogs", []), ensure_ascii=False),
data.get("ai_summary", ""),
),
)
row = conn.execute(
"SELECT * FROM keyword_analyses WHERE rowid = last_insert_rowid()"
).fetchone()
return _ka_row_to_dict(row)
def get_keyword_analysis(analysis_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM keyword_analyses WHERE id = ?", (analysis_id,)
).fetchone()
return _ka_row_to_dict(row) if row else None
def get_keyword_analyses(limit: int = 30) -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute(
"SELECT * FROM keyword_analyses ORDER BY created_at DESC LIMIT ?", (limit,)
).fetchall()
return [_ka_row_to_dict(r) for r in rows]
def delete_keyword_analysis(analysis_id: int) -> bool:
with _conn() as conn:
row = conn.execute(
"SELECT id FROM keyword_analyses WHERE id = ?", (analysis_id,)
).fetchone()
if not row:
return False
conn.execute("DELETE FROM keyword_analyses WHERE id = ?", (analysis_id,))
return True
# ── blog_posts CRUD ──────────────────────────────────────────────────────────
def _post_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"keyword_id": r["keyword_id"],
"title": r["title"],
"body": r["body"],
"excerpt": r["excerpt"],
"tags": json.loads(r["tags"]) if r["tags"] else [],
"status": r["status"],
"review_score": r["review_score"],
"review_detail": json.loads(r["review_detail"]) if r["review_detail"] else {},
"naver_url": r["naver_url"],
"trend_brief": r["trend_brief"],
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def add_post(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""INSERT INTO blog_posts
(keyword_id, title, body, excerpt, tags, status, review_score,
review_detail, naver_url, trend_brief)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
data.get("keyword_id"),
data.get("title", ""),
data.get("body", ""),
data.get("excerpt", ""),
json.dumps(data.get("tags", []), ensure_ascii=False),
data.get("status", "draft"),
data.get("review_score"),
json.dumps(data.get("review_detail", {}), ensure_ascii=False),
data.get("naver_url", ""),
data.get("trend_brief", ""),
),
)
row = conn.execute(
"SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()"
).fetchone()
return _post_row_to_dict(row)
def get_post(post_id: int) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM blog_posts WHERE id = ?", (post_id,)
).fetchone()
return _post_row_to_dict(row) if row else None
def get_posts(status: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
with _conn() as conn:
if status:
rows = conn.execute(
"SELECT * FROM blog_posts WHERE status = ? ORDER BY created_at DESC LIMIT ?",
(status, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM blog_posts ORDER BY created_at DESC LIMIT ?", (limit,)
).fetchall()
return [_post_row_to_dict(r) for r in rows]
def update_post(post_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
with _conn() as conn:
fields = []
values = []
for k in ("title", "body", "excerpt", "status", "naver_url", "trend_brief"):
if k in data:
fields.append(f"{k} = ?")
values.append(data[k])
if "tags" in data:
fields.append("tags = ?")
values.append(json.dumps(data["tags"], ensure_ascii=False))
if "review_score" in data:
fields.append("review_score = ?")
values.append(data["review_score"])
if "review_detail" in data:
fields.append("review_detail = ?")
values.append(json.dumps(data["review_detail"], ensure_ascii=False))
if not fields:
return get_post(post_id)
fields.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')")
values.append(post_id)
conn.execute(
f"UPDATE blog_posts SET {', '.join(fields)} WHERE id = ?", values
)
row = conn.execute(
"SELECT * FROM blog_posts WHERE id = ?", (post_id,)
).fetchone()
return _post_row_to_dict(row) if row else None
def delete_post(post_id: int) -> bool:
with _conn() as conn:
row = conn.execute(
"SELECT id FROM blog_posts WHERE id = ?", (post_id,)
).fetchone()
if not row:
return False
conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,))
return True
# ── commissions CRUD ─────────────────────────────────────────────────────────
def _comm_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"post_id": r["post_id"],
"month": r["month"],
"clicks": r["clicks"],
"purchases": r["purchases"],
"revenue": r["revenue"],
"note": r["note"],
"created_at": r["created_at"],
}
def add_commission(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""INSERT INTO commissions (post_id, month, clicks, purchases, revenue, note)
VALUES (?, ?, ?, ?, ?, ?)""",
(
data.get("post_id"),
data.get("month", ""),
data.get("clicks", 0),
data.get("purchases", 0),
data.get("revenue", 0),
data.get("note", ""),
),
)
row = conn.execute(
"SELECT * FROM commissions WHERE rowid = last_insert_rowid()"
).fetchone()
return _comm_row_to_dict(row)
def get_commissions(post_id: Optional[int] = None, limit: int = 100) -> List[Dict[str, Any]]:
with _conn() as conn:
if post_id:
rows = conn.execute(
"SELECT * FROM commissions WHERE post_id = ? ORDER BY month DESC LIMIT ?",
(post_id, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM commissions ORDER BY month DESC LIMIT ?", (limit,)
).fetchall()
return [_comm_row_to_dict(r) for r in rows]
def update_commission(comm_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
with _conn() as conn:
fields = []
values = []
for k in ("month", "clicks", "purchases", "revenue", "note"):
if k in data:
fields.append(f"{k} = ?")
values.append(data[k])
if not fields:
return None
values.append(comm_id)
conn.execute(
f"UPDATE commissions SET {', '.join(fields)} WHERE id = ?", values
)
row = conn.execute(
"SELECT * FROM commissions WHERE id = ?", (comm_id,)
).fetchone()
return _comm_row_to_dict(row) if row else None
def delete_commission(comm_id: int) -> bool:
with _conn() as conn:
row = conn.execute(
"SELECT id FROM commissions WHERE id = ?", (comm_id,)
).fetchone()
if not row:
return False
conn.execute("DELETE FROM commissions WHERE id = ?", (comm_id,))
return True
# ── brand_links CRUD ────────────────────────────────────────────────────────
def _bl_row_to_dict(r) -> Dict[str, Any]:
return {
"id": r["id"],
"post_id": r["post_id"],
"keyword_id": r["keyword_id"],
"url": r["url"],
"product_name": r["product_name"],
"description": r["description"],
"placement_hint": r["placement_hint"],
"created_at": r["created_at"],
}
def add_brand_link(data: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"""INSERT INTO brand_links (post_id, keyword_id, url, product_name, description, placement_hint)
VALUES (?, ?, ?, ?, ?, ?)""",
(
data.get("post_id"),
data.get("keyword_id"),
data.get("url", ""),
data.get("product_name", ""),
data.get("description", ""),
data.get("placement_hint", ""),
),
)
row = conn.execute(
"SELECT * FROM brand_links WHERE rowid = last_insert_rowid()"
).fetchone()
return _bl_row_to_dict(row)
def get_brand_links(
post_id: Optional[int] = None,
keyword_id: Optional[int] = None,
) -> List[Dict[str, Any]]:
with _conn() as conn:
if post_id is not None:
rows = conn.execute(
"SELECT * FROM brand_links WHERE post_id = ? ORDER BY id", (post_id,)
).fetchall()
elif keyword_id is not None:
rows = conn.execute(
"SELECT * FROM brand_links WHERE keyword_id = ? ORDER BY id", (keyword_id,)
).fetchall()
else:
rows = conn.execute("SELECT * FROM brand_links ORDER BY id DESC LIMIT 100").fetchall()
return [_bl_row_to_dict(r) for r in rows]
def update_brand_link(link_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
with _conn() as conn:
fields = []
values = []
for k in ("post_id", "keyword_id", "url", "product_name", "description", "placement_hint"):
if k in data:
fields.append(f"{k} = ?")
values.append(data[k])
if not fields:
row = conn.execute("SELECT * FROM brand_links WHERE id = ?", (link_id,)).fetchone()
return _bl_row_to_dict(row) if row else None
values.append(link_id)
conn.execute(f"UPDATE brand_links SET {', '.join(fields)} WHERE id = ?", values)
row = conn.execute("SELECT * FROM brand_links WHERE id = ?", (link_id,)).fetchone()
return _bl_row_to_dict(row) if row else None
def delete_brand_link(link_id: int) -> bool:
with _conn() as conn:
row = conn.execute("SELECT id FROM brand_links WHERE id = ?", (link_id,)).fetchone()
if not row:
return False
conn.execute("DELETE FROM brand_links WHERE id = ?", (link_id,))
return True
def link_brand_links_to_post(keyword_id: int, post_id: int) -> None:
"""keyword_id로 등록된 링크들을 post_id에도 연결."""
with _conn() as conn:
conn.execute(
"UPDATE brand_links SET post_id = ? WHERE keyword_id = ? AND post_id IS NULL",
(post_id, keyword_id),
)
def get_dashboard_stats() -> Dict[str, Any]:
"""대시보드 집계: 총 포스트/클릭/구매/수익 + 월별 추이."""
with _conn() as conn:
total_posts = conn.execute("SELECT COUNT(*) FROM blog_posts").fetchone()[0]
published = conn.execute(
"SELECT COUNT(*) FROM blog_posts WHERE status = 'published'"
).fetchone()[0]
agg = conn.execute(
"SELECT COALESCE(SUM(clicks),0), COALESCE(SUM(purchases),0), COALESCE(SUM(revenue),0) FROM commissions"
).fetchone()
monthly = conn.execute(
"""SELECT month, SUM(clicks) as clicks, SUM(purchases) as purchases, SUM(revenue) as revenue
FROM commissions GROUP BY month ORDER BY month DESC LIMIT 12"""
).fetchall()
top_posts = conn.execute(
"""SELECT bp.id, bp.title, COALESCE(SUM(c.revenue),0) as total_revenue
FROM blog_posts bp LEFT JOIN commissions c ON c.post_id = bp.id
GROUP BY bp.id ORDER BY total_revenue DESC LIMIT 5"""
).fetchall()
return {
"total_posts": total_posts,
"published_posts": published,
"total_clicks": agg[0],
"total_purchases": agg[1],
"total_revenue": agg[2],
"monthly": [
{"month": r["month"], "clicks": r["clicks"], "purchases": r["purchases"], "revenue": r["revenue"]}
for r in monthly
],
"top_posts": [
{"id": r["id"], "title": r["title"], "total_revenue": r["total_revenue"]}
for r in top_posts
],
}
# ── generation_tasks CRUD ────────────────────────────────────────────────────
def _task_row_to_dict(r) -> Dict[str, Any]:
return {
"task_id": r["id"],
"type": r["type"],
"status": r["status"],
"progress": r["progress"],
"message": r["message"],
"result_id": r["result_id"],
"error": r["error"],
"params": json.loads(r["params"]) if r["params"] else {},
"created_at": r["created_at"],
"updated_at": r["updated_at"],
}
def create_task(task_id: str, task_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO generation_tasks (id, type, params) VALUES (?, ?, ?)",
(task_id, task_type, json.dumps(params, ensure_ascii=False)),
)
row = conn.execute(
"SELECT * FROM generation_tasks WHERE id = ?", (task_id,)
).fetchone()
return _task_row_to_dict(row)
def update_task(
task_id: str,
status: str,
progress: int,
message: str,
result_id: Optional[int] = None,
error: Optional[str] = None,
) -> None:
with _conn() as conn:
conn.execute(
"""UPDATE generation_tasks
SET status = ?, progress = ?, message = ?, result_id = ?, error = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id = ?""",
(status, progress, message, result_id, error, task_id),
)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute(
"SELECT * FROM generation_tasks WHERE id = ?", (task_id,)
).fetchone()
return _task_row_to_dict(row) if row else None
# ── prompt_templates CRUD ────────────────────────────────────────────────────
def get_template(name: str) -> Optional[str]:
with _conn() as conn:
row = conn.execute(
"SELECT template FROM prompt_templates WHERE name = ?", (name,)
).fetchone()
return row["template"] if row else None
def get_all_templates() -> List[Dict[str, Any]]:
with _conn() as conn:
rows = conn.execute("SELECT * FROM prompt_templates ORDER BY name").fetchall()
return [
{"id": r["id"], "name": r["name"], "description": r["description"],
"template": r["template"], "updated_at": r["updated_at"]}
for r in rows
]
def update_template(name: str, template: str) -> bool:
with _conn() as conn:
conn.execute(
"UPDATE prompt_templates SET template = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE name = ?",
(template, name),
)
return conn.execute(
"SELECT id FROM prompt_templates WHERE name = ?", (name,)
).fetchone() is not None

View File

@@ -1,440 +0,0 @@
import os
import uuid
import logging
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from .config import CORS_ALLOW_ORIGINS, NAVER_CLIENT_ID, ANTHROPIC_API_KEY
from .db import (
init_db,
get_keyword_analyses, get_keyword_analysis, delete_keyword_analysis,
add_keyword_analysis,
get_posts, get_post, add_post, update_post, delete_post,
get_commissions, add_commission, update_commission, delete_commission,
get_dashboard_stats,
get_task, create_task, update_task,
add_brand_link, get_brand_links, update_brand_link, delete_brand_link,
link_brand_links_to_post,
)
from .naver_search import analyze_keyword_with_crawling
from .content_generator import generate_trend_brief, generate_blog_post, regenerate_blog_post
from .quality_reviewer import review_post
from .marketer import enhance_for_conversion
logger = logging.getLogger(__name__)
app = FastAPI()
_cors_origins = CORS_ALLOW_ORIGINS.split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
def on_startup():
init_db()
os.makedirs("/app/data", exist_ok=True)
@app.get("/health")
def health():
return {"ok": True}
@app.get("/api/blog-marketing/status")
def service_status():
"""서비스 상태 및 설정 현황."""
return {
"ok": True,
"naver_api": bool(NAVER_CLIENT_ID),
"claude_api": bool(ANTHROPIC_API_KEY),
}
# ── 키워드 분석 API ──────────────────────────────────────────────────────────
class ResearchRequest(BaseModel):
keyword: str
def _run_research(task_id: str, keyword: str):
"""BackgroundTask: 네이버 검색 → 키워드 분석 → DB 저장."""
try:
update_task(task_id, "processing", 30, "네이버 검색 중...")
result = analyze_keyword_with_crawling(keyword)
update_task(task_id, "processing", 80, "분석 결과 저장 중...")
saved = add_keyword_analysis(result)
update_task(task_id, "succeeded", 100, "분석 완료", result_id=saved["id"])
except Exception as e:
logger.exception("Research failed for keyword=%s", keyword)
update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/blog-marketing/research")
def start_research(req: ResearchRequest, background_tasks: BackgroundTasks):
"""키워드 분석 시작 (BackgroundTask). task_id 즉시 반환."""
if not NAVER_CLIENT_ID:
raise HTTPException(status_code=400, detail="Naver API 키가 설정되지 않았습니다")
if not req.keyword.strip():
raise HTTPException(status_code=400, detail="키워드를 입력하세요")
task_id = str(uuid.uuid4())
create_task(task_id, "research", {"keyword": req.keyword.strip()})
background_tasks.add_task(_run_research, task_id, req.keyword.strip())
return {"task_id": task_id}
@app.get("/api/blog-marketing/research/history")
def list_research(limit: int = Query(30, ge=1, le=100)):
return {"analyses": get_keyword_analyses(limit)}
@app.get("/api/blog-marketing/research/{analysis_id}")
def get_research(analysis_id: int):
result = get_keyword_analysis(analysis_id)
if not result:
raise HTTPException(status_code=404, detail="Analysis not found")
return result
@app.delete("/api/blog-marketing/research/{analysis_id}")
def remove_research(analysis_id: int):
if not delete_keyword_analysis(analysis_id):
raise HTTPException(status_code=404, detail="Analysis not found")
return {"ok": True}
# ── 작업 상태 폴링 API ──────────────────────────────────────────────────────
@app.get("/api/blog-marketing/task/{task_id}")
def get_task_status(task_id: str):
task = get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
# ── AI 글 생성 API ──────────────────────────────────────────────────────────
class GenerateRequest(BaseModel):
keyword_id: int # keyword_analyses.id
class LinkRequest(BaseModel):
url: str
product_name: str
keyword_id: Optional[int] = None
post_id: Optional[int] = None
description: str = ""
placement_hint: str = ""
def _run_generate(task_id: str, keyword_id: int):
"""BackgroundTask: 트렌드 브리프 → 블로그 글 생성 → DB 저장."""
try:
analysis = get_keyword_analysis(keyword_id)
if not analysis:
update_task(task_id, "failed", 0, "", error="키워드 분석 결과를 찾을 수 없습니다")
return
# 연결된 브랜드커넥트 링크 조회
brand_links = get_brand_links(keyword_id=keyword_id)
update_task(task_id, "processing", 20, "트렌드 브리프 생성 중...")
trend_brief = generate_trend_brief(analysis)
update_task(task_id, "processing", 60, "블로그 글 작성 중...")
post_data = generate_blog_post(analysis, trend_brief, brand_links=brand_links)
update_task(task_id, "processing", 90, "저장 중...")
saved = add_post({
"keyword_id": keyword_id,
"title": post_data["title"],
"body": post_data["body"],
"excerpt": post_data["excerpt"],
"tags": post_data["tags"],
"status": "draft",
"trend_brief": trend_brief,
})
# keyword_id에 연결된 링크를 post_id에도 연결
link_brand_links_to_post(keyword_id=keyword_id, post_id=saved["id"])
update_task(task_id, "succeeded", 100, "글 생성 완료", result_id=saved["id"])
except Exception as e:
logger.exception("Generate failed for keyword_id=%s", keyword_id)
update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/blog-marketing/generate")
def start_generate(req: GenerateRequest, background_tasks: BackgroundTasks):
"""AI 블로그 글 생성 시작. task_id 즉시 반환."""
if not ANTHROPIC_API_KEY:
raise HTTPException(status_code=400, detail="Claude API 키가 설정되지 않았습니다")
analysis = get_keyword_analysis(req.keyword_id)
if not analysis:
raise HTTPException(status_code=404, detail="키워드 분석 결과를 찾을 수 없습니다")
task_id = str(uuid.uuid4())
create_task(task_id, "generate", {"keyword_id": req.keyword_id})
background_tasks.add_task(_run_generate, task_id, req.keyword_id)
return {"task_id": task_id}
# ── 품질 리뷰 API ───────────────────────────────────────────────────────────
def _run_review(task_id: str, post_id: int):
"""BackgroundTask: 블로그 글 품질 리뷰."""
try:
post = get_post(post_id)
if not post:
update_task(task_id, "failed", 0, "", error="포스트를 찾을 수 없습니다")
return
update_task(task_id, "processing", 50, "품질 리뷰 중...")
result = review_post(post["title"], post["body"])
update_post(post_id, {
"review_score": result["total"],
"review_detail": result,
"status": "reviewed" if result["pass"] else "draft",
})
update_task(task_id, "succeeded", 100, "리뷰 완료", result_id=post_id)
except Exception as e:
logger.exception("Review failed for post_id=%s", post_id)
update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/blog-marketing/review/{post_id}")
def start_review(post_id: int, background_tasks: BackgroundTasks):
"""블로그 글 품질 리뷰 시작. task_id 즉시 반환."""
if not ANTHROPIC_API_KEY:
raise HTTPException(status_code=400, detail="Claude API 키가 설정되지 않았습니다")
post = get_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
task_id = str(uuid.uuid4())
create_task(task_id, "review", {"post_id": post_id})
background_tasks.add_task(_run_review, task_id, post_id)
return {"task_id": task_id}
# ── 재생성 API ───────────────────────────────────────────────────────────────
def _run_regenerate(task_id: str, post_id: int):
"""BackgroundTask: 피드백 기반 블로그 글 재생성."""
try:
post = get_post(post_id)
if not post:
update_task(task_id, "failed", 0, "", error="포스트를 찾을 수 없습니다")
return
analysis = get_keyword_analysis(post["keyword_id"]) if post["keyword_id"] else {}
feedback = post.get("review_detail", {}).get("feedback", "개선이 필요합니다")
update_task(task_id, "processing", 50, "글 재생성 중...")
result = regenerate_blog_post(
analysis or {"keyword": ""},
post.get("trend_brief", ""),
post["body"],
feedback,
)
update_post(post_id, {
"title": result["title"],
"body": result["body"],
"excerpt": result["excerpt"],
"tags": result["tags"],
"status": "draft",
"review_score": None,
"review_detail": {},
})
update_task(task_id, "succeeded", 100, "재생성 완료", result_id=post_id)
except Exception as e:
logger.exception("Regenerate failed for post_id=%s", post_id)
update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/blog-marketing/regenerate/{post_id}")
def start_regenerate(post_id: int, background_tasks: BackgroundTasks):
"""피드백 기반 블로그 글 재생성. task_id 즉시 반환."""
if not ANTHROPIC_API_KEY:
raise HTTPException(status_code=400, detail="Claude API 키가 설정되지 않았습니다")
post = get_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
task_id = str(uuid.uuid4())
create_task(task_id, "regenerate", {"post_id": post_id})
background_tasks.add_task(_run_regenerate, task_id, post_id)
return {"task_id": task_id}
# ── 포스트 CRUD API ──────────────────────────────────────────────────────────
@app.get("/api/blog-marketing/posts")
def list_posts(status: str = None, limit: int = Query(50, ge=1, le=100)):
return {"posts": get_posts(status=status, limit=limit)}
@app.get("/api/blog-marketing/posts/{post_id}")
def get_post_detail(post_id: int):
post = get_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return post
@app.put("/api/blog-marketing/posts/{post_id}")
def edit_post(post_id: int, data: dict):
result = update_post(post_id, data)
if not result:
raise HTTPException(status_code=404, detail="Post not found")
return result
@app.delete("/api/blog-marketing/posts/{post_id}")
def remove_post(post_id: int):
if not delete_post(post_id):
raise HTTPException(status_code=404, detail="Post not found")
return {"ok": True}
@app.post("/api/blog-marketing/posts/{post_id}/publish")
def publish_post(post_id: int, data: dict = None):
"""네이버 URL 등록 + 상태를 published로 변경."""
naver_url = (data or {}).get("naver_url", "")
result = update_post(post_id, {"status": "published", "naver_url": naver_url})
if not result:
raise HTTPException(status_code=404, detail="Post not found")
return result
# ── 브랜드커넥트 링크 API ──────────────────────────────────────────────────
@app.post("/api/blog-marketing/links", status_code=201)
def create_link(req: LinkRequest):
return add_brand_link(req.model_dump())
@app.get("/api/blog-marketing/links")
def list_links(post_id: int = None, keyword_id: int = None):
return {"links": get_brand_links(post_id=post_id, keyword_id=keyword_id)}
@app.put("/api/blog-marketing/links/{link_id}")
def edit_link(link_id: int, data: dict):
result = update_brand_link(link_id, data)
if not result:
raise HTTPException(status_code=404, detail="Link not found")
return result
@app.delete("/api/blog-marketing/links/{link_id}")
def remove_link(link_id: int):
if not delete_brand_link(link_id):
raise HTTPException(status_code=404, detail="Link not found")
return {"ok": True}
# ── 마케터 API ──────────────────────────────────────────────────────────────
def _run_market(task_id: str, post_id: int):
"""BackgroundTask: 마케터 전환율 강화."""
try:
post = get_post(post_id)
if not post:
update_task(task_id, "failed", 0, "", error="포스트를 찾을 수 없습니다")
return
brand_links = get_brand_links(post_id=post_id)
if not brand_links and post.get("keyword_id"):
brand_links = get_brand_links(keyword_id=post["keyword_id"])
if not brand_links:
update_task(task_id, "failed", 0, "", error="브랜드커넥트 링크가 없습니다. 먼저 링크를 등록하세요.")
return
analysis = get_keyword_analysis(post["keyword_id"]) if post.get("keyword_id") else {}
keyword = (analysis or {}).get("keyword", "")
update_task(task_id, "processing", 50, "마케터가 전환율 강화 중...")
result = enhance_for_conversion(
post_body=post["body"],
post_title=post["title"],
brand_links=brand_links,
keyword=keyword,
)
update_post(post_id, {
"title": result["title"],
"body": result["body"],
"excerpt": result["excerpt"],
"status": "marketed",
})
update_task(task_id, "succeeded", 100, "마케팅 강화 완료", result_id=post_id)
except Exception as e:
logger.exception("Market failed for post_id=%s", post_id)
update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/blog-marketing/market/{post_id}")
def start_market(post_id: int, background_tasks: BackgroundTasks):
"""마케터 단계 실행. task_id 즉시 반환."""
if not ANTHROPIC_API_KEY:
raise HTTPException(status_code=400, detail="Claude API 키가 설정되지 않았습니다")
post = get_post(post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
task_id = str(uuid.uuid4())
create_task(task_id, "market", {"post_id": post_id})
background_tasks.add_task(_run_market, task_id, post_id)
return {"task_id": task_id}
# ── 수익 추적 API ────────────────────────────────────────────────────────────
@app.get("/api/blog-marketing/commissions")
def list_commissions(post_id: int = None, limit: int = Query(100, ge=1, le=100)):
return {"commissions": get_commissions(post_id=post_id, limit=limit)}
@app.post("/api/blog-marketing/commissions", status_code=201)
def create_commission(data: dict):
return add_commission(data)
@app.put("/api/blog-marketing/commissions/{comm_id}")
def edit_commission(comm_id: int, data: dict):
result = update_commission(comm_id, data)
if not result:
raise HTTPException(status_code=404, detail="Commission not found")
return result
@app.delete("/api/blog-marketing/commissions/{comm_id}")
def remove_commission(comm_id: int):
if not delete_commission(comm_id):
raise HTTPException(status_code=404, detail="Commission not found")
return {"ok": True}
# ── 대시보드 API ─────────────────────────────────────────────────────────────
@app.get("/api/blog-marketing/dashboard")
def dashboard():
return get_dashboard_stats()

View File

@@ -1,105 +0,0 @@
"""마케터 단계 — 전환율 강화 + 브랜드커넥트 링크 삽입."""
import json
import logging
from datetime import date
from typing import Any, Dict, List, Optional
import anthropic
from .config import ANTHROPIC_API_KEY, CLAUDE_MODEL
from .db import get_template
logger = logging.getLogger(__name__)
_client: Optional[anthropic.Anthropic] = None
def _get_client() -> anthropic.Anthropic:
global _client
if _client is None:
_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
return _client
def _call_claude(prompt: str, max_tokens: int = 8192) -> str:
client = _get_client()
today = date.today().isoformat()
resp = client.messages.create(
model=CLAUDE_MODEL,
max_tokens=max_tokens,
system=f"현재 날짜는 {today}입니다. 모든 콘텐츠는 이 날짜 기준으로 작성하세요.",
messages=[{"role": "user", "content": prompt}],
)
return resp.content[0].text
def enhance_for_conversion(
post_body: str,
post_title: str,
brand_links: List[Dict[str, Any]],
keyword: str,
) -> Dict[str, str]:
"""초안에 제휴 링크를 자연스럽게 삽입하고 전환율을 강화.
Args:
post_body: 작가 초안 HTML 본문
post_title: 작가 초안 제목
brand_links: 브랜드커넥트 링크 리스트
keyword: 타겟 키워드
Returns:
{"title": str, "body": str, "excerpt": str}
Raises:
ValueError: 브랜드 링크가 없을 때
"""
if not brand_links:
raise ValueError("브랜드커넥트 링크가 필요합니다")
template = get_template("marketer_enhance")
if not template:
raise RuntimeError("marketer_enhance 템플릿이 없습니다")
brand_links_text = ""
for i, link in enumerate(brand_links, 1):
brand_links_text += (
f"{i}. 상품명: {link.get('product_name', '')}\n"
f" 설명: {link.get('description', '')}\n"
f" URL: {link.get('url', '')}\n"
f" 배치 힌트: {link.get('placement_hint', '자연스럽게')}\n\n"
)
prompt = template.format(
draft_body=post_body[:6000],
keyword=keyword,
brand_links_info=brand_links_text,
)
prompt += (
"\n\n---\n"
"응답은 반드시 아래 JSON 형식으로 해주세요 (JSON만 출력):\n"
'{"title": "개선된 제목", "body": "개선된 HTML 본문", "excerpt": "2줄 요약"}'
)
raw = _call_claude(prompt)
try:
text = raw.strip()
if text.startswith("```"):
lines = text.split("\n")
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines)
result = json.loads(text)
return {
"title": result.get("title", post_title),
"body": result.get("body", post_body),
"excerpt": result.get("excerpt", ""),
}
except (json.JSONDecodeError, KeyError):
logger.warning("Marketer JSON parse failed, using raw text")
return {
"title": post_title,
"body": raw,
"excerpt": raw[:200],
}

View File

@@ -1,203 +0,0 @@
"""네이버 검색 API 연동 — 블로그 + 쇼핑 검색."""
import asyncio
import logging
import re
import requests
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
from .config import NAVER_CLIENT_ID, NAVER_CLIENT_SECRET
BLOG_URL = "https://openapi.naver.com/v1/search/blog.json"
SHOP_URL = "https://openapi.naver.com/v1/search/shop.json"
_HEADERS = {
"X-Naver-Client-Id": NAVER_CLIENT_ID,
"X-Naver-Client-Secret": NAVER_CLIENT_SECRET,
}
_TAG_RE = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
return _TAG_RE.sub("", text).strip()
def search_blog(keyword: str, display: int = 10, sort: str = "sim") -> Dict[str, Any]:
"""네이버 블로그 검색.
Args:
keyword: 검색 키워드
display: 결과 수 (1-100)
sort: sim(정확도) | date(날짜)
Returns:
{"total": int, "items": [...]}
"""
resp = requests.get(
BLOG_URL,
headers=_HEADERS,
params={"query": keyword, "display": display, "sort": sort},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
items = [
{
"title": _strip_html(item.get("title", "")),
"description": _strip_html(item.get("description", "")),
"link": item.get("link", ""),
"bloggername": item.get("bloggername", ""),
"postdate": item.get("postdate", ""),
}
for item in data.get("items", [])
]
return {"total": data.get("total", 0), "items": items}
def search_shopping(keyword: str, display: int = 20, sort: str = "sim") -> Dict[str, Any]:
"""네이버 쇼핑 검색.
Args:
keyword: 검색 키워드
display: 결과 수 (1-100)
sort: sim(정확도) | date(날짜) | asc(가격↑) | dsc(가격↓)
Returns:
{"total": int, "items": [...], "price_stats": {...}}
"""
resp = requests.get(
SHOP_URL,
headers=_HEADERS,
params={"query": keyword, "display": display, "sort": sort},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
items = []
prices = []
for item in data.get("items", []):
lprice = _safe_int(item.get("lprice"))
hprice = _safe_int(item.get("hprice"))
parsed = {
"title": _strip_html(item.get("title", "")),
"link": item.get("link", ""),
"image": item.get("image", ""),
"lprice": lprice,
"hprice": hprice,
"mallName": item.get("mallName", ""),
"productId": item.get("productId", ""),
"productType": item.get("productType", ""),
"category1": item.get("category1", ""),
"category2": item.get("category2", ""),
"category3": item.get("category3", ""),
"brand": item.get("brand", ""),
"maker": item.get("maker", ""),
}
items.append(parsed)
if lprice and lprice > 0:
prices.append(lprice)
price_stats = None
if prices:
price_stats = {
"min": min(prices),
"max": max(prices),
"avg": int(sum(prices) / len(prices)),
"count": len(prices),
}
return {
"total": data.get("total", 0),
"items": items,
"price_stats": price_stats,
}
def _safe_int(val) -> Optional[int]:
if val is None:
return None
try:
return int(val)
except (ValueError, TypeError):
return None
def analyze_keyword(keyword: str) -> Dict[str, Any]:
"""키워드 경쟁도/기회 분석.
블로그 총 결과수, 쇼핑 총 결과수, 가격 통계를 기반으로
competition_score(경쟁도)와 opportunity_score(기회점수) 산출.
Returns:
{
"keyword", "blog_total", "shop_total",
"competition", "opportunity",
"avg_price", "min_price", "max_price",
"top_products": [...], "top_blogs": [...]
}
"""
blog = search_blog(keyword, display=10, sort="sim")
shop = search_shopping(keyword, display=20, sort="sim")
blog_total = blog["total"]
shop_total = shop["total"]
# 경쟁도: 블로그 결과 수 기반 (로그 스케일 0-100)
import math
if blog_total > 0:
competition = min(100, int(math.log10(blog_total + 1) * 15))
else:
competition = 0
# 기회 점수: 쇼핑 수요가 높고 블로그 경쟁이 낮을수록 높음
if shop_total > 0 and blog_total > 0:
ratio = shop_total / blog_total
opportunity = min(100, int(ratio * 20))
elif shop_total > 0:
opportunity = 90 # 경쟁 없이 수요만 있으면 높은 기회
else:
opportunity = 10 # 쇼핑 수요 없음
price_stats = shop.get("price_stats") or {}
return {
"keyword": keyword,
"blog_total": blog_total,
"shop_total": shop_total,
"competition": competition,
"opportunity": opportunity,
"avg_price": price_stats.get("avg"),
"min_price": price_stats.get("min"),
"max_price": price_stats.get("max"),
"top_products": shop["items"][:5],
"top_blogs": blog["items"][:5],
}
def _run_enrich(top_blogs: list) -> list:
"""동기 컨텍스트에서 비동기 enrich_top_blogs 실행."""
from .web_crawler import enrich_top_blogs
try:
loop = asyncio.get_event_loop()
if loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
return pool.submit(
asyncio.run, enrich_top_blogs(top_blogs)
).result(timeout=60)
else:
return asyncio.run(enrich_top_blogs(top_blogs))
except Exception as e:
logger.warning("블로그 크롤링 실패, 기존 데이터 사용: %s", e)
return top_blogs
def analyze_keyword_with_crawling(keyword: str) -> Dict[str, Any]:
"""analyze_keyword + 상위 블로그 본문 크롤링."""
result = analyze_keyword(keyword)
result["top_blogs"] = _run_enrich(result["top_blogs"])
return result

View File

@@ -1,85 +0,0 @@
"""Claude API 기반 블로그 글 품질 리뷰 — 6기준 × 10점, 42/60 통과."""
import json
import logging
from datetime import date
from typing import Any, Dict, Optional
import anthropic
from .config import ANTHROPIC_API_KEY, CLAUDE_MODEL
from .db import get_template
logger = logging.getLogger(__name__)
PASS_THRESHOLD = 42 # 60점 만점 중 42점 이상이면 통과 (70%)
_client: Optional[anthropic.Anthropic] = None
def _get_client() -> anthropic.Anthropic:
global _client
if _client is None:
_client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
return _client
def review_post(title: str, body: str) -> Dict[str, Any]:
"""블로그 글 품질 리뷰.
Returns:
{
"scores": {
"empathy": N, "click_appeal": N, "conversion": N,
"seo": N, "format": N, "link_natural": N
},
"total": N,
"pass": bool,
"feedback": str
}
"""
template = get_template("quality_review")
if not template:
raise RuntimeError("quality_review 템플릿이 없습니다")
prompt = template.format(title=title, body=body[:6000])
client = _get_client()
today = date.today().isoformat()
resp = client.messages.create(
model=CLAUDE_MODEL,
max_tokens=2048,
system=f"현재 날짜는 {today}입니다.",
messages=[{"role": "user", "content": prompt}],
)
raw = resp.content[0].text
try:
text = raw.strip()
if text.startswith("```"):
lines = text.split("\n")
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines)
result = json.loads(text)
scores = result.get("scores", {})
total = sum(scores.values())
passed = total >= PASS_THRESHOLD
return {
"scores": scores,
"total": total,
"pass": passed,
"feedback": result.get("feedback", ""),
}
except (json.JSONDecodeError, KeyError, TypeError) as e:
logger.warning("Quality review JSON parse failed: %s", e)
return {
"scores": {
"empathy": 0, "click_appeal": 0, "conversion": 0,
"seo": 0, "format": 0, "link_natural": 0,
},
"total": 0,
"pass": False,
"feedback": f"리뷰 파싱 실패. 원본 응답:\n{raw[:500]}",
}

View File

@@ -1,97 +0,0 @@
"""네이버 블로그 본문 크롤링 모듈."""
import asyncio
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
_TIMEOUT = 10 # 글당 크롤링 타임아웃 (초)
_MAX_CONTENT_LENGTH = 2000 # 본문 최대 길이
# 네이버 블로그 URL 패턴: blog.naver.com/{blogId}/{logNo}
_BLOG_URL_RE = re.compile(r"blog\.naver\.com/([^/]+)/(\d+)")
def _parse_naver_blog_url(url: str) -> Optional[Tuple[str, str]]:
"""네이버 블로그 URL에서 blogId, logNo 추출. 실패 시 None."""
match = _BLOG_URL_RE.search(url)
if not match:
return None
return match.group(1), match.group(2)
async def _fetch_html(url: str) -> str:
"""URL에서 HTML을 가져온다."""
async with httpx.AsyncClient(timeout=_TIMEOUT, follow_redirects=True) as client:
resp = await client.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
})
resp.raise_for_status()
return resp.text
def _extract_text(html: str) -> str:
"""HTML에서 본문 텍스트를 추출한다."""
soup = BeautifulSoup(html, "html.parser")
# 스마트에디터 3 (SE3)
container = soup.select_one("div.se-main-container")
if not container:
# 구 에디터
container = soup.select_one("div#postViewArea")
if not container:
# 폴백: body 전체
container = soup.body
if not container:
return ""
# 스크립트/스타일 제거
for tag in container.find_all(["script", "style"]):
tag.decompose()
text = container.get_text(separator="\n", strip=True)
return text[:_MAX_CONTENT_LENGTH]
async def crawl_blog_content(url: str) -> str:
"""네이버 블로그 URL에서 본문 텍스트 추출.
- 네이버 블로그가 아니면 빈 문자열
- 크롤링 실패 시 빈 문자열 (에러 로그만)
- 본문 최대 2,000자
"""
parsed = _parse_naver_blog_url(url)
if not parsed:
return ""
blog_id, log_no = parsed
# iframe 내부 실제 본문 URL
post_url = f"https://blog.naver.com/PostView.naver?blogId={blog_id}&logNo={log_no}"
try:
html = await _fetch_html(post_url)
return _extract_text(html)
except Exception as e:
logger.warning("블로그 크롤링 실패 (%s): %s", url, e)
return ""
async def enrich_top_blogs(top_blogs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""top_blogs 리스트 각 항목에 content 필드를 추가.
개별 크롤링 실패 시 해당 항목의 content를 빈 문자열로 설정하고 나머지 계속 진행.
"""
result = []
for blog in top_blogs:
enriched = dict(blog)
try:
enriched["content"] = await crawl_blog_content(blog.get("link", ""))
except Exception:
enriched["content"] = ""
result.append(enriched)
return result

View File

@@ -1,3 +0,0 @@
[pytest]
asyncio_mode = auto
pythonpath = .

View File

@@ -1,6 +0,0 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
requests==2.32.3
anthropic==0.52.0
beautifulsoup4>=4.12
httpx>=0.27

View File

@@ -1,9 +0,0 @@
"""공통 테스트 픽스처."""
import os
import sys
# app 패키지를 blog_lab_app으로도 import 가능하게
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
if "blog_lab_app" not in sys.modules:
import app as blog_lab_app
sys.modules["blog_lab_app"] = blog_lab_app

View File

@@ -1,85 +0,0 @@
"""브랜드커넥트 링크 API 테스트."""
import os
import pytest
from fastapi.testclient import TestClient
@pytest.fixture(autouse=True)
def setup_db(tmp_path):
test_db = str(tmp_path / "test.db")
import app.config as config
config.DB_PATH = test_db
from app import db
db.DB_PATH = test_db
db.init_db()
yield
@pytest.fixture
def client():
from app.main import app
return TestClient(app)
def test_create_link(client):
resp = client.post("/api/blog-marketing/links", json={
"keyword_id": 1,
"url": "https://link.coupang.com/abc",
"product_name": "테스트 상품",
"description": "상품 설명",
})
assert resp.status_code == 201
data = resp.json()
assert data["url"] == "https://link.coupang.com/abc"
assert data["product_name"] == "테스트 상품"
def test_create_link_requires_url(client):
resp = client.post("/api/blog-marketing/links", json={
"product_name": "상품",
})
assert resp.status_code == 422
def test_create_link_requires_product_name(client):
resp = client.post("/api/blog-marketing/links", json={
"url": "https://a.com",
})
assert resp.status_code == 422
def test_list_links_by_keyword_id(client):
client.post("/api/blog-marketing/links", json={
"keyword_id": 1, "url": "https://a.com", "product_name": "A",
})
client.post("/api/blog-marketing/links", json={
"keyword_id": 2, "url": "https://b.com", "product_name": "B",
})
resp = client.get("/api/blog-marketing/links?keyword_id=1")
assert resp.status_code == 200
assert len(resp.json()["links"]) == 1
def test_update_link(client):
create_resp = client.post("/api/blog-marketing/links", json={
"url": "https://a.com", "product_name": "원래",
})
link_id = create_resp.json()["id"]
resp = client.put(f"/api/blog-marketing/links/{link_id}", json={
"product_name": "새이름",
})
assert resp.status_code == 200
assert resp.json()["product_name"] == "새이름"
def test_delete_link(client):
create_resp = client.post("/api/blog-marketing/links", json={
"url": "https://a.com", "product_name": "삭제",
})
link_id = create_resp.json()["id"]
resp = client.delete(f"/api/blog-marketing/links/{link_id}")
assert resp.status_code == 200
assert resp.json()["ok"] is True
resp = client.delete(f"/api/blog-marketing/links/{link_id}")
assert resp.status_code == 404

View File

@@ -1,67 +0,0 @@
"""brand_links DB CRUD 테스트."""
import os
import pytest
from app import db
from app.config import DB_PATH
@pytest.fixture(autouse=True)
def setup_db(tmp_path):
"""테스트용 임시 DB 사용."""
test_db = str(tmp_path / "test.db")
import app.config as config
config.DB_PATH = test_db
db.DB_PATH = test_db
db.init_db()
yield
def test_add_brand_link():
link = db.add_brand_link({
"keyword_id": 1,
"url": "https://link.coupang.com/abc",
"product_name": "테스트 상품",
"description": "상품 설명",
"placement_hint": "본문 중간",
})
assert link["id"] is not None
assert link["url"] == "https://link.coupang.com/abc"
assert link["product_name"] == "테스트 상품"
assert link["keyword_id"] == 1
assert link["post_id"] is None
def test_get_brand_links_by_keyword_id():
db.add_brand_link({"keyword_id": 1, "url": "https://a.com", "product_name": "A"})
db.add_brand_link({"keyword_id": 1, "url": "https://b.com", "product_name": "B"})
db.add_brand_link({"keyword_id": 2, "url": "https://c.com", "product_name": "C"})
links = db.get_brand_links(keyword_id=1)
assert len(links) == 2
def test_get_brand_links_by_post_id():
db.add_brand_link({"post_id": 10, "url": "https://a.com", "product_name": "A"})
links = db.get_brand_links(post_id=10)
assert len(links) == 1
assert links[0]["post_id"] == 10
def test_update_brand_link():
link = db.add_brand_link({"url": "https://a.com", "product_name": "원래 이름"})
updated = db.update_brand_link(link["id"], {"product_name": "새 이름", "post_id": 5})
assert updated["product_name"] == "새 이름"
assert updated["post_id"] == 5
def test_delete_brand_link():
link = db.add_brand_link({"url": "https://a.com", "product_name": "삭제할 링크"})
assert db.delete_brand_link(link["id"]) is True
assert db.delete_brand_link(link["id"]) is False
def test_link_keyword_to_post():
db.add_brand_link({"keyword_id": 1, "url": "https://a.com", "product_name": "A"})
db.add_brand_link({"keyword_id": 1, "url": "https://b.com", "product_name": "B"})
db.link_brand_links_to_post(keyword_id=1, post_id=10)
links = db.get_brand_links(post_id=10)
assert len(links) == 2

View File

@@ -1,74 +0,0 @@
"""평가자 단계 테스트 — 6기준 60점."""
import json
import pytest
from unittest.mock import patch
def test_review_post_has_6_criteria():
"""6개 기준으로 채점하는지 확인."""
from app.quality_reviewer import review_post
mock_response = json.dumps({
"scores": {
"empathy": 8, "click_appeal": 7, "conversion": 9,
"seo": 8, "format": 7, "link_natural": 9,
},
"total": 48,
"pass": True,
"feedback": "전체적으로 우수합니다",
})
with patch("app.quality_reviewer._get_client") as mock_client_fn, \
patch("app.quality_reviewer.get_template", return_value="제목: {title}\n본문: {body}"):
mock_client = mock_client_fn.return_value
mock_client.messages.create.return_value.content = [type("C", (), {"text": mock_response})()]
result = review_post("테스트 제목", "<p>본문</p>")
assert "link_natural" in result["scores"]
assert len(result["scores"]) == 6
assert result["total"] == 48
assert result["pass"] is True
def test_review_pass_threshold_is_42():
"""통과 기준이 42점인지 확인."""
from app.quality_reviewer import PASS_THRESHOLD
assert PASS_THRESHOLD == 42
def test_review_fails_below_42():
"""42점 미만이면 불통과."""
from app.quality_reviewer import review_post
mock_response = json.dumps({
"scores": {
"empathy": 5, "click_appeal": 5, "conversion": 5,
"seo": 5, "format": 5, "link_natural": 5,
},
"total": 30,
"pass": False,
"feedback": "개선 필요",
})
with patch("app.quality_reviewer._get_client") as mock_client_fn, \
patch("app.quality_reviewer.get_template", return_value="제목: {title}\n본문: {body}"):
mock_client = mock_client_fn.return_value
mock_client.messages.create.return_value.content = [type("C", (), {"text": mock_response})()]
result = review_post("제목", "<p>본문</p>")
assert result["pass"] is False
def test_review_handles_parse_failure():
"""JSON 파싱 실패 시 기본값 반환 (6개 기준)."""
from app.quality_reviewer import review_post
with patch("app.quality_reviewer._get_client") as mock_client_fn, \
patch("app.quality_reviewer.get_template", return_value="제목: {title}\n본문: {body}"):
mock_client = mock_client_fn.return_value
mock_client.messages.create.return_value.content = [type("C", (), {"text": "잘못된 응답"})()]
result = review_post("제목", "<p>본문</p>")
assert result["pass"] is False
assert "link_natural" in result["scores"]
assert result["total"] == 0

View File

@@ -1,66 +0,0 @@
"""마케터 단계 테스트."""
import json
import pytest
from unittest.mock import patch
def test_enhance_for_conversion_inserts_links():
"""마케터가 브랜드 링크를 본문에 삽입."""
from app.marketer import enhance_for_conversion
brand_links = [
{"url": "https://link.coupang.com/abc", "product_name": "갤럭시 버즈3",
"description": "노이즈캔슬링", "placement_hint": "본문 중간"},
]
mock_response = json.dumps({
"title": "마케팅된 제목",
"body": '<p>본문 <a href="https://link.coupang.com/abc">갤럭시 버즈3</a></p>',
"excerpt": "요약",
})
with patch("app.marketer._call_claude", return_value=mock_response) as mock_call, \
patch("app.marketer.get_template", return_value="초안: {draft_body}\n키워드: {keyword}\n링크:\n{brand_links_info}"):
result = enhance_for_conversion(
post_body="<p>초안 본문</p>",
post_title="초안 제목",
brand_links=brand_links,
keyword="무선 이어폰",
)
prompt_used = mock_call.call_args[0][0]
assert "갤럭시 버즈3" in prompt_used
assert "노이즈캔슬링" in prompt_used
assert result["title"] == "마케팅된 제목"
def test_enhance_requires_brand_links():
"""브랜드 링크가 없으면 ValueError."""
from app.marketer import enhance_for_conversion
with pytest.raises(ValueError, match="브랜드커넥트 링크가 필요합니다"):
enhance_for_conversion(
post_body="<p>본문</p>",
post_title="제목",
brand_links=[],
keyword="테스트",
)
def test_enhance_json_parse_fallback():
"""JSON 파싱 실패 시 원본 제목 유지."""
from app.marketer import enhance_for_conversion
brand_links = [{"url": "https://a.com", "product_name": "상품"}]
with patch("app.marketer._call_claude", return_value="잘못된 JSON"), \
patch("app.marketer.get_template", return_value="초안: {draft_body}\n키워드: {keyword}\n링크:\n{brand_links_info}"):
result = enhance_for_conversion(
post_body="<p>원본</p>",
post_title="원본 제목",
brand_links=brand_links,
keyword="테스트",
)
assert result["title"] == "원본 제목"
assert result["body"] == "잘못된 JSON"

View File

@@ -1,146 +0,0 @@
"""4단계 파이프라인 통합 테스트."""
import os
import pytest
from unittest.mock import patch
from fastapi.testclient import TestClient
@pytest.fixture(autouse=True)
def setup_db(tmp_path):
test_db = str(tmp_path / "test.db")
import app.config as config
config.DB_PATH = test_db
from app import db
db.DB_PATH = test_db
db.init_db()
yield
@pytest.fixture
def client():
from app.main import app
return TestClient(app)
def test_full_pipeline_status_flow(client):
"""draft → marketed → reviewed → published 상태 흐름."""
from app import db
# 1. 키워드 분석 결과 직접 삽입
analysis = db.add_keyword_analysis({
"keyword": "무선 이어폰",
"blog_total": 1000,
"shop_total": 500,
"competition": 45,
"opportunity": 60,
"top_products": [{"title": "에어팟", "lprice": 200000, "mallName": "애플"}],
"top_blogs": [{"title": "리뷰", "link": "https://blog.naver.com/user/123", "content": "본문"}],
})
# 2. 브랜드 링크 등록
resp = client.post("/api/blog-marketing/links", json={
"keyword_id": analysis["id"],
"url": "https://link.coupang.com/abc",
"product_name": "삼성 버즈3",
"description": "노이즈캔슬링",
})
assert resp.status_code == 201
# 3. 포스트 직접 생성 (generate는 Claude API 필요)
post = db.add_post({
"keyword_id": analysis["id"],
"title": "무선 이어폰 추천",
"body": "<p>초안 본문</p>",
"excerpt": "요약",
"tags": ["이어폰"],
"status": "draft",
})
db.link_brand_links_to_post(keyword_id=analysis["id"], post_id=post["id"])
# 4. 상태 확인: draft
resp = client.get(f"/api/blog-marketing/posts/{post['id']}")
assert resp.json()["status"] == "draft"
# 5. marketed 상태
db.update_post(post["id"], {"status": "marketed", "body": "<p>마케팅된 본문</p>"})
resp = client.get(f"/api/blog-marketing/posts/{post['id']}")
assert resp.json()["status"] == "marketed"
# 6. reviewed 상태 (점수 48/60 = 통과)
db.update_post(post["id"], {
"status": "reviewed",
"review_score": 48,
"review_detail": {
"scores": {"empathy": 8, "click_appeal": 8, "conversion": 8, "seo": 8, "format": 8, "link_natural": 8},
"total": 48, "pass": True, "feedback": "우수"
},
})
resp = client.get(f"/api/blog-marketing/posts/{post['id']}")
assert resp.json()["status"] == "reviewed"
assert resp.json()["review_score"] == 48
# 7. 발행
resp = client.post(f"/api/blog-marketing/posts/{post['id']}/publish", json={
"naver_url": "https://blog.naver.com/mypost/123",
})
assert resp.json()["status"] == "published"
def test_links_associated_with_post(client):
"""keyword_id로 등록한 링크가 post 생성 후 post_id로도 조회 가능."""
from app import db
analysis = db.add_keyword_analysis({"keyword": "테스트", "blog_total": 10, "shop_total": 5})
client.post("/api/blog-marketing/links", json={
"keyword_id": analysis["id"],
"url": "https://link.com/1",
"product_name": "상품1",
})
post = db.add_post({"keyword_id": analysis["id"], "title": "제목", "body": "본문", "status": "draft"})
db.link_brand_links_to_post(keyword_id=analysis["id"], post_id=post["id"])
resp = client.get(f"/api/blog-marketing/links?post_id={post['id']}")
links = resp.json()["links"]
assert len(links) == 1
assert links[0]["product_name"] == "상품1"
@patch("app.main.ANTHROPIC_API_KEY", "fake-key-for-test")
def test_market_endpoint_returns_404_for_missing_post(client):
"""존재하지 않는 post_id로 마케터 호출 시 404."""
resp = client.post("/api/blog-marketing/market/9999")
assert resp.status_code == 404
@patch("app.main.ANTHROPIC_API_KEY", "fake-key-for-test")
def test_review_endpoint_returns_404_for_missing_post(client):
"""존재하지 않는 post_id로 리뷰 호출 시 404."""
resp = client.post("/api/blog-marketing/review/9999")
assert resp.status_code == 404
def test_multiple_links_per_keyword(client):
"""하나의 키워드에 복수 링크 등록 가능."""
from app import db
analysis = db.add_keyword_analysis({"keyword": "테스트", "blog_total": 10, "shop_total": 5})
for i in range(3):
resp = client.post("/api/blog-marketing/links", json={
"keyword_id": analysis["id"],
"url": f"https://link.com/{i}",
"product_name": f"상품{i}",
})
assert resp.status_code == 201
resp = client.get(f"/api/blog-marketing/links?keyword_id={analysis['id']}")
assert len(resp.json()["links"]) == 3
def test_dashboard_still_works(client):
"""대시보드 API가 여전히 정상 작동."""
resp = client.get("/api/blog-marketing/dashboard")
assert resp.status_code == 200
data = resp.json()
assert "total_posts" in data
assert "published_posts" in data

View File

@@ -1,58 +0,0 @@
"""리서치 단계 크롤링 통합 테스트."""
from unittest.mock import patch
def test_analyze_keyword_with_crawling_enriches_top_blogs():
"""analyze_keyword_with_crawling가 top_blogs에 content 필드를 추가."""
from app.naver_search import analyze_keyword_with_crawling
mock_blog_result = {
"total": 100,
"items": [
{"title": "테스트 블로그", "link": "https://blog.naver.com/user1/111",
"bloggername": "유저1", "description": "설명", "postdate": "20260401"},
],
}
mock_shop_result = {
"total": 50,
"items": [{"title": "상품1", "lprice": 10000, "mallName": "쿠팡"}],
"price_stats": {"min": 10000, "max": 10000, "avg": 10000, "count": 1},
}
with patch("app.naver_search.search_blog", return_value=mock_blog_result), \
patch("app.naver_search.search_shopping", return_value=mock_shop_result), \
patch("app.naver_search._run_enrich", return_value=[
{"title": "테스트 블로그", "link": "https://blog.naver.com/user1/111",
"bloggername": "유저1", "description": "설명", "postdate": "20260401",
"content": "크롤링된 본문 내용"}
]):
result = analyze_keyword_with_crawling("테스트 키워드")
assert "content" in result["top_blogs"][0]
assert result["top_blogs"][0]["content"] == "크롤링된 본문 내용"
def test_analyze_keyword_with_crawling_fallback_on_enrich_failure():
"""크롤링 실패 시 기존 데이터 유지."""
from app.naver_search import analyze_keyword_with_crawling
mock_blog_result = {
"total": 50,
"items": [{"title": "블로그", "link": "https://blog.naver.com/u/1", "bloggername": "유저", "description": "설명"}],
}
mock_shop_result = {"total": 10, "items": [], "price_stats": None}
with patch("app.naver_search.search_blog", return_value=mock_blog_result), \
patch("app.naver_search.search_shopping", return_value=mock_shop_result), \
patch("app.naver_search._run_enrich", side_effect=Exception("크롤링 실패")):
# _run_enrich 내부에서 예외를 잡으므로 실제로는 이 테스트에서는
# _run_enrich 자체가 예외를 던지는 상황을 시뮬레이션
# 하지만 _run_enrich는 내부에서 잡으므로, 직접 fallback 테스트
pass
# _run_enrich 자체 fallback 테스트
from app.naver_search import _run_enrich
original_blogs = [{"title": "원본", "link": "https://blog.naver.com/u/1"}]
with patch("app.web_crawler.enrich_top_blogs", side_effect=Exception("fail")):
result = _run_enrich(original_blogs)
assert result == original_blogs # fallback으로 원본 반환

View File

@@ -1,94 +0,0 @@
"""web_crawler 모듈 테스트."""
import pytest
from unittest.mock import patch, AsyncMock
from app.web_crawler import crawl_blog_content, enrich_top_blogs, _parse_naver_blog_url, _extract_text
def test_parse_naver_blog_url_valid():
"""blog.naver.com URL에서 blogId와 logNo를 올바르게 파싱."""
result = _parse_naver_blog_url("https://blog.naver.com/testuser/123456")
assert result == ("testuser", "123456")
def test_parse_returns_none_for_invalid_url():
"""잘못된 URL은 None 반환."""
result = _parse_naver_blog_url("https://example.com/post")
assert result is None
def test_extract_text_prefers_se_main_container():
"""SE3 에디터 컨테이너를 우선 선택."""
html = '<div class="se-main-container"><p>SE3 본문</p></div><div id="postViewArea"><p>구 에디터</p></div>'
assert _extract_text(html) == "SE3 본문"
def test_extract_text_falls_back_to_post_view_area():
"""SE3 없으면 구 에디터 컨테이너 사용."""
html = '<div id="postViewArea"><p>구 에디터 본문</p></div>'
assert _extract_text(html) == "구 에디터 본문"
def test_extract_text_removes_script_and_style():
"""스크립트/스타일 태그 제거."""
html = '<div class="se-main-container"><p>본문</p><script>alert(1)</script><style>.x{}</style></div>'
result = _extract_text(html)
assert "alert" not in result
assert ".x" not in result
assert "본문" in result
def test_extract_text_returns_empty_on_no_container():
"""컨테이너가 없고 body도 없으면 빈 문자열."""
assert _extract_text("") == ""
@pytest.mark.asyncio
async def test_crawl_returns_empty_on_non_naver_url():
"""네이버 블로그가 아닌 URL은 빈 문자열 반환."""
result = await crawl_blog_content("https://example.com/post")
assert result == ""
@pytest.mark.asyncio
async def test_crawl_truncates_to_2000_chars():
"""본문이 2000자를 초과하면 잘라낸다."""
long_html = f'<div class="se-main-container"><p>{"" * 3000}</p></div>'
with patch("app.web_crawler._fetch_html", new_callable=AsyncMock, return_value=long_html):
result = await crawl_blog_content("https://blog.naver.com/testuser/123")
assert len(result) <= 2000
@pytest.mark.asyncio
async def test_crawl_returns_empty_on_fetch_failure():
"""HTTP 요청 실패 시 빈 문자열 반환."""
with patch("app.web_crawler._fetch_html", new_callable=AsyncMock, side_effect=Exception("timeout")):
result = await crawl_blog_content("https://blog.naver.com/testuser/123")
assert result == ""
@pytest.mark.asyncio
async def test_enrich_top_blogs_adds_content_field():
"""enrich_top_blogs가 각 블로그에 content 필드를 추가."""
blogs = [
{"title": "테스트", "link": "https://blog.naver.com/user1/111", "bloggername": "유저1", "description": "설명"},
{"title": "테스트2", "link": "https://blog.naver.com/user2/222", "bloggername": "유저2", "description": "설명2"},
]
with patch("app.web_crawler.crawl_blog_content", new_callable=AsyncMock, return_value="크롤링된 본문"):
result = await enrich_top_blogs(blogs)
assert len(result) == 2
assert result[0]["content"] == "크롤링된 본문"
assert result[1]["content"] == "크롤링된 본문"
@pytest.mark.asyncio
async def test_enrich_top_blogs_handles_partial_failure():
"""일부 크롤링 실패 시에도 나머지는 정상 처리."""
blogs = [
{"title": "성공", "link": "https://blog.naver.com/user1/111"},
{"title": "실패", "link": "https://blog.naver.com/user2/222"},
]
side_effects = ["성공 본문", Exception("fail")]
with patch("app.web_crawler.crawl_blog_content", new_callable=AsyncMock, side_effect=side_effects):
result = await enrich_top_blogs(blogs)
assert result[0]["content"] == "성공 본문"
assert result[1]["content"] == ""

View File

@@ -1,86 +0,0 @@
"""작가 단계 테스트 -- 크롤링 본문 + 링크 참조 글 생성."""
import json
import pytest
from unittest.mock import patch
def test_generate_blog_post_includes_crawled_content():
"""크롤링 본문이 프롬프트에 포함되는지 확인."""
from app.content_generator import generate_blog_post
analysis = {
"keyword": "무선 이어폰",
"top_products": [{"title": "에어팟", "lprice": 200000, "mallName": "애플"}],
"top_blogs": [
{"title": "에어팟 리뷰", "content": "에어팟을 한 달간 써봤는데 음질이 정말 좋았습니다."},
],
}
mock_response = json.dumps({
"title": "무선 이어폰 추천",
"body": "<p>본문</p>",
"excerpt": "요약",
"tags": ["이어폰"],
})
with patch("app.content_generator._call_claude", return_value=mock_response) as mock_call, \
patch("app.content_generator.get_template", return_value=(
"키워드: {keyword}\n참고 블로그:\n{reference_blogs}\n상품: {top_products}\n링크 상품: {brand_products}"
)):
result = generate_blog_post(analysis, "트렌드 브리프", brand_links=[])
prompt_used = mock_call.call_args[0][0]
assert "에어팟을 한 달간 써봤는데" in prompt_used
assert result["title"] == "무선 이어폰 추천"
def test_generate_blog_post_includes_brand_links():
"""브랜드커넥트 링크 정보가 프롬프트에 포함되는지 확인."""
from app.content_generator import generate_blog_post
analysis = {"keyword": "무선 이어폰", "top_products": [], "top_blogs": []}
brand_links = [
{"url": "https://link.coupang.com/abc", "product_name": "삼성 버즈3",
"description": "노이즈캔슬링 지원", "placement_hint": "본문 중간"},
]
mock_response = json.dumps({
"title": "제목", "body": "<p>본문</p>", "excerpt": "요약", "tags": ["태그"],
})
with patch("app.content_generator._call_claude", return_value=mock_response) as mock_call, \
patch("app.content_generator.get_template", return_value=(
"키워드: {keyword}\n참고 블로그:\n{reference_blogs}\n상품: {top_products}\n링크 상품: {brand_products}"
)):
result = generate_blog_post(analysis, "트렌드 브리프", brand_links=brand_links)
prompt_used = mock_call.call_args[0][0]
assert "삼성 버즈3" in prompt_used
assert "노이즈캔슬링 지원" in prompt_used
def test_generate_blog_post_works_without_links():
"""링크 없이도 정상 동작."""
from app.content_generator import generate_blog_post
analysis = {"keyword": "테스트", "top_products": [], "top_blogs": []}
mock_response = json.dumps({
"title": "제목", "body": "<p>본문</p>", "excerpt": "요약", "tags": ["태그"],
})
with patch("app.content_generator._call_claude", return_value=mock_response), \
patch("app.content_generator.get_template", return_value=(
"키워드: {keyword}\n참고 블로그:\n{reference_blogs}\n상품: {top_products}\n링크 상품: {brand_products}"
)):
result = generate_blog_post(analysis, "브리프")
assert result["title"] == "제목"
def test_parse_blog_json_fallback():
"""JSON 파싱 실패 시 원본 텍스트를 body로 사용."""
from app.content_generator import _parse_blog_json
result = _parse_blog_json("잘못된 JSON", "테스트 키워드")
assert result["title"] == "테스트 키워드 추천 리뷰"
assert result["body"] == "잘못된 JSON"