Upgrade to Pro — share decks privately, control downloads, hide ads and more …

業務で使いたいWebFluxによるReactiveプログラミング / Introduction...

業務で使いたいWebFluxによるReactiveプログラミング / Introduction to Reactive Programming using Spring WebFlux

Spring Fest 2018
業務で使いたいWebFluxによるReactiveプログラミング
#jsug #sf_h6

Shin Tanimoto

October 31, 2018
Tweet

More Decks by Shin Tanimoto

Other Decks in Programming

Transcript

  1. #sf_h6 自己紹介  谷本 心 (Shin Tanimoto)  Twitter: @cero_t

     Acroquest Technology 株式会社  システムアーキテクト、コンサルティング、トラブルシューティング  著書「Java本格入門」  コミュニティ活動  日本Javaユーザーグループ / 関西Javaエンジニアの会  Java Champion  対戦格闘ゲーム / たこ焼き / BABYMETAL
  2. #sf_h6 質問  Java8のStream APIを使ったことがある人?(8割ぐらい)  仕事で使ってる人?(6割ぐらい)  Spring BootのRestTemplateを使ったことがある人?(7-8割ぐらい)

     仕事で使ってる人?(5割ぐらい)  Spring Web FluxとWebClientを使ったことがある人?(1割未満)  仕事で使ってる人?(ごく数名)
  3. #sf_h6 今日のお話  リアクティブプログラミングの基本的なパターンを把握する  Stream APIの stream / map

    / collect に相当するもの  ノンブロッキングなソースを「使う」ことを学ぶ  「作る側」の話はあまりしません  Stream APIのSpliteratorとか使って生成する人って多くないでしょ?  Spring WebFluxを題材にして実装を学ぶ  Project Reactorを使った実装  RxJavaの人は適宜読み替えて  Spring WebFluxやReactorを利用していない人が、利用の仕方や雰囲気を掴む、 というのがこのセッションの主題
  4. #sf_h6 リアクティブプログラミングとは?  リアクティブプログラミングとは  データの作成や変更の通知をきっかけにして処理をするようなプログラミング  Pub-subモデルのイベントハンドラ、というイメージに近い  例

    : 税額 = 商品金額 * 税率  商品金額が変われば、税額は変わる  × : 商品金額のイベントハンドラが、税額を更新する  〇 : 商品金額が変わった通知を受け、税額が自分を更新する  概念の話は苦手! 定義をよく知りたい人は、ググると出てくる資料を読んで!  このセッションで実装の話をたくさんするから、雰囲気をつかんで!
  5. #sf_h6 リアクティブプログラミングまでの背景  同期処理  皆さんがよく書く、DBに書き込んで、読み込んで、更新して、外部システムを呼び 出して、メールを送って・・・という処理を順番に行うもの  実は「順番」でなくとも良いことが多い 

    独立した2つのテーブルを更新するときに、どっちを先に更新しても良い  一部の処理が遅いと、他の処理が待たされる  全部ひとりで仕事をするワンオペのイメージ  あるいは、他の人に仕事を任せるものの、それが終わるまでずっと待っている  非効率このうえない
  6. #sf_h6 リアクティブプログラミングまでの背景  スレッドを利用した非同期処理  「DBに書き込む」「DBを更新する」「外部システムを呼び出す」「メールを送る」 などを1スレッド1処理に割り当てて全て同時に行ない、終わるまで待つ  一部の遅い処理が終わるまで待つ間に、他の処理が進む 

    複数のスレッドを利用してしまう  「Webアプリケーションではスレッドを起こすべきではない」というプラクティス に違反する  複数の部下に指示を出して結果を待つ、偉そうな上司のイメージ  部下が疲弊する
  7. #sf_h6 リアクティブプログラミングとは?  リアクティブプログラミングのキーポイント  時間の掛かる部分や得意でない処理を外部に委譲する  データストアへのアクセス  外部サービスへのアクセス

     大量の計算  待たない  外部に委譲した処理の結果を待たず、次に進められる処理を進める  外部から一部でも応答が返ってきたら、それを利用して次の処理を進める  処理が終わった所までをアウトプットする
  8. #sf_h6 リアクティブプログラミングとは?  余談「リアクティブシステム」と「リアクティブプログラミング」  リアクティブシステム  message-driven(キューなどを介してメッセージドリブンで処理する)  elsatic(負荷に応じてリソースを増減できる)

     resilient(一部がダウンしても継続する)  responsive(素早く反応する)  リアクティブプログラミングなど一切使わなくても作れる  リアクティブプログラミング  別にキューなどを使うためにあるわけではない  このセッションでは「非同期/ノンブロッキングな処理を簡単に書ける仕組み」と定義  性能やリソース消費を抑えるため
  9. #sf_h6 今日紹介するReactorとSpring WebFlux  Project Reactor  リアクティブプログラミングを行うためのライブラリ  Pivotal社が主に開発を主導

     Java 8対応(8以降が必須)  Reactive Streams(標準仕様)に準拠  Spring WebFlux  Spring MVC 5から導入された  ReactorをSpringのWeb(サーバ、クライアント)で使えるようにしたもの  マイクロサービス間を非同期/ノンブロッキングに通信しやすくなるもの
  10. #sf_h6 はじめてのReactor  所感  Java8のStream APIのようなAPIが凄い数あるイメージ  https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html 

    非同期で動作するため、同期処理脳だと「書いた通りに動かない」ってなる  基本の基本は「Mono」と「Flux」  Mono  単一の要素を扱う  Optionalの非同期版  Flux  複数の要素を使う  Streamの非同期版
  11. #sf_h6 はじめてのReactor Mono<String> mono = Mono.just("abc"); mono.subscribe(s -> System.out.println(s)); Flux<String>

    flux = Flux.just("a", "b", "c", "d", "e"); flux.subscribe(s -> System.out.println(s)); abc a b c d e
  12. #sf_h6 最初に覚えるメソッド  Mono /Fluxを作る  Flux.just  既に存在するデータからMono /

    Fluxを作る  Flux.interval  一定時間ごとにデータを作る(カウントアップする)  Mono / Fluxを使う  subscribe ( ≒ forEach )  MonoやFluxのデータを受け取るたびに処理をする  map ( ≒ map )  MonoやFluxのデータを変換する
  13. #sf_h6 一定時間おきにデータを作る Flux.interval(Duration.ofMillis(100)) .map(i -> i + " " +

    LocalDateTime.now()) .subscribe(System.out::println); なにも表示されずに終わる
  14. #sf_h6 最初の次に覚えるメソッド  Mono / Fluxを使う  take  指定した件数だけ受け取る

     doOnComplete  受け取り終わった際に処理を行う  Mono / Fluxと合わせて使う  CountDownLatchクラス  コンストラクタ  いくつカウントダウンすれば良いかを指定する  await  コンストラクタで指定した回数だけカウントダウンされるまで待つ
  15. #sf_h6 一定時間おきにデータを作る(正しく) var latch = new CountDownLatch(1); Flux.interval(Duration.ofMillis(100)) .map(i ->

    i + " " + LocalDateTime.now()) .take(10) .doOnComplete(latch::countDown) .subscribe(System.out::println); latch.await();
  16. #sf_h6 一定時間おきにデータを作る(正しく) var latch = new CountDownLatch(1); Flux.interval(Duration.ofMillis(100)) .map(i ->

    i + " " + LocalDateTime.now()) .take(10) .doOnComplete(latch::countDown) .subscribe(System.out::println); latch.await(); 0 2018-11-01T13:55:45.969860800 1 2018-11-01T13:55:46.051231 2 2018-11-01T13:55:46.150832200 3 2018-11-01T13:55:46.250867900 … 7 2018-11-01T13:55:46.660427100 8 2018-11-01T13:55:46.760725800 9 2018-11-01T13:55:46.860524100 10件だけ取得して completeする
  17. #sf_h6 ログで状況を見る var latch = new CountDownLatch(1); Flux.interval(Duration.ofMillis(100)) .map(i ->

    i + " " + LocalDateTime.now()) .take(10) .doOnComplete(latch::countDown) .log() .subscribe(System.out::println); latch.await();
  18. #sf_h6 ログで状況を見る var latch = new CountDownLatch(1); Flux.interval(Duration.ofMillis(100)) .map(i ->

    i + " " + LocalDateTime.now()) .take(10) .doOnComplete(latch::countDown) .log() .subscribe(System.out::println); latch.await(); 13:56:58.067 [main] INFO reactor.Flux.Peek.1 - onSubscribe(FluxPeek.PeekSubscriber) 13:56:58.067 [main] INFO reactor.Flux.Peek.1 - request(unbounded) 13:56:58.225 [parallel-1] INFO reactor.Flux.Peek.1 - onNext(0 2018-11-05T13:56:58.191329900) 0 2018-11-05T13:56:58.191329900 13:56:58.281 [parallel-1] INFO reactor.Flux.Peek.1 - onNext(1 2018-11-05T13:56:58.281027200) 1 2018-11-05T13:56:58.281027200 ログ出力する メソッド (副作用なし) スレッド名と 処理内容をログに出力
  19. #sf_h6 はじめてのSpring WebFlux  概要  Spring MVC(spring-boot-starter-web)で戻り値にMono / Fluxが使えるようになる

     WebClientという新しいクライアントが使えるようになる  TomcatではなくNettyで起動する  基本はSpring MVCと変わらない  @RestController  @GetMapping / @PostMapping
  20. #sf_h6 はじめてのSpring WebFlux @RestController public class ReactorDemo { @GetMapping public

    Flux<String> demo() { return Flux.interval(Duration.ofMillis(300)) .map(i -> i + " " + LocalDateTime.now()) .take(10); } }
  21. #sf_h6 はじめてのSpring WebFlux @RestController public class ReactorDemo { @GetMapping public

    Flux<String> demo() { return Flux.interval(Duration.ofMillis(300)) .map(i -> i + " " + LocalDateTime.now()) .take(10); } } 0 2018-11-01T14:09:52.0456159001 2018-11-01T14:09:52.3458706002 2018-11- 01T14:09:52.6454726003 2018-11-01T14:09:52.9452633004 2018-11-01T14:09:53.2467024005 2018-11-01T14:09:53.5471006006 2018-11-01T14:09:53.8456003007 2018-11- 01T14:09:54.1456879008 2018-11-01T14:09:54.4452391009 2018-11-01T14:09:54.747032300 戻り値にMonoや Fluxを利用可能 RestControllerなどは SpringMVCと共通 数秒待たされた後 一気に表示される
  22. #sf_h6 Fluxはevent-streamとして返す @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> demo() { return

    Flux.interval(Duration.ofMillis(300)) .map(i -> i + " " + LocalDateTime.now()) .take(10); }
  23. #sf_h6 Fluxはevent-streamとして返す @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> demo() { return

    Flux.interval(Duration.ofMillis(300)) .map(i -> i + " " + LocalDateTime.now()) .take(10); } data:0 2018-11-05T14:16:43.520237200 data:1 2018-11-05T14:16:43.809622100 data:2 2018-11-05T14:16:44.120498 … data:7 2018-11-05T14:16:45.620371900 data:8 2018-11-05T14:16:45.920298700 data:9 2018-11-05T14:16:46.220127400 300msごとに 1データずつ表示される アノテーションを 追加
  24. #sf_h6 大きく変わるのはクライアントサイド  RestTemplateより洗練されたWebClientが使える  RestTemplateのココがイケてない  Generics対応が不便  getForObjectやgetForEntityで型指定する際に、Genericsを利用できない

     ParameterizedTypeReferenceクラスとexchangeメソッドを使う必要がある  event-streamの扱いが面倒  getForObjectやexchangeメソッドを用いた型変換ができない
  25. #sf_h6 Genericsが弱いRestTemplate String url = "http://localhost:8081/students/list"; var restTemplate = new

    RestTemplate(); // getForObjectはGenerics指定できず List list = restTemplate.getForObject(url, List.class); list.forEach(e -> System.out.println(e.getClass() + " " + e)); List.classとして 取得するため Genericsが使えない
  26. #sf_h6 Genericsが面倒なRestTemplate String url = "http://localhost:8081/students/list"; var restTemplate = new

    RestTemplate(); // Genericsを使うならこのクラスを使う必要がある ParameterizedTypeReference<List<Student>> type = new ParameterizedTypeReference<>() {}; List<Student> students = restTemplate.exchange(url, HttpMethod.GET, null, type) .getBody(); students.forEach(s -> System.out.println("list: " + s)); これが必要 exchangeメソッドの 引数に渡す
  27. #sf_h6 event-streamを処理しづらい RestTemplate String url = "http://localhost:8081/students/flux"; var restTemplate =

    new RestTemplate(); // いずれの呼び方でもエラー restTemplate.getForObject(url, List.class); ParameterizedTypeReference<List<Student>> type = new ParameterizedTypeReference<>() {}; restTemplate.exchange(url, HttpMethod.GET, null, type).getBody(); 呼び出し先が event-streamの 場合
  28. #sf_h6 event-streamを処理しづらい RestTemplate String url = "http://localhost:8081/students/flux"; var restTemplate =

    new RestTemplate(); // BufferedReader経由で読み込みならできる restTemplate.execute(url, HttpMethod.GET, request -> { }, response -> { try (var reader = new BufferedReader(new InputStreamReader(response.getBody()))) { reader.lines().forEach(s -> System.out.println("flux: " + s)); } return response; }); 取得した文字列を 自分でパースする 必要がある
  29. #sf_h6 WebClientの基本的な使い方 var webClient = WebClient.builder().build(); // WebClientのインスタンスを作成 webClient.get() //

    最初にHTTPメソッドを指定 .uri("localhost:8081/list") // 次にURLを指定 .retrieve() // HTTPアクセスをして結果を取り出す .bodyToFlux(Employee.class); // レスポンスボディをFluxにマッピング
  30. #sf_h6 WebClientならGenericsも問題なし var webClient = WebClient.builder().build(); var latch = new

    CountDownLatch(1); Flux<Student> students = webClient.get() .uri("localhost:8081/students/list") .retrieve() .bodyToFlux(Student.class) .doOnComplete(latch::countDown); students.subscribe(s -> System.out.println(LocalDateTime.now() + " list: " + s)); latch.await();
  31. #sf_h6 WebClientならevent-streamも問題なし var webClient = WebClient.builder().build(); var latch = new

    CountDownLatch(1); Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class) .doOnComplete(latch::countDown); students.subscribe(s -> System.out.println(LocalDateTime.now() + " list: " + s)); latch.await(); 呼び出し先が event-streamになっても 処理内容が変わらない
  32. #sf_h6 WebClientの落とし穴  FluxやMonoのblockメソッドを用いて、ListやObjectを取り出すことができる  しかしNetty上でblockメソッドを呼び出すとエラーとなる  block()/blockFirst()/blockLast() are blocking,

    which is not supported  mapメソッドを用いるなど、blockメソッドに頼らない実装が必要となる  Optional.get() を使わないのと同じ  Streamでcollectせず、Streamのまま処理を続けるのと同じ
  33. #sf_h6 WebClientの落とし穴  FluxやMonoのblockメソッドを用いて、ListやObjectを取り出すことができる  しかしNetty上でblockメソッドを呼び出すとエラーとなる  block()/blockFirst()/blockLast() are blocking,

    which is not supported  mapメソッドを用いるなど、blockメソッドに頼らない実装が必要となる  Optional.get() を使わないのと同じ  Streamでcollectせず、Streamのまま処理を続けるのと同じ  でもそれって、簡単にできることなんでしたっけ?
  34. #sf_h6 課題設定  課題の概要  1回のマイクロサービスアクセスで、N件のデータを取る  N件のデータに対して、1件ずつマイクロサービスにアクセスし、詳細データを取る  RestTemplateで処理した場合と、WebClientで処理した場合の違いを見る

     具体的な課題  /students にアクセスして生徒の一覧を取得する  /scores/{id} にアクセスしてそれぞれの生徒の成績一覧を取得する  その結果をマージして返す
  35. #sf_h6 課題設定  特別な課題設定  /students にアクセスして生徒の一覧を取得する  この処理がとても重いものと想定する 

    生徒の人数あたり100msの取得時間が掛かる  Fluxの場合は100msごとに生徒を1人ずつ返す  Listの場合は生徒の人数 * 100ms後に全員分を返す  /scores/{id} にアクセスして生徒の成績一覧を取得する  この処理は軽いと想定する  何件取得しても100msのオーバーヘッド時間のみで取得できる  (性能はだいぶ恣意的な例なので、参考程度に)
  36. #sf_h6 生徒の情報を提供するサービス (1/2) Student[] students = { new Student(1, "武藤"),

    new Student(2, "三吉"), } @GetMapping(value = "/list") public List<Student> getAsList() throws InterruptedException { Thread.sleep(students.length * 100L); return Arrays.asList(students); }
  37. #sf_h6 生徒の情報を提供するサービス (2/2) @GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public

    Flux<Student> getAsFlux() { return Flux.interval(Duration.ofMillis(100)) .map(i -> students[i.intValue()]) .take(students.length); }
  38. #sf_h6 成績の情報を提供するサービス Map<Integer, List<Score>> scoreStore = new HashMap<>(); // 初期化処理は割愛

    public List<Score> getAsList(@PathVariable List<Integer> ids) throws InterruptedException { Thread.sleep(100); return ids.stream() .flatMap(id -> scoreStore.get(id).stream()) .collect(Collectors.toList()); }
  39. #sf_h6 RestTemplateを用いた場合 (1/2) ParameterizedTypeReference<List<Student>> studentType = new ParameterizedTypeReference<>() { };

    ParameterizedTypeReference<List<Score>> scoreType = new ParameterizedTypeReference<>() { }; List<Student> students = restTemplate .exchange("http://localhost:8081/students/list", HttpMethod.GET, null, studentType) .getBody();
  40. #sf_h6 RestTemplateを用いた場合 (1/2) ParameterizedTypeReference<List<Student>> studentType = new ParameterizedTypeReference<>() { };

    ParameterizedTypeReference<List<Score>> scoreType = new ParameterizedTypeReference<>() { }; List<Student> students = restTemplate .exchange("http://localhost:8081/students/list", HttpMethod.GET, null, studentType) .getBody(); 生徒一覧の取得
  41. #sf_h6 RestTemplateを用いた場合 (2/2) List<StudentScore> studentScores = students.stream() .map(student -> {

    String url = "http://localhost:8081/scores/" + student.id; List<Score> scores = restTemplate .exchange(url, HttpMethod.GET, null, scoreType) .getBody(); return new StudentScore(student, scores); }) .collect(Collectors.toList());
  42. #sf_h6 RestTemplateを用いた場合 (2/2) List<StudentScore> studentScores = students.stream() .map(student -> {

    String url = "http://localhost:8081/scores/" + student.id; List<Score> scores = restTemplate .exchange(url, HttpMethod.GET, null, scoreType) .getBody(); return new StudentScore(student, scores); }) .collect(Collectors.toList()); 生徒を1件ずつ 処理して 生徒1件に対する 成績を取得して 生徒と成績を ペアにする
  43. #sf_h6 WebClientを用いた場合 Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class); Flux<StudentScore>

    studentScore = students.flatMap(student -> webClient.get() .uri("localhost:8081/scores/" + student.id) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> new StudentScore(student, scores)));
  44. #sf_h6 WebClientを用いた場合 Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class); Flux<StudentScore>

    studentScore = students.flatMap(student -> webClient.get() .uri("localhost:8081/scores/" + student.id) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> new StudentScore(student, scores))); 生徒の一覧を Fluxとして取得 生徒を1件ずつ 処理して 生徒1件に対する 成績を取得して 生徒と成績を ペアにする
  45. #sf_h6 WebClientを用いた場合 Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class); Flux<StudentScore>

    studentScore = students.flatMap(student -> webClient.get() .uri("localhost:8081/scores/" + student.id) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> new StudentScore(student, scores))); マッパー処理の戻り値が MonoかFluxの場合にはflatMapを用いる。 ここでmapメソッドにすると 戻り値が Flux<Mono<StudentScore>> になる マッパー処理の戻り値が オブジェクトの場合には mapを用いる
  46. #sf_h6 WebClientを用いた場合 Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class); Flux<StudentScore>

    studentScore = students.flatMap(student -> webClient.get() .uri("localhost:8081/scores/" + student.id) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> new StudentScore(student, scores))); StudentScoreには FluxではなくListを渡したい しかしblockはできないので collectList()を用いてMono<List>にする Mono<List>はmap/flatMapの マッパー処理内でListとして扱える
  47. #sf_h6 利用機会が多いflatMap  ReactorのmapとflatMap  マッパー処理の戻り値の違い  Mapメソッドのマッパー処理はオブジェクトを返す  flatMapメソッドのマッパー処理はMono

    / Fluxを返す  Stream APIではmapをよく利用するが、FluxではflatMapを利用することが多い  Reactorでは特にマッパー処理の中でMonoを返すことが多いため
  48. #sf_h6 課題設定 その2  課題の概要  1回のマイクロサービスアクセスで、N件のデータを取る  N件のデータに対して、1回のマイクロサービスアクセスで、詳細データを取る 

    RestTemplateで処理した場合と、WebClientで処理した場合の違いを見る  具体的な課題  /students にアクセスして生徒の一覧を取得する  /scores/{ids} にアクセスして生徒の成績一覧を取得する  その結果をマージして返す
  49. #sf_h6 RestTemplateを用いた場合 (1/2) ParameterizedTypeReference<List<Student>> studentType = new ParameterizedTypeReference<>() { };

    ParameterizedTypeReference<List<Score>> scoreType = new ParameterizedTypeReference<>() { }; List<Student> students = restTemplate .exchange("http://localhost:8081/students/list", HttpMethod.GET, null, studentType) .getBody();
  50. #sf_h6 RestTemplateを用いた場合 (2/2) String url = "http://localhost:8081/scores/" + ids(students); List<Score>

    scores = restTemplate .exchange(url, HttpMethod.GET, null, scoreType) .getBody(); Map<Integer, List<Score>> scoreMap = scores.stream() .collect(Collectors.groupingBy(s -> s.id)); List<StudentScore> studentScores = students.stream() .map(student -> new StudentScore(student, scoreMap.get(student.id))) .collect(Collectors.toList());
  51. #sf_h6 RestTemplateを用いた場合 (2/2) String url = "http://localhost:8081/scores/" + ids(students); List<Score>

    scores = restTemplate .exchange(url, HttpMethod.GET, null, scoreType) .getBody(); Map<Integer, List<Score>> scoreMap = scores.stream() .collect(Collectors.groupingBy(s -> s.id)); List<StudentScore> studentScores = students.stream() .map(student -> new StudentScore(student, scoreMap.get(student.id))) .collect(Collectors.toList()); 複数の生徒の成績を まとめて取得 利用しやすいよう いったんMapに変換 生徒と成績を ペアにする 生徒一覧を生徒IDの カンマ区切りに変換
  52. #sf_h6 WebClientを用いた場合 (1/2) Mono<List<Student>> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class)

    .collectList(); 成績を「まとめて検索」するために すべての生徒の情報を取得しきってから 処理をしたい やはりblockを使うわけにはいかないため collectListを利用する
  53. #sf_h6 WebClientを用いた場合 (2/2) Mono<List<StudentScore>> studentScore = students.flatMap(studentList -> webClient.get() .uri("localhost:8081/scores/"

    + ids(studentList)) .retrieve().bodyToFlux(Score.class).collectList() .map(scores -> { Map<Integer, List<Score>> scoreMap = scores.stream() .collect(Collectors.groupingBy(s -> s.id)); return studentList.stream() .map(s -> new StudentScore(s, scoreMap.get(s.id))) .collect(Collectors.toList()); }));
  54. #sf_h6 WebClientを用いた場合 (2/2) Mono<List<StudentScore>> studentScore = students.flatMap(studentList -> webClient.get() .uri("localhost:8081/scores/"

    + ids(studentList)) .retrieve().bodyToFlux(Score.class).collectList() .map(scores -> { Map<Integer, List<Score>> scoreMap = scores.stream() .collect(Collectors.groupingBy(s -> s.id)); return studentList.stream() .map(s -> new StudentScore(s, scoreMap.get(s.id))) .collect(Collectors.toList()); })); 利用しやすいよう いったんMapに変換 生徒と成績をペアにする 流れがRestTemplateの時と 同じ Studentではなく List<Student>として 処理できる 生徒一覧を生徒IDの カンマ区切りに変換
  55. #sf_h6 BodyとしてFluxを受け取る(サーバ側) @PostMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<Score>

    getAsPost(@RequestBody Flux<Integer> ids) { return ids.flatMapIterable(id -> scoreStore.get(id)); } POSTのbodyとして Fluxを受け取るよう指定
  56. #sf_h6 BodyにFluxを渡す(クライアント側 1/2) Flux<Student> students = webClient.get() .uri("localhost:8081/students/flux") .retrieve() .bodyToFlux(Student.class);

    .cache(); あとで2回、このFluxを利用するため 取得したStudentをcacheしておく。 cacheしなければ この Flux<Student> を利用するたびに /students/flux へのアクセスが発生する。
  57. #sf_h6 Flux<StudentScore> studentScore = webClient.post() .uri("localhost:8081/scores/flux") .contentType(MediaType.APPLICATION_STREAM_JSON) .body(students.map(s -> s.id),

    Integer.class) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> scores.stream().collect(Collectors.groupingBy(s -> s.id))) .flatMapMany(scoreMap -> students.map(student -> new StudentScore(student, scoreMap.get(student.id))) ); BodyにFluxを渡す(クライアント側 2/2)
  58. #sf_h6 Flux<StudentScore> studentScore = webClient.post() .uri("localhost:8081/scores/flux") .contentType(MediaType.APPLICATION_STREAM_JSON) .body(students.map(s -> s.id),

    Integer.class) .retrieve() .bodyToFlux(Score.class) .collectList() .map(scores -> scores.stream().collect(Collectors.groupingBy(s -> s.id))) .flatMapMany(scoreMap -> students.map(student -> new StudentScore(student, scoreMap.get(student.id))) ); postを指定 Content-typeに application/stream+jsonを指定 BodyにFlux<Integer>を渡す BodyにFluxを渡す(クライアント側 2/2)
  59. #sf_h6 振り返り  Reactorはノンブロッキングプログラミングを行うためのライブラリ  APIが膨大で、考え方も変える必要があるため、最初の敷居は少し高い  Mono / Fluxを基本として、map

    / flatMap / subscribe / collectList / doOnComplete + CountDownLatch などが最初に覚えるべきメソッドたち  WebFluxで利用できるようになったWebClientは、RestTemplateに比べて洗練 されたAPIになったが、Mono / Fluxを中心としたリアクティブプログラミング が必須になることは要注意
  60. #sf_h6 参考  このスライドで紹介したWebFluxアプリケーションのソースコード  https://github.com/cero-t/springfest2018  Project Reactor -

    Learn  https://projectreactor.io/learn  https://github.com/reactor/lite-rx-api-hands-on/  ReactorでN+1問題な処理を実装してみた話 – せろ部屋  http://d.hatena.ne.jp/cero-t/20171215/1513290305  ECサイトのサンプルをWebFluxなどで実装したもの  https://github.com/cero-t/spring-cloud-kinoko-2017