Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Operation Flow

Seyed Jafari
September 28, 2021

Operation Flow

Coroutine Flows are the new hot tech in reactive apps.
They are very powerful and easy to use at the same time. the best part is they support KMP so we can share our flows with other platforms.

In this talk, I go through different flow operators and flow types (ex: SharedFlow, StateFlow, and …).
I explain their usage and compare their subtle differences.

I also cover how we can leverage Coroutines Flow and its operators to build reactive architectures.

No talk should be ended without talking about testing.
In this talk, I’ll explain how we can write readable and concise tests with Kotlin Flows that run on all platforms.

Seyed Jafari

September 28, 2021
Tweet

More Decks by Seyed Jafari

Other Decks in Technology

Transcript

  1. What is Flow Docs: An asynchronous data stream that sequentially

    emits values and completes normally or with an exception.
  2. 1. Builders/Creators • flowOf<String>(“first”) • emptyFlow<String>() • flow<String> {} •

    callbackFlow<String> {} • channelFlow<String> {} • listOf<String>().asFlow() • listOf<Flow<String>>().merge()
  3. channelFlow<String> {} / callbackFlow<String> {} callbackFlow<String> { val listener :

    object : Listener{ override fun onSuccess(value : String){ sendBlocking(value) channel.close() } } locationManager.register(listener) awaitClose { locationManager.unRegister(listener) } }
  4. 2. Intermediate flowOf("1") .filter { it.isNotBlank() } .withIndex() .map {

    it.value } .drop(1) .take(1) .buffer(10) .debounce(10000) .distinctUntilChanged() .onStart { } .onEach { } .onCompletion { } .onEmpty { } .catch { } .retry(1) .flowOn() .transform<String, String> { } .zip(flowOf("")) { s, s2 -> s } .combine(flowOf("")) { a: String , b: String -> "" } .flatMapMerge<String, String> { value -> flowOf(value) } .flatMapConcat { value -> flowOf(value) } .flatMapLatest { flowOf(it) } .scan("") { accumulator: String , value: String -> accumulator + value } .runningReduce { accumulator , value -> "" }
  5. • flowOf("1", "", "2").filter { it.isNotBlank() } // "1", "2"

    • flowOf(1, 2).map { "$it" } // "1", "2" • flowOf("first", "second").withIndex() // "index= 0, value= first", "index=1, value= second" • flowOf(1, 2, 3).drop(1) // 2, 3 • flowOf(1, 2, 3).take(1) // 1 • flowOf(1, 1, 1, 2, 1, 3).distinctUntilChanged() // 1, 2, 1, 3
  6. flowOf(1, 2, 3) .onEach { println("onEach= $it") } .buffer(10, BufferOverflow.SUSPEND)

    .collect { delay(1000) println("collected= $it") } // onEach= 1, onEach= 2, onEach= 3, collected= 1, collected= 2, collected= 3
  7. flowOf(1, 2, 3).onEach { println(it) delay(1000) } // 1, 2,

    3 flowOf<Int>(1, 2,).filter{ it > 5 }.onEmpty { emitAll(flowOf(1, 2, 3)) } //1, 2, 3
  8. flowOf(1, 2, 3).onCompletion { throwable -> if (throwable == null)

    { // normal cancellation emit(4) }else{ // exception occurred // we can not emit anymore } } // 1, 2, 3, 4
  9. flow<Int> { throw IllegalStateException("cancel the flow") } .retryWhen { cause,

    attempt -> attempt < 3 } .catch { throwable -> emit(1) } // 1,
  10. flowOf("first", "", "second") .transform { if(it.isEmpty()) return@transform val newItems =

    api.search(it) emit(newItems) } fun Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R> = flow { collect { value -> return@collect transform(value)}}
  11. api1.get().zip(api2.get()) { f: Response1, s: Response2 -> f + s

    } api.fetchBookList() .combine(queryNameFlow){books: List<Book>, s: String -> books.filter { it.name.contains(s) } }
  12. userGenerateQuery.flatMapLatest { query -> localDataBase.observeForQuery(query) } flowOf(1, 2, 3).flatMapLatest {

    value -> flow { delay(50) emit("a$value") delay(Random.nextLong(1000)) emit("b$value") } } // a3, b3
  13. flowOf(1, 2, 3) .runningReduce { accumulator, value -> accumulator *

    value } // 1, 2, 6 resultFlow .scan(InitialState) { lastState, newResult -> lastState.createNewStateForResult(newResult) }
  14. 3. Terminals • flowOf(1, 2, 3).collect { } • flowOf(1,

    2, 3).collectIndexed { index, value -> } • flowOf(1, 2, 3).launchIn(scope) • flowOf(1, 2, 3).toList() • flowOf(1, 3, 3).toSet() • flowOf(1, 2, 3).single() • flowOf(1, 2, 3).first() • flowOf(1, 2, 3).reduce { acc, value -> acc + value } • flowOf(1, 2, 3).fold("") { acc, value -> acc + value }
  15. Collectors • flowOf(1, 2, 3).collect { } // 1, 2,

    3 • flowOf(1, 2, 3).collectIndexed { index, value-> } // 0:1, 1:2, 2:3 • flowOf(1, 2, 3).launchIn(scope)
  16. Reducers • flowOf(1, 2, 3).reduce { acc, value -> acc

    + value } //6 • flowOf(1, 2, 3).fold("") { acc, value -> acc + value } //"123"
  17. StateFlow Builders • MutableStateFlow<Int>(0) • flowOfStates.stateIn(scope) StateFlow Emitters • scope.launch{

    mutableSharedFlow.emit() } • mutableSharedFlow.tryEmit() • mutableSharedFlow.value = newValue
  18. Testing @Test fun test() = runBlocking { val events =

    flowOf(1, 2, 3).toList() assert(events) }
  19. Turbine @Test fun hotFlowTest() = runBlocking { val sharedFlow =

    MutableSharedFlow<Int>() sharedFlow.test { sharedFlow.emit(1) assert(expectItem()) cancelAndIgnoreRemainingEvents() }}
  20. 🎇 Operation successful 🎇 Happy Flowing, Kotlin Slack @worldsnas Thanks

    We are actively hiring in wide range of positions. make sure to checkout our openings on: https://www.revolut.com/careers