好的,下面开始我的讲座:
PHP应用中的数据库事务:基于Redis/Etcd实现分布式事务(Saga/TCC)的模式
各位朋友,大家好!今天我们来聊一聊PHP应用中数据库事务,特别是如何利用Redis和Etcd来实现分布式事务,重点关注Saga和TCC这两种模式。在微服务架构盛行的今天,跨多个服务的数据一致性是一个非常重要且复杂的问题。传统的ACID事务在单体应用中运行良好,但在分布式环境中,由于网络延迟、服务故障等因素,难以保证强一致性。因此,我们需要寻找新的解决方案。
1. 事务的必要性与挑战
在任何一个需要保证数据完整性的系统中,事务都是不可或缺的。在单体架构中,我们可以利用数据库提供的ACID特性来保证事务的正确性。但是,在微服务架构下,一个业务流程可能会涉及到多个服务,每个服务都有自己的数据库。这时,传统的ACID事务就难以适用。
- 原子性(Atomicity): 要么全部成功,要么全部失败。在分布式系统中,涉及多个服务的操作,很难保证同时成功或失败。
- 一致性(Consistency): 事务执行前后,数据必须处于一致状态。
- 隔离性(Isolation): 多个并发事务之间互不影响。在分布式系统中,由于网络延迟,隔离性很难保证。
- 持久性(Durability): 事务一旦提交,所做的修改就会永久保存。
2. 分布式事务的解决方案:Saga和TCC
为了解决分布式事务的问题,业界提出了多种解决方案,其中比较流行的就是Saga和TCC。
- Saga模式: 将一个分布式事务分解为多个本地事务,每个本地事务由一个服务负责。Saga模式有两种实现方式:
- 编排式Saga: 使用一个中心编排器来协调各个本地事务的执行。
- 协同式Saga: 每个本地事务完成后,发布一个事件,其他服务监听这些事件并执行相应的操作。
- TCC模式: TCC是Try-Confirm-Cancel的缩写。它将每个本地事务分为三个阶段:
- Try阶段: 尝试执行业务,并预留所需的资源。
- Confirm阶段: 确认执行业务,实际使用Try阶段预留的资源。
- Cancel阶段: 取消执行业务,释放Try阶段预留的资源。
3. 基于Redis实现Saga模式
Redis可以用来实现Saga模式中的事件发布/订阅机制,以及状态管理。
3.1 编排式Saga的实现:
假设一个电商场景,用户下单流程涉及到订单服务、库存服务和支付服务。
- 订单服务(Order Service): 创建订单。
- 库存服务(Inventory Service): 扣减库存。
- 支付服务(Payment Service): 完成支付。
实现步骤:
- 定义Saga编排器: 创建一个Saga类来协调各个服务的事务。
<?php
use PredisClient;
class SagaOrchestrator
{
private $redis;
private $sagaId;
private $steps;
private $currentStep = 0;
private $status = 'PENDING';
public function __construct(Client $redis, string $sagaId, array $steps)
{
$this->redis = $redis;
$this->sagaId = $sagaId;
$this->steps = $steps;
}
public function start()
{
$this->setStatus('RUNNING');
$this->executeNextStep();
}
private function executeNextStep()
{
if ($this->currentStep < count($this->steps) && $this->status === 'RUNNING') {
$step = $this->steps[$this->currentStep];
try {
$result = $step['action'](); // 执行当前步骤的动作
$this->currentStep++;
$this->saveState();
if ($result) {
$this->executeNextStep(); // 如果当前步骤成功,则执行下一步
} else {
$this->compensate(); // 如果当前步骤失败,则进行补偿
}
} catch (Exception $e) {
error_log("Step failed: " . $e->getMessage());
$this->compensate(); // 发生异常,进行补偿
}
} else {
if ($this->status === 'RUNNING') {
$this->setStatus('COMPLETED');
}
}
}
public function compensate()
{
$this->setStatus('ROLLING_BACK');
for ($i = $this->currentStep - 1; $i >= 0; $i--) {
$step = $this->steps[$i];
if (isset($step['compensation'])) {
try {
$step['compensation'](); // 执行补偿操作
} catch (Exception $e) {
error_log("Compensation failed: " . $e->getMessage());
// 处理补偿失败的情况,可以重试或者人工干预
}
}
}
$this->setStatus('ROLLED_BACK');
}
private function setStatus(string $status)
{
$this->status = $status;
$this->saveState();
}
private function saveState()
{
$state = [
'sagaId' => $this->sagaId,
'currentStep' => $this->currentStep,
'status' => $this->status,
];
$this->redis->set("saga:{$this->sagaId}", json_encode($state));
}
public function getStatus(): string
{
return $this->status;
}
public static function loadFromRedis(Client $redis, string $sagaId): ?SagaOrchestrator
{
$stateJson = $redis->get("saga:{$sagaId}");
if (!$stateJson) {
return null;
}
$state = json_decode($stateJson, true);
// 模拟从数据库或者其他持久化存储加载Steps
$steps = self::defineSteps(); // 这里需要替换成实际的步骤定义
$orchestrator = new SagaOrchestrator($redis, $state['sagaId'], $steps);
$orchestrator->currentStep = $state['currentStep'];
$orchestrator->status = $state['status'];
return $orchestrator;
}
public static function defineSteps(): array
{
// 模拟步骤定义
return [
[
'action' => function () {
// 调用订单服务创建订单
return OrderService::createOrder();
},
'compensation' => function () {
// 调用订单服务取消订单
OrderService::cancelOrder();
},
],
[
'action' => function () {
// 调用库存服务扣减库存
return InventoryService::decreaseStock();
},
'compensation' => function () {
// 调用库存服务增加库存
InventoryService::increaseStock();
},
],
[
'action' => function () {
// 调用支付服务完成支付
return PaymentService::processPayment();
},
'compensation' => function () {
// 调用支付服务退款
PaymentService::refundPayment();
},
],
];
}
}
class OrderService {
public static function createOrder(): bool {
// 模拟创建订单逻辑
echo "Creating order...n";
// ... 实际创建订单的数据库操作
return true; // 假设成功
}
public static function cancelOrder(): void {
// 模拟取消订单逻辑
echo "Canceling order...n";
// ... 实际取消订单的数据库操作
}
}
class InventoryService {
public static function decreaseStock(): bool {
// 模拟扣减库存逻辑
echo "Decreasing stock...n";
// ... 实际扣减库存的数据库操作
return true; // 假设成功
}
public static function increaseStock(): void {
// 模拟增加库存逻辑
echo "Increasing stock...n";
// ... 实际增加库存的数据库操作
}
}
class PaymentService {
public static function processPayment(): bool {
// 模拟完成支付逻辑
echo "Processing payment...n";
// ... 实际完成支付的数据库操作
return false; // 假设失败
}
public static function refundPayment(): void {
// 模拟退款逻辑
echo "Refunding payment...n";
// ... 实际退款的数据库操作
}
}
// 使用示例
$redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
$sagaId = uniqid(); // 生成唯一的 Saga ID
$steps = SagaOrchestrator::defineSteps(); // 定义 Saga 步骤
$saga = new SagaOrchestrator($redis, $sagaId, $steps);
$saga->start();
echo "Saga status: " . $saga->getStatus() . "n"; // 输出 Saga 状态
// 模拟从Redis恢复Saga状态
$loadedSaga = SagaOrchestrator::loadFromRedis($redis, $sagaId);
if ($loadedSaga) {
echo "Loaded Saga status: " . $loadedSaga->getStatus() . "n";
}
- 定义Saga步骤: 每个步骤包含一个
action(执行动作)和一个compensation(补偿动作)。 - 执行Saga: Saga编排器按照步骤顺序执行,如果某个步骤失败,则执行补偿操作。
- 状态持久化: 使用Redis来保存Saga的状态,包括当前步骤和状态。
3.2 协同式Saga的实现:
使用Redis的发布/订阅功能来实现服务之间的事件通知。
实现步骤:
- 定义事件: 定义Saga中涉及的事件,例如
OrderCreated、StockDecreased、PaymentProcessed等。 - 服务发布事件: 每个服务在完成本地事务后,发布一个事件到Redis的Channel。
- 服务订阅事件: 其他服务订阅相关的事件,并执行相应的操作。
- 补偿机制: 如果某个服务失败,发布补偿事件,其他服务监听这些事件并执行补偿操作。
<?php
use PredisClient;
class EventPublisher
{
private $redis;
public function __construct(Client $redis)
{
$this->redis = $redis;
}
public function publish(string $channel, array $message)
{
$this->redis->publish($channel, json_encode($message));
}
}
class EventSubscriber
{
private $redis;
private $channel;
private $callback;
public function __construct(Client $redis, string $channel, callable $callback)
{
$this->redis = $redis;
$this->channel = $channel;
$this->callback = $callback;
}
public function subscribe()
{
$pubSub = $this->redis->pubSubLoop();
$pubSub->subscribe($this->channel);
foreach ($pubSub as $message) {
switch ($message->kind) {
case 'subscribe':
echo "Subscribed to {$message->channel}n";
break;
case 'message':
$data = json_decode($message->payload, true);
call_user_func($this->callback, $data);
break;
}
}
}
}
// 示例:订单服务
class OrderService
{
private $redis;
private $publisher;
public function __construct(Client $redis, EventPublisher $publisher)
{
$this->redis = $redis;
$this->publisher = $publisher;
}
public function createOrder(array $orderData)
{
// 创建订单的本地事务
// ...
$orderId = uniqid(); // 假设生成订单ID
// 发布 OrderCreated 事件
$this->publisher->publish('order_events', [
'event' => 'OrderCreated',
'order_id' => $orderId,
'order_data' => $orderData,
]);
return $orderId;
}
public function cancelOrder(string $orderId)
{
// 取消订单的本地事务
// ...
// 发布 OrderCancelled 事件
$this->publisher->publish('order_events', [
'event' => 'OrderCancelled',
'order_id' => $orderId,
]);
}
}
// 示例:库存服务
class InventoryService
{
private $redis;
private $publisher;
public function __construct(Client $redis, EventPublisher $publisher)
{
$this->redis = $redis;
$this->publisher = $publisher;
}
public function decreaseStock(string $orderId, array $items)
{
// 扣减库存的本地事务
// ...
// 发布 StockDecreased 事件
$this->publisher->publish('inventory_events', [
'event' => 'StockDecreased',
'order_id' => $orderId,
'items' => $items,
]);
}
public function increaseStock(string $orderId, array $items)
{
// 增加库存的本地事务 (补偿操作)
// ...
// 发布 StockIncreased 事件
$this->publisher->publish('inventory_events', [
'event' => 'StockIncreased',
'order_id' => $orderId,
'items' => $items,
]);
}
}
// 示例:支付服务
class PaymentService
{
private $redis;
private $publisher;
public function __construct(Client $redis, EventPublisher $publisher)
{
$this->redis = $redis;
$this->publisher = $publisher;
}
public function processPayment(string $orderId, float $amount)
{
// 处理支付的本地事务
// ...
// 发布 PaymentProcessed 事件
$this->publisher->publish('payment_events', [
'event' => 'PaymentProcessed',
'order_id' => $orderId,
'amount' => $amount,
]);
}
public function refundPayment(string $orderId, float $amount)
{
// 退款的本地事务 (补偿操作)
// ...
// 发布 PaymentRefunded 事件
$this->publisher->publish('payment_events', [
'event' => 'PaymentRefunded',
'order_id' => $orderId,
'amount' => $amount,
]);
}
}
// 使用示例
$redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
$eventPublisher = new EventPublisher($redis);
// 订单服务
$orderService = new OrderService($redis, $eventPublisher);
$orderId = $orderService->createOrder(['product_id' => '123', 'quantity' => 2]);
// 库存服务
$inventoryService = new InventoryService($redis, $eventPublisher);
$inventorySubscriber = new EventSubscriber($redis, 'order_events', function ($data) use ($inventoryService) {
if ($data['event'] === 'OrderCreated') {
$inventoryService->decreaseStock($data['order_id'], [['product_id' => '123', 'quantity' => 2]]);
} elseif ($data['event'] === 'OrderCancelled') {
$inventoryService->increaseStock($data['order_id'], [['product_id' => '123', 'quantity' => 2]]);
}
});
// 支付服务
$paymentService = new PaymentService($redis, $eventPublisher);
$paymentSubscriber = new EventSubscriber($redis, 'inventory_events', function ($data) use ($paymentService) {
if ($data['event'] === 'StockDecreased') {
$paymentService->processPayment($data['order_id'], 100.00); // 假设金额为 100.00
}
});
// 启动订阅者 (需要在不同的进程或线程中运行)
// 实际应用中,这些订阅者会在各自的服务中运行
// $inventorySubscriber->subscribe();
// $paymentSubscriber->subscribe();
echo "Order created with ID: " . $orderId . "n";
4. 基于Etcd实现TCC模式
Etcd是一个分布式键值存储系统,可以用来实现TCC模式中的事务协调器。Etcd可以保证数据的一致性和可靠性,非常适合用来存储事务的状态。
4.1 TCC模式的实现:
仍然以电商场景为例,用户下单流程涉及到订单服务、库存服务和支付服务。
实现步骤:
- 定义TCC协调器: 创建一个TCC协调器,负责协调各个服务的Try、Confirm和Cancel操作。
<?php
use EtcdClient;
class TccCoordinator
{
private $etcd;
private $transactionId;
private $participants = [];
private $status = 'PENDING';
public function __construct(Client $etcd, string $transactionId)
{
$this->etcd = $etcd;
$this->transactionId = $transactionId;
}
public function registerParticipant(string $serviceName, callable $try, callable $confirm, callable $cancel)
{
$this->participants[$serviceName] = [
'try' => $try,
'confirm' => $confirm,
'cancel' => $cancel,
'status' => 'PENDING',
];
}
public function start()
{
$this->setStatus('TRYING');
foreach ($this->participants as $serviceName => &$participant) {
try {
$result = $participant['try'](); // 执行Try阶段
if ($result) {
$participant['status'] = 'TRY_SUCCESS';
} else {
$participant['status'] = 'TRY_FAILED';
$this->rollback();
return false;
}
} catch (Exception $e) {
error_log("Try failed for {$serviceName}: " . $e->getMessage());
$participant['status'] = 'TRY_FAILED';
$this->rollback();
return false;
}
$this->saveState();
}
$this->confirm();
return true;
}
private function confirm()
{
$this->setStatus('CONFIRMING');
foreach ($this->participants as $serviceName => &$participant) {
if ($participant['status'] === 'TRY_SUCCESS') {
try {
$participant['confirm'](); // 执行Confirm阶段
$participant['status'] = 'CONFIRM_SUCCESS';
} catch (Exception $e) {
error_log("Confirm failed for {$serviceName}: " . $e->getMessage());
$participant['status'] = 'CONFIRM_FAILED';
// Confirm 阶段失败,需要重试或者人工干预
}
$this->saveState();
}
}
$this->setStatus('CONFIRMED');
}
private function rollback()
{
$this->setStatus('ROLLING_BACK');
foreach ($this->participants as $serviceName => &$participant) {
if ($participant['status'] === 'TRY_SUCCESS' || $participant['status'] === 'CONFIRM_SUCCESS') {
try {
$participant['cancel'](); // 执行Cancel阶段
$participant['status'] = 'CANCEL_SUCCESS';
} catch (Exception $e) {
error_log("Cancel failed for {$serviceName}: " . $e->getMessage());
$participant['status'] = 'CANCEL_FAILED';
// Cancel 阶段失败,需要重试或者人工干预
}
$this->saveState();
}
}
$this->setStatus('ROLLED_BACK');
}
private function setStatus(string $status)
{
$this->status = $status;
$this->saveState();
}
private function saveState()
{
$state = [
'transactionId' => $this->transactionId,
'participants' => $this->participants,
'status' => $this->status,
];
$this->etcd->set("/tcc/{$this->transactionId}", json_encode($state));
}
public function getStatus(): string
{
return $this->status;
}
public static function loadFromEtcd(Client $etcd, string $transactionId): ?TccCoordinator
{
try {
$response = $etcd->get("/tcc/{$transactionId}");
$stateJson = $response->node->value;
$state = json_decode($stateJson, true);
$coordinator = new TccCoordinator($etcd, $state['transactionId']);
$coordinator->participants = $state['participants'];
$coordinator->status = $state['status'];
return $coordinator;
} catch (Exception $e) {
// 没有找到对应transactionId,或者Etcd连接失败
return null;
}
}
}
// 示例:订单服务
class OrderService
{
public static function tryCreateOrder(): bool
{
// 尝试创建订单,预留资源
echo "Trying to create order...n";
// ... 实际的 Try 逻辑
return true; // 假设成功
}
public static function confirmCreateOrder(): void
{
// 确认创建订单,实际使用资源
echo "Confirming create order...n";
// ... 实际的 Confirm 逻辑
}
public static function cancelCreateOrder(): void
{
// 取消创建订单,释放资源
echo "Canceling create order...n";
// ... 实际的 Cancel 逻辑
}
}
// 示例:库存服务
class InventoryService
{
public static function tryDecreaseStock(): bool
{
// 尝试扣减库存,预留资源
echo "Trying to decrease stock...n";
// ... 实际的 Try 逻辑
return true; // 假设成功
}
public static function confirmDecreaseStock(): void
{
// 确认扣减库存,实际使用资源
echo "Confirming decrease stock...n";
// ... 实际的 Confirm 逻辑
}
public static function cancelDecreaseStock(): void
{
// 取消扣减库存,释放资源
echo "Canceling decrease stock...n";
// ... 实际的 Cancel 逻辑
}
}
// 示例:支付服务
class PaymentService
{
public static function tryProcessPayment(): bool
{
// 尝试处理支付,预留资源
echo "Trying to process payment...n";
// ... 实际的 Try 逻辑
return false; // 假设失败
}
public static function confirmProcessPayment(): void
{
// 确认处理支付,实际使用资源
echo "Confirming process payment...n";
// ... 实际的 Confirm 逻辑
}
public static function cancelProcessPayment(): void
{
// 取消处理支付,释放资源
echo "Canceling process payment...n";
// ... 实际的 Cancel 逻辑
}
}
// 使用示例
$etcd = new Client([
'host' => '127.0.0.1',
'port' => 2379,
]);
$transactionId = uniqid(); // 生成唯一的 Transaction ID
$coordinator = new TccCoordinator($etcd, $transactionId);
// 注册参与者
$coordinator->registerParticipant('order_service', [OrderService::class, 'tryCreateOrder'], [OrderService::class, 'confirmCreateOrder'], [OrderService::class, 'cancelCreateOrder']);
$coordinator->registerParticipant('inventory_service', [InventoryService::class, 'tryDecreaseStock'], [InventoryService::class, 'confirmDecreaseStock'], [InventoryService::class, 'cancelDecreaseStock']);
$coordinator->registerParticipant('payment_service', [PaymentService::class, 'tryProcessPayment'], [PaymentService::class, 'confirmProcessPayment'], [PaymentService::class, 'cancelProcessPayment']);
// 启动事务
$coordinator->start();
echo "Transaction status: " . $coordinator->getStatus() . "n";
// 模拟从Etcd恢复TCC状态
$loadedCoordinator = TccCoordinator::loadFromEtcd($etcd, $transactionId);
if ($loadedCoordinator) {
echo "Loaded Transaction status: " . $loadedCoordinator->getStatus() . "n";
}
- 注册参与者: 将涉及的每个服务注册到TCC协调器,并提供Try、Confirm和Cancel方法的实现。
- 执行Try阶段: TCC协调器依次调用每个服务的Try方法,如果所有服务的Try方法都成功,则进入Confirm阶段,否则进入Cancel阶段。
- 执行Confirm阶段: TCC协调器依次调用每个服务的Confirm方法,完成事务。
- 执行Cancel阶段: TCC协调器依次调用每个服务的Cancel方法,回滚事务。
- 状态持久化: 使用Etcd来保存事务的状态,包括参与者状态和事务状态。
5. 模式选择的考量
选择Saga还是TCC,需要根据实际业务场景进行权衡。
| 特性 | Saga | TCC |
|---|---|---|
| 一致性级别 | 最终一致性 | 强一致性(相对) |
| 实现复杂度 | 相对简单 | 相对复杂,需要预留资源 |
| 性能 | 较高,无需锁定资源 | 较低,需要锁定资源 |
| 适用场景 | 对一致性要求不高,允许最终一致性的场景 | 对一致性要求较高,需要保证强一致性的场景 |
| 补偿机制 | 补偿操作可能比较复杂,需要考虑幂等性 | Cancel操作相对简单,但需要保证资源预留 |
6. 进一步的思考
- 幂等性: 无论是Saga还是TCC,都需要保证操作的幂等性,即多次执行相同的操作,结果应该是一致的。
- 重试机制: 在分布式环境中,服务可能会出现故障,因此需要实现重试机制来保证事务的最终成功。
- 监控和告警: 需要对分布式事务进行监控和告警,以便及时发现和处理问题。
- 事务日志: 记录事务的执行过程,方便进行故障排查和数据恢复。
7. 总结:分布式事务并非银弹,谨慎选择
分布式事务的实现,无论是Saga还是TCC,都带来了额外的复杂性。选择哪种方案,需要结合业务场景,权衡一致性、性能和实现复杂度。在选择方案时,务必充分考虑各种因素,并进行充分的测试,以确保系统的稳定性和可靠性。
感谢大家的聆听!希望今天的讲座对大家有所帮助。