Coroutines In Action

Slides from my talk at Kotlin Night Kyiv, 2018-03-24
Video: https://www.youtube.com/watch?v=Jthnqb-eTII

Channels, Actors, RX streams, Generators and custom coroutines

Dmytro Zaitsev

March 24, 2018

Transcript

  1. AGENDA
     ▸ Channels
     ▸ Channels vs Rx streams
     ▸ Actors
     ▸ Launch
     ▸ Generators
     ▸ Write your custom coroutines
  2. CHANNEL
     ▸ The channel concept is similar to a blocking queue
     ▸ Has suspending operations instead of blocking ones
     ▸ Can be closed
     ▸ Send and receive operations can suspend the caller
     ▸ The send operation can suspend when the channel is full
     ▸ The receive operation can suspend when the channel is empty
  3. runBlocking {
         val channel = Channel<Int>()
         launch {
             for (x in 0..9) channel.send(x)
             channel.close()
         }
         for (y in channel) println(y)
         println("Done!")
     }
  6. SendChannel<E>
         suspend fun send(E)
         fun offer(E): Boolean
         fun close(Throwable)
     ReceiveChannel<E>
         suspend fun receive(): E
         fun poll(): E?
         fun close(Throwable)
     Channel<E>
  7. SEND CHANNEL
     ▸ send(element: E)
       adds the element to this channel, suspending the caller while this channel is full
     ▸ offer(element: E): Boolean
       adds the element to the channel if possible, or returns false if the channel is full
     ▸ close(cause: Throwable? = null)
       closes the channel
  8. RECEIVE CHANNEL
     ▸ receive(): E
       retrieves and removes an element from this channel, suspending the caller while this channel is empty
     ▸ poll(): E?
       returns an element if one is available, or null if the channel is empty
  9. RENDEZVOUS CHANNEL (WITH NO BUFFER)
     ▸ every send invocation is suspended until someone else invokes receive (unless there is some receive operation already suspended)
     ▸ every receive invocation is suspended until someone else invokes send (unless there is some send operation already suspended)
  10. ARRAY CHANNEL (WITH FIXED SIZE BUFFER)
      ▸ send is suspended only if the buffer is full
      ▸ receive is suspended only if the buffer is empty
  11. LINKEDLIST CHANNEL (WITH UNLIMITED CAPACITY)
      ▸ send is never suspended
      ▸ receive is suspended when the buffer is empty
  12. CONFLATED CHANNEL (BUFFERS AT MOST ONE ELEMENT)
      ▸ conflates all subsequent send invocations
      ▸ send is never suspended, but a new element overrides any older element still waiting to be received
      ▸ receive is suspended when the buffer is empty
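The four buffering strategies above can be sketched with the current kotlinx.coroutines API. This is an illustration under assumptions rather than code from the talk: it assumes kotlinx.coroutines 1.5 or newer, where the `Channel(capacity)` factory selects the flavour and `trySend` plays the role of the `offer` shown in the slides. The helper names `acceptedWithoutReceiver` and `conflatedKeepsLatest` are made up for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Offers three elements while no receiver is running and reports
// which of them the channel accepted without suspending.
fun acceptedWithoutReceiver(capacity: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return listOf(1, 2, 3).map { channel.trySend(it).isSuccess }
}

// A conflated channel keeps only the most recently sent element.
fun conflatedKeepsLatest(): Int = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    for (x in 1..3) channel.send(x) // never suspends, each send replaces the previous element
    channel.receive()
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // [false, false, false]
    println(acceptedWithoutReceiver(2))                  // [true, true, false]
    println(acceptedWithoutReceiver(Channel.UNLIMITED))  // [true, true, true]
    println(acceptedWithoutReceiver(Channel.CONFLATED))  // [true, true, true]
    println(conflatedKeepsLatest())                      // 3
}
```

A `false` from `trySend` marks the point where a suspending `send` would have had to wait.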
  13. SIMILAR CONCEPT
      ▸ Reactive streams (Publisher), RxJava (Observable), RxJava 2 (Flowable)
      ▸ Asynchronous stream
      ▸ Backpressure support
      ▸ Channel always represents a hot stream
  14. fun main(args: Array<String>) = runBlocking {
          val FROM = 0
          val TO = 10
          val channel = produce(coroutineContext) {
              print("Start: ")
              (FROM until TO).forEach { send(it) }
              println("End.")
          }
          val flowable = Flowable.range(FROM, TO)
              .doOnSubscribe { print("Start: ") }
              .doFinally { println("End.") }
          println("~~~ Round ONE ~~~")
          println("~~~ MC Channel ~~~")
          for (i in channel) { print("$i ") }
          println("~~~ MC Flowable ~~~")
          flowable.subscribe { print("$it ") }
          println("~~~ Round TWO ~~~")
          println("~~~ MC Channel ~~~")
          for (i in channel) { print("$i ") }
          println("~~~ MC Flowable ~~~")
          flowable.subscribe { print("$it ") }
          println("~~~ VERSUS ~~~")
      }
  18. ~~~ Round ONE ~~~
      ~~~ MC Channel ~~~
      Start: 0 1 2 3 4 5 6 7 8 9 End.
      ~~~ MC Flowable ~~~
      Start: 0 1 2 3 4 5 6 7 8 9 End.
      ~~~ Round TWO ~~~
      ~~~ MC Channel ~~~
      ~~~ MC Flowable ~~~
      Start: 0 1 2 3 4 5 6 7 8 9 End.
      ~~~ VERSUS ~~~
      ???
  19. CONCLUSION
      ▸ Channel is a stream of elements itself
      ▸ Rx defines a recipe on how the stream of elements is produced
      ▸ Each subscriber may receive the same or a different stream of elements
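The "stream itself" versus "recipe" distinction can be illustrated with the Kotlin standard library alone. This is only an analogy and uses neither channels nor Rx: a `Sequence` stands in for the recipe (the producing block reruns for every consumer) and a single `Iterator` stands in for the stream (elements are gone once consumed). The names `coldRecipe` and `drain` are hypothetical helpers for this sketch.

```kotlin
// The "recipe": nothing runs until someone iterates, and each iteration restarts the block.
fun coldRecipe(log: MutableList<String>): Sequence<Int> = sequence {
    log += "Start"
    for (x in 0 until 3) yield(x)
    log += "End"
}

// Consumes whatever is still left in an iterator.
fun <T> drain(iterator: Iterator<T>): List<T> {
    val out = mutableListOf<T>()
    while (iterator.hasNext()) out += iterator.next()
    return out
}

fun main() {
    val log = mutableListOf<String>()
    val recipe = coldRecipe(log)
    println(recipe.toList()) // [0, 1, 2]
    println(recipe.toList()) // [0, 1, 2], the block ran again from the start
    println(log)             // [Start, End, Start, End]

    // The "stream itself": one iterator shared by two consumers.
    val stream = recipe.iterator()
    println(drain(stream))   // [0, 1, 2]
    println(drain(stream))   // [], nothing is left for the second consumer
}
```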
  20. ACTORS BASICS
      ▸ A coroutine with an attached channel to send or receive elements to/from other parts of the system
      ▸ Does not share any mutable state
      ▸ Can send messages to other actors
      ▸ Can create new actors
      ▸ Can designate the behavior to be used for the next message it receives
  21. PRODUCER-CONSUMER
      ▸ Producer behavior: a coroutine only sends elements to the channel
      ▸ Consumer behavior or rather an “actor”: a coroutine only receives elements from a channel
  22. ACTORS AND THREAD-SAFETY
      sealed class Expr {
          class Inc : Expr()
          class Dec : Expr()
      }
      fun counter(start: Int) = actor<Expr>(CommonPool) {
          var value = start
          for (expr in channel) {
              when (expr) {
                  is Expr.Inc -> value++
                  is Expr.Dec -> value--
              }
          }
      }
      WHY IS THIS SAFE?
  23. HOW IS THAT ACHIEVED?
      ▸ Execution inside a coroutine is sequential
      ▸ All the operations in the coroutine are totally ordered with “happens before”
  24. THERE IS NO CONCURRENCY IN A COROUTINE
      ▸ Execution in between suspension points follows normal JMM rules, which state that operations on a single thread establish a happens-before relation.
      ▸ Execution around suspension points has a happens-before relation, because it is established by the synchronisation primitives that are used by implementations of suspending functions.
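The claim that actor state needs no locks can be checked with a small experiment. This is a minimal sketch under assumptions, not the slide's code: it assumes a recent kotlinx.coroutines, uses a plain `Channel` plus `launch` instead of the `actor` builder, replaces `CommonPool` with `Dispatchers.Default`, and adds a hypothetical `Get` message so the result can be read back.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

sealed class Expr {
    object Inc : Expr()
    object Dec : Expr()
    class Get(val reply: CompletableDeferred<Int>) : Expr() // hypothetical query message
}

// Starts a coroutine that owns `value`; the returned channel is the only way to reach it.
fun CoroutineScope.counter(start: Int): Channel<Expr> {
    val mailbox = Channel<Expr>()
    launch {
        var value = start // touched by this coroutine only, one message at a time
        for (expr in mailbox) {
            when (expr) {
                is Expr.Inc -> value++
                is Expr.Dec -> value--
                is Expr.Get -> expr.reply.complete(value)
            }
        }
    }
    return mailbox
}

fun runCounter(): Int = runBlocking {
    val mailbox = counter(0)
    // 100 coroutines on a thread pool, each sending 100 increments concurrently.
    withContext(Dispatchers.Default) {
        repeat(100) { launch { repeat(100) { mailbox.send(Expr.Inc) } } }
    }
    val reply = CompletableDeferred<Int>()
    mailbox.send(Expr.Get(reply))
    val result = reply.await()
    mailbox.close() // ends the actor's for loop so runBlocking can return
    result
}

fun main() = println(runCounter()) // 10000
```

No increment is lost even though the senders run on several threads, because every mutation of `value` happens sequentially inside the single receiving coroutine.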
  25. // Custom Actor. Option #1
      class MyActor {
          // your private state here
          suspend fun onReceive(msg: MyMsg) {
              // ... your code here ...
          }
      }
      fun myActorJob(): SendChannel<MyMsg> = actor(CommonPool) {
          with(MyActor()) {
              for (msg in channel) onReceive(msg)
          }
      }
  26. // Custom Actor. Option #2. Specific implementation
      class MyActor(scope: ActorScope<MyMsg>) : Actor<MyMsg>(scope) {
          // your private state here
          override suspend fun onReceive(msg: MyMsg) {
              // ... your code here ...
          }
      }
  27. // Easy-to-read invocation
      fun <T> actorOf(
          context: CoroutineContext,
          block: (ActorScope<T>) -> Actor<T>
      ): SendChannel<T> = actor(context) {
          val instance = block(this@actor)
          for (msg in channel) instance.onReceive(msg)
      }
      actorOf(CommonPool, ::MyActor)
  28. @Dao
      public interface UserDao {
          @Insert
          void insert(User user);
          @Query("SELECT * FROM user WHERE name = :name")
          List<User> findByName(String name);
      }
  29. override fun onCreate(savedInstanceState: Bundle?) {
          super.onCreate(savedInstanceState)
          setContentView(R.layout.activity_main)
          insertButton.onClickAsync { dao.insert(randomUser()) }
          selectButton.onClickPrintResult { dao.findByName("Dmytro Zaitsev") }
          // …
      }
      private fun View.onClickAsync(action: suspend () -> Unit) = setOnClickListener {
          launch(IO) { action() }
      }
      private fun View.onClickPrintResult(action: suspend () -> List<Any>) = setOnClickListener {
          launch(UI) {
              userAdapter.items = withContext(IO) { action() }
              userAdapter.notifyDataSetChanged()
          }
      }
  33. suspend fun <T> withContext( // You might remember it as `run`
          context: CoroutineContext,
          start: CoroutineStart = CoroutineStart.DEFAULT,
          block: suspend () -> T
      ): T
  34. internal fun printLogOldSchool(textView: TextView) {
          val mainHandler = object : Handler(Looper.getMainLooper()) {
              override fun handleMessage(msg: Message) {
                  when (msg.what) {
                      LOG_MSG -> msg.obj?.let { textView.append("\n${it as String}") }
                  }
              }
          }
          execService.submit {
              Runtime.getRuntime()
                  .exec(COMMAND)
                  .inputStream
                  .bufferedReader(Charsets.UTF_8)
                  .forEachLine {
                      val message = Message.obtain(mainHandler, LOG_MSG, it)
                      mainHandler.sendMessage(message)
                  }
          }
      }
  38. internal fun printLogCoroutines(textView: TextView) {
          launch(IO) {
              Runtime.getRuntime()
                  .exec(COMMAND)
                  .inputStream
                  .bufferedReader(Charsets.UTF_8)
                  .forEachLine {
                      launch(UI) { textView.append("\n$it") }
                  }
          }
      }
  40. fun launch(context: CoroutineContext, block: suspend () -> Unit) =
          block.startCoroutine(object : Continuation<Unit> {
              override val context = context
              override fun resume(value: Unit) {}
              override fun resumeWithException(exception: Throwable) =
                  Thread.currentThread().run {
                      uncaughtExceptionHandler.uncaughtException(this, exception)
                  }
          })
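The slide targets the experimental coroutine API of Kotlin 1.2, where `Continuation` had separate `resume` and `resumeWithException` methods. A minimal sketch of the same builder for Kotlin 1.3 or newer follows, assuming only the standard library. There `Continuation` has a single `resumeWith(Result)`, and the `Continuation(context) { ... }` factory builds one from a lambda. The `runDemo` helper and its custom exception handler are additions for illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Fire-and-forget builder: a normal result is ignored,
// a failure goes to the current thread's uncaught-exception handler.
fun launch(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> Unit) =
    block.startCoroutine(Continuation(context) { result ->
        result.onFailure { exception ->
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, exception)
        }
    })

// Hypothetical demo: records what happened instead of letting the failure print a stack trace.
fun runDemo(): List<String> {
    val events = mutableListOf<String>()
    Thread.currentThread().setUncaughtExceptionHandler { _, e -> events += "failed: ${e.message}" }
    launch { events += "ran" }
    launch { error("boom") }
    return events
}

fun main() = println(runDemo()) // [ran, failed: boom]
```

With an empty context there is no dispatcher, so each block runs synchronously on the calling thread until its first suspension point.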
  45. GENERATOR
      ▸ special routine that can be used to control the iteration behaviour of a loop
      ▸ yields the values one at a time
      ▸ looks like a function but behaves like an iterator
  46. val sequence = buildSequence {
          var start = 0
          while (start < 5) {
              yield(start++)
          }
      }
      sequence.take(5).toList()
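`buildSequence` was the name of this builder in Kotlin 1.2; since Kotlin 1.3 the standard library calls it `sequence`. As a small illustration of how a generator yields values one at a time, here is a sketch with an infinite producer that only runs as far as the consumer asks. The `fibonacci` function is an example chosen here, not something from the talk.

```kotlin
// An endless generator: the loop body advances only when the next element is requested.
fun fibonacci(): Sequence<Long> = sequence {
    var a = 0L
    var b = 1L
    while (true) {
        yield(a)
        val next = a + b
        a = b
        b = next
    }
}

fun main() {
    println(fibonacci().take(8).toList()) // [0, 1, 1, 2, 3, 5, 8, 13]
}
```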
  47. interface Generator<out T> {
          fun next(): T?
      }
      @RestrictsSuspension
      interface GeneratorBuilder<in T> {
          suspend fun yield(value: T)
      }
  48. internal class GeneratorCoroutine<T>(override val context: CoroutineContext = EmptyCoroutineContext) :
          Generator<T>, GeneratorBuilder<T>, Continuation<Unit> {
          lateinit var nextStep: () -> Unit
          private var lastValue: T? = null
          private var throwable: Throwable? = null
          override fun next(): T? {
              nextStep()
              throwable?.let { throw it }
              return lastValue
          }
          override suspend fun yield(value: T) = suspendCoroutineOrReturn<Unit> { cont ->
              lastValue = value
              nextStep = { cont.resume(Unit) }
              COROUTINE_SUSPENDED
          }
          override fun resume(value: Unit) {
              lastValue = null
          }
          override fun resumeWithException(exception: Throwable) {
              this.throwable = exception
          }
      }
  54. fun <T> generate(
          block: suspend GeneratorBuilder<T>.() -> Unit
      ): Generator<T> {
          return GeneratorCoroutine<T>().apply {
              val firstStep: suspend () -> Unit = { block() }
              nextStep = { firstStep.startCoroutine(this) }
          }
      }
  55. fun pagination(restService: MyRestService) {
          val gen = generate {
              var hasNext = true
              while (hasNext) {
                  val data = restService.getData()
                  hasNext = data.hasNext()
                  yield(data.items())
              }
          }
          ui.set(gen.next())
      }
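The generator slides use the experimental Kotlin 1.2 intrinsics (`suspendCoroutineOrReturn`, `resume`/`resumeWithException`). A hedged port to the stable API of Kotlin 1.3 or newer might look like the sketch below, using only the standard library. Two details are choices made here, not taken from the talk: the block is started with `startCoroutine(receiver, completion)` directly, and `nextStep` is reset to a no-op on completion so that calling `next()` on an exhausted generator keeps returning `null`.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn

interface Generator<out T> {
    fun next(): T?
}

@RestrictsSuspension
interface GeneratorBuilder<in T> {
    suspend fun yield(value: T)
}

private class GeneratorCoroutine<T> : Generator<T>, GeneratorBuilder<T>, Continuation<Unit> {
    override val context: CoroutineContext = EmptyCoroutineContext
    lateinit var nextStep: () -> Unit
    private var lastValue: T? = null
    private var throwable: Throwable? = null

    override fun next(): T? {
        nextStep()                  // run the block up to its next yield (or to its end)
        throwable?.let { throw it } // rethrow a failure of the block
        return lastValue
    }

    override suspend fun yield(value: T): Unit = suspendCoroutineUninterceptedOrReturn<Unit> { cont ->
        lastValue = value
        nextStep = { cont.resume(Unit) } // remember how to continue after this yield
        COROUTINE_SUSPENDED
    }

    // Called once, when the block finishes normally or with an exception.
    override fun resumeWith(result: Result<Unit>) {
        lastValue = null
        nextStep = {}
        throwable = result.exceptionOrNull()
    }
}

fun <T> generate(block: suspend GeneratorBuilder<T>.() -> Unit): Generator<T> =
    GeneratorCoroutine<T>().apply {
        nextStep = { block.startCoroutine(receiver = this, completion = this) }
    }

fun main() {
    val gen = generate<Int> {
        yield(1)
        yield(2)
    }
    println(listOf(gen.next(), gen.next(), gen.next(), gen.next())) // [1, 2, null, null]
}
```

Because of `@RestrictsSuspension`, the block may only suspend through `yield`, so a call such as `restService.getData()` in the pagination slide has to be an ordinary blocking function.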
  56. MANUALS, EXAMPLES, VIDEOS AND SLIDES:
      ▸ Design documents and examples for coroutines in Kotlin:
        https://github.com/Kotlin/kotlin-coroutines
      ▸ Library support for Kotlin coroutines:
        https://github.com/Kotlin/kotlinx.coroutines