The Joys of (Z)Streams

Itamar Ravid

May 28, 2020

Transcript

  1. ZIO's Chunk
    • Immutable, array-backed
    • Keeps primitives unboxed (!!!)
    • O(1) concatenation, O(1) slicing
    • Extremely fast single-element append
    Itamar Ravid - @iravid_
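
To make the "array-backed, unboxed, O(1) slicing" bullets concrete, here is a minimal plain-Scala sketch. `IntSlice` and `ChunkSketch` are hypothetical names invented for illustration; this is not ZIO's `Chunk` implementation, only the general idea of a read-only window over a shared primitive array.

```scala
object ChunkSketch {

  /** A read-only window over a shared Array[Int] (hypothetical, not ZIO's Chunk). */
  final class IntSlice private (backing: Array[Int], offset: Int, val length: Int) {

    def apply(i: Int): Int = {
      if (i < 0 || i >= length) throw new IndexOutOfBoundsException(i.toString)
      backing(offset + i) // Array[Int] stores raw ints, so nothing is boxed here
    }

    // O(1): no elements are copied, only the window bounds change
    def slice(from: Int, until: Int): IntSlice = {
      val lo = math.max(0, math.min(from, length))
      val hi = math.max(lo, math.min(until, length))
      new IntSlice(backing, offset + lo, hi - lo)
    }

    def toList: List[Int] = List.tabulate(length)(apply)
  }

  object IntSlice {
    // Copy once on construction so later mutation of the caller's array cannot leak in
    def fromArray(xs: Array[Int]): IntSlice = new IntSlice(xs.clone(), 0, xs.length)
  }
}
```

Because the structure is immutable after construction, slices can safely share the backing array, which is what makes slicing constant-time.
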
  2. Streaming super-powers
    With bounded memory:
        ZStream.fromFile(file, chunkSize = 16384)
          .transduce(utf8Decode >>> splitLines)
          .foreach(console.putStrLn)
  3. Parallelism and pipelining
        val numbers = 1 to 1000
        for {
          primes <- ZIO.foreachParN(numbers, 20)(isPrime)
          _      <- ZIO.foreachParN(primes, 20)(moreHardWork)
        } yield ()
  4. Parallelism
        val numbers = 1 to 1000
        ZStream
          .fromIterable(numbers)
          .mapMPar(20)(isPrime)
          .mapMPar(20)(moreHardWork)
    Now we get pipelining!
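
The difference between running two whole stages back to back and pipelining them can be seen in the order in which work happens. The sketch below is a sequential, plain-Scala analogy with hypothetical names (`PipeliningSketch`, `stageByStage`, `pipelined`); it uses no ZIO and no parallelism, and only records which stage touches which element when.

```scala
import scala.collection.mutable.ListBuffer

object PipeliningSketch {

  // Strict List: stage 1 finishes for every element before stage 2 starts
  def stageByStage(xs: List[Int]): List[String] = {
    val log = ListBuffer.empty[String]
    xs.map { x => log += s"stage1($x)"; x }
      .map { x => log += s"stage2($x)"; x }
      .foreach(_ => ())
    log.toList
  }

  // Iterator: each element flows through both stages before the next is pulled
  def pipelined(xs: List[Int]): List[String] = {
    val log = ListBuffer.empty[String]
    xs.iterator
      .map { x => log += s"stage1($x)"; x }
      .map { x => log += s"stage2($x)"; x }
      .foreach(_ => ())
    log.toList
  }
}
```

For `List(1, 2)`, `stageByStage` records `stage1(1), stage1(2), stage2(1), stage2(2)`, while `pipelined` records `stage1(1), stage2(1), stage1(2), stage2(2)`. The stream version on the slide has the second shape, with up to 20 elements in flight per stage.
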
  5. Fibers and Queues
    It's very tempting to do this:
        for {
          input  <- Queue.bounded(16)
          middle <- Queue.bounded(16)
          output <- Queue.bounded(16)
          _      <- writeToInput(input).fork
          _      <- processBetweenQueues(input, middle).fork
          _      <- processBetweenQueues(middle, output).fork
          _      <- printElements(inputQueue).fork
        } yield ()
  6. Multiple-callback hell
    Rabbit recommends its driver be used in push mode:
        val channel: Channel
        channel.basicConsume("queue", autoAck = false, "consumer tag",
          new DefaultConsumer(channel) {
            def handleDelivery(body: Array[Byte]): Unit = ???
            def handleShutdown(): Unit = ???
            def handleCancel(): Unit = ???
          }
        )
  7. Multiple-callback hell
    Stuff our entire logic into the callback?
        channel.basicConsume("queue", autoAck = false, "consumer tag",
          new DefaultConsumer(channel) {
            def handleDelivery(body: Array[Byte]): Unit =
              deserialize(body) match { }
          }
        )
  8. Multiple-callback hell
    Enqueue somewhere?
        channel.basicConsume("queue", autoAck = false, "consumer tag",
          new DefaultConsumer(channel) {
            def handleDelivery(body: Array[Byte]): Unit =
              queue.offer(body)
          }
        )
  9. Escaping multiple callback hell
    The ZStream solution for this is ZStream.effectAsync:
        ZStream.effectAsync[Any, Throwable, Array[Byte]] { cb =>
          channel.basicConsume(
            new DefaultConsumer(channel) {
              def handleDelivery(body: Array[Byte]): Unit = cb(ZIO.succeed(body))
              def handleShutdown(): Unit = cb(ZIO.fail(None))
            }
          )
        }
  10. Escaping multiple callback hell
    From that call, we get a plain old:
        ZStream[Any, Throwable, Array[Byte]]
    Which we can freely compose as usual. No more callback contortions!
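
The idea behind turning callbacks into a stream can be sketched without any library: the callback pushes into a buffer, and the consumer pulls from it. Everything below is hypothetical and for illustration only. `Consumer` stands in for the driver's callback interface, `asIterator` for the bridging function, and the buffer is an unbounded `LinkedBlockingQueue`; it does not show how `ZStream.effectAsync` is implemented.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CallbackBridgeSketch {

  /** Hypothetical stand-in for a push-mode driver's callback interface. */
  trait Consumer {
    def handleDelivery(body: Array[Byte]): Unit
    def handleShutdown(): Unit
  }

  /** Registers a callback via `subscribe` and exposes the deliveries as a pull-based Iterator. */
  def asIterator(subscribe: Consumer => Unit): Iterator[Array[Byte]] = {
    // Some(body) is a delivery, None marks the end of the stream
    val buffer = new LinkedBlockingQueue[Option[Array[Byte]]]()

    subscribe(new Consumer {
      def handleDelivery(body: Array[Byte]): Unit = buffer.put(Some(body))
      def handleShutdown(): Unit = buffer.put(None)
    })

    Iterator
      .continually(buffer.take()) // blocks until the callback delivers something
      .takeWhile(_.isDefined)     // stop at the shutdown marker
      .collect { case Some(body) => body }
  }
}
```

Once the callback is hidden behind a pull-based interface, downstream code composes with ordinary combinators (`map`, `filter`, and so on) and never sees the callback. This sketch blocks a thread while waiting and has no backpressure, which is the kind of detail a real streaming library has to handle.
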
  11. Escaping multiple callback hell
    effectAsyncInterrupt can specify how to cancel the subscription:
        ZStream.effectAsyncInterrupt[Any, Throwable, Array[Byte]] { cb =>
          val consumerTag = channel.basicConsume(new DefaultConsumer )
          Left(UIO(channel.basicCancel(consumerTag)))
        }
  12. Adaptive batching
    Databases love batches. And with streams, we can easily batch things up for consumption in a database:
        val dataStream: Stream[Throwable, Record]
        dataStream
          .groupedWithin(2000, 30.seconds) // Stream[Throwable, List[Record]]
          .mapM(insertRows)
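
As a rough model of "emit a batch when it is full or when enough time has passed", here is a pure plain-Scala sketch over timestamped events. `BatchingSketch` and `batchBySizeOrTime` are hypothetical names, timestamps are plain milliseconds, and the time window is only checked when the next event arrives. It is a simplification for illustration and makes no claim about the exact timer semantics of `groupedWithin`.

```scala
object BatchingSketch {

  /** Groups (timestampMillis, value) events into batches closed by size or by elapsed time. */
  def batchBySizeOrTime[A](
    events: List[(Long, A)],
    maxSize: Int,
    windowMillis: Long
  ): List[List[A]] = {
    val batches     = List.newBuilder[List[A]]
    var current     = Vector.empty[A]
    var windowStart = 0L

    for ((ts, a) <- events) {
      // Time limit: close the open batch if its window has elapsed
      if (current.nonEmpty && ts - windowStart >= windowMillis) {
        batches += current.toList
        current = Vector.empty
      }
      // A new batch's window starts at its first element
      if (current.isEmpty) windowStart = ts
      current = current :+ a
      // Size limit: close the batch as soon as it is full
      if (current.size == maxSize) {
        batches += current.toList
        current = Vector.empty
      }
    }

    if (current.nonEmpty) batches += current.toList // flush the remainder
    batches.result()
  }
}
```

With `maxSize = 2` and `windowMillis = 1000`, the events `(0, "a"), (10, "b"), (20, "c"), (5000, "d")` produce the batches `List("a", "b")`, `List("c")`, `List("d")`: the first closes on size, the second on time, the third on end of input.
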
  13. Adaptive batching
    Databases love batches. And with streams, we can easily batch things up for consumption in a database:
        val dataStream: Stream[Throwable, Record]
        dataStream
          .aggregateAsyncWithin(
            ZTransducer.collectAllN(2000),
            Schedule.fixed(30.seconds)
          )
          .mapM(insertRows)
  14. Adaptive batching
    This approach favours throughput over latency. What if we want to balance the two?
  15. Adaptive batching
    We can ship things off as long as the path is clear:
        val dataStream: Stream[Throwable, Record]
        dataStream
          .aggregateAsync(ZTransducer.collectAllN(2000))
          .mapM(insertRows)
  16. Adaptive batching
    Thanks to Schedule, we can construct a sophisticated batch schedule:
        val schedule: Schedule[Clock, List[Record], Unit] =
          // Start off with 30 second timeouts as long as batch size is < 1000
          ZSchedule.fixed(30.seconds).whileInput(_.size < 1000) andThen
          // and then, switch to a shorter, jittered schedule
          ZSchedule.fixed(5.seconds).jittered
            // for as long as batches remain over 1000
            .whileInput(_.size >= 1000)
  17. Adaptive batching
    Thanks to Schedule, we can construct a sophisticated batch schedule:
        val schedule: Schedule[Clock, List[Record], Unit] =
          ZSchedule.fixed(30.seconds).whileInput(_.size < 1000) andThen
          ZSchedule.fixed(5.seconds).jittered.whileInput(_.size >= 1000)
        dataStream
          .aggregateAsyncWithin(collectAllN(2000), schedule)
          .mapM(insertRows)
  18. Application composition
    Long-running applications are often composed of multiple components:
        val kafkaStream: ZStream[Blocking, Throwable, Record]
        val httpServer: Task[Nothing]
        val scheduledJobRunner: Task[Nothing]
  19. Application composition
    What should our main fiber do with all of these?
    • Launch and wait so our app doesn't exit prematurely
    • Interrupt everything when SIGTERM is received
    • Watch all the fibers and quickly exit if something goes wrong
  20. Application composition
    Here's an approach that doesn't work:
        val main =
          kafkaConsumer.runDrain.fork *>
            httpServer.fork *>
            scheduledJobRunner.fork *>
            ZIO.never
  21. Application composition
    So let's watch the fibers:
        val managedApp =
          for {
            kafka <- kafkaConsumer.runDrain.forkManaged
            http  <- httpServer.forkManaged
            jobs  <- scheduledJobRunner.forkManaged
          } yield ZIO.raceAll(kafka.await, List(http.await, jobs.await))
        val main = managedApp
          .use(identity)
          .flatMap(ZIO.done(_))
  22. Application composition
        val managedApp =
          for {
            _ <- other resources
            _ <- ZStream.mergeAllUnbounded(
                   kafkaConsumer.drain,
                   ZStream.fromEffect(httpServer),
                   ZStream.fromEffect(scheduledJobRunner)
                 ).runDrain.toManaged_
          } yield ()
        val main = managedApp.use_(ZIO.unit).exitCode
  23. Want to learn more?
    Stream Processing with Scala
    15-18.6, remote only
    https://bit.ly/2AaEMeB