Compare commits
11 Commits
53a0657027
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| cb70226f42 | |||
| de24bae984 | |||
| 0e6c893b4e | |||
| fb80973e38 | |||
| 31b0e7dbc4 | |||
| 6169f48eb8 | |||
| 27a6df6cff | |||
| 803fdb6278 | |||
| 77e21b54e6 | |||
| 4d0c89ce79 | |||
| 4b60ab34c3 |
@@ -76,3 +76,49 @@ services:
|
|||||||
interval: 60s
|
interval: 60s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 3
|
retries: 3
|
||||||
|
|
||||||
|
task-watcher:
|
||||||
|
build:
|
||||||
|
context: ./task-watcher
|
||||||
|
container_name: task-watcher
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- "18713:8000"
|
||||||
|
environment:
|
||||||
|
- TZ=Asia/Seoul
|
||||||
|
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
||||||
|
- STOCK_BASE_URL=${STOCK_BASE_URL:-http://192.168.45.54:18500}
|
||||||
|
- TRADING_START=${TRADING_START:-07:00}
|
||||||
|
- TRADING_END=${TRADING_END:-16:30}
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||||
|
interval: 60s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
|
|
||||||
|
image-render:
|
||||||
|
build:
|
||||||
|
context: ./image-render
|
||||||
|
container_name: image-render
|
||||||
|
restart: unless-stopped
|
||||||
|
ports:
|
||||||
|
- "18714:8000"
|
||||||
|
environment:
|
||||||
|
- TZ=Asia/Seoul
|
||||||
|
- REDIS_URL=${REDIS_URL:-redis://192.168.45.54:6379}
|
||||||
|
- NAS_BASE_URL=${NAS_BASE_URL:-http://192.168.45.54:18802}
|
||||||
|
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-}
|
||||||
|
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
|
||||||
|
- GEMINI_API_KEY=${GEMINI_API_KEY:-}
|
||||||
|
- COMFYUI_URL=${COMFYUI_URL:-http://host.docker.internal:8188}
|
||||||
|
- FLUX_BLOCK_TRADING_HOURS=${FLUX_BLOCK_TRADING_HOURS:-1}
|
||||||
|
- IMAGE_MEDIA_ROOT=${IMAGE_MEDIA_ROOT:-/mnt/nas/webpage/data/image}
|
||||||
|
extra_hosts:
|
||||||
|
- "host.docker.internal:host-gateway"
|
||||||
|
volumes:
|
||||||
|
- /mnt/nas/webpage/data/image:/mnt/nas/webpage/data/image
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
|
||||||
|
interval: 60s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 3
|
||||||
|
|||||||
16
services/image-render/Dockerfile
Normal file
16
services/image-render/Dockerfile
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
FROM python:3.12-slim-bookworm
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
EXPOSE 8000
|
||||||
|
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||||
18
services/image-render/env.example
Normal file
18
services/image-render/env.example
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
# Redis (NAS)
|
||||||
|
REDIS_URL=redis://192.168.45.54:6379
|
||||||
|
|
||||||
|
# NAS image-lab webhook
|
||||||
|
NAS_BASE_URL=http://192.168.45.54:18802
|
||||||
|
INTERNAL_API_KEY=replace-me
|
||||||
|
|
||||||
|
# API provider keys (worker reports failed if missing)
|
||||||
|
OPENAI_API_KEY=
|
||||||
|
GEMINI_API_KEY=
|
||||||
|
# Seedance key not used by image-render
|
||||||
|
|
||||||
|
# FLUX local
|
||||||
|
COMFYUI_URL=http://host.docker.internal:8188
|
||||||
|
FLUX_BLOCK_TRADING_HOURS=1
|
||||||
|
|
||||||
|
# NAS SMB mount target (image-render writes to this, NAS reads via /media/image/)
|
||||||
|
IMAGE_MEDIA_ROOT=/mnt/nas/webpage/data/image
|
||||||
36
services/image-render/main.py
Normal file
36
services/image-render/main.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""image-render FastAPI entry — health + lifespan (worker loop spawn)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
import worker
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
worker_task = asyncio.create_task(worker.worker_loop())
|
||||||
|
logger.info("image-render lifespan 시작")
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
logger.info("image-render lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
def health():
|
||||||
|
return {"ok": True, "service": "image-render"}
|
||||||
54
services/image-render/nas_client.py
Normal file
54
services/image-render/nas_client.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
"""NAS webhook 어댑터 — Windows worker → NAS image-lab HTTP 위임.
|
||||||
|
|
||||||
|
video-render nas_client 복제 (call-time os.getenv으로 테스트 격리).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_TIMEOUT = 10.0
|
||||||
|
|
||||||
|
|
||||||
|
def _post(payload: Dict[str, Any]) -> None:
|
||||||
|
nas_base_url = os.getenv("NAS_BASE_URL", "http://192.168.45.54:18802")
|
||||||
|
internal_api_key = os.getenv("INTERNAL_API_KEY", "")
|
||||||
|
url = f"{nas_base_url}/api/internal/image/update"
|
||||||
|
try:
|
||||||
|
r = httpx.post(
|
||||||
|
url,
|
||||||
|
headers={"X-Internal-Key": internal_api_key},
|
||||||
|
json=payload,
|
||||||
|
timeout=_TIMEOUT,
|
||||||
|
)
|
||||||
|
if r.status_code != 200:
|
||||||
|
logger.error("webhook %s returned %d: %s",
|
||||||
|
payload.get("task_id"), r.status_code, r.text[:200])
|
||||||
|
except Exception:
|
||||||
|
logger.exception("webhook %s 호출 실패", payload.get("task_id"))
|
||||||
|
|
||||||
|
|
||||||
|
def webhook_update_task(
|
||||||
|
task_id: str,
|
||||||
|
status: str,
|
||||||
|
progress: int,
|
||||||
|
message: str = "",
|
||||||
|
image_url: Optional[str] = None,
|
||||||
|
error: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
payload: Dict[str, Any] = {
|
||||||
|
"task_id": task_id,
|
||||||
|
"status": status,
|
||||||
|
"progress": progress,
|
||||||
|
"message": message,
|
||||||
|
}
|
||||||
|
if image_url is not None:
|
||||||
|
payload["image_url"] = image_url
|
||||||
|
if error is not None:
|
||||||
|
payload["error"] = error
|
||||||
|
_post(payload)
|
||||||
0
services/image-render/providers/__init__.py
Normal file
0
services/image-render/providers/__init__.py
Normal file
18
services/image-render/providers/_media.py
Normal file
18
services/image-render/providers/_media.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
"""b64 이미지 → NAS SMB 경로 저장 → /media/image URL 반환."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
IMAGE_MEDIA_ROOT = os.getenv("IMAGE_MEDIA_ROOT", "/mnt/nas/webpage/data/image")
|
||||||
|
IMAGE_MEDIA_URL_PREFIX = os.getenv("IMAGE_MEDIA_URL_PREFIX", "/media/image")
|
||||||
|
|
||||||
|
|
||||||
|
def save_b64_png(task_id: str, b64_data: str) -> str:
|
||||||
|
os.makedirs(IMAGE_MEDIA_ROOT, exist_ok=True)
|
||||||
|
fname = f"{task_id}-{uuid.uuid4().hex[:8]}.png"
|
||||||
|
path = os.path.join(IMAGE_MEDIA_ROOT, fname)
|
||||||
|
with open(path, "wb") as f:
|
||||||
|
f.write(base64.b64decode(b64_data))
|
||||||
|
return f"{IMAGE_MEDIA_URL_PREFIX}/{fname}"
|
||||||
79
services/image-render/providers/flux.py
Normal file
79
services/image-render/providers/flux.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
"""FLUX 로컬 — ComfyUI HTTP API.
|
||||||
|
|
||||||
|
POST {COMFYUI_URL}/prompt (workflow JSON) → prompt_id
|
||||||
|
GET {COMFYUI_URL}/history/{prompt_id} → outputs → image filename
|
||||||
|
GET {COMFYUI_URL}/view?filename=... → PNG bytes → b64
|
||||||
|
|
||||||
|
워크플로우 JSON은 `flux_workflow.json` (ComfyUI UI에서 "Save (API Format)"로 export, CLIPTextEncode 노드 text를 "%PROMPT%"로 수동 치환). 박재오 산출물.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import base64, json, logging, os, time
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers._media import save_b64_png
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
COMFYUI_URL = os.getenv("COMFYUI_URL", "http://127.0.0.1:8188")
|
||||||
|
WORKFLOW_PATH = os.path.join(os.path.dirname(__file__), "flux_workflow.json")
|
||||||
|
POLL_INTERVAL = 2
|
||||||
|
POLL_MAX = 120
|
||||||
|
|
||||||
|
|
||||||
|
def _is_trading_hours() -> bool:
|
||||||
|
kst = timezone(timedelta(hours=9))
|
||||||
|
now = datetime.now(kst)
|
||||||
|
if now.weekday() >= 5:
|
||||||
|
return False
|
||||||
|
return (now.hour, now.minute) >= (9, 0) and (now.hour, now.minute) <= (15, 30)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_workflow(prompt: str, size: str) -> dict:
|
||||||
|
with open(WORKFLOW_PATH, encoding="utf-8") as f:
|
||||||
|
wf = json.load(f)
|
||||||
|
# CLIPTextEncode 노드의 text를 prompt로 치환 (workflow에 "%PROMPT%" placeholder 사용)
|
||||||
|
raw = json.dumps(wf).replace("%PROMPT%", prompt.replace('"', "'"))
|
||||||
|
return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def _submit_prompt(workflow: dict) -> str:
|
||||||
|
r = requests.post(f"{COMFYUI_URL}/prompt", json={"prompt": workflow}, timeout=30)
|
||||||
|
r.raise_for_status()
|
||||||
|
return r.json()["prompt_id"]
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_image_b64(prompt_id: str):
|
||||||
|
for _ in range(POLL_MAX):
|
||||||
|
h = requests.get(f"{COMFYUI_URL}/history/{prompt_id}", timeout=10)
|
||||||
|
data = h.json().get(prompt_id)
|
||||||
|
if data and data.get("outputs"):
|
||||||
|
for node_out in data["outputs"].values():
|
||||||
|
for img in node_out.get("images", []):
|
||||||
|
view = requests.get(f"{COMFYUI_URL}/view",
|
||||||
|
params={"filename": img["filename"], "subfolder": img.get("subfolder", ""), "type": img.get("type", "output")},
|
||||||
|
timeout=30)
|
||||||
|
view.raise_for_status()
|
||||||
|
return base64.b64encode(view.content).decode()
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def run_flux_generation(task_id: str, params: dict) -> None:
|
||||||
|
try:
|
||||||
|
if os.getenv("FLUX_BLOCK_TRADING_HOURS") == "1" and _is_trading_hours():
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="장중 GPU 보호 — FLUX 거부 (API provider 사용 권장)")
|
||||||
|
return
|
||||||
|
webhook_update_task(task_id, "processing", 10, "FLUX (ComfyUI) 생성 중...")
|
||||||
|
wf = _load_workflow(params["prompt"], params.get("size") or "1024x1024")
|
||||||
|
pid = _submit_prompt(wf)
|
||||||
|
b64 = _poll_image_b64(pid)
|
||||||
|
if not b64:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="ComfyUI 타임아웃 또는 출력 없음")
|
||||||
|
return
|
||||||
|
url = save_b64_png(task_id, b64)
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("flux task=%s 실패", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
47
services/image-render/providers/gpt_image.py
Normal file
47
services/image-render/providers/gpt_image.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
"""GPT Image 2.0 — OpenAI Images API.
|
||||||
|
|
||||||
|
POST https://api.openai.com/v1/images/generations
|
||||||
|
body {model:"gpt-image-1", prompt, size, n:1} → data[0].b64_json
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers._media import save_b64_png
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
OPENAI_URL = "https://api.openai.com/v1/images/generations"
|
||||||
|
DEFAULT_MODEL = "gpt-image-1"
|
||||||
|
|
||||||
|
|
||||||
|
def run_gpt_image_generation(task_id: str, params: dict) -> None:
|
||||||
|
try:
|
||||||
|
if not os.getenv("OPENAI_API_KEY"):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="OPENAI_API_KEY 미설정 (Windows .env)")
|
||||||
|
return
|
||||||
|
webhook_update_task(task_id, "processing", 10, "GPT Image 호출 중...")
|
||||||
|
body = {
|
||||||
|
"model": params.get("model") or DEFAULT_MODEL,
|
||||||
|
"prompt": params["prompt"],
|
||||||
|
"size": params.get("size") or "1024x1024",
|
||||||
|
"n": 1,
|
||||||
|
}
|
||||||
|
resp = requests.post(
|
||||||
|
OPENAI_URL,
|
||||||
|
headers={"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", "Content-Type": "application/json"},
|
||||||
|
json=body,
|
||||||
|
timeout=120,
|
||||||
|
)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"OpenAI {resp.status_code}: {resp.text[:200]}")
|
||||||
|
return
|
||||||
|
b64 = resp.json()["data"][0]["b64_json"]
|
||||||
|
url = save_b64_png(task_id, b64)
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("gpt_image task=%s 실패", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
52
services/image-render/providers/nano_banana.py
Normal file
52
services/image-render/providers/nano_banana.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
"""Nano Banana — Gemini 2.5 Flash Image (generativelanguage API).
|
||||||
|
|
||||||
|
POST /v1beta/models/{MODEL}:generateContent
|
||||||
|
→ candidates[0].content.parts[*].inlineData.data (b64 png)
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging, os
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers._media import save_b64_png
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
GEMINI_BASE = "https://generativelanguage.googleapis.com/v1beta"
|
||||||
|
DEFAULT_MODEL = "gemini-2.5-flash-image"
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_b64(data: dict):
|
||||||
|
for cand in data.get("candidates", []):
|
||||||
|
for part in cand.get("content", {}).get("parts", []):
|
||||||
|
inline = part.get("inlineData") or part.get("inline_data")
|
||||||
|
if inline and inline.get("data"):
|
||||||
|
return inline["data"]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def run_nano_banana_generation(task_id: str, params: dict) -> None:
|
||||||
|
try:
|
||||||
|
if not os.getenv("GEMINI_API_KEY"):
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="GEMINI_API_KEY 미설정 (Windows .env)")
|
||||||
|
return
|
||||||
|
webhook_update_task(task_id, "processing", 10, "Nano Banana (Gemini) 호출 중...")
|
||||||
|
model_id = params.get("model") or DEFAULT_MODEL
|
||||||
|
body = {"contents": [{"parts": [{"text": params["prompt"]}]}]}
|
||||||
|
resp = requests.post(
|
||||||
|
f"{GEMINI_BASE}/models/{model_id}:generateContent",
|
||||||
|
headers={"x-goog-api-key": os.getenv("GEMINI_API_KEY"), "Content-Type": "application/json"},
|
||||||
|
json=body, timeout=120,
|
||||||
|
)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"Gemini {resp.status_code}: {resp.text[:200]}")
|
||||||
|
return
|
||||||
|
b64 = _extract_b64(resp.json())
|
||||||
|
if not b64:
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error="Gemini 응답에 이미지 없음")
|
||||||
|
return
|
||||||
|
url = save_b64_png(task_id, b64)
|
||||||
|
webhook_update_task(task_id, "succeeded", 100, "완료", image_url=url)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("nano_banana task=%s 실패", task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=str(e))
|
||||||
9
services/image-render/requirements.txt
Normal file
9
services/image-render/requirements.txt
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
fastapi==0.115.6
|
||||||
|
uvicorn[standard]==0.34.0
|
||||||
|
requests==2.32.3
|
||||||
|
redis>=5.0
|
||||||
|
httpx>=0.27
|
||||||
|
openai>=1.50.0
|
||||||
|
pytest>=8.0
|
||||||
|
pytest-asyncio>=0.24
|
||||||
|
respx>=0.21
|
||||||
0
services/image-render/tests/__init__.py
Normal file
0
services/image-render/tests/__init__.py
Normal file
21
services/image-render/tests/test_flux.py
Normal file
21
services/image-render/tests/test_flux.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
import providers.flux as fx
|
||||||
|
|
||||||
|
def test_blocked_during_trading_hours(monkeypatch):
|
||||||
|
monkeypatch.setenv("FLUX_BLOCK_TRADING_HOURS", "1")
|
||||||
|
monkeypatch.setattr(fx, "_is_trading_hours", lambda: True)
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(fx, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
fx.run_flux_generation("t1", {"prompt": "a cat"})
|
||||||
|
assert calls[-1][0][1] == "failed"
|
||||||
|
assert "장중" in calls[-1][1]["error"]
|
||||||
|
|
||||||
|
def test_success_polls_history_and_saves(monkeypatch):
|
||||||
|
monkeypatch.setattr(fx, "_is_trading_hours", lambda: False)
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(fx, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
monkeypatch.setattr(fx, "_load_workflow", lambda prompt, size: {"3": {}})
|
||||||
|
monkeypatch.setattr(fx, "_submit_prompt", lambda wf: "pid-1")
|
||||||
|
monkeypatch.setattr(fx, "_poll_image_b64", lambda pid: "ZmFrZQ==")
|
||||||
|
monkeypatch.setattr(fx, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
|
||||||
|
fx.run_flux_generation("t1", {"prompt": "a cat"})
|
||||||
|
assert [c for c in calls if c[0][1] == "succeeded"]
|
||||||
32
services/image-render/tests/test_gpt_image.py
Normal file
32
services/image-render/tests/test_gpt_image.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import providers.gpt_image as gi
|
||||||
|
|
||||||
|
|
||||||
|
def test_missing_key_reports_failed(monkeypatch):
|
||||||
|
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(gi, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
gi.run_gpt_image_generation("t1", {"prompt": "a cat"})
|
||||||
|
# 마지막 호출이 failed
|
||||||
|
assert calls[-1][0][1] == "failed"
|
||||||
|
|
||||||
|
|
||||||
|
def test_success_saves_and_reports_url(monkeypatch):
|
||||||
|
monkeypatch.setenv("OPENAI_API_KEY", "sk-test")
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(gi, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
monkeypatch.setattr(gi, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
|
||||||
|
|
||||||
|
class FakeResp:
|
||||||
|
status_code = 200
|
||||||
|
|
||||||
|
def json(self):
|
||||||
|
return {"data": [{"b64_json": "ZmFrZQ=="}]}
|
||||||
|
|
||||||
|
def raise_for_status(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
monkeypatch.setattr(gi.requests, "post", lambda *a, **k: FakeResp())
|
||||||
|
|
||||||
|
gi.run_gpt_image_generation("t1", {"prompt": "a cat"})
|
||||||
|
succeeded = [c for c in calls if c[0][1] == "succeeded"]
|
||||||
|
assert succeeded and succeeded[-1][1]["image_url"] == "/media/image/t1.png"
|
||||||
25
services/image-render/tests/test_nano_banana.py
Normal file
25
services/image-render/tests/test_nano_banana.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import providers.nano_banana as nb
|
||||||
|
|
||||||
|
def test_missing_key_reports_failed(monkeypatch):
|
||||||
|
monkeypatch.delenv("GEMINI_API_KEY", raising=False)
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(nb, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
nb.run_nano_banana_generation("t1", {"prompt": "a cat"})
|
||||||
|
assert calls[-1][0][1] == "failed"
|
||||||
|
|
||||||
|
def test_success_extracts_inline_data(monkeypatch):
|
||||||
|
monkeypatch.setenv("GEMINI_API_KEY", "g-test")
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(nb, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
monkeypatch.setattr(nb, "save_b64_png", lambda tid, b64: "/media/image/t1.png")
|
||||||
|
|
||||||
|
class FakeResp:
|
||||||
|
status_code = 200
|
||||||
|
def json(self):
|
||||||
|
return {"candidates": [{"content": {"parts": [
|
||||||
|
{"inlineData": {"mimeType": "image/png", "data": "ZmFrZQ=="}}
|
||||||
|
]}}]}
|
||||||
|
monkeypatch.setattr(nb.requests, "post", lambda *a, **k: FakeResp())
|
||||||
|
|
||||||
|
nb.run_nano_banana_generation("t1", {"prompt": "a cat"})
|
||||||
|
assert [c for c in calls if c[0][1] == "succeeded"]
|
||||||
20
services/image-render/tests/test_nas_client.py
Normal file
20
services/image-render/tests/test_nas_client.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import nas_client
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_includes_image_url(monkeypatch):
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def fake_post(payload):
|
||||||
|
captured.update(payload)
|
||||||
|
|
||||||
|
monkeypatch.setattr(nas_client, "_post", fake_post)
|
||||||
|
nas_client.webhook_update_task("t1", "succeeded", 100, "done", image_url="/media/image/t1.png")
|
||||||
|
assert captured["task_id"] == "t1"
|
||||||
|
assert captured["image_url"] == "/media/image/t1.png"
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_omits_none_fields(monkeypatch):
|
||||||
|
captured = {}
|
||||||
|
monkeypatch.setattr(nas_client, "_post", lambda p: captured.update(p))
|
||||||
|
nas_client.webhook_update_task("t2", "processing", 10, "working")
|
||||||
|
assert "image_url" not in captured and "error" not in captured
|
||||||
15
services/image-render/tests/test_worker.py
Normal file
15
services/image-render/tests/test_worker.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
import worker
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_routes_to_provider(monkeypatch):
|
||||||
|
called = {}
|
||||||
|
monkeypatch.setattr(worker, "run_gpt_image_generation", lambda tid, p: called.setdefault("gpt", (tid, p)))
|
||||||
|
worker._dispatch({"job_type": "gpt_image_generation", "task_id": "t1", "params": {"prompt": "x"}})
|
||||||
|
assert called["gpt"][0] == "t1"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_unknown_job_type_reports_failed(monkeypatch):
|
||||||
|
calls = []
|
||||||
|
monkeypatch.setattr(worker, "webhook_update_task", lambda *a, **k: calls.append((a, k)))
|
||||||
|
worker._dispatch({"job_type": "midjourney_generation", "task_id": "t9", "params": {}})
|
||||||
|
assert calls[-1][0][1] == "failed"
|
||||||
84
services/image-render/worker.py
Normal file
84
services/image-render/worker.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
"""Redis BLPOP worker — queue:image-render → job_type dispatch → NAS webhook.
|
||||||
|
|
||||||
|
queue:paused 가 set이면 대기 (task-watcher가 박재오 활동 감지 시 set).
|
||||||
|
video-render worker.py 패턴 — string-based dispatch + getattr (테스트 patch 호환).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
from nas_client import webhook_update_task
|
||||||
|
from providers.gpt_image import run_gpt_image_generation
|
||||||
|
from providers.nano_banana import run_nano_banana_generation
|
||||||
|
from providers.flux import run_flux_generation
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
|
QUEUE_KEY = "queue:image-render"
|
||||||
|
PAUSED_KEY = "queue:paused"
|
||||||
|
|
||||||
|
# string names so `unittest.mock.patch` / `monkeypatch.setattr` on `worker.<name>`
|
||||||
|
# is correctly intercepted by getattr(sys.modules[__name__], ...)
|
||||||
|
_DISPATCH_TABLE = {
|
||||||
|
"gpt_image_generation": "run_gpt_image_generation",
|
||||||
|
"nano_banana_generation": "run_nano_banana_generation",
|
||||||
|
"flux_generation": "run_flux_generation",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _dispatch(payload: dict) -> None:
|
||||||
|
"""payload[job_type] → provider 함수 호출 (sync, worker_loop에서 asyncio.to_thread로 wrap)."""
|
||||||
|
job_type = payload.get("job_type", "")
|
||||||
|
task_id = payload.get("task_id", "")
|
||||||
|
params = payload.get("params", {})
|
||||||
|
fn_name = _DISPATCH_TABLE.get(job_type)
|
||||||
|
if fn_name is None:
|
||||||
|
logger.error("unknown job_type=%s task=%s", job_type, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"unknown job_type: {job_type}")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
fn = getattr(sys.modules[__name__], fn_name)
|
||||||
|
except AttributeError:
|
||||||
|
logger.error("dispatch table typo for job_type=%s name=%s task=%s", job_type, fn_name, task_id)
|
||||||
|
webhook_update_task(task_id, "failed", 0, "", error=f"internal dispatch error: {fn_name}")
|
||||||
|
return
|
||||||
|
fn(task_id, params)
|
||||||
|
|
||||||
|
|
||||||
|
async def worker_loop():
|
||||||
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
logger.info("image-render worker started (queue=%s)", QUEUE_KEY)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
paused = await redis.get(PAUSED_KEY)
|
||||||
|
if paused == b"1":
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
continue
|
||||||
|
item = await redis.blpop(QUEUE_KEY, timeout=5)
|
||||||
|
if item is None:
|
||||||
|
continue
|
||||||
|
_, raw = item
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.error("invalid queue payload: %r", raw[:200])
|
||||||
|
continue
|
||||||
|
await asyncio.to_thread(_dispatch, payload)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("worker_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("worker_loop iteration 실패, 5초 후 재시도")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
asyncio.run(worker_loop())
|
||||||
11
services/task-watcher/.env.example
Normal file
11
services/task-watcher/.env.example
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
# Plan-B-Infra — task-watcher
|
||||||
|
|
||||||
|
# NAS Redis
|
||||||
|
REDIS_URL=redis://192.168.45.54:6379
|
||||||
|
|
||||||
|
# NAS stock holidays endpoint
|
||||||
|
STOCK_BASE_URL=http://192.168.45.54:18500
|
||||||
|
|
||||||
|
# 트레이딩 윈도우 (KST, HH:MM) — 이 시간대에만 queue:paused
|
||||||
|
TRADING_START=07:00
|
||||||
|
TRADING_END=16:30
|
||||||
16
services/task-watcher/Dockerfile
Normal file
16
services/task-watcher/Dockerfile
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
FROM python:3.12-slim-bookworm
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
|
ca-certificates tzdata \
|
||||||
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir --timeout 600 --retries 5 -r requirements.txt
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
EXPOSE 8000
|
||||||
|
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
|
||||||
83
services/task-watcher/NSSM_SETUP.md
Normal file
83
services/task-watcher/NSSM_SETUP.md
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
# NSSM 자동 시작 설정 (SP-9)
|
||||||
|
|
||||||
|
Windows AI 머신 부팅 시 ai_trade(트레이딩) + WSL2 Docker(render workers + task-watcher) 자동 시작.
|
||||||
|
|
||||||
|
## 1. NSSM 다운로드
|
||||||
|
|
||||||
|
https://nssm.cc/download → nssm-2.24.zip → `C:\nssm\nssm.exe` 배치 (또는 PATH 등록).
|
||||||
|
|
||||||
|
## 2. ai_trade (Native Python, HIGH priority)
|
||||||
|
|
||||||
|
⚠️ spec의 signal_v2는 ai_trade로 rename됨. 경로/포트 확인.
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
# 관리자 PowerShell
|
||||||
|
C:\nssm\nssm.exe install ai_trade "C:\Python312\python.exe" "-m uvicorn main:app --host 0.0.0.0 --port 8001"
|
||||||
|
C:\nssm\nssm.exe set ai_trade AppDirectory "C:\Users\jaeoh\Desktop\workspace\web-ai\ai_trade"
|
||||||
|
C:\nssm\nssm.exe set ai_trade Priority HIGH_PRIORITY_CLASS
|
||||||
|
C:\nssm\nssm.exe set ai_trade Start SERVICE_AUTO_START
|
||||||
|
C:\nssm\nssm.exe set ai_trade AppStdout "C:\Users\jaeoh\nssm-logs\ai_trade.log"
|
||||||
|
C:\nssm\nssm.exe set ai_trade AppStderr "C:\Users\jaeoh\nssm-logs\ai_trade.log"
|
||||||
|
```
|
||||||
|
|
||||||
|
(ai_trade의 실제 진입점이 main:app + port 8001인지 확인. 다르면 조정.)
|
||||||
|
|
||||||
|
## 3. WSL2 Docker (NORMAL priority — render workers + task-watcher)
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
C:\nssm\nssm.exe install wsl_docker "C:\Windows\System32\wsl.exe" "-d Ubuntu-24.04 -- sh -c 'sudo service docker start && cd /workspace/web-ai/services && docker compose up -d'"
|
||||||
|
C:\nssm\nssm.exe set wsl_docker Priority NORMAL_PRIORITY_CLASS
|
||||||
|
C:\nssm\nssm.exe set wsl_docker Start SERVICE_AUTO_START
|
||||||
|
C:\nssm\nssm.exe set wsl_docker AppStdout "C:\Users\jaeoh\nssm-logs\wsl_docker.log"
|
||||||
|
```
|
||||||
|
|
||||||
|
⚠️ 변경점: Ubuntu-22.04 → **Ubuntu-24.04**, web-ai-services → **web-ai/services**. WSL 경로는 박재오 WSL 마운트 기준 (`/workspace`가 web-ai에 매핑되어 있으면 그대로, 아니면 `/mnt/c/Users/jaeoh/Desktop/workspace/web-ai/services`).
|
||||||
|
|
||||||
|
`sudo service docker start`가 비밀번호 요구하면 sudoers에 NOPASSWD 추가:
|
||||||
|
```bash
|
||||||
|
# WSL2 안
|
||||||
|
echo "$USER ALL=(ALL) NOPASSWD: /usr/sbin/service docker start" | sudo tee /etc/sudoers.d/docker-start
|
||||||
|
```
|
||||||
|
|
||||||
|
## 4. 서비스 시작 + 확인
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
C:\nssm\nssm.exe start ai_trade
|
||||||
|
C:\nssm\nssm.exe start wsl_docker
|
||||||
|
|
||||||
|
# 상태 확인
|
||||||
|
C:\nssm\nssm.exe status ai_trade
|
||||||
|
C:\nssm\nssm.exe status wsl_docker
|
||||||
|
sc query ai_trade
|
||||||
|
```
|
||||||
|
|
||||||
|
## 5. 검증
|
||||||
|
|
||||||
|
```powershell
|
||||||
|
# ai_trade
|
||||||
|
curl http://localhost:8001/health # 또는 ai_trade의 실제 health endpoint
|
||||||
|
|
||||||
|
# WSL2 docker 컨테이너 (재부팅 후 자동 시작 확인)
|
||||||
|
wsl -d Ubuntu-24.04 -- docker ps
|
||||||
|
# insta-render, music-render, video-render, task-watcher 4개 Up 확인
|
||||||
|
```
|
||||||
|
|
||||||
|
## 6. 재부팅 테스트
|
||||||
|
|
||||||
|
Windows 재부팅 → 로그인 → 수동 조작 없이:
|
||||||
|
- ai_trade 서비스 자동 시작 (HIGH priority)
|
||||||
|
- WSL2 + Docker + 4 컨테이너 자동 시작 (NORMAL priority)
|
||||||
|
- task-watcher가 trading window에 queue:paused 토글 시작
|
||||||
|
|
||||||
|
## task-watcher 동작 확인
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# WSL2
|
||||||
|
docker logs task-watcher --tail 20
|
||||||
|
# 기대: "task-watcher started" + mode 전환 로그 (trading/free)
|
||||||
|
|
||||||
|
# Redis 큐 상태 (NAS 또는 LAN)
|
||||||
|
docker exec redis redis-cli GET queue:paused
|
||||||
|
# 트레이딩 시간대(평일 07:00-16:30): "1"
|
||||||
|
# 그 외: (nil)
|
||||||
|
```
|
||||||
36
services/task-watcher/main.py
Normal file
36
services/task-watcher/main.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""task-watcher FastAPI entry — health + lifespan (watcher loop spawn)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
import watcher
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
watcher_task = asyncio.create_task(watcher.watcher_loop())
|
||||||
|
logger.info("task-watcher lifespan 시작")
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
watcher_task.cancel()
|
||||||
|
try:
|
||||||
|
await watcher_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
logger.info("task-watcher lifespan 종료")
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
def health():
|
||||||
|
return {"ok": True, "service": "task-watcher"}
|
||||||
57
services/task-watcher/mode.py
Normal file
57
services/task-watcher/mode.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
"""시간대 + 휴장일 기반 모드 판정 (idle 감지 생략 — 박재오 결정 2026-05-22).
|
||||||
|
|
||||||
|
trading: 비휴장 평일 07:00–16:30 (장중) → queue:paused SET
|
||||||
|
free: 그 외 (장 전/후, 주말, 휴장) → queue:paused DEL
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Set
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
STOCK_BASE_URL = os.getenv("STOCK_BASE_URL", "http://192.168.45.54:18500")
|
||||||
|
|
||||||
|
# 트레이딩 윈도우 (HH:MM, KST). .env로 조정 가능.
|
||||||
|
TRADING_START = os.getenv("TRADING_START", "07:00")
|
||||||
|
TRADING_END = os.getenv("TRADING_END", "16:30")
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_hhmm(s: str) -> dt.time:
|
||||||
|
hh, mm = s.split(":")
|
||||||
|
return dt.time(int(hh), int(mm))
|
||||||
|
|
||||||
|
|
||||||
|
def current_mode(now: dt.datetime, holidays: Set[str]) -> str:
|
||||||
|
"""now(KST aware) + holidays(ISO date set) → 'trading' | 'free'."""
|
||||||
|
# 주말 (토=5, 일=6)
|
||||||
|
if now.weekday() >= 5:
|
||||||
|
return "free"
|
||||||
|
# 휴장일
|
||||||
|
if now.date().isoformat() in holidays:
|
||||||
|
return "free"
|
||||||
|
# 트레이딩 윈도우 [start, end)
|
||||||
|
start = _parse_hhmm(TRADING_START)
|
||||||
|
end = _parse_hhmm(TRADING_END)
|
||||||
|
t = now.timetz().replace(tzinfo=None)
|
||||||
|
if start <= t < end:
|
||||||
|
return "trading"
|
||||||
|
return "free"
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_holidays() -> Set[str]:
|
||||||
|
"""NAS stock /api/stock/holidays 조회. 실패 시 빈 set (안전 — free로 판정)."""
|
||||||
|
try:
|
||||||
|
r = httpx.get(f"{STOCK_BASE_URL}/api/stock/holidays", timeout=10.0)
|
||||||
|
if r.status_code == 200:
|
||||||
|
return set(r.json().get("holidays", []))
|
||||||
|
logger.warning("holidays fetch returned %d", r.status_code)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("holidays fetch 실패")
|
||||||
|
return set()
|
||||||
5
services/task-watcher/requirements.txt
Normal file
5
services/task-watcher/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
fastapi==0.115.6
|
||||||
|
uvicorn[standard]==0.34.0
|
||||||
|
redis>=5.0
|
||||||
|
httpx>=0.27
|
||||||
|
pytest>=8.0
|
||||||
0
services/task-watcher/tests/__init__.py
Normal file
0
services/task-watcher/tests/__init__.py
Normal file
44
services/task-watcher/tests/test_mode.py
Normal file
44
services/task-watcher/tests/test_mode.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
"""current_mode — 시간대 + 휴장일 판정 (순수 함수)."""
|
||||||
|
import datetime as dt
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from mode import current_mode
|
||||||
|
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
HOLIDAYS = {"2026-05-25"} # 가상 휴장일 (월요일)
|
||||||
|
|
||||||
|
|
||||||
|
def _kst(y, m, d, hh, mm):
|
||||||
|
return dt.datetime(y, m, d, hh, mm, tzinfo=KST)
|
||||||
|
|
||||||
|
|
||||||
|
def test_weekday_trading_hours_is_trading():
|
||||||
|
# 2026-05-22 금요일 10:00 — 트레이딩 시간대
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 10, 0), HOLIDAYS) == "trading"
|
||||||
|
|
||||||
|
|
||||||
|
def test_weekday_before_open_is_free():
|
||||||
|
# 평일 06:00 — 장 전
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 6, 0), HOLIDAYS) == "free"
|
||||||
|
|
||||||
|
|
||||||
|
def test_weekday_after_close_is_free():
|
||||||
|
# 평일 17:00 — 장 마감 후
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 17, 0), HOLIDAYS) == "free"
|
||||||
|
|
||||||
|
|
||||||
|
def test_weekend_is_free():
|
||||||
|
# 2026-05-23 토요일 10:00
|
||||||
|
assert current_mode(_kst(2026, 5, 23, 10, 0), HOLIDAYS) == "free"
|
||||||
|
|
||||||
|
|
||||||
|
def test_holiday_weekday_is_free():
|
||||||
|
# 2026-05-25 월요일이지만 휴장일 → 트레이딩 시간대라도 free
|
||||||
|
assert current_mode(_kst(2026, 5, 25, 10, 0), HOLIDAYS) == "free"
|
||||||
|
|
||||||
|
|
||||||
|
def test_trading_boundary_inclusive_start_exclusive_end():
|
||||||
|
# 07:00 정각 = 트레이딩 시작, 16:30 정각 = 마감 (16:30은 free)
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 7, 0), HOLIDAYS) == "trading"
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 16, 29), HOLIDAYS) == "trading"
|
||||||
|
assert current_mode(_kst(2026, 5, 22, 16, 30), HOLIDAYS) == "free"
|
||||||
59
services/task-watcher/watcher.py
Normal file
59
services/task-watcher/watcher.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
"""30초마다 current_mode 판정 → queue:paused 토글.
|
||||||
|
|
||||||
|
trading → SET queue:paused 1 EX 600 (10분 TTL — watcher 죽어도 자동 해제)
|
||||||
|
free → DEL queue:paused
|
||||||
|
holidays는 1시간마다 refresh (매 loop fetch 부하 회피).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import datetime as dt
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
|
from mode import current_mode, fetch_holidays, KST
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
REDIS_URL = os.getenv("REDIS_URL", "redis://192.168.45.54:6379")
|
||||||
|
PAUSED_KEY = "queue:paused"
|
||||||
|
LOOP_INTERVAL = 30 # 초
|
||||||
|
HOLIDAYS_REFRESH = 3600 # 1시간
|
||||||
|
PAUSED_TTL = 600 # 10분 (watcher 죽어도 자동 해제)
|
||||||
|
|
||||||
|
|
||||||
|
async def watcher_loop():
|
||||||
|
redis = aioredis.from_url(REDIS_URL, decode_responses=False)
|
||||||
|
holidays = fetch_holidays()
|
||||||
|
last_holiday_refresh = dt.datetime.now(KST)
|
||||||
|
last_mode = None
|
||||||
|
logger.info("task-watcher started (trading window 토글)")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
now = dt.datetime.now(KST)
|
||||||
|
# holidays 주기적 refresh
|
||||||
|
if (now - last_holiday_refresh).total_seconds() >= HOLIDAYS_REFRESH:
|
||||||
|
holidays = fetch_holidays()
|
||||||
|
last_holiday_refresh = now
|
||||||
|
|
||||||
|
mode = current_mode(now, holidays)
|
||||||
|
if mode == "trading":
|
||||||
|
await redis.set(PAUSED_KEY, b"1", ex=PAUSED_TTL)
|
||||||
|
else:
|
||||||
|
await redis.delete(PAUSED_KEY)
|
||||||
|
|
||||||
|
if mode != last_mode:
|
||||||
|
logger.info("mode 전환: %s → %s (paused=%s)", last_mode, mode, mode == "trading")
|
||||||
|
last_mode = mode
|
||||||
|
|
||||||
|
await asyncio.sleep(LOOP_INTERVAL)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("watcher_loop cancelled")
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
logger.exception("watcher_loop iteration 실패, 30초 후 재시도")
|
||||||
|
await asyncio.sleep(LOOP_INTERVAL)
|
||||||
Reference in New Issue
Block a user