线程池的详细使用方法和参数解析等我在之前已经讲解过,如果对线程池基本用法和概念不清楚的可以先看下我之前的线程池的文章,这里就通过一张线程池运行流程图来资助大家去简朴了解下线程池的工作原理。
线程池源码我们主要通过ThreadPoolExecutor进行分析,一步一步分析线程池源码的焦点内容。
01
属性解析
//高3位:表示当前线程池运行状态 撤除高3位之后的低位: // 表示当前线程池所拥有的线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 表示在ctl中,低COUNT_BITS位 用于存放当前线程数量的位 private static final int COUNT_BITS = Integer.SIZE - 3; //低COUNT_BITS位 所能表达的最大数值 private static final int CAPACITY = (1 largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // 添加work成功后,将创建的线程启动 t.start(); workerStarted = true; } } } finally { // 启动失败 if (!workerStarted) // 释放令牌 // 将当前worker清算出workers集合 addWorkerFailed(w); } return workerStarted; }addWorker方法总体就是做了两件事
第一步:判定是否可以创建新的Work
第二步:如果可以创建就创建新的Work,然后添加到任务队列当中,末了启动该线程。
这里会看到,创建Work会加锁,加了一个来包管线程安全,新创建的Work会添加到任务队列当中,这个任务队列实在就是通过HashSet来存储work,末了启动线程,启动线程后,真正运行这个任务的方法就不在execute当中,而是通过
Work类中的run方法来执行。
04
runWorker方法
通过execute方法来启动线程后,就会通过work类中的run方法调用ThreadPoolExecutor的runWork方法来运行任务。
// 当worker启动时,会执行run方法 public void run() { runWorker(this); } final void runWorker(Worker w) { // 工作线程 Thread wt = Thread.currentThread(); // 任务 Runnable task = w.firstTask; // 逼迫释放锁 // 这里相当于无视那边的中断标记 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 取任务,如果有第一个任务,这里先执行第一个任务 // 只要能取到任务,这就是个死循环 // getTask:取任务 while (task != null || (task = getTask()) != null) { // 加锁,是由于当调用shutDown方法它会判定当前是否加锁,加锁就会跳过它接着执行下一个任务 w.lock(); // 查抄线程池状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,方便子类在任务执行前做一些处理 beforeExecute(wt, task); Throwable thrown = null; try { // 真正任务执行的地方 //task 可能是FutureTask 也可能是 平凡的Runnable接口实现类。 //如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。这个不清楚,请看上一期,在b站。 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }runWorker方法就是真正执行任务的方法,如果有第一个任务就先执行第一个任务,第一个任务执行完后就通过getTask()方法从任务队列中获取任务来执行。
05
getTask()方法
private Runnable getTask() { // 是否超时 boolean timedOut = false; // Did the last poll() time out? // 自旋 for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); //当前程池状态是SHUTDOWN的时候会把队列中的任务执行完直到队列为空 // 线程池状态是stop时退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 获取工作线程数量 int wc = workerCountOf(c); // 是否答应超时,有两种情况: // 1. 是答应焦点线程数超时,这种就是说所有的线程都可能超时 // 2. 是工作线程数大于了焦点数量,这种肯定是答应超时的 // 注意,非焦点线程是肯定答应超时的,这里的超时实在是指取任务超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 真正取任务的地方 // 默认情况,只有当工作线程数量大于焦点线程数量时,才会调用poll方法触发超时调用 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 取到任务就返回 if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }这里取任务会根据工作线程的数量判定是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。
poll(timeout, unit)方法会在超时时返回null,如果timeout |