Im diesem Artikel geht es um die ArrayBlockingQueue
und deren Eigenschaften. Du siehst anhand eines Beispiels, wie die ArrayBlockingQueue
eingesetzt wird. Außerdem gebe ich dir eine Empfehlung, in welchen Fällen du diese Queue einsetzen solltest.
Hier befinden wir uns in der Klassenhierarchie:
ArrayBlockingQueue Eigenschaften
Die Klasse java.util.concurrent.ArrayBlockingQueue
basiert auf einem Array und ist – wie die meisten Queue-Implementierungen – threadsicher (s. u.). Sie ist bounded (hat eine maximale Kapazität), entsprechend blockierend und bietet eine Fairness Policy (d. h. blockierende Methoden werden in der Reihenfolge bedient, in der sie aufgerufen wurden).
Die Eigenschaften in der Übersicht:
Unterliegende Datenstruktur | Thread-safe? | Blocking/ Non-blocking | Fairness Policy | Bounded/ Unbounded | Iterator Type |
---|---|---|---|---|---|
Array | Ja (pessimistisches Locking mit einem Lock) | Blocking | Optional | Bounded | Weakly consistent¹ |
¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.
Einsatzempfehlung
Aufgrund der möglicherweise hohen Contention bei gleichzeitigem Schreib- und Lesezugriff solltest du – wenn du eine blockierende, threadsichere Queue benötigst – für deinen speziellen Einsatzweck testen, ob evtl. eine LinkedBlockingQueue
performanter ist. Diese basiert zwar auf einer verketteten Liste, verwendet allerdings zum Schreiben und Lesen zwei separate ReentrantLock
s, was die Zugangskonflikte reduziert.
ArrayBlockingQueue Beispiel
Im folgenden Beispiel erzeugen wir eine ArrayBlockingQueue mit der Kapazität 3. Dann lassen wir über einen ScheduledExecutorService
in bestimmten Abständen Elemente in die Queue schreiben und aus ihr lesen (→ Code auf GitHub):
public class ArrayBlockingQueueExample {
private static final long startTime = System.currentTimeMillis();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// Start reading from the queue immediately, every 3 seconds
for (int i = 0; i < 10; i++) {
int delaySeconds = i * 3;
pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
}
// Start writing to the queue after 3.5 seconds (so there are already 2 threads
// waiting), every 1 seconds (so that the queue fills faster than it's emptied,
// so that we see a full queue soon)
for (int i = 0; i < 10; i++) {
int element = i; // Assign to an effectively final variable
int delayMillis = 3500 + i * 1000;
pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
private static void enqueue(BlockingQueue<Integer> queue, int element) {
log("Calling queue.put(%d) (queue = %s)...", element, queue);
try {
queue.put(element);
log("queue.put(%d) returned (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer element = queue.take();
log(" queue.take() returned %d (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US,
"[%4.1fs] [%-16s] %s%n",
(System.currentTimeMillis() - startTime) / 1000.0,
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code-Sprache: Java (java)
Wir versuchen alle drei Sekunden, beginnend sofort, ein Element aus der Queue zu lesen. Wir schreiben die Elemente sekündlich, fangen allerdings erst nach 3,5 s damit an. Zu diesem Zeitpunkt sollten also bereits zwei lesende Threads blockiert haben und darauf warten, dass Elemente in die Queue geschrieben werden.
Da wir schneller schreiben als lesen, sollte die Queue bald ihre Kapazitätsgrenze erreicht haben. Ab dem Moment sollten die schreibenden Threads so lange blockieren, bis die lesenden Threads aufgeholt haben.
Hier eine beispielhafte Ausgabe:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-3 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-3 ] queue.put(0) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = [])
[ 4.5s] [pool-1-thread-9 ] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-9 ] queue.put(1) returned (queue = [])
[ 4.5s] [pool-1-thread-2 ] queue.take() returned 1 (queue = [])
[ 5.5s] [pool-1-thread-7 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-7 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-8 ] queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-4 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-4 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-10] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-10] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-6 ] queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-3 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-3 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-9 ] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-2 ] queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-7 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-8 ] Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-8 ] queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-9 ] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-5 ] queue.take() returned 6 (queue = [7, 8, 9])
[18.0s] [pool-1-thread-7 ] queue.put(9) returned (queue = [7, 8, 9])
[21.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-4 ] queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-10] Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-10] queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-6 ] queue.take() returned 9 (queue = [])
Code-Sprache: Klartext (plaintext)
Wie vorausgesehen blockieren die ersten zwei Leseversuche bei 0,0 s und 3,0 s, da noch keine Elemente in die Queue geschrieben wurden.
Nach 3,5 s wird das erste Element geschrieben. Dadurch wird der erste Thread aufgeweckt und entnimmt dieses Element wieder. Nach 4,5 s wird das zweite Element geschrieben und der zweite Thread aufgeweckt, um das Element zu entnehmen.
Da das Programm schneller schreibt als liest, blockieren nach 10,5 s Thread 1, nach 11,5 s Thread 9 und nach 12,5 s Thread 7 beim Versuch weitere Elemente in die zu dem Zeitpunkt volle Queue zu schreiben.
Nach 12,0 s wird ein Element entnommen und Thread 1 kann mit dem Schreiben fortfahren. Nach 15,0 s wird ein weiteres Element entnommen und Thread 9 kann fortfahren. Nach 18,0 s kann Thread 7 fortfahren.
Da keine weiteren Elemente in die Queue geschrieben werden, leert sie sich gegen Ende wieder.
Ist ArrayBlockingQueue threadsicher?
Ja, ArrayBlockingQueue
ist threadsicher.
Die Threadsicherheit von ArrayBlockingQueue
wird durch ein einzelnes ReentrantLock
gewährleistet. Dieses wird sowohl für den Kopf als auch für das Ende der Queue verwendet, so dass es bei gleichzeitigen Schreib- und Lesezugriffen zu Zugriffskonflikten ("thread contention") zwischen Producer- und Consumer-Threads kommen kann.
Explizite Locks wie ReentrantLock
sind hauptsächlich für Einsatzgebiete geeignet, in denen es zu hoher Thread Contention kommt. Bei niedriger bis moderater Contention ist optimistisches Locking performanter.
Unterschiede zu anderen Queues:
- Bei LinkedBlockingQueue wird die Threadsicherheit nicht nur durch eines, sondern durch zwei Locks gewährleistet. So können Producer- und Consumer-Threads sich nicht gegenseitig blockieren.
- Bei ConcurrentLinkedQueue wird die Threadsicherheit durch optimistisches Locking via Compare-and-Set gewährleistet, was zu besserer Performance bei niedriger bis moderater Contention führt.
Zusammenfassung und Ausblick
Dieser Artikel hat dir die ArrayBlockingQueue
vorgestellt. Diese Queue ist threadsicher, blockierend und bounded. Anhand eines Beispiels hast du gesehen, wie du ArrayBlockingQueue
einsetzen kannst.
Wie der Name schon sagt, basiert diese Queue auf einem Array. Das auf einer verketteten Liste basierende Pendant – LinkedBlockingQueue – wurde im vorherigen Teil der Serie behandelt.
Im nächsten Teil der Serie geht es um die PriorityBlockingQueue – eine threadsichere und blockierende Variante der in einem vorherigen Teil vorgestellten PriorityQueue.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.