주식 증권 api 연동 및 window pc AI 연동 기능 구현 시작

This commit is contained in:
2026-01-26 22:31:56 +09:00
parent 8283dab0de
commit c6850da4ac
6 changed files with 358 additions and 5 deletions

52
stock-lab/app/analysis.py Normal file
View File

@@ -0,0 +1,52 @@
import os
import requests
import json
class AIAnalyst:
"""Ollama API를 통한 주식 뉴스 분석"""
def __init__(self):
# NAS 외부의 Windows PC IP 주소
self.base_url = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
self.model = os.getenv("OLLAMA_MODEL", "llama3")
def _call_ollama(self, prompt: str) -> str:
url = f"{self.base_url}/api/generate"
payload = {
"model": self.model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.2, # 분석용이므로 창의성 낮춤
"num_ctx": 4096
}
}
try:
# 타임아웃 3분 (PC 사양 좋으므로 금방 될 것)
resp = requests.post(url, json=payload, timeout=180)
resp.raise_for_status()
data = resp.json()
return data.get("response", "").strip()
except Exception as e:
return f"Error analyzing: {str(e)}"
def analyze_market_summary(self, articles: list) -> str:
"""뉴스 헤드라인들을 모아 시장 분위기 요약"""
if not articles:
return "분석할 뉴스가 없습니다."
# 최신 10개만 추려서 전달
targets = articles[:10]
titles = "\n".join([f"- {a.get('title')}" for a in targets])
prompt = f"""
다음은 최근 한국 증시 주요 뉴스 헤드라인입니다:
{titles}
위 뉴스들을 바탕으로 현재 시장의 주요 이슈와 분위기를 3줄로 요약해주고,
전반적인 투자 심리가 '긍정/부정/중립' 중 어디에 가까운지 판단해주세요.
한국어로 답변해주세요.
"""
return self._call_ollama(prompt)

182
stock-lab/app/kis_api.py Normal file
View File

@@ -0,0 +1,182 @@
import os
import time
import requests
import json
from datetime import datetime
class KisApi:
"""한국투자증권 REST API 래퍼 (모의투자/실전투자 지원)"""
def __init__(self):
self.app_key = os.getenv("KIS_APP_KEY", "")
self.app_secret = os.getenv("KIS_APP_SECRET", "")
self.account_no = os.getenv("KIS_ACCOUNT_NO", "") # 계좌번호 앞 8자리
self.account_code = os.getenv("KIS_ACCOUNT_CODE", "01") # 계좌번호 뒤 2자리 (보통 01)
# 모의투자 여부 (환경변수가 "PROD"가 아니면 기본 모의투자)
self.is_prod = os.getenv("KIS_MODE", "DEV") == "PROD"
if self.is_prod:
self.base_url = "https://openapi.koreainvestment.com:9443"
else:
self.base_url = "https://openapivts.koreainvestment.com:29443"
self.access_token = None
self.token_expired_at = None
def _get_headers(self, tr_id=None):
"""공통 헤더 생성"""
self._ensure_token()
headers = {
"Content-Type": "application/json; charset=utf-8",
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
}
if tr_id:
headers["tr_id"] = tr_id
return headers
def _ensure_token(self):
"""토큰 유효성 검사 및 재발급"""
now = time.time()
# 토큰이 없거나 만료 1분 전이면 재발급
if not self.access_token or not self.token_expired_at or now >= (self.token_expired_at - 60):
self._issue_token()
def _issue_token(self):
"""접근 토큰 발급"""
url = f"{self.base_url}/oauth2/tokenP"
payload = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret
}
try:
res = requests.post(url, json=payload, timeout=10)
res.raise_for_status()
data = res.json()
self.access_token = data["access_token"]
# 만료 시간 (보통 24시간인데 여유 있게 계산)
expires_in = int(data.get("expires_in", 86400))
self.token_expired_at = time.time() + expires_in
print(f"[KisApi] Token issued. Expires in {expires_in} sec.")
except Exception as e:
print(f"[KisApi] Token issue failed: {e}")
raise
def get_balance(self):
"""주식 잔고 조회"""
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/inquire-balance"
# 실전: TTTC8434R, 모의: VTTC8434R
tr_id = "TTTC8434R" if self.is_prod else "VTTC8434R"
headers = self._get_headers(tr_id=tr_id)
params = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_code, #보통 01
"AFHR_FLPR_YN": "N",
"OFL_YN": "",
"INQR_DVSN": "02",
"UNPR_DVSN": "01",
"FUND_STTL_ICLD_YN": "N",
"FNCG_AMT_AUTO_RDPT_YN": "N",
"PRCS_DVSN": "00", # 전일매매포함
"CTX_AREA_FK100": "",
"CTX_AREA_NK100": ""
}
try:
res = requests.get(url, headers=headers, params=params, timeout=10)
res.raise_for_status()
data = res.json()
# API 응답 코드가 0이 아니면 에러
if data["rt_cd"] != "0":
return {"error": data["msg1"]}
output1 = data.get("output1", []) # 종목별 잔고
output2 = data.get("output2", []) # 계좌 총 평가
holdings = []
for item in output1:
# 보유수량이 0인 것은 제외
qty = int(item.get("hldg_qty", 0))
if qty > 0:
holdings.append({
"code": item.get("pdno"),
"name": item.get("prdt_name"),
"qty": qty,
"buy_price": float(item.get("pchs_avg_pric", 0)), # 매입단가
"current_price": float(item.get("prpr", 0)), # 현재가
"profit_rate": float(item.get("evlu_pfls_rt", 0)),# 수익률
})
total_eval = 0
deposit = 0
if output2:
total_eval = float(output2[0].get("tot_evlu_amt", 0)) # 총 평가금액
deposit = float(output2[0].get("dnca_tot_amt", 0)) # 예수금
return {
"holdings": holdings,
"summary": {
"total_eval": total_eval,
"deposit": deposit
}
}
except Exception as e:
return {"error": str(e)}
def order(self, stock_code: str, qty: int, price: int, buy_sell: str):
"""
주문 실행
buy_sell: 'buy' or 'sell'
price: 0이면 시장가(추후 구현), 양수면 지정가
"""
url = f"{self.base_url}/uapi/domestic-stock/v1/trading/order-cash"
# 매수/매도 tr_id 구분
# 실전: 매수 TTTC0802U, 매도 TTTC0801U
# 모의: 매수 VTTC0802U, 매도 VTTC0801U
if self.is_prod:
tr_id = "TTTC0802U" if buy_sell == "buy" else "TTTC0801U"
else:
tr_id = "VTTC0802U" if buy_sell == "buy" else "VTTC0801U"
headers = self._get_headers(tr_id=tr_id)
# 가격이 0이면 시장가 "01", 아니면 지정가 "00"
ord_dvsn = "01" if price == 0 else "00"
payload = {
"CANO": self.account_no,
"ACNT_PRDT_CD": self.account_code,
"PDNO": stock_code, # 종목코드 (6자리)
"ORD_DVSN": ord_dvsn, # 주문구분
"ORD_QTY": str(qty), # 주문수량 (문자열)
"ORD_UNPR": str(price), # 주문단가 (문자열)
}
try:
res = requests.post(url, headers=headers, json=payload, timeout=10)
res.raise_for_status()
data = res.json()
if data["rt_cd"] != "0":
return {"success": False, "message": data["msg1"]}
return {
"success": True,
"message": data["msg1"],
"order_no": data["output"]["ODNO"] # 주문번호
}
except Exception as e:
return {"success": False, "message": str(e)}

View File

@@ -2,11 +2,17 @@ import os
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from .db import init_db, save_articles, get_latest_articles
from .db import init_db, save_articles, get_latest_articles
from .scraper import fetch_market_news, fetch_major_indices, fetch_overseas_news
from .kis_api import KisApi
from .analysis import AIAnalyst
from pydantic import BaseModel
app = FastAPI()
scheduler = BackgroundScheduler(timezone=os.getenv("TZ", "Asia/Seoul"))
kis = KisApi()
analyst = AIAnalyst()
@app.on_event("startup")
def on_startup():
@@ -53,8 +59,37 @@ def get_indices():
def trigger_scrap():
"""수동 스크랩 트리거"""
run_scraping_job()
run_scraping_job()
return {"ok": True}
# --- Trading API ---
@app.get("/api/trade/balance")
def get_balance():
"""계좌 잔고 조회 (보유주식 + 예수금)"""
return kis.get_balance()
class OrderRequest(BaseModel):
code: str
qty: int
price: int = 0 # 0이면 시장가
type: str # 'buy' or 'sell'
@app.post("/api/trade/order")
def order_stock(req: OrderRequest):
"""주식 매수/매도 주문"""
if req.type not in ["buy", "sell"]:
return {"success": False, "message": "Invalid type (buy/sell)"}
return kis.order(req.code, req.qty, req.price, req.type)
@app.get("/api/stock/analyze")
def analyze_market():
"""최신 뉴스를 기반으로 AI 시장 요약"""
articles = get_latest_articles(20)
result = analyst.analyze_market_summary(articles)
return {"analysis": result, "model": analyst.model}
@app.get("/api/version")
def version():
return {"version": os.getenv("APP_VERSION", "dev")}