解析 ‘Two-Phase Commit (2PC) vs Sagas’:在 Go 微服务架构中处理分布式事务的物理折衷

引言:分布式事务的挑战

在现代软件架构中,微服务已成为构建复杂、可伸缩应用的基石。它将一个庞大的单体应用拆解成一系列小型、自治的服务,每个服务专注于特定的业务功能,并独立部署、运行和扩展。这种架构模式带来了诸多好处,例如技术栈的灵活性、团队的自治性、故障隔离以及更快的开发迭代速度。然而,微服务的分布式特性也引入了一个核心挑战:分布式事务

在单体应用中,我们习惯于使用数据库提供的ACID(原子性、一致性、隔离性、持久性)事务来保证数据操作的完整性。一个操作要么全部成功,要么全部失败,回滚到初始状态。但在微服务世界里,一个完整的业务操作可能需要跨越多个服务,每个服务拥有自己的独立数据库。此时,传统的单数据库ACID事务无法直接跨越服务边界,导致我们面临一个核心问题:如何确保这些跨服务的操作要么全部成功,要么全部失败,即使在部分服务失败或网络中断的情况下?

这就是分布式事务需要解决的问题。它要求在分布式系统中协调多个独立资源(通常是数据库)上的操作,以实现一个全局的原子性操作。然而,强一致性(Consistency)和高可用性(Availability)在分布式系统中往往是相互冲突的,这正是著名的CAP定理所揭示的物理折衷。在微服务架构中,我们常常需要为了高可用性和分区容错性而牺牲一定程度的即时一致性。

本文将深入探讨两种主要的分布式事务处理模式:两阶段提交 (Two-Phase Commit, 2PC)Sagas 模式。我们将从原理、优缺点、Go语言实现的角度进行详细分析,并着重讨论在Go微服务架构中选择它们时所面临的物理折衷和实际考量。

两阶段提交 (Two-Phase Commit, 2PC)

两阶段提交(2PC)是一种旨在实现分布式系统原子性操作的协议,它试图在多个参与者之间提供ACID的原子性保证。其核心思想是通过一个协调者(Coordinator)来协调所有参与者(Participants)的事务。

协议原理:准备阶段 (Prepare Phase) 与提交阶段 (Commit Phase)

2PC协议分为两个主要阶段:

  1. 准备阶段 (Prepare Phase / Voting Phase)

    • 协调者向所有参与者发送一个“准备提交”请求。
    • 每个参与者接收到请求后,会执行所有事务操作,但暂不提交。它会将事务操作的日志写入磁盘(redo/undo logs),并锁定所需资源,确保能够提交或回滚。
    • 如果参与者能够成功执行操作并准备好提交,它会向协调者发送“同意”消息。
    • 如果参与者无法执行操作(例如,资源不足或死锁),它会向协调者发送“中止”消息。
    • 如果协调者在指定时间内没有收到所有参与者的响应,它会认为该参与者“中止”。
  2. 提交阶段 (Commit Phase / Decision Phase)

    • 协调者根据准备阶段收到的所有响应做出决定:
      • 全部同意: 如果所有参与者都发送了“同意”消息,协调者将向所有参与者发送“提交”请求。参与者收到请求后,正式提交其本地事务,并释放锁定资源,然后向协调者发送“完成”消息。
      • 任何中止: 如果有任何一个参与者发送了“中止”消息,或者协调者超时未收到某个参与者的响应,协调者将向所有参与者发送“回滚”请求。参与者收到请求后,回滚其本地事务,释放锁定资源,然后向协调者发送“完成”消息。
    • 协调者收到所有参与者的“完成”消息后,事务才算最终完成。

2PC的优势:强一致性,原子性

2PC协议的核心优势在于其能够提供强一致性原子性。在一个成功的2PC事务中,所有参与者要么全部提交,要么全部回滚,保证了数据在分布式系统中的最终状态是一致的。这对于那些对数据完整性有极高要求的关键业务场景(例如银行转账)似乎是一个理想的选择。

2PC的劣势:与微服务哲学的冲突

尽管2PC提供了强大的ACID保证,但在现代分布式微服务架构中,它通常被视为一种反模式。其劣势非常显著:

  1. 阻塞性 (Blocking)

    • 在准备阶段,参与者必须锁定它们所涉及的资源,直到事务最终提交或回滚。这意味着这些资源在整个2PC事务期间都不能被其他事务访问。如果协调者或某个参与者在关键时刻崩溃,这些资源可能会长时间保持锁定状态,导致系统性能急剧下降,甚至死锁。
  2. 单点故障 (Single Point of Failure)

    • 协调者是2PC协议的关键。如果协调者在提交阶段之前发生故障,所有参与者将无法得知最终的事务决策(提交或回滚),它们将一直等待,持有锁定的资源,导致整个系统停滞。虽然可以通过协调者的故障恢复机制(如日志持久化和选举新的协调者)来缓解,但实现起来非常复杂且成本高昂。
  3. 性能瓶颈

    • 2PC涉及多次网络往返(协调者与所有参与者之间的通信),这带来了显著的网络延迟。随着参与者数量的增加,通信开销会线性增长。同时,资源的长时间锁定也限制了并发性,导致系统吞吐量下降。
  4. 与微服务哲学的冲突 (紧耦合)

    • 微服务架构倡导服务之间的松耦合和自治。然而,2PC要求所有参与者都必须遵循相同的协议,并且由一个中央协调者来控制。这意味着服务之间存在强烈的同步依赖,破坏了微服务的独立性原则。一个服务的失败可能会影响整个分布式事务,导致级联故障。
  5. 违反可用性原则 (Availability)

    • CAP定理指出,在分布式系统中,一致性、可用性和分区容错性三者只能同时满足其二。2PC为了追求强一致性,在面对网络分区时,往往需要牺牲可用性。如果网络发生分区,导致协调者无法与部分参与者通信,整个事务就无法推进,系统将停止响应,从而降低了可用性。

Go语言视角下的实现考量

在Go语言中,如果我们要“实现”一个2PC(这里更多是指概念上的模拟,因为真实的跨数据库2PC协调器通常由数据库或专门的事务管理器提供,我们很少自己从头实现),我们可以考虑使用gRPC或标准库的net/rpc进行服务间的通信。

假设我们有一个订单服务、一个支付服务和一个库存服务,它们都充当2PC的参与者。

// event.go (定义消息或命令结构)
package common

type TransactionID string

type PrepareRequest struct {
    TxID       TransactionID
    Payload    []byte // 业务数据,例如订单详情、支付金额、库存扣减数量
    ServiceKey string // 标识是哪个服务发送的请求
}

type PrepareResponse struct {
    TxID       TransactionID
    ServiceKey string
    Approved   bool
    Error      string
}

type CommitRequest struct {
    TxID       TransactionID
    ServiceKey string
    Action     string // "commit" or "rollback"
}

type CommitResponse struct {
    TxID       TransactionID
    ServiceKey string
    Success    bool
    Error      string
}

参与者服务(以库存服务为例)的伪代码:

// inventory_participant.go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "your_module/common" // 假设common包定义了上述结构
)

// InventoryService 模拟库存服务
type InventoryService struct {
    // 假设有一个简单的库存存储
    stock map[string]int
    // 存储正在进行的事务状态
    pendingTransactions map[common.TransactionID]struct {
        ReservedItems map[string]int
        Status        string // "prepared", "committed", "rolledback"
    }
    mu sync.Mutex
}

func NewInventoryService() *InventoryService {
    return &InventoryService{
        stock: map[string]int{
            "ProductA": 100,
            "ProductB": 50,
        },
        pendingTransactions: make(map[common.TransactionID]struct {
            ReservedItems map[string]int
            Status        string
        }),
    }
}

// HandlePrepare 模拟2PC准备阶段
func (s *InventoryService) HandlePrepare(ctx context.Context, req common.PrepareRequest) common.PrepareResponse {
    s.mu.Lock()
    defer s.mu.Unlock()

    log.Printf("Inventory Service: Received Prepare request for TxID %s", req.TxID)

    // 模拟解析业务数据
    var orderItems map[string]int // 假设req.Payload解析后是这个
    // ... 解析 req.Payload ...
    orderItems = map[string]int{"ProductA": 5} // 示例

    // 检查库存并预留
    reservedItems := make(map[string]int)
    for item, quantity := range orderItems {
        if s.stock[item] < quantity {
            log.Printf("Inventory Service: Not enough stock for %s. Current: %d, Requested: %d", item, s.stock[item], quantity)
            return common.PrepareResponse{TxID: req.TxID, ServiceKey: "inventory", Approved: false, Error: "not enough stock"}
        }
        reservedItems[item] = quantity
    }

    // 模拟数据库操作:写入预留日志,但不真正扣减
    // s.db.Exec("INSERT INTO reservations ...")
    // s.db.Exec("UPDATE inventory SET reserved = reserved + ? WHERE item = ?", quantity, item)

    // 更新本地事务状态
    s.pendingTransactions[req.TxID] = struct {
        ReservedItems map[string]int
        Status        string
    }{
        ReservedItems: reservedItems,
        Status:        "prepared",
    }

    log.Printf("Inventory Service: Prepared TxID %s, reserved %v", req.TxID, reservedItems)
    return common.PrepareResponse{TxID: req.TxID, ServiceKey: "inventory", Approved: true}
}

// HandleCommit 模拟2PC提交阶段
func (s *InventoryService) HandleCommit(ctx context.Context, req common.CommitRequest) common.CommitResponse {
    s.mu.Lock()
    defer s.mu.Unlock()

    log.Printf("Inventory Service: Received Commit request for TxID %s, action: %s", req.TxID, req.Action)

    txState, exists := s.pendingTransactions[req.TxID]
    if !exists || txState.Status != "prepared" {
        log.Printf("Inventory Service: TxID %s not in prepared state or doesn't exist.", req.TxID)
        return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: false, Error: "invalid transaction state"}
    }

    if req.Action == "commit" {
        for item, quantity := range txState.ReservedItems {
            s.stock[item] -= quantity // 真正扣减库存
        }
        txState.Status = "committed"
        s.pendingTransactions[req.TxID] = txState
        log.Printf("Inventory Service: Committed TxID %s. New stock: %v", req.TxID, s.stock)
        return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: true}
    } else if req.Action == "rollback" {
        // 模拟回滚操作:释放预留的库存
        // s.db.Exec("DELETE FROM reservations WHERE tx_id = ?", req.TxID)
        // s.db.Exec("UPDATE inventory SET reserved = reserved - ? WHERE item = ?", quantity, item)
        txState.Status = "rolledback"
        s.pendingTransactions[req.TxID] = txState
        log.Printf("Inventory Service: Rolled back TxID %s", req.TxID)
        return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: true}
    }

    return common.CommitResponse{TxID: req.TxID, ServiceKey: "inventory", Success: false, Error: "unknown action"}
}

// 模拟RPC服务器
func main() {
    inventoryService := NewInventoryService()
    // 实际应用中会启动一个gRPC或HTTP服务器来暴露这些方法
    log.Println("Inventory Service started, waiting for requests...")

    // 模拟一个2PC流程(通常由协调者发起)
    txID := common.TransactionID("tx-123")
    log.Printf("--- Simulating a successful 2PC for TxID %s ---", txID)

    // 1. 准备阶段
    prepareReq := common.PrepareRequest{
        TxID:       txID,
        ServiceKey: "order", // 假设是订单服务发起的
        Payload:    []byte(`{"ProductA": 5}`),
    }
    prepResp := inventoryService.HandlePrepare(context.Background(), prepareReq)
    fmt.Printf("Prepare Response: %+vn", prepResp)

    if prepResp.Approved {
        // 2. 提交阶段
        commitReq := common.CommitRequest{
            TxID:       txID,
            ServiceKey: "coordinator",
            Action:     "commit",
        }
        commitResp := inventoryService.HandleCommit(context.Background(), commitReq)
        fmt.Printf("Commit Response: %+vn", commitResp)
    } else {
        // 如果准备失败,无需提交,但如果其他服务准备成功,协调者会发起回滚
        log.Printf("Prepare failed for TxID %s, no commit needed from this service.", txID)
    }

    // 模拟一个失败的2PC流程
    txID2 := common.TransactionID("tx-456")
    log.Printf("n--- Simulating a failed 2PC (due to insufficient stock) for TxID %s ---", txID2)
    prepareReq2 := common.PrepareRequest{
        TxID:       txID2,
        ServiceKey: "order",
        Payload:    []byte(`{"ProductA": 200}`), // 请求200,但只有95(100-5)
    }
    prepResp2 := inventoryService.HandlePrepare(context.Background(), prepareReq2)
    fmt.Printf("Prepare Response: %+vn", prepResp2)

    if !prepResp2.Approved {
        log.Printf("Prepare failed as expected for TxID %s. Coordinator would initiate rollback for other participants if any succeeded.", txID2)
        // 协调者会向所有参与者发送回滚请求
        rollbackReq := common.CommitRequest{
            TxID:       txID2,
            ServiceKey: "coordinator",
            Action:     "rollback",
        }
        rollbackResp := inventoryService.HandleCommit(context.Background(), rollbackReq)
        fmt.Printf("Rollback Response (even though prepare failed here, showing logic): %+vn", rollbackResp)
    }

    // 保持服务运行,以便观察日志
    time.Sleep(5 * time.Second)
}

协调者服务 (Coordinator) 的伪代码思路:

协调者会维护一个事务的状态机,并调用各个参与者的RPC方法。

// coordinator.go (概念性伪代码)
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "your_module/common"
    // 假设有gRPC客户端 for InventoryService, PaymentService, OrderService
)

type ParticipantClient interface {
    Prepare(ctx context.Context, req common.PrepareRequest) (common.PrepareResponse, error)
    Commit(ctx context.Context, req common.CommitRequest) (common.CommitResponse, error)
}

type Coordinator struct {
    participants map[string]ParticipantClient // 映射服务名到其客户端
    txStates     map[common.TransactionID]string // 存储事务状态:"pending", "prepared", "committed", "rolledback"
}

func NewCoordinator(clients map[string]ParticipantClient) *Coordinator {
    return &Coordinator{
        participants: clients,
        txStates:     make(map[common.TransactionID]string),
    }
}

func (c *Coordinator) StartDistributedTransaction(ctx context.Context, txID common.TransactionID, payload map[string][]byte) error {
    c.txStates[txID] = "pending"
    log.Printf("Coordinator: Starting distributed transaction %s", txID)

    // Phase 1: Prepare
    prepareResponses := make(chan common.PrepareResponse, len(c.participants))
    errs := make(chan error, len(c.participants))
    var allApproved bool = true

    for serviceKey, client := range c.participants {
        go func(sk string, cl ParticipantClient) {
            req := common.PrepareRequest{
                TxID:       txID,
                ServiceKey: sk,
                Payload:    payload[sk], // 每个服务可能需要不同的payload
            }
            resp, err := cl.Prepare(ctx, req)
            if err != nil {
                errs <- fmt.Errorf("participant %s prepare failed: %w", sk, err)
                return
            }
            prepareResponses <- resp
        }(serviceKey, client)
    }

    // Collect responses with a timeout
    prepareCount := 0
    for {
        select {
        case resp := <-prepareResponses:
            prepareCount++
            if !resp.Approved {
                allApproved = false
                log.Printf("Coordinator: Participant %s rejected prepare for TxID %s: %s", resp.ServiceKey, txID, resp.Error)
            }
        case err := <-errs:
            prepareCount++
            allApproved = false
            log.Printf("Coordinator: Participant prepare error for TxID %s: %v", txID, err)
        case <-ctx.Done(): // Context timeout or cancellation
            allApproved = false
            log.Printf("Coordinator: Prepare phase timed out for TxID %s", txID)
        default:
            if prepareCount == len(c.participants) {
                goto END_PREPARE_PHASE
            }
            time.Sleep(10 * time.Millisecond) // Small delay to avoid busy-waiting
        }
    }
END_PREPARE_PHASE:

    // Phase 2: Commit or Rollback
    action := "rollback"
    if allApproved {
        action = "commit"
        c.txStates[txID] = "prepared" // 实际中,这里应该先持久化决定
    } else {
        c.txStates[txID] = "rolledback" // 实际中,这里应该先持久化决定
    }

    log.Printf("Coordinator: Decision for TxID %s is to %s", txID, action)

    commitResponses := make(chan common.CommitResponse, len(c.participants))
    commitErrs := make(chan error, len(c.participants))

    for serviceKey, client := range c.participants {
        go func(sk string, cl ParticipantClient) {
            req := common.CommitRequest{
                TxID:       txID,
                ServiceKey: sk,
                Action:     action,
            }
            resp, err := cl.Commit(ctx, req)
            if err != nil {
                commitErrs <- fmt.Errorf("participant %s commit/rollback failed: %w", sk, err)
                return
            }
            commitResponses <- resp
        }(serviceKey, client)
    }

    // Collect commit/rollback responses
    commitCount := 0
    for {
        select {
        case resp := <-commitResponses:
            commitCount++
            if !resp.Success {
                log.Printf("Coordinator: Participant %s failed to %s for TxID %s: %s", resp.ServiceKey, action, txID, resp.Error)
                // 实际中这里需要更复杂的恢复机制
            }
        case err := <-commitErrs:
            commitCount++
            log.Printf("Coordinator: Participant commit/rollback error for TxID %s: %v", txID, err)
            // 实际中这里需要更复杂的恢复机制
        case <-ctx.Done():
            log.Printf("Coordinator: Commit phase timed out for TxID %s. Potential inconsistency!", txID)
            return fmt.Errorf("commit phase timed out, transaction %s might be inconsistent", txID)
        default:
            if commitCount == len(c.participants) {
                goto END_COMMIT_PHASE
            }
            time.Sleep(10 * time.Millisecond)
        }
    }
END_COMMIT_PHASE:

    if allApproved && action == "commit" {
        c.txStates[txID] = "committed"
        log.Printf("Coordinator: Transaction %s successfully committed.", txID)
        return nil
    } else {
        c.txStates[txID] = "rolledback"
        log.Printf("Coordinator: Transaction %s successfully rolled back (or failed to commit).", txID)
        return fmt.Errorf("transaction %s failed to commit or was rolled back", txID)
    }
}

func main() {
    // 模拟创建参与者客户端
    inventoryClient := &MockParticipantClient{Service: NewInventoryService()}
    // paymentClient := &MockParticipantClient{Service: NewPaymentService()}
    // orderClient := &MockParticipantClient{Service: NewOrderService()}

    participants := map[string]ParticipantClient{
        "inventory": inventoryClient,
        // "payment": paymentClient,
        // "order": orderClient,
    }

    coordinator := NewCoordinator(participants)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 模拟一次成功事务
    txID1 := common.TransactionID("global-tx-001")
    payload1 := map[string][]byte{
        "inventory": []byte(`{"ProductA": 5}`),
        // "payment": []byte(`{"amount": 100}`),
        // "order": []byte(`{"order_id": "ORD-001"}`),
    }
    err := coordinator.StartDistributedTransaction(ctx, txID1, payload1)
    if err != nil {
        fmt.Printf("Transaction %s failed: %vn", txID1, err)
    } else {
        fmt.Printf("Transaction %s completed successfully.n", txID1)
    }
    fmt.Printf("Final state of %s: %sn", txID1, coordinator.txStates[txID1])

    fmt.Println("n---")
    // 模拟一次失败事务 (库存不足)
    txID2 := common.TransactionID("global-tx-002")
    payload2 := map[string][]byte{
        "inventory": []byte(`{"ProductA": 200}`), // 假设会失败
    }
    ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel2()
    err = coordinator.StartDistributedTransaction(ctx2, txID2, payload2)
    if err != nil {
        fmt.Printf("Transaction %s failed: %vn", txID2, err)
    } else {
        fmt.Printf("Transaction %s completed successfully.n", txID2)
    }
    fmt.Printf("Final state of %s: %sn", txID2, coordinator.txStates[txID2])
}

// MockParticipantClient 用于模拟调用实际的InventoryService方法
type MockParticipantClient struct {
    Service *InventoryService // 这里只是模拟,实际会是gRPC客户端
}

func (m *MockParticipantClient) Prepare(ctx context.Context, req common.PrepareRequest) (common.PrepareResponse, error) {
    return m.Service.HandlePrepare(ctx, req), nil
}

func (m *MockParticipantClient) Commit(ctx context.Context, req common.CommitResponse) (common.CommitResponse, error) {
    return m.Service.HandleCommit(ctx, req), nil
}

上述Go语言代码只是概念性的伪代码,用于说明2PC的通信流程和状态管理。在实际生产环境中,由于2PC的固有缺陷,它在跨服务分布式事务场景中几乎不被采用。它更多地存在于单个数据库系统内部的事务管理,或者XA(eXtended Architecture)事务管理器这类专门的中间件中,但即便如此,XA事务也因其性能和可用性问题而鲜少用于现代微服务。

为什么在微服务中很少使用2PC?

总结来说,2PC的同步阻塞特性、单点故障问题以及对性能和可用性的严重影响,与微服务架构追求的弹性、松耦合和高可用性理念格格不入。它将本应独立的微服务紧密地绑定在一起,形成一个巨大的分布式单体,这正是微服务架构试图避免的。因此,我们需要寻找一种更符合微服务特性的分布式事务解决方案,即Sagas模式。

Sagas 模式

Sagas 模式是一种处理分布式事务的策略,它通过一系列本地事务和相应的补偿事务来确保整个业务操作的最终一致性。与2PC追求全局强一致性不同,Sagas模式接受了最终一致性 (Eventual Consistency) 的理念,从而在可用性和分区容错性上取得了优势。

Sagas模式的诞生与核心思想

Sagas模式最初由Hector Garcia-Molina和Kenneth Salem于1987年提出,用于处理长事务。其核心思想是:一个分布式事务被分解成一系列本地事务 (Local Transactions)。每个本地事务都由一个微服务完成,并更新其本地数据库。如果所有本地事务都成功,则整个Saga事务成功。如果某个本地事务失败,Saga会执行一系列补偿事务 (Compensating Transactions) 来撤销之前已成功的本地事务的影响,从而使系统回到一个一致的状态(尽管不是初始状态,而是业务上可接受的补偿状态)。

Sagas的优势:高可用性,松耦合,可伸缩性

Sagas模式的优势使其成为现代微服务架构中处理分布式事务的首选:

  1. 高可用性 (Availability)
    • Saga中的每个本地事务都是独立的,服务可以独立提交其本地事务并释放资源。即使某个服务暂时失败,其他服务也可以继续处理其本地事务,这大大提高了系统的可用性。
  2. 非阻塞性
    • 服务在完成本地事务后立即释放资源,避免了2PC中资源长时间锁定的问题。这提高了系统的并发处理能力和吞吐量。
  3. 松耦合
    • 服务之间通过异步消息或事件进行通信,它们不需要知道彼此的内部实现细节,只需要约定事件或命令的格式。这种松耦合使得服务可以独立开发、部署和扩展。
  4. 可伸缩性
    • 由于服务的独立性和异步通信,Saga模式能够更好地支持系统的水平扩展。
  5. 适用于现代微服务架构
    • 与微服务倡导的自治、弹性、容错等原则高度契合。

Sagas的劣势:最终一致性,复杂性

Sagas模式并非没有缺点,它引入了新的挑战:

  1. 最终一致性 (Eventual Consistency)
    • 这是Saga模式固有的特性。在整个Saga事务完成之前,系统可能会处于一种中间状态,即部分本地事务已提交,而其他事务尚未完成或已失败并等待补偿。这可能导致“读到旧数据”或“脏读”问题,需要前端应用或后续业务流程能够容忍这种短暂的不一致性。例如,用户下单后,库存已扣减但支付尚未成功,此时用户查询订单状态可能会看到“待支付”或“处理中”。
  2. 复杂性:补偿事务的设计、幂等性、超时处理
    • 补偿事务 (Compensating Transactions):设计补偿事务比设计正向事务更复杂。每个服务需要知道如何“撤销”其已提交的本地事务,并且补偿事务本身也必须是幂等的,以防重复执行。
    • 幂等性 (Idempotency):由于消息传递的异步性和“至少一次”的交付保证,服务可能会收到重复的消息。因此,每个处理请求的逻辑都必须是幂等的,即多次执行相同操作产生的结果与执行一次相同。
    • 超时处理:Saga事务可能会持续较长时间。需要有机制来检测和处理超时,并触发补偿流程。
    • 可观测性:追踪Saga事务的整个流程和状态,特别是在发生故障时,需要强大的分布式追踪和日志聚合能力。
  3. 原子性缺失 (业务层面一致性)
    • Sagas不提供ACID意义上的原子性。它提供的是业务层面的原子性,即要么完成业务流程,要么通过补偿回到业务上一致的状态。这种“一致状态”可能不是事务开始前的原始状态,而是通过一系列补偿操作达到的新状态。

Sagas的两种实现方式

Sagas模式主要有两种实现方式:编排式 Saga (Choreography-based Saga)协调器式 Saga (Orchestration-based Saga)

编排式 Saga (Choreography-based Saga)

  • 原理: 服务通过事件发布/订阅机制相互协调。每个服务在完成其本地事务后,会发布一个事件,通知其他相关的服务。其他服务订阅这些事件,并根据事件触发自己的本地事务。这种方式没有中央协调器,而是由服务之间的事件流来驱动整个Saga。
  • Go语言实现示例 (基于消息队列如Kafka)

我们以一个简单的“订单创建”Saga为例:

  1. 订单服务:创建订单,发布 OrderCreatedEvent
  2. 库存服务:订阅 OrderCreatedEvent,扣减库存,发布 InventoryReservedEventInventoryReservationFailedEvent
  3. 支付服务:订阅 InventoryReservedEvent,处理支付,发布 PaymentProcessedEventPaymentFailedEvent
  4. 订单服务:订阅 PaymentProcessedEventPaymentFailedEvent,更新订单状态。如果支付失败,可能还会发布 OrderPaymentFailedEvent
  5. 库存服务:订阅 OrderPaymentFailedEvent,回滚库存预留。

事件定义 (common/events.go):

package common

import "time"

type OrderCreatedEvent struct {
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    TotalPrice float64   `json:"total_price"`
    Timestamp  time.Time `json:"timestamp"`
}

type OrderItem struct {
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
    Price     float64 `json:"price"`
}

type InventoryReservedEvent struct {
    OrderID   string    `json:"order_id"`
    ProductID string    `json:"product_id"`
    Quantity  int       `json:"quantity"`
    Timestamp time.Time `json:"timestamp"`
}

type InventoryReservationFailedEvent struct {
    OrderID   string    `json:"order_id"`
    ProductID string    `json:"product_id"`
    Reason    string    `json:"reason"`
    Timestamp time.Time `json:"timestamp"`
}

type PaymentProcessedEvent struct {
    OrderID   string    `json:"order_id"`
    Amount    float64   `json:"amount"`
    Status    string    `json:"status"` // "success", "failed"
    Timestamp time.Time `json:"timestamp"`
}

type OrderPaymentFailedEvent struct {
    OrderID   string    `json:"order_id"`
    Reason    string    `json:"reason"`
    Timestamp time.Time `json:"timestamp"`
}

// 定义一个通用的消息接口,以便消息队列消费者处理
type Message interface {
    GetEventType() string
    GetOrderID() string
}

func (e OrderCreatedEvent) GetEventType() string { return "OrderCreated" }
func (e OrderCreatedEvent) GetOrderID() string   { return e.OrderID }

func (e InventoryReservedEvent) GetEventType() string { return "InventoryReserved" }
func (e InventoryReservedEvent) GetOrderID() string   { return e.OrderID }

func (e InventoryReservationFailedEvent) GetEventType() string { return "InventoryReservationFailed" }
func (e InventoryReservationFailedEvent) GetOrderID() string   { return e.OrderID }

func (e PaymentProcessedEvent) GetEventType() string { return "PaymentProcessed" }
func (e PaymentProcessedEvent) GetOrderID() string   { return e.OrderID }

func (e OrderPaymentFailedEvent) GetEventType() string { return "OrderPaymentFailed" }
func (e OrderPaymentFailedEvent) GetOrderID() string   { return e.OrderID }

订单服务 (order_service.go):

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/segmentio/kafka-go"
    "your_module/common"
)

type OrderService struct {
    producer *kafka.Writer
    consumer *kafka.Reader
}

func NewOrderService(kafkaBroker string) *OrderService {
    producer := &kafka.Writer{
        Addr:     kafka.TCP(kafkaBroker),
        Topic:    "order-events",
        Balancer: &kafka.LeastBytes{},
    }
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{kafkaBroker},
        Topic:   "payment-events", // 订阅支付事件
        GroupID: "order-service-group",
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
        MaxWait:  1 * time.Second,
    })
    return &OrderService{producer: producer, consumer: consumer}
}

func (s *OrderService) CreateOrder(ctx context.Context, customerID string, items []common.OrderItem) (string, error) {
    orderID := fmt.Sprintf("ORD-%d", rand.Intn(100000))
    totalPrice := 0.0
    for _, item := range items {
        totalPrice += item.Price * float64(item.Quantity)
    }

    event := common.OrderCreatedEvent{
        OrderID:    orderID,
        CustomerID: customerID,
        Items:      items,
        TotalPrice: totalPrice,
        Timestamp:  time.Now(),
    }

    payload, err := json.Marshal(event)
    if err != nil {
        return "", fmt.Errorf("failed to marshal OrderCreatedEvent: %w", err)
    }

    msg := kafka.Message{
        Key:   []byte(orderID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "EventType", Value: []byte(event.GetEventType())},
        },
    }

    err = s.producer.WriteMessages(ctx, msg)
    if err != nil {
        return "", fmt.Errorf("failed to publish OrderCreatedEvent: %w", err)
    }
    log.Printf("OrderService: Published OrderCreatedEvent for OrderID %s", orderID)
    // 实际中,这里会先在数据库中创建订单,状态为“待处理”或“创建中”
    return orderID, nil
}

func (s *OrderService) StartEventConsumer(ctx context.Context) {
    log.Println("OrderService: Starting payment events consumer...")
    for {
        select {
        case <-ctx.Done():
            log.Println("OrderService: Stopping event consumer.")
            s.consumer.Close()
            return
        default:
            m, err := s.consumer.ReadMessage(ctx)
            if err != nil {
                log.Printf("OrderService: Error reading message: %v", err)
                continue
            }
            eventType := string(m.Headers[0].Value) // 假设EventType在第一个Header
            log.Printf("OrderService: Received event %s for OrderID %s", eventType, string(m.Key))

            switch eventType {
            case "PaymentProcessed":
                var event common.PaymentProcessedEvent
                if err := json.Unmarshal(m.Value, &event); err != nil {
                    log.Printf("OrderService: Failed to unmarshal PaymentProcessedEvent: %v", err)
                    continue
                }
                if event.Status == "success" {
                    log.Printf("OrderService: Order %s payment successful. Updating order status to 'Paid'.", event.OrderID)
                    // 实际中,更新数据库中的订单状态
                } else {
                    log.Printf("OrderService: Order %s payment failed. Updating order status to 'PaymentFailed'.", event.OrderID)
                    // 实际中,更新数据库中的订单状态,并可能触发补偿或通知用户
                    s.publishOrderPaymentFailed(ctx, event.OrderID, event.Status)
                }
            case "InventoryReservationFailed": // 如果库存服务失败,也要更新订单状态
                var event common.InventoryReservationFailedEvent
                if err := json.Unmarshal(m.Value, &event); err != nil {
                    log.Printf("OrderService: Failed to unmarshal InventoryReservationFailedEvent: %v", err)
                    continue
                }
                log.Printf("OrderService: Order %s inventory reservation failed. Updating order status to 'InventoryFailed'.", event.OrderID)
                // 实际中,更新数据库中的订单状态,并可能触发补偿或通知用户
            default:
                log.Printf("OrderService: Unknown event type %s", eventType)
            }
        }
    }
}

func (s *OrderService) publishOrderPaymentFailed(ctx context.Context, orderID, reason string) {
    event := common.OrderPaymentFailedEvent{
        OrderID:   orderID,
        Reason:    reason,
        Timestamp: time.Now(),
    }
    payload, err := json.Marshal(event)
    if err != nil {
        log.Printf("OrderService: Failed to marshal OrderPaymentFailedEvent: %v", err)
        return
    }
    msg := kafka.Message{
        Key:   []byte(orderID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "EventType", Value: []byte(event.GetEventType())},
        },
    }
    err = s.producer.WriteMessages(ctx, msg)
    if err != nil {
        log.Printf("OrderService: Failed to publish OrderPaymentFailedEvent: %v", err)
    } else {
        log.Printf("OrderService: Published OrderPaymentFailedEvent for OrderID %s", orderID)
    }
}

func main() {
    kafkaBroker := "localhost:9092" // 假设Kafka运行在本地
    orderService := NewOrderService(kafkaBroker)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go orderService.StartEventConsumer(ctx)

    // 模拟创建订单
    _, err := orderService.CreateOrder(ctx, "customer-123", []common.OrderItem{
        {ProductID: "P1", Quantity: 2, Price: 10.0},
        {ProductID: "P2", Quantity: 1, Price: 25.0},
    })
    if err != nil {
        log.Fatalf("Failed to create order: %v", err)
    }

    time.Sleep(10 * time.Second) // 运行一段时间观察事件流
}

库存服务 (inventory_service.go):

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "your_module/common"
)

type InventoryService struct {
    producer *kafka.Writer
    consumer *kafka.Reader
    stock    map[string]int
    mu       sync.Mutex
}

func NewInventoryService(kafkaBroker string) *InventoryService {
    producer := &kafka.Writer{
        Addr:     kafka.TCP(kafkaBroker),
        Topic:    "inventory-events",
        Balancer: &kafka.LeastBytes{},
    }
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{kafkaBroker},
        Topic:   "order-events", // 订阅订单创建事件
        GroupID: "inventory-service-group",
        MinBytes: 10e3,
        MaxBytes: 10e6,
        MaxWait:  1 * time.Second,
    })
    return &InventoryService{
        producer: producer,
        consumer: consumer,
        stock: map[string]int{
            "P1": 100,
            "P2": 50,
        },
    }
}

func (s *InventoryService) StartEventConsumer(ctx context.Context) {
    log.Println("InventoryService: Starting order events consumer...")
    for {
        select {
        case <-ctx.Done():
            log.Println("InventoryService: Stopping event consumer.")
            s.consumer.Close()
            return
        default:
            m, err := s.consumer.ReadMessage(ctx)
            if err != nil {
                log.Printf("InventoryService: Error reading message: %v", err)
                continue
            }
            eventType := string(m.Headers[0].Value)
            log.Printf("InventoryService: Received event %s for OrderID %s", eventType, string(m.Key))

            switch eventType {
            case "OrderCreated":
                var event common.OrderCreatedEvent
                if err := json.Unmarshal(m.Value, &event); err != nil {
                    log.Printf("InventoryService: Failed to unmarshal OrderCreatedEvent: %v", err)
                    continue
                }
                s.processOrderCreated(ctx, event)
            case "OrderPaymentFailed": // 补偿事务:支付失败时回滚库存
                var event common.OrderPaymentFailedEvent
                if err := json.Unmarshal(m.Value, &event); err != nil {
                    log.Printf("InventoryService: Failed to unmarshal OrderPaymentFailedEvent: %v", err)
                    continue
                }
                s.compensateInventoryReservation(ctx, event.OrderID)
            default:
                log.Printf("InventoryService: Unknown event type %s", eventType)
            }
        }
    }
}

func (s *InventoryService) processOrderCreated(ctx context.Context, orderEvent common.OrderCreatedEvent) {
    s.mu.Lock()
    defer s.mu.Unlock()

    log.Printf("InventoryService: Processing OrderCreatedEvent for OrderID %s", orderEvent.OrderID)

    // 检查库存并预留
    for _, item := range orderEvent.Items {
        if s.stock[item.ProductID] < item.Quantity {
            log.Printf("InventoryService: Not enough stock for %s. Current: %d, Requested: %d", item.ProductID, s.stock[item.ProductID], item.Quantity)
            s.publishInventoryReservationFailed(ctx, orderEvent.OrderID, item.ProductID, "not enough stock")
            return // 任何一个商品库存不足,则整个订单的库存预留失败
        }
    }

    for _, item := range orderEvent.Items {
        s.stock[item.ProductID] -= item.Quantity // 扣减库存
        s.publishInventoryReserved(ctx, orderEvent.OrderID, item.ProductID, item.Quantity)
    }
    log.Printf("InventoryService: Inventory reserved for OrderID %s. Current stock: %v", orderEvent.OrderID, s.stock)
    // 实际中,这里会持久化库存扣减或预留记录,并确保幂等性
}

func (s *InventoryService) compensateInventoryReservation(ctx context.Context, orderID string) {
    s.mu.Lock()
    defer s.mu.Unlock()

    log.Printf("InventoryService: Compensating inventory for OrderID %s due to payment failure.", orderID)
    // 实际中,这里会根据orderID查询之前预留的库存记录,然后加回到库存中
    // 简单示例:假设订单P1扣了2个,P2扣了1个
    s.stock["P1"] += 2
    s.stock["P2"] += 1
    log.Printf("InventoryService: Inventory rolled back for OrderID %s. Current stock: %v", orderID, s.stock)
}

func (s *InventoryService) publishInventoryReserved(ctx context.Context, orderID, productID string, quantity int) {
    event := common.InventoryReservedEvent{
        OrderID:   orderID,
        ProductID: productID,
        Quantity:  quantity,
        Timestamp: time.Now(),
    }
    payload, err := json.Marshal(event)
    if err != nil {
        log.Printf("InventoryService: Failed to marshal InventoryReservedEvent: %v", err)
        return
    }
    msg := kafka.Message{
        Key:   []byte(orderID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "EventType", Value: []byte(event.GetEventType())},
        },
    }
    err = s.producer.WriteMessages(ctx, msg)
    if err != nil {
        log.Printf("InventoryService: Failed to publish InventoryReservedEvent: %v", err)
    } else {
        log.Printf("InventoryService: Published InventoryReservedEvent for OrderID %s, Product %s", orderID, productID)
    }
}

func (s *InventoryService) publishInventoryReservationFailed(ctx context.Context, orderID, productID, reason string) {
    event := common.InventoryReservationFailedEvent{
        OrderID:   orderID,
        ProductID: productID,
        Reason:    reason,
        Timestamp: time.Now(),
    }
    payload, err := json.Marshal(event)
    if err != nil {
        log.Printf("InventoryService: Failed to marshal InventoryReservationFailedEvent: %v", err)
        return
    }
    msg := kafka.Message{
        Key:   []byte(orderID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "EventType", Value: []byte(event.GetEventType())},
        },
    }
    err = s.producer.WriteMessages(ctx, msg)
    if err != nil {
        log.Printf("InventoryService: Failed to publish InventoryReservationFailedEvent: %v", err)
    } else {
        log.Printf("InventoryService: Published InventoryReservationFailedEvent for OrderID %s, Product %s", orderID, productID)
    }
}

func main() {
    kafkaBroker := "localhost:9092"
    inventoryService := NewInventoryService(kafkaBroker)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go inventoryService.StartEventConsumer(ctx)

    time.Sleep(15 * time.Second) // 运行一段时间观察事件流
}

优缺点分析 (编排式):

  • 优点:
    • 高度解耦: 服务之间完全通过事件通信,没有中央协调器,降低了单点故障风险。
    • 简单性 (对于小型Saga): 对于只有少数步骤且业务逻辑不复杂的Saga,编排式实现相对简单直观。
  • 缺点:
    • 流程难以追踪: 随着Saga步骤的增加和参与者服务的增多,事件流会变得复杂,难以直观地理解整个业务流程的状态和走向。
    • 服务职责不清晰: 每个服务都需要知道它应该在什么事件之后做什么,以及在什么事件之后进行补偿。这使得服务逻辑变得耦合,并且难以修改。
    • 循环依赖风险: 不恰当的事件设计可能导致服务之间出现循环依赖。
    • 补偿逻辑分散: 补偿逻辑散布在各个服务中,难以统一管理和调试。

协调器式 Saga (Orchestration-based Saga)

  • 原理: 引入一个专门的Saga协调器(Orchestrator)服务,它负责管理整个分布式事务的流程。协调器接收初始请求,然后通过发送命令(Command)给参与者服务,并等待它们的回复(Reply)来驱动Saga的执行。协调器内部维护一个状态机来追踪Saga的当前状态,并在需要时触发补偿事务。
  • Go语言实现示例 (Saga Orchestrator)

继续以上面的订单创建Saga为例。现在我们将引入一个 OrderSagaOrchestrator 服务。

命令与回复定义 (common/commands.go):

package common

import "time"

// Commands from Orchestrator to Participants
type CreateOrderCommand struct {
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    CustomerID string        `json:"customer_id"`
    Items      []OrderItem   `json:"items"`
    TotalPrice float64       `json:"total_price"`
}

type ReserveInventoryCommand struct {
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Items      []OrderItem   `json:"items"`
}

type ProcessPaymentCommand struct {
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Amount     float64       `json:"amount"`
    CustomerID string        `json:"customer_id"`
}

type UpdateOrderStatusCommand struct { // 用于订单服务更新自身状态
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Status     string        `json:"status"` // e.g., "PENDING", "COMPLETED", "FAILED"
}

// Compensating Commands
type CompensateInventoryCommand struct {
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Items      []OrderItem   `json:"items"` // 需要知道具体回滚哪个订单的哪些商品
}

type RefundPaymentCommand struct { // 如果支付成功但后续失败
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Amount     float64       `json:"amount"`
}

// Replies from Participants to Orchestrator
type CommandReply struct {
    TxID       TransactionID `json:"tx_id"`
    OrderID    string        `json:"order_id"`
    Success    bool          `json:"success"`
    Message    string        `json:"message"`
    StepName   string        `json:"step_name"` // 用于协调器识别是哪个步骤的回复
    Timestamp  time.Time     `json:"timestamp"`
}

Saga协调器 (orchestrator_service.go):

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
    "your_module/common"
)

// SagaState 定义Saga的各个状态
const (
    SagaState_Pending      = "PENDING"
    SagaState_OrderCreated = "ORDER_CREATED"
    SagaState_InventoryReserved = "INVENTORY_RESERVED"
    SagaState_PaymentProcessed  = "PAYMENT_PROCESSED"
    SagaState_Completed    = "COMPLETED"
    SagaState_Failed       = "FAILED"
    SagaState_Compensating = "COMPENSATING"
)

type SagaInstance struct {
    TxID       common.TransactionID
    OrderID    string
    CustomerID string
    Items      []common.OrderItem
    TotalPrice float64
    CurrentState string
    // 存储每个步骤成功时需要的回滚信息
    InventoryReservationDetails map[string]int // productID -> quantity
    PaymentDetails              struct {
        Amount float64
        Status string
    }
    CreatedAt time.Time
    UpdatedAt time.Time
}

type OrderSagaOrchestrator struct {
    commandProducer *kafka.Writer // 发送命令给参与者
    replyConsumer   *kafka.Reader // 接收参与者的回复

    sagas       map[common.TransactionID]*SagaInstance // 存储所有Saga实例的状态
    mu          sync.Mutex
    kafkaBroker string
}

func NewOrderSagaOrchestrator(kafkaBroker string) *OrderSagaOrchestrator {
    commandProducer := &kafka.Writer{
        Addr:     kafka.TCP(kafkaBroker),
        Topic:    "saga-commands", // 所有命令发往这个topic
        Balancer: &kafka.LeastBytes{},
    }
    replyConsumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{kafkaBroker},
        Topic:   "saga-replies", // 接收所有回复
        GroupID: "orchestrator-group",
        MinBytes: 10e3,
        MaxBytes: 10e6,
        MaxWait:  1 * time.Second,
    })
    return &OrderSagaOrchestrator{
        commandProducer: commandProducer,
        replyConsumer:   replyConsumer,
        sagas:       make(map[common.TransactionID]*SagaInstance),
        kafkaBroker: kafkaBroker,
    }
}

// StartSaga 启动一个新的Saga事务
func (o *OrderSagaOrchestrator) StartSaga(ctx context.Context, customerID string, items []common.OrderItem) (string, error) {
    txID := common.TransactionID(fmt.Sprintf("SAGA-%d", rand.Intn(100000)))
    orderID := fmt.Sprintf("ORD-%d", rand.Intn(100000))
    totalPrice := 0.0
    for _, item := range items {
        totalPrice += item.Price * float64(item.Quantity)
    }

    saga := &SagaInstance{
        TxID:       txID,
        OrderID:    orderID,
        CustomerID: customerID,
        Items:      items,
        TotalPrice: totalPrice,
        CurrentState: SagaState_Pending,
        CreatedAt: time.Now(),
        UpdatedAt: time.Now(),
        InventoryReservationDetails: make(map[string]int),
    }

    o.mu.Lock()
    o.sagas[txID] = saga
    o.mu.Unlock()

    log.Printf("Orchestrator: Started Saga %s for Order %s", txID, orderID)

    // Step 1: Create Order in Order Service
    cmd := common.CreateOrderCommand{
        TxID:       txID,
        OrderID:    orderID,
        CustomerID: customerID,
        Items:      items,
        TotalPrice: totalPrice,
    }
    err := o.sendCommand(ctx, "order-service", "CreateOrder", cmd)
    if err != nil {
        log.Printf("Orchestrator: Failed to send CreateOrderCommand for TxID %s: %v", txID, err)
        o.failSaga(txID, "Failed to send initial order command")
        return "", err
    }
    return orderID, nil
}

// StartReplyConsumer 启动回复消息的消费者
func (o *OrderSagaOrchestrator) StartReplyConsumer(ctx context.Context) {
    log.Println("Orchestrator: Starting reply consumer...")
    for {
        select {
        case <-ctx.Done():
            log.Println("Orchestrator: Stopping reply consumer.")
            o.replyConsumer.Close()
            return
        default:
            m, err := o.replyConsumer.ReadMessage(ctx)
            if err != nil {
                log.Printf("Orchestrator: Error reading reply message: %v", err)
                continue
            }
            var reply common.CommandReply
            if err := json.Unmarshal(m.Value, &reply); err != nil {
                log.Printf("Orchestrator: Failed to unmarshal CommandReply: %v", err)
                continue
            }
            log.Printf("Orchestrator: Received reply for TxID %s, Step: %s, Success: %t", reply.TxID, reply.StepName, reply.Success)
            o.handleReply(ctx, reply)
        }
    }
}

// handleReply 处理来自参与者的回复
func (o *OrderSagaOrchestrator) handleReply(ctx context.Context, reply common.CommandReply) {
    o.mu.Lock()
    defer o.mu.Unlock()

    saga, exists := o.sagas[reply.TxID]
    if !exists {
        log.Printf("Orchestrator: Received reply for unknown Saga TxID %s", reply.TxID)
        return
    }

    // 幂等性检查:如果Saga已完成或失败,则忽略重复回复
    if saga.CurrentState == SagaState_Completed || saga.CurrentState == SagaState_Failed || saga.CurrentState == SagaState_Compensating {
        log.Printf("Orchestrator: Saga %s already in state %s, ignoring reply for step %s", saga.TxID, saga.CurrentState, reply.StepName)
        return
    }

    saga.UpdatedAt = time.Now()

    if !reply.Success {
        log.Printf("Orchestrator: Step %s failed for Saga %s: %s. Initiating compensation.", reply.StepName, saga.TxID, reply.Message)
        o.startCompensation(ctx, saga, reply.StepName)
        return
    }

    // 状态机流转
    switch reply.StepName {
    case "CreateOrder":
        if saga.CurrentState == SagaState_Pending {
            saga.CurrentState = SagaState_OrderCreated
            log.Printf("Orchestrator: Saga %s state changed to %s. Sending ReserveInventoryCommand.", saga.TxID, saga.CurrentState)
            cmd := common.ReserveInventoryCommand{
                TxID:    saga.TxID,
                OrderID: saga.OrderID,
                Items:   saga.Items,
            }
            o.sendCommand(ctx, "inventory-service", "ReserveInventory", cmd)
            // 在这里保存库存预留的详细信息,以便后续补偿
            for _, item := range saga.Items {
                saga.InventoryReservationDetails[item.ProductID] = item.Quantity
            }
        }
    case "ReserveInventory":
        if saga.CurrentState == SagaState_OrderCreated {
            saga.CurrentState = SagaState_InventoryReserved
            log.Printf("Orchestrator: Saga %s state changed to %s. Sending ProcessPaymentCommand.", saga.TxID, saga.CurrentState)
            cmd := common.ProcessPaymentCommand{
                TxID:       saga.TxID,
                OrderID:    saga.OrderID,
                Amount:     saga.TotalPrice,
                CustomerID: saga.CustomerID,
            }
            o.sendCommand(ctx, "payment-service", "ProcessPayment", cmd)
        }
    case "ProcessPayment":
        if saga.CurrentState == SagaState_InventoryReserved {
            saga.CurrentState = SagaState_PaymentProcessed
            saga.PaymentDetails.Amount = saga.TotalPrice
            saga.PaymentDetails.Status = "success" // 假设回复成功则支付成功
            log.Printf("Orchestrator: Saga %s state changed to %s. Sending UpdateOrderStatusCommand (final).", saga.TxID, saga.CurrentState)
            cmd := common.UpdateOrderStatusCommand{
                TxID:    saga.TxID,
                OrderID: saga.OrderID,
                Status:  "COMPLETED",
            }
            o.sendCommand(ctx, "order-service", "UpdateOrderStatus", cmd)
            saga.CurrentState = SagaState_Completed // Saga完成
            log.Printf("Orchestrator: Saga %s completed successfully!", saga.TxID)
        }
    case "UpdateOrderStatus": // 最终状态更新,表示整个Saga成功结束
        if saga.CurrentState == SagaState_PaymentProcessed {
            // 理论上这里Saga已经Completed了,这个回复只是确认
            log.Printf("Orchestrator: Order status updated for Saga %s. Confirmed completed.", saga.TxID)
        }
    default:
        log.Printf("Orchestrator: Unknown step name in reply: %s for Saga %s", reply.StepName, saga.TxID)
    }
    // 实际中,这里需要持久化Saga实例的状态
}

// startCompensation 启动补偿流程
func (o *OrderSagaOrchestrator) startCompensation(ctx context.Context, saga *SagaInstance, failedStep string) {
    log.Printf("Orchestrator: Starting compensation for Saga %s, failed at step %s", saga.TxID, failedStep)
    saga.CurrentState = SagaState_Compensating
    // 实际中,这里需要持久化Saga实例的状态

    // 根据失败的步骤,逆序执行补偿操作
    switch failedStep {
    case "ProcessPayment":
        // 如果支付失败,但之前库存已预留,需要回滚库存
        if saga.InventoryReservationDetails != nil && len(saga.InventoryReservationDetails) > 0 {
            cmd := common.CompensateInventoryCommand{
                TxID:    saga.TxID,
                OrderID: saga.OrderID,
                Items:   saga.Items, // 使用原始订单项目来计算回滚
            }
            o.sendCommand(ctx, "inventory-service", "CompensateInventory", cmd)
        }
        // 同时更新订单服务状态为失败
        updateOrderCmd := common.UpdateOrderStatusCommand{
            TxID:    saga.TxID,
            OrderID: saga.OrderID,
            Status:  "PAYMENT_FAILED",
        }
        o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
    case "ReserveInventory":
        // 如果库存预留失败,只需更新订单服务状态为失败
        updateOrderCmd := common.UpdateOrderStatusCommand{
            TxID:    saga.TxID,
            OrderID: saga.OrderID,
            Status:  "INVENTORY_FAILED",
        }
        o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
    case "CreateOrder":
        // 如果订单创建失败,没有需要补偿的下游服务
        updateOrderCmd := common.UpdateOrderStatusCommand{
            TxID:    saga.TxID,
            OrderID: saga.OrderID,
            Status:  "ORDER_CREATION_FAILED",
        }
        o.sendCommand(ctx, "order-service", "UpdateOrderStatus", updateOrderCmd)
    default:
        log.Printf("Orchestrator: No compensation defined for failed step %s", failedStep)
    }

    saga.CurrentState = SagaState_Failed // 补偿完成后,Saga最终状态为失败
    log.Printf("Orchestrator: Saga %s compensation completed. Final state: %s", saga.TxID, saga.CurrentState)
    // 实际中,这里需要持久化Saga实例的最终状态
}

// sendCommand 封装发送命令的逻辑
func (o *OrderSagaOrchestrator) sendCommand(ctx context.Context, targetService, stepName string, command interface{}) error {
    payload, err := json.Marshal(command)
    if err != nil {
        return fmt.Errorf("failed to marshal command %s: %w", stepName, err)
    }
    msg := kafka.Message{
        Key:   []byte(command.(interface { GetTxID() common.TransactionID }).GetTxID()), // 假设所有命令都有GetTxID方法
        Value: payload,
        Headers: []kafka.Header{
            {Key: "CommandType", Value: []byte(stepName)},
            {Key: "TargetService", Value: []byte(targetService)}, // 告知哪个服务应该处理
        },
    }
    err = o.commandProducer.WriteMessages(ctx, msg)
    if err != nil {
        return fmt.Errorf("failed to publish command %s: %w", stepName, err)
    }
    log.Printf("Orchestrator: Sent command %s for TxID %s to %s", stepName, command.(interface { GetTxID() common.TransactionID }).GetTxID(), targetService)
    return nil
}

func main() {
    kafkaBroker := "localhost:9092"
    orchestrator := NewOrderSagaOrchestrator(kafkaBroker)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go orchestrator.StartReplyConsumer(ctx)

    // 模拟启动一个Saga
    orderID, err := orchestrator.StartSaga(ctx, "customer-123", []common.OrderItem{
        {ProductID: "P1", Quantity: 2, Price: 10.0},
        {ProductID: "P2", Quantity: 1, Price: 25.0},
    })
    if err != nil {
        log.Fatalf("Failed to start saga: %v", err)
    }
    fmt.Printf("Initial order ID: %sn", orderID)

    time.Sleep(20 * time.Second) // 运行一段时间观察事件流和Saga状态
}

参与者服务(如库存服务)处理命令和发送回复的伪代码:

// inventory_participant_orchestrated.go (部分代码,结合之前的InventoryService)
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
    "your_module/common"
)

type InventoryServiceOrchestrated struct {
    commandConsumer *kafka.Reader // 接收协调器命令
    replyProducer   *kafka.Writer // 发送回复给协调器
    stock           map[string]int
    mu              sync.Mutex
}

func NewInventoryServiceOrchestrated(kafkaBroker string) *InventoryServiceOrchestrated {
    commandConsumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{kafkaBroker},
        Topic:   "saga-commands",
        GroupID: "inventory-service-orchestrated-group",
        MinBytes: 10e3,
        MaxBytes: 10e6,
        MaxWait:  1 * time.Second,
    })
    replyProducer := &kafka.Writer{
        Addr:     kafka.TCP(kafkaBroker),
        Topic:    "saga-replies",
        Balancer: &kafka.LeastBytes{},
    }
    return &InventoryServiceOrchestrated{
        commandConsumer: commandConsumer,
        replyProducer:   replyProducer,
        stock: map[string]int{
            "P1": 100,
            "P2": 50,
        },
    }
}

func (s *InventoryServiceOrchestrated) StartCommandConsumer(ctx context.Context) {
    log.Println("InventoryServiceOrchestrated: Starting command consumer...")
    for {
        select {
        case <-ctx.Done():
            log.Println("InventoryServiceOrchestrated: Stopping command consumer.")
            s.commandConsumer.Close()
            return
        default:
            m, err := s.commandConsumer.ReadMessage(ctx)
            if err != nil {
                log.Printf("InventoryServiceOrchestrated: Error reading command message: %v", err)
                continue
            }
            commandType := string(m.Headers[0].Value) // CommandType
            targetService := string(m.Headers[1].Value) // TargetService

            if targetService != "inventory-service" { // 只有是发给自己的命令才处理
                continue
            }

            log.Printf("InventoryServiceOrchestrated: Received command %s for TxID %s", commandType, string(m.Key))

            var reply common.CommandReply
            reply.TxID = common.TransactionID(m.Key)
            reply.OrderID = string(m.Key) // 假设Key就是OrderID
            reply.StepName = commandType
            reply.Timestamp = time.Now()

            switch commandType {
            case "ReserveInventory":
                var cmd common.ReserveInventoryCommand
                if err := json.Unmarshal(m.Value, &cmd); err != nil {
                    log.Printf("InventoryServiceOrchestrated: Failed to unmarshal ReserveInventoryCommand: %v", err)
                    reply.Success = false
                    reply.Message = "invalid command payload"
                } else {
                    reply.OrderID = cmd.OrderID
                    ok, msg := s.handleReserveInventory(ctx, cmd)
                    reply.Success = ok
                    reply.Message = msg
                }
            case "CompensateInventory":
                var cmd common.CompensateInventoryCommand
                if err := json.Unmarshal(m.Value, &cmd); err != nil {
                    log.Printf("InventoryServiceOrchestrated: Failed to unmarshal CompensateInventoryCommand: %v", err)
                    reply.Success = false
                    reply.Message = "invalid command payload"
                } else {
                    reply.OrderID = cmd.OrderID
                    ok, msg := s.handleCompensateInventory(ctx, cmd)
                    reply.Success = ok
                    reply.Message = msg
                }
            default:
                log.Printf("InventoryServiceOrchestrated: Unknown command type %s", commandType)
                reply.Success = false
                reply.Message = "unknown command type"
            }
            s.sendReply(ctx, reply)
        }
    }
}

func (s *InventoryServiceOrchestrated) handleReserveInventory(ctx context.Context, cmd common.ReserveInventoryCommand) (bool, string) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 幂等性检查:这里需要更精细的逻辑,例如查询数据库中是否已有该TxID的预留记录
    // 简单示例:假设未处理过
    log.Printf("InventoryServiceOrchestrated: Processing ReserveInventory for OrderID %s", cmd.OrderID)
    for _, item := range cmd.Items {
        if s.stock[item.ProductID] < item.Quantity {
            return false, fmt.Sprintf("not enough stock for %s", item.ProductID)
        }
    }
    for _, item := range cmd.Items {
        s.stock[item.ProductID] -= item.Quantity
    }
    log.Printf("InventoryServiceOrchestrated: Inventory reserved for OrderID %s. Current stock: %v", cmd.OrderID, s.stock)
    return true, "inventory reserved"
}

func (s *InventoryServiceOrchestrated) handleCompensateInventory(ctx context.Context, cmd common.CompensateInventoryCommand) (bool, string) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 幂等性检查:查询数据库中是否已回滚或未曾预留
    log.Printf("InventoryServiceOrchestrated: Processing CompensateInventory for OrderID %s", cmd.OrderID)
    for _, item := range cmd.Items {
        s.stock[item.ProductID] += item.Quantity
    }
    log.Printf("InventoryServiceOrchestrated: Inventory compensated for OrderID %s. Current stock: %v", cmd.OrderID, s.stock)
    return true, "inventory compensated"
}

func (s *InventoryServiceOrchestrated) sendReply(ctx context.Context, reply common.CommandReply) {
    payload, err := json.Marshal(reply)
    if err != nil {
        log.Printf("InventoryServiceOrchestrated: Failed to marshal CommandReply: %v", err)
        return
    }
    msg := kafka.Message{
        Key:   []byte(reply.TxID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "StepName", Value: []byte(reply.StepName)},
        },
    }
    err = s.replyProducer.WriteMessages(ctx, msg)
    if err != nil {
        log.Printf("InventoryServiceOrchestrated: Failed to publish CommandReply: %v", err)
    } else {
        log.Printf("InventoryServiceOrchestrated: Sent reply for TxID %s, Step %s", reply.TxID, reply.StepName)
    }
}

func main() {
    kafkaBroker := "localhost:9092"
    inventoryService := NewInventoryServiceOrchestrated(kafkaBroker)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go inventoryService.StartCommandConsumer(ctx)

    time.Sleep(25 * time.Second)
}

优缺点分析 (协调器式):

  • 优点:
    • 集中式流程管理: Saga的业务逻辑和流程清晰地定义在协调器中,易于理解、维护和修改。
    • 易于追踪: 协调器作为中央控制点,可以轻松追踪Saga的当前状态,并提供更好的可观测性。
    • 简化服务逻辑: 参与者服务只需关注自身本地事务的执行和补偿逻辑,无需了解整个Saga的流程。
    • 更好的故障处理: 协调器可以更容易地实现重试、超时处理和死信队列等机制。
  • 缺点:
    • 协调器成为单点故障风险: 如果协调器发生故障,整个Saga可能会停滞。需要对协调器进行高可用性设计(例如,通过集群、持久化Saga状态等)。
    • 可能引入性能瓶颈: 如果协调器需要处理大量Saga事务,可能会成为性能瓶颈。
    • 中心化控制: 与微服务去中心化的思想略有冲突,但这种中心化是业务流程层面的,而非数据存储层面的。

Sagas中的关键挑战与Go语言实践

无论选择编排式还是协调器式Saga,以下几个关键挑战是必须面对的,Go语言的特性为解决这些挑战提供了便利。

幂等性 (Idempotency)

  • 为什么重要: 在分布式系统中,消息队列通常提供“至少一次”的交付保证。这意味着消息可能会被重复发送和处理。如果一个服务多次处理同一个“扣减库存”请求,就会导致数据不一致。因此,所有Saga参与者的本地事务必须是幂等的。
  • Go语言中的实现策略:

    • 唯一事务ID: 每个Saga事务都应有一个全局唯一的TxID。参与者服务在处理命令或事件时,首先检查该TxID是否已经被处理过。
    • 数据库检查: 在执行本地事务之前,服务应在本地数据库中查询是否已经存在与该TxID相关的事务记录。

      // 伪代码:库存服务处理库存预留的幂等性检查
      func (s *InventoryServiceOrchestrated) handleReserveInventory(ctx context.Context, cmd common.ReserveInventoryCommand) (bool, string) {
          s.mu.Lock()
          defer s.mu.Unlock()
      
          // 1. 检查是否已经处理过这个TxID的库存预留
          // 假设我们有一个transactions表来记录已处理的分布式事务ID
          // row := s.db.QueryRow("SELECT status FROM saga_transactions WHERE tx_id = ?", cmd.TxID)
          // if row.Scan(&status); err == nil && status == "completed" {
          //     log.Printf("InventoryService: TxID %s already processed, returning success.", cmd.TxID)
          //     return true, "already processed"
          // }
      
          // 2. 执行业务逻辑
          // ... 扣减库存 ...
      
          // 3. 记录TxID为已处理 (在同一个本地事务中)
          // tx, _ := s.db.Begin()
          // tx.Exec("UPDATE inventory SET quantity = quantity - ? WHERE product_id = ?", item.Quantity, item.ProductID)
          // tx.Exec("INSERT INTO saga_transactions (tx_id, status) VALUES (?, 'completed')", cmd.TxID)
          // tx.Commit()
      
          return true, "inventory reserved"
      }
    • 版本控制 / 乐观锁: 对于需要更新的资源,可以使用版本号或时间戳进行乐观锁控制,确保不会重复应用旧

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注