Lambdas and Streams in Java 8 Libraries


Conveniently, when used in a source-lazy-lazy-eager pipeline, the laziness is mostly invisible, as the computation is "sandwiched" between a source at one end (often a collection) and an operation that produces the desired result (or side-effect) at the other end. This turns out to yield good usability and performance in an API with a relatively small surface area.

Methods like anyMatch(Predicate) or findFirst(), while eager, can use short-circuiting to stop processing once they can determine the final result. Given a pipeline like:

Optional<Shape> firstBlue = shapes.stream()
                                  .filter(s -> s.getColor() == BLUE)
                                  .findFirst();

Because the filter step is lazy, the findFirst implementation will only draw from upstream until it gets an element, which means we need only apply the predicate to input elements until we find one for which the predicate is true, rather than all of them. The findFirst() method returns an Optional, since there might not be any elements matching the desired criteria. Optional provides a means to describe a value that might or might not be present.
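To make the short-circuiting visible, here is a small sketch that counts how many times the filter predicate runs. It uses a list of color strings as a stand-in for the article's `shapes`; the class name `ShortCircuitDemo`, the method `firstBlue`, and the sample data are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class ShortCircuitDemo {
    // Illustrative counter: records how often the predicate is evaluated.
    static final AtomicInteger predicateCalls = new AtomicInteger();

    // Hypothetical helper: first "BLUE" entry, testing elements only until one matches.
    static Optional<String> firstBlue(List<String> colors) {
        return colors.stream()
                     .filter(c -> {
                         predicateCalls.incrementAndGet();
                         return c.equals("BLUE");
                     })
                     .findFirst();
    }

    public static void main(String[] args) {
        List<String> colors = Arrays.asList("RED", "BLUE", "GREEN", "BLUE", "RED");
        Optional<String> result = firstBlue(colors);
        // Only "RED" and "BLUE" are tested; the remaining elements are never examined.
        System.out.println(result.isPresent() + " after " + predicateCalls.get() + " predicate calls");
        // With no match, the result is an empty Optional rather than null.
        System.out.println(firstBlue(Arrays.asList("RED", "GREEN")).isPresent());
    }
}
```

Running `main` should print `true after 2 predicate calls` followed by `false`.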

Note that the user didn't have to ask for laziness, or even think about it very much; the right thing happened, with the library arranging for as little computation as it could.

Parallelism

Stream pipelines can execute either serially or in parallel; this choice is a property of the stream. Unless you explicitly ask for a parallel stream, JDK implementations always return sequential streams (a sequential stream may be converted into a parallel one with the parallel() method).
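A quick way to check which mode a stream is in is `isParallel()`, defined on `BaseStream`. The sketch below (class name and data are illustrative) contrasts a default stream, one converted with `parallel()`, one obtained from `parallelStream()`, and one switched back with `sequential()`.

```java
import java.util.Arrays;
import java.util.List;

public class StreamModeDemo {
    // Illustrative sample source.
    static final List<Integer> NUMBERS = Arrays.asList(1, 2, 3, 4);

    public static void main(String[] args) {
        // Collection.stream() returns a sequential stream.
        System.out.println(NUMBERS.stream().isParallel());                      // false
        // parallel() switches the pipeline to parallel execution.
        System.out.println(NUMBERS.stream().parallel().isParallel());           // true
        // parallelStream() asks for a parallel stream up front (JDK default implementation).
        System.out.println(NUMBERS.parallelStream().isParallel());              // true
        // sequential() switches it back; the most recent mode setting wins.
        System.out.println(NUMBERS.parallelStream().sequential().isParallel()); // false
    }
}
```

The mode applies to the entire pipeline, not to individual stages: if both `parallel()` and `sequential()` are called, the last call determines how the whole pipeline executes.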

While parallelism is always explicit, it need not be intrusive. Our sum-of-weights example can be executed in parallel simply by invoking the parallelStream() method on the source collection instead of stream():

int sum = shapes.parallelStream()
                .filter(s -> s.getColor() == BLUE)
                .mapToInt(s -> s.getWeight())
                .sum();

The result is that the serial and parallel expressions of the same computation look similar, but parallel executions are still clearly identified as parallel (without the parallel machinery overwhelming the code).

Because the stream source might be a mutable collection, there is the possibility for interference if the source is modified while it is being traversed. The stream operations are intended to be used while the underlying source is held constant for the duration of the operation. This condition is generally easy to maintain; if the collection is confined to the current thread, simply ensure that the lambda expressions passed to stream operations do not mutate the stream source. (This condition is not substantially different from the restrictions on iterating Collections today; if a Collection is modified while being iterated, most implementations throw ConcurrentModificationException.) We refer to this requirement as non-interference.
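The sketch below contrasts an interfering lambda, which adds to the `ArrayList` it is streaming over, with a non-interfering version that writes its results to a new list. With the JDK's `ArrayList`, the interference is detected on a best-effort basis (as with iterators) and the pipeline fails with `ConcurrentModificationException`; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

public class NonInterferenceDemo {
    // Interfering (wrong): the lambda mutates the very list being streamed.
    // Returns true if the JDK detected the modification.
    static boolean interferingVersionFails() {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            source.stream().forEach(x -> source.add(x * 10));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Non-interfering: results go into a fresh list; the source is left untouched.
    static List<Integer> nonInterferingVersion(List<Integer> source) {
        return source.stream()
                     .map(x -> x * 10)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(interferingVersionFails());                 // true
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(nonInterferingVersion(source) + " " + source); // [10, 20, 30] [1, 2, 3]
    }
}
```

Detection is not guaranteed in general, so the exception should be treated as a bug signal, not as something to rely on.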

It is best to avoid any side-effects in the lambdas passed to stream methods. While some side-effects, such as debugging statements that print out values, are usually safe, accessing mutable state from these lambdas can cause data races or surprising behavior, since lambdas may be executed from many threads simultaneously and may not see elements in their natural encounter order. Non-interference includes not only not interfering with the source, but also not interfering with other lambdas; this sort of interference can arise when one lambda modifies mutable state and another lambda reads it.

As long as the non-interference requirement is satisfied, we can execute parallel operations safely and with predictable results even on non-thread-safe sources such as ArrayList.
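As an illustration, the sketch below runs a parallel pipeline over a plain, non-thread-safe `ArrayList`. The lambdas neither touch the source nor share mutable state, and results are gathered with `collect` rather than by appending to a shared list, so the output is deterministic. The names and data are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelCollectDemo {
    // Illustrative source: an ordinary ArrayList holding 0..n-1.
    static List<Integer> source(int n) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(i);
        }
        return list;
    }

    // Safe: the library gives each worker its own container and merges them.
    static List<Integer> squaresParallel(List<Integer> src) {
        return src.parallelStream()
                  .map(x -> x * x)
                  .collect(Collectors.toList());
    }

    // Unsafe alternative (do not do this): many threads racing on one shared ArrayList.
    //   List<Integer> out = new ArrayList<>();
    //   src.parallelStream().map(x -> x * x).forEach(out::add);

    public static void main(String[] args) {
        List<Integer> squares = squaresParallel(source(1000));
        System.out.println(squares.size());   // 1000
        System.out.println(squares.get(999)); // 998001
    }
}
```

On an ordered source such as a `List`, `Collectors.toList()` preserves encounter order even in parallel, so `squares.get(999)` is `999 * 999`. If a side-effect really is required, `forEachOrdered` processes elements in encounter order, at the cost of some parallelism.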

Examples

Below is a fragment from the JDK class Class (the getEnclosingMethod method), which loops over all declared methods, matching method name, return type, and number and type of parameters. Here is the original code:

 for (Method m : enclosingInfo.getEnclosingClass().getDeclaredMethods()) {
     if (m.getName().equals(enclosingInfo.getName()) ) {
         Class<?>[] candidateParamClasses = m.getParameterTypes();
         if (candidateParamClasses.length == parameterClasses.length) {
             boolean matches = true;
             for(int i = 0; i < candidateParamClasses.length; i++) {
                 if (!candidateParamClasses[i].equals(parameterClasses[i])) {
                     matches = false;
                     break;
                 }
             }

             if (matches) { // finally, check return type
                 if (m.getReturnType().equals(returnType) )
                     return m;
             }
         }
     }
 }

 throw new InternalError("Enclosing method not found");

Using streams, we can eliminate all the temporary variables and move the control logic into the library. We fetch the list of methods via reflection, turn it into a Stream with Arrays.stream, and then use a series of filters to reject the ones that don't match name, parameter types, or return type. The result of findFirst is an Optional<Method>, and we then either fetch and return the resulting method or throw an exception.

return Arrays.stream(enclosingInfo.getEnclosingClass().getDeclaredMethods())
             .filter(m -> Objects.equals(m.getName(), enclosingInfo.getName()))
             .filter(m -> Arrays.equals(m.getParameterTypes(), parameterClasses))
             .filter(m -> Objects.equals(m.getReturnType(), returnType))
             .findFirst()
             .orElseThrow(() -> new InternalError("Enclosing method not found"));

This version of the code is more compact, more readable, and less error prone.
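The same filter chain generalizes into a small reusable lookup. In this sketch, the helper `findMethod` and the nested class `Sample` are hypothetical. The helper returns the `Optional` and leaves the decision to throw, or not, to the caller.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

public class MethodLookupDemo {
    // Hypothetical class to search: two overloads of "scale".
    public static class Sample {
        int scale(int x) { return x * 2; }
        double scale(double x) { return x * 2; }
    }

    // Hypothetical helper: match by name, exact parameter types, and return type.
    static Optional<Method> findMethod(Class<?> owner, String name,
                                       Class<?>[] paramTypes, Class<?> returnType) {
        return Arrays.stream(owner.getDeclaredMethods())
                     .filter(m -> Objects.equals(m.getName(), name))
                     .filter(m -> Arrays.equals(m.getParameterTypes(), paramTypes))
                     .filter(m -> Objects.equals(m.getReturnType(), returnType))
                     .findFirst();
    }

    public static void main(String[] args) {
        // Caller chooses to throw when nothing matches, as getEnclosingMethod does.
        Method m = findMethod(Sample.class, "scale", new Class<?>[] { double.class }, double.class)
                .orElseThrow(() -> new IllegalStateException("method not found"));
        System.out.println(m);
        // No long overload exists, so this lookup yields an empty Optional.
        System.out.println(findMethod(Sample.class, "scale", new Class<?>[] { long.class }, long.class).isPresent());
    }
}
```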

Stream operations are very effective for ad hoc queries over collections. Consider a hypothetical "music library" application, where a library has a list of albums, an album has a title and a list of tracks, and a track has a name, artist, and rating.

Consider the query "find the albums that have at least one track rated four or higher, sorted by name." To construct this list, we might write:

List<Album> favs = new ArrayList<>();
for (Album a : albums) {
    boolean hasFavorite = false;
    for (Track t : a.tracks) {
        if (t.rating >= 4) {
            hasFavorite = true;
            break;
        }
    }
    if (hasFavorite)
        favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
                           public int compare(Album a1, Album a2) {
                               return a1.name.compareTo(a2.name);
                           }});

We can use the stream operations to simplify each of the three major steps: identifying whether any track in an album has a rating of at least four (anyMatch), sorting, and collecting the albums that match our criteria into a List:

List<Album> sortedFavs =
  albums.stream()
        .filter(a -> a.tracks.stream().anyMatch(t -> (t.rating >= 4)))
        .sorted(Comparator.comparing(a -> a.name))
        .collect(Collectors.toList());

The Comparator.comparing() method takes a function that extracts a Comparable sort key, and returns a Comparator that compares on that key.
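To try the query end to end, here is a self-contained sketch with minimal, hypothetical `Album` and `Track` classes (fields named after the article's examples) and made-up sample data. It also adds a `map` step for the common case where only the album names are wanted.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class MusicLibraryDemo {
    // Hypothetical minimal model of the music library.
    static class Track {
        final String name;
        final String artist;
        final int rating;
        Track(String name, String artist, int rating) {
            this.name = name; this.artist = artist; this.rating = rating;
        }
    }

    static class Album {
        final String name;
        final List<Track> tracks;
        Album(String name, List<Track> tracks) {
            this.name = name; this.tracks = tracks;
        }
    }

    // Albums with at least one track rated 4 or higher, sorted by album name.
    static List<Album> sortedFavs(List<Album> albums) {
        return albums.stream()
                     .filter(a -> a.tracks.stream().anyMatch(t -> t.rating >= 4))
                     .sorted(Comparator.comparing(a -> a.name)) // same order as a1.name.compareTo(a2.name)
                     .collect(Collectors.toList());
    }

    // The same query projected to album names.
    static List<String> sortedFavNames(List<Album> albums) {
        return sortedFavs(albums).stream()
                                 .map(a -> a.name)
                                 .collect(Collectors.toList());
    }

    // Made-up sample data.
    static List<Album> sampleLibrary() {
        return Arrays.asList(
            new Album("Zebra", Arrays.asList(new Track("Z1", "Ann", 5))),
            new Album("Alpha", Arrays.asList(new Track("A1", "Bob", 2), new Track("A2", "Bob", 3))),
            new Album("Mango", Arrays.asList(new Track("M1", "Cy", 1), new Track("M2", "Cy", 4))));
    }

    public static void main(String[] args) {
        System.out.println(sortedFavNames(sampleLibrary())); // [Mango, Zebra]
    }
}
```

"Alpha" is dropped because none of its tracks reach a rating of 4. `anyMatch` stops at the first qualifying track in each album.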

Collectors

In the examples so far, we've used the collect() method to gather the elements of a stream into a List or Set. The argument to collect() is a Collector, which embodies a recipe for folding elements into a data structure or summary. The Collectors class contains factories for many common collectors; toList() and toSet() are among the most commonly used, but there are many more that can be used to perform sophisticated transforms on the data.

A Collector is parameterized by its input and output types. The toList() collector has an input type of some T and an output type of List<T>. A slightly more complicated Collector is toMap, of which there are several versions. The simplest version takes a pair of functions, one which maps input elements to map keys, and the other to map values. It takes a T as input and produces a Map<K,V>, where K and V are the result types of the key and value mapping functions. (More complex versions allow you to customize the type of the resulting map, or to resolve duplicates when multiple elements map to the same key.) For example, to create a reverse index on a known unique key such as catalog number:

Map<Integer, Album> albumsByCatalogNumber =
    albums.stream()
          .collect(Collectors.toMap(a -> a.getCatalogNumber(), a -> a));
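The sketch below makes the type parameters explicit and shows duplicate handling. A `Collector<Album, ?, Map<Integer, Album>>` has input type `Album`, an internal accumulation type normally left as a wildcard, and result type `Map<Integer, Album>`. The two-argument `toMap` throws `IllegalStateException` on duplicate keys, while the three-argument form takes a merge function. The `Album` class and data are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ToMapDemo {
    // Hypothetical album with a catalog number and a title.
    static class Album {
        final int catalogNumber;
        final String title;
        Album(int catalogNumber, String title) {
            this.catalogNumber = catalogNumber; this.title = title;
        }
        int getCatalogNumber() { return catalogNumber; }
    }

    // A collector is a reusable recipe: input Album, hidden accumulator (?), result Map.
    static final Collector<Album, ?, Map<Integer, Album>> BY_CATALOG_NUMBER =
        Collectors.toMap(Album::getCatalogNumber, Function.identity());

    // Two-argument toMap: throws IllegalStateException if two albums share a key.
    static Map<Integer, Album> byCatalogNumber(List<Album> albums) {
        return albums.stream().collect(BY_CATALOG_NUMBER);
    }

    // Three-argument toMap: the merge function resolves duplicates (here, keep the first).
    static Map<Integer, String> titlesByCatalogNumber(List<Album> albums) {
        return albums.stream()
                     .collect(Collectors.toMap(Album::getCatalogNumber,
                                               a -> a.title,
                                               (first, second) -> first));
    }

    // Made-up sample data.
    static List<Album> uniqueAlbums() {
        return Arrays.asList(new Album(101, "A"), new Album(102, "B"));
    }

    static List<Album> albumsWithDuplicate() {
        return Arrays.asList(new Album(101, "A"), new Album(102, "B"), new Album(101, "A (remaster)"));
    }

    public static void main(String[] args) {
        System.out.println(byCatalogNumber(uniqueAlbums()).get(102).title); // B
        System.out.println(titlesByCatalogNumber(albumsWithDuplicate()).get(101)); // A
    }
}
```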

Related to toMap is groupingBy. Let's say we wanted to tabulate our favorite tracks by artist. We want a Collector that takes Track as input and produces a Map<Artist,List<Track>>. This exactly matches the behavior of the simplest form of the groupingBy collector, which takes a classification function and produces a map keyed by that function, whose corresponding values are lists of the input elements that correspond to that key.

Map<Artist, List<Track>> favsByArtist =
    tracks.stream()
          .filter(t -> t.rating >= 4)
          .collect(Collectors.groupingBy(t -> t.artist));

Collectors can be composed and reused to produce more complex collectors. The simple form of the groupingBy collector organizes elements into buckets according to the classification function (here, the track's artist), and puts all elements that map to the same bucket into a List. There is a more general version that lets you use another collector to organize the elements within a bucket; this version takes a classifying function and a downstream collector as arguments, and all elements mapped into the same bucket by the classifying function are passed to the downstream collector. (The one-argument version of groupingBy implicitly uses toList() as its downstream collector.) For example, if we want to collect the tracks associated with each artist into a Set instead of a List, we could combine this with the toSet() collector:

Map<Artist, Set<Track>> favsByArtist =
    tracks.stream()
          .filter(t -> t.rating >= 4)
          .collect(Collectors.groupingBy(t -> t.artist, 
                                         Collectors.toSet()));

If we wanted to categorize tracks by artist and rating to create a multi-level map, we could do:

Map<Artist, Map<Integer, List<Track>>> byArtistAndRating =
    tracks.stream()
          .collect(groupingBy(t -> t.artist, 
                              groupingBy(t -> t.rating)));
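Both groupingBy forms can be exercised together in one runnable sketch. It assumes a hypothetical `Track` class whose artist is a plain `String` standing in for the article's `Artist` type; the track data is made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toSet;

public class GroupingDemo {
    // Hypothetical track; artist is a String stand-in for the article's Artist type.
    static class Track {
        final String name;
        final String artist;
        final int rating;
        Track(String name, String artist, int rating) {
            this.name = name; this.artist = artist; this.rating = rating;
        }
        @Override public String toString() { return name; }
    }

    // Favorite tracks per artist, collected into Sets by the downstream collector.
    static Map<String, Set<Track>> favsByArtist(List<Track> tracks) {
        return tracks.stream()
                     .filter(t -> t.rating >= 4)
                     .collect(groupingBy(t -> t.artist, toSet()));
    }

    // Two-level map: artist -> rating -> tracks (the inner groupingBy defaults to toList()).
    static Map<String, Map<Integer, List<Track>>> byArtistAndRating(List<Track> tracks) {
        return tracks.stream()
                     .collect(groupingBy(t -> t.artist, groupingBy(t -> t.rating)));
    }

    // Made-up sample data.
    static List<Track> sampleTracks() {
        return Arrays.asList(
            new Track("One", "Ada", 5),
            new Track("Two", "Ada", 3),
            new Track("Three", "Bo", 4),
            new Track("Four", "Ada", 5));
    }

    public static void main(String[] args) {
        System.out.println(favsByArtist(sampleTracks()).get("Ada").size());      // 2
        System.out.println(byArtistAndRating(sampleTracks()).get("Ada").get(5)); // [One, Four]
    }
}
```

Swapping `toSet()` for another downstream collector (for example `counting()`) changes only what each bucket holds, not how elements are classified.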

As a final example, let's say we wanted to create a frequency distribution of words that appear in track titles. We first use Stream.flatMap() and Pattern.splitAsStream to take a stream of tracks and explode each track into the words in that track's name, producing a stream of words in all the names of all the tracks. We can then use groupingBy using String.toUpperCase as the classifier function (so all words that are the same word, ignoring case, are considered the same and therefore appear in the same bucket) and use the counting() collector as the downstream collector to count the appearances of each word (without having to create an intermediate collection):

Pattern pattern = Pattern.compile("\\s+");
Map<String, Long> wordFreq = 
    tracks.stream()
          .flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String>
          .collect(groupingBy(s -> s.toUpperCase(),
                              counting()));            // counting() produces Long

The flatMap method takes as its argument a function that maps an input element (here, a track) to a stream of something (here, words in the track name). It applies this mapping function to every element of the stream, replacing each element with the contents of the resulting stream. (Think of this as two operations, first mapping every element to a stream of zero or more other elements, and then flattening out the contents of the resulting streams into a single stream.) So here, the result of the flatMap operation is a stream containing all the words in all the track names. We then group the words together into buckets containing occurrences of words that are identical modulo case, and use the counting() collector to count the number of words in each bucket.
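The word-frequency pipeline can be run on its own with a list of title strings standing in for the tracks' names. The class name and the sample titles are assumptions for the example; note that `counting()` yields `Long` values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;

public class WordFrequencyDemo {
    // Splits on runs of whitespace, as in the article.
    static final Pattern WHITESPACE = Pattern.compile("\\s+");

    // Counts words across all titles, treating words that differ only in case as equal.
    static Map<String, Long> wordFreq(List<String> titles) {
        return titles.stream()
                     .flatMap(t -> WHITESPACE.splitAsStream(t)) // one title -> many words
                     .collect(groupingBy(s -> s.toUpperCase(), counting()));
    }

    public static void main(String[] args) {
        // Made-up titles; avoid leading whitespace, which would yield an empty first word.
        List<String> titles = Arrays.asList("Love Me Do", "All You Need Is Love", "love hurts");
        System.out.println(wordFreq(titles).get("LOVE")); // 3
    }
}
```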

The Collectors class has lots of methods for constructing collectors that can be used for all sorts of common queries, roll-ups, and tabulations, and you can implement your own Collector as well.
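As a sketch of implementing your own Collector, `Collector.of` takes the four parts of the recipe: a supplier for a fresh container, an accumulator, a combiner for merging partial results in parallel, and a finisher. The example re-implements bracketed joining (which `Collectors.joining` already offers) purely to show those parts; the method name `bracketedList` is hypothetical.

```java
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.stream.Collector;

public class CustomCollectorDemo {
    // Hypothetical collector: joins strings as "[a, b, c]".
    static Collector<String, StringJoiner, String> bracketedList() {
        return Collector.of(
            () -> new StringJoiner(", ", "[", "]"), // supplier: a fresh, empty container
            StringJoiner::add,                      // accumulator: fold one element in
            StringJoiner::merge,                    // combiner: merge two partial results
            StringJoiner::toString);                // finisher: produce the final String
    }

    public static void main(String[] args) {
        System.out.println(Arrays.asList("a", "b", "c").stream().collect(bracketedList()));         // [a, b, c]
        System.out.println(Arrays.asList("a", "b", "c").parallelStream().collect(bracketedList())); // [a, b, c]
    }
}
```

Because the collector does not declare itself `UNORDERED`, the library combines partial results in encounter order, so the parallel run produces the same string.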

