Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Channels & Flows

Mohit S
August 07, 2019

Channels & Flows

In this talk, I present how to use constructs such as actors, channels, and flows in practical use cases. These use cases include modeling streams with channels and flows, and using actors to ensure that only one concurrent job is running. Channels and Flows are experimental features that allow us to represent a stream of data. A Flow represents a cold stream, whereas a Channel is like a hot observable. We’ll dive into these topics.

Mohit S

August 07, 2019
Tweet

More Decks by Mohit S

Other Decks in Programming

Transcript

  1. What is a Channel? • Similar to BlockingQueue.
 interface Channel<E>

    { suspend fun send(element: E) suspend fun receive(): E }
  2. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() }
  3. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() } Parent Coroutine
  4. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() } Child Coroutine
  5. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() }
  6. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() }
  7. Using Channels runBlocking {
 val channel = Channel<Int>() launch {

    channel.send(1) } val num = channel.receive() }
  8. Creating Channel fun <E> Channel(capacity: Int = RENDEZVOUS) = when

    (capacity) { RENDEZVOUS -> RendezvousChannel() UNLIMITED -> LinkedListChannel() CONFLATED -> ConflatedChannel() BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) else -> ArrayChannel(capacity) }
  9. Rendezvous Channel runBlocking { val channel = Channel<Int>() launch {

    channel.send(1) channel.send(2) }
 channel.receive() } Receiver
  10. 
 
 $queueDebugStateString get() {
 if (head === queue) return

    "EmptyQueue" } Query Channel State EmptyQueue
  11. 
 
 
 $queueDebugStateString get() {
 when (head) {
 is

    Closed<*> -> head.toString()
 is Receive<*> -> "ReceiveQueued"
 is Send -> "SendQueued" else -> "UNEXPECTED:$head" } } Query Channel State SendElement
  12. 
 
 
 $queueDebugStateString get() {
 if (tail !== head)

    { result += ",queueSize=${countQueueSize()}" } } Query Channel State SendElement Buffer SendElement SendElement
  13. Idiomatic Channel Creation runBlocking {
 val channel = Channel<Int>() launch

    { for (i in 1..10) { channel.send(i) } } } Producer
  14. Produce Builder fun <E> CoroutineScope.produce( context: CoroutineContext, capacity: Int =

    0, block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
  15. Produce Builder fun <E> CoroutineScope.produce(…) { val channel = Channel<E>(capacity)

    val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(ctx, channel) coroutine.start(…, coroutine, block) return coroutine }
  16. Channel Iterator fun hasNextSuspend() = suspendCancellable { cont -> while

    (true) { val result = channel.pollInternal() this.result = result if (result is Closed<*>) {
 cont.resume(false) } else { cont.resume(true) } …
 } }
  17. runBlocking { val channel = Channel<Int>(capacity = 3) launch {

    channel.send(1) channel.send(2) channel.send(3) } } Buffered Channel 1 2 3 Buffer
  18. runBlocking { val channel = Channel<Int>(capacity = 3) launch {

    channel.send(1) channel.send(2) channel.send(3)
 channel.send(4) } } Buffered Channel SUSPEND 1 2 3 Buffer
  19. Unlimited Channel runBlocking { val channel = Channel<Int>(Channel.UNLIMITED) launch {

    channel.send(1) channel.send(2) channel.send(3) } } SendBuffered(1) SendBuffered(2) SendBuffered(3)
  20. Conflated Channel 
 val channel = Channel<Int>(Channel.CONFLATED) channel.send(1) channel.send(2) //

    Conflate previously sent 
 println(channel.receive()) Output:
 2
  21. Channels Rendezvous • Queue size = 1 • Buffer: None

    Array Channel • Queue size = 1 • Buffer: Array Linked List Channel • Queue size = Unlimited • Buffer: Queue Conflated • Queue size = 1 • Buffer: Queue
  22. Tips for using Channels • Query channel queue for debugging.

    • Channel type determines back pressure strategy to use.
  23. Problem class MyViewModel(val interactor: Interactor) { val scope = CoroutineScope(Dispatchers.Main)

    fun refresh() { scope.launch { val response = interactor.getData() handleResponse(response) } } }
  24. Problem class MyViewModel(val interactor: Interactor) { val scope = CoroutineScope(Dispatchers.Main)

    fun refresh() { scope.launch { val response = interactor.getData() handleResponse(response) } } } Ensure 1 concurrent job runs
  25. Check Job State class MyViewModel(val interactor: Interactor) { val job:

    Job? = null fun refresh() { if (job?.isCompleted == false) return
 val job = scope.launch { val response = interactor.getData() handleResponse(response) } } }
  26. fun <E> CoroutineScope.actor( capacity: Int = 0, block: suspend ActorScope<E>.()

    -> Unit ): SendChannel<E> { val channel = Channel<E>(capacity) val coroutine = ActorCoroutine(ctx, channel, true) coroutine.start(start, coroutine, block) return coroutine } Inside Actor
  27. fun CoroutineScope.modelActor() = 
 
 actor<ActorEvents>(capacity = Channel.CONFLATED) { for

    (event in channel) { when(event) { is ActorEvents.LoadData -> apiResponse = interactor.getData() }
 } } Using Actor
  28. What is a Flow? • Cold stream interface Flow<out T>

    { suspend fun collect(collector: FlowCollector<T>) }
  29. What is a Flow? interface Flow<out T> { suspend fun

    collect(collector: FlowCollector<T>) } Consume stream
  30. What is a Flow? interface Flow<out T> { suspend fun

    collect(collector: FlowCollector<T>) } What to consume?
  31. flowOf Builder fun <T> flowOf(vararg elements: T): Flow<T> { object

    : Flow<T> { override suspend fun collect(collector: FlowCollector<T>) { for (element in elements) { collector.emit(element) } } } }
  32. Single Operator Collects only one value. val flow = flowOf(1,2,3)

    val num: Int = flow.single() java.lang.IllegalStateException: Expected only one element
  33. DelayEach runBlocking { val flow = getFlow() flow.collect { println(it)

    } } fun getFlow() = flowOf(1,2,3).delayEach(1000)
  34. Flow Constraints Emit from the same coroutine val flow =

    flow { launch(Dispatchers.IO) { emit(1) } } Flow invariant is violated: emission from another coroutine is detected
  35. Channel Flow Builder val flow = channelFlow { launch(Dispatchers.IO) {

    send(1) } launch(Dispatchers.IO) { send(2) } } Flow Channel
  36. Channel Flow Builder class ChannelFlowBuilder<T>( val block: suspend ProducerScope<T>.() ->

    Unit, capacity: Int = BUFFERED ) : ChannelFlow<T>(context, capacity) Uses Buffered Channel
  37. Flow Context runBlocking { val flow = flow { emit(1)

    emit(2) emit(3) } flow .map { it + 1 } .collect { println(it) } }
  38. Flow Context runBlocking { val flow = flow { emit(1)

    emit(2) emit(3) } flow .map { it + 1 } .collect { println(it) } } Dispatchers.Main Dispatchers.Main
  39. Flow Context runBlocking { val flow = flow { emit(1)

    emit(2) emit(3) } flow .map { it + 1 } .flowOn(Dispatchers.IO) .collect { println(it) } } Dispatchers.IO Dispatchers.IO Dispatchers.Main
  40. Exception Transparency sealed class ApiResponse<T> { data class Success<T>(val data:

    T): ApiResponse<T>() data class Error<T>(val t: Throwable): ApiResponse<T>() }
  41. Exception Transparency val flow = flow<ApiResponse<Data>> { val data =

    makeRequest() emit(ApiResponse.Success(data)) }.catch { emit(ApiResponse.Error(it)) }
  42. Debugging (JVM) runBlocking<Unit> { DebugProbes.install() val channel = Channel<Int>() launch

    { channel.send(2) }
 DebugProbes.dumpCoroutines() } Coroutine StandaloneCoroutine{Active}, state: SUSPENDED Install Debugger View Coroutines State