Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Field guide to DDD/CQRS using the Scala Type Sy...

Field guide to DDD/CQRS using the Scala Type System and Akka

In this talk we'll explore some techniques you can use when implementing a DDD/CQRS application using Akka and Scala.

First we'll show how we can use the Scala Type System to build a domain model based on case classes and functional paradigms. Contrary to what the common sense dictates, we'll demonstrate that functional programming is a very good fit for domain modeling.

Secondly we'll explore the available options when integrating it with Akka and Akka Persistence.

We'll cover the following points:

- The advantages and pitfalls of using Akka as a messaging subsystem
- How to reliably propagate events from the command side to the query side of your CQRS application
- How to recover from failure
- How to fit it in an asynchronous programming model

We'll wrap-up the talk with a short demo application built on Play and Akka.

Renato Cavalcanti

November 17, 2015
Tweet

More Decks by Renato Cavalcanti

Other Decks in Programming

Transcript

  1. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Fun.CQRS Field guide to

    DDD/CQRS using the Scala Type System and Akka
  2. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Who am I? Renato

    Cavalcanti 
 Independent consultant (mainly Scala)
 Founder of BeScala @renatocaval
 [email protected]
 www.strongtyped.io github.com/rcavalcanti
 github.com/strongtyped
  3. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Credits Christophe Vanfleteren Filip

    Mortier Jeroen Vuerinckx (@jeroenvu) Karel Maesen (@karelmaesen) Peter Mortier (@kwarkk) Peter Rigole (@peterrigole) Philip Van Bauwel (@vanbauph) Natal Vande Casteele Dan Brooke (@kiequoo) - GitHub
  4. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Agenda • DDD /

    CQRS / Event Sourcing (intro) • Akka and asynchrounous programming • Akka Persistence and Event Sourcing • View Projections with Akka Persistence Query • Aggregates / Commands / Events in Scala • Type members: type projections • Partial Functions, lifting and function composition • Magnet Pattern - implicits (bonus)
  5. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka DDD - Aggregate •

    Aggregate is a central DDD concept. • It has a root and zero or more entities and value objects underneath.
  6. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka DDD - Aggregate •

    Aggregate is a central DDD concept. • It has a root and zero or more entities and value objects underneath. • Responsible for the consistency of underneath objects.
  7. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka DDD - Aggregate •

    Aggregate is a central DDD concept. • It has a root and zero or more entities and value objects underneath. • Responsible for the consistency of underneath objects. • You can only modify one Aggregate per transaction.
  8. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka CQRS - Event Sourcing

    • "It is simply the creation of two objects where there was previously only one" - Greg Young
  9. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka CQRS - Event Sourcing

    • "It is simply the creation of two objects where there was previously only one" - Greg Young • Write Model receives Commands and produces Events.
  10. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka CQRS - Event Sourcing

    • "It is simply the creation of two objects where there was previously only one" - Greg Young • Write Model receives Commands and produces Events. • Read Models are created from Events.
  11. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka CQRS - Event Sourcing

    • "It is simply the creation of two objects where there was previously only one" - Greg Young • Write Model receives Commands and produces Events. • Read Models are created from Events. • Events can be stored and replayed.
  12. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • CQRS is Event Driven, but not necessarily implements Event Sourcing
  13. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • CQRS is Event Driven, but not necessarily implements Event Sourcing • in synchronous CQRS:
  14. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • CQRS is Event Driven, but not necessarily implements Event Sourcing • in synchronous CQRS: •tx(Cmd 㱺 Write Model 㱺 Event 㱺 View)
  15. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • CQRS is Event Driven, but not necessarily implements Event Sourcing • in synchronous CQRS: •tx(Cmd 㱺 Write Model 㱺 Event 㱺 View) •tx(Cmd 㱺 Write Model 㱺 Event 㱺 
 View1, View2, View3, View4, …, ViewN)
  16. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • CQRS is Event Driven, but not necessarily implements Event Sourcing • in synchronous CQRS: •tx(Cmd 㱺 Write Model 㱺 Event 㱺 View) •tx(Cmd 㱺 Write Model 㱺 Event 㱺 
 View1, View2, View3, View4, …, ViewN) Won’t scale! Damn blocking!
  17. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • in asynchronous CQRS: •tx(Cmd 㱺 Write Model 㱺 Event) •tx(Event 㱺 View1) •tx(Event 㱺 View2) •tx(Event 㱺 View3)
  18. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Event Driven / Event

    Sourcing • in asynchronous CQRS: •tx(Cmd 㱺 Write Model 㱺 Event) •tx(Event 㱺 View1) •tx(Event 㱺 View2) •tx(Event 㱺 View3) • Introduces Eventual Consistency
  19. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD)
  20. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD) • Write Model receives Commands and produces Events (CQRS)
  21. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD) • Write Model receives Commands and produces Events (CQRS) • Strict Consistency on Write Model side (DDD / CQRS)

  22. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD) • Write Model receives Commands and produces Events (CQRS) • Strict Consistency on Write Model side (DDD / CQRS)
 • Read Models are created from Events (CQRS)
  23. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD) • Write Model receives Commands and produces Events (CQRS) • Strict Consistency on Write Model side (DDD / CQRS)
 • Read Models are created from Events (CQRS) • Eventual Consistency if asynchronous (CQRS / Event Sourcing)
  24. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka • Aggregate is responsible

    for consistency (DDD) • You can only modify one Aggregate per transaction (DDD) • Write Model receives Commands and produces Events (CQRS) • Strict Consistency on Write Model side (DDD / CQRS)
 • Read Models are created from Events (CQRS) • Eventual Consistency if asynchronous (CQRS / Event Sourcing) • Events can be stored and replayed (Event Sourcing)
  25. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka trait Behavior { }

    Basic Operations (non-reactive) trait DomainCommand trait DomainEvent trait Aggregate
  26. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka trait Behavior { }

    Basic Operations (non-reactive) trait DomainCommand trait DomainEvent trait Aggregate // validates a Command and produces an Event def validate(cmd: DomainCommand): DomainEvent
  27. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka trait Behavior { }

    Basic Operations (non-reactive) trait DomainCommand trait DomainEvent trait Aggregate // validates a Command and produces an Event def validate(cmd: DomainCommand): DomainEvent // applies an Event and produces a new Aggregate def applyEvent(evt: DomainEvent): Aggregate
  28. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka trait Behavior { }

    Basic Operations (non-reactive) trait DomainCommand trait DomainEvent trait Aggregate // validates a Command and produces an Event def validate(cmd: DomainCommand): DomainEvent // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // applies an Event and produces a new Aggregate def applyEvent(evt: DomainEvent): Aggregate
  29. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka trait Behavior { }

    Basic Operations (non-reactive) trait DomainCommand trait DomainEvent trait Aggregate // validates a Command and produces an Event def validate(cmd: DomainCommand): DomainEvent // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // applies an Event and produces a new Aggregate def applyEvent(evt: DomainEvent): Aggregate // applies an Event on current Aggregate def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate
  30. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 // first command

    is validated def validate(cmd: DomainCommand): DomainEvent // CreateLottery => LotteryCreated State of Aggregate Event Store
  31. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 S1 // first

    command is validated def validate(cmd: DomainCommand): DomainEvent // CreateLottery => LotteryCreated // applies an Event and produces a new Aggregate def applyEvent(evt: DomainEvent): Aggregate // LotteryCreated => Lottery State of Aggregate Event Store
  32. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 E2 S1 S2

    // applies an Event on current Aggregate def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate // (ParticipantAdded, Lottery) => Lottery // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // (AddParticipant, Lottery) => ParticipantAdded State of Aggregate Event Store
  33. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 E2 E3 S2

    S3 // applies an Event on current Aggregate def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate // (WinnerSelected, Lottery) => Lottery // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // (Run, Lottery) => WinnerSelected State of Aggregate Event Store
  34. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 E2 E3 S3

    // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // (Run, Lottery) => ??? E4 State of Aggregate Event Store
  35. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 E2 E3 S3

    // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // (Run, Lottery) => ??? E4 Invalid Command! Lottery has already a winner! State of Aggregate Event Store
  36. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Watch out! We don’t

    throw exceptions in Scala. We’ll fix it soon! E1 E2 E3 S3 // validates a Command against current Aggregate state def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent // (Run, Lottery) => ??? E4 Invalid Command! Lottery has already a winner! State of Aggregate Event Store
  37. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 S1 // applies

    an Event and produces a new Aggregate // LotteryCreated => Lottery def applyEvent(evt: DomainEvent): Aggregate E1 State of Aggregate Event Store E2 E3 S1
  38. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 State of Aggregate

    Event Store E2 E3 S1 S2 S1 S2 // applies an Event on current Aggregate // (ParticipantAdded, Lottery) => Lottery def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate E2
  39. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 State of Aggregate

    Event Store E2 E3 S2 S3 E3 S2 S3 // applies an Event on current Aggregate // (WinnerSelected, Lottery) => Lottery def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate
  40. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka E1 State of Aggregate

    Event Store E2 E3 S3 E3 S2 S3 // applies an Event on current Aggregate // (WinnerSelected, Lottery) => Lottery def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate
  41. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Basics • We

    send messages to an Actor via its ActorRef. No direct access
  42. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Basics • We

    send messages to an Actor via its ActorRef. No direct access • Mailbox - Actor processes one message at a time
  43. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Basics • We

    send messages to an Actor via its ActorRef. No direct access • Mailbox - Actor processes one message at a time • May have mutable state, but shielded from external world
  44. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Basics • We

    send messages to an Actor via its ActorRef. No direct access • Mailbox - Actor processes one message at a time • May have mutable state, but shielded from external world • An Actor stays in memory until it dies or it’s terminated
  45. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Basics • We

    send messages to an Actor via its ActorRef. No direct access • Mailbox - Actor processes one message at a time • May have mutable state, but shielded from external world • An Actor stays in memory until it dies or it’s terminated • Akka Persistence allows us to save messages and recover state
  46. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg
  47. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor Thread 1 - application SomeMsg AnotherMsg
  48. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg
  49. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor Thread 2 - akka pool SomeMsg AnotherMsg
  50. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  51. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  52. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  53. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  54. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  55. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg Thread 1 - application
  56. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorRef // Actor's receive

    method def receive: Receive = { case msg: SomeMsg => println(s"got a message: $msg") case msg: AnotherMsg => println(s"got another one: $msg") } ActorSystem Mailbox Actor SomeMsg AnotherMsg
  57. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated LotteryCreated E1 Behavior
  58. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated LotteryCreated E1 Lottery S1 Behavior (apply)
  59. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 AddParticipant (message) Behavior
  60. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 AddParticipant Behavior
  61. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 AddParticipant Behavior (validate)
  62. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 ParticipantAdded Behavior
  63. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 ParticipantAdded ParticipantAdded E2 Behavior
  64. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 ParticipantAdded ParticipantAdded E2 S2 Behavior (apply)
  65. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ActorSystem AggregateActor ActorRef Actor’s

    Mailbox Event Store LotteryCreated E1 Lottery S1 ParticipantAdded E2 S2 Behavior
  66. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka AggregateManager [Lottery] AggregateActor Lottery.id

    = 1 • Entry point for a given Aggregate type • Manage it’s children
  67. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka AggregateManager [Lottery] AggregateActor Lottery.id

    = 2 AggregateActor Lottery.id = 1 • Entry point for a given Aggregate type • Manage it’s children
  68. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka AggregateManager [Lottery] AggregateActor Lottery.id

    = 2 AggregateActor Lottery.id = 1 • Entry point for a given Aggregate type • Manage it’s children • Forward message to the right Aggregate base on id
  69. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka AggregateManager [Lottery] AggregateActor Lottery.id

    = 3 AggregateActor Lottery.id = 2 AggregateActor Lottery.id = 1 • Entry point for a given Aggregate type • Manage it’s children • Forward message to the right Aggregate base on id
  70. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka AggregateManager [Lottery] AggregateActor Lottery.id

    = 3 AggregateActor Lottery.id = 2 • Entry point for a given Aggregate type • Manage it’s children • Forward message to the right Aggregate base on id • Stop children according to configured passivation strategy
  71. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Persistence - Snapshots

    E1 State of Aggregate Event Store E2 E3 S2 S3 snapshot
  72. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Persistence - Snapshots

    E1 State of Aggregate Event Store E2 E3 S3 snapshot E4 S4
  73. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Persistence - Snapshots

    E1 State of Aggregate Event Store E2 E3 E4 Crashed!!
  74. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Persistence - Snapshots

    E1 State of Aggregate Event Store E2 E3 E4 snapshot S3 On restart!
  75. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka Persistence - Snapshots

    E1 State of Aggregate Event Store E2 E3 E4 snapshot S3 S4 On restart!
  76. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Coding the C in

    CQRS • A Protocol - message (cmd & evt) for a given Aggregate • A case class extending the Aggregate trait • A Behavior • Behavior DSL - convenient way for coding behavior base on commands and events
  77. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka a1 a3 a2 b1

    b2 b3 domain = a1, a2 and a3 a4 PartialFunction[A, B]
  78. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Basic Operations - Revisited

    trait Behavior { } def validate(cmd: DomainCommand): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Future[Seq[DomainEvent]] def applyEvent(evt: DomainEvent): Aggregate def applyEvent(evt: DomainEvent, agg: Aggregate): Aggregate def validate(cmd: DomainCommand): DomainEvent def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent
  79. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Lifting… // behavior def

    validate(cmd: DomainCommand): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Future[Seq[DomainEvent]]
  80. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Lifting… // behavior def

    validate(cmd: DomainCommand): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Future[Seq[DomainEvent]] // on-create def validate(cmd: DomainCommand): DomainEvent // post-create def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent def validate(cmd: DomainCommand, agg: Aggregate): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Seq[DomainEvent]
  81. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Lifting… // behavior def

    validate(cmd: DomainCommand): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Future[Seq[DomainEvent]] // on-create def validate(cmd: DomainCommand): DomainEvent // post-create def validate(cmd: DomainCommand, agg: Aggregate): DomainEvent def validate(cmd: DomainCommand, agg: Aggregate): Future[DomainEvent] def validate(cmd: DomainCommand, agg: Aggregate): Seq[DomainEvent]
  82. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Coding the Q in

    CQRS • Akka Persistence Query (experimental in Akka 2.4.0) • Generate Read Model (Views) from Events • Akka Streams Source with selected Events • ProjectionActor to consume the source and save the pointer (offset) to the last processed event
  83. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E1 E2 E3 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  84. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E1 E2 E3 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  85. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E1 E2 E3 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  86. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E2 E3 lastProcessedEvent = 1 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  87. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E2 E3 lastProcessedEvent = 1 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  88. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E2 E3 lastProcessedEvent = 1 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  89. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E3 lastProcessedEvent = 2 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  90. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E3 lastProcessedEvent = 2 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  91. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection E3 lastProcessedEvent = 2 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  92. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka ProjectionActor E1 E2 E3

    Event Stream Projection lastProcessedEvent = 3 • Subscriber to an Event Stream
 via Akka Persistence Query • Forward each event to a Projection definition • Projection is responsible for generating and updating the Read Model • ProjectionActor does NOT accept new events while Projection is busy
  93. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior…
  94. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate
  95. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence
  96. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id
  97. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id • Actor MUST handle one single Command at time (become/unbecome)
  98. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id • Actor MUST handle one single Command at time (become/unbecome) • Guarantees consistency without Optimistic Locking
  99. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Akka for CQRS /

    Event Sourcing • Given an Aggregate, its Protocol and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id • Actor MUST handle one single Command at time (become/unbecome) • Guarantees consistency without Optimistic Locking • Projections backed by Akka Persistence Query
  100. @renatocaval #Devoxx #funcqrs #scala #cqrs #akka Questions? http://bit.ly/fun-cqrs 
 (GitHub)

    @renatocaval
 [email protected]
 www.strongtyped.io github.com/rcavalcanti
 github.com/strongtyped