0%

Netty

概述

原生NIO存在的问题

  1. NIO的类库和API繁杂,使用麻烦
  2. 需要具备其它的额外技能如Java多线程编程、网络编程等
  3. 开发工作量和难度都非常大,如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等。
  4. JDK NIO存在一些Bug

Netty

  1. Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序
  2. Netty可以帮助我们快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程
  3. Netty是目前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用。知名的Elasticsearch、Dubbo框架内部都采用了Netty

Netty的优点

  1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池
  2. 使用方便:详细记录了Javadoc,用户指南和示例,没有其它依赖项
  3. 高性能、吞吐量更高:延迟更低,减少资源消耗,最小化不必要的内存复制
  4. 安全:完整的SSL/TLC和StartTLS支持
  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的bug可以被及时修复,同时,更多的新功能会被加入。

Netty高性能架构设计

线程模型

目前存在的线程模型有:

  • 传统阻塞I/O服务模型
  • Reactor模式;根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现:
    • 单Reactor单线程
    • 单Reactor多线程
    • 主从Reactor多线程

Netty线程模式主要就是基于主从Reactor多线程模型做了一定的改进。

传统阻塞I/O服务模型

采用阻塞I/O模式获取输入的数据,每个连接都需要独立的线程完成数据的输入、业务处理和数据返回

存在的问题:

  1. 当并发数很大时,就会创建大量的线程,占用很多系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程就会被阻塞,导致线程资源浪费

Reactor模式

Reactor模式解决传统阻塞I/O服务模型存在问题的方案:

  1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
  2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

核心组成:

  1. Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
  2. Handlers:处理程序执行I/O事件要完成的实际事件。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。

单Reactor单线程

单Reactor单线程
单Reactor单线程

优点:

  • 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点:

  • 性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其它连接事件,容易导致性能瓶颈
  • 可靠性问题,线程意外终止,或进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

单Reactor多线程

单Reactor多线程
单Reactor多线程
  1. Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
  2. 如果建立连接请求,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理各种事件
  3. 如果不是连接请求,则由Reactor分发调用对应的Handler来处理
  4. Handler只负责响应事件,不做具体业务处理,通过read读取数据后分发给后面的worker线程池的某个线程处理业务
  5. Worker线程池会分配独立线程完成真正的业务并将结果返回给Handler
  6. Handler收到响应后,通过send将结果返回给客户端

优点:

  • 可以充分利用多核CPU的处理能力

缺点:

  • 多线程数据共享和访问比较复杂
  • reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现瓶颈

主从Reactor多线程

主从Reactor多线程
主从Reactor多线程
  1. Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
  2. 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor
  3. SubReactor将连接加入到连接队列进行监听,并创建Handler进行各种事件处理
  4. 当有新事件发生时,SubReactor就会调用对应的Handler处理
  5. Handler通过read读取数据,分发给Worker线程池分配独立的线程进行业务处理并返回结果
  6. Handler收到响应的结果后,再通过send将结果返回给Client
  7. Reactor主线程可以对应多个Reactor子线程,即MainReactor可以关联多个SubReactor

优点:

  • 父线程和子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理

  • 父线程和子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据

缺点:

  • 编程复杂度较高

Netty模型

Netty模型
Netty模型
  1. Netty抽象出两组线程池:BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
  2. BossGroup和WorkerGroup类型都是NioEventLoopGroup
  3. NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环是NioEventLoop
  4. NioEventLoop表示一个不断循环地执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定其上的socket的网络通讯
  5. NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
  6. 每个Boss下的NioEventLoop循环执行的步骤:
    1. 轮询accept事件
    2. 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop上的Selector
    3. 处理任务队列的任务,即runAllTasks
  7. 每个Worker下的NioEventLoop循环执行的步骤:
    1. 轮询read、write事件
    2. 处理I/O事件,即read、write事件,在对应的NioSocketChannel处理
    3. 处理任务队列的任务,即runAllTasks
  8. 每个Worker的NioEventLoop在处理业务时,会使用pipeline,pipeline中包含了channel,即通过pipeline可以获取到对应的channel,pipeline中维护了很多的处理器

简单案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据
*
* @param ctx 上下文对象,包含:pipeline、channel、地址
* @param msg 客户端发送的数据,默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx=" + ctx);
// 将msg转成ByteBuf
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}

/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写入到缓冲区并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", StandardCharsets.UTF_8));
}

/**
* 处理异常,一般需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyServer {
public static void main(String[] args) throws IOException, InterruptedException {
// 创建BossGroup
// BossGroup和WorkerGroup含有的子线程(NioEventLoop)个数默认是CPU核数*2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建WorkerGroup,默认分发机制是轮询,即第9个客户端会再分配给第1个线程
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
// 创建服务端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连接保持活动状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道初始化对象
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyServerHandler());
}
}); // 给WorkerGroup的EventLoop对应的pipeline设置处理器

System.out.println("服务器已准备好");
// 绑定端口并同步,生成一个ChannelFuture对象
// 启动服务器
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully(); // 优雅地关闭
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪时就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", StandardCharsets.UTF_8));
}

/**
* 当通道有读取事件时触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复:" + buf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址:" + ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端只需要一个事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端启动对象
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端已准备好");
// 客户端连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

几个问题:

  1. ChannelHandlerContext、Channel和Pipeline的关系

    Channel和Pipeline的关系是互相包含的关系,Channel中包含了pipeline,pipeline中也包含了channel,而ChannelHandlerContext(上下文)则是同时包含了channel和pipeline)

任务队列(TaskQueue)

三种典型使用场景:

  1. 用户程序自定义的普通任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 如果有一个非常耗时的业务就需要异步执行,提交到该Channel对应的NioEventLoop的taskQueue
    ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(10 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("2->hello, 客户端~\n", StandardCharsets.UTF_8));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    System.out.println("OK");
    }
  2. 用户自定义定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 定时任务,提交到scheduleTaskQueue中
    ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(5 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("10s->hello, 客户端~", StandardCharsets.UTF_8));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    },
    5, // 时长
    TimeUnit.SECONDS); // 时间单位
    System.out.println("OK");
    }
  3. 非当前Reactor线程调用Channel的各种方法

    例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列后被异步消费

异步模型

  1. 异步的概念和同步相对。当一个异步过程调用发出后,不能立刻得到结果。实际处理这个调用的组件 在完成后,通过状态、通知和回调来通知调用者
  2. Netty中的I/O操作是异步的,包含Bind、Write、Connect等操作会简单的返回一个ChannelFuture
  3. 调用者不能立即得到结果,而是通过Future-Listener机制,用户可以方便地主动获取或者通过通知机制获得IO操作结果
  4. Netty异步模型是建立在future和callback之上的。callback就是回调。future的核心思想是:假设一个方法fun,计算过程非常耗时,等待fun返回显然不合适,则可以在调用fun的时候立即返回一个future,后续可以通过future去监控fun方法的处理过程(即:Future-Listener机制)

Future说明:

  1. 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成
  2. ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知到监听器

例:

1
2
3
4
5
6
7
8
9
10
11
12
// 绑定端口并同步,生成一个ChannelFuture对象
// 启动服务器
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 给channelFuture注册监听器,监控事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("监听端口8888成功");
}
}
});

相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

Http服务端案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("myHttpServerCodec", new HttpServerCodec()) // Netty提供的编解码器,处理HTTP的编解码器
.addLast("myHttpServerHandler", new HttpServerHandler()); // 添加自定义的处理器
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类
* HttpObject:客户端和服务器端相互通讯的数据被封装成HttpObject
*/
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
// 判断msg是否是一个HttpRequest请求
if(msg instanceof HttpRequest){
System.out.println("msg 类型=" + msg.getClass());
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
HttpRequest request = (HttpRequest) msg;
URI uri = new URI(request.uri());
// 通过uri过滤指定资源
if("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了favicon.ico,不做响应");
return;
}
// 回复信息给浏览器 [HTTP协议]
ByteBuf content = Unpooled.copiedBuffer("Hello,客户端", StandardCharsets.UTF_8);
// 构造HTTP响应
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 将构建好的response返回
ctx.channel().writeAndFlush(response);
}
}
}

Netty核心模块组件

Bootstrap、ServerBootstrap

一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类

常用方法

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup):用于服务器端,用来设置两个EventLoopGroup(bossGroup和workerGroup)
  • public ServerBootstrap group(EventLoopGroup group):用于客户端,用来设置一个EventLoopGroup
  • public B channel(Class<? extends C> channelClass):用来设置一个服务器端的通道实现
  • public <T> B option(ChannelOption<T> option, T value):用来给ServerChannel添加配置
  • public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value):用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler):用来设置业务处理类(自定义Handler),对应的workerGroup
  • final ChannelHandler handler():该Handler对应bossGroup
  • public ChannelFuture bind(int inetPort):用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort):用户客户端,用来连接服务器端

Future和ChannelFuture

Netty中所有的IO操作都是异步的,不能立刻得到消息是否被正确处理,但可以过一会等它执行完成或直接注册一个监听,具体的实现就是通过Future和ChannelFuture,它们可以注册一个监听,当操作执行成功或失败时监听就会自动触发注册的监听事件

常用方法

  • Channel channel():返回当前正在进行I/O操作的通道
  • ChannelFuture sync():等待异步操作执行完毕(将异步变同步)

Channel

  1. Netty网络通信的组件,能够用于执行网络I/O操作
  2. 通过Channel可获得当前网络连接的通道的状态
  3. 通过Channel可获得网络连接的配置参数(如接收缓冲区大小)
  4. Channel提供异步的网络I/O操作(如建立连接、读写、绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成
  5. 调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以在I/O操作成功、失败或取消时回调通知调用方
  6. 支持关联I/O操作与对应的处理程序
  7. 不同协议、不同的阻塞类型的连接都有不同的Channel类型与之对应,常用的Channel类型:
    • NioSocketChannel:异步的客户端TCP Socket连接
    • NioServerSocketChannel:异步的服务器端TCP Socket连接
    • NioDatagramChannel:异步的UDP连接
    • NioSctpChannel:异步的客户端Sctp连接
    • NioSctpServerChannel:异步的Sctp服务器端连接

Selector

  1. Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的Channel事件
  2. 当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select)这些注册的Channel是否有已就绪的I/O事件,这样程序就可以简单地使用一个线程高效管理多个Channel

ChannelHandler及其实现类

  1. ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序
  2. ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间可以继承它的子类
ChannelHandler
ChannelHandler
  • ChannelInboundHandler用于处理入站I/O事件
  • ChannelOutboundHandler用于处理出站I/O操作

可能需要重写的方法

  • public void channelActive(ChannelHandlerContext ctx):通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg):通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx):通道数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause):通道发生异常事件

Pipeline和ChannelPipeline

  1. ChannelPipeline是一个Handler的集合,负责处理和拦截inbound或者outbound的事件和操作,相当于一个贯穿Netty的链。(也可以理解成ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作)
  2. ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个ChannelHandler如何相互交互
  3. 在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应
Pipeline和ChannelPipeline
Pipeline和ChannelPipeline
  • 一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联着一个ChannelHandler
  • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站handler,两种类型的handler互不干扰

常用方法

  • ChannelPipeline addLast(ChannelHandler... handlers):将一个业务处理类(handler)添加到最后一个位置
  • ChannelPipeline addFirst(ChannelHandler... handlers):将一个业务处理类(handler)添加到第一个位置

ChannelHandlerContext

  1. 保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象
  2. ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和channel的信息,方便对ChannelHandler进行调用

常用方法

  • ChannelFuture close():关闭通道
  • ChannelOutboundInvoker flush();:刷新
  • ChannelFuture writeAndFlush(Object msg):将数据写到ChannelPipeline当前ChannelHandler的下一个ChannelHandler处理

ChannelOption

  1. Netty创建Channel实例后,一般都需要设置ChannelOption参数,如:
    • ChannelOption.SO_BACKLOG:对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。
    • ChannelOption.SO_KEEPALIVE:一直保持连接活动状态

EventLoopGroup和其实现类NioEventLoopGroup

  1. EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例
  2. EventLoopGroup提供next接口,可以从组里面按照一定规则获取其中一个EventLoop来处理任务。在Netty服务器编程中,一般都需要提供两个EventLoopGroup
  3. 通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理
    • BossEventLoopGroup通常是一个单线程的EventLoop,EventLoop维护着一个注册了ServerSocketChannel的Selector实例,BossEventLoop不断轮询Selector将连接事件分离出来。
    • 通常是OP_ACCEPT事件,然后将接收到的SocketChannel交给WorkerEventLoopGroup
    • WorkerEventLoopGroup会由next选择其中一个EventLoop来将这个SocketChannel注册到其维护了Selector并对其后续的I/O事件进行处理。

常用方法

  • public NioEventLoopGroup(int nThreads):构造方法
  • public Future<?> shutdownGracefully():断开连接,关闭线程

Unpooled类

  1. Netty提供的一个专门用来操作缓冲区(即Netty的数据容器)的工具类
  2. ByteBuf中,不需要使用flip进行反转,底层维护了readIndex和writeIndex,通过readIndex、writeIndex和capacity将buffer分成三个区域

常用方法

  • public static ByteBuf copiedBuffer(CharSequence string, Charset charset):通过给定的数据和字符编码返回一个ByteBuf对象(类似于NIO中的ByteBuffer,但有区别)

群聊案例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class GroupChatServer {
private int port;

public GroupChatServer(int port) {
this.port = port;
}

public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder()) // 向pipeline加解码器
.addLast("encoder", new StringEncoder()) // 加编码器
.addLast(new GroupChatServerHandler());
}
});
System.out.println("服务器启动");
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

public static void main(String[] args) throws InterruptedException {
new GroupChatServer(9999).run();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// 定义Channel组,管理所有的Channel
// GlobalEventExecutor.INSTANCE:全局事件执行器
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
* 表示连接建立,此时将当前channel加入到channelGroup
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将消息推送给所有客户端
channelGroup.writeAndFlush(LocalDateTime.now().format(formatter) + ":" + "[客户端]" + channel.remoteAddress() + "加入聊天!\r\n"); // 该方法会将channelGroup中所有channel遍历
channelGroup.add(channel);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(LocalDateTime.now().format(formatter) + ":" + "[客户端]" + channel.remoteAddress() + "离开了!\r\n");
}

/**
* 表示channel处于活动状态
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalDateTime.now().format(formatter) + ":" + ctx.channel().remoteAddress() + "已经上线~");
}

/**
* 表示channel处于非活动状态
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(LocalDateTime.now().format(formatter) + ":" + ctx.channel().remoteAddress() + "已经离线~");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if (ch != channel) {
ch.writeAndFlush(LocalDateTime.now().format(formatter) + ":" + "[客户]" + channel.remoteAddress() + ":" + msg + "\r\n");
} else {
ch.writeAndFlush(LocalDateTime.now().format(formatter) + ":" + "[自己]" + channel.remoteAddress() + ":" + msg + "\r\n");
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
// cause.printStackTrace();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class GroupChatClient {
private final String host;
private final int port;

public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}

public void run() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder())
.addLast("encoder", new StringEncoder())
.addLast(new GroupChatClientHandler());
}
});

ChannelFuture future = bootstrap.connect(host, port).sync();
Channel channel = future.channel();
System.out.println("-------" + channel.localAddress() + "-------");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1", 9999).run();
}
}
1
2
3
4
5
6
7
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}

心跳检测机制

心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。

案例

服务器通过IdleStateHandler根据读写情况定时发送心跳检测包来判断客户端是否存活

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 在bossGroup增加日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// IdleStateHandler是Netty提供的处理空闲状态的处理器
// 参数: readerIdleTime: 多久没有读就会发送一个心跳检测包检测是否连接
// writerIdleTime: 多久没有写就会发送一个心跳检测包检测是否连接
// alldleTime: 多久没有读写就会发送一个心跳检测包检测是否连接
// TimeUnit: 时间单位
// 当IdleStateEvent触发后,就会传递给管道的下一个handler去处理,通过调用(触发)下一个handler的userEventTriggered
// 在该方法中处理IdleStateEvent(读空闲、写空闲或读写空闲)
ch.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS))
.addLast(new ChannelInboundHandlerAdapter() { // 加一个空闲检测进一步处理handler
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String eventType = null;
switch (idleStateEvent.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
default:
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时事件:" + eventType);
// 服务器做相应处理
}
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

WebSocket长连接

  • HTTP协议是无状态的,浏览器和服务器间请求响应一次,下次会重新创建连接

  • WebSocket是一种在单个TCP连接上进行全双工通信的协议,实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯的目的

  • WebSocket可以改变Http协议多次请求的约束,实现长连接,使得服务器可以发送消息给浏览器

  • 客户端和浏览器也可以相互感知

案例

需求:

  1. 实现基于WebSocket的长连接的全双工交互
  2. 浏览器可以知道服务器关闭,服务器也能知道浏览器断开连接
  3. 浏览器可以向服务器发送消息,服务器也能向浏览器发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec(),
new ChunkedWriteHandler(),// 以块方式写,添加ChunkedWriteHandler
// HTTP数据在传输过程中是分段的,HttpObjectAggregator就是可以将多个段聚合
new HttpObjectAggregator(8192),
// 对于WebSocket,数据是以帧的形式传递
// 可以看到WebSocketFrame下有6个子类
// 浏览器请求形式: ws://localhost:9999/xxx 表示请求uri
// WebSocketServerProtocolHandler:核心功能是将HTTP升级为ws协议,保持长连接
// 通过状态码101切换的
new WebSocketServerProtocolHandler("/xxx"),
// TextWebSocketFrame是WebSocketFrame的子类,表示一个文本帧
new SimpleChannelInboundHandler<TextWebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息" + msg.text());
// 回复浏览器
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " 消息:" + msg.text()));
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id:表示唯一的值,LongText是唯一的,ShortText不是唯一
System.out.println("已连接:" + ctx.channel().id().asLongText());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("已断开:" + ctx.channel().id().asLongText());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生:" + cause.getMessage());
ctx.close();
}
}
);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
let socket;
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:9999/xxx")
// 相当于channelRead0, ev:服务器端回送的消息
socket.onmessage = function (ev) {
let rt = document.getElementById("responseText");
rt.value = rt.value + '\n' + ev.data;
}
// 连接开启
socket.onopen = function (ev) {
let rt = document.getElementById("responseText");
rt.value = rt.value + "连接已开启...";
}
// 连接关闭
socket.onclose = function (ev) {
let rt = document.getElementById("responseText");
rt.value = rt.value + "连接已关闭...";
}

// 发送消息到服务器
function send(msg) {
if (window.WebSocket) {
if(socket.readyState === WebSocket.OPEN){
socket.send(msg);
}else{
alert("连接未开启")
}
}
}
} else {
alert("浏览器不支持WebSocket")
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发送" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

Google Protobuf

Netty的编码解码机制

  • Netty本身提供了一些codec(编解码器),如:
    • 编码器:
      • StringEncoder:对字符串数据进行编码
      • ObjectEncoder:对Java对象进行编码
      • ...
    • 解码器
      • StringDecoder:对字符串进行解码
      • ObjectDecoder:对Java对象进行解码
      • ...
  • Netty本身自带的ObjectDecoder和ObjectEncoder可以用来实现POJO对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:
    • 无法跨语言
    • 序列化后体积太大,是二进制编码的5倍多
    • 序列化性能太低

Protobuf

  • Protobuf是Google发布的开源项目,全称是Google Protocal Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。很适合做数据存储或RPC数据交换格式。目前很多公司就在从http+json向tcp+protobuf转型
  • Protobuf是以message的方式来管理数据的
  • 支持跨平台、跨语言
  • 高性能、高可靠性
  • 使用protobuf编译器能自动生成代码,Protobuf是将类的定义使用.proto文件进行描述,然后通过protoc.ext编译器根据.proto自动生成.java文件

案例

需求:

  1. 客户端随机发送Student对象或Worker对象给服务器(通过Protobuf编码)
  2. 服务器接收对象后显示对应的信息(通过Protobuf解码)

流程:

  1. 引入依赖

    1
    2
    3
    4
    5
    6
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.21.7</version>
    </dependency>
  2. 编写proto文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    syntax = "proto3"; //协议版本
    option optimize_for = SPEED; // 加快解析
    option java_package = "com.zephon.netty_learn.protobuf"; // 指定包
    option java_outer_classname = "MyDataInfo"; //外部类名

    // protobuf可以使用message管理其它message
    message MyMessage{
    // 定义一个枚举类型
    enum DataType{
    StudentType=0; // 在proto3 要求enum的编号从0开始
    WorkerType=1;
    }
    DataType data_type=1; // 用data_type来标识传的是哪个枚举类型
    // 表示每次枚举类型最多只能出现其中的一个
    oneof dataBody{
    Student student=2;
    Worker worker=3;
    }
    }

    // protobuf使用message管理数据
    message Student{ // 会在StudentPOJO外部类生成一个内部类Student,作为真正的发送对象
    int32 id = 1; // int32(protobuf类型) -> int(java类型), 1表示属性序号,不是值
    string name = 2; // string(protobuf类型) -> String(java类型)
    }

    message Worker{
    string name=1;
    int32 age=2;
    }

  3. 编译proto文件,通过maven配置插件,然后使用maven编译

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    <build>
    <extensions>
    <extension>
    <groupId>kr.motd.maven</groupId>
    <artifactId>os-maven-plugin</artifactId>
    <version>1.4.1.Final</version>
    </extension>
    </extensions>
    <plugins>
    <!-- Protobuf插件 -->
    <plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.5.1</version>
    <configuration>
    <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
    <protocArtifact>
    com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}
    </protocArtifact>
    </configuration>
    <executions>
    <execution>
    <goals>
    <goal>compile</goal>
    </goals>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>
  4. 编写服务器端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    public class ProServer {
    public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(8);

    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
    new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()),
    new SimpleChannelInboundHandler<MyDataInfo.MyMessage>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
    MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
    if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
    MyDataInfo.Student student = msg.getStudent();
    System.out.println("客户端发送Student: id=" + student.getId() + ", name=" + student.getName());
    } else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType){
    MyDataInfo.Worker worker = msg.getWorker();
    System.out.println("客户端发送Worker: name=" + worker.getName() + ", age=" + worker.getAge());
    } else{
    System.out.println("传输类型不正确");
    }
    }
    });
    }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
    channelFuture.channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    }
    }
  5. 编写客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public class ProClient {
    public static void main(String[] args) throws InterruptedException {
    // 客户端只需要一个事件循环组
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
    // 客户端启动对象
    Bootstrap bootstrap = new Bootstrap();

    bootstrap.group(eventLoopGroup)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(
    new ProtobufEncoder(),
    new ChannelInboundHandlerAdapter() {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 随机发送Student或Worker对象
    int r = new Random().nextInt(3);
    MyDataInfo.MyMessage myMessage = null;
    if (0 == r) {
    myMessage = MyDataInfo.MyMessage.newBuilder()
    .setDataType(MyDataInfo.MyMessage.DataType.StudentType)
    .setStudent(MyDataInfo.Student.newBuilder().setId(3).setName("张三").build()).build();
    } else {
    myMessage = MyDataInfo.MyMessage.newBuilder()
    .setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
    .setWorker(MyDataInfo.Worker.newBuilder().setName("李四").setAge(20).build()).build();
    }
    System.out.println(myMessage);
    ctx.writeAndFlush(myMessage);
    }
    });
    }
    });
    System.out.println("客户端已准备好");
    // 客户端连接服务器端
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
    channelFuture.channel().closeFuture().sync();
    } finally {
    eventLoopGroup.shutdownGracefully();
    }
    }
    }

Netty编解码器和Handler的调用机制

入站与出站

  • Netty的主要组件有:Channel、EventLoop、ChannelFuture、ChannelHandler和ChannelPipe等
  • ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。如:实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的
  • ChannelPipeline提供了ChannelHandler链的容器,以客户端应用程序为例,如果事件运动方向是从客户端到服务端,则是出站,反之为入站

编码与解码

  • 当Netty发送或接收一个消息时,就会发生一次数据转换,入站消息会被解码:从字节转换为另一种格式(如Java对象),如果是出站消息,就会被编码成字节
  • Netty提供的一系列实用的编解码器,都实现了ChannelInboundHandlerChannelOutboudHandler接口。在这些类中,channelRead方法已经被重写了,以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用,随后将调用由解码器所提供的decode()方法进行解码,并将解码的字节转发给ChannelPipeline的下一个ChannelInboundHandler

自定义编解码器案例

  1. 客户端发送long类型数据到服务器
  2. 服务器接收long类型数据并显示

客户端与编码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Client {
public static void main(String[] args) throws InterruptedException {
// 客户端只需要一个事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端启动对象
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new MessageToByteEncoder<Long>() {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Long aLong, ByteBuf byteBuf) throws Exception {
System.out.println("encode被调用");
System.out.println("msg=" + aLong);
byteBuf.writeLong(aLong);
}
},
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123456L);
}
});
}
});
System.out.println("客户端已准备好");
// 客户端连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

}

服务端与解码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 需要判断有8个字节才能读取一个long
while (byteBuf.readableBytes() >= 8) {
list.add(byteBuf.readLong());
}
}
},
new SimpleChannelInboundHandler<Long>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端读取long:" + msg);
}
}
);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

Netty其它常用编解码器

  • public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder:ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,不需要调用readableBytes()方法,参数S指定了用户状态管理的类型,其中Void代表不需要状态管理
    • 存在局限性:
      • 并不是所有的ByteBuf操作都支持,如果调用了不被支持的方法,就会抛出异常
      • 在某些情况下可能稍慢于ByteToMessageDecoder,如网络缓慢并且消息格式复杂时,消息会被拆成多个碎片,速度变慢
  • public class LineBasedFrameDecoder extends ByteToMessageDecoder:这个类在Netty内部也有使用,使用行尾控制字符(\r)作为分割符来解析数据
  • public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder:使用自定义的特殊字符作为消息的分隔符
  • public abstract class HttpObjectDecoder extends ByteToMessageDecoder:一个HTTP数据的解码器
  • public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder:通过指定长度来标识整包消息,这样就可以自动处理粘包和半包消息
  • ...

TCP粘包和拆包及解决方案

  • TCP是面向连接、面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的Socket,因此,发送端为了将多个发送给接收端的包更有效的发送,使用了优化算法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样虽然提高了效率,但接收端就很难分辨出完整的数据包,因为面向流的通信是无消息保护边界的
  • 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是所谓的粘包、拆包问题
  • 具体地,假设客户端发送了两个数据包D1和D2,则:
    • 如果服务端分两次读取两个独立的数据包,分别是D1和D2,则没有粘包和拆包
    • 如果服务端一次接收到了两个数据包,D1和D2粘合在一起,则称为TCP粘包
    • 如果服务端分两次读取到数据包,第一次读取到了完成的D1和部分D2,第二次读取到了D2包的剩余内容,则称为TCP拆包
    • 如果服务端分两次读取到了数据包,第一次读取到了D1包的部分内容,第干净读取到了D1的剩余部分同窗和完成的D2包

TCP粘包和拆包案例

服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Server {
public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SimpleChannelInboundHandler<ByteBuf>() {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
// 将buffer转成字符串
String s = new String(buffer, StandardCharsets.UTF_8);
System.out.println("服务器端接收到数据:" + s);
System.out.println("服务器端接收到消息量:" + ++count);

// 客户端回复
ctx.writeAndFlush(Unpooled.copiedBuffer(UUID.randomUUID().toString(), StandardCharsets.UTF_8));
}
}
);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Client {
public static void main(String[] args) throws InterruptedException {
// 客户端只需要一个事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端启动对象
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(
new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 使用客户端发送10条数据
for(int i=0;i<10;i++){
ByteBuf buffer = Unpooled.copiedBuffer(" hello ,server" + i, StandardCharsets.UTF_8);
ctx.writeAndFlush(buffer);
}
}

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
}
});
}
});
System.out.println("客户端已准备好");
// 客户端连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

}

输出:

服务器端接收到数据: hello ,server0 服务器端接收到消息量:1 服务器端接收到数据: hello ,server1 服务器端接收到消息量:2 服务器端接收到数据: hello ,server2 hello ,server3 服务器端接收到消息量:3 服务器端接收到数据: hello ,server4 hello ,server5 hello ,server6 服务器端接收到消息量:4 服务器端接收到数据: hello ,server7 hello ,server8 hello ,server9 服务器端接收到消息量:5

解决

  1. 使用自定义协议+编解码器来解决
  2. 关键就是要解决服务器端每次读取数据长度的问题

自定义协议包:

1
2
3
4
5
6
@Data
@AllArgsConstructer
public class MessageProtocol {
private int len;
private byte[] content;
}

自定义编码器:

1
2
3
4
5
6
7
8
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
System.out.println("调用encode方法");
byteBuf.writeInt(messageProtocol.getLen());
byteBuf.writeBytes(messageProtocol.getContent());
}
}

自定义解码器:

1
2
3
4
5
6
7
8
9
10
11
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
System.out.println("调用decode");
int len = byteBuf.readInt();
byte[] content = new byte[len];
byteBuf.readBytes(content);
MessageProtocol messageProtocol = new MessageProtocol(len, content);
list.add(messageProtocol);
}
}

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

public class Server {
public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new MyMessageEncoder(),
new MyMessageDecoder(),
new SimpleChannelInboundHandler<MessageProtocol>() {
private int count;

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("服务端接收到信息:");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("服务器接收到消息次数: " + ++count);
// 回复
byte[] response = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
ctx.writeAndFlush(new MessageProtocol(response.length, response));
}
}
);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Client {
public static void main(String[] args) throws InterruptedException {
// 客户端只需要一个事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端启动对象
Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(
new MyMessageEncoder(),
new MyMessageDecoder(),
new SimpleChannelInboundHandler<MessageProtocol>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 使用客户端发送10条数据
for (int i = 0; i < 5; i++) {
byte[] content = "醉卧沙场君莫笑,古来征战几人回".getBytes(StandardCharsets.UTF_8);
MessageProtocol messageProtocol = new MessageProtocol(content.length, content);
ctx.writeAndFlush(messageProtocol);
}
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("客户端收到:");
System.out.println(new String(msg.getContent(), StandardCharsets.UTF_8));
}
});
}
});
System.out.println("客户端已准备好");
// 客户端连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

}

输出:

调用decode 服务端接收到信息: 长度=45 内容=醉卧沙场君莫笑,古来征战几人回 服务器接收到消息次数: 1 调用encode方法 调用decode 服务端接收到信息: 长度=45 内容=醉卧沙场君莫笑,古来征战几人回 服务器接收到消息次数: 2 调用encode方法 调用decode 服务端接收到信息: 长度=45 内容=醉卧沙场君莫笑,古来征战几人回 服务器接收到消息次数: 3 调用encode方法 调用decode 服务端接收到信息: 长度=45 内容=醉卧沙场君莫笑,古来征战几人回 服务器接收到消息次数: 4 调用encode方法 调用decode 服务端接收到信息: 长度=45 内容=醉卧沙场君莫笑,古来征战几人回 服务器接收到消息次数: 5 调用encode方法