Canal Adapter RabbitMQ顺序性乱序?MessageConverter自定义与单队列分区键绑定

好的,我们开始。

今天我们来探讨一个在使用 RabbitMQ 时经常遇到的问题:当使用 Canal Adapter 作为数据源,并且需要保证消息顺序性的时候,可能出现的乱序问题。 同时,我们会深入研究如何通过自定义 MessageConverter 和单队列分区键绑定来解决这个问题。

一、问题背景:Canal Adapter 和顺序性需求

Canal 是阿里巴巴开源的一款 MySQL binlog 解析工具。 它可以将 MySQL 的数据变更实时同步到其他系统,例如消息队列 RabbitMQ。 在很多业务场景下,我们需要保证数据变更的顺序性,比如:

  • 库存变更: 先有入库,后有出库,如果顺序颠倒,会导致库存数据不准确。
  • 订单状态变更: 订单状态必须按照创建 -> 支付 -> 发货 -> 完成的顺序进行,乱序会导致业务流程错误。

然而,在使用 Canal Adapter 将 binlog 数据发送到 RabbitMQ 时,由于各种因素(例如网络延迟、多线程处理、ACK机制等),消息可能会出现乱序,这给业务带来很大的风险。

二、乱序问题分析

导致 Canal Adapter + RabbitMQ 乱序的常见原因包括:

  1. Canal 并行处理: Canal 可能会使用多线程并行解析 binlog,并将不同的数据变更发送到 RabbitMQ。这可能导致来自同一个数据库表的不同变更事件以不同的速度到达 RabbitMQ。
  2. 网络延迟: 不同的消息在网络传输中可能会经历不同的延迟,导致先发出的消息后到达。
  3. RabbitMQ ACK机制: 如果 RabbitMQ 开启了 ACK 机制,消费者在处理完消息后需要向 RabbitMQ 发送 ACK。如果消费者处理消息的速度不一致,或者网络不稳定导致 ACK 丢失,RabbitMQ 可能会重新发送消息,导致消息乱序。
  4. 消费者多线程处理: 消费者如果使用多线程并行处理消息,也可能导致消息乱序。

三、解决方案:单队列分区键绑定 + 自定义 MessageConverter

为了解决上述问题,我们需要采取一些策略来保证消息的顺序性。 一个比较有效的方案是:

  1. 单队列: 所有消息都发送到同一个队列,避免由于队列之间的竞争而导致乱序。
  2. 分区键绑定 (Partitioning Key): 使用一个能够唯一标识数据变更顺序的字段作为分区键,确保具有相同分区键的消息被发送到同一个 RabbitMQ 分区 (如果使用了 RabbitMQ 集群)。这里我们假设rabbitmq是单实例,所以分区键就简化为顺序键。
  3. 自定义 MessageConverter: 自定义 MessageConverter,将分区键信息添加到消息的 header 中,方便 RabbitMQ 根据分区键进行路由。
  4. 消费者单线程处理: 保证消费者使用单线程按照接收到的顺序处理消息。

3.1 单队列的配置

在 RabbitMQ 中,我们只需要创建一个队列即可,所有的canal消息都会发送到这个队列。

@Configuration
public class RabbitConfig {

    @Bean
    public Queue orderQueue() {
        return new Queue("order.queue", true);
    }

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange");
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.routing.key");
    }
}

3.2 自定义 MessageConverter

MessageConverter 负责将 Java 对象转换为 RabbitMQ 消息,以及将 RabbitMQ 消息转换为 Java 对象。 我们需要自定义 MessageConverter,将分区键信息添加到消息的 header 中。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.util.HashMap;
import java.util.Map;

public class CustomMessageConverter extends AbstractMessageConverter {

    private static final String PARTITION_KEY_HEADER = "partitionKey";

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        if (object instanceof OrderChangeEvent) {
            OrderChangeEvent event = (OrderChangeEvent) object;
            String partitionKey = String.valueOf(event.getOrderId()); // 使用 orderId 作为分区键

            messageProperties.setHeader(PARTITION_KEY_HEADER, partitionKey);
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); // 设置消息类型为JSON
            try {
                byte[] bytes = new ObjectMapper().writeValueAsBytes(event);
                return new Message(bytes, messageProperties);
            } catch (JsonProcessingException e) {
                throw new MessageConversionException("Failed to convert object to JSON", e);
            }

        }
        return null; // 或者抛出异常,取决于你的业务需求
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        try {
            return new ObjectMapper().readValue(message.getBody(), OrderChangeEvent.class);
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert message to object", e);
        }
    }

}

这个 CustomMessageConverter 类做了以下几件事:

  • createMessage() 方法:
    • 判断传入的对象是否是 OrderChangeEvent 类型。
    • 获取 OrderChangeEvent 对象的 orderId 作为分区键。
    • 将分区键添加到消息的 header 中,header 的 key 为 "partitionKey"。
    • OrderChangeEvent 对象转换为 JSON 字符串,并将其作为消息的 body。
  • fromMessage() 方法:
    • 将 RabbitMQ 消息的 body 解析为 OrderChangeEvent 对象。

3.3 配置 RabbitTemplate 使用自定义 MessageConverter

我们需要配置 RabbitTemplate 使用我们自定义的 MessageConverter。

@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new CustomMessageConverter());
        return rabbitTemplate;
    }
}

这样,在使用 rabbitTemplate.convertAndSend() 方法发送消息时,就会使用我们自定义的 CustomMessageConverter 进行消息转换。

3.4 Canal Adapter 配置

你需要配置 Canal Adapter,使其能够将数据变更事件转换为 OrderChangeEvent 对象,并发送到 RabbitMQ。 这里假设你已经配置好了Canal,能够监听到mysql的数据变更。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient {

    private CanalConnector connector;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String DESTINATION = "example";
    private static final String ADDRESS = "127.0.0.1";
    private static final int PORT = 11111;
    private static final String USERNAME = "";
    private static final String PASSWORD = "";

    @PostConstruct
    public void start() {
        connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ADDRESS,
                PORT), DESTINATION, USERNAME, PASSWORD);

        try {
            connector.connect();
            connector.subscribe(".*\..*"); // 订阅所有库的所有表
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(100); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    @PreDestroy
    public void stop() {
        if (connector != null) {
            connector.disconnect();
        }
    }

    private void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.UPDATE) {
                        // 处理更新事件
                        handleUpdateEvent(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        // 处理插入事件
                        handleInsertEvent(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
                    } else if (eventType == CanalEntry.EventType.DELETE) {
                         //处理删除事件
                         handleDeleteEvent(entry.getHeader().getTableName(), rowData.getBeforeColumnsList());
                    }
                }
            }
        }
    }

    private void handleInsertEvent(String tableName, List<CanalEntry.Column> columns) {
        if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
            OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
            rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
        }
    }

    private void handleUpdateEvent(String tableName, List<CanalEntry.Column> columns) {
         if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
            OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
            rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
        }

    }

    private void handleDeleteEvent(String tableName, List<CanalEntry.Column> columns) {
        if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
            OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
            rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
        }
    }

    private OrderChangeEvent convertColumnsToOrderChangeEvent(List<CanalEntry.Column> columns) {
        OrderChangeEvent event = new OrderChangeEvent();
        for (CanalEntry.Column column : columns) {
            if ("order_id".equalsIgnoreCase(column.getName())) {
                event.setOrderId(Long.parseLong(column.getValue()));
            } else if ("order_status".equalsIgnoreCase(column.getName())) {
                event.setOrderStatus(column.getValue());
            } // 其他字段的转换
        }
        return event;
    }

}

这段代码的关键在于 convertColumnsToOrderChangeEvent() 方法,它负责将从 Canal 获取的 Column 数据转换为 OrderChangeEvent 对象。 然后使用 rabbitTemplate.convertAndSend() 方法将消息发送到 RabbitMQ。 rabbitTemplate 会使用我们配置的 CustomMessageConverterOrderChangeEvent 对象转换为 RabbitMQ 消息,并将 orderId 作为分区键添加到消息的 header 中。

3.5 消费者端单线程处理

消费者端需要配置单线程处理消息, 避免并发处理带来的乱序。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    @RabbitListener(queues = "order.queue", concurrency = "1") // concurrency = "1" 保证单线程消费
    public void receiveOrder(OrderChangeEvent event) {
        // 处理订单事件
        System.out.println("Received order event: " + event);
        // 你的业务逻辑
    }
}

@RabbitListener(queues = "order.queue", concurrency = "1") 注解中的 concurrency = "1" 表示使用单线程处理消息。 这样可以保证消息按照接收到的顺序进行处理。

四、OrderChangeEvent 类的定义

import java.io.Serializable;

public class OrderChangeEvent implements Serializable {

    private Long orderId;
    private String orderStatus;

    // Getters and setters

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    @Override
    public String toString() {
        return "OrderChangeEvent{" +
                "orderId=" + orderId +
                ", orderStatus='" + orderStatus + ''' +
                '}';
    }
}

OrderChangeEvent 类用于封装订单变更事件的数据。 它包含 orderIdorderStatus 两个字段,分别表示订单 ID 和订单状态。 orderId 将作为分区键,用于保证具有相同 orderId 的消息被顺序处理。

五、代码总结

以下是完整的示例代码结构:

├── src
│   ├── main
│   │   ├── java
│   │   │   ├── com.example
│   │   │   │   ├── CanalClient.java
│   │   │   │   ├── CustomMessageConverter.java
│   │   │   │   ├── OrderChangeEvent.java
│   │   │   │   ├── OrderConsumer.java
│   │   │   │   ├── RabbitConfig.java
│   │   │   │   ├── RabbitTemplateConfig.java
│   │   │   │   └── Application.java
│   │   ├── resources
│   │   │   └── application.properties
│   └── test
│       └── java
│           └── com.example
│               └── DemoApplicationTests.java

六、测试与验证

  1. 启动 Canal Adapter 和 RabbitMQ。
  2. 在 MySQL 数据库中对 orders 表进行数据变更 (INSERT, UPDATE, DELETE)。
  3. 观察 RabbitMQ 消费者端是否按照数据变更的顺序处理消息。

可以通过以下方式验证顺序性:

  • 记录消息接收的时间戳: 在消费者端记录每条消息接收到的时间戳,并按照 orderId 分组,检查同一组消息的时间戳是否递增。
  • 打印消息内容: 在消费者端打印每条消息的内容,观察消息的顺序是否与数据库变更的顺序一致。

七、注意事项

  • 分区键的选择: 分区键的选择非常重要。 它必须能够唯一标识数据变更的顺序。 通常情况下,可以使用数据库表的主键作为分区键。
  • 消费者端处理能力: 消费者端必须具有足够的处理能力,才能及时处理消息,避免消息堆积。
  • 异常处理: 在 Canal Adapter 和消费者端都需要进行完善的异常处理,确保在发生异常时能够及时处理,避免数据丢失或乱序。
  • Canal配置: Canal需要正确配置,确保能够准确捕获MySQL的binlog变更。
  • 消息重试机制: 结合消息的重试机制,确保消息能够被成功消费,同时避免重试导致乱序。建议使用死信队列来处理消费失败的消息。

消息顺序的保证需要多方面的配合

从canal的binlog解析,到消息的发送,再到消费端的消费,每一个环节都不能出错,才能保证消息的顺序性。

八、总结

通过自定义 MessageConverter 和单队列分区键绑定,我们可以有效地解决 Canal Adapter + RabbitMQ 场景下的消息乱序问题。 这种方案的核心思想是将具有相同分区键的消息发送到同一个 RabbitMQ 分区,并保证消费者端单线程处理消息,从而保证消息的顺序性。 这种方案可以应用于各种需要保证数据变更顺序性的业务场景,例如库存管理、订单状态变更等。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注