0%

Netty-NIO-案例

Netty-NIO-案例

NIO非阻塞网络编程原理分析图

  1. 当客户端连接时,会通过ServerSocketChannel得到对应的SocketChannel
  2. Selector进行监听(select()方法),返回有事件发生的通道的个数
  3. 将SocketChannel注册到Selector上register(Selector sel,int ops),一个Selector上可以注册多个SocketChannel
  4. 注册后返回一个SelectionKey,会和该Selector关联(以集合的方式)
  5. 进一步得到各个SelectionKey(有事件发生)
  6. 再通过SelectionKey反向获取SocketChannel,方法是channel()
  7. 可以通过得到的channel,完成业务处理

案例1-简单通讯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class NIOServer {
public static void main(String[] args) throws IOException {
// 1. 创建ServerSocketChannel -> BIO中的ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 得到一个Selector对象
Selector selector = Selector.open();
// 3. 绑定端口,在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 4. 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 5. 将ServerSocketChannel注册到Selector中,关心事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 循环等待客户端连接
while(true){
// 等待1秒,如果没有事件发生,返回
if(selector.select(1000)==0){
// 没有事件发生
System.out.println("服务器等待了1秒,无连接");
continue;
}
// 如果返回>0,表示已经获取到关注的事件
// 获取到相关的selectionKeys集合(关注事件的集合)
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
// 根据selectionKey对应的通道发生的事件,做对应的处理
if(selectionKey.isAcceptable()){
// 如果是OP_ACCEPT,有新客户端连接
// 给该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功,生成SocketChannel"+socketChannel.hashCode());
// 设置为非阻塞式的
socketChannel.configureBlocking(false);
// 将socketChannel注册到Selector,关注事件为OP_READ,同时关联一个Buffer
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if(selectionKey.isReadable()){
// 如果是OP_READ,可以读取数据
// 通过key反向获取对应Channel
SocketChannel channel = (SocketChannel) selectionKey.channel();
// 获取到该Channel关联的Buffer
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
// 读取数据
int read = channel.read(buffer);
// cancel防止客户端断开连接后死循环输出
if(read<0)selectionKey.cancel();
System.out.println("from client:"+new String(buffer.array()));
}
// 手动从集合中移除当前selectionKey,防止重复操作
iterator.remove();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NIOClient {
public static void main(String[] args) throws IOException {
// 1. 得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
// 2. 设置非阻塞模式
socketChannel.configureBlocking(false);
// 3. 提供服务器IP和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1",6666);
// 4. 连接服务器
if(!socketChannel.connect(inetSocketAddress)){
while(!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作...");
}
}
// 如果连接成功,发送数据
String str = "hello word";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
// 发送数据,将Buffer数据写入Channel
socketChannel.write(byteBuffer);

System.in.read();
}
}

SelectionKey

  1. SelectionKey,表示Selector和网络通信的注册关系,共四种:

    • int OP_ACCEPT:有新的网络连接可以accept,值为16

    • int OP_CONNECT:代表连接已经建立,值为8

    • int OP_READ:代表读操作,值为1

    • int OP_WRITE:代表写操作,值为4

  2. SelectionKey相关方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 得到与之关联的Selector对象
    public abstract Selector selector();
    // 得到与之关系的通道
    public abstract SelectableChannel channel();
    // 得到与之关联的共享数据
    public final Object attachment();
    // 设置或改变监听事件
    public abstract SelectionKey interestOps(int ops);
    // 是否可以accpet
    public final boolean isAcceptable();
    // 是否可以读
    public final boolean isReadable();
    // 是否可以写
    public final boolean isWriteable();

ServerSocketChannel

  1. ServerSocketChannel在服务器端监听新的客户端Socket连接

  2. 相关方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 得到一个ServerSocketChannel通道
    public static ServerSocketChannel open();
    // 设置服务器端端口号
    public final ServerSocketChannel bind(SocketAddress local);
    // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public final SelectableChannel configureBlocking(boolean block);
    // 接受一个连接,返回代表这个连接的通道对象
    public SocketChannel accept();
    // 注册一个选择器并设置监听事件
    public final SelectionKey register(Selector sel,int ops);

SocketChannel

  1. SocketChannel,网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区

  2. 相关方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 得到一个SocketChannel通道
    public static SocketChannel open();
    // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
    public final SelectableChannel configureBlocking(boolean block);
    // 连接服务器
    public boolean connect(SocketAddress remote);
    // 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
    public boolean finishConnect();
    // 往通道里写数据
    public int write(ByteBuffer src);
    // 从通道里读数据
    public int read(ByteBuffer dst);
    // 注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
    public final SelectionKey register(Selector sel,int ops,Object att);
    // 关闭通道
    public final void close();

案例2-群聊系统

功能要求:

  1. 实现服务器和客户端之间数据简单通讯
  2. 实现多人群聊
  3. 服务器端可以监测用户上线、离线并实现消息转发功能
  4. 客户端可以无阻塞发送消息给其它用户,同时可以接收其它用户发送的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class Server {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 6667;
public Server(){
try {
selector = Selector.open();
listenChannel = ServerSocketChannel.open();
listenChannel.socket().bind(new InetSocketAddress(PORT));
// 设置 非阻塞
listenChannel.configureBlocking(false);
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen(){
try {
while(true){
int count = selector.select();
if(count>0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()){
SocketChannel sc = listenChannel.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress()+"上线");
}
if(key.isReadable()){
readData(key);
}
iterator.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {

}
}

/**
* 读取客户端信息
* @param key
*/
private void readData(SelectionKey key){
// 定义一个SocketChannel
SocketChannel channel = null;
try{
// 取关联channel
channel = (SocketChannel) key.channel();
// 创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if(count > 0){
// 将缓冲区数据转字符串
String msg = new String(buffer.array());
System.out.println("form 客户端:"+msg);
// 向其它客户端转发消息(排除自己)
sendInfoToOtherClients(msg,channel);
}

}catch (IOException e){
try {
System.out.println(channel.getRemoteAddress()+"离线了");
// 取消注册
key.cancel();
// 关闭通道
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
// e.printStackTrace();
}
}

/**
* 转发消息给其它客户端
* @param msg
* @param self
*/
private void sendInfoToOtherClients(String msg,SocketChannel self) throws IOException {
Iterator<SelectionKey> iterator = selector.keys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
Channel target = key.channel();
// 排除self
if(target instanceof SocketChannel && target != self){
SocketChannel dest = (SocketChannel) target;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
dest.write(buffer);
}
}
}
public static void main(String[] args) throws IOException {
Server server = new Server();
server.listen();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Client {
private final String HOST = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public Client() throws IOException {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username +" is ok ...");
}
public void sendInfo(String info){
info = username+":"+info;
ByteBuffer buffer = ByteBuffer.wrap(info.getBytes());
try {
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo(){
try {
int readChannels = selector.select();
if(readChannels>0){
Iterator<SelectionKey> iterator = selector.keys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
if(read>0){
System.out.println(new String(buffer.array()).trim());
}
}
iterator.remove();
}
}else{
// System.out.println("没有可用通道...");

}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
// 启动客户端
Client client = new Client();
// 启动一个线程每隔3秒读取数据
new Thread(()->{
while(true){
client.readInfo();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 发送数据
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String s = scanner.nextLine();
client.sendInfo(s);
}
}
}