你好,我是猿java。
今天我们开门见山,直奔主题:线程池。
申明:本文基于 jdk-11.0.15
为什么有线程还需要线程池 {#为什么有线程还需要线程池}
关于 Java线程,有兴趣可以参考小编过往的文章:深度剖析:Java线程运行机制,程序员必看的知识点!
我们先看下线程的几个缺陷:
- Java的线程最终是由操作系统创建的,因此线程的创建,调度都受限于操作系统的处理能力;
- 线程在 Java中是一种宝贵的资源,如果程序创建线程时控制不当,可能导致资源耗尽的风险;
- 互联网的快速发张,很多业务场景会涉及到多线程;
鉴于上面的
内容大纲:
线程池是什么 {#线程池是什么}
线程池(Thread Pool)是 JDK 1.5版本开始引入,由大牛 Doug Lea实现,本文的线程池是指J.U.C提供的 java.util.concurrent.ThreadPoolExecutor 类。 它是一种基于池化思想管理线程的工具,更直白的说,线程池就是用来帮助程序员更优的管理线程的工具。
线程池核心属性 {#线程池核心属性}
下面给出一张创建线程池最底层构造器源码截图(其他方式创建线程池最终都是调用该构造器):
通过 ThreadPoolExecutor源码截图我们可以得出线程池中几个核心的概念:corePoolSize、maximumPoolSize、workQueue、keepAliveTime、threadFactory、handler。
理解了这几个核心概念,线程池基本掌握了50%,下面分别对每个属性进行讲解。
corePoolSize {#corePoolSize}
核心线程数,当提交新任务,如果线程池中的线程数量小于 corePoolSize,即便其它线程处于空闲状态,线程池也会创建一个新线程来处理该任务;
maximumPoolSize {#maximumPoolSize}
最大线程数,线程池中允许创建的最大线程数阈值。当新任务被提交时:
- 如果线程池中的线程数 countWorker大于 corePoolSize并且小于 maximumPoolSize,任务会被放入队列;
- 如果线程池中的线程数 countWorker大于 maximumPoolSize,假如任务能够放入堵塞队列则放入,否则执行拒绝策略;
workQueue {#workQueue}
工作队列,它是 BlockingQueue 堵塞类型,用来传输和保存由 execute()方法提交的 Runnable任务,当线程池中的线程大于等于
corePoolSize 而小于
maximumPoolSize时,任务会被优先放入队列,任务放入队列有 3种策略:
- 直接处理:当 workQueue为
SynchronousQueue(同步队列),则新任务会立即被交给线程池中已有的线程进行处理,如果无法立即获取可用线程,则认为任务放入队列失败,需要重新生成一个线程来处理该任务(如果此时线程数大于
maximumPoolSize,任务被拒绝),为了避免任务被拒绝,一般会把 maximumPoolSize设置成无限大,带来的问题是线程数疯长,有资源耗尽的风险,不过该策略可以避免死锁。 - 无界队列:当 workQueue为无界队列,任务会被放入队列中,等待空闲的线程来处理。如果任务之间是相互独立的,适合使用这种队列,但是当线程处理的速度小于新任务放入的速度,队列会无线封装,依然存在资源耗尽的风险。
- 有界队列:当 workQueue为有界队列,可以规避无界队列资源耗尽的风险,但是这个边界该设置多大比较难把控,如果设置大队列小线程池,虽然可以降低
CPU使用率和线程切换的开销,但是可能导致低吞吐量。
keepAliveTime {#keepAliveTime}
保留存活时间,当线程数大于 corePoolSize时,假如线程池无需处理任务,超出 corePoolSize的空闲线程会保留 keepAliveTime时长后被
terminated,如果设置了 allowCoreThreadTimeOut(true),核心线程也会保留 keepAliveTime时长后会被
terminated,keepAliveTime的值可以通过
setKeepAliveTime(long, TimeUnit)方法动态设置;
threadFactory {#threadFactory}
线程工厂,线程池中的所有线程都是通过 threadFactory进行创建;使用线程工厂可以避免 new Thread()这种硬码创建新线程,我们可以在
threadFactory中使用特殊的线程子类、优先级等功能。
handler {#handler}
拒绝策略,当 Executor关闭时,以及 Executor对最大线程和工作队列容量都使用有限的界限,并且已经饱和时,在方法 execute(Runnable)
中提交的新任务将被拒绝。线程池提供了 4种拒绝策略:
- AbortPolicy:默认的拒绝策略。直接丢弃任务并抛出 RejectedExecutionException这样一个运行时错误;
- CallerRunsPolicy:任务会被转交给调用 execute()方法的线程处理;
- DiscardPolicy:直接丢弃了无法执行的任务;
- DiscardOldestPolicy:如果线程池没有被关闭,则丢弃工作队列头部的任务(最早放入的任务),然后重试执行(可能再次失败,导致重复此操作);
在实际生产中,我们需要根据具体的业务来自定义拒绝策略:
比如:对于一些不能丢失的数据,可以写数据库、写log或者放入MQ中,当线程池恢复处理能力时,再取出数据进行处理。
比如:在数据库中数据有软状态,可以直接丢弃,当线程池恢复处理能力时,再从数据库中获取数据进行处理。
线程池的运行状态 {#线程池的运行状态}
线程池的运行状态主要有 5种:
- RUNNING:接受新任务并处理排队任务;
- SHUTDOWN:不接受新任务,但处理排队任务;
- STOP:不接受新任务,不处理排队任务,并中断正在进行的任务;
- TIDYING:所有任务都已终止,workerCount 为零,转换到状态 TIDYING的线程将运行 terminate()钩子方法;
- TERMINATED:当 terminate()完成,在 awaitTermination()中等待的线程将达到 TERMINATED状态;
5种运行状态的转换关系如下图:
线程池运行状态是通过 ctl变量进行存储的,ctl包装了 workerCount 和 runState两个字段,共 32bit,源码如下:
|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3
| public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); }
|
workerCount:指的是线程池中的工作线程数;
runState:指的是线程池的运行状态,包含 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 5种;
ctl 结构如下图:
线程池工作机制 {#线程池工作机制}
下面整理了一张线程池工作的核心流程图:
线程池源码分析 {#线程池源码分析}
execute()方法 {#execute-方法}
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class ThreadPoolExecutor extends AbstractExecutorService { public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); /** * 步骤1:workerCount < corePoolSize,尝试创建一个新线程执行任务 * workerCountOf()方法取出 ctl变量低 29位的值,表示当前活动的线程数; * 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中 */ if (workerCountOf(c) < corePoolSize) { /** * addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断; * 如果是ture,根据corePoolSize判断 * 如果是false,根据maximumPoolSize判断 */ if (addWorker(command, true)) return; // 如果 addWorker()添加失败,则重新获取 ctl值 c = ctl.get(); } /** * 步骤2:当步骤1的 addWorker()失败,并且线程池是RUNNING,并且任务成功接入阻塞队列 */ if (isRunning(c) && workQueue.offer(command)) { //double-check,重新获取ctl的值 int recheck = ctl.get(); /** * 因为线程池的状态一直在变化,所以再次判断线程池的状态,如果不是运行状态,并且移之前已添加到阻塞队列中command * 然后进入拒绝策略,流程终止 */ if (!isRunning(recheck) && remove(command)) reject(command); /** * 线程池为Running状态,获取线程池中的总线程数,如果数量是0,则执行 addWorker()方法; * 第一个参数为null,表示在线程池中创建一个线程,但不去启动 * 第二个参数为false,将线程池的线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); /** * 步骤3:步骤1的 addWorker()失败并且无法进入步骤2,有两种情况: * 1、线程池的状态不是RUNNING; * 2、线程池状态是RUNNING,但是workerCount >= corePoolSize, workerQueue已满 * 再次调用addWorker方法,第二个参数传 false,将线程池的线程数上限设置为 maximumPoolSize; * 如果失败则执行拒绝策略; */ } else if (!addWorker(command, false)) reject(command); } }
|
execute(Runnable command) 方法总结:
- 如果 workerCount < corePoolSize,则尝试创建并启动一个新线程来执行新提交的任务;
- 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;
- 如果 corePoolSize <= workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则尝试创建并启动一个新线程来执行新提交的任务;
- 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的
AbortPolicy策略是直接抛异常;
addWorker()方法 {#addWorker-方法}
|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| public class ThreadPoolExecutor extends AbstractExecutorService { /** * 给线程池增加新的 worker * @param firstTask 当前任务 * @param core 创建新worker是以 corePoolSize还是 maximumPoolSize作为最大阈值 * @return 返回增加新的 worker的结果,true or false */ private boolean addWorker(Runnable firstTask, boolean core) { // retry 作用域为 for循环 retry: // 自旋 for (int c = ctl.get(); ; ) { // Check if queue empty only if necessary. /** * 这里的判断比较绕,直白的说:线程池已经关闭 或者 有资格关闭,则无法 addWorker并且返回false; * 如果线程池当前的 runState为 SHUTDOWN、STOP、TIDYING、TERMINATED 其中一种;并且下面的结果 * 三个条件 通过 || (或) 连接,只要有一个为 true,就返回 true; * 1. 线程池当前的 runState为 STOP、TIDYING、TERMINATED 其中一种;表示关闭状态,不再接收提交的任务,但却可以继续处理阻塞队列中已经保存的任务; * 2. firstTask 不为空 * 3. Check if queue empty only if necessary.阻塞队列为空 */ if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (; ; ) { // workerCount 大于最大线程阈值,无法 addWorker,返回false if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 通过 CAS 方式来增加线程数,如果成功,则跳出最外层 for if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 线程池状态为 非 RUNNING,则重新进入 for循环 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // addWorker() 的核心流程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /** * 根据firstTask来创建Worker对象,每个Worker对象里面会封装一个通过 ThreadFactory创建的线程 * Worker(Runnable firstTask) { * setState(-1); // inhibit interrupts until runWorker * this.firstTask = firstTask; * this.thread = getThreadFactory().newThread(this); * } */ w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 获取可重入锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 如果线程池的runState为 RUNNING状态,或者 runState < STOP(RUNNING或者SHUTDOWN)并且任务为空 // 如果线程不为NEW,则抛异常 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程,Worker implements Runnable,是一个线程类,此时t.start()会调用 Worker的run()方法,从而调用 runWorker()方法 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } }
|
addWorker() 方法总结:
- addWorker()方法的主要作用是在线程池中创建一个新的线程并执行任务;
- firstTask参数用于指定新增的线程执行的第一个任务,
- core参数用来限制创建新线程,是以 corePoolSize还是以 maximumPoolSize作为线程数最大阈值。
- 需要获取 mainLock可重入锁成功才能将 worker加入 HashSet中
- worker添加成功后,调用线程的t.start() 启动线程执行任务;
Worker类分析 {#Worker类分析}
|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; // TODO: switch to AbstractQueuedLongSynchronizer and move // completedTasks into the lock word. /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { /** * 把state设置为 -1,阻止中断直到调用runWorker方法; * 因为 AQS默认 state是 0,刚创建一个 Worker对象,在没有执行任务时,不应该被中断 */ setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; /** * 创建一个线程,newThread()方法入参是 this对象,也就是 Worker对象, * 因为 Worker implements Runnable,Worker 是一个线程类; * 所以 Worker对象中的 Thread.start()会间接调用 Worker类中 run()方法 */ this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. * 将主运行循环委托给外部 runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } /** 获取锁 * CAS 方式修改 state,不可重入; * state根据 0 来判断,所以 Worker构造方法中state=-1,就是为了禁止在执行任务前对线程进行中断; * 因此,在 runWorker()方法中会先调用Worker对象的 unlock()方法将 state设置为 0 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
|
Worker类总结:
- Worker类实现了 Runnable接口,因此是一个线程类。
- Worker类继承了AQS,使用了 AQS实现独占锁的功能。
- 为什么不使用 ReentrantLock可重入锁来实现?
从源码的 tryAcquire()方法可以看出它是不允许重入的,而ReentrantLock是允许可重入的:
- lock方法一旦获取独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲状态,说明它没有处理任务,这时可以对该线程进行中断;
- 线程池中执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入的,是因为在任务调用setCorePoolSize这类线程池控制的方法时,不会中断正在运行的线程所以,Worker继承自AQS,用于判断线程是否空闲以及是否处于被中断。
runWorker() {#runWorker}
|------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 获取任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } }
|
runWorker()方法总结:
- while 死循环通过getTask()方法从阻塞队列中获取任务,当getTask()为null时跳出 while()循环;
- 正常进入 while死循,先加锁,然后进入任务处理逻辑;
- 如果线程池正在停止,则中断当前线程,调用 processWorkerExit()方法,runWorker()结束;
- 调用 task.run() 执行任务;
- while 循环不论何种原因跳出,最终都需要执行 processWorkerExit()方法;
- runWorker()方法执行成功,意味着 Worker中的 run()方法执行成功,则释放锁,销毁当前线程;
getTask()方法 {#getTask-方法}
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class ThreadPoolExecutor extends AbstractExecutorService { private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (; ; ) { int c = ctl.get(); /** * 再次判断线程池的 runState,决定能够获取到 task */ // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed 变量用于判断是否需要进行 keepAliveTime超时处理; * allowCoreThreadTimeOut 默认是 false,代表核心线程不允许超时; * wc > corePoolSize,表示当前线程数大于核心线程数; * 对于超出 corePoolSize的线程,需要 keepAliveTime超时处理; */ // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * wc > maximumPoolSize 产生的原因是:此方法执行阶段同时执行了 setMaximumPoolSize()方法; * timed && timedOut 如果为true,表示当前操作需要进行获取任务超时处理,并且上次从阻塞队列中获取任务发生了超时; * 如果池中的线程数大于 1,或者 workQueue为空,则尝试将 workCount减1;如果减1失败,则返回重试; */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /** * 从工作队列中获取任务,根据 timed来决定采用超时获取task()还是堵塞获取task * timed为 true,则通过workQueue的poll方法进行超时控制,如果在keepAliveTime时间内没有获取任务,则返回null; * 否则通过take方法,如果队列为空,则take方法会阻塞直到队列中不为空; */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } }
|
getTask()方法总结:
- 判断线程池的 runState决定是否能返回任务;
- 判断是否需要对超出 corePoolSize的线程最超时推出操作;
- 根据 timed来判断 采用什么方式从阻塞队列中获取任务;
processWorkerExit()方法 {#processWorkerExit-方法}
|------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class ThreadPoolExecutor extends AbstractExecutorService { private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly为 true,则说明线程执行时出现异常,需要将 workerCount 减1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; // 获取锁 mainLock.lock(); try { // 已完成 taskCount计数加1 completedTaskCount += w.completedTasks; // 从 workers hashSet中移除当前 worker workers.remove(w); } finally { mainLock.unlock(); } // 钩子函数,根据线程池的状态来判断是否结束线程池 tryTerminate(); int c = ctl.get(); /** * 当前线程 runState为 RUNNING或 SHUTDOWN 时,如果 worker是异常结束,会执行 addWorker()方法; * 如果allowCoreThreadTimeOut=true,那么等待队列有任务,至少保留一个worker; * 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } }
|
到此,线程池的核心方法已经分析完毕,那么,我们需要如何创建线程池呢?
线程池创建方式 {#线程池创建方式}
手动创建 {#手动创建}
参数按需设置,比如下面代码就是自己创建的一个线程池:
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7
| ThreadPoolExecutor executor=new ThreadPoolExecutor(2, 4, 4, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
|
使用 Executors提供的方法 {#使用-Executors提供的方法}
newFixedThreadPool
固定线程数的线程池,corePoolSize = maximumPoolSize,keepAliveTime = 0,工作队列为无界堵塞队列 LinkedBlockingQueue。
- 优点:线程数是固定的,不会无限增长。
- 缺点:线程数是固定的,无法动态调整;工作队列是无界的,如果任务堆积过多可能导致内存溢出;不支持自定义拒绝策略
newSingleThreadExecutor
只有一个线程的线程池,corePoolSize = maximumPoolSize = 1,keepAliveTime = 0,工作队列为无界堵塞队列 LinkedBlockingQueue。
- 优点:可以顺序执行任务
- 缺点:线程数为1,不支持并发,无法动态调整;工作队列是无界的,如果任务堆积过多可能导致内存溢出;不支持自定义拒绝策略
newCachedThreadPool
按需要创建新线程的线程池,核心线程数为0,最大线程数为 Integer.MAX_VALUE,keepAliveTime = 60s,工作队列使用同步SynchronousQueue。
- 优点:可以无线添加新的线程,当需求降低时会自动回收空闲线程。适用于执行很多的短期异步任务,或者是负载较轻的服务器。
- 缺点:不支持自定义拒绝策略
newScheduledThreadPool
创建一个以延迟或定时的方式来执行任务的线程池,堵塞队列为 DelayedWorkQueue。
- 优点:适用于需要多个后台线程执行周期任务
- 缺点:任务是单线程方式执行,一旦一个任务失败其他任务也受影响;不支持自定义拒绝策略
newWorkStealingPool
窃取(抢占式操作)线程池,JDK 1.8 新增,底层使用 ForkJoinPool 实现,1个任务拆分成多个"小任务","小任务"被分发到多个线程上执行,"小任务" 都执行完成后,再将结果合并。每一个线程都有一个自己的队列,当线程发现自己的队列没有任务了,就会到其它线程的队列里获取任务执行,这个过程理解为"窃取"。
和上面 4种线程池有很明显的区别,前 4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题,任务的执行是无序的,哪个线程抢到任务。
- 优点:每个线程有自己的队列,可以减少队列的争用,适合并发量大的任务
- 缺点:不会保证任务的顺序执行,不支持自定义拒绝策略
线程池终止方式 {#线程池终止方式}
终止线程池主要有两种方式:
- shutdown():不接受新任务,但是在关闭前会将之前提交的任务处理完毕。
- shutdownNow():直接关闭线程池,通过 Thread.interrupt()方法终止所有线程,不会等待之前提交的任务执行完毕,但是会返回队列中未处理的任务。
如何配置线程池 {#如何配置线程池}
线程池参数的配置,会依据服务器的工作类型来设置,参考意见如下:
-
计算密集型,设置 线程数 = CPU数 + 1;
-
I/O密集型,线程数 = CPU数 * 2;
但是在实际开发中,服务器很难完全区分是哪一种类型,因此一个比较合理的设置是:
线程数 = CPU数 * CPU利用率 * (任务等待时间 / 任务计算时间 + 1)
比如,程序部署在 4核的服务器上用于任务计算,假设任务计算时长是 100ms,等待 I/O操作为 400ms,则线程数约为:4 * 1 * (1 + 400 / 100) = 20个
不过,上面的都是参考值,在实际生产中的做法是:经过压测得出最佳线程数,并且把 corePoolSize 和 maximumPoolSize
设计成可配置化参数,一旦需要调整大小,可以直接通过配置中心来完成,以免重新发布代码。
常见面试题 {#常见面试题}
注意:在面试中回答线程池的问题,一定要先说明 jdk版本,因为不同的版本实现可能有差异(jdk修复bug或者性能优化导致的)。有了上面对线程池原理的分析,下面的面试题可以很容易的回答。
1.为什么有线程还需要线程池呢? {#1-为什么有线程还需要线程池呢?}
线程在正常执行或者异常中断时会被销毁,如果频繁的创建很多线程,不仅会消耗系统资源,还会降低系统的稳定性,一不小心把系统搞崩了。
使用线程池可以带来以下几个好处:
- 线程池内部的线程数是可控的;
- 可以灵活的设置参数;
- 线程池内会保留部分线程,当提交新的任务可以直接运行;
- 方便内部线程资源的管理,调优和监控;
2.能讲下线程池中几个核心属性吗? {#2-能讲下线程池中几个核心属性吗?}
参考上述 核心属性 章节
3.能讲下线程池的运行原理吗? {#3-能讲下线程池的运行原理吗?}
参考上述 运行原理 章节
4.线程池中的核心线程会被销毁吗? {#4-线程池中的核心线程会被销毁吗?}
默认情况下不会,但是如果设置了 allowCoreThreadTimeOut(true),核心线程也会保留 keepAliveTime时长后会被 terminated。
5.能讲下线程池的运行状态吗?他们之间是如何转换的? {#5-能讲下线程池的运行状态吗?他们之间是如何转换的?}
参考上述 线程池的运行 章节
6.能聊聊线程池的拒绝策略吗? {#6-能聊聊线程池的拒绝策略吗?}
参考上述 拒绝策略 章节
7.能讲讲 ctl的实现机制吗? {#7-能讲讲-ctl的实现机制吗?}
ctl将 runState 和 workerCount 封装成了一个原子 AtomicInteger操作。因为 runState 和 workerCount 是线程池正常运行的2个最重要属性。
因此无论是查询还是修改,我们必须保证对这2个属性的操作是属于"同一时刻"的,也就是原子操作,否则就会出现错乱的情况。如果我们使用2个变量来分别存储,要保证原子性则需要额外进行加锁操作,这显然会带来额外的开销,而将这2个变量封装成1个
AtomicInteger 则不会带来额外的加锁开销,而且只需使用简单的位操作就能分别得到 runState 和 workerCount。
由于这个设计,workerCount 的上限 CAPACITY = (1 << 29) - 1,对应的二进制原码为:0001 1111 1111 1111 1111 1111 1111
1111(29个1)。
通过 ctl得到 runState:runState = ctl & ~~CAPACITY。(~~ 按位取反),于是"~CAPACITY"的值为:1110 0000 0000 0000 0000 0000 0000
0000,只有高3位为1,与 ctl 进行 & 操作,结果为 ctl
高3位的值,也就是 runState。
通过 ctl得到 workerCount:workerCount = c & CAPACITY(位操作)。
8.Executors提供了哪些创建线程池的方法? {#8-Executors提供了哪些创建线程池的方法?}
参考上述 线程池创建方法 章节
9.线程池中的核心线程数和最大线程数该如何设置呢? {#9-线程池中的核心线程数和最大线程数该如何设置呢?}
10.如何终止线程池? {#10-如何终止线程池?}
参考上述 终止线程池 章节
11.为什么线程池中要使用堵塞队列? {#11-为什么线程池中要使用堵塞队列?}
- 主要原因:线程从阻塞队列取任务时,如果阻塞队列不为空则立即返回,如果为空,则线程会被阻塞,一直等待,直到队列中有新的任务,这样就充分的利用了阻塞队列
堵塞和通知线程的功能。如果采用非堵塞队列,则需要写额外的机制来实现通知线程获取任务。 - 次要原因:当核心线程处理不过来,可以通过堵塞队列进行任务堆积
12. 非核心线程能成为核心线程吗? {#12-非核心线程能成为核心线程吗?}
核心线程和非核心线程是理解上的区分,线程池内部并不区分核心线程和非核心线程的,会根据池的工作线程数 countWorker和 corePoolSize 进行调整,
- 当 countWorker <= corePoolSize,则 countWorker这些线程被理解成核心线程;
- 当 corePoolSize < countWorker < maximumPoolSize,则 countWorker - corePoolSize 这部分线程被理解成非核心线程;
13. 线程池中的线程在什么时候创建? {#13-线程池中的线程在什么时候创建?}
默认情况下,即使是核心线程也只能在新任务到达时才创建和启动。但是可以使用 prestartCoreThread 或 prestartAllCoreThreads 方法来提前启动核心线程,即创建线程池的时候就会创建对应数量的线程。
14. 有界队列和无解队列有什么风险? {#14-有界队列和无解队列有什么风险?}
- 有界队列,当线程池的队列满了后,被拒绝的任务如何处理;
- 无界队列,当任务的提交速度大于线程池的处理速度,可能会导致内存溢出;
1.5 为什么不推荐使用Executors包装的线程池? {#1-5-为什么不推荐使用Executors包装的线程池?}
参考上述 线程池创建方式 章节
总结 {#总结}
到此,我们对线程池进行了全面的分析,也列举了面试中常见的问题,对于线程池的学习,个人极力推荐优先阅读源码,毕竟是原滋原味,然后结合一些优秀的文章,比如本文?,对照着理解,这样的话就能快速的掌握理论,
但是,光有理论还不行,还应该多参与一些关于线程池的开发工作,也可以多参与一些生产环境线程池的监控和排错任务,加强实战水平,这样才能在工作和面试中游刃有余。
文章总结不易,看到的这里的小伙伴,还请帮忙点赞,关注哦!