Pedro Felix
February 28, 2019

Using Kotlin Coroutines for Asynchronous and Concurrent Programming

Slides for my NDC Porto 2019 session on Kotlin Coroutines.

Transcript

  1. whoami
    • @pmhsfelix
    • Software engineer
        • Backend systems, HTTP, Identity and Access Management, .NET, JVM
    • Professor at the Lisbon Polytechnic Institute
        • Concurrent Programming and Web Application Development
    • Working in HTTP based systems for the last 7 years
        • Mostly asynchronous implementations
        • .NET: APM, Task-based Asynchronous Pattern, async-await
        • JVM: RxJava and Reactive Streams
  2. Kotlin
    • Statically typed programming language
    • Created circa 2011 by JetBrains
    • Excellent JVM ecosystem integration
    • Android official language, since 2017
    • Spring Boot 2.0 support, also since 2017
    • Version 1.3 in October 2018
        • Coroutines are now a stable feature
  3. Why Asynchronous?
    • Blocking waits monopolize threads
    • All threads are costly (e.g. 1 MB of stack memory on the JVM) …
    • … and some threads are special (e.g. the UI thread)
  4. Why Asynchronous?
    External dependency: 500 RPS, 2 seconds latency
    • Synchronous (Sync I/O): 1000 = 2 x 500 blocked threads on the outbound I/O, ~1 GB just for stack memory
    • Asynchronous (Async I/O): no thread blocked on I/O
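The arithmetic on this slide is Little's law: requests in flight equal the arrival rate times the time each request takes. A minimal sketch of that calculation follows; the function names are made up for illustration, and the 1 MB per thread figure is the stack size assumed on the previous slide.

```kotlin
// Little's law: requests in flight = arrival rate x time spent per request.
// With synchronous I/O, every in-flight request holds one blocked thread.
fun blockedThreads(requestsPerSecond: Int, latencySeconds: Int): Int =
    requestsPerSecond * latencySeconds

// Stack memory reserved by those threads, in MB (assumes 1 MB per thread by default).
fun stackMemoryMb(threads: Int, stackMbPerThread: Int = 1): Int =
    threads * stackMbPerThread

fun main() {
    val threads = blockedThreads(requestsPerSecond = 500, latencySeconds = 2)
    println("blocked threads: $threads")                     // 1000
    println("stack memory: ${stackMemoryMb(threads)} MB")    // 1000 MB, roughly 1 GB
}
```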
  5. Why Asynchronous?
    • Not the same as parallel programming
        • Sequential asynchronous flows
        • Asynchronous implementations of synchronous communication patterns
    • Will not necessarily make a single operation faster
        • Asynchronous I/O will be slower than synchronous I/O
    • Better
        • Throughput
        • Responsiveness
        • Resource usage – thread usage
        • Stability
  6. Asynchronous APIs
    • Callbacks (e.g. Java NIO2, Node.js APIs, .NET APM)
    • Futures (e.g. JDK11 HttpClient, JS Promises, .NET Task<T>)
    • Reactive Streams (e.g. RxJava, RxJS, .NET Rx Extensions)
    However…
  7. … let’s consider an example
        fun example0(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val result = blockingOperation(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }
    • A loop …
    • … with a blocking operation inside it
    • … sequential
  8. Futures
        fun example3(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val future = asyncOperationReturningFuture(acc)
                acc = doSomethingWith(future.get())
            }
            return acc
        }
    Now it is blocking again!
  9. Futures
        fun example4(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val future = asyncOperationReturningFuture(acc)
                future.thenAccept { result ->
                    acc = doSomethingWith(result)
                }
                // acc = doSomethingWith(result)
            }
            return acc
        }
    Breaks control flow!
  10. Callbacks, Futures, Observables
    • Do not fit well with the structure of imperative code
        • Functions created by sequences of statements
        • With complex control logic (ifs, fors, try-catches)
    • Futures and Reactive Streams (Observables)
        • Function composition and not statement sequences
        • E.g. map, flatMap, zip
    • Impose a different programming style
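To make "function composition and not statement sequences" concrete, here is a sketch of the loop from the earlier slides written with `CompletableFuture.thenCompose`. The bodies of `asyncOperationReturningFuture` and `doSomethingWith` are hypothetical stand-ins (the slides never show them), chosen only so the example runs.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

// The loop no longer runs the steps: it only builds a chain of futures.
// Each step starts when the previous one completes, and no thread blocks.
fun exampleComposed(input: Int): CompletableFuture<Int> {
    var acc: CompletableFuture<Int> = CompletableFuture.completedFuture(input)
    for (i in 0..2) {
        acc = acc.thenCompose { value ->
            asyncOperationReturningFuture(value).thenApply { result -> doSomethingWith(result) }
        }
    }
    return acc
}

fun main() {
    // join() blocks; it is used here only to print the result of the demo.
    println(exampleComposed(1).join()) // 22 with the stand-ins above
}
```

The function is non-blocking and correct, but the accumulator is now a future and the loop body is a nested lambda, which is the change of style the slide refers to.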
  12. The challenge
    How to create code that doesn’t block threads?
    While still using the same classical imperative synchronous coding style
  13. If Kotlin had async-await (similar to C#/JS)
        async fun example5(input: Int): CompletableFuture<Int> {
            var acc = input
            for (i in 0..2) {
                val result = await asyncOperationReturningFuture(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }
    • await operator transforms a future<T> into a T
        • Waits for the response
        • Without blocking the host thread
  14. If Kotlin had async-await (similar to C#/JS)
        async fun example5(input: Int): CompletableFuture<Int> {
            var acc = input
            for (i in 0..2) {
                val result = await asyncOperationReturningFuture(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }
    • future<T> return type
        • Even if the returned value is a T
    • async: internal detail not visible in the function type
  15. async-await
    • async is a private implementation detail
    • Function still returns future<T> (e.g. Task<T> or Promise)
    • Needs await to access the value from the future<T>
    (Diagram: a chain of async functions, each returning a future<T> that the caller awaits)
  16. Coroutines
    “Coroutines (…) generalize subroutines (…) by allowing multiple entry points for suspending and resuming execution at certain locations.”
    In https://en.wikipedia.org/wiki/Coroutine
    “The coroutine notion can greatly simplify the conception of a program when its modules do not communicate with each other synchronously”
    In M. Conway, “Design of a Separable Transition-Diagram Compiler”, Communications of the ACM, 1963.
  17. The blocking example again
        fun example(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val result = blockingOperation(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }
  19. Kotlin Coroutines
        suspend fun example(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val result = suspendFunctionReturningValue(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }
    • suspend: part of the signature, not an implementation detail
    • Returns an Int and not a future<Int>
    • The call to suspendFunctionReturningValue is a suspension point
  21. Direct call
        suspend fun example(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val result = suspendFunctionReturningValue(acc)
                acc = doSomethingWith(result)
            }
            return acc
        }

        suspend fun caller1(): Int {
            …
            val res: Int = example(someInt)
            …
        }

        suspend fun caller2(): Int {
            …
            val res: Int = caller1()
            …
        }
    Look, no awaits!
  22. Calls between suspend functions
    (Diagram: suspend function → suspend function → suspend function, each returning a plain T)
    • Plain calls between suspend functions
    • Plain types returned – no future<T>
    • No await or async needed
    • Sequential by default
    • Non-blocking by default
  23. Non-suspend Asynchronous APIs
    (Diagram: a chain of suspend functions returning T, reaching a callback based method and a future-like object returning future<T>)
    • No await or async needed
    • Sequential by default
    • Non-blocking by default
  28. suspendCoroutine
        suspend fun example5(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val future = asyncOperationReturningFuture(acc)
                val result = suspendCoroutine<Int> { continuation ->
                    future.thenAccept { result ->
                        continuation.resume(result)
                    }
                }
                acc = doSomethingWith(result)
            }
            return acc
        }
    Execution flow:
    • suspendCoroutine
    • future.thenAccept
    • … when the future completes: continuation.resume(result)
    • suspendCoroutine returns: val result = …
    • acc = doSomething(…)
  29. Hiding the suspendCoroutine…
        private suspend fun <T> CompletableFuture<T>.await(): T =
            suspendCoroutine { continuation ->
                whenComplete { value, error ->
                    if (error != null)
                        continuation.resumeWithException(error)
                    else
                        continuation.resume(value)
                }
            }

        suspend fun example6(input: Int): Int {
            var acc = input
            for (i in 0..2) {
                val result = asyncOperationReturningFuture(acc).await()
                acc = doSomethingWith(result)
            }
            return acc
        }
    await is an extension function (and not a language operator)
  30. Bridging to non-suspend asynchronous APIs
    (Diagram: suspend functions returning T; await sits between a suspend function and a future-like object returning future<T>; suspendCoroutine sits between a suspend function and a callback based method)
    • await extension function: bridge to future-like objects
    • suspendCoroutine function: bridge to callback based functions
  32. Non-suspend Callers
    (Diagram: as before, with a regular function now calling into the chain of suspend functions)
    • The compilation of a suspend function uses Continuation-Passing Style (CPS)
    • The underlying function signature is changed:
        fun(input: Int, continuation: Continuation<Int>): Object
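To give a feel for what continuation-passing style means for the loop example, here is a hand-written sketch. It is not the code the Kotlin compiler generates: the class and callback names are invented for illustration, and the stand-in bodies of `asyncOperationReturningFuture` and `doSomethingWith` are assumptions. It only shows the idea that locals become fields and that the function is re-entered after each asynchronous step.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

// Hand-written continuation-passing version of the loop (illustrative only).
class ExampleStateMachine(
    input: Int,
    private val onResult: (Int) -> Unit,
    private val onError: (Throwable) -> Unit
) {
    private var acc = input // local variables become fields ...
    private var i = 0       // ... so they survive between suspensions

    // Each call runs the code up to the next asynchronous step and then returns.
    fun resume() {
        if (i > 2) {
            onResult(acc)
            return
        }
        asyncOperationReturningFuture(acc).whenComplete { result, error ->
            if (error != null) {
                onError(error)
            } else {
                acc = doSomethingWith(result)
                i++
                resume() // re-enter the state machine; no thread waited in between
            }
        }
    }
}

fun main() {
    val done = CompletableFuture<Int>()
    ExampleStateMachine(1, { done.complete(it) }, { done.completeExceptionally(it) }).resume()
    println(done.join()) // 22 with the stand-ins above
}
```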
  34. Coroutine builders
    (Diagram: a regular function uses coroutine builders, which return a future<T>, to reach the chain of suspend functions)
    • Use coroutine builders to call a suspend function, i.e., create and start a coroutine, from a plain function (or from another suspend function)
    • A suspend function defines a state machine
    • A coroutine is an instance of that state machine
    • A coroutine builder creates and starts the instance
  35. Coroutine builders
    builder       suspend       return                 notes
    launch        () -> Unit    Job                    Similar to Task
    async         () -> T       Deferred<T>            Similar to Task<T>
    future        () -> T       CompletableFuture<T>   JDK8
    runBlocking   () -> T       T                      Blocking
    On a plain function, a coroutine is represented by a future-like object
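As an illustration of "a coroutine builder creates and starts the instance", the sketch below hand-rolls a tiny builder using only the Kotlin standard library (`startCoroutine` from `kotlin.coroutines`). It is similar in spirit to the `future` builder in the table, but it is not the kotlinx.coroutines implementation; `startAsFuture` is a made-up name and the stand-in bodies of the helper functions are assumptions.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Bridge from a future to a suspend function, as on the "Hiding the suspendCoroutine" slide.
suspend fun <T> CompletableFuture<T>.await(): T =
    suspendCoroutine { continuation ->
        whenComplete { value, error ->
            if (error != null) continuation.resumeWithException(error)
            else continuation.resume(value)
        }
    }

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

suspend fun example6(input: Int): Int {
    var acc = input
    for (i in 0..2) {
        val result = asyncOperationReturningFuture(acc).await()
        acc = doSomethingWith(result)
    }
    return acc
}

// Minimal builder: creates and starts a coroutine from a plain function and
// represents it with a future-like object.
fun <T> startAsFuture(block: suspend () -> T): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    val completion = Continuation<T>(EmptyCoroutineContext) { result ->
        result.fold(
            onSuccess = { future.complete(it) },
            onFailure = { future.completeExceptionally(it) }
        )
    }
    block.startCoroutine(completion)
    return future
}

fun main() {
    // join() blocks; it is used here only to print the result of the demo.
    println(startAsFuture { example6(1) }.join()) // 22 with the stand-ins above
}
```

Unlike the library builders, this sketch has no dispatcher, scope or cancellation support: the coroutine simply resumes on whichever thread completes the future.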
  36. A Spring controller
        @GetMapping("/example")
        fun getExample(): Int {
            log.info("controller handler starting")
            example(1)
            log.info("controller handler ending")
        }
    Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function
  37. A Spring controller
        @GetMapping("/example")
        fun getExample() = DeferredResult<Int>().apply {
            log.info("controller handler starting")
            GlobalScope.launch {
                try {
                    setResult(example(1))
                } catch (ex: Throwable) {
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
    • DeferredResult is a future-like type from Spring
    • setResult / setErrorResult: complete the DeferredResult
  38. A Spring controller
        @GetMapping("/example")
        fun getExample() = DeferredResult<Int>().apply {
            log.info("controller handler starting")
            GlobalScope.launch {
                try {
                    setResult(example7(1))
                } catch (ex: Throwable) {
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
    (Diagram: Spring Handler → coroutine builder → suspend → suspend → await → Async I/O)
  39. A Spring controller
        @GetMapping("/example")
        fun getExample() = DeferredResult<Int>().apply {
            log.info("controller handler starting")
            GlobalScope.launch {
                try {
                    setResult(example7(1))
                } catch (ex: Throwable) {
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
    Log output:
        6927 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting
        6954 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending
        6963 [DefaultDispatcher-worker-2] INFO intro - before asyncOperationReturningFuture
        7973 [DefaultDispatcher-worker-2] INFO intro - after asyncOperationReturningFuture
        (…)
        8977 [DefaultDispatcher-worker-1] INFO intro - doing something with 86
        8977 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
        9978 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
        9978 [DefaultDispatcher-worker-1] INFO intro - doing something with 129
  40. Can we do it better?
        @GetMapping("/example")
        fun getExample() = DeferredResult<Int>().apply {
            log.info("controller handler starting")
            GlobalScope.launch {
                try {
                    setResult(example(1))
                } catch (ex: Throwable) {
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
  41. Sure!
        fun <T> asyncHandler(ms: Long? = null, block: suspend () -> T) =
            DeferredResult<T>(ms).apply {
                GlobalScope.launch {
                    try {
                        setResult(block())
                    } catch (ex: Throwable) {
                        setErrorResult(ex)
                    }
                }
            }

        @GetMapping("/example")
        fun getExample() = asyncHandler { example(1) }
  43. With a timeout
        @GetMapping("/example")
        fun getExample() = DeferredResult<Int>(1000).apply {
            log.info("controller handler starting")
            GlobalScope.launch {
                try {
                    setResult(example7(1))
                } catch (ex: Throwable) {
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
    Log output:
        19212 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting
        19240 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending
        (…)
        20266 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
        21258 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
        21258 [DefaultDispatcher-worker-1] INFO intro - doing something with 86
        21258 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
        22264 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
        22264 [DefaultDispatcher-worker-1] INFO intro - doing something with 129
    • After 1000 ms the request times out and a 503 response is produced
    • However, the coroutine started by GlobalScope.launch keeps running
    (Diagram: future, coroutine builder, coroutine. Lifecycle?)
  44. Coroutine Scope
    (Diagram: suspend functions and coroutine builders enclosed in a scope; the builders return a future<T>)
    • Lifecycle management
    • A coroutine is always created in the context of a scope
        • GlobalScope – similar to a daemon thread
        • Specific scope
  45. Parent-child coroutine relations
    (Diagram: a tree of coroutines)
    • Completion synchronization
        • Parent only completes when descendants complete
    • Error and cancellation propagation
        • Cancellation propagates to children
        • Error propagates to parent
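The propagation rules above can be observed with a small kotlinx.coroutines sketch, assuming the kotlinx-coroutines-core library is on the classpath. The function name and the delays are arbitrary choices for the demo.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches a parent with one child, cancels the parent, and reports
// whether the child observed the cancellation.
suspend fun childIsCancelledWithParent(): Boolean = coroutineScope {
    var childCancelled = false
    val parent = launch {
        launch { // child: inherits the parent's Job
            try {
                delay(10_000)
            } catch (e: CancellationException) {
                childCancelled = true
                throw e // cancellation exceptions should be rethrown
            }
        }
    }
    delay(100)             // give the child time to start
    parent.cancelAndJoin() // cancellation propagates to the child;
                           // join returns only after the child has completed
    childCancelled
}

fun main() = runBlocking {
    println("child cancelled: ${childIsCancelledWithParent()}")
}
```

Replacing the inner `launch` with `GlobalScope.launch` would break the parent-child link, and the inner coroutine would keep running after the parent is cancelled.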
  46. What is a scope?
    • Depends on the context
        • An Android Activity
        • An HTTP request
    • A thing with a lifecycle that we want to associate with the coroutines
  47. DeferredResultScope (HTTP message)
        class DeferredResultScope<T>(val timeout: Long) : DeferredResult<T>(timeout), CoroutineScope {
            private val job = Job()
            override val coroutineContext: CoroutineContext
                get() = job
            init {
                this.onTimeout {
                    log.info("DeferredResult timed out")
                    setErrorResult(AsyncRequestTimeoutException())
                    job.cancel()
                }
            }
        }
  50. With a timeout
        @GetMapping("/example")
        fun getExample() = DeferredResultScope<Int>(1000).apply {
            log.info("controller handler starting")
            this.launch {
                try {
                    setResult(example7(1))
                } catch (ex: Throwable) {
                    log.info("catched '{}'", ex.message)
                    setErrorResult(ex)
                }
            }
            log.info("controller handler ending")
        }
    Log output:
        12127 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting
        12152 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending
        12157 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
        13168 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
        13170 [DefaultDispatcher-worker-1] INFO intro - doing something with 43
        13170 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
        14029 [http-nio-8080-exec-2] INFO DemoApplication - DeferredResult timed out
        14035 [DefaultDispatcher-worker-2] INFO DemoApplication - catched 'Job was cancelled'
        14045 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
    • Any inner coroutines will also be cancelled
    • As long as the scope is properly propagated
    • I.e. GlobalScope is not used
  51. How about the async-await model?
    • It does not solve this issue
    • C# – explicit usage of CancellationToken
    • JS – no defined cancellation model
  52. How about concurrency?
    • Data accessed by multiple threads: concurrent access, proper synchronization required
    • Data accessed by a coroutine that runs on multiple threads: no concurrent access, no synchronization required
  53. How about concurrency?
    • Single coroutine: data touched from multiple threads, but no concurrent access, so no synchronization required
    • Multiple coroutines: data touched from multiple threads with concurrent access, so proper synchronization required
  54. Concurrency
    • Access by multiple coroutines to shared mutable data requires proper synchronization
    • Beware of the traditional synchronization primitives
        • e.g. Semaphore
        • They block threads!
    • Mutex interface with a suspend lock method
    • Channel interface with suspend send and suspend receive methods
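A minimal sketch of the `Mutex` approach, assuming kotlinx-coroutines-core is available: many coroutines increment a shared counter on a multi-threaded dispatcher, and `withLock` suspends the coroutine (instead of blocking the thread) while the lock is held by another coroutine. The function name and the counts are arbitrary.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Shared mutable state updated by many coroutines, protected by a suspending Mutex.
suspend fun countWithMutex(coroutines: Int, incrementsEach: Int): Int {
    val mutex = Mutex()
    var counter = 0
    // withContext returns only after all the coroutines launched inside it complete.
    withContext(Dispatchers.Default) {
        repeat(coroutines) {
            launch {
                repeat(incrementsEach) {
                    mutex.withLock { counter++ } // suspends, never blocks a thread
                }
            }
        }
    }
    return counter
}

fun main() = runBlocking {
    println(countWithMutex(coroutines = 100, incrementsEach = 1000)) // 100000
}
```

Without the `withLock` call the increments race with each other and the final count is usually lower than expected.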
  55. Or…
    • Confine shared mutable data access to a single coroutine
    • Use channels to communicate between coroutines
    • Actor-like programming model
        • Still experimental in Kotlin 1.3
    • And remember
        • A coroutine doesn’t block threads
        • We can have thousands of coroutines
  59. Sequential non-blocking vs. parallel
    Sequential non-blocking, C#/JS:
        var img1 = await loadImage(...)
        var img2 = await loadImage(...)
        doSomethingWith(img1, img2)
    Sequential non-blocking, Kotlin:
        val img1 = loadImage(...)
        val img2 = loadImage(...)
        doSomethingWith(img1, img2)
    Parallel, C#/JS:
        var f1 = loadImage(...)
        var f2 = loadImage(...)
        doSomethingWith(await f1, await f2)
    Parallel, Kotlin:
        val f1 = async { loadImage(...) }
        val f2 = async { loadImage(...) }
        doSomethingWith(f1.await(), f2.await())
    • Default behavior is sequential
    • Concurrency is explicit
    • Concurrency is structured
    Based on example present in https://qconsf.com/sf2017/system/files/presentation-slides/2017_qcon_sf_-_fresh_async_with_kotlin.pdf by Roman Elizarov
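The Kotlin half of this comparison can be made runnable as follows, assuming kotlinx-coroutines-core is available. `loadImage` here is a hypothetical stand-in that just delays and returns a string; wrapping the `async` calls in `coroutineScope` is one way to get the "structured" part, since the function does not return until both children have completed.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an asynchronous image load.
suspend fun loadImage(name: String): String {
    delay(100)
    return "image:$name"
}

// Default behavior: plain calls run one after the other, without blocking threads.
suspend fun loadSequentially(): List<String> =
    listOf(loadImage("a"), loadImage("b"))

// Concurrency is explicit (async) and structured (children live inside coroutineScope).
suspend fun loadConcurrently(): List<String> = coroutineScope {
    val f1 = async { loadImage("a") }
    val f2 = async { loadImage("b") }
    listOf(f1.await(), f2.await())
}

fun main() = runBlocking {
    println(loadSequentially()) // takes about two delays
    println(loadConcurrently()) // takes about one delay
}
```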
  61. Resources
    • https://github.com/pmhsfelix/kotlin-coroutines
    • https://speakerdeck.com/pmhsfelix
    • KotlinConf 2017 and 2018 videos by Roman Elizarov
    • Coroutines Guide
        • https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md
    • ktor – Web Applications framework
        • https://ktor.io
    Thanks!