Java 8 Streams: Definitive Guide to Parallel Streaming with parallel()

Parallel Streams in Java 8

Today, the Java Streams API is in extensive use, making Java more functional than ever. With it, models like MapReduce have become part of everyday code and made stream handling easier.

Although these models made using streams effortless, they've also introduced efficiency concerns. The built-in parallel() operation is simple enough to deploy, and allows you to leverage parallelism.

For example, say you need to sum up all the numbers between 1 and 1,000,000. The code snippet below calculates the result by processing every number in the range in a serial order:

// The total, 500,000,500,000, overflows an int, so we accumulate in a long
long sum = Stream.iterate(1L, a -> a + 1)
    .limit(1_000_000)
    .reduce(0L, (a, b) -> a + b);

We could reduce its runtime by simply adding parallel(). The calculation now takes place over several parallel threads:

// As before, a long is needed because the total overflows an int
long sum = Stream.iterate(1L, a -> a + 1)
    .limit(1_000_000)
    .parallel()
    .reduce(0L, (a, b) -> a + b);

That's it in terms of using the method: you just call parallel() on a stream before its other operations. Looks can be deceiving, though. For one, poor code design choices can make a parallel stream run slower than a sequential one.

In this example, calling Stream.iterate() is expensive. It produces boxed values, so a lot of boxing and unboxing occurs. Each element also depends on the previous one, which makes the source hard to split across threads. Hence, a stream with very many elements will take a performance hit.
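One way to sidestep the boxing cost, sketched here on the assumption that the range is known up front, is a primitive LongStream. The class and method names below are illustrative and not part of the original code:

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

class RangeSumSketch {

    // Boxed variant: every element is a Long object, and iterate()
    // generates elements one after another, so it splits poorly.
    static long sumBoxed(long n) {
        return Stream.iterate(1L, a -> a + 1)
            .limit(n)
            .parallel()
            .reduce(0L, Long::sum);
    }

    // Primitive variant: a sized range of long values with no boxing,
    // which is cheap to split into sub-ranges.
    static long sumPrimitive(long n) {
        return LongStream.rangeClosed(1L, n)
            .parallel()
            .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumBoxed(1_000_000L));     // 500000500000
        System.out.println(sumPrimitive(1_000_000L)); // 500000500000
    }
}
```

Both methods return the same total. Whether the primitive variant is actually faster on your machine is something to measure rather than assume.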

Also, lambdas that produce side effects make parallel streams hazardous to thread safety. We have mused over these considerations at length in other guides dedicated to the Functional API in Java, based on the operations and their specific implementations.

  • In Guide to reduce() - We found that a simple parallel() call could reduce the runtime of calculations. That is because reduce() applies the divide-and-conquer pattern. And, parallelism is an excellent aid to it.

  • In Guide to findFirst() and findAny() - Short-circuiting a stream using findAny() proved efficient when it was running in parallel.

Still, none of these guides explored what calling parallel() actually entails. For instance, on using parallel() we never asked ourselves:

  • With parallelism, did we also create concurrency?
  • Does the multi-threading, which parallel() creates, work for blocking routines? Could these calls make IO operations faster, for example?

This guide aims to answer all these questions.

Parallelization in Legacy Java

Say you want to find the sum of all the numbers between 1 and n, where n = 1000. Using the classic for loop, you would do something like this:

public long sumUsingClassicForLoop(long n) {
    // A local accumulator, so repeated calls do not build on an earlier total
    long result = 0;
    for (long i = 1L; i <= n; i++) {
        result += i;
    }
    return result;
}

And on running this code snippet, you would get the output:

500500

Granted, the result is as expected. Yet, what if n is much bigger, say n = 1,000,000? Would you not want your code to run efficiently and take full advantage of your processor's many cores?

A natural approach would be to use many threads to calculate the result. Yet before Java 8, the only option was to manually create threads or use the Executor Framework with a thread pool:

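import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public long sumUsingThreading(long n) {
    // AtomicLong makes each addition atomic; "result += toAdd" on a plain
    // long shared between threads is a data race and can lose updates
    AtomicLong result = new AtomicLong();
    int procs = Runtime.getRuntime().availableProcessors();
    ExecutorService es = Executors.newFixedThreadPool(procs);

    try {
        for (long i = 1L; i <= n; i++) {
            // Notice how we do not use variable i in the lambda directly
            // That is because i would need to be effectively final to be used
            // inside a lambda
            long toAdd = i;
            es.execute(() -> result.addAndGet(toAdd));
        }
    } finally {
        es.shutdown();
    }

    try {
        // shutdown() does not block, so wait for the queued tasks to finish
        es.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return result.get();
}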

Note: By using ExecutorService, we have actually simplified the use of threads. At least, the client code does not create and start its own Thread objects. Otherwise, we would have to manage the threads ourselves: start them, join them, and pool them for reuse. Had we gone that route, sumUsingThreading() would have become even more complex. Even so, the shared total must be updated in a thread-safe way, and the method must wait for the submitted tasks to finish before it reads the total.

With this, you would get a result like the one in the for loop approach. But for such a simple calculation, there are too many lines of code, which hurts readability. There was an obvious need to make this process simpler and less verbose.

In contrast, by using parallel(), adding any form of threading in the client code is not necessary. The Streams API does all that for you in the background. You would get the same result as the previous example by writing this:

public long sumUsingParallel(long n) {
    return Stream.iterate(1L, a -> a + 1)
        .limit(n)
        .parallel()
        .reduce(0L, Long::sum);        
}

Notice how the parallel() approach has cut the code from more than 20 lines in the previous example to just 6. The improvement in readability is drastic, too.

Definitions

Method signature:

S parallel()

This is the method you are most likely to use when you have created a stream yourself, that is, when you have instantiated the stream directly:

Stream<Integer> myStream = Stream.of(1, 2, 3);
myStream.parallel().close();

In other use cases, you could have a sequential collection which you want to parallelize. And for that, the Collections API provides a way for creating parallel streams too.

It offers the parallelStream() method:

Stream<E> parallelStream()

Which you would use as:

Collection<Integer> numbers = Arrays.asList(1, 2, 3);
numbers.parallelStream().close();

Though, keep in mind that for the standard collections parallelStream() is effectively a shortcut for:

numbers.stream().parallel().close();

The BaseStream interface defines a parallel() method as one which:

"Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel."

Official BaseStream documentation

Also, the Streams API provides a way of interrogating whether a stream is running in parallel. The isParallel() method returns a boolean value, which tells you whether a stream would execute in parallel if a terminal operation is executed.
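As a quick check of the behaviour described above, the following sketch queries isParallel() on streams created in different ways. The class and method names are only illustrative:

```java
import java.util.Arrays;
import java.util.List;

class IsParallelSketch {

    // Returns the isParallel() flag for four ways of obtaining a stream.
    static boolean[] flags() {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        return new boolean[] {
            numbers.stream().isParallel(),                     // false
            numbers.stream().parallel().isParallel(),          // true
            numbers.parallelStream().isParallel(),             // true for this list
            numbers.parallelStream().sequential().isParallel() // false
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(flags()));
        // prints [false, true, true, false]
    }
}
```

Note that isParallel() is not a terminal operation. It only reports the mode the pipeline is currently set to, and the last call to parallel() or sequential() wins.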

Putting isParallel() to Action - Check if Stream is Parallelized

Some terminal operations, such as forEach(), do not care about the encounter order of a stream. In fact, forEach() is explicitly nondeterministic: it does not promise to respect the encounter order, because doing so would sacrifice the benefit of parallelism.

But what if you have a use case where a serial encounter order is crucial, where parallel operations would mess up the desired output, for example?

Say you want to print a stream of logs to console, for instance. And your code features the method printToConsole():

public void printToConsole(Stream<String> logs) {
    logs.forEach(System.out::println);
}

Here, forEach() may print the logs in the wrong order if the stream is parallel. And, since you may not be the author of the logs stream, you may not know whether it is parallel or not.

The isParallel() operation is very useful in this scenario. It will inform you whether the stream would execute in parallel: it returns true for a parallelized stream and false for a sequential one.

This answer will then help you to tweak your code. Hence, as per our case, we would ensure that we print the logs in the correct order by changing printToConsole() to:

public void printToConsole(Stream<String> logs) {
    if(logs.isParallel()) {
        logs.forEachOrdered(System.out::println);
    } else {
        logs.forEach(System.out::println);
    }    
}

When you have a terminal operation that requires a serial order, use isParallel(). It will help you to determine which variant of forEach() to use.

In short, choose:

  • forEachOrdered(): to encounter elements from any stream in a serial order.
  • forEach(): to encounter elements from:
    • A serial stream when you care about the order
    • A parallel stream when you are not concerned about the order
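To see the difference without reading console output, the sketch below records the order in which each operation visits the elements of a parallel stream. The names are illustrative, and the synchronized list exists only so that the demonstration is thread-safe:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ForEachOrderSketch {

    // forEachOrdered() visits elements in encounter order, even in parallel.
    static List<String> visitOrdered(List<String> logs) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        logs.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    // forEach() visits every element, but in no guaranteed order.
    static List<String> visitUnordered(List<String> logs) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        logs.parallelStream().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList("log-1", "log-2", "log-3", "log-4", "log-5");
        System.out.println(visitOrdered(logs));   // always the original order
        System.out.println(visitUnordered(logs)); // same elements, order may vary
    }
}
```

The price of forEachOrdered() is that elements are handed to the action one at a time, so it gives up much of the speed-up for this final step.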

How do Parallel Streams Work - Deeper Dive

"Parallelism is about doing lots of things at once"

-- Rob Pike

In Java, parallelism consists of several phases:

  • A given routine breaks down a task into its constituent tasks
  • Each task attaches itself to a distinct thread
  • Another routine computes the results of each sub task
  • Then, another routine collects the results from each task into one, aggregated result
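The phases above can be sketched by hand with the fork-join classes from java.util.concurrent. This is a minimal illustration and not what the Streams API literally runs: the class name and the THRESHOLD value are assumptions chosen for the example, and ForkJoinPool.commonPool() requires Java 8.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSumTask extends RecursiveTask<Long> {

    // Hypothetical cut-off below which a range is summed directly
    private static final long THRESHOLD = 1_000;

    private final long from;
    private final long to;

    RangeSumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Small enough: compute this sub-task's result in the current thread
        if (to - from < THRESHOLD) {
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        // Break the task down into two constituent tasks
        long mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                         // offer the left half to the pool
        long rightResult = right.compute();  // work on the right half here
        return left.join() + rightResult;    // collect into one result
    }

    static long sum(long n) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(1, n));
    }

    public static void main(String[] args) {
        System.out.println(sum(1_000_000L)); // 500000500000
    }
}
```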

Yet, it was possible to execute this sequence of activities even in legacy Java versions.

Starting from Java 5, for example, new implementations of ExecutorService simplified parallelism.

Then in Java 7, the introduction of ForkJoinPool made parallelism even simpler. The class is a concrete implementation of ExecutorService. It extended the interface by adding work stealing, which sets parallelism up for increased efficiency. With ForkJoinPool, idle worker threads relieve busy ones of some of their load by taking tasks from their queues.

Starting with Java 8, streams have made parallelism idiomatic as well.

Under the hood, a parallel stream hands its work to a ForkJoinPool, by default the common pool. And it does so in a functional manner: the library's internals decide how to parallelize, while client code only declares what it wishes to parallelize.
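To get a feel for which threads do the work, this sketch records the names of the threads that process a parallel stream and prints the size of the common pool. The class name is illustrative, the concurrent set is a side effect used purely for observation, and the thread names you see depend on your JVM and machine:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class CommonPoolSketch {

    // Collects the names of all threads that processed at least one element.
    static Set<String> threadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        LongStream.rangeClosed(1, 10_000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println("Threads used: " + threadNames());
    }
}
```

The calling thread usually takes part in the work alongside the pool's workers, which is why a name such as main can show up in the set.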

As an illustration, let us compare how two code snippets work.

The first one uses the sumUsingSequential() method. It calculates the sum of all numbers between 1 and 10 in a serial order: one number plus the next one, then the result plus the next number, which is the classic reduce() pattern of folding.

If you'd like to read more about reducing in Java - read our Java 8 Streams: Guide to reduce()!

We have kept the range small so that we can examine every step of that method's routine.

Then the second method sumUsingParallel() calculates the sum of those numbers also. But, it does so using parallel-running threads:

public long sumUsingSequential() {
    return LongStream.rangeClosed(1L, 10L)
        .peek(this::printThreadName)
        .reduce(0L, this::printSum);
}

public void printThreadName(long l) {
    String tName = currentThread().getName();
    System.out.println(tName + " offers:" + l);
}

public long printSum(long i, long j) {
    long sum = i + j;
    String tName = currentThread().getName();
    System.out.printf(
        "%s has: %d; plus: %d; result: %d\n", 
        tName, i, j, sum
    );
    
    return sum;
}

These two methods both call the methods printThreadName() and printSum(). In sumUsingSequential() we can thus represent the steps using this activity diagram:

Note how the control flow uses only one thread: the main thread. When you run the snippet you get these results:

main offers:1
main has: 0; plus: 1; result: 1
main offers:2
main has: 1; plus: 2; result: 3
main offers:3
main has: 3; plus: 3; result: 6
main offers:4
main has: 6; plus: 4; result: 10
main offers:5
main has: 10; plus: 5; result: 15
main offers:6
main has: 15; plus: 6; result: 21
main offers:7
main has: 21; plus: 7; result: 28
main offers:8
main has: 28; plus: 8; result: 36
main offers:9
main has: 36; plus: 9; result: 45
main offers:10
main has: 45; plus: 10; result: 55
Sum parallel: 55

The calculation flow is what you would expect from an imperative pattern, such as a for loop implementation. Yet, it gets more interesting when we execute sumUsingParallel():

public long sumUsingParallel() {
    return LongStream.rangeClosed(1L, 10L)
        .parallel()
        .peek(this::printThreadName)
        .reduce(0L, this::printSum);
}

The simple inclusion of parallel() has allowed the stream to use the available CPU cores. In this case, we ran the code on a four-core computer. And as you can tell from this activity diagram, the fork-join approach is in use:

The parallel() call triggers the fork-join mechanism on the stream of numbers. It splits the stream to run in four threads. Once each thread has a stream, the mechanism calls reduce() on each to run in concurrence.

As:

stream –> (stream1, stream2)

Where:

stream1 –> (stream1.1, stream1.2)
stream2 –> (stream2.1, stream2.2)

Then, the results from every reduce() aggregate into the intermediate results r5 and r6:

r5 = r1 + r2
r6 = r3 + r4

Where r1, r2, r3, and r4 are the results from each serial reduce() operation. The final result, r7, is the sum of the intermediate results r5 and r6. This summing up of intermediate results occurs in the join phase of the fork-join.
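The arithmetic can be worked through by hand. The sketch below assumes, purely for illustration, that the range 1 to 10 is split into the four chunks 1-2, 3-5, 6-7 and 8-10. The real boundaries are chosen by the runtime and may differ:

```java
import java.util.stream.LongStream;

class ForkJoinArithmeticSketch {

    // A serial reduce() over one chunk of the range
    static long chunk(long from, long to) {
        return LongStream.rangeClosed(from, to).reduce(0L, Long::sum);
    }

    // Returns {r5, r6, r7} for the assumed chunk boundaries
    static long[] combine() {
        long r1 = chunk(1, 2);   // 3
        long r2 = chunk(3, 5);   // 12
        long r3 = chunk(6, 7);   // 13
        long r4 = chunk(8, 10);  // 27

        long r5 = r1 + r2;       // 15
        long r6 = r3 + r4;       // 40
        long r7 = r5 + r6;       // 55
        return new long[] {r5, r6, r7};
    }

    public static void main(String[] args) {
        long[] r = combine();
        System.out.printf("r5=%d, r6=%d, r7=%d%n", r[0], r[1], r[2]);
        // prints r5=15, r6=40, r7=55
    }
}
```

Because addition is associative, any other choice of chunk boundaries would lead to the same r7.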

These operations are also evident on the method's console output:

worker-1 offers:3
main offers:7
worker-1 has: 0; plus: 3; result: 3
worker-2 offers:9
worker-1 offers:5
worker-1 has: 0; plus: 5; result: 5
worker-3 offers:2
worker-1 offers:4
worker-2 has: 0; plus: 9; result: 9
worker-2 offers:10
worker-2 has: 0; plus: 10; result: 10
main has: 0; plus: 7; result: 7
worker-2 has: 9; plus: 10; result: 19
worker-1 has: 0; plus: 4; result: 4
worker-3 has: 0; plus: 2; result: 2
worker-1 has: 4; plus: 5; result: 9
worker-2 offers:8
worker-2 has: 0; plus: 8; result: 8
main offers:6
worker-2 has: 8; plus: 19; result: 27
worker-1 has: 3; plus: 9; result: 12
worker-3 offers:1
worker-3 has: 0; plus: 1; result: 1
main has: 0; plus: 6; result: 6
main has: 6; plus: 7; result: 13
main has: 13; plus: 27; result: 40
worker-3 has: 1; plus: 2; result: 3
worker-3 has: 3; plus: 12; result: 15
worker-3 has: 15; plus: 40; result: 55
Sum parallel: 55

It is vital to note that the threads did their calculations in no discernible order. And, as we will see later on, this is a point of concern when the function passed to reduce() is not associative.
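A small sketch makes the concern concrete. Subtraction is not associative, so it stands in here for any accumulator that breaks the contract of reduce(). The class and method names are illustrative:

```java
import java.util.stream.LongStream;

class AssociativitySketch {

    // Sequential fold: ((((0 - 1) - 2) - 3) ... - 10)
    static long sequentialSubtract() {
        return LongStream.rangeClosed(1, 10)
            .reduce(0L, (a, b) -> a - b);
    }

    // Parallel fold with the same function: the grouping of partial
    // results is unspecified, so the value is not reliable.
    static long parallelSubtract() {
        return LongStream.rangeClosed(1, 10)
            .parallel()
            .reduce(0L, (a, b) -> a - b);
    }

    // Addition is associative, so the grouping does not matter.
    static long parallelSum() {
        return LongStream.rangeClosed(1, 10)
            .parallel()
            .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(sequentialSubtract()); // -55
        System.out.println(parallelSubtract());   // depends on how the work was split
        System.out.println(parallelSum());        // 55
    }
}
```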

Are Parallel Streams Concurrent Too?

"Concurrency is about dealing with a lot of things at once"

--Rob Pike

In short, yes. Threads that run in parallel also execute concurrently. And, that is why we stated earlier that:

Once each thread has a stream, the mechanism calls reduce() on each to run in concurrence.

But the important distinction is that concurrent threads do not have to execute in parallel. As an illustration, we have the startThreads() method:

public void startThreads() {
    StringBuffer sb = new StringBuffer("hello world");

    Thread t1 = new Thread(() -> {
        String tName = currentThread().getName();
        System.out.printf(
            "before running %s: %s\n", 
            tName, sb
        );

        if (sb.length() > 0) {
            int idx = sb.length() - 1;
            char c = sb.charAt(idx);
            sb.deleteCharAt(idx);

            System.out.printf(
                "on running: %s; remove %s\n", 
                tName, c
            );
        }
        System.out.printf(
            "after running %s: %s\n", 
            tName, sb
        );
    }, "thread-1");
    
    Thread t2 = new Thread(() -> {
        String tName = currentThread().getName();
        System.out.printf(
            "before running %s: %s\n", 
            tName, sb
        );

        if (sb.length() > 0) {
            int idx = sb.length() - 1;
            char c = sb.charAt(idx);
            sb.deleteCharAt(idx);

            System.out.printf(
                "on running: %s; remove %s\n", 
                tName, c
            );
        }
        System.out.printf(
            "after running %s: %s\n", 
            tName, sb
        );
    }, "thread-2");

    t1.start();
    t2.start();

    try {
        Thread.sleep(10000);
    } catch (InterruptedException ex) {
        // Handle exception
    }

    System.out.printf("after all runs: %s\n", sb);
}

The method creates two threads: t1 and t2. Each attempts to remove the character at the end of a StringBuffer, sb. Then, the method starts the two.

On executing the code you get the output:

before running thread-1: hello world
before running thread-2: hello worl
on running: thread-2; remove l
after running thread-2: hello wor
on running: thread-1; remove d
after running thread-1: hello wor
after all runs: hello wor

Yet, when you execute it a second time, you may get:

before running thread-1: hello world
on running: thread-1; remove d
after running thread-1: hello worl
before running thread-2: hello worl
on running: thread-2; remove l
after running thread-2: hello wor
after all runs: hello wor

These results show that the threads change the contents of sb in a synchronized way, since every StringBuffer method is synchronized. You cannot predict how their executions interleave, though. That is up to the thread scheduler.

Best Practices with Parallel Streams

That being said - let's summarize the best practices:

  • Associativity: combine results with associative operations, because partial results arrive in no particular order
  • Lambda expressions should be stateless
  • Avoid the modification of the streams' elements
  • Lambda expressions should not emit side effects
  • Only use parallelism when the number of elements is very large. For example, with a stream of fewer than 10,000 int elements, prefer serial to parallel execution.
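As a sketch of the side-effect rule, compare a lambda that writes to a shared ArrayList with a pipeline that lets collect() assemble the result. The names are illustrative, and the unsafe method is shown only as a counter-example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SideEffectSketch {

    // Counter-example: ArrayList is not thread-safe, so a parallel run may
    // lose elements or throw. Do not use this pattern.
    static List<Integer> unsafeDoubles(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.rangeClosed(1, n)
            .parallel()
            .forEach(i -> out.add(i * 2));
        return out;
    }

    // Side-effect free: each thread builds its own partial list and
    // collect() merges them in encounter order.
    static List<Integer> safeDoubles(int n) {
        return IntStream.rangeClosed(1, n)
            .parallel()
            .map(i -> i * 2)
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> doubles = safeDoubles(1_000);
        System.out.println(doubles.size());  // 1000
        System.out.println(doubles.get(0));  // 2
        // unsafeDoubles(1_000) is deliberately not called here
    }
}
```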

Conclusion

The Streams API's parallel() feature has simplified how we can make code handle many tasks at the same time. By splitting tasks into sub-tasks, it helps us to run executions faster than before.

Yet, the parallel() operation requires a careful look at code design first. In most use cases, streams do not contain enough elements to warrant parallelism. Even when they do, the final aggregating operations should respect associativity.

The order in which the executions occur should not have an effect on the eventual result. If it does, then parallelism would have been the wrong design choice for your code.

Then again, with careful design, parallel() can improve code performance. And it can do so without sacrificing your code's readability.

You can find the code in its entirety that this guide used in this GitHub repository.

Feel free to clone it and change it so that you get a deeper look into how parallel() works.

Last Updated: November 28th, 2021

Hiram Kamau, Author

    © 2013-2021 Stack Abuse. All rights reserved.