This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Added a new MetadataUtils with an implementation of createKafkaMetadataIfMissing… (#168)

* Added a new MetadataUtils with an implementation of createKafkaMetadataIfMissing that does not overwrite the cluster metadata. Modified several use cases to support the change.
Logic for the createKafkaMetadataIfMissing implementation is as follows (an illustrative usage sketch follows this list):
 - If the tenant does not exist it will be created
 - If the tenant exists but the allowedClusters list does not include the cluster this method will add the cluster to the allowedClusters list
 - If the namespace does not exist it will be created
 - If the namespace exists but the replicationClusters list does not include the cluster this method will add the cluster to the replicationClusters list
 - If the offset topic does not exist it will be created
 - If the offset topic exists but some partitions are missing, the missing partitions will be created
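For illustration only (not part of this commit's diff): a minimal sketch of how a broker-side component is expected to invoke the new utility during startup. The names service and kafkaConfig mirror the handler change shown further down in this diff.

    // Sketch: assumes service is the BrokerService passed to initGroupCoordinator
    // and kafkaConfig is the handler's KafkaServiceConfiguration.
    PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
    // Idempotent: creates only the missing tenant/namespace/offsets-topic pieces
    // and never overwrites existing cluster metadata.
    MetadataUtils.createKafkaMetadataIfMissing(pulsarAdmin, kafkaConfig);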

* Fixing checkstyle errors

Co-authored-by: William Mccarley <[email protected]>
wmccarley and William Mccarley authored Aug 18, 2020
1 parent af7ccf8 commit b6ed767
Showing 8 changed files with 397 additions and 138 deletions.
@@ -18,44 +18,39 @@
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;

/**
@@ -313,9 +308,9 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
.build();

createKafkaMetadataNamespaceIfNeeded(service);
// topicName in pulsar format: tenant/ns/topic
createKafkaOffsetsTopic(service);
PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
MetadataUtils.createKafkaMetadataIfMissing(pulsarAdmin, kafkaConfig);


this.groupCoordinator = GroupCoordinator.of(
(PulsarClientImpl) (service.pulsar().getClient()),
@@ -338,98 +333,37 @@ public void startGroupCoordinator() throws Exception {
}
}

private void createKafkaMetadataNamespaceIfNeeded(BrokerService service)
throws PulsarServerException, PulsarAdminException {
String cluster = kafkaConfig.getClusterName();
String kafkaMetadataTenant = kafkaConfig.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + kafkaConfig.getKafkaMetadataNamespace();
PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();

try {
ClusterData clusterData = new ClusterData(service.pulsar().getWebServiceAddress(),
null /* serviceUrlTls */,
service.pulsar().getBrokerServiceUrl(),
null /* brokerServiceUrlTls */);
if (!pulsarAdmin.clusters().getClusters().contains(cluster)) {
pulsarAdmin.clusters().createCluster(cluster, clusterData);
} else {
pulsarAdmin.clusters().updateCluster(cluster, clusterData);
}

if (!pulsarAdmin.tenants().getTenants().contains(kafkaMetadataTenant)) {
pulsarAdmin.tenants().createTenant(kafkaMetadataTenant,
new TenantInfo(Sets.newHashSet(kafkaConfig.getSuperUserRoles()), Sets.newHashSet(cluster)));
}
if (!pulsarAdmin.namespaces().getNamespaces(kafkaMetadataTenant).contains(kafkaMetadataNamespace)) {
Set<String> clusters = Sets.newHashSet(kafkaConfig.getClusterName());
pulsarAdmin.namespaces().createNamespace(kafkaMetadataNamespace, clusters);
pulsarAdmin.namespaces().setNamespaceReplicationClusters(kafkaMetadataNamespace, clusters);
pulsarAdmin.namespaces().setRetention(kafkaMetadataNamespace,
new RetentionPolicies(-1, -1));
}
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Resources concurrent creating and cause e: ", e);
return;
}

log.error("Failed to get retention policy for kafka metadata namespace {}",
kafkaMetadataNamespace, e);
throw e;
}
}

private String createKafkaOffsetsTopic(BrokerService service) throws PulsarServerException, PulsarAdminException {
String offsetsTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + kafkaConfig.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME;
/**
* This method discovers ownership of offset topic partitions and attempts to load offset topics
* assigned to this broker.
*/
private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
Lookup lookupService = brokerService.pulsar().getAdminClient().lookups();
String currentBroker = brokerService.pulsar().getBrokerServiceUrl();
String topicBase = MetadataUtils.constructOffsetsTopicBaseName(kafkaConfig);
int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();

PartitionedTopicMetadata offsetsTopicMetadata =
service.pulsar().getAdminClient().topics().getPartitionedTopicMetadata(offsetsTopic);
if (offsetsTopicMetadata.partitions <= 0) {
log.info("Kafka group metadata topic {} doesn't exist. Creating it ...",
offsetsTopic);
try {
service.pulsar().getAdminClient().topics().createPartitionedTopic(
offsetsTopic,
kafkaConfig.getOffsetsTopicNumPartitions()
);

for (int i = 0; i < kafkaConfig.getOffsetsTopicNumPartitions(); i++) {
service.pulsar().getAdminClient().topics()
.createNonPartitionedTopic(offsetsTopic + PARTITIONED_TOPIC_SUFFIX + i);
}
} catch (ConflictException e) {
log.info("Topic {} concurrent creating and cause e: ", offsetsTopic, e);
return offsetsTopic;
}
Map<String, List<Integer>> mapBrokerToPartition = new HashMap<>();

log.info("Successfully created group metadata topic {} with {} partitions.",
offsetsTopic, kafkaConfig.getOffsetsTopicNumPartitions());
for (int i = 0; i < numPartitions; i++) {
String broker = lookupService.lookupTopic(topicBase + PARTITIONED_TOPIC_SUFFIX + i);
mapBrokerToPartition.putIfAbsent(broker, new ArrayList());
mapBrokerToPartition.get(broker).add(i);
}

return offsetsTopic;
}
mapBrokerToPartition.entrySet().stream().forEach(
e -> log.info("Discovered broker: {} owns offset topic partitions: {} ", e.getKey(), e.getValue()));

private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
String offsetsTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + kafkaConfig.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME;
int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();
List<CompletableFuture<Void>> lists = Lists.newArrayListWithExpectedSize(numPartitions);
for (int i = 0; i < numPartitions; i++) {
String partition = offsetsTopic + PARTITIONED_TOPIC_SUFFIX + i;
String broker = brokerService.pulsar().getAdminClient().lookups()
.lookupTopic(partition);
List<Integer> partitionsOwnedByCurrentBroker = mapBrokerToPartition.get(currentBroker);

if (log.isDebugEnabled()) {
log.debug("found broker {} for offset topic partition {}. current broker: {}",
broker, partition, brokerService.pulsar().getBrokerServiceUrl());
}
if (null != partitionsOwnedByCurrentBroker && !partitionsOwnedByCurrentBroker.isEmpty()) {
List<CompletableFuture<Void>> lists = partitionsOwnedByCurrentBroker.stream().map(
(ii) -> groupCoordinator.handleGroupImmigration(ii)).collect(Collectors.toList());

if (broker.equalsIgnoreCase(brokerService.pulsar().getBrokerServiceUrl())) {
lists.add(groupCoordinator.handleGroupImmigration(i));
}
FutureUtil.waitForAll(lists).get();
} else {
log.info("Current broker: {} does not own any of the offset topic partitions", currentBroker);
}
FutureUtil.waitForAll(lists).get();
}

public static int getListenerPort(String listener) {
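The hunk above interleaves the removed createKafkaOffsetsTopic with the new loadOffsetTopics, which makes the new method hard to read in place. Below is a consolidated sketch of the new partition-ownership logic, reassembled from the added lines above; treat it as illustrative rather than an exact copy of the committed file.

private void loadOffsetTopics(GroupCoordinator groupCoordinator) throws Exception {
    Lookup lookupService = brokerService.pulsar().getAdminClient().lookups();
    String currentBroker = brokerService.pulsar().getBrokerServiceUrl();
    String topicBase = MetadataUtils.constructOffsetsTopicBaseName(kafkaConfig);
    int numPartitions = kafkaConfig.getOffsetsTopicNumPartitions();

    // Look up which broker owns each offset topic partition.
    Map<String, List<Integer>> mapBrokerToPartition = new HashMap<>();
    for (int i = 0; i < numPartitions; i++) {
        String broker = lookupService.lookupTopic(topicBase + PARTITIONED_TOPIC_SUFFIX + i);
        mapBrokerToPartition.putIfAbsent(broker, new ArrayList<>());
        mapBrokerToPartition.get(broker).add(i);
    }
    mapBrokerToPartition.forEach((broker, partitions) ->
        log.info("Discovered broker: {} owns offset topic partitions: {}", broker, partitions));

    // Only load the partitions owned by the current broker into the group coordinator.
    List<Integer> partitionsOwnedByCurrentBroker = mapBrokerToPartition.get(currentBroker);
    if (partitionsOwnedByCurrentBroker != null && !partitionsOwnedByCurrentBroker.isEmpty()) {
        List<CompletableFuture<Void>> futures = partitionsOwnedByCurrentBroker.stream()
            .map(groupCoordinator::handleGroupImmigration)
            .collect(Collectors.toList());
        FutureUtil.waitForAll(futures).get();
    } else {
        log.info("Current broker: {} does not own any of the offset topic partitions", currentBroker);
    }
}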
@@ -0,0 +1,183 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.utils;

import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;

import com.google.common.collect.Sets;

import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.common.internals.Topic;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* Utils for KoP Metadata.
*/
@Slf4j
public class MetadataUtils {

public static String constructOffsetsTopicBaseName(KafkaServiceConfiguration conf) {
return conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME;
}

/**
* This method creates the Kafka metadata tenant and namespace if they are not currently present.
* <ul>
* <li>If the cluster does not exist this method will throw a PulsarServerException.NotFoundException</li>
* <li>If the tenant does not exist it will be created</li>
* <li>If the tenant exists but the allowedClusters list does not include the cluster this method will
* add the cluster to the allowedClusters list</li>
* <li>If the namespace does not exist it will be created</li>
* <li>If the namespace exists but the replicationClusters list does not include the cluster this method
* will add the cluster to the replicationClusters list</li>
* <li>If the offset topic does not exist it will be created</li>
* <li>If the offset topic exists but some partitions are missing, the missing partitions will be created</li>
* </ul>
*/
public static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin, KafkaServiceConfiguration conf)
throws PulsarServerException, PulsarAdminException {
String cluster = conf.getClusterName();
String kafkaMetadataTenant = conf.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace();

String offsetsTopic = constructOffsetsTopicBaseName(conf);

boolean clusterExists, tenantExists, namespaceExists, offsetsTopicExists;
clusterExists = tenantExists = namespaceExists = offsetsTopicExists = false;

try {
Clusters clusters = pulsarAdmin.clusters();
if (!clusters.getClusters().contains(cluster)) {
throw new PulsarServerException.NotFoundException("Configured cluster does not exist");
} else {
ClusterData configuredClusterData = clusters.getCluster(cluster);
log.info("Cluster {} found: {}", cluster, configuredClusterData);
clusterExists = true;
}

// Check if the metadata tenant exists and create it if not
Tenants tenants = pulsarAdmin.tenants();
if (!tenants.getTenants().contains(kafkaMetadataTenant)) {
log.info("Tenant: {} does not exist, creating it ...", kafkaMetadataTenant);
tenants.createTenant(kafkaMetadataTenant,
new TenantInfo(Sets.newHashSet(conf.getSuperUserRoles()), Sets.newHashSet(cluster)));
} else {
TenantInfo kafkaMetadataTenantInfo = tenants.getTenantInfo(kafkaMetadataTenant);
Set<String> allowedClusters = kafkaMetadataTenantInfo.getAllowedClusters();
if (!allowedClusters.contains(cluster)) {
log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...",
kafkaMetadataTenant, cluster);
allowedClusters.add(cluster);
tenants.updateTenant(kafkaMetadataTenant, kafkaMetadataTenantInfo);
}
}
tenantExists = true;

// Check if the metadata namespace exists and create it if not
Namespaces namespaces = pulsarAdmin.namespaces();
if (!namespaces.getNamespaces(kafkaMetadataTenant).contains(kafkaMetadataNamespace)) {
log.info("Namespaces: {} does not exist in tenant: {}, creating it ...",
kafkaMetadataNamespace, kafkaMetadataTenant);
Set<String> replicationClusters = Sets.newHashSet(conf.getClusterName());
namespaces.createNamespace(kafkaMetadataNamespace, replicationClusters);
namespaces.setNamespaceReplicationClusters(kafkaMetadataNamespace, replicationClusters);
namespaces.setRetention(kafkaMetadataNamespace,
new RetentionPolicies(-1, -1));
} else {
List<String> replicationClusters = namespaces.getNamespaceReplicationClusters(kafkaMetadataNamespace);
if (!replicationClusters.contains(cluster)) {
log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,"
+ "updating it ...", kafkaMetadataNamespace, cluster);
Set<String> newReplicationClusters = Sets.newHashSet(replicationClusters);
newReplicationClusters.add(cluster);
namespaces.setNamespaceReplicationClusters(kafkaMetadataNamespace, newReplicationClusters);
}
}
namespaceExists = true;

// Check if the offsets topic exists and create it if not
Topics topics = pulsarAdmin.topics();
PartitionedTopicMetadata offsetsTopicMetadata = topics.getPartitionedTopicMetadata(offsetsTopic);

Set<String> offsetPartitionSet = new HashSet<String>(conf.getOffsetsTopicNumPartitions());
for (int i = 0; i < conf.getOffsetsTopicNumPartitions(); i++) {
offsetPartitionSet.add(offsetsTopic + PARTITIONED_TOPIC_SUFFIX + i);
}

if (offsetsTopicMetadata.partitions <= 0) {
log.info("Kafka group metadata topic {} doesn't exist. Creating it ...", offsetsTopic);

topics.createPartitionedTopic(
offsetsTopic,
conf.getOffsetsTopicNumPartitions()
);

for (String partition : offsetPartitionSet) {
topics.createNonPartitionedTopic(partition);
}

log.info("Successfully created group metadata topic {} with {} partitions.",
offsetsTopic, conf.getOffsetsTopicNumPartitions());
} else {
// Check to see if the partitions all exist
offsetPartitionSet.removeAll(
topics.getList(kafkaMetadataNamespace).stream()
.filter((topic) -> {
return topic.startsWith(offsetsTopic + PARTITIONED_TOPIC_SUFFIX);
}).collect(Collectors.toList()));

if (!offsetPartitionSet.isEmpty()) {
log.info("Identified missing offset topic partitions: {}", offsetPartitionSet);
for (String offsetPartition : offsetPartitionSet) {
topics.createNonPartitionedTopic(offsetPartition);
}
}
}
offsetsTopicExists = true;
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Resources concurrent creating and cause e: ", e);
return;
}

log.error("Failed to successfully initialize Kafka Metadata {}",
kafkaMetadataNamespace, e);
throw e;
} finally {
log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {},"
+ " namespace: {} exists: {}, topic: {} exists: {}", cluster, clusterExists, kafkaMetadataTenant,
tenantExists, kafkaMetadataNamespace, namespaceExists, offsetsTopic, offsetsTopicExists);
}
}
}
