各位同仁,下午好!
今天,我们将深入探讨一个在复杂系统设计中极具价值的模式——动态节点分支(Dynamic Node Branching)。我们将特别关注如何利用 Pydantic 这一强大的数据验证和设置管理库,来构建灵活、可配置且易于维护的执行路径选择逻辑。作为一名编程专家,我将以讲座的形式,结合大量的代码示例和严谨的逻辑,为大家揭示这一模式的奥秘。
I. 序言:动态节点分支的魅力
在软件工程中,我们经常面临需要根据运行时数据或外部配置来决定程序执行路径的场景。最常见的做法是使用一系列 if/else if/else 语句,或者 switch/case 结构。然而,当这些决策逻辑变得复杂、分支条件增多、或者需要频繁修改时,传统的硬编码方式就会暴露出其弊端:
- 可维护性差: 业务逻辑与控制流紧密耦合,修改一个条件可能需要修改多处代码。
- 扩展性受限: 增加新的分支或条件,往往需要修改现有代码,违反开放/封闭原则。
- 可读性下降: 冗长的
if/else链条使得代码难以理解和追踪。 - 难以配置: 决策逻辑嵌入在代码中,无法通过外部配置文件动态调整。
动态节点分支模式应运而生,旨在解决这些痛点。它将程序的执行流抽象为一系列“节点”和连接这些节点的“分支”。每个节点代表一个操作或一个决策点,而分支则定义了从一个节点到另一个节点的转换规则,这些规则通常由运行时评估的条件驱动。这里的“动态”体现在:
- 动态配置: 节点的类型、属性以及分支的条件可以从外部配置(如 JSON, YAML, 数据库)中加载,而不是硬编码。
- 动态评估: 分支的条件在运行时根据当前上下文数据进行评估,从而决定下一步的执行路径。
- 动态演进: 业务规则和流程的变化可以通过修改配置而非代码来实现,极大地提高了系统的灵活性和响应速度。
这种模式在工作流引擎、业务规则管理系统、智能决策系统、AI Agent 路径规划以及复杂请求路由等领域有着广泛的应用。
II. Pydantic 核心特性回顾:为动态配置奠基
Pydantic 是一个基于 Python 类型提示的数据验证和设置管理库。它通过将数据模型化为 Python 类,并利用类型提示来自动进行数据验证和解析。这使得 Pydantic 成为了构建动态节点分支配置的理想工具。我们将重点回顾几个对其至关重要的特性:
- 数据验证与解析: Pydantic 模型能够自动将原始数据(如 JSON 字典)解析并验证为具有强类型定义的 Python 对象。这确保了我们加载的配置数据的结构和类型是正确的。
- 类型提示: Pydantic 充分利用 Python 的类型提示系统,使代码更具可读性和可维护性。
- 模型继承与多态: 这是实现动态节点分支的关键。Pydantic 允许我们定义基类,然后通过子类继承来创建不同类型的节点或条件。配合
Field(discriminator='type'),Pydantic 能够在运行时根据配置中的一个特定字段(通常是type)来自动实例化正确的子类。
让我们通过一个简单的例子来回顾 discriminator 的用法:
from pydantic import BaseModel, Field
from typing import Union
# 定义一个基类 Shape
class Shape(BaseModel):
id: str
name: str
# 定义两个子类 Circle 和 Rectangle
class Circle(Shape):
type: str = "circle" # 判别字段
radius: float
class Rectangle(Shape):
type: str = "rectangle" # 判别字段
width: float
height: float
# 定义一个包含多种 Shape 的容器
class Drawing(BaseModel):
shapes: list[Union[Circle, Rectangle]] = Field(discriminator='type')
# 示例数据
drawing_data = {
"shapes": [
{"id": "c1", "name": "Red Circle", "type": "circle", "radius": 10.0},
{"id": "r1", "name": "Blue Rect", "type": "rectangle", "width": 5.0, "height": 8.0}
]
}
# 解析数据
drawing = Drawing.model_validate(drawing_data)
# 验证解析结果
for shape in drawing.shapes:
print(f"Shape ID: {shape.id}, Name: {shape.name}")
if isinstance(shape, Circle):
print(f" Type: Circle, Radius: {shape.radius}")
elif isinstance(shape, Rectangle):
print(f" Type: Rectangle, Width: {shape.width}, Height: {shape.height}")
# 尝试错误数据
try:
Drawing.model_validate({"shapes": [{"id": "s1", "name": "Unknown", "type": "triangle", "side": 3}]})
except Exception as e:
print(f"nError with invalid type: {e}")
在这个例子中,Drawing 模型通过 Field(discriminator='type') 告诉 Pydantic,在解析 shapes 列表时,它应该根据每个字典中的 type 字段的值来决定实例化 Circle 还是 Rectangle。这正是我们构建动态节点和条件的核心机制。
III. 构建动态节点:基础模型
首先,我们需要定义构成工作流的基本元素——节点。每个节点都有一个唯一的标识符,并且可能包含一些描述信息。节点可以有不同的类型,例如执行某个操作的节点、根据条件进行决策的节点,或者表示流程结束的节点。
我们将使用 Pydantic 的多态特性来定义这些不同类型的节点。
from pydantic import BaseModel, Field, ValidationError
from typing import Dict, Any, Optional, List, Union, Type
import json
# --- 1. 定义执行上下文 ---
# 执行上下文用于在节点之间传递数据和状态
class ExecutionContext(BaseModel):
"""
执行上下文,存储流程运行时的所有相关数据。
"""
data: Dict[str, Any] = Field(default_factory=dict)
current_node_id: Optional[str] = None
status: str = "RUNNING" # RUNNING, COMPLETED, FAILED
error_message: Optional[str] = None
def update_data(self, key: str, value: Any):
"""更新上下文数据"""
self.data[key] = value
def get_data(self, key: str, default: Any = None) -> Any:
"""获取上下文数据"""
return self.data.get(key, default)
def __str__(self):
return f"Context(current={self.current_node_id}, status={self.status}, data={self.data})"
# --- 2. 定义节点基类 ---
class NodeBase(BaseModel):
"""
所有节点的基类。
包含节点ID和名称。
"""
id: str = Field(description="节点的唯一标识符")
name: str = Field(description="节点的名称")
description: Optional[str] = Field(None, description="节点的描述")
type: str = Field(..., description="节点的类型,用于Pydantic的判别器")
def execute(self, context: ExecutionContext) -> Optional[str]:
"""
执行节点逻辑。
子类必须实现此方法。
返回下一个节点的ID,或None表示流程结束。
"""
raise NotImplementedError("子类必须实现 execute 方法")
# --- 3. 定义具体节点类型 ---
# 3.1. 动作节点 (ActionNode)
class ActionNode(NodeBase):
"""
执行特定操作的节点。
例如:发送邮件、调用API、数据处理等。
"""
type: str = "action"
action_type: str = Field(description="具体动作的类型,例如 'log', 'api_call', 'data_transform'")
action_params: Dict[str, Any] = Field(default_factory=dict, description="动作的参数")
def execute(self, context: ExecutionContext) -> Optional[str]:
print(f"Executing ActionNode '{self.name}' (ID: {self.id}) with action '{self.action_type}' and params: {self.action_params}")
# 实际的动作逻辑会根据 action_type 和 action_params 实现
if self.action_type == "log":
message = self.action_params.get("message", "No message provided.")
print(f"Log Message: {message}. Current context data: {context.data}")
context.update_data("last_action_result", f"Logged: {message}")
elif self.action_type == "increment_counter":
counter_key = self.action_params.get("key", "counter")
increment_by = self.action_params.get("value", 1)
current_value = context.get_data(counter_key, 0)
context.update_data(counter_key, current_value + increment_by)
print(f"Incremented {counter_key} to {context.get_data(counter_key)}")
context.update_data("last_action_result", f"Incremented {counter_key}")
elif self.action_type == "set_value":
key = self.action_params.get("key")
value = self.action_params.get("value")
if key:
context.update_data(key, value)
print(f"Set context data '{key}' to '{value}'")
context.update_data("last_action_result", f"Set {key} to {value}")
else:
print("Warning: 'key' not provided for 'set_value' action.")
context.update_data("last_action_result", "Failed to set value: no key")
else:
print(f"Unknown action type: {self.action_type}. Doing nothing.")
context.update_data("last_action_result", f"Unknown action: {self.action_type}")
# 动作节点通常直接指向下一个节点,或者由外部定义其出边
# 在这里,我们假设 ActionNode 不直接决定下一个节点,而是由 WorkflowEngine 决定
return None # 由分支决定下一个节点
# 3.2. 结束节点 (EndNode)
class EndNode(NodeBase):
"""
表示工作流的结束。
"""
type: str = "end"
result_message: Optional[str] = Field(None, description="流程结束时的消息")
def execute(self, context: ExecutionContext) -> Optional[str]:
print(f"Executing EndNode '{self.name}' (ID: {self.id}). Result: {self.result_message}")
context.status = "COMPLETED"
if self.result_message:
context.update_data("workflow_result_message", self.result_message)
return None # 结束节点没有下一个节点
# 3.3. 决策节点 (DecisionNode) - 后面会详细定义其分支
# 这里先定义一个占位符,它将依赖于分支和条件模型
class DecisionNode(NodeBase):
"""
根据条件评估决定下一个执行路径的节点。
实际的分支逻辑将通过其关联的 Branch 定义。
"""
type: str = "decision"
def execute(self, context: ExecutionContext) -> Optional[str]:
# 决策节点的执行逻辑主要是评估条件并选择分支
# 这部分逻辑将由 WorkflowEngine 来处理,因为它需要知道所有分支
print(f"Executing DecisionNode '{self.name}' (ID: {self.id}). Evaluating branches...")
return None # 由 WorkflowEngine 负责根据条件选择下一个节点
代码解析:
ExecutionContext: 这是一个非常重要的模型,它承载了整个工作流执行过程中的状态和数据。节点之间通过它来共享信息。NodeBase: 定义了所有节点的通用属性 (id,name,description,type) 和一个抽象的execute方法。type字段是 Pydanticdiscriminator的关键。ActionNode: 继承自NodeBase,type固定为"action"。它包含action_type和action_params来描述具体的动作。其execute方法模拟了不同的操作,并更新ExecutionContext。EndNode: 同样继承自NodeBase,type为"end"。它标记了流程的终点,并更新上下文状态。DecisionNode: 继承自NodeBase,type为"decision"。它本身不执行复杂逻辑,而是依赖于外部定义的分支和条件来选择路径。
IV. 定义动态分支与条件
有了节点,我们还需要定义如何从一个节点“分支”到另一个节点。分支的决定通常基于某些条件,这些条件会在运行时评估上下文数据。
我们将使用 Pydantic 的多态性来定义不同类型的条件。
# --- 4. 定义条件基类 ---
class ConditionBase(BaseModel):
"""
所有条件的基类。
"""
type: str = Field(..., description="条件的类型,用于Pydantic的判别器")
field: Optional[str] = Field(None, description="要检查的上下文数据字段名")
def evaluate(self, context: ExecutionContext) -> bool:
"""
评估条件是否满足。
子类必须实现此方法。
"""
raise NotImplementedError("子类必须实现 evaluate 方法")
# --- 5. 定义具体条件类型 ---
# 5.1. 等于条件 (EqualsCondition)
class EqualsCondition(ConditionBase):
"""
检查上下文数据字段是否等于指定值。
"""
type: str = "equals"
value: Any = Field(description="要比较的值")
def evaluate(self, context: ExecutionContext) -> bool:
if self.field is None:
return False # 如果没有指定字段,则无法评估
actual_value = context.get_data(self.field)
result = actual_value == self.value
# print(f" Condition '{self.field} == {self.value}' evaluated to {result} (actual: {actual_value})")
return result
# 5.2. 大于条件 (GreaterThanCondition)
class GreaterThanCondition(ConditionBase):
"""
检查上下文数据字段是否大于指定值。
"""
type: str = "greater_than"
value: Union[int, float] = Field(description="要比较的值")
def evaluate(self, context: ExecutionContext) -> bool:
if self.field is None:
return False
actual_value = context.get_data(self.field)
if isinstance(actual_value, (int, float)):
result = actual_value > self.value
# print(f" Condition '{self.field} > {self.value}' evaluated to {result} (actual: {actual_value})")
return result
return False
# 5.3. 包含条件 (ContainsCondition)
class ContainsCondition(ConditionBase):
"""
检查上下文数据字段(列表或字符串)是否包含指定值/子串。
"""
type: str = "contains"
value: Any = Field(description="要检查的值或子串")
def evaluate(self, context: ExecutionContext) -> bool:
if self.field is None:
return False
actual_value = context.get_data(self.field)
if isinstance(actual_value, (list, str)):
result = self.value in actual_value
# print(f" Condition '{self.field} contains {self.value}' evaluated to {result} (actual: {actual_value})")
return result
return False
# 5.4. 组合条件 (AndCondition, OrCondition)
class AndCondition(ConditionBase):
"""
所有子条件都必须为真。
"""
type: str = "and"
conditions: List[
Union[EqualsCondition, GreaterThanCondition, ContainsCondition, 'AndCondition', 'OrCondition']
] = Field(description="子条件列表")
field: Optional[str] = None # 组合条件本身不直接检查字段
def evaluate(self, context: ExecutionContext) -> bool:
# print(f" Evaluating AndCondition for {len(self.conditions)} sub-conditions.")
for cond in self.conditions:
if not cond.evaluate(context):
return False
return True
class OrCondition(ConditionBase):
"""
任一子条件为真即可。
"""
type: str = "or"
conditions: List[
Union[EqualsCondition, GreaterThanCondition, ContainsCondition, AndCondition, 'OrCondition']
] = Field(description="子条件列表")
field: Optional[str] = None # 组合条件本身不直接检查字段
def evaluate(self, context: ExecutionContext) -> bool:
# print(f" Evaluating OrCondition for {len(self.conditions)} sub-conditions.")
for cond in self.conditions:
if cond.evaluate(context):
return True
return False
# --- 6. 定义分支 (Branch) ---
class Branch(BaseModel):
"""
定义从一个节点到另一个节点的转换规则。
"""
target_node_id: str = Field(description="目标节点的ID")
condition: Optional[
Union[EqualsCondition, GreaterThanCondition, ContainsCondition, AndCondition, OrCondition]
] = Field(None, discriminator='type', description="评估是否采取此分支的条件")
priority: int = Field(0, description="分支优先级,数字越大优先级越高")
def should_take(self, context: ExecutionContext) -> bool:
"""
判断是否应该采取此分支。
如果 condition 为 None,则表示无条件分支(默认或兜底)。
"""
if self.condition is None:
return True
return self.condition.evaluate(context)
# Pydantic 2 的 forward reference 机制
# Pydantic 1.x 使用 update_forward_refs()
# Pydantic 2.x 自动处理,但为了清晰性,可以显式声明
# AndCondition.model_rebuild()
# OrCondition.model_rebuild()
代码解析:
ConditionBase: 所有条件的基类,定义了type字段和抽象的evaluate方法。- 具体条件类 (
EqualsCondition,GreaterThanCondition,ContainsCondition): 继承自ConditionBase,实现了各自的evaluate方法,根据上下文数据和自身属性进行逻辑判断。 - 组合条件 (
AndCondition,OrCondition): 这些条件可以嵌套,允许构建非常复杂的逻辑。它们同样继承自ConditionBase,但其conditions字段是一个列表,包含了其他ConditionBase的子类型。 Branch: 定义了一个从当前节点到target_node_id的潜在路径。它包含一个condition字段,只有当这个条件评估为真时,才考虑这条分支。priority字段允许在多个条件都满足时,选择优先级最高的分支。condition字段同样使用了discriminator='type'来实现多态。
V. 核心:动态节点分支的实现
现在我们有了节点和分支的定义,接下来需要一个机制来组织这些元素,并驱动整个工作流的执行。这就是 Workflow 模型和 WorkflowEngine 的职责。
# --- 7. 定义工作流 (Workflow) ---
class Workflow(BaseModel):
"""
定义一个完整的工作流,包含所有节点和它们之间的分支。
"""
id: str = Field(description="工作流的唯一标识符")
name: str = Field(description="工作流的名称")
start_node_id: str = Field(description="工作流的起始节点ID")
nodes: List[
Union[ActionNode, EndNode, DecisionNode]
] = Field(discriminator='type', description="工作流中包含的所有节点")
branches: List[Branch] = Field(default_factory=list, description="工作流中定义的所有分支")
def get_node_by_id(self, node_id: str) -> Optional[NodeBase]:
"""根据ID获取节点对象"""
for node in self.nodes:
if node.id == node_id:
return node
return None
def get_branches_from_node(self, source_node_id: str) -> List[Branch]:
"""获取从指定节点发出的所有分支"""
# 在实际应用中,branch 应该与 source_node_id 关联
# 这里为了简化,我们假设 branches 列表中的所有分支都是从 DecisionNode 发出的,
# 或者我们定义一个更明确的 Branch 模型,包含 source_node_id
# 为了更严谨,我们修改 Branch 模型,使其包含 source_node_id
# 但为了与现有示例兼容,我们暂时通过一个映射来模拟
# 实际的 Workflow 定义会是这样:
# nodes: Dict[str, Union[ActionNode, EndNode, DecisionNode]]
# edges: Dict[str, List[Branch]] # key是source_node_id
# 为了简化,我们暂时让 WorkflowEngine 来处理查找分支的逻辑,
# 假设分支是全局的,并且其 condition 决定了其适用性
# 或者更直接的方式:每个 DecisionNode 内部维护自己的分支列表
# 考虑到 Pydantic 的多态性,让 DecisionNode 拥有自己的分支会更清晰
# 让我们修改 DecisionNode
return [] # 这里不再需要,因为 DecisionNode 会管理自己的分支
# --- 7.1. 重新定义 DecisionNode 以包含分支 ---
class DecisionNode(NodeBase):
"""
根据条件评估决定下一个执行路径的节点。
"""
type: str = "decision"
# 将分支直接定义在 DecisionNode 内部,使其自包含
branches: List[Branch] = Field(default_factory=list, description="从该决策节点发出的所有分支")
def execute(self, context: ExecutionContext) -> Optional[str]:
print(f"Executing DecisionNode '{self.name}' (ID: {self.id}). Evaluating branches...")
# 决策节点的执行逻辑主要是评估条件并选择分支
# 这里,execute 方法不再返回下一个节点ID,而是由 WorkflowEngine 负责选择
return None # 由 WorkflowEngine 负责根据条件选择下一个节点
# 重新构建 Workflow 模型以使用新的 DecisionNode 定义
class Workflow(BaseModel):
id: str
name: str
start_node_id: str
nodes: List[
Union[ActionNode, EndNode, DecisionNode]
] = Field(discriminator='type', description="工作流中包含的所有节点")
def get_node_by_id(self, node_id: str) -> Optional[NodeBase]:
for node in self.nodes:
if node.id == node_id:
return node
return None
# --- 8. 定义工作流引擎 ---
class WorkflowEngine:
"""
工作流引擎,负责加载工作流定义并驱动其执行。
"""
def __init__(self, workflow_definition: Workflow):
self.workflow = workflow_definition
self.node_map: Dict[str, NodeBase] = {node.id: node for node in self.workflow.nodes}
def _get_next_node_id(self, current_node: NodeBase, context: ExecutionContext) -> Optional[str]:
"""
根据当前节点类型和上下文,决定下一个节点的ID。
"""
if isinstance(current_node, EndNode):
return None # 结束节点没有下一个
elif isinstance(current_node, ActionNode):
# 动作节点通常是简单的顺序流,或者由Workflow定义出边
# 这里我们假设 ActionNode 后面只有一个默认的下一跳,
# 或者由 Workflow 定义的全局分支来决定,但为了简化,我们暂时让 ActionNode 总是继续
# 实际场景中,ActionNode 也可以有自己的 output_ports 或 default_next_node_id
# 这里我们通过 WorkflowEngine 查找与当前 ActionNode ID 匹配的默认分支
# 或者,更常见的是在 ActionNode 的配置中直接指定 next_node_id
# 让我们修改 ActionNode 以支持默认的 next_node_id
# ActionNode 暂不处理分支,直接由全局配置决定
pass # 留空,由外部的 DecisionNode 或 WorkflowEngine 决定
elif isinstance(current_node, DecisionNode):
# 决策节点需要评估其所有分支
eligible_branches = []
for branch in current_node.branches:
if branch.should_take(context):
eligible_branches.append(branch)
if not eligible_branches:
print(f" No eligible branch found from DecisionNode '{current_node.id}'. Workflow might stall or fail.")
context.status = "FAILED"
context.error_message = f"No eligible branch from decision node '{current_node.id}'"
return None
# 按照优先级排序,选择优先级最高的(如果多个条件满足)
eligible_branches.sort(key=lambda b: b.priority, reverse=True)
chosen_branch = eligible_branches[0]
print(f" DecisionNode '{current_node.id}' chose branch to '{chosen_branch.target_node_id}' (Priority: {chosen_branch.priority})")
return chosen_branch.target_node_id
# 如果当前节点既不是 EndNode 也不是 DecisionNode,
# 且没有明确的下一个节点,则流程结束或出错。
# 这里需要一个机制来定义 ActionNode 的默认出边。
# 为了简化,我们假设 ActionNode 后面的下一个节点ID会在工作流定义中隐式或显式地指定
# 在本例中,我们将让 WorkflowEngine 直接寻找与 ActionNode 相关的分支,
# 或者在 ActionNode 中增加一个 next_node_id 字段。
# 让我们把 ActionNode 也改为可以有自己的 branches (简单的next_node_id)
# 为避免复杂,我们让 ActionNode 默认总是继续到配置中的下一个节点 (如果定义了)
# 更简洁的方法是:WorkflowEngine 有一个全局的 node_transitions 映射
# 为了保持节点纯粹性,我们让 WorkflowEngine 负责根据全局分支列表寻找下一个节点。
# 最终决定:DecisionNode 内部管理分支,ActionNode 则依赖于全局的隐式流程,
# 或者在 WorkflowEngine 层面维护一个 ActionNode 的默认下一跳逻辑。
# 为了本例的严谨性,ActionNode 在 execute 结束后,我们让 WorkflowEngine 来查找与它关联的默认分支。
# 假设所有非 DecisionNode 的节点,如果它们后面没有明确的 DecisionNode,则有一个默认的下一跳。
# 暂时,我们假设 ActionNode 执行完后,如果没有明确的下一跳,则流程结束,
# 或者由一个默认的 "next_node_id" 属性来指引。
# 让我们给 ActionNode 添加一个 optional 的 default_next_node_id
return None # 如果没有明确的下一个节点,流程结束
def run(self, initial_context_data: Optional[Dict[str, Any]] = None) -> ExecutionContext:
"""
运行工作流。
"""
context = ExecutionContext(data=initial_context_data if initial_context_data else {})
current_node_id = self.workflow.start_node_id
if current_node_id not in self.node_map:
context.status = "FAILED"
context.error_message = f"Start node '{current_node_id}' not found."
print(context.error_message)
return context
max_iterations = 100 # 防止无限循环
iteration_count = 0
while current_node_id is not None and context.status == "RUNNING" and iteration_count < max_iterations:
iteration_count += 1
context.current_node_id = current_node_id
current_node = self.node_map[current_node_id]
print(f"n--- Current Node: {current_node.name} (ID: {current_node.id}) ---")
print(f"Context before execution: {context.data}")
# 执行当前节点逻辑
try:
current_node.execute(context)
except Exception as e:
context.status = "FAILED"
context.error_message = f"Error executing node '{current_node.id}': {e}"
print(context.error_message)
break # 退出循环
# 决定下一个节点
next_node_id = None
if isinstance(current_node, DecisionNode):
# DecisionNode 逻辑已在 _get_next_node_id 中处理
next_node_id = self._get_next_node_id(current_node, context)
elif isinstance(current_node, ActionNode):
# 对于 ActionNode,我们需要一个机制来指定下一个节点
# 最简单的方式是让 Workflow 定义一个全局的顺序或默认分支
# 暂时,我们假设 ActionNode 执行完后,如果没有明确的下一跳,流程结束。
# 或者,我们可以设计成 ActionNode 也有一个 default_next_node_id 属性。
# 为了保持统一性,我们让 WorkflowEngine 查找 ActionNode 的下一跳。
# 假设所有非 DecisionNode 的节点,如果有且只有一个出边(默认分支),
# 或者由 WorkflowEngine 维护一个 ActionNode 的显式下一个节点映射。
# 为了简化本例,我们假设 ActionNode 后面总是跟随一个唯一的下一跳
# 这个下一跳不是通过分支条件判断,而是通过一个简单的映射或配置。
# 让我们为 Workflow 模型增加一个 `node_transitions` 字典
# 为了不重新修改 Workflow 定义,我们假设 ActionNode 的下一个节点是其在 nodes 列表中的下一个,
# 或者是一个在 WorkflowEngine 中硬编码的简单映射。
# 更优雅的方式是让 Workflow 定义一个 transitions 字典,
# 例如 { "node_id": "next_node_id" } 或 { "node_id": [branch1, branch2] }
# 为了快速实现,我们暂时假设 ActionNode 总是指向列表中下一个节点,直到遇到 DecisionNode 或 EndNode
# 更好的方式:让 Workflow 模型在加载时构建一个更完整的图结构
# 但为了展示 Pydantic 的多态性,我们保持 Workflow 结构简单
# 最终决定:ActionNode 和 EndNode 自身不决定下一跳。
# WorkflowEngine 负责查找下一个节点。
# 对于 ActionNode,我们假设其总是有一个默认的下一跳,
# 但这里我们需要一个全局的映射来定义这个。
# 考虑到 DecisionNode 已经自包含分支,那么 ActionNode 的下一跳
# 应该由 WorkflowEngine 从某种全局映射中获取。
# 让我们在 WorkflowEngine 内部维护一个简单的默认下一跳逻辑,
# 仅用于 ActionNode,直到遇到 EndNode 或 DecisionNode。
# 这个实现会比较 hacky,理想情况是 Workflow 定义一个更完整的图。
# 为了严谨性,我们必须修改 Workflow 模型,让它包含边 (edges) 的定义。
# 让我们回到 Workflow 模型,增加一个 `edges` 属性。
# 重构 `Workflow` 模型,使其包含明确的边定义
# 每一条边都从一个源节点指向一个目标节点,并可以包含条件。
# 但考虑到我们已经将分支集成到 DecisionNode 中,
# 那么对于 ActionNode,我们需要一个简单的 `next_node_id` 属性。
# 让我们修改 ActionNode。
# --- 3.1. 重新定义 ActionNode ---
class ActionNode(NodeBase):
type: str = "action"
action_type: str
action_params: Dict[str, Any] = Field(default_factory=dict)
# 动作节点可以有一个默认的下一跳
next_node_id: Optional[str] = None # 如果为None,表示流程结束或需要外部定义
def execute(self, context: ExecutionContext) -> Optional[str]:
# ... (同上) ...
print(f" ActionNode '{self.name}' finished. Next node hint: {self.next_node_id}")
return self.next_node_id # ActionNode 直接提供下一跳ID
# WorkFlow 模型不需要大改,因为它已经通过 discriminator 处理了节点类型
next_node_id = current_node.next_node_id if isinstance(current_node, ActionNode) else None # 从ActionNode获取下一跳
elif isinstance(current_node, EndNode):
next_node_id = None # EndNode 终结流程
current_node_id = next_node_id # 更新当前节点
if current_node_id is not None and current_node_id not in self.node_map:
context.status = "FAILED"
context.error_message = f"Next node '{current_node_id}' not found."
print(context.error_message)
break
if iteration_count >= max_iterations:
context.status = "FAILED"
context.error_message = "Workflow exceeded max iterations, possibly an infinite loop."
print(context.error_message)
if current_node_id is None and context.status == "RUNNING":
context.status = "COMPLETED" # 正常结束
print(f"n--- Workflow Finished ---")
print(f"Final Status: {context.status}")
print(f"Final Context Data: {context.data}")
if context.error_message:
print(f"Error: {context.error_message}")
return context
# 重新构建 Workflow 模型 (不需要修改,因为它通过 Union 和 discriminator 已经可以处理新的 ActionNode)
# 确保所有前向引用都处理完毕
ActionNode.model_rebuild()
DecisionNode.model_rebuild()
AndCondition.model_rebuild()
OrCondition.model_rebuild()
代码解析:
ActionNode增加了next_node_id: 这是为了让ActionNode也能指引流程的走向,避免WorkflowEngine需要进行复杂的全局查找。如果next_node_id为None,则表示该ActionNode是一个流程的死胡同,或者需要DecisionNode来接管。Workflow: 定义了整个工作流的结构,包括id,name,start_node_id以及一个nodes列表。nodes列表使用了 Pydantic 的discriminator特性,能够自动解析不同类型的节点。WorkflowEngine: 这是整个系统的核心执行器。- 它接收一个
Workflow对象作为定义。 _get_next_node_id方法是关键,它根据当前节点的类型和上下文数据来决定下一个节点的 ID。- 对于
EndNode,没有下一个节点。 - 对于
ActionNode,直接使用其next_node_id属性。 - 对于
DecisionNode,它遍历该节点内部定义的所有Branch,评估其条件。如果多个分支满足,则选择优先级最高的那个。
- 对于
run方法是主循环,它从start_node_id开始,不断执行当前节点的execute方法,然后通过_get_next_node_id确定下一个节点,直到流程结束、失败或达到最大迭代次数。ExecutionContext在整个流程中传递,保存和更新状态。
- 它接收一个
VI. 案例分析:一个简单的审批流程
现在,让我们通过一个实际的审批流程来演示动态节点分支的威力。
场景描述:
一个简单的用户申请审批流程:
- 用户提交申请。
- 系统检查申请金额:
- 如果金额 <= 1000,自动批准。
- 如果金额 > 1000 且 <= 5000,需要部门经理审批。
- 如果金额 > 5000,需要高级经理审批。
- 如果需要审批,审批完成后流程结束。
- 最终记录审批结果。
工作流定义 (JSON 配置):
{
"id": "approval_workflow_v1",
"name": "动态审批流程",
"start_node_id": "start_application",
"nodes": [
{
"id": "start_application",
"name": "开始申请",
"type": "action",
"action_type": "log",
"action_params": {
"message": "申请已提交,准备评估。"
},
"next_node_id": "evaluate_amount"
},
{
"id": "evaluate_amount",
"name": "评估申请金额",
"type": "decision",
"description": "根据申请金额决定审批路径",
"branches": [
{
"target_node_id": "auto_approve",
"condition": {
"type": "equals",
"field": "amount",
"value": 0
},
"priority": 100
},
{
"target_node_id": "auto_approve",
"condition": {
"type": "greater_than",
"field": "amount",
"value": -1
},
"priority": 90
},
{
"target_node_id": "auto_approve",
"condition": {
"type": "and",
"conditions": [
{ "type": "greater_than", "field": "amount", "value": 0 },
{ "type": "greater_than", "field": "amount", "value": 1000 }
]
},
"priority": 80
},
{
"target_node_id": "auto_approve",
"condition": {
"type": "greater_than",
"field": "amount",
"value": 1000
},
"priority": 70
},
{
"target_node_id": "auto_approve",
"condition": {
"type": "less_than_or_equals",
"field": "amount",
"value": 1000
},
"priority": 60
},
{
"target_node_id": "auto_approve",
"condition": {
"type": "equals",
"field": "amount",
"value": 1000
},
"priority": 50
},
{
"target_node_id": "manager_approve",
"condition": {
"type": "and",
"conditions": [
{ "type": "greater_than", "field": "amount", "value": 1000 },
{ "type": "less_than_or_equals", "field": "amount", "value": 5000 }
]
},
"priority": 40
},
{
"target_node_id": "senior_manager_approve",
"condition": {
"type": "greater_than",
"field": "amount",
"value": 5000
},
"priority": 30
},
{
"target_node_id": "reject_application",
"condition": null,
"priority": 0
}
]
},
{
"id": "auto_approve",
"name": "自动批准",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED"
},
"next_node_id": "record_result"
},
{
"id": "manager_approve",
"name": "经理审批",
"type": "action",
"action_type": "log",
"action_params": {
"message": "需要经理审批,等待审批结果..."
},
"next_node_id": "simulate_manager_decision"
},
{
"id": "simulate_manager_decision",
"name": "模拟经理决策",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED"
},
"next_node_id": "record_result"
},
{
"id": "senior_manager_approve",
"name": "高级经理审批",
"type": "action",
"action_type": "log",
"action_params": {
"message": "需要高级经理审批,等待审批结果..."
},
"next_node_id": "simulate_senior_manager_decision"
},
{
"id": "simulate_senior_manager_decision",
"name": "模拟高级经理决策",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_BY_SENIOR"
},
"next_node_id": "record_result"
},
{
"id": "reject_application",
"name": "拒绝申请",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "REJECTED"
},
"next_node_id": "record_result"
},
{
"id": "record_result",
"name": "记录审批结果",
"type": "end",
"result_message": "审批流程已完成"
}
]
}
注意: 上述 JSON 中的 evaluate_amount 节点的 branches 定义有些冗余和错误,例如 greater_than 0 和 greater_than 1000 的组合条件。这只是为了演示条件组合,实际中会更精简。同时,less_than_or_equals 这种条件类型需要我们添加,但为了精简代码,我们暂时不添加,而是用 greater_than 的反向逻辑或组合来实现。为了确保示例能运行,我们需要增加 LessThanOrEqualsCondition。
更新条件类型:
# --- 5.5. 小于等于条件 (LessThanOrEqualsCondition) ---
class LessThanOrEqualsCondition(ConditionBase):
"""
检查上下文数据字段是否小于或等于指定值。
"""
type: str = "less_than_or_equals"
value: Union[int, float] = Field(description="要比较的值")
def evaluate(self, context: ExecutionContext) -> bool:
if self.field is None:
return False
actual_value = context.get_data(self.field)
if isinstance(actual_value, (int, float)):
result = actual_value <= self.value
return result
return False
# 重新定义 Union 类型,包含新的条件
ConditionUnion = Union[
EqualsCondition,
GreaterThanCondition,
ContainsCondition,
LessThanOrEqualsCondition, # 添加新条件
AndCondition,
OrCondition
]
# 更新 Branch 和组合条件中的类型提示
Branch.model_fields['condition'].annotation = Optional[ConditionUnion]
AndCondition.model_fields['conditions'].annotation = List[ConditionUnion]
OrCondition.model_fields['conditions'].annotation = List[ConditionUnion]
# Pydantic 2 的 forward reference 机制
# 确保所有模型在定义后都调用 model_rebuild()
AndCondition.model_rebuild()
OrCondition.model_rebuild()
Branch.model_rebuild() # Branch 可能也需要重构以更新其 condition 字段的类型
重新修正 evaluate_amount 节点的 branches 定义,使其更合理和精简:
{
"id": "approval_workflow_v1",
"name": "动态审批流程",
"start_node_id": "start_application",
"nodes": [
{
"id": "start_application",
"name": "开始申请",
"type": "action",
"action_type": "log",
"action_params": {
"message": "申请已提交,准备评估。"
},
"next_node_id": "evaluate_amount"
},
{
"id": "evaluate_amount",
"name": "评估申请金额",
"type": "decision",
"description": "根据申请金额决定审批路径",
"branches": [
{
"target_node_id": "auto_approve",
"condition": {
"type": "less_than_or_equals",
"field": "amount",
"value": 1000
},
"priority": 100
},
{
"target_node_id": "manager_approve",
"condition": {
"type": "and",
"conditions": [
{ "type": "greater_than", "field": "amount", "value": 1000 },
{ "type": "less_than_or_equals", "field": "amount", "value": 5000 }
]
},
"priority": 90
},
{
"target_node_id": "senior_manager_approve",
"condition": {
"type": "greater_than",
"field": "amount",
"value": 5000
},
"priority": 80
},
{
"target_node_id": "reject_application",
"condition": null,
"priority": 0
}
]
},
{
"id": "auto_approve",
"name": "自动批准",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_AUTO"
},
"next_node_id": "record_result"
},
{
"id": "manager_approve",
"name": "经理审批",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_MANAGER"
},
"next_node_id": "record_result"
},
{
"id": "senior_manager_approve",
"name": "高级经理审批",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_SENIOR"
},
"next_node_id": "record_result"
},
{
"id": "reject_application",
"name": "拒绝申请",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "REJECTED"
},
"next_node_id": "record_result"
},
{
"id": "record_result",
"name": "记录审批结果",
"type": "end",
"result_message": "审批流程已完成"
}
]
}
Python 执行代码:
# 假设上述 Pydantic 模型和 WorkflowEngine 已经定义并可用
# 加载工作流定义
workflow_config_json = """
{
"id": "approval_workflow_v1",
"name": "动态审批流程",
"start_node_id": "start_application",
"nodes": [
{
"id": "start_application",
"name": "开始申请",
"type": "action",
"action_type": "log",
"action_params": {
"message": "申请已提交,准备评估。"
},
"next_node_id": "evaluate_amount"
},
{
"id": "evaluate_amount",
"name": "评估申请金额",
"type": "decision",
"description": "根据申请金额决定审批路径",
"branches": [
{
"target_node_id": "auto_approve",
"condition": {
"type": "less_than_or_equals",
"field": "amount",
"value": 1000
},
"priority": 100
},
{
"target_node_id": "manager_approve",
"condition": {
"type": "and",
"conditions": [
{ "type": "greater_than", "field": "amount", "value": 1000 },
{ "type": "less_than_or_equals", "field": "amount", "value": 5000 }
]
},
"priority": 90
},
{
"target_node_id": "senior_manager_approve",
"condition": {
"type": "greater_than",
"field": "amount",
"value": 5000
},
"priority": 80
},
{
"target_node_id": "reject_application",
"condition": null,
"priority": 0
}
]
},
{
"id": "auto_approve",
"name": "自动批准",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_AUTO"
},
"next_node_id": "record_result"
},
{
"id": "manager_approve",
"name": "经理审批",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_MANAGER"
},
"next_node_id": "record_result"
},
{
"id": "senior_manager_approve",
"name": "高级经理审批",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "APPROVED_SENIOR"
},
"next_node_id": "record_result"
},
{
"id": "reject_application",
"name": "拒绝申请",
"type": "action",
"action_type": "set_value",
"action_params": {
"key": "approval_status",
"value": "REJECTED"
},
"next_node_id": "record_result"
},
{
"id": "record_result",
"name": "记录审批结果",
"type": "end",
"result_message": "审批流程已完成"
}
]
}
"""
try:
workflow_definition = Workflow.model_validate_json(workflow_config_json)
engine = WorkflowEngine(workflow_definition)
print("--- 运行案例 1: 金额 500 (自动批准) ---")
final_context_500 = engine.run(initial_context_data={"amount": 500, "applicant": "Alice"})
print(f"最终审批状态: {final_context_500.get_data('approval_status')}")
print("n--- 运行案例 2: 金额 3000 (经理审批) ---")
final_context_3000 = engine.run(initial_context_data={"amount": 3000, "applicant": "Bob"})
print(f"最终审批状态: {final_context_3000.get_data('approval_status')}")
print("n--- 运行案例 3: 金额 7000 (高级经理审批) ---")
final_context_7000 = engine.run(initial_context_data={"amount": 7000, "applicant": "Charlie"})
print(f"最终审批状态: {final_context_7000.get_data('approval_status')}")
print("n--- 运行案例 4: 金额 -100 (拒绝 - 默认分支) ---")
final_context_negative = engine.run(initial_context_data={"amount": -100, "applicant": "David"})
print(f"最终审批状态: {final_context_negative.get_data('approval_status')}")
except ValidationError as e:
print(f"Pydantic Validation Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
通过这个例子,我们可以清晰地看到:
- 工作流的定义完全是数据驱动的(JSON)。
- 修改审批规则(例如,调整金额阈值或增加新的审批层级),只需修改 JSON 配置,而无需改动 Python 代码。
- Pydantic 确保了加载的配置是有效的,并且能够正确地实例化出不同类型的节点和条件对象。
WorkflowEngine根据运行时上下文 (amount字段) 动态评估条件,从而选择正确的执行路径。
VII. 进阶:复杂场景与策略
动态节点分支模式具有很强的扩展性,可以应对更复杂的场景:
- 循环与重入:
- 通过将某个节点的
target_node_id指向流程中已经访问过的节点,可以实现循环。 - 需要
ExecutionContext记录循环次数或设置跳出条件,防止无限循环。WorkflowEngine中的max_iterations就是一个简单的防范。
- 通过将某个节点的
- 并行执行:
- Pydantic 模型可以定义“并行分支”节点,其
execute方法可以启动多个子流程或任务。 WorkflowEngine需要支持异步执行和结果聚合机制(例如,等待所有并行分支完成后再继续)。Pydantic 负责定义并行任务的结构,执行层面则需要asyncio或线程池/进程池等。
- Pydantic 模型可以定义“并行分支”节点,其
- 错误处理与回退机制:
- 可以在
Branch中增加on_error_target_node_id字段,当当前节点执行失败时,跳转到错误处理节点。 - 引入
RetryNode,允许在特定条件下重试上一个失败的节点。
- 可以在
- 外部系统集成:
ActionNode的action_type可以映射到实际的外部 API 调用、数据库操作、消息队列发送等。- 可以通过一个注册表来管理
action_type到具体 Python 函数的映射。
- 动态图修改:
- 在某些高级场景中,工作流本身的结构(节点和分支)可能需要在运行时动态生成或修改。
- 这可以通过
Workflow模型的编程创建或更新来实现,但会增加复杂性。
- 决策策略:
- 当前
DecisionNode采用“优先级最高”的匹配策略。可以扩展支持“第一个匹配”、“所有匹配(并行)”、“基于权重选择”等多种决策策略。 - 这可以通过在
DecisionNode或WorkflowEngine中添加一个branch_selection_strategy字段来配置。
- 当前
- 权限与安全:
- 在
ExecutionContext中加入用户权限信息,条件可以检查用户是否具有执行某个操作或进入某个分支的权限。 - 节点执行前可以进行权限校验。
- 在
VIII. 最佳实践与注意事项
- 模块化与分层: 将节点、条件、分支等定义分解到不同的模块或文件,保持代码清晰。
WorkflowEngine负责编排,不应包含具体的业务逻辑。 - 配置验证: 充分利用 Pydantic 的验证能力。对于复杂的自定义条件或动作类型,可以编写自定义验证器。
- 日志与可观测性: 在
WorkflowEngine和每个节点的execute方法中加入详细的日志记录,包括当前节点、上下文数据、决策路径等,便于调试和追踪。 - 错误处理: 规划好如何处理节点执行失败、条件评估异常、找不到下一个节点等情况。确保流程能够优雅地终止或进入错误恢复路径。
- 性能考量: 对于高并发或低延迟要求的场景,需要注意条件评估的复杂度、外部系统调用的性能以及
ExecutionContext的序列化/反序列化开销。可以考虑缓存节点查找、优化条件评估算法。 - 版本控制: 工作流定义(JSON/YAML)应该像代码一样进行版本控制,确保可追溯性。
- 用户界面: 考虑为工作流设计一个可视化编辑器,让非技术人员也能定义和修改流程。这需要将 Pydantic 模型映射到 UI 组件。
- 状态持久化: 对于长时间运行的工作流,需要将
ExecutionContext和当前current_node_id持久化到数据库或消息队列中,以便在系统重启或故障后恢复。
通过 Pydantic 的强大模型定义和数据验证能力,我们能够以声明式的方式构建高度灵活和可配置的动态节点分支逻辑。这种模式将业务规则与执行流解耦,极大地提升了系统的可维护性、扩展性和响应变化的能力,是构建复杂、数据驱动型应用的关键技术之一。