This part of the tutorial series is about LinkedBlockingQueue. You will get to know its unique characteristics and see how to use this queue with an example. You will also learn when exactly you should use this queue.
Here we are in the class hierarchy:
LinkedBlockingQueue Characteristics
The class java.util.concurrent.LinkedBlockingQueue is, just like ConcurrentLinkedQueue, based on a linked list and thread-safe (see below). In addition, like the ArrayBlockingQueue presented in the next part, it is bounded and blocking. If you do not pass a capacity to the constructor, the bound is Integer.MAX_VALUE.
Unlike ArrayBlockingQueue, LinkedBlockingQueue does not provide a fairness policy. (Fairness policy means that blocking methods are served in the order they were called.)
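To make the difference tangible, here is a minimal sketch of how the two queues are constructed. The class and method names (QueueConstructionSketch, fairArrayQueue, boundedLinkedQueue) are my own and only serve the illustration. It also shows the bound in action using the non-blocking offer() method, which returns false instead of waiting when the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration
class QueueConstructionSketch {

  // ArrayBlockingQueue accepts a fairness flag as its second constructor argument
  static BlockingQueue<Integer> fairArrayQueue(int capacity) {
    return new ArrayBlockingQueue<>(capacity, true);
  }

  // LinkedBlockingQueue has no fairness parameter; only the capacity can be set
  static BlockingQueue<Integer> boundedLinkedQueue(int capacity) {
    return new LinkedBlockingQueue<>(capacity);
  }

  public static void main(String[] args) {
    BlockingQueue<Integer> queue = boundedLinkedQueue(2);
    System.out.println(queue.offer(1)); // true
    System.out.println(queue.offer(2)); // true

    // The queue is full now; offer() does not block but returns false
    System.out.println(queue.offer(3)); // false
    System.out.println(queue.remainingCapacity()); // 0
  }
}
```

If you need waiting threads to be served in call order, the fairness flag of ArrayBlockingQueue is the way to go; with LinkedBlockingQueue, the order in which blocked threads proceed is not specified.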
The queue's characteristics in detail:
Underlying data structure | Thread-safe? | Blocking/ non-blocking | Fairness policy | Bounded/ unbounded | Iterator type |
---|---|---|---|---|---|
Linked list | Yes (pessimistic locking with two locks) | Blocking | Not available | Bounded | Weakly consistent¹ |
¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.
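The following sketch illustrates what "weakly consistent" means in practice. The class and method names are hypothetical. We modify the queue while iterating over it; a fail-fast iterator such as the one of ArrayList would typically throw a ConcurrentModificationException here, whereas the LinkedBlockingQueue iterator simply continues. Whether the element added during the iteration is visited is deliberately left open by the contract, so the code makes no assumption about it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration
class WeaklyConsistentIteratorSketch {

  // Iterates over the queue and appends one more element during the traversal
  static List<Integer> iterateWhileModifying() {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(List.of(1, 2, 3));
    List<Integer> seen = new ArrayList<>();

    Iterator<Integer> iterator = queue.iterator();
    while (iterator.hasNext()) {
      Integer element = iterator.next();
      seen.add(element);
      if (element == 1) {
        // Modification during iteration: no ConcurrentModificationException is thrown
        queue.add(4);
      }
    }
    return seen;
  }

  public static void main(String[] args) {
    // 1, 2, and 3 are guaranteed to be visited; 4 may or may not be
    System.out.println(iterateWhileModifying());
  }
}
```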
Recommended Use Case
I recommend LinkedBlockingQueue if you need a blocking, thread-safe queue.
By the way, the LinkedBlockingQueue class is used by Executors.newFixedThreadPool() and Executors.newSingleThreadExecutor() as a "work queue" for the executor. It is, therefore, used intensively, which keeps the probability of bugs extremely low.
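You can check this yourself with a few lines of code. The sketch below (class and method names are my own) relies on an implementation detail: it assumes that the ExecutorService returned by Executors.newFixedThreadPool() is a ThreadPoolExecutor, which is the case in OpenJDK but is not promised by the method's return type.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical class name, used only for this illustration
class ExecutorWorkQueueSketch {

  static BlockingQueue<Runnable> workQueueOfFixedThreadPool() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // Assumption: the pool is a ThreadPoolExecutor (true for OpenJDK)
      return ((ThreadPoolExecutor) pool).getQueue();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Runnable> workQueue = workQueueOfFixedThreadPool();
    System.out.println(workQueue instanceof LinkedBlockingQueue);

    // The executor creates the queue without a capacity, so the bound is Integer.MAX_VALUE
    System.out.println(workQueue.remainingCapacity() == Integer.MAX_VALUE);
  }
}
```

Keep in mind that such a practically unbounded work queue never blocks or rejects submitted tasks; it just keeps growing if tasks arrive faster than they are processed.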
LinkedBlockingQueue Example
The following example shows how to use LinkedBlockingQueue. We create a queue with a capacity of 3. Immediately afterward, we start reading elements from the queue at intervals of three seconds. After 3.5 seconds, we begin writing elements to the queue at intervals of one second each (→ code on GitHub).
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LinkedBlockingQueueExample {
  private static final long startTime = System.currentTimeMillis();

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    // Start reading from the queue immediately, every 3 seconds
    for (int i = 0; i < 10; i++) {
      int delaySeconds = i * 3;
      pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
    }

    // Start writing to the queue after 3.5 seconds (so there are already 2 threads
    // waiting), every second (so that the queue fills faster than it's emptied,
    // so that we see a full queue soon)
    for (int i = 0; i < 10; i++) {
      int element = i; // Assign to an effectively final variable
      int delayMillis = 3500 + i * 1000;
      pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Already scheduled tasks are still executed after shutdown()
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }

  // Blocks in put() as long as the queue is full
  private static void enqueue(BlockingQueue<Integer> queue, int element) {
    log("Calling queue.put(%d) (queue = %s)...", element, queue);
    try {
      queue.put(element);
      log("queue.put(%d) returned (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Restore the interrupt flag
    }
  }

  // Blocks in take() as long as the queue is empty
  private static void dequeue(BlockingQueue<Integer> queue) {
    log(" Calling queue.take() (queue = %s)...", queue);
    try {
      Integer element = queue.take();
      log(" queue.take() returned %d (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Restore the interrupt flag
    }
  }

  // Prints the elapsed time, the thread name, and the formatted message
  private static void log(String format, Object... args) {
    System.out.printf(
        Locale.US,
        "[%4.1fs] [%-16s] %s%n",
        (System.currentTimeMillis() - startTime) / 1000.0,
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}
Below you can see the output of the sample program:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-8 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = [])
[ 3.5s] [pool-1-thread-8 ] queue.put(0) returned (queue = [])
[ 4.5s] [pool-1-thread-5 ] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-4 ] queue.take() returned 1 (queue = [])
[ 4.5s] [pool-1-thread-5 ] queue.put(1) returned (queue = [])
[ 5.5s] [pool-1-thread-3 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-3 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-7 ] queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-9 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-9 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-2 ] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-2 ] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-10] Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-10] queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-1 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-1 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-8 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-4 ] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ] queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-8 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-3 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ] queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-4 ] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-3 ] queue.put(9) returned (queue = [7, 8, 9])
[18.0s] [pool-1-thread-9 ] queue.take() returned 6 (queue = [7, 8, 9])
[21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-2 ] queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-10] Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-10] queue.take() returned 9 (queue = [])
Since we start writing only after two threads have already called take(), these first two read attempts block at 0.0 s and 3.0 s (threads 1 and 4).
After 3.5 s, the first element is written (thread 8). This wakes up thread 1, and the take() method immediately removes this element from the queue again.
After 4.5 s, the second element is written (thread 5). Thread 4 is woken up and takes this element from the queue again.
The program writes faster than it reads. After 10.5 s, a writing thread (thread 8) blocks for the first time when trying to write 7 into the queue, which is full at that time. After 11.5 s, thread 4 also blocks when attempting to write 8 into the queue.
After 12.0 s, thread 5 removes an element from the queue, which frees up space. Thread 8 is woken up and writes 7 into the queue.
See if you can read and understand the rest of the output yourself.
Is LinkedBlockingQueue Thread-Safe?
Yes, LinkedBlockingQueue is thread-safe.
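Thread safety of LinkedBlockingQueue is guaranteed by pessimistic locking using two separate ReentrantLocks for write and read operations. This avoids contention (access conflicts) between producer and consumer threads, since a put and a take never compete for the same lock. Several producers, however, still contend with each other for the write lock, as do several consumers for the read lock.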
Differences from other queues:
- With ConcurrentLinkedQueue, thread safety is provided by optimistic locking via compare-and-set, resulting in better performance with low to moderate contention.
- ArrayBlockingQueue is protected with only one ReentrantLock, so access conflicts between producer and consumer threads are possible.
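Here is a small sketch, under my own naming, that exercises this thread safety: several producer and consumer threads share one bounded LinkedBlockingQueue without any additional synchronization. Each producer writes the numbers 1 to n, each consumer takes n elements, and at the end the sum of all consumed elements must match the sum of all produced ones. It demonstrates that no element is lost or delivered twice; it is not a benchmark of the two-lock design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class name, used only for this illustration
class ProducerConsumerSketch {

  // Starts the given number of producer/consumer pairs and returns the sum of all consumed elements
  static long transferAndSum(int threadPairs, int elementsPerThread) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    AtomicLong sum = new AtomicLong();
    List<Thread> threads = new ArrayList<>();

    for (int t = 0; t < threadPairs; t++) {
      // Producer: put() blocks while the queue is full
      threads.add(new Thread(() -> {
        try {
          for (int i = 1; i <= elementsPerThread; i++) {
            queue.put(i);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }));

      // Consumer: take() blocks while the queue is empty
      threads.add(new Thread(() -> {
        try {
          for (int i = 0; i < elementsPerThread; i++) {
            sum.addAndGet(queue.take());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }));
    }

    threads.forEach(Thread::start);
    try {
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the worker threads", e);
    }
    return sum.get();
  }

  public static void main(String[] args) {
    // 4 producers each write 1..1000, so the expected sum is 4 * 500500 = 2002000
    System.out.println(transferAndSum(4, 1000));
  }
}
```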
LinkedBlockingQueue Time Complexity
The time required for enqueue and dequeue operations is independent of the length of the queue, since elements are only ever appended at the tail of the linked list and removed at its head. The time complexity is, therefore, O(1).
That also applies to the size() method. Unlike ConcurrentLinkedQueue, which is also based on a linked list and runs through the complete list to count the elements each time size() is called, LinkedBlockingQueue uses an AtomicInteger internally, which is updated on insertion and removal, and thus keeps the size available in constant time.
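The effect of this counter can be observed from the outside: size() and remainingCapacity() always add up to the capacity of the queue. The following sketch (with hypothetical class and method names) uses the non-blocking methods offer() and poll() to change the queue's content and then reads both values.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this illustration
class SizeSketch {

  // Returns {size, remainingCapacity} after the given number of inserts and removals
  static int[] sizeAndRemainingCapacity(int capacity, int inserts, int removals) {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
    for (int i = 0; i < inserts; i++) {
      queue.offer(i); // returns false (without blocking) once the queue is full
    }
    for (int i = 0; i < removals; i++) {
      queue.poll(); // returns null (without blocking) once the queue is empty
    }
    return new int[] {queue.size(), queue.remainingCapacity()};
  }

  public static void main(String[] args) {
    int[] result = sizeAndRemainingCapacity(5, 4, 1);
    System.out.println("size = " + result[0]); // size = 3
    System.out.println("remaining capacity = " + result[1]); // remaining capacity = 2
  }
}
```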
Summary and Outlook
In this article, you have learned about LinkedBlockingQueue – a thread-safe, blocking, bounded queue. You saw an example of how you can use LinkedBlockingQueue, and you also learned in which cases you should use it.
LinkedBlockingQueue is based on a linked list. The next part of the tutorial is about the array-based counterpart – ArrayBlockingQueue.
If you still have questions, please ask them via the comment function.