Elementary Analytics with Kafka Streams (Anna McDonald, Confluent) | RTA Summit '23

In life, there are certainly situations where only ANOVA will do. However, many times valuable insights can be found with a much simpler approach. Whether it’s direct analysis or data format transformations, Kafka Streams has got you covered.

In this talk we will cover the core features of Kafka Streams that enable elementary analytics and data transformation.

Including:
– Windowing and window types
– Aggregate functions
– Continuous to discrete mappings

Armed with this information, your understanding of conducting elementary analytics in Kafka Streams will improve exponentially with no chance of regression.

StarTree

May 23, 2023

Transcript

  1. Why should I not do advanced Math Stat? (@jbfletch_)

    Java
    • No elegant support for 2D dynamic arrays
    • Lack of support for more advanced DoE tests, etc.
  2. Why should I not do advanced Math Stat?

    Java
    • No elegant support for 2D dynamic arrays
    • Lack of support for more advanced DoE tests, etc.

    Pandas, R!
  3. A Case for Kafka Streams

    I want to run my analysis as soon as all my assumptions are met. I don’t want to wait until a window closes if I have everything I need…
  4. Example Assumption Templates

    • Number of Observations == 30
    • Number of Unique States (as in NY) represented == 50
  5. Example Assumption Templates

    • Number of Observations == 30
    • Number of Unique States (as in NY) represented == 50
    • Specific combinations for Design of Experiments
  6. Example Assumption Templates

    • Number of Observations == 30
    • Number of Unique States (as in NY) represented == 50
    • Specific combinations for Design of Experiments
    • Study Group completeness
  7. Example Assumption Templates

    • Number of Observations == 30
    • Number of Unique States (as in NY) represented == 50
    • Specific combinations for Design of Experiments
    • Study Group completeness
    • Number of observations == Given model prediction confidence
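The first two templates above can be read as simple predicates over the observations collected so far. The following is a minimal sketch under that reading, not code from the talk: the class `AssumptionTemplates`, its method names, and the choice to model an observation as just a state code are all hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper: each assumption template becomes a predicate over the
// state codes of the observations gathered so far.
final class AssumptionTemplates {
    // Template: "Number of Observations == n".
    static Predicate<List<String>> observationCountIs(int n) {
        return states -> states.size() == n;
    }

    // Template: "Number of Unique States represented == n".
    static Predicate<List<String>> uniqueStatesIs(int n) {
        return states -> {
            Set<String> unique = new HashSet<>(states);
            return unique.size() == n;
        };
    }

    public static void main(String[] args) {
        // Tiny illustrative input; the talk's templates use 30 and 50.
        List<String> states = List.of("NY", "NY", "CA");
        Predicate<List<String>> ready = observationCountIs(3).and(uniqueStatesIs(2));
        System.out.println("Assumptions met: " + ready.test(states));
    }
}
```

Composing templates with `and` keeps each assumption testable on its own, which matches the idea of running the analysis as soon as every assumption holds.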
  8. Time: Wall-Clock Time

    • The actual time based on system time or your watch
    • Not fully available in all DSL methods
    • Used when there is no way to drive time by incoming messages
  9. Time: Stream Time (@MatthiasJSax)

    • Per partition, aka stream task
    • Advances based on incoming messages at a partition level
  10. Stream Time 101

    this.testTopology.input()
        .at(1000).add("1", obs1)
        .at(1100).add("1", obs2)
        .at(1200).add("3", obs3)
        .at(1300).add("4", obs4)
        .at(1400).add("1", obs5)
        .at(1500).add("6", obs6)
        .at(1510).add("10", obs10)
        .at(1520).add("11", obs11)
        .at(1530).add("1", obs13)
        .at(1540).add("4", obs14)
        .at(1550).add("1", obs15)
        .at(1560).add("4", obs16)
        .at(12000).add("7", obs7)
        .at(12100).add("1", obs8)
        .at(12200).add("4", obs9);

    Stream Time: 12200
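The Stream Time 101 slide can be mimicked without Kafka. This is a minimal sketch, assuming stream time for one partition is simply the largest record timestamp observed so far; `StreamTimeTracker` and its methods are hypothetical names, not part of Kafka Streams.

```java
// Hypothetical helper, not a Kafka Streams API: tracks stream time for one partition.
final class StreamTimeTracker {
    private long streamTime = -1L; // no records observed yet

    // Stream time only moves forward; an out-of-order record does not rewind it.
    long observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        // Timestamps taken from the slide's .at(...) calls.
        long[] timestamps = {1000, 1100, 1200, 1300, 1400, 1500, 1510, 1520,
                             1530, 1540, 1550, 1560, 12000, 12100, 12200};
        StreamTimeTracker tracker = new StreamTimeTracker();
        for (long ts : timestamps) {
            tracker.observe(ts);
        }
        System.out.println("Stream Time: " + tracker.streamTime());
    }
}
```

With the slide's timestamps the tracker ends at 12200, the value shown on the slide.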
  11. Group Data

    • What do I need to reason about?
      Ex. Store Location, Shoe Type, Feature ID

    groupBy
  12. Tumbling Windows

    • Non-Overlapping
    • Controlled by a Fixed-Size Window
    • Uniquely Identifiable as:
        Memory:    <key>@[start epoch]/[end epoch]
        Changelog: <key>@[start epoch] <pass in window size>
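To make the tumbling-window identifier concrete, here is a small sketch that assigns a timestamp to its window and prints it in the slide's in-memory notation. It assumes non-negative timestamps and epoch-aligned windows; `TumblingWindowSketch` is a hypothetical helper, and the 10-second size is chosen only to match the window size used later in the talk.

```java
// Hypothetical helper illustrating tumbling-window assignment; not a Kafka Streams API.
final class TumblingWindowSketch {
    // Start of the single tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Identifier in the slide's in-memory notation: <key>@[start epoch]/[end epoch].
    static String windowId(String key, long timestampMs, long sizeMs) {
        long start = windowStart(timestampMs, sizeMs);
        return key + "@" + start + "/" + (start + sizeMs);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds
        System.out.println(windowId("1", 1_000L, size));  // 1@0/10000
        System.out.println(windowId("1", 12_200L, size)); // 1@10000/20000
    }
}
```

Because the size is fixed, the start alone determines the window, which is presumably why the changelog form only needs the start epoch plus the window size.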
  13. Hopping Windows

    • Overlapping
    • Fixed Size
    • Controlled by Window Size and Advance
    • Uniquely Identifiable as:
        Memory:    <key>@[start epoch]/[end epoch]
        Changelog: <key>@[start epoch] <pass in window size>
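Because hopping windows overlap, one record can land in several of them. The sketch below lists every window start that covers a given timestamp, assuming windows of the form [start, start + size) aligned to multiples of the advance. `HoppingWindowSketch` is a hypothetical helper and the size and advance values are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating hopping-window assignment; not a Kafka Streams API.
final class HoppingWindowSketch {
    // Start times of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window can still cover the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Assumed configuration: 10-second windows advancing every 5 seconds.
        System.out.println(windowStartsFor(12_200L, 10_000L, 5_000L)); // [5000, 10000]
        System.out.println(windowStartsFor(1_000L, 10_000L, 5_000L));  // [0]
    }
}
```

Setting the advance equal to the size makes each timestamp fall into exactly one window, which is the tumbling case.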
  14. Sliding Windows aka Look Back

    • Slides continuously along a timeline
    • Fixed Size
    • Controlled by max time difference between two records of the same key
  15. Sliding Windows aka Look Back

    • Slides continuously along a timeline
    • Fixed Size
    • Controlled by max time difference between two records of the same key
    • Uniquely Identifiable as:
        Memory:    <key>@[start epoch]/[end epoch]
        Changelog: <key>@[start epoch] <pass in window size>
  16. Sliding Windows aka Look Back

    Key              Value  Record Time  Stream Time
    A                1      8000         8000
    A                2      9200         9200
    A                3      12400        12400
    Phil             496    13200        13200
    Angela Lansbury  96     14500        14500

    Time Difference = 5000

    We'd have the following 5 windows:
    • window [3000;8000] contains [1] (created when first record enters the window)
    • window [4200;9200] contains [1,2] (created when second record enters the window)
    • window [7400;12400] contains [1,2,3] (created when third record enters the window)
    • window [8001;13001] contains [2,3] (created when the first record drops out of the window)
    • window [9201;14201] contains [3] (created when the second record drops out of the window)
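The five windows listed for key A can be reproduced with a brute-force sketch. It assumes that each record opens one candidate window ending at its timestamp and one starting 1 ms after it, and that empty candidates are never created. This is an illustration of the slide's example only, not how Kafka Streams implements sliding windows; `SlidingWindowSketch` is a hypothetical name, and out-of-order records and grace periods are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical brute-force illustration of the slide's sliding-window example.
final class SlidingWindowSketch {
    // Returns window start -> values inside [start, start + diff], both bounds inclusive.
    static Map<Long, List<Integer>> windowsFor(long[] timestamps, int[] values, long diffMs) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            // One candidate window ends at the record, another starts just after it.
            for (long start : new long[] {ts - diffMs, ts + 1}) {
                List<Integer> contained = new ArrayList<>();
                for (int i = 0; i < timestamps.length; i++) {
                    if (timestamps[i] >= start && timestamps[i] <= start + diffMs) {
                        contained.add(values[i]);
                    }
                }
                if (!contained.isEmpty()) {
                    windows.put(start, contained); // empty windows are never created
                }
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] timestamps = {8000, 9200, 12400}; // key A from the slide
        int[] values = {1, 2, 3};
        long diff = 5000; // "Time Difference" from the slide
        windowsFor(timestamps, values, diff).forEach((start, contained) ->
            System.out.println("window [" + start + ";" + (start + diff) + "] contains " + contained));
    }
}
```

Run as written, it lists the same five windows as the slide. The candidate window starting at 12401 contains no record, so it is skipped.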
  17. Session Windows

    • Non-Overlapping with Gaps
    • Unfixed Size!
    • Controlled by defining an inactivity gap for seeing records with a given key
  18. Session Windows

    • Non-Overlapping with Gaps
    • Unfixed Size!
    • Controlled by defining an inactivity gap for seeing records with a given key
    • Uniquely Identifiable as:
        Memory:    <key>@[start epoch]/[end epoch]
        Changelog: <key>@[start epoch]/[end epoch]
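A session window has no fixed size, so both its start and end depend on the data. The sketch below groups one key's timestamps into sessions, using the timestamps of key "1" from the Stream Time 101 slide. The 5000 ms inactivity gap is an assumed value, `SessionWindowSketch` is a hypothetical helper, and treating a distance exactly equal to the gap as part of the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating session-window grouping; not a Kafka Streams API.
final class SessionWindowSketch {
    // Returns sessions as <key>@start/end; a new session starts when the distance
    // to the previous record exceeds the inactivity gap (assumption: equal stays in).
    static List<String> sessionsFor(String key, long[] timestamps, long gapMs) {
        List<String> sessions = new ArrayList<>();
        if (timestamps.length == 0) {
            return sessions;
        }
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        long start = sorted[0];
        long end = sorted[0];
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] - end > gapMs) {
                sessions.add(key + "@" + start + "/" + end);
                start = sorted[i];
            }
            end = sorted[i];
        }
        sessions.add(key + "@" + start + "/" + end);
        return sessions;
    }

    public static void main(String[] args) {
        long[] keyOneTimestamps = {1000, 1100, 1400, 1530, 1550, 12100};
        System.out.println(sessionsFor("1", keyOneTimestamps, 5000L));
    }
}
```

The last record arrives more than 5000 ms after its predecessor, so it forms a session of its own whose start and end coincide.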
  19. Aggregate

    • Simple Aggregates - Count, Summation, etc.
    • Create an array containing all values for further analysis
  20. Aggregate

    • Simple Aggregates - Count, Summation, etc.
    • Create an array containing all values for further analysis
    • Continuous to Discrete Mappings
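A continuous-to-discrete mapping can be as simple as bucketing a numeric value and then counting per bucket. The sketch below is one possible reading of that bullet, not the talk's code: the `Bucket` names and the cut points 10 and 100 are illustrative assumptions.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical example of mapping continuous values to discrete buckets.
final class DiscreteMappingSketch {
    enum Bucket { LOW, MEDIUM, HIGH }

    // Cut points are assumptions chosen only for illustration.
    static Bucket toBucket(double value) {
        if (value < 10.0) {
            return Bucket.LOW;
        }
        if (value < 100.0) {
            return Bucket.MEDIUM;
        }
        return Bucket.HIGH;
    }

    // A simple aggregate on top of the mapping: count of values per bucket.
    static Map<Bucket, Integer> countByBucket(double[] values) {
        Map<Bucket, Integer> counts = new EnumMap<>(Bucket.class);
        for (double v : values) {
            counts.merge(toBucket(v), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByBucket(new double[] {3.5, 42.0, 7.25, 150.0}));
    }
}
```

In a streaming topology the mapping would typically run before grouping, so the bucket can serve as the grouping key.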
  21. Putting it all Together!

    Task: Calculate the following descriptive statistics when and only when you have at least 30 observations for each class of sneaker:
    • Count
    • Min
    • Max
    • Mean
    • Skewness
    • Kurtosis
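For readers who want to see what the six statistics involve, here is a dependency-free sketch. It uses the bias-corrected sample formulas for skewness and excess kurtosis, which is an assumption about the intended definitions; the talk's own code delegates these to a `DescriptiveStatistics` object instead. `DescriptiveStatsSketch` and its method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical, dependency-free versions of the statistics named in the task.
final class DescriptiveStatsSketch {
    static double mean(double[] x) {
        return Arrays.stream(x).sum() / x.length;
    }

    // Sum of (x_i - mean)^power over all observations.
    static double centralPowerSum(double[] x, int power) {
        double m = mean(x);
        double sum = 0.0;
        for (double v : x) {
            sum += Math.pow(v - m, power);
        }
        return sum;
    }

    // Sample standard deviation (divides by n - 1).
    static double sampleStdDev(double[] x) {
        return Math.sqrt(centralPowerSum(x, 2) / (x.length - 1));
    }

    // Bias-corrected sample skewness; needs at least 3 observations.
    static double skewness(double[] x) {
        double n = x.length;
        return n / ((n - 1) * (n - 2)) * centralPowerSum(x, 3) / Math.pow(sampleStdDev(x), 3);
    }

    // Bias-corrected sample excess kurtosis; needs at least 4 observations.
    static double kurtosis(double[] x) {
        double n = x.length;
        double scaled = n * (n + 1) / ((n - 1) * (n - 2) * (n - 3))
                * centralPowerSum(x, 4) / Math.pow(sampleStdDev(x), 4);
        double correction = 3 * (n - 1) * (n - 1) / ((n - 2) * (n - 3));
        return scaled - correction;
    }

    public static void main(String[] args) {
        double[] obs = {1, 2, 3, 4, 5}; // illustrative data, not from the talk
        System.out.println("Count: " + obs.length);
        System.out.println("Min: " + Arrays.stream(obs).min().getAsDouble());
        System.out.println("Max: " + Arrays.stream(obs).max().getAsDouble());
        System.out.println("Mean: " + mean(obs));
        System.out.println("Skewness: " + skewness(obs));
        System.out.println("Kurtosis: " + kurtosis(obs));
    }
}
```

Skewness and kurtosis are only defined for three and four or more observations respectively, one practical reason to gate the calculation on an observation count.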
  22. Putting it all Together!

    We want fixed-size, non-overlapping windows, so we choose tumbling and define our window size:

    TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10));
  23. Putting it all Together!

    We define our input KTable from our baseStream and create our groupBy, windowedBy and aggregate:

    TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10));

    KTable<Windowed<String>, List<Integer>> observationsByClass = baseStream
        .groupBy((k, v) -> v.path("sneakerID").asText())
        .windowedBy(tumblingWindow)
        .aggregate(
            () -> new ArrayList<Integer>(),
            (key, value, aggregate) -> {
                aggregate.add(value.path("peeps").asInt());
                return aggregate;
            },
            Materialized.<String, List<Integer>, WindowStore<Bytes, byte[]>>as("classes")
                .withValueSerde(listSerde));
  26. Putting it all Together!

    Time to apply our statistical assumption template (SAT)

    observationsByClass
        .toStream()
        .filter((k, v) -> v.size() == 30)
        .map((stringWindowed, integers) -> {
            DescriptiveStatistics stats = new DescriptiveStatistics();
            double[] array = integers.stream().mapToDouble(Integer::doubleValue).toArray();
            Arrays.stream(array).forEach(obs -> {
                stats.addValue(obs);
            });
            System.out.println("Number of Values: " + String.valueOf(stats.getN()));
            System.out.println("Min Value: " + String.valueOf(stats.getMin()));
            System.out.println("Max Value: " + String.valueOf(stats.getMax()));
            System.out.println("Mean: " + String.valueOf(stats.getMean()));
            System.out.println("Skewness: " + String.valueOf(stats.getSkewness()));
            System.out.println("Kurtosis: " + String.valueOf(stats.getKurtosis()));
            return KeyValue.pair(stringWindowed, integers);
        })
        .print(Printed.<Windowed<String>, List<Integer>>toSysOut().withLabel("Full Key and Values"));
  27. Putting it all Together!

    Create and add the observations in our window to a new stats object (same code as slide 26)
  28. Putting it all Together!

    And finally…calculate our required descriptive statistics! (same code as slide 26)
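The behaviour the pipeline aims for, firing once at the moment a class reaches 30 observations inside a window, can be simulated in plain Java. This is a sketch of the idea only, with no Kafka Streams involved: `EmitWhenReadySketch`, its `add` method, and the sneaker ID `sneaker-1` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for groupBy + tumbling window + aggregate + filter.
final class EmitWhenReadySketch {
    private final long windowSizeMs;
    private final int requiredObservations;
    private final Map<String, List<Integer>> observationsByWindow = new HashMap<>();

    EmitWhenReadySketch(long windowSizeMs, int requiredObservations) {
        this.windowSizeMs = windowSizeMs;
        this.requiredObservations = requiredObservations;
    }

    // Adds one observation and returns the collected values only on the update
    // where the count first equals the required number of observations.
    Optional<List<Integer>> add(String sneakerId, long timestampMs, int peeps) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        String windowedKey = sneakerId + "@" + start + "/" + (start + windowSizeMs);
        List<Integer> values = observationsByWindow.computeIfAbsent(windowedKey, k -> new ArrayList<>());
        values.add(peeps);
        return values.size() == requiredObservations ? Optional.of(values) : Optional.empty();
    }

    public static void main(String[] args) {
        EmitWhenReadySketch sketch = new EmitWhenReadySketch(10_000L, 30);
        for (int i = 0; i < 31; i++) {
            // All 31 timestamps fall into the window [0, 10000).
            sketch.add("sneaker-1", 1_000L + i, i)
                  .ifPresent(values -> System.out.println("Ready with " + values.size() + " observations"));
        }
    }
}
```

The equality check fires on the 30th observation and stays silent on the 31st, so the analysis runs once per class and window. One caveat worth checking in a real topology: if record caching collapses consecutive updates for a key, the update with exactly 30 elements might never be forwarded downstream.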