import os import sqlite3 import json from typing import Any, Dict, List, Optional from .config import DB_PATH def _conn() -> sqlite3.Connection: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_db() -> None: with _conn() as conn: # 키워드/상품 분석 결과 conn.execute(""" CREATE TABLE IF NOT EXISTS keyword_analyses ( id INTEGER PRIMARY KEY AUTOINCREMENT, keyword TEXT NOT NULL, blog_total INTEGER NOT NULL DEFAULT 0, shop_total INTEGER NOT NULL DEFAULT 0, competition REAL NOT NULL DEFAULT 0, opportunity REAL NOT NULL DEFAULT 0, avg_price INTEGER, min_price INTEGER, max_price INTEGER, top_products TEXT NOT NULL DEFAULT '[]', top_blogs TEXT NOT NULL DEFAULT '[]', ai_summary TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_ka_created ON keyword_analyses(created_at DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_ka_keyword ON keyword_analyses(keyword)") # 블로그 포스트 conn.execute(""" CREATE TABLE IF NOT EXISTS blog_posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, keyword_id INTEGER REFERENCES keyword_analyses(id), title TEXT NOT NULL DEFAULT '', body TEXT NOT NULL DEFAULT '', excerpt TEXT NOT NULL DEFAULT '', tags TEXT NOT NULL DEFAULT '[]', status TEXT NOT NULL DEFAULT 'draft', review_score INTEGER, review_detail TEXT NOT NULL DEFAULT '{}', naver_url TEXT NOT NULL DEFAULT '', trend_brief TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_bp_created ON blog_posts(created_at DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_bp_status ON blog_posts(status)") # 수익(커미션) 추적 conn.execute(""" CREATE TABLE IF NOT EXISTS commissions ( id INTEGER PRIMARY KEY AUTOINCREMENT, post_id INTEGER REFERENCES blog_posts(id), month TEXT NOT NULL, clicks INTEGER NOT NULL DEFAULT 0, purchases INTEGER NOT NULL DEFAULT 0, revenue INTEGER NOT NULL DEFAULT 0, note TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_comm_month ON commissions(month)") conn.execute("CREATE INDEX IF NOT EXISTS idx_comm_post ON commissions(post_id)") # 비동기 작업 상태 (research / generate / review) conn.execute(""" CREATE TABLE IF NOT EXISTS generation_tasks ( id TEXT PRIMARY KEY, type TEXT NOT NULL DEFAULT 'research', status TEXT NOT NULL DEFAULT 'queued', progress INTEGER NOT NULL DEFAULT 0, message TEXT NOT NULL DEFAULT '', result_id INTEGER, error TEXT, params TEXT NOT NULL DEFAULT '{}', created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')), updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_gt_created ON generation_tasks(created_at DESC)") # AI 프롬프트 템플릿 conn.execute(""" CREATE TABLE IF NOT EXISTS prompt_templates ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT NOT NULL DEFAULT '', template TEXT NOT NULL DEFAULT '', updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')) ) """) # 기본 프롬프트 템플릿 시딩 (존재하지 않을 때만) _seed_templates(conn) def _seed_templates(conn: sqlite3.Connection) -> None: """기본 프롬프트 템플릿을 DB에 시딩.""" templates = [ { "name": "trend_brief", "description": "네이버 블로그 트렌드 분석 + 제목/훅 전략 브리프", "template": ( "당신은 네이버 블로그 마케팅 전문가입니다.\n" "아래 키워드 분석 데이터를 바탕으로 블로그 포스팅 전략 브리프를 작성하세요.\n\n" "키워드: {keyword}\n" "블로그 경쟁도: {competition} (0-100, 높을수록 경쟁 치열)\n" "쇼핑 기회 점수: {opportunity} (0-100, 높을수록 기회 큼)\n" "상위 블로그 제목들: {top_blogs}\n" "상위 상품들: {top_products}\n\n" "다음을 포함해주세요:\n" "1. 클릭을 유도하는 제목 공식 3가지\n" "2. 도입부 훅 전략 (공감형, 질문형, 충격형 중 추천)\n" "3. 추천 해시태그 5-10개\n" "4. 경쟁 분석 요약 (기존 글 대비 차별화 포인트)\n" "5. SEO 키워드 배치 전략" ), }, { "name": "blog_write", "description": "공감형 1인칭 체험기 블로그 글 작성", "template": ( "당신은 네이버 블로그에서 월 100만 이상 수익을 올리는 전문 블로거입니다.\n" "아래 브리프를 바탕으로 블로그 글을 작성하세요.\n\n" "키워드: {keyword}\n" "트렌드 브리프: {trend_brief}\n" "상위 상품 정보: {top_products}\n\n" "작성 규칙:\n" "- 1인칭 체험기 형식 (\"제가 직접 써봤는데요\")\n" "- 1,500자 이상\n" "- 자연스러운 구어체 (네이버 블로그 톤)\n" "- 제품 비교표 포함 (마크다운 테이블)\n" "- 장단점 솔직하게 작성\n" "- 광고 고지 문구 포함: \"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다.\"\n" "- 추천 매트릭스 (가성비/품질/디자인 기준)\n" "- 자연스러운 CTA (구매 링크 유도)\n\n" "HTML 형식으로 작성하되, 네이버 블로그에서 바로 붙여넣기 가능한 형태로 만들어주세요." ), }, { "name": "quality_review", "description": "블로그 글 품질 리뷰 (5기준 × 10점)", "template": ( "당신은 블로그 콘텐츠 품질 평가 전문가입니다.\n" "아래 블로그 글을 5가지 기준으로 평가해주세요.\n\n" "제목: {title}\n" "본문: {body}\n\n" "평가 기준 (각 1-10점):\n" "1. 독자 공감도: 1인칭 체험기가 자연스럽고 공감되는가?\n" "2. 제목 클릭 유도력: 검색 결과에서 클릭하고 싶은 제목인가?\n" "3. 구매 전환력: 읽고 나서 제품을 사고 싶어지는가?\n" "4. SEO 최적화: 키워드 배치, 소제목, 길이가 적절한가?\n" "5. 형식 완성도: 비교표, 이미지 설명, 단락 구성이 잘 되어있는가?\n\n" "JSON 형식으로 응답:\n" "{{\n" " \"scores\": {{\n" " \"empathy\": N,\n" " \"click_appeal\": N,\n" " \"conversion\": N,\n" " \"seo\": N,\n" " \"format\": N\n" " }},\n" " \"total\": N,\n" " \"pass\": true/false,\n" " \"feedback\": \"개선 사항 설명\"\n" "}}" ), }, ] for t in templates: existing = conn.execute( "SELECT id FROM prompt_templates WHERE name = ?", (t["name"],) ).fetchone() if not existing: conn.execute( "INSERT INTO prompt_templates (name, description, template) VALUES (?, ?, ?)", (t["name"], t["description"], t["template"]), ) # ── keyword_analyses CRUD ──────────────────────────────────────────────────── def _ka_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "keyword": r["keyword"], "blog_total": r["blog_total"], "shop_total": r["shop_total"], "competition": r["competition"], "opportunity": r["opportunity"], "avg_price": r["avg_price"], "min_price": r["min_price"], "max_price": r["max_price"], "top_products": json.loads(r["top_products"]) if r["top_products"] else [], "top_blogs": json.loads(r["top_blogs"]) if r["top_blogs"] else [], "ai_summary": r["ai_summary"], "created_at": r["created_at"], } def add_keyword_analysis(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( """INSERT INTO keyword_analyses (keyword, blog_total, shop_total, competition, opportunity, avg_price, min_price, max_price, top_products, top_blogs, ai_summary) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( data.get("keyword", ""), data.get("blog_total", 0), data.get("shop_total", 0), data.get("competition", 0), data.get("opportunity", 0), data.get("avg_price"), data.get("min_price"), data.get("max_price"), json.dumps(data.get("top_products", []), ensure_ascii=False), json.dumps(data.get("top_blogs", []), ensure_ascii=False), data.get("ai_summary", ""), ), ) row = conn.execute( "SELECT * FROM keyword_analyses WHERE rowid = last_insert_rowid()" ).fetchone() return _ka_row_to_dict(row) def get_keyword_analysis(analysis_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM keyword_analyses WHERE id = ?", (analysis_id,) ).fetchone() return _ka_row_to_dict(row) if row else None def get_keyword_analyses(limit: int = 30) -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute( "SELECT * FROM keyword_analyses ORDER BY created_at DESC LIMIT ?", (limit,) ).fetchall() return [_ka_row_to_dict(r) for r in rows] def delete_keyword_analysis(analysis_id: int) -> bool: with _conn() as conn: row = conn.execute( "SELECT id FROM keyword_analyses WHERE id = ?", (analysis_id,) ).fetchone() if not row: return False conn.execute("DELETE FROM keyword_analyses WHERE id = ?", (analysis_id,)) return True # ── blog_posts CRUD ────────────────────────────────────────────────────────── def _post_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "keyword_id": r["keyword_id"], "title": r["title"], "body": r["body"], "excerpt": r["excerpt"], "tags": json.loads(r["tags"]) if r["tags"] else [], "status": r["status"], "review_score": r["review_score"], "review_detail": json.loads(r["review_detail"]) if r["review_detail"] else {}, "naver_url": r["naver_url"], "trend_brief": r["trend_brief"], "created_at": r["created_at"], "updated_at": r["updated_at"], } def add_post(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( """INSERT INTO blog_posts (keyword_id, title, body, excerpt, tags, status, review_score, review_detail, naver_url, trend_brief) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( data.get("keyword_id"), data.get("title", ""), data.get("body", ""), data.get("excerpt", ""), json.dumps(data.get("tags", []), ensure_ascii=False), data.get("status", "draft"), data.get("review_score"), json.dumps(data.get("review_detail", {}), ensure_ascii=False), data.get("naver_url", ""), data.get("trend_brief", ""), ), ) row = conn.execute( "SELECT * FROM blog_posts WHERE rowid = last_insert_rowid()" ).fetchone() return _post_row_to_dict(row) def get_post(post_id: int) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM blog_posts WHERE id = ?", (post_id,) ).fetchone() return _post_row_to_dict(row) if row else None def get_posts(status: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]: with _conn() as conn: if status: rows = conn.execute( "SELECT * FROM blog_posts WHERE status = ? ORDER BY created_at DESC LIMIT ?", (status, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM blog_posts ORDER BY created_at DESC LIMIT ?", (limit,) ).fetchall() return [_post_row_to_dict(r) for r in rows] def update_post(post_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: with _conn() as conn: fields = [] values = [] for k in ("title", "body", "excerpt", "status", "naver_url", "trend_brief"): if k in data: fields.append(f"{k} = ?") values.append(data[k]) if "tags" in data: fields.append("tags = ?") values.append(json.dumps(data["tags"], ensure_ascii=False)) if "review_score" in data: fields.append("review_score = ?") values.append(data["review_score"]) if "review_detail" in data: fields.append("review_detail = ?") values.append(json.dumps(data["review_detail"], ensure_ascii=False)) if not fields: return get_post(post_id) fields.append("updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')") values.append(post_id) conn.execute( f"UPDATE blog_posts SET {', '.join(fields)} WHERE id = ?", values ) row = conn.execute( "SELECT * FROM blog_posts WHERE id = ?", (post_id,) ).fetchone() return _post_row_to_dict(row) if row else None def delete_post(post_id: int) -> bool: with _conn() as conn: row = conn.execute( "SELECT id FROM blog_posts WHERE id = ?", (post_id,) ).fetchone() if not row: return False conn.execute("DELETE FROM blog_posts WHERE id = ?", (post_id,)) return True # ── commissions CRUD ───────────────────────────────────────────────────────── def _comm_row_to_dict(r) -> Dict[str, Any]: return { "id": r["id"], "post_id": r["post_id"], "month": r["month"], "clicks": r["clicks"], "purchases": r["purchases"], "revenue": r["revenue"], "note": r["note"], "created_at": r["created_at"], } def add_commission(data: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( """INSERT INTO commissions (post_id, month, clicks, purchases, revenue, note) VALUES (?, ?, ?, ?, ?, ?)""", ( data.get("post_id"), data.get("month", ""), data.get("clicks", 0), data.get("purchases", 0), data.get("revenue", 0), data.get("note", ""), ), ) row = conn.execute( "SELECT * FROM commissions WHERE rowid = last_insert_rowid()" ).fetchone() return _comm_row_to_dict(row) def get_commissions(post_id: Optional[int] = None, limit: int = 100) -> List[Dict[str, Any]]: with _conn() as conn: if post_id: rows = conn.execute( "SELECT * FROM commissions WHERE post_id = ? ORDER BY month DESC LIMIT ?", (post_id, limit), ).fetchall() else: rows = conn.execute( "SELECT * FROM commissions ORDER BY month DESC LIMIT ?", (limit,) ).fetchall() return [_comm_row_to_dict(r) for r in rows] def update_commission(comm_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: with _conn() as conn: fields = [] values = [] for k in ("month", "clicks", "purchases", "revenue", "note"): if k in data: fields.append(f"{k} = ?") values.append(data[k]) if not fields: return None values.append(comm_id) conn.execute( f"UPDATE commissions SET {', '.join(fields)} WHERE id = ?", values ) row = conn.execute( "SELECT * FROM commissions WHERE id = ?", (comm_id,) ).fetchone() return _comm_row_to_dict(row) if row else None def delete_commission(comm_id: int) -> bool: with _conn() as conn: row = conn.execute( "SELECT id FROM commissions WHERE id = ?", (comm_id,) ).fetchone() if not row: return False conn.execute("DELETE FROM commissions WHERE id = ?", (comm_id,)) return True def get_dashboard_stats() -> Dict[str, Any]: """대시보드 집계: 총 포스트/클릭/구매/수익 + 월별 추이.""" with _conn() as conn: total_posts = conn.execute("SELECT COUNT(*) FROM blog_posts").fetchone()[0] published = conn.execute( "SELECT COUNT(*) FROM blog_posts WHERE status = 'published'" ).fetchone()[0] agg = conn.execute( "SELECT COALESCE(SUM(clicks),0), COALESCE(SUM(purchases),0), COALESCE(SUM(revenue),0) FROM commissions" ).fetchone() monthly = conn.execute( """SELECT month, SUM(clicks) as clicks, SUM(purchases) as purchases, SUM(revenue) as revenue FROM commissions GROUP BY month ORDER BY month DESC LIMIT 12""" ).fetchall() top_posts = conn.execute( """SELECT bp.id, bp.title, COALESCE(SUM(c.revenue),0) as total_revenue FROM blog_posts bp LEFT JOIN commissions c ON c.post_id = bp.id GROUP BY bp.id ORDER BY total_revenue DESC LIMIT 5""" ).fetchall() return { "total_posts": total_posts, "published_posts": published, "total_clicks": agg[0], "total_purchases": agg[1], "total_revenue": agg[2], "monthly": [ {"month": r["month"], "clicks": r["clicks"], "purchases": r["purchases"], "revenue": r["revenue"]} for r in monthly ], "top_posts": [ {"id": r["id"], "title": r["title"], "total_revenue": r["total_revenue"]} for r in top_posts ], } # ── generation_tasks CRUD ──────────────────────────────────────────────────── def _task_row_to_dict(r) -> Dict[str, Any]: return { "task_id": r["id"], "type": r["type"], "status": r["status"], "progress": r["progress"], "message": r["message"], "result_id": r["result_id"], "error": r["error"], "params": json.loads(r["params"]) if r["params"] else {}, "created_at": r["created_at"], "updated_at": r["updated_at"], } def create_task(task_id: str, task_type: str, params: Dict[str, Any]) -> Dict[str, Any]: with _conn() as conn: conn.execute( "INSERT INTO generation_tasks (id, type, params) VALUES (?, ?, ?)", (task_id, task_type, json.dumps(params, ensure_ascii=False)), ) row = conn.execute( "SELECT * FROM generation_tasks WHERE id = ?", (task_id,) ).fetchone() return _task_row_to_dict(row) def update_task( task_id: str, status: str, progress: int, message: str, result_id: Optional[int] = None, error: Optional[str] = None, ) -> None: with _conn() as conn: conn.execute( """UPDATE generation_tasks SET status = ?, progress = ?, message = ?, result_id = ?, error = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE id = ?""", (status, progress, message, result_id, error, task_id), ) def get_task(task_id: str) -> Optional[Dict[str, Any]]: with _conn() as conn: row = conn.execute( "SELECT * FROM generation_tasks WHERE id = ?", (task_id,) ).fetchone() return _task_row_to_dict(row) if row else None # ── prompt_templates CRUD ──────────────────────────────────────────────────── def get_template(name: str) -> Optional[str]: with _conn() as conn: row = conn.execute( "SELECT template FROM prompt_templates WHERE name = ?", (name,) ).fetchone() return row["template"] if row else None def get_all_templates() -> List[Dict[str, Any]]: with _conn() as conn: rows = conn.execute("SELECT * FROM prompt_templates ORDER BY name").fetchall() return [ {"id": r["id"], "name": r["name"], "description": r["description"], "template": r["template"], "updated_at": r["updated_at"]} for r in rows ] def update_template(name: str, template: str) -> bool: with _conn() as conn: conn.execute( "UPDATE prompt_templates SET template = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now') WHERE name = ?", (template, name), ) return conn.execute( "SELECT id FROM prompt_templates WHERE name = ?", (name,) ).fetchone() is not None