PHP如何实现多进程爬虫系统并解决任务调度与去重问题

PHP多进程爬虫深度实战:从调度到去重的“武林秘籍”

大家好,我是你们的PHP架构导师。

今天我们不聊“如何优雅地连接数据库”,也不聊“Laravel怎么优雅地报错”。今天我们要聊一个在PHP世界里稍微带点“硬核”属性的话题——多进程爬虫系统

很多人听到“PHP”和“多进程”这两个词,第一反应可能是:“嘿,老兄,PHP不是单线程的脚本语言吗?搞多线程不是找罪受?”

哈哈,朋友,你的观念该更新了。PHP完全可以在CLI(命令行)模式下,利用 pcntl 扩展玩转多进程。虽然它不像Go那样原生就是并发之王,也不像Node那样基于事件循环,但PHP通过“暴力美学”——进程池,依然能构建出高吞吐、高并发的爬虫系统。

今天这堂课,我们就来解剖一只名为“蜘蛛”的怪兽,重点解决两个核心痛点:任务调度去重问题。准备好了吗?系好安全带,我们开始。


第一部分:为什么我们需要多进程?(打破迷思)

想象一下,你有一个任务:爬取100万个网页。

如果你用单线程(或者单进程)去爬,就像是一个叫“小明”的快递员,背上背着一个巨大的背包,从A地跑到B地,爬完一个拿一个,再跑下一个。假设每个网页耗时1秒,100万个网页就要跑上11天。

这时候,小明肯定会累瘫,而且网络稍有波动,他就得从头开始。效率极低,不可取。

多进程爬虫是什么?那就是雇了100个“小明”(Worker进程)。这100个“小明”每人手里拿着一个GPS(Redis队列),去爬取自己的那一部分任务。如果有10万个网页,每个人只爬1万个,耗时瞬间从11天缩短到几个小时。

PHP实现多进程的核心武器有两个:

  1. pcntl_fork():这是PHP的“分身术”,可以把一个进程复制成两个,互不干扰。
  2. posix 扩展:这是进程管理员的工具,用来获取进程ID、PID,处理信号。

进程与线程的区别(通俗版)

很多新手分不清进程和线程。为了方便理解,我们打个比方:

  • 进程:像是一个独立的房间。房间里有桌椅,有电脑,有空气。进程A在房间里疯狂打字,进程B在隔壁房间睡觉。A被电击了,B毫发无损。
  • 线程:像是一个房间里的“魂”。线程A和线程B都在同一个房间里。A把桌子砸了,B也会发现桌子没了。

PHP的设计哲学倾向于“房间”(进程),因为PHP的内存模型不支持多线程共享内存。虽然稍微牺牲了一点内存效率,但换来了极高的稳定性和安全性——一个Worker崩了,不会把整个系统搞挂。


第二部分:系统架构蓝图

在开始写代码之前,我们必须先画好图。这是一个典型的 Master-Worker(主从)架构

  1. Master进程:老板。它不干活,它只负责两件事:初始化,然后通过 pcntl_fork() 把任务分发给Worker。它还得负责监控Worker是否活着,如果Worker挂了,Master得赶紧再招一个替补。
  2. Redis队列:任务分发中心。Master把所有待爬的URL通过 LPUSH 放进去,Worker通过 RPOP 取出来。这是调度核心。
  3. Worker进程:打工人。它拿到URL,去爬取内容,解析数据,然后把结果存入MySQL,最后再去队列里领新任务。

第三部分:核心组件一——任务调度(Redis队列)

调度是爬虫的大脑。我们不能让Worker们没事干,也不能让Master瞎指挥。

Redis的选择理由:

  1. 速度快:内存操作,纳秒级响应。
  2. 原子性RPOP 操作是原子的,保证同一个URL不会被两个Worker抢走。
  3. List数据结构:天然适合做队列。

代码实现:调度器逻辑

<?php

// 这是一个简单的Redis队列客户端
class QueueClient
{
    private $redis;
    private $queueName = 'spider:urls';

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * 放入任务 (生产者)
     */
    public function push(array $urls)
    {
        $this->redis->lPush($this->queueName, ...$urls);
    }

    /**
     * 取出任务 (消费者)
     * @param int $timeout 超时时间,0表示一直等
     */
    public function pop($timeout = 0)
    {
        // BRPOP 是阻塞读取,非常适合Worker进程等待任务
        // 它会一直阻塞,直到有新数据进来,或者超时
        $result = $this->redis->brPop($this->queueName, $timeout);

        if ($result) {
            // $result 是一个数组 ['queue_name', 'url'], 我们只要第二个元素
            return $result[1];
        }
        return false;
    }

    // 获取队列长度,用于监控
    public function count() {
        return $this->redis->lLen($this->queueName);
    }
}

注意: 这里使用了 BRPOP 而不是 RPOPRPOP 是一旦没有数据就立刻返回 false,Worker就会空转去检查,消耗CPU。BRPOP 会让Worker进程“挂起”等待,直到有新任务砸在Redis上,这才是最高效的调度。


第四部分:核心组件二——去重问题(布隆过滤器)

来了,重头戏。去重

如果你不解决去重问题,你的爬虫跑几天后就会变成“复读机”。你会看到日志里疯狂打印:“爬取 http://example.com/… 成功”重复了100遍。

为什么不能直接查数据库?
你可以把URL存入MySQL的 visited 表。但你要知道,爬虫的并发量是巨大的。每秒钟可能有几千个请求,每次请求都去MySQL查一次“这网址爬过吗?”,数据库会瞬间被打爆(IO瓶颈),CPU利用率飙升到100%然后卡死。

解决方案:布隆过滤器

布隆过滤器是一种非常精妙的算法。它是一个很长的二进制位向量(数组)。当你插入一个URL时,它会被计算成3个(或者更多)不同的哈希值,然后对应的位数被置为1。

  • 优点:速度极快,不需要查库,内存占用极小。
  • 缺点有误判。它可能会说“这个URL我爬过”,但实际上没爬过(误判为存在);但它绝不会说“这个URL没爬过”,而实际上爬过(漏判是不可能的)。

对于爬虫来说,误判是可以容忍的(浪费一次HTTP请求,总比卡死数据库强)。

代码实现:PHP原生布隆过滤器

虽然PHP有扩展,但为了代码的独立性,我们手写一个简易版。注意,生产环境建议使用 redis-bloom 扩展,或者 swoole 自带的布隆过滤器,性能更高。

<?php

class BloomFilter
{
    private $size;      // 位图大小
    private $hashCount; // 哈希函数个数
    private $bits = []; // 位图数组

    public function __construct($size, $hashCount = 3)
    {
        // size 必须是2的幂次方,方便取模运算优化
        $this->size = $this->getNextPrime($size);
        $this->hashCount = $hashCount;
    }

    // 生成一个接近的素数,保证散列均匀
    private function getNextPrime($n)
    {
        while (!isPrime($n)) {
            $n++;
        }
        return $n;
    }

    // 简单的质数检查
    private function isPrime($n)
    {
        if ($n <= 1) return false;
        if ($n <= 3) return true;
        if ($n % 2 == 0 || $n % 3 == 0) return false;
        $i = 5;
        while ($i * $i <= $n) {
            if ($n % $i == 0 || $n % ($i + 2) == 0) return false;
            $i += 6;
        }
        return true;
    }

    /**
     * 插入数据
     */
    public function add($str)
    {
        $hashes = $this->hashes($str);
        foreach ($hashes as $hash) {
            $index = $hash % $this->size;
            $this->bits[$index] = 1;
        }
    }

    /**
     * 检查数据是否存在
     */
    public function exists($str)
    {
        $hashes = $this->hashes($str);
        foreach ($hashes as $hash) {
            $index = $hash % $this->size;
            if (!isset($this->bits[$index]) || $this->bits[$index] == 0) {
                return false; // 一定不存在
            }
        }
        return true; // 可能存在(误判)
    }

    // 简单的字符串哈希函数
    private function hashes($str)
    {
        $h1 = crc32($str);
        $h2 = crc32($str . 'salt');
        $h3 = crc32($str . 'salt2');
        return [$h1, $h2, $h3];
    }
}

// 使用示例
$bf = new BloomFilter(10000); // 模拟1万个槽位

$bf->add('http://example.com/1');
$bf->add('http://example.com/2');

var_dump($bf->exists('http://example.com/1')); // bool(true)
var_dump($bf->exists('http://example.com/99')); // bool(false) (没存过,肯定不存在)

实战建议:位图初始化可以序列化存文件,这样重启PHP进程后,记忆不会丢失。


第五部分:大杀器——Master-Worker 多进程实现

现在,我们把调度器和过滤器组装起来,写一个真正的Master程序。

这个程序需要处理几个棘手的问题:

  1. 僵尸进程:子进程结束时,父进程不收尸,子进程就会变成僵尸。
  2. 信号处理:按下Ctrl+C(SIGINT)时,如何优雅地关闭所有子进程,而不是让它们裸奔。
  3. 重试机制:网络抖动导致失败,下次再爬。

完整代码示例

<?php
/**
 * SpiderMaster.php
 * PHP多进程爬虫系统主控
 */

require_once 'QueueClient.php';
require_once 'BloomFilter.php';

class SpiderMaster
{
    private $masterPid;
    private $workerCount = 5; // 开启5个Worker
    private $workerProcs = []; // 存储子进程的PID
    private $queue;
    private $bf;
    private $isRunning = true;

    public function __construct()
    {
        $this->masterPid = getmypid();
        $this->queue = new QueueClient();
        // 初始化布隆过滤器,这里为了演示简单,不持久化,重启会丢失
        // 实际项目请用Redis BF
        $this->bf = new BloomFilter(5000000); // 500万容量

        $this->signalHandler();
        $this->forkWorkers();
    }

    /**
     * 注册信号处理
     */
    private function signalHandler()
    {
        pcntl_async_signals(true); // 开启异步信号

        pcntl_signal(SIGTERM, function ($signo) {
            $this->log("收到 SIGTERM 信号,准备关闭...");
            $this->shutdown();
        });

        pcntl_signal(SIGINT, function ($signo) {
            $this->log("收到 Ctrl+C 信号,准备关闭...");
            $this->shutdown();
        });
    }

    /**
     * 启动子进程
     */
    private function forkWorkers()
    {
        $this->log("Master PID: {$this->masterPid} 正在雇佣 {$this->workerCount} 个工人...");

        for ($i = 0; $i < $this->workerCount; $i++) {
            $pid = pcntl_fork();

            if ($pid == -1) {
                $this->log("无法fork进程!");
                exit(1);
            } elseif ($pid) {
                // 父进程逻辑:记录子进程PID
                $this->workerProcs[$pid] = true;
            } else {
                // 子进程逻辑:立即执行Worker代码
                $this->runWorker();
                exit(0);
            }
        }

        // 父进程继续监听子进程退出
        $this->monitorWorkers();
    }

    /**
     * 子进程执行体
     */
    private function runWorker()
    {
        // 子进程可以设置自己的ID,方便调试
        $workerId = posix_getpid();
        $this->log("Worker #{$workerId} 已上线,开始工作...");

        while ($this->isRunning) {
            // 1. 从Redis获取任务,设置超时时间2秒
            // 如果2秒没任务,会返回false,此时可以跳出循环去处理一些清理工作
            $url = $this->queue->pop(2);

            if ($url) {
                $this->processUrl($url, $workerId);
            } else {
                // 超时,说明当前没有任务,Worker可以做一些维护
                // 或者这里可以什么都不做,等待下一次BRPOP
            }
        }

        $this->log("Worker #{$workerId} 退出了。");
    }

    /**
     * 处理单个URL
     */
    private function processUrl($url, $workerId)
    {
        // 1. 去重检查
        if ($this->bf->exists($url)) {
            // 这里的日志不用太频繁,否则刷屏
            // $this->log("Worker #{$workerId} 跳过已爬取: {$url}");
            return;
        }

        // 模拟爬取过程
        $this->log("Worker #{$workerId} 正在抓取: {$url}");

        // 简单的随机失败模拟
        if (rand(0, 100) < 5) { // 5%概率模拟失败
            $this->log("Worker #{$workerId} 抓取失败: {$url}");
            return;
        }

        // 模拟解析数据
        $data = [
            'url' => $url,
            'title' => '这里是解析出的标题',
            'content' => '这里是解析出的内容'
        ];

        // 2. 标记为已爬取 (存入布隆过滤器)
        $this->bf->add($url);

        // 3. 存入数据库 (异步写入,或者直接写)
        $this->saveData($data);

        $this->log("Worker #{$workerId} 抓取成功: {$url}");
    }

    /**
     * 模拟数据库写入
     */
    private function saveData($data)
    {
        // 实际项目中,这里可以用 swoole 协程 或者 异步队列
        // 这里为了演示,直接sleep模拟IO
        usleep(100000); // 0.1秒
        // echo "Saved: " . $data['url'] . PHP_EOL;
    }

    /**
     * 监控Worker状态
     */
    private function monitorWorkers()
    {
        while ($this->isRunning || !empty($this->workerProcs)) {
            // wait() 方法会阻塞,直到有子进程退出
            $pid = pcntl_wait($status);

            if ($pid > 0) {
                $this->log("Worker #{$pid} 退出了,Master正在招新...");
                unset($this->workerProcs[$pid]);

                // 如果还有Worker在跑,且任务还没完,重新启动一个
                if ($this->isRunning) {
                    $newPid = pcntl_fork();
                    if ($newPid) {
                        $this->workerProcs[$newPid] = true;
                    } else {
                        $this->runWorker();
                        exit(0);
                    }
                }
            }
        }
    }

    /**
     * 优雅关闭
     */
    private function shutdown()
    {
        $this->isRunning = false;

        // 告诉所有Worker退出
        foreach (array_keys($this->workerProcs) as $pid) {
            posix_kill($pid, SIGTERM);
        }

        // 等待所有子进程退出 (最多等5秒)
        $timeout = 5;
        $start = time();
        while (!empty($this->workerProcs) && (time() - $start < $timeout)) {
            pcntl_wait($status);
            // 这里简化逻辑,实际需要遍历移除已退出的PID
        }

        $this->log("系统已关闭");
        exit(0);
    }

    private function log($msg)
    {
        echo date('Y-m-d H:i:s') . " [PID:" . getmypid() . "] " . $msg . PHP_EOL;
    }
}

// 运行
$master = new SpiderMaster();

第六部分:深度解析——那些坑与细节

上面那套代码跑起来可能没问题,但如果你想让它真正投入生产环境,还得懂点“潜规则”。

1. 僵尸进程与孤儿进程

pcntl_fork 生成的孩子,谁来负责?

  • 孤儿进程:父进程死了,子进程还在跑。Linux的init进程(PID 1)会收养它,所以孤儿进程很安全。
  • 僵尸进程:子进程死了,但没告诉父进程。父进程不管,子进程就成了僵尸。占着位置不干事。

在Master代码中,我们用 pcntl_wait() 来“收尸”。如果不用 wait,数以万计的子进程死后,你的服务器内存迟早会爆。

2. 资源竞争

在多进程环境下,全局变量是共享的吗?不是! 除非你用了共享内存。
在刚才的代码里,$this->bf(布隆过滤器)在父进程中初始化。如果子进程直接修改 $this->bf,它只会修改自己进程内存里的副本,父进程不知道,其他子进程也不知道

解决方案A(性能差):每个Worker进程初始化一个自己的布隆过滤器。缺点是内存消耗大,但数据一致性最好。
解决方案B(推荐):使用 Redis 的 Bitmap 或者 RedisBloom 扩展。

  • Worker ADD URL -> SADD Redis Set。
  • Worker EXISTS URL -> SISMEMBER Redis Set。
  • 这样所有进程共享同一个状态库。

3. 错误处理

PHP有个大坑:子进程出错会导致整个父进程崩溃吗?
不一定。如果子进程里发生了致命错误(比如 Fatal Error),子进程会直接死亡,Master的 pcntl_wait 能感知到,然后Master会重启一个新的子进程。这其实是个保护机制。

但是,如果错误只是 Warning 或 Notice,它可能不会触发崩溃,导致子进程莫名其妙地卡住。

最佳实践:在 Worker 循环的最外层包上 try-catch,并设置 pcntl_signal(SIGUSR1, ...) 来处理自定义错误信号。

4. 线程安全锁

如果你在PHP中使用了 flock 锁文件,或者某些扩展的锁功能,请务必小心。在多进程环境下,文件锁通常是不起作用的(不同进程看到的锁状态不同)。多进程锁通常需要依赖 Redis 的 SETNX (分布式锁) 或者专门的共享内存锁。


第七部分:进阶篇——从多进程到异步协程

虽然上面的 Master-Worker 方案很稳健,但它本质上还是“同步阻塞”的。比如 Worker 进程在 fopen 一个网页时,它必须傻等 TCP 握手,等服务器响应。这期间 CPU 空转,等待 IO。

这时候,PHP 的另一个神器登场了:Swoole (或者 Workerman)

Swoole 允许你在单进程内开启数千个协程(Coroutines)。它自动处理了 IO 的阻塞问题,就像 Go 语言一样。

Swoole 下的爬虫架构会更简单:

  1. 一个主进程(或者完全不存在的协程入口)。
  2. 成百上千个协程
  3. 协程发起请求,遇到网络IO自动挂起,让出CPU给其他协程。
  4. 数据回来后,协程自动恢复继续执行。

对比之下:

  • PHP多进程:像是开了5个窗口办业务,每个窗口排队。适合CPU密集型或简单IO,内存消耗大。
  • Swoole协程:像是有一个大厅,1000个人去办业务,谁不忙谁就去办。适合高并发IO场景,内存消耗极小。

虽然本篇讲座的重点是传统的 多进程,但我强烈建议你在实战中,如果追求极致性能,直接上 Swoole2.x。Swoole 已经兼容了绝大部分 PHP 标准库的 API,你甚至不需要改你的 QueueClient 代码,直接把刚才的逻辑塞进去跑,性能可能会提升10倍以上。


总结

好了,伙计们。我们今天深入探讨了如何用 PHP 搭建一个多进程爬虫系统。

我们讲了:

  1. 进程 vs 线程:为什么PHP选择用进程来保命。
  2. Master-Worker 模式:如何用 pcntl_fork 搞定调度。
  3. Redis 调度:为什么 BRPOP 是Worker的最佳伴侣。
  4. 去重艺术:布隆过滤器如何拯救你的数据库。
  5. 细节打磨:如何优雅地处理信号和资源释放。

PHP 不止是写网站的,只要加上 pcntlposix,它就是一把好用的服务器端武器。

现在,把你那台卡顿的单线程脚本扔进垃圾桶吧。去写一个真正的分布式爬虫系统!记得,爬虫要有节操,遵守 robots.txt,别被封 IP。

如果你在实现过程中遇到了什么“奇奇怪怪”的错误,记得——那是代码在向你求救,也是你在向大师之路迈进。祝大家爬虫事业蒸蒸日上,数据采集如探囊取物!

下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注