diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index add76c9682a0..4d7aa6b32aef 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -21,11 +21,9 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -463,7 +461,8 @@ public ProcessContinuation processElement( // and move to process the next element. if (rawRecords.isEmpty()) { if (!topicPartitionExists( - kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) { + kafkaSourceDescriptor.getTopicPartition(), + consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) { return ProcessContinuation.stop(); } if (timestampPolicy != null) { @@ -557,20 +556,10 @@ public ProcessContinuation processElement( } private boolean topicPartitionExists( - TopicPartition topicPartition, Map> topicListMap) { + TopicPartition topicPartition, List partitionInfos) { // Check if the current TopicPartition still exists. - Set existingTopicPartitions = new HashSet<>(); - for (List topicPartitionList : topicListMap.values()) { - topicPartitionList.forEach( - partitionInfo -> { - existingTopicPartitions.add( - new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - }); - } - if (!existingTopicPartitions.contains(topicPartition)) { - return false; - } - return true; + return partitionInfos.stream() + .anyMatch(partitionInfo -> partitionInfo.partition() == (topicPartition.partition())); } // see https://github.com/apache/beam/issues/25962 diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 52c141685760..cbff0f896619 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -252,6 +252,17 @@ public synchronized Map> listTopics() { topicPartition.topic(), topicPartition.partition(), null, null, null))); } + @Override + public synchronized List partitionsFor(String partition) { + if (this.isRemoved) { + return ImmutableList.of(); + } else { + return ImmutableList.of( + new PartitionInfo( + topicPartition.topic(), topicPartition.partition(), null, null, null)); + } + } + @Override public synchronized void assign(Collection partitions) { assertTrue(Iterables.getOnlyElement(partitions).equals(this.topicPartition));