Lock in $30 Savings on PRO—Offer Ends Soon! ⏳
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Rx Java
Search
berglind
October 11, 2015
Technology
1
310
Rx Java
Intro to Rx Java and example how it's used in the QuizUp on Android
berglind
October 11, 2015
Tweet
Share
More Decks by berglind
See All by berglind
Cleaner Code
berglind
0
120
The 65K Dex Limit
berglind
0
91
The Android QuizUp Saga
berglind
1
220
Other Decks in Technology
See All in Technology
AI駆動開発ライフサイクル(AI-DLC)の始め方
ryansbcho79
0
190
フィッシュボウルのやり方 / How to do a fishbowl
pauli
2
390
Agent Skillsがハーネスの垣根を超える日
gotalab555
6
4.4k
オープンソースKeycloakのMCP認可サーバの仕様の対応状況 / 20251219 OpenID BizDay #18 LT Keycloak
oidfj
0
180
なぜ あなたはそんなに re:Invent に行くのか?
miu_crescent
PRO
0
210
投資戦略を量産せよ 2 - マケデコセミナー(2025/12/26)
gamella
0
420
SREが取り組むデプロイ高速化 ─ Docker Buildを最適化した話
capytan
0
150
AWS運用を効率化する!AWS Organizationsを軸にした一元管理の実践/nikkei-tech-talk-202512
nikkei_engineer_recruiting
0
170
_第4回__AIxIoTビジネス共創ラボ紹介資料_20251203.pdf
iotcomjpadmin
0
140
Oracle Database@AWS:サービス概要のご紹介
oracle4engineer
PRO
1
410
株式会社ビザスク_AI__Engineering_Summit_Tokyo_2025_登壇資料.pdf
eikohashiba
1
120
100以上の新規コネクタ提供を可能にしたアーキテクチャ
ooyukioo
0
260
Featured
See All Featured
Between Models and Reality
mayunak
0
150
ピンチをチャンスに:未来をつくるプロダクトロードマップ #pmconf2020
aki_iinuma
128
55k
A Guide to Academic Writing Using Generative AI - A Workshop
ks91
PRO
0
170
Gemini Prompt Engineering: Practical Techniques for Tangible AI Outcomes
mfonobong
2
230
How To Stay Up To Date on Web Technology
chriscoyier
791
250k
The SEO Collaboration Effect
kristinabergwall1
0
310
The Myth of the Modular Monolith - Day 2 Keynote - Rails World 2024
eileencodes
26
3.3k
VelocityConf: Rendering Performance Case Studies
addyosmani
333
24k
First, design no harm
axbom
PRO
1
1.1k
The untapped power of vector embeddings
frankvandijk
1
1.5k
Organizational Design Perspectives: An Ontology of Organizational Design Elements
kimpetersen
PRO
0
45
Building Adaptive Systems
keathley
44
2.9k
Transcript
RxJava Siggi Jónsson -
[email protected]
• RxJava is a Java VM implementation of Reactive Extensions:
a library for composing asynchronous and event-based programs by using observable sequences. What is RxJava?
• Java: RxJava • JavaScript: RxJS • C#: Rx.NET •
C#(Unity): UniRx • Scala: RxScala • Clojure: RxClojure • C++: RxCpp Platforms • Ruby: Rx.rb • Python: RxPY • Groovy: RxGroovy • JRuby: RxJRuby • Kotlin: RxKotlin • Swift: RxSwift • ObjC: ReactiveCocoa*
• Java: RxJava • JavaScript: RxJS • C#: Rx.NET •
C#(Unity): UniRx • Scala: RxScala • Clojure: RxClojure • C++: RxCpp Platforms • Ruby: Rx.rb • Python: RxPY • Groovy: RxGroovy • JRuby: RxJRuby • Kotlin: RxKotlin • Swift: RxSwift • ObjC: ReactiveCocoa*
• An awesome tool to work with asynchronous streams of
data What is RxJava?
• Button clicks • Text edits • Network requests •
Push notifications • …. What are streams?
What is not a stream?
The Past Life before RxJava
Asynchronous Operations • AsyncTasks • Callbacks • Handlers • Threads
• Runnables • Executors • EventBus
AsyncTask public class SomeTask extends AsyncTask<String, Void, SomeResult> { private
WeakReference<Callback> callback; public SomeTask(Callback callback) { callback = new WeakReference<Callback>(callback); } @Override protected SomeResult doInBackground(String... params) { try { //do stuff } catch (Exception e) { //oh god, what now? return null? } return null; } @Override protected void onPostExecute(SomeResult result) { if (callback.get() != null) { callback.onResult(result); } } }
Callback Hell button.setOnClickListener(new View.OnClickListener() { public void onClick(View v) {
loadPlayer(playerId, new Callback() { public void playerLoaded(Player p1) { loadPlayer(p1.bestFriendId, new Callback() { public void playerLoaded(Player p2) { loadImage(p2.profileUrl, new ImageCallback() { public void imageLoaded(Bitmap bitmap) { imageView.setImageBitmap(bitmap); } }); } }); } }); } });
• Did the player reach the Top 10 in a
topic in the first month after he joined or the first month after the topic was created (which ever is first)? Complexity grows fast
RxJava to the rescue
• Observable -> produces events • Observer/Subscriber -> receives events
• Operators • Schedulers -> Multithreading RxJava Concepts
Observable Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String>
subscriber) { try { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } });
Observable Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String>
subscriber) { try { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } });
Observable Observable.just("Hello, World!");
Observer interface Observer<T> { void onNext(T t); void onCompleted();
void onError(Throwable e); }
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together api.getHello().subscribe(new Observer<String>() { @Override public void onCompleted()
{ System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together api.getHello().subscribe(new Observer<String>() { @Override public void onCompleted()
{ System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Operators The Toolbox
None
None
None
None
None
None
None
• Did the player reach the Top 10 in a
topic in the first month after he joined or the first month after the topic was created (which ever is first)? Complexity grows fast
Easy Observable.zip( playerService.getPlayer(“<player id>”), topicsService.getTopic("<topic id>”), player, topic -> {
if (player.created.before(topic.created)) return player.created; else return topic.created; } ).flatMap(date -> { return topicsService.getLeaderboard("<topic id>”, date); }).flatMap(leaderboard -> return Observable.from(leaderboard.top); }).exists(player -> { return player.id.equals("<player_id>"); });
Easy
Schedulers RxAndroid
Schedulers
Schedulers • Skipping Frames. The application may be doing too
much work on its main thread. • CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. • NetworkOnMainThreadException • :(
Schedulers playerService.getFollowers() .subscribe(response -> { adapter.addAll(response.players); });
Schedulers playerService.getFollowers() .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { adapter.addAll(response.players); });
Schedulers playerService.getFollowers() .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { adapter.addAll(response.players); });
QuizUp Gameplay
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds
Thank You q.is/jobs