BlockingQueue - Feature ImageBlockingQueue - Feature Image
HappyCoders Glasses

BlockingQueue Interface in Java

Sven Woltmann
Sven Woltmann
20. April 2022

In diesem Artikel lernst du das Interface java.util.concurrent.BlockingQueue kennen. BlockingQueue erweitert das im vorherigen Teil dieser Tutorial-Serie besprochene Queue-Interface um Methoden zum blockierenden Zugriff auf Queues.

Bevor wir klären, was "blockierender Zugriff" bedeutet, müssen wir zunächst über den Begriff "bounded Queue" sprechen.

Was ist eine bounded Queue?

Wenn eine Queue nur eine begrenzte Anzahl von Elementen aufnehmen kann, spricht man von einer "bounded Queue". Die maximale Anzahl von Elementen wird mit "Kapazität" (englisch "capacity") bezeichnet und beim Erzeugen der Queue einmalig festgelegt.

Die folgende Code-Zeile bspw. erzeugt eine auf 100 Elemente begrenzte ArrayBlockingQueue:

Queue<Integer> queue = new ArrayBlockingQueue<>(100);
Code-Sprache: Java (java)

Ist die Anzahl der Elemente in der Queue hingegen nicht begrenzt (bzw. nur durch den zur Verfügung stehenden Speicher), spricht man von einer "unbounded Queue".

(Dieselbe Definition gilt übrigens für alle Datenstrukturen, z. B. auch für Stacks und Deques.)

Was ist eine blockierende Queue?

Bei den Queue-Operationen "Enqueue" und "Dequeue" können zwei Sonderfälle auftreten:

  • Wir könnten versuchen ein Element in eine bounded Queue einzufügen, die ihre Kapazitätsgrenze erreicht hat – oder anders gesagt: die voll ist.
  • Wir könnten versuchen ein Element aus einer leeren Queue zu entnehmen.

Eine nicht blockierende ("non-blocking") Queue liefert in solchen Fällen einen bestimmten Rückgabewert oder wirft eine Exception (s. Abschnitt "Queue-Methoden" im Artikel über Java-Queues).

Eine blockierende Queue hingegen bietet zusätzliche Methoden, die darauf warten, dass die gewünschte Operation ausgeführt werden kann:

  • Enqueue-Methoden, die beim Einfügen in eine volle bounded Queue warten, bis diese wieder freie Kapazitäten hat (dazu muss ein ander Thread ein Element entnehmen).
  • Dequeue-Methoden, die beim Entnehmen eines Element aus einer leeren Queue darauf warten, dass diese nicht mehr leer ist (dazu muss ein anderer Thread ein Element einfügen).

Diese zusätzlichen Methoden sind im BlockingQueue-Interface definiert und werden im folgenden Kapitel erläutert.

Fairness Policy

Blockierende Methoden werden dabei nicht automatisch in der Reihenfolge bedient, in der sie aufgerufen wurden. Die Abarbeitung in Aufrufreihenfolge kann bei einigen Queue-Implementierungen durch eine optionale "Fairness Policy" aktiviert werden. Diese erhöht allerdings den Overhead und verringert damit den Durchsatz der Queue massiv. In der Regel ist die Aktivierung der "Fairness Policy" nicht nötig.

BlockingQueue Interface

Die blockierenden Enqueue- und Dequeue-Operationen gibt es in jeweils zwei Varianten. Die erste Variante wartet unbegrenzt lange. Die zweite Variante gibt nach Ablauf einer vorgegebenen Wartezeit auf und liefert false bzw. null zurück.

Die folgende Tabelle zeigt in den ersten zwei Spalten die nicht blockierenden Methoden, die BlockingQueue von Queue erbt (und die wir im vorherigen Teil des Tutorials besprochen haben). In der dritten und vierten Spalte findest du die hinzugekommenden, blockierenden Methoden:

Nicht blockierend
(geerbt von Queue)
Blockierend
(neu in BlockingQueue)
ExceptionRückgabewertBlockiertBlockiert
mit Timeout
Element anhängen
(enqueue):
add(E e)offer(E e)put(E e)offer(E e,
  long timeout,
  TimeUnit unit)
Element entnehmen
(dequeue):
remove()poll()take()poll(
  long timeout,
  TimeUnit unit)
Element ansehen
(examine):
element()peek()

Der folgende Abschnitt beschreibt die BlockingQueue-Methoden im Einzelnen

BlockingQueue Methoden

BlockingQueue.put()

Die put()-Methode fügt, sofern Platz vorhanden ist, ein Element in die Queue ein. Ist die Kapazitätsgrenze der Queue hingegen erreicht, blockiert die Methode solange, bis Platz freigeworden ist.

BlockingQueue.offer() mit Timeout

Auch die offer()-Methode fügt ein Element ein, wenn in der Queue noch Platz ist. Andernfalls wartet die Methode die angegebene Zeit. Wird in dieser Zeit ein Platz frei, wird das Element eingefügt und die Methode gibt true zurück. Läuft die Wartezeit hingegen ab, ohne dass ein Platz freigeworden ist, gibt die Methode false zurück.

BlockingQueue.take()

Diese Methode entnimmt ein Element vom Kopf der Queue, sofern die Queue nicht leer ist. Bei einer leeren Queue blockiert take() solange, bis ein Element verfügbar geworden ist und gibt dieses dann zurück.

BlockingQueue.poll() mit Timeout

Auch poll() entnimmt ein Element vom Kopf der Queue, wenn diese nicht leer ist. Ist die Queue hingegen leer, wartet die Methode die angegebene Zeit. Sofern in der Wartezeit ein Element verfügbar wird, wird dieses entnommen und zurückgegeben. Läuft die Wartezeit ergebnislos ab, gibt die Methode null zurück.

InterruptedException bei blockierenden Methoden

Alle blockierenden Methoden werfen eine InterruptedException, wenn auf dem wartenden Thread die interrupt()-Methode aufgerufen wird. Mit interrupt() sollten wartende Threads abgebrochen werden, wenn das Warten nicht mehr nötig ist.

Dies ist z. B. beim Herunterfahren der Applikation der Fall. Dabei wird eventuell das Ereignis, auf das die blockierende Methode wartet, nicht mehr eintreten. Die Methode würde aber dennoch auf den Eintritt des Ereignisses warten und damit ein reguläres Herunterfahren der Applikation verhindern. Das Abbrechen der wartenden Threads mit interrupt() ermöglicht ein sauberes Herunterfahren.

Java BlockingQueue Beispiel

Der folgende Quellcode zeigt ein Beispiel, welches aufgrund der Nebenläufigkeit deutlich komplexer ausfällt als das Beispiel mit einer nicht-blockierenden Queue (→ Code in GitHub):

public class BlockingQueueExample { private static final long startTime = System.currentTimeMillis(); public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // Start reading from the queue immediately, every 3 seconds for (int i = 0; i < 10; i++) { int delaySeconds = i * 3; pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS); } // Start writing to the queue after 3.5 seconds (so there are already 2 // threads waiting), every 1 seconds (so that the queue fills faster than // it's emptied, so that we see a full queue soon) for (int i = 0; i < 10; i++) { int element = i; // Assign to an effectively final variable int delayMillis = 3500 + i * 1000; pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS); } pool.shutdown(); pool.awaitTermination(1, TimeUnit.MINUTES); } private static void enqueue(BlockingQueue<Integer> queue, int element) { log("Calling queue.put(%d) (queue = %s)...", element, queue); try { queue.put(element); log("queue.put(%d) returned (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void dequeue(BlockingQueue<Integer> queue) { log(" Calling queue.take() (queue = %s)...", queue); try { Integer element = queue.take(); log(" queue.take() returned %d (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void log(String format, Object... args) { System.out.printf( Locale.US, "[%4.1fs] [%-16s] %s%n", (System.currentTimeMillis() - startTime) / 1000.0, Thread.currentThread().getName(), String.format(format, args)); } }
Code-Sprache: Java (java)

In diesem Beispiel erstellen wir eine blocking, bounded Queue mit der Kapazität 3 und schedulen jeweils zehn Enqueue- und zehn Dequeue-Operationen.

Die Enqueue-Operationen beginnen später, so dass wir am Anfang blockierende Dequeue-Operationen sehen können. Außerdem erfolgen die Enqueue-Operationen in kürzen Abständen, so dass die Kapazitätsgrenze der Queue nach einer Weile erreicht ist und wir blockierende Enqueue-Operationen sehen können.

Hier die Ausgabe des Programms:

[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])... [ 3.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [])... [ 3.5s] [pool-1-thread-2 ] Calling queue.put(0) (queue = [])... [ 3.5s] [pool-1-thread-2 ] queue.put(0) returned (queue = []) [ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = []) [ 4.5s] [pool-1-thread-10] Calling queue.put(1) (queue = [])... [ 4.5s] [pool-1-thread-10] queue.put(1) returned (queue = []) [ 4.5s] [pool-1-thread-3 ] queue.take() returned 1 (queue = []) [ 5.5s] [pool-1-thread-8 ] Calling queue.put(2) (queue = [])... [ 5.5s] [pool-1-thread-8 ] queue.put(2) returned (queue = [2]) [ 6.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [2])... [ 6.0s] [pool-1-thread-9 ] queue.take() returned 2 (queue = []) [ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])... [ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3]) [ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])... [ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4]) [ 8.5s] [pool-1-thread-7 ] Calling queue.put(5) (queue = [3, 4])... [ 8.5s] [pool-1-thread-7 ] queue.put(5) returned (queue = [3, 4, 5]) [ 9.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [3, 4, 5])... [ 9.0s] [pool-1-thread-4 ] queue.take() returned 3 (queue = [4, 5]) [ 9.5s] [pool-1-thread-2 ] Calling queue.put(6) (queue = [4, 5])... [ 9.5s] [pool-1-thread-2 ] queue.put(6) returned (queue = [4, 5, 6]) [10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])... [11.5s] [pool-1-thread-10] Calling queue.put(8) (queue = [4, 5, 6])... [12.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [4, 5, 6])... [12.0s] [pool-1-thread-3 ] queue.take() returned 4 (queue = [5, 6, 7]) [12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7]) [12.5s] [pool-1-thread-8 ] Calling queue.put(9) (queue = [5, 6, 7])... [15.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [5, 6, 7])... [15.0s] [pool-1-thread-9 ] queue.take() returned 5 (queue = [6, 7, 8]) [15.0s] [pool-1-thread-10] queue.put(8) returned (queue = [6, 7, 8]) [18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])... [18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9]) [18.0s] [pool-1-thread-8 ] queue.put(9) returned (queue = [7, 8, 9]) [21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])... [21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9]) [24.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [8, 9])... [24.0s] [pool-1-thread-7 ] queue.take() returned 8 (queue = [9]) [27.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [9])... [27.0s] [pool-1-thread-4 ] queue.take() returned 9 (queue = [])
Code-Sprache: Klartext (plaintext)

Zu Beginn ist die Queue leer, so dass die ersten zwei Leseversuche (nach 0 und 3 s) blockieren.

Nach 3,5 s (nachdem also zwei lesende Threads an der Queue warten) beginnt das Programm sekündlich in die Queue zu schreiben. Man sieht in der Ausgabe schön, wie dabei jeweils ein lesender Thread fortgesetzt wird und das angehängte Element sofort wieder entnimmt (bei 3,5 und 4,5 s).

Da das Programm dreimal so schnell in die Queue schreibt wie es daraus liest, blockiert nach 10,5 s der Versuch eine 7 in die Queue zu schreiben, da diese mit den Elementen [4, 5, 6] ihre Kapazitätsgrenze von 3 erreicht hat.

Erst nachdem nach 12 s die 4 aus der Queue entnommen wurde, kann die 7 eingefügt werden. Für die 8 und die 9 sehen wir ein entsprechendes Verhalten.

BlockingQueue Implementierungen

Im JDK gibt es fünf Implementierungen des BlockingQueue-Interfaces mit jeweils spezifischen Eigenschaften. In dem folgenden UML-Klassendiagramm sind diese mitsamt ihrem Interface farbig hinterlegt:

BlockingQueue Interface in der Klassenhierarchie
BlockingQueue Interface in der Klassenhierarchie

Die Implementierungen werden jeweils in separaten Teilen des Tutorials behandelt. Dort werden deren Eigenschaften vorgestellt und anhand dieser erläutert, unter welchen Voraussetzungen die jeweilige Implementierung eingesetzt werden sollte. Folgende Links führen zu den entsprechenden Artikeln:

Du gelangst zu diesen Artikeln auch jederzeit über die Tutorial-Navigation am rechten Rand.

Zusammenfassung und Ausblick

Dieser Artikel hat zunächst die Unterschiede zwischen bounded/unbounded und blocking/non-blocking Queues erläutert. Im Anschluss hast du das BlockingQueue-Interface und dessen Methoden put(), offer(), take() und poll() kennengelernt.

In den folgenden Teilen dieser Serie werden wir alle Queue- und BlockingQueue-Implementierung und deren Eigenschaften genauer unter die Lupe nehmen.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.