
Akka Persistence Typed

Although type-safety is the coolest thing we are getting from Akka Typed, the new persistence API is more than just types.

There are also a few API improvements that will help you write safer persistent actors.

Renato Cavalcanti

September 21, 2019

Transcript

  1. History
    • Started as an Akka Extension called Eventsourced by Martin Krasser
    • Brought into Akka as Akka Persistence in Akka 2.4.0
    • Akka Persistence Query in Akka 2.5.0
    • Akka Persistence Typed in Akka 2.6.0
  2. Akka Typed - protocol
    When defining an Actor, we start by defining its protocol.

        sealed trait Message
        final case class SayHello(name: String) extends Message
        final case class ChangeGreeting(greet: String) extends Message
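  3. Akka Typed - behavior
    In Akka Typed, an Actor is just a Behavior function.

        import akka.actor.typed.Behavior
        import akka.actor.typed.scaladsl.Behaviors

        def behavior(greeting: String): Behavior[Message] =
          Behaviors.receiveMessage {
            case SayHello(name) =>
              println(s"$greeting $name!")
              Behaviors.same
            case ChangeGreeting(greet) =>
              // switch to a behavior that uses the new greeting
              behavior(greet)
          }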
  4. Akka Typed - ask pattern
    There is no sender(). If you need to reply to an ask, the incoming message must carry an ActorRef[R] that you can reply to.

        import akka.actor.typed.{ActorRef, Behavior}
        import akka.actor.typed.scaladsl.Behaviors

        final case class Hello(msg: String)
        final case class SayHello(name: String, replyTo: ActorRef[Hello])

        def behavior(greeting: String): Behavior[SayHello] =
          Behaviors.receiveMessage {
            case SayHello(name, replyTo) =>
              replyTo ! Hello(s"$greeting $name!")
              Behaviors.same
          }
  5. Akka Typed

        final case class Hello(msg: String)
        final case class SayHello(name: String, replyTo: ActorRef[Hello])

        def behavior(greeting: String): Behavior[SayHello] =
          Behaviors.receiveMessage {
            case SayHello(name, replyTo) =>
              replyTo ! Hello(s"$greeting $name!")
              Behaviors.same
          }

        val greeter: ActorSystem[SayHello] = ActorSystem(behavior("Hello"), "HelloAkka")
        // ask needs akka.actor.typed.scaladsl.AskPattern._ plus an implicit Timeout and Scheduler in scope
        val res: Future[Hello] = greeter.ask(replyTo => SayHello("Akka Typed", replyTo))
  6. Akka Persistence Typed - Highlights
    • Protocol is defined in terms of Command, Event and State
    • EventSourcedBehavior instead of Behavior
    • Tagging function
    • Better controlled snapshotting (number of events and/or predicate)
    • Enforced Replies
    • Old plugins are still compatible
    • Akka Persistence Query untouched, already typed and based on Akka Streams
  7. Commands, Events and State

        sealed trait AccountCommand
        final case class Deposit(amount: Double) extends AccountCommand
        final case class Withdraw(amount: Double) extends AccountCommand
        case class GetBalance(replyTo: ActorRef[Balance]) extends AccountCommand

        sealed trait AccountEvent
        final case class Deposited(amount: Double) extends AccountEvent
        final case class Withdrawn(amount: Double) extends AccountEvent

        case class Account(balance: Double)
  8. Command Handler
    (State, Command) => Effect

        import akka.persistence.typed.scaladsl.Effect

        case class Account(balance: Double) {
          def applyCommand(cmd: AccountCommand): Effect[AccountEvent, Account] =
            cmd match {
              case Deposit(amount) => Effect.persist(Deposited(amount))
              // other cases intentionally omitted
            }
        }
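To make the (State, Command) => Effect shape concrete without Akka on the classpath, here is a minimal sketch in plain Scala. `Decision`, `Persist` and `Reject` are hypothetical stand-ins for Akka's `Effect`, not part of any library, and the validation rules (positive amounts, no overdraft) are assumptions that do not come from the slides.

```scala
object CommandHandlerSketch {
  sealed trait AccountCommand
  final case class Deposit(amount: Double) extends AccountCommand
  final case class Withdraw(amount: Double) extends AccountCommand

  sealed trait AccountEvent
  final case class Deposited(amount: Double) extends AccountEvent
  final case class Withdrawn(amount: Double) extends AccountEvent

  // Hypothetical stand-in for Akka's Effect: either persist one event or reject the command.
  sealed trait Decision
  final case class Persist(event: AccountEvent) extends Decision
  final case class Reject(reason: String) extends Decision

  final case class Account(balance: Double) {
    // The command handler only decides what should happen; it never changes the state itself.
    def applyCommand(cmd: AccountCommand): Decision = cmd match {
      case Deposit(amount) if amount > 0 =>
        Persist(Deposited(amount))
      case Withdraw(amount) if amount > 0 && amount <= balance =>
        Persist(Withdrawn(amount))
      case Withdraw(amount) if amount > balance =>
        Reject("insufficient funds") // assumed business rule
      case _ =>
        Reject("amount must be positive") // assumed business rule
    }
  }
}
```

Keeping the decision separate from the state change is what lets the event handler remain the only place where state evolves.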
  9. Event Handler
    (State, Event) => State

        case class Account(balance: Double) {
          def applyEvent(evt: AccountEvent): Account = {
            evt match {
              case Deposited(amount) => copy(balance = balance + amount)
              case Withdrawn(amount) => copy(balance = balance - amount)
            }
          }
        }
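Because the event handler is a pure (State, Event) => State function, rebuilding an entity's state amounts to folding the stored events over the empty state. The sketch below shows that idea in plain Scala; `ReplaySketch` and `replay` are illustrative names, and the in-memory `Seq` stands in for a real journal.

```scala
object ReplaySketch {
  sealed trait AccountEvent
  final case class Deposited(amount: Double) extends AccountEvent
  final case class Withdrawn(amount: Double) extends AccountEvent

  final case class Account(balance: Double) {
    def applyEvent(evt: AccountEvent): Account = evt match {
      case Deposited(amount) => copy(balance = balance + amount)
      case Withdrawn(amount) => copy(balance = balance - amount)
    }
  }

  // Recovery, conceptually: start from the empty state and apply every stored event in order.
  def replay(journal: Seq[AccountEvent]): Account =
    journal.foldLeft(Account(balance = 0))(_.applyEvent(_))
}
```

For example, `ReplaySketch.replay(Seq(Deposited(100.0), Withdrawn(30.0)))` yields an account holding the difference of the two amounts.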
  10. EventSourcedBehavior

        import akka.persistence.typed.PersistenceId
        import akka.persistence.typed.scaladsl.EventSourcedBehavior

        def behavior(id: String): EventSourcedBehavior[AccountCommand, AccountEvent, Account] = {
          EventSourcedBehavior[AccountCommand, AccountEvent, Account](
            persistenceId = PersistenceId("Account", id),
            emptyState = Account(balance = 0),
            // command handler: (State, Command) => Effect
            commandHandler = (account, cmd) => account.applyCommand(cmd),
            // event handler: (State, Event) => State
            eventHandler = (account, evt) => account.applyEvent(evt)
          )
        }
  11. Tagging

        def behavior(id: String): EventSourcedBehavior[AccountCommand, AccountEvent, Account] = {
          EventSourcedBehavior[AccountCommand, AccountEvent, Account](
            persistenceId = PersistenceId("Account", id),
            emptyState = Account(balance = 0),
            commandHandler = (account, cmd) => account.applyCommand(cmd),
            eventHandler = (account, evt) => account.applyEvent(evt)
          )
            .withTagger {
              // tagging events is useful for querying by tag
              case evt: Deposited => Set("account", "deposited")
              case evt: Withdrawn => Set("account", "withdrawn")
            }
        }
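A tagger is just a function from an event to a set of tags, which a read side can later use to select events. The plain-Scala sketch below illustrates this under stated assumptions: `eventsByTag` here is a hypothetical in-memory filter written for this example, not the Akka Persistence Query API, and the `Seq` stands in for a journal.

```scala
object TaggingSketch {
  sealed trait AccountEvent
  final case class Deposited(amount: Double) extends AccountEvent
  final case class Withdrawn(amount: Double) extends AccountEvent

  // Same shape as the function passed to withTagger on the slide: Event => Set[String].
  val tagger: AccountEvent => Set[String] = {
    case _: Deposited => Set("account", "deposited")
    case _: Withdrawn => Set("account", "withdrawn")
  }

  // Hypothetical in-memory stand-in for a query by tag; keeps journal order.
  def eventsByTag(journal: Seq[AccountEvent], tag: String): Seq[AccountEvent] =
    journal.filter(evt => tagger(evt).contains(tag))
}
```

Tagging every event with "account" as well as a more specific tag lets consumers choose between a coarse stream of all account events and a narrow one.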
  12. Snapshots

        import akka.persistence.typed.scaladsl.RetentionCriteria

        def behavior(id: String): EventSourcedBehavior[AccountCommand, AccountEvent, Account] = {
          EventSourcedBehavior[AccountCommand, AccountEvent, Account](
            persistenceId = PersistenceId("Account", id),
            emptyState = Account(balance = 0),
            commandHandler = (account, cmd) => account.applyCommand(cmd),
            eventHandler = (account, evt) => account.applyEvent(evt)
          )
            // save a snapshot every 100 events and keep at most 2
            .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
            // save a snapshot when a predicate holds
            .snapshotWhen {
              case (account, evt: Withdrawn, seqNr) => true
              case _ => false
            }
        }
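Snapshots exist to shorten recovery: instead of replaying the whole journal, recovery can start from a saved state and apply only the events written after it. The sketch below models both ideas in plain Scala. `shouldSnapshot` and `recover` are hypothetical helpers written for this example; merging the "every N events" and predicate triggers into one function is a simplification, and the exact sequence numbers at which Akka snapshots are not specified by the slides.

```scala
object SnapshotSketch {
  sealed trait AccountEvent
  final case class Deposited(amount: Double) extends AccountEvent
  final case class Withdrawn(amount: Double) extends AccountEvent

  final case class Account(balance: Double) {
    def applyEvent(evt: AccountEvent): Account = evt match {
      case Deposited(amount) => copy(balance = balance + amount)
      case Withdrawn(amount) => copy(balance = balance - amount)
    }
  }

  // Hypothetical policy: snapshot on every `every`-th sequence number, or on any withdrawal.
  def shouldSnapshot(evt: AccountEvent, seqNr: Long, every: Int = 100): Boolean =
    seqNr % every == 0 || (evt match {
      case _: Withdrawn => true
      case _            => false
    })

  // Recovery from a snapshot: only events after the snapshot's sequence number are replayed.
  def recover(snapshot: Account, snapshotSeqNr: Long, journal: Seq[(Long, AccountEvent)]): Account =
    journal
      .collect { case (seqNr, evt) if seqNr > snapshotSeqNr => evt }
      .foldLeft(snapshot)(_.applyEvent(_))
}
```

The trade-off is storage versus recovery time: more frequent snapshots mean fewer events to replay, at the cost of writing and retaining more snapshot data.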
  13. Cluster Sharding and Persistence
    • Manages state across different JVMs
    • Knows where your instance is
    • Honours the single writer principle for Persistence
    • Entity Passivation
    • Rolling updates without downtime
    • Commands must be serializable
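The points about locating an instance and the single writer principle both rest on a deterministic mapping from entity id to location. The sketch below illustrates the idea in plain Scala, assuming entities are assigned to shards by hashing their id; `shardId` and `nodeFor` are hypothetical helpers, and the static shard-to-node mapping is a simplification, since a real cluster allocates and rebalances shards dynamically.

```scala
object ShardingSketch {
  // Assumption: an entity is mapped to a shard by hashing its id.
  def shardId(entityId: String, numberOfShards: Int): Int =
    math.abs(entityId.hashCode % numberOfShards)

  // Hypothetical static shard-to-node assignment, used only to show that
  // the same entity id always resolves to the same node.
  def nodeFor(entityId: String, numberOfShards: Int, nodes: Vector[String]): String =
    nodes(shardId(entityId, numberOfShards) % nodes.size)
}
```

Since every message for a given entity id resolves to the same place, at most one instance of that entity handles commands and writes events at any time.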
  14. Cluster Sharding - EntityContext

        import akka.cluster.sharding.typed.scaladsl.{EntityContext, EntityTypeKey}

        object Account {
          val typeKey = EntityTypeKey[AccountCommand]("Account")

          def behavior(entityContext: EntityContext[AccountCommand]): EventSourcedBehavior[AccountCommand, AccountEvent, Account] = {
            EventSourcedBehavior[AccountCommand, AccountEvent, Account](
              // derive the persistence id from the sharding type key and entity id
              persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
              emptyState = Account(balance = 0),
              commandHandler = (account, cmd) => account.applyCommand(cmd),
              eventHandler = (account, evt) => account.applyEvent(evt)
            )
          }
        }
  15. Takeaways
    • Declarative API
    • Developer can concentrate on modelling
    • Types everywhere
    • And functions
    • Any => Unit is part of the past
    • Event Sourcing opens the door for decoupling your services
    • High throughput with append-only journals
    • Scalability with Cluster Sharding
    • Rolling updates when clustered
    • Distributed event consuming (included in Lagom, will be extracted)