Früher oder später müssen sich Java-Entwickler mit den Datenstrukturen Queue, Deque und Stack auseinandersetzen. Dieser Artikel erklärt die grundlegenden Funktionen dieser Datenstrukturen und gibt eine ausführliche Übersicht über alle im JDK vorhandenen Implementierungen. Zahlreiche Code-Beispiele sollen dir das Verständnis erleichern.
Im Detail werden folgende Fragen beantwortet:
Die Code-Beispiele findest du wie immer in meinem GitHub-Repository.
In diesem Kapitel beschreibe ich die allgemeine Funktionsweise von Queues, Deques und Stacks. Allgemein heißt: unabhängig von einer bestimmten Programmiersprache.
Eine Queue (auf deutsch: "Warteschlange") ist eine Liste von Elementen, bei der die Elemente auf einer Seite eingefügt und in derselben Reihenfolge auf der anderen Seite wieder entnommen werden:
Eine Queue bietet folgende Operationen:
(Die entsprechenden Methoden der Java-Queue-Implementierungen heißen allerdings anders, wie du weiter unten sehen wirst.)
Da die Reihenfolge der Elemente beibehalten wird, spricht man auch von FIFO-Liste ("First-in-first-out").
Ein Stack (auf deutsch: "Stapelspeicher", "Kellerspeicher" oder kurz "Stapel", "Keller") ist eine Liste von Elementen, bei der die Elemente auf derselben Seite (in Darstellungen klassischerweise oben) eingefügt ("gestapelt") und wieder entnommen werden:
Die Operationen des Stacks lauten:
Da das zuletzt hinzugefügte Element als erstes wieder entnommen wird, spricht man von einer LIFO-Liste ("Last-in-first-out").
Ein Deque (Double-ended queue, ausgesprochen "Deck" – eine deutsche Übersetzung gibt es nicht) ist eine Liste von Elementen, bei der die Elemente sowohl auf der einen als auch auf der anderen Seite eingefügt und entnommen werden können:
Die Operationen lauten analog zur Queue auf beiden Seiten "Enqueue" und "Dequeue":
(Wie bei der Queue heißen auch beim Deque die entsprechenden Methoden der Java-Implementierungen anders, dazu unten mehr.)
Ein Deque kann sowohl als Queue (FIFO) als auch als Stack (LIFO) verwendet werden. Es ist darauf aber nicht beschränkt, da Elemente jederzeit an einer beliebigen Seiten angehängt und entnommen werden können.
Wenn eine Queue nur eine begrenzte Anzahl von Elementen aufnehmen kann, spricht man von einer "bounded Queue". Die maximale Anzahl von Elementen wird mit "Kapazität" (englisch "capacity") bezeichnet und über den Constructor der Queue einmalig festgelegt.
Ist die Anzahl der Elemente in der Queue nicht begrenzt (bzw. nur durch den zur Verfügung stehenden Speicher), spricht man von einer "unbounded Queue".
Dieselbe Definition gilt auch für Deques und Stacks.
Bei den vorgestellten Queue-Operationen können folgende Sonderfälle auftreten:
Je nach Implementierung (dazu später mehr) wird bei einer nicht blockierenden ("non-blocking") Queue ein bestimmter Rückgabewert (false
oder null
) zurückgeliefert oder eine Exception geworfen.
Eine blocking Queue bietet zusätzlich folgende blockierende Operationen:
Blockierende Methoden werden dabei nicht automatisch in der Reihenfolge bedient, in der sie aufgerufen wurden. Die Abarbeitung in Aufrufreihenfolge kann bei einigen Queue-Implementierungen durch eine optionale "Fairness Policy" aktiviert werden. Diese erhöht allerdings den Overhead und verringert damit den Durchsatz der Queue.
Auch diese Definition gilt analog für Deques und Stacks.
Bevor ich die Java-Queue
- und Deque
-Interfaces im Detail vorstelle, möchte ich einen Überblick in Form eines UML-Klassendiagramms geben:
Die Interfaces werde ich in den folgenden Kapiteln beschreiben und anhand von Code-Beispielen erklären.
Das JDK enthält seit Java 5.0 das Interface java.util.Queue
und mehrere Queue-Implementierungen, die sich in diversen Eigenschaften (wie bounded/unbounded, blocking/non-blocking, threadsicher/nicht threadsicher) unterscheiden.
Das java.util.Queue
-Interface (→ Javadoc) erbt von java.util.Collection
und erweitert diese um Methoden, um Elemente anzuhängen, zu entnehmen und das Element am Kopf der Queue anzusehen, ohne es zu entnehmen.
Dabei gibt es für jede der drei Operationen je zwei Methoden, die sich lediglich in der Fehlerbehandlung unterscheiden: Jeweils eine Methode wirft im Fall eines Fehlers eine Exception, die andere liefert einen bestimmten Rückgabewert (false
oder null
). Fehler können zum Beispiel dann auftreten, wenn eine bounded Queue voll ist, oder wenn man versucht aus einer leeren Queue ein Element zu entnehmen.
Wirft Exception | Rückgabewert | |
Element anhängen (enqueue) | add(E e) | offer(E e) |
Element entnehmen (dequeue) | remove() | poll() |
Element ansehen (examine) | element() | peek() |
Hier ein einfaches Beispiel (→ Code in GitHub):
public class QueueExample {
public static void main(String[] args) {
Queue<Integer> queue = new ConcurrentLinkedQueue<>();
queue.offer(1);
queue.offer(2);
queue.offer(3);
System.out.println("queue = " + queue);
System.out.println("queue.peek() = " + queue.peek());
System.out.println("queue.poll() = " + queue.poll());
System.out.println("queue.poll() = " + queue.poll());
System.out.println("queue.poll() = " + queue.poll());
System.out.println("queue.poll() = " + queue.poll());
System.out.println("queue.remove() = " + queue.remove());
}
}
Das Programm fügt mit offer()
die Elemente 1, 2 und 3 in die Queue ein, gibt dann mit peek()
das erste Element aus und entnimmt dann alle Elemente mit poll()
. Nachdem alle Elemente entnommen wurden, wird noch einmal poll()
und dann remove()
aufgerufen.
Das Programm gibt folgendes aus:
queue = [1, 2, 3] queue.peek() = 1 queue.poll() = 1 queue.poll() = 2 queue.poll() = 3 queue.poll() = null Exception in thread "main" java.util.NoSuchElementException at [...]
Wie du siehst, werden die Elemente in FIFO-Reihenfolge ausgegeben, also in derselben Reihenfolge, wie sie eingefügt wurden. Außerdem sieht man schön den Unterschied zwischen poll()
und remove()
: Während poll()
bei leerer Queue den Wert null
zurückliefert, wirft remove()
eine NoSuchElementException
.
Entsprechend würde auch peek()
bei leerer Queue null
zurückliefern und element()
eine NoSuchElementException
werfen.
Das ebenso seit Java 5.0 verfügbare Interface java.util.concurrent.BlockingQueue
(→ Javadoc) erweitert Queue
um zusätzliche blockierende Operationen, die beim Entnehmen eines Elements aus einer leeren Queue darauf warten, dass ein Element vorhanden ist, bzw. beim Einfügen eines Elements in eine volle Queue warten, bis freier Platz verfügbar ist.
Beide Operationen gibt es in jeweils zwei Varianten. Eine, die unbegrenzt lange wartet, und eine, die nach Ablauf einer vorgegebenen Wartezeit aufgibt und false
bzw. null
zurückliefert. Blockierende Methoden, die Exceptions werfen, gibt es nicht.
Wirft Exception (geerbt von Queue) | Rückgabewert (geerbt von Queue) | Blockiert (neu in BlockingQueue) | Timeout (neu in BlockingQueue) | |
Element anhängen (enqueue) | add(E e) | offer(E e) | put(E e) | offer(E e, |
Element entnehmen (dequeue) | remove() | poll() | take() | poll( |
Element ansehen (examine) | element() | peek() | – | – |
Hier ein Beispiel, welches aufgrund der Nebenläufigkeit deutlich komplexer ist als das erste Beispiel (→ Code in GitHub):
public class BlockingQueueExample {
private static final long startTime = System.currentTimeMillis();
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// Start reading from the queue immediately, every 3 seconds
for (int i = 0; i < 10; i++) {
pool.schedule(() -> dequeue(queue), i * 3, TimeUnit.SECONDS);
}
// Start writing to the queue after 3.5 seconds (so there are already 2
// threads waiting), every 1 seconds (so that the queue fills faster than
// it's emptied, so that we see a full queue soon)
for (int i = 0; i < 10; i++) {
int finalI = i;
pool.schedule(() -> enqueue(queue, finalI),
3500 + i * 1000, TimeUnit.MILLISECONDS);
}
pool.shutdown();
try {
pool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer e = queue.take();
log(" queue.take() returned %d (queue = %s)", e, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void enqueue(BlockingQueue<Integer> queue, int i) {
log("Calling queue.put(%d) (queue = %s)...", i, queue);
try {
queue.put(i);
log("queue.put(%d) returned (queue = %s)", i, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(Locale.US, "[%4.1fs] [%-16s] %s%n",
(System.currentTimeMillis() - startTime) / 1000.0,
Thread.currentThread().getName(),
String.format(format, args));
}
}
In diesem Beispiel erstellen wir eine blocking, bounded Queue mit der Kapazität 3 und schedulen jeweils zehn Enqueue- und zehn Dequeue-Operationen.
Die Enqueue-Operationen beginnen später, so dass wir am Anfang blockierende Dequeue-Operationen sehen. Außerdem erfolgen die Enqueue-Operationen in kürzen Abständen, so dass die Kapazitätsgrenze der Queue nach einer Weile erreicht ist und wir blockierende Enqueue-Operationen sehen.
Hier die Ausgabe des Programms:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])... [ 3.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [])... [ 3.5s] [pool-1-thread-2 ] Calling queue.put(0) (queue = [])... [ 3.5s] [pool-1-thread-2 ] queue.put(0) returned (queue = []) [ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = []) [ 4.5s] [pool-1-thread-10] Calling queue.put(1) (queue = [])... [ 4.5s] [pool-1-thread-10] queue.put(1) returned (queue = []) [ 4.5s] [pool-1-thread-3 ] queue.take() returned 1 (queue = []) [ 5.5s] [pool-1-thread-8 ] Calling queue.put(2) (queue = [])... [ 5.5s] [pool-1-thread-8 ] queue.put(2) returned (queue = [2]) [ 6.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [2])... [ 6.0s] [pool-1-thread-9 ] queue.take() returned 2 (queue = []) [ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])... [ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3]) [ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])... [ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4]) [ 8.5s] [pool-1-thread-7 ] Calling queue.put(5) (queue = [3, 4])... [ 8.5s] [pool-1-thread-7 ] queue.put(5) returned (queue = [3, 4, 5]) [ 9.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [3, 4, 5])... [ 9.0s] [pool-1-thread-4 ] queue.take() returned 3 (queue = [4, 5]) [ 9.5s] [pool-1-thread-2 ] Calling queue.put(6) (queue = [4, 5])... [ 9.5s] [pool-1-thread-2 ] queue.put(6) returned (queue = [4, 5, 6]) [10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])... [11.5s] [pool-1-thread-10] Calling queue.put(8) (queue = [4, 5, 6])... [12.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [4, 5, 6])... [12.0s] [pool-1-thread-3 ] queue.take() returned 4 (queue = [5, 6, 7]) [12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7]) [12.5s] [pool-1-thread-8 ] Calling queue.put(9) (queue = [5, 6, 7])... [15.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [5, 6, 7])... [15.0s] [pool-1-thread-9 ] queue.take() returned 5 (queue = [6, 7, 8]) [15.0s] [pool-1-thread-10] queue.put(8) returned (queue = [6, 7, 8]) [18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])... [18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9]) [18.0s] [pool-1-thread-8 ] queue.put(9) returned (queue = [7, 8, 9]) [21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])... [21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9]) [24.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [8, 9])... [24.0s] [pool-1-thread-7 ] queue.take() returned 8 (queue = [9]) [27.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [9])... [27.0s] [pool-1-thread-4 ] queue.take() returned 9 (queue = [])
Zu Beginn ist die Queue leer, so dass die ersten zwei Leseversuche (nach 0 und 3 s) blockieren.
Nach 3,5 s (nachdem also zwei lesende Threads an der Queue warten) beginnt das Programm sekündlich in die Queue zu schreiben. Man sieht in der Ausgabe schön, wie dabei jeweils ein lesender Thread fortgesetzt wird und das angehängte Element sofort wieder entnimmt (bei 3,5 und 4,5 s).
Da das Programm dreimal so schnell in die Queue schreibt wie es daraus liest, blockiert nach 10,5 s der Versuch eine 7 in die Queue zu schreiben, da diese mit den Elementen [4, 5, 6] ihre Kapazitätsgrenze von 3 erreicht hat.
Erst nachdem nach 12 s die 4 aus der Queue entnommen wurde, kann die 7 eingefügt werden. Für die 8 und die 9 sehen wir ein entsprechendes Verhalten.
Im JDK 6.0 wurden das Interface java.util.Deque
und diverse Deque-Implementierungen aufgenommen. Diese unterscheiden sich ebenso wie die Queue-Implementierungen in den Eigenschaften bounded/unbounded, blocking/non-blocking und threadsicher/nicht threadsicher.
Das Interface java.util.Deque
(→ JavaDoc) erbt von java.util.Queue
und erweitert diese um Methoden, um Elemente am Anfang der Queue anzuhängen, Elemente am Ende der Queue zu entnehmen und das Element am Ende der Queue anzusehen, ohne es zu entnehmen.
Auch diese Methoden unterscheiden sich wieder darin, ob sie im Fehlerfall eine Exception werfen oder einen speziellen Rückgabewert liefern.
Aus Konsistenzgründen wurden die Operationen, die Deque
von Queue
erbt, mit neuem Namen erneut implementiert (beispielsweise Queue.offer()
als Deque.offerLast()
und Queue.poll()
als Deque.pollFirst()
). Diese Fälle erkennst du daran, dass in der folgenden Tabelle zwei Methodennamen angegeben sind.
Anfang | Ende | |||
Wirft Exception | Rückgabewert | Wirft Exception | Rückgabewert | |
Element anhängen (enqueue) | addFirst(E e) | offerFirst(E e) | addLast(E e) | offerLast(E e) |
Element entnehmen (dequeue) | removeFirst() | pollFirst() poll() | removeLast() | pollLast() |
Element ansehen (examine) | getFirst() | peekFirst() peek() | getLast() | peekLast() |
Hier ein Code-Beispiel, dass exakt das Deque produziert, das am Anfang des Artikels grafisch dargestellt wurde (→ Code in GitHub):
public class DequeExample {
public static void main(String[] args) {
Deque<Integer> dequeue = new ArrayDeque<>();
dequeue.offerFirst(20); // -> 20
dequeue.offerFirst(21); // -> 21 20
dequeue.offerFirst(22); // -> 22 21 20
dequeue.offerLast(23); // -> 22 21 20 23
dequeue.offerLast(24); // -> 22 21 20 23 24
dequeue.offerLast(25); // -> 22 21 20 23 24 25
dequeue.offerFirst(26); // -> 26 22 21 20 23 24 25
dequeue.offerFirst(27); // -> 27 26 22 21 20 23 24 25
System.out.println("dequeue = " + dequeue);
System.out.println();
System.out.println("dequeue.offerLast(28) = " + dequeue.offerLast(28));
System.out.println("dequeue.offerFirst(29) = " + dequeue.offerFirst(29));
System.out.println("dequeue = " + dequeue);
while (!dequeue.isEmpty()) {
System.out.println();
System.out.println("dequeue.pollFirst() = " + dequeue.pollFirst());
System.out.println("dequeue.pollLast() = " + dequeue.pollLast());
System.out.println("dequeue = " + dequeue);
}
}
}
Das Programm füllt mit mehreren Aufrufen von offerFirst()
und offerLast()
das Deque, gibt es aus und entnimmt dann abwechselnd von beiden Seiten solange Elemente, bis das Deque leer ist.
Dies ist die Ausgabe des Programms:
dequeue = [27, 26, 22, 21, 20, 23, 24, 25] dequeue.offerLast(28) = true dequeue.offerFirst(29) = true dequeue = [29, 27, 26, 22, 21, 20, 23, 24, 25, 28] dequeue.pollFirst() = 29 dequeue.pollLast() = 28 dequeue = [27, 26, 22, 21, 20, 23, 24, 25] dequeue.pollFirst() = 27 dequeue.pollLast() = 25 dequeue = [26, 22, 21, 20, 23, 24] dequeue.pollFirst() = 26 dequeue.pollLast() = 24 dequeue = [22, 21, 20, 23] dequeue.pollFirst() = 22 dequeue.pollLast() = 23 dequeue = [21, 20] dequeue.pollFirst() = 21 dequeue.pollLast() = 20 dequeue = []
Wer genau hinschaut stellt fest, dass ArrayDeque.toString()
die Elemente in umgekehrter Reihenfolge ausgibt (nämlich vom Kopf zum Ende), als sie in der Grafik am Anfang des Artikels dargestellt wurden und wie sie im Queue-Beispiel von ConcurrentLinkedQueue.toString()
ausgegeben wurden (vom Ende zum Kopf).
Analog zur BlockingQueue
bietet java.util.concurrent.BlockingDeque
(→ Javadoc) zusätzlich blockierende Operationen, die beim Entnehmen eines Elements aus dem Deque warten, bis ein Element verfügbar ist, bzw. die beim Einfügen eines Elements warten, bis Platz verfügbar ist.
Wiederum aus Konsistenzgründen wurden die Operationen, die BlockingDeque
von BlockingQueue
erbt, mit neuem Namen erneut implementiert. Diese Fälle erkennst du wieder daran, dass in einem Tabellenfeld zwei Methodennamen angegeben sind.
Die nunmehr 30 Methoden habe ich in zwei Tabellen aufgeteilt.
Wirft Exception (geerbt von Deque) | Rückgabewert (geerbt von Deque) | Blockiert (neu in BlockingDeque) | Timeout (neu in BlockingDeque) | |
Element anhängen (enqueue) | addFirst(E e) | offerFirst(E e) | putFirst(E e) | offerFirst(E e, |
Element entnehmen (dequeue) | removeFirst() remove() | pollFirst() poll() | takeFirst() take() | pollFirst( |
Element ansehen (examine) | getFirst() | peekFirst() peek() | – | – |
Wirft Exception (geerbt von Deque) | Rückgabewert (geerbt von Deque) | Blockiert (neu in BlockingDeque) | Timeout (neu in BlockingDeque) | |
Element anhängen (enqueue) | addLast(E e) add(E e) | offerLast(E e) offer(E e) | putLast(E e) put(E e) | offerLast(E e, |
Element entnehmen (dequeue) | removeLast() | pollLast() | takeLast() | pollLast( |
Element ansehen (examine) | getLast() | peekLast() | – | – |
Das Code-Beispiel zum BlockingDeque
ist noch länger als die bisherigen. Daher verzichte ich darauf, es hier im Artikel abzudrucken. Du findest es (wie all die anderen Code-Beispiele dieses Artikels) in meinem GitHub-Repository.
Das Beispiel basiert auf dem vorangegangenen BlockingQueue
-Beispiel und fügt Elemente auf einer zufälligen Seiten hinzu bzw. entnimmt sie von einer zufälligen Seite des Deques.
Ebenso alt wie Java ist die seit der Version 1.0 vorhandene Klasse java.util.Stack
(→ Javadoc). Stack
erbt von java.util.Vector
und erweitert diesen um die Stack-Methoden push(E item)
, pop()
und peek()
.
Genau wie Vector
ist auch Stack
threadsicher: Alle Methoden sind synchronized
.
Hier ist ein kleines Beispiel zur Verwendung von Stack
:
public class StackExample {
public static void main(String[] args) {
Stack<Integer> stack = new Stack<>();
stack.push(1);
stack.push(2);
stack.push(3);
System.out.println("stack = " + stack);
System.out.println("stack.peek() = " + stack.peek());
System.out.println("stack.pop() = " + stack.pop());
System.out.println("stack.pop() = " + stack.pop());
System.out.println("stack.pop() = " + stack.pop());
System.out.println("stack.pop() = " + stack.pop());
}
}
Die Ausgabe dieses Programms lautet:
stack = [1, 2, 3] stack.peek() = 3 stack.pop() = 3 stack.pop() = 2 stack.pop() = 1 Exception in thread "main" java.util.EmptyStackException at java.base/java.util.Stack.peek(Stack.java:101) at java.base/java.util.Stack.pop(Stack.java:83) at eu.happycoders.cds.stack.StackExample.main(StackExample.java:16)
Sowohl pop()
als auch peek()
werfen eine EmptyStackException
, wenn der Stack keine Elemente enthält.
Die Java-Entwickler empfehlen Stack nicht mehr zu verwenden. Das Javadoc besagt: "A more complete and consistent set of LIFO stack operations is provided by the Deque interface and its implementations, which should be used in preference to this class."
Was genau bedeutet das? Meiner Meinung nach sollte Stack
aus den folgenden Gründen nicht verwendet werden:
synchronized
ist kein besonders performantes Mittel, um eine Datenstruktur threadsicher zu machen. Bessere Methoden sind ReentrantLock
s und CAS ("Compare-and-swap")-Operationen, wie sind in den Queue- und Deque-Implementierungen zu finden sind.Vector
stellt Stack
Operationen zur Verfügung, die in einem Stack nichts zu suchen haben, wie der Zugriff auf Elemente über deren Index sowie das Einfügen und Löschen von Elementen an beliebigen Positionen.Stack
implementiert kein Interface. Mit der Verwendung von Stack
legt man sich also auf eine bestimmte Implementierung fest.Stattdessen empfehlen die Java-Entwickler eine der Deque-Implementierungen wie bspw. ArrayDeque
zu verwenden. Fairerweise ist zu sagen, dass auch Deques Operationen anbieten, die ein Stack eigentlich nicht zur Verfügung stellen sollte, wie das Einfügen und Entfernen von Elementen am Boden des Stacks.
Eine saubere Stack-Implementierung besteht aus einem Stack-Interface und einer oder mehrerer Implementierungen – wobei nichts dagegen spricht, diese als Adapter um eine bestehende Deque-Implementierung zu realisieren.
Stack-Interface (→ Code in GitHub):
public interface Stack<E> extends Collection<E> {
void push(E item);
E pop();
E peek();
}
Nicht threadsichere Stack-Implementierung auf Basis von ArrayDeque
(→ Code in GitHub):
public class ArrayDequeStack<E> implements Stack<E> {
private final Deque<E> deque = new ArrayDeque<>();
@Override
public void push(E item) {
deque.addFirst(item);
}
@Override
public E pop() {
return deque.removeFirst();
}
@Override
public E peek() {
return deque.peekFirst();
}
// All other Collection methods are forwarded
// to their corresponding methods in deque:
// [...]
}
Bevor wir zu den Queue- und Deque-Implementierungen und ihren Eigenschaften kommen, hier noch ein kurzer Exkurs zu den zugrunde liegenden Datenstrukturen: Arrays und verkettete Listen ("linked lists").
Für gewöhnlich liefern Array-basierte Datenstrukturen in Java eine deutlich besseren Performance.
Zum einen ist der Speicherbedarf bei einem Array deutlich geringer*:
ArrayList
): 4 Bytes pro Eintrag.LinkedList
): 24 Bytes pro Eintrag – jeder Eintrag benötigt ein Node
-Objekt, und diese benötigen jeweils 12 Bytes für den Objekt-Header + 3 × 4 Bytes für die item
-, next
- und prev
-Referenzen.prev
-Referenzen, doch dafür werden pro Node
-Objekt 4 Füllbytes ("pad bytes") eingefügt, da Objekte immer ein Vielfaches von 8 Bytes an Speicher belegen müssen.Zum anderen haben Arrays den Vorteil, dass deren Elemente an aufeinanderfolgenden Speicheradressen liegen, so dass beim Zugriff auf den Hauptspeicher alle Array-Elemente, die auf derselben Memory Page liegen, gleichzeitig in den CPU-Cache geladen werden.
Die Knoten einer verketteten Listen hingegen können quer über den Heap verteilt sein, so dass beim Iterieren über eine verkettete Liste im worst case für jedes Element eine neue Memory Page in den CPU-Cache geladen werden muss.
Ein dritter Nachteil von verketteten Listen ist, dass diese für den Garbage Collector einen höheren Aufwand darstellen als Arrays, da der GC bei der "Reachability Analysis" allen Knoten der Liste folgen muss, um erreichbare Objekte zu markieren.
Grundsätzlich sollte man daher Array-basierte Datenstrukturen vorziehen. Abhängig von der Art der Verwendung und der Implementierung der Threadsicherheit können jedoch auch auf verketteten Listen basierte Queues/Deques schneller sein. Ich werde darauf bei der Beschreibung der einzelnen Queue/Deque-Implementierungen näher eingehen.
(* Ich gehe bei den Speicherbetrachtungen von einer 64-Bit-JVM mit aktivierten Compressed Oops aus.)
In den folgenden Abschnitten findest du Beschreibungen und die wichtigsten Eigenschaften aller im JDK verfügbaren Queue- und Deque-Implementierungen – zunächst die nicht blockierenden ("non-blocking") Varianten, gefolgt von den blockierenden ("blocking").
Zu jeder Implementierung empfehle ich sinnvolle Einsatzzwecke.
Die Klasse java.util.ArrayDeque
(→ JavaDoc) basiert – wie der Name schon sagt – auf einem Array. Auf dessen Basis wird ein Ringpuffer ("circular buffer") implementiert. Wie der genau funktioniert, kannst du in diesem Wikipedia-Artikel nachlesen.
Der Ringbuffer hat den Vorteil, das weder beim Einfügen noch beim Entnehmen von Elementen auf beiden Seiten des Deques Elemente innerhalb des Arrays verschoben werden müssen. Wir bekommen hier also die Flexibilität einer verketteten Liste ohne deren Performance-Nachteile.
Das Array wächst bei Bedarf, wird jedoch weder automatisch verkleinert, noch kann es manuell verkleinert werden.
Die weiteren Eigenschaften von ArrayDeque
:
ConcurrentModificationException
) noch weakly consistent (d. h. es werden keinerlei Garantien bzgl. der Konsistenz bei nebenläufiger Änderung gemacht).size()
: O(1)Das ArrayDeque
ist für single-threaded Anwendungen (und nur dafür) eine gute Wahl. Die Tatsache, dass das zugrunde liegende Array nicht verkleinert werden kann, sollte im Auge behalten werden.
Ein ArrayDeque
-Code-Beispiel findest du weiter oben im Abschnitt "Java Deque Beispiel".
Die Klasse java.util.LinkedList
(→ Javadoc) implementiert eine klassische doppelt verkettete Liste. Sie existiert im JDK seit Version 1.2, also deutlich länger als das Deque
-Interface. Die Deque-Methoden wurden mit Einführung dieses Interfaces im JDK 6.0 hinzugefügt.
Die weiteren Eigenschaften von LinkedList
sind:
ConcurrentModificationException
.size()
: O(1) – die Größe wird in einem int
-Feld der LinkedList
gespeichert, d. h. zur Bestimmung der Größe muss nicht über die komplette Liste iteriert werden (was eine Zeitkomplexität von O(n) zur Folge hätte).Ich rate grundsätzlich von der Verwendung der LinkedList
ab. Wenn du eine Liste benötigst, ist ArrayList
aus den o. g. Performance-Gründen die bessere Wahl; wenn du eine nicht threadsichere Queue oder ein nicht threadsicheres Deque benötigst, dann verwende ein ArrayDeque
.
Um mit der LinkedList
zu experimentieren, kannst du das oben vorgestellte "Java Deque Beispiel" kopieren und die Zeile
Deque dequeue = new ArrayDeque<>();
durch
Deque dequeue = new LinkedList<>();
ersetzen.
Kommen wir zu den ersten threadsicheren Queue- und Deque-Implementierungen.
Die Klasse java.util.concurrent.ConcurrentLinkedQueue
(→ Javadoc) basiert auf einer einfach verketteten Liste; java.util.concurrent.ConcurrentLinkedDeque
(→ Javadoc) auf einer doppelt verketteten Liste.
Beide Implementierungen garantieren Threadsicherheit durch nicht-blockierende CAS ("Compare-and-swap")-Operationen auf VarHandles, wodurch ein deutlicher Performance-Gewinn gegenüber der Verwendung von synchronized
oder Locks wie ReentrantLock
erzielt wird.
Die weiteren Eigenschaften von ConcurrentLinkedQueue
und ConcurrentLinkedDeque
:
size()
: O(n), da für die Bestimmung der Größe über alle Elemente der Queue iteriert wird. Wenn während der Bestimmung der Größe Queue-Elemente hinzugefügt oder entnommen werden, kann das Ergebnis ungenau sein.ConcurrentLinkedQueue
und ConcurrentLinkedDeque
sind eine gute Wahl, wenn eine nicht blockierende, unbounded und threadsichere Queue (bzw ein entsprechendes Deque) benötigt wird.
Dies gilt trotz meiner allgemeinen Empfehlung Array-basierte Datenstrukturen vorzuziehen. Das ArrayDeque
ist nicht thread-safe. Und die im nächsten Kapitel beschriebene ArrayBlockingQueue
ist zum einen bounded und zum anderen wird die Threadsicherheit über ein einzelnes ReentrantLock
implementiert, was wesentlich unperformanter ist als Compare-and-swap auf VarHandles.
Die ConcurrentLinkedQueue
haben wir bereits im ersten Code-Beispiel im Abschnitt "Java Queue Beispiel" eingesetzt.
Das folgende Beispiel demonstriert die Threadsicherheit von ConcurrentLinkedDeque
. Vier schreibende und drei lesende Threads fügen nebenläufig Elemente auf zufälligen Seiten des Deques hinzu und entfernen sie (→ Code in GitHub):
public class ConcurrentLinkedDequeExample {
private static final Deque<Integer> deque = new ConcurrentLinkedDeque<>();
public static void main(String[] args) {
// Start 4 writing threads
for (int i = 0; i < 4; i++)
new ModifyingThread(true).start();
// Start 3 reading threads
for (int i = 0; i < 3; i++)
new ModifyingThread(false).start();
}
private static final class ModifyingThread extends Thread {
private final boolean write;
private ModifyingThread(boolean write) {
this.write = write;
}
@Override
public void run() {
for (; ; ) {
ThreadLocalRandom random = ThreadLocalRandom.current();
try {
Thread.sleep(random.nextInt(500, 2000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean first = random.nextBoolean();
if (write) {
Integer e = random.nextInt(1000);
if (first) {
deque.offerFirst(e);
System.out.printf("deque.offerFirst(%3d) --> deque = %s%n",
e, deque);
} else {
deque.offerLast(e);
System.out.printf("deque.offerLast(%3d) --> deque = %s%n",
e, deque);
}
} else {
if (first)
System.out.printf(" deque.pollFirst() = %4d --> deque = %s%n",
deque.pollFirst(), deque);
else
System.out.printf(" deque.pollLast() = %4d --> deque = %s%n",
deque.pollLast(), deque);
}
}
}
}
}
Eine beispielhafte Ausgabe:
deque.offerFirst(388) --> deque = [388] deque.offerLast(900) --> deque = [388, 900] deque.pollFirst() = 388 --> deque = [900] deque.offerLast(906) --> deque = [900, 906] deque.pollLast() = 906 --> deque = [900] deque.pollFirst() = 900 --> deque = [] deque.offerLast(727) --> deque = [727] deque.offerFirst(720) --> deque = [720, 727] deque.pollLast() = 727 --> deque = [720] deque.offerFirst(415) --> deque = [415, 720] deque.offerLast(614) --> deque = [415, 720, 614] deque.offerFirst(130) --> deque = [130, 415, 720, 614]
Bei der Klasse java.util.PriorityQueue
(→ Javadoc) handelt es sich nicht um eine Queue im klassischen Sinne, da die Elemente nicht in FIFO-Reihenfolge entnommen werden, sondern entsprechend ihrer natürlichen Ordnung bzw. entsprechend eines dem Constructor übergebenen Comparator
s.
Am Kopf der Queue befindet sich das jeweils kleinste Element. Die Sortierung ist dabei nicht stabil, d. h. zwei Elemente, die entsprechend der Sortierreihenfolge an gleicher Position stehen, werden nicht zwangsläufig in derselben Reihenfolge entnommen, wie sie in die Queue eingefügt wurden.
Der weiteren Eigenschaften von PriorityQueue
sind:
ConcurrentModificationException
.size()
: O(1)Die PriorityQueue
kann genau dann eingesetzt werden, wenn eine nicht threadsichere Queue mit oben beschriebener Dequeue-Reihenfolge benötigt wird.
Allerdings ist zu beachten, dass die PriorityQueue
im JDK nur an sehr wenigen Stellen eingesetzt wird und somit eine gewisse Wahrscheinlichkeit für das Vorhandensein von Bugs existiert (was wenig genutzt wird, wird auch wenig getestet).
Hier ist ein kleines Beispiel, in dem zehn Zufallszahlen in eine PriorityQueue
geschrieben und dann wieder entnommen werden (→ Code in GitHub):
public class PriorityQueueExample {
public static void main(String[] args) {
Queue<Integer> queue = new PriorityQueue<>();
// Enqueue random numbers
for (int i = 0; i < 10; i++) {
Integer e = ThreadLocalRandom.current().nextInt(100);
queue.offer(e);
System.out.printf("queue.offer(%2d) --> queue = %s%n", e, queue);
}
// Dequeue all elements
while (!queue.isEmpty()) {
System.out.printf("queue.poll() = %2d --> queue = %s%n",
queue.poll(), queue);
}
}
}
Es folgt eine beispielhafte Ausgabe des Programms, in der gut zu sehen ist, wie die Elemente in der Queue in keiner bestimmten Reihenfolge angezeigt werden (lediglich das kleinste Element ist immer vorne) und wie sie letztendlich in aufsteigender Reihenfolge entnommen werden:
queue.offer(44) --> queue = [44] queue.offer(18) --> queue = [18, 44] queue.offer(58) --> queue = [18, 44, 58] queue.offer(98) --> queue = [18, 44, 58, 98] queue.offer(45) --> queue = [18, 44, 58, 98, 45] queue.offer(83) --> queue = [18, 44, 58, 98, 45, 83] queue.offer(31) --> queue = [18, 44, 31, 98, 45, 83, 58] queue.offer(21) --> queue = [18, 21, 31, 44, 45, 83, 58, 98] queue.offer(22) --> queue = [18, 21, 31, 22, 45, 83, 58, 98, 44] queue.offer(19) --> queue = [18, 19, 31, 22, 21, 83, 58, 98, 44, 45] queue.poll() = 18 --> queue = [19, 21, 31, 22, 45, 83, 58, 98, 44] queue.poll() = 19 --> queue = [21, 22, 31, 44, 45, 83, 58, 98] queue.poll() = 21 --> queue = [22, 44, 31, 98, 45, 83, 58] queue.poll() = 22 --> queue = [31, 44, 58, 98, 45, 83] queue.poll() = 31 --> queue = [44, 45, 58, 98, 83] queue.poll() = 44 --> queue = [45, 83, 58, 98] queue.poll() = 45 --> queue = [58, 83, 98] queue.poll() = 58 --> queue = [83, 98] queue.poll() = 83 --> queue = [98] queue.poll() = 98 --> queue = []
In den folgenden Abschnitten beschreibe ich alle blockierenden Queue- und Deque-Implementierungen des JDK, d. h. diejenigen Klassen, die die Interfaces BlockingQueue
bzw. BlockingDeque
implementieren.
Die Klasse java.util.concurrent.ArrayBlockingQueue
(→ Javadoc) basiert – wie das ArrayDeque
– auf einem Array, ist allerdings
Die Threadsicherheit wird durch ein einzelnes ReentrantLock
gewährleistet, so dass es bei gleichzeitigen Schreib- und Lesezugriffen zu einer hohen Anzahl an Zugangskonflikten ("thread contention") kommen kann.
Die weiteren Eigenschaften der ArrayBlockingQueue
sind:
size()
: O(1)Aufgrund der möglicherweise hohen Contention bei gleichzeitigem Schreib- und Lesezugriff solltest du – wenn du eine blockierende, threadsichere Queue benötigst – für deinen speziellen Einsatzweck testen, ob evtl. eine LinkedBlockingQueue
performanter ist. Diese basiert zwar auf einer verketteten Liste, verwendet allerdings zum Schreiben und Lesen zwei separate ReentrantLock
s, was die Zugangskonflikte reduziert.
Um die ArrayBlockingQueue
auszuprobieren, kannst du das "Java BlockingQueue Beispiel" von oben kopieren und
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
durch
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
ersetzen.
Die Klassen java.util.concurrent.LinkedBlockingQueue
(→ Javadoc) und java.util.concurrent.LinkedBlockingDeque
(→ Javadoc) basieren – genau wie ConcurrentLinkedQueue
und ConcurrentLinkedDeque
– auf verketteten Listen, sind allerdings – ebenso wie ArrayBlockingQueue
–
Die Threadsicherheit der ConcurrentLinkedQueue
wird im Gegensatz zur ArrayBlockingQueue
nicht mit einem, sondern mit zwei separaten ReentrantLock
s gewährleistet – eines für Schreib- und eines für Lesezugriff. Dies reduziert die Zugangskonflikte zwischen Producer- und Consumer-Threads.
Die weiteren Eigenschaften von LinkedBlockingQueue
und LinkedBlockingDeque
sind:
size()
: O(1) – die Implementierungen verwenden intern ein AtomicInteger
, um die Größe verfügbar zu halten (im Gegensatz zu ConcurrentLinkedQueue
und ConcurrentLinkedDeque
, die bei der Abfrage der Größe über alle Elemente iterieren müssen).Ich empfehle LinkedBlockingQueue
und LinkedBlockingDeque
, wenn du blockierende, threadsichere Queues/Deques brauchst.
Die Klasse LinkedBlockingQueue
wird von Executors.newFixedThreadPool()
und Executors.newSingleThreadedExecutor()
als "work queue" für den Executor verwendet. Sie wird somit intensiv genutzt, was die Wahrscheinlichkeit für Bugs äußerst gering hält.
Code-Beispiele für LinkedBlockingQueue
und LinkedBlockingDeque
findest du in den Abschnitten "Java BlockingQueue Beispiel" und "Java BlockingDeque Beispiel" weiter oben.
Bei der java.util.concurrent.PriorityBlockingQueue
(→ Javadoc) handelt es sich um eine threadsichere und blockierende Variante der PriorityQueue
.
Threadsicherheit wird durch ein einzelnes ReentrantLock
sichergestellt.
Im Gegensatz zu allen bisher vorgestellten blockierenden Implementierungen ist die PriorityBlockingQueue
nicht bounded, sie hat also keine Kapazitätsgrenze. Das bedeutet, dass die Methode add(e)
keine Exception wirft, offer(e)
niemals false
zurückliefert und put(e)
und offer(e, time, unit)
niemals blockieren. Nur die Dequeue-Operationen blockieren bzw. liefern null
zurück, wenn die Queue leer ist.
Die weiteren Eigenschaften der PriorityBlockingQueue
:
PriorityQueue
werden die Elemente in einem Array gespeichert, das einen Min-Heap repräsentiert; der Iterator durchläuft die Elemente in entsprechender Reihenfolge.size()
: O(1)Die PriorityBlockingQueue
wird im JDK nicht genutzt. Es ist also eine gewisse Wahrscheinlichkeit vorhanden, dass sie Bugs enthält. Wenn du eine Queue mit entsprechenden Eigenschaften benötigst und die PriorityBlockingQueue
verwendest, solltest du intensiv testen.
Als Beispiel für die PriorityBlockingQueue
habe ich den Code aus dem Abschnitt "Java BlockingQueue Beispiel" leicht abgewandelt: Statt einer LinkedBlockingQueue
wird die PriorityBlockingQueue
verwendet; und statt der Zahlen 0 bis 9 werden Zufallszahlen in die Queue geschrieben.
Das Beispiel drucke ich hier aufgrund der minimalen Code-Änderungen nicht ab. Du findest es hier in meinem GitHub-Repository. Wenn du es laufen lässt, wirst du sehen, dass bei der PriorityBlockingQueue
– genau wie bei der PriorityQueue
– immer das kleinste Element am Kopf der Queue steht und als nächstes entnommen wird.
Zum Ende kommen wir zu ein paar sehr speziellen Queue-Implementierungen des JDK.
Die Klasse java.util.concurrent.DelayQueue
(→ Javadoc) ist – wie die PriorityQueue
, die sie intern verwendet – keine FIFO-Queue. Stattdessen können Elemente erst dann entnommen werden, wenn eine dem Element zugeordnete Wartezeit ("delay") abgelaufen ist.
Dazu müssen die Elemente das Interface java.util.concurrent.Delayed
(→ Javadoc) und dessen getDelay()
-Methode implementieren. Diese liefert bei jeden Aufruf die restliche Wartezeit zurück, die noch ablaufen muss, bis das Element aus der Queue entnommen werden darf.
Genau wie die PriorityBlockingQueue
ist auch die DelayQueue
blocking aber unbounded, kann also beliebig viele Elemente aufnehmen und blockiert nur bei der Entnahme.
Threadsicherheit wird durch ein einzelnes ReentrantLock
sichergestellt.
Die weiteren Eigenschaften der DelayQueue
:
size()
: O(1) wie bei der zugrunde liegenden PriorityQueue
.Ich habe die DelayQueue
noch nie benötigt und kann sie für keinen mir bekannten, sinnvollen Einsatzzweck empfehlen. Sie wird im JDK nur ein einziges Mal genutzt (in einer alten Swing-Klasse, die mit einem ScheduledExecutorService
wahrscheinlich eleganter hätte implementiert werden können), von daher ist nicht ausgeschlossen, dass sie Fehler enthält.
In folgendem Beispiel (→ Code in GitHub) wird eine DelayQueue
mit Objekten der Klasse DelayedElement
gefüllt, welche eine Zufallszahl enthalten sowie ein zufälliges initiales Delay zwischen 100 und 1.000 ms. Danach wird so oft poll()
aufgerufen, bis die Queue wieder leer ist.
public class DelayQueueExample {
public static void main(String[] args) {
BlockingQueue<DelayedElement<Integer>> queue = new DelayQueue<>();
ThreadLocalRandom tlr = ThreadLocalRandom.current();
long startTime = System.currentTimeMillis();
// Enqueue random numbers with random initial delays
for (int i = 0; i < 7; i++) {
DelayedElement<Integer> e = new DelayedElement<>(
tlr.nextInt(10, 100),
tlr.nextInt(100, 1000));
queue.offer(e);
System.out.printf("[%3dms] queue.offer(%s) --> queue = %s%n",
System.currentTimeMillis() - startTime, e, queue);
}
// Dequeue all elements
while (!queue.isEmpty()) {
try {
DelayedElement<Integer> e = queue.take();
System.out.printf("[%3dms] queue.poll() = %s --> queue = %s%n",
System.currentTimeMillis() - startTime, e, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Und hier die zugehörige Klasse DelayedElement
(→ Code in GitHub). Die Sortierung ist – um den Code nicht noch länger zu machen – nicht stabil, d. h. wenn zwei Elemente mit derselben Wartezeit in die Queue gestellt werden, werden sie in zufälliger Reihenfolge wieder entnommen.
public class DelayedElement<E extends Comparable<E>> implements Delayed {
private final E e;
private final long initialDelayMillis;
private final long expiration;
public DelayedElement(E e, long initialDelayMillis) {
this.e = e;
this.initialDelayMillis = initialDelayMillis;
this.expiration = System.currentTimeMillis() + initialDelayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long remainingDelayMillis = expiration - System.currentTimeMillis();
return unit.convert(remainingDelayMillis, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
DelayedElement<?> other = (DelayedElement<?>) o;
return Long.compare(expiration, other.expiration);
}
@Override
public String toString() {
return "{" + e + ", " + initialDelayMillis + "ms}";
}
}
Hier eine beispielhafte Ausgabe des Programms. Es ist gut zu sehen, wie die Queue zwar nicht sortiert ist, das Element mit der kürzesten Wartezeit sich jedoch immer vorne (links) befindet und dass die Elemente nach Ablauf ihrer jeweiligen Wartezeit entnommen werden:
[ 1ms] queue.offer({19, 713ms}) --> queue = [{19, 713ms}] [ 28ms] queue.offer({15, 643ms}) --> queue = [{15, 643ms}, {19, 713ms}] [ 29ms] queue.offer({35, 253ms}) --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}] [ 29ms] queue.offer({11, 781ms}) --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}] [ 29ms] queue.offer({17, 844ms}) --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}, {17, 844ms}] [ 29ms] queue.offer({40, 490ms}) --> queue = [{35, 253ms}, {19, 713ms}, {40, 490ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}] [ 30ms] queue.offer({39, 119ms}) --> queue = [{39, 119ms}, {19, 713ms}, {35, 253ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}, {40, 490ms}] [151ms] queue.poll() = {39, 119ms} --> queue = [{35, 253ms}, {19, 713ms}, {40, 490ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}] [283ms] queue.poll() = {35, 253ms} --> queue = [{40, 490ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}, {17, 844ms}] [520ms] queue.poll() = {40, 490ms} --> queue = [{15, 643ms}, {19, 713ms}, {17, 844ms}, {11, 781ms}] [673ms] queue.poll() = {15, 643ms} --> queue = [{19, 713ms}, {11, 781ms}, {17, 844ms}] [716ms] queue.poll() = {19, 713ms} --> queue = [{11, 781ms}, {17, 844ms}] [811ms] queue.poll() = {11, 781ms} --> queue = [{17, 844ms}] [874ms] queue.poll() = {17, 844ms} --> queue = []
Eine weitere Spezialqueue ist die java.util.concurrent.LinkedTransferQueue
(→ Javadoc). Es handelt sich – wie die PriorityBlockingQueue
– um eine unbounded blocking Queue, d. h. nur die Dequeue-Operationen können zu Fehlern führen oder blockieren.
LinkedTransferQueue
basiert auf einer verketteten Liste, und Threadsicherheit wird genau wie bei ConcurrentLinkedQueue
und ConcurrentLinkedDeque
durch nicht-blockierende Compare-and-swap-Operationen erreicht.
Das besondere an dieser Queue ist, dass sie zusätzliche transfer()
- und tryTransfer()
-Methoden bereitstellt, die solange blockieren, bis das per transfer()
/tryTransfer()
angehängte Element wieder aus der Queue entnommen wurde (im Gegensatz zu: bis Platz in der Queue frei wurde). Für die Details dieser Methoden verweise ich auf das oben verlinkte Javadoc.
Die weiteren Eigenschaften der LinkedTransferQueue
:
size()
: O(n) – es wird über die komplette Queue iteriert, um die Größe zu bestimmen.Die LinkedTransferQueue
wird im JDK nicht verwendet. Ursprünglich wurde sie für das im JDK 7 eingeführte Fork/Join-Framework implementiert, dann aber noch nicht dafür genutzt. Die Wahrscheinlichkeit von Bugs ist daher recht hoch, so dass auf den Einsatz dieser Klasse verzichtet werden sollte.
Im folgenden Beispiel (→ Code in GitHub) werden fünf Threads gestartet, die LinkedTransferQueue.transfer()
aufrufen. Danach werden solange Elemente aus der Queue entnommen, bis diese wieder leer ist.
public class LinkedTransferQueueExample {
public static void main(String[] args) {
LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
// Start 5 threads calling queue.transfer()
for (int i = 0; i < 5; i++) {
final int finalI = i;
new Thread(() -> {
log("Calling queue.transfer(%d) (queue = %s)...", finalI, queue);
try {
queue.transfer(finalI);
log("queue.transfer(%d) returned (queue = %s)", finalI, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log("queue = " + queue);
// Now take all elements until the queue is empty
while (!queue.isEmpty()) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer e = queue.take();
log(" queue.take() returned %d (queue = %s)", e, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static void log(String format, Object... args) {
System.out.printf(Locale.US, "[%-8s] %s%n",
Thread.currentThread().getName(),
String.format(format, args));
}
}
In der Ausgabe des Beispiels ist gut zu erkennen, wie die queue.transfer()
-Aufrufe zunächst blockieren und immer dann ein Aufruf zurückkehrt, wenn queue.take()
den von transfer()
übergebenen Wert entnommen hat:
[Thread-0] Calling queue.transfer(0) (queue = [])... [Thread-1] Calling queue.transfer(1) (queue = [0])... [Thread-2] Calling queue.transfer(2) (queue = [0, 1])... [Thread-3] Calling queue.transfer(3) (queue = [0, 1, 2])... [Thread-4] Calling queue.transfer(4) (queue = [0, 1, 2, 3])... [main ] queue = [0, 1, 2, 3, 4] [main ] Calling queue.take() (queue = [0, 1, 2, 3, 4])... [main ] queue.take() returned 0 (queue = [1, 2, 3, 4]) [Thread-0] queue.transfer(0) returned (queue = [1, 2, 3, 4]) [main ] Calling queue.take() (queue = [1, 2, 3, 4])... [main ] queue.take() returned 1 (queue = [2, 3, 4]) [Thread-1] queue.transfer(1) returned (queue = [2, 3, 4]) [main ] Calling queue.take() (queue = [2, 3, 4])... [main ] queue.take() returned 2 (queue = [3, 4]) [Thread-2] queue.transfer(2) returned (queue = [3, 4]) [main ] Calling queue.take() (queue = [3, 4])... [main ] queue.take() returned 3 (queue = [4]) [Thread-3] queue.transfer(3) returned (queue = [4]) [main ] Calling queue.take() (queue = [4])... [main ] queue.take() returned 4 (queue = []) [Thread-4] queue.transfer(4) returned (queue = [])
Kommen wir zur letzten blockierenden Spezialqueue, der java.util.concurrent.SynchronousQueue
(→ Javadoc). “Synchronous” ist dabei nicht mit "synchronized" zu verwechseln. Es bedeutet vielmehr, dass jede Enqueue-Operation auf eine korrespondierende Dequeue-Operation warten muss und jede Dequeue-Operation auf eine Enqueue-Operation.
Eine SynchronousQueue
enthält niemals Elemente, auch dann nicht, wenn Enqueue-Operationen gerade auf Dequeue-Operation warten. Analog dazu ist die Größe einer SynchronousQueue
immer 0, und peek()
liefert immer null
zurück.
Die SynchronousQueue
ist neben der ArrayBlockingQueue
die zweite Queue-Implementierung, die eine Fairness Policy anbietet. Hierbei gibt es eine Besonderheit: Wenn die Fairness Policy nicht aktiviert ist, werden blockierende Aufrufe laut Dokumentation in unspezifizierter Reihenfolge bedient. Tatsächlich ist es jedoch so, dass diese exakt in umgekehrter Reihenfolge bedient werden (also in LIFO-Reihenfolge), da intern ein Stack verwendet wird.
Die weiteren Eigenschaften der SynchronousQueue
:
size()
: O(1) – size()
liefert immer 0 zurück.Genau wie die DelayQueue
und die LinkedTransferQueue
habe ich auch die SynchronousQueue
in eigenen Projekten noch nie direkt eingesetzt.
Sollten ihre Eigenschaften zu deinen Anforderungen passen, kannst du sie bedenkenlos verwenden. Im JDK wird die SynchronousQueue
von Executors.newCachedThreadPool()
als „work queue“ für den Executor eingesetzt; die Wahrscheinlichkeit von Bugs ist also äußerst gering.
Im folgende Beispiel (→ Code in GitHub) werden zunächst drei Threads gestartet, die queue.put()
aufrufen, danach sechs Threads, die queue.take()
aufrufen und anschließend noch einmal drei Threads, die queue.put()
ausführen:
public class SynchronousQueueExample {
private static boolean FAIR = false;
public static void main(String[] args) {
BlockingQueue<Integer> queue = new SynchronousQueue<>(FAIR);
// Start 3 producing threads
for (int i = 0; i < 3; i++) {
final int finalI = i;
new Thread(() -> enqueue(queue, finalI)).start();
sleep(250);
}
// Start 6 consuming threads
for (int i = 0; i < 6; i++) {
new Thread(() -> dequeue(queue)).start();
sleep(250);
}
// Start 3 more producing threads
for (int i = 3; i < 6; i++) {
final int finalI = i;
new Thread(() -> enqueue(queue, finalI)).start();
sleep(250);
}
}
private static void enqueue(BlockingQueue<Integer> queue, int finalI) {
log("Calling queue.put(%d) (queue = %s)...", finalI, queue);
try {
queue.put(finalI);
log("queue.put(%d) returned (queue = %s)", finalI, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer e = queue.take();
log(" queue.take() returned %d (queue = %s)", e, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(Locale.US, "[%-9s] %s%n",
Thread.currentThread().getName(),
String.format(format, args));
}
}
In der Ausgabe kannst du sehen, wie die ersten drei Aufrufe von queue.put()
solange blockieren, bis die eingefügten Elemente mit queue.take()
in umgekehrter Reihenfolge entnommen werden.
Danach blockieren die drei folgenden Aufrufe von queue.take()
solange bis mit queue.put()
drei weitere Elemente in die Queue geschrieben wurden.
Die Queue bleibt während der gesamten Zeit leer.
[Thread-0 ] Calling queue.put(0) (queue = [])... [Thread-1 ] Calling queue.put(1) (queue = [])... [Thread-2 ] Calling queue.put(2) (queue = [])... [Thread-3 ] Calling queue.take() (queue = [])... [Thread-3 ] queue.take() returned 2 (queue = []) [Thread-2 ] queue.put(2) returned (queue = []) [Thread-4 ] Calling queue.take() (queue = [])... [Thread-4 ] queue.take() returned 1 (queue = []) [Thread-1 ] queue.put(1) returned (queue = []) [Thread-5 ] Calling queue.take() (queue = [])... [Thread-5 ] queue.take() returned 0 (queue = []) [Thread-0 ] queue.put(0) returned (queue = []) [Thread-6 ] Calling queue.take() (queue = [])... [Thread-7 ] Calling queue.take() (queue = [])... [Thread-8 ] Calling queue.take() (queue = [])... [Thread-9 ] Calling queue.put(3) (queue = [])... [Thread-9 ] queue.put(3) returned (queue = []) [Thread-8 ] queue.take() returned 3 (queue = []) [Thread-10] Calling queue.put(4) (queue = [])... [Thread-10] queue.put(4) returned (queue = []) [Thread-7 ] queue.take() returned 4 (queue = []) [Thread-11] Calling queue.put(5) (queue = [])... [Thread-11] queue.put(5) returned (queue = []) [Thread-6 ] queue.take() returned 5 (queue = [])
Wenn du die Konstante FAIR
auf true
setzt, wirst du sehen, wie die Elemente nicht in LIFO-, sondern in FIFO-Reihenfolge entnommen werden.
Alle vom JDK angebotenen threadsicheren Queue- und Deque-Implementierungen können bedenkenlos in multi-producer-multi-consumer-Umgebungen eingesetzt werden. Das bedeutet, dass ein oder mehrere schreibende Threads sowie ein oder mehrere lesende Threads nebenläufig auf die JDK-Queues und -Deques zugreifen können.
Mit speziellen Mechanismen ist es möglich Queues und Deques so zu optimieren, dass der Overhead für das Gewährleisten der Threadsicherheit minimiert wird, wenn es eine Beschränkung auf einen lesenden und/oder einen schreibenden Thread gibt.
Dementsprechend unterscheidet man folgende vier Fälle:
Die Open-Source-Bibliothek JCTools bietet für alle vier Fälle hoch-optimierte Queue-Implementierungen.
In diesem Ultimate Guide habe ich die grundlegenden Funktionen von Queues, Deques und Stacks erklärt, die entsprechenden Interfaces und Implementierungen im JDK anhand zahlreicher Code-Beispiele im Detail vorgestellt und Empfehlungen ausgesprochen, in welchen Fällen du zu welchen Implementierungen greifen solltest.
Für den tagtäglichen Gebrauch sind das:
ArrayDeque
für single-threaded Anwendungen.ConcurrentLinkedQueue
und ConcurrentLinkedDeque
als threadsichere, nicht blockierende und unbounded Queues/Deques.ArrayBlockingQueue
als threadsichere, blockierende, bounded Queue, sofern du wenig Contention zwischen Producer- und Consumer-Threads erwartest.LinkedBlockingQueue
als threadsichere, blockierende, bounded Queues, wenn du eher hohe Contention zwischen Producer- und Consumer-Threads erwartest (am besten testen, welche Implementierung für deinen Use Case performanter ist).LinkedBlockingDeque
als threadsicheres, blockierendes, bounded Deque.Wenn du den Artikel hilfreich findest, teile ihn gerne über einen der Share-Buttons etwas weiter unten. Wenn du informiert werden möchtest, wenn ich neue Artikel veröffentliche, dann trage dich gerne über das folgende Formular in meinen Mail-Verteiler ein.