PHP应用中的数据库事务:基于Redis/Etcd实现分布式事务(Saga/TCC)的模式

好的,下面开始我的讲座:

PHP应用中的数据库事务:基于Redis/Etcd实现分布式事务(Saga/TCC)的模式

各位朋友,大家好!今天我们来聊一聊PHP应用中数据库事务,特别是如何利用Redis和Etcd来实现分布式事务,重点关注Saga和TCC这两种模式。在微服务架构盛行的今天,跨多个服务的数据一致性是一个非常重要且复杂的问题。传统的ACID事务在单体应用中运行良好,但在分布式环境中,由于网络延迟、服务故障等因素,难以保证强一致性。因此,我们需要寻找新的解决方案。

1. 事务的必要性与挑战

在任何一个需要保证数据完整性的系统中,事务都是不可或缺的。在单体架构中,我们可以利用数据库提供的ACID特性来保证事务的正确性。但是,在微服务架构下,一个业务流程可能会涉及到多个服务,每个服务都有自己的数据库。这时,传统的ACID事务就难以适用。

  • 原子性(Atomicity): 要么全部成功,要么全部失败。在分布式系统中,涉及多个服务的操作,很难保证同时成功或失败。
  • 一致性(Consistency): 事务执行前后,数据必须处于一致状态。
  • 隔离性(Isolation): 多个并发事务之间互不影响。在分布式系统中,由于网络延迟,隔离性很难保证。
  • 持久性(Durability): 事务一旦提交,所做的修改就会永久保存。

2. 分布式事务的解决方案:Saga和TCC

为了解决分布式事务的问题,业界提出了多种解决方案,其中比较流行的就是Saga和TCC。

  • Saga模式: 将一个分布式事务分解为多个本地事务,每个本地事务由一个服务负责。Saga模式有两种实现方式:
    • 编排式Saga: 使用一个中心编排器来协调各个本地事务的执行。
    • 协同式Saga: 每个本地事务完成后,发布一个事件,其他服务监听这些事件并执行相应的操作。
  • TCC模式: TCC是Try-Confirm-Cancel的缩写。它将每个本地事务分为三个阶段:
    • Try阶段: 尝试执行业务,并预留所需的资源。
    • Confirm阶段: 确认执行业务,实际使用Try阶段预留的资源。
    • Cancel阶段: 取消执行业务,释放Try阶段预留的资源。

3. 基于Redis实现Saga模式

Redis可以用来实现Saga模式中的事件发布/订阅机制,以及状态管理。

3.1 编排式Saga的实现:

假设一个电商场景,用户下单流程涉及到订单服务、库存服务和支付服务。

  • 订单服务(Order Service): 创建订单。
  • 库存服务(Inventory Service): 扣减库存。
  • 支付服务(Payment Service): 完成支付。

实现步骤:

  1. 定义Saga编排器: 创建一个Saga类来协调各个服务的事务。
<?php

use PredisClient;

class SagaOrchestrator
{
    private $redis;
    private $sagaId;
    private $steps;
    private $currentStep = 0;
    private $status = 'PENDING';

    public function __construct(Client $redis, string $sagaId, array $steps)
    {
        $this->redis = $redis;
        $this->sagaId = $sagaId;
        $this->steps = $steps;
    }

    public function start()
    {
        $this->setStatus('RUNNING');
        $this->executeNextStep();
    }

    private function executeNextStep()
    {
        if ($this->currentStep < count($this->steps) && $this->status === 'RUNNING') {
            $step = $this->steps[$this->currentStep];
            try {
                $result = $step['action'](); // 执行当前步骤的动作
                $this->currentStep++;
                $this->saveState();
                if ($result) {
                    $this->executeNextStep(); // 如果当前步骤成功,则执行下一步
                } else {
                    $this->compensate(); // 如果当前步骤失败,则进行补偿
                }
            } catch (Exception $e) {
                error_log("Step failed: " . $e->getMessage());
                $this->compensate(); // 发生异常,进行补偿
            }
        } else {
            if ($this->status === 'RUNNING') {
              $this->setStatus('COMPLETED');
            }
        }
    }

    public function compensate()
    {
        $this->setStatus('ROLLING_BACK');
        for ($i = $this->currentStep - 1; $i >= 0; $i--) {
            $step = $this->steps[$i];
            if (isset($step['compensation'])) {
                try {
                    $step['compensation'](); // 执行补偿操作
                } catch (Exception $e) {
                    error_log("Compensation failed: " . $e->getMessage());
                    // 处理补偿失败的情况,可以重试或者人工干预
                }
            }
        }
        $this->setStatus('ROLLED_BACK');
    }

    private function setStatus(string $status)
    {
        $this->status = $status;
        $this->saveState();
    }

    private function saveState()
    {
        $state = [
            'sagaId' => $this->sagaId,
            'currentStep' => $this->currentStep,
            'status' => $this->status,
        ];
        $this->redis->set("saga:{$this->sagaId}", json_encode($state));
    }

    public function getStatus(): string
    {
        return $this->status;
    }

    public static function loadFromRedis(Client $redis, string $sagaId): ?SagaOrchestrator
    {
        $stateJson = $redis->get("saga:{$sagaId}");
        if (!$stateJson) {
            return null;
        }

        $state = json_decode($stateJson, true);

        // 模拟从数据库或者其他持久化存储加载Steps
        $steps = self::defineSteps(); // 这里需要替换成实际的步骤定义

        $orchestrator = new SagaOrchestrator($redis, $state['sagaId'], $steps);
        $orchestrator->currentStep = $state['currentStep'];
        $orchestrator->status = $state['status'];

        return $orchestrator;
    }

    public static function defineSteps(): array
    {
        // 模拟步骤定义
        return [
            [
                'action' => function () {
                    // 调用订单服务创建订单
                    return OrderService::createOrder();
                },
                'compensation' => function () {
                    // 调用订单服务取消订单
                    OrderService::cancelOrder();
                },
            ],
            [
                'action' => function () {
                    // 调用库存服务扣减库存
                    return InventoryService::decreaseStock();
                },
                'compensation' => function () {
                    // 调用库存服务增加库存
                    InventoryService::increaseStock();
                },
            ],
            [
                'action' => function () {
                    // 调用支付服务完成支付
                    return PaymentService::processPayment();
                },
                'compensation' => function () {
                    // 调用支付服务退款
                    PaymentService::refundPayment();
                },
            ],
        ];
    }
}

class OrderService {
    public static function createOrder(): bool {
        // 模拟创建订单逻辑
        echo "Creating order...n";
        // ... 实际创建订单的数据库操作
        return true; // 假设成功
    }

    public static function cancelOrder(): void {
        // 模拟取消订单逻辑
        echo "Canceling order...n";
        // ... 实际取消订单的数据库操作
    }
}

class InventoryService {
    public static function decreaseStock(): bool {
        // 模拟扣减库存逻辑
        echo "Decreasing stock...n";
        // ... 实际扣减库存的数据库操作
        return true; // 假设成功
    }

    public static function increaseStock(): void {
        // 模拟增加库存逻辑
        echo "Increasing stock...n";
        // ... 实际增加库存的数据库操作
    }
}

class PaymentService {
    public static function processPayment(): bool {
        // 模拟完成支付逻辑
        echo "Processing payment...n";
        // ... 实际完成支付的数据库操作
        return false; // 假设失败
    }

    public static function refundPayment(): void {
        // 模拟退款逻辑
        echo "Refunding payment...n";
        // ... 实际退款的数据库操作
    }
}

// 使用示例
$redis = new Client([
    'scheme' => 'tcp',
    'host'   => '127.0.0.1',
    'port'   => 6379,
]);

$sagaId = uniqid(); // 生成唯一的 Saga ID
$steps = SagaOrchestrator::defineSteps(); // 定义 Saga 步骤

$saga = new SagaOrchestrator($redis, $sagaId, $steps);
$saga->start();

echo "Saga status: " . $saga->getStatus() . "n"; // 输出 Saga 状态

// 模拟从Redis恢复Saga状态
$loadedSaga = SagaOrchestrator::loadFromRedis($redis, $sagaId);
if ($loadedSaga) {
    echo "Loaded Saga status: " . $loadedSaga->getStatus() . "n";
}
  1. 定义Saga步骤: 每个步骤包含一个action(执行动作)和一个compensation(补偿动作)。
  2. 执行Saga: Saga编排器按照步骤顺序执行,如果某个步骤失败,则执行补偿操作。
  3. 状态持久化: 使用Redis来保存Saga的状态,包括当前步骤和状态。

3.2 协同式Saga的实现:

使用Redis的发布/订阅功能来实现服务之间的事件通知。

实现步骤:

  1. 定义事件: 定义Saga中涉及的事件,例如OrderCreatedStockDecreasedPaymentProcessed等。
  2. 服务发布事件: 每个服务在完成本地事务后,发布一个事件到Redis的Channel。
  3. 服务订阅事件: 其他服务订阅相关的事件,并执行相应的操作。
  4. 补偿机制: 如果某个服务失败,发布补偿事件,其他服务监听这些事件并执行补偿操作。
<?php

use PredisClient;

class EventPublisher
{
    private $redis;

    public function __construct(Client $redis)
    {
        $this->redis = $redis;
    }

    public function publish(string $channel, array $message)
    {
        $this->redis->publish($channel, json_encode($message));
    }
}

class EventSubscriber
{
    private $redis;
    private $channel;
    private $callback;

    public function __construct(Client $redis, string $channel, callable $callback)
    {
        $this->redis = $redis;
        $this->channel = $channel;
        $this->callback = $callback;
    }

    public function subscribe()
    {
        $pubSub = $this->redis->pubSubLoop();
        $pubSub->subscribe($this->channel);

        foreach ($pubSub as $message) {
            switch ($message->kind) {
                case 'subscribe':
                    echo "Subscribed to {$message->channel}n";
                    break;
                case 'message':
                    $data = json_decode($message->payload, true);
                    call_user_func($this->callback, $data);
                    break;
            }
        }
    }
}

// 示例:订单服务
class OrderService
{
    private $redis;
    private $publisher;

    public function __construct(Client $redis, EventPublisher $publisher)
    {
        $this->redis = $redis;
        $this->publisher = $publisher;
    }

    public function createOrder(array $orderData)
    {
        // 创建订单的本地事务
        // ...
        $orderId = uniqid(); // 假设生成订单ID

        // 发布 OrderCreated 事件
        $this->publisher->publish('order_events', [
            'event' => 'OrderCreated',
            'order_id' => $orderId,
            'order_data' => $orderData,
        ]);

        return $orderId;
    }

    public function cancelOrder(string $orderId)
    {
        // 取消订单的本地事务
        // ...

        // 发布 OrderCancelled 事件
        $this->publisher->publish('order_events', [
            'event' => 'OrderCancelled',
            'order_id' => $orderId,
        ]);
    }
}

// 示例:库存服务
class InventoryService
{
    private $redis;
    private $publisher;

    public function __construct(Client $redis, EventPublisher $publisher)
    {
        $this->redis = $redis;
        $this->publisher = $publisher;
    }

    public function decreaseStock(string $orderId, array $items)
    {
        // 扣减库存的本地事务
        // ...

        // 发布 StockDecreased 事件
        $this->publisher->publish('inventory_events', [
            'event' => 'StockDecreased',
            'order_id' => $orderId,
            'items' => $items,
        ]);
    }

    public function increaseStock(string $orderId, array $items)
    {
        // 增加库存的本地事务 (补偿操作)
        // ...

        // 发布 StockIncreased 事件
        $this->publisher->publish('inventory_events', [
            'event' => 'StockIncreased',
            'order_id' => $orderId,
            'items' => $items,
        ]);
    }
}

// 示例:支付服务
class PaymentService
{
    private $redis;
    private $publisher;

    public function __construct(Client $redis, EventPublisher $publisher)
    {
        $this->redis = $redis;
        $this->publisher = $publisher;
    }

    public function processPayment(string $orderId, float $amount)
    {
        // 处理支付的本地事务
        // ...

        // 发布 PaymentProcessed 事件
        $this->publisher->publish('payment_events', [
            'event' => 'PaymentProcessed',
            'order_id' => $orderId,
            'amount' => $amount,
        ]);
    }

    public function refundPayment(string $orderId, float $amount)
    {
        // 退款的本地事务 (补偿操作)
        // ...

        // 发布 PaymentRefunded 事件
        $this->publisher->publish('payment_events', [
            'event' => 'PaymentRefunded',
            'order_id' => $orderId,
            'amount' => $amount,
        ]);
    }
}

// 使用示例
$redis = new Client([
    'scheme' => 'tcp',
    'host'   => '127.0.0.1',
    'port'   => 6379,
]);

$eventPublisher = new EventPublisher($redis);

// 订单服务
$orderService = new OrderService($redis, $eventPublisher);
$orderId = $orderService->createOrder(['product_id' => '123', 'quantity' => 2]);

// 库存服务
$inventoryService = new InventoryService($redis, $eventPublisher);
$inventorySubscriber = new EventSubscriber($redis, 'order_events', function ($data) use ($inventoryService) {
    if ($data['event'] === 'OrderCreated') {
        $inventoryService->decreaseStock($data['order_id'], [['product_id' => '123', 'quantity' => 2]]);
    } elseif ($data['event'] === 'OrderCancelled') {
        $inventoryService->increaseStock($data['order_id'], [['product_id' => '123', 'quantity' => 2]]);
    }
});

// 支付服务
$paymentService = new PaymentService($redis, $eventPublisher);
$paymentSubscriber = new EventSubscriber($redis, 'inventory_events', function ($data) use ($paymentService) {
    if ($data['event'] === 'StockDecreased') {
        $paymentService->processPayment($data['order_id'], 100.00); // 假设金额为 100.00
    }
});

// 启动订阅者 (需要在不同的进程或线程中运行)
// 实际应用中,这些订阅者会在各自的服务中运行
// $inventorySubscriber->subscribe();
// $paymentSubscriber->subscribe();

echo "Order created with ID: " . $orderId . "n";

4. 基于Etcd实现TCC模式

Etcd是一个分布式键值存储系统,可以用来实现TCC模式中的事务协调器。Etcd可以保证数据的一致性和可靠性,非常适合用来存储事务的状态。

4.1 TCC模式的实现:

仍然以电商场景为例,用户下单流程涉及到订单服务、库存服务和支付服务。

实现步骤:

  1. 定义TCC协调器: 创建一个TCC协调器,负责协调各个服务的Try、Confirm和Cancel操作。
<?php

use EtcdClient;

class TccCoordinator
{
    private $etcd;
    private $transactionId;
    private $participants = [];
    private $status = 'PENDING';

    public function __construct(Client $etcd, string $transactionId)
    {
        $this->etcd = $etcd;
        $this->transactionId = $transactionId;
    }

    public function registerParticipant(string $serviceName, callable $try, callable $confirm, callable $cancel)
    {
        $this->participants[$serviceName] = [
            'try' => $try,
            'confirm' => $confirm,
            'cancel' => $cancel,
            'status' => 'PENDING',
        ];
    }

    public function start()
    {
        $this->setStatus('TRYING');
        foreach ($this->participants as $serviceName => &$participant) {
            try {
                $result = $participant['try'](); // 执行Try阶段
                if ($result) {
                    $participant['status'] = 'TRY_SUCCESS';
                } else {
                    $participant['status'] = 'TRY_FAILED';
                    $this->rollback();
                    return false;
                }
            } catch (Exception $e) {
                error_log("Try failed for {$serviceName}: " . $e->getMessage());
                $participant['status'] = 'TRY_FAILED';
                $this->rollback();
                return false;
            }
            $this->saveState();
        }

        $this->confirm();
        return true;
    }

    private function confirm()
    {
        $this->setStatus('CONFIRMING');
        foreach ($this->participants as $serviceName => &$participant) {
            if ($participant['status'] === 'TRY_SUCCESS') {
                try {
                    $participant['confirm'](); // 执行Confirm阶段
                    $participant['status'] = 'CONFIRM_SUCCESS';
                } catch (Exception $e) {
                    error_log("Confirm failed for {$serviceName}: " . $e->getMessage());
                    $participant['status'] = 'CONFIRM_FAILED';
                    // Confirm 阶段失败,需要重试或者人工干预
                }
                $this->saveState();
            }
        }
        $this->setStatus('CONFIRMED');
    }

    private function rollback()
    {
        $this->setStatus('ROLLING_BACK');
        foreach ($this->participants as $serviceName => &$participant) {
            if ($participant['status'] === 'TRY_SUCCESS' || $participant['status'] === 'CONFIRM_SUCCESS') {
                try {
                    $participant['cancel'](); // 执行Cancel阶段
                    $participant['status'] = 'CANCEL_SUCCESS';
                } catch (Exception $e) {
                    error_log("Cancel failed for {$serviceName}: " . $e->getMessage());
                    $participant['status'] = 'CANCEL_FAILED';
                    // Cancel 阶段失败,需要重试或者人工干预
                }
                $this->saveState();
            }
        }
        $this->setStatus('ROLLED_BACK');
    }

    private function setStatus(string $status)
    {
        $this->status = $status;
        $this->saveState();
    }

    private function saveState()
    {
        $state = [
            'transactionId' => $this->transactionId,
            'participants' => $this->participants,
            'status' => $this->status,
        ];
        $this->etcd->set("/tcc/{$this->transactionId}", json_encode($state));
    }

    public function getStatus(): string
    {
        return $this->status;
    }

    public static function loadFromEtcd(Client $etcd, string $transactionId): ?TccCoordinator
    {
        try {
            $response = $etcd->get("/tcc/{$transactionId}");
            $stateJson = $response->node->value;
            $state = json_decode($stateJson, true);

            $coordinator = new TccCoordinator($etcd, $state['transactionId']);
            $coordinator->participants = $state['participants'];
            $coordinator->status = $state['status'];

            return $coordinator;

        } catch (Exception $e) {
            // 没有找到对应transactionId,或者Etcd连接失败
            return null;
        }
    }
}

// 示例:订单服务
class OrderService
{
    public static function tryCreateOrder(): bool
    {
        // 尝试创建订单,预留资源
        echo "Trying to create order...n";
        // ... 实际的 Try 逻辑
        return true; // 假设成功
    }

    public static function confirmCreateOrder(): void
    {
        // 确认创建订单,实际使用资源
        echo "Confirming create order...n";
        // ... 实际的 Confirm 逻辑
    }

    public static function cancelCreateOrder(): void
    {
        // 取消创建订单,释放资源
        echo "Canceling create order...n";
        // ... 实际的 Cancel 逻辑
    }
}

// 示例:库存服务
class InventoryService
{
    public static function tryDecreaseStock(): bool
    {
        // 尝试扣减库存,预留资源
        echo "Trying to decrease stock...n";
        // ... 实际的 Try 逻辑
        return true; // 假设成功
    }

    public static function confirmDecreaseStock(): void
    {
        // 确认扣减库存,实际使用资源
        echo "Confirming decrease stock...n";
        // ... 实际的 Confirm 逻辑
    }

    public static function cancelDecreaseStock(): void
    {
        // 取消扣减库存,释放资源
        echo "Canceling decrease stock...n";
        // ... 实际的 Cancel 逻辑
    }
}

// 示例:支付服务
class PaymentService
{
    public static function tryProcessPayment(): bool
    {
        // 尝试处理支付,预留资源
        echo "Trying to process payment...n";
        // ... 实际的 Try 逻辑
        return false; // 假设失败
    }

    public static function confirmProcessPayment(): void
    {
        // 确认处理支付,实际使用资源
        echo "Confirming process payment...n";
        // ... 实际的 Confirm 逻辑
    }

    public static function cancelProcessPayment(): void
    {
        // 取消处理支付,释放资源
        echo "Canceling process payment...n";
        // ... 实际的 Cancel 逻辑
    }
}

// 使用示例
$etcd = new Client([
    'host' => '127.0.0.1',
    'port' => 2379,
]);

$transactionId = uniqid(); // 生成唯一的 Transaction ID

$coordinator = new TccCoordinator($etcd, $transactionId);

// 注册参与者
$coordinator->registerParticipant('order_service', [OrderService::class, 'tryCreateOrder'], [OrderService::class, 'confirmCreateOrder'], [OrderService::class, 'cancelCreateOrder']);
$coordinator->registerParticipant('inventory_service', [InventoryService::class, 'tryDecreaseStock'], [InventoryService::class, 'confirmDecreaseStock'], [InventoryService::class, 'cancelDecreaseStock']);
$coordinator->registerParticipant('payment_service', [PaymentService::class, 'tryProcessPayment'], [PaymentService::class, 'confirmProcessPayment'], [PaymentService::class, 'cancelProcessPayment']);

// 启动事务
$coordinator->start();

echo "Transaction status: " . $coordinator->getStatus() . "n";

// 模拟从Etcd恢复TCC状态
$loadedCoordinator = TccCoordinator::loadFromEtcd($etcd, $transactionId);
if ($loadedCoordinator) {
    echo "Loaded Transaction status: " . $loadedCoordinator->getStatus() . "n";
}
  1. 注册参与者: 将涉及的每个服务注册到TCC协调器,并提供Try、Confirm和Cancel方法的实现。
  2. 执行Try阶段: TCC协调器依次调用每个服务的Try方法,如果所有服务的Try方法都成功,则进入Confirm阶段,否则进入Cancel阶段。
  3. 执行Confirm阶段: TCC协调器依次调用每个服务的Confirm方法,完成事务。
  4. 执行Cancel阶段: TCC协调器依次调用每个服务的Cancel方法,回滚事务。
  5. 状态持久化: 使用Etcd来保存事务的状态,包括参与者状态和事务状态。

5. 模式选择的考量

选择Saga还是TCC,需要根据实际业务场景进行权衡。

特性 Saga TCC
一致性级别 最终一致性 强一致性(相对)
实现复杂度 相对简单 相对复杂,需要预留资源
性能 较高,无需锁定资源 较低,需要锁定资源
适用场景 对一致性要求不高,允许最终一致性的场景 对一致性要求较高,需要保证强一致性的场景
补偿机制 补偿操作可能比较复杂,需要考虑幂等性 Cancel操作相对简单,但需要保证资源预留

6. 进一步的思考

  • 幂等性: 无论是Saga还是TCC,都需要保证操作的幂等性,即多次执行相同的操作,结果应该是一致的。
  • 重试机制: 在分布式环境中,服务可能会出现故障,因此需要实现重试机制来保证事务的最终成功。
  • 监控和告警: 需要对分布式事务进行监控和告警,以便及时发现和处理问题。
  • 事务日志: 记录事务的执行过程,方便进行故障排查和数据恢复。

7. 总结:分布式事务并非银弹,谨慎选择

分布式事务的实现,无论是Saga还是TCC,都带来了额外的复杂性。选择哪种方案,需要结合业务场景,权衡一致性、性能和实现复杂度。在选择方案时,务必充分考虑各种因素,并进行充分的测试,以确保系统的稳定性和可靠性。

感谢大家的聆听!希望今天的讲座对大家有所帮助。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注