Exactly-Once Semantics and Transactions in Kafka

Messaging guarantees are hard, especially in a distributed system like Kafka. See how Kafka can be used to guarantee producer / consumer messaging semantics such as at least once, at most once, and exactly once. Simplify your client event loop by leveraging what Kafka gives you for free. See the actual configurations used to deliver these guarantees using transactions, and how they are implemented inside the distributed Kafka environment itself.

Keith Resar

March 07, 2023

Transcript

  1. Agenda: (1) Client message semantics (a primer!), (2) Messaging guarantees (a visual guide), (3) Kafka Transactions in action (detailed view).
  2. Messaging Semantics Explained

    At least once: duplicate messages OK. At most once: missing messages OK. Exactly once: no missing and no duplicate messages.
  3. Messaging Semantics Explained: AT-LEAST-ONCE (1 of 3, producer → broker)

    The producer sends a message, receives no acknowledgement from the broker, and so retries. If the first write actually succeeded, the retry produces duplicate messages.
  4. Messaging Semantics Explained: AT-MOST-ONCE (2 of 3, producer → broker) ¯\_(ツ)_/¯

    The producer sends a message and does not retry when the ack times out or an error is returned. This may result in lost messages.
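The following is a minimal Java simulation of the producer-side trade-off in slides 3 and 4. It is a sketch built on a toy in-memory broker that can drop one request or one acknowledgement. ToyBroker, atLeastOnce and atMostOnce are hypothetical names, not Kafka APIs. If the producer retries after a lost ack, the message is duplicated. If it never retries after a lost request, the message is lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy classes for illustration; not Kafka APIs.
class ProducerSemantics {
    // At-least-once: retry until an ack arrives (bounded by maxAttempts).
    static void atLeastOnce(ToyBroker broker, String msg, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (broker.send(msg)) return;
        }
    }

    // At-most-once: send once and never retry.
    static void atMostOnce(ToyBroker broker, String msg) {
        broker.send(msg);
    }

    public static void main(String[] args) {
        ToyBroker b1 = new ToyBroker();
        b1.dropNextAck = true;          // the write lands but the ack is lost
        atLeastOnce(b1, "m1", 3);
        System.out.println(b1.log);     // [m1, m1]: duplicate

        ToyBroker b2 = new ToyBroker();
        b2.dropNextRequest = true;      // the write never reaches the broker
        atMostOnce(b2, "m1");
        System.out.println(b2.log);     // []: lost
    }
}

// Toy broker whose next request or next ack can be dropped once.
class ToyBroker {
    final List<String> log = new ArrayList<>();
    boolean dropNextRequest = false;
    boolean dropNextAck = false;

    /** Returns true only if the producer receives an ack. */
    boolean send(String msg) {
        if (dropNextRequest) { dropNextRequest = false; return false; }
        log.add(msg);
        if (dropNextAck) { dropNextAck = false; return false; }
        return true;
    }
}
```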
  5. Messaging Semantics Explained: EXACTLY-ONCE (3 of 3, producer → broker)

    The producer sends a message and retries in the event of failure. The broker discards retried duplicates, so each message is written exactly once.
  6. Messaging Semantics Explained: AT-LEAST-ONCE (1 of 3, consumer ← broker) Danger Zone!

    The consumer reads messages and commits its offset only after processing. If the consumer fails before the commit, it will reprocess those messages.
  7. Messaging Semantics Explained: AT-MOST-ONCE (2 of 3, consumer ← broker) Danger Zone!

    The consumer reads messages and commits its offset before processing. If the consumer fails before processing, those messages will be lost.
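  8. Messaging Semantics Explained: EXACTLY-ONCE (3 of 3, consumer ← broker)

    The consumer reads messages and commits its offset only after processing. The commit is atomic with the processing results, for example with offsets committed inside the same transaction as the output. A failure therefore never leaves one without the other.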
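On the consumer side, the difference between slides 6 and 7 comes down to where the offset commit sits relative to processing. The sketch below models a toy consumer that handles one record per poll and crashes at a chosen point. ConsumerSemantics, pollOnce and Crash are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of commit ordering; hypothetical names, not Kafka APIs.
class ConsumerSemantics {
    enum Crash { NONE, AFTER_PROCESS, AFTER_COMMIT }

    static final List<String> INPUT = List.of("a", "b", "c");

    /** Handles the record at the committed offset and returns the new committed offset. */
    static int pollOnce(int committed, boolean commitBeforeProcessing, Crash crash, List<String> output) {
        String record = INPUT.get(committed);
        if (commitBeforeProcessing) {                                // at-most-once ordering
            int newCommitted = committed + 1;
            if (crash == Crash.AFTER_COMMIT) return newCommitted;   // record never processed
            output.add(record);
            return newCommitted;
        } else {                                                     // at-least-once ordering
            output.add(record);
            if (crash == Crash.AFTER_PROCESS) return committed;     // commit never happened
            return committed + 1;
        }
    }

    public static void main(String[] args) {
        List<String> out1 = new ArrayList<>();
        int c1 = pollOnce(0, false, Crash.AFTER_PROCESS, out1);  // crash, then restart
        pollOnce(c1, false, Crash.NONE, out1);
        System.out.println("at-least-once: " + out1);           // [a, a]

        List<String> out2 = new ArrayList<>();
        int c2 = pollOnce(0, true, Crash.AFTER_COMMIT, out2);    // crash, then restart
        pollOnce(c2, true, Crash.NONE, out2);
        System.out.println("at-most-once: " + out2);            // [b]
    }
}
```

Neither ordering on its own gives exactly once. That requires the offset commit and the processing result to be atomic, which is what the transactional loop in slide 19 provides.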
  9. Read-Process-Write (KStreams)

    Topic A → Consume → Process → Produce → Topic B, all inside the transaction scope. Strong guarantees for event processing where all endpoints are topics within the same Kafka cluster.
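The read-process-write guarantee rests on one idea: the output records and the input offset are committed together or not at all. The sketch below is a minimal in-memory model of that idea. ReadProcessWrite and processBatch are hypothetical names, and the "process" step simply multiplies each value by 10. It models only the atomicity, not Kafka's protocol.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: output records and the input offset are committed together or not at all.
class ReadProcessWrite {
    final List<Integer> topicA;                       // input
    final List<Integer> topicB = new ArrayList<>();   // committed (visible) output
    int committedOffset = 0;                          // committed input position

    ReadProcessWrite(List<Integer> topicA) { this.topicA = topicA; }

    /** Processes up to batchSize records; a crash before commit discards the whole batch. */
    void processBatch(int batchSize, boolean crashBeforeCommit) {
        List<Integer> pendingOutput = new ArrayList<>();
        int pendingOffset = committedOffset;
        while (pendingOffset < topicA.size() && pendingOffset - committedOffset < batchSize) {
            pendingOutput.add(topicA.get(pendingOffset) * 10);   // the "process" step
            pendingOffset++;
        }
        if (crashBeforeCommit) return;    // nothing from this batch becomes visible
        topicB.addAll(pendingOutput);     // commit: output and offset move together
        committedOffset = pendingOffset;
    }

    public static void main(String[] args) {
        ReadProcessWrite app = new ReadProcessWrite(List.of(1, 2, 3, 4));
        app.processBatch(2, false);   // commits 10, 20
        app.processBatch(2, true);    // crash: 30 and 40 are discarded, offset stays at 2
        app.processBatch(2, false);   // retry from offset 2 commits 30, 40
        System.out.println(app.topicB + " offset=" + app.committedOffset); // [10, 20, 30, 40] offset=4
    }
}
```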
  10. External / Non-deterministic Processing (KStreams)

    Topic A → Consume → Process (calls an external or non-deterministic service) → Produce → Topic B, inside the transaction scope. Weak guarantees when processing makes calls to external services or to services with non-deterministic responses.
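The guarantee weakens here because a Kafka transaction can abort and be retried, but a call to an outside service cannot be rolled back, and a non-deterministic answer can differ between attempts. The sketch below illustrates this with a hypothetical ExternalService whose answer changes on every call. It stands in for any HTTP call, clock read, or random number.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: the Kafka side is transactional, the external call is not.
class ExternalCallInTxn {
    // Hypothetical stand-in for an external, non-deterministic service.
    static class ExternalService {
        int calls = 0;
        int quote() { return 100 + (++calls); }   // answer changes on every call
    }

    final ExternalService service = new ExternalService();
    final List<String> topicB = new ArrayList<>();   // committed output

    void attempt(String record, boolean abort) {
        int price = service.quote();          // side effect happens even if we abort
        String out = record + "@" + price;
        if (abort) return;                    // transaction aborted: out is discarded
        topicB.add(out);
    }

    public static void main(String[] args) {
        ExternalCallInTxn app = new ExternalCallInTxn();
        app.attempt("order-1", true);    // first attempt aborted
        app.attempt("order-1", false);   // retry commits
        System.out.println(app.topicB + " externalCalls=" + app.service.calls);
        // [order-1@102] externalCalls=2
    }
}
```

Only one output record is committed. The service still saw two calls, and the committed value reflects the second answer rather than the first.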
  11. Sinking to an External Database (KStreams)

    Topic A → Consume → Process → Produce → External Database. No guarantees when producing outside of the Kafka cluster, such as sinking data to a database. The database writes must be idempotent.
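One common way to get that idempotency is to store the last applied Kafka offset per partition in the database, in the same database transaction as the data. On redelivery, anything at or below that offset is skipped. The sketch below shows this technique with an in-memory map standing in for the database. IdempotentDbSink and apply are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Toy idempotent sink: rows plus, atomically with them, the last applied offset per partition.
class IdempotentDbSink {
    final Map<String, String> rows = new HashMap<>();
    final Map<Integer, Long> lastAppliedOffset = new HashMap<>();
    int writes = 0;

    /** Applies a record unless its offset was already applied; returns true if it wrote. */
    boolean apply(int partition, long offset, String key, String value) {
        long last = lastAppliedOffset.getOrDefault(partition, -1L);
        if (offset <= last) return false;     // redelivered record: skip
        // In a real database these two writes would share one DB transaction.
        rows.put(key, value);
        lastAppliedOffset.put(partition, offset);
        writes++;
        return true;
    }

    public static void main(String[] args) {
        IdempotentDbSink sink = new IdempotentDbSink();
        sink.apply(0, 0, "user-1", "alice");
        sink.apply(0, 1, "user-2", "bob");
        // The consumer crashed before committing its Kafka offset, so both records come again.
        sink.apply(0, 0, "user-1", "alice");
        sink.apply(0, 1, "user-2", "bob");
        System.out.println(sink.rows.size() + " rows, " + sink.writes + " writes"); // 2 rows, 2 writes
    }
}
```

An upsert keyed on a natural business key is an alternative when offsets cannot be stored alongside the data.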
  14. “Light” Control with Idempotent Writer

        # The old way: limit throughput
        acks=all
        producer.max.in.flight.requests.per.connection=1

        # The better way: use the client library's idempotent producer
        enable.idempotence=true
        # 5 or less
        producer.max.in.flight.requests.per.connection=5

    Diagram: the producer sends messages with sequence numbers 1, 2, 2, 3 to the broker (PID = producer ID, SN = sequence number).

    M1 (PID: 1, SN: 1) - written to partition. For PID 1, Max SN=1
    M2 (PID: 1, SN: 2) - written to partition. For PID 1, Max SN=2
    M3 (PID: 1, SN: 2) - rejected, SN <= Max SN
    M4 (PID: 1, SN: 3) - written to partition. For PID 1, Max SN=3
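The broker-side rule in the M1 to M4 example can be sketched as follows. This uses the slide's simplification: each producer ID keeps one highest sequence number, and any SN at or below it is rejected. IdempotentPartition and append are hypothetical names. A real broker tracks this per partition, also checks the producer epoch, and treats sequence gaps as errors. None of that is modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Slide's simplified dedup rule for one partition: per PID, reject SN <= max SN seen so far.
class IdempotentPartition {
    final Map<Long, Integer> maxSeqByPid = new HashMap<>();
    final List<String> log = new ArrayList<>();

    /** Appends msg unless seq is at or below the highest SN already written for pid. */
    boolean append(long pid, int seq, String msg) {
        Integer maxSeq = maxSeqByPid.get(pid);
        if (maxSeq != null && seq <= maxSeq) return false;   // retried duplicate: rejected
        log.add(msg);
        maxSeqByPid.put(pid, seq);
        return true;
    }

    public static void main(String[] args) {
        IdempotentPartition p = new IdempotentPartition();
        System.out.println(p.append(1, 1, "M1"));   // true
        System.out.println(p.append(1, 2, "M2"));   // true
        System.out.println(p.append(1, 2, "M3"));   // false: SN 2 <= max SN 2
        System.out.println(p.append(1, 3, "M4"));   // true
        System.out.println(p.log);                  // [M1, M2, M4]
    }
}
```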
  16. “Light” Control with Idempotent Reader

        # Only records from committed transactions are returned to the consumer.
        # Reads do not advance past uncommitted (still open) transactions.
        isolation.level=read_committed

        # Kafka Streams EOS: all processing is done within a transaction.
        # (exactly_once is deprecated since Kafka 3.0 in favor of exactly_once_v2.)
        processing.guarantee=exactly_once
  17. “Light” Control with Idempotent Reader

    Diagram: idempotent read. A consumer configured with isolation.level=read_committed receives only committed messages from the broker.
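The read side of read_committed can be sketched with a toy partition log of transactional records and COMMIT/ABORT markers. This minimal model assumes each producer ID has at most one transaction in the log, and all names are hypothetical. Committed records are returned and aborted ones are skipped. Reading stops at the first record of a still-open transaction (the last stable offset), which is why reads appear to block.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy partition log; simplifying assumption: one transaction per producer ID (PID).
class ReadCommittedDemo {
    static final class Entry {
        final long pid;
        final String value;    // null for control markers
        final String marker;   // null for data records, otherwise "COMMIT" or "ABORT"
        Entry(long pid, String value, String marker) {
            this.pid = pid; this.value = value; this.marker = marker;
        }
    }

    static Entry data(long pid, String value) { return new Entry(pid, value, null); }
    static Entry marker(long pid, String marker) { return new Entry(pid, null, marker); }

    /** read_uncommitted: every data record, whatever its transaction's outcome. */
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) if (e.marker == null) out.add(e.value);
        return out;
    }

    /** read_committed: committed data only, and nothing at or past the first open transaction. */
    static List<String> readCommitted(List<Entry> log) {
        Map<Long, String> outcome = new HashMap<>();
        for (Entry e : log) if (e.marker != null) outcome.put(e.pid, e.marker);
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.marker != null) continue;              // markers never reach the application
            String o = outcome.get(e.pid);
            if (o == null) break;                        // open transaction: last stable offset
            if (o.equals("COMMIT")) out.add(e.value);    // aborted records are skipped
        }
        return out;
    }

    static List<Entry> sampleLog() {
        return List.of(
            data(1, "a"), data(2, "x"), data(1, "b"),
            marker(1, "COMMIT"), marker(2, "ABORT"),
            data(3, "y"),                                // PID 3 has not finished yet
            data(4, "z"), marker(4, "COMMIT"));
    }

    public static void main(String[] args) {
        System.out.println(readUncommitted(sampleLog()));   // [a, x, b, y, z]
        System.out.println(readCommitted(sampleLog()));     // [a, b]
    }
}
```

Record z is committed but is not returned yet, because it sits behind the open transaction from PID 3.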
  18. Enabling Transaction Support is Easy: Kafka Streams

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        // Set a few key parameters
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
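  19. Enabling Transaction Support is Easy: Java (consume / produce)

        import java.time.Duration;
        import java.util.*;
        import org.apache.kafka.clients.consumer.*;
        import org.apache.kafka.clients.producer.*;
        import org.apache.kafka.common.KafkaException;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.errors.ProducerFencedException;
        import org.apache.kafka.common.serialization.*;

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("transactional.id", "my-transactional-id");
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
        producer.initTransactions();

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-group-id");
        consumerProps.put("isolation.level", "read_committed");
        consumerProps.put("enable.auto.commit", "false");   // offsets are committed by the transaction
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
        consumer.subscribe(Collections.singleton("inputTopic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("outputTopic", record.key(), record.value()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));   // next offset to read
                }
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                producer.close();              // a newer instance with this transactional.id took over
                break;
            } catch (KafkaException e) {
                producer.abortTransaction();   // discard output and offsets together
                throw e;                       // on restart, resume from the last committed offsets
            }
        }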
  20. Transaction Flow (overview diagram)

    Components: the application (consumer and producer), the consumer coordinator, the transaction coordinator, and the brokers. Logs: the consumer offsets log (tp0-offset-x (PID), tp1-offset-y (PID), Commit (PID)), the user topics (M0 (PID), M1 (PID), Commit (PID)), and the transaction log (App ID → PID, InsertTp (PID), Prepare (PID), Committed (PID)).
  21. Transaction Flow, step 1

    Find the transaction coordinator (FindCoordinatorRequest).
  22. Transaction Flow, step 2

    Get a producer ID (InitPidRequest, named InitProducerId in the current Kafka protocol). The transaction log records the mapping App ID → PID.
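Mapping the application's transactional ID to a producer ID also lets the coordinator fence "zombie" instances. Re-initializing with the same transactional.id keeps the PID but bumps the epoch, and every later request carries that epoch (slide 26). The sketch below uses hypothetical names and an arbitrary starting PID. It also leaves out one thing the real coordinator does: completing or aborting any transaction left open by the previous epoch. Kafka's epoch is a 16-bit value; an int is used here for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

// Toy coordinator handling InitPid: transactional.id -> (producer ID, epoch). Not Kafka's code.
class EpochFencing {
    static final class PidAndEpoch {
        final long producerId;
        final int epoch;
        PidAndEpoch(long producerId, int epoch) { this.producerId = producerId; this.epoch = epoch; }
        @Override public String toString() { return "PID " + producerId + ", epoch " + epoch; }
    }

    private final Map<String, PidAndEpoch> byTransactionalId = new HashMap<>();
    private long nextProducerId = 1000;   // arbitrary starting value

    /** Same transactional.id keeps its PID but gets a higher epoch on every (re)initialization. */
    PidAndEpoch initPid(String transactionalId) {
        PidAndEpoch current = byTransactionalId.get(transactionalId);
        PidAndEpoch next = (current == null)
                ? new PidAndEpoch(nextProducerId++, 0)
                : new PidAndEpoch(current.producerId, current.epoch + 1);
        byTransactionalId.put(transactionalId, next);
        return next;
    }

    /** A request is fenced if it carries an older epoch than the coordinator knows about. */
    boolean isFenced(String transactionalId, PidAndEpoch claimed) {
        PidAndEpoch current = byTransactionalId.get(transactionalId);
        return current == null
                || current.producerId != claimed.producerId
                || claimed.epoch < current.epoch;
    }

    public static void main(String[] args) {
        EpochFencing tc = new EpochFencing();
        PidAndEpoch first = tc.initPid("my-transactional-id");       // original instance
        PidAndEpoch restarted = tc.initPid("my-transactional-id");   // replacement after a restart
        System.out.println(first + " / " + restarted);               // PID 1000, epoch 0 / PID 1000, epoch 1
        System.out.println(tc.isFenced("my-transactional-id", first));       // true: zombie fenced
        System.out.println(tc.isFenced("my-transactional-id", restarted));   // false
    }
}
```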
  23. Transaction Flow, step 3

    Start the transaction (beginTransaction() API). This is a local state change in the producer; the coordinator only learns about the transaction when the first partition is added in step 4.1. The application loop for steps 3 to 5:

        while (true) {
            consumer.poll(...);
            producer.beginTransaction();
            // consume / transform / produce
            producer.sendOffsetsToTransaction(...);
            producer.commitTransaction();
        }
  24. Transaction Flow, step 4

    The consume-transform-produce loop.
  25. Transaction Flow, step 4.1

    The producer sends the partitions in scope for the transaction to the transaction coordinator (TC), which starts the transaction timer (AddPartitionsToTxnRequest). If the transaction runs longer than transaction.timeout.ms, the TC aborts it.
  26. Transaction Flow, step 4.2

    The producer writes messages to the user topics. Each request includes the PID, epoch, and sequence number.
  27. Transaction Flow, step 4.3

    The producer notifies the TC, so that the transaction also references the consumer group's partition in the __consumer_offsets topic (AddOffsetCommitsToTxnRequest, named AddOffsetsToTxn in the current Kafka protocol).
  28. Transaction Flow, step 4.4

    The producer persists the offsets with the consumer coordinator (CC) (TxnOffsetCommitRequest). These offsets do not take effect until the transaction commits.
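On the consumer coordinator's side, transactional offsets can be pictured as pending per producer ID. They become the group's committed offsets only when a COMMIT marker arrives, and an ABORT drops them. The sketch below uses hypothetical class and method names, and the partition name "inputTopic-0" is chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy consumer coordinator: transactional offsets wait per PID until a COMMIT or ABORT marker.
class PendingOffsetCommits {
    final Map<String, Long> committed = new HashMap<>();                // partition -> offset
    final Map<Long, Map<String, Long>> pendingByPid = new HashMap<>();  // PID -> pending offsets

    /** Step 4.4: record the offset as pending for this producer's transaction. */
    void txnOffsetCommit(long pid, String partition, long offset) {
        pendingByPid.computeIfAbsent(pid, k -> new HashMap<>()).put(partition, offset);
    }

    /** Step 5.2: a COMMIT marker materializes the pending offsets; ABORT discards them. */
    void onMarker(long pid, boolean commit) {
        Map<String, Long> pending = pendingByPid.remove(pid);
        if (pending != null && commit) committed.putAll(pending);
    }

    public static void main(String[] args) {
        PendingOffsetCommits cc = new PendingOffsetCommits();
        cc.txnOffsetCommit(1, "inputTopic-0", 42);
        System.out.println(cc.committed);   // {}: not visible before the marker
        cc.onMarker(1, true);
        System.out.println(cc.committed);   // {inputTopic-0=42}
        cc.txnOffsetCommit(1, "inputTopic-0", 50);
        cc.onMarker(1, false);
        System.out.println(cc.committed);   // {inputTopic-0=42}: aborted offset dropped
    }
}
```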
  29. Transaction Flow, step 5

    Once the data is written, the producer either commits or aborts the transaction (commitTransaction(), abortTransaction()).
  30. Transaction Flow, step 5.1

    Prepare the commit with the TC (EndTxnRequest). The TC records the prepare state in the transaction log.
  31. Transaction Flow, step 5.2

    The TC writes a COMMIT or ABORT marker to the leader of each in-scope topic partition, including the __consumer_offsets partition (WriteTxnMarkerRequest).
  32. Transaction Flow, step 5.3

    The TC writes the final commit or abort message to the transaction log to complete the transaction.
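Steps 4.1 and 5.1 to 5.3 amount to a small state machine on the transaction coordinator. The sketch below is single-threaded and makes several simplifying choices: one transactional.id, hypothetical class and method names, and a made-up __consumer_offsets partition number. The state names loosely follow the coordinator's Ongoing, PrepareCommit and CompleteCommit states. The real TC sends marker writes asynchronously to partition leaders; here they are simply appended to a list.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Single-threaded toy transaction coordinator for one transactional.id (not Kafka's code).
class TxnCoordinatorSketch {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    State state = State.EMPTY;
    final Set<String> partitions = new LinkedHashSet<>();   // partitions in the current transaction
    final List<String> transactionLog = new ArrayList<>();
    final List<String> markersWritten = new ArrayList<>();

    /** Steps 4.1 and 4.3: register a user-topic or __consumer_offsets partition. */
    void addPartition(String topicPartition) {
        if (state == State.PREPARE_COMMIT || state == State.PREPARE_ABORT) {
            throw new IllegalStateException("transaction is completing: " + state);
        }
        if (state != State.ONGOING) partitions.clear();   // first partition of a new transaction
        state = State.ONGOING;
        if (partitions.add(topicPartition)) transactionLog.add("InsertTp " + topicPartition);
    }

    /** Steps 5.1 to 5.3: prepare, write markers to every partition, then complete. */
    void endTxn(boolean commit) {
        if (state != State.ONGOING) throw new IllegalStateException("no ongoing transaction: " + state);
        String outcome = commit ? "COMMIT" : "ABORT";
        state = commit ? State.PREPARE_COMMIT : State.PREPARE_ABORT;
        transactionLog.add("Prepare " + outcome);                                // 5.1
        for (String tp : partitions) markersWritten.add(outcome + " -> " + tp);   // 5.2
        state = commit ? State.COMPLETE_COMMIT : State.COMPLETE_ABORT;
        transactionLog.add(commit ? "Committed" : "Aborted");                    // 5.3
    }

    public static void main(String[] args) {
        TxnCoordinatorSketch tc = new TxnCoordinatorSketch();
        tc.addPartition("outputTopic-0");
        tc.addPartition("__consumer_offsets-7");   // hypothetical partition number
        tc.endTxn(true);
        System.out.println(tc.transactionLog);
        // [InsertTp outputTopic-0, InsertTp __consumer_offsets-7, Prepare COMMIT, Committed]
        System.out.println(tc.markersWritten);
        // [COMMIT -> outputTopic-0, COMMIT -> __consumer_offsets-7]
    }
}
```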
  33. 1KB messages with 100 ms processing time

    3% lower throughput than "the old way" (acks=all, max.in.flight.requests.per.connection=1), and 20% lower throughput than the default (acks=1, max.in.flight.requests.per.connection=5; acks=1 was the producer default before Kafka 3.0).