11 Commits

Author SHA1 Message Date
2e042e18c5 fix(image-lab): env 변수를 다른 -lab과 동일하게 정렬 (TZ + :- defaults) 2026-05-23 11:51:38 +09:00
83e74ad1f4 fix(image-lab): volume mount을 video-lab과 동일한 ${RUNTIME_PATH}/data/image로 통일 2026-05-23 11:48:24 +09:00
b70caddff1 feat(image-lab): Dockerfile + compose entry + scripts 6위치 + nginx 차단
Task 5 of Video Studio backend plan. Wires image-lab Python code (T1-T4)
into NAS Docker infrastructure on port 18802.

- image-lab/Dockerfile (python:3.12-slim + uvicorn)
- image-lab/requirements.txt (fastapi, redis, httpx)
- image-lab/env.example (INTERNAL_API_KEY, IMAGE_DATA_DIR, REDIS_URL, CORS)
- docker-compose.yml: image-lab service block (port 18802, redis depends_on,
  healthcheck, volume ${RUNTIME_PATH}/image-data:/app/data) + frontend
  depends_on entry
- scripts/deploy-nas.sh: SERVICES += image-lab
- scripts/deploy.sh: BUILD_TARGETS/CONTAINER_NAMES/HEALTH_ENDPOINTS += image-lab,
  DATA_DIRS += image
- nginx/default.conf: /api/internal/image/ 3-layer block (IP allowlist +
  deny all + X-Internal-Key forward) mirroring /api/internal/video/

Plan-B-Video lesson: 6-location registration enforced per
feedback_nas_deploy_paths.md rule 3 to avoid 'transferring dockerfile: 2B'
deploy failure.

Tests: image-lab pytest 11 passed (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 11:46:45 +09:00
d6e34973a4 feat(image-lab): generate/tasks/providers 엔드포인트 (video-lab 복제) 2026-05-23 11:41:47 +09:00
7007c90665 feat(image-lab): /api/internal/image/update webhook (video-lab 복제) 2026-05-23 11:37:33 +09:00
ca7a502514 feat(image-lab): verify_internal_key (video-lab 복제) 2026-05-23 11:34:03 +09:00
dc471ecc60 feat(image-lab): image_tasks 테이블 + CRUD (video-lab 복제) 2026-05-23 11:31:02 +09:00
e91715bf2c docs(plan): video-studio Plan 1 — image-render 포트 18714(task-watcher 충돌 회피) + scripts 6위치 등재 step 추가 2026-05-23 11:28:21 +09:00
1e4c1b42b7 fix(insta-lab): 프롬프트 템플릿 GET이 미저장 시 코드 기본값 반환
slate_writer/category_seeds가 DB에 없으면 404 대신 생성 파이프라인이
실제 폴백하는 코드 기본값(card_writer.DEFAULT_PROMPT,
DEFAULT_CATEGORY_SEEDS)을 is_default=true로 반환. 편집 UI가 마스터
프롬프트를 표시·수정 가능. 미지정 이름은 여전히 404. 테스트 4건.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:33 +09:00
0190a6c206 feat(agent-office): 인스타 큐레이터 후보를 중복 제거 + 신뢰도 0.7+ 필터
_dedup_and_filter_keywords: score>=0.7만 남기고 동일 keyword 중복 제거
(최고 score 유지) 후 내림차순. _push_keyword_candidates가 이 필터를 거쳐
"확실한 것만" 전송, 후보 없으면 안내 메시지. 헬퍼 테스트 5건.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:33 +09:00
6ef4160da2 fix(stock): AI 뉴스 호재/악재 명확히 구분
(1) 부호 게이트: top_pos는 score>0, top_neg는 score<0만 분류해 양수(호재)
종목이 악재란에 채워지는 문제 제거. 중립(0)은 양쪽 모두 제외.
(2) 프롬프트: reason을 score 부호와 같은 방향 근거만 쓰도록 명시 —
호재 평가에 악재 내용, 악재 평가에 호재 내용 혼입 금지.
부호 게이트 회귀 테스트 2건 추가.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 02:50:18 +09:00
25 changed files with 2084 additions and 19 deletions

View File

@@ -18,6 +18,26 @@ from ..telegram import messaging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 텔레그램 후보 푸시 시 "확실한 것만" 보내기 위한 최소 신뢰도 (키워드 score 0~1)
KEYWORD_MIN_SCORE = 0.7
def _dedup_and_filter_keywords(
keywords: List[Dict[str, Any]], min_score: float = KEYWORD_MIN_SCORE,
) -> List[Dict[str, Any]]:
"""score >= min_score 인 키워드만 남기고, 동일 keyword 중복 제거(최고 score 유지).
결과는 score 내림차순. 텔레그램 후보 푸시 전 정리용."""
best: Dict[str, Dict[str, Any]] = {}
for k in keywords:
if float(k.get("score", 0)) < min_score:
continue
name = str(k.get("keyword", "")).strip()
if not name:
continue
if name not in best or k["score"] > best[name]["score"]:
best[name] = k
return sorted(best.values(), key=lambda k: -k["score"])
async def _send_media_group(media: List[Dict[str, Any]], caption: str = "") -> Dict[str, Any]: async def _send_media_group(media: List[Dict[str, Any]], caption: str = "") -> Dict[str, Any]:
"""텔레그램 sendMediaGroup. media는 InputMediaPhoto dicts. """텔레그램 sendMediaGroup. media는 InputMediaPhoto dicts.
@@ -89,14 +109,18 @@ class InstaAgent(BaseAgent):
raise TimeoutError(f"{step} timeout {timeout_sec}s") raise TimeoutError(f"{step} timeout {timeout_sec}s")
async def _push_keyword_candidates(self, keywords: List[Dict[str, Any]]) -> None: async def _push_keyword_candidates(self, keywords: List[Dict[str, Any]]) -> None:
by_cat: Dict[str, List[Dict[str, Any]]] = {} # 중복 제거 + 신뢰도(score) 임계값 이상만 — "확실한 것만" 정리해서 전송
for k in keywords: filtered = _dedup_and_filter_keywords(keywords)
by_cat.setdefault(k["category"], []).append(k) if not filtered:
if not by_cat: await messaging.send_raw(
await messaging.send_raw("📰 [인스타 큐레이터] 오늘은 추천 키워드가 없습니다.") f"📰 [인스타 큐레이터] 오늘은 확실한 추천 키워드가 없습니다 (신뢰도 {KEYWORD_MIN_SCORE:.1f}+ 기준)."
)
return return
by_cat: Dict[str, List[Dict[str, Any]]] = {}
for k in filtered:
by_cat.setdefault(k["category"], []).append(k)
rows: List[List[Dict[str, Any]]] = [] rows: List[List[Dict[str, Any]]] = []
text_lines = ["📰 <b>[인스타 큐레이터]</b> 오늘의 키워드 후보"] text_lines = [f"📰 <b>[인스타 큐레이터]</b> 오늘의 키워드 후보 (신뢰도 {KEYWORD_MIN_SCORE:.1f}+)"]
for cat, items in by_cat.items(): for cat, items in by_cat.items():
text_lines.append(f"\n<b>{cat}</b>") text_lines.append(f"\n<b>{cat}</b>")
for k in items[:5]: for k in items[:5]:

View File

@@ -0,0 +1,55 @@
import os
import sys
import tempfile
_fd, _TMP = tempfile.mkstemp(suffix=".db")
os.close(_fd)
os.unlink(_TMP)
os.environ["AGENT_OFFICE_DB_PATH"] = _TMP
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.agents.insta import _dedup_and_filter_keywords, KEYWORD_MIN_SCORE
def test_filters_below_threshold():
"""score < 임계값(0.7) 키워드는 제외."""
kws = [
{"id": 1, "keyword": "금리인하", "category": "경제", "score": 0.9},
{"id": 2, "keyword": "환율", "category": "경제", "score": 0.6}, # 컷
{"id": 3, "keyword": "반도체", "category": "경제", "score": 0.71},
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
kept = {k["keyword"] for k in out}
assert kept == {"금리인하", "반도체"}
def test_dedup_keeps_highest_score():
"""동일 keyword 중복 시 최고 score 1개만 유지."""
kws = [
{"id": 1, "keyword": "AI", "category": "경제", "score": 0.75},
{"id": 2, "keyword": "AI", "category": "기술", "score": 0.92}, # 같은 키워드, 더 높음
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
assert len(out) == 1
assert out[0]["id"] == 2
assert out[0]["score"] == 0.92
def test_sorted_by_score_desc():
kws = [
{"id": 1, "keyword": "a", "category": "c", "score": 0.72},
{"id": 2, "keyword": "b", "category": "c", "score": 0.95},
{"id": 3, "keyword": "c", "category": "c", "score": 0.80},
]
out = _dedup_and_filter_keywords(kws, min_score=0.7)
assert [k["keyword"] for k in out] == ["b", "c", "a"]
def test_empty_when_all_below_threshold():
kws = [{"id": 1, "keyword": "x", "category": "c", "score": 0.4}]
assert _dedup_and_filter_keywords(kws, min_score=0.7) == []
def test_default_threshold_is_0_7():
assert KEYWORD_MIN_SCORE == 0.7

View File

@@ -113,6 +113,28 @@ services:
timeout: 5s timeout: 5s
retries: 3 retries: 3
image-lab:
build: ./image-lab
container_name: image-lab
restart: unless-stopped
ports:
- "18802:8000"
environment:
- TZ=${TZ:-Asia/Seoul}
- REDIS_URL=${REDIS_URL:-redis://redis:6379}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- IMAGE_DATA_DIR=/app/data
- CORS_ALLOW_ORIGINS=${CORS_ALLOW_ORIGINS:-http://localhost:3007,http://localhost:8080}
volumes:
- ${RUNTIME_PATH}/data/image:/app/data
depends_on:
- redis
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3
insta-lab: insta-lab:
build: build:
context: ./insta-lab context: ./insta-lab
@@ -289,6 +311,7 @@ services:
- packs-lab - packs-lab
- travel-proxy - travel-proxy
- video-lab - video-lab
- image-lab
ports: ports:
- "8080:80" - "8080:80"
volumes: volumes:

File diff suppressed because it is too large Load Diff

7
image-lab/Dockerfile Normal file
View File

@@ -0,0 +1,7 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

13
image-lab/app/auth.py Normal file
View File

@@ -0,0 +1,13 @@
"""Windows image-render worker → NAS image-lab internal webhook 인증."""
from __future__ import annotations
import os
from fastapi import Header, HTTPException
def verify_internal_key(x_internal_key: str = Header(...)):
expected = os.getenv("INTERNAL_API_KEY")
if not expected:
raise HTTPException(401, "INTERNAL_API_KEY not configured on server")
if x_internal_key != expected:
raise HTTPException(401, "Invalid X-Internal-Key")

83
image-lab/app/db.py Normal file
View File

@@ -0,0 +1,83 @@
"""SQLite persistence for image_tasks. Single table — task 단위 추적만."""
from __future__ import annotations
import json
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Dict, Optional
DB_PATH = os.path.join(os.getenv("IMAGE_DATA_DIR", "/app/data"), "image.db")
@contextmanager
def _conn():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db() -> None:
with _conn() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS image_tasks (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
params TEXT NOT NULL,
status TEXT DEFAULT 'queued',
progress INTEGER DEFAULT 0,
message TEXT DEFAULT '',
image_url TEXT,
error TEXT,
created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
)
"""
)
def _row_to_dict(row) -> Dict[str, Any]:
return {
"id": row["id"], "provider": row["provider"], "params": row["params"],
"status": row["status"], "progress": row["progress"], "message": row["message"],
"image_url": row["image_url"], "error": row["error"],
"created_at": row["created_at"], "updated_at": row["updated_at"],
}
def create_task(task_id: str, provider: str, params: Dict[str, Any]) -> Dict[str, Any]:
with _conn() as conn:
conn.execute(
"INSERT INTO image_tasks (id, provider, params) VALUES (?, ?, ?)",
(task_id, provider, json.dumps(params)),
)
row = conn.execute("SELECT * FROM image_tasks WHERE id = ?", (task_id,)).fetchone()
return _row_to_dict(row)
def update_task(task_id: str, status: str, progress: int, message: str = "",
image_url: Optional[str] = None, error: Optional[str] = None) -> None:
with _conn() as conn:
conn.execute(
"""
UPDATE image_tasks
SET status = ?, progress = ?, message = ?, image_url = ?, error = ?,
updated_at = strftime('%Y-%m-%dT%H:%M:%fZ','now')
WHERE id = ?
""",
(status, progress, message, image_url, error, task_id),
)
def get_task(task_id: str) -> Optional[Dict[str, Any]]:
with _conn() as conn:
row = conn.execute("SELECT * FROM image_tasks WHERE id = ?", (task_id,)).fetchone()
return _row_to_dict(row) if row else None

View File

@@ -0,0 +1,52 @@
"""Windows image-render → NAS image-lab internal webhook.
POST /api/internal/image/update
- X-Internal-Key 인증 필수
- image_tasks row update (status, progress, message, image_url, error)
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from . import db
from .auth import verify_internal_key
logger = logging.getLogger(__name__)
router = APIRouter()
class UpdatePayload(BaseModel):
task_id: str
status: str = Field(..., description="processing|succeeded|failed")
progress: int = Field(..., ge=0, le=100)
message: str = ""
image_url: Optional[str] = None
error: Optional[str] = None
@router.post(
"/api/internal/image/update",
dependencies=[Depends(verify_internal_key)],
)
def image_update(payload: UpdatePayload):
task = db.get_task(payload.task_id)
if task is None:
raise HTTPException(404, f"task not found: {payload.task_id}")
db.update_task(
payload.task_id,
payload.status,
payload.progress,
message=payload.message,
image_url=payload.image_url,
error=payload.error,
)
logger.info(
"internal/image/update task=%s status=%s progress=%d",
payload.task_id, payload.status, payload.progress,
)
return {"ok": True}

113
image-lab/app/main.py Normal file
View File

@@ -0,0 +1,113 @@
"""FastAPI entrypoint for image-lab.
POST /api/image/generate — provider + prompt → Redis push → task_id
GET /api/image/tasks/{id} — DB 조회
GET /api/image/providers — 3 provider 메타
"""
from __future__ import annotations
import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from . import db
from .internal_router import router as internal_router
logger = logging.getLogger(__name__)
CORS_ALLOW_ORIGINS = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379")
redis_client = aioredis.from_url(REDIS_URL, decode_responses=False)
SUPPORTED_PROVIDERS = {"gpt_image", "nano_banana", "flux"}
app = FastAPI()
app.include_router(internal_router)
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in CORS_ALLOW_ORIGINS.split(",")],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
allow_headers=["Content-Type"],
)
@app.on_event("startup")
def on_startup():
db.init_db()
@app.get("/health")
def health():
return {"ok": True, "service": "image-lab"}
@app.get("/api/image/providers")
def list_providers():
"""3 provider 항상 노출 (key 누락은 worker가 failed 보고)."""
return {"providers": [
{"id": "gpt_image", "name": "GPT Image 2.0", "models": ["gpt-image-1"],
"sizes": ["1024x1024", "1024x1536", "1536x1024"]},
{"id": "nano_banana", "name": "Nano Banana (Gemini)", "models": ["gemini-2.5-flash-image"],
"sizes": ["1024x1024"]},
{"id": "flux", "name": "FLUX (local)", "models": ["flux-schnell", "flux-dev"],
"sizes": ["1024x1024", "832x1216", "1216x832"]},
]}
class GenerateRequest(BaseModel):
provider: str = Field(..., description="gpt_image|nano_banana|flux")
model: Optional[str] = None
prompt: str
size: Optional[str] = None
negative_prompt: Optional[str] = None
# Provider 별 추가 키는 extra 허용
extra: Optional[Dict[str, Any]] = None
class Config:
extra = "allow"
async def _push_render_job(task_id: str, job_type: str, params: dict) -> None:
"""Redis queue:image-render에 push."""
kst = timezone(timedelta(hours=9))
payload = {
"task_id": task_id,
"kind": "image",
"job_type": job_type,
"params": params,
"submitted_at": datetime.now(kst).isoformat(),
}
await redis_client.rpush("queue:image-render", json.dumps(payload))
@app.post("/api/image/generate")
async def generate_image(req: GenerateRequest):
"""이미지 생성 — Redis 큐로 Windows image-render에 위임."""
if req.provider not in SUPPORTED_PROVIDERS:
raise HTTPException(400, f"지원하지 않는 provider: {req.provider} (supported: {sorted(SUPPORTED_PROVIDERS)})")
task_id = str(uuid.uuid4())
params = req.model_dump(exclude_none=True)
db.create_task(task_id, req.provider, params)
job_type = f"{req.provider}_generation" # gpt_image_generation, nano_banana_generation, flux_generation
await _push_render_job(task_id, job_type, params)
return {"task_id": task_id, "provider": req.provider}
@app.get("/api/image/tasks/{task_id}")
def get_task_status(task_id: str):
t = db.get_task(task_id)
if not t:
raise HTTPException(404, "task not found")
return t

4
image-lab/env.example Normal file
View File

@@ -0,0 +1,4 @@
INTERNAL_API_KEY=replace-me
IMAGE_DATA_DIR=/app/data
CORS_ALLOW_ORIGINS=http://localhost:3007,http://localhost:8080
REDIS_URL=redis://redis:6379

View File

@@ -0,0 +1,5 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
pydantic==2.9.2
redis==5.0.8
httpx==0.27.2

View File

View File

@@ -0,0 +1,19 @@
import pytest
from fastapi import HTTPException
from app.auth import verify_internal_key
def test_no_server_key_rejects(monkeypatch):
monkeypatch.delenv("INTERNAL_API_KEY", raising=False)
with pytest.raises(HTTPException) as e:
verify_internal_key("anything")
assert e.value.status_code == 401
def test_wrong_key_rejects(monkeypatch):
monkeypatch.setenv("INTERNAL_API_KEY", "secret")
with pytest.raises(HTTPException) as e:
verify_internal_key("wrong")
assert e.value.status_code == 401
def test_correct_key_passes(monkeypatch):
monkeypatch.setenv("INTERNAL_API_KEY", "secret")
assert verify_internal_key("secret") is None

View File

@@ -0,0 +1,29 @@
import os, tempfile, importlib
def _fresh_db(monkeypatch, tmp):
monkeypatch.setenv("IMAGE_DATA_DIR", tmp)
import app.db as db
importlib.reload(db)
db.init_db()
return db
def test_create_and_get_task(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
db = _fresh_db(monkeypatch, tmp)
row = db.create_task("t1", "gpt_image", {"prompt": "a cat"})
assert row["id"] == "t1"
assert row["provider"] == "gpt_image"
assert row["status"] == "queued"
got = db.get_task("t1")
assert got["id"] == "t1"
assert db.get_task("nope") is None
def test_update_task_sets_image_url(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
db = _fresh_db(monkeypatch, tmp)
db.create_task("t2", "nano_banana", {"prompt": "x"})
db.update_task("t2", "succeeded", 100, message="done", image_url="/media/image/t2.png")
got = db.get_task("t2")
assert got["status"] == "succeeded"
assert got["image_url"] == "/media/image/t2.png"
assert got["progress"] == 100

View File

@@ -0,0 +1,38 @@
import os, tempfile, importlib
from fastapi import FastAPI
from fastapi.testclient import TestClient
def _client(monkeypatch, tmp):
monkeypatch.setenv("IMAGE_DATA_DIR", tmp)
monkeypatch.setenv("INTERNAL_API_KEY", "secret")
import app.db as db; importlib.reload(db); db.init_db()
import app.internal_router as ir; importlib.reload(ir)
app = FastAPI(); app.include_router(ir.router)
return TestClient(app), db
def test_update_requires_key(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, db = _client(monkeypatch, tmp)
db.create_task("t1", "gpt_image", {"prompt": "x"})
r = client.post("/api/internal/image/update",
json={"task_id": "t1", "status": "succeeded", "progress": 100})
assert r.status_code == 422 or r.status_code == 401 # header 누락
def test_update_succeeds_with_key(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, db = _client(monkeypatch, tmp)
db.create_task("t1", "gpt_image", {"prompt": "x"})
r = client.post("/api/internal/image/update",
headers={"X-Internal-Key": "secret"},
json={"task_id": "t1", "status": "succeeded", "progress": 100,
"image_url": "/media/image/t1.png"})
assert r.status_code == 200
assert db.get_task("t1")["image_url"] == "/media/image/t1.png"
def test_update_unknown_task_404(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, db = _client(monkeypatch, tmp)
r = client.post("/api/internal/image/update",
headers={"X-Internal-Key": "secret"},
json={"task_id": "nope", "status": "failed", "progress": 0})
assert r.status_code == 404

View File

@@ -0,0 +1,43 @@
import os, tempfile, importlib
from fastapi.testclient import TestClient
def _client(monkeypatch, tmp):
monkeypatch.setenv("IMAGE_DATA_DIR", tmp)
import app.db as db
importlib.reload(db)
db.init_db()
import app.main as main
importlib.reload(main)
pushed = []
async def fake_push(task_id, job_type, params):
pushed.append((task_id, job_type, params))
monkeypatch.setattr(main, "_push_render_job", fake_push)
return TestClient(main.app), db, pushed
def test_providers_lists_three(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, _, _ = _client(monkeypatch, tmp)
r = client.get("/api/image/providers")
ids = {p["id"] for p in r.json()["providers"]}
assert ids == {"gpt_image", "nano_banana", "flux"}
def test_generate_rejects_unknown_provider(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, _, _ = _client(monkeypatch, tmp)
r = client.post("/api/image/generate", json={"provider": "midjourney", "prompt": "x"})
assert r.status_code == 400
def test_generate_creates_task_and_pushes(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
client, db, pushed = _client(monkeypatch, tmp)
r = client.post("/api/image/generate", json={"provider": "gpt_image", "prompt": "a cat"})
assert r.status_code == 200
task_id = r.json()["task_id"]
assert db.get_task(task_id)["status"] == "queued"
assert pushed[0][1] == "gpt_image_generation"

View File

@@ -271,12 +271,40 @@ class TemplateBody(BaseModel):
description: str = "" description: str = ""
def _default_prompt_templates() -> dict:
"""DB에 저장된 override가 없을 때 노출할 코드 기본값.
생성 파이프라인이 실제로 폴백하는 값과 동일한 단일 소스를 사용."""
return {
"slate_writer": {
"template": card_writer.DEFAULT_PROMPT,
"description": "카드 10페이지 카피 생성 마스터 프롬프트 (Claude Sonnet). "
"{category}/{keyword}/{articles} 치환자 필수.",
},
"category_seeds": {
"template": json.dumps(DEFAULT_CATEGORY_SEEDS, ensure_ascii=False, indent=2),
"description": "트렌드 수집·분류용 카테고리별 시드 키워드 (JSON). "
"최상위 키가 분류 라벨로도 쓰임.",
},
}
@app.get("/api/insta/templates/prompts/{name}") @app.get("/api/insta/templates/prompts/{name}")
def get_prompt(name: str): def get_prompt(name: str):
pt = db.get_prompt_template(name) pt = db.get_prompt_template(name)
if not pt: if pt:
raise HTTPException(404) return pt
return pt # DB override 없음 → 코드 기본값 노출 (편집 UI가 마스터 프롬프트를 보고 수정 가능)
defaults = _default_prompt_templates()
if name in defaults:
d = defaults[name]
return {
"name": name,
"template": d["template"],
"description": d["description"],
"updated_at": None,
"is_default": True,
}
raise HTTPException(404)
@app.put("/api/insta/templates/prompts/{name}") @app.put("/api/insta/templates/prompts/{name}")

View File

@@ -0,0 +1,63 @@
import os
import gc
import json
import tempfile
import pytest
from fastapi.testclient import TestClient
from app import db as db_module
@pytest.fixture
def client(monkeypatch):
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
monkeypatch.setattr(db_module, "DB_PATH", path)
db_module.init_db()
from app import main
monkeypatch.setattr(main, "DB_PATH", path)
with TestClient(main.app) as c:
yield c
gc.collect()
for ext in ("", "-wal", "-shm"):
try:
os.remove(path + ext)
except OSError:
pass
def test_get_slate_writer_returns_default_when_unset(client):
"""DB에 없으면 코드 기본 마스터 프롬프트를 200으로 반환 (404 아님)."""
resp = client.get("/api/insta/templates/prompts/slate_writer")
assert resp.status_code == 200
body = resp.json()
assert body["is_default"] is True
assert "{keyword}" in body["template"]
assert "{category}" in body["template"]
def test_get_category_seeds_returns_default_when_unset(client):
"""category_seeds 기본값은 유효한 JSON (카테고리→시드 배열)."""
resp = client.get("/api/insta/templates/prompts/category_seeds")
assert resp.status_code == 200
body = resp.json()
assert body["is_default"] is True
seeds = json.loads(body["template"])
assert "economy" in seeds and isinstance(seeds["economy"], list)
def test_get_unknown_prompt_still_404(client):
resp = client.get("/api/insta/templates/prompts/does_not_exist")
assert resp.status_code == 404
def test_saved_template_overrides_default(client):
"""PUT로 저장하면 이후 GET은 저장본(is_default 없음)을 반환."""
client.put("/api/insta/templates/prompts/slate_writer",
json={"template": "내 커스텀 프롬프트", "description": "custom"})
resp = client.get("/api/insta/templates/prompts/slate_writer")
assert resp.status_code == 200
body = resp.json()
assert body["template"] == "내 커스텀 프롬프트"
assert not body.get("is_default")

View File

@@ -276,6 +276,26 @@ server {
proxy_pass http://$video_internal_backend$request_uri; proxy_pass http://$video_internal_backend$request_uri;
} }
# Video Studio — Windows image-render → NAS image-lab internal webhook
# Layer 1·2: nginx IP 화이트리스트 (LAN + Tailscale)
# Layer 3: X-Internal-Key (FastAPI dependency)
location /api/internal/image/ {
allow 192.168.45.0/24; # LAN 화이트리스트
allow 100.64.0.0/10; # Tailscale CGNAT
allow 127.0.0.1; # NAS 내부
deny all;
resolver 127.0.0.11 valid=10s;
set $image_internal_backend image-lab:8000;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Internal-Key $http_x_internal_key;
proxy_pass http://$image_internal_backend$request_uri;
}
# portfolio API (Stock) — trailing slash 유무 모두 매칭 # portfolio API (Stock) — trailing slash 유무 모두 매칭
location /api/portfolio { location /api/portfolio {
proxy_http_version 1.1; proxy_http_version 1.1;

View File

@@ -2,7 +2,7 @@
set -euo pipefail set -euo pipefail
# ── 서비스 목록 (한 곳에서만 관리) ── # ── 서비스 목록 (한 곳에서만 관리) ──
SERVICES="lotto travel-proxy deployer stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab nginx scripts" SERVICES="lotto travel-proxy deployer stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab nginx scripts"
# 1. 자동 감지: Docker 컨테이너 내부인가? # 1. 자동 감지: Docker 컨테이너 내부인가?
if [ -d "/repo" ] && [ -d "/runtime" ]; then if [ -d "/repo" ] && [ -d "/runtime" ]; then

View File

@@ -15,15 +15,15 @@ flock -n 200 || { echo "Deploy already running, skipping"; exit 0; }
# ── 서비스 목록 (한 곳에서만 관리) ── # ── 서비스 목록 (한 곳에서만 관리) ──
# docker compose 서비스명 (deployer 제외 — 자기 자신을 재빌드하면 스크립트 중단) # docker compose 서비스명 (deployer 제외 — 자기 자신을 재빌드하면 스크립트 중단)
BUILD_TARGETS="lotto travel-proxy stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab frontend" BUILD_TARGETS="lotto travel-proxy stock music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab frontend"
# 컨테이너 이름 (고아 정리용 — blog-lab은 폐기 대상으로 정리 리스트에 유지) # 컨테이너 이름 (고아 정리용 — blog-lab은 폐기 대상으로 정리 리스트에 유지)
CONTAINER_NAMES="lotto stock music-lab insta-lab blog-lab realestate-lab agent-office personal packs-lab travel-proxy video-lab frontend" CONTAINER_NAMES="lotto stock music-lab insta-lab blog-lab realestate-lab agent-office personal packs-lab travel-proxy video-lab image-lab frontend"
# Infra 서비스 (image-based, 영속 데이터 보존을 위해 stop/rm 없이 up만) # Infra 서비스 (image-based, 영속 데이터 보존을 위해 stop/rm 없이 up만)
INFRA_SERVICES="redis" INFRA_SERVICES="redis"
# 헬스체크 대상 # 헬스체크 대상
HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab redis" HEALTH_ENDPOINTS="lotto stock travel-proxy music-lab insta-lab realestate-lab agent-office personal packs-lab video-lab image-lab redis"
# data 디렉토리 (packs-lab은 별도 media/packs 사용) # data 디렉토리 (packs-lab은 별도 media/packs 사용)
DATA_DIRS="music stock insta realestate agent-office personal video" DATA_DIRS="music stock insta realestate agent-office personal video image"
# 1. 자동 감지: Docker 컨테이너 내부인가? # 1. 자동 감지: Docker 컨테이너 내부인가?
if [ -d "/repo" ] && [ -d "/runtime" ]; then if [ -d "/repo" ] && [ -d "/runtime" ]; then

View File

@@ -15,9 +15,15 @@ PROMPT_TEMPLATE = """다음은 종목 {name}({ticker})에 대한 최근 뉴스 {
{news_block} {news_block}
이 뉴스들이 종목에 호재인지 악재인지 평가하세요. 이 뉴스들이 종목 주가에 호재인지 악재인지 종합 평가하세요.
score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 0은 중립.
reason: 30자 이내 한 줄 근거. 규칙:
- score: -10(매우 강한 악재) ~ +10(매우 강한 호재) 사이의 실수. 명확한 방향성이 없으면 0(중립).
- 뉴스가 호재·악재로 섞여 있으면 주가에 더 우세한 쪽을 기준으로 부호를 정하세요.
- reason은 반드시 score 부호와 같은 방향의 근거만 쓰세요.
· score가 양수(호재)면 호재 근거만, 음수(악재)면 악재 근거만 적습니다.
· 호재 평가에 악재 내용을, 악재 평가에 호재 내용을 섞지 마세요.
- reason: 30자 이내 한 줄.
JSON으로만 응답하세요. 다른 텍스트 금지: JSON으로만 응답하세요. 다른 텍스트 금지:
{{"score": <float>, "reason": "<string>"}}""" {{"score": <float>, "reason": "<string>"}}"""

View File

@@ -124,8 +124,10 @@ async def refresh_daily(
if successes: if successes:
_upsert_news_sentiment(conn, asof, successes, source="articles") _upsert_news_sentiment(conn, asof, successes, source="articles")
top_pos = sorted(successes, key=lambda r: -r["score_raw"])[:5] # 부호 게이트: 호재(score>0)·악재(score<0)만 분류. score 미만 종목이 5개 미만이어도
top_neg = sorted(successes, key=lambda r: r["score_raw"])[:5] # 반대 부호 종목으로 채우지 않음 (양수 종목이 악재란에 섞이는 문제 방지). 중립(0)은 제외.
top_pos = sorted([r for r in successes if r["score_raw"] > 0], key=lambda r: -r["score_raw"])[:5]
top_neg = sorted([r for r in successes if r["score_raw"] < 0], key=lambda r: r["score_raw"])[:5]
return { return {
"asof": asof.isoformat(), "asof": asof.isoformat(),

View File

@@ -140,6 +140,71 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
assert {r["ticker"] for r in rows} == {"005930"} assert {r["ticker"] for r in rows} == {"005930"}
@pytest.mark.asyncio
async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
"""전 종목 양수 점수면 top_neg는 비어야 함 (호재 종목이 악재란에 채워지면 안 됨)."""
asof = dt.date(2026, 5, 13)
fake_articles_by_ticker = {
"005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
}
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats))
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
assert len(result["top_pos"]) == 3
assert result["top_neg"] == [] # 양수 종목이 악재란에 들어가면 안 됨
@pytest.mark.asyncio
async def test_refresh_daily_sign_gate_excludes_neutral(conn):
"""score=0(중립)은 호재·악재 어디에도 포함되지 않음."""
asof = dt.date(2026, 5, 13)
fake_articles_by_ticker = {
"005930": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"000660": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
"373220": [{"title": "h", "summary": "", "press": "", "pub_date": ""}],
}
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
async def fake_score(llm, ticker, news, *, name=None, model="m"):
return {
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
}
with patch.object(pipeline, "articles_source") as mas, \
patch.object(pipeline, "_analyzer") as ma, \
patch.object(pipeline, "_make_llm") as ml:
mas.gather_articles_for_tickers = MagicMock(return_value=(fake_articles_by_ticker, fake_stats))
ma.score_sentiment = fake_score
ml.return_value.__aenter__.return_value = AsyncMock()
ml.return_value.__aexit__.return_value = None
result = await pipeline.refresh_daily(conn, asof, concurrency=3)
pos_tickers = {r["ticker"] for r in result["top_pos"]}
neg_tickers = {r["ticker"] for r in result["top_neg"]}
assert pos_tickers == {"005930"}
assert neg_tickers == {"373220"}
assert "000660" not in pos_tickers and "000660" not in neg_tickers
def test_top_market_cap_tickers(conn): def test_top_market_cap_tickers(conn):
out = pipeline._top_market_cap_tickers(conn, n=2) out = pipeline._top_market_cap_tickers(conn, n=2)
assert out == ["005930", "000660"] assert out == ["005930", "000660"]