0%

Netty-应用实例

Netty-应用实例

实例-群聊系统

需求:

  1. 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 实现多个群聊
  3. 服务器端:可以监测用户上线、离线,并实现消息转发功能
  4. 客户端:通过channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

public class Server {
private int port;
public Server(int port){
this.port = port;
}

/**
* 处理客户端请求
*/
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器
pipeline.addLast("decoder",new StringDecoder());
// 添加编码器
pipeline.addLast("encoder",new StringEncoder());
// 添加业务处理器
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = b.bind(port).sync();
// 监听关闭事件
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
public static void main(String[] args) throws InterruptedException {
new Server(7000).run();
}
}
class GroupChatServerHandler extends SimpleChannelInboundHandler<String>{
// 定义一个channel组,管理所有的channel
// GlobalEventExecutor.INSTANCE:是全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获取到当前channel
Channel channel = ctx.channel();
// 遍历channelGroup,根据不同的情况,回送不同消息
channelGroup.forEach(ch -> {
if(channel != ch){
ch.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() +"发送了消息:" + msg + "\n");
}else{
// 回显自己发送的消息
ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭
ctx.close();
}

/**
* 表示连接建立后,执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 将消息推送给客户端
// 该方法会加将channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]"+ctx.channel().remoteAddress()+"加入聊天\n");
// 将当前channel加入到channelGroup
channelGroup.add(ctx.channel());
}

/**
* 表示连接断开后,执行
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 将消息推送给客户端
// 该方法会加将channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]"+ctx.channel().remoteAddress()+"离开了\n");
// 不需要手动从channelGroup中移除channel
}

/**
* 表示channel处于活动状态,提示xx上线
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了");
}

/**
* 表示channel处于非活动状态,提示xx离线
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了");
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class Client {
private final String host;
private final int port;

public Client(String host,int port){
this.host = host;
this.port = port;
}

public void run() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 得到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 加入相关handler
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = b.connect(host, port).sync();
// 得到channel
Channel channel = channelFuture.channel();
System.out.println("-----" + channel.remoteAddress() + "-----");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String msg = scanner.nextLine();
// 通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}


public static void main(String[] args) throws InterruptedException {
new Client("127.0.0.1",7000).run();
}
}
class GroupChatClientHandler extends SimpleChannelInboundHandler<String>{

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}

实例-心跳检测机制

需求:

  1. 编写Netty心跳检测机制案例,当服务器超过3秒没有读时,就提示读空闲
  2. 当服务器超过5秒没有写操作时,就提示写空闲
  3. 当服务器超过7秒没有读或写操作时,就提示读写空闲
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入一个netty提供的IdleStateHandler
// 说明:
// 1:IdleStateHandler 是netty提供的处理空闲状态的处理器
// 2. IdleStateHandler(
// long readerIdleTime, long writerIdleTime, long allIdleTime,
// TimeUnit unit)
// readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包,检测还是连接状态
// writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包,检测是否连接
// allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包,检测是否连接
// 3. 文档说明:当channel没有执行read、write或read和write,就会触发
// 4. 当IdleStateEvent 触发后,就会传递给pipeline的下一个handler处理
// 通过调用(触发)下一个handler的userEventTriggered,在该方法中去处理IdleStateEvent(读空闲、写空闲、读写空闲)
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));

// 加入一个对空闲检测进一步处理的自定义handler
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

class MyServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "-- 超时事件 --"+eventType);

}
}
}

实例-通过WebSocket编程实现服务器和客户端长连接

需求:

  1. Http协议是无状态的,浏览器和服务器的请求响应一次,下一次会重新创建连接
  2. 要求:实现基于webSocket的长连接的全双工的交互
  3. 改变Http协议多次请求的约束,实现长连接了,服务器可以发送消息给浏览器
  4. 客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 基于HTTP协议,使用HTTP编码解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加chunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段,HttpObjectAggregator 就是可以将多个段聚合
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/**
* 说明:
* 1. 对于websocket,数据是以帧的形式传递的
* 2. 可以看到WebSocketFrame下面有6个子类
* 3. 浏览器请求时,形式:ws://localhost:8888/xxx 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是将http协议升级为ws协议,保持长连接
* 5. 协议切换是通过101状态码实现
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义handler
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = b.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}
}

/**
* TextWebSocketFrame:表示一个文本帧
*/
class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息:"+msg.text());
// 回复浏览器
ctx.writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + msg.text()));
}

/**
* 当web客户端连接后,触发方法
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id:表示唯一的值,有LongText(唯一)和ShortText(不唯一)两种形式
System.out.println("handlerAdded被调用 " + ctx.channel().id().asLongText());
System.out.println("handlerAdded被调用 " + ctx.channel().id().asShortText());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用 " + ctx.channel().id().asLongText());
System.out.println("handlerRemoved被调用 " + ctx.channel().id().asShortText());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 "+cause.getMessage());
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持websocket
if ( window.WebSocket){
socket = new WebSocket("ws://localhost:8888/hello")
// ev 收到服务器端回送的消息
socket.onmessage = ev => {
let resText = document.getElementById('resText');
resText.value = resText.value + '\n' + ev.data
}
// 连接开启(感知连接开启)
socket.onopen = ev => {
let resText = document.getElementById('resText');
resText.value = "连接开启.."
}
socket.onclose = ev => {
let resText = document.getElementById('resText');
resText.value = resText.value + "\n连接关闭.."
}
}else{
alert("当前浏览器不支持websocket")
}
// 发送消息到服务器
function send(message) {
if(!window.socket){
return;
}
if(socket.readyState == WebSocket.OPEN){
// 通过socket发送消息
socket.send(message);
}else{
alert("连接未开启")
}
}
</script>
<form onsubmit="return false">
<textarea id="message" style="height: 300px;width: 300px"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="resText" style="height: 300px;width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('resText').value = ''">
</form>
</body>
</html>