Files
ai-trade/watchlist_manager.py

243 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from modules.services.kis import KISClient
from modules.services.ollama import OllamaManager
from modules.services.news import NewsCollector
load_dotenv()
class WatchlistManager:
"""
매일 아침 8시에 뉴스와 시장 동향을 분석하여
Watchlist를 자동으로 업데이트하는 관리자
"""
def __init__(self, kis_client, watchlist_file="watchlist.json"):
self.kis = kis_client
self.watchlist_file = watchlist_file
self.ollama = OllamaManager()
self.news = NewsCollector()
# 섹터별 대표 종목 풀 (30개)
self.sector_pool = {
# 반도체/전자 (8개)
"005930": "삼성전자",
"000660": "SK하이닉스",
"006400": "삼성SDI",
"009150": "삼성전기",
"042700": "한미반도체",
"403870": "HPSP",
"357780": "솔브레인",
"058470": "리노공업",
# IT/플랫폼 (5개)
"035420": "NAVER",
"035720": "카카오",
"017670": "SK텔레콤",
"030200": "KT",
"259960": "크래프톤",
# 2차전지/화학 (5개)
"373220": "LG에너지솔루션",
"051910": "LG화학",
"096770": "SK이노베이션",
"066970": "엘앤에프",
"247540": "에코프로비엠",
# 바이오/제약 (4개)
"207940": "삼성바이오로직스",
"068270": "셀트리온",
"326030": "SK바이오팜",
"196170": "알테오젠",
# 금융 (3개)
"105560": "KB금융",
"055550": "신한지주",
"086790": "하나금융지주",
# 자동차/중공업 (3개)
"005380": "현대차",
"000270": "기아",
"034020": "두산에너빌리티",
# 기타 (2개)
"005490": "POSCO홀딩스",
"028260": "삼성물산"
}
def load_watchlist(self):
"""현재 Watchlist 로드"""
if not os.path.exists(self.watchlist_file):
return {}
with open(self.watchlist_file, "r", encoding="utf-8") as f:
return json.load(f)
def save_watchlist(self, watchlist):
"""Watchlist 저장"""
with open(self.watchlist_file, "w", encoding="utf-8") as f:
json.dump(watchlist, f, ensure_ascii=False, indent=4)
print(f"✅ [Watchlist] Updated: {len(watchlist)} stocks")
def analyze_market_trends(self):
"""
뉴스와 시장 데이터를 분석하여 주목해야 할 섹터/종목 파악
Returns: dict {sector: priority_score}
"""
print("📰 [Watchlist] Analyzing market news and trends...")
# 1. 최신 뉴스 수집
news_items = self.news.get_market_news("주식 시장 경제 뉴스")
# 2. AI에게 섹터 분석 요청
prompt = f"""
[System Instruction]
You are a market analyst. Analyze today's news and identify which sectors are HOT.
News Data:
{json.dumps(news_items[:10], ensure_ascii=False)}
Task:
1. Identify top 3 sectors that will perform well today
2. Rate each sector's priority (0.0 to 1.0)
Output Format (JSON only):
{{
"semiconductor": 0.9,
"battery": 0.7,
"bio": 0.5,
"it": 0.6,
"finance": 0.4,
"auto": 0.3
}}
"""
ai_response = self.ollama.request_inference(prompt)
sector_scores = {
"semiconductor": 0.5,
"battery": 0.5,
"bio": 0.5,
"it": 0.5,
"finance": 0.5,
"auto": 0.5
}
try:
sector_scores = json.loads(ai_response)
print(f"🧠 [AI Analysis] Sector Scores: {sector_scores}")
except:
print("⚠️ [AI] Failed to parse sector analysis, using defaults")
return sector_scores
def get_volume_leaders(self, limit=10):
"""거래량 상위 종목 조회"""
try:
hot_stocks = self.kis.get_volume_rank(limit=limit)
return {item['code']: item['name'] for item in hot_stocks}
except Exception as e:
print(f"⚠️ [Watchlist] Volume rank failed: {e}")
return {}
def update_watchlist_daily(self):
"""
매일 아침 Watchlist를 업데이트하는 메인 로직
- 뉴스 분석 기반 섹터 선정
- 거래량 상위 종목 추가
- 20~30개 유지
"""
print("🔄 [Watchlist] Starting daily update...")
# 1. 현재 Watchlist 로드
current_watchlist = self.load_watchlist()
print(f"📋 [Current] {len(current_watchlist)} stocks in watchlist")
# 2. 시장 트렌드 분석 (AI 기반)
sector_scores = self.analyze_market_trends()
# 3. 섹터별 우선순위에 따라 종목 선정
new_watchlist = {}
# 섹터별 매핑
sector_mapping = {
"semiconductor": ["005930", "000660", "006400", "009150", "042700", "403870", "357780", "058470"],
"it": ["035420", "035720", "017670", "030200", "259960"],
"battery": ["373220", "051910", "096770", "066970", "247540"],
"bio": ["207940", "068270", "326030", "196170"],
"finance": ["105560", "055550", "086790"],
"auto": ["005380", "000270", "034020"]
}
# 섹터 점수 기준 정렬
sorted_sectors = sorted(sector_scores.items(), key=lambda x: x[1], reverse=True)
# 상위 섹터부터 종목 추가 (최대 25개)
for sector, score in sorted_sectors:
if sector not in sector_mapping:
continue
# 섹터 점수에 따라 종목 수 결정
if score >= 0.8:
num_stocks = 5 # 핫한 섹터는 5개
elif score >= 0.6:
num_stocks = 3 # 보통 섹터는 3개
else:
num_stocks = 2 # 약한 섹터는 2개
sector_codes = sector_mapping[sector][:num_stocks]
for code in sector_codes:
if code in self.sector_pool:
new_watchlist[code] = self.sector_pool[code]
if len(new_watchlist) >= 25:
break
if len(new_watchlist) >= 25:
break
# 4. 거래량 상위 종목 추가 (최대 5개)
volume_leaders = self.get_volume_leaders(limit=10)
added_from_volume = 0
for code, name in volume_leaders.items():
if code not in new_watchlist and len(new_watchlist) < 30:
new_watchlist[code] = name
added_from_volume += 1
print(f" ✨ Added from volume rank: {name} ({code})")
if added_from_volume >= 5:
break
# 5. 저장
self.save_watchlist(new_watchlist)
# 6. 변경 사항 요약
added = set(new_watchlist.keys()) - set(current_watchlist.keys())
removed = set(current_watchlist.keys()) - set(new_watchlist.keys())
summary = f"📊 **[Watchlist Updated]**\n"
summary += f"Total: {len(new_watchlist)} stocks\n\n"
if added:
summary += f" **Added ({len(added)}):**\n"
for code in added:
summary += f"{new_watchlist[code]} ({code})\n"
if removed:
summary += f"\n **Removed ({len(removed)}):**\n"
for code in removed:
summary += f"{current_watchlist[code]} ({code})\n"
print(summary)
return summary
if __name__ == "__main__":
# 테스트 실행
kis = KISClient()
manager = WatchlistManager(kis)
manager.update_watchlist_daily()