ArrayBlockingQueue - Feature ImageArrayBlockingQueue - Feature Image
HappyCoders Glasses

Java ArrayBlockingQueue
(with Example)

Sven Woltmann
Sven Woltmann
April 26, 2022

This article is about the ArrayBlockingQueue and its properties. You will see how the ArrayBlockingQueue is used with an example. I will also give you a recommendation in which cases you should use this queue.

Here we are in the class hierarchy:

ArrayBlockingQueue in the class hierarchy
ArrayBlockingQueue in the class hierarchy

ArrayBlockingQueue Characteristics

The class java.util.concurrent.ArrayBlockingQueue is based on an array and – like most queue implementations – is thread-safe (see below). It is bounded (has a maximum capacity), accordingly blocking, and provides a fairness policy (i.e., blocking methods are served in the order they were called).

The characteristics at a glance:

Underlying data structureThread-safe?Blocking/
non-blocking
Fairness
policy
Bounded/
unbounded
Iterator type
ArrayYes
(pessimistic locking with a lock)
BlockingOptionalBoundedWeakly consistent¹

¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.

Due to the possibly high contention with simultaneous read and write access, you should – if you need a blocking, thread-safe queue – test whether a LinkedBlockingQueue is more performant for your specific purpose. While this queue is based on a linked list, it uses two separate ReentrantLocks for writing and reading, which reduces access conflicts.

ArrayBlockingQueue Example

In the following example, we create an ArrayBlockingQueue with capacity 3. Then we have a ScheduledExecutorService write and read elements to and from the queue at specified intervals (→ code on GitHub):

public class ArrayBlockingQueueExample { private static final long startTime = System.currentTimeMillis(); public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // Start reading from the queue immediately, every 3 seconds for (int i = 0; i < 10; i++) { int delaySeconds = i * 3; pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS); } // Start writing to the queue after 3.5 seconds (so there are already 2 threads // waiting), every 1 seconds (so that the queue fills faster than it's emptied, // so that we see a full queue soon) for (int i = 0; i < 10; i++) { int element = i; // Assign to an effectively final variable int delayMillis = 3500 + i * 1000; pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS); } pool.shutdown(); pool.awaitTermination(1, TimeUnit.MINUTES); } private static void enqueue(BlockingQueue<Integer> queue, int element) { log("Calling queue.put(%d) (queue = %s)...", element, queue); try { queue.put(element); log("queue.put(%d) returned (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void dequeue(BlockingQueue<Integer> queue) { log(" Calling queue.take() (queue = %s)...", queue); try { Integer element = queue.take(); log(" queue.take() returned %d (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void log(String format, Object... args) { System.out.printf( Locale.US, "[%4.1fs] [%-16s] %s%n", (System.currentTimeMillis() - startTime) / 1000.0, Thread.currentThread().getName(), String.format(format, args)); } }
Code language: Java (java)

We try to read an element from the queue every three seconds, starting immediately. We write the elements every second but do not start until 3.5 s have passed. At this point, two reading threads should have already blocked and are waiting for elements to be written to the queue.

Since we write faster than we read, the queue should soon reach its capacity limit. The writing threads should block from that moment until the reading threads have caught up.

Here is a sample output:

[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])... [ 3.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [])... [ 3.5s] [pool-1-thread-3 ] Calling queue.put(0) (queue = [])... [ 3.5s] [pool-1-thread-3 ] queue.put(0) returned (queue = []) [ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = []) [ 4.5s] [pool-1-thread-9 ] Calling queue.put(1) (queue = [])... [ 4.5s] [pool-1-thread-9 ] queue.put(1) returned (queue = []) [ 4.5s] [pool-1-thread-2 ] queue.take() returned 1 (queue = []) [ 5.5s] [pool-1-thread-7 ] Calling queue.put(2) (queue = [])... [ 5.5s] [pool-1-thread-7 ] queue.put(2) returned (queue = [2]) [ 6.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [2])... [ 6.0s] [pool-1-thread-8 ] queue.take() returned 2 (queue = []) [ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])... [ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3]) [ 7.5s] [pool-1-thread-4 ] Calling queue.put(4) (queue = [3])... [ 7.5s] [pool-1-thread-4 ] queue.put(4) returned (queue = [3, 4]) [ 8.5s] [pool-1-thread-10] Calling queue.put(5) (queue = [3, 4])... [ 8.5s] [pool-1-thread-10] queue.put(5) returned (queue = [3, 4, 5]) [ 9.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [3, 4, 5])... [ 9.0s] [pool-1-thread-6 ] queue.take() returned 3 (queue = [4, 5]) [ 9.5s] [pool-1-thread-3 ] Calling queue.put(6) (queue = [4, 5])... [ 9.5s] [pool-1-thread-3 ] queue.put(6) returned (queue = [4, 5, 6]) [10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])... [11.5s] [pool-1-thread-9 ] Calling queue.put(8) (queue = [4, 5, 6])... [12.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [4, 5, 6])... [12.0s] [pool-1-thread-2 ] queue.take() returned 4 (queue = [5, 6, 7]) [12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7]) [12.5s] [pool-1-thread-7 ] Calling queue.put(9) (queue = [5, 6, 7])... [15.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [5, 6, 7])... [15.0s] [pool-1-thread-8 ] queue.take() returned 5 (queue = [6, 7, 8]) [15.0s] [pool-1-thread-9 ] queue.put(8) returned (queue = [6, 7, 8]) [18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])... [18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9]) [18.0s] [pool-1-thread-7 ] queue.put(9) returned (queue = [7, 8, 9]) [21.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [7, 8, 9])... [21.0s] [pool-1-thread-4 ] queue.take() returned 7 (queue = [8, 9]) [24.0s] [pool-1-thread-10] Calling queue.take() (queue = [8, 9])... [24.0s] [pool-1-thread-10] queue.take() returned 8 (queue = [9]) [27.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [9])... [27.0s] [pool-1-thread-6 ] queue.take() returned 9 (queue = [])
Code language: plaintext (plaintext)

As predicted, the first two read attempts block at 0.0 s and 3.0 s because no elements have yet been written to the queue.

After 3.5 s, the first element is written, which wakes up the first thread and removes this element again. After 4.5 s, the second element is written, waking up the second thread to remove the element.

Since the program writes faster than it reads, after 10.5 s, thread 1 blocks, after 11.5 s, thread 9 blocks, and after 12.5 s, thread 7 blocks when trying to write additional elements into the queue, which is full at that time.

After 12.0 s, an element is removed, and thread 1 can continue writing. After 15.0 s, another element is taken, and thread 9 can continue. After 18.0 s, thread 7 can continue.

Since no other elements are written to the queue, it empties again towards the end.

Is ArrayBlockingQueue Thread-Safe?

Yes, ArrayBlockingQueue is thread-safe.

A single ReentrantLock maintains ArrayBlockingQueue's thread-safety. It is used for the queue's head and tail simultaneously so that access conflicts ("thread contention") between producer and consumer threads can occur in case of simultaneous read and write accesses.

Explicit locks such as ReentrantLock are mainly suitable for high-contention applications. Optimistic locking is better for low to moderate thread contention.

Differences from other queues:

  • With LinkedBlockingQueue, thread safety is provided by not one but two locks. Thus, producer and consumer threads cannot block each other.
  • With ConcurrentLinkedQueue, thread safety is provided by optimistic locking via compare-and-set, resulting in better performance with low to moderate contention.

Summary and Outlook

This article has introduced you to the ArrayBlockingQueue. This queue is thread-safe, blocking, and bounded. With an example, you have seen how you can use ArrayBlockingQueue.

As the name suggests, this queue is based on an array. The linked list-based counterpart – LinkedBlockingQueue – was covered in the previous part of the series.

The next part of the series is about PriorityBlockingQueue – a thread-safe and blocking variant of the PriorityQueue presented previously.

If you still have questions, please ask them via the comment function. Do you want to be informed about new tutorials and articles? Then click here to sign up for the HappyCoders.eu newsletter.