黑暗时代:背水一战免安装绿色中文版
16.3G · 2025-11-01
DelayQueue是一个无界阻塞队列,它要求存入的元素都必须实现 Delayed接口。其核心特性是:只有在元素的延迟时间到期时,消费者线程才能从队列中取出它。
DelayQueue的类定义和继承关系如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
// ...
}
类层级关系图如下:
从类定义可以看出,DelayQueue是一个标准的阻塞队列BlockingQueue,也是集合框架中的重要成员。关于其基本用法和最佳实践,可以参考之前我们总结的这篇详尽的文章:⏰ 一招鲜吃遍天!详解Java延时队列DelayQueue,从此延时任务不再难!
而本文,我们将聚焦于源码层级的深度剖析。
DelayQueue的精华,都浓缩在几个关键的成员变量中:
// 可重入锁:保证所有操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,实际存储元素的地方 优先级队列会根据元素实现Delayed接口时指定排序规则进行排序,保证了队首是最先到期的元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 等待消费队首元素的线程 采用Leader-Follower模式实现最小化不必要的线程等待
private Thread leader = null;
// 用于当队首有新元素可用或新线程可能需要成为leader时通知等待唤醒机制
private final Condition available = lock.newCondition();
作为无界队列,其构造方法非常简洁:
// 无参构造器
public DelayQueue() {}
// 指定集合的构造器
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
DelayQueue入队方法有:add(E), offer(E), put(E),因为DelayQueue是无界队列,添加元素不会阻塞,所以入队方法最终都是通过调用offer(E)实现的,下面我们就来看看次方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 将元素添加到优先级队列中
q.offer(e);
// 如果新元素成为了队首(即它是当前最快到期的)
if (q.peek() == e) {
// 重置leader线程
leader = null;
// 唤醒等待消费的线程
available.signal();
}
// 无界队列,总是添加元素成功返回true
return true;
} finally {
lock.unlock();
}
}
可以看出,入队方法还是简洁明了的,加锁保证线程安全,无界队列总是入队成功。还有一个关键逻辑在于,当有新元素“加塞”成为队首时,会立即唤醒等待的线程。这确保了等待线程能及时响应最新的到期时间,提高了响应速度
出队是DelayQueue最复杂也最精彩的部分,提供了三种策略,对应着三个出队方法:poll(), take(), poll(timeout, unit)
poll(): 非阻塞的,立即返回
public E poll() {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 获取队首元素
E first = q.peek();
// 如果队首元素为空,或者未过期,直接返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 返回并删除队首元素
return q.poll();
} finally {
lock.unlock();
}
}
take(): 阻塞,直到队列有元素且过期了才返回
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取可中断锁
lock.lockInterruptibly();
try {
// 无线循环
for (;;) {
// 获取队首元素
E first = q.peek();
// 队首元素为空,等待
if (first == null)
available.await();
else {
// 队首元素不为空
long delay = first.getDelay(NANOSECONDS);
// 延时时间已过期,直接返回并删除队首元素
if (delay <= 0)
return q.poll();
// 等待期间不保留引用,有助于内存GC回收
first = null; // don't retain ref while waiting
if (leader != null)
// 如果已经有线程在专候此元素(leader),当前线程作为follower无限期等待
available.await();
else {
// 将leader设置为当前线程,消费队首元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等到延迟时间过期,循环执行上面代码队首元素就被取出消费了
available.awaitNanos(delay);
} finally {
// 将leader置为空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果队列非空且没有leader,唤醒其他线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
poll(timeout, unit): 有限时等待,在等待时间之内没有满足的元素,那么返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 获取可中断锁
lock.lockInterruptibly();
try {
// 无线循环
for (;;) {
// 获取队首元素
E first = q.peek();
// 队首元素为空
if (first == null) {
if (nanos <= 0)
return null;
else
// 等到指定时间
nanos = available.awaitNanos(nanos);
} else {
// 队首元素不为空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延迟时间已过期,直接返回并删除队首元素
return q.poll();
if (nanos <= 0)
// 延迟时间未到,但是等待时间到了,返回null
return null;
// 等待期间不保留引用,有助于内存回收
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
// 等待时间小于延迟时间,或者已经有其他线程准备消费队首元素了,这两种情况都不会消费到队首元素
// 那么这里等待完等待时间,返回null
nanos = available.awaitNanos(nanos);
else {
// 指定当前线程消费队首元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// 更新剩余等待时间,正常情况timeLeft=0,然后delay<=nanos的,所以最终nanos<=0
nanos -= delay - timeLeft;
} finally {
// 将leader线程置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果队列非空且没有leader,唤醒其他线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
逻辑与take()类似,但增加了超时控制,代码较长,其核心区别在于每次等待都会检查超时时间nanos是否耗尽。理解了take(),此方法便不难掌握。
先来看看去掉Leader-Follower模式会怎样?
假如在时刻0s时,元素X入队,5s后过期,有三个线程A,B,C消费队列,A,B,C都会等待5秒。在时刻2s时,元素Y入队,2s后过期,但这时候由于A,B,C都在等待,无法消费元素Y,如下图所示:
3个线程竞争,但只有1个能拿到元素,其他2个白唤醒!
惊群效应:一个事件(元素到期)唤醒了所有等待的线程,但只有一个能真正处理该事件。
下面看看Leader-Follower 模式的设计实现:
该模式下只有一个leader线程定时等待,到期时不会唤醒所有线程,解决惊群效应问题,新元素入队时可以立即唤醒follower重新评估,精确了等待时间。
DelayQueue 是一个设计精巧的并发组件,它的核心价值在于:
通过深入理解其源码,我们不仅能更好地使用这个工具,还能学习到优秀的并发程序设计模式和实现思想。
本文涉及的源码解析与注释已上传至Gitee仓库,欢迎Star & Fork一起学习:gitee.com/shepherdzfj…
欢迎在评论区留言交流! 你曾在哪些场景下使用过DelayQueue?对它又有哪些独到的见解呢?
16.3G · 2025-11-01
16.1G · 2025-11-01
205M · 2025-11-01