Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Presentation: Reactive: Do. Or do not. There is...

Sergei Egorov
February 04, 2020

Presentation: Reactive: Do. Or do not. There is no try.

Sergei Egorov

February 04, 2020
Tweet

More Decks by Sergei Egorov

Other Decks in Programming

Transcript

  1. • Staff Engineer at Pivotal, Project Reactor team • Oracle

    Groundbreakers Ambassador • Berlin Spring User Group co-organizer • Testcontainers co-maintainer About me @bsideup
  2. I agree with Heinrich Apfelmus that the essence of functional

    reactive programming is to specify the dynamic behavior of a value completely at the time of declaration. “Reactive Sir” @bsideup
  3. Quiz static <T> void addLogging(Flux<T> flux) { flux.doOnNext(it -> println("Received

    " + it)) .doOnError(e -> e.printStackTrace()); } // ... Flux<Item> items = getFlux(); addLogging(items); items.subscribe(); @bsideup
  4. Quiz static <T> void addLogging(Flux<T> flux) { flux.doOnNext(it -> println("Received

    " + it)) .doOnError(e -> e.printStackTrace()); } // ... Flux<Item> items = getFlux(); addLogging(items); items.subscribe(); 1. Will print both items and errors @bsideup
  5. Quiz static <T> void addLogging(Flux<T> flux) { flux.doOnNext(it -> println("Received

    " + it)) .doOnError(e -> e.printStackTrace()); } // ... Flux<Item> items = getFlux(); addLogging(items); items.subscribe(); 1. Will print both items and errors 2. Will only print items, not errors @bsideup
  6. Quiz static <T> void addLogging(Flux<T> flux) { flux.doOnNext(it -> println("Received

    " + it)) .doOnError(e -> e.printStackTrace()); } // ... Flux<Item> items = getFlux(); addLogging(items); items.subscribe(); 1. Will print both items and errors 2. Will only print items, not errors 3. Will not print anything @bsideup
  7. Quiz static <T> void addLogging(Flux<T> flux) { flux.doOnNext(it -> println("Received

    " + it)) .doOnError(e -> e.printStackTrace()); } // ... Flux<Item> items = getFlux(); addLogging(items); items.subscribe(); 1. Will print both items and errors 2. Will only print items, not errors 3. Will not print anything @bsideup
  8. public <R> Mono<R> map(Function<? super T, ? extends R> mapper)

    { return new MonoMap<>(this, mapper); } Immutability of reactive operators @bsideup Mono.just("Hello") .map(it -> it + " World!") .map(String::length)
  9. public <R> Mono<R> map(Function<? super T, ? extends R> mapper)

    { return new MonoMap<>(this, mapper); } Immutability of reactive operators @bsideup Mono.just("Hello") .map(it -> it + " World!") .map(String::length)
  10. static <T> Flux<T> addLogging(Flux<T> flux) { return flux .doOnNext(it ->

    println("Received " + it)) .doOnError(e -> e.printStackTrace()); } // ... getFlux() .transform(flux -> addLogging(flux)) .subscribe(); @bsideup Always return…
  11. static <T> Flux<T> addLogging(Flux<T> flux) { return flux .doOnNext(it ->

    println("Received " + it)) .doOnError(e -> e.printStackTrace()); } // ... getFlux() .transform(flux -> addLogging(flux)) .subscribe(); … and use the returned value! @bsideup
  12. static <T> Flux<T> addLogging(Flux<T> flux) { return flux .doOnNext(it ->

    println("Received " + it)) .doOnError(e -> e.printStackTrace()); } // ... getFlux() .transform(flux -> addLogging(flux)) .subscribe(); … and use the returned value! @bsideup
  13. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> System.out.println("User: " + user)); Higher-order functions @bsideup
  14. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> System.out.println("User: " + user)); Higher-order functions @bsideup
  15. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> System.out.println("User: " + user)); Higher-order functions @bsideup
  16. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> System.out.println("User: " + user)); Higher-order functions @bsideup
  17. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> println("User: " + user)); Higher-order functions @bsideup
  18. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> println("User: " + user)); Higher-order functions Less scopes @bsideup
  19. Mono<String> userIdMono = getUserId(); userIdMono.subscribe(userId -> { Mono<User> userMono =

    getUser(userId); userMono.subscribe(user -> { if (isUserValid(user)) { println("User: " + user); } }); }); // vs getUserId() .flatMap(userId -> getUser(userId)) .filter(user -> isUserValid(user)) .subscribe(user -> println("User: " + user)); Higher-order functions Composition @bsideup
  20. Avoid side effects Flux<User> users = getUsers(); return users.doOnNext(user ->

    { storeUser(user); }); Side effect! void storeUser(User user) { // } @bsideup
  21. Avoid side effects Flux<User> users = getUsers(); return users.concatMap(user ->

    storeUser(user)); Mono<Void> storeUser(User user) { // } @bsideup
  22. “things you do in your (async) callbacks should never take

    (significant) time” Stephane Maldini, 2019 @bsideup
  23. Errors are side effects too return fetchUsers().map(json -> { try

    { return decodeJSON(json); } catch (IOException e) { throw new RuntimeException(e); } }); @bsideup
  24. Errors are side effects too return fetchUsers().map(json -> { try

    { return decodeJSON(json); } catch (IOException e) { throw new RuntimeException(e); } }); @bsideup
  25. Errors are side effects too return fetchUsers().map(json -> { try

    { return decodeJSON(json); } catch (IOException e) { throw new RuntimeException(e); } }); return fetchUsers().handle((json, sink) -> { try { sink.next(decodeJSON(json)); } catch (IOException e) { sink.error(e); } }); @bsideup
  26. Errors are side effects too return fetchUsers().map(json -> { try

    { return decodeJSON(json); } catch (IOException e) { throw new RuntimeException(e); } }); return fetchUsers().handle((json, sink) -> { try { sink.next(decodeJSON(json)); } catch (IOException e) { sink.error(e); } }); @bsideup
  27. Errors are side effects too return fetchUsers().map(json -> { try

    { return decodeJSON(json); } catch (IOException e) { throw new RuntimeException(e); } }); return fetchUsers().handle((json, sink) -> { try { sink.next(decodeJSON(json)); } catch (IOException e) { sink.error(e); } }); No need to wrap with RuntimeException @bsideup
  28. “flatMaps” • flatMap - transform every item concurrently into a

    sub-stream, and join the current and the sub-stream. @bsideup
  29. “flatMaps” • flatMap - transform every item concurrently into a

    sub-stream, and join the current and the sub-stream. • concatMap - same as flatMap, but one-by-one @bsideup
  30. “flatMaps” • flatMap - transform every item concurrently into a

    sub-stream, and join the current and the sub-stream. • concatMap - same as flatMap, but one-by-one • switchMap - same as concatMap, but will cancel the previous sub- stream when a new item arrives @bsideup
  31. “flatMaps” • flatMap - transform every item concurrently into a

    sub-stream, and join the current and the sub-stream. • concatMap - same as flatMap, but one-by-one • switchMap - same as concatMap, but will cancel the previous sub- stream when a new item arrives • flatMapSequential - same as flatMap, but preserves the order of sub- stream items according to the original stream’s order @bsideup
  32. Default thread pools Schedulers.parallel() - N threads, where N matches

    the CPUs count. Schedulers.single() - 1 thread handling all submitted tasks Schedulers.boundedElastic() - dynamic, thread caching capped pool @bsideup
  33. Demo outcomes • Blocking calls are bad, m’kay? • They

    may sneak into your production system • Use https://github.com/reactor/BlockHound to detect them • Supports multiple frameworks (Reactor, RxJava, etc) • … and maybe even Kotlin: 
 https://github.com/Kotlin/kotlinx.coroutines/issues/1031 • Use a dedicated pool for the necessary blocking calls, or schedule them on the Schedulers.boundedElastic() built-in pool if they happen rarely @bsideup
  34. KafkaProducer#send(ProducerRecord,Callback) “Asynchronously send a record to a topic and invoke

    the provided callback when the send has been acknowledged.” - Javadoc @bsideup
  35. KafkaProducer#send(ProducerRecord,Callback) “Asynchronously send a record to a topic and invoke

    the provided callback when the send has been acknowledged.” - Javadoc java.lang.Error: Blocking call! java.lang.Object#wait at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154) at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254) at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43) at java.lang.Object.wait(Object.java) at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181) at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) @bsideup
  36. KafkaProducer#send(ProducerRecord,Callback) “Asynchronously send a record to a topic and invoke

    the provided callback when the send has been acknowledged.” - Javadoc java.lang.Error: Blocking call! java.lang.Object#wait at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154) at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254) at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43) at java.lang.Object.wait(Object.java) at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181) at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) https://issues.apache.org/jira/browse/KAFKA-3539 @bsideup
  37. long remainingWaitMs = maxWaitMs; long elapsed; // Issue metadata requests

    until we have metadata for the topic or maxWaitTimeMs is exceeded. // In case we already have cached metadata for the topic, but the requested partition is greater // than expected, issue an update request only once. This is necessary in case the metadata // is stale and the number of partitions for this topic has increased in the meantime. do { log.trace("Requesting metadata update for topic {}.", topic); metadata.add(topic); int version = metadata.requestUpdate(); sender.wakeup(); try { metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); if (cluster.unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null); waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); Default is “60 seconds” @bsideup
  38. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll(); } } interface UsersRepository { List<User> findAll(); } @bsideup
  39. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll(); } } interface UsersRepository { List<User> findAll(); } @bsideup
  40. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll(); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  41. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll(); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  42. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll() .collectList() .block(); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  43. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll() .collectList() .timeout(Mono.delay(ofSeconds(10))) .retry(5) .block(); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  44. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; Flux<User> getUsers() { return usersRepository.findAll() .timeout(Mono.delay(ofSeconds(10))) .retry(5); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  45. class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() { return

    usersService.getUsers().stream() .map(UserDTO::new) .collect(Collectors.toList()); } } class UsersService { UsersRepository usersRepository; Flux<User> getUsers() { return usersRepository.findAll() .timeout(Mono.delay(ofSeconds(10))) .retry(5); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  46. class UsersController { UsersService usersService; @GetMapping("/users") Flux<UserDTO> getUsers() { return

    usersService.getUsers() .map(UserDTO::new); } } class UsersService { UsersRepository usersRepository; Flux<User> getUsers() { return usersRepository.findAll() .timeout(Mono.delay(ofSeconds(10))) .retry(5); } } interface UsersRepository { Flux<User> findAll(); } @bsideup
  47. Before: class UsersController { UsersService usersService; @GetMapping("/users") List<UserDTO> getUsers() {

    return usersService.getUsers() .stream() .map(UserDTO::new) .collect(toList()); } } class UsersService { UsersRepository usersRepository; List<User> getUsers() { return usersRepository.findAll(); } } interface UsersRepository { List<User> findAll(); } class UsersController { UsersService usersService; @GetMapping("/users") Flux<UserDTO> getUsers() { return usersService.getUsers() .map(UserDTO::new); } } class UsersService { UsersRepository usersRepository; Flux<User> getUsers() { return usersRepository.findAll() .timeout(Mono.delay(ofSeconds(10))) .retry(5); } } interface UsersRepository { Flux<User> findAll(); } After:
  48. Synchronous programming • 1 request == 1 thread • Everything

    is blocking, the execution continues on the same thread • ThreadLocals work just fine @bsideup
  49. Asynchronous programming • 1 request will most probably be handled

    by multiple threads • Everything is non-blocking, an accepted request may be returned to the client from another thread • ThreadLocals must be propagated from one thread to another @bsideup
  50. Mono .deferWithContext(ctx -> { return this.getUser(ctx.get("userId")); }) // Later in

    the framework (e.g. Spring Security) .subscriberContext(Context.of("userId", "bsideup")) .subscribe(); @bsideup
  51. Mono .deferWithContext(ctx -> { return this.getUser(ctx.get("userId")); }) // Later in

    the framework (e.g. Spring Security) .subscriberContext(Context.of("userId", "bsideup")) .subscribe(); @bsideup
  52. Mono .deferWithContext(ctx -> { return this.getUser(ctx.get("userId")); }) // Later in

    the framework (e.g. Spring Security) .subscriberContext(Context.of("userId", "bsideup")) .subscribe(); @bsideup
  53. Schedulers.onScheduleHook(fn) @bsideup Schedulers.onScheduleHook("myHook", runnable -> { println("Before every scheduled runnable");

    return () -> { println("Before execution"); runnable.run(); println("After execution"); }; });
  54. Schedulers.onScheduleHook(fn) @bsideup Schedulers.onScheduleHook("mdc", runnable -> { String userId = MDC.get("userId");

    return () -> { MDC.put("userId", userId); try { runnable.run(); } finally { MDC.remove("userId"); } }; });
  55. Resiliency with Reactor • .timeout(Duration) - cancel the subscription and

    fail if no items emitted • .retry()/retryWithBackoff() - retry the subscription on failure @bsideup
  56. Resiliency with Reactor • .timeout(Duration) - cancel the subscription and

    fail if no items emitted • .retry()/retryWithBackoff() - retry the subscription on failure • .repeatWhenEmpty() - repeat the subscription when it completes without values @bsideup
  57. Resiliency with Reactor • .timeout(Duration) - cancel the subscription and

    fail if no items emitted • .retry()/retryWithBackoff() - retry the subscription on failure • .repeatWhenEmpty() - repeat the subscription when it completes without values • .defaultIfEmpty() - fallback when empty @bsideup
  58. Resiliency with Reactor • .timeout(Duration) - cancel the subscription and

    fail if no items emitted • .retry()/retryWithBackoff() - retry the subscription on failure • .repeatWhenEmpty() - repeat the subscription when it completes without values • .defaultIfEmpty() - fallback when empty • .onErrorResume() - fallback on error @bsideup
  59. You may ask… • “How do I get the current

    Scheduler?” • “Why does flatMap changes the thread?” • …”Why it does not?” @bsideup
  60. Demo outcomes • Use .checkpoint(“something”) to “mark” reactive “milestones” •

    Read about Hooks.onOperatorDebug()… • … but use reactor-tools’ ReactorDebugAgent (works in prod too) • https://spring.io/blog/2019/03/06/flight-of-the-flux-1-assembly-vs- subscription - great article from Simon Basle about the internals @bsideup
  61. Summary • Learn functional programming • Check various operators •

    Think about the resiliency • Start gradually • Prepare for day 2 • Use it for heavy computations • Care about the threads • Block non-blocking threads • Use ThreadLocals • Be afraid of it ;) DO… DON’T… @bsideup