Skip to main content

为什么不建议Executors创建线程池?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

回答话术

Executors 是 Java 提供的一个工具类,位于 java.util.concurrent 包中,用于创建线程池。它提供了一些静态方法来便捷地创建不同类型的线程池。

常见有四种生成策略:

  • newFixedThreadPool(int nThreads) - 创建一个固定大小的线程池。
  • newCachedThreadPool() - 创建一个核心线程数 0,最大线程数为 Integer 最大值的线程池,阻塞队列使用 SynchronousQueue
  • newSingleThreadExecutor() - 创建一个只有单个工作线程的线程池。它确保所有任务按顺序执行,不需要担心多线程问题。
  • newScheduledThreadPool(int corePoolSize) - 创建一个定时线程池,它可以在给定的延迟后运行命令,或者定期执行。

Executors 提供了创建线程池的便捷方法,然而,Executors 基于默认配置创建的线程池可能并不适合所有场景,这里我们说下每个方法创建的线程池都有哪些弊端:

  • newFixedThreadPoolnewSingleThreadExecutor:这两种固定大小的线程池使用无界的 LinkedBlockingQueue 作为工作队列。当任务提交速度超过处理速度时,工作队列会不断增长,可能导致内存溢出。
  • newScheduledThreadPool:虽然最大线程数是 Integer 最大值,但是因为阻塞队列是无界的,所以核心问题同上。
  • newCachedThreadPool:核心线程数为 0,使用同步的 SynchronousQueue,并且允许创建无限数量的线程。在高并发情况下,可能会创建大量线程,导致系统资源耗尽,甚至使系统崩溃。

大家需要格外注意,网上很多资料说 newScheduledThreadPool 的问题是会创建大量线程导致 OOM,其实说法不对。详情参上。

而且,上述线程池所使用的拒绝策略都是 AbortPolicy,当队列和线程满了后再提交新任务会抛出 RejectedExecutionException,可能不是所有场景下都适用。

总而言之,基于上述原因,当在我们在要使用线程池的时候,手动的创建 ThreadPoolExecutor,并且根据自己的场景来指定它的各项参数和配置。

问题详解

1. 不推荐线程池创建形式

1.1. newFixedThreadPool

应用场景:针对生产消费数量稳定的场景设计的,等同于我们把核心与最大线程数设置为一样。

可以看到方法底层调用的是 ThreadPoolExecutor 构造器,看到阻塞队列使用的是 new LinkedBlockingQueue<Runnable>(),当任务提交速度超过处理速度时,工作队列会不断增长,可能导致内存溢出。

/**
* 创建一个线程池,该线程池重用固定数量的线程,这些线程从共享的无界队列中运行。
* 在任何时刻,最多有 {@code nThreads} 个线程会处于活跃状态,处理任务。
* 如果在所有线程都活跃并处理任务时提交了额外的任务,它们将等待在队列中,直到有线程可用。
* 如果任何线程在关闭之前由于执行期间的失败而终止,如果需要执行后续的任务,将会有新线程替代它的位置。
* 线程池中的线程将一直存在,直到显式调用 {@link ExecutorService#shutdown shutdown} 方法关闭它。
*
* @param nThreads 线程池中的线程数量
* @return 新创建的线程池
* @throws IllegalArgumentException 如果 {@code nThreads <= 0}
*/

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

1.2. newSingleThreadExecutor

应用场景:适用于需要顺序执行任务的情况,保证任务按顺序执行。确保同一时间只有一个任务在执行,避免并发问题。

创建一个只有单个工作线程的线程池,当任务提交速度超过处理速度时,工作队列会不断增长,可能导致内存溢出。问题同上。

/**
* 创建一个使用单个工作线程运行的执行器(Executor),该线程从无界队列中提取任务。(注意,如果这个单线程在关闭之前由于执行期间的失败而终止,如果需要执行后续任务,将会有新线程替代它的位置。)
* 任务保证按顺序执行,在任何给定时间不会有多个任务处于活跃状态。
* 与 {@code newFixedThreadPool(1)} 不同的是,返回的执行器保证不会重新配置为使用额外的线程。
*
* @return 新创建的单线程执行器
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

1.3. newCachedThreadPool

应用场景:适用于短时间内需要处理大量任务的情况,任务执行时间较短。可以快速增长线程数以应对突发任务负载。

线程池构造器中最大线程数使用 Integer.MAX_VALUE,并且使用同步阻塞队列。相当于阻塞队列不缓冲任务,如果线程池中线程都在运行的话,会直接继续创建线程。

高并发情况下,该线程池可能会创建出大量线程,这无疑会带来严重的问题。从线程调度的角度来说,这些过多的线程在调度时会因为频繁的上下文切换带来额外的开销,并且大量的线程也会占用过多的 CPU 资源,进而影响系统的整体吞吐量和响应速度。

而从内存的角度来说,Java 中每个线程对象都需要一定的内存来存储其线程栈和上下文信息,过多的线程对象会增加内存消耗,甚至可能导致内存不足或频繁的内存页调度,进而影响系统稳定性和性能。

/**
* 创建一个线程池,根据需要创建新线程,但在可用时会重用先前构造的线程。
* 这些线程池通常会提高执行许多短暂异步任务的程序的性能。
* 调用 {@code execute} 方法时,如果有可用的先前构造的线程,则会重用它们。
* 如果没有可用的现有线程,则会创建一个新线程并添加到线程池中。
* 未使用超过六十秒的线程将被终止并从缓存中移除。
* 因此,空闲足够长时间的线程池将不会消耗任何资源。
* 请注意,可以使用 {@link ThreadPoolExecutor} 构造函数创建具有类似属性但细节不同(例如超时参数)的线程池。
*
* @return 新创建的线程池
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

1.4. newScheduledThreadPool

应用场景:需要定期或延迟执行任务的情况,常用于周期性任务调度,如心跳检测、定时报告生成等。

创建一个定时线程池,它可以在给定的延迟后运行命令,或者定期执行。

/**
* 创建一个具有给定核心池大小的 {@code ScheduledThreadPoolExecutor}。
*
* @param corePoolSize 保持在池中的线程数,即使它们是空闲的,
* 除非设置了 {@code allowCoreThreadTimeOut}
* @throws IllegalArgumentException 如果 {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

看到它的创建方式时,很多同学都觉得问题出在最大线程数 Integer.MAX_VALUE。但是其实它使用的工作队列 DelayedWorkQueue 反而是个更大的隐患,因为 DelayedWorkQueue 是一个无界队列,这意味着在线程池因为创建过多线程而出现问题之前,很可能就已经因为队列堆积了过多的任务而直接引发 OOM 了。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {

// 虽然初始化容量只有 16,但是会在新增元素时进行不断扩容
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

// 向阻塞队列新增任务
public boolean offer(Runnable x) {
// ......
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 当队列满时进行扩容
grow();
// ......
} finally {
lock.unlock();
}
return true;
}

/**
* 调整堆数组的大小,只在持有锁时调用
*/
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
}

这里还有一个扩展知识点。DealyQueueDelayedWorkQueue 实现功能基本一致,为什么不直接用 DealyQueue,而是再扩展出 DelayedWorkQueue

DelayedWorkQueue 中的元素 ScheduledFutureTask 在堆中记录了索引,因此 remove 操作的时间复杂度从 O(n) 提升到 O(1)。因此,DelayedWorkQueue 重写了 remove 方法,直接利用元素的索引进行操作。

我们先看 DealyQueueremove 方法:

public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 调用内部任务队列存储 remove 方法
return q.remove(o);
} finally {
lock.unlock();
}
}

public boolean remove(Object o) {
// 首先需要查到任务元素下标
int i = indexOf(o);
if (i == -1)
return false;
else {
removeAt(i);
return true;
}
}

// 可以看到元素查找是通过遍历的形式,平均时间复杂度 O(n)
private int indexOf(Object o) {
if (o != null) {
final Object[] es = queue;
for (int i = 0, n = size; i < n; i++)
if (o.equals(es[i]))
return i;
}
return -1;
}

我们再看 DelayedWorkQueue 重写后的 remove 方法:

public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 重点是这行,其他请忽略
int i = indexOf(x);
if (i < 0)
return false;

setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}

private int indexOf(Object x) {
if (x != null) {
// 如果是 ScheduledFutureTask 类型,可以直接通过堆下标找到对应元素,时间复杂度 O(1)
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
// 如果不是 ScheduledFutureTask 类型,通用通过遍历形式,平均时间复杂度 O(n)
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}

DelayedWorkQueue 中,元素通常是 ScheduledFutureTask,但它实际上可以是持有任何实现了 RunnableScheduledFuture 接口的对象。

基于以上行为,可以说 remove 操作的时间复杂度从 O(n) 降低到 O(1)

2. 如何正确创建线程池?

生产环境使用线程池还是有挺多讲究的,创建线程池的几个参数应该如何正确设置?参考文章:如何设置线程池的参数?