Upgrade to Pro — share decks privately, control downloads, hide ads and more …

KSQL - The power of SQL, the simplicity of SQL

KSQL - The power of SQL, the simplicity of SQL

Kafka, KSQL

Avatar for Alexis Seigneurin

Alexis Seigneurin

January 29, 2019
Tweet

More Decks by Alexis Seigneurin

Other Decks in Technology

Transcript

  1. FREE E-BOOK I WROTE A guide to producing, consuming and

    processing events en.ippon.tech/apache-kafka
  2. KAFKA: A FEW REMINDERS Publish-subscribe system Messages are published to

    topics Messages are persisted Messages can be read by multiple consumers Multiple partitions for scaling
  3. PRODUCER AND CONSUMER API (JAVA) Low-level API Producer: send() Consumer:

    subscribe(), seek(), poll()… Consumer groups
  4. KAFKA STREAMS Client library for Java and Scala DSL: from(),

    map(), filter(), to()… Aggregations, joins, windowing Simple deployment model Allows to create “microservices” KStream / KTable
  5. KAFKA STREAMS: QUICK EXAMPLE Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG,

    "text-transformer"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "8"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder streamsBuilder = new StreamsBuilder(); streamsBuilder.stream("text-input", Consumed.with(Serdes.String(), Serdes.String())) .filter((key, value) -> value.contains("a")) .mapValues(text -> text.toUpperCase()) .to("text-output", Produced.with(Serdes.String(), Serdes.String())); Topology topology = streamsBuilder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); Application ID = Kafka consumer group Threads for parallel processing (relates to partitions) Topic to read from + key/value deserializers Transformations: map, filter… Topic to write to + key/value serializers
  6. KAFKA STREAMS: KSTREAM KStream: stream of events Records are independent

    from each other Stateless Example (inspired from the documentation): ‣ Sum values as records arrive ‣ Records: ‣ (alice, 1) = 1 ‣ (charlie, 1) = 2 ‣ (alice, 3) = 5 ‣ → Adds to (alice, 1)
  7. KAFKA STREAMS: KTABLE KTable: change log stream New records =

    updates Stateful (requires a state store) Example (inspired from the documentation): ‣ Sum values as records arrive ‣ Records: ‣ (alice, 1) = 1 ‣ (charlie, 1) = 2 ‣ (alice, 3) = 4 ‣ → Replaces (alice, 1)
  8. KSQL Built on top of Kafka Streams SQL queries translate

    into Kafka Streams applications ‣ CREATE STREAM → KStream ‣ CREATE TABLE → KTable Interactive or non-interactive queries
  9. KSQL: EXAMPLE { “title":"Toy Story”, ”id”:"862", "imdb_id":"tt0114709", … } {

    "title":"Full Metal Jacket”, “id”:”600”, ”imdb_id":"tt0093058", … } … { “userId":"14", “movieId":"1287", “rating":2.0, "timestamp":1260759187} {“userId”:”28", "movieId":"862", "rating": 2.0, "timestamp":1260759148} {“userId”:”7", "movieId":"1339", "rating": 3.5, “timestamp":1260759125} … Movielens dataset Topic: ratings Topic: movies
  10. KTABLE OF MOVIES CREATE TABLE movies ( \ adult VARCHAR,

    \ belongs_to_collection VARCHAR, \ budget VARCHAR, \ genres VARCHAR, \ homepage VARCHAR, \ id VARCHAR, \ imdb_id VARCHAR, \ original_language VARCHAR, \ original_title VARCHAR, \ overview VARCHAR, \ popularity VARCHAR, \ poster_path VARCHAR, \ production_companies VARCHAR, \ production_countries VARCHAR, \ release_date VARCHAR, \ revenue VARCHAR, \ runtime VARCHAR, \ spoken_languages VARCHAR, \ status VARCHAR, \ tagline VARCHAR, \ title VARCHAR, \ video VARCHAR, \ vote_average VARCHAR, \ vote_count VARCHAR) \ WITH (KAFKA_TOPIC = 'movies', \ VALUE_FORMAT='JSON', \ KEY = 'id'); SHOW TABLES; DESCRIBE EXTENDED movies;
  11. KSTREAM OF RATINGS CREATE STREAM ratings ( \ userId VARCHAR,

    \ movieId VARCHAR, \ rating DOUBLE, \ timestamp BIGINT \ ) \ WITH (KAFKA_TOPIC = 'ratings', VALUE_FORMAT='JSON'); DESCRIBE EXTENDED ratings; SELECT USERID, MOVIEID, RATING FROM RATINGS;
  12. JOIN RATINGS WITH MOVIES SELECT USERID, MOVIEID, TITLE, RATING \

    FROM RATINGS \ LEFT JOIN MOVIES ON RATINGS.MOVIEID = MOVIES.ID;
  13. QUERY RUNNING IN THE BACKGROUND CREATE STREAM ratings_enriched \ WITH

    (KAFKA_TOPIC = 'ratings_enriched', VALUE_FORMAT='JSON') \ AS SELECT USERID, MOVIEID, TITLE, RATING \ FROM RATINGS \ LEFT JOIN MOVIES ON RATINGS.MOVIEID = MOVIES.ID; EXPLAIN CSAS_RATINGS_ENRICHED_0; kafka-console-consumer --bootstrap-server localhost:9092 --topic ratings_enriched Kill the KSQL CLI... TERMINATE CSAS_RATINGS_ENRICHED_0; DROP STREAM ratings_enriched;
  14. WINDOWED AGGREGATIONS $ kafka-console-consumer --bootstrap-server localhost:9092 --topic events CREATE STREAM

    events ( \ id VARCHAR, \ info VARCHAR, \ source VARCHAR, \ userid BIGINT ) \ WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT='JSON');
  15. WINDOWED AGGREGATIONS SELECT * FROM EVENTS; SELECT \ SOURCE, \

    WINDOWSTART(), \ WINDOWEND(), \ COUNT(*) \ FROM EVENTS \ WINDOW TUMBLING(SIZE 5 SECONDs) \ GROUP BY SOURCE; SELECT \ SOURCE, \ TIMESTAMPTOSTRING(WINDOWSTART(), 'yyyy-MM-dd HH:mm:ss'), \ TIMESTAMPTOSTRING(WINDOWEND(), 'yyyy-MM-dd HH:mm:ss'), \ COUNT(*) \ FROM EVENTS \ WINDOW TUMBLING(SIZE 5 SECONDs) \ GROUP BY SOURCE;
  16. WINDOWED AGGREGATIONS Support for late events Event time vs Processing

    time | t1 | t2 | t3 | ... |* * * * |** * | * *** * *| ... | t1 | t2 | t3 | ... |* * * . |** * | * *** * *| ... \-----> *(t1)