各位好,我是你们的老朋友,一个每天都在跟代码死磕的PHP老码农。
今天咱们不聊什么优雅的设计模式,也不扯那些高深莫测的微服务架构。咱们来聊聊一个非常实在、非常粗暴,但又是绝大多数互联网公司保命的绝招——消息队列(MQ)。
特别是当你的服务器CPU飙到100%,数据库连不上,客户在群里疯狂艾特你的时候,你手里要是没有一把“削峰填谷”的刀,那你这顿骂是跑不掉的。
一、 现实总是很骨感:当流量像洪水猛兽
想象一下这个场景:双11零点。
后台的服务器本来是个文质彬彬的程序员(平时运行得挺稳),突然被推上了前台。一瞬间,十万个请求“嗖嗖嗖”地涌进来,全是“我要买这个!我要买那个!”。
这时候,如果你的架构是这样的:
浏览器 -> PHP代码 -> 直接写MySQL
好家伙,这就像是开了一家只有一个小窗口的饭馆。大门一开,十万个顾客同时涌进来,只有一个小厨师。你让小厨师怎么操作?他手忙脚乱,切菜切到手了,炒菜把锅炸了。数据库呢?它本来正在悠闲地哼着小曲儿,突然被连了十万次,它直接气哭了(CPU 100%,连接超时)。
这就是高并发带来的“雪崩”效应。流量就是那个“峰”,数据库就是那个脆弱的“谷”。峰太高,谷填满了,水(数据)就溢出来了,服务器就崩了。
消息队列(MQ)是干嘛的?
它就是那个“外卖窗口”。
当成千上万的顾客(请求)涌进来,服务员(PHP)不直接把菜(数据)端到厨房(数据库),而是先把单子写在一个小本子上(MQ),然后告诉顾客:“请坐,把单子放在这,厨房做好了会叫你。”
这样,顾客虽然排起了长队(消息队列),但厨房(数据库)可以有条不紊地做菜,不会乱套。这就是削峰。
等到厨房做完菜了,再慢慢叫号(消费者)。这就把瞬间的“洪峰”变成了平缓的“流量”,这就是填谷。
二、 选对武器:Redis 还是 RabbitMQ?
在PHP的世界里,MQ就像菜刀,有好几种:瑞士军刀(Redis)、大砍刀(RabbitMQ)、餐刀(Kafka——虽然PHP对Kafka支持不如Java好,但也用)。
咱们今天重点讲PHP最常用的两个:Redis 和 RabbitMQ。
1. Redis:轻量级外卖窗口
Redis是个内存大户,快是它最大的优点,死也是它最大的缺点。它适合那种“活不过一秒”的消息,比如验证码发送、简单的通知。
2. RabbitMQ:稳重派大厨
RabbitMQ是基于AMQP协议的,它更稳健,支持消息确认、持久化、死信队列。它就像一个经验丰富的特级厨师,哪怕电闪雷鸣,他也能把菜做出来。适合电商订单、复杂的业务流程。
咱们先从最简单的开始,用PHP + Redis来打个样。
三、 实战一:用 PHP + Redis 搞定“秒杀”
首先,你得有个Redis。如果你没有,你可以用Docker搞一个,或者买个云Redis。咱们假设你已经连上了。
1. 生产者:把单子写上去
生产者很简单,就是把用户的请求扔进队列。
<?php
// 生产者脚本: producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->select(0); // 选择数据库0
$userId = 10086;
$productId = 'Iphone15'; // 商品ID
// 模拟高并发场景,这里其实可以用多进程跑
for ($i = 0; $i < 100; $i++) {
$message = json_encode([
'user_id' => $userId + $i,
'product_id' => $productId,
'create_time' => time()
]);
// LPUSH:把消息加到队列最左边(头部)
// LPOP是从右边(尾部)取。谁快用谁,或者按顺序来。
// 这里我们用 LPUSH 假设是“最新订单”
$result = $redis->lPush('order_queue', $message);
if ($result) {
echo "成功扔进单子: {$message}n";
} else {
echo "排队太长了!服务器要崩了!n";
}
}
看,这就很Python了,简单粗暴。这一步没有任何IO阻塞(除了网络连接),哪怕你并发1万次,Redis也能瞬间处理完,然后返回“OK”。数据库此时还在喝咖啡呢。
2. 消费者:慢慢做菜
这时候,你需要一个一直在跑的脚本,不停地从队列里取单子,处理业务,最后写数据库。
注意: PHP是单线程脚本,脚本一跑完,进程就死了。所以消费者必须是一个死循环。
<?php
// 消费者脚本: consumer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
echo "我是勤劳的小蜜蜂,开始干活了...n";
while (true) {
// BRPOP: Block Pop Right. 从列表右边阻塞弹出。
// 如果队列为空,它会“阻塞”在这里,不消耗CPU,也不报错。
// 一直等到有新消息进来了,才唤醒,然后返回消息。
// timeout参数是秒数,0代表永久阻塞。
$result = $redis->brPop('order_queue', 0);
// $result 是一个数组 [队列名, 消息内容]
if ($result) {
$msg = json_decode($result[1], true);
processOrder($msg);
}
}
function processOrder($order) {
echo "开始处理订单: 用户ID {$order['user_id']} 买 {$order['product_id']}n";
// 这里才是真正的业务逻辑
// 查库存、扣钱、写数据库
// 假设这里写数据库很慢
simulateDbWrite();
}
function simulateDbWrite() {
// 模拟数据库IO,耗时1秒
sleep(1);
echo " -> 数据库写入成功!n";
}
这就是核心逻辑:brPop。它把阻塞权交给了操作系统,让CPU去休息。当生产者推入消息时,消费者瞬间被唤醒。
四、 实战二:RabbitMQ 的优雅之路
用Redis虽然快,但它是内存操作,万一Redis崩了,消息全丢。RabbitMQ才是正经的“大厂”做法。
用RabbitMQ,你得多装一个软件。你可以用Docker:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management。
1. 环境准备
你需要安装 php-amqplib 这个库。Composer安装一下:
composer require php-amqplib/php-amqplib
2. 生产者:发布消息
RabbitMQ有交换机、队列、路由的概念。咱们用最简单的直连交换机。
<?php
// RabbitMQ 生产者: rabbit_producer.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
// 1. 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 2. 声明队列(确保队列存在)
// durable=true 表示队列会持久化,重启不丢失
$channel->queue_declare('task_queue', false, true, false, false);
// 3. 发送消息
$data = json_encode([
'msg_id' => uniqid(),
'user_id' => 888,
'product' => 'Super iPhone',
'payload' => 'Loading...'
]);
$msg = new AMQPMessage($data, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 消息持久化
]);
// 4. 发布到队列
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent $datan";
$channel->close();
$connection->close();
注意 delivery_mode,这个属性告诉RabbitMQ:“哥,这单子贵重,给我存硬盘里,别断电丢了。”
3. 消费者:死磕到底
RabbitMQ的消费者处理失败时,不能就这么扔了,得告诉MQ:“哥们,这个我处理不了,你给我退回去,或者扔进死信箱。”
<?php
// RabbitMQ 消费者: rabbit_consumer.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
// 连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+Cn";
// 回调函数
$callback = function ($msg) {
$data = $msg->body;
echo " [x] Received {$data}n";
// 模拟处理业务
// $msg->delivery_info['channel'] ...
// 这里加个随机数模拟偶尔的失败
if (rand(1, 10) == 5) {
throw new Exception("系统繁忙,处理失败!");
}
// 核心点:手动ACK(确认)
// 如果不调用这个,消息会被MQ认为是未处理状态,一直重发。
// 如果我们处理成功了,才调用 ack()。
$msg->ack();
};
// no_ack = false (默认就是false)
// 表示我们需要手动确认
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
// 死循环监听
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
上面的代码里,$msg->ack() 是灵魂。一旦你的PHP脚本因为内存溢出崩了,或者抛出了异常,这条消息就会被RabbitMQ重新放回队列,后面排队的消费者会接着处理它。
五、 深入:如何避免“重复消费”和“消息丢失”?
这里有个大坑。
场景: 你正在处理一个发红包的任务。MQ发送了一条消息:“给用户A发1块钱”。你的PHP脚本拿到消息,数据库写入了1块钱。结果,你的PHP代码还没来得及调用 ack(),你的机器突然断电了(或者你重启了脚本)。
后果: MQ觉得你没收到,把消息又发了一次。你的数据库里,用户A多了1块钱。
这叫消息重复。
解决方案:
-
做幂等性设计: 这是最根本的。你在写数据库的时候,要在SQL里加上唯一约束,或者在业务逻辑里判断。比如:
-- 尝试插入一条记录,如果user_id已经存在,就报错回滚 INSERT INTO bonus_log (user_id, amount, order_no) VALUES (10086, 1, 'msg_12345');或者代码里:
if (db->isExist('msg_12345')) { return; // 重复消息,直接丢弃 } db->insert(...); ack(); -
确认机制: 无论是Redis还是RabbitMQ,都要开启确认机制。不要省那一行代码。
六、 PHP 的特有痛:多进程与崩溃
PHP最让人头疼的就是它是一个脚本语言,容易挂。如果上面的 consumer.php 只是跑在一个单线程里,一旦处理某个慢任务卡住了,整个队列就堵死了。
解决方案:多进程(Forking)
你需要用 PHP 的 pcntl 扩展(Linux下默认支持)。我们启动脚本的时候,让它在后台开5个进程,每个进程独立监听队列。
伪代码逻辑如下:
// master.php
for ($i = 0; $i < 5; $i++) {
pcntl_fork(); // 创建子进程
}
function runConsumer() {
// 循环调用上面的 RabbitMQ consumer 代码
// 这里省略具体代码,其实就是把上面的逻辑封装成一个函数
}
// 在主进程里监听子进程是否挂了
while(true) {
$status = null;
$pid = pcntl_wait($status); // 等待任意子进程退出
if ($pid == -1) {
// 没有子进程
break;
} else if (pcntl_wifexited($status)) {
// 子进程异常退出了
echo "子进程 $pid 退出了,正在重启...n";
pcntl_fork(); // 重新拉一个进来
}
}
或者更现代一点,使用 Swoole 或者 Workerman。这两个库把PHP的高并发能力提升了不止一个档次。Swoole的 Server->on('receive') 监听端口,Corun() 开启协程,处理MQ消息简直就是小菜一碟。这属于进阶玩法了,但如果你真的要搞高并发,建议直接上 Swoole。
七、 死信队列:终极垃圾处理
如果消息一直处理不了怎么办?比如某个商品突然被下架了,或者数据格式错了。如果一直积压在队列里,什么时候是个头?
死信队列(DLQ) 就是垃圾桶。
- MQ配置:在主队列上设置
x-dead-letter-exchange指向一个死信交换机。 - 处理失败:当消息被拒绝(nack)3次后,MQ自动把它扔进死信队列。
- 后台处理:专门写一个脚本去监听死信队列,记录日志,或者人工介入。
这就像你有个垃圾桶,垃圾扔满了,你如果不倒,垃圾就堆满了屋子。
八、 总结与心态
用消息队列,不是让你为了用而用。
- 不要滥用: 如果你的业务很简单,发个通知而已,用个Redis List就够,别为了装逼去上Kafka,运维成本很高。
- 不要瞎配置: 消息持久化很安全,但速度慢。RabbitMQ的持久化如果配合高并发,可能会导致吞吐量下降。如果是秒杀这种极端场景,有时候为了速度,甚至允许丢消息(比如缓存扣减库存,最后异步同步库存),但这需要你的业务允许一定的数据不一致。
PHP虽然被戏称为“脚本语言”,但在消息队列这块,它的优势在于开发快、上手容易。只要你把ACK机制、幂等性、多进程这几个点卡死,PHP完全能扛住百万级的QPS。
记住,高并发不是目的,用户体验才是。消息队列就是为了让你的服务器在“过山车”一样的流量面前,保持一颗“淡定”的心。
好了,今天的课就上到这儿。赶紧回去看看你的代码,是不是少了个 brPop 或者 ack?别让你的数据库再哭了。