Managing the Reactive World with RxJava (Philly ETE 2017)

The world around our programs is inherently asynchronous. RxJava builds on the primitives offered by the Java platform to model these asynchronous sources in an easy-to-consume way. This talk explores how to think about these asynchronous sources, the tools RxJava provides to handle them, and how the future of Java will affect the library.

Video: coming soon

Jake Wharton

April 18, 2017

Transcript

  1. Why Reactive? Unless you can model your entire system synchronously, a single asynchronous source breaks imperative programming.
  2. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name);
      void setAge(int age);
    }

    UserManager um = new UserManager();
  3. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name);
      void setAge(int age);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());
  4. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name);
      void setAge(int age);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe");
  5. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name);
      void setAge(int age);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe");
    System.out.println(um.getUser());
  6. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name); // <-- now async
      void setAge(int age); // <-- now async
    }
  7. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name);
      void setAge(int age);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe");
    System.out.println(um.getUser());
  8. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name, Runnable callback);
      void setAge(int age, Runnable callback);
    }
  9. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name, Runnable callback);
      void setAge(int age, Runnable callback);
    }

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new Runnable() {
      @Override public void run() {
        System.out.println(um.getUser());
      }
    });
  10. Why Reactive?

    interface UserManager {
      User getUser();
      void setName(String name, Listener listener);
      void setAge(int age, Listener listener);

      interface Listener {
        void success(User user);
        void failure(IOException e);
      }
    }
  11. Why Reactive?

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new UserManager.Listener() {
      @Override public void success(User user) {
        System.out.println(um.getUser());
      }
      @Override public void failure(IOException e) {
        // TODO show the error...
      }
    });
  12. Why Reactive?

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new UserManager.Listener() {
      @Override public void success(User user) {
        System.out.println(um.getUser());
      }
      @Override public void failure(IOException e) {
        // TODO show the error...
      }
    });

    um.setAge(40, new UserManager.Listener() {
      @Override public void success(User user) {
        System.out.println(um.getUser());
      }
      @Override public void failure(IOException e) {
        // TODO show the error...
      }
    });
  13. Why Reactive?

    UserManager um = new UserManager();
    System.out.println(um.getUser());

    um.setName("Jane Doe", new UserManager.Listener() {
      @Override public void success(User user) {
        System.out.println(um.getUser());

        um.setAge(40, new UserManager.Listener() {
          @Override public void success(User user) {
            System.out.println(um.getUser());
          }
          @Override public void failure(IOException e) {
            // TODO show the error...
          }
        });
      }
      @Override public void failure(IOException e) {
        // TODO show the error...
      }
    });
  14-15. Why Reactive?

    public final class UserActivity extends Activity {
      private final UserManager um = new UserManager();

      @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName("Jane Doe", new UserManager.Listener() {
          @Override public void success(User user) {
            tv.setText(um.getUser().toString());
          }
          @Override public void failure(IOException e) {
            // TODO show the error...
          }
        });
      }
    }
  16-18. Why Reactive?

    public final class UserActivity extends Activity {
      private final UserManager um = new UserManager();

      @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName("Jane Doe", new UserManager.Listener() {
          @Override public void success(User user) {
            if (!isDestroyed()) {
              tv.setText(um.getUser().toString());
            }
          }
          @Override public void failure(IOException e) {
            // TODO show the error...
          }
        });
      }
    }
  19-20. Why Reactive?

    public final class UserActivity extends Activity {
      private final UserManager um = new UserManager();

      @Override protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.user);
        TextView tv = (TextView) findViewById(R.id.user_name);
        tv.setText(um.getUser().toString());

        um.setName("Jane Doe", new UserManager.Listener() {
          @Override public void success(User user) {
            runOnUiThread(new Runnable() {
              @Override public void run() {
                if (!isDestroyed()) {
                  tv.setText(um.getUser().toString());
                }
              }
            });
          }
          @Override public void failure(IOException e) {
            // TODO show the error...
          }
        });
      }
    }
  21. Why Reactive? [Diagram; labels in order: GET /, 200 OK, SELECT *, Jane Doe, setText, onClick, SELECT *, Jane Doe, setText, UPDATE user, Jane Doe, setText]
  22. Why Reactive? Unless you can model your entire system synchronously, a single asynchronous source breaks imperative programming.
  23. RxJava • A set of classes for representing sources of data. • A set of classes for listening to data sources.
  24-25. RxJava • A set of classes for representing sources of data. • A set of classes for listening to data sources. • A set of methods for modifying and composing the data.
  26. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous.
  27. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous. • Single item or many items.
  28. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous. • Single item, many items, or empty.
  29. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous. • Single item, many items, or empty. • Terminates with an error or succeeds to completion.
  30. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous. • Single item, many items, or empty. • Terminates with an error or succeeds to completion. • May never terminate!
  31. Sources • Usually do work when you start or stop listening. • Synchronous or asynchronous. • Single item, many items, or empty. • Terminates with an error or succeeds to completion. • May never terminate! • Just an implementation of the Observer pattern.
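The Sources bullets describe a source as an implementation of the Observer pattern that does its work when someone starts listening, and that ends with either completion or an error. A minimal plain-Java sketch of that idea might look like the following. `SimpleObserver`, `SimpleSource`, `fromCallable`, and `collect` are hypothetical names made up for illustration; they are not RxJava types, and the sketch is synchronous only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public final class ObserverPatternSketch {
  /** Hypothetical listener with the item, completion, and error callbacks. */
  public interface SimpleObserver<T> {
    void onNext(T item);
    void onComplete();
    void onError(Throwable t);
  }

  /** Hypothetical source: nothing happens until subscribe() is called. */
  public interface SimpleSource<T> {
    void subscribe(SimpleObserver<T> observer);
  }

  /** Wraps deferred work: emits its single result and completes, or emits the error. */
  public static <T> SimpleSource<T> fromCallable(Callable<T> work) {
    return observer -> {
      T item;
      try {
        item = work.call(); // the work runs only when someone subscribes
      } catch (Exception e) {
        observer.onError(e);
        return;
      }
      observer.onNext(item);
      observer.onComplete();
    };
  }

  /** Subscribes and records every signal so the order of events is visible. */
  public static List<String> collect(SimpleSource<String> source) {
    List<String> log = new ArrayList<>();
    source.subscribe(new SimpleObserver<String>() {
      @Override public void onNext(String item) { log.add("next:" + item); }
      @Override public void onComplete() { log.add("complete"); }
      @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
    });
    return log;
  }

  public static void main(String[] args) {
    System.out.println(collect(fromCallable(() -> "Hello")));
    System.out.println(collect(fromCallable(() -> {
      throw new IllegalStateException("boom");
    })));
  }
}
```

Creating the source does no work; only the call to `subscribe` inside `collect` runs the callable, which is the "do work when you start listening" behavior in miniature.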
  32. Sources • Observable<T>: Emits 0 to n items. Terminates with complete or error. • Flowable<T>: Emits 0 to n items. Terminates with complete or error.
  33. Sources • Observable<T>: Emits 0 to n items. Terminates with complete or error. Does not have backpressure. • Flowable<T>: Emits 0 to n items. Terminates with complete or error. Has backpressure.
  34. Flowable vs. Observable • Backpressure allows you to control how fast a source emits items. • RxJava 1.x added backpressure late in the design process.
  35. Flowable vs. Observable • Backpressure allows you to control how fast a source emits items. • RxJava 1.x added backpressure late in the design process. • All types exposed backpressure but not all sources respected it.
  36. Flowable vs. Observable • Backpressure allows you to control how fast a source emits items. • RxJava 1.x added backpressure late in the design process. • All types exposed backpressure but not all sources respected it. • Backpressure must be designed for.
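To make "control how fast a source emits items" concrete, here is a rough sketch built on the JDK's `java.util.concurrent.Flow` interfaces (Java 9 and later), which have the same shape as the Reactive Streams interfaces. It is not RxJava code. `RangePublisher` and `collectInBatches` are hypothetical names, and the publisher is a single-threaded toy that does not implement the full Reactive Streams rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public final class BackpressureSketch {
  /** Hypothetical synchronous publisher of 1..count that emits only what was requested. */
  public static final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    public RangePublisher(int count) { this.count = count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new Flow.Subscription() {
        private int next = 1;
        private long pending;     // demand requested but not yet fulfilled
        private boolean emitting; // guards against re-entrant request() calls
        private boolean done;

        @Override public void request(long n) {
          if (done) return;
          if (n <= 0) {
            done = true;
            subscriber.onError(new IllegalArgumentException("n must be > 0"));
            return;
          }
          pending += n;
          if (emitting) return;   // the loop below picks up the new demand
          emitting = true;
          while (pending > 0 && next <= count && !done) {
            pending--;
            subscriber.onNext(Integer.valueOf(next++));
          }
          emitting = false;
          if (next > count && !done) {
            done = true;
            subscriber.onComplete();
          }
        }

        @Override public void cancel() { done = true; }
      });
    }
  }

  /** Subscribes and asks for items batchSize at a time, logging requests and items. */
  public static List<String> collectInBatches(Flow.Publisher<Integer> publisher, int batchSize) {
    List<String> log = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<Integer>() {
      private Flow.Subscription subscription;
      private int receivedInBatch;

      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        log.add("request " + batchSize);
        s.request(batchSize);
      }
      @Override public void onNext(Integer item) {
        log.add("item " + item);
        if (++receivedInBatch == batchSize) {
          receivedInBatch = 0;
          log.add("request " + batchSize);
          subscription.request(batchSize);
        }
      }
      @Override public void onError(Throwable t) { log.add("error " + t.getMessage()); }
      @Override public void onComplete() { log.add("complete"); }
    });
    return log;
  }

  public static void main(String[] args) {
    System.out.println(collectInBatches(new RangePublisher(5), 2));
  }
}
```

The subscriber, not the publisher, sets the pace: nothing is emitted beyond the outstanding demand, which is the contract a source has to be designed around.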
  37-40. Flowable vs. Observable • Backpressure must be designed for.

    Observable<MotionEvent> events = RxView.touches(paintView);
    Observable<Row> rows = db.createQuery("SELECT * …");
  41. Flowable vs. Observable • Backpressure must be designed for.

    Observable<MotionEvent> events = RxView.touches(paintView);
    Observable<Row> rows = db.createQuery("SELECT * …");

    MissingBackpressureException
  42. Flowable vs. Observable • Backpressure must be designed for.

    Observable<MotionEvent> events = RxView.touches(paintView);
    Flowable<Row> rows = db.createQuery("SELECT * …");
  43-47. Flowable vs. Observable

    // Observable<MotionEvent>
    interface Observer<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Disposable d);
    }

    // Flowable<Row>
    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }
  48. Flowable vs. Observable

    // Observable<MotionEvent>
    interface Observer<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Disposable d);
    }

    interface Disposable {
      void dispose();
    }

    // Flowable<Row>
    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }
  49. Flowable vs. Observable

    // Observable<MotionEvent>
    interface Observer<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Disposable d);
    }

    interface Disposable {
      void dispose();
    }

    // Flowable<Row>
    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }

    interface Subscription {
      void cancel();
      void request(long r);
    }
  50. Reactive Streams ...is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
  51. Reactive Streams

    interface Publisher<T> {
      void subscribe(Subscriber<? super T> s);
    }

    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }
  52. Reactive Streams

    interface Publisher<T> {
      void subscribe(Subscriber<? super T> s);
    }

    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }

    interface Subscription {
      void request(long n);
      void cancel();
    }
  53. Reactive Streams

    interface Publisher<T> {
      void subscribe(Subscriber<? super T> s);
    }

    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }

    interface Subscription {
      void request(long n);
      void cancel();
    }

    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

  54. Single • Either succeeds with an item or errors. • No backpressure support. • Think "reactive scalar".
  55. Completable • Either completes or errors. Has no items! • No backpressure support. • Think "reactive runnable".
  56. Maybe • Either succeeds with an item, completes with no items, or errors. • No backpressure support.
  57. Maybe • Either succeeds with an item, completes with no items, or errors. • No backpressure support. • Think "reactive optional".
  58. Source Specializations • Encoding subsets of Observable into the type system. • Single – Item or error. Think "scalar". • Completable – Complete or error. Think "runnable". • Maybe – Item, complete, or error. Think "optional".
  59. Source types by emission shape and backpressure support

                                   Reactive Streams (Backpressure)   No Backpressure
    0…n items, complete|error      Flowable                          Observable
    item|complete|error                                              Maybe
    item|error                                                       Single
    complete|error                                                   Completable
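The "scalar", "runnable", and "optional" analogies can be spelled out with plain JDK types. The sketch below is only an illustration of the three terminal shapes (item or error; complete or error; item, complete, or error). `SourceShapes` and its methods are hypothetical names, everything runs synchronously, and none of it is RxJava API.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

public final class SourceShapes {
  /** Single-like shape: exactly one item, or an error. */
  public static <T> String single(Callable<T> work) {
    try {
      return "item:" + work.call();
    } catch (Exception e) {
      return "error:" + e.getMessage();
    }
  }

  /** Completable-like shape: completes with no item, or an error. */
  public static String completable(Runnable work) {
    try {
      work.run();
      return "complete";
    } catch (RuntimeException e) {
      return "error:" + e.getMessage();
    }
  }

  /** Maybe-like shape: one item, completion with no item, or an error. */
  public static <T> String maybe(Callable<Optional<T>> work) {
    try {
      Optional<T> result = work.call();
      return result.isPresent() ? "item:" + result.get() : "complete";
    } catch (Exception e) {
      return "error:" + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(single(() -> "Hello"));
    System.out.println(completable(() -> { }));
    System.out.println(maybe(() -> Optional.empty()));
  }
}
```

Each method can only ever report the outcomes its type allows, which is the point of encoding these subsets of Observable into the type system.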
  60. Creating Sources

    String[] array = { "Hello", "World" };
    List<String> list = Arrays.asList(array);

    Flowable.fromArray(array);
    Flowable.fromIterable(list);
    Observable.fromArray(array);
    Observable.fromIterable(list);
  61. Creating Sources

    OkHttpClient client = // …
    Request request = // …

    Observable.fromCallable(new Callable<Response>() {
      @Override public Response call() throws Exception {
        return client.newCall(request).execute();
      }
    });
  62. Creating Sources

    Flowable.fromCallable(() -> "Hello");
    Observable.fromCallable(() -> "Hello");
    Maybe.fromCallable(() -> "Hello");
    Single.fromCallable(() -> "Hello");
    Completable.fromCallable(() -> "Ignored!");
  63. Creating Sources

    Flowable.fromCallable(() -> "Hello");

    Observable.fromCallable(() -> "Hello");

    Maybe.fromCallable(() -> "Hello");
    Maybe.fromAction(() -> System.out.println("Hello"));
    Maybe.fromRunnable(() -> System.out.println("Hello"));

    Single.fromCallable(() -> "Hello");

    Completable.fromCallable(() -> "Ignored!");
    Completable.fromAction(() -> System.out.println("Hello"));
    Completable.fromRunnable(() -> System.out.println("Hello"));
  64. Creating Sources

    Observable.create(e -> {
      e.onNext("Hello");
      e.onComplete();
    });
  65. Creating Sources

    OkHttpClient client = // …
    Request request = // …

    Observable.create(e -> {
      client.newCall(request).enqueue(new Callback() {
        @Override public void onResponse(Response r) throws IOException {
          e.onNext(r.body().string());
          e.onComplete();
        }
        // Parameter renamed from "e" so it no longer shadows the emitter.
        @Override public void onFailure(IOException ex) {
          e.onError(ex);
        }
      });
    });
  66. Creating Sources

    OkHttpClient client = // …
    Request request = // …

    Observable.create(e -> {
      Call call = client.newCall(request);
      call.enqueue(new Callback() {
        @Override public void onResponse(Response r) throws IOException {
          e.onNext(r.body().string());
          e.onComplete();
        }
        @Override public void onFailure(IOException ex) {
          e.onError(ex);
        }
      });
    });
  67. Creating Sources

    OkHttpClient client = // …
    Request request = // …

    Observable.create(e -> {
      Call call = client.newCall(request);
      e.setCancellable(() -> call.cancel());
      call.enqueue(new Callback() {
        @Override public void onResponse(Response r) throws IOException {
          e.onNext(r.body().string());
          e.onComplete();
        }
        @Override public void onFailure(IOException ex) {
          e.onError(ex);
        }
      });
    });
  68. Creating Sources

    View view = // …

    Observable.create(e -> {
      e.setCancellable(() -> view.setOnClickListener(null));
      view.setOnClickListener(v -> e.onNext(v));
    });
  69. Creating Sources

    Flowable.create(e -> { … });
    Observable.create(e -> { … });
    Maybe.create(e -> { … });
    Single.create(e -> { … });
    Completable.create(e -> { … });
  70. Observing Sources

    // Observable<String>
    interface Observer<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Disposable d);
    }

    // Flowable<String>
    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }
  71. Observing Sources

    // Observable<String>
    interface Observer<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Disposable d);
    }

    interface Disposable {
      void dispose();
    }

    // Flowable<String>
    interface Subscriber<T> {
      void onNext(T t);
      void onComplete();
      void onError(Throwable t);
      void onSubscribe(Subscription s);
    }

    interface Subscription {
      void cancel();
      void request(long r);
    }
  72. Observing Sources

    Observable<String> o = Observable.just("Hello");

    o.subscribe(new Observer<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
      @Override public void onSubscribe(Disposable d) { ??? }
    });
  73. Observing Sources

    Observable<String> o = Observable.just("Hello");

    o.subscribe(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    });
  74. Observing Sources

    Observable<String> o = Observable.just("Hello");

    o.subscribe(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    });

    // TODO how do we dispose???
  75. Observing Sources

    Observable<String> o = Observable.just("Hello");

    DisposableObserver<String> observer = new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    };

    o.subscribe(observer);
  76. Observing Sources

    Observable<String> o = Observable.just("Hello");

    DisposableObserver<String> observer = new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    };

    o.subscribe(observer);

    observer.dispose();
  77. Observing Sources

    Observable<String> o = Observable.just("Hello");

    o.subscribe(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    });
  78. Observing Sources

    Observable<String> o = Observable.just("Hello");

    o.subscribeWith(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    });
  79. Observing Sources

    Observable<String> o = Observable.just("Hello");

    Disposable d = o.subscribeWith(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    });

    d.dispose();
  80. Observing Sources

    Observable<String> o = Observable.just("Hello");

    CompositeDisposable disposables = new CompositeDisposable();

    disposables.add(o.subscribeWith(new DisposableObserver<String>() {
      @Override public void onNext(String s) { … }
      @Override public void onComplete() { … }
      @Override public void onError(Throwable t) { … }
    }));

    disposables.dispose();
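The idea of collecting several disposables and disposing them in one call can be sketched without any library. In the example below, `SimpleDisposable` and `SimpleCompositeDisposable` are hypothetical stand-ins written for illustration; they are not RxJava's `Disposable` or `CompositeDisposable`, and the behavior shown (dispose once, and dispose late additions immediately) is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public final class CompositeDisposableSketch {
  /** Hypothetical stand-in for something that can be torn down. */
  public interface SimpleDisposable {
    void dispose();
  }

  /** Hypothetical container: disposing it disposes everything added so far. */
  public static final class SimpleCompositeDisposable implements SimpleDisposable {
    private final List<SimpleDisposable> children = new ArrayList<>();
    private boolean disposed;

    /** Tracks a child, or disposes it right away if the container is already disposed. */
    public synchronized void add(SimpleDisposable child) {
      if (disposed) {
        child.dispose();
        return;
      }
      children.add(child);
    }

    /** Disposes every tracked child exactly once; later calls do nothing. */
    @Override public synchronized void dispose() {
      if (disposed) return;
      disposed = true;
      for (SimpleDisposable child : children) {
        child.dispose();
      }
      children.clear();
    }

    public synchronized int size() {
      return children.size();
    }
  }

  public static void main(String[] args) {
    SimpleCompositeDisposable disposables = new SimpleCompositeDisposable();
    disposables.add(() -> System.out.println("first subscription disposed"));
    disposables.add(() -> System.out.println("second subscription disposed"));
    disposables.dispose();
  }
}
```

With one container per screen or component, teardown code needs a single `dispose()` call instead of tracking each subscription separately.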
  81. Observing Sources

    Observable<String> o = Observable.just("Hello");
    o.subscribeWith(new DisposableObserver<String>() { … });

    Maybe<String> m = Maybe.just("Hello");
    m.subscribeWith(new DisposableMaybeObserver<String>() { … });

    Single<String> s = Single.just("Hello");
    s.subscribeWith(new DisposableSingleObserver<String>() { … });

    Completable c = Completable.complete();
    c.subscribeWith(new DisposableCompletableObserver() { … });
  82. Observing Sources

    Flowable<String> f = Flowable.just("Hello");
    f.subscribeWith(new DisposableSubscriber<String>() { … });

    Observable<String> o = Observable.just("Hello");
    o.subscribeWith(new DisposableObserver<String>() { … });

    Maybe<String> m = Maybe.just("Hello");
    m.subscribeWith(new DisposableMaybeObserver<String>() { … });

    Single<String> s = Single.just("Hello");
    s.subscribeWith(new DisposableSingleObserver<String>() { … });

    Completable c = Completable.complete();
    c.subscribeWith(new DisposableCompletableObserver() { … });
  83. Observing Sources

    Flowable<String> f = Flowable.just("Hello");
    Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { … });

    Observable<String> o = Observable.just("Hello");
    Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { … });

    Maybe<String> m = Maybe.just("Hello");
    Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { … });

    Single<String> s = Single.just("Hello");
    Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { … });

    Completable c = Completable.complete();
    Disposable d5 = c.subscribeWith(new DisposableCompletableObserver() { … });
  84-85. RxJava • A set of classes for representing sources of data. • A set of classes for listening to data sources. • A set of methods for modifying and composing data.
  86. Operators • Manipulate or combine data in some way. • Manipulate threading in some way. • Manipulate emissions in some way.
  87. Operators

    @Override public void success(User user) {
      runOnUiThread(new Runnable() {
        @Override public void run() {
          tv.setText(um.getUser().toString());
        }
      });
    }
  88. Operators

    OkHttpClient client = // …
    Request request = // …

    Response response = client.newCall(request).execute();
  89. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<Response> response = Observable.fromCallable(() -> {
      return client.newCall(request).execute();
    });
  90-92. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<Response> response = Observable.fromCallable(() -> {
      return client.newCall(request).execute();
    });

    Observable<Response> backgroundResponse =
        response.subscribeOn(Schedulers.io());
  93. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<Response> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io());
  94. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<Response> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  95. Operators

    OkHttpClient client = // …
    Request request = // …

    // The result type is now String; the lambda parameter is named "r"
    // so it does not clash with the "response" variable being declared.
    Observable<String> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(r -> r.body().string());
  96. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<String> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(r -> r.body().string()); // NetworkOnMainThread!
  97. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<String> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .map(r -> r.body().string()) // Ok!
        .observeOn(AndroidSchedulers.mainThread());
  98. Operators

    OkHttpClient client = // …
    Request request = // …

    Observable<String> response = Observable.fromCallable(() -> {
          return client.newCall(request).execute();
        })
        .subscribeOn(Schedulers.io())
        .map(r -> r.body().string()) // Ok!
        .flatMap(s -> Observable.fromArray(s.split(" ")))
        .observeOn(AndroidSchedulers.mainThread());
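The Operators slides show that where a step sits relative to the thread hop decides which thread runs it. The same effect can be reproduced with JDK classes only. The sketch below is a loose analogy using `CompletableFuture` and two named single-thread executors; it is not how RxJava schedulers are implemented, and `ThreadHopSketch`, the thread names "io" and "ui", and the hard-coded body text are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ThreadHopSketch {
  /** Runs fetch, transform, and deliver steps and reports the thread each one ran on. */
  public static List<String> run() {
    ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
    ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
    List<String> log = new CopyOnWriteArrayList<>();
    try {
      CompletableFuture
          .supplyAsync(() -> {
            // Plays the role of the blocking call that starts on the background thread.
            log.add("fetch on " + Thread.currentThread().getName());
            return "hello world";
          }, io)
          .thenApplyAsync(body -> {
            // Placed before the hop, so it still runs on the background thread.
            log.add("transform on " + Thread.currentThread().getName());
            return body.toUpperCase();
          }, io)
          .thenAcceptAsync(text ->
              // Placed after the hop, so only delivery runs on the "ui" thread.
              log.add("deliver " + text + " on " + Thread.currentThread().getName()), ui)
          .join();
    } finally {
      io.shutdown();
      ui.shutdown();
    }
    return log;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Moving the transform step onto the `ui` executor would recreate the problem flagged on the slides: the expensive part of the work would run on the thread that is supposed to stay responsive.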
  99. Operator Specialization [Marble diagram: 1 2 3 4 5 6 7 8 9 10 11 → first() → 1; types: Observable → Observable]
  100. Operator Specialization [Diagram: 1 2 3 4 5 6 7 8 9 10 11 → get(0) → 1; types: List<Long> → List<Long>]
  101. Operator Specialization [Marble diagram: 1 2 3 4 5 6 7 8 9 10 11 → first() → 1; types: Observable → Single]
  102. Operator Specialization [Marble diagram: 1 2 3 4 5 6 7 8 9 → ignoreElements(); types: Observable → Completable]
  103-105. From / To

    From Flowable
      To Observable: toObservable()
      To Maybe: reduce(), elementAt(), firstElement(), lastElement(), singleElement()
      To Single: scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single/singleOrError(), all()/any()/count() (and more)
      To Completable: ignoreElements()

    From Observable
      To Flowable: toFlowable()
      To Maybe: reduce(), elementAt(), firstElement(), lastElement(), singleElement()
      To Single: scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single/singleOrError(), all()/any()/count() (and more)
      To Completable: ignoreElements()

    From Maybe
      To Flowable: toFlowable()
      To Observable: toObservable()
      To Single: toSingle(), sequenceEqual()
      To Completable: toCompletable()

    From Single
      To Flowable: toFlowable()
      To Observable: toObservable()
      To Maybe: toMaybe()
      To Completable: toCompletable()

    From Completable
      To Flowable: toFlowable()
      To Observable: toObservable()
      To Maybe: toMaybe()
      To Single: toSingle(), toSingleDefault()
  106. Being Reactive

    um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
          @Override public void onNext(User user) {
          }
          @Override public void onComplete() { /* ignored */ }
          @Override public void onError(Throwable t) { /* crash or show */ }
        });
  107. Being Reactive

    um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
          @Override public void onNext(User user) {
            tv.setText(user.toString());
          }
          @Override public void onComplete() { /* ignored */ }
          @Override public void onError(Throwable t) { /* crash or show */ }
        });
  108. Being Reactive

    disposables.add(um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
          @Override public void onNext(User user) {
            tv.setText(user.toString());
          }
          @Override public void onComplete() { /* ignored */ }
          @Override public void onError(Throwable t) { /* crash or show */ }
        }));
  109. Being Reactive

    // onCreate
    disposables.add(um.getUser()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<User>() {
          @Override public void onNext(User user) {
            tv.setText(user.toString());
          }
          @Override public void onComplete() { /* ignored */ }
          @Override public void onError(Throwable t) { /* crash or show */ }
        }));

    // onDestroy
    disposables.dispose();
  110. Being Reactive

    um.setName("Jane Doe")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableCompletableObserver() {
          @Override public void onComplete() {
            // success! re-enable editing
          }
          @Override public void onError(Throwable t) {
            // retry or show
          }
        });
  111. Being Reactive

    disposables.add(um.setName("Jane Doe")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableCompletableObserver() {
          @Override public void onComplete() {
            // success! re-enable editing
          }
          @Override public void onError(Throwable t) {
            // retry or show
          }
        }));
  112. Java 9

    • JEP 266: More Concurrency Updates

    Interfaces supporting the Reactive Streams publish-subscribe framework, nested within the new class Flow, along with a utility class SubmissionPublisher that developers can use to create custom components. These (very small) interfaces correspond to those defined with broad participation (from the Reactive Streams initiative) and support interoperability across a number of async systems running on JVMs. Nesting the interfaces within a class is a conservative policy allowing their use across various short-term and long-term possibilities. The proposed java.util.concurrent components have been offered in pre-release since January 2015, and have benefitted from several rounds of review. There are no plans to provide network- or I/O-based java.util.concurrent components for distributed messaging, but it is possible that future JDK releases will include such APIs in other packages.
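As a rough illustration of the SubmissionPublisher utility the JEP text mentions, the sketch below publishes a few strings to one subscriber and collects what arrives. It assumes Java 9 or later. The class name, the method name and the five-second timeout are arbitrary choices for this example, not anything from the talk or the JEP.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only Flow and SubmissionPublisher come from the JDK.
final class SubmissionPublisherSketch {

    // Publishes each name and returns the items one subscriber received, in order.
    static List<String> publishAndCollect(List<String> names) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for one item at a time
                }
                @Override public void onNext(String item) {
                    received.add(item);
                    subscription.request(1);
                }
                @Override public void onError(Throwable t) { finished.countDown(); }
                @Override public void onComplete() { finished.countDown(); }
            });
            for (String name : names) {
                publisher.submit(name); // delivered asynchronously to the subscriber
            }
        } // close() completes the subscriber once pending items are delivered

        try {
            finished.await(5, TimeUnit.SECONDS); // arbitrary timeout for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Jane Doe", "John Doe")));
    }
}
```

The subscriber requests items one by one purely to make the demand signalling visible; requesting a larger batch up front would work as well.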
  113. final class Flow {
      private Flow() {}

      interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
      }

      interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
      }

      interface Subscription {
        void request(long n);
        void cancel();
      }

      interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
      }
    }
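To show how the four interfaces fit together, here is a minimal hand-written Publisher against the JDK's java.util.concurrent.Flow (Java 9 or later). It is a single-threaded sketch, not a specification-compliant implementation. RangePublisher and takeFirst are hypothetical names invented for this example. The subscriber pulls items with request(1) and stops early with cancel(), which is roughly the role dispose() plays in the RxJava slides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical synchronous publisher of the integers start (inclusive) to end (exclusive).
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int end;

    RangePublisher(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        RangeSubscription subscription = new RangeSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscription.completeIfExhausted(); // covers an empty range with no demand
    }

    private final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private int next = start;
        private long demand;      // items requested but not yet emitted
        private boolean emitting; // true while the loop below runs (re-entrancy guard)
        private boolean done;     // cancelled, completed or failed

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            demand += n;
            if (demand < 0) demand = Long.MAX_VALUE; // overflow means "unbounded"
            if (emitting) return; // called from inside onNext; the outer loop continues
            emitting = true;
            while (!done && demand > 0 && next < end) {
                demand--;
                subscriber.onNext(next++);
            }
            emitting = false;
            completeIfExhausted();
        }

        @Override public void cancel() {
            done = true;
        }

        void completeIfExhausted() {
            if (!done && !emitting && next >= end) {
                done = true;
                subscriber.onComplete();
            }
        }
    }

    // Collects at most `limit` items, then cancels. Relies on the source being synchronous.
    static List<Integer> takeFirst(Flow.Publisher<Integer> source, int limit) {
        List<Integer> out = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                if (limit <= 0) s.cancel(); else s.request(1);
            }
            @Override public void onNext(Integer item) {
                out.add(item);
                if (out.size() == limit) subscription.cancel(); else subscription.request(1);
            }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(new RangePublisher(1, 12), 3)); // prints [1, 2, 3]
    }
}
```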