In this article, you will learn how PriorityBlockingQueue
works and what characteristics it has. An example will show you how to use it.
Here we are in the class hierarchy:
PriorityBlockingQueue Characteristics
The java.util.concurrent.PriorityBlockingQueue
is a thread-safe and blocking variant of the PriorityQueue. In the linked article, you will also learn what a priority queue is.
As with PriorityQueue
, the elements are stored in an array representing a min-heap. The iterator iterates through the elements in the corresponding order.
A single ReentrantLock
ensures thread safety.
PriorityBlockingQueue
is not bounded, so it has no capacity limit. That means that
and put(e)
offer(e, time, unit)
never block. Only the dequeue operations take()
and poll(time, unit)
block when the queue is empty.
The characteristics in detail:
Underlying data structure | Thread-safe? | Blocking/ non-blocking | Fairness policy | Bounded/ unbounded | Iterator type |
---|---|---|---|---|---|
Min-heap (stored in an array) | Yes (pessimistic locking with a lock) | Blocking (only dequeue) | Not available | Unbounded | Weakly consistent¹ |
¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.
Recommended Use Case
PriorityBlockingQueue
is not used in the JDK, and therefore we cannot exclude the possibility that it contains bugs. If you need a queue with appropriate characteristics and use PriorityBlockingQueue
, make sure you test your application intensively.
PriorityBlockingQueue Example
The following example shows how to create a PriorityBlockingQueue
and how multiple threads read and write to it (→ code on GitHub).
Reading threads run every 3 seconds, starting immediately after the queue is created.
Writing threads start after 3.5 seconds (so that two reading threads are already waiting) and write a random value to the queue every second.
public class PriorityBlockingQueueExample {
private static final long startTime = System.currentTimeMillis();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// Start reading from the queue immediately, every 3 seconds
for (int i = 0; i < 8; i++) {
int delaySeconds = i * 3;
pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
}
// Start writing to the queue after 3.5 seconds (so there are already 2 threads
// waiting), every 1 seconds (so that the queue fills faster than it's emptied,
// so that we see some more elements and their order in the queue)
for (int i = 0; i < 8; i++) {
int delayMillis = 3500 + i * 1000;
pool.schedule(() -> enqueue(queue), delayMillis, TimeUnit.MILLISECONDS);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
private static void enqueue(BlockingQueue<Integer> queue) {
int element = ThreadLocalRandom.current().nextInt(10, 100);
log("Calling queue.put(%d) (queue = %s)...", element, queue);
try {
queue.put(element);
log("queue.put(%d) returned (queue = %s)", element, queue);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer element = queue.take();
log(" queue.take() returned %d (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US,
"[%4.1fs] [%-16s] %s%n",
(System.currentTimeMillis() - startTime) / 1000.0,
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code language: Java (java)
Below you can see an example output of the program:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-6 ] Calling queue.put(87) (queue = [])...
[ 3.5s] [pool-1-thread-6 ] queue.put(87) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 87 (queue = [])
[ 4.5s] [pool-1-thread-9 ] Calling queue.put(89) (queue = [])...
[ 4.5s] [pool-1-thread-9 ] queue.put(89) returned (queue = [])
[ 4.5s] [pool-1-thread-2 ] queue.take() returned 89 (queue = [])
[ 5.5s] [pool-1-thread-7 ] Calling queue.put(31) (queue = [])...
[ 5.5s] [pool-1-thread-7 ] queue.put(31) returned (queue = [31])
[ 6.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [31])...
[ 6.0s] [pool-1-thread-4 ] queue.take() returned 31 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(71) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(71) returned (queue = [71])
[ 7.5s] [pool-1-thread-8 ] Calling queue.put(15) (queue = [71])...
[ 7.5s] [pool-1-thread-8 ] queue.put(15) returned (queue = [15, 71])
[ 8.5s] [pool-1-thread-10] Calling queue.put(33) (queue = [15, 71])...
[ 8.5s] [pool-1-thread-10] queue.put(33) returned (queue = [15, 71, 33])
[ 9.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [15, 71, 33])...
[ 9.0s] [pool-1-thread-3 ] queue.take() returned 15 (queue = [33, 71])
[ 9.5s] [pool-1-thread-6 ] Calling queue.put(58) (queue = [33, 71])...
[ 9.5s] [pool-1-thread-6 ] queue.put(58) returned (queue = [33, 71, 58])
[10.5s] [pool-1-thread-1 ] Calling queue.put(19) (queue = [33, 71, 58])...
[10.5s] [pool-1-thread-1 ] queue.put(19) returned (queue = [19, 33, 58, 71])
[12.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [19, 33, 58, 71])...
[12.0s] [pool-1-thread-9 ] queue.take() returned 19 (queue = [33, 71, 58])
[15.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [33, 71, 58])...
[15.0s] [pool-1-thread-2 ] queue.take() returned 33 (queue = [58, 71])
[18.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [58, 71])...
[18.0s] [pool-1-thread-7 ] queue.take() returned 58 (queue = [71])
[21.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [71])...
[21.0s] [pool-1-thread-4 ] queue.take() returned 71 (queue = [])
Code language: plaintext (plaintext)
What can we see in this sample output?
First of all, you see how after 0.0 s and 3.0 s, threads 1 and 2 block when calling take()
because the queue is empty.
After 3.5 s, thread 6 writes the 87 into the queue. Immediately afterward, the previously blocked thread 1 wakes up again and takes the 87.
After 4.5 s, thread 9 writes the 89 into the queue, which is immediately taken out again by thread 2.
After 5.5 s, the 31 is written into the queue, which is taken out again after 6.0 s.
After 6.5 s, 7.5 s, and 8.5 s, the 71, the 15, and the 33 are written into the queue. You can see how the smallest element is always at the head (left) of the queue.
After 9.0 s, the smallest element, the 15, is removed. The next smallest element, 33, is then placed at the head of the queue.
After 9.5 s and 10.5 s, two more elements, 58 and 19, are written to the queue. Again, you can see how the smallest element is at the queue's head.
The queue now contains four elements. No other elements are written to the queue, and the existing elements are taken according to their priority.
Summary and Outlook
In this article, you learned about the characteristics of the PriorityBlockingQueue
and how to use it.
Starting with the next part of the tutorial series, I will introduce you to some queue implementations for special cases, beginning with the DelayQueue.
If you still have questions, please ask them via the comment function. Do you want to be informed about new tutorials and articles? Then click here to sign up for the HappyCoders.eu newsletter.