各位观众老爷们,大家好!今天咱们聊聊消息队列里那些“防丢防错”的绝活儿:消息去重、幂等性以及死信队列。这些东西听起来高大上,其实就是为了保证咱们的消息在传递过程中,万一出了点岔子,也能“安全落地”,不至于数据乱套,系统崩溃。
咱们先用大白话解释一下这几个概念:
-
消息去重(Deduplication): 就像你给女朋友发微信,结果手抖点了两下发送,发了两条一模一样的信息。女朋友肯定觉得你抽风了。消息去重就是防止这种情况,确保同样的消息只被消费一次。
-
幂等性(Idempotency): 想象你给银行转账,转100块钱。如果因为网络问题,这条转账请求发了两次,但银行只扣你一次钱,这就是幂等性。也就是说,同样的请求,执行一次和执行多次的效果是一样的。
-
死信队列(DLQ – Dead Letter Queue): 消息队列里,有些消息可能因为各种原因,一直无法被正常消费,比如消费者程序出错了,或者消息格式不对。这些消息就会变成“死信”,被丢到死信队列里,等待人工处理或者进行补偿操作。
好,概念清楚了,咱们就来细聊一下,如何在 Kafka、RabbitMQ 和 Pulsar 这些消息队列里实现这些功能。
一、Kafka:靠谱老大哥的解决方案
Kafka 作为消息队列界的扛把子,性能强悍,吞吐量巨大,自然也考虑到了这些问题。
- 消息去重:Producer Idempotence
Kafka 提供了 Producer 的幂等性来保证消息的去重。简单来说,就是给每个 Producer 分配一个唯一的 ID(producerId
),然后给每个 Producer 发送的消息都加上一个序列号(sequence number
)。这样,Kafka Broker 就能通过 producerId
和 sequence number
来判断消息是否重复。
-
工作原理:
- Producer 启动时,会向 Broker 申请一个
producerId
。 - Producer 发送消息时,会给消息加上
sequence number
,并且每次发送消息后,sequence number
都会递增。 - Broker 收到消息后,会根据
producerId
和sequence number
来判断消息是否已经存在。如果存在,就直接丢弃,避免重复消费。
- Producer 启动时,会向 Broker 申请一个
-
配置方法:
在 Kafka Producer 的配置中,设置enable.idempotence=true
即可开启 Producer 的幂等性。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 必须设置 acks=all,才能保证消息的可靠性
props.put("retries", 3); // 重试次数,防止网络抖动导致消息发送失败
props.put("enable.idempotence", "true"); // 开启 Producer 幂等性
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-
优点:
- 实现简单,配置方便。
- 性能损耗小。
-
缺点:
- 只能保证单个 Producer 在单个 Session 内的消息去重。如果 Producer 重启,
producerId
会发生变化,就无法保证消息的去重了。 - 只能保证 At Least Once 的语义,不能保证 Exactly Once 的语义。
- 只能保证单个 Producer 在单个 Session 内的消息去重。如果 Producer 重启,
- 消费端幂等性:自行实现
Kafka 本身并没有提供消费端的幂等性保证,需要咱们自己在消费端实现。常见的做法是:
- 数据库唯一约束: 将消息的关键信息(比如订单号)作为数据库的唯一约束,每次消费消息时,先查询数据库中是否存在该订单号,如果存在,就直接忽略,避免重复处理。
// 假设有一个订单服务
public class OrderService {
private final JdbcTemplate jdbcTemplate;
public OrderService(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public void processOrder(String orderId, String orderDetails) {
// 1. 检查订单是否已经处理过
String sql = "SELECT COUNT(*) FROM orders WHERE order_id = ?";
int count = jdbcTemplate.queryForObject(sql, Integer.class, orderId);
if (count > 0) {
// 订单已经处理过,直接忽略
System.out.println("订单 " + orderId + " 已经处理过,忽略...");
return;
}
// 2. 处理订单
System.out.println("开始处理订单 " + orderId + ",详情:" + orderDetails);
// 模拟处理订单的逻辑
try {
Thread.sleep(100); // 模拟处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3. 将订单信息保存到数据库
String insertSql = "INSERT INTO orders (order_id, order_details, create_time) VALUES (?, ?, NOW())";
jdbcTemplate.update(insertSql, orderId, orderDetails);
System.out.println("订单 " + orderId + " 处理完成!");
}
}
- Redis 缓存: 将消息的关键信息存入 Redis 缓存中,每次消费消息时,先检查 Redis 中是否存在该信息,如果存在,就直接忽略。
// 使用 Redis 实现幂等性
public class OrderService {
private final RedisTemplate<String, String> redisTemplate;
public OrderService(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void processOrder(String orderId, String orderDetails) {
// 1. 检查订单是否已经处理过
String key = "order:" + orderId;
Boolean exists = redisTemplate.hasKey(key);
if (exists != null && exists) {
// 订单已经处理过,直接忽略
System.out.println("订单 " + orderId + " 已经处理过,忽略...");
return;
}
// 2. 处理订单
System.out.println("开始处理订单 " + orderId + ",详情:" + orderDetails);
// 模拟处理订单的逻辑
try {
Thread.sleep(100); // 模拟处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3. 将订单信息保存到 Redis
redisTemplate.opsForValue().set(key, "processed", 1, TimeUnit.HOURS); // 设置过期时间,防止 Redis 占用过多内存
System.out.println("订单 " + orderId + " 处理完成!");
}
}
- Token 机制: Producer 在发送消息时,带上一个唯一的 Token,Consumer 在消费消息时,先验证 Token 是否有效,如果有效,则消费消息,并将 Token 标记为已消费。
- 死信队列:手动实现
Kafka 本身并没有提供原生的死信队列功能,需要咱们自己在消费端实现。
- 实现方法:
- 在消费端捕获异常,如果消息消费失败,就将消息发送到另一个 Topic,作为死信队列。
- 可以设置一个最大重试次数,如果消息重试多次仍然失败,就将消息发送到死信队列。
// Kafka 消费者示例,手动实现死信队列
public class KafkaConsumerExample {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer; // 用于发送到死信队列
private final String deadLetterTopic;
private final int maxRetryAttempts;
public KafkaConsumerExample(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer, String deadLetterTopic, int maxRetryAttempts) {
this.consumer = consumer;
this.producer = producer;
this.deadLetterTopic = deadLetterTopic;
this.maxRetryAttempts = maxRetryAttempts;
}
public void consumeMessages(String topic) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
int retryCount = 0; // 初始重试次数
while (retryCount <= maxRetryAttempts) {
try {
// 模拟处理消息
processMessage(key, value);
consumer.commitSync(); // 消费成功,提交 offset
System.out.println("消息处理成功,key: " + key + ", value: " + value);
break; // 成功处理消息后退出循环
} catch (Exception e) {
System.err.println("处理消息失败,key: " + key + ", value: " + value + ", 尝试次数: " + (retryCount + 1) + ", 错误信息: " + e.getMessage());
retryCount++;
try {
Thread.sleep(1000); // 短暂休眠后重试
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
if (retryCount > maxRetryAttempts) {
// 达到最大重试次数,将消息发送到死信队列
sendToDeadLetterQueue(key, value, "消息处理失败,达到最大重试次数");
consumer.commitSync(); // 也要提交 offset,防止重复消费
}
}
}
}
private void processMessage(String key, String value) throws Exception {
// 模拟消息处理逻辑,可能会抛出异常
if (value.contains("error")) {
throw new RuntimeException("模拟处理消息时发生错误");
}
System.out.println("处理消息,key: " + key + ", value: " + value);
}
private void sendToDeadLetterQueue(String key, String value, String errorMessage) {
// 将消息发送到死信队列
ProducerRecord<String, String> deadLetterRecord = new ProducerRecord<>(deadLetterTopic, key, value);
producer.send(deadLetterRecord, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送到死信队列失败,key: " + key + ", value: " + value + ", 错误信息: " + exception.getMessage());
} else {
System.out.println("消息已发送到死信队列,topic: " + deadLetterTopic + ", key: " + key + ", value: " + value);
}
});
producer.flush(); // 确保消息发送到 Kafka
}
public static void main(String[] args) {
// Kafka 消费者配置
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Kafka 生产者配置 (用于死信队列)
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String topic = "my-topic"; // 原始 Topic
String deadLetterTopic = "my-topic-dlq"; // 死信队列 Topic
int maxRetryAttempts = 3; // 最大重试次数
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
KafkaConsumerExample example = new KafkaConsumerExample(consumer, producer, deadLetterTopic, maxRetryAttempts);
example.consumeMessages(topic);
}
}
二、RabbitMQ:灵活小能手的解决方案
RabbitMQ 以其灵活的路由策略和强大的消息确认机制著称,在消息去重、幂等性和死信队列方面也有一套自己的解决方案。
- 消息去重:Message Id 或 Correlation Id
RabbitMQ 本身并没有提供原生的消息去重机制,需要咱们自己在 Producer 和 Consumer 端配合实现。
-
Message Id: Producer 在发送消息时,给消息设置一个唯一的 Message Id,Consumer 在消费消息时,先检查该 Message Id 是否已经存在,如果存在,就直接忽略。
-
Correlation Id: 适用于需要回复的场景。Producer 在发送消息时,给消息设置一个唯一的 Correlation Id,Consumer 在处理完消息后,将结果发送回 Producer,并且带上相同的 Correlation Id。Producer 收到回复后,就可以根据 Correlation Id 来判断消息是否已经处理过。
- 消费端幂等性:与 Kafka 类似
RabbitMQ 的消费端幂等性实现方式与 Kafka 类似,可以使用数据库唯一约束、Redis 缓存、Token 机制等方法。
- 死信队列:DLX(Dead Letter Exchange)
RabbitMQ 提供了原生的死信队列功能,通过 DLX(Dead Letter Exchange)来实现。
-
工作原理:
- 当消息被 Consumer 拒绝(
basic.reject
或basic.nack
)并且requeue=false
时,或者消息过期时,RabbitMQ 会将消息重新发布到 DLX。 - DLX 实际上就是一个普通的 Exchange,可以绑定一个或多个 Queue,这些 Queue 就是死信队列。
- 当消息被 Consumer 拒绝(
-
配置方法:
- 在声明 Queue 时,可以设置
x-dead-letter-exchange
参数,指定 DLX。 - 可以设置
x-dead-letter-routing-key
参数,指定消息被路由到 DLX 时的 Routing Key。
- 在声明 Queue 时,可以设置
// RabbitMQ 声明队列和死信队列
public class RabbitMQDeclare {
private static final String QUEUE_NAME = "my_queue";
private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 1. 声明死信队列和死信交换机
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT, true); // 声明死信交换机
channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null); // 声明死信队列
channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_routing_key"); // 绑定死信队列到死信交换机
// 2. 声明正常队列,并指定死信交换机
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 指定死信交换机
argsMap.put("x-dead-letter-routing-key", "dead_letter_routing_key"); // 指定routing key
channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap); // 声明队列,并设置死信交换机
System.out.println("队列和死信队列声明完成!");
}
}
}
// RabbitMQ 消费者示例,处理消息并发送到死信队列
public class RabbitMQConsumer {
private static final String QUEUE_NAME = "my_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息: " + message);
try {
// 模拟处理消息,可能会抛出异常
processMessage(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 消息处理成功,确认消息
System.out.println("消息处理成功并确认: " + message);
} catch (Exception e) {
System.err.println("消息处理失败: " + message + ", 错误信息: " + e.getMessage());
// 消息处理失败,拒绝消息并 requeue=false,消息会被发送到死信队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
System.out.println("消息处理失败并拒绝,已发送到死信队列: " + message);
}
}, consumerTag -> { });
System.out.println("等待接收消息...");
Thread.sleep(Long.MAX_VALUE); // 保持消费者运行
}
}
private static void processMessage(String message) throws Exception {
// 模拟消息处理逻辑,可能会抛出异常
if (message.contains("error")) {
throw new RuntimeException("模拟处理消息时发生错误");
}
System.out.println("处理消息: " + message);
}
}
// RabbitMQ 生产者示例,发送消息到队列
public class RabbitMQProducer {
private static final String QUEUE_NAME = "my_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送消息: " + message);
message = "This is an error message!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送消息: " + message);
System.out.println("消息发送完成!");
}
}
}
三、Pulsar:后起之秀的解决方案
Pulsar 作为新一代的分布式消息队列,在设计上就考虑到了消息的可靠性和一致性,提供了更完善的解决方案。
- 消息去重:Message Deduplication
Pulsar 提供了原生的消息去重功能,通过配置 enableDeduplication=true
来开启。
-
工作原理:
- Pulsar Broker 会为每个 Producer 分配一个唯一的 Producer ID。
- Producer 发送的消息会带上一个序列号(Sequence ID)。
- Broker 会根据 Producer ID 和 Sequence ID 来判断消息是否重复。
-
配置方法:
- 在 Pulsar Broker 的配置中,设置
enableDeduplication=true
。 - 在 Producer 的配置中,设置
producerConfiguration.setEnableDeduplication(true)
。
- 在 Pulsar Broker 的配置中,设置
// Pulsar Producer 示例,开启消息去重
public class PulsarProducerExample {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.producerName("my-producer") // 设置 Producer Name
.enableBatching(false) // 禁用批量处理,方便演示
.enableDeduplication(true) // 开启消息去重
.create();
try {
// 发送重复的消息
producer.send("Message 1".getBytes());
producer.send("Message 1".getBytes());
producer.send("Message 2".getBytes());
System.out.println("消息发送完成!");
} finally {
producer.close();
client.close();
}
}
}
- 消费端幂等性:与 Kafka 类似
Pulsar 的消费端幂等性实现方式与 Kafka 类似,可以使用数据库唯一约束、Redis 缓存、Token 机制等方法。
- 死信队列:Retry Letter Topic 和 Dead Letter Topic
Pulsar 提供了 Retry Letter Topic 和 Dead Letter Topic 来实现死信队列功能。
-
Retry Letter Topic: 当消息消费失败时,Pulsar 会将消息发送到 Retry Letter Topic,等待一段时间后重新消费。
-
Dead Letter Topic: 当消息重试多次仍然失败时,Pulsar 会将消息发送到 Dead Letter Topic,等待人工处理。
-
配置方法:
- 在 Consumer 的配置中,可以设置
negativeAckRedeliveryDelay
参数,指定消息重试的间隔时间。 - 可以设置
maxRedeliverCount
参数,指定消息的最大重试次数。 - 可以设置
deadLetterPolicy
参数,指定 Dead Letter Topic 的名称。
- 在 Consumer 的配置中,可以设置
// Pulsar Consumer 示例,配置 Retry Letter Topic 和 Dead Letter Topic
public class PulsarConsumerExample {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 设置重试间隔时间为 1 分钟
.maxRedeliverCount(3) // 设置最大重试次数为 3 次
.deadLetterPolicy(DeadLetterPolicy.builder()
.deadLetterTopic("my-topic-dlq") // 设置 Dead Letter Topic 名称
.build())
.subscribe();
while (true) {
try {
Message<byte[]> message = consumer.receive();
String messageContent = new String(message.getData());
System.out.println("接收到消息: " + messageContent);
try {
// 模拟处理消息,可能会抛出异常
processMessage(messageContent);
consumer.acknowledge(message); // 消息处理成功,确认消息
System.out.println("消息处理成功并确认: " + messageContent);
} catch (Exception e) {
System.err.println("消息处理失败: " + messageContent + ", 错误信息: " + e.getMessage());
consumer.negativeAcknowledge(message); // 消息处理失败,否定确认消息,消息会被发送到 Retry Letter Topic
System.out.println("消息处理失败并否定确认: " + messageContent);
}
} catch (PulsarClientException e) {
System.err.println("接收消息失败: " + e.getMessage());
}
}
}
private static void processMessage(String message) throws Exception {
// 模拟消息处理逻辑,可能会抛出异常
if (message.contains("error")) {
throw new RuntimeException("模拟处理消息时发生错误");
}
System.out.println("处理消息: " + message);
}
}
四、总结:选择适合自己的方案
不同的消息队列,在消息去重、幂等性和死信队列方面提供了不同的解决方案。咱们需要根据自己的业务场景和需求,选择合适的方案。
特性 | Kafka | RabbitMQ | Pulsar |
---|---|---|---|
消息去重 | Producer Idempotence (仅 Producer 端) | Message Id 或 Correlation Id (需自行实现) | Message Deduplication (Broker 端支持) |
幂等性 | 消费端自行实现 | 消费端自行实现 | 消费端自行实现 |
死信队列 | 手动实现 | DLX (Dead Letter Exchange) | Retry Letter Topic 和 Dead Letter Topic |
总的来说:
- Kafka: 适合高吞吐量、高可靠性的场景,需要自己实现消费端幂等性和死信队列。
- RabbitMQ: 适合对消息的路由和可靠性有较高要求的场景,提供了原生的死信队列功能。
- Pulsar: 适合需要消息去重和多租户的场景,提供了更完善的死信队列功能。
好了,今天的讲座就到这里。希望大家能够理解消息队列里这些“防丢防错”的绝活儿,并在实际项目中灵活运用,保证咱们的消息能够“安全落地”,系统稳定运行! 谢谢大家!