Upgrade to Pro — share decks privately, control downloads, hide ads and more …

RxJava 2.0 介紹

RxJava 2.0 介紹

2016/11/24 Android Taipei 分享

Chien Shuo (Kros)

November 24, 2016
Tweet

More Decks by Chien Shuo (Kros)

Other Decks in Programming

Transcript

  1. 為什什麼要有 2.0 • RxJava 2.0 has been completely rewritten from

    scratch on top of the Reactive-Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries. • 因為要符合 Reactive-Streams specification。所以把 RxJava Library 重寫⼀一遍,稱作 2.0
  2. 什什麼是Reactive Streams • Reactive Streams is an initiative to provide

    a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. • 簡單來來說就是⼀一個 “處理理 asynchronous 程式” 的標準規範
  3. 反觀 Java 9 • Java 9 也⽀支援 Reactive-Streams specification •

    Reactive Programming with JDK 9 Flow API
 https://community.oracle.com/docs/DOC-1006738
  4. interface Publisher<T> {
 void subscribe(Subscriber<? super T> s);
 }
 


    interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void request(long n);
 void cancel();
 } interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } Reactive-Streams specification
  5. RxJava • A set of classes for representing sources of

    data. • A set of classes for listening to data sources. • A set of methods for modifying and composing the data.
  6. RxJava • A set of classes for representing sources of

    data. • A set of classes for listening to data sources. • A set of methods for modifying and composing the data.
  7. Sources • 通常當你開始(或結束)監聽時,才開始運作。 • 運作時可以是 synchronous 或 asynchronous。 • Source

    傳送的資料可以是 Single item, many items, 或 empty。 • 當發⽣生 error 或收到 complete 時結束。也或者永遠不結束。
  8. 2.0 差別 • 新增了了不同的 來來源(Source) • RxJava 1.0 - Only

    Observable • RxJava 2.0 - Observable And Flowable
  9. interface Publisher<T> {
 void subscribe(Subscriber<? super T> s);
 }
 


    interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void request(long n);
 void cancel();
 } interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } Flowable vs. Observable
  10. interface Publisher<T> {
 public void subscribe(Subscriber<? super T> s);
 }

    Flowable Flowable vs. Observable Observable interface ObservableSource<T> {
 void subscribe(Observer<? super T> observer);
 }
  11. Flowable vs. Observable • Observable<T> • 發射 0 ~ n

    筆資料 • 收到 complete 或 error 時結束 • 沒有流量量控制(backpressure) • Flowable<T> • 發射 0 ~ n 筆資料 • 收到 complete 或 error 時結束 • 有流量量控制(backpressure)
  12. Flowable vs. Observable • Observable<T> • 發射 0 ~ n

    筆資料 • 收到 complete 或 error 時結束 • 沒有流量量控制 • Flowable<T> • 發射 0 ~ n 筆資料 • 收到 complete 或 error 時結束 • 有流量量控制
  13. Reactive-Streams specification interface Publisher<T> {
 void subscribe(Subscriber<? super T> s);


    }
 
 interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void request(long n);
 void cancel();
 } interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 流量量控制 (Backpressure)
  14. Flowable vs. Observable Observable<MotionEvent> events = RxView.touches(paintView)
 Observable<Row> rows =

    db.createQuery("SELECT *")
 無法控制 User 畫圖的事件, 本質上無法做流量量控制(Backpressure)
  15. Flowable vs. Observable Observable<MotionEvent> events = RxView.touches(paintView)
 Observable<Row> rows =

    db.createQuery("SELECT *")
 while (cursor.moveToNext()) { // … } Cursor 就是流量量控制
  16. Flowable vs. Observable Observable<MotionEvent> events = RxView.touches(paintView)
 Flowable<Row> rows =

    db.createQuery("SELECT *")
 while (cursor.moveToNext()) { // … } • 要針對不同的情況,使⽤用不同的 Design Pattern。
  17. interface Publisher<T> {
 public void subscribe(Subscriber<? super T> s);
 }

    Flowable Flowable vs. Observable Observable interface ObservableSource<T> {
 void subscribe(Observer<? super T> observer);
 }
  18. interface Publisher<T> {
 public void subscribe(Subscriber<? super T> s);
 }

    Flowable Flowable vs. Observable Observable interface ObservableSource<T> {
 void subscribe(Observer<? super T> observer);
 }
  19. interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable

    t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } Observable Flowable Flowable vs. Observable interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 }
  20. interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable

    t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable Flowable vs. Observable • 收到 Source 傳來來的資料,可接收多次。
  21. interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable

    t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable Flowable vs. Observable • 收到完成,表⽰示此 Source 已經「成功」的結束,不再傳送任何東⻄西。
  22. interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable

    t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable Flowable vs. Observable • 此 Source 發⽣生錯誤,並結束。
  23. Flowable vs. Observable interface Subscriber<T> {
 void onNext(T t);
 void

    onComplete();
 void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable
  24. Source Specializations • RxJava 2.0 額外新增了了幾個特別的 Source • 這些 Source

    為 Observable 的⼦子集和(subsets) • Single - 回傳 1 或 error • Completable - 回傳 complete 或 error • Maybe - 回傳 0, 1, complete 或 error
  25. Maybe • 只會回傳下列列三種情況: • Succeeds with an item. • Completes

    with no items. • Errors • 不⽀支援流量量控制(backpressure)
  26. Source Specializations • RxJava 2.0 額外新增了了幾個特別的 Source • 這些 Source

    為 Observable 的⼦子集和(subsets) • Single - 回傳 1 或 error • Completable - 回傳 complete 或 error • Maybe - 回傳 0, 1, complete 或 error
  27. Creating Sources String[] array = {"a", "b"};
 List<String> list =

    new ArrayList<>(); 
 Flowable.fromArray(array);
 Flowable.fromIterable(list);
 
 Observable.fromArray(array);
 Observable.fromIterable(list);
  28. Creating Sources Flowable.fromCallable(() -> "Hello");
 
 Observable.fromCallable(() -> "Hello");
 


    Single.fromCallable(() -> "Hello");
 
 Maybe.fromCallable(() -> "Hello");
 
 Completable.fromCallable(() -> "Ignore!");

  29. Creating Sources Flowable.fromCallable(() -> "Hello");
 
 Observable.fromCallable(() -> "Hello");
 


    Single.fromCallable(() -> "Hello");
 
 Maybe.fromCallable(() -> "Hello");
 Maybe.fromAction(() -> System.out.println("Hello"));
 Maybe.fromRunnable(() -> System.out.println("Hello"));
 
 Completable.fromCallable(() -> "Ignore!");
 Completable.fromAction(() -> System.out.println("Hello"));
 Completable.fromRunnable(() -> System.out.println("Hello"));
  30. Creating Sources Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws

    Exception {
 return "Hello";
 }
 }); • Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. • 當有⼈人訂閱 (subscribe) 時,才發射資料。
  31. Creating Sources Observable.create(e -> {
 e.onNext(“Hello"); e.onNext("World");
 e.onComplete();
 }); •

    可以呼叫多次 onNext • (fromCallable 只能呼叫⼀一次 onNext)
  32. Creating Sources ExecutorService executor = Executors.newSingleThreadExecutor();
 Observable.create(e -> {
 executor.submit(()

    -> {
 // ...
 e.onNext(result);
 e.onComplete();
 });
 }); • 可在 background thread 呼叫 onNext,實作 async task。
  33. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 client.newCall(request).enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 });
  34. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 client.newCall(request).enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 });
  35. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 client.newCall(request).enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 });
  36. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 client.newCall(request).enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 });
  37. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 Call call = client.newCall(request);
 e.setCancellable(() -> call.cancel());
 call.enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 });
  38. Creating Sources OkHttpClient client = // … Request request =

    // … 
 Observable.create(e -> {
 Call call = client.newCall(request);
 e.setCancellable(() -> call.cancel());
 call.enqueue(new Callback() {
 @Override
 public void onResponse(Call call, Response response) throws IOException {
 e.onNext(response.body().toString());
 e.onComplete();
 }
 
 @Override
 public void onFailure(Call call, IOException ex) {
 e.onError(ex);
 }
 });
 }); 當 unsubscribe 時,取消 request
  39. Creating Sources Button button = // … Observable.create(e -> {


    e.setCancellable(() -> {
 button.setOnClickListener(null) }); button.setOnClickListener(v -> e.onNext(v));
 });
  40. Creating Sources Button button = // … Observable.create(e -> {


    e.setCancellable(() -> {
 button.setOnClickListener(null) }); button.setOnClickListener(v -> e.onNext(v));
 }); • 在 Android 中常會⽤用到,避免 memory leak。
  41. Observing Sources interface Subscriber<T> {
 void onNext(T t);
 void onComplete();


    void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable
  42. Observing Sources interface Subscriber<T> {
 void onNext(T t);
 void onComplete();


    void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable
  43. • 當開始監聽資料時,會⾺馬上呼叫 onSubscribe。 Observing Sources interface Subscriber<T> {
 void onNext(T

    t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable
  44. interface Subscriber<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable

    t);
 void onSubscribe(Subscription s);
 }
 
 interface Subscription {
 void cancel();
 void request(long r);
 } interface Observer<T> {
 void onNext(T t);
 void onComplete();
 void onError(Throwable t);
 void onSubscribe(Disposable d);
 }
 
 interface Disposable {
 void dispose();
 } Observable Flowable • 當開始監聽資料時,會⾺馬上呼叫 onSubscribe。 • Observer 可以利利⽤用 onSubscribe 提供的物件,呼叫 cancel 或是 backpressure。 Observing Sources
  45. Observing Sources Observable.just("Hello").subscribe(new Observer<String>() {
 @Override
 public void onNext(String value)

    {
 
 }
 @Override
 public void onComplete() {
 
 }
 @Override
 public void onError(Throwable e) {
 
 }
 @Override
 public void onSubscribe(Disposable d) {
 
 }
 });
  46. Observing Sources Observable.just("Hello").subscribe(new Observer<String>() {
 @Override
 public void onNext(String value)

    {
 
 }
 @Override
 public void onComplete() {
 
 }
 @Override
 public void onError(Throwable e) {
 
 }
 @Override
 public void onSubscribe(Disposable d) {
 
 }
 });
  47. Observing Sources Observable.just("Hello").subscribe(new DisposableObserver<String>() {
 @Override
 public void onNext(String value)

    {
 } 
 @Override
 public void onComplete() {
 }
 
 @Override
 public void onError(Throwable e) {
 }
 });
  48. Observing Sources DisposableObserver<String> observer = new DisposableObserver<String>() {
 @Override
 public

    void onNext(String value) {
 }
 
 @Override
 public void onComplete() {
 }
 
 @Override
 public void onError(Throwable e) {
 }
 }; 
 Observable.just(“Hello”).subscribe(observer);
  49. Observing Sources DisposableObserver<String> observer = new DisposableObserver<String>() {
 @Override
 public

    void onNext(String value) {
 }
 
 @Override
 public void onComplete() {
 }
 
 @Override
 public void onError(Throwable e) {
 }
 }; 
 Observable.just(“Hello”).subscribe(observer); // 如何 unsubscribe ?
  50. Observing Sources DisposableObserver<String> observer = new DisposableObserver<String>() {
 @Override
 public

    void onNext(String value) {
 }
 
 @Override
 public void onComplete() {
 }
 
 @Override
 public void onError(Throwable e) {
 }
 }; 
 Observable.just(“Hello").subscribe(observer); observer.dispose();
  51. Observing Sources Observable<String> o = Observable.just("Hello"); Disposable d = o.subscribeWith(new

    DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 }); d.dispose();
  52. Observing Sources Observable<String> o = Observable.just(“Hello"); Disposable d = o.subscribeWith(new

    DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 }); d.dispose(); New Method: subscribeWith
  53. Observing Sources Observable<String> o = Observable.just(“Hello"); Disposable d = o.subscribeWith(new

    DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 }); d.dispose(); 回傳 Disposable
  54. Observing Sources Observable<String> o = Observable.just(“Hello"); Disposable d = o.subscribeWith(new

    DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 }); d.dispose();
  55. Observing Sources Observable<String> o = Observable.just(“Hello”); CompositeDisposable disposables = new

    CompositeDisposable(); disposables.add(o.subscribeWith(new DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 })); disposables.dispose();
  56. Observing Sources Observable<String> o = Observable.just(“Hello”); CompositeDisposable disposables = new

    CompositeDisposable(); disposables.add(o.subscribeWith(new DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 })); disposables.dispose();
  57. Observing Sources Observable<String> o = Observable.just(“Hello”); CompositeDisposable disposables = new

    CompositeDisposable(); disposables.add(o.subscribeWith(new DisposableObserver<String>() {
 @Override
 public void onNext(String value) { … }
 @Override
 public void onComplete() { … }
 @Override
 public void onError(Throwable e) { … }
 })); disposables.dispose();
  58. Observing Sources Observable<String> o = Observable.just("Hello");
 o.subscribeWith(new DisposableObserver<String>() { …

    }); Maybe<String> m = Maybe.just("Hello");
 m.subscribeWith(new DisposableObserver<String>() { … }); Single<String> s = Single.just("Hello");
 s.subscribeWith(new DisposableObserver<String>() { … }); Completable<String> c = Completable.completed(“Hello");
 c.subscribeWith(new DisposableObserver<String>() { … });
  59. Observing Sources Flowable<String> f = Flowable.just("Hello");
 f.subscribeWith(new DisposableSubscriber<String>() { …

    }); Observable<String> o = Observable.just("Hello");
 o.subscribeWith(new DisposableObserver<String>() { … }); Maybe<String> m = Maybe.just("Hello");
 m.subscribeWith(new DisposableObserver<String>() { … }); Single<String> s = Single.just("Hello");
 s.subscribeWith(new DisposableObserver<String>() { … }); Completable<String> c = Completable.completed(“Hello");
 c.subscribeWith(new DisposableObserver<String>() { … }); • Flowable 也提供類似的 Disposable。
  60. RxJava • A set of classes for representing sources of

    data. • A set of classes for listening to data sources. • A set of methods for modifying and composing the data.
  61. Operators String normal = "hello";
 String upperCase = normal.toUpperCase(); Observable<String>

    normal = Observable.just("hello");
 Observable<String> upperCase = normal.map(s -> s.toUpperCase());
  62. Operators Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws Exception

    {
 // ...
 return result;
 }
 }).subscribeOn(Schedulers.computation());
  63. Operators Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws Exception

    {
 // ...
 return result;
 }
 }).subscribeOn(Schedulers.computation()); • 在 background thread 做事情。 bg thread
  64. Operators Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws Exception

    {
 // ...
 return result;
 }
 }).subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); bg thread
  65. Operators Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws Exception

    {
 // ...
 return result;
 }
 }).subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); • Observer 在 main thread 做事情。 bg thread main thread
  66. Operators Observable.fromCallable(new Callable<String>() {
 @Override
 public String call() throws Exception

    {
 // ...
 return result;
 }
 }).subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); • Observer 在 main thread 做事情。 main thread bg thread
  67. Operators Observable.fromCallable(() -> {
 return result;
 }).subscribeOn(Schedulers.computation()) .map() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())

    .subscribe(observer); • 在同⼀一個 Source 中,subscribeOn 是先搶先贏。會依照第⼀一次 subscribeOn 的 schedulers 為準。 computation thread computation thread
  68. • RxJava 1.0 Operator: • first() - return Observable •

    takeFirst() - return Observable Operators
  69. • RxJava 1.0 Operator: • first() - return Observable •

    takeFirst() - return Observable • first():找出第⼀一個 element,如果沒有任何 item 則會回傳 error。 • takeFirst():找出第⼀一個 element,就算沒有拿到任何 item,也會 回傳 succeed。 Operators
  70. • RxJava 2.0 Operator: • firstOrError() - return Single •

    firstElement() - return Maybe Operators
  71. • RxJava 2.0 Operator: • firstOrError() - return Single •

    firstElement() - return Maybe • BJ4 Operators
  72. Being Reactive apiService.listToilets() .subscribeOn(Schedulers.io())
 .toSortedList(this::compareDistance) .observerOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<String>() {
 @Override


    public void onNext(String value) {
 // Show toilets.
 }
 @Override
 public void onComplete() {/* ignore */}
 @Override
 public void onError(Throwable e) {/* show error */}
 });
  73. Being Reactive disposables.add(apiService.listToilets() .subscribeOn(Schedulers.io())
 .toSortedList(this::compareDistance) .observerOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<String>() {
 @Override


    public void onNext(String value) {
 // Show toilets.
 }
 @Override
 public void onComplete() {/* ignore */}
 @Override
 public void onError(Throwable e) {/* show error */}
 }));
  74. Being Reactive // onCreate disposables.add(apiService.listToilets() .subscribeOn(Schedulers.io())
 .toSortedList(this::compareDistance) .observerOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<String>()

    {
 @Override
 public void onNext(String value) {
 // Show toilets.
 }
 @Override
 public void onComplete() {/* ignore */}
 @Override
 public void onError(Throwable e) {/* show error */}
 })); // onDestory disposables.dispose();
  75. Reference • Exploring RxJava 2 for Android
 https://speakerdeck.com/jakewharton/exploring-rxjava-2-for- android-gotocph-october-2016 •

    RxJava Official wiki
 https://github.com/ReactiveX/RxJava/wiki/What's-different- in-2.0 • Reactive-streams API
 http://www.reactive-streams.org/reactive-streams-1.0.0- javadoc/