基于Swoole实现WebSocket千万级连接:连接维持、心跳检测与消息推送架构设计
大家好,今天我们来聊聊如何利用Swoole构建一个能够支撑千万级并发WebSocket连接的系统,并重点关注连接维持、心跳检测和消息推送三个关键环节。
一、架构概述
要实现千万级WebSocket连接,单台服务器肯定是不够的,我们需要一个分布式架构。核心思想是将连接分散到多台Swoole服务器上,再通过一个中心服务来协调和管理这些连接。
以下是一个简化的架构图:
+-------------------+ +-------------------+ +-------------------+
| Client (用户) | <---> | Load Balancer | <---> | WebSocket Server |
+-------------------+ +-------------------+ +-------------------+
^
|
| (内部网络)
|
+-------------------+ +-------------------+
| Admin/Backend | <---> | Message Queue | <---> | Connection Manager |
+-------------------+ +-------------------+
- Client (用户): 用户的客户端,发起WebSocket连接。
- Load Balancer (负载均衡): 将用户请求均匀地分发到不同的WebSocket服务器上。可以使用Nginx、HAProxy等。
- WebSocket Server (WebSocket服务器): 基于Swoole构建,负责处理WebSocket连接,接收和发送消息。
- Message Queue (消息队列): 用于异步地在WebSocket服务器和后端服务之间传递消息,例如Redis、Kafka、RabbitMQ等。
- Connection Manager (连接管理器): 负责维护所有WebSocket连接的信息,例如用户ID、连接所在的服务器等。可以使用Redis、数据库等。
- Admin/Backend (管理后台/后端服务): 用于推送消息、管理用户等。
二、WebSocket服务器设计
WebSocket服务器是整个架构的核心,需要能够高效地处理大量的并发连接。
-
Swoole配置优化:
Swoole的配置直接影响服务器的性能,以下是一些关键配置项:
$server = new SwooleWebSocketServer("0.0.0.0", 9501); $server->set([ 'worker_num' => swoole_cpu_num() * 2, // 一般设置为CPU核心数的2倍 'max_request' => 0, // 避免worker进程重启 'dispatch_mode' => 2, // 固定模式,根据userId进行分配 'heartbeat_check_interval' => 60, // 每隔60秒检测一次心跳 'heartbeat_idle_time' => 120, // 连接超过120秒没有收到数据就close 'open_tcp_nodelay' => true, // 启用TCP_NODELAY,提高小数据包的实时性 'tcp_fastopen' => true, // 开启TCP快速握手 'log_file' => '/var/log/swoole.log', // 日志文件 'log_level' => SWOOLE_LOG_WARNING, // 日志级别 'max_coro_num' => 100000, // 最大协程数 'enable_coroutine' => true, // 开启协程 ]);- worker_num: worker进程的数量,根据CPU核心数调整。
- max_request: 每个worker进程处理的最大请求数,设置为0表示不重启。
- dispatch_mode: 数据包分发策略,
2表示固定模式,根据连接的fd进行分配,保证同一个连接始终由同一个worker处理。这对于需要维护会话状态的场景非常重要。 - heartbeat_check_interval: 心跳检测间隔。
- heartbeat_idle_time: 连接空闲超时时间。
- open_tcp_nodelay: 启用TCP_NODELAY,禁用Nagle算法,提高小数据包的实时性。
- tcp_fastopen: 开启TCP快速握手,减少连接建立的延迟。
- max_coro_num: 最大协程数,在高并发场景下需要调整。
- enable_coroutine: 启用协程,提高并发能力。
-
事件回调:
Swoole提供了丰富的事件回调,我们需要关注以下几个:
$server->on('open', function (SwooleWebSocketServer $server, SwooleHttpRequest $request) { // 连接建立事件 $fd = $request->fd; $userId = $request->get['user_id'] ?? null; // 从GET参数中获取用户ID,或者从header中获取 if (!$userId) { $server->disconnect($fd); // 断开连接 return; } // 绑定用户ID和连接 $this->bindUser($fd, $userId); // 记录连接信息到Connection Manager $this->registerConnection($userId, $fd, $server->worker_id); echo "server: handshake success with fd{$request->fd}n"; }); $server->on('message', function (SwooleWebSocketServer $server, SwooleWebSocketFrame $frame) { // 收到消息事件 $fd = $frame->fd; $data = $frame->data; // 处理消息,例如发送给其他用户 $this->processMessage($fd, $data); }); $server->on('close', function (SwooleWebSocketServer $server, int $fd) { // 连接关闭事件 $userId = $this->getUserIdByFd($fd); if ($userId) { // 从Connection Manager中移除连接 $this->unregisterConnection($userId, $fd); } echo "client {$fd} closedn"; });- open: 在WebSocket握手成功后触发,可以在这里进行用户认证、绑定用户ID和连接、记录连接信息等操作。
- message: 收到客户端发送的消息时触发,可以在这里处理消息,例如发送给其他用户。
- close: 连接关闭时触发,可以在这里清理连接信息。
-
用户ID和连接绑定:
我们需要将用户ID和WebSocket连接的
fd关联起来,方便根据用户ID查找连接,以及在连接关闭时清理连接信息。可以使用一个全局的数组或者Redis来存储这个映射关系。private $userFdMap = []; // 用户ID到fd的映射 private function bindUser(int $fd, string $userId): void { $this->userFdMap[$userId] = $fd; } private function getUserIdByFd(int $fd): ?string { foreach ($this->userFdMap as $userId => $f) { if ($f === $fd) { return $userId; } } return null; } private function unregisterConnection(string $userId, int $fd): void { unset($this->userFdMap[$userId]); } private function registerConnection(string $userId, int $fd, int $workerId): void { // 将连接信息存储到Connection Manager (例如Redis) // Key: user:{$userId}, Value: json_encode(['fd' => $fd, 'worker_id' => $workerId]) // 需要根据实际的Connection Manager的存储方式进行调整 // 可以使用Redis的Hash结构,例如 hset user:{$userId} fd $fd worker_id $workerId }需要注意的是,由于Swoole是多进程模型,每个worker进程都有自己的内存空间,因此
$userFdMap只能在当前worker进程中使用。 需要在connection manager中维护全局的连接状态。 -
消息处理:
收到消息后,需要根据消息类型进行处理。例如,可以发送给其他用户,或者保存到数据库。
private function processMessage(int $fd, string $data): void { $message = json_decode($data, true); if (empty($message['type'])) { echo "Invalid message formatn"; return; } switch ($message['type']) { case 'chat': $this->handleChatMessage($fd, $message); break; // 其他消息类型 default: echo "Unknown message typen"; } } private function handleChatMessage(int $fd, array $message): void { $senderId = $this->getUserIdByFd($fd); $receiverId = $message['receiver_id'] ?? null; $content = $message['content'] ?? null; if (!$senderId || !$receiverId || !$content) { echo "Invalid chat messagen"; return; } // 从Connection Manager中获取接收者的连接信息 $receiverInfo = $this->getConnectionInfo($receiverId); if (!$receiverInfo) { echo "Receiver not foundn"; return; } $receiverFd = $receiverInfo['fd']; $receiverWorkerId = $receiverInfo['worker_id']; $response = json_encode([ 'type' => 'chat', 'sender_id' => $senderId, 'content' => $content, ]); // 向接收者发送消息 if ($this->send($receiverId, $receiverFd, $receiverWorkerId, $response) === false) { echo "Failed to send messagen"; } } private function send(string $userId, int $receiverFd, int $receiverWorkerId, string $message): bool { //如果接收者fd存在于当前worker进程,直接发送,否则,投递到task进程处理 if($this->getServer()->worker_id == $receiverWorkerId){ return $this->getServer()->push($receiverFd, $message); }else{ $taskData = [ 'fd' => $receiverFd, 'message' => $message, 'worker_id' => $receiverWorkerId ]; return $this->getServer()->task($taskData); } } private function getConnectionInfo(string $userId): ?array { // 从Connection Manager中获取连接信息 (例如Redis) // 可以使用Redis的Hash结构,例如 hgetall user:{$userId} // 返回 ['fd' => $fd, 'worker_id' => $workerId] // 如果找不到,返回 null // 需要根据实际的Connection Manager的存储方式进行调整 return ['fd' => 1, 'worker_id' => 0]; // 示例,需要替换为实际的实现 } private function getServer(): ?SwooleWebSocketServer{ return $this->server; }
三、连接维持
WebSocket连接的维持是保证服务可用性的关键。以下是一些常用的连接维持策略:
-
心跳检测:
客户端和服务器定期发送心跳包,以检测连接是否仍然有效。如果在一定时间内没有收到心跳包,则认为连接已经断开,需要关闭连接。
-
客户端:
function sendHeartbeat() { websocket.send(JSON.stringify({ type: 'ping' })); setTimeout(sendHeartbeat, 30000); // 每隔30秒发送一次心跳 } websocket.onopen = function (event) { console.log("Connected to WebSocket server."); sendHeartbeat(); }; websocket.onmessage = function (event) { const message = JSON.parse(event.data); if (message.type === 'pong') { console.log("Received pong from server."); } else { // 处理其他消息 } }; -
服务器:
在Swoole的配置中设置
heartbeat_check_interval和heartbeat_idle_time,Swoole会自动进行心跳检测。$server->set([ 'heartbeat_check_interval' => 60, // 每隔60秒检测一次心跳 'heartbeat_idle_time' => 120, // 连接超过120秒没有收到数据就close ]);同时,需要在
message事件中处理客户端发送的心跳包:$server->on('message', function (SwooleWebSocketServer $server, SwooleWebSocketFrame $frame) { $data = json_decode($frame->data, true); if ($data['type'] === 'ping') { $server->push($frame->fd, json_encode(['type' => 'pong'])); } else { // 处理其他消息 } });
-
-
断线重连:
客户端在连接断开后,自动尝试重新连接服务器。
-
客户端:
function connectWebSocket() { websocket = new WebSocket("ws://example.com:9501"); websocket.onopen = function (event) { console.log("Connected to WebSocket server."); sendHeartbeat(); }; websocket.onclose = function (event) { console.log("Disconnected from WebSocket server. Reconnecting in 5 seconds..."); setTimeout(connectWebSocket, 5000); // 每隔5秒尝试重新连接 }; websocket.onerror = function (error) { console.error("WebSocket error:", error); }; websocket.onmessage = function (event) { // 处理消息 }; } connectWebSocket();
-
-
Keep-Alive:
开启TCP Keep-Alive,让操作系统来检测连接是否仍然有效。
-
服务器:
Swoole默认开启了TCP Keep-Alive,可以通过
open_tcp_keepalive配置项来控制。$server->set([ 'open_tcp_keepalive' => true, ]);
-
四、消息推送
消息推送是将消息发送给指定用户或者所有用户的过程。以下是一些常用的消息推送策略:
-
单播:
将消息发送给指定的单个用户。
// 在handleChatMessage函数中已经实现了单播 private function handleChatMessage(int $fd, array $message): void{ // ... } -
广播:
将消息发送给所有在线用户。
public function broadcast(string $message): void { foreach ($this->userFdMap as $userId => $fd) { $connectionInfo = $this->getConnectionInfo($userId); if (!$connectionInfo) { continue; } $receiverFd = $connectionInfo['fd']; $receiverWorkerId = $connectionInfo['worker_id']; $this->send($userId, $receiverFd, $receiverWorkerId, $message); } } -
群播:
将消息发送给指定的用户群组。
public function groupcast(array $userIds, string $message): void { foreach ($userIds as $userId) { $connectionInfo = $this->getConnectionInfo($userId); if (!$connectionInfo) { continue; } $receiverFd = $connectionInfo['fd']; $receiverWorkerId = $connectionInfo['worker_id']; $this->send($userId, $receiverFd, $receiverWorkerId, $message); } } -
异步推送:
将消息推送到消息队列,由专门的推送服务来完成消息推送。这样可以避免阻塞WebSocket服务器,提高并发能力。
public function pushToQueue(string $userId, string $message): void { // 将消息推送到消息队列 (例如Redis) // 可以使用Redis的List结构,例如 lpush user:{$userId}:queue $message // 需要根据实际的消息队列的存储方式进行调整 }然后,创建一个专门的推送服务,从消息队列中读取消息,并将其发送给对应的用户。
五、Connection Manager设计
Connection Manager负责维护所有WebSocket连接的信息,例如用户ID、连接所在的服务器等。可以使用Redis、数据库等。
-
Redis:
使用Redis存储连接信息,可以利用其高性能和丰富的数据结构。
-
存储结构:
可以使用Redis的Hash结构来存储每个用户的连接信息:
Key: user:{$userId} Value: { "fd": $fd, "worker_id": $workerId, "server_id": $serverId, // WebSocket服务器的ID } -
操作:
- 注册连接:
HSET user:{$userId} fd $fd worker_id $workerId server_id $serverId - 获取连接信息:
HGETALL user:{$userId} - 移除连接:
DEL user:{$userId}
- 注册连接:
-
-
数据库:
使用数据库存储连接信息,可以提供更强的持久化能力。
-
表结构:
CREATE TABLE `connections` ( `user_id` VARCHAR(255) NOT NULL, `fd` INT NOT NULL, `worker_id` INT NOT NULL, `server_id` INT NOT NULL, `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`user_id`) ); -
操作:
- 注册连接:
INSERT INTO connections (user_id, fd, worker_id, server_id) VALUES ($userId, $fd, $workerId, $serverId) - 获取连接信息:
SELECT * FROM connections WHERE user_id = $userId - 移除连接:
DELETE FROM connections WHERE user_id = $userId
- 注册连接:
-
六、负载均衡
负载均衡是将用户请求均匀地分发到不同的WebSocket服务器上,可以使用Nginx、HAProxy等。
-
Nginx:
Nginx可以使用
upstream模块来实现负载均衡。upstream websocket_servers { server 192.168.1.101:9501; server 192.168.1.102:9501; server 192.168.1.103:9501; } server { listen 80; server_name example.com; location /ws { proxy_pass http://websocket_servers; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; proxy_set_header Host $host; } }upstream: 定义一组WebSocket服务器。proxy_pass: 将请求转发到upstream定义的服务器。proxy_http_version: 设置HTTP协议版本为1.1,以便支持WebSocket。proxy_set_header: 设置请求头,传递必要的信息给WebSocket服务器。
-
HAProxy:
HAProxy也可以用来实现负载均衡。
frontend main bind *:80 default_backend websocket_servers backend websocket_servers balance roundrobin server ws1 192.168.1.101:9501 check server ws2 192.168.1.102:9501 check server ws3 192.168.1.103:9501 checkfrontend: 定义前端监听的端口。backend: 定义后端服务器。balance: 设置负载均衡算法,roundrobin表示轮询。server: 定义后端服务器的地址和端口,check表示启用健康检查。
七、高可用性考虑
为了保证系统的可用性,需要考虑以下几个方面:
-
多机房部署:
将服务器部署在多个机房,以避免单点故障。
-
自动故障转移:
当某个服务器出现故障时,自动将请求转移到其他服务器。
-
监控和告警:
监控服务器的各项指标,例如CPU使用率、内存使用率、网络流量等,并在出现异常时及时告警。
-
限流和熔断:
限制用户的请求速率,防止恶意攻击或者流量突增导致系统崩溃。
-
灰度发布:
在发布新版本时,先在一小部分用户上进行测试,确认没有问题后再逐步推广到所有用户。
八、总结
通过合理的设计和优化,基于Swoole可以构建一个能够支撑千万级并发WebSocket连接的系统。关键在于选择合适的架构、优化Swoole配置、实现高效的连接维持策略和消息推送策略、以及考虑高可用性。
架构设计:核心围绕分布式和异步化展开
以上讨论了构建高并发WebSocket系统的关键架构和技术选型,包括客户端连接、服务器配置、负载均衡以及高可用性策略。
实践:代码示例提供了技术落地的参考
通过代码示例,演示了连接建立、消息处理、心跳检测和消息推送等核心功能的实现方式,帮助大家更好地理解和应用这些技术。
展望:技术选型和架构需要不断优化
在实际应用中,需要根据具体的业务需求和硬件资源,选择合适的架构和技术,并不断进行优化,以满足不断增长的用户需求。