Symfony Workflow 组件高级用法:实现并发、分支与合并(Fork/Join)的状态流
大家好,今天我们来深入探讨 Symfony Workflow 组件的高级用法,特别是如何利用它实现并发、分支与合并(Fork/Join)类型的状态流。这类状态流在现实业务场景中非常常见,例如订单处理、文档审批、任务分配等。掌握这些技巧,能让你更好地利用 Workflow 组件解决复杂的问题。
1. Workflow 组件基础回顾
在深入高级用法之前,我们先快速回顾一下 Workflow 组件的核心概念:
- Subject (主题): 指的是状态发生变化的对象,例如
Order,Document,Task等。 - Workflow (工作流): 定义了 subject 的状态流转规则。
- State (状态): subject 可能处于的不同状态,例如
draft,review,approved,rejected等。 - Transition (转换): 状态之间的转移,例如
submit,approve,reject等。 - Marking (标记): 用于存储 subject 当前所处的状态信息。
- Guard (守卫): 在 transition 发生之前执行的条件检查,决定是否允许转换。
- Place (位置/节点): 状态的具象化,可以理解为图中的一个节点。
2. 并发、分支与合并(Fork/Join)的概念
在传统的线性状态流中,subject 按照预定义的路径一步一步地从一个状态转移到另一个状态。但在某些情况下,我们需要更复杂的状态流:
- 并发 (Concurrency/Fork): subject 同时进入多个状态,表示多个任务并行进行。例如,一个订单被提交后,需要同时进行库存检查和支付处理。
- 分支 (Branching): 根据不同的条件,subject 进入不同的状态路径。例如,一个报销单根据报销金额的大小进入不同的审批流程。
- 合并 (Join): 多个并发状态完成后,subject 才能进入下一个状态。例如,只有当库存检查和支付处理都完成后,订单才能进入发货状态。
3. 如何在 Symfony Workflow 中实现并发、分支与合并
Symfony Workflow 组件本身并没有直接提供 "Fork" 和 "Join" 的概念,但我们可以通过巧妙的配置和使用 supports() 方法来实现这些功能。
3.1 并发 (Fork)
实现并发的核心在于让一个 transition 能够将 subject 放置到多个 place 中。这可以通过配置多个 to 属性来实现。
示例:订单处理的并发流程
假设一个订单提交后,需要同时进行库存检查和支付处理。
- 状态:
draft,submitted,checking_inventory,processing_payment,ready_to_ship,shipped - 转换:
submit,check_inventory_done,payment_processed,ship
# config/packages/workflow.yaml
framework:
workflows:
order_workflow:
type: 'state_machine' # 使用 state_machine 类型更适合并发流程
marking_store:
type: 'multiple_state' # 必须使用 multiple_state,才能同时存在多个状态
supports:
- AppEntityOrder
places:
- draft
- submitted
- checking_inventory
- processing_payment
- ready_to_ship
- shipped
transitions:
submit:
from: draft
to: [submitted, checking_inventory, processing_payment] # 同时进入三个状态
check_inventory_done:
from: checking_inventory
to: submitted
payment_processed:
from: processing_payment
to: submitted
ship:
from: submitted
to: shipped
代码解释:
marking_store: { type: 'multiple_state' }这行配置非常重要。默认情况下,marking_store使用的是single_state,这意味着 subject 只能处于一个状态。要实现并发,必须使用multiple_state,这样 subject 才能同时处于多个状态。submittransition 的to属性是一个数组[submitted, checking_inventory, processing_payment]。这表示当执行submittransition 时,order 将同时进入submitted,checking_inventory,processing_payment这三个状态。
使用示例:
use AppEntityOrder;
use SymfonyComponentWorkflowWorkflowInterface;
class OrderController extends AbstractController
{
/**
* @Route("/order/{id}/submit", name="order_submit")
*/
public function submitOrder(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'submit')) {
$orderWorkflow->apply($order, 'submit');
$this->getDoctrine()->getManager()->flush();
// 启动库存检查和支付处理的异步任务
// ...
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot submit order');
}
/**
* @Route("/order/{id}/check_inventory_done", name="order_check_inventory_done")
*/
public function checkInventoryDone(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'check_inventory_done')) {
$orderWorkflow->apply($order, 'check_inventory_done');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot complete inventory check');
}
/**
* @Route("/order/{id}/payment_processed", name="order_payment_processed")
*/
public function paymentProcessed(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'payment_processed')) {
$orderWorkflow->apply($order, 'payment_processed');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot complete payment processing');
}
/**
* @Route("/order/{id}/ship", name="order_ship")
*/
public function shipOrder(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'ship')) {
$orderWorkflow->apply($order, 'ship');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot ship order');
}
}
在这个例子中, submit transition 将 order 同时放置到 checking_inventory 和 processing_payment 状态,这意味着库存检查和支付处理可以并行进行。当这两个任务完成后,分别触发 check_inventory_done 和 payment_processed transition,将 order 从对应的状态转移到 submitted状态。 注意,这里实际上submitted状态是作为一个汇聚点,用于后续的ship转换。
3.2 合并 (Join)
合并是指多个并发状态完成后,才能进入下一个状态。在上面的例子中,实际上submitted状态就起到了一个汇聚点的作用。只有当checking_inventory和processing_payment都转换到submitted状态后,才能进行ship转换。 但是,这种方式存在一个问题,就是ship转换的from状态是submitted,这意味着,只要完成了submit转换,就可以进行ship转换,而不用等待checking_inventory和processing_payment完成。
为了解决这个问题,我们可以使用 Guard 来确保所有并发状态都完成后才能执行 ship transition。
修改后的配置:
# config/packages/workflow.yaml
framework:
workflows:
order_workflow:
type: 'state_machine'
marking_store:
type: 'multiple_state'
supports:
- AppEntityOrder
places:
- draft
- submitted
- checking_inventory
- processing_payment
- ready_to_ship
- shipped
transitions:
submit:
from: draft
to: [checking_inventory, processing_payment]
check_inventory_done:
from: checking_inventory
to: submitted
payment_processed:
from: processing_payment
to: submitted
ship:
from: submitted
to: shipped
guards:
ship: "subject.isInventoryChecked() and subject.isPaymentProcessed()"
代码解释:
- 我们移除了
submit转换到submitted状态的配置,现在submit转换直接将 order 放置到checking_inventory和processing_payment状态。 - 增加了
guards配置,为shiptransition 添加了一个守卫条件"subject.isInventoryChecked() and subject.isPaymentProcessed()"。
修改后的 Order Entity:
namespace AppEntity;
use DoctrineORMMapping as ORM;
/**
* @ORMEntity(repositoryClass="AppRepositoryOrderRepository")
*/
class Order
{
// ... 其他属性
private $inventoryChecked = false;
private $paymentProcessed = false;
public function isInventoryChecked(): bool
{
return $this->inventoryChecked;
}
public function setInventoryChecked(bool $inventoryChecked): self
{
$this->inventoryChecked = $inventoryChecked;
return $this;
}
public function isPaymentProcessed(): bool
{
return $this->paymentProcessed;
}
public function setPaymentProcessed(bool $paymentProcessed): self
{
$this->paymentProcessed = $paymentProcessed;
return $this;
}
}
修改后的Controller:
use AppEntityOrder;
use SymfonyComponentWorkflowWorkflowInterface;
class OrderController extends AbstractController
{
/**
* @Route("/order/{id}/submit", name="order_submit")
*/
public function submitOrder(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'submit')) {
$orderWorkflow->apply($order, 'submit');
$this->getDoctrine()->getManager()->flush();
// 启动库存检查和支付处理的异步任务
// ...
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot submit order');
}
/**
* @Route("/order/{id}/check_inventory_done", name="order_check_inventory_done")
*/
public function checkInventoryDone(Order $order, WorkflowInterface $orderWorkflow)
{
$order->setInventoryChecked(true); // 设置库存检查完成的标志
$this->getDoctrine()->getManager()->flush(); // 保存到数据库
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
/**
* @Route("/order/{id}/payment_processed", name="order_payment_processed")
*/
public function paymentProcessed(Order $order, WorkflowInterface $orderWorkflow)
{
$order->setPaymentProcessed(true); // 设置支付处理完成的标志
$this->getDoctrine()->getManager()->flush(); // 保存到数据库
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
/**
* @Route("/order/{id}/ship", name="order_ship")
*/
public function shipOrder(Order $order, WorkflowInterface $orderWorkflow)
{
if ($orderWorkflow->can($order, 'ship')) {
$orderWorkflow->apply($order, 'ship');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
}
throw new Exception('Cannot ship order');
}
}
代码解释:
Orderentity 中添加了$inventoryChecked和$paymentProcessed属性,用于记录库存检查和支付处理是否完成。- 在
checkInventoryDone和paymentProcessedaction 中,分别设置$inventoryChecked和$paymentProcessed为true。 shiptransition 的守卫条件确保只有当$inventoryChecked和$paymentProcessed都为true时,才能执行shiptransition。
这样就实现了真正的合并,只有当所有并发任务都完成后,才能进入下一个状态。 注意,这里需要将checkInventoryDone和paymentProcessed的处理逻辑从Workflow中移出,放到Controller中,因为这两个操作不再是状态的转换,而是设置Order的属性。
3.3 分支 (Branching)
分支是指根据不同的条件,subject 进入不同的状态路径。这可以通过使用多个 transition 和 Guard 来实现。
示例:报销单审批的分支流程
假设一个报销单根据报销金额的大小进入不同的审批流程:
-
金额小于 1000 元,由部门经理审批。
-
金额大于等于 1000 元,由财务经理审批。
-
状态:
draft,submitted,waiting_department_approval,waiting_finance_approval,approved,rejected -
转换:
submit,approve_by_department,approve_by_finance,reject
# config/packages/workflow.yaml
framework:
workflows:
expense_report_workflow:
type: 'state_machine'
marking_store:
type: 'single_state'
supports:
- AppEntityExpenseReport
places:
- draft
- submitted
- waiting_department_approval
- waiting_finance_approval
- approved
- rejected
transitions:
submit_to_department:
from: draft
to: waiting_department_approval
submit_to_finance:
from: draft
to: waiting_finance_approval
approve_by_department:
from: waiting_department_approval
to: approved
approve_by_finance:
from: waiting_finance_approval
to: approved
reject_from_department:
from: waiting_department_approval
to: rejected
reject_from_finance:
from: waiting_finance_approval
to: rejected
guards:
submit_to_department: "subject.getAmount() < 1000"
submit_to_finance: "subject.getAmount() >= 1000"
代码解释:
- 定义了两个
submit转换:submit_to_department和submit_to_finance,分别对应不同的审批流程。 - 使用 Guard 来控制哪个
submit转换可以执行。submit_to_department的守卫条件是subject.getAmount() < 1000,submit_to_finance的守卫条件是subject.getAmount() >= 1000。
使用示例:
use AppEntityExpenseReport;
use SymfonyComponentWorkflowWorkflowInterface;
class ExpenseReportController extends AbstractController
{
/**
* @Route("/expense_report/{id}/submit", name="expense_report_submit")
*/
public function submitExpenseReport(ExpenseReport $expenseReport, WorkflowInterface $expenseReportWorkflow)
{
if ($expenseReportWorkflow->can($expenseReport, 'submit_to_department')) {
$expenseReportWorkflow->apply($expenseReport, 'submit_to_department');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('expense_report_show', ['id' => $expenseReport->getId()]);
}
if ($expenseReportWorkflow->can($expenseReport, 'submit_to_finance')) {
$expenseReportWorkflow->apply($expenseReport, 'submit_to_finance');
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('expense_report_show', ['id' => $expenseReport->getId()]);
}
throw new Exception('Cannot submit expense report');
}
// ... 其他 action
}
代码解释:
- 在
submitExpenseReportaction 中,首先检查submit_to_department是否可以执行,如果可以,则执行该转换。 - 如果
submit_to_department不可以执行,则检查submit_to_finance是否可以执行,如果可以,则执行该转换。 - 如果两个转换都不能执行,则抛出一个异常。
通过这种方式,我们实现了根据报销金额的大小,将报销单路由到不同的审批流程。
4. 高级技巧:事件监听器
Symfony Workflow 组件提供了强大的事件机制,可以在 transition 发生之前、之后或期间执行自定义逻辑。这对于实现一些高级功能非常有用,例如:
- 发送通知邮件
- 记录审计日志
- 更新数据库
- 调用外部 API
示例:在订单发货后发送通知邮件
namespace AppEventListener;
use SymfonyComponentEventDispatcherEventSubscriberInterface;
use SymfonyComponentMailerMailerInterface;
use SymfonyComponentMimeEmail;
use SymfonyComponentWorkflowEventEvent;
class OrderWorkflowSubscriber implements EventSubscriberInterface
{
private $mailer;
public function __construct(MailerInterface $mailer)
{
$this->mailer = $mailer;
}
public static function getSubscribedEvents()
{
return [
'workflow.order_workflow.completed.ship' => 'onOrderShipped', // 监听订单工作流中ship转换完成的事件
];
}
public function onOrderShipped(Event $event)
{
$order = $event->getSubject();
$email = (new Email())
->from('[email protected]')
->to($order->getCustomerEmail())
->subject('Your order has been shipped!')
->text('Your order has been shipped and will arrive soon.');
$this->mailer->send($email);
}
}
代码解释:
OrderWorkflowSubscriber类实现了EventSubscriberInterface接口,用于订阅事件。getSubscribedEvents方法返回一个数组,指定要订阅的事件和对应的处理方法。'workflow.order_workflow.completed.ship'表示订阅名为order_workflow的 workflow 中,shiptransition 完成的事件。onOrderShipped方法是事件处理方法,当订单发货后,该方法会被调用。该方法从 event 中获取 order 对象,并发送通知邮件。
配置事件监听器:
# config/services.yaml
services:
AppEventListenerOrderWorkflowSubscriber:
arguments: ['@mailer.mailer']
tags:
- { name: 'kernel.event_subscriber' }
5. 实战案例:复杂任务分配系统
现在,我们来看一个更复杂的实战案例:一个任务分配系统,该系统需要实现以下功能:
- 任务可以并行分配给多个用户。
- 每个用户完成任务后需要进行审核。
- 只有当所有用户都完成任务并审核通过后,任务才能进入完成状态。
- 可以根据用户的角色分配不同的任务。
为了实现这个系统,我们可以使用 Symfony Workflow 组件,并结合事件监听器和 Guard。
状态: created, assigned, in_progress, waiting_for_review, completed
转换: assign, start, complete, review, approve
Workflow 配置:
# config/packages/workflow.yaml
framework:
workflows:
task_workflow:
type: 'state_machine'
marking_store:
type: 'multiple_state'
supports:
- AppEntityTask
places:
- created
- assigned
- in_progress
- waiting_for_review
- completed
transitions:
assign:
from: created
to: assigned
start:
from: assigned
to: in_progress
complete:
from: in_progress
to: waiting_for_review
review:
from: waiting_for_review
to: assigned # 回到 assigned 状态,可以重新分配任务
approve:
from: waiting_for_review
to: completed
guards:
approve: "subject.isAllReviewsApproved()"
Task Entity:
namespace AppEntity;
use DoctrineCommonCollectionsArrayCollection;
use DoctrineCommonCollectionsCollection;
use DoctrineORMMapping as ORM;
/**
* @ORMEntity(repositoryClass="AppRepositoryTaskRepository")
*/
class Task
{
// ... 其他属性
/**
* @ORMOneToMany(targetEntity="AppEntityTaskAssignment", mappedBy="task", orphanRemoval=true, cascade={"persist"})
*/
private $taskAssignments;
public function __construct()
{
$this->taskAssignments = new ArrayCollection();
}
/**
* @return Collection|TaskAssignment[]
*/
public function getTaskAssignments(): Collection
{
return $this->taskAssignments;
}
public function addTaskAssignment(TaskAssignment $taskAssignment): self
{
if (!$this->taskAssignments->contains($taskAssignment)) {
$this->taskAssignments[] = $taskAssignment;
$taskAssignment->setTask($this);
}
return $this;
}
public function removeTaskAssignment(TaskAssignment $taskAssignment): self
{
if ($this->taskAssignments->contains($taskAssignment)) {
$this->taskAssignments->removeElement($taskAssignment);
// set the owning side to null (unless already changed)
if ($taskAssignment->getTask() === $this) {
$taskAssignment->setTask(null);
}
}
return $this;
}
public function isAllReviewsApproved(): bool
{
foreach ($this->getTaskAssignments() as $assignment) {
if (!$assignment->isApproved()) {
return false;
}
}
return true;
}
}
TaskAssignment Entity:
namespace AppEntity;
use DoctrineORMMapping as ORM;
/**
* @ORMEntity(repositoryClass="AppRepositoryTaskAssignmentRepository")
*/
class TaskAssignment
{
/**
* @ORMId()
* @ORMGeneratedValue()
* @ORMColumn(type="integer")
*/
private $id;
/**
* @ORMManyToOne(targetEntity="AppEntityTask", inversedBy="taskAssignments")
* @ORMJoinColumn(nullable=false)
*/
private $task;
/**
* @ORMManyToOne(targetEntity="AppEntityUser")
* @ORMJoinColumn(nullable=false)
*/
private $user;
/**
* @ORMColumn(type="boolean", options={"default":false})
*/
private $approved = false;
public function getId(): ?int
{
return $this->id;
}
public function getTask(): ?Task
{
return $this->task;
}
public function setTask(Task $task): self
{
$this->task = $task;
return $this;
}
public function getUser(): ?User
{
return $this->user;
}
public function setUser(User $user): self
{
$this->user = $user;
return $this;
}
public function isApproved(): ?bool
{
return $this->approved;
}
public function setApproved(bool $approved): self
{
$this->approved = $approved;
return $this;
}
}
代码解释:
Task实体有一个taskAssignments属性,用于存储任务分配信息。TaskAssignment实体表示一个任务分配给一个用户的信息,包括用户和是否审核通过。Task实体中定义了一个isAllReviewsApproved方法,用于检查所有任务分配是否都已审核通过。
Controller:
namespace AppController;
use AppEntityTask;
use AppEntityTaskAssignment;
use AppEntityUser;
use SymfonyBundleFrameworkBundleControllerAbstractController;
use SymfonyComponentRoutingAnnotationRoute;
use SymfonyComponentWorkflowWorkflowInterface;
class TaskController extends AbstractController
{
/**
* @Route("/task/{id}/assign/{userId}", name="task_assign")
*/
public function assignTask(Task $task, User $user, WorkflowInterface $taskWorkflow)
{
if ($taskWorkflow->can($task, 'assign')) {
$taskAssignment = new TaskAssignment();
$taskAssignment->setTask($task);
$taskAssignment->setUser($user);
$task->addTaskAssignment($taskAssignment);
$taskWorkflow->apply($task, 'assign');
$this->getDoctrine()->getManager()->persist($taskAssignment);
$this->getDoctrine()->getManager()->flush();
return $this->redirectToRoute('task_show', ['id' => $task->getId()]);
}
throw new Exception('Cannot assign task');
}
/**
* @Route("/task_assignment/{id}/approve", name="task_assignment_approve")
*/
public function approveTaskAssignment(TaskAssignment $taskAssignment, WorkflowInterface $taskWorkflow)
{
$task = $taskAssignment->getTask();
$taskAssignment->setApproved(true);
$this->getDoctrine()->getManager()->flush();
// 尝试执行 approve 转换
if ($taskWorkflow->can($task, 'approve')) {
$taskWorkflow->apply($task, 'approve');
$this->getDoctrine()->getManager()->flush();
}
return $this->redirectToRoute('task_show', ['id' => $task->getId()]);
}
// ... 其他 action
}
代码解释:
assignTaskaction 用于将任务分配给用户,创建TaskAssignment对象,并添加到Task实体中。approveTaskAssignmentaction 用于审核任务分配,设置TaskAssignment的approved属性为true。 该 action 尝试执行approve转换,只有当所有任务分配都已审核通过时,才能成功执行approve转换。
通过这个案例,我们展示了如何使用 Symfony Workflow 组件实现一个复杂的任务分配系统,该系统支持并发分配、用户审核和条件完成。
总结
Symfony Workflow 组件是一个非常强大的工具,可以用于管理对象的状态流。通过巧妙的配置和使用 Guard、事件监听器,我们可以利用它实现各种复杂的状态流,包括并发、分支与合并。希望通过今天的讲解,大家能够更好地掌握 Workflow 组件的高级用法,并将其应用到实际项目中。
一些关键点
multiple_statemarking store 是实现并发的关键。- Guard 可以用于控制 transition 的执行条件,实现合并和分支。
- 事件监听器可以用于在 transition 发生时执行自定义逻辑。
最后,祝大家编程愉快!