Java 常用集合类 DelayQueue

DelayQueue 延迟队列提供了在指定时间才能获取队列元素的功能,用于放置实现了 Delayed 接口的对象,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null,超时判定是通过 Delayed#getDelay() 方法的返回值小于等于0来判断。延时队列不能存放空元素。

DelayQueue 应用场景:延迟工作, 例如:订单业务(下单超时付款自动取消订单)等灯。

DelayQueue 整体设计

Delayed 接口

DelayQueue 存储的对象要实现该接口,并且需要实现 getDelay(TimeUnit unit) 方法和 compareTo(Delayed o) 方法,getDelay 定义了剩余到期时间,compareTo 方法定义了元素排序规则,会影响元素的获取顺序。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

属性和构造函数

DelayedQuene 的元素存储由优先级队列 PriorityQueue 存放,优先级队列使用的排序方式是队列元素的 compareTo 方法,优先级队列存放顺序是从小到大的,所以队列元素的 compareTo 方法影响了队列的出队顺序。若 compareTo 方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 全局锁,添加和获取元素互斥
      private final transient ReentrantLock lock = new ReentrantLock();
    // 元素存储
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 阻塞获取时使用,标记某个线程正在等待队列头元素到期
      private Thread leader = null;
    // 
    private final Condition available = lock.newCondition();

      public DelayQueue() {}

      public DelayQueue(Collection<? extends E> c) {
        // 加入优先级队列
        this.addAll(c);
    }
}

添加元素

加入元素到延迟队列,此处加锁处理(添加元素和获取元素互斥)。

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      q.offer(e);
      if (q.peek() == e) {
        leader = null;
        available.signal();
      }
      return true;
    } finally {
      lock.unlock();
    }
}

获取元素

获取元素有两种方式:非阻塞和阻塞式。两种都需要加锁,保证 peek 的元素是后面 poll 的元素。

非阻塞获取

通过 poll() 获取元素是非阻塞的,如果没有元素返回 null。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // PriorityQueue#peek()
        // return (size == 0) ? null : (E) queue[0];
        E first = q.peek();
        // 判断是否到达到期时间
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
              // PriorityQueue#poll()
            return q.poll();
    } finally {
        lock.unlock();
    }
}

阻塞获取

通过 take() 获取元素是阻塞的,先阻塞获取队列头元素,如果该元素还没有到达到期时间,等待其达到时间后取出。

private final Condition available = lock.newCondition();
private Thread leader = null;

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                // 让出线程等待
                available.await();
            else {
                // 元素距离到期时间为 delay
                  long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 满足条件 出队列
                        return q.poll();
                  first = null; // don't retain ref while waiting
                if (leader != null)
                    // 有其他线程在等待元素到期,让出线程等待
                        available.await();
                  else {
                      Thread thisThread = Thread.currentThread();
                    // leader 设置为当前线程
                      leader = thisThread;
                      try {
                        // 最大等待 delay 时间
                            available.awaitNanos(delay);
                        } finally {
                              if (leader == thisThread)
                                  leader = null;
                        }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // 通知其他 take 线程处理
              available.signal();
        lock.unlock();
    }
}
为什么要设置 first = null

假设线程A到达,列首元素没有到期,设置 leader = 线程A,这是线程B/C来了因为 leader != null,会 await 阻塞。当线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,但是引用还被线程B、线程C持有着,所以不会回收,当并发线程很多时,就会造成内存泄漏。

其他方法

// first 元素过期才返回
private E peekExpired() {
    E first = q.peek();
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
    null : first;
}

// 过期元素加入另一个队列,需要锁
public int drainTo(Collection<? super E> c) {
    if (c == null)
      throw new NullPointerException();
    if (c == this)
      throw new IllegalArgumentException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = 0;
        for (E e; (e = peekExpired()) != null;) {
            c.add(e);       // In this order, in case add() throws.
              q.poll();
            ++n;
        }
        return n;
    } finally {
          lock.unlock();
    }
}

// 移除元素,需要锁
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.remove(o);
    } finally {
         lock.unlock();
    }
}

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注