爆米花烘焙
66.80M · 2026-04-14
CountDownLatch 是 Java 并发包(java.util.concurrent)中一个常用的同步辅助类。它允许一个或多个线程等待,直到其他线程中执行的一组操作完成。其核心思想是一个倒计数锁存器:初始化时设置一个正数计数值,每次调用 countDown() 方法使计数值减 1,当计数值减至 0 时,所有因调用 await() 而阻塞的线程被唤醒并继续执行。
CountDownLatch 具有一次性(不可重用)、倒计数、共享锁语义等核心特性,内部基于 AbstractQueuedSynchronizer(AQS)实现。它非常适合“主线程等待多个子任务完成后再汇总”、“模拟高并发同时启动”等场景。本文将依次分析其核心特性与源码实现、所有公开方法说明、关键流程的时序图、实际应用代码示例、吞吐量影响因素以及使用中的注意事项,帮助读者深入理解并正确使用这一工具。
特性描述
CountDownLatch 的计数器只能递减,一旦减到 0,就不能再被重置或增加。此后任何线程调用 await() 都不会阻塞,调用 countDown() 也不会有任何效果(但不会抛出异常)。若需要可重用的同步屏障,应使用 CyclicBarrier。
源码实现解释
在内部类 Sync 中,tryReleaseShared 方法负责递减计数器。代码如下:
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false; // 已经归零,不再修改状态
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // 只有变为 0 时才需要唤醒等待线程
}
}
state(计数器)已经是 0 时,方法直接返回 false,不会进行任何 CAS 操作,因此计数器不会变为负数,也无法“重新递增”。state 的值,构造器仅初始化一次。tryAcquireShared 的实现是 return (getState() == 0) ? 1 : -1;,一旦 state == 0,所有 await() 调用都会立即成功(返回 1),不再阻塞。特性描述
CountDownLatch 使用一个整型计数器表示需要等待的事件数量。每个事件完成时调用 countDown() 使计数器减 1;等待线程调用 await() 阻塞,直到计数器变为 0。计数器的递减是原子的,且对等待线程的可见性由 AQS 保证。
源码实现解释
Sync(int count) { setState(count); },将 AQS 的 state 字段初始化为给定的计数值。countDown() 方法内部调用 sync.releaseShared(1),进而调用 tryReleaseShared。该方法使用 CAS + 自旋 实现线程安全的递减:
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
这保证了即使多个线程同时调用 countDown(),计数器的值也能正确递减,不会出现丢失更新。await() 方法调用 sync.acquireSharedInterruptibly(1),内部先尝试 tryAcquireShared:若 state != 0 则失败,当前线程被包装成共享节点进入 AQS 等待队列并阻塞;若 state == 0 则直接通过。特性描述
CountDownLatch 允许多个线程同时等待同一个事件(计数器归零)。当事件发生时,所有等待线程应被同时唤醒,而不是只唤醒一个。这符合“一组操作完成”的语义,即所有等待线程都可以继续执行。
源码实现解释
tryAcquireShared 和 tryReleaseShared 是 AQS 中共享模式的方法,与独占模式(如 ReentrantLock)不同。
当最后一个 countDown() 将计数器从 1 减为 0 时,tryReleaseShared 返回 true。AQS 的 releaseShared 方法会调用 doReleaseShared(),该函数会:
SHARED,唤醒操作会传播(setHeadAndPropagate),继续唤醒下一个共享节点,直到队列中没有共享节点或条件不满足。await() 中返回。这种“一唤多”的机制正是共享锁的核心价值。
特性描述
CountDownLatch 提供了可中断的 await() 以及带超时的 await(long timeout, TimeUnit unit),允许等待线程响应中断或在指定时间内未被唤醒时自行返回。
源码实现解释
await() 内部调用 sync.acquireSharedInterruptibly(1),该方法在 AQS 中会响应中断:若当前线程在等待过程中被中断,会抛出 InterruptedException。await(long timeout, TimeUnit unit) 调用 sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)),同样可中断,且使用了 AQS 的超时阻塞机制(LockSupport.parkNanos)。如果超时前计数器仍未归零,方法返回 false;若归零则返回 true。下表列出 CountDownLatch 的所有公开方法及其用途:
| 方法签名 | 描述 |
|---|---|
CountDownLatch(int count) | 构造器,初始化计数器为 count。如果 count < 0,抛出 IllegalArgumentException。 |
void await() throws InterruptedException | 使当前线程等待,直到计数器归零。若当前线程在等待时被中断,则抛出 InterruptedException。 |
boolean await(long timeout, TimeUnit unit) throws InterruptedException | 使当前线程等待,直到计数器归零或超过指定的等待时间。返回 true 表示计数器归零;false 表示超时。同样响应中断。 |
void countDown() | 将计数器减 1。如果减后计数器变为 0,则唤醒所有等待线程。 |
long getCount() | 返回当前计数器的值。此方法主要用于调试或日志,不保证原子性(快照值)。 |
注意事项:
await() 和 countDown() 均可被多个线程调用。countDown() 在计数器已为 0 时调用不会产生任何效果(不会抛出异常,也不会修改状态)。getCount() 不能用于替代 await() 做业务判断,因为它存在竞态条件。CountDownLatch 并非从零实现同步机制,而是建立在 AbstractQueuedSynchronizer(AQS) 框架之上。AQS 是 java.util.concurrent 包中大多数同步器(如 ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier 内部使用的 ReentrantLock 等)的底层基石。理解二者的关系有助于深入掌握 CountDownLatch 的行为与性能特征。
CountDownLatch 内部定义了一个私有静态内部类 Sync,它继承自 AbstractQueuedSynchronizer:
java
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); }
int getCount() { return getState(); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { ... } // 前面已分析
}
CountDownLatch 的所有同步操作(await、countDown、getCount)都委托给 Sync 实例完成,例如:
java
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
这种委派模式使得 CountDownLatch 可以复用 AQS 提供的线程排队、阻塞/唤醒、中断处理等底层机制,只需专注于实现 tryAcquireShared 和 tryReleaseShared 两个钩子方法即可。
AQS 为 CountDownLatch 提供了以下关键功能:
| AQS 能力 | 在 CountDownLatch 中的应用 |
|---|---|
| 同步状态(state)管理 | 使用 volatile int state 存储计数器值,提供 getState()、setState()、compareAndSetState() 原子操作。 |
| 等待队列(CLH 队列变种) | 管理所有因 await() 而阻塞的线程,实现公平(默认)或非公平的排队唤醒。 |
| 共享模式(Shared Mode) | 支持 acquireShared / releaseShared,实现“一唤多”语义。CountDownLatch 使用共享模式,因此计数器归零时能唤醒所有等待线程。 |
| 可中断阻塞 | acquireSharedInterruptibly 使 await() 能够响应 Thread.interrupt()。 |
| 超时阻塞 | tryAcquireSharedNanos 支持 await(long timeout, TimeUnit unit)。 |
| CAS 操作 | 保证 countDown() 的原子递减,避免数据竞争。 |
CountDownLatch 仅需实现 AQS 的两个抽象方法,就能获得完整的同步器功能:
tryAcquireShared(int arg) :
判断当前是否允许获取锁(即是否可以通过 await())。
实现为 return getState() == 0 ? 1 : -1;。
state == 0,返回正数(1),表示获取成功,await() 立即返回。state != 0,返回 -1,表示获取失败,AQS 会将当前线程加入等待队列并阻塞。tryReleaseShared(int arg) :
尝试释放一个共享单位(即计数器减 1)。
实现为自旋 CAS 递减 state,并返回 nextc == 0(即是否变为 0)。
true,AQS 会执行唤醒等待队列中所有共享节点的操作。false,AQS 仅完成状态修改,不唤醒任何线程。AQS 在共享模式下的释放操作(releaseShared)与独占模式不同。当 tryReleaseShared 返回 true 时,AQS 会调用 doReleaseShared(),该方法会:
SHARED,则继续尝试唤醒下一个共享节点(setHeadAndPropagate 传播)。这正是 CountDownLatch 能够“一次性唤醒所有等待线程”的根本原因。独占锁(如 ReentrantLock)的 release 只会唤醒头节点的后继节点(通常只有一个线程能获取锁)。
AQS 的 state 被声明为 volatile,所有 getState() 和 compareAndSetState() 操作都遵循 Java 内存模型的 volatile 规则。因此:
state 的修改(countDown() 中的 CAS)对其他线程立即可见(具有 happens-before 关系)。countDown() 将 state 从 1 改为 0 时,该写入与后续等待线程从 await() 返回后读取 state 或共享变量之间,自动建立 happens-before 关系。这就是为什么工作线程在 countDown() 之前对共享变量的修改,对主线程在 await() 返回后是可见的,无需额外同步。AQS 是一个通用的同步器框架,需要用户实现具体的获取/释放逻辑。CountDownLatch 通过 Sync 类封装了“倒计数锁存器”这一特定语义,提供了更友好的 API(await、countDown、getCount),并隐藏了 AQS 的复杂性(如 acquireShared、releaseShared 等底层方法)。直接使用 AQS 也可以实现类似功能,但需要编写大量重复代码,且容易出错。
text
┌─────────────────────────────────────────┐
│ CountDownLatch │
├─────────────────────────────────────────┤
│ - sync: Sync │
│ + await() → sync.acquireShared... │
│ + countDown()→ sync.releaseShared() │
│ + getCount() → sync.getCount() │
└─────────────────┬───────────────────────┘
│ 继承/委派
▼
┌─────────────────────────────────────────┐
│ Sync extends AbstractQueuedSynchronizer│
├─────────────────────────────────────────┤
│ + tryAcquireShared(int) │
│ + tryReleaseShared(int) │
└─────────────────┬───────────────────────┘
│ 使用
▼
┌─────────────────────────────────────────┐
│ AQS (底层框架) │
├─────────────────────────────────────────┤
│ - volatile int state │
│ - Node head, tail (CLH queue) │
│ + acquireSharedInterruptibly() │
│ + releaseShared() │
│ + compareAndSetState() │
│ + park() / unpark() │
└─────────────────────────────────────────┘
通过这种关系,CountDownLatch 在保持简洁 API 的同时,获得了 AQS 提供的强大、可靠、高性能的同步基础设施。
以一个主线程等待三个工作线程完成任务为例,展示 CountDownLatch 的交互过程。
sequenceDiagram
participant Main as 主线程
participant Latch as CountDownLatch<br/>(count=3)
participant Worker1 as 工作线程1
participant Worker2 as 工作线程2
participant Worker3 as 工作线程3
Main->>Latch: new CountDownLatch(3)
Main->>Worker1: start()
Main->>Worker2: start()
Main->>Worker3: start()
Main->>Latch: await()
Note over Main,Latch: state=3 ≠0 → 线程阻塞<br/>加入AQS等待队列
Worker1->>Latch: countDown()
Note over Latch: CAS: state 3→2<br/>未归零,不唤醒
Worker2->>Latch: countDown()
Note over Latch: CAS: state 2→1
Worker3->>Latch: countDown()
Note over Latch: CAS: state 1→0<br/>tryReleaseShared返回true
Latch-->>Main: 唤醒等待队列中所有线程
Note over Main: await()返回,继续执行
CountDownLatch 实例,state = 3,等待队列为空。countDown()。await():
tryAcquireShared 检查 state == 3,返回 -1(获取失败)。LockSupport.park(this) 挂起。countDown():
tryReleaseShared,通过 CAS 将 state 减 1。state 从 3→2、2→1,因为递减后不为 0,返回 false,不唤醒等待线程。state 从 1→0,返回 true。AQS 的 releaseShared 调用 doReleaseShared(),唤醒等待队列中的所有线程。park() 中恢复,重新尝试 tryAcquireShared,此时 state == 0,返回 1,await() 成功返回,继续执行后续逻辑。import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelTaskExample {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 模拟耗时任务
Thread.sleep((long) (Math.random() * 1000));
System.out.println("Task " + taskId + " completed by " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 确保无论成功或失败都递减计数
}
});
}
// 主线程等待所有任务完成
latch.await();
System.out.println("All tasks done, proceeding to merge results...");
executor.shutdown();
}
}
分析:
countDown()。await() 阻塞直到所有子任务完成。finally 块保证即使任务抛出异常,计数器也能递减,避免主线程永久阻塞。使用两个 CountDownLatch:一个启动门(start latch),一个结束门(end latch)。
public class ConcurrentStartExample {
public static void main(String[] args) throws InterruptedException {
int threadNum = 10;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try {
startLatch.await(); // 所有线程在此等待,同时出发
// 执行并发任务
System.out.println(Thread.currentThread().getName() + " is running");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
// 准备工作完成,发令枪响
System.out.println("Ready... Go!");
startLatch.countDown();
// 等待所有线程执行完毕
endLatch.await();
System.out.println("All threads finished.");
}
}
分析:
startLatch 初始为 1,所有工作线程调用 await() 阻塞。主线程调用 countDown() 后,所有工作线程同时被唤醒,达到“并发启动”的效果。endLatch 用于主线程等待所有工作线程结束。CountDownLatch 的吞吐量(单位时间内能处理的同步事件数)受底层实现和竞争程度影响,需要从以下三个方面分析“为什么”会存在性能开销。
CAS 自旋操作
countDown() 调用都会执行一次 CAS 循环。在低竞争下,CAS 非常快(约 50~100 ns);但当多个线程同时调用 countDown() 时,CAS 可能失败并重试,导致 CPU 使用率升高。state 已变为 0,重试时会检测到 c == 0 直接返回 false,开销较小)。线程阻塞与唤醒(park / unpark)
await() 且计数器未归零时,线程会进入等待队列并被 park() 挂起。这涉及用户态到内核态的切换,延迟约 1~10 μs。countDown() 需要唤醒所有等待线程。AQS 的共享模式唤醒会遍历等待队列,逐个调用 unpark。若等待线程数量很大(如数千个),唤醒开销线性增长,可能导致短暂的“雷鸣效应”但已优化为批量唤醒。内存屏障
在高并发系统中(例如每秒处理数万请求的服务器),若每个请求都使用 CountDownLatch 进行同步,await() 和 countDown() 的调用频率极高。不恰当的使用(例如大量线程同时等待同一个锁存器)会导致上下文切换风暴,严重降低系统吞吐量。
量化示例:
countDown() 平均耗时 200 ns(CAS 成功),一次 await() 阻塞+唤醒耗时 5 μs。countDown() 唤醒总耗时 ≈ 1000 × 5 μs = 5 ms。在此期间,CPU 忙于处理唤醒和线程调度,可能影响其他任务。await() 同一个 CountDownLatch。考虑使用更细粒度的同步(如分阶段 CompletableFuture)或异步回调。countDown() 的 CAS 循环和状态检查会增加开销。await(long timeout, TimeUnit unit) 可以避免线程无限阻塞,配合重试机制提高系统弹性。Phaser 支持批量操作和动态注册,扩展性更好;CompletableFuture 适合异步流水线。现象:计数器归零后,再调用 await() 不会阻塞,调用 countDown() 无效。
原因:tryReleaseShared 在 c == 0 时直接返回 false,不修改状态;tryAcquireShared 在 state == 0 时返回成功。要再次同步必须创建新实例。
countDown() 调用次数等于初始计数现象:若调用次数少于初始计数,await() 永远阻塞;若多于初始计数,多余的调用无影响但浪费 CPU。
原因:内部状态机严格递减到 0,未归零时不会唤醒等待线程。
最佳实践:将 countDown() 放在 finally 块中,确保异常时也能递减。
await() 会响应中断现象:等待线程在 await() 期间被中断,会抛出 InterruptedException 并退出等待。
原因:acquireSharedInterruptibly 设计为可响应中断,支持线程取消/关闭。
处理:调用者应正确处理中断(例如恢复中断状态或执行清理)。
countDown() 前后执行重量级操作现象:在 countDown() 之前/之后执行大量 I/O 或计算,会延迟计数器归零的时间,进而推迟等待线程的唤醒。
原因:countDown() 本身只是原子递减,但用户代码的耗时操作会阻塞当前线程。
建议:让 countDown() 作为子任务最后一步(如在 finally 中仅调用 countDown())。
getCount() 不应用于业务判断现象:使用 if (latch.getCount() == 0) { ... } 存在竞态条件——判断后到真正操作前,计数可能被其他线程改变。
原因:getCount() 只是一个无同步的快照值。
正确做法:直接使用 await() 阻塞等待,或使用 await(0, TimeUnit.NANOSECONDS) 进行非阻塞检测(该方法会立即返回当前是否归零)。
现象:工作线程修改的共享变量,在主线程 await() 返回后是否一定可见?
原因:CountDownLatch 基于 AQS,其 state 是 volatile 变量,且 CAS 操作具有与 volatile 相同的内存语义。根据 Java 内存模型,countDown() 中的 releaseShared 会建立 happens-before 关系:线程在 countDown() 之前对共享变量的写入,对等待线程在 await() 成功返回后的读取是可见的。
结论:无需额外同步即可保证可见性(前提是不通过非 volatile 路径破坏)。
CountDownLatch 是一个轻量级且易于使用的同步工具,其一次性倒计数和共享唤醒特性使其在“等待多个事件完成”的场景下表现出色。通过理解其 AQS 实现原理、掌握方法语义、关注吞吐量影响因素并规避常见陷阱,开发者可以高效、安全地利用 CountDownLatch 构建高并发程序。对于需要重用的场景,请考虑 CyclicBarrier 或 Phaser 作为替代。