JAVA Netty 粘包拆包?基于 LengthFieldBasedFrameDecoder 的规范处理

好的,我们开始今天的讲座,主题是Java Netty中的粘包拆包问题,以及如何使用LengthFieldBasedFrameDecoder进行规范处理。

粘包拆包问题概述

在基于TCP协议的网络通信中,由于TCP是面向流的协议,数据在传输过程中可能会发生粘包(多个数据包粘在一起)或拆包(一个数据包被拆分成多个)的现象。

  • 粘包: 发送端发送了两个数据包,但是接收端一次性接收到了这两个数据包。
  • 拆包: 发送端发送了一个数据包,但是接收端分多次接收到这个数据包。

为什么会发生粘包拆包?

  1. TCP是面向流的协议: TCP不保证应用层数据的边界,它只是把数据当成一串无结构的字节流进行传输。
  2. Nagle算法: 为了提高网络利用率,TCP可能会将小的、连续发送的数据包合并成一个大的数据包发送。
  3. 滑动窗口: TCP的滑动窗口机制允许发送端一次发送多个数据包,接收端也可能一次接收多个数据包。
  4. MTU限制: 如果单个数据包的大小超过了网络MTU(Maximum Transmission Unit),TCP可能会将数据包拆分成多个小的数据包进行传输。

粘包拆包带来的问题:

如果接收端没有正确处理粘包拆包问题,就可能导致数据解析错误,甚至程序崩溃。例如,如果接收端将两个粘在一起的数据包当成一个数据包来处理,就会导致数据解析失败。

解决粘包拆包的常见方法

解决粘包拆包的常见方法有以下几种:

  1. 固定长度: 每个数据包的长度固定。接收端每次读取固定长度的数据,如果读取到的数据不足固定长度,就等待直到读取到足够的数据。
  2. 特殊分隔符: 在每个数据包的末尾添加一个特殊的分隔符。接收端通过查找分隔符来确定数据包的边界。
  3. 长度字段: 在每个数据包的头部添加一个长度字段,用于表示数据包的长度。接收端首先读取长度字段,然后根据长度字段的值读取数据包的内容。

Netty中的LengthFieldBasedFrameDecoder

Netty提供了一个非常强大的解码器 LengthFieldBasedFrameDecoder,它可以方便地处理基于长度字段的粘包拆包问题。LengthFieldBasedFrameDecoder 的核心思想是在每个消息的头部包含一个长度字段,该字段指示消息体的长度。解码器会根据这个长度字段来正确地提取消息。

LengthFieldBasedFrameDecoder的构造函数参数:

参数名称 数据类型 描述
maxFrameLength int 整个消息帧的最大长度,包括长度字段本身和消息体。 如果接收到的帧长度超过这个值,将会抛出 TooLongFrameException
lengthFieldOffset int 长度字段的偏移量,即长度字段在整个消息帧中的起始位置(从消息头的第一个字节开始计算)。
lengthFieldLength int 长度字段本身的长度,即长度字段占用的字节数。 例如,如果长度字段使用一个 int 类型表示,那么 lengthFieldLength 就是 4。
lengthAdjustment int 长度调整值。 由于长度字段可能不包含消息头的长度,或者长度字段表示的是消息体的长度而不是整个消息帧的长度,因此需要进行调整。 lengthAdjustment 的值等于消息头的长度减去长度字段的长度。 如果长度字段表示的是整个消息帧的长度,并且消息头只包含长度字段,那么 lengthAdjustment 的值就应该是 0。
initialBytesToStrip int 丢弃的起始字节数。 解码后,从消息帧中丢弃的起始字节数。 通常用于去除消息头,只保留消息体。 如果不需要丢弃任何字节,那么 initialBytesToStrip 的值就应该是 0。
failFast boolean 是否快速失败。 如果设置为 true,当接收到的帧长度超过 maxFrameLength 时,立即抛出 TooLongFrameException。 如果设置为 false,则会尝试读取整个帧,然后在解码时抛出异常。

举例说明:

假设我们的消息格式如下:

字段 长度(字节) 描述
Magic Code 4 魔数,用于标识消息类型。
Length 4 消息体的长度。
Content N 消息体。

在这种情况下,我们可以使用以下参数来配置 LengthFieldBasedFrameDecoder

  • maxFrameLength: 消息的最大长度,例如 1024。
  • lengthFieldOffset: 长度字段的偏移量,这里是 4,因为长度字段在消息头的第 5 个字节开始。
  • lengthFieldLength: 长度字段的长度,这里是 4。
  • lengthAdjustment: 长度调整值。 由于长度字段表示的是消息体的长度,而我们希望解码后的消息包含整个消息头(Magic Code + Length + Content),因此需要将长度字段的值加上消息头的长度(Magic Code 的长度)。 所以 lengthAdjustment 的值是 4。
  • initialBytesToStrip: 丢弃的起始字节数。 如果我们希望解码后的消息只包含消息体,那么 initialBytesToStrip 的值应该是 8(Magic Code + Length 的长度)。
  • failFast: 通常设置为 true,以便在消息长度超过限制时快速失败。

代码示例:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class LengthFieldBasedFrameDecoderServer {

    private final int port;

    public LengthFieldBasedFrameDecoderServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             new LengthFieldBasedFrameDecoder(1024, 4, 4, 0, 0),  //maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip
                             new StringDecoder(CharsetUtil.UTF_8),
                             new StringEncoder(CharsetUtil.UTF_8),
                             new SimpleChannelInboundHandler<String>() {
                                 @Override
                                 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                     System.out.println("Server received: " + msg);
                                     ctx.writeAndFlush("Server response: " + msg);
                                 }

                                 @Override
                                 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                     cause.printStackTrace();
                                     ctx.close();
                                 }
                             }
                     );
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new LengthFieldBasedFrameDecoderServer(port).run();
    }
}

在这个例子中,LengthFieldBasedFrameDecoder 被配置为:

  • maxFrameLength = 1024: 消息的最大长度是 1024 字节。
  • lengthFieldOffset = 4: 长度字段的偏移量是 4 个字节。 假设消息结构是 [magic code(4 bytes)][length(4 bytes)][content]
  • lengthFieldLength = 4: 长度字段的长度是 4 个字节。
  • lengthAdjustment = 0: 长度调整值为0。 因为我们发送的消息就是 [length][content]
  • initialBytesToStrip = 0: 我们不跳过任何字节,也就是完整消息都传递给后面的handler处理。

这个例子中,我们假设客户端发送的消息格式是:[length(4 bytes)][content],其中 length 表示 content 的字节长度。

客户端代码示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class LengthFieldBasedFrameDecoderClient {

    private final String host;
    private final int port;

    public LengthFieldBasedFrameDecoderClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             new StringDecoder(CharsetUtil.UTF_8),
                             new StringEncoder(CharsetUtil.UTF_8),
                             new SimpleChannelInboundHandler<String>() {
                                 @Override
                                 public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                     // Send multiple messages to demonstrate the decoder
                                     String message1 = "Hello, Netty!";
                                     String message2 = "This is a longer message.";
                                     sendMessage(ctx, message1);
                                     sendMessage(ctx, message2);
                                 }

                                 private void sendMessage(ChannelHandlerContext ctx, String message) {
                                     byte[] contentBytes = message.getBytes(CharsetUtil.UTF_8);
                                     int contentLength = contentBytes.length;
                                     byte[] messageBytes = new byte[4 + contentLength]; // Length (4 bytes) + Content

                                     // Convert length to byte array
                                     messageBytes[0] = (byte) ((contentLength >> 24) & 0xFF);
                                     messageBytes[1] = (byte) ((contentLength >> 16) & 0xFF);
                                     messageBytes[2] = (byte) ((contentLength >> 8) & 0xFF);
                                     messageBytes[3] = (byte) (contentLength & 0xFF);

                                     // Copy content bytes
                                     System.arraycopy(contentBytes, 0, messageBytes, 4, contentLength);
                                     ctx.writeAndFlush(ctx.alloc().buffer(messageBytes.length).writeBytes(messageBytes));
                                 }

                                 @Override
                                 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                     System.out.println("Client received: " + msg);
                                 }

                                 @Override
                                 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                     cause.printStackTrace();
                                     ctx.close();
                                 }
                             }
                     );
                 }
             });

            ChannelFuture f = b.connect(host, port).sync();

            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        new LengthFieldBasedFrameDecoderClient(host, port).run();
    }
}

在这个客户端代码中,sendMessage 方法负责构建消息。它首先将消息内容转换为字节数组,然后创建一个包含长度字段和内容的新字节数组。长度字段使用 4 个字节表示,并使用大端字节序进行编码。最后,将长度字段和内容复制到新的字节数组中,并通过 writeAndFlush 方法发送到服务器。

运行结果:

如果运行上述服务器和客户端代码,你将会看到以下输出:

服务器端:

Server received: Hello, Netty!
Server received: This is a longer message.

客户端:

Client received: Server response: Hello, Netty!
Client received: Server response: This is a longer message.

这个结果表明 LengthFieldBasedFrameDecoder 成功地处理了粘包拆包问题,服务器能够正确地接收和解析客户端发送的消息。

其他注意事项

  • 字节序: 在使用长度字段时,需要注意字节序的问题。如果发送端和接收端的字节序不一致,会导致长度字段解析错误。可以使用 ByteBuf.order() 方法来设置字节序。
  • 最大帧长度: maxFrameLength 参数非常重要,它可以防止接收端接收过大的数据包,从而避免内存溢出等问题。
  • 安全性: 在实际应用中,需要对接收到的数据进行安全校验,防止恶意攻击。

更加复杂的例子

假设消息格式如下:

字段 长度(字节) 描述
Magic Code 2 魔数,用于标识消息类型。
Version 1 协议版本号。
Length 2 消息体的长度 (不包含 Magic Code, Version, Length 本身)
Command Type 1 命令类型。
Serial Number 4 序列号。
Payload (Content) N 消息体。

那么 LengthFieldBasedFrameDecoder 的参数应该如何设置呢?

  • maxFrameLength: 消息的最大长度,例如 1024。
  • lengthFieldOffset: 长度字段的偏移量,这里是 2 + 1 = 3,因为长度字段在消息头的第4个字节开始。
  • lengthFieldLength: 长度字段的长度,这里是 2。
  • lengthAdjustment: 长度调整值。 由于长度字段表示的是消息体的长度,而我们希望解码后的消息包含整个消息头,因此需要将长度字段的值加上消息头中除了Length字段之外的长度。 所以 lengthAdjustment 的值是 2 + 1 + 1 + 4 = 8。
  • initialBytesToStrip: 丢弃的起始字节数。 如果我们希望解码后的消息只包含消息体,那么 initialBytesToStrip 的值应该是 2 + 1 + 2 + 1 + 4 = 10。
  • failFast: 通常设置为 true,以便在消息长度超过限制时快速失败。

所以,对应的代码可以修改为:

ch.pipeline().addLast(
    new LengthFieldBasedFrameDecoder(1024, 3, 2, 8, 10),
    //... 其他handler
);

进一步理解 LengthAdjustment 和 InitialBytesToStrip

  • LengthAdjustment 的作用: 它用于补偿 lengthFieldLength 中定义的长度字段所指示长度与实际需要读取的完整帧长度之间的差异。如果 lengthFieldLength 字段的值仅代表消息体的长度,而你希望解码器输出包含消息头和消息体的完整帧,那么就需要用 lengthAdjustment 加上消息头中除长度字段以外所有字段的长度。
  • InitialBytesToStrip 的作用: 它用于在解码后,从帧的开头移除指定数量的字节。 这通常用于移除消息头,只保留消息体。 如果你希望解码器输出的仅仅是消息体,那么你需要设置 initialBytesToStrip 为消息头的总长度(包括长度字段)。

通过精心设置这两个参数,你可以灵活地控制解码器输出的内容,使其满足你的特定需求。

使用 LengthFieldPrepender 编码

LengthFieldBasedFrameDecoder 相对应,Netty 提供了 LengthFieldPrepender 编码器,用于在消息的前面添加长度字段。 它的作用与 LengthFieldBasedFrameDecoder 相反,用于在发送数据时,自动添加长度信息,方便接收端进行解码。

LengthFieldPrepender 的构造函数参数包括:

  • lengthFieldLength: 长度字段的长度(字节数)。常用的值包括 1, 2, 3, 4, or 8。
  • lengthAdjustment (可选): 长度调整值,默认为0。
  • lengthIncludesLengthFieldLength (可选): 是否包含长度字段本身的长度,默认为 false。

例如,如果想发送的消息格式是 [length][content],长度字段占用 4 个字节,并且包含 content 的长度,那么可以这样使用:

ch.pipeline().addLast(new LengthFieldPrepender(4));

如果希望长度字段包含自身长度,可以:

ch.pipeline().addLast(new LengthFieldPrepender(4, false)); // 或者 true,取决于具体需求

灵活运用,组合使用

LengthFieldBasedFrameDecoderLengthFieldPrepender 可以灵活组合使用,以满足各种复杂的协议需求。 在实际项目中,需要根据具体的协议格式,仔细分析各个参数的含义,才能正确地使用这两个工具类。

网络传输的可靠保障

LengthFieldBasedFrameDecoder 的正确使用能够有效解决Netty网络编程中的粘包拆包问题,保证网络传输的可靠性。通过灵活调整各个参数,可以适应各种复杂的协议格式,为构建高性能、高可靠性的网络应用提供了强有力的支持。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注