火柴人拳击竞技场
77.47MB · 2025-10-19
为什么进行源码角度的深度解析?
大家在项目中到处都在使用线程池做一些性能接口层次的优化,原先串行的多个远程调用,因为rt过高,通过线程池批量异步优化,从而降低rt。还有像RocketMQ中broker启动时,同时通过ScheduledThreadPoolExecutor线程池执行其他组件的定时任务,每隔一段时间处理相关的任务。线程池广泛的应用在外面各种实际开发场景中,我们很多同学可能在项目里只是简单的copy了一些前人的代码参数并不知道其中的含义,从而导致生产级别的bug。所以本篇文章,旨在帮助还不熟悉或者想要熟悉线程池的同学,分享我自己在学习线程池源码上的一些内容来更简单、快速的掌握线程池。
并发编程中,对于常见的操作系统,线程都是执行任务的基本单元,如果每次执行任务时都创建新的线程,任务执行完毕又进行销毁,会出现以下的问题:
通过上面的问题,我们其实可以清晰的感知到这些问题都是归拢到资源没有得到合理的分配和控制导致的,线程池出现的核心宗旨其实就是对资源的合理分配和控制。除了线程池,其实更多的也接触过数据库连接池、netty的对象池等池化技术,这些池化思想其实都是为了更好的降低资源的消耗以及更好的进行资源管理。
新任务通过execute()方法提交给ThreadPoolExecutor时,其处理流程如下:
判断核心线程数:如果当前运行的线程数小于corePoolSize,则创建新线程(即使有空闲的核心线程)来执行任务。
尝试入队:如果当前运行的线程数大于或等于corePoolSize,则尝试将任务添加到workQueue中。
尝试创建非核心线程:如果workQueue.offer()失败(队列已满):
执行拒绝策略:
如果当前运行的线程数也达到了maximumPoolSize(即核心线程和非核心线程都已用尽,且队列也满了),则执行RejectedExecutionHandler所定义的拒绝策略。
参考网络中的经典执行图:
这个图能很好的表明运行原理,但是忽略了很多细节,比如所谓的缓冲执行是在什么条件下去走的呢?直接执行又是什么逻辑下执行呢?最后的任务拒绝又是怎么回事?带着这些疑问点,我们直接来进行一个源码级别的分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//线程池状态 高3位表示线程状态 低29位代表线程数量
int c = ctl.get();
//判断当前线程池线程数量是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//作为核心线程数进行线程的创建,并且创建成功线程会将command的任务执行--》对应图上的直接执行
if (addWorker(command, true))
return;
c = ctl.get();
}
//创建核心线程失败或者当前线程数量超过核心线程数
//当前线程池是否还在运行状态,尝试将任务添加到阻塞队列 --》对应图上的缓冲执行
//BlockingQueue队列的顶级抽象定义了offer不是进行阻塞添加而是立即返回,添加失败直接返回false,区别于put
if (isRunning(c) && workQueue.offer(command)) {
//重新获取线程池标志位
int recheck = ctl.get();
//如果线程此时不在运行状态中,那么将任务删除
if (! isRunning(recheck) && remove(command))
//删除任务成功,走拒绝策略拒绝掉当前任务
reject(command);
else if (workerCountOf(recheck) == 0)
//如果线程池中的工作线程都没有的时候,这里需要创建一个线程去执行添加到队列中的任务
//防止因为并发的原因工作线程都被终止掉了,此时任务在阻塞队列里等着,缺没有工作线程
addWorker(null, false);
}
//到这里那就是添加队列失败,或者线程池状态异常,但是这里仍然尝试进行创建一个worker
//如果创建失败,也是走拒绝策略拒绝当前任务
else if (!addWorker(command, false))
reject(command);
}
接下来我们仔细看看addWorker这个方法具体是在做什么:
//核心逻辑其实就是在无限循环创建一个worker,创建失败直接返回,创建成功,则将worker执行
// 因为worker有thread的成员变量,最终添加worker成功,会启动线程的start方法
//start方法最终会回调到外层的runWorker方法,改方法会不停的从阻塞队列里以阻塞的take方式
//获取任务,除非达到能被终止的条件,此时当前线程会终止
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//不停的重试添加worker的计数,只有添加成功的才会进行后续的worker启动
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//重试期间,如果其他线程导致线程池状态不一致了。重新回到第一个循环进行check判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//这里加锁一个是workers.add时需要加锁,另外是防止其他线程已经在尝试修改线程池状态了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将worker的引用添加到workers的hashSet中
workers.add(w);
int s = workers.size();
//更新线程池此时最大的线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,就启动worker中的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//这里添加失败的话,需要把线程池的count数进行--,并且要把worker引用从hashSer中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在介绍运行机制原理的源码分析时,其实是有提到线程池状态这个概念的。介绍这个状态其实也是让大家更方便的去管理线程池,比如我们关闭线程池时,怎么去优雅的关闭,使用不同的方法可能会有不同的效果,我们需要根据自己的业务场景去酌情分析、权衡使用。
//线程池的状态和计数采用一个Integer变量设置的
//这里之所以用一个变量来储存状态和数量,其实很有讲究的,因为我们在上面的运行原理上可以看到
//源码中有大量的进行状态以及数量的判断,如果分开采用变量的记录的话,在维护二者一致性方面
//可能就需要加锁的维护成本了,而且计算中都是位移运算也是非常高效的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的大小由ctl低29位表示,现成状态由ctl高3位表示
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的状态通过简单的位移就能计算出来,状态只能从低到高流转,不能逆向
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 这里是获取线程状态以及获取线程数量的简单高效的位移方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
接下来结合源码详细介绍下线程池的5种状态以及分别有什么不同的表现行为?
先说下结论:
RUNNING 这个就是线程池运行中状态,我们可以添加任务也可以处理阻塞队列任务
SHUTDOWN 不能添加新的任务,但是会将阻塞队列中任务执行完毕
STOP 不能添加新的任务,执行中的线程也会被打断,也不会处理阻塞队列的任务
TIDYING 所有线程都被终止,并且workCount=0时会被置为的状态
TERMINATED 调用完钩子方法terminated()被置为的状态
//线程池关闭
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//循环cas设置线程池状态,直到成功或状态已经state>=SHUTDOWN
advanceRunState(SHUTDOWN);
//这个是真正得出结论的地方
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//打断空闲的线程,如何判断线程是否空闲还是运行?
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//worker的线程没有被打断过,并且能获取到worker的aqs独占锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//打断当前线程,如果线程在阻塞队列中阻塞,此时会被中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
//循环cas修改线程池状态为stop。打断所有线程,取出阻塞队列的所有任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查线程的权限
checkShutdownAccess();
//将状态case为stop
advanceRunState(STOP);
//打断所有worker不管是不是正在执行任务
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//这里获取锁之后。打断了所有的线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
//这个方法在每个线程退出时都会进行调用,如果是运行中、或者状态大于等于TIDYING或者shutdown但是队列不为空都
//直接返回,如果不满足以上条件,并且线程数不为0的话,打断一个空闲线程
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//此时到这里,状态要么为STOP。要么是shutdown并且队列为空了
// 获取一个锁,尝试cas修改状态为TIDYING
//调用terminated()的钩子方法,
//修改线程池为终态TERMINATED,并且唤醒阻塞在termination队列上的线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
java.util.concurrent.Executors工厂类提供了一些静态方法,方便我们快速创建几种预设配置的线程池:
某一线互联网Java开发手册
ExecutorService.invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)方法会提交一组Callable任务,并等待所有任务完成,或者直到超时。如果超时发生,它会尝试取消(中断)所有尚未完成的任务,然后返回一个List。
失效场景分析:
Callable<String> task = () -> {
while (true) {
//缺少此检查将导致超时失效
if (Thread.interrupted()) break;
// 耗时计算...
}
return "done";
};
Callable<Integer> task = () -> {
Files.copy(in, path); // 某些NIO操作不响应中断
return 1;
};
Callable<Result> task = () -> {
//未设查询超时时间
return jdbcTemplate.query("SELECT * FROM large_table");
};
new ThreadPoolExecutor(
100, 100, // 核心线程数过大
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // 无界队列
);
invokeAll超时失效demo:
import java.util.*;
import java.util.concurrent.*;
public class InvokeAllTimeoutDemo {
// 模拟耗时任务(可配置是否响应中断)
static class Task implements Callable<String> {
private final int id;
private final long durationMs;
private final boolean respectInterrupt;
Task(int id, long durationMs, boolean respectInterrupt) {
this.id = id;
this.durationMs = durationMs;
this.respectInterrupt = respectInterrupt;
}
@Override
public String call() throws Exception {
System.out.printf("Task %d started%n", id);
long start = System.currentTimeMillis();
// 模拟工作(检查中断状态)
while (System.currentTimeMillis() - start < durationMs) {
if (respectInterrupt && Thread.interrupted()) {
throw new InterruptedException("Task " + id + " interrupted");
}
// 不响应中断的任务会继续执行
}
System.out.printf("Task %d completed%n", id);
return "Result-" + id;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
new Task(1, 2000, true), // 2秒,响应中断
new Task(2, 10000, false) // 10秒,不响应中断
);
System.out.println("Invoking with 3s timeout...");
try {
//设置3秒超时
List<Future<String>> futures = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
for (Future<String> f : futures) {
// 明确处理取消状态
if (f.isCancelled()) {
System.out.println("Task was cancelled");
} else {
try {
System.out.println("Result: " + f.get(100, TimeUnit.MILLISECONDS));
} catch (TimeoutException | ExecutionException e) {
System.out.println("Task failed: " + e.getCause());
}
}
}
} finally {
executor.shutdownNow();
System.out.println("Executor shutdown");
}
}
}
当我们使用invokeAll(tasks, timeout) 提交多个任务时,如果出现某个任务对中断不响应或者响应不及时,那我们即使设置了超时时间,不响应中断的任务2仍在后台运行(即使调用了 shutdownNow())
使用ExecutorService.submit()提交任务时,任务执行过程中如果抛出未捕获的异常(无论是受检异常还是运行时异常),这个异常会被Future的包装类如FutureTask重写的run()方法捕获并封装在返回的Future包装对象的成员变量中。
submit()异常消失demo:
public class ThreadPoolExceptionDemo {
public static void main(String[] args) {
// 创建单线程线程池(便于观察异常)
ExecutorService executor = Executors.newSingleThreadExecutor();
// 场景1:Callable抛出异常(通过Future.get()捕获)
Future<String> future1 = executor.submit(() -> {
System.out.println("[Callable] 开始执行");
Thread.sleep(100);
throw new RuntimeException("Callable故意抛出的异常");
});
try {
System.out.println("Callable结果: " + future1.get());
} catch (ExecutionException e) {
System.err.println("捕获到Callable异常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 场景2:Runnable抛出异常(同样通过Future.get()捕获)
Future<?> future2 = executor.submit(() -> {
System.out.println("[Runnable] 开始执行");
throw new IllegalArgumentException("Runnable故意抛出的异常");
});
try {
future2.get(); // Runnable成功时返回null
} catch (ExecutionException e) {
System.err.println("捕获到Runnable异常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 场景3:未处理的任务异常(需设置异常处理器)
executor.submit(() -> {
System.out.println("[未捕获的任务] 开始执行");
throw new IllegalStateException("这个异常会被默认处理器处理");
});
executor.shutdown();
}
}
//自定义当前线程组创建线程的统一异常处理,类似于controller的统一异常处理机制
ThreadFactory myThreadFactory = new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final String threadNamePrefix = "myThreadFactory-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,threadNamePrefix + atomicInteger.getAndIncrement());
t.setUncaughtExceptionHandler((thread, throwable) -> {
//异常的统一处理,日志打印、兜底处理、监控、资源释放等
System.err.println("线程[" + thread.getName() + "]异常: " + throwable);});
return t;
}
};
//构造方法时使用自定义的线程工厂
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory,
handler
);
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//需要特别注意任务是否为submit提交,如果是execute提交的任务,那这里很直接的知道任务是否发生异常以及后续去怎么处理
if(r instanceof Future){
if(((Future<?>) r).isDone() || ((Future<?>) r).isCancelled()){
//继续使用主线程完成任务,一般不建议,最好使用兜底方式:例如异步发消息,由后续的消费组统一处理异常的任务
}
}else if( t != null){
//execute异常处理
}
}
}
//FutureTask 把run方法进行了重写,并且catch住了异常,所以说afterExecute的t 如果是submit提交的方式
//那么t基本上就是null
public void run() {
//....
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
//...
}
afterExecute可以借鉴的示例:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.slf4j.*;
public class RobustThreadPool extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(RobustThreadPool.class);
private final AtomicLong failureCounter = new AtomicLong();
private final RetryPolicy retryPolicy; // 重试策略
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public RobustThreadPool(int corePoolSize, int maxPoolSize,
BlockingQueue<Runnable> workQueue,
RetryPolicy retryPolicy) {
super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue);
this.retryPolicy = retryPolicy;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
logger.debug("开始执行任务: {}", r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 1. 异常分类处理
if(r instanceof Future){
if(((Future<?>) r).isDone()){
//错误记录以及异常处理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
}else if( t != null){
//execute异常处理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
// 2. 资源清理
cleanThreadLocals();
}
private void handleFailure(Runnable r, Throwable t) {
// 1. 异常类型识别
if (t instanceof OutOfMemoryError) {
logger.error("JVM内存不足,终止任务: {}", t.getMessage());
System.exit(1); // 严重错误直接终止
}
// 2. 可重试异常处理
else if (isRetryable(t)) {
int retryCount = retryPolicy.getCurrentRetryCount(r);
if (retryCount < retryPolicy.getMaxRetries()) {
logger.warn("任务第{}次失败,准备重试...",
retryCount + 1, t);
retryPolicy.retry(r, this);
} else {
logger.error("任务超过最大重试次数({}),转入死信队列",
retryPolicy.getMaxRetries(), t);
DeadLetterQueue.add(r, t);
}
}
// 3. 不可重试异常
else {
logger.error("不可恢复任务失败", t);
Metrics.recordFailure(t.getClass()); // 上报监控
}
}
private boolean isRetryable(Throwable t) {
return t instanceof IOException ||
t instanceof TimeoutException ||
(t.getCause() != null && isRetryable(t.getCause()));
}
private void cleanThreadLocals() {
// 清理可能的内存泄漏
try {
ThreadLocal<?>[] holders = { /* 其他ThreadLocal */};
for (ThreadLocal<?> holder : holders) {
holder.remove();
}
} catch (Exception e) {
logger.warn("清理ThreadLocal失败", e);
}
}
// 重试策略嵌套类
public static class RetryPolicy {
private final int maxRetries;
private final long retryDelayMs;
private final Map<Runnable, AtomicInteger> retryMap = new ConcurrentHashMap<>();
public RetryPolicy(int maxRetries, long retryDelayMs) {
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
public void retry(Runnable task, Executor executor) {
retryMap.computeIfAbsent(task, k -> new AtomicInteger()).incrementAndGet();
if (retryDelayMs > 0) {
executor.execute(() -> {
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ignored) {}
executor.execute(task);
});
} else {
executor.execute(task);
}
}
public int getCurrentRetryCount(Runnable task) {
return retryMap.getOrDefault(task, new AtomicInteger()).get();
}
public int getMaxRetries() {
return maxRetries;
}
}
}
异常处理小结:要特别注意使用future.get()方法时,我们一定要注意设置超时时间,防止主线程无限期的阻塞避免边缘的业务查询影响了主业务造成得不偿失的效果,另外我们需要注意一个点就是submit()方法的提交任务时,afterExecute(Runnable r, Throwable t)中的t恒为null,如果是execute方法提交的任务,那么就是直接获取的任务执行的异常,对于submit提交的任务异常其被封装到了Futrure 包装对象中,一般需要我们再次判断任务时执行完毕还是异常或被取消了,如果发生了异常,Future.get()会抛出封装的ExecutionException异常,当然还可能是取消异常以及中断异常。invokeAll和invokeAny我们需要对返回的Future结果检查可能抛出的异常,对于callable 前面一再强调了要对InterruptedException不要静默处理,因为线程的中断标记只是一个协作方式,他并没有停止当前线程的运行,我们需要根据自身的场景对发生的中断进行快速响应以及传递中断标志。
先带大家回顾一下策略是如何触发执行的流程:
//添加任务,当不满足条件时会执行拒绝方法reject
public void execute(Runnable command) {
//...
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//这里就是拒绝的入口。handler是有构造方法传入
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//....
//指定拒绝策略
this.handler = handler;
}
AbortPolicy:默认的拒绝策略,简单粗暴,当execute中添加woker失败时,直接在当前线程抛出异常。
public static class AbortPolicy implements RejectedExecutionHandler {
//直接抛出RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
优点:快速失败,立即暴露系统过载问题、避免任务静默丢失,便于监控系统捕获
缺点:需要调用方显式处理异常,增加代码复杂度,可能中断主业务流程
适用场景:适用于那些对任务丢失非常敏感,配合熔断机制使用的快速失败场景
CallerRunsPolicy:提交任务的线程,直接执行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
//直接在提交任务的线程中执行任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
优点:任务都会被执行,不会丢任务,并且由于主线程执行任务,天然的流量控制,避免了大量的任务进入线程池。
缺点:调用线程可能被阻塞,导致上游服务雪崩。不适合高并发场景(可能拖垮整个调用链)。
适用场景:适用于处理能力不高,并且资源过载能够平滑过渡,同时不丢失任务的场景。如:低并发、高可靠性的后台任务(如日志归档)、允许同步执行的批处理系统。
DiscardPolicy:直接丢弃被拒绝的任务,不做任何通知。
public static class DiscardPolicy implements RejectedExecutionHandler {
//空实现
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
优点:实现简单,无额外性能开销。避免异常传播影响主流程
缺点:数据静默丢失,可能会掩盖系统容量问题
适用场景:边缘业务的监控上报数据,统计类的uv、pv统计任务
DiscardOldestPolicy:丢弃队列中最旧的任务,然后重试提交当前任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
//丢弃队列中最旧的任务,重试当前任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
优点:优先保证新任务执行,避免队列堆积导致内存溢出。
缺点:可能丢失关键旧任务、任务执行顺序无法保证。
适用场景:适用于可容忍部分数据丢失,并且实时性要求高于历史数据的场景,比如:行情推送。
通过上线的介绍,我们可以看到JDK内置策略基本上只使用于简单处理的场景,在生产实践中一般推荐我们自定义拒绝策略,进行相关的业务处理。
1. 自定义RejectedExecutionHandler:
/**
* 带监控统计的拒绝策略处理器
*/
public class MetricsRejectedExecutionHandler implements RejectedExecutionHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsRejectedExecutionHandler.class);
// 统计被拒绝的任务数量
private final AtomicLong rejectedCount = new AtomicLong(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 1. 采集线程池关键指标
int poolSize = executor.getPoolSize();
int activeThreads = executor.getActiveCount();
int corePoolSize = executor.getCorePoolSize();
int maxPoolSize = executor.getMaximumPoolSize();
int queueSize = executor.getQueue().size();
long completedTasks = executor.getCompletedTaskCount();
// 2. 递增拒绝计数器
long totalRejected = rejectedCount.incrementAndGet();
// 3. 输出警告日志(包含完整指标)
logger.warn("""
任务被拒绝执行!线程池状态:
|- 活跃线程数/当前线程数: {}/{}
|- 核心/最大线程数: {}/{}
|- 队列大小: {}
|- 已完成任务数: {}
|- 历史拒绝总数: {}
|- 被拒绝任务: {}
""",
activeThreads, poolSize,
corePoolSize, maxPoolSize,
queueSize,
completedTasks,
totalRejected,
r.getClass().getName());
// 4. 可选:降级处理(如存入数据库等待重试)
// fallbackToDatabase(r);
// 5. 抛出RejectedExecutionException(保持默认行为)
throw new RejectedExecutionException("Task " + r.toString() + " rejected");
}
// 获取累计拒绝次数(用于监控)
public long getRejectedCount() {
return rejectedCount.get();
}
}
2. 根据自身业务场景选择合适的拒绝策略:
3. 结合线程池参数综合考虑:
拒绝策略小结:
策略的选择跟我们大多数的系统设计哲学是保持一致的,都是在应对不同的场景中,做出一定的trade off。最好的策略需要根据业务场景、系统容忍度、资源等方面的综合考量,一个黄金的实践原则:拒绝事件做好监控告警、根据业务SLA定义策略,是否可丢失,快速失败等,定期的进行压力测试,验证策略的有效性。
核心思想:根据任务的资源类型 、优先级和业务特性 ,划分多个独立的线程池,避免不同性质的任务相互干扰。
1. 隔离维度:
2. 不同业务场景线程池独立使用:在不同的业务场景下,为自己的特定业务,创建独立的线程池。
3. 自定义Executor避免线程池共用
// 创建CPU密集型任务线程池(线程数=CPU核心数)
ExecutorService cpuIntensiveExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心线程数=CPU核心数
Runtime.getRuntime().availableProcessors(), // 最大线程数=CPU核心数
30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadFactoryBuilder()
.setNameFormat("cpu-pool-%d")
.setPriority(Thread.MAX_PRIORITY) // 提高优先级
.build(),
new ThreadPoolExecutor.AbortPolicy() // 直接拒绝
);
// 使用示例
CompletableFuture.supplyAsync(() -> {
// 矩阵计算等CPU密集型任务
double[][] result = matrixMultiply(largeMatrixA, largeMatrixB);
return result;
}, cpuIntensiveExecutor)
.thenAccept(result -> {
System.out.println("计算结果维度: " + result.length + "x" + result[0].length);
});
线程池隔离小结:
专池专用的本质是通过物理隔离实现:
最终呈现的效果是像专业厨房的分区(切配区/炒菜区/面点区)一样,让每个线程池专注处理同类任务,提升整体效率和可靠性。
线程池是Java并发编程的核心组件,通过复用线程减少资源开销,提升系统吞吐量。其核心设计包括线程复用机制 、任务队列和拒绝策略 ,通过ThreadPoolExecutor的参数(核心线程数、最大线程数、队列容量等)实现灵活的资源控制。线程池的生命周期由RUNNING、SHUTDOWN等状态管理,确保任务有序执行或终止。
内置线程池(如Executors.newCachedThreadPool)虽便捷,但存在内存溢出或无界队列堆积的风险,需谨慎选择。invokeAll的超时失效和submit提交任务的异常消失是常见陷阱需通过正确处理中断和检查Future.get()规避。
最佳实践包括:
合理使用线程池能显著提升性能,但需结合业务场景精细调参,确保稳定性和可维护性,希望这篇文章能给大家带来一些生产实践上的指导,减少一些因为不熟悉线程池相关原理生产误用导致的一些问题。
1. 基于浏览器扩展 API Mock 工具开发探索|得物技术
2. 破解gh-ost变更导致MySQL表膨胀之谜|得物技术
3. MySQL单表为何别超2000万行?揭秘B+树与16KB页的生死博弈|得物技术
4. 0基础带你精通Java对象序列化--以Hessian为例|得物技术
5. 前端日志回捞系统的性能优化实践|得物技术
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。