Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Introduction to Functional-Reactive Programming

Grzegorz Dyrda
September 14, 2017

Introduction to Functional-Reactive Programming

A gentle introduction to Functional-Reactive Programming (FRP).

Includes short intro to Functional Programming (FP), which actually is the foundation of FRP.

Grzegorz Dyrda

September 14, 2017
Tweet

More Decks by Grzegorz Dyrda

Other Decks in Programming

Transcript

  1. What does the Market say? • Java 9 + Reactive

    Streams (JEP 266) • Spring Web Reactive (WebFlux) • Angular 2+, Cycle.js • Android Architecture Components, Room, Retrofit, Realm, …
  2. Functional Programming Programming style in which we avoid sharing state

    and data mutation, in favor of composing functions and passing data from one to another.
  3. • Functions as first-class citizens • Higher-order functions • Lambda

    expressions • Immutable data types • Functions are closures Language Support JAVA 8+ APPROVED
  4. Client says: “Make a sum of all our prices
 that

    are higher than $20,
 discounted by 10%” Example
  5. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example var sum = 0.0 for (var i = 0; i <= prices.size; i++) {
 if (price[i] > 20) {
 sum += price[i] * 0.9
 }}
 }} val prices = listOf(20, 12, 15, 21, 25, 28, 30)
  6. var sum = 0.0 for (var i = 0; i

    < prices.size; i++) {
 if (price[i] > 20) {
 sum += price[i] * 0.9
 }}
 }} Example val prices = listOf(20, 12, 15, 21, 25, 28, 30)
  7. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example var sum = 0.0 for (price in prices) {
 if (price > 20) {
 sum += price * 0.9
 }}
 }}
  8. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() var sum = 0.0 for (price in prices) {
 if (price > 20) {
 sum += price * 0.9
 }}
 }}
  9. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  10. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  11. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  12. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  13. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  14. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() Important: 1. No import = stdlib 2. Cross-platform
  15. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  16. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() // Returns list containing only elements matching [predicate] fun <T> filter(predicate: (T) -> Boolean): List<T>
  17. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() // Returns list containing only elements matching [predicate] fun <T> filter(predicate: (T) -> Boolean): List<T>
  18. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example // Returns list containing elements transformed by [transform] fun <T, R> map(transform: (T) -> R): List<R> val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  19. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example // Returns list containing elements transformed by [transform] fun <T, R> map(transform: (T) -> R): List<R> val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum()
  20. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() Higher level, more readable & closer to the Business Intent
  21. val prices = listOf(20, 12, 15, 21, 25, 28, 30)

    Example val sum = prices
 .filter { price -> price > 20 }
 .map { price -> price * 0.9 }
 .sum() var sum = 0.0 for (price in prices) {
 if (price > 20) {
 sum += price * 0.9
 }}
 }}
  22. You’re a Blog admin: “Calculate average blogpost length of all

    your blog’s Users that speak Polish” Example
  23. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example
  24. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val users = db.getAllUsers()
  25. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = ??? val users = db.getAllUsers()
  26. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users val users = db.getAllUsers()
  27. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { ??? }} val users = db.getAllUsers()
  28. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }} val users = db.getAllUsers()
  29. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .map { ??? }} val users = db.getAllUsers()
  30. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .map { user -> user.posts }} val users = db.getAllUsers()
  31. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .map { user -> user.posts } // Return type is: List<List<Post>> // But we need: List<Post> val users = db.getAllUsers()
  32. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .map { user -> user.posts } val users = db.getAllUsers()
  33. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts } val users = db.getAllUsers()
  34. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts } // Returns a single list with all elements yielded from // [transform] invoked on on each element of original collection. fun <T, R> flatMap(transform: (T) -> Iterable<R>): List<R> val users = db.getAllUsers()
  35. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts } // Returns a single list with all elements yielded from // [transform] invoked on on each element of original collection. fun <T, R> flatMap(transform: (T) -> Iterable<R>): List<R> val users = db.getAllUsers()
  36. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts } // Returns a single list with all elements yielded from // [transform] invoked on on each element of original collection. fun <T, R> flatMap(transform: (T) -> Iterable<R>): List<R> val users = db.getAllUsers()
  37. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts } val users = db.getAllUsers()
  38. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts }
 .map { ??? }} val users = db.getAllUsers()
  39. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }} val users = db.getAllUsers()
  40. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .filter { user -> user.language == "PL" }
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }
 .average() val users = db.getAllUsers()
  41. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .take(100)
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }
 .average() val users = db.getAllUsers()
  42. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .take(100)
 .flatMap { user -> user.posts } .map { post -> post.body.length } .count() val users = db.getAllUsers()
  43. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .take(100)
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }
 .reduce { acc, length -> acc + length } val users = db.getAllUsers()
  44. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .take(100)
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }
 .reduce { acc, length -> acc + length }} val users = db.getAllUsers() // Returns a single value computed by accumulating values // returned from applying [operation] to each element. fun <S, T:S> reduce(operation: (acc: S, T) -> S): S
  45. data class Post(val title: String, val body: String)
 data class

    User(val language: String, val posts: List<Post>) Example val avgPolishPostLength = users
 .take(100)
 .flatMap { user -> user.posts }
 .map { post -> post.body.length }
 .fold(0) { acc, _ -> acc + 1 }} val users = db.getAllUsers() // Accumulates value starting with [initial] and applying // [operation] to current accumulator value and each element. fun <T, R> fold(initial: R, operation: (acc: R, T) -> R): R
  46. • No explicit mutation, reassignment, iteration etc. = less error

    prone • Low-level details controlled by the library = may be optimized under the hood • More expressive, more readable • Easy to enhance & modify the logic • Closer to the business intent Benefits
  47. // Returns list containing only elements matching [predicate] fun <T>

    filter(predicate: (T) -> Boolean): List<T> // Returns list containing elements transformed by [transform] fun <T, R> map(transform: (T) -> R): List<R> // Flattens nested lists into single list fun <T, R> flatMap(transform: (T) -> Iterable<R>): List<R> // Accumulates value by applying [operation] fun <S, T:S> reduce(operation: (acc: S, T) -> S): S // Accumulates value, starting with [initial] fun <T, R> fold(initial: R, operation: (acc: R, T) -> R): R Common Specialized Functions
  48. • RxJava • RxJS • Rx.NET • RxScala • RxClosure

    • RxCpp Implementation • RxPython • RxGo • RxKotlin • RxSwift • RxPHP • RxDart
  49. Observable.just(1) Observable.just(true) Observable.just(user) Observable.just("one", "two", "three") Creating a Stream Observable.fromIterable(users)

    Observable.fromArray(usersArray) Observable.fromCallable(this::sendRequest) Observable.fromFuture(future) Observable.defer(...) Observable.create(...)
  50. Observable.just(user)) .subscribe { item ->
 print("Item = $item")
 }} //

    Item = User(language="PL",posts=...) Creating a Stream
  51. Observable.just("one", "two", "three") .subscribe { item ->
 print("Item = $item")


    }} // Item = one // Item = two // Item = three Creating a Stream
  52. Observable.just(user1, user2, ...)) .subscribe { item ->
 print("Item = $item")


    }} // Item = User(language="PL",posts=...) // Item = User(language="EN",posts=...) // ... Creating a Stream
  53. Observable.fromIterable(users)) .subscribe { item ->
 print("Item = $item")
 }} //

    Item = User(language="PL",posts=...) // Item = User(language="EN",posts=...) // Item = User(language="PL",posts=...) // Item = User(language="PL",posts=...) ... Creating a Stream
  54. Observable.fromIterable(users)) .subscribe { item ->
 print("Item = $item")
 }} //

    Item = User(language="PL",posts=...) // Item = User(language="EN",posts=...) // Item = User(language="PL",posts=...) // Item = User(language="PL",posts=...) ... Creating a Stream Where are my Steroids?
  55. API

  56. API abstract class Observable<T> { fun <T> filter(predicate: (T) ->

    Boolean): Observable<T> fun <T, R> map(transform: (T) -> R): Observable<R> // // ...13k lines of code... // }} interface Observer<T> {
 }}
  57. API abstract class Observable<T> { fun <T> filter(predicate: (T) ->

    Boolean): Observable<T> fun <T, R> map(transform: (T) -> R): Observable<R> // // ...13k lines of code... // }} interface Observer<T> {
 fun onNext(@NonNull t: T)
 fun onError(@NonNull e: Throwable)
 fun onComplete()
 }}
  58. Observable.just("5", "8", "3", "7") .map(String::toInt) .filter { item -> item

    > 5 } .subscribe { item ->
 print("Item = $item")
 }} // Item = 8 // Item = 7 Transforming a Stream
  59. Observable.interval(1, TimeUnit.SECONDS) Interval .subscribe { item ->
 print("Item = $item")


    }} //00:00: Item = 0 //00:01: Item = 1 //00:02: Item = 2 //00:03: Item = 3 //00:04: Item = 4 //...
  60. Observable.interval(1, TimeUnit.SECONDS) Interval .map { item -> item * item

    } .subscribe { item ->
 print("Item = $item")
 }}
  61. Observable.interval(1, TimeUnit.SECONDS) Interval .map { item -> item * item

    }. .take(3) .subscribe { item ->
 print("Item = $item")
 }} //00:00: Item = 0 //00:01: Item = 1 //00:02: Item = 4
  62. // RxBinding RxView.clicks(button) // Observable<Object> .map { click -> 1

    } .scan { acc, next -> acc + next } UI Events .subscribe { count ->
 print("Count = $count")
 } // Count = 1 // Count = 2 // Count = 3 // Count = 4 // ...
  63. // Retrofit myService.getUsers() // Observable<User> .map { user -> user.posts.size

    } .reduce { acc, next -> acc + next } Network Requests .subscribe { count ->
 print("Post count = $count")
 } // Post count = 120
  64. // Task: perform network request after each click Combining //

    Return type: Observable<Observable<User>> // But we want: Observable<User> val buttonClicks = RxView.clicks(button) val userInfoRequest = myService.getUserInfo() buttonClicks .map { click -> myService.getUserInfo() }
  65. // Returns Observable that emits all items emitted by //

    Observables returned by [mapper] fun <T, R> flatMap(mapper: (T) -> Observable<R>): Observable<R> buttonClicks .flatMap { click -> myService.getUserInfo() } Combining val buttonClicks = RxView.clicks(button) val userInfoRequest = myService.getUserInfo() // Task: perform network request after each click
  66. Combining .map { user -> user.firstName } .subscribe { firstName

    ->
 print("firstName = $firstName")
 } // firstName = John val buttonClicks = RxView.clicks(button) val userInfoRequest = myService.getUserInfo() buttonClicks .flatMap { click -> myService.getUserInfo() } // Task: perform network request after each click
  67. // Task: do the same thing on btn1 & btn2

    click Combining val btn1Clicks = RxView.clicks(button1) val btn2Clicks = RxView.clicks(button2) btn1Clicks .map { click -> "Click!" } .subscribe { msg ->
 print("msg = $msg")
 }}
  68. // Task: do the same thing on btn1 & btn2

    click Combining val btn1Clicks = RxView.clicks(button1) val btn2Clicks = RxView.clicks(button2) Observable.merge(btn1Clicks, btn2Clicks) .map { click -> "Click!" } .subscribe { msg ->
 print("msg = $msg")
 }} // Merges two (or more) Observables into a single one. fun <T> merge(s1: Observable<T>, s2: Observable<T>): Observable<T>
  69. fun fetchUser(): User { /* fetch from network */ }

    Threading Observable.fromCallable(this::fetchUser) .subscribeOn(Schedulers.io()) .subscribe { user ->
 print("User = $user")
 }}
  70. fun fetchUser(): User { /* fetch from network */ }

    Threading Observable.fromCallable(this::fetchUser) .subscribeOn(Schedulers.io()) .subscribe { user -> statusLabel.text = user.firstName
 }}
  71. fun fetchUser(): User { /* fetch from network */ }

    Threading .subscribe { user -> statusLabel.text = user.firstName
 }} Observable.fromCallable(this::fetchUser) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
  72. Tips • Start small - get used to FP first


    (be declarative, avoid side-effects etc.) • RX is not an architecture. It's a Tool.
 So use it wisely • Subtask = flatMap • Remember that error = end of Stream
  73. Use RX when you need... • Composing & orchestrating multiple

    async events • Manipulating time (delay, timeout etc.) • Cancellation of async events
  74. • Great for composing/orchestrating multiple Event streams • 100+ operators

    for creating, transforming & combining streams • Declarative threading • Cancellation of async tasks • Do not overuse for simple tasks Conclusions on RX