Understanding RxJava Scheduler

A talk given at Rx Ja Night #2.

This talk explains the differences between the RxJava Schedulers and how to use them. It also covers some caveats and the underlying structure of a Scheduler.

Reference:
- Advanced Reactive Java: Schedulers (part 1~4)
http://akarnokd.blogspot.jp/2015/05/schedulers-part-1.html

Hiroshi Kurokawa

June 12, 2017

Transcript

  1. COMPUTATION SCHEDULER
    ▸ for CPU-bound work
    ▸ bounded number of workers
    ▸ tasks can be blocked when the maximum number of threads is already in use
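  11. import io.reactivex.Completable
    import io.reactivex.observers.TestObserver
    import io.reactivex.schedulers.Schedulers
    import kotlin.system.measureTimeMillis

    // Eight blocking one-second tasks, each subscribed on the computation scheduler.
    val time = measureTimeMillis {
        val observers = List(8) { i ->
            val observer = TestObserver.create<Any>()
            Completable.fromAction { Thread.sleep(1_000L) }
                .subscribeOn(Schedulers.computation())
                .doOnComplete { println("Done $i") }
                .subscribe(observer)
            observer
        }
        observers.forEach { it.awaitTerminalEvent() }
    }
    println("${Runtime.getRuntime().availableProcessors()}") // -> 4
    println("$time [msec]") // -> 2156 [msec]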
  12. CAVEAT
    ▸ Keep in mind how much concurrency you need when using the computation scheduler
  18. import io.reactivex.Observable

    val time = measureTimeMillis {
        val observer = TestObserver.create<Int>()
        Observable.range(1, 8)
            .flatMap {
                // subscribeOn inside flatMap: each inner Observable gets its own worker.
                Observable.fromCallable {
                    Thread.sleep(1_000L)
                    println(Thread.currentThread())
                    it
                }.subscribeOn(Schedulers.computation())
            }
            .subscribe(observer)
        observer.awaitTerminalEvent()
    }
    println("Elapsed time: $time [msec]") // -> 2314 [msec]
  21. val time = measureTimeMillis {
        val observer = TestObserver.create<Int>()
        Observable.range(1, 8)
            // subscribeOn outside flatMap: the whole chain runs on a single worker.
            .subscribeOn(Schedulers.computation())
            .flatMap {
                Observable.fromCallable {
                    Thread.sleep(1_000L)
                    println(Thread.currentThread())
                    it
                }
            }
            .subscribe(observer)
        observer.awaitTerminalEvent()
    }
    println("Elapsed time: $time [msec]") // -> 8211 [msec]
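
The three timings above (2156, 2314 and 8211 msec) are consistent with a simple "waves" model, sketched below. It assumes that the computation scheduler runs at most one blocking task per worker thread, and that there is one worker thread per available processor (4 on the machine used in the slides). The function name and the model are illustrative assumptions, not part of the RxJava API.

```kotlin
// Hypothetical model, not RxJava API: estimates the wall-clock time of `tasks`
// blocking jobs of `taskMillis` each when at most `workers` of them run at once.
fun estimatedElapsedMillis(tasks: Int, workers: Int, taskMillis: Long): Long {
    require(tasks >= 0 && workers > 0 && taskMillis >= 0) { "invalid arguments" }
    // Tasks run in "waves" of at most `workers` tasks: ceil(tasks / workers).
    val waves = (tasks + workers - 1) / workers
    return waves * taskMillis
}
```

With 8 tasks and 4 workers, `estimatedElapsedMillis(8, 4, 1_000L)` gives 2000, close to the two measurements where every task is subscribed on its own worker. With a single worker, `estimatedElapsedMillis(8, 1, 1_000L)` gives 8000, close to the measurement where `subscribeOn` is applied once, outside `flatMap`. The remainder of each measured value is scheduling and start-up overhead, which the model ignores.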
  24. // 24 blocking one-second tasks on the io scheduler.
    val time = measureTimeMillis {
        val observers = List(24) { i ->
            val observer = TestObserver.create<Any>()
            Completable.fromAction { Thread.sleep(1_000L) }
                .subscribeOn(Schedulers.io())
                .doOnComplete { println("Done $i") }
                .subscribe(observer)
            observer
        }
        observers.forEach { it.awaitTerminalEvent() }
    }
    println("$time [msec]") // -> 1142 [msec]
  26. // The same test with 2026 tasks.
    val time = measureTimeMillis {
        val observers = List(2026) { i ->
            val observer = TestObserver.create<Any>()
            Completable.fromAction { Thread.sleep(1_000L) }
                .subscribeOn(Schedulers.io())
                .doOnComplete { println("Done $i") }
                .subscribe(observer)
            observer
        }
        observers.forEach { it.awaitTerminalEvent() }
    }
    println("$time [msec]") // -> 1142 [msec]
    // java.lang.OutOfMemoryError: unable to create new native thread
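
The OutOfMemoryError above comes from the number of threads, not from the work itself. As a rough analogy only, the sketch below uses plain `java.util.concurrent` executors instead of RxJava: a cached thread pool stands in for an unbounded scheduler such as io, and a fixed pool stands in for a bounded one. It counts how many threads each pool creates while all tasks are blocked at the same time. All function names are hypothetical.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Counts the threads a pool creates while `tasks` jobs are all blocked at once.
fun threadsCreated(tasks: Int, newPool: (ThreadFactory) -> ExecutorService): Int {
    val created = AtomicInteger()
    val factory = ThreadFactory { r -> created.incrementAndGet(); Thread(r) }
    val pool = newPool(factory)
    val release = CountDownLatch(1)
    repeat(tasks) {
        pool.execute { release.await() } // stands in for a blocking I/O call
    }
    release.countDown()                  // let every task finish
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    return created.get()
}

// Unbounded: a new thread for every task that finds no idle thread.
fun unboundedThreads(tasks: Int): Int =
    threadsCreated(tasks) { Executors.newCachedThreadPool(it) }

// Bounded: never more than `limit` threads; the remaining tasks wait in a queue.
fun boundedThreads(tasks: Int, limit: Int): Int =
    threadsCreated(tasks) { Executors.newFixedThreadPool(limit, it) }
```

Under these assumptions, `unboundedThreads(24)` creates 24 threads, while `boundedThreads(24, 4)` creates only 4. In RxJava, one way to get a bounded scheduler for blocking work is to wrap such a fixed pool with `Schedulers.from(executor)`.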
  27. NEW SCHEDULER
    ▸ For cases where neither the io nor the computation scheduler fits
    ▸ Creates a new thread for each worker
    ▸ The thread is shut down at dispose()
  28. SCHEDULER AND WORKER
    ▸ Scheduler does not schedule (!)
    ▸ Scheduler just creates a worker
    ▸ Worker processes tasks in a FIFO manner
  29. WORKER CONTRACT
    ▸ All methods should be thread-safe.
    ▸ Execute tasks in FIFO order.
    ▸ Make best effort to cancel outstanding tasks when unsubscribed.
    ▸ Unsubscription of a Worker should not affect other Worker instances of the same Scheduler.
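
The split between Scheduler and Worker, and the Worker contract, can be illustrated with a minimal sketch. `SketchScheduler`, `SketchWorker` and `fifoDemo` are hypothetical stand-ins written for this example, not the RxJava classes. Each worker is backed by its own single-threaded executor, which is one simple way (an assumption here, not necessarily how RxJava does it) to get thread-safe, FIFO execution and independent disposal.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

// Hypothetical worker: one thread, so tasks run one at a time in FIFO order.
class SketchWorker {
    private val executor = Executors.newSingleThreadExecutor()

    fun schedule(task: () -> Unit) {
        try {
            executor.execute { task() }
        } catch (e: RejectedExecutionException) {
            // The worker was already disposed: the task is dropped.
        }
    }

    // Best-effort cancellation: queued tasks are dropped, a running one is interrupted.
    fun dispose() {
        executor.shutdownNow()
    }
}

// Hypothetical scheduler: it runs nothing itself, it only hands out workers.
class SketchScheduler {
    fun createWorker(): SketchWorker = SketchWorker()
}

// Schedules five tasks on one worker after disposing a sibling worker.
fun fifoDemo(): List<Int> {
    val scheduler = SketchScheduler()
    val a = scheduler.createWorker()
    val b = scheduler.createWorker()
    val seen = Collections.synchronizedList(mutableListOf<Int>())
    val done = CountDownLatch(5)
    b.dispose() // must not affect worker a
    for (i in 1..5) a.schedule { seen.add(i); done.countDown() }
    done.await(5, TimeUnit.SECONDS)
    a.dispose()
    return seen.toList()
}
```

Calling `fifoDemo()` returns the numbers 1 to 5 in submission order, even though the second worker of the same scheduler was disposed first.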
  30. SCHEDULER API
    public Worker createWorker()
    public long now(TimeUnit unit)
    public void start()
    public void shutdown()
    public Disposable scheduleDirect(Runnable run)
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit)
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit)
  32. WORKER API
    public Disposable schedule(Runnable run)
    public Disposable schedule(Runnable run, long delay, TimeUnit unit)
    public Disposable schedulePeriodically(Runnable run, long initialDelay, final long period, final TimeUnit unit)
    public long now(TimeUnit unit)
  33. TRAMPOLINE VS. IMMEDIATE
    ▸ Schedulers.immediate() was removed in RxJava 2
    ▸ Instead, Schedulers.trampoline() is recommended
    ▸ Why?
  34. IMMEDIATE SCHEDULER (RXJAVA 1)

    val worker = Schedulers.immediate().createWorker()
    worker.schedule {
        println("outer task start")       // printed 1st
        worker.schedule {
            println("inner task start")   // printed 2nd
            println("inner task end")     // printed 3rd
        }
        println("outer task end")         // printed 4th
    }

    Contract violation ("Execute tasks in FIFO order")
  35. TRAMPOLINE SCHEDULER

    val worker = Schedulers.trampoline().createWorker()
    worker.schedule {
        println("outer task start")       // printed 1st
        worker.schedule {
            println("inner task start")   // printed 3rd
            println("inner task end")     // printed 4th
        }
        println("outer task end")         // printed 2nd
    }

    Conforming to the contract
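
The difference between the two workers above can be reproduced without RxJava. The sketch below is a single-threaded simplification under stated assumptions: `DemoWorker`, `ImmediateWorker`, `TrampolineWorker` and `runNested` are hypothetical names for this example, and the real schedulers also deal with delays, disposal and thread-safety, which are left out here.

```kotlin
import java.util.ArrayDeque

// Hypothetical minimal worker interface for this sketch.
interface DemoWorker {
    fun schedule(task: () -> Unit)
}

// Runs every task right away, even in the middle of another task.
class ImmediateWorker : DemoWorker {
    override fun schedule(task: () -> Unit) = task()
}

// Queues every task; only the outermost schedule() call drains the queue,
// so a task scheduled from inside another task waits until that task ends.
class TrampolineWorker : DemoWorker {
    private val queue = ArrayDeque<() -> Unit>()
    private var draining = false

    override fun schedule(task: () -> Unit) {
        queue.addLast(task)
        if (draining) return
        draining = true
        try {
            while (queue.isNotEmpty()) queue.removeFirst().invoke()
        } finally {
            draining = false
        }
    }
}

// Schedules a task that schedules another task, and records the order of events.
fun runNested(worker: DemoWorker): List<String> {
    val log = mutableListOf<String>()
    worker.schedule {
        log += "outer task start"
        worker.schedule {
            log += "inner task start"
            log += "inner task end"
        }
        log += "outer task end"
    }
    return log
}
```

`runNested(ImmediateWorker())` records the inner task between "outer task start" and "outer task end", which breaks FIFO order. `runNested(TrampolineWorker())` records both outer lines first and the inner task afterwards.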
  36. WHEN WORKER IS BORN, WHEN A TASK IS ASSIGNED
    ▸ A worker is created when a scheduler-related operator is subscribed
      ▸ delay(), observeOn(), subscribeOn(), etc.
    ▸ A task is scheduled when an event is emitted to the operator
    ▸ Events through an operator are serialised (FIFO) when not delayed
  37. FURTHER READING
    ▸ Advanced Reactive Java: Schedulers (part 1~4)
      http://akarnokd.blogspot.jp/2015/05/schedulers-part-1.html
    ▸ Read source code
      e.g. SingleScheduler, TrampolineScheduler, ExecutorScheduler