Michiel Rook
September 13, 2018

Dealing with change in event sourced applications (JavaZone 2018)

In software development, change is pretty much the only constant factor. In fact, embracing change is one of the twelve principles behind the Agile Manifesto. As time passes, our understanding of the domain we are working in evolves. We develop based on new requirements, (better) insights, opportunities, changes in the market or legislation, or other factors.

These inputs eventually all lead to modifications to our application, which can be very challenging to implement if the application uses event sourcing. Indeed, when applied very strictly, event sourcing can be quite resistant to change. And unfortunately, there’s not a lot of literature on this subject (yet).

In this talk, we’ll explore how to deal with projection updates, event versioning and the GDPR, and the tools offered by a popular framework (Axon).

Transcript

  1. YOU

  2. RAISE YOUR HAND IF YOU HAVE

    read CQRS / Event Sourcing theory; followed a tutorial, built a hobby project
  3. RAISE YOUR HAND IF YOU HAVE

    read CQRS / Event Sourcing theory; followed a tutorial, built a hobby project; used it in production
  4. "Event Sourcing ensures that all changes to application state are stored as a sequence of events." -Martin Fowler
  5. ACTIVE RECORD VS. EVENT SOURCING

    Active record:
    Account Id | Account number | Balance
    1234 | 12345678 | €50,00
    ... | ... | ...

    Event sourcing:
    Account Opened: Account Id 1234, Account number 12345678
    Money Deposited: Account Id 1234, Amount €100,00
    Money Withdrawn: Account Id 1234, Amount €50,00
    @michieltcs
  6. COMMANDS TO EVENTS

    Deposit Money: Account Id 1234, Amount €100,00

    @Value
    public class DepositMoney {
        @TargetAggregateIdentifier
        String accountId;
        BigDecimal amount;
    }
  7. COMMANDS TO EVENTS

    Deposit Money (Account Id 1234, Amount €100,00) -> command handler

    @CommandHandler
    public void depositMoney(DepositMoney command) {
        apply(new MoneyDeposited(
                command.getAccountId(),
                command.getAmount(),
                ZonedDateTime.now()));
    }
  8. COMMANDS TO EVENTS

    Deposit Money (Account Id 1234, Amount €100,00) -> command handler -> Money Deposited (Account Id 1234, Amount €100,00)

    @Value
    public class MoneyDeposited {
        String accountId;
        BigDecimal amount;
        ZonedDateTime timestamp;
    }
  9. AGGREGATES

    class BankAccount {
        @AggregateIdentifier
        private String accountId;
        private String accountNumber;
        private BigDecimal balance;

        // ...
        @EventHandler
        public void accountOpened(AccountOpened event) {
            this.accountId = event.getAccountId();
            this.accountNumber = event.getAccountNumber();
            this.balance = BigDecimal.valueOf(0);
        }

        @EventHandler
        public void moneyDeposited(MoneyDeposited event) {
            this.balance = this.balance.add(event.getAmount());
        }
    }
  10. AGGREGATE STATE

    Account Opened (Account Id 1234, Account number 12345678) -> event handler -> Account number 12345678, Balance €0,00
    Money Deposited (Account Id 1234, Amount €100,00) -> event handler -> Account number 12345678, Balance €100,00
    Money Withdrawn (Account Id 1234, Amount €50,00) -> event handler -> Account number 12345678, Balance €50,00
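
The state progression on this slide (€0,00, then €100,00, then €50,00) can be reproduced by applying the events in order. The following is a minimal sketch in plain Java without Axon; the `Event` class, the `Type` enum and the `replay` helper are assumptions made for illustration and do not appear in the talk.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

final class ReplayDemo {
    enum Type { ACCOUNT_OPENED, MONEY_DEPOSITED, MONEY_WITHDRAWN }

    // Hypothetical, simplified event: a type plus an amount (zero for ACCOUNT_OPENED).
    static final class Event {
        final Type type;
        final BigDecimal amount;

        Event(Type type, BigDecimal amount) {
            this.type = type;
            this.amount = amount;
        }
    }

    // Rebuilds the balance by applying every event in order.
    static BigDecimal replay(List<Event> events) {
        BigDecimal balance = BigDecimal.ZERO;
        for (Event event : events) {
            switch (event.type) {
                case ACCOUNT_OPENED:
                    balance = BigDecimal.ZERO;
                    break;
                case MONEY_DEPOSITED:
                    balance = balance.add(event.amount);
                    break;
                case MONEY_WITHDRAWN:
                    balance = balance.subtract(event.amount);
                    break;
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(Type.ACCOUNT_OPENED, BigDecimal.ZERO),
                new Event(Type.MONEY_DEPOSITED, new BigDecimal("100.00")),
                new Event(Type.MONEY_WITHDRAWN, new BigDecimal("50.00")));
        System.out.println(replay(events)); // prints 50.00
    }
}
```

The current state is never stored directly in this sketch; it is always derived from the event history.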
  11. VALIDATING COMMANDS

    @CommandHandler
    public void withdrawMoney(WithdrawMoney command) throws OverdraftDetectedException {
        if (balance.compareTo(command.getAmount()) >= 0) {
            apply(new MoneyWithdrawn(
                    command.getAccountId(),
                    command.getAmount(),
                    ZonedDateTime.now()));
        } else {
            throw new OverdraftDetectedException(accountNumber, balance, command.getAmount());
        }
    }
  12. TESTING AGGREGATES

    public class BankAccountTest {
        private FixtureConfiguration<BankAccount> fixture;

        @Before
        public void createFixture() {
            fixture = new AggregateTestFixture<>(BankAccount.class);
        }

        @Test
        public void noOverdraftsOnEmptyAccount() {
            fixture.given(new AccountOpened(ACCOUNT_ID, ACCOUNT_NUMBER))
                    .when(new WithdrawMoney(ACCOUNT_ID, new BigDecimal(20)))
                    .expectException(OverdraftDetectedException.class);
        }

        private final static String ACCOUNT_ID = "accountId";
        private final static String ACCOUNT_NUMBER = "accountNumber";
    }
  13. QUERIES

    Events: Money Deposited, Money Withdrawn, Interest Received, Money Deposited, Money Withdrawn
    Query: Accounts with balance > €100?
  14. PROJECTION

    Account Opened -> Event Handler -> # of active accounts +1
    Account Closed -> Event Handler -> # of active accounts -1
  15. Domain UI Event Bus Event Handlers Command a Database Database

    Event Store Aggregates @michieltcs commands events events
  16. Domain UI Event Bus Event Handlers Command Repository Data Layer

    Database Database Event Store commands events events queries DTOs Aggregates @michieltcs
  17. MULTIPLE AGGREGATES

    Seller aggregate: Seller Event (x5)
    Listing aggregate: Listing Event (x5)
    Projection columns: Seller Name | Listing Date | Listing Description
  18. PROJECTION

    public class BankAccountProjections {
        private Map<String, String> activeAccounts = new HashMap<>();

        @EventHandler
        public void onAccountOpened(AccountOpened accountOpened) {
            activeAccounts.put(accountOpened.getAccountId(), accountOpened.getAccountNumber());
        }

        @EventHandler
        public void onAccountClosed(AccountClosed accountClosed) {
            activeAccounts.remove(accountClosed.getAccountId());
        }

        public String getAccountNumberForAccountId(String accountId) {
            return activeAccounts.get(accountId);
        }
    }
  19. PROJECTION

    public class BankAccountProjections {
        private Map<String, String> activeAccounts = new HashMap<>();

        @EventHandler
        public void onAccountOpened(AccountOpened accountOpened) {
            activeAccounts.put(accountOpened.getAccountId(), accountOpened.getAccountNumber());
        }

        @EventHandler
        public void onAccountClosed(AccountClosed accountClosed) {
            activeAccounts.remove(accountClosed.getAccountId());
        }

        public int getNumberOfActiveAccounts() {
            return activeAccounts.size();
        }
    }
  20. ZERO DOWNTIME

    New events -> Queue
    Loop over existing events -> Apply to new projection -> Apply queued events -> Use projection
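
The zero-downtime flow above (queue new events, replay the existing ones into the new projection, apply the queue, then switch over) can be sketched as follows. This is a simplified, in-memory illustration rather than the talk's implementation: the `Rebuilder` and `ActiveAccounts` classes are hypothetical, events are reduced to their type names, and the sketch assumes the replayed history and the queued events do not overlap.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class ProjectionRebuildDemo {

    // Hypothetical projection: the number of active accounts.
    static final class ActiveAccounts {
        private int count;

        void apply(String eventType) {
            if (eventType.equals("AccountOpened")) {
                count++;
            } else if (eventType.equals("AccountClosed")) {
                count--;
            }
        }
    }

    static final class Rebuilder {
        private final ActiveAccounts projection = new ActiveAccounts();
        private final Queue<String> queued = new ArrayDeque<>();
        private boolean live = false;

        // New events are queued while the rebuild runs and applied directly afterwards.
        synchronized void onNewEvent(String eventType) {
            if (live) {
                projection.apply(eventType);
            } else {
                queued.add(eventType);
            }
        }

        // Replays the history, drains the queue, then switches the projection to live.
        void rebuild(List<String> history) {
            for (String eventType : history) {
                // onNewEvent only queues at this point, so it does not write to the projection.
                projection.apply(eventType);
            }
            synchronized (this) {
                while (!queued.isEmpty()) {
                    projection.apply(queued.poll());
                }
                live = true;
            }
        }

        synchronized int activeAccounts() {
            return projection.count;
        }
    }

    public static void main(String[] args) {
        Rebuilder rebuilder = new Rebuilder();
        rebuilder.onNewEvent("AccountOpened"); // arrives before the rebuild finishes, so it is queued
        rebuilder.rebuild(Arrays.asList("AccountOpened", "AccountOpened", "AccountClosed"));
        rebuilder.onNewEvent("AccountOpened"); // applied directly, the projection is live
        System.out.println(rebuilder.activeAccounts()); // prints 3
    }
}
```

The application keeps accepting events during the replay; only the short drain-and-switch step holds the lock.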
  21. DIVIDING THE WORK Event Event Event Event Event @michieltcs Aggregate

    Instance Event Event Event Event Instance Event Event Event
  22. DIVIDING THE WORK

    Seller aggregate: Seller Event (x5)
    Listing aggregate: Listing Event (x5)
    Projection columns: Seller Name | Listing Date | Listing Description
  23. DIVIDING THE WORK

    Seller aggregate: Seller Event (x5) -> Instance
    Listing aggregate: Listing Event (x5) -> Instance
  24. DIVIDING THE WORK

    Seller aggregate: Seller Event (x5) -> Instance
    Listing aggregate: Listing Event (x5) -> Instance
    ?
  25. TRACKING EVENTS @michieltcs Event Event Event Event Event Event .

    . . . . . . . . . . . . . Event Event Event
  30. TRACKING EVENTS @michieltcs Event Event Event Event Event Event .

    . . . . . . . . . . . . . Event Event Event Token Store
  35. TRACKING EVENT PROCESSOR

    Get next event -> Apply to new projection -> Last event?
    no: Get next event
    yes: Use projection
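
The loop above, combined with the token store from the earlier slide, can be sketched in plain Java. This is an illustration under stated assumptions, not Axon's implementation: the token store is modeled as a map from processor name to the index of the next event, the event store is an in-memory list, and `processPending` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TrackingDemo {

    // Applies every event after the stored token and advances the token after each one.
    // tokenStore maps a processor name to the index of the next event to read.
    static int processPending(String processorName,
                              Map<String, Integer> tokenStore,
                              List<String> eventStore,
                              Consumer<String> handler) {
        int position = tokenStore.getOrDefault(processorName, 0);
        int handled = 0;
        while (position < eventStore.size()) {
            handler.accept(eventStore.get(position));
            position++;
            tokenStore.put(processorName, position);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Map<String, Integer> tokenStore = new HashMap<>();
        List<String> eventStore = new ArrayList<>(
                Arrays.asList("AccountOpened", "MoneyDeposited", "MoneyWithdrawn"));
        Consumer<String> projection = event -> { }; // stands in for a real projection

        System.out.println(processPending("accounts/1", tokenStore, eventStore, projection)); // prints 3
        eventStore.add("AccountClosed");
        System.out.println(processPending("accounts/1", tokenStore, eventStore, projection)); // prints 1
        // A new processor name has no token yet, so it replays the full history.
        System.out.println(processPending("accounts/2", tokenStore, eventStore, projection)); // prints 4
    }
}
```

Because a processor without a token starts from the beginning, giving a processor a new name is enough to trigger a full rebuild in this sketch.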
  36. TRACKING EVENT PROCESSOR

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RebuildableProjection {
        String version() default "";
        boolean rebuild() default false;
    }
  37. TRACKING EVENT PROCESSOR

    @Configuration
    public class ProjectionsConfiguration {
        @Autowired
        private EventHandlingConfiguration eventHandlingConfiguration;
  38. TRACKING EVENT PROCESSOR

    @PostConstruct
    public void startTrackingProjections() throws ClassNotFoundException {
        ClassPathScanningCandidateComponentProvider scanner =
                new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(RebuildableProjection.class));

        for (BeanDefinition bd : scanner.findCandidateComponents("org.demo")) {
            Class<?> aClass = Class.forName(bd.getBeanClassName());
            RebuildableProjection rebuildableProjection =
                    aClass.getAnnotation(RebuildableProjection.class);

            if (rebuildableProjection.rebuild()) {
                registerRebuildableProjection(aClass, rebuildableProjection);
            }
        }
    }
  39. TRACKING EVENT PROCESSOR

    private void registerRebuildableProjection(Class<?> aClass,
                                               RebuildableProjection rebuildableProjection) {
        ProcessingGroup processingGroup = aClass.getAnnotation(ProcessingGroup.class);

        String name = Optional.ofNullable(processingGroup).map(ProcessingGroup::value)
                .orElse(aClass.getName() + "/" + rebuildableProjection.version());

        eventHandlingConfiguration.assignHandlersMatching(
                name,
                Integer.MAX_VALUE,
                (eventHandler) -> aClass.isAssignableFrom(eventHandler.getClass()));

        eventHandlingConfiguration.registerTrackingProcessor(name);
    }
  41. Commands can be renamed; events are immutable; correct old events with new events
  42. Ledger Entry, Aug 14: Inventory €15600,00; Accounts Payable €15600,00

    Ledger Entry, Aug 14: Inventory €16500,00; Accounts Payable €16500,00
  43. Ledger Entry, Aug 14: Inventory €15600,00; Accounts Payable €15600,00

    Ledger Entry, Aug 14: Inventory €16500,00; Accounts Payable €16500,00
    Ledger Correction Entry, Aug 14: Inventory €900,00; Accounts Payable €900,00
  44. COMPENSATING ACTIONS

    Typo: too much withdrawn!

    class MoneyWithdrawn {
        String accountId;
        BigDecimal amount;
    }

    class WithdrawalRolledBack {
        String accountId;
        BigDecimal amount;
    }
  45. COMPENSATING ACTIONS

    Duplicate account number!

    class AccountOpened {
        String accountId;
        String accountNumber;
    }

    class DuplicateAccountClosed {
        String accountId;
    }
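
The ledger correction and the compensating events above share one idea: the erroneous event stays in the log, and a new event offsets it. A minimal sketch of that idea, assuming a simplified `Event` class and illustrative amounts that are not taken from the talk:

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

final class CompensationDemo {

    // Hypothetical, simplified event: a type name plus an amount.
    static final class Event {
        final String type;
        final BigDecimal amount;

        Event(String type, String amount) {
            this.type = type;
            this.amount = new BigDecimal(amount);
        }
    }

    // Derives the balance from the full log, including the mistake and its correction.
    static BigDecimal balance(List<Event> log) {
        BigDecimal balance = BigDecimal.ZERO;
        for (Event event : log) {
            switch (event.type) {
                case "MoneyDeposited":
                case "WithdrawalRolledBack":
                    balance = balance.add(event.amount);
                    break;
                case "MoneyWithdrawn":
                    balance = balance.subtract(event.amount);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown event type: " + event.type);
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Event> log = new ArrayList<>();
        log.add(new Event("MoneyDeposited", "1000"));
        log.add(new Event("MoneyWithdrawn", "500"));       // typo: should have been 50
        log.add(new Event("WithdrawalRolledBack", "500")); // compensates the mistake
        log.add(new Event("MoneyWithdrawn", "50"));        // the intended withdrawal
        System.out.println(balance(log)); // prints 950
        System.out.println(log.size());   // prints 4: nothing was edited or deleted
    }
}
```

The log remains a complete audit trail: both the mistake and its correction are visible.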
  46. UPCASTING

    @Value
    @Revision("1.0")
    public class AccountOpened {
        String accountId;
        String accountNumber;
    }
  47. UPCASTING

    @Value
    @Revision("2.0")
    public class AccountOpened {
        String accountId;
        String accountNumberIban;
    }
  48. UPCASTING

    @Getter
    public class AccountOpenedUpcaster implements EventUpcaster {
        private final SerializedType typeConsumed
                = new SimpleSerializedType(AccountOpened.class.getTypeName(), "1.0");
        private final SerializedType typeProduced
                = new SimpleSerializedType(AccountOpened.class.getTypeName(), "2.0");
  49. UPCASTING

    @Override
    public final Stream<IntermediateEventRepresentation> upcast(
            Stream<IntermediateEventRepresentation> intermediateRepresentations) {
        return intermediateRepresentations.map(evt -> {
            if (evt.getType().equals(getTypeConsumed())) {
                return evt.upcastPayload(getTypeProduced(), Document.class,
                        document -> {
                            Element rootElement = document.getRootElement();
                            Element accountNumberElement = rootElement.element("accountNumber");
                            rootElement.remove(accountNumberElement);
                            rootElement.addElement("accountNumberIban")
                                    .setText(toIban(accountNumberElement.getText()));
                            return document;
                        });
            } else {
                return evt;
            }
        });
    }
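
Stripped of the framework types, the upcaster above performs a small transformation on the stored payload. The sketch below shows that transformation on a plain map instead of an XML document. `StoredEvent` is a hypothetical class, and `toIban` is a placeholder only: the slides do not show its implementation, and a real conversion would have to compute a valid IBAN.

```java
import java.util.HashMap;
import java.util.Map;

final class UpcastDemo {

    // Hypothetical stored event: type, payload revision and a flat payload.
    static final class StoredEvent {
        final String type;
        final String revision;
        final Map<String, String> payload;

        StoredEvent(String type, String revision, Map<String, String> payload) {
            this.type = type;
            this.revision = revision;
            this.payload = payload;
        }
    }

    // Placeholder only, NOT a real IBAN conversion.
    static String toIban(String accountNumber) {
        return "IBAN-PLACEHOLDER-" + accountNumber;
    }

    // Upcasts AccountOpened 1.0 to 2.0; every other event passes through unchanged.
    static StoredEvent upcast(StoredEvent event) {
        if (!event.type.equals("AccountOpened") || !event.revision.equals("1.0")) {
            return event;
        }
        Map<String, String> payload = new HashMap<>(event.payload);
        String accountNumber = payload.remove("accountNumber");
        payload.put("accountNumberIban", toIban(accountNumber));
        return new StoredEvent(event.type, "2.0", payload);
    }

    public static void main(String[] args) {
        Map<String, String> payload = new HashMap<>();
        payload.put("accountId", "1234");
        payload.put("accountNumber", "12345678");

        StoredEvent upcasted = upcast(new StoredEvent("AccountOpened", "1.0", payload));
        System.out.println(upcasted.revision + " " + upcasted.payload.get("accountNumberIban"));
        // prints 2.0 IBAN-PLACEHOLDER-12345678
    }
}
```

The stored event is not modified; the upcast result is a new object produced when the event is read.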
  51. VERSIONED EVENT STORE

    events_v1
    [
      {
        "id": "12345678",
        "type": "AccountOpened",
        "aggregateType": "Account",
        "aggregateIdentifier": "1234",
        "sequenceNumber": 0,
        "payloadRevision": "1.0",
        "payload": { ... },
        "timestamp": ...
        ...
      },
      ...
    ]
  52. VERSIONED EVENT STORE

    New events -> Queue
    Loop over existing events -> Apply upcaster -> Add queued events -> Use new event store
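
The copy-and-transform migration above can be sketched in a few lines. This is an in-memory illustration only: events are reduced to strings of the form `Type@revision`, the `migrate` helper is hypothetical, and the sketch assumes that events queued during the copy were written in the old format and therefore also pass through the upcaster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

final class StoreMigrationDemo {

    // Copies the old store into a new one, upcasting each event on the way,
    // then appends the events that were queued while the copy was running.
    static List<String> migrate(List<String> eventsV1,
                                List<String> queuedDuringCopy,
                                UnaryOperator<String> upcaster) {
        List<String> eventsV2 = new ArrayList<>();
        for (String event : eventsV1) {
            eventsV2.add(upcaster.apply(event));
        }
        for (String event : queuedDuringCopy) {
            eventsV2.add(upcaster.apply(event));
        }
        return eventsV2;
    }

    public static void main(String[] args) {
        UnaryOperator<String> upcaster =
                event -> event.equals("AccountOpened@1.0") ? "AccountOpened@2.0" : event;

        List<String> eventsV2 = migrate(
                Arrays.asList("AccountOpened@1.0", "MoneyDeposited@1.0"),
                Arrays.asList("AccountOpened@1.0"),
                upcaster);
        System.out.println(eventsV2);
        // prints [AccountOpened@2.0, MoneyDeposited@1.0, AccountOpened@2.0]
    }
}
```

Once the new store is in use, the upcaster is no longer needed at read time, since every stored event already has the new revision.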
  53. VERSIONED EVENT STORE

    events_v2
    [
      {
        "id": "12345678",
        "type": "AccountOpened",
        "aggregateType": "Account",
        "aggregateIdentifier": "1234",
        "sequenceNumber": 0,
        "payloadRevision": "2.0",
        "payload": { ... },
        "timestamp": ...
        ...
      },
      ...
    ]
  54. "... shall have the right to obtain ... the erasure of personal data concerning him or her without undue delay" -GDPR, Article 17
  55. STORE PII EXTERNALLY

    AccountOpened:

    @Value
    public class AccountOpened {
        String accountId;
    }

    External Storage:
    Account Id | Account number | Name
    1234 | 12345678 | John Doe
    ... | ... | ...
  56. SHEDDING THE KEY

    Load event -> Find associated encryption key -> Decrypt payload values -> Process event
    X
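
The steps above can be sketched with the JDK's own cryptography classes. This is a minimal illustration, not the talk's or Axon's implementation: the in-memory key map stands in for a real key store, and the class and method names (`KeySheddingDemo`, `encrypt`, `decrypt`, `shedKey`) are hypothetical. Once the key for a data subject is deleted, the encrypted values in the event store can no longer be read.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

final class KeySheddingDemo {
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    // In-memory stand-in for a key store: one key per data subject.
    private final Map<String, SecretKey> keys = new HashMap<>();
    private final SecureRandom random = new SecureRandom();

    // Encrypts a payload value; the random IV is stored in front of the ciphertext.
    byte[] encrypt(String subjectId, String value) {
        try {
            SecretKey key = keys.get(subjectId);
            if (key == null) {
                KeyGenerator generator = KeyGenerator.getInstance("AES");
                generator.init(128);
                key = generator.generateKey();
                keys.put(subjectId, key);
            }
            byte[] iv = new byte[IV_BYTES];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] encrypted = cipher.doFinal(value.getBytes(StandardCharsets.UTF_8));
            byte[] stored = new byte[IV_BYTES + encrypted.length];
            System.arraycopy(iv, 0, stored, 0, IV_BYTES);
            System.arraycopy(encrypted, 0, stored, IV_BYTES, encrypted.length);
            return stored;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the decrypted value, or empty when the subject's key has been shed.
    Optional<String> decrypt(String subjectId, byte[] stored) {
        SecretKey key = keys.get(subjectId);
        if (key == null) {
            return Optional.empty();
        }
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, stored, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(stored, IV_BYTES, stored.length - IV_BYTES);
            return Optional.of(new String(plain, StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Deleting the key is the erasure step; the stored event itself is left untouched.
    void shedKey(String subjectId) {
        keys.remove(subjectId);
    }

    public static void main(String[] args) {
        KeySheddingDemo demo = new KeySheddingDemo();
        byte[] stored = demo.encrypt("1234", "John Doe");
        System.out.println(demo.decrypt("1234", stored)); // prints Optional[John Doe]
        demo.shedKey("1234");
        System.out.println(demo.decrypt("1234", stored)); // prints Optional.empty
    }
}
```

Event handlers in such a design have to cope with the empty case, for example by substituting a placeholder for the missing value.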
  57. AXON GDPR MODULE

    @Value
    public class AccountOpened {
        @DataSubjectId
        String accountId;

        @PersonalData
        String accountNumberIban;

        @PersonalData
        String firstName;

        @PersonalData
        String lastName;
    }