JAVA 分布式事务一致性难题?使用 Seata 实现 AT 模式全局回滚

JAVA 分布式事务一致性难题:使用 Seata 实现 AT 模式全局回滚

大家好,今天我们来聊聊在分布式系统中,如何解决事务一致性这个老大难问题。尤其是针对 Seata 的 AT (Automatic Transaction) 模式,我们会深入探讨其原理,并通过代码示例演示如何实现全局回滚。

分布式事务的挑战

在单体应用时代,事务管理相对简单,依赖数据库的 ACID 特性即可保证数据的一致性。然而,随着微服务架构的流行,一个业务流程往往需要跨越多个服务,每个服务可能使用不同的数据库,甚至不同的数据存储技术。这时,传统的本地事务已经无法保证全局数据的一致性,分布式事务问题应运而生。

为什么分布式事务这么难?主要体现在以下几个方面:

  • 网络延迟: 服务之间的调用需要通过网络,网络延迟是不可避免的,这会影响事务的执行时间和成功率。
  • 服务不可用: 任何一个服务都有可能出现故障,导致整个事务无法完成。
  • 数据一致性: 如何保证在分布式环境下,所有服务的数据要么全部成功,要么全部回滚,是最大的挑战。

常见的分布式事务解决方案

解决分布式事务的方案有很多,各有优缺点。常见的方案包括:

  • 2PC (Two-Phase Commit): 经典的分布式事务协议,通过 Prepare 和 Commit 两个阶段来保证事务的一致性。缺点是性能较差,存在单点故障的风险。
  • 3PC (Three-Phase Commit): 对 2PC 的改进,试图解决 2PC 的阻塞问题,但仍然存在性能瓶颈。
  • TCC (Try-Confirm-Cancel): 业务侵入性强,需要为每个业务操作编写 Try、Confirm 和 Cancel 三个阶段的逻辑。
  • Seata AT 模式: 一种基于 Undo Log 的补偿机制,对业务的侵入性较小,性能较好。
  • 消息队列事务: 通过消息队列来实现最终一致性,适用于对数据一致性要求不高的场景。

各种分布式事务方案的比较:

方案 优点 缺点 适用场景
2PC 理论上保证强一致性 性能差,存在单点故障风险,阻塞问题 对一致性要求极高的场景,但实际应用较少
3PC 试图解决 2PC 的阻塞问题 性能仍然较差,复杂度高 与 2PC 类似,但应用更少
TCC 业务可控性高,可以针对特定业务场景进行优化 业务侵入性强,需要编写大量的补偿逻辑,开发成本高 对业务逻辑有较强控制的场景,例如涉及到资金操作
Seata AT 对业务侵入性较小,性能较好,支持多种数据库,社区活跃 存在脏写问题,需要依赖全局事务协调器 (TC) 大部分场景,尤其是对性能有一定要求的场景
消息队列事务 实现最终一致性,解耦性好,吞吐量高 一致性级别较低,存在数据延迟 对一致性要求不高的场景,例如异步通知、日志处理

Seata AT 模式详解

今天我们重点关注 Seata 的 AT 模式。AT 模式是一种基于 Undo Log 的补偿机制,其核心思想是:

  1. 业务 SQL 执行前: 记录原始数据快照 (Undo Log)。
  2. 业务 SQL 执行: 直接更新业务数据。
  3. 提交阶段: 提交分支事务,释放本地锁。
  4. 回滚阶段: 如果全局事务需要回滚,则根据 Undo Log 恢复数据。

AT 模式的优点在于对业务代码的侵入性较小,只需要引入 Seata 的 SDK 即可。但它也存在一些缺点,例如:

  • 脏写: 在全局提交之前,其他事务可能会读取到未提交的数据。
  • 全局事务协调器 (TC) 依赖: 需要依赖 Seata 的 TC 来协调全局事务。

AT 模式的工作流程

AT 模式的详细工作流程如下:

  1. Begin: 全局事务发起者向 TC 申请全局事务 ID (XID)。
  2. Branch Register: 每个参与者向 TC 注册分支事务,并携带 XID。
  3. SQL Execute: 执行业务 SQL,同时生成 Undo Log。
  4. Branch Report: 参与者向 TC 报告分支事务的状态 (成功或失败)。
  5. Global Commit/Rollback: TC 根据分支事务的状态决定全局事务是提交还是回滚。
  6. Branch Commit/Rollback: TC 通知参与者提交或回滚分支事务。
  7. Async Commit/Rollback: 参与者异步提交或回滚分支事务。

Undo Log 的作用

Undo Log 是 AT 模式的核心,它记录了业务 SQL 执行前的数据快照。Undo Log 包含了以下信息:

  • XID: 全局事务 ID。
  • Branch ID: 分支事务 ID。
  • Resource ID: 数据库连接信息。
  • SQL Type: SQL 语句类型 (INSERT、UPDATE、DELETE)。
  • Before Image: SQL 执行前的数据快照。
  • After Image: SQL 执行后的数据快照。

在回滚阶段,Seata 会根据 Undo Log 中的信息,将数据恢复到 SQL 执行前的状态。

使用 Seata 实现 AT 模式全局回滚的代码示例

接下来,我们通过一个简单的例子来演示如何使用 Seata 实现 AT 模式的全局回滚。

场景:

假设我们有两个服务:

  • Order Service: 负责创建订单。
  • Storage Service: 负责扣减库存。

当用户下单时,Order Service 会创建一个订单,然后调用 Storage Service 扣减库存。如果扣减库存失败,则需要回滚订单的创建。

1. 环境准备:

  • 安装 Seata Server。
  • 创建 Order Service 和 Storage Service 两个微服务。
  • 配置数据库连接池。
  • 引入 Seata 的依赖。

2. 引入 Seata 依赖 (以 Maven 为例):

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.2</version>
</dependency>

3. 配置 Seata:

application.ymlapplication.properties 中配置 Seata:

seata:
  enabled: true
  application-id: order-service # 应用名称
  tx-service-group: default_tx_group # 事务组名称
  registry:
    type: nacos # 注册中心类型
    nacos:
      server-addr: 127.0.0.1:8848 # Nacos 地址
      namespace: public # Nacos 命名空间
  config:
    type: nacos # 配置中心类型
    nacos:
      server-addr: 127.0.0.1:8848 # Nacos 地址
      namespace: public # Nacos 命名空间

4. 定义实体类:

  • Order.java (Order Service):
import lombok.Data;

@Data
public class Order {
    private Long id;
    private Long userId;
    private Long productId;
    private Integer count;
    private Double money;
    private Integer status; // 0: 创建中, 1: 已完成
}
  • Storage.java (Storage Service):
import lombok.Data;

@Data
public class Storage {
    private Long id;
    private Long productId;
    private Integer total;
    private Integer used;
}

5. 定义 Mapper 接口:

  • OrderMapper.java (Order Service):
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface OrderMapper {
    int create(Order order);
}
  • StorageMapper.java (Storage Service):
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface StorageMapper {
    int decrease(@Param("productId") Long productId, @Param("count") Integer count);
}

6. 定义 Service 接口和实现类:

  • OrderService.java (Order Service):
public interface OrderService {
    void create(Order order);
}
  • OrderServiceImpl.java (Order Service):
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private StorageService storageService;

    @Override
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        // 1. 创建订单
        orderMapper.create(order);

        // 2. 扣减库存
        storageService.decrease(order.getProductId(), order.getCount());

        // 模拟异常,触发回滚
        // if (order.getProductId() == 1) {
        //     throw new RuntimeException("模拟库存不足,回滚订单");
        // }
    }
}
  • StorageService.java (Storage Service):
public interface StorageService {
    void decrease(Long productId, Integer count);
}
  • StorageServiceImpl.java (Storage Service):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    private StorageMapper storageMapper;

    @Override
    public void decrease(Long productId, Integer count) {
        int result = storageMapper.decrease(productId, count);
        if (result == 0) {
            throw new RuntimeException("库存不足");
        }
    }
}

7. 编写 Mapper XML 文件:

  • OrderMapper.xml (Order Service):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.orderservice.mapper.OrderMapper">
    <insert id="create" parameterType="com.example.orderservice.entity.Order" useGeneratedKeys="true" keyProperty="id">
        INSERT INTO t_order (user_id, product_id, count, money, status)
        VALUES (#{userId}, #{productId}, #{count}, #{money}, 0)
    </insert>
</mapper>
  • StorageMapper.xml (Storage Service):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.storageservice.mapper.StorageMapper">
    <update id="decrease">
        UPDATE t_storage
        SET used = used + #{count}, total = total - #{count}
        WHERE product_id = #{productId} AND total >= #{count}
    </update>
</mapper>

8. 创建数据库表:

  • t_order (Order Service):
CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(20) DEFAULT NULL,
  `product_id` bigint(20) DEFAULT NULL,
  `count` int(11) DEFAULT NULL,
  `money` decimal(10,2) DEFAULT NULL,
  `status` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • t_storage (Storage Service):
CREATE TABLE `t_storage` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(20) DEFAULT NULL,
  `total` int(11) DEFAULT NULL,
  `used` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `t_storage` (`id`, `product_id`, `total`, `used`) VALUES (1, 1, 100, 0);

9. 测试:

在 Order Service 中调用 OrderService.create() 方法创建一个订单。如果 Storage Service 扣减库存失败,则 Order Service 会回滚订单的创建。

10. Seata AT 模式的关键点:

  • @GlobalTransactional 注解: 这个注解是 Seata AT 模式的核心,它声明了一个全局事务的入口点。 name 属性定义了全局事务的名称, rollbackFor 属性指定了需要回滚的异常类型。
  • 数据源代理: Seata 需要代理数据源,才能拦截 SQL 语句并生成 Undo Log。 通常情况下, seata-spring-boot-starter 会自动完成数据源的代理。 如果需要手动配置,可以使用 io.seata.rm.datasource.DataSourceProxy 类。
  • TC Server: Seata 需要一个 TC Server 来协调全局事务。 需要保证 TC Server 正常运行,并且配置正确的注册中心和配置中心。
  • Undo Log 表: Seata 会自动创建 Undo Log 表,用于存储事务的回滚信息。 Undo Log 表的名称默认为 undo_log, 可以通过配置进行修改。
  • SQL 解析: Seata 会解析 SQL 语句,提取出需要记录的数据快照。目前 Seata 支持多种数据库的 SQL 解析。

进一步优化

  • 性能优化: 针对高并发场景,可以考虑使用 Seata 的 Saga 模式或 TCC 模式,以提高性能。
  • 异常处理: 在全局事务中,需要对各种异常情况进行处理,例如网络异常、服务不可用等。 可以使用重试机制或补偿机制来保证事务的最终一致性。
  • 监控和告警: 对 Seata 的运行状态进行监控,并设置告警规则,以便及时发现和解决问题。

关于 AT 模式的一些补充说明

  • 锁机制: AT 模式依赖于数据库的行锁来实现隔离性。在提交之前,Seata 会尝试获取全局锁,以防止其他事务修改相同的数据。如果获取锁失败,则会进行重试。
  • 数据源隔离: 为了防止脏读,建议使用读写分离的数据源。在全局事务中,所有读操作都应该从只读库读取数据。
  • 幂等性: 为了防止重复执行,需要保证业务操作的幂等性。可以使用 Token 机制或版本号机制来实现幂等性。
  • AT 模式的限制: AT 模式对 SQL 语句有一定的限制。 例如,不支持跨库事务。 在使用 AT 模式时,需要仔细阅读 Seata 的官方文档,了解其限制。

AT 模式的应用场景

AT 模式适用于以下场景:

  • 对数据一致性有一定要求,但允许一定的延迟。
  • 业务逻辑相对简单,不需要复杂的补偿逻辑。
  • 对性能有一定要求,不希望对业务代码进行过多的侵入。

总结 AT 模式的要点

AT 模式是解决分布式事务一致性问题的一种有效方案,它具有对业务侵入性小、性能较好等优点。通过本文的讲解和代码示例,相信大家对 Seata 的 AT 模式有了更深入的理解。

分布式事务的未来发展

随着云计算和微服务架构的不断发展,分布式事务问题将变得越来越重要。未来的分布式事务解决方案将更加注重性能、可扩展性和易用性。例如,基于 Service Mesh 的分布式事务方案正在成为一种新的趋势。 期待未来出现更多更好的分布式事务解决方案,帮助我们构建更加可靠和高效的分布式系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注