Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Create cluster if it doesn't exist during metadata initialisation #445

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -370,7 +371,11 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.build();

PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(service.getPulsar().getWebServiceAddress(),
service.getPulsar().getWebServiceAddressTls(),
service.getPulsar().getBrokerServiceUrl(),
service.getPulsar().getBrokerServiceUrlTls());
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);


this.groupCoordinator = GroupCoordinator.of(
Expand Down Expand Up @@ -403,7 +408,12 @@ public void initTransactionCoordinator() throws Exception {
.build();

PulsarAdmin pulsarAdmin = brokerService.getPulsar().getAdminClient();
MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(brokerService.getPulsar().getWebServiceAddress(),
brokerService.getPulsar().getWebServiceAddressTls(),
brokerService.getPulsar().getBrokerServiceUrl(),
brokerService.getPulsar().getBrokerServiceUrlTls());

MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);

this.transactionCoordinator = TransactionCoordinator.of(
transactionConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -56,16 +55,20 @@ public static String constructTxnLogTopicBaseName(KafkaServiceConfiguration conf
+ "/" + Topic.TRANSACTION_STATE_TOPIC_NAME;
}

public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructOffsetsTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
}

public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
}

/**
Expand All @@ -83,10 +86,11 @@ public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServ
* </ul>
*/
private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarServerException, PulsarAdminException {
ClusterData clusterData,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarAdminException {
String cluster = conf.getClusterName();
String kafkaMetadataTenant = conf.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace();
Expand All @@ -97,12 +101,21 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
try {
Clusters clusters = pulsarAdmin.clusters();
if (!clusters.getClusters().contains(cluster)) {
throw new PulsarServerException.NotFoundException("Configured cluster does not exist");
try {
pulsarAdmin.clusters().createCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Attempted to create cluster {} however it was created concurrently.", cluster);
} else {
// Re-throw all other exceptions
throw e;
}
}
} else {
ClusterData configuredClusterData = clusters.getCluster(cluster);
log.info("Cluster {} found: {}", cluster, configuredClusterData);
clusterExists = true;
}
clusterExists = true;

// Check if the metadata tenant exists and create it if not
Tenants tenants = pulsarAdmin.tenants();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.Test;
Expand All @@ -50,6 +51,7 @@ public class MetadataUtilsTest {
public void testCreateKafkaMetadataIfMissing() throws Exception {
KopTopic.initialize("public/default");
KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
ClusterData clusterData = new ClusterData();
conf.setClusterName("test");
conf.setKafkaMetadataTenant("public");
conf.setKafkaMetadataNamespace("default");
Expand Down Expand Up @@ -86,7 +88,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
TenantInfo partialTenant = new TenantInfo();
doReturn(partialTenant).when(mockTenants).getTenantInfo(eq(conf.getKafkaMetadataTenant()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

// After call the createOffsetMetadataIfMissing, these methods should return expected data.
doReturn(Lists.newArrayList(conf.getKafkaMetadataTenant())).when(mockTenants).getTenants();
Expand All @@ -95,7 +97,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(Lists.newArrayList(conf.getClusterName())).when(mockNamespaces)
.getNamespaceReplicationClusters(eq(namespace));

MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).createTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaMetadataTenant() + "/"
Expand Down Expand Up @@ -141,8 +143,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant()
+ "/" + conf.getKafkaMetadataNamespace()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -231,6 +232,8 @@ protected final void init() throws Exception {
String brokerServiceUrl = "pulsar://" + this.conf.getAdvertisedAddress() + ":" + brokerPort;
String brokerServiceUrlTls = null; // TLS not supported at this time

ClusterData clusterData = new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, null);

mockZooKeeper = createMockZooKeeper(configClusterName, serviceUrl, serviceUrlTls, brokerServiceUrl,
brokerServiceUrlTls);
mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
Expand All @@ -239,8 +242,8 @@ protected final void init() throws Exception {

createAdmin();

MetadataUtils.createOffsetMetadataIfMissing(admin, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, this.conf);
MetadataUtils.createOffsetMetadataIfMissing(admin, clusterData, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, clusterData, this.conf);

if (enableSchemaRegistry) {
admin.topics().createPartitionedTopic(KAFKASTORE_TOPIC, 1);
Expand Down