Files
web-page-backend/agent-office/app/service_proxy.py

169 lines
5.6 KiB
Python

import httpx
from typing import Any, Dict, List, Optional
from .config import STOCK_LAB_URL, MUSIC_LAB_URL, BLOG_LAB_URL, REALESTATE_LAB_URL
_client = httpx.AsyncClient(timeout=30.0)
async def fetch_stock_news(limit: int = 10, category: str = None) -> List[Dict[str, Any]]:
params = {"limit": limit}
if category:
params["category"] = category
resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/news", params=params)
resp.raise_for_status()
return resp.json()
async def fetch_stock_indices() -> Dict[str, Any]:
resp = await _client.get(f"{STOCK_LAB_URL}/api/stock/indices")
resp.raise_for_status()
return resp.json()
async def summarize_stock_news(limit: int = 15) -> Dict[str, Any]:
"""stock-lab의 AI 요약 엔드포인트 호출.
반환: {"summary": str, "tokens": {...}, "model": str, "duration_ms": int, "article_count": int}
"""
# stock-lab 내부 Ollama 호출이 180s까지 가능하므로 여유있게 200s
async with httpx.AsyncClient(timeout=200.0) as client:
resp = await client.post(
f"{STOCK_LAB_URL}/api/stock/news/summarize",
json={"limit": limit},
)
resp.raise_for_status()
return resp.json()
async def generate_music(payload: dict) -> Dict[str, Any]:
resp = await _client.post(f"{MUSIC_LAB_URL}/api/music/generate", json=payload)
resp.raise_for_status()
return resp.json()
async def get_music_status(task_id: str) -> Dict[str, Any]:
resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/status/{task_id}")
resp.raise_for_status()
return resp.json()
async def get_music_credits() -> Dict[str, Any]:
resp = await _client.get(f"{MUSIC_LAB_URL}/api/music/credits")
resp.raise_for_status()
return resp.json()
# --- blog-lab ---
async def blog_research(keyword: str) -> Dict[str, Any]:
"""키워드 리서치 시작 → task_id 반환"""
resp = await _client.post(
f"{BLOG_LAB_URL}/api/blog-marketing/research",
json={"keyword": keyword},
)
resp.raise_for_status()
return resp.json()
async def blog_task_status(task_id: str) -> Dict[str, Any]:
resp = await _client.get(f"{BLOG_LAB_URL}/api/blog-marketing/task/{task_id}")
resp.raise_for_status()
return resp.json()
async def blog_generate(keyword_id: int) -> Dict[str, Any]:
resp = await _client.post(
f"{BLOG_LAB_URL}/api/blog-marketing/generate",
json={"keyword_id": keyword_id},
)
resp.raise_for_status()
return resp.json()
async def blog_market(post_id: int) -> Dict[str, Any]:
resp = await _client.post(f"{BLOG_LAB_URL}/api/blog-marketing/market/{post_id}")
resp.raise_for_status()
return resp.json()
async def blog_review(post_id: int) -> Dict[str, Any]:
resp = await _client.post(f"{BLOG_LAB_URL}/api/blog-marketing/review/{post_id}")
resp.raise_for_status()
return resp.json()
async def blog_publish(post_id: int, url: str = "") -> Dict[str, Any]:
resp = await _client.post(
f"{BLOG_LAB_URL}/api/blog-marketing/posts/{post_id}/publish",
json={"url": url},
)
resp.raise_for_status()
return resp.json()
async def blog_get_post(post_id: int) -> Dict[str, Any]:
resp = await _client.get(f"{BLOG_LAB_URL}/api/blog-marketing/posts/{post_id}")
resp.raise_for_status()
return resp.json()
# --- realestate-lab ---
async def realestate_collect() -> Dict[str, Any]:
"""청약 공고 수동 수집 트리거"""
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(f"{REALESTATE_LAB_URL}/api/realestate/collect")
resp.raise_for_status()
return resp.json()
async def realestate_matches(limit: int = 20) -> List[Dict[str, Any]]:
"""realestate-lab의 GET /api/realestate/matches 호출."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{REALESTATE_LAB_URL}/api/realestate/matches",
params={"size": limit},
)
resp.raise_for_status()
data = resp.json()
return data.get("items", [])
async def realestate_dashboard() -> Dict[str, Any]:
resp = await _client.get(f"{REALESTATE_LAB_URL}/api/realestate/dashboard")
resp.raise_for_status()
return resp.json()
async def realestate_mark_read(match_id: int) -> Dict[str, Any]:
resp = await _client.patch(f"{REALESTATE_LAB_URL}/api/realestate/matches/{match_id}/read")
resp.raise_for_status()
return resp.json()
async def realestate_bookmark_toggle(announcement_id: int) -> Dict[str, Any]:
"""realestate-lab의 PATCH /api/realestate/announcements/{id}/bookmark 호출."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.patch(
f"{REALESTATE_LAB_URL}/api/realestate/announcements/{announcement_id}/bookmark"
)
resp.raise_for_status()
return resp.json()
# --- lotto-backend ---
async def lotto_candidates(n: int = 20) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/curator/candidates", params={"n": n})
resp.raise_for_status()
return resp.json()
async def lotto_context() -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.get(f"{LOTTO_BACKEND_URL}/api/lotto/curator/context")
resp.raise_for_status()
return resp.json()
async def lotto_save_briefing(payload: dict) -> Dict[str, Any]:
from .config import LOTTO_BACKEND_URL
resp = await _client.post(f"{LOTTO_BACKEND_URL}/api/lotto/briefing", json=payload)
resp.raise_for_status()
return resp.json()