Files
web-page-backend/stock-lab/app/main.py

172 lines
5.8 KiB
Python

import os
import json
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from pydantic import BaseModel
from .db import init_db, save_articles, get_latest_articles
from .scraper import fetch_market_news, fetch_major_indices, fetch_overseas_news
app = FastAPI()
# CORS 설정 (프론트엔드 접근 허용)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 운영 시에는 구체적인 도메인으로 제한하는 것이 좋음
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
# Windows AI Server URL (NAS .env에서 설정)
WINDOWS_AI_SERVER_URL = os.getenv("WINDOWS_AI_SERVER_URL", "http://192.168.0.5:8000")
@app.on_event("startup")
def on_startup():
print(f"[StockLab] Startup. Windows AI Server URL: {WINDOWS_AI_SERVER_URL}")
init_db()
# 매일 아침 8시 뉴스 스크랩 (NAS 자체 수행)
scheduler.add_job(run_scraping_job, "cron", hour="8", minute="0")
# 앱 시작 시에도 한 번 실행 (데이터 없으면)
if not get_latest_articles(1):
run_scraping_job()
scheduler.start()
def run_scraping_job():
print("[StockLab] Starting news scraping...")
# 1. 국내
articles_kr = fetch_market_news()
count_kr = save_articles(articles_kr)
# 2. 해외
articles_world = fetch_overseas_news()
count_world = save_articles(articles_world)
print(f"[StockLab] Saved {count_kr} domestic, {count_world} overseas articles.")
@app.get("/health")
def health():
return {"ok": True}
@app.get("/api/stock/news")
def get_news(limit: int = 20, category: str = None):
"""최신 주식 뉴스 조회 (category: 'domestic' | 'overseas')"""
return get_latest_articles(limit, category)
@app.get("/api/stock/indices")
def get_indices():
"""주요 지표(KOSPI 등) 실시간 크롤링 조회"""
return fetch_major_indices()
@app.post("/api/stock/scrap")
def trigger_scrap():
"""수동 스크랩 트리거"""
run_scraping_job()
return {"ok": True}
# --- Trading API (Windows Proxy) ---
@app.get("/api/trade/balance")
def get_balance():
"""계좌 잔고 조회 (Windows AI Server Proxy)"""
print(f"[Proxy] Requesting Balance from {WINDOWS_AI_SERVER_URL}...")
try:
resp = requests.get(f"{WINDOWS_AI_SERVER_URL}/trade/balance", timeout=5)
if resp.status_code != 200:
print(f"[ProxyError] Balance Error: {resp.status_code} {resp.text}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
print("[Proxy] Balance Success")
return resp.json()
except Exception as e:
print(f"[ProxyError] Connection Failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL}
)
class OrderRequest(BaseModel):
code: str
qty: int
price: int = 0 # 0이면 시장가
type: str # 'buy' or 'sell'
@app.post("/api/trade/order")
def order_stock(req: OrderRequest):
"""주식 매수/매도 주문 (Windows AI Server Proxy)"""
print(f"[Proxy] Order Request: {req.dict()} to {WINDOWS_AI_SERVER_URL}...")
try:
resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/order", json=req.dict(), timeout=10)
if resp.status_code != 200:
print(f"[ProxyError] Order Error: {resp.status_code} {resp.text}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
return resp.json()
except Exception as e:
print(f"[ProxyError] Order Connection Failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL}
)
@app.post("/api/trade/auto")
def auto_trade():
"""AI 자동 매매 트리거 (Windows AI Server Proxy)"""
print(f"[Proxy] Triggering Auto Trade at {WINDOWS_AI_SERVER_URL}...")
try:
# 빈 JSON Body를 명시적으로 전송하여 422 에러 방지
resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/auto", json={}, timeout=120)
if resp.status_code != 200:
print(f"[ProxyError] Auto Trade Error: {resp.status_code} {resp.text}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
result = resp.json()
print(f"[Proxy] Auto Trade Success. Response:\n{json.dumps(result, indent=2, ensure_ascii=False)}")
return result
except Exception as e:
print(f"[ProxyError] Auto Trade Connection Failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL}
)
@app.get("/api/stock/analyze")
def analyze_market():
"""Windows PC를 통한 AI 시장 분석"""
print(f"[Proxy] Analyzing Market at {WINDOWS_AI_SERVER_URL}...")
try:
# 빈 JSON Body 전송
resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/analyze/portfolio", json={}, timeout=120)
if resp.status_code != 200:
print(f"[ProxyError] Analyze Error: {resp.status_code} {resp.text}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
return resp.json()
except Exception as e:
print(f"[ProxyError] Analyze Connection Failed: {e}")
return JSONResponse(
status_code=500,
content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL}
)
@app.get("/api/version")
def version():
return {"version": os.getenv("APP_VERSION", "dev")}