
Stream Gatherers in the Java Stream API

Published: March 5, 2026  •  java

Java streams have been around since Java 8, and they have changed how we process data in collections and pipelines.

What is a stream? A stream is a sequence of elements that can be processed in a functional style. You can filter, map, reduce, and collect data in a fluent way. A stream consists of three parts: a source, zero or more intermediate operations, and a terminal operation.

In this example, we have a stream of strings as the source, two intermediate operations (filter and map), and a terminal operation (toList).

    List<String> result = Stream.of("the", "", "fox", "jumps")
          .filter(s -> !s.isBlank())      // intermediate
          .map(String::toUpperCase)       // intermediate
          .toList();                      // terminal
    System.out.println(result);
    // [THE, FOX, JUMPS]

Main.java

Custom terminal operations

The standard library provides a set of built-in terminal operations like toList, count, findFirst, etc. These cover many common use cases, but sometimes you need something more specific. The Stream API makes it easy to write custom terminal operations by implementing the Collector interface and then passing it to the collect method.

A collector is defined by four components: a supplier that creates a new result container, an accumulator that adds each element to that container, a combiner that merges two containers (for parallel processing), and a finisher that transforms the container into the final result. In practice, the supplier, accumulator, and combiner are the core construction pieces, while the finisher is often the identity function when the accumulation type already matches the result type.

    Collector<String, ?, List<String>> toUpperCaseList = Collector.of(
        // Supplier
        ArrayList::new,
        // Accumulator
        (list, s) -> {
          if (!s.isBlank()) {
            list.add(s.toUpperCase());
          }
        },
        // Combiner
        (left, right) -> {
          left.addAll(right);
          return left;
        });

    List<String> result = Stream.of("the", "", "fox", "jumps")
        .collect(toUpperCaseList);

    System.out.println(result);
    // [THE, FOX, JUMPS]

Main.java

Custom intermediate operations

While there is an easy way to write custom terminal operations, there was no clean way to write custom intermediate operations. The JDK provides a rich set of built-in intermediate operations, but sometimes you need something that is not covered by the standard library. You had to either drop out of the stream and write an imperative loop, or use some awkward hack to fit your logic into the existing operators.

Fortunately, this gap has been filled in Java 24 with the introduction of Stream Gatherers (JEP 485).

Stream Gatherers add a new gather intermediate operation to the Stream API.

<R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer)

This works similarly to the collect method for terminal operations, but instead of a Collector implementation, you provide a Gatherer implementation that defines how to process each element and emit results downstream.

Built-in gatherers

Before we dive into writing custom gatherers, let's look at all the built-in gatherers that Java 24 provides in java.util.stream.Gatherers:

windowFixed(int windowSize)

Groups elements into fixed-size batches (windows) and emits each batch as a list. The last batch may be smaller if there are not enough elements. The parameter specifies the size of each window.

    List<List<Integer>> fixed = Stream.of(1, 2, 3, 4, 5, 6, 7)
        .gather(Gatherers.windowFixed(3))
        .toList();
    System.out.println(fixed);
    // [[1, 2, 3], [4, 5, 6], [7]]

Main.java

This is a common pattern for processing data in chunks, especially when dealing with APIs that have batch limits or when you want to reduce overhead by processing multiple items at once.
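For instance, here is a sketch of preparing batches for a size-limited API. The batch size of 4 and the idea of handing each list to a client method are assumptions for illustration; the windowFixed call itself is the standard JDK gatherer.

```java
    List<List<Integer>> batches = IntStream.rangeClosed(1, 10).boxed()
        .gather(Gatherers.windowFixed(4))
        .toList();

    // Each batch could now be handed to a batch-limited API client,
    // e.g. batches.forEach(client::sendBatch)
    System.out.println(batches);
    // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
```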


windowSliding(int windowSize)

This is similar to the fixed window, but instead of non-overlapping batches, it produces overlapping windows by sliding one element at a time.

    List<List<Integer>> sliding = Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.windowSliding(3))
        .toList();
    System.out.println(sliding);
    // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

Main.java

This produces n - size + 1 windows for a finite stream of size n (when n >= size). It is ideal for adjacent-pattern analytics where overlap is required, such as moving averages, anomaly detection, or pair/triplet comparisons.
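As a quick illustration of the moving-average use case (the price values here are made up), each sliding window of three elements is reduced to its average:

```java
    List<Double> prices = List.of(10.0, 12.0, 11.0, 13.0, 15.0);

    List<Double> movingAverage = prices.stream()
        .gather(Gatherers.windowSliding(3))
        // average each 3-element window
        .map(w -> w.stream().mapToDouble(Double::doubleValue).average().orElseThrow())
        .toList();
    System.out.println(movingAverage);
    // [11.0, 12.0, 13.0]
```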


fold(Supplier<R> initial, BiFunction<R,T,R> folder)

Accumulates many input elements into one result and emits it at the end. The initial supplier provides the starting state, and the folder function defines how to combine each element with the accumulated state.

This example sums a stream of integers.

    Optional<Integer> folded = Stream.of(1, 2, 3, 4)
        .gather(Gatherers.fold(() -> 0, Integer::sum))
        .findFirst();
    System.out.println(folded);
    // Optional[10]

Main.java

This is similar to the reducing terminal operation, but expressed as a gatherer. Because this is an intermediate operation, you can still chain more stream operations after it.
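For example, because fold is an intermediate operation, the single element it emits can be transformed further in the same pipeline:

```java
    List<String> summary = Stream.of(1, 2, 3, 4)
        .gather(Gatherers.fold(() -> 0, Integer::sum))
        // fold emitted one element (the total); keep transforming it
        .map(total -> "total=" + total)
        .toList();
    System.out.println(summary);
    // [total=10]
```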


scan(Supplier<R> initial, BiFunction<R,T,R> scanner)

This operation is related to fold, but instead of emitting only the final result, it emits the intermediate accumulated state after processing each input element. This is often called a "running fold" or "prefix scan".

Like the fold method, the initial supplier provides the starting state, and the scanner function defines how to combine each element with the accumulated state.

As in the fold example, this will sum a stream of integers, but it will emit the running total after each element.

    List<Integer> scanned = Stream.of(1, 2, 3, 4)
        .gather(Gatherers.scan(() -> 0, Integer::sum))
        .toList();
    System.out.println(scanned);
    // [1, 3, 6, 10]

Main.java

This can be useful for timelines, progressive metrics, and running balances where you want to see the evolution of the accumulated state over time, rather than just the final result.
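A small running-balance sketch along those lines (the Transaction record and the sample amounts are illustrative, not from any real API):

```java
    record Transaction(String description, int amount) {}

    List<Integer> balances = Stream.of(
            new Transaction("deposit", 100),
            new Transaction("coffee", -4),
            new Transaction("salary", 2500))
        // emit the account balance after every transaction
        .gather(Gatherers.scan(() -> 0, (balance, tx) -> balance + tx.amount()))
        .toList();
    System.out.println(balances);
    // [100, 96, 2596]
```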


mapConcurrent(int maxConcurrency, Function<T,R> mapper)

This is similar to the map intermediate operation, but it allows you to process elements concurrently with a specified maximum concurrency level. It uses virtual threads under the hood to achieve this.

The parameter maxConcurrency limits the number of concurrent tasks, and the mapper function defines how to transform each element.

    List<ProductImage> images = IntStream.rangeClosed(1, 10)
        .mapToObj(i -> new ProductImage(
            URI.create("https://cdn.example.com/products/" + i + ".jpg"), i))
        .toList();

    List<String> results = images.stream().gather(Gatherers.mapConcurrent(
        4, // Max parallel tasks
        image -> { // Mapper
          System.out.printf("Processing %s%n", image.productId());
          try {
            return image.processImage();
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        })).toList();
    results.forEach(System.out::println);

Main.java

This can be useful for latency-heavy mapping operations, such as I/O-bound tasks or expensive computations, where you want to take advantage of concurrency without overwhelming the system. The method propagates cancellation/short-circuiting signals, so it can avoid unnecessary work when downstream operations signal that they are done (e.g., findFirst, limit). It preserves encounter order in the resulting stream, so the output order matches the input order, even though processing happens concurrently.
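A minimal sketch of the ordering guarantee: the artificial delays below make later input elements finish first, yet the output still follows the input order. The sleep durations are arbitrary values chosen for the demonstration.

```java
    List<Integer> ordered = Stream.of(5, 4, 3, 2, 1)
        .gather(Gatherers.mapConcurrent(5, n -> {
          try {
            // earlier elements sleep longer, so they complete last
            Thread.sleep(n * 20L);
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          return n * 10;
        }))
        .toList();
    System.out.println(ordered);
    // [50, 40, 30, 20, 10]
```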

How to implement your own gatherers

A Gatherer consists of up to four building blocks.

Initializer (optional)
Creates private mutable state.

Integrator (required)
Consumes each input element and can emit output downstream to the next intermediate or terminal operation. This is where the main logic of the gatherer lives.

Combiner (optional)
Merges states for parallel processing. This is only needed if you want your gatherer to be parallel-capable and your state has safe merge semantics. If you omit the combiner, the gatherer can still be used in parallel pipelines, but the gatherer stage itself will be effectively sequential.

Finisher (optional)
Emits final output after input ends. This is the place to flush any buffered state that may not have been emitted during integration. For example, the windowFixed gatherer uses the finisher to emit the last partial window when the stream ends.

Only the integrator is required, and you can choose to implement just that for simple cases. The other components are there to support more complex state management, parallel execution, and end-of-stream behavior.


Stateless: Integrator-only gatherer

A simple gatherer that only implements the integrator can be created with Gatherer.ofSequential(Gatherer.Integrator<Void,T,R> integrator).

An integrator receives the current state, the next input element, and a downstream emitter. It can do something with the input element and then decide whether to push results downstream or not. The integrator can also push multiple results downstream for a single input element, or it can choose to emit nothing for some inputs.

An integrator returns a boolean indicating whether it wants to continue consuming more input. If it returns false, it signals that it wants to stop consuming more input (short-circuiting). If it returns true, it indicates that it is ready to consume the next input element.

In this example, we create a simple gatherer that doubles each input value and pushes it downstream without maintaining any state. Since we don't need to keep track of any state, we can use Void for the state type and ignore it. The push method returns a boolean indicating whether the downstream is still accepting input. It is recommended to propagate that signal back up by returning the result of push from the integrator, so that if the downstream signals that it is done (e.g., due to a short-circuiting operation), the integrator will also stop consuming more input.

    Gatherer<Integer, Void, Integer> doubleValuesNonGreedy = Gatherer.ofSequential(
        (state, element, downstream) -> downstream.push(element * 2));

    List<Integer> result = Stream.of(1, 2, 3).gather(doubleValuesNonGreedy).toList();
    System.out.println(result);
    // [2, 4, 6]

Main.java

Because this integrator consumes all input elements and does not short-circuit on its own, it is considered a greedy integrator. If we want to make the intent explicit, we can use Gatherer.Integrator.ofGreedy(...) to declare that this integrator does not short-circuit on its own and will keep consuming input as long as the downstream is accepting it.

    Gatherer<Integer, Void, Integer> doubleValues = Gatherer.ofSequential(
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream
            .push(element * 2)));

There is no functional difference between the two, but using ofGreedy can clarify intent for readers and for the stream implementation, as it explicitly states that the integrator will not stop on its own and only propagates downstream rejection. This can help the internal stream runtime choose the correct control-flow behavior for the stage.
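For contrast, here is a sketch of an integrator that genuinely short-circuits: it emits elements up to and including the first negative value, then returns false to stop consuming input. Such a gatherer must not be declared greedy.

```java
    Gatherer<Integer, Void, Integer> untilNegative = Gatherer.ofSequential(
        (state, element, downstream) -> {
          boolean accepted = downstream.push(element);
          // stop after the first negative element, or when downstream rejects
          return accepted && element >= 0;
        });

    List<Integer> shortCircuited = Stream.of(3, 7, -1, 9, 4)
        .gather(untilNegative)
        .toList();
    System.out.println(shortCircuited);
    // [3, 7, -1]
```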


Stateless: Integrator with Finisher

The ofSequential method also allows you to provide a finisher function that will be called when the stream ends. This is useful for emitting any final output that may be needed after all input has been processed. For example, if you want to emit a final "END" string after processing all input elements, you can implement the finisher to push that final output downstream. The finisher receives the final state (which is Void in this case) and the downstream emitter, and it can push any final results downstream.

    Gatherer<Integer, Void, String> labelValues = Gatherer.ofSequential(
        Gatherer.Integrator
            .ofGreedy((state, element, downstream) -> downstream.push("value=" + element)),
        (state, downstream) -> {
          downstream.push("END");
        });

    List<String> result = Stream.of(10, 20, 30).gather(labelValues).toList();
    List<String> noData = Stream.<Integer>empty().gather(labelValues).toList();
    System.out.println(result);
    // [value=10, value=20, value=30, END]
    System.out.println(noData);
    // [END] (no input elements)

Main.java


Stateful: Integrator with Initializer

The next level of complexity is when we need to maintain some state across input elements.

This is an example of a gatherer that emits the running sum of a stream of integers. To keep track of the running total, it needs to store the accumulated sum somewhere, and that is what the state is for. We can define a simple State class that has a single field, runningTotal, to keep track of the accumulated sum. You could also use a mutable wrapper like AtomicInteger or an array of size 1. It only needs to be mutable.

In this Gatherer implementation, we need to define the initializer and the integrator. The initializer is responsible for creating a new instance of the state. Note that you don't pass an instance of the state to the ofSequential method, but rather an implementation of the Supplier functional interface that creates a new instance of the state when called. The stream runtime will call this supplier to create a new state instance for each stream execution.

The integrator receives the current state as a parameter, and it can update the state with each input element. In this example, the integrator adds the current element to the runningTotal field of the state, and then pushes the updated running total downstream. Since this integrator also consumes all input elements and does not short-circuit on its own, we can declare it as a greedy integrator.

    class State {
      int runningTotal;
    }

    Gatherer<Integer, State, Integer> runningSumGreedy = Gatherer.ofSequential(
        State::new,
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
          state.runningTotal += element;
          return downstream.push(state.runningTotal);
        }));

    List<Integer> result = Stream.of(1, 2, 3, 4).gather(runningSumGreedy).toList();
    System.out.println(result);
    // [1, 3, 6, 10]

Main.java


Stateful: Integrator, Initializer, and Finisher

The last Gatherer implementation you can create with the ofSequential method is one that has an initializer, an integrator, and a finisher.

In this example, we create a gatherer that emits the sum of pairs of integers from the input stream. The state keeps track of a pending integer that has not yet been paired.

The initializer creates a new instance of the State class.

The integrator checks if there is a pending integer; if there isn't, it stores the current element as pending and does not push anything to the downstream. If there is a pending integer, it sums it with the current element, pushes the sum downstream, and clears the pending state.

The finisher checks if there is still a pending integer when the stream ends, and if so, it pushes that pending integer downstream as well.

    class State {
      Integer pending;
    }

    Gatherer<Integer, State, Integer> pairSums = Gatherer.ofSequential(State::new,
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
          if (state.pending == null) {
            state.pending = element;
            return true;
          }
          int sum = state.pending + element;
          state.pending = null;
          return downstream.push(sum);
        }), (state, downstream) -> {
          if (state.pending != null) {
            downstream.push(state.pending);
          }
        });

    List<Integer> result = Stream.of(1, 2, 3, 4, 5).gather(pairSums).toList();
    System.out.println(result);
    // [3, 7, 5]

Main.java

Parallel-capable gatherers with Combiner

If we want to implement a stateful gatherer that can be used in parallel streams, we need to use the Gatherer.of(...) factory method that allows us to provide a combiner function. The combiner is responsible for merging two state instances together when the stream is processed in parallel. This is necessary because in a parallel stream, the input elements may be processed in different threads, and each thread will have its own instance of the state. The combiner defines how to merge those state instances together to produce a correct final result.

In this example, we create a gatherer that computes the weighted average of a stream of StudentGrade objects, where each grade has an associated number of credit hours that serves as the weight. The state keeps track of the weighted sum of the grades and the total weight (credit hours) so far.

The initializer creates a new instance of the WeightedAverageState class, which has two fields: weightedSum and totalWeight. Because we will run this gatherer in parallel, the initializer will be called multiple times to create separate state instances for each thread.

The integrator updates the weightedSum and totalWeight fields of the state for each input element. It calculates the weight for the current element using the provided weightFunction, and then updates the weighted sum and total weight accordingly. Since this integrator consumes all input elements and does not short-circuit on its own, we can declare it as a greedy integrator. This example also shows you that an integrator does not necessarily need to push data downstream.

The combiner merges two WeightedAverageState instances by summing their weightedSum and totalWeight fields. Each thread will have its own instance of the WeightedAverageState class, and when the parallel stream processing is done, the combiner will be called to merge those instances together to produce a single state that represents the combined results of all threads.

The finisher receives the merged state from the combiner and emits the final weighted average by dividing the weightedSum by the totalWeight.

  class WeightedAverageState {
    double weightedSum = 0.0;
    double totalWeight = 0.0;
  }

  record StudentGrade(double grade, double creditHours) {
  }

  <TR extends StudentGrade> Gatherer<TR, WeightedAverageState, Double> weightedAverage(
      Function<TR, Double> weightFunction) {

    return Gatherer.of(
        /* Initializer */
        WeightedAverageState::new,
        /* Integrator */
        Gatherer.Integrator.ofGreedy((state, element, _) -> {
          double weight = weightFunction.apply(element);
          state.weightedSum += element.grade() * weight;
          state.totalWeight += weight;
          return true;
        }),
        /* Combiner */
        (leftState, rightState) -> {
          leftState.weightedSum += rightState.weightedSum;
          leftState.totalWeight += rightState.totalWeight;
          return leftState;
        },
        /* Finisher */
        (state, downstream) -> {
          if (state.totalWeight > 0) {
            double weightedAverage = state.weightedSum / state.totalWeight;
            downstream.push(weightedAverage);
          }
        });
  }

ParallelGathererDemo.java

    ParallelGathererDemo demo = new ParallelGathererDemo();
    Function<StudentGrade, Double> weightFunction = sg -> sg.creditHours();

    List<StudentGrade> grades = List.of(new StudentGrade(90, 3), new StudentGrade(80, 4),
        new StudentGrade(85, 2), new StudentGrade(70, 3), new StudentGrade(95, 1));

    double weightedAverage = grades.stream().parallel()
        .gather(demo.weightedAverage(weightFunction)).findFirst().get();
    System.out.println("Weighted Average: " + weightedAverage);
    // Weighted Average: 81.92307692307692

ParallelGathererDemo.java

Note that findFirst().get() will throw an exception if the stream is empty, so in a real application you would want to handle that case properly. You could use ifPresentOrElse to run a fallback action when the input source is empty.

    grades = List.of();
    grades.stream().parallel().gather(demo.weightedAverage(weightFunction)).findFirst()
        .ifPresentOrElse(wa -> {
          System.out.println("Weighted Average: " + wa);
        }, () -> {
          System.out.println("No data");
        });
    // No data

ParallelGathererDemo.java

Gatherer composition with andThen(...)

Gatherers can be composed directly, which is useful when you want to package multiple intermediate behaviors into one reusable unit. The andThen method allows you to chain gatherers together, where the output of one gatherer becomes the input of the next.

    Gatherer<Integer, ?, Integer> running = Gatherers.scan(() -> 0, Integer::sum);

    Gatherer<Integer, ?, String> asCsv = Gatherers.fold(() -> "",
        (acc, n) -> acc.isEmpty() ? n.toString() : acc + ";" + n);

    String out = Stream.of(1, 2, 3, 4).gather(running.andThen(asCsv)).findFirst().orElse("");
    System.out.println(out);
    // 1;3;6;10

Main.java

Custom Gatherers library

Before you start writing your own gatherers, take a look at this open-source library of custom gatherers: Gatherers4j. It provides a growing collection of reusable gatherers for common patterns that are not covered by the built-in gatherers in the JDK. You can use these gatherers as-is, or you can use them as inspiration for writing your own custom gatherers.

The coordinates for adding the library to your project are:

<dependency>
    <groupId>com.ginsberg</groupId>
    <artifactId>gatherers4j</artifactId>
    <version>...LATEST VERSION...</version>
</dependency>

Here is a simple example of one gatherer from the library that groups consecutive equal elements together into lists.

import java.util.List;
import java.util.stream.Stream;
import com.ginsberg.gatherers4j.Gatherers4j;

    List<List<String>> result = Stream.of("A", "A", "BB", "BB", "CCC", "A")
        .gather(Gatherers4j.group())
        .toList();

    System.out.println(result);
    // [[A, A], [BB, BB], [CCC], [A]]

Final take

Stream Gatherers (JEP 485) fill a gap in the Stream API. Introduced in Java 24, they allow you to write custom intermediate operations with state and complex logic that was previously difficult or impossible to express cleanly in the Stream API. The built-in gatherers in Gatherers cover common patterns like fixed/sliding windows, folds, scans, and concurrent mapping. For more complex scenarios, you can implement your own gatherers with custom state management, integration logic, parallel support, and finishing behavior.