近来笔者在研究java的nio部分代码时,看到在java的nio中关于事件类型的设计有四个分别是:
- OP_ACCEPT
- OP_READ
- OP_WRITE
- OP_CONNECT
然而nio的底层在Linux的实现是epoll,在epoll的模型中事件类型相对比较多分别是:
- POLLIN
- POLLOUT
- POLLERR
- POLLHUP
- POLLNVAL
- POLLREMOVE
当时笔者就有一个疑惑,为什么在epoll中没有关于OP_ACCEPT相关的事件,那么OP_ACCEPT在epoll中又意味着什么
怀着这样的疑问,笔者开始对源码进行解析,想一探究竟,本篇博客,我们就一起学习下关于java的nio到epoll详细实现的源码。
我们先从java服务端nio代码,一样平常在java中利用nio编程时都会利用这样的代码:
//当我们在java中利用nio编程时,一样平常会利用这几行代码在服务端开启一个selector和channel对特定端口进行监听//笔者省略了一些关于selectionKey的循环处置惩罚逻辑Selector selector = Selector.open();ServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.bind(new InetSocketAddress(bindIp, port), 1024);serverChannel.register(selector, SelectionKey.OP_ACCEPT);selector.select(1000L)Set keys = selector.selectedKeys()复制1.selector.open():
我们来一行一行分析,先看selector.open(),这一行会创建一个selector,我们一起看下selector的本质是什么:
//这个方法是开启一个Selectorpublic static Selector open() throws IOException { return SelectorProvider.provider().openSelector();}public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; //默认情况下利用DefaultSelectorProvider provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); }}//linux情况下DefaultSelectorProvider的create方法public static SelectorProvider create() { String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); if (osname.equals("SunOS")) return createProvider("sun.nio.ch.DevPollSelectorProvider"); //可以看出在linux情况下利用的是EPollSelectorProvider if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); return new sun.nio.ch.PollSelectorProvider();}//EPollSelectorProvider的openSelector()方法public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this);}复制EPollSelectorImpl从名字可以看出,nio的在Linux系统上的实现果然是epoll,我们看下EPollSelectorImpl类的构造方法:
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); //创建一个pipe通道,返回fd文件句柄,用于实现超机遇制 long pipeFds = IOUtil.makePipe(false); //分别保存输入和输出的句柄 fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; //创建epoll包装类 pollWrapper = new EPollArrayWrapper(); //这里是初始化中断,后面用于实现超机遇制 pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap();}复制这里我们看到在selector中实际是创建了一个epoll的包装类,我们先着看这个包装类构造方法做了什么:
//EPollArrayWrapper构造方法的EPollArrayWrapper() throws IOException { //创建epoll文件描述符 epollCreate()是native方法 epfd = epollCreate(); //申请一个数组,用于保存epoll_event的数组 int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); //这里保存了这个数组的地点用于将其传给epoll,epoll会将准备好的事件复制到数组中 pollArrayAddress = pollArray.address(); //用于保存高位channel绑定的socket文件句柄和感兴趣事件的映射 //EPollArrayWrapper中另有用于保存映射关系的低位数组,两者作用是一样的 if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap();}//epollCreate的native方法private native int epollCreate();复制我们看下创建epoll的的native方法,这个方法的实现在EPollArrayWrapper.c中,是由c语言写的:
JNIEXPORT jint JNICALLJava_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this){ //可以看到简朴干脆,调用了epoll_create方法,并直接返回epoll的文件句柄 int epfd = epoll_create(256); if (epfd < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed"); } return epfd;}复制到这里我们可以看出java中的selector在linux上其实其本质就是一个epoll,当然以笔者看源码的习惯是不会满足于这里,我们还要继续往下看。
而epoll_create()方法的实现就需要从linux源码中探求了,它在eventpoll.c中:
//来源于linux 5.14.7SYSCALL_DEFINE1(epoll_create, int, size){ if (size event.events & EPOLLEXCLUSIVE)) { epds->events |= EPOLLERR | EPOLLHUP; error = ep_modify(ep, epi, epds); } } else error = -ENOENT; break; } .....}复制注:这里关于ep_insert()这个方法笔者就不继续展开,这个方法将对应的fd添加到epoll的红黑树中,并在添加一个回调(叫做ep_poll_callback)绑定在epoll事件发生后,这个回调会在epoll事件发生后将对应的事件放到epoll的准备就绪链表上,并唤醒epoll中的全部等待队列。
代码执行到这里,我们需要监听的全部socoket的fd已经被添加到epoll的红黑树中了,并且当产生对应感兴趣的事件后,会调用我们添加的回调方法,将对应事件复制到epoll的就绪链表中,以是接下来epoll要做的就是监听这个就绪链表,这也就是epollWait()方法的内容,我们一起来看下:
//实现一样是在linux源码中,最后会调用这个方法static int do_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, struct timespec64 *to){ ...... //这个方法会先将一个临时空节点加入就绪队列rdllist中 //之后将就绪队列从eventpoll中卸载,只从临时空节点开始遍历获取全部的就绪事件, //并将其从内核态copy到用户态,copy到之前传入的事件数组pollArray中(之前传入了pollArray的地点) //我们就不继续展开 error = ep_poll(ep, events, maxevents, to); ......}复制到这里pollArray中就有已经就绪的地点了,之后会调用updateSelectedKeys()这个方法(之前提到过的方法)将其复制到key的集合中,以便于我们获取:
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); //这个方法会将pollArray数组中的已经完成的事件取出,找到对应的Key将其 //更新前面提到的SelectionKey的Set集合,即selector.selectedKeys()方法返回的集合 int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated;}private int updateSelectedKeys() { //获取已经就绪的事件数 int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i |