分布式事务解决方案:Seata TCC/AT 模式在 Java 微服务中的落地实践
各位听众,大家好!今天我们来聊聊分布式事务这个老生常谈但又至关重要的话题,特别是如何在 Java 微服务架构中利用 Seata 的 TCC 和 AT 模式来解决数据一致性问题。
1. 分布式事务的必要性与挑战
在单体应用时代,我们可以依赖本地事务来保证数据的一致性。但随着微服务架构的兴起,一个业务流程可能涉及多个独立的服务,每个服务拥有自己的数据库,本地事务无法跨越多个数据库实例,这就带来了分布式事务问题。
为什么需要分布式事务?
假设一个电商场景,用户下单需要扣减库存、生成订单、扣减用户余额。这三个操作分别在库存服务、订单服务、账户服务中进行。如果其中任何一个服务失败,都会导致数据不一致,比如用户下单了,但库存没扣减,或者扣减了库存但订单没生成。
分布式事务面临的挑战:
- CAP 理论: 一致性(Consistency)、可用性(Availability)、分区容错性(Partition Tolerance)三者不可兼得。分布式系统中,分区容错性是必须的,因此需要在一致性和可用性之间做出权衡。
- 网络延迟和故障: 微服务之间的通信依赖网络,网络延迟、超时、丢包等问题会影响事务的执行结果。
- 数据一致性保证: 需要确保多个服务的数据要么全部成功,要么全部失败,维护数据的一致性状态。
2. Seata 简介
Seata (Simple Extensible Autonomous Transaction Architecture) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。它提供了多种事务模式,包括 AT (Automatic Transaction)、TCC (Try-Confirm-Cancel)、SAGA 和 XA 模式。
Seata 的核心组件:
- TC (Transaction Coordinator): 事务协调器,负责全局事务的注册、提交、回滚等操作。
- TM (Transaction Manager): 事务管理器,负责开启、提交或回滚全局事务。
- RM (Resource Manager): 资源管理器,负责管理分支事务,与 TC 通信,报告分支事务的状态。
3. Seata AT 模式详解与实践
AT 模式是一种无侵入的分布式事务解决方案。它基于数据库的 undo log 实现,通过代理数据源,拦截 SQL 操作,记录修改前后的数据,在事务提交时删除 undo log,在事务回滚时利用 undo log 恢复数据。
AT 模式的原理:
- 一阶段:
- RM 代理数据源,拦截 SQL 操作。
- 执行业务 SQL,同时记录 undo log。
- 提交本地事务。
- 二阶段(提交):
- TC 通知所有 RM 提交分支事务。
- RM 删除 undo log。
- 二阶段(回滚):
- TC 通知所有 RM 回滚分支事务。
- RM 根据 undo log 恢复数据。
AT 模式的优点:
- 无侵入:对业务代码几乎没有侵入,只需要配置数据源代理即可。
- 高性能:基于数据库的 undo log 实现,性能较好。
- 简单易用:配置简单,容易上手。
AT 模式的缺点:
- 不支持强隔离性:AT 模式默认使用读未提交隔离级别,可能存在脏读问题。可以通过全局锁解决,但会牺牲性能。
- 依赖数据库:必须使用支持 undo log 的数据库。
AT 模式的实践:
3.1 环境准备
- Seata Server: 下载并启动 Seata Server。可以从 Seata 的官方网站 (https://seata.io/zh-cn/docs/user/quickstart.html) 下载最新版本。
- 数据库: 准备三个数据库,分别用于库存服务、订单服务、账户服务。
- Java 项目: 创建三个 Java Spring Boot 项目,分别对应库存服务、订单服务、账户服务。
3.2 项目依赖
在每个项目的 pom.xml
文件中添加 Seata 的依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.7.1</version> <!-- 使用最新版本 -->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.16</version> <!-- 使用最新版本 -->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version> <!-- 使用最新版本 -->
</dependency>
3.3 配置 Seata
在每个项目的 application.yml
文件中配置 Seata:
spring:
application:
name: inventory-service # 替换为实际的服务名称
seata:
enabled: true
tx-service-group: default_tx_group # 事务分组名称,所有参与全局事务的服务必须使用相同的分组名称
service:
vgroup-mapping:
default_tx_group: default # 默认的 VGroup
client:
rm:
report-success-enable: true # 成功后是否汇报,默认开启
3.4 配置数据源
使用 Seata 提供的 DataSourceProxy
代理数据源。
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Value("${spring.datasource.url}")
private String dbUrl;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
@Bean
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dbUrl);
druidDataSource.setUsername(username);
druidDataSource.setPassword(password);
druidDataSource.setDriverClassName(driverClassName);
return druidDataSource;
}
@Primary
@Bean("dataSourceProxy")
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public DataSource dataSource(DataSourceProxy dataSourceProxy) {
return dataSourceProxy;
}
}
注意: @Primary
注解用于指定首选的数据源。
3.5 编写业务代码
在需要参与分布式事务的方法上添加 @GlobalTransactional
注解。
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService;
@Autowired
private AccountService accountService;
@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(String userId, String productId, int amount) {
// 1. 扣减库存
inventoryService.decreaseInventory(productId, amount);
// 2. 创建订单
createOrderInternal(userId, productId, amount);
// 3. 扣减用户余额
accountService.decreaseAccount(userId, amount * 10); // 假设商品单价为 10
}
@Transactional
public void createOrderInternal(String userId, String productId, int amount) {
// 创建订单的逻辑
// ...
}
}
注意:
@GlobalTransactional
注解用于开启全局事务。rollbackFor = Exception.class
表示当发生任何异常时,都进行回滚。createOrderInternal
方法使用@Transactional
注解,表示它是一个本地事务。- 需要确保
inventoryService
和accountService
的方法也配置了 Seata 的数据源代理。
3.6 库存服务和账户服务的简单实现
库存服务 (InventoryService):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class InventoryService {
@Autowired
private InventoryDao inventoryDao;
@Transactional
public void decreaseInventory(String productId, int amount) {
// 扣减库存的逻辑
Inventory inventory = inventoryDao.findByProductId(productId);
if (inventory == null || inventory.getStock() < amount) {
throw new RuntimeException("库存不足");
}
inventory.setStock(inventory.getStock() - amount);
inventoryDao.save(inventory);
}
}
账户服务 (AccountService):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class AccountService {
@Autowired
private AccountDao accountDao;
@Transactional
public void decreaseAccount(String userId, int amount) {
// 扣减账户余额的逻辑
Account account = accountDao.findByUserId(userId);
if (account == null || account.getBalance() < amount) {
throw new RuntimeException("余额不足");
}
account.setBalance(account.getBalance() - amount);
accountDao.save(account);
}
}
3.7 测试
启动 Seata Server,库存服务、订单服务、账户服务。调用 OrderService.createOrder
方法,观察数据库中的数据变化。可以通过抛出异常来模拟事务回滚,验证 Seata 的事务一致性。
4. Seata TCC 模式详解与实践
TCC (Try-Confirm-Cancel) 模式是一种业务侵入性较强的分布式事务解决方案。它将一个业务流程拆分为三个阶段:
- Try: 尝试阶段,尝试执行业务,预留资源。
- Confirm: 确认阶段,确认执行业务,真正提交资源。
- Cancel: 取消阶段,取消执行业务,释放预留资源。
TCC 模式的原理:
- Try 阶段:
- RM 执行 Try 方法,预留资源。
- 如果 Try 成功,则事务状态为可提交。
- 如果 Try 失败,则事务状态为需要回滚。
- Confirm 阶段:
- TC 通知所有 RM 执行 Confirm 方法。
- RM 执行 Confirm 方法,真正提交资源。
- Confirm 方法必须保证幂等性。
- Cancel 阶段:
- TC 通知所有 RM 执行 Cancel 方法。
- RM 执行 Cancel 方法,释放预留资源。
- Cancel 方法必须保证幂等性。
TCC 模式的优点:
- 高性能:Try 阶段可以预留资源,减少锁冲突。
- 强一致性:可以保证数据的强一致性。
- 支持异构系统:可以跨越不同的数据库和系统。
TCC 模式的缺点:
- 业务侵入性强:需要修改业务代码,实现 Try、Confirm、Cancel 三个方法。
- 开发难度大:需要考虑各种异常情况,保证 Confirm 和 Cancel 方法的幂等性。
TCC 模式的实践:
4.1 定义 TCC 接口
定义 TCC 接口,包含 Try、Confirm、Cancel 三个方法。
import io.seata.rm.tcc.TwoPhaseBusinessAction;
public interface InventoryTccAction {
@TwoPhaseBusinessAction(name = "inventoryTccAction", commitMethod = "commitDecreaseInventory", rollbackMethod = "rollbackDecreaseInventory")
boolean prepareDecreaseInventory(String xid, String productId, int amount);
boolean commitDecreaseInventory(String xid, String productId, int amount);
boolean rollbackDecreaseInventory(String xid, String productId, int amount);
}
注意:
@TwoPhaseBusinessAction
注解用于标记 TCC 接口。name
属性用于指定 TCC 服务的名称。commitMethod
属性用于指定 Confirm 方法的名称。rollbackMethod
属性用于指定 Cancel 方法的名称。
4.2 实现 TCC 接口
实现 TCC 接口,编写 Try、Confirm、Cancel 三个方法的逻辑。
import io.seata.core.context.RootContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component("inventoryTccAction")
public class InventoryTccActionImpl implements InventoryTccAction {
@Autowired
private InventoryDao inventoryDao;
@Override
@Transactional
public boolean prepareDecreaseInventory(String xid, String productId, int amount) {
// 1. 检查库存是否足够
Inventory inventory = inventoryDao.findByProductId(productId);
if (inventory == null || inventory.getStock() < amount) {
return false; // 库存不足
}
// 2. 冻结库存
inventory.setFrozenStock(inventory.getFrozenStock() + amount);
inventory.setStock(inventory.getStock() - amount);
inventoryDao.save(inventory);
return true;
}
@Override
@Transactional
public boolean commitDecreaseInventory(String xid, String productId, int amount) {
// 1. 释放冻结库存
Inventory inventory = inventoryDao.findByProductId(productId);
if (inventory == null) {
return true; // 已经回滚过了,直接返回成功
}
inventory.setFrozenStock(inventory.getFrozenStock() - amount);
inventoryDao.save(inventory);
return true;
}
@Override
@Transactional
public boolean rollbackDecreaseInventory(String xid, String productId, int amount) {
// 1. 释放冻结库存,恢复库存
Inventory inventory = inventoryDao.findByProductId(productId);
if (inventory == null) {
return true; // 已经提交过了,直接返回成功
}
inventory.setStock(inventory.getStock() + amount);
inventory.setFrozenStock(inventory.getFrozenStock() - amount);
inventoryDao.save(inventory);
return true;
}
}
注意:
- Try 方法需要预留资源,例如冻结库存。
- Confirm 方法需要真正提交资源,例如释放冻结库存。
- Cancel 方法需要释放预留资源,例如恢复库存。
- Confirm 和 Cancel 方法必须保证幂等性。
4.3 修改业务代码
修改业务代码,调用 TCC 接口。
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private InventoryTccAction inventoryTccAction;
@Autowired
private AccountService accountService;
@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(String userId, String productId, int amount) {
// 1. 尝试扣减库存
boolean prepareResult = inventoryTccAction.prepareDecreaseInventory(RootContext.getXID(), productId, amount);
if (!prepareResult) {
throw new RuntimeException("库存不足");
}
// 2. 创建订单
createOrderInternal(userId, productId, amount);
// 3. 扣减用户余额
accountService.decreaseAccount(userId, amount * 10); // 假设商品单价为 10
}
// ...
}
注意:
- 使用
RootContext.getXID()
获取全局事务 ID。 - 在 Try 阶段失败时,需要抛出异常,触发事务回滚。
4.4 配置 TCC Bean
在 Spring 配置文件中配置 TCC Bean。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TccConfig {
@Bean
public InventoryTccAction inventoryTccAction() {
return new InventoryTccActionImpl();
}
}
4.5 测试
启动 Seata Server,库存服务、订单服务、账户服务。调用 OrderService.createOrder
方法,观察数据库中的数据变化。可以通过抛出异常来模拟事务回滚,验证 Seata 的事务一致性。
5. AT 和 TCC 模式的选择
AT 和 TCC 模式各有优缺点,选择哪种模式取决于具体的业务场景。
特性 | AT 模式 | TCC 模式 |
---|---|---|
侵入性 | 低,几乎无侵入 | 高,需要修改业务代码 |
性能 | 较高,基于 undo log | 较高,Try 阶段可以预留资源 |
一致性 | 最终一致性(默认读未提交,可配置全局锁) | 强一致性 |
适用场景 | 对一致性要求不高,业务逻辑简单的场景 | 对一致性要求高,业务逻辑复杂的场景,需要预留资源 |
开发难度 | 简单 | 复杂,需要考虑幂等性问题 |
选择建议:
- 如果业务逻辑简单,对一致性要求不高,可以选择 AT 模式。
- 如果业务逻辑复杂,对一致性要求高,需要预留资源,可以选择 TCC 模式。
- 可以根据实际情况,混合使用 AT 和 TCC 模式。
6. 其他注意事项
- 全局事务 ID: Seata 使用全局事务 ID (XID) 来标识一个全局事务。在微服务之间传递 XID,保证事务的上下文一致。
- 幂等性: Confirm 和 Cancel 方法必须保证幂等性,防止重复执行。
- 空回滚: 如果 Try 方法没有执行,Cancel 方法被执行,需要处理空回滚的情况。
- 悬挂: 如果 Confirm 或 Cancel 方法先于 Try 方法执行,需要处理悬挂的情况。
7. 总结与展望
Seata 提供了强大的分布式事务解决方案,能够帮助我们在 Java 微服务架构中解决数据一致性问题。AT 模式简单易用,适合业务逻辑简单的场景;TCC 模式能够提供强一致性,适合业务逻辑复杂的场景。在实际应用中,我们需要根据具体的业务场景选择合适的事务模式。未来,随着微服务架构的不断发展,分布式事务技术也将不断进步,为我们提供更加可靠和高效的解决方案。