
GeeCON 2019: Don’t be Homer Simpson with your Reactor!

Project Reactor is a very powerful tool in developers' hands. But the more powerful the tools you use, the stricter the safety rules. Do you know all of them?

In this talk, we will learn about advanced Project Reactor debugging & testing techniques, which are helpful for anyone who works with Project Reactor. Because just as cockroaches can survive even radiation, bugs can escape the most well-thought-out code.

Never heard about BlockHound?
Or you know about backpressure, but are not sure whether it is handled correctly in your code?
Maybe you want to prepare your reactive code upfront to help your future self when something goes wrong in prod?
Or perhaps just watch how Reactor team members work with Reactor code and learn some neat tricks?
Then this talk is for you!

Sergei Egorov

May 15, 2019

Transcript

  1. About me
     • Staff Engineer at Pivotal’s Spring R&D, working on Project Reactor ⚛
     • Berlin Spring User Group co-organizer
     • Testcontainers co-maintainer
     • Developer tools geek
     • … although spent most of my career building highly scalable backend services
     @bsideup
  2. I agree with Heinrich Apfelmus that the essence of functional reactive programming is to specify the dynamic behavior of a value completely at the time of declaration. “Reactive Sir”
  7. // com/example/demo/Example.java
     Flux.range(0, 5)
         .single()
         .subscribeOn(Schedulers.parallel())
         .subscribe();

     java.lang.IndexOutOfBoundsException: Source emitted more than one item
         at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
         at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
         at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
         at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
         at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
         at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
         at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
         at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
         at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
         at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
         at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
         at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
         at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)

     Which source?
     Where is the “com.example.demo” package?
     Why did they close Google Inbox?
  8. Assembly
     Flux.range(0, 5)
         .filter(it -> it > 3)
         .map(it -> it.toString())
         .single()
         .subscribeOn(Schedulers.parallel())
  9. Assembly
     Flux.range(0, 5)
         .filter(it -> it > 3)
         .map(it -> it.toString())
         .single()
         .subscribeOn(Schedulers.parallel())
         .subscribe()
  10. Execution
      Operators: FluxRange, FluxFilter, FluxMap, MonoSingle, MonoSubscribeOn
      Threads: current thread, parallel-1
      Signal: .onNext(1)

      java.lang.IndexOutOfBoundsException: Source emitted more than one item
          at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(Mono
          at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(Flux
          at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxR
          at reactor.core.publisher.MonoSingle$SingleSubscriber.request(Mono
          at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.
          at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.
          at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(M
          at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
          at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
          at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
          at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.
  11. Demo outcomes
      • Use .checkpoint(“something”) to “mark” reactive “milestones”
      • Read about Hooks.onOperatorDebug()…
      • … but use reactor-tools’ ReactorDebugAgent (works in prod too)
      • https://spring.io/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription - great article from Simon Baslé about the internals
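
The idea behind checkpoint() and Hooks.onOperatorDebug(), remembering where a pipeline was declared so that a failure on another thread can point back to it, can be sketched with the plain JDK. This is only an analogy under stated assumptions, not how Reactor implements it: `AssemblyTrace`, `traced`, `declarePipeline`, `runOnWorker` and `hasFrame` are hypothetical names, and a single-thread executor stands in for Schedulers.parallel().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AssemblyTraceSketch {

    /** Hypothetical marker exception: it only carries the stack of the declaration site. */
    static final class AssemblyTrace extends RuntimeException {
        AssemblyTrace(String description) {
            super("Assembled at: " + description);
        }
    }

    /** Wraps a task so that a failure also reports where the task was assembled. */
    static Runnable traced(String description, Runnable task) {
        // Created on the declaring thread, so its stack contains the caller's frames.
        AssemblyTrace assembly = new AssemblyTrace(description);
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                e.addSuppressed(assembly);
                throw e;
            }
        };
    }

    /** "User code": declares a task that will fail later, on another thread. */
    static Runnable declarePipeline() {
        return traced("declarePipeline", () -> {
            throw new IndexOutOfBoundsException("Source emitted more than one item");
        });
    }

    /** Runs the task on a worker thread and returns the failure it produced, or null. */
    static Throwable runOnWorker(Runnable task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(task).get(5, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    /** True if the throwable's own stack trace contains a frame of the given method. */
    static boolean hasFrame(Throwable t, String methodName) {
        for (StackTraceElement frame : t.getStackTrace()) {
            if (frame.getMethodName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable failure = runOnWorker(declarePipeline());
        System.out.println("execution trace shows declarePipeline: "
                + hasFrame(failure, "declarePipeline"));
        System.out.println("assembly trace shows declarePipeline: "
                + hasFrame(failure.getSuppressed()[0], "declarePipeline"));
    }
}
```

The execution stack only knows about the worker thread, which is why the declaration site has to be captured eagerly. Capturing a stack trace per declared task is not free, which is one reason to prefer targeted markers or an agent over a global debug hook.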
  12. Default thread pools
      • Schedulers.parallel() - N threads, where N matches the CPU count
      • Schedulers.single() - 1 thread handling all submitted tasks
      • Schedulers.elastic() - dynamic, thread-caching pool
  13. public class Thread implements Runnable {

          private static native void $$BlockHound$$_sleep(long millis);

          public static void sleep(long millis) {
              $$BlockHound$$_sleep(millis);
          }
      }
  14. public class Thread implements Runnable {

          private static native void $$BlockHound$$_sleep(long millis);

          public static void sleep(long millis) {
              reactor.BlockHoundRuntime.checkBlocking(
                  "java.lang.Thread",
                  "sleep",
                  /*method modifiers*/
              );
              $$BlockHound$$_sleep(millis);
          }
      }
  18. Blocking check
      1. Get or tag the current thread
         • JVMTI’s built-in mechanism to tag objects, calls nonBlockingThreadPredicate on creation and caches the result forever
      2. Is it non-blocking? (the predicate returns “true”)
         • Very fast, O(1) check
      3. Walk the stacktrace until a frame marked with (dis)allowBlockingCallsInside
         • Makes it possible to whitelist loggers and other non-harmful blocking calls
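
The three steps above can be sketched in plain Java. This is a simplified model, not BlockHound's implementation: a `ThreadLocal` stands in for JVMTI object tags, the `parallel-` thread-name prefix is an assumed predicate, `runOn` and `log` are hypothetical helpers, and treating a call as disallowed when no marked frame is found is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

class BlockingCheckSketch {

    // Assumption: threads named "parallel-*" are the ones that must never block.
    static final Predicate<Thread> NON_BLOCKING_THREAD =
            thread -> thread.getName().startsWith("parallel-");

    // Step 1: evaluate the predicate once per thread and cache the answer
    // (a ThreadLocal stands in for JVMTI tags here).
    static final ThreadLocal<Boolean> IS_NON_BLOCKING =
            ThreadLocal.withInitial(() -> NON_BLOCKING_THREAD.test(Thread.currentThread()));

    // "class#method" -> true (blocking allowed inside) or false (disallowed inside).
    static final Map<String, Boolean> MARKED_FRAMES = new ConcurrentHashMap<>();

    static void allowBlockingCallsInside(String className, String methodName) {
        MARKED_FRAMES.put(className + "#" + methodName, true);
    }

    static void disallowBlockingCallsInside(String className, String methodName) {
        MARKED_FRAMES.put(className + "#" + methodName, false);
    }

    static void checkBlocking(String className, String methodName) {
        // Step 2: cheap cached lookup; ordinary threads may block freely.
        if (!IS_NON_BLOCKING.get()) {
            return;
        }
        // Step 3: the innermost marked frame decides.
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            Boolean allowed = MARKED_FRAMES.get(frame.getClassName() + "#" + frame.getMethodName());
            if (allowed != null) {
                if (allowed) {
                    return;
                }
                break;
            }
        }
        // Assumption: no marked frame means the call is disallowed.
        throw new IllegalStateException("Blocking call! " + className + "#" + methodName);
    }

    /** Mirrors the instrumented Thread.sleep from the slides: check first, then block. */
    static void sleep(long millis) {
        checkBlocking("java.lang.Thread", "sleep");
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Hypothetical "harmless" blocking caller, e.g. a logger flushing its buffer. */
    static void log() {
        sleep(1);
    }

    /** Runs the action on a new thread with the given name; returns the error message or null. */
    static String runOn(String threadName, Runnable action) {
        String[] error = new String[1];
        Thread thread = new Thread(() -> {
            try {
                action.run();
            } catch (IllegalStateException e) {
                error[0] = e.getMessage();
            }
        }, threadName);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return error[0];
    }

    public static void main(String[] args) {
        System.out.println(runOn("worker-1", () -> sleep(1)));
        System.out.println(runOn("parallel-1", () -> sleep(1)));
        allowBlockingCallsInside(BlockingCheckSketch.class.getName(), "log");
        System.out.println(runOn("parallel-2", BlockingCheckSketch::log));
    }
}
```

Caching the predicate result keeps the common path cheap; the stack walk only happens on threads that were already classified as non-blocking.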
  23. Stack
          at sun.misc.Unsafe.park(Unsafe.java)
          at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
          at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
          at com.example.demo.BlockingCodeTest.lambda$testBlockingCode$1(BlockingCodeTest.java:24)
          at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
          at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onNext(FluxSubscribeOn.java:151)
          at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
          at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
          at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.requestUpstream(FluxSubscribeOn.java:131)
          at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onSubscribe(FluxSubscribeOn.java:124)
          at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
          at reactor.core.publisher.Flux.subscribe(Flux.java:7800)
          at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
          at reactor.core.scheduler.ReactorBlockHoundIntegration$Wrapper.run(ReactorBlockHoundIntegration.java:56)
          at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
          at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)

      Schedulers.onScheduleHook("BlockHound", Wrapper::new);
      builder.disallowBlockingCallsInside(Wrapper.class.getName(), "run");
  24. Demo outcomes
      • Blocking calls are bad, m’kay?
      • They may sneak into your production system
      • Use https://github.com/reactor/BlockHound to detect them
      • Supports multiple frameworks (Reactor, RxJava, etc)
      • … and maybe even Kotlin: https://github.com/Kotlin/kotlinx.coroutines/issues/1031
      • Use a dedicated pool for the necessary blocking calls, or schedule them on the Schedulers.elastic() built-in pool if they happen rarely
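
The last point, moving unavoidable blocking work onto its own pool, might look like this with only the JDK. `blockingCall`, `offload`, the pool size of 4 and the `blocking-io` thread name are hypothetical; in Reactor the same idea is usually expressed with Mono.fromCallable(...) followed by subscribeOn(...) on a dedicated scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedPoolSketch {

    // Dedicated pool: blocking here cannot starve the threads that drive
    // the non-blocking parts of the application.
    static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "blocking-io");
        thread.setDaemon(true); // do not keep the JVM alive for this sketch
        return thread;
    });

    /** Hypothetical stand-in for a call that blocks the calling thread. */
    static String blockingCall(String input) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input + " handled on " + Thread.currentThread().getName();
    }

    /** Returns immediately; the blocking part runs on the dedicated pool. */
    static CompletableFuture<String> offload(String input) {
        return CompletableFuture.supplyAsync(() -> blockingCall(input), BLOCKING_POOL);
    }

    public static void main(String[] args) {
        System.out.println(offload("request").join());
    }
}
```

A bounded pool also caps how many blocking calls run at once, which an unbounded caching pool does not.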
  27. KafkaProducer#send(ProducerRecord,Callback)
      “Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.” - Javadoc

      java.lang.Error: Blocking call! java.lang.Object#wait
          at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
          at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
          at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
          at java.lang.Object.wait(Object.java)
          at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181)
          at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
          at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
          at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)

      https://issues.apache.org/jira/browse/KAFKA-3539
  28. long remainingWaitMs = maxWaitMs;
      long elapsed;
      // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
      // In case we already have cached metadata for the topic, but the requested partition is greater
      // than expected, issue an update request only once. This is necessary in case the metadata
      // is stale and the number of partitions for this topic has increased in the meantime.
      do {
          log.trace("Requesting metadata update for topic {}.", topic);
          metadata.add(topic);
          int version = metadata.requestUpdate();
          sender.wakeup();
          try {
              metadata.awaitUpdate(version, remainingWaitMs);
          } catch (TimeoutException ex) {
              // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
              throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
          }
          cluster = metadata.fetch();
          elapsed = time.milliseconds() - begin;
          if (elapsed >= maxWaitMs)
              throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
          if (cluster.unauthorizedTopics().contains(topic))
              throw new TopicAuthorizationException(topic);
          remainingWaitMs = maxWaitMs - elapsed;
          partitionsCount = cluster.partitionCountForTopic(topic);
      } while (partitionsCount == null);

      waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
      Default is “60 seconds”
  29. Schedulers.onScheduleHook(fn)
      Schedulers.onScheduleHook("myHook", runnable -> {
          System.out.println("Before every scheduled runnable");
          return () -> {
              System.out.println("Before execution");
              runnable.run();
              System.out.println("After execution");
          };
      });
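
To make the two phases of such a hook visible, here is a plain-JDK sketch of the same decorator shape. It does not use Reactor: a `Function<Runnable, Runnable>` plays the role of the hook, a single-thread executor plays the scheduler, and `runWithHook` and `side` are hypothetical names. The assumption being illustrated is that the hook itself runs where the task is scheduled, while the wrapper it returns runs where the task executes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ScheduleHookSketch {

    /** Labels the current thread relative to the thread that scheduled the task. */
    static String side(Thread caller) {
        return Thread.currentThread() == caller ? "caller" : "worker";
    }

    /** Schedules one decorated task and returns the events in the order they happened. */
    static List<String> runWithHook() {
        Thread caller = Thread.currentThread();
        List<String> events = new CopyOnWriteArrayList<>();

        // Same shape as the hook on the slide: receives a Runnable, returns a wrapper.
        Function<Runnable, Runnable> hook = runnable -> {
            events.add("hook on " + side(caller));
            return () -> {
                events.add("before on " + side(caller));
                runnable.run();
                events.add("after on " + side(caller));
            };
        };

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // The decoration happens here, at scheduling time, on the caller's thread.
            Runnable decorated = hook.apply(() -> events.add("task"));
            worker.submit(decorated).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        runWithHook().forEach(System.out::println);
    }
}
```

This split is what makes such hooks useful for carrying context: anything read from the scheduling thread in the outer lambda can be restored on the executing thread in the inner one.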
  30. Takeaways
      • https://github.com/reactor/reactor-tools
      • Flux#checkpoint() / Flux#log()
      • Hooks class
      • https://github.com/reactor/BlockHound
      • Async APIs - trust, but verify