PHP `Saga Pattern`:分布式事务的补偿机制与协调器

各位听众,晚上好!我是老码农,今天咱们聊聊PHP里的“Saga Pattern”,听起来是不是像个古老的传说?其实它是一种解决分布式事务的方案,尤其是当咱们在微服务架构里折腾的时候,会经常用到。别害怕,这玩意儿没那么玄乎,听我慢慢道来。

什么是分布式事务?

首先,得明确一下什么是分布式事务。 想象一下,你要完成一个在线购物流程:

  1. 扣减用户账户余额
  2. 减少商品库存
  3. 创建订单

这三个操作如果都在同一个数据库里,那简单,一个事务搞定。 但如果它们分别在三个不同的服务里(账户服务、库存服务、订单服务),这就成了分布式事务。 传统的ACID事务就没那么好使了,因为它们针对的是单数据库环境。

为什么需要Saga?

传统的两阶段提交(2PC)或者XA协议在微服务架构下常常水土不服。 性能差不说,还可能引入单点故障,把整个系统拖垮。 而Saga模式的出现,就是为了解决这些问题。

Saga Pattern的核心思想

Saga的核心思想是:将一个大的分布式事务拆分成一系列本地事务(每个服务负责一部分),然后通过补偿操作(也叫“回滚”操作)来保证最终一致性。 也就是说,如果其中一个本地事务失败了,就执行一系列的补偿操作,撤销之前已经成功的事务,最终达到“要么都成功,要么都失败”的效果。

Saga的两种主要实现方式

Saga模式主要有两种实现方式:

  1. 编排式Saga (Orchestration-based Saga):有一个中心化的“协调器”(Orchestrator)来驱动整个Saga流程。
  2. 协作式Saga (Choreography-based Saga):没有中心协调器,每个服务监听其他服务的事件,然后根据事件触发相应的操作。

咱们先从编排式Saga开始说起。

编排式Saga (Orchestration-based Saga)

编排式Saga就像一个乐队指挥,它负责指挥各个乐器(服务)演奏,如果某个乐器跑调了,指挥就要想办法让它回到正轨。

举个例子:在线购物的编排式Saga

咱们还是用之前的在线购物流程:

  1. 协调器 (Orchestrator):创建一个新的Saga实例,并开始执行第一个步骤:扣减用户账户余额。
  2. 账户服务 (Account Service):接收到扣款请求,执行扣款操作。如果成功,发送“账户余额已扣减”事件给协调器;如果失败,发送“账户余额扣减失败”事件给协调器。
  3. 协调器
    • 如果收到“账户余额已扣减”事件,开始执行第二个步骤:减少商品库存。
    • 如果收到“账户余额扣减失败”事件,Saga流程结束,进入补偿流程(理论上没有需要补偿的,因为第一步就失败了)。
  4. 库存服务 (Inventory Service):接收到减少库存请求,执行减少库存操作。如果成功,发送“商品库存已减少”事件给协调器;如果失败,发送“商品库存减少失败”事件给协调器。
  5. 协调器
    • 如果收到“商品库存已减少”事件,开始执行第三个步骤:创建订单。
    • 如果收到“商品库存减少失败”事件,开始执行补偿流程,先调用账户服务的“退款”接口,然后再结束Saga流程。
  6. 订单服务 (Order Service):接收到创建订单请求,执行创建订单操作。如果成功,发送“订单已创建”事件给协调器;如果失败,发送“订单创建失败”事件给协调器。
  7. 协调器
    • 如果收到“订单已创建”事件,Saga流程成功结束。
    • 如果收到“订单创建失败”事件,开始执行补偿流程,先调用库存服务的“增加库存”接口,再调用账户服务的“退款”接口,然后结束Saga流程。

PHP代码示例 (编排式Saga)

为了方便理解,咱们用PHP代码来模拟一下这个过程。这里只是一个简化版的示例,实际生产环境中需要考虑更多细节,比如状态持久化、幂等性、重试机制等等。

<?php

// 协调器 (Orchestrator)
class OrderSagaOrchestrator
{
    private $accountId;
    private $productId;
    private $quantity;
    private $orderId;

    public function __construct($accountId, $productId, $quantity)
    {
        $this->accountId = $accountId;
        $this->productId = $productId;
        $this->quantity = $quantity;
    }

    public function run()
    {
        try {
            // 1. 扣减账户余额
            echo "扣减账户余额...n";
            $accountService = new AccountService();
            $accountService->debit($this->accountId, $this->quantity);
            echo "账户余额扣减成功n";

            // 2. 减少商品库存
            echo "减少商品库存...n";
            $inventoryService = new InventoryService();
            $inventoryService->decreaseStock($this->productId, $this->quantity);
            echo "商品库存减少成功n";

            // 3. 创建订单
            echo "创建订单...n";
            $orderService = new OrderService();
            $this->orderId = $orderService->createOrder($this->accountId, $this->productId, $this->quantity);
            echo "订单创建成功,订单ID: " . $this->orderId . "n";

            echo "Saga流程成功结束n";
            return true;

        } catch (Exception $e) {
            echo "Saga流程失败,开始补偿...n";
            $this->compensate($e);
            return false;
        }
    }

    private function compensate(Exception $e)
    {
        echo "错误信息: " . $e->getMessage() . "n";

        // 补偿操作:根据失败步骤,逆向执行操作
        if (isset($this->orderId)) {
            echo "删除订单...n";
            $orderService = new OrderService();
            $orderService->cancelOrder($this->orderId);
        }

        if (isset($this->productId) && isset($this->quantity)) {
            echo "增加商品库存...n";
            $inventoryService = new InventoryService();
            $inventoryService->increaseStock($this->productId, $this->quantity);
        }

        if (isset($this->accountId) && isset($this->quantity)) {
            echo "退款...n";
            $accountService = new AccountService();
            $accountService->credit($this->accountId, $this->quantity);
        }

        echo "补偿流程结束n";
    }
}

// 账户服务 (Account Service)
class AccountService
{
    public function debit($accountId, $amount)
    {
        // 模拟扣减账户余额
        echo "账户服务:扣减账户 $accountId 余额 $amount...n";
        // 这里可以实际调用账户服务的API
        if (rand(0, 10) < 3) { // 模拟30%的失败率
            throw new Exception("账户余额不足");
        }
    }

    public function credit($accountId, $amount)
    {
        // 模拟退款
        echo "账户服务:退款 $accountId $amount...n";
        // 这里可以实际调用账户服务的API
    }
}

// 库存服务 (Inventory Service)
class InventoryService
{
    public function decreaseStock($productId, $quantity)
    {
        // 模拟减少商品库存
        echo "库存服务:减少商品 $productId 库存 $quantity...n";
        // 这里可以实际调用库存服务的API
        if (rand(0, 10) < 3) { // 模拟30%的失败率
            throw new Exception("库存不足");
        }
    }

    public function increaseStock($productId, $quantity)
    {
        // 模拟增加商品库存
        echo "库存服务:增加商品 $productId 库存 $quantity...n";
        // 这里可以实际调用库存服务的API
    }
}

// 订单服务 (Order Service)
class OrderService
{
    public function createOrder($accountId, $productId, $quantity)
    {
        // 模拟创建订单
        echo "订单服务:创建订单 (账户: $accountId, 商品: $productId, 数量: $quantity)...n";
        // 这里可以实际调用订单服务的API
        if (rand(0, 10) < 3) { // 模拟30%的失败率
            throw new Exception("创建订单失败");
        }
        return uniqid(); // 返回一个假的订单ID
    }

    public function cancelOrder($orderId)
    {
        // 模拟取消订单
        echo "订单服务:取消订单 $orderId...n";
        // 这里可以实际调用订单服务的API
    }
}

// 使用示例
$orchestrator = new OrderSagaOrchestrator("user123", "product456", 2);
$orchestrator->run();

?>

这个代码只是一个简单的演示,实际应用中你需要:

  • 状态持久化:将Saga的状态(当前步骤、已执行的步骤、相关数据)持久化到数据库或者其他存储介质中,以便在服务重启或者发生故障时能够恢复Saga流程。
  • 幂等性:保证每个本地事务和补偿操作都是幂等的,也就是说,执行多次的结果和执行一次的结果相同。 这可以通过在请求中加入唯一ID,或者在服务内部进行状态检查来实现。
  • 重试机制:如果某个本地事务或者补偿操作失败了,可以尝试重试几次。
  • 事件驱动:使用消息队列(比如RabbitMQ、Kafka)来实现服务之间的异步通信,提高系统的可用性和扩展性。

协作式Saga (Choreography-based Saga)

协作式Saga就像一个舞蹈团,每个舞者(服务)根据其他舞者的动作(事件)来调整自己的舞步,不需要一个统一的指挥。

举个例子:在线购物的协作式Saga

  1. 订单服务 (Order Service):接收到创建订单请求,创建一个“待支付”状态的订单,并发布一个“订单已创建”事件。
  2. 账户服务 (Account Service):监听“订单已创建”事件,接收到事件后,尝试扣减用户账户余额。如果成功,发布一个“账户余额已扣减”事件;如果失败,发布一个“账户余额扣减失败”事件。
  3. 库存服务 (Inventory Service):监听“订单已创建”事件,接收到事件后,尝试减少商品库存。如果成功,发布一个“商品库存已减少”事件;如果失败,发布一个“商品库存减少失败”事件。
  4. 订单服务
    • 如果收到“账户余额已扣减”和“商品库存已减少”事件,将订单状态更新为“已支付”,并发布一个“订单已支付”事件。
    • 如果收到“账户余额扣减失败”或者“商品库存减少失败”事件,将订单状态更新为“已取消”,并发布一个“订单已取消”事件。
  5. 其他服务:可以监听“订单已支付”或者“订单已取消”事件,执行后续操作,比如发送短信通知、更新用户积分等等。

PHP代码示例 (协作式Saga)

<?php

// 订单服务 (Order Service)
class OrderService
{
    public function createOrder($accountId, $productId, $quantity)
    {
        // 模拟创建订单
        echo "订单服务:创建订单 (账户: $accountId, 商品: $productId, 数量: $quantity)...n";
        $orderId = uniqid();
        echo "订单服务:订单创建成功,订单ID: " . $orderId . "n";

        // 发布“订单已创建”事件
        $this->publishEvent("order.created", [
            "order_id" => $orderId,
            "account_id" => $accountId,
            "product_id" => $productId,
            "quantity" => $quantity,
        ]);

        return $orderId;
    }

    public function cancelOrder($orderId)
    {
        echo "订单服务:取消订单 $orderId...n";
        // 这里可以实际调用订单服务的API

        // 发布“订单已取消”事件
        $this->publishEvent("order.cancelled", [
            "order_id" => $orderId,
        ]);
    }

    public function completeOrder($orderId)
    {
        echo "订单服务:完成订单 $orderId...n";

        // 发布“订单已完成”事件
        $this->publishEvent("order.completed", [
            "order_id" => $orderId,
        ]);
    }

    private function publishEvent($eventName, $data)
    {
        // 模拟发布事件到消息队列
        echo "订单服务:发布事件 $eventName: " . json_encode($data) . "n";
        // 实际应用中,需要使用消息队列客户端(比如RabbitMQ、Kafka)来发布事件
    }
}

// 账户服务 (Account Service)
class AccountService
{
    public function __construct()
    {
        // 订阅“订单已创建”事件
        $this->subscribeEvent("order.created", function ($event) {
            $this->handleOrderCreated($event);
        });
    }

    private function handleOrderCreated($event)
    {
        $orderId = $event["order_id"];
        $accountId = $event["account_id"];
        $quantity = $event["quantity"];

        echo "账户服务:收到 '订单已创建' 事件,订单ID: $orderId, 账户ID: $accountId, 数量: $quantityn";

        try {
            $this->debit($accountId, $quantity);
            $this->publishEvent("account.debited", [
                "order_id" => $orderId,
                "account_id" => $accountId,
                "quantity" => $quantity,
            ]);
        } catch (Exception $e) {
            echo "账户服务:扣款失败: " . $e->getMessage() . "n";
            $this->publishEvent("account.debit_failed", [
                "order_id" => $orderId,
                "account_id" => $accountId,
                "quantity" => $quantity,
                "reason" => $e->getMessage(),
            ]);
        }
    }

    public function debit($accountId, $amount)
    {
        // 模拟扣减账户余额
        echo "账户服务:扣减账户 $accountId 余额 $amount...n";
        // 这里可以实际调用账户服务的API
        if (rand(0, 10) < 3) { // 模拟30%的失败率
            throw new Exception("账户余额不足");
        }
    }

    private function publishEvent($eventName, $data)
    {
        // 模拟发布事件到消息队列
        echo "账户服务:发布事件 $eventName: " . json_encode($data) . "n";
        // 实际应用中,需要使用消息队列客户端(比如RabbitMQ、Kafka)来发布事件
    }

    private function subscribeEvent($eventName, $callback)
    {
        // 模拟订阅事件
        echo "账户服务:订阅事件 $eventNamen";
        // 实际应用中,需要使用消息队列客户端来订阅事件
        // 这里只是简单地将事件名和回调函数保存起来,方便模拟事件触发
        $this->eventListeners[$eventName] = $callback;
    }

    public function triggerEvent($eventName, $eventData) {
        if (isset($this->eventListeners[$eventName])) {
            $callback = $this->eventListeners[$eventName];
            $callback($eventData);
        }
    }
}

// 库存服务 (Inventory Service)
class InventoryService
{
    public function __construct()
    {
        // 订阅“订单已创建”事件
        $this->subscribeEvent("order.created", function ($event) {
            $this->handleOrderCreated($event);
        });
    }

    private function handleOrderCreated($event)
    {
        $orderId = $event["order_id"];
        $productId = $event["product_id"];
        $quantity = $event["quantity"];

        echo "库存服务:收到 '订单已创建' 事件,订单ID: $orderId, 商品ID: $productId, 数量: $quantityn";

        try {
            $this->decreaseStock($productId, $quantity);
            $this->publishEvent("inventory.decreased", [
                "order_id" => $orderId,
                "product_id" => $productId,
                "quantity" => $quantity,
            ]);
        } catch (Exception $e) {
            echo "库存服务:减少库存失败: " . $e->getMessage() . "n";
            $this->publishEvent("inventory.decrease_failed", [
                "order_id" => $orderId,
                "product_id" => $productId,
                "quantity" => $quantity,
                "reason" => $e->getMessage(),
            ]);
        }
    }

    public function decreaseStock($productId, $quantity)
    {
        // 模拟减少商品库存
        echo "库存服务:减少商品 $productId 库存 $quantity...n";
        // 这里可以实际调用库存服务的API
        if (rand(0, 10) < 3) { // 模拟30%的失败率
            throw new Exception("库存不足");
        }
    }

    private function publishEvent($eventName, $data)
    {
        // 模拟发布事件到消息队列
        echo "库存服务:发布事件 $eventName: " . json_encode($data) . "n";
        // 实际应用中,需要使用消息队列客户端(比如RabbitMQ、Kafka)来发布事件
    }

    private function subscribeEvent($eventName, $callback)
    {
        // 模拟订阅事件
        echo "库存服务:订阅事件 $eventNamen";
        $this->eventListeners[$eventName] = $callback;
    }

    public function triggerEvent($eventName, $eventData) {
        if (isset($this->eventListeners[$eventName])) {
            $callback = $this->eventListeners[$eventName];
            $callback($eventData);
        }
    }
}

// 初始化服务
$orderService = new OrderService();
$accountService = new AccountService();
$inventoryService = new InventoryService();

// 模拟事件触发机制,由于php的特性,无法异步监听,此处模拟事件触发
$accountService->eventListeners = []; // 初始化事件监听器
$inventoryService->eventListeners = []; // 初始化事件监听器

// 模拟创建订单
$orderId = $orderService->createOrder("user123", "product456", 2);

// 模拟触发账户服务和库存服务的事件处理
if(isset($accountService->eventListeners['order.created'])){
    $accountService->triggerEvent('order.created', ["order_id"=>$orderId,"account_id"=>"user123","product_id"=>"product456","quantity"=>2]);
}

if(isset($inventoryService->eventListeners['order.created'])){
    $inventoryService->triggerEvent('order.created', ["order_id"=>$orderId,"account_id"=>"user123","product_id"=>"product456","quantity"=>2]);
}

?>

同样,这个代码也只是一个简化版的演示。 实际应用中,你需要:

  • 消息队列:使用消息队列来实现服务之间的异步通信。
  • 事件总线:可以使用事件总线(比如Broadway)来简化事件的发布和订阅。
  • 幂等性:同样需要保证每个本地事务和补偿操作都是幂等的。
  • 最终一致性:由于是异步通信,Saga的最终一致性可能会有一定的延迟。

编排式Saga vs 协作式Saga

特性 编排式Saga (Orchestration) 协作式Saga (Choreography)
协调器 有中心化的协调器 没有中心协调器
服务间通信 协调器直接与各个服务通信 服务之间通过事件进行通信
复杂性 协调器逻辑相对复杂,但服务逻辑简单 服务逻辑相对复杂,需要处理各种事件
可维护性 协调器代码容易维护,但如果Saga流程很复杂,协调器可能会变得臃肿 服务之间耦合度较低,更容易扩展和修改,但Saga流程可能难以追踪
适用场景 流程相对简单,需要集中控制的场景 流程复杂,服务需要高度自治的场景
事务隔离性 相对较好,协调器可以控制事务的执行顺序 较差,服务之间通过事件通信,事务隔离性较弱
优点 中心化控制,易于理解和调试 服务自治,松耦合,易于扩展
缺点 协调器可能成为瓶颈,单点故障 难以追踪Saga流程,服务之间可能存在循环依赖

选择哪种Saga实现方式?

选择哪种Saga实现方式取决于你的具体场景。

  • 如果你的Saga流程比较简单,需要集中控制,而且对事务隔离性要求较高,那么编排式Saga可能更适合你。
  • 如果你的Saga流程比较复杂,服务需要高度自治,而且对扩展性要求较高,那么协作式Saga可能更适合你。

Saga Pattern的挑战

Saga Pattern虽然强大,但也有一些挑战:

  • 复杂性:Saga Pattern本身就比较复杂,需要仔细设计和实现。
  • 调试:调试分布式事务比较困难,需要使用专门的工具和技术。
  • 幂等性:保证每个本地事务和补偿操作都是幂等的,需要花费额外的精力。
  • 最终一致性:由于是异步通信,Saga的最终一致性可能会有一定的延迟。

总结

Saga Pattern是一种解决分布式事务的有效方案,尤其是在微服务架构下。 它可以帮助你保证数据的一致性,提高系统的可用性和扩展性。 当然,它也有一些挑战,需要仔细设计和实现。 希望今天的讲解能让你对Saga Pattern有一个初步的了解。 记住,没有银弹,选择合适的方案才是最重要的。

好啦,今天的讲座就到这里,谢谢大家! 如果有什么问题,欢迎提问。 下次有机会再和大家聊聊其他有趣的技术话题。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注