JAVA新一代NIO模型学习记录

NIO的三大组件是:Buffer、Channel、Seclector

缓冲区buffer

在NIO中数据的读写是按块进行读写,每一个buffer代表一个数据块

基本使用

  • Java的八大数据类型中,除了boolean类型外,都有对应的Buffer类型,例如:int类型有IntBuffer
  • 常用的是ByteBuffer,Buffer的本质就是封装了一个数组,内部维护了索引,长度,当前位置等信息
  • 使用allocate()方法指定缓冲区数据块的大小
  • 使用put()方法向缓冲区存放数据
  • 使用flip()方法切换读写模式
  • 使用get()方法向缓冲区取出数据
/**
 * 学习buffer的使用
 */
public class _Buffer {
    public static void main(String[] args) {
        //创建应该大小为5的int类型的buffer缓冲期
        IntBuffer intBuffer = IntBuffer.allocate(5);

        //写入数据
        for (int i = 0; i < 5; i++) {
            intBuffer.put(i);
        }

        //切换读取模式
        intBuffer.flip();

        //读取数据
        while (intBuffer.hasRemaining()) {
            int value = intBuffer.get();
            System.out.println(value);
        }
    }
}

管道channel

在老版本的IO中读写数据,是用通过单向的输入流InputStream或者输出流OutputStream,只能单向操作

在新版本的NIO中读写数据则是通过管道Channel,管道Channel可以进行读也可以进行写

案例

  1. 案例一(使用FileChannel向文件写入数据)

    graph LR;
    a(获取文件输出流)--->b[通过文件输出流获取FileChannel];
    b--->c[创建一个缓冲区Buffer];
    c--->d[向缓冲区Buffer写入数据];
    d--->e[将缓冲区Buufer写入Channel];
    e--->f(关闭相应的流,释放资源);
    
    1. 通过文件流的getChannel()方法获取FileChannel
    2. 将带有数据的Buffer通过Channel的write()方法写入到管道FileChannel中
    3. 释放资源
    /**
     * 使用FileChannel向文件写入数据
     */
    public class ChannelInstance01 {
        public static void main(String[] args) throws IOException {
            //数据
            String data = "杨逸正在学习NIO的Channel";
            //目标文件
            File file = new File("C:\\Users\\huang\\Desktop\\test.txt");
            //输出流
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            //获取channel
            FileChannel fileChannel = fileOutputStream.getChannel();
            //创建buffer缓冲期
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            //写入数据
            byteBuffer.put(data.getBytes());
            //
            byteBuffer.flip();
            //向channel写入数据
            fileChannel.write(byteBuffer);
            //关闭流
            fileChannel.close();
        }
    }
    
  2. 案例二(使用FileChannel读文件)

    graph LR;
    a(获取Channel)--->b[将数据读取到缓冲区Buffer];
    b--->c[显示数据];
    c--->d(释放资源);
    
    /**
     * 使用FileChannel读文件
     */
    public class ChannelInstance02 {
        public static void main(String[] args) throws IOException {
            //目标文件
            File file = new File("C:\\Users\\huang\\Desktop\\test.txt");
            //输入流
            FileInputStream fileInputStream = new FileInputStream(file);
            //获取channel
            FileChannel fileChannel = fileInputStream.getChannel();
            //获取channel中数据的长度
            long size = fileChannel.size();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(Long.valueOf(size).intValue());
            //将channel中的数据读取到缓冲区
            fileChannel.read(byteBuffer);
            //将字节数据转换成字符串表示
            String data = new String(byteBuffer.array());
            System.out.println("data = " + data);
            //关闭流
            fileChannel.close();
        }
    }
    
  3. 案例三(FileChannel配合一个Buffer拷贝文件)

    graph TB;
    a(获取Channel)--->b[创建缓冲区Buffer];
    b--->c1[将源Channel的数据读取到缓冲区Buffer];
    c1--->c2[将缓冲区Buufer的数据写入到目标Channel];
    c2--->c3[重置缓冲区Buffer];
    c3-->c1;
    c3-->d(关闭管道Channel,释放资源);
    
    /**
     * 使用一个buffer进行文件拷贝
     */
    public class ChannelInstance03 {
        public static void main(String[] args) throws IOException {
            //原始文件和目标文件
            File source = new File("C:\\Users\\huang\\Documents\\note\\2024\\01\\Kotlin学习记录.md");
            File target = new File("C:\\Users\\huang\\Desktop\\Kotlin学习记录.md");
            //输入流和输出流
            FileInputStream fileInputStream = new FileInputStream(source);
            FileOutputStream fileOutputStream = new FileOutputStream(target);
            //获取对应的channel
            FileChannel fileInputChannel = fileInputStream.getChannel();
            FileChannel fileOutputChannel = fileOutputStream.getChannel();
            //创建buffer缓冲区
            int size = 1024;
            ByteBuffer byteBuffer = ByteBuffer.allocate(size);
            //拷贝数据
            while (true) {
                //读
                int read = fileInputChannel.read(byteBuffer);
                if (read <= 0)break;
                //反转索引(重置索引)
                byteBuffer.flip();
                //写
                int write = fileOutputChannel.write(byteBuffer);
                //重置缓冲区
                byteBuffer.clear();
            }
            //关闭流
            fileOutputChannel.close();
            fileInputChannel.close();
        }
    }
    
  4. 案例四(文件拷贝)

    /**
     * 文件拷贝案例
     * 使用Channel的transform()方法
     */
    public class FileCopyInstance {
        public static void main(String[] args) throws IOException {
            //原始文件和目标文件
            File source = new File("C:\\Users\\huang\\Pictures\\Saved Pictures\\back round.png");
            File target = new File("C:\\Users\\huang\\Desktop\\back round.png");
            //输入流和输出流
            FileInputStream fileInputStream = new FileInputStream(source);
            FileOutputStream fileOutputStream = new FileOutputStream(target);
            //获取对应的channel
            FileChannel fileInputChannel = fileInputStream.getChannel();
            FileChannel fileOutputChannel = fileOutputStream.getChannel();
            //拷贝文件
            /**
             * 参数说明
             * 第一个,源数据channel
             * 第二个,开始拷贝的位置
             * 第三个,拷贝的长度(大小)
             */
            fileOutputChannel.transferFrom(fileInputChannel,0,fileInputChannel.size());
            //关闭流
            fileOutputChannel.close();
            fileInputChannel.close();
        }
    }
    
  5. 案例五(网络通信和缓冲区的聚合和分散)

    public class ServeSocketChannelTest {
        public static void main(String[] args) throws IOException {
            //启动一个网络服务,绑定到9550端口
            InetSocketAddress inetSocketAddress = new InetSocketAddress(9550);
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(inetSocketAddress);
            //缓冲区的聚合和分散
            ByteBuffer[] byteBuffers = new ByteBuffer[2];
            byteBuffers[0] = ByteBuffer.allocate(5);
            byteBuffers[1] = ByteBuffer.allocate(3);
            //获取连接的channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            int length = 8;
            while(true){
                long read = 0l;
                //读取数据,读满8个字节
                while(read < length){
                    //将数据读到buffer缓冲区数组里
                    long l = socketChannel.read(byteBuffers);
                    read += l;
                    System.out.println("read = " + read);
                }
    
                //写入数据
                Arrays.asList(byteBuffers).forEach(buffer->buffer.flip());
                socketChannel.write(byteBuffers);
                Arrays.asList(byteBuffers).forEach(buffer->buffer.clear());
                //关闭连接
    //            socketChannel.close();
            }
        }
    }
    

选择器selector

案例

  1. 网络服务(服务端与客户端进行数据交互)

    • 服务端

      graph TB;
      a(1.创建服务端Channel和选择器Selector)-->b[2.将Channel的创建连接事件注册到选择器Selector];
      b-->c1{3.是否触发事件};
      
      c1--连接事件-->c2[3.1.1连接事件];
      c2-->d1[3.1.2获取发生事件的selectedKey];
      d1-->e1[3.1.3创建连接的Channel并将可读事件注册到选择器Selector上];
      e1--->c1;
      
      c1--可读事件-->c3[3.2.1可读事件];
      c3-->d2[3.2.2获取发生事件的selectedKey];
      d2-->e2[3.2.3从selectionKey对象中获取Chnanel,并将数据读取出来];
      e2-->f2[3.2.4响应客户端,向Channel写入数据];
      f2--->c1;
      
      /**
       * 使用selector的非阻塞客户端
       * 1.创建连接
       * 2.读取客户端的数据
       */
      public class NIOServer {
          public static void main(String[] args) throws IOException {
              //网络服务绑定端口
              InetSocketAddress inetSocketAddress = new InetSocketAddress(9550);
              ServerSocketChannel serverSocketChannel = ServerSocketChannel
                      .open()
                      .bind(inetSocketAddress);
              System.out.println("服务器启动成功");
              //设置使用非阻塞模式
              serverSocketChannel.configureBlocking(false);
              //创建selector选择器
              Selector selector = Selector.open();
              //将服务器channel的访问连接事件注册到selector选择器
              SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
              while (true){
                  //没有事件发生
                  if (selector.select(1000) == 0) {
                      System.out.println("等待1秒,无连接");
                      continue;
                  }
                  //发生事件,获取所有事件的SelectorKey
                  Set<SelectionKey> selectionKeys = selector.selectedKeys();
                  //依次处理发生的事件
                  Iterator<SelectionKey> iterator = selectionKeys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      //连接事件
                      if (key.isAcceptable()) {
                          System.out.println("==连接事件处理==");
                          //创建连接
                          SocketChannel socketChannel = serverSocketChannel.accept();
                          socketChannel.configureBlocking(false);
                          //将连接的channel注册到selector,并指定一个buffer缓冲区对象
                          socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                          System.out.printf("服务器创建连接:%s,时间:%dl%n",socketChannel.hashCode(),System.currentTimeMillis());
                      }
                      //读事件
                      if (key.isReadable()) {
                          System.out.println("==读事件处理==");
                          //获取对应的channel
                          SocketChannel socketChannel = (SocketChannel)key.channel();
                          //获取指定的buffer缓冲区
                          ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                          //读取数据
                          socketChannel.read(byteBuffer);
                          String message = new String(byteBuffer.array());
                          System.out.println("message = " + message);
                          //重置缓冲区
                          byteBuffer.clear();
                          key.cancel();
                      }
                      //处理完成删除对应的SelectorKey,防止重复执行
                      iterator.remove();
                  }
      
              }
          }
      }
      
    • 客户端

      flowchart TD
          A[开始] --> B[创建SocketChannel连接]
          B --> C{是否连接成功?}
          C -- 是 --> D[配置SocketChannel为非阻塞模式]
          C -- 否 --> E[打印'正在连接服务器']
          E --> F{是否完成连接?}
          F -- 是 --> G[打印'连接到服务器']
          F -- 否 --> F
          G --> H[准备数据: message]
          H --> I[创建ByteBuffer缓冲区]
          I --> J[向SocketChannel写入数据]
          J --> K[关闭SocketChannel连接]
          K --> L[结束]
      
      public class NIOClient {
          public static void main(String[] args) throws IOException {
              //创建客户端连接channel
              SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9550));
              socketChannel.configureBlocking(false);
              if (!socketChannel.isConnected()) {
                  System.out.println(String.format("%dl-正在连接服务器",System.currentTimeMillis()));
                  while(!socketChannel.finishConnect()){
      
                  }
                  System.out.println(String.format("%dl-连接到服务器",System.currentTimeMillis()));
              }
              //数据
              String message = String.format("时间:%s,杨逸正在学NIO的Selector","2025-01-26");
              //创建一个与数据大小一致的缓冲区
              ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
              //向连接channel写入数据
              socketChannel.write(buffer);
              //关闭连接
              socketChannel.close();
          }
      }
      
  2. 网络聊天室

    sequenceDiagram
        participant Client as 客户端
        participant Server as 服务端
    
        Client->>Server: 1. 连接到服务器 (SocketChannel.connect)
        Server-->>Client: 2. 连接成功
        Server->>Client: 3. 广播客户端上线信息
    
        loop 聊天循环
            Client->>Server: 4. 发送消息 (sendMessage)
            Server->>Client: 5. 转发消息给其他客户端 (sendMessage)
            Client->>Client: 6. 显示接收到的消息 (receiveMessage)
        end
    
        Client->>Server: 7. 发送退出消息 (!q)
        Server->>Client: 8. 广播客户端下线信息
        Server-->>Client: 9. 关闭连接
        Client->>Client: 10. 退出程序
    
    1. 服务端

      flowchart TD
          A(开始) --> B[初始化服务端]
          B --> C[创建ServerSocketChannel并绑定端口]
          C --> D[配置ServerSocketChannel为非阻塞模式]
          D --> E[创建Selector]
          E --> F[注册ServerSocketChannel到Selector, 监听ACCEPT事件]
          F --> G[进入监听循环]
          G --> H{Selector.select是否有事件?}
          H -- 是 --> I[获取SelectionKey集合]
          I --> J[遍历SelectionKey集合]
          J --> K{事件类型?}
          K -- ACCEPT --> L[接受客户端连接]
          L --> M[配置SocketChannel为非阻塞模式]
          M --> N[注册SocketChannel到Selector, 监听READ事件]
          N --> O[广播客户端上线信息]
          O --> J
          K -- READ --> P{SelectionKey是否有效?}
          P -- 是 --> Q[读取客户端数据]
          Q --> R{读取是否成功?}
          R -- 是 --> S[处理客户端数据]
          R -- 否 --> T[处理客户端断开连接]
          T --> U[取消SelectionKey并关闭SocketChannel]
          U --> J
          S --> V{消息是否为退出指令?}
          V -- 是 --> W[广播客户端下线信息并关闭连接]
          V -- 否 --> X[转发消息给其他客户端]
          X --> J
          H -- 否 --> G
      
      /**
       * 使用NIO实现的非阻塞网络聊天室
       */
      public class GroupChatServer {
          //选择器
          private Selector selector;
          //服务端连接
          private ServerSocketChannel serverSocketChannel;
          //服务的端口
          public static final int PORT = 9550;
          public static void main(String[] args) throws IOException {
              new GroupChatServer();
          }
          //服务端初始化
          public GroupChatServer() throws IOException {
              serverSocketChannel = ServerSocketChannel
                      .open()
                      .bind(new InetSocketAddress(PORT));
              serverSocketChannel.configureBlocking(false);
              selector = Selector.open();
              //注册到选择器上
              serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
              //监听网络连接
              listen();
          }
      
          private void listen() throws IOException {
              while(true){
                  if (selector.select(1000) == 0) {
      //                System.out.println(String.format("time:%dl,无事发生",System.currentTimeMillis()));
                      continue;
                  }
                  Set<SelectionKey> selectionKeys = selector.selectedKeys();
                  Iterator<SelectionKey> iterator = selectionKeys.iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      //连接访问
                      if (key.isAcceptable()) {
                          SocketChannel socketChannel = serverSocketChannel.accept();
                          socketChannel.configureBlocking(false);
                          socketChannel.register(selector, SelectionKey.OP_READ);
                          String mes = String.format("客户端%s连接成功", socketChannel.hashCode());
                          System.out.println(mes);
                          //向所有客户端广播上线信息
                          broadcast(mes);
                      }
                      //读数据,可读且合法才进行读取
                      if (key.isReadable() && key.isValid()) {
                          SocketChannel socketChannel = (SocketChannel) key.channel();
                          //缓冲区
                          ByteBuffer buffer = ByteBuffer.allocate(1024);
                          //读数据
                          int read = 0;
                          //客户端的意外断开连接异常会导致服务端崩溃
                          try {
                              read = socketChannel.read(buffer);
                          } catch (IOException e) {
                              e.printStackTrace();
                              String mes = String.format("客户端%s已下线", socketChannel.hashCode());
                              System.out.println(mes);
                              key.cancel();
                              iterator.remove();
                              socketChannel.close();
                              //尝试广播下线信息,但是SelectionKey中还有已断开连接的channel
                              //broadcast(mes);
                              continue;
                          }
                          String message = new String(buffer.array(), 0, read);
                          System.out.println(String.format("form 客户端%s的信息:%s",socketChannel.hashCode(),message));
                          //向其他连接channel转发信息
                          if ("!q".equals(message)) {
                              String mes = new StringBuilder("客户端")
                                      .append(socketChannel.hashCode())
                                      .append("下线")
                                      .toString();
                              broadcast(mes);
                              socketChannel.close();
                          }else {
                              sendMessage(socketChannel,message);
                          }
                      }
                      //删除事件
                      iterator.remove();
                  }
              }
          }
          //向其他连接发送数据
          private void sendMessage(SocketChannel socketChannel, String message) throws IOException {
              //获取所有的连接
              Set<SelectionKey> keys = selector.keys();
              //组织数据
              byte[] data = new StringBuilder("用户")
                      .append(socketChannel.hashCode())
                      .append(':')
                      .append(message)
                      .toString().getBytes(StandardCharsets.UTF_8);
              for (SelectionKey key : keys) {
                  SelectableChannel channel = key.channel();
                  if (channel instanceof SocketChannel && !channel.equals(socketChannel)) {
                      //发送数据
                      ((SocketChannel) channel).write(ByteBuffer.wrap(data));
                      System.out.println(String.format("服务端向客户端%s转发数据",channel.hashCode()));
                  }
              }
          }
          //广播数据
          private void broadcast(String message) throws IOException {
              //获取所有的连接
              Set<SelectionKey> keys = selector.keys();
              //组织数据
              byte[] data = new StringBuilder("服务器")
                      .append(':')
                      .append(message)
                      .toString().getBytes(StandardCharsets.UTF_8);
              for (SelectionKey key : keys) {
                  SelectableChannel channel = key.channel();
                  if (channel instanceof SocketChannel) {
                      //发送数据
                      ((SocketChannel) channel).write(ByteBuffer.wrap(data));
                      System.out.println(String.format("服务端向客户端%s广播数据",channel.hashCode()));
                  }
              }
          }
      }
      
    2. 客户端

      flowchart TD
          A[开始] --> B[初始化客户端]
          B --> C[创建SocketChannel]
          C --> D[配置SocketChannel为非阻塞模式]
          D --> E[连接服务端]
          E --> F{是否连接成功?}
          F -- 是 --> G[注册SocketChannel到Selector, 监听READ事件]
          F -- 否 --> H[等待连接完成]
          H --> G
          G --> I[启动输入线程]
          I --> J[进入消息接收循环]
          J --> K{Selector.select是否有事件?}
          K -- 是 --> L[获取SelectionKey集合]
          L --> M[遍历SelectionKey集合]
          M --> N{事件类型?}
          N -- READ --> O[读取服务端数据]
          O --> P{读取是否成功?}
          P -- 是 --> Q[显示服务端消息]
          P -- 否 --> R[设置退出标志]
          R --> S[退出循环]
          Q --> J
          K -- 否 --> J
          S --> T[关闭连接]
          T --> U[结束]
      
      public class GroupChatClient {
          public static String IP = "127.0.0.1";
          public static int PORT = 9550;
          private static Selector selector;
          private static SocketChannel socketChannel;
      
          //退出标志
          public static int flag = 0;
          public static void main(String[] args) throws IOException {
              GroupChatClient client = new GroupChatClient();
              //标准输入
              Scanner input = new Scanner(System.in);
              Thread thread = new Thread(() -> {
                  while(flag == 0){
                      System.out.print("我:");
                      String mes = input.nextLine();
                      if ("!q".equals(mes)) {
                          flag = -1;
      //                    continue;
                      }
                      client.sendMessage(mes);
                  }
              });
              thread.start();
              while(flag == 0){
                  client.receiveMessage();
              }
              System.out.println("退出连接");
          }
      
          public GroupChatClient() throws IOException {
              selector = Selector.open();
              //创建连接
              socketChannel = SocketChannel.open();
              //设置非阻塞
              socketChannel.configureBlocking(false);
              //连接服务端
              socketChannel.connect(new InetSocketAddress(IP, PORT));
              if (!socketChannel.isConnected()) {
                  System.out.println("正在连接到服务端...");
                  while (!socketChannel.finishConnect()) {
      
                  }
              }
              System.out.println("成功连接到服务端");
              //注册到选择器
              socketChannel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
          }
      
          //发送数据
          void sendMessage(String message){
              ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
              try {
                  int write = socketChannel.write(byteBuffer);
              } catch (IOException e) {
                  e.printStackTrace();
                  System.err.println(String.format("发送数据失败,数据:%s",message));
              }
          }
          //接受数据
          void receiveMessage() throws IOException {
              if (selector.select(1000) == 0) {
                  return;
              }
              Set<SelectionKey> selectionKeys = selector.selectedKeys();
              for (SelectionKey selectionKey : selectionKeys) {
                  if (selectionKey.isReadable() && selectionKey.isValid()) {
                      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                      ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                      int read = 0;
                      try {
                          read = socketChannel.read(buffer);
                      } catch (IOException e) {
                          e.printStackTrace();
                          flag = -1;
                          continue;
                      }
                      String mes = new String(buffer.array(), 0, read);
                      buffer.clear();
                      //标准输出的格式控制,向前移动两位,
                      System.out.print("\b\b");
                      System.out.println(mes);
                      System.out.print("我:");
                  }
                  selectionKeys.remove(selectionKey);
              }
          }
      }