.NET, JVM • Professor at the Lisbon Polytechnic Institute • Concurrent Programming and Web Application Development • Working in HTTP based systems for the last 7 years • Mostly asynchronous implementations • .NET: APM, TAP, async-await • JVM: RxJava and Reactive Streams
by JetBrains • Excellent JVM ecosystem integration • Android official language, since 2017 • Spring Boot 2.0 support, also since 2017 • Version 1.3 in October 2018 • Coroutines are now a stable feature
are costly (e.g. 1 MB stack memory) … • … and some threads are special (e.g. UI thread) • Asynchronous I/O • Does not require a blocked thread per request • None. Really.
I/O 1000 = 2 x 500 blocked threads on the outbound I/O ~ 1 GB just for stack memory External Dependency 500 RPS 2 seconds latency Async I/O No thread blocked on I/O Synchronous Asynchronous
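The arithmetic on this slide can be reproduced with a few lines of Kotlin. This is only a sketch: the function names are illustrative, and the 1 MB stack per thread is the slide's own assumption, not a measured value.

```kotlin
// Illustrative helpers (not from the talk).
// Threads blocked at any instant = arrival rate x time each request stays blocked.
fun blockedThreads(requestsPerSecond: Int, latencySeconds: Int): Int =
    requestsPerSecond * latencySeconds

// Assumes the slide's figure of roughly 1 MB of stack per thread.
fun stackMemoryMb(threads: Int, stackMbPerThread: Int = 1): Int =
    threads * stackMbPerThread

fun main() {
    val threads = blockedThreads(requestsPerSecond = 500, latencySeconds = 2)
    println("$threads blocked threads, ~${stackMemoryMb(threads)} MB of stack")
}
```

With 500 RPS and 2 seconds of latency this gives 1000 blocked threads and about 1000 MB (roughly 1 GB) of stack memory, matching the figures on the slide.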
Sequential asynchronous flows • Asynchronous implementations of synchronous communication patterns • Will not necessarily make individual operations faster • Asynchronous I/O will be slower than synchronous I/O • Better • Throughput • Responsiveness • Resource (i.e. thread) usage
var acc = input for (i in 0..2) { val result = blockingOperation(acc) acc = doSomethingWith(result) } return acc } A loop … … with a blocking operation inside it … sequential
(i in 0..2) { asyncOperationWithCallback(acc) { result -> acc = doSomethingWith(result) } // acc = doSomethingWith(result) } return acc } Callbacks Now doSomethingWith is not on the for anymore Breaks control flow!
for (i in 0..2) { val future = asyncOperationReturningFuture(acc) future.thenAccept { result -> acc = doSomethingWith(result) } // acc = doSomethingWith(result) } return acc } Same issue as with callbacks Breaks control flow!
structure of imperative code • Functions created by sequences of statements • With complex control logic (ifs, fors, try-catchs) • Futures and Reactive Streams (Observables) • Function composition and not statement sequences • E.g. map, flatMap, zip • Impose a different programming style
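To make the "function composition" style concrete, here is one possible way to write the three-iteration loop with `CompletableFuture` composition instead of statements. The bodies of `asyncOperationReturningFuture` and `doSomethingWith` are hypothetical stand-ins, since the slides do not show them.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

// The for loop becomes a fold that chains one thenCompose per iteration,
// so each asynchronous operation starts only after the previous one completes.
fun exampleWithComposition(input: Int): CompletableFuture<Int> =
    (0..2).fold(CompletableFuture.completedFuture(input)) { accFuture, _ ->
        accFuture.thenCompose { acc ->
            asyncOperationReturningFuture(acc)
                .thenApply { result -> doSomethingWith(result) }
        }
    }

fun main() {
    // Blocking with get() only to print the result in this demo.
    println(exampleWithComposition(1).get())
}
```

The result is correct and non-blocking, but the `for` statement is gone: the control flow has to be re-expressed through `fold`, `thenCompose` and `thenApply`, which is the change of programming style the slide refers to.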
Int): CompletableFuture<Int> { var acc = input for (i in 0..2) { val result = await asyncOperationReturningFuture(acc) acc = doSomethingWith(result) } return acc } await operator transforms a future<T> into a T Waits for the response Without blocking the host thread
Int): CompletableFuture<Int> { var acc = input for (i in 0..2) { val result = await asyncOperationReturningFuture(acc) acc = doSomethingWith(result) } return acc } future<T> return type Even if return is a T Internal detail not visible in the function type
points for suspending and resuming execution at certain locations.” In https://en.wikipedia.org/wiki/Coroutine “The coroutine notion can greatly simplify the conception of a program when its modules do not communicate with each other synchronously” In M. Conway, “Design of a Separable Transition-Diagram Compiler”, Communications of the ACM, 1963.
= input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } still returns an Int and not a future<Int> await is an extension function (and not a language operator) suspension point
= input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } suspend fun caller1(): Int { … val res: Int = example(someInt) … } suspend fun caller2(): Int { … val res: Int = caller1(anotherInt) … } Look, no awaits!
= input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } suspend fun caller1(): Int { … val res: Int = example(someInt) … } suspend fun caller2(): Int { … val res: Int = caller1(anotherInt) … } future-like object await suspend suspend future<T> T T T Direct calls No awaits or similar Sequential by default
var acc = input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc }
input for (i in 0..2) { val future = asyncOperationReturningFuture(acc) val result = suspendCoroutine<Int> { continuation -> future.thenAccept { result -> continuation.resume(result) } } acc = doSomethingWith(result) } return acc } suspendCoroutine future.thenAccept continuation.resume(result) … when the future completes suspendCoroutine val result = acc = doSomething(…)
suspendCoroutine { continuation -> whenComplete { value, error -> if (error != null) continuation.resumeWithException(error) else continuation.resume(value) } } suspend fun example6(input: Int): Int { var acc = input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc }
to start in one thread and end on another • Creates a suspension point with any callback-based API • Used inside await extension functions • Only usable in suspend functions
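The points above can be tried out with the Kotlin standard library alone. The sketch below defines a simplified `await` with `suspendCoroutine` (no cancellation support, unlike the one shipped in kotlinx.coroutines) and starts the suspend function from regular code with `startCoroutine`. The helper `startAsFuture` and the bodies of `asyncOperationReturningFuture` and `doSomethingWith` are assumptions made for illustration.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Simplified await: turns the future's completion callback into a suspension point.
suspend fun <T> CompletableFuture<T>.await(): T =
    suspendCoroutine { continuation ->
        whenComplete { value, error ->
            if (error != null) continuation.resumeWithException(error)
            else continuation.resume(value)
        }
    }

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { Thread.sleep(50); input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

suspend fun example(input: Int): Int {
    var acc = input
    for (i in 0..2) {
        // Suspension point: the calling thread is released here and the loop
        // continues on whichever thread completes the future.
        val result = asyncOperationReturningFuture(acc).await()
        acc = doSomethingWith(result)
    }
    return acc
}

// Hypothetical helper: starts a suspend block from a regular function and
// exposes its outcome as a CompletableFuture.
fun <T> startAsFuture(block: suspend () -> T): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { result ->
        result
            .onSuccess { future.complete(it) }
            .onFailure { future.completeExceptionally(it) }
    })
    return future
}

fun main() {
    // get() blocks only so that this demo can print the result.
    println(startAsFuture { example(1) }.get())
}
```

Printing `Thread.currentThread().name` before and after the `await()` call is an easy way to observe the function starting on one thread and finishing on another.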
required • Non-future return type • Because the underlying function interface is changed • Continuation-Passing Style interface • fun(input:Int, continuation: Continuation<Int>): Object • Cannot be called directly from non-suspend functions
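To give an intuition for continuation-passing style, here is a hand-written version of the same loop in which the caller supplies the "rest of the computation" as a callback. This is a rough approximation for illustration only: the Kotlin compiler generates a state machine, not this code, and error handling is omitted. The helper bodies are hypothetical stand-ins.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the functions named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(result: Int): Int = result * 2

// Continuation-passing version: no return value, the result is handed to `continuation`.
fun exampleCps(input: Int, continuation: (Int) -> Unit) {
    // Each call to step is one loop iteration; the loop state (i, acc) travels as arguments.
    fun step(i: Int, acc: Int) {
        if (i > 2) {
            continuation(acc) // loop finished: resume the caller with the final value
            return
        }
        asyncOperationReturningFuture(acc).thenAccept { result ->
            step(i + 1, doSomethingWith(result))
        }
    }
    step(0, input)
}

fun main() {
    val done = CompletableFuture<Int>()
    exampleCps(1) { done.complete(it) }
    println(done.get())
}
```

Because the real signature carries an extra continuation parameter, a regular function has no continuation to pass, which is why a direct call from non-suspend code is rejected.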
= input for (i in 0..2) { val result = asyncOperationReturningFuture(acc).await() acc = doSomethingWith(result) } return acc } (…) suspend fun caller2(): Int { … val res: Int = caller1(someInt) … } fun nonSuspendCaller(): Int { … val res: Int = caller2(someInt) … } Error: Kotlin: suspend function ‘caller2’ should be called only from a coroutine or another suspend function
regular function future<T> T T T Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function suspend is part of the function type • E.g. a suspend method cannot override a non-suspend method • E.g. a suspend method cannot be a Spring MVC controller handler
regular function future<T> T T coroutine builders T future<T>

builder      suspend      return                 notes
launch       () -> Unit   Job
async        () -> T      Deferred<T>            Derives from Job
future       () -> T      CompletableFuture<T>   JDK8
runBlocking  () -> T      T                      Blocking
of the state machine defined by suspend • Can be used from suspend and non suspend functions • Parallel semantics • The started coroutine will run in parallel with the creation point
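The four builders from the table can be exercised side by side. The sketch below assumes the kotlinx.coroutines library is on the classpath (with the JDK 8 integration that provides `future` and `await`); `compute` is a hypothetical suspend function standing in for the examples on the slides.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspend function used only for this demo.
suspend fun compute(input: Int): Int {
    delay(50)
    return input * 2
}

// runBlocking bridges into coroutines by blocking the calling thread until the block ends.
fun buildersDemo(): List<Int> = runBlocking {
    val results = mutableListOf<Int>()

    val job: Job = launch { results.add(compute(1)) }       // no result, only a Job
    val deferred: Deferred<Int> = async { compute(2) }       // result available via await()
    val cf: CompletableFuture<Int> = future { compute(3) }   // JDK future, usable from Java

    job.join()
    results.add(deferred.await())
    // Suspend instead of calling cf.get(): blocking here would stall the
    // runBlocking event loop that the three coroutines run on.
    results.add(cf.await())
    results
}

fun main() {
    println(buildersDemo())
}
```

All three coroutines are started before any of them is awaited, so their 50 ms delays overlap rather than add up.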
starting") example7(1) log.info("controller handler ending") } Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function
handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } DeferredResult is a future-like type from Spring Complete the DeferredResult
to thread local storage • Available during coroutine lifetime - coroutineContext • Defined when building the coroutine – launch, async • Or explicitly changed using withContext • Context elements configure coroutine behavior • Namely thread dispatching
run? • The initial segment, after creation • The segments after the callbacks • The answer is in the thread dispatcher element public object Dispatchers { public val Default … public val IO … public val Main … @ExperimentalCoroutinesApi public val Unconfined … } // defined when creating launch(Dispatchers.IO) { … } // and can be changed withContext(Dispatchers.Main) { … }
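A small experiment shows the dispatcher deciding where each segment runs. This sketch assumes kotlinx.coroutines is available and uses `runBlocking` only as a convenient entry point; `threadNames` is an illustrative name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread name seen outside and inside a withContext block.
fun threadNames(): Pair<String, String> = runBlocking {
    // Runs on the thread that called runBlocking.
    val outer = Thread.currentThread().name

    // Switches the dispatcher for this block only; execution moves to a
    // worker thread of the Default dispatcher and comes back afterwards.
    val inner = withContext(Dispatchers.Default) {
        Thread.currentThread().name
    }
    outer to inner
}

fun main() {
    val (outer, inner) = threadNames()
    println("outside withContext: $outer")
    println("inside withContext:  $inner")
}
```

The inner name should start with `DefaultDispatcher-worker`, the same thread name prefix that appears in the log output of the controller examples.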
handler starting") GlobalScope.launch { try { setResult(example7(1)) } catch (ex: Throwable) { setErrorResult(ex) } } log.info("controller handler ending") } 19212 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 19240 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending (…) 20266 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException] 21258 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 21258 [DefaultDispatcher-worker-1] INFO intro - doing something with 86 21258 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 22264 [DefaultDispatcher-worker-1] INFO intro - doing something with 129 After 1000 ms the request times out and a 503 response is produced. However, the coroutine started with GlobalScope.launch keeps running.
handler starting") this.launch { try { setResult(example7(1)) } catch (ex: Throwable) { log.info("catched '{}'", ex.message) setErrorResult(ex) } } log.info("controller handler ending") } 12127 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting 12152 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending 12157 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 13168 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture 13170 [DefaultDispatcher-worker-1] INFO intro - doing something with 43 13170 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture 14029 [http-nio-8080-exec-2] INFO DemoApplication - DeferredResult timed out 14035 [DefaultDispatcher-worker-2] INFO DemoApplication - catched 'Job was cancelled' 14045 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException] • Any inner coroutines will also be cancelled • As long as scope is properly propagated • I.e. GlobalScope is not used
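The propagation of cancellation to inner coroutines can be checked in isolation. The sketch below assumes kotlinx.coroutines and replaces the Spring-specific parts with a plain `CoroutineScope`; the scope-per-request setup and the function name are illustrative assumptions, not the talk's actual controller code.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if cancelling the scope also cancelled the inner coroutine.
fun innerIsCancelledWithScope(): Boolean = runBlocking {
    // Stand-in for a scope tied to one request.
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val innerRef = CompletableDeferred<Job>()

    val outer = scope.launch {
        // Launched on the outer coroutine's scope, so it becomes its child.
        val inner = launch { delay(60_000) }
        innerRef.complete(inner)
        delay(60_000)
    }

    val inner = innerRef.await()
    scope.cancel()   // e.g. what a request timeout callback would trigger
    outer.join()     // returns once the outer coroutine and its children are done
    inner.isCancelled
}

fun main() {
    println("inner cancelled: ${innerIsCancelledWithScope()}")
}
```

If the inner coroutine were started with `GlobalScope.launch` instead, it would not be a child of `outer` and would keep running after the scope is cancelled.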
= await loadImage(...) doSomethingWith(img1, img2) val img1 = loadImage(...) val img2 = loadImage(...) doSomethingWith(img1, img2) var f1 = loadImage(...) var f2 = loadImage(...) doSomethingWith(await f1, await f2) val f1 = async { loadImage(...) } val f2 = async { loadImage(...) } doSomethingWith(f1.await(), f2.await()) Kotlin C#/JS Sequential (yet asynchronous) Parallel Based on example present in https://qconsf.com/sf2017/system/files/presentation-slides/2017_qcon_sf_-_fresh_async_with_kotlin.pdf by Roman Elizarov Concurrency is explicit
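The difference between the two Kotlin variants can be observed by recording the order of events. The sketch below assumes kotlinx.coroutines; `loadImage` is a hypothetical stand-in that only delays and logs, and the delays are arbitrary.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for loadImage: logs when it starts and ends.
suspend fun loadImage(name: String, millis: Long, log: MutableList<String>): String {
    log.add("start $name")
    delay(millis)
    log.add("end $name")
    return name
}

// Direct calls: sequential by default, the second load starts after the first ends.
fun sequential(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val img1 = loadImage("a", 50, log)
    val img2 = loadImage("b", 100, log)
    log.add("use $img1 $img2")
    log
}

// Explicit concurrency: both loads are started with async before either is awaited.
fun parallel(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    coroutineScope {
        val f1 = async { loadImage("a", 50, log) }
        val f2 = async { loadImage("b", 100, log) }
        val img1 = f1.await()
        val img2 = f2.await()
        log.add("use $img1 $img2")
    }
    log
}

fun main() {
    println(sequential()) // [start a, end a, start b, end b, use a b]
    println(parallel())   // [start a, start b, end a, end b, use a b]
}
```

Everything here runs on the single thread used by `runBlocking`, so the second variant shows overlapping waits rather than multi-threaded execution; using a multi-threaded dispatcher would require a thread-safe log.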