Reactive Programming with RxJava

Hugo Cordier

March 10, 2015

Transcript

  1. RxJava
     I’m Hugo Cordier, CTO at Melusyn
     ◦ Java developer
     ◦ Groovy developer
     ◦ JavaScript developer
  2. RxJava
     ◦ Callbacks and Futures
     ◦ Where is Rx from?
     ◦ Introducing Observable
     ◦ Subscribing to an Observable
     ◦ Applying operators
     ◦ Handling errors
     ◦ Hot and Cold Observable
     ◦ Backpressure
     ◦ Schedulers
  3. Rx - Callbacks

         server.listen(8080, "127.0.0.1", new AsyncResultHandler<HttpServer>() {
             // The handler's type parameter must match the AsyncResult it receives
             public void handle(AsyncResult<HttpServer> asyncResult) {
                 log.info("Listen succeeded? " + asyncResult.succeeded());
             }
         });

     Lots of boilerplate code
     Hard to read
     A mess to chain
  4. Rx - CompletableFutures

         CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
         CompletableFuture<Shop> shopFuture = closestShop();
         CompletableFuture<Route> routeFuture =
             customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));

     Easier to chain
     Easier to read
     But...
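The chaining on this slide can be run end to end with the JDK alone. The sketch below is only an illustration: `loadCustomerDetails`, `closestShop` and `findRoute` are hypothetical stand-ins that return strings instead of the slide's `Customer`, `Shop` and `Route` types, which the deck does not define.

```java
import java.util.concurrent.CompletableFuture;

public class CombineSketch {

    // Hypothetical stand-in: pretends to load a customer asynchronously.
    static CompletableFuture<String> loadCustomerDetails(int id) {
        return CompletableFuture.supplyAsync(() -> "customer-" + id);
    }

    // Hypothetical stand-in: pretends to look up the closest shop asynchronously.
    static CompletableFuture<String> closestShop() {
        return CompletableFuture.supplyAsync(() -> "shop-1");
    }

    // Hypothetical stand-in: combines both results once they are available.
    static String findRoute(String customer, String shop) {
        return customer + " -> " + shop;
    }

    // Starts both lookups, combines them without blocking, then waits for the result.
    static String route(int customerId) {
        CompletableFuture<String> customerFuture = loadCustomerDetails(customerId);
        CompletableFuture<String> shopFuture = closestShop();
        CompletableFuture<String> routeFuture =
            customerFuture.thenCombine(shopFuture, CombineSketch::findRoute);
        return routeFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(route(123)); // prints "customer-123 -> shop-1"
    }
}
```

Note that each future still carries exactly one value, which is the limitation the Observable slides address.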
  5. Rx - Where’s it from
     ◦ Erik Meijer developed the concepts at Microsoft
     ◦ Microsoft open sourced Rx.NET and RxJS in 2012
     ◦ In 2013 Netflix open sourced RxJava, based on Erik Meijer’s concepts
  6. Observable - Iterable parallel

                    Single return value      Multiple return values
     Synchronous    Data getData()           Iterable<Data> getData()
     Asynchronous   Future<Data> getData()   Observable<Data> getData()
  7. Observable - Iterable parallel

         Data data = getData()
         if (data.equals(x)) {
             // Do something
         } else {
             // Do something else
         }

                    Single return value      Multiple return values
     Synchronous    Data getData()           Iterable<Data> getData()
     Asynchronous   Future<Data> getData()   Observable<Data> getData()
  8. Observable - Iterable parallel

         Iterable<Data> data = getData()
         data.forEach(value -> {
             if (value.equals(x)) {
                 // Do something
             } else {
                 // Do something else
             }
         })

                    Single return value      Multiple return values
     Synchronous    Data getData()           Iterable<Data> getData()
     Asynchronous   Future<Data> getData()   Observable<Data> getData()
  9. Observable - Iterable parallel

         CompletableFuture<Data> data = getData()
         data.thenApply(value -> {
             if (value.equals(x)) {
                 // Do something
             } else {
                 // Do something else
             }
         })

                    Single return value      Multiple return values
     Synchronous    Data getData()           Iterable<Data> getData()
     Asynchronous   Future<Data> getData()   Observable<Data> getData()
  10. Observable - Iterable parallel

          Observable<Data> data = getData()
          data.map(value -> {
              if (value.equals(x)) {
                  // Do something
              } else {
                  // Do something else
              }
          })

                     Single return value      Multiple return values
      Synchronous    Data getData()           Iterable<Data> getData()
      Asynchronous   Future<Data> getData()   Observable<Data> getData()
  11. Observable - Introduction
      An Observable can:
      • emit events (0 to ∞)
      • emit an error
      • complete the stream
      Once an error has been emitted, no further events can be emitted.
      An Observable can be infinite.
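The rule that nothing follows a terminal signal can be sketched in plain Java. This is not RxJava's implementation: the `Observer` interface and the `SafeObserver` wrapper below are hypothetical names invented for this illustration, and only mirror the three signals listed on the slide.

```java
import java.util.ArrayList;
import java.util.List;

public class ContractSketch {

    /** Hypothetical callback interface mirroring the three signals on the slide. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Wraps an observer and drops every signal that arrives after a terminal one. */
    static final class SafeObserver<T> implements Observer<T> {
        private final Observer<T> downstream;
        private boolean terminated;

        SafeObserver(Observer<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T value) {
            if (!terminated) downstream.onNext(value);
        }

        @Override public void onError(Throwable error) {
            if (!terminated) { terminated = true; downstream.onError(error); }
        }

        @Override public void onCompleted() {
            if (!terminated) { terminated = true; downstream.onCompleted(); }
        }
    }

    /** Emits 1, fails, then tries to emit 2 and complete; returns what the observer saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Observer<Integer> safe = new SafeObserver<>(new Observer<Integer>() {
            @Override public void onNext(Integer v) { log.add("next " + v); }
            @Override public void onError(Throwable e) { log.add("error " + e.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        safe.onNext(1);
        safe.onError(new IllegalStateException("boom"));
        safe.onNext(2);      // ignored: the stream has already failed
        safe.onCompleted();  // ignored as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next 1, error boom]
    }
}
```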
  12. Creating an Observable that emits synchronously:

          rx.Observable.create { observer ->
              try {
                  for (int i = 0; i < 2; i++) {
                      observer.onNext(i);
                  }
                  observer.onCompleted();
              } catch (Exception e) {
                  observer.onError(e);
              }
          }
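  13. Creating an Observable that emits from another thread:

          rx.Observable.create { observer ->
              // Emit from the executor so the caller is not blocked
              executor.execute(new Runnable() {
                  void run() {
                      try {
                          for (int i = 0; i < 10; i++) {
                              observer.onNext(i);
                              Thread.sleep(2000)  // wait two seconds between events
                          }
                          observer.onCompleted();
                      } catch (Exception e) {
                          observer.onError(e);
                      }
                  }
              })
          }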
  14. Subscribing with a Subscriber:

          getData()
              .subscribe(new Subscriber() {
                  @Override
                  void onNext(Object o) {
                      logger.info(o)
                  }

                  @Override
                  void onError(Throwable e) {
                      logger.error("Oops", e)
                  }

                  @Override
                  void onCompleted() {
                      logger.debug("Stream has completed")
                  }
              })
  15. Subscribing with closures:

          getData()
              .subscribe(
                  { next -> logger.info(next) },
                  { e -> logger.error("Oops", e) },
                  { logger.debug("Stream has completed") }
              )
  16. Combining two Observables with zip:

          rx.Observable.zip(getDataA(), getDataB(), { vA, vB -> [vA, vB] })
              .subscribe { value -> logger.info value }

      Live example: http://rxmarbles.com/#zip
  17. Real-life example
      Let’s use swapi.co, the Star Wars API.
      Films, Characters, Planets, Species, everything from Star Wars :D
      What if I want to fetch every Character of a film, consolidated with its homeworld planet and species?
      Let’s try with Rx!
  18. Handling errors in the subscriber:

          getData()
              .filter { it > 3 }
              .map { it + 2 }
              .subscribe(
                  { next -> logger.info(next) },
                  { e ->
                      logger.error("Oops", e)
                      // Handle any error here
                  },
                  { logger.debug("Stream has completed") }
              )

      Once an Observable has failed, it terminates and does not send any more events.
  19. Errors can be caught with

          Observable<T> onErrorResumeNext(Observable<? extends T> resumeSequence)
          Observable<T> onErrorReturn(Func1<java.lang.Throwable, ? extends T> resumeFunction)
  20. Cold Observables
      They are passive streams, producing events only when subscribed to. This is the default behaviour.

          // This part is declarative: no events are emitted until something subscribes to it
          def source = rx.Observable
              .interval(1, java.util.concurrent.TimeUnit.SECONDS)
              .doOnEach { println "Emitted an event" }

          // At this point, source starts to emit events
          source.subscribe(
              { println it },
              { it.printStackTrace() }
          )
  21. Hot Observables
      They are active streams, and produce events regardless of subscription.

          def source = rx.Observable
              .interval(1, java.util.concurrent.TimeUnit.SECONDS)
              .doOnEach { println "Emitted an event" }
              .publish()  // makes it a hot observable, which will be triggered by connect()

          // At this point, source starts to emit events
          source.connect()

          source.subscribe(
              { println it },
              { it.printStackTrace() }
          )
  22. Backpressure
      As Hot Observables emit events at their own pace, what if one emits faster than a subscriber can consume the data?
      Backpressure allows the subscriber to control the stream.
      Some existing operators implement backpressure: merge, zip, from, groupBy, delay, scan, ...
      It can be implemented in any Observable source.
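The idea of a subscriber controlling the stream can be sketched with the JDK's own `java.util.concurrent.Flow` API (Java 9+), used here instead of RxJava so the example runs without dependencies. The class and method names are hypothetical. The subscriber asks for one item at a time with `request(1)`, so the publisher never hands over more than was requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    /** Subscriber that pulls exactly one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);               // ask for the first item only
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);    // ready for the next one
        }

        @Override public void onError(Throwable t) { done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 0..count-1 and returns what the subscriber received. */
    static List<Integer> run(int count) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);    // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items are delivered
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```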
  23. Schedulers
      Rx is single-threaded by default.
      Schedulers define where to run each computation.
      Each Operator has a default Scheduler [1].
      Rx allows developers to specify where to run their code using:

          public final Observable<T> subscribeOn(Scheduler scheduler)
          public final Observable<T> observeOn(Scheduler scheduler)

      [1] And you can find them here
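As a loose analogy only, and not RxJava code, the same split between where a value is produced and where it is consumed can be sketched with JDK executors. The thread names and the `SchedulerAnalogy` class are hypothetical. The executor passed to `supplyAsync` plays a role similar to `subscribeOn`, and the executor passed to `thenApplyAsync` a role similar to `observeOn`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SchedulerAnalogy {

    /** Returns the names of the producing thread and the consuming thread. */
    static String[] run() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            // Produce the value on the "io" executor (roughly what subscribeOn selects).
            CompletableFuture<String> produced =
                CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), io);

            // Handle the value on the "ui" executor (roughly what observeOn selects).
            CompletableFuture<String[]> handled = produced.thenApplyAsync(
                producer -> new String[] { producer, Thread.currentThread().getName() }, ui);

            return handled.get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println(names[0] + " -> " + names[1]); // prints "io-thread -> ui-thread"
    }
}
```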
  24. Schedulers
      Scheduler: Purpose
      ◦ Schedulers.computation(): meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io() instead)
      ◦ Schedulers.from(executor): uses the specified Executor as a Scheduler
      ◦ Schedulers.immediate(): schedules work to begin immediately in the current thread
      ◦ Schedulers.io(): meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation()
      ◦ Schedulers.newThread(): creates a new thread for each unit of work
      ◦ Schedulers.trampoline(): queues work to begin on the current thread after any already-queued work
  25. Rx World
      • Rx.NET
      • RxJava, RxScala, RxGroovy, RxJRuby, RxNetty, RxAndroid
      • Rx.rb
      • Rx.py
      • RxClojure
      • RxCpp
      • RxKotlin