Files
web-page-backend/stock-lab/app/main.py

155 lines
5.0 KiB
Python

import json
import os
from typing import Optional

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from .db import init_db, save_articles, get_latest_articles
from .scraper import fetch_market_news, fetch_major_indices, fetch_overseas_news
app = FastAPI()

# CORS configuration (allows the frontend to call this API from a browser).
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable so production can be restricted to specific domains
# without a code change. When the variable is unset or empty, every origin is
# allowed, which matches the previous hard-coded behaviour.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # NOTE(review): FastAPI's CORS documentation says wildcard origins should
    # not be combined with allow_credentials=True. Kept enabled to preserve
    # existing behaviour; set CORS_ALLOW_ORIGINS to explicit domains in
    # production.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Background scheduler for the daily scraping job; the timezone comes from the
# TZ environment variable and falls back to Asia/Seoul.
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))

# Windows AI Server URL (configured in the NAS .env file). A trailing slash is
# stripped so that joining it with "/trade/..." never yields a double slash.
WINDOWS_AI_SERVER_URL = os.getenv(
    "WINDOWS_AI_SERVER_URL", "http://192.168.0.5:8000"
).rstrip("/")
@app.on_event("startup")
def on_startup():
    """Initialise the database and start the daily news-scraping schedule.

    Registered as the FastAPI startup handler. It creates the database via
    ``init_db()``, registers ``run_scraping_job`` as a cron job at 08:00 in
    the scheduler's timezone, runs one scrape immediately when no article is
    stored yet, and finally starts the scheduler.
    """
    print(f"[StockLab] Startup. Windows AI Server URL: {WINDOWS_AI_SERVER_URL}")
    init_db()

    # Scrape news every morning at 08:00 (performed by the NAS itself).
    scheduler.add_job(run_scraping_job, "cron", hour="8", minute="0")

    # Also scrape once at startup, but only when there is no data yet.
    if not get_latest_articles(1):
        try:
            run_scraping_job()
        except Exception as e:
            # A failed first scrape (e.g. no network at boot) must not abort
            # startup; otherwise the scheduler below would never be started
            # and the API would not come up at all.
            print(f"[StockLab] Initial scraping failed: {e}")

    scheduler.start()
def run_scraping_job():
    """Scrape domestic and overseas market news and store both batches.

    Each source is fetched and saved in turn (domestic first, then overseas),
    and the number of saved articles reported by ``save_articles`` is printed
    for each.
    """
    print("[StockLab] Starting news scraping...")

    # Order matters for the log line below: 1. domestic, 2. overseas.
    fetchers = (fetch_market_news, fetch_overseas_news)
    domestic_saved, overseas_saved = [save_articles(fetch()) for fetch in fetchers]

    print(f"[StockLab] Saved {domestic_saved} domestic, {overseas_saved} overseas articles.")
@app.get("/health")
def health():
    """Liveness probe: always answers with ``{"ok": True}``."""
    status = {"ok": True}
    return status
@app.get("/api/stock/news")
def get_news(limit: int = 20, category: Optional[str] = None):
    """Return the latest stock news articles.

    Args:
        limit: Maximum number of articles to request from the store
            (default 20).
        category: Optional category filter, ``'domestic'`` or ``'overseas'``.
            ``None`` is forwarded unchanged to ``get_latest_articles``.

    Returns:
        Whatever ``get_latest_articles(limit, category)`` returns.
    """
    return get_latest_articles(limit, category)
@app.get("/api/stock/indices")
def get_indices():
    """Return the major market indicators (KOSPI, etc.), crawled on each request."""
    indices = fetch_major_indices()
    return indices
@app.post("/api/stock/scrap")
def trigger_scrap():
    """Manually trigger a scraping run and acknowledge once it has returned."""
    run_scraping_job()
    acknowledgement = {"ok": True}
    return acknowledgement
# --- Trading API (Windows Proxy) ---
@app.get("/api/trade/balance")
def get_balance():
    """Fetch the account balance by proxying to the Windows AI server.

    Returns:
        The upstream JSON body on HTTP 200. On a non-200 upstream reply the
        upstream status code and JSON body are forwarded. If the upstream
        cannot be reached a 500 JSON error is returned, and if it answers
        with a body that is not valid JSON a 502 JSON error is returned.
    """
    print(f"[Proxy] Requesting Balance from {WINDOWS_AI_SERVER_URL}...")
    try:
        resp = requests.get(f"{WINDOWS_AI_SERVER_URL}/trade/balance", timeout=5)
    except requests.RequestException as e:
        print(f"[ProxyError] Connection Failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL},
        )

    try:
        payload = resp.json()
    except ValueError:
        # The upstream replied, but not with JSON (e.g. an HTML error page).
        # Report that explicitly instead of mislabelling it as a connection
        # failure.
        print(f"[ProxyError] Balance Invalid Response: {resp.status_code} {resp.text}")
        return JSONResponse(
            status_code=502,
            content={
                "error": "Invalid Response",
                "detail": resp.text[:500],
                "upstream_status": resp.status_code,
                "target": WINDOWS_AI_SERVER_URL,
            },
        )

    if resp.status_code != 200:
        print(f"[ProxyError] Balance Error: {resp.status_code} {resp.text}")
        return JSONResponse(status_code=resp.status_code, content=payload)

    print("[Proxy] Balance Success")
    return payload
class OrderRequest(BaseModel):
    """Request body for a buy/sell order forwarded to the Windows AI server.

    ``quantity`` must be positive and ``price`` must not be negative, so
    obviously invalid orders are rejected before they are proxied.
    """

    ticker: str  # Stock code (e.g. "005930")
    action: str  # "BUY" or "SELL"
    quantity: int = Field(..., gt=0)  # Order quantity; must be greater than 0
    price: int = Field(0, ge=0)  # 0 means a market-price order
    reason: Optional[str] = "Manual Order"  # Reason for the order (kept for the AI's records)
@app.post("/api/trade/order")
def order_stock(req: OrderRequest):
    """Place a buy/sell order by proxying to the Windows AI server.

    Args:
        req: The order to forward; it is sent upstream as a JSON body.

    Returns:
        The upstream JSON body on HTTP 200. On a non-200 upstream reply the
        upstream status code and JSON body are forwarded. If the upstream
        cannot be reached a 500 JSON error is returned, and if it answers
        with a body that is not valid JSON a 502 JSON error is returned.
    """
    order = req.dict()
    print(f"[Proxy] Order Request: {order} to {WINDOWS_AI_SERVER_URL}...")
    try:
        resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/order", json=order, timeout=10)
    except requests.RequestException as e:
        # NOTE: a timeout here does not prove the order was not placed
        # upstream, so the request is deliberately not retried.
        print(f"[ProxyError] Order Connection Failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL},
        )

    try:
        payload = resp.json()
    except ValueError:
        # The upstream replied, but not with JSON (e.g. an HTML error page).
        # Report that explicitly instead of mislabelling it as a connection
        # failure.
        print(f"[ProxyError] Order Invalid Response: {resp.status_code} {resp.text}")
        return JSONResponse(
            status_code=502,
            content={
                "error": "Invalid Response",
                "detail": resp.text[:500],
                "upstream_status": resp.status_code,
                "target": WINDOWS_AI_SERVER_URL,
            },
        )

    if resp.status_code != 200:
        print(f"[ProxyError] Order Error: {resp.status_code} {resp.text}")
        return JSONResponse(status_code=resp.status_code, content=payload)

    return payload
@app.get("/api/stock/analyze")
def analyze_market():
    """Run the AI market analysis on the Windows PC and return its result.

    Sends a POST with an empty JSON body to the upstream
    ``/analyze/portfolio`` endpoint and waits up to 120 seconds for it.

    Returns:
        The upstream JSON body on HTTP 200. On a non-200 upstream reply the
        upstream status code and JSON body are forwarded. If the upstream
        cannot be reached a 500 JSON error is returned, and if it answers
        with a body that is not valid JSON a 502 JSON error is returned.
    """
    print(f"[Proxy] Analyzing Market at {WINDOWS_AI_SERVER_URL}...")
    try:
        # Send an empty JSON body.
        resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/analyze/portfolio", json={}, timeout=120)
    except requests.RequestException as e:
        print(f"[ProxyError] Analyze Connection Failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "Connection Failed", "detail": str(e), "target": WINDOWS_AI_SERVER_URL},
        )

    try:
        payload = resp.json()
    except ValueError:
        # The upstream replied, but not with JSON (e.g. an HTML error page).
        # Report that explicitly instead of mislabelling it as a connection
        # failure.
        print(f"[ProxyError] Analyze Invalid Response: {resp.status_code} {resp.text}")
        return JSONResponse(
            status_code=502,
            content={
                "error": "Invalid Response",
                "detail": resp.text[:500],
                "upstream_status": resp.status_code,
                "target": WINDOWS_AI_SERVER_URL,
            },
        )

    if resp.status_code != 200:
        print(f"[ProxyError] Analyze Error: {resp.status_code} {resp.text}")
        return JSONResponse(status_code=resp.status_code, content=payload)

    return payload
@app.get("/api/version")
def version():
    """Report the app version from the APP_VERSION environment variable ("dev" if unset)."""
    app_version = os.environ.get("APP_VERSION", "dev")
    return {"version": app_version}