如何感知线程池触发了拒绝策略?
作者:程序员马丁
在线博客:https://open8gu.com
大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。
回答话术
在高并发、高吞吐量的极限情况下,平常稳定运行的线程池可能会变得不稳定,作为线程池任务执行策略中兜底的拒绝策略显得格外重要。
当客户端提交任务到线程池时,如果线程池满足以下任意条件,就会发起任务拒绝策略:
- 线程池已经触发了停止行为,当前线程池的状态为非运行状态。
- 阻塞队列已满,并且线程池中已经创建了最大线程数的线程(线程都在运行中)。
线程池抛出拒绝策略意味着线程池参数设置不合理导致业务受到影响,我们期望线程池在拒绝提交的任务时,触发以下行为:
- 发送线程池报警消息或邮件,通知到相关负责人。
- 统计线程池拒绝任务的次数,方便后续统计时采集到关键指标。
- ......
但是,在阅读 JDK 线程池源码的时候,发现线程池 API 中并不支持拒绝任务的扩展。线程池底层设计中,将抛出拒绝策略方法的访问级别设置为默认访问权限,并添加了 final 关键字。
默认访问权限:意味着可以被这个类本身和同一个包中的类访问,在其他包中定义的类,即使是这个类的子类,也不能直接访问这个成员。
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
这也就意味着,我们没办法继承 ThreadPoolExecutor
去重写 reject
。同时,也没办法手动调用 reject
方法。
虽然线程池的拒绝任务方法设置了默认访问权限和 final
关键字,但是我们可以使用代理模式来满足上述的扩展需求。
问题详解
1. 代理模式
代理模式(Proxy Design Pattern),在不改变原始类代码的情况下,引入代理类对原始类的功能作出增强。
虽然线程池的拒绝任务方法设置了 默认访问权限 和 final 关键字,但是我们可以使用代理模式来满足上述的扩展需求。
在下面的示例中,我们将会一步一步的通过代理模式实现统计拒绝策略触发次数和报警的功能。
简单的来说,我们可以继承 ThreadPoolExecutor
实现一个自定义线程池,这个线程池提供了记录拒绝任务触发次数的计数器和对应的访问方法。
然后,我们再继承 AbortPolicy
实现一个自定义拒绝策略,当这个拒绝策略触发的时候,如果触发的线程池是上述的自定义线程池,就让计数器递增。
public class SupportThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 拒绝策略次数统计
*/
private final AtomicInteger rejectCount = new AtomicInteger();
public SupportThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
/**
* 设置 {@link SupportThreadPoolExecutor#rejectCount} 自增
*/
public void incrementRejectCount() {
rejectCount.incrementAndGet();
}
/**
* 获取拒绝次数
*
* @return
*/
public int getRejectCount() {
return rejectCount.get();
}
}
创建增强的公共拒绝策略,其中包含 拒绝策略次数统计
以及 报警推送
,供实际的拒绝策略子类实现。
public class RejectedHandlerProxy implements RejectedExecutionHandler {
/**
* 真实代理拒绝策略对象
*/
private RejectedExecutionHandler delegate;
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
beforeReject(executor);
delegate.rejectedExecution(runnable, executor);
}
/**
* 拒绝策略记录时, 执行某些操作
*
* @param executor
*/
private void beforeReject(ThreadPoolExecutor executor) {
if (executor instanceof SupportThreadPoolExecutor) {
SupportThreadPoolExecutor supportExecutor = (SupportThreadPoolExecutor) executor;
// 发起自增
supportExecutor.incrementRejectCount();
// 触发报警...
System.out.println("线程池触发了任务拒绝...");
}
}
}