The Java Stream API was released with Java 8 in March 2014 and has given us a fundamentally new tool for processing data streams.
However, the limited set of intermediate operations – filter, map, flatMap, mapMulti, distinct, sorted, peek, limit, skip, takeWhile, and dropWhile – means that more complex data transformations cannot be expressed with the Stream API.
A look at the feature requests of the Java community shows that common intermediate operations such as window and fold, among many others, are missing.
Instead of implementing all these operations in the Stream
interface, the JDK developers decided to develop an API that can be used in the JDK itself to implement much sought-after intermediate operations and that developers can use to build their own operations.
This new API is called “Stream Gatherers” and was released as a preview feature via JDK Enhancement Proposal 461 in Java 22 in March 2024, precisely ten years after the introduction of the Stream API, and will be finalized by JEP 485 in Java 24.
In this article, you will find out
- what a gatherer is,
- how the new Stream Gatherers API works,
- how to implement any intermediate stream operations with it,
- which gatherers the JDK team has already implemented and how to create these gatherers.
Let’s start with a summary of how the Stream API works.
Stages of the Stream API
Java streams consist of three stages:
- Stream source – the source generates a stream, e.g., via IntStream.of(...) or Collection.stream().
- Intermediate operations – these transform the elements contained in the stream, e.g., the Stream methods map(...), filter(...), and limit(...).
- Terminal operations – these collect the elements in a list using toList(), for example, or in a map using collect(Collectors.toMap(...)), or count the elements using count().
Here is a simple example – a method that counts how many words of a certain length are contained in a list of words:
public long countLongWords(List<String> words, int minLength) {
return words.stream() // ⟵ Source
.map(String::length) // ⟵ Intermediate operation
.filter(length -> length >= minLength) // ⟵ Intermediate operation
.count(); // ⟵ Terminal operation
}
Terminal Operation: Stream Collector
And here is an example that converts the words into capital letters and saves them grouped by length in a map:
public Map<Integer, List<String>> groupByLength(List<String> words) {
return words.stream() // ⟵ Source
.map(String::toUpperCase) // ⟵ Intermediate operation
.collect(Collectors.groupingBy(String::length)); // ⟵ Terminal operation
}
In this second example, a so-called “collector” is passed to the terminal operation collect(...). A collector is an object of a class that implements the Collector interface and defines what should happen to the stream's elements when the stream terminates. In this case, they should be grouped by length and saved in a map.
Intermediate Operation: Stream Gatherer
Similarly, the Stream Gatherers API defines the Stream.gather(...) method and a Gatherer interface. The following code example uses the intermediate operation “Fixed Window,” which groups the words into lists of three words each:
public List<List<String>> groupsOfThree(List<String> words) {
return words.stream() // ⟵ Source
.gather(Gatherers.windowFixed(3)) // ⟵ Intermediate operation
.toList(); // ⟵ Terminal operation
}
For example, if we call this method as follows:
List<String> words = List.of("the", "be", "two", "of", "and", "a", "in", "that");
List<List<String>> groups = groupsOfThree(words);
System.out.println(groups);
Then the output is:
[[the, be, two], [of, and, a], [in, that]]
(Since the stream source provided a number of elements that is not divisible by three, the last group contains only two words.)
You can find out exactly how a stream gatherer is structured and how it works in the next chapter.
Structure of a Stream Gatherer
Before we look at the structure of a stream gatherer, it is essential to know two properties of gatherers:
- They can have a state, so they can transform elements differently depending on what happened before (I’ll show you why this is relevant with an example in a moment).
- They can terminate the stream prematurely, as limit(...) and takeWhile(...) do, for example.
Gatherers are made up of up to four components:
- An optional “initializer” that initializes the state mentioned above.
- An “integrator” that processes each stream element (taking the current state into account if necessary), updates the state if necessary, forwards elements to the next stage of the stream pipeline, and terminates the stream prematurely if necessary.
- An optional “finisher” that is called after the last element has been processed in order to emit further elements to the next stage of the stream pipeline based on the state.
- And an optional “combiner,” which is used in the parallel processing of a stream to combine the states of transformations executed in parallel.
In the following sections, we look at the components one by one and with many examples.
Integrator
The integrator is the only component that is absolutely necessary. With just one integrator, we can already develop a simple, stateless gatherer.
In the following, I will show you how to implement the Stream.map(...) function with a gatherer.
The interfaces shown have additional static or default methods that are unimportant for a basic understanding. I therefore omit them and write three dots in place of the omitted methods.
Integrator is a functional interface with an integrate(...) method:
@FunctionalInterface
public interface Integrator<A, T, R> {
boolean integrate(A state, T element, Downstream<? super R> downstream);
. . .
}
Downstream is a functional interface with a push(...) method that forwards an element to the next stage of the stream pipeline:
@FunctionalInterface
public interface Downstream<T> {
boolean push(T element);
. . .
}
We can write an integrator that calls a mapping function and emits the result of the mapping function downstream (i.e., forwards it to the next processing stage of the stream) as a lambda expression as follows. As we do not need a state here, we use Void as the type of the state parameter:
Function<T, R> mapper = . . .
Integrator<Void, T, R> integrator =
(state, element, downstream) -> {
R mappedElement = mapper.apply(element);
return downstream.push(mappedElement);
};
In the first line of the integrator, the mapping function is applied to the incoming element from the upstream. In the second line, the result of the mapping function is emitted downstream, and the downstream's response is returned. This response is a boolean indicating whether the downstream accepts more elements. For instance, the response would be false if there were a limit() in the pipeline whose maximum has been reached.
Since the integrator shown above never returns false on its own, but only passes on the downstream's response, it is referred to as “greedy.” To indicate this to the stream pipeline and thus enable optimizations, we should wrap the integrator with Integrator.ofGreedy(...):
Integrator<Void, T, R> integrator =
Integrator.ofGreedy(
(state, element, downstream) -> {
R mappedElement = mapper.apply(element);
return downstream.push(mappedElement);
});
To ultimately turn the integrator into a gatherer, we use the static Gatherer.of(...) method:
Gatherer<T, Void, R> gatherer = Gatherer.of(integrator);
Here is a complete example with a method that creates a gatherer for a specific mapping function and a method that uses such a gatherer to map a list of strings to their lengths:
public <T, R> Gatherer<T, Void, R> mapping(Function<T, R> mapper) {
return Gatherer.of(
Integrator.ofGreedy(
(state, element, downstream) -> {
R mappedElement = mapper.apply(element);
return downstream.push(mappedElement);
}));
}
public List<Integer> toLengths(List<String> words) {
return words.stream()
.gather(mapping(String::length))
.toList();
}
That explains the basic concept of a Gatherer.
Initializer
The gatherer of the previous section was stateless, i.e., the transformation of an element was independent of everything that happened before.
In this section, I will show you how to implement the Stream.limit(...) function with a gatherer. To do this, the gatherer must count the processed elements and terminate the stream prematurely once the required number of elements has been reached.
The initializer is of type Supplier and returns the initial state. As we want to count elements, an AtomicInteger is a suitable state:
Supplier<AtomicInteger> initializer = AtomicInteger::new;
We implement the limiting integrator as follows:
int maxSize = . . .
Integrator<AtomicInteger, T, T> integrator =
(state, element, downstream) -> {
if (state.get() < maxSize) {
boolean result = downstream.push(element);
state.incrementAndGet();
return result;
} else {
return false;
}
};
As long as our state, the element counter, is less than maxSize, we emit the stream elements downstream, increase the counter by one, and return the downstream's response boolean. As soon as the desired number of elements is reached, we return false to indicate that the stream should be terminated.
Incidentally, we could also write state.getAndIncrement() instead of state.get() in the if statement and omit the state.incrementAndGet(). But that would have been a little more complicated to explain.
Note that we do not wrap this integrator with Integrator.ofGreedy(...), as this integrator can also return false on its own (i.e., not only when this value comes from the downstream).
To turn the initializer and the integrator into a gatherer, we use the Gatherer.ofSequential(...) method. The name of this method indicates that the returned gatherer cannot work in parallel. That is because it has a state but no combiner (more on that later).
Gatherer<T, AtomicInteger, T> gatherer = Gatherer.ofSequential(initializer, integrator);
The following listing shows a method that creates a limiting gatherer from the previously shown building blocks – though this time using state.getAndIncrement() – and a method that uses this gatherer to return the first three words of the word list:
public <T> Gatherer<T, AtomicInteger, T> limiting(int maxSize) {
return Gatherer.ofSequential(
// Initializer
AtomicInteger::new,
// Integrator
(state, element, downstream) -> {
if (state.getAndIncrement() < maxSize) {
return downstream.push(element);
} else {
return false;
}
});
}
public List<String> firstThreeWords(List<String> words) {
return words.stream()
.gather(limiting(3))
.toList();
}
Stream gatherers are so powerful that not only map(...) and limit(...), but every existing intermediate operation of the Stream API can be implemented as a gatherer.
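To make this claim concrete, here is a minimal sketch of how one such operation, Stream.filter(...), could be reproduced with the building blocks shown so far. The class and method names are my own, not part of the JDK; only Gatherer.of(...), Integrator.ofGreedy(...), and Downstream.push(...) come from the Stream Gatherers API:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;

public class FilteringGatherer {

    // A stateless, greedy gatherer that replicates Stream.filter(...)
    public static <T> Gatherer<T, Void, T> filtering(Predicate<T> predicate) {
        return Gatherer.of(
                Integrator.ofGreedy(
                        (state, element, downstream) ->
                                // Push matching elements downstream; for all other
                                // elements, return true to signal that we still
                                // accept more input.
                                !predicate.test(element) || downstream.push(element)));
    }

    public static void main(String[] args) {
        List<String> shortWords =
                List.of("the", "be", "banana", "of", "elephant").stream()
                        .gather(filtering(word -> word.length() <= 3))
                        .toList();
        System.out.println(shortWords); // [the, be, of]
    }
}
```

The integrator is greedy because it never returns false on its own: for a non-matching element it simply returns true without pushing anything downstream.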
Finisher
To explain what we need a finisher for, in this section, I will show you how to implement the “Fixed Window” gatherer demonstrated in the introductory chapter.
As a reminder, this gatherer should turn a stream of elements into a stream of lists, each of which contains a certain number of elements.
Let’s start with the implementation. At some point, we’ll realize that we’re not getting anywhere – and that’s when I’ll introduce the finisher.
First of all, our gatherer needs a state. Since we want to group elements in lists, it makes sense to use such a list as the state. We implement the initializer accordingly:
Supplier<List<T>> initializer = ArrayList::new;
We implement the integrator as follows:
Integrator<List<T>, T, List<T>> integrator =
(state, element, downstream) -> {
state.add(element);
if (state.size() == windowSize) {
boolean result = downstream.push(List.copyOf(state));
state.clear();
return result;
} else {
return true;
}
};
We append the incoming element to the state list. As soon as the list has reached the desired size, we send a copy of the list downstream and empty it.
However, this only works if the number of elements is a multiple of the window size. For example, if we have eight elements and a window size of three, then two lists of three elements each would be sent downstream for the first six elements. The seventh and eighth elements would also be in a list, but as this list has not yet reached the desired size, it has not been emitted downstream by the integrator.
This is where the finisher comes into play. After all stream elements have been processed, the finisher receives the state and the downstream as input and can then emit further elements into the downstream depending on the state.
For the fixed window operation, the finisher would look as follows:
BiConsumer<List<T>, Downstream<List<T>>> finisher =
(state, downstream) -> {
if (!state.isEmpty()) {
downstream.push(List.copyOf(state));
}
};
If the list still contains elements, a copy of it is sent downstream.
We combine the initializer, integrator, and finisher into a gatherer as follows:
Gatherer<T, List<T>, List<T>> gatherer =
Gatherer.ofSequential(initializer, integrator, finisher);
The following listing shows a method that generates a window gatherer from the components shown above, as well as a method that uses this gatherer:
public <T> Gatherer<T, List<T>, List<T>> windowing(int windowSize) {
return Gatherer.ofSequential(
// Initializer
ArrayList::new,
// Integrator
(state, element, downstream) -> {
state.add(element);
if (state.size() == windowSize) {
boolean result = downstream.push(List.copyOf(state));
state.clear();
return result;
} else {
return true;
}
},
// Finisher
(state, downstream) -> {
if (!state.isEmpty()) {
downstream.push(List.copyOf(state));
}
});
}
public List<List<String>> groupWords(List<String> words, int groupSize) {
return words.stream()
.gather(windowing(groupSize))
.toList();
}
For simplicity, I used an ArrayList as the state object in the previous example. However, creating copies and clearing the list represents a significant overhead.
The following solution uses a wrapper object as the state, which contains a list that is pushed directly into the downstream and then recreated. This variant is about 20% faster:
public <T> Gatherer<T, ?, List<T>> windowing(int windowSize) {
return Gatherer.ofSequential(
// Initializer
() -> new Object() { ArrayList<T> list = new ArrayList<>(); },
// Integrator
(state, element, downstream) -> {
state.list.add(element);
if (state.list.size() == windowSize) {
boolean result = downstream.push(state.list);
state.list = new ArrayList<>();
return result;
} else {
return true;
}
},
// Finisher
(state, downstream) -> {
if (!state.list.isEmpty()) {
downstream.push(List.copyOf(state.list));
}
});
}
In the next section, we come to the last stream gatherer component, the combiner.
Combiner
Executing a stateful gatherer in parallel requires a combiner function. A combiner merges two states into one in the join phase of parallel stream processing.
In this section, we want to implement a gatherer that determines the largest of all incoming elements, according to a given comparator, and emits it downstream.
As the state, we use an AtomicReference, which either contains no element or the largest element found so far:
Supplier<AtomicReference<T>> initializer = AtomicReference::new;
The integrator saves the incoming element in the state if the state is empty or if the incoming element is larger than the element saved in the state. As the integrator always returns true, we mark it as “greedy”:
Integrator<AtomicReference<T>, T, T> integrator =
Integrator.ofGreedy(
(state, element, downstream) -> {
T bestElement = state.get();
if (bestElement == null || comparator.compare(element, bestElement) > 0) {
state.set(element);
}
return true;
});
If the state contains an element, the finisher sends it to the downstream:
BiConsumer<AtomicReference<T>, Downstream<T>> finisher =
(state, downstream) -> {
T bestElement = state.get();
if (bestElement != null) {
downstream.push(bestElement);
}
};
And the combiner merges two states into one:
- If one state is empty, it returns the other state.
- If both states contain an element, it returns the state with the larger element.
If the input stream is empty, the combiner is never called, i.e., the case in which both states are empty cannot occur.
BinaryOperator<AtomicReference<T>> combiner =
(state1, state2) -> {
T bestElement1 = state1.get();
T bestElement2 = state2.get();
if (bestElement1 == null) {
return state2;
} else if (bestElement2 == null) {
return state1;
} else if (comparator.compare(bestElement1, bestElement2) > 0) {
return state1;
} else {
return state2;
}
};
We combine the initializer, integrator, combiner, and finisher into a gatherer using Gatherer.of(...):
Gatherer<T, AtomicReference<T>, T> gatherer =
Gatherer.of(initializer, integrator, combiner, finisher);
The following listing shows a method that generates a maximum gatherer from the building blocks shown above, as well as a method that uses this gatherer in a parallel stream to find the longest word in a list:
public <T> Gatherer<T, AtomicReference<T>, T> maximumBy(Comparator<T> comparator) {
return Gatherer.of(
// Initializer
AtomicReference::new,
// Integrator
Integrator.ofGreedy(
(state, element, downstream) -> {
T bestElement = state.get();
if (bestElement == null || comparator.compare(element, bestElement) > 0) {
state.set(element);
}
return true;
}),
// Combiner
(state1, state2) -> {
T bestElement1 = state1.get();
T bestElement2 = state2.get();
if (bestElement1 == null) {
return state2;
} else if (bestElement2 == null) {
return state1;
} else if (comparator.compare(bestElement1, bestElement2) > 0) {
return state1;
} else {
return state2;
}
},
// Finisher
(state, downstream) -> {
T bestElement = state.get();
if (bestElement != null) {
downstream.push(bestElement);
}
});
}
public Optional<String> getLongest(List<String> words) {
return words.parallelStream()
.gather(maximumBy(Comparator.comparing(String::length)))
.findFirst();
}
We now have all the components of a stream gatherer together. However, we do not need to implement a custom gatherer for every purpose. The JDK developers have already done this for us for frequently requested intermediate transformations.
You can find out which gatherers are already available in the next chapter.
Stream Gatherers Available in the JDK
You can create the predefined gatherers of the JDK using the corresponding factory methods of the Gatherers class. You already got to know one of them in the introductory chapter: the “Fixed Window” gatherer.
Below, you will find an overview of the most important predefined gatherers:
- Gatherers.fold(Supplier initial, BiFunction folder) combines all stream elements into a single element, similar to a collector. This is useful if a terminal operation is to be called on an element combined from the stream elements.
- Gatherers.mapConcurrent(int maxConcurrency, Function mapper) executes the specified mapping function concurrently in up to the specified number of virtual threads.
- Gatherers.peek(Consumer effect) and Gatherers.peekOrdered(Consumer effect) send each stream element to the Consumer before it is forwarded to the next stage of the stream pipeline. peekOrdered(...) ensures that the elements of a parallel stream are processed in the correct order.
- Gatherers.scan(Supplier initial, BiFunction scanner) performs a so-called prefix scan, i.e., it emits a running intermediate result for each stream element.
- Gatherers.windowFixed(int windowSize) and Gatherers.windowSliding(int windowSize) group the stream elements into lists of the specified size. With the sliding variant, the lists overlap and are each shifted by one element, e.g., Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowSliding(3)).toList() generates the following list of lists: [[1, 2, 3], [2, 3, 4], [3, 4, 5]].
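Since “prefix scan” may be unfamiliar, here is a small demo of Gatherers.scan(...) and Gatherers.fold(...) in action (the class name ScanFoldDemo is my own; the factory methods are the ones listed above):

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ScanFoldDemo {
    public static void main(String[] args) {
        // scan emits the running result after each element (here: prefix sums)
        List<Integer> runningTotals = Stream.of(1, 2, 3, 4)
                .gather(Gatherers.scan(() -> 0, Integer::sum))
                .toList();
        System.out.println(runningTotals); // [1, 3, 6, 10]

        // fold emits a single element combining all inputs once the stream ends
        List<String> folded = Stream.of("a", "b", "c")
                .gather(Gatherers.fold(() -> "", String::concat))
                .toList();
        System.out.println(folded); // [abc]
    }
}
```

Note that scan does not emit the initial value itself, only the results of incorporating each element, and that fold, unlike a collector, stays an intermediate operation: its single result flows on to the rest of the pipeline.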
Combine Gatherers
Just as you can chain several filter(...) and map(...) operations in succession, for example, you can also apply several gatherers in succession, e.g., like this:
var result = source
.gather(a)
.gather(b)
.gather(c)
.collect(...);
If you regularly need a particular sequence of gatherers, you can also combine them into a single gatherer – in the spirit of DRY (don't repeat yourself) – like this, for example:
Gatherer abc = a.andThen(b).andThen(c);
var result = source
.gather(abc)
.collect(...);
That allows you to apply a transformation sequence of any length to different streams without redundancy.
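As a concrete illustration (my own example, not from the article's abstract a/b/c pipeline), two predefined fixed-window gatherers can be composed with andThen(...) into a single gatherer that produces windows of windows:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ComposedGatherers {
    public static void main(String[] args) {
        // Compose two gatherers: first windows of 3, then pairs of those windows
        Gatherer<Integer, ?, List<List<Integer>>> windowsOfWindows =
                Gatherers.<Integer>windowFixed(3)
                        .andThen(Gatherers.windowFixed(2));

        List<List<List<Integer>>> result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                .gather(windowsOfWindows)
                .toList();
        System.out.println(result); // [[[1, 2, 3], [4, 5, 6]], [[7, 8]]]
    }
}
```

The composed gatherer behaves exactly like calling gather(...) twice; the explicit type witness <Integer> merely helps the compiler infer the generic types of the composition.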
Limitations of Stream Gatherers
Stream gatherers, while powerful tools, do have two significant limitations:
- Like collectors, they are unavailable for the primitive streams IntStream, LongStream, and DoubleStream.
- Just like collectors, they do not have access to the stream characteristics (the ones defined in the Spliterator interface). This means that they cannot be optimized based on these characteristics (e.g., the fact that the size is known or that the stream only contains distinct elements).
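The first limitation has a straightforward (if not free) workaround: box the primitive stream into an object stream before gathering. A small sketch (the class name is my own):

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class PrimitiveStreamWorkaround {
    public static void main(String[] args) {
        // IntStream has no gather(...) method; boxed() converts it to a
        // Stream<Integer>, on which gatherers are available.
        List<List<Integer>> windows = IntStream.rangeClosed(1, 6)
                .boxed()
                .gather(Gatherers.windowFixed(2))
                .toList();
        System.out.println(windows); // [[1, 2], [3, 4], [5, 6]]
    }
}
```

The price is the boxing of every element, so this detour is best avoided in hot paths.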
Conclusion
With stream gatherers, we can implement any intermediate stream operation, just as we have always been able to write any terminal operation with collectors. That allows us to write much more meaningful stream pipelines than before.
Using the Gatherers class, we can retrieve ready-made gatherers for various intermediate operations.
Which of the prefabricated gatherers will you use first? What functionality are you planning to implement yourself as a gatherer? Write me a comment!