Files
web-page-backend/insta-lab/app/main.py
gahusb c62e3e70b9 fix(insta-lab): ranked가 judge에 보낼 후보를 상위 30개로 cap
미사용 키워드 대량 누적 시 judge 프롬프트/응답이 토큰 한도를 넘어 파싱 실패→claude 신호 전부 null로 degrade되던 문제(프로덕션 확인됨) 해결. base score 상위 JUDGE_CANDIDATE_CAP(30)개만 judge·선별에 적용해 claude 신호 일관 보장.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 03:07:29 +09:00

446 lines
16 KiB
Python

"""FastAPI entrypoint for insta-lab."""
import asyncio
import io
import json
import logging
import os
import zipfile
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Body, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
from _shared.access_log import install as install_access_log
from .config import (
CORS_ALLOW_ORIGINS, NAVER_CLIENT_ID, ANTHROPIC_API_KEY,
INSTA_DATA_PATH, DB_PATH, DEFAULT_CATEGORY_SEEDS, KEYWORDS_PER_CATEGORY,
INSTA_DEFAULT_THEME,
)
import redis.asyncio as aioredis
from . import db, news_collector, keyword_extractor, card_writer, trend_collector, selection, selection_judge
from .internal_router import router as internal_router
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Redis client used to push render jobs; the URL can be overridden through the
# REDIS_URL environment variable.
REDIS_URL = os.environ.get("REDIS_URL", "redis://redis:6379")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)

app = FastAPI()
install_access_log(app)
app.include_router(internal_router)

# CORS: allowed origins come from a comma-separated configuration string.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[origin.strip() for origin in CORS_ALLOW_ORIGINS.split(",")],
    allow_credentials=False,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=["Content-Type"],
)
@app.on_event("startup")
async def on_startup():
    """Create the data directory if it is missing, then initialise the DB."""
    os.makedirs(INSTA_DATA_PATH, exist_ok=True)
    db.init_db()
@app.on_event("shutdown")
async def on_shutdown():
    """Close the module-level Redis client when the application stops.

    The client is created at import time and owns a connection pool; without
    an explicit close its sockets are only reclaimed at interpreter exit.
    Closing is best-effort: a failure is logged and never blocks shutdown.
    """
    # Newer redis-py releases expose ``aclose()`` and deprecate ``close()``;
    # prefer the former when it exists so both generations are supported.
    closer = getattr(redis_client, "aclose", None) or redis_client.close
    try:
        await closer()
    except Exception:
        logger.warning("failed to close redis client", exc_info=True)
@app.get("/health")
def health():
    """Liveness probe: always reports ``ok``."""
    return dict(ok=True)
@app.get("/api/insta/status")
def status():
    """Report whether the Naver and Anthropic credentials are configured."""
    has_naver = bool(NAVER_CLIENT_ID)
    has_anthropic = bool(ANTHROPIC_API_KEY)
    return {"ok": True, "naver_api": has_naver, "anthropic_api": has_anthropic}
# ── News ─────────────────────────────────────────────────────────
class CollectRequest(BaseModel):
    # Categories to collect news for. When omitted or empty, the collect
    # endpoint falls back to every key of DEFAULT_CATEGORY_SEEDS.
    categories: Optional[list[str]] = None
def _seeds_for(category: str) -> list[str]:
    """Return the seed keywords for *category*.

    A ``category_seeds`` prompt template stored in the DB (expected to be a
    JSON object mapping category -> seed list) takes precedence. When the
    template is absent, malformed, or has no entry for *category*, the code
    defaults from ``DEFAULT_CATEGORY_SEEDS`` are returned instead (an empty
    list for an unknown category).
    """
    pt = db.get_prompt_template("category_seeds")
    if pt and pt.get("template"):
        try:
            data = json.loads(pt["template"])
            if isinstance(data, dict):
                if category in data:
                    return list(data[category])
            else:
                logger.warning(
                    "category_seeds template is not a JSON object; using defaults"
                )
        except (ValueError, TypeError):
            # Invalid JSON or a non-iterable seed entry: fall back to the
            # defaults, but leave a trace so the bad override is discoverable.
            logger.warning(
                "category_seeds template is malformed; using defaults for %r",
                category,
                exc_info=True,
            )
    return list(DEFAULT_CATEGORY_SEEDS.get(category, []))
async def _bg_collect(task_id: str, categories: list[str]):
    """Background job: collect news for each category and record the outcome.

    Args:
        task_id: Task row updated with progress, result, or failure.
        categories: Categories to collect; those with no seeds are skipped.

    The collector is a synchronous call, so it is run in a worker thread to
    keep this coroutine from stalling the event loop while it works. Any
    exception marks the task as ``failed`` with the error text.
    """
    try:
        db.update_task(task_id, "processing", 10, "수집 중")
        total = 0
        for cat in categories:
            seeds = _seeds_for(cat)
            if not seeds:
                continue
            total += await asyncio.to_thread(
                news_collector.collect_for_category, cat, seeds
            )
        db.update_task(task_id, "succeeded", 100, f"{total}건 수집", result_id=total)
    except Exception as e:
        logger.exception("collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/news/collect")
def collect_news(req: CollectRequest, bg: BackgroundTasks):
    """Create a ``news_collect`` task and schedule it in the background."""
    if req.categories:
        targets = req.categories
    else:
        targets = list(DEFAULT_CATEGORY_SEEDS.keys())
    task_id = db.create_task("news_collect", {"categories": targets})
    bg.add_task(_bg_collect, task_id, targets)
    return {"task_id": task_id, "categories": targets}
@app.get("/api/insta/news/articles")
def list_articles(category: Optional[str] = None, days: int = Query(7, ge=1, le=90)):
    """List collected news articles, optionally filtered by category."""
    articles = db.list_news_articles(category=category, days=days)
    return {"items": articles}
# ── Keywords ─────────────────────────────────────────────────────
class ExtractRequest(BaseModel):
    # Optional list of categories to run keyword extraction for.
    categories: Optional[list[str]] = None
async def _bg_extract(task_id: str, categories: Optional[list[str]] = None):
    """Background job: extract trending keywords using per-category weights.

    Args:
        task_id: Task row updated with progress, success, or failure.
        categories: When given (non-empty), overrides the stored preferences
            with an equal weight of 1.0 for exactly these categories. When
            omitted, the weights stored in the preferences table are used.

    The total keyword budget is ``KEYWORDS_PER_CATEGORY`` times the number of
    categories with a positive weight (at least one). The extractor is a
    synchronous call, so it runs in a worker thread to keep this coroutine
    from stalling the event loop.
    """
    try:
        db.update_task(task_id, "processing", 10, "추출 중")
        if categories:
            # Explicit categories: equal weights for just that subset.
            weights = {c: 1.0 for c in categories}
        else:
            weights = {p["category"]: p["weight"] for p in db.get_preferences()}
        active = sum(1 for w in weights.values() if w > 0)
        total = KEYWORDS_PER_CATEGORY * max(1, active)
        await asyncio.to_thread(
            keyword_extractor.extract_with_weights, weights, total_limit=total
        )
        db.update_task(task_id, "succeeded", 100, "완료", result_id=0)
    except Exception as e:
        logger.exception("extract failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/keywords/extract")
def extract_keywords(req: ExtractRequest, bg: BackgroundTasks):
    """Create a ``keyword_extract`` task and schedule it in the background.

    Only categories the caller explicitly supplied are forwarded to the
    background job. Forwarding the default category list (as before) made the
    job always treat the request as an explicit override, so the stored
    preference weights were never applied. The task payload and the response
    still report the effective category list for backward compatibility.
    """
    explicit = req.categories or None
    cats = explicit or list(DEFAULT_CATEGORY_SEEDS.keys())
    tid = db.create_task("keyword_extract", {"categories": cats})
    # None lets _bg_extract fall back to the stored preference weights.
    bg.add_task(_bg_extract, tid, explicit)
    return {"task_id": tid, "categories": cats}
@app.get("/api/insta/keywords")
def list_keywords(
    category: Optional[str] = None,
    used: Optional[bool] = None,
    source: Optional[str] = None,
):
    """List keywords.

    Without ``source`` this returns trending keywords filtered by
    ``category``/``used``. With ``source`` it returns the last 30 days of
    trends for that source instead (``used`` is not applied on that path).
    """
    if not source:
        rows = db.list_trending_keywords(category=category, used=used)
    else:
        rows = db.list_trends(source=source, category=category, days=30)
    return {"items": rows}
# Maximum number of candidates sent to the judge (Claude). When a large number
# of unused keywords accumulates, the response gets truncated and every claude
# score degrades to null; evaluating only the top-N by base score prevents that.
JUDGE_CANDIDATE_CAP = 30
@app.get("/api/insta/keywords/ranked")
def ranked_keywords(
    limit: int = Query(20, ge=1, le=100),
    threshold: float = Query(0.6, ge=0.0, le=1.0),
    dedup_window_days: int = Query(14, ge=1, le=90),
):
    """Return unused keywords ranked by the selection scorer.

    Args:
        limit: Maximum number of scored items returned.
        threshold: Passed through to ``selection.score_candidates``.
        dedup_window_days: Look-back window for recently issued topics.

    Only the ``JUDGE_CANDIDATE_CAP`` candidates with the highest base score
    are judged and scored, so the judge and the scorer see the same set.
    """

    def _base_score(candidate) -> float:
        # A missing or NULL score sorts as 0.0 instead of making float() raise.
        return float(candidate.get("score") or 0.0)

    candidates = db.list_trending_keywords(used=False)
    if not candidates:
        return {"items": []}
    candidates = sorted(candidates, key=_base_score, reverse=True)[:JUDGE_CANDIDATE_CAP]
    issued = db.list_recent_issued_topics(window_days=dedup_window_days)
    prefs = {p["category"]: p["weight"] for p in db.get_preferences()}
    claude_scores = selection_judge.judge_candidates(candidates)
    now_iso = datetime.now(timezone.utc).isoformat()
    scored = selection.score_candidates(
        candidates, issued, prefs, claude_scores=claude_scores,
        threshold=threshold, now_iso=now_iso,
    )
    return {"items": scored[:limit]}
# ── Slates ───────────────────────────────────────────────────────
class SlateRequest(BaseModel):
    # Keyword and category the slate is written for.
    keyword: str
    category: str
    # Optional id of the keyword row; when set, that keyword is marked as used
    # once the slate has been written.
    keyword_id: Optional[int] = None
async def _bg_create_slate(task_id: str, keyword: str, category: str, keyword_id: Optional[int]):
    """Background job: write a card slate, then queue it for rendering.

    Args:
        task_id: Task row updated with progress or failure.
        keyword: Keyword the slate is written for.
        category: Category of the keyword.
        keyword_id: When truthy, the keyword row is marked as used.

    The copy-writing call is synchronous, so it runs in a worker thread to
    keep this coroutine from stalling the event loop. On success the task
    stays in ``processing`` (70%); per the original notes, the render worker
    reports the final status through a webhook while the client polls
    ``GET /api/insta/tasks/{task_id}``.
    """
    try:
        from datetime import datetime, timezone, timedelta

        db.update_task(task_id, "processing", 30, "카피 생성 중")
        sid = await asyncio.to_thread(
            card_writer.write_slate, keyword=keyword, category=category
        )
        if keyword_id:
            db.mark_keyword_used(keyword_id)
        # Push onto the Redis queue; the Windows insta-render worker BLPOPs
        # the job and renders it.
        kst = timezone(timedelta(hours=9))
        payload = {
            "task_id": task_id,
            "kind": "insta",
            "params": {"slate_id": sid, "theme": INSTA_DEFAULT_THEME},
            "submitted_at": datetime.now(kst).isoformat(),
        }
        await redis_client.rpush("queue:insta-render", json.dumps(payload))
        logger.info(
            "슬레이트 생성 완료: slate_id=%s, keyword=%r, category=%r",
            sid, keyword, category,
        )
        db.update_task(task_id, "processing", 70, "Redis 큐 푸시 → Windows worker 대기 중", result_id=sid)
    except Exception as e:
        logger.exception("create slate failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/slates")
def create_slate(req: SlateRequest, bg: BackgroundTasks):
    """Create a ``slate_create`` task and schedule slate generation."""
    task_id = db.create_task("slate_create", req.dict())
    bg.add_task(
        _bg_create_slate,
        task_id,
        req.keyword,
        req.category,
        req.keyword_id,
    )
    return {"task_id": task_id}
@app.get("/api/insta/slates")
def list_slates(limit: int = Query(50, ge=1, le=500)):
    """List card slates, up to ``limit`` items."""
    slates = db.list_card_slates(limit=limit)
    return {"items": slates}
@app.get("/api/insta/slates/{slate_id}")
def get_slate(slate_id: int):
    """Return one slate with its assets attached.

    The copy fields are decoded from JSON when they are strings; a value that
    fails to decode is returned unchanged. Raises 404 if the slate is unknown.
    """
    slate = db.get_card_slate(slate_id)
    if not slate:
        raise HTTPException(404, "slate not found")
    slate["assets"] = db.list_card_assets(slate_id)
    json_fields = ("cover_copy", "body_copies", "cta_copy", "hashtags")
    for field in json_fields:
        raw = slate.get(field)
        if not isinstance(raw, str):
            continue
        try:
            slate[field] = json.loads(raw)
        except Exception:
            # Best effort: keep the raw string when it is not valid JSON.
            pass
    return slate
async def _bg_render(task_id: str, slate_id: int):
    """Push a render job onto the Redis queue.

    The actual rendering is done by the Windows insta-render worker. On a
    successful push the task is left in ``processing`` (30%); any exception
    marks it as ``failed``.
    """
    try:
        from datetime import datetime, timezone, timedelta

        korea_tz = timezone(timedelta(hours=9))
        job = {
            "task_id": task_id,
            "kind": "insta",
            "params": {"slate_id": slate_id, "theme": INSTA_DEFAULT_THEME},
            "submitted_at": datetime.now(korea_tz).isoformat(),
        }
        serialized = json.dumps(job)
        await redis_client.rpush("queue:insta-render", serialized)
        logger.info(f"렌더 큐 푸시 완료: slate_id={slate_id}, task_id={task_id}")
        db.update_task(task_id, "processing", 30, "Redis 큐 푸시 → Windows worker 대기 중")
    except Exception as exc:
        logger.exception("queue push failed")
        db.update_task(task_id, "failed", 0, "", error=str(exc))
@app.post("/api/insta/slates/{slate_id}/render")
def render_slate_endpoint(slate_id: int, bg: BackgroundTasks):
    """Schedule a (re-)render of an existing slate; 404 if it does not exist."""
    slate = db.get_card_slate(slate_id)
    if not slate:
        raise HTTPException(404, "slate not found")
    task_id = db.create_task("slate_render", {"slate_id": slate_id})
    bg.add_task(_bg_render, task_id, slate_id)
    return {"task_id": task_id}
@app.get("/api/insta/slates/{slate_id}/assets/{page}")
def get_asset(slate_id: int, page: int):
    """Serve the rendered PNG for one page of a slate.

    Raises:
        HTTPException 400: ``page`` is outside 1..10.
        HTTPException 404: no asset row exists for that page, or the row
            points at a file that is no longer on disk. Without the second
            check the response would fail while being sent and surface as a
            500 instead.
    """
    if not (1 <= page <= 10):
        raise HTTPException(400, "page must be 1..10")
    assets = db.list_card_assets(slate_id)
    match = next((a for a in assets if a["page_index"] == page), None)
    if not match:
        raise HTTPException(404, "asset not found")
    file_path = match["file_path"]
    if not file_path or not os.path.isfile(file_path):
        raise HTTPException(404, "asset not found")
    return FileResponse(file_path, media_type="image/png")
@app.get("/api/insta/slates/{slate_id}/package")
def download_package(slate_id: int):
    """Stream a ZIP with the rendered pages and a ``caption.txt``.

    Pages are stored as ``NN.png`` in page order; asset rows whose file is
    missing on disk are skipped. The caption file holds the suggested caption
    followed, when hashtags exist, by a blank line and the space-joined tags.

    Raises 404 for an unknown slate and 409 when there are no asset rows or
    none of the files exist.
    """
    slate = db.get_card_slate(slate_id)
    if not slate:
        raise HTTPException(404, "slate not found")

    ordered_assets = sorted(db.list_card_assets(slate_id), key=lambda row: row["page_index"])
    if not ordered_assets:
        raise HTTPException(409, "아직 렌더된 카드가 없습니다")

    archive = io.BytesIO()
    page_count = 0
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as bundle:
        for row in ordered_assets:
            path = row["file_path"]
            if not os.path.exists(path):
                continue
            bundle.write(path, arcname=f"{row['page_index']:02d}.png")
            page_count += 1

        caption_text = (slate.get("suggested_caption") or "").strip()
        hashtags = slate.get("hashtags") or []
        if isinstance(hashtags, str):
            # Hashtags may be stored as a JSON string; undecodable -> none.
            try:
                hashtags = json.loads(hashtags)
            except Exception:
                hashtags = []
        if hashtags:
            caption_text = caption_text + ("\n\n" + " ".join(hashtags))
        else:
            caption_text = caption_text + ""
        bundle.writestr("caption.txt", caption_text)

    if page_count == 0:
        raise HTTPException(409, "렌더된 카드 파일이 없습니다")

    archive.seek(0)
    disposition = f'attachment; filename="insta_slate_{slate_id}.zip"'
    return StreamingResponse(
        archive,
        media_type="application/zip",
        headers={"Content-Disposition": disposition},
    )
class DecisionBody(BaseModel):
    # Accepted values: "approved" or "rejected" (checked by the endpoint).
    decision: str
@app.post("/api/insta/slates/{slate_id}/decision")
def slate_decision(slate_id: int, body: DecisionBody):
    """Record an approve/reject decision and return the updated slate.

    Raises 404 for an unknown slate, then 400 for an unsupported decision.
    """
    existing = db.get_card_slate(slate_id)
    if not existing:
        raise HTTPException(404, "slate not found")
    allowed = ("approved", "rejected")
    if body.decision not in allowed:
        raise HTTPException(400, "decision must be approved|rejected")
    db.set_slate_decision(slate_id, body.decision)
    return db.get_card_slate(slate_id)
@app.delete("/api/insta/slates/{slate_id}")
def delete_slate(slate_id: int):
    """Delete a slate and its asset files.

    File removal is best-effort: an ``OSError`` for an individual file is
    ignored so the DB row is still deleted. Raises 404 for an unknown slate.
    """
    if not db.get_card_slate(slate_id):
        raise HTTPException(404)
    asset_paths = [asset["file_path"] for asset in db.list_card_assets(slate_id)]
    for path in asset_paths:
        try:
            os.unlink(path)
        except OSError:
            continue
    db.delete_card_slate(slate_id)
    return {"ok": True}
# ── Tasks ────────────────────────────────────────────────────────
@app.get("/api/insta/tasks/{task_id}")
def get_task_status(task_id: str):
    """Return the stored task record, or 404 when it does not exist."""
    task = db.get_task(task_id)
    if task:
        return task
    raise HTTPException(404)
# ── Prompt Templates ─────────────────────────────────────────────
class TemplateBody(BaseModel):
    # Template text to store under the given prompt name.
    template: str
    # Optional human-readable description; defaults to an empty string.
    description: str = ""
def _default_prompt_templates() -> dict:
    """Code-level defaults exposed when the DB holds no override.

    Uses the same single source of truth that the generation pipeline
    actually falls back to.
    """
    slate_writer = {
        "template": card_writer.DEFAULT_PROMPT,
        "description": "카드 10페이지 카피 생성 마스터 프롬프트 (Claude Sonnet). "
                       "{category}/{keyword}/{articles} 치환자 필수.",
    }
    seeds_json = json.dumps(DEFAULT_CATEGORY_SEEDS, ensure_ascii=False, indent=2)
    category_seeds = {
        "template": seeds_json,
        "description": "트렌드 수집·분류용 카테고리별 시드 키워드 (JSON). "
                       "최상위 키가 분류 라벨로도 쓰임.",
    }
    return {"slate_writer": slate_writer, "category_seeds": category_seeds}
@app.get("/api/insta/templates/prompts/{name}")
def get_prompt(name: str):
    """Return a prompt template by name.

    A DB override wins. Without one, the code default is returned (flagged
    with ``is_default``) so the editing UI can show and modify the master
    prompt. Raises 404 when the name has neither an override nor a default.
    """
    stored = db.get_prompt_template(name)
    if stored:
        return stored
    builtin = _default_prompt_templates()
    if name not in builtin:
        raise HTTPException(404)
    entry = builtin[name]
    return {
        "name": name,
        "template": entry["template"],
        "description": entry["description"],
        "updated_at": None,
        "is_default": True,
    }
@app.put("/api/insta/templates/prompts/{name}")
def upsert_prompt(name: str, body: TemplateBody):
    """Store (insert or update) a prompt template and return the saved row."""
    db.upsert_prompt_template(name, body.template, body.description)
    saved = db.get_prompt_template(name)
    return saved
# ── Trends ───────────────────────────────────────────────────────
class TrendsCollectRequest(BaseModel):
    # Categories to collect trends for. When omitted or empty, the endpoint
    # falls back to every key of DEFAULT_CATEGORY_SEEDS.
    categories: Optional[list[str]] = None
async def _bg_collect_trends(task_id: str, categories: list[str]):
    """Background job: collect external trends and record the outcome.

    Args:
        task_id: Task row updated with progress, result, or failure.
        categories: Categories passed to ``trend_collector.collect_all``.

    The collector is a synchronous call, so it runs in a worker thread to
    keep this coroutine from stalling the event loop. The result is read as a
    mapping that includes ``naver_popular`` and ``youtube_trending`` counts;
    the sum of all its values is stored as the task's ``result_id``.
    """
    try:
        db.update_task(task_id, "processing", 10, "외부 트렌드 수집 중")
        result = await asyncio.to_thread(trend_collector.collect_all, categories)
        msg = f"naver:{result['naver_popular']}, youtube:{result['youtube_trending']}"
        db.update_task(task_id, "succeeded", 100, msg, result_id=sum(result.values()))
    except Exception as e:
        logger.exception("trends collect failed")
        db.update_task(task_id, "failed", 0, "", error=str(e))
@app.post("/api/insta/trends/collect")
def collect_trends(req: TrendsCollectRequest, bg: BackgroundTasks):
    """Create a ``trends_collect`` task and schedule it in the background."""
    if req.categories:
        targets = req.categories
    else:
        targets = list(DEFAULT_CATEGORY_SEEDS.keys())
    task_id = db.create_task("trends_collect", {"categories": targets})
    bg.add_task(_bg_collect_trends, task_id, targets)
    return {"task_id": task_id, "categories": targets}
@app.get("/api/insta/trends")
def list_trends_endpoint(
    source: Optional[str] = None,
    category: Optional[str] = None,
    days: int = Query(1, ge=1, le=90),
):
    """List collected trends filtered by source, category and look-back days."""
    trends = db.list_trends(source=source, category=category, days=days)
    return {"items": trends}
# ── Preferences ──────────────────────────────────────────────────
class PreferencesBody(BaseModel):
    # Mapping passed as-is to db.upsert_preferences; presumably
    # category name -> weight (verify against the db module).
    categories: dict[str, float]
@app.get("/api/insta/preferences")
def get_preferences_endpoint():
    """Return the stored category preferences."""
    current = db.get_preferences()
    return {"categories": current}
@app.put("/api/insta/preferences")
def put_preferences_endpoint(body: PreferencesBody):
    """Upsert the category preferences and return the stored state."""
    db.upsert_preferences(body.categories)
    refreshed = db.get_preferences()
    return {"categories": refreshed}