

José
April 15, 2025

The Gatherer API: the tool your Streams were missing

As it celebrates its 10th anniversary, the Stream API has just gained a new method and an associated API: the Gatherer API, finalized in JDK 24. A Gatherer is a somewhat complex object that models an intermediate operation on a stream, much as collectors do for terminal operations. This brings new capabilities to the Stream API that were previously out of reach. This talk covers the whole API, its usage patterns, and how to use Gatherers in parallel streams. We will talk about mutable state, integrators, stream interruption, combiners, and parallel streams: all the building blocks of gatherers, which you need to know to master this complex API.
The slides are in English.



Transcript

  1. The Gatherers API: the tool your Streams were missing

    José Paumard, Java Developer Advocate, Java Platform Group
  2. Tune in!

    Inside Java Newscast, JEP Café, Road To 21 series, Inside.java, Inside Java Podcast, Sip of Java, Cracking the Java coding interview
    4/16/2025 Copyright © 2025, Oracle and/or its affiliates
  3. JEP 485: Stream Gatherers

    Stream Gatherers - Deep Dive with the Expert: https://youtu.be/v_5SKpfkI2U
  4. What is a Stream?

    Short answer: an old API, from JDK 8.
    A Stream connects to a source. There are many sources: collections, files, random generators, regular expressions.
    A Stream has intermediate operations: operations that return another stream.
    A Stream has a single terminal operation: an operation that returns something else.
  5. Terminal Operations

    Several methods for terminal operations:
    - reduce()
    - findFirst(), findAny()
    - toList()
    - collect(), which takes a Collector
    Collectors are great to create your own reduction! Even if there are things they cannot do.
  6. Intermediate Operations

    Plenty of methods for intermediate operations:
    - map(), filter(), flatMap(), mapMulti()
    - limit(), dropWhile()
    - distinct(), sorted()
    No way to create your own intermediate operation!
  7. What Does the Gatherer API Give You?

    An interface (in fact more than one): interface Gatherer<T, A, R>
    An intermediate method on the Stream API: Stream.gather(gatherer)
    A factory class: Gatherers
  8. Comparing With a Collector

        Stream<T> upstream = ...;
        Gatherer<T, ?, R> gatherer = ...;
        Stream<R> downstream = upstream.gather(gatherer);

        Stream<T> upstream = ...;
        Collector<T, ?, R> collector = ...;
        R result = upstream.collect(collector);
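As a minimal sketch of that comparison, assuming JDK 24 or later: `gather()` hands back another `Stream`, while `collect()` hands back the result itself. The class and method names (`GatherVersusCollect`, `windows`, `joined`) are illustrative and do not come from the slides.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class GatherVersusCollect {

    // gather() is an intermediate operation: it returns another Stream.
    static List<List<Integer>> windows() {
        Stream<List<Integer>> downstream =
            Stream.of(1, 2, 3, 4).gather(Gatherers.windowFixed(2));
        return downstream.toList();
    }

    // collect() is a terminal operation: it returns the result itself.
    static String joined() {
        return Stream.of(1, 2, 3, 4)
            .map(String::valueOf)
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(windows()); // [[1, 2], [3, 4]]
        System.out.println(joined());  // 1,2,3,4
    }
}
```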
  9. What is a Gatherer?

    Short answer: a functional interface!

        interface Gatherer<T, A, R> {
            Integrator<A, T, R> integrator();
        }

        @FunctionalInterface
        interface Integrator<A, T, R> {
            boolean integrate(A state, T element, Downstream<? super R> downstream);
        }
  10. What is a Gatherer?

    Short answer: a functional interface!

        Gatherer<T, ?, R> gatherer = () -> (_, _, _) -> true;
  11. What is a Gatherer?

    Pushing elements to a downstream

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            (_, element, downstream) -> downstream.push(element)); // returns a boolean
  12. What is a Gatherer?

    A mapping gatherer

        Gatherer<T, ?, R> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                var mapped = mapper.apply(element);
                return downstream.push(mapped);
            });
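The mapping integrator can be wrapped in a generic factory method and tried on a small stream. This is only a sketch, assuming JDK 24 or later; `MappingGathererDemo`, `mapping` and `lengths` are hypothetical names chosen for the example.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class MappingGathererDemo {

    // Hypothetical helper: a stateless gatherer that maps each element
    // and forwards the boolean returned by the downstream.
    static <T, R> Gatherer<T, ?, R> mapping(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(
            (_, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    static List<Integer> lengths() {
        return Stream.of("one", "two", "three")
            .gather(mapping(String::length))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengths()); // [3, 3, 5]
    }
}
```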
  13. What is a Gatherer?

    A filtering gatherer

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                if (filter.test(element)) {
                    return downstream.push(element);
                } else {
                    ???
                }
            });
  14. What is a Gatherer?

    A filtering gatherer

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                if (filter.test(element)) {
                    return downstream.push(element);
                } else {
                    return true;
                }
            });
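Here is a runnable sketch of the filtering gatherer, assuming JDK 24 or later. The names `FilteringGathererDemo`, `filtering` and `evens` are illustrative. Returning `true` for a rejected element means "nothing pushed, but keep sending elements".

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FilteringGathererDemo {

    // Hypothetical helper: pushes only the elements accepted by the predicate.
    static <T> Gatherer<T, ?, T> filtering(Predicate<? super T> filter) {
        return Gatherer.of((_, element, downstream) -> {
            if (filter.test(element)) {
                return downstream.push(element);
            }
            // Element dropped: nothing is pushed, but more elements are welcome.
            return true;
        });
    }

    static List<Integer> evens() {
        return Stream.of(1, 2, 3, 4, 5, 6)
            .gather(filtering(n -> n % 2 == 0))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(evens()); // [2, 4, 6]
    }
}
```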
  15. What is a Gatherer?

    Pushing elements to a downstream
    How is this boolean used by the API?

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            (_, element, downstream) -> downstream.push(element)); // returns a boolean
  16. Pushing to a Downstream

    1) You can push 0 or more elements to a downstream
    2) A call to downstream.push() returns a boolean: true means that this downstream accepts more elements, false means it does not
    3) Your integrator should follow this rule
    Pushing to a rejecting downstream does not throw any exception
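The rules above can be illustrated with a gatherer that pushes every element twice and stops as soon as one of the pushes returns `false`. This is a sketch under the assumption of JDK 24 or later; `DuplicatingGathererDemo`, `duplicating` and `firstThree` are made-up names. Because the integrator relays the refusal, the infinite source below is abandoned once `limit(3)` is satisfied.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DuplicatingGathererDemo {

    // Hypothetical helper: pushes each element twice.
    // The second push only happens if the first one returned true,
    // and the integrator returns false as soon as the downstream refuses.
    static <T> Gatherer<T, ?, T> duplicating() {
        return Gatherer.of((_, element, downstream) ->
            downstream.push(element) && downstream.push(element));
    }

    static List<Integer> firstThree() {
        return Stream.iterate(1, n -> n + 1) // infinite source
            .gather(duplicating())
            .limit(3)                        // the downstream rejects after 3 elements
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [1, 1, 2]
    }
}
```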
  17. Pushing to a Downstream

    If your Integrator does not decide by itself to return false, i.e. it always transmits the value returned by downstream.push(),
    then you can declare it as a Greedy integrator

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            Integrator.of((_, element, downstream) -> downstream.push(element)));
  18. Pushing to a Downstream

    If your Integrator does not decide by itself to return false, i.e. it always transmits the value returned by downstream.push(),
    then you can declare it as a Greedy integrator

        Gatherer<T, ?, T> gatherer = Gatherer.of(
            Integrator.ofGreedy((_, element, downstream) -> downstream.push(element)));
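A small sketch, assuming JDK 24 or later, of what the greedy declaration changes: the integrator returned by `Integrator.ofGreedy()` implements the `Integrator.Greedy` marker interface, which the stream implementation can inspect. `GreedyIntegratorDemo` and its methods are hypothetical names for this example.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.Stream;

class GreedyIntegratorDemo {

    // Plain integrator: the API must assume it may return false on its own.
    static <T> Gatherer<T, ?, T> plainPassThrough() {
        return Gatherer.of(
            Integrator.of((_, element, downstream) -> downstream.push(element)));
    }

    // Greedy integrator: it only ever relays what downstream.push() returned.
    static <T> Gatherer<T, ?, T> greedyPassThrough() {
        return Gatherer.of(
            Integrator.ofGreedy((_, element, downstream) -> downstream.push(element)));
    }

    // Checks whether a gatherer advertises a greedy integrator.
    static boolean isGreedy(Gatherer<?, ?, ?> gatherer) {
        return gatherer.integrator() instanceof Integrator.Greedy;
    }

    static List<Integer> passThrough() {
        return Stream.of(1, 2, 3).gather(greedyPassThrough()).toList();
    }

    public static void main(String[] args) {
        System.out.println(isGreedy(plainPassThrough()));  // false
        System.out.println(isGreedy(greedyPassThrough())); // true
        System.out.println(passThrough());                 // [1, 2, 3]
    }
}
```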
  19. Rejecting Downstreams

    A Downstream holds a state: rejecting
    1) Starts with false
    2) Cannot switch back from true to false
    3) Can only switch on a call to push()
    A downstream could switch on a clock
    A downstream is not a thread safe object!
  20. Downstream

    Should you check for isRejecting()?

        (_, element, downstream) -> {
            if (downstream.isRejecting()) {
                return false; // worth it?
            }
            return downstream.push(mapper.apply(element));
        }
  21. Downstream

    Should you check for isRejecting()?

        (_, element, downstream) -> {
            if (downstream.isRejecting()) {
                return false; // worth it? NOPE!
            }
            return downstream.push(mapper.apply(element));
        }
  22. A Flatmapping Gatherer

    How to push to the downstream?

        Function<T, Stream<R>> flatMapper = ...;
        Gatherer<T, ?, R> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                Stream<R> elements = flatMapper.apply(element);
                elements.forEach(downstream::push);
                return true;
            });
  23. A Flatmapping Gatherer

    How to push to the downstream?

        Function<T, Stream<R>> flatMapper = ...;
        Gatherer<T, ?, R> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                Stream<R> elements = flatMapper.apply(element);
                elements.takeWhile(_ -> !downstream.isRejecting())
                        .forEach(downstream::push);
                return !downstream.isRejecting();
            });
  24. A Flatmapping Gatherer

    How to push to the downstream?
    Two bug fixes to go!

        Function<T, Stream<R>> flatMapper = ...;
        Gatherer<T, ?, R> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                Stream<R> elements = flatMapper.apply(element);
                return elements.allMatch(downstream::push);
            });
  25. A Flatmapping Gatherer

    How to push to the downstream?
    A stream has to be closed!

        Function<T, Stream<R>> flatMapper = ...;
        Gatherer<T, ?, R> gatherer = Gatherer.of(
            (_, element, downstream) -> {
                Stream<R> elements = flatMapper.apply(element);
                return elements.allMatch(downstream::push);
            });
  26. A Flatmapping Gatherer

    How to push to the downstream?
    1 bug fix to go!

        (_, element, downstream) -> {
            try (Stream<R> elements = flatMapper.apply(element)) {
                return elements.allMatch(downstream::push);
            }
        }
  27. A Flatmapping Gatherer

    How to push to the downstream?
    A downstream is not a thread safe object!

        (_, element, downstream) -> {
            try (Stream<R> elements = flatMapper.apply(element)) {
                return elements.allMatch(downstream::push);
            }
        }
  28. A Flatmapping Gatherer

    How to push to the downstream?

        (_, element, downstream) -> {
            try (Stream<R> elements = flatMapper.apply(element)) {
                return elements.sequential()
                               .allMatch(downstream::push);
            }
        }
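Put together, the final flatmapping integrator can be exercised as follows. This is a sketch assuming JDK 24 or later; `FlatMappingGathererDemo`, `flatMapping` and `firstFourLetters` are illustrative names, and the comma-separated input is invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FlatMappingGathererDemo {

    // Hypothetical helper wrapping the integrator in a factory method.
    static <T, R> Gatherer<T, ?, R> flatMapping(Function<T, Stream<R>> flatMapper) {
        return Gatherer.of((_, element, downstream) -> {
            // try-with-resources closes the inner stream, even on early exit.
            try (Stream<R> elements = flatMapper.apply(element)) {
                // sequential(): the downstream is not a thread-safe object.
                // allMatch(): stops at the first push() that returns false.
                return elements.sequential().allMatch(downstream::push);
            }
        });
    }

    static List<String> firstFourLetters() {
        return Stream.of("a,b", "c", "d,e,f")
            .gather(flatMapping((String line) -> Arrays.stream(line.split(","))))
            .limit(4)
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstFourLetters()); // [a, b, c, d]
    }
}
```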
  29. Wrapping up Downstream

    1) Carries a state: rejecting
    2) Is not a thread-safe object
    Be careful with parallel streams!
  30. What About State?

    A Gatherer may carry an internal mutable state
    This state is initialized with an initializer
    It is carried from one call of the integrator to the next

        (state, element, downstream) -> {
            return true;
        }
  31. What About State?

    A limit Gatherer

        class Counter { long count = 0L; }

        var gatherer = Gatherer.ofSequential(
            Counter::new, // the initializer
            (state, element, downstream) -> {
                if (state.count++ < limit) {
                    return downstream.push(element);
                } else {
                    return false;
                }
            });
  32. What About State?

    A limit Gatherer

        class Counter { long count = 0L; }

        var gatherer = Gatherer.ofSequential(
            () -> new Object() { long count = 0L; }, // the initializer
            (state, element, downstream) -> {
                if (state.count++ < limit) {
                    return downstream.push(element);
                } else {
                    return false;
                }
            });
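The limit gatherer can be tried on an infinite stream to see the integrator interrupt it by returning `false`. This is a sketch assuming JDK 24 or later; `LimitingGathererDemo`, `limiting` and `firstThree` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class LimitingGathererDemo {

    // Hypothetical helper: lets the first `limit` elements through, then stops.
    static <T> Gatherer<T, ?, T> limiting(long limit) {
        // Mutable state, created once per evaluation by the initializer.
        class Counter { long count = 0L; }
        return Gatherer.ofSequential(
            Counter::new, // the initializer
            (state, element, downstream) -> {
                if (state.count++ < limit) {
                    return downstream.push(element);
                }
                // Returning false interrupts the stream.
                return false;
            });
    }

    static List<Integer> firstThree() {
        return Stream.iterate(1, n -> n + 1) // infinite source
            .gather(limiting(3))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [1, 2, 3]
    }
}
```

Note that this integrator only returns `false` when it sees the element after the limit, so one extra element is pulled from the source.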
  33. What About State?

    A distinct Gatherer

        var gatherer = Gatherer.ofSequential(
            () -> new Object() { Set<T> set = new HashSet<>(); },
            (state, element, downstream) -> {
                if (state.set.add(element)) {
                    return downstream.push(element);
                } else {
                    return true;
                }
            });
  34. What About State?

    A sorting and distinct Gatherer

        var gatherer = Gatherer.ofSequential(
            () -> new Object() { Set<T> set = new TreeSet<>(); },
            (state, element, downstream) -> {
                state.set.add(element);
                return true; // when can I push the content of set?
            });
  35. Pushing a Final State

    Finisher FTW!

        var gatherer = Gatherer.ofSequential(
            () -> new Object() { Set<T> set = new TreeSet<>(); },
            (state, element, downstream) -> { ... },
            (state, downstream) -> { // finisher
                state.set.stream().allMatch(downstream::push);
            });
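A complete version of the sorting and distinct gatherer might look like the sketch below, assuming JDK 24 or later. It uses a `TreeSet` directly as the state instead of an anonymous wrapper object, which is a simplification made for this example; `SortedDistinctGathererDemo` and `sortedDistinct` are made-up names.

```java
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class SortedDistinctGathererDemo {

    // Hypothetical helper: buffers everything in a TreeSet,
    // then pushes the sorted, de-duplicated content from the finisher.
    static <T extends Comparable<? super T>> Gatherer<T, ?, T> sortedDistinct() {
        return Gatherer.ofSequential(
            () -> new TreeSet<T>(),                 // initializer
            (state, element, downstream) -> {       // integrator: nothing is pushed yet
                state.add(element);
                return true;
            },
            (state, downstream) ->                  // finisher: called once, at the end
                state.stream().allMatch(downstream::push));
    }

    static List<Integer> sortedAndDistinct() {
        return Stream.of(3, 1, 2, 3, 1)
            .gather(sortedDistinct())
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(sortedAndDistinct()); // [1, 2, 3]
    }
}
```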
  36. Wrapping up Gatherers

    A Gatherer is built on 3 elements (so far):
    - An initializer for its internal mutable state
    - An integrator, that can be greedy
    - A finisher, to push the elements left in the state
  37. Parallel Gatherers

    So far we built gatherers with two factory methods:
    - Gatherer.of(…)
    - Gatherer.ofSequential(…)
    Gatherers support parallel streams (of course!)
    Parallel and sequential gatherers can be called in parallel streams
    What about the internal mutable state?
  38. Parallel Gatherers

    One principle: there is one state object per thread
    So, in a parallel stream:
    1) Each thread creates its own instance of state
    2) At the end of the day, you need a combiner

        (state1, state2) -> {
            // do something
            return state1;
        }
  39. Parallel Gatherers

    [Diagram: on thread-1, the integrator takes a State and an element and produces a State]
  40. Parallel Gatherers

    [Diagram: thread-1, thread-2, thread-3 and thread-4 each run the integrator on their own State and elements; a Combiner merges the States]
  41. A Parallel Distinct Gatherer

        var gatherer = Gatherer.of(
            () -> new Object() { Set<T> set = new HashSet<>(); },
            (state, element, downstream) -> { // executed in
                state.set.add(element);       // different threads
                return true;
            },
            (state1, state2) -> { // combiner
                state1.set.addAll(state2.set);
                return state1;
            },
            (state, downstream) -> { // finisher
                state.set.stream().allMatch(downstream::push);
            }
        );
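The parallel distinct gatherer can be run on a parallel stream as in the sketch below, assuming JDK 24 or later. The state here is a plain `HashSet` rather than an anonymous wrapper, and the names `ParallelDistinctGathererDemo` and `distinct` are illustrative. Since a `HashSet` does not keep encounter order, the example only checks the content of the result, not its order.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;

class ParallelDistinctGathererDemo {

    // Hypothetical helper: one HashSet per thread, merged by the combiner.
    static <T> Gatherer<T, ?, T> distinct() {
        return Gatherer.of(
            () -> new HashSet<T>(),                        // initializer, one state per thread
            (state, element, downstream) -> {              // integrator
                state.add(element);
                return true;
            },
            (left, right) -> {                             // combiner
                left.addAll(right);
                return left;
            },
            (state, downstream) ->                         // finisher
                state.stream().allMatch(downstream::push));
    }

    static List<Integer> distinctInParallel() {
        return List.of(1, 2, 3, 2, 1, 3, 3)
            .parallelStream()
            .gather(distinct())
            .toList();
    }

    public static void main(String[] args) {
        List<Integer> result = distinctInParallel();
        System.out.println(result.size());                            // 3
        System.out.println(Set.copyOf(result).equals(Set.of(1, 2, 3))); // true
    }
}
```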
  42. What About Sequential Gatherers?

    Sequential Gatherers cannot be called in different threads at the same time
    They do not have a combiner, so they cannot combine different states
    But they can be used in parallel streams!
  43. Sequential Gatherers in Parallel Stream

    [Diagram: Source, upstream, sequential gatherer, downstream]
  44. Sequential Gatherers in Parallel Stream

    The management of the threads is the responsibility of the Fork / Join framework: it is transparent from the user's point of view
    A sequential Gatherer is executed by a single thread at a time, but it can jump from one thread to another: this is how the Fork / Join framework works
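As an illustration, the sketch below (assuming JDK 24 or later) plugs a sequential gatherer into a parallel stream. The upstream `map()` may run on several threads, while the gatherer's counter is never touched by two threads at once. `SequentialInParallelDemo`, `indexing` and `indexed` are hypothetical names; the source is an ordered list, so the indices follow the encounter order.

```java
import java.util.List;
import java.util.stream.Gatherer;

class SequentialInParallelDemo {

    // Hypothetical helper: prefixes each element with its index.
    // Built with ofSequential(): there is no combiner and a single state.
    static <T> Gatherer<T, ?, String> indexing() {
        return Gatherer.ofSequential(
            () -> new long[1], // mutable counter
            (state, element, downstream) ->
                downstream.push(state[0]++ + ":" + element));
    }

    static List<String> indexed() {
        return List.of("a", "b", "c", "d")
            .parallelStream()
            .map(String::toUpperCase) // upstream, can be processed in parallel
            .gather(indexing())       // evaluated sequentially
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(indexed()); // [0:A, 1:B, 2:C, 3:D]
    }
}
```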
  45. Sequential Gatherers in Parallel Stream

    https://github.com/SvenWoltmann/stream-gatherers
  46. Ready-to-Use Gatherers

    The Gatherers class - windowFixed()

        var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        ints.stream()
            .gather(Gatherers.windowFixed(3))
            .toList();
        > [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
  47. Ready-to-Use Gatherers

    The Gatherers class - windowSliding()

        var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        ints.stream()
            .gather(Gatherers.windowSliding(3))
            .toList();
        > [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], ...]
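Both window gatherers also handle streams whose size does not fit the window size. The sketch below, assuming JDK 24 or later, shows the two edge cases: a shorter last window for `windowFixed()`, and a single partial window for `windowSliding()` when the stream is smaller than the window. `WindowEdgeCases` and its methods are names invented for the example.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class WindowEdgeCases {

    // 8 elements, windows of 3: the last window only holds 2 elements.
    static List<List<Integer>> fixed() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
            .gather(Gatherers.windowFixed(3))
            .toList();
    }

    // 2 elements, sliding windows of 3: one window with all the elements.
    static List<List<Integer>> sliding() {
        return Stream.of(1, 2)
            .gather(Gatherers.windowSliding(3))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(fixed());   // [[1, 2, 3], [4, 5, 6], [7, 8]]
        System.out.println(sliding()); // [[1, 2]]
    }
}
```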
  48. Ready-to-Use Gatherers

    The Gatherers class - scan()

        var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        ints.stream()
            .gather(Gatherers.scan(
                () -> "",
                (scanned, e) -> scanned + e))
            .toList();
        > ["1", "12", "123", "1234", "12345", ...]
  49. Ready-to-Use Gatherers

    The Gatherers class - fold()

        var ints = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
        ints.stream()
            .gather(Gatherers.fold(
                () -> "",
                (folded, e) -> folded + e))
            .toList();
        > ["123456789"]
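The difference between `scan()` and `fold()` is easier to see side by side with the same accumulator. In this sketch, assuming JDK 24 or later, `scan()` pushes every intermediate sum while `fold()` pushes only the final one. `ScanVersusFold`, `runningSums` and `total` are illustrative names.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class ScanVersusFold {

    // scan(): one output element per input element (the running result).
    static List<Integer> runningSums() {
        return Stream.of(1, 2, 3, 4)
            .gather(Gatherers.scan(() -> 0, (sum, e) -> sum + e))
            .toList();
    }

    // fold(): a single output element, pushed when the stream is exhausted.
    static List<Integer> total() {
        return Stream.of(1, 2, 3, 4)
            .gather(Gatherers.fold(() -> 0, (sum, e) -> sum + e))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningSums()); // [1, 3, 6, 10]
        System.out.println(total());       // [10]
    }
}
```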
  50. Ready-to-Use Gatherers

    The Gatherers class - mapConcurrent()
    Maps a stream to another stream
    Each mapping is computed in its own virtual thread
    Takes a maxConcurrency parameter to control the max number of running virtual threads
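A minimal usage sketch of `mapConcurrent()`, assuming JDK 24 or later. The squaring function stands in for a slower, blocking call; `MapConcurrentDemo` and `squares` are hypothetical names. The gatherer preserves the order of the stream, so the result is deterministic even though the mappings run concurrently.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentDemo {

    static List<Integer> squares() {
        return Stream.of(1, 2, 3, 4)
            // At most 2 mappings run at the same time, each in a virtual thread.
            .gather(Gatherers.mapConcurrent(2, n -> n * n))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(squares()); // [1, 4, 9, 16]
    }
}
```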
  51. Wrapping up Gatherers

    Built on:
    - An Initializer
    - An Integrator (can be greedy)
    - A Combiner
    - A Finisher
    Can decide to work in parallel or not
    A sequential Gatherer can work in a parallel stream
    A sequential Stream can compute mappings in parallel