Upgrade to PRO for Only $50/Year—Limited-Time Offer! 🔥

Spring-Flavored Kotlin Coroutines

Spring-Flavored Kotlin Coroutines

Kotlin has built-in support for asynchronous programming with Kotlin Coroutines that are designed to allow for simple and easy-to-understand code. We'll witness that reduction in complexity that coroutines bring and dissect what kind of magic is working behind the scenes to make it possible, how it all integrates with Spring and how can you integrate coroutines with any kind of asynchronous code you write.

Roman Elizarov

October 08, 2019
Tweet

More Decks by Roman Elizarov

Other Decks in Programming

Transcript

  1. Speaker: Roman Elizarov Professional developer since 2000 Previously developed high-perf

    trading software @ Devexperts Teach concurrent & distributed programming @ St. Petersburg ITMO University Chief judge @ Northern Eurasia Contest / ICPC Now team lead in Kotlin Libraries @ JetBrains elizarov @ relizarov
  2. ET 1 ET 2 ET N … DB Old-school client-server

    monolith Clients Executor Threads
  3. ET 1 ET 2 ET N … DB Incoming request

    Clients Executor Threads
  4. ET 1 ET 2 ET N … DB Blocks tread

    Clients Executor Threads
  5. ET 1 ET 2 ET N … DB Sizing threads

    – easy Clients N = number of DB connections Executor Threads
  6. ET 1 ET 2 ET N … DB Old-school client-server

    monolith Clients Executor Threads
  7. ET 1 ET 2 ET N … DB Now with

    Services Service Clients Executor Threads
  8. ET 1 ET 2 ET N … Services everywhere …

    Service K Service 1 Service 2 Clients Executor Threads
  9. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) … }
  10. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) … }
  11. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } … }
  12. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } … }
  13. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  14. Complex business logic fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  15. What if a service is slow? fun placeOrder(order: Order): Response

    { val account = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  16. Slow service @Service class MarginService { private val rest =

    RestTemplate() fun loadMargin(account: Account): Margin = rest.getForObject( "http://localhost:9090/margin/${account.id}", Margin::class.java)!! }
  17. Slow service @Service class MarginService { private val rest =

    RestTemplate() fun loadMargin(account: Account): Margin = rest.getForObject( "http://localhost:9090/margin/${account.id}", Margin::class.java)!! }
  18. ET 2 ET N … Clients Blocks threads … Service

    K Service 1 Service 2 ET 1 Executor Threads
  19. ET 2 ET N … Clients Blocks threads … Service

    K Service 1 Service 2 ET 1 Executor Threads
  20. ET 2 ET N … Clients Blocks threads … Service

    K Service 1 Service 2 ET 1 Executor Threads
  21. ET 2 ET N … Clients Instead of blocking… …

    Service K Service 1 Service 2 ET 1 Executor Threads
  22. ET 2 ET N … Release the thread … Service

    K Service 1 Service 2 ET 1 Clients Executor Threads
  23. Clients ET 2 ET N … Resume operation later …

    Service K Service 1 Service 2 ET 1 Executor Threads
  24. The logic fun placeOrder(order: Order): Response { val account =

    accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  25. Going reactive fun placeOrder(order: Order): Mono<Response> = accountService.loadAccount(order.accountId) .flatMap {

    account -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } }
  26. Going reactive fun placeOrder(order: Order): Mono<Response> = accountService.loadAccount(order.accountId) .flatMap {

    account -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } }
  27. Going reactive fun placeOrder(order: Order): Mono<Response> = accountService.loadAccount(order.accountId) .flatMap {

    account -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } } .map { margin -> … }
  28. Going reactive fun placeOrder(order: Order): Mono<Response> = accountService.loadAccount(order.accountId) .flatMap {

    account -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } } .map { margin -> validateOrder(order, margin) }
  29. Reactive Service @Service class MarginService { private val client =

    WebClient.create("http://localhost:9090/") fun loadMargin(account: Account): Mono<Margin> = client .get() .uri("margin/${account.id}") .retrieve() .bodyToMono(Margin::class.java) }
  30. Reactive Service @Service class MarginService { private val client =

    WebClient.create("http://localhost:9090/") fun loadMargin(account: Account): Mono<Margin> = client .get() .uri("margin/${account.id}") .retrieve() .bodyToMono(Margin::class.java) }
  31. Choice 1: Direct code, blocking fun placeOrder(order: Order): Response {

    val account = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  32. Choice 2: Complex code, non-blocking fun placeOrder(order: Order): Mono<Response> =

    accountService.loadAccount(order.accountId) .flatMap { account -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } } .map { margin -> validateOrder(order, margin) }
  33. Direct code: Blocking fun placeOrder(order: Order): Response { val account

    = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) }
  34. Direct code: Coroutines suspend fun placeOrder(order: Order): Response { val

    account = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) } Write regular code! Call suspending funs
  35. Coroutines Service @Service class MarginService { private val client =

    WebClient.create("http://localhost:9090/") suspend fun loadMargin(account: Account): Margin = client .get() .uri("margin/${account.id}") .retrieve() .awaitBody() }
  36. Coroutines Service @Service class MarginService { private val client =

    WebClient.create("http://localhost:9090/") suspend fun loadMargin(account: Account): Margin = client .get() .uri("margin/${account.id}") .retrieve() .awaitBody() }
  37. Suspend behind the scenes suspend fun loadMargin(account: Account): Margin fun

    loadMargin(account: Account, cont: Continuation<Margin>) But why callback and not future/mono?
  38. Suspend is efficient suspend fun placeOrder(order: Order): Response { val

    account = accountService.loadAccout(order.accountId) val margin = if (account.isOptionsAccount) { marginService.loadMargin(account) } else { defaultMargin } return validateOrder(order, margin) } One object allocated
  39. Reactive fun placeOrder(order: Order): Mono<Response> = accountService.loadAccount(order.accountId) .flatMap { account

    -> if (account.isOptionsAccount) { marginService.loadMargin(account) } else { Mono.just(defaultMargin) } } .map { margin -> validateOrder(order, margin) } Lambda allocated* Mono allocated Lambda allocated Mono allocated
  40. Let’s go deeper fun placeOrder(params: Params): Mono<Response> { // check

    pre-conditions return actuallyPlaceOrder(order) } fun actuallyPlaceOrder(order: Order): Mono<Response>
  41. Let’s go deeper (with coroutines) suspend fun placeOrder(params: Params): Response

    { // check pre-conditions return actuallyPlaceOrder(order) } suspend fun actuallyPlaceOrder(params: Params): Response Tail call optimization Tail call
  42. ET 2 ET N … Clients Thread pools ET 1

    ST 2 ST M1 … S1 1 N = number of CPU cores M1 = depends Service 1 Threads Executor Threads
  43. IO-bound withContext suspend fun loadAccount(order: Order): Account = withContext(dispatcher) {

    // some blocking code here.... } val dispatcher = Executors.newFixedThreadPool(M2).asCoroutineDispatcher()
  44. CPU-bound code suspend fun validateOrder(order: Order, margin: Margin): Response =

    withContext(compute) { // perform CPU-consuming computation } val compute = Executors.newFixedThreadPool(M3).asCoroutineDispatcher()
  45. ET 2 ET N … Clients Fine-grained control and encapsulation

    ET 1 S1 1 ST M1 Service 2 Threads S1 1 ST M2 Service 3 Threads S1 1 ST M3 Async IO-bound CPU- bound Never blocked Service 1 Threads Executor Threads
  46. withTimeout propagation suspend fun placeOrder(order: Order): Response = withTimeout(1000) {

    // code before loadMargin(account) // code after } suspend fun loadMargin(account: Account): Margin = suspendCoroutine { cont -> // install callback & use cont to resume }
  47. withTimeout propagation suspend fun placeOrder(order: Order): Response = withTimeout(1000) {

    // code before loadMargin(account) // code after } suspend fun loadMargin(account: Account): Margin = suspendCancellableCoroutine { cont -> // install callback & use cont to resume }
  48. withTimeout propagation suspend fun placeOrder(order: Order): Response = withTimeout(1000) {

    // code before loadMargin(account) // code after } suspend fun loadMargin(account: Account): Margin = suspendCancellableCoroutine { cont -> // install callback & use cont to resume cont.invokeOnCancellation { … } }
  49. Example fun placeOrder(order: Order): Response { val account = accountService.loadAccount(order)

    val margin = marginService.loadMargin(order) return validateOrder(order, account, margin) }
  50. Example fun placeOrder(order: Order): Response { val account = accountService.loadAccount(order)

    val margin = marginService.loadMargin(order) return validateOrder(order, account, margin) } No data dependencies
  51. Concurrency with async (futures) fun placeOrder(order: Order): Response { val

    account = accountService.loadAccountAsync(order) val margin = marginService.loadMarginAsync(order) return validateOrder(order, account.await(), margin.await()) }
  52. Concurrency with async (futures) fun placeOrder(order: Order): Response { val

    account = accountService.loadAccountAsync(order) val margin = marginService.loadMarginAsync(order) return validateOrder(order, account.await(), margin.await()) }
  53. Concurrency with async (futures) fun placeOrder(order: Order): Response { val

    account = accountService.loadAccountAsync(order) val margin = marginService.loadMarginAsync(order) return validateOrder(order, account.await(), margin.await()) } Fails?
  54. Concurrency with async (futures) fun placeOrder(order: Order): Response { val

    account = accountService.loadAccountAsync(order) val margin = marginService.loadMarginAsync(order) return validateOrder(order, account.await(), margin.await()) } Fails? Leaks !
  55. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) }
  56. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) }
  57. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) }
  58. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) } Fails?
  59. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) } Fails? Cancels
  60. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) } Fails? Cancels Cancels
  61. Concurrency with coroutines suspend fun placeOrder(order: Order): Response = coroutineScope

    { val account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } validateOrder(order, account.await(), margin.await()) } Waits for completion of all children
  62. Without coroutine scope? suspend fun placeOrder(order: Order): Response { val

    account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } return validateOrder(order, account.await(), margin.await()) }
  63. Without coroutine scope? suspend fun placeOrder(order: Order): Response { val

    account = async { accountService.loadAccount(order) } val margin = async { marginService.loadMargin(order) } return validateOrder(order, account.await(), margin.await()) } ERROR: Unresolved reference.
  64. Extensions of CoroutineScope fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext,

    start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
  65. Types as documentation fun foo(params: Params): Response suspend fun foo(params:

    Params): Response fun CoroutineScope.foo(params: Params): Response Fast, local Remote, or slow Side effect - bg
  66. Types are enforced fun foo(params: Params): Response suspend fun foo(params:

    Params): Response fun CoroutineScope.foo(params: Params): Response Not allowed But must provide scope explicitly Using coroutineScope { … } Fast, local Remote, or slow Side effect - bg
  67. Green threads / Fibers ET 2 ET N … ET

    1 F 2 F M … Fibers F 1 ~ Coroutines Hidden from developer Executor Threads
  68. ET 2 ET N … Clients Threads ET 1 S1

    1 ST M1 S1 1 ST M2 S1 1 ST M3 Service 2 Threads Service 3 Threads Service 1 Threads Executor Threads
  69. ET 2 ET N … Clients Solution – shared thread

    pool ET 1 ET N+1 Executor Threads
  70. ET 2 ET N … Clients Solution – shared thread

    pool ET 1 ET N+1 ET N+2 Executor Threads
  71. ET 2 ET N … Clients Solution – shared thread

    pool ET 1 ET N+1 … ET M ET N+2 ET N+M Executor Threads
  72. withContext for IO suspend fun loadAccount(order: Order): Account = withContext(dispatcher)

    { // some blocking code here.... } val dispatcher = Executors.newFixedThreadPool(M2).asCoroutineDispatcher()
  73. withContext for Dispatсhers.IO suspend fun loadAccount(order: Order): Account = withContext(Dispatchers.IO)

    { // some blocking code here.... } No thread switch from Dispatchers.Default pool
  74. ET 2 ET N … Clients Solution – shared thread

    pool ET 1 Dispatchers.Default Executor Threads
  75. ET 2 ET N … Clients Solution – shared thread

    pool ET 1 ET N+1 … ET M ET N+2 ET N+M Dispatchers.Default Dispatchers.IO Executor Threads
  76. Returning many responses suspend fun foo(params: Params): Response One response

    suspend fun foo(params: Params): List<Response> Many responses
  77. Returning many responses suspend fun foo(params: Params): Response One response

    suspend fun foo(params: Params): List<Response> Many responses fun foo(params: Params): Flow<Response> Many responses async
  78. fun foo(): Flow<Int> = flow { for (i in 1..10)

    { emit(i) delay(100) } } suspend fun main() { foo().collect { x -> println(x) } }
  79. fun foo(): Flow<Int> = flow { for (i in 1..10)

    { emit(i) delay(100) } } suspend fun main() { foo() } Flow is cold: describes the data, does not run it until collected
  80. Flow operators fun foo(): Flow<Int> = flow { for (i

    in 1..10) { emit(i) delay(100) } } suspend fun main() { foo() .map { it * it } .toList() }
  81. A A’ mapper fun map(mapper: (T) -> R): Flux<R> fun

    flatMap(mapper: (T) -> Publisher<R>): Flux<R> Synchronous Asynchronous Flux<T>
  82. A A’ mapper fun map(mapper: (T) -> R): Flux<R> fun

    flatMap(mapper: (T) -> Publisher<R>): Flux<R> fun filter(predicate: (T) -> Boolean): Flux<T> A A predicate Synchronous Asynchronous Synchronous Asynchronous Flux<T> fun filterWhen(predicate: (T) -> Publisher<Boolean>): Flux<T>
  83. A A’ transform fun map(transform: suspend (T) -> R): Flow<R>

    Flow<T> fun filter(predicate: suspend (T) -> Boolean): Flow<T> A predicate A
  84. Operator avoidance startWith(value) onStart { emit(value) } delaySubscription(time) onStart {

    delay(time) } startWith(flow) onStart { emitAll(flow) } delayElements(time) onEach { delay(time) } onErrorReturn(value) catch { emit(value) } onErrorResume(flow) catch { emitAll(flow) } generate(…) flow { … } Composable
  85. Spring ❤ Flow @GetMapping("/sse/{n}", produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun greetings(@PathVariable n:

    String): Flow<Greeting> = flow { while(true) { emit(Greeting("Hello from coroutines $n @ ${Instant.now()}")) delay(1000) } }