Compare commits

..

15 Commits

Author SHA1 Message Date
cb70226f42 feat(image-render): main + Dockerfile + compose entry (port 18714) 2026-05-23 12:10:29 +09:00
de24bae984 feat(image-render): Redis BLPOP worker + 3 provider dispatch 2026-05-23 12:06:24 +09:00
0e6c893b4e feat(image-render): flux (ComfyUI 로컬) provider + GPU 장중 가드 2026-05-23 12:03:23 +09:00
fb80973e38 feat(image-render): nano_banana (Gemini Flash Image) provider 2026-05-23 12:00:06 +09:00
31b0e7dbc4 feat(image-render): gpt_image provider + media helper (SP image) 2026-05-23 11:56:50 +09:00
6169f48eb8 feat(image-render): nas_client webhook adapter (video-render 복제) 2026-05-23 11:53:41 +09:00
27a6df6cff docs(task-watcher): NSSM_SETUP.md — SP-9 자동 시작 안내
ai_trade(HIGH, native python :8001) + wsl_docker(NORMAL, WSL2 Ubuntu-24.04
docker compose up). spec의 signal_v2→ai_trade, 22.04→24.04, web-ai-services
→web-ai/services 정정. sudoers NOPASSWD + 재부팅 검증 절차.
Plan-B-Infra Phase 3.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:46:56 +09:00
803fdb6278 feat(task-watcher): services/docker-compose entry (SP-10)
port 18713, REDIS_URL/STOCK_BASE_URL/TRADING_START/END env.
insta/music/video-render와 같은 services 묶음. outbound only.
Plan-B-Infra Phase 2 완료 — 박재오 빌드 대기.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:45:40 +09:00
77e21b54e6 feat(task-watcher): main.py + Dockerfile + requirements + env (SP-10)
FastAPI lifespan에서 watcher_loop 스폰. /health. tzdata(zoneinfo Asia/Seoul).
.env: REDIS_URL, STOCK_BASE_URL, TRADING_START/END.
Plan-B-Infra Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:44:48 +09:00
4d0c89ce79 feat(task-watcher): watcher.py — 30초 loop + queue:paused 토글 (SP-10)
trading → SET queue:paused 1 EX 600 / free → DEL.
holidays 1시간마다 refresh. PAUSED_TTL 600s (watcher 죽어도 자동 해제 — 안전).
mode 전환 시에만 로그.
Plan-B-Infra Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:43:48 +09:00
4b60ab34c3 feat(task-watcher): mode.py — 시간대+휴장일 판정 (SP-10)
current_mode(now, holidays): 비휴장 평일 07:00–16:30 → trading, 그 외 free.
fetch_holidays(): NAS /api/stock/holidays 조회 (실패 시 빈 set = free 안전).
TRADING_START/END env로 윈도우 조정. idle 감지 생략 (박재오 결정).
6 tests (평일 장중/장전/장후, 주말, 휴장, 경계).
Plan-B-Infra Phase 2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:42:36 +09:00
53a0657027 fix(video-render): Veo durationSeconds str → int (T10 follow-up 2)
end-to-end 검증 2차: Gemini API는 durationSeconds를 number로 요구.
str("6") → 400 INVALID_ARGUMENT. int(params["duration"])로 전송.
(WebFetch 문서는 string으로 표기했으나 실제 API는 number.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 01:25:22 +09:00
91f01d126b fix(video-render): Veo numberOfVideos 무조건 추가 → optional (T10 follow-up)
end-to-end 검증에서 발견: veo-3.0-fast-generate-001은 numberOfVideos
파라미터 미지원 → 400 INVALID_ARGUMENT 즉시 실패.
호출자가 number_of_videos params 명시할 때만 body에 추가.
default body는 prompt + aspectRatio + (duration/resolution/negativePrompt
/personGeneration 조건부)만.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 08:45:13 +09:00
0702cf052f fix(video-render): Kling PiAPI → Native KlingAI (T11 follow-up)
박재오 발견: Kling 공식 API key 발급 (Access Key + Secret Key).
PiAPI gateway가 아닌 native api.klingai.com 사용.

변경:
- providers/kling.py: JWT 인증 (HS256, iss=access_key, exp=now+1800, nbf=now-5).
  POST /v1/videos/text2video → GET /v1/videos/{kind}/{task_id} 폴링.
  data.task_result.videos[0].url 다운로드.
  text2video / image2video 자동 분기.
- .env.example: PIAPI_API_KEY → KLING_ACCESS_KEY + KLING_SECRET_KEY
- docker-compose: 같은 env 교체
- requirements.txt: + PyJWT>=2.8.0

박재오 측: .env에 두 키 모두 입력.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:40:01 +09:00
8aa3f1c3b2 fix(video-render): Veo Vertex AI → Gemini API (T10 follow-up)
박재오 발견: Veo는 Gemini API key 단일로 충분 (ai.google.dev).
Vertex AI의 GCP project + service account JSON + GCS bucket 셋업 불필요.

변경:
- providers/veo.py: generativelanguage.googleapis.com/v1beta endpoint
  + x-goog-api-key 헤더 + response.generateVideoResponse.generatedSamples[0].video.uri
- .env.example: GOOGLE_PROJECT_ID/LOCATION/GCS_BUCKET/SA_JSON 4 변수 → GEMINI_API_KEY 1개
- docker-compose: GCP 4 env + SA JSON volume mount 제거, GEMINI_API_KEY 추가
- requirements.txt: google-cloud-storage 제거 (requests만 사용)

박재오 측 영향: /etc/webai/gcp-sa.json 더미 파일 + GCP_SA_JSON_HOST_PATH env 무관.
GEMINI_API_KEY 1개만 발급하여 .env에 추가하면 됨.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:32:11 +09:00
31 changed files with 1010 additions and 149 deletions

View File

@@ -63,17 +63,60 @@ services:
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18801}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID:-}
- GOOGLE_LOCATION=${GOOGLE_LOCATION:-us-central1}
- GOOGLE_GCS_BUCKET=${GOOGLE_GCS_BUCKET:-}
- GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
- PIAPI_API_KEY=${PIAPI_API_KEY:-}
- GEMINI_API_KEY=${GEMINI_API_KEY:-}
- KLING_ACCESS_KEY=${KLING_ACCESS_KEY:-}
- KLING_SECRET_KEY=${KLING_SECRET_KEY:-}
- SEEDANCE_API_KEY=${SEEDANCE_API_KEY:-}
- VIDEO_MEDIA_ROOT=${VIDEO_MEDIA_ROOT:-/mnt/nas/webpage/data/video}
- VIDEO_MEDIA_URL_PREFIX=${VIDEO_MEDIA_URL_PREFIX:-/media/video}
volumes:
- /mnt/nas/webpage/data/video:/mnt/nas/webpage/data/video
- ${GCP_SA_JSON_HOST_PATH:-/etc/webai/gcp-sa.json}:/app/keys/gcp-sa.json:ro
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3
task-watcher:
build:
context: ./task-watcher
container_name: task-watcher
restart: unless-stopped
ports:
- "18713:8000"
environment:
- TZ=Asia/Seoul
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
- STOCK_BASE_URL=${STOCK_BASE_URL:-http://192.168.45.54:18500}
- TRADING_START=${TRADING_START:-07:00}
- TRADING_END=${TRADING_END:-16:30}
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s
timeout: 5s
retries: 3
image-render:
build:
context: ./image-render
container_name: image-render
restart: unless-stopped
ports:
- "18714:8000"
environment:
- TZ=Asia/Seoul
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18802}
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- GEMINI_API_KEY=${GEMINI_API_KEY:-}
- COMFYUI_URL=${COMFYUI_URL:-http://host.docker.internal:8188}
- FLUX_BLOCK_TRADING_HOURS=${FLUX_BLOCK_TRADING_HOURS:-1}
- IMAGE_MEDIA_ROOT=${IMAGE_MEDIA_ROOT:-/mnt/nas/webpage/data/image}
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- /mnt/nas/webpage/data/image:/mnt/nas/webpage/data/image
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 60s

View File

@@ -0,0 +1,16 @@
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

@@ -0,0 +1,18 @@
# Redis (NAS)
REDIS_URL=redis://192.168.45.54:6379
# NAS image-lab webhook
NAS_BASE_URL=http://192.168.45.54:18802
INTERNAL_API_KEY=replace-me
# API provider keys (worker reports failed if missing)
OPENAI_API_KEY=
GEMINI_API_KEY=
# Seedance key not used by image-render
# FLUX local
COMFYUI_URL=http://host.docker.internal:8188
FLUX_BLOCK_TRADING_HOURS=1
# NAS SMB mount target (image-render writes to this, NAS reads via /media/image/)
IMAGE_MEDIA_ROOT=/mnt/nas/webpage/data/image

View File

@@ -0,0 +1,36 @@
"""image-render FastAPI entry — health + lifespan (worker loop spawn)."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
import worker
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
worker_task = asyncio.create_task(worker.worker_loop())
logger.info("image-render lifespan 시작")
try:
yield
finally:
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
pass
logger.info("image-render lifespan 종료")
app = FastAPI(lifespan=lifespan)
@app.get("/health")
def health():
return {"ok": True, "service": "image-render"}

View File

@@ -0,0 +1,54 @@
"""NAS webhook 어댑터 — Windows worker → NAS image-lab HTTP 위임.
video-render nas_client 복제 (call-time os.getenv으로 테스트 격리).
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional
import httpx
logger = logging.getLogger(__name__)
_TIMEOUT = 10.0
def _post(payload: Dict[str, Any]) -> None:
nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18802")
internal_api_key = os.getenv("INTERNAL_API_KEY", "")
url = f"{nas_base_url}/api/internal/image/update"
try:
r = httpx.post(
url,
headers={"X-Internal-Key": internal_api_key},
json=payload,
timeout=_TIMEOUT,
)
if r.status_code != 200:
logger.error("webhook %s returned %d: %s",
payload.get("task_id"), r.status_code, r.text[:200])
except Exception:
logger.exception("webhook %s 호출 실패", payload.get("task_id"))
def webhook_update_task(
task_id: str,
status: str,
progress: int,
message: str = "",
image_url: Optional[str] = None,
error: Optional[str] = None,
) -> None:
payload: Dict[str, Any] = {
"task_id": task_id,
"status": status,
"progress": progress,
"message": message,
}
if image_url is not None:
payload["image_url"] = image_url
if error is not None:
payload["error"] = error
_post(payload)

View File

@@ -0,0 +1,18 @@
"""b64 이미지 → NAS SMB 경로 저장 → /media/image URL 반환."""
from __future__ import annotations
import base64
import os
import uuid
IMAGE_MEDIA_ROOT = os.getenv("IMAGE_MEDIA_ROOT", "/mnt/nas/webpage/data/image")
IMAGE_MEDIA_URL_PREFIX = os.getenv("IMAGE_MEDIA_URL_PREFIX", "/media/image")
def save_b64_png(task_id: str, b64_data: str) -> str:
os.makedirs(IMAGE_MEDIA_ROOT, exist_ok=True)
fname = f"{task_id}-{uuid.uuid4().hex[:8]}.png"
path = os.path.join(IMAGE_MEDIA_ROOT, fname)
with open(path, "wb") as f:
f.write(base64.b64decode(b64_data))
return f"{IMAGE_MEDIA_URL_PREFIX}/{fname}"

View File

@@ -0,0 +1,79 @@
"""FLUX 로컬 — ComfyUI HTTP API.
POST {COMFYUI_URL}/prompt (workflow JSON) → prompt_id
GET {COMFYUI_URL}/history/{prompt_id} → outputs → image filename
GET {COMFYUI_URL}/view?filename=... → PNG bytes → b64
워크플로우 JSON은 `flux_workflow.json` (ComfyUI UI에서 "Save (API Format)"로 export, CLIPTextEncode 노드 text를 "%PROMPT%"로 수동 치환). 박재오 산출물.
"""
from __future__ import annotations
import base64, json, logging, os, time
from datetime import datetime, timezone, timedelta
import requests
from nas_client import webhook_update_task
from providers._media import save_b64_png
logger = logging.getLogger(__name__)
COMFYUI_URL = os.getenv("COMFYUI_URL", "http://127.0.0.1:8188")
WORKFLOW_PATH = os.path.join(os.path.dirname(__file__), "flux_workflow.json")
POLL_INTERVAL = 2
POLL_MAX = 120
def _is_trading_hours() -> bool:
kst = timezone(timedelta(hours=9))
now = datetime.now(kst)
if now.weekday() >= 5:
return False
return (now.hour, now.minute) >= (9, 0) and (now.hour, now.minute) <= (15, 30)
def _load_workflow(prompt: str, size: str) -> dict:
with open(WORKFLOW_PATH, encoding="utf-8") as f:
wf = json.load(f)
# CLIPTextEncode 노드의 text를 prompt로 치환 (workflow에 "%PROMPT%" placeholder 사용)
raw = json.dumps(wf).replace("%PROMPT%", prompt.replace('"', "'"))
return json.loads(raw)
def _submit_prompt(workflow: dict) -> str:
r = requests.post(f"{COMFYUI_URL}/prompt", json={"prompt": workflow}, timeout=30)
r.raise_for_status()
return r.json()["prompt_id"]
def _poll_image_b64(prompt_id: str):
for _ in range(POLL_MAX):
h = requests.get(f"{COMFYUI_URL}/history/{prompt_id}", timeout=10)
data = h.json().get(prompt_id)
if data and data.get("outputs"):
for node_out in data["outputs"].values():
for img in node_out.get("images", []):
view = requests.get(f"{COMFYUI_URL}/view",
params={"filename": img["filename"], "subfolder": img.get("subfolder", ""), "type": img.get("type", "output")},
timeout=30)
view.raise_for_status()
return base64.b64encode(view.content).decode()
time.sleep(POLL_INTERVAL)
return None
def run_flux_generation(task_id: str, params: dict) -> None:
try:
if os.getenv("FLUX_BLOCK_TRADING_HOURS") == "1" and _is_trading_hours():
webhook_update_task(task_id, "failed", 0, "", error="장중 GPU 보호 — FLUX 거부 (API provider 사용 권장)")
return
webhook_update_task(task_id, "processing", 10, "FLUX (ComfyUI) 생성 중...")
wf = _load_workflow(params["prompt"], params.get("size") or "1024x1024")
pid = _submit_prompt(wf)
b64 = _poll_image_b64(pid)
if not b64:
webhook_update_task(task_id, "failed", 0, "", error="ComfyUI 타임아웃 또는 출력 없음")
return
url = save_b64_png(task_id, b64)
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
except Exception as e:
logger.exception("flux task=%s 실패", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,47 @@
"""GPT Image 2.0 — OpenAI Images API.
POST https://api.openai.com/v1/images/generations
body {model:"gpt-image-1", prompt, size, n:1} → data[0].b64_json
"""
from __future__ import annotations
import logging
import os
import requests
from nas_client import webhook_update_task
from providers._media import save_b64_png
logger = logging.getLogger(__name__)
OPENAI_URL = "https://api.openai.com/v1/images/generations"
DEFAULT_MODEL = "gpt-image-1"
def run_gpt_image_generation(task_id: str, params: dict) -> None:
try:
if not os.getenv("OPENAI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
return
webhook_update_task(task_id, "processing", 10, "GPT Image 호출 중...")
body = {
"model": params.get("model") or DEFAULT_MODEL,
"prompt": params["prompt"],
"size": params.get("size") or "1024x1024",
"n": 1,
}
resp = requests.post(
OPENAI_URL,
headers={"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", "Content-Type": "application/json"},
json=body,
timeout=120,
)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "", error=f"OpenAI {resp.status_code}: {resp.text[:200]}")
return
b64 = resp.json()["data"][0]["b64_json"]
url = save_b64_png(task_id, b64)
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
except Exception as e:
logger.exception("gpt_image task=%s 실패", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,52 @@
"""Nano Banana — Gemini 2.5 Flash Image (generativelanguage API).
POST /v1beta/models/{MODEL}:generateContent
→ candidates[0].content.parts[*].inlineData.data (b64 png)
"""
from __future__ import annotations
import logging, os
import requests
from nas_client import webhook_update_task
from providers._media import save_b64_png
logger = logging.getLogger(__name__)
GEMINI_BASE = "https://generativelanguage.googleapis.com/v1beta"
DEFAULT_MODEL = "gemini-2.5-flash-image"
def _extract_b64(data: dict):
for cand in data.get("candidates", []):
for part in cand.get("content", {}).get("parts", []):
inline = part.get("inlineData") or part.get("inline_data")
if inline and inline.get("data"):
return inline["data"]
return None
def run_nano_banana_generation(task_id: str, params: dict) -> None:
try:
if not os.getenv("GEMINI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)")
return
webhook_update_task(task_id, "processing", 10, "Nano Banana (Gemini) 호출 중...")
model_id = params.get("model") or DEFAULT_MODEL
body = {"contents": [{"parts": [{"text": params["prompt"]}]}]}
resp = requests.post(
f"{GEMINI_BASE}/models/{model_id}:generateContent",
headers={"x-goog-api-key": os.getenv("GEMINI_API_KEY"), "Content-Type": "application/json"},
json=body, timeout=120,
)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "", error=f"Gemini {resp.status_code}: {resp.text[:200]}")
return
b64 = _extract_b64(resp.json())
if not b64:
webhook_update_task(task_id, "failed", 0, "", error="Gemini 응답에 이미지 없음")
return
url = save_b64_png(task_id, b64)
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
except Exception as e:
logger.exception("nano_banana task=%s 실패", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -0,0 +1,9 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
requests==2.32.3
redis>=5.0
httpx>=0.27
openai>=1.50.0
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21

View File

View File

@@ -0,0 +1,21 @@
import providers.flux as fx
def test_blocked_during_trading_hours(monkeypatch):
monkeypatch.setenv("FLUX_BLOCK_TRADING_HOURS", "1")
monkeypatch.setattr(fx, "_is_trading_hours", lambda: True)
calls = []
monkeypatch.setattr(fx, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
fx.run_flux_generation("t1", {"prompt": "a cat"})
assert calls[-1][0][1] == "failed"
assert "장중" in calls[-1][1]["error"]
def test_success_polls_history_and_saves(monkeypatch):
monkeypatch.setattr(fx, "_is_trading_hours", lambda: False)
calls = []
monkeypatch.setattr(fx, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
monkeypatch.setattr(fx, "_load_workflow", lambda prompt, size: {"3": {}})
monkeypatch.setattr(fx, "_submit_prompt", lambda wf: "pid-1")
monkeypatch.setattr(fx, "_poll_image_b64", lambda pid: "ZmFrZQ==")
monkeypatch.setattr(fx, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
fx.run_flux_generation("t1", {"prompt": "a cat"})
assert [c for c in calls if c[0][1] == "succeeded"]

View File

@@ -0,0 +1,32 @@
import providers.gpt_image as gi
def test_missing_key_reports_failed(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
calls = []
monkeypatch.setattr(gi, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
gi.run_gpt_image_generation("t1", {"prompt": "a cat"})
# 마지막 호출이 failed
assert calls[-1][0][1] == "failed"
def test_success_saves_and_reports_url(monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
calls = []
monkeypatch.setattr(gi, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
monkeypatch.setattr(gi, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
class FakeResp:
status_code = 200
def json(self):
return {"data": [{"b64_json": "ZmFrZQ=="}]}
def raise_for_status(self):
pass
monkeypatch.setattr(gi.requests, "post", lambda *a, **k: FakeResp())
gi.run_gpt_image_generation("t1", {"prompt": "a cat"})
succeeded = [c for c in calls if c[0][1] == "succeeded"]
assert succeeded and succeeded[-1][1]["image_url"] == "/media/image/t1.png"

View File

@@ -0,0 +1,25 @@
import providers.nano_banana as nb
def test_missing_key_reports_failed(monkeypatch):
monkeypatch.delenv("GEMINI_API_KEY", raising=False)
calls = []
monkeypatch.setattr(nb, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
nb.run_nano_banana_generation("t1", {"prompt": "a cat"})
assert calls[-1][0][1] == "failed"
def test_success_extracts_inline_data(monkeypatch):
monkeypatch.setenv("GEMINI_API_KEY", "g-test")
calls = []
monkeypatch.setattr(nb, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
monkeypatch.setattr(nb, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
class FakeResp:
status_code = 200
def json(self):
return {"candidates": [{"content": {"parts": [
{"inlineData": {"mimeType": "image/png", "data": "ZmFrZQ=="}}
]}}]}
monkeypatch.setattr(nb.requests, "post", lambda *a, **k: FakeResp())
nb.run_nano_banana_generation("t1", {"prompt": "a cat"})
assert [c for c in calls if c[0][1] == "succeeded"]

View File

@@ -0,0 +1,20 @@
import nas_client
def test_webhook_includes_image_url(monkeypatch):
captured = {}
def fake_post(payload):
captured.update(payload)
monkeypatch.setattr(nas_client, "_post", fake_post)
nas_client.webhook_update_task("t1", "succeeded", 100, "done", image_url="/media/image/t1.png")
assert captured["task_id"] == "t1"
assert captured["image_url"] == "/media/image/t1.png"
def test_webhook_omits_none_fields(monkeypatch):
captured = {}
monkeypatch.setattr(nas_client, "_post", lambda p: captured.update(p))
nas_client.webhook_update_task("t2", "processing", 10, "working")
assert "image_url" not in captured and "error" not in captured

View File

@@ -0,0 +1,15 @@
import worker
def test_dispatch_routes_to_provider(monkeypatch):
called = {}
monkeypatch.setattr(worker, "run_gpt_image_generation", lambda tid, p: called.setdefault("gpt", (tid, p)))
worker._dispatch({"job_type": "gpt_image_generation", "task_id": "t1", "params": {"prompt": "x"}})
assert called["gpt"][0] == "t1"
def test_dispatch_unknown_job_type_reports_failed(monkeypatch):
calls = []
monkeypatch.setattr(worker, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
worker._dispatch({"job_type": "midjourney_generation", "task_id": "t9", "params": {}})
assert calls[-1][0][1] == "failed"

View File

@@ -0,0 +1,84 @@
"""Redis BLPOP worker — queue:image-render → job_type dispatch → NAS webhook.
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
video-render worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sys
import redis.asyncio as aioredis
from nas_client import webhook_update_task
from providers.gpt_image import run_gpt_image_generation
from providers.nano_banana import run_nano_banana_generation
from providers.flux import run_flux_generation
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
QUEUE_KEY = "queue:image-render"
PAUSED_KEY = "queue:paused"
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
# is correctly intercepted by getattr(sys.modules[__name__], ...)
_DISPATCH_TABLE = {
"gpt_image_generation": "run_gpt_image_generation",
"nano_banana_generation": "run_nano_banana_generation",
"flux_generation": "run_flux_generation",
}
def _dispatch(payload: dict) -> None:
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
job_type = payload.get("job_type", "")
task_id = payload.get("task_id", "")
params = payload.get("params", {})
fn_name = _DISPATCH_TABLE.get(job_type)
if fn_name is None:
logger.error("unknown job_type=%s task=%s", job_type, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
return
try:
fn = getattr(sys.modules[__name__], fn_name)
except AttributeError:
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
return
fn(task_id, params)
async def worker_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
logger.info("image-render worker started (queue=%s)", QUEUE_KEY)
while True:
try:
paused = await redis.get(PAUSED_KEY)
if paused == b"1":
await asyncio.sleep(10)
continue
item = await redis.blpop(QUEUE_KEY, timeout=5)
if item is None:
continue
_, raw = item
try:
payload = json.loads(raw)
except json.JSONDecodeError:
logger.error("invalid queue payload: %r", raw[:200])
continue
await asyncio.to_thread(_dispatch, payload)
except asyncio.CancelledError:
logger.info("worker_loop cancelled")
raise
except Exception:
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
await asyncio.sleep(5)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(worker_loop())

View File

@@ -0,0 +1,11 @@
# Plan-B-Infra — task-watcher
# NAS Redis
REDIS_URL=redis://192.168.45.54:6379
# NAS stock holidays endpoint
STOCK_BASE_URL=http://192.168.45.54:18500
# 트레이딩 윈도우 (KST, HH:MM) — 이 시간대에만 queue:paused
TRADING_START=07:00
TRADING_END=16:30

View File

@@ -0,0 +1,16 @@
FROM python:3.12-slim-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates tzdata \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]

View File

@@ -0,0 +1,83 @@
# NSSM 자동 시작 설정 (SP-9)
Windows AI 머신 부팅 시 ai_trade(트레이딩) + WSL2 Docker(render workers + task-watcher) 자동 시작.
## 1. NSSM 다운로드
https://nssm.cc/download → nssm-2.24.zip → `C:\nssm\nssm.exe` 배치 (또는 PATH 등록).
## 2. ai_trade (Native Python, HIGH priority)
⚠️ spec의 signal_v2는 ai_trade로 rename됨. 경로/포트 확인.
```powershell
# 관리자 PowerShell
C:\nssm\nssm.exe install ai_trade "C:\Python312\python.exe" "-m uvicorn main:app --host 0.0.0.0 --port 8001"
C:\nssm\nssm.exe set ai_trade AppDirectory "C:\Users\jaeoh\Desktop\workspace\web-ai\ai_trade"
C:\nssm\nssm.exe set ai_trade Priority HIGH_PRIORITY_CLASS
C:\nssm\nssm.exe set ai_trade Start SERVICE_AUTO_START
C:\nssm\nssm.exe set ai_trade AppStdout "C:\Users\jaeoh\nssm-logs\ai_trade.log"
C:\nssm\nssm.exe set ai_trade AppStderr "C:\Users\jaeoh\nssm-logs\ai_trade.log"
```
(ai_trade의 실제 진입점이 main:app + port 8001인지 확인. 다르면 조정.)
## 3. WSL2 Docker (NORMAL priority — render workers + task-watcher)
```powershell
C:\nssm\nssm.exe install wsl_docker "C:\Windows\System32\wsl.exe" "-d Ubuntu-24.04 -- sh -c 'sudo service docker start && cd /workspace/web-ai/services && docker compose up -d'"
C:\nssm\nssm.exe set wsl_docker Priority NORMAL_PRIORITY_CLASS
C:\nssm\nssm.exe set wsl_docker Start SERVICE_AUTO_START
C:\nssm\nssm.exe set wsl_docker AppStdout "C:\Users\jaeoh\nssm-logs\wsl_docker.log"
```
⚠️ 변경점: Ubuntu-22.04 → **Ubuntu-24.04**, web-ai-services → **web-ai/services**. WSL 경로는 박재오 WSL 마운트 기준 (`/workspace`가 web-ai에 매핑되어 있으면 그대로, 아니면 `/mnt/c/Users/jaeoh/Desktop/workspace/web-ai/services`).
`sudo service docker start`가 비밀번호 요구하면 sudoers에 NOPASSWD 추가:
```bash
# WSL2 안
echo "$USER ALL=(ALL) NOPASSWD: /usr/sbin/service docker start" | sudo tee /etc/sudoers.d/docker-start
```
## 4. 서비스 시작 + 확인
```powershell
C:\nssm\nssm.exe start ai_trade
C:\nssm\nssm.exe start wsl_docker
# 상태 확인
C:\nssm\nssm.exe status ai_trade
C:\nssm\nssm.exe status wsl_docker
sc query ai_trade
```
## 5. 검증
```powershell
# ai_trade
curl http://localhost:8001/health # 또는 ai_trade의 실제 health endpoint
# WSL2 docker 컨테이너 (재부팅 후 자동 시작 확인)
wsl -d Ubuntu-24.04 -- docker ps
# insta-render, music-render, video-render, task-watcher 4개 Up 확인
```
## 6. 재부팅 테스트
Windows 재부팅 → 로그인 → 수동 조작 없이:
- ai_trade 서비스 자동 시작 (HIGH priority)
- WSL2 + Docker + 4 컨테이너 자동 시작 (NORMAL priority)
- task-watcher가 trading window에 queue:paused 토글 시작
## task-watcher 동작 확인
```bash
# WSL2
docker logs task-watcher --tail 20
# 기대: "task-watcher started" + mode 전환 로그 (trading/free)
# Redis 큐 상태 (NAS 또는 LAN)
docker exec redis redis-cli GET queue:paused
# 트레이딩 시간대(평일 07:00-16:30): "1"
# 그 외: (nil)
```

View File

@@ -0,0 +1,36 @@
"""task-watcher FastAPI entry — health + lifespan (watcher loop spawn)."""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
import watcher
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
watcher_task = asyncio.create_task(watcher.watcher_loop())
logger.info("task-watcher lifespan 시작")
try:
yield
finally:
watcher_task.cancel()
try:
await watcher_task
except asyncio.CancelledError:
pass
logger.info("task-watcher lifespan 종료")
app = FastAPI(lifespan=lifespan)
@app.get("/health")
def health():
return {"ok": True, "service": "task-watcher"}

View File

@@ -0,0 +1,57 @@
"""시간대 + 휴장일 기반 모드 판정 (idle 감지 생략 — 박재오 결정 2026-05-22).
trading: 비휴장 평일 07:0016:30 (장중) → queue:paused SET
free: 그 외 (장 전/후, 주말, 휴장) → queue:paused DEL
"""
from __future__ import annotations
import datetime as dt
import logging
import os
from typing import Set
from zoneinfo import ZoneInfo
import httpx
logger = logging.getLogger(__name__)
KST = ZoneInfo("Asia/Seoul")
STOCK_BASE_URL = os.getenv("STOCK_BASE_URL", "http://192.168.45.54:18500")
# 트레이딩 윈도우 (HH:MM, KST). .env로 조정 가능.
TRADING_START = os.getenv("TRADING_START", "07:00")
TRADING_END = os.getenv("TRADING_END", "16:30")
def _parse_hhmm(s: str) -> dt.time:
hh, mm = s.split(":")
return dt.time(int(hh), int(mm))
def current_mode(now: dt.datetime, holidays: Set[str]) -> str:
"""now(KST aware) + holidays(ISO date set) → 'trading' | 'free'."""
# 주말 (토=5, 일=6)
if now.weekday() >= 5:
return "free"
# 휴장일
if now.date().isoformat() in holidays:
return "free"
# 트레이딩 윈도우 [start, end)
start = _parse_hhmm(TRADING_START)
end = _parse_hhmm(TRADING_END)
t = now.timetz().replace(tzinfo=None)
if start <= t < end:
return "trading"
return "free"
def fetch_holidays() -> Set[str]:
"""NAS stock /api/stock/holidays 조회. 실패 시 빈 set (안전 — free로 판정)."""
try:
r = httpx.get(f"{STOCK_BASE_URL}/api/stock/holidays", timeout=10.0)
if r.status_code == 200:
return set(r.json().get("holidays", []))
logger.warning("holidays fetch returned %d", r.status_code)
except Exception:
logger.exception("holidays fetch 실패")
return set()

View File

@@ -0,0 +1,5 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
redis>=5.0
httpx>=0.27
pytest>=8.0

View File

View File

@@ -0,0 +1,44 @@
"""current_mode — 시간대 + 휴장일 판정 (순수 함수)."""
import datetime as dt
from zoneinfo import ZoneInfo
from mode import current_mode
KST = ZoneInfo("Asia/Seoul")
HOLIDAYS = {"2026-05-25"} # 가상 휴장일 (월요일)
def _kst(y, m, d, hh, mm):
return dt.datetime(y, m, d, hh, mm, tzinfo=KST)
def test_weekday_trading_hours_is_trading():
# 2026-05-22 금요일 10:00 — 트레이딩 시간대
assert current_mode(_kst(2026, 5, 22, 10, 0), HOLIDAYS) == "trading"
def test_weekday_before_open_is_free():
# 평일 06:00 — 장 전
assert current_mode(_kst(2026, 5, 22, 6, 0), HOLIDAYS) == "free"
def test_weekday_after_close_is_free():
# 평일 17:00 — 장 마감 후
assert current_mode(_kst(2026, 5, 22, 17, 0), HOLIDAYS) == "free"
def test_weekend_is_free():
# 2026-05-23 토요일 10:00
assert current_mode(_kst(2026, 5, 23, 10, 0), HOLIDAYS) == "free"
def test_holiday_weekday_is_free():
# 2026-05-25 월요일이지만 휴장일 → 트레이딩 시간대라도 free
assert current_mode(_kst(2026, 5, 25, 10, 0), HOLIDAYS) == "free"
def test_trading_boundary_inclusive_start_exclusive_end():
# 07:00 정각 = 트레이딩 시작, 16:30 정각 = 마감 (16:30은 free)
assert current_mode(_kst(2026, 5, 22, 7, 0), HOLIDAYS) == "trading"
assert current_mode(_kst(2026, 5, 22, 16, 29), HOLIDAYS) == "trading"
assert current_mode(_kst(2026, 5, 22, 16, 30), HOLIDAYS) == "free"

View File

@@ -0,0 +1,59 @@
"""30초마다 current_mode 판정 → queue:paused 토글.
trading → SET queue:paused 1 EX 600 (10분 TTL — watcher 죽어도 자동 해제)
free → DEL queue:paused
holidays는 1시간마다 refresh (매 loop fetch 부하 회피).
"""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import os
from zoneinfo import ZoneInfo
import redis.asyncio as aioredis
from mode import current_mode, fetch_holidays, KST
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
PAUSED_KEY = "queue:paused"
LOOP_INTERVAL = 30 # 초
HOLIDAYS_REFRESH = 3600 # 1시간
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
async def watcher_loop():
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
holidays = fetch_holidays()
last_holiday_refresh = dt.datetime.now(KST)
last_mode = None
logger.info("task-watcher started (trading window 토글)")
while True:
try:
now = dt.datetime.now(KST)
# holidays 주기적 refresh
if (now - last_holiday_refresh).total_seconds() >= HOLIDAYS_REFRESH:
holidays = fetch_holidays()
last_holiday_refresh = now
mode = current_mode(now, holidays)
if mode == "trading":
await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
else:
await redis.delete(PAUSED_KEY)
if mode != last_mode:
logger.info("mode 전환: %s%s (paused=%s)", last_mode, mode, mode == "trading")
last_mode = mode
await asyncio.sleep(LOOP_INTERVAL)
except asyncio.CancelledError:
logger.info("watcher_loop cancelled")
raise
except Exception:
logger.exception("watcher_loop iteration 실패, 30초 후 재시도")
await asyncio.sleep(LOOP_INTERVAL)

View File

@@ -10,14 +10,12 @@ INTERNAL_API_KEY=__copy_from_nas_dotenv__
# Sora 2 (OpenAI)
OPENAI_API_KEY=__paste_openai_key__
# Veo 3.1 (Google Vertex AI)
GOOGLE_PROJECT_ID=__paste_gcp_project_id__
GOOGLE_LOCATION=us-central1
GOOGLE_GCS_BUCKET=__paste_gcs_bucket_name__
GOOGLE_APPLICATION_CREDENTIALS=/app/keys/gcp-sa.json
# Veo (Google Gemini API — ai.google.dev. Vertex AI 경로 아님, GCS bucket 불필요)
GEMINI_API_KEY=__paste_gemini_key__
# Kling (PiAPI gateway)
PIAPI_API_KEY=__paste_piapi_key__
# Kling (Native KlingAI — JWT auth with Access Key + Secret Key)
KLING_ACCESS_KEY=__paste_kling_access_key__
KLING_SECRET_KEY=__paste_kling_secret_key__
# Seedance 2.0 (BytePlus)
SEEDANCE_API_KEY=__paste_seedance_key__

View File

@@ -1,6 +1,7 @@
"""Kling AI video generation — PiAPI gateway 경유.
"""Kling AI video generation — Native KlingAI API (api.klingai.com).
POST https://api.piapi.ai/api/v1/task → GET /api/v1/task/{id} 폴링 → data.output.video_url 다운로드.
JWT auth: HS256, payload {iss: ACCESS_KEY, exp: now+1800, nbf: now-5}.
POST /v1/videos/text2video → GET /v1/videos/text2video/{task_id} → task_result.videos[0].url 다운로드.
"""
from __future__ import annotations
@@ -9,26 +10,41 @@ import os
import time
from typing import Optional
import jwt as pyjwt
import requests
from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
PIAPI_BASE_URL = "https://api.piapi.ai/api/v1"
KLING_BASE_URL = "https://api.klingai.com"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 10 # Kling은 30~180초
POLL_MAX_ATTEMPTS = 60 # 최대 10분
POLL_INTERVAL = 10
POLL_MAX_ATTEMPTS = 60 # 최대 ~10분
DEFAULT_VERSION = "2.6"
DEFAULT_MODEL = "kling-v1-6"
JWT_EXP_SECONDS = 1800 # 30분
JWT_NBF_OFFSET = -5 # 5초 뒤로
def _generate_jwt() -> Optional[str]:
access_key = os.getenv("KLING_ACCESS_KEY", "")
secret_key = os.getenv("KLING_SECRET_KEY", "")
if not access_key or not secret_key:
return None
now = int(time.time())
headers = {"alg": "HS256", "typ": "JWT"}
payload = {"iss": access_key, "exp": now + JWT_EXP_SECONDS, "nbf": now + JWT_NBF_OFFSET}
return pyjwt.encode(payload, secret_key, algorithm="HS256", headers=headers)
def _headers() -> dict:
api_key = os.getenv("PIAPI_API_KEY", "")
token = _generate_jwt()
return {
"x-api-key": api_key,
"Authorization": f"Bearer {token}" if token else "",
"Content-Type": "application/json",
}
@@ -36,80 +52,83 @@ def _headers() -> dict:
def run_kling_generation(task_id: str, params: dict) -> None:
"""Kling으로 영상 생성 → mp4 → NAS SMB → webhook."""
try:
if not os.getenv("PIAPI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="PIAPI_API_KEY 미설정")
if not os.getenv("KLING_ACCESS_KEY") or not os.getenv("KLING_SECRET_KEY"):
webhook_update_task(task_id, "failed", 0, "",
error="KLING_ACCESS_KEY 또는 KLING_SECRET_KEY 미설정")
return
webhook_update_task(task_id, "processing", 5, "Kling API 호출 중...")
input_obj = {
"prompt": params["prompt"][:2500],
"duration": params.get("duration", 5),
"aspect_ratio": params.get("aspect_ratio", "16:9"),
"mode": params.get("mode", "std"),
"version": params.get("model") or DEFAULT_VERSION,
}
if params.get("negative_prompt"):
input_obj["negative_prompt"] = params["negative_prompt"][:2500]
if params.get("cfg_scale") is not None:
input_obj["cfg_scale"] = str(params["cfg_scale"])
if params.get("image_url"):
input_obj["image_url"] = params["image_url"]
# image_url 있으면 image2video, 없으면 text2video
is_image2video = bool(params.get("image_url"))
endpoint_path = "/v1/videos/image2video" if is_image2video else "/v1/videos/text2video"
body = {
"model": "kling",
"task_type": "video_generation",
"input": input_obj,
"config": {"service_mode": "public"},
"model_name": params.get("model") or DEFAULT_MODEL,
"prompt": params["prompt"][:2500],
"duration": str(params.get("duration", 5)),
"aspect_ratio": params.get("aspect_ratio", "16:9"),
"mode": params.get("mode", "std"),
}
if params.get("negative_prompt"):
body["negative_prompt"] = params["negative_prompt"][:2500]
if params.get("cfg_scale") is not None:
body["cfg_scale"] = float(params["cfg_scale"])
if is_image2video:
body["image"] = params["image_url"]
resp = requests.post(f"{PIAPI_BASE_URL}/task", headers=_headers(), json=body, timeout=30)
resp = requests.post(f"{KLING_BASE_URL}{endpoint_path}",
headers=_headers(), json=body, timeout=30)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Kling/PiAPI 오류: {resp.status_code} {resp.text[:300]}")
error=f"Kling API 오류: {resp.status_code} {resp.text[:300]}")
return
body_json = resp.json()
if body_json.get("code") != 200:
if body_json.get("code") != 0:
webhook_update_task(task_id, "failed", 0, "",
error=f"Kling/PiAPI 거부: {body_json.get('message', '?')}")
error=f"Kling API 거부: {body_json.get('message', '?')}")
return
piapi_task_id = (body_json.get("data") or {}).get("task_id", "")
if not piapi_task_id:
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 응답에 task_id 없음")
kling_task_id = (body_json.get("data") or {}).get("task_id", "")
if not kling_task_id:
webhook_update_task(task_id, "failed", 0, "", error="Kling 응답에 task_id 없음")
return
webhook_update_task(task_id, "processing", 15, "Kling 작업 등록됨")
# 폴링 — GET /task/{id}
# 폴링 — GET /v1/videos/{text2video|image2video}/{task_id}
video_url = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.get(f"{PIAPI_BASE_URL}/task/{piapi_task_id}",
fetch = requests.get(f"{KLING_BASE_URL}{endpoint_path}/{kling_task_id}",
headers=_headers(), timeout=30)
if fetch.status_code != 200:
continue
fd = fetch.json()
data = fd.get("data", {})
status = data.get("status", "")
if fd.get("code") != 0:
continue
data = fd.get("data") or {}
status = data.get("task_status", "")
scaled = min(15 + int((attempt / POLL_MAX_ATTEMPTS) * 65), 79)
webhook_update_task(task_id, "processing", scaled, f"Kling 생성 중... ({status})")
if status == "Completed":
video_url = (data.get("output") or {}).get("video_url", "")
if status == "succeed":
videos = ((data.get("task_result") or {}).get("videos") or [])
if videos:
video_url = videos[0].get("url", "")
break
elif status in ("Failed", "failed"):
err = (data.get("error") or {}).get("message", "Kling 작업 실패")
elif status == "failed":
err = data.get("task_status_msg") or "Kling 작업 실패"
webhook_update_task(task_id, "failed", 0, "", error=err)
return
# Pending/Processing/Staged → 계속 폴링
# submitted/processing → 계속 폴링
else:
webhook_update_task(task_id, "failed", 0, "", error="Kling 폴링 timeout (10분)")
return
if not video_url:
webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video_url 없음")
webhook_update_task(task_id, "failed", 0, "", error="Kling 완료했으나 video url 없음")
return
webhook_update_task(task_id, "processing", 85, "Kling 결과 다운로드 중...")
@@ -117,6 +136,7 @@ def run_kling_generation(task_id: str, params: dict) -> None:
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
# Kling 결과 url은 일반적으로 인증 불필요 (signed URL)
dl = requests.get(video_url, stream=True, timeout=300)
dl.raise_for_status()
with open(file_path, "wb") as f:
@@ -127,7 +147,7 @@ def run_kling_generation(task_id: str, params: dict) -> None:
webhook_update_task(task_id, "succeeded", 100, "Kling 생성 완료", video_url=local_url)
except requests.Timeout:
webhook_update_task(task_id, "failed", 0, "", error="Kling/PiAPI 타임아웃")
webhook_update_task(task_id, "failed", 0, "", error="Kling API 타임아웃")
except Exception as e:
logger.exception("Kling generation error task=%s", task_id)
webhook_update_task(task_id, "failed", 0, "", error=str(e))

View File

@@ -1,13 +1,13 @@
"""Veo 3.1 video generation — Google Vertex AI.
"""Veo 3.1 video generation — Gemini API (ai.google.dev).
POST .../models/{MODEL}:predictLongRunning → POST :fetchPredictOperation 폴링 →
결과 gs://bucket/path/sample_0.mp4 → google-cloud-storage로 다운로드 → NAS SMB.
POST https://generativelanguage.googleapis.com/v1beta/models/{MODEL}:predictLongRunning
GET https://generativelanguage.googleapis.com/v1beta/{operation_name}
→ done=true 시 response.generateVideoResponse.generatedSamples[0].video.uri 다운로드
"""
from __future__ import annotations
import logging
import os
import subprocess
import time
from typing import Optional
@@ -17,100 +17,59 @@ from nas_client import webhook_update_task
logger = logging.getLogger(__name__)
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
VIDEO_MEDIA_ROOT = os.getenv("VIDEO_MEDIA_ROOT", "/mnt/nas/webpage/data/video")
VIDEO_MEDIA_URL_PREFIX = os.getenv("VIDEO_MEDIA_URL_PREFIX", "/media/video")
POLL_INTERVAL = 12 # Veo는 30~120초 소요
POLL_MAX_ATTEMPTS = 50 # 최대 ~10분
POLL_INTERVAL = 10 # Veo는 30~120초 소요
POLL_MAX_ATTEMPTS = 60 # 최대 ~10분
DEFAULT_MODEL = "veo-3.1-fast-generate-001"
DEFAULT_MODEL = "veo-3.1-fast-generate-preview"
def _gcloud_access_token() -> Optional[str]:
"""GOOGLE_APPLICATION_CREDENTIALS service account JSON으로 access token 발행.
google-auth가 컨테이너 안에서 자동 인증 — Bearer 토큰을 GCS SDK가 직접 사용.
REST API 호출용으로는 명시적 token이 필요 → google.auth로 발행.
"""
try:
from google.auth import default as google_default_auth
from google.auth.transport.requests import Request as GoogleAuthRequest
credentials, _ = google_default_auth(
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
credentials.refresh(GoogleAuthRequest())
return credentials.token
except Exception:
logger.exception("Google credentials refresh 실패")
return None
def _download_gcs(gcs_uri: str, local_path: str) -> bool:
"""gs://bucket/path/file.mp4 → local_path 다운로드. 성공 여부 반환."""
try:
from google.cloud import storage as gcs_storage
if not gcs_uri.startswith("gs://"):
return False
without_scheme = gcs_uri[len("gs://"):]
bucket_name, blob_path = without_scheme.split("/", 1)
client = gcs_storage.Client(project=os.getenv("GOOGLE_PROJECT_ID"))
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_path)
blob.download_to_filename(local_path)
return True
except Exception:
logger.exception("GCS 다운로드 실패: %s", gcs_uri)
return False
def _headers() -> dict:
api_key = os.getenv("GEMINI_API_KEY", "")
return {
"x-goog-api-key": api_key,
"Content-Type": "application/json",
}
def run_veo_generation(task_id: str, params: dict) -> None:
"""Veo 3.1로 영상 생성 → GCS → NAS SMB → webhook."""
"""Veo로 영상 생성 → mp4 → NAS SMB → webhook."""
try:
project_id = os.getenv("GOOGLE_PROJECT_ID", "")
location = os.getenv("GOOGLE_LOCATION", "us-central1")
gcs_bucket = os.getenv("GOOGLE_GCS_BUCKET", "")
if not project_id or not gcs_bucket:
webhook_update_task(task_id, "failed", 0, "",
error="GOOGLE_PROJECT_ID 또는 GOOGLE_GCS_BUCKET 미설정")
if not os.getenv("GEMINI_API_KEY"):
webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)")
return
token = _gcloud_access_token()
if not token:
webhook_update_task(task_id, "failed", 0, "",
error="Google access token 발행 실패 (서비스 계정 JSON 확인)")
return
webhook_update_task(task_id, "processing", 5, "Veo API 호출 중...")
webhook_update_task(task_id, "processing", 5, "Veo (Gemini API) 호출 중...")
model_id = params.get("model") or DEFAULT_MODEL
endpoint_base = (
f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}"
f"/locations/{location}/publishers/google/models/{model_id}"
)
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
body = {
"instances": [{"prompt": params["prompt"]}],
"parameters": {
"storageUri": f"gs://{gcs_bucket}/veo/{task_id}/",
"sampleCount": 1,
"aspectRatio": params.get("aspect_ratio") or "16:9",
},
}
# numberOfVideos는 일부 모델(veo-3.0-fast 등) 미지원 — 호출자 명시 시에만 추가
if params.get("number_of_videos"):
body["parameters"]["numberOfVideos"] = int(params["number_of_videos"])
if params.get("duration"):
body["parameters"]["duration"] = params["duration"]
body["parameters"]["durationSeconds"] = int(params["duration"])
if params.get("resolution"):
body["parameters"]["resolution"] = params["resolution"]
if params.get("negative_prompt"):
body["parameters"]["negativePrompt"] = params["negative_prompt"]
if params.get("person_generation"):
body["parameters"]["personGeneration"] = params["person_generation"]
resp = requests.post(f"{endpoint_base}:predictLongRunning",
headers=headers, json=body, timeout=30)
resp = requests.post(
f"{GEMINI_BASE_URL}/models/{model_id}:predictLongRunning",
headers=_headers(), json=body, timeout=30,
)
if resp.status_code != 200:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo API 오류: {resp.status_code} {resp.text[:300]}")
error=f"Veo Gemini API 오류: {resp.status_code} {resp.text[:300]}")
return
op_name = resp.json().get("name", "")
@@ -118,16 +77,15 @@ def run_veo_generation(task_id: str, params: dict) -> None:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 operation name 없음")
return
webhook_update_task(task_id, "processing", 15, f"Veo 작업 시작됨")
webhook_update_task(task_id, "processing", 15, "Veo 작업 시작됨")
# 폴링 — fetchPredictOperation
gcs_uri = None
# 폴링 — GET /v1beta/{operation_name}
video_uri = None
for attempt in range(POLL_MAX_ATTEMPTS):
time.sleep(POLL_INTERVAL)
fetch = requests.post(
f"{endpoint_base}:fetchPredictOperation",
headers=headers,
json={"operationName": op_name},
fetch = requests.get(
f"{GEMINI_BASE_URL}/{op_name}",
headers=_headers(),
timeout=30,
)
if fetch.status_code != 200:
@@ -142,29 +100,34 @@ def run_veo_generation(task_id: str, params: dict) -> None:
webhook_update_task(task_id, "failed", 0, "",
error=f"Veo 작업 실패: {fd['error'].get('message','?')}")
return
videos = (fd.get("response") or {}).get("videos") or []
if not videos:
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 videos 비어 있음")
# response.generateVideoResponse.generatedSamples[0].video.uri
response = fd.get("response") or {}
gen = response.get("generateVideoResponse") or {}
samples = gen.get("generatedSamples") or []
if not samples:
webhook_update_task(task_id, "failed", 0, "", error="Veo 완료했으나 generatedSamples 비어 있음")
return
gcs_uri = videos[0].get("gcsUri", "")
video_uri = (samples[0].get("video") or {}).get("uri", "")
break
else:
webhook_update_task(task_id, "failed", 0, "", error="Veo 폴링 timeout (10분)")
return
if not gcs_uri:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 gcsUri 없음")
if not video_uri:
webhook_update_task(task_id, "failed", 0, "", error="Veo 응답에 video.uri 없음")
return
webhook_update_task(task_id, "processing", 85, "GCS에서 mp4 다운로드 중...")
webhook_update_task(task_id, "processing", 85, "Veo 결과 다운로드 중...")
filename = f"{task_id}.mp4"
os.makedirs(VIDEO_MEDIA_ROOT, exist_ok=True)
file_path = os.path.join(VIDEO_MEDIA_ROOT, filename)
ok = _download_gcs(gcs_uri, file_path)
if not ok:
webhook_update_task(task_id, "failed", 0, "", error=f"GCS 다운로드 실패: {gcs_uri}")
return
# 다운로드 — x-goog-api-key 헤더 그대로 사용 (Gemini API가 인증 처리)
dl = requests.get(video_uri, headers=_headers(), stream=True, timeout=300)
dl.raise_for_status()
with open(file_path, "wb") as f:
for chunk in dl.iter_content(chunk_size=8192):
f.write(chunk)
local_url = f"{VIDEO_MEDIA_URL_PREFIX}/{filename}"
webhook_update_task(task_id, "succeeded", 100, "Veo 생성 완료", video_url=local_url)

View File

@@ -4,7 +4,7 @@ requests==2.32.3
redis>=5.0
httpx>=0.27
openai>=1.50.0
google-cloud-storage>=2.18.0
PyJWT>=2.8.0
pytest>=8.0
pytest-asyncio>=0.24
respx>=0.21