// Registers a table on the builder using appIdSerde / metricSerde and the names
// "metrics" and "raw-metrics" (presumably the source topic and the state-store
// name, in that order; confirm against the Kafka Streams version in use).
// NOTE(review): the value returned by builder.table(...) is not bound to a name
// on this line; confirm that the metricsStream used below refers to it (its
// declaration may sit on a preceding line).
builder.table(appIdSerde, metricSerde, "metrics", "raw-metrics");
KStream<String, CounterMetric> metricValueStream = metricsStream
.groupBy((key, value) -> new KeyValue<>(value.getName(), value), metricNameSerde, metricSerde)
.reduce(CounterMetric::add, CounterMetric::subtract, "aggregates")
.toStream() .to(metricNameSerde, metricSerde, "metrics-agg");
// --- Second topology
// Reporter that the stream consumer below sends each aggregated metric to.
// The endpoint is kept in named locals so host and port are easy to spot.
final String graphiteHost = "localhost";
final int graphitePort = 2003;
GraphiteReporter graphite =
    GraphiteReporter.builder().hostname(graphiteHost).port(graphitePort).build();
// Read the "metrics-agg" stream back in and pass every record's value to the
// reporter; the record key is not used.
KStream<String, CounterMetric> aggMetricsStream =
    builder.stream(metricNameSerde, metricSerde, "metrics-agg");
aggMetricsStream.foreach((ignoredKey, aggregated) -> {
    graphite.send(aggregated);
});