JAVA新一代NIO模型学习记录
NIO的三大组件是:Buffer、Channel、Seclector
缓冲区buffer
在NIO中数据的读写是按块进行读写,每一个buffer代表一个数据块
基本使用
- Java的八大数据类型中,除了boolean类型外,都有对应的Buffer类型,例如:int类型有IntBuffer
- 常用的是ByteBuffer,Buffer的本质就是封装了一个数组,内部维护了索引,长度,当前位置等信息
- 使用allocate()方法指定缓冲区数据块的大小
- 使用put()方法向缓冲区存放数据
- 使用flip()方法切换读写模式
- 使用get()方法向缓冲区取出数据
/**
* 学习buffer的使用
*/
public class _Buffer {
public static void main(String[] args) {
//创建应该大小为5的int类型的buffer缓冲期
IntBuffer intBuffer = IntBuffer.allocate(5);
//写入数据
for (int i = 0; i < 5; i++) {
intBuffer.put(i);
}
//切换读取模式
intBuffer.flip();
//读取数据
while (intBuffer.hasRemaining()) {
int value = intBuffer.get();
System.out.println(value);
}
}
}
管道channel
在老版本的IO中读写数据,是用通过单向的输入流InputStream或者输出流OutputStream,只能单向操作
在新版本的NIO中读写数据则是通过管道Channel,管道Channel可以进行读也可以进行写
案例
-
案例一(使用FileChannel向文件写入数据)
graph LR; a(获取文件输出流)--->b[通过文件输出流获取FileChannel]; b--->c[创建一个缓冲区Buffer]; c--->d[向缓冲区Buffer写入数据]; d--->e[将缓冲区Buufer写入Channel]; e--->f(关闭相应的流,释放资源);- 通过文件流的getChannel()方法获取FileChannel
- 将带有数据的Buffer通过Channel的write()方法写入到管道FileChannel中
- 释放资源
/** * 使用FileChannel向文件写入数据 */ public class ChannelInstance01 { public static void main(String[] args) throws IOException { //数据 String data = "杨逸正在学习NIO的Channel"; //目标文件 File file = new File("C:\\Users\\huang\\Desktop\\test.txt"); //输出流 FileOutputStream fileOutputStream = new FileOutputStream(file); //获取channel FileChannel fileChannel = fileOutputStream.getChannel(); //创建buffer缓冲期 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //写入数据 byteBuffer.put(data.getBytes()); // byteBuffer.flip(); //向channel写入数据 fileChannel.write(byteBuffer); //关闭流 fileChannel.close(); } } -
案例二(使用FileChannel读文件)
graph LR; a(获取Channel)--->b[将数据读取到缓冲区Buffer]; b--->c[显示数据]; c--->d(释放资源);/** * 使用FileChannel读文件 */ public class ChannelInstance02 { public static void main(String[] args) throws IOException { //目标文件 File file = new File("C:\\Users\\huang\\Desktop\\test.txt"); //输入流 FileInputStream fileInputStream = new FileInputStream(file); //获取channel FileChannel fileChannel = fileInputStream.getChannel(); //获取channel中数据的长度 long size = fileChannel.size(); //创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(Long.valueOf(size).intValue()); //将channel中的数据读取到缓冲区 fileChannel.read(byteBuffer); //将字节数据转换成字符串表示 String data = new String(byteBuffer.array()); System.out.println("data = " + data); //关闭流 fileChannel.close(); } } -
案例三(FileChannel配合一个Buffer拷贝文件)
graph TB; a(获取Channel)--->b[创建缓冲区Buffer]; b--->c1[将源Channel的数据读取到缓冲区Buffer]; c1--->c2[将缓冲区Buufer的数据写入到目标Channel]; c2--->c3[重置缓冲区Buffer]; c3-->c1; c3-->d(关闭管道Channel,释放资源);/** * 使用一个buffer进行文件拷贝 */ public class ChannelInstance03 { public static void main(String[] args) throws IOException { //原始文件和目标文件 File source = new File("C:\\Users\\huang\\Documents\\note\\2024\\01\\Kotlin学习记录.md"); File target = new File("C:\\Users\\huang\\Desktop\\Kotlin学习记录.md"); //输入流和输出流 FileInputStream fileInputStream = new FileInputStream(source); FileOutputStream fileOutputStream = new FileOutputStream(target); //获取对应的channel FileChannel fileInputChannel = fileInputStream.getChannel(); FileChannel fileOutputChannel = fileOutputStream.getChannel(); //创建buffer缓冲区 int size = 1024; ByteBuffer byteBuffer = ByteBuffer.allocate(size); //拷贝数据 while (true) { //读 int read = fileInputChannel.read(byteBuffer); if (read <= 0)break; //反转索引(重置索引) byteBuffer.flip(); //写 int write = fileOutputChannel.write(byteBuffer); //重置缓冲区 byteBuffer.clear(); } //关闭流 fileOutputChannel.close(); fileInputChannel.close(); } } -
案例四(文件拷贝)
/** * 文件拷贝案例 * 使用Channel的transform()方法 */ public class FileCopyInstance { public static void main(String[] args) throws IOException { //原始文件和目标文件 File source = new File("C:\\Users\\huang\\Pictures\\Saved Pictures\\back round.png"); File target = new File("C:\\Users\\huang\\Desktop\\back round.png"); //输入流和输出流 FileInputStream fileInputStream = new FileInputStream(source); FileOutputStream fileOutputStream = new FileOutputStream(target); //获取对应的channel FileChannel fileInputChannel = fileInputStream.getChannel(); FileChannel fileOutputChannel = fileOutputStream.getChannel(); //拷贝文件 /** * 参数说明 * 第一个,源数据channel * 第二个,开始拷贝的位置 * 第三个,拷贝的长度(大小) */ fileOutputChannel.transferFrom(fileInputChannel,0,fileInputChannel.size()); //关闭流 fileOutputChannel.close(); fileInputChannel.close(); } } -
案例五(网络通信和缓冲区的聚合和分散)
public class ServeSocketChannelTest { public static void main(String[] args) throws IOException { //启动一个网络服务,绑定到9550端口 InetSocketAddress inetSocketAddress = new InetSocketAddress(9550); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(inetSocketAddress); //缓冲区的聚合和分散 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(3); //获取连接的channel SocketChannel socketChannel = serverSocketChannel.accept(); int length = 8; while(true){ long read = 0l; //读取数据,读满8个字节 while(read < length){ //将数据读到buffer缓冲区数组里 long l = socketChannel.read(byteBuffers); read += l; System.out.println("read = " + read); } //写入数据 Arrays.asList(byteBuffers).forEach(buffer->buffer.flip()); socketChannel.write(byteBuffers); Arrays.asList(byteBuffers).forEach(buffer->buffer.clear()); //关闭连接 // socketChannel.close(); } } }
选择器selector
案例
-
网络服务(服务端与客户端进行数据交互)
-
服务端
graph TB; a(1.创建服务端Channel和选择器Selector)-->b[2.将Channel的创建连接事件注册到选择器Selector]; b-->c1{3.是否触发事件}; c1--连接事件-->c2[3.1.1连接事件]; c2-->d1[3.1.2获取发生事件的selectedKey]; d1-->e1[3.1.3创建连接的Channel并将可读事件注册到选择器Selector上]; e1--->c1; c1--可读事件-->c3[3.2.1可读事件]; c3-->d2[3.2.2获取发生事件的selectedKey]; d2-->e2[3.2.3从selectionKey对象中获取Chnanel,并将数据读取出来]; e2-->f2[3.2.4响应客户端,向Channel写入数据]; f2--->c1;/** * 使用selector的非阻塞客户端 * 1.创建连接 * 2.读取客户端的数据 */ public class NIOServer { public static void main(String[] args) throws IOException { //网络服务绑定端口 InetSocketAddress inetSocketAddress = new InetSocketAddress(9550); ServerSocketChannel serverSocketChannel = ServerSocketChannel .open() .bind(inetSocketAddress); System.out.println("服务器启动成功"); //设置使用非阻塞模式 serverSocketChannel.configureBlocking(false); //创建selector选择器 Selector selector = Selector.open(); //将服务器channel的访问连接事件注册到selector选择器 SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); while (true){ //没有事件发生 if (selector.select(1000) == 0) { System.out.println("等待1秒,无连接"); continue; } //发生事件,获取所有事件的SelectorKey Set<SelectionKey> selectionKeys = selector.selectedKeys(); //依次处理发生的事件 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //连接事件 if (key.isAcceptable()) { System.out.println("==连接事件处理=="); //创建连接 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); //将连接的channel注册到selector,并指定一个buffer缓冲区对象 socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.printf("服务器创建连接:%s,时间:%dl%n",socketChannel.hashCode(),System.currentTimeMillis()); } //读事件 if (key.isReadable()) { System.out.println("==读事件处理=="); //获取对应的channel SocketChannel socketChannel = (SocketChannel)key.channel(); //获取指定的buffer缓冲区 ByteBuffer byteBuffer = (ByteBuffer) key.attachment(); //读取数据 socketChannel.read(byteBuffer); String message = new String(byteBuffer.array()); System.out.println("message = " + message); //重置缓冲区 byteBuffer.clear(); key.cancel(); } //处理完成删除对应的SelectorKey,防止重复执行 iterator.remove(); } } } } -
客户端
flowchart TD A[开始] --> B[创建SocketChannel连接] B --> C{是否连接成功?} C -- 是 --> D[配置SocketChannel为非阻塞模式] C -- 否 --> E[打印'正在连接服务器'] E --> F{是否完成连接?} F -- 是 --> G[打印'连接到服务器'] F -- 否 --> F G --> H[准备数据: message] H --> I[创建ByteBuffer缓冲区] I --> J[向SocketChannel写入数据] J --> K[关闭SocketChannel连接] K --> L[结束]public class NIOClient { public static void main(String[] args) throws IOException { //创建客户端连接channel SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9550)); socketChannel.configureBlocking(false); if (!socketChannel.isConnected()) { System.out.println(String.format("%dl-正在连接服务器",System.currentTimeMillis())); while(!socketChannel.finishConnect()){ } System.out.println(String.format("%dl-连接到服务器",System.currentTimeMillis())); } //数据 String message = String.format("时间:%s,杨逸正在学NIO的Selector","2025-01-26"); //创建一个与数据大小一致的缓冲区 ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); //向连接channel写入数据 socketChannel.write(buffer); //关闭连接 socketChannel.close(); } }
-
-
网络聊天室
sequenceDiagram participant Client as 客户端 participant Server as 服务端 Client->>Server: 1. 连接到服务器 (SocketChannel.connect) Server-->>Client: 2. 连接成功 Server->>Client: 3. 广播客户端上线信息 loop 聊天循环 Client->>Server: 4. 发送消息 (sendMessage) Server->>Client: 5. 转发消息给其他客户端 (sendMessage) Client->>Client: 6. 显示接收到的消息 (receiveMessage) end Client->>Server: 7. 发送退出消息 (!q) Server->>Client: 8. 广播客户端下线信息 Server-->>Client: 9. 关闭连接 Client->>Client: 10. 退出程序-
服务端
flowchart TD A(开始) --> B[初始化服务端] B --> C[创建ServerSocketChannel并绑定端口] C --> D[配置ServerSocketChannel为非阻塞模式] D --> E[创建Selector] E --> F[注册ServerSocketChannel到Selector, 监听ACCEPT事件] F --> G[进入监听循环] G --> H{Selector.select是否有事件?} H -- 是 --> I[获取SelectionKey集合] I --> J[遍历SelectionKey集合] J --> K{事件类型?} K -- ACCEPT --> L[接受客户端连接] L --> M[配置SocketChannel为非阻塞模式] M --> N[注册SocketChannel到Selector, 监听READ事件] N --> O[广播客户端上线信息] O --> J K -- READ --> P{SelectionKey是否有效?} P -- 是 --> Q[读取客户端数据] Q --> R{读取是否成功?} R -- 是 --> S[处理客户端数据] R -- 否 --> T[处理客户端断开连接] T --> U[取消SelectionKey并关闭SocketChannel] U --> J S --> V{消息是否为退出指令?} V -- 是 --> W[广播客户端下线信息并关闭连接] V -- 否 --> X[转发消息给其他客户端] X --> J H -- 否 --> G/** * 使用NIO实现的非阻塞网络聊天室 */ public class GroupChatServer { //选择器 private Selector selector; //服务端连接 private ServerSocketChannel serverSocketChannel; //服务的端口 public static final int PORT = 9550; public static void main(String[] args) throws IOException { new GroupChatServer(); } //服务端初始化 public GroupChatServer() throws IOException { serverSocketChannel = ServerSocketChannel .open() .bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(false); selector = Selector.open(); //注册到选择器上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //监听网络连接 listen(); } private void listen() throws IOException { while(true){ if (selector.select(1000) == 0) { // System.out.println(String.format("time:%dl,无事发生",System.currentTimeMillis())); continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //连接访问 if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); String mes = String.format("客户端%s连接成功", socketChannel.hashCode()); System.out.println(mes); //向所有客户端广播上线信息 broadcast(mes); } //读数据,可读且合法才进行读取 if (key.isReadable() && key.isValid()) { SocketChannel socketChannel = (SocketChannel) key.channel(); //缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读数据 int read = 0; //客户端的意外断开连接异常会导致服务端崩溃 try { read = socketChannel.read(buffer); } catch (IOException e) { e.printStackTrace(); String mes = String.format("客户端%s已下线", socketChannel.hashCode()); System.out.println(mes); key.cancel(); iterator.remove(); socketChannel.close(); //尝试广播下线信息,但是SelectionKey中还有已断开连接的channel //broadcast(mes); continue; } String message = new String(buffer.array(), 0, read); System.out.println(String.format("form 客户端%s的信息:%s",socketChannel.hashCode(),message)); //向其他连接channel转发信息 if ("!q".equals(message)) { String mes = new StringBuilder("客户端") .append(socketChannel.hashCode()) .append("下线") .toString(); broadcast(mes); socketChannel.close(); }else { sendMessage(socketChannel,message); } } //删除事件 iterator.remove(); } } } //向其他连接发送数据 private void sendMessage(SocketChannel socketChannel, String message) throws IOException { //获取所有的连接 Set<SelectionKey> keys = selector.keys(); //组织数据 byte[] data = new StringBuilder("用户") .append(socketChannel.hashCode()) .append(':') .append(message) .toString().getBytes(StandardCharsets.UTF_8); for (SelectionKey key : keys) { SelectableChannel channel = key.channel(); if (channel instanceof SocketChannel && !channel.equals(socketChannel)) { //发送数据 ((SocketChannel) channel).write(ByteBuffer.wrap(data)); System.out.println(String.format("服务端向客户端%s转发数据",channel.hashCode())); } } } //广播数据 private void broadcast(String message) throws IOException { //获取所有的连接 Set<SelectionKey> keys = selector.keys(); //组织数据 byte[] data = new StringBuilder("服务器") .append(':') .append(message) .toString().getBytes(StandardCharsets.UTF_8); for (SelectionKey key : keys) { SelectableChannel channel = key.channel(); if (channel instanceof SocketChannel) { //发送数据 ((SocketChannel) channel).write(ByteBuffer.wrap(data)); System.out.println(String.format("服务端向客户端%s广播数据",channel.hashCode())); } } } } -
客户端
flowchart TD A[开始] --> B[初始化客户端] B --> C[创建SocketChannel] C --> D[配置SocketChannel为非阻塞模式] D --> E[连接服务端] E --> F{是否连接成功?} F -- 是 --> G[注册SocketChannel到Selector, 监听READ事件] F -- 否 --> H[等待连接完成] H --> G G --> I[启动输入线程] I --> J[进入消息接收循环] J --> K{Selector.select是否有事件?} K -- 是 --> L[获取SelectionKey集合] L --> M[遍历SelectionKey集合] M --> N{事件类型?} N -- READ --> O[读取服务端数据] O --> P{读取是否成功?} P -- 是 --> Q[显示服务端消息] P -- 否 --> R[设置退出标志] R --> S[退出循环] Q --> J K -- 否 --> J S --> T[关闭连接] T --> U[结束]public class GroupChatClient { public static String IP = "127.0.0.1"; public static int PORT = 9550; private static Selector selector; private static SocketChannel socketChannel; //退出标志 public static int flag = 0; public static void main(String[] args) throws IOException { GroupChatClient client = new GroupChatClient(); //标准输入 Scanner input = new Scanner(System.in); Thread thread = new Thread(() -> { while(flag == 0){ System.out.print("我:"); String mes = input.nextLine(); if ("!q".equals(mes)) { flag = -1; // continue; } client.sendMessage(mes); } }); thread.start(); while(flag == 0){ client.receiveMessage(); } System.out.println("退出连接"); } public GroupChatClient() throws IOException { selector = Selector.open(); //创建连接 socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //连接服务端 socketChannel.connect(new InetSocketAddress(IP, PORT)); if (!socketChannel.isConnected()) { System.out.println("正在连接到服务端..."); while (!socketChannel.finishConnect()) { } } System.out.println("成功连接到服务端"); //注册到选择器 socketChannel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024)); } //发送数据 void sendMessage(String message){ ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)); try { int write = socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); System.err.println(String.format("发送数据失败,数据:%s",message)); } } //接受数据 void receiveMessage() throws IOException { if (selector.select(1000) == 0) { return; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { if (selectionKey.isReadable() && selectionKey.isValid()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); int read = 0; try { read = socketChannel.read(buffer); } catch (IOException e) { e.printStackTrace(); flag = -1; continue; } String mes = new String(buffer.array(), 0, read); buffer.clear(); //标准输出的格式控制,向前移动两位, System.out.print("\b\b"); System.out.println(mes); System.out.print("我:"); } selectionKeys.remove(selectionKey); } } }
-