Netty-Netty模型
工作原理示意图-简单版
- BossGroup线程维护Selector,只关注accept
- 当接收到accept事件后,获取到对应的SocketChannel,封装成NIOSocketChannel并注册到Worker线程(事件循环),并进行维护
- 当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(由handler),handler已经加入到通道
工作原理示意图-进阶版
工作原理示意图-详细版
- Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接;WorkerGroup专门负责网络的读写
- BossGroup和WorkerGroup类型都是NioEventLoopGroup
- NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop
- NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通信
- NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
- 每个Boss NioEventLoop循环执行的步骤有3步
- 1)轮询accept事件
- 2)处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NIOEventLoop上的selector
- 3)处理任务队列的任务,即runAllTasks
- 每个Worker NioEventLoop循环执行的步骤
- 1)轮询read、write事件
- 2)处理IO事件,即read、write事件,在对应的NioSocketChannel处理
- 处理任务队列的任务,即runAllTasks
- 每个Worker NIOEventLoop处理业务时,会使用pipline(管道),pipeline中包含了channel,即通过pipeline,可以获取到对应通道, 管道中维护了很多的处理器
案例-TCP服务
要求:
- Netty服务器在6668端口监听,客户端能发送消息给服务器"hello server"
- 服务器可以回复消息给客户端"hello client"
服务器:
1 | package com.zephon.netty.netty.chatting; |
客户端:
1 | package com.zephon.netty.netty.chatting; |
任务队列中的Task有3种典型使用场景
问题:
1
2
3
4
5
6
7 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
// 默认会阻塞,等10秒后继续执行
TimeUnit.SECONDS.sleep(10);
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
System.out.println("go on...");
}
用户程序自定义的普通任务
对应代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
// 默认会阻塞,等10秒后继续执行
// TimeUnit.SECONDS.sleep(10);
// ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
// 解决方案1:用户程序自定义的普通任务
ctx.channel().eventLoop().execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务1", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.channel().eventLoop().execute(() -> {
try {
// 与初始相比间隔30秒
TimeUnit.SECONDS.sleep(20);
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务2", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("go on...");
}用户自定义定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
// 默认会阻塞,等10秒后继续执行
// TimeUnit.SECONDS.sleep(10);
// ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
// 用户自定义定时任务 -> 该任务提交到scheduleTaskQueue中
ctx.channel().eventLoop().schedule(()->{
try {
TimeUnit.SECONDS.sleep(5);
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
},5,TimeUnit.SECONDS);
System.out.println("go on...");
}非当前Reactor线程调用Channel的各种方法
如:在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费
本质说明
- Netty抽象出两组线程池,BossGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作
- NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用户监听绑定在其上的socket网络通信
- NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
- NioEventLoopGroup下包含多个NioEventLoop
- 每个NioEventLoop中包含有一个Selector,一个taskQueue
- 每个NioEventLoop的Selector上可以注册监听多个NioChannel
- 每个NioChannel只会绑定在唯一的NioEventLoop上
- 每个NioChannel都绑定有一个自己的ChannelPipeline