RxJava: A Stream of Joy and Woe

RxJava has become an invaluable tool for many Android developers, helping us compose the several asynchronous systems we must deal with. However, with so much power, RxJava can become the hammer that makes almost everything look like a nail. This talk runs through anecdotal examples of where RxJava has worked well, and where it maybe wasn’t the best idea.

Chris Horner

October 27, 2018
Transcript

  1. webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  2. webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(
             { something ->
                 // ...
             },
             { error ->
                 // ...
             }
         )
  3. webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(
             { something ->
                 // ...
             },
             { error ->
                 // ...
             }
         )
     OF TERMINAL EVENTS BEWARE
  4. val getSomethingFromWeb = webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
  5. webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  6. webServices.getSomething()
         .flatMap { firstResult -> webServices.getSomethingElse(firstResult) }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  7. webServices.getSomething()
         .flatMap { firstResult ->
             if (!firstResult.isError()) {
                 webServices.getSomethingElse(firstResult.response().body())
             } else {
                 Single.just(ERROR)
             }
         }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  8. webServices.getSomething()
         .flatMap { firstResult ->
             if (!firstResult.isError() && firstResult.response().isSuccessful) {
                 webServices.getSomethingElse(firstResult.response().body())
             } else {
                 Single.just(ERROR)
             }
         }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  9. webServices.getSomething()
         .flatMap { firstResult ->
             if (!firstResult.isError() && firstResult.response().isSuccessful) {
                 webServices.getSomethingElse(firstResult.response().body())
             } else {
                 Single.just(ERROR)
             }
         }
         .map { result -> transform(result) }
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe { something ->
             // ...
         }
  10. suspend fun doSomeWork() {
          try {
              val firstResponse = webServices.getSomething().await()
              if (firstResponse.isSuccessful) {
                  val something = firstResponse.body()
                  val secondResponse = webServices.getSomethingElse(something).await()
                  // ..
              }
          } catch (e: IOException) {
              // ..
          }
      }
  11. • Interfacing with an engine monitoring system
      • Over Bluetooth
      • Which is emulating a serial connection
      • Enabling a (very slow) satellite internet service
      • Via SOAP
      • Sending location and engine data over that connection
      • Meanwhile monitoring the accelerometer to check for accidents
  12. Z X

  13. Y

  14. sensors.observeSensor(TYPE_GRAVITY)
          .distinctUntilChanged()
          .map { it.sensorEvent.values }
          .filter { it[0] > ROLLOVER_THRESHOLD || it[2] > ROLLOVER_THRESHOLD }
          .map { Emergency.ROLLOVER }
          .throttleFirst(THROTTLE_SECONDS, TimeUnit.SECONDS)
          .observeOn(RealRxSchedulers.mainThread())
          .subscribe { }
  15. • Location updates
      • Bluetooth payloads from engine
      • Managing output sockets as network conditions change
      • Shift status timers
  16. RxPermissions
          .request(Manifest.permission.CAMERA)
          .subscribe { granted ->
              if (granted) {
                  // I can control the camera now.
              } else {
                  // Oh no, permission denied.
              }
          }
  17. RxActivityResult.on(this)
          .startIntent(takePhoto)
          .subscribe { result, resultCode ->
              if (resultCode == RESULT_OK) {
                  result.targetUi().showImage(data)
              } else {
                  result.targetUi().printUserCancelled()
              }
          }
  18. fun onViewAttached() {
          val user = userStore.getCurrentValue()
          avatarView.setImage(user.profilePic)
          usernameView.text = user.username
          userStore.observe().subscribe { user ->
              avatarView.setImage(user.profilePic)
              usernameView.text = user.username
          }
      }
  19. https://github.com/Gridstone/RxStore
      val userStore = storeProvider.valueStore<User>(file, converter)
      userStore.put(user)
      val observeUser: Observable<User> = userStore.observe()
      val putUser: Single<User> = userStore.observePut(user)
  20. val events: Observable<Event> = Observable.merge(
          loginButton.clicks().map { Event.AttemptLogin(username, password) },
          cancelButton.clicks().map { Event.Cancel }
      )
      https://github.com/JakeWharton/RxBinding
  21. Observables
          .combineLatest(
              usernameView.textChanges().map { it.isNotEmpty() },
              passwordView.textChanges().map { it.isNotEmpty() }
          ) { hasUsername, hasPassword -> hasUsername && hasPassword }
          .subscribe(loginButton::setEnabled)
      [Login form mockup: Username, Password, Login]
  22. Takeaways
      • If you only use Rx for callbacks with nice threading, consider coroutines
      • Beware terminal events
      • Be mindful of thread boundaries and state delivery
      • Don’t try to Rx-AllTheThings
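
The "beware terminal events" takeaway can be made concrete with a small sketch. It is not taken from the talk: it assumes RxJava 2 is on the classpath, and the `clicks` subject, the `runDemo` function, and the failing "request" are hypothetical stand-ins for a UI event stream and a web call. The point it tries to show is that a single error inside `flatMapSingle` terminates the whole outer stream, so later events are never delivered.

```kotlin
import io.reactivex.Single
import io.reactivex.subjects.PublishSubject

// Hypothetical demo: returns everything the subscriber observed.
fun runDemo(): List<String> {
    // Stand-in for a long-lived event source such as button clicks.
    val clicks = PublishSubject.create<Int>()
    val received = mutableListOf<String>()

    clicks
        .flatMapSingle { id ->
            // Stand-in for a web request; request 2 fails.
            if (id == 2) {
                Single.error<String>(IllegalStateException("request $id failed"))
            } else {
                Single.just("result $id")
            }
        }
        .subscribe(
            { result -> received.add(result) },
            { error -> received.add("error: ${error.message}") }
        )

    clicks.onNext(1)
    clicks.onNext(2) // the error is a terminal event for the whole chain
    clicks.onNext(3) // never delivered: the subscription has already ended
    return received
}

fun main() {
    println(runDemo()) // prints [result 1, error: request 2 failed]
}
```

If the stream should outlive individual failures, one option is to handle the error inside the inner `Single` (for example with `onErrorReturn`) so that it never reaches the outer chain.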