Netty是如何实现TCP心跳机制与断线重连的

 2023-09-15 阅读 26 评论 0

摘要:本文来说下Netty 是如何实现 TCP 心跳机制与断线重连的 文章目录什么是心跳机制HeartBeat如何实现心跳机制Netty实现自定义的心跳机制服务端客户端测试效果客户端断线重连本文小结 netty心跳机制? 什么是心跳机制HeartBeat 在 TCP 长连接 keepAlive 的应用场景下,cli

本文来说下Netty 是如何实现 TCP 心跳机制与断线重连的

文章目录

  • 什么是心跳机制HeartBeat
  • 如何实现心跳机制
  • Netty实现自定义的心跳机制
    • 服务端
    • 客户端
    • 测试效果
  • 客户端断线重连
  • 本文小结

netty心跳机制?


什么是心跳机制HeartBeat

在 TCP 长连接 keepAlive 的应用场景下,client 端一般不会主动关闭它们之间的连接,Client 与 Server 之间的连接如果一直不关闭的话,随着客户端连接越来越多,Server 早晚有扛不住的时候,这时候 Server 端需要采取一些策略,如关闭一些长时间没有读写事件发生的连接,这样可以避免一些恶意连接导致 Server 端服务受损

所谓心跳机制 / 心跳检测, 即在 TCP长连接中 , 客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线, 以确保 TCP连接的有效性.


如何实现心跳机制

netty心跳检测,我们可以通过两种方式实现心跳机制:

使用 TCP 协议层面的 keepalive 机制。在 Netty 中使用该策略:

.childOption(ChannelOption.SO_KEEPALIVE, true); 

在应用层上实现自定义的心跳机制.,虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  • 它不是 TCP 的标准协议, 并且是默认关闭的.
  • TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  • TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

信息互通机制如何建立、虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳,一般我们自己实现的大致策略是这样的:

  • Client 启动一个定时器,不断向客户端发送心跳
  • Server 收到心跳后,做出回应;
  • Server 启动一个定时器,判断 Client 是否存在,这里做判断有两种方法:时间差和简单标识。

① 时间差:

  • 收到一个心跳包之后记录当前时间;
  • 判断定时器到达时间,计算多久没收到心跳时间 = 当前时间 - 上次收到心跳时间。如果该时间大于设定值则认为超时。

② 简单标识:

  • 收到心跳后设置连接标识为 true
  • 判断定时器到达时间,如果未收到心跳则设置连接标识为false

下面我们来看看基于 Netty 如何实现应用层上的心跳机制 👇


Netty实现自定义的心跳机制

Netty 中实现心跳检测依赖于 IdleStateHandler,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 IDLE 状态), 就会触发指定的事件。本文要实现的逻辑步骤为:

  • 启动服务端,启动客户端
  • 客户端向服务端发送 “I am alive”,并 sleep 随机时间,用来模拟空闲。
  • 服务端接收客户端消息并返回 “copy that”,若客户端空闲则计数 +1.
  • 若服务端检测客户端读空闲次数 > 3,则服务端关闭连接。
  • 客户端发现连接关闭了,就退出了。

服务端

服务端启动类:

public class HeartBeatServer {int port ;public HeartBeatServer(int port){this.port = port;}public void start(){ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try{bootstrap.group(boss,worker).handler(new LoggingHandler(LogLevel.INFO)).channel(NioServerSocketChannel.class).childHandler(new HeartBeatServerInitializer());ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {worker.shutdownGracefully();boss.shutdownGracefully();}}public static void main(String[] args) throws Exception {HeartBeatServer server = new HeartBeatServer(8090);server.start();}
}

和我们上一章节的实例几乎一模一样,只需要看 childHandler(new HeartBeatServerInitializer()) 这一句。HeartBeatServerInitializer 就是一个我们自定义的 ChannelInitializer . 顾名思义,他就是在初始化 channel 的时做一些事情。我们所需要开发的业务逻辑 Handler 就是在这里添加的。其代码如下:

public class HeartBeatServerInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new IdleStateHandler(40,0,0, TimeUnit.SECONDS));pipeline.addLast(new HeartBeatServerHandler());}
}

代码很简单,我们先添加了StringDecoder 和StringEncoder 用于编解码,IdleStateHandler 就是心跳检测的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,其定义如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
  • readerIdleTime:超过xxx时间客户端没有发生读事件,就会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTime:超过xxx时间客户端没有发生写事件,就会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTime:超过xxx时间客户端没有发生读或写事件,就会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
  • unit:时间参数的格式

我们的例子中设置的是 new IdleStateHandler(2,2,2, TimeUnit.SECONDS),意思就是客户端 40 秒内没有发生读事件,超时事件就会被触发,具体操作定义在自定义的处理类 HeartBeatServerHandler.userEventTriggered 中。代码如下:

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {private int readIdleTimes = 0; // 空闲计数@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {System.out.println(" ====== > [server] message received : " + s);if("I am alive".equals(s)){ctx.channel().writeAndFlush("copy that");}else {System.out.println(" 其他信息处理 ... ");}}// 心跳检测@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent)evt;String eventType = null;switch (event.state()){case READER_IDLE:eventType = "读空闲";readIdleTimes ++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType ="读写空闲";// 不处理break;}System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);if(readIdleTimes > 3){System.out.println(" [server]读空闲超过3次,关闭连接");ctx.channel().writeAndFlush("you are out");ctx.channel().close();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");}}

客户端

netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:

public class HeartBeatClient  {private int port;private Random random ;public HeartBeatClient(int port){this.port = port;random = new Random();}public static void main(String[] args) throws Exception{HeartBeatClient client = new HeartBeatClient(8090);client.start();}public void start() {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try{Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new HeartBeatClientInitializer());Channel channel = bootstrap.connect(host, port).sync().channel();String  text = "I am alive";// 客户端每隔一段随机时间发送信息给服务端(模拟空闲)while (channel.isActive()){sendMsg(text);}}catch(Exception e){// do something}finally {eventLoopGroup.shutdownGracefully();}}public void sendMsg(String text) throws Exception{int num = random.nextInt(10);Thread.sleep(num * 1000); // 模拟空闲channel.writeAndFlush(text);}}

客户端不用加入心跳检测 IdleStateHandler :

public class HeartBeatClientInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatClientHandler());}
}
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" +msg);if(msg!= null && msg.equals("you are out")) {System.out.println(" server closed connection , so client will close too");ctx.channel().closeFuture();}}
}

测试效果

在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下

server 端:

=== /127.0.0.1:57700 is active ========= > [server] message received : I am alive====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲[server]读空闲超过3次,关闭连接

client 端:

 client sent msg and sleep 2client received :copy thatclient received :copy thatclient sent msg and sleep 6client sent msg and sleep 6client received :copy thatclient received :you are outserver closed connection , so client will close tooProcess finished with exit code 0

通过上面的运行日志,我们可以看到:

  • 客户端在与服务器成功建立之后,发送了3次’I am alive’,服务端也回应了3次:‘copy that’
  • 由于客户端超时了多次,服务端关闭了链接。
  • 客户端知道服务端抛弃自己之后,也关闭了连接,程序退出。

客户端断线重连

客户端连接服务器时

bootstrap.connect(host, port).sync()

会返回一个 ChannelFuture 的对象,我们可以对这个对象进行监听.

如下,我们抽象出 doConnect 方法, 它负责客户端和服务器的 TCP 连接的建立, 并且当 TCP 连接失败时, doConnect 会通过 “channel().eventLoop().schedule” 来延时 10s 后尝试重新连接

public class HeartBeatClient  {private Channel channel;.......public void start() {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try{Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new HeartBeatClientInitializer());doConnect();.........}catch(Exception e){// do something}finally {eventLoopGroup.shutdownGracefully();}}protected void doConnect() {if (channel != null && channel.isActive()) {return;}ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture futureListener) throws Exception {if (futureListener.isSuccess()) {channel = futureListener.channel();System.out.println("Connect to server successfully!");} else {// 断线重连System.out.println("Failed to connect to server, try connect after 10s");futureListener.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doConnect();}}, 10, TimeUnit.SECONDS);}}});}...........}

这还不够,断线重连的关键一点是检测连接是否已经断开. 因此我们需要在 ClientHandler 中重写 channelInactive 方法. 当 TCP 连接断开时, 会回调 channelInactive 方法, 因此我们在这个方法中调用 client.doConnect() 来进行重连:

public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {private HeartBeatClient heartBeatClient;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" +msg);if(msg!= null && msg.equals("you are out")) {System.out.println(" server closed connection , so client will close too");ctx.channel().closeFuture();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);heartBeatClient.doConnect();}
}

本文小结

本文详细介绍了Netty是如何实现TCP心跳机制与断线重连的。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/62318.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息