PHP如何实现高并发延迟队列并支持失败任务自动重试

各位同学,各位未来的架构师,大家好!

今天我们不聊那些虚头巴脑的理论,也不谈什么“高内聚低耦合”的空话套话。我们直接切入正题——PHP高并发延迟队列

我知道,在很多人的刻板印象里,PHP就是那个“做一锤子买卖”的脚本语言,跑一会儿就挂了,哪来的并发?哪来的高可用?甚至有人会说:“哥,你搞个C++或者Go写个多线程不就行了?”

哎,同学,你太天真了。虽然Go很香,C++很强,但PHP在某些场景下,那可是性价比之王。而且,很多大厂(包括那些做电商、秒杀、社交APP的巨头)都在用PHP处理核心流量。

今天,我们要聊的,就是如何用PHP,在一个单机环境下,打造一个既能延迟执行,又能自动重试,还能抗住高并发的“超级队列”。准备好了吗?让我们把那台名为“服务器”的引擎轰起来!


第一章:为什么要造这个轮子?

在动手之前,我们要先搞清楚,这玩意儿到底是干嘛的。

假设你是淘宝的一个程序员。用户A下了单,但是呢,用户不想立刻付钱,他想“犹豫一下”。或者用户想发个朋友圈炫耀一下自己买了啥,然后过了5分钟再支付。如果这时候系统立马扣款,用户还没准备好,那体验得多差?

于是,我们需要一个“快递柜”。用户A把订单(货物)放进快递柜,然后说:“嘿,系统,5分钟后再开这个柜子,让我付钱。”

这就是延迟队列。它不是“先来后到”的排队,它是“掐着表来”的排队。

再说说高并发。如果双十一零点,一秒钟有100万个用户下单。你不能在主线程里一个个去查数据库、去发短信、去写日志吧?那服务器CPU瞬间就给你表演个“原地升天”。你得把任务扔出去,扔到一个“大池子”里,然后由专门的“搬运工”去处理。

最后是失败自动重试。生活哪有事事顺心?发短信可能失败,第三方支付接口可能挂了。如果任务失败了,我们不能就这么算了,也不能立马重试(可能会导致雪崩),我们需要给它“疗伤”,等它好了再干。

所以,我们的目标是:一个基于PHP和Redis的高性能延迟队列系统。


第二章:核心武器——Redis Sorted Sets (ZSET)

很多初学者会问:“为什么不用MySQL的定时任务?”
答:因为你没听过“数据库锁死”这个传说吗?在一个几百万条数据的表里做范围查询,加上并发更新,你的数据库CPU能给你哭出声来。

我们要用的核心武器,是Redis的数据结构之一:有序集合。俗称 ZSET

ZSET 是什么?它就像一个排队系统,每个人都有一个分数(Score)

  • 如果分数是0,就是普通队列。
  • 如果分数是时间戳,它就变成了延迟队列

原理很简单:

  1. 你要把一个任务“延时5分钟”,你就把任务的ID作为成员(Member),把“当前时间 + 5分钟”作为分数(Score),塞进ZSET。
  2. 然后启动一个Worker(搬运工),不断地去ZSET里问:“现在几点了?有没有分数小于现在时间的任务?”
  3. 如果有,就把它们取出来处理,然后删掉。

听起来很优雅,对吧?别急,代码才是硬道理。


第三章:代码实战——构建基础队列

首先,我们需要一个简单的封装类。为了演示,我们直接用PHP的Redis扩展。

<?php

class DelayQueue
{
    private $redis;
    private $queueName = 'delay_queue';

    public function __construct()
    {
        // 连接Redis,这里假设你已经安装了Redis并开启了服务
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * 压入任务
     * @param mixed $data 任务数据,可以是数组,对象序列化后的字符串等
     * @param int $delay 延迟秒数
     */
    public function push($data, $delay = 0)
    {
        // 将数据序列化
        $payload = serialize($data);

        // 计算执行时间戳 = 当前时间 + 延迟时间
        $score = time() + $delay;

        // ZADD命令:将member加入到zset中,如果member已存在,更新score
        // 这是一个原子操作,不用担心并发问题
        $this->redis->zAdd($this->queueName, $score, $payload);

        return true;
    }

    /**
     * 拉取任务
     * @param int $timeout 超时时间(毫秒),如果是0表示阻塞等待
     * @return array|null
     */
    public function pop($timeout = 0)
    {
        // ZRANGEBYSCORE key min max WITHSCORES LIMIT offset count
        // 我们需要取出分数 <= 当前时间的任务
        $now = time();

        // 注意:这里我们只取一个,因为我们需要在循环里处理,避免多worker竞争时取到重复任务
        // 但是为了性能,实际生产中通常会批量取
        $result = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, 1]]);

        if (empty($result)) {
            return null;
        }

        // $result是一个数组,包含member和score,结构是 [0 => payload, 1 => score]
        $payload = $result[0];

        // 删除任务:从队列中彻底移除
        $this->redis->zRem($this->queueName, $payload);

        return unserialize($payload);
    }
}

这段代码看起来很完美,对吧?但是,它有个致命的Bug!如果你把它直接扔进生产环境,你会发现……

Bug在哪?
看第43行 zRangeByScore。我们只取了一个。假设你有一万个任务同时到期,而你的Worker只有5个,那么9950个任务得等Worker忙完手头的活才能轮到它们。这叫处理积压

在PHP的世界里,我们要么快,要么快。既然Redis是单线程的,那我们能不能利用它的原子性,一次性把所有到期的任务都取出来?


第四章:优化——批量拉取与Worker设计

这是资深程序员和初级程序员的分水岭。我们需要写一个Worker类,它不仅要处理任务,还要批量处理任务。

<?php

class QueueWorker
{
    private $redis;
    private $queueName = 'delay_queue';
    private $processBatchSize = 100; // 每次批量处理100个任务

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    public function work()
    {
        echo "Worker 启动,开始监听队列...n";

        while (true) {
            $now = time();

            // --- 核心优化:批量拉取 ---
            // 使用ZRANGEBYSCORE一次性取出所有到期的任务
            // 注意:PHP的Redis扩展处理大数组可能会内存溢出,所以size要控制
            $tasks = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, $this->processBatchSize]]);

            if (!empty($tasks)) {
                // 我们得到了一批任务
                foreach ($tasks as $payload) {
                    $this->processSingleTask($payload);
                }
            } else {
                // 没任务,休眠一下,避免CPU空转
                // 比如休眠50毫秒,既不浪费资源,又能保持响应速度
                usleep(50000); 
            }
        }
    }

    private function processSingleTask($payload)
    {
        $data = unserialize($payload);
        // 这里调用你的业务逻辑
        $this->handleBusinessLogic($data);
    }

    private function handleBusinessLogic($data)
    {
        // 模拟业务处理
        echo "正在处理任务: " . json_encode($data) . "n";

        // 模拟50%的失败率,为了测试重试逻辑
        if (rand(0, 1)) {
            throw new Exception("业务逻辑处理失败!");
        }
    }
}

// 启动Worker
$worker = new QueueWorker();
$worker->work();

好了,现在我们有了基础版。但是,你还没看到“失败自动重试”的部分。

如果 handleBusinessLogic 抛出了异常,任务就丢了!这是绝对不行的。我们需要一个包装器,它像一个慈祥的老母亲,把任务包起来,如果孩子摔倒了,就拍拍土,等一会儿再送进去。


第五章:失败重试与指数退避

重试不能傻傻地死循环,那样会把系统搞崩。我们要用指数退避算法

什么意思?第一次失败等1秒,第二次等2秒,第三次等4秒……如果还是失败,就别试了,记录日志,报警吧,这任务可能真有问题。

我们需要改造一下数据结构。原来的 ZSET 只存了 Payload。现在,我们要存一些元数据:重试次数、重试时间。

我们可以把 Payload 做个 JSON 包装。

class RetryTask
{
    public $data;       // 原始任务数据
    public $attempts;   // 已尝试次数
    public $nextExecute; // 下次执行的时间戳

    public function __construct($data, $delay = 0, $attempts = 1)
    {
        $this->data = $data;
        $this->attempts = $attempts;
        // 基础延迟,加上指数退避的时间
        $this->nextExecute = time() + $delay + (pow(2, $attempts - 1)); 
    }
}

class SmartQueue
{
    private $redis;
    private $queueName = 'delay_queue';

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    public function push($data, $delay = 60)
    {
        // 构造带重试信息的任务对象
        $task = new RetryTask($data, $delay);
        $payload = json_encode($task);

        $this->redis->zAdd($this->queueName, $task->nextExecute, $payload);
    }

    public function pop()
    {
        $now = time();
        // 只取一个,确保原子性(或者批量取,逻辑类似)
        $tasks = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, 1]]);

        if (empty($tasks)) {
            return null;
        }

        $payload = $tasks[0];
        $this->redis->zRem($this->queueName, $payload);

        return json_decode($payload, true); // 解码为数组方便处理
    }

    public function retry($taskPayload, $maxRetries = 5)
    {
        $task = json_decode($taskPayload, true);

        // 检查是否超过最大重试次数
        if ($task['attempts'] >= $maxRetries) {
            echo "任务失败次数过多,放弃重试: " . json_encode($task['data']) . "n";
            // 这里可以发邮件告警,或者写入失败日志表
            return false;
        }

        // 更新重试次数
        $task['attempts']++;

        // 创建新的重试任务
        $newTask = new RetryTask($task['data'], 0, $task['attempts']);
        $newPayload = json_encode($newTask);

        // 重新入队,注意:这里使用的是下次执行时间,通常不需要额外的delay
        $this->redis->zAdd($this->queueName, $newTask->nextExecute, $newPayload);

        return true;
    }
}

这下,逻辑闭环了。Worker 拿到任务 -> 处理 -> 如果失败 -> 调用 retry -> 任务带着新的 nextExecute 时间戳重新回到队列。


第六章:高并发下的“小心机”

好了,基础功能有了,重试也有了。但是,我们说的是高并发

想象一下,双十一零点,Redis瞬间塞进了100万个任务。你的 Worker 只有10个进程。这10个进程开始疯狂 zRangeByScore

隐患一:竞争条件(Race Condition)
虽然我们用了 zRem,但在极端情况下(虽然Redis是单线程的,但网络延迟、命令执行顺序),多个Worker可能同时看到同一个任务,然后都把它 zRem 了,导致重复执行。

隐患二:阻塞问题
如果 zRangeByScoremin 设为 -inf,而 max 是当前时间。如果当前时间是整点,刚好有一波高峰。Worker在处理这些任务时,耗时很长。那么下一波到期的任务怎么办?它们就在队列里等着,直到当前的这1000个任务处理完。这就是阻塞

解决方案:长轮询

不要在PHP里写一个 while(true),然后 sleep(1)。这太浪费CPU了。我们要用Redis的 BRPOPLPUSH 或者基于 BLPOP 的自定义长轮询。

BRPOPLPUSH 是什么?它是Redis的“阻塞式弹出”。它会让客户端连接挂起,直到有数据弹出,或者超时。一旦有数据,它还会把数据自动推送到另一个列表(备份列表)。这样,消费者拿到数据后,就可以放心大胆地去处理,处理完了再从备份列表删掉。

但是,ZSET怎么配合BRPOPLPUSH?BRPOPLPUSH是针对列表(List)的。

高级架构方案:ZSET + List 双层结构

  1. 入队:ZSET 存储时间。同时,把数据也推送到一个普通 List(比如 ready_queue)里。
  2. 出队:Worker 等待 ready_queue 里有数据。

等等,这好像把ZSET的作用淡化了?并没有。

我们可以在出队逻辑里做一个动作:定时扫描ZSET,把所有到期的任务,在内存里计算出来,然后一次性 LPUSHready_queue

Worker 的真正工作流:

class HighPerformanceWorker
{
    private $redis;
    private $zsetKey = 'delay_queue_zset';
    private $listKey = 'ready_queue'; // 准备执行的任务列表

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * 搬运工:负责把ZSET里到期的任务搬运到List里
     */
    public function shiftExpiredTasks()
    {
        $now = time();

        // 1. 批量取出所有到期的任务
        // limit 0 500 表示一次最多取500个,防止内存炸裂
        $tasks = $this->redis->zRangeByScore($this->zsetKey, '-inf', $now, ['limit' => [0, 500]]);

        if (empty($tasks)) {
            return;
        }

        // 2. 逐个处理(重试逻辑等)
        foreach ($tasks as $payload) {
            $task = json_decode($payload, true);

            // 如果还有重试机会,计算下次时间
            if ($task['attempts'] < 5) {
                 // 重试逻辑... (同上文)
                 // 如果重试成功,跳过入队,直接处理
                 continue;
            }

            // 入队到准备列表
            $this->redis->rPush($this->listKey, $payload);
        }

        // 3. 从ZSET中移除已处理的任务
        // 由于zRange返回的是数组,我们需要转成set来高效删除
        $this->redis->zRem($this->zsetKey, ...$tasks);
    }

    /**
     * 消费者:负责从List里取任务并执行
     */
    public function consume()
    {
        echo "Consumer 启动,监听 Ready Queue...n";

        while (true) {
            // 1. 长轮询:等待List里有数据,或者超时1秒
            // 如果没有数据,说明还没到期,或者没积压,利用这个时间去做别的
            $result = $this->redis->blPop($this->listKey, 1);

            // 如果blPop返回空,说明超时了。我们利用这个时间空隙去检查ZSET有没有新到期的!
            if (!$result) {
                $this->shiftExpiredTasks();
                continue;
            }

            // 如果拿到了数据
            $payload = $result[1];
            $this->processPayload($payload);
        }
    }

    private function processPayload($payload)
    {
        $task = json_decode($payload, true);
        try {
            // 执行业务
            $this->execute($task['data']);
            echo "任务执行成功n";
        } catch (Exception $e) {
            // 失败,扔回ZSET重试
            $this->pushToRetry($task);
        }
    }

    // 辅助方法省略...
}

这种设计的妙处:

  1. 解耦:ZSET负责“计算时间”,List负责“排队执行”。计算耗时,执行更耗时。把它们分开,Worker不用一直死等ZSET查询,也不用一直死等List阻塞。
  2. 利用空隙blPop 有个超时参数。如果没人排队,1秒后返回空。这段时间,Worker可以去检查ZSET,把新到期的任务搬过来。这极大地提高了CPU利用率和响应速度。
  3. 高并发:你可以启动N个 consume() 进程,它们都会抢 blPop 的锁。谁先抢到谁执行,互不干扰。

第七章:那些年我们踩过的坑

作为专家,我不讲成功学,我讲踩坑指南。

1. PHP内存泄漏
PHP是脚本语言,默认是内存释放的。但是,如果你在循环里不断地 new Class() 而不销毁引用,或者使用了非线程安全的扩展,进程跑久了就会越来越慢,最后OOM(内存溢出)。
对策:使用 gc_collect_cycles() 强制垃圾回收,或者限制 Worker 的运行时间,使用 Supervisor 等进程管理器自动重启 Worker。

2. 序列化不兼容
如果你在一个PHP版本序列化一个对象,扔进Redis,然后在另一个PHP版本(比如5.6到7.4)反序列化,通常会报错 serializing 'Closure' 或者 Exception
对策:尽量只传 数组基本类型。不要传闭包,不要传PDO连接对象。数据传输,要轻量,要纯净。

3. 信号处理
当你用 Supervisor 管理Worker时,你执行 supervisorctl stop,Worker不会优雅地退出。它会瞬间被杀掉,导致正在处理的任务可能丢失,或者数据不一致。
对策:在Worker代码里捕获 SIGTERMSIGINT 信号,设置一个标志位,在循环里检查,如果收到信号,先处理完当前任务,再退出。


第八章:终极兵器——RabbitMQ(当你需要更大厂牌的时候)

讲到这里,你可能觉得:“Redis其实也挺香的嘛,轻量、够用。”

没错,对于中小型项目,或者并发量在每秒几千几万级的业务,纯Redis方案完全够用,而且维护成本极低。

但是,如果你的项目是:

  • 分布式系统,需要跨服务器通信?
  • 需要极其精确的死信队列?
  • 需要消息持久化,防止Redis挂了数据全没?

那么,Redis就不行了。这时候,你需要请出真正的“六边形战士”——RabbitMQ

RabbitMQ 做延迟队列是怎么玩的?
它利用了 TTL (Time To Live) + 死信交换机 (DLX)

  1. 发送消息到交换机,设置 TTL 为 5000ms(5秒)。
  2. 设置路由键到队列 A。
  3. 队列 A 设置为“死信队列”。
  4. 队列 A 到期后,消息自动转为“死信”。
  5. 监听死信队列的消费者,收到消息,执行逻辑。

这听起来比Redis复杂多了,对吧?是的,这就是“重武器”的代价。它提供了更完善的机制,比如消息确认机制、幂等性保证、路由分发等。

总结一下:

  • PHP + Redis ZSET轻骑兵。速度快,部署简单,适合自建系统,适合做“秒杀倒计时”、“订单超时取消”。
  • RabbitMQ重装甲。功能全,生态好,适合做“核心交易链路”,适合大型分布式架构。

第九章:实战总结与代码整合

最后,让我们把所有东西串起来,给一个完整的、可以直接跑的Demo代码结构。别看代码长,逻辑其实很简单。

<?php

/**
 * 延迟队列服务类
 * 特性:支持重试、指数退避、长轮询优化
 */
class DelayQueueService
{
    private $redis;
    private $zsetKey = 'mq_delay_queue';
    private $listKey = 'mq_ready_queue';

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(0); // 确保操作在正确的DB
    }

    /**
     * 生产者:生产任务
     */
    public function push($data, $delay = 10, $maxRetries = 3)
    {
        // 构造任务体
        $task = [
            'data'     => serialize($data),
            'attempts' => 0,
            'max_retry'=> $maxRetries,
            'ts'       => time()
        ];

        $score = time() + $delay;
        $payload = json_encode($task);

        // 原子性入队
        $this->redis->zAdd($this->zsetKey, $score, $payload);
    }

    /**
     * 消费者:启动
     */
    public function start()
    {
        echo "Worker Started...n";

        while (true) {
            // 1. 长轮询从 Ready List 获取任务
            // timeout = 1s,如果没有任务,等待1秒
            $res = $this->redis->blPop($this->listKey, 1);

            if (!$res) {
                // 超时了,说明没人排队,去检查 ZSET 有没有新到期的
                $this->syncExpiredTasks();
                continue;
            }

            // 获取任务
            $payload = $res[1];
            $this->handleTask($payload);
        }
    }

    /**
     * 搬运工:检查 ZSET,将到期任务移入 List
     */
    private function syncExpiredTasks()
    {
        $now = time();
        // 批量获取
        $tasks = $this->redis->zRangeByScore($this->zsetKey, '-inf', $now, ['limit' => [0, 100]]);

        if (empty($tasks)) return;

        foreach ($tasks as $payload) {
            $task = json_decode($payload, true);

            // 重试逻辑检查
            if ($task['attempts'] >= $task['max_retry']) {
                // 失败次数过多,丢弃,记录日志(略)
                $this->redis->zRem($this->zsetKey, $payload);
                continue;
            }

            // 计算新的重试时间 (指数退避)
            $delay = pow(2, $task['attempts']);
            $newScore = time() + $delay;

            // 更新任务信息并重新入队
            $task['attempts']++;
            $task['retry_at'] = $newScore; // 记录下次时间
            $newPayload = json_encode($task);

            $this->redis->zAdd($this->zsetKey, $newScore, $newPayload);
        }

        // 批量删除已处理(失败次数耗尽的)旧任务
        if (!empty($tasks)) {
            $this->redis->zRem($this->zsetKey, ...$tasks);
        }
    }

    /**
     * 处理单个任务
     */
    private function handleTask($payload)
    {
        $task = json_decode($payload, true);
        $data = unserialize($task['data']);

        echo "Processing: " . $task['attempts'] . "th attempt.n";

        try {
            // 模拟业务逻辑
            $success = rand(0, 1) ? true : false;

            if ($success) {
                echo "Success!n";
            } else {
                throw new Exception("Simulated failure");
            }
        } catch (Exception $e) {
            echo "Failed. Retrying...n";
            // 重试逻辑:重新推入 ZSET,下次 syncExpiredTasks 会处理它
            $delay = pow(2, $task['attempts']);
            $newScore = time() + $delay;
            $task['attempts']++;
            $newPayload = json_encode($task);
            $this->redis->zAdd($this->zsetKey, $newScore, $newPayload);
        }
    }
}

// --- 使用示例 ---

// 1. 生产者
$q = new DelayQueueService();
$q->push(['type' => 'send_sms', 'phone' => '13800138000'], 2); // 2秒后执行
$q->push(['type' => 'sync_order'], 5); // 5秒后执行

// 2. 启动消费者 (另开一个终端运行)
// php worker.php
// $q->start();

第十章:聊聊架构的艺术

好了,代码给完了。我们来点更“专家”的视角。

为什么我们要这么折腾?

1. 异步化是提升性能的神器
同步调用就像是你打电话给客服,你必须听客服说完才能挂电话。如果你等了5分钟,这5分钟你什么事都干不了。
异步调用就像是你给客服留了张纸条,你转身就去忙别的了,客服处理完会通知你。
在高并发下,主线程(HTTP请求)只需要把任务扔进队列,然后立刻返回 “OK”,用户页面瞬间刷新。用户觉得你的网站快如闪电,而实际耗时在后台悄悄完成。

2. 解耦
业务A不能直接调用业务B。万一业务B挂了,业务A也就挂了。通过队列,业务A只管扔,业务B只管拿,中间隔着一个队列,谁挂了都不影响对方。

3. 流量削峰
双十一流量是波动的,一会儿几万,一会儿几十万。直接打数据库会崩。有了队列,你可以控制消费者的速度(比如每秒只能处理1000个)。流量大的时候,队列就慢慢吞,流量小的时候,队列就慢慢吐。

结语

这篇文章写了这么多,其实核心思想就一句话:利用 Redis 有序集合的“按分数排序”特性,实现时间维度的控制,结合 PHP 的脚本特性,通过长轮询实现高并发消费。

不要觉得这很复杂,当你真正在项目中遇到“秒杀超卖”或者“订单自动取消”的需求时,这套方案就是你的救星。

当然,技术没有银弹。如果你的业务规模已经到了千万级,涉及到跨机房、跨语言协作,那么请毫不犹豫地拥抱 RabbitMQ 或 Kafka。但在大多数 PHP 场景下,这套 Redis 方案,足够让你在面试官面前吹上一整年的牛了。

好了,今天的讲座就到这里。代码拷贝走,修改一下就能用。别忘了,如果任务失败了,多看几眼重试逻辑,那可是你代码里的“回血包”。

祝大家代码无Bug,业务全上线!下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注