Skip to main content

ArrayBlockingQueue 阻塞队列

作草分茶About 5 min

ArrayBlockingQueue 阻塞队列

阻塞队列介绍

juc 指的是 jdk 下的 java.util.concurrent包,在这个包下提供了很多并发相关的工具类。本文将解析这个包下面的一些阻塞队列。

阻塞队列其实是实现了java.util.concurrent.BlockingQueue接口的一些实现类,要搞明白阻塞队列,那么就要清楚这个接口到底是做什么的。

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
            throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

总共有11个方法,根据注释可以了解到这些方法的意图。

  1. add(E e)。在容量允许的情况下,添加元素到队列中,添加成功则返回 true,否则抛出异常。如果队列是有界的,那么推荐使用 offer 方法。

  2. offer(E e)。添加元素到队列中,成功返 true,失败返回 false。

  3. put(E e)。添加指定元素到队列中,该方法会一直等待直到队列中有可用空间为止。

  4. offer(E e, long timeout, TimeUnit unit)。添加指定元素到队列中,该方法会等待一定时间,如果这段时间内添加成功,则返回true,否则返回false。

  5. take()。从队列头部获取元素并移除,如果队列中没有数据则会一直阻塞,直到有数据为止。

  6. poll(long timeout, TimeUnit unit)。从队列头部获取数据并移除,该方法会阻塞一定的时间,如果在这段时间内没有获取到数据,那么抛出 InterruptedException 异常。

  7. remainingCapacity()。返回队列剩余的可用空间。

  8. remove(Object o)。从队列中移除一个指定的元素,如果存在则返回 true,不存在则返回 false。

  9. contains(Object o)。如果队列中包含指定的元素,那么返回 true,否则返回 false。

  10. drainTo(Collection<? super E> c)。转移队列中的所有元素到另一个集合当中。

  11. drainTo(Collection<? super E> c, int maxElements)。转移队列中的指定最大量元素到另一个集合当中,返回实际移除的元素数量。

ArrayBlockingQueue 源码解析

成员变量和构造函数

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 队列中存放数据的数组 */
    final Object[] items;
    /** 下一次 take, poll, peek 或者 remove 的位置 */
    int takeIndex;
    /** 下一次 put, offer, 或者 add 的位置 */
    int putIndex;
    /** 队列中元素的数量 */
    int count;
    /** 全局锁 */
    final ReentrantLock lock;
    /** 等待获取元素的条件,容器不为空则可以获取 */
    private final Condition notEmpty;
    /** 等待添加元素的条件,容器没满则可以添加 */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        // 全局锁是一个非公平锁
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
}

新增元素

新增有 add、offer、put等方法,最终都是调用 enqueue 方法入队。add 方法本质上是调用了 offer 方法,在 offer 返回 false 时抛出异常。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private void enqueue(E x) {
// 添加元素
        final Object[] items = this.items;
        items[putIndex] = x;
        // 如果下一次添加元素的位置大小等于容器大小,那么将 putIndex 置为 0
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素个数加 1
        count++;
        // 此时队列肯定不为空,唤醒获取元素的线程
        notEmpty.signal();
    }

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁,这里说明如果抢不到锁会一直阻塞
        lock.lock();
        try {
            // 如果队列满了,则返回 false
            if (count == items.length)
                return false;
            else {
                // 否则插入队列
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 获取到一个可中断锁
        lock.lockInterruptibly();
        try {
            // 如果队列是满的情况下,那么等待一定时间。这是一个自旋的过程。
            while (count == items.length) {
                // 等待时间消耗完毕,返回false
                if (nanos <= 0)
                    return false;
                // 想要添加元素的,必须满足队列未满,此时等待指定时间
                nanos = notFull.awaitNanos(nanos);
            }
            // 元素入队
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
        /* 
          如果队列已经满了,那么就一直阻塞,直到队列不是满的。
          因为可能多个线程都在这里阻塞,那么当 notFull 成立的时候,所有线程同一时间被唤醒,此时需要自旋才能保证线程安全
        */
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
}

获取元素、移除元素

获取元素有 take、poll、remove方法。元素出队的方法是 dequeue。

private E dequeue(){
// 获取元素
final Object[]items=this.items;
@SuppressWarnings("unchecked")
    E x=(E)items[takeIndex];
            // 移除元素,将该位置的元素置为 null,帮助垃圾回收
            items[takeIndex]=null;
            // 如果下一次获取数据的位置大小等于容器大小,那么下一次获取数据的位置是 0
            if(++takeIndex==items.length)
            takeIndex=0;
            // 实际元素个数 -1
            count--;
            if(itrs!=null)
            itrs.elementDequeued();
            // 移除完一个元素,那么队列没有满,通知等待添加的线程去添加元素
            notFull.signal();
            return x;
            }

public E take()throws InterruptedException{
final ReentrantLock lock=this.lock;
        lock.lockInterruptibly();
        try{
        // 如果队列为空,那么一直阻塞,直到队列不为空为止,所以此方法一定会返回一个非空元素
        while(count==0)
        notEmpty.await();
        return dequeue();
        }finally{
        lock.unlock();
        }
        }

public E poll(){
final ReentrantLock lock=this.lock;
        lock.lock();
        try{
        // 如果队列为空,那么返回 null,否则去队列中获取元素
        return(count==0)?null:dequeue();
        }finally{
        lock.unlock();
        }
        }
public E poll(long timeout,TimeUnit unit)throws InterruptedException{
        long nanos=unit.toNanos(timeout);
final ReentrantLock lock=this.lock;
        lock.lockInterruptibly();
        try{
        // 自旋直到容器中有数据为止
        while(count==0){
        // 等待时间用完则返回 null
        if(nanos<=0)
        return null;
        nanos=notEmpty.awaitNanos(nanos);
        }
        // 获取数据
        return dequeue();
        }finally{
        lock.unlock();
        }
        }
public E peek(){
final ReentrantLock lock=this.lock;
        lock.lock();
        try{
        // 从数组中获取元素,该操作不会移除元素
        return itemAt(takeIndex); // null when queue is empty
        }finally{
        lock.unlock();
        }
        }

public boolean remove(Object o){
        if(o==null)return false;
final Object[]items=this.items;
final ReentrantLock lock=this.lock;
        lock.lock();
        try{
        // 如果容器中有元素,则处理,否则直接返回 false
        if(count>0){
final int putIndex=this.putIndex;
        int i=takeIndex;
        // 遍历元素,当匹配到一个满足条件的元素时就返回 true
        do{
        if(o.equals(items[i])){
        removeAt(i);
        return true;
        }
        if(++i==items.length)
        i=0;
        }while(i!=putIndex);
        }
        return false;
        }finally{
        lock.unlock();
        }
        }