星云点击:星空遥控器
120.47M · 2026-02-04
ThreadPoolExecutor的 3 个核心基础
ctl:状态 + 线程数二合一// 核心原子变量:32位int,高3位存运行状态,低29位存当前活跃线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程数的位数:32-3=29,限制最大活跃线程数为2^29-1
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程数掩码:低29位全1,用于提取活跃线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池5种运行状态(高3位,数值从大到小,状态不可回退)
private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务+处理队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务+处理队列任务
private static final int STOP = 1 << COUNT_BITS; // 不接收新任务+不处理队列任务+中断执行中任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务执行完,活跃线程数为0,准备进入终止
private static final int TERMINATED = 3 << COUNT_BITS; // 终止状态,执行完terminated()钩子
// 工具方法:从ctl中提取运行状态(高3位)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 工具方法:从ctl中提取活跃线程数(低29位)
private static int workerCountOf(int c) { return c & CAPACITY; }
// 工具方法:组合状态和线程数,生成ctl的新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
Worker:线程复用的载体Worker是线程池的工作线程封装类,实现Runnable接口,底层持有一个线程对象和待执行任务,核心作用是让单个线程循环执行多个任务,实现线程复用。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 工作线程(由ThreadFactory创建)
Runnable firstTask; // 初始化时的首个任务(可为null)
long completedTasks; // 该线程完成的任务数(统计用)
Worker(Runnable firstTask) {
setState(-1); // 初始化同步状态,避免线程运行前被中断
this.firstTask = firstTask;
// 用线程工厂创建线程,将Worker自身作为Runnable传入
this.thread = getThreadFactory().newThread(this);
}
// 线程启动后执行的核心方法:循环获取任务并执行
public void run() {
runWorker(this); // 委托给外部方法实现,解耦
}
// 省略AQS相关的锁方法(用于中断控制)
}
关键逻辑:线程启动后,执行Worker.run(),进而调用runWorker(),在该方法中循环从任务队列获取任务,直到无法获取为止,实现线程复用。
private final BlockingQueue<Runnable> workQueue; // 任务阻塞队列(生产者-消费者的缓冲区)
private final ReentrantLock mainLock = new ReentrantLock(); // 全局锁:保护workers等临界资源
private final HashSet<Worker> workers = new HashSet<>(); // 存储所有工作线程,仅在持有mainLock时操作
private volatile RejectedExecutionHandler handler; // 拒绝策略(队列满+最大线程数满时触发)
private volatile int corePoolSize; // 核心线程数
private volatile int maximumPoolSize; // 最大线程数
private volatile long keepAliveTime; // 非核心线程空闲超时时间
execute(Runnable command)源码全解析execute()是线程池提交任务的核心入口,所有任务最终都会通过此方法进入线程池,其源码逻辑严格遵循线程池的核心工作流程,层层判断、步步降级,同时通过「CAS 无锁操作优先,加锁兜底」的设计保证并发安全。
jdk8源码
public void execute(Runnable command) {
// 校验:任务为null直接抛空指针异常
if (command == null)
throw new NullPointerException();
// 步骤1:获取当前ctl值(状态+线程数),进入核心判断逻辑
int c = ctl.get();
// 分支1:当前活跃线程数 < 核心线程数(workerCountOf(c) < corePoolSize)
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程执行任务,第二个参数true表示「核心线程」
if (addWorker(command, true))
return; // 创建成功,直接返回,任务由新线程执行
// 创建失败(如线程池状态已变更、CAS修改线程数失败),重新获取ctl值
c = ctl.get();
}
// 分支2:核心线程数已满,判断线程池是否为RUNNING状态,且任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
// 二次校验:任务入队后,再次检查线程池状态(防止入队后线程池被关闭)
int recheck = ctl.get();
// 校验1:若线程池已非RUNNING状态,将任务从队列移除
if (! isRunning(recheck) && remove(command))
// 移除成功,触发拒绝策略
reject(command);
// 校验2:若线程池仍在运行,但当前活跃线程数为0(无线程执行队列任务)
else if (workerCountOf(recheck) == 0)
// 创建空任务的非核心线程(false),保证队列有线程消费
addWorker(null, false);
}
// 分支3:核心线程满+队列满,尝试创建非核心线程执行任务(false表示「非核心线程」)
else if (!addWorker(command, false))
// 非核心线程创建失败(达到maximumPoolSize),触发拒绝策略
reject(command);
}
ctl的修改先尝试 CAS 无锁操作,失败后才会加锁,最大限度减少锁竞争;workQueue.offer())是非原子的,可能存在「入队成功后,线程池被其他线程关闭」的情况,此时必须移除任务并触发拒绝;firstTask=null的工作线程,目的是保证线程池有消费者线程处理队列中的任务,避免任务积压;core区分「核心线程」和「非核心线程」,创建逻辑会根据该参数判断是否受corePoolSize/maximumPoolSize限制。execute()的核心逻辑依赖 addWorker(Runnable firstTask, boolean core) 方法(创建工作线程)和 reject(Runnable command) 方法(触发拒绝策略),这两个方法是连接execute()和线程池底层实现的关键。
作用:尝试创建一个新的Worker工作线程,启动线程并执行任务;若创建失败(如线程池状态非法、线程数达到上限),返回 false。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 校验线程池状态:非RUNNING状态下,仅SHUTDOWN+队列非空+firstTask=null时允许创建线程
// (即SHUTDOWN状态下,只允许创建空任务线程处理队列剩余任务,不允许处理新任务)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 循环CAS修改活跃线程数:直到修改成功或状态非法
for (;;) {
int wc = workerCountOf(c);
// 校验线程数上限:core=true则受corePoolSize限制,否则受maximumPoolSize限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加活跃线程数,成功则跳出双层循环,进入创建Worker逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,重新获取ctl值,判断状态是否变更
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
// 以下逻辑:创建Worker并启动线程(需持有mainLock保护workers集合)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建Worker,封装任务和线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁:防止多线程同时修改workers
try {
// 再次校验状态:加锁后确保线程池未被关闭
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 线程已启动,抛非法状态异常
throw new IllegalThreadStateException();
workers.add(w); // 将Worker加入workers集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新线程池历史最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock(); // 释放锁
}
if (workerAdded) {
t.start(); // 启动工作线程:执行Worker.run() -> runWorker()
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 线程启动失败,回滚:从workers移除Worker,CAS减少活跃线程数
addWorkerFailed(w);
}
return workerStarted;
}
workers是普通 HashSet,非线程安全,必须加mainLock保证添加 / 移除的原子性;t.start()后,执行Worker.run()方法,最终进入runWorker()实现任务循环执行。作用:当「队列满 + 最大线程数满」时,执行指定的拒绝策略,底层直接调用RejectedExecutionHandler的rejectedExecution方法。
final void reject(Runnable command) {
// 调用自定义/内置的拒绝策略实现
handler.rejectedExecution(command, this);
}
作用:Worker线程启动后的核心执行方法,实现单个线程循环获取任务、执行任务,是线程池「线程复用」的底层核心。
核心源码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 释放锁:允许线程被中断
boolean completedAbruptly = true; // 标记是否因异常突然结束
try {
// 核心循环:获取任务(getTask())直到返回null
// 任务来源:初始化的firstTask → 任务队列workQueue
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁:防止线程在执行任务时被中断(除非线程池处于STOP状态)
// 校验线程池状态:若处于STOP/TIDYING/TERMINATED,中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 执行前钩子方法(自定义扩展)
Throwable thrown = null;
try {
task.run(); // 执行任务核心逻辑(最终调用用户的run()/call())
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 执行后钩子方法(自定义扩展)
}
} finally {
task = null; // 置空任务,准备获取下一个
w.completedTasks++; // 统计完成任务数
w.unlock(); // 释放锁
}
}
completedAbruptly = false; // 正常结束循环(getTask()返回null)
} finally {
// 线程退出处理:销毁Worker,更新活跃线程数,尝试终止线程池
processWorkerExit(w, completedAbruptly);
}
}
while (task != null || (task = getTask()) != null)实现,只要能从getTask()获取到任务,线程就不会退出;firstTask,执行完成后,从任务队列workQueue循环获取任务;getTask()会根据线程池状态和线程类型(核心 / 非核心)阻塞获取任务—— 核心线程会一直阻塞(除非设置allowCoreThreadTimeOut(true)),非核心线程超时阻塞,直到有新任务入队或超时销毁;getTask()返回 null(如线程池关闭、非核心线程超时、线程数超过最大线程数),此时线程执行processWorkerExit()销毁,循环结束。作用:为工作线程提供任务,实现阻塞获取、超时控制、线程数校验,是控制线程「存活 / 销毁」的关键。
private Runnable getTask() {
boolean timedOut = false; // 标记是否超时获取任务
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 校验线程池状态:若为SHUTDOWN且队列为空,或为STOP及以上,返回null(线程销毁)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // CAS减少活跃线程数
return null;
}
int wc = workerCountOf(c);
// 判断是否需要超时控制:核心线程(allowCoreThreadTimeOut=true)或非核心线程,需超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 校验是否需要销毁线程:线程数超最大数 || (超时控制且已超时),返回null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞获取任务:timed=true则超时阻塞(keepAliveTime),否则一直阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 获取到任务,返回
timedOut = true; // 未获取到任务,标记超时
} catch (InterruptedException retry) {
timedOut = false; // 被中断,重置超时标记,重新循环
}
}
}
timed标记区分 —— 核心线程默认不超时,非核心线程必超时;poll(timeout, unit)(超时阻塞)和take()(永久阻塞)实现,保证线程在无任务时不会空转;结合以上所有源码,将线程池从「任务提交」到「任务执行 / 拒绝」的全流程梳理为6 个核心步骤,每一步都对应源码的具体逻辑,形成完整的闭环:
调用executor.execute(Runnable task)提交任务,首先校验任务是否为 null,若为 null 直接抛NullPointerException;若合法,获取当前ctl值(包含线程池状态和活跃线程数),进入核心判断。
通过workerCountOf(c) < corePoolSize判断当前活跃线程数是否小于核心线程数:
addWorker(task, true)创建核心线程执行任务,若创建成功(CAS 修改线程数成功 + 线程启动成功),直接返回,任务由新线程执行;判断两个条件:isRunning(c)(线程池为 RUNNING 状态)+ workQueue.offer(task)(任务入队成功):
若都满足:执行
二次校验
(重新获取ctl),防止入队后线程池被关闭:
reject(task)触发拒绝策略;addWorker(null, false)创建空任务非核心线程,保证队列有线程消费;若任一条件不满足(队列满或线程池非 RUNNING),进入步骤 4。
调用addWorker(task, false)尝试创建非核心线程执行任务(受maximumPoolSize限制):
maximumPoolSize,即「核心线程满 + 队列满 + 最大线程数满」),进入步骤 5。调用reject(task),底层执行handler.rejectedExecution(task, this),根据指定的拒绝策略(内置 / 自定义)处理被拒绝的任务,流程结束。
无论创建核心 / 非核心线程,线程启动后都会进入runWorker()方法,实现线程复用:
firstTask,执行完成后置空;getTask()从队列阻塞获取任务(核心线程永久阻塞,非核心线程超时阻塞);task.run(),执行完成后继续循环;getTask()返回 null(线程池关闭、非核心线程超时、线程数超最大数),线程执行processWorkerExit()销毁,结束生命周期。