
Structured Concurrency in Java with StructuredTaskScope

Sven Woltmann
Last update: January 11, 2024

Structured Concurrency was developed – together with Virtual Threads and Scoped Values – in Project Loom. Structured Concurrency has been included in the JDK since Java 19 as an incubator feature (JEP 428) and since Java 21 as a preview feature (JEP 453).

In this article, you will learn:

  • Why do we need Structured Concurrency?
  • What is Structured Concurrency?
  • How does StructuredTaskScope work?
  • What StructuredTaskScope policies exist, and how can we write a custom policy?
  • What is the advantage of Structured Concurrency?

You can find a companion demo application in this GitHub repository.

Let’s start by looking at how we have implemented concurrent subtasks so far…

Why Do We Need Structured Concurrency?

Suppose a task consists of various – mainly blocking – subtasks that can be done concurrently (e.g., accessing data from a database or calling a remote API).

We could implement this using the Java executor framework, which could look like this, for example (InvoiceGenerator3_ThreadPool class in the demo application):

Invoice createInvoice(int orderId, int customerId, String language)
    throws InterruptedException, ExecutionException {
  Future<Order> orderFuture = 
      executor.submit(() -> orderService.getOrder(orderId));

  Future<Customer> customerFuture =
      executor.submit(() -> customerService.getCustomer(customerId));

  Future<InvoiceTemplate> invoiceTemplateFuture =
      executor.submit(() -> invoiceTemplateService.getTemplate(language));

  Order order = orderFuture.get();
  Customer customer = customerFuture.get();
  InvoiceTemplate invoiceTemplate = invoiceTemplateFuture.get();

  return Invoice.generate(order, customer, invoiceTemplate);
}

We pass the three subtasks to the executor and wait for the partial results. The happy path is quickly implemented. But how do we handle exceptions?

  • If an error occurs in one subtask – how can we cancel the others? In the example above, if orderService.getOrder(…) fails, then orderFuture.get() throws an ExecutionException, the createInvoice(…) method ends, and we may have two orphaned threads still running.
  • How can we abort the subtasks when the parent task (“create invoice”) is cancelled – or when the complete application is shut down?
  • How can we – in an alternative use case – cancel the remaining subtasks when only the result of a single subtask is needed?

Everything is doable but requires exceptionally complex, hard-to-maintain code (you can find two examples in the GitHub repository: InvoiceGenerator2b_CompletableFutureCancelling and InvoiceGenerator4b_NewVirtualThreadPerTaskCancelling).
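To give an impression of the effort involved, here is a minimal sketch of manual cancellation with a plain ExecutorService. It is not the code from the repository: the class name ManualCancellationSketch, the helper runAllOrCancel(…), and the two dummy tasks are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ManualCancellationSketch {

  // Submits all tasks and returns their results in submission order.
  // If any task fails (or the caller is interrupted), the remaining tasks are cancelled.
  static <T> List<T> runAllOrCancel(ExecutorService executor, List<Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    ExecutorCompletionService<T> completed = new ExecutorCompletionService<>(executor);
    List<Future<T>> futures = new ArrayList<>();
    try {
      for (Callable<T> task : tasks) {
        futures.add(completed.submit(task));
      }
      // Consume in completion order so the first failure is seen right away.
      for (int i = 0; i < futures.size(); i++) {
        completed.take().get(); // throws ExecutionException if that task failed
      }
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {
        results.add(future.get());
      }
      return results;
    } finally {
      // No effect on tasks that already completed; interrupts those still running.
      for (Future<T> future : futures) {
        future.cancel(true);
      }
    }
  }

  // Hypothetical demo: returns true if the slow task was interrupted
  // after the other task failed.
  static boolean demo() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CountDownLatch slowStarted = new CountDownLatch(1);
    CountDownLatch slowInterrupted = new CountDownLatch(1);

    Callable<String> failing = () -> {
      slowStarted.await(); // fail only once the slow task is running
      throw new IllegalStateException("Error loading order");
    };
    Callable<String> slow = () -> {
      slowStarted.countDown();
      try {
        Thread.sleep(10_000);
        return "customer";
      } catch (InterruptedException e) {
        slowInterrupted.countDown();
        throw e;
      }
    };

    try {
      runAllOrCancel(executor, List.of(failing, slow));
      return false;
    } catch (ExecutionException e) {
      return slowInterrupted.await(2, TimeUnit.SECONDS);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Slow task interrupted: " + demo());
  }
}
```

Even this sketch leaves a gap: Future.cancel(true) only requests an interruption and does not wait for the cancelled tasks to actually finish, so the calling method can still return while their threads are running.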

And what if we want to debug code of this kind? A thread dump, for example, would give us a bunch of threads named “pool-X-thread-Y” – but we wouldn’t know which pool thread belongs to which calling thread since all calling threads share the executor’s thread pool.
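The naming pattern is easy to reproduce. The following sketch (the class name PoolThreadNameSketch is an assumption, not part of the demo application) asks a worker thread of a default fixed thread pool for its name:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolThreadNameSketch {

  // Returns the name of the pool thread that executed the task.
  static String workerThreadName() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      return executor.submit(() -> Thread.currentThread().getName()).get();
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    // The name follows the pattern pool-X-thread-Y and says nothing about the caller.
    System.out.println(workerThreadName());
  }
}
```

Nothing in such a name points back to the thread that submitted the task.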

What is Unstructured Concurrency?

“Unstructured concurrency” means that our tasks run in a web of tangled threads whose start and end are hard to see in the code. Clean error handling is usually not present, and orphaned threads often occur when a control structure (in the example above: the createInvoice(…) method) ends:

Unstructured Concurrency

What is Structured Concurrency?

Structured Concurrency, introduced in Java 19 as an incubator feature and in Java 21 as a preview feature, is a concept that improves the implementation, readability, and maintainability of code for dividing a task into subtasks and processing them concurrently.

For this purpose, it introduces a new control structure – the StructuredTaskScope class – that

  • defines a clear scope at the beginning of which the subtasks' threads start and at the end of which they end,
  • allows clean error handling,
  • and allows a clean cancellation of subtasks whose results are no longer needed.

I will show you in the following sections, using several examples, what this means exactly.

StructuredTaskScope Example

We can implement Structured Concurrency using the StructuredTaskScope class. Using this class, we can rewrite the example as follows (InvoiceGenerator5_StructuredTaskScope class in the demo application):

Invoice createInvoice(int orderId, int customerId, String language)
    throws InterruptedException {
  try (var scope = new StructuredTaskScope<>()) {
    Subtask<Order> orderSubtask = 
        scope.fork(() -> orderService.getOrder(orderId));

    Subtask<Customer> customerSubtask = 
        scope.fork(() -> customerService.getCustomer(customerId));

    Subtask<InvoiceTemplate> invoiceTemplateSubtask =
        scope.fork(() -> invoiceTemplateService.getTemplate(language));

    scope.join();

    Order order = orderSubtask.get();
    Customer customer = customerSubtask.get();
    InvoiceTemplate template = invoiceTemplateSubtask.get();

    return Invoice.generate(order, customer, template);
  }
}

We replace the ExecutorService in the scope of the class with a StructuredTaskScope located in the method’s scope – and executor.submit() with scope.fork().

Using scope.join(), we wait for all tasks to be completed. This eliminates the risk of orphaned threads.

After that, we can read the results of the three tasks via Subtask.get(). If an exception occurred in one of the tasks, Subtask.get() throws an IllegalStateException. Therefore it is better to query the state of a subtask with state() before calling get():

Order order;
if (orderSubtask.state() == Subtask.State.SUCCESS) {
  order = orderSubtask.get();
} else {
  // Handle error
}

If you want to try the example yourself: you must explicitly enable preview features, in this case with --enable-preview --source 21. You can find detailed instructions in the README of the demo application.

StructuredTaskScope Policies

In the example above, we waited for all tasks to complete using StructuredTaskScope.join(). But if an exception occurs in one of the tasks, we can’t do anything with the results of the other two tasks – so why wait for them?

This is where the so-called “policies” come into play, allowing us to specify when a scope is finished, among other things. StructuredTaskScope defines two such policies, but we can also implement our own (more on that later).

“Shutdown on Failure” Policy

Using the “Shutdown on Failure” policy, we can specify that the occurrence of an exception in one task will cause all other tasks to be terminated.

We can use the “Shutdown on Failure” policy as follows (you can find the code in the InvoiceGenerator6_ShutdownOnFailure class in the GitHub repo):

Invoice createInvoice(int orderId, int customerId, String language)
    throws InterruptedException, ExecutionException {
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<Order> orderSubtask = 
        scope.fork(() -> orderService.getOrder(orderId));

    Subtask<Customer> customerSubtask = 
        scope.fork(() -> customerService.getCustomer(customerId));

    Subtask<InvoiceTemplate> invoiceTemplateSubtask =
        scope.fork(() -> invoiceTemplateService.getTemplate(language));

    scope.join();
    scope.throwIfFailed();

    Order order = orderSubtask.get();
    Customer customer = customerSubtask.get();
    InvoiceTemplate template = invoiceTemplateSubtask.get();

    return Invoice.generate(order, customer, template);
  }
}

Compared to the previous example, I had to change only two things:

  • I replaced new StructuredTaskScope<>() with new StructuredTaskScope.ShutdownOnFailure() in the third line.
  • I added the command scope.throwIfFailed() after scope.join().

Now, if an exception occurs in any of the three tasks, all other subtasks are immediately interrupted, scope.join() returns, and scope.throwIfFailed() throws the failed subtask’s exception embedded in an ExecutionException.

In the sample code, the three subtasks throw an exception with some probability. If you run the program a few times, you will see how an exception in one task leads to an interruption in the other tasks and a termination of the program:

$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo1_invoice/InvoiceGenerator6_ShutdownOnFailure
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for all tasks to finish or one to fail
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Loading template
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-1] Finished loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Error loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Template loading was interrupted
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error loading order
        [...]

By the way, you can also see from this output that all tasks are executed in virtual threads.

“Shutdown on Success” Policy

Alternatively, you can use new StructuredTaskScope.ShutdownOnSuccess() to create a scope with the “Shutdown on Success” policy. With this policy, the scope is terminated as soon as one subtask succeeds. The other subtasks are terminated, and the scope.result() method returns the result of the successful subtask.

Here’s an example of this – with a different use case: We want to verify a customer address using multiple external APIs at the same time, and we only want to use the first result (you can find the code in the AddressVerification2_ShutdownOnSuccess class in the GitHub repo):

AddressVerificationResponse verifyAddress(Address address)
    throws InterruptedException, ExecutionException {
  try (var scope = new ShutdownOnSuccess<AddressVerificationResponse>()) {
    scope.fork(() -> verificationService.verifyViaServiceA(address));
    scope.fork(() -> verificationService.verifyViaServiceB(address));
    scope.fork(() -> verificationService.verifyViaServiceC(address));

    scope.join();

    return scope.result();
  }
}

If, contrary to expectations, all three invocations of the verifyViaServiceX() methods throw an exception, scope.result() rethrows the first of them, wrapped in an ExecutionException.
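The all-failed case can be tried in isolation with the following sketch. It targets the Java 21 preview API (compile and run with --enable-preview); the class name AllSubtasksFailSketch and the always-failing failingCall(…) helper are hypothetical stand-ins for the verification service.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

public class AllSubtasksFailSketch {

  // Hypothetical stand-in for a verification call that always fails.
  static String failingCall(String service) {
    throw new IllegalStateException("Service " + service + " unavailable");
  }

  static String verify() throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
      scope.fork(() -> failingCall("A"));
      scope.fork(() -> failingCall("B"));
      scope.fork(() -> failingCall("C"));

      scope.join();

      // No subtask succeeded, so result() throws instead of returning.
      return scope.result();
    } catch (ExecutionException e) {
      // The cause is the exception of one of the failed subtasks.
      return "All services failed; cause: " + e.getCause().getClass().getSimpleName();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(verify());
  }
}
```

Which of the three exceptions ends up as the cause depends on which subtask fails first, so the sketch only reports the exception type.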

If you run the sample code, you will see how the first successful subtask produces a result, and the other tasks get aborted:

$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo2_address/AddressVerification2_ShutdownOnSuccess
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for one task to finish
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Verifying address via service B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Verifying address via service A
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Verifying address via service C
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Finished loading address via service C
[Thread[#1,main,5,main]] Retrieving result
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Verifying address via service B was interrupted
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Verifying address via service A was interrupted

Note that the result() method is only available on ShutdownOnSuccess, and the throwIfFailed() method is only available on ShutdownOnFailure.

How to Write a Custom StructuredTaskScope Policy

If neither of the two standard policies is suitable for your purpose, you can write your own policy with relatively little effort.

Suppose we want to check the availability of a product from multiple suppliers, and we don't want to use the first result, but the one with the fastest availability. We want to propagate failed requests only if the requests failed for all suppliers.

That is surprisingly simple to realize – and in a way that makes it reusable for other deployment scenarios. First, here’s the policy (BestResultScope class in the GitHub repo).

BestResultScope extends the StructuredTaskScope class, overrides its handleComplete() method, and adds a resultOrElseThrow() method. As a constructor parameter, it takes a Comparator, which we will use later to define the best result as the fastest availability:

public class BestResultScope<T> extends StructuredTaskScope<T> {

  private final Comparator<T> comparator;

  private T bestResult;
  private final List<Throwable> exceptions = 
      Collections.synchronizedList(new ArrayList<>());

  public BestResultScope(Comparator<T> comparator) {
    this.comparator = comparator;
  }

  @Override
  protected void handleComplete(Subtask<? extends T> subtask) {
    switch (subtask.state()) {
      case UNAVAILABLE -> {
        // Ignore
      }
      case SUCCESS -> {
        T result = subtask.get();
        synchronized (this) {
          if (bestResult == null || comparator.compare(result, bestResult) > 0) {
            bestResult = result;
          }
        }
      }
      case FAILED -> exceptions.add(subtask.exception());
    }
  }

  public <X extends Throwable> T resultOrElseThrow(
      Supplier<? extends X> exceptionSupplier) throws X {
    ensureOwnerAndJoined();
    if (bestResult != null) {
      return bestResult;
    } else {
      X exception = exceptionSupplier.get();
      exceptions.forEach(exception::addSuppressed);
      throw exception;
    }
  }
}

The handleComplete(…) method is called for each terminated subtask – both successful ones and those that threw an exception. We check which case has occurred with subtask.state().

If successful, we fetch the result with subtask.get() and write it – if it is better than the best result so far – in a thread-safe way into the bestResult field.

If a subtask failed, we add its exception to a thread-safe list.

The resultOrElseThrow() method first ensures by calling ensureOwnerAndJoined() that it has been called from the same thread that created the StructuredTaskScope and that this thread has previously called join() or joinUntil(…).

resultOrElseThrow() then checks if a successful result is available and, if yes, returns it. Otherwise it throws the specified exception to which it appends the collected exceptions as “suppressed exceptions.”

We can use the custom policy as follows (SupplierDeliveryTimeCheck2_StructuredTaskScope class in the GitHub repo):

SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
    throws SupplierDeliveryTimeCheckException, InterruptedException {
  try (var scope =
      new BestResultScope<>(
          Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
    for (String supplierId : supplierIds) {
      scope.fork(() -> service.getDeliveryTime(productId, supplierId));
    }

    scope.join();
    return scope.resultOrElseThrow(SupplierDeliveryTimeCheckException::new);
  }
}

The output of the sample program could look like this, for example:

$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo3_suppliers/SupplierDeliveryTimeCheck2_StructuredTaskScope
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Retrieving delivery time from supplier B
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-4] Retrieving delivery time from supplier D
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier E
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-5] Retrieving delivery time from supplier C
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier A
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-5] Finished retrieving delivery time from supplier A: 110 hours
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier C: 104 hours
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier E
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier D: 51 hours
[Thread[#1,main,5,main]] Response: SupplierDeliveryTime[supplier=D, deliveryTimeHours=51]

You can see that, although the calls for suppliers B and E failed, the remaining suppliers delivered results, and in the end the best result (supplier D with a delivery time of 51 hours) is returned.
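The direction of the comparator is easy to misread: BestResultScope keeps the result for which compare(result, bestResult) > 0, so the comparator has to rank shorter delivery times as "greater", which is what reversed() achieves. The following sketch checks this in isolation. The record shape is an assumption inferred from the log output above, and the class name and the better(…) helper are hypothetical.

```java
import java.util.Comparator;

public class BestResultComparatorSketch {

  // Assumed shape, inferred from the log output; the real class may differ.
  record SupplierDeliveryTime(String supplier, int deliveryTimeHours) {}

  // Same comparator as in the usage example: reversed natural order of the hours.
  static final Comparator<SupplierDeliveryTime> BEST_IS_FASTEST =
      Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed();

  // Mirrors the selection rule used in handleComplete(...).
  static SupplierDeliveryTime better(SupplierDeliveryTime candidate, SupplierDeliveryTime best) {
    return (best == null || BEST_IS_FASTEST.compare(candidate, best) > 0) ? candidate : best;
  }

  public static void main(String[] args) {
    SupplierDeliveryTime best = null;
    best = better(new SupplierDeliveryTime("A", 110), best);
    best = better(new SupplierDeliveryTime("C", 104), best);
    best = better(new SupplierDeliveryTime("D", 51), best);
    System.out.println(best);
  }
}
```

Without reversed(), the same selection rule would keep the supplier with the longest delivery time.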

Nested StructuredTaskScopes

If we want to query not only the suppliers for one product at a time, but the suppliers for multiple products, we can easily solve this, for instance, with the following method (class SupplierDeliveryTimeCheck3_NestedStructuredTaskScope in the GitHub repo):

List<SupplierDeliveryTime> getSupplierDeliveryTimes(
    List<String> productIds, List<String> supplierIds) throws InterruptedException {
  try (var scope = new StructuredTaskScope<SupplierDeliveryTime>()) {
    List<Subtask<SupplierDeliveryTime>> subtasks =
        productIds.stream()
            .map(productId -> 
                scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)))
            .toList();

    scope.join();

    return subtasks.stream()
        .filter(subtask -> subtask.state() == State.SUCCESS)
        .map(Subtask::get)
        .toList();
  }
}

Here we create a StructuredTaskScope and, within this scope, fork one subtask per product. Each subtask calls the getSupplierDeliveryTime(…) method shown in the previous section, which in turn opens its own scope nested within the scope of getSupplierDeliveryTimes(…).

The following image shows these scopes as dashed lines:

Nested StructuredTaskScopes

Advantages of Structured Concurrency

Structured Concurrency is characterized by start and end points of concurrent subtasks clearly visible in the code. Errors in the subtasks are propagated to the parent scope. This makes the code easier to read and maintain and ensures that all started threads are finished at the end of a scope.

The following figure shows a comparison of unstructured and structured concurrency:

Unstructured Concurrency vs. Structured Concurrency

Advantages of StructuredTaskScope

With StructuredTaskScope, Java provides an API for structured concurrency:

  • Task and subtasks form a self-contained unit in the code – there is no ExecutorService in a higher scope. The threads do not come from a thread pool; instead, each subtask is executed in a new virtual thread.
  • The scope spanned by the try-with-resources block results in clear start and end points of all threads.
  • At the end of the scope, all threads are finished.
  • Errors within the subtasks are propagated cleanly to the parent scope.
  • Depending on the policy, the remaining subtasks are aborted if a subtask was successful or if an error occurred in a subtask.
  • When the calling thread is canceled, the subtasks are also canceled.
  • The call hierarchy between the calling thread and the subtask-executing threads is visible in the thread dump.

In addition, StructuredTaskScope helps with debugging: If we create a thread dump in the new JSON format (jcmd <pid> Thread.dump_to_file -format=json <file>), then it will reflect the call hierarchy between parent and child threads.

StructuredTaskScope and Scoped Values

Scoped Values, introduced in Java 20 as an incubator feature and also in Java 21 as a preview, are automatically inherited by all child threads created by StructuredTaskScope.fork(…) when StructuredTaskScope is used within the scope of a Scoped Value.

The following code example shows how this works (SupplierDeliveryTimeCheck4_NestedStructuredTaskScopeUsingScopedValue class in the demo application). We create a ScopedValue for an API key, bind it to the key passed in, and then, within the scope of that binding, use call() to invoke the getSupplierDeliveryTimes(…) method shown in the section “Nested StructuredTaskScopes”:

public static final ScopedValue<String> API_KEY = ScopedValue.newInstance();

List<SupplierDeliveryTime> getSupplierDeliveryTimes(
    List<String> productIds, List<String> supplierIds, String apiKey) throws Exception {
  return ScopedValue.where(API_KEY, apiKey)
      .call(() -> getSupplierDeliveryTimes(productIds, supplierIds));
}

Due to the inheritance of the scoped value API_KEY, it can also be accessed within the SupplierDeliveryTimeService.getDeliveryTime(…) method without having to pass it through method arguments to this method – even if the methods are not executed in the thread that calls ScopedValue.where(…) but in the child or, in this example, even grandchild threads created by StructuredTaskScope.fork(…).
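The inheritance can be observed in a reduced form with the following sketch. It targets the Java 21 preview APIs (compile and run with --enable-preview) and is not the demo application's code: the class name ScopedValueInheritanceSketch and the simplified getDeliveryTime(…) method, which just echoes the key it sees, are assumptions for illustration.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class ScopedValueInheritanceSketch {

  static final ScopedValue<String> API_KEY = ScopedValue.newInstance();

  // Hypothetical stand-in for SupplierDeliveryTimeService.getDeliveryTime(...):
  // it reads the API key from the scoped value instead of a method parameter.
  static String getDeliveryTime(String productId) {
    return productId + " queried with key " + API_KEY.get();
  }

  static String query(String productId, String apiKey) throws Exception {
    // Bind the key, then fork a subtask inside the binding's scope.
    return ScopedValue.where(API_KEY, apiKey).call(() -> {
      try (var scope = new StructuredTaskScope<String>()) {
        // Runs in a child thread, which inherits the binding of API_KEY.
        Subtask<String> subtask = scope.fork(() -> getDeliveryTime(productId));
        scope.join();
        return subtask.get();
      }
    });
  }

  public static void main(String[] args) throws Exception {
    System.out.println(query("P1", "secret"));
  }
}
```

Outside of the where(…).call(…) block, API_KEY is unbound, and calling API_KEY.get() there throws a NoSuchElementException.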

Summary

Structured Concurrency – building on virtual threads – will significantly simplify the management of tasks split into concurrent subtasks. Policies allow us to influence the behavior of StructuredTaskScope, e.g., to abort all tasks should one of them fail.

Please note that as of Java 21, Structured Concurrency is still in the preview stage and, thus, may still be subject to minor changes.
