Skip to content

Commit

Permalink
GH-2178: Fix CSA.onPartitionsAssigned (Manual)
Browse files Browse the repository at this point in the history
Resolves #2178

The wrong collection was used as the source for the `assignments` map,

`SeekPosition.BEGINNING` and `.END` entries are removed; use the
`definedPartitions` field instead.

**cherry-pick to 2.8.x, 2.7.x**
  • Loading branch information
garyrussell authored and artembilan committed Mar 21, 2022
1 parent b27a0d6 commit 243e68c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2932,7 +2932,7 @@ private void initPartitionsIfNeeded() {
});
doInitialSeeks(partitions, beginnings, ends);
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
this.seekCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2784,7 +2784,21 @@ public void testInitialSeek() throws Exception {
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setMessageListener((MessageListener) r -> { });

Map<TopicPartition, Long> assigned = new HashMap<>();
class Listener extends AbstractConsumerSeekAware implements MessageListener {

@Override
public void onMessage(Object data) {
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assigned.putAll(assignments);
}

}
containerProps.setMessageListener(new Listener());
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand All @@ -2802,6 +2816,7 @@ public void testInitialSeek() throws Exception {
verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE);
verify(consumer).seek(new TopicPartition("foo", 6), 42L);
container.stop();
assertThat(assigned).hasSize(8);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down

0 comments on commit 243e68c

Please sign in to comment.