PHP如何基于Redis实现延迟消息队列与时间轮调度机制

好,各位搬砖工、全栈大侠,还有那些在深夜还在修Bug的极客们,大家好!

欢迎来到今天的“Redis与PHP的深夜茶话会”。今天我们不聊Hello World,也不谈双十一大促的流量洪峰——虽然那很刺激,但那是运维的事,我们管的是“脑子”里的活儿。

今天我们要聊一个稍微有点“搞心态”的话题:延迟消息队列

想象一下,你在淘宝买了个快递。你付完款,这个订单的状态从“待付款”变成了“待发货”。这是即时通讯。但如果是“延迟消息”呢?比如说,你付完款,系统告诉你:“哥们,别急,我有个厨师正在切菜,30分钟后我再通知你去取餐。”这30分钟的等待期,就是延迟消息。

如果没有延迟消息队列,我们程序员通常会怎么做?我们会写一个死循环,每隔5秒钟去查一下数据库:“嘿,还有没有人该吃饭了?”这就好比你每隔5分钟就去敲一下冰箱门:“蛋糕做好了没?蛋糕做好了没?”这不仅浪费电,还把冰箱门弄坏了,而且效率极低。

所以,我们要用Redis和PHP来构建一个高大上的“时间轮调度机制”。

准备好了吗?让我们把Redis这个内存数据库当成一个巨大的、精密的、还没上油的机械钟表。好了,废话少说,开始上代码。


第一部分:Redis ZSET,你是我的“未来”

首先,我们要搞清楚工具。为什么选Redis?因为它快。为什么不用Kafka?因为Kafka太重了,还得装个Java环境,还得搭Zookeeper,甚至还得在服务器上跑一个Java进程,那一启动,CPU直接飙升到90%,运维小哥就要拿着拖鞋来找我了。

我们要用Redis,为什么?因为它内置了有序集合(Sorted Set, ZSET)。这玩意儿简直是为此刻量身定做的。

ZSET有两个东西:member(值)和 score(分数)。

  • member:就是你的任务内容,比如“给用户发送欢迎邮件”。
  • score:就是这个任务执行的时间点。我们通常用Unix时间戳来表示。

你看,这多像是一个书架。

  • 书架第一层是2023年10月1日的书。
  • 书架最顶层是2024年12月31日的书。
  • 当你想要拿一本书时,你只需要看一眼书脊上的日期。

时间轮思想,本质上就是ZSET。圆盘就是ZSET,指针就是当前时间,槽位就是分值。

第二部分:生产者,把消息“射”向未来

先看生产者。生产者就是个“送信的”,但他有个特点:他不知道信什么时候到,但他知道信会在什么时候“派送出去”。

我们要写一个 Producer.php

<?php
require 'vendor/autoload.php'; // 假设你用了predis
use PredisClient;

class Producer
{
    private $redis;

    public function __construct()
    {
        // 连接Redis,别告诉我你还没装Redis
        $this->redis = new Client([
            'scheme' => 'tcp',
            'host'   => '127.0.0.1',
            'port'   => 6379,
        ]);
    }

    /**
     * 延迟发送消息
     * @param string $taskData 任务数据,JSON字符串最好
     * @param int $delay 延迟秒数
     */
    public function push($taskData, $delay)
    {
        // 当前时间戳 + 延迟时间 = 截止时间
        $score = time() + $delay;

        // zAdd指令:向集合中添加元素,如果member已存在则更新score
        // 这里的key就是我们的队列名 "delay_queue"
        $result = $this->redis->zAdd('delay_queue', $score, $taskData);

        if ($result) {
            echo "任务已入队:{$taskData},将在 {$delay} 秒后执行。n";
        } else {
            echo "入队失败,请检查Redis。n";
        }
    }
}

// --- 测试一下 ---
$producer = new Producer();
// 发送一个“取消订单”的消息,延迟10秒
$producer->push(json_encode(['action' => 'cancel_order', 'order_id' => 10086]), 10);

这段代码非常简单,就像把一颗子弹上膛,然后拉到10秒后的时间轴上。现在,这颗子弹静静地躺在Redis里,还没到引爆的时候。

第三部分:消费者,时间轮的指针

现在关键来了。谁来转动时间轮?当然是我们的消费者,或者叫 Worker,或者叫调度器。

如果我们像刚才说的那样,每隔5秒问一次Redis:“现在有人要吃饭了吗?”,那会有什么问题?

  1. 浪费资源:如果是1秒后的任务,你每5秒去查一次,中间3秒的等待毫无意义。
  2. 实时性差:如果是59秒后的任务,你刚好错过了一轮,得等下一个5秒周期才能发现。
  3. 高并发:如果瞬间有100万个任务过期了,你一下子查出来100万个,PHP处理得过来吗?可能会把内存撑爆,或者把Redis打死。

所以,我们要做一个“智能”的调度器。它的逻辑是这样的:

  1. 查一查:现在几点了?去Redis里看看,有没有“过期了”的任务。
  2. 分一分:把过期的任务取出来。
  3. 干一干:处理这些任务。
  4. 睡一觉:看看下一个任务啥时候过期?如果是1分钟后过期,我就睡1分钟。如果是1毫秒后过期,我就睡1毫秒。
  5. 循环:重复上述步骤。

这就是时间轮调度的核心思想:基于事件的睡眠

代码实现:RedisDelayWorker

<?php
require 'vendor/autoload.php';
use PredisClient;

class RedisDelayWorker
{
    private $redis;
    private $queueKey = 'delay_queue';
    private $scanCount = 100; // 每次取多少个任务

    public function __construct()
    {
        $this->redis = new Client([
            'scheme' => 'tcp',
            'host'   => '127.0.0.1',
            'port'   => 6379,
        ]);
    }

    public function run()
    {
        echo "调度器启动,开始等待任务...n";

        while (true) {
            try {
                $now = time();

                // 1. 查询:找到所有 <= 当前时间的任务
                // zRangeByScore:从分数最小到最大,返回member和score
                // withScores: 顺便把分数也给我拿回来,方便计算下次sleep时间
                $tasks = $this->redis->zRangeByScore($this->queueKey, '-inf', $now, [
                    'withscores' => true, 
                    'limit' => [0, $this->scanCount] // 一次处理100个,别贪多
                ]);

                if (empty($tasks)) {
                    // 2. 如果没有任务,怎么睡?
                    // 这里的策略是:直接死睡1秒。
                    // 等待1秒后,继续循环,再次查询。
                    // 优点:简单。缺点:如果有任务在0.1秒后到期,你要等1秒才发现。
                    // 对于一般业务,这没问题。如果对实时性要求极高,需要用 Lua 脚本做原子操作。
                    echo "当前无事可做,休息1秒...n";
                    sleep(1);
                    continue;
                }

                echo "发现 " . count($tasks) . " 个待处理任务,开始收割!n";

                // 3. 处理任务
                foreach ($tasks as $item) {
                    // $item 是一个数组 [member, score] 或者 [score, member],取决于Predis版本,通常PHP扩展是 [member, score]
                    // 具体要看你的Redis扩展返回格式,这里假设是 ['data', 123456789]
                    $data = $item[0];
                    $score = $item[1];

                    // 消费
                    $this->consume($data);

                    // 4. 删除:处理完了就赶紧从队列里删掉,免得下次又捞到
                    $this->redis->zRem($this->queueKey, $data);
                }

                // 5. 计算下一次休眠时间
                // 题目里要求“时间轮调度”,意味着我们要精准地睡到下一个任务的时间点
                // $tasks 的顺序是分数从小到大排序的
                // 所以数组最后一个元素的分数就是下一个最早的任务时间
                $nextTaskTime = end($tasks)[1];

                $sleepSeconds = $nextTaskTime - time();

                if ($sleepSeconds > 0) {
                    echo "任务处理完毕,预计下次唤醒时间: " . date('Y-m-d H:i:s', $nextTaskTime) . "n";
                    echo "正在休眠 {$sleepSeconds} 秒...n";
                    sleep($sleepSeconds);
                } else {
                    // 如果计算结果是负数(比如任务刚刚过期,但还没来得及处理),说明有堆积,直接继续循环,别睡死
                    continue;
                }

            } catch (Exception $e) {
                echo "出错了:" . $e->getMessage() . "n";
                // 发生错误睡一会儿,避免死循环把CPU干烧
                sleep(5);
            }
        }
    }

    private function consume($data)
    {
        $json = json_decode($data, true);
        echo ">>> 执行任务: " . $data . "n";
        // 这里写你的业务逻辑,比如发邮件、更新数据库状态
        // 模拟耗时操作
        // sleep(1);
    }
}

// 启动消费者
// $worker = new RedisDelayWorker();
// $worker->run();

代码深度解析(敲黑板!)

看上面这个 run() 方法,这就是精髓。

  1. zRangeByScore: 这就是我们的雷达。'-inf' 表示从负无穷(也就是最老的任务),$now 表示截止到现在。只要时间戳小于等于当前时间,统统给我拿出来。
  2. withscores => true: 别小看这个选项。我们需要知道这些任务的具体时间,是为了算出下一次该睡多久。这就是时间轮的“轮”所在——指针停在下一个刻度之前。
  3. zRem: 这一步非常关键,叫“消费确认”。处理完了,任务就从队列里消失了。如果你不删,下次循环还会把它捞出来,这就重复消费了,可能会导致数据错误(比如重复发邮件)。

第四部分:优化与进阶——别让系统死机

上面的代码虽然能跑,但有几个坑,掉进去你就得掉头发。

坑一:任务的“坟墓”与重复消费

如果在处理任务的过程中,脚本突然崩了(比如内存溢出,或者被杀毒软件误杀),Redis里的任务还在。重启脚本后,这些任务会被重新处理一次。

  • 后果:用户收到两条欢迎邮件,或者订单被重复取消。
  • 解决方案:我们需要引入幂等性。所有业务逻辑都要设计成“执行一次和执行100次结果一样”。
    • 比如发邮件,先查数据库,如果已经发送过,就不发了。
    • 或者引入死信队列(Dead Letter Queue)。如果处理失败(抛出异常),不要 zRem,而是把这个任务移到 dead_queue 里,然后发个钉钉/邮件报警给你。

坑二:长时间阻塞

如果 $tasks 数组特别大(比如有10万个任务),foreach 循环处理10万个任务可能会阻塞几秒钟甚至几分钟。

  • 后果:在这一段时间里,sleep 的逻辑失效了,时间轮的指针转不动了。其他原本要在这个时间点执行的任务,全部被积压了。
  • 解决方案分片处理。不要试图一次吃掉所有任务。
    // 改进版:每次只取100个
    $tasks = $this->redis->zRangeByScore($this->queueKey, '-inf', $now, ['limit' => [0, 100]]);

    如果还有没吃完的,下次循环再吃。这样保证任何时候指针都在转。

坑三:Redis连接断开

PHP脚本运行时间长了,Redis连接可能会断开。如果不处理,脚本会一直卡在 $this->redis->zRangeByScore 那里,报错不显示,或者静默失败。

  • 解决方案:在你的循环里加个 try-catch,一旦连接出错,重连,然后 continue

第五部分:时间分片优化(进阶大神版)

如果延迟的时间非常长,比如一个任务要推迟10天执行。ZSET里会有一个巨大的分数。当我们用 zRangeByScore 查询时,Redis需要扫描所有的元素来找到符合条件的。虽然ZSET是有序的,但如果有几百万个元素,全表扫描也会慢。

这时候,我们可以玩一点“骚操作”:时间分片

我们不只用一个 ZSET delay_queue,我们用10个 ZSET。
delay_queue_0 (0-100秒), delay_queue_1 (100-200秒), delay_queue_10 (1000-1100秒)…

生产者

  1. 计算延迟时间 $delay。
  2. 计算取模 floor($delay / 100)
  3. 把任务推入对应的 delay_queue_n

消费者

  1. 计算当前时间所在的范围(比如在100-200秒范围内)。
  2. 只去 delay_queue_1 里查。
  3. 如果时间过了200秒,就从 delay_queue_1 里把所有任务移到 delay_queue_2 中。

这种做法类似于Java里Netty的HashedWheelTimer。它把大问题分解成了小问题,大大提高了查询效率。

第六部分:PHP脚本的生命周期

PHP 是脚本语言,运行完就挂了。但是我们的队列消费者是需要长期运行的。

你有两个选择:

  1. 命令行运行(推荐)
    写个脚本 consumer.php,然后在服务器上跑:
    php consumer.php &
    或者用 supervisor 守护进程。如果脚本崩了,supervisor 会自动重启它。

  2. 利用 Web Server 的长连接
    比如写一个接口 /api/worker。用户请求这个接口,PHP 就一直不返回,死循环处理任务,直到用户强制刷新页面才结束。
    千万别在生产环境用这个。因为如果你访问量大了,可能100个用户都访问了这个接口,就启动了100个消费者。你的服务器瞬间就炸了。

第七部分:聊聊“死锁”与“乐观锁”

虽然我们这里主要用 ZSET,但聊技术就得聊透彻。

假设有一个高并发场景:A任务和B任务同时到期,都得处理。A先抢到了锁,开始处理。B拿到了锁,开始处理。
这叫竞态条件

在 Redis 中,我们可以用 WATCH 命令(乐观锁)。

// 伪代码
$redis->watch($queueKey);
// 再次确认任务还在(可能B抢走了)
$tasks = $redis->zRangeByScore($queueKey, '-inf', time(), ['limit' => [0, 1]]);
if (!empty($tasks)) {
    $redis->multi();
    $redis->zRem($queueKey, $tasks[0]);
    $redis->exec();
    // 处理任务
} else {
    $redis->unwatch();
}

虽然代码长了点,但这能保证同一时刻只有一个进程处理同一个任务。

结语:技术是工具,调度是艺术

好了,朋友们,咱们今天这堂课,从最简单的 zAdd 讲到了复杂的 HashedWheelTimer 思想。

用 PHP + Redis 做延迟队列,虽然比不上 RabbitMQ 或 Kafka 那种自带高级功能的重型武器,但它便宜啊!不需要买额外的服务器,不需要复杂的配置,甚至一个单机的 Redis 就能撑起一个百万级并发量的延迟队列(前提是你代码写得溜)。

这就像什么?就像你用一根竹竿挑水。RabbitMQ 是一艘万吨巨轮,运水多,但难造;而我们的 PHP+Redis,就是那根竹竿,灵活,轻便,随拿随用。

记住核心三点:

  1. :用 ZSET,分数是时间戳。
  2. :用 zRangeByScore 拿到过期的。
  3. :处理完了 zRem 走人。

下次当你想做一个“5分钟后发通知”的功能时,别再去查数据库循环了,拿起 PHP,敲几行代码,让 Redis 帮你记住那个时间点。

代码已上传,快去试试吧!别忘了给你的 Redis 内存扩容点,别让它在处理任务时憋炸了!

好了,讲座结束,我要去处理我电脑里的“垃圾文件”了(虽然它们还没到该被删除的时间)。各位,江湖再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注