import json import os from datetime import datetime from dotenv import load_dotenv from modules.services.kis import KISClient from modules.services.ollama import OllamaManager from modules.services.news import NewsCollector load_dotenv() class WatchlistManager: """ 매일 아침 8시에 뉴스와 시장 동향을 분석하여 Watchlist를 자동으로 업데이트하는 관리자 """ def __init__(self, kis_client, watchlist_file="watchlist.json"): self.kis = kis_client self.watchlist_file = watchlist_file self.ollama = OllamaManager() self.news = NewsCollector() # 섹터별 대표 종목 풀 (30개) self.sector_pool = { # 반도체/전자 (8개) "005930": "삼성전자", "000660": "SK하이닉스", "006400": "삼성SDI", "009150": "삼성전기", "042700": "한미반도체", "403870": "HPSP", "357780": "솔브레인", "058470": "리노공업", # IT/플랫폼 (5개) "035420": "NAVER", "035720": "카카오", "017670": "SK텔레콤", "030200": "KT", "259960": "크래프톤", # 2차전지/화학 (5개) "373220": "LG에너지솔루션", "051910": "LG화학", "096770": "SK이노베이션", "066970": "엘앤에프", "247540": "에코프로비엠", # 바이오/제약 (4개) "207940": "삼성바이오로직스", "068270": "셀트리온", "326030": "SK바이오팜", "196170": "알테오젠", # 금융 (3개) "105560": "KB금융", "055550": "신한지주", "086790": "하나금융지주", # 자동차/중공업 (3개) "005380": "현대차", "000270": "기아", "034020": "두산에너빌리티", # 기타 (2개) "005490": "POSCO홀딩스", "028260": "삼성물산" } def load_watchlist(self): """현재 Watchlist 로드""" if not os.path.exists(self.watchlist_file): return {} with open(self.watchlist_file, "r", encoding="utf-8") as f: return json.load(f) def save_watchlist(self, watchlist): """Watchlist 저장""" with open(self.watchlist_file, "w", encoding="utf-8") as f: json.dump(watchlist, f, ensure_ascii=False, indent=4) print(f"✅ [Watchlist] Updated: {len(watchlist)} stocks") def analyze_market_trends(self): """ 뉴스와 시장 데이터를 분석하여 주목해야 할 섹터/종목 파악 Returns: dict {sector: priority_score} """ print("📰 [Watchlist] Analyzing market news and trends...") # 1. 최신 뉴스 수집 news_items = self.news.get_market_news("주식 시장 경제 뉴스") # 2. AI에게 섹터 분석 요청 prompt = f""" [System Instruction] You are a market analyst. Analyze today's news and identify which sectors are HOT. News Data: {json.dumps(news_items[:10], ensure_ascii=False)} Task: 1. Identify top 3 sectors that will perform well today 2. Rate each sector's priority (0.0 to 1.0) Output Format (JSON only): {{ "semiconductor": 0.9, "battery": 0.7, "bio": 0.5, "it": 0.6, "finance": 0.4, "auto": 0.3 }} """ ai_response = self.ollama.request_inference(prompt) sector_scores = { "semiconductor": 0.5, "battery": 0.5, "bio": 0.5, "it": 0.5, "finance": 0.5, "auto": 0.5 } try: sector_scores = json.loads(ai_response) print(f"🧠 [AI Analysis] Sector Scores: {sector_scores}") except: print("⚠️ [AI] Failed to parse sector analysis, using defaults") return sector_scores def get_volume_leaders(self, limit=10): """거래량 상위 종목 조회""" try: hot_stocks = self.kis.get_volume_rank(limit=limit) return {item['code']: item['name'] for item in hot_stocks} except Exception as e: print(f"⚠️ [Watchlist] Volume rank failed: {e}") return {} def update_watchlist_daily(self): """ 매일 아침 Watchlist를 업데이트하는 메인 로직 - 뉴스 분석 기반 섹터 선정 - 거래량 상위 종목 추가 - 20~30개 유지 """ print("🔄 [Watchlist] Starting daily update...") # 1. 현재 Watchlist 로드 current_watchlist = self.load_watchlist() print(f"📋 [Current] {len(current_watchlist)} stocks in watchlist") # 2. 시장 트렌드 분석 (AI 기반) sector_scores = self.analyze_market_trends() # 3. 섹터별 우선순위에 따라 종목 선정 new_watchlist = {} # 섹터별 매핑 sector_mapping = { "semiconductor": ["005930", "000660", "006400", "009150", "042700", "403870", "357780", "058470"], "it": ["035420", "035720", "017670", "030200", "259960"], "battery": ["373220", "051910", "096770", "066970", "247540"], "bio": ["207940", "068270", "326030", "196170"], "finance": ["105560", "055550", "086790"], "auto": ["005380", "000270", "034020"] } # 섹터 점수 기준 정렬 sorted_sectors = sorted(sector_scores.items(), key=lambda x: x[1], reverse=True) # 상위 섹터부터 종목 추가 (최대 25개) for sector, score in sorted_sectors: if sector not in sector_mapping: continue # 섹터 점수에 따라 종목 수 결정 if score >= 0.8: num_stocks = 5 # 핫한 섹터는 5개 elif score >= 0.6: num_stocks = 3 # 보통 섹터는 3개 else: num_stocks = 2 # 약한 섹터는 2개 sector_codes = sector_mapping[sector][:num_stocks] for code in sector_codes: if code in self.sector_pool: new_watchlist[code] = self.sector_pool[code] if len(new_watchlist) >= 25: break if len(new_watchlist) >= 25: break # 4. 거래량 상위 종목 추가 (최대 5개) volume_leaders = self.get_volume_leaders(limit=10) added_from_volume = 0 for code, name in volume_leaders.items(): if code not in new_watchlist and len(new_watchlist) < 30: new_watchlist[code] = name added_from_volume += 1 print(f" ✨ Added from volume rank: {name} ({code})") if added_from_volume >= 5: break # 5. 저장 self.save_watchlist(new_watchlist) # 6. 변경 사항 요약 added = set(new_watchlist.keys()) - set(current_watchlist.keys()) removed = set(current_watchlist.keys()) - set(new_watchlist.keys()) summary = f"📊 **[Watchlist Updated]**\n" summary += f"Total: {len(new_watchlist)} stocks\n\n" if added: summary += f"➕ **Added ({len(added)}):**\n" for code in added: summary += f" • {new_watchlist[code]} ({code})\n" if removed: summary += f"\n➖ **Removed ({len(removed)}):**\n" for code in removed: summary += f" • {current_watchlist[code]} ({code})\n" print(summary) return summary if __name__ == "__main__": # 테스트 실행 kis = KISClient() manager = WatchlistManager(kis) manager.update_watchlist_daily()