PHP `Non-Blocking Sockets` (`stream_set_blocking`) 与 `select()`/`poll()`/`epoll()`

各位观众,大家好!我是今天的主讲人,江湖人称“代码界的老司机”,今天咱们不飙车,聊聊PHP里那些让人又爱又恨的“非阻塞Sockets”。

咱们先来热热身,想象一下,你是一个饭馆老板,客人(请求)来了,你得招呼,还得让后厨(服务器)做菜。如果每个客人你都死守着,等他吃完再招呼下一个,那得饿死多少人? 这就是阻塞模式,效率低下,简直要破产!

所以,聪明的饭馆老板会怎么做? 没错,就是非阻塞模式! 招呼完客人,记下他的桌号(文件描述符),就去招呼下一个。后厨做好菜,再根据桌号送过去。

在PHP的世界里,stream_set_blocking就是控制“招呼客人”方式的开关,而select()poll()epoll()则是你用来监控“后厨上菜”情况的眼睛。

一、stream_set_blocking: 打开非阻塞的大门

stream_set_blocking函数,顾名思义,就是用来设置stream(流)的阻塞模式的。在Socket编程中,这个stream通常就是你的Socket资源。

<?php

// 创建一个Socket
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

// 连接服务器 (假设服务器地址是 127.0.0.1:8080)
$result = socket_connect($socket, '127.0.0.1', 8080);

if ($result === false) {
    echo "连接失败: " . socket_strerror(socket_last_error($socket)) . "n";
    exit;
}

// 设置为非阻塞模式
stream_set_blocking($socket, false);

echo "Socket已设置为非阻塞模式n";

// 现在可以进行非阻塞的读写操作了
?>

这段代码很简单,首先创建了一个Socket,然后尝试连接服务器。关键在于stream_set_blocking($socket, false);这一行,它将Socket设置为非阻塞模式。

阻塞 vs. 非阻塞:一场效率的较量

特性 阻塞模式 非阻塞模式
函数调用 会一直等待操作完成才返回 立即返回,不管操作是否完成
CPU利用率 低 (等待时CPU空闲) 高 (需要不断轮询检查)
适用场景 客户端数量少,并发要求不高 客户端数量多,并发要求高
实现复杂度 简单 复杂 (需要配合select()等函数)
响应速度 慢 (需要等待) 快 (可以同时处理多个连接)

二、select()poll()epoll(): 三双锐利的眼睛

非阻塞模式开启后,你的程序不会傻傻地等待,而是会继续执行。但是,你怎么知道Socket上是否有数据可读,或者是否可以写入数据呢? 这就需要select()poll()epoll()这三位大神出场了。

它们的作用都是监控多个Socket的状态,告诉你哪些Socket可以读,哪些可以写,哪些发生了错误。 就像饭馆老板通过监控系统,知道哪些桌的菜做好了,哪些桌的客人需要加水一样。

1. select(): 最古老但依然好用

select()是POSIX标准中定义的函数,几乎所有的操作系统都支持它。 它接收三个数组作为参数:

  • $read: 需要监控可读状态的Socket数组。
  • $write: 需要监控可写状态的Socket数组。
  • $except: 需要监控异常状态的Socket数组。
<?php

// 假设我们有三个Socket需要监控
$socket1 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket1, '127.0.0.1', 9001);
socket_listen($socket1);
stream_set_blocking($socket1, false);

$socket2 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket2, '127.0.0.1', 9002);
socket_listen($socket2);
stream_set_blocking($socket2, false);

$socket3 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_connect($socket3, '127.0.0.1', 9001);
stream_set_blocking($socket3, false);

$read_sockets = [$socket1, $socket2, $socket3];
$write_sockets = [$socket3]; // 假设 socket3需要写入数据
$except_sockets = [];

// 设置超时时间为5秒
$timeout_seconds = 5;
$timeout_microseconds = 0;

// 使用select()监控Socket状态
$num_changed_sockets = socket_select($read_sockets, $write_sockets, $except_sockets, $timeout_seconds, $timeout_microseconds);

if ($num_changed_sockets === false) {
    echo "socket_select() 失败: " . socket_strerror(socket_last_error()) . "n";
    exit;
} elseif ($num_changed_sockets > 0) {
    echo "有 " . $num_changed_sockets . " 个Socket状态发生了变化n";

    // 检查哪些Socket可读
    foreach ($read_sockets as $read_socket) {
        if ($read_socket === $socket1) {
            // socket1 有新的连接请求
            $new_socket = socket_accept($socket1);
            if($new_socket) {
                echo "socket1 接受到新的连接n";
                stream_set_blocking($new_socket, false);
                // 将新的socket加入监听队列
            }

        } elseif ($read_socket === $socket2) {
            // socket2 有新的连接请求
             $new_socket = socket_accept($socket2);
             if($new_socket) {
                echo "socket2 接受到新的连接n";
                stream_set_blocking($new_socket, false);
                // 将新的socket加入监听队列
            }
        } elseif ($read_socket === $socket3) {
            // socket3 有数据可读
            echo "socket3 有数据可读n";
            $data = socket_read($socket3, 1024); // 读取数据
            if ($data === false) {
                echo "socket_read() 失败: " . socket_strerror(socket_last_error($socket3)) . "n";
            } else {
                echo "接收到的数据: " . $data . "n";
            }
        }
    }

    // 检查哪些Socket可写
    foreach ($write_sockets as $write_socket) {
        if ($write_socket === $socket3) {
            // socket3 可以写入数据
            echo "socket3 可以写入数据n";
            $message = "Hello, Server!";
            $bytes_sent = socket_write($socket3, $message, strlen($message));
            if ($bytes_sent === false) {
                echo "socket_write() 失败: " . socket_strerror(socket_last_error($socket3)) . "n";
            } else {
                echo "成功发送 " . $bytes_sent . " 字节的数据n";
            }
        }
    }

    // 检查哪些Socket发生了异常
    foreach ($except_sockets as $except_socket) {
        // 处理异常
        echo "Socket 发生了异常n";
    }
} else {
    echo "超时,没有Socket状态发生变化n";
}

?>

select()的缺点:

  • 效率较低: select()使用轮询的方式检查Socket状态,需要遍历整个Socket数组。
  • 最大连接数限制: select()能够监控的Socket数量受到限制,通常是1024个。
  • 需要手动管理Socket数组: 每次调用select()之前,都需要重新构建Socket数组。

2. poll(): select()的升级版

poll()解决了select()的一些缺点,它使用pollfd结构体来描述Socket的状态,可以监控更多的Socket。

<?php

// 假设我们有三个Socket需要监控
$socket1 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket1, '127.0.0.1', 9001);
socket_listen($socket1);
stream_set_blocking($socket1, false);

$socket2 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket2, '127.0.0.1', 9002);
socket_listen($socket2);
stream_set_blocking($socket2, false);

$socket3 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_connect($socket3, '127.0.0.1', 9001);
stream_set_blocking($socket3, false);

// 创建 pollfd 数组
$pollfds = [
    ['fd' => $socket1, 'events' => POLLIN, 'revents' => 0],
    ['fd' => $socket2, 'events' => POLLIN, 'revents' => 0],
    ['fd' => $socket3, 'events' => POLLIN | POLLOUT, 'revents' => 0], // 同时监听可读和可写
];

// 设置超时时间为5秒
$timeout = 5000; // 毫秒

// 使用 poll() 监控 Socket 状态
$num_events = socket_poll($pollfds, $timeout);

if ($num_events === false) {
    echo "socket_poll() 失败: " . socket_strerror(socket_last_error()) . "n";
    exit;
} elseif ($num_events > 0) {
    echo "有 " . $num_events . " 个 Socket 状态发生了变化n";

    // 检查哪些 Socket 发生了事件
    foreach ($pollfds as $pollfd) {
        $socket = $pollfd['fd'];
        $revents = $pollfd['revents'];

        if ($revents & POLLIN) {
            if ($socket === $socket1) {
                // socket1 有新的连接请求
                 $new_socket = socket_accept($socket1);
                 if($new_socket) {
                     echo "socket1 接受到新的连接n";
                     stream_set_blocking($new_socket, false);
                     // 将新的socket加入监听队列
                 }
            } elseif ($socket === $socket2) {
                // socket2 有新的连接请求
                 $new_socket = socket_accept($socket2);
                 if($new_socket) {
                     echo "socket2 接受到新的连接n";
                     stream_set_blocking($new_socket, false);
                     // 将新的socket加入监听队列
                 }
            } elseif ($socket === $socket3) {
                // socket3 有数据可读
                echo "socket3 有数据可读n";
                $data = socket_read($socket3, 1024); // 读取数据
                if ($data === false) {
                    echo "socket_read() 失败: " . socket_strerror(socket_last_error($socket3)) . "n";
                } else {
                    echo "接收到的数据: " . $data . "n";
                }
            }
        }

        if ($revents & POLLOUT) {
            if ($socket === $socket3) {
                // socket3 可以写入数据
                echo "socket3 可以写入数据n";
                $message = "Hello, Server!";
                $bytes_sent = socket_write($socket3, $message, strlen($message));
                if ($bytes_sent === false) {
                    echo "socket_write() 失败: " . socket_strerror(socket_last_error($socket3)) . "n";
                } else {
                    echo "成功发送 " . $bytes_sent . " 字节的数据n";
                }
            }
        }

        if ($revents & (POLLERR | POLLHUP | POLLNVAL)) {
            // 处理错误
            echo "Socket 发生了错误n";
            socket_close($socket);
            // 从 pollfds 数组中移除该 Socket
        }
    }
} else {
    echo "超时,没有 Socket 状态发生变化n";
}

?>

poll()的优点:

  • 没有最大连接数限制: poll() 使用链表存储Socket,理论上可以监控无限个Socket。
  • 可以同时监听可读和可写事件: 通过设置events属性,可以同时监听Socket的可读和可写状态。

poll()的缺点:

  • 效率依然较低: poll()仍然使用轮询的方式检查Socket状态,需要遍历整个pollfd数组。
  • 需要手动管理pollfd数组: 每次调用socket_poll()之前,都需要重新构建pollfd数组。

3. epoll(): 高性能的王者

epoll()是Linux特有的I/O多路复用技术,它使用事件驱动的方式监控Socket状态,只有当Socket状态发生变化时,才会通知应用程序。 就像饭馆的监控系统,只有当后厨做好菜时,才会发出通知,而不是每隔一段时间就检查一下。

<?php

// 创建 epoll 实例
$epoll = epoll_create();

if ($epoll === false) {
    echo "epoll_create() 失败n";
    exit;
}

// 假设我们有两个 Socket 需要监控
$socket1 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket1, '127.0.0.1', 9001);
socket_listen($socket1);
stream_set_blocking($socket1, false);

$socket2 = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($socket2, '127.0.0.1', 9002);
socket_listen($socket2);
stream_set_blocking($socket2, false);

// 将 Socket 加入 epoll 监听
if (epoll_add($epoll, $socket1, EPOLLIN) === false) {
    echo "epoll_add() 失败: " . socket_strerror(socket_last_error()) . "n";
    exit;
}

if (epoll_add($epoll, $socket2, EPOLLIN) === false) {
    echo "epoll_add() 失败: " . socket_strerror(socket_last_error()) . "n";
    exit;
}

echo "开始 epoll 监听...n";

while (true) {
    // 等待事件发生,超时时间为 5 秒
    $events = epoll_wait($epoll, 5000);

    if ($events === false) {
        echo "epoll_wait() 失败: " . socket_strerror(socket_last_error()) . "n";
        break;
    } elseif (count($events) > 0) {
        echo "有 " . count($events) . " 个 Socket 状态发生了变化n";

        foreach ($events as $socket => $event) {
            if ($socket === $socket1) {
                // socket1 有新的连接请求
                $new_socket = socket_accept($socket1);
                if($new_socket) {
                    echo "socket1 接受到新的连接n";
                    stream_set_blocking($new_socket, false);
                    // 将新的socket加入监听队列
                    epoll_add($epoll, $new_socket, EPOLLIN); // 加入epoll监听
                }
            } elseif ($socket === $socket2) {
                // socket2 有新的连接请求
                $new_socket = socket_accept($socket2);
                if($new_socket) {
                    echo "socket2 接受到新的连接n";
                    stream_set_blocking($new_socket, false);
                    // 将新的socket加入监听队列
                    epoll_add($epoll, $new_socket, EPOLLIN); // 加入epoll监听
                }
            } else {
                // 其他 Socket 有数据可读
                echo "Socket 有数据可读n";
                $data = socket_read($socket, 1024); // 读取数据
                if ($data === false) {
                    echo "socket_read() 失败: " . socket_strerror(socket_last_error($socket)) . "n";
                    epoll_del($epoll, $socket); // 出错移除监听
                    socket_close($socket);

                } else {
                    echo "接收到的数据: " . $data . "n";
                }
            }
        }
    } else {
        echo "超时,没有 Socket 状态发生变化n";
    }
}

// 关闭 epoll 实例
epoll_close($epoll);

?>

epoll()的优点:

  • 高性能: 使用事件驱动的方式,只有当Socket状态发生变化时才会通知应用程序。
  • 没有最大连接数限制: 可以监控大量的Socket。
  • 不需要每次都重新构建Socket数组: 只需要在添加或删除Socket时更新即可。

epoll()的缺点:

  • 只能在Linux系统中使用: 其他操作系统不支持epoll()
  • 实现相对复杂: 需要理解epoll的工作原理。

三、性能对比: 谁是真正的王者?

函数 操作系统支持 最大连接数 性能 实现复杂度
select() 几乎所有 1024 较低 简单
poll() POSIX 无限制 中等 中等
epoll() Linux 无限制 极高 复杂

总结:

  • 如果你的应用只需要处理少量的并发连接,select()是一个不错的选择。
  • 如果你的应用需要处理大量的并发连接,并且运行在Linux系统上,epoll()是最佳选择。
  • 如果你的应用需要跨平台,poll()是一个不错的折中方案。

一些额外的建议:

  • 错误处理: 在Socket编程中,错误处理非常重要。 务必检查每个函数的返回值,并处理可能出现的错误。
  • 超时设置: 为了防止程序长时间阻塞,可以设置超时时间。
  • 缓冲区大小: 根据实际情况调整Socket的缓冲区大小,以提高数据传输效率。
  • 非阻塞I/O库: 如果你不想自己手动管理Socket,可以使用一些非阻塞I/O库,例如ReactPHP、Swoole等。

好了,今天的讲座就到这里。希望大家能够掌握PHP非阻塞Sockets的精髓,写出高性能的网络应用! 记住,代码的世界里,没有绝对的银弹,只有最适合你的解决方案。 祝大家编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注