Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Create cluster if it doesn't exist during metadata initialisation
Browse files Browse the repository at this point in the history
This case is usually encountered when attempting to use KoP with
standalone mode. When new behavior was introduced in #168 standalone
mode was broken due to sample namespace setup etc not taking place until
after the broker is initalised (which in turn initialises protocol
handlers).

Fixes #185
  • Loading branch information
josephglanville committed Apr 18, 2021
1 parent c4d1892 commit a9356b3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -370,7 +371,11 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.build();

PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(service.getPulsar().getWebServiceAddress(),
service.getPulsar().getWebServiceAddressTls(),
service.getPulsar().getBrokerServiceUrl(),
service.getPulsar().getBrokerServiceUrlTls());
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);


this.groupCoordinator = GroupCoordinator.of(
Expand Down Expand Up @@ -403,7 +408,12 @@ public void initTransactionCoordinator() throws Exception {
.build();

PulsarAdmin pulsarAdmin = brokerService.getPulsar().getAdminClient();
MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, kafkaConfig);
ClusterData clusterData = new ClusterData(brokerService.getPulsar().getWebServiceAddress(),
brokerService.getPulsar().getWebServiceAddressTls(),
brokerService.getPulsar().getBrokerServiceUrl(),
brokerService.getPulsar().getBrokerServiceUrlTls());

MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);

this.transactionCoordinator = TransactionCoordinator.of(
transactionConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -56,16 +55,20 @@ public static String constructTxnLogTopicBaseName(KafkaServiceConfiguration conf
+ "/" + Topic.TRANSACTION_STATE_TOPIC_NAME;
}

public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createOffsetMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructOffsetsTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getOffsetsTopicNumPartitions());
}

public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(conf));
createKafkaMetadataIfMissing(pulsarAdmin, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
createKafkaMetadataIfMissing(pulsarAdmin, clusterData, conf, kopTopic, conf.getTxnLogTopicNumPartitions());
}

/**
Expand All @@ -83,10 +86,11 @@ public static void createTxnMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServ
* </ul>
*/
private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarServerException, PulsarAdminException {
ClusterData clusterData,
KafkaServiceConfiguration conf,
KopTopic kopTopic,
int partitionNum)
throws PulsarAdminException {
String cluster = conf.getClusterName();
String kafkaMetadataTenant = conf.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace();
Expand All @@ -97,12 +101,21 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
try {
Clusters clusters = pulsarAdmin.clusters();
if (!clusters.getClusters().contains(cluster)) {
throw new PulsarServerException.NotFoundException("Configured cluster does not exist");
try {
pulsarAdmin.clusters().createCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Attempted to create cluster {} however it was created concurrently.", cluster);
} else {
// Re-throw all other exceptions
throw e;
}
}
} else {
ClusterData configuredClusterData = clusters.getCluster(cluster);
log.info("Cluster {} found: {}", cluster, configuredClusterData);
clusterExists = true;
}
clusterExists = true;

// Check if the metadata tenant exists and create it if not
Tenants tenants = pulsarAdmin.tenants();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.Test;
Expand All @@ -50,6 +51,7 @@ public class MetadataUtilsTest {
public void testCreateKafkaMetadataIfMissing() throws Exception {
KopTopic.initialize("public/default");
KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
ClusterData clusterData = new ClusterData();
conf.setClusterName("test");
conf.setKafkaMetadataTenant("public");
conf.setKafkaMetadataNamespace("default");
Expand Down Expand Up @@ -86,7 +88,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
TenantInfo partialTenant = new TenantInfo();
doReturn(partialTenant).when(mockTenants).getTenantInfo(eq(conf.getKafkaMetadataTenant()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

// After call the createOffsetMetadataIfMissing, these methods should return expected data.
doReturn(Lists.newArrayList(conf.getKafkaMetadataTenant())).when(mockTenants).getTenants();
Expand All @@ -95,7 +97,7 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(Lists.newArrayList(conf.getClusterName())).when(mockNamespaces)
.getNamespaceReplicationClusters(eq(namespace));

MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).createTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaMetadataTenant() + "/"
Expand Down Expand Up @@ -141,8 +143,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant()
+ "/" + conf.getKafkaMetadataNamespace()));

MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, conf);
MetadataUtils.createOffsetMetadataIfMissing(mockPulsarAdmin, clusterData, conf);
MetadataUtils.createTxnMetadataIfMissing(mockPulsarAdmin, clusterData, conf);

verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class));
verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -231,6 +232,8 @@ protected final void init() throws Exception {
String brokerServiceUrl = "pulsar://" + this.conf.getAdvertisedAddress() + ":" + brokerPort;
String brokerServiceUrlTls = null; // TLS not supported at this time

ClusterData clusterData = new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, null);

mockZooKeeper = createMockZooKeeper(configClusterName, serviceUrl, serviceUrlTls, brokerServiceUrl,
brokerServiceUrlTls);
mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
Expand All @@ -239,8 +242,8 @@ protected final void init() throws Exception {

createAdmin();

MetadataUtils.createOffsetMetadataIfMissing(admin, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, this.conf);
MetadataUtils.createOffsetMetadataIfMissing(admin, clusterData, this.conf);
MetadataUtils.createTxnMetadataIfMissing(admin, clusterData, this.conf);

if (enableSchemaRegistry) {
admin.topics().createPartitionedTopic(KAFKASTORE_TOPIC, 1);
Expand Down

0 comments on commit a9356b3

Please sign in to comment.