Netty-异步模型
基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture
- 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或通过通知机制获得IO操作结果
- Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)
Future说明
- 表示异步的执行结果,可以通过它提供的方法来检查执行是否完成,如检索计算等
- ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知监听器
工作原理示意图

说明:
- 在使用Netty进行编程时,拦截操作和转换出入站数据只需要提供callback或利用future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码
- Netty框架的目标就是让业务逻辑从网络基础应用编码中分离出来、解脱出来
Future-Listener机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作
常见有如下操作
- 通过isDone方法来判断当前操作是否完成
- 通过isSuccess方法来判断已完成的当前操作是否成功
- 通过getCause方法来获取已完成的当前操作失败的原因
- 通过isCancelled方法来判断已完成的当前操作是否被取消
- 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器
举例
1 2 3 4 5 6 7 8 9 10 11 12
| channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ System.out.println("监听端口 6688 成功"); }else{ System.out.println("监听商品 6688 失败"); } } });
|
小结:
相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量
HTTP服务实例
需求:
- 监听6668端口
- 浏览器发出请求"http://localhost:6668",服务器可以回复消息给客户端"hello,我是服务器",并对特定请求资源进行过滤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| package com.zephon.netty.netty.http;
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil;
import java.net.URI;
class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("MyHttpServerCodec", new HttpServerCodec()); pipeline.addLast(new TestHttpServerHandler()); } }
class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { System.out.println("msg 类型:" + msg.getClass()); System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());
HttpRequest request = (HttpRequest) msg; URI uri = new URI(request.uri()); if("/favicon.ico".equals(uri.getPath())){ System.out.println("请求了favicon.ico,不做响应"); return; }
ByteBuf buf = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8); DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH,buf.readableBytes()); channelHandlerContext.writeAndFlush(response);
} } }
public class TestServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new TestServerInitializer()); ChannelFuture channelFuture = b.bind(8088).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|