Viktor Gamov — Kafka Streams IQ: "Why do we need a database? We don't need a database!"

The rise of Apache Kafka as a streaming platform has forced a rethink of the traditional approach to distributed data processing. Kafka Streams lets you build applications without any dedicated processing cluster. This "cluster on your laptop" approach lets you start developing without worrying about whether you will be able to scale later (spoiler alert: you will!).

But would you dare to throw out the traditional database you use to store results and intermediate state?

This talk is about Interactive Queries, the part of the Kafka Streams API that gives you access to application state without traditional stores such as databases and caches, and about how this approach simplifies the architecture of microservices built on Kafka Streams.

Moscow JUG

May 08, 2019

Transcript

  1. @gamussa | #jugmsk | @ConfluentINc Why do we need a DB? We

    don't need a database… May 2019 / Moscow, Russia @gamussa | #jugmsk | @ConfluentINc
  2. @gamussa | #jugmsk | @ConfluentINc 6 Agenda Kafka Streams 101

    What is state and why stateful stream processing? Technical Deep Dive on Interactive Queries What does IQ provide? State handling with Kafka Streams How to use IQ?
  3. @gamussa | #jugmsk | @ConfluentINc 8 Kafka Streams – 101

    Stream Processing Kafka Streams Kafka Connect Kafka Connect Other Systems Other Systems
  4. @gamussa | #jugmsk | @ConfluentINc 10 LET’S TALK ABOUT THIS

    FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS
  5. @gamussa | #jugmsk | @ConfluentINc 11 SO LET ME SHOW

    KAFKA STREAMS THAT WAY IT MIGHT BE REALLY GOOD
  6. @gamussa | #jugmsk | @ConfluentINc 16 Stay and write your

    Java apps, I’m going to write ETL jobs
  7. @gamussa | #jugmsk | @confluentinc the KAFKA STREAMS API is

    a 
 JAVA API to 
 BUILD REAL-TIME APPLICATIONS
  8. @gamussa | #jugmsk | @ConfluentINc 21 Brokers? Nope! App Streams

    API App Streams API App Streams API Same app, many instances
  9. @gamussa | #jugmsk | @ConfluentINc 22 Stateful Stream Processing What

    is State? ◦ Anything your application needs to “remember” beyond the scope of a single record
  10. @gamussa | #jugmsk | @ConfluentINc 23 Stateful Stream Processing Stateless

    operators: ◦ filter, map, flatMap, foreach Stateful operator: ◦ Aggregations ◦ Any window operation ◦ Joins ◦ CEP
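To make the stateless/stateful split concrete, here is a minimal sketch (my own illustration, not taken from the deck; the topic and store names are made up): the filter/mapValues branch needs no memory between records, while groupByKey().count() has to keep a running count per key in a local state store.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    public class StatelessVsStateful {
      public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> trades =
            builder.stream("trades", Consumed.with(Serdes.String(), Serdes.String()));

        // Stateless: each record is handled on its own, nothing is remembered between records.
        KStream<String, String> upperCased = trades
            .filter((symbol, trade) -> trade != null && !trade.isEmpty())
            .mapValues(trade -> trade.toUpperCase());
        upperCased.to("trades-uppercased");

        // Stateful: counting per key needs a local state store ("trade-counts"),
        // backed by a compacted changelog topic for fault tolerance.
        KTable<String, Long> tradeCounts = trades
            .groupByKey()
            .count(Materialized.as("trade-counts"));
        tradeCounts.toStream()
            .to("trade-counts-output", Produced.with(Serdes.String(), Serdes.Long()));

        // Print the topology; the state store only appears in the stateful branch.
        System.out.println(builder.build().describe());
      }
    }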
  11. @gamussa | #jugmsk | @ConfluentINc 24 Stock Trade Stats Example

    KStream<String, Trade> source = builder.stream(STOCK_TOPIC);
    KStream<Windowed<String>, TradeStats> stats = source
        .groupByKey()
        .windowedBy(TimeWindows.of(5000).advanceBy(1000))
        .aggregate(TradeStats::new,
            (k, v, tradestats) -> tradestats.add(v),
            Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
                .withValueSerde(new TradeStatsSerde()))
        .toStream()
        .mapValues(TradeStats::computeAvgPrice);
    stats.to(STATS_OUT_TOPIC,
        Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));
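The deck never shows the Trade, TradeStats, or TradeStatsSerde classes the example relies on. Below is a hypothetical sketch of what they could look like (the field names, the JSON serialization, and the Jackson dependency are all my assumptions, not the original code) so that the snippet above is compilable.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical trade event.
    class Trade {
      public String symbol;
      public double price;
      public int size;
    }

    // Hypothetical windowed aggregate: accumulates prices and exposes the average.
    class TradeStats {
      public long count;
      public double sumPrice;
      public double avgPrice;

      public TradeStats add(Trade trade) {
        count++;
        sumPrice += trade.price;
        return this;
      }

      public TradeStats computeAvgPrice() {
        avgPrice = count == 0 ? 0.0 : sumPrice / count;
        return this;
      }
    }

    // Hypothetical JSON Serde built on Jackson.
    class TradeStatsSerde implements Serde<TradeStats> {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      @Override
      public Serializer<TradeStats> serializer() {
        return (topic, stats) -> {
          try { return stats == null ? null : MAPPER.writeValueAsBytes(stats); }
          catch (Exception e) { throw new RuntimeException(e); }
        };
      }

      @Override
      public Deserializer<TradeStats> deserializer() {
        return (topic, bytes) -> {
          try { return bytes == null ? null : MAPPER.readValue(bytes, TradeStats.class); }
          catch (Exception e) { throw new RuntimeException(e); }
        };
      }

      @Override
      public void configure(Map<String, ?> configs, boolean isKey) { }

      @Override
      public void close() { }
    }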
  15. @gamussa | #jugmsk | @ConfluentINc 28 Topologies Source Node Processor

    Node Processor Node Sink Node streams state stores Processor Topology builder.stream source.groupByKey .windowedBy(…) .aggregate(…) mapValues() to(…)
  16. @gamussa | #jugmsk | @ConfluentINc 29 Accessing Application State External

    DB: query the database Local State: Interactive Queries ◦ Allows querying the local DB instances ◦ The Kafka Streams application is the database ◦ Read-only access (no “external” state updates) ◦ Works with both the DSL and the Processor API
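As a sketch of what "the application is the database" looks like in code (the store name is reused from the trade-stats example, the TradeStats type from the sketch above; the exact store() overloads and fetch signatures vary between Kafka Streams versions, so treat this as an illustration under those assumptions), local state is read through KafkaStreams#store:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class LocalIqExample {
      public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // ... build the trade-stats topology from the earlier slide here ...
        Properties props = new Properties(); // application.id, bootstrap.servers, etc. omitted
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // In a real application, wait until the instance reaches the RUNNING state before querying.

        // Read-only view over the shard of "trade-aggregates" hosted by THIS instance.
        ReadOnlyWindowStore<String, TradeStats> store =
            streams.store("trade-aggregates", QueryableStoreTypes.windowStore());

        // Fetch the windows of one key for the last minute.
        Instant now = Instant.now();
        try (WindowStoreIterator<TradeStats> it =
                 store.fetch("AAPL", now.minus(Duration.ofMinutes(1)), now)) {
          while (it.hasNext()) {
            KeyValue<Long, TradeStats> entry = it.next();
            System.out.println("window @" + entry.key + " -> avg price " + entry.value.avgPrice);
          }
        }
      }
    }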
  17. @gamussa | #jugmsk | @ConfluentINc 30 Accessing Application State –

    “DB to go” Interactive Queries turn a Kafka Streams application into a queryable, lightweight database
  18. @gamussa | #jugmsk | @ConfluentINc 31 Kafka Streams uses Local

    State Light-weight embedded DB Local data access No RPC, network I/O
  19. @gamussa | #jugmsk | @ConfluentINc 32 Scaling Local State Topics

    are scaled via partitions Kafka Streams scales via tasks • 1:1 mapping from input topic partitions to tasks State can be sharded based on partitions • Local DB for each shard • 1:1 mapping from state shard to task
  20. @gamussa | #jugmsk | @ConfluentINc 33 Partitions, Tasks, and Consumer

    Groups input topic result topic 4 input topic partitions => 4 tasks Task executes processor topology One consumer group: 
 can be executed with 1 - 4 threads on 1 - 4 machines
  21. @gamussa | #jugmsk | @ConfluentINc 35 Trade Stats App Trade

    Stats App “no state” Scaling with State Instance 1 Instance 2
  22. @gamussa | #jugmsk | @ConfluentINc 36 Instance 1 Trade Stats

    App Instance 2 Instance 3 Trade Stats App Trade Stats App “no state” Scaling with State
  23. @gamussa | #jugmsk | @ConfluentINc 38 Fault-Tolerance Instance 1 Trade

    Stats App Instance 2 Instance 3 Trade Stats App Trade Stats App
  24. @gamussa | #jugmsk | @ConfluentINc 40 Trade Stats App Migrate

    State Instance 2 Trade Stats App Changelog Topic Instance 1 Trade Stats App restore
  25. @gamussa | #jugmsk | @ConfluentINc 41 Recovery Time Changelog topics

    are log compacted Size of changelog topic linear in size of state Large state implies high recovery times
  26. @gamussa | #jugmsk | @ConfluentINc 42 Recovery Overhead Recovery overhead

    is proportional to ◦ segment-size / state-size Segment-size is smaller than state-size => reduced overhead Update changelog topic segment size accordingly ◦ topic config: segment.bytes (broker default: log.segment.bytes) ◦ log cleaner interval is important, too
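If you prefer to apply that tuning from the application itself rather than per topic with the admin tooling, Kafka Streams forwards any "topic."-prefixed property to the internal repartition and changelog topics it creates. A sketch (the sizes are illustrative, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class ChangelogTuning {
      public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "trade-stats");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Smaller segments on the internal changelog topics mean the log cleaner can
        // compact more of the log, so there is less data to replay during restore.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
            String.valueOf(64 * 1024 * 1024)); // 64 MB instead of the 1 GB default
        props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG), "0.1");
        return props;
      }
    }

Per-store overrides are also possible, for example via Materialized#withLoggingEnabled(Map) on the store definition.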
  27. @gamussa | #jugmsk | @ConfluentINc 44 How to use Interactive

    Queries Query local state Discover remote instances Query remote state
  28. @gamussa | #jugmsk | @ConfluentINc 46 Discover Remote Instances Kafka

    Streams application instances are started independently ◦ What other instances are there? ◦ Which instances hold which data?
  29. @gamussa | #jugmsk | @ConfluentINc 47 Discover Remote Instances Kafka

    Streams metadata API to the rescue: ◦ KafkaStreams#allMetadata() ◦ KafkaStreams#allMetadataForStore(String storeName) ◦ KafkaStreams#metadataForKey(String storeName, K key, Serializer<K> keySerializer) Returns StreamsMetadata objects
  30. @gamussa | #jugmsk | @ConfluentINc 48 Discover Remote Instances StreamsMetadata

    object ◦ Host information (server/port) according to application.server config ◦ State store names of hosted stores ◦ TopicPartition mapped to the store (ie, shard)
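A small sketch of the discovery step (the store name is borrowed from the trade-stats example; error handling and the case where metadata is not yet available are ignored): ask the local KafkaStreams instance which host owns a given key, and which instances host the store at all.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class Discovery {
      // Returns "host:port" of the instance that hosts the shard for this key,
      // based on each instance's application.server setting.
      static String ownerOf(KafkaStreams streams, String key) {
        StreamsMetadata metadata =
            streams.metadataForKey("trade-aggregates", key, Serdes.String().serializer());
        return metadata.host() + ":" + metadata.port();
      }

      // List every instance that hosts some part of the store.
      static void printAllOwners(KafkaStreams streams) {
        for (StreamsMetadata md : streams.allMetadataForStore("trade-aggregates")) {
          System.out.println(md.hostInfo() + " stores " + md.stateStoreNames()
              + " for partitions " + md.topicPartitions());
        }
      }
    }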
  31. @gamussa | #jugmsk | @ConfluentINc 49 How to access data

    of another instance?
  32. @gamussa | #jugmsk | @ConfluentINc 50 Query remote state via

    RPC Choose whatever you like: ◦ REST ◦ gRPC ◦ Apache Thrift ◦ XML-RPC ◦ Java RMI ◦ SOAP
  33. @gamussa | #jugmsk | @ConfluentINc 51 Query remote state

    [diagram: three instances on two hosts, configured with application.server=host1:1234, host2:1234, and host2:1235; a get("key-A") arriving at one instance is answered by the instance that hosts key-A]
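Putting the three steps together, a rough routing sketch. REST over java.net.http (Java 11+) is an arbitrary pick from the RPC options above; the /state/{store}/{key} endpoint, the plain key-value store, and the way the local host:port is passed in are all my assumptions, not the talk's implementation.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class IqRouter {
      private final KafkaStreams streams;
      private final String thisHostPort;          // must match this instance's application.server
      private final HttpClient http = HttpClient.newHttpClient();

      IqRouter(KafkaStreams streams, String thisHostPort) {
        this.streams = streams;
        this.thisHostPort = thisHostPort;
      }

      // Answer a lookup for `key`, transparently hopping to the owning instance if needed.
      String get(String store, String key) throws Exception {
        StreamsMetadata owner =
            streams.metadataForKey(store, key, Serdes.String().serializer());

        if (thisHostPort.equals(owner.host() + ":" + owner.port())) {
          // The key lives in this instance's local shard: serve it from the local store.
          ReadOnlyKeyValueStore<String, String> local =
              streams.store(store, QueryableStoreTypes.keyValueStore());
          return local.get(key);
        }

        // Otherwise forward to the owner; /state/{store}/{key} is a hypothetical endpoint
        // that each instance is assumed to expose over its own RPC layer.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://" + owner.host() + ":" + owner.port()
                + "/state/" + store + "/" + key))
            .GET()
            .build();
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
      }
    }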
  34. @gamussa | #jugmsk | @ConfluentINc 53 PROS There are fewer

    moving pieces; no external database It enables faster and more efficient use of the application state It provides better isolation It allows for flexibility Mix-and-match local and remote DBs
  35. @gamussa | #jugmsk | @ConfluentINc 54 Cons It may involve

    moving away from a datastore you know and trust You might want to scale storage independently of processing You might need customized queries, specific to some datastores
  36. @gamussa | #jugmsk | @ConfluentINc 56 Summary Interactive Queries is

    a powerful abstraction that simplifies stateful stream processing There are still cases where an external database/store might be a better fit – with IQ you can choose!
  37. @gamussa | #jugmsk | @ConfluentINc 57 Summary Music example:

    https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java Streaming Movie Ratings: https://github.com/confluentinc/demo-scene/tree/master/streams-movie-demo