Connect.Tech 2017: Dive into RxJS Observables
Jeremy Fairbank
September 22, 2017
Transcript
Dive into RxJS Observables
Jeremy Fairbank
@elpapapollo / jfairbank
Software is broken. We are here to fix it. Say
[email protected]
Async
Async API Callback
Callback:
fetchUserById(1, function(err, user) {
  if (err) {
    console.error('Could not retrieve user');
  } else {
    console.log(user);
  }
});
}); }); }); }); }); }); Callback Callback Callback Callback
Callback Callback
function fetchCustomerNameForOrder(orderId, done, fail) {
  fetchOrder(orderId, function(err, order) {
    if (err) {
      logError(err);
      fail(err);
    } else {
      fetchCustomer(
        order.customerId,
        function(err, customer) {
          if (err) {
            logError(err);
            fail(err);
          } else {
            done(customer.name);
          }
        }
      );
    }
  });
}
Async API ?
Promise:
function fetchCustomerNameForOrder(orderId) {
  return fetchOrder(orderId)
    .then(order => fetchCustomer(order.customerId))
    .then(customer => customer.name);
}
.then(...) .then(...) .then(...) .then(...) .then(...) .then(...) .then(...) .then(...)
Error Error Error Error
function fetchCustomerNameForOrder(orderId) {
  return fetchOrder(orderId)
    .then(order => fetchCustomer(order.customerId))
    .then(customer => customer.name)
    .catch(err => {
      logError(err);
      throw err;
    });
}
Getting there…
• More readable and maintainable async code
• Better error handling
• More declarative and versatile syntax
• Capable of handling events, streams, and HTTP
RxJS Observables
Arrays: sequences in space
[ 1, 2, 3, 4, 5 ]
Observables: sequences in time
1 2 3 4 5
Reactive
Reactive 3
const { Observable } = require('rxjs');
const source = Observable.of(1, 2, 3);
source.subscribe(x => console.log(x));
// 1
// 2
// 3
[Animation: Observable.of(1, 2, 3).subscribe(x => console.log(x)) delivers 1, then 2, then 3 through subscribe to console.log, one value at a time.]
Declarative Transformation: operate on events
Observable.of(1, 2, 3)
  .map(n => n * 2)
  .subscribe(x => console.log(x));
// 2
// 4
// 6
[Animation: each value from Observable.of(1, 2, 3) passes through map (n * 2) before reaching subscribe and console.log: 1 becomes 2, 2 becomes 4, 3 becomes 6.]
Lazy Transformation: do only as much work as needed
Observable.range(1, 100)
  .map(n => n * 2)
  .filter(n => n > 4)
  .take(2)
  .subscribe(x => console.log(x));
// 6
// 8
[Animation: values from Observable.range(1, 100) flow one at a time through map (n * 2), filter (n > 4), and take (2). 1 becomes 2 and is rejected by filter (×); 2 becomes 4 and is rejected (×); 3 becomes 6, passes (✓), is logged, and the remaining take count drops to 1; 4 becomes 8, passes (✓), is logged, and the remaining take count drops to 0.]
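The "only as much work as needed" behaviour can be approximated without RxJS. The following is a minimal sketch that uses plain JavaScript generators as an analogy; generators are pull-based whereas Observables push values, and the helper names (`range`, `map`, `filter`, `take`, `pulled`) are made up for this illustration rather than taken from RxJS.

```javascript
// Counts how many values the source actually produced.
let pulled = 0;

function* range(start, count) {
  for (let n = start; n < start + count; n++) {
    pulled++;
    yield n;
  }
}

function* map(fn, iterable) {
  for (const x of iterable) yield fn(x);
}

function* filter(predicate, iterable) {
  for (const x of iterable) {
    if (predicate(x)) yield x;
  }
}

function* take(count, iterable) {
  if (count <= 0) return;
  let taken = 0;
  for (const x of iterable) {
    yield x;
    taken++;
    if (taken >= count) return; // stop pulling from upstream
  }
}

const result = [
  ...take(2, filter(n => n > 4, map(n => n * 2, range(1, 100)))),
];

console.log(result); // [ 6, 8 ]
console.log(pulled); // 4
```

Only four of the hundred possible source values are produced, because nothing upstream runs once `take` has seen two values.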
DOM Events
let counter = 0;
function updateCounter(n) {
  counter += n;
  counterEl.innerHTML = counter;
}
incrementBtn.addEventListener('click', () => {
  updateCounter(1);
});
decrementBtn.addEventListener('click', () => {
  updateCounter(-1);
});
Observable.fromEvent(incrementBtn, 'click')
  .mapTo(1)
  .scan((acc, curr) => acc + curr, 0)
  .subscribe((counter) => {
    counterEl.innerHTML = counter;
  });
.scan((acc, curr) => acc + curr, 0)
  acc: accumulated counter value
  curr: current event value (1)
  acc + curr: return new accumulated counter value
  0: initial counter value
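To make the four parts of the `scan` call concrete, here is a minimal sketch of the same idea over a plain array. The `scan` function below is a hypothetical helper written for this illustration, not the RxJS operator: it behaves like `reduce` but keeps every intermediate total.

```javascript
// Array analogy for scan: like reduce, but records each accumulated value.
function scan(values, reducer, seed) {
  const out = [];
  let acc = seed;
  for (const curr of values) {
    acc = reducer(acc, curr);
    out.push(acc);
  }
  return out;
}

// Three increment clicks, each mapped to 1.
const afterIncrements = scan([1, 1, 1], (acc, curr) => acc + curr, 0);
console.log(afterIncrements); // [ 1, 2, 3 ]

// Increment, increment, decrement (a mixed stream of +1 and -1).
const mixed = scan([1, 1, -1], (acc, curr) => acc + curr, 0);
console.log(mixed); // [ 1, 2, 1 ]
```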
[Animation: each click event (e) is replaced by 1 via mapTo; scan adds it to the accumulated value (0 becomes 1 on the first click, 1 becomes 2 on the second); subscribe writes the new counter value to counterEl.innerHTML.]
const source = Observable.merge(
  Observable.fromEvent(incrementBtn, 'click').mapTo(1),
  Observable.fromEvent(decrementBtn, 'click').mapTo(-1),
);
source
  .scan((acc, curr) => acc + curr, 0)
  .subscribe((counter) => {
    counterEl.innerHTML = counter;
  });
[Animation: Increment clicks map to 1 and Decrement clicks map to -1; both feed the same scan. An increment click moves the counter from 0 to 1, then a decrement click moves it from 1 back to 0, and each new value is written to counterEl.innerHTML.]
Async HTTP
Promise:
fetchOrders()
  .then((orders) => {
    orders.forEach((order) => {
      console.log(order);
    });
  });
Observable:
fetchOrders()
  .subscribe((orders) => {
    orders.forEach((order) => {
      console.log(order);
    });
  });
Why Observables?
Promise:
fetchOrders()
  .then(orders => orders.filter(
    order => order.customerName === 'Tucker'
  ))
  .then(orders => orders.map(order => order.id))
  .then((orderIds) => {
    orderIds.forEach(id => console.log(id));
  });
Observable:
fetchOrders()
  .mergeAll()
  .filter(
    order => order.customerName === 'Tucker'
  )
  .map(order => order.id)
  .subscribe(id => console.log(id));
Cancellation
Promise:
const promise = fetchOrders()
  .then((orders) => {
    orders.forEach((order) => {
      console.log(order);
    });
  });
promise.cancel(); // × native Promises have no cancel()
Observable:
const subscription = fetchOrders()
  .subscribe((orders) => {
    orders.forEach((order) => {
      console.log(order);
    });
  });
subscription.unsubscribe(); // cancel request
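One way to picture why `unsubscribe()` can cancel a request is that the subscription holds on to a teardown function supplied by the producer. The following is a minimal sketch under that assumption; `createObservable` is a toy helper and `pending` is a hypothetical registry standing in for in-flight requests, so neither reflects how RxJS is actually implemented.

```javascript
// Toy stand-in for an Observable: subscribe(next) runs the producer and
// returns an object whose unsubscribe() runs the producer's teardown.
function createObservable(producer) {
  return {
    subscribe(next) {
      let closed = false;
      const teardown = producer(value => {
        if (!closed) next(value);
      });
      return {
        unsubscribe() {
          if (closed) return;
          closed = true;
          if (typeof teardown === 'function') teardown();
        },
      };
    },
  };
}

// Hypothetical registry of in-flight requests (stands in for the network).
const pending = new Set();

function fetchOrders() {
  return createObservable(next => {
    const request = { deliver: next };
    pending.add(request);
    // Teardown: abort the request if the subscriber goes away.
    return () => pending.delete(request);
  });
}

const received = [];
const subscription = fetchOrders().subscribe(orders => received.push(orders));
console.log(pending.size); // 1 (request in flight)

subscription.unsubscribe();
console.log(pending.size); // 0 (request cancelled)
console.log(received); // []
```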
Lazy Subscriptions
Promises: immediate
const p1 = fetchOrders();
const p2 = fetchOrders();
const p3 = fetchOrders();
Observables: lazy
const o1 = fetchOrders();
const o2 = fetchOrders();
const o3 = fetchOrders();
// Issue request
o1.subscribe();
o2.subscribe();
o3.subscribe();
const o1 = fetchOrders(); // pure, shareable value
// Issue new request with same observable
o1.subscribe();
o1.subscribe();
o1.subscribe();
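The contrast between eager Promises and lazy Observables can be sketched with a counter. This is a toy model, assuming a hypothetical `createObservable` helper that only stores the producer function and a `requestsIssued` counter that stands in for real HTTP traffic; it is not RxJS itself.

```javascript
let requestsIssued = 0;

// Toy observable: creating it only stores the producer; nothing runs yet.
function createObservable(producer) {
  return { subscribe: next => producer(next) };
}

function fetchOrders() {
  return createObservable(next => {
    requestsIssued++; // stands in for issuing the HTTP request
    next([{ id: 1 }]);
  });
}

const o1 = fetchOrders();
const countBeforeSubscribe = requestsIssued;
console.log(countBeforeSubscribe); // 0

// Each subscription re-runs the producer, so each one issues a new request.
o1.subscribe(() => {});
o1.subscribe(() => {});
o1.subscribe(() => {});
console.log(requestsIssued); // 3
```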
Create HTTP Requests
Built-in AJAX:
function fetchOrders() {
  return Observable.ajax.get('/orders');
}
Custom Observable Creation:
function fetchOrders() {
  return Observable.create((subscriber) => {
    fetchOrdersFromDb((orders) => {
      subscriber.next(orders);
      subscriber.complete();
    });
  });
}
Observable.of(1, 2, 3)
  .subscribe(x => console.log(x)); // Subscriber (or Observer)
Subscriber Object:
Observable.of(1, 2, 3)
  .subscribe({
    next: x => console.log(x),
  });
Observable.of(1, 2, 3)
  .subscribe({
    next: x => console.log(x),
    complete: () => console.log('Done!'),
  });
// 1
// 2
// 3
// Done!
function fetchOrders() {
  return Observable.create((subscriber) => {
    fetchOrdersFromDb((orders) => {
      subscriber.next(orders);
      subscriber.complete();
    });
  });
}
fetchOrders().mergeAll().subscribe({
  next: x => console.log(x),
  complete: () => console.log('Done!'),
});
Error Handling
const source = Observable.create((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('Uh oh'));
  subscriber.next(2); // never delivered
});
source.subscribe({
  next: x => console.log(x),
  complete: () => console.log('Done!'), // never called
  error: e => console.error(e),
});
// 1
// Error: Uh oh
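The reason nothing arrives after the error can be modelled with a "closed" flag on the subscriber. Below is a minimal sketch of that idea; `createObservable` is a toy helper written for this illustration and is only an approximation of the guarantees RxJS provides.

```javascript
// Toy subscriber wrapper: once error() or complete() has run,
// every later call is ignored.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      producer({
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        },
      });
    },
  };
}

const log = [];

const source = createObservable(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('Uh oh'));
  subscriber.next(2); // ignored: the stream already ended with an error
  subscriber.complete(); // ignored for the same reason
});

source.subscribe({
  next: x => log.push(x),
  error: e => log.push(e.message),
  complete: () => log.push('Done!'),
});

console.log(log); // [ 1, 'Uh oh' ]
```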
const source = Observable.create((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('Uh oh'));
  subscriber.next(2);
});
source.subscribe(x => console.log(x));
No error handler, so what happens?
Promises: Error Error Error Error
Observables: no swallowed errors!
Catching:
fetchOrders()
  .catch((e) => {
    logError(e);
    return legacyFetchOrders()
      .catch((e2) => {
        logError(e2);
        return Observable.of([]);
      });
  })
  .subscribe(x => console.log(x));
[Animation: three cases. fetchOrders() succeeds (✓) and its result goes straight to subscribe and console.log. fetchOrders() fails (×), the outer catch switches to legacyFetchOrders(), which succeeds (✓). Both fail (× ×), the inner catch switches to Observable.of([]), and [] is logged.]
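The worst case in the fallback chain (both sources fail) can be traced with a small synchronous model. This is a sketch only: `createObservable`, `catchError`, and `of` are toy helpers defined here, and the two failing sources are hypothetical stand-ins for `fetchOrders()` and `legacyFetchOrders()`.

```javascript
// Toy observable with just enough surface to show fallback on error.
function createObservable(producer) {
  return { subscribe: observer => producer(observer) };
}

// On error, switch to the observable returned by the handler.
function catchError(source, handler) {
  return createObservable(observer => {
    source.subscribe({
      next: value => observer.next(value),
      error: err => handler(err).subscribe(observer),
    });
  });
}

const of = value => createObservable(o => o.next(value));

const errorsLogged = [];
const logError = e => errorsLogged.push(e.message);

// Hypothetical sources: both the primary and the legacy fetch fail.
const fetchOrders = () =>
  createObservable(o => o.error(new Error('primary down')));
const legacyFetchOrders = () =>
  createObservable(o => o.error(new Error('legacy down')));

const received = [];
catchError(fetchOrders(), e => {
  logError(e);
  return catchError(legacyFetchOrders(), e2 => {
    logError(e2);
    return of([]);
  });
}).subscribe({
  next: x => received.push(x),
  error: e => { throw e; },
});

console.log(errorsLogged); // [ 'primary down', 'legacy down' ]
console.log(received); // [ [] ]
```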
Hot vs. Cold
Cold: Observable creates the source
Cold: best for one-off unique requests.
function fetchOrders() {
  return Observable.create((subscriber) => {
    // Resource requested/created at subscription time
    fetchOrdersFromDb((orders) => {
      subscriber.next(orders);
      subscriber.complete();
    });
  });
}
WebSockets
function ordersStream() {
  return Observable.create((subscriber) => {
    const url = 'ws://example.com/orders';
    const socket = new WebSocket(url);
    socket.addEventListener('message', (data) => {
      subscriber.next(data);
    });
  });
}
const stream = ordersStream();
stream.subscribe(x => console.log(x));
[Animation: after stream.subscribe(...), the subscribed Observable is connected to a WebSocket; each incoming message is passed to subscriber.next(data) and on to console.log.]
[Animation: .subscribe(...) is called again and again, up to eight subscriptions.]
Hot: Observable closes over the source
Hot: best for multicasting and sharing resources.
const url = 'ws://example.com/orders';
const socket = new WebSocket(url); // resource created outside subscription
function ordersStream() {
  return Observable.create((subscriber) => {
    // Close over existing resource when subscribing
    socket.addEventListener('message', (data) => {
      subscriber.next(data);
    });
  });
}
const stream = ordersStream(); // shared stream of data
const sub1 = stream.subscribe(x => console.log(x));
const sub2 = stream.subscribe(x => console.log(x));
[Animation: a single WebSocket feeds subscriber.next(event.data) for the first subscribed Observable; when stream.subscribe(...) is called again, the new subscribed Observable receives messages from the same WebSocket, and both pass each message to console.log.]
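The difference between the cold and hot versions comes down to where the resource is created. The sketch below counts how many sockets get opened in each style. It runs under Node and uses the built-in `EventEmitter` as a hypothetical stand-in for `WebSocket`; `openSocket`, `coldOrders`, and `hotOrders` are names invented for this illustration.

```javascript
const { EventEmitter } = require('events');

let socketsOpened = 0;

// Hypothetical stand-in for `new WebSocket(url)`.
function openSocket() {
  socketsOpened++;
  return new EventEmitter();
}

// Cold: every subscription opens its own socket.
function coldOrders() {
  return {
    subscribe(next) {
      const socket = openSocket();
      socket.on('message', next);
      return socket;
    },
  };
}

// Hot: one socket is created up front and every subscriber listens to it.
const sharedSocket = openSocket();
function hotOrders() {
  return {
    subscribe(next) {
      sharedSocket.on('message', next);
    },
  };
}

const cold = coldOrders();
cold.subscribe(() => {});
cold.subscribe(() => {});
const socketsAfterCold = socketsOpened - 1; // exclude the shared socket

const hotLog = [];
const hot = hotOrders();
hot.subscribe(x => hotLog.push(`sub1: ${x}`));
hot.subscribe(x => hotLog.push(`sub2: ${x}`));
sharedSocket.emit('message', 'order-42');

console.log(socketsAfterCold); // 2
console.log(socketsOpened); // 3 (2 cold + 1 shared)
console.log(hotLog); // [ 'sub1: order-42', 'sub2: order-42' ]
```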
Share It:
function ordersStream() {
  return Observable.create((subscriber) => {
    const url = 'ws://example.com/orders';
    const socket = new WebSocket(url);
    socket.addEventListener('message', (data) => {
      subscriber.next(data);
    });
  })
  .share();
}
const stream = ordersStream(); // shared stream of data too
const sub1 = stream.subscribe(x => console.log(x));
const sub2 = stream.subscribe(x => console.log(x));
Clean Up
function ordersStream() {
  return Observable.create((subscriber) => {
    const url = 'ws://example.com/orders';
    const socket = new WebSocket(url);
    socket.addEventListener('message', (data) => {
      subscriber.next(data);
    });
    // Called when all subscriptions unsubscribe:
    // close socket, deallocate resources, etc.
    return () => {
      socket.close();
    };
  })
  .share();
}
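The "called when all subscriptions unsubscribe" behaviour can be modelled as reference counting: connect on the first subscriber, tear down after the last one leaves. Here is a minimal sketch under that assumption. The `share` function is a toy written for this illustration, not the RxJS operator, and the source passed to it is a hypothetical stand-in for the WebSocket.

```javascript
// Toy share(): connect to the source on the first subscriber and run the
// teardown when the last subscriber unsubscribes.
function share(connect) {
  const listeners = new Set();
  let teardown = null;

  return {
    subscribe(next) {
      listeners.add(next);
      if (listeners.size === 1) {
        teardown = connect(value => listeners.forEach(fn => fn(value)));
      }
      return {
        unsubscribe() {
          if (!listeners.delete(next)) return;
          if (listeners.size === 0 && teardown) {
            teardown();
            teardown = null;
          }
        },
      };
    },
  };
}

const events = [];
let emit = null;

// Hypothetical source standing in for the WebSocket.
const stream = share(next => {
  events.push('open');
  emit = next;
  return () => events.push('close');
});

const sub1 = stream.subscribe(x => events.push(`sub1 ${x}`));
const sub2 = stream.subscribe(x => events.push(`sub2 ${x}`));
emit('a');
sub1.unsubscribe();
sub2.unsubscribe();

console.log(events);
// [ 'open', 'sub1 a', 'sub2 a', 'close' ]
```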
Recall:
fetchOrders()
  .mergeAll() // ?
  .filter(
    order => order.customerName === 'Tucker'
  )
  .map(order => order.id)
  .subscribe(id => console.log(id));
Observable.of([1, 2, 3])
  .subscribe(x => console.log(x));
// [ 1, 2, 3 ]
Observable.of([1, 2, 3])
  .mergeAll() // Flatten
  .subscribe(x => console.log(x));

// 1
// 2
// 3
[Diagram: the array [1, 2, 3] enters mergeAll, which passes 1, then 2, then 3 on to subscribe and console.log, one value at a time.]
Higher Order Observables
Observable.of(1, 2, 3)
  .map(n => n * 2)
  .subscribe(x => console.log(x));

// 2
// 4
// 6

Delay?
Observable.of(1, 2, 3)
  .map(n => Observable.of(n * 2))
  .subscribe(x => console.log(x));

// ScalarObservable { value: 2 }
// ScalarObservable { value: 4 }
// ScalarObservable { value: 6 }
Observable.of(1, 2, 3)
  .map(n => Observable.of(n * 2))
  .mergeAll()
  .subscribe(x => console.log(x));

// 2
// 4
// 6
Observable.of(1, 2, 3)
  .mergeMap(n => Observable.of(n * 2))
  .subscribe(x => console.log(x));

// 2
// 4
// 6
[Diagram: mergeMap receives 1, calls the mapping function to create Observable.of(2), subscribes to that inner observable, and passes its value 2 on to subscribe and console.log.]
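The relationship between `map`, `mergeAll`, and `mergeMap` shown above can be sketched with a tiny synchronous observable. `TinyObservable` is a hypothetical class written for this illustration; it ignores errors, completion, and unsubscription, so it is only a rough model of what RxJS does.

```javascript
// Tiny synchronous observable, just enough to show that
// mergeMap(fn) behaves like map(fn) followed by mergeAll().
// This is an illustration, not RxJS itself.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  // Emit each argument in order, like Observable.of.
  static of(...values) {
    return new TinyObservable((next) => values.forEach((v) => next(v)));
  }

  subscribe(next) {
    this.subscribeFn(next);
  }

  map(fn) {
    return new TinyObservable((next) => this.subscribe((v) => next(fn(v))));
  }

  // Subscribe to every inner observable and forward its values.
  mergeAll() {
    return new TinyObservable((next) =>
      this.subscribe((inner) => inner.subscribe(next))
    );
  }

  mergeMap(fn) {
    return this.map(fn).mergeAll();
  }
}

const viaMapAndMergeAll = [];
TinyObservable.of(1, 2, 3)
  .map((n) => TinyObservable.of(n * 2))
  .mergeAll()
  .subscribe((x) => viaMapAndMergeAll.push(x));

const viaMergeMap = [];
TinyObservable.of(1, 2, 3)
  .mergeMap((n) => TinyObservable.of(n * 2))
  .subscribe((x) => viaMergeMap.push(x));
```

Both pipelines collect the same values, which is why the slides can replace the `map` plus `mergeAll` pair with a single `mergeMap`.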
Observable.of(1)
  .delay(1000)
  .subscribe(x => console.log(x));

// <tick>
// 1
Observable.of(1, 2, 3)
  .mergeMap(n => (
    Observable.of(n * 2).delay(1000)
  ))
  .subscribe(x => console.log(x));

// <tick>
// 2
// 4
// 6
?
Concurrency
[Diagram: mergeMap receives 1, 2, and 3 without waiting, subscribes to each delayed inner observable right away, and then passes 2, 4, and 6 on to subscribe and console.log together.]
Observable.of(1, 2, 3)
  .mergeMap(n => (
    Observable.of(n * 2).delay(1000)
  ), 1)
  .subscribe(x => console.log(x));

// <tick>
// 2
// <tick>
// 4
// <tick>
// 6
Observable.of(1, 2, 3)
  .concatMap(n => (
    Observable.of(n * 2).delay(1000)
  ))
  .subscribe(x => console.log(x));

// <tick>
// 2
// <tick>
// 4
// <tick>
// 6
[Diagram: concatMap receives 1, subscribes to its delayed inner observable, and passes 2 on before it handles the next input; 2 and 3 follow the same way, producing 4 and then 6.]
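The effect of the concurrency argument can be sketched with a virtual clock, so the timing is deterministic and nothing actually waits. Everything here is an assumption made for illustration: `createClock` and `mergeMapLimited` are hypothetical helpers, and each inner task produces exactly one value, which is simpler than a real inner observable.

```javascript
// Virtual clock so the sketch runs synchronously and deterministically.
function createClock() {
  let now = 0;
  const tasks = [];
  return {
    now: () => now,
    schedule(delay, fn) {
      tasks.push({ at: now + delay, fn });
    },
    // Run scheduled tasks in time order until none remain.
    flush() {
      while (tasks.length > 0) {
        tasks.sort((a, b) => a.at - b.at);
        const task = tasks.shift();
        now = task.at;
        task.fn();
      }
    },
  };
}

// Run `project(value, done)` for each value, with at most `concurrent`
// projections in flight; the rest wait in a queue.
function mergeMapLimited(values, project, concurrent, onValue) {
  const queue = values.slice();
  let active = 0;

  function startNext() {
    while (active < concurrent && queue.length > 0) {
      const value = queue.shift();
      active += 1;
      project(value, (result) => {
        onValue(result);
        active -= 1; // a slot is free again
        startNext();
      });
    }
  }

  startNext();
}

function run(concurrent) {
  const clock = createClock();
  const seen = [];
  // Each projection "takes" 1000 virtual ms, like .delay(1000).
  const project = (n, done) => clock.schedule(1000, () => done(n * 2));
  mergeMapLimited([1, 2, 3], project, concurrent, (x) => {
    seen.push([clock.now(), x]);
  });
  clock.flush();
  return seen;
}

const unlimited = run(Infinity); // every value arrives at t=1000
const oneAtATime = run(1); // values arrive at t=1000, 2000, 3000
```

With no limit, all three delays overlap and finish together. With a limit of 1, each task starts only after the previous one finishes, which matches the `concatMap` output on the slides.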
Rate Limiting
Observable.of(1, 2, 3)
  .concatMap((id) => {
    const url = `/orders/${id}`;

    return Observable.ajax.get(url)
      .delay(1000);
  })
  .pluck('response')
  .bufferCount(3)
  .subscribe(x => console.log(x));

// [ { id: '1', name: 'Order 1' },
//   { id: '2', name: 'Order 2' },
//   { id: '3', name: 'Order 3' } ]
[Diagram: concatMap requests one order at a time through Observable.ajax.get(url), and the delay on each response (R) rate limits the next request. pluck extracts each response, and bufferCount collects orders O1, O2, and O3, counting down from 3 to 0, before passing the full array to subscribe and console.log.]
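The last two steps of the pipeline, `pluck('response')` and `bufferCount(3)`, can be sketched as a plain observer. `pluckThenBuffer` is a hypothetical helper for illustration only, and the objects fed into it are made-up stand-ins for AJAX results shaped like the ones on the slides.

```javascript
// Sketch of pluck('response') followed by bufferCount(3), written as a
// plain observer factory. Illustrative only, not RxJS code.
function pluckThenBuffer(key, size, onBuffer) {
  let buffer = [];
  return {
    next(value) {
      buffer.push(value[key]); // pluck: keep only the named property
      if (buffer.length === size) {
        onBuffer(buffer); // bufferCount: emit once `size` values are collected
        buffer = [];
      }
    },
    complete() {
      // Emit whatever is left when the source ends.
      if (buffer.length > 0) {
        onBuffer(buffer);
        buffer = [];
      }
    },
  };
}

const batches = [];
const observer = pluckThenBuffer('response', 3, (batch) => batches.push(batch));

// Hypothetical AJAX results shaped like the ones in the slides.
[1, 2, 3].forEach((id) => {
  observer.next({ response: { id: String(id), name: `Order ${id}` } });
});
observer.complete();
```

Because the buffer size equals the number of requests, the subscriber sees a single array once the third response arrives.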
Sequential vs. Parallel
const promise = Promise.all([
  fetchOrder(1),
  fetchOrder(2),
  fetchOrder(3),
]);

promise.then(x => console.log(x));

// [ { id: '1', name: 'Order 1' },
//   { id: '2', name: 'Order 2' },
//   { id: '3', name: 'Order 3' } ]
const source = Observable.forkJoin(
  fetchOrder(1), // Observable or Promise
  fetchOrder(2),
  fetchOrder(3)
);

source.subscribe(x => console.log(x));

// [ { response: { id: 1, name: 'Order 1' } },
//   { response: { id: 2, name: 'Order 2' } },
//   { response: { id: 3, name: 'Order 3' } } ]
[Diagram: forkJoin subscribes to fetchOrder(1), fetchOrder(2), and fetchOrder(3) at once. Responses arrive as R1, R3, then R2; forkJoin holds them and passes R1, R2, R3 to subscribe and console.log in the original order.]
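The ordering behavior in the diagram can be sketched with plain callbacks. `forkJoinLike` and `fakeFetch` are hypothetical names for this illustration; the sketch assumes each source reports exactly one result and does not model errors or an empty source list.

```javascript
// Each source is a function that takes a callback and later calls it
// once with its result (a hypothetical stand-in for fetchOrder).
function forkJoinLike(sources, onAll) {
  const results = new Array(sources.length);
  let pending = sources.length;

  sources.forEach((source, index) => {
    source((value) => {
      results[index] = value; // slot by position, not arrival order
      pending -= 1;
      if (pending === 0) onAll(results);
    });
  });
}

// Capture each callback so the "responses" can arrive out of order.
const resolvers = [];
const fakeFetch = () => (done) => resolvers.push(done);

let joined = null;
forkJoinLike([fakeFetch(), fakeFetch(), fakeFetch()], (all) => {
  joined = all;
});

resolvers[0]('R1');
resolvers[2]('R3'); // the third response beats the second
const joinedBeforeLast = joined; // nothing emitted yet
resolvers[1]('R2'); // last response arrives, so the combined array is emitted
```

All three requests are in flight at the same time, yet the combined result keeps the order of the arguments, much like `Promise.all`.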
Tip of the iceberg
• Declarative, lazy operations ✓
• Expressive event management ✓
• No more error swallowing ✓
• Rate limiting and concurrent processing ✓
RxJS
github.com/ReactiveX/rxjs
reactivex.io/rxjs
Observables ECMAScript Proposal: github.com/tc39/proposal-observable
Another spec implementation: github.com/zenparsing/zen-observable
Thanks! Jeremy Fairbank @elpapapollo / jfairbank Slides: bit.ly/rxjs-connect