Unit 37: Parallel Streams
Learning Objectives
Students should
- be aware that a program can be broken into subtasks that run in parallel and/or concurrently.
- be aware of the issues caused by running subtasks in parallel and concurrently.
- be aware that there are trade-offs between the number of subtasks and the processing overhead.
- be familiar with how to process a stream in parallel, correctly.
Parallel and Concurrent Programming
So far, the programs that we have written in CS2030S run sequentially. What this means is that at any one time, there is only one instruction of the program running on a processor.
What is Concurrency?
A single-core processor can only execute one instruction at a time -- this means that only one process (or, less precisely speaking, one application) can run at any one time. Yet, when we use the computer, it feels as if we are running multiple processes at the same time. The operating system, behind the scenes, switches between the different processes to give the user the illusion that they are running at the same time.
We can write a program so that it runs concurrently -- by dividing the computation into subtasks called threads. Such multi-threaded programs are useful in two ways: (i) they allow us, the programmers, to separate unrelated tasks into threads and write each thread separately; (ii) they improve the utilization of the processor. For instance, if I/O runs in one thread and UI rendering in another, then while the processor is waiting for I/O to complete, it can switch to the rendering thread, so that the slow I/O does not affect the responsiveness of the UI.
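As an illustration, here is a minimal sketch of a program with two threads: one simulating a slow I/O task and one doing other work, each written separately:

```java
// A thread that simulates a slow task, e.g., waiting for I/O.
Thread io = new Thread(() -> {
  try {
    Thread.sleep(1000); // pretend to wait for I/O
    System.out.println("I/O done");
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
});
// A thread that does something else, e.g., rendering the UI.
Thread ui = new Thread(() -> System.out.println("UI rendered"));
io.start();
ui.start();
```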
What is Parallelism?
While concurrency gives the illusion of subtasks running at the same time, parallel computing refers to the scenario where multiple subtasks are truly running at the same time -- either we have a processor that is capable of running multiple instructions at the same time, or we have multiple cores/processors and dispatch the instructions to the cores/processors so that they are executed at the same time.
All parallel programs are concurrent, but not all concurrent programs are parallel.
Modern computers have more than one core/processor¹. As such, the line between parallelism and concurrency is blurred.
Parallel Computing
Parallel computing is one of the major topics in computer science. One can teach a whole module (or a focus area) on this topic alone. The goal of this lecture is not to cover it in depth, but to expose students in CS2030S to the concept of parallel computing in relation to the Stream abstraction in Java.
Parallel Stream
We have seen that the Java Stream class is a powerful and useful class for processing data in a declarative style. But we have not fully unleashed the power of Stream. The neatest thing about Stream is that it allows parallel operations on the elements of the stream in one single line of code.
Let's consider the following program that prints out all the prime numbers between 2,030,000 and 2,040,000.
```java
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .forEach(System.out::println);
```
We can parallelize the code by adding the call parallel() into the stream.
```java
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .parallel()
    .forEach(System.out::println);
```
You may observe that the output has been reordered, although the same set of numbers is still being produced. This is because Stream has broken down the numbers into subsequences and runs filter and forEach for each subsequence in parallel. Since there is no coordination among the parallel tasks on the order of the printing, whichever parallel task completes first outputs its result to the screen first, causing the sequence of numbers to be reordered.
If you want to produce the output in the order of the input, use forEachOrdered instead of forEach. Note, however, that we will lose some of the benefits of parallelization by imposing this ordering.
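For instance, a sketch like the following (using the same isPrime helper) prints the primes in increasing order, even though the filtering still runs in parallel:

```java
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .parallel()
    .forEachOrdered(System.out::println); // output follows the encounter order
```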
Suppose now that we want to compute the number of primes between 2,030,000 and 2,040,000. We can run:
```java
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .parallel()
    .count();
```
The code above produces the same output regardless of whether it is parallelized or not.
Note that the task above is stateless and does not produce any side effects. Furthermore, each element is processed individually without depending on other elements. Such computation is sometimes known as embarrassingly parallel. The only communication needed among the parallel subtasks is to combine the result of count() from each subtask into the final count (which has been implemented in Stream for us).
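Conceptually, the combining works something like the following sketch, where we split the range into two halves by hand (the actual split points chosen by Stream are an implementation detail):

```java
// Hypothetical two-way split of the range; the real parallel stream
// may split into more (and differently sized) chunks.
long left = IntStream.range(2_030_000, 2_035_000).filter(x -> isPrime(x)).count();
long right = IntStream.range(2_035_000, 2_040_000).filter(x -> isPrime(x)).count();
long total = left + right; // combining the subtask results
```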
How to Parallelize a Stream
You have seen that adding parallel() to the pipeline of calls in a stream enables parallel processing of the stream. Note that parallel() is a lazy operation -- it merely marks the stream to be processed in parallel. As such, you can insert the call to parallel() anywhere in the pipeline after the data source and before the terminal operation.
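In other words, the two pipelines sketched below should behave identically:

```java
// parallel() right after the source ...
IntStream.range(2_030_000, 2_040_000)
    .parallel()
    .filter(x -> isPrime(x))
    .count();

// ... behaves the same as parallel() just before the terminal operation.
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .parallel()
    .count();
```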
sequential()
There is a method sequential() which marks the stream to be processed sequentially. If you call both parallel() and sequential() on a stream, the last call "wins". The example below processes the stream sequentially:
```java
// s is an existing Stream; sequential() is called last, so it wins
s.parallel().filter(x -> isPrime(x)).sequential().forEach(System.out::println);
```
Another way to create a parallel stream is to call the method parallelStream() instead of stream() on a Collection. Doing so creates a stream that will be processed in parallel from the collection.
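For example, the following illustrative snippet creates a parallel stream directly from a List:

```java
List.of("Luke", "Leia", "Han")
    .parallelStream()
    .map(String::toUpperCase)
    .forEach(System.out::println); // order of printing is not guaranteed
```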
What Can be Parallelized?
To ensure that the output of the parallel execution is correct, the stream operations must not interfere with the stream data and, most of the time, must be stateless. Side effects should be kept to a minimum.
Interference
Interference means that one of the stream operations modifies the source of the stream during the execution of the terminal operation. For instance:
```java
List<String> list = new ArrayList<>(List.of("Luke", "Leia", "Han"));
list.stream()
    .peek(name -> {
      if (name.equals("Han")) {
        list.add("Chewie"); // modifies the source of the stream
      }
    })
    .forEach(name -> {});
```
would cause a ConcurrentModificationException to be thrown. Note that this non-interference rule applies even if we are using stream() instead of parallelStream().
Stateful vs. Stateless
A stateful lambda is one where the result depends on any state that might change during the execution of the stream.
For instance, the generate and map operations below are stateful, since they depend on the state of the standard input. Parallelizing this may lead to incorrect output. To ensure that the output is correct, additional work needs to be done to ensure that state updates are visible to all parallel subtasks.
```java
Stream.generate(scanner::nextInt)     // scanner reads from standard input
    .map(i -> i + scanner.nextInt())
    .forEach(System.out::println);
```
Side Effects
Side effects can lead to incorrect results in parallel execution. Consider the following code:
```java
List<Integer> result = new ArrayList<>();

IntStream.range(2_030_000, 2_040_000)
    .parallel()
    .filter(x -> isPrime(x))
    .forEach(x -> result.add(x));
```
The forEach lambda generates a side effect -- it modifies result. ArrayList is what we call a non-thread-safe data structure. If two threads manipulate it at the same time, we may get an incorrect result. There are three ways to resolve this. First, we can use the .collect method.
```java
List<Integer> result = IntStream.range(2_030_000, 2_040_000)
    .parallel()
    .filter(x -> isPrime(x))
    .boxed()   // convert the IntStream into a Stream<Integer>
    .collect(Collectors.toList());
```
Second, we can use a thread-safe data structure. Java provides several in the java.util.concurrent package, including CopyOnWriteArrayList.
```java
List<Integer> result = new CopyOnWriteArrayList<>();

IntStream.range(2_030_000, 2_040_000)
    .parallel()
    .filter(x -> isPrime(x))
    .forEach(x -> result.add(x));
```
Lastly, in Java 17, there is a .toList method that simply returns a list in the same order as the stream.
```java
List<Integer> result = IntStream.range(2_030_000, 2_040_000)
    .parallel()
    .filter(x -> isPrime(x))
    .boxed()
    .toList();
```
Associativity
The reduce operation is inherently parallelizable, as we can easily reduce each sub-stream and then use the combiner to combine the results. Consider this example:
```java
Stream.of(1, 2, 3, 4).reduce(1, (x, y) -> x * y, (x, y) -> x * y);
```
To allow us to run reduce in parallel, however, there are several rules that the identity, the accumulator, and the combiner must follow:
- combiner.apply(identity, i) must be equal to i.
- The combiner and the accumulator must be associative -- the order of applying them must not matter².
- The combiner and the accumulator must be compatible -- combiner.apply(u, accumulator.apply(identity, t)) must be equal to accumulator.apply(u, t).
The multiplication example above meets the three rules:
- i * 1 equals i
- (x * y) * z equals x * (y * z)
- u * (1 * t) equals u * t
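To see why these rules matter, consider a sketch using subtraction, which is not associative (and for which the identity rule fails as well); reducing in parallel can then give a different result from reducing sequentially:

```java
// Sequential: ((((0 - 1) - 2) - 3) - 4) = -10
int sequential = Stream.of(1, 2, 3, 4)
    .reduce(0, (x, y) -> x - y, (x, y) -> x - y);

// Parallel: each chunk is reduced separately and the partial results are
// combined; since subtraction is not associative and 0 is not an identity
// under the combiner rule, the result generally differs from -10.
int parallel = Stream.of(1, 2, 3, 4)
    .parallel()
    .reduce(0, (x, y) -> x - y, (x, y) -> x - y);
```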
Performance of Parallel Stream
Let's go back to:
```java
IntStream.range(2_030_000, 2_040_000)
    .filter(x -> isPrime(x))
    .parallel()
    .count();
```
How much time can we save by parallelizing the code above?
Let's use the Instant and Duration classes from Java to help us:
```java
Instant start = Instant.now();
long howMany = IntStream.range(2_000_000, 3_000_000)
    .filter(x -> isPrime(x))
    .parallel()
    .count();
Instant stop = Instant.now();
System.out.println(howMany + " " + Duration.between(start, stop).toMillis() + " ms");
```
The code above measures roughly the time it takes to count the number of primes between 2 million and 3 million. On my iMac, it takes 450-550 ms. If I remove parallel(), it takes slightly more than 1 second. So, with parallel(), we cut the running time by about half.
Can we parallelize some more? Recall how we implemented isPrime³:
```java
boolean isPrime(int n) {
  return IntStream.range(2, (int) Math.sqrt(n) + 1)
      .noneMatch(x -> n % x == 0);
}
```
Let's parallelize this to make it even faster!
```java
boolean isPrime(int n) {
  return IntStream.range(2, (int) Math.sqrt(n) + 1)
      .parallel()
      .noneMatch(x -> n % x == 0);
}
```
If you run the code above, however, you will find that the code is not as fast as we expect. On my iMac, it takes about 18s, about 18 times slower!
Parallelizing a stream does not always improve the performance. Creating a thread to run a task incurs some overhead, and the overhead of creating too many threads might outweigh the benefits of parallelization.
Ordered vs. Unordered Source
Whether the stream elements are ordered or unordered also plays a role in the performance of parallel stream operations. A stream may define an encounter order. Streams created from iterate, from ordered collections (e.g., List or arrays), or from of are ordered. Streams created from generate or from unordered collections (e.g., Set) are unordered.
Some stream operations respect the encounter order. For instance, both distinct and sorted preserve the original order of elements (if ordering is preserved, we say that an operation is stable).
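For example, in the illustrative snippet below, distinct keeps the first occurrence of each duplicated element in the encounter order, even when the stream is parallel:

```java
// distinct is stable: among duplicates, the first in encounter order survives
Stream.of(3, 1, 3, 2, 1)
    .parallel()
    .distinct()
    .forEachOrdered(System.out::println); // prints 3, 1, 2
```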
The parallel versions of findFirst, limit, and skip can be expensive on an ordered stream, since they need to coordinate among the substreams to maintain the order.
If we have an ordered stream and respecting the original order is not important, we can call unordered() as part of the method chain to make the parallel operations more efficient.
The following, for example, takes about 700 ms on my iMac:
```java
Stream.iterate(0, i -> i + 7)
    .parallel()
    .limit(10_000_000)
    .filter(i -> i % 64 == 0)
    .forEachOrdered(i -> {});
```
But, with unordered() inserted, it takes about 350 ms -- a 2x speedup!
```java
Stream.iterate(0, i -> i + 7)
    .parallel()
    .unordered()
    .limit(10_000_000)
    .filter(i -> i % 64 == 0)
    .forEachOrdered(i -> {});
```
1. The iPhone 12 comes with an A14 Bionic chip with six cores. The fastest supercomputer in the world as of this writing, Frontier, has 9,472 processors, each with 64 cores (a total of 606,208 CPU cores), as well as 37,888 GPUs, each with 220 cores (a total of 8,335,360 GPU cores). ↩
2. It is actually NOT necessary for the accumulator to be associative, because a parallel reduce first splits the stream into blocks, and each block is processed in sequential order, so the accumulator is invoked in a specific order. What this requirement says is that if the accumulator is associative (along with the other conditions), then a parallel reduce is guaranteed to produce the same result as a sequential reduce. So it is a sufficient, but not a necessary, condition. ↩
3. This is a more efficient version of the code you have seen, since it stops testing after the square root of \(n\). ↩