Stream Driven Development - Design your data pipeline with Akka Streams

Akka Streams is a well-known Reactive Streams implementation that helps you build asynchronous, data-intensive applications whose data volumes are not known in advance. But how can you leverage its full power to design complex, Akka-backed reactive pipelines? At HomeAway we devised an approach to this problem that combines elements of Domain-Driven Design (DDD) with the abstraction power of the Akka Streams model. Here we’ll discuss useful patterns to:
- reason about your stream and identify its building blocks
- type-drive the implementation
- handle failure
- instrument your application for logging and monitoring purposes

Stefano Bonetti

April 16, 2018

Transcript

  1. © 2018 HOMEAWAY. ALL RIGHTS RESERVED.

    STREAM STAGES
    - SOURCE (1 output)
    - SINK (1 input)
    - FLOW (1 input, 1 output)
    - FAN-IN (n inputs, 1 output)
    - FAN-OUT (1 input, n outputs)
    - RUNNABLEGRAPH (no input or output)
    - CUSTOM ...
  2. STAGE ARITHMETIC

    Source ~> Flow = Source
    Flow ~> Sink = Sink
    Flow ~> Flow = Flow
    Broadcast ~> Merge = Flow
    Source ~> Sink = RunnableGraph
    Source ~> Flow ~> Sink = RunnableGraph
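The stage arithmetic above can be checked directly against the compiler. The following is a minimal sketch, assuming Akka 2.6 on Scala 2.13 (where an implicit ActorSystem also supplies the Materializer); the object name StageArithmetic and the integer stages are illustrative only. Each declared type is the result of one line of the arithmetic, and demo() runs the composites to show they behave as expected.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object StageArithmetic {
  val numbers: Source[Int, NotUsed] = Source(1 to 3)
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val collect: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]

  // Source ~> Flow = Source
  val doubled: Source[Int, NotUsed] = numbers.via(double)
  // Flow ~> Sink = Sink (Keep.right retains the Sink's Future)
  val doubleThenCollect: Sink[Int, Future[Seq[Int]]] = double.toMat(collect)(Keep.right)
  // Flow ~> Flow = Flow
  val quadruple: Flow[Int, Int, NotUsed] = double.via(double)
  // Broadcast ~> Merge = Flow: fan out to two branches, then fan back in
  val plus100AndTimes100: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val bcast = b.add(Broadcast[Int](2))
    val merge = b.add(Merge[Int](2))
    bcast ~> Flow[Int].map(_ + 100) ~> merge
    bcast ~> Flow[Int].map(_ * 100) ~> merge
    FlowShape(bcast.in, merge.out)
  })
  // Source ~> Flow ~> Sink = RunnableGraph (materializing the Sink's Future)
  val quadrupleAll: RunnableGraph[Future[Seq[Int]]] =
    numbers.via(quadruple).toMat(collect)(Keep.right)

  /** Runs each composite; returns (doubled, doubledViaSink, quadrupled, fannedOut). */
  def demo(): (Seq[Int], Seq[Int], Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("stage-arithmetic")
    try {
      def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
      (
        await(doubled.runWith(collect)),
        await(numbers.runWith(doubleThenCollect)),
        await(quadrupleAll.run()),
        await(numbers.via(plus100AndTimes100).runWith(collect))
      )
    } finally system.terminate()
  }
}
```

Merge emits in arrival order, so only the sorted fan-out result is stable.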
  3. Source[PropertyChange, NotUsed] ~> Flow[PropertyChange, AdwordsChange, NotUsed] ~> Sink[AdwordsChange, Future[Done]] = RunnableGraph
  4. Source[PropertyChange, NotUsed] ~> Flow[PropertyChange, AdwordsChange, NotUsed] ~> Sink[AdwordsChange, Future[Done]] = RunnableGraph[Future[Done]]
  6. SIDE EFFECTS: Error Handling, Logging, Monitoring, ...

    EVENTS: Errors, Audit, ...
  7. SIDE EFFECTS: Error Handling, Logging, Monitoring, ...

    EVENTS: Errors, Audit, ...

    REFERENTIALLY TRANSPARENT STAGES:
    - Flow[In, Either[Error, Out], Mat]
    - Flow[In, (Seq[Audit], Out), Mat]
    - Flow[(Offset, In), (Offset, Out), Mat]
    - ...
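To make the "events instead of side effects" idea concrete, here is a minimal sketch of the first two stage shapes listed above, assuming Akka 2.6 on Scala 2.13. The ParseError and Audit types and the string-parsing logic are hypothetical, chosen only for illustration. Neither stage logs or throws: the error and the audit record are ordinary output values that a later stage (or sink) can decide how to handle.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object TransparentStages {
  // Hypothetical event types, for illustration only
  final case class ParseError(input: String)
  final case class Audit(message: String)

  // Flow[In, Either[Error, Out], Mat]: a failure becomes a value, not an exception
  val parse: Flow[String, Either[ParseError, Int], NotUsed] =
    Flow[String].map(s => Try(s.trim.toInt).toOption.toRight(ParseError(s)))

  // Flow[In, (Seq[Audit], Out), Mat]: audit records travel alongside the element
  val doubleAudited: Flow[Int, (Seq[Audit], Int), NotUsed] =
    Flow[Int].map(n => (Seq(Audit(s"doubled $n")), n * 2))

  /** Runs both stages over small in-memory inputs and returns their outputs. */
  def demo(): (Seq[Either[ParseError, Int]], Seq[(Seq[Audit], Int)]) = {
    implicit val system: ActorSystem = ActorSystem("transparent-stages")
    try {
      val parsed = Source(List("1", "x", " 3")).via(parse).runWith(Sink.seq)
      val audited = Source(List(1, 2)).via(doubleAudited).runWith(Sink.seq)
      (Await.result(parsed, 5.seconds), Await.result(audited, 5.seconds))
    } finally system.terminate()
  }
}
```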
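  9. SOURCE - REACTIVE KAFKA

    import akka.NotUsed
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.kafka.scaladsl.Consumer.Control
    import akka.stream.scaladsl.Source
    import org.apache.kafka.clients.consumer.ConsumerRecord

    def propertySource(config: KafkaConfig): Source[PropertyChange, NotUsed] = {
      def settings(config: KafkaConfig): ConsumerSettings[String, PropertyChange] = ???

      // Reactive Kafka source: emits raw records and materializes a Control
      val kafkaSrc: Source[ConsumerRecord[String, PropertyChange], Control] =
        Consumer.plainSource(settings(config), Subscriptions.topics(config.topic))

      kafkaSrc
        .map(_.value)                          // keep only the domain event
        .mapMaterializedValue { _ => NotUsed } // hide the Kafka-specific Control
    }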
  11. PROCESSING FLOW

    def processingFlow(): Flow[PropertyChange, Either[ValidationError, AdwordsChange], NotUsed] = {
      val service: PropertyProcessingService = ???
      // lift the pure domain function into a stage
      Flow.fromFunction(service.process)
    }
  12. PROCESSING FLOW

    def processingFlow(): Flow[PropertyChange, Either[ValidationError, AdwordsChange], NotUsed] = {
      val validationService: PropertyValidationService = ???
      val transformationService: PropertyTransformationService = ???
      Flow.fromFunction(validationService.validate)
        .async // asynchronous boundary: validation and transformation run in separate actors
        .map(transformationService.transform)
    }
  13. STORING FLOW

    def adwordsFlow(config: AdwordsConfig): Flow[AdwordsChange, Either[StorageError, Stored], NotUsed] = {
      val service: AdwordsService = ???
      // up to config.parallelism concurrent store calls; output order is preserved
      Flow[AdwordsChange].mapAsync(config.parallelism)(service.store)
    }
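  14. STORING FLOW - WITH THROTTLING

    import akka.stream.ThrottleMode
    import scala.concurrent.duration._

    def adwordsFlow(config: AdwordsConfig): Flow[AdwordsChange, Either[StorageError, Stored], NotUsed] = {
      val service: AdwordsService = ???
      Flow[AdwordsChange]
        // at most 50 elements per second, bursts of up to 50; Shaping delays instead of failing
        .throttle(50, per = 1.second, maximumBurst = 50, mode = ThrottleMode.Shaping)
        .mapAsync(config.parallelism)(service.store)
    }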
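A quick way to see what Shaping mode does is to push a few elements through a tight throttle and time it. This is a minimal sketch, assuming Akka 2.6; the rate of 10 per second with a burst of 1 is an arbitrary choice for the demo. The throttle's token bucket starts full, so the first element passes immediately and each further element waits about 100 ms: nothing is dropped and the stream does not fail, it is only slowed down through backpressure.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottleDemo {
  /** Pushes 5 elements through a 10-per-second throttle; returns (output, elapsed millis). */
  def demo(): (Seq[Int], Long) = {
    implicit val system: ActorSystem = ActorSystem("throttle-demo")
    try {
      val start = System.nanoTime()
      val out = Source(1 to 5)
        // Shaping: elements above the rate are delayed, never dropped or failed
        .throttle(10, per = 1.second, maximumBurst = 1, mode = ThrottleMode.Shaping)
        .runWith(Sink.seq)
      val result = Await.result(out, 5.seconds)
      (result, (System.nanoTime() - start) / 1000000)
    } finally system.terminate()
  }
}
```

With ThrottleMode.Enforcing instead, an upstream that exceeds the rate would fail the stream, which is rarely what a call to an external API such as AdWords needs.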
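  15. STORING FLOW - WITH BATCHING

    def adwordsFlow(config: AdwordsConfig): Flow[AdwordsChange, Either[StorageError, Stored], NotUsed] = {
      val service: AdwordsService = ???
      Flow[AdwordsChange]
        // while downstream is busy, accumulate up to 5000 changes into one List
        .batch(max = 5000, seed = List(_))(_ :+ _)
        .throttle(50, per = 1.second, maximumBurst = 50, mode = ThrottleMode.Shaping)
        // elements are now List[AdwordsChange], so the single-element store no longer fits;
        // storeBatch is a hypothetical batch-accepting variant of AdwordsService.store
        .mapAsync(config.parallelism)(service.storeBatch)
    }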
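One property of batch worth keeping in mind: it only aggregates while downstream is backpressuring, so batch sizes depend on how slow the consumer is rather than being fixed. The sketch below, assuming Akka 2.6, stands a throttle in for the slow AdWords calls; the sizes and rates are arbitrary. Every element still arrives exactly once and in order, but the number of batches is well below the number of elements.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BatchDemo {
  /** Batches 100 elements while a throttled downstream applies backpressure. */
  def demo(): Seq[List[Int]] = {
    implicit val system: ActorSystem = ActorSystem("batch-demo")
    try {
      val out = Source(1 to 100)
        // aggregate only while downstream is not ready, at most 10 elements per batch
        .batch(max = 10, seed = (i: Int) => List(i))((acc, i) => acc :+ i)
        // slow consumer: 20 batches per second, so upstream elements pile up into batches
        .throttle(20, per = 1.second, maximumBurst = 1, mode = ThrottleMode.Shaping)
        .runWith(Sink.seq)
      Await.result(out, 10.seconds)
    } finally system.terminate()
  }
}
```

If fixed-size groups are required regardless of downstream speed, grouped or groupedWithin are the usual alternatives.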
  16. ERROR SINK

    def errorSink(cfg: ErrorConfig): Sink[AdwordsStreamError, NotUsed] = {
      val service: AdwordsErrorService = ???
      Sink.foreach[AdwordsStreamError](service.handle)
        .mapMaterializedValue(_ => NotUsed) // discard the Future[Done] completion signal
    }
  17. GRAPH

    def graph(
        source: Source[PropertyChange, NotUsed],
        process: Flow[PropertyChange, Either[ValidationError, AdwordsChange], NotUsed],
        store: Flow[AdwordsChange, Either[StorageError, Stored], NotUsed],
        errorSink: Sink[AdwordsStreamError, NotUsed]): RunnableGraph[Future[Done]] = {
      val processAndDivert: Flow[PropertyChange, AdwordsChange, NotUsed] =
        process via divertErrors(to = errorSink)
      val storeAndDivert: Flow[AdwordsChange, Stored, NotUsed] =
        store via divertErrors(to = errorSink)

      source
        .via(processAndDivert)
        .via(storeAndDivert)
        .toMat(Sink.ignore)(Keep.right) // materialize the completion signal
    }
  21. DIVERT ERRORS

    Flow[Either[E, T], T, M]: each incoming Either[E, T] is split; T values continue downstream, while E values are diverted to a Sink[E, M].
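  22. GRAPH - DIVERT ERRORS

    def divertErrors[T, E, M](to: Sink[E, M]): Flow[Either[E, T], T, M] =
      Flow[Either[E, T]]
        // send Left values to the error sink, keeping the sink's materialized value
        .divertToMat(to.contramap[Either[E, T]](_.left.get), when = _.isLeft)(Keep.right)
        // only Right values reach this point
        .map(_.right.get)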
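To see divertErrors in isolation, here is a runnable sketch, assuming Akka 2.6 (divertToMat needs 2.5.10 or later) on Scala 2.13. It keeps the shape of the slide but extracts the Left and Right values with swap/toOption instead of the projections deprecated in Scala 2.13; the string errors and integer values are placeholders. Because the error sink's materialized value is kept, the test can collect the diverted errors with Sink.seq.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object DivertErrorsDemo {
  // Left values go to `to`; Right values continue downstream
  def divertErrors[T, E, M](to: Sink[E, M]): Flow[Either[E, T], T, M] =
    Flow[Either[E, T]]
      .divertToMat(to.contramap[Either[E, T]](_.swap.toOption.get), when = _.isLeft)(Keep.right)
      .map(_.toOption.get)

  /** Returns (values that continued downstream, errors that were diverted). */
  def demo(): (Seq[Int], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("divert-errors")
    try {
      val inputs: List[Either[String, Int]] = List(Right(1), Left("bad"), Right(2), Left("worse"))
      // Keep.both: (error sink's Future, main sink's Future)
      val (errors, values) = Source(inputs)
        .viaMat(divertErrors[Int, String, Future[Seq[String]]](Sink.seq[String]))(Keep.right)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()
      (Await.result(values, 5.seconds), Await.result(errors, 5.seconds))
    } finally system.terminate()
  }
}
```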
  23. GRAPH - DIVERT ERRORS (before Akka 2.5.10)

    def divertErrors[T, E, M](to: Sink[E, M]): Flow[Either[E, T], T, M] =
      Flow.fromGraph(GraphDSL.create(to) { implicit b => sink =>
        import GraphDSL.Implicits._
        // output port 0 receives Left values, port 1 receives Right values
        val partition = b.add(Partition[Either[E, T]](2, _.fold(_ => 0, _ => 1)))
        val left = b.add(Flow[Either[E, T]].map(_.left.get))
        val right = b.add(Flow[Either[E, T]].map(_.right.get))

        partition ~> left ~> sink
        partition ~> right
        FlowShape(partition.in, right.out)
      })
  24. DOMAIN

    - PropertyChange, AdwordsChange, Stored, ValidationError, StorageError
    - PropertyProcessingService: def process(p: PropertyChange): Either[ValidationError, AdwordsChange]
    - AdwordsService: def store(p: AdwordsChange): Future[Either[StorageError, Stored]]
    - AdwordsErrorService: def handle(p: AdwordsError): Unit
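The domain layer needs no Akka at all. The sketch below is one way it could look in plain Scala; all field names and the SimpleProcessing rule are hypothetical. One detail the graph slide implies: a single Sink[AdwordsStreamError, NotUsed] receives both ValidationError and StorageError, so this sketch assumes both extend a common AdwordsStreamError type.

```scala
import scala.concurrent.Future

// Hypothetical domain model; field names are illustrative, not from the talk
object Domain {
  final case class PropertyChange(propertyId: String, title: String)
  final case class AdwordsChange(propertyId: String, headline: String)
  final case class Stored(propertyId: String)

  // One error hierarchy, so a single error sink can accept both kinds
  sealed trait AdwordsStreamError
  final case class ValidationError(propertyId: String, reason: String) extends AdwordsStreamError
  final case class StorageError(propertyId: String, reason: String) extends AdwordsStreamError

  trait PropertyProcessingService {
    def process(p: PropertyChange): Either[ValidationError, AdwordsChange]
  }

  trait AdwordsService {
    def store(p: AdwordsChange): Future[Either[StorageError, Stored]]
  }

  // A pure, Akka-free implementation: unit-testable without materializing any stream
  object SimpleProcessing extends PropertyProcessingService {
    def process(p: PropertyChange): Either[ValidationError, AdwordsChange] =
      if (p.title.trim.isEmpty) Left(ValidationError(p.propertyId, "empty title"))
      else Right(AdwordsChange(p.propertyId, p.title.take(30)))
  }
}
```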
  25. APPLICATION: Failure Handling, Config Management, Materialization, Stage Creation
  26. Application layer: Graph

    Reactive layer: Sources, Flows, Sinks, Other

    Domain layer: Events, Services, Repositories, Factories
  27. TEST PYRAMID

    - E2E tests
    - Graph tests (Application: Graph)
    - Stage tests (Stages, Other)
    - Domain tests, unit / integration (Domain)
  30. GRAPH TEST

    "graph" should "connect the provided stages" in {
      val changesOfMixedValidity: List[PropertyChange] = ???
      // in-memory stand-ins for the production stages
      val source = Source(changesOfMixedValidity)
      val processingFlow = Flow[PropertyChange].map { ??? }
      val storingFlow = Flow[AdwordsChange].map { ??? }
      val errorSink = Flow[AdwordsStreamError].map { ??? }.to(Sink.ignore)

      graph(source, processingFlow, storingFlow, errorSink).run().futureValue
      // verify side-effects
    }
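The graph test above leaves its stand-in stages as ???. A fully runnable version could look like the following sketch, assuming Akka 2.6 on Scala 2.13; the id-only domain types, the "divisible by 3 / by 5" failure rules and the queue-backed error sink are hypothetical choices made for the example. The graph wiring is the same as on the GRAPH slide. Note that the error sink blueprint is materialized twice (once per divertErrors), so the errors are collected in a thread-safe queue rather than a single Future; nothing here introduces an async boundary, so all diverted errors have been handled by the time the graph's Future[Done] completes.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.{Done, NotUsed}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object GraphTestDemo {
  // Hypothetical minimal domain: ids only, for illustration
  sealed trait AdwordsStreamError
  final case class ValidationError(id: Int) extends AdwordsStreamError
  final case class StorageError(id: Int) extends AdwordsStreamError
  final case class PropertyChange(id: Int)
  final case class AdwordsChange(id: Int)
  final case class Stored(id: Int)

  def divertErrors[T, E, M](to: Sink[E, M]): Flow[Either[E, T], T, M] =
    Flow[Either[E, T]]
      .divertToMat(to.contramap[Either[E, T]](_.swap.toOption.get), when = _.isLeft)(Keep.right)
      .map(_.toOption.get)

  // Same wiring as the GRAPH slide, with type parameters spelled out
  def graph(source: Source[PropertyChange, NotUsed],
            process: Flow[PropertyChange, Either[ValidationError, AdwordsChange], NotUsed],
            store: Flow[AdwordsChange, Either[StorageError, Stored], NotUsed],
            errorSink: Sink[AdwordsStreamError, NotUsed]): RunnableGraph[Future[Done]] = {
    val processAndDivert: Flow[PropertyChange, AdwordsChange, NotUsed] =
      process.via(divertErrors[AdwordsChange, AdwordsStreamError, NotUsed](errorSink))
    val storeAndDivert: Flow[AdwordsChange, Stored, NotUsed] =
      store.via(divertErrors[Stored, AdwordsStreamError, NotUsed](errorSink))
    source.via(processAndDivert).via(storeAndDivert).toMat(Sink.ignore)(Keep.right)
  }

  /** Runs the graph with in-memory stages; returns the errors the error sink received. */
  def demo(): List[AdwordsStreamError] = {
    implicit val system: ActorSystem = ActorSystem("graph-test")
    try {
      // ids divisible by 3 fail validation; surviving ids divisible by 5 fail storage
      val source = Source(1 to 10).map(PropertyChange(_))
      val processingFlow = Flow[PropertyChange].map[Either[ValidationError, AdwordsChange]] { p =>
        if (p.id % 3 == 0) Left(ValidationError(p.id)) else Right(AdwordsChange(p.id))
      }
      val storingFlow = Flow[AdwordsChange].map[Either[StorageError, Stored]] { a =>
        if (a.id % 5 == 0) Left(StorageError(a.id)) else Right(Stored(a.id))
      }
      val received = new ConcurrentLinkedQueue[AdwordsStreamError]()
      val errorSink = Sink.foreach[AdwordsStreamError] { e => received.add(e); () }
        .mapMaterializedValue(_ => NotUsed)

      Await.result(graph(source, processingFlow, storingFlow, errorSink).run(), 5.seconds)
      // verify side effects: drain the queue into an immutable list
      Iterator.continually(received.poll()).takeWhile(_ != null).toList
    } finally system.terminate()
  }
}
```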
  32. STAGE TEST - SOURCE

    "property change source" should "read events from Kafka" in {
      val change: PropertyChange = ???
      val config: KafkaConfig = ???
      val kafka = new EmbeddedKafka(config)
      kafka.publish(config.topic, change)

      val source = propertySource(config)
      source.runWith(TestSink.probe[PropertyChange])
        .request(1)          // signal demand for exactly one element
        .expectNext(change)
    }
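The probe-based style works the same way for any stage, not only Kafka sources. Below is a minimal sketch without Kafka, assuming Akka 2.6 with the akka-stream-testkit module on the classpath; the string-length stage is a placeholder for a real stage under test. The probe lets the test control demand explicitly, so it can check both what is emitted and that nothing is emitted beyond the requested amount before completion.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink

object ProbeDemo {
  /** Drives a stage with explicit demand; throws an AssertionError if an expectation fails. */
  def demo(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("probe-demo")
    try {
      val stageUnderTest = Source(List("a", "bb", "ccc")).via(Flow[String].map(_.length))
      stageUnderTest
        .runWith(TestSink.probe[Int])
        .request(2)        // demand two elements only
        .expectNext(1, 2)
        .request(1)
        .expectNext(3)
        .expectComplete()  // the in-memory source is finite
      true
    } finally system.terminate()
  }
}
```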
  34. STAGE TEST - SOURCE

    "property change source" should "read events from Kafka" in {
      val change: PropertyChange = ???
      val config: KafkaConfig = ???
      val kafka = new EmbeddedKafka(config)
      kafka.publish(config.topic, change)

      val source = propertySource(config)
      // Sink.head completes with the first element (not an Option)
      val result: Future[PropertyChange] = source.runWith(Sink.head)
      result.futureValue shouldBe change
    }
  35. STAGE TEST - SOURCE

    "property change source" should "read events from Kafka" in {
      val change1: PropertyChange = ???
      val change2: PropertyChange = ???
      val config: KafkaConfig = ???
      val kafka = new EmbeddedKafka(config)
      // Kafka preserves ordering only within a partition
      kafka.publish(config.topic, change1)
      kafka.publish(config.topic, change2)

      val source = propertySource(config)
      val result: Future[Seq[PropertyChange]] = source.take(2).runWith(Sink.seq)
      result.futureValue shouldBe Seq(change1, change2)
    }
  36. Thank you.

    Stefano Bonetti, Software Engineer
    @svezfaz @svez_faz
    homeaway.com/careers