Upgrade to Pro — share decks privately, control downloads, hide ads and more …

GoとアクターモデルでES+CQRSを実践! / proto_actor_es_cqrs

GoとアクターモデルでES+CQRSを実践! / proto_actor_es_cqrs

Proto Actorを使ってES+CQRSを理解してみよう

yuuki takezawa

July 24, 2024
Tweet

More Decks by yuuki takezawa

Other Decks in Technology

Transcript

  1. Profile • 竹澤 有貴 a.k.a ytake • 千株式会社

    CTO / ほか技術顧問 • Go / Scala • アクターモデル大好き
  2. package persistence import ( "google.golang.org/protobuf/proto" ) // Provider is the

    abstraction used for persistence type Provider interface { GetState() ProviderState } // ProviderState is an object containing the implementation for the provider type ProviderState interface { SnapshotStore EventStore Restart() GetSnapshotInterval() int } type SnapshotStore interface { GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) DeleteSnapshots(actorName string, inclusiveToIndex int) } type EventStore interface { GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) PersistEvent(actorName string, eventIndex int, event proto.Message) DeleteEvents(actorName string, inclusiveToIndex int) }
  3. package persistence import ( "google.golang.org/protobuf/proto" ) // Provider is the

    abstraction used for persistence type Provider interface { GetState() ProviderState } // ProviderState is an object containing the implementation for the provider type ProviderState interface { SnapshotStore EventStore Restart() GetSnapshotInterval() int } type SnapshotStore interface { GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) DeleteSnapshots(actorName string, inclusiveToIndex int) } type EventStore interface { GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) PersistEvent(actorName string, eventIndex int, event proto.Message) DeleteEvents(actorName string, inclusiveToIndex int) } 状態の永続化になにを使うか
  4. package persistence import ( "google.golang.org/protobuf/proto" ) // Provider is the

    abstraction used for persistence type Provider interface { GetState() ProviderState } // ProviderState is an object containing the implementation for the provider type ProviderState interface { SnapshotStore EventStore Restart() GetSnapshotInterval() int } type SnapshotStore interface { GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) DeleteSnapshots(actorName string, inclusiveToIndex int) } type EventStore interface { GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) PersistEvent(actorName string, eventIndex int, event proto.Message) DeleteEvents(actorName string, inclusiveToIndex int) } イベントストア（ジャーナル）向け 永続化の方法（クエリ）や 復元時のクエリなど
  5. package persistence import ( "google.golang.org/protobuf/proto" ) // Provider is the

    abstraction used for persistence type Provider interface { GetState() ProviderState } // ProviderState is an object containing the implementation for the provider type ProviderState interface { SnapshotStore EventStore Restart() GetSnapshotInterval() int } type SnapshotStore interface { GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool) PersistSnapshot(actorName string, snapshotIndex int, snapshot proto.Message) DeleteSnapshots(actorName string, inclusiveToIndex int) } type EventStore interface { GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{})) PersistEvent(actorName string, eventIndex int, event proto.Message) DeleteEvents(actorName string, inclusiveToIndex int) } スナップショット向け 永続化の方法（クエリ）や 復元時のクエリなど
  6. package acme import ( "errors" "fmt" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/persistence" // 略

    "google.golang.org/protobuf/proto" ) // Cart is an actor to create cart type Cart struct { persistence.Mixin stream *actor.PID state *cart.Items }
  7. package acme import ( "errors" "fmt" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/persistence" // 略

    "google.golang.org/protobuf/proto" ) // Cart is an actor to create cart type Cart struct { persistence.Mixin stream *actor.PID state *cart.Items } アクターの永続化を利用する場合に 必須
  8. package acme import ( "errors" "fmt" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/persistence" // 略

    "google.golang.org/protobuf/proto" ) // Cart is an actor to create cart type Cart struct { persistence.Mixin stream *actor.PID state *cart.Items } Cartアクターの状態
  9. package acme import ( "errors" "fmt" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/persistence" // 略

    "google.golang.org/protobuf/proto" ) // Cart is an actor to create cart type Cart struct { persistence.Mixin stream *actor.PID state *cart.Items } リードモデル更新アクターなどへ
  10. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Name) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } }
  11. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Email) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } } コマンドを受取、 自アクターの状態を変更 状態変更をイベントとして永続化
  12. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Name) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } } すでに状態がある（生成済みなど）場合は 自身を変更せずにメッセージ返信
  13. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Name) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } } リードモデル更新ハンドラ（アクター）へ
  14. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Name) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } } 処理終了後にアクターを削除
  15. func (u *Cart) Receive(context actor.Context) { defer context.Poison(context.Self()) switch msg

    := context.Message().(type) { case *persistence.RequestSnapshot: u.PersistSnapshot(u.state) case *persistence.ReplayComplete: // リプレイが完了したら内部状態を変更する context.Logger().Info( fmt.Sprintf("replay completed, internal state changed to '%v'", u.state)) case *command.AddItem: if u.IsStateExists(msg.Name) { context.Send(u.stream, &message.AddItemError{Message: "item already exists"}) return } // 略 u.persistence(context, ev) // xxx生成イベントをイベントストリームへ context.Send(u.stream, ev) case *event.ItemAdded: if msg.String() != "" { // event がリプレイされた場合は状態を更新する u.state = msg u.sendToReadModelUpdater(context, msg) } } } アクター生成（再生成、リスタートなど）時 過去の状態があれば、 最初のイベントから受信し、状態を復元
  16. // ReadModelUpdate is actor to update read model type ReadModelUpdate

    struct { query mysql.RegistrationItemExecutor } // NewReadModelUpdate is constructor for ReadModelUpdate func NewReadModelUpdate(query mysql.RegistrationItemExecutor) actor.Actor { return &ReadModelUpdate{ query: query, } } // Receive is sent messages to be processed from the mailbox associated with the instance of the actor func (u *ReadModelUpdate) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *event.ItemAdded: // イベントを読み込んで、Read Modelを更新する // ここではRead Modelにアイテムが存在しない場合に作成する風 err := u.query.AddItemIfNotExists(context.Background(), mysql.AddItemParams{ Name: msg.ItemName, ID: msg.ItemID, }) if err != nil { // エラーが発生した場合はログを出力する ctx.Logger().Error(err.Error()) return } } }
  17. // ReadModelUpdate is actor to update read model type ReadModelUpdate

    struct { query mysql.RegistrationItemExecutor } // NewReadModelUpdate is constructor for ReadModelUpdate func NewReadModelUpdate(query mysql.RegistrationItemExecutor) actor.Actor { return &ReadModelUpdate{ query: query, } } // Receive is sent messages to be processed from the mailbox associated with the instance of the actor func (u *ReadModelUpdate) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *event.ItemAdded: // イベントを読み込んで、Read Modelを更新する // ここではRead Modelにアイテムが存在しない場合に作成する風 err := u.query.AddItemIfNotExists(context.Background(), mysql.AddItemParams{ Name: msg.ItemName, ID: msg.ItemID, }) if err != nil { // エラーが発生した場合はログを出力する ctx.Logger().Error(err.Error()) return } } } Queryで利用したいテーブルなどに反映
  18. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } }
  19. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } } リードモデル更新アクター
  20. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } } コマンドを受け付けるアクター
  21. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } } アクターの永続化指示 ProtoActorではミドルウェアとして表現
  22. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } } 独立したアクターに識別可能な名前を与え、 復元時に独立して復元される
  23. func (a *RestAPI) Receive(ctx actor.Context) { switch msg := ctx.Message().(type)

    { case *actor.Started: a.rmu = ctx.Spawn(actor.PropsFromProducer(func() actor.Actor { return registration.NewItemModelUpdate(a.db) })) case *command.AddItem: ref, err := ctx.SpawnNamed( actor.PropsFromProducer(func() actor.Actor { return registration.NewItem(msg.Stream, a.rmu) }, actor.WithReceiverMiddleware(persistence.Using(a.provider))), "item-"+msg.Name) if errors.Is(err, actor.ErrNameExists) { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("item %s already exists", msg.Email)}) return } if err != nil { ctx.Send(msg.Stream, &message.ItemCreateError{Message: fmt.Sprintf("failed error %s", err.Error())}) return } ctx.Send(ref, msg) } } アクターが生成できなかった場合のエラー
  24. func (u *ItemRegistration) Handle(c echo.Context) error { ui := new(itemInput)

    if err := c.Bind(ui); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } if err := c.Validate(ui); err != nil { return err } go func() { u.system.Root.Send(u.ref, &command.AddItem{ // 略 Name: ui.Name, Stream: u.stream.PID(), }) }() res := <-u.stream.C() if res.IsSuccess() { return c.JSON(http.StatusOK, res) } return c.JSON(http.StatusBadRequest, res) }
  25. func (u *ItemRegistration) Handle(c echo.Context) error { ui := new(itemInput)

    if err := c.Bind(ui); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } if err := c.Validate(ui); err != nil { return err } go func() { u.system.Root.Send(u.ref, &command.AddItem{ // 略 Name: ui.Name, Stream: u.stream.PID(), }) }() res := <-u.stream.C() if res.IsSuccess() { return c.JSON(http.StatusOK, res) } return c.JSON(http.StatusBadRequest, res) } ProtoActorへ送信
  26. func (u *ItemRegistration) Handle(c echo.Context) error { ui := new(itemInput)

    if err := c.Bind(ui); err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) } if err := c.Validate(ui); err != nil { return err } go func() { u.system.Root.Send(u.ref, &command.AddItem{ // 略 Name: ui.Name, Stream: u.stream.PID(), }) }() res := <-u.stream.C() if res.IsSuccess() { return c.JSON(http.StatusOK, res) } return c.JSON(http.StatusBadRequest, res) } ほしいメッセージをストリームから取り出す 他にもFutureを使うなどもあります