Coroutines in Kotlin v3: In-depth review

Slides from my talk at UA Mobile 2017, 2017-11-25
Video: https://www.youtube.com/watch?v=tql9fmWX_oc

Modern programming cannot be imagined without long-running operations such as CPU-intensive computations or blocking IO running in the background. Unfortunately, having too many active threads is very expensive and kills performance, but there's a solution - coroutines. They provide a way to avoid blocking a thread and replace it with a cheaper and more controllable operation: suspension of a coroutine.

I'll talk about generators/yield, async/await, channels, and composable/delimited continuations and how they are used, review the available APIs, and give an overview of the standard kotlinx.coroutines library.

Dmytro Zaitsev

November 25, 2017

Transcript

  1. v1: Sequential (Direct style)

     fun postItem(item: Item) {
         val token = prepareToken()           // 1
         val post = submitPost(token, item)   // 2
         processPost(post)                    // 3
     }
  2. v2: Callbacks (Continuation-Passing style)

     fun postItem(item: Item) {
         prepareTokenAsync { token ->                  // 1
             submitPostAsync(token, item) { post ->    // 2
                 processPost(post)                     // 3
             }
         }
     }

     Slide annotation: Continuation
  3. v2: Callbacks (Continuation-Passing style)

     fun postItem(item: Item) {
         prepareTokenAsync { token ->                  // 1
             submitPostAsync(token, item) { post ->    // 2
                 processPost(post)                     // 3
             }
         }
     }

     Slide annotation: Callback hell
  4. v3: Rx/Futures/Promises

     fun postItem(item: Item) {
         observeToken()
             .concatMap { token -> observePost(token, item) }
             .subscribe { post -> processPost(post) }
     }
  5. v4: Coroutines Direct Style

     suspend fun postItem(item: Item) {
         val token = prepareToken()           // 1
         val post = submitPost(token, item)   // 2
         processPost(post)                    // 3
     }
  8. Profit: try/catch, loops, std-lib

     suspend fun postItems(items: List<Item>) {
         try {
             val token = prepareToken()
             items.forEach { item ->
                 val post = submitPost(token, item)
                 processPost(post)
             }
         } catch (e: BadTokenException) {
             /*…*/
         }
     }
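To make the "try/catch, loops, std-lib" point concrete, here is a minimal runnable sketch of the same direct-style function. It assumes Kotlin 1.3+ and the stable `kotlin.coroutines` package rather than the experimental one the talk was based on. The types `Item`, `Token`, `Post`, the stub bodies, and the `runNow` driver are hypothetical stand-ins, not part of the talk.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-ins for the types and calls named on the slides.
data class Item(val title: String)
data class Token(val value: String)
data class Post(val token: Token, val item: Item)
class BadTokenException(message: String) : Exception(message)

suspend fun prepareToken(): Token = Token("secret")

suspend fun submitPost(token: Token, item: Item): Post {
    // Assumption for the demo: a blank title makes the "server" reject the token.
    if (item.title.isBlank()) throw BadTokenException("token rejected for a blank item")
    return Post(token, item)
}

val processed = mutableListOf<String>()
fun processPost(post: Post) { processed.add(post.item.title) }

// Ordinary try/catch and forEach work unchanged around suspending calls.
suspend fun postItems(items: List<Item>): String =
    try {
        val token = prepareToken()
        items.forEach { item -> processPost(submitPost(token, item)) }
        "ok"
    } catch (e: BadTokenException) {
        "failed: ${e.message}"
    }

// Minimal driver. The stubs never really suspend, so the coroutine completes
// synchronously inside startCoroutine; single() would throw if it had suspended.
fun <T> runNow(block: suspend () -> T): T {
    val outcomes = ArrayList<Result<T>>(1)
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcomes.add(it) })
    return outcomes.single().getOrThrow()
}
```

With these stubs, `runNow { postItems(listOf(Item("a"), Item("b"))) }` returns `"ok"`, and a list containing a blank item takes the `catch` branch instead.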
  9. Experimental status

     • New style of programming
     • The design is not final and is expected to change
     • JetBrains is still collecting information and feedback
     • Backwards compatibility guaranteed
  10. A coroutine is…

      • an instance of suspendable computation
      • similar to a daemon thread, but very light-weight
      • similar to a future or promise
  11. Why coroutines?

      • threads are expensive to keep and switch
      • your code is single threaded
      • you’ve got lots of mutable states
  12. Standard API

      • Language support (`suspend` keyword)
      • low-level basic API (stdlib: kotlin.coroutines)
      • high-level APIs that can be used in user code
  13. Low-level API (kotlin.coroutines)

      • kotlin.coroutines.experimental
        • create/start/suspendCoroutine()
        • Continuation interface
        • @RestrictsSuspension annotation
      • kotlin.coroutines.experimental.intrinsics
        • suspendCoroutineOrReturn()
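The low-level building blocks listed above can be exercised without any library. The sketch below is an illustration only. It assumes Kotlin 1.3+, where these functions live in the stable `kotlin.coroutines` package instead of `kotlin.coroutines.experimental`. The names `awaitToken`, `parked`, `log`, and `demo` are hypothetical.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

val log = mutableListOf<String>()
var parked: Continuation<String>? = null   // holds the suspended coroutine

// suspendCoroutine hands us the continuation; not resuming it here means "suspend".
suspend fun awaitToken(): String = suspendCoroutine { cont ->
    log.add("suspended")
    parked = cont
}

fun demo(): List<String> {
    log.clear()
    val body: suspend () -> Int = {
        log.add("started")
        val token = awaitToken()
        log.add("resumed with $token")
        token.length
    }
    // createCoroutine only builds the coroutine; nothing runs until resume(Unit).
    val coroutine: Continuation<Unit> = body.createCoroutine(
        Continuation<Int>(EmptyCoroutineContext) { result ->
            log.add("completed with ${result.getOrThrow()}")
        }
    )
    log.add("created")
    coroutine.resume(Unit)        // runs the body up to the suspension point
    log.add("back in caller")
    parked!!.resume("abc")        // continues the body after awaitToken()
    return log.toList()
}
```

Calling `demo()` yields the events in this order: created, started, suspended, back in caller, resumed with abc, completed with 3. The thread is never blocked while the coroutine is parked; control simply returns to the caller.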
  14. Continuation

      // Kotlin
      suspend fun submitPost(token: Token, item: Item): Post {…}

      // Java/JVM
      Object submitPost(Token token, Item item, Continuation<Post> cont) {…}

      Slide annotation: compiler magic
  15. Continuations

      suspend fun postItem(item: Item) {
          val token = prepareToken()
          val post = submitPost(token, item)
          processPost(post)
      }

      Slide annotation: Initial continuation
  16. Continuations

      suspend fun postItem(item: Item) {
          val token = prepareToken()
          val post = submitPost(token, item)
          processPost(post)
      }

      Slide annotation: Continuation
  18. Labels

      suspend fun postItem(item: Item) {
          // LABEL 0
          val token = prepareToken()
          // LABEL 1
          val post = submitPost(token, item)
          // LABEL 2
          processPost(post)
      }
  19. Labels

      suspend fun postItem(item: Item) {
          switch(label) {
              case 0:
                  val token = prepareToken()
              case 1:
                  val post = submitPost(token, item)
              case 2:
                  processPost(post)
          }
      }
  20. State

      suspend fun postItem(item: Item) {
          val stateMachine = object: CoroutineImpl {…}
          switch(stateMachine.label) {
              case 0:
                  val token = prepareToken()
              case 1:
                  val post = submitPost(token, item)
              case 2:
                  processPost(post)
          }
      }
  21. CPS Transform

      fun postItem(item: Item, cont: Continuation) {
          val stateMachine = object: CoroutineImpl {…}
          switch(stateMachine.label) {
              case 0:
                  val token = prepareToken(stateMachine)
              case 1:
                  val post = submitPost(token, item, stateMachine)
              case 2:
                  processPost(post)
          }
      }
  22. Save state

      fun postItem(item: Item, cont: Continuation) {
          val stateMachine = object: CoroutineImpl {…}
          switch(stateMachine.label) {
              case 0:
                  stateMachine.item = item
                  stateMachine.label = 1
                  prepareToken(stateMachine)
              case 1:
                  …
          }
      }
  23. Callback

      fun postItem(item: Item, cont: Continuation) {
          val stateMachine = cont as? ThisSM ?: object: ThisSM {
              fun resume(…) {
                  postItem(null, this)
              }
          }
          switch(stateMachine.label) {
              case 0:
                  ….
          }
      }
  24. Restore state and continue

      fun postItem(item: Item, cont: Continuation) {
          …
              case 0:
                  stateMachine.item = item
                  stateMachine.label = 1
                  prepareToken(stateMachine)
              case 1:
                  val item = stateMachine.item
                  val token = stateMachine.result
                  stateMachine.label = 2
                  submitPost(token, item, stateMachine)
              case 2:
                  …
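The pseudocode on the slides above can be approximated in plain Kotlin without the `suspend` keyword. The following is a hand-written sketch of the idea, not the code the compiler actually generates. The `Cont` interface, the `pending` queue that plays the role of an event loop, and the stub functions are all hypothetical. Unlike the real transform, every call here always "suspends", and the `as?` check would misfire on recursive calls.

```kotlin
// Hypothetical stand-ins for the slide types.
data class Item(val title: String)
data class Token(val value: String)
data class Post(val token: Token, val item: Item)

// Simplified continuation: a callback receiving the result of the previous step.
interface Cont { fun resume(result: Any?) }

val pending = mutableListOf<() -> Unit>()   // fake event loop: callbacks run later
val processed = mutableListOf<Post>()

fun prepareToken(cont: Cont) { pending.add { cont.resume(Token("secret")) } }
fun submitPost(token: Token, item: Item, cont: Cont) { pending.add { cont.resume(Post(token, item)) } }
fun processPost(post: Post) { processed.add(post) }

// Holds the label (where to continue) and the locals that must survive suspension.
class PostItemStateMachine(val completion: Cont) : Cont {
    var label = 0
    var item: Item? = null
    var result: Any? = null
    override fun resume(result: Any?) {
        this.result = result
        postItem(null, this)   // re-enter the function; the label selects the branch
    }
}

fun postItem(item: Item?, cont: Cont) {
    // First call: wrap the caller's continuation. Later calls: reuse the state machine.
    val sm = cont as? PostItemStateMachine ?: PostItemStateMachine(cont)
    when (sm.label) {
        0 -> {                         // save state, then start the first async step
            sm.item = item
            sm.label = 1
            prepareToken(sm)
        }
        1 -> {                         // restore state and continue
            val token = sm.result as Token
            sm.label = 2
            submitPost(token, sm.item!!, sm)
        }
        2 -> {
            processPost(sm.result as Post)
            sm.completion.resume(Unit) // signal completion to the original caller
        }
    }
}

fun runDemo(): List<Post> {
    processed.clear()
    var done = false
    postItem(Item("hello"), object : Cont {
        override fun resume(result: Any?) { done = true }
    })
    while (pending.isNotEmpty()) pending.removeAt(0).invoke()   // drain the fake event loop
    check(done)
    return processed.toList()
}
```

`runDemo()` returns a single `Post` for the "hello" item. Each `postItem` call returns right after scheduling its step, which is the property that lets a suspended coroutine release its thread.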
  25. Subscribe

      suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
          subscribe(object : SingleObserver<T> {
              override fun onSuccess(t: T) { TODO() }
              override fun onError(error: Throwable) { TODO() }
              override fun onSubscribe(d: Disposable) { TODO() }
          })
      }
  26. Return a result, if successful

      suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
          subscribe(object : SingleObserver<T> {
              override fun onSuccess(t: T) { cont.resume(t) }
              override fun onError(error: Throwable) { TODO() }
              override fun onSubscribe(d: Disposable) { TODO() }
          })
      }
  27. Or resume with exception

      suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
          subscribe(object : SingleObserver<T> {
              override fun onSuccess(t: T) { cont.resume(t) }
              override fun onError(error: Throwable) { cont.resumeWithException(error) }
              override fun onSubscribe(d: Disposable) { TODO() }
          })
      }
  28. Don’t forget to dispose

      suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
          subscribe(object : SingleObserver<T> {
              override fun onSuccess(t: T) { cont.resume(t) }
              override fun onError(error: Throwable) { cont.resumeWithException(error) }
              override fun onSubscribe(d: Disposable) {
                  cont.invokeOnCompletion { d.dispose() }
              }
          })
      }
  29. And this is it

      suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
          subscribe(object : SingleObserver<T> {
              override fun onSuccess(t: T) { cont.resume(t) }
              override fun onError(error: Throwable) { cont.resumeWithException(error) }
              override fun onSubscribe(d: Disposable) {
                  cont.invokeOnCompletion { d.dispose() }
              }
          })
      }
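The same bridging pattern works for any callback-based API. As a dependency-free illustration, the sketch below adapts the JDK's `CompletableFuture` instead of RxJava's `Single`, using only the stable `kotlin.coroutines` stdlib package (Kotlin 1.3+ assumed). It swaps `suspendCancellableCoroutine` for the plain `suspendCoroutine`, so cancellation and disposal are deliberately not handled. The `describe` helper is hypothetical.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspend until the future completes, then resume with its value or its exception.
suspend fun <T> CompletableFuture<T>.await(): T = suspendCoroutine { cont ->
    whenComplete { value, error ->
        if (error == null) cont.resume(value) else cont.resumeWithException(error)
    }
}

// Hypothetical helper: starts a coroutine that awaits the future and records the outcome.
fun describe(future: CompletableFuture<String>): MutableList<String> {
    val events = mutableListOf<String>()
    val body: suspend () -> String = { "got " + future.await() }
    body.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        events.add(result.fold({ it }, { "failed: ${it.message}" }))
    })
    return events
}
```

The list returned by `describe` stays empty until the future is completed. After `complete("token")` it contains "got token", and after `completeExceptionally(...)` it contains the failure message, because the exception is rethrown at the `await()` call site.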
  30. kotlinx.coroutines

      • Core
      • integration: Guava, JDK 8, NIO, Quasar
      • reactive: Reactor, RxJava 1.x, Reactive Streams, RxJava 2.x
      • UI: Android, JavaFX, Swing
  31. async/await

      // C# way
      async Task ProcessImage(String url)
      {
          var image = await LoadImage(url);
          imageCache.Add(image);
      }

      // Kotlin way
      suspend fun processImage(url: String) {
          val image = loadImageAsync(url).await()
          imageCache.add(image)
      }
  32. Not idiomatic way

      fun loadImageAsync(url: String): Deferred<Image> = async { TODO() }

      suspend fun processImage(url: String) {
          val image = loadImageAsync(url).await()
          imageCache.add(image)
      }

      Don’t define async functions in the first place
  33. Idiomatic way

      suspend fun loadImage(url: String): Image = TODO()

      suspend fun processImage(url: String) {
          val image = async { loadImage(url) }.await()
          imageCache.add(image)
      }

      Keep concurrency explicit
  34. buildSequence {
          print("Start: ")
          var prev = 1; var cur = 1
          while (true) {
              print("Next")
              yield(prev) // suspension point
              val next = prev + cur
              prev = cur; cur = next
          }
          print("End") // unreachable code
      }.take(6).forEach { print(" $it ") }
      // Output: Start: Next 1 Next 1 Next 2 Next 3 Next 5 Next 8
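For readers on current Kotlin, the experimental `buildSequence` builder was later renamed `sequence` in the standard library (Kotlin 1.3+). The sketch below is a testable variant of the generator above under that assumption. The `fibonacciTrace` function and its `trace` buffer are hypothetical additions that record the interleaving instead of printing it.

```kotlin
// Returns the first `count` numbers plus a trace of when the generator body ran.
fun fibonacciTrace(count: Int): Pair<List<Int>, String> {
    val trace = StringBuilder()
    val numbers = sequence<Int> {
        trace.append("Start: ")
        var prev = 1
        var cur = 1
        while (true) {
            trace.append("Next ")
            yield(prev)                 // suspension point: control returns to the consumer
            val next = prev + cur
            prev = cur
            cur = next
        }
    }
        .take(count)                    // the generator is never resumed after the last item
        .onEach { trace.append("$it ") }
        .toList()
    return numbers to trace.toString().trim()
}
```

The trace alternates between generator and consumer, which shows that the body runs lazily, one step per requested element, and that the infinite loop is harmless because the sequence is only resumed on demand.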
  39. Job states

      State        isActive   isCompleted   isCancelled
      New          -          -             -
      Active       ✔          -             -
      Completed    -          ✔             -
      Cancelling   -          -             ✔
      Cancelled    -          ✔             ✔
  40. Exception handling

      val deferred = async(CommonPool) {
          throw SomeException("I'm thrown inside a coroutine")
      }
      try {
          deferred.await() // re-throws
      } catch (e: SomeException) {
          log(e.message)
      }
  41. WeakReference “life hack”

      suspend operator fun <T> WeakReference<T>.invoke(): T =
          suspendCoroutineOrReturn { get() ?: throw CancellationException() }

      val activityRef = WeakReference(this)
      launch(CommonPool) {
          activityRef().expensiveComputation()
      }
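The trick above can be tried without Android or kotlinx.coroutines. This is a sketch under stated assumptions: Kotlin 1.4+ on the JVM, where the stable counterpart of `suspendCoroutineOrReturn` is `suspendCoroutineUninterceptedOrReturn` and `CancellationException` is available from `kotlin.coroutines.cancellation`. The `Screen` class and the `compute` driver are hypothetical, and `WeakReference.clear()` stands in for garbage collection so the behaviour is deterministic.

```kotlin
import java.lang.ref.WeakReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for an Activity that owns an expensive operation.
class Screen { fun expensiveComputation(): Int = 42 }

// Returns the referent immediately, or aborts the coroutine if it is already gone.
suspend operator fun <T : Any> WeakReference<T>.invoke(): T =
    suspendCoroutineUninterceptedOrReturn { get() ?: throw CancellationException("referent is gone") }

// Minimal driver: the body never actually suspends, so it completes inside startCoroutine.
fun compute(ref: WeakReference<Screen>): String {
    var outcome = "not finished"
    val body: suspend () -> Int = { ref().expensiveComputation() }
    body.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { result ->
        outcome = result.fold({ "result $it" }, { "cancelled: ${it.message}" })
    })
    return outcome
}
```

While the `Screen` is still strongly reachable, `compute` returns "result 42". Once the reference has been cleared, the coroutine ends with the `CancellationException` instead of touching a dead object.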
  42. Links

      Andrey Breslav FAQ:
      https://discuss.kotlinlang.org/t/experimental-status-of-coroutines-in-1-1-and-related-compatibility-concerns/2236
      Design document (KEEP):
      https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md
      Full kotlinx.coroutines API:
      http://kotlin.github.io/kotlinx.coroutines
      Coroutines guide by Roman Elizarov:
      https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md