Skip to main content

线程池如何实现线程复用和回收?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

回答话术

因为这线程池的线程复用和超时回收在一个方法里,关联性较强,所以在一篇文章回答里说明。

1. 线程复用

当线程池通过 addWorker 方法创建了运行线程 Worker 后,线程会通过 runWorker 去运行相关任务。该线程运行任务完成后并不会直接销毁,因为 runWorker 方法会让线程在一个 while 循环中,反复调用 getTask 方法从阻塞队列中获取新的任务并执行。

通过该机制保障了在满足工作线程不销毁的前提下,让工作线程不断从阻塞队列中拿到新的任务并运行,保障了线程池线程复用机制。

image.png

2. 线程超时回收

按照上面的逻辑,那线程池里的线程岂不是会一直在 while 循环中拿阻塞队列的任务并运行?如何实现线程池中设置了 keepAliveTime 的空闲回收机制呢?看管莫急,且看下文讲解。

线程池中线程从阻塞队列中获取任务的 getTask 方法有两种机制,第一种是无限期等待阻塞队列中有任务了返回,还有一种是等待设置 keepAliveTime 时间之后返回,如果说等待了 keepAliveTime 时间之后阻塞队列中还是没有任务,那么会返回空,并在 getTask 方法中返回空,最终在 runWorkerask 方法中执行线程销毁逻辑,这个过程也就是超时回收。

image.png

如果之前没有了解过线程超时回收机制,建议看看问题详解章节。

问题详解

1. 前置知识&相关代码

线程池中的执行线程被包装为了 Worker 对象,并通过内部字段 thread 运行。可以看到 Worker 对象实现了 Runnable 接口,重写 run 方法调用了 runWorker 方法。

能保障线程池中线程复用的精髓都在这个方法以及调用的 getTask 方法中,我将相关代码精简出来,会省略部分非核心逻辑,方便大家更容易理解逻辑。

public class ThreadPoolExecutor extends AbstractExecutorService {

private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
// ......
/**
* 执行线程池中任务的线程
*/
final Thread thread;

public void run() {
runWorker(this);
}
// ......
}

/**
* 运行提交到线程池的任务
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 通过 while 循环来保障线程可以一直获取任务并运行
while (task != null || (task = getTask()) != null) {
w.lock();
// ......
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

/**
* 从阻塞队列中获取任务
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// ......
int wc = workerCountOf(c);

// 判断是否需要带超时时间去阻塞队列中获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 是否需要带超时方式获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}

2. 线程复用

线程复用重点在 runWorker 方法上,可以发现内部有个 while 循环,不断通过 getTask 方法去获取阻塞队列的任务,只要阻塞队列里一直有任务那么这个 while 就不会结束。进而,我们的线程也就不会执行到 finally 中的 processWorkerExit 销毁方法。

从阻塞队列中拿到 task 并直接调用 run 方法,本质上是通过 Worker 的 thread 属性运行。当运行完当前 task 后,那么再继续进行新一轮的 while 循环。

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 通过 while 循环来保障线程可以一直获取任务并运行
while (task != null || (task = getTask()) != null) {
w.lock();
// ......
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

3. 线程超时回收

超时回收重点在于 getTask 方法,其中有两行核心代码我标记了下。

/**
* 从阻塞队列中获取任务
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// ......
int wc = workerCountOf(c);

// 核心1:判断是否需要带超时时间去阻塞队列中获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 核心2:是否需要带超时方式获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

3.1. 如何判断是否需要带超时时间?

虽然代码里两个逻辑判断是反着的,但是我感觉这样讲解会更容易理解些。

boolean timed = wc > corePoolSize || allowCoreThreadTimeOut;

wc > corePoolSize wc(工作线程数量)大于 corePoolSize(核心线程数),则 timed 的值为 true。这个比较容易理解,毕竟 keepAliveTime 参数的语意就是当线程池内的线程超过核心线程数后,多余的线程就会在 keepAliveTime 空余时间后进行销毁。

allowCoreThreadTimeOut 是线程池的一个配置选项,用于控制是否允许核心线程超时销毁。在标准的线程池实现中,核心线程是线程池中一直保持活动状态的线程,不会被销毁 —— 即使它们处于空闲状态。这样可以保证线程池能够快速响应任务的执行请求,避免线程的创建和销毁开销。

然而,有些情况下,当线程池中的任务量较少或任务执行时间较长时,保持大量的核心线程可能会浪费系统资源。这时,可以通过将 allowCoreThreadTimeOut 设置为 true,允许核心线程也受到空闲超时时间的影响,即使它们是核心线程也可能会被销毁。线程池只在乎 “核心线程”的数量而不会真的去区分线程是不是 “核心线程”。

只要走这个逻辑的时候,工作线程数小于核心线程数,那么它就是一个核心线程,等它再回到这个地方的时候,如果工作线程数已经大于核心线程数了,那么它就是一个非核心线程。

所以,两个条件任意为 true 则 timed 都会设置 true。

3.2. 如何使用超时时间获取阻塞队列任务?

阻塞队列 BlockingQueue 接口提供了 take 和 poll 两个公共用于让线程阻塞的从队列中获取元素:

  • take:不设置超时时间,线程会一直阻塞直到成功从队列里获取一个元素为止。
  • poll:只阻塞指定时间,如果超时了还是没法从队列中获取一个元素就放弃。
public interface BlockingQueue<E> extends Queue<E> {

/**
* 检索并删除此队列的头部,一直等待到指定元素可用的等待时间
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* 检索并删除此队列的头部,必要时等待直到元素可用为止
*/
E take() throws InterruptedException;
}

这两个方法分别对应 getTask 方法中的核心 2 流程。上文有说,如果获取获取不到元素,那么就意味着线程已经空闲了一段时间,按照默认场景下,如果线程池的线程数多余核心线程数,而且又获取不到任务,那么迎接它的将是被销毁。

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;

如果 getTask 方法返回空的 task,while 循环将会结束,执行 processWorkerExit 销毁逻辑。

3.3. 如何销毁工作线程?

首先解析下 completedAbruptly 参数,对应两种场景:

  • 如果 completedAbruptly 为 true,表示线程执行任务时因为抛出了异常而退出。在这种情况下,会调用 decrementWorkerCount 方法来减少工作线程的计数。
  • 为 false 则继续执行正常工作线程销毁逻辑。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

// 兜底策略,避免线程池中的线程全部被清除
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

最下面的这段逻辑,做了一层兜底。简单的来说,如果线程是因为抛出异常而退出的,那么会立刻让工作线程数量减一,并且立刻添加一个新的工作线程。而如果线程是因为在规定的超时时间内获取不到任务而退出,那么就需要分情况考虑是否要添加一个新的工作线程。

当核心线程设置是否超时情况下,有两种处理策略:

  • 允许核心线程超时:那么就需要看看工作队列中是否有未消费的任务,如果队列已空,那么不需要补偿新的工作线程,如果工作队列未空,则仅在当前线程池没有任何工作线程时才补充一个新的工作线程。
  • 不允许核心线程超时:那么就需要确认当前工作线程数是否已经大于核心线程数,如果已经大于则不需要补充新的工作线程,否则则需要。

3.4. keepAliveTime 为零还会超时回收么?

如果将 keepAliveTime 设置为零,这意味着非核心线程在空闲后可以立即被回收,而不需要等待超时时间。

也就是这段代码中 poll 方法将等同于没有阻塞等待时间,如果阻塞队列中没有元素将立即返回。

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;

线程池的 keepAliveTime 设置为零有什么问题呢?

  • 线程资源浪费:线程池中的非核心线程在任务执行完毕后会立即被回收。这种设置可能导致线程频繁地创建和销毁,增加了线程管理的开销。
  • 不利于线程重用:如果无法以保证工作队列始终非空,那么线程池中的线程就有可能因为经常无法从工作队列获取到任务而频繁的超时退出。这种情况下,一旦又无法保证以比较均匀的速度向线程池提交任务,那么线程池就会频繁的创建或者销毁线程,这无疑会增加线程池的压力,造成线程池的性能下降。

3.5. 如何让线程不被回收?

可以通过将线程池的核心线程数和最大线程数设置为相同的值,可以确保线程池中的所有线程都被视为核心线程。这样,即使线程处于空闲状态,它们也不会被回收。

但有一点需要注意,如果将线程池中的线程设置为不可回收,这可能会导致线程资源的浪费,特别是当线程处于空闲状态时。