各位同仁、技术爱好者们,晚上好!
今天,我们将一同深入探讨一个在现代异步系统设计中至关重要的模式——‘Wait-for-Event’。想象一下,我们正在构建一个复杂的业务流程,它可能涉及人工审批、外部系统的数据处理、甚至漫长的数据同步。这些流程并非一蹴而就,它们常常需要在某个节点停下来,等待一个外部的信号,然后才能继续。这就像在一条生产线上,某个工位完成了一部分工作后,必须等待质检部门的反馈,才能继续下一个环节。
在同步编程中,我们可能会简单地使用阻塞调用,但那会迅速扼杀系统的响应性和吞吐量。在异步编程的世界里,我们追求的是非阻塞、高并发。那么,如何在异步图中,实现一个节点优雅地“暂停”,然后等待一个外部信号,最后再“恢复”执行呢?这正是我们今天要解决的核心问题。
深入 ‘Wait-for-Event’ 模式:构建可暂停异步图的艺术与实践
引言:异步图与等待的艺术
在构建现代软件系统时,我们经常会遇到需要协调多个独立任务或服务的情况。传统上,我们可能会使用线性流程或简单的回调链。然而,当这些任务变得复杂、依赖关系增多,并且涉及到不确定何时完成的外部操作时,传统的模式就会显得捉襟见肘。异步图(Asynchronous Graph)应运而生,它以节点(Node)代表任务,边(Edge)代表依赖关系,允许任务并行执行,并在依赖满足时自动推进。
但异步图并非万能,它通常擅长处理“即发即弃”或“快速响应”的任务。一旦某个节点需要等待一个无法预测的外部事件(例如用户点击审批、第三方API回调、定时器触发),传统的异步图就会面临挑战:
- 阻塞问题: 如果节点内部直接阻塞等待,那么整个异步图的调度器可能会被挂起,影响其他独立节点的执行。这与异步编程的初衷背道而驰。
- 状态管理: 当节点暂停时,它的内部状态、上下文信息如何保存?如何确保在事件到来时能够正确恢复?
- 事件解耦: 外部事件如何高效、可靠地通知到正在等待的特定节点,而不是广播给所有节点?
- 恢复机制: 如何让图调度器“知道”一个暂停的节点已经收到信号,可以重新进入执行队列?
‘Wait-for-Event’ 模式正是为了解决这些挑战而生。它允许一个异步图中的节点在执行过程中声明:“我需要暂停,等待某个特定类型的事件,当事件发生时,请携带相关数据来恢复我。” 这使得异步图能够处理长时间运行、需要外部交互的复杂业务流程,同时保持其非阻塞的特性。
异步图与传统流程的界限
在深入探讨实现之前,我们先来明确一下异步图和传统同步流程的根本区别,这将有助于我们理解为何需要 ‘Wait-for-Event’ 模式。
| 特性 | 传统同步流程 | 异步图 |
|---|---|---|
| 执行模型 | 顺序执行,一个任务完成后才能开始下一个。 | 并行执行,多个独立任务可同时进行。 |
| 阻塞 | 任务内部的I/O操作或等待会阻塞整个线程/进程。 | 任务内部的I/O操作通常是非阻塞的,通过回调或协程管理。 |
| 资源利用 | 线程/进程可能长时间处于等待状态,资源浪费。 | 线程/进程在等待I/O时可以切换到其他任务,提高资源利用率。 |
| 复杂性 | 流程简单直观,但难以处理并发和外部等待。 | 流程更灵活,擅长处理复杂依赖和高并发,但状态管理和调度更复杂。 |
| 外部交互 | 倾向于轮询或阻塞等待外部响应。 | 倾向于事件驱动,外部事件触发内部状态转换。 |
在传统同步流程中,一个 wait_for_approval() 函数可能会直接阻塞当前线程,直到审批完成。但在异步图中,我们无法接受这种阻塞。因此,我们需要一种机制,让节点在等待时,能够释放其占用的执行资源,并在外部事件到来时,被“唤醒”并重新调度。
‘Wait-for-Event’ 模式的核心挑战
要设计一个能在节点中间暂停并等待外部系统信号的异步图,我们需要解决以下核心挑战:
- 状态持久化与恢复: 当节点暂停时,它的所有运行时数据(局部变量、执行进度、上下文)都必须被保存。当事件到来时,这些数据需要被精确地恢复,以便节点能从上次中断的地方继续执行。
- 事件路由与订阅: 外部系统发出的信号如何准确地路由到正确的、正在等待的节点?这要求节点能够声明它正在等待的事件类型,并且事件系统能够根据类型将事件分发给订阅者。
- 调度器感知: 异步图的调度器必须能够识别节点处于“等待”状态,并将其从活跃执行队列中移除。同时,它也需要知道何时将一个因事件而恢复的节点重新加入执行队列。
- 幂等性与并发: 如果外部事件重复发送,或者在恢复过程中有多个事件同时到来,系统如何保证只处理一次或以预期的方式处理?
- 超时与错误处理: 如果外部事件迟迟不来,节点是否应该无限期等待?如何处理等待超时?如果外部系统发出的是错误信号,如何进行错误处理和补偿?
设计原则:构建健壮的暂停机制
为了有效应对上述挑战,我们应遵循以下设计原则:
- 状态化节点 (Stateful Nodes): 节点必须能够封装并管理自己的内部状态。当暂停时,其关键状态能够被序列化;当恢复时,能够被反序列化。
- 事件驱动 (Event-Driven Architecture): 外部信号应以事件的形式通知系统。事件总线(Event Bus)是实现解耦和高效通信的关键。
- 显式暂停与恢复 (Explicit Pause/Resume): 节点应显式地向调度器声明其暂停意图,并提供恢复的入口。调度器不应猜测节点的意图。
- 解耦 (Decoupling): 节点不应直接轮询外部系统。事件的产生和消费应通过中间层(事件总线、适配器)进行解耦。
- 持久化 (Persistence): 对于长时间等待的节点,其状态应被持久化到可靠的存储介质中,以防系统重启或故障。
- 可观测性 (Observability): 能够清晰地看到哪些节点处于等待状态,等待什么事件,以及它们当前的上下文。
核心组件与架构蓝图
为了实现 ‘Wait-for-Event’ 模式,我们的异步图系统需要以下核心组件:
| 组件名称 | 职责 | 关键能力 |
|---|---|---|
| Graph Orchestrator (图调度器) | 整个异步图的中央控制器。负责管理所有节点的生命周期、状态转换、依赖关系解析、以及节点的调度执行。它需要识别节点的暂停请求,并将事件传递给相应的等待节点。 | 节点状态管理、依赖图遍历、节点调度、处理节点暂停/恢复请求、与事件总线交互。 |
| Node (节点/任务) | 异步图中的一个独立执行单元。它封装了具体的业务逻辑,能够执行、暂停,并在事件触发时恢复。节点必须能够保存和恢复自己的内部状态。 | execute() 方法(返回执行结果和新的状态)、serialize_state() / deserialize_state()、resume() 方法。 |
| Event Bus / Signal Dispatcher (事件总线/信号分发器) | 负责接收来自外部系统或内部的事件,并将其分发给所有订阅了该事件的节点。它是外部世界与异步图内部通信的桥梁,实现了事件的生产者和消费者之间的解耦。 | 事件订阅/发布机制、事件路由、事件缓存(可选)。 |
| External System Adapter (外部系统适配器) | 负责将外部系统的原始信号(例如HTTP请求、消息队列消息、数据库更新)转换成内部统一的事件格式,并发布到事件总线。它隔离了异步图与外部系统的具体通信细节。 | 监听外部系统、将外部信号映射为内部事件、发布事件。 |
| State Store / Persistence Layer (状态存储/持久化层) | 负责持久化暂停节点的完整状态。当系统崩溃或需要长时间等待时,节点的上下文不会丢失。这可以是数据库、键值存储、或者专门的持久化队列。 | 存储节点状态快照、按需加载节点状态。 |
详细设计与代码实践
我们将使用 Python 来演示这些概念。Python 的 asyncio 和协程机制非常适合构建非阻塞的异步图。
1. 节点接口定义
首先,我们定义节点可能的状态,以及节点执行的结果。
import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List, Callable, Awaitable
# 定义节点可能的状态
class NodeState:
READY = "READY" # 准备好执行
RUNNING = "RUNNING" # 正在执行
PAUSED = "PAUSED" # 暂停,等待外部事件
COMPLETED = "COMPLETED" # 执行完成
FAILED = "FAILED" # 执行失败
# 节点执行结果
class NodeResult:
def __init__(self, status: str, data: Optional[Dict[str, Any]] = None,
wait_event_type: Optional[str] = None,
error: Optional[str] = None):
self.status = status
self.data = data if data is not None else {}
self.wait_event_type = wait_event_type
self.error = error
def __repr__(self):
return f"NodeResult(status={self.status}, wait_event_type={self.wait_event_type}, error={self.error})"
# 基础节点抽象类
class BaseNode(ABC):
def __init__(self, node_id: str, name: str, config: Optional[Dict[str, Any]] = None):
self.node_id = node_id
self.name = name
self.config = config if config is not None else {}
self._current_state = NodeState.READY
self._internal_context: Dict[str, Any] = {} # 节点内部用于保存状态的上下文
@property
def current_state(self) -> str:
return self._current_state
@current_state.setter
def current_state(self, state: str):
self._current_state = state
@abstractmethod
async def execute(self, input_data: Dict[str, Any]) -> NodeResult:
"""
执行节点的核心逻辑。
如果需要暂停,返回 NodeResult(status=PAUSED, wait_event_type='...')
"""
pass
async def resume(self, event_data: Dict[str, Any]) -> NodeResult:
"""
当节点从 PAUSED 状态恢复时调用。
默认实现是重新调用 execute,但具体节点可以重写此方法以处理事件数据并继续。
"""
return await self.execute(event_data) # 默认将事件数据作为输入继续执行
def serialize_state(self) -> Dict[str, Any]:
"""
序列化节点内部状态,以便持久化。
"""
return {
"node_id": self.node_id,
"name": self.name,
"config": self.config,
"current_state": self._current_state,
"internal_context": self._internal_context # 序列化内部上下文
}
@classmethod
def deserialize_state(cls, state_data: Dict[str, Any]):
"""
从序列化数据中反序列化节点。
注意:这里需要一个机制来根据 'name' 或 'type' 动态创建具体的节点实例。
为了简化,我们假设我们知道要反序列化成哪个具体的节点类。
"""
node_id = state_data.get("node_id", str(uuid.uuid4()))
name = state_data.get("name", "UnknownNode")
config = state_data.get("config", {})
# 实际应用中,这里会根据 'name' 或 'type' 字符串来实例化正确的节点子类
# 例如:
# node_type = state_data.get("node_type")
# if node_type == "HumanApprovalNode":
# node = HumanApprovalNode(node_id, name, config)
# else:
# raise ValueError(f"Unknown node type: {node_type}")
# 简化处理:假设我们直接创建一个 BaseNode 实例,但实际需要具体的子类
node = cls(node_id, name, config)
node.current_state = state_data.get("current_state", NodeState.READY)
node._internal_context = state_data.get("internal_context", {})
return node
这里 BaseNode 定义了所有节点必须实现的基本行为。execute 方法是核心,它返回 NodeResult 来指示执行状态。特别地,如果节点需要暂停,它会返回 NodeState.PAUSED 并指定 wait_event_type。_internal_context 是节点内部私有状态,用于在暂停前后保持上下文。
2. 实现一个可暂停节点
现在我们来实现一个具体的、可以暂停的节点:HumanApprovalNode。它模拟一个需要人工审批的流程。
class HumanApprovalNode(BaseNode):
def __init__(self, node_id: str, name: str, config: Optional[Dict[str, Any]] = None):
super().__init__(node_id, name, config)
# 首次运行时,需要等待审批
self._internal_context["awaiting_approval"] = True
self._internal_context["approval_outcome"] = None
async def execute(self, input_data: Dict[str, Any]) -> NodeResult:
print(f"[{self.name}] 收到输入: {input_data}")
if self._internal_context["awaiting_approval"]:
print(f"[{self.name}] 正在等待人工审批...")
# 节点声明暂停,并指定等待的事件类型
self.current_state = NodeState.PAUSED
return NodeResult(status=NodeState.PAUSED,
wait_event_type=f"approval_event_{self.node_id}",
data={"approval_request_id": self.node_id})
else:
# 审批已完成,根据结果决定后续
outcome = self._internal_context["approval_outcome"]
if outcome == "approved":
print(f"[{self.name}] 审批通过,继续执行。")
self.current_state = NodeState.COMPLETED
return NodeResult(status=NodeState.COMPLETED, data={"result": "approved", "final_data": input_data})
else: # outcome == "rejected"
print(f"[{self.name}] 审批拒绝,流程终止。")
self.current_state = NodeState.FAILED
return NodeResult(status=NodeState.FAILED, error="Approval rejected")
async def resume(self, event_data: Dict[str, Any]) -> NodeResult:
"""
人工审批节点特有的恢复逻辑,处理审批结果。
"""
print(f"[{self.name}] 收到审批事件: {event_data}")
self._internal_context["awaiting_approval"] = False
# 从事件数据中获取审批结果
approval_status = event_data.get("status")
if approval_status == "approved":
self._internal_context["approval_outcome"] = "approved"
else:
self._internal_context["approval_outcome"] = "rejected"
# 重新执行 execute 方法,这次它会因为 awaiting_approval 为 False 而走完成逻辑
# 传递原始输入数据或事件数据,取决于业务需求
return await self.execute(event_data.get("original_input_data", {}))
# 为了简化反序列化,需要一个方法来创建特定类型的节点
@classmethod
def create_from_state(cls, state_data: Dict[str, Any]):
node = cls(state_data["node_id"], state_data["name"], state_data["config"])
node.current_state = state_data["current_state"]
node._internal_context = state_data["internal_context"]
return node
# 其他示例节点
class StartNode(BaseNode):
async def execute(self, input_data: Dict[str, Any]) -> NodeResult:
print(f"[{self.name}] 流程开始,初始数据: {input_data}")
self.current_state = NodeState.COMPLETED
return NodeResult(status=NodeState.COMPLETED, data=input_data)
class ProcessDataNode(BaseNode):
async def execute(self, input_data: Dict[str, Any]) -> NodeResult:
print(f"[{self.name}] 正在处理数据: {input_data}")
processed_data = {k: v.upper() if isinstance(v, str) else v * 2 for k, v in input_data.items()}
print(f"[{self.name}] 数据处理完成: {processed_data}")
self.current_state = NodeState.COMPLETED
return NodeResult(status=NodeState.COMPLETED, data=processed_data)
class EndNode(BaseNode):
async def execute(self, input_data: Dict[str, Any]) -> NodeResult:
print(f"[{self.name}] 流程结束,最终数据: {input_data}")
self.current_state = NodeState.COMPLETED
return NodeResult(status=NodeState.COMPLETED, data=input_data)
HumanApprovalNode 在第一次 execute 时,会设置 awaiting_approval 为 True,并返回 NodeState.PAUSED,同时告知调度器它在等待 approval_event_{node_id} 类型的事件。当事件通过 resume 方法传递进来时,它会更新 awaiting_approval 状态并根据事件数据决定审批结果,然后再次调用 execute 来完成剩余逻辑。
3. 事件总线与信号分发
事件总线是解耦的关键。它负责接收事件并将其分发给所有注册的监听器。
class EventEmitter:
def __init__(self):
# 存储 {event_type: [callback1, callback2, ...]}
self._listeners: Dict[str, List[Callable[[Dict[str, Any]], Awaitable[None]]]] = {}
def on(self, event_type: str, callback: Callable[[Dict[str, Any]], Awaitable[None]]):
"""
注册一个事件监听器。
"""
if event_type not in self._listeners:
self._listeners[event_type] = []
self._listeners[event_type].append(callback)
print(f"[EventEmitter] 注册监听器: 事件类型='{event_type}'")
def off(self, event_type: str, callback: Callable[[Dict[str, Any]], Awaitable[None]]):
"""
移除一个事件监听器。
"""
if event_type in self._listeners and callback in self._listeners[event_type]:
self._listeners[event_type].remove(callback)
if not self._listeners[event_type]:
del self._listeners[event_type]
print(f"[EventEmitter] 移除监听器: 事件类型='{event_type}'")
async def emit(self, event_type: str, event_data: Dict[str, Any]):
"""
发布一个事件,通知所有监听器。
"""
print(f"[EventEmitter] 发布事件: 类型='{event_type}', 数据={event_data}")
if event_type in self._listeners:
# 并发地调用所有监听器
await asyncio.gather(*[listener(event_data) for listener in self._listeners[event_type]])
else:
print(f"[EventEmitter] 没有监听器订阅事件类型 '{event_type}'")
EventEmitter 提供 on 注册监听器和 emit 发布事件的功能。当一个事件被 emit 时,所有注册了该事件类型的回调函数都会被异步调用。
4. 图调度器 (Graph Orchestrator)
这是系统的核心,它管理整个图的执行。
class GraphOrchestrator:
def __init__(self, event_emitter: EventEmitter):
self._nodes: Dict[str, BaseNode] = {} # {node_id: node_instance}
self._dependencies: Dict[str, List[str]] = {} # {node_id: [dependent_node_id1, ...]}
self._node_output_data: Dict[str, Dict[str, Any]] = {} # {node_id: output_data}
self._event_emitter = event_emitter
self._paused_nodes: Dict[str, BaseNode] = {} # {node_id: node_instance} 存储暂停的节点
def add_node(self, node: BaseNode):
self._nodes[node.node_id] = node
self._dependencies[node.node_id] = [] # 初始化依赖
def add_dependency(self, from_node_id: str, to_node_id: str):
if from_node_id not in self._nodes or to_node_id not in self._nodes:
raise ValueError("Invalid node IDs for dependency.")
self._dependencies[from_node_id].append(to_node_id)
def _get_ready_nodes(self) -> List[BaseNode]:
"""
获取所有准备好执行的节点。
一个节点准备好,意味着:
1. 它还没有完成或失败。
2. 它的所有前置依赖都已完成。
3. 它当前不是 PAUSED 状态。
"""
ready_nodes = []
for node_id, node in self._nodes.items():
if node.current_state in [NodeState.READY, NodeState.RUNNING]: # 允许 RUNNING 节点重新评估(例如在并发调度中)
# 检查所有前置节点是否完成
all_predecessors_completed = True
for pred_node_id, succ_nodes in self._dependencies.items():
if node_id in succ_nodes: # current node_id is a successor of pred_node_id
if self._nodes[pred_node_id].current_state != NodeState.COMPLETED:
all_predecessors_completed = False
break
if all_predecessors_completed and node.current_state != NodeState.PAUSED:
ready_nodes.append(node)
return ready_nodes
async def _run_node(self, node: BaseNode):
print(f"[Orchestrator] 调度节点 '{node.name}' ({node.node_id}) 执行。")
node.current_state = NodeState.RUNNING
# 收集前置节点的输出作为当前节点的输入
input_data = {}
for pred_node_id, succ_nodes in self._dependencies.items():
if node.node_id in succ_nodes and pred_node_id in self._node_output_data:
input_data.update(self._node_output_data[pred_node_id])
try:
result = await node.execute(input_data)
node.current_state = result.status
self._node_output_data[node.node_id] = result.data
if result.status == NodeState.PAUSED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 暂停,等待事件: '{result.wait_event_type}'")
self._paused_nodes[node.node_id] = node
# 注册事件监听器,当事件到来时,调用 handle_event
# 注意:这里需要将原始输入数据也传递给事件处理,以便恢复时可以使用
event_data_for_resume = input_data.copy()
event_data_for_resume.update(result.data) # 包含节点暂停时可能产生的中间数据
# 创建一个特定于此节点的回调函数,用于处理事件
async def _node_event_handler(event_data_from_emitter: Dict[str, Any]):
# 合并事件数据与原始输入数据,确保resume时有完整上下文
full_event_data = event_data_for_resume.copy()
full_event_data.update(event_data_from_emitter)
await self._handle_event_for_node(node.node_id, full_event_data)
self._event_emitter.on(result.wait_event_type, _node_event_handler)
elif result.status == NodeState.FAILED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 失败: {result.error}")
# 标记所有依赖此节点的后续节点也失败
self._mark_dependents_failed(node.node_id)
elif result.status == NodeState.COMPLETED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 完成。")
except Exception as e:
node.current_state = NodeState.FAILED
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 运行时异常: {e}")
self._node_output_data[node.node_id] = {"error": str(e)}
self._mark_dependents_failed(node.node_id)
def _mark_dependents_failed(self, failed_node_id: str):
"""递归标记依赖于失败节点的后续节点为失败状态"""
for node_id, node in self._nodes.items():
for pred_node_id, succ_nodes in self._dependencies.items():
if node_id in succ_nodes and pred_node_id == failed_node_id:
if node.current_state not in [NodeState.COMPLETED, NodeState.FAILED]:
node.current_state = NodeState.FAILED
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 因依赖失败而被标记为失败。")
self._mark_dependents_failed(node.node_id) # 递归
async def _handle_event_for_node(self, node_id: str, event_data: Dict[str, Any]):
"""
处理特定节点收到的事件,使其从暂停状态恢复。
"""
if node_id not in self._paused_nodes:
print(f"[Orchestrator] 节点 '{node_id}' 不在暂停列表中,可能已恢复或不存在。事件被忽略。")
return
node = self._paused_nodes[node_id]
if node.current_state != NodeState.PAUSED:
print(f"[Orchestrator] 节点 '{node.name}' ({node_id}) 状态不是PAUSED,无法恢复。当前状态: {node.current_state}")
return
print(f"[Orchestrator] 恢复节点 '{node.name}' ({node_id}),事件数据: {event_data}")
# 移除事件监听器,避免重复触发
# 这里需要一个更精细的 off 方法,或者在 on 方法中返回一个可取消的句柄
# 简化处理:假设事件总线允许重复注册/移除同类型的回调
# 实际生产中,这里的 off() 逻辑会更复杂,确保只移除与该 node_id 相关的特定回调。
# for simplicity, we skip precise `off` here, assuming event_emitter can handle multiple subscriptions.
try:
result = await node.resume(event_data)
node.current_state = result.status
self._node_output_data[node.node_id] = result.data
if result.status == NodeState.PAUSED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 恢复后再次暂停,等待事件: '{result.wait_event_type}'")
# 重新注册监听器,如果节点再次暂停
event_data_for_resume = event_data.copy() # 使用最新的事件数据作为上下文
event_data_for_resume.update(result.data)
async def _node_event_handler(new_event_data_from_emitter: Dict[str, Any]):
full_event_data = event_data_for_resume.copy()
full_event_data.update(new_event_data_from_emitter)
await self._handle_event_for_node(node.node_id, full_event_data)
self._event_emitter.on(result.wait_event_type, _node_event_handler)
else:
del self._paused_nodes[node_id] # 节点不再暂停
if result.status == NodeState.FAILED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 恢复后失败: {result.error}")
self._mark_dependents_failed(node.node_id)
elif result.status == NodeState.COMPLETED:
print(f"[Orchestrator] 节点 '{node.name}' ({node.node_id}) 恢复后完成。")
# 重新调度,检查是否有新节点可以运行
await self.run_graph_step() # 重要的步骤,恢复后需要重新触发调度
except Exception as e:
node.current_state = NodeState.FAILED
print(f"[Orchestrator] 节点 '{node.name}' ({node_id}) 恢复时异常: {e}")
self._node_output_data[node.node_id] = {"error": str(e)}
del self._paused_nodes[node_id]
self._mark_dependents_failed(node.node_id)
await self.run_graph_step() # 失败后也尝试重新调度
async def run_graph_step(self):
"""
执行图的一个调度步骤,运行所有准备好的节点。
"""
print("n--- 调度器运行一个步骤 ---")
runnable_nodes = self._get_ready_nodes()
if not runnable_nodes and not self._paused_nodes:
print("[Orchestrator] 没有可运行或暂停的节点,图执行完成。")
return
if not runnable_nodes and self._paused_nodes:
print(f"[Orchestrator] 没有可运行节点,但有 {len(self._paused_nodes)} 个节点正在等待事件。等待外部信号...")
return # 图在等待外部事件
tasks = [self._run_node(node) for node in runnable_nodes]
if tasks:
await asyncio.gather(*tasks)
# 运行完一批节点后,再次尝试调度,因为可能有新节点变为 READY 状态
await self.run_graph_step()
else:
print("[Orchestrator] 没有可运行节点,但图仍在活跃状态 (可能存在 PAUSED 节点)。")
def get_node_status(self) -> Dict[str, str]:
return {node.name: node.current_state for node in self._nodes.values()}
def get_graph_status(self) -> str:
all_completed = all(node.current_state == NodeState.COMPLETED for node in self._nodes.values())
any_failed = any(node.current_state == NodeState.FAILED for node in self._nodes.values())
any_paused = any(node.current_state == NodeState.PAUSED for node in self._nodes.values())
if any_failed:
return "FAILED"
elif all_completed:
return "COMPLETED"
elif any_paused:
return "PAUSED"
else:
return "RUNNING" if any(node.current_state in [NodeState.READY, NodeState.RUNNING] for node in self._nodes.values()) else "IDLE"
# 持久化和恢复图状态(简化示例,实际会更复杂)
def serialize_graph_state(self) -> Dict[str, Any]:
serialized_nodes = {node_id: node.serialize_state() for node_id, node in self._nodes.items()}
return {
"nodes": serialized_nodes,
"dependencies": self._dependencies,
"node_output_data": self._node_output_data,
"paused_nodes": list(self._paused_nodes.keys()) # 只保存ID,恢复时再重新引用
}
@classmethod
def deserialize_graph_state(cls, state_data: Dict[str, Any], event_emitter: EventEmitter):
orchestrator = cls(event_emitter)
# 恢复节点
for node_id, node_state in state_data["nodes"].items():
# 这里需要一个工厂函数或注册表来根据 node_state['name'] 重新创建正确的节点实例
# 简化:假设我们知道节点类型
node_type_map = {
"Start Node": StartNode,
"Process Data": ProcessDataNode,
"Approve Request": HumanApprovalNode,
"End Node": EndNode
}
node_cls = node_type_map.get(node_state["name"]) # 假设 name 可以映射到类
if not node_cls:
raise ValueError(f"Unknown node type for deserialization: {node_state['name']}")
node = node_cls.create_from_state(node_state) # 使用每个节点自己的 create_from_state
orchestrator.add_node(node)
# 恢复依赖
orchestrator._dependencies = state_data["dependencies"]
orchestrator._node_output_data = state_data["node_output_data"]
# 恢复暂停节点
for node_id in state_data["paused_nodes"]:
if node_id in orchestrator._nodes:
node = orchestrator._nodes[node_id]
orchestrator._paused_nodes[node_id] = node
# 重新注册事件监听器
# 这部分逻辑在实际中需要更精细,因为需要知道暂停时等待的是哪个事件类型
# 并且需要重新绑定回调到 `_handle_event_for_node`
# 简化:假设暂停节点在恢复后,会重新在第一次运行时注册事件
# 或者在 serialize_state 中存储 wait_event_type
# 例如:node.serialize_state() 中可以包含 node.wait_event_type
# orchestrator._event_emitter.on(node.wait_event_type, orchestrator._handle_event_for_node)
print(f"[Orchestrator] 恢复暂停节点 '{node.name}' ({node_id}) 到等待状态。")
return orchestrator
GraphOrchestrator 的核心逻辑是 run_graph_step。它不断地查找并执行 READY 状态的节点。当一个节点返回 NodeState.PAUSED 时,调度器:
- 将该节点移入
_paused_nodes列表。 - 在
EventEmitter上为该节点注册一个特定的回调 (_node_event_handler),监听result.wait_event_type。这个回调会最终调用_handle_event_for_node。 _handle_event_for_node接收到事件后,会调用暂停节点的resume方法,并根据resume的结果决定节点的下一步状态,然后再次触发run_graph_step。
5. 整体流程示例
现在,我们把所有组件组合起来,运行一个包含人工审批的异步图。
async def main():
print("--- 启动异步图演示 ---")
event_emitter = EventEmitter()
orchestrator = GraphOrchestrator(event_emitter)
# 定义节点
start_node = StartNode(node_id="n1", name="Start Node")
process_data_node = ProcessDataNode(node_id="n2", name="Process Data")
approval_node = HumanApprovalNode(node_id="n3", name="Approve Request")
end_node = EndNode(node_id="n4", name="End Node")
orchestrator.add_node(start_node)
orchestrator.add_node(process_data_node)
orchestrator.add_node(approval_node)
orchestrator.add_node(end_node)
# 定义依赖关系
orchestrator.add_dependency(start_node.node_id, process_data_node.node_id)
orchestrator.add_dependency(process_data_node.node_id, approval_node.node_id)
orchestrator.add_dependency(approval_node.node_id, end_node.node_id)
initial_input = {"document_id": "DOC-XYZ-123", "amount": 100}
orchestrator._node_output_data[start_node.node_id] = initial_input # 模拟开始节点的输入
# 第一次运行图,StartNode 和 ProcessDataNode 会执行
await orchestrator.run_graph_step()
print(f"n当前图状态: {orchestrator.get_node_status()}")
print(f"图整体状态: {orchestrator.get_graph_status()}")
# 模拟外部系统发出审批通过事件
print("n--- 模拟外部系统发出审批通过事件 ---")
approval_event_data = {
"status": "approved",
"approved_by": "John Doe",
"timestamp": "2023-10-27T10:00:00Z",
"original_input_data": initial_input # 确保恢复时有原始上下文
}
# 事件类型与 HumanApprovalNode 暂停时声明的类型匹配
await event_emitter.emit(f"approval_event_{approval_node.node_id}", approval_event_data)
# 因为事件处理内部会触发 run_graph_step,这里可能不需要额外调用
# 但为了确保调度器在事件处理后再次检查,可以显式调用
# await orchestrator.run_graph_step() # 实际已由 _handle_event_for_node 触发
print(f"n当前图状态: {orchestrator.get_node_status()}")
print(f"图整体状态: {orchestrator.get_graph_status()}")
# 模拟外部系统发出审批拒绝事件 (如果流程允许回退或重试,可以再次模拟)
# 为了演示,我们假设第一次审批通过,不再模拟拒绝
# 演示持久化和恢复
print("n--- 演示图状态持久化与恢复 ---")
serialized_state = orchestrator.serialize_graph_state()
# print(f"序列化状态: {serialized_state}") # 可打印查看
# 创建一个新的调度器实例,从序列化状态恢复
new_event_emitter = EventEmitter()
restored_orchestrator = GraphOrchestrator.deserialize_graph_state(serialized_state, new_event_emitter)
print(f"n恢复后的图状态: {restored_orchestrator.get_node_status()}")
print(f"恢复后图整体状态: {restored_orchestrator.get_graph_status()}")
# 假设恢复后,我们再次模拟一个审批事件(如果审批节点还在等待的话)
# 在这个例子中,由于上一步已经审批完成,所以恢复后会直接是 COMPLETED 状态
# 如果我们在 HumanApprovalNode 暂停时进行持久化,那么恢复后它会再次等待事件。
# 为了演示,我们将 HumanApprovalNode 的初始状态改为 PAUSED 并持久化
# 模拟在 HumanApprovalNode 暂停时进行持久化
# (需要修改 main 函数的顺序,在第一次 run_graph_step 后立即持久化)
print("n--- 演示在暂停状态下持久化并恢复 ---")
event_emitter_v2 = EventEmitter()
orchestrator_v2 = GraphOrchestrator(event_emitter_v2)
start_node_v2 = StartNode(node_id="v2_n1", name="Start Node V2")
process_data_node_v2 = ProcessDataNode(node_id="v2_n2", name="Process Data V2")
approval_node_v2 = HumanApprovalNode(node_id="v2_n3", name="Approve Request V2")
end_node_v2 = EndNode(node_id="v2_n4", name="End Node V2")
orchestrator_v2.add_node(start_node_v2)
orchestrator_v2.add_node(process_data_node_v2)
orchestrator_v2.add_node(approval_node_v2)
orchestrator_v2.add_node(end_node_v2)
orchestrator_v2.add_dependency(start_node_v2.node_id, process_data_node_v2.node_id)
orchestrator_v2.add_dependency(process_data_node_v2.node_id, approval_node_v2.node_id)
orchestrator_v2.add_dependency(approval_node_v2.node_id, end_node_v2.node_id)
orchestrator_v2._node_output_data[start_node_v2.node_id] = {"document_id": "DOC-XYZ-456", "amount": 200}
await orchestrator_v2.run_graph_step() # 运行到审批节点暂停
print(f"nV2 图运行到暂停点,当前状态: {orchestrator_v2.get_node_status()}")
print(f"V2 图整体状态: {orchestrator_v2.get_graph_status()}")
if orchestrator_v2.get_graph_status() == "PAUSED":
print("n--- V2 图在暂停状态下进行持久化 ---")
serialized_state_v2 = orchestrator_v2.serialize_graph_state()
print("n--- 从 V2 持久化状态恢复一个新的图 ---")
# 需要重新注册事件监听器,因为事件总线是新的
restored_orchestrator_v2 = GraphOrchestrator.deserialize_graph_state(serialized_state_v2, event_emitter_v2)
# 恢复后,需要重新将暂停节点的事件监听器绑定到新的事件总线
# 在 deserialize_graph_state 中,我简化了这部分,实际生产中需要更复杂的逻辑来重建这些回调。
# 假设 HumanApprovalNode.create_from_state 能够隐式地在内部处理事件监听的注册。
# 这里为了演示,我们手动重新注册一下。
for node_id, node in restored_orchestrator_v2._paused_nodes.items():
if isinstance(node, HumanApprovalNode): # 确保是审批节点
# 重建原始输入数据上下文
original_input_data_for_resume = restored_orchestrator_v2._node_output_data.get(node_id, {})
async def _node_event_handler_v2(event_data_from_emitter: Dict[str, Any]):
full_event_data = original_input_data_for_resume.copy()
full_event_data.update(event_data_from_emitter)
await restored_orchestrator_v2._handle_event_for_node(node.node_id, full_event_data)
# 假设 HumanApprovalNode 在暂停时返回的 wait_event_type 可以从其内部状态中恢复
# 或者直接从 serialized_state["nodes"][node_id]["internal_context"]["wait_event_type"] 获取
wait_event_type = f"approval_event_{node.node_id}" # 示例中是根据 node_id 生成
event_emitter_v2.on(wait_event_type, _node_event_handler_v2)
print(f"n恢复后的 V2 图状态: {restored_orchestrator_v2.get_node_status()}")
print(f"恢复后 V2 图整体状态: {restored_orchestrator_v2.get_graph_status()}")
print("n--- 模拟外部系统对恢复后的 V2 图发出审批通过事件 ---")
approval_event_data_v2 = {
"status": "approved",
"approved_by": "Jane Doe",
"timestamp": "2023-10-27T10:30:00Z",
"original_input_data": {"document_id": "DOC-XYZ-456", "amount": 200} # 恢复时需要原始输入
}
await event_emitter_v2.emit(f"approval_event_{approval_node_v2.node_id}", approval_event_data_v2)
print(f"n恢复并事件触发后的 V2 图状态: {restored_orchestrator_v2.get_node_status()}")
print(f"恢复并事件触发后 V2 图整体状态: {restored_orchestrator_v2.get_graph_status()}")
print("n--- 演示结束 ---")
if __name__ == "__main__":
asyncio.run(main())
运行上述代码,你将看到如下的日志输出(部分):
- 流程启动,
Start Node和Process Data节点依次执行完成。 Approve Request节点开始执行,但因为它需要人工审批,所以会输出“正在等待人工审批…”并暂停。图调度器会将其标记为PAUSED。- 此时,图整体状态为
PAUSED,调度器会等待外部事件。 - 模拟的审批通过事件被
event_emitter发布。 GraphOrchestrator收到事件,识别出这是Approve Request节点正在等待的事件,于是调用其resume方法。Approve Request节点恢复执行,根据事件数据得知审批通过,然后继续执行并完成。- 接着,
End Node节点因为依赖满足而执行,最终图完成。 - 随后,演示了在节点暂停时进行持久化,然后从持久化状态恢复,并再次通过外部事件驱动其完成。
高级考量
1. 超时与重试
如果一个节点无限期地等待一个外部事件,那可能会导致死锁或资源浪费。因此,必须引入超时机制。
- 设计: 在
NodeResult中增加wait_timeout_seconds字段。调度器在将节点标记为PAUSED时,启动一个定时器。如果定时器触发而事件未到,则将节点状态更新为FAILED或TIMED_OUT。 - 重试: 对于因超时或临时错误而失败的节点,可以配置重试策略(例如指数退避)。这需要在
GraphOrchestrator中添加重试计数和逻辑。
2. 错误处理与补偿
复杂的业务流程需要健壮的错误处理。
- 错误传播: 一个节点的失败应能正确地传播到其依赖的后续节点,并可能导致整个子图或主图的失败。
- 补偿机制: 如果一个流程执行到一半失败,可能需要回滚之前已完成的操作。这可以通过定义补偿节点(Compensation Node)来实现,当主流程失败时,调度器触发对应的补偿流程。
- 人工干预: 对于无法自动解决的错误,应提供接口允许人工介入,修改状态或手动触发恢复。
3. 分布式环境下的挑战
在微服务或分布式系统中实现 ‘Wait-for-Event’ 模式,复杂度会显著增加:
- 共享状态:
GraphOrchestrator的状态(节点状态、依赖关系、输出数据)需要是分布式且可持久化的。通常会使用共享数据库(如PostgreSQL, MongoDB)或专门的分布式状态存储。 - 事件交付保证:
EventEmitter需要升级为分布式消息队列(如 Kafka, RabbitMQ, AWS SQS),提供至少一次(at-least-once)或恰好一次(exactly-once)的事件交付语义,以确保事件不会丢失或重复处理。 - 幂等性: 节点的操作必须是幂等的。即使事件重复到达,节点也能安全地处理,不会产生副作用。这通常通过在事件中包含唯一ID和在节点内部记录已处理的事件ID来实现。
- 并发控制: 多个调度器实例可能同时尝试操作同一个图或节点。需要分布式锁来避免竞态条件。
- 高可用性与容错: 调度器本身也可能失败。需要设计冗余和故障转移机制。
- 状态机的外部化: 在更复杂的场景下,可以将整个图的状态机建模并外部化到专门的工作流引擎(如 Apache Airflow, Temporal, AWS Step Functions),它们天然支持持久化、暂停、恢复和分布式执行。
4. 幂等性再探
在 Wait-for-Event 模式中,幂等性尤其重要。外部系统可能会因为网络抖动或其他原因重发事件。
- 事件去重: 外部事件应携带一个唯一的事务ID或消息ID。事件总线或节点在处理前检查此ID,如果已处理过,则忽略。
- 操作的幂等性: 节点内部的操作设计为多次执行效果相同。例如,更新数据库记录时使用
UPSERT操作,而不是简单的INSERT。
总结与展望
‘Wait-for-Event’ 模式是构建响应式、容错、可扩展异步系统的基石。它赋予了异步图处理复杂、长时间运行、需要外部交互的业务流程的能力,而无需牺牲系统的并发性和吞吐量。通过精心设计的状态化节点、强大的事件总线和智能的图调度器,我们可以构建出能够优雅地暂停、等待并恢复执行的复杂工作流。
展望未来,随着微服务、无服务器架构和事件驱动范式的普及,理解并掌握 ‘Wait-for-Event’ 模式将变得更加关键。它不仅是实现人机协作、跨服务协调的有效手段,更是构建弹性分布式系统的核心智慧。深入实践此模式,将极大地提升我们驾驭复杂系统设计的能力。