ArrayBlockingQueue - Feature ImageArrayBlockingQueue - Feature Image
HappyCoders Glasses

Java ArrayBlockingQueue
(mit Beispiel)

Sven Woltmann
Sven Woltmann
20. April 2022

Im diesem Artikel geht es um die ArrayBlockingQueue und deren Eigenschaften. Du siehst anhand eines Beispiels, wie die ArrayBlockingQueue eingesetzt wird. Außerdem gebe ich dir eine Empfehlung, in welchen Fällen du diese Queue einsetzen solltest.

Hier befinden wir uns in der Klassenhierarchie:

ArrayBlockingQueue in der Klassenhierarchie
ArrayBlockingQueue in der Klassenhierarchie

ArrayBlockingQueue Eigenschaften

Die Klasse java.util.concurrent.ArrayBlockingQueue basiert auf einem Array und ist – wie die meisten Queue-Implementierungen – threadsicher (s. u.). Sie ist bounded (hat eine maximale Kapazität), entsprechend blockierend und bietet eine Fairness Policy (d. h. blockierende Methoden werden in der Reihenfolge bedient, in der sie aufgerufen wurden).

Die Eigenschaften in der Übersicht:

Unterliegende DatenstrukturThread-safe?Blocking/
Non-blocking
Fairness
Policy
Bounded/
Unbounded
Iterator Type
ArrayJa
(pessimistisches Locking mit einem Lock)
BlockingOptionalBoundedWeakly consistent¹

¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.

Einsatzempfehlung

Aufgrund der möglicherweise hohen Contention bei gleichzeitigem Schreib- und Lesezugriff solltest du – wenn du eine blockierende, threadsichere Queue benötigst – für deinen speziellen Einsatzweck testen, ob evtl. eine LinkedBlockingQueue performanter ist. Diese basiert zwar auf einer verketteten Liste, verwendet allerdings zum Schreiben und Lesen zwei separate ReentrantLocks, was die Zugangskonflikte reduziert.

ArrayBlockingQueue Beispiel

Im folgenden Beispiel erzeugen wir eine ArrayBlockingQueue mit der Kapazität 3. Dann lassen wir über einen ScheduledExecutorService in bestimmten Abständen Elemente in die Queue schreiben und aus ihr lesen (→ Code auf GitHub):

public class ArrayBlockingQueueExample { private static final long startTime = System.currentTimeMillis(); public static void main(String[] args) throws InterruptedException { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3); ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); // Start reading from the queue immediately, every 3 seconds for (int i = 0; i < 10; i++) { int delaySeconds = i * 3; pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS); } // Start writing to the queue after 3.5 seconds (so there are already 2 threads // waiting), every 1 seconds (so that the queue fills faster than it's emptied, // so that we see a full queue soon) for (int i = 0; i < 10; i++) { int element = i; // Assign to an effectively final variable int delayMillis = 3500 + i * 1000; pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS); } pool.shutdown(); pool.awaitTermination(1, TimeUnit.MINUTES); } private static void enqueue(BlockingQueue<Integer> queue, int element) { log("Calling queue.put(%d) (queue = %s)...", element, queue); try { queue.put(element); log("queue.put(%d) returned (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void dequeue(BlockingQueue<Integer> queue) { log(" Calling queue.take() (queue = %s)...", queue); try { Integer element = queue.take(); log(" queue.take() returned %d (queue = %s)", element, queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private static void log(String format, Object... args) { System.out.printf( Locale.US, "[%4.1fs] [%-16s] %s%n", (System.currentTimeMillis() - startTime) / 1000.0, Thread.currentThread().getName(), String.format(format, args)); } }
Code-Sprache: Java (java)

Wir versuchen alle drei Sekunden, beginnend sofort, ein Element aus der Queue zu lesen. Wir schreiben die Elemente sekündlich, fangen allerdings erst nach 3,5 s damit an. Zu diesem Zeitpunkt sollten also bereits zwei lesende Threads blockiert haben und darauf warten, dass Elemente in die Queue geschrieben werden.

Da wir schneller schreiben als lesen, sollte die Queue bald ihre Kapazitätsgrenze erreicht haben. Ab dem Moment sollten die schreibenden Threads so lange blockieren, bis die lesenden Threads aufgeholt haben.

Hier eine beispielhafte Ausgabe:

[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])... [ 3.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [])... [ 3.5s] [pool-1-thread-3 ] Calling queue.put(0) (queue = [])... [ 3.5s] [pool-1-thread-3 ] queue.put(0) returned (queue = []) [ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = []) [ 4.5s] [pool-1-thread-9 ] Calling queue.put(1) (queue = [])... [ 4.5s] [pool-1-thread-9 ] queue.put(1) returned (queue = []) [ 4.5s] [pool-1-thread-2 ] queue.take() returned 1 (queue = []) [ 5.5s] [pool-1-thread-7 ] Calling queue.put(2) (queue = [])... [ 5.5s] [pool-1-thread-7 ] queue.put(2) returned (queue = [2]) [ 6.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [2])... [ 6.0s] [pool-1-thread-8 ] queue.take() returned 2 (queue = []) [ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])... [ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3]) [ 7.5s] [pool-1-thread-4 ] Calling queue.put(4) (queue = [3])... [ 7.5s] [pool-1-thread-4 ] queue.put(4) returned (queue = [3, 4]) [ 8.5s] [pool-1-thread-10] Calling queue.put(5) (queue = [3, 4])... [ 8.5s] [pool-1-thread-10] queue.put(5) returned (queue = [3, 4, 5]) [ 9.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [3, 4, 5])... [ 9.0s] [pool-1-thread-6 ] queue.take() returned 3 (queue = [4, 5]) [ 9.5s] [pool-1-thread-3 ] Calling queue.put(6) (queue = [4, 5])... [ 9.5s] [pool-1-thread-3 ] queue.put(6) returned (queue = [4, 5, 6]) [10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])... [11.5s] [pool-1-thread-9 ] Calling queue.put(8) (queue = [4, 5, 6])... [12.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [4, 5, 6])... [12.0s] [pool-1-thread-2 ] queue.take() returned 4 (queue = [5, 6, 7]) [12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7]) [12.5s] [pool-1-thread-7 ] Calling queue.put(9) (queue = [5, 6, 7])... [15.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [5, 6, 7])... [15.0s] [pool-1-thread-8 ] queue.take() returned 5 (queue = [6, 7, 8]) [15.0s] [pool-1-thread-9 ] queue.put(8) returned (queue = [6, 7, 8]) [18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])... [18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9]) [18.0s] [pool-1-thread-7 ] queue.put(9) returned (queue = [7, 8, 9]) [21.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [7, 8, 9])... [21.0s] [pool-1-thread-4 ] queue.take() returned 7 (queue = [8, 9]) [24.0s] [pool-1-thread-10] Calling queue.take() (queue = [8, 9])... [24.0s] [pool-1-thread-10] queue.take() returned 8 (queue = [9]) [27.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [9])... [27.0s] [pool-1-thread-6 ] queue.take() returned 9 (queue = [])
Code-Sprache: Klartext (plaintext)

Wie vorausgesehen blockieren die ersten zwei Leseversuche bei 0,0 s und 3,0 s, da noch keine Elemente in die Queue geschrieben wurden.

Nach 3,5 s wird das erste Element geschrieben. Dadurch wird der erste Thread aufgeweckt und entnimmt dieses Element wieder. Nach 4,5 s wird das zweite Element geschrieben und der zweite Thread aufgeweckt, um das Element zu entnehmen.

Da das Programm schneller schreibt als liest, blockieren nach 10,5 s Thread 1, nach 11,5 s Thread 9 und nach 12,5 s Thread 7 beim Versuch weitere Elemente in die zu dem Zeitpunkt volle Queue zu schreiben.

Nach 12,0 s wird ein Element entnommen und Thread 1 kann mit dem Schreiben fortfahren. Nach 15,0 s wird ein weiteres Element entnommen und Thread 9 kann fortfahren. Nach 18,0 s kann Thread 7 fortfahren.

Da keine weiteren Elemente in die Queue geschrieben werden, leert sie sich gegen Ende wieder.

Ist ArrayBlockingQueue threadsicher?

Ja, ArrayBlockingQueue ist threadsicher.

Die Threadsicherheit von ArrayBlockingQueue wird durch ein einzelnes ReentrantLock gewährleistet. Dieses wird sowohl für den Kopf als auch für das Ende der Queue verwendet, so dass es bei gleichzeitigen Schreib- und Lesezugriffen zu Zugriffskonflikten ("thread contention") zwischen Producer- und Consumer-Threads kommen kann.

Explizite Locks wie ReentrantLock sind hauptsächlich für Einsatzgebiete geeignet, in denen es zu hoher Thread Contention kommt. Bei niedriger bis moderater Contention ist optimistisches Locking performanter.

Unterschiede zu anderen Queues:

  • Bei LinkedBlockingQueue wird die Threadsicherheit nicht nur durch eines, sondern durch zwei Locks gewährleistet. So können Producer- und Consumer-Threads sich nicht gegenseitig blockieren.
  • Bei ConcurrentLinkedQueue wird die Threadsicherheit durch optimistisches Locking via Compare-and-Set gewährleistet, was zu besserer Performance bei niedriger bis moderater Contention führt.

Zusammenfassung und Ausblick

Dieser Artikel hat dir die ArrayBlockingQueue vorgestellt. Diese Queue ist threadsicher, blockierend und bounded. Anhand eines Beispiels hast du gesehen, wie du ArrayBlockingQueue einsetzen kannst.

Wie der Name schon sagt, basiert diese Queue auf einem Array. Das auf einer verketteten Liste basierende Pendant – LinkedBlockingQueue – wurde im vorherigen Teil der Serie behandelt.

Im nächsten Teil der Serie geht es um die PriorityBlockingQueue – eine threadsichere und blockierende Variante der in einem vorherigen Teil vorgestellten PriorityQueue.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.