0%

Netty-NIO-Buffer&Channel 案例

Netty-NIO-Buffer&Channel 案例

案例1-本地文件写数据

FileChannelWrite
FileChannelWrite
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ChannelDemo {
public static void main(String[] args) throws IOException {
String str = "hello buffer";
// 1. 创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream("/home/zephon/f.txt");
// 2. 根据输出流获取Channel,对应的是FileChannel
// 实际上其实是FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
// 3. 创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 4. 将str放入到byteBuffer中
byteBuffer.put(str.getBytes());
// 5. 对byteBuffer进行flip
byteBuffer.flip();
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}

案例2-本地文件读数据

1
2
3
4
5
6
7
8
9
10
11
public class ChannelDemo2 {
public static void main(String[] args) throws IOException {
File file = new File("/home/zephon/f.txt");
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel fileChannel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
}

案例三-使用一个Buffer完成文件拷贝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ChannelDemo3 {
public static void main(String[] args) throws IOException {
FileInputStream fileInputStream = new FileInputStream("/home/zephon/f.txt");
FileChannel fileChannel = fileInputStream.getChannel();

FileOutputStream fileOutputStream = new FileOutputStream("/home/zephon/desc.txt");
FileChannel outChannel = fileOutputStream.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while(true){
// 清空buffer,重置
byteBuffer.clear();
int len = fileChannel.read(byteBuffer);
if(len==-1)break;
byteBuffer.flip();
outChannel.write(byteBuffer);
}
fileInputStream.close();
fileOutputStream.close();
}
}

案例四-使用TransferFrom完成文件拷贝

1
2
3
4
5
6
7
8
9
10
11
public class ChannelDemo4 {
public static void main(String[] args) throws IOException {
FileInputStream fileInputStream = new FileInputStream("/home/zephon/f.txt");
FileChannel inChannel = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("/home/zephon/desc.txt");
FileChannel outChannel = fileOutputStream.getChannel();
outChannel.transferFrom(inChannel,0,inChannel.size());
fileInputStream.close();
fileOutputStream.close();
}
}

关于Buffer和Channel的注意事项和细节

  1. ByteBuffer支持类型化的put和get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有BufferUnderflowException异常

  2. 可以将一个普通Buffer围成只读Buffer

  3. NIO还提供了MappedByteBuffer,可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由NIO来完成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class MappedByteBufferTest {
    public static void main(String[] args) throws IOException {
    RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt","rw");
    FileChannel channel = randomAccessFile.getChannel();
    /**
    * FileChannel.MapMode.READ_WRITE:使用读写模式
    * 0:可以直接修改的起始位置
    * 5:映射到内存的大小(不是索引位置),即将1.txt的多少个字节映射到内存
    * 可以直接修改的范围就是[0,5)
    * 实际类型是 DirectByteBuffer
    */
    MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    mappedByteBuffer.put(0, (byte) 'H');
    mappedByteBuffer.put(3, (byte) '9');
    randomAccessFile.close();
    }
    }
  4. 前面所说的读写操作,都是通过一个Buffer完成的,NIO还支持通过多个Buffer(即Buffer数组)完成读写操作,即Scattering(分散)和Gatering(聚合)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    /**
    * Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入 [分散]
    * Gathering:从buffer读出数据时,可以buffer数组,依次读出
    */
    public class ScatteringAndGatheringTest {
    public static void main(String[] args) throws IOException {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(8686);
    // 绑定端口并启动
    serverSocketChannel.socket().bind(inetSocketAddress);
    // 创建buffer数组
    ByteBuffer[] byteBuffers = new ByteBuffer[2];
    byteBuffers[0] = ByteBuffer.allocate(5);
    byteBuffers[1] = ByteBuffer.allocate(3);

    // 等待客户端连接
    SocketChannel socketChannel = serverSocketChannel.accept();
    // 假定从客户端接收8个字节
    int messageLength = 8;
    // 循环读取
    while(true){
    int byteRead = 0;
    while(byteRead < messageLength){
    long l = socketChannel.read(byteBuffers);
    byteRead += l;
    System.out.println("byteRead="+byteRead);
    Arrays.asList(byteBuffers).stream()
    .map(buffer->"position="+buffer.position() + ", limit="+buffer.limit())
    .forEach(System.out::println);
    }
    // 将所以buffer进行flip
    Arrays.asList(byteBuffers).forEach(buffer->buffer.flip());
    // 将数据读出显示回客户端
    long byteWrite = 0;
    while(byteWrite < messageLength){
    long l = socketChannel.write(byteBuffers);
    byteWrite += l;
    }
    Arrays.asList(byteBuffers).forEach(buffer->buffer.clear());
    System.out.println("byteRead="+ byteRead + " byteWrite="+ byteWrite + " messageLength:"+messageLength);
    }
    }
    }