onNext(Article article) { // this can be called multiple times } @Override public void onError(Throwable e) { // no more events, and automatically unsubscribed } @Override public void onCompleted() { // no more events, and automatically unsubscribed } });
• Apply where it makes sense ◦ Domain layer / business logic ◦ REST calls (Retrofit) ◦ When coping with life cycle ◦ More specialised event bus • UI, maybe, but save it for later and/or specific cases.
supposed to be in stock 2. Group all articles by the article id, derived from the rfid tag 3. Count all articles in the group 4. Emit one ArticleQuantity object per group
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable flatMap(group -> // each item in the group is converted to ArticleQuantity group.map(articleInfo -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable flatMap(group -> // each item in the group is converted to ArticleQuantity group.map(article -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable flatMap(group -> // each item in the group is converted to ArticleQuantity group.map(article -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable // group type: Observable<GroupObservable<String,Article>> flatMap(group -> // GroupObservable<String, Article> // each item in the group is converted to ArticleQuantity group.map(article -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable // group type: Observable<GroupObservable<String,Article>> flatMap(group -> // GroupObservable<String, Article> // each item in the group is converted to ArticleQuantity group.map(article -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
observable.filter(article -> article.isInStock). groupBy(this::articleIdFromTag). // convert each group to a new observable flatMap(group -> // each item in the group is converted to ArticleQuantity group.map(articleInfo -> new ArticleQuantity(group.getKey(), 1)). // then these items are reduced to a single item per group reduce((q1, q2) -> new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity)) ); }
once!) • observeOn controls the thread for events • Operators with a default scheduler, like timer() observable.map(...).subscribeOn(io()).observeOn(mainThread()).flatMap(...);
Don’t subscribe() in an operator, but flatMap createObservable().map(value -> { // this is not the place to subscribe or call // other functions. createAnotherObservable().subscribe( … ); return value; });
Apply schedulers late as possible for better API // this gets boring and cumbersome retrieveArticle(). observeOn(Schedulers.io()). subscribeOn(AndroidSchedulers.mainThread()). subscribe(article -> {});
Apply schedulers late as possible for better API /** * Retrieve a single article. Will return the result on the main thread * @return the observable */ public Observable<Article> retrieveArticle() { return articleHttpCall(). subscribeOn(Schedulers.io()). observeOn(AndroidSchedulers.mainThread()); } // does the right thing retrieveArticle().subscribe(article -> { … }, … );