Netty-应用实例
实例-群聊系统
需求:
- 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多个群聊
- 服务器端:可以监测用户上线、离线,并实现消息转发功能
- 客户端:通过channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class Server {
private int port;
public Server(int port){
this.port = port;
}
/**
* 处理客户端请求
*/
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器
pipeline.addLast("decoder",new StringDecoder());
// 添加编码器
pipeline.addLast("encoder",new StringEncoder());
// 添加业务处理器
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = b.bind(port).sync();
// 监听关闭事件
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Server(7000).run();
}
}
class GroupChatServerHandler extends SimpleChannelInboundHandler<String>{
// 定义一个channel组,管理所有的channel
// GlobalEventExecutor.INSTANCE:是全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获取到当前channel
Channel channel = ctx.channel();
// 遍历channelGroup,根据不同的情况,回送不同消息
channelGroup.forEach(ch -> {
if(channel != ch){
ch.writeAndFlush(sdf.format(new Date()) + " [客户端]" + channel.remoteAddress() +"发送了消息:" + msg + "\n");
}else{
// 回显自己发送的消息
ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
}
});
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭
ctx.close();
}
/**
* 表示连接建立后,执行
* @param ctx
* @throws Exception
*/
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 将消息推送给客户端
// 该方法会加将channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]"+ctx.channel().remoteAddress()+"加入聊天\n");
// 将当前channel加入到channelGroup
channelGroup.add(ctx.channel());
}
/**
* 表示连接断开后,执行
* @param ctx
* @throws Exception
*/
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 将消息推送给客户端
// 该方法会加将channelGroup中所有的channel遍历,并发送消息,不需要自己遍历
channelGroup.writeAndFlush(sdf.format(new Date()) + " [客户端]"+ctx.channel().remoteAddress()+"离开了\n");
// 不需要手动从channelGroup中移除channel
}
/**
* 表示channel处于活动状态,提示xx上线
* @param ctx
* @throws Exception
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了");
}
/**
* 表示channel处于非活动状态,提示xx离线
* @param ctx
* @throws Exception
*/
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54public class Client {
private final String host;
private final int port;
public Client(String host,int port){
this.host = host;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// 得到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 加入相关handler
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = b.connect(host, port).sync();
// 得到channel
Channel channel = channelFuture.channel();
System.out.println("-----" + channel.remoteAddress() + "-----");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String msg = scanner.nextLine();
// 通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Client("127.0.0.1",7000).run();
}
}
class GroupChatClientHandler extends SimpleChannelInboundHandler<String>{
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
实例-心跳检测机制
需求:
- 编写Netty心跳检测机制案例,当服务器超过3秒没有读时,就提示读空闲
- 当服务器超过5秒没有写操作时,就提示写空闲
- 当服务器超过7秒没有读或写操作时,就提示读写空闲
1 | public class MyServer { |
实例-通过WebSocket编程实现服务器和客户端长连接
需求:
- Http协议是无状态的,浏览器和服务器的请求响应一次,下一次会重新创建连接
- 要求:实现基于webSocket的长连接的全双工的交互
- 改变Http协议多次请求的约束,实现长连接了,服务器可以发送消息给浏览器
- 客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知
1 | public class MyServer { |
1 |
|