Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Apply getPartitionedTopicMetadata API with correct semantics #522

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,22 @@ CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeC
admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName())
.whenComplete((metadata, e) -> {
if (e != null) {
future.complete(new DescribeConfigsResponse.Config(
ApiError.fromThrowable(e), Collections.emptyList()));
if (e instanceof PulsarAdminException.NotFoundException) {
final ApiError error = new ApiError(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Topic " + kopTopic.getOriginalName() + " doesn't exist");
future.complete(new DescribeConfigsResponse.Config(
error, Collections.emptyList()));
} else {
future.complete(new DescribeConfigsResponse.Config(
ApiError.fromThrowable(e), Collections.emptyList()));
}
} else if (metadata.partitions > 0) {
future.complete(defaultTopicConfig);
} else {
final ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
"Topic " + kopTopic.getOriginalName() + " doesn't exist");
final ApiError error = new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
"Topic " + kopTopic.getOriginalName()
+ " is non-partitioned");
future.complete(new DescribeConfigsResponse.Config(
error, Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -498,32 +499,12 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
getPartitionedTopicMetadataAsync(fullTopicName)
.whenComplete((partitionedTopicMetadata, throwable) -> {
if (throwable != null) {
// Failed get partitions.
allTopicMetadata.add(
new TopicMetadata(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
topic,
false,
Collections.emptyList()));
log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}",
ctx.channel(), metadataHar.getHeader(),
fullTopicName, throwable.getMessage());
completeOneTopic.run();
} else {
if (partitionedTopicMetadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions",
topic, partitionedTopicMetadata.partitions);
}
addTopicPartition.accept(topic, partitionedTopicMetadata.partitions);
} else {
if (throwable instanceof PulsarAdminException.NotFoundException) {
if (kafkaConfig.isAllowAutoTopicCreation()
&& metadataRequest.allowAutoTopicCreation()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Topic {} has single partition, "
+ "auto create partitioned topic",
ctx.channel(), metadataHar.getHeader(), topic);
}
log.info("[{}] Request {}: Topic {} doesn't exist, auto create it with {} "
+ "partitions", ctx.channel(), metadataHar.getHeader(),
topic, defaultNumPartitions);
admin.topics().createPartitionedTopicAsync(fullTopicName, defaultNumPartitions)
.whenComplete((ignored, e) -> {
if (e == null) {
Expand All @@ -535,21 +516,47 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
}
});
} else {
// NOTE: Currently no matter topic is a non-partitioned topic or topic doesn't
// exist, the queried partitions from broker are both 0.
// See https://github.com/apache/pulsar/issues/8813 for details.
log.error("[{}] Request {}: Topic {} doesn't exist and it's not allowed to"
+ "auto create partitioned topic",
ctx.channel(), metadataHar.getHeader(), topic);
// not allow to auto create topic, return unknown topic
allTopicMetadata.add(
new TopicMetadata(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
topic,
false,
Collections.emptyList()));
new TopicMetadata(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
topic,
false,
Collections.emptyList()));
completeOneTopic.run();
}
} else {
// Failed get partitions.
allTopicMetadata.add(
new TopicMetadata(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
topic,
false,
Collections.emptyList()));
log.warn("[{}] Request {}: Failed to get partitioned pulsar topic {} metadata: {}",
ctx.channel(), metadataHar.getHeader(),
fullTopicName, throwable.getMessage());
completeOneTopic.run();
}
} else { // the topic already existed
if (partitionedTopicMetadata.partitions > 0) {
if (log.isDebugEnabled()) {
log.debug("Topic {} has {} partitions",
topic, partitionedTopicMetadata.partitions);
}
addTopicPartition.accept(topic, partitionedTopicMetadata.partitions);
} else {
log.error("Topic {} is a non-partitioned topic", topic);
allTopicMetadata.add(
new TopicMetadata(
Errors.INVALID_TOPIC_EXCEPTION,
topic,
false,
Collections.emptyList()));
completeOneTopic.run();
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.client.admin.Clusters;
Expand All @@ -27,8 +25,6 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -118,13 +114,13 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
if (!tenants.getTenants().contains(kafkaMetadataTenant)) {
log.info("Tenant: {} does not exist, creating it ...", kafkaMetadataTenant);
tenants.createTenant(kafkaMetadataTenant,
new TenantInfo(Sets.newHashSet(conf.getSuperUserRoles()), Sets.newHashSet(cluster)));
new TenantInfo(Sets.newHashSet(conf.getSuperUserRoles()), Sets.newHashSet(cluster)));
} else {
TenantInfo kafkaMetadataTenantInfo = tenants.getTenantInfo(kafkaMetadataTenant);
Set<String> allowedClusters = kafkaMetadataTenantInfo.getAllowedClusters();
if (!allowedClusters.contains(cluster)) {
log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...",
kafkaMetadataTenant, cluster);
kafkaMetadataTenant, cluster);
allowedClusters.add(cluster);
tenants.updateTenant(kafkaMetadataTenant, kafkaMetadataTenantInfo);
}
Expand All @@ -135,15 +131,15 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
Namespaces namespaces = pulsarAdmin.namespaces();
if (!namespaces.getNamespaces(kafkaMetadataTenant).contains(kafkaMetadataNamespace)) {
log.info("Namespaces: {} does not exist in tenant: {}, creating it ...",
kafkaMetadataNamespace, kafkaMetadataTenant);
kafkaMetadataNamespace, kafkaMetadataTenant);
Set<String> replicationClusters = Sets.newHashSet(cluster);
namespaces.createNamespace(kafkaMetadataNamespace, replicationClusters);
namespaces.setNamespaceReplicationClusters(kafkaMetadataNamespace, replicationClusters);
} else {
List<String> replicationClusters = namespaces.getNamespaceReplicationClusters(kafkaMetadataNamespace);
if (!replicationClusters.contains(cluster)) {
log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,"
+ "updating it ...", kafkaMetadataNamespace, cluster);
+ "updating it ...", kafkaMetadataNamespace, cluster);
Set<String> newReplicationClusters = Sets.newHashSet(replicationClusters);
newReplicationClusters.add(cluster);
namespaces.setNamespaceReplicationClusters(kafkaMetadataNamespace, newReplicationClusters);
Expand Down Expand Up @@ -171,40 +167,7 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
namespaceExists = true;

// Check if the offsets topic exists and create it if not
Topics topics = pulsarAdmin.topics();
PartitionedTopicMetadata topicMetadata =
topics.getPartitionedTopicMetadata(kopTopic.getFullName());

Set<String> partitionSet = new HashSet<>(partitionNum);
for (int i = 0; i < partitionNum; i++) {
partitionSet.add(kopTopic.getPartitionName(i));
}

if (topicMetadata.partitions <= 0) {
log.info("Kafka group metadata topic {} doesn't exist. Creating it ...", kopTopic.getFullName());

topics.createPartitionedTopic(
kopTopic.getFullName(),
partitionNum
);

log.info("Successfully created kop metadata topic {} with {} partitions.",
kopTopic.getFullName(), partitionNum);
} else {
// Check to see if the partitions all exist
partitionSet.removeAll(
topics.getList(kafkaMetadataNamespace).stream()
.filter((topic) -> topic.startsWith(kopTopic.getFullName()))
.collect(Collectors.toList())
);

if (!partitionSet.isEmpty()) {
log.info("Identified missing kop metadata topic {} partitions: {}", kopTopic, partitionSet);
for (String offsetPartition : partitionSet) {
topics.createNonPartitionedTopic(offsetPartition);
}
}
}
createTopicIfNotExist(pulsarAdmin, kopTopic.getFullName(), partitionNum);
offsetsTopicExists = true;
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
Expand All @@ -222,4 +185,19 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
kopTopic.getOriginalName(), offsetsTopicExists);
}
}

private static void createTopicIfNotExist(final PulsarAdmin admin,
final String topic,
final int numPartitions) throws PulsarAdminException {
try {
admin.topics().createPartitionedTopic(topic, numPartitions);
} catch (PulsarAdminException.ConflictException e) {
log.info("Resources concurrent creating: {}", e.getMessage());
}
try {
// Ensure all partitions are created
admin.topics().createMissedPartitions(topic);
} catch (PulsarAdminException ignored) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant()
+ "/" + conf.getKafkaMetadataNamespace()), any(Set.class));
verify(mockTopics, times(2)).createNonPartitionedTopic(contains(offsetsTopic.getOriginalName()));
verify(mockTopics, times(2)).createNonPartitionedTopic(contains(txnTopic.getOriginalName()));
verify(mockTopics, times(1)).createMissedPartitions(contains(offsetsTopic.getOriginalName()));
verify(mockTopics, times(1)).createMissedPartitions(contains(txnTopic.getOriginalName()));
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<lombok.version>1.18.4</lombok.version>
<mockito.version>2.22.0</mockito.version>
<pulsar.group.id>io.streamnative</pulsar.group.id>
<pulsar.version>2.8.0-rc-202105092228</pulsar.version>
<pulsar.version>2.8.0-rc-202105182205</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
<testcontainers.version>1.15.1</testcontainers.version>
Expand Down
Loading