0%

Netty-线程模型

Netty-Netty模型

工作原理示意图-简单版

  1. BossGroup线程维护Selector,只关注accept
  2. 当接收到accept事件后,获取到对应的SocketChannel,封装成NIOSocketChannel并注册到Worker线程(事件循环),并进行维护
  3. 当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(由handler),handler已经加入到通道

工作原理示意图-进阶版

工作原理示意图-详细版

  1. Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接;WorkerGroup专门负责网络的读写
  2. BossGroup和WorkerGroup类型都是NioEventLoopGroup
  3. NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop
  4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通信
  5. NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
  6. 每个Boss NioEventLoop循环执行的步骤有3步
    • 1)轮询accept事件
    • 2)处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NIOEventLoop上的selector
    • 3)处理任务队列的任务,即runAllTasks
  7. 每个Worker NioEventLoop循环执行的步骤
    • 1)轮询read、write事件
    • 2)处理IO事件,即read、write事件,在对应的NioSocketChannel处理
    • 处理任务队列的任务,即runAllTasks
  8. 每个Worker NIOEventLoop处理业务时,会使用pipline(管道),pipeline中包含了channel,即通过pipeline,可以获取到对应通道, 管道中维护了很多的处理器

案例-TCP服务

要求:

  1. Netty服务器在6668端口监听,客户端能发送消息给服务器"hello server"
  2. 服务器可以回复消息给客户端"hello client"

服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.zephon.netty.netty.chatting;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

public class Server {
public static void main(String[] args) throws InterruptedException {
// 创建BossGroup 和WorkerGroup
// 1. 创建两个线程组,两个都是无限循环
// bossGroup处理连接请求
// bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数,默认是cpu核数×2
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workerGroup真正和客户端业务处理
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
// 创建服务器端的启动对象,配置参数
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true)// 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); // 给workerGroup的EventLoop对应的管道设置处理器
System.out.println("...server is ready...");
// 绑定一个端口并且同步,生成一个ChannelFuture对象
// 启动服务器(并绑定端口)
ChannelFuture channelFuture = b.bind(6668).sync();

// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter{
/**
* 读取数据
* @param ctx 上下文对象,含有管道pipeline、通道、地址
* @param msg 客户端发送的数据,默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx="+ctx);
// 将msg转成ByteBuf(由netty提供)
ByteBuf buf = (ByteBuf) msg;
System.out.println("client send msg:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("client address:"+ctx.channel().remoteAddress());
}

/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写入缓冲区,并刷新
// 一般对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client~",CharsetUtil.UTF_8));
}

/**
* 处理异常,一般需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.zephon.netty.netty.chatting;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class Client {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 创建客户端启动对象
// 客户端使用的不是ServerBootStrap,而是BootStrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors) // 设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加自己的处理器
socketChannel.pipeline().addLast(new NettyClientHandler());
}
}); // 设置处理器
System.out.println("client is ok...");
// 启动客户端去连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter{
/**
* 当通道就绪就会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx:"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
}

/**
* 当通道有读取事件时会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址:"+ctx.channel().remoteAddress());
}

/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

任务队列中的Task有3种典型使用场景

问题:

1
2
3
4
5
6
7
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
// 默认会阻塞,等10秒后继续执行
TimeUnit.SECONDS.sleep(10);
ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
System.out.println("go on...");
}
  1. 用户程序自定义的普通任务

    对应代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
    // 默认会阻塞,等10秒后继续执行
    // TimeUnit.SECONDS.sleep(10);
    // ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));

    // 解决方案1:用户程序自定义的普通任务
    ctx.channel().eventLoop().execute(() -> {
    try {
    TimeUnit.SECONDS.sleep(10);
    ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务1", CharsetUtil.UTF_8));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    ctx.channel().eventLoop().execute(() -> {
    try {
    // 与初始相比间隔30秒
    TimeUnit.SECONDS.sleep(20);
    ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务2", CharsetUtil.UTF_8));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    System.out.println("go on...");
    }
  2. 用户自定义定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如这里有一个非常耗时的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的taskQueue中
    // 默认会阻塞,等10秒后继续执行
    // TimeUnit.SECONDS.sleep(10);
    // ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));

    // 用户自定义定时任务 -> 该任务提交到scheduleTaskQueue中
    ctx.channel().eventLoop().schedule(()->{
    try {
    TimeUnit.SECONDS.sleep(5);
    ctx.writeAndFlush(Unpooled.copiedBuffer("耗时业务", CharsetUtil.UTF_8));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },5,TimeUnit.SECONDS);

    System.out.println("go on...");
    }
  3. 非当前Reactor线程调用Channel的各种方法

    如:在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费

本质说明

  1. Netty抽象出两组线程池,BossGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作
  2. NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用户监听绑定在其上的socket网络通信
  3. NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
    • NioEventLoopGroup下包含多个NioEventLoop
    • 每个NioEventLoop中包含有一个Selector,一个taskQueue
    • 每个NioEventLoop的Selector上可以注册监听多个NioChannel
    • 每个NioChannel只会绑定在唯一的NioEventLoop上
    • 每个NioChannel都绑定有一个自己的ChannelPipeline