在 Java 并发编程中,CyclicBarrierCountDownLatchSemaphore 是三个最常用的同步辅助类。它们各自解决不同场景下的线程协调问题,但初看有些相似。本文将从源码层面深入剖析它们的实现原理、性能差异和适用场景,帮助你在实际开发中做出正确选型。

一、概述

工具核心作用典型应用场景一句话总结
CyclicBarrier让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障,所有被阻塞的线程才能继续执行。多线程分阶段计算(如并行处理后再聚合)、多线程数据加载后再统一处理。可重用的多线程等待点
CountDownLatch一个或多个线程等待其他线程完成一组操作后才继续执行。主线程等待所有子任务完成、并行任务启动前的同步。一次性倒计时门闩
Semaphore控制同时访问某个特定资源的线程数量,相当于一个计数器(许可集)。数据库连接池限流、限流器、有界缓冲区的互斥访问。信号量限流

二、核心方法说明

CyclicBarrier 主要方法

方法签名参数返回值异常
CyclicBarrier(int parties)parties:屏障拦截的线程数量--
CyclicBarrier(int parties, Runnable barrierAction)parties:屏障拦截的线程数量;barrierAction:当所有线程到达屏障后优先执行的命令--
int await()当前线程到达屏障的索引(0 表示最后一个到达)InterruptedExceptionBrokenBarrierException
int await(long timeout, TimeUnit unit)timeout:超时时间;unit:时间单位await()InterruptedExceptionBrokenBarrierExceptionTimeoutException
void reset()--
int getNumberWaiting()当前正在屏障处等待的线程数-
boolean isBroken()屏障是否被破坏-

CountDownLatch 主要方法

方法签名参数返回值异常
CountDownLatch(int count)count:需要倒计数的次数(必须大于 0)--
void await()-InterruptedException
boolean await(long timeout, TimeUnit unit)timeout:超时时间;unit:时间单位如果计数到达 0 返回 true,超时返回 falseInterruptedException
void countDown()--
long getCount()当前计数-

Semaphore 主要方法

方法签名参数返回值异常
Semaphore(int permits)permits:初始许可数量--
Semaphore(int permits, boolean fair)permits:初始许可数量;fair:是否公平模式--
void acquire()-InterruptedException
void acquire(int permits)permits:需要获取的许可数-InterruptedException
void release()--
void release(int permits)permits:释放的许可数--
boolean tryAcquire()获取成功返回 true,否则 false-
int availablePermits()当前可用的许可数-

三、详细对比(核心维度)

1. 可重用性

  • CyclicBarrier:可重用。当所有线程都到达屏障后,屏障会自动重置,可以再次使用。通过 reset() 方法也可以手动重置。
    源码体现CyclicBarrier 内部维护 int count(剩余未到达线程数)和 int parties(总线程数)。每次 await() 减少 count,当 count == 0 时触发 barrierAction 并执行 nextGeneration():重置 count = parties,唤醒所有等待线程。

  • CountDownLatch:一次性。计数器的值只能递减,当计数变为 0 后,所有等待线程被唤醒,之后 await() 将立即返回,但计数器无法重置。
    源码体现CountDownLatch.Sync 继承 AQSstate 表示计数。tryReleaseShared 每次将 state 减 1,直到 0;一旦为 0,后续 countDown() 无任何效果。

  • Semaphore:可重用。许可的获取和释放是动态的,可以反复增减,不存在“一次性”限制。

2. 计数器的变化方向

  • CyclicBarrier:计数从 parties 递减到 0。每调用一次 await(),剩余等待线程数减 1。当减到 0 时触发屏障动作并重置。
  • CountDownLatch:计数从初始 count 递减到 0。每次 countDown() 减 1,减到 0 后唤醒等待线程。
  • Semaphore:许可数可以增加或减少。acquire() 减少许可,release() 增加许可。没有固定的归零点,许可数可以为 0 甚至负数(当释放多于获取时)。

3. 是否支持“释放”或“增加”操作

  • Semaphore:明确支持 release() 增加许可。即使一个线程从未 acquire(),也可以 release() 增加许可,这可能导致许可数超出初始值。
  • CyclicBarrier:不支持主动释放或增加计数。计数只能通过线程到达屏障被动递减,重置时恢复初始值。
  • CountDownLatch:不支持增加计数。只支持单向递减。

4. 阻塞与唤醒机制(使用的 AQS 模式)

工具AQS 模式实现方式
CyclicBarrier不使用 AQS内部组合 ReentrantLock + Conditiontrip),使用独占锁和条件队列实现阻塞/唤醒。
CountDownLatch共享模式内部类 Sync 继承 AQS,重写 tryAcquireShared / tryReleaseShared
Semaphore共享模式内部类 Sync 继承 AQS,公平/非公平版本分别重写 tryAcquireShared

源码依据

  • CountDownLatch.Sync.tryAcquireShared:返回 (getState() == 0) ? 1 : -1。只有当 state == 0 时获取成功,否则阻塞。
  • Semaphore.NonfairSync.tryAcquireShared:直接尝试 CAS 减少 stateFairSync 则会先检查队列中是否有更早的等待线程。

5. 是否支持屏障动作(Barrier Action)

  • CyclicBarrier:唯一支持屏障动作的工具。可以在构造函数中传入 Runnable barrierAction,当最后一个线程到达屏障时,该动作会在所有被唤醒线程执行前由最后一个到达的线程执行。
  • CountDownLatch:不支持。当计数归零时,仅唤醒等待线程,没有动作回调。
  • Semaphore:不支持。

6. 典型使用场景

工具典型场景
CyclicBarrier分阶段计算(如 MapReduce 中的 barrier)、多线程同时开始执行(模拟并发请求)、多线程数据加载完成后统一处理。
CountDownLatch主线程等待多个子任务完成后再继续(如批量初始化)、并行任务启动信号(所有 worker 等待 startLatch)。
Semaphore控制数据库连接池的最大连接数、限流器(限制 QPS)、有界阻塞队列中的互斥写入。

四、核心原理与源码分析

1. CountDownLatch 源码分析(AQS 共享模式)

CountDownLatch 内部类 Sync 继承 AQSstate 表示还需要倒计数的次数。

关键方法

// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;  // 只有当计数变为 0 时才唤醒等待线程
    }
}

流程时序图(Mermaid)

sequenceDiagram
    participant Main as 主线程
    participant Worker as 工作线程
    participant CDL as CountDownLatch(state=N)
    
    Main->>CDL: await()
    CDL->>CDL: tryAcquireShared => (state==0?) 返回-1
    CDL->>AQS: doAcquireSharedInterruptibly
    AQS->>Main: 线程阻塞,进入同步队列
    
    Worker->>CDL: countDown()
    CDL->>CDL: tryReleaseShared CAS减1
    alt state变为0
        CDL-->>AQS: 返回true,唤醒后继节点
        AQS-->>Main: 唤醒主线程
        Main->>CDL: 再次尝试获取,成功返回
    else state未到0
        CDL-->>Worker: 继续,不唤醒
    end

2. Semaphore 源码分析(公平/非公平)

Semaphore.Sync 继承 AQSstate 表示当前可用许可数。

非公平模式 tryAcquireShared

// NonfairSync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

公平模式

// FairSync
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())  // 检查队列中是否有等待者
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}

release 逻辑(共享模式通用):

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

acquire / release 流程时序图

sequenceDiagram
    participant Thread as 线程
    participant Sem as Semaphore(state=permits)
    participant AQS as AQS同步队列
    
    Thread->>Sem: acquire()
    Sem->>Sem: tryAcquireShared (非公平)
    alt 剩余许可>=请求数
        Sem->>Sem: CAS减少state
        Sem-->>Thread: 成功,继续执行
    else 许可不足
        Sem->>AQS: doAcquireSharedInterruptibly
        AQS->>Thread: 线程阻塞入队
    end
    
    Note over Thread,Sem: 其他线程释放许可
    Thread->>Sem: release()
    Sem->>Sem: tryReleaseShared CAS增加state
    Sem->>AQS: 唤醒队列中的等待线程
    AQS-->>Thread: 被唤醒的线程尝试重新获取

3. CyclicBarrier 源码分析(ReentrantLock + Condition)

CyclicBarrier 不使用 AQS,而是组合 ReentrantLockCondition。核心字段:

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private int count;  // 剩余未到达线程数
private int parties;
private Runnable barrierCommand;

await 核心逻辑

private int dowait(boolean timed, long nanos) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int index = --count;
        if (index == 0) {  // 最后一个线程到达
            // 执行 barrierAction
            if (barrierCommand != null)
                barrierCommand.run();
            nextGeneration(); // 重置 count,唤醒所有等待线程
            return 0;
        }
        // 循环等待,直到被唤醒、超时或屏障破坏
        for (;;) {
            if (!timed)
                trip.await();
            else if (nanos > 0)
                nanos = trip.awaitNanos(nanos);
            // ...
        }
    } finally {
        lock.unlock();
    }
}

nextGeneration() 实现重置和唤醒:

private void nextGeneration() {
    trip.signalAll();   // 唤醒所有等待线程
    count = parties;    // 重置计数
}

五、实际应用场景与代码举例

1. CountDownLatch:主线程等待多个子任务完成

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
        
        for (int i = 1; i <= taskCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        System.out.println("主线程等待所有子任务完成...");
        latch.await(); // 阻塞直到计数为0
        System.out.println("所有任务完成,主线程继续执行");
        executor.shutdown();
    }
}

2. CyclicBarrier:多线程分阶段计算(两轮数据聚合)

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    private static final int THREAD_COUNT = 4;
    private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, 
            () -> System.out.println("=== 本轮所有线程已到达屏障,开始下一阶段 ==="));
    
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 第一阶段:计算部分数据
                    System.out.println("线程 " + threadId + " 完成第一阶段计算");
                    barrier.await(); // 等待其他线程
                    
                    // 第二阶段:基于第一阶段结果继续计算
                    System.out.println("线程 " + threadId + " 完成第二阶段计算");
                    barrier.await(); // 再次等待
                    
                    System.out.println("线程 " + threadId + " 所有阶段完成");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3. Semaphore:限制数据库连接池的并发访问数

import java.util.concurrent.Semaphore;

class LimitedConnectionPool {
    private final Semaphore semaphore;
    private final int maxConnections;
    
    public LimitedConnectionPool(int maxConnections) {
        this.maxConnections = maxConnections;
        this.semaphore = new Semaphore(maxConnections, true); // 公平模式
    }
    
    public void getConnection() throws InterruptedException {
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName() + " 获取连接,剩余许可: " + semaphore.availablePermits());
        // 模拟使用连接
        Thread.sleep(500);
    }
    
    public void releaseConnection() {
        semaphore.release();
        System.out.println(Thread.currentThread().getName() + " 释放连接,剩余许可: " + semaphore.availablePermits());
    }
}

public class SemaphoreExample {
    public static void main(String[] args) {
        LimitedConnectionPool pool = new LimitedConnectionPool(3);
        
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    pool.getConnection();
                    pool.releaseConnection();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Thread-" + id).start();
        }
    }
}

六、吞吐量与性能考量

工具锁竞争程度上下文切换内存开销性能特点与原因
CyclicBarrier中等(独占锁 + Condition)高(每次屏障点所有线程阻塞再唤醒)较低(一个锁 + 一个条件队列)屏障点会造成所有线程停止,然后全部唤醒,适合批处理场景,但高并发下唤醒风暴可能影响性能
CountDownLatch低(共享模式,CAS)中等(等待线程只被唤醒一次)低(AQS 节点较少)countDown() 仅 CAS 操作,极少阻塞;await() 在计数未到 0 时阻塞,唤醒后不再重新阻塞。性能很好,尤其适合一次性同步
Semaphore公平模式高,非公平低取决于许可争用程度非公平模式下 CAS 快速尝试,减少上下文切换;公平模式下会增加队列检查和唤醒开销。非公平模式性能更高,适合高并发限流

为什么会有这些差异?

  • CyclicBarrier 使用独占锁 ReentrantLock,每次 await 都会加锁,并且所有线程在屏障点同时唤醒会引发 惊群效应(尽管 Condition 的 signalAll 会逐个唤醒,但大量线程同时竞争锁仍会有开销)。
  • CountDownLatchcountDown() 只在计数变为 0 时触发一次 releaseShared,然后唤醒所有等待线程。大部分时间只有 CAS 操作,没有锁竞争。因此性能损耗小。
  • Semaphore 的非公平模式直接 CAS 修改 state,失败后才进入队列,减少了无效唤醒。公平模式需要检查 hasQueuedPredecessors(),增加了内存屏障开销。

七、注意事项与常见陷阱

CountDownLatch

  1. 计数不可重置:一旦计数归零,后续 await() 会立即返回,countDown() 无效。如果需要重用,请使用 CyclicBarrier
  2. 计数初始值必须与 countDown 调用次数一致:若 countDown() 调用次数少于初始值,等待线程将永远阻塞。
  3. 异常处理:子任务出现异常时忘记调用 countDown() 会导致主线程永久阻塞。建议使用 finally 块确保 countDown() 被执行。
  4. 不能增加计数:没有 countUp() 方法。

CyclicBarrier

  1. 屏障破坏(Broken):如果某个线程在等待时被中断或超时,屏障会进入损坏状态(isBroken() == true),其他等待线程会抛出 BrokenBarrierException。此时需要调用 reset() 恢复。
  2. 重置时机reset() 方法会中断当前正在等待的线程(抛出 BrokenBarrierException),小心使用。
  3. 线程数不匹配:如果参与线程数不等于构造参数 parties,屏障永远不会触发。
  4. barrierAction 执行线程:屏障动作由最后一个到达的线程执行,如果该动作抛出异常,屏障也会损坏。

Semaphore

  1. 许可泄漏acquire() 后忘记 release() 会导致许可逐渐耗尽,最终所有线程阻塞。务必使用 try-finally 或 try-with-resources(自定义 AutoCloseable)。
  2. 释放多于获取release() 可以在没有 acquire 的情况下调用,导致许可数超出初始值。这可能是 Bug 或故意设计(例如动态补充许可)。
  3. 公平模式性能:公平模式下,高并发时吞吐量较低,因为排队检查增加了开销。
  4. 中断响应acquire() 响应中断,被中断时会抛出 InterruptedException,不会获取许可。

八、选型指南

决策树

graph TD
    A[需要线程同步] --> B{是否需要屏障动作?}
    B -->|是| C[CyclicBarrier]
    B -->|否| D{是否需要重复使用?}
    D -->|是| E{是否需要动态增减计数?}
    E -->|是| F[Semaphore]
    E -->|否| G[CyclicBarrier<br/>或重置CountDownLatch?]
    D -->|否| H{是等待事件发生一次<br/>还是控制并发数?}
    H -->|等待事件发生一次| I[CountDownLatch]
    H -->|控制同时访问线程数| J[Semaphore]

快速对比表

需求推荐工具理由
等待 N 个任务完成,不重用CountDownLatch简单、轻量、一次性
多个线程分阶段执行,每阶段都同步CyclicBarrier可重用,支持屏障动作
限制同时访问资源的线程数Semaphore许可动态增减,天然限流
模拟高并发,让所有线程同时开始CyclicBarrierCountDownLatch两者都可以,CyclicBarrier 更直观
需要公平地获取许可(避免饥饿)Semaphore(true)公平模式保证先到先得
工作线程完成后通知主线程CountDownLatch主线程 await,工作线程 countDown

九、总结

核心区别一览表

特性CyclicBarrierCountDownLatchSemaphore
可重用
计数器方向递减至0递减至0可增可减
支持释放/增加
AQS 模式无(Lock+Condition)共享共享
屏障动作
典型场景分阶段并行等待完成信号限流/池化
初始化参数线程数计数值许可数
主要方法await()await(), countDown()acquire(), release()

学习建议

  1. 理解 AQS 框架CountDownLatchSemaphore 都是 AQS 共享模式的经典应用,掌握 tryAcquireShared / tryReleaseShared 的语义有助于快速理解其他同步工具(如 ReentrantReadWriteLock)。
  2. 区分“等待点”与“许可”CyclicBarrier 是一个等待点(所有线程必须到达),Semaphore 是一个资源计数器(线程可以随时进出)。
  3. 动手实践:尝试修改示例代码,故意制造异常(如中断、超时),观察屏障损坏或计数未归零导致永久阻塞的现象。
  4. 性能测试:用 JMH 对比不同工具在高并发下的吞吐量,加深对锁竞争和上下文切换开销的理解。

通过以上对比和源码分析,相信你已经能够根据业务需求在三者之间做出正确的选择,并避免常见陷阱。在实际开发中,优先考虑语义最匹配的工具,不要为了“炫技”而使用复杂的同步机制。

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com