Skip to main content

项目重启或宕机时线程池任务还没处理完?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

回答话术

如果项目运行过程中,线程池中有大量的线程正在运行任务或者阻塞队列中存在大量的任务堆积,此时项目遇到重启或者宕机的情况,线程池会丢失运行中以及未运行的任务,导致业务异常。

针对这种场景,有两种解决方案,如下所示:

1. 优雅关闭线程池

如果线程池中线程数量不多,并且阻塞队列容量设置不大的场景,我们可以实现 Spring DisposableBean 接口,在应用容器关闭时,调用线程池的 shutdown 以及 awaitTermination 方法,等待线程池运行一段时间后再关闭应用。

通过该方式可以极大程度避免应用关闭时线程池任务没有执行完成情况。当然,等待时间需要根据实际情况设置,如果说等待时间太久,会影响项目的重新发布效率。

不过如果线程池任务太多的话,还是有可能会出现到时间了任务还没跑完,最后依然丢任务的情况。

2. 消息队列保障任务不丢失

上面这种方式适合快速响应用户请求的类型,如果是快速处理批量任务则不适用,因为后者阻塞队列的数量可能会很多,基本上不可能等待所有任务完成。为此,要想做到数据不丢失场景,需要借助消息队列进行兜底消费

image.png

如果发送任务,我们不是直接投递到线程池运行,而是投递到消息队列,比如 RocketMQ 等,然后在消息队列消费者逻辑中通过线程池进行消费。这样的话,即可以通过消息队列机制保障消息任务不丢失,同时通过线程池进行消费提速。

有种极端场景,不适合在回答问题时直接说,而是在面试官提出疑问后引出来。

我们将消息投递到线程池后,就返回给消息队列 ACK,此时应用突然宕机,任务在线程池中正在执行,结果还是会丢失。如果想要完全保证消息不丢失,应该在消息队列投递线程池前将任务备份到数据库,并在线程池中将任务设置完成状态。同时,有个异步任务定时扫描未完成的数据,完成最终一致性。

在消费者中的线程池尽量不要设置阻塞队列长度,同时,要加上优雅关闭线程池逻辑,这样能够最大可能性避免任务丢失场景。

问题详解

1. 如何优雅关闭线程池?

给大家看一段代码,可以测试下方法。然后会逐一分析这里用到的技术点,如何搭配完成优雅关闭线程池功能。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 项目重启或宕机时线程池任务还没处理完?
* 公众号:马丁玩编程,回复:加群,添加马哥微信进大话面试开源用户群聊
*/
@Slf4j
@Service
public class TestThreadPoolService implements DisposableBean {

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
1000,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(4096)
);

@Override
public void destroy() throws Exception {
threadPoolExecutor.shutdown();
if (!threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
log.warn("Timed out while waiting for executor to terminate.");
}
log.info("Thread pool waits for termination to run successfully.");
}
}

我们在 SpringBoot 项目中测试的话,首先启动项目运行,然后通过 kill -15 形式关闭项目,就可以看到顺利打印关闭日志。

如果能在 awaitTermination 等待时间内完成线程池中所有任务会打印 info 日志,反之则打印 warn 日志。

image.png

IDEA 中点击一次关闭按钮效果等同于 kill -15,如果快速点击多次,相当于 kill -9 强制关闭。

image.png

1.1. DisposableBean 接口做什么的?

假设我们使用 Tomcat 作为 Web 容器,如果接收到 kill -15 优雅退出的指令,那么 Tocmat 会调用 ServletContextListener 中的 contextDestroyed 方法,SpringBoot 会开始销毁应用程序上下文中的 Bean,并在这个过程中将会调用一些回调接口/方法,等到所有的 Bean 都销毁完毕后,才会关闭容器。

我们上文提到的 DisposableBean 接口就是一个在 Bean 被销毁时会调用的回调接口,它是 Bean 生命周期的一环。

如果想了解 Spring Bean 生命周期完整流程,详情查看:✅ Bean 的生命周期?

1.2. awaitTermination 方法有什么作用?

ThreadPoolExecutor#awaitTermination 是线程池原生提供的方法,方法定义如下所示:

/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

该方法用于让主线程等待该线程池关闭,直到等待超时,或线程池完成所有任务正常关闭为止。

通常,在调用线程池的 shutdown 方法后,可以使用 awaitTermination 方法来等待线程池中的任务尽可能执行完成。

然后一起看看底层怎么实现的:

public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 该方法用于判断线程池的运行状态是否小于TERMINATED。如果线程池的运行状态小于TERMINATED,表示线程池仍在运行,需要继续等待
// 如果线程池的运行状态达到或超过TERMINATED,表示线程池已经终止,可以退出循环
while (runStateLessThan(ctl.get(), TERMINATED)) {
// 用于检查等待时间是否已经耗尽
// 如果等待时间已经耗尽(nanos小于等于0),则返回false,表示等待超时。这样可以控制等待的最长时间
if (nanos <= 0L)
return false;
// 等待终止条件的满足,并返回剩余的等待时间
// 这里将剩余的等待时间重新赋值给nanos变量,用于下一次循环的判断
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}

1.3. 为什么调用 awaitTermination 之前要先调用 shutdown 方法?

如果在调用 awaitTermination 方法之前不调用 shutdown 方法,线程池仍处于"运行"状态,会继续接受和执行新的任务。在这种情况下,awaitTermination 方法可能永远不会返回成功,因为线程池仍在运行,无法确定所有任务是否已经执行完成。

通过先调用 shutdown 方法,可以使线程池进入"关闭"状态,等待已提交的任务执行完成。然后,调用 awaitTermination 方法等待线程池中的任务执行完成或等待超时。这样可以确保在调用 awaitTermination 方法时,线程池已经停止接受新任务,并且正在有序地执行已提交的任务。

2. 如何通过消息队列保障任务不丢失?

我们这里以 RocketMQ 举例,通过生产者发送到消息队列,然后在消费端拉取消息并进行消费逻辑。

示例方法逻辑就是取出一批用户手机号发送短信通知,提供两个示例,分别是看看使用线程池怎么做,以及切换到消息队列又如何完成功能。

2.1. 原始线程池处理方案

import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 项目重启或宕机时线程池任务还没处理完?
* 公众号:马丁玩编程,回复:加群,添加马哥微信进大话面试开源用户群聊
*/
@Service
@RequiredArgsConstructor
public class OriginalTestThreadPoolService {

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
1000,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(4096)
);

public void send() {
List<String> phones = ListUtil.of("1560116xxxx", "1560116xxxx", "1560116xxxx", "1560116xxxx");
for (String each : phones) {
threadPoolExecutor.execute(() -> {
// xxx
});
}
}
}

2.2. 消息队列方案

2.2.1. 消息生产者

之前的逻辑是将任务放到线程池执行具体业务逻辑,重构后就是将任务投递到消息队列,并在消息队列消费端执行具体的业务逻辑。

import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* 项目重启或宕机时线程池任务还没处理完?
* 公众号:马丁玩编程,回复:加群,添加马哥微信进大话面试开源用户群聊
*/
@Service
@RequiredArgsConstructor
public class TestThreadPoolService {

private final RocketMQTemplate rocketMQTemplate;

public void send() {
List<String> phones = ListUtil.of("1560116xxxx", "1560116xxxx", "1560116xxxx", "1560116xxxx");
for (String each : phones) {
Message<String> build = MessageBuilder.withPayload(each)
.build();
rocketMQTemplate.syncSend("topic", build);
}
}
}
2.2.2. 消息消费者

这里使用线程池消费同时也要保障优雅关闭线程池,大家知道有这个步骤即可,这里就不写对应代码了。

import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 项目重启或宕机时线程池任务还没处理完?
* 公众号:马丁玩编程,回复:加群,添加马哥微信进大话面试开源用户群聊
*/
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "demo_consumer_group"
)
public class TestThreadPoolConsumer implements RocketMQListener<String> {

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4,
8,
1000,
TimeUnit.MINUTES,
new SynchronousQueue<>()
);

@Override
public void onMessage(String message) {
threadPoolExecutor.execute(() -> {
// xxx
});
}
}