structured concurrency javastructured concurrency java
HappyCoders Glasses

Structured Concurrency in Java mit StructuredTaskScope

Sven Woltmann
Sven Woltmann
13. Juni 2022

Structured Concurrency wurde – zusammen mit virtuellen Threads und Scoped Values – in Project Loom entwickelt. Structured Concurrency ist seit Java 19 als Incubator-Feature (JDK Enhancement Proposal 428) im JDK enthalten.

In diesem Artikel erfährst du:

  • Warum benötigen wir Structured Concurrency?
  • Was ist Structured Concurrency?
  • Wie funktioniert StructuredTaskScope?
  • Was ist der Vorteil von Structured Concurrency?

Schauen wir uns zuerst einmal an, wie wir parallele Teilaufgaben bisher implementiert haben...

Warum benötigen wir Structured Concurrency?

Wenn eine Aufgabe aus verschiedenen Teilaufgaben besteht, die parallel erledigt werden können (z. B. Zugriff auf Daten aus einer Datenbank, Aufruf einer Remote API und Laden einer Datei), so konnten wir hierfür bisher das Java-Executur-Framework einsetzen.

Das könnte dann z. B. so aussehen:

private final ExecutorService executor = Executors.newCachedThreadPool();

public Invoice createInvoice(int orderId, int customerId, String language)
    throws ExecutionException, InterruptedException {
  Future<Order> orderFuture =
      executor.submit(() -> loadOrderFromOrderService(orderId));

  Future<Customer> customerFuture =
      executor.submit(() -> loadCustomerFromDatabase(customerId));

  Future<String> invoiceTemplateFuture =
      executor.submit(() -> loadInvoiceTemplateFromFile(language));

  Order order = orderFuture.get();
  Customer customer = customerFuture.get();
  String invoiceTemplate = invoiceTemplateFuture.get();

  return Invoice.generate(order, customer, invoiceTemplate);
}Code-Sprache: Java (java)

Wir übergeben die drei Teilaufgaben an den Executor und warten auf die Teilergebnisse. Der Happy Path ist schnell implementiert. Aber wie behandeln wir Ausnahmen?

  • Wenn in einem Subtask ein Fehler auftritt – wie können wir dann die anderen abbrechen?
  • Wie können wir die Subtasks abbrechen, wenn die gesamte Rechnung nicht mehr benötigt wird?

Beides ist möglich, erfordert aber ziemlich komplexen, schwer wartbaren Code.

Und was, wenn wir Code dieser Art debuggen möchten? Ein Thread-Dump z. B. würde uns haufenweise Threads mit dem Namen "pool-X-thread-Y" liefern – wir wüssten aber nicht, welcher Pool-Thread zu welchem aufrufenden Threads gehört, da sich alle aufrufenden Threads den Thread-Pool des Executors teilen.

Was ist Structured Concurrency?

Die in Java eingeführte "Structured Concurrency" ist ein Konzept, das die Implementierung, Lesbarkeit und Wartbarkeit von Code für die Aufteilung einer Aufgabe in Teilaufgaben und deren parallele Abarbeitung verbessert.

StructuredTaskScope Beispiel

Structured Concurrency wird mit der Klasse StructuredTaskScope implementiert. Mit dieser Klasse können wir das Beispiel wie folgt umschreiben:

Invoice createInvoice(int orderId, int customerId, String language)
    throws ExecutionException, InterruptedException {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<Order> orderFuture =
        scope.fork(() -> loadOrderFromOrderService(orderId));

    Future<Customer> customerFuture =
        scope.fork(() -> loadCustomerFromDatabase(customerId));

    Future<String> invoiceTemplateFuture =
        scope.fork(() -> loadInvoiceTemplateFromFile(language));

    scope.join();
    scope.throwIfFailed();

    Order order = orderFuture.resultNow();
    Customer customer = customerFuture.resultNow();
    String invoiceTemplate = invoiceTemplateFuture.resultNow();

    return new Invoice(order, customer, invoiceTemplate);
  }
}Code-Sprache: Java (java)

Wir ersetzen also den im Scope der Klasse liegenden ExecutorService durch einen im Scope der Methode liegenden StructuredTaskScope – und executor.submit() durch scope.fork().

Mit scope.join() warten wir darauf, dass alle Tasks erledigt sind – oder mindestens einer fehlgeschlagen ist oder abgebrochen wurde. In den letztgenannten zwei Fällen wirft das anschließende throwIfFailed() eine ExecutionException bzw. eine CancellationException.

Im Erfolgsfall tut scope.throwIfFailed() nichts, und wir können über Future.resultNow() die Ergebnisse der drei Tasks auslesen.

Falls du das Beispiel selbst ausprobieren möchtest: Preview-Features müssen explizit freigeschaltet und Incubator-Module müssen explizit dem Modulpfad hinzugefügt werden. Wenn du den Code beispielsweise in einer Datei namens StructuredConcurrencyTest.java gespeichert hast, kannst du ihn mit Java 19 wie folgt compilieren und starten:

$ javac --enable-preview -source 19 --add-modules jdk.incubator.concurrent StructuredConcurrencyTest.java
$ java --enable-preview --add-modules jdk.incubator.concurrent StructuredConcurrencyTestCode-Sprache: Klartext (plaintext)

Vorteile von Structured Concurrency

Der neue Ansatz bringt gegenüber dem alten folgende Verbesserungen:

  • Task und Subtasks bilden im Code eine abgeschlossene Einheit – es gibt keinen ExecutorService in einem höheren Scope. Die Threads kommen nicht aus einem Threadpool; stattdessen wird jeder Subtask in einem neuen virtuellen Thread ausgeführt.
  • Sobald in einem der Subtasks ein Fehler auftritt, werden alle anderen Subtasks abgebrochen.
  • Wenn der aufrufende Thread abgebrochen wird, werden auch die Subtasks abgebrochen.
  • Im Thread-Dump ist die Aufrufhierarchie zwischen aufrufendem Thread und Threads, die die Subtasks ausführen, ersichtlich.

StructuredTaskScope und Scoped Values

Die in Java 20 eingeführten Scoped Values werden beim Einsatz von StructuredTaskScope innerhalb eines Scopes automatisch an alle durch StructuredTaskScope.fork(...) erzeugten Kind-Threads weitervererbt.

Wie das genau funktioniert, zeige ich Dir an folgendem Code-Beispiel. Hier wird der StructuredTaskScope innerhalb der run-Methode eines ScopedValues ausgeführt:

private static final ScopedValue<String> INVOICE_NUMBER = ScopedValue.newInstance();

ScopedValue.where(INVOICE_NUMBER, "2022-437", () -> {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<Order> orderFuture =
        scope.fork(() -> loadOrderFromOrderService(orderId));

    Future<Customer> customerFuture =
        scope.fork(() -> loadCustomerFromDatabase(customerId));

    Future<String> invoiceTemplateFuture =
        scope.fork(() -> loadInvoiceTemplateFromFile(language));

    // ... same as above ...
  }
});Code-Sprache: Java (java)

Durch die Vererbung des Scoped Values INVOICE_NUMBER kann auf diesen auch innerhalb der Methoden loadOrderFromOrderService(), loadCustomerFromDatabase() und loadInvoiceTemplateFromFile() per INVOICE_NUMBER.get() zugegriffen werden – selbst wenn diese nicht in demjenigen Thread ausgeführt werden, der ScopedValue.where(...) aufruft, sondern eben in den durch StructuredTaskScope.fork(...) erzeugten Kind-Threads.

Fazit

Strukturierte Concurrency wird – aufbauend auf virtuelle Threads – die Verwaltung von Tasks, die in parallele Subtasks aufgeteilt werden, deutlich vereinfachen.

Bitte beachte, dass sich Structured Concurrency zum Stand von Java 20 noch im Incubator-Stadium befindet und somit noch grundlegenden Änderungen unterliegen kann.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.