Unit 39: Asynchronous Programming
Learning Objectives
After completing this unit, students should be able to:
- explain the limitations of using Thread directly for coordinating concurrent tasks
- describe how task dependencies and execution order can be expressed using higher-level abstractions
- use CompletableFuture to compose and execute asynchronous computations
- distinguish between synchronous and asynchronous chaining operations in CompletableFuture
- handle exceptions arising from asynchronous computations using CompletableFuture
Overview
While working directly with Thread provides flexibility, it also places a significant burden on the programmer. Coordinating execution order, sharing results safely, handling exceptions, and managing thread lifecycles quickly become complex as the number of interacting tasks increases.
In this unit, we move away from direct thread management and introduce asynchronous programming as a higher-level approach to concurrency. Rather than focusing on threads, we focus on tasks and their dependencies. Using CompletableFuture, we will see how asynchronous computations can be composed and executed concurrently in a structured way, with built-in support for combining results and handling errors. This approach leads to concurrent programs that are clearer, more expressive, and easier to reason about.
Limitations of Thread
Writing code directly with the Thread class gives us control over how many threads to create, what they do, how they communicate with each other, and, to some extent, which thread gets executed when. Java's Thread is already a higher-level abstraction compared to, say, the pthread library in C and C++. However, it still takes significant effort to write complex multi-threaded programs in Java.
Consider the situation where we have a series of tasks that we wish to execute concurrently and we want to organize them such that:
- Task A must start first.
- When Task A is done, we take the result from Task A, and pass it to Tasks B, C, and D.
- We want Tasks B and C to complete before we pass their results to Task E.
We also want to handle exceptions gracefully — if one of the tasks encounters an exception, the other tasks not dependent on it should still be completed.
Implementing the above using Thread requires careful coordination. Firstly, there are no methods in Thread that return a value. We need the threads to communicate through shared variables. Secondly, there is no mechanism to specify the execution order and dependencies between threads. Finally, we have to consider the possibility of exceptions in each of our tasks.
Another drawback of using Thread is its overhead — the creation of Thread instances takes up some resources in Java. As much as possible, we should reuse our Thread instances to run multiple tasks. For instance, the same Thread instance could have run Tasks A, B, and E in the example above. Managing the Thread instances and deciding which Thread instance should run which task is a gigantic undertaking.
A Higher-Level Abstraction
What we need is a higher-level abstraction that allows programmers to focus on specifying the tasks and their dependencies, without worrying about the details. If we want to run the tasks in a single thread, we could do the following:
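As a minimal sketch, assuming each task is an ordinary method on int values (the arithmetic bodies and the class name SequentialTasks are hypothetical placeholders, not part of the original example), the single-threaded version might look like this:

```java
// Hypothetical tasks: the arithmetic stands in for real work.
class SequentialTasks {
  static int taskA(int x) { return x + 1; }
  static int taskB(int a) { return a * 2; }
  static int taskC(int a) { return a * 3; }
  static int taskD(int a) { return a * 4; }
  static int taskE(int b, int c) { return b + c; }

  // Runs every task one after another in the calling thread.
  static int foo(int x) {
    int a = taskA(x);
    int b = taskB(a);
    int c = taskC(a);
    int d = taskD(a); // D depends on A, but E does not need D's result
    int e = taskE(b, c);
    return e;
  }
}
```

Every statement blocks until the previous one finishes, so B, C, and D never overlap even though they are independent of one another.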
We could also use monads to chain up the computations. If one of the tasks might not produce a value, we can use the Maybe<T> monad:
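The same shape can be sketched with java.util.Optional standing in for Maybe<T>. This is an assumption made only so that the example is self-contained: Optional is not the course's Maybe<T>, it has no combine method (nested flatMap and map are used instead), and the task bodies and the class name OptionalTasks are hypothetical.

```java
import java.util.Optional;

// Stand-in sketch: Optional plays the role of Maybe<T> here.
class OptionalTasks {
  // Hypothetical task that produces no value for negative input.
  static Optional<Integer> taskA(int x) {
    return x >= 0 ? Optional.of(x + 1) : Optional.empty();
  }
  static Optional<Integer> taskB(int a) { return Optional.of(a * 2); }
  static Optional<Integer> taskC(int a) { return Optional.of(a * 3); }
  static Optional<Integer> taskD(int a) { return Optional.of(a * 4); }
  static int taskE(int b, int c) { return b + c; }

  static Optional<Integer> foo(int x) {
    Optional<Integer> a = taskA(x);
    Optional<Integer> b = a.flatMap(i -> taskB(i));
    Optional<Integer> c = a.flatMap(i -> taskC(i));
    Optional<Integer> d = a.flatMap(i -> taskD(i));
    // Optional has no combine, so nest flatMap and map to use both values.
    Optional<Integer> e = b.flatMap(i -> c.map(j -> taskE(i, j)));
    return e;
  }
}
```

If taskA produces no value, every later step is skipped and foo returns an empty Optional.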
taskA to taskD now return Maybe<T> instead of T.
If we want to perform the tasks lazily, then we can use the Lazy<T> monad:
taskA to taskD now return Lazy<T> instead of T.
It would be useful if there were a monad that allows us to perform the tasks concurrently. java.util.concurrent.CompletableFuture is such a monad. Here is an example of how to use it:
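A minimal sketch of such a method follows. It assumes that taskA to taskD each wrap a trivial computation in supplyAsync; the task bodies and the class name AsyncTasks are hypothetical placeholders.

```java
import java.util.concurrent.CompletableFuture;

class AsyncTasks {
  // Hypothetical tasks: each returns a CompletableFuture computed asynchronously.
  static CompletableFuture<Integer> taskA(int x) {
    return CompletableFuture.supplyAsync(() -> x + 1);
  }
  static CompletableFuture<Integer> taskB(int a) {
    return CompletableFuture.supplyAsync(() -> a * 2);
  }
  static CompletableFuture<Integer> taskC(int a) {
    return CompletableFuture.supplyAsync(() -> a * 3);
  }
  static CompletableFuture<Integer> taskD(int a) {
    return CompletableFuture.supplyAsync(() -> a * 4);
  }
  static int taskE(int b, int c) { return b + c; }

  static CompletableFuture<Integer> foo(int x) {
    CompletableFuture<Integer> a = taskA(x);
    // B, C, and D each start once A's result is available.
    CompletableFuture<Integer> b = a.thenComposeAsync(i -> taskB(i));
    CompletableFuture<Integer> c = a.thenComposeAsync(i -> taskC(i));
    CompletableFuture<Integer> d = a.thenComposeAsync(i -> taskD(i));
    // E runs only after both B and C have completed.
    CompletableFuture<Integer> e = b.thenCombineAsync(c, (i, j) -> taskE(i, j));
    return e; // note: nothing here waits for d
  }
}
```

Calling foo only sets up the dependencies and returns immediately; the computations proceed in the background.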
taskA to taskD now return CompletableFuture<T> instead of T.
We can then run foo(x).get() to wait for all the concurrent tasks to complete and return us the value. CompletableFuture<T> is a monad that encapsulates a value that is either there or not there yet. Such an abstraction is also known as a promise in other languages (e.g., Promise in JavaScript and std::promise in C++) — it encapsulates the promise to produce a value.
The CompletableFuture Monad
Let's now examine the CompletableFuture monad in more detail. A key property of CompletableFuture is whether the value it promises is ready — i.e., the tasks that it encapsulates have been completed or not.
Creating a CompletableFuture
There are several ways we can create a CompletableFuture<T> instance:
- Use the completedFuture method. This method is equivalent to creating a task that is already completed and returns a value.
- Use the runAsync method, which takes in a Runnable lambda expression. runAsync has the return type CompletableFuture<Void>. The returned CompletableFuture instance completes when the given lambda expression finishes.
- Use the supplyAsync method, which takes in a Supplier<T> lambda expression. supplyAsync has the return type CompletableFuture<T>. The returned CompletableFuture instance completes when the given lambda expression finishes.
We can also create a CompletableFuture that relies on other CompletableFuture instances, using the allOf or anyOf methods. Both of these methods take in a variable number of other CompletableFuture instances. A new CompletableFuture created with allOf is completed only when all the given CompletableFuture instances complete. On the other hand, a new CompletableFuture created with anyOf is completed when any one of the given CompletableFuture instances completes.
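The creation methods above can be sketched together as follows. The values, the AtomicInteger used to observe the Runnable's side effect, and the class name CreateFutures are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CreateFutures {
  static int demo() {
    // Already completed with the value 1.
    CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);

    // Runs a Runnable asynchronously; there is no result, hence Void.
    AtomicInteger counter = new AtomicInteger();
    CompletableFuture<Void> run = CompletableFuture.runAsync(() -> counter.set(10));

    // Runs a Supplier asynchronously and completes with its result.
    CompletableFuture<Integer> supply = CompletableFuture.supplyAsync(() -> 100);

    // Completes only when all three futures have completed.
    CompletableFuture<Void> all = CompletableFuture.allOf(done, run, supply);
    all.join();

    // Completes as soon as any one of the given futures has completed.
    CompletableFuture<Object> any = CompletableFuture.anyOf(done, supply);
    any.join();

    return done.join() + counter.get() + supply.join();
  }
}
```

Note that anyOf returns CompletableFuture<Object>, since the given futures may have different result types.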
Chaining CompletableFuture
The usefulness of CompletableFuture comes from the ability to chain them up and specify the dependencies of computations to be run. We have the following methods:
- thenApply, which is analogous to map
- thenCompose, which is analogous to flatMap
- thenCombine, which is analogous to combine
The methods above run the given lambda expression synchronously: typically in the thread that completes the current stage, or in the caller's thread if the stage has already completed. Each also has an asynchronous version (thenApplyAsync, thenComposeAsync, thenCombineAsync), which may cause the given lambda expression to run in a different thread (and thus allows more concurrency).
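A small sketch of the three chaining methods, with made-up values and a hypothetical class name ChainFutures:

```java
import java.util.concurrent.CompletableFuture;

class ChainFutures {
  static int demo() {
    CompletableFuture<Integer> start = CompletableFuture.supplyAsync(() -> 5);

    // thenApply (like map): the lambda returns a plain value.
    CompletableFuture<Integer> doubled = start.thenApply(x -> x * 2);

    // thenCompose (like flatMap): the lambda returns another CompletableFuture.
    CompletableFuture<Integer> plusOne =
        start.thenCompose(x -> CompletableFuture.supplyAsync(() -> x + 1));

    // thenCombineAsync (like combine): the lambda takes both results
    // and, being the Async variant, may run in a different thread.
    CompletableFuture<Integer> sum =
        doubled.thenCombineAsync(plusOne, (x, y) -> x + y);

    return sum.join(); // (5 * 2) + (5 + 1)
  }
}
```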
CompletableFuture also has several methods that take in Runnable.
- thenRun takes in a Runnable. It executes the Runnable after the current stage is completed.
- runAfterBoth takes in another CompletableFuture [1] and a Runnable. It executes the Runnable after the current stage completes and the input CompletableFuture is completed.
- runAfterEither takes in another CompletableFuture [1] and a Runnable. It executes the Runnable after the current stage completes or the input CompletableFuture is completed.
All of the methods that take in Runnable return CompletableFuture<Void>. Similarly, they also have the asynchronous version (thenRunAsync, runAfterBothAsync, runAfterEitherAsync).
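The Runnable-based methods can be sketched as below. The log list, its messages, and the class name RunnableStages are hypothetical; a thread-safe list is used because the Runnables may run in different threads.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class RunnableStages {
  static List<String> demo() {
    List<String> log = new CopyOnWriteArrayList<>();
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 1);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);

    // Runs after first completes.
    CompletableFuture<Void> afterFirst =
        first.thenRun(() -> log.add("first done"));
    // Runs after both first and second complete.
    CompletableFuture<Void> afterBoth =
        first.runAfterBoth(second, () -> log.add("both done"));
    // Runs after either first or second completes.
    CompletableFuture<Void> afterEither =
        first.runAfterEither(second, () -> log.add("either done"));

    // Wait for all three Runnables before reading the log.
    CompletableFuture.allOf(afterFirst, afterBoth, afterEither).join();
    return log;
  }
}
```

The order of the three log entries is not fixed, since it depends on how the threads are scheduled.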
Note that chaining of CompletableFuture instances is not limited to linear dependencies. In the example above, there is a branch after a completes: both b and c depend on a, but they are independent of each other. Since thenComposeAsync is called on a to create them, both b and c can run concurrently after a completes.
An alternative is to call thenCompose instead of thenComposeAsync. In that case, b and c would run sequentially in the same thread, one after the other, after a completes. The CompletableFuture implementation keeps a stack of tasks to run after it completes, and the tasks are run in the reverse of the order in which they were added to the stack. Hence, in the example above, taskC would run first, followed by taskB. This ordering is an implementation detail rather than a documented guarantee, and it applies only if a has not yet completed when the dependent tasks are added.
Besides branching, we can also merge two CompletableFuture instances. For instance, the following merges b and c after they both complete:
The other merge method is allOf.
The resulting CompletableFuture completes only when both b and c complete. Note that the return type of allOf is CompletableFuture<Void> since there is no single value to return.
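Both ways of merging can be sketched side by side. The arithmetic and the names BranchAndMerge, branch, mergeWithCombine, and mergeWithAllOf are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class BranchAndMerge {
  // Branch: b and c both depend on a, but not on each other.
  static CompletableFuture<Integer>[] branch(int x) {
    CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> x + 1);
    CompletableFuture<Integer> b = a.thenApplyAsync(i -> i * 2);
    CompletableFuture<Integer> c = a.thenApplyAsync(i -> i * 3);
    @SuppressWarnings("unchecked")
    CompletableFuture<Integer>[] result = new CompletableFuture[] {b, c};
    return result;
  }

  // Merge with thenCombine: the lambda receives both results.
  static int mergeWithCombine(int x) {
    CompletableFuture<Integer>[] bc = branch(x);
    return bc[0].thenCombine(bc[1], (i, j) -> i + j).join();
  }

  // Merge with allOf: the merged future carries no value (Void),
  // so the results are read from b and c once it has completed.
  static int mergeWithAllOf(int x) {
    CompletableFuture<Integer>[] bc = branch(x);
    CompletableFuture<Void> all = CompletableFuture.allOf(bc[0], bc[1]);
    return all.thenApply(ignored -> bc[0].join() + bc[1].join()).join();
  }
}
```

thenCombine is convenient for exactly two inputs whose values are needed, while allOf scales to any number of inputs at the cost of fetching each value separately.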
Getting the Result
After we have set up all the tasks to run asynchronously, we have to wait for them to complete. We can call get() to get the result. Since get() is a synchronous call, i.e., it blocks until the CompletableFuture completes, to maximize concurrency, we should only call get() as the final step in our code.
The method CompletableFuture::get throws a couple of checked exceptions: InterruptedException and ExecutionException, which we need to catch and handle. The former indicates that the thread was interrupted, while the latter refers to errors/exceptions during execution.
An alternative to get() is join(). join() behaves just like get() except that no checked exception is thrown.
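The difference between the two calls can be sketched as follows. The computation, the fallback value of -1, and the class name GetVersusJoin are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class GetVersusJoin {
  // get() forces us to deal with two checked exceptions.
  static int viaGet() {
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 21 * 2);
    try {
      return f.get(); // blocks until f completes
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      return -1; // hypothetical fallback value
    } catch (ExecutionException e) {
      return -1; // the computation itself failed
    }
  }

  // join() also blocks, but declares no checked exception.
  static int viaJoin() {
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 21 * 2);
    return f.join();
  }
}
```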
Example
Let's look at some examples. We reuse our method that computes the i-th prime number.
Given two numbers i and j, we want to find the difference between the i-th prime number and the j-th prime number. We can first do the following:
These calls start two concurrent tasks, each run by a worker thread, to compute the i-th and the j-th primes. The method call to supplyAsync returns immediately without waiting for findIthPrime to complete.
Next, we can specify that, once ith and jth are complete, their values should be taken and the difference computed. We can use the thenCombine method:
This statement creates another CompletableFuture that runs asynchronously and computes the difference between the two prime numbers. At this point, we can move on to run other tasks or, if we just want to wait until the result is ready, we call join() on it to get the difference between the two primes [2].
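Putting the steps together, a minimal end-to-end sketch might look like this. The trial-division isPrime, the 1-based indexing of findIthPrime (so the 1st prime is 2), and the class name PrimeDifference are assumptions; the course's own findIthPrime may differ.

```java
import java.util.concurrent.CompletableFuture;

class PrimeDifference {
  // Simple trial division; adequate for a small illustration.
  static boolean isPrime(int n) {
    if (n < 2) {
      return false;
    }
    for (int d = 2; (long) d * d <= n; d++) {
      if (n % d == 0) {
        return false;
      }
    }
    return true;
  }

  // Returns the i-th prime, counting from 1. Assumes i >= 1.
  static int findIthPrime(int i) {
    int count = 0;
    int candidate = 1;
    while (count < i) {
      candidate++;
      if (isPrime(candidate)) {
        count++;
      }
    }
    return candidate;
  }

  static int difference(int i, int j) {
    // Both searches are submitted immediately and may run concurrently.
    CompletableFuture<Integer> ith =
        CompletableFuture.supplyAsync(() -> findIthPrime(i));
    CompletableFuture<Integer> jth =
        CompletableFuture.supplyAsync(() -> findIthPrime(j));
    // Runs once both primes are available.
    CompletableFuture<Integer> diff = ith.thenCombine(jth, (x, y) -> x - y);
    return diff.join(); // block only at the very end
  }
}
```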
Handling Exceptions
One of the advantages of using CompletableFuture<T> instead of Thread to handle concurrency is its ability to handle exceptions. CompletableFuture<T> has three methods that deal with exceptions: exceptionally, whenComplete, and handle. We will focus on handle since it is the most general.
Suppose we have a computation inside a CompletableFuture<T> that might throw an exception. Since the computation is asynchronous and could run in a different thread, the question of which thread should catch and handle the exception arises. CompletableFuture<T> keeps things simpler by storing the exception and passing it down the chain of calls, until join() is called. join() might throw CompletionException and whoever calls join() will be responsible for handling this exception. The CompletionException contains information on the original exception.
For instance, the code below would throw a CompletionException with a NullPointerException contained within it.
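One way to reproduce this behaviour is sketched below. The null-producing supplier and the class name AsyncFailure are assumptions chosen to trigger a NullPointerException; the try-catch is added only so that the wrapped exception can be inspected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AsyncFailure {
  // Returns the exception wrapped inside the CompletionException,
  // or null if join() unexpectedly succeeds.
  static Throwable demo() {
    CompletableFuture<Integer> length = CompletableFuture
        .<String>supplyAsync(() -> null)
        .thenApply(s -> s.length()); // s is null: NullPointerException
    try {
      length.join();
      return null;
    } catch (CompletionException e) {
      return e.getCause(); // the original exception
    }
  }
}
```

The exception is not thrown where the lambda runs; it is stored and surfaces only when join() is called.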
Suppose we want to continue chaining our tasks despite exceptions. We can use the handle method to handle the exception. The handle method takes in a BiFunction (similar to cs2030s.fp.Combiner). The first parameter of the BiFunction is the value, the second is the exception. The function's return value becomes the result of the new CompletableFuture returned by handle.
If the exception is not null, an exception has been thrown and the value is null. Otherwise, the exception is null and the first parameter holds the value [3].
Here is a simple example where we use handle to substitute a default value when an exception occurs.
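A minimal sketch of this idea, assuming a default value of 0 and a hypothetical class name HandleFailure:

```java
import java.util.concurrent.CompletableFuture;

class HandleFailure {
  static int demo() {
    return CompletableFuture
        .<Integer>supplyAsync(() -> null)
        .thenApply(x -> x + 1) // unboxing null throws NullPointerException
        // ex is non-null here, so the default value 0 is used.
        .handle((value, ex) -> ex == null ? value : 0)
        .join(); // completes normally; no CompletionException
  }
}
```

Because handle turns the failure into an ordinary value, any stage chained after it proceeds as if nothing went wrong.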
[1] Actually, this is a CompletionStage, which is a supertype of CompletableFuture.
[2] There is repeated computation in primality checks between the two calls to findIthPrime here, which one could optimize. We don't do that here to keep the example simple.
[3] This is another instance where Java uses null to indicate a missing value. We can't use null as a legitimate value due to this flawed design.