Netty-GoogleProtobuf
编码和解码的基本介绍
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
- codec(编解码器)的组成部分有两个:decoder(解码器)和encoder(编码器)。encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据
Netty本身的编码解码的机制和问题分析
- Netty自身提供了一些codec(编解码器)
- Netty提供的编码器
- SringEncoder:对字符串数据进行编码
- ObjectEncoder:对Java对象进行编码
- ...
- Netty提供的解码器
- StringDecoder:对字符串数据进行解码
- ObjectDecoder:对Java对象进行解码
- ...
- Netty本身自带的ObjectDecoder和ObjectEncoder可以用来实现POJO对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:
- 无法跨语言
- 序列化后的体积太大,是二进制编码的5倍多
- 序列化性能太低
解决方案:[Google的Protobuf]
Protobuf
Protobuf基本介绍和使用
Protobuf是Google发布的开源项目,全称Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC(远程过程调用)数据交换格式
目前很多公司 http+json -> tcp+protobuf
参考文档https://developers.google.com/protocol-buffers/docs/proto
Protobuf是以message的方式来管理数据的
支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的](支持目前绝大多数语言,如C++、C#、Java、Python等)
高性能,高可靠性
使用protobuf编译器自动生成代码,Protobuf是将类的定义使用.proto文件进行描述。说明:在idea中编写.proto文件时,会自动提示是否下载.proto编写插件,可以让语法高亮
然后通过protoc.exe编译器根据.proto自动生成.java文件
入门实例1
需求:
- 客户端可以发送一个 Student PoJo对象到服务器(通过Protobuf编码)
- 服务端能接收Student PoJo对象,并显示信息(通过Protobuf解码)
引入protobuf
1
2
3
4
5
6
7<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.13.0</version>
</dependency>编写proto文件
1
2
3
4
5
6
7syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 生成的外部类名,同时也是文件名
// protobuf 使用message管理数据
message Student{ // 会在StudentPOJO外部类生成一个内部类Student,它是真正发送的POJO对象
int32 id = 1; //Student类中有一个属性,名称为id,类型为int32
string name = 2;
}cmd运行
proto.exe --java_out. Student.proto
或Linux下./protoc --java_out=. Student.proto
编写服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加解码器
// 指定对哪种对象进行解码
pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("...server is ready...");
ChannelFuture channelFuture = b.bind(6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取从客户端发送的StudentPOJO.Student
StudentPOJO.Student student = (StudentPOJO.Student) msg;
System.out.println("客户端发送的数据 id="+student.getId() + ",name="+student.getName());
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}编写客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class Client {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加编码器
pipeline.addLast("encoder",new ProtobufEncoder());
// 添加自己的处理器
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter{
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送一个 Student对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("Tom").build();
ctx.writeAndFlush(student);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
入门实例2
需求:
- 客户端可以随机发送Student PoJo/Worker PoJo对象到服务器(通过Protobuf编码)
- 服务端能接收Student PoJo/Workder PoJo对象(需要判断是哪种类型),并显示信息(通过Protobuf解码)
编写proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31syntax = "proto3"; // 版本
option optimize_for = SPEED; // 加快解析
option java_package = "com.zephon.netty.netty.codec2";//指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; // 生成的外部类名,同时也是文件名
// protobuf 可以使用message 管理其它message
message MyMessage{
// 定义一个枚举类型
enum DataType{
StudentType = 0; // 在proto3 要求enum的编号从0开始
WorkerType = 1;
}
// 用data_type来标识传的是哪个枚举类型
DataType data_type = 1;
// 表示每次枚举类型最多只出现其中的一个,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}
}
// protobuf 使用message管理数据
message Student{ // 会在StudentPOJO外部类生成一个内部类Student,它是真正发送的POJO对象
int32 id = 1; //Student类中有一个属性,名称为id,类型为int32
string name = 2;
}
message Worker{
string name = 1;
int32 age = 2;
}编写服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加解码器
// 指定对哪种对象进行解码
pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("...server is ready...");
ChannelFuture channelFuture = b.bind(6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取从客户端发送的StudentPOJO.Student
MyDataInfo.MyMessage message = (MyDataInfo.MyMessage) msg;
MyDataInfo.MyMessage.DataType dataType = message.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.StudentType){
MyDataInfo.Student student = message.getStudent();
System.out.println("student id="+student.getId()+",name="+student.getName());
}else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType){
MyDataInfo.Worker worker = message.getWorker();
System.out.println("worker name="+worker.getName()+",age="+worker.getAge());
}else{
System.out.println("传输类型错误");
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}编写客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Client {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加编码器
pipeline.addLast("encoder", new ProtobufEncoder());
// 添加自己的处理器
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 随机发送Student或worker对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == random) {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(1).setName("Tom").build()).build();
} else {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setName("Jake").setAge(22).build()).build();
}
ctx.writeAndFlush(myMessage);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}