1. Netty系列——NIO
发布于 2022年 01月 17日 13:43
Netty系列
1. 三种IO模型
在Java中,有三种IO模型: BIO,NIO,AIO,我们先来看下他们的区别
- BIO(Blocking I/O):BIO也就是传统的同步阻塞IO模型,对应Java.io包,它提供了很多IO功能,比如输入输出流,对文件进行操作。在网络编程(Socket通信)中也同样进行IO操作。
- NIO(New I/O): NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应 java.nio 包,提供了 Channel , Selector,Buffer等抽象。在NIO中, 抛弃了传统的 I/O流, 而是引入了Channel和Buffer的概念. 在NIO中, 只能从Channel中读取数据到Buffer中或将数据 Buffer 中写入到 Channel
- AIO: AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO模型
而实际上在Linux(Unix)操作系统中,共有五种 IO模型,分别是:阻塞IO模型、非阻塞IO模型、IO复用模型、信号驱动IO模型以及异步IO模型,而4种都是同步的,只有最后一种是异步的
推荐阅读 漫话:如何给女朋友解释什么是Linux的五种IO模型
2. NIO
概述
NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道
NIO与IO的区别
- IO是面向流的,NIO是面向缓冲区的
- IO流是阻塞的,NIO流是不阻塞的
- NIO有选择器,而IO没有
我们先来用FileChannel来看下IO与NIO的写法有何区别
首先是传统IO
public void testIO() {
InputStream inputStream = null;
int mark = -1;
StringBuilder stringBuilder = new StringBuilder();
try {
inputStream = new BufferedInputStream(new FileInputStream("io.txt"));
byte[] buffer = new byte[1024];
int read = inputStream.read(buffer);
while (read != mark) {
for (int i = 0; i < read; i++) {
stringBuilder.append((char)buffer[i]);
}
read = inputStream.read(buffer);
}
System.out.printf("文件 io.txt 的内容是: %s%n", stringBuilder);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO的方式
public void testNIO() {
FileInputStream fileInputStream = null;
StringBuilder stringBuilder = new StringBuilder();
try {
fileInputStream = new FileInputStream("io.txt");
FileChannel channel = fileInputStream.getChannel();
// 分配空间
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 从channel中读取数据到buffer
int read = channel.read(byteBuffer);
while (read != mark) {
// 翻转缓冲区,position设置为0,limit设置为之前position的值
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
stringBuilder.append((char)byteBuffer.get());
}
byteBuffer.compact();
read = channel.read(byteBuffer);
}
System.out.printf("文件 io.txt 的内容是: %s%n", stringBuilder);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO读写数据的方式
- 创建一个缓冲区,然后请求通道读取数据
- 创建一个缓冲区,填充数据,并要求通道写入数据
3. NIO的核心组件
Buffer 缓冲区
Buffer顾名思义:缓冲区,实际上是一个容器,一个连续数组。Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer
Buffer通过几个变量来保存这个数据的当前位置状态:
- capacity:容量,缓冲区能容纳元素的数量
- position:当前位置,是缓冲区中下一次发生读取和写入操作的索引,当前位置通过大多数读写操作向前推进
- limit:界限,是缓冲区中最后一个有效位置之后下一个位置的索引
- mark:用于记录当前position的前一个位置或者默认是-1
操作buffer
- 首先给Buffer分配空间,以字节为单位
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- 向Buffer中写入数据:
数据从Channel到Buffer:channel.read(byteBuffer);
数据从Client到Buffer:byteBuffer.put(...);
3.从Buffer中读取数据:
数据从Buffer到Channel:channel.write(byteBuffer);
数据从Buffer到Server:byteBuffer.get(...);
NIO中的关键Buffer实现有
Buffer | 数据类型 |
---|---|
ByteBuffer | byte |
CharBuffer | char |
DoubleBuffer | double |
FloatBuffer | float |
IntBuffer | int |
LongBuffer | long |
ShortBuffer | short |
MappedByteBuffer | - |
HeapByteBuffer | - |
DirectByteBuffer | - |
Channel 通道
Channel和IO中的Stream(流)差不多。只不过Stream是单向的,如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作
NIO中的Channel的主要实现有
Channel | 用途 |
---|---|
FileChannel | IO |
DatagramChannel | UDP |
SocketChannel | TCP(client) |
ServerSocketChannel | TCP(server) |
操作Channel
打开一个ServerSocketChannel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
关闭ServerSocketChannel通道:
serverSocketChannel.close();
循环监听SocketChannel:
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
// 将此通道设置为非阻塞,这就是异步
自由控制阻塞或非阻塞便是NIO的特性之一
clientChannel.configureBlocking(false);
}
实例 SocketChannel
用NIO实现客户端
public void client() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
// 配置是否阻塞
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("192.168.31.178", 8888));
if (socketChannel.finishConnect()) {
int i = 0;
while (true) {
String info = "this is " + i + "- th msg form client";
byteBuffer.clear();
byteBuffer.put(info.getBytes());
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.println(byteBuffer);
socketChannel.write(byteBuffer);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
Selector
选择器是NIO的核心,它是channel的管理者 通过执行select()阻塞方法,监听是否有channel准备好 一旦有数据可读,此方法的返回值是SelectionKey的数量
所以服务端通常会死循环执行select()方法,直到有channl准备就绪,然后开始工作 每个channel都会和Selector绑定一个事件,然后生成一个SelectionKey的对象
需要注意的是: channel和Selector绑定时,channel必须是非阻塞模式 而FileChannel不能切换到非阻塞模式,因为它不是套接字通道,所以FileChannel不能和Selector绑定事件
在NIO中一共有四种事件:
- SelectionKey.OP_CONNECT:连接事件
- SelectionKey.OP_ACCEPT:接收事件
- SelectionKey.OP_READ:读事件
- SelectionKey.OP_WRITE:写事件
SelectionKey
当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。这个对象包含以下的属性:
- interest集合
- ready集合
- Channel
- Selector
- 附加的对象(可选)
interest集合:interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合。
ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。
int readySet = selectionKey.readyOps();
可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:
- selectionKey.isAcceptable();
- selectionKey.isConnectable();
- selectionKey.isReadable();
- selectionKey.isWritable();
从SelectionKey访问Channel和Selector很简单。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
4. NIO实战,用NIO实现TCP服务端与客户端
TCP服务端
public class NIOServer {
public static void main(String[] args) {
try {
(new NIOServer()).initServer();
} catch (IOException e) {
e.printStackTrace();
}
}
private final int port = 8888;
public void initServer() throws IOException {
// 创建通道管理对象Selector
Selector selector = Selector.open();
// 创建通道对象Channel
ServerSocketChannel channel = ServerSocketChannel.open();
// 将通道设置为非阻塞
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(this.port));
// 将通道与通道管理器绑定,并为通道注册OP_ACCEPT事件(接收事件)
// 注册事件后,当事件到达时,selector.select()会返回一个key,如果该事件没有到达selector.select()会一直阻塞
channel.register(selector, SelectionKey.OP_ACCEPT);
// 等待读取数据
while (true) {
selector.select();
// 将通道中的数据放入集合中
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 已经拿到数据,将迭代器中的数据删除,避免出错
iterator.remove();
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable() && key.isValid()) {
this.write(key);
} else if (key.isConnectable()) {
System.out.println("========================= client 连接成功 =======================");
}
}
}
}
private void accept(SelectionKey key) throws IOException {
System.out.println("ServerSocketChannel 正在等待数据...");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(key.selector(), SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
while (read > 0) {
byteBuffer.flip();
byte[] data = byteBuffer.array();
String msg = new String(data);
System.out.println("client msg is: " + msg);
byteBuffer.clear();
read = socketChannel.read(byteBuffer);
}
if (read == -1) {
socketChannel.close();
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.flip();
// 如果通道中还有数据就把它写进ByteBuffer中
while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}
byteBuffer.compact();
}
}
TCP客户端
public class NIOClient {
public static void main(String[] args) {
try {
(new NIOClient()).initClient();
} catch (IOException e) {
e.printStackTrace();
}
}
private final int port = 8888;
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public void initClient() throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(port));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 等待读取数据
while (true) {
selector.select();
// 将通道中的数据放入集合中
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 已经拿到数据,将迭代器中的数据删除,避免出错
iterator.remove();
if (key.isReadable()) {
this.read(key);
} else if (key.isConnectable()) {
this.connect(key);
}
}
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(byteBuffer);
byte[] data = byteBuffer.array();
String msg = new String(data);
System.out.printf("服务端发的消息: %s%n", msg);
socketChannel.close();
key.selector().close();
}
private void connect(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
String info = "this is what from client";
byteBuffer.clear();
byteBuffer.put(info.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.close();
}
}
我们的channel设置了非堵塞,有兴趣的朋友可以去修改下客户端塞数据的代码,多启动几个client,修改下msg循环塞数据,看是否有阻塞
NIO的缺点
- NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
- 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序
- 可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大
- JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决