Ivan
August 01, 2020

A Step-by-Step Guide to Kotlin Flow

In Android development, you may already be familiar with RxJava. Recently, JetBrains released a reactive programming library, Kotlin Flow. It is an asynchronous stream library built on top of Kotlin Coroutines. In this talk, we show the pros and cons of Kotlin Flow and explain how it works by analyzing its source code. Since Flow shares similarities with Rx streams, we also compare Flow with Rx. Additionally, we show how to test Flow and give examples of migrating from Rx to Flow.

Transcript

  1. Abstract

    - What is Reactive Programming?
    - Kotlin Flow
    - Basic usage of Flow
    - Flow source code
    - Flow compared with RxJava
    - How to migrate from RxJava to Flow
    - Kotlin Flow Testing
  2. Who are we? (Never seen a handsome guy before? Now you can take a closer look.)

    - 彥彬: Android Developer @ PicCollage. RxJava experience: 4 years. Community: Android Taipei, Android 讀書會 (Android Study Group).
    - Ivan Wu: Senior Engineer @ ETtoday. Community: Taiwan Kotlin User Group.
  3. Basic usage of Flow

    val numberFlow = flowOf(1, 2, 3, 4)
    numberFlow
        .map { element -> element + 1 }
        .collect { number -> println(number) }

    Producer: flowOf(...). Consumer: collect { ... }.
  4. Transformer

    val numberFlow = flowOf(1, 2, 3, 4)
    val anotherFlow: Flow<Int> = numberFlow.map { element -> element + 1 }
    anotherFlow.collect { number -> println(number) }
  5. Higher-order function

    fun foo() {
        higherOrderFunction { text -> println("Hello world! $text") }
    }

    fun higherOrderFunction(lambda: (String) -> Unit) {
        lambda.invoke("from lambda")
    }
  6. Producer in Flow

    fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }
  7. flowOf, flow, and SafeFlow

    fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }

    fun <T> flow(block: FlowCollector<T>.() -> Unit) = SafeFlow(block)

    private class SafeFlow<T>(
        private val block: FlowCollector<T>.() -> Unit
    ) : AbstractFlow<T>() {
        override fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
  12. Produce elements (Making these slides drove me to a breakdown, so let me crack a silly joke.)

    fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }

    Lambda => Lazy evaluation
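The "Lambda => Lazy evaluation" note can be checked without any Flow machinery. The sketch below uses only the Kotlin standard library; `lazyDemo` and `producer` are illustrative names made up here, not part of kotlinx.coroutines. Wrapping work in a lambda only stores it, and nothing runs until the lambda is invoked, which mirrors why the block passed to `flow { }` does not run until the flow is collected.

```kotlin
// Illustrative only: a lambda body runs when it is invoked, not when it is defined.
fun lazyDemo(): List<String> {
    val log = mutableListOf<String>()

    // Defining the lambda stores the block; "emit" is not logged yet.
    val producer: () -> Unit = { log += "emit" }
    log += "defined"

    // Invoking the lambda (the analogue of collect) finally runs the block.
    producer()
    return log
}

fun main() {
    println(lazyDemo()) // [defined, emit]
}
```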
  13. flowOf, flow, and SafeFlow (I bet your head is tilted right now.)

    fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }

    fun <T> flow(block: FlowCollector<T>.() -> Unit) = SafeFlow(block)

    private class SafeFlow<T>(
        private val block: FlowCollector<T>.() -> Unit
    ) : AbstractFlow<T>() {
        override fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
  14. flowOf, flow, and SafeFlow (Everyone will leave this talk with a stiff neck.)

    fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }

    fun <T> flow(block: FlowCollector<T>.() -> Unit) = SafeFlow(block)

    private class SafeFlow<T>(
        private val block: FlowCollector<T>.() -> Unit
    ) : AbstractFlow<T>() {
        override fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }
  16. FlowCollector

    interface FlowCollector<T> {
        // Collect the value emitted by the upstream.
        // This method is not thread-safe and should not be invoked concurrently.
        fun emit(value: T)
    }
  17. SafeFlow and AbstractFlow

    private class SafeFlow<T>(
        private val block: FlowCollector<T>.() -> Unit
    ) : AbstractFlow<T>() {
        override fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }

    abstract class AbstractFlow<T> : Flow<T> {
        override fun collect(collector: FlowCollector<T>) {
            // Wrap the user's collector, then run the block against the wrapper.
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                collectSafely(safeCollector)
            } finally {
                safeCollector.releaseIntercepted()
            }
        }
    }
  18. SafeFlow and AbstractFlow (Were you looking forward to another silly joke?)

    private class SafeFlow<T>(
        private val block: FlowCollector<T>.() -> Unit
    ) : AbstractFlow<T>() {
        override fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }

    abstract class AbstractFlow<T> : Flow<T> {
        override fun collect(collector: FlowCollector<T>) {
            // Wrap the user's collector, then run the block against the wrapper.
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                collectSafely(safeCollector)
            } finally {
                safeCollector.releaseIntercepted()
            }
        }
    }
  19. SafeCollector (I told you not to look.)

    - Just a delegate class that invokes "collector.emit()" for you
    - Source code: https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
  20. Flow and collect

    interface Flow<out T> {
        fun collect(collector: FlowCollector<T>)
    }

    fun <T> Flow<T>.collect(action: (value: T) -> Unit) =
        collect(object : FlowCollector<T> {
            override fun emit(value: T) = action(value)
        })
  21. Flow and collect (All right, all right! We are almost done.)

    interface Flow<out T> {
        fun collect(collector: FlowCollector<T>)
    }

    fun <T> Flow<T>.collect(action: (value: T) -> Unit) =
        collect(object : FlowCollector<T> {   // anonymous class
            override fun emit(value: T) = action(value)
        })
  22. Let's look back

    val numberFlow = flowOf(1, 2, 3, 4)
    numberFlow
        .map { element -> element + 1 }
        .collect { number -> println(number) }
  23. Bonus! Transformer

    fun <T, R> Flow<T>.map(transform: (value: T) -> R): Flow<R> =
        transform { value -> return@transform emit(transform(value)) }

    fun <T, R> Flow<T>.transform(
        transform: FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = flow {
        collect { value -> return@collect transform(value) }
    }
  24. Bonus!

    fun <T, R> Flow<T>.map(transform: (value: T) -> R): Flow<R> =
        transform { value -> return@transform emit(transform(value)) }

    fun <T, R> Flow<T>.transform(
        transform: FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = flow {
        collect { value ->
            // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
            return@collect transform(value)
        }
    }
  25. Bonus!

    fun <T, R> Flow<T>.map(transform: (value: T) -> R): Flow<R> =
        transform { value -> return@transform emit(transform(value)) }

    fun <T, R> Flow<T>.transform(
        transform: FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = flow {
        collect { value -> transform(value) }
    }
  26. Bonus!

    fun <T, R> Flow<T>.map(transform: (value: T) -> R): Flow<R> = flow {
        collect { value -> emit(transform(value)) }
    }
  27. Really dizzying

    fun <T, R> Flow<T>.map(transform: (value: T) -> R): Flow<R> =
        flow {                                            // NewFlow; its receiver is the NewCollector
            collect { value -> emit(transform(value)) }   // collect runs on the OldFlow
        }
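The source walkthrough above can be condensed into a toy model that runs with the standard library alone. This is a sketch under the same simplification the slides use (no `suspend`, no `SafeCollector`); every `Mini` name is made up for illustration and is not the kotlinx.coroutines API.

```kotlin
// Toy, non-suspending model of Flow. All "Mini" names are hypothetical.
interface MiniCollector<in T> {
    fun emit(value: T)
}

interface MiniFlow<out T> {
    fun collect(collector: MiniCollector<T>)
}

// Builder: stores the block; nothing runs until collect() is called.
fun <T> miniFlow(block: MiniCollector<T>.() -> Unit): MiniFlow<T> =
    object : MiniFlow<T> {
        override fun collect(collector: MiniCollector<T>) = collector.block()
    }

fun <T> miniFlowOf(vararg elements: T): MiniFlow<T> = miniFlow {
    for (element in elements) emit(element)
}

// Lambda-friendly collect: wraps the action in an anonymous collector.
fun <T> MiniFlow<T>.collect(action: (T) -> Unit): Unit =
    collect(object : MiniCollector<T> {
        override fun emit(value: T) = action(value)
    })

// map: a new flow that, once collected, collects the old flow and
// emits each transformed value into the new collector.
fun <T, R> MiniFlow<T>.map(transform: (T) -> R): MiniFlow<R> = miniFlow {
    this@map.collect { value -> emit(transform(value)) }
}

fun collectMapped(): List<Int> {
    val out = mutableListOf<Int>()
    miniFlowOf(1, 2, 3, 4).map { it + 1 }.collect { out += it }
    return out
}

fun main() {
    println(collectMapped()) // [2, 3, 4, 5]
}
```

Calling `map` creates only a wrapper object. The chain of `collect` calls, and therefore every `emit`, happens when the terminal `collect` runs.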
  28. Comparison

    • Flow
      ◦ Simple
      ◦ Kotlin language features support
      ◦ Smaller lib size
      ◦ Fewer operators
    • RxJava
      ◦ Steep learning curve
      ◦ Additional dependencies
      ◦ Inflexible structure
      ◦ More operators
  29. Stream Types: RxJava vs. Flow

    • RxJava
      ◦ Observable
      ◦ Flowable
        ▪ Same as Observable, but with backpressure support.
      ◦ Single
      ◦ Maybe
        ▪ It might complete without providing any value.
      ◦ Completable
        ▪ Without emitting values.
    • Flow
      ◦ Just... Flow.
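One way to read "Just... Flow" is that the terminal operator, not the stream type, expresses how many values are expected. The sketch below assumes kotlinx-coroutines-core is on the classpath; the three function names are hypothetical, while `single()`, `firstOrNull()`, `emptyFlow()`, `onCompletion`, and the no-argument `collect()` are existing Flow operators.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Like Rx Single: a flow that emits exactly one value, read with single().
fun singleLike(): Int = runBlocking { flowOf(6).single() }

// Like Rx Maybe: a flow that may complete empty, read with firstOrNull().
fun maybeLike(): Int? = runBlocking { emptyFlow<Int>().firstOrNull() }

// Like Rx Completable: a flow that emits nothing; collect() only waits for completion.
fun completableLike(): Boolean = runBlocking {
    var completed = false
    flow<Nothing> { /* side effects only, no emit */ }
        .onCompletion { completed = true }
        .collect()
    completed
}
```

For a single asynchronous value, a plain `suspend` function is often simpler than a one-element flow; the flow versions are shown only to parallel the Rx types listed above.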
  30. Example 1

    // RxJava
    Observable
        .just("Pikachu", "John", "Blabla")
        .subscribe { name -> println(name) }

    // Flow
    flowOf("Pikachu", "John", "Blabla")
        .collect { name -> println(name) }
  31. Example 2

    // RxJava
    Single.just(6)
        .subscribe { number -> println(number) }

    // Flow
    flow { emit(6) }
        .collect { number -> println(number) }
  32. Example 3

    // RxJava
    Observable
        .just("Pikachu", "John", "Blabla")
        .map { names -> names[5] }
        .subscribe(
            { ch -> println(ch) },
            { e -> println(e) },
            { println("Completed") }
        )

    // Flow
    flowOf("Pikachu", "John", "Blabla")
        .map { names -> names[5] }
        .catch { e -> println(e) }
        .onCompletion { println("Completed") }
        .collect { ch -> println(ch) }
  33. Threading

    • RxJava Schedulers
      ◦ io
      ◦ computation
      ◦ mainThread
    • Flow Dispatchers
      ◦ IO
      ◦ Default (for CPU-consuming code)
      ◦ Main
  34. Threading Operators (cont.)

    • In RxJava, we declare where the chain starts (subscribeOn) and modify the chain below (observeOn).

    observeSomething()
        .subscribeOn(io())
        .observeOn(mainThread())
        .subscribeOn(computation())
        .subscribe { result -> println(result) }
  35. Threading Operators (cont.)

    • In Kotlin Flow, the collecting context is declared at the end, and flowOn modifies the chain above it.

    CoroutineScope(Job() + Dispatchers.Main).launch {
        observeSomething()
            .flowOn(Dispatchers.IO)
            .map { result -> result.length }
            .flowOn(Dispatchers.Default)
            .collect { result -> println(result) }
    }
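The "modifies the chain above" behaviour of `flowOn` can be observed by recording thread names. This is a minimal sketch assuming kotlinx-coroutines-core on a plain JVM, so it uses `runBlocking` in place of `Dispatchers.Main`, which needs a UI platform. `threadsUsed` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the flow { } block, thread that ran collect).
fun threadsUsed(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var collectorThread = ""
    flow {
        upstreamThread = Thread.currentThread().name
        emit("Pikachu")
    }
        .flowOn(Dispatchers.IO)   // affects only the operators above this line
        .map { it.length }        // runs in the collector's context
        .collect {
            collectorThread = Thread.currentThread().name
        }
    upstreamThread to collectorThread
}

fun main() {
    val (upstream, collector) = threadsUsed()
    println("upstream ran on $upstream, collect ran on $collector")
}
```

The collector stays on the thread that called `runBlocking`, while the `flow { }` block runs on a dispatcher thread, so the two names differ.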
  36. Migration from RxJava to Flow

    • No need to do the whole migration all at once.
      ◦ The commits will be huge.
      ◦ Making incremental changes might be a good idea!
    • Incremental migration
      ◦ Use kotlinx-coroutines-rx2
    • kotlinx-coroutines-rx2
      ◦ Observables transformation
      ◦ Suspending extensions
  37. Observable to Flow

    runBlocking {
        Observable.just(1, 2, 3, 4, 5)
            .asFlow()
            .flowOn(Dispatchers.IO)
            .collect { }
    }
  38. Suspend Functions

    • A function that can be paused and resumed at a later time.
    • Executes a long-running operation and waits for it to complete without blocking.
  39. Coroutine Builders

    • runBlocking
      ◦ Bridge between regular and suspend functions
    • launch
      ◦ Fire and forget
    • async
      ◦ Expects a result
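The three builders and a suspend function can be seen together in a few lines. This is a sketch assuming kotlinx-coroutines-core is available; `loadGreeting` and `buildersDemo` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical suspend function: delay() suspends without blocking the thread.
suspend fun loadGreeting(): String {
    delay(100)
    return "Hello"
}

fun buildersDemo(): List<String> = runBlocking {   // bridge from regular code
    val log = mutableListOf<String>()

    // launch: fire and forget, returns a Job that carries no result.
    val job = launch {
        delay(50)
        log += "launch done"
    }

    // async: returns a Deferred whose result is read with await().
    val greeting = async { loadGreeting() }

    log += "async result: ${greeting.await()}"
    job.join()
    log
}
```

Both child coroutines run on the single `runBlocking` thread, so the shared list needs no synchronization here.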
  40. Using Mocking Libs to test Flow

    • Flow can be easily tested by using mocking libraries.
    • Use Mocking Libs
      ◦ Mockito-kotlin
      ◦ MockK
  41. Preparation

    data class User(val name: String, val id: Long)

    interface Service {
        fun getUser(): User
    }

    class ApiService : Service {
        override fun getUser() = User("John", 1L)
    }
  42. Preparation

    class UserRepository(private val service: Service) {
        fun getUser(): Flow<User> = flow { emit(service.getUser()) }
    }

    infix fun Any?.shouldBe(expect: Any?) {
        Assert.assertEquals(expect, this)
    }
  43. Using Mocking Libs to test Flow

    @Test
    fun `Test getting an user info by MockK`() = runBlocking {
        val fakeUser = User("Doe", 2L)
        val mockApiService = mockk<ApiService>(relaxed = true)
        every { mockApiService.getUser() } returns fakeUser
        val userRepository = UserRepository(mockApiService)
        userRepository.getUser().collect { it shouldBe fakeUser }
    }
  44. Flow Assert

    • It comes from the SqlDelight library.
      ◦ Another library, Kotlin Flow Test Observer, also provides flow assertions.
    • An extension function of Flow
      ◦ Flow<T>.test(timeout: Long, validate: suspend FlowAssert<T>.() -> Unit)
    • Makes expressions more specific
      ◦ expectItem
      ◦ expectError
      ◦ expectComplete
    • Takes advantage of the Channels API and the Coroutines API
  45. Concept

    • Under the hood, it uses Channels to test a flow.
    • Diagram: the Flow sends its items into a Channel with an unlimited buffer; FlowAssert queries the Channel through expectItem(), expectError(), and expectComplete().

    Reference: https://proandroiddev.com/kotlin-flow-assert-ff45465c01c0
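The channel-based idea can be sketched in a few dozen lines. This is not the SqlDelight implementation; `MiniFlowAssert`, `miniTest`, and `runMiniTest` are hypothetical names, the one-second timeout is an arbitrary choice, and the code assumes kotlinx-coroutines-core 1.5 or newer (for `receiveCatching`).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for FlowAssert: queries events buffered in a channel.
class MiniFlowAssert<T>(private val events: Channel<T>) {
    // Next buffered item; fails with a timeout if none arrives in time.
    suspend fun expectItem(): T = withTimeout(1_000) { events.receive() }

    // Succeeds only if the flow finished normally with no items left.
    suspend fun expectComplete() {
        val result = withTimeout(1_000) { events.receiveCatching() }
        check(result.isClosed && result.exceptionOrNull() == null) {
            "Expected completion but got $result"
        }
    }
}

suspend fun <T> Flow<T>.miniTest(validate: suspend MiniFlowAssert<T>.() -> Unit) =
    coroutineScope {
        val events = Channel<T>(Channel.UNLIMITED)
        // Collect in the background and forward every item into the channel.
        val collectJob = launch {
            try {
                this@miniTest.collect { events.send(it) }
                events.close()          // normal completion
            } catch (e: CancellationException) {
                throw e
            } catch (e: Throwable) {
                events.close(e)         // flow failed: close with the cause
            }
        }
        try {
            MiniFlowAssert(events).validate()
        } finally {
            collectJob.cancel()
        }
    }

fun runMiniTest(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flowOf(1, 2).miniTest {
        seen += expectItem()
        seen += expectItem()
        expectComplete()
    }
    seen
}
```

The unlimited buffer lets the flow run to completion without waiting for the assertions, so the test body can query items at its own pace.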
  46. Flow Assert

    @Test
    fun `Test getting an user info by Flow Assert`() = runBlocking {
        val fakeUser = User("Pikachu", 3L)
        val fakeApiService = provideFakeService(fakeUser)
        val userRepository = UserRepository(fakeApiService)
        userRepository.getUser().test {
            expectItem() shouldBe fakeUser
            expectComplete()
        }
    }