easily batch things up for consumption in a database: val dataStream: Stream[Throwable, Record] dataStream .groupedWithin(2000, 30.seconds) Stream[Throwable, List[Record]] .mapM(insertRows) Itamar Ravid - @iravid_ 29
easily batch things up for consumption in a database: val dataStream: Stream[Throwable, Record] dataStream .aggregateAsyncWithin( ZTransducer.collectAllN(2000), Schedule.f xed(30.seconds) ) .mapM(insertRows) Itamar Ravid - @iravid_ 30
batch schedule: val schedule: Schedule[Clock, List[Record], Unit] = Start off with 30 second timeouts as long as batch size is < 1000 ZSchedule.f xed(30.seconds).whileInput(_.size < 1000) andThen and then, switch to a shorter, jittered schedule ZSchedule.f xed(5.seconds).jittered for as long as batches remain over 1000 .whileInput(_.size 1000) Itamar Ravid - @iravid_ 33
of these? • Launch and wait so our app doesn't exit prematurely • Interrupt everything when SIGTERM is received • Watch all the fibers and quickly exit if something goes wrong Itamar Ravid - @iravid_ 36