BIO/NIO 线程模型以及高性能通讯框架 Netty Reactor 模型初探

非著名 Java 工程师,欢迎关注微信公众号 : Java技术说

前言

今天我们主要来看看线程模型的发展历史,以及为什么会出现高性能通讯框架 Netty.

线程模型的演进

  • 1.传统的 BIO 线程模型(阻塞)

  • 2.多线程 BIO

  • 3.NIO(Non Block IO/ New IO)线程模型 (基于 Reactor 多路复用模型)

  • 4.Netty (主从多线程 Reactor 多路复用模型)

从代码入手,逐步分析

1.传统的 BIO 线程模型(阻塞)

假设我们需要自己手写网络编程,先采用 BIO 的线程模型:

	/**
     * 我们先用传统的 BIO 模式来实现简单的网络编程
     */
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(7777);
        System.out.println("服务端启动...");

        for(;;) {
            // 获取 socket 套接字
            // 注意:这里 accept 是个阻塞点,线程会监听端口,查看是否有客户端连接上来
            Socket socket = serverSocket.accept();
            System.out.println("有新客户端连接上来了...");

            // 获取客户端的输入流
            InputStream is = socket.getInputStream();
            byte[] cache = new byte[1024];
            for (;;) {
                // 循环读取数据
                // 这里又是一个阻塞点
                final int len = is.read(cache);
                if (len == -1) {
                    break;
                }

                String info = new String(cache, 0, len, "UTF-8");
                System.out.println(info);
            }
        }

    }

我们打开两个命令行窗口,通过 telnet localhost 7777 来连接模拟的服务端,看看效果:

在看看服务端日志输出:

缺点非常明显,当一个客户端连接上时,另一个试图连接的客户端只能处于阻塞的状态,直到前一个客户端退出连接,它才能成功连接上。那么如何解决这个问题呢?看下面这种解决方案。

2.多线程 BIO

先看代码实现:

	/** 线程数 */
    private static final int THREAD_NUMS = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * 采用线程池异步的方式
     */
    public static void main(String[] args) throws IOException {
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUMS);
        ServerSocket serverSocket = new ServerSocket(7777);
        System.out.println("服务端启动...");

        for(;;) {
            // 获取 socket 套接字
            // 注意:这里 accept 是个阻塞点,线程会监听端口,查看是否有客户端连接上来
            Socket socket = serverSocket.accept();
            System.out.println("有新客户端连接上来了...");

            threadPool.submit(() -> {
                // 获取客户端的输入流
                try {
                    InputStream is = socket.getInputStream();
                    byte[] cache = new byte[1024];
                    for (;;) {
                        // 循环读取数据
                        // 这里又是一个阻塞点
                        final int len = is.read(cache);
                        if (len == -1) {
                            break;
                        }

                        String info = new String(cache, 0, len, "UTF-8");
                        System.out.println(info);
                    }
                } catch (IOException e) {
                    e.printStackTrace();}

            });}
    }

启动 main 函数,我们再次试图启动两个客户端连接,看看效果:

通过线程池,我们可以同时连接多个客户端,解决单线程模式下阻塞的问题,但是这种解决方案还是存在问题,首先是实际上单个线程还是只能处理一个连接的问题,就好比餐厅服务员,它应该能够同时应付多个客人,而不是从始至终都是为一个客人服务,直到这个客人吃完了,离开了餐厅,才为下一个客人服务,显然这样是不合理的,效率非常低下。

另外一个问题是,看似,我们启用了线程池,任务都是异步的去执行,但是如果并发数非常多的情况下,分配的线程都在工作的状态时,线程池底层会将后面的任务放置到阻塞队列中。

为了解决上述问题,NIO 出现了

3.NIO(Non Block IO/ New IO)线程模型 (基于事件)

看看我们通过 JDK 自带的 NIO 框架如何实现上述功能:

public class NioSocket {

    /** 通道管理器,多用户共用 */
    private Selector selector;

    /**
     * 初始化服务端 ServerSocketChannel 通道,并初始化选择器
     * 获得一个 ServerSocket 通道,并对该通道做一些初始化工作
     * @param port
     */
    private void initServer(int port) throws IOException {
        // 获取 ServerSocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 将该通道对应的 ServerSocket 绑定到 port 端口
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道选择器
        this.selector = Selector.open();
        // 将通道选择器和该通道绑定,并为该通道注册 SelectionKey.OP_ACCEPT 事件,注册该事件后
        // 当该事件到达时,selector.select()会返回,如果该事件没有到达,则 selector.select() 会一直阻塞
        serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务端启动成功...");

        /**
         * 补充:
         *
         * SelectionKey 中定义的 4 种事件:
         *
         * <ul>
         *      <li>OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了 </li>
         *      <li>OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功 </li>
         *      <li>OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)</li>
         *      <li>OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)</li>
         * </ul>
         */
    }

    /**
     * 采取轮询的方式监听 selector 上是否有需要处理的事件,有则做相应的处理
     */
    private void listenSelector() throws IOException {
        // 轮询
        for (;;) {
            // 当注册的事件到达时,方法返回,否则,一直处于阻塞的状态
            this.selector.select();
            // nio 作为非阻塞式,上面这一步,我们是可以设置为非阻塞的,代码如下
//            this.selector.select(1000); // 无论是否有读写事件,selector 每隔 1s 被唤醒
//            this.selector.selectNow(); // 无论是否有读写事件,直接返回

            Iterator<?> iteratorKey = this.selector.selectedKeys().iterator();
            while (iteratorKey.hasNext()) {
                SelectionKey selectionKey = (SelectionKey) iteratorKey.next();
                // 删除已选的 key, 防止重复消费同一个事件
                iteratorKey.remove();
                // 处理事件
                handler(selectionKey);
            }
        }
    }

    /**
     * 处理事件
     * @param selectionKey
     */
    private void handler(SelectionKey selectionKey) throws IOException {
        // 处理客户端连接事件
        if (selectionKey.isAcceptable()) {
            System.out.println("有新的客户端连接了...");
            // 获得和客户端连接的通道
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            // 完成下面操作,意味着完成 tcp 三次握手,tcp 物理链路正式建立
            SocketChannel socketChannel = serverSocketChannel.accept();
            // 设置为非阻塞
            socketChannel.configureBlocking(false);
            // 和客户端连接成功后,为了可以接受到客户端的信息,需要为该通道设置读的权限
            socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
        // 处理读事件
        else if (selectionKey.isReadable()) {
            // 获取事件的读写通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            // 创建读取的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024); // 1kb
            int readData = socketChannel.read(buffer);
            if (readData > 0) {
                // 先将缓冲区数据转换成字节数组,再转化为字符串
                String msg = new String(buffer.array(), "GBK").trim();
                System.out.println("服务端收到信息:" + msg);

                // 回写数据
                ByteBuffer writeBackBuffer = ByteBuffer.wrap("server received data.".getBytes("GBK"));
                // 将消息回传给客户端
                socketChannel.write(writeBackBuffer);
            } else {
                System.out.println("客户端关闭...");
                // SelectionKey 对象会失效,这意味着 Selector 再也不会监控与它相关的事件
                selectionKey.cancel();}
        }
    }

    /**
     * NIO 的方式再实现一遍
     */
    public static void main(String[] args) throws IOException {
        NioSocket nioSocket = new NioSocket();
        nioSocket.initServer(7777);
        nioSocket.listenSelector();}

}

NIO 中相关概念的介绍:

  • 1.Selector: 通道管理器

  • 2.ServerSocketChannel(ServerSocket): 通道 -> 只关心客户端连接事件

  • 3.SocketChannel(Socket): 关心读写事件

  • 4.SelectionKey 事件集合

上面代码中,只是单线程来实现,但是 NIO 底层是基于事件的 Reactor 多路复用模型,有着非常高的性能表现。

存在的问题:

毕竟是单线程的,在这个互联网时代,高并发场景随处可见,单线程显然无法应对,另外,也无法充分发挥服务器动则 16 核, 32 核的硬件资源,于是,Netty 就出现了。

4.Netty (主从多线程 Reactor 多路复用模型)

Netty 作为高性能通讯框架,在消息推送,即时聊天,游戏服务器,RPC 调用(Dubbo),物联网长连接通讯领域都有着得天独厚的优势。它实际上实现了多种 Reactor 模型:

  • 1.单线程 Reactor

  • 2.多线程 Reactor

  • 3.主从多线程 Reactor

默认用的是主从多线程 Reacotr 模型,接下来,我们用 Netty 多线程 Reactor 模型来实现同样的功能:

NettyServer.java

public class NettyServer {


    public static void main(String[] args) throws IOException {
        // 服务类
        ServerBootstrap bootstrap = new ServerBootstrap();
        // boss 线程,主要用于监听端口和获取 worker 线程以及分配 socketChannel 给 worker 线程
        ExecutorService boss = Executors.newCachedThreadPool();
        // worker 线程负责数据读写
        ExecutorService worker = Executors.newCachedThreadPool();
        // 设置 niosocket 工厂
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
        // 设置管道的工厂
        bootstrap.setPipelineFactory(() -> {
            ChannelPipeline channelPipeline = Channels.pipeline();
            // 管道过滤器
            channelPipeline.addLast("decoder", new StringDecoder());
            channelPipeline.addLast("encoder", new StringEncoder());
            channelPipeline.addLast("myServerMessageHandler", new MyServerMessageHandler());
            return channelPipeline;
        });

        bootstrap.bind(new InetSocketAddress(7777));
        System.out.println("服务端启动成功...");

    }

}

MyServerMessageHandler.java

public class MyServerMessageHandler extends SimpleChannelHandler {

    /**
     * 接受消息
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("messageReceived");
        String msg = (String) e.getMessage();
        System.out.println("服务端接收到数据:" + msg);

        // 回写给客户端
        ctx.getChannel().write("服务端接收到了您发送的数据...");
        super.messageReceived(ctx, e);
    }

    /**
     * 异常处理
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /**
     * 获取新的连接事件
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /**
     * 关闭通道的时候触发(必须是通道已经建立)
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /**
     * 通道关闭的时候触发
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}

通过下面的结构图,看下主从多线程 Reactor 模型:

总结

今天我们主要从单线程 BIO, 多线程的 BIO, NIO, 以及 Netty 主从多线程实现了网络通讯,相信大家看了以后对 JDK 发版发展中 IO 的演进有了一个大致的印象。

发布于 2018-10-04