Java `Message Queue` (`Kafka`, `RabbitMQ`, `Pulsar`) `Deduplication`, `Idempotency`, `DLQ`

各位观众老爷们,大家好!今天咱们聊聊消息队列里那些“防丢防错”的绝活儿:消息去重、幂等性以及死信队列。这些东西听起来高大上,其实就是为了保证咱们的消息在传递过程中,万一出了点岔子,也能“安全落地”,不至于数据乱套,系统崩溃。

咱们先用大白话解释一下这几个概念:

  • 消息去重(Deduplication): 就像你给女朋友发微信,结果手抖点了两下发送,发了两条一模一样的信息。女朋友肯定觉得你抽风了。消息去重就是防止这种情况,确保同样的消息只被消费一次。

  • 幂等性(Idempotency): 想象你给银行转账,转100块钱。如果因为网络问题,这条转账请求发了两次,但银行只扣你一次钱,这就是幂等性。也就是说,同样的请求,执行一次和执行多次的效果是一样的。

  • 死信队列(DLQ – Dead Letter Queue): 消息队列里,有些消息可能因为各种原因,一直无法被正常消费,比如消费者程序出错了,或者消息格式不对。这些消息就会变成“死信”,被丢到死信队列里,等待人工处理或者进行补偿操作。

好,概念清楚了,咱们就来细聊一下,如何在 Kafka、RabbitMQ 和 Pulsar 这些消息队列里实现这些功能。

一、Kafka:靠谱老大哥的解决方案

Kafka 作为消息队列界的扛把子,性能强悍,吞吐量巨大,自然也考虑到了这些问题。

  1. 消息去重:Producer Idempotence

Kafka 提供了 Producer 的幂等性来保证消息的去重。简单来说,就是给每个 Producer 分配一个唯一的 ID(producerId),然后给每个 Producer 发送的消息都加上一个序列号(sequence number)。这样,Kafka Broker 就能通过 producerIdsequence number 来判断消息是否重复。

  • 工作原理:

    • Producer 启动时,会向 Broker 申请一个 producerId
    • Producer 发送消息时,会给消息加上 sequence number,并且每次发送消息后,sequence number 都会递增。
    • Broker 收到消息后,会根据 producerIdsequence number 来判断消息是否已经存在。如果存在,就直接丢弃,避免重复消费。
  • 配置方法:
    在 Kafka Producer 的配置中,设置 enable.idempotence=true 即可开启 Producer 的幂等性。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 必须设置 acks=all,才能保证消息的可靠性
props.put("retries", 3); // 重试次数,防止网络抖动导致消息发送失败
props.put("enable.idempotence", "true"); // 开启 Producer 幂等性
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  • 优点:

    • 实现简单,配置方便。
    • 性能损耗小。
  • 缺点:

    • 只能保证单个 Producer 在单个 Session 内的消息去重。如果 Producer 重启,producerId 会发生变化,就无法保证消息的去重了。
    • 只能保证 At Least Once 的语义,不能保证 Exactly Once 的语义。
  1. 消费端幂等性:自行实现

Kafka 本身并没有提供消费端的幂等性保证,需要咱们自己在消费端实现。常见的做法是:

  • 数据库唯一约束: 将消息的关键信息(比如订单号)作为数据库的唯一约束,每次消费消息时,先查询数据库中是否存在该订单号,如果存在,就直接忽略,避免重复处理。
// 假设有一个订单服务
public class OrderService {

    private final JdbcTemplate jdbcTemplate;

    public OrderService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void processOrder(String orderId, String orderDetails) {
        // 1. 检查订单是否已经处理过
        String sql = "SELECT COUNT(*) FROM orders WHERE order_id = ?";
        int count = jdbcTemplate.queryForObject(sql, Integer.class, orderId);

        if (count > 0) {
            // 订单已经处理过,直接忽略
            System.out.println("订单 " + orderId + " 已经处理过,忽略...");
            return;
        }

        // 2. 处理订单
        System.out.println("开始处理订单 " + orderId + ",详情:" + orderDetails);

        // 模拟处理订单的逻辑
        try {
            Thread.sleep(100); // 模拟处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 3. 将订单信息保存到数据库
        String insertSql = "INSERT INTO orders (order_id, order_details, create_time) VALUES (?, ?, NOW())";
        jdbcTemplate.update(insertSql, orderId, orderDetails);

        System.out.println("订单 " + orderId + " 处理完成!");
    }
}
  • Redis 缓存: 将消息的关键信息存入 Redis 缓存中,每次消费消息时,先检查 Redis 中是否存在该信息,如果存在,就直接忽略。
// 使用 Redis 实现幂等性
public class OrderService {

    private final RedisTemplate<String, String> redisTemplate;

    public OrderService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void processOrder(String orderId, String orderDetails) {
        // 1. 检查订单是否已经处理过
        String key = "order:" + orderId;
        Boolean exists = redisTemplate.hasKey(key);

        if (exists != null && exists) {
            // 订单已经处理过,直接忽略
            System.out.println("订单 " + orderId + " 已经处理过,忽略...");
            return;
        }

        // 2. 处理订单
        System.out.println("开始处理订单 " + orderId + ",详情:" + orderDetails);

        // 模拟处理订单的逻辑
        try {
            Thread.sleep(100); // 模拟处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 3. 将订单信息保存到 Redis
        redisTemplate.opsForValue().set(key, "processed", 1, TimeUnit.HOURS); // 设置过期时间,防止 Redis 占用过多内存

        System.out.println("订单 " + orderId + " 处理完成!");
    }
}
  • Token 机制: Producer 在发送消息时,带上一个唯一的 Token,Consumer 在消费消息时,先验证 Token 是否有效,如果有效,则消费消息,并将 Token 标记为已消费。
  1. 死信队列:手动实现

Kafka 本身并没有提供原生的死信队列功能,需要咱们自己在消费端实现。

  • 实现方法:
    • 在消费端捕获异常,如果消息消费失败,就将消息发送到另一个 Topic,作为死信队列。
    • 可以设置一个最大重试次数,如果消息重试多次仍然失败,就将消息发送到死信队列。
// Kafka 消费者示例,手动实现死信队列
public class KafkaConsumerExample {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer; // 用于发送到死信队列
    private final String deadLetterTopic;
    private final int maxRetryAttempts;

    public KafkaConsumerExample(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer, String deadLetterTopic, int maxRetryAttempts) {
        this.consumer = consumer;
        this.producer = producer;
        this.deadLetterTopic = deadLetterTopic;
        this.maxRetryAttempts = maxRetryAttempts;
    }

    public void consumeMessages(String topic) {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                int retryCount = 0; // 初始重试次数

                while (retryCount <= maxRetryAttempts) {
                    try {
                        // 模拟处理消息
                        processMessage(key, value);
                        consumer.commitSync(); // 消费成功,提交 offset
                        System.out.println("消息处理成功,key: " + key + ", value: " + value);
                        break; // 成功处理消息后退出循环
                    } catch (Exception e) {
                        System.err.println("处理消息失败,key: " + key + ", value: " + value + ", 尝试次数: " + (retryCount + 1) + ", 错误信息: " + e.getMessage());
                        retryCount++;

                        try {
                            Thread.sleep(1000); // 短暂休眠后重试
                        } catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }

                if (retryCount > maxRetryAttempts) {
                    // 达到最大重试次数,将消息发送到死信队列
                    sendToDeadLetterQueue(key, value, "消息处理失败,达到最大重试次数");
                    consumer.commitSync(); // 也要提交 offset,防止重复消费
                }
            }
        }
    }

    private void processMessage(String key, String value) throws Exception {
        // 模拟消息处理逻辑,可能会抛出异常
        if (value.contains("error")) {
            throw new RuntimeException("模拟处理消息时发生错误");
        }
        System.out.println("处理消息,key: " + key + ", value: " + value);
    }

    private void sendToDeadLetterQueue(String key, String value, String errorMessage) {
        // 将消息发送到死信队列
        ProducerRecord<String, String> deadLetterRecord = new ProducerRecord<>(deadLetterTopic, key, value);
        producer.send(deadLetterRecord, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("发送到死信队列失败,key: " + key + ", value: " + value + ", 错误信息: " + exception.getMessage());
            } else {
                System.out.println("消息已发送到死信队列,topic: " + deadLetterTopic + ", key: " + key + ", value: " + value);
            }
        });
        producer.flush(); // 确保消息发送到 Kafka
    }

    public static void main(String[] args) {
        // Kafka 消费者配置
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Kafka 生产者配置 (用于死信队列)
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "my-topic"; // 原始 Topic
        String deadLetterTopic = "my-topic-dlq"; // 死信队列 Topic
        int maxRetryAttempts = 3; // 最大重试次数

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        KafkaConsumerExample example = new KafkaConsumerExample(consumer, producer, deadLetterTopic, maxRetryAttempts);
        example.consumeMessages(topic);
    }
}

二、RabbitMQ:灵活小能手的解决方案

RabbitMQ 以其灵活的路由策略和强大的消息确认机制著称,在消息去重、幂等性和死信队列方面也有一套自己的解决方案。

  1. 消息去重:Message Id 或 Correlation Id

RabbitMQ 本身并没有提供原生的消息去重机制,需要咱们自己在 Producer 和 Consumer 端配合实现。

  • Message Id: Producer 在发送消息时,给消息设置一个唯一的 Message Id,Consumer 在消费消息时,先检查该 Message Id 是否已经存在,如果存在,就直接忽略。

  • Correlation Id: 适用于需要回复的场景。Producer 在发送消息时,给消息设置一个唯一的 Correlation Id,Consumer 在处理完消息后,将结果发送回 Producer,并且带上相同的 Correlation Id。Producer 收到回复后,就可以根据 Correlation Id 来判断消息是否已经处理过。

  1. 消费端幂等性:与 Kafka 类似

RabbitMQ 的消费端幂等性实现方式与 Kafka 类似,可以使用数据库唯一约束、Redis 缓存、Token 机制等方法。

  1. 死信队列:DLX(Dead Letter Exchange)

RabbitMQ 提供了原生的死信队列功能,通过 DLX(Dead Letter Exchange)来实现。

  • 工作原理:

    • 当消息被 Consumer 拒绝(basic.rejectbasic.nack)并且 requeue=false 时,或者消息过期时,RabbitMQ 会将消息重新发布到 DLX。
    • DLX 实际上就是一个普通的 Exchange,可以绑定一个或多个 Queue,这些 Queue 就是死信队列。
  • 配置方法:

    • 在声明 Queue 时,可以设置 x-dead-letter-exchange 参数,指定 DLX。
    • 可以设置 x-dead-letter-routing-key 参数,指定消息被路由到 DLX 时的 Routing Key。
// RabbitMQ 声明队列和死信队列
public class RabbitMQDeclare {

    private static final String QUEUE_NAME = "my_queue";
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1. 声明死信队列和死信交换机
            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT, true); // 声明死信交换机
            channel.queueDeclare(DEAD_LETTER_QUEUE, true, false, false, null); // 声明死信队列
            channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "dead_letter_routing_key"); // 绑定死信队列到死信交换机

            // 2. 声明正常队列,并指定死信交换机
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // 指定死信交换机
            argsMap.put("x-dead-letter-routing-key", "dead_letter_routing_key"); // 指定routing key

            channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap); // 声明队列,并设置死信交换机

            System.out.println("队列和死信队列声明完成!");
        }
    }
}

// RabbitMQ 消费者示例,处理消息并发送到死信队列
public class RabbitMQConsumer {

    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("接收到消息: " + message);

                try {
                    // 模拟处理消息,可能会抛出异常
                    processMessage(message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 消息处理成功,确认消息
                    System.out.println("消息处理成功并确认: " + message);
                } catch (Exception e) {
                    System.err.println("消息处理失败: " + message + ", 错误信息: " + e.getMessage());
                    // 消息处理失败,拒绝消息并 requeue=false,消息会被发送到死信队列
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                    System.out.println("消息处理失败并拒绝,已发送到死信队列: " + message);
                }

            }, consumerTag -> { });

            System.out.println("等待接收消息...");
            Thread.sleep(Long.MAX_VALUE); // 保持消费者运行
        }
    }

    private static void processMessage(String message) throws Exception {
        // 模拟消息处理逻辑,可能会抛出异常
        if (message.contains("error")) {
            throw new RuntimeException("模拟处理消息时发生错误");
        }
        System.out.println("处理消息: " + message);
    }
}

// RabbitMQ 生产者示例,发送消息到队列
public class RabbitMQProducer {

    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            String message = "Hello, RabbitMQ!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送消息: " + message);

            message = "This is an error message!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送消息: " + message);

            System.out.println("消息发送完成!");
        }
    }
}

三、Pulsar:后起之秀的解决方案

Pulsar 作为新一代的分布式消息队列,在设计上就考虑到了消息的可靠性和一致性,提供了更完善的解决方案。

  1. 消息去重:Message Deduplication

Pulsar 提供了原生的消息去重功能,通过配置 enableDeduplication=true 来开启。

  • 工作原理:

    • Pulsar Broker 会为每个 Producer 分配一个唯一的 Producer ID。
    • Producer 发送的消息会带上一个序列号(Sequence ID)。
    • Broker 会根据 Producer ID 和 Sequence ID 来判断消息是否重复。
  • 配置方法:

    • 在 Pulsar Broker 的配置中,设置 enableDeduplication=true
    • 在 Producer 的配置中,设置 producerConfiguration.setEnableDeduplication(true)
// Pulsar Producer 示例,开启消息去重
public class PulsarProducerExample {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("my-topic")
                .producerName("my-producer") // 设置 Producer Name
                .enableBatching(false) // 禁用批量处理,方便演示
                .enableDeduplication(true) // 开启消息去重
                .create();

        try {
            // 发送重复的消息
            producer.send("Message 1".getBytes());
            producer.send("Message 1".getBytes());
            producer.send("Message 2".getBytes());

            System.out.println("消息发送完成!");
        } finally {
            producer.close();
            client.close();
        }
    }
}
  1. 消费端幂等性:与 Kafka 类似

Pulsar 的消费端幂等性实现方式与 Kafka 类似,可以使用数据库唯一约束、Redis 缓存、Token 机制等方法。

  1. 死信队列:Retry Letter Topic 和 Dead Letter Topic

Pulsar 提供了 Retry Letter Topic 和 Dead Letter Topic 来实现死信队列功能。

  • Retry Letter Topic: 当消息消费失败时,Pulsar 会将消息发送到 Retry Letter Topic,等待一段时间后重新消费。

  • Dead Letter Topic: 当消息重试多次仍然失败时,Pulsar 会将消息发送到 Dead Letter Topic,等待人工处理。

  • 配置方法:

    • 在 Consumer 的配置中,可以设置 negativeAckRedeliveryDelay 参数,指定消息重试的间隔时间。
    • 可以设置 maxRedeliverCount 参数,指定消息的最大重试次数。
    • 可以设置 deadLetterPolicy 参数,指定 Dead Letter Topic 的名称。
// Pulsar Consumer 示例,配置 Retry Letter Topic 和 Dead Letter Topic
public class PulsarConsumerExample {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 设置重试间隔时间为 1 分钟
                .maxRedeliverCount(3) // 设置最大重试次数为 3 次
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .deadLetterTopic("my-topic-dlq") // 设置 Dead Letter Topic 名称
                        .build())
                .subscribe();

        while (true) {
            try {
                Message<byte[]> message = consumer.receive();
                String messageContent = new String(message.getData());
                System.out.println("接收到消息: " + messageContent);

                try {
                    // 模拟处理消息,可能会抛出异常
                    processMessage(messageContent);
                    consumer.acknowledge(message); // 消息处理成功,确认消息
                    System.out.println("消息处理成功并确认: " + messageContent);
                } catch (Exception e) {
                    System.err.println("消息处理失败: " + messageContent + ", 错误信息: " + e.getMessage());
                    consumer.negativeAcknowledge(message); // 消息处理失败,否定确认消息,消息会被发送到 Retry Letter Topic
                    System.out.println("消息处理失败并否定确认: " + messageContent);
                }
            } catch (PulsarClientException e) {
                System.err.println("接收消息失败: " + e.getMessage());
            }
        }
    }

    private static void processMessage(String message) throws Exception {
        // 模拟消息处理逻辑,可能会抛出异常
        if (message.contains("error")) {
            throw new RuntimeException("模拟处理消息时发生错误");
        }
        System.out.println("处理消息: " + message);
    }
}

四、总结:选择适合自己的方案

不同的消息队列,在消息去重、幂等性和死信队列方面提供了不同的解决方案。咱们需要根据自己的业务场景和需求,选择合适的方案。

特性 Kafka RabbitMQ Pulsar
消息去重 Producer Idempotence (仅 Producer 端) Message Id 或 Correlation Id (需自行实现) Message Deduplication (Broker 端支持)
幂等性 消费端自行实现 消费端自行实现 消费端自行实现
死信队列 手动实现 DLX (Dead Letter Exchange) Retry Letter Topic 和 Dead Letter Topic

总的来说:

  • Kafka: 适合高吞吐量、高可靠性的场景,需要自己实现消费端幂等性和死信队列。
  • RabbitMQ: 适合对消息的路由和可靠性有较高要求的场景,提供了原生的死信队列功能。
  • Pulsar: 适合需要消息去重和多租户的场景,提供了更完善的死信队列功能。

好了,今天的讲座就到这里。希望大家能够理解消息队列里这些“防丢防错”的绝活儿,并在实际项目中灵活运用,保证咱们的消息能够“安全落地”,系统稳定运行! 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注