diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index 33d786a9..7dd1a67d 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -265,8 +265,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - - committer.markProcessed(record); } List messageIds; try { @@ -276,6 +274,12 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + // Once publishing is confirmed, mark all records as processed + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); } diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 6a07a23c..86798bdf 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -132,7 +132,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - committer.markProcessed(record); } List messageIds; try { @@ -142,6 +141,12 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + // Once publishing is confirmed, mark all records as processed + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); }