Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix topic creation may not complete after METADATA response is sent
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed May 16, 2021
1 parent 74a9590 commit 11c2935
Showing 1 changed file with 36 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,38 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
getAllTopicsAsync(pulsarTopicsFuture);
} else {
// get only the provided topics
Map<String, List<TopicName>> pulsarTopics = Maps.newHashMap();
final Map<String, List<TopicName>> pulsarTopics = new ConcurrentHashMap<>();

List<String> requestTopics = metadataRequest.topics();
final int topicsNumber = requestTopics.size();
AtomicInteger topicsCompleted = new AtomicInteger(0);

final Runnable completeOneTopic = () -> {
if (topicsCompleted.incrementAndGet() == topicsNumber) {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Completed get {} topic's partitions",
ctx.channel(), metadataHar.getHeader(), topicsNumber);
}
pulsarTopicsFuture.complete(pulsarTopics);
}
};

final BiConsumer<String, Integer> addTopicPartition = (topic, partition) -> {
final KopTopic kopTopic = new KopTopic(topic);
pulsarTopics.putIfAbsent(topic,
IntStream.range(0, partition)
.mapToObj(i -> TopicName.get(kopTopic.getPartitionName(i)))
.collect(Collectors.toList()));
completeOneTopic.run();
};

requestTopics.stream()
.forEach(topic -> {
KopTopic kopTopic = new KopTopic(topic);
final String fullTopicName = new KopTopic(topic).getFullName();

// get partition numbers for each topic.
// If topic doesn't exist and allowAutoTopicCreation is enabled, the topic will be created first.
getPartitionedTopicMetadataAsync(kopTopic.getFullName())
getPartitionedTopicMetadataAsync(fullTopicName)
.whenComplete((partitionedTopicMetadata, throwable) -> {
if (throwable != null) {
// Failed get partitions.
Expand All @@ -489,19 +508,15 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
Collections.emptyList()));
log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}",
ctx.channel(), metadataHar.getHeader(),
kopTopic.getFullName(), throwable.getMessage());
fullTopicName, throwable.getMessage());
completeOneTopic.run();
} else {
List<TopicName> pulsarTopicNames;
if (partitionedTopicMetadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions",
topic, partitionedTopicMetadata.partitions);
}
pulsarTopicNames = IntStream
.range(0, partitionedTopicMetadata.partitions)
.mapToObj(i -> TopicName.get(kopTopic.getPartitionName(i)))
.collect(Collectors.toList());
pulsarTopics.put(topic, pulsarTopicNames);
addTopicPartition.accept(topic, partitionedTopicMetadata.partitions);
} else {
if (kafkaConfig.isAllowAutoTopicCreation()
&& metadataRequest.allowAutoTopicCreation()) {
Expand All @@ -510,14 +525,16 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
+ "auto create partitioned topic",
ctx.channel(), metadataHar.getHeader(), topic);
}
admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(),
defaultNumPartitions);
pulsarTopicNames = IntStream
.range(0, defaultNumPartitions)
.mapToObj(i -> TopicName.get(kopTopic.getPartitionName(i)))
.collect(Collectors.toList());
pulsarTopics.put(topic, pulsarTopicNames);

admin.topics().createPartitionedTopicAsync(fullTopicName, defaultNumPartitions)
.whenComplete((ignored, e) -> {
if (e == null) {
addTopicPartition.accept(topic, defaultNumPartitions);
} else {
log.error("[{}] Failed to create partitioned topic {}",
ctx.channel(), topic, e);
completeOneTopic.run();
}
});
} else {
// NOTE: Currently no matter topic is a non-partitioned topic or topic doesn't
// exist, the queried partitions from broker are both 0.
Expand All @@ -532,19 +549,10 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
topic,
false,
Collections.emptyList()));
completeOneTopic.run();
}
}
}

// whether handled all topics get partitions
int completedTopics = topicsCompleted.incrementAndGet();
if (completedTopics == topicsNumber) {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Completed get {} topic's partitions",
ctx.channel(), metadataHar.getHeader(), topicsNumber);
}
pulsarTopicsFuture.complete(pulsarTopics);
}
});
});
}
Expand Down

0 comments on commit 11c2935

Please sign in to comment.