无处生还
76.43M · 2026-04-23
在上一篇中我们详细介绍了AQS的底层代码实现逻辑,这里我们来介绍基于AQS实现的类,在 Java 并发编程中,java.util.concurrent(JUC)包为我们提供了大量高效、可靠的并发工具类。这些工具类的底层实现大多依赖于一个关键的抽象框架——AbstractQueuedSynchronizer(AQS)。本文将深入介绍 AQS 的基本原理,并详细讲解 JDK 中几个基于 AQS 实现的核心同步器类,包括 ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 和 Condition,并附上实用的代码示例。
AbstractQueuedSynchronizer(AQS)是 Doug Lea 设计的一个用于构建锁和同步器的基础框架。它通过一个 int 类型的 state 表示同步状态,并维护一个 FIFO 的等待队列(CLH 队列变体),用于管理那些未能获取资源而被阻塞的线程。
AQS 的核心思想是:
tryAcquire / tryRelease(独占模式)或 tryAcquireShared / tryReleaseShared(共享模式)等方法来定义如何获取/释放资源。AQS 支持两种资源共享方式:
特点:
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static final ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final count: " + count); // 输出 2000
}
}
特点:
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
private static final Semaphore semaphore = new Semaphore(3); // 最多3个并发
public static void main(String[] args) {
for (int i = 0; i < 8; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取数据库连接");
TimeUnit.SECONDS.sleep(2); // 模拟操作
System.out.println(Thread.currentThread().getName() + " 释放数据库连接");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}, "Thread-" + i).start();
}
}
}
特点:
countDown() 减 1,await() 阻塞直到计数为 0import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
// 模拟任务
Thread.sleep((long) (Math.random() * 2000));
System.out.println(Thread.currentThread().getName() + " 完成任务");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减1
}
}).start();
}
latch.await(); // 主线程等待所有任务完成
System.out.println("所有任务已完成,继续执行后续逻辑");
}
}
特点:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockCache {
private final Map<String, String> cache = new HashMap<>();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public String get(String key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void put(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
public static void main(String[] args) {
ReadWriteLockCache cache = new ReadWriteLockCache();
cache.put("name", "Alice");
// 多个读线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 读取: " + cache.get("name"));
}).start();
}
}
}
特点:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerWithCondition {
private final Queue<Integer> buffer = new LinkedList<>();
private final int capacity = 2;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (buffer.size() == capacity) {
System.out.println("Buffer full, producer waiting...");
notFull.await();
}
buffer.offer(item);
System.out.println("Produced: " + item);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println("Buffer empty, consumer waiting...");
notEmpty.await();
}
int item = buffer.poll();
System.out.println("Consumed: " + item);
notFull.signal(); // 唤醒生产者
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerWithCondition pc = new ProducerConsumerWithCondition();
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
pc.produce(i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
pc.consume();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
| 同步器 | AQS 模式 | 典型用途 |
|---|---|---|
ReentrantLock | 独占 | 替代 synchronized,支持超时、中断、公平性 |
Semaphore | 共享 | 控制并发线程数,资源池限流 |
CountDownLatch | 共享 | 等待 N 个任务完成后再继续 |
ReentrantReadWriteLock | 读(共享)/写(独占) | 读多写少的并发访问控制 |
Condition | 与 Lock 配合 | 精细线程通信(替代 wait/notify) |
这些类都封装了 AQS 的复杂性,让开发者可以专注于业务逻辑,而无需手动处理线程调度和状态管理。理解 AQS 的工作原理,有助于我们更高效、安全地使用这些并发工具。
希望本文能帮助你深入理解 AQS 及其在 JDK 中的应用。如果你有任何问题或想了解更底层的源码机制,欢迎留言讨论!