"""FastAPI entrypoint for insta-lab."""
import asyncio
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, BackgroundTasks, Body, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel

from .config import (
    CORS_ALLOW_ORIGINS,
    NAVER_CLIENT_ID,
    ANTHROPIC_API_KEY,
    INSTA_DATA_PATH,
    DB_PATH,
    DEFAULT_CATEGORY_SEEDS,
    KEYWORDS_PER_CATEGORY,
    INSTA_DEFAULT_THEME,
)
from . import db, news_collector, keyword_extractor, card_writer, card_renderer, trend_collector
from .internal_router import router as internal_router

logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)

# Redis list that render jobs are pushed onto.
RENDER_QUEUE_KEY = "queue:insta-render"

# Fixed UTC+9 offset used for the "submitted_at" timestamp of render jobs.
_KST = timezone(timedelta(hours=9))

app = FastAPI()
app.include_router(internal_router)
app.add_middleware(
    CORSMiddleware,
    # Skip empty entries so a trailing or doubled comma in the setting does
    # not register "" as an allowed origin.
    allow_origins=[o.strip() for o in CORS_ALLOW_ORIGINS.split(",") if o.strip()],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["Content-Type"],
)


@app.on_event("startup")
async def on_startup():
    """Create the data directory, initialise the DB and start the renderer."""
    os.makedirs(INSTA_DATA_PATH, exist_ok=True)
    db.init_db()
    # Initialise the Chromium browser pool (CHECK_POINT mid-term-6).
    await card_renderer.init_browser()


@app.on_event("shutdown")
async def on_shutdown():
    """Shut down the renderer's browser."""
    await card_renderer.shutdown_browser()


@app.get("/health")
def health():
    """Liveness probe; always returns ``{"ok": True}``."""
    return {"ok": True}


@app.get("/api/insta/status")
def status():
    """Report whether the Naver and Anthropic credentials are configured."""
    return {
        "ok": True,
        "naver_api": bool(NAVER_CLIENT_ID),
        "anthropic_api": bool(ANTHROPIC_API_KEY),
    }


async def _enqueue_render(task_id: str, slate_id: int) -> None:
    """Push a render job for *slate_id* onto the Redis render queue.

    The job is a JSON object carrying the task id, the slate id, the default
    theme and a UTC+9 submission timestamp. Per the original notes, a Windows
    insta-render worker pops the job (BLPOP) and performs the actual render.
    """
    payload = {
        "task_id": task_id,
        "kind": "insta",
        "params": {"slate_id": slate_id, "theme": INSTA_DEFAULT_THEME},
        "submitted_at": datetime.now(_KST).isoformat(),
    }
    await redis_client.rpush(RENDER_QUEUE_KEY, json.dumps(payload))


# ── News ─────────────────────────────────────────────────────────
class CollectRequest(BaseModel):
    """Request body for news collection; ``None`` means all default categories."""

    categories: Optional[list[str]] = None


def _seeds_for(category: str) -> list[str]:
    """Return the seed list for *category*.

    Seeds stored in the ``category_seeds`` prompt template (a JSON object
    keyed by category) take precedence. If the template is absent, malformed,
    or has no entry for *category*, the defaults from
    ``DEFAULT_CATEGORY_SEEDS`` are used.
    """
    pt = db.get_prompt_template("category_seeds")
    if pt and pt.get("template"):
        try:
            data = json.loads(pt["template"])
            if isinstance(data, dict) and category in data:
                return list(data[category])
        except (TypeError, ValueError):
            # Best effort: a broken template must not stop collection, but it
            # should be visible in the logs rather than silently ignored.
            logger.warning(
                "category_seeds template is unusable; falling back to defaults",
                exc_info=True,
            )
    return list(DEFAULT_CATEGORY_SEEDS.get(category, []))


async def _bg_collect(task_id: str, categories: list[str]):
    """Background task: collect news for each category and record the outcome.

    Categories without seeds are skipped. On success the task is marked
    ``succeeded`` with the total collected count as ``result_id``; any
    exception marks the task ``failed`` with the error text.
    """
    try:
        db.update_task(task_id, "processing", 10, "수집 중")
        total = 0
        for cat in categories:
            seeds = _seeds_for(cat)
            if not seeds:
                continue
            # The collector is synchronous; run it in a worker thread so this
            # coroutine does not block the event loop while it works.
            total += await asyncio.to_thread(
                news_collector.collect_for_category, cat, seeds
            )
        db.update_task(task_id, "succeeded", 100, f"{total}건 수집", result_id=total)
    except Exception as e:
        logger.exception("collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))


@app.post("/api/insta/news/collect")
def collect_news(req: CollectRequest, bg: BackgroundTasks):
    """Create a ``news_collect`` task and schedule it in the background."""
    cats = req.categories or list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("news_collect", {"categories": cats})
    bg.add_task(_bg_collect, tid, cats)
    return {"task_id": tid, "categories": cats}


@app.get("/api/insta/news/articles")
def list_articles(category: Optional[str] = None, days: int = Query(7, ge=1, le=90)):
    """List stored news articles, optionally filtered by category and age."""
    return {"items": db.list_news_articles(category=category, days=days)}


# ── Keywords ─────────────────────────────────────────────────────
class ExtractRequest(BaseModel):
    """Request body for keyword extraction; ``None`` means use preferences."""

    categories: Optional[list[str]] = None


async def _bg_extract(task_id: str, categories: Optional[list[str]] = None):
    """Background task: extract keywords using per-category weights.

    When *categories* is given, it overrides stored preferences with equal
    weight (1.0) for exactly those categories. Otherwise the stored
    preference weights are used; if none are stored, every default category
    gets equal weight. The keyword budget is ``KEYWORDS_PER_CATEGORY`` times
    the number of categories with a positive weight (at least one).
    """
    try:
        db.update_task(task_id, "processing", 10, "추출 중")
        if categories:
            # Explicit request: equal weights over just that subset (override).
            weights = {c: 1.0 for c in categories}
        else:
            weights = {p["category"]: p["weight"] for p in db.get_preferences()}
            if not weights:
                # No stored preferences: treat all default categories equally
                # instead of handing the extractor an empty weight map.
                weights = {c: 1.0 for c in DEFAULT_CATEGORY_SEEDS}
        active = sum(1 for w in weights.values() if w > 0)
        total = KEYWORDS_PER_CATEGORY * max(1, active)
        # Synchronous extractor; keep it off the event loop.
        await asyncio.to_thread(
            keyword_extractor.extract_with_weights, weights, total_limit=total
        )
        db.update_task(task_id, "succeeded", 100, "완료", result_id=0)
    except Exception as e:
        logger.exception("extract failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))


@app.post("/api/insta/keywords/extract")
def extract_keywords(req: ExtractRequest, bg: BackgroundTasks):
    """Create a ``keyword_extract`` task and schedule it in the background.

    The response and task record list the requested categories, or all
    default categories when none were requested. The background task itself
    receives only the categories the caller actually named, so stored
    preference weights apply when the request names none.
    """
    cats = req.categories or list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("keyword_extract", {"categories": cats})
    # Pass the raw request value, not the defaulted list: passing ``cats``
    # would always trigger the equal-weight override and ignore preferences.
    bg.add_task(_bg_extract, tid, req.categories)
    return {"task_id": tid, "categories": cats}


@app.get("/api/insta/keywords")
def list_keywords(
    category: Optional[str] = None,
    used: Optional[bool] = None,
    source: Optional[str] = None,
):
    """List keywords.

    With *source* set, returns trends from that source over the last 30 days
    (``used`` is not applied). Otherwise returns trending keywords filtered
    by *category* and *used*.
    """
    if source:
        return {"items": db.list_trends(source=source, category=category, days=30)}
    return {"items": db.list_trending_keywords(category=category, used=used)}


# ── Slates ───────────────────────────────────────────────────────
class SlateRequest(BaseModel):
    """Request body for creating a card slate from a keyword."""

    keyword: str
    category: str
    keyword_id: Optional[int] = None


async def _bg_create_slate(task_id: str, keyword: str, category: str, keyword_id: Optional[int]):
    """Background task: write slate copy, then queue the slate for rendering.

    After the copy is written the keyword (if an id was supplied) is marked
    used and a render job is pushed to Redis. The task is left in
    ``processing`` at 70%; per the original notes the worker reports the
    final status via webhook while the client polls
    ``GET /api/insta/tasks/{task_id}``.
    """
    try:
        db.update_task(task_id, "processing", 30, "카피 생성 중")
        # Synchronous writer; keep it off the event loop.
        sid = await asyncio.to_thread(
            card_writer.write_slate, keyword=keyword, category=category
        )
        if keyword_id:
            db.mark_keyword_used(keyword_id)
        await _enqueue_render(task_id, sid)
        db.update_task(task_id, "processing", 70, "Redis 큐 푸시 → Windows worker 대기 중", result_id=sid)
    except Exception as e:
        logger.exception("create slate failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))


@app.post("/api/insta/slates")
def create_slate(req: SlateRequest, bg: BackgroundTasks):
    """Create a ``slate_create`` task and schedule it in the background."""
    tid = db.create_task("slate_create", req.dict())
    bg.add_task(_bg_create_slate, tid, req.keyword, req.category, req.keyword_id)
    return {"task_id": tid}


@app.get("/api/insta/slates")
def list_slates(limit: int = Query(50, ge=1, le=500)):
    """List card slates, up to *limit* entries."""
    return {"items": db.list_card_slates(limit=limit)}


@app.get("/api/insta/slates/{slate_id}")
def get_slate(slate_id: int):
    """Return one slate with its assets; 404 if it does not exist.

    The copy fields are decoded from JSON when they are stored as strings;
    a field that is not valid JSON is returned unchanged.
    """
    s = db.get_card_slate(slate_id)
    if not s:
        raise HTTPException(404, "slate not found")
    s["assets"] = db.list_card_assets(slate_id)
    for k in ("cover_copy", "body_copies", "cta_copy", "hashtags"):
        if isinstance(s.get(k), str):
            try:
                s[k] = json.loads(s[k])
            except ValueError:
                # Plain (non-JSON) text: leave the raw string in place.
                pass
    return s


async def _bg_render(task_id: str, slate_id: int):
    """Background task: push a render job to the Redis queue.

    The actual render is done by the Windows insta-render worker. The task
    is left in ``processing`` at 30%, or marked ``failed`` if the push fails.
    """
    try:
        await _enqueue_render(task_id, slate_id)
        db.update_task(task_id, "processing", 30, "Redis 큐 푸시 → Windows worker 대기 중")
    except Exception as e:
        logger.exception("queue push failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))


@app.post("/api/insta/slates/{slate_id}/render")
def render_slate_endpoint(slate_id: int, bg: BackgroundTasks):
    """Create a ``slate_render`` task for an existing slate; 404 if missing."""
    if not db.get_card_slate(slate_id):
        raise HTTPException(404, "slate not found")
    tid = db.create_task("slate_render", {"slate_id": slate_id})
    bg.add_task(_bg_render, tid, slate_id)
    return {"task_id": tid}


@app.get("/api/insta/slates/{slate_id}/assets/{page}")
def get_asset(slate_id: int, page: int):
    """Serve the PNG for page *page* (1..10) of a slate.

    Raises 400 for an out-of-range page and 404 when there is no asset
    record for the page or the recorded file is no longer on disk.
    """
    if not (1 <= page <= 10):
        raise HTTPException(400, "page must be 1..10")
    assets = db.list_card_assets(slate_id)
    match = next((a for a in assets if a["page_index"] == page), None)
    if not match:
        raise HTTPException(404, "asset not found")
    # FileResponse only fails once it starts sending, which would surface as
    # a 500; check up front so a stale DB row yields a clean 404.
    if not os.path.isfile(match["file_path"]):
        raise HTTPException(404, "asset not found")
    return FileResponse(match["file_path"], media_type="image/png")


@app.delete("/api/insta/slates/{slate_id}")
def delete_slate(slate_id: int):
    """Delete a slate and its asset files; 404 if the slate does not exist.

    File removal is best effort: a file that cannot be removed is logged and
    does not prevent the slate record from being deleted.
    """
    if not db.get_card_slate(slate_id):
        raise HTTPException(404)
    for a in db.list_card_assets(slate_id):
        try:
            os.unlink(a["file_path"])
        except FileNotFoundError:
            # Already gone; nothing to clean up.
            pass
        except OSError:
            logger.warning("could not remove asset file %s", a["file_path"], exc_info=True)
    db.delete_card_slate(slate_id)
    return {"ok": True}


# ── Tasks ────────────────────────────────────────────────────────
@app.get("/api/insta/tasks/{task_id}")
def get_task_status(task_id: str):
    """Return the stored task record; 404 if the task id is unknown."""
    t = db.get_task(task_id)
    if not t:
        raise HTTPException(404)
    return t


# ── Prompt Templates ─────────────────────────────────────────────
class TemplateBody(BaseModel):
    """Request body for creating or replacing a prompt template."""

    template: str
    description: str = ""


@app.get("/api/insta/templates/prompts/{name}")
def get_prompt(name: str):
    """Return the prompt template called *name*; 404 if it does not exist."""
    pt = db.get_prompt_template(name)
    if not pt:
        raise HTTPException(404)
    return pt


@app.put("/api/insta/templates/prompts/{name}")
def upsert_prompt(name: str, body: TemplateBody):
    """Create or replace the prompt template *name* and return the stored row."""
    db.upsert_prompt_template(name, body.template, body.description)
    return db.get_prompt_template(name)


# ── Trends ───────────────────────────────────────────────────────
class TrendsCollectRequest(BaseModel):
    """Request body for trend collection; ``None`` means all default categories."""

    categories: Optional[list[str]] = None


async def _bg_collect_trends(task_id: str, categories: list[str]):
    """Background task: collect external trends and record per-source counts.

    The success message reports the ``naver_popular`` and
    ``youtube_trending`` counts; ``result_id`` is the sum of all counts in
    the collector's result.
    """
    try:
        db.update_task(task_id, "processing", 10, "외부 트렌드 수집 중")
        # Synchronous collector; keep it off the event loop.
        result = await asyncio.to_thread(trend_collector.collect_all, categories)
        msg = f"naver:{result['naver_popular']}, youtube:{result['youtube_trending']}"
        db.update_task(task_id, "succeeded", 100, msg, result_id=sum(result.values()))
    except Exception as e:
        logger.exception("trends collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))


@app.post("/api/insta/trends/collect")
def collect_trends(req: TrendsCollectRequest, bg: BackgroundTasks):
    """Create a ``trends_collect`` task and schedule it in the background."""
    cats = req.categories or list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("trends_collect", {"categories": cats})
    bg.add_task(_bg_collect_trends, tid, cats)
    return {"task_id": tid, "categories": cats}


@app.get("/api/insta/trends")
def list_trends_endpoint(
    source: Optional[str] = None,
    category: Optional[str] = None,
    days: int = Query(1, ge=1, le=90),
):
    """List collected trends filtered by source, category and age in days."""
    return {"items": db.list_trends(source=source, category=category, days=days)}


# ── Preferences ──────────────────────────────────────────────────
class PreferencesBody(BaseModel):
    """Request body mapping category name to its weight."""

    categories: dict[str, float]


@app.get("/api/insta/preferences")
def get_preferences_endpoint():
    """Return the stored category preferences."""
    return {"categories": db.get_preferences()}


@app.put("/api/insta/preferences")
def put_preferences_endpoint(body: PreferencesBody):
    """Store the given category weights and return the resulting preferences."""
    db.upsert_preferences(body.categories)
    return {"categories": db.get_preferences()}