netty的编解码、粘包拆包问题、心跳检测机制原理

 2023-09-15 阅读 26 评论 0

摘要:文章目录1. 编码解码器2. 编解码序列化机制的性能优化3. Netty粘包拆包4. Netty心跳检测机制5. Netty断线自动重连实现 1. 编码解码器 当你通过netty发送或者接受一个消息的时候,就会发生一次数据转换,其中内容的发送和j接收会经历: 字符串/对象 =&

文章目录

    • 1. 编码解码器
    • 2. 编解码序列化机制的性能优化
    • 3. Netty粘包拆包
    • 4. Netty心跳检测机制
    • 5. Netty断线自动重连实现


1. 编码解码器

        当你通过netty发送或者接受一个消息的时候,就会发生一次数据转换,其中内容的发送和j接收会经历: 字符串/对象 ==> 字节数组 ==> 字符串/对象 的一个过程,被称为编码解码。 发送数据时,如果直接发送是发送不成功的,因为数据在计算机底层传输是二进制流的形式,如下代码发送数据时会发送失败。必须把字符串转化成字节数组。

 //直接发送字符串,会发送失败ChannelHandlerContext.writeAndFlush("HelloClient");

netty心跳检测。把字符串转化成字节数组的转化方式有多种方案

  1. 发送数据时,耦合的手动编码、解码
  2. 使用netty提供的编码、解码器,
  3. 自定义编码、解码器

第一种方案:手动编解码如下:

需要在代码中,耦合的调用Unpooled方法进行数据的编码、解码

  	// 发送数据时编码@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 发送数据时,需要调用netty的Unpooled类,把字符串放进ByteBuf 字节缓冲区中ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);// 发送数据ctx.writeAndFlush(buf);}
  	// 接收数据时解码@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));}

上述代码耦合性太重,只适合测试或者个别数据的发送接收

        第二种方案:比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。他们都是基于JDK的序列化方式,对数据进行序列化。使用时直接加入ChannelPipeline链表中即可

 ChannelPipeline pipeline = socketChannel.pipeline();//向pipeline中加入字符串解码器pipeline.addLast("decoder",new StringDecoder());// 对象编码器//pipeline.addLast(new ObjectEncoder());  //向pipeline中加入字符串编码器pipeline.addLast("encoder",new StringEncoder());

tcp心跳检测、这些编码、解码器 分别实现了ChannelInboundHandler、ChannelOutboundHandler。分别对入站、出站的数据进行编码和解码。
在这里插入图片描述
在这里插入图片描述
        第三种方案自定义编解码器,在 下面粘包拆包中有体现,往下面看!


2. 编解码序列化机制的性能优化

        netty为我们提供的编码解码器底层是使用的JDK的序列化,相对于protobuf序列化JDK的就比较低效。但是protobuf需要维护大量的proto文件也比较麻烦,现在一般可以使用protostuff作为序列化机制。

        protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:

依赖:

<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-api</artifactId><version>1.0.10</version>
</dependency>
<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.0.10</version>
</dependency>
<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.0.10</version>
</dependency>

netty tcp。protostuff工具类

 import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** protostuff 序列化工具类,基于protobuf封装*/
public class ProtostuffUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static <T> Schema<T> getSchema(Class<T> clazz) {@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);if (schema == null) {schema = RuntimeSchema.getSchema(clazz);if (schema != null) {cachedSchema.put(clazz, schema);}}return schema;}/*** 序列化** @param obj* @return*/public static <T> byte[] serializer(T obj) {@SuppressWarnings("unchecked")Class<T> clazz = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(clazz);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化** @param data* @param clazz* @return*/public static <T> T deserializer(byte[] data, Class<T> clazz) {try {T obj = clazz.newInstance();Schema<T> schema = getSchema(clazz);ProtostuffIOUtil.mergeFrom(data, obj, schema);return obj;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}//测试public static void main(String[] args) {byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhb"));User user = ProtostuffUtil.deserializer(userBytes, User.class);System.out.println(user);}
}

protostuff的使用

 // 发送数据时进行序列化@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//测试用protostuff的serializer方法对user对象编解码ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "zhb")));//发送ctx.writeAndFlush(buf);}
 // 接收数据时进行反序列化@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//测试用protostuff的deserializer方法对user对象反序列化ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];System.out.println("从客户端读取到Object:" + ProtostuffUtil.deserializer(bytes, User.class));} 


3. Netty粘包拆包

        TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。

如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
在这里插入图片描述
比如客户端发送了 java 、python 两个字符串

  1. 粘包问题:服务端接受到的是 javapython 一个字符串
  2. 拆包问题:服务端接受到的是 ja、vapython 两个字符串

netty 粘包。粘包拆包问题解决方案

  1. 规定消息长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格。缺点是耗费空间和资源
  2. 在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。缺点是制定的分隔符容易和数据内容撞车,导致数据错误!
  3. 发送数据+长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。

在开发中,通常使用的是第三种(发送数据+长度)的方式来解决粘包拆包问题。由于netty在数据编解码时没有提供同时处理 数据+长度 的编解码方案,所以通常是由自己实现的编解码器,并作为ChannelHandler加入到ChannelPipeline的责任链中。自定义编解码器核心代码如下所示:

自定义发送内容类(长度 + 数据):

//自定义数据包
public class MyMessageProtocol {//定义一次发送包体长度private int len;//一次发送包体内容,已转为字节数组private byte[] content;
}

自定义编码器

//自定义编码器,需要继承netty的MessageToByteEncoder类
public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx,MyMessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被调用");//客户端发送数据长度 占用4个字节,服务端会先接受这4个字节out.writeInt(msg.getLen());//客户端发送数据内容out.writeBytes(msg.getContent());}
}

自定义解码器

//自定义解码器,需要继承netty的 ByteToMessageDecoder类
public class MyMessageDecoder extends ByteToMessageDecoder {//int length = 0;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println();System.out.println("MyMessageDecoder decode 被调用");//需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)System.out.println(in);//如果ByteBuf可读的字节数组 >= 4if(in.readableBytes() >= 4) {if (length == 0){//读取内容长度length = in.readInt();}if (in.readableBytes() < length) {System.out.println("当前可读数据不够,继续等待。。");return;}byte[] content = new byte[length];//如果ByteBuf可读的字节数组长度 大于发送的内容长度,则读取内容,并封装对象if (in.readableBytes() >= length){in.readBytes(content);//封装成MyMessageProtocol对象,传递到下一个handler业务处理MyMessageProtocol messageProtocol = new MyMessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);//传递下一个handlerout.add(messageProtocol);}//重置数据长度length = 0;}}
}

tcp需要心跳?客户端 ChannelPipeline 中添加自定义编码器

    protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加自定义编码器pipeline.addLast(new MyMessageEncoder());// 添加业务处理器pipeline.addLast(new MyClientHandler());}================= 业务处理器MyClientHandler ==================public  class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {// 在与服务端连通后调用下面的方法,往服务端发送10次 “你好,我是张三!”@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i = 0; i< 10; i++) {String msg = "你好,我是张三!";//创建协议包对象MyMessageProtocol messageProtocol = new MyMessageProtocol();// 字节长度messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);// 字节内容messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(messageProtocol);}}
}

服务端 ChannelPipeline 中添加自定义解码器

     @Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加自定义编码器pipeline.addLast(new MyMessageDecoder());// 添加业务处理器pipeline.addLast(new MyServerHandler());}================= 业务处理器MyServerHandler ==================public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {System.out.println("====服务端接收到消息如下====");System.out.println("长度=" + msg.getLen());System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));System.out.println("服务端接收到消息包数量=" + (++this.count));}}

启动服务端和客户端后,控制台打印结果如下,粘包拆包问题得到解决!

在这里插入图片描述


4. Netty心跳检测机制

netty mqtt。        在TCP长连接中,客户端和服务器之间建立连接,并定期发送一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性,这就是心跳机制。作用是防止某一方由于网络等原因断开连接后,另一方不知道,还继续保持连接不释放,造成不必要的资源浪费。

        netty底层已经实现了客户端与服务端的心跳检测机制。实现方式是服务端使用延时线程池开启一个线程,对比当前时间与上一次数据交互的时间间隔,以此来判断连接是否超时,然后通过自定义超时解决方案处理超时现象。实现心跳机制的关键是 IdleStateHandler ,使用方法就是在ChannelPipeline 中加入心跳检测Handler即可,代码如下:

       @Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//编码解码pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须//实现userEventTriggered方法处理对应事件pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));//如果使用心跳机制,在下一个handler中需要实现userEventTriggered方法//在userEventTriggered方法中制定超时解决方案pipeline.addLast(new HeartBeatServerHandler());}

IdleStateHandler的构造器如下:

    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}

IdleStateHandler的构造器参数解读:

  1. readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  2. writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  3. allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

注:这三个参数默认的时间单位是。若需要指定其他时间单位,可以使用另一个构造方法:

IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

netty 8583,        IdleStateHandler的构造器源码中主要就是把传入的超时参数赋值给IdleStateHandler类内部的成员变量,以上面代码为例,就是把读超时的 3 秒赋值给IdleStateHandler类内部的readerIdleTimeNanos变量
在这里插入图片描述

真正的超时逻辑判断是在IdleStateHandler中的 channelActive 方法中,这个方法的调用时机是:客户端与服务端刚建立连接时被调用
在这里插入图片描述
进入initialize 方法中
在这里插入图片描述
进入ReaderIdleTimeoutTask 的 run 方法

在这里插入图片描述

        当客户端 与 服务端 出现一次超时后,直接断掉他们之间的连接是不合理的,但是netty并不知道我们想怎么处理超时后的逻辑,所以留给我们一个扩展接口,针对超时后的逻辑进行自定义。我们需要在IdleStateHandler 之后添加一个 业务Handler,并实现userEventTriggered 方法,在该方法中处理超时后的逻辑,如下所示:

    @Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";readIdleTimes++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType = "读写空闲";// 不处理break;}//超时3次后关闭连接System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);if (readIdleTimes > 3) {System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");ctx.channel().writeAndFlush("idle close");ctx.channel().close();}}


5. Netty断线自动重连实现

netty零拷贝原理,        
Netty断线自动重连,分为两种场景

①:客户端启动时连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。 开启一定时线程重连即可

//连接方法 connectpublic void connect() throws Exception {System.out.println("netty client start。。");//启动客户端去连接服务器端ChannelFuture cf = bootstrap.connect(host, port);//添加连接失败监听器cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {//开启定时线程 ,每三秒重连一次future.channel().eventLoop().schedule(() -> {System.err.println("重连服务端...");try {//递归调用 connect方法connect();} catch (Exception e) {e.printStackTrace();}}, 3000, TimeUnit.MILLISECONDS);} else {System.out.println("服务端连接成功...");}}});//对通道关闭进行监听cf.channel().closeFuture().sync();}

②:系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。

    // channel 处于不活动状态时调用@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println("运行中断开重连。。。");nettyClient.connect();}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/59723.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息