In diesem Artikel lernst du eine sehr spezielle Queue kennen: die LinkedTransferQueue
. Dieser Artikel beschreibt deren Eigenschaften und zeigt dir anhand eines Beispiels, wie du diese Queue einsetzt.
Wir befinden uns nunmehr am untersten Punkt der Queue-Klassenhierarchie:
TransferQueue Interface
Wie im Klassendiagram gut zu erkennen ist, ist java.util.concurrent.LinkedTransferQueue
die einzige Klasse, die das Interface TransferQueue
implementiert.
TransferQueue
definiert zusätzliche Enqueue-Methoden, die nur dann erfolgreich ausgeführt werden können, wenn ein anderer Thread mit take()
oder poll()
das übergebe Element entgegennimmt:
transfer(E e)
– übermittelt das Element an einen Thread, der mittake()
oderpoll()
auf ein Element wartet. Wenn solch ein Thread nicht existiert, blockiert die Methode solange, bis ein anderer Threadtake()
oderpoll()
aufruft.tryTransfer(E e)
– übermittelt das Element an einen Thread, der mittake()
oderpoll()
auf ein Element wartet. Wenn solch ein Thread nicht existiert, gibt die Methode umgehendfalse
zurück.tryTransfer(E e, long timeout, TimeUnit unit)
– übermittelt das Element an einen Thread, der mittake()
oderpoll()
auf ein Element wartet. Wenn solch ein Thread nicht existiert und auch nicht innerhalb der Wartezeit erscheint, gibt die Methodefalse
zurück.
LinkedTransferQueue Eigenschaften
Bei der LinkedTransferQueue
handelt es sich um eine unbounded blocking Queue, d. h. die regulären Enqueue-Operationen put()
und offer()
können nicht blockieren (da die Queue beliebig groß werden kann). Blockieren können hingegen:
- die Dequeue-Operationen (wenn die Queue leer ist)
- und die
transfer()
- bzw.tryTransfer()
-Methoden desTransferQueue
-Interfaces, bis die jeweiligen Elemente entnommen werden.
LinkedTransferQueue
basiert auf einer einfach verketteten Liste. Das hat zur Folge, dass die Zeitkomplexität der size()
-Methode O(n) beträgt¹ (und nicht O(1) wie bei den Array-basierten Queues), da zum Bestimmen der Länge die komplette Liste abgelaufen werden muss.
Threadsicherheit wird durch nicht-blockierende Compare-and-set (CAS)-Operationen erreicht, was eine hohe Performance bei niedriger bis moderater Contention (Zugriffskonflikten durch mehrere Threads) sicherstellt.
Die Eigenschaften im Detail:
Unterliegende Datenstruktur | Thread-safe? | Blocking/ Non-blocking | Fairness Policy | Bounded/ Unbounded | Iterator Type |
---|---|---|---|---|---|
Verkettete Liste | Ja (optimistisches Locking durch Compare-and-set) | Blocking | Nicht verfügbar | Unbounded | Weakly consistent² |
¹ Alles über Zeitkomplexität erfährst du im Artikel "O-Notation und Zeitkomplexität – anschaulich erklärt"
² Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.
Einsatzempfehlung
Die LinkedTransferQueue
wird im JDK nicht verwendet. Ursprünglich wurde sie für das im JDK 7 eingeführte Fork/Join-Framework implementiert, dann aber noch nicht dafür genutzt. Die Wahrscheinlichkeit von Bugs ist daher recht hoch, so dass du auf den Einsatz dieser Klasse verzichten solltest.
LinkedTransferQueue Beispiel
Im folgenden Beispiel (→ Code auf GitHub) werden zwei Threads gestartet, die LinkedTransferQueue.transfer()
aufrufen. Danach wird ein Element direkt in die Queue geschrieben. Dann werden zwei weitere Threads gestartet, die transfer()
aufrufen. Abschließend werden solange Elemente aus der Queue entnommen, bis diese wieder leer ist.
public class LinkedTransferQueueExample {
public static void main(String[] args) throws InterruptedException {
TransferQueue<Integer> queue = new LinkedTransferQueue<>();
// Start 2 threads calling queue.transfer(),
startTransferThread(queue, 1);
startTransferThread(queue, 2);
// ... then put one element directly,
enqueueViaPut(queue, 3);
// ... then start 2 more threads calling queue.transfer().
startTransferThread(queue, 4);
startTransferThread(queue, 5);
// Now take all elements until the queue is empty
while (!queue.isEmpty()) {
dequeueViaTake(queue);
}
}
private static void startTransferThread(TransferQueue<Integer> queue, int element)
throws InterruptedException {
new Thread(() -> enqueueViaTransfer(queue, element)).start();
// Wait a bit to let the thread enqueue the element
Thread.sleep(100);
log(" --> queue = " + queue);
}
private static void enqueueViaTransfer(TransferQueue<Integer> queue, int element) {
log("Calling queue.transfer(%d)...", element);
try {
queue.transfer(element);
log("queue.transfer(%d) returned --> queue = %s", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void enqueueViaPut(TransferQueue<Integer> queue, int element)
throws InterruptedException {
log("Calling queue.put(%d)...", element);
queue.put(element);
log("queue.put(%d) returned --> queue = %s", element, queue);
}
private static void dequeueViaTake(TransferQueue<Integer> queue)
throws InterruptedException {
log(" Calling queue.take() (queue = %s)...", queue);
Integer e = queue.take();
log(" queue.take() returned %d --> queue = %s", e, queue);
// Wait a bit to get the log output in a readable order
Thread.sleep(10);
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US, "[%-8s] %s%n",
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code-Sprache: Java (java)
Im folgenden siehst du die Ausgabe des Programms:
[Thread-0] Calling queue.transfer(1)...
[main ] --> queue = [1]
[Thread-1] Calling queue.transfer(2)...
[main ] --> queue = [1, 2]
[main ] Calling queue.put(3)...
[main ] queue.put(3) returned --> queue = [1, 2, 3]
[Thread-2] Calling queue.transfer(4)...
[main ] --> queue = [1, 2, 3, 4]
[Thread-3] Calling queue.transfer(5)...
[main ] --> queue = [1, 2, 3, 4, 5]
[main ] Calling queue.take() (queue = [1, 2, 3, 4, 5])...
[main ] queue.take() returned 1 --> queue = [2, 3, 4, 5]
[Thread-0] queue.transfer(1) returned --> queue = [2, 3, 4, 5]
[main ] Calling queue.take() (queue = [2, 3, 4, 5])...
[main ] queue.take() returned 2 --> queue = [3, 4, 5]
[Thread-1] queue.transfer(2) returned --> queue = [3, 4, 5]
[main ] Calling queue.take() (queue = [3, 4, 5])...
[main ] queue.take() returned 3 --> queue = [4, 5]
[main ] Calling queue.take() (queue = [4, 5])...
[main ] queue.take() returned 4 --> queue = [5]
[Thread-2] queue.transfer(4) returned --> queue = [5]
[main ] Calling queue.take() (queue = [5])...
[main ] queue.take() returned 5 --> queue = []
[Thread-3] queue.transfer(5) returned --> queue = []
Code-Sprache: Klartext (plaintext)
Man sieht sehr schön, wie zu Beginn zwei mal transfer()
aufgerufen wird (aber nicht zurückkehrt), wie dann einmal put()
aufgerufen wird (und zurückkehrt) und wie noch zwei mal transfer()
aufgerufen wird (und nicht zurückkehrt).
Danach sehen wir, wie das erste Element entnommen wird und daraufhin auch transfer(1)
zurückkehrt.
Dann wird das zweite Element entnommen und transfer(2)
kehrt zurück.
Die Entnahme der 3 führt zu keiner weiteren Aktion, da diese mit put()
in die Queue geschrieben wurde.
Nach der Entnahme der 4 und der 5 sieht man wieder schön, wie der jeweils zugehörige transfer()
-Aufruf zurückkehrt.
Zusammenfassung und Ausblick
In diesem Artikel hast du das TransferQueue
-Interface und die LinkedTransferQueue
-Implementierung kennengelernt und an einem Beispiel gesehen, wie man diese einsetzen kann.
Im nächsten Teil dieser Tutorialserie findest du eine Zusammenfassung aller Queue-Implementierungen des JDK und eine Übersicht, in welchen Fällen du welche Implementierung verwenden solltest.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.