Java中的TCC模式:Try/Confirm/Cancel三个阶段的业务逻辑实现与状态管理

Java中的TCC模式:Try/Confirm/Cancel三个阶段的业务逻辑实现与状态管理

各位朋友,大家好!今天我们来深入探讨一下分布式事务中的TCC模式,也就是Try/Confirm/Cancel模式。TCC是一种柔性事务,它将一个完整的业务逻辑拆分成三个阶段,通过补偿机制来实现最终一致性。相比于传统的XA事务,TCC模式对于性能的影响更小,更适合于高并发、低延迟的分布式系统。

一、TCC模式的核心概念

TCC模式的核心在于将一个业务操作分解为以下三个阶段:

  • Try阶段: 尝试执行业务,完成所有业务检查(一致性),预留必须的业务资源(准隔离性)。Try阶段要尽量减少锁的持有时间,避免长时间阻塞其他事务。

  • Confirm阶段: 确认执行业务,真正执行业务操作,不作任何业务检查。Confirm阶段应该是幂等的,无论执行多少次,结果都应该是一样的。使用的资源是Try阶段预留的资源。

  • Cancel阶段: 取消执行业务,释放Try阶段预留的业务资源。Cancel阶段也应该是幂等的,并且要考虑Try阶段可能出现的各种异常情况,确保能够正确回滚。

二、TCC模式的适用场景

TCC模式主要适用于以下场景:

  • 跨数据库或服务事务: 当一个业务操作需要跨多个数据库或服务时,无法使用XA事务,可以使用TCC模式来实现最终一致性。
  • 对性能有较高要求的场景: TCC模式通过预留资源,减少了锁的竞争,提高了系统的并发能力。
  • 需要高度灵活的事务控制: TCC模式允许在不同的阶段进行自定义的业务逻辑处理,具有较高的灵活性。

三、TCC模式的实现步骤

实现TCC模式需要以下几个步骤:

  1. 定义TCC接口: 定义Try、Confirm、Cancel三个方法的接口,用于描述业务操作的不同阶段。
  2. 实现TCC接口: 针对具体的业务场景,实现TCC接口,编写Try、Confirm、Cancel方法的具体逻辑。
  3. 事务协调器: 需要一个事务协调器来协调各个参与者的TCC操作。事务协调器负责记录事务状态,并在事务成功或失败时,调用相应的Confirm或Cancel方法。
  4. 状态管理: 需要对TCC事务的状态进行管理,记录每个阶段的执行结果,以便在需要回滚时,能够正确执行Cancel操作。

四、代码示例:一个简单的账户转账TCC实现

为了更好地理解TCC模式,我们以一个简单的账户转账为例,演示如何使用Java实现TCC模式。

假设我们有两个账户服务:AccountServiceAAccountServiceB。我们需要实现从AccountServiceA转账到AccountServiceB的功能。

首先,我们定义TCC接口:

public interface AccountTcc {

    /**
     * Try 阶段:尝试转账,冻结转出账户的金额
     * @param accountId 转出账户ID
     * @param amount 转账金额
     * @return true:预留成功,false:预留失败
     */
    boolean tryTransfer(String transactionId, String accountId, BigDecimal amount);

    /**
     * Confirm 阶段:确认转账,从转出账户扣减金额,增加转入账户金额
     * @param accountId 转出账户ID
     * @param amount 转账金额
     * @return true:确认成功,false:确认失败
     */
    boolean confirmTransfer(String transactionId, String accountId, BigDecimal amount);

    /**
     * Cancel 阶段:取消转账,释放转出账户冻结的金额
     * @param accountId 转出账户ID
     * @param amount 转账金额
     * @return true:取消成功,false:取消失败
     */
    boolean cancelTransfer(String transactionId, String accountId, BigDecimal amount);
}

接下来,我们分别实现AccountServiceAAccountServiceB的TCC接口。

AccountServiceA的实现:

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccountServiceA implements AccountTcc {

    // 模拟账户余额
    private BigDecimal balance = new BigDecimal("1000.00");

    // 模拟冻结金额
    private final Map<String, BigDecimal> frozenAmounts = new ConcurrentHashMap<>();

    // 模拟事务状态
    private final Map<String, String> transactionStatus = new ConcurrentHashMap<>();

    @Override
    public boolean tryTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceA Try: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);

        // 检查余额是否充足
        if (balance.compareTo(amount) < 0) {
            System.out.println("AccountServiceA Try: 余额不足,accountId=" + accountId + ", amount=" + amount);
            return false;
        }

        // 冻结金额
        frozenAmounts.put(transactionId, amount);
        balance = balance.subtract(amount);
        transactionStatus.put(transactionId, "TRY");
        System.out.println("AccountServiceA Try: 冻结成功,accountId=" + accountId + ", amount=" + amount + ", currentBalance=" + balance);
        return true;
    }

    @Override
    public boolean confirmTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceA Confirm: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);
        String status = transactionStatus.get(transactionId);
        if (status == null || !status.equals("TRY")) {
            System.out.println("AccountServiceA Confirm: 事务状态异常,accountId=" + accountId + ", amount=" + amount + ", status=" + status);
            return true; // 幂等性,允许重复执行
        }

        frozenAmounts.remove(transactionId);
        transactionStatus.put(transactionId, "CONFIRM");
        System.out.println("AccountServiceA Confirm: 扣减成功,accountId=" + accountId + ", amount=" + amount + ", currentBalance=" + balance);
        return true;
    }

    @Override
    public boolean cancelTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceA Cancel: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);

        String status = transactionStatus.get(transactionId);
        if (status == null) {
            System.out.println("AccountServiceA Cancel: 事务不存在,accountId=" + accountId + ", amount=" + amount );
            return true; // 幂等性,允许重复执行
        }

        if(status.equals("CONFIRM")){
            //如果已经CONFIRM,说明已经到最终状态,不能CANCEL。
            System.out.println("AccountServiceA Cancel: 事务已经CONFIRM,不能CANCEL,accountId=" + accountId + ", amount=" + amount );
            return true;
        }

        // 释放冻结金额
        BigDecimal frozenAmount = frozenAmounts.get(transactionId);
        if (frozenAmount != null) {
            balance = balance.add(frozenAmount);
            frozenAmounts.remove(transactionId);
            transactionStatus.put(transactionId, "CANCEL");
            System.out.println("AccountServiceA Cancel: 释放成功,accountId=" + accountId + ", amount=" + amount + ", currentBalance=" + balance);
        }
        return true;
    }

    public BigDecimal getBalance() {
        return balance;
    }
}

AccountServiceB的实现:

import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccountServiceB implements AccountTcc {

    // 模拟账户余额
    private BigDecimal balance = new BigDecimal("500.00");

    // 模拟事务状态
    private final Map<String, String> transactionStatus = new ConcurrentHashMap<>();

    @Override
    public boolean tryTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceB Try: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);
        // AccountServiceB的Try阶段通常不需要做任何操作,只需要返回true即可
        transactionStatus.put(transactionId, "TRY");
        return true;
    }

    @Override
    public boolean confirmTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceB Confirm: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);

        String status = transactionStatus.get(transactionId);
        if (status == null || !status.equals("TRY")) {
            System.out.println("AccountServiceB Confirm: 事务状态异常,accountId=" + accountId + ", amount=" + amount + ", status=" + status);
            return true; // 幂等性,允许重复执行
        }

        balance = balance.add(amount);
        transactionStatus.put(transactionId, "CONFIRM");
        System.out.println("AccountServiceB Confirm: 增加成功,accountId=" + accountId + ", amount=" + amount + ", currentBalance=" + balance);
        return true;
    }

    @Override
    public boolean cancelTransfer(String transactionId, String accountId, BigDecimal amount) {
        System.out.println("AccountServiceB Cancel: transactionId=" + transactionId + ", accountId=" + accountId + ", amount=" + amount);

        String status = transactionStatus.get(transactionId);
        if (status == null) {
            System.out.println("AccountServiceB Cancel: 事务不存在,accountId=" + accountId + ", amount=" + amount );
            return true; // 幂等性,允许重复执行
        }

        if(status.equals("CONFIRM")){
            //如果已经CONFIRM,说明已经到最终状态,不能CANCEL。
            System.out.println("AccountServiceB Cancel: 事务已经CONFIRM,不能CANCEL,accountId=" + accountId + ", amount=" + amount );
            return true;
        }

        transactionStatus.put(transactionId, "CANCEL");
        // AccountServiceB的Cancel阶段通常不需要做任何操作,只需要返回true即可
        return true;
    }

    public BigDecimal getBalance() {
        return balance;
    }
}

事务协调器:

import java.math.BigDecimal;
import java.util.UUID;

public class TransactionCoordinator {

    private final AccountServiceA accountServiceA;
    private final AccountServiceB accountServiceB;

    public TransactionCoordinator(AccountServiceA accountServiceA, AccountServiceB accountServiceB) {
        this.accountServiceA = accountServiceA;
        this.accountServiceB = accountServiceB;
    }

    public boolean transfer(String accountIdA, String accountIdB, BigDecimal amount) {
        String transactionId = UUID.randomUUID().toString();

        try {
            // Try 阶段
            boolean tryA = accountServiceA.tryTransfer(transactionId, accountIdA, amount);
            boolean tryB = accountServiceB.tryTransfer(transactionId, accountIdB, amount);

            if (!tryA || !tryB) {
                // 如果Try阶段失败,则执行Cancel
                cancel(transactionId, accountIdA, accountIdB, amount);
                return false;
            }

            // Confirm 阶段
            boolean confirmA = accountServiceA.confirmTransfer(transactionId, accountIdA, amount);
            boolean confirmB = accountServiceB.confirmTransfer(transactionId, accountIdB, amount);

            if (!confirmA || !confirmB) {
                // 如果Confirm阶段失败,则执行Cancel
                cancel(transactionId, accountIdA, accountIdB, amount);
                return false;
            }

            return true;

        } catch (Exception e) {
            // 发生异常,执行Cancel
            cancel(transactionId, accountIdA, accountIdB, amount);
            return false;
        }
    }

    private void cancel(String transactionId, String accountIdA, String accountIdB, BigDecimal amount) {
        // Cancel 阶段
        accountServiceA.cancelTransfer(transactionId, accountIdA, amount);
        accountServiceB.cancelTransfer(transactionId, accountIdB, amount);
    }

    public static void main(String[] args) {
        AccountServiceA accountServiceA = new AccountServiceA();
        AccountServiceB accountServiceB = new AccountServiceB();
        TransactionCoordinator coordinator = new TransactionCoordinator(accountServiceA, accountServiceB);

        String accountIdA = "A123";
        String accountIdB = "B456";
        BigDecimal amount = new BigDecimal("100.00");

        boolean success = coordinator.transfer(accountIdA, accountIdB, amount);

        if (success) {
            System.out.println("转账成功!");
        } else {
            System.out.println("转账失败!");
        }

        System.out.println("Account A Balance: " + accountServiceA.getBalance());
        System.out.println("Account B Balance: " + accountServiceB.getBalance());
    }
}

在这个示例中,TransactionCoordinator负责协调AccountServiceAAccountServiceB的TCC操作。它首先调用AccountServiceAAccountServiceBtryTransfer方法,如果都成功,则调用confirmTransfer方法。如果tryTransferconfirmTransfer方法失败,则调用cancelTransfer方法进行回滚。

五、状态管理

在TCC模式中,状态管理非常重要。我们需要记录每个阶段的执行结果,以便在需要回滚时,能够正确执行Cancel操作。

常见的状态包括:

状态 描述
TRY Try阶段已经执行,资源已经预留
CONFIRM Confirm阶段已经执行,事务已经提交
CANCEL Cancel阶段已经执行,事务已经回滚
TRY_FAILED Try阶段执行失败
CONFIRM_FAILED Confirm阶段执行失败
CANCEL_FAILED Cancel阶段执行失败

状态可以存储在数据库中,也可以存储在缓存中。需要根据具体的业务场景选择合适的存储方式。在上面的代码中,我们使用了简单的Map模拟了状态管理。在实际应用中,我们需要使用数据库进行持久化。

六、幂等性

TCC模式要求Confirm和Cancel方法必须是幂等的。这意味着无论执行多少次Confirm或Cancel方法,结果都应该是一样的。

为了保证幂等性,可以采用以下策略:

  • 使用唯一事务ID: 在每次执行Confirm或Cancel方法时,都使用唯一的事务ID来标识事务。如果已经执行过该事务,则直接返回成功,不再重复执行。
  • 状态检查: 在执行Confirm或Cancel方法之前,先检查事务的状态。如果事务已经处于Confirm或Cancel状态,则直接返回成功,不再重复执行。
  • 乐观锁: 使用乐观锁来防止并发执行Confirm或Cancel方法。

七、空补偿

在某些情况下,Cancel阶段可能会在Try阶段之前执行。这种情况被称为空补偿。

例如,当事务协调器在调用Try方法之前,由于网络原因,调用Cancel方法时,就会发生空补偿。

为了处理空补偿,需要在Cancel方法中进行判断:如果Try方法还没有执行,则直接返回成功。

八、悬挂处理

在某些情况下,Try阶段可能会在Cancel阶段之后执行。这种情况被称为悬挂。

例如,当事务协调器在调用Cancel方法之后,由于网络原因,Try方法才被执行时,就会发生悬挂。

为了处理悬挂,需要在Try方法中进行判断:如果Cancel方法已经执行,则直接返回失败。或者在Confirm和Cancel方法执行时,判断Try是否已经成功执行。

九、TCC框架

为了简化TCC模式的开发,可以使用一些TCC框架。常见的TCC框架包括:

  • Seata: 阿里巴巴开源的分布式事务解决方案,支持TCC、AT、SAGA等多种事务模式。
  • ByteTCC: 一款高性能的TCC事务中间件,支持多种数据库。
  • Himly: 一款轻量级的TCC框架,易于集成和使用。

使用TCC框架可以减少开发工作量,提高开发效率。

十、需要注意的点

  • 资源预留: Try阶段需要预留足够的资源,以保证Confirm阶段能够顺利执行。
  • 网络延迟: 分布式系统中的网络延迟是不可避免的,需要考虑网络延迟对TCC事务的影响。
  • 异常处理: 需要对Try、Confirm、Cancel三个阶段的异常进行妥善处理,确保事务能够正确回滚。
  • 监控和告警: 需要对TCC事务进行监控和告警,及时发现和处理问题。

总结一下:

TCC模式作为一种柔性事务解决方案,通过Try、Confirm、Cancel三个阶段的操作,实现了分布式环境下的最终一致性。 掌握TCC模式的核心概念、实现步骤以及需要注意的点,可以帮助我们构建更加可靠和高效的分布式系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注