Upgrade to Pro — share decks privately, control downloads, hide ads and more …

マイクロサービスでイベントソーシングを利用したくなる理由とその動作原理を学ぼう / Why Event Sourcing

nrs
April 22, 2022

マイクロサービスでイベントソーシングを利用したくなる理由とその動作原理を学ぼう / Why Event Sourcing

イベントソーシングがなぜ必要になるのか、そしてその動作原理を解説する際に利用した資料です。
この資料を使ったトークが YouTube にアップロードされています。
トークURL: https://youtu.be/Jtcp9ry8ZcE

◆ URL
トークURL: https://youtu.be/Jtcp9ry8ZcE
チャンネル登録: https://www.youtube.com/c/narusemi?sub_confirmation=1
Twitter: https://twitter.com/nrslib
SampleCode: https://github.com/nrslib/microservice-with-event-sourcing-sample-kotlin

nrs

April 22, 2022
Tweet

More Decks by nrs

Other Decks in Programming

Transcript

  1. @Transactional public void order(String itemId) { var itemDao = new

    ItemDao(); var item = itemDao.findById(itemId); ... var accountDao = new AccountDao(); var account = accountDao.find(Session.Id); var paymentDao = new PaymentDao(); var paymentInfo = paymentDao.find(account.id); paymentInfo.validate(); creditService.approval(paymentInfo.cardInfo, order.total); ... orderDao.save(order); }
  2. @Transactional public void order(String itemId) { var itemDao = new

    ItemDao(); var item = itemDao.findById(itemId); ... var accountDao = new AccountDao(); var account = accountDao.find(Session.Id); var paymentDao = new PaymentDao(); var paymentInfo = paymentDao.find(account.id); paymentInfo.validate(); creditService.approval(paymentInfo.cardInfo, order.total); ... orderDao.save(order); }
  3. @Transactional public void order(String itemId) { var itemDao = new

    ItemDao(); var item = itemDao.findById(itemId); ... var accountDao = new AccountDao(); var account = accountDao.find(Session.Id); var paymentDao = new PaymentDao(); var paymentInfo = paymentDao.find(account.id); paymentInfo.validate(); creditService.approval(paymentInfo.cardInfo, order.total); ... orderDao.save(order); }
  4. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState ... }
  5. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  6. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  7. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  8. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  9. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  10. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  11. class OrderApplicationService( private val itemRepository: ItemRepository, private val orderRepository: AggregateRepository<Order,

    OrderCommand> ) { @Transactional fun createOrder(accountId: String, itemAndQuantities: List<ItemIdAndQuantity>): String { val items = itemRepository.find(itemAndQuantities.map { ItemId(it.itemId) }) val orderItems = makeOrderItems(itemAndQuantities, items) val command = CreateOrderCommand(UUID.randomUUID().toString(), accountId, orderItems) val result = orderRepository.save(command) return result.idAndVersion.id } ... private fun makeOrderItems(itemAndQuantities: List<ItemIdAndQuantity>, items: List<Item>): OrderItems { val orderItems = itemAndQuantities.map { val item = items.first { item -> item.id.value == it.itemId } OrderItem( it.quantity, item.price ) } return OrderItems(orderItems) } }
  12. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  13. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  14. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  15. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  16. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  17. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } }
  18. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState fun process(command: CreateOrderCommand): List<Event> { val total = command.orderItems.total() val event = OrderCreatedEvent(command.id, command.accountId, total, command.orderItems) return listOf(event) } ... }
  19. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState ... fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } ... }
  20. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  21. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  22. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  23. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  24. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  25. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { fun save(command: CT): EntityWithIdAndVersion<T> { val aggregate = clazz.getDeclaredConstructor().newInstance() val events = aggregate.processCommand(command) Aggregate.applyEvents(aggregate, events) val serializedEvents = events.map { EventTypeAndData(it.javaClass.name, Gson().toJson(it)) } val entityIdVersionAndEventIds = aggregateStore.save(clazz, serializedEvents) return EntityWithIdAndVersion( entityIdVersionAndEventIds, aggregate ) } ... }
  26. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState ... fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } fun apply(event: OrderApprovedEvent) { this.orderState = OrderState.APPROVED } fun apply(event: OrderRejectedEvent) { this.orderState = OrderState.APPROVAL_REJECTED } }
  27. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState ... fun apply(event: OrderCreatedEvent) { accountId = event.accountId total = event.total items = event.orderItems orderState = OrderState.APPROVAL_PENDING } fun apply(event: OrderApprovedEvent) { this.orderState = OrderState.APPROVED } fun apply(event: OrderRejectedEvent) { this.orderState = OrderState.APPROVAL_REJECTED } }
  28. class Order : CommandProcessingAggregate<Order, OrderCommand>() { lateinit var accountId: String

    var total: Money = Money.zero() lateinit var items: OrderItems lateinit var orderState: OrderState ... fun process(command: ApproveOrderCommand): List<Event> { return listOf(OrderApprovedEvent()) } }
  29. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  30. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  31. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  32. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  33. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  34. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  35. class AggregateRepository<T, CT>( private val clazz: Class<T>, private val aggregateStore:

    AggregateStore ) where T : CommandProcessingAggregate<T, CT>, CT : Command { ... fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()) : EntityWithIdAndVersion<T> { val loadedData = aggregateStore.find(clazz, id) ?: throw IllegalArgumentException("$id is notfound") val aggregate = loadedData.entity val events = aggregate.processCommand(cmd) Aggregate.applyEvents(aggregate, events) val idAndVersion = aggregateStore.update( clazz, loadedData.idAndVersion, events, withPossibleSnapshot( updateOption, aggregate, loadedData.snapshotVersion, loadedData.events, events) ) return EntityWithIdAndVersion(idAndVersion, aggregate) } }
  36. enum class DocumentType { Entity, Event, Snapshot } class EventSourcingDocument

    { @Id lateinit var id: String lateinit var type: DocumentType lateinit var entityId: String lateinit var body: String var version: Long = -1 @JsonProperty("_etag") var etag: String = "" }
  37. enum class DocumentType { Entity, Event, Snapshot } class EventSourcingDocument

    { @Id lateinit var id: String lateinit var type: DocumentType lateinit var entityId: String lateinit var body: String var version: Long = -1 @JsonProperty("_etag") var etag: String = "" }
  38. enum class DocumentType { Entity, Event, Snapshot } class EventSourcingDocument

    { @Id lateinit var id: String lateinit var type: DocumentType lateinit var entityId: String lateinit var body: String var version: Long = -1 @JsonProperty("_etag") var etag: String = "" }
  39. class EventDataModel { lateinit var id: String lateinit var eventType:

    String lateinit var entityType: String lateinit var entityId: String lateinit var payload: String var version: Long = -1 ... }
  40. class SnapshotDataModel { lateinit var id: String lateinit var entityType:

    String lateinit var entityId: String var entityVersion: Long = -1 lateinit var snapshotType: String lateinit var snapshotData: String ... }
  41. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  42. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  43. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  44. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  45. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  46. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  47. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  48. class CosmosAggregateStore(...) : AggregateStore { override fun <T : Aggregate<T>>

    save(clazz: Class<T>, events: List<EventTypeAndData>, options: AggregateCrudSaveOptions? val container = getContainer() val entityId = UUID.randomUUID().toString() val version = events.count().toLong() val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val entityDataModel = EntityDataModel(entityId, clazz.name, version) val entityDocument = entityDataModel.toDocument() batch.createItemOperation(entityDocument) events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.eventType, clazz.name, entityId, index.toLong() + 1, it.eventData ) }.map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val response = container.executeTransactionalBatch(batch) if (!response.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entityId, version) }
  49. } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel(

    UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels .map(EventDataModel::toDocument) .forEach { batch.createItemOperation(it) } val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit if (options?.snapshot != null) { val snapshot = options.snapshot val snapshotDataModel = SnapshotDataModel( UUID.randomUUID().toString(), clazz.name, entityIdAndVersion.id, latestVersion, snapshot!!.javaClass.name,
  50. override fun <T : Aggregate<T>> update(clazz: Class<T>, entityIdAndVersion: EntityIdAndVersion, events:

    List<Event>, option val entityId = entityIdAndVersion.id val currentVersion = entityIdAndVersion.version val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val container = getContainer() val entityDocumentReadResponse = container.readItem(entityId, PartitionKey(entityId), EventSourcingDocument::class.java val entityDocument = entityDocumentReadResponse.item if (currentVersion != entityDocument.version) { throw OptimisticConcurrencyException(entityId, currentVersion, entityDocument.version) } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels.map(EventDataModel::toDocument).forEach {batch.createItemOperation(it)} val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit
  51. override fun <T : Aggregate<T>> update(clazz: Class<T>, entityIdAndVersion: EntityIdAndVersion, events:

    List<Event>, option val entityId = entityIdAndVersion.id val currentVersion = entityIdAndVersion.version val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val container = getContainer() val entityDocumentReadResponse = container.readItem(entityId, PartitionKey(entityId), EventSourcingDocument::class.java val entityDocument = entityDocumentReadResponse.item if (currentVersion != entityDocument.version) { throw OptimisticConcurrencyException(entityId, currentVersion, entityDocument.version) } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels.map(EventDataModel::toDocument).forEach {batch.createItemOperation(it)} val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit
  52. override fun <T : Aggregate<T>> update(clazz: Class<T>, entityIdAndVersion: EntityIdAndVersion, events:

    List<Event>, option val entityId = entityIdAndVersion.id val currentVersion = entityIdAndVersion.version val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val container = getContainer() val entityDocumentReadResponse = container.readItem(entityId, PartitionKey(entityId), EventSourcingDocument::class.java val entityDocument = entityDocumentReadResponse.item if (currentVersion != entityDocument.version) { throw OptimisticConcurrencyException(entityId, currentVersion, entityDocument.version) } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels.map(EventDataModel::toDocument).forEach {batch.createItemOperation(it)} val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit
  53. override fun <T : Aggregate<T>> update(clazz: Class<T>, entityIdAndVersion: EntityIdAndVersion, events:

    List<Event>, option val entityId = entityIdAndVersion.id val currentVersion = entityIdAndVersion.version val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val container = getContainer() val entityDocumentReadResponse = container.readItem(entityId, PartitionKey(entityId), EventSourcingDocument::class.java val entityDocument = entityDocumentReadResponse.item if (currentVersion != entityDocument.version) { throw OptimisticConcurrencyException(entityId, currentVersion, entityDocument.version) } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels.map(EventDataModel::toDocument).forEach {batch.createItemOperation(it)} val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit
  54. override fun <T : Aggregate<T>> update(clazz: Class<T>, entityIdAndVersion: EntityIdAndVersion, events:

    List<Event>, option val entityId = entityIdAndVersion.id val currentVersion = entityIdAndVersion.version val batch = TransactionalBatch.createTransactionalBatch(PartitionKey(entityId)) val container = getContainer() val entityDocumentReadResponse = container.readItem(entityId, PartitionKey(entityId), EventSourcingDocument::class.java val entityDocument = entityDocumentReadResponse.item if (currentVersion != entityDocument.version) { throw OptimisticConcurrencyException(entityId, currentVersion, entityDocument.version) } val eventDatamodels = events.mapIndexed { index, it -> EventDataModel( UUID.randomUUID().toString(), it.javaClass.name, clazz.name, entityId, currentVersion + index + 1, Gson().toJson(it) ) } eventDatamodels.map(EventDataModel::toDocument).forEach {batch.createItemOperation(it)} val latestVersion = eventDatamodels.last().version val entity = EntityDataModel.from(entityDocument) entity.version = latestVersion val replaceEntityDocument = entity.toDocument() batch.replaceItemOperation(entityId, replaceEntityDocument, TransactionalBatchItemRequestOptions().setIfMatchETag(entit
  55. if (options?.snapshot != null) { val snapshot = options.snapshot val

    snapshotDataModel = SnapshotDataModel( UUID.randomUUID().toString(), clazz.name, entityIdAndVersion.id, latestVersion, snapshot!!.javaClass.name, Gson().toJson(snapshot) ) val snapshotDocument = snapshotDataModel.toDocument() batch.createItemOperation(snapshotDocument) } val result = container.executeTransactionalBatch(batch) if (!result.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entity.id, latestVersion) }
  56. if (options?.snapshot != null) { val snapshot = options.snapshot val

    snapshotDataModel = SnapshotDataModel( UUID.randomUUID().toString(), clazz.name, entityIdAndVersion.id, latestVersion, snapshot!!.javaClass.name, Gson().toJson(snapshot) ) val snapshotDocument = snapshotDataModel.toDocument() batch.createItemOperation(snapshotDocument) } val result = container.executeTransactionalBatch(batch) if (!result.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entity.id, latestVersion) }
  57.
      /**
       * Repository that loads an aggregate through an [AggregateStore], lets it process a
       * command, and stores the events that command produced.
       *
       * @param T aggregate type handled by this repository
       * @param CT command type accepted by [T]
       */
      class AggregateRepository<T, CT>(
          private val clazz: Class<T>,
          private val aggregateStore: AggregateStore
      ) where T : CommandProcessingAggregate<T, CT>, CT : Command {
          ...

          /**
           * Runs [cmd] against the aggregate stored under [id] and persists the resulting events.
           *
           * @param id identifier of the aggregate to load
           * @param cmd command passed to the aggregate's `processCommand`
           * @param updateOption options forwarded to `withPossibleSnapshot`
           * @return the id/version reported by the store, paired with the updated aggregate
           * @throws IllegalArgumentException when the store returns nothing for [id]
           */
          fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()): EntityWithIdAndVersion<T> {
              val found = requireNotNull(aggregateStore.find(clazz, id)) { "$id is notfound" }
              val target = found.entity

              // The new events are applied in memory before the store call, so the aggregate
              // handed to withPossibleSnapshot already reflects the command.
              val producedEvents = target.processCommand(cmd)
              Aggregate.applyEvents(target, producedEvents)

              val storedIdAndVersion = aggregateStore.update(
                  clazz,
                  found.idAndVersion,
                  producedEvents,
                  withPossibleSnapshot(updateOption, target, found.snapshotVersion, found.events, producedEvents)
              )
              return EntityWithIdAndVersion(storedIdAndVersion, target)
          }
      }
  58.
      /**
       * Repository that loads an aggregate through an [AggregateStore], lets it process a
       * command, and stores the events that command produced.
       *
       * @param T aggregate type handled by this repository
       * @param CT command type accepted by [T]
       */
      class AggregateRepository<T, CT>(
          private val clazz: Class<T>,
          private val aggregateStore: AggregateStore
      ) where T : CommandProcessingAggregate<T, CT>, CT : Command {
          ...

          /**
           * Runs [cmd] against the aggregate stored under [id] and persists the resulting events.
           *
           * @param id identifier of the aggregate to load
           * @param cmd command passed to the aggregate's `processCommand`
           * @param updateOption options forwarded to `withPossibleSnapshot`
           * @return the id/version reported by the store, paired with the updated aggregate
           * @throws IllegalArgumentException when the store returns nothing for [id]
           */
          fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()): EntityWithIdAndVersion<T> {
              val found = requireNotNull(aggregateStore.find(clazz, id)) { "$id is notfound" }
              val target = found.entity

              // The new events are applied in memory before the store call, so the aggregate
              // handed to withPossibleSnapshot already reflects the command.
              val producedEvents = target.processCommand(cmd)
              Aggregate.applyEvents(target, producedEvents)

              val storedIdAndVersion = aggregateStore.update(
                  clazz,
                  found.idAndVersion,
                  producedEvents,
                  withPossibleSnapshot(updateOption, target, found.snapshotVersion, found.events, producedEvents)
              )
              return EntityWithIdAndVersion(storedIdAndVersion, target)
          }
      }
  59.
      /**
       * Repository that loads an aggregate through an [AggregateStore], lets it process a
       * command, and stores the events that command produced.
       *
       * @param T aggregate type handled by this repository
       * @param CT command type accepted by [T]
       */
      class AggregateRepository<T, CT>(
          private val clazz: Class<T>,
          private val aggregateStore: AggregateStore
      ) where T : CommandProcessingAggregate<T, CT>, CT : Command {
          ...

          /**
           * Runs [cmd] against the aggregate stored under [id] and persists the resulting events.
           *
           * @param id identifier of the aggregate to load
           * @param cmd command passed to the aggregate's `processCommand`
           * @param updateOption options forwarded to `withPossibleSnapshot`
           * @return the id/version reported by the store, paired with the updated aggregate
           * @throws IllegalArgumentException when the store returns nothing for [id]
           */
          fun update(id: String, cmd: CT, updateOption: UpdateOptions = UpdateOptions()): EntityWithIdAndVersion<T> {
              val found = requireNotNull(aggregateStore.find(clazz, id)) { "$id is notfound" }
              val target = found.entity

              // The new events are applied in memory before the store call, so the aggregate
              // handed to withPossibleSnapshot already reflects the command.
              val producedEvents = target.processCommand(cmd)
              Aggregate.applyEvents(target, producedEvents)

              val storedIdAndVersion = aggregateStore.update(
                  clazz,
                  found.idAndVersion,
                  producedEvents,
                  withPossibleSnapshot(updateOption, target, found.snapshotVersion, found.events, producedEvents)
              )
              return EntityWithIdAndVersion(storedIdAndVersion, target)
          }
      }
  60.
      /**
       * [SnapshotStrategy] for [Order]: copies an order's fields into an [OrderSnapshot]
       * and rebuilds an [Order] from such a snapshot.
       */
      class OrderSnapshotStrategy : SnapshotStrategy {
          /** Returns the aggregate class this strategy is written for, i.e. [Order]. */
          override fun getAggregateClass(): Class<*> {
              return Order::class.java
          }

          /**
           * Builds a snapshot of the given order.
           *
           * The event lists and [snapshotVersion] are not consulted, so a snapshot is
           * produced on every call.
           *
           * @param aggregate aggregate to capture; cast to [Order]
           * @return an [OrderSnapshot] carrying the order's account id, state, total and items
           */
          override fun possiblySnapshot(
              aggregate: Aggregate<*>,
              oldEvents: List<EventWithMetaData>,
              newEvents: List<Event>,
              snapshotVersion: String?
          ): Snapshot? {
              val source = aggregate as Order
              return OrderSnapshot().also { captured ->
                  captured.accountId = source.accountId
                  captured.orderState = source.orderState
                  captured.total = source.total
                  captured.items = source.items
              }
          }

          /**
           * Rebuilds an [Order] from a previously taken snapshot.
           *
           * @param clasz requested aggregate class; not inspected here
           * @param snapshot snapshot to restore from; cast to [OrderSnapshot]
           * @return a new [Order] populated from the snapshot, cast to [T]
           */
          override fun <T> recreateAggregate(clasz: Class<T>, snapshot: Snapshot): T where T : Aggregate<T> {
              val stored = snapshot as OrderSnapshot
              val restored = Order().also { order ->
                  order.accountId = stored.accountId
                  order.orderState = stored.orderState
                  order.total = stored.total
                  order.items = stored.items
              }
              return restored as T
          }
      }
  61. if (options?.snapshot != null) { val snapshot = options.snapshot val

    snapshotDataModel = SnapshotDataModel( UUID.randomUUID().toString(), clazz.name, entityIdAndVersion.id, latestVersion, snapshot!!.javaClass.name, Gson().toJson(snapshot) ) val snapshotDocument = snapshotDataModel.toDocument() batch.createItemOperation(snapshotDocument) } val result = container.executeTransactionalBatch(batch) if (!result.isSuccessStatusCode) { throw UnsupportedOperationException() } return EntityIdAndVersion(entity.id, latestVersion) }
  62. val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel !=

    null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) } if (snapshotDataModel == null && events.isEmpty()) { return null } val version = if (events.isEmpty()) { entity.version } else { events.last().version } val idAndVersion = EntityIdAndVersion(id, version) val recreatedAggregate = if (snapshotDataModel != null) {
  63. override fun <T : Aggregate<T>> find(clazz: Class<T>, id: String): EntityWithMetadata<T>?

    { val container = getContainer() val readSnapshotDocumentResponse = container.queryItems( "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Snapshot}¥" AND x.entityId = ¥"${id}¥" ORDER BY x.version DESC", CosmosQueryRequestOptions().setPartitionKey(PartitionKey(id)).setMaxBufferedItemCount(1), EventSourcingDocument::class.java ) val snapshotDataModel = readSnapshotDocumentResponse.map(SnapshotDataModel.Companion::from).firstOrNull() val readEntityDocumentResponse = container.readItem(id, PartitionKey(id), EventSourcingDocument::class.java) val entityDocument = readEntityDocumentResponse.item val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel != null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) }
  64. override fun <T : Aggregate<T>> find(clazz: Class<T>, id: String): EntityWithMetadata<T>?

    { val container = getContainer() val readSnapshotDocumentResponse = container.queryItems( "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Snapshot}¥" AND x.entityId = ¥"${id}¥" ORDER BY x.version DESC", CosmosQueryRequestOptions().setPartitionKey(PartitionKey(id)).setMaxBufferedItemCount(1), EventSourcingDocument::class.java ) val snapshotDataModel = readSnapshotDocumentResponse.map(SnapshotDataModel.Companion::from).firstOrNull() val readEntityDocumentResponse = container.readItem(id, PartitionKey(id), EventSourcingDocument::class.java) val entityDocument = readEntityDocumentResponse.item val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel != null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) }
  65. override fun <T : Aggregate<T>> find(clazz: Class<T>, id: String): EntityWithMetadata<T>?

    { val container = getContainer() val readSnapshotDocumentResponse = container.queryItems( "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Snapshot}¥" AND x.entityId = ¥"${id}¥" ORDER BY x.version DESC", CosmosQueryRequestOptions().setPartitionKey(PartitionKey(id)).setMaxBufferedItemCount(1), EventSourcingDocument::class.java ) val snapshotDataModel = readSnapshotDocumentResponse.map(SnapshotDataModel.Companion::from).firstOrNull() val readEntityDocumentResponse = container.readItem(id, PartitionKey(id), EventSourcingDocument::class.java) val entityDocument = readEntityDocumentResponse.item val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel != null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) }
  66. override fun <T : Aggregate<T>> find(clazz: Class<T>, id: String): EntityWithMetadata<T>?

    { val container = getContainer() val readSnapshotDocumentResponse = container.queryItems( "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Snapshot}¥" AND x.entityId = ¥"${id}¥" ORDER BY x.version DESC", CosmosQueryRequestOptions().setPartitionKey(PartitionKey(id)).setMaxBufferedItemCount(1), EventSourcingDocument::class.java ) val snapshotDataModel = readSnapshotDocumentResponse.map(SnapshotDataModel.Companion::from).firstOrNull() val readEntityDocumentResponse = container.readItem(id, PartitionKey(id), EventSourcingDocument::class.java) val entityDocument = readEntityDocumentResponse.item val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel != null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) }
  67. override fun <T : Aggregate<T>> find(clazz: Class<T>, id: String): EntityWithMetadata<T>?

    { val container = getContainer() val readSnapshotDocumentResponse = container.queryItems( "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Snapshot}¥" AND x.entityId = ¥"${id}¥" ORDER BY x.version DESC", CosmosQueryRequestOptions().setPartitionKey(PartitionKey(id)).setMaxBufferedItemCount(1), EventSourcingDocument::class.java ) val snapshotDataModel = readSnapshotDocumentResponse.map(SnapshotDataModel.Companion::from).firstOrNull() val readEntityDocumentResponse = container.readItem(id, PartitionKey(id), EventSourcingDocument::class.java) val entityDocument = readEntityDocumentResponse.item val entity = EntityDataModel.from(entityDocument) val getEventsQuery = if (snapshotDataModel != null) { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.version > ${entity.version} AND x.entityId = ¥"${en } else { "SELECT * FROM x WHERE x.type = ¥"${DocumentType.Event}¥" AND x.entityId = ¥"${entity.id}¥" ORDER BY x.version" } val eventDocuments = container.queryItems( getEventsQuery, CosmosQueryRequestOptions().setPartitionKey(PartitionKey(entity.id)), EventSourcingDocument::class.java ) val events = eventDocuments.map(EventDataModel.Companion::from).map { val eventClazz = Class.forName(it.eventType) val event = Gson().fromJson(it.payload, eventClazz) as Event EventWithMetaData(event, it.version) }
  68. if (snapshotDataModel == null && events.isEmpty()) { return null }

    val version = if (events.isEmpty()) { entity.version } else { events.last().version } val idAndVersion = EntityIdAndVersion(id, version) val recreatedAggregate = if (snapshotDataModel != null) { val snapshotClazz = Class.forName(snapshotDataModel.snapshotType) val snapshot = Gson().fromJson(snapshotDataModel.snapshotData, snapshotClazz) as Snapshot val aggregate = snapshotManager.recreateFromSnapshot(clazz, snapshot) Aggregate.applyEvents(aggregate, events.map { it.event }) } else { Aggregate.recreate(clazz, events.map { it.event }) } return EntityWithMetadata( idAndVersion, events, recreatedAggregate, snapshotDataModel?.entityVersion.toString() ) }
  69. if (snapshotDataModel == null && events.isEmpty()) { return null }

    val version = if (events.isEmpty()) { entity.version } else { events.last().version } val idAndVersion = EntityIdAndVersion(id, version) val recreatedAggregate = if (snapshotDataModel != null) { val snapshotClazz = Class.forName(snapshotDataModel.snapshotType) val snapshot = Gson().fromJson(snapshotDataModel.snapshotData, snapshotClazz) as Snapshot val aggregate = snapshotManager.recreateFromSnapshot(clazz, snapshot) Aggregate.applyEvents(aggregate, events.map { it.event }) } else { Aggregate.recreate(clazz, events.map { it.event }) } return EntityWithMetadata( idAndVersion, events, recreatedAggregate, snapshotDataModel?.entityVersion.toString() ) }
  70. if (snapshotDataModel == null && events.isEmpty()) { return null }

    val version = if (events.isEmpty()) { entity.version } else { events.last().version } val idAndVersion = EntityIdAndVersion(id, version) val recreatedAggregate = if (snapshotDataModel != null) { val snapshotClazz = Class.forName(snapshotDataModel.snapshotType) val snapshot = Gson().fromJson(snapshotDataModel.snapshotData, snapshotClazz) as Snapshot val aggregate = snapshotManager.recreateFromSnapshot(clazz, snapshot) Aggregate.applyEvents(aggregate, events.map { it.event }) } else { Aggregate.recreate(clazz, events.map { it.event }) } return EntityWithMetadata( idAndVersion, events, recreatedAggregate, snapshotDataModel?.entityVersion.toString() ) }
  71. if (snapshotDataModel == null && events.isEmpty()) { return null }

    val version = if (events.isEmpty()) { entity.version } else { events.last().version } val idAndVersion = EntityIdAndVersion(id, version) val recreatedAggregate = if (snapshotDataModel != null) { val snapshotClazz = Class.forName(snapshotDataModel.snapshotType) val snapshot = Gson().fromJson(snapshotDataModel.snapshotData, snapshotClazz) as Snapshot val aggregate = snapshotManager.recreateFromSnapshot(clazz, snapshot) Aggregate.applyEvents(aggregate, events.map { it.event }) } else { Aggregate.recreate(clazz, events.map { it.event }) } return EntityWithMetadata( idAndVersion, events, recreatedAggregate, snapshotDataModel?.entityVersion.toString() ) }