In diesem Artikel erfährst du wie die PriorityBlockingQueue
funktioniert und welche Eigenschaften sie hat. Anhand eines Beispiels siehst du, wie man sie einsetzt.
Hier befinden wir uns in der Klassenhierarchie:
PriorityBlockingQueue Eigenschaften
Bei der java.util.concurrent.PriorityBlockingQueue
handelt es sich um eine threadsichere und blockierende Variante der PriorityQueue. In dem verlinkten Artikel erfährst du auch, was eine Priotity Queue ist.
Wie bei der PriorityQueue
werden die Elemente in einem Array gespeichert, das einen Min-Heap repräsentiert; der Iterator durchläuft die Elemente in entsprechender Reihenfolge.
Threadsicherheit wird durch ein einzelnes ReentrantLock
sichergestellt.
Die PriorityBlockingQueue
ist nicht bounded, sie hat also keine Kapazitätsgrenze. Das bedeutet, dass
und put(e)
offer(e, time, unit)
niemals blockieren. Nur die Dequeue-Operationen take()
und poll(time, unit)
blockieren, wenn die Queue leer ist.
Die Eigenschaften im Detail:
Unterliegende Datenstruktur | Thread-safe? | Blocking/ Non-blocking | Fairness Policy | Bounded/ Unbounded | Iterator Type |
---|---|---|---|---|---|
Min-Heap (gespeichert in einem Array) | Ja (pessimistisches Locking mit einem Lock) | Blocking (nur dequeue) | Nicht verfügbar | Unbounded | Weakly consistent¹ |
¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.
Einsatzempfehlung
Die PriorityBlockingQueue
wird im JDK nicht genutzt. Es ist daher nicht auszuschließen, dass sie Bugs enthält. Wenn du eine Queue mit entsprechenden Eigenschaften benötigst und die PriorityBlockingQueue
verwendest, solltest du deine Anwendung intensiv testen.
PriorityBlockingQueue Beispiel
Das folgenden Beispiel zeigt, wie eine PriorityBlockingQueue
angelegt wird und wie mehrere Threads lesend und schreibend darauf zugreifen (→ Code auf GitHub).
Lesende Threads starten alle 3 Sekunden, beginnend sofort nach dem Erstellen der Queue.
Schreibende Threads starten nach 3,5 Sekunden (so dass bereits zwei lesende Threads warten) und schreiben sekündlich einen Zufallswert in die Queue.
public class PriorityBlockingQueueExample {
private static final long startTime = System.currentTimeMillis();
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
// Start reading from the queue immediately, every 3 seconds
for (int i = 0; i < 8; i++) {
int delaySeconds = i * 3;
pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
}
// Start writing to the queue after 3.5 seconds (so there are already 2 threads
// waiting), every 1 seconds (so that the queue fills faster than it's emptied,
// so that we see some more elements and their order in the queue)
for (int i = 0; i < 8; i++) {
int delayMillis = 3500 + i * 1000;
pool.schedule(() -> enqueue(queue), delayMillis, TimeUnit.MILLISECONDS);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
private static void enqueue(BlockingQueue<Integer> queue) {
int element = ThreadLocalRandom.current().nextInt(10, 100);
log("Calling queue.put(%d) (queue = %s)...", element, queue);
try {
queue.put(element);
log("queue.put(%d) returned (queue = %s)", element, queue);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private static void dequeue(BlockingQueue<Integer> queue) {
log(" Calling queue.take() (queue = %s)...", queue);
try {
Integer element = queue.take();
log(" queue.take() returned %d (queue = %s)", element, queue);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void log(String format, Object... args) {
System.out.printf(
Locale.US,
"[%4.1fs] [%-16s] %s%n",
(System.currentTimeMillis() - startTime) / 1000.0,
Thread.currentThread().getName(),
String.format(format, args));
}
}
Code-Sprache: Java (java)
Im folgenden siehst du eine beispielhafte Ausgabe des Programms:
[ 0.0s] [pool-1-thread-1 ] Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-6 ] Calling queue.put(87) (queue = [])...
[ 3.5s] [pool-1-thread-6 ] queue.put(87) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ] queue.take() returned 87 (queue = [])
[ 4.5s] [pool-1-thread-9 ] Calling queue.put(89) (queue = [])...
[ 4.5s] [pool-1-thread-9 ] queue.put(89) returned (queue = [])
[ 4.5s] [pool-1-thread-2 ] queue.take() returned 89 (queue = [])
[ 5.5s] [pool-1-thread-7 ] Calling queue.put(31) (queue = [])...
[ 5.5s] [pool-1-thread-7 ] queue.put(31) returned (queue = [31])
[ 6.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [31])...
[ 6.0s] [pool-1-thread-4 ] queue.take() returned 31 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(71) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(71) returned (queue = [71])
[ 7.5s] [pool-1-thread-8 ] Calling queue.put(15) (queue = [71])...
[ 7.5s] [pool-1-thread-8 ] queue.put(15) returned (queue = [15, 71])
[ 8.5s] [pool-1-thread-10] Calling queue.put(33) (queue = [15, 71])...
[ 8.5s] [pool-1-thread-10] queue.put(33) returned (queue = [15, 71, 33])
[ 9.0s] [pool-1-thread-3 ] Calling queue.take() (queue = [15, 71, 33])...
[ 9.0s] [pool-1-thread-3 ] queue.take() returned 15 (queue = [33, 71])
[ 9.5s] [pool-1-thread-6 ] Calling queue.put(58) (queue = [33, 71])...
[ 9.5s] [pool-1-thread-6 ] queue.put(58) returned (queue = [33, 71, 58])
[10.5s] [pool-1-thread-1 ] Calling queue.put(19) (queue = [33, 71, 58])...
[10.5s] [pool-1-thread-1 ] queue.put(19) returned (queue = [19, 33, 58, 71])
[12.0s] [pool-1-thread-9 ] Calling queue.take() (queue = [19, 33, 58, 71])...
[12.0s] [pool-1-thread-9 ] queue.take() returned 19 (queue = [33, 71, 58])
[15.0s] [pool-1-thread-2 ] Calling queue.take() (queue = [33, 71, 58])...
[15.0s] [pool-1-thread-2 ] queue.take() returned 33 (queue = [58, 71])
[18.0s] [pool-1-thread-7 ] Calling queue.take() (queue = [58, 71])...
[18.0s] [pool-1-thread-7 ] queue.take() returned 58 (queue = [71])
[21.0s] [pool-1-thread-4 ] Calling queue.take() (queue = [71])...
[21.0s] [pool-1-thread-4 ] queue.take() returned 71 (queue = [])
Code-Sprache: Klartext (plaintext)
Was kann man in dieser Beispielausgabe erkennen?
Zunächst einmal siehst du, wie nach 0,0 s und 3,0 s die Threads 1 und 2 beim Aufruf von take()
blockieren, da die Queue leer ist.
Nach 3,5 s schreibt Thread 6 die 87 in die Queue. Sofort im Anschluss wacht der zuvor blockierte Thread 1 auf und entnimmt die 87 wieder.
Nach 4,5 s schreibt Thread 9 die 89 in die Queue, die sofort von Thread 2 wieder entnommen wird.
Nach 5,5 s wird die 31 in die Queue geschrieben, die nach 6,0 s wieder entnommen wird.
Nach 6,5 s, 7,5 s und 8,5 s werden die 71, die 15 und die 33 in die Queue geschrieben. Du siehst, wie jeweils das kleinste Element vorne (links) in der Queue steht.
Nach 9,0 s wird das kleinste Elemente, die 15, entnommen. Daraufhin steht das nächstkleinere Element, die 33 am Kopf der Queue.
Nach 9,5 s und 10,5 s werden zwei weitere Elemente, 58 und 19, in die Queue geschrieben. Du siehst wieder gut, wie jeweils das kleinste Element am Kopf der Queue steht.
Die Queue enthält nun vier Elemente. Es werden keine weiteren Elemente in die Queue geschrieben und die existierenden Elemente entsprechend ihrer Priorität entnommen.
Zusammenfassung und Ausblick
In diesem Artikel hast du erfahren, welche Eigenschaften die PriorityBlockingQueue
hat und wie diese eingesetzt wird.
Ab dem nächsten Teil der Tutorial-Serie stelle ich dir einige Queue-Implementierungen für Sonderfälle vor, beginnend mit der DelayQueue.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.