Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Kotlin Flows

Kotlin Flows

They finally arrived - Kotlin Flows!
Are they really great or just good enough?
Does RxJava finally have a true competitor?
Will they be the saviour we have all been waiting for?

Avatar for Rostyslav Lesovyi

Rostyslav Lesovyi

December 26, 2019
Tweet

More Decks by Rostyslav Lesovyi

Other Decks in Programming

Transcript

  1. // KLUG - Kotlin Flows Flows - Introduction val list

    = listOf(1, 2, 3) list.forEach { print(it) } val sequence = sequenceOf(1, 2, 3) sequence.forEach { print(it) } val flow = flowOf(1, 2, 3) flow.collect { print(it) } val observable = Observable.fromArray( 1, 2, 3) observable.subscribe { print(it) } -> 1, 2, 3
  2. // KLUG - Kotlin Flows Flows - Introduction listOf(1, 2,

    3) .filter { it % 2 != 0 } .map { it * 2 } .forEach { print(it) } 1 2 3 1 3 2 6 2 6 listOf filter map forEach List created
  3. // KLUG - Kotlin Flows Flows - Introduction sequenceOf(1, 2,

    3) .filter { it % 2 != 0 } .map { it * 2 } .forEach { print(it) } 1 2 3 1 6 6 sequenceOf filter map forEach 2 2 3 .. ..
  4. // KLUG - Kotlin Flows IO Thread Caller Thread flowOf(1,

    2, 3) .filter { it % 2 != 0 } .flowOn(Dispatchers.IO) .collect { print(it) } Flows - Introduction 1 2 3 1 3 flowOf filter forEach 1 3 .. ..
  5. // KLUG - Kotlin Flows Flows - Introduction flowOf(1, 2,

    3) .filter { it % 2 != 0 } .flowOn(Dispatchers.IO) .collect { print(it) } Observable.fromArray( 1, 2, 3) .filter { it % 2 != 0 } .subscribeOn(Schedulers.io()) .observeOn(mainScheduler) .subscribe { print(it) }
  6. // KLUG - Kotlin Flows Using Flows Wow, they are

    so similar to Rx. I know them already!
  7. // KLUG - Kotlin Flows Using Flows - Type of

    the Operators • Builders • Terminal operators • Flow operators • Context • Delay • Error • Distinct • Transform • Limit • Merge • Emitters • Zip
  8. // KLUG - Kotlin Flows Using Flows - Terminology flowOf(1,

    2, 3) .filter { it % 2 != 0 } .map { it * 2 } .map { "value = $it" } .catch { print(it.message) } .collect() < Builder < Flow operators < Terminal operator
  9. // KLUG - Kotlin Flows Using Flows - Terminology flowOf(1,

    2, 3) .filter { it % 2 != 0 } .map { it * 2 } .map { "value = $it" } .catch { print(it.message) } .collect() Upstream Downstream
  10. // KLUG - Kotlin Flows Using Flows - Builders val

    flow1 = flowOf(1, 2, 3) // Observable.fromArray val flow2 = listOf(1, 2, 3).asFlow() // toObservable() val empty = emptyFlow<Int>() // Observable.empty<Int>()
  11. // KLUG - Kotlin Flows Using Flows - Builders val

    list = flowOf(1, 2, 3).toList() // Observable.toList() val set = flowOf(1, 2, 3).toSet() val linkedList = flowOf(1, 2, 3).toCollection(LinkedList())
  12. // KLUG - Kotlin Flows Using Flows - Builders val

    flow = flow { // Observable.create emit(1) scope.launch { emit(2) <-- prohibited } } val flow = channelFlow { // Observable.create send(1) launch { send(2) <-- allowed } awaitClose { /* free resources if needed */ } }
  13. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1, 2, 3) scope.launch { flow.collect { print(it) } // Observable.subscribe { print(it) } } -> 1, 2, 3
  14. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1, 2, 3) flow.launchIn(scope) scope.launch { flow.collect() }
  15. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flow { emit(1) delay(50) // suspend block, we can delay here emit(2) } flow.collectLatest { delay(100) // emulate work println(it) } -> 2
  16. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1, 2, 3) flow.count() // Observable.count() -> 3
  17. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1) flow.single() // Observable.singleOrError() flow.singleOrNull() // Observable.singleElement() flow.first() // Observable.firstOrError() flow.firstOrNull() // Observable.firstElement() -> 1
  18. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1, 2, 3) flow.reduce { accumulator, value -> // Observable.reduce() accumulator + value } -> 6
  19. // KLUG - Kotlin Flows Using Flows - Terminal operators

    val flow = flowOf(1, 2, 3) flow.fold(20) { accumulator, value -> // Observable.reduce(seed, ...) accumulator + value } -> 26
  20. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) flow.map { it * 2 } // Observable.map { ... } -> 2, 4, 6
  21. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) flow.filter { it % 2 != 0 } // Observable.filter { ... } -> 1, 3
  22. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) flow.flatMapMerge { // Observable.flatMap { ... } flowOf(it * 2, it * 3) } -> [1, 2, 3] -> [[2, 3], [4, 6], [6, 9]] -> [2, 3, 4, 6, 6, 9]
  23. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) flow.transform { // Observable.flatMap { ... } emit(it * 2) emit(it * 3) } -> [1, 2, 3] -> [[2, 3], [4, 6], [6, 9]] -> [2, 3, 4, 6, 6, 9]
  24. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) // Observable.onErrorResumeNext { } flow.map { throw Exception("oops") } .catch { print(it.message) } .collect() -> oops
  25. // KLUG - Kotlin Flows Using Flows - Flow operators

    val flow = flowOf(1, 2, 3) flow.onStart { coroutineContext[CoroutineName]?.name } .flowOn(CoroutineName("name2")) .onStart { coroutineContext[CoroutineName]?.name } .flowOn(CoroutineName("name1")) .collect() -> name1, name2
  26. // KLUG - Kotlin Flows Using Flows - Cancellation #1

    val flow = flowOf(1, 2, 3) val job = scope.launch { flow.collect { } } job.cancel() // Disposable.dispose()
  27. // KLUG - Kotlin Flows Using Flows - Cancellation #2

    val flow = flowOf(1, 2, 3) try { flow.collect { if (it == 2) throw Exception("cancel") print(it) } } catch (e: Exception) { print(e.message) } -> 1, cancel
  28. // KLUG - Kotlin Flows Using Flows - Cancellation #3

    val flow = flowOf(1, 2, 3) withTimeoutOrNull(400) { flow.collect { delay(150) print(it) } } -> 1, 2
  29. // KLUG - Kotlin Flows Flow Internals • Properties ◦

    Context preservation ◦ Exception transparency • Flow collectors • Operator fusing • Backpressure • Cold vs Hot
  30. // KLUG - Kotlin Flows Flow Internals - Context preservation

    1. scope.launch { 2. flow { 3. emit(1) 4. 5. launch { 6. emit(2) 7. } 8. } 9. } < OK < prohibited < coroutine context #1 < coroutine context #2 Rule #1: emit must always be called on the coroutine context of its flow
  31. // KLUG - Kotlin Flows Flow Internals - Context preservation

    1. scope.launch { 2. channelFlow { 3. send(1) 4. 5. launch { 6. send(2) 7. } 8. } 9. } Rule #2: send/offer don’t care about coroutine context
  32. // KLUG - Kotlin Flows Flow Internals - Context preservation

    1. flow { 2. val channel = Channel<Int>(Channel.BUFFERED) 3. 4. launch { 5. channel.send(1) 6. channel.send(2) 7. channel.send(3) 8. } 9. 10. for (value in channel) { 11. emit(value) 12. } 13. }
  33. // KLUG - Kotlin Flows Flow Internals - Exception transparency

    1. @Throws(Exception::class) 2. fun doSomethingOrCrash(): Int 3. 4. flow { 5. try { 6. emit(doSomethingOrCrash()) 7. } catch (e: Exception) { 8. // handle or ignore 9. } 10. } < prohibited Rule #3: exceptions thrown by emit (i.e. by downstream) must never be caught or handled inside the flow
  34. // KLUG - Kotlin Flows Flow Internals - Exception transparency

    1. val flow = flow { 2. for (value in 0..1000) { 3. try { 4. emit(value) 5. } catch (e: Exception) { 6. print(e.message) 7. } 8. } 9. } 10. flow.collect { 11. if (it == 2) throw Exception("cancel") 12. print(it) 13. } -> 0, 1, cancel, 3, 4, 5, ...
  35. // KLUG - Kotlin Flows Flow Internals - Exception transparency

    1. withTimeoutOrNull(25) { 2. val flow = flow { 3. for (value in 0..1000) { 4. try { 5. emit(value) 6. delay(50) 7. } catch (e: Exception) { 8. print(e.message) 9. } 10. } 11. } 12. flow.collect { print(it) } 13. } -> 0, Timed out waiting for 25 ms, 1, Timed out waiting for 25 ms, 2, ...
  36. // KLUG - Kotlin Flows Flow Internals - Flow collectors

    1. public interface Flow<out T> { 2. public suspend fun collect(collector: FlowCollector<T>) 3. } 1. public interface FlowCollector<in T> { 2. public suspend fun emit(value: T) 3. }
  37. // KLUG - Kotlin Flows Flow Internals - Flow collectors

    1. val flow = flow { 2. emit(1) 3. emit(2) 4. } 1. val flow = object : Flow<Int> { 2. override suspend fun collect(collector: FlowCollector<Int>) { 3. collector.emit(1) 4. collector.emit(2) 5. } 6. } Almost the same except flow {...} also enforces properties checks
  38. // KLUG - Kotlin Flows Flow Internals - Operator fusing

    ### Operator fusion Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are always fused so that only one properly configured channel is used for execution. 1. flowOf(1, 2, 3) 2. .flowOn(Dispatchers.IO) 3. .buffer(20) 4. .flowOn(Dispatchers.Main) 5. .count { } < channel-based < channel-based < channel-based all reusing single channel
  39. // KLUG - Kotlin Flows Flow Internals - Backpressure 1.

    val flow = flow { 2. for (value in 0..5) { 3. print("out $value") 4. emit(value) 5. } 6. } 7. flow.collect { 8. print("in $it") 9. delay(100) 10. } -> out 0, in 0, out 1, in 1, out 2, in 2, ...
  40. // KLUG - Kotlin Flows Flow Internals - Backpressure 1.

    val flow = flow { 2. for (value in 0..5) { 3. print("out $value") 4. emit(value) 5. } 6. } 7. flow.buffer(1).collect { 8. delay(100) 9. print("in $it") 10. } -> out 0, out 1, out 2, in 0, out 3, in 1, out 4, in 2, out 5, ...
  41. // KLUG - Kotlin Flows Flow Internals - Backpressure 1.

    val flow = flow { 2. for (value in 0..5) { 3. print("out $value") 4. emit(value) 5. } 6. } 7. flow.buffer(Channel.CONFLATED).collect { 8. delay(100) 9. print("in $it") 10. } -> out 0, out 1, out 2, out 3, out 4, out 5, in 0, in 5
  42. // KLUG - Kotlin Flows Flow Internals - Cold vs

    Hot 1. val flow = flow { 2. print("emitting") 3. emit(1) 4. emit(2) 5. } 6. 7. delay(100) 8. 9. print("collecting") 10. flow.collect { 11. print(it) 12. } -> collecting, emitting, 1, 2 Cold
  43. // KLUG - Kotlin Flows Flow Internals - Cold vs

    Hot 1. val channel = flow { 2. print("emitting") 3. emit(1) 4. emit(2) 5. }.produceIn(scope) 6. 7. delay(100) 8. 9. print("collecting") 10. for (value in channel) { 11. print(value) 12. } -> emitting, collecting, 1, 2 HOT Convert to channel
  44. // KLUG - Kotlin Flows Extending Flows - take() example

    1. public fun <T> Flow<T>.take(count: Int) = flow { 2. var consumed = 0 3. try { 4. collect { value -> 5. emit(value) 6. if (++consumed == count) { 7. throw AbortFlowException() 8. } 9. } 10. } catch (e: AbortFlowException) { 11. // Nothing, bail out 12. } 13. }
  45. // KLUG - Kotlin Flows Extending Flows - AndroidX Room

    @Dao interface UsersDao { @Query("SELECT * FROM users WHERE id == :id") suspend fun getById(id: String): UserRecord @Query("SELECT * FROM users WHERE id == :id") fun rxTrackById(id: String): Flowable<UserRecord> @Query("SELECT * FROM users WHERE id == :id") fun flowTrackById(id: String): Flow<UserRecord> }
  46. // KLUG - Kotlin Flows Extending Flows - AndroidX Room

    1. flow { 2. val observerChannel = Channel<Unit>(Channel.CONFLATED) 3. val observer = object : InvalidationTracker.Observer(tableNames) { 4. override fun onInvalidated(tables: MutableSet<String>) { 5. observerChannel.offer(Unit) 6. } 7. } 8. observerChannel.offer(Unit) 9. 10. val flowContext = coroutineContext 11. withContext(coroutineContext) { 12. db.invalidationTracker.addObserver(observer) 13. try { 14. for (signal in observerChannel) { 15. val result = performQuery() 16. withContext(flowContext) { emit(result) } 17. } 18. } finally { 19. db.invalidationTracker.removeObserver(observer) 20. } 21. } 22. } < create channel < notify on change < signal initial query < add listener < query table < ensure context & emit < release listener
  47. // KLUG - Kotlin Flows Extending Flows - Interval flow

    1. val flow = flow { 2. while (true) { 3. emit(Unit) 4. delay(delay) 5. } 6. } 1. flow.collect { 2. print("trigger") 3. } -> trigger, [delay], trigger, [delay], trigger, [delay], ...
  48. // KLUG - Kotlin Flows Extending Flows - Observable to

    Flow 1. fun <T> Observable<T>.toFlow() = channelFlow { 2. val disposable = subscribeWith(object : DisposableObserver<T>() { 3. override fun onNext(t: T) { offer(t) } 4. override fun onComplete() { close() } 5. override fun onError(e: Throwable) { close(e) } 6. }) 7. awaitClose { disposable.dispose() } 8. } 1. Observable.fromArray(1, 2, 3).toFlow() 2. .collect { 3. print(it) 4. } -> 1, 2, 3 < send next value < close channel < crash channel < release Observable
  49. // KLUG - Kotlin Flows Extending Flows - Log Flow

    Lifecycle 1. fun <T> Flow<T>.logLifecycle() = flow { 2. try { 3. print("flow start") 4. collect { 5. print("flow emit") 6. emit(it) 7. } 8. print("flow finish") 9. } catch (e: Exception) { 10. print("flow error") 11. throw e 12. } 13. } 1. flowOf(1, 2, 3) 2. .logLifecycle() 3. .collect { 4. print(it) 5. } -> flow start, flow emit, 1, flow emit, 2, flow emit, 3, flow finish
  50. // KLUG - Kotlin Flows Benchmarks - Flow vs Rx

    - Emit 1. flow { 2. for (value in 0..count) { 3. emit("") 4. } 5. }.collect() 1. Observable.create<String> { 2. for (value in 0..count) { 3. it.onNext("") 4. } 5. it.onComplete() 6. }.blockingSubscribe()
  51. // KLUG - Kotlin Flows Benchmarks - Flow vs Rx

    - Emit ~4x slower ~5x slower ~1.3x slower ~1.7x slower
  52. // KLUG - Kotlin Flows Benchmarks - Flow vs Rx

    - Cancel flowOf("", "").collect { throw Exception() } flowOf("", "").collect { throw exception } flowOf("", "").collect { coroutineContext[Job]?.cancel() } 1. Observable.fromArray( "", "") 2. .subscribe( object : DisposableObserver<String>() { 3. override fun onComplete() {} 4. override fun onNext(t: String) = dispose() 5. override fun onError(e: Throwable) {} 6. }) “Correct” - throw exception (still need to catch exception) Bad - throw cached exception (exception has wrong stack trace) Bad - cancel job (but it also cancels parent job)
  53. // KLUG - Kotlin Flows Benchmarks - Flow vs Rx

    - Cancel ~49x slower ~9x slower ~14x slower
  54. // KLUG - Kotlin Flows Benchmarks - Why is Job.cancel

    faster? internal class JobCancellationException constructor(...) : CancellationException(...), CopyableThrowable<JobCancellationException> { // ... override fun fillInStackTrace(): Throwable { if (DEBUG) { return super.fillInStackTrace() } /* * In non-debug mode we don't want to have a stacktrace * on every cancellation, parent job reference is enough. * Stacktrace of JCE is not needed most of the time and hurts performance. */ return this } // ... }
  55. // KLUG - Kotlin Flows Benchmarks - Flow vs Rx

    Benchmark Mode Cnt Score Error Units Benchmarks.flow_emit_1 thrpt 5 3652247.139 ± 16668.981 ops/s Benchmarks.flow_emit_10 thrpt 5 3281303.599 ± 42834.687 ops/s Benchmarks.flow_emit_1000 thrpt 5 366228.664 ± 8101.192 ops/s Benchmarks.flow_emit_1_000_000 thrpt 5 368.609 ± 2.751 ops/s Benchmarks.rx_emit_1 thrpt 5 19697204.475 ± 415523.489 ops/s Benchmarks.rx_emit_10 thrpt 5 19168316.124 ± 446412.865 ops/s Benchmarks.rx_emit_1000 thrpt 5 460720.252 ± 3492.123 ops/s Benchmarks.rx_emit_1_000_000 thrpt 5 627.488 ± 2.768 ops/s Benchmarks.flow_cancel thrpt 5 656798.927 ± 173212.926 ops/s Benchmarks.flow_cancel_cacheException thrpt 5 3658436.373 ± 696998.489 ops/s Benchmarks.flow_cancel_cancelJob thrpt 5 2363809.629 ± 408097.912 ops/s Benchmarks.rx_cancel thrpt 5 32290853.094 ± 8736309.342 ops/s
  56. // KLUG - Kotlin Flows Conclusion Pros: • Built-in suspend

    functions support • No need to handle cancellation • Easy to write simple extensions Cons: • Much bigger overhead -> much slower (especially for cancellation) • Requires scope to call terminal function • More edge-cases (e.g. context preservation, exception transparency) • Most interesting functions are marked as FlowPreview/ExperimentalCoroutinesApi
  57. More info: 1. Always read sources, a lot of interesting

    stuff there :) 2. https://kotlinlang.org/docs/reference/coroutines/flow.html