@EnableKafkaStreams public class KafkaStreamsConfig { @Bean public KStream<String, StockQuote> kStream(StreamsBuilder streamsBuilder) { KStream<String, StockQuote> branchedStream = new KafkaStreamBrancher<String, StockQuote>() .branch((key, value) -> value.getExchange().equalsIgnoreCase("NYSE"), kStream -> kStream.to("stock-quotes-nyse")) .branch((key, value) -> value.getExchange().equalsIgnoreCase("NASDAQ"), kStream -> kStream.to("stock-quotes-nasdaq")) .branch((key, value) -> value.getExchange().equalsIgnoreCase("AMS"), kStream -> kStream.to("stock-quotes-ams")) .defaultBranch(kStream -> kStream.to("stock-quotes-exchange-other")) .onTopOf(streamsBuilder.stream("stock-quotes")); return branchedStream; } }