Practical Kafka Streams: With a Side of Event-Driven Architecture

Presentation slides for JJUG CCC Spring 2025

Tomohiro Hashidate

June 07, 2025

Transcript

  1. Creating a KStream and a KTable

     StreamsBuilder builder = new StreamsBuilder();

     KStream<String, Long> wordCounts = builder.stream(
         "word-counts-input-topic",                  /* input topic */
         Consumed.with(
             Serdes.String(),                        /* key serde */
             Serdes.Long()                           /* value serde */
         )
     );

     KTable<String, Long> wordCountsTable = builder.table(
         "word-counts-input-topic",                  /* input topic */
         Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
                 "word-counts-store"                 /* table/store name */)
             .withKeySerde(Serdes.String())          /* key serde */
             .withValueSerde(Serdes.Long())          /* value serde */
     );
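Not shown on the slide: to actually run the topology, the builder is turned into a Topology and handed to a KafkaStreams instance. A minimal sketch, assuming placeholder values for the application id and broker address:

     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");    // placeholder
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

     KafkaStreams streams = new KafkaStreams(builder.build(), props);
     streams.start();
     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));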
  2. Filtering and transforming values

     // Assumes a KStream<String, Long> stream for the filter example
     KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);

     // Assumes a KStream<byte[], String> stream for the map/mapValues examples
     KStream<String, Integer> transformed = stream.map(
         (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
     // Changing the key sets the repartition flag

     KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
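For context, such a transformed stream is typically written back to a Kafka topic with to(); a minimal sketch, where the output topic name is an assumption:

     uppercased.to("uppercased-output-topic",
                   Produced.with(Serdes.ByteArray(), Serdes.String()));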
  3. Branching and merging streams

     Map<String, KStream<String, Long>> branches =
         stream.split(Named.as("Branch-"))
               .branch((key, value) -> key.startsWith("A"), /* first predicate */
                       Branched.as("A"))
               .branch((key, value) -> key.startsWith("B"), /* second predicate */
                       Branched.as("B"))
               .defaultBranch(Branched.as("C"));            /* default branch */

     KStream<byte[], String> merged = stream1.merge(stream2);
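The map returned by split()/branch() keys each branch by the Named prefix plus the Branched name, so the individual branches can be looked up like this (a sketch based on the names used above):

     KStream<String, Long> streamA  = branches.get("Branch-A");
     KStream<String, Long> streamB  = branches.get("Branch-B");
     KStream<String, Long> defaults = branches.get("Branch-C");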
  4. An aggregation example: word count

     KStream<String, Long> wordCounts = textLines
         .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
         .groupBy((key, word) -> word)
         // Count the occurrences of each word (record key).
         // This changes the stream type from `KGroupedStream<String, String>`
         // to `KTable<String, Long>` (word -> count).
         .count()
         // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
         .toStream();
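To round out the example, the counts are usually written to an output topic; a minimal sketch, where the topic name is an assumption:

     wordCounts.to("word-counts-output-topic",
                   Produced.with(Serdes.String(), Serdes.Long()));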
  5. Behind the DSL: the implementation of mapValues, which simply transforms values

     class KStreamMapValues<KIn, VIn, VOut> implements FixedKeyProcessorSupplier<KIn, VIn, VOut> {
         private final ValueMapperWithKey<KIn, VIn, VOut> mapper;

         public KStreamMapValues(final ValueMapperWithKey<KIn, VIn, VOut> mapper) {
             this.mapper = mapper;
         }

         @Override
         public FixedKeyProcessor<KIn, VIn, VOut> get() {
             return new KStreamMapProcessor();
         }

         private class KStreamMapProcessor extends ContextualFixedKeyProcessor<KIn, VIn, VOut> {
             @Override
             public void process(final FixedKeyRecord<KIn, VIn> record) {
                 final VOut newValue = mapper.apply(record.key(), record.value());
                 context().forward(record.withValue(newValue));
             }
         }
     }
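Not on the slide, but for reference: application code can plug the same kind of FixedKeyProcessor into a topology with KStream#processValues. A minimal sketch, assuming a KStream<String, String> named lines:

     KStream<String, Integer> lengths = lines.processValues(
         () -> new ContextualFixedKeyProcessor<String, String, Integer>() {
             @Override
             public void process(final FixedKeyRecord<String, String> record) {
                 // Forward the length of each value while keeping the key fixed
                 context().forward(record.withValue(record.value().length()));
             }
         });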
  6. The Processor API interface

     public interface Processor<KIn, VIn, KOut, VOut> {
         default void init(final ProcessorContext<KOut, VOut> context) {}
         void process(Record<KIn, VIn> record);
         default void close() {}
     }

     As you can see, it is very simple: you basically just implement the processing you want in the process method.
     To pass a record on to downstream processing, call ProcessorContext#forward.
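As a concrete illustration of the interface (the class name and type parameters are hypothetical):

     public class ValueLengthProcessor implements Processor<String, String, String, Integer> {
         private ProcessorContext<String, Integer> context;

         @Override
         public void init(final ProcessorContext<String, Integer> context) {
             this.context = context;
         }

         @Override
         public void process(final Record<String, String> record) {
             // Replace each value with its length and forward it downstream
             context.forward(record.withValue(record.value().length()));
         }
     }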
  7. How to implement stateful processing

     The following is an excerpt from the DSL implementation of a generic aggregation.

     final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
     VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
     final VAgg newAgg;
     final long newTimestamp;

     if (oldAgg == null) {
         oldAgg = initializer.apply();
         newTimestamp = record.timestamp();
     } else {
         oldAgg = oldAggAndTimestamp.value();
         newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
     }

     newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
     final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
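The same pattern applies when writing a custom stateful Processor by hand: the state store is looked up in init() and read and written in process(). A minimal sketch, where the store name "counts-store", the class name, and the types are assumptions (the store must also be registered on the topology and connected to the processor):

     public class CountingProcessor implements Processor<String, String, String, Long> {
         private ProcessorContext<String, Long> context;
         private KeyValueStore<String, Long> store;

         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
             this.store = context.getStateStore("counts-store");
         }

         @Override
         public void process(final Record<String, String> record) {
             // Increment the per-key count in the state store and forward the new count
             final Long oldCount = store.get(record.key());
             final long newCount = (oldCount == null ? 0L : oldCount) + 1L;
             store.put(record.key(), newCount);
             context.forward(record.withValue(newCount));
         }
     }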
  8. Example RocksDB configuration

     BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
     tableConfig.setBlockCache(cache);
     tableConfig.setCacheIndexAndFilterBlocks(true);
     options.setWriteBufferManager(writeBufferManager);
     tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
     tableConfig.setPinTopLevelIndexAndFilter(true);
     tableConfig.setBlockSize(4 * 1024L);
     tableConfig.setFilterPolicy(filter);
     options.setWriteBufferSize(getMemtableSize());
     options.setMaxWriteBufferNumber(4);
     options.setMinWriteBufferNumberToMerge(2);
     options.setTableFormatConfig(tableConfig);
     options.setTargetFileSizeBase(256L * 1024 * 1024);
     options.setLevel0FileNumCompactionTrigger(10);
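The slide does not show where this code runs; in Kafka Streams, RocksDB tuning like this typically lives in a class implementing RocksDBConfigSetter, registered via the rocksdb.config.setter property. A minimal sketch (the class name is illustrative, and only a few of the options above are repeated):

     public static class CustomRocksDBConfig implements RocksDBConfigSetter {
         @Override
         public void setConfig(final String storeName, final Options options,
                               final Map<String, Object> configs) {
             final BlockBasedTableConfig tableConfig =
                 (BlockBasedTableConfig) options.tableFormatConfig();
             tableConfig.setBlockSize(4 * 1024L);
             options.setMaxWriteBufferNumber(4);
             options.setTableFormatConfig(tableConfig);
         }

         @Override
         public void close(final String storeName, final Options options) {}
     }

     // Registered on the streams configuration:
     // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);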