健康生活、快乐学习、深度思索
大厂深耕多年,技术不断学习迭代并且深入研究,热爱技术,热爱开源
右上角点击关注,一起进步,关注作者并获取最新技术资讯和技术文章;持续更新有思索深度的文章和技术。
作者:鬣狗
日期:2021年8月29日
0.简介
本篇文章详细介绍了java NIO的架构和底层实现方式。作者通过NIO的源码以及通过各种参考资料弄清楚了NIO的实现原理,除此之外本文还会介绍阻塞和非阻塞的底层实现,以及操纵系统提供Select、Poll、Epoll函数,NIO中的打断机制以以及相关的设计思想,最后结合例子来回过头分析NIO的工作原理。
1.Channel
想了解NIO,第一步就是弄清Channel的概念,Channel是什么?
图1 Channel
A nexus for I/O operations.
A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
上文的Channel的Java doc的说明翻译过来就是,Channel代表一个到某个实体的打开的连接,这个实体可以是设备、文件、网络套接字或者是具备IO操纵的程序组件。与Java IO流不同的是,Channel的界说显得更加抽象,只要满足上面的界说都可以称作一个Channel,从这个意义上来说,Channel更的含义更像“信道”这个词。
图2 Channel的抽象表示
如图所示,Channel自己抽象为Client或者是调用方到某个Entity的信道,然后通过这个信道可以进行IO操纵。
Channel类族
Channel按照分类有如下:
- ServerSocketChannel 代表一个到监听套接字(a listening socket)的连接
- SocketChannel 代表一个到套接字的连接
- FileChannel 代表一个到文件的连接
- DatagramChannel 代表一个到面向报文的套接字连接
图3 Channel接口
Channel接口自己只有两个方法 isOpen和close。
图4 SocketChannelImpl类图
图4是SockeChannelImpl类的继承关系图,这个图用红色圈起来的类是我们接下来要分析的类。这个图还涉及到接口分离的设计思想,Channel的复杂功能被拆分具有不同功能的接口,如InterruptibleChannel、SelectableChannel、WritableByteChannel、ReadableByteChannel,其中InterruptibleChannel接口增加了异步打断机制功能,SelectableChannel接口增加了多路服用功能,而Channel读写分别有WritableByteChannel、ReadableByteChannel接口承担。这个图还可以看到接口设计的套路:接口---> 抽象类 -----> 具体实现类,如上图的 InterruptibleChannel -> AbstractInterrupibleChannel -> SocketChannelImpl以及SelectableChannel -> AbstractSelectableChannel -> SocketChannelImpl。
Channel的打断机制
A channel that can be asynchronously closed and interrupted.
A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel's close method. This will cause the blocked thread to receive an AsynchronousCloseException.
A channel that implements this interface is also interruptible: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the blocked thread's interrupt method. This will cause the channel to be closed, the blocked thread to receive a ClosedByInterruptException, and the blocked thread's interrupt status to be set.
If a thread's interrupt status is already set and it invokes a blocking I/O operation upon a channel then the channel will be closed and the thread will immediately receive a ClosedByInterruptException; its interrupt status will remain set.
A channel supports asynchronous closing and interruption if, and only if, it implements this interface. This can be tested at runtime, if necessary, via the instanceof operator.
实现InterruptibleChannel接口的Channel支持异步关闭和中断关闭
1.异步关闭(async closed):当前线程A阻塞在此Channel,然后线程B调用了Channel.close方法,那么当前线程A就会收到一个AsynchronousCloseExcepion
2.中断关闭:当前线程A阻塞在此Channel,然后线程B调用了线程A的interrupt方法,这会造成当前Channel关闭,并且线程A会收到ClosedByInterruptException。
接下来就看下AbstractInterruptibleChannel怎么实现Channel的这两种中断机制的。Channel的打断机制主要是由AbstractInterruptibleChannel这个类来完成的。
public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel{ private final Object closeLock = new Object(); private volatile boolean open = true; /** * Initializes a new instance of this class. */ protected AbstractInterruptibleChannel() { } /** * Closes this channel. * * If the channel has already been closed then this method returns * immediately. Otherwise it marks the channel as closed and then invokes * the {@link #implCloseChannel implCloseChannel} method in order to * complete the close operation.
* * @throws IOException * If an I/O error occurs */ public final void close() throws IOException { synchronized (closeLock) { if (!open) return; open = false; implCloseChannel(); } } /** * Closes this channel. * *
This method is invoked by the {@link #close close} method in order * to perform the actual work of closing the channel. This method is only * invoked if the channel has not yet been closed, and it is never invoked * more than once. * * An implementation of this method must arrange for any other thread * that is blocked in an I/O operation upon this channel to return * immediately, either by throwing an exception or by returning normally. *
* * @throws IOException * If an I/O error occurs while closing the channel */ protected abstract void implCloseChannel() throws IOException; public final boolean isOpen() { return open; } // -- Interruption machinery -- private Interruptible interruptor; private volatile Thread interrupted; /** * Marks the beginning of an I/O operation that might block indefinitely. * * This method should be invoked in tandem with the {@link #end end} * method, using a try ... finally block as * shown above, in order to implement asynchronous * closing and interruption for this channel.
*/ protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { if (!open) return; open = false; interrupted = target; try { AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } /** * Marks the end of an I/O operation that might block indefinitely. * * This method should be invoked in tandem with the {@link #begin * begin} method, using a try ... finally block * as shown above, in order to implement asynchronous * closing and interruption for this channel.
* * @param completed * true if, and only if, the I/O operation completed * successfully, that is, had some effect that would be visible to * the operation's invoker * * @throws AsynchronousCloseException * If the channel was asynchronously closed * * @throws ClosedByInterruptException * If the thread blocked in the I/O operation was interrupted */ protected final void end(boolean completed) throws AsynchronousCloseException { blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); } if (!completed && !open) throw new AsynchronousCloseException(); } // -- sun.misc.SharedSecrets -- static void blockedOn(Interruptible intr) { // package-private sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr); }Channel的中断机制主由上面的begin和end方法完成,子类Channel实现中断机制需要使用下面的代码模板:其中begin和end方法类似代码中前置和后置的功能
boolean completed = false; try { begin(); completed = ...; // Perform blocking I/O operation return ...; // Return result } finally { end(completed); }接下来我们分析下这个代码模板。
begin方法首先设置了Thread的Interruptible属性,这个属性有什么用?这个很关键,Interruptible是Thread中断的一个回调函数,在执行Thread.interrupt方法时,会首先执行Interruptible.interrupt,直接看代码:
图5 Thread的Interruptible
begin方法就是使用了Thread的这个中断回调,如果线程被打断时,就会调用设置的中断回调,将当前Channel关闭,这个就是Channel中断机制的“中断关闭”机制的实现。
protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { if (!open) return; open = false; interrupted = target; try { AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } blockedOn(interruptor); Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); }end方法用来抛出异常来返回效果,下面是两种中断机制的判断逻辑,这个我就不分析了。
protected final void end(boolean completed) throws AsynchronousCloseException { blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); } if (!completed && !open) throw new AsynchronousCloseException(); }
SelectableChannel
SelectableChannel的主要作用就是让Channel与Selector交互,并支持多路复用机制。
图6 SelectaChannel
如图SelectableChannel接口提供的主要功能有:向Selector中注册Channel、以及Channel支持哪些操纵。和InterruptibleChannel实现的套路一样,SelectableChannel的部分功能有AbstractSelectableChannel实现。
AbstractSelectableChannel
A channel that can be multiplexed via a Selector.
In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel's registration with the selector.
Once registered with a selector, a channel remains registered until it is deregistered. This involves deallocating whatever resources were allocated to the channel by the selector.
A channel cannot be deregistered directly; instead, the key representing its registration must be cancelled. Cancelling a key requests that the channel be deregistered during the selector's next selection operation. A key may be cancelled explicitly by invoking its cancel method. All of a channel's keys are cancelled implicitly when the channel is closed, whether by invoking its close method or by interrupting a thread blocked in an I/O operation upon the channel.
If the selector itself is closed then the channel will be deregistered, and the key representing its registration will be invalidated, without further delay.
A channel may be registered at most once with any particular selector.
Whether or not a channel is registered with one or more selectors may be determined by invoking the isRegistered method.
Selectable channels are safe for use by multiple concurrent threads.
Blocking mode
A selectable channel is either in blocking mode or in non-blocking mode. In blocking mode, every I/O operation invoked upon the channel will block until it completes. In non-blocking mode an I/O operation will never block and may transfer fewer bytes than were requested or possibly no bytes at all. The blocking mode of a selectable channel may be determined by invoking its isBlocking method.
Newly-created selectable channels are always in blocking mode. Non-blocking mode is most useful in conjunction with selector-based multiplexing. A channel must be placed into non-blocking mode before being registered with a selector, and may not be returned to blocking mode until it has been deregistered
上面的注释要注意的一点就是:selectable channel可以在阻塞模式下工作也可以在非阻塞模式下工作。
阻塞模式:每个IO操纵会阻塞,直到IO完成
非阻塞模式:IO操纵不会阻塞
下文会单独分析Selectable channel的阻塞模式和非阻塞模式是怎么实现的。
图7 AbstractSelectableChannel
AbstractSelectableChannel我们这里只关心红色圈起来的两个函数。剩下的函数都是对SelectionKey的操纵,这里我们忽略。
图8 register函数
selectable channel的register函数最终调用的是Selector函数的register,下一章节我们分析Selector
2.Selector
Selector类的java doc建议仔细阅读。
A multiplexor of SelectableChannel objects.
A selector may be created by invoking the open method of this class, which will use the system's default selector provider to create a new selector. A selector may also be created by invoking the openSelector method of a custom selector provider. A selector remains open until it is closed via its close method.
A selectable channel's registration with a selector is represented by a SelectionKey object. A selector maintains three sets of selection keys:
The key set contains the keys representing the current channel registrations of this selector. This set is returned by the keys method.
The selected-key set is the set of keys such that each key's channel was detected to be ready for at least one of the operations identified in the key's interest set during a prior selection operation. This set is returned by the selectedKeys method. The selected-key set is always a subset of the key set.
The cancelled-key set is the set of keys that have been cancelled but whose channels have not yet been deregistered. This set is not directly accessible. The cancelled-key set is always a subset of the key set.
图9 Selector
如图,Selector类的继承方式,首先是Selector,然后是抽象类,然后是一个基本的实现类,最后是具体的实现类。
open
通过open方法可以获取到平台相关的Selector,open方法通过SelectorProvider来获取真正的Selector。
图10 open
图11 SelectorProvider
而Selector选择则由SelectorProiver.provide方法给定的策略来获取:首先是从系统属性”java.nio.channels.spi.SelectorProvider“指定的类来获取,其次使用spi技术加载,最后使用系统默认的Selector。
java提供的Selector的具体实现有三种:PollSelectorImpl、KqueueSelectorImpl、EpollSelectorImpl,其中EpollSelector是Linux下的Selector
3.阻塞和非阻塞以及底层实现
上文AbstractSelectableChannel中有个configureBlocking函数,这个函数具体是怎么实现的呢。可以看到它调用了implConfigBlocking函数。找到它的具体实现,效果发现最调用的是java native函数。
图12 configureBlocking
找到它的具体实现,效果发现最调用的是java native函数。
图13 IOUtil.configBlocking
打开openjdk源码,发现其调用了linux下的fcntl函数,这下就解开了,文件阻塞和非阻塞是linux下本来就支持的功能特性。fcntl函数就可以设置以非阻塞模式打开文件,这个函数这里就不介绍了。
图14 configureBlocking源码
4.Channel和Selector的交互
我们通过一个NIO常用示例来分析底层的运行。如下图是NIO一个常用的例子。
1.首先创建一个服务端的ServerSocketChannel,绑定端口
2.然后将这个channel注册到selector上面
3.然后调用select方法,注意select方法是阻塞的,只有selector上存在channel发生了注册的事件时才会返回
4.select返回时,selectedKey里面就有可以大概有用的channel了
图15 NIO常用例子
备注:代码来自https://blog.csdn.net/u010889616/article/details/80686236
图16 手绘例子
上图是模拟java nio在服务器方面的使用,可以看到,serversocketchannel监听到有新的连接进来时,就会把它注册到selector中,用一个selector就可以管理全部连接的状态,也就是SelectionKey,然后就可以使用Dispatcher的方式将这些SelectionKey分派下去进行处理。注意:Selector的register底层实现使用的是channel的文件某舒服
5.Select、Poll、Epoll以及EpollSelectorImpl具体实现
这章我只说一句话,Epoll使用红黑树来管理这些socket,然后EpollSelectorImpl就是对Linux下的Epoll函数的包装。
6.Epoll的水平触发和边缘触发
水平触发(LT):水平触发指的在当前select选择周期中,当channel对应的文件描述符中有数据时,这个文件就会被select选中并返回
边缘触发(ET):边缘触发指的当前select选择周期中,当channel对应的文件描述符中有新增的数据时,这个文件就会被select选中并返回
7.参考资料
epoll、EpollSelectorImpl:
https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
https://www.cnblogs.com/Jack-Blog/p/12394487.html
https://man7.org/linux/man-pages/man7/epoll.7.html
https://developer.aliyun.com/article/58917
fcntl函数:
https://www.cnblogs.com/zxc2man/p/7649240.html
https://www.cnblogs.com/xuyh/p/3273082.html
https://www.cnblogs.com/zxc2man/p/7649240.html
NIO JCP提案:
https://jcp.org/en/jsr/detail?id=51
linux pipe函数:
https://www.cnblogs.com/kunhu/p/3608109.html
OpenJDK源码:github自找
NIO示例:
https://blog.csdn.net/chengkui1990/article/details/81558522
https://www.cnblogs.com/wenbinshen/p/9902814.html
NIO底层原理,强烈保举!!这三篇好好读:
https://zhuanlan.zhihu.com/p/361750240
https://juejin.cn/post/6882984260672847879
https://www.cnblogs.com/flashsun/p/14591563.html
8.手稿
图16 作者手稿
9.其它
github主页:
https://github.com/youngFF
git堆栈地点:
https://github.com/youngFF/MyHearthStone.git
gitbook地点:
https://youngff.github.io/MyHearthStone/
接待各位加入鬣狗技术社区,希望可以大概为您提供有思索、有深度的文章。接待加入我们,如果你也有想法在鬣狗技术社区发表文章,头条私聊即可。
求 关注转发点赞,谢谢各位!您的支持就是我们更新的动力 |