Files
web-page-backend/stock/app/main.py
gahusb 5da7a0040b fix(stock,docs): portfolio total_buy 수량 곱산 + insta-trends spec 변경 이력 (F4 + F6)
[F4] /api/portfolio 응답의 summary.total_buy가 종목별 단가 × 수량의 합이
되도록 fix. 기존 인라인 코드가 purchase_price를 수량 미곱산으로 단순
누적해 명세(qty 100 · avg 72000 → 7,200,000)와 어긋났음. API_SPEC.md에
purchase_price 필드 의미 + total_buy 계산식 명시. test 3건 (단가 곱산,
avg_price 폴백, 다종목 합산).

[F6] insta-trends spec/plan 상단에 "google_trends → youtube_trending"
변경 이력 추가. Google Trends endpoint 폐기로 source 교체된 이력이
본문 검색 시 혼란 주는 문제 차단. 사유 cross-ref:
feedback_external_data_sources.md
2026-05-17 14:06:19 +09:00

622 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import logging
from datetime import date as date_type
from typing import Optional
from fastapi import FastAPI, Query, Header, Depends, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from pydantic import BaseModel
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")
logger = logging.getLogger("stock")
from .db import (
init_db, save_articles, get_latest_articles,
add_portfolio_item, get_all_portfolio, get_portfolio_item,
update_portfolio_item, delete_portfolio_item,
upsert_broker_cash, get_all_broker_cash, delete_broker_cash,
upsert_asset_snapshot, get_asset_snapshots,
add_sell_history, get_sell_history, update_sell_history, delete_sell_history,
)
from .scraper import fetch_market_news, fetch_major_indices
from .price_fetcher import get_current_prices, get_current_prices_detail
from .ai_summarizer import summarize_news, OllamaError
from .auth import verify_webai_key
app = FastAPI()
# Screener 라우터 등록
from app.screener.router import router as screener_router
app.include_router(screener_router)
# CORS 설정 (프론트엔드 접근 허용)
_cors_origins = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:3007,http://localhost:8080").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[o.strip() for o in _cors_origins],
allow_credentials=False,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type"],
)
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
# Windows AI Server URL (NAS .env에서 설정)
WINDOWS_AI_SERVER_URL = os.getenv("WINDOWS_AI_SERVER_URL", "http://192.168.0.5:8000")
# Admin API Key 인증 — /api/trade/* 보호 (CODE_REVIEW F2)
# 빈 키 + 명시적 dev flag 없으면 503으로 거부. 운영 .env에 ADMIN_API_KEY 누락 시
# 무인증 통과되던 버그 차단.
ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "")
def verify_admin(x_admin_key: str = Header(None)):
"""admin/trade 엔드포인트 보호용 API 키 검증.
- ADMIN_API_KEY 설정됨 + 키 일치 → 통과
- ADMIN_API_KEY 설정됨 + 키 불일치 → 401 Unauthorized
- ADMIN_API_KEY 미설정 + ALLOW_UNAUTHENTICATED_ADMIN=true → 통과 (개발 모드)
- ADMIN_API_KEY 미설정 + dev flag 없음 → 503 (보호 강화, 운영 .env 누락 차단)
"""
if not ADMIN_API_KEY:
if os.getenv("ALLOW_UNAUTHENTICATED_ADMIN", "false").lower() == "true":
return # 개발 환경 명시적 허용
raise HTTPException(
status_code=503,
detail=(
"admin endpoint protected — ADMIN_API_KEY not configured. "
"Set ADMIN_API_KEY in .env, or set ALLOW_UNAUTHENTICATED_ADMIN=true "
"for development only."
),
)
if x_admin_key != ADMIN_API_KEY:
raise HTTPException(status_code=401, detail="Unauthorized")
# Anthropic API 프록시용 키 (서버 측 보관)
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# 공휴일 목록 로드
_HOLIDAYS_PATH = os.path.join(os.path.dirname(__file__), "holidays.json")
try:
with open(_HOLIDAYS_PATH, "r") as f:
_HOLIDAYS: set = set(json.load(f))
except Exception:
_HOLIDAYS = set()
def is_market_open(d: date_type) -> bool:
return d.weekday() < 5 and d.strftime("%Y-%m-%d") not in _HOLIDAYS
def _calc_portfolio_totals(items, prices):
"""포트폴리오 총 매입/평가금 계산 (snapshot과 API에서 공용)"""
total_buy = 0
total_eval = 0
for item in items:
buy_amount = item["avg_price"] * item["quantity"]
current_price = prices.get(item["ticker"], item["avg_price"])
total_buy += buy_amount
total_eval += current_price * item["quantity"]
return total_buy, total_eval
def save_daily_snapshot():
today = date_type.today()
if not is_market_open(today):
logger.info(f"Snapshot: {today} 휴장일 — 스킵")
return
today_str = today.strftime("%Y-%m-%d")
items = get_all_portfolio()
cash_rows = get_all_broker_cash()
total_cash = sum(r["cash"] for r in cash_rows)
if items:
tickers = list({item["ticker"] for item in items})
prices = get_current_prices(tickers)
_, total_eval = _calc_portfolio_totals(items, prices)
else:
total_eval = 0
total_assets = total_eval + total_cash
upsert_asset_snapshot(today_str, total_eval, total_cash, total_assets)
logger.info(f"Snapshot: {today_str} 저장 — eval={total_eval}, cash={total_cash}, total={total_assets}")
@app.on_event("startup")
def on_startup():
init_db()
# 매일 아침 8시 뉴스 스크랩 (NAS 자체 수행)
scheduler.add_job(run_scraping_job, "cron", hour="8", minute="0")
# 평일 15:40 총 자산 스냅샷 저장
scheduler.add_job(save_daily_snapshot, "cron", day_of_week="mon-fri", hour=15, minute=40)
# 앱 시작 시에도 한 번 실행 (데이터 없으면)
if not get_latest_articles(1):
run_scraping_job()
scheduler.start()
def run_scraping_job():
logger.info("뉴스 스크래핑 시작")
articles_kr = fetch_market_news()
count_kr = save_articles(articles_kr)
logger.info(f"스크래핑 완료: 국내 {count_kr}")
@app.get("/health")
def health():
return {"ok": True}
@app.get("/api/stock/news")
def get_news(limit: int = 20, category: str = None):
"""최신 주식 뉴스 조회 (category: 'domestic' | 'overseas')"""
return get_latest_articles(limit, category)
@app.get("/api/stock/indices")
def get_indices():
"""주요 지표(KOSPI 등) 실시간 크롤링 조회"""
return fetch_major_indices()
@app.post("/api/stock/scrap")
def trigger_scrap():
"""수동 스크랩 트리거"""
run_scraping_job()
return {"ok": True}
class NewsSummarizeRequest(BaseModel):
limit: Optional[int] = 10
@app.post("/api/stock/news/summarize")
async def summarize_latest_news(req: NewsSummarizeRequest = NewsSummarizeRequest()):
"""최근 뉴스를 Ollama(qwen3:14b)로 요약"""
limit = req.limit if (req and req.limit) else 10
articles = get_latest_articles(limit)
if not articles:
raise HTTPException(status_code=404, detail="요약할 뉴스가 없습니다.")
try:
result = await summarize_news(articles)
except OllamaError as e:
logger.error(f"뉴스 요약 실패: {e}")
raise HTTPException(status_code=500, detail=f"Ollama 호출 실패: {e}")
except Exception as e:
logger.exception("뉴스 요약 중 예상치 못한 오류")
raise HTTPException(status_code=500, detail=f"뉴스 요약 실패: {e}")
# 상위 기사 메타 일부만 노출 (클라이언트가 본문 조립에 사용)
top_articles = [
{
"title": (a.get("title") or "").strip(),
"link": a.get("link") or "",
"press": a.get("press") or "",
"pub_date": a.get("pub_date") or "",
}
for a in articles[:8]
]
return {
**result,
"article_count": len(articles),
"articles": top_articles,
}
# --- Trading API (Windows Proxy, 인증 필요) ---
@app.get("/api/trade/balance", dependencies=[Depends(verify_admin)])
def get_balance():
"""계좌 잔고 조회 (Windows AI Server Proxy)"""
logger.info(f"Requesting Balance from {WINDOWS_AI_SERVER_URL}")
resp = None
try:
resp = requests.get(f"{WINDOWS_AI_SERVER_URL}/trade/balance", timeout=5)
if resp.status_code != 200:
logger.error(f"Balance Error: {resp.status_code}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
return resp.json()
except ValueError:
status = resp.status_code if resp is not None else 502
return JSONResponse(status_code=status, content={"error": f"Upstream error {status}"})
except Exception as e:
logger.error(f"Balance Connection Failed: {e}")
return JSONResponse(status_code=500, content={"error": "Connection Failed"})
class OrderRequest(BaseModel):
ticker: str
action: str
quantity: int
price: int = 0
reason: Optional[str] = "Manual Order"
@app.post("/api/trade/order", dependencies=[Depends(verify_admin)])
def order_stock(req: OrderRequest):
"""주식 매수/매도 주문 (Windows AI Server Proxy)"""
logger.info(f"Order Request: {req.action} {req.ticker} x{req.quantity}")
resp = None
try:
resp = requests.post(f"{WINDOWS_AI_SERVER_URL}/trade/order", json=req.model_dump(), timeout=10)
if resp.status_code != 200:
logger.error(f"Order Error: {resp.status_code}")
return JSONResponse(status_code=resp.status_code, content=resp.json())
return resp.json()
except ValueError:
status = resp.status_code if resp is not None else 502
return JSONResponse(status_code=status, content={"error": f"Upstream error {status}"})
except Exception as e:
logger.error(f"Order Connection Failed: {e}")
return JSONResponse(status_code=500, content={"error": "Connection Failed"})
# --- AI Coach 프록시 (API 키를 서버에 보관) ---
class AiCoachRequest(BaseModel):
model: str = "claude-haiku-4-5-20251001"
prompt: str
max_tokens: int = 1024
@app.post("/api/stock/ai-coach")
def ai_coach(req: AiCoachRequest):
"""AI 포트폴리오 코치 — Anthropic API 프록시 (API 키 서버 보관)"""
if not ANTHROPIC_API_KEY:
raise HTTPException(503, "AI Coach not configured (ANTHROPIC_API_KEY missing)")
allowed_models = {"claude-haiku-4-5-20251001", "claude-sonnet-4-6"}
model = req.model if req.model in allowed_models else "claude-haiku-4-5-20251001"
try:
resp = requests.post(
"https://api.anthropic.com/v1/messages",
headers={
"Content-Type": "application/json",
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
},
json={
"model": model,
"max_tokens": req.max_tokens,
"messages": [{"role": "user", "content": req.prompt}],
},
timeout=30,
)
if resp.status_code != 200:
logger.error(f"Anthropic API error: {resp.status_code}")
return JSONResponse(status_code=resp.status_code, content={"error": "AI API error"})
return resp.json()
except requests.Timeout:
return JSONResponse(status_code=504, content={"error": "AI API timeout"})
except Exception as e:
logger.error(f"AI Coach error: {e}")
return JSONResponse(status_code=500, content={"error": "AI Coach failed"})
@app.get("/api/version")
def version():
return {"version": os.getenv("APP_VERSION", "dev")}
# --- Portfolio API ---
class PortfolioItemRequest(BaseModel):
broker: str
ticker: str
name: str
quantity: int
avg_price: int # 평균단가 (현재가 평가/손익 계산용)
purchase_price: Optional[int] = None # 매입가 (총 매입 금액 계산용, 미입력 시 avg_price로 자동 설정)
class PortfolioUpdateRequest(BaseModel):
broker: Optional[str] = None
ticker: Optional[str] = None
name: Optional[str] = None
quantity: Optional[int] = None
avg_price: Optional[int] = None
purchase_price: Optional[int] = None
@app.get("/api/portfolio")
def get_portfolio():
"""전체 포트폴리오 조회 (현재가 + 손익 + 예수금 포함)"""
items = get_all_portfolio()
cash_rows = get_all_broker_cash()
total_cash = sum(r["cash"] for r in cash_rows)
if not items:
return {
"holdings": [],
"cash": cash_rows,
"summary": {
"total_buy": 0,
"total_eval": 0,
"total_profit": 0,
"total_profit_rate": 0.0,
"total_cash": total_cash,
"total_assets": total_cash,
},
}
tickers = list({item["ticker"] for item in items})
details = get_current_prices_detail(tickers)
holdings = []
total_buy = 0 # 요약 표시용 (purchase_price 기반)
total_cost_basis = 0 # 손익률 계산용 (avg_price 기반)
total_eval = 0
for item in items:
detail = details.get(item["ticker"])
current_price = detail["price"] if detail else None
price_session = detail["session"] if detail else None
price_as_of = detail["as_of"] if detail else None
# avg_price: 평균단가 — 손익(평가금액 - 매입원가) 계산 기준
# purchase_price: 매입 단가(1주당) — 없으면 avg_price로 폴백 (CODE_REVIEW F4)
purchase_price = item.get("purchase_price") if item.get("purchase_price") is not None else item["avg_price"]
cost_basis = item["avg_price"] * item["quantity"]
# 총 매입 금액 = 단가 × 보유 수량. API_SPEC.md 예시(qty 100·avg 72000 → 7,200,000)와 일치
buy_amount = purchase_price * item["quantity"]
eval_amount = current_price * item["quantity"] if current_price is not None else None
profit_amount = (eval_amount - cost_basis) if eval_amount is not None else None
profit_rate = round((profit_amount / cost_basis) * 100, 2) if (profit_amount is not None and cost_basis) else None
holdings.append({
"id": item["id"],
"broker": item["broker"],
"ticker": item["ticker"],
"name": item["name"],
"quantity": item["quantity"],
"avg_price": item["avg_price"],
"purchase_price": purchase_price,
"current_price": current_price,
"price_session": price_session,
"price_as_of": price_as_of,
"eval_amount": eval_amount,
"profit_amount": profit_amount,
"profit_rate": profit_rate,
})
total_buy += buy_amount
total_cost_basis += cost_basis
if eval_amount is not None:
total_eval += eval_amount
# 손익은 실제 평균단가(cost_basis) 기준으로 계산
total_profit = total_eval - total_cost_basis
total_profit_rate = round((total_profit / total_cost_basis) * 100, 2) if total_cost_basis else 0.0
return {
"holdings": holdings,
"cash": cash_rows,
"summary": {
"total_buy": total_buy,
"total_eval": total_eval,
"total_profit": total_profit,
"total_profit_rate": total_profit_rate,
"total_cash": total_cash,
"total_assets": total_eval + total_cash,
},
}
def _augment_portfolio_with_pnl_pct(raw: dict) -> dict:
"""Add pnl_pct (ratio) to each holding and total_pnl_pct to summary."""
holdings = []
for h in raw["holdings"]:
pnl_pct = round(h["profit_rate"] / 100, 6) if h.get("profit_rate") is not None else None
holdings.append({**h, "pnl_pct": pnl_pct})
summary = dict(raw["summary"])
rate = summary.get("total_profit_rate")
summary["total_pnl_pct"] = round(rate / 100, 6) if rate is not None else 0.0
return {"holdings": holdings, "cash": raw["cash"], "summary": summary}
@app.get("/api/webai/portfolio", dependencies=[Depends(verify_webai_key)])
def get_webai_portfolio():
"""web-ai 전용 portfolio (인증 필수, pnl_pct 비율 필드 추가)."""
return _augment_portfolio_with_pnl_pct(get_portfolio())
def _fetch_news_sentiment_dump(date: str | None) -> dict:
"""news_sentiment 일별 dump (krx_master JOIN, score DESC)."""
from .db import _conn
conn = _conn()
try:
# 1) date resolve — None 이면 최신 date
if date is None:
row = conn.execute(
"SELECT MAX(date) FROM news_sentiment"
).fetchone()
date = row[0] if row and row[0] else None
if date is None:
return {"date": None, "count": 0, "items": []}
# 2) JOIN krx_master.name (없으면 ticker 그대로)
rows = conn.execute(
"""
SELECT ns.ticker,
COALESCE(km.name, ns.ticker) AS name,
ns.score_raw,
ns.reason,
ns.news_count,
ns.source
FROM news_sentiment ns
LEFT JOIN krx_master km ON km.ticker = ns.ticker
WHERE ns.date = ?
ORDER BY ns.score_raw DESC
""",
(date,)
).fetchall()
finally:
conn.close()
items = [
{"ticker": r[0], "name": r[1], "score": r[2],
"reason": r[3], "news_count": r[4], "source": r[5]}
for r in rows
]
return {"date": date, "count": len(items), "items": items}
@app.get("/api/webai/news-sentiment", dependencies=[Depends(verify_webai_key)])
def get_webai_news_sentiment(date: str | None = None):
"""web-ai 전용 news sentiment 일별 dump."""
return _fetch_news_sentiment_dump(date)
@app.post("/api/portfolio", status_code=201)
def create_portfolio_item(req: PortfolioItemRequest):
"""포트폴리오 종목 추가"""
item_id = add_portfolio_item(
req.broker, req.ticker, req.name, req.quantity, req.avg_price,
purchase_price=req.purchase_price,
)
return {"id": item_id, "ok": True}
# --- Broker Cash API ---
# /{item_id} 라우트보다 반드시 먼저 정의해야 /cash가 item_id로 매칭되지 않음
class BrokerCashRequest(BaseModel):
broker: str
cash: int
@app.get("/api/portfolio/cash")
def list_broker_cash():
"""증권사별 예수금 전체 조회"""
return get_all_broker_cash()
@app.put("/api/portfolio/cash")
def set_broker_cash(req: BrokerCashRequest):
"""증권사 예수금 등록 또는 수정 (upsert)"""
upsert_broker_cash(req.broker, req.cash)
return {"ok": True, "broker": req.broker, "cash": req.cash}
@app.delete("/api/portfolio/cash/{broker}")
def remove_broker_cash(broker: str):
"""증권사 예수금 삭제"""
if not delete_broker_cash(broker):
return JSONResponse(status_code=404, content={"error": "Broker not found"})
return {"ok": True}
@app.put("/api/portfolio/{item_id}")
def update_portfolio(item_id: int, req: PortfolioUpdateRequest):
"""포트폴리오 종목 수정"""
if get_portfolio_item(item_id) is None:
return JSONResponse(status_code=404, content={"error": "Item not found"})
update_portfolio_item(item_id, **req.model_dump())
return {"ok": True}
@app.delete("/api/portfolio/{item_id}")
def delete_portfolio(item_id: int):
"""포트폴리오 종목 삭제"""
if not delete_portfolio_item(item_id):
return JSONResponse(status_code=404, content={"error": "Item not found"})
return {"ok": True}
# --- Asset Snapshot API ---
@app.post("/api/portfolio/snapshot")
def create_snapshot():
"""총 자산 스냅샷 수동 저장 (오늘 날짜 기준)"""
today = date_type.today()
today_str = today.strftime("%Y-%m-%d")
items = get_all_portfolio()
cash_rows = get_all_broker_cash()
total_cash = sum(r["cash"] for r in cash_rows)
if items:
tickers = list({item["ticker"] for item in items})
prices = get_current_prices(tickers)
total_eval = sum(
prices.get(item["ticker"], item["avg_price"]) * item["quantity"]
for item in items
)
else:
total_eval = 0
total_assets = total_eval + total_cash
upsert_asset_snapshot(today_str, total_eval, total_cash, total_assets)
return {
"ok": True,
"snapshot": {
"date": today_str,
"total_eval": total_eval,
"total_cash": total_cash,
"total_assets": total_assets,
},
}
@app.get("/api/portfolio/snapshot/history")
def get_snapshot_history(days: int = Query(30, ge=0)):
"""총 자산 스냅샷 이력 조회 (days=0: 전체, days=N: 최근 N일)"""
snapshots = get_asset_snapshots(days)
return {"snapshots": snapshots}
# --- Sell History API ---
class SellHistoryRequest(BaseModel):
broker: str
ticker: str
name: str
quantity: int
avg_price: float
sell_price: float
commission: float = 0
buy_amount: float
sell_amount: float
realized_profit: float
realized_rate: float
sold_at: str
@app.get("/api/portfolio/sell-history")
def list_sell_history(broker: Optional[str] = None, days: Optional[int] = None):
"""매도 내역 조회 (broker, days 필터 선택)"""
records = get_sell_history(broker=broker, days=days)
return {"records": records}
@app.post("/api/portfolio/sell-history")
def create_sell_history(req: SellHistoryRequest):
"""매도 기록 저장"""
record = add_sell_history(req.model_dump())
return record
@app.put("/api/portfolio/sell-history/{record_id}")
def modify_sell_history(record_id: int, req: SellHistoryRequest):
"""매도 기록 수정"""
record = update_sell_history(record_id, req.model_dump())
if record is None:
return JSONResponse(status_code=404, content={"error": "not found"})
return record
@app.delete("/api/portfolio/sell-history/{record_id}")
def remove_sell_history(record_id: int):
"""매도 기록 삭제"""
if not delete_sell_history(record_id):
return JSONResponse(status_code=404, content={"error": "not found"})
return {"ok": True}