Java 常用集合类 DelayQueue
DelayQueue 延迟队列提供了在指定时间才能获取队列元素的功能,用于放置实现了 Delayed 接口的对象,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll()
方法会返回 null,超时判定是通过 Delayed#getDelay()
方法的返回值小于等于0来判断。延时队列不能存放空元素。
DelayQueue 应用场景:延迟工作, 例如:订单业务(下单超时付款自动取消订单)等灯。
DelayQueue 整体设计
Delayed 接口
DelayQueue 存储的对象要实现该接口,并且需要实现 getDelay(TimeUnit unit)
方法和 compareTo(Delayed o)
方法,getDelay
定义了剩余到期时间,compareTo
方法定义了元素排序规则,会影响元素的获取顺序。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
属性和构造函数
DelayedQuene 的元素存储由优先级队列 PriorityQueue 存放,优先级队列使用的排序方式是队列元素的 compareTo
方法,优先级队列存放顺序是从小到大的,所以队列元素的 compareTo
方法影响了队列的出队顺序。若 compareTo
方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 全局锁,添加和获取元素互斥
private final transient ReentrantLock lock = new ReentrantLock();
// 元素存储
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 阻塞获取时使用,标记某个线程正在等待队列头元素到期
private Thread leader = null;
//
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
// 加入优先级队列
this.addAll(c);
}
}
添加元素
加入元素到延迟队列,此处加锁处理(添加元素和获取元素互斥)。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
获取元素
获取元素有两种方式:非阻塞和阻塞式。两种都需要加锁,保证 peek
的元素是后面 poll
的元素。
非阻塞获取
通过 poll()
获取元素是非阻塞的,如果没有元素返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// PriorityQueue#peek()
// return (size == 0) ? null : (E) queue[0];
E first = q.peek();
// 判断是否到达到期时间
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// PriorityQueue#poll()
return q.poll();
} finally {
lock.unlock();
}
}
阻塞获取
通过 take()
获取元素是阻塞的,先阻塞获取队列头元素,如果该元素还没有到达到期时间,等待其达到时间后取出。
private final Condition available = lock.newCondition();
private Thread leader = null;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
// 让出线程等待
available.await();
else {
// 元素距离到期时间为 delay
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 满足条件 出队列
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
// 有其他线程在等待元素到期,让出线程等待
available.await();
else {
Thread thisThread = Thread.currentThread();
// leader 设置为当前线程
leader = thisThread;
try {
// 最大等待 delay 时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 通知其他 take 线程处理
available.signal();
lock.unlock();
}
}
为什么要设置 first = null
假设线程A到达,列首元素没有到期,设置 leader = 线程A,这是线程B/C来了因为 leader != null,会 await 阻塞。当线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,但是引用还被线程B、线程C持有着,所以不会回收,当并发线程很多时,就会造成内存泄漏。
其他方法
// first 元素过期才返回
private E peekExpired() {
E first = q.peek();
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
// 过期元素加入另一个队列,需要锁
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (E e; (e = peekExpired()) != null;) {
c.add(e); // In this order, in case add() throws.
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
// 移除元素,需要锁
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}