JAVA 分布式事务一致性难题?基于 Seata 实现 TCC 模式的完整流程

Seata TCC 模式:解决 Java 分布式事务一致性难题

各位同学,大家好!今天我们来聊聊 Java 分布式事务一致性难题以及如何利用 Seata 的 TCC 模式来解决这个问题。

什么是分布式事务?

在一个单体应用中,事务管理相对简单,ACID 特性由数据库自身保证。但当应用拆分成多个微服务后,一个业务操作可能需要跨越多个服务,每个服务都有自己的数据库。这时,传统的事务机制就无法保证数据的一致性了,这就是分布式事务。

例如,一个电商场景,用户下单需要扣减库存、生成订单、扣除用户余额,这三个操作分别位于不同的服务中:库存服务、订单服务、账户服务。如果扣减库存成功,生成订单成功,但扣除用户余额失败,就会导致数据不一致,用户下单了,但钱没扣,商家就亏了。

分布式事务的挑战

保证分布式事务的 ACID 特性面临着诸多挑战:

  • 原子性 (Atomicity): 多个服务要么全部成功,要么全部失败。
  • 一致性 (Consistency): 事务执行前后,系统数据处于一致的状态。
  • 隔离性 (Isolation): 并发事务之间互不干扰。
  • 持久性 (Durability): 事务一旦提交,数据更改永久保存。

Seata 简介

Seata (Simple Extensible Autonomous Transaction Architecture) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。它支持多种事务模式,包括 AT、TCC、SAGA 和 XA。今天我们主要聚焦 TCC 模式。

TCC 模式原理

TCC (Try-Confirm-Cancel) 是一种柔性事务解决方案,它将一个分布式事务拆分成三个阶段:

  • Try 阶段: 尝试执行业务,完成所有业务检查 (一致性),预留必须的业务资源 (准隔离性)。
  • Confirm 阶段: 确认执行业务,不作任何业务检查,直接使用 Try 阶段预留的业务资源完成业务。Confirm 操作要满足幂等性。
  • Cancel 阶段: 取消执行业务,释放 Try 阶段预留的业务资源。Cancel 操作也要满足幂等性。

TCC 模式的优点:

  • 高性能: 相比 XA 模式,TCC 模式不需要依赖数据库的 XA 协议,性能更高。
  • 资源利用率高: Try 阶段预留资源,Confirm 阶段直接使用,避免了资源争用。
  • 业务侵入性低: TCC 模式对业务的侵入性相对较低,只需要实现 Try、Confirm、Cancel 三个方法即可。

TCC 模式的缺点:

  • 开发复杂度高: 需要为每个服务实现 Try、Confirm、Cancel 三个方法,开发复杂度较高。
  • 数据一致性要求较高: 需要保证 Try 阶段的业务检查足够完善,避免出现数据不一致的情况。
  • 需要考虑空回滚、幂等问题 这些都需要额外处理。

Seata TCC 模式实现流程

接下来,我们通过一个简单的示例来说明如何使用 Seata 实现 TCC 模式。假设我们有一个库存服务和一个订单服务,用户下单需要扣减库存并在订单服务中创建订单。

1. 定义 TCC 接口

首先,我们需要在库存服务中定义 TCC 接口:

public interface StockTccService {

    /**
     * 扣减库存 Try 阶段
     * @param businessActionContext 分布式事务上下文
     * @param productId 产品ID
     * @param count 扣减数量
     * @return
     */
    @TwoPhaseBusinessAction(name = "stockTccService", commitMethod = "commitDeduct", rollbackMethod = "rollbackDeduct")
    boolean tryDeduct(BusinessActionContext businessActionContext, String productId, int count);

    /**
     * 扣减库存 Confirm 阶段
     * @param businessActionContext 分布式事务上下文
     * @return
     */
    boolean commitDeduct(BusinessActionContext businessActionContext);

    /**
     * 扣减库存 Cancel 阶段
     * @param businessActionContext 分布式事务上下文
     * @return
     */
    boolean rollbackDeduct(BusinessActionContext businessActionContext);
}
  • @TwoPhaseBusinessAction 注解用于标识 TCC 接口,name 属性是 TCC 服务的名称,commitMethodrollbackMethod 属性分别指定 Confirm 和 Cancel 方法。
  • BusinessActionContext 是 Seata 提供的上下文对象,用于在 Try、Confirm、Cancel 三个阶段传递数据。

2. 实现 TCC 接口

接下来,我们需要在库存服务中实现 TCC 接口:

@Service("stockTccService")
public class StockTccServiceImpl implements StockTccService {

    private static final Logger LOGGER = LoggerFactory.getLogger(StockTccServiceImpl.class);

    @Autowired
    private StockDao stockDao;

    @Override
    public boolean tryDeduct(BusinessActionContext businessActionContext, String productId, int count) {
        LOGGER.info("库存服务 try 阶段, productId: {}, count: {}", productId, count);

        // 1. 检查库存是否足够
        int stock = stockDao.getStock(productId);
        if (stock < count) {
            throw new RuntimeException("库存不足");
        }

        // 2. 冻结库存
        stockDao.freezeStock(productId, count);

        // 3. 保存 try 阶段信息,用于 confirm 和 cancel 阶段
        businessActionContext.bind("productId", productId);
        businessActionContext.bind("count", count);

        return true;
    }

    @Override
    public boolean commitDeduct(BusinessActionContext businessActionContext) {
        String productId = (String) businessActionContext.getActionContext("productId");
        Integer count = (Integer) businessActionContext.getActionContext("count");

        LOGGER.info("库存服务 commit 阶段, productId: {}, count: {}", productId, count);

        // 1. 扣减实际库存
        stockDao.deductStock(productId, count);

        // 2. 清除冻结库存
        stockDao.clearFreezeStock(productId, count);

        return true;
    }

    @Override
    public boolean rollbackDeduct(BusinessActionContext businessActionContext) {
        String productId = (String) businessActionContext.getActionContext("productId");
        Integer count = (Integer) businessActionContext.getActionContext("count");

        LOGGER.info("库存服务 rollback 阶段, productId: {}, count: {}", productId, count);

        // 1. 释放冻结库存
        stockDao.releaseFreezeStock(productId, count);

        return true;
    }
}
  • tryDeduct 方法:首先检查库存是否足够,然后冻结库存,并将 productId 和 count 保存到 BusinessActionContext 中,供后续 Confirm 和 Cancel 阶段使用。
  • commitDeduct 方法:扣减实际库存,清除冻结库存。
  • rollbackDeduct 方法:释放冻结库存。

3. 在订单服务中调用 TCC 接口

接下来,我们需要在订单服务中调用库存服务的 TCC 接口:

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private StockTccService stockTccService;

    @GlobalTransactional(timeoutMills = 300000, name = "createOrder")
    @Override
    public void createOrder(String productId, int count) {
        BusinessActionContext businessActionContext = new BusinessActionContext();
        try {
            // 1. 扣减库存 (Try 阶段)
            boolean tryResult = stockTccService.tryDeduct(businessActionContext, productId, count);
            if (!tryResult) {
                throw new RuntimeException("扣减库存失败 (Try 阶段)");
            }

            // 2. 创建订单
            orderDao.createOrder(productId, count);

        } catch (Exception e) {
            // 事务回滚
            throw new RuntimeException("创建订单失败", e);
        }
    }
}
  • @GlobalTransactional 注解用于标识全局事务,name 属性是全局事务的名称。
  • createOrder 方法中,首先调用库存服务的 tryDeduct 方法,如果 Try 阶段失败,则抛出异常,触发全局事务回滚。如果 Try 阶段成功,则创建订单。

4. 数据表设计

为了实现 TCC 模式,我们需要在数据库中添加一些额外的字段:

库存表 (stock):

字段名 类型 说明
product_id VARCHAR 产品ID
stock INT 实际库存
freeze_stock INT 冻结库存

订单表 (order):

字段名 类型 说明
order_id VARCHAR 订单ID
product_id VARCHAR 产品ID
count INT 数量

5. Dao 层实现(以 MyBatis 为例)

// StockDao.java
public interface StockDao {
    int getStock(String productId);
    void freezeStock(String productId, int count);
    void deductStock(String productId, int count);
    void clearFreezeStock(String productId, int count);
    void releaseFreezeStock(String productId, int count);
}

// StockDao.xml
<mapper namespace="com.example.stock.dao.StockDao">
    <select id="getStock" resultType="int">
        SELECT stock FROM stock WHERE product_id = #{productId}
    </select>

    <update id="freezeStock">
        UPDATE stock SET stock = stock - #{count}, freeze_stock = freeze_stock + #{count}
        WHERE product_id = #{productId} AND stock >= #{count}
    </update>

    <update id="deductStock">
        UPDATE stock SET freeze_stock = freeze_stock - #{count}
        WHERE product_id = #{productId}
    </update>

    <update id="clearFreezeStock">
         UPDATE stock SET freeze_stock = freeze_stock - #{count}
        WHERE product_id = #{productId}
    </update>

    <update id="releaseFreezeStock">
        UPDATE stock SET stock = stock + #{count}, freeze_stock = freeze_stock - #{count}
        WHERE product_id = #{productId}
    </update>
</mapper>

//OrderDao.java
public interface OrderDao {
    void createOrder(String productId, int count);
}

//OrderDao.xml
<mapper namespace="com.example.order.dao.OrderDao">
    <insert id="createOrder">
        INSERT INTO `order` (product_id, count) VALUES (#{productId}, #{count})
    </insert>
</mapper>

6. Seata 配置

我们需要配置 Seata Server 和 Client。

Seata Server:

Seata Server 是全局事务协调器,负责协调各个服务的事务。你需要下载 Seata Server 的安装包,并进行配置。

Seata Client:

Seata Client 集成在各个服务中,负责与 Seata Server 通信。你需要在你的 Spring Boot 项目中引入 Seata 的依赖,并进行配置。

<!-- seata -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.6.1</version>
</dependency>

application.yml 配置:

seata:
  enabled: true
  application-id: your-application-name
  tx-service-group: default_tx_group # 事务组,所有参与同一个全局事务的服务必须使用相同的事务组
  client:
    rm:
      report-success-enable: true
  config:
    type: file
    file:
      name: seata.conf
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: public

7. 解决 TCC 的特殊问题

TCC 模式虽然灵活,但也存在一些需要特别处理的问题:

  • 空回滚: 当 Try 阶段未执行,但收到 Cancel 请求时,需要进行空回滚处理,避免资源被错误释放。
  • 幂等性: Confirm 和 Cancel 阶段需要保证幂等性,避免重复执行导致数据错误。
  • 悬挂: 当 Confirm 或 Cancel 请求先于 Try 请求到达时,需要拒绝执行,避免数据不一致。

解决空回滚:

可以在执行 Cancel 操作前,先检查 Try 阶段是否执行过。如果没有执行过,则直接返回成功。

@Override
public boolean rollbackDeduct(BusinessActionContext businessActionContext) {
    String productId = (String) businessActionContext.getActionContext("productId");
    Integer count = (Integer) businessActionContext.getActionContext("count");

    LOGGER.info("库存服务 rollback 阶段, productId: {}, count: {}", productId, count);

    // 1.  检查 Try 阶段是否执行过
    if (!stockDao.isFreezeStockExist(productId, count)) {
        LOGGER.warn("空回滚: productId: {}, count: {}", productId, count);
        return true; // 空回滚,直接返回成功
    }

    // 2. 释放冻结库存
    stockDao.releaseFreezeStock(productId, count);

    return true;
}

需要新增isFreezeStockExist方法在StockDao中及其对应的XML映射。

解决幂等性:

可以通过状态机或唯一标识来保证 Confirm 和 Cancel 阶段的幂等性。例如,可以在数据库中添加一个状态字段,记录 Try、Confirm 和 Cancel 阶段的执行状态。

@Override
public boolean commitDeduct(BusinessActionContext businessActionContext) {
    String productId = (String) businessActionContext.getActionContext("productId");
    Integer count = (Integer) businessActionContext.getActionContext("count");

    LOGGER.info("库存服务 commit 阶段, productId: {}, count: {}", productId, count);

    // 1.  检查是否已经执行过 Commit
    if (stockDao.isCommit(productId, count)) {
        LOGGER.warn("幂等性控制:commit 已执行, productId: {}, count: {}", productId, count);
        return true; // 已经执行过,直接返回成功
    }

    // 2. 扣减实际库存
    stockDao.deductStock(productId, count);

    // 3. 清除冻结库存
    stockDao.clearFreezeStock(productId, count);

     //4. 标记为已Commit
     stockDao.markCommit(productId,count);

    return true;
}

需要新增isCommitmarkCommit方法在StockDao中及其对应的XML映射,并且在数据库的stock表增加一个commit状态字段。

解决悬挂:

可以在 Try 阶段创建一个事务记录,用于标记 Try 阶段已经执行过。在 Confirm 和 Cancel 阶段,先检查事务记录是否存在,如果不存在,则拒绝执行。

总结:

Seata TCC 模式提供了一种灵活的分布式事务解决方案,通过 Try、Confirm、Cancel 三个阶段来保证数据的一致性。 虽然开发复杂度较高,但可以有效解决分布式场景下的数据一致性问题。在实际应用中,需要充分考虑空回滚、幂等性和悬挂等问题,并采取相应的措施来保证事务的正确执行。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注