Java 线程池工作流程源码深度分析(基于 ThreadPoolExecutor)

一、核心前置知识

ThreadPoolExecutor的 3 个核心基础

  1. 核心原子变量ctl:状态 + 线程数二合一
// 核心原子变量:32位int,高3位存运行状态,低29位存当前活跃线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程数的位数:32-3=29,限制最大活跃线程数为2^29-1
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程数掩码:低29位全1,用于提取活跃线程数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池5种运行状态(高3位,数值从大到小,状态不可回退)
private static final int RUNNING    = -1 << COUNT_BITS; // 接收新任务+处理队列任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 不接收新任务+处理队列任务
private static final int STOP       =  1 << COUNT_BITS; // 不接收新任务+不处理队列任务+中断执行中任务
private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务执行完,活跃线程数为0,准备进入终止
private static final int TERMINATED =  3 << COUNT_BITS; // 终止状态,执行完terminated()钩子

// 工具方法:从ctl中提取运行状态(高3位)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 工具方法:从ctl中提取活跃线程数(低29位)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 工具方法:组合状态和线程数,生成ctl的新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
  1. 核心内部类Worker:线程复用的载体

Worker是线程池的工作线程封装类,实现Runnable接口,底层持有一个线程对象和待执行任务,核心作用是让单个线程循环执行多个任务,实现线程复用

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread; // 工作线程(由ThreadFactory创建)
    Runnable firstTask;  // 初始化时的首个任务(可为null)
    long completedTasks; // 该线程完成的任务数(统计用)

    Worker(Runnable firstTask) {
        setState(-1); // 初始化同步状态,避免线程运行前被中断
        this.firstTask = firstTask;
        // 用线程工厂创建线程,将Worker自身作为Runnable传入
        this.thread = getThreadFactory().newThread(this);
    }

    // 线程启动后执行的核心方法:循环获取任务并执行
    public void run() {
        runWorker(this); // 委托给外部方法实现,解耦
    }

    // 省略AQS相关的锁方法(用于中断控制)
}

关键逻辑:线程启动后,执行Worker.run(),进而调用runWorker(),在该方法中循环从任务队列获取任务,直到无法获取为止,实现线程复用。

  1. 核心成员变量
private final BlockingQueue<Runnable> workQueue; // 任务阻塞队列(生产者-消费者的缓冲区)
private final ReentrantLock mainLock = new ReentrantLock(); // 全局锁:保护workers等临界资源
private final HashSet<Worker> workers = new HashSet<>(); // 存储所有工作线程,仅在持有mainLock时操作
private volatile RejectedExecutionHandler handler; // 拒绝策略(队列满+最大线程数满时触发)
private volatile int corePoolSize; // 核心线程数
private volatile int maximumPoolSize; // 最大线程数
private volatile long keepAliveTime; // 非核心线程空闲超时时间

二、核心方法execute(Runnable command)源码全解析

execute()是线程池提交任务的核心入口,所有任务最终都会通过此方法进入线程池,其源码逻辑严格遵循线程池的核心工作流程,层层判断、步步降级,同时通过「CAS 无锁操作优先,加锁兜底」的设计保证并发安全。

jdk8源码

public void execute(Runnable command) {
    // 校验:任务为null直接抛空指针异常
    if (command == null)
        throw new NullPointerException();
    
    // 步骤1:获取当前ctl值(状态+线程数),进入核心判断逻辑
    int c = ctl.get();
    
    // 分支1:当前活跃线程数 < 核心线程数(workerCountOf(c) < corePoolSize)
    if (workerCountOf(c) < corePoolSize) {
        // 创建核心线程执行任务,第二个参数true表示「核心线程」
        if (addWorker(command, true))
            return; // 创建成功,直接返回,任务由新线程执行
        // 创建失败(如线程池状态已变更、CAS修改线程数失败),重新获取ctl值
        c = ctl.get();
    }
    
    // 分支2:核心线程数已满,判断线程池是否为RUNNING状态,且任务入队成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 二次校验:任务入队后,再次检查线程池状态(防止入队后线程池被关闭)
        int recheck = ctl.get();
        // 校验1:若线程池已非RUNNING状态,将任务从队列移除
        if (! isRunning(recheck) && remove(command))
            // 移除成功,触发拒绝策略
            reject(command);
        // 校验2:若线程池仍在运行,但当前活跃线程数为0(无线程执行队列任务)
        else if (workerCountOf(recheck) == 0)
            // 创建空任务的非核心线程(false),保证队列有线程消费
            addWorker(null, false);
    }
    
    // 分支3:核心线程满+队列满,尝试创建非核心线程执行任务(false表示「非核心线程」)
    else if (!addWorker(command, false))
        // 非核心线程创建失败(达到maximumPoolSize),触发拒绝策略
        reject(command);
}

源码关键细节解读

  1. 空任务校验:最外层直接校验任务是否为 null,符合 Java 的空指针快速失败原则;
  2. CAS 优先原则:所有对ctl的修改先尝试 CAS 无锁操作,失败后才会加锁,最大限度减少锁竞争;
  3. 二次校验的必要性:任务入队后必须重新检查线程池状态 —— 因为入队操作(workQueue.offer())是非原子的,可能存在「入队成功后,线程池被其他线程关闭」的情况,此时必须移除任务并触发拒绝;
  4. 空任务线程创建:当活跃线程数为 0 时,创建firstTask=null的工作线程,目的是保证线程池有消费者线程处理队列中的任务,避免任务积压;
  5. addWorker 的双重含义:第二个参数core区分「核心线程」和「非核心线程」,创建逻辑会根据该参数判断是否受corePoolSize/maximumPoolSize限制。

三、关键辅助方法解析(execute () 的核心依赖)

execute()的核心逻辑依赖 addWorker(Runnable firstTask, boolean core) 方法(创建工作线程)和 reject(Runnable command) 方法(触发拒绝策略),这两个方法是连接execute()和线程池底层实现的关键。

1. addWorker ():创建工作线程的核心方法

作用:尝试创建一个新的Worker工作线程,启动线程并执行任务;若创建失败(如线程池状态非法、线程数达到上限),返回 false。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 校验线程池状态:非RUNNING状态下,仅SHUTDOWN+队列非空+firstTask=null时允许创建线程
        // (即SHUTDOWN状态下,只允许创建空任务线程处理队列剩余任务,不允许处理新任务)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 循环CAS修改活跃线程数:直到修改成功或状态非法
        for (;;) {
            int wc = workerCountOf(c);
            // 校验线程数上限:core=true则受corePoolSize限制,否则受maximumPoolSize限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS增加活跃线程数,成功则跳出双层循环,进入创建Worker逻辑
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS失败,重新获取ctl值,判断状态是否变更
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 以下逻辑:创建Worker并启动线程(需持有mainLock保护workers集合)
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 创建Worker,封装任务和线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); // 加锁:防止多线程同时修改workers
            try {
                // 再次校验状态:加锁后确保线程池未被关闭
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 线程已启动,抛非法状态异常
                        throw new IllegalThreadStateException();
                    workers.add(w); // 将Worker加入workers集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; // 更新线程池历史最大线程数
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock(); // 释放锁
            }
            if (workerAdded) {
                t.start(); // 启动工作线程:执行Worker.run() -> runWorker()
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 线程启动失败,回滚:从workers移除Worker,CAS减少活跃线程数
            addWorkerFailed(w);
    }
    return workerStarted;
}
核心逻辑梳理
  1. 状态前置校验:非 RUNNING 状态下,仅允许创建「空任务线程」处理队列剩余任务,拒绝所有新任务的处理;
  2. CAS 循环修改线程数:双层 for 循环保证 CAS 修改成功 —— 外层处理状态变更,内层处理线程数 CAS 竞争;
  3. 加锁保护 workersworkers是普通 HashSet,非线程安全,必须加mainLock保证添加 / 移除的原子性;
  4. 线程启动与回滚:线程启动成功则完成创建,失败则执行回滚操作,避免资源泄漏;
  5. 线程启动的本质t.start()后,执行Worker.run()方法,最终进入runWorker()实现任务循环执行

2. reject ():触发拒绝策略的核心方法

作用:当「队列满 + 最大线程数满」时,执行指定的拒绝策略,底层直接调用RejectedExecutionHandlerrejectedExecution方法。

final void reject(Runnable command) {
    // 调用自定义/内置的拒绝策略实现
    handler.rejectedExecution(command, this);
}

3. runWorker ():线程复用的核心方法

作用Worker线程启动后的核心执行方法,实现单个线程循环获取任务、执行任务,是线程池「线程复用」的底层核心。

核心源码

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 释放锁:允许线程被中断
    boolean completedAbruptly = true; // 标记是否因异常突然结束
    try {
        // 核心循环:获取任务(getTask())直到返回null
        // 任务来源:初始化的firstTask → 任务队列workQueue
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 加锁:防止线程在执行任务时被中断(除非线程池处于STOP状态)
            // 校验线程池状态:若处于STOP/TIDYING/TERMINATED,中断当前线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 执行前钩子方法(自定义扩展)
                Throwable thrown = null;
                try {
                    task.run(); // 执行任务核心逻辑(最终调用用户的run()/call())
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 执行后钩子方法(自定义扩展)
                }
            } finally {
                task = null; // 置空任务,准备获取下一个
                w.completedTasks++; // 统计完成任务数
                w.unlock(); // 释放锁
            }
        }
        completedAbruptly = false; // 正常结束循环(getTask()返回null)
    } finally {
        // 线程退出处理:销毁Worker,更新活跃线程数,尝试终止线程池
        processWorkerExit(w, completedAbruptly);
    }
}
线程复用的核心原理
  1. 无限循环获取任务:通过while (task != null || (task = getTask()) != null)实现,只要能从getTask()获取到任务,线程就不会退出;
  2. 任务来源降级:先执行初始化的firstTask,执行完成后,从任务队列workQueue循环获取任务;
  3. 线程不销毁的关键getTask()会根据线程池状态和线程类型(核心 / 非核心)阻塞获取任务—— 核心线程会一直阻塞(除非设置allowCoreThreadTimeOut(true)),非核心线程超时阻塞,直到有新任务入队或超时销毁;
  4. 循环退出条件getTask()返回 null(如线程池关闭、非核心线程超时、线程数超过最大线程数),此时线程执行processWorkerExit()销毁,循环结束。

4. getTask ():从队列获取任务的核心方法

作用:为工作线程提供任务,实现阻塞获取、超时控制、线程数校验,是控制线程「存活 / 销毁」的关键。

private Runnable getTask() {
    boolean timedOut = false; // 标记是否超时获取任务

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 校验线程池状态:若为SHUTDOWN且队列为空,或为STOP及以上,返回null(线程销毁)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // CAS减少活跃线程数
            return null;
        }

        int wc = workerCountOf(c);
        // 判断是否需要超时控制:核心线程(allowCoreThreadTimeOut=true)或非核心线程,需超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 校验是否需要销毁线程:线程数超最大数 || (超时控制且已超时),返回null
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 阻塞获取任务:timed=true则超时阻塞(keepAliveTime),否则一直阻塞
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r; // 获取到任务,返回
            timedOut = true; // 未获取到任务,标记超时
        } catch (InterruptedException retry) {
            timedOut = false; // 被中断,重置超时标记,重新循环
        }
    }
}
关键逻辑
  1. 状态校验优先:非 RUNNING 状态下,若队列无任务则直接返回 null,销毁线程;
  2. 超时控制判断:通过timed标记区分 —— 核心线程默认不超时,非核心线程必超时;
  3. 阻塞获取方式:使用阻塞队列的poll(timeout, unit)(超时阻塞)和take()(永久阻塞)实现,保证线程在无任务时不会空转;
  4. 线程销毁条件:线程数超过最大线程数,或超时未获取到任务,返回 null,触发线程销毁。

四、线程池完整工作流程(源码视角梳理)

结合以上所有源码,将线程池从「任务提交」到「任务执行 / 拒绝」的全流程梳理为6 个核心步骤,每一步都对应源码的具体逻辑,形成完整的闭环:

步骤 1:任务提交与基础校验

调用executor.execute(Runnable task)提交任务,首先校验任务是否为 null,若为 null 直接抛NullPointerException;若合法,获取当前ctl值(包含线程池状态和活跃线程数),进入核心判断。

步骤 2:核心线程池判断(创建核心线程)

通过workerCountOf(c) < corePoolSize判断当前活跃线程数是否小于核心线程数:

  • 若是:调用addWorker(task, true)创建核心线程执行任务,若创建成功(CAS 修改线程数成功 + 线程启动成功),直接返回,任务由新线程执行;
  • 若否(核心线程数已满),或创建失败(如线程池状态变更、CAS 竞争失败),进入步骤 3。

步骤 3:任务队列判断(任务入队)

判断两个条件:isRunning(c)(线程池为 RUNNING 状态)+ workQueue.offer(task)(任务入队成功):

  • 若都满足:执行

    二次校验

    (重新获取ctl),防止入队后线程池被关闭:

    1. 若线程池已非 RUNNING 状态:将任务从队列移除,调用reject(task)触发拒绝策略;
    2. 若线程池仍在运行,但活跃线程数为 0:调用addWorker(null, false)创建空任务非核心线程,保证队列有线程消费;
  • 若任一条件不满足(队列满或线程池非 RUNNING),进入步骤 4。

步骤 4:最大线程池判断(创建非核心线程)

调用addWorker(task, false)尝试创建非核心线程执行任务(受maximumPoolSize限制):

  • 若创建成功:线程启动后执行任务,流程结束;
  • 若创建失败(活跃线程数达到maximumPoolSize,即「核心线程满 + 队列满 + 最大线程数满」),进入步骤 5。

步骤 5:触发拒绝策略

调用reject(task),底层执行handler.rejectedExecution(task, this),根据指定的拒绝策略(内置 / 自定义)处理被拒绝的任务,流程结束。

步骤 6:任务执行与线程复用(底层闭环)

无论创建核心 / 非核心线程,线程启动后都会进入runWorker()方法,实现线程复用

  1. 先执行初始化的firstTask,执行完成后置空;
  2. 循环调用getTask()从队列阻塞获取任务(核心线程永久阻塞,非核心线程超时阻塞);
  3. 每获取到一个任务,就执行task.run(),执行完成后继续循环;
  4. 直到getTask()返回 null(线程池关闭、非核心线程超时、线程数超最大数),线程执行processWorkerExit()销毁,结束生命周期。
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com