# Source file: web-page-backend/stock-lab/app/main.py
# (Repository viewer listing: 129 lines, 4.2 KiB, Python)

import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from pydantic import BaseModel
from .db import init_db, save_articles, get_latest_articles
from .scraper import fetch_market_news, fetch_major_indices, fetch_overseas_news
app = FastAPI()

# CORS configuration (lets the browser frontend call this API).
# Allowed origins are read from the CORS_ALLOW_ORIGINS environment variable as
# a comma-separated list, e.g. "https://a.example,https://b.example".
# When the variable is unset or empty, every origin is allowed, which matches
# the previous hard-coded behaviour.
# NOTE(review): the FastAPI CORS documentation advises against combining a
# wildcard origin with allow_credentials=True; set CORS_ALLOW_ORIGINS to the
# real frontend domain(s) in production.
CORS_ALLOW_ORIGINS = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOW_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Background scheduler; its timezone comes from TZ and defaults to Asia/Seoul.
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))

# Windows AI Server URL (configured in the NAS .env file).
WINDOWS_AI_SERVER_URL = os.getenv("WINDOWS_AI_SERVER_URL", "http://192.168.0.5:8000")
@app.on_event("startup")
def on_startup():
    """Initialise the database and start the scheduled news scraping.

    Registers a daily cron job at 08:00 (scheduler timezone), runs one
    scrape immediately when no article is stored yet, and then starts the
    background scheduler.
    """
    init_db()

    # Scrape news every morning at 08:00 (performed by the NAS itself).
    scheduler.add_job(run_scraping_job, "cron", hour="8", minute="0")

    # Also run once at startup, but only when there is no data yet.
    if not get_latest_articles(1):
        try:
            run_scraping_job()
        except Exception as exc:
            # The initial scrape is best-effort: a failure here must not
            # abort application startup or prevent the scheduler from
            # starting, otherwise the daily job would never run either.
            print(f"[StockLab] Initial scraping failed: {exc}")

    scheduler.start()
def run_scraping_job():
    """Scrape domestic and overseas market news and store both batches.

    Each batch is fetched and saved before the next one is fetched; the
    counts returned by ``save_articles`` are printed at the end.
    """
    print("[StockLab] Starting news scraping...")

    # 1. Domestic news
    count_kr = save_articles(fetch_market_news())

    # 2. Overseas news
    count_world = save_articles(fetch_overseas_news())

    print(f"[StockLab] Saved {count_kr} domestic, {count_world} overseas articles.")
@app.get("/health")
def health():
    """Health-check endpoint; always returns an ``ok`` flag set to true."""
    return dict(ok=True)
@app.get("/api/stock/news")
def get_news(limit: int = 20, category: str = None):
    """Return the latest stock news (category: 'domestic' | 'overseas').

    ``limit`` and ``category`` are passed unchanged to
    ``get_latest_articles``; ``category`` may be omitted.
    """
    latest_articles = get_latest_articles(limit, category)
    return latest_articles
@app.get("/api/stock/indices")
def get_indices():
    """Return the major market indicators (KOSPI etc.), crawled on each call."""
    indices = fetch_major_indices()
    return indices
@app.post("/api/stock/scrap")
def trigger_scrap():
    """Manually trigger the news scraping job and acknowledge completion."""
    run_scraping_job()
    return dict(ok=True)
# --- Trading API (Windows Proxy) ---
@app.get("/api/trade/balance")
def get_balance():
    """Fetch the account balance through the Windows AI Server proxy.

    Returns:
        The upstream JSON body when the Windows server answers HTTP 200.
        Otherwise a dictionary with an ``error`` key describing what went
        wrong: the request could not be completed, the server answered with
        a non-200 status, or the body was not valid JSON.
    """
    try:
        # Ask the Windows server for the balance (it calls KIS directly).
        resp = requests.get(f"{WINDOWS_AI_SERVER_URL}/trade/balance", timeout=5)
    except requests.RequestException as e:
        # Only transport-level failures (refused, timeout, bad URL) land here.
        return {"error": "Failed to connect to Windows AI Server", "detail": str(e), "url": WINDOWS_AI_SERVER_URL}

    # A non-200 answer is reported with its status code and raw body text.
    if resp.status_code != 200:
        return {"error": "Windows AI Server returned error", "status": resp.status_code, "detail": resp.text}

    try:
        return resp.json()
    except ValueError as e:
        # The server was reachable but its body is not JSON; report that
        # accurately instead of claiming a connection failure.
        return {"error": "Windows AI Server returned invalid JSON", "detail": str(e), "url": WINDOWS_AI_SERVER_URL}
class OrderRequest(BaseModel):
    """Request body of the stock order endpoint (buy or sell)."""

    code: str  # stock code (presumably the exchange ticker; confirm with the Windows server)
    qty: int  # order quantity
    price: int = 0  # 0 means a market-price order
    type: str  # 'buy' or 'sell'
@app.post("/api/trade/order")
def order_stock(req: OrderRequest):
    """Place a buy/sell stock order through the Windows AI Server proxy.

    Args:
        req: Order details; serialised to JSON and forwarded unchanged.

    Returns:
        The upstream JSON body, whatever its HTTP status. If the request
        cannot be completed or the body is not JSON, a dictionary with
        ``success`` set to False, a ``message`` and the proxy ``url``.
    """
    # pydantic v2 renamed .dict() to .model_dump(); support both versions.
    payload = req.model_dump() if hasattr(req, "model_dump") else req.dict()

    try:
        # Forward the order to the Windows server.
        resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/order", json=payload, timeout=10)
    except requests.RequestException as e:
        return {"success": False, "message": f"Proxy Error: {str(e)}", "url": WINDOWS_AI_SERVER_URL}

    try:
        return resp.json()
    except ValueError:
        # Reachable server, unreadable body: say so explicitly rather than
        # surfacing a bare JSON parser message.
        return {
            "success": False,
            "message": f"Proxy Error: non-JSON response from Windows AI Server (HTTP {resp.status_code})",
            "url": WINDOWS_AI_SERVER_URL,
        }
@app.post("/api/trade/auto")
def auto_trade():
    """Trigger AI auto-trading through the Windows AI Server proxy.

    Returns:
        The upstream JSON body, whatever its HTTP status. If the request
        cannot be completed or the body is not JSON, a dictionary with
        ``success`` set to False, a ``message`` and the proxy ``url``.
    """
    try:
        # Ask the Windows server to run AI trading; allow up to 120 seconds.
        resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/auto", timeout=120)
    except requests.RequestException as e:
        return {"success": False, "message": f"Proxy Error: {str(e)}", "url": WINDOWS_AI_SERVER_URL}

    try:
        return resp.json()
    except ValueError:
        # Reachable server, unreadable body: say so explicitly rather than
        # surfacing a bare JSON parser message.
        return {
            "success": False,
            "message": f"Proxy Error: non-JSON response from Windows AI Server (HTTP {resp.status_code})",
            "url": WINDOWS_AI_SERVER_URL,
        }
@app.get("/api/stock/analyze")
def analyze_market():
    """Run the AI market analysis on the Windows PC and return its result.

    This GET endpoint issues a POST to the Windows AI Server's
    ``/analyze/portfolio`` route and waits up to 120 seconds.

    Returns:
        The upstream JSON body, whatever its HTTP status. If the request
        cannot be completed or the body is not JSON, a dictionary with an
        ``error`` key, a ``detail`` message and the proxy ``url``.
    """
    try:
        # Call the Windows AI Server analysis API.
        resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/analyze/portfolio", timeout=120)
    except requests.RequestException as e:
        # Only transport-level failures (refused, timeout, bad URL) land here.
        return {"error": "Failed to connect to Windows AI Server", "detail": str(e), "url": WINDOWS_AI_SERVER_URL}

    try:
        return resp.json()
    except ValueError as e:
        # The server was reachable but its body is not JSON; report that
        # accurately instead of claiming a connection failure.
        return {"error": "Windows AI Server returned invalid JSON", "detail": str(e), "url": WINDOWS_AI_SERVER_URL}
@app.get("/api/version")
def version():
    """Report the application version from APP_VERSION (defaults to "dev")."""
    app_version = os.environ.get("APP_VERSION", "dev")
    return {"version": app_version}