Reactive Programming code with Reactor and WebFlux ➤ Operations like Stream API’s stream(), map(), collect() methods ➤ This talk does NOT contain ➤ Reactive Systems ➤ R2DBC, RSocket, Reactive Streams
Mono<String> mono = Mono.just("Hello, world!");
mono.subscribe(s -> System.out.println(s));
Flux<String> flux = Flux.just("H", "e", "l", "l", "o");
flux.subscribe(s -> System.out.println(s));

Output:
Hello, world!
H
e
l
l
o
Create Mono, Flux ➤ Mono.just(T), Flux.just(T...) ➤ Creates a Mono or Flux from concrete instance(s) ➤ Flux.interval(Duration) ➤ Creates a Flux that emits increasing Long values, starting with 0, at the given interval ➤ Use Mono, Flux ➤ subscribe(Consumer<T>) ➤ Subscribes to the Mono or Flux and passes each emitted item to the Consumer ➤ map(Function<T, V>) ➤ Transforms each item by applying the function to it
Use Flux ➤ Flux#take(long) ➤ Takes only the first N values from the Flux, after which the Flux completes ➤ Flux#doOnComplete(Runnable) ➤ Runs the given Runnable when the Flux completes
java.util.concurrent.CountDownLatch ➤ new CountDownLatch(int) - sets the count to the given number ➤ countDown() - decrements the count ➤ await() - blocks the calling thread until the latch has counted down to zero
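The three CountDownLatch calls above can be sketched with the JDK alone. This is a minimal sketch, not Reactor code: a ScheduledExecutorService stands in for a periodic source running on another thread, and the class name LatchDemo and method runAndWait are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a scheduler plays the role of a periodic source
// that emits on a background thread (assumption, not the talk's code).
final class LatchDemo {

    // Runs `ticks` periodic tasks in the background and blocks the caller
    // until the last one has run. Returns the number of ticks observed.
    static int runAndWait(int ticks, long periodMillis) {
        if (ticks < 1) {
            throw new IllegalArgumentException("ticks must be >= 1");
        }
        CountDownLatch latch = new CountDownLatch(1);      // count starts at 1
        AtomicInteger seen = new AtomicInteger();
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (seen.get() >= ticks) {
                return;                                    // already "completed"
            }
            if (seen.incrementAndGet() == ticks) {
                latch.countDown();                         // count goes 1 -> 0
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await();                                 // caller waits for zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();                       // stop the background thread
        }
        return seen.get();
    }
}
```

Without await(), the calling thread could return before the background work has produced anything, which is why a latch is handy when the source emits on a different thread.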
From Optional to the value ➤ Optional#get() ➤ From Stream to a Collection such as List, Set, Map, or to other values ➤ Stream#collect(Collector<T, A, R>) ➤ From Mono to the value or an Optional of the value ➤ Mono#block(), blockOptional() ➤ From Flux to List ➤ Flux#collectList().block() ➤ Flux#collectList() converts Flux<T> to Mono<List<T>> ➤ Not recommended to use: block() is a blocking call
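The Optional and Stream conversions listed above can be sketched with plain JDK code. The class name UnwrapDemo and its method names are hypothetical; the Mono and Flux counterparts are not shown here because they need Reactor on the classpath.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class for the JDK side of the comparison.
final class UnwrapDemo {

    // Optional -> value. get() throws NoSuchElementException when empty.
    static String fromOptional() {
        Optional<String> optional = Optional.of("Hello");
        return optional.get();
    }

    // Stream -> List, with a map() step in between.
    static List<String> fromStream() {
        return Stream.of("H", "e", "l", "l", "o")
                .map(String::toUpperCase)          // transform each item
                .collect(Collectors.toList());     // terminal step: gather into a List
    }
}
```

Both calls return immediately because the data is already in memory. The reactive counterparts have to wait for data that may not have arrived yet, which is where blocking comes in.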
breaks non-blocking! ➤ Mono#block(), blockOptional() ➤ Flux#blockFirst(), blockLast() ➤ Blocks the calling thread until the next/first/last signal arrives (a blocking operation) ➤ Blocking breaks non-blocking!
➤ Non-blocking web framework ➤ Spring MVC + Reactor ➤ Runs on Netty, Undertow, and Servlet containers such as Tomcat ➤ Reactive WebClient is available instead of RestTemplate
WebFluxDemoController {
    @GetMapping
    public Flux<String> demo() {
        return Flux.interval(Duration.ofMillis(300))
            .map(i -> i + " " + LocalDateTime.now() + "\n")
            .take(10);
    }
}

➤ Annotations are the same as in Spring MVC
➤ Arguments and return values can be Mono or Flux
like …

RestTemplate restTemplate = new RestTemplate();
ParameterizedTypeReference<List<Student>> type =
    new ParameterizedTypeReference<>() {};
List<Student> list = restTemplate
    .exchange("http://localhost:8081/list", HttpMethod.GET, null, type)
    .getBody();
WebClient webClient = WebClient.builder().build();
Flux<Student> flux = webClient.get()              // HTTP GET method
    .uri("http://localhost:8081/students/flux")   // to this URL
    .retrieve()                                   // send the request and retrieve the response
    .bodyToFlux(Student.class);                   // get the response body as a Flux
First, get N items from one microservice (1 access) ➤ Second, get detail data from another microservice for each of the N items (N accesses) ➤ Finally, merge them ➤ More concretely ➤ First, get a list of 33 students (List<Student>) ➤ Second, get the scores for each student (List<Score>) ➤ Finally, merge them (List<StudentScore>)
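The three steps above can be sketched in plain, blocking Java to make the access count visible. This is only an illustration under assumptions: the two services are replaced by in-memory functions, the fields of Student, Score, and StudentScore are guesses (the talk only shows student.id and new StudentScore(student, scores)), and MergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the "1 access + N accesses + merge" pattern.
final class MergeSketch {

    static final class Student {
        final int id;                        // assumed to be an int
        Student(int id) { this.id = id; }
    }

    static final class Score {
        final int studentId;                 // assumed field
        final int point;                     // assumed field
        Score(int studentId, int point) {
            this.studentId = studentId;
            this.point = point;
        }
    }

    static final class StudentScore {
        final Student student;
        final List<Score> scores;
        StudentScore(Student student, List<Score> scores) {
            this.student = student;
            this.scores = scores;
        }
    }

    // studentService: stands in for the service returning all students.
    // scoreService:   stands in for the service returning scores per student id.
    static List<StudentScore> merge(Supplier<List<Student>> studentService,
                                    Function<Integer, List<Score>> scoreService) {
        List<Student> students = studentService.get();          // first: 1 access
        List<StudentScore> merged = new ArrayList<>();
        for (Student student : students) {
            List<Score> scores = scoreService.apply(student.id); // second: N accesses
            merged.add(new StudentScore(student, scores));       // finally: merge
        }
        return merged;
    }
}
```

With 33 students this performs 34 accesses in total, one after another, which is the cost the reactive version tries to overlap.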
➤ Student Resource Service ➤ 33 students ➤ 100ms for each student ➤ Flux - returns students one by one every 100ms (total duration: 33 * 100ms = 3.3 secs) ➤ List - returns all students after 3.3 secs ➤ Score Resource Service ➤ Only 100ms overhead per access, regardless of the number of items
Flux<Student> students = webClient.get()
    .uri("http://localhost:8081/students/flux")
    .retrieve()
    .bodyToFlux(Student.class);

Flux<StudentScore> studentScore = students.flatMap(student ->
    webClient.get()
        .uri("http://localhost:8081/scores/" + student.id)
        .retrieve()
        .bodyToFlux(Score.class)
        .collectList()
        .map(scores -> new StudentScore(student, scores)));

➤ Get the score list from the Score Resource Service for each student
➤ Merge the student and scores into a StudentScore
students.flatMap(student -> {
    Flux<Score> flux = webClient.get()
        .uri("http://localhost:8081/scores/" + student.id)
        .retrieve()
        .bodyToFlux(Score.class);
    Mono<List<Score>> listMono = flux.collectList();
    Mono<StudentScore> mono =
        listMono.map(scores -> new StudentScore(student, scores));
    return mono;
});

➤ Get the scores and keep them as a Flux<Score>
➤ But StudentScore needs a List<Score>, so collectList() converts the Flux<Score> to a Mono<List<Score>>
Get all students: 3.3 secs (100ms * 33 students) ➤ Get scores: 3.3 secs (another 100ms * 33 accesses) ➤ More than 6.6 secs in total ➤ WebClient version ➤ Get each student: 3.3 secs (100ms * 33 students) ➤ Get scores while students are still arriving: the 33 accesses (100ms each) overlap with the student stream ➤ More than 3.3 secs in total
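The two totals above can be reproduced with a small idealized model. It assumes that student i arrives at i * 100ms, that each score access takes exactly 100ms, and that network overhead is zero, so real measurements would be somewhat higher. TimingSketch and its method names are hypothetical.

```java
// Idealized timing model for the 33-student example (assumptions: no network
// overhead, fixed per-item delays, unlimited concurrent score accesses).
final class TimingSketch {

    // Blocking version: wait for the complete student list, then fetch the
    // scores with one access after another.
    static long sequentialMillis(int students, long perStudentMillis,
                                 long perScoreAccessMillis) {
        long allStudents = students * perStudentMillis;     // list arrives at the end
        long allScores = students * perScoreAccessMillis;   // N accesses in a row
        return allStudents + allScores;
    }

    // Streaming version: the score access for student i starts as soon as
    // that student arrives, so the accesses overlap with the stream.
    static long overlappedMillis(int students, long perStudentMillis,
                                 long perScoreAccessMillis) {
        long finish = 0;
        for (int i = 1; i <= students; i++) {
            long arrival = i * perStudentMillis;             // student i arrives
            finish = Math.max(finish, arrival + perScoreAccessMillis);
        }
        return finish;
    }
}
```

Under these assumptions the blocking version needs 6600ms, while the overlapped version needs 3400ms: the 3.3 secs of student streaming plus the single score access that starts after the last student arrives.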
➤ When getting scores, pass the students as a Flux (in the HTTP request body), ➤ not as a String (in the HTTP query string) ➤ webClient.post() .uri("http://localhost:8081/scores/flux") .body(fluxStudents, Student.class) ➤ Accesses the service only once, not 33 times.