Seata Saga状态机编排复杂?Compensable注解与事件驱动补偿框架

好的,下面开始我的讲座。

Seata Saga状态机编排复杂?Compensable注解与事件驱动补偿框架

各位朋友,大家好!今天我们来聊聊Seata Saga模式下,状态机编排的复杂性,以及如何通过@Compensable注解和事件驱动补偿框架来简化和优化这个过程。

Saga模式与状态机编排的挑战

Saga模式是解决分布式事务的一种常见方案,它将一个大的事务拆分成多个本地事务,并通过一系列补偿操作来保证最终一致性。Seata Saga模式提供了状态机引擎,允许我们以状态图的方式来定义Saga流程。

然而,状态机编排在复杂业务场景下可能会变得异常复杂,主要面临以下挑战:

  1. 状态爆炸: 随着业务逻辑的增加,状态机的状态数量和转换关系会呈指数级增长,导致状态图难以维护和理解。
  2. 复杂依赖: 各个本地事务之间可能存在复杂的依赖关系,需要在状态机中精确地表达这些依赖,容易出错。
  3. 补偿逻辑蔓延: 每个状态都需要定义相应的补偿操作,这些补偿逻辑散落在状态机的各个状态中,难以集中管理和复用。
  4. 回滚路径复杂: 当某个本地事务失败时,需要根据状态机的当前状态选择正确的回滚路径,这在复杂状态机中可能非常困难。

Compensable注解的威力

Seata提供的@Compensable注解可以极大地简化Saga模式的开发,尤其是当我们需要在一个本地事务中执行多个操作,并且这些操作都需要进行补偿时。

@Compensable注解可以作用在方法上,表示该方法是一个可补偿的业务操作。它需要指定confirmMethodcancelMethod,分别用于确认和取消(补偿)该操作。

代码示例:

@Service
public class OrderService {

    @Autowired
    private StockService stockService;

    @Autowired
    private AccountService accountService;

    @Compensable(confirmMethod = "confirmCreateOrder", cancelMethod = "cancelCreateOrder")
    @Transactional
    public boolean createOrder(String userId, String productId, int amount) {
        // 1. 扣减库存
        boolean stockResult = stockService.decreaseStock(productId, amount);
        if (!stockResult) {
            throw new RuntimeException("扣减库存失败");
        }

        // 2. 扣减账户余额
        boolean accountResult = accountService.decreaseAccount(userId, amount * 10); // 假设单价是10
        if (!accountResult) {
            throw new RuntimeException("扣减账户余额失败");
        }

        // 3. 创建订单 (假设订单服务本身是可靠的,不需要补偿)
        createOrderInDatabase(userId, productId, amount);

        return true;
    }

    public boolean confirmCreateOrder(String userId, String productId, int amount) {
        // 确认创建订单,例如更新订单状态为已完成
        updateOrderStatusToConfirmed(userId, productId, amount);
        return true;
    }

    public boolean cancelCreateOrder(String userId, String productId, int amount) {
        // 补偿创建订单,例如:
        // 1. 增加库存
        stockService.increaseStock(productId, amount);
        // 2. 增加账户余额
        accountService.increaseAccount(userId, amount * 10);
        // 3. 删除订单
        deleteOrder(userId, productId, amount);

        return true;
    }

    private void createOrderInDatabase(String userId, String productId, int amount) {
        // 创建订单的数据库操作
        // 省略实现
    }

    private void updateOrderStatusToConfirmed(String userId, String productId, int amount) {
        // 更新订单状态的数据库操作
        // 省略实现
    }

    private void deleteOrder(String userId, String productId, int amount) {
        // 删除订单的数据库操作
        // 省略实现
    }
}

@Service
class StockService {
    public boolean decreaseStock(String productId, int amount) {
        // 扣减库存的逻辑
        System.out.println("扣减库存,productId: " + productId + ", amount: " + amount);
        return true;
    }

    public boolean increaseStock(String productId, int amount) {
        // 增加库存的逻辑 (补偿)
        System.out.println("增加库存,productId: " + productId + ", amount: " + amount);
        return true;
    }
}

@Service
class AccountService {
    public boolean decreaseAccount(String userId, int amount) {
        // 扣减账户余额的逻辑
        System.out.println("扣减账户余额,userId: " + userId + ", amount: " + amount);
        return true;
    }

    public boolean increaseAccount(String userId, int amount) {
        // 增加账户余额的逻辑 (补偿)
        System.out.println("增加账户余额,userId: " + userId + ", amount: " + amount);
        return true;
    }
}

在这个例子中,createOrder方法被@Compensable注解标记,并指定了confirmCreateOrdercancelCreateOrder作为确认和取消方法。Seata会自动管理这些方法的调用,并在需要时进行补偿。

@Compensable注解的优势:

  • 简化代码: 无需手动编写状态转换逻辑和补偿调用代码,Seata会自动处理这些细节。
  • 提高可读性: 业务逻辑更加清晰,更容易理解和维护。
  • 降低出错率: 减少了手动编写补偿逻辑的出错机会。
  • 事务上下文传递: Seata会自动传递事务上下文到confirmMethodcancelMethod,保证事务的正确执行。

事件驱动补偿框架的设计与实现

虽然@Compensable注解简化了单个服务的补偿逻辑,但在跨多个服务的Saga流程中,仍然需要一种机制来协调这些服务的补偿操作。事件驱动的补偿框架是一种有效的解决方案。

核心思想:

当某个服务需要进行补偿时,它会发布一个补偿事件,其他服务监听这些事件,并执行相应的补偿操作。

组件设计:

  • 事件总线(Event Bus): 负责事件的发布和订阅。可以使用Kafka、RabbitMQ等消息队列,也可以使用Spring的ApplicationEventPublisher
  • 事件发布器(Event Publisher): 负责将需要补偿的事件发布到事件总线上。
  • 事件监听器(Event Listener): 监听事件总线上的补偿事件,并执行相应的补偿操作。
  • 补偿事件(Compensation Event): 包含需要补偿的信息,例如订单ID、用户ID、商品ID等。
  • 事务ID存储: 存储每个本地事务的ID,用于幂等性控制。

实现步骤:

  1. 定义补偿事件: 定义需要发布的补偿事件,例如OrderCancelEventStockIncreaseEvent等。

    @Data
    public class OrderCancelEvent {
        private String orderId;
        private String userId;
        private String productId;
        private int amount;
    }
    
    @Data
    public class StockIncreaseEvent {
        private String productId;
        private int amount;
    }
    
    @Data
    public class AccountIncreaseEvent {
        private String userId;
        private int amount;
    }
  2. 发布补偿事件:cancelMethod中发布补偿事件。

    @Service
    public class OrderService {
    
        @Autowired
        private ApplicationEventPublisher eventPublisher;
    
        @Autowired
        private StockService stockService;
    
        @Autowired
        private AccountService accountService;
    
        @Compensable(confirmMethod = "confirmCreateOrder", cancelMethod = "cancelCreateOrder")
        @Transactional
        public boolean createOrder(String userId, String productId, int amount) {
            // 1. 扣减库存
            boolean stockResult = stockService.decreaseStock(productId, amount);
            if (!stockResult) {
                throw new RuntimeException("扣减库存失败");
            }
    
            // 2. 扣减账户余额
            boolean accountResult = accountService.decreaseAccount(userId, amount * 10); // 假设单价是10
            if (!accountResult) {
                throw new RuntimeException("扣减账户余额失败");
            }
    
            // 3. 创建订单 (假设订单服务本身是可靠的,不需要补偿)
            createOrderInDatabase(userId, productId, amount);
    
            return true;
        }
    
        public boolean confirmCreateOrder(String userId, String productId, int amount) {
            // 确认创建订单,例如更新订单状态为已完成
            updateOrderStatusToConfirmed(userId, productId, amount);
            return true;
        }
    
        public boolean cancelCreateOrder(String userId, String productId, int amount) {
            // 1. 发布订单取消事件
            OrderCancelEvent event = new OrderCancelEvent();
            event.setOrderId(generateOrderId(userId, productId, amount)); // 假设有生成订单ID的逻辑
            event.setUserId(userId);
            event.setProductId(productId);
            event.setAmount(amount);
            eventPublisher.publishEvent(event);
    
            // 3. 删除订单
            deleteOrder(userId, productId, amount);
    
            return true;
        }
    
        private String generateOrderId(String userId, String productId, int amount) {
            //  生成订单ID的逻辑
            return  "order-" + userId + "-" + productId + "-" + amount;
        }
    
        private void createOrderInDatabase(String userId, String productId, int amount) {
            // 创建订单的数据库操作
            // 省略实现
        }
    
        private void updateOrderStatusToConfirmed(String userId, String productId, int amount) {
            // 更新订单状态的数据库操作
            // 省略实现
        }
    
        private void deleteOrder(String userId, String productId, int amount) {
            // 删除订单的数据库操作
            // 省略实现
        }
    }
  3. 监听补偿事件: 在其他服务中监听补偿事件,并执行相应的补偿操作。

    @Service
    public class StockService {
    
        @Autowired
        private ApplicationEventPublisher eventPublisher;
    
        @EventListener
        @Transactional
        public void onOrderCancelEvent(OrderCancelEvent event) {
            // 1. 增加库存
            increaseStock(event.getProductId(), event.getAmount());
    
            // 2. 发布库存增加事件 (可选,如果其他服务需要监听库存增加事件)
            StockIncreaseEvent stockIncreaseEvent = new StockIncreaseEvent();
            stockIncreaseEvent.setProductId(event.getProductId());
            stockIncreaseEvent.setAmount(event.getAmount());
            eventPublisher.publishEvent(stockIncreaseEvent);
        }
    
        public boolean decreaseStock(String productId, int amount) {
            // 扣减库存的逻辑
            System.out.println("扣减库存,productId: " + productId + ", amount: " + amount);
            return true;
        }
    
        public boolean increaseStock(String productId, int amount) {
            // 增加库存的逻辑 (补偿)
            System.out.println("增加库存,productId: " + productId + ", amount: " + amount);
            return true;
        }
    }
    
    @Service
    public class AccountService {
    
        @Autowired
        private ApplicationEventPublisher eventPublisher;
    
        @EventListener
        @Transactional
        public void onOrderCancelEvent(OrderCancelEvent event) {
            // 1. 增加账户余额
            increaseAccount(event.getUserId(), event.getAmount() * 10); // 假设单价是10
    
            // 2. 发布账户余额增加事件 (可选,如果其他服务需要监听账户余额增加事件)
            AccountIncreaseEvent accountIncreaseEvent = new AccountIncreaseEvent();
            accountIncreaseEvent.setUserId(event.getUserId());
            accountIncreaseEvent.setAmount(event.getAmount() * 10);
            eventPublisher.publishEvent(accountIncreaseEvent);
        }
    
        public boolean decreaseAccount(String userId, int amount) {
            // 扣减账户余额的逻辑
            System.out.println("扣减账户余额,userId: " + userId + ", amount: " + amount);
            return true;
        }
    
        public boolean increaseAccount(String userId, int amount) {
            // 增加账户余额的逻辑 (补偿)
            System.out.println("增加账户余额,userId: " + userId + ", amount: " + amount);
            return true;
        }
    }
  4. 幂等性控制: 为了防止重复执行补偿操作,需要在事件监听器中进行幂等性控制。可以使用数据库表来记录已经处理过的事件ID。

    @Service
    public class StockService {
    
        @Autowired
        private ApplicationEventPublisher eventPublisher;
    
        @Autowired
        private CompensateRecordService compensateRecordService;
    
        @EventListener
        @Transactional
        public void onOrderCancelEvent(OrderCancelEvent event) {
    
            // 1. 幂等性判断
            if (compensateRecordService.isCompensated(event.getOrderId(), "stock_increase")) {
                System.out.println("库存已经增加过,忽略该事件");
                return;
            }
    
            // 2. 增加库存
            increaseStock(event.getProductId(), event.getAmount());
    
            // 3. 记录补偿信息
            compensateRecordService.recordCompensated(event.getOrderId(), "stock_increase");
    
            // 4. 发布库存增加事件 (可选,如果其他服务需要监听库存增加事件)
            StockIncreaseEvent stockIncreaseEvent = new StockIncreaseEvent();
            stockIncreaseEvent.setProductId(event.getProductId());
            stockIncreaseEvent.setAmount(event.getAmount());
            eventPublisher.publishEvent(stockIncreaseEvent);
        }
    
        public boolean decreaseStock(String productId, int amount) {
            // 扣减库存的逻辑
            System.out.println("扣减库存,productId: " + productId + ", amount: " + amount);
            return true;
        }
    
        public boolean increaseStock(String productId, int amount) {
            // 增加库存的逻辑 (补偿)
            System.out.println("增加库存,productId: " + productId + ", amount: " + amount);
            return true;
        }
    }
    
    @Service
    class CompensateRecordService {
    
        // 假设CompensateRecord是一个JPA entity
        @Autowired
        private CompensateRecordRepository compensateRecordRepository;
    
        public boolean isCompensated(String orderId, String compensateType) {
            return compensateRecordRepository.existsByOrderIdAndCompensateType(orderId, compensateType);
        }
    
        public void recordCompensated(String orderId, String compensateType) {
            CompensateRecord record = new CompensateRecord();
            record.setOrderId(orderId);
            record.setCompensateType(compensateType);
            record.setCompensateTime(new Date());
            compensateRecordRepository.save(record);
        }
    }

事件驱动补偿框架的优势:

  • 解耦: 服务之间通过事件进行通信,减少了服务之间的依赖。
  • 可扩展: 可以方便地添加新的服务和补偿逻辑。
  • 异步: 补偿操作是异步执行的,提高了系统的吞吐量。
  • 可观察性: 可以通过监控事件总线来观察Saga流程的执行情况。

Seata状态机与@Compensable和事件驱动的结合

可以将Seata状态机与@Compensable注解和事件驱动补偿框架结合起来,以实现更灵活和可控的Saga流程。

  • 状态机负责流程编排: 使用状态机定义Saga流程的各个状态和转换关系。
  • @Compensable注解简化本地事务: 使用@Compensable注解标记可补偿的业务操作。
  • 事件驱动补偿框架处理跨服务补偿: 使用事件驱动补偿框架来协调跨多个服务的补偿操作。

状态机示例:

<?xml version="1.0" encoding="UTF-8"?>
<statemachine xmlns="http://seata.io/schema/statemachine"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://seata.io/schema/statemachine http://seata.io/schema/statemachine/statemachine.xsd"
              id="createOrderSaga" start="startCreateOrder">

    <state id="startCreateOrder" name="开始创建订单" type="serviceTask" serviceName="orderService" serviceMethod="createOrder" next="decreaseStock">
        <variables>
            <variable name="userId" type="string" scope="in" value="user123"/>
            <variable name="productId" type="string" scope="in" value="product456"/>
            <variable name="amount" type="int" scope="in" value="1"/>
        </variables>
    </state>

    <state id="decreaseStock" name="扣减库存" type="serviceTask" serviceName="stockService" serviceMethod="decreaseStock" next="decreaseAccount" compensateStateId="compensateStock">
        <variables>
            <variable name="productId" type="string" scope="in" value="product456"/>
            <variable name="amount" type="int" scope="in" value="1"/>
        </variables>
    </state>

    <state id="decreaseAccount" name="扣减账户余额" type="serviceTask" serviceName="accountService" serviceMethod="decreaseAccount" next="endCreateOrder" compensateStateId="compensateAccount">
        <variables>
            <variable name="userId" type="string" scope="in" value="user123"/>
            <variable name="amount" type="int" scope="in" value="10"/>
        </variables>
    </state>

    <state id="endCreateOrder" name="订单创建完成" type="end"/>

    <state id="compensateStock" name="补偿库存" type="compensateTask" serviceName="stockService" serviceMethod="increaseStock" next="compensateAccount">
        <variables>
            <variable name="productId" type="string" scope="in" value="product456"/>
            <variable name="amount" type="int" scope="in" value="1"/>
        </variables>
    </state>

    <state id="compensateAccount" name="补偿账户余额" type="compensateTask" serviceName="accountService" serviceMethod="increaseAccount" next="compensateEnd">
        <variables>
            <variable name="userId" type="string" scope="in" value="user123"/>
            <variable name="amount" type="int" scope="in" value="10"/>
        </variables>
    </state>

    <state id="compensateEnd" name="补偿完成" type="end"/>

</statemachine>

在这个状态机中,orderService.createOrder方法使用了@Compensable注解,而stockService.decreaseStockaccountService.decreaseAccount方法则直接在状态机中定义了补偿状态。这种混合使用的方式可以根据实际情况选择最合适的方案。

表格对比:

特性 @Compensable注解 事件驱动补偿框架 状态机编排
适用场景 单个服务的补偿逻辑 跨多个服务的补偿逻辑 复杂流程的编排
优点 简单易用,减少代码 解耦,可扩展,异步 可视化,流程清晰
缺点 耦合性高 实现复杂,需要消息队列 状态机复杂,维护困难

最佳实践与注意事项

  1. 合理选择补偿策略: 并非所有操作都需要补偿,需要根据业务场景选择合适的补偿策略。
  2. 保证补偿操作的幂等性: 补偿操作必须是幂等的,以防止重复执行。
  3. 监控Saga流程: 需要监控Saga流程的执行情况,及时发现和处理异常。
  4. 考虑事务隔离性: Saga模式是一种最终一致性方案,需要考虑事务隔离性问题。
  5. 明确补偿范围: 补偿操作的范围应该明确,避免过度补偿或补偿不足。
  6. 数据一致性校验: 在补偿完成后,需要进行数据一致性校验,确保数据最终一致。

总结一下

今天我们讨论了Seata Saga模式下状态机编排的挑战,并介绍了如何通过@Compensable注解和事件驱动补偿框架来简化和优化这个过程。@Compensable注解简化了单个服务的补偿逻辑,而事件驱动补偿框架则协调了跨多个服务的补偿操作。将Seata状态机与@Compensable注解和事件驱动补偿框架结合起来,可以实现更灵活和可控的Saga流程。希望今天的分享能对大家有所帮助。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注