Rx Java
berglind
October 11, 2015
An intro to RxJava and an example of how it is used in QuizUp on Android.
Transcript
RxJava
Siggi Jónsson - [email protected]
What is RxJava?
• RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
Platforms
• Java: RxJava
• JavaScript: RxJS
• C#: Rx.NET
• C# (Unity): UniRx
• Scala: RxScala
• Clojure: RxClojure
• C++: RxCpp
• Ruby: Rx.rb
• Python: RxPY
• Groovy: RxGroovy
• JRuby: RxJRuby
• Kotlin: RxKotlin
• Swift: RxSwift
• ObjC: ReactiveCocoa*
What is RxJava?
• An awesome tool to work with asynchronous streams of data
What are streams?
• Button clicks
• Text edits
• Network requests
• Push notifications
• …
What is not a stream?
The Past: Life before RxJava
Asynchronous Operations
• AsyncTasks
• Callbacks
• Handlers
• Threads
• Runnables
• Executors
• EventBus
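AsyncTask
import android.os.AsyncTask;
import java.lang.ref.WeakReference;

public class SomeTask extends AsyncTask<String, Void, SomeResult> {
    private final WeakReference<Callback> callback;

    public SomeTask(Callback callback) {
        // Assign the field; without "this." the parameter would be assigned to itself.
        this.callback = new WeakReference<>(callback);
    }

    @Override
    protected SomeResult doInBackground(String... params) {
        try {
            // do stuff
        } catch (Exception e) {
            // oh god, what now? return null?
        }
        return null;
    }

    @Override
    protected void onPostExecute(SomeResult result) {
        // Dereference once so the callback cannot be collected between the check and the call.
        Callback cb = callback.get();
        if (cb != null) {
            cb.onResult(result);
        }
    }
}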
Callback Hell
button.setOnClickListener(new View.OnClickListener() {
    public void onClick(View v) {
        loadPlayer(playerId, new Callback() {
            public void playerLoaded(Player p1) {
                loadPlayer(p1.bestFriendId, new Callback() {
                    public void playerLoaded(Player p2) {
                        loadImage(p2.profileUrl, new ImageCallback() {
                            public void imageLoaded(Bitmap bitmap) {
                                imageView.setImageBitmap(bitmap);
                            }
                        });
                    }
                });
            }
        });
    }
});
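For contrast, the same three-step chain (player, best friend, profile image) can be written without nesting once each step returns a composable value instead of taking a callback. The sketch below is not RxJava code: it uses the JDK's `CompletableFuture` as a stand-in for that composition style, and `Player`, `loadPlayer`, and `loadImage` are hypothetical in-memory stubs named after the calls on the slide.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class FlatChainSketch {
    // Hypothetical stand-in for the Player type used on the slide.
    static final class Player {
        final String bestFriendId;
        final String profileUrl;

        Player(String bestFriendId, String profileUrl) {
            this.bestFriendId = bestFriendId;
            this.profileUrl = profileUrl;
        }
    }

    // In-memory "backend" so the sketch runs without a network.
    private static final Map<String, Player> PLAYERS = Map.of(
            "p1", new Player("p2", "https://example.invalid/p1.png"),
            "p2", new Player("p1", "https://example.invalid/p2.png"));

    static CompletableFuture<Player> loadPlayer(String id) {
        return CompletableFuture.supplyAsync(() -> PLAYERS.get(id));
    }

    // Pretends to download an image; returns a description instead of a Bitmap.
    static CompletableFuture<String> loadImage(String url) {
        return CompletableFuture.supplyAsync(() -> "image from " + url);
    }

    // Each step starts when the previous one completes; no nesting is needed.
    static CompletableFuture<String> bestFriendImage(String playerId) {
        return loadPlayer(playerId)
                .thenCompose(p1 -> loadPlayer(p1.bestFriendId))
                .thenCompose(p2 -> loadImage(p2.profileUrl));
    }

    public static void main(String[] args) {
        System.out.println(bestFriendImage("p1").join());
    }
}
```

The chain reads top to bottom in the order the work happens, which is the property the later RxJava slides rely on.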
Complexity grows fast
• Did the player reach the Top 10 in a topic in the first month after they joined or the first month after the topic was created (whichever comes first)?
RxJava to the rescue
RxJava Concepts
• Observable -> produces events
• Observer/Subscriber -> receives events
• Operators
• Schedulers -> Multithreading
Observable
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            subscriber.onNext("Hello, World!");
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
});
Observable
Observable.just("Hello, World!");
Observer
interface Observer<T> {
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}
Putting it together
Observable.just("Hello, World").subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }

    @Override
    public void onError(Throwable e) {
        System.err.println(e.getMessage());
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});
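To make the order of the callbacks concrete, here is a minimal hand-rolled sketch of the contract: `onNext` for each item, then `onCompleted` once, or `onError` instead if the source fails. `MiniObservable` and `callbacksOf` are hypothetical names invented for this illustration; this is not how RxJava is implemented, and it leaves out unsubscription, operators, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ObserverContractSketch {
    // The same three callbacks as the Observer interface on the slide.
    interface Observer<T> {
        void onNext(T t);
        void onCompleted();
        void onError(Throwable e);
    }

    // Hypothetical miniature of an Observable: it only stores what to do on subscribe.
    static final class MiniObservable<T> {
        private final Consumer<Observer<? super T>> onSubscribe;

        private MiniObservable(Consumer<Observer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> MiniObservable<T> create(Consumer<Observer<? super T>> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        // Emits one value, then completes.
        static <T> MiniObservable<T> just(T value) {
            return create(observer -> {
                observer.onNext(value);
                observer.onCompleted();
            });
        }

        // Nothing runs until someone subscribes; failures are routed to onError.
        void subscribe(Observer<? super T> observer) {
            try {
                onSubscribe.accept(observer);
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    }

    // Records every callback as text so the ordering is visible.
    static <T> List<String> callbacksOf(MiniObservable<T> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T t) { log.add("next:" + t); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable e) { log.add("error:" + e.getMessage()); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(callbacksOf(MiniObservable.just("Hello, World")));
        System.out.println(callbacksOf(MiniObservable.<String>create(o -> {
            throw new IllegalStateException("boom");
        })));
    }
}
```

The first call logs a value followed by completion; the second logs only an error, because a failing source never reaches `onCompleted`.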
Putting it together
api.getHello().subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }

    @Override
    public void onError(Throwable e) {
        System.err.println(e.getMessage());
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});
Operators: The Toolbox
Complexity grows fast
• Did the player reach the Top 10 in a topic in the first month after they joined or the first month after the topic was created (whichever comes first)?
Easy
Observable.zip(
    playerService.getPlayer("<player id>"),
    topicsService.getTopic("<topic id>"),
    (player, topic) -> {
        if (player.created.before(topic.created)) return player.created;
        else return topic.created;
    }
).flatMap(date -> {
    return topicsService.getLeaderboard("<topic id>", date);
}).flatMap(leaderboard -> {
    return Observable.from(leaderboard.top);
}).exists(player -> {
    return player.id.equals("<player id>");
});
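The data flow of that pipeline (combine two results, feed the outcome into another asynchronous call, then test the final list) can be tried out with the JDK alone. This sketch is an analogy rather than RxJava: `thenCombine` plays the role of `zip`, `thenCompose` the role of `flatMap`, and `List.contains` the role of `exists`. The three stub methods and their fixed data are assumptions made up for the example.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class TopTenSketch {
    // Hypothetical stubs: each "service call" completes immediately with fixed data.
    static CompletableFuture<LocalDate> playerCreated() {
        return CompletableFuture.completedFuture(LocalDate.of(2015, 3, 1));
    }

    static CompletableFuture<LocalDate> topicCreated() {
        return CompletableFuture.completedFuture(LocalDate.of(2015, 1, 15));
    }

    // Pretend leaderboard for the month starting at the given date (the date is ignored here).
    static CompletableFuture<List<String>> leaderboardTop(LocalDate from) {
        return CompletableFuture.completedFuture(List.of("alice", "bob", "carol"));
    }

    static CompletableFuture<Boolean> reachedTop(String playerId) {
        return playerCreated()
                // "zip": wait for both values, then keep the earlier date
                .thenCombine(topicCreated(), (p, t) -> p.isBefore(t) ? p : t)
                // "flatMap": use that date to start the next asynchronous call
                .thenCompose(TopTenSketch::leaderboardTop)
                // "exists": is the player anywhere in the result?
                .thenApply(top -> top.contains(playerId));
    }

    public static void main(String[] args) {
        System.out.println(reachedTop("bob").join());
        System.out.println(reachedTop("dave").join());
    }
}
```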
Schedulers: RxAndroid
Schedulers
• Skipping Frames. The application may be doing too much work on its main thread.
• CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
• NetworkOnMainThreadException
• :(
Schedulers
playerService.getFollowers()
    .subscribe(response -> {
        adapter.addAll(response.players);
    });
Schedulers
playerService.getFollowers()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(response -> {
        adapter.addAll(response.players);
    });
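The idea behind `observeOn` is that the work is produced on one thread and its result is delivered on another. A rough JDK-only sketch of that hand-off follows. It is not RxJava or RxAndroid: the "io" and "ui" executors are hypothetical stand-ins for a background scheduler and the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    // Returns { thread that produced the value, thread that consumed it }.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    // The "network call" runs on the background executor.
                    .supplyAsync(() -> {
                        threads[0] = Thread.currentThread().getName();
                        return "followers";
                    }, io)
                    // The result is handed to the "ui" executor, much like observeOn.
                    .thenAcceptAsync(value -> threads[1] = Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```

Keeping the consumer on a single dedicated thread is what avoids errors like the CalledFromWrongThreadException listed above.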
QuizUp Gameplay
QuizUp Round
Observable<MatchEvent> events = Observable.concat(
    roundStarted(round),
    showQuestion(round, question).delay(ROUND_START_DELAY),
    showAnswers(round, question).delay(showAnswersDelay),
    answerPeriodStart(round).delay(ANSWER_PERIOD_START_DELAY),
    Observable.merge(
        playerAnswer.takeUntil(timer),
        opponentAnswer.takeUntil(timer),
        timer.takeUntil(bothPlayersHaveAnswered)
    ),
    answerPeriodEnd(round),
    playerDidNotAnswer(round, question, playerDidAnswer),
    verifyOpponentIsStillHere(opponentDidAnswer),
    showCorrectAnswer(round, question).delay(SHOW_CORRECT_ANSWER_DELAY),
    Observable.empty().delaySubscription(FINISH_UP_ROUND_DELAY)
);
All Rounds
Observable<MatchEvent> match = Observable.range(1, 7).concatMap(round -> {
    return round(round);
});
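`concatMap` starts each inner sequence only after the previous one has completed, so the events of round 2 can never interleave with those of round 1. For a purely synchronous picture of that ordering, a sequential `java.util.stream` pipeline behaves the same way. The sketch below is an analogy only (no timing, no delays), and the two-event `round` is a hypothetical, shortened version of the real round on the earlier slide.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class RoundsSketch {
    // Hypothetical, shortened round: the real one emits many more events.
    static Stream<String> round(int n) {
        return Stream.of("round " + n + " started", "round " + n + " finished");
    }

    // Flattens the rounds in order: all events of round n come before round n + 1.
    static List<String> match(int rounds) {
        return IntStream.rangeClosed(1, rounds)
                .boxed()
                .flatMap(RoundsSketch::round)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        match(7).forEach(System.out::println);
    }
}
```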
Thank You q.is/jobs