Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Dive into Async apps with Kotlin Coroutines

Adit Lal
November 02, 2019

Dive into Async apps with Kotlin Coroutines

This talk highlights the challenges underpinning some of the paradigms of asynchronous programming, in particular, the callback-based approach. The talk will address how Kotlin aims to solve this problem with coroutines by providing an asynchronous interface to the developer.

- Coroutines Generator API - Sequences or iterables in an asynchronous manner
- Coroutine Scopes and how to manage them
- How to use Coroutines to do network calls from the app.
- Actors, easing concurrency for Android Developers
- Sneak peek at channels, broadcast channel, flow
- Using Coroutines and Rx2 together

Adit Lal

November 02, 2019
Tweet

More Decks by Adit Lal

Other Decks in Programming

Transcript

  1. Threads • Threads are expensive • occupies about 1-2 mb

    • Requires context switching • Thread management • Race conditions • Thread Pools
  2. Future/Promise • Similar to callbacks • Composition model with chained

    calls • Complicated Exception handling • Returns Promise
  3. Coroutines • Launch a long-running operation w/o blocking thread •

    Functions are now suspend-able • Platform independent
  4. fun loadData() = GlobalScope.launch(uiContext) { showLoading() // ui thread val

    result = dataProvider.fetchData() // non ui thread, suspend until finished showText(result) // ui thread hideLoading() // ui thread } Suspend
  5. fun loadData() = GlobalScope.launch(uiContext) { showLoading() // ui thread val

    result = dataProvider.fetchData() // non ui thread, suspend until finished showText(result) // ui thread hideLoading() // ui thread }
  6. fun loadData() = GlobalScope.launch(uiContext) { showLoading() // ui thread val

    result = dataProvider.fetchData() // non ui thread, suspend until finished showText(result) // ui thread hideLoading() // ui thread }
  7. fun loadData() = GlobalScope.launch(uiContext) { showLoading() // ui thread val

    result = dataProvider.fetchData() // non ui thread, suspend until finished showText(result) // ui thread hideLoading() // ui thread }
  8. fun loadData() = GlobalScope.launch(uiContext) { showLoading() // ui thread val

    result = dataProvider.fetchData() // non ui thread, suspend until finished showText(result) // ui thread hideLoading() // ui thread }
  9. Context fun main() = runBlocking<Unit> { println("My context is: $coroutineContext")

    } My context is: [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@73c6c3b2, BlockingEventLoop@48533e64]
  10. ‘Dat Sequential suspend fun doSomethingUsefulOne(): Int { delay(1000L) //some fancy

    magic return 13 } suspend fun doSomethingUsefulTwo(): Int { delay(1000L) //some fancy magic again return 29 }
  11. ‘Dat Sequential val time = measureTimeMillis { val one =

    doSomethingUsefulOne() val two = doSomethingUsefulTwo() println("The answer is ${one + two}") } println("Completed in $time ms") The answer is 42 Completed in 2007 ms
  12. ‘Dat async val time = measureTimeMillis { val one =

    async(IO){ doSomethingUsefulOne() } val two = async(IO) { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}") } println("Completed in $time ms") The answer is 42 Completed in 1014 ms
  13. Dispatcher To specify where the coroutines should run, Kotlin provides

    three Dispatchers you can use for thread dispatch.
  14. Dispatchers.Main Main thread on Android, interact with the UI and

    perform light work Great for : - Calling suspend functions - Call UI functions - Updating LiveData
  15. Dispatchers.IO Optimized for disk and network IO off the main

    thread Great for : - Database* - Reading/writing files - Networking$**
  16. Dispatchers.Default Optimized for CPU intensive work off the main thread

    Great for : - Sorting a list - Parsing JSON - DiffUtils
  17. class SearchViewModel(val repo: SearchRepository): ViewModel() { private val _sortedResults =

    MutableLiveData<List<SearchResults>>() val sortedResults: LiveData<List<SearchResults>> = _sortedResults fun onSortAscending() = sortSearchBy(sort = true) fun onSortDescending() = sortSearchBy(sort = false) fun sortSearchBy(sort: Boolean, query: String) { viewModelScope.launch { // suspend and resume make this database request main-safe // so our ViewModel doesn't need to worry about threading _sortedResults.value = repo.fetchSearchResults(query, sort) } } }
  18. class SearchViewModel(val repo: SearchRepository): ViewModel() { private val _sortedResults =

    MutableLiveData<List<SearchResults>>() val sortedResults: LiveData<List<SearchResults>> = _sortedResults fun onSortAscending() = sortSearchBy(ascending = true) fun onSortDescending() = sortSearchBy(ascending = false) fun sortSearchBy(sort: Boolean, query: String) { viewModelScope.launch { // suspend and resume make this database request main-safe // so our ViewModel doesn't need to worry about threading _sortedResults.value = repo.fetchSearchResults(query, sort) } } }
  19. class SearchViewModel(val repo: SearchRepository): ViewModel() { private val _sortedResults =

    MutableLiveData<List<SearchResults>>() val sortedResults: LiveData<List<SearchResults>> = _sortedResults fun onSortAscending() = sortSearchBy(ascending = true) fun onSortDescending() = sortSearchBy(ascending = false) fun sortSearchBy(sort: Boolean, query: String) { viewModelScope.launch { // suspend and resume make this database request main-safe // so our ViewModel doesn't need to worry about threading _sortedResults.value = repo.fetchSearchResults(query, sort) } } }
  20. class SearchViewModel(val repo: SearchRepository): ViewModel() { private val _sortedResults =

    MutableLiveData<List<SearchResults>>() val sortedResults: LiveData<List<SearchResults>> = _sortedResults fun onSortAscending() = sortSearchBy(ascending = true) fun onSortDescending() = sortSearchBy(ascending = false) fun sortSearchBy(sort: Boolean, query: String) { viewModelScope.launch { // suspend and resume make this database request main-safe // so our ViewModel doesn't need to worry about threading _sortedResults.value = repo.fetchSearchResults(query, sort) } } }
  21. class SearchRepository(val api: SearchAPI) { suspend fun fetchSearchResults( query: String,

    sort: Boolean ): List<SearchResults> { api.fetchResults(query, sort) .map(::mapToList) } }
  22. class SearchRepository(val api: SearchAPI) { suspend fun fetchSearchResults( query: String,

    sort: Boolean ): List<SearchResults> { api.fetchResults(query, sort) .map(::mapToList) } }
  23. class SearchRepository(val api: SearchAPI) { suspend fun fetchSearchResults( query: String,

    sort: Boolean ): List<SearchResults> { api.fetchResults(query, sort) .map(::mapToList) } }
  24. //Rx fun fetchResults( query: String, sort: Boolean ): Single<SearchResponse> //Coroutine

    suspend fun fetchResults( query: String, sort: Boolean ): SearchResponse
  25. //Rx fun fetchResults( query: String, sort: Boolean ): Single<SearchResponse> //Coroutine

    suspend fun fetchResults( query: String, sort: Boolean ): SearchResponse
  26. //Rx fun fetchResults( query: String, sort: Boolean ): Single<SearchResponse> //Coroutine

    suspend fun fetchResults( query: String, sort: Boolean ): SearchResponse
  27. //Fast and local fetch fun foo(param: Param) : Result //Slow

    and IO suspend fun foo(param: Param) : Result //BG fun CoroutineScope.foo(param: Param): Result
  28. Asynchronicity Comparison • Observables in RxJava Observable.create<Int> { emitter ->

    for (i in 1..5) { emitter.onNext(i) } emitter.onComplete() }
  29. Asynchronicity Comparison • Channels val channel = Channel<Int>() launch {

    // this might be heavy CPU-consuming computation or async logic //we'll just send five squares for (x in 1..5) channel.send(x) } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!")
  30. Asynchronicity Comparison • Channels val channel = Channel<Int>() launch {

    // this might be heavy CPU-consuming computation or async logic //we'll just send five squares for (x in 1..5) channel.send(x) channel.close() } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!")
  31. Asynchronicity Comparison • Channels val channel = Channel<Int>() launch {

    // this might be heavy CPU-consuming computation or async logic //we'll just send five squares for (x in 1..5) channel.send(x) channel.close() } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!")
  32. Asynchronicity Comparison • Channels val channel = Channel<Int>() launch {

    // this might be heavy CPU-consuming computation or async logic //we'll just send five squares for (x in 1..5) channel.send(x) channel.close() } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!")
  33. Asynchronicity Comparison • Channels ? Observable ? When you consume

    an object from the channel no other coroutine will be able to get the same object
  34. Asynchronicity Comparison • Channels ~ Race Conditions ? val channel

    = Channel<Int>() launch { val value1 = channel.receive() } launch { val value2 = channel.receive() } launch { channel.send(1) }
  35. Asynchronicity Comparison • Rx Subjects val subject: PublishSubject<Int> = PublishSubject.create()

    subject.subscribe { consumeValue(it) } subject.subscribe { println(it) }
  36. Asynchronicity Comparison • BroadcastChannel val channel = BroadcastChannel<Int>(2) val observer1Job

    = launch { channel.openSubscription().use { channel -> for (value in channel) { consumeValue(value) } // subscription will be closed } }
  37. Asynchronicity Comparison • Produce val publisher = produce(capacity = 2)

    { for (i in 1..5) send(i) } • Only the code inside produce can send elements to the channel
  38. Asynchronicity Comparison • Actor An actor also creates a coroutine

    with a channel built in. Produce implements the ReceiveChannel<E> interface whereas actor implements the SendChannel<E> interface.
  39. Asynchronicity Comparison • Actor val actor = actor<Int>(CommonPool) { for

    (int in channel) { // iterate over received Integers } } launch { actor.send(2) }
  40. Flow Flow is a Stream of Data, which can emit

    multiple values and complete with or without an error
  41. Flow val myFlow: Flow<Int> = flow { emit(1) emit(2) delay(100)

    emit(3) } flow.collect { value -> println("Received $value") } Received 1 Received 2 Received 3 $$==>
  42. Flow + Channels • Channels are Hot whereas Flows are

    cold • One Emits values from 1 side, receives from other side • A bit complex to work with
  43. Flow + Channels object Action { private val channel =

    BroadcastChannel<Unit>(1) button.onClick { channel.send(Unit) } fun getFlow(): Flow<Unit> = channel.asFlow() } Action.getFlow().collect { println(“Received Emission”) }
  44. Flow + Channels object Action { private val channel =

    BroadcastChannel<Unit>(1) button.onClick { channel.send(Unit) } fun getFlow(): Flow<Unit> = channel.asFlow() } Action.getFlow().collect { println(“Received Emission”) }
  45. Flow + Channels object Action { private val channel =

    BroadcastChannel<Unit>(1) button.onClick { channel.send(Unit) } fun getFlow(): Flow<Unit> = channel.asFlow() } Action.getFlow().collect { println(“Received Emission”) }
  46. Flow + Channels object Action { private val channel =

    BroadcastChannel<Unit>(1) button.onClick { channel.send(Unit) } fun getFlow(): Flow<Unit> = channel.asFlow() } Action.getFlow().collect { println(“Received Emission”) }
  47. Flow and Rx compositeDisposable.add(api.search(query, sort) .map{response -> response.toList() } .flatMap

    { searchResults -> cache.saveAndReturn(searchResults) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { searchResults -> displayResults(searchResults) }, { error -> Log.e("TAG", "Failed to search", error) } ))
  48. Flow and Rx try { flow { emit(api.search(query, sort))} .map{response

    ->response.toList()} .flatMapConcat { searchResults -> cache.saveAndReturn(searchResults) } .flowOn(Dispatchers.IO) .collect { searchResultList -> withContext(Dispatchers.Main) { showSearchResult(searchResultList) } } } catch(e: Exception) { e.printStackTrace() }
  49. Rx / Suspend //Rx Observable.interval(1, TimeUnit.SECONDS) .subscribe { textView.text =

    "$it seconds have passed" } //Coroutine GlobalScope.launch { var i = 0 while (true){ textView.text = "${it++} seconds have passed" delay(1000) } }
  50. Rx / Suspend publishSubject .debounce(300, TimeUnit.MILLISECONDS) .distinctUntilChanged() .switchMap { searchQuery

    -> api.fetchResults(query, sort) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe({ ... })
  51. Rx / Suspend launch { broadcast.consumeEach { delay(300) val results

    = api.fetchResults(it) .await() displayResultsLogic() ) } }
  52. Rx / Suspend //Rx searchQuery.addTextChangedListener(object: TextWatcher { override fun afterTextChanged(s:

    Editable?) { publishSubject.onNext(s.toString()) } } //Coroutines searchQuery.addTextChangedListener(object: TextWatcher { override fun afterTextChanged(s: Editable?) { broadcast.send(s.toString()) } }