diff --git a/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java b/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java index b4f2f978..6bfe067d 100644 --- a/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java +++ b/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java @@ -6,9 +6,12 @@ package io.debezium.server.kafka; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -88,13 +91,16 @@ void stop() { public void handleBatch(final List> records, final RecordCommitter> committer) throws InterruptedException { + + final List> deliveryFutures = new ArrayList<>(records.size()); + for (ChangeEvent record : records) { try { LOGGER.trace("Received event '{}'", record); Headers headers = convertKafkaHeaders(record); String topicName = streamNameMapper.map(record.destination()); - Future recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), + deliveryFutures.add(producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers), (metadata, exception) -> { if (exception != null) { LOGGER.error("Failed to send record to {}:", topicName, exception); @@ -103,18 +109,36 @@ public void handleBatch(final List> records, else { LOGGER.trace("Sent message with offset: {}", metadata.offset()); } - }); + })); + } + catch (Exception e) { + throw new DebeziumException(e); + } + } + + try { + for (int i = 0; i < records.size(); i++) { + final var recordMetadataFuture = deliveryFutures.get(i); + final var record = records.get(i); + if (waitMessageDeliveryTimeout == 0) { recordMetadataFuture.get(); } else { - recordMetadataFuture.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); + try { + recordMetadataFuture.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + LOGGER.error("Timed out while waiting to send a record to '{}'", streamNameMapper.map(record.destination())); + throw new DebeziumException(e); + } + } committer.markProcessed(record); } - catch (Exception e) { - throw new DebeziumException(e); - } + } + catch (ExecutionException e) { + throw new DebeziumException(e); } committer.markBatchFinished();