Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Rx Java
Search
Sponsored
·
Ship Features Fearlessly
Turn features on and off without deploys. Used by thousands of Ruby developers.
→
berglind
October 11, 2015
Technology
1
310
Rx Java
Intro to Rx Java and example how it's used in the QuizUp on Android
berglind
October 11, 2015
Tweet
Share
More Decks by berglind
See All by berglind
Cleaner Code
berglind
0
120
The 65K Dex Limit
berglind
0
93
The Android QuizUp Saga
berglind
1
220
Other Decks in Technology
See All in Technology
SREじゃなかった僕らがenablingを通じて「SRE実践者」になるまでのリアル / SRE Kaigi 2026
aeonpeople
6
2k
クレジットカード決済基盤を支えるSRE - 厳格な監査とSRE運用の両立 (SRE Kaigi 2026)
capytan
6
2.4k
M&A 後の統合をどう進めるか ─ ナレッジワーク × Poetics が実践した組織とシステムの融合
kworkdev
PRO
1
380
Introduction to Sansan, inc / Sansan Global Development Center, Inc.
sansan33
PRO
0
3k
Oracle Cloud Observability and Management Platform - OCI 運用監視サービス概要 -
oracle4engineer
PRO
2
14k
セキュリティ はじめの一歩
nikinusu
0
1.5k
Amazon Bedrock AgentCore 認証・認可入門
hironobuiga
2
500
予期せぬコストの急増を障害のように扱う――「コスト版ポストモーテム」の導入とその後の改善
muziyoshiz
1
1.5k
名刺メーカーDevグループ 紹介資料
sansan33
PRO
0
1k
ClickHouseはどのように大規模データを活用したAIエージェントを全社展開しているのか
mikimatsumoto
0
190
生成AI時代にこそ求められるSRE / SRE for Gen AI era
ymotongpoo
5
2.5k
小さく始めるBCP ― 多プロダクト環境で始める最初の一歩
kekke_n
1
320
Featured
See All Featured
End of SEO as We Know It (SMX Advanced Version)
ipullrank
3
3.9k
The Curse of the Amulet
leimatthew05
1
8.2k
How to build an LLM SEO readiness audit: a practical framework
nmsamuel
1
640
Why Mistakes Are the Best Teachers: Turning Failure into a Pathway for Growth
auna
0
50
Un-Boring Meetings
codingconduct
0
200
Visualizing Your Data: Incorporating Mongo into Loggly Infrastructure
mongodb
49
9.8k
How to make the Groovebox
asonas
2
1.9k
Self-Hosted WebAssembly Runtime for Runtime-Neutral Checkpoint/Restore in Edge–Cloud Continuum
chikuwait
0
320
Jamie Indigo - Trashchat’s Guide to Black Boxes: Technical SEO Tactics for LLMs
techseoconnect
PRO
0
55
We Are The Robots
honzajavorek
0
160
Introduction to Domain-Driven Design and Collaborative software design
baasie
1
580
Git: the NoSQL Database
bkeepers
PRO
432
66k
Transcript
RxJava Siggi Jónsson -
[email protected]
• RxJava is a Java VM implementation of Reactive Extensions:
a library for composing asynchronous and event-based programs by using observable sequences. What is RxJava?
• Java: RxJava • JavaScript: RxJS • C#: Rx.NET •
C#(Unity): UniRx • Scala: RxScala • Clojure: RxClojure • C++: RxCpp Platforms • Ruby: Rx.rb • Python: RxPY • Groovy: RxGroovy • JRuby: RxJRuby • Kotlin: RxKotlin • Swift: RxSwift • ObjC: ReactiveCocoa*
• Java: RxJava • JavaScript: RxJS • C#: Rx.NET •
C#(Unity): UniRx • Scala: RxScala • Clojure: RxClojure • C++: RxCpp Platforms • Ruby: Rx.rb • Python: RxPY • Groovy: RxGroovy • JRuby: RxJRuby • Kotlin: RxKotlin • Swift: RxSwift • ObjC: ReactiveCocoa*
• An awesome tool to work with asynchronous streams of
data What is RxJava?
• Button clicks • Text edits • Network requests •
Push notifications • …. What are streams?
What is not a stream?
The Past Life before RxJava
Asynchronous Operations • AsyncTasks • Callbacks • Handlers • Threads
• Runnables • Executors • EventBus
AsyncTask public class SomeTask extends AsyncTask<String, Void, SomeResult> { private
WeakReference<Callback> callback; public SomeTask(Callback callback) { callback = new WeakReference<Callback>(callback); } @Override protected SomeResult doInBackground(String... params) { try { //do stuff } catch (Exception e) { //oh god, what now? return null? } return null; } @Override protected void onPostExecute(SomeResult result) { if (callback.get() != null) { callback.onResult(result); } } }
Callback Hell button.setOnClickListener(new View.OnClickListener() { public void onClick(View v) {
loadPlayer(playerId, new Callback() { public void playerLoaded(Player p1) { loadPlayer(p1.bestFriendId, new Callback() { public void playerLoaded(Player p2) { loadImage(p2.profileUrl, new ImageCallback() { public void imageLoaded(Bitmap bitmap) { imageView.setImageBitmap(bitmap); } }); } }); } }); } });
• Did the player reach the Top 10 in a
topic in the first month after he joined or the first month after the topic was created (which ever is first)? Complexity grows fast
RxJava to the rescue
• Observable -> produces events • Observer/Subscriber -> receives events
• Operators • Schedulers -> Multithreading RxJava Concepts
Observable Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String>
subscriber) { try { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } });
Observable Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String>
subscriber) { try { subscriber.onNext("Hello, World!"); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } });
Observable Observable.just("Hello, World!");
Observer interface Observer<T> { void onNext(T t); void onCompleted();
void onError(Throwable e); }
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together Observable.just("Hello, World").subscribe(new Observer<String>() { @Override public void
onCompleted() { System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together api.getHello().subscribe(new Observer<String>() { @Override public void onCompleted()
{ System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Putting it together api.getHello().subscribe(new Observer<String>() { @Override public void onCompleted()
{ System.out.println("Done!"); } @Override public void onError(Throwable e) { System.err.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
Operators The Toolbox
None
None
None
None
None
None
None
• Did the player reach the Top 10 in a
topic in the first month after he joined or the first month after the topic was created (which ever is first)? Complexity grows fast
Easy Observable.zip( playerService.getPlayer(“<player id>”), topicsService.getTopic("<topic id>”), player, topic -> {
if (player.created.before(topic.created)) return player.created; else return topic.created; } ).flatMap(date -> { return topicsService.getLeaderboard("<topic id>”, date); }).flatMap(leaderboard -> return Observable.from(leaderboard.top); }).exists(player -> { return player.id.equals("<player_id>"); });
Easy
Schedulers RxAndroid
Schedulers
Schedulers • Skipping Frames. The application may be doing too
much work on its main thread. • CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. • NetworkOnMainThreadException • :(
Schedulers playerService.getFollowers() .subscribe(response -> { adapter.addAll(response.players); });
Schedulers playerService.getFollowers() .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { adapter.addAll(response.players); });
Schedulers playerService.getFollowers() .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { adapter.addAll(response.players); });
QuizUp Gameplay
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
QuizUp Round
QuizUp Round Observable<MatchEvent> events = Observable.concat( roundStarted(round), showQuestion(round, question).delay(ROUND_START_DELAY), showAnswers(round,
question).delay(showAnswersDelay), answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY), Observable.merge( playerAnswer.takeUntil(timer), opponentAnswer.takeUntil(timer), timer.takeUntil(bothPlayersHaveAnswered) ), answerPeriodEnd(round), playerDidNotAnswer(round, question, playerDidAnswer), verifyOpponentIsStillHere(opponentDidAnswer), showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY), Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY) );
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
return round(round); });
All Rounds
Thank You q.is/jobs