各位同学,各位未来的架构师,大家好!
今天我们不聊那些虚头巴脑的理论,也不谈什么“高内聚低耦合”的空话套话。我们直接切入正题——PHP高并发延迟队列。
我知道,在很多人的刻板印象里,PHP就是那个“做一锤子买卖”的脚本语言,跑一会儿就挂了,哪来的并发?哪来的高可用?甚至有人会说:“哥,你搞个C++或者Go写个多线程不就行了?”
哎,同学,你太天真了。虽然Go很香,C++很强,但PHP在某些场景下,那可是性价比之王。而且,很多大厂(包括那些做电商、秒杀、社交APP的巨头)都在用PHP处理核心流量。
今天,我们要聊的,就是如何用PHP,在一个单机环境下,打造一个既能延迟执行,又能自动重试,还能抗住高并发的“超级队列”。准备好了吗?让我们把那台名为“服务器”的引擎轰起来!
第一章:为什么要造这个轮子?
在动手之前,我们要先搞清楚,这玩意儿到底是干嘛的。
假设你是淘宝的一个程序员。用户A下了单,但是呢,用户不想立刻付钱,他想“犹豫一下”。或者用户想发个朋友圈炫耀一下自己买了啥,然后过了5分钟再支付。如果这时候系统立马扣款,用户还没准备好,那体验得多差?
于是,我们需要一个“快递柜”。用户A把订单(货物)放进快递柜,然后说:“嘿,系统,5分钟后再开这个柜子,让我付钱。”
这就是延迟队列。它不是“先来后到”的排队,它是“掐着表来”的排队。
再说说高并发。如果双十一零点,一秒钟有100万个用户下单。你不能在主线程里一个个去查数据库、去发短信、去写日志吧?那服务器CPU瞬间就给你表演个“原地升天”。你得把任务扔出去,扔到一个“大池子”里,然后由专门的“搬运工”去处理。
最后是失败自动重试。生活哪有事事顺心?发短信可能失败,第三方支付接口可能挂了。如果任务失败了,我们不能就这么算了,也不能立马重试(可能会导致雪崩),我们需要给它“疗伤”,等它好了再干。
所以,我们的目标是:一个基于PHP和Redis的高性能延迟队列系统。
第二章:核心武器——Redis Sorted Sets (ZSET)
很多初学者会问:“为什么不用MySQL的定时任务?”
答:因为你没听过“数据库锁死”这个传说吗?在一个几百万条数据的表里做范围查询,加上并发更新,你的数据库CPU能给你哭出声来。
我们要用的核心武器,是Redis的数据结构之一:有序集合。俗称 ZSET。
ZSET 是什么?它就像一个排队系统,每个人都有一个分数(Score)。
- 如果分数是0,就是普通队列。
- 如果分数是时间戳,它就变成了延迟队列。
原理很简单:
- 你要把一个任务“延时5分钟”,你就把任务的ID作为成员(Member),把“当前时间 + 5分钟”作为分数(Score),塞进ZSET。
- 然后启动一个Worker(搬运工),不断地去ZSET里问:“现在几点了?有没有分数小于现在时间的任务?”
- 如果有,就把它们取出来处理,然后删掉。
听起来很优雅,对吧?别急,代码才是硬道理。
第三章:代码实战——构建基础队列
首先,我们需要一个简单的封装类。为了演示,我们直接用PHP的Redis扩展。
<?php
class DelayQueue
{
private $redis;
private $queueName = 'delay_queue';
public function __construct()
{
// 连接Redis,这里假设你已经安装了Redis并开启了服务
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
/**
* 压入任务
* @param mixed $data 任务数据,可以是数组,对象序列化后的字符串等
* @param int $delay 延迟秒数
*/
public function push($data, $delay = 0)
{
// 将数据序列化
$payload = serialize($data);
// 计算执行时间戳 = 当前时间 + 延迟时间
$score = time() + $delay;
// ZADD命令:将member加入到zset中,如果member已存在,更新score
// 这是一个原子操作,不用担心并发问题
$this->redis->zAdd($this->queueName, $score, $payload);
return true;
}
/**
* 拉取任务
* @param int $timeout 超时时间(毫秒),如果是0表示阻塞等待
* @return array|null
*/
public function pop($timeout = 0)
{
// ZRANGEBYSCORE key min max WITHSCORES LIMIT offset count
// 我们需要取出分数 <= 当前时间的任务
$now = time();
// 注意:这里我们只取一个,因为我们需要在循环里处理,避免多worker竞争时取到重复任务
// 但是为了性能,实际生产中通常会批量取
$result = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, 1]]);
if (empty($result)) {
return null;
}
// $result是一个数组,包含member和score,结构是 [0 => payload, 1 => score]
$payload = $result[0];
// 删除任务:从队列中彻底移除
$this->redis->zRem($this->queueName, $payload);
return unserialize($payload);
}
}
这段代码看起来很完美,对吧?但是,它有个致命的Bug!如果你把它直接扔进生产环境,你会发现……
Bug在哪?
看第43行 zRangeByScore。我们只取了一个。假设你有一万个任务同时到期,而你的Worker只有5个,那么9950个任务得等Worker忙完手头的活才能轮到它们。这叫处理积压。
在PHP的世界里,我们要么快,要么快。既然Redis是单线程的,那我们能不能利用它的原子性,一次性把所有到期的任务都取出来?
第四章:优化——批量拉取与Worker设计
这是资深程序员和初级程序员的分水岭。我们需要写一个Worker类,它不仅要处理任务,还要批量处理任务。
<?php
class QueueWorker
{
private $redis;
private $queueName = 'delay_queue';
private $processBatchSize = 100; // 每次批量处理100个任务
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function work()
{
echo "Worker 启动,开始监听队列...n";
while (true) {
$now = time();
// --- 核心优化:批量拉取 ---
// 使用ZRANGEBYSCORE一次性取出所有到期的任务
// 注意:PHP的Redis扩展处理大数组可能会内存溢出,所以size要控制
$tasks = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, $this->processBatchSize]]);
if (!empty($tasks)) {
// 我们得到了一批任务
foreach ($tasks as $payload) {
$this->processSingleTask($payload);
}
} else {
// 没任务,休眠一下,避免CPU空转
// 比如休眠50毫秒,既不浪费资源,又能保持响应速度
usleep(50000);
}
}
}
private function processSingleTask($payload)
{
$data = unserialize($payload);
// 这里调用你的业务逻辑
$this->handleBusinessLogic($data);
}
private function handleBusinessLogic($data)
{
// 模拟业务处理
echo "正在处理任务: " . json_encode($data) . "n";
// 模拟50%的失败率,为了测试重试逻辑
if (rand(0, 1)) {
throw new Exception("业务逻辑处理失败!");
}
}
}
// 启动Worker
$worker = new QueueWorker();
$worker->work();
好了,现在我们有了基础版。但是,你还没看到“失败自动重试”的部分。
如果 handleBusinessLogic 抛出了异常,任务就丢了!这是绝对不行的。我们需要一个包装器,它像一个慈祥的老母亲,把任务包起来,如果孩子摔倒了,就拍拍土,等一会儿再送进去。
第五章:失败重试与指数退避
重试不能傻傻地死循环,那样会把系统搞崩。我们要用指数退避算法。
什么意思?第一次失败等1秒,第二次等2秒,第三次等4秒……如果还是失败,就别试了,记录日志,报警吧,这任务可能真有问题。
我们需要改造一下数据结构。原来的 ZSET 只存了 Payload。现在,我们要存一些元数据:重试次数、重试时间。
我们可以把 Payload 做个 JSON 包装。
class RetryTask
{
public $data; // 原始任务数据
public $attempts; // 已尝试次数
public $nextExecute; // 下次执行的时间戳
public function __construct($data, $delay = 0, $attempts = 1)
{
$this->data = $data;
$this->attempts = $attempts;
// 基础延迟,加上指数退避的时间
$this->nextExecute = time() + $delay + (pow(2, $attempts - 1));
}
}
class SmartQueue
{
private $redis;
private $queueName = 'delay_queue';
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function push($data, $delay = 60)
{
// 构造带重试信息的任务对象
$task = new RetryTask($data, $delay);
$payload = json_encode($task);
$this->redis->zAdd($this->queueName, $task->nextExecute, $payload);
}
public function pop()
{
$now = time();
// 只取一个,确保原子性(或者批量取,逻辑类似)
$tasks = $this->redis->zRangeByScore($this->queueName, '-inf', $now, ['limit' => [0, 1]]);
if (empty($tasks)) {
return null;
}
$payload = $tasks[0];
$this->redis->zRem($this->queueName, $payload);
return json_decode($payload, true); // 解码为数组方便处理
}
public function retry($taskPayload, $maxRetries = 5)
{
$task = json_decode($taskPayload, true);
// 检查是否超过最大重试次数
if ($task['attempts'] >= $maxRetries) {
echo "任务失败次数过多,放弃重试: " . json_encode($task['data']) . "n";
// 这里可以发邮件告警,或者写入失败日志表
return false;
}
// 更新重试次数
$task['attempts']++;
// 创建新的重试任务
$newTask = new RetryTask($task['data'], 0, $task['attempts']);
$newPayload = json_encode($newTask);
// 重新入队,注意:这里使用的是下次执行时间,通常不需要额外的delay
$this->redis->zAdd($this->queueName, $newTask->nextExecute, $newPayload);
return true;
}
}
这下,逻辑闭环了。Worker 拿到任务 -> 处理 -> 如果失败 -> 调用 retry -> 任务带着新的 nextExecute 时间戳重新回到队列。
第六章:高并发下的“小心机”
好了,基础功能有了,重试也有了。但是,我们说的是高并发。
想象一下,双十一零点,Redis瞬间塞进了100万个任务。你的 Worker 只有10个进程。这10个进程开始疯狂 zRangeByScore。
隐患一:竞争条件(Race Condition)
虽然我们用了 zRem,但在极端情况下(虽然Redis是单线程的,但网络延迟、命令执行顺序),多个Worker可能同时看到同一个任务,然后都把它 zRem 了,导致重复执行。
隐患二:阻塞问题
如果 zRangeByScore 的 min 设为 -inf,而 max 是当前时间。如果当前时间是整点,刚好有一波高峰。Worker在处理这些任务时,耗时很长。那么下一波到期的任务怎么办?它们就在队列里等着,直到当前的这1000个任务处理完。这就是阻塞。
解决方案:长轮询
不要在PHP里写一个 while(true),然后 sleep(1)。这太浪费CPU了。我们要用Redis的 BRPOPLPUSH 或者基于 BLPOP 的自定义长轮询。
BRPOPLPUSH 是什么?它是Redis的“阻塞式弹出”。它会让客户端连接挂起,直到有数据弹出,或者超时。一旦有数据,它还会把数据自动推送到另一个列表(备份列表)。这样,消费者拿到数据后,就可以放心大胆地去处理,处理完了再从备份列表删掉。
但是,ZSET怎么配合BRPOPLPUSH?BRPOPLPUSH是针对列表(List)的。
高级架构方案:ZSET + List 双层结构
- 入队:ZSET 存储时间。同时,把数据也推送到一个普通 List(比如
ready_queue)里。 - 出队:Worker 等待
ready_queue里有数据。
等等,这好像把ZSET的作用淡化了?并没有。
我们可以在出队逻辑里做一个动作:定时扫描ZSET,把所有到期的任务,在内存里计算出来,然后一次性 LPUSH 进 ready_queue。
Worker 的真正工作流:
class HighPerformanceWorker
{
private $redis;
private $zsetKey = 'delay_queue_zset';
private $listKey = 'ready_queue'; // 准备执行的任务列表
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
/**
* 搬运工:负责把ZSET里到期的任务搬运到List里
*/
public function shiftExpiredTasks()
{
$now = time();
// 1. 批量取出所有到期的任务
// limit 0 500 表示一次最多取500个,防止内存炸裂
$tasks = $this->redis->zRangeByScore($this->zsetKey, '-inf', $now, ['limit' => [0, 500]]);
if (empty($tasks)) {
return;
}
// 2. 逐个处理(重试逻辑等)
foreach ($tasks as $payload) {
$task = json_decode($payload, true);
// 如果还有重试机会,计算下次时间
if ($task['attempts'] < 5) {
// 重试逻辑... (同上文)
// 如果重试成功,跳过入队,直接处理
continue;
}
// 入队到准备列表
$this->redis->rPush($this->listKey, $payload);
}
// 3. 从ZSET中移除已处理的任务
// 由于zRange返回的是数组,我们需要转成set来高效删除
$this->redis->zRem($this->zsetKey, ...$tasks);
}
/**
* 消费者:负责从List里取任务并执行
*/
public function consume()
{
echo "Consumer 启动,监听 Ready Queue...n";
while (true) {
// 1. 长轮询:等待List里有数据,或者超时1秒
// 如果没有数据,说明还没到期,或者没积压,利用这个时间去做别的
$result = $this->redis->blPop($this->listKey, 1);
// 如果blPop返回空,说明超时了。我们利用这个时间空隙去检查ZSET有没有新到期的!
if (!$result) {
$this->shiftExpiredTasks();
continue;
}
// 如果拿到了数据
$payload = $result[1];
$this->processPayload($payload);
}
}
private function processPayload($payload)
{
$task = json_decode($payload, true);
try {
// 执行业务
$this->execute($task['data']);
echo "任务执行成功n";
} catch (Exception $e) {
// 失败,扔回ZSET重试
$this->pushToRetry($task);
}
}
// 辅助方法省略...
}
这种设计的妙处:
- 解耦:ZSET负责“计算时间”,List负责“排队执行”。计算耗时,执行更耗时。把它们分开,Worker不用一直死等ZSET查询,也不用一直死等List阻塞。
- 利用空隙:
blPop有个超时参数。如果没人排队,1秒后返回空。这段时间,Worker可以去检查ZSET,把新到期的任务搬过来。这极大地提高了CPU利用率和响应速度。 - 高并发:你可以启动N个
consume()进程,它们都会抢blPop的锁。谁先抢到谁执行,互不干扰。
第七章:那些年我们踩过的坑
作为专家,我不讲成功学,我讲踩坑指南。
1. PHP内存泄漏
PHP是脚本语言,默认是内存释放的。但是,如果你在循环里不断地 new Class() 而不销毁引用,或者使用了非线程安全的扩展,进程跑久了就会越来越慢,最后OOM(内存溢出)。
对策:使用 gc_collect_cycles() 强制垃圾回收,或者限制 Worker 的运行时间,使用 Supervisor 等进程管理器自动重启 Worker。
2. 序列化不兼容
如果你在一个PHP版本序列化一个对象,扔进Redis,然后在另一个PHP版本(比如5.6到7.4)反序列化,通常会报错 serializing 'Closure' 或者 Exception。
对策:尽量只传 数组 和 基本类型。不要传闭包,不要传PDO连接对象。数据传输,要轻量,要纯净。
3. 信号处理
当你用 Supervisor 管理Worker时,你执行 supervisorctl stop,Worker不会优雅地退出。它会瞬间被杀掉,导致正在处理的任务可能丢失,或者数据不一致。
对策:在Worker代码里捕获 SIGTERM 和 SIGINT 信号,设置一个标志位,在循环里检查,如果收到信号,先处理完当前任务,再退出。
第八章:终极兵器——RabbitMQ(当你需要更大厂牌的时候)
讲到这里,你可能觉得:“Redis其实也挺香的嘛,轻量、够用。”
没错,对于中小型项目,或者并发量在每秒几千几万级的业务,纯Redis方案完全够用,而且维护成本极低。
但是,如果你的项目是:
- 分布式系统,需要跨服务器通信?
- 需要极其精确的死信队列?
- 需要消息持久化,防止Redis挂了数据全没?
那么,Redis就不行了。这时候,你需要请出真正的“六边形战士”——RabbitMQ。
RabbitMQ 做延迟队列是怎么玩的?
它利用了 TTL (Time To Live) + 死信交换机 (DLX)。
- 发送消息到交换机,设置 TTL 为 5000ms(5秒)。
- 设置路由键到队列 A。
- 队列 A 设置为“死信队列”。
- 队列 A 到期后,消息自动转为“死信”。
- 监听死信队列的消费者,收到消息,执行逻辑。
这听起来比Redis复杂多了,对吧?是的,这就是“重武器”的代价。它提供了更完善的机制,比如消息确认机制、幂等性保证、路由分发等。
总结一下:
- PHP + Redis ZSET:轻骑兵。速度快,部署简单,适合自建系统,适合做“秒杀倒计时”、“订单超时取消”。
- RabbitMQ:重装甲。功能全,生态好,适合做“核心交易链路”,适合大型分布式架构。
第九章:实战总结与代码整合
最后,让我们把所有东西串起来,给一个完整的、可以直接跑的Demo代码结构。别看代码长,逻辑其实很简单。
<?php
/**
* 延迟队列服务类
* 特性:支持重试、指数退避、长轮询优化
*/
class DelayQueueService
{
private $redis;
private $zsetKey = 'mq_delay_queue';
private $listKey = 'mq_ready_queue';
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
$this->redis->select(0); // 确保操作在正确的DB
}
/**
* 生产者:生产任务
*/
public function push($data, $delay = 10, $maxRetries = 3)
{
// 构造任务体
$task = [
'data' => serialize($data),
'attempts' => 0,
'max_retry'=> $maxRetries,
'ts' => time()
];
$score = time() + $delay;
$payload = json_encode($task);
// 原子性入队
$this->redis->zAdd($this->zsetKey, $score, $payload);
}
/**
* 消费者:启动
*/
public function start()
{
echo "Worker Started...n";
while (true) {
// 1. 长轮询从 Ready List 获取任务
// timeout = 1s,如果没有任务,等待1秒
$res = $this->redis->blPop($this->listKey, 1);
if (!$res) {
// 超时了,说明没人排队,去检查 ZSET 有没有新到期的
$this->syncExpiredTasks();
continue;
}
// 获取任务
$payload = $res[1];
$this->handleTask($payload);
}
}
/**
* 搬运工:检查 ZSET,将到期任务移入 List
*/
private function syncExpiredTasks()
{
$now = time();
// 批量获取
$tasks = $this->redis->zRangeByScore($this->zsetKey, '-inf', $now, ['limit' => [0, 100]]);
if (empty($tasks)) return;
foreach ($tasks as $payload) {
$task = json_decode($payload, true);
// 重试逻辑检查
if ($task['attempts'] >= $task['max_retry']) {
// 失败次数过多,丢弃,记录日志(略)
$this->redis->zRem($this->zsetKey, $payload);
continue;
}
// 计算新的重试时间 (指数退避)
$delay = pow(2, $task['attempts']);
$newScore = time() + $delay;
// 更新任务信息并重新入队
$task['attempts']++;
$task['retry_at'] = $newScore; // 记录下次时间
$newPayload = json_encode($task);
$this->redis->zAdd($this->zsetKey, $newScore, $newPayload);
}
// 批量删除已处理(失败次数耗尽的)旧任务
if (!empty($tasks)) {
$this->redis->zRem($this->zsetKey, ...$tasks);
}
}
/**
* 处理单个任务
*/
private function handleTask($payload)
{
$task = json_decode($payload, true);
$data = unserialize($task['data']);
echo "Processing: " . $task['attempts'] . "th attempt.n";
try {
// 模拟业务逻辑
$success = rand(0, 1) ? true : false;
if ($success) {
echo "Success!n";
} else {
throw new Exception("Simulated failure");
}
} catch (Exception $e) {
echo "Failed. Retrying...n";
// 重试逻辑:重新推入 ZSET,下次 syncExpiredTasks 会处理它
$delay = pow(2, $task['attempts']);
$newScore = time() + $delay;
$task['attempts']++;
$newPayload = json_encode($task);
$this->redis->zAdd($this->zsetKey, $newScore, $newPayload);
}
}
}
// --- 使用示例 ---
// 1. 生产者
$q = new DelayQueueService();
$q->push(['type' => 'send_sms', 'phone' => '13800138000'], 2); // 2秒后执行
$q->push(['type' => 'sync_order'], 5); // 5秒后执行
// 2. 启动消费者 (另开一个终端运行)
// php worker.php
// $q->start();
第十章:聊聊架构的艺术
好了,代码给完了。我们来点更“专家”的视角。
为什么我们要这么折腾?
1. 异步化是提升性能的神器
同步调用就像是你打电话给客服,你必须听客服说完才能挂电话。如果你等了5分钟,这5分钟你什么事都干不了。
异步调用就像是你给客服留了张纸条,你转身就去忙别的了,客服处理完会通知你。
在高并发下,主线程(HTTP请求)只需要把任务扔进队列,然后立刻返回 “OK”,用户页面瞬间刷新。用户觉得你的网站快如闪电,而实际耗时在后台悄悄完成。
2. 解耦
业务A不能直接调用业务B。万一业务B挂了,业务A也就挂了。通过队列,业务A只管扔,业务B只管拿,中间隔着一个队列,谁挂了都不影响对方。
3. 流量削峰
双十一流量是波动的,一会儿几万,一会儿几十万。直接打数据库会崩。有了队列,你可以控制消费者的速度(比如每秒只能处理1000个)。流量大的时候,队列就慢慢吞,流量小的时候,队列就慢慢吐。
结语
这篇文章写了这么多,其实核心思想就一句话:利用 Redis 有序集合的“按分数排序”特性,实现时间维度的控制,结合 PHP 的脚本特性,通过长轮询实现高并发消费。
不要觉得这很复杂,当你真正在项目中遇到“秒杀超卖”或者“订单自动取消”的需求时,这套方案就是你的救星。
当然,技术没有银弹。如果你的业务规模已经到了千万级,涉及到跨机房、跨语言协作,那么请毫不犹豫地拥抱 RabbitMQ 或 Kafka。但在大多数 PHP 场景下,这套 Redis 方案,足够让你在面试官面前吹上一整年的牛了。
好了,今天的讲座就到这里。代码拷贝走,修改一下就能用。别忘了,如果任务失败了,多看几眼重试逻辑,那可是你代码里的“回血包”。
祝大家代码无Bug,业务全上线!下课!