In diesem Artikel geht es um eine sehr spezielle Queue – die SynchronousQueue
, deren Eigenschaften und Einsatzgebiete. Anhand eines Beispiels siehst du, wie man die SynchronousQueue
einsetzt.
Hier befinden wir uns in der Klassenhierarchie:
SynchronousQueue Eigenschaften
Das “Synchronous” in der Klasse java.util.concurrent.SynchronousQueue
ist nicht mit "synchronized" zu verwechseln. Es bedeutet vielmehr, dass jede Enqueue-Operation auf eine korrespondierende Dequeue-Operation warten muss und jede Dequeue-Operation auf eine Enqueue-Operation.
Eine SynchronousQueue
enthält niemals Elemente, auch dann nicht, wenn Enqueue-Operationen gerade auf Dequeue-Operationen warten. Analog dazu ist die Größe einer SynchronousQueue
immer 0, und peek()
liefert immer null
zurück.
Die SynchronousQueue
und die ArrayBlockingQueue sind die einzigen Queue-Implementierungen, die eine Fairness Policy anbieten. Hierbei gibt es eine Besonderheit: Wenn die Fairness Policy nicht aktiviert ist, werden blockierende Aufrufe laut Dokumentation in unspezifizierter Reihenfolge bedient. Tatsächlich ist es jedoch so, dass diese exakt in umgekehrter Reihenfolge bedient werden (also in LIFO-Reihenfolge), da intern ein Stack verwendet wird.
Die Eigenschaften der SynchronousQueue
im Detail:
Unterliegende Datenstruktur | Thread-safe? | Blocking/ Non-blocking | Fairness Policy | Bounded/ Unbounded | Iterator Type |
---|---|---|---|---|---|
Stack (implementiert mit verketteter Liste) | Ja (optimistisches Locking durch Compare-and-set) | Blocking | Optional | Unbounded | Der Iterator ist immer leer. |
Einsatzempfehlung
Genau wie die DelayQueue und die LinkedTransferQueue habe ich auch die SynchronousQueue
in eigenen Projekten noch nie direkt eingesetzt.
Sollten ihre Eigenschaften zu deinen Anforderungen passen, kannst du sie bedenkenlos verwenden. Im JDK wird die SynchronousQueue
von Executors.newCachedThreadPool()
als „work queue“ für den Executor eingesetzt; die Wahrscheinlichkeit von Bugs ist also äußerst gering.
SynchronousQueue Beispiel
Im folgende Beispiel (→ Code auf GitHub) werden zunächst drei Threads gestartet, die SynchronousQueue.put()
aufrufen, danach sechs Threads, die SynchronousQueue.take()
aufrufen und anschließend noch einmal drei Threads, die SynchronousQueue.put()
ausführen:
public class SynchronousQueueExample {
private static final boolean FAIR = false;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new SynchronousQueue<>(FAIR);
// Start 3 producing threads
for (int i = 0; i < 3; i++) {
int element = i; // Assign to an effectively final variable
new Thread(() -> enqueue(queue, element)).start();
Thread.sleep(250);
}
// Start 6 consuming threads
for (int i = 0; i < 6; i++) {
new Thread(() -> dequeue(queue)).start();
Thread.sleep(250);
}
// Start 3 more producing threads
for (int i = 3; i < 6; i++) {
int element = i; // Assign to an effectively final variable
new Thread(() -> enqueue(queue, element)).start();
Thread.sleep(250);
}
}
private static void enqueue(BlockingQueue<Integer> queue, int element) {
log("Calling queue.put(%d) (queue = %s)...", element, queue);
try {
queue.put(element);
log("queue.put(%d) returned (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer element = queue.take();
log(" queue.take() returned %d (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US,
"[%-9s] %s%n",
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code-Sprache: Java (java)
In der Ausgabe kannst du sehen, wie die ersten drei Aufrufe von put()
durch die Threads 0, 1 und 2 solange blockieren, bis die eingefügten Elemente mit take()
durch die Threads 3, 4 und 5 in umgekehrter Reihenfolge entnommen werden.
Danach blockieren die drei folgenden Aufrufe von take()
(Threads 6, 7, 8) solange bis mit put()
drei weitere Elemente in die Queue geschrieben wurden (Threads 9, 10, 11).
Die Queue bleibt während der gesamten Zeit leer.
[Thread-0 ] Calling queue.put(0) (queue = [])...
[Thread-1 ] Calling queue.put(1) (queue = [])...
[Thread-2 ] Calling queue.put(2) (queue = [])...
[Thread-3 ] Calling queue.take() (queue = [])...
[Thread-3 ] queue.take() returned 2 (queue = [])
[Thread-2 ] queue.put(2) returned (queue = [])
[Thread-4 ] Calling queue.take() (queue = [])...
[Thread-4 ] queue.take() returned 1 (queue = [])
[Thread-1 ] queue.put(1) returned (queue = [])
[Thread-5 ] Calling queue.take() (queue = [])...
[Thread-5 ] queue.take() returned 0 (queue = [])
[Thread-0 ] queue.put(0) returned (queue = [])
[Thread-6 ] Calling queue.take() (queue = [])...
[Thread-7 ] Calling queue.take() (queue = [])...
[Thread-8 ] Calling queue.take() (queue = [])...
[Thread-9 ] Calling queue.put(3) (queue = [])...
[Thread-9 ] queue.put(3) returned (queue = [])
[Thread-8 ] queue.take() returned 3 (queue = [])
[Thread-10] Calling queue.put(4) (queue = [])...
[Thread-10] queue.put(4) returned (queue = [])
[Thread-7 ] queue.take() returned 4 (queue = [])
[Thread-11] Calling queue.put(5) (queue = [])...
[Thread-11] queue.put(5) returned (queue = [])
[Thread-6 ] queue.take() returned 5 (queue = [])
Code-Sprache: Klartext (plaintext)
Wenn du die Konstante FAIR
auf true
setzt, wirst du sehen, wie die Elemente nicht in LIFO-, sondern in FIFO-Reihenfolge entnommen werden.
Zusammenfassung und Ausblick
In diesem Artikel hast du die SynchronousQueue
kennengelernt – eine Queue, die niemals Elemente enthält, sondern diese direkt von den enqueuenden Threads an die dequeuenden Threads übergibt.
Im nächsten Teil geht es um die letzte Queue-Impementierung dieser Tutorial-Serie: die LinkedTransferQueue.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.