Netty 粘包分包现象及解决方案

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

前言

今天主要说说 Netty 什么是粘包分包现象,产生的原因是什么,以及粘包分包现象演示,最后给出相关的解决方案。

粘包分包现象

先上一张图:

上面是粘包现象,假设有两个客户端,分别发送 hello, I am client 1hello, I am client 2 数据流,当发生粘包现象后,后台拿到的是一条数据流,且粘在一起。

上面是分包现象,假设有一个客户端,发送 hello, I am client 数据流,当发生分包现象后,后台拿到了两条数据流。

为什么会发生粘包分包

要知道,在 TCP 中,只有流的概念,没有包的概念,这是根本原因。

之所以会发生粘包现象,服务端和客户端都有原因:

  • 1.服务端:服务端收到的数据放在系统接受的缓冲区,用户进程从该缓冲区中拿数据。

  • 2.客户端:客户端在使用 TCP 协议进行传输时,TCP 为了提高传输效率,要收集到足够多的数据后才发送数据。

上面是粘包发生的原因,再说说分包发生的原因:

  • 1.应用程序写入的字节大小大于套接字发送缓冲区的大小;

  • 2.当 TCP 报文长度 - TCP 头部长度 > mss(最大报文长度), 会进行 mss(最大报文长度) 大小的 TCP 分段

  • 3.以太网帧的 playload(净荷) 大于 MTU (1500 字节) 进行 IP 分片

Netty 粘包分包现象演示

编写测试代码:

Server.java

public class Server {

    public static void main(String[] args) {
        // 服务类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // boss 线程池,主要监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
        ExecutorService boss = Executors.newCachedThreadPool();
        // worker 线程池,负责数据读写
        ExecutorService worker = Executors.newCachedThreadPool();
        // 设置 niosocket 工厂
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
        // 设置管道工厂
        serverBootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipeline = Channels.pipeline();
            // 管道过滤器
            pipeline.addLast("myHandler", new ServerHandler());
            return pipeline;
        });
        // 服务端绑定端口
        serverBootstrap.bind(new InetSocketAddress(8888));
    }
}

ServerHandler.java:

public class ServerHandler extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage();
        byte[] bytes = channelBuffer.array();
        System.out.println("server receive data:" + new String(bytes));
    }
}

客户端, client.java:

public class Client {

    public static void main(String[] args) throws InterruptedException {
        // 服务类
        ClientBootstrap clientBootstrap = new ClientBootstrap();
        // 线程池
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();
        // socket 工厂
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
        // 管道工厂
        clientBootstrap.setPipelineFactory(() -> {
            ChannelPipeline channelPipeline = Channels.pipeline();
            channelPipeline.addLast("1", new StringEncoder());
            channelPipeline.addLast("2", new ClientHandler());
            return channelPipeline;
        });
        // 连接服务端
        clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync();}
}

ClientHandler.java

public class ClientHandler extends SimpleChannelHandler {
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel channel = ctx.getChannel();
        String msg = "hello, i am client";
        for (int i = 0; i < 1000; i++) {
            channel.write(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }
}

先运行 Server.class, 再运行 Client.class, 从代码中,我们可以看到,在客户端通道连接成功后,会发送 1000hello, I am client, 服务端接收结果如下:

可以看到服务端的输出信息粘包分包现象已经很明显了。

解决方案

服务端和客户端约定好稳定的数据包结构,然后客户端根据约定的数据包结构发送数据,服务端根据约定的数据包结构来读取数据。

假设我们约定协议如下:

我们定义客户端发送数据包的协议为 长度 + 数据,长度为字节流的长度,后面才跟上具体的数据,下面看看改写过后,遵循这种协议的代码:

首先看看客户端的代码:

public class ClientHandler extends SimpleChannelHandler {
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel channel = ctx.getChannel();
        String msg = "hello, i am client";
        byte[] bytes = msg.getBytes();
        // 定义数据包,结构为:长度 + 数据
        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); // 动态缓存
        // 1.写长度
        channelBuffer.writeInt(bytes.length); // 4 个字节
        // 2.写数据
        channelBuffer.writeBytes(bytes);
        for (int i = 0; i < 1000; i++) {
            channel.write(channelBuffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }
}

同样是发送 1000 个 hello, I am client, 这次采用 长度 + 数据 的格式,再看看服务端如何解析,服务端解析的话,需要自定义解码器,我们定义一个类叫 MyDecoder.java, 需要让它继承自 FrameDecoder:

public class MyDecoder extends FrameDecoder {

    private static final int BASE_LENGTH = 4;

    @Override
    protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
        // 收到数据后,先判断 buffer 中的可读数据长度是否大于数据包的基本长度
        if (channelBuffer.readableBytes() > BASE_LENGTH) {
            // 说明此时有数据包来临
            // 做标记(记住当前读指针的位置)
            channelBuffer.markReaderIndex();
            // 1.读长度
            int dataLength = channelBuffer.readInt();
            // 2.读数据本身
            if (channelBuffer.readableBytes() < dataLength) {
                // 说明数据本身长度还不够,要继续等待后面的数据到来
                // 还原指针位置
                channelBuffer.resetReaderIndex();
                return null;
            }

            // 此时说明数据包已经完整
            // 2.读数据本身
            byte[] dst = new byte[dataLength];
            channelBuffer.readBytes(dst);
            // 对于此时还没有读取完的数据,会做什么处理?留个疑问
            return new String(dst);

        }

        // return null 表示此时的数据包不完整, 需要继续等待下一个数据包的到来
        return null;
    }
}

Server.java 添加解码器:

public static void main(String[] args) {
        // 服务类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // boss 线程池,主要监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
        ExecutorService boss = Executors.newCachedThreadPool();
        // worker 线程池,负责数据读写
        ExecutorService worker = Executors.newCachedThreadPool();
        // 设置 niosocket 工厂
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
        // 设置管道工厂
        serverBootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipeline = Channels.pipeline();
            // 管道过滤器
            pipeline.addLast("myDecoder", new MyDecoder());
            pipeline.addLast("myHandler", new ServerHandler());
            return pipeline;
        });
        // 服务端绑定端口
        serverBootstrap.bind(new InetSocketAddress(8888));
    }

服务端会先读取自定义数据包的长度,如果说可读长度小于 4 个字节,则认为,不满足一个完整数据包,继续等待下一个数据包的到来,如果说大于 4 个字节,则读取数据包的长度,再读取数据本身的实际长度,如果说实际长度是小于头部写的长度,则认为还不是一个完整的数据包,继续 return null, 等待下一个数据包的到来,反之,则读取头部定义的长度,完成一次完整数据包的读取流程。

看看控制台输出:

可以看到通过自定义的协议,并未出现粘包分包的现象出现,且接受的行数是 1000 行。我们已经解决了粘包分包的问题。

抛出几个疑问?

  • 1.return null 为什么可以等到下一个数据包的到来?

  • 2.如果 buffer 中的数据大于 length, 此时 buffer 中的数据还没有读完,那么剩下的数据怎么办?

  • 3.return new String(dst); 怎么往下传递对象;

欲知后事如何 且听下回分解 ...

相关文章