JAVA 分布式事务一致性难题:使用 Seata 实现 AT 模式全局回滚
大家好,今天我们来聊聊在分布式系统中,如何解决事务一致性这个老大难问题。尤其是针对 Seata 的 AT (Automatic Transaction) 模式,我们会深入探讨其原理,并通过代码示例演示如何实现全局回滚。
分布式事务的挑战
在单体应用时代,事务管理相对简单,依赖数据库的 ACID 特性即可保证数据的一致性。然而,随着微服务架构的流行,一个业务流程往往需要跨越多个服务,每个服务可能使用不同的数据库,甚至不同的数据存储技术。这时,传统的本地事务已经无法保证全局数据的一致性,分布式事务问题应运而生。
为什么分布式事务这么难?主要体现在以下几个方面:
- 网络延迟: 服务之间的调用需要通过网络,网络延迟是不可避免的,这会影响事务的执行时间和成功率。
- 服务不可用: 任何一个服务都有可能出现故障,导致整个事务无法完成。
- 数据一致性: 如何保证在分布式环境下,所有服务的数据要么全部成功,要么全部回滚,是最大的挑战。
常见的分布式事务解决方案
解决分布式事务的方案有很多,各有优缺点。常见的方案包括:
- 2PC (Two-Phase Commit): 经典的分布式事务协议,通过 Prepare 和 Commit 两个阶段来保证事务的一致性。缺点是性能较差,存在单点故障的风险。
- 3PC (Three-Phase Commit): 对 2PC 的改进,试图解决 2PC 的阻塞问题,但仍然存在性能瓶颈。
- TCC (Try-Confirm-Cancel): 业务侵入性强,需要为每个业务操作编写 Try、Confirm 和 Cancel 三个阶段的逻辑。
- Seata AT 模式: 一种基于 Undo Log 的补偿机制,对业务的侵入性较小,性能较好。
- 消息队列事务: 通过消息队列来实现最终一致性,适用于对数据一致性要求不高的场景。
各种分布式事务方案的比较:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 2PC | 理论上保证强一致性 | 性能差,存在单点故障风险,阻塞问题 | 对一致性要求极高的场景,但实际应用较少 |
| 3PC | 试图解决 2PC 的阻塞问题 | 性能仍然较差,复杂度高 | 与 2PC 类似,但应用更少 |
| TCC | 业务可控性高,可以针对特定业务场景进行优化 | 业务侵入性强,需要编写大量的补偿逻辑,开发成本高 | 对业务逻辑有较强控制的场景,例如涉及到资金操作 |
| Seata AT | 对业务侵入性较小,性能较好,支持多种数据库,社区活跃 | 存在脏写问题,需要依赖全局事务协调器 (TC) | 大部分场景,尤其是对性能有一定要求的场景 |
| 消息队列事务 | 实现最终一致性,解耦性好,吞吐量高 | 一致性级别较低,存在数据延迟 | 对一致性要求不高的场景,例如异步通知、日志处理 |
Seata AT 模式详解
今天我们重点关注 Seata 的 AT 模式。AT 模式是一种基于 Undo Log 的补偿机制,其核心思想是:
- 业务 SQL 执行前: 记录原始数据快照 (Undo Log)。
- 业务 SQL 执行: 直接更新业务数据。
- 提交阶段: 提交分支事务,释放本地锁。
- 回滚阶段: 如果全局事务需要回滚,则根据 Undo Log 恢复数据。
AT 模式的优点在于对业务代码的侵入性较小,只需要引入 Seata 的 SDK 即可。但它也存在一些缺点,例如:
- 脏写: 在全局提交之前,其他事务可能会读取到未提交的数据。
- 全局事务协调器 (TC) 依赖: 需要依赖 Seata 的 TC 来协调全局事务。
AT 模式的工作流程
AT 模式的详细工作流程如下:
- Begin: 全局事务发起者向 TC 申请全局事务 ID (XID)。
- Branch Register: 每个参与者向 TC 注册分支事务,并携带 XID。
- SQL Execute: 执行业务 SQL,同时生成 Undo Log。
- Branch Report: 参与者向 TC 报告分支事务的状态 (成功或失败)。
- Global Commit/Rollback: TC 根据分支事务的状态决定全局事务是提交还是回滚。
- Branch Commit/Rollback: TC 通知参与者提交或回滚分支事务。
- Async Commit/Rollback: 参与者异步提交或回滚分支事务。
Undo Log 的作用
Undo Log 是 AT 模式的核心,它记录了业务 SQL 执行前的数据快照。Undo Log 包含了以下信息:
- XID: 全局事务 ID。
- Branch ID: 分支事务 ID。
- Resource ID: 数据库连接信息。
- SQL Type: SQL 语句类型 (INSERT、UPDATE、DELETE)。
- Before Image: SQL 执行前的数据快照。
- After Image: SQL 执行后的数据快照。
在回滚阶段,Seata 会根据 Undo Log 中的信息,将数据恢复到 SQL 执行前的状态。
使用 Seata 实现 AT 模式全局回滚的代码示例
接下来,我们通过一个简单的例子来演示如何使用 Seata 实现 AT 模式的全局回滚。
场景:
假设我们有两个服务:
- Order Service: 负责创建订单。
- Storage Service: 负责扣减库存。
当用户下单时,Order Service 会创建一个订单,然后调用 Storage Service 扣减库存。如果扣减库存失败,则需要回滚订单的创建。
1. 环境准备:
- 安装 Seata Server。
- 创建 Order Service 和 Storage Service 两个微服务。
- 配置数据库连接池。
- 引入 Seata 的依赖。
2. 引入 Seata 依赖 (以 Maven 为例):
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
3. 配置 Seata:
在 application.yml 或 application.properties 中配置 Seata:
seata:
enabled: true
application-id: order-service # 应用名称
tx-service-group: default_tx_group # 事务组名称
registry:
type: nacos # 注册中心类型
nacos:
server-addr: 127.0.0.1:8848 # Nacos 地址
namespace: public # Nacos 命名空间
config:
type: nacos # 配置中心类型
nacos:
server-addr: 127.0.0.1:8848 # Nacos 地址
namespace: public # Nacos 命名空间
4. 定义实体类:
- Order.java (Order Service):
import lombok.Data;
@Data
public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer count;
private Double money;
private Integer status; // 0: 创建中, 1: 已完成
}
- Storage.java (Storage Service):
import lombok.Data;
@Data
public class Storage {
private Long id;
private Long productId;
private Integer total;
private Integer used;
}
5. 定义 Mapper 接口:
- OrderMapper.java (Order Service):
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface OrderMapper {
int create(Order order);
}
- StorageMapper.java (Storage Service):
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface StorageMapper {
int decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
6. 定义 Service 接口和实现类:
- OrderService.java (Order Service):
public interface OrderService {
void create(Order order);
}
- OrderServiceImpl.java (Order Service):
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StorageService storageService;
@Override
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void create(Order order) {
// 1. 创建订单
orderMapper.create(order);
// 2. 扣减库存
storageService.decrease(order.getProductId(), order.getCount());
// 模拟异常,触发回滚
// if (order.getProductId() == 1) {
// throw new RuntimeException("模拟库存不足,回滚订单");
// }
}
}
- StorageService.java (Storage Service):
public interface StorageService {
void decrease(Long productId, Integer count);
}
- StorageServiceImpl.java (Storage Service):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Override
public void decrease(Long productId, Integer count) {
int result = storageMapper.decrease(productId, count);
if (result == 0) {
throw new RuntimeException("库存不足");
}
}
}
7. 编写 Mapper XML 文件:
- OrderMapper.xml (Order Service):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.orderservice.mapper.OrderMapper">
<insert id="create" parameterType="com.example.orderservice.entity.Order" useGeneratedKeys="true" keyProperty="id">
INSERT INTO t_order (user_id, product_id, count, money, status)
VALUES (#{userId}, #{productId}, #{count}, #{money}, 0)
</insert>
</mapper>
- StorageMapper.xml (Storage Service):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.storageservice.mapper.StorageMapper">
<update id="decrease">
UPDATE t_storage
SET used = used + #{count}, total = total - #{count}
WHERE product_id = #{productId} AND total >= #{count}
</update>
</mapper>
8. 创建数据库表:
- t_order (Order Service):
CREATE TABLE `t_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) DEFAULT NULL,
`product_id` bigint(20) DEFAULT NULL,
`count` int(11) DEFAULT NULL,
`money` decimal(10,2) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- t_storage (Storage Service):
CREATE TABLE `t_storage` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_id` bigint(20) DEFAULT NULL,
`total` int(11) DEFAULT NULL,
`used` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `t_storage` (`id`, `product_id`, `total`, `used`) VALUES (1, 1, 100, 0);
9. 测试:
在 Order Service 中调用 OrderService.create() 方法创建一个订单。如果 Storage Service 扣减库存失败,则 Order Service 会回滚订单的创建。
10. Seata AT 模式的关键点:
@GlobalTransactional注解: 这个注解是 Seata AT 模式的核心,它声明了一个全局事务的入口点。name属性定义了全局事务的名称,rollbackFor属性指定了需要回滚的异常类型。- 数据源代理: Seata 需要代理数据源,才能拦截 SQL 语句并生成 Undo Log。 通常情况下,
seata-spring-boot-starter会自动完成数据源的代理。 如果需要手动配置,可以使用io.seata.rm.datasource.DataSourceProxy类。 - TC Server: Seata 需要一个 TC Server 来协调全局事务。 需要保证 TC Server 正常运行,并且配置正确的注册中心和配置中心。
- Undo Log 表: Seata 会自动创建 Undo Log 表,用于存储事务的回滚信息。 Undo Log 表的名称默认为
undo_log, 可以通过配置进行修改。 - SQL 解析: Seata 会解析 SQL 语句,提取出需要记录的数据快照。目前 Seata 支持多种数据库的 SQL 解析。
进一步优化
- 性能优化: 针对高并发场景,可以考虑使用 Seata 的 Saga 模式或 TCC 模式,以提高性能。
- 异常处理: 在全局事务中,需要对各种异常情况进行处理,例如网络异常、服务不可用等。 可以使用重试机制或补偿机制来保证事务的最终一致性。
- 监控和告警: 对 Seata 的运行状态进行监控,并设置告警规则,以便及时发现和解决问题。
关于 AT 模式的一些补充说明
- 锁机制: AT 模式依赖于数据库的行锁来实现隔离性。在提交之前,Seata 会尝试获取全局锁,以防止其他事务修改相同的数据。如果获取锁失败,则会进行重试。
- 数据源隔离: 为了防止脏读,建议使用读写分离的数据源。在全局事务中,所有读操作都应该从只读库读取数据。
- 幂等性: 为了防止重复执行,需要保证业务操作的幂等性。可以使用 Token 机制或版本号机制来实现幂等性。
- AT 模式的限制: AT 模式对 SQL 语句有一定的限制。 例如,不支持跨库事务。 在使用 AT 模式时,需要仔细阅读 Seata 的官方文档,了解其限制。
AT 模式的应用场景
AT 模式适用于以下场景:
- 对数据一致性有一定要求,但允许一定的延迟。
- 业务逻辑相对简单,不需要复杂的补偿逻辑。
- 对性能有一定要求,不希望对业务代码进行过多的侵入。
总结 AT 模式的要点
AT 模式是解决分布式事务一致性问题的一种有效方案,它具有对业务侵入性小、性能较好等优点。通过本文的讲解和代码示例,相信大家对 Seata 的 AT 模式有了更深入的理解。
分布式事务的未来发展
随着云计算和微服务架构的不断发展,分布式事务问题将变得越来越重要。未来的分布式事务解决方案将更加注重性能、可扩展性和易用性。例如,基于 Service Mesh 的分布式事务方案正在成为一种新的趋势。 期待未来出现更多更好的分布式事务解决方案,帮助我们构建更加可靠和高效的分布式系统。