Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Using Kotlin Coroutines for Asynchronous and Co...

Using Kotlin Coroutines for Asynchronous and Concurrent

BuildStuff 2018 session on using Kotlin Coroutines for Asynchronous and Concurrent programming.
Video available at https://www.youtube.com/watch?v=oerXox4U9MU

Avatar for Pedro Felix

Pedro Felix

November 15, 2018

More Decks by Pedro Felix

Other Decks in Programming

Transcript

  1. whoami • @pmhsfelix • Software engineer • Backend systems, HTTP,

    .NET, JVM • Professor at the Lisbon Polytechnic Institute • Concurrent Programming and Web Application Development • Working in HTTP based systems for the last 7 years • Mostly asynchronous implementations • .NET: APM, TAP, async-await • JVM: RxJava and Reactive Streams
  2. Kotlin • Statically typed programming language • Created circa 2011

    by JetBrains • Excellent JVM ecosystem integration • Android official language, since 2017 • Spring Boot 2.0 support, also since 2017 • Version 1.3 in October 2018 • Coroutines are now a stable feature
  3. Why Asynchronous? • Blocking waits monopolize threads • All threads

    are costly (e.g. 1Mb stack memory) … • … and some threads are special (e.g. UI thread) • Asynchronous I/O • Does not require a blocked thread per request • None. Really.
  4. Why Asynchronous? External Dependency 500 RPS 2 seconds latency Sync

    I/O 1000 = 2 x 500 blocked threads on the outbound IO ~ 1Gb just for stack memory External Dependency 500 RPS 2 seconds latency Async I/O No thread blocked on I/O Synchronous Asynchronous
  5. Why Asynchronous? • Not the same as parallel programming -

    Sequential asynchronous flows • Asynchronous implementations of synchronous communication patterns • Will not make single things necessarily faster • Asynchronous I/O will be slower than synchronous I/O • Better • Throughput • Responsiveness • Resource (i.e. thread) usage
  6. Asynchronous APIs • Callbacks (e.g. NIO2, Apache HTTP Client) •

    Futures (e.g. AsyncHttpClient, JDK11’s HttpClient) • RxJava and Reactive Streams (e.g. Spring WebFlux) However…
  7. … let’s consider an example fun example0(input: Int): Int {

    var acc = input for (i in 0..2) { val result = blockingOperation(acc) acc = doSomethingWith(result) } return acc } A loop … … with a blocking operation inside it … sequential
  8. Callbacks fun example1(input: Int): Int { var acc = input

    for (i in 0..2) { asyncOperationWithCallback(acc) { result -> `?` } acc = doSomethingWith(`?`) } return acc }
  9. fun example2(input: Int): Int { var acc = input for

    (i in 0..2) { asyncOperationWithCallback(acc) { result -> acc = doSomethingWith(result) } // acc = doSomethingWith(result) } return acc } Callbacks Now doSomethingWith is not on the for anymore Breaks control flow!
  10. Futures fun example3(input: Int): Int { var acc = input

    for (i in 0..2) { val future = asyncOperationReturningFuture(acc) acc = doSomethingWith(future.get()) } return acc } Now it is blocking again!
  11. Futures fun example4(input: Int): Int { var acc = input

    for (i in 0..2) { val future = asyncOperationReturningFuture(acc) future.thenAccept { result -> acc = doSomethingWith(result) } // acc = doSomethingWith(result) } return acc } Same issue as with callbacks Breaks control flow!
  12. Callbacks, Futures, Observables • Do not fit well with the

    structure of imperative code • Functions created by sequences of statements • With complex control logic (ifs, fors, try-catchs) • Futures and Reactive Streams (Observables) • Function composition and not statement sequences • E.g. map, flatMap, zip • Impose a different programming style
  13. Callbacks, Futures, Observables • Do not fit well with the

    structure of imperative code • Functions created by sequences of statements • With complex control logic (ifs, fors, try-catchs) • Futures and Reactive Streams (Observables) • Function composition and not statement sequences • E.g. map, flatMap, zip • Impose a different programming style
  14. The challenge How to create code that doesn’t block threads?

    While still using the same classical imperative synchronous coding style
  15. If Kotlin had async-await (similar to C#/JS) async fun example5(input:

    Int): CompletableFuture<Int> { var acc = input for (i in 0..2) { val result = await asyncOperationReturningFuture(acc) acc = doSomethingWith(result) } return acc } await operator transforms a future<T> into a T Waits for the response Without blocking the host thread
  16. If Kotlin had async-await (similar to C#/JS) async fun example5(input:

    Int): CompletableFuture<Int> { var acc = input for (i in 0..2) { val result = await asyncOperationReturningFuture(acc) acc = doSomethingWith(result) } return acc } future<T> return type Even if return is a T Internal detail not visible in the function type
  17. Coroutines “Coroutines (…) generalize subroutines (..) by allowing multiple entry

    points for suspending and resuming execution at certain locations.” In https://en.wikipedia.org/wiki/Coroutine “The coroutine notion can greatly simplify the conception of a program when its modules do not communicate with each other synchronously” In M. Conway, “Design of a Separable Transition-Diagram Compiler”, Communications of the ACM, 1963.
  18. fun example(input: Int): Int { var acc = input for

    (i in 0..2) { val result = asyncOperationReturningFuture(acc) acc = doSomethingWith(result) } return acc }
  19. Kotlin Coroutines suspend fun example(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc }
  20. Kotlin Coroutines suspend fun example(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } still returns an Int and not a future<Int> await is an extension function (and not a language operator) suspension point
  21. Direct call suspend fun example(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } suspend fun caller1(): Int { … val res: Int = example(someInt) … }
  22. Direct call suspend fun example(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } suspend fun caller1(): Int { … val res: Int = example(someInt) … } suspend fun caller2(): Int { … val res: Int = caller1(anotherInt) … } Look, no awaits!
  23. Direct call suspend fun example(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } suspend fun caller1(): Int { … val res: Int = example(someInt) … } suspend fun caller2(): Int { … val res: Int = caller1(anotherInt) … } future-like object await suspend suspend future<T> T T T Direct calls No awaits or similar Sequential by default
  24. Looking inside .await … suspend fun example5(input: Int): Int {

    var acc = input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc }
  25. suspendCoroutine suspend fun example5(input: Int): Int { var acc =

    input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine
  26. suspendCoroutine suspend fun example5(input: Int): Int { var acc =

    input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine future.thenAccept
  27. suspendCoroutine suspend fun example5(input: Int): Int { var acc =

    input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine future.thenAccept continuation.resume(result) … when the future completes
  28. suspendCoroutine suspend fun example5(input: Int): Int { var acc =

    input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine future.thenAccept continuation.resume(result) … when the future completes suspendCoroutine
  29. suspendCoroutine suspend fun example5(input: Int): Int { var acc =

    input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine future.thenAccept continuation.resume(result) … when the future completes suspendCoroutine val result = acc = doSomething(…)
  30. Hiding the suspendCoroutine private suspend fun <T> CompletableFuture<T>.await(): T =

    suspendCoroutine { continuation -> whenComplete { value, error -> if (error != null) continuation.resumeWithException(error) else continuation.resume(value) } } suspend fun example6(input: Int): Int { var acc = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc }
  31. SuspendCoroutine function • Provides continuation to resume execution • Seems

    to starts in one thread and end on another • Creates a suspension point with any callback-based API • Used inside await extension functions • Only usable on suspend functions
  32. Suspend functions • Compose directly – called without any await

    required • Non-future return type • Because the underlying function interface is changed • Continuous Passing Style interface • fun(input:Int, continuation: Continuation<Int>): Object • Cannot be called directly from non-suspend functions
  33. Kotlin Coroutines suspend fun example7(input: Int): Int { var acc

    = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } (…) suspend fun caller2(): Int { … val res: Int = caller1(someInt) … } fun nonSuspendCaller(): Int { … val res: Int = caller2(someInt) … } Error: Kotlin: suspend function ‘caller1’ should be called only from a coroutine or another suspend function
  34. Callback based method suspend Coroutine future-like object await suspend suspend

    regular function future<T> T T T Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function suspend is part of the function type • E.g. a suspend method cannot override a non-suspend method • E.g. a suspend method cannot be a Spring MVC controller handler
  35. Callback based method suspend Coroutine future-like object await suspend suspend

    regular function future<T> T T coroutine builders T future<T> builder suspend return notes launch () -> Unit Job async () -> T Deferred<T> Derives from Job future () -> T CompletableFuture<T> JDK8 runBlocking () -> T T Blocking
  36. Callback based method suspend Coroutine future-like object await suspend suspend

    regular function future<T> T T coroutine builders T future<T> Øsuspend function – defines a state machine Øcoroutine - instance of the state machine
  37. Coroutine builders • Create and start a coroutine • instance

    of the state machine defined by suspend • Can be used from suspend and non suspend functions • Parallel semantics • The started coroutine will run in parallel with the creation point
  38. A Spring controller @GetMapping("/example") fun getExample(): Int { log.info("controller handler

    starting") example7(1) log.info("controller handler ending") } Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function
  39. A Spring controller @GetMapping("/example") fun getExample() = DeferredResult<Int>().apply { log.info("controller

    handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } DeferredResult is a future-like type from Spring Complete the DeferredResult
  40. A Spring Controller @GetMapping("/example") fun getExample() = DeferredResult<Int>().apply { log.info("controller

    handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } Async I/O await suspend suspend Spring Handler coroutine builders
  41. A Spring Controller @GetMapping("/example") fun getExample() = DeferredResult<Int>().apply { log.info("controller

    handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } 6927 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 6954 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending 6963 [DefaultDispatcher-worker-2] INFO intro - before asyncOperationReturningFuture 7973 [DefaultDispatcher-worker-2] INFO intro - after asyncOperationReturningFuture (…) 8977 [DefaultDispatcher-worker-1] INFO intro - doing something with 86 8977 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 9978 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 9978 [DefaultDispatcher-worker-1] INFO intro - doing something with 129
  42. Can we do it better? @GetMapping("/example") fun getExample() = DeferredResult<Int>().apply

    { log.info("controller handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") }
  43. Sure! fun <T> asyncHandler(ms: Long? = null, block: suspend ()

    -> T) = DeferredResult<T>(ms).apply { GlobalScope.launch { try { setResult(block()) } catch (ex: Throwable) { setErrorResult(ex) } } } @GetMapping("/example") fun getExample() = asyncHandler { example7(1) }
  44. Coroutine Context • Immutable set of context elements • Similar

    to thread local storage • Available during coroutine lifetime - coroutineContext • Defined when building the coroutine – launch, async • Or explicitly changed using withContext • Context elements configure coroutine behavior • Namely thread dispatching
  45. Thread dispatch • In which thread do the coroutine segments

    run? • The initial segment, after creation • The segments after the callbacks • The answer is in the thread dispatcher element public object Dispatchers { public val Default … public val IO … public val Main … @ExperimentalCoroutinesApi public val Unconfined … } // defined when creating launch(Dispatchers.IO) { … } // and can be changed withContext(Dispatchers.Main) { … }
  46. With a custom dispatcher 9283 [http-nio-8080-exec-1] INFO DemoApplication - starting

    controller handler 9309 [http-nio-8080-exec-1] INFO DemoApplication - returning from controller handler 9318 [the-example-thread] INFO DemoApplication - using interceptor java.util.concurrent.Executors$FinalizableDelegatedExecutorService@824a3b5 9320 [the-example-thread] INFO intro - before asyncOperationReturningFuture 10330 [the-example-thread] INFO intro - after asyncOperationReturningFuture 10330 [the-example-thread] INFO intro - doing something with 43 private val exampleDispatcher = Executors.newSingleThreadExecutor { r -> Thread(r).apply { name = "the-example-thread" } }.asCoroutineDispatcher() @GetMapping("/example-with-dispatcher") fun getExampleWithDispatcher() = asyncHandler { withContext(exampleDispatcher) { log.info("using interceptor {}", coroutineContext[ContinuationInterceptor]) example7(1) } }
  47. What if a timeout happens… @GetMapping("/example") fun getExample() = DeferredResult<Int>(1000).apply

    { log.info("controller handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } 19212 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 19240 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending (…) 20266 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException] 21258 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 21258 [DefaultDispatcher-worker-1] INFO intro - doing something with 86 21258 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - doing something with 129
  48. With a timeout @GetMapping("/example") fun getExample() = DeferredResult<Int>(1000).apply { log.info("controller

    handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } 19212 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 19240 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending (…) 20266 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException] 21258 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 21258 [DefaultDispatcher-worker-1] INFO intro - doing something with 86 21258 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - doing something with 129 After 1000 ms the requests timeout and a 503 response is produced However the coroutine keeps running GlobalScope.launch
  49. Parent-child coroutine relations coroutine coroutine coroutine coroutine coroutine Parent only

    completes when descendants complete Cancellation propagates to children Error propagates to parent
  50. DeferredResultScope class DeferredResultScope<T>(val timeout: Long) : DeferredResult<T>(timeout), CoroutineScope { private

    val job = Job() override val coroutineContext: CoroutineContext get() = job init { this.onTimeout { log.info("DeferredResult timed out") setErrorResult(AsyncRequestTimeoutException()) job.cancel() } } }
  51. With a timeout @GetMapping("/example") fun getExample() = DeferredResultScope<Int>(1000).apply { log.info("controller

    handler starting") this.launch { try { setResult(example7(1)) } catch (ex: Throwable) { log.info("catched '{}'", ex.message) setErrorResult(ex) } } log.info("controller handler ending") }
  52. With a timeout @GetMapping("/example") fun getExample() = DeferredResultScope<Int>(1000).apply { log.info("controller

    handler starting") this.launch { try { setResult(example7(1)) } catch (ex: Throwable) { log.info("catched '{}'", ex.message) setErrorResult(ex) } } log.info("controller handler ending") } 12127 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 12152 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending 12157 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 13168 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 13170 [DefaultDispatcher-worker-1] INFO intro - doing something with 43 13170 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 14029 [http-nio-8080-exec-2] INFO DemoApplication - DeferredResult timed out 14035 [DefaultDispatcher-worker-2] INFO DemoApplication - catched 'Job was cancelled' 14045 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
  53. With a timeout @GetMapping("/example") fun getExample() = DeferredResultScope<Int>(1000).apply { log.info("controller

    handler starting") this.launch { try { setResult(example7(1)) } catch (ex: Throwable) { log.info("catched '{}'", ex.message) setErrorResult(ex) } } log.info("controller handler ending") } 12127 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 12152 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending 12157 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 13168 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 13170 [DefaultDispatcher-worker-1] INFO intro - doing something with 43 13170 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 14029 [http-nio-8080-exec-2] INFO DemoApplication - DeferredResult timed out 14035 [DefaultDispatcher-worker-2] INFO DemoApplication - catched 'Job was cancelled' 14045 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException] Ø Any inner coroutines will also be cancelled Ø As long as scope is properly propagated Ø I.e. GlobalScope is not used
  54. Sequential vs Parallel var img1 = await loadImage(...) var img2

    = await loadImage(...) doSomethingWith(img1, img2) val img1 = loadImage(...) val img2 = loadImage(...) doSomethingWith(img1, img2) var f1 = loadImage(...) var f2 = loadImage(...) doSomethingWith(await f1, await f2) val f1 = async { loadImage(...) } val f2 = async { loadImage(...) } doSomethingWith(f1.await(), f2.await()) Kotlin C#/JS Sequential (yet asynchronous) Parallel Based on example present in https://qconsf.com/sf2017/system/files/presentation-slides/2017_qcon_sf_-_fresh_async_with_kotlin.pdf by Roman Elizarov
  55. Sequential vs Parallel var img1 = await loadImage(...) var img2

    = await loadImage(...) doSomethingWith(img1, img2) val img1 = loadImage(...) val img2 = loadImage(...) doSomethingWith(img1, img2) var f1 = loadImage(...) var f2 = loadImage(...) doSomethingWith(await f1, await f2) val f1 = async { loadImage(...) } val f2 = async { loadImage(...) } doSomethingWith(f1.await(), f2.await()) Kotlin C#/JS Sequential (yet asynchronous) Parallel Based on example present in https://qconsf.com/sf2017/system/files/presentation-slides/2017_qcon_sf_-_fresh_async_with_kotlin.pdf by Roman Elizarov Concurrency is explicit
  56. Resources • https://github.com/pmhsfelix/kotlin-coroutines • https://speakerdeck.com/pmhsfelix • KotlinConf 2017 and 2018

    videos by Roman Elizarov • Coroutines Guide • https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md • ktor – Web Applications framework • https://ktor.io
  57. Resources • https://github.com/pmhsfelix/kotlin-coroutines • https://speakerdeck.com/pmhsfelix • KotlinConf 2017 and 2018

    videos by Roman Elizarov • Coroutines Guide • https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md • ktor – Web Applications framework • https://ktor.io Thanks!