queue
▸ Has suspending operations instead of blocking ones
▸ Can be closed
▸ Send and receive operations can suspend the caller
▸ The send operation can suspend when the channel is full
▸ The receive operation can suspend when the channel is empty
channel, suspending the caller while this channel is full
▸ offer(element: E): Boolean adds an element to the channel if possible, or returns false if the channel is full
▸ close(cause: Throwable? = null) closes the channel
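The operations listed above can be sketched as follows. This is a minimal example that assumes a recent kotlinx.coroutines release, where the non-suspending `offer` has been superseded by `trySend`; the function names `collectSquares` and `tryFillBuffer` are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends three values through a small buffered channel and collects them.
fun collectSquares(): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 2)
    launch {
        // send suspends (instead of blocking a thread) while the buffer is full
        for (x in 1..3) channel.send(x * x)
        channel.close() // lets the receiving for-loop terminate
    }
    val received = mutableListOf<Int>()
    // iteration suspends while the channel is empty and ends once it is closed
    for (value in channel) received += value
    received
}

// Non-suspending attempt to add elements. Assumption: trySend is used here
// as the modern counterpart of offer.
fun tryFillBuffer(): List<Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val first = channel.trySend(1).isSuccess  // buffer has room
    val second = channel.trySend(2).isSuccess // buffer is already full
    channel.close()
    return listOf(first, second)
}

fun main() {
    println(collectSquares()) // [1, 4, 9]
    println(tryFillBuffer())  // [true, false]
}
```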
suspended until someone else invokes receive (unless there is some receive operation already suspended)
▸ every receive invocation is suspended until someone else invokes send (unless there is some send operation already suspended)
subsequent send invocations
▸ send is never suspended, but a new element overwrites any old element waiting to be received
▸ receive is suspended when the buffer is empty
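The difference between these channel flavours can be observed in a few lines. This is a rough sketch assuming the `Channel.CONFLATED` and `Channel.RENDEZVOUS` capacity constants of kotlinx.coroutines; the helper names are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Conflated channel: send never suspends, each new element replaces
// the one still waiting to be received.
fun latestFromConflated(): Int = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    channel.send(1)
    channel.send(2)
    channel.send(3)
    channel.receive() // only the most recent element is still there
}

// Rendezvous channel: there is no buffer, so a non-suspending send attempt
// fails when no receiver is currently waiting.
fun rendezvousWithoutReceiver(): Boolean {
    val channel = Channel<Int>(Channel.RENDEZVOUS)
    return channel.trySend(1).isSuccess
}

fun main() {
    println(latestFromConflated())       // 3
    println(rendezvousWithoutReceiver()) // false
}
```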
val TO = 10
val channel = produce(coroutineContext) {
    print("Start: ")
    (FROM until TO).forEach { send(it) }
    println("End.")
}
// Note: Flowable.range takes (start, count), not (start, end)
val flowable = Flowable.range(FROM, TO)
    .doOnSubscribe { print("Start: ") }
    .doFinally { println("End.") }

println("~~~ Round ONE ~~~")
println("~~~ MC Channel ~~~")
for (i in channel) { print("$i ") }
println("~~~ MC Flowable ~~~")
flowable.subscribe { print("$it ") }

println("~~~ Round TWO ~~~")
println("~~~ MC Channel ~~~")
// The channel was already consumed in round one, so this loop prints nothing
for (i in channel) { print("$i ") }
println("~~~ MC Flowable ~~~")
// The Flowable is cold: every subscription replays the whole range
flowable.subscribe { print("$it ") }
println("~~~ VERSUS ~~~")
}
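The channel half of the comparison above can be isolated into a small sketch without the RxJava dependency. It assumes the `produce` builder of kotlinx.coroutines (still marked experimental in the versions this was written against); `consumeTwice` is a hypothetical name.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Iterates over the same produced channel twice and returns what each pass saw.
@OptIn(ExperimentalCoroutinesApi::class)
fun consumeTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = produce<Int> {
        for (i in 0 until 3) send(i)
    } // the channel is closed automatically when the producer block finishes

    val first = mutableListOf<Int>()
    for (i in channel) first += i

    val second = mutableListOf<Int>()
    // The elements are gone and the channel is closed, so the body never runs
    for (i in channel) second += i

    first to second
}

fun main() {
    println(consumeTwice()) // ([0, 1, 2], [])
}
```

Each element of a channel is delivered to exactly one receiver, which is why the second pass is empty, whereas a cold stream restarts its source for every subscriber.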
or receive some elements to/from other parts of the system.
▸ Does not share any mutable state
▸ Can send messages to other actors
▸ Can create new actors
▸ Can designate the behavior to be used for the next message it receives
Expr()
    class Dec : Expr()
}

fun counter(start: Int) = actor<Expr>(CommonPool) {
    var value = start
    for (expr in channel) {
        when (expr) {
            is Inc -> value++
            is Dec -> value--
        }
    }
}

WHY IS THIS SAFE?
between suspension points follows the normal Java Memory Model (JMM) rules, which state that operations on a single thread establish a happens-before relation.
▸ Execution around suspension points has a happens-before relation, because it is established by the synchronisation primitives used by the implementations of suspending functions.
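The guarantee described above can be exercised with a counter that many coroutines update at once. This is only a sketch: it assumes the `actor` builder from kotlinx.coroutines (marked obsolete in recent releases) and uses `Dispatchers.Default` where older versions used `CommonPool`; `CounterMsg`, `Increment`, `GetValue` and `countWithActor` are hypothetical names.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

sealed class CounterMsg
object Increment : CounterMsg()
class GetValue(val reply: CompletableDeferred<Int>) : CounterMsg()

@OptIn(ObsoleteCoroutinesApi::class)
fun countWithActor(senders: Int, perSender: Int): Int = runBlocking {
    val counter = actor<CounterMsg>(Dispatchers.Default) {
        var value = 0 // plain variable, touched only by the actor coroutine
        for (msg in channel) {
            when (msg) {
                is Increment -> value++
                is GetValue -> msg.reply.complete(value)
            }
        }
    }

    // Many coroutines on a thread pool send messages concurrently;
    // coroutineScope returns only after all of them have finished.
    coroutineScope {
        repeat(senders) {
            launch(Dispatchers.Default) {
                repeat(perSender) { counter.send(Increment) }
            }
        }
    }

    val reply = CompletableDeferred<Int>()
    counter.send(GetValue(reply))
    val result = reply.await()
    counter.close() // lets the actor finish so runBlocking can return
    result
}

fun main() {
    println(countWithActor(senders = 10, perSender = 1000)) // 10000
}
```

No lock or atomic is needed around `value`: messages are processed one at a time, and the channel hand-off supplies the happens-before edges between consecutive resumptions of the actor, even when they run on different threads.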
private state here
    suspend fun onReceive(msg: MyMsg) {
        // ... your code here ...
    }
}

fun myActorJob(): SendChannel<MyMsg> = actor(CommonPool) {
    with(MyActor()) {
        for (msg in channel) onReceive(msg)
    }
}
for coroutines in Kotlin: https://github.com/Kotlin/kotlin-coroutines ▸ Library support for Kotlin coroutines: https://github.com/Kotlin/kotlinx.coroutines