diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index 96955afdc860..8bab4fdcc7af 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -400,15 +400,13 @@ public boolean tryClaim(Map position) { this.restriction.fetchedRecords = fetchedRecords; LOG.debug("-------------- History: {}", this.restriction.history); - if (this.restriction.maxRecords == null && this.restriction.milisToRun == -1) { - return true; - } - // If we've reached the maximum number of records OR the maximum time, we reject // the attempt to claim. // If we've reached neither, then we continue approve the claim. return (this.restriction.maxRecords == null || fetchedRecords < this.restriction.maxRecords) - && (this.restriction.milisToRun == null || elapsedTime < this.restriction.milisToRun); + && (this.restriction.milisToRun == null + || this.restriction.milisToRun == -1 + || elapsedTime < this.restriction.milisToRun); } @Override