LinkedBlockingQueue - Feature ImageLinkedBlockingQueue - Feature Image
HappyCoders Glasses

LinkedBlockingQueue in Java
(mit Beispiel)

Sven Woltmann
Sven Woltmann
Aktualisiert: 27. November 2024

In diesem Teil der Tutorialserie geht es um die LinkedBlockingQueue. Du wirst deren speziellen Eigenschaften kennenlernen und anhand eines Beispiels sehen, wie man diese Queue einsetzt. Außerdem wirst du erfahren, wann du genau diese Queue einsetzen solltest.

Hier befinden wir uns in der Klassenhierarchie:

LinkedBlockingQueue in der Klassenhierarchie
LinkedBlockingQueue in der Klassenhierarchie

LinkedBlockingQueue Eigenschaften

Die Klasse java.util.concurrent.LinkedBlockingQueue basiert – genau wie ConcurrentLinkedQueue – auf einer verketteten Liste, ist allerdings – ebenso wie die im nächsten Teil vorgestellte ArrayBlockingQueue – threadsicher (s. u.), bounded und blockierend.

Anders als die ArrayBlockingQueue bietet LinkedBlockingQueue keine Fairness Policy an. (Fairness Policy bedeutet, dass blockierende Methoden in der Reihenfolge bedient werden, in der sie aufgerufen werden.)

Die Queue-Eigenschaften im Detail:

Unterliegende DatenstrukturThread-safe?Blocking/
Non-blocking
Fairness
Policy
Bounded/
Unbounded
Iterator Type
Verkettete ListeJa
(pessimistisches Locking mit zwei Locks)
BlockingNicht verfügbarBoundedWeakly consistent¹

¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.

Einsatzempfehlung

Ich empfehle LinkedBlockingQueue, wenn du eine blockierende, threadsichere Queue benötigst.

Die Klasse LinkedBlockingQueue wird übrigens von Executors.newFixedThreadPool() und Executors.newSingleThreadedExecutor() als "work queue" für den Executor verwendet. Sie wird somit intensiv genutzt, was die Wahrscheinlichkeit für Bugs äußerst gering hält.

LinkedBlockingQueue Beispiel

Das folgende Beispiel zeigt, wie die LinkedBlockingQueue verwendet wird. Wir erzeugen eine Queue mit der Kapazität 3. Direkt im Anschluss beginnen wir im Abstand von jeweils drei Sekunden Elemente aus der Queue zu lesen. Erst nach 3,5 Sekunden beginnen wir im Abstand von jeweils einer Sekunde Elemente in die Queue zu schreiben (→ Code auf GitHub).

public class LinkedBlockingQueueExample {
  private static final long startTime = System.currentTimeMillis();

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    // Start reading from the queue immediately, every 3 seconds
    for (int i = 0; i < 10; i++) {
      int delaySeconds = i * 3;
      pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
    }

    // Start writing to the queue after 3.5 seconds (so there are already 2 threads 
    // waiting), every 1 seconds (so that the queue fills faster than it's emptied, 
    // so that we see a full queue soon)
    for (int i = 0; i < 10; i++) {
      int element = i; // Assign to an effectively final variable
      int delayMillis = 3500 + i * 1000;
      pool.schedule(() -> enqueue(queue, element), delayMillis, TimeUnit.MILLISECONDS);
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }

  private static void enqueue(BlockingQueue<Integer> queue, int element) {
    log("Calling queue.put(%d) (queue = %s)...", element, queue);
    try {
      queue.put(element);
      log("queue.put(%d) returned (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void dequeue(BlockingQueue<Integer> queue) {
    log("    Calling queue.take() (queue = %s)...", queue);
    try {
      Integer element = queue.take();
      log("    queue.take() returned %d (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(
        Locale.US,
        "[%4.1fs] [%-16s] %s%n",
        (System.currentTimeMillis() - startTime) / 1000.0,
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}Code-Sprache: Java (java)

Im folgenden siehst du die Ausgabe des Beispiel-Programms:

[ 0.0s] [pool-1-thread-1 ]     Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-8 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-1 ]     queue.take() returned 0 (queue = [])
[ 3.5s] [pool-1-thread-8 ] queue.put(0) returned (queue = [])
[ 4.5s] [pool-1-thread-5 ] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-4 ]     queue.take() returned 1 (queue = [])
[ 4.5s] [pool-1-thread-5 ] queue.put(1) returned (queue = [])
[ 5.5s] [pool-1-thread-3 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-3 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-7 ]     Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-7 ]     queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-9 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-9 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-2 ] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-2 ] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-10]     Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-10]     queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-1 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-1 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-8 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-4 ] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ]     Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-5 ]     queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-8 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-3 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ]     Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-7 ]     queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-4 ] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-9 ]     Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-3 ] queue.put(9) returned (queue = [7, 8, 9])
[18.0s] [pool-1-thread-9 ]     queue.take() returned 6 (queue = [7, 8, 9])
[21.0s] [pool-1-thread-6 ]     Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-6 ]     queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-2 ]     Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-2 ]     queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-10]     Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-10]     queue.take() returned 9 (queue = [])Code-Sprache: Klartext (plaintext)

Da wir erst mit dem Schreiben beginnen, nachdem bereits zwei Threads take() aufrufen, blockieren diese ersten zwei Leseversuche bei 0,0 und 3,0 s (Thread 1 und 4).

Nach 3,5 s wird das erste Element geschrieben (Thread 8). Dadurch wird Thread 1 aufgeweckt und die take() Methode entnimmt dieses Element sofort wieder aus der Queue.

Nach 4,5 s wird das zweite Element geschrieben (Thread 5). Thread 4 wird aufgeweckt und entnimmt dieses Element wieder aus der Queue.

Das Programm schreibt schneller als es liest. Nach 10,5 s blockiert daher zum ersten Mal ein schreibender Thread (Thread 8) beim Versuch die 7 in die - zu dem Zeitpunkt volle - Queue zu schreiben. Nach 11,5 s blockiert ebenfalls Thread 4 beim Versuch die 8 in die Queue zu schreiben.

Nach 12,0 s entnimmt Thread 5 ein Element aus der Queue. Dadurch wird in der Queue ein Platz frei. Thread 8 wird aufgeweckt und schreibt die 7 in die Queue.

Versuch einmal selbst die restlichen Ausgaben zu lesen und zu verstehen.

Ist LinkedBlockingQueue threadsicher?

Ja, LinkedBlockingQueue ist threadsicher.

Die Threadsicherheit der LinkedBlockingQueue wird durch pessimistisches Locking mittels zweier separater ReentrantLock für Schreib- und Leseoperationen gewährleistet. So kann es zu keiner Contention (Zugriffskonflikten) zwischen Producer- und Consumer-Threads kommen.

Unterschiede zu anderen Queues:

  • Bei ConcurrentLinkedQueue wird die Threadsicherheit durch optimistisches Locking via Compare-and-Set gewährleistet, was zu besserer Performance bei niedriger bis moderater Contention führt.
  • ArrayBlockingQueue wird mit nur einem ReentrantLock geschützt, so dass dort Zugriffskonflikte zwischen Producer- und Consumer-Threads möglich sind.

LinkedBlockingQueue Zeitkomplexität

Wie bei allen Queues ist der Aufwand für die Enqueue- und Dequeue-Operationen unabhängig von der Länge der Queue. Die Zeitkomplexität beträgt also O(1).

Dies gilt auch für die size()-Methode. Im Gegensatz zur ebenfalls auf einer verketteten Liste basierenden ConcurrentLinkedQueue, die bei jedem Aufruf von size() die komplette Liste durchläuft, um die Elemente zu zählen, verwendet LinkedBlockingQueue intern ein AtomicInteger, das beim Einfügen und Entnehmen aktualisiert wird, und somit die Größe mit konstantem Aufwand verfügbar hält.

Zusammenfassung und Ausblick

In diesem Artikel hast du die LinkedBlockingQueue kennengelernt – eine threadsichere, blockierende, bounded Queue. An einem Beispiel hast du gesehen, wie du LinkedBlockingQueue einsetzen kannst. Du hast außerdem erfahren, in welchen Fällen du LinkedBlockingQueue einsetzen solltest.

LinkedBlockingQueue basiert auf einer verketteten Liste. Im nächsten Teil des Tutorials geht es um das auf einem Array basierende Pendant – die ArrayBlockingQueue.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.