深入 ‘Two-Phase Commit (2PC) vs. TCC’:在 Go 微服务架构中处理跨库事务的最终一致性方案

各位同仁,下午好!

今天,我们将深入探讨在 Go 微服务架构中处理跨库事务的最终一致性方案。随着业务复杂度的提升和系统规模的膨胀,单一数据库已无法满足所有需求,分布式系统成为主流。然而,分布式系统也带来了新的挑战,其中最棘手的问题之一就是如何保证跨多个服务、多个数据库操作的数据一致性。

在单体应用中,我们习惯于ACID事务的强大保障:原子性、一致性、隔离性和持久性。但当业务拆分成微服务,每个服务拥有自己的数据库时,传统的本地事务就无法跨越服务边界。这就引出了分布式事务的概念。

在分布式事务领域,强一致性和最终一致性是两种主要目标。强一致性意味着所有参与者在事务结束时都达到相同的状态,并且在任何时刻查询都能看到这个最新状态。而最终一致性则允许系统在一段时间内处于不一致状态,但最终会达到一致。在微服务架构中,为了追求高可用性、可伸缩性和性能,我们通常会倾向于采用最终一致性方案。

今天,我们将聚焦于两种实现最终一致性的经典模式:两阶段提交(Two-Phase Commit, 2PC)和事务补偿提交(Transactional Compensating Commit, TCC)。我们将深入剖析它们的原理、优缺点,并通过 Go 语言代码示例来模拟其实现细节,帮助大家理解如何在实际项目中做出权衡和选择。


分布式事务的挑战与最终一致性的需求

在讲解具体方案之前,我们首先要理解为什么分布式事务如此复杂。CAP 定理告诉我们,在一个分布式系统中,我们不可能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition Tolerance)这三者。在微服务架构中,分区容错性是不可避免的,这意味着我们必须在一致性和可用性之间做出选择。

对于大多数业务场景,尤其是涉及用户体验和高并发的场景,可用性往往比强一致性更为重要。短暂的数据不一致可以接受,但服务长时间不可用是灾难性的。因此,最终一致性成为了分布式事务的首选目标。

最终一致性意味着:

  1. 高可用性: 即使部分服务失败,系统仍能对外提供服务。
  2. 高性能: 避免长时间的资源锁定,提高并发吞吐量。
  3. 松耦合: 服务间通过异步消息或轻量级协议协作,降低依赖。

现在,让我们来看看两种主要的实现模式。


两阶段提交(Two-Phase Commit, 2PC)

两阶段提交(2PC)是一种经典的分布式事务协议,旨在保证分布式系统中的所有节点要么全部提交事务,要么全部回滚事务,从而实现强一致性。尽管它在微服务架构中因其缺点而较少直接使用,但理解其原理对于理解分布式事务的挑战和后续的解决方案至关重要。

2PC 的基本原理

2PC 协议涉及一个事务协调者(Transaction Coordinator) 和多个事务参与者(Transaction Participants)

整个过程分为两个阶段:

阶段一:提交请求阶段(Prepare Phase)

  1. 协调者发送准备请求: 协调者向所有参与者发送 prepare 消息,询问它们是否可以执行事务。
  2. 参与者执行事务并响应:
    • 每个参与者接收到 prepare 请求后,会在本地执行事务操作,但不提交。
    • 它会将事务涉及的资源锁定,并记录事务日志(undo/redo logs)。
    • 如果参与者能够成功执行事务(即预留资源、通过业务校验),它会向协调者发送 yes 响应。
    • 如果参与者无法执行事务(如资源不足、业务校验失败),它会向协调者发送 no 响应。

阶段二:事务提交阶段(Commit Phase)

协调者根据所有参与者的响应做出决定。

  1. 所有参与者都响应 yes
    • 协调者向所有参与者发送 commit 消息。
    • 参与者收到 commit 消息后,提交本地事务,并释放所有事务锁定的资源,然后向协调者发送 ack 消息。
  2. 任何参与者响应 no 或超时:
    • 协调者向所有参与者发送 rollback 消息。
    • 参与者收到 rollback 消息后,回滚本地事务,释放所有事务锁定的资源,然后向协调者发送 ack 消息。

流程图示:

角色 阶段一:准备 阶段二:提交(全部成功) 阶段二:回滚(任一失败)
协调者 1. 发送 Prepare 1. 发送 Commit 1. 发送 Rollback
参与者A 2. 执行事务,锁定资源,写日志,响应 Yes/No 2. 提交事务,释放资源,响应 Ack 2. 回滚事务,释放资源,响应 Ack
参与者B 2. 执行事务,锁定资源,写日志,响应 Yes/No 2. 提交事务,释放资源,响应 Ack 2. 回滚事务,释放资源,响应 Ack

2PC 的优缺点

优点:

  • 强一致性: 在正常情况下,2PC 能够保证所有参与者的数据最终保持一致,满足 ACID 事务的原子性。
  • 简单易懂: 协议逻辑相对直接。

缺点:

  • 同步阻塞: 协调者和参与者在整个事务过程中需要保持同步阻塞,等待彼此的响应。这导致了资源长时间锁定,降低了系统的并发性能和吞吐量。
  • 单点故障: 协调者是整个事务的中心,如果协调者在第二阶段发生故障,那么所有参与者都将处于阻塞状态,直到协调者恢复。这被称为“协调者宕机”问题。
  • 数据不一致风险:
    • 在协调者发出 commit 消息后,如果部分参与者收到消息并提交,而另一些参与者未收到消息(例如网络故障或参与者宕机),那么系统就会出现数据不一致。
    • 即使协调者在发出 rollback 消息后也可能出现类似问题。
  • 性能瓶颈: 跨网络通信和资源锁定带来了显著的延迟,尤其是在微服务数量增多时。
  • 难以实现: 在复杂的微服务架构中,要实现一个健壮、容错且高性能的 2PC 协议是非常困难的,通常需要借助专门的分布式事务管理器(如 XA 事务),但这又带来了更强的耦合和部署复杂性。

Go 语言中 2PC 的概念模拟

在 Go 微服务架构中,我们通常不会直接实现一个生产级的 2PC 协议,因为它与微服务的去中心化、高可用设计理念相悖。但我们可以通过一个简化的模型来模拟其核心思想,以理解其工作机制和潜在问题。

假设我们有一个订单服务、一个库存服务和一个支付服务,我们需要在它们之间进行一个跨服务的购买操作。

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 定义事务参与者接口
type Participant interface {
    Prepare(ctx context.Context, transactionID string) error
    Commit(ctx context.Context, transactionID string) error
    Rollback(ctx context.Context, transactionID string) error
    GetName() string
}

// 订单服务模拟
type OrderService struct {
    orders map[string]string // 模拟订单数据
    mu     sync.Mutex
}

func NewOrderService() *OrderService {
    return &OrderService{
        orders: make(map[string]string),
    }
}

func (s *OrderService) GetName() string { return "OrderService" }

func (s *OrderService) Prepare(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 模拟订单预创建逻辑,检查用户余额、商品信息等
    // 实际中可能插入一个'pending'状态的订单记录
    fmt.Printf("[%s] %s: Preparing order for transaction %sn", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    // 模拟随机失败
    // if transactionID == "fail_order_prepare" {
    //  return fmt.Errorf("order service prepare failed for %s", transactionID)
    // }
    s.orders[transactionID] = "pending" // 预留订单
    fmt.Printf("[%s] %s: Order %s prepared (status: %s)n", time.Now().Format("15:04:05"), s.GetName(), transactionID, s.orders[transactionID])
    return nil
}

func (s *OrderService) Commit(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.orders[transactionID] == "pending" {
        s.orders[transactionID] = "completed"
        fmt.Printf("[%s] %s: Order %s committed (status: %s)n", time.Now().Format("15:04:05"), s.GetName(), transactionID, s.orders[transactionID])
    } else {
        fmt.Printf("[%s] %s: Order %s not in pending state, no commit needed.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    }
    return nil
}

func (s *OrderService) Rollback(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.orders[transactionID] == "pending" {
        delete(s.orders, transactionID) // 删除预留订单
        fmt.Printf("[%s] %s: Order %s rolled back (deleted)n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    } else {
        fmt.Printf("[%s] %s: Order %s not in pending state, no rollback needed.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    }
    return nil
}

// 库存服务模拟
type InventoryService struct {
    stock map[string]int // 模拟商品库存
    mu    sync.Mutex
}

func NewInventoryService() *InventoryService {
    return &InventoryService{
        stock: map[string]int{"itemA": 100},
    }
}

func (s *InventoryService) GetName() string { return "InventoryService" }

func (s *InventoryService) Prepare(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 模拟库存预留逻辑
    fmt.Printf("[%s] %s: Preparing stock for transaction %sn", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    // 假设预留1个itemA
    if s.stock["itemA"] < 1 {
        return fmt.Errorf("inventory service prepare failed for %s: itemA out of stock", transactionID)
    }
    s.stock["itemA"]-- // 预扣库存
    fmt.Printf("[%s] %s: Stock for itemA prepared (remaining: %d)n", time.Now().Format("15:04:05"), s.GetName(), s.stock["itemA"])
    return nil
}

func (s *InventoryService) Commit(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 库存已经预扣,无需额外操作,这里可以记录一个已提交的日志
    fmt.Printf("[%s] %s: Stock for transaction %s committed.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    return nil
}

func (s *InventoryService) Rollback(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 归还预扣库存
    s.stock["itemA"]++
    fmt.Printf("[%s] %s: Stock for transaction %s rolled back (itemA returned, total: %d)n", time.Now().Format("15:04:05"), s.GetName(), s.stock["itemA"])
    return nil
}

// 支付服务模拟
type PaymentService struct {
    accounts map[string]float64 // 模拟账户余额
    mu       sync.Mutex
}

func NewPaymentService() *PaymentService {
    return &PaymentService{
        accounts: map[string]float64{"user123": 1000.00},
    }
}

func (s *PaymentService) GetName() string { return "PaymentService" }

func (s *PaymentService) Prepare(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 模拟支付预授权逻辑
    fmt.Printf("[%s] %s: Preparing payment for transaction %sn", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    // 假设扣款100
    if s.accounts["user123"] < 100.00 {
        return fmt.Errorf("payment service prepare failed for %s: insufficient funds", transactionID)
    }
    s.accounts["user123"] -= 100.00 // 预扣款
    fmt.Printf("[%s] %s: Payment for user123 prepared (balance: %.2f)n", time.Now().Format("15:04:05"), s.GetName(), s.accounts["user123"])
    return nil
}

func (s *PaymentService) Commit(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 预扣款已经完成,这里可以记录一个已提交的支付记录
    fmt.Printf("[%s] %s: Payment for transaction %s committed.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    return nil
}

func (s *PaymentService) Rollback(ctx context.Context, transactionID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    // 归还预扣款
    s.accounts["user123"] += 100.00
    fmt.Printf("[%s] %s: Payment for transaction %s rolled back (user123 balance: %.2f)n", time.Now().Format("15:04:05"), s.GetName(), s.accounts["user123"])
    return nil
}

// 2PC 协调者
type TwoPhaseCoordinator struct {
    participants []Participant
}

func NewTwoPhaseCoordinator(participants ...Participant) *TwoPhaseCoordinator {
    return &TwoPhaseCoordinator{
        participants: participants,
    }
}

func (c *TwoPhaseCoordinator) ExecuteTransaction(ctx context.Context, transactionID string) error {
    fmt.Printf("n--- Starting 2PC transaction %s ---n", transactionID)

    // Phase 1: Prepare
    fmt.Printf("Phase 1: Prepare for transaction %sn", transactionID)
    prepareSuccess := true
    var wg sync.WaitGroup
    errChan := make(chan error, len(c.participants))

    for _, p := range c.participants {
        wg.Add(1)
        go func(participant Participant) {
            defer wg.Done()
            if err := participant.Prepare(ctx, transactionID); err != nil {
                fmt.Printf("!!! [%s] %s prepare failed: %vn", time.Now().Format("15:04:05"), participant.GetName(), err)
                errChan <- err
            }
        }(p)
    }
    wg.Wait()
    close(errChan)

    if len(errChan) > 0 { // Any prepare failed
        prepareSuccess = false
        fmt.Printf("Phase 1: Prepare failed for transaction %s. Initiating rollback.n", transactionID)
        // Phase 2: Rollback
        for _, p := range c.participants {
            _ = p.Rollback(ctx, transactionID) // In real scenario, handle rollback errors carefully
        }
        fmt.Printf("--- 2PC transaction %s rolled back ---n", transactionID)
        return fmt.Errorf("transaction %s failed during prepare phase: %v", transactionID, <-errChan)
    }

    // Phase 2: Commit
    fmt.Printf("Phase 2: All participants prepared. Committing transaction %sn", transactionID)
    for _, p := range c.participants {
        wg.Add(1)
        go func(participant Participant) {
            defer wg.Done()
            if err := participant.Commit(ctx, transactionID); err != nil {
                // This is a critical point: if commit fails here, data inconsistency might occur.
                // In a real 2PC, recovery mechanisms would be needed.
                fmt.Printf("!!! [%s] %s commit failed: %v (CRITICAL! Potential inconsistency)n", time.Now().Format("15:04:05"), participant.GetName(), err)
                errChan <- err // Signal that commit failed
            }
        }(p)
    }
    wg.Wait()
    close(errChan) // Close again to ensure no panic if already closed

    if len(errChan) > 0 {
        fmt.Printf("!!! [%s] Commit phase failed for transaction %s. Potential data inconsistency. Requires manual intervention/recovery.n", time.Now().Format("15:04:05"), transactionID)
        return fmt.Errorf("transaction %s failed during commit phase: %v", transactionID, <-errChan)
    }

    fmt.Printf("--- 2PC transaction %s committed successfully ---n", transactionID)
    return nil
}

func main() {
    orderSvc := NewOrderService()
    inventorySvc := NewInventoryService()
    paymentSvc := NewPaymentService()

    coordinator := NewTwoPhaseCoordinator(orderSvc, inventorySvc, paymentSvc)
    ctx := context.Background()

    // 场景1: 成功提交
    err := coordinator.ExecuteTransaction(ctx, "tx_success_1")
    if err != nil {
        fmt.Printf("Transaction tx_success_1 failed: %vn", err)
    }
    fmt.Println("Current inventory for itemA:", inventorySvc.stock["itemA"])
    fmt.Println("Current balance for user123:", paymentSvc.accounts["user123"])

    // 场景2: 模拟库存不足导致 Prepare 失败
    fmt.Println("n--- Resetting state for next scenario ---")
    orderSvc = NewOrderService() // Reset services
    inventorySvc = &InventoryService{stock: map[string]int{"itemA": 0}} // Set inventory to 0
    paymentSvc = NewPaymentService()
    coordinator = NewTwoPhaseCoordinator(orderSvc, inventorySvc, paymentSvc)

    err = coordinator.ExecuteTransaction(ctx, "tx_fail_inventory")
    if err != nil {
        fmt.Printf("Transaction tx_fail_inventory failed as expected: %vn", err)
    }
    fmt.Println("Current inventory for itemA (after failed tx):", inventorySvc.stock["itemA"])
    fmt.Println("Current balance for user123 (after failed tx):", paymentSvc.accounts["user123"])

    // 场景3: 模拟支付服务 Prepare 失败 (余额不足)
    fmt.Println("n--- Resetting state for next scenario ---")
    orderSvc = NewOrderService()
    inventorySvc = NewInventoryService()
    paymentSvc = &PaymentService{accounts: map[string]float64{"user123": 50.00}} // Set low balance
    coordinator = NewTwoPhaseCoordinator(orderSvc, inventorySvc, paymentSvc)

    err = coordinator.ExecuteTransaction(ctx, "tx_fail_payment")
    if err != nil {
        fmt.Printf("Transaction tx_fail_payment failed as expected: %vn", err)
    }
    fmt.Println("Current inventory for itemA (after failed tx):", inventorySvc.stock["itemA"])
    fmt.Println("Current balance for user123 (after failed tx):", paymentSvc.accounts["user123"])

    // 场景4: 模拟在 commit 阶段某个服务失败 (非常危险,可能导致不一致)
    // 这个场景在模拟代码中很难直接体现,因为我们没有模拟网络或进程故障
    // 但其后果是:部分服务提交,部分未提交,导致数据不一致。
    // 在实际 2PC 实现中,需要复杂的恢复机制来处理这种情况。
    fmt.Println("n--- Scenario 4: Critical Commit Failure (Conceptual) ---")
    fmt.Println("If a participant fails during the commit phase after coordinator sent commit,")
    fmt.Println("it could lead to data inconsistency. E.g., Order committed, but Inventory failed to commit.")
    fmt.Println("2PC requires robust recovery logs and mechanisms to resolve this, which adds complexity.")
}

代码解释:

  • Participant 接口定义了 Prepare, Commit, Rollback 方法,这是 2PC 参与者必须实现的核心逻辑。
  • OrderService, InventoryService, PaymentService 模拟了微服务,它们各自维护自己的数据,并实现了 Participant 接口。
  • Prepare 方法:执行预扣操作,如预留库存、预扣款项、创建待定订单。
  • Commit 方法:确认预扣操作,使之永久生效。
  • Rollback 方法:撤销预扣操作,归还资源。
  • TwoPhaseCoordinator:负责协调整个事务,它首先调用所有参与者的 Prepare,如果都成功,则调用 Commit;否则调用 Rollback
  • 使用了 sync.WaitGroupchan error 来并发调用参与者的操作并收集错误。
  • 模拟了成功、库存不足和余额不足的场景。

核心问题与 Go 模拟的局限性:

这个 Go 模拟代码虽然展示了 2PC 的流程,但它没有完全体现 2PC 最致命的问题:协调者宕机和网络分区导致的数据不一致。在我们的模拟中,协调者是在一个进程内运行的,并且参与者之间的通信是同步的函数调用,这与真实分布式环境中的网络通信和独立进程的协调有很大区别。

真实 2PC 中,协调者需要持久化事务状态(例如,记录每个参与者的投票结果),以便在宕机后恢复。如果协调者在第二阶段发送 commit 消息后,自身宕机了,或者部分参与者收到了 commit 消息但其他参与者没有收到,就会出现数据不一致。解决这些问题需要复杂的事务日志、恢复机制和超时处理,这使得 2PC 在微服务中变得非常笨重和难以维护。


事务补偿提交(Transactional Compensating Commit, TCC)

与 2PC 追求强一致性不同,TCC 模式是一种基于业务层面的分布式事务解决方案,它追求的是最终一致性。TCC 的核心思想是“先尝试,后确认,失败则取消”,通过业务逻辑的三个阶段来实现事务的原子性。

TCC 的基本原理

TCC 模式也涉及一个事务协调者和多个事务参与者,但它的协调逻辑和参与者行为与 2PC 有本质区别。每个事务参与者都需要实现三个操作:

  1. Try 阶段(尝试执行):

    • 这个阶段业务系统尝试去执行业务操作,但并不真正提交。
    • 主要任务是检查业务可行性(如库存是否足够、余额是否充足)和预留必要的业务资源(如锁定库存、冻结资金、创建待定订单)。
    • Try 阶段必须保证其操作是幂等的。
  2. Confirm 阶段(确认执行):

    • 当所有参与者的 Try 阶段都成功后,协调者会发起 Confirm 阶段。
    • 这个阶段真正执行业务操作,提交 Try 阶段预留的资源,使其生效。
    • Confirm 阶段的操作必须是幂等的
    • Confirm 阶段原则上不应失败,因为所有必要条件已经在 Try 阶段检查过并预留了资源。如果 Confirm 仍然失败,通常需要人工介入。
  3. Cancel 阶段(取消执行):

    • 如果在 Try 阶段有任何一个参与者失败,或者 Confirm 阶段失败(理论上不应发生),协调者会发起 Cancel 阶段。
    • 这个阶段撤销 Try 阶段预留的资源,将系统恢复到事务开始前的状态。
    • Cancel 阶段的操作必须是幂等的
    • Cancel 阶段必须能够成功执行,不能失败,否则系统将陷入不一致状态。

流程图示:

角色 阶段一:Try (全部成功) 阶段二:Confirm 阶段一:Try (任一失败) 阶段二:Cancel
协调者 1. 发送 Try 1. 发送 Confirm 1. 发送 Try 1. 发送 Cancel
参与者A 2. 检查并预留资源,响应 Yes 2. 确认并提交资源 2. 检查并预留资源,响应 No 2. 释放预留资源
参与者B 2. 检查并预留资源,响应 Yes 2. 确认并提交资源 2. 检查并预留资源,响应 Yes 2. 释放预留资源

幂等性与空回滚

在 TCC 模式中,幂等性(Idempotency) 是至关重要的。这意味着对同一个操作的多次调用应该产生与单次调用相同的效果。

  • Try 操作:多次调用 Try 应该只预留一次资源。
  • Confirm 操作:多次调用 Confirm 应该只提交一次资源。
  • Cancel 操作:多次调用 Cancel 应该只释放一次资源。

此外,还需要考虑空回滚(Nullification)。如果协调者在某个参与者还未收到 Try 请求,或者 Try 请求失败后,就直接发起 Cancel 请求,那么这个 Cancel 请求就应该能够安全地执行,即使没有任何资源需要释放。这意味着 Cancel 操作需要判断对应的 Try 操作是否已经成功执行过。

TCC 的优缺点

优点:

  • 非阻塞: Try 阶段的资源锁定是业务层面的,通常是轻量级锁定,且时间较短,避免了 2PC 那样的全局性长时间阻塞。这大大提高了系统的并发性和吞吐量。
  • 高可用性: 协调者和参与者之间的通信可以是异步的。即使协调者或某个参与者暂时失败,也可以通过重试机制最终达到一致。
  • 灵活性: 业务逻辑完全由服务自身控制,可以根据实际业务需求设计 Try/Confirm/Cancel 操作。
  • 最终一致性: 通过业务补偿机制,保证了分布式事务的最终一致性。
  • 性能较好: 相比 2PC,TCC 通常有更好的性能和可伸缩性。

缺点:

  • 业务侵入性强: 每个参与者都需要根据业务逻辑实现 Try、Confirm、Cancel 三个接口,这增加了业务服务的开发和维护成本。
  • 实现复杂度高: 需要仔细设计每个阶段的业务逻辑,确保幂等性、防空回滚和悬挂问题(CancelTry 先执行)。
  • 数据一致性风险: 在 Confirm 或 Cancel 阶段,如果某个参与者失败且重试机制也无法使其成功,那么系统可能陷入数据不一致状态,需要人工介入。
  • 隔离性较弱: 在 Try 阶段,资源只是被预留,而不是完全锁定,其他事务仍可能看到这些预留的资源,可能导致脏读。业务上需要通过合理的隔离设计(例如,预留状态的商品不显示为可购买)来缓解。

Go 语言中 TCC 的实现示例

我们将使用 Go 语言模拟一个电商购物流程:用户下单,需要扣减库存、扣款,并创建订单。

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

// TCC 事务状态
type TCCState int

const (
    TCC_INIT TCCState = iota
    TCC_TRY_SUCCESS
    TCC_CONFIRM_SUCCESS
    TCC_CANCEL_SUCCESS
    TCC_FAILED // 任何阶段失败
)

func (s TCCState) String() string {
    switch s {
    case TCC_INIT:
        return "INIT"
    case TCC_TRY_SUCCESS:
        return "TRY_SUCCESS"
    case TCC_CONFIRM_SUCCESS:
        return "CONFIRM_SUCCESS"
    case TCC_CANCEL_SUCCESS:
        return "CANCEL_SUCCESS"
    case TCC_FAILED:
        return "FAILED"
    default:
        return "UNKNOWN"
    }
}

// 定义 TCC 参与者接口
type TCCParticipant interface {
    Try(ctx context.Context, transactionID string, args interface{}) error
    Confirm(ctx context.Context, transactionID string, args interface{}) error
    Cancel(ctx context.Context, transactionID string, args interface{}) error
    GetName() string
}

// OrderInfo 结构体用于传递订单相关参数
type OrderInfo struct {
    UserID    string
    ProductID string
    Quantity  int
    Amount    float64
}

// OrderService 模拟
type OrderService struct {
    orders map[string]OrderInfo // 存储预创建/已完成订单
    status map[string]TCCState  // 订单事务状态
    mu     sync.Mutex
}

func NewOrderService() *OrderService {
    return &OrderService{
        orders: make(map[string]OrderInfo),
        status: make(map[string]TCCState),
    }
}

func (s *OrderService) GetName() string { return "OrderService" }

func (s *OrderService) Try(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for order service try")
    }

    fmt.Printf("[%s] %s (Try): Transaction %s - User %s, Product %s, Quantity %d, Amount %.2fn",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.UserID, orderInfo.ProductID, orderInfo.Quantity, orderInfo.Amount)

    // 模拟幂等性:如果已经Try过,直接返回成功
    if s.status[transactionID] == TCC_TRY_SUCCESS {
        fmt.Printf("[%s] %s (Try): Transaction %s already in TRY_SUCCESS state, skipping.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 模拟创建待支付订单
    s.orders[transactionID] = orderInfo
    s.status[transactionID] = TCC_TRY_SUCCESS
    fmt.Printf("[%s] %s (Try): Order %s created as PENDING.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    return nil
}

func (s *OrderService) Confirm(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    fmt.Printf("[%s] %s (Confirm): Transaction %s.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)

    // 模拟幂等性:如果已经Confirm过,直接返回成功
    if s.status[transactionID] == TCC_CONFIRM_SUCCESS {
        fmt.Printf("[%s] %s (Confirm): Transaction %s already in CONFIRM_SUCCESS state, skipping.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    if s.status[transactionID] != TCC_TRY_SUCCESS {
        // 悬挂:Cancel 比 Try 先到,或者 Try 失败了,Confirm 不应该再执行
        // 实际应该记录日志并报警
        fmt.Printf("[%s] %s (Confirm): Transaction %s not in TRY_SUCCESS, possible hanging. Current status: %sn", time.Now().Format("15:04:05"), s.GetName(), transactionID, s.status[transactionID])
        return nil // 或者返回错误,取决于业务策略
    }

    // 模拟将订单状态改为已完成
    s.status[transactionID] = TCC_CONFIRM_SUCCESS
    fmt.Printf("[%s] %s (Confirm): Order %s status changed to COMPLETED.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    return nil
}

func (s *OrderService) Cancel(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    fmt.Printf("[%s] %s (Cancel): Transaction %s.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)

    // 模拟幂等性:如果已经Cancel过,直接返回成功
    if s.status[transactionID] == TCC_CANCEL_SUCCESS {
        fmt.Printf("[%s] %s (Cancel): Transaction %s already in CANCEL_SUCCESS state, skipping.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 空回滚:如果 Try 阶段未成功执行,直接返回成功,不做任何操作
    if s.status[transactionID] != TCC_TRY_SUCCESS && s.status[transactionID] != TCC_INIT {
        fmt.Printf("[%s] %s (Cancel): Transaction %s not in TRY_SUCCESS/INIT state, no need to cancel. Current status: %sn", time.Now().Format("15:04:05"), s.GetName(), transactionID, s.status[transactionID])
        return nil
    }

    // 模拟删除待支付订单
    delete(s.orders, transactionID)
    s.status[transactionID] = TCC_CANCEL_SUCCESS
    fmt.Printf("[%s] %s (Cancel): Order %s deleted.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
    return nil
}

// InventoryService 模拟
type InventoryService struct {
    stock       map[string]int // 实际库存
    reservedStock map[string]map[string]int // transactionID -> productID -> quantity
    mu          sync.Mutex
}

func NewInventoryService() *InventoryService {
    return &InventoryService{
        stock:       map[string]int{"itemA": 100},
        reservedStock: make(map[string]map[string]int),
    }
}

func (s *InventoryService) GetName() string { return "InventoryService" }

func (s *InventoryService) Try(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for inventory service try")
    }

    fmt.Printf("[%s] %s (Try): Transaction %s - Reserve %d of %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.Quantity, orderInfo.ProductID)

    // 模拟幂等性
    if _, ok := s.reservedStock[transactionID]; ok {
        if s.reservedStock[transactionID][orderInfo.ProductID] == orderInfo.Quantity {
            fmt.Printf("[%s] %s (Try): Transaction %s already reserved, skipping.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
            return nil
        }
    }

    // 检查库存
    if s.stock[orderInfo.ProductID] < orderInfo.Quantity {
        fmt.Printf("[%s] %s (Try): Not enough stock for %s. Available: %d, Required: %dn",
            time.Now().Format("15:04:05"), s.GetName(), orderInfo.ProductID, s.stock[orderInfo.ProductID], orderInfo.Quantity)
        return fmt.Errorf("not enough stock for %s", orderInfo.ProductID)
    }

    // 预留库存
    s.stock[orderInfo.ProductID] -= orderInfo.Quantity
    if _, ok := s.reservedStock[transactionID]; !ok {
        s.reservedStock[transactionID] = make(map[string]int)
    }
    s.reservedStock[transactionID][orderInfo.ProductID] = orderInfo.Quantity
    fmt.Printf("[%s] %s (Try): Reserved %d of %s for transaction %s. Remaining stock: %d.n",
        time.Now().Format("15:04:05"), s.GetName(), orderInfo.Quantity, orderInfo.ProductID, transactionID, s.stock[orderInfo.ProductID])
    return nil
}

func (s *InventoryService) Confirm(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for inventory service confirm")
    }

    fmt.Printf("[%s] %s (Confirm): Transaction %s - Confirm %d of %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.Quantity, orderInfo.ProductID)

    // 幂等性:如果事务ID在已预留库存中不存在,说明已经确认过或者没有Try过
    if _, ok := s.reservedStock[transactionID]; !ok {
        fmt.Printf("[%s] %s (Confirm): Transaction %s has no reserved stock, skipping confirm (already confirmed or never tried).n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 实际库存已经扣除,这里只需删除预留记录
    delete(s.reservedStock[transactionID], orderInfo.ProductID)
    if len(s.reservedStock[transactionID]) == 0 {
        delete(s.reservedStock, transactionID)
    }
    fmt.Printf("[%s] %s (Confirm): Stock for transaction %s confirmed. Actual stock: %d.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, s.stock[orderInfo.ProductID])
    return nil
}

func (s *InventoryService) Cancel(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for inventory service cancel")
    }

    fmt.Printf("[%s] %s (Cancel): Transaction %s - Cancel %d of %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.Quantity, orderInfo.ProductID)

    // 空回滚 & 幂等性:如果事务ID没有预留库存,说明已经回滚过或者没有Try过
    if _, ok := s.reservedStock[transactionID]; !ok {
        fmt.Printf("[%s] %s (Cancel): Transaction %s has no reserved stock, skipping cancel (already cancelled or never tried).n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 归还库存
    s.stock[orderInfo.ProductID] += s.reservedStock[transactionID][orderInfo.ProductID]
    delete(s.reservedStock[transactionID], orderInfo.ProductID)
    if len(s.reservedStock[transactionID]) == 0 {
        delete(s.reservedStock, transactionID)
    }
    fmt.Printf("[%s] %s (Cancel): Stock for transaction %s cancelled. Returned %d of %s. Current stock: %d.n",
        time.Now().Format("15:04:05"), s.GetName(), orderInfo.Quantity, orderInfo.ProductID, s.stock[orderInfo.ProductID])
    return nil
}

// PaymentService 模拟
type PaymentService struct {
    accounts map[string]float64 // 实际账户余额
    frozenAccounts map[string]map[string]float64 // transactionID -> userID -> amount
    mu       sync.Mutex
}

func NewPaymentService() *PaymentService {
    return &PaymentService{
        accounts: map[string]float64{"user123": 1000.00},
        frozenAccounts: make(map[string]map[string]float64),
    }
}

func (s *PaymentService) GetName() string { return "PaymentService" }

func (s *PaymentService) Try(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for payment service try")
    }

    fmt.Printf("[%s] %s (Try): Transaction %s - Freeze %.2f for User %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.Amount, orderInfo.UserID)

    // 模拟幂等性
    if _, ok := s.frozenAccounts[transactionID]; ok {
        if s.frozenAccounts[transactionID][orderInfo.UserID] == orderInfo.Amount {
            fmt.Printf("[%s] %s (Try): Transaction %s already frozen, skipping.n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
            return nil
        }
    }

    // 检查余额
    if s.accounts[orderInfo.UserID] < orderInfo.Amount {
        fmt.Printf("[%s] %s (Try): Insufficient funds for %s. Available: %.2f, Required: %.2fn",
            time.Now().Format("15:04:05"), s.GetName(), orderInfo.UserID, s.accounts[orderInfo.UserID], orderInfo.Amount)
        return fmt.Errorf("insufficient funds for %s", orderInfo.UserID)
    }

    // 冻结资金
    s.accounts[orderInfo.UserID] -= orderInfo.Amount
    if _, ok := s.frozenAccounts[transactionID]; !ok {
        s.frozenAccounts[transactionID] = make(map[string]float64)
    }
    s.frozenAccounts[transactionID][orderInfo.UserID] = orderInfo.Amount
    fmt.Printf("[%s] %s (Try): Frozen %.2f for User %s. Remaining balance: %.2f.n",
        time.Now().Format("15:04:05"), s.GetName(), orderInfo.Amount, orderInfo.UserID, s.accounts[orderInfo.UserID])
    return nil
}

func (s *PaymentService) Confirm(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for payment service confirm")
    }

    fmt.Printf("[%s] %s (Confirm): Transaction %s - Confirm payment for User %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.UserID)

    // 幂等性:如果事务ID在已冻结资金中不存在,说明已经确认过或者没有Try过
    if _, ok := s.frozenAccounts[transactionID]; !ok {
        fmt.Printf("[%s] %s (Confirm): Transaction %s has no frozen amount, skipping confirm (already confirmed or never tried).n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 实际资金已经扣除,这里只需删除冻结记录
    delete(s.frozenAccounts[transactionID], orderInfo.UserID)
    if len(s.frozenAccounts[transactionID]) == 0 {
        delete(s.frozenAccounts, transactionID)
    }
    fmt.Printf("[%s] %s (Confirm): Payment for transaction %s confirmed. Actual balance: %.2f.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, s.accounts[orderInfo.UserID])
    return nil
}

func (s *PaymentService) Cancel(ctx context.Context, transactionID string, args interface{}) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    orderInfo, ok := args.(OrderInfo)
    if !ok {
        return errors.New("invalid args for payment service cancel")
    }

    fmt.Printf("[%s] %s (Cancel): Transaction %s - Cancel payment for User %s.n",
        time.Now().Format("15:04:05"), s.GetName(), transactionID, orderInfo.UserID)

    // 空回滚 & 幂等性:如果事务ID没有冻结资金,说明已经回滚过或者没有Try过
    if _, ok := s.frozenAccounts[transactionID]; !ok {
        fmt.Printf("[%s] %s (Cancel): Transaction %s has no frozen amount, skipping cancel (already cancelled or never tried).n", time.Now().Format("15:04:05"), s.GetName(), transactionID)
        return nil
    }

    // 归还冻结资金
    s.accounts[orderInfo.UserID] += s.frozenAccounts[transactionID][orderInfo.UserID]
    delete(s.frozenAccounts[transactionID], orderInfo.UserID)
    if len(s.frozenAccounts[transactionID]) == 0 {
        delete(s.frozenAccounts, transactionID)
    }
    fmt.Printf("[%s] %s (Cancel): Payment for transaction %s cancelled. Returned %.2f to User %s. Current balance: %.2f.n",
        time.Now().Format("15:04:05"), s.GetName(), orderInfo.Amount, orderInfo.UserID, s.accounts[orderInfo.UserID])
    return nil
}

// TCC 协调者
type TCCCoordinator struct {
    participants []TCCParticipant
    // 实际生产中,TCC 协调者需要持久化事务状态,以便在宕机后恢复和重试
    // 这里简化为内存存储
    transactionStates map[string]TCCState
    mu                sync.Mutex
}

func NewTCCCoordinator(participants ...TCCParticipant) *TCCCoordinator {
    return &TCCCoordinator{
        participants:      participants,
        transactionStates: make(map[string]TCCState),
    }
}

// 重试机制 (简化版)
func retry(attempts int, sleep time.Duration, f func() error) error {
    for i := 0; i < attempts; i++ {
        err := f()
        if err == nil {
            return nil
        }
        fmt.Printf("Retrying after error: %v (attempt %d/%d)n", err, i+1, attempts)
        time.Sleep(sleep)
    }
    return fmt.Errorf("failed after %d attempts", attempts)
}

func (c *TCCCoordinator) ExecuteTransaction(ctx context.Context, transactionID string, args interface{}) error {
    c.mu.Lock()
    c.transactionStates[transactionID] = TCC_INIT
    c.mu.Unlock()

    fmt.Printf("n--- Starting TCC transaction %s (Initial state: %s) ---n", transactionID, c.transactionStates[transactionID])

    // Phase 1: Try
    fmt.Printf("Phase 1: Try for transaction %sn", transactionID)
    trySuccess := true
    var wg sync.WaitGroup
    errChan := make(chan error, len(c.participants))

    for _, p := range c.participants {
        wg.Add(1)
        go func(participant TCCParticipant) {
            defer wg.Done()
            // 在实际场景中,Try 阶段的重试需要谨慎,因为可能会导致资源重复预留
            // 通常 Try 失败直接进入 Cancel
            if err := participant.Try(ctx, transactionID, args); err != nil {
                fmt.Printf("!!! [%s] %s Try failed: %vn", time.Now().Format("15:04:05"), participant.GetName(), err)
                errChan <- err
            }
        }(p)
    }
    wg.Wait()
    close(errChan)

    if len(errChan) > 0 { // Any Try failed
        trySuccess = false
        c.mu.Lock()
        c.transactionStates[transactionID] = TCC_FAILED
        c.mu.Unlock()
        fmt.Printf("Phase 1: Try failed for transaction %s. Initiating Cancel. (State: %s)n", transactionID, c.transactionStates[transactionID])
    } else {
        c.mu.Lock()
        c.transactionStates[transactionID] = TCC_TRY_SUCCESS
        c.mu.Unlock()
        fmt.Printf("Phase 1: All participants Try successful for transaction %s. (State: %s)n", transactionID, c.transactionStates[transactionID])
    }

    if !trySuccess {
        // Phase 2: Cancel (if any Try failed)
        fmt.Printf("Phase 2: Initiating Cancel for transaction %sn", transactionID)
        for _, p := range c.participants {
            wg.Add(1)
            go func(participant TCCParticipant) {
                defer wg.Done()
                // Cancel 阶段必须保证成功,需要重试
                err := retry(3, 1*time.Second, func() error {
                    return participant.Cancel(ctx, transactionID, args)
                })
                if err != nil {
                    fmt.Printf("!!! [%s] %s Cancel failed after retries: %v (CRITICAL! Requires manual intervention)n", time.Now().Format("15:04:05"), participant.GetName(), err)
                    // 这通常意味着数据不一致,需要人工介入或更复杂的恢复机制
                    errChan <- err
                }
            }(p)
        }
        wg.Wait()
        close(errChan) // Close again for potential new errors

        if len(errChan) > 0 {
            fmt.Printf("--- TCC transaction %s failed during Cancel phase. Potential inconsistency. (State: %s)n", transactionID, c.transactionStates[transactionID])
            return fmt.Errorf("transaction %s failed during cancel phase after retries", transactionID)
        }
        c.mu.Lock()
        c.transactionStates[transactionID] = TCC_CANCEL_SUCCESS
        c.mu.Unlock()
        fmt.Printf("--- TCC transaction %s successfully cancelled. (State: %s)n", transactionID, c.transactionStates[transactionID])
        return fmt.Errorf("transaction %s failed during try phase, rolled back", transactionID)
    }

    // Phase 2: Confirm (if all Try successful)
    fmt.Printf("Phase 2: All participants Try successful. Committing transaction %sn", transactionID)
    for _, p := range c.participants {
        wg.Add(1)
        go func(participant TCCParticipant) {
            defer wg.Done()
            // Confirm 阶段也必须保证成功,需要重试
            err := retry(3, 1*time.Second, func() error {
                return participant.Confirm(ctx, transactionID, args)
            })
            if err != nil {
                fmt.Printf("!!! [%s] %s Confirm failed after retries: %v (CRITICAL! Requires manual intervention)n", time.Now().Format("15:04:05"), participant.GetName(), err)
                // 这通常意味着数据不一致,需要人工介入或更复杂的恢复机制
                errChan <- err
            }
        }(p)
    }
    wg.Wait()
    close(errChan) // Close again for potential new errors

    if len(errChan) > 0 {
        c.mu.Lock()
        c.transactionStates[transactionID] = TCC_FAILED // 理论上 Confirm 不应该失败,失败则意味着严重问题
        c.mu.Unlock()
        fmt.Printf("--- TCC transaction %s failed during Confirm phase. Potential inconsistency. (State: %s)n", transactionID, c.transactionStates[transactionID])
        return fmt.Errorf("transaction %s failed during confirm phase after retries", transactionID)
    }

    c.mu.Lock()
    c.transactionStates[transactionID] = TCC_CONFIRM_SUCCESS
    c.mu.Unlock()
    fmt.Printf("--- TCC transaction %s committed successfully. (State: %s)n", transactionID, c.transactionStates[transactionID])
    return nil
}

func main() {
    orderSvc := NewOrderService()
    inventorySvc := NewInventoryService()
    paymentSvc := NewPaymentService()

    coordinator := NewTCCCoordinator(orderSvc, inventorySvc, paymentSvc)
    ctx := context.Background()

    // 场景1: 成功提交
    fmt.Println("n=== Scenario 1: Successful Transaction ===")
    orderArgs1 := OrderInfo{UserID: "user123", ProductID: "itemA", Quantity: 1, Amount: 100.00}
    err := coordinator.ExecuteTransaction(ctx, "tcc_tx_success_1", orderArgs1)
    if err != nil {
        fmt.Printf("Transaction tcc_tx_success_1 failed: %vn", err)
    }
    fmt.Println("Final inventory for itemA:", inventorySvc.stock["itemA"])
    fmt.Println("Final balance for user123:", paymentSvc.accounts["user123"])
    fmt.Println("Order status for tcc_tx_success_1:", orderSvc.status["tcc_tx_success_1"])

    // 场景2: 模拟库存不足导致 Try 失败
    fmt.Println("n=== Scenario 2: Inventory Shortage (Try Phase Failure) ===")
    orderArgs2 := OrderInfo{UserID: "user123", ProductID: "itemA", Quantity: 100, Amount: 10000.00} // itemA only has 99 now
    err = coordinator.ExecuteTransaction(ctx, "tcc_tx_fail_inventory", orderArgs2)
    if err != nil {
        fmt.Printf("Transaction tcc_tx_fail_inventory failed as expected: %vn", err)
    }
    fmt.Println("Final inventory for itemA (after failed tx):", inventorySvc.stock["itemA"]) // Should be 99
    fmt.Println("Final balance for user123 (after failed tx):", paymentSvc.accounts["user123"]) // Should be 900
    fmt.Println("Order status for tcc_tx_fail_inventory:", orderSvc.status["tcc_tx_fail_inventory"])

    // 场景3: 模拟支付服务 Try 失败 (余额不足)
    fmt.Println("n=== Scenario 3: Insufficient Funds (Try Phase Failure) ===")
    // Reset services for clean state (inventory is 99, balance is 900)
    inventorySvc = NewInventoryService()
    inventorySvc.stock["itemA"] = 99 // Reset to previous state
    paymentSvc = NewPaymentService()
    paymentSvc.accounts["user123"] = 900.00 // Reset to previous state
    orderSvc = NewOrderService()
    coordinator = NewTCCCoordinator(orderSvc, inventorySvc, paymentSvc)

    orderArgs3 := OrderInfo{UserID: "user123", ProductID: "itemA", Quantity: 1, Amount: 1000.00} // user123 only has 900 now
    err = coordinator.ExecuteTransaction(ctx, "tcc_tx_fail_payment", orderArgs3)
    if err != nil {
        fmt.Printf("Transaction tcc_tx_fail_payment failed as expected: %vn", err)
    }
    fmt.Println("Final inventory for itemA (after failed tx):", inventorySvc.stock["itemA"]) // Should be 99
    fmt.Println("Final balance for user123 (after failed tx):", paymentSvc.accounts["user123"]) // Should be 900
    fmt.Println("Order status for tcc_tx_fail_payment:", orderSvc.status["tcc_tx_fail_payment"])

    // 场景4: 模拟 Confirm 阶段某个服务失败 (非常危险,需要重试)
    // 在我们的retry机制下,理论上不会直接失败,而是会一直重试直到成功或达到重试上限
    fmt.Println("n=== Scenario 4: Confirm Phase Failure (Conceptual with Retries) ===")
    fmt.Println("If a participant fails during the Confirm phase, the coordinator will retry.")
    fmt.Println("If retries exhaust, it leads to a critical inconsistent state requiring manual intervention.")
    fmt.Println("This scenario is hard to reliably simulate as Confirm *should* eventually succeed due to prior Try success.")
    fmt.Println("In a real system, the coordinator must persist transaction state and have a separate 'transaction checker' or 'recovery' service to deal with such critical failures by continuously retrying Confirm/Cancel.")
}

代码解释:

  • TCCParticipant 接口定义了 Try, Confirm, Cancel 方法。
  • OrderService, InventoryService, PaymentService 模拟了微服务。
    • Try 阶段:
      • OrderService: 创建一个 PENDING 状态的订单。
      • InventoryService: 检查库存,如果足够则预扣库存(实际库存减少),并记录预留量。
      • PaymentService: 检查余额,如果足够则冻结资金(实际余额减少),并记录冻结金额。
    • Confirm 阶段:
      • OrderService: 将订单状态从 PENDING 改为 COMPLETED
      • InventoryService: 删除预留库存记录(因为实际库存已在 Try 阶段扣除)。
      • PaymentService: 删除冻结资金记录(因为实际资金已在 Try 阶段扣除)。
    • Cancel 阶段:
      • OrderService: 删除 PENDING 状态的订单。
      • InventoryService: 归还预扣的库存,删除预留记录。
      • PaymentService: 归还冻结的资金,删除冻结记录。
  • 每个服务的 Try, Confirm, Cancel 方法都包含了幂等性检查(例如,通过 transactionID 判断是否已经执行过)。
  • Cancel 方法包含了空回滚处理:如果对应的 Try 阶段没有成功执行,Cancel 方法直接返回成功,不做任何实际操作。
  • TCCCoordinator:负责驱动整个 TCC 流程。
    • 它维护每个事务的当前状态 (TCCState),这在实际系统中需要持久化到数据库或专门的事务日志中,以便在协调者重启后能够恢复。
    • ExecuteTransaction 方法首先并发执行所有参与者的 Try
    • 如果所有 Try 都成功,则并发执行所有参与者的 Confirm
    • 如果任何一个 Try 失败,或者(理论上不应发生)Confirm 失败,则并发执行所有参与者的 Cancel
    • retry 函数模拟了重试机制,尤其在 ConfirmCancel 阶段,重试是确保最终一致性的关键。

TCC 的核心挑战与 Go 模拟的局限性:

  • 事务状态持久化与恢复: 模拟代码中的 TCCCoordinator 将事务状态存储在内存中。在真实的分布式系统中,协调者必须将事务状态持久化到数据库或消息队列中,以便在协调者宕机后能够恢复事务,并继续执行 ConfirmCancel 操作,这是 TCC 确保最终一致性的关键。通常会有一个独立的事务状态管理器或消息队列来完成这个任务。
  • 幂等性与空回滚的实现: 示例代码中通过简单的 map 检查来模拟幂等性和空回滚,实际业务中需要更复杂的机制,例如在数据库中记录事务日志、使用唯一请求 ID 等。
  • 悬挂问题: 如果 CancelTry 先执行(例如,网络延迟导致 Try 请求很晚才到达,而协调者已经超时并发送了 Cancel),Cancel 操作会因为空回滚而成功,但 Try 稍后成功预留了资源,导致资源悬挂。解决方法通常是在 Try 阶段设置一个有效期,或者在 Cancel 阶段记录已取消的事务 ID,Try 阶段检查这个列表。
  • 人工介入: 即使有重试,ConfirmCancel 阶段仍然可能在多次重试后失败,这时系统会陷入不一致状态,通常需要人工介入进行数据修复。

2PC 与 TCC 的比较

为了更好地理解两者的权衡,我们通过表格进行对比:

特性 两阶段提交 (2PC) 事务补偿提交 (TCC)
一致性模型 强一致性 (理论上) 最终一致性
隔离性 强隔离性 (资源在 Prepare 阶段被锁定) 弱隔离性 (资源在 Try 阶段是预留,可能被其他事务看到)
阻塞性 同步阻塞,资源长时间锁定 异步非阻塞,资源轻量级预留,锁定时间短
性能/吞吐量 较低 较高
可用性 协调者单点故障风险,可用性较低 协调者故障可通过重试恢复,可用性较高
业务侵入性 低 (由数据库或事务管理器实现) 高 (业务服务需实现 Try/Confirm/Cancel 接口)
实现复杂度 协调者实现复杂,需考虑宕机恢复和超时处理;参与者相对简单 (依赖 XA) 参与者实现复杂 (幂等性、空回滚、悬挂处理),协调者也需持久化状态和重试
数据不一致风险 协调者或网络故障可能导致数据不一致,恢复困难 Confirm/Cancel 阶段失败可能导致数据不一致,需重试和人工介入
适用场景 传统单体应用或强一致性要求极高的分布式系统,且性能要求不高 微服务架构、高并发、对可用性要求高、允许短暂不一致的业务场景

权衡与选择

  • 追求强一致性且对性能要求不高: 2PC 理论上能提供强一致性,但其在分布式微服务场景下的性能、可用性问题使其成为一个不推荐的选择。在 Go 微服务中,通常不会直接实现 2PC,而是会寻求更适合分布式环境的替代方案。
  • 追求最终一致性、高可用性和高性能: TCC 是一个更适合微服务架构的选择。它将分布式事务的复杂性推向了业务层面,要求业务开发者深入理解业务流程,并为每个参与者设计 Try/Confirm/Cancel 操作。虽然开发成本较高,但它带来了更好的性能和可用性。
  • 业务侵入性: 2PC 通常由底层数据库或事务中间件实现,对业务代码侵入性低。TCC 对业务代码有很强的侵入性,每个服务都需要修改来支持 TCC 接口。
  • 故障处理: 2PC 的故障处理和恢复机制非常复杂且容易引入不一致。TCC 通过重试和补偿机制处理故障,但极端情况下仍可能需要人工介入。

真实世界考量与最佳实践

在 Go 微服务中选择和实现分布式事务方案时,除了 2PC 和 TCC,还有一些其他考量和模式:

  1. Saga 模式: TCC 可以看作是 Saga 模式的一种具体实现。Saga 模式是一种更通用的分布式事务模式,它将一个分布式事务分解为一系列本地事务,每个本地事务都有一个对应的补偿事务。如果某个本地事务失败,则执行之前所有已成功本地事务的补偿事务。Saga 模式有两种实现方式:

    • 编排(Orchestration): 有一个中央协调器负责发送命令给参与者并根据结果决定下一步操作(类似于 TCC 协调器)。
    • 协同(Choreography): 没有中央协调器,每个参与者通过发布事件来触发下一个参与者的本地事务,参与者根据事件决定是否执行补偿。
      TCC 模式更偏向于编排式的 Saga。
  2. 消息队列实现最终一致性: 结合可靠消息队列(如 Kafka, RabbitMQ)是实现最终一致性的常见且推荐方式。

    • 本地消息表 / 发件箱模式 (Outbox Pattern): 确保本地事务和消息发送的原子性。服务在本地事务中将业务数据和待发送消息一同写入本地消息表。然后一个独立的进程扫描消息表,将消息发送到消息队列。
    • 事务性消息 (Transactional Outbox / In-box Pattern): 接收方服务监听消息队列,处理消息并更新本地数据。为了保证幂等性,接收方需要记录已处理的消息ID。
      这种模式将分布式事务分解为多个本地事务和异步消息传递,是微服务中最常用的最终一致性方案之一。
  3. 幂等性是基石: 无论选择哪种最终一致性方案,确保所有操作的幂等性都是至关重要的。这意味着服务在接收到重复请求时,能够安全地处理而不会产生副作用。

  4. 可观测性: 建立完善的监控、日志和告警系统,跟踪分布式事务的每个阶段。当出现不一致时,能够快速发现并定位问题。

  5. 人工介入和回滚方案: 即使设计再完善,分布式事务也可能在极端情况下失败。因此,需要有清晰的人工介入流程和数据回滚方案,以便在发生严重问题时进行手动修复。

  6. 业务场景驱动: 没有银弹。选择哪种方案应根据具体的业务场景、对一致性、可用性、性能和复杂度的要求来决定。对于一些简单、不涉及核心资产的跨服务操作,甚至可以使用简单的异步通知即可。


在 Go 微服务架构中,由于其高并发特性和对性能的追求,我们通常会倾向于采用像 TCC 或 Saga 模式这样的最终一致性方案,并结合消息队列来构建健壮的分布式事务。2PC 因其阻塞性、单点故障和复杂性,在现代微服务设计中已较少被直接应用。理解它们的原理,有助于我们更好地设计和实现高可用、可伸缩的分布式系统。

感谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注