
BlockingQueue in Java

Sven Woltmann
April 26, 2022

In this article, you will learn about the java.util.concurrent.BlockingQueue interface. BlockingQueue extends Java's Queue interface discussed in the previous part of this tutorial series with methods for blocking access.

Before we clarify what "blocking access" means, we first need to talk about the term "bounded queue".

What Is a Bounded Queue?

If a queue can only hold a limited number of elements, it is referred to as a "bounded queue". The maximum number of elements is referred to as "capacity" and is specified when the queue is created.

For example, the following line of code creates an ArrayBlockingQueue limited to 100 elements:

Queue<Integer> queue = new ArrayBlockingQueue<>(100);

On the other hand, if the number of elements in the queue is not limited (or is limited only by the available memory), we speak of an "unbounded queue".

(By the way, the same definition applies to all data structures, e.g., also to stacks and deques.)
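
The difference can be made visible with a small sketch. The class name BoundedQueueDemo and the helper countAccepted() are hypothetical names chosen for this illustration. The example assumes that a LinkedBlockingQueue created without a capacity argument may be treated as unbounded (its default capacity is Integer.MAX_VALUE):

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueDemo {

  // Hypothetical helper: offers the elements 0..attempts-1 and
  // returns how many of them the queue accepted.
  public static int countAccepted(Queue<Integer> queue, int attempts) {
    int accepted = 0;
    for (int i = 0; i < attempts; i++) {
      if (queue.offer(i)) { // offer() returns false if the queue is full
        accepted++;
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(2);
    // No capacity given: the default is Integer.MAX_VALUE, i.e., practically unbounded
    BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();

    System.out.println(countAccepted(bounded, 5));   // prints 2
    System.out.println(countAccepted(unbounded, 5)); // prints 5
    System.out.println(bounded.remainingCapacity()); // prints 0
  }
}
```

The bounded queue rejects everything beyond its capacity of 2, while the unbounded queue accepts all five elements.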

What Is a Blocking Queue?

Two special cases can occur with the "Enqueue" and "Dequeue" queue operations:

  • We could try to insert an element into a bounded queue that has reached its capacity limit – in other words, that is full.
  • We could try to take an element from an empty queue.

A non-blocking queue returns a specific return value or throws an exception in such cases (see section "Queue Methods" in the article about Java queues).
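
As a minimal sketch of these two special cases, the following example calls the non-blocking methods on a queue with a capacity of 1. NonBlockingMethodsDemo, tryAdd(), and tryRemove() are hypothetical names used only for this illustration:

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class NonBlockingMethodsDemo {

  // Hypothetical helper: reports what add() does on the given queue.
  public static String tryAdd(Queue<Integer> queue, int element) {
    try {
      queue.add(element);
      return "added";
    } catch (IllegalStateException e) { // thrown when the queue is full
      return "IllegalStateException";
    }
  }

  // Hypothetical helper: reports what remove() does on the given queue.
  public static String tryRemove(Queue<Integer> queue) {
    try {
      return "removed " + queue.remove();
    } catch (NoSuchElementException e) { // thrown when the queue is empty
      return "NoSuchElementException";
    }
  }

  public static void main(String[] args) {
    Queue<Integer> queue = new ArrayBlockingQueue<>(1);

    // Empty queue: poll() returns null, remove() throws an exception
    System.out.println(queue.poll());     // prints null
    System.out.println(tryRemove(queue)); // prints NoSuchElementException

    // Fill the queue up to its capacity of 1
    System.out.println(queue.offer(1));   // prints true

    // Full queue: offer() returns false, add() throws an exception
    System.out.println(queue.offer(2));   // prints false
    System.out.println(tryAdd(queue, 2)); // prints IllegalStateException
  }
}
```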

A blocking queue, on the other hand, provides additional methods that wait for the desired operation to be executed:

  • Enqueue methods that, when inserting into a full bounded queue, wait until the queue has free capacity again (this requires another thread to take an element).
  • Dequeue methods that, when taking an element from an empty queue, wait for the queue to become non-empty (this requires another thread to insert an element).

These additional methods are defined in the BlockingQueue interface. I will explain them in the following chapter.

Fairness Policy

Blocking methods are not automatically processed in the order they were called. You can activate the processing in call order in some queue implementations through an optional "fairness policy". However, this increases the overhead and thus massively reduces the throughput of the queue. As a rule, it is not necessary to activate the fairness policy.
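
One implementation that offers such a policy is ArrayBlockingQueue, whose two-argument constructor takes a boolean "fair" flag. The following sketch only shows how the flag is passed; FairnessDemo and newQueue() are hypothetical names, and the example does not attempt to measure the throughput difference:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairnessDemo {

  // Hypothetical factory method: creates a bounded queue with or
  // without the fairness policy.
  public static BlockingQueue<String> newQueue(int capacity, boolean fair) {
    // fair == true: waiting threads are served in FIFO order
    // fair == false: the access order of waiting threads is unspecified
    return new ArrayBlockingQueue<>(capacity, fair);
  }

  public static void main(String[] args) {
    BlockingQueue<String> fairQueue = newQueue(100, true);
    BlockingQueue<String> defaultQueue = newQueue(100, false);

    System.out.println(fairQueue.remainingCapacity());    // prints 100
    System.out.println(defaultQueue.remainingCapacity()); // prints 100
  }
}
```

The single-argument constructor new ArrayBlockingQueue<>(100) corresponds to fair == false.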

BlockingQueue Interface

The blocking enqueue and dequeue operations each come in two variants. The first variant waits indefinitely. The second variant gives up after a specified waiting time and returns false or null.

In the first two columns, the following table shows the non-blocking methods that BlockingQueue inherits from Queue (and that we discussed in the previous part of the tutorial). In the third and fourth columns, you will find the added blocking methods:

| Operation | Non-blocking (inherited from Queue): exception | Non-blocking (inherited from Queue): return value | Blocking (new in BlockingQueue): blocks | Blocking (new in BlockingQueue): blocks with timeout |
|---|---|---|---|---|
| Adding an element (enqueue) | add(E e) | offer(E e) | put(E e) | offer(E e, long timeout, TimeUnit unit) |
| Removing an element (dequeue) | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
| Viewing an element (examine) | element() | peek() | | |

The following section describes the BlockingQueue methods in detail.

BlockingQueue Methods

BlockingQueue.put()

The put() method inserts an element into the queue if space is available. However, if the queue's capacity limit is reached, the method blocks until space is freed.
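
A minimal sketch of this behavior, assuming a queue with a capacity of 1: the second put() can only complete after another thread has taken an element. PutDemo and run() are hypothetical names, and the 200 ms pause is an arbitrary value that merely gives the producer thread time to reach put():

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PutDemo {

  // Returns the taken element and the queue contents after the
  // blocked put() has been released.
  public static String run() throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    queue.put("first"); // returns immediately; the queue is now full

    Thread producer = new Thread(() -> {
      try {
        queue.put("second"); // blocks until there is free capacity
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    Thread.sleep(200); // give the producer time to reach put()
    String taken = queue.take(); // frees one slot and releases the producer
    producer.join(); // wait until "second" has been inserted

    return taken + " -> " + queue;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // prints first -> [second]
  }
}
```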

BlockingQueue.offer() with Timeout

Like put(), the offer() method with timeout inserts an element immediately if there is still space in the queue. Otherwise, the method waits for the specified time. If space becomes available during this time, the element is inserted, and the method returns true. If, on the other hand, the waiting time expires without any space being freed, the method returns false.
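
The following sketch shows both outcomes on a queue with a capacity of 1. OfferTimeoutDemo and offerWithin() are hypothetical names, and the 100 ms timeout is an arbitrary value:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferTimeoutDemo {

  // Hypothetical helper: tries to insert the element, waiting at most
  // the given number of milliseconds for free capacity.
  public static boolean offerWithin(BlockingQueue<String> queue, String element, long millis)
      throws InterruptedException {
    return queue.offer(element, millis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

    // Space available: inserted immediately
    System.out.println(offerWithin(queue, "a", 100)); // prints true

    // Queue full and nobody takes an element: gives up after 100 ms
    System.out.println(offerWithin(queue, "b", 100)); // prints false

    queue.poll(); // free the slot again
    System.out.println(offerWithin(queue, "b", 100)); // prints true
  }
}
```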

BlockingQueue.take()

This method takes an element from the head of the queue, provided the queue is not empty. If the queue is empty, take() blocks until an element becomes available and then returns it.
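
As a sketch of the blocking case, the example below starts a consumer that calls take() on an empty queue and inserts an element only afterward. TakeDemo and run() are hypothetical names, and the 200 ms pause is an arbitrary value:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeDemo {

  public static String run() throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // The consumer task blocks in take() because the queue is empty
      Callable<String> consumer = queue::take;
      Future<String> result = executor.submit(consumer);

      Thread.sleep(200);
      boolean stillWaiting = !result.isDone(); // nothing inserted yet

      queue.put("hello"); // wakes up the waiting consumer

      return result.get(1, TimeUnit.SECONDS) + " (was waiting: " + stillWaiting + ")";
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // prints hello (was waiting: true)
  }
}
```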

BlockingQueue.poll() with Timeout

Like take(), the poll() method with timeout takes an element from the queue's head immediately if the queue is not empty. If the queue is empty, the method waits for the specified time. If an element becomes available during the waiting time, it is returned. If the waiting time expires without an element becoming available, the method returns null.
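
Both outcomes can be sketched as follows. PollTimeoutDemo and pollWithin() are hypothetical names, and the 100 ms timeout is an arbitrary value:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {

  // Hypothetical helper: takes the head element, waiting at most the
  // given number of milliseconds for one to become available.
  public static String pollWithin(BlockingQueue<String> queue, long millis)
      throws InterruptedException {
    return queue.poll(millis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Queue empty and nobody inserts an element: gives up after 100 ms
    System.out.println(pollWithin(queue, 100)); // prints null

    queue.offer("a");

    // Element available: returned immediately
    System.out.println(pollWithin(queue, 100)); // prints a
  }
}
```

Because null signals the timeout, a caller cannot distinguish it from a stored null value; BlockingQueue implementations therefore do not accept null elements.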

InterruptedException for Blocking Methods

All blocking methods throw an InterruptedException when the interrupt() method is called on the waiting thread. You should use interrupt() to cancel blocked threads when waiting is no longer necessary.

This is the case, for example, when the application is being shut down. In this case, the event for which the blocking method is waiting may no longer occur. However, the method would still wait for the event to occur and thus prevent a regular shutdown of the application. Canceling the waiting threads with interrupt() allows a clean shutdown.
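
The following sketch illustrates this: a thread waits in take() for an element that never arrives and is then canceled with interrupt(). InterruptDemo and run() are hypothetical names, and the pause and join timeout are arbitrary values:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptDemo {

  // Returns true if the waiting thread received an InterruptedException
  // and terminated.
  public static boolean run() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    AtomicBoolean wasInterrupted = new AtomicBoolean(false);

    Thread consumer = new Thread(() -> {
      try {
        queue.take(); // would wait forever: nobody inserts an element
      } catch (InterruptedException e) {
        wasInterrupted.set(true);
        Thread.currentThread().interrupt(); // restore the interrupt flag
      }
    });
    consumer.start();

    Thread.sleep(200); // give the consumer time to reach take()
    consumer.interrupt(); // cancel the wait, e.g., during shutdown
    consumer.join(1000);

    return wasInterrupted.get() && !consumer.isAlive();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // prints true
  }
}
```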

Java BlockingQueue Example

The following source code shows an example that is significantly more complex than the example with a non-blocking queue due to concurrency (→ Code on GitHub):

import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingQueueExample {
  private static final long startTime = System.currentTimeMillis();

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    // Start reading from the queue immediately, every 3 seconds
    for (int i = 0; i < 10; i++) {
      int delaySeconds = i * 3;
      pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
    }

    // Start writing to the queue after 3.5 seconds (so there are already 2
    // threads waiting), every second (so that the queue fills faster than
    // it's emptied, so that we see a full queue soon)
    for (int i = 0; i < 10; i++) {
      int element = i; // Assign to an effectively final variable
      int delayMillis = 3500 + i * 1000;
      pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }

  private static void enqueue(BlockingQueue<Integer> queue, int element) {
    log("Calling queue.put(%d) (queue = %s)...", element, queue);
    try {
      queue.put(element);
      log("queue.put(%d) returned (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void dequeue(BlockingQueue<Integer> queue) {
    log(" Calling queue.take() (queue = %s)...", queue);
    try {
      Integer element = queue.take();
      log(" queue.take() returned %d (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(
        Locale.US,
        "[%4.1fs] [%-16s] %s%n",
        (System.currentTimeMillis() - startTime) / 1000.0,
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}

In this example, we create a blocking, bounded queue with a capacity of 3 and schedule ten enqueue and ten dequeue operations each.

The enqueue operations start later, so we can see blocking dequeue operations at the beginning. Also, the enqueue operations happen in shorter intervals so that the queue's capacity limit is reached after a while, and we can see blocking enqueue operations.

Here is the output of the program:

[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-2 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-2 ] queue.put(0) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = [])
[ 4.5s] [pool-1-thread-10] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-10] queue.put(1) returned (queue = [])
[ 4.5s] [pool-1-thread-3 ] queue.take() returned 1 (queue = [])
[ 5.5s] [pool-1-thread-8 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-8 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-9 ] queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-7 ] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-7 ] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-4 ] queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-2 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-2 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-10] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-3 ] queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-8 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-9 ] queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-10] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9])
[18.0s] [pool-1-thread-8 ] queue.put(9) returned (queue = [7, 8, 9])
[21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-7 ] queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-4 ] queue.take() returned 9 (queue = [])

In the beginning, the queue is empty, so the first two read attempts block (after 0 and 3 s).

After 3.5 s (when two reading threads are already waiting at the queue), the program starts writing to the queue every second. The output shows nicely how a reading thread is woken up in each case and immediately removes the inserted element again (at 3.5 and 4.5 s).

Since the program writes to the queue three times as fast as it reads from it, the attempt to write a 7 to the queue blocks after 10.5 s: the queue has reached its capacity limit of 3 with the elements [4, 5, 6].

Only after the 4 has been removed from the queue at 12 s can the 7 be inserted. For the 8 and the 9, we see a corresponding behavior.

BlockingQueue Implementations

There are five implementations of the BlockingQueue interface in the JDK, each with specific characteristics. In the following UML class diagram, I've highlighted them together with their interface:

BlockingQueue interface in the class hierarchy

I will discuss each of the implementations in separate articles in the tutorial. There, I'll present their characteristics and explain, on their basis, under which conditions you should use the respective implementation.


Summary and Outlook

This article first explained the differences between bounded/unbounded and blocking/non-blocking queues. After that, you learned about the BlockingQueue interface and its methods put(), offer(), take(), and poll().

In the following parts of this series, we will look at all Queue and BlockingQueue implementations and their characteristics.
