Files
ai-trade/signal_v1/modules/services/news.py
gahusb 7ea1a21487 refactor: web-ai V1 assets → signal_v1/ (graduation prep)
Atomic mv of root V1 assets (main_server.py + modules/ + data/ +
tests/ + entry scripts + docs + logs) into signal_v1/ subdirectory.
load_dotenv() updated to load web-ai/.env explicitly via Path.

Adds web-ai/CLAUDE.md (workspace guide) and web-ai/start.bat
(signal_v1 entry wrapper). Prepares for signal_v2/ Phase 2.

Tests: signal_v1/tests/unit baseline preserved (no regression).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 03:00:11 +09:00

123 lines
4.7 KiB
Python

import time
import requests
import xml.etree.ElementTree as ET
from typing import Optional
def _parse_items(root, max_items):
"""RSS item → [{title, url, pub_date, source}]"""
out = []
for item in root.findall(".//item")[:max_items]:
t = item.find("title")
l = item.find("link")
p = item.find("pubDate")
title = (t.text or "").strip() if t is not None else ""
url = (l.text or "").strip() if l is not None else ""
pub = (p.text or "").strip() if p is not None else ""
if not title:
continue
out.append({"title": title, "url": url, "pub_date": pub, "source": "Google News"})
return out
class NewsCollector:
"""동기 뉴스 수집 (Google News RSS)"""
@staticmethod
def get_market_news(query="주식 시장"):
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
try:
resp = requests.get(url, timeout=5)
root = ET.fromstring(resp.content)
return _parse_items(root, 5)
except Exception as e:
print(f"[News] Collection failed: {e}")
return []
class AsyncNewsCollector:
"""비동기 뉴스 수집 + 5분 캐싱 + (옵션) 스냅샷 저장"""
def __init__(self, snapshot_store=None):
self._cache = None
self._cache_time = 0
self._cache_ttl = 300 # 5분
self._stock_cache = {} # {stock_name: (items, timestamp)}
self._snap = snapshot_store # NewsSnapshotStore | None
def _save_snapshot(self, items, query: str, ticker: Optional[str] = None):
if not self._snap or not items:
return
try:
self._snap.save_many(items, query=query, ticker=ticker)
except Exception as e:
print(f"[News] snapshot 저장 실패: {e}")
def get_market_news(self, query="주식 시장"):
"""동기 인터페이스 (하위 호환)"""
now = time.time()
if self._cache and (now - self._cache_time) < self._cache_ttl:
return self._cache
result = NewsCollector.get_market_news(query)
self._cache = result
self._cache_time = now
self._save_snapshot(result, query=query)
return result
async def get_market_news_async(self, query="주식 시장"):
"""비동기 뉴스 수집 (aiohttp + 캐싱)"""
now = time.time()
if self._cache and (now - self._cache_time) < self._cache_ttl:
return self._cache
try:
import aiohttp
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
items = _parse_items(root, 5)
self._cache = items
self._cache_time = now
self._save_snapshot(items, query=query)
return items
except ImportError:
return self.get_market_news(query)
except Exception as e:
print(f"[News Async] Collection failed: {e}")
if self._cache:
return self._cache
return self.get_market_news(query)
async def get_stock_news_async(self, stock_name, max_items=3, ticker: Optional[str] = None):
"""종목별 뉴스 수집 (5분 캐싱)
stock_name: 종목 이름 (e.g. '삼성전자', 'SK하이닉스')
ticker: 스냅샷 저장 시 종목코드 (옵션)
"""
now = time.time()
cached = self._stock_cache.get(stock_name)
if cached and (now - cached[1]) < self._cache_ttl:
return cached[0]
try:
import aiohttp
import urllib.parse
query = urllib.parse.quote(f"{stock_name} 주가")
url = f"https://news.google.com/rss/search?q={query}&hl=ko&gl=KR&ceid=KR:ko"
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
content = await resp.read()
root = ET.fromstring(content)
items = _parse_items(root, max_items)
self._stock_cache[stock_name] = (items, now)
self._save_snapshot(items, query=f"{stock_name} 주가", ticker=ticker)
return items
except Exception as e:
print(f"[News] 종목 뉴스 수집 실패 ({stock_name}): {e}")
return []
def clear_stock_cache(self):
"""종목 뉴스 캐시 전체 초기화"""
self._stock_cache.clear()