diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 6e4af0e030..62bfa7f543 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.util.FutureUtil; /** @@ -370,7 +371,11 @@ public void initGroupCoordinator(BrokerService service) throws Exception { .build(); PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient(); - MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, kafkaConfig); + ClusterData clusterData = new ClusterData(service.getPulsar().getWebServiceAddress(), + service.getPulsar().getWebServiceAddressTls(), + service.getPulsar().getBrokerServiceUrl(), + service.getPulsar().getBrokerServiceUrlTls()); + MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig); this.groupCoordinator = GroupCoordinator.of( @@ -403,7 +408,12 @@ public void initTransactionCoordinator() throws Exception { .build(); PulsarAdmin pulsarAdmin = brokerService.getPulsar().getAdminClient(); - MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, kafkaConfig); + ClusterData clusterData = new ClusterData(brokerService.getPulsar().getWebServiceAddress(), + brokerService.getPulsar().getWebServiceAddressTls(), + brokerService.getPulsar().getBrokerServiceUrl(), + brokerService.getPulsar().getBrokerServiceUrlTls()); + + MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig); this.transactionCoordinator = TransactionCoordinator.of( transactionConfig, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java index be029cebab..352134d584 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java @@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.internals.Topic; -import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -56,16 +55,20 @@ public static String constructTxnLogTopicBaseName(KafkaServiceConfiguration conf + "/" + Topic.TRANSACTION_STATE_TOPIC_NAME; } - public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf) - throws PulsarServerException, PulsarAdminException { + public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, + ClusterData clusterData, + KafkaServiceConfiguration conf) + throws PulsarAdminException { KopTopic kopTopic = new KopTopic(constructOffsetsTopicBaseName(conf)); - createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getOffsetsTopicNumPartitions()); + createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getOffsetsTopicNumPartitions()); } - public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf) - throws PulsarServerException, PulsarAdminException { + public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, + ClusterData clusterData, + KafkaServiceConfiguration conf) + throws PulsarAdminException { KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(conf)); - createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getTxnLogTopicNumPartitions()); + createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getTxnLogTopicNumPartitions()); } /** @@ -83,10 +86,11 @@ public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServ * */ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin, - KafkaServiceConfiguration conf, - KopTopic kopTopic, - int partitionNum) - throws PulsarServerException, PulsarAdminException { + ClusterData clusterData, + KafkaServiceConfiguration conf, + KopTopic kopTopic, + int partitionNum) + throws PulsarAdminException { String cluster = conf.getClusterName(); String kafkaMetadataTenant = conf.getKafkaMetadataTenant(); String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace(); @@ -97,12 +101,21 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin, try { Clusters clusters = pulsarAdmin.clusters(); if (!clusters.getClusters().contains(cluster)) { - throw new PulsarServerException.NotFoundException("Configured cluster does not exist"); + try { + pulsarAdmin.clusters().createCluster(cluster, clusterData); + } catch (PulsarAdminException e) { + if (e instanceof ConflictException) { + log.info("Attempted to create cluster {} however it was created concurrently.", cluster); + } else { + // Re-throw all other exceptions + throw e; + } + } } else { ClusterData configuredClusterData = clusters.getCluster(cluster); log.info("Cluster {} found: {}", cluster, configuredClusterData); - clusterExists = true; } + clusterExists = true; // Check if the metadata tenant exists and create it if not Tenants tenants = pulsarAdmin.tenants(); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java index 9523a2a651..d63f4fe2f0 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.Test; @@ -50,6 +51,7 @@ public class MetadataUtilsTest { public void testCreateKafkaMetadataIfMissing() throws Exception { KopTopic.initialize("public/default"); KafkaServiceConfiguration conf = new KafkaServiceConfiguration(); + ClusterData clusterData = new ClusterData(); conf.setClusterName("test"); conf.setKafkaMetadataTenant("public"); conf.setKafkaMetadataNamespace("default"); @@ -86,7 +88,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { TenantInfo partialTenant = new TenantInfo(); doReturn(partialTenant).when(mockTenants).getTenantInfo(eq(conf.getKafkaMetadataTenant())); - MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf); + MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf); // After call the createOffsetMetadataIfMissing, these methods should return expected data. doReturn(Lists.newArrayList(conf.getKafkaMetadataTenant())).when(mockTenants).getTenants(); @@ -95,7 +97,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { doReturn(Lists.newArrayList(conf.getClusterName())).when(mockNamespaces) .getNamespaceReplicationClusters(eq(namespace)); - MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf); + MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf); verify(mockTenants, times(1)).createTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class)); verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaMetadataTenant() + "/" @@ -141,8 +143,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace())); - MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf); - MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf); + MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf); + MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf); verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class)); verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant() diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index fe8c581051..66ef4d20a6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -71,6 +71,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; @@ -231,6 +232,8 @@ protected final void init() throws Exception { String brokerServiceUrl = "pulsar://" + this.conf.getAdvertisedAddress() + ":" + brokerPort; String brokerServiceUrlTls = null; // TLS not supported at this time + ClusterData clusterData = new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, null); + mockZooKeeper = createMockZooKeeper(configClusterName, serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls); mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor); @@ -239,8 +242,8 @@ protected final void init() throws Exception { createAdmin(); - MetadataUtils.createOffsetMetadataIfMissing(admin, this.conf); - MetadataUtils.createTxnMetadataIfMissing(admin, this.conf); + MetadataUtils.createOffsetMetadataIfMissing(admin, clusterData, this.conf); + MetadataUtils.createTxnMetadataIfMissing(admin, clusterData, this.conf); if (enableSchemaRegistry) { admin.topics().createPartitionedTopic(KAFKASTORE_TOPIC, 1);