

In diesem Teil der Tutorialserie geht es um die LinkedBlockingQueue
. Du wirst deren speziellen Eigenschaften kennenlernen und anhand eines Beispiels sehen, wie man diese Queue einsetzt. Außerdem wirst du erfahren, wann du genau diese Queue einsetzen solltest.
Hier befinden wir uns in der Klassenhierarchie:

LinkedBlockingQueue Eigenschaften
Die Klasse java.util.concurrent
.LinkedBlockingQueue
basiert – genau wie ConcurrentLinkedQueue – auf einer verketteten Liste, ist allerdings – ebenso wie die im nächsten Teil vorgestellte ArrayBlockingQueue – threadsicher (s. u.), bounded und blockierend.
Anders als die ArrayBlockingQueue
bietet LinkedBlockingQueue
keine Fairness Policy an. (Fairness Policy bedeutet, dass blockierende Methoden in der Reihenfolge bedient werden, in der sie aufgerufen werden.)
Die Queue-Eigenschaften im Detail:
Unterliegende Datenstruktur | Thread-safe? | Blocking/ Non-blocking | Fairness Policy | Bounded/ Unbounded | Iterator Type |
---|---|---|---|---|---|
Verkettete Liste | Ja (pessimistisches Locking mit zwei Locks) | Blocking | Nicht verfügbar | Bounded | Weakly consistent¹ |
¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.
Einsatzempfehlung
Ich empfehle LinkedBlockingQueue
, wenn du eine blockierende, threadsichere Queue benötigst.
Die Klasse LinkedBlockingQueue
wird übrigens von Executors.newFixedThreadPool()
und Executors.newSingleThreadedExecutor()
als "work queue" für den Executor verwendet. Sie wird somit intensiv genutzt, was die Wahrscheinlichkeit für Bugs äußerst gering hält.
LinkedBlockingQueue Beispiel
Das folgende Beispiel zeigt, wie die LinkedBlockingQueue
verwendet wird. Wir erzeugen eine Queue mit der Kapazität 3. Direkt im Anschluss beginnen wir im Abstand von jeweils drei Sekunden Elemente aus der Queue zu lesen. Erst nach 3,5 Sekunden beginnen wir im Abstand von jeweils einer Sekunde Elemente in die Queue zu schreiben (→ Code auf GitHub).
public class LinkedBlockingQueueExample {
private static final long startTime = System.currentTimeMillis();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// Start reading from the queue immediately, every 3 seconds
for (int i = 0; i < 10; i++) {
int delaySeconds = i * 3;
pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
}
// Start writing to the queue after 3.5 seconds (so there are already 2 threads
// waiting), every 1 seconds (so that the queue fills faster than it's emptied,
// so that we see a full queue soon)
for (int i = 0; i < 10; i++) {
int element = i; // Assign to an effectively final variable
int delayMillis = 3500 + i * 1000;
pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
private static void enqueue(BlockingQueue<Integer> queue, int element) {
log("Calling queue.put(%d) (queue = %s)...", element, queue);
try {
queue.put(element);
log("queue.put(%d) returned (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer element = queue.take();
log(" queue.take() returned %d (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US,
"[%4.1fs] [%-16s] %s%n",
(System.currentTimeMillis() - startTime) / 1000.0,
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code-Sprache: Java (java)
Im folgenden siehst du die Ausgabe des Beispiel-Programms:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-8 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 0 (queue = [])
[ 3.5s] [pool-1-thread-8 ] queue.put(0) returned (queue = [])
[ 4.5s] [pool-1-thread-5 ] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-4 ] queue.take() returned 1 (queue = [])
[ 4.5s] [pool-1-thread-5 ] queue.put(1) returned (queue = [])
[ 5.5s] [pool-1-thread-3 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-3 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-7 ] queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-9 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-9 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-2 ] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-2 ] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-10] Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-10] queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-1 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-1 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-8 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-4 ] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ] Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ] queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-8 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-3 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ] queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-4 ] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-3 ] queue.put(9) returned (queue = [7, 8, 9])
[18.0s] [pool-1-thread-9 ] queue.take() returned 6 (queue = [7, 8, 9])
[21.0s] [pool-1-thread-6 ] Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-6 ] queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-2 ] queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-10] Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-10] queue.take() returned 9 (queue = [])
Code-Sprache: Klartext (plaintext)
Da wir erst mit dem Schreiben beginnen, nachdem bereits zwei Threads take()
aufrufen, blockieren diese ersten zwei Leseversuche bei 0,0 und 3,0 s (Thread 1 und 4).
Nach 3,5 s wird das erste Element geschrieben (Thread 8). Dadurch wird Thread 1 aufgeweckt und die take()
Methode entnimmt dieses Element sofort wieder aus der Queue.
Nach 4,5 s wird das zweite Element geschrieben (Thread 5). Thread 4 wird aufgeweckt und entnimmt dieses Element wieder aus der Queue.
Das Programm schreibt schneller als es liest. Nach 10,5 s blockiert daher zum ersten Mal ein schreibender Thread (Thread 8) beim Versuch die 7 in die - zu dem Zeitpunkt volle - Queue zu schreiben. Nach 11,5 s blockiert ebenfalls Thread 4 beim Versuch die 8 in die Queue zu schreiben.
Nach 12,0 s entnimmt Thread 5 ein Element aus der Queue. Dadurch wird in der Queue ein Platz frei. Thread 8 wird aufgeweckt und schreibt die 7 in die Queue.
Versuch einmal selbst die restlichen Ausgaben zu lesen und zu verstehen.
Ist LinkedBlockingQueue threadsicher?
Ja, LinkedBlockingQueue
ist threadsicher.
Die Threadsicherheit der LinkedBlockingQueue
wird durch pessimistisches Locking mittels zweier separater ReentrantLock
für Schreib- und Leseoperationen gewährleistet. So kann es zu keiner Contention (Zugriffskonflikten) zwischen Producer- und Consumer-Threads kommen.
Unterschiede zu anderen Queues:
- Bei ConcurrentLinkedQueue wird die Threadsicherheit durch optimistisches Locking via Compare-and-Set gewährleistet, was zu besserer Performance bei niedriger bis moderater Contention führt.
- ArrayBlockingQueue wird mit nur einem
ReentrantLock
geschützt, so dass dort Zugriffskonflikte zwischen Producer- und Consumer-Threads möglich sind.
LinkedBlockingQueue Zeitkomplexität
Wie bei allen Queues ist der Aufwand für die Enqueue- und Dequeue-Operationen unabhängig von der Länge der Queue. Die Zeitkomplexität beträgt also O(1).
Dies gilt auch für die size()
-Methode. Im Gegensatz zur ebenfalls auf einer verketteten Liste basierenden ConcurrentLinkedQueue
, die bei jedem Aufruf von size()
die komplette Liste durchläuft, um die Elemente zu zählen, verwendet LinkedBlockingQueue
intern ein AtomicInteger
, das beim Einfügen und Entnehmen aktualisiert wird, und somit die Größe mit konstantem Aufwand verfügbar hält.
Zusammenfassung und Ausblick
In diesem Artikel hast du die LinkedBlockingQueue
kennengelernt – eine threadsichere, blockierende, bounded Queue. An einem Beispiel hast du gesehen, wie du LinkedBlockingQueue
einsetzen kannst. Du hast außerdem erfahren, in welchen Fällen du LinkedBlockingQueue
einsetzen solltest.
LinkedBlockingQueue
basiert auf einer verketteten Liste. Im nächsten Teil des Tutorials geht es um das auf einem Array basierende Pendant – die ArrayBlockingQueue.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.