基于Hyperf的分布式事务解决方案:TCC模式与Saga模式在PHP微服务中的落地

基于Hyperf的分布式事务解决方案:TCC模式与Saga模式在PHP微服务中的落地

各位朋友,大家好!今天我们来聊聊在PHP微服务架构下,如何利用Hyperf框架落地分布式事务,重点探讨TCC模式和Saga模式的实践方法。

在单体应用时代,我们通常依赖数据库自身的ACID特性来保证数据一致性。然而,在微服务架构中,服务之间的数据隔离和独立部署,使得传统的本地事务无法跨越多个服务边界。因此,我们需要引入分布式事务来保证跨服务的数据一致性。

1. 分布式事务的挑战与CAP理论

分布式事务面临的核心挑战在于如何保证多个服务的数据要么全部成功,要么全部失败。CAP理论(一致性、可用性、分区容错性)告诉我们,在分布式系统中,这三个特性无法同时满足。在实际应用中,我们往往需要在一致性和可用性之间做出权衡。

  • 一致性(Consistency): 所有节点在同一时间看到相同的数据。
  • 可用性(Availability): 系统在任何时候都能提供服务。
  • 分区容错性(Partition Tolerance): 系统在网络分区的情况下仍然能够正常运行。

微服务架构天生具有分区容错性,因此需要在一致性和可用性之间做选择。对于强一致性要求较高的场景,我们可以选择牺牲一定的可用性;而对于可用性要求较高的场景,我们可以选择最终一致性。

2. 分布式事务模式:TCC 与 Saga

目前常见的分布式事务模式有多种,例如2PC、3PC、TCC、Saga等。考虑到PHP的特性和Hyperf框架的优势,我们重点关注TCC模式和Saga模式。

2.1 TCC 模式 (Try-Confirm-Cancel)

TCC模式是一种补偿型事务模式,其核心思想是将业务逻辑分为三个阶段:

  • Try阶段: 尝试执行业务,完成所有业务检查(一致性),预留必须的业务资源(准隔离性)。
  • Confirm阶段: 真正执行业务,不作任何业务检查,只使用Try阶段预留的业务资源,Confirm操作满足幂等性。要求Try成功,Confirm一定能成功。
  • Cancel阶段: 释放Try阶段预留的业务资源,Cancel操作满足幂等性。

TCC模式的优点是性能较好,可以达到较强的一致性。缺点是需要对每个业务操作编写Try、Confirm、Cancel三个阶段的逻辑,开发量较大,且实现难度较高。

2.2 Saga 模式

Saga模式也是一种补偿型事务模式,其核心思想是将一个分布式事务拆分成多个本地事务,每个本地事务对应一个服务。Saga模式有两种实现方式:

  • 基于事件驱动的编排式 Saga(Orchestration-based Saga): 由一个中心化的 Saga 编排器(Orchestrator)负责协调各个参与者的本地事务。
  • 基于命令/事件的协作式 Saga(Choreography-based Saga): 各个参与者通过交换事件来协调彼此的本地事务,没有中心化的协调器。

Saga模式的优点是实现相对简单,对业务的侵入性较小。缺点是最终一致性,存在数据不一致的窗口期,且需要考虑补偿事务的幂等性。

3. Hyperf 下 TCC 模式的落地实践

我们以一个简单的电商场景为例,演示如何在Hyperf框架下落地TCC模式。假设我们需要实现一个订单创建流程,涉及两个服务:订单服务和库存服务。

  • 订单服务(Order Service): 负责创建订单。
  • 库存服务(Inventory Service): 负责扣减库存。

3.1 TCC 模式流程

  1. Try 阶段:
    • 订单服务:创建待支付状态的订单,预留订单所需的库存数量。
    • 库存服务:检查库存是否充足,预留库存。
  2. Confirm 阶段:
    • 订单服务:将订单状态更新为已支付。
    • 库存服务:扣减实际库存。
  3. Cancel 阶段:
    • 订单服务:取消订单,释放预留的库存数量。
    • 库存服务:释放预留的库存。

3.2 代码示例

3.2.1 订单服务 (Order Service)

<?php

namespace AppService;

use HyperfDbConnectionDb;
use AppRpcInventoryServiceInterface; // 假设库存服务接口

class OrderService
{
    /**
     * @var InventoryServiceInterface
     */
    private $inventoryService;

    public function __construct(InventoryServiceInterface $inventoryService)
    {
        $this->inventoryService = $inventoryService;
    }

    /**
     * Try 阶段:创建待支付订单,预留库存
     *
     * @param int $userId
     * @param int $productId
     * @param int $quantity
     * @return int Order ID
     * @throws Exception
     */
    public function createOrderTry(int $userId, int $productId, int $quantity): int
    {
        Db::beginTransaction();
        try {
            // 1. 创建待支付状态的订单
            $orderId = Db::table('orders')->insertGetId([
                'user_id' => $userId,
                'product_id' => $productId,
                'quantity' => $quantity,
                'status' => 'pending', // 待支付
                'created_at' => date('Y-m-d H:i:s'),
                'updated_at' => date('Y-m-d H:i:s'),
            ]);

            // 2. 调用库存服务的 Try 接口,预留库存
            $inventoryResult = $this->inventoryService->reserveInventory($productId, $quantity, $orderId);

            if (!$inventoryResult) {
                throw new Exception('库存预留失败');
            }

            Db::commit();
            return $orderId;

        } catch (Exception $e) {
            Db::rollBack();
            throw $e;
        }
    }

    /**
     * Confirm 阶段:将订单状态更新为已支付
     *
     * @param int $orderId
     * @return bool
     */
    public function createOrderConfirm(int $orderId): bool
    {
        try {
            // 1. 更新订单状态为已支付
            Db::table('orders')
                ->where('id', $orderId)
                ->update(['status' => 'paid', 'updated_at' => date('Y-m-d H:i:s')]);

            return true;
        } catch (Exception $e) {
            // 记录日志,Confirm 阶段失败需要人工介入
            HyperfLoggerLoggerFactory::getLogger()->error('Order Confirm Failed: ' . $e->getMessage());
            return false;
        }
    }

    /**
     * Cancel 阶段:取消订单,释放预留的库存数量
     *
     * @param int $orderId
     * @return bool
     */
    public function createOrderCancel(int $orderId): bool
    {
        try {
            // 1. 查询订单信息
            $order = Db::table('orders')->where('id', $orderId)->first();

            if (!$order) {
                return true; // 订单不存在,视为成功
            }

            // 2. 调用库存服务的 Cancel 接口,释放预留库存
            $inventoryResult = $this->inventoryService->releaseInventory($order->product_id, $order->quantity, $orderId);

            if (!$inventoryResult) {
                // 记录日志,Cancel 阶段失败需要人工介入
                HyperfLoggerLoggerFactory::getLogger()->error('Inventory Cancel Failed: ' . $e->getMessage());
                return false;
            }

            // 3. 删除订单
            Db::table('orders')->where('id', $orderId)->delete();

            return true;
        } catch (Exception $e) {
            // 记录日志,Cancel 阶段失败需要人工介入
            HyperfLoggerLoggerFactory::getLogger()->error('Order Cancel Failed: ' . $e->getMessage());
            return false;
        }
    }
}

3.2.2 库存服务 (Inventory Service)

<?php

namespace AppService;

use HyperfDbConnectionDb;

class InventoryService implements InventoryServiceInterface
{
    /**
     * Try 阶段:检查库存是否充足,预留库存
     *
     * @param int $productId
     * @param int $quantity
     * @param int $orderId
     * @return bool
     */
    public function reserveInventory(int $productId, int $quantity, int $orderId): bool
    {
        try {
            // 1. 检查库存是否充足
            $inventory = Db::table('inventories')->where('product_id', $productId)->first();

            if (!$inventory || $inventory->stock < $quantity) {
                return false; // 库存不足
            }

            // 2. 预留库存(扣减可用库存,增加预留库存)
            Db::table('inventories')
                ->where('product_id', $productId)
                ->update([
                    'stock' => $inventory->stock - $quantity,
                    'reserved_stock' => $inventory->reserved_stock + $quantity,
                    'updated_at' => date('Y-m-d H:i:s'),
                ]);

            // 3. 记录预留日志 (可选,方便回滚)
            Db::table('inventory_reservations')->insert([
                'product_id' => $productId,
                'quantity' => $quantity,
                'order_id' => $orderId,
                'type' => 'reserve',
                'created_at' => date('Y-m-d H:i:s'),
            ]);

            return true;
        } catch (Exception $e) {
            HyperfLoggerLoggerFactory::getLogger()->error('Reserve Inventory Failed: ' . $e->getMessage());
            return false;
        }
    }

    /**
     * Confirm 阶段:扣减实际库存
     *
     * @param int $productId
     * @param int $quantity
     * @param int $orderId
     * @return bool
     */
    public function deductInventory(int $productId, int $quantity, int $orderId): bool
    {
        try {
            // 1. 扣减预留库存
            Db::table('inventories')
                ->where('product_id', $productId)
                ->update([
                    'reserved_stock' => Db::raw('reserved_stock - ' . $quantity),
                    'sold_stock' => Db::raw('sold_stock + ' . $quantity),
                    'updated_at' => date('Y-m-d H:i:s'),
                ]);

            // 2. 记录扣减日志 (可选,方便回滚)
            Db::table('inventory_reservations')->insert([
                'product_id' => $productId,
                'quantity' => $quantity,
                'order_id' => $orderId,
                'type' => 'deduct',
                'created_at' => date('Y-m-d H:i:s'),
            ]);

            return true;
        } catch (Exception $e) {
            HyperfLoggerLoggerFactory::getLogger()->error('Deduct Inventory Failed: ' . $e->getMessage());
            return false;
        }
    }

    /**
     * Cancel 阶段:释放预留库存
     *
     * @param int $productId
     * @param int $quantity
     * @param int $orderId
     * @return bool
     */
    public function releaseInventory(int $productId, int $quantity, int $orderId): bool
    {
        try {
            // 1. 释放预留库存 (增加可用库存,减少预留库存)
            Db::table('inventories')
                ->where('product_id', $productId)
                ->update([
                    'stock' => Db::raw('stock + ' . $quantity),
                    'reserved_stock' => Db::raw('reserved_stock - ' . $quantity),
                    'updated_at' => date('Y-m-d H:i:s'),
                ]);

            // 2. 删除预留日志
            Db::table('inventory_reservations')->where('product_id', $productId)->where('order_id', $orderId)->where('type', 'reserve')->delete();

            return true;
        } catch (Exception $e) {
            HyperfLoggerLoggerFactory::getLogger()->error('Release Inventory Failed: ' . $e->getMessage());
            return false;
        }
    }
}

3.2.3 TCC 事务协调器

由于 TCC 模式需要协调各个服务的 Try、Confirm、Cancel 操作,因此我们需要一个 TCC 事务协调器。这个协调器可以使用消息队列、分布式事务中间件等实现。这里我们简单地使用一个本地事务协调器来模拟。

<?php

namespace AppService;

class TccCoordinator
{
    /**
     * 协调 TCC 事务
     *
     * @param callable $try
     * @param callable $confirm
     * @param callable $cancel
     * @return mixed
     * @throws Exception
     */
    public function coordinate(callable $try, callable $confirm, callable $cancel)
    {
        try {
            // 1. Try 阶段
            $tryResult = $try();

            // 2. Confirm 阶段
            try {
                $confirmResult = $confirm();
                return $tryResult; // 事务成功
            } catch (Exception $e) {
                // Confirm 失败,执行 Cancel 阶段
                $cancel();
                throw $e; // 抛出异常,通知上层事务失败
            }

        } catch (Exception $e) {
            // Try 失败,执行 Cancel 阶段
            $cancel();
            throw $e; // 抛出异常,通知上层事务失败
        }
    }
}

3.2.4 使用 TCC 协调器

<?php

namespace AppController;

use HyperfDiAnnotationInject;
use AppServiceOrderService;
use AppServiceTccCoordinator;

class OrderController
{
    /**
     * @Inject
     * @var OrderService
     */
    private $orderService;

    /**
     * @Inject
     * @var TccCoordinator
     */
    private $tccCoordinator;

    public function createOrder()
    {
        $userId = 1;
        $productId = 1001;
        $quantity = 2;

        try {
            $orderId = $this->tccCoordinator->coordinate(
                function () use ($userId, $productId, $quantity) {
                    // Try 阶段
                    return $this->orderService->createOrderTry($userId, $productId, $quantity);
                },
                function ($orderId) {
                    // Confirm 阶段
                    return $this->orderService->createOrderConfirm($orderId);
                },
                function ($orderId) {
                    // Cancel 阶段
                    return $this->orderService->createOrderCancel($orderId);
                }
            );

            return ['code' => 200, 'message' => '订单创建成功', 'order_id' => $orderId];

        } catch (Exception $e) {
            return ['code' => 500, 'message' => '订单创建失败: ' . $e->getMessage()];
        }
    }
}

4. Hyperf 下 Saga 模式的落地实践

我们仍然以订单创建流程为例,演示如何在Hyperf框架下落地Saga模式。

4.1 Saga 模式流程 (编排式 Saga)

  1. 订单服务:创建待支付状态的订单。
  2. 订单服务:发布 OrderCreatedEvent 事件。
  3. 库存服务:订阅 OrderCreatedEvent 事件,扣减库存。
  4. 如果库存扣减成功,库存服务发布 InventoryDeductedEvent 事件。
  5. 订单服务:订阅 InventoryDeductedEvent 事件,将订单状态更新为已支付。
  6. 如果库存扣减失败,库存服务发布 InventoryDeductionFailedEvent 事件。
  7. 订单服务:订阅 InventoryDeductionFailedEvent 事件,取消订单。

4.2 代码示例

4.2.1 事件定义

<?php

namespace AppEvent;

class OrderCreatedEvent
{
    public $orderId;
    public $userId;
    public $productId;
    public $quantity;

    public function __construct(int $orderId, int $userId, int $productId, int $quantity)
    {
        $this->orderId = $orderId;
        $this->userId = $userId;
        $this->productId = $productId;
        $this->quantity = $quantity;
    }
}

class InventoryDeductedEvent
{
    public $orderId;
    public $productId;
    public $quantity;

    public function __construct(int $orderId, int $productId, int $quantity)
    {
        $this->orderId = $orderId;
        $this->productId = $productId;
        $this->quantity = $quantity;
    }
}

class InventoryDeductionFailedEvent
{
    public $orderId;
    public $productId;
    public $quantity;

    public function __construct(int $orderId, int $productId, int $quantity)
    {
        $this->orderId = $orderId;
        $this->productId = $productId;
        $this->quantity = $quantity;
    }
}

4.2.2 事件监听器

<?php

namespace AppListener;

use AppEventOrderCreatedEvent;
use AppEventInventoryDeductionFailedEvent;
use AppEventInventoryDeductedEvent;
use AppServiceInventoryService;
use HyperfEventAnnotationListener;
use HyperfUtilsApplicationContext;

/**
 * @Listener
 */
class OrderCreatedListener
{
    public function listen(): array
    {
        return [
            OrderCreatedEvent::class,
        ];
    }

    public function process(object $event): void
    {
        /** @var InventoryService */
        $inventoryService = ApplicationContext::getContainer()->get(InventoryService::class);

        try {
            $inventoryService->deductInventory($event->productId, $event->quantity, $event->orderId);
            event(new InventoryDeductedEvent($event->orderId, $event->productId, $event->quantity));
        } catch (Exception $e) {
            event(new InventoryDeductionFailedEvent($event->orderId, $event->productId, $event->quantity));
        }
    }
}

/**
 * @Listener
 */
class InventoryDeductedListener
{
    public function listen(): array
    {
        return [
            InventoryDeductedEvent::class,
        ];
    }

    public function process(object $event): void
    {
        $orderService = ApplicationContext::getContainer()->get(AppServiceOrderService::class);
        $orderService->confirmOrder($event->orderId);
    }
}

/**
 * @Listener
 */
class InventoryDeductionFailedListener
{
    public function listen(): array
    {
        return [
            InventoryDeductionFailedEvent::class,
        ];
    }

    public function process(object $event): void
    {
        $orderService = ApplicationContext::getContainer()->get(AppServiceOrderService::class);
        $orderService->cancelOrder($event->orderId);
    }
}

4.2.3 订单服务和库存服务

订单服务需要修改 createOrder 方法,创建订单后发布 OrderCreatedEvent 事件。库存服务需要实现 deductInventory 方法,扣减库存。这里省略具体代码,与 TCC 模式中的代码类似,只是不需要 Try、Confirm、Cancel 三个阶段。

4.3 Saga 模式的注意事项

  • 幂等性: Saga 模式中的每个本地事务都需要保证幂等性,即多次执行的结果与一次执行的结果相同。
  • 补偿事务: Saga 模式需要考虑补偿事务,即当一个本地事务失败时,需要执行相应的补偿操作来回滚已经执行的事务。
  • 事件可靠性: Saga 模式依赖事件驱动,需要保证事件的可靠性,即事件不会丢失或重复消费。可以使用消息队列的持久化和事务机制来保证事件的可靠性。
  • 环形依赖: 在协作式 Saga 中,需要避免环形依赖,即服务 A 依赖服务 B,服务 B 依赖服务 C,服务 C 又依赖服务 A。

5. TCC vs Saga:选择哪种模式?

特性 TCC Saga
一致性 较强一致性 最终一致性
复杂度 较高 相对简单
侵入性 较高 较低
性能 较高 较低
适用场景 对数据一致性要求较高,允许牺牲一定可用性 对可用性要求较高,允许最终一致性
事务协调 需要专门的事务协调器 可通过事件驱动或编排器实现
补偿机制 需要实现 Cancel 接口 需要实现补偿事务

总结:针对实际场景选择适合的模式,做好异常处理与监控

选择 TCC 模式还是 Saga 模式,需要根据具体的业务场景和需求来决定。如果对数据一致性要求较高,且允许牺牲一定的可用性,可以选择 TCC 模式。如果对可用性要求较高,且允许最终一致性,可以选择 Saga 模式。无论选择哪种模式,都需要做好异常处理和监控,以便及时发现和处理问题。

6. 其他需要关注的点

  • 事务日志: 无论是 TCC 还是 Saga,都需要记录详细的事务日志,以便在出现问题时进行回滚和恢复。
  • 幂等性控制: 为了防止重复执行事务,需要对每个事务进行幂等性控制。可以使用唯一 ID、版本号等方式来实现幂等性。
  • 监控和告警: 需要对分布式事务进行监控和告警,以便及时发现和处理问题。可以监控事务的执行时间、成功率、失败率等指标。
  • 人工介入: 在某些情况下,自动补偿可能无法解决问题,需要人工介入进行处理。需要提供相应的工具和界面,方便人工介入。

希望今天的分享能够帮助大家更好地理解和应用分布式事务。谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注