Aggregates / Commands / Events in Scala • Type members: type projections • Partial Functions, lifting and function composition • Akka and asynchrounous programming • Akka Persistence and Event Sourcing
• It has a root and zero or more entities and value objects underneath. • Responsible for the consistency of underneath objects. • You can only modify one Aggregate per transaction.
of two objects where there was previously only one" - Greg Young • Write Model receives Commands and produces Events. • Read Models are created from Events.
of two objects where there was previously only one" - Greg Young • Write Model receives Commands and produces Events. • Read Models are created from Events. • Events can be stored and replayed.
one Aggregate per transaction • Write Model receives Commands and produces Events • Read Models are created from Events • Events can be stored and replayed (when Event Sourcing) • Eventual Consistency (if async Write/Read models)
validate a Command and produces an Event def validate(cmd:Cmd): Event // apply creational Event and produces a new Aggregate def applyEvent(evt:Event): Aggregate // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate
validate a Command and produces an Event def validate(cmd:Cmd): Event // apply creational Event and produces a new Aggregate def applyEvent(evt:Event): Aggregate // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate
validate a Command and produces an Event def validate(cmd:Cmd): Event // apply creational Event and produces a new Aggregate def applyEvent(evt:Event): Aggregate // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate
validate a Command and produces an Event def validate(cmd:Cmd): Event // apply creational Event and produces a new Aggregate def applyEvent(evt:Event): Aggregate // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate
is validated def validate(cmd:Cmd): Event // CreateOrder => OrderCreated // apply creational Event and produces a new Aggregate def applyEvent(evt:Event): Aggregate // OrderCreated => Order
apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate // (Order, ProductAdded) => Order // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // (Order, AddProduct) => ProductAdded
S2 // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // (Order, ExecuteOrder) => OrderExecute // apply event on current Aggregate def applyEvent(agg:Aggregate, evt:Event): Aggregate // (Order, OrderExecuted) => Order
S2 // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // (Order, ExecuteOrder) => OrderExecute E3 Invalid Command! Order is already executed!
it soon! E0 State of Aggregate Event Store E1 E2 S0 S1 S2 // validate a Command against current Aggregate state def validate(agg:Aggregate, cmd:Cmd): Seq[Event] // (Order, ExecuteOrder) => OrderExecute E3 Invalid Command! Order is already executed!
type Id <: AggregateID type Protocol <: ProtocolDef def id: Id } case class ProductNumber(value: String) extends AggregateID case class Product(name: String, description: String, price: Double, id: ProductNumber) extends Aggregate { type Id = ProductNumber type Protocol = ProductProtocol.type }
case any => Future.failed(new IllegalArgumentException(s"Can NOT do anything useful with $any")) } PartialFunction[A, Future[B]] a1 a3 a2 Success(b1) Success(b2) Success(b3) domain = a1, a2 and a3 a4 Failure(e)
produce (Event, Future[Event], Seq[Event] and Future[Seq[Event]]) • lift them all to Future[Event] or Future[Seq[Event]] • fallback PartialFunction for unknown Commands returning a failed Future
produce (Event, Future[Event], Seq[Event] and Future[Seq[Event]]) • lift them all to Future[Event] or Future[Seq[Event]] • fallback PartialFunction for unknown Commands returning a failed Future • compose them all with ‘orElse’ (fallback as last)
produce (Event, Future[Event], Seq[Event] and Future[Seq[Event]]) • lift them all to Future[Event] or Future[Seq[Event]] • fallback PartialFunction for unknown Commands returning a failed Future • compose them all with ‘orElse’ (fallback as last) • Since Future has two possible outcomes (Success and Failure) we can build the total functions we need
and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence
and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id
and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id • Actor MUST handle one single Command at time (become/unbecome)
and its Behavior… • …we can have an AggregateActor that understands the life-cycle of an Aggregate • Events are stored and replayed via Akka Persistence • One Actor per Aggregate Id • Actor MUST handle one single Command at time (become/unbecome) • Guarantees consistency without Optimistic Locking