
2023-09-15


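While working on an earlier project I used the Netty network framework, which deepened my understanding of Java's I/O models and made me familiar with NIO's non-blocking mode. Netty is a high-level wrapper around Java NIO. This article is my personal summary based on the Netty 4.1.6 source code.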




    @Override
    protected void run() {
        // Infinite loop that keeps polling the selector for registered events
        for (;;) {
            try {
                // Act differently depending on the result of the select strategy
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                // Start processing each connection
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
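The `ioRatio` arithmetic in the `else` branch of `run()` is easier to follow with concrete numbers. The sketch below isolates only the expression `ioTime * (100 - ioRatio) / ioRatio`; the class and method names (`IoRatioBudget`, `taskBudgetNanos`) are made up for illustration and are not part of Netty, and the 8 ms I/O time is an assumed value.

```java
public final class IoRatioBudget {

    private IoRatioBudget() {
    }

    /**
     * Mirrors the expression used in run(): ioTime * (100 - ioRatio) / ioRatio.
     * ioRatio is the percentage of loop time intended for I/O. It must be in 1..99 here,
     * because run() handles ioRatio == 100 in a separate branch with no time limit.
     */
    public static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assumption: processSelectedKeys() took 8 ms
        for (int ratio : new int[] {50, 80, 20}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With an `ioRatio` of 50 the queued tasks get as much time as the I/O just took; a higher ratio shrinks the task budget, a lower one enlarges it.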

As the code shows, the actual handling of data reads and writes starts in processSelectedKeys(). So what does this method mainly do?

    private void processSelectedKeys() {
        // If the optimized selectedKeys set is in use (not null), take the array-based path
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

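As shown above, processSelectedKeys() takes one of two paths depending on whether the selectedKeys field is null. The core of both paths is the AbstractNioChannel class. Following the source further shows that this class is itself a Channel, and in Java NIO a Channel plays roughly the role that a Socket plays in ordinary network programming. So AbstractNioChannel is Netty's further wrapper around the NIO Channel.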
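Both paths recover the channel through `k.attachment()`. The plain-JDK sketch below shows that mechanism on its own: an object passed to `register(...)` comes back from the `SelectionKey`. `ChannelWrapper` is a hypothetical stand-in for AbstractNioChannel, and `AttachmentDemo` is an illustrative name; no Netty classes are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public final class AttachmentDemo {

    /** Hypothetical stand-in for a framework-level channel wrapper. */
    static final class ChannelWrapper {
        final String name;

        ChannelWrapper(String name) {
            this.name = name;
        }
    }

    /** Registers a channel together with an attachment, then reads it back from the key. */
    public static String attachedName() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before registering with a Selector
            SelectionKey key =
                    server.register(selector, SelectionKey.OP_ACCEPT, new ChannelWrapper("server"));

            // Same pattern as the source: fetch the attachment and dispatch on its type.
            Object a = key.attachment();
            if (a instanceof ChannelWrapper) {
                return ((ChannelWrapper) a).name;
            }
            return "unknown";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(attachedName());
    }
}
```

Carrying the wrapper on the key means the event loop never needs a separate lookup table from JDK channels to framework channels.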



    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // Obtain the channel's unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            // Note the check for OP_ACCEPT here.
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }


    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            // Connection already closed - no need to handle write.
            return;
        }
    }
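The condition above is a plain bitmask test, and it is the reason a new connection (OP_ACCEPT) and incoming data (OP_READ) both end up in the unsafe's `read()` method. A minimal sketch of the same test, using only the JDK's `SelectionKey` constants; `ReadyOpsDemo` and `shouldRead` are illustrative names, not Netty API:

```java
import java.nio.channels.SelectionKey;

public final class ReadyOpsDemo {

    private ReadyOpsDemo() {
    }

    /**
     * Same condition as in processSelectedKey: true when the ready set contains
     * OP_READ or OP_ACCEPT, or when it is empty (the workaround for the spin-loop case).
     */
    public static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT          -> " + shouldRead(SelectionKey.OP_ACCEPT));
        System.out.println("OP_READ | OP_WRITE -> "
                + shouldRead(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println("OP_WRITE only      -> " + shouldRead(SelectionKey.OP_WRITE));
        System.out.println("0                  -> " + shouldRead(0));
    }
}
```

Because each operation occupies its own bit, one `&` against the combined mask covers both the server-channel case and the client-channel case.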


    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

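One method in the logic above deserves attention: doReadMessages(readBuf). Looking up its implementation shows that it is provided by NioServerSocketChannel. At this point we are finally close to the underlying Java NIO layer.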

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }


    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
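The `ch != null` check in doReadMessages relies on a JDK behaviour: in non-blocking mode, `ServerSocketChannel.accept()` returns `null` when no connection is pending instead of blocking. The sketch below reproduces only that part with plain JDK classes; `AcceptDemo`, `acceptInto` and `acceptWithoutClient` are hypothetical names, and binding to the loopback address on an OS-chosen port is an assumption made so the example can run anywhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public final class AcceptDemo {

    private AcceptDemo() {
    }

    /** Same shape as doReadMessages: 1 if a connection was accepted and buffered, 0 otherwise. */
    static int acceptInto(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    /** Opens a non-blocking server socket with no client connecting and tries one accept. */
    public static int acceptWithoutClient() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return acceptInto(server, new ArrayList<>());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptWithoutClient());
    }
}
```

A return value of 0 is what lets the surrounding `do ... while` loop in `read()` stop once the accept queue is drained.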


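Java NIO provides ByteBuffer, the buffer through which channel data is read and written. Netty provides its own buffer class on top of this idea, ByteBuf, which defines many read and write methods; these play the role that input and output streams play in ordinary I/O.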
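One practical difference is worth illustrating. A JDK `ByteBuffer` has a single `position`, so it has to be flipped between writing and reading, whereas Netty's ByteBuf keeps a separate `readerIndex` and `writerIndex` and needs no flip. The sketch below shows only the JDK side, so that it runs without Netty on the classpath; `ByteBufferFlipDemo` and its method names are illustrative.

```java
import java.nio.ByteBuffer;

public final class ByteBufferFlipDemo {

    private ByteBufferFlipDemo() {
    }

    /** Writes an int, flips the buffer, and reads the same int back. */
    public static int writeThenRead(int value) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putInt(value);   // write: position advances from 0 to 4
        buffer.flip();          // switch to reading: limit = 4, position = 0
        return buffer.getInt(); // read the 4 bytes that were just written
    }

    /** Shows how many bytes are readable after a write followed by flip(). */
    public static int readableAfterFlip() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putInt(7);
        buffer.putInt(9);
        buffer.flip();
        return buffer.remaining(); // limit - position
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead(42));
        System.out.println(readableAfterFlip());
    }
}
```

Forgetting `flip()` is a common ByteBuffer mistake; two independent indices remove that step entirely.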



    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }


    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }


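ChannelHandler corresponds to the blocks of business logic. ChannelPipeline has many add and remove methods, which make it possible to change the processing logic dynamically. This follows a strategy pattern: users implement different Handlers for different processing logic, while the overall flow, reading and writing data, stays the same. Take the commonly used addLast() method: whatever Handler is passed in, the method's own logic does not change. It simply appends the handler to the end of the processing chain.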

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
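To see what "appending to the end" means for a list bounded by `head` and `tail`, here is a minimal sketch of a doubly linked list with two sentinel nodes. It is not Netty's implementation: `MiniPipeline` and `Node` are hypothetical, handlers are reduced to plain names, and only the linking and the `null` check from the varargs loop are modelled.

```java
import java.util.ArrayList;
import java.util.List;

public final class MiniPipeline {

    /** One entry in the chain; stands in for a handler context. */
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    public MiniPipeline() {
        // Same initial wiring as the constructor shown earlier.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts each name just before the tail sentinel; stops at the first null, like the source loop. */
    public MiniPipeline addLast(String... names) {
        if (names == null) {
            throw new NullPointerException("names");
        }
        for (String n : names) {
            if (n == null) {
                break;
            }
            Node node = new Node(n);
            Node prev = tail.prev;
            node.prev = prev;
            node.next = tail;
            prev.next = node;
            tail.prev = node;
        }
        return this;
    }

    /** Walks the chain from head to tail and collects the names in order. */
    public List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(new MiniPipeline().addLast("decoder", "handler").names());
    }
}
```

The sentinels mean `addLast` never has to special-case an empty chain: there is always a node before the tail to link against. Note also that a `null` in the middle of the arguments silently drops everything after it, which follows from the `break` in the loop.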


  1. NioEventLoop component
  2. Channel component
  3. ByteBuf component
  4. Pipeline component
  5. ChannelHandler component





