chain of processing of beans External Channels: Channels are to be connected to external systems like Kafka or AMQP broker External channels Internal channels
access record metadata var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow(); // process the message payload. double price = msg.getPayload(); // Acknowledge the incoming message (commit the offset) return msg.ack(); } Java file
commits an offset of the latest acked message in sequence (meaning, all previous messages were also acked). This is the default if enable.auto.commit is not explicitly set to true. latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). ignore performs no commit. This strategy is the default strategy when the consumer is explicitly configured with enable.auto.commit to true.
will be processed (default strategy). The offset of the record that has not been processed correctly is not committed. ignore the failure is logged, but the processing continue. The offset of the record that has not been processed correctly is committed. dead-letter-queue the offset of the record that has not been processed correctly is committed, but the record is written to a Kafka dead letter topic.
Multiple consumer threads inside a consumer group Multiple consumer applications inside a consumer group Pub/Sub: Multiple consumer groups subscribed to a topic 4 Patterns for consuming:
java.time.Duration; import java.util.Random; @ApplicationScoped public class KafkaPriceProducer { private final Random random = new Random(); @Outgoing("prices") public Multi<Double> generate() { // Build an infinite stream of random prices // It emits a price every second return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) .map(x -> random.nextDouble()); } } Java file
imports omitted... @Path("/prices") public class PriceResource { @Inject @Channel("prices") Emitter<Double> priceEmitter; @POST @Consumes(MediaType.TEXT_PLAIN) public void addPrice(Double price) { priceEmitter.send(Message.of(price) .withAck(() -> { // Called when the message is acked return CompletableFuture.completedFuture(null); }) .withNack(throwable -> { // Called when the message is nacked return CompletableFuture.completedFuture(null); })); } } Java file