本文来说下Netty 是如何实现 TCP 心跳机制与断线重连的
netty心跳机制?
在 TCP 长连接 keepAlive 的应用场景下,client 端一般不会主动关闭它们之间的连接,Client 与 Server 之间的连接如果一直不关闭的话,随着客户端连接越来越多,Server 早晚有扛不住的时候,这时候 Server 端需要采取一些策略,如关闭一些长时间没有读写事件发生的连接,这样可以避免一些恶意连接导致 Server 端服务受损
所谓心跳机制 / 心跳检测, 即在 TCP长连接中 , 客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线, 以确保 TCP连接的有效性.
netty心跳检测,我们可以通过两种方式实现心跳机制:
使用 TCP 协议层面的 keepalive 机制。在 Netty 中使用该策略:
.childOption(ChannelOption.SO_KEEPALIVE, true);
在应用层上实现自定义的心跳机制.,虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:
信息互通机制如何建立、虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳,一般我们自己实现的大致策略是这样的:
① 时间差:
② 简单标识:
下面我们来看看基于 Netty 如何实现应用层上的心跳机制 👇
Netty 中实现心跳检测依赖于 IdleStateHandler,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 IDLE 状态), 就会触发指定的事件。本文要实现的逻辑步骤为:
服务端启动类:
public class HeartBeatServer {int port ;public HeartBeatServer(int port){this.port = port;}public void start(){ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try{bootstrap.group(boss,worker).handler(new LoggingHandler(LogLevel.INFO)).channel(NioServerSocketChannel.class).childHandler(new HeartBeatServerInitializer());ChannelFuture future = bootstrap.bind(port).sync();future.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {worker.shutdownGracefully();boss.shutdownGracefully();}}public static void main(String[] args) throws Exception {HeartBeatServer server = new HeartBeatServer(8090);server.start();}
}
和我们上一章节的实例几乎一模一样,只需要看 childHandler(new HeartBeatServerInitializer()) 这一句。HeartBeatServerInitializer 就是一个我们自定义的 ChannelInitializer . 顾名思义,他就是在初始化 channel 的时做一些事情。我们所需要开发的业务逻辑 Handler 就是在这里添加的。其代码如下:
public class HeartBeatServerInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new IdleStateHandler(40,0,0, TimeUnit.SECONDS));pipeline.addLast(new HeartBeatServerHandler());}
}
代码很简单,我们先添加了StringDecoder 和StringEncoder 用于编解码,IdleStateHandler 就是心跳检测的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,其定义如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
我们的例子中设置的是 new IdleStateHandler(2,2,2, TimeUnit.SECONDS),意思就是客户端 40 秒内没有发生读事件,超时事件就会被触发,具体操作定义在自定义的处理类 HeartBeatServerHandler.userEventTriggered 中。代码如下:
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {private int readIdleTimes = 0; // 空闲计数@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {System.out.println(" ====== > [server] message received : " + s);if("I am alive".equals(s)){ctx.channel().writeAndFlush("copy that");}else {System.out.println(" 其他信息处理 ... ");}}// 心跳检测@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent)evt;String eventType = null;switch (event.state()){case READER_IDLE:eventType = "读空闲";readIdleTimes ++; // 读空闲的计数加1break;case WRITER_IDLE:eventType = "写空闲";// 不处理break;case ALL_IDLE:eventType ="读写空闲";// 不处理break;}System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);if(readIdleTimes > 3){System.out.println(" [server]读空闲超过3次,关闭连接");ctx.channel().writeAndFlush("you are out");ctx.channel().close();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");}}
netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:
public class HeartBeatClient {private int port;private Random random ;public HeartBeatClient(int port){this.port = port;random = new Random();}public static void main(String[] args) throws Exception{HeartBeatClient client = new HeartBeatClient(8090);client.start();}public void start() {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try{Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new HeartBeatClientInitializer());Channel channel = bootstrap.connect(host, port).sync().channel();String text = "I am alive";// 客户端每隔一段随机时间发送信息给服务端(模拟空闲)while (channel.isActive()){sendMsg(text);}}catch(Exception e){// do something}finally {eventLoopGroup.shutdownGracefully();}}public void sendMsg(String text) throws Exception{int num = random.nextInt(10);Thread.sleep(num * 1000); // 模拟空闲channel.writeAndFlush(text);}}
客户端不用加入心跳检测 IdleStateHandler :
public class HeartBeatClientInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new HeartBeatClientHandler());}
}
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" +msg);if(msg!= null && msg.equals("you are out")) {System.out.println(" server closed connection , so client will close too");ctx.channel().closeFuture();}}
}
在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下:
server 端:
=== /127.0.0.1:57700 is active ========= > [server] message received : I am alive====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲[server]读空闲超过3次,关闭连接
client 端:
client sent msg and sleep 2client received :copy thatclient received :copy thatclient sent msg and sleep 6client sent msg and sleep 6client received :copy thatclient received :you are outserver closed connection , so client will close tooProcess finished with exit code 0
通过上面的运行日志,我们可以看到:
客户端连接服务器时
bootstrap.connect(host, port).sync()
会返回一个 ChannelFuture 的对象,我们可以对这个对象进行监听.
如下,我们抽象出 doConnect 方法, 它负责客户端和服务器的 TCP 连接的建立, 并且当 TCP 连接失败时, doConnect 会通过 “channel().eventLoop().schedule” 来延时 10s 后尝试重新连接
public class HeartBeatClient {private Channel channel;.......public void start() {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try{Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new HeartBeatClientInitializer());doConnect();.........}catch(Exception e){// do something}finally {eventLoopGroup.shutdownGracefully();}}protected void doConnect() {if (channel != null && channel.isActive()) {return;}ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture futureListener) throws Exception {if (futureListener.isSuccess()) {channel = futureListener.channel();System.out.println("Connect to server successfully!");} else {// 断线重连System.out.println("Failed to connect to server, try connect after 10s");futureListener.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doConnect();}}, 10, TimeUnit.SECONDS);}}});}...........}
这还不够,断线重连的关键一点是检测连接是否已经断开. 因此我们需要在 ClientHandler 中重写 channelInactive 方法. 当 TCP 连接断开时, 会回调 channelInactive 方法, 因此我们在这个方法中调用 client.doConnect() 来进行重连:
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {private HeartBeatClient heartBeatClient;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(" client received :" +msg);if(msg!= null && msg.equals("you are out")) {System.out.println(" server closed connection , so client will close too");ctx.channel().closeFuture();}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);heartBeatClient.doConnect();}
}
本文详细介绍了Netty是如何实现TCP心跳机制与断线重连的。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态