LinkedBlockingQueue - Feature ImageLinkedBlockingQueue - Feature Image
HappyCoders Glasses

Java LinkedBlockingQueue
(with Example)

Sven Woltmann
Sven Woltmann
April 26, 2022

This part of the tutorial series is about LinkedBlockingQueue. You will get to know its unique characteristics and see how to use this queue with an example. You will also learn when exactly you should use this queue.

Here we are in the class hierarchy:

LinkedBlockingQueue in the class hierarchy
LinkedBlockingQueue in the class hierarchy

LinkedBlockingQueue Characteristics

The class java.util.concurrent.LinkedBlockingQueue is – just like ConcurrentLinkedQueue – based on a linked list, but is – like ArrayBlockingQueue presented in the next part – thread-safe (see below), bounded, and blocking.

Unlike ArrayBlockingQueue, LinkedBlockingQueue does not provide a fairness policy. (Fairness policy means that blocking methods are served in the order they were called.)

The queue's characteristics in detail:

Underlying data structureThread-safe?Blocking/
non-blocking
Fairness
policy
Bounded/
unbounded
Iterator type
Linked listYes
(pessimistic locking with two locks)
BlockingNot availableBoundedWeakly consistent¹

¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.

I recommend LinkedBlockingQueue if you need a blocking, thread-safe queue.

By the way, the LinkedBlockingQueue class is used by Executors.newFixedThreadPool() and Executors.newSingleThreadedExecutor() as a "work queue" for the executor. It is, therefore, used intensively, which keeps the probability of bugs extremely low.

LinkedBlockingQueue Example

The following example shows how to use LinkedBlockingQueue. We create a queue with a capacity of 3. Immediately afterward, we start reading elements from the queue at intervals of three seconds. After 3.5 seconds, we begin writing elements to the queue at intervals of one second each (→ code on GitHub).

public class LinkedBlockingQueueExample { private static final long startTime = System.currentTimeMillis(); public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // Start reading from the queue immediately, every 3 seconds for (int i = 0; i < 10; i++) { int delaySeconds = i * 3; pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS); } // Start writing to the queue after 3.5 seconds (so there are already 2 threads // waiting), every 1 seconds (so that the queue fills faster than it's emptied, // so that we see a full queue soon) for (int i = 0; i < 10; i++) { int element = i; // Assign to an effectively final variable int delayMillis = 3500 + i * 1000; pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS); } pool.shutdown(); pool.awaitTermination(1, TimeUnit.MINUTES); } private static void enqueue(BlockingQueue<Integer> queue, int element) { log("Calling queue.put(%d) (queue = %s)...", element, queue); try { queue.put(element); log("queue.put(%d) returned (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void dequeue(BlockingQueue<Integer> queue) { log(" Calling queue.take() (queue = %s)...", queue); try { Integer element = queue.take(); log(" queue.take() returned %d (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void log(String format, Object... args) { System.out.printf( Locale.US, "[%4.1fs] [%-16s] %s%n", (System.currentTimeMillis() - startTime) / 1000.0, Thread.currentThread().getName(), String.format(format, args)); } }
Code language: Java (java)

Below you can see the output of the sample program:

[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])... [ 3.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [])... [ 3.5s] [pool-1-thread-8 ] Calling queue.put(0) (queue = [])... [ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = []) [ 3.5s] [pool-1-thread-8 ] queue.put(0) returned (queue = []) [ 4.5s] [pool-1-thread-5 ] Calling queue.put(1) (queue = [])... [ 4.5s] [pool-1-thread-4 ] queue.take() returned 1 (queue = []) [ 4.5s] [pool-1-thread-5 ] queue.put(1) returned (queue = []) [ 5.5s] [pool-1-thread-3 ] Calling queue.put(2) (queue = [])... [ 5.5s] [pool-1-thread-3 ] queue.put(2) returned (queue = [2]) [ 6.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [2])... [ 6.0s] [pool-1-thread-7 ] queue.take() returned 2 (queue = []) [ 6.5s] [pool-1-thread-9 ] Calling queue.put(3) (queue = [])... [ 6.5s] [pool-1-thread-9 ] queue.put(3) returned (queue = [3]) [ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])... [ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4]) [ 8.5s] [pool-1-thread-2 ] Calling queue.put(5) (queue = [3, 4])... [ 8.5s] [pool-1-thread-2 ] queue.put(5) returned (queue = [3, 4, 5]) [ 9.0s] [pool-1-thread-10] Calling queue.take() (queue = [3, 4, 5])... [ 9.0s] [pool-1-thread-10] queue.take() returned 3 (queue = [4, 5]) [ 9.5s] [pool-1-thread-1 ] Calling queue.put(6) (queue = [4, 5])... [ 9.5s] [pool-1-thread-1 ] queue.put(6) returned (queue = [4, 5, 6]) [10.5s] [pool-1-thread-8 ] Calling queue.put(7) (queue = [4, 5, 6])... [11.5s] [pool-1-thread-4 ] Calling queue.put(8) (queue = [4, 5, 6])... [12.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [4, 5, 6])... [12.0s] [pool-1-thread-5 ] queue.take() returned 4 (queue = [5, 6, 7]) [12.0s] [pool-1-thread-8 ] queue.put(7) returned (queue = [5, 6, 7]) [12.5s] [pool-1-thread-3 ] Calling queue.put(9) (queue = [5, 6, 7])... [15.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [5, 6, 7])... [15.0s] [pool-1-thread-7 ] queue.take() returned 5 (queue = [6, 7, 8]) [15.0s] [pool-1-thread-4 ] queue.put(8) returned (queue = [6, 7, 8]) [18.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [6, 7, 8])... [18.0s] [pool-1-thread-3 ] queue.put(9) returned (queue = [7, 8, 9]) [18.0s] [pool-1-thread-9 ] queue.take() returned 6 (queue = [7, 8, 9]) [21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])... [21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9]) [24.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [8, 9])... [24.0s] [pool-1-thread-2 ] queue.take() returned 8 (queue = [9]) [27.0s] [pool-1-thread-10] Calling queue.take() (queue = [9])... [27.0s] [pool-1-thread-10] queue.take() returned 9 (queue = [])
Code language: plaintext (plaintext)

Since we start writing only after two threads already call take(), these first two read attempts block at 0.0 and 3.0 s (threads 1 and 4).

After 3.5 s, the first element is written (thread 8). This wakes up thread 1, and the take() method immediately removes this element from the queue again.

After 4.5 s, the second element is written (thread 5). Thread 4 is woken up and takes this element from the queue again.

The program writes faster than it reads. After 10.5 s, a writing thread (thread 8) blocks for the first time when trying to write 7 into the queue, which is full at that time. After 11.5 s, thread 4 also blocks the attempt to write 8 into the queue.

After 12.0 s, thread 5 removes an element from the queue, which frees up space. Thread 8 is woken up and writes 7 into the queue.

See if you can read and understand the rest of the issues yourself.

Is LinkedBlockingQueue Thread-Safe?

Yes, LinkedBlockingQueue is thread-safe.

Thread safety of LinkedBlockingQueue is guaranteed by pessimistic locking using two separate ReentrantLocks for write and read operations. This prevents contention (access conflicts) between producer and consumer threads.

Differences from other queues:

  • With ConcurrentLinkedQueue, thread safety is provided by optimistic locking via compare-and-set, resulting in better performance with low to moderate contention.
  • ArrayBlockingQueue is protected with only one ReentrantLock, so access conflicts between producer and consumer threads are possible.

LinkedBlockingQueue Time Complexity

As with all queues, the time required for enqueue and dequeue operations is independent of the length of the queue. The time complexity is, therefore, O(1).

That also applies to the size() method. Unlike ConcurrentLinkedQueue, which is also based on a linked list and runs through the complete list to count the elements each time size() is called, LinkedBlockingQueue uses an AtomicInteger internally, which is updated on insertion and removal, and thus keeps the size available with constant time.

Summary and Outlook

In this article, you have learned about LinkedBlockingQueue – a thread-safe, blocking, bounded queue. You saw an example of how you can use LinkedBlockingQueue, and you also learned in which cases you should use it.

LinkedBlockingQueue is based on a linked list. The next part of the tutorial is about the array-based counterpart – ArrayBlockingQueue.

If you still have questions, please ask them via the comment function. Do you want to be informed about new tutorials and articles? Then click here to sign up for the HappyCoders.eu newsletter.