
Kafka Summit - Introduction to Kafka Streams with a Real-Life Example

On our project, we built a great system to analyze customer records in real time. We pioneered a microservices architecture using Spark and Kafka, and we had to tackle many technical challenges. In this session, I will show how Kafka Streams provided a great replacement for Spark Streaming, and I will explain how to use this library to implement low-latency data pipelines.

Alexis Seigneurin

May 08, 2017

Transcript

  1. MICROSERVICES WITH KAFKA — AN INTRODUCTION TO KAFKA STREAMS API

    WITH A REAL-LIFE EXAMPLE Alexis Seigneurin
  2. Who I am • Software engineer for 15+ years •

    Consultant at Ippon USA, previously at Ippon France • Favorite subjects: Spark, Kafka, Machine Learning, Scala • Spark certified • @aseigneurin
  3. • 200+ software engineers in France, the US and Australia

    • In the US: NYC, DC, RVA • Digital, Big Data and Cloud applications • Java & Agile expertise • Open-source projects: JHipster, Tatami, etc. • @ipponusa
  4. The project • Analyze records from customers → Send notifications

    • High volume of data • 25 million records per day on average, with seasonal peaks • Need to keep at least 60 days of history = 1.5 billion records • Need a hybrid platform • Batch processing for some types of analysis • Streaming for other analyses • Hybrid team • Data Scientists: more familiar with Python • Software Engineers: Java
  5. Architecture - Real time platform • New detectors are implemented

    by Data Scientists all the time • Need the implementations to be independent from each other • One Spark Streaming job per detector • Microservice-inspired architecture • Diamond-shaped • Upstream jobs are written in Scala • Core is made of multiple Python jobs, one per detector • Downstream jobs are written in Scala • Plumbing between the jobs → Kafka 1/2
  6. Modularity • One Spark job per detector • Hot deployments:

    can roll out new detectors (= new jobs) without stopping existing jobs • Can roll out updated code without affecting other jobs • Able to measure the resources consumed by a single job • Shared services are provided by upstream and downstream jobs
  7. Spark 1.x + Kafka? • Spark has become the de-facto

    processing framework • Provides APIs for multiple programming languages • Python → Data Scientists • Scala/Java → Software Engineers • Supports batch jobs and streaming jobs, incl. support for Kafka…
  8. Consuming from Kafka • Connecting Spark to Kafka, 2 methods:

    • Receiver-based approach: not ideal for parallelism • Direct approach: better for parallelism, but you have to deal with Kafka offsets yourself Spark + Kafka problems
  9. Dealing with Kafka offsets • Default: consumes from the end

    of the Kafka topic (or the beginning) • Documentation → Use checkpoints • Tasks have to be Serializable (not always possible: dependent libraries) • Harder to deploy the application (classes are serialized) → run a new instance in parallel and kill the first one (harder to automate; messages consumed twice) • Requires a shared file system (HDFS, S3) → high latency on these file systems forces you to increase the micro-batch interval 1/2 Spark + Kafka problems
  10. Dealing with Kafka offsets •

    Solution: deal with offsets in the Spark Streaming application • Write the offsets to reliable storage: ZooKeeper, Kafka… • Write after processing the data • Read the offsets on startup (if no offsets, start from the end) • blog.ippon.tech/spark-kafka-achieving-zero-data-loss/ 2/2 Spark + Kafka problems
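The pattern above is not specific to Spark Streaming. As a rough sketch only (not the project's code; the topic, group and process() helper are made up), the same "commit after processing" idea with the plain Java Kafka consumer, auto-commit disabled, looks like this:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "offset-demo"); // hypothetical consumer group
 props.put("enable.auto.commit", "false"); // offsets are committed manually, after processing
 props.put("key.deserializer", StringDeserializer.class.getName());
 props.put("value.deserializer", StringDeserializer.class.getName());
 try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
   consumer.subscribe(Collections.singletonList("records")); // hypothetical topic
   while (true) {
     ConsumerRecords<String, String> records = consumer.poll(1000);
     for (ConsumerRecord<String, String> record : records) {
       process(record); // your processing logic (hypothetical helper)
     }
     consumer.commitSync(); // offsets are stored only once processing has succeeded
   }
 }

On restart, the consumer resumes from the last committed offsets, so no message is lost (some may be reprocessed).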
  11. Micro-batches Spark Streaming processes events in micro-batches • Impact on

    latency • Spark Streaming micro-batches → hard to achieve sub-second latency • See spark.apache.org/docs/latest/streaming-programming-guide.html#task-launching-overheads • Total latency of the system = sum of the latencies of each stage • In this use case, events are independent from each other - no need for windowing computation → a real streaming framework would be more appropriate • Impact on memory usage • Kafka+Spark using the direct approach = 1 RDD partition per Kafka partition • If you start the Spark application with lots of unprocessed data in Kafka, RDD partitions can exceed the available memory Spark + Kafka problems
  12. Allocation of resources in Spark • With Spark Streaming, resources

    (CPU & memory) are allocated per job • Resources are allocated when the job is submitted and cannot be updated on the fly • Have to allocate 1 core to the Driver of the job → unused resource • Have to allocate extra resources to each job to handle variations in traffic → unused resources • For peak periods, easy to add new Spark Workers, but jobs have to be restarted • Idea to be tested: • Over-allocation of real resources, e.g. let Spark know it has 6 cores on a 4-core server Spark + Kafka problems
  13. Python code in production • Data Scientists know Python →

    They can contribute • But shipping code written by Data Scientists is not ideal • Need production-grade code (error handling, logging…) • Code is less tested than Scala code • Harder to deploy than a JAR file → Python Virtual Environments • blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/ Spark + Kafka problems
  14. Resilience of Spark Jobs • Spark Streaming application = 1

    Driver + 1 Application • Application = N Executors • If an Executor dies → restarted (seamless) • If the Driver dies, the whole Application must be restarted • Scala/Java jobs → “supervised” mode • Python jobs → not supported with Spark Standalone Spark + Kafka problems
  15. Writing to Kafka • Spark Streaming comes with a library

    to read from Kafka but none to write to Kafka! • Flink or Kafka Streams do that out-of-the-box • Cloudera provides an open-source library: • github.com/cloudera/spark-kafka-writer • (Has since been removed!) Spark + Kafka problems
  16. Spark 2.x + Kafka? • New API: Structured Streaming •

    Still ALPHA in 2.1 • Support is improving…
  17. Kafka Streams docs.confluent.io/3.2.0/streams/index.html • “powerful, easy-to-use library for building highly

    scalable, fault-tolerant, distributed stream processing applications on top of Apache Kafka” • Works with Kafka ≥ 0.10 • No cluster needed: Kafka is the cluster manager (consumer groups) • Natively consumes messages from Kafka (and handles offsets) • Natively pushes produced messages to Kafka • Processes messages one at a time → low latency, low footprint • Java library (works best in Java, can work in Scala)
  18. • Read text from a topic • Process the text:

    • Only keep messages containing the “a” character • Convert the text to upper case • Output the result to another topic Quick Example 1/3
  19. • Create a regular Java application (with a main) •

    Add the Kafka Streams dependency: <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-streams</artifactId>
 <version>0.10.2.1</version>
 </dependency> • Add the Kafka Streams code (next slide) • Build and run the JAR Quick Example 2/3
  20. Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-transformer");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "8");
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
 KStreamBuilder builder = new KStreamBuilder(); 
 builder.stream(Serdes.String(), Serdes.String(), "text-input")
 .filter((key, value) -> value.contains("a"))
 .mapValues(text -> text.toUpperCase())
 .to(Serdes.String(), Serdes.String(), "text-output");
 
 KafkaStreams streams = new KafkaStreams(builder, props);
 streams.start(); Quick Example 3/3 • Application ID = Kafka consumer group • Threads for parallel processing (relates to partitions) • Topic to read from + key/value deserializers • Transformations: map, filter… • Topic to write to + key/value serializers
  21. Processor Topology • Need to define one or more processor

    topologies • Two APIs to define topologies: • DSL (preferred): map(), filter(), to()… • Processor API (low level): implement the Processor interface then connect source processors, stream processors and sink processors together
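As a rough sketch of the Processor API (not from the talk; the class and node names below are made up, using the 0.10.x classes):

 public class UppercaseProcessor implements Processor<String, String> {
   private ProcessorContext context;
   @Override public void init(ProcessorContext context) { this.context = context; }
   @Override public void process(String key, String value) {
     context.forward(key, value.toUpperCase()); // emit to the downstream nodes
   }
   @Override public void punctuate(long timestamp) { } // scheduled work, unused here
   @Override public void close() { }
 }

 TopologyBuilder builder = new TopologyBuilder();
 builder.addSource("Source", new StringDeserializer(), new StringDeserializer(), "text-input")
   .addProcessor("Uppercase", UppercaseProcessor::new, "Source")
   .addSink("Sink", "text-output", new StringSerializer(), new StringSerializer(), "Uppercase");
 KafkaStreams streams = new KafkaStreams(builder, props);
 streams.start();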
  22. Parallelism (one process) • Kafka Streams creates 1 task per

    partition in the input topic • A task is an instance of the topology • Tasks are independent from each other • The number of processing threads is determined by the developer props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "8"); • Tasks are distributed between threads 1/2
  23. Parallelism (one process) • 3 partitions → 3 tasks •

    The tasks are distributed to the 2 threads 2/2
  24. Parallelism (multiple processes) • With multiple processes (multiple instances of

    the JVM), each consumer process is assigned a portion of the partitions → Consumer group • Reassignment of partitions occurs: • When a new consumer joins the group • When a consumer dies → Tasks are created/deleted accordingly 1/2
  25. Parallelism (multiple processes) • Partitions are assigned to 2 consumers

    • 3 partitions → 3 tasks (as before) • Each thread has one task → Improved parallelism 2/2
  26. KStream vs. KTable KStream is a stream of records •

    Records are independent from each other • (Do not use log compaction) Example: 
 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), "input-topic"); Example (inspired by the documentation): • Sum values as records arrive • Records: • (alice, 1) = 1 • (charlie, 1) = 2 • (alice, 3) = 5 • → Adds to (alice, 1)
  27. KStream vs. KTable KTable is a change log stream •

    New records with the same key are an update of previously received records for the same key • Keys are required • Requires a state store Example: KStreamBuilder builder = new KStreamBuilder();
 KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "input-topic", "store-name"); Example (inspired by the documentation): • Sum values as records arrive • Records: • (alice, 1) = 1 • (charlie, 1) = 2 • (alice, 3) = 4 • → Replaces (alice, 1)
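To make the difference concrete, here is a small sketch (not from the talk; topic and store names are made up) that reproduces the running sums above with the 0.10.x API:

 // KStream: records are independent, every record adds to the running total
 // (alice, 1) (charlie, 1) (alice, 3) → 1, 2, 5
 KTable<String, Long> streamTotal = builder
   .stream(Serdes.String(), Serdes.Long(), "values")
   .groupBy((user, value) -> "total", Serdes.String(), Serdes.Long())
   .reduce((agg, v) -> agg + v, "stream-total");

 // KTable: a later record for the same key replaces the earlier one,
 // so the aggregation takes both an adder and a subtractor
 // (alice, 1) (charlie, 1) (alice, 3) → 1, 2, 4
 KTable<String, Long> tableTotal = builder
   .table(Serdes.String(), Serdes.Long(), "values-compacted", "values-store")
   .groupBy((user, value) -> new KeyValue<>("total", value), Serdes.String(), Serdes.Long())
   .reduce((agg, v) -> agg + v, (agg, v) -> agg - v, "table-total");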
  28. map / mapValues: Apply a transformation to the records •

    flatMap / flatMapValues: Apply a transformation to the records and create 0/1/n records per input record • filter: Apply a predicate • groupBy / groupByKey: Group the records, followed by a call to reduce, aggregate or count • join / leftJoin / outerJoin: Joins 2 KStreams / KTables • to: Writes the records to a Kafka topic • through: Writes the records to a Kafka topic and builds a new KStream / KTable from this topic API 1/2
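A small illustration of how these operations chain together (not from the talk; topic names are made up, 0.10.x DSL):

 builder.stream(Serdes.String(), Serdes.String(), "raw-lines")
   .flatMapValues(line -> Arrays.asList(line.split(" "))) // 0/1/n records per input record
   .filter((key, word) -> !word.isEmpty()) // apply a predicate
   .mapValues(String::toUpperCase) // transform the values
   .through(Serdes.String(), Serdes.String(), "words") // write to a topic and keep streaming from it
   .to(Serdes.String(), Serdes.String(), "words-upper"); // write to a topic (terminal operation)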
  29. State Stores • Some operations require a state store

    • KTables (by definition, they need to keep previously received values) • Aggregations (groupBy / groupByKey) • Windowing operations • One state store per task (RocksDB or a hash map) • Backed by internal topics for recovery → fault tolerance • Can be queried internally
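On the querying side, since Kafka 0.10.1 a store can also be read from the running application through the interactive queries API; a minimal sketch (the store name is made up, e.g. created by count("counts-store")):

 // Only returns the data held by this instance (i.e. the partitions it processes)
 ReadOnlyKeyValueStore<String, Long> store =
   streams.store("counts-store", QueryableStoreTypes.keyValueStore());
 Long countForAlice = store.get("alice"); // point lookup
 KeyValueIterator<String, Long> allCounts = store.all(); // iterate over the whole local store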
  30. Aggregations • Aggregations are performed by keys • Records with

    a null key are ignored • Repartition the data using an internal topic if need be • Aggregations can be windowed or non-windowed • Aggregating a KStream generates a KTable // Re-key the records, then write to an intermediate topic to repartition:
 builder.stream(dummySerde, accountSerde, "accounts")
 .map((_, account) => new KeyValue(account.accountId, account))
 .to(stringSerde, accountSerde, "accounts-by-id")
 
 // Read the repartitioned topic back and aggregate by the new key (store name is illustrative):
 builder.stream(stringSerde, accountSerde, "accounts-by-id")
 .groupByKey(stringSerde, accountSerde)
 .count("accounts-by-id-counts")
  31. Joins • Types of joins: inner / outer / left

    join • Operands: • KTable to KTable → Non-windowed • KStream to KTable → Non-windowed • KStream to KStream → Windowed (to avoid an infinitely growing result) • Data must be co-partitioned • Repartition using an internal topic if need be
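A minimal sketch of a non-windowed KStream-to-KTable join with the 0.10.x DSL (not from the talk; the topics, serdes and Account / Transaction / EnrichedTransaction types are made up):

 // Enrich each transaction with the latest account record sharing the same key
 KTable<String, Account> accounts =
   builder.table(stringSerde, accountSerde, "accounts-by-id", "accounts-store");
 KStream<String, Transaction> transactions =
   builder.stream(stringSerde, transactionSerde, "transactions-by-account-id");
 transactions
   .leftJoin(accounts, (transaction, account) -> new EnrichedTransaction(transaction, account))
   .to(stringSerde, enrichedSerde, "enriched-transactions");
 // Both topics must have the same number of partitions and be keyed the same way (co-partitioned)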
  32. Deploying and Running • Assemble a JAR (maven-shade plugin) •

    Run the JAR as a regular Java application (java -cp …) • Make sure all instances are in the same consumer group (same application ID)
  33. Running Topic “AUTH_JSON” with 4 partitions Application ID = “auth-converter”

    Log on the first instance: 11:00:22,331 ...AbstractCoordinator - Successfully joined group auth-converter with generation 1 11:00:22,332 ...ConsumerCoordinator - Setting newly assigned partitions [AUTH_JSON-2, AUTH_JSON-1, AUTH_JSON-3, AUTH_JSON-0] for group auth-converter
  34. Running - Scaling up Start a new instance: Log on

    the first instance: 11:01:31,402 ...AbstractCoordinator - Successfully joined group auth-converter with generation 2 11:01:31,404 ...ConsumerCoordinator - Setting newly assigned partitions [AUTH_JSON-2, AUTH_JSON-3] for group auth-converter 11:01:31,390 ...ConsumerCoordinator - Revoking previously assigned partitions [AUTH_JSON-2, AUTH_JSON-1, AUTH_JSON-3, AUTH_JSON-0] for group auth-converter 11:01:31,401 ...ConsumerCoordinator - Setting newly assigned partitions [AUTH_JSON-1, AUTH_JSON-0] for group auth-converter
  35. Running - Scaling down Kill one of the instances Log

    on the remaining instance: 11:02:13,410 ...ConsumerCoordinator - Revoking previously assigned partitions [AUTH_JSON-1, AUTH_JSON-0] for group auth-converter 11:02:13,415 ...ConsumerCoordinator - Setting newly assigned partitions [AUTH_JSON-2, AUTH_JSON-1, AUTH_JSON-3, AUTH_JSON-0] for group auth-converter
  36. Delivery semantics • At least once • No messages will

    be lost • Messages can be processed a second time when failure happens → Make your system idempotent • Exactly once planned for Kafka 0.11 • KIP-98 - Exactly Once Delivery and Transactional Messaging • KIP-129: Streams Exactly-Once Semantics
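One common way to get that idempotence (not from the talk; the Notification type, its getDeterministicId() method and the topic names are made up) is to key every result by a stable identifier, so a reprocessed message overwrites the earlier result instead of duplicating it:

 builder.stream(Serdes.String(), notificationSerde, "raw-notifications")
   .map((key, notif) -> new KeyValue<>(notif.getDeterministicId(), notif)) // stable id derived from the input
   .to(Serdes.String(), notificationSerde, "notifications"); // e.g. a compacted topic: last write wins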
  37. Migration • Conversion of Spark / Scala code • Upgraded

    from Scala 2.10 to 2.11 and enabled the -Xexperimental flag of the Scala compiler so that Scala lambdas are converted into Java lambdas (SAM support) • Removed lots of specific code to read from / write to Kafka (supported out-of-the-box with Kafka Streams) • API similar to the RDD API → Very straightforward conversion (no need to call foreachRDD, so even better!) • Conversion of Spark / Python code: not attempted
  38. Metrics • Kafka Streams doesn’t have a UI to display

    metrics (e.g. number of records processed) • Used Dropwizard Metrics (metrics.dropwizard.io) • Java API to calculate metrics and send them to various sinks • Used InfluxDB to store the metrics • Graphite compatible • Used Grafana to display the metrics as graphs
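As a rough sketch of that setup (not the project's actual code; the metric name and endpoint are made up), counting records with Dropwizard Metrics and reporting to a Graphite-compatible endpoint such as InfluxDB's Graphite listener could look like this:

 MetricRegistry registry = new MetricRegistry();
 Meter processedRecords = registry.meter("records.processed"); // hypothetical metric name

 Graphite graphite = new Graphite(new InetSocketAddress("localhost", 2003));
 GraphiteReporter reporter = GraphiteReporter.forRegistry(registry)
   .convertRatesTo(TimeUnit.SECONDS)
   .convertDurationsTo(TimeUnit.MILLISECONDS)
   .build(graphite);
 reporter.start(10, TimeUnit.SECONDS);
 // The processing code then calls processedRecords.mark() for every record it handles

In the project, the metrics were first sent to a Kafka topic and aggregated (next slides) before reaching InfluxDB.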
  39. Metrics aggregation • Each instance reports its own metrics →

    Need to aggregate metrics • Specific reporter to send Dropwizard Metrics to a Kafka topic • Kafka topic to collect metrics • 1 partition • Key = instance ID (e.g. app-1, app-2…) • Value = monotonic metric • Kafka Streams app to aggregate metrics • Input is a KTable (new values replace previous values) • Send aggregated metrics to InfluxDB
  40. Kafka Streams app to aggregate metrics KTable<String, CounterMetric> metricsStream =

    builder.table(appIdSerde, metricSerde, "metrics", "raw-metrics");
 KStream<String, CounterMetric> metricValueStream = metricsStream
 .groupBy((key, value) -> new KeyValue<>(value.getName(), value), metricNameSerde, metricSerde)
 .reduce(CounterMetric::add, CounterMetric::subtract, "aggregates")
 .toStream();
 metricValueStream.to(metricNameSerde, metricSerde, "metrics-agg");
 
 // --- Second topology
 
 GraphiteReporter graphite = GraphiteReporter.builder()
 .hostname("localhost")
 .port(2003)
 .build();
 
 KStream<String, CounterMetric> aggMetricsStream = builder.stream(metricNameSerde, metricSerde, "metrics-agg");
 aggMetricsStream.foreach((key, metric) -> graphite.send(metric));
  41. Send data into Kafka (1M records) • Start consumer 1 • Start

    consumer 2 • Aggregated metric (from consumers 1 and 2) • Stop consumer 2 • Delta = records processed twice
  42. Results Pros • Simpler code (no manual handling of offsets)

    • Simpler packaging (no dependencies to exclude, fewer dependency version conflicts) • Much lower latency: from seconds to milliseconds • Reduced memory footprint • Easier scaling • Improved stability when restarting the application Cons • No UI • No centralized logs → Use ELK or equivalent… • No centralized metrics → Aggregate metrics • Have to use an intermediate topic if grouping the data by a value that is not the key
  43. Summary • Very easy to build pipelines on top of

    Kafka • Great fit for micro-services • Compared to Spark Streaming: • Better for real-time apps than Spark Streaming • Lower latency, lower memory footprint, easier scaling • Lower level: good for prod, lacks a UI for dev • Compared to a standard Kafka consumer: • Higher level: faster to build a sophisticated app • Less control for very fine-grained consumption