PHP WebSocket协议解析:在用户态实现Frame分包、解包与心跳检测

PHP WebSocket协议解析:在用户态实现Frame分包、解包与心跳检测

大家好!今天我们来聊聊如何在PHP用户态实现WebSocket协议的解析,包括Frame的分包、解包以及心跳检测。WebSocket作为一种全双工通信协议,在Web应用中扮演着越来越重要的角色,特别是在需要实时交互的场景下。虽然有很多现成的WebSocket服务器实现(如Swoole、Workerman),但理解底层协议原理,自己动手实现一个简单的解析器,能够帮助我们更好地掌握WebSocket的运作机制,并在特定场景下进行定制化开发。

WebSocket协议概述

WebSocket协议基于TCP协议,在HTTP握手的基础上建立持久连接。一旦连接建立,客户端和服务器就可以通过发送消息(Message)进行双向通信。每个消息被分割成一个或多个帧(Frame)进行传输。

一个WebSocket Frame的基本结构如下:

字段 长度 (bits) 说明
FIN 1 表示这是消息的最后一个分片。1表示是,0表示不是。
RSV1, RSV2, RSV3 1 each 保留位,一般设置为0。
Opcode 4 定义帧的类型,如文本数据(0x1)、二进制数据(0x2)、连接关闭(0x8)、Ping(0x9)、Pong(0xA)等。
Mask 1 表示Payload Data是否进行了掩码处理。1表示进行了掩码处理,0表示没有。客户端发送给服务器的数据必须进行掩码处理,服务器发送给客户端的数据则不需要。
Payload Length 7, 7+16, 7+64 表示Payload Data的长度。如果值是0-125,则表示Payload Data的实际长度。如果值是126,则接下来的16位表示Payload Data的长度。如果值是127,则接下来的64位表示Payload Data的长度。
Masking-key 0 or 32 掩码密钥,只有在Mask设置为1时才存在。
Payload Data Variable 实际的负载数据,经过掩码处理(如果Mask为1)。

PHP用户态实现Frame分包与解包

在PHP用户态,我们需要处理TCP连接的读取,并根据WebSocket Frame的结构进行分包和解包。这里我们假设已经建立了TCP连接,并使用socket_read函数读取数据。

1. 分包:读取数据并判断是否完整

由于TCP是流式协议,一次socket_read可能只读取到部分数据,因此我们需要循环读取,直到接收到完整的Frame。

<?php

class WebSocketFrame {

    const OPCODE_TEXT = 0x1;
    const OPCODE_BINARY = 0x2;
    const OPCODE_CLOSE = 0x8;
    const OPCODE_PING = 0x9;
    const OPCODE_PONG = 0xA;

    public $fin;
    public $opcode;
    public $mask;
    public $payloadLength;
    public $maskingKey;
    public $payloadData;

    public function __construct(
        bool $fin,
        int $opcode,
        bool $mask,
        int $payloadLength,
        ?string $maskingKey,
        ?string $payloadData
    ) {
        $this->fin = $fin;
        $this->opcode = $opcode;
        $this->mask = $mask;
        $this->payloadLength = $payloadLength;
        $this->maskingKey = $maskingKey;
        $this->payloadData = $payloadData;
    }

    public function __toString(): string {
        return sprintf(
            "WebSocketFrame(fin=%s, opcode=%s, mask=%s, payloadLength=%s, maskingKey=%s, payloadData=%s)",
            $this->fin ? 'true' : 'false',
            $this->opcode,
            $this->mask ? 'true' : 'false',
            $this->payloadLength,
            $this->maskingKey !== null ? bin2hex($this->maskingKey) : 'null',
            strlen($this->payloadData ?? '')
        );
    }
}

class WebSocketParser {

    private $buffer = '';

    /**
     * @param string $data
     * @return WebSocketFrame[]
     */
    public function decode(string $data): array {
        $this->buffer .= $data;
        $frames = [];

        while (strlen($this->buffer) > 2) { // Minimum frame size is 2 bytes
            $frame = $this->parseFrame($this->buffer);
            if ($frame === null) {
                break; // Incomplete frame
            }

            $frames[] = $frame;
            $this->buffer = substr($this->buffer, $this->frameLength($frame));
        }

        return $frames;
    }

    private function parseFrame(string $buffer): ?WebSocketFrame {
        $firstByte = ord($buffer[0]);
        $secondByte = ord($buffer[1]);

        $fin = ($firstByte >> 7) & 0x1; // Extract FIN bit
        $opcode = $firstByte & 0x0F; // Extract Opcode

        $mask = ($secondByte >> 7) & 0x1; // Extract Mask bit
        $payloadLength = $secondByte & 0x7F; // Extract Payload Length

        $offset = 2;
        $extendedPayloadLength = 0;

        if ($payloadLength == 126) {
            if (strlen($buffer) < 4) {
                return null; // Incomplete frame
            }
            $extendedPayloadLength = unpack('n', substr($buffer, $offset, 2))[1]; // Read 16-bit length
            $offset += 2;
        } elseif ($payloadLength == 127) {
            if (strlen($buffer) < 10) {
                return null; // Incomplete frame
            }
            $extendedPayloadLength = unpack('J', substr($buffer, $offset, 8))[1]; // Read 64-bit length
            $offset += 8;
        } else {
            $extendedPayloadLength = $payloadLength;
        }

        $maskingKey = null;
        if ($mask) {
            if (strlen($buffer) < $offset + 4) {
                return null; // Incomplete frame
            }
            $maskingKey = substr($buffer, $offset, 4);
            $offset += 4;
        }

        if (strlen($buffer) < $offset + $extendedPayloadLength) {
            return null; // Incomplete frame
        }

        $payloadData = substr($buffer, $offset, $extendedPayloadLength);

        if ($mask && $maskingKey !== null) {
            $payloadData = $this->applyMask($payloadData, $maskingKey);
        }

        return new WebSocketFrame(
            (bool) $fin,
            $opcode,
            (bool) $mask,
            $extendedPayloadLength,
            $maskingKey,
            $payloadData
        );
    }

    private function applyMask(string $payloadData, string $maskingKey): string {
        $masked = '';
        $payloadLength = strlen($payloadData);
        for ($i = 0; $i < $payloadLength; $i++) {
            $masked .= chr(ord($payloadData[$i]) ^ ord($maskingKey[$i % 4]));
        }
        return $masked;
    }

    private function frameLength(WebSocketFrame $frame): int {
        $length = 2; // Base length (2 bytes for header)

        if ($frame->payloadLength == 126) {
            $length += 2; // Add 2 bytes for extended payload length (16 bits)
        } elseif ($frame->payloadLength == 127) {
            $length += 8; // Add 8 bytes for extended payload length (64 bits)
        } else {
            // No extended payload length
        }

        if ($frame->mask) {
            $length += 4; // Add 4 bytes for masking key
        }

        $length += $frame->payloadLength; // Add payload data length

        return $length;
    }
}

// Example Usage:
$parser = new WebSocketParser();
$data = file_get_contents('payload.bin'); // Replace with socket_read result

$frames = $parser->decode($data);

foreach ($frames as $frame) {
    echo $frame . PHP_EOL;
    // Process the frame here
    if($frame->opcode == WebSocketFrame::OPCODE_TEXT){
        echo "Received text message: " . $frame->payloadData . PHP_EOL;
    } elseif ($frame->opcode == WebSocketFrame::OPCODE_PING) {
        echo "Received ping, sending pong..." . PHP_EOL;
        //TODO: Send pong
    }
}

2. 解包:解析Frame Header并提取Payload

  • 读取前两个字节: 这两个字节包含FIN, RSV1-3, Opcode, Mask, Payload Length的信息。
  • 解析Payload Length: 根据Payload Length的值,判断是否需要读取扩展的Payload Length。
  • 读取Masking Key (如果存在): 如果Mask为1,则需要读取4个字节的Masking Key。
  • 读取Payload Data: 根据Payload Length读取Payload Data。
  • 应用Mask (如果需要): 如果Mask为1,则使用Masking Key对Payload Data进行异或操作。
    //已经移动到上面的代码块中了

3. 编码:将数据封装成WebSocket Frame

<?php
class WebSocketEncoder {

    public static function encode(string $payload, int $opcode = WebSocketFrame::OPCODE_TEXT, bool $mask = false): string {
        $frameHead = [];
        $payloadLength = strlen($payload);

        // First byte: FIN + RSV1 + RSV2 + RSV3 + Opcode
        $frameHead[0] = 0x80 | $opcode; // FIN set to 1
        // Second byte: Mask + Payload Length
        if ($payloadLength <= 125) {
            $frameHead[1] = $payloadLength;
        } elseif ($payloadLength <= 65535) {
            $frameHead[1] = 126;
            $frameHead[2] = ($payloadLength >> 8) & 0xFF;
            $frameHead[3] = $payloadLength & 0xFF;
        } else {
            $frameHead[1] = 127;
            $frameHead[2] = ($payloadLength >> 56) & 0xFF;
            $frameHead[3] = ($payloadLength >> 48) & 0xFF;
            $frameHead[4] = ($payloadLength >> 40) & 0xFF;
            $frameHead[5] = ($payloadLength >> 32) & 0xFF;
            $frameHead[6] = ($payloadLength >> 24) & 0xFF;
            $frameHead[7] = ($payloadLength >> 16) & 0xFF;
            $frameHead[8] = ($payloadLength >> 8) & 0xFF;
            $frameHead[9] = $payloadLength & 0xFF;
        }

        // Convert frame head to string
        $frame = implode(array_map("chr", $frameHead));

        // Add payload to frame
        $frame .= $payload;

        return $frame;
    }
}

//Example usage
$message = "Hello, WebSocket!";
$encodedMessage = WebSocketEncoder::encode($message);
// Send $encodedMessage through the socket

PHP用户态实现心跳检测

心跳检测是维持WebSocket连接活跃的重要机制。客户端和服务器定期发送Ping帧,对方收到后回复Pong帧。如果在一定时间内没有收到Pong帧,则认为连接已断开。

1. 发送Ping帧:

<?php
// Send a Ping frame
$pingFrame = WebSocketEncoder::encode('', WebSocketFrame::OPCODE_PING);
// socket_write($socket, $pingFrame, strlen($pingFrame)); // Replace $socket with your socket resource

2. 接收Pong帧:

WebSocketParser中,我们需要处理Pong帧的Opcode (0xA)。

<?php
// In the decode loop, after parsing the frame
if ($frame->opcode == WebSocketFrame::OPCODE_PONG) {
    // Handle Pong frame: reset timeout counter
    echo "Received Pong" . PHP_EOL;
    // Reset timeout counter here
}

3. 超时检测:

我们需要维护一个超时计数器,每次收到Pong帧时重置计数器。如果在一定时间内没有收到Pong帧,则关闭连接。

<?php
$lastPongTime = time();
$timeout = 30; // seconds

while (true) {
    // Read data from socket
    $data = socket_read($socket, 2048);

    if ($data === false) {
        // Handle socket error
        break;
    }

    if ($data === '') {
        // Connection closed by client
        break;
    }

    $frames = $parser->decode($data);

    foreach ($frames as $frame) {
        // Process frames...
        if ($frame->opcode == WebSocketFrame::OPCODE_PONG) {
            $lastPongTime = time();
        }
    }

    // Check for timeout
    if (time() - $lastPongTime > $timeout) {
        echo "Timeout! Closing connection." . PHP_EOL;
        break;
    }

    // Send Ping frame periodically (e.g., every 15 seconds)
    if (time() % 15 == 0) {
        $pingFrame = WebSocketEncoder::encode('', WebSocketFrame::OPCODE_PING);
        socket_write($socket, $pingFrame, strlen($pingFrame));
    }

    usleep(100000); // Add a small delay to prevent busy-waiting
}

socket_close($socket);

代码示例说明:

  • WebSocketFrame 类:表示一个WebSocket Frame,包含FIN, Opcode, Mask, Payload Length, Masking Key, Payload Data等属性。
  • WebSocketParser 类:负责将接收到的数据解析成WebSocket Frame。decode方法接收数据,并循环调用parseFrame解析,直到数据不足以构成一个完整的Frame。parseFrame方法根据Frame Header的结构,提取各个字段的值。applyMask方法用于对Payload Data进行掩码处理。frameLength 方法根据 frame 的结构计算出frame的总长度。
  • WebSocketEncoder 类: 负责将数据编码成WebSocket Frame,包括设置FIN, Opcode, Mask, Payload Length等字段,并根据Payload Length选择合适的编码方式。
  • 心跳检测: 通过定期发送Ping帧,并在收到Pong帧时重置超时计数器,来实现连接的保活。

注意事项:

  • 错误处理: 在实际应用中,需要添加完善的错误处理机制,例如socket_read失败、Payload Length超出限制等情况。
  • 性能优化: 对于高并发场景,可以考虑使用异步IO和事件循环来提高性能。
  • 安全性: 需要对接收到的数据进行校验,防止恶意攻击。
  • Masking Key: 客户端发给服务器的数据必须进行掩码处理,这是WebSocket协议的要求。
  • 扩展: 可以根据实际需求,支持更多的WebSocket扩展,例如压缩等。

用户态实现的局限性

虽然可以在PHP用户态实现WebSocket协议的解析,但其性能和并发能力受到限制。PHP是解释型语言,用户态的IO操作效率较低。在面对高并发场景时,用户态实现可能会成为性能瓶颈。因此,在实际生产环境中,建议使用Swoole、Workerman等基于C扩展的WebSocket服务器实现,它们能够提供更高的性能和更好的并发能力。

更好的选择:基于C扩展的WebSocket服务器

Swoole和Workerman是流行的PHP异步事件驱动网络通信引擎,它们提供了WebSocket服务器的封装,可以方便地创建高性能的WebSocket应用。它们基于C扩展实现,能够充分利用操作系统的IO多路复用机制,提供更高的并发能力。

实现原理的价值

虽然用户态实现可能不适用于生产环境,但理解其原理对于深入学习WebSocket协议,以及在特定场景下进行定制化开发非常有帮助。例如,在需要对WebSocket协议进行特殊处理或扩展时,了解底层原理能够帮助我们更好地进行设计和实现。

概括说明

本文详细介绍了如何在PHP用户态实现WebSocket协议的解析,包括Frame的分包、解包以及心跳检测。虽然用户态实现存在性能限制,但理解其原理对于深入学习WebSocket协议非常有价值。在实际生产环境中,建议使用Swoole、Workerman等基于C扩展的WebSocket服务器实现。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注