0%

Netty-异步模型

Netty-异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture
  3. 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或通过通知机制获得IO操作结果
  4. Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)

Future说明

  1. 表示异步的执行结果,可以通过它提供的方法来检查执行是否完成,如检索计算等
  2. ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知监听器

工作原理示意图

说明:

  1. 在使用Netty进行编程时,拦截操作和转换出入站数据只需要提供callback或利用future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码
  2. Netty框架的目标就是让业务逻辑从网络基础应用编码中分离出来、解脱出来

Future-Listener机制

  1. 当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作

  2. 常见有如下操作

    • 通过isDone方法来判断当前操作是否完成
    • 通过isSuccess方法来判断已完成的当前操作是否成功
    • 通过getCause方法来获取已完成的当前操作失败的原因
    • 通过isCancelled方法来判断已完成的当前操作是否被取消
    • 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器
  3. 举例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 给channelFuture注册监听器,监控关心的事件
    channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
    if(channelFuture.isSuccess()){
    System.out.println("监听端口 6688 成功");
    }else{
    System.out.println("监听商品 6688 失败");
    }
    }
    });

小结:

相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

HTTP服务实例

需求:

  1. 监听6668端口
  2. 浏览器发出请求"http://localhost:6668",服务器可以回复消息给客户端"hello,我是服务器",并对特定请求资源进行过滤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.zephon.netty.netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

import java.net.URI;

class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器

// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个netty提供的httpServerCodec codec-> coder + decoder
// HttpServerCodec 说明:
// 1.HttpServerCodec 是netty提供的处理http的编-解码器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
// 2.增加一个自定义的处理器
pipeline.addLast(new TestHttpServerHandler());
}
}

/**
* 说明:
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的子类
* 2. HttpObject 客户端和服务器端相互通讯的数据被封装成HttpObject
*/
class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
// 判断msg是否是httprequest请求
if (msg instanceof HttpRequest) {
System.out.println("msg 类型:" + msg.getClass());
System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());

HttpRequest request = (HttpRequest) msg;
// 获取uri,过滤指定资源
URI uri = new URI(request.uri());
if("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了favicon.ico,不做响应");
return;
}

// 回复信息给浏览器(满足http协议)
ByteBuf buf = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8);
// 构造一个Http响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
buf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,buf.readableBytes());
// 将构建好的response返回
channelHandlerContext.writeAndFlush(response);

}
}
}

public class TestServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = b.bind(8088).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}