前言
今天我们主要来看看线程模型的发展历史,以及为什么会出现高性能通讯框架 Netty
.
线程模型的演进
-
1.传统的 BIO 线程模型(阻塞)
-
2.多线程 BIO
-
3.NIO(Non Block IO/ New IO)线程模型 (基于 Reactor 多路复用模型)
-
4.Netty (主从多线程 Reactor 多路复用模型)
从代码入手,逐步分析
1.传统的 BIO 线程模型(阻塞)
假设我们需要自己手写网络编程,先采用 BIO 的线程模型:
/**
* 我们先用传统的 BIO 模式来实现简单的网络编程
*/
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("服务端启动...");
for(;;) {
// 获取 socket 套接字
// 注意:这里 accept 是个阻塞点,线程会监听端口,查看是否有客户端连接上来
Socket socket = serverSocket.accept();
System.out.println("有新客户端连接上来了...");
// 获取客户端的输入流
InputStream is = socket.getInputStream();
byte[] cache = new byte[1024];
for (;;) {
// 循环读取数据
// 这里又是一个阻塞点
final int len = is.read(cache);
if (len == -1) {
break;
}
String info = new String(cache, 0, len, "UTF-8");
System.out.println(info);
}
}
}
我们打开两个命令行窗口,通过 telnet localhost 7777
来连接模拟的服务端,看看效果:
在看看服务端日志输出:
缺点非常明显,当一个客户端连接上时,另一个试图连接的客户端只能处于阻塞的状态,直到前一个客户端退出连接,它才能成功连接上。那么如何解决这个问题呢?看下面这种解决方案。
2.多线程 BIO
先看代码实现:
/** 线程数 */
private static final int THREAD_NUMS = Runtime.getRuntime().availableProcessors() * 2;
/**
* 采用线程池异步的方式
*/
public static void main(String[] args) throws IOException {
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUMS);
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("服务端启动...");
for(;;) {
// 获取 socket 套接字
// 注意:这里 accept 是个阻塞点,线程会监听端口,查看是否有客户端连接上来
Socket socket = serverSocket.accept();
System.out.println("有新客户端连接上来了...");
threadPool.submit(() -> {
// 获取客户端的输入流
try {
InputStream is = socket.getInputStream();
byte[] cache = new byte[1024];
for (;;) {
// 循环读取数据
// 这里又是一个阻塞点
final int len = is.read(cache);
if (len == -1) {
break;
}
String info = new String(cache, 0, len, "UTF-8");
System.out.println(info);
}
} catch (IOException e) {
e.printStackTrace();}
});}
}
启动 main 函数,我们再次试图启动两个客户端连接,看看效果:
通过线程池,我们可以同时连接多个客户端,解决单线程模式下阻塞的问题,但是这种解决方案还是存在问题,首先是实际上单个线程还是只能处理一个连接的问题,就好比餐厅服务员,它应该能够同时应付多个客人,而不是从始至终都是为一个客人服务,直到这个客人吃完了,离开了餐厅,才为下一个客人服务,显然这样是不合理的,效率非常低下。
另外一个问题是,看似,我们启用了线程池,任务都是异步的去执行,但是如果并发数非常多的情况下,分配的线程都在工作的状态时,线程池底层会将后面的任务放置到阻塞队列中。
为了解决上述问题,NIO 出现了。
3.NIO(Non Block IO/ New IO)线程模型 (基于事件)
看看我们通过 JDK 自带的 NIO 框架如何实现上述功能:
public class NioSocket {
/** 通道管理器,多用户共用 */
private Selector selector;
/**
* 初始化服务端 ServerSocketChannel 通道,并初始化选择器
* 获得一个 ServerSocket 通道,并对该通道做一些初始化工作
* @param port
*/
private void initServer(int port) throws IOException {
// 获取 ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverSocketChannel.configureBlocking(false);
// 将该通道对应的 ServerSocket 绑定到 port 端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 获得一个通道选择器
this.selector = Selector.open();
// 将通道选择器和该通道绑定,并为该通道注册 SelectionKey.OP_ACCEPT 事件,注册该事件后
// 当该事件到达时,selector.select()会返回,如果该事件没有到达,则 selector.select() 会一直阻塞
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
/**
* 补充:
*
* SelectionKey 中定义的 4 种事件:
*
* <ul>
* <li>OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了 </li>
* <li>OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功 </li>
* <li>OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)</li>
* <li>OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)</li>
* </ul>
*/
}
/**
* 采取轮询的方式监听 selector 上是否有需要处理的事件,有则做相应的处理
*/
private void listenSelector() throws IOException {
// 轮询
for (;;) {
// 当注册的事件到达时,方法返回,否则,一直处于阻塞的状态
this.selector.select();
// nio 作为非阻塞式,上面这一步,我们是可以设置为非阻塞的,代码如下
// this.selector.select(1000); // 无论是否有读写事件,selector 每隔 1s 被唤醒
// this.selector.selectNow(); // 无论是否有读写事件,直接返回
Iterator<?> iteratorKey = this.selector.selectedKeys().iterator();
while (iteratorKey.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iteratorKey.next();
// 删除已选的 key, 防止重复消费同一个事件
iteratorKey.remove();
// 处理事件
handler(selectionKey);
}
}
}
/**
* 处理事件
* @param selectionKey
*/
private void handler(SelectionKey selectionKey) throws IOException {
// 处理客户端连接事件
if (selectionKey.isAcceptable()) {
System.out.println("有新的客户端连接了...");
// 获得和客户端连接的通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 完成下面操作,意味着完成 tcp 三次握手,tcp 物理链路正式建立
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 和客户端连接成功后,为了可以接受到客户端的信息,需要为该通道设置读的权限
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
// 处理读事件
else if (selectionKey.isReadable()) {
// 获取事件的读写通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建读取的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024); // 1kb
int readData = socketChannel.read(buffer);
if (readData > 0) {
// 先将缓冲区数据转换成字节数组,再转化为字符串
String msg = new String(buffer.array(), "GBK").trim();
System.out.println("服务端收到信息:" + msg);
// 回写数据
ByteBuffer writeBackBuffer = ByteBuffer.wrap("server received data.".getBytes("GBK"));
// 将消息回传给客户端
socketChannel.write(writeBackBuffer);
} else {
System.out.println("客户端关闭...");
// SelectionKey 对象会失效,这意味着 Selector 再也不会监控与它相关的事件
selectionKey.cancel();}
}
}
/**
* NIO 的方式再实现一遍
*/
public static void main(String[] args) throws IOException {
NioSocket nioSocket = new NioSocket();
nioSocket.initServer(7777);
nioSocket.listenSelector();}
}
NIO 中相关概念的介绍:
-
1.Selector: 通道管理器
-
2.ServerSocketChannel(ServerSocket): 通道 -> 只关心客户端连接事件
-
3.SocketChannel(Socket): 关心读写事件
-
4.SelectionKey 事件集合
上面代码中,只是单线程来实现,但是 NIO
底层是基于事件的 Reactor
多路复用模型,有着非常高的性能表现。
存在的问题:
毕竟是单线程的,在这个互联网时代,高并发场景随处可见,单线程显然无法应对,另外,也无法充分发挥服务器动则 16 核, 32 核的硬件资源,于是,Netty 就出现了。
4.Netty (主从多线程 Reactor 多路复用模型)
Netty
作为高性能通讯框架,在消息推送,即时聊天,游戏服务器,RPC 调用(Dubbo),物联网长连接通讯领域都有着得天独厚的优势。它实际上实现了多种 Reactor
模型:
-
1.单线程
Reactor
-
2.多线程
Reactor
-
3.主从多线程
Reactor
默认用的是主从多线程 Reacotr
模型,接下来,我们用 Netty
多线程 Reactor
模型来实现同样的功能:
NettyServer.java
public class NettyServer {
public static void main(String[] args) throws IOException {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss 线程,主要用于监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
ExecutorService boss = Executors.newCachedThreadPool();
// worker 线程负责数据读写
ExecutorService worker = Executors.newCachedThreadPool();
// 设置 niosocket 工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 设置管道的工厂
bootstrap.setPipelineFactory(() -> {
ChannelPipeline channelPipeline = Channels.pipeline();
// 管道过滤器
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder", new StringEncoder());
channelPipeline.addLast("myServerMessageHandler", new MyServerMessageHandler());
return channelPipeline;
});
bootstrap.bind(new InetSocketAddress(7777));
System.out.println("服务端启动成功...");
}
}
MyServerMessageHandler.java
public class MyServerMessageHandler extends SimpleChannelHandler {
/**
* 接受消息
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("messageReceived");
String msg = (String) e.getMessage();
System.out.println("服务端接收到数据:" + msg);
// 回写给客户端
ctx.getChannel().write("服务端接收到了您发送的数据...");
super.messageReceived(ctx, e);
}
/**
* 异常处理
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("exceptionCaught");
super.exceptionCaught(ctx, e);
}
/**
* 获取新的连接事件
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelConnected");
super.channelConnected(ctx, e);
}
/**
* 关闭通道的时候触发(必须是通道已经建立)
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelDisconnected");
super.channelDisconnected(ctx, e);
}
/**
* 通道关闭的时候触发
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("channelClosed");
super.channelClosed(ctx, e);
}
}
通过下面的结构图,看下主从多线程 Reactor 模型:
总结
今天我们主要从单线程 BIO, 多线程的 BIO, NIO, 以及 Netty 主从多线程实现了网络通讯,相信大家看了以后对 JDK 发版发展中 IO 的演进有了一个大致的印象。