
Stream Gatherers Write Your Own Stream Operations!

Sven Woltmann
Last update: January 11, 2024

The Java Stream API was released with Java 8 in March 2014 and has given us a fundamentally new tool for processing data streams.

However, the limited set of intermediate operations – filter, map, flatMap, mapMulti, distinct, sorted, peek, limit, skip, takeWhile, and dropWhile – means that more complex data transformations cannot be expressed by the Stream API.

For example, if you look at the feature requests of the Java community, common intermediate operations such as window and fold – and many more – are missing.

Instead of implementing all these operations in the Stream interface, the JDK developers decided to develop an API that can be used in the JDK itself to implement much sought-after intermediate operations and that developers can use to build their own operations.

This new API is called “Stream Gatherers” and will be released as a preview feature by JDK Enhancement Proposal 461 in Java 22 in March 2024, precisely ten years after the introduction of the Stream API.

In this article, you will find out

  • what a gatherer is,
  • how the new Stream Gatherers API works,
  • how to implement any intermediate stream operations with it,
  • which gatherers the JDK team has already implemented and how to create these gatherers.

Let’s start with a summary of how the Stream API works.

Stages of the Stream API

Java streams consist of three stages:

  1. Stream source – the source generates a stream, e.g., via IntStream.of(...) or Collection.stream().
  2. Intermediate operations – these transform the elements contained in the stream, e.g., the Stream methods map(...), filter(...), and limit(...).
  3. Terminal operations – these collect the elements in a list using toList(), for example, or in a map using collect(Collectors.toMap(...)) or count the elements using count().

Here is a simple example – a method that counts how many words of a certain length are contained in a list of words:

public long countLongWords(List<String> words, int minLength) {
  return words.stream()                       // ⟵ Source
      .map(String::length)                    // ⟵ Intermediate operation
      .filter(length -> length >= minLength)  // ⟵ Intermediate operation
      .count();                               // ⟵ Terminal operation
}

Terminal Operation: Stream Collector

And here is an example that converts the words into capital letters and saves them grouped by length in a map:

public Map<Integer, List<String>> groupByLength(List<String> words) {
  return words.stream()                                 // ⟵ Source
      .map(String::toUpperCase)                         // ⟵ Intermediate operation
      .collect(Collectors.groupingBy(String::length));  // ⟵ Terminal operation
}

In this second example, a so-called “collector” is passed to the terminal operation collect(...). A collector is an object of a class that implements the Collector interface and defines what should happen to the stream's elements when the stream terminates. In this case, they should be grouped by length and saved in a map.

Intermediate Operation: Stream Gatherer

Similarly, the Stream Gatherers API defines the Stream.gather(...) method and a Gatherer interface. The following code example uses the intermediate operation “Fixed Window,” which groups the words into lists of three words each:

public List<List<String>> groupsOfThree(List<String> words) {
  return words.stream()                  // ⟵ Source
      .gather(Gatherers.windowFixed(3))  // ⟵ Intermediate operation
      .toList();                         // ⟵ Terminal operation
}

For example, if we call this method as follows:

List<String> words = List.of("the", "be", "two", "of", "and", "a", "in", "that");
List<List<String>> groups = groupsOfThree(words);
System.out.println(groups);

Then the output is:

[[the, be, two], [of, and, a], [in, that]]

(Since the number of elements provided by the stream source is not divisible by three, the last group contains only two words.)

You can find out exactly how a stream gatherer is structured and how it works in the next chapter.

Structure of a Stream Gatherer

Before we look at the structure of a stream gatherer, it is essential to know two properties of gatherers:

  • They can have state so that they can transform elements differently depending on what happened before (I’ll show you why this is relevant with an example in a moment).
  • They can terminate the stream prematurely, as limit(...) and takeWhile(...) do, for example.

Gatherers are made up of up to four components:

  • An optional “initializer” that initializes the state mentioned above.
  • An “integrator” that processes each stream element (taking the current state into account if necessary), updates the state if necessary, forwards elements to the next stage of the stream pipeline, and terminates the stream prematurely if necessary.
  • An optional “finisher” that is called after the last element has been processed in order to emit further elements to the next stage of the stream pipeline based on the state.
  • And an optional “combiner,” which is used in the parallel processing of a stream to combine the states of transformations executed in parallel.

In the following sections, we look at these components one by one, with examples.
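Before diving into each component, here is a compact preview of how all four fit into the four-argument Gatherer.of(...) factory method. This is a sketch of my own, not an example from the JDK: a gatherer that counts its elements and emits the total as its only output element.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Gatherer;

// Sketch: a gatherer that counts all stream elements and emits
// the total as its only output element.
public static <T> Gatherer<T, AtomicLong, Long> counting() {
  return Gatherer.of(
      // Initializer: creates the state – a counter starting at 0
      AtomicLong::new,

      // Integrator: increments the counter for each element
      (state, element, downstream) -> {
        state.incrementAndGet();
        return true;
      },

      // Combiner: merges two partial counts (parallel streams)
      (state1, state2) -> {
        state1.addAndGet(state2.get());
        return state1;
      },

      // Finisher: emits the total count downstream
      (state, downstream) -> downstream.push(state.get()));
}
```

For example, `Stream.of("a", "b", "c").gather(counting()).toList()` returns a list containing the single element 3.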

Integrator

The integrator is the only component that is absolutely necessary. With just one integrator, we can already develop a simple, stateless gatherer.

In the following, I will show you how to implement the Stream.map(...) function with a gatherer.

The interfaces shown have additional static and default methods that are unimportant for a basic understanding. I therefore omit them and write three dots in their place.

Integrator is a functional interface with an integrate(...) method:

@FunctionalInterface
public interface Integrator<A, T, R> {
  boolean integrate(A state, T element, Downstream<? super R> downstream);
  . . .
}

Downstream is a functional interface with a push(...) method that forwards an element to the next stage of the stream pipeline:

@FunctionalInterface
public interface Downstream<T> {
  boolean push(T element);
  . . .
}

We can write an integrator as a lambda that applies a mapping function and emits the result downstream (i.e., forwards it to the next processing stage of the stream). As we do not need any state here, we use Void as the type of the state variable:

Function<T, R> mapper = . . .

Integrator<Void, T, R> integrator = 
    (state, element, downstream) -> {
      R mappedElement = mapper.apply(element);
      downstream.push(mappedElement);
      return true;
    };

In the first line of the integrator, the mapping function is applied to the incoming element from upstream. In the second line, the result of the mapping function is emitted downstream. And in the third line, the return value true indicates that the stream should process further elements.

To turn the integrator into a gatherer, we use the static Gatherer.of(...) method:

Gatherer<T, Void, R> gatherer = Gatherer.of(integrator);

Here is a complete example with a method that creates a gatherer for a specific mapping function and a method that uses such a gatherer to map a list of strings to their lengths:

public <T, R> Gatherer<T, Void, R> mapping(Function<T, R> mapper) {
  return Gatherer.of(
      (state, element, downstream) -> {
        R mappedElement = mapper.apply(element);
        downstream.push(mappedElement);
        return true;
      });
}

public List<Integer> toLengths(List<String> words) {
  return words.stream()
      .gather(mapping(String::length))
      .toList();
}

That explains the basic concept of a Gatherer.

Initializer

The gatherer of the previous section was stateless, i.e., the transformation of an element was independent of everything that happened before.

In this section, I will show you how to implement the Stream.limit(...) function with a gatherer. To do this, the gatherer must count the processed elements and terminate the stream prematurely once the required number of elements has been reached.

The initializer is of type Supplier and returns the initial state. As we want to count elements, an AtomicInteger is a suitable state:

Supplier<AtomicInteger> initializer = AtomicInteger::new;

We implement the limiting integrator as follows:

int maxSize = . . .

Integrator<AtomicInteger, T, T> integrator = 
    (state, element, downstream) -> {
      if (state.get() < maxSize) {
        downstream.push(element);
        state.incrementAndGet();
        return true;
      } else {
        return false;
      }
    };

As long as our state, the element counter, is less than maxSize, we emit the stream element downstream, increase the counter by one, and return true. As soon as the desired number of elements is reached, we return false to indicate that the stream should be terminated.

Incidentally, we could also write state.getAndIncrement() instead of state.get() in the if statement and omit the state.incrementAndGet(). But that would have been a little more complicated to explain.

To turn the initializer and the integrator into a gatherer, we use the Gatherer.ofSequential(...) method. The name of this method indicates that the returned gatherer cannot work in parallel; that is because it has state but no combiner.

Gatherer<T, AtomicInteger, T> gatherer = Gatherer.ofSequential(initializer, integrator);

The following listing shows a method that creates a limiting gatherer from the previously shown building blocks, and a method that uses this gatherer to return the first three words of the word list:

public <T> Gatherer<T, AtomicInteger, T> limiting(int maxSize) {
  return Gatherer.ofSequential(
      // Initializer
      AtomicInteger::new,
 
      // Integrator
      (state, element, downstream) -> {
        if (state.getAndIncrement() < maxSize) {
          downstream.push(element);
          return true;
        } else {
          return false;
        }
      });
}

public List<String> firstThreeWords(List<String> words) {
  return words.stream()
      .gather(limiting(3))
      .toList();
}

Stream Gatherers are so powerful that not only map(...) and limit(...) but every existing intermediate operation of the Stream API can also be implemented as a Gatherer.
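To illustrate this claim, even filter(...) can be expressed as a stateless gatherer with nothing but an integrator. This is a sketch of my own, not JDK code:

```java
import java.util.function.Predicate;
import java.util.stream.Gatherer;

// Sketch: filter(...) expressed as a stateless gatherer
public static <T> Gatherer<T, Void, T> filtering(Predicate<T> predicate) {
  return Gatherer.of(
      // Integrator: forwards only elements matching the predicate;
      // all other elements are silently dropped
      (state, element, downstream) -> {
        if (predicate.test(element)) {
          downstream.push(element);
        }
        return true;
      });
}
```

For example, `words.stream().gather(filtering(word -> word.length() > 2)).toList()` keeps only the words longer than two characters.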

Finisher

To explain what we need a finisher for, in this section, I will show you how to implement the “Fixed Window” gatherer demonstrated in the introductory chapter.

As a reminder, this gatherer should turn a stream of elements into a stream of lists, each of which contains a certain number of elements.

Let’s start with the implementation. At some point, we’ll realize that we’re not getting anywhere – and that’s when I’ll introduce the finisher.

First of all, our gatherer needs state. Since we want to group elements in lists, it makes sense to use such a list as the state. We implement the initializer accordingly:

Supplier<List<T>> initializer = ArrayList::new;

We implement the integrator as follows:

Integrator<List<T>, T, List<T>> integrator =
    (state, element, downstream) -> {
      state.add(element);
      if (state.size() == windowSize) {
        downstream.push(List.copyOf(state));
        state.clear();
      }
      return true;
    };

We append the incoming element to the state list. As soon as the list has reached the desired size, we send a copy of the list downstream and empty the list.

However, this only works if the number of elements is a multiple of the window size. For example, if we have eight elements and a window size of three, then two lists of three elements each would be sent downstream for the first six elements. The seventh and eighth elements would also be collected in the state list, but as that list has not yet reached the desired size, it would not be emitted downstream by the integrator.

This is where the finisher comes into play. The finisher receives the state after all stream elements have been processed, along with the downstream, and can then emit further elements downstream depending on the state.

For the fixed window operation, the finisher would look as follows:

BiConsumer<List<T>, Downstream<List<T>>> finisher =
    (state, downstream) -> {
      if (!state.isEmpty()) {
        downstream.push(List.copyOf(state));
      }
    };

If the list contains elements, it is sent downstream.

We combine the initializer, integrator, and finisher into a gatherer as follows:

Gatherer<T, List<T>, List<T>> gatherer =
    Gatherer.ofSequential(initializer, integrator, finisher);

The following listing shows a method that generates a window gatherer from the components shown above, as well as a method that uses this gatherer:

public <T> Gatherer<T, List<T>, List<T>> windowing(int windowSize) {
  return Gatherer.ofSequential(
      // Initializer
      ArrayList::new,

      // Integrator
      (state, element, downstream) -> {
        state.add(element);
        if (state.size() == windowSize) {
          downstream.push(List.copyOf(state));
          state.clear();
        }
        return true;
      },

      // Finisher
      (state, downstream) -> {
        if (!state.isEmpty()) {
          downstream.push(List.copyOf(state));
        }
      });
}

public List<String> groupWords(List<String> words, int groupSize) {
  return words.stream()
      .gather(windowing(groupSize))
      .toList();
}

In the next section, we come to the last stream gatherer component, the combiner.

Combiner

Executing a stateful gatherer in parallel requires a combiner function. A combiner merges two states into one in the join phase of parallel stream processing.

In this section, we want to implement a gatherer that emits the largest of all incoming elements, according to a given comparator, downstream.

As state, we use an AtomicReference, which contains either no element or the largest element found so far:

Supplier<AtomicReference<T>> initializer = AtomicReference::new;

The integrator saves the incoming element in the state if the state is empty or if the incoming element is larger than the element saved in the state:

Integrator<AtomicReference<T>, T, T> integrator =
    (state, element, downstream) -> {
      T bestElement = state.get();
      if (bestElement == null || comparator.compare(element, bestElement) > 0) {
        state.set(element);
      }
      return true;
    };

The finisher sends the element, if the state contains one, downstream:

BiConsumer<AtomicReference<T>, Downstream<T>> finisher =
    (state, downstream) -> {
      T bestElement = state.get();
      if (bestElement != null) {
        downstream.push(bestElement);
      }
    };

And the combiner merges two states into one:

  • If one state is empty, it returns the other state.
  • If both states contain an element, it returns the state with the larger element.

If the input stream is empty, the combiner is never called, i.e., the case in which both states are empty cannot occur.

BinaryOperator<AtomicReference<T>> combiner =
    (state1, state2) -> {
      T bestElement1 = state1.get();
      T bestElement2 = state2.get();

      if (bestElement1 == null) {
        return state2;
      } else if (bestElement2 == null) {
        return state1;
      } else if (comparator.compare(bestElement1, bestElement2) > 0) {
        return state1;
      } else {
        return state2;
      }
    };

We combine the initializer, integrator, combiner, and finisher with Gatherer.of(...) to form a Gatherer:

Gatherer<T, AtomicReference<T>, T> gatherer =
    Gatherer.of(initializer, integrator, combiner, finisher);

The following listing shows a method that generates a maximum gatherer from the building blocks shown above, as well as a method that uses this gatherer in a parallel stream to find the longest word in a list:

public <T> Gatherer<T, AtomicReference<T>, T> maximumBy(Comparator<T> comparator) {
  return Gatherer.of(
      // Initializer
      AtomicReference::new,

      // Integrator
      (state, element, downstream) -> {
        T bestElement = state.get();
        if (bestElement == null || comparator.compare(element, bestElement) > 0) {
          state.set(element);
        }
        return true;
      },

      // Combiner
      (state1, state2) -> {
        T bestElement1 = state1.get();
        T bestElement2 = state2.get();

        if (bestElement1 == null) {
          return state2;
        } else if (bestElement2 == null) {
          return state1;
        } else if (comparator.compare(bestElement1, bestElement2) > 0) {
          return state1;
        } else {
          return state2;
        }
      },

      // Finisher
      (state, downstream) -> {
        T bestElement = state.get();
        if (bestElement != null) {
          downstream.push(bestElement);
        }
      });
}

public Optional<String> getLongest(List<String> words) {
  return words.parallelStream()
      .gather(maximumBy(Comparator.comparing(String::length)))
      .findFirst();
}

We now have all the components of a stream gatherer together. However, we do not need to implement a custom gatherer for every purpose. The JDK developers have already done this for us for frequently requested intermediate transformations.

You can find out which gatherers are already available in the next chapter.

Stream Gatherers Available in the JDK

You can create the predefined gatherers in the JDK using the corresponding factory methods of the Gatherers class. You already got to know one gatherer in the introductory chapter: the “Fixed Window” gatherer.

Below, you will find an overview of the most important predefined gatherers:

  • Gatherers.fold(Supplier initial, BiFunction folder)
    combines all stream elements into a single element, similar to a collector. This is useful if a terminal operation is to be called on an element combined from the stream elements.
  • Gatherers.mapConcurrent(int maxConcurrency, Function mapper)
    executes the specified mapping function concurrently, in up to the specified number of virtual threads.
  • Gatherers.peek(Consumer effect)
    Gatherers.peekOrdered(Consumer effect)

    send each stream element to the Consumer before it is forwarded to the next stage of the stream pipeline. peekOrdered(...) ensures that a parallel stream is processed in the correct order.
  • Gatherers.scan(Supplier initial, BiFunction scanner)
    performs a so-called “prefix scan” – an incremental accumulation that emits the current intermediate result for each stream element.
  • Gatherers.windowFixed(int windowSize)
    Gatherers.windowSliding(int windowSize)

    group the stream elements into lists of the specified size. With the sliding variant, the lists overlap and are each shifted by one element, e.g., Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowSliding(3)).toList() generates the following list of lists: [[1, 2, 3], [2, 3, 4], [3, 4, 5]].
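To make fold and scan more tangible, here is a small sketch of my own contrasting the two, using the factory methods listed above:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// fold combines all elements into a single result element
var total = Stream.of(1, 2, 3, 4)
    .gather(Gatherers.fold(() -> 0, Integer::sum))
    .toList();    // total is [10]

// scan emits the running intermediate result for each element
var runningTotals = Stream.of(1, 2, 3, 4)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .toList();    // runningTotals is [1, 3, 6, 10]
```

Note that scan does not emit the initial value itself; the first emitted element is the result of combining the initial value with the first stream element.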

Combine Gatherers

Just as you can chain several filter(...) and map(...) operations, for example, you can also chain several gatherers, e.g., like this:

var result = source
    .gather(a)
    .gather(b)
    .gather(c)
    .collect(...);

If you regularly need a particular sequence of gatherers, you can also combine them into a single gatherer – in the spirit of DRY (don't repeat yourself) – like this, for example:

var abc = a.andThen(b).andThen(c);

var result = source
    .gather(abc)
    .collect(...);

That allows you to apply a transformation sequence of any length to different streams without redundancy.
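As a concrete sketch of my own, two predefined window gatherers can be composed this way: the first groups elements into pairs, and the second groups those pairs into lists of two pairs each:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Compose two fixed-window gatherers with andThen(...):
// the first emits pairs of elements, the second groups
// those pairs into lists of two pairs each.
var pairsOfPairs = Gatherers.<Integer>windowFixed(2)
    .andThen(Gatherers.windowFixed(2));

var result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
    .gather(pairsOfPairs)
    .toList();
// result is [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
```

The output type of the first gatherer (here: lists of integers) becomes the input type of the second, just as with chained gather(...) calls.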

Conclusion

With stream gatherers, we can implement arbitrary intermediate stream operations, just as we have always been able to write arbitrary terminal operations with collectors. That allows us to write much more expressive stream pipelines than before.

Using the Gatherers class, we can retrieve ready-made gatherers for various common intermediate operations.

Which is the first of the prefabricated gatherers that you will use? What functionality are you planning to implement yourself as a Gatherer? Write me a comment!

Do you want to be up to date on all new Java features? Then click here to sign up for the HappyCoders newsletter.