diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7ba487437e..70770c81a9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2932,7 +2932,7 @@ private void initPartitionsIfNeeded() { }); doInitialSeeks(partitions, beginnings, ends); if (this.consumerSeekAwareListener != null) { - this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream() + this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream() .map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp))) .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())), this.seekCallback); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 68a2324433..078adbf813 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -2784,7 +2784,21 @@ public void testInitialSeek() throws Exception { containerProps.setGroupId("grp"); containerProps.setAckMode(AckMode.RECORD); containerProps.setClientId("clientId"); - containerProps.setMessageListener((MessageListener) r -> { }); + + Map assigned = new HashMap<>(); + class Listener extends AbstractConsumerSeekAware implements MessageListener { + + @Override + public void onMessage(Object data) { + } + + @Override + public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { + assigned.putAll(assignments); + } + + } + containerProps.setMessageListener(new Listener()); containerProps.setMissingTopicsFatal(false); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); @@ -2802,6 +2816,7 @@ public void testInitialSeek() throws Exception { verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE); verify(consumer).seek(new TopicPartition("foo", 6), 42L); container.stop(); + assertThat(assigned).hasSize(8); } @SuppressWarnings({ "unchecked", "rawtypes" })