LinkedTransferQueue Feature ImageLinkedTransferQueue Feature Image
HappyCoders Glasses

Java LinkedTransferQueue
(mit Beispiel)

Sven Woltmann
Sven Woltmann
20. April 2022

In diesem Artikel lernst du eine sehr spezielle Queue kennen: die LinkedTransferQueue. Dieser Artikel beschreibt deren Eigenschaften und zeigt dir anhand eines Beispiels, wie du diese Queue einsetzt.

Wir befinden uns nunmehr am untersten Punkt der Queue-Klassenhierarchie:

LinkedTransferQueue in der Klassenhierarchie
LinkedTransferQueue in der Klassenhierarchie

TransferQueue Interface

Wie im Klassendiagram gut zu erkennen ist, ist java.util.concurrent.LinkedTransferQueue die einzige Klasse, die das Interface TransferQueue implementiert.

TransferQueue definiert zusätzliche Enqueue-Methoden, die nur dann erfolgreich ausgeführt werden können, wenn ein anderer Thread mit take() oder poll() das übergebe Element entgegennimmt:

  • transfer(E e) – übermittelt das Element an einen Thread, der mit take() oder poll() auf ein Element wartet. Wenn solch ein Thread nicht existiert, blockiert die Methode solange, bis ein anderer Thread take() oder poll() aufruft.
  • tryTransfer(E e) – übermittelt das Element an einen Thread, der mit take() oder poll() auf ein Element wartet. Wenn solch ein Thread nicht existiert, gibt die Methode umgehend false zurück.
  • tryTransfer(E e, long timeout, TimeUnit unit) – übermittelt das Element an einen Thread, der mit take() oder poll() auf ein Element wartet. Wenn solch ein Thread nicht existiert und auch nicht innerhalb der Wartezeit erscheint, gibt die Methode false zurück.

LinkedTransferQueue Eigenschaften

Bei der LinkedTransferQueue handelt es sich um eine unbounded blocking Queue, d. h. die regulären Enqueue-Operationen put() und offer() können nicht blockieren (da die Queue beliebig groß werden kann). Blockieren können hingegen:

  • die Dequeue-Operationen (wenn die Queue leer ist)
  • und die transfer()- bzw. tryTransfer()-Methoden des TransferQueue-Interfaces, bis die jeweiligen Elemente entnommen werden.

LinkedTransferQueue basiert auf einer einfach verketteten Liste. Das hat zur Folge, dass die Zeitkomplexität der size()-Methode O(n) beträgt¹ (und nicht O(1) wie bei den Array-basierten Queues), da zum Bestimmen der Länge die komplette Liste abgelaufen werden muss.

Threadsicherheit wird durch nicht-blockierende Compare-and-set (CAS)-Operationen erreicht, was eine hohe Performance bei niedriger bis moderater Contention (Zugriffskonflikten durch mehrere Threads) sicherstellt.

Die Eigenschaften im Detail:

Unterliegende DatenstrukturThread-safe?Blocking/
Non-blocking
Fairness
Policy
Bounded/
Unbounded
Iterator Type
Verkettete ListeJa
(optimistisches Locking durch Compare-and-set)
BlockingNicht verfügbarUnboundedWeakly consistent²

¹ Alles über Zeitkomplexität erfährst du im Artikel "O-Notation und Zeitkomplexität – anschaulich erklärt"

² Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.

Einsatzempfehlung

Die LinkedTransferQueue wird im JDK nicht verwendet. Ursprünglich wurde sie für das im JDK 7 eingeführte Fork/Join-Framework implementiert, dann aber noch nicht dafür genutzt. Die Wahrscheinlichkeit von Bugs ist daher recht hoch, so dass du auf den Einsatz dieser Klasse verzichten solltest.

LinkedTransferQueue Beispiel

Im folgenden Beispiel (→ Code auf GitHub) werden zwei Threads gestartet, die LinkedTransferQueue.transfer() aufrufen. Danach wird ein Element direkt in die Queue geschrieben. Dann werden zwei weitere Threads gestartet, die transfer() aufrufen. Abschließend werden solange Elemente aus der Queue entnommen, bis diese wieder leer ist.

public class LinkedTransferQueueExample { public static void main(String[] args) throws InterruptedException { TransferQueue<Integer> queue = new LinkedTransferQueue<>(); // Start 2 threads calling queue.transfer(), startTransferThread(queue, 1); startTransferThread(queue, 2); // ... then put one element directly, enqueueViaPut(queue, 3); // ... then start 2 more threads calling queue.transfer(). startTransferThread(queue, 4); startTransferThread(queue, 5); // Now take all elements until the queue is empty while (!queue.isEmpty()) { dequeueViaTake(queue); } } private static void startTransferThread(TransferQueue<Integer> queue, int element) throws InterruptedException { new Thread(() -> enqueueViaTransfer(queue, element)).start(); // Wait a bit to let the thread enqueue the element Thread.sleep(100); log(" --> queue = " + queue); } private static void enqueueViaTransfer(TransferQueue<Integer> queue, int element) { log("Calling queue.transfer(%d)...", element); try { queue.transfer(element); log("queue.transfer(%d) returned --> queue = %s", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void enqueueViaPut(TransferQueue<Integer> queue, int element) throws InterruptedException { log("Calling queue.put(%d)...", element); queue.put(element); log("queue.put(%d) returned --> queue = %s", element, queue); } private static void dequeueViaTake(TransferQueue<Integer> queue) throws InterruptedException { log(" Calling queue.take() (queue = %s)...", queue); Integer e = queue.take(); log(" queue.take() returned %d --> queue = %s", e, queue); // Wait a bit to get the log output in a readable order Thread.sleep(10); } private static void log(String format, Object... args) { System.out.printf( Locale.US, "[%-8s] %s%n", Thread.currentThread().getName(), String.format(format, args)); } }
Code-Sprache: Java (java)

Im folgenden siehst du die Ausgabe des Programms:

[Thread-0] Calling queue.transfer(1)... [main ] --> queue = [1] [Thread-1] Calling queue.transfer(2)... [main ] --> queue = [1, 2] [main ] Calling queue.put(3)... [main ] queue.put(3) returned --> queue = [1, 2, 3] [Thread-2] Calling queue.transfer(4)... [main ] --> queue = [1, 2, 3, 4] [Thread-3] Calling queue.transfer(5)... [main ] --> queue = [1, 2, 3, 4, 5] [main ] Calling queue.take() (queue = [1, 2, 3, 4, 5])... [main ] queue.take() returned 1 --> queue = [2, 3, 4, 5] [Thread-0] queue.transfer(1) returned --> queue = [2, 3, 4, 5] [main ] Calling queue.take() (queue = [2, 3, 4, 5])... [main ] queue.take() returned 2 --> queue = [3, 4, 5] [Thread-1] queue.transfer(2) returned --> queue = [3, 4, 5] [main ] Calling queue.take() (queue = [3, 4, 5])... [main ] queue.take() returned 3 --> queue = [4, 5] [main ] Calling queue.take() (queue = [4, 5])... [main ] queue.take() returned 4 --> queue = [5] [Thread-2] queue.transfer(4) returned --> queue = [5] [main ] Calling queue.take() (queue = [5])... [main ] queue.take() returned 5 --> queue = [] [Thread-3] queue.transfer(5) returned --> queue = []
Code-Sprache: Klartext (plaintext)

Man sieht sehr schön, wie zu Beginn zwei mal transfer() aufgerufen wird (aber nicht zurückkehrt), wie dann einmal put() aufgerufen wird (und zurückkehrt) und wie noch zwei mal transfer() aufgerufen wird (und nicht zurückkehrt).

Danach sehen wir, wie das erste Element entnommen wird und daraufhin auch transfer(1) zurückkehrt.

Dann wird das zweite Element entnommen und transfer(2) kehrt zurück.

Die Entnahme der 3 führt zu keiner weiteren Aktion, da diese mit put() in die Queue geschrieben wurde.

Nach der Entnahme der 4 und der 5 sieht man wieder schön, wie der jeweils zugehörige transfer()-Aufruf zurückkehrt.

Zusammenfassung und Ausblick

In diesem Artikel hast du das TransferQueue-Interface und die LinkedTransferQueue-Implementierung kennengelernt und an einem Beispiel gesehen, wie man diese einsetzen kann.

Im nächsten Teil dieser Tutorialserie findest du eine Zusammenfassung aller Queue-Implementierungen des JDK und eine Übersicht, in welchen Fällen du welche Implementierung verwenden solltest.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.