NIO基础

在Java中,提供了一些关于使用IO的API,可以供开发者来读写外部数据和文件,我们称这些API为Java IO。IO是Java中比较重要知识点,且比较难学习的知识点。并且随着Java的发展为提供更好的数据传输性能,目前有三种IO共存;分别是BIO、NIO和AIO。

BNAIO

Java BIO[Blocking I/O] | 同步阻塞I/O模式

BIO 全称 Block-IO ,是一种同步且阻塞的通信模式。是一个比较传统的通信方式,模式简单,使用方便。但并发处理能力低,通信耗时,依赖网速。

Java AIO[Asynchronous I/O] | 异步非阻塞I/O模型

Java AIO,全称 Asynchronous IO,是异步非阻塞的IO。是一种非阻塞异步的通信模式。在NIO的基础上引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

Java NIO[Non Blocking I/O] | 同步非阻塞模式

  • Java NIO,全称 Non-Block IO ,是Java SE 1.4版以后,针对网络传输效能优化的新功能。是一种非阻塞同步的通信模式。
  • NIO 与原来的 I/O 有同样的作用和目的, 他们之间最重要的区别是数据打包和传输的方式。原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。
  • 面向流的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。
  • 面向块的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O - 缺少一些面向流的 I/O 所具有的优雅性和简单性。

NIO 三大组件

Channel & Buffer

Channel 类似于 stream 流,它是读写数据的双向管道,Buffer 相当于一个内存缓冲区,用来暂存 读取或写入 Channel 的数据。可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层。

1
2
3
graph LR
channel --> buffer
buffer --> channel

常见的 Channel 有

  • FileChannel 用于文件传输
  • DatagramChannel 用于 UDP 报文传输
  • SocketChannel 用于 TCP 字节流传输
  • ServerSocketChannel 用于 TCP 服务端字节流传输

buffer 则用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer

    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer

  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

selector 单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途

多线程版设计

一个 thread 管理一个 socket。

1
2
3
4
5
6
graph TD
subgraph 多线程版
t1(thread1) --> s1(socket1)
t2(thread2) --> s2(socket2)
t3(thread3) --> s3(socket3)
end

⚠️ 多线程版缺点

  • 内存占用高
  • 线程上下文切换成本高
  • 只适合连接数少的场景

线程池版设计

使用线程池让 某一个 thread 能管理多个 socket ,但同一时刻只能处理一个 socket 。

1
2
3
4
5
6
7
graph TD
subgraph 线程池版
t4(thread1) --> s4(socket1)
t5(thread2) --> s5(socket2)
t4(thread1) -.-> s6(socket3)
t5(thread2) -.-> s7(socket4)
end

⚠️ 线程池版缺点

  • 阻塞模式下,线程仅能处理一个 socket 连接
  • 仅适合短连接场景

selector 版设计 线程的实现

selector 的作用就是配合一个线程来管理多个 channel,监听这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。如果一个 channel 停滞了,但是 seletor 又监听到另一个 channel 的事件,就会通知线程处理。

适合连接数特别多,但流量低的场景(low traffic)。

1
2
3
4
5
6
7
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理。

ByteBuffer

ByteBuffer 正确使用姿势

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复 1~4 步骤

ByteBuffer1

ByteBuffer2

ByteBuffer 结构

ByteBuffer 有以下重要属性

  • capacity 容量
  • position 当前写入/读取位置
  • limit 写入限制/读取限制

一开始,处于写模式

ByteBuffer3

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

ByteBuffer4

flip 切换读模式后,position 切换为读取位置,一般是起始位置,limit 切换为读取限制,一般是切换前 position 写入位置

ByteBuffer5

读取 4 个字节后,状态

ByteBuffer6

clear 清空 buffer,postion 回到起始位置,limit 也切换为写入限制,此时 buffer 处于写模式

但如果 buffer 没有读完的话,clear 操作会导致未读数据丢失

ByteBuffer7

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

ByteBuffer8

💡 调试工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;

/**
* @author Ray
* @date 2023/4/22 23:01
* @description
*/
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
System.out.println();
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

ByteBuffer 常见方法

分配空间

可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestByteBufferAllocate {
public static void main(String[] args) {
System.out.println(ByteBuffer.allocate(10).getClass());
System.out.println(ByteBuffer.allocateDirect(10).getClass());

/*
class java.nio.HeapByteBuffer
- Java堆内存,读写效率低,受 GC 影响,标记-复制以及标记-整理会导致内存移动
class java.nio.DirectByteBuffer
- 直接内存,读写效率高(少一次拷贝),不会受 GC 影响,但属于系统内存,分配效率较慢,使用不当可能导致 OOM 内存泄漏,需要合理释放
*/
}

}

向 buffer 写入数据

有两种办法

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法
1
2
int readBytes = channel.read(buf);
buf.put((byte)127);

从 buffer 读取数据

同样有两种办法

  • 调用 channel 的 write 方法
  • 调用 buffer 自己的 get 方法
1
2
int writeBytes = channel.write(buf);
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

注意

rewind 和 flip 都会清除 mark 位置

字符串与 ByteBuffer 互转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author Ray
* @date 2023/4/23 00:08
* @description
*/
public class TestByteBufferString {
public static void main(String[] args) {
// 1. 字符串转为 字节数组 放入 ByteBuffer
ByteBuffer byteBuffer= ByteBuffer.allocate(16);
byteBuffer.put("hello".getBytes());
debugAll(byteBuffer);

// 2. Charset 将字符串编码为对应容量的 ByteBuffer,并且转换后就是读模式
ByteBuffer charsetBuffer = StandardCharsets.UTF_8.encode("hello");
// ByteBuffer charsetBuffer = Charset.forName("UTF-8").encode("hello");
debugAll(charsetBuffer);

// 3. wrap 将字节数组包装为对应容量的 ByteBuffer,并且转换后就是读模式
ByteBuffer wrapBuffer = ByteBuffer.wrap("hello".getBytes());
debugAll(wrapBuffer);

// 将 ByteBuffer 转换为对应的字符串
String str1 = StandardCharsets.UTF_8.decode(charsetBuffer).toString();
System.out.println(str1);

byteBuffer.flip();
String str2 = StandardCharsets.UTF_8.decode(byteBuffer).toString();
System.out.println(str2);

}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+

+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+

+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+

hello
hello

⚠️ Buffer 的线程安全

Buffer 是非线程安全的

Scattering Reads 分散读

分散读取,有一个文本文件 words.txt

1
onetwothree

使用如下方式读取,可以将数据填充至多个 buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestScatteringReads {
public static void main(String[] args) {
try (FileChannel channel = new RandomAccessFile("demo-02/words.txt", "r").getChannel()) {
ByteBuffer buffer1 = ByteBuffer.allocate(3);
ByteBuffer buffer2 = ByteBuffer.allocate(3);
ByteBuffer buffer3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{buffer1,buffer2,buffer3});
buffer1.flip();
buffer2.flip();
buffer3.flip();
debugAll(buffer1);
debugAll(buffer2);
debugAll(buffer3);
} catch (IOException e) {
}
}
}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [3]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+

+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [3]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+

+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+

Gathering Writes 集中写

使用如下方式写入,可以将多个 buffer 的数据填充至 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestGatheringWrites {
public static void main(String[] args) {
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("世界");

try (FileChannel channel = new RandomAccessFile("demo-02/words.txt", "rw").getChannel()) {
channel.position(11);
channel.write(new ByteBuffer[]{buffer1,buffer2,buffer3});
} catch (IOException e) {
}
}
}

文件内容

1
onetwothreehelloworld世界

练习:网络粘包半包问题

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello,world\n
  • I’m Ray\n
  • How are you?\n

变成了下面的两个 byteBuffer

  • Hello,world\nI’m Ray\nHo 粘包 将多个数据囤积起来发送,提高效率
  • w are you?\n 半包 由于窗口或者缓冲区大小限制,无法一次发送太多数据,所以延迟到下一次发送

现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TestByteBufferExam {

/**
* 网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
* 但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
* ● Hello,world\n
* ● I'm Ray\n
* ● How are you?\n
* 变成了下面的两个 byteBuffer
* ● Hello,world\nI'm Ray\nHo 粘包 将多个数据囤积起来发送,提高效率
* ● w are you?\n 半包,由于窗口或者缓冲区大小限制,无法一次发送太多数据,所以延迟到下一次发送
* 现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据
*/
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm Ray\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}

private static void split(ByteBuffer source){
source.flip();

for (int i = 0; i < source.limit(); i++) {
// 遍历到换行符即找到一条完整消息
if(source.get(i)=='\n'){
int size = i - source.position() + 1;
// 将这条完整消息存入新的 ByteBuffer
// 分配对应容量的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(size);

// 从 source 读,向 target 写
for (int j = 0; j < size; j++) {
target.put(source.get(j));
}

debugAll(target);
}
}

source.compact(); // 未读数据向前移动
}
}

结果

网络粘包闭包

文件编程

FileChannel

⚠️ FileChannel 工作模式

FileChannel 只能工作在阻塞模式下

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

1
int readBytes = channel.read(buffer);

写入

1
2
3
4
5
6
7
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式

while(buffer.hasRemaining()) {
channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

位置

获取当前位置

1
long pos = channel.position();

设置当前位置

1
2
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

大小

使用 size 方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,等到 channel 关闭后才写入磁盘,而不是写一次就立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘。

两个 Channel 传输数据

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("demo-02/data.txt").getChannel();
FileChannel to = new FileOutputStream("demo-02/to.txt").getChannel();
) {
// 效率高,底层利用操作系统的零拷贝进行优化,一次最多传输 2G 数据
from.transferTo(0,from.size(),to);
} catch (IOException e) {
e.printStackTrace();
}
}
}

超过 2G 数据大小的文件传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("demo-02/data.txt").getChannel();
FileChannel to = new FileOutputStream("demo-02/to.txt").getChannel();
) {
// 效率高,底层利用操作系统的零拷贝进行优化,一次最多传输 2G 数据
long size = from.size();
// left 代表还剩多少字节
for (long left = size;left>0;){
left -= from.transferTo((size - left), left,to);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

Path

JDK7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
1
2
3
4
5
6
7
Path source = Paths.get("1.txt"); // 相对路径 使用 user.dir 环境变量来定位 1.txt

Path source = Paths.get("d:\\1.txt"); // 绝对路径 代表了 d:\1.txt

Path source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了 d:\1.txt

Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
  • . 代表了当前路径
  • .. 代表了上一级路径

例如目录结构如下

1
2
3
4
5
d:
|- data
|- projects
|- a
|- b

代码

1
2
3
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path);
System.out.println(path.normalize()); // 正常化路径

输出

1
2
d:\data\projects\a\..\b
d:\data\projects\b

Files

检查文件是否存在

1
2
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

创建一级目录

1
2
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录用

1
2
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷贝文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,会抛异常 FileAlreadyExistsException

如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制

1
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移动文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性

删除文件

1
2
3
Path target = Paths.get("helloword/target.txt");

Files.delete(target);
  • 如果文件不存在,会抛异常 NoSuchFileException

删除目录

1
2
3
Path target = Paths.get("helloword/d1");

Files.delete(target);
  • 如果目录还有内容,会抛异常 DirectoryNotEmptyException

遍历目录文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void func1() throws IOException {
AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();

Files.walkFileTree(Paths.get("/Users/ray/goland/sdk/go1.18"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("===>"+dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});

System.out.println("dirCount : " + dirCount);
System.out.println("fileCount : " + fileCount);
}

统计 .go 文件数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static void fun2() throws IOException {
AtomicInteger goCount = new AtomicInteger();
Files.walkFileTree(Paths.get("/Users/ray/goland/sdk/go1.18"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.toString().endsWith(".go")) {
System.out.println(file);
goCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});

System.out.println("goCount : " + goCount);
}

删除多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void func3() throws IOException {
Files.walkFileTree(Paths.get("Path"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println("===> 进入" + dir);
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
Files.delete(file);
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
System.out.println("===> 退出" + dir);
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
}

拷贝多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestFilesCopy {
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
String source = "D:\\source";
String target = "D:\\target";

Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
// 是目录
if (Files.isDirectory(path)) {
Files.createDirectory(Paths.get(targetName));
}
// 是普通文件
else if (Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}

网络编程

Socket编程

阻塞 vs 非阻塞

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停

    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

  • 但多线程下,有新的问题,体现在以下方面

    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);

// 1. 创建服务器通道
ServerSocketChannel server = ServerSocketChannel.open();

// 2. 绑定监听端口
server.bind(new InetSocketAddress(8080));

// 连接集合
List<SocketChannel> channelList = new ArrayList<>();

while (true) {
log.debug("connecting...");
// 3. 建立与客户端之间的连接, SocketChannel 用于客户端与服务器之间通信
SocketChannel socket = server.accept(); // 阻塞方法,没有客户端连接,线程停滞
log.debug("connected... {}",socket);
channelList.add(socket);
// 4. 接收 客户端 数据
for (SocketChannel channel : channelList){
log.debug("before read... {}",socket);
channel.read(buffer); // 阻塞方法,等待读取数据,没有数据,线程停滞
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}",channel);
}
}
}
}

客户端

1
2
3
4
5
6
7
8
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel socket = SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));

System.out.println("waiting...");
}
}

非阻塞

  • 非阻塞模式下,相关方法都不会让线程暂停

    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程非阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu

  • 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);

// 1. 创建服务器通道
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false); // 非阻塞模式

// 2. 绑定监听端口
server.bind(new InetSocketAddress(8080));

// 连接集合
List<SocketChannel> channelList = new ArrayList<>();

while (true) {
// log.debug("connecting...");
// 3. 建立与客户端之间的连接, SocketChannel 用于客户端与服务器之间通信
SocketChannel socket = server.accept();
// 阻塞方法,没有客户端连接,线程停滞
// 非阻塞,没有客户端连接,线程继续执行,但 socket 为 null
if (socket!=null) {
log.debug("connected... {}",socket);
socket.configureBlocking(false); // 非阻塞模式
channelList.add(socket);
}
// 4. 接收 客户端 数据
for (SocketChannel channel : channelList){
// log.debug("before read... {}",socket);
int read = channel.read(buffer);
// 阻塞方法,等待读取数据,没有数据,线程停滞
// 非阻塞,没有读取到数据,线程继续执行, read 返回 0
if (read > 0) {
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read... {}",channel);
}
}
}
}
}

客户端代码不变

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证

    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

Selector

1
2
3
4
5
6
7
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有

    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接请求时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1:阻塞直到绑定事件发生

1
int count = selector.select();

方法2:阻塞直到绑定事件发生,或是超时(时间单位为 ms)

1
int count = selector.select(long timeout);

方法3:不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

1
int count = selector.selectNow();

💡 select 何时不阻塞

  • 事件发生时

    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()

  • 调用 selector.close()
  • selector 所在线程 interrupt

处理 accept 事件

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();

ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false); // 非阻塞模式

// 2. 建立 Selector 和 Channel 的联系 (注册)
// 通过 SelectionKey 定位 事件所属的 Channel
SelectionKey serverKey = server.register(selector, 0, null);
// 服务端 key 只关注 accept 事件
serverKey.interestOps(SelectionKey.OP_ACCEPT);

log.debug("register key {}",serverKey);

server.bind(new InetSocketAddress(8080));

while (true) {
// 3. select 方法 : 监听事件,没有事件发生,线程阻塞,有事件,线程向下执行
// 之前的事件发生但未处理,线程也不会阻塞,继续执行
selector.select();
// 4. 处理事件 : selectedKeys() 包含所有发生的事件
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
log.debug("key {}",key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
log.debug("{}",socketChannel);
// key.cancel(); 事件发生后,要么处理,要么取消
}
}
}
}

客户端

1
2
3
4
5
6
7
8
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel socket = SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));

System.out.println("waiting...");
}
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 NIO 底层使用的是水平触发

处理 read 事件

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();

ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false); // 非阻塞模式

// 2. 建立 Selector 和 Channel 的联系 (注册)
// 通过 SelectionKey 定位 事件所属的 Channel
SelectionKey serverKey = server.register(selector, 0, null);
// 服务端 key 只关注 accept 事件
serverKey.interestOps(SelectionKey.OP_ACCEPT);

log.debug("register key {}",serverKey);

server.bind(new InetSocketAddress(8080));

while (true) {
// 3. select 方法 : 监听事件,没有事件发生,线程阻塞,有事件,线程向下执行
// 之前的事件发生但未处理,线程也不会阻塞,继续执行
selector.select();
// 4. 处理事件 : selectedKeys() 包含所有发生的事件
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
log.debug("key {}",key);

// 5. 根据事件类型分别处理
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
SelectionKey socketKey = socketChannel.register(selector, 0, null);
socketKey.interestOps(SelectionKey.OP_READ);
log.debug("{}",socketChannel);
} else if (key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel(); // 触发事件的 channel
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugRead(buffer);
}
// 处理完毕,必须将事件移除
// 否则再次遍历时,对应的 channel 事件已经处理,为 null,就会报空指针异常
keys.remove();
// key.cancel();
}
}
}
}

客户端不变

开启两个客户端,修改一下发送文字,输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
19:47:25 [DEBUG] [main] c.n.d.nio.Server - register key sun.nio.ch.SelectionKeyImpl@4fccd51b
19:47:42 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@4fccd51b
19:47:42 [DEBUG] [main] c.n.d.nio.Server - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:58800]
19:48:06 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@4fccd51b
19:48:06 [DEBUG] [main] c.n.d.nio.Server - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:58805]
19:48:20 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@1c2c22f3
+--------+-------------------- read -----------------------+----------------+
position: [0], limit: [6]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 21 |hello! |
+--------+-------------------------------------------------+----------------+
19:48:34 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@1c2c22f3
+--------+-------------------- read -----------------------+----------------+
position: [0], limit: [6]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 6f 72 6c 64 21 |world! |
+--------+-------------------------------------------------+----------------+

💡 为何要 keys.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 serverkey 上的 accept 事件,处理完事件之后将 对应 channel 的 accept 事件移除,但并没有移除 serverkey
  • 第二次触发了 socketkey 上的 read 事件,但这时 selectedKeys 中还有上次的 serverkey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

为什么需要keys.remove().png)

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

⚠️ 不处理边界的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();

ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false); // 非阻塞模式

// 2. 建立 Selector 和 Channel 的联系 (注册)
// 通过 SelectionKey 定位 事件所属的 Channel
SelectionKey serverKey = server.register(selector, 0, null);
// 服务端 key 只关注 accept 事件
serverKey.interestOps(SelectionKey.OP_ACCEPT);

log.debug("server key {}",serverKey);

server.bind(new InetSocketAddress(8080));

while (true) {
// 3. select 方法 : 监听事件,没有事件发生,线程阻塞,有事件,线程向下执行
// 之前的事件发生但未处理,线程也不会阻塞,继续执行
selector.select();
// 4. 处理事件 : selectedKeys() 包含所有发生的事件
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
log.debug("key {}",key);

// 5. 根据事件类型分别处理
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
SelectionKey socketKey = socketChannel.register(selector, 0, null);
socketKey.interestOps(SelectionKey.OP_READ);
log.debug("{}",socketChannel);
log.debug("socketKey {}",socketKey);
} else if (key.isReadable()){
try {
SocketChannel channel = (SocketChannel) key.channel(); // 触发事件的 channel
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read==-1){ // 正常断开 read = -1
key.cancel();
} else {
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
// debugRead(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 异常断开 取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
}
}
// 处理完毕,必须将事件移除
// 否则再次遍历时,对应的 channel 事件已经处理,为 null,就会报空指针异常
keys.remove();
// key.cancel();
}
}
}
}

hi 正常输出

边界问题1

hello 被拆分 hell o

边界问题2

中文乱码且被拆分为多个部分

边界问题3

处理消息的边界

处理边界问题

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽,即使是比预定长度小的数据,也分配预定长度大小
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式
1
2
3
4
5
6
7
8
9
10
11
sequenceDiagram 
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s: 发送 01234567890abcdefray\n
s ->> b1: 第一次 read 存入 01234567890abcdef
s ->> b2: 扩容
b1 ->> b2: 拷贝 01234567890abcdef
s ->> b2: 第二次 read 存入 ray\n
b2 ->> b2: 01234567890abcdefray\n

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {

// 1. 创建 Selector,管理多个 Channel
Selector selector = Selector.open();

ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false); // 非阻塞模式

// 2. 建立 Selector 和 Channel 的联系 (注册)
// 通过 SelectionKey 定位 事件所属的 Channel
SelectionKey serverKey = server.register(selector, 0, null);
// 服务端 key 只关注 accept 事件
serverKey.interestOps(SelectionKey.OP_ACCEPT);

log.debug("server key {}",serverKey);

server.bind(new InetSocketAddress(8080));

while (true) {
// 3. select 方法 : 监听事件,没有事件发生,线程阻塞,有事件,线程向下执行
// 之前的事件发生但未处理,线程也不会阻塞,继续执行
selector.select();
// 4. 处理事件 : selectedKeys() 包含所有发生的事件
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
log.debug("key {}",key);

// 5. 根据事件类型分别处理
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // 每个 socket 有一个独立的 buffer 用于传输数据
SelectionKey socketKey = socketChannel.register(selector, 0, buffer); // 将 buffer 作为附件关联到对应的 socketKey 上即可
socketKey.interestOps(SelectionKey.OP_READ);
log.debug("{}",socketChannel);
log.debug("socketKey {}",socketKey);
} else if (key.isReadable()){
try {
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取 key 上的附件 buffer
SocketChannel channel = (SocketChannel) key.channel(); // 触发事件的 channel
int read = channel.read(buffer);
if (read==-1){ // 正常断开 read = -1
key.cancel();
} else {
split(buffer);
// 由于 split 方法最终会将未读数据向前移动,而如果已经读到最大长度即 position = limit,那么此时需要扩容
if (buffer.position() == buffer.limit()){
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip(); // 切换读模式
newBuffer.put(buffer); // 新 buffer 拷贝旧 buffer
key.attach(newBuffer); // 替换绑定的旧 buffer
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 异常断开 取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
}
}
// 处理完毕,必须将事件移除
// 否则再次遍历时,对应的 channel 事件已经处理,为 null,就会报空指针异常
keys.remove();
// key.cancel();
}
}
}

private static void split(ByteBuffer source){
source.flip();

for (int i = 0; i < source.limit(); i++) {
// 遍历到换行符即找到一条完整消息
if(source.get(i)=='\n'){
int size = i - source.position() + 1;
// 将这条完整消息存入新的 ByteBuffer
// 分配对应容量的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(size);

// 从 source 读,向 target 写
for (int j = 0; j < size; j++) {
target.put(source.get());
}

debugAll(target);
}
}

source.compact(); // 未读数据向前移动
}
}

客户端

1
2
3
4
5
6
7
8
9
10
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel socket = SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));
// socket.write(StandardCharsets.UTF_8.encode("hello\nworld\n"));
socket.write(StandardCharsets.UTF_8.encode("this is a test data\ndon't reply"));
socket.write(StandardCharsets.UTF_8.encode("01234567890abcdefray\n"));
System.in.read(); // 阻塞方法,等待控制台输入读取
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
08:47:27 [DEBUG] [main] c.n.d.nio.Server - server key sun.nio.ch.SelectionKeyImpl@4fccd51b
08:47:32 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@4fccd51b
08:47:32 [DEBUG] [main] c.n.d.nio.Server - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:61474]
08:47:32 [DEBUG] [main] c.n.d.nio.Server - socketKey sun.nio.ch.SelectionKeyImpl@1c2c22f3
08:47:32 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@1c2c22f3
08:47:32 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@1c2c22f3
+--------+-------------------- all ------------------------+----------------+
position: [20], limit: [20]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 69 73 20 69 73 20 61 20 74 65 73 74 20 64 |this is a test d|
|00000010| 61 74 61 0a |ata. |
+--------+-------------------------------------------------+----------------+

08:47:32 [DEBUG] [main] c.n.d.nio.Server - key sun.nio.ch.SelectionKeyImpl@1c2c22f3
+--------+-------------------- all ------------------------+----------------+
position: [32], limit: [32]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 6f 6e 27 74 20 72 65 70 6c 79 30 31 32 33 34 |don't reply01234|
|00000010| 35 36 37 38 39 30 61 62 63 64 65 66 72 61 79 0a |567890abcdefray.|
+--------+-------------------------------------------------+----------------+

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,会造成数据杂乱冲突,因此需要为每个 channel 维护一个独立的 ByteBuffer。
1
2
ByteBuffer buffer = ByteBuffer.allocate(16);    // 每个 socket 有一个独立的 buffer 用于传输数据
SelectionKey socketKey = socketChannel.register(selector, 0, buffer); // 将 buffer 作为附件关联到对应的 socketKey 上即可
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

处理 write 事件

一次无法写完例子

  • 非阻塞模式下,由于网络传输能力限制,缓冲区写满后无法再写入,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,每次可写均会触发 write 事件

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class WriteServer {

public static void main(String[] args) throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8080));

Selector selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);

while(true){
selector.select();
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isAcceptable()){
SocketChannel socket = server.accept();
socket.configureBlocking(false);
SelectionKey socketKey = socket.register(selector, SelectionKey.OP_READ);

// 1. 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 500000; i++) {
sb.append("a");
}

ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());

// 2. 返回 write 实际写入的字节数
int write = socket.write(buffer);
System.out.println("第一次写入 " + write);

// 3.是否存在剩余内容
if(buffer.hasRemaining()){
// 4. 在原有基础上再关注可写事件
socketKey.interestOps(socketKey.interestOps() + SelectionKey.OP_WRITE);

// 5. 绑定剩余内容
socketKey.attach(buffer);
}
} else if (key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel socket = (SocketChannel) key.channel();
int write = socket.write(buffer);
System.out.println("可写事件中写入 " + write);

// 6. 没有剩余内容
if (!buffer.hasRemaining()){
key.attach(null); // 取消绑定 buffer
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 无需再关注可写事件
}
}
}
}
}
}

处理写事件1

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel socket = SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));

// 接收数据
int count = 0;
while(true){
ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
count += socket.read(buffer);
System.out.println(count);
buffer.clear();
}

}
}

处理写事件2

💡 write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发(相当于长连接?),因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

更进一步

💡 利用多线程优化

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?

分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(8080));
server.configureBlocking(false);

Selector boss = Selector.open();
SelectionKey bossKey = server.register(boss, SelectionKey.OP_ACCEPT);

// 1. 创建固定数量的 worker
Worker worker = new Worker("worker1");

while (true) {
boss.select();

Iterator<SelectionKey> keys = boss.selectedKeys().iterator();

while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isAcceptable()){
SocketChannel socket = server.accept();
socket.configureBlocking(false);
log.debug("connected... {}",socket.getRemoteAddress());
// 2. 关联 worker 的 selector ,并关注 read 事件
log.debug("before register... {}",socket.getRemoteAddress());
worker.register(socket); // boss 调用 初始化 , 启动 worker1
log.debug("after register... {}",socket.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;

// Atomic 原子类保证 一个 selector 持有 一个 thread
// 也可以放在 构造方法中 进行初始化 ,但最好不要在构造方法中写可能抛异常的代码
private AtomicBoolean register = new AtomicBoolean();

// 任务队列
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) throws IOException {
this.name = name;
// thread = new Thread(this);
// selector = Selector.open();
// thread.start();
// register.set(true);
}

// 初始化 线程 和 selector
private void register(SocketChannel socket) throws IOException {
if (!register.get()) {
thread = new Thread(this);
selector = Selector.open();
thread.start();
register.set(true);
}

// 向队列添加任务,通过 lambda 表达式 表示该任务,不会立刻执行
queue.add(()->{
try {
socket.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});

// 此时仍在 Boss 线程,唤醒 select,使线程不阻塞,即放行 第一次用于 Boss 线程 客户端 socket 关联 worker 的 selector ,并关注 read 事件
selector.wakeup();
}

@Override
public void run() {
while (true) {
try {
selector.select();
Runnable task = queue.poll();
if (task!=null){
task.run(); // 执行 socket.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();

if (key.isReadable()){
SocketChannel socket = (SocketChannel) key.channel();
socket.configureBlocking(false);

log.debug("read... {}",socket.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(16);
socket.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}

}
}
}
}

这里需要注意的是

  1. 如何保证 一个 worker 对应 一个 thread?

利用 原子类 做验证使其在方法中只执行一次。

1
2
3
4
5
6
7
8
9
10
// Atomic 原子类保证 一个 selector 持有 一个 thread
// 也可以放在 构造方法中 进行初始化 ,但最好不要在构造方法中写可能抛异常的代码
private AtomicBoolean register = new AtomicBoolean();

if (!register.get()) {
thread = new Thread(this);
selector = Selector.open();
thread.start();
register.set(true);
}
  1. 如何关联 worker 的 selector?

如果一开始 worker.register 就 thread.start ,那么在 selector.select(); 没有事件发生,worker 线程阻塞,boss 线程也就无法执行 socket.register(selector,SelectionKey.OP_READ,null);

需要改变其执行顺序。

这里利用 延迟队列 来实现线程间通信,并通过 wakeup 唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
// 任务队列
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
// 向队列添加任务,通过 lambda 表达式 表示该任务,不会立刻执行
queue.add(()->{
try {
socket.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});

// 此时仍在 Boss 线程,唤醒 select,使线程不阻塞,即放行 第一次用于 Boss 线程 客户端 socket 关联 worker 的 selector ,并关注 read 事件
selector.wakeup();

也可以直接使用 wakeup

1
2
3
// 唤醒 select,使线程不阻塞,即放行 第一次用于 Boss 线程 客户端 socket 关联 worker 的 selector ,并关注 read 事件
selector.wakeup();
socket.register(selector,SelectionKey.OP_READ,null);

💡 如何拿到 cpu 个数

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

服务端最终版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.netty.demo02.nio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.netty.demo02.bytebuffer.ByteBufferUtil.debugAll;

/**
* @author Ray
* @date 2023/4/24 14:06
* @description
*/
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(8080));
server.configureBlocking(false);

Selector boss = Selector.open();
SelectionKey bossKey = server.register(boss, SelectionKey.OP_ACCEPT);

// 1. 创建固定数量的 worker
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}

AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();

Iterator<SelectionKey> keys = boss.selectedKeys().iterator();

while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isAcceptable()){
SocketChannel socket = server.accept();
socket.configureBlocking(false);
log.debug("connected... {}",socket.getRemoteAddress());
// 2. 关联 worker 的 selector ,并关注 read 事件
log.debug("before register... {}",socket.getRemoteAddress());
// Round Robin 轮询
workers[index.getAndIncrement() % workers.length].register(socket); // boss 调用 初始化 , 启动 worker
log.debug("after register... {}",socket.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;

// Atomic 原子类保证 一个 selector 共享同一个 thread
// 也可以放在 构造方法中 进行初始化 ,但最好不要在构造方法中写可能抛异常的代码
private AtomicBoolean register = new AtomicBoolean();

// 任务队列
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) throws IOException {
this.name = name;
// thread = new Thread(this);
// selector = Selector.open();
// thread.start();
// register.set(true);
}

// 初始化 线程 和 selector
private void register(SocketChannel socket) throws IOException {
if (!register.get()) {
thread = new Thread(this);
selector = Selector.open();
thread.start();
register.set(true);
}

// 向队列添加任务,通过 lambda 表达式 表示该任务,不会立刻执行
queue.add(()->{
try {
socket.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});

// 唤醒 select,使线程不阻塞,即放行 第一次用于 Boss 线程 客户端 socket 关联 worker 的 selector ,并关注 read 事件
selector.wakeup();
// socket.register(selector,SelectionKey.OP_READ,null);
}

@Override
public void run() {
Thread.currentThread().setName(name);
while (true) {
try {
selector.select();
Runnable task = queue.poll();
if (task!=null){
task.run(); // 执行 socket.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();

if (key.isReadable()){
SocketChannel socket = (SocketChannel) key.channel();
socket.configureBlocking(false);

log.debug("read... {}",socket.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(16);
socket.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}

}
}
}
}

UDP

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UDPServer {
public static void main(String[] args) throws IOException {
try (DatagramChannel channel= DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debugAll(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
public class UDPClient {
public static void main(String[] args) throws IOException {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost",9999);
channel.send(buffer,address);
} catch (IOException e) {
e.printStackTrace();
}
}
}

NIO vs BIO vs AIO

stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

read等待复制

阻塞 IO

阻塞IO

非阻塞 IO

非阻塞IO

多路复用

多路复用

异步 IO

异步IO

阻塞 IO vs 多路复用

阻塞IO2

多路复用2

零拷贝

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:

传统IO流程

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

DMA 也可以理解为硬件单元,用来解放 CPU 完成文件 IO

  1. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA
  2. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝
  3. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

NIO直接内存优化

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步

    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

sendFile优化

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU
  2. 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

Linux2.4优化

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 CPU
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 CPU 计算,减少 CPU 缓存伪共享
  • 零拷贝适合小文件传输

AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

先来看看 AsynchronousFileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class AioDemo1 {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});

} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}

输出

1
2
3
4
5
6
7
8
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程 Thread-5
  • 主线程并没有 IO 操作阻塞

💡 守护线程

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

网络 AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}