Netty中粘包和拆包的解决方案

Stella981
• 阅读 754

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。

TCP粘包和拆包

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

Netty中粘包和拆包的解决方案

如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

TCP粘包和拆包产生的原因

数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

Netty中粘包和拆包的解决方案

详细来说,造成粘包和拆包的原因主要有以下三个:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小
  2. 进行MSS大小的TCP分段
  3. 以太网帧的payload大于MTU进行IP分片。

Netty中粘包和拆包的解决方案

粘包和拆包的解决方法

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

  1. 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  2. 将回车换行符作为消息结束符
  3. 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  4. 通过在消息头中定义长度字段来标识消息的总长度

Netty中的粘包和拆包解决方案

针对上一小节描述的粘包和拆包的解决方案,对于拆包问题比较简单,用户可以自己定义自己的编码器进行处理,Netty并没有提供相应的组件。对于粘包的问题,由于拆包比较复杂,代码比较处理比较繁琐,Netty提供了4种解码器来解决,分别如下:

  1. 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  2. 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  3. 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  4. 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

以上解码器在使用时只需要添加到Netty的责任链中即可,大多数情况下这4种解码器都可以满足了,当然除了以上4种解码器,用户也可以自定义自己的解码器进行处理。具体可以参考以下代码示例:

// Server主程序
public class XNettyServer {
  public static void main(String[] args) throws Exception {
    // accept 处理连接的线程池
    NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
    // read io 处理数据的线程池
    NioEventLoopGroup readGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap
          .group(acceptGroup, readGroup)
          .channel(NioServerSocketChannel.class)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  ChannelPipeline pipeline = ch.pipeline();

                  // 增加解码器
                  pipeline.addLast(new XDecoder());

                  // 打印出内容 handdler
                  pipeline.addLast(new XHandler());
                }
              });
      System.out.println("启动成功,端口 7777");
      serverBootstrap.bind(7777).sync().channel().closeFuture().sync();
    } finally {
      acceptGroup.shutdownGracefully();
      readGroup.shutdownGracefully();
    }
  }
}


// 解码器
public class XDecoder extends ByteToMessageDecoder {

  static final int PACKET_SIZE = 220;

  // 用来临时保留没有处理过的请求报文
  ByteBuf tempMsg = Unpooled.buffer();

  /**
   * @param ctx
   * @param in 请求的数据
   * @param out 将粘在一起的报文拆分后的结果保留起来
   * @throws Exception
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println(Thread.currentThread() + "收到了一次数据包,长度是:" + in.readableBytes());

    // 合并报文
    ByteBuf message = null;
    int tmpMsgSize = tempMsg.readableBytes();
    // 如果暂存有上一次余下的请求报文,则合并
    if (tmpMsgSize > 0) {
      message = Unpooled.buffer();
      message.writeBytes(tempMsg);
      message.writeBytes(in);
      System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
    } else {
      message = in;
    }

    int size = message.readableBytes();
    int counter = size / PACKET_SIZE;
    for (int i = 0; i < counter; i++) {
      byte[] request = new byte[PACKET_SIZE];
      // 每次从总的消息中读取220个字节的数据
      message.readBytes(request);

      // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
      out.add(Unpooled.copiedBuffer(request));
    }

    // 多余的报文存起来
    // 第一个报文: i+  暂存
    // 第二个报文: 1 与第一次
    size = message.readableBytes();
    if (size != 0) {
      System.out.println("多余的数据长度:" + size);
      // 剩下来的数据放到tempMsg暂存
      tempMsg.clear();
      tempMsg.writeBytes(message.readBytes(size));
    }
  }
}


// 处理器
public class XHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    byte[] content = new byte[byteBuf.readableBytes()];
    byteBuf.readBytes(content);
    System.out.println(Thread.currentThread() + ": 最终打印" + new String(content));
    ((ByteBuf) msg).release();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

Netty中粘包和拆包的解决方案

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
tcp粘包与udp丢包的原因
tcp粘包与udp丢包的原因一,什么是tcp粘包与udp丢包TCP是面向流的, 流要说明就像河水一样, 只要有水, 就会一直流向低处, 不会间断. TCP为了提高传输效率, 发送数据的时候, 并不是直接发送数据到网路, 而是先暂存到系统缓冲, 超过时间或者缓冲满了, 才把缓冲区的内容发送
Stella981 Stella981
2年前
Netty使用解码器Decoder解决TCP粘包和拆包问题
解码器Decoder和ChannelHandler的关系netty的解码器通常是继承自ByteToMessageDecoder,而它又是继承自ChannelInboundHandlerAdapter,其实也是一种ChannelHandler和我们自定义的ChannelHandler一样都是来处理进
Wesley13 Wesley13
2年前
TCP协议粘包问题详解
TCP协议粘包问题详解前言在本章节中,我们将探讨TCP协议基于流式传输的最大一个问题,即粘包问题。本章主要介绍TCP粘包的原理与其三种解决粘包的方案。并且还会介绍为什么UDP协议不会产生粘包。基于TCP协议的socket实现远程命令输入我们准备做一个可以在Clie
Stella981 Stella981
2年前
Dubbo处理TCP拆包粘包问题
Dubbo处理TCP拆包粘包问题在TCP网络传输工程中,由于TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么如果合理的decode接受的TCP数据很重要,需要考虑TCP拆包和粘包的问题。我们知道在Netty提供了各种Decoder来解决此类问题,比如LineBasedFrameDecod
Stella981 Stella981
2年前
Netty解决TCP粘包和拆包问题的四种方案
 在RPC框架中,TCP粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使用的同一个连接,这样就会产生粘包和拆包的问题。本文首先会对TCP粘包和拆包问题进行描述,然后介绍其常用的解决方案,最后会对Netty提供的几种解决方案进
Stella981 Stella981
2年前
Netty概述
1.Netty概念异步事件驱动框架,用于快速开发高性能服务端和客户端封装了JDK底层BIO和NIO模型,提供高度可用的API自带编解码器解决拆包粘包问题,用户只用关心业务逻辑精心设计的reactor线程模型支持高并发海量连接自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
Stella981 Stella981
2年前
Netty(三) 什么是 TCP 拆、粘包?如何解决?
!(https://ws3.sinaimg.cn/large/006tKfTcgy1ftuojmzbvxj31kw11xqbq.jpg)前言记得前段时间我们生产上的一个网关出现了故障。这个网关逻辑非常简单,就是接收客户端的请求然后解析报文最后发送短信。但这个请求并不是常见的HTTP,而是利用Netty自定义的协议。有
Wesley13 Wesley13
2年前
34.TCP取样器
阅读文本大概需要3分钟。1、TCP取样器的作用   TCP取样器作用就是通过TCP/IP协议来连接服务器,然后发送数据和接收数据。2、TCP取样器详解!(https://oscimg.oschina.net/oscnet/32a9b19ba1db00f321d22a0f33bcfb68a0d.png)TCPClien
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_