Queue, Deque und Stack in Java Ultimate Guide - feature image

Java Queue, Deque und Stack: Ultimate Guide

von Sven Woltmann – 8. April 2020

Früher oder später müssen sich Java-Entwickler mit den Datenstrukturen Queue, Deque und Stack auseinandersetzen. Dieser Artikel erklärt die grundlegenden Funktionen dieser Datenstrukturen und gibt eine ausführliche Übersicht über alle im JDK vorhandenen Implementierungen. Zahlreiche Code-Beispiele sollen dir das Verständnis erleichern.

Im Detail werden folgende Fragen beantwortet:

  • Wie funktionieren die Datenstrukturen Queue, Deque und Stack im Allgemeinen?
  • Wie unterscheiden sie sich?
  • Was bedeutet „bounded“ bzw. „unbounded“?
  • Was bedeutet „blocking“ bzw. „non-blocking“?
  • Welche Vorteile haben Array-basierte Implementierungen gegenüber verketteten Listen?
  • Welche Queue-, Deque- und Stack-Implementierungen gibt es im JDK?
  • Welche der zahlreichen Implementierungen sind für welche Einsatzzwecke geeignet?
  • Was sind MPMC-, MPSC-, SPMC, SPSC-Queue?

Die Code-Beispiele findest du wie immer in meinem GitHub-Repository.

Datenstrukturen: Was sind Queues, Deques und Stacks?

In diesem Kapitel beschreibe ich die allgemeine Funktionsweise von Queues, Deques und Stacks. Allgemein heißt: unabhängig von einer bestimmten Programmiersprache.

Was ist eine Queue?

Eine Queue (auf deutsch: „Warteschlange“) ist eine Liste von Elementen, bei der die Elemente auf einer Seite eingefügt und in derselben Reihenfolge auf der anderen Seite wieder entnommen werden:

Datenstruktur Queue

Eine Queue bietet folgende Operationen:

  • „Enqueue“: Anhängen von Elementen ans Ende („back“, „tail“) der Queue
  • „Dequeue“: Entnehmen von Elementen vom Anfang („front“, „head“) der Queue

(Die entsprechenden Methoden der Java-Queue-Implementierungen heißen allerdings anders, wie du weiter unten sehen wirst.)

Da die Reihenfolge der Elemente beibehalten wird, spricht man auch von FIFO-Liste („First-in-first-out“).

Was ist ein Stack?

Ein Stack (auf deutsch: „Stapelspeicher“, „Kellerspeicher“ oder kurz „Stapel“, „Keller“) ist eine Liste von Elementen, bei der die Elemente auf derselben Seite (in Darstellungen klassischerweise oben) eingefügt („gestapelt“) und wieder entnommen werden:

Datenstruktur Stack

Die Operationen des Stacks lauten:

  • „Push“: Hinzufügen von Elementen auf den Stack
  • „Pop“: Entnehmen von Elementen von der Oberseite des Stacks

Da das zuletzt hinzugefügte Element als erstes wieder entnommen wird, spricht man von einer LIFO-Liste („Last-in-first-out“).

Was ist ein Deque?

Ein Deque (Double-ended queue, ausgesprochen „Deck“ – eine deutsche Übersetzung gibt es nicht) ist eine Liste von Elementen, bei der die Elemente sowohl auf der einen als auch auf der anderen Seite eingefügt und entnommen werden können:

Datenstruktur Deque

Die Operationen lauten analog zur Queue auf beiden Seiten „Enqueue“ und „Dequeue“:

  • „Enqueue at front“: Anhängen von Elementen an den Anfang des Deques
  • „Enqueue at back“: Anhängen von Elementen an das Ende des Deques
  • „Dequeue at front“: Entnehmen von Elementen vom Anfang des Deques
  • „Dequeue at back“: Entnehmen von Elementen vom Ende des Deques

(Wie bei der Queue heißen auch beim Deque die entsprechenden Methoden der Java-Implementierungen anders, dazu unten mehr.)

Ein Deque kann sowohl als Queue (FIFO) als auch als Stack (LIFO) verwendet werden. Es ist darauf aber nicht beschränkt, da Elemente jederzeit an einer beliebigen Seiten angehängt und entnommen werden können.

Was ist eine bounded Queue bzw. eine unbounded Queue?

Wenn eine Queue nur eine begrenzte Anzahl von Elementen aufnehmen kann, spricht man von einer „bounded Queue“. Die maximale Anzahl von Elementen wird mit „Kapazität“ (englisch „capacity“) bezeichnet und über den Constructor der Queue einmalig festgelegt.

Ist die Anzahl der Elemente in der Queue nicht begrenzt (bzw. nur durch den zur Verfügung stehenden Speicher), spricht man von einer „unbounded Queue“.

Dieselbe Definition gilt auch für Deques und Stacks.

Was ist eine blocking Queue bzw. eine non-blocking Queue?

Bei den vorgestellten Queue-Operationen können folgende Sonderfälle auftreten:

  • Einfügen eines Elements in eine bounded Queue, die ihre Kapazitätsgrenze erreicht hat – oder anders gesagt: die voll ist.
  • Entnehmen eines Elements aus einer leeren Queue.

Je nach Implementierung (dazu später mehr) wird bei einer nicht blockierenden („non-blocking“) Queue ein bestimmter Rückgabewert (false oder null) zurückgeliefert oder eine Exception geworfen.

Eine blocking Queue bietet zusätzlich folgende blockierende Operationen:

  • Eine Methode, die beim Einfügen in eine volle bounded Queue solange wartet, bis durch Entnahme eines Elements (durch einen anderen Thread) Platz freigeworden ist.
  • Eine Methode, die beim Versuch ein Element aus einer leeren Queue zu entnehmen darauf wartet, dass ein Element (durch einen anderen Thread) eingefügt worden ist.

Blockierende Methoden werden dabei nicht automatisch in der Reihenfolge bedient, in der sie aufgerufen wurden. Die Abarbeitung in Aufrufreihenfolge kann bei einigen Queue-Implementierungen durch eine optionale „Fairness Policy“ aktiviert werden. Diese erhöht allerdings den Overhead und verringert damit den Durchsatz der Queue.

Auch diese Definition gilt analog für Deques und Stacks.

UML-Diagramm: Java Queue, Deque, BlockingQueue, BlockingDeque

Bevor ich die Java-Queue– und Deque-Interfaces im Detail vorstelle, möchte ich einen Überblick in Form eines UML-Klassendiagramms geben:

Class diagram: Collection, Queue, Deque, BlockingQueue, BlockingDeque

Die Interfaces werde ich in den folgenden Kapiteln beschreiben und anhand von Code-Beispielen erklären.

Queues in Java

Das JDK enthält seit Java 5.0 das Interface java.util.Queue und mehrere Queue-Implementierungen, die sich in diversen Eigenschaften (wie bounded/unbounded, blocking/non-blocking, threadsicher/nicht threadsicher) unterscheiden.

Queue Interface

Das java.util.Queue-Interface (→ Javadoc) erbt von java.util.Collection und erweitert diese um Methoden, um Elemente anzuhängen, zu entnehmen und das Element am Kopf der Queue anzusehen, ohne es zu entnehmen.

Dabei gibt es für jede der drei Operationen je zwei Methoden, die sich lediglich in der Fehlerbehandlung unterscheiden: Jeweils eine Methode wirft im Fall eines Fehlers eine Exception, die andere liefert einen bestimmten Rückgabewert (false oder null). Fehler können zum Beispiel dann auftreten, wenn eine bounded Queue voll ist, oder wenn man versucht aus einer leeren Queue ein Element zu entnehmen.

Wirft ExceptionRückgabewert
Element anhängen (enqueue)add(E e)offer(E e)
Element entnehmen (dequeue)remove()poll()
Element ansehen (examine)element()peek()

Java Queue Beispiel

Hier ein einfaches Beispiel (→ Code in GitHub):

public class QueueExample {
  public static void main(String[] args) {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    queue.offer(1);
    queue.offer(2);
    queue.offer(3);
    System.out.println("queue = " + queue);
    System.out.println("queue.peek() = " + queue.peek());
    System.out.println("queue.poll() = " + queue.poll());
    System.out.println("queue.poll() = " + queue.poll());
    System.out.println("queue.poll() = " + queue.poll());
    System.out.println("queue.poll() = " + queue.poll());
    System.out.println("queue.remove() = " + queue.remove());
  }
}

Das Programm fügt mit offer() die Elemente 1, 2 und 3 in die Queue ein, gibt dann mit peek() das erste Element aus und entnimmt dann alle Elemente mit poll(). Nachdem alle Elemente entnommen wurden, wird noch einmal poll() und dann remove() aufgerufen.

Das Programm gibt folgendes aus:

queue = [1, 2, 3]
queue.peek() = 1
queue.poll() = 1
queue.poll() = 2
queue.poll() = 3
queue.poll() = null
Exception in thread "main" java.util.NoSuchElementException
    at [...]

Wie du siehst, werden die Elemente in FIFO-Reihenfolge ausgegeben, also in derselben Reihenfolge, wie sie eingefügt wurden. Außerdem sieht man schön den Unterschied zwischen poll() und remove(): Während poll() bei leerer Queue den Wert null zurückliefert, wirft remove() eine NoSuchElementException.

Entsprechend würde auch peek() bei leerer Queue null zurückliefern und element() eine NoSuchElementException werfen.

BlockingQueue Interface

Das ebenso seit Java 5.0 verfügbare Interface java.util.concurrent.BlockingQueue (→ Javadoc) erweitert Queue um zusätzliche blockierende Operationen, die beim Entnehmen eines Elements aus einer leeren Queue darauf warten, dass ein Element vorhanden ist, bzw. beim Einfügen eines Elements in eine volle Queue warten, bis freier Platz verfügbar ist.

Beide Operationen gibt es in jeweils zwei Varianten. Eine, die unbegrenzt lange wartet, und eine, die nach Ablauf einer vorgegebenen Wartezeit aufgibt und false bzw. null zurückliefert. Blockierende Methoden, die Exceptions werfen, gibt es nicht.

Wirft Exception
(geerbt von Queue)
Rückgabewert
(geerbt von Queue)
Blockiert
(neu in BlockingQueue)
Timeout
(neu in BlockingQueue)
Element anhängen (enqueue)add(E e)offer(E e)put(E e)offer(E e,
  long timeout,
  TimeUnit unit)
Element entnehmen
(dequeue)
remove()poll()take()poll(
  long timeout,
  TimeUnit unit)
Element ansehen
(examine)
element()peek()

Java BlockingQueue Beispiel

Hier ein Beispiel, welches aufgrund der Nebenläufigkeit deutlich komplexer ist als das erste Beispiel (→ Code in GitHub):

public class BlockingQueueExample {
  private static final long startTime = System.currentTimeMillis();

  public static void main(String[] args) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    // Start reading from the queue immediately, every 3 seconds
    for (int i = 0; i < 10; i++) {
      pool.schedule(() -> dequeue(queue), i * 3, TimeUnit.SECONDS);
    }

    // Start writing to the queue after 3.5 seconds (so there are already 2
    // threads waiting), every 1 seconds (so that the queue fills faster than
    // it's emptied, so that we see a full queue soon)
    for (int i = 0; i < 10; i++) {
      int finalI = i;
      pool.schedule(() -> enqueue(queue, finalI),
          3500 + i * 1000, TimeUnit.MILLISECONDS);
    }

    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void dequeue(BlockingQueue<Integer> queue) {
    log("    Calling queue.take() (queue = %s)...", queue);
    try {
      Integer e = queue.take();
      log("    queue.take() returned %d (queue = %s)", e, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void enqueue(BlockingQueue<Integer> queue, int i) {
    log("Calling queue.put(%d) (queue = %s)...", i, queue);
    try {
      queue.put(i);
      log("queue.put(%d) returned (queue = %s)", i, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(Locale.US, "[%4.1fs] [%-16s] %s%n",
        (System.currentTimeMillis() - startTime) / 1000.0,
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}

In diesem Beispiel erstellen wir eine blocking, bounded Queue mit der Kapazität 3 und schedulen jeweils zehn Enqueue- und zehn Dequeue-Operationen.

Die Enqueue-Operationen beginnen später, so dass wir am Anfang blockierende Dequeue-Operationen sehen. Außerdem erfolgen die Enqueue-Operationen in kürzen Abständen, so dass die Kapazitätsgrenze der Queue nach einer Weile erreicht ist und wir blockierende Enqueue-Operationen sehen.

Hier die Ausgabe des Programms:

[ 0.0s] [pool-1-thread-1 ]     Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-3 ]     Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-2 ] Calling queue.put(0) (queue = [])...
[ 3.5s] [pool-1-thread-2 ] queue.put(0) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ]     queue.take() returned 0 (queue = [])
[ 4.5s] [pool-1-thread-10] Calling queue.put(1) (queue = [])...
[ 4.5s] [pool-1-thread-10] queue.put(1) returned (queue = [])
[ 4.5s] [pool-1-thread-3 ]     queue.take() returned 1 (queue = [])
[ 5.5s] [pool-1-thread-8 ] Calling queue.put(2) (queue = [])...
[ 5.5s] [pool-1-thread-8 ] queue.put(2) returned (queue = [2])
[ 6.0s] [pool-1-thread-9 ]     Calling queue.take() (queue = [2])...
[ 6.0s] [pool-1-thread-9 ]     queue.take() returned 2 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(3) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(3) returned (queue = [3])
[ 7.5s] [pool-1-thread-6 ] Calling queue.put(4) (queue = [3])...
[ 7.5s] [pool-1-thread-6 ] queue.put(4) returned (queue = [3, 4])
[ 8.5s] [pool-1-thread-7 ] Calling queue.put(5) (queue = [3, 4])...
[ 8.5s] [pool-1-thread-7 ] queue.put(5) returned (queue = [3, 4, 5])
[ 9.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [3, 4, 5])...
[ 9.0s] [pool-1-thread-4 ]     queue.take() returned 3 (queue = [4, 5])
[ 9.5s] [pool-1-thread-2 ] Calling queue.put(6) (queue = [4, 5])...
[ 9.5s] [pool-1-thread-2 ] queue.put(6) returned (queue = [4, 5, 6])
[10.5s] [pool-1-thread-1 ] Calling queue.put(7) (queue = [4, 5, 6])...
[11.5s] [pool-1-thread-10] Calling queue.put(8) (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-3 ]     Calling queue.take() (queue = [4, 5, 6])...
[12.0s] [pool-1-thread-3 ]     queue.take() returned 4 (queue = [5, 6, 7])
[12.0s] [pool-1-thread-1 ] queue.put(7) returned (queue = [5, 6, 7])
[12.5s] [pool-1-thread-8 ] Calling queue.put(9) (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-9 ]     Calling queue.take() (queue = [5, 6, 7])...
[15.0s] [pool-1-thread-9 ]     queue.take() returned 5 (queue = [6, 7, 8])
[15.0s] [pool-1-thread-10] queue.put(8) returned (queue = [6, 7, 8])
[18.0s] [pool-1-thread-5 ]     Calling queue.take() (queue = [6, 7, 8])...
[18.0s] [pool-1-thread-5 ]     queue.take() returned 6 (queue = [7, 8, 9])
[18.0s] [pool-1-thread-8 ] queue.put(9) returned (queue = [7, 8, 9])
[21.0s] [pool-1-thread-6 ]     Calling queue.take() (queue = [7, 8, 9])...
[21.0s] [pool-1-thread-6 ]     queue.take() returned 7 (queue = [8, 9])
[24.0s] [pool-1-thread-7 ]     Calling queue.take() (queue = [8, 9])...
[24.0s] [pool-1-thread-7 ]     queue.take() returned 8 (queue = [9])
[27.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [9])...
[27.0s] [pool-1-thread-4 ]     queue.take() returned 9 (queue = [])

Zu Beginn ist die Queue leer, so dass die ersten zwei Leseversuche (nach 0 und 3 s) blockieren.

Nach 3,5 s (nachdem also zwei lesende Threads an der Queue warten) beginnt das Programm sekündlich in die Queue zu schreiben. Man sieht in der Ausgabe schön, wie dabei jeweils ein lesender Thread fortgesetzt wird und das angehängte Element sofort wieder entnimmt (bei 3,5 und 4,5 s).

Da das Programm dreimal so schnell in die Queue schreibt wie es daraus liest, blockiert nach 10,5 s der Versuch eine 7 in die Queue zu schreiben, da diese mit den Elementen [4, 5, 6] ihre Kapazitätsgrenze von 3 erreicht hat.

Erst nachdem nach 12 s die 4 aus der Queue entnommen wurde, kann die 7 eingefügt werden. Für die 8 und die 9 sehen wir ein entsprechendes Verhalten.

Deques in Java

Im JDK 6.0 wurden das Interface java.util.Deque und diverse Deque-Implementierungen aufgenommen. Diese unterscheiden sich ebenso wie die Queue-Implementierungen in den Eigenschaften bounded/unbounded, blocking/non-blocking und threadsicher/nicht threadsicher.

Deque Interface

Das Interface java.util.Deque (→ JavaDoc) erbt von java.util.Queue und erweitert diese um Methoden, um Elemente am Anfang der Queue anzuhängen, Elemente am Ende der Queue zu entnehmen und das Element am Ende der Queue anzusehen, ohne es zu entnehmen.

Auch diese Methoden unterscheiden sich wieder darin, ob sie im Fehlerfall eine Exception werfen oder einen speziellen Rückgabewert liefern.

Aus Konsistenzgründen wurden die Operationen, die Deque von Queue erbt, mit neuem Namen erneut implementiert (beispielsweise Queue.offer() als Deque.offerLast() und Queue.poll() als Deque.pollFirst()). Diese Fälle erkennst du daran, dass in der folgenden Tabelle zwei Methodennamen angegeben sind.

AnfangEnde
Wirft ExceptionRückgabewertWirft ExceptionRückgabewert
Element anhängen
(enqueue)
addFirst(E e)offerFirst(E e)addLast(E e)
add(E e)
offerLast(E e)
offer(E e)
Element entnehmen
(dequeue)
removeFirst()
remove()
pollFirst()
poll()
removeLast()pollLast()
Element ansehen
(examine)
getFirst()
element()
peekFirst()
peek()
getLast()peekLast()

Java Deque Beispiel

Hier ein Code-Beispiel, dass exakt das Deque produziert, das am Anfang des Artikels grafisch dargestellt wurde (→ Code in GitHub):

public class DequeExample {
  public static void main(String[] args) {
    Deque<Integer> dequeue = new ArrayDeque<>();
    dequeue.offerFirst(20); // -> 20
    dequeue.offerFirst(21); // -> 21 20
    dequeue.offerFirst(22); // -> 22 21 20
    dequeue.offerLast(23);  // -> 22 21 20 23
    dequeue.offerLast(24);  // -> 22 21 20 23 24
    dequeue.offerLast(25);  // -> 22 21 20 23 24 25
    dequeue.offerFirst(26); // -> 26 22 21 20 23 24 25
    dequeue.offerFirst(27); // -> 27 26 22 21 20 23 24 25
    System.out.println("dequeue = " + dequeue);

    System.out.println();
    System.out.println("dequeue.offerLast(28) = " + dequeue.offerLast(28));
    System.out.println("dequeue.offerFirst(29) = " + dequeue.offerFirst(29));
    System.out.println("dequeue = " + dequeue);

    while (!dequeue.isEmpty()) {
      System.out.println();
      System.out.println("dequeue.pollFirst() = " + dequeue.pollFirst());
      System.out.println("dequeue.pollLast() = " + dequeue.pollLast());
      System.out.println("dequeue = " + dequeue);
    }
  }
}

Das Programm füllt mit mehreren Aufrufen von offerFirst() und offerLast() das Deque, gibt es aus und entnimmt dann abwechselnd von beiden Seiten solange Elemente, bis das Deque leer ist.

Dies ist die Ausgabe des Programms:

dequeue = [27, 26, 22, 21, 20, 23, 24, 25]

dequeue.offerLast(28) = true
dequeue.offerFirst(29) = true
dequeue = [29, 27, 26, 22, 21, 20, 23, 24, 25, 28]

dequeue.pollFirst() = 29
dequeue.pollLast() = 28
dequeue = [27, 26, 22, 21, 20, 23, 24, 25]

dequeue.pollFirst() = 27
dequeue.pollLast() = 25
dequeue = [26, 22, 21, 20, 23, 24]

dequeue.pollFirst() = 26
dequeue.pollLast() = 24
dequeue = [22, 21, 20, 23]

dequeue.pollFirst() = 22
dequeue.pollLast() = 23
dequeue = [21, 20]

dequeue.pollFirst() = 21
dequeue.pollLast() = 20
dequeue = []

Wer genau hinschaut stellt fest, dass ArrayDeque.toString() die Elemente in umgekehrter Reihenfolge ausgibt (nämlich vom Kopf zum Ende), als sie in der Grafik am Anfang des Artikels dargestellt wurden und wie sie im Queue-Beispiel von ConcurrentLinkedQueue.toString() ausgegeben wurden (vom Ende zum Kopf).

BlockingDeque Interface

Analog zur BlockingQueue bietet java.util.concurrent.BlockingDeque (→ Javadoc) zusätzlich blockierende Operationen, die beim Entnehmen eines Elements aus dem Deque warten, bis ein Element verfügbar ist, bzw. die beim Einfügen eines Elements warten, bis Platz verfügbar ist.

Wiederum aus Konsistenzgründen wurden die Operationen, die BlockingDeque von BlockingQueue erbt, mit neuem Namen erneut implementiert. Diese Fälle erkennst du wieder daran, dass in einem Tabellenfeld zwei Methodennamen angegeben sind.

Die nunmehr 30 Methoden habe ich in zwei Tabellen aufgeteilt.

Operationen am Anfang (Kopf) des Deques

Wirft Exception
(geerbt von Deque)
Rückgabewert
(geerbt von Deque)
Blockiert
(neu in BlockingDeque)
Timeout
(neu in BlockingDeque)
Element
anhängen
(enqueue)
addFirst(E e)offerFirst(E e)putFirst(E e)offerFirst(E e,
  long timeout,
  TimeUnit unit)
Element
entnehmen
(dequeue)
removeFirst()
remove()
pollFirst()
poll()
takeFirst()
take()
pollFirst(
  long timeout,
  TimeUnit unit)
poll(
  long timeout,
  TimeUnit unit)
Element
ansehen
(examine)
getFirst()
element()
peekFirst()
peek()

Operationen am Ende des Deques

Wirft Exception
(geerbt von Deque)
Rückgabewert
(geerbt von Deque)
Blockiert
(neu in BlockingDeque)
Timeout
(neu in BlockingDeque)
Element
anhängen
(enqueue)
addLast(E e)
add(E e)
offerLast(E e)
offer(E e)
putLast(E e)
put(E e)
offerLast(E e,
  long timeout,
  TimeUnit unit)
offer(E e,
  long timeout,
  TimeUnit unit)
Element
entnehmen
(dequeue)
removeLast()pollLast()takeLast()pollLast(
  long timeout,
  TimeUnit unit)
Element
ansehen
(examine)
getLast()peekLast()

Java BlockingDeque Beispiel

Das Code-Beispiel zum BlockingDeque ist noch länger als die bisherigen. Daher verzichte ich darauf, es hier im Artikel abzudrucken. Du findest es (wie all die anderen Code-Beispiele dieses Artikels) in meinem GitHub-Repository.

Das Beispiel basiert auf dem vorangegangenen BlockingQueue-Beispiel und fügt Elemente auf einer zufälligen Seiten hinzu bzw. entnimmt sie von einer zufälligen Seite des Deques.

Stacks in Java

Ebenso alt wie Java ist die seit der Version 1.0 vorhandene Klasse java.util.Stack (→ Javadoc). Stack erbt von java.util.Vector und erweitert diesen um die Stack-Methoden push(E item), pop() und peek().

Genau wie Vector ist auch Stack threadsicher: Alle Methoden sind synchronized.

Java Stack Beispiel

Hier ist ein kleines Beispiel zur Verwendung von Stack:

public class StackExample {
  public static void main(String[] args) {
    Stack<Integer> stack = new Stack<>();
    stack.push(1);
    stack.push(2);
    stack.push(3);
    System.out.println("stack = " + stack);
    System.out.println("stack.peek() = " + stack.peek());
    System.out.println("stack.pop() = " + stack.pop());
    System.out.println("stack.pop() = " + stack.pop());
    System.out.println("stack.pop() = " + stack.pop());
    System.out.println("stack.pop() = " + stack.pop());
  }
}

Die Ausgabe dieses Programms lautet:

stack = [1, 2, 3]
stack.peek() = 3
stack.pop() = 3
stack.pop() = 2
stack.pop() = 1
Exception in thread "main" java.util.EmptyStackException
    at java.base/java.util.Stack.peek(Stack.java:101)
    at java.base/java.util.Stack.pop(Stack.java:83)
    at eu.happycoders.cds.stack.StackExample.main(StackExample.java:16)

Sowohl pop() als auch peek() werfen eine EmptyStackException, wenn der Stack keine Elemente enthält.

Warum man Stack nicht verwenden sollte

Die Java-Entwickler empfehlen Stack nicht mehr zu verwenden. Das Javadoc besagt: „A more complete and consistent set of LIFO stack operations is provided by the Deque interface and its implementations, which should be used in preference to this class.“

Was genau bedeutet das? Meiner Meinung nach sollte Stack aus den folgenden Gründen nicht verwendet werden:

  1. Die Verwendung von synchronized ist kein besonders performantes Mittel, um eine Datenstruktur threadsicher zu machen. Bessere Methoden sind ReentrantLocks und CAS („Compare-and-swap“)-Operationen, wie sind in den Queue- und Deque-Implementierungen zu finden sind.
  2. Durch die Erweiterung von Vector stellt Stack Operationen zur Verfügung, die in einem Stack nichts zu suchen haben, wie der Zugriff auf Elemente über deren Index sowie das Einfügen und Löschen von Elementen an beliebigen Positionen.
  3. Stack implementiert kein Interface. Mit der Verwendung von Stack legt man sich also auf eine bestimmte Implementierung fest.

Stattdessen empfehlen die Java-Entwickler eine der Deque-Implementierungen wie bspw. ArrayDeque zu verwenden. Fairerweise ist zu sagen, dass auch Deques Operationen anbieten, die ein Stack eigentlich nicht zur Verfügung stellen sollte, wie das Einfügen und Entfernen von Elementen am Boden des Stacks.

Eine bessere Stack-Implementierung

Eine saubere Stack-Implementierung besteht aus einem Stack-Interface und einer oder mehrerer Implementierungen – wobei nichts dagegen spricht, diese als Adapter um eine bestehende Deque-Implementierung zu realisieren.

Stack-Interface (→ Code in GitHub):

public interface Stack<E> extends Collection<E> {
  void push(E item);
  E pop();
  E peek();
}

Nicht threadsichere Stack-Implementierung auf Basis von ArrayDeque (→ Code in GitHub):

public class ArrayDequeStack<E> implements Stack<E> {
  private final Deque<E> deque = new ArrayDeque<>();

  @Override
  public void push(E item) {
    deque.addFirst(item);
  }

  @Override
  public E pop() {
    return deque.removeFirst();
  }

  @Override
  public E peek() {
    return deque.peekFirst();
  }

  // All other Collection methods are forwarded
  // to their corresponding methods in deque:
  // [...]
}

Array- vs. Linked List-basierte Queues und Deques

Bevor wir zu den Queue- und Deque-Implementierungen und ihren Eigenschaften kommen, hier noch ein kurzer Exkurs zu den zugrunde liegenden Datenstrukturen: Arrays und verkettete Listen („linked lists“).

Für gewöhnlich liefern Array-basierte Datenstrukturen in Java eine deutlich besseren Performance.

Zum einen ist der Speicherbedarf bei einem Array deutlich geringer*:

  • Array (z. B. ArrayList): 4 Bytes pro Eintrag.
  • Doppelt verkettete Liste (z. B. LinkedList): 24 Bytes pro Eintrag – jeder Eintrag benötigt ein Node-Objekt, und diese benötigen jeweils 12 Bytes für den Objekt-Header + 3 × 4 Bytes für die item-, next– und prev-Referenzen.
  • Einfach verkettete Liste: ebenfalls 24 Bytes pro Eintrag – die Knoten benötigen zwar keine prev-Referenzen, doch dafür werden pro Node-Objekt 4 Füllbytes („pad bytes“) eingefügt, da Objekte immer ein Vielfaches von 8 Bytes an Speicher belegen müssen.

Zum anderen haben Arrays den Vorteil, dass deren Elemente an aufeinanderfolgenden Speicheradressen liegen, so dass beim Zugriff auf den Hauptspeicher alle Array-Elemente, die auf derselben Memory Page liegen, gleichzeitig in den CPU-Cache geladen werden.

Die Knoten einer verketteten Listen hingegen können quer über den Heap verteilt sein, so dass beim Iterieren über eine verkettete Liste im worst case für jedes Element eine neue Memory Page in den CPU-Cache geladen werden muss.

Ein dritter Nachteil von verketteten Listen ist, dass diese für den Garbage Collector einen höheren Aufwand darstellen als Arrays, da der GC bei der „Reachability Analysis“ allen Knoten der Liste folgen muss, um erreichbare Objekte zu markieren.

Grundsätzlich sollte man daher Array-basierte Datenstrukturen vorziehen. Abhängig von der Art der Verwendung und der Implementierung der Threadsicherheit können jedoch auch auf verketteten Listen basierte Queues/Deques schneller sein. Ich werde darauf bei der Beschreibung der einzelnen Queue/Deque-Implementierungen näher eingehen.

(* Ich gehe bei den Speicherbetrachtungen von einer 64-Bit-JVM mit aktivierten Compressed Oops aus.)

Java Queue- und Deque-Implementierungen (non-blocking)

In den folgenden Abschnitten findest du Beschreibungen und die wichtigsten Eigenschaften aller im JDK verfügbaren Queue- und Deque-Implementierungen – zunächst die nicht blockierenden („non-blocking“) Varianten, gefolgt von den blockierenden („blocking“).

Zu jeder Implementierung empfehle ich sinnvolle Einsatzzwecke.

ArrayDeque

Die Klasse java.util.ArrayDeque (→ JavaDoc) basiert – wie der Name schon sagt – auf einem Array. Auf dessen Basis wird ein Ringpuffer („circular buffer“) implementiert. Wie der genau funktioniert, kannst du in diesem Wikipedia-Artikel nachlesen.

Der Ringbuffer hat den Vorteil, das weder beim Einfügen noch beim Entnehmen von Elementen auf beiden Seiten des Deques Elemente innerhalb des Arrays verschoben werden müssen. Wir bekommen hier also die Flexibilität einer verketteten Liste ohne deren Performance-Nachteile.

Das Array wächst bei Bedarf, wird jedoch weder automatisch verkleinert, noch kann es manuell verkleinert werden.

Die weiteren Eigenschaften von ArrayDeque:

  • Non-blocking
  • Unbounded
  • Nicht thread-safe
  • Der Iterator ist weder fail-fast (d. h. er wirft keine ConcurrentModificationException) noch weakly consistent (d. h. es werden keinerlei Garantien bzgl. der Konsistenz bei nebenläufiger Änderung gemacht).
  • Zeitkomplexität von size(): O(1)

Einsatzempfehlung

Das ArrayDeque ist für single-threaded Anwendungen (und nur dafür) eine gute Wahl. Die Tatsache, dass das zugrunde liegende Array nicht verkleinert werden kann, sollte im Auge behalten werden.

ArrayDeque-Beispiel

Ein ArrayDeque-Code-Beispiel findest du weiter oben im Abschnitt „Java Deque Beispiel“.

LinkedList

Die Klasse java.util.LinkedList (→ Javadoc) implementiert eine klassische doppelt verkettete Liste. Sie existiert im JDK seit Version 1.2, also deutlich länger als das Deque-Interface. Die Deque-Methoden wurden mit Einführung dieses Interfaces im JDK 6.0 hinzugefügt.

Die weiteren Eigenschaften von LinkedList sind:

  • Non-blocking
  • Unbounded
  • Nicht thread-safe
  • Der Iterator ist fail-fast, d. h. er wirft bei gleichzeitiger Änderung eine ConcurrentModificationException.
  • Zeitkomplexität von size(): O(1) – die Größe wird in einem int-Feld der LinkedList gespeichert, d. h. zur Bestimmung der Größe muss nicht über die komplette Liste iteriert werden (was eine Zeitkomplexität von O(n) zur Folge hätte).

Einsatzempfehlung

Ich rate grundsätzlich von der Verwendung der LinkedList ab. Wenn du eine Liste benötigst, ist ArrayList aus den o. g. Performance-Gründen die bessere Wahl; wenn du eine nicht threadsichere Queue oder ein nicht threadsicheres Deque benötigst, dann verwende ein ArrayDeque.

LinkedList-Beispiel

Um mit der LinkedList zu experimentieren, kannst du das oben vorgestellte „Java Deque Beispiel“ kopieren und die Zeile

    Deque dequeue = new ArrayDeque<>();

durch

    Deque dequeue = new LinkedList<>();

ersetzen.

ConcurrentLinkedQueue und ConcurrentLinkedDeque

Kommen wir zu den ersten threadsicheren Queue- und Deque-Implementierungen.

Die Klasse java.util.concurrent.ConcurrentLinkedQueue (→ Javadoc) basiert auf einer einfach verketteten Liste; java.util.concurrent.ConcurrentLinkedDeque (→ Javadoc) auf einer doppelt verketteten Liste.

Beide Implementierungen garantieren Threadsicherheit durch nicht-blockierende CAS („Compare-and-swap“)-Operationen auf VarHandles, wodurch ein deutlicher Performance-Gewinn gegenüber der Verwendung von synchronized oder Locks wie ReentrantLock erzielt wird.

Die weiteren Eigenschaften von ConcurrentLinkedQueue und ConcurrentLinkedDeque:

  • Non-blocking
  • Unbounded
  • Der Iterator ist weakly consistent, d. h. alle Queue-/Deque-Elemente, die zum Zeitpunkt der Erzeugung des Interators existieren, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können, müssen aber nicht, durch den Iterator berücksichtigt werden.
  • Zeitkomplexität von size(): O(n), da für die Bestimmung der Größe über alle Elemente der Queue iteriert wird. Wenn während der Bestimmung der Größe Queue-Elemente hinzugefügt oder entnommen werden, kann das Ergebnis ungenau sein.

Einsatzempfehlung

ConcurrentLinkedQueue und ConcurrentLinkedDeque sind eine gute Wahl, wenn eine nicht blockierende, unbounded und threadsichere Queue (bzw ein entsprechendes Deque) benötigt wird.

Dies gilt trotz meiner allgemeinen Empfehlung Array-basierte Datenstrukturen vorzuziehen. Das ArrayDeque ist nicht thread-safe. Und die im nächsten Kapitel beschriebene ArrayBlockingQueue ist zum einen bounded und zum anderen wird die Threadsicherheit über ein einzelnes ReentrantLock implementiert, was wesentlich unperformanter ist als Compare-and-swap auf VarHandles.

ConcurrentLinkedQueue-Beispiel

Die ConcurrentLinkedQueue haben wir bereits im ersten Code-Beispiel im Abschnitt „Java Queue Beispiel“ eingesetzt.

ConcurrentLinkedDeque-Beispiel

Das folgende Beispiel demonstriert die Threadsicherheit von ConcurrentLinkedDeque. Vier schreibende und drei lesende Threads fügen nebenläufig Elemente auf zufälligen Seiten des Deques hinzu und entfernen sie (→ Code in GitHub):

public class ConcurrentLinkedDequeExample {
  private static final Deque<Integer> deque = new ConcurrentLinkedDeque<>();

  public static void main(String[] args) {
    // Start 4 writing threads
    for (int i = 0; i < 4; i++)
      new ModifyingThread(true).start();

    // Start 3 reading threads
    for (int i = 0; i < 3; i++)
      new ModifyingThread(false).start();
  }

  private static final class ModifyingThread extends Thread {
    private final boolean write;

    private ModifyingThread(boolean write) {
      this.write = write;
    }

    @Override
    public void run() {
      for (; ; ) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        try {
          Thread.sleep(random.nextInt(500, 2000));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        boolean first = random.nextBoolean();
        if (write) {
          Integer e = random.nextInt(1000);
          if (first) {
            deque.offerFirst(e);
            System.out.printf("deque.offerFirst(%3d)        --> deque = %s%n",
                e, deque);
          } else {
            deque.offerLast(e);
            System.out.printf("deque.offerLast(%3d)         --> deque = %s%n",
                e, deque);
          }
        } else {
          if (first)
            System.out.printf("    deque.pollFirst() = %4d --> deque = %s%n",
                deque.pollFirst(), deque);
          else
            System.out.printf("    deque.pollLast()  = %4d --> deque = %s%n",
                deque.pollLast(), deque);
        }
      }
    }
  }
}

Eine beispielhafte Ausgabe:

deque.offerFirst(388)         --> deque = [388]
deque.offerLast(900)          --> deque = [388, 900]
    deque.pollFirst() =  388  --> deque = [900]
deque.offerLast(906)          --> deque = [900, 906]
    deque.pollLast()  =  906  --> deque = [900]
    deque.pollFirst() =  900  --> deque = []
deque.offerLast(727)          --> deque = [727]
deque.offerFirst(720)         --> deque = [720, 727]
    deque.pollLast()  =  727  --> deque = [720]
deque.offerFirst(415)         --> deque = [415, 720]
deque.offerLast(614)          --> deque = [415, 720, 614]
deque.offerFirst(130)         --> deque = [130, 415, 720, 614]

PriorityQueue

Bei der Klasse java.util.PriorityQueue (→ Javadoc) handelt es sich nicht um eine Queue im klassischen Sinne, da die Elemente nicht in FIFO-Reihenfolge entnommen werden, sondern entsprechend ihrer natürlichen Ordnung bzw. entsprechend eines dem Constructor übergebenen Comparators.

Am Kopf der Queue befindet sich das jeweils kleinste Element. Die Sortierung ist dabei nicht stabil, d. h. zwei Elemente, die entsprechend der Sortierreihenfolge an gleicher Position stehen, werden nicht zwangsläufig in derselben Reihenfolge entnommen, wie sie in die Queue eingefügt wurden.

Der weiteren Eigenschaften von PriorityQueue sind:

  • Non-blocking
  • Unbounded
  • Array-basiert
  • Nicht thread-safe
  • Der Iterator ist fail-fast, d. h. er wirft bei gleichzeitiger Änderung eine ConcurrentModificationException.
  • Intern werden die Elemente in einem Array gespeichert, das einen Min-Heap repräsentiert; der Iterator durchläuft die Elemente in entsprechender Reihenfolge.
  • Zeitkomplexität von size(): O(1)

Einsatzempfehlung

Die PriorityQueue kann genau dann eingesetzt werden, wenn eine nicht threadsichere Queue mit oben beschriebener Dequeue-Reihenfolge benötigt wird.

Allerdings ist zu beachten, dass die PriorityQueue im JDK nur an sehr wenigen Stellen eingesetzt wird und somit eine gewisse Wahrscheinlichkeit für das Vorhandensein von Bugs existiert (was wenig genutzt wird, wird auch wenig getestet).

PriorityQueue-Beispiel

Hier ist ein kleines Beispiel, in dem zehn Zufallszahlen in eine PriorityQueue geschrieben und dann wieder entnommen werden (→ Code in GitHub):

public class PriorityQueueExample {
  public static void main(String[] args) {
    Queue<Integer> queue = new PriorityQueue<>();

    // Enqueue random numbers
    for (int i = 0; i < 10; i++) {
      Integer e = ThreadLocalRandom.current().nextInt(100);
      queue.offer(e);
      System.out.printf("queue.offer(%2d)    --> queue = %s%n", e, queue);
    }

    // Dequeue all elements
    while (!queue.isEmpty()) {
      System.out.printf("queue.poll() = %2d  --> queue = %s%n",
          queue.poll(), queue);
    }
  }
}

Es folgt eine beispielhafte Ausgabe des Programms, in der gut zu sehen ist, wie die Elemente in der Queue in keiner bestimmten Reihenfolge angezeigt werden (lediglich das kleinste Element ist immer vorne) und wie sie letztendlich in aufsteigender Reihenfolge entnommen werden:

queue.offer(44)    --> queue = [44]
queue.offer(18)    --> queue = [18, 44]
queue.offer(58)    --> queue = [18, 44, 58]
queue.offer(98)    --> queue = [18, 44, 58, 98]
queue.offer(45)    --> queue = [18, 44, 58, 98, 45]
queue.offer(83)    --> queue = [18, 44, 58, 98, 45, 83]
queue.offer(31)    --> queue = [18, 44, 31, 98, 45, 83, 58]
queue.offer(21)    --> queue = [18, 21, 31, 44, 45, 83, 58, 98]
queue.offer(22)    --> queue = [18, 21, 31, 22, 45, 83, 58, 98, 44]
queue.offer(19)    --> queue = [18, 19, 31, 22, 21, 83, 58, 98, 44, 45]
queue.poll() = 18  --> queue = [19, 21, 31, 22, 45, 83, 58, 98, 44]
queue.poll() = 19  --> queue = [21, 22, 31, 44, 45, 83, 58, 98]
queue.poll() = 21  --> queue = [22, 44, 31, 98, 45, 83, 58]
queue.poll() = 22  --> queue = [31, 44, 58, 98, 45, 83]
queue.poll() = 31  --> queue = [44, 45, 58, 98, 83]
queue.poll() = 44  --> queue = [45, 83, 58, 98]
queue.poll() = 45  --> queue = [58, 83, 98]
queue.poll() = 58  --> queue = [83, 98]
queue.poll() = 83  --> queue = [98]
queue.poll() = 98  --> queue = []

Java Queue- und Deque-Implementierungen (blocking)

In den folgenden Abschnitten beschreibe ich alle blockierenden Queue- und Deque-Implementierungen des JDK, d. h. diejenigen Klassen, die die Interfaces BlockingQueue bzw. BlockingDeque implementieren.

ArrayBlockingQueue

Die Klasse java.util.concurrent.ArrayBlockingQueue (→ Javadoc) basiert – wie das ArrayDeque – auf einem Array, ist allerdings

  • threadsicher,
  • bounded (hat eine maximale Kapazität),
  • ist entsprechend blockierend
  • und bietet eine Fairness Policy (zur Erinnerung: blockierende Methoden werden in der Reihenfolge bedient, in der sie aufgerufen wurden).

Die Threadsicherheit wird durch ein einzelnes ReentrantLock gewährleistet, so dass es bei gleichzeitigen Schreib- und Lesezugriffen zu einer hohen Anzahl an Zugangskonflikten („thread contention“) kommen kann.

Die weiteren Eigenschaften der ArrayBlockingQueue sind:

  • Der Iterator ist weakly consistent (hier noch einmal die Definition: Alle Queue-/Deque-Elemente, die zum Zeitpunkt der Erzeugung des Interators existieren, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können, müssen aber nicht, durch den Iterator berücksichtigt werden).
  • Zeitkomplexität von size(): O(1)

Einsatzempfehlung

Aufgrund der möglicherweise hohen Contention bei gleichzeitigem Schreib- und Lesezugriff solltest du – wenn du eine blockierende, threadsichere Queue benötigst – für deinen speziellen Einsatzweck testen, ob evtl. eine LinkedBlockingQueue performanter ist. Diese basiert zwar auf einer verketteten Liste, verwendet allerdings zum Schreiben und Lesen zwei separate ReentrantLocks, was die Zugangskonflikte reduziert.

ArrayBlockingQueue-Beispiel

Um die ArrayBlockingQueue auszuprobieren, kannst du das „Java BlockingQueue Beispiel“ von oben kopieren und

      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);

durch

      BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

ersetzen.

LinkedBlockingQueue und LinkedBlockingDeque

Die Klassen java.util.concurrent.LinkedBlockingQueue (→ Javadoc) und java.util.concurrent.LinkedBlockingDeque (→ Javadoc) basieren – genau wie ConcurrentLinkedQueue und ConcurrentLinkedDeque – auf verketteten Listen, sind allerdings – ebenso wie ArrayBlockingQueue

  • threadsicher,
  • bounded (haben eine maximale Kapazität)
  • und blockierend.

Die Threadsicherheit der ConcurrentLinkedQueue wird im Gegensatz zur ArrayBlockingQueue nicht mit einem, sondern mit zwei separaten ReentrantLocks gewährleistet – eines für Schreib- und eines für Lesezugriff. Dies reduziert die Zugangskonflikte zwischen Producer- und Consumer-Threads.

Die weiteren Eigenschaften von LinkedBlockingQueue und LinkedBlockingDeque sind:

  • keine Fairness Policy verfügbar
  • Der Iterator ist weakly consistent (Definition s. o.).
  • Zeitkomplexität von size(): O(1) – die Implementierungen verwenden intern ein AtomicInteger, um die Größe verfügbar zu halten (im Gegensatz zu ConcurrentLinkedQueue und ConcurrentLinkedDeque, die bei der Abfrage der Größe über alle Elemente iterieren müssen).

Einsatzempfehlung

Ich empfehle LinkedBlockingQueue und LinkedBlockingDeque, wenn du blockierende, threadsichere Queues/Deques brauchst.

Die Klasse LinkedBlockingQueue wird von Executors.newFixedThreadPool() und Executors.newSingleThreadedExecutor() als „work queue“ für den Executor verwendet. Sie wird somit intensiv genutzt, was die Wahrscheinlichkeit für Bugs äußerst gering hält.

LinkedBlockingQueue- und LinkedBlockingDeque-Beispiele

Code-Beispiele für LinkedBlockingQueue und LinkedBlockingDeque findest du in den Abschnitten „Java BlockingQueue Beispiel“ und „Java BlockingDeque Beispiel“ weiter oben.

PriorityBlockingQueue

Bei der java.util.concurrent.PriorityBlockingQueue (→ Javadoc) handelt es sich um eine threadsichere und blockierende Variante der PriorityQueue.

Threadsicherheit wird durch ein einzelnes ReentrantLock sichergestellt.

Im Gegensatz zu allen bisher vorgestellten blockierenden Implementierungen ist die PriorityBlockingQueue nicht bounded, sie hat also keine Kapazitätsgrenze. Das bedeutet, dass die Methode add(e) keine Exception wirft, offer(e) niemals false zurückliefert und put(e) und offer(e, time, unit) niemals blockieren. Nur die Dequeue-Operationen blockieren bzw. liefern null zurück, wenn die Queue leer ist.

Die weiteren Eigenschaften der PriorityBlockingQueue:

  • Array-basiert
  • keine Fairness Policy verfügbar
  • Der Iterator ist weakly consistent (Definition s. o.).
  • Wie bei der PriorityQueue werden die Elemente in einem Array gespeichert, das einen Min-Heap repräsentiert; der Iterator durchläuft die Elemente in entsprechender Reihenfolge.
  • Zeitkomplexität von size(): O(1)

Einsatzempfehlung

Die PriorityBlockingQueue wird im JDK nicht genutzt. Es ist also eine gewisse Wahrscheinlichkeit vorhanden, dass sie Bugs enthält. Wenn du eine Queue mit entsprechenden Eigenschaften benötigst und die PriorityBlockingQueue verwendest, solltest du intensiv testen.

PriorityBlockingQueue-Beispiel

Als Beispiel für die PriorityBlockingQueue habe ich den Code aus dem Abschnitt „Java BlockingQueue Beispiel“ leicht abgewandelt: Statt einer LinkedBlockingQueue wird die PriorityBlockingQueue verwendet; und statt der Zahlen 0 bis 9 werden Zufallszahlen in die Queue geschrieben.

Das Beispiel drucke ich hier aufgrund der minimalen Code-Änderungen nicht ab. Du findest es hier in meinem GitHub-Repository. Wenn du es laufen lässt, wirst du sehen, dass bei der PriorityBlockingQueue – genau wie bei der PriorityQueue – immer das kleinste Element am Kopf der Queue steht und als nächstes entnommen wird.

DelayQueue

Zum Ende kommen wir zu ein paar sehr speziellen Queue-Implementierungen des JDK.

Die Klasse java.util.concurrent.DelayQueue (→ Javadoc) ist – wie die PriorityQueue, die sie intern verwendet – keine FIFO-Queue. Stattdessen können Elemente erst dann entnommen werden, wenn eine dem Element zugeordnete Wartezeit („delay“) abgelaufen ist.

Dazu müssen die Elemente das Interface java.util.concurrent.Delayed (→ Javadoc) und dessen getDelay​()-Methode implementieren. Diese liefert bei jeden Aufruf die restliche Wartezeit zurück, die noch ablaufen muss, bis das Element aus der Queue entnommen werden darf.

Genau wie die PriorityBlockingQueue ist auch die DelayQueue blocking aber unbounded, kann also beliebig viele Elemente aufnehmen und blockiert nur bei der Entnahme.

Threadsicherheit wird durch ein einzelnes ReentrantLock sichergestellt.

Die weiteren Eigenschaften der DelayQueue:

  • keine Fairness Policy verfügbar
  • Der Iterator ist weakly consistent (Definition s. o.).
  • Zeitkomplexität von size(): O(1) wie bei der zugrunde liegenden PriorityQueue.

Einsatzempfehlung

Ich habe die DelayQueue noch nie benötigt und kann sie für keinen mir bekannten, sinnvollen Einsatzzweck empfehlen. Sie wird im JDK nur ein einziges Mal genutzt (in einer alten Swing-Klasse, die mit einem ScheduledExecutorService wahrscheinlich eleganter hätte implementiert werden können), von daher ist nicht ausgeschlossen, dass sie Fehler enthält.

DelayQueue-Beispiel

In folgendem Beispiel (→ Code in GitHub) wird eine DelayQueue mit Objekten der Klasse DelayedElement gefüllt, welche eine Zufallszahl enthalten sowie ein zufälliges initiales Delay zwischen 100 und 1.000 ms. Danach wird so oft poll() aufgerufen, bis die Queue wieder leer ist.

public class DelayQueueExample {
  public static void main(String[] args) {
    BlockingQueue<DelayedElement<Integer>> queue = new DelayQueue<>();
    ThreadLocalRandom tlr = ThreadLocalRandom.current();
    long startTime = System.currentTimeMillis();

    // Enqueue random numbers with random initial delays
    for (int i = 0; i < 7; i++) {
      DelayedElement<Integer> e = new DelayedElement<>(
          tlr.nextInt(10, 100),
          tlr.nextInt(100, 1000));
      queue.offer(e);
      System.out.printf("[%3dms] queue.offer(%s)   --> queue = %s%n",
          System.currentTimeMillis() - startTime, e, queue);
    }

    // Dequeue all elements
    while (!queue.isEmpty()) {
      try {
        DelayedElement<Integer> e = queue.take();
        System.out.printf("[%3dms] queue.poll() = %s --> queue = %s%n",
            System.currentTimeMillis() - startTime, e, queue);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

Und hier die zugehörige Klasse DelayedElement (→ Code in GitHub). Die Sortierung ist – um den Code nicht noch länger zu machen – nicht stabil, d. h. wenn zwei Elemente mit derselben Wartezeit in die Queue gestellt werden, werden sie in zufälliger Reihenfolge wieder entnommen.

public class DelayedElement<E extends Comparable<E>> implements Delayed {
  private final E e;
  private final long initialDelayMillis;
  private final long expiration;

  public DelayedElement(E e, long initialDelayMillis) {
    this.e = e;
    this.initialDelayMillis = initialDelayMillis;
    this.expiration = System.currentTimeMillis() + initialDelayMillis;
  }

  @Override
  public long getDelay(TimeUnit unit) {
    long remainingDelayMillis = expiration - System.currentTimeMillis();
    return unit.convert(remainingDelayMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed o) {
    DelayedElement<?> other = (DelayedElement<?>) o;
    return Long.compare(expiration, other.expiration);
  }

  @Override
  public String toString() {
    return "{" + e + ", " + initialDelayMillis + "ms}";
  }
}

Hier eine beispielhafte Ausgabe des Programms. Es ist gut zu sehen, wie die Queue zwar nicht sortiert ist, das Element mit der kürzesten Wartezeit sich jedoch immer vorne (links) befindet und dass die Elemente nach Ablauf ihrer jeweiligen Wartezeit entnommen werden:

[  1ms] queue.offer({19, 713ms})   --> queue = [{19, 713ms}]
[ 28ms] queue.offer({15, 643ms})   --> queue = [{15, 643ms}, {19, 713ms}]
[ 29ms] queue.offer({35, 253ms})   --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}]
[ 29ms] queue.offer({11, 781ms})   --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}]
[ 29ms] queue.offer({17, 844ms})   --> queue = [{35, 253ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}, {17, 844ms}]
[ 29ms] queue.offer({40, 490ms})   --> queue = [{35, 253ms}, {19, 713ms}, {40, 490ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}]
[ 30ms] queue.offer({39, 119ms})   --> queue = [{39, 119ms}, {19, 713ms}, {35, 253ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}, {40, 490ms}]
[151ms] queue.poll() = {39, 119ms} --> queue = [{35, 253ms}, {19, 713ms}, {40, 490ms}, {11, 781ms}, {17, 844ms}, {15, 643ms}]
[283ms] queue.poll() = {35, 253ms} --> queue = [{40, 490ms}, {19, 713ms}, {15, 643ms}, {11, 781ms}, {17, 844ms}]
[520ms] queue.poll() = {40, 490ms} --> queue = [{15, 643ms}, {19, 713ms}, {17, 844ms}, {11, 781ms}]
[673ms] queue.poll() = {15, 643ms} --> queue = [{19, 713ms}, {11, 781ms}, {17, 844ms}]
[716ms] queue.poll() = {19, 713ms} --> queue = [{11, 781ms}, {17, 844ms}]
[811ms] queue.poll() = {11, 781ms} --> queue = [{17, 844ms}]
[874ms] queue.poll() = {17, 844ms} --> queue = []

LinkedTransferQueue

Eine weitere Spezialqueue ist die java.util.concurrent.LinkedTransferQueue (→ Javadoc). Es handelt sich – wie die PriorityBlockingQueue – um eine unbounded blocking Queue, d. h. nur die Dequeue-Operationen können zu Fehlern führen oder blockieren.

LinkedTransferQueue basiert auf einer verketteten Liste, und Threadsicherheit wird genau wie bei ConcurrentLinkedQueue und ConcurrentLinkedDeque durch nicht-blockierende Compare-and-swap-Operationen erreicht.

Das besondere an dieser Queue ist, dass sie zusätzliche transfer()– und tryTransfer()-Methoden bereitstellt, die solange blockieren, bis das per transfer()/tryTransfer() angehängte Element wieder aus der Queue entnommen wurde (im Gegensatz zu: bis Platz in der Queue frei wurde). Für die Details dieser Methoden verweise ich auf das oben verlinkte Javadoc.

Die weiteren Eigenschaften der LinkedTransferQueue:

  • keine Fairness Policy verfügbar
  • Der Iterator ist weakly consistent (Definition s. o.).
  • Zeitkomplexität von size(): O(n) – es wird über die komplette Queue iteriert, um die Größe zu bestimmen.

Einsatzempfehlung

Die LinkedTransferQueue wird im JDK nicht verwendet. Ursprünglich wurde sie für das im JDK 7 eingeführte Fork/Join-Framework implementiert, dann aber noch nicht dafür genutzt. Die Wahrscheinlichkeit von Bugs ist daher recht hoch, so dass auf den Einsatz dieser Klasse verzichtet werden sollte.

LinkedTransferQueue-Beispiel

Im folgenden Beispiel (→ Code in GitHub) werden fünf Threads gestartet, die LinkedTransferQueue.transfer() aufrufen. Danach werden solange Elemente aus der Queue entnommen, bis diese wieder leer ist.

public class LinkedTransferQueueExample {
  public static void main(String[] args) {
    LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();

    // Start 5 threads calling queue.transfer()
    for (int i = 0; i < 5; i++) {
      final int finalI = i;
      new Thread(() -> {
        log("Calling queue.transfer(%d) (queue = %s)...", finalI, queue);
        try {
          queue.transfer(finalI);
          log("queue.transfer(%d) returned (queue = %s)", finalI, queue);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }).start();

      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    log("queue = " + queue);

    // Now take all elements until the queue is empty
    while (!queue.isEmpty()) {
      log("    Calling queue.take() (queue = %s)...", queue);
      try {
        Integer e = queue.take();
        log("    queue.take() returned %d (queue = %s)", e, queue);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(Locale.US, "[%-8s] %s%n",
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}

In der Ausgabe des Beispiels ist gut zu erkennen, wie die queue.transfer()-Aufrufe zunächst blockieren und immer dann ein Aufruf zurückkehrt, wenn queue.take() den von transfer() übergebenen Wert entnommen hat:

[Thread-0] Calling queue.transfer(0) (queue = [])...
[Thread-1] Calling queue.transfer(1) (queue = [0])...
[Thread-2] Calling queue.transfer(2) (queue = [0, 1])...
[Thread-3] Calling queue.transfer(3) (queue = [0, 1, 2])...
[Thread-4] Calling queue.transfer(4) (queue = [0, 1, 2, 3])...
[main    ] queue = [0, 1, 2, 3, 4]
[main    ]     Calling queue.take() (queue = [0, 1, 2, 3, 4])...
[main    ]     queue.take() returned 0 (queue = [1, 2, 3, 4])
[Thread-0] queue.transfer(0) returned (queue = [1, 2, 3, 4])
[main    ]     Calling queue.take() (queue = [1, 2, 3, 4])...
[main    ]     queue.take() returned 1 (queue = [2, 3, 4])
[Thread-1] queue.transfer(1) returned (queue = [2, 3, 4])
[main    ]     Calling queue.take() (queue = [2, 3, 4])...
[main    ]     queue.take() returned 2 (queue = [3, 4])
[Thread-2] queue.transfer(2) returned (queue = [3, 4])
[main    ]     Calling queue.take() (queue = [3, 4])...
[main    ]     queue.take() returned 3 (queue = [4])
[Thread-3] queue.transfer(3) returned (queue = [4])
[main    ]     Calling queue.take() (queue = [4])...
[main    ]     queue.take() returned 4 (queue = [])
[Thread-4] queue.transfer(4) returned (queue = [])

SynchronousQueue

Kommen wir zur letzten blockierenden Spezialqueue, der java.util.concurrent.SynchronousQueue (→ Javadoc). “Synchronous” ist dabei nicht mit „synchronized“ zu verwechseln. Es bedeutet vielmehr, dass jede Enqueue-Operation auf eine korrespondierende Dequeue-Operation warten muss und jede Dequeue-Operation auf eine Enqueue-Operation.

Eine SynchronousQueue enthält niemals Elemente, auch dann nicht, wenn Enqueue-Operationen gerade auf Dequeue-Operation warten. Analog dazu ist die Größe einer SynchronousQueue immer 0, und peek() liefert immer null zurück.

Die SynchronousQueue ist neben der ArrayBlockingQueue die zweite Queue-Implementierung, die eine Fairness Policy anbietet. Hierbei gibt es eine Besonderheit: Wenn die Fairness Policy nicht aktiviert ist, werden blockierende Aufrufe laut Dokumentation in unspezifizierter Reihenfolge bedient. Tatsächlich ist es jedoch so, dass diese exakt in umgekehrter Reihenfolge bedient werden (also in LIFO-Reihenfolge), da intern ein Stack verwendet wird.

Die weiteren Eigenschaften der SynchronousQueue:

  • Unbounded
  • basiert auf einer verketteten Liste
  • threadsicher durch Compare-and-swap-Operationen auf VarHandles
  • Der Iterator ist immer leer.
  • Zeitkomplexität von size(): O(1)size() liefert immer 0 zurück.

Einsatzempfehlung

Genau wie die DelayQueue und die LinkedTransferQueue habe ich auch die SynchronousQueue in eigenen Projekten noch nie direkt eingesetzt.

Sollten ihre Eigenschaften zu deinen Anforderungen passen, kannst du sie bedenkenlos verwenden. Im JDK wird die SynchronousQueue von Executors.newCachedThreadPool() als „work queue“ für den Executor eingesetzt; die Wahrscheinlichkeit von Bugs ist also äußerst gering.

SynchronousQueue-Beispiel

Im folgende Beispiel (→ Code in GitHub) werden zunächst drei Threads gestartet, die queue.put() aufrufen, danach sechs Threads, die queue.take() aufrufen und anschließend noch einmal drei Threads, die queue.put() ausführen:

public class SynchronousQueueExample {
  private static boolean FAIR = false;

  public static void main(String[] args) {
    BlockingQueue<Integer> queue = new SynchronousQueue<>(FAIR);

    // Start 3 producing threads
    for (int i = 0; i < 3; i++) {
      final int finalI = i;
      new Thread(() -> enqueue(queue, finalI)).start();
      sleep(250);
    }

    // Start 6 consuming threads
    for (int i = 0; i < 6; i++) {
      new Thread(() -> dequeue(queue)).start();
      sleep(250);
    }

    // Start 3 more producing threads
    for (int i = 3; i < 6; i++) {
      final int finalI = i;
      new Thread(() -> enqueue(queue, finalI)).start();
      sleep(250);
    }
  }

  private static void enqueue(BlockingQueue<Integer> queue, int finalI) {
    log("Calling queue.put(%d) (queue = %s)...", finalI, queue);
    try {
      queue.put(finalI);
      log("queue.put(%d) returned (queue = %s)", finalI, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void dequeue(BlockingQueue<Integer> queue) {
    log("    Calling queue.take() (queue = %s)...", queue);
    try {
      Integer e = queue.take();
      log("    queue.take() returned %d (queue = %s)", e, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(Locale.US, "[%-9s] %s%n",
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}

In der Ausgabe kannst du sehen, wie die ersten drei Aufrufe von queue.put() solange blockieren, bis die eingefügten Elemente mit queue.take() in umgekehrter Reihenfolge entnommen werden.

Danach blockieren die drei folgenden Aufrufe von queue.take() solange bis mit queue.put() drei weitere Elemente in die Queue geschrieben wurden.

Die Queue bleibt während der gesamten Zeit leer.

[Thread-0 ] Calling queue.put(0) (queue = [])...
[Thread-1 ] Calling queue.put(1) (queue = [])...
[Thread-2 ] Calling queue.put(2) (queue = [])...
[Thread-3 ]     Calling queue.take() (queue = [])...
[Thread-3 ]     queue.take() returned 2 (queue = [])
[Thread-2 ] queue.put(2) returned (queue = [])
[Thread-4 ]     Calling queue.take() (queue = [])...
[Thread-4 ]     queue.take() returned 1 (queue = [])
[Thread-1 ] queue.put(1) returned (queue = [])
[Thread-5 ]     Calling queue.take() (queue = [])...
[Thread-5 ]     queue.take() returned 0 (queue = [])
[Thread-0 ] queue.put(0) returned (queue = [])
[Thread-6 ]     Calling queue.take() (queue = [])...
[Thread-7 ]     Calling queue.take() (queue = [])...
[Thread-8 ]     Calling queue.take() (queue = [])...
[Thread-9 ] Calling queue.put(3) (queue = [])...
[Thread-9 ] queue.put(3) returned (queue = [])
[Thread-8 ]     queue.take() returned 3 (queue = [])
[Thread-10] Calling queue.put(4) (queue = [])...
[Thread-10] queue.put(4) returned (queue = [])
[Thread-7 ]     queue.take() returned 4 (queue = [])
[Thread-11] Calling queue.put(5) (queue = [])...
[Thread-11] queue.put(5) returned (queue = [])
[Thread-6 ]     queue.take() returned 5 (queue = [])

Wenn du die Konstante FAIR auf true setzt, wirst du sehen, wie die Elemente nicht in LIFO-, sondern in FIFO-Reihenfolge entnommen werden.

Optimierte MPMC-, MPSC-, SPMC- und SPSC-Queues

Alle vom JDK angebotenen threadsicheren Queue- und Deque-Implementierungen können bedenkenlos in multi-producer-multi-consumer-Umgebungen eingesetzt werden. Das bedeutet, dass ein oder mehrere schreibende Threads sowie ein oder mehrere lesende Threads nebenläufig auf die JDK-Queues und -Deques zugreifen können.

Mit speziellen Mechanismen ist es möglich Queues und Deques so zu optimieren, dass der Overhead für das Gewährleisten der Threadsicherheit minimiert wird, wenn es eine Beschränkung auf einen lesenden und/oder einen schreibenden Thread gibt.

Dementsprechend unterscheidet man folgende vier Fälle:

  • Multi-producer-multi-consumer (MPMC)
  • Multi-producer-single-consumer (MPSC)
  • Single-producer-multi-consumer (SPMC)
  • Single-producer-single-consumer (SPSC)

Die Open-Source-Bibliothek JCTools bietet für alle vier Fälle hoch-optimierte Queue-Implementierungen.

Zusammenfassung

In diesem Ultimate Guide habe ich die grundlegenden Funktionen von Queues, Deques und Stacks erklärt, die entsprechenden Interfaces und Implementierungen im JDK anhand zahlreicher Code-Beispiele im Detail vorgestellt und Empfehlungen ausgesprochen, in welchen Fällen du zu welchen Implementierungen greifen solltest.

Für den tagtäglichen Gebrauch sind das:

  • ArrayDeque für single-threaded Anwendungen.
  • ConcurrentLinkedQueue und ConcurrentLinkedDeque als threadsichere, nicht blockierende und unbounded Queues/Deques.
  • ArrayBlockingQueue als threadsichere, blockierende, bounded Queue, sofern du wenig Contention zwischen Producer- und Consumer-Threads erwartest.
  • LinkedBlockingQueue als threadsichere, blockierende, bounded Queues, wenn du eher hohe Contention zwischen Producer- und Consumer-Threads erwartest (am besten testen, welche Implementierung für deinen Use Case performanter ist).
  • LinkedBlockingDeque als threadsicheres, blockierendes, bounded Deque.
  • … oder die optimierten Queues von JCTools.

Wenn du den Artikel hilfreich findest, teile ihn gerne über einen der Share-Buttons etwas weiter unten. Wenn du informiert werden möchtest, wenn ich neue Artikel veröffentliche, dann trage dich gerne über das folgende Formular in meinen Mail-Verteiler ein.

  •  
  •  
  •  
  •  
  •  
  •  

Über den Autor

Ich bin freiberuflicher Softwareentwickler mit über 20 Jahren Erfahrung in skalierbaren Java-Enterprise-Anwendungen. Mein Schwerpunkt liegt auf der Optimierung komplexer Algorithmen und auf fortgeschrittenen Themen wie Concurrency, dem Java Memory Model und Garbage Collection. Hier auf HappyCoders.eu möchte ich dir helfen, ein besserer Java-Programmierer zu werden. Lies mehr über mich hier.

Die folgenden Artikel könnten dir auch gefallen
Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Pflichtfelder sind markiert.

{"email":"Email address invalid","url":"Website address invalid","required":"Required field missing"}