Java阻塞队列详解


一、优先队列

1. 基本介绍

所谓优先队列,即赋予队列中元素访问优先级,优先级越高则出队的优先即也越高。

PriorityQueue 内部实现了一套排序机制,默认根据存入元素的 ASCII 值进行存入,ASCII 值更小的元素哪怕后存也会在队头。如下示例中 bananapear 之后入队,但 bASCII 小于 p 因此优先值更高,在出队时 banana 则会优先于 pear

public void priorityDemo() {
    Queue<String> queue = new PriorityQueue<>();
    queue.offer("apple");
    queue.offer("pear");
    queue.offer("banana");

    while(queue.size() > 0){
        // 输出顺序:apple banana pear
        System.out.println(queue.poll());
    }
}

2. 排序接口

除了使用默认的优先级规则,存入队列中的元素也可同实现 Comparator 接口从而自定义优先级。

通过实现 Comparator 接口并重写 compare() 方法即可自定义排序规则,其中 compare() 返回值取值范围为:(-1,0,1),依次代表 (大于,等于,小于),当调用队列的 poll() 方式时默认取队列中从最小的元素。

public interface Comparator<T> {
    int compare(T o1, T o2);

    boolean equals(Object obj);

    default Comparator<T> reversed() {
        return Collections.reverseOrder(this);
    }
}

3. 示例演示

如下示例中在初始化优先队列时指定了自定义排序器 MyComparator,其根据 User 的年龄而大小确定优先级。

因此对于存入的三个元素,在读取时即会按照年龄的大小升序输出。

public void priorityDemo() {
    PriorityQueue<User> queue1 = new PriorityQueue<>(new MyComparator());
    queue1.offer(new User("Alex", 30));
    queue1.offer(new User("Beth", 25));
    queue1.offer(new User("Jack", 26));
    while (queue1.size() > 0) {
        // User(Beth), User(Jack), User(Alex)
        System.out.println(queue1.poll());
    }
}

class MyComparator implements Comparator<User> {
    @Override
    public int compare(User o1, User o2) {
        int interval = o1.getAge() - o2.getAge();
        return Integer.compare(interval, 0);
    }
}

二、延时队列

Java 同时提供了一种阻塞的延迟队列 DelayQueue ,其队列元素必须实现 Delayed 接口。

1. 延迟接口

Delayed 扩展了 Comparable 接口,因此元素实现时必须同时重写 getDelay()compareTo() 方法。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
(1) getDelay()

返回对象还需要延迟的时间,这个时间会以指定的时间单位(例如毫秒、秒等)返回。

(2) compareTo()

比较当前对象与指定对象的顺序,即用于设置队列元素的存入顺序。如果当前对象的延迟时间比指定对象的延迟时间更长,则返回一个正值;如果当前对象的延迟时间比指定对象的延迟时间更短,则返回一个负值;如果两者的延迟时间相等,则返回 0

2. 队列介绍

在上述提到 compareTo() 方法保证了队列元素的有序性,而 getDelay() 则正是实现延时阻塞的关键。

查看延迟队列 DelayQueuetake() 方法源码可以看到线程先取锁并进入死循环,当队列元素无数据则持锁进入阻塞;若队列不为空,则判断当前队头元素的 getDelay() 值是否小于 0,若是则线程阻塞指定时间,阻塞时间值为 getDelay() 的值,又因 delay > 0 所以数据并未出队,因此阻塞结束后下一循环时队头仍为上循环的元素,而经过 delay 时间的睡眠阻塞后其 getDelay() 的值一定小于或等于 0 此时数据将出队。

因此了解了 take() 的阻塞原理,若想要控制数据的阻塞获取,核心即在于定义 getDelay() 的处理逻辑,当其返回的值大于 0 时,线程将根据返回的值阻塞指定时间,从而保证下一次尝试出队时其值一定小于等于 0

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 1. 获取队头
            E first = q.peek();
            if (first == null)
                // 2.1 队头为空,即队列为空将阻塞
                available.await();
            else {
                // 2.2 队列不为空调用元素的 getDelay()
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    // 3. 结果小于 0 则返回该元素
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 4. 阻塞指定时间,值为 getDelay() 返回的大小
                        // 当退出阻塞后进入下一循环,此时队头 getDelay() 值一定小于或等于 0
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

3. 示例演示

下面通过一个具体的示例介绍阻塞队列的效果。

定义队列元素 DelayTask 并实现 Delayed 接口,元素内定义两个属性,data 用于存储真实数据内容,构造函数中入参 offset 用于控制具体阻塞时间。

队列中的元素顺序将根据 compareTo() 定义进行存储,此处即 delayTime 值更小的处于队头。当 take() 读取队列时,若未达到指定的 offset 时间其 getDelay() 返回的值将为正数,在上述 take() 源码中提到的其将会根据这个返回的时间进行阻塞。

public class DelayTask<T> implements Delayed {

    private T data;
    private long delayTime;

    public DelayTask(T t, long offset) {
        this.data = t;
        this.delayTime = System.currentTimeMillis() + offset;
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        long interval = delayTime - System.currentTimeMillis();
        return unit.convert(interval, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        DelayTask task = (DelayTask) o;
        return Ints.saturatedCast(delayTime - task.delayTime);
    }
}

完成上述对象定义之后下面介绍如何使用,初始化 DelayQueue 并存入两个元素,因为 compareTo() 的作用即便 Beth 后执行入队其仍为队头。当执行 take() 方法时队列内部执行 peek() 获取队头调用其 getDelay() 方法,而 Bethoffset 设置为 3s 返回的值将大于 0 因而将阻塞睡眠 3s,睡眠结束后再次循环获取队头的 getDelay() 值此时将小于 0,从而输出队头元素。

因此下述的示例代码输出顺序应如下:程序阻塞 3s 后打印 Beth,再次阻塞两秒后打印 Alex

public void delayDemo() throws InterruptedException {
    BlockingQueue<DelayTask> delayQueue = new DelayQueue<>();
    // 间隔存入数据
    delayQueue.put(new DelayTask<String>("Alex", 5000)));
    delayQueue.put(new DelayTask<String>("Beth", 3000));

    while (!delayQueue.isEmpty()) {
        try {
            // 阻塞等待直到元素到期
            DelayTask task = delayQueue.take(); 
            System.out.println(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、BlockingQueue

1. 基本操作

Java 中默认提供了两类阻塞队列,其结构为线程安全,扩展了一系列方法实现阻塞读存。

  • ArrayBlockingQueue:有界集合,队列容量在初始化时指定,一旦定义后续无法变更。
  • LinkedBlockingQueue:无界集合,若初始化时指定容量则效果等同于 `ArrayBlockingQueue``,若不指定容量则为无界集合。

在阻塞队列中除了 offer()、poll() 方法可以指定阻塞时间,同时提供了 put()、take() 方法实现阻塞读存。

方法 作用
offer() 阻塞新增,指定时间未能成功返回 false。
poll() 阻塞获取,指定时间未能成功返回 false。
put() 阻塞新增,若队列已满则会一直处于阻塞待有可用空间时新增。
take() 阻塞查询,若队列为空则会一直处于阻塞状态。

2. 示例演示

BlockingQueue 具体操作示例如下,这里以 ArrayBlockingQueue 为例,LinkedBlockingQueue 仅结构有所差异,具体使用方法类似。

public void blockDemo() throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
    for (int i = 0; i < 6; i++) {
        // Add element
        // If capacity is full then wait, if still full return false
        System.out.println(queue.offer(i, 3, TimeUnit.SECONDS));
    }
    while (queue.size() > 0) {
        System.out.println(queue.poll());
    }
    // Get element
    // If capacity is full then wait, if still full return null
    System.out.println(queue.poll(3, TimeUnit.SECONDS));
}

四、SynchronousQueue

1. 基本操作

SynchronousQueue 是一种特殊的阻塞队列,其通过 put()take() 存入与读取数据。

其特殊之处在于当通过 put() 存入数据时要求必须存在另一线程执行 take() 方法,否则将一直处于阻塞状态,take() 操作同理。

方法 作用
put() 存入数据,必须存在线程执行 take() 否则将阻塞。
take() 读取数据,必须存在线程执行 put() 否则将阻塞。

2. 示例演示

如下示例分别创建了两个读写线程,t1 线程用于执行 put() 写入,t2 线程用于执行 take() 读取。

t1 线程中通过 sleep(5) 模拟休眠五秒,运行示例代码可以看到当启动程序后 t2 线程在打印 start take 将陷入阻碍状态,只有在 t1 中休眠结束后执行 put() 操作 t2 才会退出阻塞读取数据并打印 end take

同理如果将 t1 中的 sleep(5) 移至 t2take() 之前,运行程序后 t1put() 操作的则会进入阻塞等待 t2 的休眠结束执行 take() 才会退出阻塞。

public void synchronousQueueDemo() throws InterruptedException {
    SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
    Thrad t1 = new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);

            System.out.println("start put.");
            // Block here only when other thread try "take()" then quit.
            synchronousQueue.put(0);
            System.out.println("end put.");
        } catch (InterruptedException e) {
                throw new RuntimeException(e);
        }
    }, "Producer").start();
    Thrad t2 = new Thread(() -> {
        try {
            System.out.println("start take.");
            // Block here only when other thread try "put()" then quit.
            Integer take = synchronousQueue.take();
            System.out.println("end take: " + take);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }, "Consumer").start();

    t1.join();
    t2.join();
}

五、LinkedTransferQueue

1. 基本操作

LinkedTransferQueue 即扩展了 SynchronousQueue,为阻塞方法提供了等待时间,到期若未成功则退出阻塞继续后续执行内容。

方法 作用
transfer(E) 阻塞,类似与 put() 方法。
tryTransfer(E, long, TimeUnit) 非阻塞,指定时间未成功将退出阻塞返回 false。
hasWaitingConsumer() 队列对象当前是否有线程处于 take() 阻塞状态。
getWaitingConsumerCount() 队列对象当前处于 take() 阻塞状态的线程数。

2. 示例演示

LinkedTransferQueue 的测试示例代码如下,与 SynchronousQueue 模拟场景类型这里不再详细介绍。

public void transferQueueDemo() throws InterruptedException {
    LinkedTransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();
    Thrad t1 = new Thread(() -> {
        try {
            System.out.println("start transfer.");
            // Block for e second and return the transfer result
            boolean success = transferQueue.tryTransfer(0, 2, TimeUnit.SECONDS);
            System.out.println("end transfer, success? " + success);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            latch.countDown();
        }
    }, "Producer").start();
    Thrad t2 = new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);

            System.out.println("start take.");
            // Block here only when other thread try "put()" then quit.
            Integer take = transferQueue.take();
            System.out.println("end take: " + take);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }, "Consumer").start();
    
    t1.join();
    t2.join();
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录