前言
今天主要说说 Netty 什么是粘包分包现象,产生的原因是什么,以及粘包分包现象演示,最后给出相关的解决方案。
粘包分包现象
先上一张图:
上面是粘包现象,假设有两个客户端,分别发送 hello, I am client 1
和 hello, I am client 2
数据流,当发生粘包现象后,后台拿到的是一条数据流,且粘在一起。
上面是分包现象,假设有一个客户端,发送 hello, I am client
数据流,当发生分包现象后,后台拿到了两条数据流。
为什么会发生粘包分包
要知道,在 TCP 中,只有流的概念,没有包的概念,这是根本原因。
之所以会发生粘包现象,服务端和客户端都有原因:
-
1.服务端:服务端收到的数据放在系统接受的缓冲区,用户进程从该缓冲区中拿数据。
-
2.客户端:客户端在使用 TCP 协议进行传输时,TCP 为了提高传输效率,要收集到足够多的数据后才发送数据。
上面是粘包发生的原因,再说说分包发生的原因:
-
1.应用程序写入的字节大小大于套接字发送缓冲区的大小;
-
2.当 TCP 报文长度 - TCP 头部长度 > mss(最大报文长度), 会进行 mss(最大报文长度) 大小的 TCP 分段
-
3.以太网帧的 playload(净荷) 大于 MTU (1500 字节) 进行 IP 分片
Netty 粘包分包现象演示
编写测试代码:
Server.java
public class Server {
public static void main(String[] args) {
// 服务类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// boss 线程池,主要监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker 线程池,负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置 niosocket 工厂
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道工厂
serverBootstrap.setPipelineFactory(() -> {
ChannelPipeline pipeline = Channels.pipeline();
// 管道过滤器
pipeline.addLast("myHandler", new ServerHandler());
return pipeline;
});
// 服务端绑定端口
serverBootstrap.bind(new InetSocketAddress(8888));
}
}
ServerHandler.java:
public class ServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage();
byte[] bytes = channelBuffer.array();
System.out.println("server receive data:" + new String(bytes));
}
}
客户端, client.java:
public class Client {
public static void main(String[] args) throws InterruptedException {
// 服务类
ClientBootstrap clientBootstrap = new ClientBootstrap();
// 线程池
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
// socket 工厂
clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
// 管道工厂
clientBootstrap.setPipelineFactory(() -> {
ChannelPipeline channelPipeline = Channels.pipeline();
channelPipeline.addLast("1", new StringEncoder());
channelPipeline.addLast("2", new ClientHandler());
return channelPipeline;
});
// 连接服务端
clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 8888)).sync();}
}
ClientHandler.java
public class ClientHandler extends SimpleChannelHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
String msg = "hello, i am client";
for (int i = 0; i < 1000; i++) {
channel.write(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
}
}
先运行 Server.class, 再运行 Client.class, 从代码中,我们可以看到,在客户端通道连接成功后,会发送 1000
个 hello, I am client
, 服务端接收结果如下:
可以看到服务端的输出信息粘包分包现象已经很明显了。
解决方案
服务端和客户端约定好稳定的数据包结构,然后客户端根据约定的数据包结构发送数据,服务端根据约定的数据包结构来读取数据。
假设我们约定协议如下:
我们定义客户端发送数据包的协议为 长度 + 数据
,长度为字节流的长度,后面才跟上具体的数据,下面看看改写过后,遵循这种协议的代码:
首先看看客户端的代码:
public class ClientHandler extends SimpleChannelHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel channel = ctx.getChannel();
String msg = "hello, i am client";
byte[] bytes = msg.getBytes();
// 定义数据包,结构为:长度 + 数据
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(); // 动态缓存
// 1.写长度
channelBuffer.writeInt(bytes.length); // 4 个字节
// 2.写数据
channelBuffer.writeBytes(bytes);
for (int i = 0; i < 1000; i++) {
channel.write(channelBuffer);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
}
}
同样是发送 1000 个 hello, I am client
, 这次采用 长度 + 数据
的格式,再看看服务端如何解析,服务端解析的话,需要自定义解码器,我们定义一个类叫 MyDecoder.java
, 需要让它继承自 FrameDecoder
:
public class MyDecoder extends FrameDecoder {
private static final int BASE_LENGTH = 4;
@Override
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
// 收到数据后,先判断 buffer 中的可读数据长度是否大于数据包的基本长度
if (channelBuffer.readableBytes() > BASE_LENGTH) {
// 说明此时有数据包来临
// 做标记(记住当前读指针的位置)
channelBuffer.markReaderIndex();
// 1.读长度
int dataLength = channelBuffer.readInt();
// 2.读数据本身
if (channelBuffer.readableBytes() < dataLength) {
// 说明数据本身长度还不够,要继续等待后面的数据到来
// 还原指针位置
channelBuffer.resetReaderIndex();
return null;
}
// 此时说明数据包已经完整
// 2.读数据本身
byte[] dst = new byte[dataLength];
channelBuffer.readBytes(dst);
// 对于此时还没有读取完的数据,会做什么处理?留个疑问
return new String(dst);
}
// return null 表示此时的数据包不完整, 需要继续等待下一个数据包的到来
return null;
}
}
Server.java 添加解码器:
public static void main(String[] args) {
// 服务类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// boss 线程池,主要监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker 线程池,负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置 niosocket 工厂
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道工厂
serverBootstrap.setPipelineFactory(() -> {
ChannelPipeline pipeline = Channels.pipeline();
// 管道过滤器
pipeline.addLast("myDecoder", new MyDecoder());
pipeline.addLast("myHandler", new ServerHandler());
return pipeline;
});
// 服务端绑定端口
serverBootstrap.bind(new InetSocketAddress(8888));
}
服务端会先读取自定义数据包的长度,如果说可读长度小于 4 个字节,则认为,不满足一个完整数据包,继续等待下一个数据包的到来,如果说大于 4 个字节,则读取数据包的长度,再读取数据本身的实际长度,如果说实际长度是小于头部写的长度,则认为还不是一个完整的数据包,继续 return null
, 等待下一个数据包的到来,反之,则读取头部定义的长度,完成一次完整数据包的读取流程。
看看控制台输出:
可以看到通过自定义的协议,并未出现粘包分包的现象出现,且接受的行数是 1000 行。我们已经解决了粘包分包的问题。
抛出几个疑问?
-
1.
return null
为什么可以等到下一个数据包的到来? -
2.如果
buffer
中的数据大于length
, 此时buffer
中的数据还没有读完,那么剩下的数据怎么办? -
3.
return new String(dst);
怎么往下传递对象;
欲知后事如何 且听下回分解 ...