

Structured Concurrency wurde – zusammen mit virtuellen Threads und Scoped Values – in Project Loom entwickelt. Structured Concurrency ist seit Java 19 als Incubator-Feature und seit Java 21 als Preview-Feature im JDK enthalten.
In diesem Artikel erfährst du:
- Warum benötigen wir Structured Concurrency?
- Was ist Structured Concurrency?
- Wie wird
StructuredTaskScope
verwendet? - Was ist eine Policy? Welche Policies gibt es, und wie können wir selbst eine Policy schreiben?
- Was ist der Vorteil von Structured Concurrency?
Eine begleitende Demo-Anwendung (mit Java-21- als auch Java-25-Code) findest du in diesem GitHub-Repository.
Schauen wir uns zuerst einmal an, wie wir nebenläufige Teilaufgaben bisher implementiert haben.
Warum benötigen wir Structured Concurrency?
Wenn eine Aufgabe aus verschiedenen – vor allem blockierenden – Teilaufgaben besteht, die nebenläufig erledigt werden können (z. B. Zugriff auf Daten aus einer Datenbank oder Aufruf einer Remote API), so konnten wir hierfür bisher das Java-Executor-Framework einsetzen.
Das könnte dann z. B. so aussehen (Klasse InvoiceGenerator3_ThreadPool in der Demo-Anwendung):
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
Future<Order> orderFuture =
executor.submit(() -> orderService.getOrder(orderId));
Future<Customer> customerFuture =
executor.submit(() -> customerService.getCustomer(customerId));
Future<InvoiceTemplate> invoiceTemplateFuture =
executor.submit(() -> invoiceTemplateService.getTemplate(language));
Order order = orderFuture.get();
Customer customer = customerFuture.get();
InvoiceTemplate invoiceTemplate = invoiceTemplateFuture.get();
return Invoice.generate(order, customer, invoiceTemplate);
}
Code-Sprache: Java (java)
Wir übergeben die drei Teilaufgaben an den Executor
und warten auf die Teilergebnisse. Der Happy Path ist schnell implementiert. Aber wie behandeln wir Ausnahmen?
- Wenn in einem Subtask ein Fehler auftritt – wie können wir dann die anderen Subtasks abbrechen? Wenn im Beispiel oben
loadOrderFromOrderService(...)
fehlschlägt, dann wirftorderFuture.get()
eine Exception, diecreateInvoice(...)
-Methode endet, und wir haben evtl. zwei noch weiterlaufende Threads. - Wie können wir die Subtasks abbrechen, wenn der Parent Task („Erstelle eine Rechung”) abgebrochen wird – oder wenn die komplette Anwendung heruntergefahren wird?
- Wie können wir – in einem alternativen Use Case – verbleibende Subtasks abbrechen, wenn lediglich das Ergebnis eines einzigen Subtasks benötigt wird?
Alles ist machbar, erfordert aber äußerst komplexen, schwer wartbaren Code (im GitHub-Repository findest du zwei Beispiele dafür: InvoiceGenerator2b_CompletableFutureCancelling und InvoiceGenerator4b_NewVirtualThreadPerTaskCancelling).
Und was, wenn wir Code dieser Art debuggen möchten? Ein Thread-Dump z. B. würde uns haufenweise Threads mit dem Namen „pool-X-thread-Y” liefern – wir wüssten aber nicht, welcher Pool-Thread zu welchem aufrufenden Thread gehört, da sich alle aufrufenden Threads den Thread-Pool des Executors teilen.
Was ist Unstructured Concurrency?
„Unstructured Concurrency” bedeutet, dass unsere Tasks in einem Netz von Threads ablaufen, deren Start und Ende im Code schwer erkennbar ist. Eine saubere Fehlerbehandlung ist meist nicht vorhanden, und oft kommt es zu verwaisten Threads, wenn eine Kontrollstruktur (im Beispiel oben: die createInvoice(...)
-Methode) endet:

Was ist Structured Concurrency?
Die in Java 19 als Incubator und Java 21 als Preview eingeführte – und in Java 25 noch einmal überarbeitete – Structured Concurrency ist ein Konzept, das die Implementierung, Lesbarkeit und Wartbarkeit von Code für die Aufteilung einer Aufgabe in Teilaufgaben und deren nebenläufige Abarbeitung erheblich verbessert.
Dazu führt sie mit der Klasse StructuredTaskScope
eine Kontrollstruktur ein, die
- einen klaren Scope definiert, an dessen Anfang die Threads der Teilaufgaben starten und an dessen Ende die Threads der Teilaufgaben enden,
- die eine saubere Fehlerbehandlung ermöglicht
- und die einen sauberen Abbruch von Teilaufgaben erlaubt, deren Ergebnisse nicht mehr benötigten werden.
Was das genau bedeutet, zeige ich dir in den folgenden Abschnitten an mehreren Beispielen.
StructuredTaskScope Beispiel
Structured Concurrency wird mit der Klasse StructuredTaskScope
implementiert. Mit dieser Klasse können wir das Beispiel wie folgt umschreiben.
StructuredTaskScope Beispiel – Java 21–24
Klasse InvoiceGenerator5_StructuredTaskScope.java im java-21
-Branch der Demo-Anwendung:
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException {
try (var scope = new StructuredTaskScope<>()) {
Subtask<Order> orderSubtask =
scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask =
scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
scope.join();
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
return Invoice.generate(order, customer, template);
}
}
Code-Sprache: Java (java)
Eine gemeinsame Erklärung für alle Java-Versionen folgt unter dem Java-25-Beispiel.
StructuredTaskScope Beispiel – Java 25
Klasse InvoiceGenerator5_StructuredTaskScope.java im main
-Branch der Demo-Anwendung:
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
Subtask<Order> orderSubtask =
scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask =
scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
scope.join();
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
return Invoice.generate(order, customer, template);
}
}
Code-Sprache: Java (java)
In diesem einfachen Beispiel unterscheidet sich lediglich die Art, wie der StructuredTaskScope
geöffnet wird: vor Java 25 mit new StructuredTaskScope<>()
und ab Java 25 mit StructuredTaskScope.open()
.
Erläuterungen für alle Java-Versionen
Verglichen mit dem Unstructured-Concurrency-Beispiel ersetzen wir den im Scope der Klasse liegenden ExecutorService
durch einen im Scope der Methode liegenden StructuredTaskScope
– und executor.submit()
durch scope.fork()
.
Mit scope.join()
warten wir darauf, dass alle Tasks erledigt sind. Das Risiko verwaister Tasks besteht damit nicht mehr.
Danach können wir über Subtask.get()
die Ergebnisse der drei Tasks auslesen.
StructuredTaskScope Fehlerbehandlung – Java 21–24
Sollte es in einem der Tasks zu einer Exception gekommen sein, wirft Subtask.get()
eine IllegalStateException
. Besser ist es daher mit state()
den Status eines Subtasks abzufragen, bevor wir get()
aufrufen:
Order order;
if (orderSubtask.state() == Subtask.State.SUCCESS) {
order = orderSubtask.get();
} else {
// Handle error
}
Code-Sprache: Java (java)
StructuredTaskScope Fehlerbehandlung – Java 25
Wenn es in Java 25 zu einer Exception in einem der Tasks kommt, wird diese – gewrappt in eine StructuredTaskScope.FailedException
– bereits von scope.join()
geworfen. Eine Prüfung des Subtask-Status nach dem Aufruf von scope.join()
ist daher nicht erforderlich.
Den Beispielcode ausführen
Falls du das Beispiel selbst ausprobieren möchtest: Preview-Features müssen explizit freigeschaltet werden, in diesem Fall mit --enable-preview --source <verwendete Java-Version>
. Eine genaue Anleitung findest du in der README der Demo-Anwendung.
StructuredTaskScope Policies
Eine sogenannte Policy definiert, was passiert, wenn ein Subtask beendet wird oder eine Exception wirft. Außerdem kann eine Policy einen Rückgabewert für scope.join()
definieren – dazu später mehr.
Policies in Java 21–24
Vor Java 25 hatte ein durch new StructuredTaskScope()
geöffneter Scope die Policy, darauf zu warten, dass alle Subtasks erfolgreich oder mit einer Exception beendet wurden.
Wenn aber in einem der Tasks in unserem Beispiel eine Exception auftritt, können wir mit den Ergebnissen der anderen zwei Tasks nichts anfangen – warum also auf sie warten?
Java 21–24: „Shutdown on Failure”-Policy
Mit der „Shutdown on Failure”-Policy können wir in Java 21–24 festlegen, dass das Auftreten einer Exception in einem Task dazu führt, dass alle anderen Tasks abgebrochen werden.
Die „Shutdown on Failure”-Policy können wir wie folgt einsetzen (du findest den Code in der Klasse InvoiceGenerator6_ShutdownOnFailure im java-21
-Branch des GitHub-Repos):
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<Order> orderSubtask =
scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask =
scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
scope.join();
scope.throwIfFailed();
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
return Invoice.generate(order, customer, template);
}
}
Code-Sprache: Java (java)
Verglichen mit dem vorherigen Beispiel habe ich zwei Dinge geändert:
- Ich habe in der dritten Zeile
new StructuredTaskScope<>()
durchnew StructuredTaskScope.ShutdownOnFailure()
ersetzt. - Ich habe nach
scope.join()
das Kommandoscope.throwIfFailed()
eingefügt.
Sollte nun in einem der drei Tasks eine Exception auftreten, werden alle anderen Subtasks sofort abgebrochen, scope.join()
kehrt zurück und scope.throwIfFailed()
wirft die Exception des fehlgeschlagenen Subtasks, eingebettet in eine ExecutionException
.
Im Beispielcode werfen die drei Subtasks mit einer gewissen Wahrscheinlichkeit eine Exception. Wenn du das Programm ein paar mal startest, wirst du sehen, wie eine Exception in einem Task zu einer Interruption in den anderen Tasks und einer Beendigung des Programms führt:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo1_invoice/InvoiceGenerator6_ShutdownOnFailure
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for all tasks to finish or one to fail
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Loading template
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-1] Finished loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Error loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Template loading was interrupted
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error loading order
[...]
Code-Sprache: Klartext (plaintext)
An dieser Ausgabe siehst du übrigens auch, dass alle Tasks in virtuellen Threads ausgeführt werden.
Policies in Java 25
Ein in Java 25 mit StructuredTaskScope.open()
geöffneter Scope hat die Policy, beim Auftreten einer Exception in einem der Subtasks umgehend alle anderen Subtasks abzubrechen und über scope.join()
eine FailedException
zu werfen mit der im Subtask aufgetreteten Exception als „Cause“.
Also „Shutdown on Failure” by default.
Ein Aufruf von scope.throwIfFailed()
wie in Java 21–24 ist nicht erforderlich – die throwIfFailed()
-Methode existiert in Java 25 nicht mehr.
In Java 25 sieht die Programmausgabe beim Auftreten eines Fehlers beispielsweise wie folgt aus:
[Thread[#3,main,5,main]] Forking tasks
[VirtualThread[#37]/runnable@ForkJoinPool-1-worker-1] Loading order
[VirtualThread[#39]/runnable@ForkJoinPool-1-worker-4] Loading customer
[Thread[#3,main,5,main]] Waiting for all tasks to finish
[VirtualThread[#44]/runnable@ForkJoinPool-1-worker-3] Loading template
[VirtualThread[#39]/runnable@ForkJoinPool-1-worker-3] Error loading customer
[VirtualThread[#37]/runnable@ForkJoinPool-1-worker-1] Order loading was interrupted
[VirtualThread[#44]/runnable@ForkJoinPool-1-worker-1] Template loading was interrupted
Exception in thread "main" java.util.concurrent.StructuredTaskScope$FailedException: java.lang.RuntimeException: Error loading customer
Code-Sprache: Klartext (plaintext)
Du siehst: Statt einer generischen ExecutionException
wie in Java 21–24 erhalten wir ab Java 25 eine StructuredTaskScope
-spezifische FailedException
.
Java 21–24: „Shutdown on Success”-Policy
Eine alternative Policy ist „Shutdown on Success”. Bei dieser Policy wird der Scope beendet, sobald ein Subtask erfolgreich war. Die anderen Subtasks werden dann abgebrochen.
In Java 21–24 erzeugst du einen Scope mit der „Shutdown on Success”-Policy durch new StructuredTaskScope.ShutdownOnSuccess()
. Das Ergebnis des einen erfolgreichen Subtasks kannst du mit scope.result()
auslesen.
Hier ein Beispiel dazu – mit einem anderen Use Case: Wir wollen eine Kundenadresse über mehrere externe APIs gleichzeitig verifizieren und wollen nur das erste Ergebnis verwenden (du findest den Code in der Klasse AddressVerification2_ShutdownOnSuccess im java-21
-Branch des GitHub-Repos):
AddressVerificationResponse verifyAddress(Address address)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<AddressVerificationResponse>()) {
scope.fork(() -> verificationService.verifyViaServiceA(address));
scope.fork(() -> verificationService.verifyViaServiceB(address));
scope.fork(() -> verificationService.verifyViaServiceC(address));
scope.join();
return scope.result();
}
}
Code-Sprache: Java (java)
Hier wartet scope.join()
darauf, dass der erste Subtask erfolgreich beendet wird – danach liefert scope.result()
dessen Ergebnis zurück. Sollten wider Erwarten alle drei Aufrufe der verifyViaServiceX()
-Methoden eine Exception geworfen haben, wirft scope.result()
die erste davon, eingebettet in eine ExecutionException
.
Wenn du den Beispielcode ausführst, siehst du, wie der erste erfolgreiche Subtask zu einem Ergebnis führt und die anderen Tasks abgebrochen werden:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo2_address/AddressVerification2_ShutdownOnSuccess
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for one task to finish
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Verifying address via service B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Verifying address via service A
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Verifying address via service C
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Finished loading address via service C
[Thread[#1,main,5,main]] Retrieving result
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Verifying address via service B was interrupted
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Verifying address via service A was interrupted
Code-Sprache: Klartext (plaintext)
Beachte, dass die result()
-Methode nur bei ShutdownOnSuccess
verfügbar ist und die throwIfFailed()
-Methode nur bei ShutdownOnFailure
.
Java 25: „Any Successful Result”-Policy
In Java 25 heißt die entsprechende Policy „Any Successful Result”. Auch bei dieser wird der Scope beendet, sobald der erste Subtask erfolgreich war. Die anderen Subtasks werden abgebrochen.
In Java 25 wird eine Policy nicht mehr durch Ableiten der StructuredTaskScope
-Klasse implementiert, wie es bei Java 21–24 der Fall war. Policies werden ab Java 25 durch sogenannte Joiner implementiert.
Ein Joiner für die „Any Successful Result”-Policy wird über die statische Methode Joiner.anySuccessfulResultOrThrow()
erzeugt.
Der folgende Code zeigt, wie das Adress-Überprüfungs-Beispiel mit Java 25 implementiert wird (Klasse AddressVerification2_AnySuccessfulResult im main
-Branch des GitHub-Repos):
AddressVerificationResponse verifyAddress(Address address) throws InterruptedException {
try (var scope =
StructuredTaskScope.open(
Joiner.<AddressVerificationResponse>anySuccessfulResultOrThrow())) {
log("Forking tasks");
scope.fork(() -> verificationService.verifyViaServiceA(address));
scope.fork(() -> verificationService.verifyViaServiceB(address));
scope.fork(() -> verificationService.verifyViaServiceC(address));
log("Waiting for one task to finish");
return scope.join();
}
}
Code-Sprache: Java (java)
Änderungen gegenüber Java 21–24:
- Der Scope wird mit
StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow()))
geöffnet – nicht mehr mitnew StructuredTaskScope.ShutdownOnSuccess()
. - Die
scope.result()
-Methode fällt weg – stattdessen liefertscope.join()
das Ergebnis zurück. - Sollten alle drei Subtasks fehlschlagen, wirft
scope.join()
eineFailedException
statt einerExecutionException
.
Java 25: Alle Policies
In Java 21–24 gibt es nur die oben gezeigten Policies „Shutdown on Failure” und „Shutdown on Success”.
In Java 25 stehen deutlich mehr Policies zur Auswahl:
Joiner | Beschreibung |
---|---|
ohne Joiner oder Joiner.awaitAllSuccessfulOrThrow() | Eine Exception in einem Subtask führt umgehend zu einem Abbruch des Scopes; scope.join() wirft die Exception gewrappt in eine FailedException .Wenn alle Subtasks erfolgreich abgeschlossen sind, endet scope.join() ohne Exception und gibt null zurück. Die Ergebnisse der Subtasks müssen aus den von scope.fork() zurückgegebenen Subtask -Objekten ausgelesen werden.Entspricht in Java 21–24 der „Shutdown on Failure”-Policy. |
Joiner.awaitAll() | scope.join() wartet darauf, dass alle Subtasks beendet sind – egal ob erfolgreich oder nicht. scope.join() gibt in jedem Fall null zurück; die Ergebnisse der Subtasks müssen aus den von scope.fork() zurückgegebenen Subtask -Objekten ausgelesen werden.Entspricht in Java 21–24 dem Standardverhalten. |
Joiner .anySuccessfulResultOrThrow() | scope.join() liefert das Ergebnis des ersten erfolgreichen Subtasks; andere Subtasks werden abgebrochen. Schlagen alle Subtasks fehl, wirft scope.join() die Exception des ersten fehlgeschlagenen Subtasks gewrappt in eine FailedException .Entspricht in Java 21–24 der „Shutdown on Success”-Policy. |
Joiner.allSuccessfulOrThrow() | Entspricht der grundsätzlichen Funktionsweise ohne Joiner bzw. mit Joiner.awaitAllSuccessfulOrThrow() – mit dem Unterschied, dass scope.join() bei Erfolg einen Stream aller Subtasks zurückliefert.Eine Exception in einem Subtask führt umgehend zu einem Abbruch des Scopes; scope.join() wirft die Exception gewrappt in eine FailedException .Wenn alle Subtasks erfolgreich abgeschlossen sind, endet scope.join() ohne Exception und gibt einen Stream aller Subtasks zurück. |
Joiner.allUntil(Predicate isDone) | scope.join() wartet darauf, dass entweder alle Subtasks beendet sind – egal ob erfolgreich oder nicht – oder dass das übergebene Predicate auf wenigstens einen beendeten Subtask matcht. Liefert wie Joiner.allSuccessfulOrThrow() einen Stream der Subtasks zurück. |
Benutzerdefinierte StructuredTaskScope-Policy
Sollte keine der vordefinierten Policies für deinen Einsatzzweck geeignet sein, kannst du mit relativ geringem Aufwand eine eigene Policy schreiben.
Nehmen wir an, wir wollen die Verfügbarkeit eines Produkts bei mehreren Lieferanten prüfen, und wir wollen nicht das erste Ergebnis verwenden, sondern das mit der schnellsten Verfügbarkeit. Gleichzeitig wollen wir fehlgeschlagene Anfragen nur dann propagieren, wenn die Anfragen bei allen Lieferanten fehlgeschlagen sind.
Das lässt sich überraschend einfach – und zugleich für andere Einsatzszenarien wiederverwendbar – realisieren.
Benutzerdefinierte Policy in Java 21–24
Hier zunächst die Policy für Java 21–24 (Klasse BestResultScope im java-21
-Branch des GitHub-Repos). Diese nimmt als Konstruktor-Parameter einen Comparator
entgegen, über den wir später definieren werden, dass wir als best result – also als bestes Ergebnis – die schnellste Verfügbarkeit erwarten.
BestResultScope
erweitert zudem die Klasse StructuredTaskScope
, überschreibt deren handleComplete(…)
-Methode und fügt eine resultOrElseThrow()
-Methode hinzu:
public class BestResultScope<T> extends StructuredTaskScope<T> {
private final Comparator<T> comparator;
private T bestResult;
private final List<Throwable> exceptions =
Collections.synchronizedList(new ArrayList<>());
public BestResultScope(Comparator<T> comparator) {
this.comparator = comparator;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
switch (subtask.state()) {
case UNAVAILABLE -> {
// Ignore
}
case SUCCESS -> {
T result = subtask.get();
synchronized (this) {
if (bestResult == null || comparator.compare(result, bestResult) > 0) {
bestResult = result;
}
}
}
case FAILED -> exceptions.add(subtask.exception());
}
}
public <X extends Throwable> T resultOrElseThrow(
Supplier<? extends X> exceptionSupplier) throws X {
ensureOwnerAndJoined();
if (bestResult != null) {
return bestResult;
} else {
X exception = exceptionSupplier.get();
exceptions.forEach(exception::addSuppressed);
throw exception;
}
}
}
Code-Sprache: Java (java)
Die handleComplete(…)
-Methode wird für jeden beendeten Subtask aufgerufen – sowohl für erfolgreiche als auch für solche, die eine Exception geworfen haben. Welcher Fall eingetreten ist, prüfen wir mit subtask.state()
.
Im Erfolgsfall holen wir mit subtask.get()
das Resultat und schreiben dieses – sofern es besser ist als das bisher beste – threadsicher in das Feld bestResult
.
Im Fall einer Exception sammeln wir diese threadsicher in einer Liste.
Die resultOrElseThrow()
-Methode stellt zunächst durch den Aufruf von ensureOwnerAndJoined()
sicher, dass sie aus demjenigen Thread aufgerufen wurde, der den StructuredTaskScope
erzeugt hat, und dass dieser Thread zuvor join()
oder joinUntil(…)
aufgerufen hat.
Daraufhin prüft resultOrElseThrow()
, ob ein erfolgreiches Ergebnis vorliegt, und gibt dieses zurück. Andernfalls wirft es die spezifizierte Exception, an die es die gesammelten Exceptions als „suppressed exceptions” anhängt.
Die benutzerdefinierte Policy können wir wie folgt einsetzen (Klasse SupplierDeliveryTimeCheck2_StructuredTaskScope im java-21
-Branch des GitHub-Repos):
SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
throws SupplierDeliveryTimeCheckException, InterruptedException {
try (var scope =
new BestResultScope<>(
Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
for (String supplierId : supplierIds) {
scope.fork(() -> service.getDeliveryTime(productId, supplierId));
}
scope.join();
return scope.result();
}
}
Code-Sprache: Java (java)
Die Ausgabe des Beispielprogramms könnte z. B. so aussehen:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo3_suppliers/SupplierDeliveryTimeCheck2_StructuredTaskScope
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Retrieving delivery time from supplier B
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-4] Retrieving delivery time from supplier D
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier E
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-5] Retrieving delivery time from supplier C
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier A
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-5] Finished retrieving delivery time from supplier A: 110 hours
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier C: 104 hours
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier E
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier D: 51 hours
[Thread[#1,main,5,main]] Response: SupplierDeliveryTime[supplier=D, deliveryTimeHours=51]
Code-Sprache: Klartext (plaintext)
Es ist schön zu sehen, wie zwar der Aufruf für die Lieferanten B und E fehlerhaft war, die restlichen Lieferanten aber Ergebnisse geliefert haben und schließlich das beste Ergebnis – Lieferant D mit 51 Stunden Lieferzeit – zurückgeliefert wird.
Benutzerdefinierte Policy in Java 25
In Java 25 definierten wir eine benutzterdefinierte Policy nicht durch Erweitern von StructuredTaskScope
, sondern durch Implementierung des Joiner
-Interfaces.
Hier der entsprechende Code für den Joiner (Klasse BestResultJoiner im main
-Branch des GitHub-Repos):
public class BestResultJoiner<T> implements Joiner<T, T> {
private final Comparator<T> comparator;
private T bestResult;
private final List<Throwable> exceptions =
Collections.synchronizedList(new ArrayList<>());
public BestResultJoiner(Comparator<T> comparator) {
this.comparator = comparator;
}
@Override
public boolean onComplete(Subtask<? extends T> subtask) {
switch (subtask.state()) {
case UNAVAILABLE -> {
// Ignore
}
case SUCCESS -> {
T result = subtask.get();
synchronized (this) {
if (bestResult == null || comparator.compare(result, bestResult) > 0) {
bestResult = result;
}
}
}
case FAILED -> exceptions.add(subtask.exception());
}
return false; // Don't cancel the scope
}
@Override
public T result() throws SupplierDeliveryTimeCheckException {
if (bestResult != null) {
return bestResult;
} else {
SupplierDeliveryTimeCheckException exception =
new SupplierDeliveryTimeCheckException();
exceptions.forEach(exception::addSuppressed);
throw exception;
}
}
}
Code-Sprache: Java (java)
Die onComplete()
-Methode entspricht größtenteils der vorherigen handleComplete()
-Methode. Der einizge Unterschied: onComplete()
gibt ein boolean
zurück, mit dem die Methode angeben kann, ob der Scope abgebrochen werden soll (true
steht für abbrechen).
Die result()
-Methode entspricht größtenteils der vorherigen resultOrElseThrow()
-Methode, allerdings bekommt die result()
-Methode keinen Exception-Supplier
übergeben und muss selbst entscheiden, welche Exception sie wirft. Möchte man den Exception-Typ variabel gestalten wie im Beispiel für Java 21–24, könnte man den entsprechenden Supplier dem Joiner
im Konstruktor übergeben.
Verschachtelte StructuredTaskScopes
Falls wir nicht nur die Lieferanten für ein Produkt gleichzeitig abfragen möchten, sondern die Lieferanten für mehrere Produkte, so können wir das ganz einfach wie folgt lösen:
Hier zunächst der Code für Java 21–24 (Klasse SupplierDeliveryTimeCheck3_NestedStructuredTaskScope im java-21
-Branch des GitHub-Repos):
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds) throws InterruptedException {
try (var scope = new StructuredTaskScope<SupplierDeliveryTime>()) {
List<Subtask<SupplierDeliveryTime>> subtasks =
productIds.stream()
.map(productId ->
scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)))
.toList();
scope.join();
return subtasks.stream()
.filter(subtask -> subtask.state() == State.SUCCESS)
.map(Subtask::get)
.toList();
}
}
Code-Sprache: Java (java)
Und hier der entsprechende Code für Java 25 (Klasse SupplierDeliveryTimeCheck3_NestedStructuredTaskScope im main
-Branch des GitHub-Repos). Dieses Mal verwende ich Joiner.allSuccessfulOrThrow()
, um von scope.join()
direkt einen Stream der Subtasks geliefert zu bekommen – so muss ich mir die Subtasks nicht vorher merken.
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds) throws InterruptedException {
try (var scope =
StructuredTaskScope.open(Joiner.<SupplierDeliveryTime>allSuccessfulOrThrow())) {
productIds.forEach(
productId -> scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)));
return scope.join().map(Subtask::get).toList();
}
}
Code-Sprache: Java (java)
In beiden Beispielen erzeugen wir einen StructuredTaskScope
– und innerhalb dieses Scopes forken wir Subtasks, die wiederum die im vorherigen Abschnitt gezeigte Methode getSupplierDeliveryTime(...)
aufrufen, welche damit also innerhalb des Scopes von getSupplierDeliveryTimes(...)
verschachtelte Scopes öffnen.
Die folgende Grafik zeigt diese Scopes als gestrichelte Linien:

Vorteile von Structured Concurrency
Structured Concurrency zeichnet sich durch klar im Code ersichtliche Start- und Endpunkte nebenläufiger Subtasks aus. Fehler in den Subtasks werden an den Eltern-Scope propagiert. Das macht den Code besser les- und wartbar und stellt sicher, dass zum Ende eines Scopes alle gestarteten Threads beendet sind.
Die folgende Grafik zeigt Unstructured und Structured Concurrency gegenübergestellt:

Vorteile von StructuredTaskScope
Mit StructuredTaskScope
haben wir ein Sprachkonstrukt für Structured Concurrency:
- Task und Subtasks bilden im Code eine abgeschlossene Einheit – es gibt keinen
ExecutorService
in einem höheren Scope, wie z. B. der Klasse. Die Threads kommen nicht aus einem Threadpool; stattdessen wird jeder Subtask in einem neuen virtuellen Thread ausgeführt. - Durch den mittels try-with-resources-Block aufgespannten Scope ergeben sich klare Start- und Endpunkte aller Threads.
- Am Ende des Scopes sind alle Threads beendet.
- Fehler innerhalb der Subtasks werden sauber an den Eltern-Scope propagiert.
- Je nach Policy werden die verbleibenden Subtasks abgebrochen, wenn ein Subtasks erfolgreich war oder wenn in einem Subtask ein Fehler auftrat.
- Wenn der aufrufende Thread abgebrochen wird, werden auch die Subtasks abgebrochen.
Darüber hinaus hilft StructuredTaskScope
beim Debuggen: Wenn wir einen Thread-Dump im neuen JSON-Format ausgeben (jcmd <pid> Thread.dump_to_file -format=json <file>
), dann ist darin die Aufrufhierarchie zwischen Eltern- und Kind-Threads ersichtlich.
StructuredTaskScope und Scoped Values
Die in Java 20 als Incubator, in Java 21 als Preview eingeführten und in Java 25 finalisierten Scoped Values werden beim Einsatz von StructuredTaskScope
innerhalb eines Scopes automatisch an alle durch StructuredTaskScope.fork(...)
erzeugten Kind-Threads weitervererbt.
Wie das genau funktioniert, zeige ich dir an folgendem Code-Beispiel (Klasse SupplierDeliveryTimeCheck4_NestedStructuredTaskScopeUsingScopedValue im java-21
-Branch bzw. SupplierDeliveryTimeCheck4_NestedStructuredTaskScopeUsingScopedValue im main
-Branch).
Wir erzeugen ein ScopedValue
– im Beispiel für einen API-Key, binden dieses an den API-Key und rufen dann die im Abschnitt „Verschachtelte StructuredTaskScopes” gezeigte Methode getSupplierDeliveryTimes(...)
innerhalb des Scopes per call()
auf:
public static final ScopedValue<String> API_KEY = ScopedValue.newInstance();
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds, String apiKey) throws Exception {
return ScopedValue.where(API_KEY, apiKey)
.call(() -> getSupplierDeliveryTimes(productIds, supplierIds));
}
Code-Sprache: Java (java)
Durch die Vererbung des Scoped Values API_KEY
kann auf diesen auch innerhalb der SupplierDeliveryTimeService.getDeliveryTime(...)
-Methode zugegriffen werden ohne ihn über Methodenargumente an diese Methode durchschleifen zu müssen – und das selbst dann, wenn die Methoden nicht in demjenigen Thread ausgeführt werden, der ScopedValue.where(...)
aufruft, sondern eben in den durch StructuredTaskScope.fork(...)
erzeugten Kind- bzw. in diesem Beispiel sogar Enkel-Threads.
Fazit
Structured Concurrency wird – aufbauend auf virtuelle Threads – die Verwaltung von Tasks, die in nebenläufige Subtasks aufgeteilt werden, deutlich vereinfachen. Policies erlauben uns das Verhalten von StructuredTaskScope
zu beeinflussen, z. B. um alle Tasks abzubrechen, sollte einer fehlgeschlagen sein.
Die API wurde in Java 25 grundlegend überarbeitet: StructuredTaskScope
und Join-Strategie wurden entkoppelt, was zu klarer strukturiertem, verständlicherem und robusterem Code führt (Stichwort: Composition over inheritance).
Bitte beachte, dass sich Structured Concurrency nach wie vor im Preview-Stadium befindet und somit noch Änderungen unterliegen kann.