Compare commits
5 Commits
94cddccaa7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 36e8d11060 | |||
| db6fed72b3 | |||
| 7cce5c422f | |||
| 94beecbfaf | |||
| 98b17f3a3a |
@@ -268,6 +268,7 @@ docker compose up -d
|
|||||||
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
|
| POST/GET | `/api/music/compile` (+ `/compiles/{id}/export`) | 컴파일 |
|
||||||
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
|
| POST/GET/DELETE | `/api/music/video-project` (+ `/{id}/render`, `/export`) | 영상 프로젝트 |
|
||||||
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
|
| ALL | `/api/music/pipeline` (생성/start/feedback/cancel/publish/retry/telegram-msg/lookup) | YouTube 자동화 파이프라인. `POST /{id}/retry`=실패 step 재개(publish+업로드완료 시 409) |
|
||||||
|
| DELETE | `/api/music/pipeline/{id}` | 파이프라인 행 하드 삭제(자식 jobs/feedback 포함, 전체 목록에서 제거). 없으면 404 |
|
||||||
| GET/PUT | `/api/music/setup` | 파이프라인 설정 |
|
| GET/PUT | `/api/music/setup` | 파이프라인 설정 |
|
||||||
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
|
| GET | `/api/music/youtube/auth-url`, `/callback`, `/status`; POST `/disconnect` | YouTube OAuth |
|
||||||
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
|
| GET/POST/PUT/DELETE | `/api/music/revenue` (+ `/dashboard`) | 수익 기록 |
|
||||||
@@ -347,7 +348,7 @@ docker compose up -d
|
|||||||
|
|
||||||
### agent-office (agent-office/)
|
### agent-office (agent-office/)
|
||||||
AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇.
|
AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출, 실시간 WebSocket + 텔레그램 봇.
|
||||||
- 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base)
|
- 핵심 파일: `main.py`, `db.py`, `config.py`, `websocket_manager.py`, `service_proxy.py`, `telegram_bot.py`, `scheduler.py`, `node_monitor.py`(분산 워커 관측 집계+경보), `agents/`(stock/music/realestate/youtube/youtube_publisher/lotto/base)
|
||||||
- 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md`
|
- 에이전트 7종 레지스트리. 명령 API body 필드명 → `reference_agent_office_command_api.md`
|
||||||
- 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`**
|
- 📌 상세(DB 9테이블·FSM·전체 cron 목록·AGENT_CONTAINER_MAP·텔레그램 캐싱·env): **`service_agent_office.md`**
|
||||||
|
|
||||||
@@ -362,6 +363,7 @@ AI 에이전트 가상 오피스 — 기존 서비스 API를 프록시로 호출
|
|||||||
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) |
|
| POST | `/api/agent-office/telegram/webhook` | 텔레그램 Webhook (realestate_bookmark_* 콜백 포함) |
|
||||||
| POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 |
|
| POST | `/api/agent-office/realestate/notify` | realestate-lab 전용 push 수신 → 텔레그램 |
|
||||||
| GET | `/api/agent-office/states` | 전체 에이전트 상태 |
|
| GET | `/api/agent-office/states` | 전체 에이전트 상태 |
|
||||||
|
| GET | `/api/agent-office/nodes` | 분산 워커(NAS↔Windows) 관측 — heartbeat 생사+큐깊이+dead-letter 집계 (web-ui `/infra` Three.js 시각화 소비). 상세 → `infra_distributed_workers.md` |
|
||||||
| GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` |
|
| GET | `/api/agent-office/activity` | 전 에이전트 통합 활동 피드 (tasks+logs UNION). 필터 `agent_id`/`type`(task\|log)/`status`/`days` + `limit`/`offset` |
|
||||||
| GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) |
|
| GET | `/api/agent-office/conversation/stats` | 텔레그램 대화 토큰·캐시 통계 (`days`) |
|
||||||
| POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 |
|
| POST/GET | `/api/agent-office/youtube/research` (+ `/status`) | YouTube 트렌드 수집 트리거/상태 |
|
||||||
|
|||||||
@@ -4,7 +4,12 @@ import logging
|
|||||||
from .base import BaseAgent
|
from .base import BaseAgent
|
||||||
from . import classify_intent
|
from . import classify_intent
|
||||||
from .. import service_proxy
|
from .. import service_proxy
|
||||||
from ..db import add_log
|
from ..db import (
|
||||||
|
add_log,
|
||||||
|
get_notified_failed_pipelines,
|
||||||
|
add_notified_failed_pipeline,
|
||||||
|
prune_notified_failed_pipelines,
|
||||||
|
)
|
||||||
from ..telegram.messaging import send_raw
|
from ..telegram.messaging import send_raw
|
||||||
|
|
||||||
logger = logging.getLogger("agent-office.youtube_publisher")
|
logger = logging.getLogger("agent-office.youtube_publisher")
|
||||||
@@ -25,8 +30,9 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
# 진행 중(*_pending) 승인 요청 dedup — 인메모리 유지(의도적).
|
||||||
|
# 재시작 시 살아있는 파이프라인 승인 재알림은 유용한 리마인더라 스팸 아님.
|
||||||
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
self._notified_state_per_pipeline: dict[int, tuple] = {}
|
||||||
self._notified_failed: set[int] = set()
|
|
||||||
|
|
||||||
async def poll_state_changes(self) -> None:
|
async def poll_state_changes(self) -> None:
|
||||||
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
"""주기적으로 호출되어 *_pending 신규 진입 시 텔레그램 발송."""
|
||||||
@@ -52,18 +58,21 @@ class YoutubePublisherAgent(BaseAgent):
|
|||||||
try:
|
try:
|
||||||
failed = await service_proxy.list_failed_pipelines()
|
failed = await service_proxy.list_failed_pipelines()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# 일시적 폴링 실패를 "failed 없음"으로 오해하면 원장을 비워 재알림 스팸이 남.
|
||||||
|
# → 원장을 건드리지 않고 조용히 종료(다음 폴링에서 재시도).
|
||||||
logger.warning("failed 폴링 실패: %s", e)
|
logger.warning("failed 폴링 실패: %s", e)
|
||||||
failed = []
|
return
|
||||||
|
notified = get_notified_failed_pipelines()
|
||||||
for p in failed:
|
for p in failed:
|
||||||
pid = p.get("id")
|
pid = p.get("id")
|
||||||
if pid is None:
|
if pid is None:
|
||||||
continue
|
continue
|
||||||
if pid not in self._notified_failed:
|
if pid not in notified:
|
||||||
await self._notify_failed(p)
|
await self._notify_failed(p)
|
||||||
self._notified_failed.add(pid)
|
add_notified_failed_pipeline(pid)
|
||||||
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 해제
|
# 재개되어 failed에서 벗어난 파이프라인은 재알림 가능하도록 원장에서 제거
|
||||||
failed_ids = {p.get("id") for p in failed}
|
failed_ids = {p.get("id") for p in failed if p.get("id") is not None}
|
||||||
self._notified_failed &= failed_ids
|
prune_notified_failed_pipelines(failed_ids)
|
||||||
|
|
||||||
async def _notify_failed(self, p: dict) -> None:
|
async def _notify_failed(self, p: dict) -> None:
|
||||||
reason = p.get("failed_reason") or "?"
|
reason = p.get("failed_reason") or "?"
|
||||||
|
|||||||
@@ -158,6 +158,12 @@ def init_db() -> None:
|
|||||||
CREATE INDEX IF NOT EXISTS idx_tarot_favorite
|
CREATE INDEX IF NOT EXISTS idx_tarot_favorite
|
||||||
ON tarot_readings(favorite, created_at DESC)
|
ON tarot_readings(favorite, created_at DESC)
|
||||||
""")
|
""")
|
||||||
|
conn.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS notified_failed_pipelines (
|
||||||
|
pipeline_id INTEGER PRIMARY KEY,
|
||||||
|
notified_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
|
||||||
|
)
|
||||||
|
""")
|
||||||
# Seed default agent configs
|
# Seed default agent configs
|
||||||
for agent_id, name in [
|
for agent_id, name in [
|
||||||
("stock", "주식 트레이더"),
|
("stock", "주식 트레이더"),
|
||||||
@@ -826,6 +832,47 @@ def get_all_baselines() -> List[Dict[str, Any]]:
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
# --- notified_failed_pipelines (파이프라인 실패 알림 dedup 원장, 재시작 지속) ---
|
||||||
|
|
||||||
|
def get_notified_failed_pipelines() -> set:
|
||||||
|
"""이미 실패 알림을 발송한 pipeline_id 집합."""
|
||||||
|
with _conn() as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT pipeline_id FROM notified_failed_pipelines"
|
||||||
|
).fetchall()
|
||||||
|
return {r["pipeline_id"] for r in rows}
|
||||||
|
|
||||||
|
|
||||||
|
def add_notified_failed_pipeline(pipeline_id: int) -> None:
|
||||||
|
with _conn() as conn:
|
||||||
|
conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO notified_failed_pipelines(pipeline_id) VALUES(?)",
|
||||||
|
(pipeline_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def prune_notified_failed_pipelines(active_failed_ids) -> None:
|
||||||
|
"""현재 failed 목록에 없는 pipeline_id를 원장에서 제거.
|
||||||
|
|
||||||
|
재개되어 failed에서 벗어난 파이프라인이 다시 실패하면 재알림 가능하도록 함.
|
||||||
|
(기존 인메모리 `_notified_failed &= failed_ids`의 영속 버전)
|
||||||
|
"""
|
||||||
|
keep = set(active_failed_ids)
|
||||||
|
with _conn() as conn:
|
||||||
|
existing = {
|
||||||
|
r["pipeline_id"]
|
||||||
|
for r in conn.execute(
|
||||||
|
"SELECT pipeline_id FROM notified_failed_pipelines"
|
||||||
|
).fetchall()
|
||||||
|
}
|
||||||
|
stale = existing - keep
|
||||||
|
for pid in stale:
|
||||||
|
conn.execute(
|
||||||
|
"DELETE FROM notified_failed_pipelines WHERE pipeline_id=?",
|
||||||
|
(pid,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
|
def get_tasks_by_agent_date_kind(agent_id: str, date_iso: str, task_type: str) -> List[Dict[str, Any]]:
|
||||||
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
|
"""같은 (agent, date, task_type)으로 이미 생성된 task 조회. 멱등 guard."""
|
||||||
with _conn() as conn:
|
with _conn() as conn:
|
||||||
|
|||||||
@@ -14,13 +14,20 @@ from unittest.mock import AsyncMock, patch
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def _init_db():
|
def _init_db(monkeypatch):
|
||||||
import gc
|
import gc
|
||||||
gc.collect()
|
gc.collect()
|
||||||
if os.path.exists(_TMP):
|
# config.DB_PATH는 첫 import 시 1회 고정되므로, 다른 테스트 파일과 조합 실행 시
|
||||||
os.remove(_TMP)
|
# db가 이 파일의 _TMP가 아닌 다른 경로를 쓸 수 있다. db.DB_PATH를 이 파일 전용으로
|
||||||
from app.db import init_db
|
# 강제해 영속 테이블(notified_failed_pipelines 등)의 테스트 간 누수를 결정적으로 차단.
|
||||||
init_db()
|
import app.db as _db
|
||||||
|
monkeypatch.setattr(_db, "DB_PATH", _TMP)
|
||||||
|
# WAL 사이드카(-wal/-shm)까지 지워야 영속 상태가 남지 않음
|
||||||
|
for suffix in ("", "-wal", "-shm"):
|
||||||
|
p = _TMP + suffix
|
||||||
|
if os.path.exists(p):
|
||||||
|
os.remove(p)
|
||||||
|
_db.init_db()
|
||||||
yield
|
yield
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
@@ -211,3 +218,70 @@ async def test_failed_poll_exception_is_silent():
|
|||||||
|
|
||||||
# active 알림은 정상 발송
|
# active 알림은 정상 발송
|
||||||
assert sent.await_count == 1
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_failed_notification_persists_across_restart():
|
||||||
|
"""컨테이너 재시작(새 에이전트 인스턴스)해도 이미 알린 failed는 재알림하지 않음."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 3,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: timeout",
|
||||||
|
"track_title": "beat music v2",
|
||||||
|
}
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=AsyncMock(return_value=[failed_pipeline]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
agent1 = YoutubePublisherAgent()
|
||||||
|
await agent1.poll_state_changes()
|
||||||
|
# 컨테이너 재시작 시뮬레이션: 완전히 새로운 인스턴스(인메모리 상태 소실)
|
||||||
|
agent2 = YoutubePublisherAgent()
|
||||||
|
await agent2.poll_state_changes()
|
||||||
|
|
||||||
|
# 재시작해도 DB 원장으로 중복 방지 → 1회만 알림
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_transient_failed_poll_keeps_ledger():
|
||||||
|
"""failed 폴링이 일시적으로 예외를 던져도 원장을 비우지 않아 다음 폴링에서 재알림하지 않음."""
|
||||||
|
from app.agents.youtube_publisher import YoutubePublisherAgent
|
||||||
|
|
||||||
|
failed_pipeline = {
|
||||||
|
"id": 3,
|
||||||
|
"state": "failed",
|
||||||
|
"failed_reason": "video: timeout",
|
||||||
|
"track_title": "beat music v2",
|
||||||
|
}
|
||||||
|
list_failed = AsyncMock(
|
||||||
|
side_effect=[[failed_pipeline], Exception("boom"), [failed_pipeline]]
|
||||||
|
)
|
||||||
|
sent = AsyncMock(return_value={"ok": True, "message_id": 1})
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_active_pipelines",
|
||||||
|
new=AsyncMock(return_value=[]),
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.service_proxy.list_failed_pipelines",
|
||||||
|
new=list_failed,
|
||||||
|
), patch(
|
||||||
|
"app.agents.youtube_publisher.send_raw",
|
||||||
|
new=sent,
|
||||||
|
):
|
||||||
|
agent = YoutubePublisherAgent()
|
||||||
|
await agent.poll_state_changes() # #3 최초 알림
|
||||||
|
await agent.poll_state_changes() # 예외 → 원장 유지되어야 (섣부른 정리 금지)
|
||||||
|
await agent.poll_state_changes() # #3 여전히 failed → 재알림 없어야
|
||||||
|
|
||||||
|
assert sent.await_count == 1
|
||||||
|
|||||||
@@ -464,7 +464,7 @@ services:
|
|||||||
- "6379:6379"
|
- "6379:6379"
|
||||||
volumes:
|
volumes:
|
||||||
- ${RUNTIME_PATH}/redis-data:/data
|
- ${RUNTIME_PATH}/redis-data:/data
|
||||||
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
|
command: redis-server --appendonly yes --save "" --stop-writes-on-bgsave-error no --maxmemory 256mb --maxmemory-policy allkeys-lru
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "redis-cli", "ping"]
|
test: ["CMD", "redis-cli", "ping"]
|
||||||
interval: 60s
|
interval: 60s
|
||||||
|
|||||||
@@ -1100,6 +1100,19 @@ def get_pipeline(pid: int) -> Optional[Dict[str, Any]]:
|
|||||||
return _parse_pipeline_row(row)
|
return _parse_pipeline_row(row)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_pipeline(pid: int) -> bool:
|
||||||
|
"""파이프라인과 자식행(pipeline_feedback, pipeline_jobs)을 하드 삭제.
|
||||||
|
|
||||||
|
SQLite FK를 강제하지 않으므로 자식행을 명시적으로 먼저 삭제한다.
|
||||||
|
파이프라인이 존재했으면 True, 없었으면 False.
|
||||||
|
"""
|
||||||
|
with _conn() as conn:
|
||||||
|
conn.execute("DELETE FROM pipeline_feedback WHERE pipeline_id = ?", (pid,))
|
||||||
|
conn.execute("DELETE FROM pipeline_jobs WHERE pipeline_id = ?", (pid,))
|
||||||
|
cur = conn.execute("DELETE FROM video_pipelines WHERE id = ?", (pid,))
|
||||||
|
return cur.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
def update_pipeline_state(pid: int, state: str, **fields) -> None:
|
def update_pipeline_state(pid: int, state: str, **fields) -> None:
|
||||||
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.
|
"""파이프라인 state를 갱신하고 옵션 컬럼을 함께 업데이트한다.
|
||||||
|
|
||||||
|
|||||||
@@ -1133,6 +1133,14 @@ def cancel_pipeline(pid: int):
|
|||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
|
@app.delete("/api/music/pipeline/{pid}")
|
||||||
|
def delete_pipeline_endpoint(pid: int):
|
||||||
|
"""파이프라인 행을 하드 삭제(전체 목록에서 완전 제거). 없으면 404."""
|
||||||
|
if not _db_module.delete_pipeline(pid):
|
||||||
|
raise HTTPException(404)
|
||||||
|
return {"ok": True, "deleted": pid}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
@app.post("/api/music/pipeline/{pid}/retry", status_code=202)
|
||||||
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
async def retry_pipeline(pid: int, bg: BackgroundTasks):
|
||||||
from .pipeline.state_machine import STEPS
|
from .pipeline.state_machine import STEPS
|
||||||
|
|||||||
@@ -105,6 +105,29 @@ def test_cancel_pipeline(client):
|
|||||||
assert db.get_pipeline(pid)["state"] == "cancelled"
|
assert db.get_pipeline(pid)["state"] == "cancelled"
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_pipeline_removes_from_db(client):
|
||||||
|
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
|
||||||
|
r = client.request("DELETE", f"/api/music/pipeline/{pid}")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["ok"] is True
|
||||||
|
assert db.get_pipeline(pid) is None
|
||||||
|
all_ids = [p["id"] for p in client.get("/api/music/pipeline?status=all").json()["pipelines"]]
|
||||||
|
assert pid not in all_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_pipeline_not_found_returns_404(client):
|
||||||
|
r = client.request("DELETE", "/api/music/pipeline/99999")
|
||||||
|
assert r.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_pipeline_removes_child_jobs(client):
|
||||||
|
pid = client.post("/api/music/pipeline", json={"track_id": 1}).json()["id"]
|
||||||
|
db.create_pipeline_job(pid, "cover")
|
||||||
|
assert len(db.list_pipeline_jobs(pid)) == 1
|
||||||
|
client.request("DELETE", f"/api/music/pipeline/{pid}")
|
||||||
|
assert db.list_pipeline_jobs(pid) == []
|
||||||
|
|
||||||
|
|
||||||
def test_setup_get_returns_defaults(client):
|
def test_setup_get_returns_defaults(client):
|
||||||
r = client.get("/api/music/setup")
|
r = client.get("/api/music/setup")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime as dt
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -59,13 +60,19 @@ async def score_sentiment(
|
|||||||
*,
|
*,
|
||||||
name: str | None = None,
|
name: str | None = None,
|
||||||
model: str = DEFAULT_MODEL,
|
model: str = DEFAULT_MODEL,
|
||||||
|
asof: dt.date | None = None,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}."""
|
"""Returns {ticker, score_raw, reason, news_count, tokens_input, tokens_output, model}.
|
||||||
|
|
||||||
|
asof(현재 KST 일자)를 주면 prompt 맨 앞에 오늘 날짜를 명시해 LLM이 현재 시점 기준으로 판단한다.
|
||||||
|
"""
|
||||||
news_block = _format_news_block(news)
|
news_block = _format_news_block(news)
|
||||||
prompt = PROMPT_TEMPLATE.format(
|
prompt = PROMPT_TEMPLATE.format(
|
||||||
name=name or ticker, ticker=ticker,
|
name=name or ticker, ticker=ticker,
|
||||||
n=len(news), news_block=news_block,
|
n=len(news), news_block=news_block,
|
||||||
)
|
)
|
||||||
|
if asof is not None:
|
||||||
|
prompt = f"오늘 날짜: {asof.isoformat()} (이 시점 기준으로 뉴스를 평가하세요)\n\n" + prompt
|
||||||
resp = await llm.messages.create(
|
resp = await llm.messages.create(
|
||||||
model=model,
|
model=model,
|
||||||
max_tokens=200,
|
max_tokens=200,
|
||||||
|
|||||||
@@ -39,11 +39,11 @@ def _make_llm():
|
|||||||
|
|
||||||
async def _process_one(
|
async def _process_one(
|
||||||
ticker: str, name: str, articles: List[Dict[str, Any]],
|
ticker: str, name: str, articles: List[Dict[str, Any]],
|
||||||
sem: asyncio.Semaphore, llm, model: str,
|
sem: asyncio.Semaphore, llm, model: str, asof: dt.date,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
async with sem:
|
async with sem:
|
||||||
return await _analyzer.score_sentiment(
|
return await _analyzer.score_sentiment(
|
||||||
llm, ticker, articles, name=name, model=model,
|
llm, ticker, articles, name=name, model=model, asof=asof,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -110,7 +110,7 @@ async def refresh_daily(
|
|||||||
arts = articles_by_ticker.get(t, [])
|
arts = articles_by_ticker.get(t, [])
|
||||||
if not arts:
|
if not arts:
|
||||||
continue # 매핑 0 — score 미생성
|
continue # 매핑 0 — score 미생성
|
||||||
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model))
|
tasks.append(_process_one(t, name_map.get(t, t), arts, sem, llm, model, asof))
|
||||||
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
|
raw_results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
successes: List[Dict[str, Any]] = []
|
successes: List[Dict[str, Any]] = []
|
||||||
|
|||||||
@@ -125,6 +125,16 @@ from . import telegram as _tg
|
|||||||
from .engine import Screener, ScreenContext
|
from .engine import Screener, ScreenContext
|
||||||
|
|
||||||
|
|
||||||
|
def _today_kst() -> dt.date:
|
||||||
|
"""KST 오늘 날짜.
|
||||||
|
|
||||||
|
stock 컨테이너는 python:3.12-alpine + tzdata 미설치라 TZ=Asia/Seoul이 무효 →
|
||||||
|
date.today()가 UTC를 반환한다. 08시대(KST) 리포트가 하루 밀리는 것을 막기 위해
|
||||||
|
UTC+9로 명시 보정한다(holdings_intel._today_kst와 동일한 관용).
|
||||||
|
"""
|
||||||
|
return (dt.datetime.utcnow() + dt.timedelta(hours=9)).date()
|
||||||
|
|
||||||
|
|
||||||
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
|
def _resolve_asof(asof_str, conn: sqlite3.Connection) -> dt.date:
|
||||||
if asof_str:
|
if asof_str:
|
||||||
return dt.date.fromisoformat(asof_str)
|
return dt.date.fromisoformat(asof_str)
|
||||||
@@ -263,7 +273,7 @@ from . import snapshot as _snap
|
|||||||
|
|
||||||
@router.post("/snapshot/refresh")
|
@router.post("/snapshot/refresh")
|
||||||
def post_snapshot_refresh(asof: Optional[str] = None):
|
def post_snapshot_refresh(asof: Optional[str] = None):
|
||||||
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
|
asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
|
||||||
if asof_date.weekday() >= 5:
|
if asof_date.weekday() >= 5:
|
||||||
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
|
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
|
||||||
with _conn() as c:
|
with _conn() as c:
|
||||||
@@ -300,7 +310,7 @@ from .ai_news import validation as _ai_validation
|
|||||||
|
|
||||||
@router.post("/snapshot/refresh-news-sentiment")
|
@router.post("/snapshot/refresh-news-sentiment")
|
||||||
async def post_refresh_news_sentiment(asof: Optional[str] = None):
|
async def post_refresh_news_sentiment(asof: Optional[str] = None):
|
||||||
asof_date = dt.date.fromisoformat(asof) if asof else dt.date.today()
|
asof_date = dt.date.fromisoformat(asof) if asof else _today_kst()
|
||||||
if asof_date.weekday() >= 5:
|
if asof_date.weekday() >= 5:
|
||||||
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
|
return {"asof": asof_date.isoformat(), "status": "skipped_weekend"}
|
||||||
if _is_holiday(asof_date):
|
if _is_holiday(asof_date):
|
||||||
|
|||||||
@@ -58,6 +58,18 @@ async def test_score_sentiment_clamps_negative_out_of_range():
|
|||||||
assert out["score_raw"] == -10.0
|
assert out["score_raw"] == -10.0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_score_sentiment_includes_asof_date_in_prompt():
|
||||||
|
"""asof(현재 KST 일자)를 넘기면 prompt에 오늘 날짜가 포함되어 LLM이 현재 일자 기준으로 판단."""
|
||||||
|
import datetime as _dt
|
||||||
|
llm = _mk_llm(json.dumps({"score": 5.0, "reason": "ok"}))
|
||||||
|
await analyzer.score_sentiment(
|
||||||
|
llm, "005930", NEWS, name="삼성전자", asof=_dt.date(2026, 7, 2),
|
||||||
|
)
|
||||||
|
user_msg = llm.messages.create.call_args.kwargs["messages"][0]["content"]
|
||||||
|
assert "2026-07-02" in user_msg
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_score_sentiment_includes_summary_in_prompt():
|
async def test_score_sentiment_includes_summary_in_prompt():
|
||||||
"""summary 가 있으면 prompt 에 포함, 없으면 title 만."""
|
"""summary 가 있으면 prompt 에 포함, 없으면 title 만."""
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ async def test_refresh_daily_happy_path(conn):
|
|||||||
scores_by_ticker = {
|
scores_by_ticker = {
|
||||||
"005930": 7.5, "000660": 4.0, "373220": -6.0,
|
"005930": 7.5, "000660": 4.0, "373220": -6.0,
|
||||||
}
|
}
|
||||||
async def fake_score(llm, ticker, news, *, name=None, model="m"):
|
async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
|
||||||
return {
|
return {
|
||||||
"ticker": ticker, "score_raw": scores_by_ticker[ticker],
|
"ticker": ticker, "score_raw": scores_by_ticker[ticker],
|
||||||
"reason": f"r{ticker}", "news_count": 1,
|
"reason": f"r{ticker}", "news_count": 1,
|
||||||
@@ -81,7 +81,7 @@ async def test_refresh_daily_failures_isolated(conn):
|
|||||||
}
|
}
|
||||||
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
||||||
|
|
||||||
async def fake_score(llm, ticker, news, *, name=None, model="m"):
|
async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
|
||||||
if ticker == "000660":
|
if ticker == "000660":
|
||||||
raise RuntimeError("llm exploded")
|
raise RuntimeError("llm exploded")
|
||||||
return {
|
return {
|
||||||
@@ -116,7 +116,7 @@ async def test_refresh_daily_no_match_ticker_skipped(conn):
|
|||||||
}
|
}
|
||||||
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
|
fake_stats = {"total_articles": 1, "matched_pairs": 1, "hit_tickers": 1}
|
||||||
|
|
||||||
async def fake_score(llm, ticker, news, *, name=None, model="m"):
|
async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
|
||||||
return {
|
return {
|
||||||
"ticker": ticker, "score_raw": 5.0, "reason": "r",
|
"ticker": ticker, "score_raw": 5.0, "reason": "r",
|
||||||
"news_count": 1, "tokens_input": 100, "tokens_output": 20,
|
"news_count": 1, "tokens_input": 100, "tokens_output": 20,
|
||||||
@@ -152,7 +152,7 @@ async def test_refresh_daily_sign_gate_no_positive_in_neg(conn):
|
|||||||
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
||||||
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
|
scores = {"005930": 6.0, "000660": 2.0, "373220": 0.5} # 모두 양수
|
||||||
|
|
||||||
async def fake_score(llm, ticker, news, *, name=None, model="m"):
|
async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
|
||||||
return {
|
return {
|
||||||
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
|
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
|
||||||
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
|
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
|
||||||
@@ -183,7 +183,7 @@ async def test_refresh_daily_sign_gate_excludes_neutral(conn):
|
|||||||
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
fake_stats = {"total_articles": 3, "matched_pairs": 3, "hit_tickers": 3}
|
||||||
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
|
scores = {"005930": 3.0, "000660": 0.0, "373220": -3.0}
|
||||||
|
|
||||||
async def fake_score(llm, ticker, news, *, name=None, model="m"):
|
async def fake_score(llm, ticker, news, *, name=None, model="m", asof=None):
|
||||||
return {
|
return {
|
||||||
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
|
"ticker": ticker, "score_raw": scores[ticker], "reason": "r",
|
||||||
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
|
"news_count": 1, "tokens_input": 1, "tokens_output": 1, "model": model,
|
||||||
|
|||||||
@@ -5,6 +5,21 @@ from fastapi.testclient import TestClient
|
|||||||
from app.main import app
|
from app.main import app
|
||||||
|
|
||||||
|
|
||||||
|
def test_today_kst_uses_kst_offset_not_utc(monkeypatch):
|
||||||
|
"""컨테이너가 UTC(Alpine, tzdata 미설치)라 date.today()는 08시 KST에 어제를 준다.
|
||||||
|
_today_kst()는 UTC+9로 보정해 오늘(KST)을 반환해야 한다."""
|
||||||
|
from app.screener import router
|
||||||
|
|
||||||
|
class _FrozenDT(dt.datetime):
|
||||||
|
@classmethod
|
||||||
|
def utcnow(cls):
|
||||||
|
# 2026-07-01 23:30 UTC == 2026-07-02 08:30 KST (AI 뉴스 리포트 시각대)
|
||||||
|
return dt.datetime(2026, 7, 1, 23, 30, 0)
|
||||||
|
|
||||||
|
monkeypatch.setattr(router.dt, "datetime", _FrozenDT)
|
||||||
|
assert router._today_kst() == dt.date(2026, 7, 2)
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_news_sentiment_weekend_skip():
|
def test_refresh_news_sentiment_weekend_skip():
|
||||||
# 2026-05-16 = Saturday
|
# 2026-05-16 = Saturday
|
||||||
client = TestClient(app)
|
client = TestClient(app)
|
||||||
|
|||||||
Reference in New Issue
Block a user