好的,我们开始。
今天我们来探讨一个在使用 RabbitMQ 时经常遇到的问题:当使用 Canal Adapter 作为数据源,并且需要保证消息顺序性的时候,可能出现的乱序问题。 同时,我们会深入研究如何通过自定义 MessageConverter 和单队列分区键绑定来解决这个问题。
一、问题背景:Canal Adapter 和顺序性需求
Canal 是阿里巴巴开源的一款 MySQL binlog 解析工具。 它可以将 MySQL 的数据变更实时同步到其他系统,例如消息队列 RabbitMQ。 在很多业务场景下,我们需要保证数据变更的顺序性,比如:
- 库存变更: 先有入库,后有出库,如果顺序颠倒,会导致库存数据不准确。
- 订单状态变更: 订单状态必须按照创建 -> 支付 -> 发货 -> 完成的顺序进行,乱序会导致业务流程错误。
然而,在使用 Canal Adapter 将 binlog 数据发送到 RabbitMQ 时,由于各种因素(例如网络延迟、多线程处理、ACK机制等),消息可能会出现乱序,这给业务带来很大的风险。
二、乱序问题分析
导致 Canal Adapter + RabbitMQ 乱序的常见原因包括:
- Canal 并行处理: Canal 可能会使用多线程并行解析 binlog,并将不同的数据变更发送到 RabbitMQ。这可能导致来自同一个数据库表的不同变更事件以不同的速度到达 RabbitMQ。
- 网络延迟: 不同的消息在网络传输中可能会经历不同的延迟,导致先发出的消息后到达。
- RabbitMQ ACK机制: 如果 RabbitMQ 开启了 ACK 机制,消费者在处理完消息后需要向 RabbitMQ 发送 ACK。如果消费者处理消息的速度不一致,或者网络不稳定导致 ACK 丢失,RabbitMQ 可能会重新发送消息,导致消息乱序。
- 消费者多线程处理: 消费者如果使用多线程并行处理消息,也可能导致消息乱序。
三、解决方案:单队列分区键绑定 + 自定义 MessageConverter
为了解决上述问题,我们需要采取一些策略来保证消息的顺序性。 一个比较有效的方案是:
- 单队列: 所有消息都发送到同一个队列,避免由于队列之间的竞争而导致乱序。
- 分区键绑定 (Partitioning Key): 使用一个能够唯一标识数据变更顺序的字段作为分区键,确保具有相同分区键的消息被发送到同一个 RabbitMQ 分区 (如果使用了 RabbitMQ 集群)。这里我们假设rabbitmq是单实例,所以分区键就简化为顺序键。
- 自定义 MessageConverter: 自定义 MessageConverter,将分区键信息添加到消息的 header 中,方便 RabbitMQ 根据分区键进行路由。
- 消费者单线程处理: 保证消费者使用单线程按照接收到的顺序处理消息。
3.1 单队列的配置
在 RabbitMQ 中,我们只需要创建一个队列即可,所有的canal消息都会发送到这个队列。
@Configuration
public class RabbitConfig {
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.routing.key");
}
}
3.2 自定义 MessageConverter
MessageConverter 负责将 Java 对象转换为 RabbitMQ 消息,以及将 RabbitMQ 消息转换为 Java 对象。 我们需要自定义 MessageConverter,将分区键信息添加到消息的 header 中。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import java.util.HashMap;
import java.util.Map;
public class CustomMessageConverter extends AbstractMessageConverter {
private static final String PARTITION_KEY_HEADER = "partitionKey";
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
if (object instanceof OrderChangeEvent) {
OrderChangeEvent event = (OrderChangeEvent) object;
String partitionKey = String.valueOf(event.getOrderId()); // 使用 orderId 作为分区键
messageProperties.setHeader(PARTITION_KEY_HEADER, partitionKey);
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); // 设置消息类型为JSON
try {
byte[] bytes = new ObjectMapper().writeValueAsBytes(event);
return new Message(bytes, messageProperties);
} catch (JsonProcessingException e) {
throw new MessageConversionException("Failed to convert object to JSON", e);
}
}
return null; // 或者抛出异常,取决于你的业务需求
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
try {
return new ObjectMapper().readValue(message.getBody(), OrderChangeEvent.class);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert message to object", e);
}
}
}
这个 CustomMessageConverter 类做了以下几件事:
createMessage()方法:- 判断传入的对象是否是
OrderChangeEvent类型。 - 获取
OrderChangeEvent对象的orderId作为分区键。 - 将分区键添加到消息的 header 中,header 的 key 为 "partitionKey"。
- 将
OrderChangeEvent对象转换为 JSON 字符串,并将其作为消息的 body。
- 判断传入的对象是否是
fromMessage()方法:- 将 RabbitMQ 消息的 body 解析为
OrderChangeEvent对象。
- 将 RabbitMQ 消息的 body 解析为
3.3 配置 RabbitTemplate 使用自定义 MessageConverter
我们需要配置 RabbitTemplate 使用我们自定义的 MessageConverter。
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new CustomMessageConverter());
return rabbitTemplate;
}
}
这样,在使用 rabbitTemplate.convertAndSend() 方法发送消息时,就会使用我们自定义的 CustomMessageConverter 进行消息转换。
3.4 Canal Adapter 配置
你需要配置 Canal Adapter,使其能够将数据变更事件转换为 OrderChangeEvent 对象,并发送到 RabbitMQ。 这里假设你已经配置好了Canal,能够监听到mysql的数据变更。
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
private CanalConnector connector;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String DESTINATION = "example";
private static final String ADDRESS = "127.0.0.1";
private static final int PORT = 11111;
private static final String USERNAME = "";
private static final String PASSWORD = "";
@PostConstruct
public void start() {
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ADDRESS,
PORT), DESTINATION, USERNAME, PASSWORD);
try {
connector.connect();
connector.subscribe(".*\..*"); // 订阅所有库的所有表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
@PreDestroy
public void stop() {
if (connector != null) {
connector.disconnect();
}
}
private void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.UPDATE) {
// 处理更新事件
handleUpdateEvent(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
// 处理插入事件
handleInsertEvent(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.DELETE) {
//处理删除事件
handleDeleteEvent(entry.getHeader().getTableName(), rowData.getBeforeColumnsList());
}
}
}
}
}
private void handleInsertEvent(String tableName, List<CanalEntry.Column> columns) {
if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
}
}
private void handleUpdateEvent(String tableName, List<CanalEntry.Column> columns) {
if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
}
}
private void handleDeleteEvent(String tableName, List<CanalEntry.Column> columns) {
if ("orders".equalsIgnoreCase(tableName)) { // 假设orders表是需要关注的表
OrderChangeEvent event = convertColumnsToOrderChangeEvent(columns);
rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", event);
}
}
private OrderChangeEvent convertColumnsToOrderChangeEvent(List<CanalEntry.Column> columns) {
OrderChangeEvent event = new OrderChangeEvent();
for (CanalEntry.Column column : columns) {
if ("order_id".equalsIgnoreCase(column.getName())) {
event.setOrderId(Long.parseLong(column.getValue()));
} else if ("order_status".equalsIgnoreCase(column.getName())) {
event.setOrderStatus(column.getValue());
} // 其他字段的转换
}
return event;
}
}
这段代码的关键在于 convertColumnsToOrderChangeEvent() 方法,它负责将从 Canal 获取的 Column 数据转换为 OrderChangeEvent 对象。 然后使用 rabbitTemplate.convertAndSend() 方法将消息发送到 RabbitMQ。 rabbitTemplate 会使用我们配置的 CustomMessageConverter 将 OrderChangeEvent 对象转换为 RabbitMQ 消息,并将 orderId 作为分区键添加到消息的 header 中。
3.5 消费者端单线程处理
消费者端需要配置单线程处理消息, 避免并发处理带来的乱序。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
@RabbitListener(queues = "order.queue", concurrency = "1") // concurrency = "1" 保证单线程消费
public void receiveOrder(OrderChangeEvent event) {
// 处理订单事件
System.out.println("Received order event: " + event);
// 你的业务逻辑
}
}
@RabbitListener(queues = "order.queue", concurrency = "1") 注解中的 concurrency = "1" 表示使用单线程处理消息。 这样可以保证消息按照接收到的顺序进行处理。
四、OrderChangeEvent 类的定义
import java.io.Serializable;
public class OrderChangeEvent implements Serializable {
private Long orderId;
private String orderStatus;
// Getters and setters
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
@Override
public String toString() {
return "OrderChangeEvent{" +
"orderId=" + orderId +
", orderStatus='" + orderStatus + ''' +
'}';
}
}
OrderChangeEvent 类用于封装订单变更事件的数据。 它包含 orderId 和 orderStatus 两个字段,分别表示订单 ID 和订单状态。 orderId 将作为分区键,用于保证具有相同 orderId 的消息被顺序处理。
五、代码总结
以下是完整的示例代码结构:
├── src
│ ├── main
│ │ ├── java
│ │ │ ├── com.example
│ │ │ │ ├── CanalClient.java
│ │ │ │ ├── CustomMessageConverter.java
│ │ │ │ ├── OrderChangeEvent.java
│ │ │ │ ├── OrderConsumer.java
│ │ │ │ ├── RabbitConfig.java
│ │ │ │ ├── RabbitTemplateConfig.java
│ │ │ │ └── Application.java
│ │ ├── resources
│ │ │ └── application.properties
│ └── test
│ └── java
│ └── com.example
│ └── DemoApplicationTests.java
六、测试与验证
- 启动 Canal Adapter 和 RabbitMQ。
- 在 MySQL 数据库中对
orders表进行数据变更 (INSERT, UPDATE, DELETE)。 - 观察 RabbitMQ 消费者端是否按照数据变更的顺序处理消息。
可以通过以下方式验证顺序性:
- 记录消息接收的时间戳: 在消费者端记录每条消息接收到的时间戳,并按照
orderId分组,检查同一组消息的时间戳是否递增。 - 打印消息内容: 在消费者端打印每条消息的内容,观察消息的顺序是否与数据库变更的顺序一致。
七、注意事项
- 分区键的选择: 分区键的选择非常重要。 它必须能够唯一标识数据变更的顺序。 通常情况下,可以使用数据库表的主键作为分区键。
- 消费者端处理能力: 消费者端必须具有足够的处理能力,才能及时处理消息,避免消息堆积。
- 异常处理: 在 Canal Adapter 和消费者端都需要进行完善的异常处理,确保在发生异常时能够及时处理,避免数据丢失或乱序。
- Canal配置: Canal需要正确配置,确保能够准确捕获MySQL的binlog变更。
- 消息重试机制: 结合消息的重试机制,确保消息能够被成功消费,同时避免重试导致乱序。建议使用死信队列来处理消费失败的消息。
消息顺序的保证需要多方面的配合
从canal的binlog解析,到消息的发送,再到消费端的消费,每一个环节都不能出错,才能保证消息的顺序性。
八、总结
通过自定义 MessageConverter 和单队列分区键绑定,我们可以有效地解决 Canal Adapter + RabbitMQ 场景下的消息乱序问题。 这种方案的核心思想是将具有相同分区键的消息发送到同一个 RabbitMQ 分区,并保证消费者端单线程处理消息,从而保证消息的顺序性。 这种方案可以应用于各种需要保证数据变更顺序性的业务场景,例如库存管理、订单状态变更等。