
3 Flink Mistakes We Made So You Won’t Have To

Robert Metzger
September 28, 2023


Transcript

  1. 3 Flink Mistakes We Made
    So You Won’t Have To
    Robert Metzger, Staff Engineer @ Decodable
    Apache Flink Committer and PMC Chair
    Sharon Xie, Founding Engineer @ Decodable


  2. What we’ll be talking about today
    #1 Data Loss with Flink Exactly-Once Delivery to Kafka
    #2 Inefficient Memory Configuration
    #3 Inefficient Checkpointing Config


  3. #1 Data Loss with Flink Exactly-Once
    Delivery to Kafka


  4. Two Phase Commit for EO - Happy Path


  5. Two Phase Commit for EO - Phase 1 Failure


  6. Two Phase Commit for EO - Phase 2 Failure


  7. Life is doomed when…
    Phase 2 can’t be successful 💣🔥


  8. Important Kafka Broker Configurations
    transaction.max.timeout.ms
    ● Default: 900000 (15 minutes)
    transactional.id.expiration.ms
    ● Default: 604800000 (7 days)

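The relationship between these timeouts is what makes or breaks exactly-once delivery. As a back-of-the-envelope check (a hypothetical helper, not part of Flink or Kafka), the producer's `transaction.timeout.ms` must not exceed the broker's `transaction.max.timeout.ms` (the broker rejects the producer otherwise), and it should comfortably exceed the checkpoint interval so an in-flight transaction is not aborted before phase 2 can commit it:

```python
# Hypothetical sanity check on the Kafka transaction timeouts from the slide.
def transaction_timeouts_ok(producer_timeout_ms: int,
                            broker_max_timeout_ms: int,
                            checkpoint_interval_ms: int) -> bool:
    # Broker rejects producers asking for more than transaction.max.timeout.ms;
    # a timeout shorter than the checkpoint interval risks aborted-but-needed
    # transactions (i.e. data loss).
    return (producer_timeout_ms <= broker_max_timeout_ms
            and producer_timeout_ms > checkpoint_interval_ms)

# With the broker default of 15 minutes, a producer asking for 1 hour is rejected:
print(transaction_timeouts_ok(3_600_000, 900_000, 10_000))       # False
# After raising the broker limit (here: to 7 days), the same setting is fine:
print(transaction_timeouts_ok(3_600_000, 604_800_000, 10_000))   # True
```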

  9. Timeout Causes Data Loss


  10. Excessive Memory Usage
    ● Flink Kafka Producer creates a new transaction id for each checkpoint per task
    ● transactional.id.expiration.ms = 604800000 (7 days)


  11. Better Kafka Transaction Configuration
    ● transaction.max.timeout.ms = 604800000 (7 days)
    ○ From default: 15 min
    ● transactional.id.expiration.ms = 3600000 (1 hour)
    ○ From default: 7 days


  12. InvalidPidMappingException
    When the checkpoint/savepoint to restore is more than 1 hour (the new
    transactional.id.expiration.ms) old:
    org.apache.kafka.common.errors.InvalidPidMappingException: The
    producer attempted to use a producer id which is not currently
    assigned to its transactional id.


  13. Fix InvalidPidMappingException
    Short-term: Ignore InvalidPidMappingException 😇
    ● ONLY when transaction.timeout.ms (Kafka client configuration in Flink)
    > transactional.id.expiration.ms
    Long-term: 🤝
    ● KIP-939: Support Participation in 2PC
    ● FLIP-319: Integrate with Kafka's Support for Proper 2PC Participation

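The short-term workaround hinges on one condition. A minimal sketch of that rule of thumb (the helper name is hypothetical, not Flink API): if the producer's transaction timeout outlives the transactional id expiration, an expired producer id can only belong to a transaction that already committed, so swallowing the exception cannot drop data.

```python
# Hypothetical predicate encoding the slide's rule: it is only safe to ignore
# InvalidPidMappingException when transaction.timeout.ms (Kafka client config
# in Flink) is larger than the broker's transactional.id.expiration.ms.
def safe_to_ignore_invalid_pid_mapping(transaction_timeout_ms: int,
                                       txn_id_expiration_ms: int) -> bool:
    return transaction_timeout_ms > txn_id_expiration_ms

# With the talk's settings (timeout raised to 7 days, expiration lowered to 1 hour):
print(safe_to_ignore_invalid_pid_mapping(604_800_000, 3_600_000))  # True
# With the relationship reversed, ignoring the exception would risk data loss:
print(safe_to_ignore_invalid_pid_mapping(3_600_000, 604_800_000))  # False
```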

  14. What we’ll be talking about today
    #1 Data Loss with Flink Exactly-Once Delivery to Kafka ✅
    #2 Inefficient Memory Configuration
    #3 Inefficient Checkpointing Config


  15. #2 Inefficient Memory Configuration


  16. How to Tune TaskManager Memory
    ● Flink automatically computes memory budgets;
    just provide the total process size.
    ● Main memory consumers
    ○ Framework + Task heap
    ○ RocksDB State backend (off-heap)
    ○ Network stack (off-heap)
    ○ JVM internal structures [metaspace, thread
    stacks] (off-heap)


  17. How to Tune TaskManager Memory
    ● Example: taskmanager.memory.process.size: 8gb
    [Diagram: the 8 GB process size split across Framework + Task heap,
    RocksDB state backend (off-heap), network stack (off-heap), and JVM
    internal structures (metaspace, thread stacks; off-heap)]


  18. How to Tune TaskManager Memory
    ● Let’s tune for this particular job
    [Diagram: unused portions of the default budgets:
    150 MB + 700 MB + 2300 MB = 3150 MB unused memory]

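The slide's arithmetic, spelled out (the three figures are the unused slices of the default 8 GB budget as measured for this job):

```python
# Sum the unused budget slices from the slide: 150 MB + 700 MB + 2300 MB.
unused_slices_mb = [150, 700, 2300]
print(sum(unused_slices_mb), "MB unused")  # 3150 MB unused
```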

  19. How to Tune TaskManager Memory
    ● Give as much memory as possible to Managed Memory = RocksDB
    taskmanager.memory.task.heap.size: 1 gb
    taskmanager.memory.managed.size: 5800 mb
    taskmanager.memory.network.min: 32 mb
    taskmanager.memory.network.max: 32 mb
    taskmanager.memory.jvm-metaspace.size: 120 mb

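A quick sanity check on the tuned budgets above (a hypothetical calculation, not Flink code): the explicitly configured components should fit inside the 8 GB process size with headroom left over, since Flink derives the remaining budgets (framework memory, JVM overhead) automatically.

```python
# Explicit budgets from the slide, in MB; Flink fills in the rest.
process_size_mb = 8 * 1024  # taskmanager.memory.process.size: 8gb
explicit_budgets_mb = {
    "task heap": 1024,       # taskmanager.memory.task.heap.size: 1 gb
    "managed (RocksDB)": 5800,  # taskmanager.memory.managed.size: 5800 mb
    "network": 32,           # taskmanager.memory.network.min/max: 32 mb
    "jvm-metaspace": 120,    # taskmanager.memory.jvm-metaspace.size: 120 mb
}
explicit_total = sum(explicit_budgets_mb.values())
headroom = process_size_mb - explicit_total
print(explicit_total, headroom)  # 6976 1216
```

The ~1.2 GB of headroom is what Flink has available for framework heap/off-heap and JVM overhead.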

  20. Memory Configuration Wrap Up
    ● Stateful workloads with RocksDB benefit
    most from as much memory as possible
    → Check out the full documentation:
    https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/


  21. What we’ll be talking about today
    #1 Data Loss with Flink Exactly-Once Delivery to Kafka ✅
    #2 Inefficient Memory Configuration ✅
    #3 Inefficient Checkpointing Config


  22. #3 Reliable, Fast Checkpointing
    execution.checkpointing.interval: 10s
    execution.checkpointing.min-pause: 10s
    Make sure your job is not spending all the time checkpointing.
    Image source: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-checkpointing

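Why the min-pause matters can be sketched with a back-of-the-envelope model (a hypothetical helper, not a Flink API): the pause guarantees idle time between the end of one checkpoint and the start of the next, which bounds the fraction of wall-clock time the job can spend checkpointing.

```python
# Upper bound on the fraction of time spent checkpointing, given the worst-case
# checkpoint duration and the configured execution.checkpointing.min-pause.
def max_checkpointing_fraction(checkpoint_duration_s: float,
                               min_pause_s: float) -> float:
    return checkpoint_duration_s / (checkpoint_duration_s + min_pause_s)

# Even if every checkpoint took a full 10 s, a 10 s min-pause caps the job at
# spending half its time checkpointing:
print(max_checkpointing_fraction(10, 10))  # 0.5
```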

  23. Reliable, Fast Checkpointing
    state.backend: rocksdb
    state.backend.incremental: true
    Only upload the diff to the last checkpoint.
    [Diagram: checkpoint #33 full, #34 incremental, #35 incremental]


  24. Reliable, Fast Checkpointing
    state.backend.local-recovery: true
    Local recovery: only re-download the state on failed machines.
    After a failure without local recovery: all TaskManagers download the state.
    With local recovery: most machines use local disks; only one needs to download.
    [Diagram: TM1 to TM4; TM4 fails; recovery with vs. without local recovery]


  25. Fast Checkpointing and State
    Put your RocksDB state on the fastest available
    disk, typically a local SSD.
    [Diagram: TaskManager with a remote EBS volume vs. TaskManager with a local SSD]


  26. The End – Q&A
    Robert Metzger, Staff Engineer @ Decodable
    Apache Flink Committer and PMC Chair
    Sharon Xie, Founding Engineer @ Decodable
    Get your free decodable.co account today if you want us to
    handle the issues discussed in the talk.
    Visit the Decodable Booth (201) for any Flink-related questions.


  27. Fast Checkpointing and State
    ● RocksDB stores your state in the /tmp directory
    ● On AWS Kubernetes, that’s an EBS volume by default

    Type                      | Size   | IOPS (max)                  | Throughput | Price per Month
    io1                       | 950 GB | 64000                       |            | $4278
    io2 block express         | 950 GB | 256000                      |            | $9769
    gp3                       | 950 GB | 16000                       | 1000 mb/s  | $176
    M6gd.4xlarge (64g | 16c)  | 950 GB | Read: 93000 / Write: 222000 |            | $+78 per instance for a local NVMe SSD

    → Using an instance type with a local SSD gives you by far the best performance per $
    We just mount the entire Docker working directory on the local SSD.

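The "performance per $" claim follows directly from the table's numbers. A small sketch comparing dollars per thousand IOPS for each option (using the slide's figures; for the NVMe row the write IOPS and the $78/month instance surcharge are used):

```python
# Price-per-performance comparison from the slide's table: (price $/month, IOPS).
options = {
    "io1":               (4278, 64_000),
    "io2 block express": (9769, 256_000),
    "gp3":               (176, 16_000),
    "local NVMe (M6gd)": (78, 222_000),  # $+78/instance surcharge, write IOPS
}
dollars_per_kiops = {name: price / (iops / 1000)
                     for name, (price, iops) in options.items()}
best = min(dollars_per_kiops, key=dollars_per_kiops.get)
print(best)  # local NVMe (M6gd)
```

The local SSD comes out more than an order of magnitude cheaper per IOPS than even gp3, which is the point of the slide.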

  28. Recap
    ● Flink EO with Kafka can still cause data loss
    ● Transaction timeout is the key
    ● Flink EO implementation can consume excessive memory from Kafka
    ● A better approach with Flink + Kafka is under way
