1. Netty系列——NIO

发布于 2022年 01月 17日 13:43

Netty系列


1. 三种IO模型

在Java中,有三种IO模型: BIO,NIO,AIO,我们先来看下他们的区别

  • BIO(Blocking I/O):BIO也就是传统的同步阻塞IO模型,对应Java.io包,它提供了很多IO功能,比如输入输出流,对文件进行操作。在网络编程(Socket通信)中也同样进行IO操作。
  • NIO(New I/O): NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。在NIO中, 抛弃了传统的 I/O流, 而是引入了Channel和Buffer的概念. 在NIO中, 只能从Channel中读取数据到Buffer中或将数据 Buffer 中写入到 Channel
  • AIO: AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO模型

而实际上在Linux(Unix)操作系统中,共有五种 IO模型,分别是:阻塞IO模型、非阻塞IO模型、IO复用模型、信号驱动IO模型以及异步IO模型,而4种都是同步的,只有最后一种是异步的

推荐阅读 漫话:如何给女朋友解释什么是Linux的五种IO模型

2. NIO

概述

NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道

NIO与IO的区别

  • IO是面向流的,NIO是面向缓冲区的
  • IO流是阻塞的,NIO流是不阻塞的
  • NIO有选择器,而IO没有

我们先来用FileChannel来看下IO与NIO的写法有何区别

首先是传统IO

public void testIO() {
    InputStream inputStream = null;
    int mark = -1;
    StringBuilder stringBuilder = new StringBuilder();
    try {
        inputStream = new BufferedInputStream(new FileInputStream("io.txt"));
        byte[] buffer = new byte[1024];
        int read = inputStream.read(buffer);
        while (read != mark) {
            for (int i = 0; i < read; i++) {
                stringBuilder.append((char)buffer[i]);
            }
            read = inputStream.read(buffer);
        }

        System.out.printf("文件 io.txt 的内容是: %s%n", stringBuilder);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

NIO的方式

public void testNIO() {
    FileInputStream fileInputStream = null;
    StringBuilder stringBuilder = new StringBuilder();
    try {
        fileInputStream = new FileInputStream("io.txt");
        FileChannel channel = fileInputStream.getChannel();
        // 分配空间
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // 从channel中读取数据到buffer
        int read = channel.read(byteBuffer);

        while (read != mark) {
            // 翻转缓冲区,position设置为0,limit设置为之前position的值
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                stringBuilder.append((char)byteBuffer.get());
            }

            byteBuffer.compact();
            read = channel.read(byteBuffer);
        }

        System.out.printf("文件 io.txt 的内容是: %s%n", stringBuilder);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

NIO读写数据的方式

  • 创建一个缓冲区,然后请求通道读取数据
  • 创建一个缓冲区,填充数据,并要求通道写入数据

3. NIO的核心组件

Buffer 缓冲区

Buffer顾名思义:缓冲区,实际上是一个容器,一个连续数组。Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer

Buffer通过几个变量来保存这个数据的当前位置状态:

  1. capacity:容量,缓冲区能容纳元素的数量
  2. position:当前位置,是缓冲区中下一次发生读取和写入操作的索引,当前位置通过大多数读写操作向前推进
  3. limit:界限,是缓冲区中最后一个有效位置之后下一个位置的索引
  4. mark:用于记录当前position的前一个位置或者默认是-1

操作buffer

  1. 首先给Buffer分配空间,以字节为单位
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  1. 向Buffer中写入数据:
数据从Channel到Buffer:channel.read(byteBuffer);
数据从Client到Buffer:byteBuffer.put(...);

3.从Buffer中读取数据:

数据从Buffer到Channel:channel.write(byteBuffer);
数据从Buffer到Server:byteBuffer.get(...);

NIO中的关键Buffer实现有

Buffer 数据类型
ByteBuffer byte
CharBuffer char
DoubleBuffer double
FloatBuffer float
IntBuffer int
LongBuffer long
ShortBuffer short
MappedByteBuffer -
HeapByteBuffer -
DirectByteBuffer -

Channel 通道

Channel和IO中的Stream(流)差不多。只不过Stream是单向的,如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作

NIO中的Channel的主要实现有

Channel 用途
FileChannel IO
DatagramChannel UDP
SocketChannel TCP(client)
ServerSocketChannel TCP(server)

操作Channel

打开一个ServerSocketChannel通道

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

关闭ServerSocketChannel通道:

serverSocketChannel.close();

循环监听SocketChannel:

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    // 将此通道设置为非阻塞,这就是异步
自由控制阻塞或非阻塞便是NIO的特性之一
    clientChannel.configureBlocking(false);
}

实例 SocketChannel

用NIO实现客户端

public void client() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    SocketChannel socketChannel = null;

    try {
        socketChannel = SocketChannel.open();
        // 配置是否阻塞
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("192.168.31.178", 8888));

        if (socketChannel.finishConnect()) {
            int i = 0;
            while (true) {
                String info = "this is " + i + "- th msg form client";
                byteBuffer.clear();
                byteBuffer.put(info.getBytes());
                byteBuffer.flip();
                while (byteBuffer.hasRemaining()) {
                    System.out.println(byteBuffer);
                    socketChannel.write(byteBuffer);
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Selector

选择器是NIO的核心,它是channel的管理者 通过执行select()阻塞方法,监听是否有channel准备好 一旦有数据可读,此方法的返回值是SelectionKey的数量

所以服务端通常会死循环执行select()方法,直到有channl准备就绪,然后开始工作 每个channel都会和Selector绑定一个事件,然后生成一个SelectionKey的对象

需要注意的是: channel和Selector绑定时,channel必须是非阻塞模式 而FileChannel不能切换到非阻塞模式,因为它不是套接字通道,所以FileChannel不能和Selector绑定事件

在NIO中一共有四种事件:

  • SelectionKey.OP_CONNECT:连接事件
  • SelectionKey.OP_ACCEPT:接收事件
  • SelectionKey.OP_READ:读事件
  • SelectionKey.OP_WRITE:写事件

SelectionKey

当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含以下的属性:

  • interest集合
  • ready集合
  • Channel
  • Selector
  • 附加的对象(可选)

interest集合:interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合。

ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。

int readySet = selectionKey.readyOps();

可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

  • selectionKey.isAcceptable();
  • selectionKey.isConnectable();
  • selectionKey.isReadable();
  • selectionKey.isWritable();

从SelectionKey访问Channel和Selector很简单。如下:

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();

可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

4. NIO实战,用NIO实现TCP服务端与客户端

TCP服务端

public class NIOServer {
    public static void main(String[] args) {
        try {
            (new NIOServer()).initServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private final int port = 8888;

    public void initServer() throws IOException {
        // 创建通道管理对象Selector
        Selector selector = Selector.open();
        // 创建通道对象Channel
        ServerSocketChannel channel = ServerSocketChannel.open();
        // 将通道设置为非阻塞
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(this.port));

        // 将通道与通道管理器绑定,并为通道注册OP_ACCEPT事件(接收事件)
        // 注册事件后,当事件到达时,selector.select()会返回一个key,如果该事件没有到达selector.select()会一直阻塞
        channel.register(selector, SelectionKey.OP_ACCEPT);

        // 等待读取数据
        while (true) {
            selector.select();
            // 将通道中的数据放入集合中
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 已经拿到数据,将迭代器中的数据删除,避免出错
                iterator.remove();
                if (key.isAcceptable()) {
                    this.accept(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable() && key.isValid()) {
                    this.write(key);
                } else if (key.isConnectable()) {
                    System.out.println("========================= client 连接成功 =======================");
                }
            }
        }

    }

    private void accept(SelectionKey key) throws IOException {
        System.out.println("ServerSocketChannel 正在等待数据...");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(key.selector(), SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = socketChannel.read(byteBuffer);
        while (read > 0) {
            byteBuffer.flip();
            byte[] data = byteBuffer.array();
            String msg = new String(data);
            System.out.println("client msg is: " + msg);
            byteBuffer.clear();
            read = socketChannel.read(byteBuffer);
        }
        if (read == -1) {
            socketChannel.close();
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.flip();
        // 如果通道中还有数据就把它写进ByteBuffer中
        while (byteBuffer.hasRemaining()) {
            socketChannel.write(byteBuffer);
        }
        byteBuffer.compact();
    }
}

TCP客户端

public class NIOClient {
    public static void main(String[] args) {
        try {
            (new NIOClient()).initClient();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private final int port = 8888;
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    public void initClient() throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress(port));
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 等待读取数据
        while (true) {
            selector.select();
            // 将通道中的数据放入集合中
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 已经拿到数据,将迭代器中的数据删除,避免出错
                iterator.remove();
                if (key.isReadable()) {
                    this.read(key);
                } else if (key.isConnectable()) {
                    this.connect(key);
                }
            }
        }
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        socketChannel.read(byteBuffer);
        byte[] data = byteBuffer.array();
        String msg = new String(data);
        System.out.printf("服务端发的消息: %s%n", msg);
        socketChannel.close();
        key.selector().close();
    }

    private void connect(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        String info = "this is what from client";
        byteBuffer.clear();
        byteBuffer.put(info.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        socketChannel.close();
    }
}

我们的channel设置了非堵塞,有兴趣的朋友可以去修改下客户端塞数据的代码,多启动几个client,修改下msg循环塞数据,看是否有阻塞

NIO的缺点

  • NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
  • 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序
  • 可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大
  • JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决

推荐文章