Identity and Access Management, .NET, JVM
• Professor at the Lisbon Polytechnic Institute
  • Concurrent Programming and Web Application Development
• Working in HTTP-based systems for the last 7 years
  • Mostly asynchronous implementations
  • .NET: APM, Task-based Asynchronous Pattern, async-await
  • JVM: RxJava and Reactive Streams
by JetBrains
• Excellent JVM ecosystem integration
• Android official language, since 2017
• Spring Boot 2.0 support, also since 2017
• Version 1.3 in October 2018
  • Coroutines are now a stable feature
I/O
External dependency: 500 RPS, 2 seconds latency
• Synchronous: 1000 = 2 x 500 threads blocked on the outbound I/O
  • ~1 GB just for stack memory
• Asynchronous: async I/O, no thread blocked on I/O
Sequential asynchronous flows
• Asynchronous implementations of synchronous communication patterns
• Will not necessarily make any single operation faster
  • Asynchronous I/O will be slower than synchronous I/O
• Better
  • Throughput
  • Responsiveness
  • Resource usage – thread usage
  • Stability
    var acc = input
    for (i in 0..2) {
        val result = blockingOperation(acc)
        acc = doSomethingWith(result)
    }
    return acc
}
• A loop …
• … with a blocking operation inside it
• … sequential
    for (i in 0..2) {
        val future = asyncOperationReturningFuture(acc)
        future.thenAccept { result ->
            acc = doSomethingWith(result)
        }
        // acc = doSomethingWith(result)
    }
    return acc
}
Breaks control flow!
structure of imperative code
• Functions created by sequences of statements
• With complex control logic (ifs, fors, try-catches)
Futures and Reactive Streams (Observables)
• Function composition and not statement sequences
• E.g. map, flatMap, zip
• Impose a different programming style
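To make the "function composition and not statement sequences" point concrete, here is a minimal sketch of the three-iteration loop written with CompletableFuture composition. The bodies of asyncOperationReturningFuture and doSomethingWith are hypothetical stand-ins (the slides only name them), and exampleWithComposition is an illustrative name.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-ins for the operations named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(value: Int): Int = value * 2

// The loop no longer sequences statements over values: each iteration
// chains two more stages onto the future produced by the previous one.
fun exampleWithComposition(input: Int): CompletableFuture<Int> {
    var acc: CompletableFuture<Int> = CompletableFuture.completedFuture(input)
    for (i in 0..2) {
        acc = acc
            .thenCompose { value -> asyncOperationReturningFuture(value) }
            .thenApply { result -> doSomethingWith(result) }
    }
    return acc
}

fun main() {
    // join() blocks the thread; acceptable only at the edge of a demo program.
    println(exampleWithComposition(1).join()) // prints 22
}
```

The control flow is correct again, but the loop body now manipulates futures rather than values, which is the different programming style the slide refers to.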
… Int): CompletableFuture<Int> {
    var acc = input
    for (i in 0..2) {
        val result = await asyncOperationReturningFuture(acc)
        acc = doSomethingWith(result)
    }
    return acc
}
await operator
• Transforms a future<T> into a T
• Waits for the response
• Without blocking the host thread
future<T> return type
• Even if return is a T
• Internal detail not visible in the function type
return future<T> (e.g. Task<T> or Promise)
• Needs await to access the value from the future<T>
[Diagram: a chain of three async functions, each returning a future<T> that its caller accesses with await]
… points for suspending and resuming execution at certain locations.”
In https://en.wikipedia.org/wiki/Coroutine

“The coroutine notion can greatly simplify the conception of a program when its modules do not communicate with each other synchronously”
In M. Conway, “Design of a Separable Transition-Diagram Compiler”, Communications of the ACM, 1963.
… = input
    for (i in 0..2) {
        val result = suspendFunctionReturningValue(acc)   // suspension point
        acc = doSomethingWith(result)
    }
    return acc
}
• Returns an Int and not a future<Int>
• suspend is part of the signature, not an implementation detail

suspend fun caller1(): Int {
    …
    val res: Int = example(someInt)
    …
}

suspend fun caller2(): Int {
    …
    val res: Int = caller1()
    …
}
Look, no awaits!
… input
    for (i in 0..2) {
        val future = asyncOperationReturningFuture(acc)
        val result = suspendCoroutine<Int> { continuation ->
            future.thenAccept { result ->
                continuation.resume(result)
            }
        }
        acc = doSomethingWith(result)
    }
    return acc
}
Execution order:
1. suspendCoroutine is called
2. future.thenAccept registers the callback and the coroutine suspends
3. … when the future completes, continuation.resume(result) is called
4. suspendCoroutine returns and val result = … is assigned
5. acc = doSomething(…) runs
… suspendCoroutine { continuation ->
        whenComplete { value, error ->
            if (error != null)
                continuation.resumeWithException(error)
            else
                continuation.resume(value)
        }
    }

suspend fun example6(input: Int): Int {
    var acc = input
    for (i in 0..2) {
        val result = asyncOperationReturningFuture(acc).await()
        acc = doSomethingWith(result)
    }
    return acc
}
await is an extension function (and not a language operator)
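The fragment above can be completed into a small self-contained program. This is a sketch under assumptions: the signature of await is a plausible reconstruction (the slide's own declaration is cut off), the two helper operations have hypothetical bodies, and runExample is an illustrative helper that starts the suspend function with the standard library only.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Bridge from a CompletableFuture to a suspend call: suspend the coroutine and
// resume it with the value, or with the error, when the future completes.
// Assumed signature; only the body appears on the slide.
suspend fun <T> CompletableFuture<T>.await(): T =
    suspendCoroutine { continuation ->
        whenComplete { value, error ->
            if (error != null) continuation.resumeWithException(error)
            else continuation.resume(value)
        }
    }

// Hypothetical stand-ins for the operations named on the slides.
fun asyncOperationReturningFuture(input: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { input + 1 }

fun doSomethingWith(value: Int): Int = value * 2

suspend fun example(input: Int): Int {
    var acc = input
    for (i in 0..2) {
        val result = asyncOperationReturningFuture(acc).await()
        acc = doSomethingWith(result)
    }
    return acc
}

// Starts the suspend function from plain code and blocks for its result.
// Blocking is acceptable here only because this is a demo entry point.
fun runExample(input: Int): Int {
    val done = CompletableFuture<Int>()
    val body: suspend () -> Int = { example(input) }
    body.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { result ->
        result.fold({ done.complete(it) }, { done.completeExceptionally(it) })
    })
    return done.join()
}

fun main() {
    println(runExample(1)) // prints 22
}
```

Because await is an ordinary extension function, the same pattern can be written for any future-like type that exposes a completion callback.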
[Diagram: suspend functions calling each other and exchanging plain T values]
• await extension function: bridge to future-like objects
• suspendCoroutine function: bridge to callback-based functions
Non-suspend callers
• The compilation of a suspend function uses Continuation Passing Style (CPS)
• The underlying function signature is changed:
    fun(input: Int, continuation: Continuation<Int>): Object
Coroutine builders
• Use coroutine builders to call a suspend function, i.e., create and start a coroutine, from a plain function (or from another suspend function)
• A suspend function defines a state machine
• A coroutine is an instance of that state machine
• A coroutine builder creates and starts the instance
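The "create" and "start" steps can be seen separately with the low-level primitives in the Kotlin standard library. This is only a sketch of what a builder does internally, not how the kotlinx.coroutines builders are implemented; answer, events and createAndStart are hypothetical names.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

// Records what happens, so the order of events can be inspected.
val events = mutableListOf<String>()

// A suspend function: in the slide's terms, the state machine definition.
suspend fun answer(): Int {
    events += "body running"
    return 42
}

fun createAndStart(): List<String> {
    events.clear()
    val body: suspend () -> Int = { answer() }
    // Invoked when the coroutine finishes, with its result or its error.
    val completion = Continuation<Int>(EmptyCoroutineContext) { result ->
        events += "completed with ${result.getOrNull()}"
    }
    // Create the coroutine (an instance of the state machine) without running it.
    val coroutine: Continuation<Unit> = body.createCoroutine(completion)
    events += "created"
    // Start it: the body runs until its first suspension point or its completion.
    coroutine.resume(Unit)
    events += "returned to caller"
    return events.toList()
}

fun main() {
    createAndStart().forEach(::println)
}
```

Since answer never suspends, the body and the completion callback both run inside resume(Unit), before control returns to the caller.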
Coroutine builders

builder       suspend block   return                 notes
launch        () -> Unit      Job                    Similar to Task
async         () -> T         Deferred<T>            Similar to Task<T>
future        () -> T         CompletableFuture<T>   JDK8
runBlocking   () -> T         T                      Blocking

On a plain function, a coroutine is represented by a future-like object
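A minimal sketch of three of the builders from the table, assuming the kotlinx-coroutines-core library is on the classpath. The example function here is a hypothetical stand-in, and the future builder is left out because it needs the JDK 8 integration.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspend function standing in for the slides' example.
suspend fun example(input: Int): Int {
    delay(100) // suspends the coroutine without blocking a thread
    return input * 2
}

// runBlocking blocks the calling thread until the block and its children complete.
fun runBuilders(): Int = runBlocking {
    // launch: the result is discarded, the coroutine is represented by a Job.
    val job: Job = launch { example(1) }
    // async: the result is kept, the coroutine is represented by a Deferred<Int>.
    val deferred: Deferred<Int> = async { example(2) }
    job.join()       // wait for completion only
    deferred.await() // wait for completion and get the value
}

fun main() {
    println(runBuilders()) // prints 4
}
```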
… starting")
    example(1)
    log.info("controller handler ending")
}
Error: Kotlin: suspend function ‘…’ should be called only from a coroutine or another suspend function
… handler starting")
    GlobalScope.launch {
        try {
            setResult(example(1))
        } catch (ex: Throwable) {
            setErrorResult(ex)
        }
    }
    log.info("controller handler ending")
}
• DeferredResult is a future-like type from Spring
• The coroutine completes the DeferredResult
… handler starting")
    GlobalScope.launch {
        try {
            setResult(example7(1))
        } catch (ex: Throwable) {
            setErrorResult(ex)
        }
    }
    log.info("controller handler ending")
}

19212 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting
19240 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending
(…)
20266 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]
21258 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
21258 [DefaultDispatcher-worker-1] INFO intro - doing something with 86
21258 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
22264 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
22264 [DefaultDispatcher-worker-1] INFO intro - doing something with 129

• After 1000 ms the request times out and a 503 response is produced
• However, the coroutine started with GlobalScope.launch keeps running
Scope: lifecycle management
A coroutine is always created in the context of a scope
• GlobalScope – similar to a daemon thread
• Specific scope
… completes when descendants complete
Cancellation propagates to children
Error propagates to parent
• Completion synchronization
• Error and cancellation propagation
… handler starting")
    this.launch {
        try {
            setResult(example7(1))
        } catch (ex: Throwable) {
            log.info("catched '{}'", ex.message)
            setErrorResult(ex)
        }
    }
    log.info("controller handler ending")
}

12127 [http-nio-8080-exec-1] INFO DemoApplication - controller handler starting
12152 [http-nio-8080-exec-1] INFO DemoApplication - controller handler ending
12157 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
13168 [DefaultDispatcher-worker-1] INFO intro - after asyncOperationReturningFuture
13170 [DefaultDispatcher-worker-1] INFO intro - doing something with 43
13170 [DefaultDispatcher-worker-1] INFO intro - before asyncOperationReturningFuture
14029 [http-nio-8080-exec-2] INFO DemoApplication - DeferredResult timed out
14035 [DefaultDispatcher-worker-2] INFO DemoApplication - catched 'Job was cancelled'
14045 [http-nio-8080-exec-2] WARN DefaultHandlerExceptionResolver - Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]

• Any inner coroutines will also be cancelled
• As long as the scope is properly propagated
• I.e. GlobalScope is not used
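The propagation of cancellation to inner coroutines can be reproduced without Spring. The sketch below assumes kotlinx-coroutines-core is available; the function name, the delays and the flag are hypothetical, with delay(10_000) standing in for a slow external call.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns true if cancelling the parent coroutine also cancelled its child.
fun cancelParentAndObserveChild(): Boolean = runBlocking {
    var childCancelled = false
    val parent = launch {
        // Launched in the parent's scope (not in GlobalScope), so it is a child.
        launch {
            try {
                delay(10_000) // stands in for a slow external call
            } catch (ex: CancellationException) {
                childCancelled = true
                throw ex // rethrow so the cancellation is not swallowed
            }
        }
    }
    delay(100)             // give both coroutines time to start
    parent.cancelAndJoin() // cancel the parent, then wait for it and its children
    childCancelled
}

fun main() {
    println(cancelParentAndObserveChild()) // prints true
}
```

Replacing the inner launch with GlobalScope.launch would detach the inner coroutine from the parent, which is the situation shown in the earlier log where the work keeps running after the timeout.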
… requires proper synchronization
• Beware of the traditional synchronization primitives
  • e.g. Semaphore
  • They block threads!
• Mutex interface with a suspend lock method
• Channel interface with suspend send and suspend receive methods
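A minimal sketch of the suspending Mutex protecting shared mutable state, assuming kotlinx-coroutines-core. The function name and the counter are hypothetical; withLock is the library helper that pairs the suspending lock with unlock.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

// Increments a shared counter from many coroutines running on several threads.
fun countWithMutex(coroutines: Int, increments: Int): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    // withContext returns only after all coroutines launched inside it complete.
    withContext(Dispatchers.Default) {
        repeat(coroutines) {
            launch {
                repeat(increments) {
                    // Waiting for the lock suspends the coroutine;
                    // the underlying thread stays free to run other coroutines.
                    mutex.withLock { counter++ }
                }
            }
        }
    }
    counter
}

fun main() {
    println(countWithMutex(100, 1000)) // prints 100000
}
```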
… coroutine
• Use channels to communicate between coroutines
• Actor-like programming model
  • Still experimental in Kotlin 1.3
• And remember
  • A coroutine doesn’t block threads
  • We can have thousands of coroutines
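A minimal sketch of two coroutines communicating through a channel instead of sharing state, assuming kotlinx-coroutines-core. The function name and the values sent are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One coroutine produces numbers, another consumes them; no state is shared.
fun sumThroughChannel(n: Int): Int = runBlocking {
    val channel = Channel<Int>()
    // Producer: send suspends until the consumer is ready to receive.
    launch {
        for (i in 1..n) channel.send(i)
        channel.close() // signals that no more values will be sent
    }
    // Consumer: iteration suspends while waiting and ends when the channel closes.
    var sum = 0
    for (value in channel) sum += value
    sum
}

fun main() {
    println(sumThroughChannel(5)) // prints 15
}
```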
Sequential non-blocking

  C#/JS:
    …
    doSomethingWith(img1, img2)

  Kotlin:
    val img1 = loadImage(...)
    val img2 = loadImage(...)
    doSomethingWith(img1, img2)

Parallel

  C#/JS:
    var f1 = loadImage(...)
    var f2 = loadImage(...)
    doSomethingWith(await f1, await f2)

  Kotlin:
    val f1 = async { loadImage(...) }
    val f2 = async { loadImage(...) }
    doSomethingWith(f1.await(), f2.await())

Based on example present in https://qconsf.com/sf2017/system/files/presentation-slides/2017_qcon_sf_-_fresh_async_with_kotlin.pdf by Roman Elizarov

• Default behavior is sequential
• Concurrency is explicit
• Concurrency is structured
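The two Kotlin variants can be run side by side. This sketch assumes kotlinx-coroutines-core; loadImage and doSomethingWith have hypothetical bodies that use strings and a 200 ms delay in place of real image loading.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for the functions named on the slide.
suspend fun loadImage(name: String): String {
    delay(200) // simulated non-blocking I/O
    return "image:$name"
}

fun doSomethingWith(img1: String, img2: String): String = "$img1+$img2"

// Default behavior: the second load starts only after the first one returns.
suspend fun sequential(): String {
    val img1 = loadImage("a")
    val img2 = loadImage("b")
    return doSomethingWith(img1, img2)
}

// Explicit concurrency: both loads are started before either is awaited.
// coroutineScope makes it structured: it returns only when both children
// are done, and a failure in one cancels the other.
suspend fun concurrent(): String = coroutineScope {
    val f1 = async { loadImage("a") }
    val f2 = async { loadImage("b") }
    doSomethingWith(f1.await(), f2.await())
}

fun main() = runBlocking {
    val sequentialMs = measureTimeMillis { println(sequential()) }
    val concurrentMs = measureTimeMillis { println(concurrent()) }
    // The concurrent version is expected to take roughly half the time.
    println("sequential: $sequentialMs ms, concurrent: $concurrentMs ms")
}
```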