
Structured Concurrency in Java with StructuredTaskScope

Sven Woltmann
June 13, 2022

Structured Concurrency was developed – together with Virtual Threads and Scoped Values – in Project Loom. Structured Concurrency has been included in the JDK since Java 19 as an Incubator feature (JDK Enhancement Proposal 428).

In this article, you will learn:

  • Why do we need Structured Concurrency?
  • What is Structured Concurrency?
  • How does StructuredTaskScope work?
  • What is the advantage of Structured Concurrency?

Let's start by looking at how we have implemented parallel subtasks so far…

Why Do We Need Structured Concurrency?

Suppose a task consists of subtasks that can be done in parallel (e.g., accessing data from a database, calling a remote API, and loading a file).

We could implement this using the Java executor framework, for example, like this:

private final ExecutorService executor = Executors.newCachedThreadPool();

public Invoice createInvoice(int orderId, int customerId, String language)
    throws ExecutionException, InterruptedException {
  Future<Order> orderFuture =
      executor.submit(() -> loadOrderFromOrderService(orderId));
  Future<Customer> customerFuture =
      executor.submit(() -> loadCustomerFromDatabase(customerId));
  Future<String> invoiceTemplateFuture =
      executor.submit(() -> loadInvoiceTemplateFromFile(language));

  Order order = orderFuture.get();
  Customer customer = customerFuture.get();
  String invoiceTemplate = invoiceTemplateFuture.get();

  return Invoice.generate(order, customer, invoiceTemplate);
}
Code language: Java (java)

We pass the three subtasks to the executor and wait for the partial results. The happy path is quickly implemented. But how do we handle exceptions?

  • If an error occurs in one subtask – how can we cancel the others?
  • How can we cancel the subtasks if the invoice is no longer needed?

Both are possible but require complex and difficult-to-maintain code.
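To give an impression of the effort involved, here is a minimal sketch of the first case (cancel the remaining subtasks as soon as one fails) using only the classic executor API. The class ManualCancellation, the helper invokeAllOrCancel(), and the two demo tasks are hypothetical names made up for this illustration; they are not part of the JDK or of the invoice example. The sketch assumes that the subtasks react to interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ManualCancellation {

  // Hypothetical helper: runs all tasks in parallel and cancels the rest as soon as one fails.
  static <T> List<T> invokeAllOrCancel(ExecutorService executor, List<Callable<T>> tasks)
      throws ExecutionException, InterruptedException {
    CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
    List<Future<T>> futures = new ArrayList<>();
    try {
      for (Callable<T> task : tasks) {
        futures.add(completionService.submit(task));
      }
      // Consume the futures in completion order so that a failure is noticed right away.
      for (int i = 0; i < futures.size(); i++) {
        completionService.take().get(); // throws ExecutionException if the task failed
      }
      // Everything succeeded: collect the results in submission order.
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {
        results.add(future.get());
      }
      return results;
    } finally {
      // No effect on completed tasks; interrupts those still running
      // (after a failure or when the calling thread itself was interrupted).
      for (Future<T> future : futures) {
        future.cancel(true);
      }
    }
  }

  // Demo with made-up tasks: one subtask fails, the other one should get interrupted.
  static boolean slowTaskInterruptedAfterFailure() {
    ExecutorService executor = Executors.newCachedThreadPool();
    CountDownLatch slowStarted = new CountDownLatch(1);
    CountDownLatch slowInterrupted = new CountDownLatch(1);

    Callable<String> slow = () -> {
      slowStarted.countDown();
      try {
        Thread.sleep(10_000);
        return "done";
      } catch (InterruptedException e) {
        slowInterrupted.countDown();
        throw e;
      }
    };
    Callable<String> failing = () -> {
      slowStarted.await(); // fail only once the slow task is running
      throw new IllegalStateException("order service unavailable");
    };

    try {
      try {
        invokeAllOrCancel(executor, List.of(slow, failing));
        return false; // not expected here, since one task always fails
      } catch (ExecutionException e) {
        // The failure has been reported; check that the slow task was interrupted.
        return slowInterrupted.await(5, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("slow task interrupted: " + slowTaskInterruptedAfterFailure());
  }
}
```

Even this simplified version needs a CompletionService, a list of futures, and a finally block, and it still shares one executor among all callers.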

And what if we want to debug code of this kind? A thread dump, for example, would give us a bunch of threads named "pool-X-thread-Y" – but we wouldn't know which pool thread belongs to which calling thread since all calling threads share the executor's thread pool.
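The naming scheme is easy to observe. The following small sketch (the class PoolThreadNames and its method are hypothetical names used only for this illustration) submits a task to a cached thread pool and returns the name of the thread that executed it:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolThreadNames {

  // Returns the name of the pool thread that executes a submitted task.
  static String poolThreadName() {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      Callable<String> task = () -> Thread.currentThread().getName();
      return executor.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    // Prints a name of the form pool-X-thread-Y
    System.out.println(poolThreadName());
  }
}
```

The name carries the pool number and the thread number within the pool, but nothing about the thread that submitted the task.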

What is Structured Concurrency?

Structured Concurrency, introduced in Java 19, is a concept that improves the implementation, readability, and maintainability of code for dividing a task into subtasks and processing them in parallel.

StructuredTaskScope Example

We can implement Structured Concurrency using the StructuredTaskScope class. Using this class, we can rewrite the example as follows:

Invoice createInvoice(int orderId, int customerId, String language)
    throws ExecutionException, InterruptedException {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<Order> orderFuture =
        scope.fork(() -> loadOrderFromOrderService(orderId));
    Future<Customer> customerFuture =
        scope.fork(() -> loadCustomerFromDatabase(customerId));
    Future<String> invoiceTemplateFuture =
        scope.fork(() -> loadInvoiceTemplateFromFile(language));

    scope.join();
    scope.throwIfFailed();

    Order order = orderFuture.resultNow();
    Customer customer = customerFuture.resultNow();
    String invoiceTemplate = invoiceTemplateFuture.resultNow();

    return new Invoice(order, customer, invoiceTemplate);
  }
}
Code language: Java (java)

So we replace the ExecutorService in the scope of the class with a StructuredTaskScope located in the method's scope – and executor.submit() with scope.fork().

Using scope.join(), we wait for all tasks to be completed – or at least one to fail or be canceled. In the latter two cases, the subsequent throwIfFailed() throws an ExecutionException or a CancellationException.

In case of success, scope.throwIfFailed() does nothing, and we can use Future.resultNow() to read the results of the three tasks.

If you want to try the example yourself: you must explicitly enable preview features and add the incubator module with --add-modules. For example, if you have saved the code in a file named StructuredConcurrencyTest.java, you can compile and run it with Java 19 as follows:

$ javac --enable-preview -source 19 --add-modules jdk.incubator.concurrent StructuredConcurrencyTest.java
$ java --enable-preview --add-modules jdk.incubator.concurrent StructuredConcurrencyTest
Code language: plaintext (plaintext)

Advantages of Structured Concurrency

The new approach brings the following improvements over the old one:

  • Task and subtasks form a self-contained unit in the code – there is no ExecutorService in a higher scope. The threads do not come from a thread pool; instead, each subtask is executed in a new virtual thread.
  • As soon as an error occurs in one of the subtasks, all other subtasks get canceled.
  • When the calling thread is interrupted, the subtasks are also canceled.
  • The call hierarchy between the calling thread and the subtask-executing threads is visible in the thread dump.

StructuredTaskScope and Scoped Values

Scoped Values, introduced in Java 20, are automatically inherited by all child threads created by StructuredTaskScope.fork(…) when StructuredTaskScope is used within the scope of a Scoped Value.

I'll show you exactly how this works with the following code example. Here the StructuredTaskScope is used inside the Runnable passed to ScopedValue.where(…):

private static final ScopedValue<String> INVOICE_NUMBER = ScopedValue.newInstance();

ScopedValue.where(INVOICE_NUMBER, "2022-437", () -> {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<Order> orderFuture =
        scope.fork(() -> loadOrderFromOrderService(orderId));
    Future<Customer> customerFuture =
        scope.fork(() -> loadCustomerFromDatabase(customerId));
    Future<String> invoiceTemplateFuture =
        scope.fork(() -> loadInvoiceTemplateFromFile(language));

    // ... same as above ...
  }
});
Code language: Java (java)

Due to the inheritance of the scoped value INVOICE_NUMBER, it can also be accessed within the methods loadOrderFromOrderService(), loadCustomerFromDatabase(), and loadInvoiceTemplateFromFile() via INVOICE_NUMBER.get() – even if they are not executed in the thread that calls ScopedValue.where(…) but in the child threads created by StructuredTaskScope.fork(…).

Summary

Structured Concurrency – building on virtual threads – will greatly simplify the management of tasks split into parallel subtasks.

Please note that as of Java 20, Structured Concurrency is still in the incubator stage and, thus, may still be subject to fundamental changes.
