基于 Seata 实现 Spring Boot 应用的分布式事务

Seata:拯救你 Spring Boot 应用的分布式事务,告别数据不一致的烦恼

大家好,我是你们的老朋友,一个在代码堆里摸爬滚打多年的老码农。今天,咱们来聊聊一个让无数程序员夜不能寐的话题:分布式事务

想象一下这样的场景:你正在开发一个电商平台,用户下单需要同时扣减商品库存、生成订单、扣除账户余额。这些操作分布在不同的微服务中,要是其中任何一个环节失败,你的数据就会出现不一致,用户付了钱没收到货,或者库存扣了钱没扣,这可就摊上大事儿了!

传统的 ACID 事务在单体应用中游刃有余,但在分布式系统中就显得力不从心了。为了解决这个问题,各种分布式事务解决方案应运而生,今天我们要聊的就是其中的佼佼者:Seata

什么是 Seata?

Seata (Simple Extensible Autonomous Transaction Architecture) 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。简单来说,Seata 就像一个事务协调员,负责协调各个微服务之间的事务,保证最终的数据一致性。

Seata 提供多种事务模式,包括:

  • AT 模式 (Automatic Transaction): 自动事务模式,对业务代码侵入性最小,也是 Seata 推荐的模式。
  • TCC 模式 (Try-Confirm-Cancel): 补偿事务模式,需要业务代码实现 Try、Confirm 和 Cancel 三个阶段的操作。
  • SAGA 模式 (Long-Running Action): 长事务模式,适用于业务流程较长,无需强一致性的场景。
  • XA 模式 (eXtended Architecture): 基于 XA 协议的事务模式,需要数据库支持 XA 协议。

今天,我们主要聚焦于 AT 模式,因为它最简单易用,也最适合大多数 Spring Boot 应用。

Seata AT 模式的工作原理

AT 模式的核心思想是两阶段提交协议 (Two-Phase Commit, 2PC) 的改进版。它将事务分为两个阶段:

  1. 一阶段 (Prepare): 各个参与者执行本地事务,并记录 undo log (用于回滚的数据),然后向 TC (Transaction Coordinator,事务协调器) 注册分支事务。
  2. 二阶段 (Commit/Rollback): TC 根据全局事务的结果,通知各个参与者提交或回滚本地事务。

AT 模式的巧妙之处在于,它将 2PC 的阻塞阶段进行了优化,通过 undo log 实现本地事务的回滚,从而避免了长时间的资源锁定。

具体流程如下:

  1. 业务服务: 发起全局事务请求,Seata 拦截请求,为本次事务生成一个全局唯一的 XID (Global Transaction ID)。
  2. 数据源代理: Seata 通过数据源代理,拦截 SQL 操作,分析 SQL 中涉及的数据,生成 undo log,并将其保存在 undo_log 表中。
  3. TC: TC 负责全局事务的协调,记录全局事务的状态,并根据全局事务的结果,通知各个参与者提交或回滚本地事务。
  4. 二阶段提交:
    • 提交: 如果全局事务成功,TC 通知各个参与者提交本地事务。由于本地事务在一阶段已经提交,所以二阶段只需要删除 undo_log 表中的记录即可。
    • 回滚: 如果全局事务失败,TC 通知各个参与者回滚本地事务。Seata 根据 undo_log 表中的记录,恢复数据到原始状态。

如何在 Spring Boot 应用中使用 Seata AT 模式?

接下来,我们通过一个简单的例子,演示如何在 Spring Boot 应用中使用 Seata AT 模式。

假设我们有两个微服务:

  • 订单服务 (order-service): 负责创建订单。
  • 库存服务 (stock-service): 负责扣减商品库存。

我们要在创建订单的同时,扣减商品库存,并保证这两个操作要么都成功,要么都失败。

1. 准备工作

  • 安装 Seata Server: 下载 Seata Server 并启动,可以参考 Seata 官方文档。
  • 创建数据库: 为订单服务和库存服务创建数据库,并创建 undo_log 表。

undo_log 表的 SQL 脚本如下:

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2. 添加依赖

order-servicestock-servicepom.xml 文件中添加 Seata 的依赖:

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>最新版本</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>最新版本</version>
</dependency>

3. 配置 Seata

order-servicestock-serviceapplication.yml 文件中添加 Seata 的配置:

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: default_tx_group
  service:
    vgroup-mapping:
      default_tx_group: default
  client:
    rm:
      report-success-enable: true
  config:
    type: file
    file:
      name: seata.conf # 配置文件路径
  registry:
    type: nacos # 注册中心类型,例如 nacos、eureka、zk 等
    nacos:
      server-addr: 127.0.0.1:8848 # Nacos 地址

注意:

  • application-id 需要和 spring.application.name 保持一致。
  • tx-service-group 是事务组的名称,所有参与同一个全局事务的服务都需要使用相同的事务组。
  • registry.type 根据你使用的注册中心进行配置。
  • config.type 指定配置中心类型。

4. 数据源代理

使用 Seata 提供的数据源代理 DataSourceProxy 替换 Spring Boot 默认的数据源。

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

5. 编写业务代码

order-service 中创建一个订单:

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class OrderService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RestTemplate restTemplate;

    @GlobalTransactional(rollbackFor = Exception.class)
    public void createOrder(String userId, String productId, int amount) {
        // 1. 创建订单
        String sql = "insert into orders (user_id, product_id, amount) values (?, ?, ?)";
        jdbcTemplate.update(sql, userId, productId, amount);

        // 2. 扣减库存
        String url = "http://stock-service/stock/decrease?productId=" + productId + "&amount=" + amount;
        restTemplate.getForObject(url, String.class);

        // 模拟异常
        // if (amount > 10) {
        //     throw new RuntimeException("Amount too large");
        // }
    }
}

stock-service 中扣减库存:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class StockService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void decreaseStock(String productId, int amount) {
        String sql = "update stock set amount = amount - ? where product_id = ?";
        jdbcTemplate.update(sql, amount, productId);

        // 检查库存是否足够
        int currentAmount = jdbcTemplate.queryForObject("select amount from stock where product_id = ?", Integer.class, productId);
        if (currentAmount < 0) {
            throw new RuntimeException("Stock not enough");
        }
    }
}

注意:

  • order-servicecreateOrder 方法上添加 @GlobalTransactional 注解,表示这是一个全局事务。
  • rollbackFor = Exception.class 表示当发生任何异常时,都进行回滚。
  • stock-servicedecreaseStock 方法不需要添加任何注解,Seata 会自动将其纳入到全局事务中。

6. 启动服务并测试

启动 order-servicestock-service,然后调用 order-servicecreateOrder 方法。

如果一切顺利,你会在 orders 表中看到一条新的订单记录,并且 stock 表中对应商品的库存数量会减少。

如果你取消 order-service 中的注释,模拟一个异常,你会发现订单创建失败,并且库存也不会被扣减。

进阶用法

  • 全局锁: Seata 提供了全局锁机制,可以解决并发场景下的数据一致性问题。
  • 事务传播: Seata 支持多种事务传播行为,可以灵活控制事务的范围。
  • 自定义 undo log: 如果默认的 undo log 无法满足你的需求,你可以自定义 undo log。
  • 监控和告警: Seata 提供了丰富的监控指标和告警功能,可以帮助你及时发现和解决问题。

总结

Seata 是一款功能强大、简单易用的分布式事务解决方案,可以帮助你轻松解决微服务架构下的数据一致性问题。希望这篇文章能够帮助你入门 Seata,并在你的项目中发挥它的威力。

表格总结:

特性 描述
易用性 AT 模式对业务代码侵入性小,易于集成
性能 AT 模式采用异步提交,减少了事务的阻塞时间
可靠性 Seata 提供了完善的事务协调机制,保证了数据的一致性
兼容性 Seata 支持多种数据库和注册中心,具有良好的兼容性
监控和告警 Seata 提供了丰富的监控指标和告警功能

代码示例总结:

  • undo_log 表 SQL 脚本: 用于存储 undo log,在回滚时恢复数据。
  • pom.xml 文件: 添加 Seata 和 Druid 的依赖。
  • application.yml 文件: 配置 Seata 的各项参数。
  • DataSourceConfig.java 文件: 使用 Seata 的 DataSourceProxy 替换 Spring Boot 默认的数据源。
  • OrderService.java 文件: 创建订单,并调用库存服务扣减库存,使用 @GlobalTransactional 注解开启全局事务。
  • StockService.java 文件: 扣减库存,无需添加任何 Seata 注解。

希望这篇文章能帮助你更好地理解和使用 Seata。如果你有任何问题,欢迎在评论区留言,我们一起探讨。祝大家编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注