PHP `RabbitMQ` `AMQP` 协议深度:交换机、队列与绑定关系

咳咳,各位观众老爷们,大家好!今天咱们来聊聊PHP里玩儿RabbitMQ这事儿,保证让大家听完之后,感觉自己也能当个“兔子养殖户”!

咱们今天的主题是“PHP RabbitMQ AMQP 协议深度:交换机、队列与绑定关系”。 换句话说,就是搞清楚RabbitMQ里面那些重要的零件儿,以及它们之间是怎么勾搭上的。

开场白:为啥要用RabbitMQ?

在开始之前,先简单聊聊为啥我们要用RabbitMQ。 想象一下,你有一个网站,用户注册的时候,你要发邮件、发短信、记录日志等等。如果每个操作都同步执行,那用户不得等到花儿都谢了?

这时候,消息队列就派上用场了。 我们可以把这些任务都扔到消息队列里,然后让其他的程序(消费者)慢慢去处理。 这样,用户注册的时候就能秒开,用户体验蹭蹭往上涨!

RabbitMQ就是个非常流行的消息队列,它实现了AMQP(Advanced Message Queuing Protocol)协议,所以我们才能用PHP愉快地跟它玩耍。

第一部分:AMQP协议核心概念

AMQP协议定义了消息队列系统里各个组件的标准,咱们来看看几个关键的概念:

  • Producer(生产者): 负责产生消息,就像种菜的农民伯伯。
  • Consumer(消费者): 负责消费消息,就像吃菜的吃货。
  • Message(消息): 就是生产者生产出来的东西,也就是要传递的数据。
  • Exchange(交换机): 接收生产者发来的消息,然后根据一定的规则把消息路由到一个或多个队列。它就像一个邮局,负责分发信件。
  • Queue(队列): 存储消息的地方,消费者从这里取走消息进行处理。 就像一个收件箱,存放着等待处理的邮件。
  • Binding(绑定): 定义了交换机和队列之间的关系,告诉交换机应该把哪些消息发送到哪个队列。 就像邮局告诉你,哪些信件应该放到哪个收件箱。

这几个概念是RabbitMQ的核心,理解了它们,才能更好地使用RabbitMQ。

第二部分:交换机(Exchange)

交换机是 RabbitMQ 的核心组件之一,它负责接收生产者发送的消息,并根据不同的规则将消息路由到一个或多个队列。

交换机的类型

RabbitMQ 定义了几种不同类型的交换机,每种交换机都有不同的路由规则:

交换机类型 路由规则 适用场景
direct 精确匹配。 消息的 routing key 必须与绑定到交换机的 binding key 完全一致,消息才会被路由到对应的队列。 需要精确匹配的场景,例如根据消息类型进行路由。
topic 模糊匹配。 routing keybinding key 都需要使用通配符。 * 代表一个单词, # 代表零个或多个单词。 例如,routing keyuser.created.emailbinding keyuser.*.email,则消息会被路由到对应的队列。 需要根据消息主题进行路由的场景,例如根据日志级别进行路由。
fanout 广播模式。 忽略 routing key,将消息发送到所有绑定到该交换机的队列。 需要将消息广播给所有消费者的场景,例如发送系统通知。
headers 根据消息的 headers 进行路由。 可以设置多个 header 键值对,只有当消息的 headers 包含所有绑定的 header 键值对时,消息才会被路由到对应的队列。 需要根据消息的属性进行路由的场景,例如根据消息的优先级进行路由。
consistent hash 根据消息的 routing key 进行哈希计算,然后将消息路由到对应的队列。这种交换器类型需要安装插件才能使用,它主要用于负载均衡。 在需要将消息均匀分布到多个消费者的情况下使用,例如处理大量任务。

PHP代码示例:创建交换机

这里我们使用 php-amqplib 这个库来操作 RabbitMQ。

首先,需要安装这个库:

composer require php-amqplib/php-amqplib

然后,就可以开始写代码了:

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;

$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';

$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();

// 交换机名称
$exchangeName = 'my_exchange';
// 交换机类型
$exchangeType = 'direct';
// 是否持久化
$exchangeDurable = true;
// 是否自动删除
$exchangeAutoDelete = false;
// 是否内部交换机
$exchangeInternal = false;
// 交换机参数
$exchangeArguments = [];
// 交换机备用交换机
$exchangeTicket = null;

/*
 *  bool $passive = false,
 *  bool $durable = false,
 *  string $type = 'direct',
 *  bool $auto_delete = false,
 *  bool $internal = false,
 *  bool $nowait = false,
 *  array $arguments = [],
 *  int|null $ticket = null
 */
$channel->exchange_declare(
    $exchangeName,
    $exchangeType,
    $exchangePassive = false,
    $exchangeDurable,
    $exchangeAutoDelete,
    $exchangeInternal,
    $exchangeNowait = false,
    $exchangeArguments,
    $exchangeTicket
);

echo "Exchange '$exchangeName' created.n";

$channel->close();
$connection->close();

?>

这段代码创建了一个名为 my_exchangedirect 类型的交换机。 $exchangeDurable = true 表示这个交换机是持久化的,也就是说,即使 RabbitMQ 服务器重启,这个交换机也不会消失。

第三部分:队列(Queue)

队列是 RabbitMQ 中存储消息的地方,消费者从队列中获取消息进行处理。

队列的属性

  • Name(名称): 队列的唯一标识符。
  • Durable(持久化): 是否将队列持久化到磁盘,即使 RabbitMQ 服务器重启,队列也不会消失。
  • Exclusive(独占): 是否只允许一个消费者连接到该队列。
  • Auto Delete(自动删除): 当最后一个消费者断开连接时,是否自动删除该队列。

PHP代码示例:创建队列

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;

$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';

$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();

// 队列名称
$queueName = 'my_queue';
// 是否持久化
$queueDurable = true;
// 是否独占
$queueExclusive = false;
// 是否自动删除
$queueAutoDelete = false;

/**
 * name,
 * passive,
 * durable,
 * exclusive,
 * auto_delete,
 * nowait,
 * arguments,
 * ticket
 */
$channel->queue_declare(
    $queueName,
    $queuePassive = false,
    $queueDurable,
    $queueExclusive,
    $queueAutoDelete,
    $queueNoWait = false,
    $queueArguments = [],
    $queueTicket = null
);

echo "Queue '$queueName' created.n";

$channel->close();
$connection->close();

?>

这段代码创建了一个名为 my_queue 的队列,并且设置了 $queueDurable = true,表示队列是持久化的。

第四部分:绑定(Binding)

绑定定义了交换机和队列之间的关系,告诉交换机应该把哪些消息发送到哪个队列。

Binding Key

Binding Key 是绑定关系中的一个重要参数,它用于指定交换机应该根据什么规则将消息路由到队列。 不同的交换机类型对 Binding Key 的解释不同:

  • direct 交换机:Binding Key 必须与消息的 Routing Key 完全一致。
  • topic 交换机:Binding Key 可以使用通配符进行模糊匹配。
  • fanout 交换机:忽略 Binding Key。
  • headers 交换机:根据消息的 headers 进行匹配。

PHP代码示例:创建绑定

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;

$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';

$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();

// 交换机名称
$exchangeName = 'my_exchange';
// 队列名称
$queueName = 'my_queue';
// Routing Key
$routingKey = 'my_routing_key';

/**
 * queue,
 * exchange,
 * routing_key,
 * nowait,
 * arguments,
 * ticket
 */
$channel->queue_bind(
    $queueName,
    $exchangeName,
    $routingKey
);

echo "Queue '$queueName' bind to exchange '$exchangeName' with routing key '$routingKey'.n";

$channel->close();
$connection->close();

?>

这段代码将队列 my_queue 绑定到交换机 my_exchange,并指定了 Routing Key 为 my_routing_key。 这意味着,只有当消息的 Routing Key 为 my_routing_key 时,消息才会被路由到队列 my_queue

第五部分:生产者(Producer)

生产者负责产生消息,并将消息发送到交换机。

PHP代码示例:生产者

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
use PhpAmqpLibMessageAMQPMessage;

$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';

$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();

// 交换机名称
$exchangeName = 'my_exchange';
// Routing Key
$routingKey = 'my_routing_key';
// 消息内容
$messageBody = 'Hello, RabbitMQ!';

/**
 * body,
 * props = []
 */
$message = new AMQPMessage($messageBody, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

/**
 * msg,
 * exchange = '',
 * routing_key = '',
 * mandatory = false,
 * immediate = false,
 * ticket = null
 */
$channel->basic_publish(
    $message,
    $exchangeName,
    $routingKey
);

echo " [x] Sent '$messageBody'n";

$channel->close();
$connection->close();

?>

这段代码创建了一个消息,并将其发送到交换机 my_exchange,并指定了 Routing Key 为 my_routing_key'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT 表示消息是持久化的,也就是说,即使 RabbitMQ 服务器重启,消息也不会丢失。

第六部分:消费者(Consumer)

消费者负责从队列中获取消息,并进行处理。

PHP代码示例:消费者

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;

$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';

$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();

// 队列名称
$queueName = 'my_queue';

echo " [*] Waiting for messages. To exit press CTRL+Cn";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "n";
    // 确认消息已被处理
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

/**
 * queue,
 * consumer_tag = '',
 * no_local = false,
 * no_ack = false,
 * exclusive = false,
 * nowait = false,
 * callback = null,
 * ticket = null,
 * arguments = []
 */
$channel->basic_consume(
    $queueName,
    $consumerTag = '',
    $noLocal = false,
    $noAck = false,
    $exclusive = false,
    $noWait = false,
    $callback
);

/**
 * @throws PhpAmqpLibExceptionAMQPTimeoutException
 */
while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

这段代码从队列 my_queue 中获取消息,并在控制台输出消息内容。 $channel->basic_ack($msg->delivery_info['delivery_tag']) 表示确认消息已被处理,RabbitMQ 服务器会将消息从队列中删除。 如果没有调用 basic_ack 方法,RabbitMQ 服务器会认为消息没有被处理,当消费者断开连接时,会将消息重新放回队列中,等待下次被消费。

第七部分:实战演练:用户注册

现在,我们来做一个简单的实战演练:用户注册。

当用户注册成功后,我们需要发送欢迎邮件和短信。 我们可以使用 RabbitMQ 来异步处理这些任务。

  1. 创建交换机和队列

    // 创建一个 fanout 类型的交换机,用于广播消息
    $channel->exchange_declare('user_register_exchange', 'fanout', false, true, false);
    
    // 创建两个队列,分别用于处理邮件和短信
    $channel->queue_declare('user_register_email_queue', false, true, false, false);
    $channel->queue_declare('user_register_sms_queue', false, true, false, false);
    
    // 将队列绑定到交换机
    $channel->queue_bind('user_register_email_queue', 'user_register_exchange');
    $channel->queue_bind('user_register_sms_queue', 'user_register_exchange');
  2. 生产者:用户注册

    // 用户注册成功后,发送消息到交换机
    $messageBody = json_encode(['user_id' => 123, 'email' => '[email protected]', 'phone' => '13800000000']);
    $message = new AMQPMessage($messageBody, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($message, 'user_register_exchange');
  3. 消费者:发送邮件

    // 从队列中获取消息,发送邮件
    $callback = function ($msg) {
        $data = json_decode($msg->body, true);
        // 发送邮件的逻辑
        echo "Sending email to " . $data['email'] . "n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('user_register_email_queue', '', false, false, false, false, $callback);
  4. 消费者:发送短信

    // 从队列中获取消息,发送短信
    $callback = function ($msg) {
        $data = json_decode($msg->body, true);
        // 发送短信的逻辑
        echo "Sending SMS to " . $data['phone'] . "n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    $channel->basic_consume('user_register_sms_queue', '', false, false, false, false, $callback);

这样,当用户注册成功后,就会分别向 user_register_email_queueuser_register_sms_queue 发送一条消息,然后两个消费者分别处理邮件和短信的发送。

总结

今天我们学习了 RabbitMQ 的核心概念:交换机、队列和绑定关系。 我们还通过代码示例演示了如何创建交换机、队列和绑定关系,以及如何编写生产者和消费者。

掌握了这些知识,你就可以开始用 RabbitMQ 来构建高性能、高可靠性的应用了!

记住,多实践才能真正掌握,赶紧动手试试吧! 下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注