咳咳,各位观众老爷们,大家好!今天咱们来聊聊PHP里玩儿RabbitMQ这事儿,保证让大家听完之后,感觉自己也能当个“兔子养殖户”!
咱们今天的主题是“PHP RabbitMQ
AMQP
协议深度:交换机、队列与绑定关系”。 换句话说,就是搞清楚RabbitMQ里面那些重要的零件儿,以及它们之间是怎么勾搭上的。
开场白:为啥要用RabbitMQ?
在开始之前,先简单聊聊为啥我们要用RabbitMQ。 想象一下,你有一个网站,用户注册的时候,你要发邮件、发短信、记录日志等等。如果每个操作都同步执行,那用户不得等到花儿都谢了?
这时候,消息队列就派上用场了。 我们可以把这些任务都扔到消息队列里,然后让其他的程序(消费者)慢慢去处理。 这样,用户注册的时候就能秒开,用户体验蹭蹭往上涨!
RabbitMQ就是个非常流行的消息队列,它实现了AMQP(Advanced Message Queuing Protocol)协议,所以我们才能用PHP愉快地跟它玩耍。
第一部分:AMQP协议核心概念
AMQP协议定义了消息队列系统里各个组件的标准,咱们来看看几个关键的概念:
- Producer(生产者): 负责产生消息,就像种菜的农民伯伯。
- Consumer(消费者): 负责消费消息,就像吃菜的吃货。
- Message(消息): 就是生产者生产出来的东西,也就是要传递的数据。
- Exchange(交换机): 接收生产者发来的消息,然后根据一定的规则把消息路由到一个或多个队列。它就像一个邮局,负责分发信件。
- Queue(队列): 存储消息的地方,消费者从这里取走消息进行处理。 就像一个收件箱,存放着等待处理的邮件。
- Binding(绑定): 定义了交换机和队列之间的关系,告诉交换机应该把哪些消息发送到哪个队列。 就像邮局告诉你,哪些信件应该放到哪个收件箱。
这几个概念是RabbitMQ的核心,理解了它们,才能更好地使用RabbitMQ。
第二部分:交换机(Exchange)
交换机是 RabbitMQ 的核心组件之一,它负责接收生产者发送的消息,并根据不同的规则将消息路由到一个或多个队列。
交换机的类型
RabbitMQ 定义了几种不同类型的交换机,每种交换机都有不同的路由规则:
交换机类型 | 路由规则 | 适用场景 |
---|---|---|
direct |
精确匹配。 消息的 routing key 必须与绑定到交换机的 binding key 完全一致,消息才会被路由到对应的队列。 |
需要精确匹配的场景,例如根据消息类型进行路由。 |
topic |
模糊匹配。 routing key 和 binding key 都需要使用通配符。 * 代表一个单词, # 代表零个或多个单词。 例如,routing key 为 user.created.email ,binding key 为 user.*.email ,则消息会被路由到对应的队列。 |
需要根据消息主题进行路由的场景,例如根据日志级别进行路由。 |
fanout |
广播模式。 忽略 routing key ,将消息发送到所有绑定到该交换机的队列。 |
需要将消息广播给所有消费者的场景,例如发送系统通知。 |
headers |
根据消息的 headers 进行路由。 可以设置多个 header 键值对,只有当消息的 headers 包含所有绑定的 header 键值对时,消息才会被路由到对应的队列。 |
需要根据消息的属性进行路由的场景,例如根据消息的优先级进行路由。 |
consistent hash |
根据消息的 routing key 进行哈希计算,然后将消息路由到对应的队列。这种交换器类型需要安装插件才能使用,它主要用于负载均衡。 |
在需要将消息均匀分布到多个消费者的情况下使用,例如处理大量任务。 |
PHP代码示例:创建交换机
这里我们使用 php-amqplib
这个库来操作 RabbitMQ。
首先,需要安装这个库:
composer require php-amqplib/php-amqplib
然后,就可以开始写代码了:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 交换机名称
$exchangeName = 'my_exchange';
// 交换机类型
$exchangeType = 'direct';
// 是否持久化
$exchangeDurable = true;
// 是否自动删除
$exchangeAutoDelete = false;
// 是否内部交换机
$exchangeInternal = false;
// 交换机参数
$exchangeArguments = [];
// 交换机备用交换机
$exchangeTicket = null;
/*
* bool $passive = false,
* bool $durable = false,
* string $type = 'direct',
* bool $auto_delete = false,
* bool $internal = false,
* bool $nowait = false,
* array $arguments = [],
* int|null $ticket = null
*/
$channel->exchange_declare(
$exchangeName,
$exchangeType,
$exchangePassive = false,
$exchangeDurable,
$exchangeAutoDelete,
$exchangeInternal,
$exchangeNowait = false,
$exchangeArguments,
$exchangeTicket
);
echo "Exchange '$exchangeName' created.n";
$channel->close();
$connection->close();
?>
这段代码创建了一个名为 my_exchange
的 direct
类型的交换机。 $exchangeDurable = true
表示这个交换机是持久化的,也就是说,即使 RabbitMQ 服务器重启,这个交换机也不会消失。
第三部分:队列(Queue)
队列是 RabbitMQ 中存储消息的地方,消费者从队列中获取消息进行处理。
队列的属性
- Name(名称): 队列的唯一标识符。
- Durable(持久化): 是否将队列持久化到磁盘,即使 RabbitMQ 服务器重启,队列也不会消失。
- Exclusive(独占): 是否只允许一个消费者连接到该队列。
- Auto Delete(自动删除): 当最后一个消费者断开连接时,是否自动删除该队列。
PHP代码示例:创建队列
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 队列名称
$queueName = 'my_queue';
// 是否持久化
$queueDurable = true;
// 是否独占
$queueExclusive = false;
// 是否自动删除
$queueAutoDelete = false;
/**
* name,
* passive,
* durable,
* exclusive,
* auto_delete,
* nowait,
* arguments,
* ticket
*/
$channel->queue_declare(
$queueName,
$queuePassive = false,
$queueDurable,
$queueExclusive,
$queueAutoDelete,
$queueNoWait = false,
$queueArguments = [],
$queueTicket = null
);
echo "Queue '$queueName' created.n";
$channel->close();
$connection->close();
?>
这段代码创建了一个名为 my_queue
的队列,并且设置了 $queueDurable = true
,表示队列是持久化的。
第四部分:绑定(Binding)
绑定定义了交换机和队列之间的关系,告诉交换机应该把哪些消息发送到哪个队列。
Binding Key
Binding Key 是绑定关系中的一个重要参数,它用于指定交换机应该根据什么规则将消息路由到队列。 不同的交换机类型对 Binding Key 的解释不同:
direct
交换机:Binding Key 必须与消息的 Routing Key 完全一致。topic
交换机:Binding Key 可以使用通配符进行模糊匹配。fanout
交换机:忽略 Binding Key。headers
交换机:根据消息的 headers 进行匹配。
PHP代码示例:创建绑定
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 交换机名称
$exchangeName = 'my_exchange';
// 队列名称
$queueName = 'my_queue';
// Routing Key
$routingKey = 'my_routing_key';
/**
* queue,
* exchange,
* routing_key,
* nowait,
* arguments,
* ticket
*/
$channel->queue_bind(
$queueName,
$exchangeName,
$routingKey
);
echo "Queue '$queueName' bind to exchange '$exchangeName' with routing key '$routingKey'.n";
$channel->close();
$connection->close();
?>
这段代码将队列 my_queue
绑定到交换机 my_exchange
,并指定了 Routing Key 为 my_routing_key
。 这意味着,只有当消息的 Routing Key 为 my_routing_key
时,消息才会被路由到队列 my_queue
。
第五部分:生产者(Producer)
生产者负责产生消息,并将消息发送到交换机。
PHP代码示例:生产者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
use PhpAmqpLibMessageAMQPMessage;
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 交换机名称
$exchangeName = 'my_exchange';
// Routing Key
$routingKey = 'my_routing_key';
// 消息内容
$messageBody = 'Hello, RabbitMQ!';
/**
* body,
* props = []
*/
$message = new AMQPMessage($messageBody, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
/**
* msg,
* exchange = '',
* routing_key = '',
* mandatory = false,
* immediate = false,
* ticket = null
*/
$channel->basic_publish(
$message,
$exchangeName,
$routingKey
);
echo " [x] Sent '$messageBody'n";
$channel->close();
$connection->close();
?>
这段代码创建了一个消息,并将其发送到交换机 my_exchange
,并指定了 Routing Key 为 my_routing_key
。 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
表示消息是持久化的,也就是说,即使 RabbitMQ 服务器重启,消息也不会丢失。
第六部分:消费者(Consumer)
消费者负责从队列中获取消息,并进行处理。
PHP代码示例:消费者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibChannelAMQPChannel;
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 队列名称
$queueName = 'my_queue';
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "n";
// 确认消息已被处理
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
/**
* queue,
* consumer_tag = '',
* no_local = false,
* no_ack = false,
* exclusive = false,
* nowait = false,
* callback = null,
* ticket = null,
* arguments = []
*/
$channel->basic_consume(
$queueName,
$consumerTag = '',
$noLocal = false,
$noAck = false,
$exclusive = false,
$noWait = false,
$callback
);
/**
* @throws PhpAmqpLibExceptionAMQPTimeoutException
*/
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
这段代码从队列 my_queue
中获取消息,并在控制台输出消息内容。 $channel->basic_ack($msg->delivery_info['delivery_tag'])
表示确认消息已被处理,RabbitMQ 服务器会将消息从队列中删除。 如果没有调用 basic_ack
方法,RabbitMQ 服务器会认为消息没有被处理,当消费者断开连接时,会将消息重新放回队列中,等待下次被消费。
第七部分:实战演练:用户注册
现在,我们来做一个简单的实战演练:用户注册。
当用户注册成功后,我们需要发送欢迎邮件和短信。 我们可以使用 RabbitMQ 来异步处理这些任务。
-
创建交换机和队列
// 创建一个 fanout 类型的交换机,用于广播消息 $channel->exchange_declare('user_register_exchange', 'fanout', false, true, false); // 创建两个队列,分别用于处理邮件和短信 $channel->queue_declare('user_register_email_queue', false, true, false, false); $channel->queue_declare('user_register_sms_queue', false, true, false, false); // 将队列绑定到交换机 $channel->queue_bind('user_register_email_queue', 'user_register_exchange'); $channel->queue_bind('user_register_sms_queue', 'user_register_exchange');
-
生产者:用户注册
// 用户注册成功后,发送消息到交换机 $messageBody = json_encode(['user_id' => 123, 'email' => '[email protected]', 'phone' => '13800000000']); $message = new AMQPMessage($messageBody, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($message, 'user_register_exchange');
-
消费者:发送邮件
// 从队列中获取消息,发送邮件 $callback = function ($msg) { $data = json_decode($msg->body, true); // 发送邮件的逻辑 echo "Sending email to " . $data['email'] . "n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('user_register_email_queue', '', false, false, false, false, $callback);
-
消费者:发送短信
// 从队列中获取消息,发送短信 $callback = function ($msg) { $data = json_decode($msg->body, true); // 发送短信的逻辑 echo "Sending SMS to " . $data['phone'] . "n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume('user_register_sms_queue', '', false, false, false, false, $callback);
这样,当用户注册成功后,就会分别向 user_register_email_queue
和 user_register_sms_queue
发送一条消息,然后两个消费者分别处理邮件和短信的发送。
总结
今天我们学习了 RabbitMQ 的核心概念:交换机、队列和绑定关系。 我们还通过代码示例演示了如何创建交换机、队列和绑定关系,以及如何编写生产者和消费者。
掌握了这些知识,你就可以开始用 RabbitMQ 来构建高性能、高可靠性的应用了!
记住,多实践才能真正掌握,赶紧动手试试吧! 下课!