从源码全面解析Java 线程池的来龙去脉
温馨提示:这篇文章已超过391天没有更新,请注意相关的内容是否还可用!
- 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
- 📕系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
- 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- 线程池源码剖析
- 一、引言
- 二、使用
- 三、源码
- 1、初始化
- 1.1 拒绝策略
- 1.1.1 AbortPolicy
- 1.1.2 CallerRunsPolicy
- 1.1.3 DiscardOldestPolicy
- 1.1.4 DiscardPolicy
- 1.1.5 自定义拒绝策略
- 1.2 其余变量
- 2、线程池的execute方法
- 3、线程池的addWorker方法
- 3.1 校验
- 3.2 添加线程
- 4、线程池的 worker 源码
- 5、线程池的 runWorker 方法
- 6、线程池的 getTask 方法
- 7、线程池的 processWorkerExit 方法
- 8、线程池的关闭方法
- 8.1 shutdownNow 方法
- 8.2 shutdown 方法
- 四、流程图
- 五、总结
线程池源码剖析
一、引言
线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。
作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。
于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马
二、使用
我想大部分人应该都使用过线程池,我们的 JDK 中也提供了一些包装好的线程池使用,比如:
-
newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } -
newSingleThreadExecutor:返回一个核心线程数为 1 的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue())); } -
newCachedThreadPool:大同小异
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建
这时候,我们不免有疑问,我们难道不可以直接使用 ThreadPoolExecutor 的构造方法去进行创建嘛
是的,阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池
所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // 核心线程数 5, // 最大线程数 200, // 非核心工作线程在阻塞队列位置等待的时间 TimeUnit.SECONDS, // 非核心工作线程在阻塞队列位置等待的单位 new LinkedBlockingQueue(), // 阻塞队列,存放任务的地方 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种 ); for (int i = 0; i三、源码
整体的流程如下:
1、初始化
聊源码不从初始化聊的,都是不讲道理的
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 执行ThreadPoolExecutor的构造方法进行初始化 // corePoolSize: 核心线程数 // maximumPoolSize: 最大线程数 // keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间 // unit: 非核心工作线程在阻塞队列位置等待的时间单位 // workQueue: 存放任务的阻塞队列 // threadFactory: 线程工厂(生产线程的地方) // RejectedExecutionHandler: 拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 核心线程数可以为0 // 最大线程数不为0 // 最大线程数 大于 核心线程数 // 等待时间大于等于0 if (corePoolSize5、线程池的 runWorker 方法
public void run() { runWorker(this); } final void runWorker(Worker w) { // 拿到当前的线程 Thread wt = Thread.currentThread(); // 拿到当前Worker的第一个任务(如果携带的话) Runnable task = w.firstTask; // 置空 w.firstTask = null; // 解锁 w.unlock(); boolean completedAbruptly = true; try { // 如果任务不等于空 或者 从阻塞队列中拿到的任务不等于空 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); // 如果线程池状态 >= STOP,确保线程中断了 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 在任务执行前做一些操作,自己实现的钩子 beforeExecute(wt, task); Throwable thrown = null; try { // 任务执行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行后一些操作:自己实现的钩子 afterExecute(task, thrown); } } finally { // 任务置空 task = null; // 执行任务+1 w.completedTasks++; // 解锁 w.unlock(); } } completedAbruptly = false; } finally { // 删除线程的方法 processWorkerExit(w, completedAbruptly); } }6、线程池的 getTask 方法
private Runnable getTask() { // 超时的标记 boolean timedOut = false; // 死循环拿数据 for (;;) { // 拿到当前的ctl int c = ctl.get(); // 获取其线程池状态 int rs = runStateOf(c); // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 线程池中的线程个数减一 decrementWorkerCount(); return null; } // 当前线程池中线程个数 int wc = workerCountOf(c); // 这里是个重点 // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑 // wc > corePoolSize:当前线程池中的线程个数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // wc > maximumPoolSize:基本不存在 // timed && timedOut:第一次肯定是失败的(超时标记为false) if ((wc > maximumPoolSize || (timed && timedOut)) // 1、线程个数为1 // 2、阻塞队列是空的 && (wc > 1 || workQueue.isEmpty())) { // 线程池的线程个数减一 if (compareAndDecrementWorkerCount(c)){ return null; } continue; } try { // 根据我们前面的timed的值(当前线程池中的线程个数是否大于核心线程数) // 如果大于,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)带有时间的等待,超过时间无任务,会返回null // 如果小于,执行workQueue.take(),死等任务,不会返回null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null){ return r; } // 到这里,说明上面的等待超时了 // 这里要注意一下,如果这里超时后,我们上面 if ((wc > maximumPoolSize || (timed && timedOut)) 这个判断要起作用了 // (timed && timedOut) true // wc > 1 || workQueue.isEmpty():当线程大于1或者阻塞队列无数据,直接返回null,让外部循环删除 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }7、线程池的 processWorkerExit 方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常退出的:completedAbruptly=false // 不是正常退出的:completedAbruptly=true if (completedAbruptly) decrementWorkerCount(); // 加锁——上锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将当前worker的执行任务数累加到线程池中 completedTaskCount += w.completedTasks; // 线程池删除该工作线程 workers.remove(w); } finally { // 解锁 mainLock.unlock(); } tryTerminate(); // 获取ctl的数据 int c = ctl.get(); // 这里只有SHUTDOWN、RUNNING会进入判断 if (runStateLessThan(c, STOP)) { // 正常退出的 if (!completedAbruptly) { // 是否允许超时 // 允许:0 // 不允许:核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min=0并且阻塞队列不为空 // 将min设置成1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 当前线程池的大小大于最小值,直接返回即可 if (workerCountOf(c) >= min){ return; } } // 如果没有的话,说明线程池中没有线程了,并且还有阻塞任务 // 只能添加一个非核心线程去处理这些任务 addWorker(null, false); } }8、线程池的关闭方法
8.1 shutdownNow 方法
- 将线程池状态修改为Stop(不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管)
- 将线程池中的线程全部中断
- 删除当前线程池所有的工作线程
- 将线程池的状态从:Stop --> TIDYING --> TERMINATED,正式标记线程池的结束(唤醒一下等待的主线程)
public List shutdownNow() { // 声明返回结果 List tasks; // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 将线程池状态修改为STOP advanceRunState(STOP); // 将线程池中的线程全部中断 interruptWorkers(); // 删除当前所有的工作线程 tasks = drainQueue(); } finally { // 解锁 mainLock.unlock(); } // 查看当前线程池是否可以变为TERMINATED状态 // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED tryTerminate(); return tasks; } // targetState = STOP // 作用:将当前线程池的状态修改为Stop private void advanceRunState(int targetState) { // 进来直接死循环 for (;;) { // 拿到当前的ctl int c = ctl.get(); // runStateAtLeast(c, targetState):当前的c是不是大于STOP(如果大于Stop的话,说明线程池状态已经G了 // 基于CAS,将ctl从c修改为Stop状态,不修改工作线程个数,仅仅将状态修改为Stop // 如果可以修改成功,直接退出即可 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // 将线程池中的线程全部中断 private void interruptWorkers() { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 循环遍历线程组 for (Worker w : workers) // 中断线程 // 这里会给线程打一个中断的标记,具体什么时候中断线程,需要我们自己去控制 w.interruptIfStarted(); } finally { // 解锁 mainLock.unlock(); } } // 删除当前所有的工作线程 private List drainQueue() { // 存放工作线程的队列 BlockingQueue q = workQueue; // 返回的结果 ArrayList taskList = new ArrayList(); // 清空阻塞队列并将数据放入taskList中 q.drainTo(taskList); // 校验当前的数据是够真的清空 if (!q.isEmpty()) { // 如果确实有遗漏的,毕竟这哥们也没上锁 // 手动的将线程从workQueue删除掉并且放到taskList中 for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } // 最终返回即可 return taskList; } final void tryTerminate() { for (;;) { // 拿到ctl int c = ctl.get(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // CAS将当前的ctl设置成TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 该方法是一个钩子函数,我们自己定义,在线程池销毁之前做最后的处理 terminated(); } finally { // 将ctl设置成TERMINATED标志着线程池的正式结束 ctl.set(ctlOf(TERMINATED, 0)); // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。 // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作 // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程 // 简单来说,当时等待线程池返回的主线程,由于线程池已经销毁了,他们也必须要唤醒 termination.signalAll(); } return; } } finally { // 解锁 mainLock.unlock(); } } }8.2 shutdown 方法
public void shutdown() { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 将线程池状态修改为SHUTDOWN advanceRunState(SHUTDOWN); // 将线程池中的线程全部中断 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 查看当前线程池是否可以变为TERMINATED状态 // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED tryTerminate(); }四、流程图
五、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。
我们下期再见。
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。
往期文章推荐:
- 从源码全面解析LinkedBlockingQueue的来龙去脉
- 从源码全面解析 ArrayBlockingQueue 的来龙去脉
- 从源码全面解析ReentrantLock的来龙去脉
- 阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似
- 从源码全面解析 ThreadLocal 关键字的来龙去脉
- 从源码全面解析 synchronized 关键字的来龙去脉
- 阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试
-



