当你通过netty发送或者接受一个消息的时候,就会发生一次数据转换,其中内容的发送和j接收会经历: 字符串/对象 ==> 字节数组 ==> 字符串/对象 的一个过程,被称为编码解码。 发送数据时,如果直接发送是发送不成功的,因为数据在计算机底层传输是二进制流的形式,如下代码发送数据时会发送失败。必须把字符串转化成字节数组。
//直接发送字符串,会发送失败ChannelHandlerContext.writeAndFlush("HelloClient");
netty心跳检测。把字符串转化成字节数组的转化方式有多种方案
第一种方案:手动编解码如下:
需要在代码中,耦合的调用Unpooled方法进行数据的编码、解码
// 发送数据时编码@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 发送数据时,需要调用netty的Unpooled类,把字符串放进ByteBuf 字节缓冲区中ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);// 发送数据ctx.writeAndFlush(buf);}
// 接收数据时解码@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));}
上述代码耦合性太重,只适合测试或者个别数据的发送接收
第二种方案:比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。他们都是基于JDK的序列化方式,对数据进行序列化。使用时直接加入ChannelPipeline链表中即可
ChannelPipeline pipeline = socketChannel.pipeline();//向pipeline中加入字符串解码器pipeline.addLast("decoder",new StringDecoder());// 对象编码器//pipeline.addLast(new ObjectEncoder()); //向pipeline中加入字符串编码器pipeline.addLast("encoder",new StringEncoder());
tcp心跳检测、这些编码、解码器 分别实现了ChannelInboundHandler、ChannelOutboundHandler。分别对入站、出站的数据进行编码和解码。
第三种方案:自定义编解码器,在 下面粘包拆包中有体现,往下面看!
netty为我们提供的编码解码器底层是使用的JDK的序列化,相对于protobuf序列化JDK的就比较低效。但是protobuf需要维护大量的proto文件也比较麻烦,现在一般可以使用protostuff作为序列化机制。
protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:
依赖:
<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-api</artifactId><version>1.0.10</version>
</dependency>
<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.0.10</version>
</dependency>
<dependency><groupId>com.dyuproject.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.0.10</version>
</dependency>
netty tcp。protostuff工具类
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** protostuff 序列化工具类,基于protobuf封装*/
public class ProtostuffUtil {private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();private static <T> Schema<T> getSchema(Class<T> clazz) {@SuppressWarnings("unchecked")Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);if (schema == null) {schema = RuntimeSchema.getSchema(clazz);if (schema != null) {cachedSchema.put(clazz, schema);}}return schema;}/*** 序列化** @param obj* @return*/public static <T> byte[] serializer(T obj) {@SuppressWarnings("unchecked")Class<T> clazz = (Class<T>) obj.getClass();LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {Schema<T> schema = getSchema(clazz);return ProtostuffIOUtil.toByteArray(obj, schema, buffer);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} finally {buffer.clear();}}/*** 反序列化** @param data* @param clazz* @return*/public static <T> T deserializer(byte[] data, Class<T> clazz) {try {T obj = clazz.newInstance();Schema<T> schema = getSchema(clazz);ProtostuffIOUtil.mergeFrom(data, obj, schema);return obj;} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}//测试public static void main(String[] args) {byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhb"));User user = ProtostuffUtil.deserializer(userBytes, User.class);System.out.println(user);}
}
protostuff的使用
// 发送数据时进行序列化@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//测试用protostuff的serializer方法对user对象编解码ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serializer(new User(1, "zhb")));//发送ctx.writeAndFlush(buf);}
// 接收数据时进行反序列化@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//测试用protostuff的deserializer方法对user对象反序列化ByteBuf buf = (ByteBuf) msg;byte[] bytes = new byte[buf.readableBytes()];System.out.println("从客户端读取到Object:" + ProtostuffUtil.deserializer(bytes, User.class));}
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
比如客户端发送了 java 、python 两个字符串
netty 粘包。粘包拆包问题解决方案
在开发中,通常使用的是第三种(发送数据+长度)的方式来解决粘包拆包问题。由于netty在数据编解码时没有提供同时处理 数据+长度 的编解码方案,所以通常是由自己实现的编解码器,并作为ChannelHandler加入到ChannelPipeline的责任链中。自定义编解码器核心代码如下所示:
自定义发送内容类(长度 + 数据):
//自定义数据包
public class MyMessageProtocol {//定义一次发送包体长度private int len;//一次发送包体内容,已转为字节数组private byte[] content;
}
自定义编码器
//自定义编码器,需要继承netty的MessageToByteEncoder类
public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx,MyMessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被调用");//客户端发送数据长度 占用4个字节,服务端会先接受这4个字节out.writeInt(msg.getLen());//客户端发送数据内容out.writeBytes(msg.getContent());}
}
自定义解码器
//自定义解码器,需要继承netty的 ByteToMessageDecoder类
public class MyMessageDecoder extends ByteToMessageDecoder {//int length = 0;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println();System.out.println("MyMessageDecoder decode 被调用");//需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)System.out.println(in);//如果ByteBuf可读的字节数组 >= 4if(in.readableBytes() >= 4) {if (length == 0){//读取内容长度length = in.readInt();}if (in.readableBytes() < length) {System.out.println("当前可读数据不够,继续等待。。");return;}byte[] content = new byte[length];//如果ByteBuf可读的字节数组长度 大于发送的内容长度,则读取内容,并封装对象if (in.readableBytes() >= length){in.readBytes(content);//封装成MyMessageProtocol对象,传递到下一个handler业务处理MyMessageProtocol messageProtocol = new MyMessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);//传递下一个handlerout.add(messageProtocol);}//重置数据长度length = 0;}}
}
tcp需要心跳?客户端 ChannelPipeline 中添加自定义编码器
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加自定义编码器pipeline.addLast(new MyMessageEncoder());// 添加业务处理器pipeline.addLast(new MyClientHandler());}================= 业务处理器MyClientHandler ==================public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {// 在与服务端连通后调用下面的方法,往服务端发送10次 “你好,我是张三!”@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i = 0; i< 10; i++) {String msg = "你好,我是张三!";//创建协议包对象MyMessageProtocol messageProtocol = new MyMessageProtocol();// 字节长度messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);// 字节内容messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(messageProtocol);}}
}
服务端 ChannelPipeline 中添加自定义解码器
@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加自定义编码器pipeline.addLast(new MyMessageDecoder());// 添加业务处理器pipeline.addLast(new MyServerHandler());}================= 业务处理器MyServerHandler ==================public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {System.out.println("====服务端接收到消息如下====");System.out.println("长度=" + msg.getLen());System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));System.out.println("服务端接收到消息包数量=" + (++this.count));}}
启动服务端和客户端后,控制台打印结果如下,粘包拆包问题得到解决!
netty mqtt。 在TCP长连接中,客户端和服务器之间建立连接,并定期发送一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性,这就是心跳机制。作用是防止某一方由于网络等原因断开连接后,另一方不知道,还继续保持连接不释放,造成不必要的资源浪费。
netty底层已经实现了客户端与服务端的心跳检测机制。实现方式是服务端使用延时线程池开启一个线程,对比当前时间与上一次数据交互的时间间隔,以此来判断连接是否超时,然后通过自定义超时解决方案处理超时现象。实现心跳机制的关键是 IdleStateHandler ,使用方法就是在ChannelPipeline 中加入心跳检测Handler即可,代码如下:
@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//编码解码pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());//IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,//会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须//实现userEventTriggered方法处理对应事件pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));//如果使用心跳机制,在下一个handler中需要实现userEventTriggered方法//在userEventTriggered方法中制定超时解决方案pipeline.addLast(new HeartBeatServerHandler());}
IdleStateHandler的构造器如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}
IdleStateHandler的构造器参数解读:
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
netty 8583, IdleStateHandler的构造器源码中主要就是把传入的超时参数赋值给IdleStateHandler类内部的成员变量,以上面代码为例,就是把读超时的 3 秒赋值给IdleStateHandler类内部的readerIdleTimeNanos变量
真正的超时逻辑判断是在IdleStateHandler中的 channelActive 方法中,这个方法的调用时机是:客户端与服务端刚建立连接时被调用
进入initialize 方法中
进入ReaderIdleTimeoutTask 的 run 方法
当客户端 与 服务端 出现一次超时后,直接断掉他们之间的连接是不合理的,但是netty并不知道我们想怎么处理超时后的逻辑,所以留给我们一个扩展接口,针对超时后的逻辑进行自定义。我们需要在IdleStateHandler 之后添加一个 业务Handler,并实现userEventTriggered 方法,在该方法中处理超时后的逻辑,如下所示:
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";readIdleTimes++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType = "读写空闲";// 不处理break;}//超时3次后关闭连接System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);if (readIdleTimes > 3) {System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");ctx.channel().writeAndFlush("idle close");ctx.channel().close();}}
netty零拷贝原理,
Netty断线自动重连,分为两种场景
①:客户端启动时连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。 开启一定时线程重连即可
//连接方法 connectpublic void connect() throws Exception {System.out.println("netty client start。。");//启动客户端去连接服务器端ChannelFuture cf = bootstrap.connect(host, port);//添加连接失败监听器cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {//开启定时线程 ,每三秒重连一次future.channel().eventLoop().schedule(() -> {System.err.println("重连服务端...");try {//递归调用 connect方法connect();} catch (Exception e) {e.printStackTrace();}}, 3000, TimeUnit.MILLISECONDS);} else {System.out.println("服务端连接成功...");}}});//对通道关闭进行监听cf.channel().closeFuture().sync();}
②:系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。
// channel 处于不活动状态时调用@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println("运行中断开重连。。。");nettyClient.connect();}
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态