一、优先队列
1. 基本介绍
所谓优先队列,即赋予队列中元素访问优先级,优先级越高则出队的优先即也越高。
PriorityQueue
内部实现了一套排序机制,默认根据存入元素的 ASCII
值进行存入,ASCII
值更小的元素哪怕后存也会在队头。如下示例中 banana
在 pear
之后入队,但 b
的 ASCII
小于 p
因此优先值更高,在出队时 banana
则会优先于 pear
。
public void priorityDemo() {
Queue<String> queue = new PriorityQueue<>();
queue.offer("apple");
queue.offer("pear");
queue.offer("banana");
while(queue.size() > 0){
// 输出顺序:apple banana pear
System.out.println(queue.poll());
}
}
2. 排序接口
除了使用默认的优先级规则,存入队列中的元素也可同实现 Comparator
接口从而自定义优先级。
通过实现 Comparator
接口并重写 compare()
方法即可自定义排序规则,其中 compare()
返回值取值范围为:(-1,0,1)
,依次代表 (大于,等于,小于)
,当调用队列的 poll()
方式时默认取队列中从最小的元素。
public interface Comparator<T> {
int compare(T o1, T o2);
boolean equals(Object obj);
default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}
}
3. 示例演示
如下示例中在初始化优先队列时指定了自定义排序器 MyComparator
,其根据 User
的年龄而大小确定优先级。
因此对于存入的三个元素,在读取时即会按照年龄的大小升序输出。
public void priorityDemo() {
PriorityQueue<User> queue1 = new PriorityQueue<>(new MyComparator());
queue1.offer(new User("Alex", 30));
queue1.offer(new User("Beth", 25));
queue1.offer(new User("Jack", 26));
while (queue1.size() > 0) {
// User(Beth), User(Jack), User(Alex)
System.out.println(queue1.poll());
}
}
class MyComparator implements Comparator<User> {
@Override
public int compare(User o1, User o2) {
int interval = o1.getAge() - o2.getAge();
return Integer.compare(interval, 0);
}
}
二、延时队列
Java
同时提供了一种阻塞的延迟队列 DelayQueue
,其队列元素必须实现 Delayed
接口。
1. 延迟接口
Delayed
扩展了 Comparable
接口,因此元素实现时必须同时重写 getDelay()
与 compareTo()
方法。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
(1) getDelay()
返回对象还需要延迟的时间,这个时间会以指定的时间单位(例如毫秒、秒等)返回。
(2) compareTo()
比较当前对象与指定对象的顺序,即用于设置队列元素的存入顺序。如果当前对象的延迟时间比指定对象的延迟时间更长,则返回一个正值;如果当前对象的延迟时间比指定对象的延迟时间更短,则返回一个负值;如果两者的延迟时间相等,则返回 0
。
2. 队列介绍
在上述提到 compareTo()
方法保证了队列元素的有序性,而 getDelay()
则正是实现延时阻塞的关键。
查看延迟队列 DelayQueue
的 take()
方法源码可以看到线程先取锁并进入死循环,当队列元素无数据则持锁进入阻塞;若队列不为空,则判断当前队头元素的 getDelay()
值是否小于 0
,若是则线程阻塞指定时间,阻塞时间值为 getDelay()
的值,又因 delay > 0
所以数据并未出队,因此阻塞结束后下一循环时队头仍为上循环的元素,而经过 delay
时间的睡眠阻塞后其 getDelay()
的值一定小于或等于 0
此时数据将出队。
因此了解了 take()
的阻塞原理,若想要控制数据的阻塞获取,核心即在于定义 getDelay()
的处理逻辑,当其返回的值大于 0
时,线程将根据返回的值阻塞指定时间,从而保证下一次尝试出队时其值一定小于等于 0
。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 1. 获取队头
E first = q.peek();
if (first == null)
// 2.1 队头为空,即队列为空将阻塞
available.await();
else {
// 2.2 队列不为空调用元素的 getDelay()
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
// 3. 结果小于 0 则返回该元素
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 4. 阻塞指定时间,值为 getDelay() 返回的大小
// 当退出阻塞后进入下一循环,此时队头 getDelay() 值一定小于或等于 0
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
3. 示例演示
下面通过一个具体的示例介绍阻塞队列的效果。
定义队列元素 DelayTask
并实现 Delayed
接口,元素内定义两个属性,data
用于存储真实数据内容,构造函数中入参 offset
用于控制具体阻塞时间。
队列中的元素顺序将根据 compareTo()
定义进行存储,此处即 delayTime
值更小的处于队头。当 take()
读取队列时,若未达到指定的 offset
时间其 getDelay()
返回的值将为正数,在上述 take()
源码中提到的其将会根据这个返回的时间进行阻塞。
public class DelayTask<T> implements Delayed {
private T data;
private long delayTime;
public DelayTask(T t, long offset) {
this.data = t;
this.delayTime = System.currentTimeMillis() + offset;
}
@Override
public long getDelay(TimeUnit unit) {
long interval = delayTime - System.currentTimeMillis();
return unit.convert(interval, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
DelayTask task = (DelayTask) o;
return Ints.saturatedCast(delayTime - task.delayTime);
}
}
完成上述对象定义之后下面介绍如何使用,初始化 DelayQueue
并存入两个元素,因为 compareTo()
的作用即便 Beth
后执行入队其仍为队头。当执行 take()
方法时队列内部执行 peek()
获取队头调用其 getDelay()
方法,而 Beth
的 offset
设置为 3s
返回的值将大于 0
因而将阻塞睡眠 3s
,睡眠结束后再次循环获取队头的 getDelay()
值此时将小于 0
,从而输出队头元素。
因此下述的示例代码输出顺序应如下:程序阻塞 3s
后打印 Beth
,再次阻塞两秒后打印 Alex
。
public void delayDemo() throws InterruptedException {
BlockingQueue<DelayTask> delayQueue = new DelayQueue<>();
// 间隔存入数据
delayQueue.put(new DelayTask<String>("Alex", 5000)));
delayQueue.put(new DelayTask<String>("Beth", 3000));
while (!delayQueue.isEmpty()) {
try {
// 阻塞等待直到元素到期
DelayTask task = delayQueue.take();
System.out.println(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
三、BlockingQueue
1. 基本操作
Java
中默认提供了两类阻塞队列,其结构为线程安全,扩展了一系列方法实现阻塞读存。
ArrayBlockingQueue
:有界集合,队列容量在初始化时指定,一旦定义后续无法变更。LinkedBlockingQueue
:无界集合,若初始化时指定容量则效果等同于 `ArrayBlockingQueue``,若不指定容量则为无界集合。
在阻塞队列中除了 offer()、poll()
方法可以指定阻塞时间,同时提供了 put()、take()
方法实现阻塞读存。
方法 | 作用 |
---|---|
offer() | 阻塞新增,指定时间未能成功返回 false。 |
poll() | 阻塞获取,指定时间未能成功返回 false。 |
put() | 阻塞新增,若队列已满则会一直处于阻塞待有可用空间时新增。 |
take() | 阻塞查询,若队列为空则会一直处于阻塞状态。 |
2. 示例演示
BlockingQueue
具体操作示例如下,这里以 ArrayBlockingQueue
为例,LinkedBlockingQueue
仅结构有所差异,具体使用方法类似。
public void blockDemo() throws InterruptedException {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
for (int i = 0; i < 6; i++) {
// Add element
// If capacity is full then wait, if still full return false
System.out.println(queue.offer(i, 3, TimeUnit.SECONDS));
}
while (queue.size() > 0) {
System.out.println(queue.poll());
}
// Get element
// If capacity is full then wait, if still full return null
System.out.println(queue.poll(3, TimeUnit.SECONDS));
}
四、SynchronousQueue
1. 基本操作
SynchronousQueue
是一种特殊的阻塞队列,其通过 put()
与 take()
存入与读取数据。
其特殊之处在于当通过 put()
存入数据时要求必须存在另一线程执行 take()
方法,否则将一直处于阻塞状态,take()
操作同理。
方法 | 作用 |
---|---|
put() | 存入数据,必须存在线程执行 take() 否则将阻塞。 |
take() | 读取数据,必须存在线程执行 put() 否则将阻塞。 |
2. 示例演示
如下示例分别创建了两个读写线程,t1
线程用于执行 put()
写入,t2
线程用于执行 take()
读取。
在 t1
线程中通过 sleep(5)
模拟休眠五秒,运行示例代码可以看到当启动程序后 t2
线程在打印 start take
将陷入阻碍状态,只有在 t1
中休眠结束后执行 put()
操作 t2
才会退出阻塞读取数据并打印 end take
同理如果将 t1
中的 sleep(5)
移至 t2
的 take()
之前,运行程序后 t1
的 put()
操作的则会进入阻塞等待 t2
的休眠结束执行 take()
才会退出阻塞。
public void synchronousQueueDemo() throws InterruptedException {
SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();
Thrad t1 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("start put.");
// Block here only when other thread try "take()" then quit.
synchronousQueue.put(0);
System.out.println("end put.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "Producer").start();
Thrad t2 = new Thread(() -> {
try {
System.out.println("start take.");
// Block here only when other thread try "put()" then quit.
Integer take = synchronousQueue.take();
System.out.println("end take: " + take);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "Consumer").start();
t1.join();
t2.join();
}
五、LinkedTransferQueue
1. 基本操作
LinkedTransferQueue
即扩展了 SynchronousQueue
,为阻塞方法提供了等待时间,到期若未成功则退出阻塞继续后续执行内容。
方法 | 作用 |
---|---|
transfer(E) | 阻塞,类似与 put() 方法。 |
tryTransfer(E, long, TimeUnit) | 非阻塞,指定时间未成功将退出阻塞返回 false。 |
hasWaitingConsumer() | 队列对象当前是否有线程处于 take() 阻塞状态。 |
getWaitingConsumerCount() | 队列对象当前处于 take() 阻塞状态的线程数。 |
2. 示例演示
LinkedTransferQueue
的测试示例代码如下,与 SynchronousQueue
模拟场景类型这里不再详细介绍。
public void transferQueueDemo() throws InterruptedException {
LinkedTransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();
Thrad t1 = new Thread(() -> {
try {
System.out.println("start transfer.");
// Block for e second and return the transfer result
boolean success = transferQueue.tryTransfer(0, 2, TimeUnit.SECONDS);
System.out.println("end transfer, success? " + success);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}, "Producer").start();
Thrad t2 = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("start take.");
// Block here only when other thread try "put()" then quit.
Integer take = transferQueue.take();
System.out.println("end take: " + take);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "Consumer").start();
t1.join();
t2.join();
}