Upgrade to Pro — share decks privately, control downloads, hide ads and more …

CQRS+ES解体新書 / CQRS ES Disassembly Book

nrs
May 22, 2024

CQRS+ES解体新書 / CQRS ES Disassembly Book

「アーキテクチャを突き詰める Online Conference」にてお話した内容です。

CQRS+ESについてナラティブにしゃべるためのスライドなので、特に前半部分はわかりづらいかと思います。
ご了承ください。

◆ URL
Twitter: https://twitter.com/nrslib
イベント:https://findy.connpass.com/event/314782/
実践PUB/SUBマイクロサービス:https://www.youtube.com/watch?v=gejnwpvsWJE

nrs

May 22, 2024
Tweet

More Decks by nrs

Other Decks in Programming

Transcript

  1. public record DocCreated( DocId docId, String body ) implements DocEvent

    { } public record Doc( DocId docId, String body ) implements EventDrivenAggregateRoot<DocEvent> { ... } public record DocCreate(DocId docId, String body) implements DocCommand { }
  2. @Configuration public class AxonConfiguration { @Bean public CorrelationDataProvider processIdCorrelationDataProvider() {

    return new MultiCorrelationDataProvider<CommandMessage<?>>( List.of( new SimpleCorrelationDataProvider("processId"), new MessageOriginProvider() ) ); } }
  3. axon: kafka: client-id: pubsubdoc-service-back producer: retries: 0 consumer: event-processor-mode: tracking

    properties: security.protocol: PLAINTEXT default-topic: pubsubdoc-service-topic bootstrap-servers: localhost:29092,localhost:29192,localhost:29292
  4. application: kafka: topics: doc-service-topic, payment-service-topic, pubsubdoc-service-topic, user-service-topic @Configuration @ConditionalOnExpression("!${application.disable-kafka:false} and

    '${axon.kafka.consumer.event-processor-mode}' == 'trackin public class AxonKafkaConfiguration { @Value("${application.kafka.topics}") private List<String> topics; @Bean public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(...) { return StreamableKafkaMessageSource.<String, byte[]>builder() .topics(topics) .consumerFactory(kafkaConsumerFactory) .fetcher(kafkaFetcher) .messageConverter(kafkaMessageConverter) .bufferFactory ( () -> new SortedKafkaMessageBuffer<>( properties.getFetcher().getBufferSize() )
  5. @Configuration @ConditionalOnExpression("!${application.disable-kafka:false} and '${axon.kafka.consumer.event-pr public class AxonKafkaTrackingConfiguration { @Autowired public

    void registerProcessor( EventProcessingConfigurer configurer, StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) { var processorNames = List.of( DocProjection.class.getPackageName(), UserProjection.class.getPackageName() ); processorNames.forEach(it -> configurer.registerTrackingEventProcessor( it, c -> streamableKafkaMessageSource )); } }
  6. axon: eventhandling: processors: "[com.example.service.processor]": initialSegmentCount: 3 eventProcessingConfiguration.eventProcessors() .map { it.value

    } .filter { it is TrackingEventProcessor }.forEach { val casted = it as TrackingEventProcessor casted.splitSegment(segmentId) }