Unit 39: Asynchronous Programming
Learning Objectives
Students should
- understand the limitation of thread.
- understand and be able to use
CompletableFuture
.
Limitations of Thread
Writing code directly with the Thread
class gives us control on how many threads to create, what they do, how they communicate with each other, and some level of control on which thread gets executed when. While Java's Thread
is already a higher-level abstraction compared to, say, the pthread
library in C and C++, it still takes a fair amount of effort to write complex multi-threaded programs in Java.
Consider the situation where we have a series of tasks that we wish to execute concurrently and we want to organize them such that:
- Task A must start first.
- When Task A is done, we take the result from Task A, and pass it to Tasks B, C, and D.
- We want Task B and C to complete before we pass their results to Task E.
We also want to handle exceptions gracefully -- if one of the tasks encounters an exception, the other tasks not dependent on it should still be completed.
Implementing the above using Thread
requires careful coordination. Firstly, there are no methods in Thread
that return a value. We need the threads to communicate through shared variables. Secondly, there is no mechanism to specify the execution order and dependencies among them -- which thread to start after another thread completes. Finally, we have to consider the possibility of exceptions in each of our tasks.
Another drawback of using Thread
is its overhead -- the creation of Thread
instances takes up some resources in Java. As much as possible, we should reuse our Thread
instances to run multiple tasks. For instance, the same Thread
instance could have run Tasks A, B, and E in the example above. Managing the Thread
instances itself and deciding which Thread
instance should run which Thread
is a gigantic undertaking.
A Higher-Level Abstraction
What we need is a higher-level abstraction that allows programmers to focus on specifying the tasks and their dependencies, without worrying about the details. Suppose we want to run the tasks in a single thread, we could do the following:
1 2 3 4 5 6 7 8 |
|
We could also use monads to chain up the computations. Let's say that one of the tasks might not produce a value, then we can use the Maybe<T>
monad:
1 2 3 4 5 6 7 8 |
|
Assume there is such a Maybe::combine
method to combine the result of two monads. This can also be done using a nested map
. Now, if we want to perform the tasks lazily, then we can use the Lazy<T>
monad:
1 2 3 4 5 6 7 8 |
|
In fact, a monad is an abstraction of a computation! It can be used to combine program fragments (e.g., a sequence of statements, which can also be written as functions) and we return a data with context (side information) such that we can perform additional computation (i.e., the monad!). This kind of functions are called monadic functions.
So, wouldn't it be nice if there is a monad that allows us to perform the tasks concurrently? java.util.concurrent.CompletableFuture
does just that! Here is an example of how to use it:
1 2 3 4 5 6 7 8 |
|
We can then run foo(x).get()
to wait for all the concurrent tasks to complete and return us the value. CompletableFuture<T>
is a monad that encapsulates a value that is either there or not there yet. Such an abstraction is also known as a promise in other languages (e.g., Promise
in JavaScript and std::promise
in C++)
-- it encapsulates the promise to produce a value.
The CompletableFuture
Monad
Let's now examine the CompletableFuture
monad in more detail. A key property of CompletableFuture
is whether the value it promises is ready -- i.e., the tasks that it encapsulates has completed or not.
Creating a CompletableFuture
There are several ways we can create a CompletableFuture<T>
instance:
- Use the
completedFuture
method. This method is equivalent to creating a task that is already completed and return us a value. - Use the
runAsync
method that takes in aRunnable
lambda expression.runAsync
has the return type ofCompletableFuture<Void>
. The returnedCompletableFuture
instance completes when the given lambda expression finishes. - Use the
supplyAsync
method that takes in aSupplier<T>
lambda expression.supplyAsync
has the return type ofCompletableFuture<T>
. The returnedCompletableFuture
instance completes when the given lambda expression finishes.
We can also create a CompletableFuture
that relies on other CompletableFuture
instances. We can use allOf
or anyOf
methods for this. Both of these methods take in a variable number of other CompletableFuture
instances. A new CompletableFuture
created with allOf
is completed only when all the given CompletableFuture
completes. On the other hand, a new CompletableFuture
created with anyOf
is completed when any one of the given CompletableFuture
completes.
Chaining CompletableFuture
The usefulness of CompletableFuture
comes from the ability to chain them up and specify a sequence of computations to be run. We have the following methods:
thenApply
, which is analogous tomap
thenCompose
, which is analogous toflatMap
thenCombine
, which is analogous tocombine
The methods above run the given lambda expression in the same thread as the caller. There is also an asynchronous version (thenApplyAsync
, thenComposeAsync
, thenCombineAsync
), which may cause the given lambda expression to run in a different thread (thus more concurrency).
CompletableFuture
also has several methods that takes in Runnable
. These methods have no analogy in our lab but it is similar to runAsync
above.
thenRun
takes in aRunnable
. It executes theRunnable
after the current stage is completed.runAfterBoth
takes in anotherCompletableFuture
1 and aRunnable
. It executes theRunnable
after the current stage completes and the inputCompletableFuture
are completed.runAfterEither
takes in anotherCompletableFuture
1 and aRunnable
. It executes theRunnable
after the current stage completes or the inputCompletableFuture
are completed.
All of the methods that takes in Runnable
return CompletableFuture<Void>
. Similarly, they also have the asynchronous version (thenRunAsync
, runAfterBothAsync
, runAfterEitherAsync
).
Getting The Result
After we have set up all the tasks to run asynchronously, we have to wait for them to complete. We can call get()
to get the result. Since get()
is a synchronous call, i.e., it blocks until the CompletableFuture
completes, to maximize concurrency, we should only call get()
as the final step in our code.
The method CompletableFuture::get
throws a couple of checked exceptions: InterruptedException
and ExecutionException
, which we need to catch and handle. The former refers to the exception that the thread has been interrupted, while the latter refers to errors/exceptions during execution.
An alternative to get()
is join()
. join()
behaves just like get()
except that no checked exception is thrown.
Example
Let's look at some examples. Let's reuse our method that computes the i-th prime number.
1 2 3 4 5 6 7 8 |
|
Given two numbers i and j, we want to find the difference between the i-th prime number and the j-th prime number. We can first do the following:
1 2 |
|
These calls would launch two concurrent threads to compute the i-th and the j-th primes. The method calls supplyAsync
returns immediately without waiting for findIthPrime
to complete.
Next, we can say, that, when ith
and jth
complete, take the value computed by them, and take the difference. We can use the thenCombine
method:
1 |
|
This statement creates another CompletableFuture
which runs asynchronously that will compute the difference between the two prime numbers. At this point, we can move on to run other tasks, or if we just want to wait until the result is ready, we call
1 |
|
to get the difference between the two primes2.
Handling Exceptions
One of the advantages of using CompletableFuture<T>
instead of Thread
to handle concurrency is its ability to handle exceptions. CompletableFuture<T>
has three methods that deal with exceptions: exceptionally
, whenComplete
, and handle
. We will focus on handle
since it is the most general.
Suppose we have a computation inside a CompletableFuture<T>
that might throw an exception. Since the computation is asynchronous and could run in a different thread, the question of which thread should catch and handle the exception arises. CompletableFuture<T>
keeps things simpler by storing the exception and passing it down the chain of calls, until join()
is called. join()
might throw CompletionException
and whoever calls join()
will be responsible for handling this exception. The CompletionException
contains information on the original exception.
For instance, the code below would throw a CompletionException
with a NullPointerException
contains within it.
1 2 3 |
|
Suppose we want to continue chaining our tasks despite exceptions. We can use the handle
method, to handle the exception. The handle
method takes in a BiFunction
(imagine if we add a cs2030s.fp.BiTransformer<T, U, R>
). The first parameter to the BiFunction
is the value, the second is the exception, the third is the return value.
Only one of the first two parameters is meaningful. If the exception is NOT null
, this means that an exception has been thrown and we can use the exception (i.e., second parameter). Otherwise, the exception is null
and we can use the value (i.e., first parameter).
Here is a simple example where we use handle
to replace a default value.
1 2 3 |
|
-
Actually, this is a
CompletionStage
which is a supertype ofCompletableFuture
. ↩↩ -
There is repeated computation in primality checks between the two calls to
findIthPrime
here, which one could optimize. We don't do that here to keep the example simple. ↩ -
This is another instance where Java uses
null
to indicates a missing value. ↩