Skip to main content

如何监控线程池的运行性能指标?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

回答话术

绝大多数的项目其实都运行在大大小小的线程池中,如果我们不对线程池的运行状态进行监控,那么就不能及时的发现问题的苗头,而只能等到问题真正发生的时候再进行补救。比如常见的线程池被打满的问题,如果能及时发现线程池消费速率不足,任务积压严重,那么就可以尽早处理避免。

image.png

上图是线程池提供的原生监控方法,有些同学看了可能会比较疑惑,阻塞队列明明也是线程池的一部分,但是在这里面似乎并没有看到获取阻塞队列状态的 API。

实际上,阻塞队列本身单独提供了两个获取状态信息的 API,我们可以直接从阻塞队列上采集信息:

/**
* 阻塞队列中有多少元素,该方法并不是 BlockingQueue 接口独有,而是继承自 Collection 接口
*/
int size();

/**
* 阻塞队列还能容纳多少元素
*/
int remainingCapacity();

有了这些 API 后,我们就能获取到线程池的相关运行指标,并将其接入监控系统。

以目前互联网主流的监控技术方案来说,我们可以通过诸如 Micrometer 这类指标库来从线程池中采集一些运行指标并为其打上标签,然后再通过定时任务让 Prometheus 定期的从项目中拉取指标数据。这些数据最终可以通过 Grafana 进行可视化展示。

image.png

有了监控方案以后,我们就可以延伸出一些告警功能,比如:

  • 假设阈值设置 80%,线程池最大线程数 10,当活跃线程数达到 8 发起报警。可以知道线程数是否不够,进而有效调整线程数。
  • 假设阈值设置 80%,阻塞队列容量 100,当队列容量达到 80 发起报警。监控阻塞队列容量可以让我们及时意识到出现任务堆积。

问题详解

1. 如何监控线程池数据?

由于线程池每个线程都是相对独立的,当我们调用获取线程池运行状态的 API 时,实际上线程与任务的状态都在实时的变化,因此诸如完成任务数或者工作线程数这类数据是不能保证绝对准确且实时有效的。

不过,为了最大层度的保证数据的相对准确性,线程池会在获取数据时加锁(这部分后文会解释),而一旦加锁又毫无疑问的会影响到性能。

因为获取线程池运行数据涉及到加锁行为,为此我们应该避免频繁获取,可以使用定时任务方式采集线程池运行数据并进行接下来数据上报等操作。


在项目中,我们一般会把线程池声明为 Spring 容器中的 Bean,对于此类线程池,我们可以选择在项目 Web 容器(如 Tomcat)启动成功后,让采集器直接从容器获取相关的线程池实例进行采集。

而如果线程池不被 Spring 管理,那么我们则需要在 Spring 容器启动前主动的将其注册到采集器。下面我们简单示范一下:

1.1. 非 Spring Bean 线程池数据采集

创建线程池定时任务数据采集器,在应用启动后执行定时任务获取线程池的运行数据,并将采集到的线程池数据上报到监控端。

代码片段中的 THREAD_POOL_EXECUTOR_LIST 是线程池监控存储容器,代码中创建线程池后将对象存储一份到该集合中即可,后续定时任务会采集该线程池运行时监控数据。

import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
@RequiredArgsConstructor
public class ThreadPoolSpringNotBeanDataCollector implements ApplicationRunner {

private static final List<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_LIST = new ArrayList<>();

private final String COLLECT_VESSEL_TASK_NAME = "client_scheduled_collect_data_";
private final ScheduledThreadPoolExecutor COLLECT_VESSEL_EXECUTOR = new ScheduledThreadPoolExecutor(
1,
ThreadFactoryBuilder.create().setNamePrefix(COLLECT_VESSEL_TASK_NAME).build()
);

@Override
public void run(ApplicationArguments args) throws Exception {
// 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔
COLLECT_VESSEL_EXECUTOR.scheduleWithFixedDelay(
() -> THREAD_POOL_EXECUTOR_LIST.forEach(each -> {
// 执行线程池监控数据采集任务
}),
10, // 自定义初始化延迟时间
5, // 自定义采集线程池监控数据时间周期
TimeUnit.MILLISECONDS // 采集线程池监控数据时间单位,默认毫秒
);
}

public void putExecutor(ThreadPoolExecutor executor) {
THREAD_POOL_EXECUTOR_LIST.add(executor);
}
}

如果觉得业务线程池是需要监控的,我们通过下述方式添加到监控存储容器属性中即可,如下所示:

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
@RequiredArgsConstructor
public class SendMessageService implements InitializingBean {

private final ThreadPoolSpringNotBeanDataCollector threadPoolSpringNotBeanDataCollector;

private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
10,
1024,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(4096)
);

@Override
public void afterPropertiesSet() throws Exception {
threadPoolSpringNotBeanDataCollector.putExecutor(threadPoolExecutor);
}
}

1.2. Spring Bean 线程池数据采集

如果我们把线程池声明为 Bean 交给 Spring 容器管理,那么我们就不需要再手动将其注册到采集器了,直接让采集器在启动后主动从 Spring 容器获取所有线程池实例即可:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ThreadPoolConfiguration {

@Bean
public ThreadPoolExecutor customerThreadPoolExecutor() {
return new ThreadPoolExecutor(xxx, xxx, xxx, xxx);
}
}

@Component
@RequiredArgsConstructor
public class ThreadPoolSpringBeanDataCollector implements ApplicationRunner {

private final String COLLECT_VESSEL_TASK_NAME = "client_scheduled_collect_data_";
private final ScheduledThreadPoolExecutor COLLECT_VESSEL_EXECUTOR = new ScheduledThreadPoolExecutor(
1,
ThreadFactoryBuilder.create().setNamePrefix(COLLECT_VESSEL_TASK_NAME).build()
);

private final ApplicationContext APPLICATION_CONTEXT;

@Override
public void run(ApplicationArguments args) throws Exception {
Map<String, ThreadPoolExecutor> threadPoolExecutorMap = APPLICATION_CONTEXT.getBeansOfType(ThreadPoolExecutor.class);
// 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔
COLLECT_VESSEL_EXECUTOR.scheduleWithFixedDelay(
() -> threadPoolExecutorMap.forEach((key, val) -> {
// 执行线程池监控数据采集任务
}),
10, // 自定义初始化延迟时间
5, // 自定义采集线程池监控数据时间周期
TimeUnit.MILLISECONDS // 采集线程池监控数据时间单位,默认毫秒
);
}
}

不论线程池是否由 Spring 管理,采集的方式大致相同。一种是创建好线程池后放到一个自定义容器,另一种则是从 Spring 容器中获取。

2. 线程池监控指标都有哪些?

根据我的理解,咱们在定义线程池监控指标时,收集的核心数据包括不限于以下列表:

  • 线程池当前负载:当前线程数 / 最大线程数。
  • 线程池峰值负载:当前线程数 / 最大线程数,线程池运行期间最大的负载。
  • 核心线程数:线程池的核心线程数。
  • 最大线程数:线程池限制同时存在的线程数。
  • 当前线程数:当前线程池的线程数。
  • 活跃线程数:执行任务的线程的大致数目。
  • 最大出现线程数:线程池中运行以来同时存在的最大线程数。
  • 阻塞队列:线程池暂存任务的容器。
  • 队列容量:队列中允许元素的最大数量。
  • 队列元素:队列中已存放的元素数量。
  • 队列剩余容量:队列中还可以存放的元素数量。
  • 线程池任务完成总量:已完成执行的任务的大致总数。
  • 拒绝策略执行次数:运行时抛出的拒绝次数总数。

拒绝策略执行次数参数在原生线程池中并没有 API 体现,为此我们需要自定义扩展策略实现该指标。详情查看:✅ 如何感知线程池触发了拒绝策略?

这些指标可以帮助我们排查并解决大部分因为线程池导致的问题。当获取这些指标时,大多都需要先获取线程池中的主锁才能进行,而线程池的创建线程、销毁线程、记录完成任务数等操作也同样需要获取这把锁。

这意味着,如果我们频繁的调用这些需要获取主锁的 API,那么必然会影响线程池的正常运行,进而影响到线程池的执行性能。

因此,我们需要尽可能的避免频繁的获取这些参数,这也是我们在上文不推荐直接在线程池里面埋点统计运行指标的原因。

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
xxxxx
} finally {
mainLock.unlock();
}

3. 监控数据如何存储?

上面的线程池监控指标如果只能支持实时查看,并不能帮忙开发日常排查错误。大部分场景下,生产上的问题发现会有延迟。比如 12:30 出现的问题,业务人员 13:00 进行的反馈。

为了更好帮助开发排错,我们需要将线程池的历史运行数据进行存储。说到线程池历史运行数据的存储,使用 时序数据库(TSDB) 是最合适的。但大部分情况下,公司不会为了这一个需求搭建或者采购时序数据库,那就可以使用折中方案,比如说 MySQL、ElasticSearch 等。

我们以 MySQL 为例,his_run_data 历史运行数据表,建表语句如下:

CREATE TABLE `his_run_data` (
`thread_pool_id` varchar(56) DEFAULT NULL COMMENT '线程池ID',
`instance_id` varchar(256) DEFAULT NULL COMMENT '实例ID',
`current_load` bigint(20) DEFAULT NULL COMMENT '当前负载',
`peak_load` bigint(20) DEFAULT NULL COMMENT '峰值负载',
`pool_size` bigint(20) DEFAULT NULL COMMENT '线程数',
`active_size` bigint(20) DEFAULT NULL COMMENT '活跃线程数',
`queue_capacity` bigint(20) DEFAULT NULL COMMENT '队列容量',
`queue_size` bigint(20) DEFAULT NULL COMMENT '队列元素',
`queue_remaining_capacity` bigint(20) DEFAULT NULL COMMENT '队列剩余容量',
`completed_task_count` bigint(20) DEFAULT NULL COMMENT '已完成任务计数',
`reject_count` bigint(20) DEFAULT NULL COMMENT '拒绝次数',
`timestamp` bigint(20) DEFAULT NULL COMMENT '时间戳',
`gmt_create` datetime DEFAULT NULL COMMENT '创建时间',
`gmt_modified` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_group_key` (`tp_id`,`instance_id`) USING BTREE,
KEY `idx_timestamp` (`timestamp`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='历史运行数据表';

可以看到,建表语句中有三个关键字段:

  • thread_pool_id:表示当前数据的线程池标识。
  • instance_id:应用可能集群部署,标识集群下唯一的线程池。
  • timestamp:记录线程池运行数据产生时的时间戳。

有一个问题,线上的线程池是源源不断产生运行数据的,迟早不得把表的数据量推到上亿?

因为数据是有时效性的,过了一定时间之后,就没有必要再占用实时的资源。针对上述问题提供两种解决方案:

  • 假设数据存储 1 天,如果超出这个时间,直接删除即可。
  • 同上所述,过期数据可以保留到备份表中,并删除 his_run_data 数据。

可能有的小伙伴还会担心,数据量太大会不会导致查询时过慢?我们可以算一下,假设有 100 个应用,每个应用部署 10 个节点。假设数据有效期为 1 小时,那么可以产出的数据是 72 万,一天也就是 1728 万。对于 MySQL 而言,几千万数据量以下针对索引的查询,都不会产生性能瓶颈。

如果想要保存时间更长或者应用以及节点数据更多,我们还可以采用分表方案存储,以此保障真实企业级应用数据海量存储。

文末引言

我开源了一款线程池框架 Hippo4j,支持线程池动态变更&监控&报警。上文中的监控相关功能以及对应指标均以实现,GitHub 已收获 5.2k Star,感兴趣的小伙伴欢迎查阅该框架。

开源仓库:github.com/opengoofy/hippo4j