好,各位搬砖工、全栈大侠,还有那些在深夜还在修Bug的极客们,大家好!
欢迎来到今天的“Redis与PHP的深夜茶话会”。今天我们不聊Hello World,也不谈双十一大促的流量洪峰——虽然那很刺激,但那是运维的事,我们管的是“脑子”里的活儿。
今天我们要聊一个稍微有点“搞心态”的话题:延迟消息队列。
想象一下,你在淘宝买了个快递。你付完款,这个订单的状态从“待付款”变成了“待发货”。这是即时通讯。但如果是“延迟消息”呢?比如说,你付完款,系统告诉你:“哥们,别急,我有个厨师正在切菜,30分钟后我再通知你去取餐。”这30分钟的等待期,就是延迟消息。
如果没有延迟消息队列,我们程序员通常会怎么做?我们会写一个死循环,每隔5秒钟去查一下数据库:“嘿,还有没有人该吃饭了?”这就好比你每隔5分钟就去敲一下冰箱门:“蛋糕做好了没?蛋糕做好了没?”这不仅浪费电,还把冰箱门弄坏了,而且效率极低。
所以,我们要用Redis和PHP来构建一个高大上的“时间轮调度机制”。
准备好了吗?让我们把Redis这个内存数据库当成一个巨大的、精密的、还没上油的机械钟表。好了,废话少说,开始上代码。
第一部分:Redis ZSET,你是我的“未来”
首先,我们要搞清楚工具。为什么选Redis?因为它快。为什么不用Kafka?因为Kafka太重了,还得装个Java环境,还得搭Zookeeper,甚至还得在服务器上跑一个Java进程,那一启动,CPU直接飙升到90%,运维小哥就要拿着拖鞋来找我了。
我们要用Redis,为什么?因为它内置了有序集合(Sorted Set, ZSET)。这玩意儿简直是为此刻量身定做的。
ZSET有两个东西:member(值)和 score(分数)。
member:就是你的任务内容,比如“给用户发送欢迎邮件”。score:就是这个任务执行的时间点。我们通常用Unix时间戳来表示。
你看,这多像是一个书架。
- 书架第一层是2023年10月1日的书。
- 书架最顶层是2024年12月31日的书。
- 当你想要拿一本书时,你只需要看一眼书脊上的日期。
时间轮思想,本质上就是ZSET。圆盘就是ZSET,指针就是当前时间,槽位就是分值。
第二部分:生产者,把消息“射”向未来
先看生产者。生产者就是个“送信的”,但他有个特点:他不知道信什么时候到,但他知道信会在什么时候“派送出去”。
我们要写一个 Producer.php。
<?php
require 'vendor/autoload.php'; // 假设你用了predis
use PredisClient;
class Producer
{
private $redis;
public function __construct()
{
// 连接Redis,别告诉我你还没装Redis
$this->redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
}
/**
* 延迟发送消息
* @param string $taskData 任务数据,JSON字符串最好
* @param int $delay 延迟秒数
*/
public function push($taskData, $delay)
{
// 当前时间戳 + 延迟时间 = 截止时间
$score = time() + $delay;
// zAdd指令:向集合中添加元素,如果member已存在则更新score
// 这里的key就是我们的队列名 "delay_queue"
$result = $this->redis->zAdd('delay_queue', $score, $taskData);
if ($result) {
echo "任务已入队:{$taskData},将在 {$delay} 秒后执行。n";
} else {
echo "入队失败,请检查Redis。n";
}
}
}
// --- 测试一下 ---
$producer = new Producer();
// 发送一个“取消订单”的消息,延迟10秒
$producer->push(json_encode(['action' => 'cancel_order', 'order_id' => 10086]), 10);
这段代码非常简单,就像把一颗子弹上膛,然后拉到10秒后的时间轴上。现在,这颗子弹静静地躺在Redis里,还没到引爆的时候。
第三部分:消费者,时间轮的指针
现在关键来了。谁来转动时间轮?当然是我们的消费者,或者叫 Worker,或者叫调度器。
如果我们像刚才说的那样,每隔5秒问一次Redis:“现在有人要吃饭了吗?”,那会有什么问题?
- 浪费资源:如果是1秒后的任务,你每5秒去查一次,中间3秒的等待毫无意义。
- 实时性差:如果是59秒后的任务,你刚好错过了一轮,得等下一个5秒周期才能发现。
- 高并发:如果瞬间有100万个任务过期了,你一下子查出来100万个,PHP处理得过来吗?可能会把内存撑爆,或者把Redis打死。
所以,我们要做一个“智能”的调度器。它的逻辑是这样的:
- 查一查:现在几点了?去Redis里看看,有没有“过期了”的任务。
- 分一分:把过期的任务取出来。
- 干一干:处理这些任务。
- 睡一觉:看看下一个任务啥时候过期?如果是1分钟后过期,我就睡1分钟。如果是1毫秒后过期,我就睡1毫秒。
- 循环:重复上述步骤。
这就是时间轮调度的核心思想:基于事件的睡眠。
代码实现:RedisDelayWorker
<?php
require 'vendor/autoload.php';
use PredisClient;
class RedisDelayWorker
{
private $redis;
private $queueKey = 'delay_queue';
private $scanCount = 100; // 每次取多少个任务
public function __construct()
{
$this->redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
}
public function run()
{
echo "调度器启动,开始等待任务...n";
while (true) {
try {
$now = time();
// 1. 查询:找到所有 <= 当前时间的任务
// zRangeByScore:从分数最小到最大,返回member和score
// withScores: 顺便把分数也给我拿回来,方便计算下次sleep时间
$tasks = $this->redis->zRangeByScore($this->queueKey, '-inf', $now, [
'withscores' => true,
'limit' => [0, $this->scanCount] // 一次处理100个,别贪多
]);
if (empty($tasks)) {
// 2. 如果没有任务,怎么睡?
// 这里的策略是:直接死睡1秒。
// 等待1秒后,继续循环,再次查询。
// 优点:简单。缺点:如果有任务在0.1秒后到期,你要等1秒才发现。
// 对于一般业务,这没问题。如果对实时性要求极高,需要用 Lua 脚本做原子操作。
echo "当前无事可做,休息1秒...n";
sleep(1);
continue;
}
echo "发现 " . count($tasks) . " 个待处理任务,开始收割!n";
// 3. 处理任务
foreach ($tasks as $item) {
// $item 是一个数组 [member, score] 或者 [score, member],取决于Predis版本,通常PHP扩展是 [member, score]
// 具体要看你的Redis扩展返回格式,这里假设是 ['data', 123456789]
$data = $item[0];
$score = $item[1];
// 消费
$this->consume($data);
// 4. 删除:处理完了就赶紧从队列里删掉,免得下次又捞到
$this->redis->zRem($this->queueKey, $data);
}
// 5. 计算下一次休眠时间
// 题目里要求“时间轮调度”,意味着我们要精准地睡到下一个任务的时间点
// $tasks 的顺序是分数从小到大排序的
// 所以数组最后一个元素的分数就是下一个最早的任务时间
$nextTaskTime = end($tasks)[1];
$sleepSeconds = $nextTaskTime - time();
if ($sleepSeconds > 0) {
echo "任务处理完毕,预计下次唤醒时间: " . date('Y-m-d H:i:s', $nextTaskTime) . "n";
echo "正在休眠 {$sleepSeconds} 秒...n";
sleep($sleepSeconds);
} else {
// 如果计算结果是负数(比如任务刚刚过期,但还没来得及处理),说明有堆积,直接继续循环,别睡死
continue;
}
} catch (Exception $e) {
echo "出错了:" . $e->getMessage() . "n";
// 发生错误睡一会儿,避免死循环把CPU干烧
sleep(5);
}
}
}
private function consume($data)
{
$json = json_decode($data, true);
echo ">>> 执行任务: " . $data . "n";
// 这里写你的业务逻辑,比如发邮件、更新数据库状态
// 模拟耗时操作
// sleep(1);
}
}
// 启动消费者
// $worker = new RedisDelayWorker();
// $worker->run();
代码深度解析(敲黑板!)
看上面这个 run() 方法,这就是精髓。
zRangeByScore: 这就是我们的雷达。'-inf'表示从负无穷(也就是最老的任务),$now表示截止到现在。只要时间戳小于等于当前时间,统统给我拿出来。withscores => true: 别小看这个选项。我们需要知道这些任务的具体时间,是为了算出下一次该睡多久。这就是时间轮的“轮”所在——指针停在下一个刻度之前。zRem: 这一步非常关键,叫“消费确认”。处理完了,任务就从队列里消失了。如果你不删,下次循环还会把它捞出来,这就重复消费了,可能会导致数据错误(比如重复发邮件)。
第四部分:优化与进阶——别让系统死机
上面的代码虽然能跑,但有几个坑,掉进去你就得掉头发。
坑一:任务的“坟墓”与重复消费
如果在处理任务的过程中,脚本突然崩了(比如内存溢出,或者被杀毒软件误杀),Redis里的任务还在。重启脚本后,这些任务会被重新处理一次。
- 后果:用户收到两条欢迎邮件,或者订单被重复取消。
- 解决方案:我们需要引入幂等性。所有业务逻辑都要设计成“执行一次和执行100次结果一样”。
- 比如发邮件,先查数据库,如果已经发送过,就不发了。
- 或者引入死信队列(Dead Letter Queue)。如果处理失败(抛出异常),不要
zRem,而是把这个任务移到dead_queue里,然后发个钉钉/邮件报警给你。
坑二:长时间阻塞
如果 $tasks 数组特别大(比如有10万个任务),foreach 循环处理10万个任务可能会阻塞几秒钟甚至几分钟。
- 后果:在这一段时间里,
sleep的逻辑失效了,时间轮的指针转不动了。其他原本要在这个时间点执行的任务,全部被积压了。 - 解决方案:分片处理。不要试图一次吃掉所有任务。
// 改进版:每次只取100个 $tasks = $this->redis->zRangeByScore($this->queueKey, '-inf', $now, ['limit' => [0, 100]]);如果还有没吃完的,下次循环再吃。这样保证任何时候指针都在转。
坑三:Redis连接断开
PHP脚本运行时间长了,Redis连接可能会断开。如果不处理,脚本会一直卡在 $this->redis->zRangeByScore 那里,报错不显示,或者静默失败。
- 解决方案:在你的循环里加个
try-catch,一旦连接出错,重连,然后continue。
第五部分:时间分片优化(进阶大神版)
如果延迟的时间非常长,比如一个任务要推迟10天执行。ZSET里会有一个巨大的分数。当我们用 zRangeByScore 查询时,Redis需要扫描所有的元素来找到符合条件的。虽然ZSET是有序的,但如果有几百万个元素,全表扫描也会慢。
这时候,我们可以玩一点“骚操作”:时间分片。
我们不只用一个 ZSET delay_queue,我们用10个 ZSET。
delay_queue_0 (0-100秒), delay_queue_1 (100-200秒), delay_queue_10 (1000-1100秒)…
生产者:
- 计算延迟时间 $delay。
- 计算取模
floor($delay / 100)。 - 把任务推入对应的
delay_queue_n。
消费者:
- 计算当前时间所在的范围(比如在100-200秒范围内)。
- 只去
delay_queue_1里查。 - 如果时间过了200秒,就从
delay_queue_1里把所有任务移到delay_queue_2中。
这种做法类似于Java里Netty的HashedWheelTimer。它把大问题分解成了小问题,大大提高了查询效率。
第六部分:PHP脚本的生命周期
PHP 是脚本语言,运行完就挂了。但是我们的队列消费者是需要长期运行的。
你有两个选择:
-
命令行运行(推荐):
写个脚本consumer.php,然后在服务器上跑:
php consumer.php &
或者用supervisor守护进程。如果脚本崩了,supervisor 会自动重启它。 -
利用 Web Server 的长连接:
比如写一个接口/api/worker。用户请求这个接口,PHP 就一直不返回,死循环处理任务,直到用户强制刷新页面才结束。
千万别在生产环境用这个。因为如果你访问量大了,可能100个用户都访问了这个接口,就启动了100个消费者。你的服务器瞬间就炸了。
第七部分:聊聊“死锁”与“乐观锁”
虽然我们这里主要用 ZSET,但聊技术就得聊透彻。
假设有一个高并发场景:A任务和B任务同时到期,都得处理。A先抢到了锁,开始处理。B拿到了锁,开始处理。
这叫竞态条件。
在 Redis 中,我们可以用 WATCH 命令(乐观锁)。
// 伪代码
$redis->watch($queueKey);
// 再次确认任务还在(可能B抢走了)
$tasks = $redis->zRangeByScore($queueKey, '-inf', time(), ['limit' => [0, 1]]);
if (!empty($tasks)) {
$redis->multi();
$redis->zRem($queueKey, $tasks[0]);
$redis->exec();
// 处理任务
} else {
$redis->unwatch();
}
虽然代码长了点,但这能保证同一时刻只有一个进程处理同一个任务。
结语:技术是工具,调度是艺术
好了,朋友们,咱们今天这堂课,从最简单的 zAdd 讲到了复杂的 HashedWheelTimer 思想。
用 PHP + Redis 做延迟队列,虽然比不上 RabbitMQ 或 Kafka 那种自带高级功能的重型武器,但它便宜啊!不需要买额外的服务器,不需要复杂的配置,甚至一个单机的 Redis 就能撑起一个百万级并发量的延迟队列(前提是你代码写得溜)。
这就像什么?就像你用一根竹竿挑水。RabbitMQ 是一艘万吨巨轮,运水多,但难造;而我们的 PHP+Redis,就是那根竹竿,灵活,轻便,随拿随用。
记住核心三点:
- 存:用 ZSET,分数是时间戳。
- 查:用
zRangeByScore拿到过期的。 - 删:处理完了
zRem走人。
下次当你想做一个“5分钟后发通知”的功能时,别再去查数据库循环了,拿起 PHP,敲几行代码,让 Redis 帮你记住那个时间点。
代码已上传,快去试试吧!别忘了给你的 Redis 内存扩容点,别让它在处理任务时憋炸了!
好了,讲座结束,我要去处理我电脑里的“垃圾文件”了(虽然它们还没到该被删除的时间)。各位,江湖再见!