Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Inside Stream API

Sponsored · Ship Features Fearlessly Turn features on and off without deploys. Used by thousands of Ruby developers.

Inside Stream API

2026.05.30 JJUG CCC 2026 Spring 発表資料

Avatar for Yuichi.Sakuraba

Yuichi.Sakuraba

May 30, 2026

More Decks by Yuichi.Sakuraba

Other Decks in Programming

Transcript

  1. さっそくですが、クイズです なんと出力される? void main() { var s = Stream.generate(() ->

    { IO.print("①"); return 0; }); IO.print("②"); s = s.limit(1); s = s.peek(x -> IO.print("③")); IO.println("④"); s.forEach(x -> IO.print("⑤")); } 選択肢 1. ① ② ③ ④ ⑤ 2. ② ④ ① ③ ⑤ 3. ① ② ④ ③ ⑤ 4. コンパイルエラー
  2. さっそくですが、クイズです なんと出力される? void main() { var s = Stream.generate(() ->

    { IO.print("①"); return 0; }); IO.print("②"); s = s.limit(1); s = s.peek(x -> IO.print("③")); IO.println("④"); s.forEach(x -> IO.print("⑤")); } 選択肢 1. ① ② ③ ④ ⑤ 2. ② ④ ① ③ ⑤ 3. ① ② ④ ③ ⑤ 4. コンパイルエラー
  3. JDKレポジトリのディレクトリ構成 jdk ...... モジュールごと OSごと shareは共通 bin doc make src

    test hotspot java.base java.compiler aix linux macosx share unix windows classes conf data legal man native META-INF com java javax jdk sun パッケージ
  4. ターゲットとするコード var sum = List.of(0, 1, 2, 3, 4, 5)

    .stream() .filter(x -> x%2 == 0) .map(x -> x*x) .reduce(0, (prv, prsnt) -> prv+prsnt); Stream生成 終端操作 中間操作 }
  5. Stream生成 java.util.Collection default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }

    default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
  6. Stream生成 java.util.Collection default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }

    default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
  7. Spliterators.spliterator() java.util.Spliterators public static <T> Spliterator<T> spliterator( Collection<? extends T>

    c, int characteristics) { return new IteratorSpliterator<>(Objects.requireNonNull(c), characteristics); } ソースの種類でオーバーロード ソースの種類に対応する実装クラス
  8. Stream生成 java.util.Collection default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }

    default Spliterator<E> spliterator() { return Spliterators.spliterator(this, 0); }
  9. StreamSupport.stream () java.util.stream.StreamSupport public static <T> Stream<T> stream( Spliterator<T> spliterator,

    boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>( spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); } ????
  10. PipelineHelper Streamインタフェース関連のクラス構成 <I> BaseStream <I> Stream パイプラインを作るための 基本機能 Head AbstractPipeline

    ReferencePipeline 参照型用パイプライン パイプラインの先頭 Spliterator パイプラインチェーンを 構成するフィールドを定義
  11. StreamSupport.stream () java.util.stream.StreamSupport public static <T> Stream<T> stream( Spliterator<T> spliterator,

    boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>( spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
  12. ラムダ式の処理順序 No.1 String hello(String who) { IO.println("Call hello()"); return "Hello,

    " + who + "!"; } void printMessage(String message) { IO.println("Call printMessage()"); IO.println(message); } printMessage(hello("World")); > java ExeOrder.java Call hello() Call printMessage() Hello, World!
  13. ラムダ式の処理順序 No.2 void printMessage(Function<String, String> func, String helloArg) { IO.println("Call

    printMessage()"); IO.println(func.apply(helloArg)); } printMessage(m -> hello(m), "World"); > java ExeOrder.java Call printMessage() Call hello() Hello, World!
  14. ラムダ式の処理順序 No.3 Supplier<String> printMessageLater( Function<String, String> func, String helloArg) {

    IO.println("Call printMessageLater()"); return () -> func.apply(helloArg); } var supplier = printMessageLater(m -> hello(m), "World"); IO.println("After printMessageLater()"); IO.println(supplier.get()); > java ExeOrder.java Call printMessageLater() After printMessageLater() Call hello() Hello, World!
  15. ターゲットとするコード var sum = List.of(0, 1, 2, 3, 4, 5)

    .stream() .filter(x -> x%2 == 0) .map(x -> x*x) .reduce(0, (prv, prsnt) -> prv+prsnt); 終端操作 中間操作 }
  16. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } };
  17. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } };
  18. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } };
  19. AbstractPipeline.<init>() java.util.stream.AbstractPipeline AbstractPipeline( AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {

    if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, prev this.sourceStage = previousStage.sourceStage; this.depth = previousStage.depth + 1; }
  20. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } };
  21. ChainedReference java.util.stream.Sink abstract static class ChainedReference<T, E_OUT> implements Sink<T> {

    protected final Sink<? super E_OUT> downstream; public ChainedReference(Sink<? super E_OUT> downstream) { this.downstream = Objects.requireNonNull(downstream); } ......
  22. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; ラムダ式をキャプチャ
  23. ReferencePipeline.filter() java.util.stream.ReferencePipeline public final <R> Stream<R> filter( Predicate<? super P_OUT,

    ? extends R> predicate) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } };
  24. ReferencePipeline.reduce() java.util.stream.ReferencePipeline @Override public final P_OUT reduce( final P_OUT identity,

    final BinaryOperator<P_OUT> accumulator) { return evaluate( ReduceOps.makeRef(identity, accumulator, accumulator)); }
  25. ReduceOps.makeRef() java.util.stream.ReduceOps public static <T, U> TerminalOp<T, U> makeRef(U seed,

    BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { ...... } return new ReduceOp<T,U,ReducingSink>(StreamShape.REFERENCE) @Override public ReducingSink makeSink() { return new ReducingSink(); } };} 値を1つ持つコンテナ
  26. ReduceOps.makeRef() java.util.stream.ReduceOps public static <T, U> TerminalOp<T, U> makeRef(U seed,

    BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { ...... } return new ReduceOp<T,U,ReducingSink>(StreamShape.REFERENCE) @Override public ReducingSink makeSink() { return new ReducingSink(); } };}
  27. ReferencePipeline.reduce() java.util.stream.ReferencePipeline @Override public final P_OUT reduce( final P_OUT identity,

    final BinaryOperator<P_OUT> accumulator) { return evaluate( ReduceOps.makeRef(identity, accumulator, accumulator)); }
  28. AbstractPipeline.evaluate() java.util.stream.AbstractPipeline final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert

    getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel( this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential( this, sourceSpliterator(terminalOp.getOpFlags())); }
  29. ReduceOps.makeRef() java.util.stream.ReduceOps public static <T, U> TerminalOp<T, U> makeRef(U seed,

    BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { ...... } return new ReduceOp<T,U,ReducingSink>(StreamShape.REFERENCE) @Override public ReducingSink makeSink() { return new ReducingSink(); } };}
  30. ReduceOp.evaluateSequential() java.util.stream.ReduceOps @Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator)

    { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); } ReducingSink 内部でStatelessOpの opWrapSinkをコール
  31. ReferencePipeline.map() java.util.stream.ReferencePipeline public final <R> Stream<R> map( Function<? super P_OUT,

    ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; downstream
  32. ReduceOp.evaluateSequential() java.util.stream.ReduceOps @Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator)

    { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); } ReducingSink Sinkチェーンを生成
  33. ReduceOp.evaluateSequential() java.util.stream.ReduceOps @Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator)

    { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); } ReducingSink Sinkチェーンを生成 copyIntoをコール
  34. AbstractPipeline.copyInto() java.util.stream.AbstractPipeline @Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)

    { if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags() wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } } Sinkチェーンの先頭
  35. IteratorSpliterator.forEachRemaining () java.util.Spliterators @Override public void forEachRemaining(Consumer<? super T> action)

    { if (action == null) throw new NullPointerException(); Iterator<? extends T> i; if ((i = it) == null) { i = it = collection.iterator(); est = (long)collection.size(); } i.forEachRemaining(action); } Sinkチェーンの先頭 Iteratorを使用したイテレーション 内部でSink.accept()をコール Iterator取得
  36. ReduceOp.evaluateSequential() java.util.stream.ReduceOps @Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator)

    { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); } ReducingSinkが保持する 結果を取り出す