NIO简单使用

之前总结了一篇BIO简单使用,BIO在长链接场景,一个连接占用一个线程。在短连接场景,虽然可以通过线程池优化,但是CPU会在不同的CPU不停的切换。因此BIO并不适用于高并发。Oracle在JDK1.4中引入了NIO的SDK,NIO与BIO不同之处在于NIO使用Linux中的IO多路复用技术。IO多路复用技术是将多个阻塞IO复用到同一个Selector阻塞上。本篇文章主要记录NIO的学习过程。

IO多路复用

这里主要分析Linux下的IO多用复用技术

Select

Select流程多路复用流程如下:

  1. 将需要监控的fd放入fd_set数组,并将fd数组从用户态复制到内核态。
  2. 内核通过轮训的方式将有事件的fd返回给应用程序
  3. 应用程序使用轮训的方式对fd进行操作

Select模型模型缺陷如下

  1. select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
  2. 从用户态到内核态,内核态到用户态的复制开销很多。
  3. 不管有无活跃Socket,都会进行轮询,当连接较多时,效率低下。

    poll

    poll模型与Select模型不同之处在于,select模型传递的是数组,而poll模型传递的是链表,这样就绕开了Linux系统fd_set的限制。

    epoll

    epoll的大致流程图如下
  4. 创建内核事件表(epoll_create)。
  5. 添加或移出监控的fd和事件类型(epoll_ctl)。
  6. epoll_wait 绑定回调事件
  7. 将活跃事件加入到事件队列
  8. 应用程序处理回调事件队列

相对与select和poll,epoll有如下优点

  1. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。
  2. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
  3. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

    服务端开发

    Java NIO使用Epoll模型,我们先写代码看看NIO和BIO的区别
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    /**
    * @author 姜陶
    * @date 2022/5/13 10:16
    * @describe NIO服务端
    **/
    public class PushServerSocketChannel implements Runnable {

    private Selector selector;

    public PushServerSocketChannel(Integer port){
    int port1 = 8080;
    if (port!=null) {
    port1 = port;
    }
    try{
    // 创建选择器
    selector = Selector.open();
    // 创建server对象
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    // 配置为非阻塞模式
    serverSocketChannel.configureBlocking(false);
    // 绑定地址和端口,指定阻塞队列的最大长度
    serverSocketChannel.socket().bind(new InetSocketAddress(port1),1024);
    // 将serverSocketChannel绑定到选择器上
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }catch(Exception e){
    e.printStackTrace();
    }
    }

    @Override
    public void run() {
    while (true){
    try {
    // 阻塞操作,当有io变化才会返回
    selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    while (it.hasNext()){
    SelectionKey key = it.next();
    handle(key);
    it.remove();
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    public void handle(SelectionKey key) throws IOException {
    if (key.isValid()){
    // 新接入的客户端
    if (key.isAcceptable()){
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel sc = ssc.accept();
    sc.configureBlocking(false);
    sc.register(selector,SelectionKey.OP_READ);
    }
    // 此通道已经准备好读取数据
    if (key.isReadable()){
    SocketChannel sc = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int dataLength = sc.read(buffer);
    if (dataLength>0){
    // 有数据,接收消息
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    String msg = new String(bytes);
    System.out.println("接收到客户端消息: "+msg);
    // 后端响应前端数据
    String response = "我接受到你的数据了.";
    byte[] responseMsg = response.getBytes(StandardCharsets.UTF_8);
    ByteBuffer responseBuf = ByteBuffer.allocate(responseMsg.length);
    responseBuf.put(responseMsg);
    responseBuf.flip();
    sc.write(responseBuf);
    }

    }
    }
    }
    }
    BIO的服务端使用ServerSocket,ServerSocket使用的是Stream的方式,输入流和输出流,而NIO使用SocketChannel的方式,代表通道,可同时进行输入输出的全双工模式。主要流程如下
  4. 指定端口创建ServerSocketChannel
  5. 将channel绑定到Selector选择器上,并定义为接受连接OP_ACCEPT
  6. 选择器循环阻塞获取事件队列
  7. 如果有新接入的客户端,绑定客户端的读事件
  8. 如果客户端有写入,使用ByteBuffer读取消息并响应客户端数据

客户端开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.mayahx.nio.server;

import org.springframework.util.StringUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
* @author Jon
* @desc NIO客户端
* @date 2022年05月14日 9:42
*/
public class PushClientSocketChannel implements Runnable{

private static final String host = "127.0.0.1";
private static final int port = 8080;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop = false;

public PushClientSocketChannel() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}catch (Exception e){
e.printStackTrace();
}

}

@Override
public void run() {
try {
handleConnect();
while (!stop){
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key;
while (it.hasNext()){
key = it.next();
it.remove();
handleInput(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}

}


public void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel channel = (SocketChannel) key.channel();
// 连接成功处理
if (key.isConnectable()){
if (channel.finishConnect()){
channel.register(selector,SelectionKey.OP_READ);
handleOutput(channel,"我连接成功了...");
}
}
// 可读的时候处理
if (key.isReadable()){
// 读取数据 kb
ByteBuffer readData = ByteBuffer.allocate(1024);
int byteLength = channel.read(readData);
if (byteLength>0){
readData.flip();
byte[] data = new byte[readData.remaining()];
readData.get(data);
String msg = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到服务端消息: "+msg);
handleOutput(channel,msg);
}
}
}
}

/**
* 执行连接服务器操作
* 1. 如果连接成功,则以read方式挂载到selector
* 2. 如果连接失败,则以connect的方式挂载到selector
* @throws IOException
*/
private void handleConnect() throws IOException {
if (socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector, SelectionKey.OP_READ);
}else {
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}

/**
* 发送消息
* @param msg 消息内容
* @throws IOException
*/
public void handleOutput(SocketChannel channel,String msg) throws IOException {
if (StringUtils.hasLength(msg)) {
byte[] data = msg.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(data.length);
buffer.put(data);
buffer.flip();
channel.write(buffer);
}
}
}

客户端工作流程如下

  1. 指定服务端IP及端口启动SocketChannel
  2. 将SocketChannel以OP_CONNECT事件的方式绑定到选择器上
  3. 当连接成功后,将OP_READ事件绑定到选择器上
  4. 循环读取数据并响应服务端的数据

测试

我们启动测试一下


本篇文章记录了Java最基本的NIO操作,没有定义协议,也没有解决粘包及拆包问题。后面有时间一起解决。从上面我们看出select操作还是阻塞的,下一篇文章记录AIO异步非阻塞的是怎么使用。

作者

Labradors

发布于

2022-05-30

更新于

2022-05-30

许可协议

评论