0%

Netty-GoogleProtobuf

Netty-GoogleProtobuf

编码和解码的基本介绍

  1. 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
  2. codec(编解码器)的组成部分有两个:decoder(解码器)和encoder(编码器)。encoder负责把业务数据转换成字节码数据,decoder负责把字节码数据转换成业务数据

Netty本身的编码解码的机制和问题分析

  1. Netty自身提供了一些codec(编解码器)
  2. Netty提供的编码器
    • SringEncoder:对字符串数据进行编码
    • ObjectEncoder:对Java对象进行编码
    • ...
  3. Netty提供的解码器
    • StringDecoder:对字符串数据进行解码
    • ObjectDecoder:对Java对象进行解码
    • ...
  4. Netty本身自带的ObjectDecoder和ObjectEncoder可以用来实现POJO对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:
    1. 无法跨语言
    2. 序列化后的体积太大,是二进制编码的5倍多
    3. 序列化性能太低

解决方案:[Google的Protobuf]

Protobuf

Protobuf基本介绍和使用

  1. Protobuf是Google发布的开源项目,全称Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC(远程过程调用)数据交换格式

    目前很多公司 http+json -> tcp+protobuf

  2. 参考文档https://developers.google.com/protocol-buffers/docs/proto

  3. Protobuf是以message的方式来管理数据的

  4. 支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的](支持目前绝大多数语言,如C++、C#、Java、Python等)

  5. 高性能,高可靠性

  6. 使用protobuf编译器自动生成代码,Protobuf是将类的定义使用.proto文件进行描述。说明:在idea中编写.proto文件时,会自动提示是否下载.proto编写插件,可以让语法高亮

  7. 然后通过protoc.exe编译器根据.proto自动生成.java文件

入门实例1

需求:

  1. 客户端可以发送一个 Student PoJo对象到服务器(通过Protobuf编码)
  2. 服务端能接收Student PoJo对象,并显示信息(通过Protobuf解码)
  1. 引入protobuf

    1
    2
    3
    4
    5
    6
    7
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.13.0</version>
    </dependency>

  2. 编写proto文件

    1
    2
    3
    4
    5
    6
    7
    syntax = "proto3"; // 版本
    option java_outer_classname = "StudentPOJO"; // 生成的外部类名,同时也是文件名
    // protobuf 使用message管理数据
    message Student{ // 会在StudentPOJO外部类生成一个内部类Student,它是真正发送的POJO对象
    int32 id = 1; //Student类中有一个属性,名称为id,类型为int32
    string name = 2;
    }

    cmd运行proto.exe --java_out. Student.proto或Linux下./protoc --java_out=. Student.proto

  3. 编写服务端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    public class Server {
    public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // 添加解码器
    // 指定对哪种对象进行解码
    pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
    pipeline.addLast(new NettyServerHandler());
    }
    });
    System.out.println("...server is ready...");

    ChannelFuture channelFuture = b.bind(6668).sync();

    channelFuture.channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    }
    }

    class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 读取从客户端发送的StudentPOJO.Student
    StudentPOJO.Student student = (StudentPOJO.Student) msg;
    System.out.println("客户端发送的数据 id="+student.getId() + ",name="+student.getName());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.channel().close();
    }
    }
  4. 编写客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class Client {
    public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
    try {
    Bootstrap bootstrap = new Bootstrap();
    // 设置相关参数
    bootstrap.group(eventExecutors)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // 添加编码器
    pipeline.addLast("encoder",new ProtobufEncoder());
    // 添加自己的处理器
    pipeline.addLast(new NettyClientHandler());
    }
    });
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    channelFuture.channel().closeFuture().sync();
    } finally {
    eventExecutors.shutdownGracefully();
    }
    }
    }
    class NettyClientHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 发送一个 Student对象到服务器
    StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("Tom").build();
    ctx.writeAndFlush(student);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
    }
    }

入门实例2

需求:

  1. 客户端可以随机发送Student PoJo/Worker PoJo对象到服务器(通过Protobuf编码)
  2. 服务端能接收Student PoJo/Workder PoJo对象(需要判断是哪种类型),并显示信息(通过Protobuf解码)
  1. 编写proto

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    syntax = "proto3"; // 版本
    option optimize_for = SPEED; // 加快解析
    option java_package = "com.zephon.netty.netty.codec2";//指定生成到哪个包下
    option java_outer_classname = "MyDataInfo"; // 生成的外部类名,同时也是文件名

    // protobuf 可以使用message 管理其它message
    message MyMessage{
    // 定义一个枚举类型
    enum DataType{
    StudentType = 0; // 在proto3 要求enum的编号从0开始
    WorkerType = 1;
    }
    // 用data_type来标识传的是哪个枚举类型
    DataType data_type = 1;
    // 表示每次枚举类型最多只出现其中的一个,节省空间
    oneof dataBody{
    Student student = 2;
    Worker worker = 3;
    }
    }

    // protobuf 使用message管理数据
    message Student{ // 会在StudentPOJO外部类生成一个内部类Student,它是真正发送的POJO对象
    int32 id = 1; //Student类中有一个属性,名称为id,类型为int32
    string name = 2;
    }

    message Worker{
    string name = 1;
    int32 age = 2;
    }
  2. 编写服务端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59

    public class Server {
    public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // 添加解码器
    // 指定对哪种对象进行解码
    pipeline.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
    pipeline.addLast(new NettyServerHandler());
    }
    });
    System.out.println("...server is ready...");

    ChannelFuture channelFuture = b.bind(6668).sync();

    channelFuture.channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    }
    }

    class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 读取从客户端发送的StudentPOJO.Student
    MyDataInfo.MyMessage message = (MyDataInfo.MyMessage) msg;
    MyDataInfo.MyMessage.DataType dataType = message.getDataType();
    if(dataType == MyDataInfo.MyMessage.DataType.StudentType){
    MyDataInfo.Student student = message.getStudent();
    System.out.println("student id="+student.getId()+",name="+student.getName());
    }else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType){
    MyDataInfo.Worker worker = message.getWorker();
    System.out.println("worker name="+worker.getName()+",age="+worker.getAge());
    }else{
    System.out.println("传输类型错误");
    }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.channel().close();
    }
    }
  3. 编写客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52

    public class Client {
    public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
    try {
    Bootstrap bootstrap = new Bootstrap();
    // 设置相关参数
    bootstrap.group(eventExecutors)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // 添加编码器
    pipeline.addLast("encoder", new ProtobufEncoder());
    // 添加自己的处理器
    pipeline.addLast(new NettyClientHandler());
    }
    });
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    channelFuture.channel().closeFuture().sync();
    } finally {
    eventExecutors.shutdownGracefully();
    }
    }
    }

    class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 随机发送Student或worker对象
    int random = new Random().nextInt(3);
    MyDataInfo.MyMessage myMessage = null;
    if (0 == random) {
    myMessage = MyDataInfo.MyMessage.newBuilder()
    .setDataType(MyDataInfo.MyMessage.DataType.StudentType)
    .setStudent(MyDataInfo.Student.newBuilder().setId(1).setName("Tom").build()).build();
    } else {
    myMessage = MyDataInfo.MyMessage.newBuilder()
    .setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
    .setWorker(MyDataInfo.Worker.newBuilder().setName("Jake").setAge(22).build()).build();
    }
    ctx.writeAndFlush(myMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
    }
    }