Structured concurrency in Java

Written by: David Vlijmincx

Introduction

Java 19 will introduce structured concurrency as an incubating API (JEP 428). As the name suggests, it adds structured concurrency capabilities to Java. This post walks through how to use the new APIs for writing multithreaded code.

Why use structured concurrency?

When you start a new Thread in Java, the JVM makes a system call asking the OS to create a new system thread. Creating a system thread is expensive: the call takes time, and each thread reserves memory for its stack. Your threads also share the same CPU, so you don't want them to block it and make other threads wait unnecessarily.

You can use asynchronous programming to prevent this from happening. You start a task and tell it what to do when the data arrives. Until the data is available, other threads can use the CPU resources for their own tasks. In the example below, we use a CompletableFuture to get some data and tell it to print the data to the console once it is available.

CompletableFuture.supplyAsync(() -> "some data")
                .thenAccept(System.out::println);

Asynchronous programming works fine, but there is another way to work with and think about concurrency: structured concurrency. JEP 428, a JDK Enhancement Proposal, brings this model to Java. It aims to make multithreaded code easier to write and debug.

What is structured concurrency?

With JEP 428, we get a “structured concurrency” model to work with and think about threads. The idea behind structured concurrency is to make the lifetime of a thread follow the same rules as code blocks in structured programming. For example, in a structured programming language like Java, if you call method B inside method A, then method B must finish before you can exit method A. The lifetime of method B can't exceed that of method A.

With structured concurrency, we want the same kind of rules as with structured programming. When you create virtual thread X inside virtual thread Y, the lifetime of thread X can't exceed that of thread Y. This makes working with and reasoning about threads a lot easier. For example, when you stop parent thread Y, all its child threads are cancelled as well, so you don't have to worry about runaway threads that keep running. The crux of the pattern is to avoid fire-and-forget concurrency.

[Figure: sequence diagram. Thread Y starts thread X, both perform their work, and thread Y is finished only when its own work and thread X's work are done.]

Thread Y starts a new thread X; both work independently of each other, but before thread Y can finish, it has to wait until thread X has completed its work. Let's see what that looks like in Java!
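
As a rough point of comparison before turning to the new API, the lifetime rule from the figure can be imitated by hand with plain threads and join(). The class name LifetimeSketch and the event strings are hypothetical, chosen only for illustration; this is a minimal sketch of the idea, not how JEP 428 implements it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the calling thread plays "thread Y", the started thread is "thread X".
class LifetimeSketch {

    // Returns the events in the order in which they happened.
    static List<String> runParent() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        Thread x = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate work in thread X
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("X done");
        });
        x.start();

        events.add("Y work done"); // thread Y does its own work meanwhile

        x.join();                  // thread Y may not finish before thread X
        events.add("Y finished");
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runParent());
    }
}
```

Nothing forces you to write the join() call here, and forgetting it produces exactly the fire-and-forget situation described above. Structured concurrency ties that wait to a code block instead of leaving it to discipline.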

Invoke all pattern - with structured concurrency

Structured concurrency binds the lifetime of threads to the code block that created them. The binding is done by using a StructuredTaskScope inside a try-with-resources statement. Inside this try block, you have a scope object that you use to fork new child threads.

In the example below, we create a scope using new StructuredTaskScope.ShutdownOnFailure(). This type of scope shuts down every running child thread if one of them throws an exception during execution. We only get a result when all the created child threads finish successfully. This behaviour looks similar to the invokeAll method of the ExecutorService.

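import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import jdk.incubator.concurrent.StructuredTaskScope; // incubator module in Java 19

String getDog() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        // Each fork starts a child thread that is bound to this scope.
        Future<String> name = scope.fork(this::getName);
        Future<String> breed = scope.fork(this::dogBreed);

        scope.join();          // wait for both child threads
        scope.throwIfFailed(); // rethrow a child's failure as an ExecutionException

        return "its name is: " + name.resultNow() + ", and is a " + breed.resultNow();
    }
}

String getName() { return "Max"; }

String dogBreed() { return "Golden retriever"; }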
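
For comparison, here is a minimal sketch of the same task written with ExecutorService.invokeAll, the method the scope was likened to above. The class name InvokeAllSketch and the pool size are assumptions made for this illustration. Note one difference: invokeAll waits for every task to complete and does not cancel the others when one of them fails.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical comparison sketch using the classic ExecutorService API.
class InvokeAllSketch {

    static String getDog() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = List.of(
                    InvokeAllSketch::getName,
                    InvokeAllSketch::dogBreed);

            // Blocks until every task has completed, normally or exceptionally.
            List<Future<String>> results = executor.invokeAll(tasks);

            // get() rethrows a task failure wrapped in an ExecutionException.
            return "its name is: " + results.get(0).get()
                    + ", and is a " + results.get(1).get();
        } finally {
            executor.shutdown(); // the pool's lifetime has to be managed by hand
        }
    }

    static String getName() { return "Max"; }

    static String dogBreed() { return "Golden retriever"; }

    public static void main(String[] args) throws Exception {
        System.out.println(getDog());
    }
}
```

The results come back in the same order as the submitted tasks, which is why the sketch reads them by index.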

Invoke any pattern - with structured concurrency

Unlike the invoke all pattern, which waits for every child thread to finish, the invoke any pattern returns the result of the first thread to finish successfully and shuts down the remaining child threads. The example below looks similar to the previous one, with some subtle changes: we now create the scope with new StructuredTaskScope.ShutdownOnSuccess<String>(), and we don't keep references to the futures.

Only after calling scope.join() can we get the result of the first finished thread by calling scope.result(). The example below returns “result: Golden retriever”, because getName sleeps for five seconds while dogBreed returns immediately.

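String getDog() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        // No Future references are kept; the scope captures the first result.
        scope.fork(this::getName);
        scope.fork(this::dogBreed);

        scope.join(); // returns once one child succeeds and the rest are shut down

        return "result: " + scope.result();
    }
}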

String getName() throws InterruptedException {
    Thread.sleep(5000);
    return "Max";
}

String dogBreed(){return "Golden retriever";}
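
The classic counterpart of this pattern is ExecutorService.invokeAny. The sketch below is only a comparison under stated assumptions: the class name InvokeAnySketch and the pool size are hypothetical, and it uses the long-standing executor API rather than StructuredTaskScope.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical comparison sketch using the classic ExecutorService API.
class InvokeAnySketch {

    static String getDog() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = List.of(
                    InvokeAnySketch::getName,
                    InvokeAnySketch::dogBreed);

            // Returns the result of a task that completed successfully;
            // tasks that have not completed by then are cancelled.
            return "result: " + executor.invokeAny(tasks);
        } finally {
            executor.shutdownNow();
        }
    }

    static String getName() throws InterruptedException {
        Thread.sleep(5000); // slow task, interrupted once dogBreed has won
        return "Max";
    }

    static String dogBreed() { return "Golden retriever"; }

    public static void main(String[] args) throws Exception {
        System.out.println(getDog()); // prints "result: Golden retriever"
    }
}
```

As with the scope version, the slow getName task does not hold up the caller for five seconds, because it is cancelled as soon as dogBreed has produced a result.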

Exceptions and structured concurrency

We will look back at the example of the Invoke all pattern to see what happens when an exception is thrown. The second child thread that retrieves the dog's breed will throw an exception while the first child thread is still running. When the exception is thrown, the StructuredTaskScope will shut down every child thread it has created. To get the underlying exception, you must call the throwIfFailed() method on your scope object. Please remember that you can only call throwIfFailed() after you call join().

If you don't call the throwIfFailed() method, the exception you see in the console is an IllegalStateException, thrown by resultNow() because the task did not complete successfully. This is confusing because we throw a RuntimeException in the dogBreed method. Calling throwIfFailed() surfaces that RuntimeException as the cause of an ExecutionException.

String getDog() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> name = scope.fork(this::getName);
        Future<String> breed = scope.fork(this::dogBreed);

        scope.join();
        scope.throwIfFailed(); // throws here: dogBreed failed

        return "its name is: " + name.resultNow() + ", and is a " + breed.resultNow();
    }
}

String getName() throws InterruptedException {
    Thread.sleep(5000);
    return "Max";
}

String dogBreed(){
    throw new RuntimeException();
}
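
The difference between the two exceptions can be reproduced in isolation. The sketch below is a hedged illustration that uses a plain ExecutorService instead of a scope, so it runs without the incubator module; the class name FailedTaskSketch and the exception message are hypothetical. It assumes Java 19 or later, where Future.resultNow() is available.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: how a failed task looks through get() and through resultNow().
class FailedTaskSketch {

    // Returns { exception type seen via resultNow(), cause type seen via get() }.
    static String[] inspectFailure() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = FailedTaskSketch::dogBreed;
            Future<String> breed = executor.submit(task);

            String fromGet = "none";
            try {
                breed.get(); // waits for the task, then reports the failure
            } catch (ExecutionException e) {
                fromGet = e.getCause().getClass().getSimpleName();
            }

            String fromResultNow = "none";
            try {
                breed.resultNow(); // the task is done, but it did not succeed
            } catch (IllegalStateException e) {
                fromResultNow = e.getClass().getSimpleName();
            }

            return new String[] {fromResultNow, fromGet};
        } finally {
            executor.shutdown();
        }
    }

    static String dogBreed() {
        throw new RuntimeException("no breed found");
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = inspectFailure();
        System.out.println("resultNow(): " + seen[0] + ", get() cause: " + seen[1]);
    }
}
```

resultNow() only tells you that no result is available, while the wrapped cause carries the original RuntimeException. That is the information throwIfFailed() surfaces for a scope.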

Conclusion

This post looked at what structured concurrency brings to the Java language and how it is implemented in Java 19 EA with JEP 428. We went over how to create different kinds of scopes for threads and what happens when one of the virtual threads in a scope throws an exception.
