Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Start with the kotlin flow

Avatar for Abhishesh Abhishesh
February 06, 2021

Start with the kotlin flow

Flow is a new stream processing API introduced in kotlin.

In this talk we'll learn about flow API's, internal details & how flow can be used to handle asynchronous streams of data.

Avatar for Abhishesh

Abhishesh

February 06, 2021
Tweet

More Decks by Abhishesh

Other Decks in Programming

Transcript

  1. fun doLongRunningWork( callback: (x: Int) -> Unit) { thread {

    val result = compute() callback(result) } }
  2. suspend fun doLongRunningWork : Int() { val result = compute()

    return result } suspend fun compute() : Int { ... }
  3. fun doLongRunningWork( callback: (x: Int) -> Unit) { thread {

    val result = compute() callback(result) } } suspend fun doLongRunningWork : Int() { val result = compute() return result } suspend fun compute() : Int { ... }
  4. doLongRunningWork { doLongRunningWork { ... ... } } couroutineScope.launch {

    val first = doLongRunningWork() val second = doLongRunningWork() ... ... }
  5. doLongRunningWork { doLongRunningWork { doLongRunningWork { ... ... } }

    } couroutineScope.launch { val first = doLongRunningWork() val second = doLongRunningWork() val third = doLongRunningWork() ... ... }
  6. • Asynchronous data stream • Built on top of coroutine

    • Emits value • Completes normally OR with exception • Usually flows are cold
  7. • Asynchronous data stream • Built on top of coroutine

    • Emits value • Completes normally OR with exception • Usually flows are cold flow<Int> { for (i in 1..10) { emit(i) } }
  8. • Asynchronous data stream • Built on top of coroutine

    • Emits value • Completes normally OR with exception • Usually flows are cold flow<Int> { for (i in 1..10) { println(“emit: $i”) emit(i) } }
  9. • Asynchronous data stream • Built on top of coroutine

    • Emits value • Completes normally OR with exception • Usually flows are cold flow<Int> { for (i in 1..10) { println(“emit: $i”) emit(i) } }.collect { println(“col: $it”) println(it) }
  10. public interface Flow<out T> { public suspend fun collect(collector: FlowCollector<T>)

    } public interface FlowCollector<in T> { public suspend fun emit(value: T) }
  11. flow { emit(value) } flowOf(1, 2, 3) listOf<Int>(1, 2, 3,

    4).asFlow() channelFlow<Int> { ... }
  12. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { println("emit:

    $i") emit(i) } }.map { println("map: $it") it * 2 }.collect { println("collect: $it") } }
  13. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { println("emit:

    $i") emit(i) } }.map { println("map: $it") it * 2 }.collect { println("collect: $it") } } emit: 1 map: 1 collect: 2 emit: 2 map: 2 collect: 4 emit: 3 map: 3 collect: 6
  14. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { println("emit:

    $i") emit(i) } }.filter { println("filter: $it") it % 2 == 0 }.collect { println("collect: $it") } }
  15. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { println("emit:

    $i") emit(i) } }.filter { println("filter: $it") it % 2 == 0 }.collect { println("collect: $it") } } emit: 1 filter: 1 emit: 2 filter: 2 collect: 2 emit: 3 filter: 3
  16. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { delay(100)

    emit(i) } }.collect { delay(200) println("collect: $it") } } Output 1 2 3 Time taken : 937ms
  17. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { delay(100)

    emit(i) } }.buffer(). collect { delay(200) println("collect: $it") } } Output 1 2 3 Time taken : 745ms
  18. CoroutineScope(Dispatchers.Main).launch { flow { for (i in 1..3) { delay(100)

    emit(i) } }.conflate() .collect { delay(300) println("collect: $it") } } Output 1 3
  19. CoroutineScope(Dispatchers.Main).launch { flow<Int> { withContext(Dispatchers.IO) { for (i in 1..10)

    { emit(i) } } }.collect { println(it) } } java.lang.IllegalStateException: Flow invariant is violated:
  20. CoroutineScope(Dispatchers.Main).launch { flow<Int> { for (i in 1..10) { emit(i)

    } }.flowOn(Dispatchers.IO) .collect { println(it) } } Changes the context of upstream flow
  21. uiScope.launch { dataFlow() .catch{ e -> println(“caught $e“} .collect {

    value -> println(value) } } a downstream exception must always be propagated to the collector.
  22. Integrated with room @Dao abstract class Dao { @Query("SELECT *

    FROM table") abstract fun getExamples(): Flow<List<Model>> }
  23. Integrated with live data implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version" fun <T> LiveData<T>.asFlow(): Flow<T>

    {} fun <T> Flow<T>.asLiveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration ): LiveData<T> { }
  24. class FakeFlowProducer : FlowProducer { fun data() = flow {

    emit(ITEM_1) } fun allData() = listOf(ALL_VALUES).asFlow() } @Test fun fakeFlowProducerTest() = runBlocking { val producer = FakeFlowProducer() val firstItem = producer.data.first() assertThat(firstItem, isEqualTo(ITEM_1) }
  25. class FakeFlowProducer : FlowProducer { fun data() = flow {

    emit(ITEM_1) } fun allData() = listOf(ALL_VALUES).asFlow() } @Test fun fakeFlowProducerTest() = runBlocking { val producer = FakeFlowProducer() val firstItem = producer.data.first() assertThat(firstItem, isEqualTo(ITEM_1) } Creates a testcoroutinescope which have aa testcoroutinedispatcher & testcoroutineexceptionhandler. Also accepts and lambda test body.
  26. @Test fun testMultipleValues() = runBlocking { val producer = FakeFlowProducer()

    val multipleValues = producer.multipleValues().toList() assertThat(multipleValues, isEqualTo(ALL_VALUES)) }