Skip to content

Commit

Permalink
DBZ-8251 Sent records without waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane authored and vjuranek committed Sep 23, 2024
1 parent 0190e66 commit 54709ea
Showing 1 changed file with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
package io.debezium.server.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -88,13 +91,16 @@ void stop() {
public void handleBatch(final List<ChangeEvent<Object, Object>> records,
final RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {

final List<Future<RecordMetadata>> deliveryFutures = new ArrayList<>(records.size());

for (ChangeEvent<Object, Object> record : records) {
try {
LOGGER.trace("Received event '{}'", record);
Headers headers = convertKafkaHeaders(record);

String topicName = streamNameMapper.map(record.destination());
Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers),
deliveryFutures.add(producer.send(new ProducerRecord<>(topicName, null, null, record.key(), record.value(), headers),
(metadata, exception) -> {
if (exception != null) {
LOGGER.error("Failed to send record to {}:", topicName, exception);
Expand All @@ -103,18 +109,36 @@ public void handleBatch(final List<ChangeEvent<Object, Object>> records,
else {
LOGGER.trace("Sent message with offset: {}", metadata.offset());
}
});
}));
}
catch (Exception e) {
throw new DebeziumException(e);
}
}

try {
for (int i = 0; i < records.size(); i++) {
final var recordMetadataFuture = deliveryFutures.get(i);
final var record = records.get(i);

if (waitMessageDeliveryTimeout == 0) {
recordMetadataFuture.get();
}
else {
recordMetadataFuture.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS);
try {
recordMetadataFuture.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
LOGGER.error("Timed out while waiting to send a record to '{}'", streamNameMapper.map(record.destination()));
throw new DebeziumException(e);
}

}
committer.markProcessed(record);
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
catch (ExecutionException e) {
throw new DebeziumException(e);
}

committer.markBatchFinished();
Expand Down

0 comments on commit 54709ea

Please sign in to comment.