Symfony Workflow组件的高级用法:实现并发、分支与合并(Fork/Join)的状态流

Symfony Workflow 组件高级用法:实现并发、分支与合并(Fork/Join)的状态流

大家好,今天我们来深入探讨 Symfony Workflow 组件的高级用法,特别是如何利用它实现并发、分支与合并(Fork/Join)类型的状态流。这类状态流在现实业务场景中非常常见,例如订单处理、文档审批、任务分配等。掌握这些技巧,能让你更好地利用 Workflow 组件解决复杂的问题。

1. Workflow 组件基础回顾

在深入高级用法之前,我们先快速回顾一下 Workflow 组件的核心概念:

  • Subject (主题): 指的是状态发生变化的对象,例如 Order, Document, Task 等。
  • Workflow (工作流): 定义了 subject 的状态流转规则。
  • State (状态): subject 可能处于的不同状态,例如 draft, review, approved, rejected 等。
  • Transition (转换): 状态之间的转移,例如 submit, approve, reject 等。
  • Marking (标记): 用于存储 subject 当前所处的状态信息。
  • Guard (守卫): 在 transition 发生之前执行的条件检查,决定是否允许转换。
  • Place (位置/节点): 状态的具象化,可以理解为图中的一个节点。

2. 并发、分支与合并(Fork/Join)的概念

在传统的线性状态流中,subject 按照预定义的路径一步一步地从一个状态转移到另一个状态。但在某些情况下,我们需要更复杂的状态流:

  • 并发 (Concurrency/Fork): subject 同时进入多个状态,表示多个任务并行进行。例如,一个订单被提交后,需要同时进行库存检查和支付处理。
  • 分支 (Branching): 根据不同的条件,subject 进入不同的状态路径。例如,一个报销单根据报销金额的大小进入不同的审批流程。
  • 合并 (Join): 多个并发状态完成后,subject 才能进入下一个状态。例如,只有当库存检查和支付处理都完成后,订单才能进入发货状态。

3. 如何在 Symfony Workflow 中实现并发、分支与合并

Symfony Workflow 组件本身并没有直接提供 "Fork" 和 "Join" 的概念,但我们可以通过巧妙的配置和使用 supports() 方法来实现这些功能。

3.1 并发 (Fork)

实现并发的核心在于让一个 transition 能够将 subject 放置到多个 place 中。这可以通过配置多个 to 属性来实现。

示例:订单处理的并发流程

假设一个订单提交后,需要同时进行库存检查和支付处理。

  • 状态: draft, submitted, checking_inventory, processing_payment, ready_to_ship, shipped
  • 转换: submit, check_inventory_done, payment_processed, ship
# config/packages/workflow.yaml
framework:
    workflows:
        order_workflow:
            type: 'state_machine' # 使用 state_machine 类型更适合并发流程
            marking_store:
                type: 'multiple_state' # 必须使用 multiple_state,才能同时存在多个状态
            supports:
                - AppEntityOrder
            places:
                - draft
                - submitted
                - checking_inventory
                - processing_payment
                - ready_to_ship
                - shipped
            transitions:
                submit:
                    from: draft
                    to: [submitted, checking_inventory, processing_payment] # 同时进入三个状态
                check_inventory_done:
                    from: checking_inventory
                    to: submitted
                payment_processed:
                    from: processing_payment
                    to: submitted
                ship:
                    from: submitted
                    to: shipped

代码解释:

  • marking_store: { type: 'multiple_state' } 这行配置非常重要。默认情况下,marking_store 使用的是 single_state,这意味着 subject 只能处于一个状态。要实现并发,必须使用 multiple_state,这样 subject 才能同时处于多个状态。
  • submit transition 的 to 属性是一个数组 [submitted, checking_inventory, processing_payment]。这表示当执行 submit transition 时,order 将同时进入 submitted, checking_inventory, processing_payment 这三个状态。

使用示例:

use AppEntityOrder;
use SymfonyComponentWorkflowWorkflowInterface;

class OrderController extends AbstractController
{
    /**
     * @Route("/order/{id}/submit", name="order_submit")
     */
    public function submitOrder(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'submit')) {
            $orderWorkflow->apply($order, 'submit');
            $this->getDoctrine()->getManager()->flush();

            // 启动库存检查和支付处理的异步任务
            // ...

            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot submit order');
    }

    /**
     * @Route("/order/{id}/check_inventory_done", name="order_check_inventory_done")
     */
    public function checkInventoryDone(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'check_inventory_done')) {
            $orderWorkflow->apply($order, 'check_inventory_done');
            $this->getDoctrine()->getManager()->flush();
            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot complete inventory check');
    }

     /**
     * @Route("/order/{id}/payment_processed", name="order_payment_processed")
     */
    public function paymentProcessed(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'payment_processed')) {
            $orderWorkflow->apply($order, 'payment_processed');
            $this->getDoctrine()->getManager()->flush();
            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot complete payment processing');
    }

    /**
     * @Route("/order/{id}/ship", name="order_ship")
     */
    public function shipOrder(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'ship')) {
            $orderWorkflow->apply($order, 'ship');
            $this->getDoctrine()->getManager()->flush();
            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot ship order');
    }
}

在这个例子中, submit transition 将 order 同时放置到 checking_inventoryprocessing_payment 状态,这意味着库存检查和支付处理可以并行进行。当这两个任务完成后,分别触发 check_inventory_donepayment_processed transition,将 order 从对应的状态转移到 submitted状态。 注意,这里实际上submitted状态是作为一个汇聚点,用于后续的ship转换。

3.2 合并 (Join)

合并是指多个并发状态完成后,才能进入下一个状态。在上面的例子中,实际上submitted状态就起到了一个汇聚点的作用。只有当checking_inventoryprocessing_payment都转换到submitted状态后,才能进行ship转换。 但是,这种方式存在一个问题,就是ship转换的from状态是submitted,这意味着,只要完成了submit转换,就可以进行ship转换,而不用等待checking_inventoryprocessing_payment完成。

为了解决这个问题,我们可以使用 Guard 来确保所有并发状态都完成后才能执行 ship transition。

修改后的配置:

# config/packages/workflow.yaml
framework:
    workflows:
        order_workflow:
            type: 'state_machine'
            marking_store:
                type: 'multiple_state'
            supports:
                - AppEntityOrder
            places:
                - draft
                - submitted
                - checking_inventory
                - processing_payment
                - ready_to_ship
                - shipped
            transitions:
                submit:
                    from: draft
                    to: [checking_inventory, processing_payment]
                check_inventory_done:
                    from: checking_inventory
                    to: submitted
                payment_processed:
                    from: processing_payment
                    to: submitted
                ship:
                    from: submitted
                    to: shipped
            guards:
                ship: "subject.isInventoryChecked() and subject.isPaymentProcessed()"

代码解释:

  • 我们移除了 submit 转换到 submitted 状态的配置,现在 submit 转换直接将 order 放置到 checking_inventoryprocessing_payment 状态。
  • 增加了 guards 配置,为 ship transition 添加了一个守卫条件 "subject.isInventoryChecked() and subject.isPaymentProcessed()"

修改后的 Order Entity:

namespace AppEntity;

use DoctrineORMMapping as ORM;

/**
 * @ORMEntity(repositoryClass="AppRepositoryOrderRepository")
 */
class Order
{
    // ... 其他属性

    private $inventoryChecked = false;
    private $paymentProcessed = false;

    public function isInventoryChecked(): bool
    {
        return $this->inventoryChecked;
    }

    public function setInventoryChecked(bool $inventoryChecked): self
    {
        $this->inventoryChecked = $inventoryChecked;

        return $this;
    }

    public function isPaymentProcessed(): bool
    {
        return $this->paymentProcessed;
    }

    public function setPaymentProcessed(bool $paymentProcessed): self
    {
        $this->paymentProcessed = $paymentProcessed;

        return $this;
    }
}

修改后的Controller:

use AppEntityOrder;
use SymfonyComponentWorkflowWorkflowInterface;

class OrderController extends AbstractController
{
    /**
     * @Route("/order/{id}/submit", name="order_submit")
     */
    public function submitOrder(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'submit')) {
            $orderWorkflow->apply($order, 'submit');
            $this->getDoctrine()->getManager()->flush();

            // 启动库存检查和支付处理的异步任务
            // ...

            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot submit order');
    }

    /**
     * @Route("/order/{id}/check_inventory_done", name="order_check_inventory_done")
     */
    public function checkInventoryDone(Order $order, WorkflowInterface $orderWorkflow)
    {
        $order->setInventoryChecked(true);  // 设置库存检查完成的标志
        $this->getDoctrine()->getManager()->flush();  // 保存到数据库

        return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
    }

     /**
     * @Route("/order/{id}/payment_processed", name="order_payment_processed")
     */
    public function paymentProcessed(Order $order, WorkflowInterface $orderWorkflow)
    {
        $order->setPaymentProcessed(true); // 设置支付处理完成的标志
        $this->getDoctrine()->getManager()->flush();  // 保存到数据库

        return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
    }

    /**
     * @Route("/order/{id}/ship", name="order_ship")
     */
    public function shipOrder(Order $order, WorkflowInterface $orderWorkflow)
    {
        if ($orderWorkflow->can($order, 'ship')) {
            $orderWorkflow->apply($order, 'ship');
            $this->getDoctrine()->getManager()->flush();
            return $this->redirectToRoute('order_show', ['id' => $order->getId()]);
        }

        throw new Exception('Cannot ship order');
    }
}

代码解释:

  • Order entity 中添加了 $inventoryChecked$paymentProcessed 属性,用于记录库存检查和支付处理是否完成。
  • checkInventoryDonepaymentProcessed action 中,分别设置 $inventoryChecked$paymentProcessedtrue
  • ship transition 的守卫条件确保只有当 $inventoryChecked$paymentProcessed 都为 true 时,才能执行 ship transition。

这样就实现了真正的合并,只有当所有并发任务都完成后,才能进入下一个状态。 注意,这里需要将checkInventoryDonepaymentProcessed的处理逻辑从Workflow中移出,放到Controller中,因为这两个操作不再是状态的转换,而是设置Order的属性。

3.3 分支 (Branching)

分支是指根据不同的条件,subject 进入不同的状态路径。这可以通过使用多个 transition 和 Guard 来实现。

示例:报销单审批的分支流程

假设一个报销单根据报销金额的大小进入不同的审批流程:

  • 金额小于 1000 元,由部门经理审批。

  • 金额大于等于 1000 元,由财务经理审批。

  • 状态: draft, submitted, waiting_department_approval, waiting_finance_approval, approved, rejected

  • 转换: submit, approve_by_department, approve_by_finance, reject

# config/packages/workflow.yaml
framework:
    workflows:
        expense_report_workflow:
            type: 'state_machine'
            marking_store:
                type: 'single_state'
            supports:
                - AppEntityExpenseReport
            places:
                - draft
                - submitted
                - waiting_department_approval
                - waiting_finance_approval
                - approved
                - rejected
            transitions:
                submit_to_department:
                    from: draft
                    to: waiting_department_approval
                submit_to_finance:
                    from: draft
                    to: waiting_finance_approval
                approve_by_department:
                    from: waiting_department_approval
                    to: approved
                approve_by_finance:
                    from: waiting_finance_approval
                    to: approved
                reject_from_department:
                    from: waiting_department_approval
                    to: rejected
                reject_from_finance:
                    from: waiting_finance_approval
                    to: rejected
            guards:
                submit_to_department: "subject.getAmount() < 1000"
                submit_to_finance: "subject.getAmount() >= 1000"

代码解释:

  • 定义了两个 submit 转换:submit_to_departmentsubmit_to_finance,分别对应不同的审批流程。
  • 使用 Guard 来控制哪个 submit 转换可以执行。submit_to_department 的守卫条件是 subject.getAmount() < 1000submit_to_finance 的守卫条件是 subject.getAmount() >= 1000

使用示例:

use AppEntityExpenseReport;
use SymfonyComponentWorkflowWorkflowInterface;

class ExpenseReportController extends AbstractController
{
    /**
     * @Route("/expense_report/{id}/submit", name="expense_report_submit")
     */
    public function submitExpenseReport(ExpenseReport $expenseReport, WorkflowInterface $expenseReportWorkflow)
    {
        if ($expenseReportWorkflow->can($expenseReport, 'submit_to_department')) {
            $expenseReportWorkflow->apply($expenseReport, 'submit_to_department');
            $this->getDoctrine()->getManager()->flush();

            return $this->redirectToRoute('expense_report_show', ['id' => $expenseReport->getId()]);
        }

        if ($expenseReportWorkflow->can($expenseReport, 'submit_to_finance')) {
            $expenseReportWorkflow->apply($expenseReport, 'submit_to_finance');
            $this->getDoctrine()->getManager()->flush();

            return $this->redirectToRoute('expense_report_show', ['id' => $expenseReport->getId()]);
        }

        throw new Exception('Cannot submit expense report');
    }

    // ... 其他 action
}

代码解释:

  • submitExpenseReport action 中,首先检查 submit_to_department 是否可以执行,如果可以,则执行该转换。
  • 如果 submit_to_department 不可以执行,则检查 submit_to_finance 是否可以执行,如果可以,则执行该转换。
  • 如果两个转换都不能执行,则抛出一个异常。

通过这种方式,我们实现了根据报销金额的大小,将报销单路由到不同的审批流程。

4. 高级技巧:事件监听器

Symfony Workflow 组件提供了强大的事件机制,可以在 transition 发生之前、之后或期间执行自定义逻辑。这对于实现一些高级功能非常有用,例如:

  • 发送通知邮件
  • 记录审计日志
  • 更新数据库
  • 调用外部 API

示例:在订单发货后发送通知邮件

namespace AppEventListener;

use SymfonyComponentEventDispatcherEventSubscriberInterface;
use SymfonyComponentMailerMailerInterface;
use SymfonyComponentMimeEmail;
use SymfonyComponentWorkflowEventEvent;

class OrderWorkflowSubscriber implements EventSubscriberInterface
{
    private $mailer;

    public function __construct(MailerInterface $mailer)
    {
        $this->mailer = $mailer;
    }

    public static function getSubscribedEvents()
    {
        return [
            'workflow.order_workflow.completed.ship' => 'onOrderShipped', // 监听订单工作流中ship转换完成的事件
        ];
    }

    public function onOrderShipped(Event $event)
    {
        $order = $event->getSubject();

        $email = (new Email())
            ->from('[email protected]')
            ->to($order->getCustomerEmail())
            ->subject('Your order has been shipped!')
            ->text('Your order has been shipped and will arrive soon.');

        $this->mailer->send($email);
    }
}

代码解释:

  • OrderWorkflowSubscriber 类实现了 EventSubscriberInterface 接口,用于订阅事件。
  • getSubscribedEvents 方法返回一个数组,指定要订阅的事件和对应的处理方法。 'workflow.order_workflow.completed.ship' 表示订阅名为 order_workflow 的 workflow 中,ship transition 完成的事件。
  • onOrderShipped 方法是事件处理方法,当订单发货后,该方法会被调用。该方法从 event 中获取 order 对象,并发送通知邮件。

配置事件监听器:

# config/services.yaml
services:
    AppEventListenerOrderWorkflowSubscriber:
        arguments: ['@mailer.mailer']
        tags:
            - { name: 'kernel.event_subscriber' }

5. 实战案例:复杂任务分配系统

现在,我们来看一个更复杂的实战案例:一个任务分配系统,该系统需要实现以下功能:

  • 任务可以并行分配给多个用户。
  • 每个用户完成任务后需要进行审核。
  • 只有当所有用户都完成任务并审核通过后,任务才能进入完成状态。
  • 可以根据用户的角色分配不同的任务。

为了实现这个系统,我们可以使用 Symfony Workflow 组件,并结合事件监听器和 Guard。

状态: created, assigned, in_progress, waiting_for_review, completed
转换: assign, start, complete, review, approve

Workflow 配置:

# config/packages/workflow.yaml
framework:
    workflows:
        task_workflow:
            type: 'state_machine'
            marking_store:
                type: 'multiple_state'
            supports:
                - AppEntityTask
            places:
                - created
                - assigned
                - in_progress
                - waiting_for_review
                - completed
            transitions:
                assign:
                    from: created
                    to: assigned
                start:
                    from: assigned
                    to: in_progress
                complete:
                    from: in_progress
                    to: waiting_for_review
                review:
                    from: waiting_for_review
                    to: assigned # 回到 assigned 状态,可以重新分配任务
                approve:
                    from: waiting_for_review
                    to: completed
            guards:
                approve: "subject.isAllReviewsApproved()"

Task Entity:

namespace AppEntity;

use DoctrineCommonCollectionsArrayCollection;
use DoctrineCommonCollectionsCollection;
use DoctrineORMMapping as ORM;

/**
 * @ORMEntity(repositoryClass="AppRepositoryTaskRepository")
 */
class Task
{
    // ... 其他属性

    /**
     * @ORMOneToMany(targetEntity="AppEntityTaskAssignment", mappedBy="task", orphanRemoval=true, cascade={"persist"})
     */
    private $taskAssignments;

    public function __construct()
    {
        $this->taskAssignments = new ArrayCollection();
    }

    /**
     * @return Collection|TaskAssignment[]
     */
    public function getTaskAssignments(): Collection
    {
        return $this->taskAssignments;
    }

    public function addTaskAssignment(TaskAssignment $taskAssignment): self
    {
        if (!$this->taskAssignments->contains($taskAssignment)) {
            $this->taskAssignments[] = $taskAssignment;
            $taskAssignment->setTask($this);
        }

        return $this;
    }

    public function removeTaskAssignment(TaskAssignment $taskAssignment): self
    {
        if ($this->taskAssignments->contains($taskAssignment)) {
            $this->taskAssignments->removeElement($taskAssignment);
            // set the owning side to null (unless already changed)
            if ($taskAssignment->getTask() === $this) {
                $taskAssignment->setTask(null);
            }
        }

        return $this;
    }

    public function isAllReviewsApproved(): bool
    {
        foreach ($this->getTaskAssignments() as $assignment) {
            if (!$assignment->isApproved()) {
                return false;
            }
        }

        return true;
    }
}

TaskAssignment Entity:

namespace AppEntity;

use DoctrineORMMapping as ORM;

/**
 * @ORMEntity(repositoryClass="AppRepositoryTaskAssignmentRepository")
 */
class TaskAssignment
{
    /**
     * @ORMId()
     * @ORMGeneratedValue()
     * @ORMColumn(type="integer")
     */
    private $id;

    /**
     * @ORMManyToOne(targetEntity="AppEntityTask", inversedBy="taskAssignments")
     * @ORMJoinColumn(nullable=false)
     */
    private $task;

    /**
     * @ORMManyToOne(targetEntity="AppEntityUser")
     * @ORMJoinColumn(nullable=false)
     */
    private $user;

    /**
     * @ORMColumn(type="boolean", options={"default":false})
     */
    private $approved = false;

    public function getId(): ?int
    {
        return $this->id;
    }

    public function getTask(): ?Task
    {
        return $this->task;
    }

    public function setTask(Task $task): self
    {
        $this->task = $task;

        return $this;
    }

    public function getUser(): ?User
    {
        return $this->user;
    }

    public function setUser(User $user): self
    {
        $this->user = $user;

        return $this;
    }

    public function isApproved(): ?bool
    {
        return $this->approved;
    }

    public function setApproved(bool $approved): self
    {
        $this->approved = $approved;

        return $this;
    }
}

代码解释:

  • Task 实体有一个 taskAssignments 属性,用于存储任务分配信息。
  • TaskAssignment 实体表示一个任务分配给一个用户的信息,包括用户和是否审核通过。
  • Task 实体中定义了一个 isAllReviewsApproved 方法,用于检查所有任务分配是否都已审核通过。

Controller:

namespace AppController;

use AppEntityTask;
use AppEntityTaskAssignment;
use AppEntityUser;
use SymfonyBundleFrameworkBundleControllerAbstractController;
use SymfonyComponentRoutingAnnotationRoute;
use SymfonyComponentWorkflowWorkflowInterface;

class TaskController extends AbstractController
{
    /**
     * @Route("/task/{id}/assign/{userId}", name="task_assign")
     */
    public function assignTask(Task $task, User $user, WorkflowInterface $taskWorkflow)
    {
        if ($taskWorkflow->can($task, 'assign')) {
            $taskAssignment = new TaskAssignment();
            $taskAssignment->setTask($task);
            $taskAssignment->setUser($user);

            $task->addTaskAssignment($taskAssignment);

            $taskWorkflow->apply($task, 'assign');
            $this->getDoctrine()->getManager()->persist($taskAssignment);
            $this->getDoctrine()->getManager()->flush();

            return $this->redirectToRoute('task_show', ['id' => $task->getId()]);
        }

        throw new Exception('Cannot assign task');
    }

    /**
     * @Route("/task_assignment/{id}/approve", name="task_assignment_approve")
     */
    public function approveTaskAssignment(TaskAssignment $taskAssignment, WorkflowInterface $taskWorkflow)
    {
        $task = $taskAssignment->getTask();

        $taskAssignment->setApproved(true);

        $this->getDoctrine()->getManager()->flush();

        // 尝试执行 approve 转换
        if ($taskWorkflow->can($task, 'approve')) {
            $taskWorkflow->apply($task, 'approve');
            $this->getDoctrine()->getManager()->flush();
        }

        return $this->redirectToRoute('task_show', ['id' => $task->getId()]);
    }

    // ... 其他 action
}

代码解释:

  • assignTask action 用于将任务分配给用户,创建 TaskAssignment 对象,并添加到 Task 实体中。
  • approveTaskAssignment action 用于审核任务分配,设置 TaskAssignmentapproved 属性为 true。 该 action 尝试执行 approve 转换,只有当所有任务分配都已审核通过时,才能成功执行 approve 转换。

通过这个案例,我们展示了如何使用 Symfony Workflow 组件实现一个复杂的任务分配系统,该系统支持并发分配、用户审核和条件完成。

总结

Symfony Workflow 组件是一个非常强大的工具,可以用于管理对象的状态流。通过巧妙的配置和使用 Guard、事件监听器,我们可以利用它实现各种复杂的状态流,包括并发、分支与合并。希望通过今天的讲解,大家能够更好地掌握 Workflow 组件的高级用法,并将其应用到实际项目中。

一些关键点

  • multiple_state marking store 是实现并发的关键。
  • Guard 可以用于控制 transition 的执行条件,实现合并和分支。
  • 事件监听器可以用于在 transition 发生时执行自定义逻辑。

最后,祝大家编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注