In this article, you will learn everything about ConcurrentLinkedQueue, its characteristics and usage scenarios. An example will show you how to use ConcurrentLinkedQueue.
Here we are in the class hierarchy:
ConcurrentLinkedQueue Characteristics
The class java.util.concurrent.ConcurrentLinkedQueue is based on a singly linked list and, like most queue implementations, is thread-safe (see below).
(The only non-thread-safe queues are PriorityQueue and the deques ArrayDeque and LinkedList, which also implement the Queue interface. More about this in the next tutorial series on "Deques".)
Since the length of the linked list is expensive to determine (size() must traverse all elements), ConcurrentLinkedQueue is unbounded. It also does not provide blocking operations.
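To make "unbounded" and "non-blocking" more tangible, here is a minimal sketch. The class name NonBlockingDemo and its helper methods are made up for illustration; only the ConcurrentLinkedQueue calls themselves come from the JDK.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NonBlockingDemo {

  // Hypothetical helper: polls an empty queue and returns the result.
  static Integer pollEmptyQueue() {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    // poll() never waits for an element; on an empty queue, it returns null right away
    return queue.poll();
  }

  // Hypothetical helper: offers `count` elements and reports whether every offer succeeded.
  static boolean offerMany(int count) {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    boolean allAccepted = true;
    for (int i = 0; i < count; i++) {
      // There is no capacity limit, so offer() does not reject elements
      allAccepted &= queue.offer(i);
    }
    return allAccepted;
  }

  public static void main(String[] args) {
    System.out.println("poll() on empty queue: " + pollEmptyQueue());
    System.out.println("all offers accepted:   " + offerMany(100_000));
  }
}
```

A consumer that must wait for elements therefore has to handle the null case itself (for example, by retrying later) or use a BlockingQueue implementation instead.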
The characteristics in detail:
Underlying data structure | Thread-safe? | Blocking/non-blocking | Bounded/unbounded | Iterator type |
---|---|---|---|---|
Linked list | Yes (optimistic locking via compare-and-set) | Non-blocking | Unbounded | Weakly consistent¹ |
¹ Weakly consistent: All elements that exist when the iterator is created are traversed by the iterator exactly once. Changes that occur after this can, but do not need to, be reflected by the iterator.
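The following sketch illustrates what a weakly consistent iterator means in practice: the queue is modified while it is being iterated, and no ConcurrentModificationException is thrown. The class and method names are hypothetical. Whether the element added during the iteration is visited is deliberately not assumed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WeaklyConsistentIteratorDemo {

  // Hypothetical helper: modifies the queue during iteration and returns what the iterator saw.
  static List<Integer> iterateWhileModifying() {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>(List.of(1, 2, 3));
    List<Integer> seen = new ArrayList<>();

    Iterator<Integer> iterator = queue.iterator();
    while (iterator.hasNext()) {
      Integer element = iterator.next();
      seen.add(element);
      if (element == 1) {
        // Modifying the queue mid-iteration is allowed; the iterator may or may not visit 4
        queue.offer(4);
      }
    }
    return seen;
  }

  public static void main(String[] args) {
    // 1, 2 and 3 existed when the iterator was created, so they are visited exactly once
    System.out.println("Elements seen by the iterator: " + iterateWhileModifying());
  }
}
```

The same loop over an ArrayList or a LinkedList would fail with a ConcurrentModificationException, since their iterators are fail-fast.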
Recommended Use Case
ConcurrentLinkedQueue is a good choice when a thread-safe, non-blocking, unbounded queue is needed.
This applies despite my general recommendation to prefer array-based data structures over those implemented with linked lists.
Array-based alternatives are:
- The ArrayDeque described in the following tutorial about deques. It is, however, not thread-safe.
- The ArrayBlockingQueue described later in this tutorial. Firstly, it is bounded, and secondly, it implements thread safety via a single ReentrantLock. For most use cases (with low to medium contention), this is less performant than optimistic locking.
ConcurrentLinkedQueue Example
The following example demonstrates the thread safety of ConcurrentLinkedQueue. Four writing and three reading threads concurrently add and remove elements from the queue (→ code on GitHub):
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

public class ConcurrentLinkedQueueExample {
  private static final int NUMBER_OF_PRODUCERS = 4;
  private static final int NUMBER_OF_CONSUMERS = 3;
  private static final int NUMBER_OF_ELEMENTS_TO_PUT_INTO_QUEUE_PER_THREAD = 5;
  private static final int MIN_SLEEP_TIME_MILLIS = 500;
  private static final int MAX_SLEEP_TIME_MILLIS = 2000;
  private static final int POISON_PILL = -1;

  public static void main(String[] args) throws InterruptedException {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();

    // Start producers
    CountDownLatch producerFinishLatch = new CountDownLatch(NUMBER_OF_PRODUCERS);
    for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
      createProducerThread(queue, producerFinishLatch).start();
    }

    // Start consumers
    for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
      createConsumerThread(queue).start();
    }

    // Wait until all producers are finished
    producerFinishLatch.await();

    // Put poison pills on the queue (one for each consumer)
    for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
      queue.offer(POISON_PILL);
    }

    // We'll let the program end when all consumers are finished
  }

  private static Thread createProducerThread(
      Queue<Integer> queue, CountDownLatch finishLatch) {
    return new Thread(
        () -> {
          ThreadLocalRandom random = ThreadLocalRandom.current();
          for (int i = 0; i < NUMBER_OF_ELEMENTS_TO_PUT_INTO_QUEUE_PER_THREAD; i++) {
            sleepRandomTime();
            Integer element = random.nextInt(1000);
            queue.offer(element);
            System.out.printf(
                "[%s] queue.offer(%3d) --> queue = %s%n",
                Thread.currentThread().getName(), element, queue);
          }
          finishLatch.countDown();
        });
  }

  private static Thread createConsumerThread(Queue<Integer> queue) {
    return new Thread(
        () -> {
          while (true) {
            sleepRandomTime();
            // poll() does not block; it returns null if the queue is currently empty
            Integer element = queue.poll();
            System.out.printf(
                "[%s] queue.poll() = %4d --> queue = %s%n",
                Thread.currentThread().getName(), element, queue);

            // End the thread when a poison pill is detected
            if (element != null && element == POISON_PILL) {
              break;
            }
          }
        });
  }

  private static void sleepRandomTime() {
    ThreadLocalRandom random = ThreadLocalRandom.current();
    try {
      Thread.sleep(random.nextInt(MIN_SLEEP_TIME_MILLIS, MAX_SLEEP_TIME_MILLIS));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
Here are the first 15 lines of an exemplary output:
[Thread-1] queue.offer(982) --> queue = [982]
[Thread-6] queue.poll() = 982 --> queue = []
[Thread-5] queue.poll() = null --> queue = []
[Thread-0] queue.offer(917) --> queue = [917]
[Thread-3] queue.offer(224) --> queue = [917, 224]
[Thread-2] queue.offer(932) --> queue = [917, 224, 932]
[Thread-6] queue.poll() = 917 --> queue = [224, 932]
[Thread-4] queue.poll() = 224 --> queue = [932]
[Thread-5] queue.poll() = 932 --> queue = []
[Thread-0] queue.offer(607) --> queue = [607]
[Thread-1] queue.offer( 87) --> queue = [607, 87]
[Thread-3] queue.offer(264) --> queue = [607, 87, 264]
[Thread-4] queue.poll() = 607 --> queue = [87, 264]
[Thread-0] queue.offer(348) --> queue = [87, 264, 348]
[Thread-2] queue.offer(728) --> queue = [87, 264, 348, 728]
We can see very nicely how the seven threads insert and remove elements. In the third line, we see that thread 5 received null from the call to queue.poll() because the queue was empty at that time.
ConcurrentLinkedQueue Performance
This section discusses the thread safety and time complexity of ConcurrentLinkedQueue.
Is ConcurrentLinkedQueue Thread-Safe?
The thread safety of ConcurrentLinkedQueue is achieved by optimistic locking. More precisely: by non-blocking compare-and-set (CAS) operations on separate VarHandles for the queue's head and tail.
When accessing queues, low to moderate contention (access conflicts due to multiple threads) is usually to be expected. A thread usually does not access the queue continuously but must first create the element to be enqueued or process the element it has just dequeued.
With low to moderate contention, optimistic locking achieves a significant performance gain over pessimistic locking through implicit or explicit locks.
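The optimistic pattern can be sketched with a simple counter. Note that this is not the JDK's ConcurrentLinkedQueue implementation; it is only an illustration of the read, compute, compare-and-set, retry cycle, using AtomicInteger instead of VarHandles. The class OptimisticCounter and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OptimisticCounter {
  private final AtomicInteger value = new AtomicInteger();

  // Increments without a lock: read the value, compute the new one, and publish it with CAS.
  int increment() {
    while (true) {
      int current = value.get();
      int next = current + 1;
      // Succeeds only if no other thread changed the value in the meantime
      if (value.compareAndSet(current, next)) {
        return next;
      }
      // CAS failed because of a concurrent update: retry with the fresh value
    }
  }

  int get() {
    return value.get();
  }

  // Hypothetical helper: lets several threads increment one counter and returns the final value.
  static int runConcurrently(int threads, int incrementsPerThread) {
    OptimisticCounter counter = new OptimisticCounter();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] =
          new Thread(
              () -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                  counter.increment();
                }
              });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for workers", e);
      }
    }
    return counter.get();
  }

  public static void main(String[] args) {
    // No increment is lost, although no lock is ever acquired
    System.out.println(runConcurrently(4, 10_000));
  }
}
```

With little contention, the CAS usually succeeds on the first attempt, so a thread never has to wait for a lock. With high contention, many attempts fail and are repeated, which is where pessimistic locking can catch up.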
Differences from other queues:
- With LinkedBlockingQueue, thread safety is provided by pessimistic locking via two ReentrantLocks, leading to better performance with high contention.
- With ArrayBlockingQueue, thread safety is provided by a single ReentrantLock.
ConcurrentLinkedQueue Time Complexity
As with most queue implementations, the cost of the enqueue and dequeue operations is independent of the queue length. The time complexity is, therefore, O(1).
However, this does not apply to the size() method. To determine the length of the queue, you must iterate over all elements of the linked list. The longer the queue, the longer it takes to calculate the length. Therefore, the time complexity for size() is O(n).
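A practical consequence, sketched below under the assumption that you only need to know whether elements are left: avoid calling size() in a loop condition and rely on poll() or isEmpty() instead. The class DrainDemo and its drain() method are hypothetical names.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class DrainDemo {

  // Hypothetical helper: removes all elements and returns how many were removed.
  static int drain(Queue<Integer> queue) {
    int removed = 0;
    // poll() returns null once the queue is empty, so the O(n) size() method
    // never has to be called in the loop condition
    while (queue.poll() != null) {
      removed++;
    }
    return removed;
  }

  public static void main(String[] args) {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>(List.of(10, 20, 30));
    System.out.println("removed:  " + drain(queue));
    // isEmpty() only has to look for a first element and does not count the whole list
    System.out.println("is empty: " + queue.isEmpty());
  }
}
```

A loop such as while (queue.size() > 0) would traverse the whole list on every iteration, which turns draining n elements into O(n²) work.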
(Here, you can find an introduction to the topic of time complexity.)
Summary and Outlook
In this part of the tutorial series, I introduced you to the concrete Queue implementation ConcurrentLinkedQueue and its characteristics.
The following part will be about the PriorityQueue, which has some surprises in store.