Guide to Java 8 Collectors: groupingByConcurrent()

Guide to Java 8 Collectors: groupingByConcurrent()

Introduction

A stream represents a sequence of elements and supports different kinds of operations that lead to the desired result. The source of a stream is usually a Collection or an Array, from which data is streamed from.

Streams differ from collections in several ways; most notably in that the streams are not a data structure that stores elements. They're functional in nature, and it's worth noting that operations on a stream produce a result and typically return another stream, but do not modify its source.

To "solidify" the changes, you collect the elements of a stream back into a Collection.

In this guide, we'll take a look at how to group Stream data in Java with Collectors.groupingBy()!

Collectors and Parallelism

Collectors represent implementations of the Collector interface, which implements various useful reduction operations, such as accumulating elements into collections, summarizing elements based on a specific parameter, etc.

All predefined implementations can be found within the Collectors class.

You can also very easily implement your own collector and use it instead of the predefined ones, though - you can get pretty far with the built-in collectors, as they cover the vast majority of cases in which you might want to use them.

To be able to use the class in our code we need to import it:

import static java.util.stream.Collectors.*;


Stream.collect() performs a mutable reduction operation on the elements of the stream.

A mutable reduction operation collects input elements into a mutable container, such as a Collection, as it processes the elements of the stream.

Parallel computing (parallelism) refers to the process of dividing a problem into two or more subproblems, solving those problems simultaneously, in parallel, with each subproblem being computed on a separate thread, and then combining all of the solutions to the subproblems in one uniform result.

One of the biggest challenges of implementing parallelism in programs that use collections is that the collections are non thread-safe, which means that multiple threads cannot manipulate a collection without introducing thread interference or memory consistency errors. What we also need to note is that parallelism is not necessarily faster performing than serial execution, although this heavily depends on the amount of data and the number of cores of the CPU.

Tying back into context, streams can be executed in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Operations are executed on independent substreams in parallel and then combined into a final result.

When creating a stream, it's always a serial stream unless stated otherwise, that is specifically parallel. To create a parallel stream we invoke the Collection.parallelStream(), which is a method found within the Collection interface.

Note: While using this method enables you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism at all, based on your knowledge of the hardware you're executing your code on.

Collectors.groupingByConcurrent()

Collectors.groupingByConcurrent() uses a multi-core architecture and is very similar to Collectors.groupingBy(), as it also behaves like the "GROUP BY" statement in SQL.

It groups objects by a given specific property and store the end result in a ConcurrentMap.

If you'd like to read more about groupingBy(), read our Guide to Java 8 Collectors: groupingBy()!

Let's define a simple class to use throughout the examples. It'll be a representation of a book, with a few fields:

public class Book {
private String title;
private String author;
private int releaseYear;

// Constructor, getters, setters, toString()
}


With the model in place, let's instantiate a list of a few books that we'll be working with:

List<Book> books = Arrays.asList(
new Book("The Lord of the Rings", "J.R.R. Tolkien", 1954),
new Book("The Hobbit", "J.R.R. Tolkien", 1937),
new Book("Animal Farm", "George Orwell", 1945),
new Book("Nineteen Eighty-Four", "George Orwell", 1949),
new Book("The Road to Wigan Pier", "George Orwell", 1937),
new Book("Lord of the Flies", "William Golding", 1954)
);


The groupingByConcurrent() has three overloads within the Collectors class. We'll be going through each one of them and explain the differences in implementation through examples along the way.

Let's start out with the simplest of them.

Collectors.groupingByConcurrent() with a Classifier Function

The first overload of this method takes only one argument - the classifier function:

public static <T,K> Collector<T,?,ConcurrentMap<K,List<T>>>
groupingByConcurrent(Function<? super T,? extends K> classifier)


This method returns a Collector that groups the input elements of type T according to the classification function. The classification function maps elements to a key of type K. The collector itself produces a ConcurrentMap<K, List<T>> whose keys represent the values we get by applying the classification function on the input, and whose corresponding values are Lists containing the input elements which map to the associated key.

This Collector is both concurrent and unordered. Being unordered, the collection operation doesn't preserve the order of the input by it's encounter. Being concurrent, the result container supports functions being called concurrently with the same result container from multiple threads.

This property isn't unique to this specific overload of the groupingByConcurrent() method, but applies to the other two overloads as well.

Let's go ahead and group the books by author:

ConcurrentMap<String, List<Book>> booksByAuthor = books.parallelStream()
.collect(Collectors.groupingByConcurrent(Book::getAuthor));


The collected elements will be unordered - but grouped. Running the same code will result in different sorts of the elements within the groups - but the sort of the groups themselves will be consistent:

{
J.R.R. Tolkien=[Book{author='J.R.R. Tolkien', title='The Hobbit', releaseYear=1937}, Book{author='J.R.R. Tolkien', title='The Lord of the Rings', releaseYear=1954}],
William Golding=[Book{author='William Golding', title='Lord of the Flies', releaseYear=1954}],
George Orwell=[Book{author='George Orwell', title='Animal Farm', releaseYear=1945}, Book{author='George Orwell', title='The Road to Wigan Pier', releaseYear=1937}, Book{author='George Orwell', title='Nineteen Eighty-Four', releaseYear=1949}]
}


Depending on how the threads in the CPU perform, and which finish their computation first - the Hobbit might appear after the Lord of the Rings, and vice versa.

Benchmarking Sequential and Concurrent Collectors.groupingBy()

While the difference between the regular groupingBy() and groupingByConcurrent() might not be obvious looking from afar - the underlying principle of grouping is significantly different.

When dealing with large amounts of books, with a decent processor, this approach may improve the performance significantly.

Let's generate a bunch of books and try grouping them sequentially and in parallel ...

List<Book> books = new ArrayList<>();
List<String> authorList = Arrays.asList(
"George Orwell",
"Nick Bostrom",
);

for (int i = 0; i < 100000; i++) {
String.valueOf(i),
authorList.get(new Random().ints(1, 1, authorList.size()).findFirst().getAsInt()),
1900));
}

long startTimeSequential = System.currentTimeMillis();
Map<String, List<Book>> booksByAuthorSequential = books.stream()
.collect(Collectors.groupingBy(Book::getAuthor));

long endTimeSequential = System.currentTimeMillis();
System.out.printf("Total time for sequential process: %sms\n",  (endTimeSequential-startTimeSequential));

long startTimeParallel = System.currentTimeMillis();
ConcurrentMap<String, List<Book>> booksByAuthorParallel = books.parallelStream()
.collect(Collectors.groupingByConcurrent(Book::getAuthor));
long endTimeParallel = System.currentTimeMillis();
System.out.printf("Total time for parallel process: %sms\n",  (endTimeParallel-startTimeParallel));


Depending on your system and CPU the sequential process might take longer or shorter than the parallel counterpart. This also heavily depends on the number of groups. If you have a few groups (fewer authors), the process of splitting them up and aggregating the results might offset the parallel approach enough to make it slower than the sequential approach.

Note: The fewer groups you're dealing with, the more likely it is for the sequential approach to outperform the parallel one, but this also highly depends on the CPU of the machine you're running the code on.

With just two authors, running this piece of code results in:

Total time for sequential process: 12ms
Total time for parallel process: 26ms


Free eBook: Git Essentials

Check out our hands-on, practical guide to learning Git, with best-practices, industry-accepted standards, and included cheat sheet. Stop Googling Git commands and actually learn it!

While both processes took a really small amount of time to execute, considering the creation and grouping of 100k objects - the parallel process took significantly longer.

If we were to expand our list with a few more authors:

List <String> authorList = Arrays.asList(
"George Orwell",
"Nick Bostrom",
"Ray Kurzweil",
"J.R.R. Tolkien",
"Eliezer Yudkowsky",
"Stuart Russel",
"Max Tegmark",
"Anil Seth",
"Thomas Metzinger",
"Aurélien Geron",
"Max Lapan",
"Brian Greene",
"Frank Wilczek"
);


The results would be pretty similar:

Total time for sequential process: 13ms
Total time for parallel process: 19ms


However, if we expanded it significantly:

for (int i = 0; i < 10000; i++) {
}


Can you guess what happens now, with 10 thousand authors? Actually - the same thing:

Total time for sequential process: 19ms
Total time for parallel process: 33ms


But, if you run this code on another machine that can utilize threads more efficiently, you'll be greeted with:

Total time for sequential process: 97ms
Total time for parallel process: 52ms


Note: Concurrency isn't a silver bullet that always just works and makes code execute faster.

Collectors.groupingByConcurrent() with Classification Function and Downstream Collector

The second variation of the method takes two arguments - a classification function, and an additional, downstream collector:

public static <T,K,A,D> Collector<T,?,ConcurrentMap<K,D>>
groupingByConcurrent(Function<? super T,? extends K> classifier,
Collector<? super T,A,D> downstream)


This method returns a Collector that groups the input elements of type T according to the classification function, afterwards applying a reduction operation on the values associated with a given key using the specified downstream Collector.

The reduction operation "reduces" the data we've collected by applying an operation that's useful in a specific situation.

If you'd like to read more about reduction in Java in great detail - read our Java 8 Streams: Guide to reduce()!

Let's see an example of this variant of the method. As the downstream here we'll be using mapping(), which takes 2 parameters:

• A mapper - a function to be applied to the input elements and
• A downstream collector – a collector which will accept mapped values

Collectors.mapping() itself does a pretty straightforward job. It adapts a collector accepting elements of one type to accept a different type by applying a mapping function to each input element before accumulation. In our case, we'll map each Student to their name and return those names as a list.

Here we'll once again group our books by the author, but instead of using ConcurrentMap<String, List<Book> we'll use ConcurrentMap<String, List<String> and reduce our books into a simple string:

ConcurrentMap<String, List<String>> booksByAuthor = books.parallelStream()
.collect(Collectors.groupingByConcurrent(Book::getAuthor, Collectors.mapping(Book::getTitle, Collectors.toList())));


These are reductions of books, where we've reduced them to a title, though you could substitute this with any other reduction operation as well:

{
J.R.R. Tolkien=[The Lord of the Rings, The Hobbit],
William Golding=[Lord of the Flies],
George Orwell=[Nineteen Eighty-Four, The Road to Wigan Pier, Animal Farm]
}


Another very useful application of this overload is that our downstream function can be, well, another Collectors.groupingByConcurrent(). You can thus chain any number of groups, creating nested groups.

Let's group the books by their release year, but within those groups, we'll group the books by authors:

ConcurrentMap<Integer, ConcurrentMap<String, List<String>>> booksByAuthor = books.parallelStream()
.collect(Collectors.groupingByConcurrent(Book::getReleaseYear,
Collectors.groupingByConcurrent(Book::getAuthor, Collectors.mapping(Book::getTitle, Collectors.toList()))));


And get the following output:

{
1937={J.R.R. Tolkien=[The Hobbit], George Orwell=[The Road to Wigan Pier]},
1954={J.R.R. Tolkien=[The Lord of the Rings], William Golding=[Lord of the Flies]},
1945={George Orwell=[Animal Farm]},
1949={George Orwell=[Nineteen Eighty-Four]}
}


Collectors.groupingBy() with Classifier Function, Downstream Collector and Supplier

The third and last overload of this method takes three arguments. The first and the third are the same as in the previous overload, but the second argument is a supplier method.

The supplier method provides the specific ConcurrentMap implementation we want to use to contain our end result. We have two known classes that implement this interface - ConcurrentHashMap and ConcurrentSkipListMap:

public static <T,K,A,D,M extends ConcurrentMap<K,D>> Collector<T,?,M>
groupingByConcurrent(Function<? super T,? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T,A,D> downstream)


The return value of this method is the same as the previous overload as well. The only difference with this one is that the ConcurrentMap produced by the collector is created with the supplied factory function.

We'll be doing just one simple example for this overload, since everything is pretty much the same as the previous with the exception of the specified ConcurrentMap implementation:

ConcurrentMap<String, List<String>> booksByAuthor = books.parallelStream()
.collect(Collectors.groupingByConcurrent(Book::getAuthor,
ConcurrentHashMap::new,
Collectors.mapping(Book::getTitle, Collectors.toList())));



Conclusion

The Collectors class is a powerful one and allows us to collect streams into collections in various ways.

You can define your own collectors, but the built-in collectors can get you very far as they're generic and can be generalized to the vast majority of tasks you can think of.

In this guide, we've gone through a few examples of using the Collectors.groupingByConcurrent() method, that groups elements together given specific parameters and returns a ConcurrentMap.

By using this method instead of the non-concurrent Collectors.groupingBy() we can fully utilize the multi-core architecture, if the underlying hardware allows us to. However, while using this method enables you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism at all.

You've learned how to use the basic form, as well as forms with downstream collectors and suppliers to simplify code and run powerful yet simple functional operations on streams.

Last Updated: December 1st, 2021