ConcurrentLinkedQueue - Feature ImageConcurrentLinkedQueue - Feature Image
HappyCoders Glasses

Java ConcurrentLinkedQueue
(mit Beispiel)

Sven Woltmann
Sven Woltmann
20. April 2022

In diesem Artikel erfährst du alles über die ConcurrentLinkedQueue, deren Eigenschaften und Einsatzszenarien. Anhand eines Beispiels siehst du, wie man ConcurrentLinkedQueue einsetzt.

Hier befinden wir uns in der Klassenhierarchie:

ConcurrentLinkedQueue in der Klassenhierarchie
ConcurrentLinkedQueue in der Klassenhierarchie

ConcurrentLinkedQueue Eigenschaften

Die Klasse java.util.concurrent.ConcurrentLinkedQueue basiert auf einer einfach verketteten Liste und ist – wie die meisten Queue-Implementierungen – threadsicher (s. u.).

(Die einzige nicht threadsichere Queue ist PriorityQueue – sowie die Deques ArrayDeque und LinkedList, welche auch das Queue-Interface implementieren. Dazu mehr in der nächsten Tutorial-Serie "Deques".)

Da die Länge einer verketteten Liste nur aufwändig zu bestimmen ist, ist ConcurrentLinkedQueue unbounded. ConcurrentLinkedQueue bietet außerdem keine blockierenden Operationen.

Die Eigenschaften im Detail:

Unterliegende DatenstrukturThread-safe?Blocking/
Non-blocking
Bounded/
Unbounded
Iterator Type
Verkettete ListeJa
(optimistisches Locking durch Compare-and-set)
Non-blockingUnboundedWeakly consistent¹

¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.

Einsatzempfehlung

ConcurrentLinkedQueue ist eine gute Wahl, wenn eine threadsichere, nicht blockierende und unbounded Queue benötigt wird.

Dies gilt trotz meiner allgemeinen Empfehlung Array-basierte Datenstrukturen solchen vorzuziehen, die mit verketteten Listen implementiert sind.

Array-basierte Alternativen sind:

  • Das im Folgetutorial über Deques beschriebene ArrayDeque – dieses ist allerdings nicht thread-safe.
  • Die später in diesem Tutorial beschriebe ArrayBlockingQueue – diese ist zum einen bounded, und zum anderen wird die Threadsicherheit über ein einzelnes ReentrantLock implementiert. Dies ist für die meisten Einsatzszenarien (mit niedriger bis mittlerer Contention) unperformanter ist als optimistisches Locking.

ConcurrentLinkedQueue Beispiel

Das folgende Beispiel demonstriert die Threadsicherheit von ConcurrentLinkedDeque. Vier schreibende und drei lesende Threads fügen nebenläufig Elemente in die Queue ein und entnehmen sie wieder (→ Code auf GitHub):

public class ConcurrentLinkedQueueExample { private static final int NUMBER_OF_PRODUCERS = 4; private static final int NUMBER_OF_CONSUMERS = 3; private static final int NUMBER_OF_ELEMENTS_TO_PUT_INTO_QUEUE_PER_THREAD = 5; private static final int MIN_SLEEP_TIME_MILLIS = 500; private static final int MAX_SLEEP_TIME_MILLIS = 2000; private static final int POISON_PILL = -1; public static void main(String[] args) throws InterruptedException { Queue<Integer> queue = new ConcurrentLinkedQueue<>(); // Start producers CountDownLatch producerFinishLatch = new CountDownLatch(NUMBER_OF_PRODUCERS); for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) { createProducerThread(queue, producerFinishLatch).start(); } // Start consumers for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) { createConsumerThread(queue).start(); } // Wait until all producers are finished producerFinishLatch.await(); // Put poison pills on the queue (one for each consumer) for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) { queue.offer(POISON_PILL); } // We'll let the program end when all consumers are finished } private static Thread createProducerThread( Queue<Integer> queue, CountDownLatch finishLatch) { return new Thread( () -> { ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = 0; i < NUMBER_OF_ELEMENTS_TO_PUT_INTO_QUEUE_PER_THREAD; i++) { sleepRandomTime(); Integer element = random.nextInt(1000); queue.offer(element); System.out.printf( "[%s] queue.offer(%3d) --> queue = %s%n", Thread.currentThread().getName(), element, queue); } finishLatch.countDown(); }); } private static Thread createConsumerThread(Queue<Integer> queue) { return new Thread( () -> { while (true) { sleepRandomTime(); Integer element = queue.poll(); System.out.printf( "[%s] queue.poll() = %4d --> queue = %s%n", Thread.currentThread().getName(), element, queue); // End the thread when a poison pill is detected if (element != null && element == POISON_PILL) { break; } } }); } private static void sleepRandomTime() { ThreadLocalRandom random = ThreadLocalRandom.current(); try { Thread.sleep(random.nextInt(MIN_SLEEP_TIME_MILLIS, MAX_SLEEP_TIME_MILLIS)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
Code-Sprache: Java (java)

Hier die ersten 10 Zeilen einer beispielhaften Ausgabe:

[Thread-1] queue.offer(982) --> queue = [982] [Thread-6] queue.poll() = 982 --> queue = [] [Thread-5] queue.poll() = null --> queue = [] [Thread-0] queue.offer(917) --> queue = [917] [Thread-3] queue.offer(224) --> queue = [917, 224] [Thread-2] queue.offer(932) --> queue = [917, 224, 932] [Thread-6] queue.poll() = 917 --> queue = [224, 932] [Thread-4] queue.poll() = 224 --> queue = [932] [Thread-5] queue.poll() = 932 --> queue = [] [Thread-0] queue.offer(607) --> queue = [607] [Thread-1] queue.offer( 87) --> queue = [607, 87] [Thread-3] queue.offer(264) --> queue = [607, 87, 264] [Thread-4] queue.poll() = 607 --> queue = [87, 264] [Thread-0] queue.offer(348) --> queue = [87, 264, 348] [Thread-2] queue.offer(728) --> queue = [87, 264, 348, 728]
Code-Sprache: Klartext (plaintext)

Wir sehen sehr schön, wie die sieben Threads Elemente einfügen und entnehmen. In der dritten Zeile sehen wir auch, dass Thread 5 beim Aufruf von queue.poll() den Wert null geliefert bekommen hat, da die Queue zu diesem Zeitpunkt leer war.

ConcurrentLinkedQueue Performance

Dieser Abschnitt behandelt die Threadsicherheit und Zeitkomplexität von ConcurrentLinkedQueue.

Ist ConcurrentLinkedQueue threadsicher?

Die Threadsicherheit der ConcurrentLinkedQueue wird durch optimistisches Locking erzielt. Genauer gesagt: durch nicht-blockierende Compare-and-set (CAS)-Operationen auf separate VarHandles für den Kopf und das Ende der Queue.

Beim Zugriff auf Queues ist normalerweise niedrige bis moderate Contention (Zugriffskonflikte durch mehrere Threads) zu erwarten, da ein Thread in der Regel nicht ununterbrochen auf die Queue zugreift, sondern das einzustellende Element erstmal erzeugen bzw. das entnommende Element verarbeiten muss.

Bei niedriger bis moderater Contention erzielt optimistisches Locking einen deutlichen Performance-Gewinn gegenüber pessimistischem Locking durch implizite oder explizite Locks.

Unterschiede zu anderen Queues:

  • Bei LinkedBlockingQueue wird die Threadsicherheit durch pessimistisches Locking via zweier ReentrantLocks gewährleistet, was zu besserer Performance bei hoher Contention führen kann.
  • Bei ArrayBlockingQueue wird die Threadsicherheit durch ein einzelnes ReentrantLock gewährleistet.

ConcurrentLinkedQueue Zeitkomplexität

Wie bei allen Queues ist der Aufwand für die Enqueue- und Dequeue-Operationen unabhängig von der Länge der Queue. Die Zeitkomplexität beträgt also O(1).

Dies gilt hingegen nicht für die size()-Methode. Um die Länge der Queue zu bestimmen, muss über alle Elemente der verketteten Liste iteriert werden. Je länger die Queue, desto länger dauert es die Länge zu berechnen. Die Zeitkomplexität für size() beträgt somit O(n).

(Hier findest du eine Einführung in das Thema Zeitkomplexität.)

Zusammenfassung und Ausblick

In diesem Teil der Tutorialserie habe ich dir die konkrete Queue-Implementierung ConcurrentLinkedQueue und deren Eigenschaften vorgestellt.

Im folgenden Teil wird es um die PriorityQueue gehen, die einige Überraschungen bereithält.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.