CQRS & Event Sourcing in the wild (Confoo Montreal 2018)

CQRS & event sourcing are very popular topics. However, most blogs and talks focus on the theory or introductions to a framework, not necessarily the challenges of a production deployment.

This session bridges that gap and looks at some of the pitfalls of a real-world deployment. I'll discuss topics like concurrency & scale, refactoring events and updating read models. Attend this talk to learn from my experiences and be better prepared.

Michiel Rook

March 09, 2018

Transcript

  1. YOU

  2. RAISE YOUR HAND IF YOU HAVE: read CQRS / Event Sourcing theory; followed a tutorial, built a hobby project
  3. RAISE YOUR HAND IF YOU HAVE: read CQRS / Event Sourcing theory; followed a tutorial, built a hobby project; used it in production
  4. "Event Sourcing ensures that all changes to application state are stored as a sequence of events." - Martin Fowler
  5. ACTIVE RECORD VS. EVENT SOURCING

    Active record:

    Account Id | Account number | Balance
    1234       | 12345678       | €50,00
    ...        | ...            | ...

    Event sourcing:

    Account Opened: Account Id 1234, Account number 12345678
    Money Deposited: Account Id 1234, Amount €100,00
    Money Withdrawn: Account Id 1234, Amount €50,00

    @michieltcs
  6. COMMANDS TO EVENTS Deposit Money (Account Id 1234, Amount €100,00)

    @Value
    public class DepositMoney {
        @TargetAggregateIdentifier
        String accountId;
        BigDecimal amount;
    }
  7. COMMANDS TO EVENTS Deposit Money (Account Id 1234, Amount €100,00) -> command handler

    @CommandHandler
    public void depositMoney(DepositMoney command) {
        apply(new MoneyDeposited(
            command.getAccountId(),
            command.getAmount(),
            ZonedDateTime.now()));
    }
  8. COMMANDS TO EVENTS Deposit Money (Account Id 1234, Amount €100,00) -> command handler -> Money Deposited (Account Id 1234, Amount €100,00)

    @Value
    public class MoneyDeposited {
        String accountId;
        BigDecimal amount;
        ZonedDateTime timestamp;
    }
  9. AGGREGATES

    class BankAccount {
        @AggregateIdentifier
        private String accountId;
        private String accountNumber;
        private BigDecimal balance;

        // ...

        // Event handlers only update state from the event's data.
        @EventHandler
        public void accountOpened(AccountOpened event) {
            this.accountId = event.getAccountId();
            this.accountNumber = event.getAccountNumber();
            this.balance = BigDecimal.ZERO;
        }

        @EventHandler
        public void moneyDeposited(MoneyDeposited event) {
            this.balance = this.balance.add(event.getAmount());
        }
    }
  10. AGGREGATE STATE

    Account Opened (Account Id 1234, Account number 12345678) -> event handler -> Account number 12345678, Balance €0,00
    Money Deposited (Account Id 1234, Amount €100,00) -> event handler -> Account number 12345678, Balance €100,00
    Money Withdrawn (Account Id 1234, Amount €50,00) -> event handler -> Account number 12345678, Balance €50,00
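The state progression on this slide can be reproduced without any framework. The following is a minimal sketch in plain Java (16 or later), not the Axon aggregate from the deck: the record types and the `ReplayedAccount` class are hypothetical stand-ins, and the sketch only shows that the current balance is the result of applying every stored event in order.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical stand-ins for the events on the slides; these are not Axon classes.
interface AccountEvent {}
record AccountOpened(String accountId, String accountNumber) implements AccountEvent {}
record MoneyDeposited(String accountId, BigDecimal amount) implements AccountEvent {}
record MoneyWithdrawn(String accountId, BigDecimal amount) implements AccountEvent {}

class ReplayedAccount {
    String accountNumber;
    BigDecimal balance = BigDecimal.ZERO;

    // Apply a single event to the in-memory state.
    void apply(AccountEvent event) {
        if (event instanceof AccountOpened opened) {
            accountNumber = opened.accountNumber();
            balance = BigDecimal.ZERO;
        } else if (event instanceof MoneyDeposited deposited) {
            balance = balance.add(deposited.amount());
        } else if (event instanceof MoneyWithdrawn withdrawn) {
            balance = balance.subtract(withdrawn.amount());
        }
    }

    // Rebuild the current state by applying the full history, oldest event first.
    static ReplayedAccount replay(List<AccountEvent> history) {
        ReplayedAccount account = new ReplayedAccount();
        history.forEach(account::apply);
        return account;
    }
}
```

Replaying the three events from the slide (opened, €100 deposited, €50 withdrawn) leaves a balance of €50, matching the last state shown.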
  11. TESTING AGGREGATES

    public class BankAccountTest {
        private FixtureConfiguration<BankAccount> fixture;

        @Before
        public void createFixture() {
            fixture = new AggregateTestFixture<>(BankAccount.class);
        }

        @Test
        public void noOverdraftsOnEmptyAccount() {
            fixture.given(new AccountOpened(ACCOUNT_ID, ACCOUNT_NUMBER))
                .when(new WithdrawMoney(ACCOUNT_ID, new BigDecimal(20)))
                .expectException(OverdraftDetectedException.class);
        }

        private static final String ACCOUNT_ID = "accountId";
        private static final String ACCOUNT_NUMBER = "accountNumber";
    }
  12. Domain UI Event Bus Event Handlers Command Repository Database Database

    Event Store commands events events Aggregates @michieltcs
  13. Domain UI Event Bus Event Handlers Command Repository Data Layer

    Database Database Event Store commands events events queries DTOs Aggregates @michieltcs
  14. QUERIES

    Events: Money Deposited, Money Withdrawn, Interest Received, Money Deposited, Money Withdrawn
    Query: Accounts with balance > €100?
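A question like "accounts with balance > €100?" is awkward to answer from the raw event stream, which is why it is usually served from a read model. Below is a minimal sketch, assuming a hypothetical `BalanceProjection` class that is fed the three event types named on the slide. It is plain Java with no framework annotations, so the method names are illustrative only.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical read model: keeps one running balance per account id.
class BalanceProjection {
    private final Map<String, BigDecimal> balances = new HashMap<>();

    void onMoneyDeposited(String accountId, BigDecimal amount) {
        balances.merge(accountId, amount, BigDecimal::add);
    }

    void onMoneyWithdrawn(String accountId, BigDecimal amount) {
        balances.merge(accountId, amount.negate(), BigDecimal::add);
    }

    void onInterestReceived(String accountId, BigDecimal amount) {
        balances.merge(accountId, amount, BigDecimal::add);
    }

    // The query is a simple filter over the precomputed balances.
    List<String> accountsWithBalanceAbove(BigDecimal threshold) {
        return balances.entrySet().stream()
            .filter(entry -> entry.getValue().compareTo(threshold) > 0)
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The cost of answering the query no longer depends on the number of events, only on the number of accounts in the read model.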
  15. PROJECTION

    Account Opened -> Event Handler -> # of active accounts +1
    Account Closed -> Event Handler -> # of active accounts -1
  16. PROJECTION

    public class BankAccountProjections {
        private Map<String, String> activeAccounts = new HashMap<>();

        @EventHandler
        public void onAccountOpened(AccountOpened accountOpened) {
            activeAccounts.put(accountOpened.getAccountId(), accountOpened.getAccountNumber());
        }

        @EventHandler
        public void onAccountClosed(AccountClosed accountClosed) {
            activeAccounts.remove(accountClosed.getAccountId());
        }

        public String getAccountNumberForAccountId(String accountId) {
            return activeAccounts.get(accountId);
        }
    }
  17. ZERO DOWNTIME

    New events -> Queue
    Loop over existing events -> Apply to new projection -> Apply queued events -> Use projection
  18. ZERO DOWNTIME

    Get next event -> Apply to new projection -> Last event?
    no: get next event
    yes: use projection
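The two zero-downtime slides describe one procedure: queue events that arrive during the rebuild, replay the existing events into the new projection, drain the queue, then switch over. A minimal single-process sketch of that procedure follows. `ProjectionRebuilder` is a hypothetical helper, not part of any framework, and the sketch assumes the existing events are a snapshot taken at the moment queueing starts, so no event appears in both the snapshot and the queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper that rebuilds a projection while new events keep arriving.
class ProjectionRebuilder<E> {
    private final Consumer<E> newProjection;
    private final Queue<E> queued = new ArrayDeque<>();
    private boolean live = false;

    ProjectionRebuilder(Consumer<E> newProjection) {
        this.newProjection = newProjection;
    }

    // New events are queued during the rebuild and applied directly afterwards.
    synchronized void onNewEvent(E event) {
        if (live) {
            newProjection.accept(event);
        } else {
            queued.add(event);
        }
    }

    // Replay the existing events, then drain the queue and go live.
    void rebuild(Iterable<E> existingEvents) {
        for (E event : existingEvents) {
            newProjection.accept(event);
        }
        synchronized (this) {
            E event;
            while ((event = queued.poll()) != null) {
                newProjection.accept(event);
            }
            live = true; // from here on the projection can be used
        }
    }

    synchronized boolean isLive() {
        return live;
    }
}
```

Draining the queue and flipping the `live` flag happen under the same lock, so an event arriving at the switch-over is either queued and drained or applied directly, never dropped.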
  19. TRACKING EVENTS (slides 19 to 28 step through the same diagram)

    Event, Event, Event, ..., Event, Event, Event; slide 24 adds a Token Store
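The diagram only survives as a row of events plus a token store, so the following is an interpretation rather than a transcription: a tracking processor reads the event stream at its own pace and records how far it has come in a token store. The sketch below uses hypothetical `InMemoryTokenStore` and `SimpleTrackingProcessor` classes in plain Java; it is not Axon's `TrackingEventProcessor`, and a real token store would be persistent.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical token store: per processor name, the position of the next event to handle.
class InMemoryTokenStore {
    private final Map<String, Integer> tokens = new HashMap<>();

    int fetch(String processorName) {
        return tokens.getOrDefault(processorName, 0);
    }

    void store(String processorName, int nextPosition) {
        tokens.put(processorName, nextPosition);
    }
}

class SimpleTrackingProcessor<E> {
    private final String name;
    private final InMemoryTokenStore tokenStore;
    private final Consumer<E> handler;

    SimpleTrackingProcessor(String name, InMemoryTokenStore tokenStore, Consumer<E> handler) {
        this.name = name;
        this.tokenStore = tokenStore;
        this.handler = handler;
    }

    // Handles every event from the stored position onwards and returns how many were handled.
    int processAvailable(List<E> eventStream) {
        int position = tokenStore.fetch(name);
        int handled = 0;
        while (position < eventStream.size()) {
            handler.accept(eventStream.get(position));
            position++;
            tokenStore.store(name, position); // remember progress after each event
            handled++;
        }
        return handled;
    }
}
```

A processor registered under a new name has no token yet and therefore starts from the first event, which is one way to trigger a full replay for a new version of a projection.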
  29. TRACKING EVENT PROCESSOR

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RebuildableProjection {
        String version() default "";
        boolean rebuild() default false;
    }
  30. TRACKING EVENT PROCESSOR

    @Configuration
    public class ProjectionsConfiguration {
        @Autowired
        private EventHandlingConfiguration eventHandlingConfiguration;
  31. TRACKING EVENT PROCESSOR

    @PostConstruct
    public void startTrackingProjections() throws ClassNotFoundException {
        // Scan the classpath for classes annotated with @RebuildableProjection.
        ClassPathScanningCandidateComponentProvider scanner =
            new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(RebuildableProjection.class));

        for (BeanDefinition bd : scanner.findCandidateComponents("org.demo")) {
            Class<?> aClass = Class.forName(bd.getBeanClassName());
            RebuildableProjection rebuildableProjection =
                aClass.getAnnotation(RebuildableProjection.class);

            // Only projections flagged for rebuild are registered here.
            if (rebuildableProjection.rebuild()) {
                registerRebuildableProjection(aClass, rebuildableProjection);
            }
        }
    }
  32. TRACKING EVENT PROCESSOR

    private void registerRebuildableProjection(Class<?> aClass,
                                               RebuildableProjection rebuildableProjection) {
        ProcessingGroup processingGroup = aClass.getAnnotation(ProcessingGroup.class);

        // Use the processing group name if present; otherwise derive a name
        // from the class name and the projection version.
        String name = Optional.ofNullable(processingGroup)
            .map(ProcessingGroup::value)
            .orElse(aClass.getName() + "/" + rebuildableProjection.version());

        eventHandlingConfiguration.assignHandlersMatching(
            name,
            Integer.MAX_VALUE,
            (eventHandler) -> aClass.isAssignableFrom(eventHandler.getClass()));

        eventHandlingConfiguration.registerTrackingProcessor(name);
    }
  33. (1) Commands can be renamed (2) Events are immutable (3) Correct old events with new events
  34. COMPENSATING ACTIONS Typo: too much withdrawn!

    class MoneyWithdrawn {
        String accountId;
        BigDecimal amount;
    }

    class WithdrawalRolledBack {
        String accountId;
        BigDecimal amount;
    }
  35. COMPENSATING ACTIONS Duplicate account number!

    class AccountOpened {
        String accountId;
        String accountNumber;
    }

    class DuplicateAccountClosed {
        String accountId;
    }
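Because stored events are immutable, a mistake is corrected by appending a new event that undoes its effect. The sketch below illustrates this for the withdrawal typo, using simplified, hypothetical record types (no account id, plain Java 16 or later) and a hypothetical `Ledger.balanceOf` helper that folds the history into a balance.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical, simplified events; the deck's classes also carry an accountId.
interface LedgerEvent {}
record MoneyDeposited(BigDecimal amount) implements LedgerEvent {}
record MoneyWithdrawn(BigDecimal amount) implements LedgerEvent {}
record WithdrawalRolledBack(BigDecimal amount) implements LedgerEvent {}

class Ledger {
    // Computes the balance from the full history; the faulty event stays in place.
    static BigDecimal balanceOf(List<LedgerEvent> history) {
        BigDecimal balance = BigDecimal.ZERO;
        for (LedgerEvent event : history) {
            if (event instanceof MoneyDeposited deposited) {
                balance = balance.add(deposited.amount());
            } else if (event instanceof MoneyWithdrawn withdrawn) {
                balance = balance.subtract(withdrawn.amount());
            } else if (event instanceof WithdrawalRolledBack rolledBack) {
                // The compensating event adds the wrongly withdrawn amount back.
                balance = balance.add(rolledBack.amount());
            }
        }
        return balance;
    }
}
```

The history keeps both the mistaken withdrawal and its rollback, so the audit trail shows what happened and how it was corrected.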
  36. UPCASTING

    @Value
    @Revision("1.0")
    public class AccountOpened {
        String accountId;
        String accountNumber;
    }
  37. UPCASTING

    @Value
    @Revision("2.0")
    public class AccountOpened {
        String accountId;
        String accountNumberIban;
    }
  38. UPCASTING

    @Getter
    public class AccountOpenedUpcaster implements EventUpcaster {
        private final SerializedType typeConsumed
            = new SimpleSerializedType(AccountOpened.class.getTypeName(), "1.0");
        private final SerializedType typeProduced
            = new SimpleSerializedType(AccountOpened.class.getTypeName(), "2.0");
  39. UPCASTING

    @Override
    public final Stream<IntermediateEventRepresentation> upcast(
            Stream<IntermediateEventRepresentation> intermediateRepresentations) {
        return intermediateRepresentations.map(evt -> {
            if (evt.getType().equals(getTypeConsumed())) {
                // Revision 1.0 event: replace accountNumber with accountNumberIban.
                return evt.upcastPayload(getTypeProduced(), Document.class,
                    document -> {
                        Element rootElement = document.getRootElement();
                        Element accountNumberElement = rootElement.element("accountNumber");
                        rootElement.remove(accountNumberElement);
                        rootElement.addElement("accountNumberIban")
                            .setText(toIban(accountNumberElement.getText()));
                        return document;
                    });
            } else {
                // Any other event type or revision passes through unchanged.
                return evt;
            }
        });
    }
  40. VERSIONED EVENT STORE events_v1

    [
      {
        "id": "12345678",
        "type": "AccountOpened",
        "aggregateType": "Account",
        "aggregateIdentifier": "1234",
        "sequenceNumber": 0,
        "payloadRevision": "1.0",
        "payload": { ... },
        "timestamp": ...
        ...
      },
      ...
    ]
  41. VERSIONED EVENT STORE

    New events -> Queue
    Loop over existing events -> Apply upcaster -> Add queued events -> Use new event store
  42. VERSIONED EVENT STORE events_v2

    [
      {
        "id": "12345678",
        "type": "AccountOpened",
        "aggregateType": "Account",
        "aggregateIdentifier": "1234",
        "sequenceNumber": 0,
        "payloadRevision": "2.0",
        "payload": { ... },
        "timestamp": ...
        ...
      },
      ...
    ]
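The versioned event store slides describe copying every event from `events_v1` into `events_v2` while applying the upcaster, so that the new store only contains the latest revision. Here is a minimal sketch of the copy step in plain Java, under stated assumptions: `StoredEvent` and `EventStoreMigration` are hypothetical, the payload is reduced to a string map, and the IBAN conversion is passed in as a function because the deck does not show the body of `toIban`. Queueing of events that arrive during the copy is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, trimmed-down version of a stored event record.
record StoredEvent(String type, String payloadRevision, Map<String, String> payload) {}

class EventStoreMigration {
    // Upcasts AccountOpened 1.0 to 2.0; every other event is returned unchanged.
    static StoredEvent upcastAccountOpened(StoredEvent event, UnaryOperator<String> toIban) {
        boolean consumed = event.type().equals("AccountOpened")
            && event.payloadRevision().equals("1.0");
        if (!consumed) {
            return event;
        }
        Map<String, String> payload = new HashMap<>(event.payload());
        String accountNumber = payload.remove("accountNumber");
        payload.put("accountNumberIban", toIban.apply(accountNumber));
        return new StoredEvent(event.type(), "2.0", payload);
    }

    // Copies the old store into a new list, applying the upcaster to each event in order.
    static List<StoredEvent> migrate(List<StoredEvent> eventsV1, UnaryOperator<StoredEvent> upcaster) {
        List<StoredEvent> eventsV2 = new ArrayList<>();
        for (StoredEvent event : eventsV1) {
            eventsV2.add(upcaster.apply(event));
        }
        return eventsV2;
    }
}
```

The old store is only read, never modified, so the migration can be repeated or abandoned without affecting `events_v1`.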
  43. PROCESSING GDPR ART. 17 REQUESTS

    ConsentWithdrawn -> Remove from event store? / Remove from read models / Notify 3rd parties
  44. STORE PII EXTERNALLY

    AccountOpened:

    @Value
    public class AccountOpened {
        String accountId;
    }

    External Storage:

    Account Id | Account number | Name
    1234       | 12345678       | John Doe
    ...        | ...            | ...
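With personal data kept outside the event store, an erasure request only has to touch the external storage, and the immutable events stay as they are. The following is a minimal sketch of that split. `PersonalDataStore` and `AccountHolder` are hypothetical names, and an in-memory map stands in for whatever storage is actually used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// The event carries only the identifier, as on the slide.
record AccountOpened(String accountId) {}

// Hypothetical record for the personal data kept outside the event store.
record AccountHolder(String accountNumber, String name) {}

class PersonalDataStore {
    private final Map<String, AccountHolder> byAccountId = new HashMap<>();

    void save(String accountId, AccountHolder holder) {
        byAccountId.put(accountId, holder);
    }

    Optional<AccountHolder> find(String accountId) {
        return Optional.ofNullable(byAccountId.get(accountId));
    }

    // Erasure removes the personal data; events referencing the id are untouched.
    boolean erase(String accountId) {
        return byAccountId.remove(accountId) != null;
    }
}
```

After erasure, code that joins events with personal data has to cope with a missing record, which is why `find` returns an `Optional`.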
  45. AXON GDPR MODULE

    @Value
    public class AccountOpened {
        @DataSubjectId
        String accountId;

        @PersonalData
        String accountNumber;

        @PersonalData
        String fullName;
    }
  46. CONCURRENT COMMANDS

    Withdraw Money (Account Id 1234, Amount €50,00) + Deposit Money (Account Id 1234, Amount €100,00) = ?
  47. PESSIMISTIC LOCKING

    Deposit Money (Account Id 1234, Amount €100,00): lock -> Account Id 1234, Balance €100,00
    Withdraw Money (Account Id 1234, Amount €50,00): wait for lock -> Account Id 1234, Balance €50,00
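Pessimistic locking serializes commands per aggregate: the first command takes the lock, the second waits until it is released. Below is a minimal single-JVM sketch of that behaviour. `PessimisticAccounts` is a hypothetical class that stores balances directly instead of events, and a per-account `ReentrantLock` stands in for whatever locking the framework or database provides.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

// Hypothetical in-memory accounts with one lock per account id.
class PessimisticAccounts {
    private final Map<String, BigDecimal> balances = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    void open(String accountId, BigDecimal initialBalance) {
        balances.put(accountId, initialBalance);
    }

    // Runs the command while holding the account's lock; other commands
    // for the same account block until the lock is released.
    BigDecimal handle(String accountId, UnaryOperator<BigDecimal> command) {
        ReentrantLock lock = locks.computeIfAbsent(accountId, id -> new ReentrantLock());
        lock.lock();
        try {
            BigDecimal updated = command.apply(balances.get(accountId));
            balances.put(accountId, updated);
            return updated;
        } finally {
            lock.unlock();
        }
    }

    BigDecimal balance(String accountId) {
        return balances.get(accountId);
    }
}
```

Commands for different accounts use different locks and do not wait for each other; only commands for the same account are serialized.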
  48. OPTIMISTIC LOCKING

    Deposit Money (Account Id 1234, Amount €100,00, version 1) -> Account Id 1234, Balance €100,00, version 2
    Withdraw Money (Account Id 1234, Amount €50,00, version 1) -> ConcurrencyException
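Optimistic locking lets both commands proceed and rejects the one whose expected version is stale when it tries to append. A minimal sketch of that check follows, using a hypothetical `VersionedEventStream` in which the version is simply the number of stored events. The `ConcurrencyException` here is a local class defined for the example, not the framework's.

```java
import java.util.ArrayList;
import java.util.List;

// Local exception for this sketch; not the framework class of the same name.
class ConcurrencyException extends RuntimeException {
    ConcurrencyException(String message) {
        super(message);
    }
}

// Hypothetical event stream for one aggregate; version = number of stored events.
class VersionedEventStream {
    private final List<String> events = new ArrayList<>();

    synchronized int version() {
        return events.size();
    }

    // Appends only if the caller based its decision on the latest version.
    synchronized int append(int expectedVersion, String event) {
        if (expectedVersion != events.size()) {
            throw new ConcurrencyException(
                "expected version " + expectedVersion + " but stream is at " + events.size());
        }
        events.add(event);
        return events.size();
    }
}
```

The rejected command is typically retried: the handler reloads the aggregate at the new version, re-checks its business rules, and appends again.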
  49. MULTIPLE INSTANCES

    Commands -> Distributed Command Bus -> Replica, Replica, Replica (each with its own Command Handlers)
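The slide does not say how the distributed command bus picks a replica. One common approach, shown here purely as an assumption, is to route on the aggregate identifier so that all commands for one aggregate reach the same replica and the locking strategies from the previous slides keep working. `CommandRouter` is a hypothetical class, and plain modulo hashing is used for brevity; it is not consistent hashing and would reshuffle aggregates whenever the replica list changes.

```java
import java.util.List;

// Hypothetical router: maps an aggregate id to one of a fixed list of replicas.
class CommandRouter {
    private final List<String> replicas;

    CommandRouter(List<String> replicas) {
        if (replicas.isEmpty()) {
            throw new IllegalArgumentException("at least one replica is required");
        }
        this.replicas = List.copyOf(replicas);
    }

    // The same aggregate id always maps to the same replica
    // as long as the replica list stays the same.
    String replicaFor(String aggregateId) {
        int index = Math.floorMod(aggregateId.hashCode(), replicas.size());
        return replicas.get(index);
    }
}
```

`Math.floorMod` is used instead of `%` because `hashCode()` can be negative, which would otherwise produce a negative index.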