PHP `RabbitMQ` / `Kafka` 消息队列:异步通信与削峰填谷

各位观众,各位朋友,大家好!我是你们的老朋友,Bug终结者,今天咱们来聊聊PHP世界里的消息队列,特别是RabbitMQ和Kafka这两位大佬。

今天的主题是:PHP RabbitMQ / Kafka 消息队列:异步通信与削峰填谷。

先声明一下,我说的都是我踩过的坑,趟过的河,绝对真材实料,童叟无欺。

一、啥是消息队列?为啥要用它?

想象一下,你经营一家餐厅,顾客点餐(请求),厨师做菜(处理),服务员上菜(响应)。 如果厨师只有一个,顾客又特别多,那是不是得排队?顾客是不是要骂娘?

消息队列就相当于一个“传菜员”,顾客点餐后,把菜单(消息)交给传菜员,传菜员按照顺序(FIFO,先进先出)或者其他规则(优先级)把菜单给厨师。 这样,顾客不用等着厨师做菜,可以先去玩手机,厨师也不用管顾客啥时候来的,只管按照传菜员给的菜单做菜就行了。

用专业的术语来说,消息队列是一种异步通信机制。它允许不同的应用程序或服务之间通过消息进行通信,而无需直接相互调用。

使用消息队列的好处多多:

  • 解耦: 各个服务之间不需要直接依赖,降低了系统的耦合性。你可以随意更换厨师(服务),只要他能看懂菜单(消息格式)就行。
  • 异步: 请求发出后,不需要等待响应,提高了系统的响应速度。顾客点了菜就可以去玩手机了,不用一直盯着厨师。
  • 削峰填谷: 当请求量剧增时,消息队列可以缓存请求,避免系统崩溃。就像餐厅可以把顾客点的菜先记下来,等厨师忙完再做。
  • 可靠性: 消息队列通常会持久化消息,即使系统崩溃,消息也不会丢失。菜单都记在小本本上了,不怕丢。
  • 可扩展性: 可以轻松地增加或减少服务实例,提高系统的处理能力。多请几个厨师就行了。

二、RabbitMQ:轻量级消息队列的典范

RabbitMQ 是一个流行的开源消息队列,基于 AMQP(Advanced Message Queuing Protocol)协议。 它以其易用性、灵活性和可靠性而闻名。

咱们先来个简单的 RabbitMQ 示例:

  1. 安装 RabbitMQ: 这个就略过了,网上教程很多,根据你的操作系统选择合适的安装方式。

  2. 安装 PHP AMQP 扩展:

    sudo apt-get install php-amqp  # Debian/Ubuntu
    sudo yum install php-pecl-amqp  # CentOS/RHEL

    或者使用 PECL:

    pecl install amqp

    安装完记得重启 PHP-FPM 或者 Apache。

  3. 生产者 (Producer): 发送消息

    <?php
    
    $host = 'localhost';
    $port = 5672;
    $user = 'guest';
    $password = 'guest';
    $vhost = '/';
    
    try {
        $conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
        $channel = $conn->channel();
    
        $exchangeName = 'my_exchange';
        $queueName = 'my_queue';
    
        // 声明交换机
        $channel->exchange_declare($exchangeName, 'direct', false, false, false);
    
        // 声明队列
        $channel->queue_declare($queueName, false, false, false, false);
    
        // 绑定交换机和队列
        $channel->queue_bind($queueName, $exchangeName);
    
        $messageBody = 'Hello, RabbitMQ!';
    
        $message = new AMQPMessage($messageBody, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT  // 消息持久化
        ]);
    
        $channel->basic_publish($message, $exchangeName);
    
        echo " [x] Sent '" . $messageBody . "'n";
    
        $channel->close();
        $conn->close();
    
    } catch (Exception $e) {
        echo 'Caught exception: ',  $e->getMessage(), "n";
    }
    
    ?>

    代码解释:

    • AMQPStreamConnection: 建立到 RabbitMQ 服务器的连接。
    • channel(): 创建一个通道,用于执行 AMQP 命令。
    • exchange_declare(): 声明一个交换机。 direct 类型的交换机根据路由键将消息发送到匹配的队列。
    • queue_declare(): 声明一个队列。
    • queue_bind(): 将队列绑定到交换机,并指定路由键。 这里没有显式指定路由键,默认使用队列名作为路由键。
    • AMQPMessage: 创建一个消息对象,包含消息内容和属性。
    • delivery_mode => AMQPMessage::DELIVERY_MODE_PERSISTENT: 将消息设置为持久化,即使 RabbitMQ 重启,消息也不会丢失。
    • basic_publish(): 发布消息到交换机。
  4. 消费者 (Consumer): 接收消息

    <?php
    
    $host = 'localhost';
    $port = 5672;
    $user = 'guest';
    $password = 'guest';
    $vhost = '/';
    
    try {
        $conn = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
        $channel = $conn->channel();
    
        $exchangeName = 'my_exchange';
        $queueName = 'my_queue';
    
        // 声明队列(确保队列存在)
        $channel->queue_declare($queueName, false, false, false, false);
    
        echo ' [*] Waiting for messages. To exit press CTRL+C' . "n";
    
        $callback = function ($message) {
            echo " [x] Received " . $message->body . "n";
            sleep(rand(1,3)); // 模拟处理消息的时间
            echo " [x] Donen";
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // 确认消息已被处理
        };
    
        $channel->basic_qos(null, 1, null); // 每次只处理一条消息
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
    
        while ($channel->is_consuming()) {
            $channel->wait();
        }
    
        $channel->close();
        $conn->close();
    
    } catch (Exception $e) {
        echo 'Caught exception: ',  $e->getMessage(), "n";
    }
    
    ?>

    代码解释:

    • basic_consume(): 从队列中消费消息。
      • 第一个参数:队列名
      • 第二个参数:消费者标签,可以为空字符串。
      • 第三个参数:no_local,是否只接收来自其他消费者的消息,通常设置为 false
      • 第四个参数:no_ack,是否自动确认消息,如果设置为 false,需要手动确认消息。
      • 第五个参数:exclusive,是否独占队列,如果设置为 true,只有当前消费者可以消费该队列的消息。
      • 第六个参数:nowait,是否非阻塞模式,通常设置为 false
      • 第七个参数:callback,回调函数,用于处理接收到的消息。
    • basic_ack(): 手动确认消息已被处理。 如果不确认,RabbitMQ 会认为消息没有被处理,会将消息重新放入队列中。
    • basic_qos(): 设置预取计数。 basic_qos(null, 1, null) 表示每次只预取一条消息,这样可以避免消费者一次性接收太多消息而导致崩溃。

RabbitMQ 核心概念:

概念 描述
Producer 消息生产者,负责创建和发送消息到 RabbitMQ。
Consumer 消息消费者,负责从 RabbitMQ 接收和处理消息。
Message 消息,包含需要传输的数据。
Exchange 交换机,接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。 RabbitMQ 支持多种交换机类型,包括 direct, fanout, topic 和 headers。
Queue 队列,存储消息,等待消费者消费。
Binding 绑定,将交换机和队列绑定在一起,并指定路由键。 当消息到达交换机时,交换机根据路由键将消息路由到绑定的队列。
Routing Key 路由键,用于将消息路由到特定的队列。 路由键的含义取决于交换机类型。 例如,在 direct 交换机中,路由键必须与队列的绑定键完全匹配才能将消息路由到该队列。
Virtual Host 虚拟主机,用于隔离不同的 RabbitMQ 环境。 可以在同一个 RabbitMQ 服务器上创建多个虚拟主机,每个虚拟主机都有自己的用户、权限和队列。

RabbitMQ 交换机类型:

  • Direct: 完全匹配路由键。
  • Fanout: 将消息发送到所有绑定到该交换机的队列,忽略路由键。
  • Topic: 使用模式匹配路由键,例如 *.orange.* 可以匹配 quick.orange.rabbitlazy.orange.elephant
  • Headers: 根据消息头进行路由,而不是路由键。

三、Kafka:高吞吐量消息队列的王者

Kafka 是一个分布式、高吞吐量的消息队列,由 LinkedIn 开发,后来捐献给 Apache 基金会。 它主要用于构建实时数据管道和流应用程序。

Kafka 的特点是:

  • 高吞吐量: 可以处理大量的消息。
  • 可扩展性: 可以轻松地扩展集群,提高处理能力。
  • 持久化: 消息持久化到磁盘,即使 Kafka 集群重启,消息也不会丢失。
  • 容错性: Kafka 集群具有容错能力,即使部分节点发生故障,系统仍然可以正常运行。

咱们也来个简单的 Kafka 示例:

  1. 安装 Kafka: 这个也略过了,网上教程也很多,根据你的操作系统选择合适的安装方式。 记得还要安装 Zookeeper,Kafka 依赖 Zookeeper 来管理集群。

  2. 安装 PHP Kafka 扩展:

    pecl install rdkafka

    同样,安装完记得重启 PHP-FPM 或者 Apache。

  3. 生产者 (Producer): 发送消息

    <?php
    
    $brokerList = 'localhost:9092';
    $topicName = 'my_topic';
    
    $conf = new RdKafkaConf();
    
    // 设置 broker 列表
    $conf->set('metadata.broker.list', $brokerList);
    
    // 创建生产者
    $producer = new RdKafkaProducer($conf);
    
    // 创建主题
    $topic = $producer->newTopic($topicName);
    
    for ($i = 0; $i < 10; $i++) {
        $message = "Message " . $i;
    
        // 发送消息
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        $producer->poll(0); // 触发消息发送
    
        echo "Sent message: " . $message . "n";
    }
    
    // 等待所有消息发送完成
    $producer->flush(10000);
    
    ?>

    代码解释:

    • RdKafkaConf: 创建 Kafka 配置对象。
    • set('metadata.broker.list', $brokerList): 设置 Kafka broker 列表。
    • RdKafkaProducer: 创建 Kafka 生产者对象。
    • newTopic(): 创建 Kafka 主题对象。
    • produce(): 发送消息到 Kafka 主题。
      • 第一个参数:RD_KAFKA_PARTITION_UA,表示自动选择分区。
      • 第二个参数:消息标志,通常设置为 0
      • 第三个参数:消息内容。
    • poll(): 触发消息发送。
    • flush(): 等待所有消息发送完成。
  4. 消费者 (Consumer): 接收消息

    <?php
    
    $brokerList = 'localhost:9092';
    $topicName = 'my_topic';
    $groupId = 'my_group';
    
    $conf = new RdKafkaConf();
    
    // 设置 broker 列表
    $conf->set('metadata.broker.list', $brokerList);
    
    // 设置消费者组 ID
    $conf->set('group.id', $groupId);
    
    // 设置自动提交偏移量的时间间隔
    $conf->set('enable.auto.offset.store', 'true');
    $conf->set('auto.offset.store.interval.ms', '60000');
    
    // 设置当没有 offset 时,从 earliest 开始消费
    $conf->set('auto.offset.reset', 'earliest');
    
    $consumer = new RdKafkaKafkaConsumer($conf);
    
    $consumer->subscribe([$topicName]);
    
    echo "Waiting for messages...n";
    
    while (true) {
        $message = $consumer->consume(120000); // 超时时间 2 分钟
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "Received message: " . $message->payload . "n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for moren";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed outn";
                break;
            default:
                throw new Exception($message->errstr(), $message->err);
                break;
        }
    }
    
    ?>

    代码解释:

    • RdKafkaKafkaConsumer: 创建 Kafka 消费者对象。
    • set('group.id', $groupId): 设置消费者组 ID。 同一个消费者组的消费者共同消费一个主题的消息,每个消费者消费主题的一部分分区。
    • set('auto.offset.reset', 'earliest'): 设置当没有 offset 时,从 earliest 开始消费。
    • subscribe(): 订阅 Kafka 主题。
    • consume(): 从 Kafka 主题消费消息。 参数是超时时间,单位是毫秒。

Kafka 核心概念:

概念 描述
Broker Kafka 集群中的服务器。
Topic 主题,用于组织消息。 每个主题可以有多个分区。
Partition 分区,将主题分割成多个部分,每个分区可以存储在不同的 Broker 上。 分区是 Kafka 并行处理的基本单元。
Offset 偏移量,用于标识分区中的每条消息。 消费者使用偏移量来跟踪消费进度。
Consumer Group 消费者组,一组消费者共同消费一个主题的消息。 每个消费者组消费主题的所有分区,每个分区只能被同一个消费者组的一个消费者消费。 Kafka 使用消费者组来实现负载均衡和容错。
Zookeeper Kafka 依赖 Zookeeper 来管理集群配置、协调 Broker 和消费者。 Zookeeper 存储 Kafka 的元数据,例如主题、分区、Broker 的信息。

四、RabbitMQ vs Kafka:选哪个?

这就像问:吃米饭还是吃面条? 都好吃,关键看你饿不饿,想吃啥。

特性 RabbitMQ Kafka
架构 传统的 message broker 分布式流处理平台
协议 AMQP 自有协议
吞吐量 中等 非常高
延迟 较低 较高
用途 任务队列、消息路由、异步通信 大数据流处理、日志收集、实时分析
消息模型 推送 (Push) 拉取 (Pull)
持久化 支持消息持久化 强制消息持久化
复杂性 相对简单,易于上手 相对复杂,需要更多配置和管理
适用场景 对延迟敏感、需要复杂路由规则的场景 对吞吐量要求高、需要持久化存储大量数据的场景

简单总结:

  • RabbitMQ: 适合轻量级的消息队列,例如任务队列、异步通信。 如果你的系统对延迟要求很高,并且需要复杂的路由规则,那么 RabbitMQ 是一个不错的选择。
  • Kafka: 适合高吞吐量的数据流处理,例如日志收集、实时分析。 如果你的系统需要处理大量的消息,并且需要持久化存储这些消息,那么 Kafka 是一个更好的选择。

五、PHP 中使用消息队列的注意事项

  • 消息序列化: 在发送消息之前,需要将消息序列化成字符串,例如 JSON 或者 PHP 的 serialize() 函数。 在接收消息之后,需要将消息反序列化成 PHP 对象或者数组。
  • 消息格式: 定义清晰的消息格式,方便消费者解析消息。 例如,可以使用 JSON 格式,并定义固定的字段。
  • 错误处理: 在生产者和消费者中,都需要进行错误处理,例如连接失败、消息发送失败、消息消费失败。
  • 消息确认: 消费者需要手动确认消息已被处理,避免消息丢失。
  • 连接管理: 合理管理连接,避免连接泄漏。 可以使用连接池来复用连接。
  • 监控: 监控消息队列的运行状态,例如队列长度、消息积压情况、消费者消费速度。

六、代码示例:削峰填谷

假设你的网站有一个注册功能,用户注册后需要发送验证邮件。 如果用户注册量很大,每次注册都发送邮件可能会导致邮件服务器压力过大。 可以使用消息队列来削峰填谷。

  1. 生产者 (注册流程):

    <?php
    
    // 注册用户
    $user = registerUser($_POST);
    
    // 将发送邮件的任务放入消息队列
    $message = [
        'user_id' => $user['id'],
        'email' => $user['email'],
        'subject' => 'Welcome!',
        'body' => 'Please verify your email address by clicking the link below.'
    ];
    
    // 使用 RabbitMQ 或者 Kafka 发送消息
    sendMessageToQueue('send_email_queue', json_encode($message));
    
    // 提示用户注册成功
    echo "Registration successful. Please check your email to verify your account.";
    
    ?>
  2. 消费者 (邮件发送服务):

    <?php
    
    // 从消息队列中获取发送邮件的任务
    $message = receiveMessageFromQueue('send_email_queue');
    
    // 解析消息
    $data = json_decode($message, true);
    
    // 发送邮件
    sendEmail($data['email'], $data['subject'], $data['body']);
    
    // 记录日志
    logMessage("Email sent to " . $data['email']);
    
    // 确认消息已被处理
    acknowledgeMessage($message);
    
    ?>

七、总结

今天我们聊了 PHP 中使用 RabbitMQ 和 Kafka 这两个消息队列大佬,从概念到代码,希望对大家有所帮助。 记住,选择哪个消息队列取决于你的具体需求。 没有最好的,只有最适合的。

消息队列是一个强大的工具,可以帮助你构建高可用、高并发、可扩展的系统。 希望大家能够熟练掌握消息队列的使用,并在实际项目中灵活运用。

最后,祝大家 Bug 越来越少,代码越来越优雅! 下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注