Files
web-page-backend/music-lab/app/pipeline/state_machine.py

42 lines
1.4 KiB
Python

"""파이프라인 상태 머신 — 전이 규칙 단일 소스."""
# Pipeline steps, listed in the order the approval chain below walks them.
STEPS = ["cover", "video", "thumb", "meta", "review", "publish"]

# Steps that wait for a user decision: every step except "review",
# which runs automatically.
USER_GATES = [step for step in STEPS if step != "review"]

# Maps each user-approvable state to the state entered once it is approved.
_APPROVE_NEXT = {
    "cover_pending": "video_pending",
    "video_pending": "thumb_pending",
    "thumb_pending": "meta_pending",
    "meta_pending": "ai_review",  # hands over to the automatic review step
    "publish_pending": "publishing",
}

# States that can_transition() never allows leaving.
TERMINAL_STATES = {"published", "cancelled", "failed", "awaiting_manual"}
def next_state_on_approve(state: str) -> str:
    """Return the state that follows ``state`` when the user approves it.

    Args:
        state: Current pipeline state.

    Returns:
        The successor recorded for ``state`` in ``_APPROVE_NEXT``.

    Raises:
        ValueError: If ``state`` has no entry in ``_APPROVE_NEXT``.
    """
    # Every value in _APPROVE_NEXT is a non-empty string, so None can only
    # mean "no approval edge for this state".
    successor = _APPROVE_NEXT.get(state)
    if successor is None:
        raise ValueError(f"승인 불가 상태: {state}")
    return successor
def next_state_on_reject(state: str) -> str:
    """Return the state a pipeline item is in after the user rejects it.

    A rejection does not move the item: the same ``*_pending`` state is
    returned, so the item can be regenerated and presented for approval
    again.

    Args:
        state: Current pipeline state.

    Returns:
        ``state`` unchanged.

    Raises:
        ValueError: If ``state`` is not a string ending in ``"_pending"``.
    """
    # Non-string input (e.g. None for a missing state) is reported as
    # ValueError, matching next_state_on_approve, rather than leaking an
    # AttributeError from the .endswith call.
    if not isinstance(state, str) or not state.endswith("_pending"):
        raise ValueError(f"반려 불가 상태: {state}")
    return state
def can_transition(from_state: str, to_state: str) -> bool:
    """Return whether moving from ``from_state`` to ``to_state`` is allowed.

    Rules, checked in order:

    1. Nothing leaves a state listed in ``TERMINAL_STATES``.
    2. ``"cancelled"``, ``"failed"`` and ``"awaiting_manual"`` are reachable
       from every non-terminal state.
    3. The approval edge recorded for ``from_state`` in ``_APPROVE_NEXT``.
    4. The automatic edges ``ai_review -> publish_pending`` and
       ``publishing -> published``.

    Args:
        from_state: Current pipeline state.
        to_state: Requested target state.

    Returns:
        True if one of rules 2-4 permits the transition, otherwise False.
    """
    if from_state in TERMINAL_STATES:
        return False
    if to_state in {"cancelled", "failed", "awaiting_manual"}:
        return True
    # Explicit membership test rather than `.get()`: for a state without an
    # approval edge `.get()` yields None, which would compare equal to
    # to_state=None and wrongly report the transition as allowed.
    if from_state in _APPROVE_NEXT and _APPROVE_NEXT[from_state] == to_state:
        return True
    # Transitions the pipeline performs on its own, without user approval.
    auto_transitions = {
        ("ai_review", "publish_pending"),
        ("publishing", "published"),
    }
    return (from_state, to_state) in auto_transitions