PHP如何实现文章定时发布并支持失败自动重试机制

各位同学,把手里的奶茶放一放,把手机调静音,今天我们不聊那点鸡毛蒜皮的业务逻辑,我们来聊点硬核的、能让你在深夜服务器宕机时看着报警邮件嘴角上扬的技术——PHP定时发布与失败重试机制。

听着,很多初级开发者(包括以前的我)一听到“定时发布”或者“延迟任务”,脑子里蹦出的第一个想法就是:“我是不是该在数据库里存个 publish_time 字段,然后写个每分钟跑一次的 crontab 去查表?”

如果你真的这么干了,恭喜你,你刚刚给自己的系统埋下了一颗定时炸弹。这种做法就像是想送女朋友情人节礼物,你不仅没买花,还指望自己去检查花店老板明天有没有给你包好。如果老板关店了怎么办?如果快递员累倒了怎么办?如果数据库正忙着备份卡死了怎么办?

所以,今天我们要讲的是“高端局”。我们要用PHP实现一个既优雅、又健壮、还能在遇到挫折后(比如API调用失败)自动爬起来的文章发布系统。

我们要解决的三个核心痛点:

  1. 时间精度:我们不是在等钟敲响,我们是在主动出击。
  2. 可靠性:任务不能丢。
  3. 自愈能力:任务挂了?没事,给它一个机会,不行就再给一次,给不到就不罢休。

第一讲:闹钟与检查员的博弈

首先,我们要明确一个哲学问题:定时任务的本质是什么?

它本质上是一个“检查员”。它每隔几秒钟醒来一次,问数据库:“嘿,有没有哪个倒霉蛋的 publish_time 刚好等于现在?如果有,请把它变成 status = 1。”

这种做法在流量小的时候没问题,但一旦你的文章多了,或者时间精度要求高了(比如“精确到秒”),这就完蛋了。

  • 问题1:时间漂移。如果检查员每隔60秒才查一次,而你设置的是59秒后发布,那这篇文章可能要迟到1秒。别小看这1秒,在电商大促里,这就是用户流失。
  • 问题2:服务器不在线。如果你用了 sleep(60) 的死循环写法,当你的服务器被维护人员(或者老板)重启,或者你为了省电把服务器关了,这个检查员就睡着了。等你早上9点开机,它可能已经错过了一整天的任务。
  • 问题3:资源浪费。数据库每分钟扫一次表,全表扫描,那个IO开销啧啧啧,就像你每天早上醒来都要去翻遍家里所有抽屉找钥匙,而不是直接去口袋里拿。

高级做法是什么?

我们需要引入“延迟队列”的概念。这就好比我们不再去翻抽屉找钥匙,而是给这个“发布任务”装了一个电子闹钟。

当我们要发布一篇文章时,不要把它插进待发布队列,而是把它扔进一个“待触发闹钟”的列表里,并告诉它:“哥们,给我定个时间,时间到了你喊我一声。”

这个“闹钟”列表,在PHP里,我们通常用Redis的ZSET(有序集合)来实现。为什么用ZSET?因为ZSET自带排序功能,它的 score 字段就是时间戳,天然就解决了“按时间排序”的问题。


第二讲:架构设计——用Redis构建智能闹钟

好了,让我们开始造轮子。为了方便演示,我们假设使用Redis作为存储,因为PHP操作Redis简直不要太顺手。

我们需要两个主要的Redis结构:

  1. delayed_queue:一个ZSET。成员是任务ID,分数是发布时间戳。
  2. processing_queue:一个Set或List(这里为了演示方便,我们用List)。存放正在处理的任务ID,防止重复执行。
  3. failed_tasks:一个Set或List。存放失败的任务ID。

核心逻辑流程图

  1. 创建任务时

    • 计算未来的时间戳 $timestamp = time() + 3600
    • 将任务ID(比如 article_1001)推入 delayed_queue,分数设为 $timestamp
    • 注意:这一步是瞬间完成的,不消耗服务器性能。
  2. 定时检查器(Crontab)

    • 我们设定一个极短周期的Cron,比如 * * * * *(每分钟执行一次)。
    • 它做的事情很简单:检查 delayed_queue 里有没有分数(时间戳)小于等于 当前时间 的任务。
  3. 触发任务

    • 如果有任务过期了,把它们从ZSET里拿出来(使用 ZREVRANGEBYSCORE 配合 LIMIT 0 100 批量取出),然后移入 processing_queue
    • 去除 processing_queue,逐个执行文章发布的逻辑(比如写文件、调用API)。

第三讲:代码实战——别死循环,要用Cron!

很多新手喜欢写这种代码:

while(true) {
    $tasks = getExpiredTasks();
    if ($tasks) process($tasks);
    sleep(1); // 哎呀,这里睡1秒,精度只有1秒,而且服务器一重启就全废了
}

千万别这么干。我们要用 systemd 或者 supervisor 来守护进程,但核心的检查逻辑,我们要交给Cron。

1. 创建文章(加入闹钟队列)

想象一下,你写了一篇绝世好文,准备一小时后发出去。

class ArticlePublisher
{
    private $redis;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * 安排定时发布
     * @param int $articleId 文章ID
     * @param int $delaySeconds 延迟秒数
     */
    public function schedule($articleId, $delaySeconds = 3600)
    {
        $now = time();
        $targetTime = $now + $delaySeconds;

        // ZADD key score member
        // 这里我们把文章ID作为member,时间戳作为score
        $this->redis->zAdd('delayed_queue', $targetTime, "article:{$articleId}");

        echo "文章 {$articleId} 已被扔进时间胶囊,将在 {$targetTime} 秒后触发。n";
    }
}

这行代码执行得非常快,不占用CPU。这就好比你把信扔进了邮筒,你不需要去追着邮递员跑。

2. 启动定时检查器(Cron脚本)

我们在Linux服务器上配置一个cron任务,每分钟跑一次这个脚本。这就好比有一个保安,每分钟进来扫视一遍邮筒。

<?php
// worker.php
require 'vendor/autoload.php';

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 每次处理的数量,防止一次把所有任务都拿出来导致内存溢出
$batchSize = 100; 

while (true) {
    $now = time();

    // 1. 从延迟队列中取出所有过期且未处理的任务
    // ZREVRANGEBYSCORE key max min LIMIT offset count
    // 这里的逻辑是:分数 <= 现在的时间
    $tasks = $redis->zRevRangeByScore('delayed_queue', $now, '-inf', 0, $batchSize);

    if (empty($tasks)) {
        // 没任务做?那就睡一会儿,别浪费CPU
        sleep(1);
        continue;
    }

    // 2. 将这些任务从延迟队列中移除(防止重复处理)
    // 使用 MULTI/EXEC 确保原子性
    $redis->multi();
    foreach ($tasks as $task) {
        $redis->zRem('delayed_queue', $task);
        // 放入正在处理队列
        $redis->lPush('processing_queue', $task);
    }
    $redis->exec();

    // 3. 逐个处理任务
    foreach ($tasks as $task) {
        try {
            $this->processSingleTask($task);
            // 处理成功,从处理队列中移除
            $redis->lRem('processing_queue', $task, 0);
        } catch (Exception $e) {
            // 处理失败!这就是我们要引入“重试机制”的时刻了。
            echo "任务 {$task} 处理失败:{$e->getMessage()}n";
            $this->handleFailure($task, $e);
        }
    }
}

private function processSingleTask($taskId) {
    // 这里是核心业务逻辑
    // 比如:调用第三方API发布,或者写入数据库状态
    echo "正在发布:{$taskId}...n";

    // 模拟业务逻辑
    if (rand(0, 10) > 8) {
        throw new Exception("模拟API超时错误!");
    }

    // 假设发布成功
    echo "发布成功!n";
}

private function handleFailure($taskId, $exception) {
    // 失败了怎么办?这里我们先简单地把任务扔回队列
    // 记得这里要重新计算时间戳,而不是直接扔回去,因为现在可能还没到时间
    $now = time();

    // 1. 放回延迟队列,设置下次尝试时间为 30秒后
    $retryAfter = $now + 30; 
    $this->redis->zAdd('delayed_queue', $retryAfter, $taskId);

    echo "任务 {$taskId} 正在进入休眠(重试),休眠时长:30秒n";
}

3. 关于 handleFailure 的吐槽

上面的代码其实有个大坑。如果你在 processSingleTask 里只是简单的 zAdd 回去,它会在30秒后再次触发。这没问题,但它会一直占用你的 delayed_queue

更好的做法是,我们要区分“等待时间”和“重试间隔”。如果文章设定是1小时后发,结果现在失败了,你总不能让它在30秒后再次被检查,然后发现时间还没到,又扔回去吧?这太浪费了。

优化方案: 只有当任务被触发(即当前时间已经到了设定的发布时间)但执行失败时,才应该进行重试。

让我们修改一下 worker.php 的逻辑:

// 伪代码逻辑
$tasks = $redis->zRevRangeByScore('delayed_queue', $now, '-inf', 0, $batchSize);

foreach ($tasks as $task) {
    // 获取任务的原始计划时间
    $scheduleTime = $redis->zScore('delayed_queue', $task);

    // 校验:如果任务被从 delayed_queue 移除了(可能被重新放入了 retry_queue),就跳过
    if ($scheduleTime === false) continue;

    // 核心逻辑:
    // 如果现在是 10:00,任务是 10:00 发布的 -> 执行
    // 如果任务是 11:00 发布的 -> 跳过(等待)
    if ($now < $scheduleTime) {
        // 没到时间,把它放回去(虽然刚才 zRem 了,但为了安全,这里可以加个判断,或者直接略过)
        // 实际上 zRem 后这个 key 就不在了,所以这里逻辑有点绕。
        // 更好的方式是:把所有到期的拿出来,按计划时间排序,只处理计划时间 <= now 的。
        continue; 
    }

    // 执行任务
    try {
        $this->doPublish($task);
    } catch (Exception $e) {
        // 失败了,怎么处理?
        // 情况A:还没到计划时间?不存在的,如果是这种情况,上面的 if 判断已经拦截了。
        // 情况B:已经到时间了,但是失败了。

        $this->retryTask($task, $scheduleTime);
    }
}

第四讲:失败重试的艺术——指数退避

如果系统挂了,任务一直失败怎么办?我们要无限重试吗?无限重试会让数据库被查挂,或者把第三方API的接口彻底封掉。我们需要指数退避算法

这就是“欠债还钱”的逻辑。第一次失败,等1分钟再试;第二次失败,等2分钟;第三次失败,等4分钟……直到次数用尽。

我们需要给任务加一个属性:retry_count

class RetryHandler
{
    private $redis;
    private $maxRetries = 5; // 最多重试5次
    private $baseDelay = 60; // 基础延迟60秒

    public function retryTask($taskId, $originalScheduleTime)
    {
        // 获取当前的尝试次数,没有就是0
        $attempts = $this->redis->hGet('task_meta', $taskId . ':attempts') ?: 0;

        if ($attempts >= $this->maxRetries) {
            // 死信队列
            $this->moveToDeadLetter($taskId);
            echo "任务 {$taskId} 已进入死信队列,放弃治疗。n";
            return;
        }

        // 计算新的等待时间:baseDelay * 2^attempts
        $delay = $this->baseDelay * pow(2, $attempts);

        // 更新元数据
        $this->redis->hIncrBy('task_meta', $taskId . ':attempts', 1);
        $this->redis->hSet('task_meta', $taskId . ':last_error', time());

        // 计算下次触发的时间戳
        // 这里的策略是:任务设定是10:00发,如果10:00失败了,我们10:05再试。
        // 但是要注意,这个任务实际上是“10:05发”,而不是“10:00发”。
        // 如果我们把它放回 delayed_queue,它的 score 应该是 10:05。

        $nextTime = time() + $delay;

        $this->redis->zAdd('delayed_queue', $nextTime, $taskId);
        echo "任务 {$taskId} 重试中... 倒计时:{$delay}秒 (尝试次数: " . ($attempts + 1) . ")n";
    }

    private function moveToDeadLetter($taskId)
    {
        // 把任务移到一个专门的 List 里,方便运维人工介入
        $this->redis->rPush('dead_letter_queue', $taskId);
    }
}

关键点解析:
这里我们引入了一个 task_meta Hash表。为什么要用Hash?因为我们要记录每个任务的详细信息:重试次数、最后错误时间、最后一次重试时间。这样我们在排查问题时,一眼就能看到这个任务是不是个“老油条”。


第五讲:死信队列与监控

如果重试了5次(也就是等待了 60s + 120s + 240s + 480s + 960s ≈ 23分钟)还没成功,怎么办?

这时候,我们不能再往 delayed_queue 里塞了。因为如果这个任务是因为“文章内容格式错误”导致的,你给它延时23分钟再试,它还是不会成功,只会浪费你的检查器资源。

我们必须把它移入死信队列

死信队列不是用来继续处理的,它是用来报警人工介入的。

// 在 Cron 脚本中
// 假设我们在 processSingleTask 里面捕获到了异常
catch (Exception $e) {
    $attempts = $this->redis->hGet('task_meta', $taskId . ':attempts') ?: 0;

    if ($attempts >= 5) {
        // 写入死信队列
        $errorLog = "任务 {$taskId} 失败次数已达上限。错误信息:{$e->getMessage()}";
        file_put_contents("logs/dlq.log", date('Y-m-d H:i:s') . " " . $errorLog . "n", FILE_APPEND);

        // 可选:发送邮件给运维老王
        // mail('[email protected]', '定时任务失败', $errorLog);
    } else {
        // 正常重试逻辑...
    }
}

监控:怎么知道有没有漏掉任务?

既然是定时任务,监控就是生命线。我们可以写一个简单的脚本来检查 Redis 里的队列长度。

// monitor.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$delayedCount = $redis->zCard('delayed_queue');
$processingCount = $redis->lLen('processing_queue');
$deadCount = $redis->lLen('dead_letter_queue');

echo "当前定时任务队列长度:{$delayedCount}n";
echo "当前正在处理队列长度:{$processingCount}n";
echo "死信队列长度:{$deadCount}n";

if ($delayedCount > 1000) {
    // 队列堆积了!警报!
    echo "警报!任务堆积过多,可能是机器卡死了!n";
}

第六讲:高并发下的分布式一致性

讲到这里,你以为这就结束了?天真。我们刚才的代码在单机环境下跑得风生水起,但一旦你把这台服务器部署到了负载均衡后面,或者你有两台服务器在抢活干,就出了大问题。

问题场景:
服务器A和服务器B都在跑Worker。

  1. Cron脚本每分钟执行一次。
  2. Server A 查到了10个任务,扔进 processing_queue
  3. Server B 也查到了这10个任务,也扔进 processing_queue
  4. 结果:Server A 处理了10个,Server B 也处理了10个。文章发了20次!

解决方案:分布式锁。

我们需要在把任务从 delayed_queue 移到 processing_queue 的过程中,加一把锁。

Redis提供了 SETNX 命令,这是“SET if Not eXists”的缩写。如果key不存在,就设置并返回1;如果存在,返回0。

我们可以用一个特殊的Key,比如 lock:processing

// 在 worker.php 中
$lockKey = 'lock:processing';

// 尝试获取锁,设置过期时间10秒(防止死锁,比如进程挂了没释放)
$isLocked = $redis->set($lockKey, 1, ['NX', 'EX' => 10]);

if (!$isLocked) {
    // 锁被占用了,说明其他机器正在干活,或者上一分钟的任务还没跑完
    // 我们可以选择跳过这次执行,或者sleep一下再试
    echo "锁被占用,跳过本次执行n";
    continue;
}

try {
    // 获取到锁了,开始干活
    // ... 原有的 zRem 和 lPush 逻辑 ...

    // 假设我们处理了100个任务
    // ...

} finally {
    // 无论成功失败,都要释放锁
    // 注意:生产环境最好用 Lua 脚本保证原子性,这里简化处理
    $redis->del($lockKey);
}

进阶技巧:
更完美的方案是,利用Redis的 Pipeline(管道)Lua脚本
因为 zRem + lPush + del(lock) 这三个操作不是原子的。如果在 zRemlPush 之间,Server B 查询了队列,它也能查到这个任务(因为它还没被移除)。

所以,必须使用Lua脚本。Lua脚本在Redis中是原子执行的。

-- lua_script.lua
local now = tonumber(ARGV[1])
local batchSize = tonumber(ARGV[2])

-- 1. 获取锁
local lockKey = KEYS[1]
if redis.call("set", lockKey, "1", "NX", "EX", 10) == 0 then
    return {0} -- 没拿到锁
end

-- 2. 获取任务
local tasks = redis.call("ZREVRANGEBYSCORE", "delayed_queue", now, "-inf", "LIMIT", 0, batchSize)

if #tasks == 0 then
    return {0} -- 没任务
end

-- 3. 移除任务
redis.call("ZREMRANGEBYSCORE", "delayed_queue", now, "-inf")

-- 4. 放入处理队列
for i, task in ipairs(tasks) do
    redis.call("LPUSH", "processing_queue", task)
end

return {1, tasks}

然后在PHP中调用:

$lua = <<<LUA
-- 同上
LUA;

$redis->eval($lua, 1, 'lock:processing', time(), 100);

第七讲:终极形态——不仅仅是文章发布

讲到现在,我们实现了一个基于Redis ZSET的定时任务系统。其实这就是大名鼎鼎的 ResqueBull 甚至 Laravel Queue 的底层逻辑的简化版。

这个系统可以扩展到:

  1. 秒杀扣库存:不用每次都去查数据库,先把库存压入队列,倒计时扣减。
  2. 发送邮件/短信:把成千上万条短信任务丢进队列,慢慢处理,避免把短信接口打挂。
  3. 数据同步:把需要同步的任务扔进去,失败自动重试。

总结一下我们要遵守的“黄金法则”:

  1. 别用 sleep,用 Cron:让操作系统帮你管理进程,而不是自己写死循环。
  2. 用 ZSET 做时间轴:Redis的有序集合是处理时间队列的神器。
  3. 用指数退避防挂机:别跟服务器硬刚,让它缓一缓。
  4. 死信队列是救命稻草:别让失败的任务无限循环,它们是系统的噪音。
  5. 分布式锁保平安:多台机器跑时,锁是兄弟情谊的试金石。

好了,今天的讲座就到这里。现在,请打开你的编辑器,把那个每分钟查一次数据库的烂代码删了,重构成我们刚才讲的这个基于Redis的智能闹钟系统。

如果你在重试的时候遇到问题,记得检查你的Redis内存够不够,或者你的PHP扩展(Swoole/Workerman)有没有正确加载。别忘了,编程不仅是写代码,更是修bug和调优。下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注