基于Swoole实现WebSocket千万级连接:连接维持、心跳检测与消息推送架构设计

基于Swoole实现WebSocket千万级连接:连接维持、心跳检测与消息推送架构设计

大家好,今天我们来聊聊如何利用Swoole构建一个能够支撑千万级并发WebSocket连接的系统,并重点关注连接维持、心跳检测和消息推送三个关键环节。

一、架构概述

要实现千万级WebSocket连接,单台服务器肯定是不够的,我们需要一个分布式架构。核心思想是将连接分散到多台Swoole服务器上,再通过一个中心服务来协调和管理这些连接。

以下是一个简化的架构图:

+-------------------+       +-------------------+       +-------------------+
|   Client (用户)   | <---> |   Load Balancer   | <---> |  WebSocket Server |
+-------------------+       +-------------------+       +-------------------+
                                                    ^
                                                    |
                                                    | (内部网络)
                                                    |
+-------------------+       +-------------------+
|   Admin/Backend  | <---> |   Message Queue  | <---> |   Connection Manager   |
+-------------------+       +-------------------+
  • Client (用户): 用户的客户端,发起WebSocket连接。
  • Load Balancer (负载均衡): 将用户请求均匀地分发到不同的WebSocket服务器上。可以使用Nginx、HAProxy等。
  • WebSocket Server (WebSocket服务器): 基于Swoole构建,负责处理WebSocket连接,接收和发送消息。
  • Message Queue (消息队列): 用于异步地在WebSocket服务器和后端服务之间传递消息,例如Redis、Kafka、RabbitMQ等。
  • Connection Manager (连接管理器): 负责维护所有WebSocket连接的信息,例如用户ID、连接所在的服务器等。可以使用Redis、数据库等。
  • Admin/Backend (管理后台/后端服务): 用于推送消息、管理用户等。

二、WebSocket服务器设计

WebSocket服务器是整个架构的核心,需要能够高效地处理大量的并发连接。

  1. Swoole配置优化:

    Swoole的配置直接影响服务器的性能,以下是一些关键配置项:

    $server = new SwooleWebSocketServer("0.0.0.0", 9501);
    
    $server->set([
        'worker_num' => swoole_cpu_num() * 2,  // 一般设置为CPU核心数的2倍
        'max_request' => 0,                 // 避免worker进程重启
        'dispatch_mode' => 2,              // 固定模式,根据userId进行分配
        'heartbeat_check_interval' => 60,   // 每隔60秒检测一次心跳
        'heartbeat_idle_time' => 120,      // 连接超过120秒没有收到数据就close
        'open_tcp_nodelay' => true,          // 启用TCP_NODELAY,提高小数据包的实时性
        'tcp_fastopen' => true,              // 开启TCP快速握手
        'log_file' => '/var/log/swoole.log',  // 日志文件
        'log_level' => SWOOLE_LOG_WARNING,   // 日志级别
        'max_coro_num' => 100000,           // 最大协程数
        'enable_coroutine' => true,          // 开启协程
    ]);
    • worker_num: worker进程的数量,根据CPU核心数调整。
    • max_request: 每个worker进程处理的最大请求数,设置为0表示不重启。
    • dispatch_mode: 数据包分发策略,2表示固定模式,根据连接的fd进行分配,保证同一个连接始终由同一个worker处理。这对于需要维护会话状态的场景非常重要。
    • heartbeat_check_interval: 心跳检测间隔。
    • heartbeat_idle_time: 连接空闲超时时间。
    • open_tcp_nodelay: 启用TCP_NODELAY,禁用Nagle算法,提高小数据包的实时性。
    • tcp_fastopen: 开启TCP快速握手,减少连接建立的延迟。
    • max_coro_num: 最大协程数,在高并发场景下需要调整。
    • enable_coroutine: 启用协程,提高并发能力。
  2. 事件回调:

    Swoole提供了丰富的事件回调,我们需要关注以下几个:

    $server->on('open', function (SwooleWebSocketServer $server, SwooleHttpRequest $request) {
        // 连接建立事件
        $fd = $request->fd;
        $userId = $request->get['user_id'] ?? null; // 从GET参数中获取用户ID,或者从header中获取
    
        if (!$userId) {
            $server->disconnect($fd); // 断开连接
            return;
        }
    
        // 绑定用户ID和连接
        $this->bindUser($fd, $userId);
    
        // 记录连接信息到Connection Manager
        $this->registerConnection($userId, $fd, $server->worker_id);
    
        echo "server: handshake success with fd{$request->fd}n";
    });
    
    $server->on('message', function (SwooleWebSocketServer $server, SwooleWebSocketFrame $frame) {
        // 收到消息事件
        $fd = $frame->fd;
        $data = $frame->data;
    
        // 处理消息,例如发送给其他用户
        $this->processMessage($fd, $data);
    });
    
    $server->on('close', function (SwooleWebSocketServer $server, int $fd) {
        // 连接关闭事件
        $userId = $this->getUserIdByFd($fd);
        if ($userId) {
            // 从Connection Manager中移除连接
            $this->unregisterConnection($userId, $fd);
        }
        echo "client {$fd} closedn";
    });
    • open: 在WebSocket握手成功后触发,可以在这里进行用户认证、绑定用户ID和连接、记录连接信息等操作。
    • message: 收到客户端发送的消息时触发,可以在这里处理消息,例如发送给其他用户。
    • close: 连接关闭时触发,可以在这里清理连接信息。
  3. 用户ID和连接绑定:

    我们需要将用户ID和WebSocket连接的fd关联起来,方便根据用户ID查找连接,以及在连接关闭时清理连接信息。可以使用一个全局的数组或者Redis来存储这个映射关系。

    private $userFdMap = []; // 用户ID到fd的映射
    
    private function bindUser(int $fd, string $userId): void
    {
        $this->userFdMap[$userId] = $fd;
    }
    
    private function getUserIdByFd(int $fd): ?string
    {
        foreach ($this->userFdMap as $userId => $f) {
            if ($f === $fd) {
                return $userId;
            }
        }
        return null;
    }
    
    private function unregisterConnection(string $userId, int $fd): void
    {
        unset($this->userFdMap[$userId]);
    }
    
    private function registerConnection(string $userId, int $fd, int $workerId): void
    {
        // 将连接信息存储到Connection Manager (例如Redis)
        //  Key: user:{$userId}, Value: json_encode(['fd' => $fd, 'worker_id' => $workerId])
        //  需要根据实际的Connection Manager的存储方式进行调整
        //  可以使用Redis的Hash结构,例如 hset user:{$userId} fd $fd worker_id $workerId
    }

    需要注意的是,由于Swoole是多进程模型,每个worker进程都有自己的内存空间,因此$userFdMap只能在当前worker进程中使用。 需要在connection manager中维护全局的连接状态。

  4. 消息处理:

    收到消息后,需要根据消息类型进行处理。例如,可以发送给其他用户,或者保存到数据库。

    private function processMessage(int $fd, string $data): void
    {
        $message = json_decode($data, true);
    
        if (empty($message['type'])) {
            echo "Invalid message formatn";
            return;
        }
    
        switch ($message['type']) {
            case 'chat':
                $this->handleChatMessage($fd, $message);
                break;
            // 其他消息类型
            default:
                echo "Unknown message typen";
        }
    }
    
    private function handleChatMessage(int $fd, array $message): void
    {
        $senderId = $this->getUserIdByFd($fd);
        $receiverId = $message['receiver_id'] ?? null;
        $content = $message['content'] ?? null;
    
        if (!$senderId || !$receiverId || !$content) {
            echo "Invalid chat messagen";
            return;
        }
    
        // 从Connection Manager中获取接收者的连接信息
        $receiverInfo = $this->getConnectionInfo($receiverId);
    
        if (!$receiverInfo) {
            echo "Receiver not foundn";
            return;
        }
    
        $receiverFd = $receiverInfo['fd'];
        $receiverWorkerId = $receiverInfo['worker_id'];
    
        $response = json_encode([
            'type' => 'chat',
            'sender_id' => $senderId,
            'content' => $content,
        ]);
    
        // 向接收者发送消息
        if ($this->send($receiverId, $receiverFd, $receiverWorkerId, $response) === false) {
            echo "Failed to send messagen";
        }
    }
    
    private function send(string $userId, int $receiverFd, int $receiverWorkerId, string $message): bool
    {
        //如果接收者fd存在于当前worker进程,直接发送,否则,投递到task进程处理
        if($this->getServer()->worker_id == $receiverWorkerId){
            return $this->getServer()->push($receiverFd, $message);
        }else{
            $taskData = [
                'fd' => $receiverFd,
                'message' => $message,
                'worker_id' => $receiverWorkerId
            ];
    
            return $this->getServer()->task($taskData);
        }
    
    }
    
    private function getConnectionInfo(string $userId): ?array
    {
        // 从Connection Manager中获取连接信息 (例如Redis)
        //  可以使用Redis的Hash结构,例如 hgetall user:{$userId}
        //  返回 ['fd' => $fd, 'worker_id' => $workerId]
        //  如果找不到,返回 null
        //  需要根据实际的Connection Manager的存储方式进行调整
        return ['fd' => 1, 'worker_id' => 0]; // 示例,需要替换为实际的实现
    }
    
    private function getServer(): ?SwooleWebSocketServer{
        return $this->server;
    }

三、连接维持

WebSocket连接的维持是保证服务可用性的关键。以下是一些常用的连接维持策略:

  1. 心跳检测:

    客户端和服务器定期发送心跳包,以检测连接是否仍然有效。如果在一定时间内没有收到心跳包,则认为连接已经断开,需要关闭连接。

    • 客户端:

      function sendHeartbeat() {
          websocket.send(JSON.stringify({ type: 'ping' }));
          setTimeout(sendHeartbeat, 30000); // 每隔30秒发送一次心跳
      }
      
      websocket.onopen = function (event) {
          console.log("Connected to WebSocket server.");
          sendHeartbeat();
      };
      
      websocket.onmessage = function (event) {
          const message = JSON.parse(event.data);
          if (message.type === 'pong') {
              console.log("Received pong from server.");
          } else {
              // 处理其他消息
          }
      };
    • 服务器:

      在Swoole的配置中设置heartbeat_check_intervalheartbeat_idle_time,Swoole会自动进行心跳检测。

      $server->set([
          'heartbeat_check_interval' => 60,   // 每隔60秒检测一次心跳
          'heartbeat_idle_time' => 120,      // 连接超过120秒没有收到数据就close
      ]);

      同时,需要在message事件中处理客户端发送的心跳包:

      $server->on('message', function (SwooleWebSocketServer $server, SwooleWebSocketFrame $frame) {
          $data = json_decode($frame->data, true);
          if ($data['type'] === 'ping') {
              $server->push($frame->fd, json_encode(['type' => 'pong']));
          } else {
              // 处理其他消息
          }
      });
  2. 断线重连:

    客户端在连接断开后,自动尝试重新连接服务器。

    • 客户端:

      function connectWebSocket() {
          websocket = new WebSocket("ws://example.com:9501");
      
          websocket.onopen = function (event) {
              console.log("Connected to WebSocket server.");
              sendHeartbeat();
          };
      
          websocket.onclose = function (event) {
              console.log("Disconnected from WebSocket server. Reconnecting in 5 seconds...");
              setTimeout(connectWebSocket, 5000); // 每隔5秒尝试重新连接
          };
      
          websocket.onerror = function (error) {
              console.error("WebSocket error:", error);
          };
      
          websocket.onmessage = function (event) {
              // 处理消息
          };
      }
      
      connectWebSocket();
  3. Keep-Alive:

    开启TCP Keep-Alive,让操作系统来检测连接是否仍然有效。

    • 服务器:

      Swoole默认开启了TCP Keep-Alive,可以通过open_tcp_keepalive配置项来控制。

      $server->set([
          'open_tcp_keepalive' => true,
      ]);

四、消息推送

消息推送是将消息发送给指定用户或者所有用户的过程。以下是一些常用的消息推送策略:

  1. 单播:

    将消息发送给指定的单个用户。

    // 在handleChatMessage函数中已经实现了单播
    private function handleChatMessage(int $fd, array $message): void{
        // ...
    }
  2. 广播:

    将消息发送给所有在线用户。

    public function broadcast(string $message): void
    {
        foreach ($this->userFdMap as $userId => $fd) {
            $connectionInfo = $this->getConnectionInfo($userId);
    
             if (!$connectionInfo) {
                continue;
             }
    
            $receiverFd = $connectionInfo['fd'];
            $receiverWorkerId = $connectionInfo['worker_id'];
    
            $this->send($userId, $receiverFd, $receiverWorkerId, $message);
        }
    }
  3. 群播:

    将消息发送给指定的用户群组。

    public function groupcast(array $userIds, string $message): void
    {
        foreach ($userIds as $userId) {
            $connectionInfo = $this->getConnectionInfo($userId);
    
            if (!$connectionInfo) {
                continue;
            }
    
            $receiverFd = $connectionInfo['fd'];
            $receiverWorkerId = $connectionInfo['worker_id'];
    
            $this->send($userId, $receiverFd, $receiverWorkerId, $message);
        }
    }
  4. 异步推送:

    将消息推送到消息队列,由专门的推送服务来完成消息推送。这样可以避免阻塞WebSocket服务器,提高并发能力。

    public function pushToQueue(string $userId, string $message): void
    {
        // 将消息推送到消息队列 (例如Redis)
        //  可以使用Redis的List结构,例如 lpush user:{$userId}:queue $message
        //  需要根据实际的消息队列的存储方式进行调整
    }

    然后,创建一个专门的推送服务,从消息队列中读取消息,并将其发送给对应的用户。

五、Connection Manager设计

Connection Manager负责维护所有WebSocket连接的信息,例如用户ID、连接所在的服务器等。可以使用Redis、数据库等。

  1. Redis:

    使用Redis存储连接信息,可以利用其高性能和丰富的数据结构。

    • 存储结构:

      可以使用Redis的Hash结构来存储每个用户的连接信息:

      Key: user:{$userId}
      Value: {
          "fd": $fd,
          "worker_id": $workerId,
          "server_id": $serverId, // WebSocket服务器的ID
      }
    • 操作:

      • 注册连接: HSET user:{$userId} fd $fd worker_id $workerId server_id $serverId
      • 获取连接信息: HGETALL user:{$userId}
      • 移除连接: DEL user:{$userId}
  2. 数据库:

    使用数据库存储连接信息,可以提供更强的持久化能力。

    • 表结构:

      CREATE TABLE `connections` (
          `user_id` VARCHAR(255) NOT NULL,
          `fd` INT NOT NULL,
          `worker_id` INT NOT NULL,
          `server_id` INT NOT NULL,
          `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
          PRIMARY KEY (`user_id`)
      );
    • 操作:

      • 注册连接: INSERT INTO connections (user_id, fd, worker_id, server_id) VALUES ($userId, $fd, $workerId, $serverId)
      • 获取连接信息: SELECT * FROM connections WHERE user_id = $userId
      • 移除连接: DELETE FROM connections WHERE user_id = $userId

六、负载均衡

负载均衡是将用户请求均匀地分发到不同的WebSocket服务器上,可以使用Nginx、HAProxy等。

  1. Nginx:

    Nginx可以使用upstream模块来实现负载均衡。

    upstream websocket_servers {
        server 192.168.1.101:9501;
        server 192.168.1.102:9501;
        server 192.168.1.103:9501;
    }
    
    server {
        listen 80;
        server_name example.com;
    
        location /ws {
            proxy_pass http://websocket_servers;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
        }
    }
    • upstream: 定义一组WebSocket服务器。
    • proxy_pass: 将请求转发到upstream定义的服务器。
    • proxy_http_version: 设置HTTP协议版本为1.1,以便支持WebSocket。
    • proxy_set_header: 设置请求头,传递必要的信息给WebSocket服务器。
  2. HAProxy:

    HAProxy也可以用来实现负载均衡。

    frontend main
        bind *:80
        default_backend websocket_servers
    
    backend websocket_servers
        balance roundrobin
        server ws1 192.168.1.101:9501 check
        server ws2 192.168.1.102:9501 check
        server ws3 192.168.1.103:9501 check
    • frontend: 定义前端监听的端口。
    • backend: 定义后端服务器。
    • balance: 设置负载均衡算法,roundrobin表示轮询。
    • server: 定义后端服务器的地址和端口,check表示启用健康检查。

七、高可用性考虑

为了保证系统的可用性,需要考虑以下几个方面:

  1. 多机房部署:

    将服务器部署在多个机房,以避免单点故障。

  2. 自动故障转移:

    当某个服务器出现故障时,自动将请求转移到其他服务器。

  3. 监控和告警:

    监控服务器的各项指标,例如CPU使用率、内存使用率、网络流量等,并在出现异常时及时告警。

  4. 限流和熔断:

    限制用户的请求速率,防止恶意攻击或者流量突增导致系统崩溃。

  5. 灰度发布:

    在发布新版本时,先在一小部分用户上进行测试,确认没有问题后再逐步推广到所有用户。

八、总结

通过合理的设计和优化,基于Swoole可以构建一个能够支撑千万级并发WebSocket连接的系统。关键在于选择合适的架构、优化Swoole配置、实现高效的连接维持策略和消息推送策略、以及考虑高可用性。

架构设计:核心围绕分布式和异步化展开
以上讨论了构建高并发WebSocket系统的关键架构和技术选型,包括客户端连接、服务器配置、负载均衡以及高可用性策略。

实践:代码示例提供了技术落地的参考
通过代码示例,演示了连接建立、消息处理、心跳检测和消息推送等核心功能的实现方式,帮助大家更好地理解和应用这些技术。

展望:技术选型和架构需要不断优化
在实际应用中,需要根据具体的业务需求和硬件资源,选择合适的架构和技术,并不断进行优化,以满足不断增长的用户需求。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注