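Introduction: The Challenge of Distributed Transactions
In modern software architecture, microservices have become a cornerstone for building complex, scalable applications. The approach breaks a large monolith into a set of small, autonomous services, each focused on a specific business capability and deployed, run, and scaled independently. This brings many benefits: flexibility in technology choices, team autonomy, fault isolation, and faster iteration. The distributed nature of microservices, however, also introduces a central challenge: distributed transactions.
In a monolith, we rely on the database's ACID (atomicity, consistency, isolation, durability) transactions to keep data operations intact: an operation either succeeds completely or fails completely and rolls back to the initial state. In a microservice world, a single business operation may span several services, each owning its own database. A traditional single-database ACID transaction cannot cross those service boundaries, which raises the core question: how do we ensure that such cross-service operations either all succeed or all fail, even when some services fail or the network is interrupted?
This is the problem distributed transactions address: coordinating operations on multiple independent resources (usually databases) so that together they behave as one globally atomic operation. In a distributed system, however, strong consistency and high availability often conflict; this is the fundamental trade-off described by the CAP theorem. In microservice architectures we therefore often give up some degree of immediate consistency in exchange for availability and partition tolerance.
This article examines the two main patterns for handling distributed transactions: Two-Phase Commit (2PC) and the Saga pattern. We analyze their principles, strengths and weaknesses, and implementation in Go, focusing on the trade-offs and practical considerations that arise when choosing between them in a Go microservice architecture.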
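Two-Phase Commit (2PC)
Two-Phase Commit (2PC) is a protocol for achieving atomic operations in a distributed system: it tries to give multiple participants the atomicity guarantee of ACID. Its core idea is that a single coordinator drives the transaction across all participants.
Protocol: the Prepare Phase and the Commit Phase
The 2PC protocol has two main phases: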
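- Prepare Phase (Voting Phase)
  - The coordinator sends a "prepare" request to every participant.
  - On receiving it, each participant performs all of its transactional work but does not commit. It writes the transaction's redo/undo log to disk and locks the resources involved, so that it can later either commit or roll back.
  - If the participant executed the work successfully and is ready to commit, it votes "yes" to the coordinator.
  - If it cannot perform the work (for example, because of insufficient resources or a deadlock), it votes "abort".
  - If the coordinator does not receive a participant's vote within the timeout, it treats that participant as having voted "abort".
- Commit Phase (Decision Phase)
  - The coordinator decides based on all the votes collected in the prepare phase:
    - All yes: if every participant voted "yes", the coordinator sends a "commit" request to all participants. Each participant commits its local transaction, releases its locks, and replies "done".
    - Any abort: if any participant voted "abort", or the coordinator timed out waiting for one, the coordinator sends a "rollback" request to all participants. Each participant rolls back its local transaction, releases its locks, and replies "done".
  - The transaction is complete only after the coordinator has received "done" from every participant.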
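The decision rule of the commit phase fits in a few lines. The sketch below is a minimal illustration with hypothetical names (`Vote`, `decide`); it models a timeout as a missing vote, which is treated exactly like an explicit abort.

```go
package main

import "fmt"

// Vote is a participant's answer in the prepare phase (hypothetical type).
type Vote int

const (
	VoteMissing Vote = iota // no reply before the timeout
	VoteYes
	VoteNo
)

// decide applies the 2PC decision rule: commit only if every participant
// voted yes; an explicit "no" or a missing vote forces a rollback.
func decide(votes map[string]Vote, participants []string) string {
	for _, p := range participants {
		if votes[p] != VoteYes { // absent keys read as VoteMissing
			return "rollback"
		}
	}
	return "commit"
}

func main() {
	ps := []string{"order", "payment", "inventory"}
	fmt.Println(decide(map[string]Vote{"order": VoteYes, "payment": VoteYes, "inventory": VoteYes}, ps)) // commit
	fmt.Println(decide(map[string]Vote{"order": VoteYes, "payment": VoteNo, "inventory": VoteYes}, ps))  // rollback
	fmt.Println(decide(map[string]Vote{"order": VoteYes, "inventory": VoteYes}, ps))                    // rollback: payment timed out
}
```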
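Strengths of 2PC: strong consistency and atomicity
The core strength of 2PC is that it provides strong consistency and atomicity. In a successful 2PC transaction, either all participants commit or all roll back, so the final state of the data across the distributed system is consistent. For business-critical scenarios with very high data-integrity requirements (bank transfers, for example), this looks like an ideal choice.
Weaknesses of 2PC: conflict with the microservice philosophy
Although 2PC provides strong ACID guarantees, in modern distributed microservice architectures it is usually regarded as an anti-pattern. Its drawbacks are significant: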
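- Blocking
  - From the prepare phase onward, participants must hold locks on the resources involved until the transaction finally commits or rolls back, so no other transaction can use those resources for the duration of the 2PC transaction. If the coordinator or a participant crashes at a critical moment, the locks may be held for a long time, severely degrading performance and blocking every other transaction that needs those resources.
- Single point of failure
  - The coordinator is the linchpin of the protocol. If it fails after participants have voted "yes" but before they receive its decision, the participants cannot know whether to commit or roll back (they are "in doubt"); they must keep waiting while holding their locks, which can stall the whole system. Coordinator recovery mechanisms (a durably logged decision, electing a new coordinator) can mitigate this, but they are complex and costly to implement.
- Performance bottleneck
  - 2PC requires two rounds of messages between the coordinator and every participant (prepare/vote, then decision/acknowledgement), which adds significant network latency. The number of messages grows linearly with the number of participants, and long-held locks limit concurrency and reduce throughput.
- Conflict with the microservice philosophy (tight coupling)
  - Microservice architecture favors loose coupling and service autonomy. 2PC, however, requires every participant to implement the same protocol under the control of a central coordinator. This creates strong synchronous dependencies between services and undermines their independence: the failure of one service can affect the whole distributed transaction and cascade to others.
- Sacrificing availability
  - The CAP theorem states that when a network partition occurs, a distributed system must choose between consistency and availability. 2PC chooses consistency: if a partition prevents the coordinator from communicating with some participants, the transaction cannot make progress and the affected resources stop responding, which reduces availability.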
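The coordinator recovery mentioned under "single point of failure" rests on one rule: the decision must be durably logged before any phase-2 message is sent. Below is a minimal sketch assuming the common presumed-abort convention, where an in-doubt transaction with no logged decision is rolled back. `DecisionLog` is hypothetical, and its map only stands in for durable storage, so this version is not actually crash-safe.

```go
package main

import "fmt"

// DecisionLog stands in for durable storage (an fsync'd file or a database
// table in a real coordinator). This in-memory map is NOT crash-safe.
type DecisionLog struct {
	decisions map[string]string // txID -> "commit" | "rollback"
}

// Record must complete (be durable) BEFORE phase-2 messages are sent.
func (l *DecisionLog) Record(txID, decision string) {
	l.decisions[txID] = decision
}

// Recover returns the decision a restarted coordinator should (re)send for
// each in-doubt transaction reported by participants. Under presumed abort,
// a transaction with no logged decision is rolled back.
func (l *DecisionLog) Recover(inDoubt []string) map[string]string {
	out := make(map[string]string, len(inDoubt))
	for _, tx := range inDoubt {
		if d, ok := l.decisions[tx]; ok {
			out[tx] = d
		} else {
			out[tx] = "rollback"
		}
	}
	return out
}

func main() {
	dl := &DecisionLog{decisions: map[string]string{}}
	dl.Record("tx-1", "commit")
	// The coordinator crashes; participants still hold locks for tx-1 and tx-2.
	fmt.Println(dl.Recover([]string{"tx-1", "tx-2"})) // map[tx-1:commit tx-2:rollback]
}
```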
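Implementation considerations from a Go perspective
In Go, if we want to "implement" 2PC (here mostly a conceptual simulation: real cross-database 2PC coordinators are usually provided by the database or a dedicated transaction manager, and we rarely write one from scratch), we can use gRPC or the standard library's net/rpc package for communication between services.
Suppose we have an order service, a payment service, and an inventory service, all acting as 2PC participants.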
// event.go (message and command struct definitions)
package common
type TransactionID string
type PrepareRequest struct {
TxID TransactionID
Payload []byte // business data, e.g. order details, payment amount, quantity to deduct
ServiceKey string // identifies which service sent the request
}
type PrepareResponse struct {
TxID TransactionID
ServiceKey string
Approved bool
Error string
}
type CommitRequest struct {
TxID TransactionID
ServiceKey string
Action string // "commit" or "rollback"
}
type CommitResponse struct {
TxID TransactionID
ServiceKey string
Success bool
Error string
}
Participant service pseudocode (using the inventory service as an example):
// inventory_participant.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"your_module/common" // assumes package common defines the structs above
)
// InventoryService simulates the inventory service
type InventoryService struct {
// a simple in-memory stock store
stock map[string]int
// state of in-flight transactions
pendingTransactions map[common.TransactionID]struct {
ReservedItems map[string]int
Status string // "prepared", "committed", "rolledback"
}
mu sync.Mutex
}
func NewInventoryService() *InventoryService {
return &InventoryService{
stock: map[string]int{
"ProductA": 100,
"ProductB": 50,
},
pendingTransactions: make(map[common.TransactionID]struct {
ReservedItems map[string]int
Status string
}),
}
}
// HandlePrepare simulates the 2PC prepare phase
func (s *InventoryService) HandlePrepare(ctx context.Context, req common.PrepareRequest) common.PrepareResponse {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("Inventory Service: Received Prepare request for TxID %s", req.TxID)
// Decode the business payload, e.g. {"ProductA": 5}
var orderItems map[string]int
if err := json.Unmarshal(req.Payload, &orderItems); err != nil {
return common.PrepareResponse{TxID: req.TxID, ServiceKey: "inventory", Approved: false, Error: "invalid payload: " + err.Error()}
}
// Check stock and record the reservation. A real participant must also subtract
// quantities held by other prepared transactions (or lock the rows); this sketch does not.
reservedItems := make(map[string]int)
for item, quantity := range orderItems {
if s.stock[item] < quantity {
log.Printf("Inventory Service: Not enough stock for %s. Current: %d, Requested: %d", item, s.stock[item], quantity)
return common.PrepareResponse{TxID: req.TxID, ServiceKey: "inventory", Approved: false, Error: "not enough stock"}
}
reservedItems[item] = quantity
}
// Simulated database work: record the reservation without deducting stock yet
// s.db.Exec("INSERT INTO reservations ...")
// s.db.Exec("UPDATE inventory SET reserved = reserved + ? WHERE item = ?", quantity, item)
// Record the local transaction state
s.pendingTransactions[req.TxID] = struct {
ReservedItems map[string]int
Status string
}{
ReservedItems: reservedItems,
Status: "prepared",
}
log.Printf("Inventory Service: Prepared TxID %s, reserved %v", req.TxID, reservedItems)
return common.PrepareResponse{TxID: req.TxID, ServiceKey: "inventory", Approved: true}
}
// HandleCommit simulates the 2PC commit (or rollback) phase
func (s *InventoryService) HandleCommit(ctx context.Context, req common.CommitRequest) common.CommitResponse {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("Inventory Service: Received Commit request for TxID %s, action: %s", req.TxID, req.Action)
txState, exists := s.pendingTransactions[req.TxID]
if !exists || txState.Status != "prepared" {
log.Printf("Inventory Service: TxID %s not in prepared state or doesn't exist.", req.TxID)
return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: false, Error: "invalid transaction state"}
}
if req.Action == "commit" {
for item, quantity := range txState.ReservedItems {
s.stock[item] -= quantity // actually deduct the stock
}
txState.Status = "committed"
s.pendingTransactions[req.TxID] = txState
log.Printf("Inventory Service: Committed TxID %s. New stock: %v", req.TxID, s.stock)
return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: true}
} else if req.Action == "rollback" {
// Simulated rollback: release the reserved stock
// s.db.Exec("DELETE FROM reservations WHERE tx_id = ?", req.TxID)
// s.db.Exec("UPDATE inventory SET reserved = reserved - ? WHERE item = ?", quantity, item)
txState.Status = "rolledback"
s.pendingTransactions[req.TxID] = txState
log.Printf("Inventory Service: Rolled back TxID %s", req.TxID)
return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: true}
}
return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: false, Error: "unknown action"}
}
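// Simulated run; a real service would expose these handlers over RPC
func main() {
inventoryService := NewInventoryService()
// A real application would start a gRPC or HTTP server to expose these methods
log.Println("Inventory Service started, waiting for requests...")
// Simulate a 2PC flow (normally driven by the coordinator)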
txID := common.TransactionID("tx-123")
log.Printf("--- Simulating a successful 2PC for TxID %s ---", txID)
// 1. Prepare phase
prepareReq := common.PrepareRequest{
TxID: txID,
ServiceKey: "order", // assume the order service initiated it
Payload: []byte(`{"ProductA": 5}`),
}
prepResp := inventoryService.HandlePrepare(context.Background(), prepareReq)
fmt.Printf("Prepare Response: %+v\n", prepResp)
if prepResp.Approved {
// 2. Commit phase
commitReq := common.CommitRequest{
TxID: txID,
ServiceKey: "coordinator",
Action: "commit",
}
commitResp := inventoryService.HandleCommit(context.Background(), commitReq)
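fmt.Printf("Commit Response: %+v\n", commitResp)
} else {
// If prepare failed here, this service has nothing to commit; if other participants prepared successfully, the coordinator would roll them back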
log.Printf("Prepare failed for TxID %s, no commit needed from this service.", txID)
}
// Simulate a failed 2PC flow
txID2 := common.TransactionID("tx-456")
log.Printf("\n--- Simulating a failed 2PC (due to insufficient stock) for TxID %s ---", txID2)
prepareReq2 := common.PrepareRequest{
TxID: txID2,
ServiceKey: "order",
Payload: []byte(`{"ProductA": 200}`), // requests 200, but only 95 (100-5) remain
}
prepResp2 := inventoryService.HandlePrepare(context.Background(), prepareReq2)
fmt.Printf("Prepare Response: %+v\n", prepResp2)
if !prepResp2.Approved {
log.Printf("Prepare failed as expected for TxID %s. Coordinator would initiate rollback for other participants if any succeeded.", txID2)
// The coordinator sends a rollback request to all participants
rollbackReq := common.CommitRequest{
TxID: txID2,
ServiceKey: "coordinator",
Action: "rollback",
}
rollbackResp := inventoryService.HandleCommit(context.Background(), rollbackReq)
fmt.Printf("Rollback Response (even though prepare failed here, showing logic): %+v\n", rollbackResp)
}
// Keep the process alive briefly so the logs can be observed
time.Sleep(5 * time.Second)
}
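`HandleCommit` above rejects a second commit for the same transaction as "invalid transaction state", yet a coordinator that retries phase 2 after a timeout will send duplicates. One way to make phase 2 idempotent, sketched here with hypothetical names and no persistence, is to acknowledge a decision that has already taken effect and to accept a rollback for a transaction the participant never prepared.

```go
package main

import "fmt"

// txStatus is a participant's view of one transaction (hypothetical names).
type txStatus string

const (
	statusPrepared   txStatus = "prepared"
	statusCommitted  txStatus = "committed"
	statusRolledBack txStatus = "rolledback"
)

type participant struct {
	txs map[string]txStatus
}

// apply handles a phase-2 decision idempotently:
//   - a repeated decision that already took effect is acknowledged again;
//   - a rollback for an unknown transaction is acknowledged and recorded,
//     so a delayed prepare for it can be refused later;
//   - a commit for an unknown transaction, or a decision that contradicts
//     a finished one, is an error.
func (p *participant) apply(txID, action string) error {
	st, ok := p.txs[txID]
	switch {
	case !ok && action == "rollback":
		p.txs[txID] = statusRolledBack
		return nil
	case !ok:
		return fmt.Errorf("tx %s: %s for unknown transaction", txID, action)
	case st == statusPrepared && action == "commit":
		p.txs[txID] = statusCommitted // apply the reserved changes here
		return nil
	case st == statusPrepared && action == "rollback":
		p.txs[txID] = statusRolledBack // release the reservation here
		return nil
	case (st == statusCommitted && action == "commit") || (st == statusRolledBack && action == "rollback"):
		return nil // duplicate delivery: already done
	default:
		return fmt.Errorf("tx %s: %s conflicts with state %s", txID, action, st)
	}
}

func main() {
	p := &participant{txs: map[string]txStatus{"tx-1": statusPrepared}}
	fmt.Println(p.apply("tx-1", "commit"))   // <nil>
	fmt.Println(p.apply("tx-1", "commit"))   // <nil> (coordinator retry)
	fmt.Println(p.apply("tx-2", "rollback")) // <nil> (never prepared)
	fmt.Println(p.apply("tx-1", "rollback")) // tx tx-1: rollback conflicts with state committed
}
```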
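Coordinator service pseudocode outline:
The coordinator maintains a state machine for each transaction and calls each participant's RPC methods.
// coordinator.go (conceptual pseudocode)
// It reuses InventoryService from inventory_participant.go; both files declare main,
// so keep a single main when combining them to run the demo.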
package main
import (
"context"
"fmt"
"log"
"time"
"your_module/common"
// plus gRPC clients for InventoryService, PaymentService, OrderService in a real system
)
type ParticipantClient interface {
Prepare(ctx context.Context, req common.PrepareRequest) (common.PrepareResponse, error)
Commit(ctx context.Context, req common.CommitRequest) (common.CommitResponse, error)
}
type Coordinator struct {
participants map[string]ParticipantClient // maps service name to its client
txStates map[common.TransactionID]string // transaction state: "pending", "prepared", "committed", "rolledback"
}
func NewCoordinator(clients map[string]ParticipantClient) *Coordinator {
return &Coordinator{
participants: clients,
txStates: make(map[common.TransactionID]string),
}
}
func (c *Coordinator) StartDistributedTransaction(ctx context.Context, txID common.TransactionID, payload map[string][]byte) error {
c.txStates[txID] = "pending"
log.Printf("Coordinator: Starting distributed transaction %s", txID)
// Phase 1: Prepare
prepareResponses := make(chan common.PrepareResponse, len(c.participants))
errs := make(chan error, len(c.participants))
allApproved := true
for serviceKey, client := range c.participants {
go func(sk string, cl ParticipantClient) {
req := common.PrepareRequest{
TxID: txID,
ServiceKey: sk,
Payload: payload[sk], // each service may need a different payload
}
resp, err := cl.Prepare(ctx, req)
if err != nil {
errs <- fmt.Errorf("participant %s prepare failed: %w", sk, err)
return
}
prepareResponses <- resp
}(serviceKey, client)
}
// Collect votes until every participant has answered or the context expires.
// A timeout or cancellation counts as an "abort" vote and ends the phase.
prepareCount := 0
PrepareLoop:
for prepareCount < len(c.participants) {
select {
case resp := <-prepareResponses:
prepareCount++
if !resp.Approved {
allApproved = false
log.Printf("Coordinator: Participant %s rejected prepare for TxID %s: %s", resp.ServiceKey, txID, resp.Error)
}
case err := <-errs:
prepareCount++
allApproved = false
log.Printf("Coordinator: Participant prepare error for TxID %s: %v", txID, err)
case <-ctx.Done(): // context timeout or cancellation
allApproved = false
log.Printf("Coordinator: Prepare phase timed out for TxID %s", txID)
break PrepareLoop
}
}
// Phase 2: Commit or Rollback
action := "rollback"
if allApproved {
action = "commit"
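c.txStates[txID] = "prepared" // in practice, durably log the decision before sending it
} else {
c.txStates[txID] = "rolledback" // in practice, durably log the decision before sending it
}
log.Printf("Coordinator: Decision for TxID %s is to %s", txID, action)
// Note: if ctx already expired during phase 1, the calls below fail immediately.
// A real coordinator sends the decision with a fresh context and retries
// until every participant acknowledges it.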
commitResponses := make(chan common.CommitResponse, len(c.participants))
commitErrs := make(chan error, len(c.participants))
for serviceKey, client := range c.participants {
go func(sk string, cl ParticipantClient) {
req := common.CommitRequest{
TxID: txID,
ServiceKey: sk,
Action: action,
}
resp, err := cl.Commit(ctx, req)
if err != nil {
commitErrs <- fmt.Errorf("participant %s commit/rollback failed: %w", sk, err)
return
}
commitResponses <- resp
}(serviceKey, client)
}
// Collect commit/rollback acknowledgements
commitCount := 0
for commitCount < len(c.participants) {
select {
case resp := <-commitResponses:
commitCount++
if !resp.Success {
log.Printf("Coordinator: Participant %s failed to %s for TxID %s: %s", resp.ServiceKey, action, txID, resp.Error)
// A real coordinator needs a recovery mechanism here (e.g. retrying the decision)
}
case err := <-commitErrs:
commitCount++
log.Printf("Coordinator: Participant commit/rollback error for TxID %s: %v", txID, err)
// A real coordinator needs a recovery mechanism here (e.g. retrying the decision)
case <-ctx.Done():
log.Printf("Coordinator: Commit phase timed out for TxID %s. Potential inconsistency!", txID)
return fmt.Errorf("commit phase timed out, transaction %s might be inconsistent", txID)
}
}
if allApproved && action == "commit" {
c.txStates[txID] = "committed"
log.Printf("Coordinator: Transaction %s successfully committed.", txID)
return nil
} else {
c.txStates[txID] = "rolledback"
log.Printf("Coordinator: Transaction %s successfully rolled back (or failed to commit).", txID)
return fmt.Errorf("transaction %s failed to commit or was rolled back", txID)
}
}
func main() {
// Create simulated participant clients
inventoryClient := &MockParticipantClient{Service: NewInventoryService()}
// paymentClient := &MockParticipantClient{Service: NewPaymentService()}
// orderClient := &MockParticipantClient{Service: NewOrderService()}
participants := map[string]ParticipantClient{
"inventory": inventoryClient,
// "payment": paymentClient,
// "order": orderClient,
}
coordinator := NewCoordinator(participants)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Simulate a successful transaction
txID1 := common.TransactionID("global-tx-001")
payload1 := map[string][]byte{
"inventory": []byte(`{"ProductA": 5}`),
// "payment": []byte(`{"amount": 100}`),
// "order": []byte(`{"order_id": "ORD-001"}`),
}
err := coordinator.StartDistributedTransaction(ctx, txID1, payload1)
if err != nil {
fmt.Printf("Transaction %s failed: %v\n", txID1, err)
} else {
fmt.Printf("Transaction %s completed successfully.\n", txID1)
}
fmt.Printf("Final state of %s: %s\n", txID1, coordinator.txStates[txID1])
fmt.Println("\n---")
// Simulate a failed transaction (insufficient stock)
txID2 := common.TransactionID("global-tx-002")
payload2 := map[string][]byte{
"inventory": []byte(`{"ProductA": 200}`), // expected to fail
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
err = coordinator.StartDistributedTransaction(ctx2, txID2, payload2)
if err != nil {
fmt.Printf("Transaction %s failed: %v\n", txID2, err)
} else {
fmt.Printf("Transaction %s completed successfully.\n", txID2)
}
fmt.Printf("Final state of %s: %s\n", txID2, coordinator.txStates[txID2])
}
// MockParticipantClient calls the in-process InventoryService directly
type MockParticipantClient struct {
Service *InventoryService // simulation only; a real system would use a gRPC client
}
func (m *MockParticipantClient) Prepare(ctx context.Context, req common.PrepareRequest) (common.PrepareResponse, error) {
return m.Service.HandlePrepare(ctx, req), nil
}
func (m *MockParticipantClient) Commit(ctx context.Context, req common.CommitRequest) (common.CommitResponse, error) {
return m.Service.HandleCommit(ctx, req), nil
}
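The Go code above is conceptual pseudocode meant to illustrate 2PC's message flow and state management. In production, because of its inherent drawbacks, 2PC is almost never used for cross-service distributed transactions. It lives mostly inside individual database systems or in dedicated middleware such as XA (eXtended Architecture) transaction managers, and even XA transactions are rarely used in modern microservices because of their performance and availability problems.
Why is 2PC rarely used in microservices?
In short, 2PC's synchronous blocking, its single point of failure, and its heavy impact on performance and availability are at odds with the resilience, loose coupling, and high availability that microservice architectures aim for. It binds services that should be independent tightly together into one large distributed monolith, which is exactly what a microservice architecture tries to avoid. We therefore need a distributed transaction approach that fits microservices better: the Saga pattern.
The Saga Pattern
The Saga pattern handles distributed transactions through a sequence of local transactions and corresponding compensating transactions, ensuring eventual consistency of the overall business operation. Unlike 2PC, which pursues global strong consistency, Sagas embrace eventual consistency and thereby gain an advantage in availability and partition tolerance.
Origins and core idea of Sagas
The Saga pattern was first proposed by Hector Garcia-Molina and Kenneth Salem in 1987 to handle long-lived transactions. Its core idea is that a distributed transaction is decomposed into a sequence of local transactions, each performed by one microservice against its own local database. If all local transactions succeed, the saga succeeds. If one fails, the saga executes compensating transactions that undo the effects of the local transactions that already succeeded, bringing the system back to a consistent state (not necessarily the original state, but a compensated state that is acceptable to the business).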
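Strengths of Sagas: high availability, loose coupling, scalability
These strengths make Sagas the preferred way to handle distributed transactions in modern microservice architectures:
- High availability
  - Each local transaction in a saga is independent: a service commits its own local transaction and releases its resources right away. Even if one service is temporarily unavailable, other services can keep processing their local transactions, which greatly improves availability.
- Non-blocking
  - Services release resources as soon as their local transaction completes, avoiding 2PC's long-held locks. This improves concurrency and throughput.
- Loose coupling
  - Services communicate through asynchronous messages or events. They don't need to know each other's internals, only the agreed format of events or commands, so they can be developed, deployed, and scaled independently.
- Scalability
  - Thanks to service independence and asynchronous communication, Sagas support horizontal scaling well.
- Fit with modern microservice architecture
  - The pattern aligns closely with the microservice principles of autonomy, resilience, and fault tolerance.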
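Weaknesses of Sagas: eventual consistency and complexity
Sagas are not without drawbacks; they introduce new challenges:
- Eventual consistency
  - This is inherent to the pattern. Until the whole saga finishes, the system can be in an intermediate state in which some local transactions have committed while others have not yet run, or have failed and are awaiting compensation. Because sagas provide no isolation between concurrent sagas, readers can see these intermediate results ("stale" or "dirty" reads), so front-end applications and downstream processes must tolerate the temporary inconsistency. For example, after a user places an order, the stock may already be deducted while payment has not yet succeeded; querying the order at that moment may show "awaiting payment" or "processing".
- Complexity: compensation design, idempotency, timeouts
  - Compensating transactions: designing compensations is harder than designing the forward transactions. Each service must know how to semantically "undo" a local transaction it has already committed, and the compensations themselves must be idempotent in case they run more than once.
  - Idempotency: because messaging is asynchronous and typically offers at-least-once delivery, a service may receive the same message more than once. Every handler must therefore be idempotent: processing the same message several times must have the same effect as processing it once.
  - Timeouts: a saga may run for a long time, so there must be a mechanism to detect stalled sagas and trigger compensation.
  - Observability: tracing a saga's full flow and state, especially after a failure, requires solid distributed tracing and log aggregation.
- No atomicity in the ACID sense (business-level consistency)
  - Sagas do not provide ACID atomicity. They provide business-level atomicity: either the business process completes, or compensation brings the system to a state that is consistent from the business point of view. That state may not be the original one from before the saga started, but a new state reached through compensation.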
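The two ways to implement Sagas
There are two main ways to implement a saga: choreography-based sagas and orchestration-based sagas.
Choreography-based Saga
- Principle: services coordinate through event publish/subscribe. After completing its local transaction, each service publishes an event to notify other interested services, which subscribe to these events and run their own local transactions in response. There is no central coordinator; the flow of events between services drives the saga.
- Go implementation example (using a message queue such as Kafka)
Take a simple "create order" saga as an example: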
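- Order service: creates the order and publishes OrderCreatedEvent.
- Inventory service: subscribes to OrderCreatedEvent, reserves stock, and publishes InventoryReservedEvent or InventoryReservationFailedEvent.
- Payment service: subscribes to InventoryReservedEvent, processes the payment, and publishes PaymentProcessedEvent, whose Status field is "success" or "failed".
- Order service: subscribes to PaymentProcessedEvent (and InventoryReservationFailedEvent) and updates the order status. If payment failed, it also publishes OrderPaymentFailedEvent.
- Inventory service: subscribes to OrderPaymentFailedEvent and releases the stock reservation.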
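Because every step commits locally, the order passes through intermediate states that other readers can observe. Assuming the order service keeps a status column, a small transition table makes those states explicit and lets the service reject events that arrive in the wrong state, such as a duplicate after a final status. The status names and the `event:status` key format below are hypothetical.

```go
package main

import "fmt"

// OrderStatus values are hypothetical; only PENDING is an intermediate state.
type OrderStatus string

const (
	Pending         OrderStatus = "PENDING"
	InventoryFailed OrderStatus = "INVENTORY_FAILED"
	Paid            OrderStatus = "PAID"
	PaymentFailed   OrderStatus = "PAYMENT_FAILED"
)

// transitions maps (current status, event key) to the next status.
var transitions = map[OrderStatus]map[string]OrderStatus{
	Pending: {
		"InventoryReservationFailed": InventoryFailed,
		"PaymentProcessed:success":   Paid,
		"PaymentProcessed:failed":    PaymentFailed,
	},
}

// Apply returns the next status, or an error if the event is not valid in
// the current status (for example a late duplicate after a final state).
func Apply(cur OrderStatus, event string) (OrderStatus, error) {
	if next, ok := transitions[cur][event]; ok {
		return next, nil
	}
	return cur, fmt.Errorf("event %q not allowed in status %s", event, cur)
}

func main() {
	s, _ := Apply(Pending, "PaymentProcessed:success")
	fmt.Println(s) // PAID
	_, err := Apply(s, "PaymentProcessed:failed")
	fmt.Println(err) // event "PaymentProcessed:failed" not allowed in status PAID
}
```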
Event definitions (common/events.go):
package common
import "time"
type OrderCreatedEvent struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalPrice float64 `json:"total_price"`
Timestamp time.Time `json:"timestamp"`
}
type OrderItem struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
type InventoryReservedEvent struct {
OrderID string `json:"order_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Timestamp time.Time `json:"timestamp"`
}
type InventoryReservationFailedEvent struct {
OrderID string `json:"order_id"`
ProductID string `json:"product_id"`
Reason string `json:"reason"`
Timestamp time.Time `json:"timestamp"`
}
type PaymentProcessedEvent struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
Status string `json:"status"` // "success", "failed"
Timestamp time.Time `json:"timestamp"`
}
type OrderPaymentFailedEvent struct {
OrderID string `json:"order_id"`
Reason string `json:"reason"`
Timestamp time.Time `json:"timestamp"`
}
// A common message interface so queue consumers can handle all events uniformly
type Message interface {
GetEventType() string
GetOrderID() string
}
func (e OrderCreatedEvent) GetEventType() string { return "OrderCreated" }
func (e OrderCreatedEvent) GetOrderID() string { return e.OrderID }
func (e InventoryReservedEvent) GetEventType() string { return "InventoryReserved" }
func (e InventoryReservedEvent) GetOrderID() string { return e.OrderID }
func (e InventoryReservationFailedEvent) GetEventType() string { return "InventoryReservationFailed" }
func (e InventoryReservationFailedEvent) GetOrderID() string { return e.OrderID }
func (e PaymentProcessedEvent) GetEventType() string { return "PaymentProcessed" }
func (e PaymentProcessedEvent) GetOrderID() string { return e.OrderID }
func (e OrderPaymentFailedEvent) GetEventType() string { return "OrderPaymentFailed" }
func (e OrderPaymentFailedEvent) GetOrderID() string { return e.OrderID }
Order service (order_service.go):
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"time"
"github.com/segmentio/kafka-go"
"your_module/common"
)
type OrderService struct {
producer *kafka.Writer
consumer *kafka.Reader
}
func NewOrderService(kafkaBroker string) *OrderService {
producer := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "order-events",
Balancer: &kafka.LeastBytes{},
}
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
GroupTopics: []string{"payment-events", "inventory-events"}, // payment results and inventory failures; GroupTopics requires GroupID
GroupID: "order-service-group",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
MaxWait: 1 * time.Second,
})
return &OrderService{producer: producer, consumer: consumer}
}
func (s *OrderService) CreateOrder(ctx context.Context, customerID string, items []common.OrderItem) (string, error) {
orderID := fmt.Sprintf("ORD-%d", rand.Intn(100000))
totalPrice := 0.0
for _, item := range items {
totalPrice += item.Price * float64(item.Quantity)
}
event := common.OrderCreatedEvent{
OrderID: orderID,
CustomerID: customerID,
Items: items,
TotalPrice: totalPrice,
Timestamp: time.Now(),
}
payload, err := json.Marshal(event)
if err != nil {
return "", fmt.Errorf("failed to marshal OrderCreatedEvent: %w", err)
}
msg := kafka.Message{
Key: []byte(orderID),
Value: payload,
Headers: []kafka.Header{
{Key: "EventType", Value: []byte(event.GetEventType())},
},
}
err = s.producer.WriteMessages(ctx, msg)
if err != nil {
return "", fmt.Errorf("failed to publish OrderCreatedEvent: %w", err)
}
log.Printf("OrderService: Published OrderCreatedEvent for OrderID %s", orderID)
// In practice, the order would first be created in the database with a "pending" status before the event is published
return orderID, nil
}
func (s *OrderService) StartEventConsumer(ctx context.Context) {
log.Println("OrderService: Starting payment events consumer...")
for {
select {
case <-ctx.Done():
log.Println("OrderService: Stopping event consumer.")
s.consumer.Close()
return
default:
m, err := s.consumer.ReadMessage(ctx)
if err != nil {
log.Printf("OrderService: Error reading message: %v", err)
continue
}
eventType := string(m.Headers[0].Value) // assumes EventType is the first header (panics if there are none)
log.Printf("OrderService: Received event %s for OrderID %s", eventType, string(m.Key))
switch eventType {
case "PaymentProcessed":
var event common.PaymentProcessedEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
log.Printf("OrderService: Failed to unmarshal PaymentProcessedEvent: %v", err)
continue
}
if event.Status == "success" {
log.Printf("OrderService: Order %s payment successful. Updating order status to 'Paid'.", event.OrderID)
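// In practice, update the order status in the database
} else {
log.Printf("OrderService: Order %s payment failed. Updating order status to 'PaymentFailed'.", event.OrderID)
// In practice, update the order status in the database, then trigger compensation and/or notify the user
s.publishOrderPaymentFailed(ctx, event.OrderID, event.Status)
}
case "InventoryReservationFailed": // the order status must also be updated when inventory reservation fails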
var event common.InventoryReservationFailedEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
log.Printf("OrderService: Failed to unmarshal InventoryReservationFailedEvent: %v", err)
continue
}
log.Printf("OrderService: Order %s inventory reservation failed. Updating order status to 'InventoryFailed'.", event.OrderID)
// In practice, update the order status in the database and possibly notify the user
default:
log.Printf("OrderService: Unknown event type %s", eventType)
}
}
}
}
func (s *OrderService) publishOrderPaymentFailed(ctx context.Context, orderID, reason string) {
event := common.OrderPaymentFailedEvent{
OrderID: orderID,
Reason: reason,
Timestamp: time.Now(),
}
payload, err := json.Marshal(event)
if err != nil {
log.Printf("OrderService: Failed to marshal OrderPaymentFailedEvent: %v", err)
return
}
msg := kafka.Message{
Key: []byte(orderID),
Value: payload,
Headers: []kafka.Header{
{Key: "EventType", Value: []byte(event.GetEventType())},
},
}
err = s.producer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("OrderService: Failed to publish OrderPaymentFailedEvent: %v", err)
} else {
log.Printf("OrderService: Published OrderPaymentFailedEvent for OrderID %s", orderID)
}
}
func main() {
kafkaBroker := "localhost:9092" // assumes Kafka runs locally
orderService := NewOrderService(kafkaBroker)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go orderService.StartEventConsumer(ctx)
// Simulate creating an order
_, err := orderService.CreateOrder(ctx, "customer-123", []common.OrderItem{
{ProductID: "P1", Quantity: 2, Price: 10.0},
{ProductID: "P2", Quantity: 1, Price: 25.0},
})
if err != nil {
log.Fatalf("Failed to create order: %v", err)
}
time.Sleep(10 * time.Second) // run for a while to observe the event flow
}
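The consumer above reads `m.Headers[0]`, which panics on a message without headers and breaks if another header is added first. A safer variant looks the header up by key. To keep the sketch self-contained it declares a local `Header` struct with the same `Key string` / `Value []byte` fields as kafka-go's `kafka.Header`; with the real library you would change the parameter type and pass `m.Headers`.

```go
package main

import "fmt"

// Header mirrors the shape of kafka-go's kafka.Header so this sketch
// compiles without the dependency.
type Header struct {
	Key   string
	Value []byte
}

// headerValue finds a header by key instead of assuming it is at index 0,
// and reports whether it was present.
func headerValue(headers []Header, key string) (string, bool) {
	for _, h := range headers {
		if h.Key == key {
			return string(h.Value), true
		}
	}
	return "", false
}

func main() {
	hs := []Header{
		{Key: "TraceID", Value: []byte("abc")},
		{Key: "EventType", Value: []byte("PaymentProcessed")},
	}
	if et, ok := headerValue(hs, "EventType"); ok {
		fmt.Println(et) // PaymentProcessed
	}
	_, ok := headerValue(nil, "EventType")
	fmt.Println(ok) // false: a message without headers no longer panics
}
```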
Inventory service (inventory_service.go):
package main
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/segmentio/kafka-go"
"your_module/common"
)
type InventoryService struct {
producer *kafka.Writer
consumer *kafka.Reader
stock map[string]int
mu sync.Mutex
}
func NewInventoryService(kafkaBroker string) *InventoryService {
producer := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "inventory-events",
Balancer: &kafka.LeastBytes{},
}
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
Topic: "order-events", // subscribe to order events (OrderCreated, OrderPaymentFailed)
GroupID: "inventory-service-group",
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 1 * time.Second,
})
return &InventoryService{
producer: producer,
consumer: consumer,
stock: map[string]int{
"P1": 100,
"P2": 50,
},
}
}
func (s *InventoryService) StartEventConsumer(ctx context.Context) {
log.Println("InventoryService: Starting order events consumer...")
for {
select {
case <-ctx.Done():
log.Println("InventoryService: Stopping event consumer.")
s.consumer.Close()
return
default:
m, err := s.consumer.ReadMessage(ctx)
if err != nil {
log.Printf("InventoryService: Error reading message: %v", err)
continue
}
eventType := string(m.Headers[0].Value)
log.Printf("InventoryService: Received event %s for OrderID %s", eventType, string(m.Key))
switch eventType {
case "OrderCreated":
var event common.OrderCreatedEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
log.Printf("InventoryService: Failed to unmarshal OrderCreatedEvent: %v", err)
continue
}
s.processOrderCreated(ctx, event)
case "OrderPaymentFailed": // compensating transaction: release stock when payment fails
var event common.OrderPaymentFailedEvent
if err := json.Unmarshal(m.Value, &event); err != nil {
log.Printf("InventoryService: Failed to unmarshal OrderPaymentFailedEvent: %v", err)
continue
}
s.compensateInventoryReservation(ctx, event.OrderID)
default:
log.Printf("InventoryService: Unknown event type %s", eventType)
}
}
}
}
func (s *InventoryService) processOrderCreated(ctx context.Context, orderEvent common.OrderCreatedEvent) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("InventoryService: Processing OrderCreatedEvent for OrderID %s", orderEvent.OrderID)
// Check every item first, then reserve
for _, item := range orderEvent.Items {
if s.stock[item.ProductID] < item.Quantity {
log.Printf("InventoryService: Not enough stock for %s. Current: %d, Requested: %d", item.ProductID, s.stock[item.ProductID], item.Quantity)
s.publishInventoryReservationFailed(ctx, orderEvent.OrderID, item.ProductID, "not enough stock")
return // if any item is short, the reservation for the whole order fails
}
}
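// Note: this publishes one InventoryReservedEvent per line item, so a payment
// service subscribed to that event would see several events for one order.
for _, item := range orderEvent.Items {
s.stock[item.ProductID] -= item.Quantity // deduct stock
s.publishInventoryReserved(ctx, orderEvent.OrderID, item.ProductID, item.Quantity)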
}
log.Printf("InventoryService: Inventory reserved for OrderID %s. Current stock: %v", orderEvent.OrderID, s.stock)
// In practice, persist the deduction/reservation record here and make the handler idempotent
}
func (s *InventoryService) compensateInventoryReservation(ctx context.Context, orderID string) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("InventoryService: Compensating inventory for OrderID %s due to payment failure.", orderID)
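// In practice, look up the reservation recorded for orderID and add those quantities back
// Simplified: assumes the order created in order_service.go (2 x P1, 1 x P2)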
s.stock["P1"] += 2
s.stock["P2"] += 1
log.Printf("InventoryService: Inventory rolled back for OrderID %s. Current stock: %v", orderID, s.stock)
}
func (s *InventoryService) publishInventoryReserved(ctx context.Context, orderID, productID string, quantity int) {
event := common.InventoryReservedEvent{
OrderID: orderID,
ProductID: productID,
Quantity: quantity,
Timestamp: time.Now(),
}
payload, err := json.Marshal(event)
if err != nil {
log.Printf("InventoryService: Failed to marshal InventoryReservedEvent: %v", err)
return
}
msg := kafka.Message{
Key: []byte(orderID),
Value: payload,
Headers: []kafka.Header{
{Key: "EventType", Value: []byte(event.GetEventType())},
},
}
err = s.producer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("InventoryService: Failed to publish InventoryReservedEvent: %v", err)
} else {
log.Printf("InventoryService: Published InventoryReservedEvent for OrderID %s, Product %s", orderID, productID)
}
}
func (s *InventoryService) publishInventoryReservationFailed(ctx context.Context, orderID, productID, reason string) {
event := common.InventoryReservationFailedEvent{
OrderID: orderID,
ProductID: productID,
Reason: reason,
Timestamp: time.Now(),
}
payload, err := json.Marshal(event)
if err != nil {
log.Printf("InventoryService: Failed to marshal InventoryReservationFailedEvent: %v", err)
return
}
msg := kafka.Message{
Key: []byte(orderID),
Value: payload,
Headers: []kafka.Header{
{Key: "EventType", Value: []byte(event.GetEventType())},
},
}
err = s.producer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("InventoryService: Failed to publish InventoryReservationFailedEvent: %v", err)
} else {
log.Printf("InventoryService: Published InventoryReservationFailedEvent for OrderID %s, Product %s", orderID, productID)
}
}
func main() {
kafkaBroker := "localhost:9092"
inventoryService := NewInventoryService(kafkaBroker)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go inventoryService.StartEventConsumer(ctx)
time.Sleep(15 * time.Second) // run for a while to observe the event flow
}
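`compensateInventoryReservation` above hard-codes the quantities and adds them back on every call, so a redelivered `OrderPaymentFailed` would inflate the stock. Below is a sketch of idempotent reservation and compensation, assuming a per-order reservation record that a real service would store in its database in the same local transaction as the stock change. `Inventory`, `Reserve`, and `Compensate` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type reservation struct {
	items    map[string]int // productID -> quantity reserved for the order
	released bool           // true once compensated (or tombstoned)
}

type Inventory struct {
	mu           sync.Mutex
	stock        map[string]int
	reservations map[string]*reservation // keyed by orderID
}

func NewInventory(stock map[string]int) *Inventory {
	return &Inventory{stock: stock, reservations: make(map[string]*reservation)}
}

// Reserve deducts stock for an order at most once; a repeated OrderCreated
// (or one arriving after compensation) is ignored.
func (inv *Inventory) Reserve(orderID string, items map[string]int) error {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if _, seen := inv.reservations[orderID]; seen {
		return nil
	}
	for p, q := range items {
		if inv.stock[p] < q {
			return fmt.Errorf("not enough stock for %s", p)
		}
	}
	for p, q := range items {
		inv.stock[p] -= q
	}
	inv.reservations[orderID] = &reservation{items: items}
	return nil
}

// Compensate gives back exactly what Reserve took; repeated calls are no-ops.
func (inv *Inventory) Compensate(orderID string) {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	r, ok := inv.reservations[orderID]
	if !ok {
		// Nothing reserved yet: leave a tombstone so a late Reserve is ignored.
		inv.reservations[orderID] = &reservation{released: true}
		return
	}
	if r.released {
		return
	}
	for p, q := range r.items {
		inv.stock[p] += q
	}
	r.released = true
}

func main() {
	inv := NewInventory(map[string]int{"P1": 100, "P2": 50})
	if err := inv.Reserve("ORD-1", map[string]int{"P1": 2, "P2": 1}); err != nil {
		fmt.Println(err)
	}
	inv.Compensate("ORD-1")
	inv.Compensate("ORD-1")                       // redelivered OrderPaymentFailed: no double refund
	fmt.Println(inv.stock["P1"], inv.stock["P2"]) // 100 50
}
```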
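Pros and cons (choreography):
- Pros:
  - Highly decoupled: services communicate purely through events, with no central coordinator, which reduces the risk of a single point of failure.
  - Simple for small sagas: for sagas with only a few steps and straightforward business logic, choreography is relatively simple and intuitive.
- Cons:
  - Hard to follow: as steps and participating services are added, the event flow becomes complex, and it is hard to see the overall state and direction of the business process.
  - Unclear responsibilities: each service must know which events to react to and after which events to compensate. This couples service logic to the overall flow and makes it hard to change.
  - Risk of cyclic dependencies: poorly designed events can create cyclic dependencies between services.
  - Scattered compensation logic: compensation logic is spread across services, making it hard to manage and debug centrally.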
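Orchestration-based Saga
- Principle: a dedicated saga orchestrator service manages the flow of the whole distributed transaction. The orchestrator receives the initial request, then drives the saga by sending commands to participant services and waiting for their replies. Internally it maintains a state machine that tracks the saga's current state and triggers compensating transactions when needed.
- Go implementation example (Saga Orchestrator)
Continuing with the order-creation saga above, we now introduce an OrderSagaOrchestrator service.
Command and reply definitions (common/commands.go):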
package common
import "time"
// Commands from Orchestrator to Participants
type CreateOrderCommand struct {
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalPrice float64 `json:"total_price"`
}
type ReserveInventoryCommand struct {
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Items []OrderItem `json:"items"`
}
type ProcessPaymentCommand struct {
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
CustomerID string `json:"customer_id"`
}
type UpdateOrderStatusCommand struct { // lets the order service update its own status
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Status string `json:"status"` // e.g., "PENDING", "COMPLETED", "FAILED"
}
// Compensating Commands
type CompensateInventoryCommand struct {
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Items []OrderItem `json:"items"` // which items of which order to release
}
type RefundPaymentCommand struct { // used if payment succeeded but a later step failed
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
// Replies from Participants to Orchestrator
type CommandReply struct {
TxID TransactionID `json:"tx_id"`
OrderID string `json:"order_id"`
Success bool `json:"success"`
Message string `json:"message"`
StepName string `json:"step_name"` // tells the orchestrator which step this reply belongs to
Timestamp time.Time `json:"timestamp"`
}
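Before the full orchestrator, it may help to see its routing logic as a pure function: given the saga's current state and a participant's reply, return the next state and the next command. This sketch is hypothetical: the state strings mirror the `SagaState_*` constants below, while the step names (apart from `CreateOrder`, which `StartSaga` passes to `sendCommand`) are assumed values of `CommandReply.StepName`.

```go
package main

import "fmt"

// nextAction returns the saga's new state and the command to send next
// ("" means none), given a reply for one step.
func nextAction(state, step string, ok bool) (newState, command string) {
	switch {
	case state == "PENDING" && step == "CreateOrder":
		if ok {
			return "ORDER_CREATED", "ReserveInventory"
		}
		return "FAILED", "" // the order was never created: nothing to undo
	case state == "ORDER_CREATED" && step == "ReserveInventory":
		if ok {
			return "INVENTORY_RESERVED", "ProcessPayment"
		}
		return "FAILED", "UpdateOrderStatus" // mark the order as failed
	case state == "INVENTORY_RESERVED" && step == "ProcessPayment":
		if ok {
			return "PAYMENT_PROCESSED", "UpdateOrderStatus" // mark the order as completed
		}
		return "COMPENSATING", "CompensateInventory"
	case state == "PAYMENT_PROCESSED" && step == "UpdateOrderStatus":
		if ok {
			return "COMPLETED", ""
		}
		return state, "UpdateOrderStatus" // retry
	case state == "COMPENSATING" && step == "CompensateInventory":
		if ok {
			return "FAILED", "UpdateOrderStatus" // mark the order as failed
		}
		return state, "CompensateInventory" // compensations are retried, never abandoned
	default:
		return state, "" // duplicate or out-of-order reply: ignore it
	}
}

func main() {
	state := "PENDING"
	replies := []struct {
		step string
		ok   bool
	}{
		{"CreateOrder", true},
		{"ReserveInventory", true},
		{"ProcessPayment", false}, // payment declined
		{"CompensateInventory", true},
	}
	for _, r := range replies {
		var cmd string
		state, cmd = nextAction(state, r.step, r.ok)
		fmt.Printf("%s ok=%t -> state %s, next command %q\n", r.step, r.ok, state, cmd)
	}
	// The saga ends in FAILED after the reserved stock was released.
}
```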
Saga orchestrator (orchestrator_service.go):
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/segmentio/kafka-go"
"your_module/common"
)
// SagaState values: the states a saga can be in
const (
SagaState_Pending = "PENDING"
SagaState_OrderCreated = "ORDER_CREATED"
SagaState_InventoryReserved = "INVENTORY_RESERVED"
SagaState_PaymentProcessed = "PAYMENT_PROCESSED"
SagaState_Completed = "COMPLETED"
SagaState_Failed = "FAILED"
SagaState_Compensating = "COMPENSATING"
)
type SagaInstance struct {
TxID common.TransactionID
OrderID string
CustomerID string
Items []common.OrderItem
TotalPrice float64
CurrentState string
// rollback information recorded when each step succeeds
InventoryReservationDetails map[string]int // productID -> quantity
PaymentDetails struct {
Amount float64
Status string
}
CreatedAt time.Time
UpdatedAt time.Time
}
type OrderSagaOrchestrator struct {
commandProducer *kafka.Writer // sends commands to participants
replyConsumer *kafka.Reader // receives participant replies
sagas map[common.TransactionID]*SagaInstance // state of every saga instance
mu sync.Mutex
kafkaBroker string
}
func NewOrderSagaOrchestrator(kafkaBroker string) *OrderSagaOrchestrator {
commandProducer := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "saga-commands", // all commands go to this topic
Balancer: &kafka.LeastBytes{},
}
replyConsumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
Topic: "saga-replies", // all replies arrive on this topic
GroupID: "orchestrator-group",
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 1 * time.Second,
})
return &OrderSagaOrchestrator{
commandProducer: commandProducer,
replyConsumer: replyConsumer,
sagas: make(map[common.TransactionID]*SagaInstance),
kafkaBroker: kafkaBroker,
}
}
// StartSaga starts a new Saga transaction
func (o *OrderSagaOrchestrator) StartSaga(ctx context.Context, customerID string, items []common.OrderItem) (string, error) {
// Demo-only IDs: rand.Intn(100000) can collide, so production code needs globally unique IDs (e.g. UUIDs)
txID := common.TransactionID(fmt.Sprintf("SAGA-%d", rand.Intn(100000)))
orderID := fmt.Sprintf("ORD-%d", rand.Intn(100000))
totalPrice := 0.0
for _, item := range items {
totalPrice += item.Price * float64(item.Quantity)
}
saga := &SagaInstance{
TxID: txID,
OrderID: orderID,
CustomerID: customerID,
Items: items,
TotalPrice: totalPrice,
CurrentState: SagaState_Pending,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
InventoryReservationDetails: make(map[string]int),
}
o.mu.Lock()
o.sagas[txID] = saga
o.mu.Unlock()
log.Printf("Orchestrator: Started Saga %s for Order %s", txID, orderID)
// Step 1: Create Order in Order Service
cmd := common.CreateOrderCommand{
TxID: txID,
OrderID: orderID,
CustomerID: customerID,
Items: items,
TotalPrice: totalPrice,
}
err := o.sendCommand(ctx, "order-service", "CreateOrder", cmd)
if err != nil {
log.Printf("Orchestrator: Failed to send CreateOrderCommand for TxID %s: %v", txID, err)
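o.mu.Lock()
saga.CurrentState = SagaState_Failed // nothing has run downstream yet, so there is nothing to compensate
saga.UpdatedAt = time.Now()
o.mu.Unlock()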
return "", err
}
return orderID, nil
}
// StartReplyConsumer runs the consumer loop for reply messages
func (o *OrderSagaOrchestrator) StartReplyConsumer(ctx context.Context) {
log.Println("Orchestrator: Starting reply consumer...")
for {
select {
case <-ctx.Done():
log.Println("Orchestrator: Stopping reply consumer.")
o.replyConsumer.Close()
return
default:
m, err := o.replyConsumer.ReadMessage(ctx)
if err != nil {
log.Printf("Orchestrator: Error reading reply message: %v", err)
continue
}
var reply common.CommandReply
if err := json.Unmarshal(m.Value, &reply); err != nil {
log.Printf("Orchestrator: Failed to unmarshal CommandReply: %v", err)
continue
}
log.Printf("Orchestrator: Received reply for TxID %s, Step: %s, Success: %t", reply.TxID, reply.StepName, reply.Success)
o.handleReply(ctx, reply)
}
}
}
// handleReply processes a reply from a participant
func (o *OrderSagaOrchestrator) handleReply(ctx context.Context, reply common.CommandReply) {
o.mu.Lock()
defer o.mu.Unlock()
saga, exists := o.sagas[reply.TxID]
if !exists {
log.Printf("Orchestrator: Received reply for unknown Saga TxID %s", reply.TxID)
return
}
// Idempotency check: once the Saga has completed, failed, or started compensating, ignore duplicate or late replies
if saga.CurrentState == SagaState_Completed || saga.CurrentState == SagaState_Failed || saga.CurrentState == SagaState_Compensating {
log.Printf("Orchestrator: Saga %s already in state %s, ignoring reply for step %s", saga.TxID, saga.CurrentState, reply.StepName)
return
}
saga.UpdatedAt = time.Now()
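if !reply.Success {
if reply.StepName == "UpdateOrderStatus" && saga.CurrentState == SagaState_PaymentProcessed {
// Payment has already gone through, so the Saga must move forward rather than compensate:
// keep it in PAYMENT_PROCESSED so the final update can be retried or handled manually
log.Printf("Orchestrator: Final order status update failed for Saga %s: %s. Retry required.", saga.TxID, reply.Message)
return
}
log.Printf("Orchestrator: Step %s failed for Saga %s: %s. Initiating compensation.", reply.StepName, saga.TxID, reply.Message)
o.startCompensation(ctx, saga, reply.StepName)
return
}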
// State machine transitions
switch reply.StepName {
case "CreateOrder":
if saga.CurrentState == SagaState_Pending {
saga.CurrentState = SagaState_OrderCreated
log.Printf("Orchestrator: Saga %s state changed to %s. Sending ReserveInventoryCommand.", saga.TxID, saga.CurrentState)
cmd := common.ReserveInventoryCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Items: saga.Items,
}
o.sendCommand(ctx, "inventory-service", "ReserveInventory", cmd)
}
case "ReserveInventory":
if saga.CurrentState == SagaState_OrderCreated {
saga.CurrentState = SagaState_InventoryReserved
// Record the reservation only after the inventory service confirms it, so compensation releases exactly what was reserved
for _, item := range saga.Items {
saga.InventoryReservationDetails[item.ProductID] = item.Quantity
}
log.Printf("Orchestrator: Saga %s state changed to %s. Sending ProcessPaymentCommand.", saga.TxID, saga.CurrentState)
cmd := common.ProcessPaymentCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Amount: saga.TotalPrice,
CustomerID: saga.CustomerID,
}
o.sendCommand(ctx, "payment-service", "ProcessPayment", cmd)
}
case "ProcessPayment":
if saga.CurrentState == SagaState_InventoryReserved {
saga.CurrentState = SagaState_PaymentProcessed
saga.PaymentDetails.Amount = saga.TotalPrice
saga.PaymentDetails.Status = "success" // a success reply is taken to mean the payment went through
log.Printf("Orchestrator: Saga %s state changed to %s. Sending UpdateOrderStatusCommand (final).", saga.TxID, saga.CurrentState)
cmd := common.UpdateOrderStatusCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Status: "COMPLETED",
}
o.sendCommand(ctx, "order-service", "UpdateOrderStatus", cmd)
// Stay in PAYMENT_PROCESSED until the order service confirms: marking COMPLETED here
// would make the idempotency check above discard that confirmation
}
case "UpdateOrderStatus": // final status update: the whole Saga has succeeded
if saga.CurrentState == SagaState_PaymentProcessed {
saga.CurrentState = SagaState_Completed
log.Printf("Orchestrator: Order status updated for Saga %s. Saga completed successfully!", saga.TxID)
}
default:
log.Printf("Orchestrator: Unknown step name in reply: %s for Saga %s", reply.StepName, saga.TxID)
}
// In practice, the Saga instance state must be persisted here
}
// startCompensation starts the compensation flow
func (o *OrderSagaOrchestrator) startCompensation(ctx context.Context, saga *SagaInstance, failedStep string) {
log.Printf("Orchestrator: Starting compensation for Saga %s, failed at step %s", saga.TxID, failedStep)
saga.CurrentState = SagaState_Compensating
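// In practice, the Saga instance state must be persisted here
// Run compensations in reverse order, depending on which step failed
switch failedStep {
case "ProcessPayment":
// Payment failed but inventory was already reserved: release it
if len(saga.InventoryReservationDetails) > 0 { // len of a nil map is 0, so no separate nil check is needed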
cmd := common.CompensateInventoryCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Items: saga.Items, // release the quantities of the original order items
}
o.sendCommand(ctx, "inventory-service", "CompensateInventory", cmd)
}
// Also mark the order as failed in the order service
updateOrderCmd := common.UpdateOrderStatusCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Status: "PAYMENT_FAILED",
}
o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
case "ReserveInventory":
// Inventory reservation failed: only the order needs to be marked as failed
updateOrderCmd := common.UpdateOrderStatusCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Status: "INVENTORY_FAILED",
}
o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
case "CreateOrder":
// Order creation failed: there are no downstream services to compensate
updateOrderCmd := common.UpdateOrderStatusCommand{
TxID: saga.TxID,
OrderID: saga.OrderID,
Status: "ORDER_CREATION_FAILED",
}
o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
default:
log.Printf("Orchestrator: No compensation defined for failed step %s", failedStep)
}
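saga.CurrentState = SagaState_Failed // simplification: marked FAILED once the compensation commands are sent, without waiting for their replies
log.Printf("Orchestrator: Saga %s compensation commands sent. Final state: %s", saga.TxID, saga.CurrentState)
// In practice, the Saga instance's final state must be persisted here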
}
// sendCommand wraps the logic for publishing a command
func (o *OrderSagaOrchestrator) sendCommand(ctx context.Context, targetService, stepName string, command interface{}) error {
// Every command type is assumed to provide GetTxID(); a checked assertion returns an error instead of panicking if one does not
carrier, ok := command.(interface{ GetTxID() common.TransactionID })
if !ok {
return fmt.Errorf("command %s (%T) does not implement GetTxID", stepName, command)
}
txID := carrier.GetTxID()
payload, err := json.Marshal(command)
if err != nil {
return fmt.Errorf("failed to marshal command %s: %w", stepName, err)
}
msg := kafka.Message{
Key: []byte(txID),
Value: payload,
Headers: []kafka.Header{
{Key: "CommandType", Value: []byte(stepName)},
{Key: "TargetService", Value: []byte(targetService)}, // tells participants which service should handle it
},
}
if err := o.commandProducer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("failed to publish command %s: %w", stepName, err)
}
log.Printf("Orchestrator: Sent command %s for TxID %s to %s", stepName, txID, targetService)
return nil
}
}
func main() {
kafkaBroker := "localhost:9092"
orchestrator := NewOrderSagaOrchestrator(kafkaBroker)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go orchestrator.StartReplyConsumer(ctx)
// Simulate starting a Saga
orderID, err := orchestrator.StartSaga(ctx, "customer-123", []common.OrderItem{
{ProductID: "P1", Quantity: 2, Price: 10.0},
{ProductID: "P2", Quantity: 1, Price: 25.0},
})
if err != nil {
log.Fatalf("Failed to start saga: %v", err)
}
fmt.Printf("Initial order ID: %s\n", orderID)
time.Sleep(20 * time.Second) // run for a while to observe the event flow and Saga states
}
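The `switch` in `handleReply` is a small state machine. One way to make its transitions explicit and easy to unit-test is a lookup table keyed by (current state, step that just succeeded). The sketch below is illustrative only: `transition`, `forward`, and `onSuccess` are hypothetical names, and only the forward (happy) path is modeled. In this table the Saga becomes COMPLETED only when the order service confirms the final status update. Any reply that does not match the current state is reported as not accepted.

```go
package main

import "fmt"

// Saga states, mirroring the orchestrator's constants.
const (
	Pending           = "PENDING"
	OrderCreated      = "ORDER_CREATED"
	InventoryReserved = "INVENTORY_RESERVED"
	PaymentProcessed  = "PAYMENT_PROCESSED"
	Completed         = "COMPLETED"
)

// transition: the state after a successful step and the next command to send.
type transition struct {
	next    string
	command string // "" means no further command
}

// forward is a hypothetical table form of the happy path in handleReply,
// keyed by {current state, step that just succeeded}.
var forward = map[[2]string]transition{
	{Pending, "CreateOrder"}:                {OrderCreated, "ReserveInventory"},
	{OrderCreated, "ReserveInventory"}:      {InventoryReserved, "ProcessPayment"},
	{InventoryReserved, "ProcessPayment"}:   {PaymentProcessed, "UpdateOrderStatus"},
	{PaymentProcessed, "UpdateOrderStatus"}: {Completed, ""},
}

// onSuccess returns the next state and command. ok is false for a duplicate or
// out-of-order reply; the state is then returned unchanged and nothing is sent.
func onSuccess(state, step string) (next, command string, ok bool) {
	t, found := forward[[2]string{state, step}]
	if !found {
		return state, "", false
	}
	return t.next, t.command, true
}

func main() {
	state := Pending
	for _, step := range []string{"CreateOrder", "ReserveInventory", "ProcessPayment", "UpdateOrderStatus"} {
		next, cmd, _ := onSuccess(state, step)
		fmt.Printf("%s + %s -> %s, send %q\n", state, step, next, cmd)
		state = next
	}
	// A redelivered CreateOrder reply is rejected instead of re-triggering step 2.
	_, _, ok := onSuccess(state, "CreateOrder")
	fmt.Println("final:", state, "duplicate accepted:", ok)
}
```

Keeping the table separate from the Kafka plumbing means every legal transition can be tested without a broker. Adding a step then becomes a matter of adding one row.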
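Simplified code for a participant service (e.g. the inventory service) that handles commands and sends replies:
// inventory_participant_orchestrated.go (partial code, building on the earlier InventoryService)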
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/segmentio/kafka-go"
"your_module/common"
)
type InventoryServiceOrchestrated struct {
commandConsumer *kafka.Reader // receives commands from the orchestrator
replyProducer *kafka.Writer // sends replies to the orchestrator
stock map[string]int
mu sync.Mutex
}
func NewInventoryServiceOrchestrated(kafkaBroker string) *InventoryServiceOrchestrated {
commandConsumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
Topic: "saga-commands",
GroupID: "inventory-service-orchestrated-group",
MinBytes: 10e3,
MaxBytes: 10e6,
MaxWait: 1 * time.Second,
})
replyProducer := &kafka.Writer{
Addr: kafka.TCP(kafkaBroker),
Topic: "saga-replies",
Balancer: &kafka.LeastBytes{},
}
return &InventoryServiceOrchestrated{
commandConsumer: commandConsumer,
replyProducer: replyProducer,
stock: map[string]int{
"P1": 100,
"P2": 50,
},
}
}
func (s *InventoryServiceOrchestrated) StartCommandConsumer(ctx context.Context) {
log.Println("InventoryServiceOrchestrated: Starting command consumer...")
for {
select {
case <-ctx.Done():
log.Println("InventoryServiceOrchestrated: Stopping command consumer.")
s.commandConsumer.Close()
return
default:
m, err := s.commandConsumer.ReadMessage(ctx)
if err != nil {
log.Printf("InventoryServiceOrchestrated: Error reading command message: %v", err)
continue
}
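// Look headers up by key instead of relying on their position (indexing would panic if a header is missing)
var commandType, targetService string
for _, h := range m.Headers {
switch h.Key {
case "CommandType":
commandType = string(h.Value)
case "TargetService":
targetService = string(h.Value)
}
}
if targetService != "inventory-service" { // only handle commands addressed to this service
continue
}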
log.Printf("InventoryServiceOrchestrated: Received command %s for TxID %s", commandType, string(m.Key))
var reply common.CommandReply
reply.TxID = common.TransactionID(m.Key)
// OrderID is filled in from the decoded command below: the message key carries the TxID, not the OrderID
reply.StepName = commandType
reply.Timestamp = time.Now()
switch commandType {
case "ReserveInventory":
var cmd common.ReserveInventoryCommand
if err := json.Unmarshal(m.Value, &cmd); err != nil {
log.Printf("InventoryServiceOrchestrated: Failed to unmarshal ReserveInventoryCommand: %v", err)
reply.Success = false
reply.Message = "invalid command payload"
} else {
reply.OrderID = cmd.OrderID
ok, msg := s.handleReserveInventory(ctx, cmd)
reply.Success = ok
reply.Message = msg
}
case "CompensateInventory":
var cmd common.CompensateInventoryCommand
if err := json.Unmarshal(m.Value, &cmd); err != nil {
log.Printf("InventoryServiceOrchestrated: Failed to unmarshal CompensateInventoryCommand: %v", err)
reply.Success = false
reply.Message = "invalid command payload"
} else {
reply.OrderID = cmd.OrderID
ok, msg := s.handleCompensateInventory(ctx, cmd)
reply.Success = ok
reply.Message = msg
}
default:
log.Printf("InventoryServiceOrchestrated: Unknown command type %s", commandType)
reply.Success = false
reply.Message = "unknown command type"
}
s.sendReply(ctx, reply)
}
}
}
func (s *InventoryServiceOrchestrated) handleReserveInventory(ctx context.Context, cmd common.ReserveInventoryCommand) (bool, string) {
s.mu.Lock()
defer s.mu.Unlock()
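// Idempotency check: real code needs finer-grained logic, e.g. querying the database for an existing reservation record for this TxID
// Simplified example: assume this command has not been processed before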
log.Printf("InventoryServiceOrchestrated: Processing ReserveInventory for OrderID %s", cmd.OrderID)
for _, item := range cmd.Items {
if s.stock[item.ProductID] < item.Quantity {
return false, fmt.Sprintf("not enough stock for %s", item.ProductID)
}
}
for _, item := range cmd.Items {
s.stock[item.ProductID] -= item.Quantity
}
log.Printf("InventoryServiceOrchestrated: Inventory reserved for OrderID %s. Current stock: %v", cmd.OrderID, s.stock)
return true, "inventory reserved"
}
func (s *InventoryServiceOrchestrated) handleCompensateInventory(ctx context.Context, cmd common.CompensateInventoryCommand) (bool, string) {
s.mu.Lock()
defer s.mu.Unlock()
// Idempotency check: real code should query the database for whether this TxID was already compensated or never reserved
log.Printf("InventoryServiceOrchestrated: Processing CompensateInventory for OrderID %s", cmd.OrderID)
for _, item := range cmd.Items {
s.stock[item.ProductID] += item.Quantity
}
log.Printf("InventoryServiceOrchestrated: Inventory compensated for OrderID %s. Current stock: %v", cmd.OrderID, s.stock)
return true, "inventory compensated"
}
func (s *InventoryServiceOrchestrated) sendReply(ctx context.Context, reply common.CommandReply) {
payload, err := json.Marshal(reply)
if err != nil {
log.Printf("InventoryServiceOrchestrated: Failed to marshal CommandReply: %v", err)
return
}
msg := kafka.Message{
Key: []byte(reply.TxID),
Value: payload,
Headers: []kafka.Header{
{Key: "StepName", Value: []byte(reply.StepName)},
},
}
err = s.replyProducer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("InventoryServiceOrchestrated: Failed to publish CommandReply: %v", err)
} else {
log.Printf("InventoryServiceOrchestrated: Sent reply for TxID %s, Step %s", reply.TxID, reply.StepName)
}
}
func main() {
kafkaBroker := "localhost:9092"
inventoryService := NewInventoryServiceOrchestrated(kafkaBroker)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go inventoryService.StartCommandConsumer(ctx)
time.Sleep(25 * time.Second)
}
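The participant above leaves idempotency as a comment. Below is a minimal in-memory sketch of what those checks could look like, assuming the TxID is the deduplication key. The service records per TxID what it reserved and treats a redelivered Reserve as a no-op. It releases stock at most once, and it leaves a tombstone so that a Reserve arriving after its compensation is rejected. `Inventory`, `Reserve`, and `Compensate` are hypothetical names. A real service would keep these records in its database, in the same local transaction as the stock update.

```go
package main

import (
	"fmt"
	"sync"
)

type Item struct {
	ProductID string
	Quantity  int
}

// Inventory is a hypothetical in-memory participant that tracks, per TxID,
// what it reserved and whether that Saga has been compensated.
type Inventory struct {
	mu          sync.Mutex
	stock       map[string]int
	reserved    map[string][]Item // TxID -> items actually deducted
	compensated map[string]bool   // TxID -> compensation applied (tombstone)
}

func NewInventory(stock map[string]int) *Inventory {
	return &Inventory{stock: stock, reserved: map[string][]Item{}, compensated: map[string]bool{}}
}

// Reserve deducts stock at most once per TxID.
func (inv *Inventory) Reserve(txID string, items []Item) (bool, string) {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if inv.compensated[txID] { // compensation overtook this command
		return false, "saga already compensated"
	}
	if _, done := inv.reserved[txID]; done { // redelivered command
		return true, "already reserved"
	}
	for _, it := range items {
		if inv.stock[it.ProductID] < it.Quantity {
			return false, fmt.Sprintf("not enough stock for %s", it.ProductID)
		}
	}
	for _, it := range items {
		inv.stock[it.ProductID] -= it.Quantity
	}
	inv.reserved[txID] = items
	return true, "inventory reserved"
}

// Compensate releases only what this TxID actually reserved, at most once.
// For a TxID that never reserved anything it just records the tombstone.
func (inv *Inventory) Compensate(txID string) (bool, string) {
	inv.mu.Lock()
	defer inv.mu.Unlock()
	if inv.compensated[txID] {
		return true, "already compensated"
	}
	for _, it := range inv.reserved[txID] { // nil slice if nothing was reserved
		inv.stock[it.ProductID] += it.Quantity
	}
	inv.compensated[txID] = true
	return true, "inventory compensated"
}

func main() {
	inv := NewInventory(map[string]int{"P1": 100, "P2": 50})
	items := []Item{{"P1", 2}, {"P2", 1}}
	inv.Reserve("SAGA-1", items)
	inv.Reserve("SAGA-1", items)                  // duplicate delivery
	fmt.Println(inv.stock["P1"], inv.stock["P2"]) // 98 49
	inv.Compensate("SAGA-1")
	inv.Compensate("SAGA-1")                      // duplicate compensation
	fmt.Println(inv.stock["P1"], inv.stock["P2"]) // 100 50
}
```

This sketch releases the quantities it recorded itself rather than trusting the item list carried by the compensation command. As a result, a compensation for a reservation that never happened cannot inflate stock.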
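Pros and cons (orchestration-based):
- Pros:
- Centralized flow management: the Saga's business logic and flow are defined in one place, the orchestrator, which makes them easy to understand, maintain, and change.
- Easy to track: as the central point of control, the orchestrator always knows the current state of each Saga, which improves observability.
- Simpler participant logic: participant services only implement their own local transactions and compensations; they need no knowledge of the overall Saga flow.
- Better failure handling: retries, timeouts, and dead-letter queues are easier to implement in the orchestrator.
- Cons:
- The orchestrator is a single-point-of-failure risk: if it goes down, in-flight Sagas stall. It must be designed for high availability (for example, by clustering it and persisting Saga state).
- Possible performance bottleneck: an orchestrator that handles a large volume of Sagas can become a throughput bottleneck.
- Centralized control: this is somewhat at odds with the decentralized philosophy of microservices, although the centralization applies to the business flow, not to data storage.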
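A common mitigation for the single-point-of-failure concern is to persist Saga state after every transition. A restarted or standby orchestrator can then pick up unfinished Sagas. Below is a minimal sketch under that assumption. `SagaRecord`, `SagaStore`, and `memStore` are hypothetical, and the in-memory map only stands in for a database table keyed by TxID.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"sync"
)

// SagaRecord is the durable part of a Saga instance (hypothetical shape).
type SagaRecord struct {
	TxID         string `json:"tx_id"`
	OrderID      string `json:"order_id"`
	CurrentState string `json:"current_state"`
}

// SagaStore is a hypothetical persistence interface; a real implementation
// would write to a database table keyed by TxID.
type SagaStore interface {
	Save(rec SagaRecord) error
	LoadUnfinished() ([]SagaRecord, error)
}

// memStore keeps one JSON-encoded row per TxID to mimic durable storage.
type memStore struct {
	mu   sync.Mutex
	rows map[string][]byte
}

func newMemStore() *memStore { return &memStore{rows: map[string][]byte{}} }

// Save upserts the record: the latest state for a TxID wins.
func (s *memStore) Save(rec SagaRecord) error {
	b, err := json.Marshal(rec)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[rec.TxID] = b
	return nil
}

// LoadUnfinished returns every Saga not in a terminal state (sorted by TxID),
// so a restarted orchestrator can resume or compensate it.
func (s *memStore) LoadUnfinished() ([]SagaRecord, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []SagaRecord
	for _, b := range s.rows {
		var rec SagaRecord
		if err := json.Unmarshal(b, &rec); err != nil {
			return nil, err
		}
		if rec.CurrentState != "COMPLETED" && rec.CurrentState != "FAILED" {
			out = append(out, rec)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].TxID < out[j].TxID })
	return out, nil
}

func main() {
	var store SagaStore = newMemStore()
	store.Save(SagaRecord{TxID: "SAGA-1", OrderID: "ORD-1", CurrentState: "INVENTORY_RESERVED"})
	store.Save(SagaRecord{TxID: "SAGA-2", OrderID: "ORD-2", CurrentState: "COMPLETED"})
	pending, _ := store.LoadUnfinished()
	for _, rec := range pending {
		fmt.Printf("resume %s from %s\n", rec.TxID, rec.CurrentState)
	}
}
```

On startup, an orchestrator built this way would call `LoadUnfinished` and, for each record, either re-send the command for its current step or start compensation. Because participants are idempotent, re-sending a command that was already delivered is safe.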
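Key Challenges in Sagas and Practical Go Approaches
Whichever Saga style you choose, choreography or orchestration, the following key challenges have to be addressed, and Go's language features make them easier to tackle.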
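Idempotency
- Why it matters: message queues in distributed systems usually provide at-least-once delivery, so a message can be delivered, and processed, more than once. If a service processes the same "deduct inventory" request twice, stock is deducted twice and the data becomes inconsistent. Every Saga participant's local transaction must therefore be idempotent.
- Implementation strategies in Go:
- Unique transaction ID: every Saga transaction should carry a globally unique TxID. Before handling a command or event, a participant first checks whether that TxID has already been processed.
- Database check: before executing the local transaction, the service queries its local database for an existing transaction record tied to that TxID.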
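// Sketch: idempotency check in the inventory service's reservation handler.
// Assumes a hypothetical s.db *sql.DB field and a saga_transactions table whose primary key is tx_id
// (neither exists in the in-memory service above); needs the database/sql and errors imports.
func (s *InventoryServiceOrchestrated) handleReserveInventory(ctx context.Context, cmd common.ReserveInventoryCommand) (bool, string) {
	// 1. Has this TxID's reservation already been processed?
	var status string
	err := s.db.QueryRowContext(ctx, "SELECT status FROM saga_transactions WHERE tx_id = ?", cmd.TxID).Scan(&status)
	if err == nil && status == "completed" {
		log.Printf("InventoryService: TxID %s already processed, returning success.", cmd.TxID)
		return true, "already processed"
	}
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return false, fmt.Sprintf("idempotency lookup failed: %v", err)
	}
	// 2. Run the business logic and 3. record the TxID in the SAME local transaction,
	// so either both are committed or neither is.
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return false, err.Error()
	}
	defer tx.Rollback() // has no effect once Commit has succeeded
	for _, item := range cmd.Items {
		if _, err := tx.ExecContext(ctx, "UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?", item.Quantity, item.ProductID); err != nil {
			return false, err.Error()
		}
	}
	// A concurrent duplicate fails here on the tx_id primary key, and the whole transaction rolls back.
	if _, err := tx.ExecContext(ctx, "INSERT INTO saga_transactions (tx_id, status) VALUES (?, 'completed')", cmd.TxID); err != nil {
		return false, err.Error()
	}
	if err := tx.Commit(); err != nil {
		return false, err.Error()
	}
	return true, "inventory reserved"
}
- Versioning / optimistic locking: for resources that are updated, a version number or timestamp can serve as an optimistic lock, ensuring we never re-apply old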