In this short journey we will look at what RxJava offers for Android development, and at how to apply it to real-world mobile projects through examples and use cases.
ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
try {
    // get f3 with dependent result from f1
    Future<String> f1 = executor.submit(new CallToRemoteServiceA());
    Future<String> f3 = executor.submit(new CallToRemoteServiceC(f1.get()));

    /* The work below can not proceed until f1.get() completes even though there is no dependency */

    // also get f4/f5 after dependency f2 completes
    Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
    Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(f2.get()));
    Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(f2.get()));

    System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
} finally {
    executor.shutdownNow();
}
ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
try {
    List<Future<?>> futures = new ArrayList<Future<?>>();

    // kick off several async tasks
    futures.add(executor.submit(new CallToRemoteServiceA()));
    futures.add(executor.submit(new CallToRemoteServiceB()));
    futures.add(executor.submit(new CallToRemoteServiceC("A")));
    futures.add(executor.submit(new CallToRemoteServiceC("B")));
    futures.add(executor.submit(new CallToRemoteServiceD(1)));
    futures.add(executor.submit(new CallToRemoteServiceE(2)));
    futures.add(executor.submit(new CallToRemoteServiceE(3)));

    // as each completes do further work
    for (Future<?> f : futures) {
        /* this blocks so even if other futures in the list complete earlier they will wait until this one is done */
        doMoreWork(f.get());
    }
} finally {
    executor.shutdownNow();
}
// ... f1
executor.execute(new CallToRemoteServiceA(new Callback<String>() {
    @Override
    public void call(String f1) {
        executor.execute(new CallToRemoteServiceC(new Callback<String>() {
            @Override
            public void call(String f3) {
                // we have f1 and f3 now need to compose with others
                System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                // set to thread-safe variable accessible by external scope
                f3Value.set(f3);
                latch.countDown();
            }
        }, f1));
    }
}));
...
If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
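The idea of pinning each step of a pipeline to a chosen thread pool can be sketched without any library. The snippet below is only a plain-Java analogy, not RxJava code: it swaps in CompletableFuture for the Observable chain, and the class name SchedulerSketch and the thread names "io-thread" and "ui-thread" are hypothetical. In RxJava itself the corresponding operators are subscribeOn() and observeOn().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy (assumption: CompletableFuture stands in for an
// Observable chain; the executors stand in for Schedulers).
final class SchedulerSketch {

    // Produces a value on one pool, transforms it on another, and returns
    // the names of the threads that ran each step.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // the "source" work runs on the io pool
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // the downstream step is explicitly moved to the ui pool
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }
}
```

Calling SchedulerSketch.run() returns the two thread names in order, which shows that the producing step and the consuming step ran on different threads without any explicit synchronization in the calling code.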
Error handling Operators
... if it encounters an error.
onErrorReturn(): Instructs an Observable to emit a particular item when it encounters an error.
onExceptionResumeNext(): Instructs an Observable to continue emitting items after it encounters an exception.
retry(): If a source Observable emits an error, resubscribe to it in the hopes that it will complete without error.
retryWhen(): If a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source.
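The behaviour described for onErrorReturn() and retry() can be modelled in a few lines of plain Java. This is a rough sketch of the semantics only, not RxJava's implementation: a Callable stands in for the source Observable, "resubscribing" is modelled as calling it again, and the names ErrorHandlingSketch, flaky, and demo are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Simplified model (assumption: a Callable stands in for a source that
// emits one item or fails).
final class ErrorHandlingSketch {

    // Like onErrorReturn(): if the source fails, emit a fallback item instead.
    static <T> Callable<T> onErrorReturn(Callable<T> source, Function<Throwable, T> fallback) {
        return () -> {
            try {
                return source.call();
            } catch (Exception e) {
                return fallback.apply(e);
            }
        };
    }

    // Like retry(n): on failure, run the source again, up to maxRetries extra attempts.
    static <T> Callable<T> retry(Callable<T> source, int maxRetries) {
        return () -> {
            for (int attempt = 0; ; attempt++) {
                try {
                    return source.call();
                } catch (Exception e) {
                    if (attempt >= maxRetries) {
                        throw e; // retries exhausted: propagate the error downstream
                    }
                }
            }
        };
    }

    // A source that fails on its first two calls and succeeds on the third.
    static String demo(int maxRetries) {
        AtomicInteger calls = new AtomicInteger();
        Callable<String> flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("boom");
            }
            return "ok after " + calls.get() + " calls";
        };
        try {
            return onErrorReturn(retry(flaky, maxRetries), e -> "fallback").call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

With two retries the third call succeeds, so demo(2) yields "ok after 3 calls". With a single retry the error survives retry() and is replaced by the fallback item, so demo(1) yields "fallback".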
Key ideas behind RxJava
... and Subscriber are independent of the transformational steps in between them.
#3: Operators let you do anything to the stream of data.
http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
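To make these ideas concrete, here is a deliberately tiny model of a source, a subscriber, and operators sitting between them. It is a hypothetical teaching sketch, not RxJava's API or implementation: MiniObservable and its methods are invented names, and error and completion signals are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for an Observable: it only knows how to push
// items to whoever subscribes.
final class MiniObservable<T> {

    interface Source<T> {
        void emitTo(Consumer<T> subscriber);
    }

    private final Source<T> source;

    private MiniObservable(Source<T> source) {
        this.source = source;
    }

    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>(subscriber -> {
            for (T item : items) {
                subscriber.accept(item);
            }
        });
    }

    // An operator wraps the upstream source and returns a new observable;
    // neither the original source nor the final subscriber changes.
    <R> MiniObservable<R> map(Function<T, R> fn) {
        return new MiniObservable<R>(subscriber ->
                source.emitTo(item -> subscriber.accept(fn.apply(item))));
    }

    MiniObservable<T> filter(Predicate<T> keep) {
        return new MiniObservable<T>(subscriber ->
                source.emitTo(item -> {
                    if (keep.test(item)) {
                        subscriber.accept(item);
                    }
                }));
    }

    void subscribe(Consumer<T> subscriber) {
        source.emitTo(subscriber);
    }

    // Usage: the subscriber just collects strings and knows nothing
    // about the filtering and mapping steps upstream.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        MiniObservable.just(1, 2, 3, 4)
                .filter(n -> n % 2 == 0)
                .map(n -> "item " + n)
                .subscribe(received::add);
        return received;
    }
}
```

Here demo() collects "item 2" and "item 4". Swapping, adding, or removing the filter and map steps requires no change to just() or to the subscriber, which is the independence the second key idea refers to.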
/**
 * ... on a {@link rx.Scheduler} which will execute actions on
 * the Android UI thread
 */
@Singleton
public class UIThread implements PostExecutionThread {

    @Inject
    public UIThread() {}

    @Override
    public Scheduler getScheduler() {
        return AndroidSchedulers.mainThread();
    }
}
/**
 * ... {@link UseCase} that represents a
 * use case for retrieving a collection of all {@link User}.
 */
public class GetUserListUseCase extends UseCase {

    private final UserRepository userRepository;

    @Inject
    public GetUserListUseCase(UserRepository userRepository,
            ThreadExecutor threadExecutor,
            PostExecutionThread postExecutionThread) {
        super(threadExecutor, postExecutionThread);
        this.userRepository = userRepository;
    }

    @Override
    public Observable buildUseCaseObservable() {
        return this.userRepository.getUsers();
    }
}
How do I start with RxJava? Rx at data level
No need to deal with threading and synchronization.
#3: Very simple to wrap an HTTP connection in an Observable.
How do I start with RxJava? Rx at view level
Observable input = Observable.FromEventPattern(textView, "TextChanged")
    .Select(_ => textbox.Text)
    .Throttle(TimeSpan.FromSeconds(0.5))
    .DistinctUntilChanged();
http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
Reactive Programming in the Netflix API with RxJava: http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
Rx for .NET and RxJava for Android: http://futurice.com/blog/tech-pick-of-the-week-rx-for-net-and-rxjava-for-android
https://github.com/android10/Android-CleanArchitecture
Official Documentation: https://github.com/ReactiveX/RxJava/wiki
https://github.com/android10/Android-ReactiveProgramming