
Commit

kop using pulsar consumer metrics
dockerzhang committed Jan 28, 2021
1 parent 241800b commit 4cdd0d2
Showing 10 changed files with 188 additions and 1 deletion.
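
For context on where the numbers end up: the commit feeds the Kafka FETCH path into the broker-side ConsumerStats that Pulsar already exposes through topic stats, so KoP consumption shows up in the usual tooling. The sketch below is a hypothetical read-back with the Pulsar admin client; the service URL, topic, and group name are placeholders, the Kafka group id is assumed to double as the Pulsar subscription name (see the subscriptionName(groupId) call in the consumer builder further down), and the field-style access matches the ConsumerStats usage in this diff (newer Pulsar releases expose getters instead).

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;

public class KopConsumerStatsExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build()) {
            // KoP maps a Kafka topic partition to a Pulsar partition like this one.
            TopicStats stats = admin.topics()
                    .getStats("persistent://public/default/my-kafka-topic-partition-0");
            // The Kafka group id is assumed to be the Pulsar subscription name.
            SubscriptionStats sub = stats.subscriptions.get("my-group");
            if (sub != null) {
                for (ConsumerStats c : sub.consumers) {
                    System.out.println("bytesOut=" + c.bytesOutCounter + " msgOut=" + c.msgOutCounter);
                }
            }
        }
    }
}
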
3 changes: 3 additions & 0 deletions kafka-impl/conf/kop.conf
@@ -89,6 +89,9 @@ maxReadEntriesNum=5
# kafka producer works well with kafka consumer.
entryFormat=pulsar

# Zookeeper path for storing kop consumer group
groupIdZooKeeperPath=/client_group_id

### --- KoP SSL configs--- ###

# Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
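
To make the new groupIdZooKeeperPath setting concrete, here is a hypothetical sketch of the znode layout it controls, using the groupIdPathFormat helper added later in this commit. The host and clientId are made-up examples, and the client host is assumed to render the way a Netty remote address does (a leading slash, then host:port).

import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;

public class GroupIdPathExample {
    public static void main(String[] args) {
        // "/10.0.0.5:52314" stands in for ctx.channel().remoteAddress().toString().
        String subPath = ZooKeeperUtils.groupIdPathFormat("/10.0.0.5:52314", "consumer-1");
        // The group id for this client is stored at /client_group_id/10.0.0.5-consumer-1.
        System.out.println("/client_group_id" + subPath);
    }
}
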
@@ -39,9 +39,11 @@
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -174,6 +176,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final String advertisedListeners;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
public final ConcurrentHashMap<String, String> currentConnectedGroup;
public final String groupIdStoredPath;
@Getter
private final EntryFormatter entryFormatter;

@@ -206,6 +210,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
this.currentConnectedGroup = new ConcurrentHashMap<>();
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
}

@Override
@@ -234,6 +240,11 @@ protected void close() {
writeAndFlushWhenInactiveChannel(ctx.channel());
ctx.close();
topicManager.close();
String clientHost = ctx.channel().remoteAddress().toString();
if (currentConnectedGroup.containsKey(clientHost)) {
log.info("currentConnectedGroup remove {}", clientHost);
currentConnectedGroup.remove(clientHost);
}
}
}

@@ -662,6 +673,13 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato
throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type "
+ request.coordinatorType());
}
// store the group name in ZooKeeper for the current client
String groupId = request.coordinatorKey();
String zkSubPath = ZooKeeperUtils.groupIdPathFormat(findCoordinator.getClientHost(),
findCoordinator.getHeader().clientId());
byte[] groupIdBytes = groupId.getBytes(Charset.forName("UTF-8"));
ZooKeeperUtils.createPath(pulsarService.getZkClient(), groupIdStoredPath,
zkSubPath, groupIdBytes);

findBroker(TopicName.get(pulsarTopicName))
.whenComplete((node, t) -> {
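
The group id written to ZooKeeper above is simply the Kafka client's group.id: it arrives as request.coordinatorKey() on the FIND_COORDINATOR request and is keyed by the client's host and clientId. A minimal consumer that would exercise this path is sketched below; the bootstrap address, topic, and ids are placeholders, and only standard Kafka client API is used.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KopGroupIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // KoP listener
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");    // becomes coordinatorKey, stored in ZK
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1"); // part of the znode sub-path
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-kafka-topic"));
            consumer.poll(Duration.ofSeconds(1)); // triggers FIND_COORDINATOR, then FETCH
        }
    }
}
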
@@ -141,6 +141,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private long offsetsRetentionCheckIntervalMs = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs;

@FieldContext(
category = CATEGORY_KOP,
doc = "Zookeeper path for storing kop consumer group"
)
private String groupIdZooKeeperPath = "/client_group_id";

@Deprecated
@FieldContext(
category = CATEGORY_KOP,
@@ -287,6 +293,7 @@ public String getListeners() {
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]"
)
private String entryFormat = "pulsar";

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "Flag to enable transaction coordinator"
@@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkState;

import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
@@ -26,13 +27,16 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;

/**
@@ -74,6 +78,10 @@ public class KafkaTopicManager {
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

// cache of consumers used to collect metrics: <groupId, Consumer>
public static final ConcurrentHashMap<String, CompletableFuture<Consumer>>
CONSUMERS_CACHE = new ConcurrentHashMap<>();

KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
this.requestHandler = kafkaRequestHandler;
this.pulsarService = kafkaRequestHandler.getPulsarService();
@@ -136,6 +144,7 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
public static void removeTopicManagerCache(String topicName) {
LOOKUP_CACHE.remove(topicName);
KOP_ADDRESS_CACHE.remove(topicName);
CONSUMERS_CACHE.clear();
}

public static void clearTopicManagerCache() {
@@ -387,4 +396,33 @@ public void deReference(String topicName) {
}
}

public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
// make sure the internal consumer exists
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
if (!requestHandler.getGroupCoordinator()
.getoffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
log.warn("not get consumer for group {} this time", groupId);
consumerFuture.complete(null);
return consumerFuture;
}
return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> {
try {
TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition));
NamespaceBundle namespaceBundle = pulsarService.getBrokerService()
.pulsar().getNamespaceService().getBundle(topicName);
PersistentTopic persistentTopic = (PersistentTopic) pulsarService
.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace()).get(namespaceBundle.toString())
.get(topicName.toString());
// only one consumer exists for the internal subscription
Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
} catch (Exception e) {
log.error("get topic error", e);
consumerFuture.complete(null);
}
return consumerFuture;
});
}
}
@@ -20,8 +20,10 @@
import io.netty.util.Recycler.Handle;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;

import java.util.Date;
import java.util.LinkedHashMap;
@@ -53,6 +55,7 @@
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.naming.TopicName;

/**
@@ -348,7 +351,23 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
} else if (apiVersion <= 3) {
magic = RecordBatch.MAGIC_VALUE_V1;
}
// get group and consumer
String clientHost = fetch.getClientHost();
String groupName = requestHandler
.getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> {
String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost,
fetch.getHeader().clientId());
String groupId = ZooKeeperUtils.getData(requestHandler.getPulsarService().getZkClient(),
requestHandler.getGroupIdStoredPath(), zkSubPath);
log.info("get group name from zk for current connection:{} groupId:{}",
clientHost, groupId);
return groupId;
});
CompletableFuture<Consumer> consumerFuture = requestHandler.getTopicManager()
.getGroupConsumers(groupName, kafkaPartition);
final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic);
// collect consumer metrics
EntryFormatter.updateConsumerStats(records, consumerFuture);

List<FetchResponse.AbortedTransaction> abortedTransactions;
if (IsolationLevel.READ_UNCOMMITTED.equals(isolationLevel)) {
@@ -200,6 +200,10 @@ public void shutdown() {
log.info("Shutdown group coordinator completely.");
}

public OffsetAcker getoffsetAcker() {
return offsetAcker;
}

public int partitionFor(String coordinatorKey) {
return groupManager.partitionFor(coordinatorKey);
}
@@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;

@@ -146,7 +147,7 @@ public void close() {
close(consumers.keySet());
}

private CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
.computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
return group.computeIfAbsent(
@@ -158,6 +159,7 @@ private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, Topic
KopTopic kopTopic = new KopTopic(topicPartition.topic());
return consumerBuilder.clone()
.topic(kopTopic.getPartitionName(topicPartition.partition()))
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(groupId)
.subscribeAsync();
}
@@ -15,10 +15,13 @@

import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.policies.data.ConsumerStats;


/**
@@ -61,4 +64,22 @@ static int parseNumMessages(final MemoryRecords records) {
}
return numMessages;
}

/**
* Update consumer stats.
*
* @param records messages in Kafka format
* @param consumerFuture the Pulsar internal consumer
*/
static void updateConsumerStats(final MemoryRecords records, CompletableFuture<Consumer> consumerFuture) {
ConsumerStats consumerStats = new ConsumerStats();
consumerFuture.whenComplete((consumer, throwable) -> {
if (consumer == null || throwable != null) {
return;
}
consumerStats.bytesOutCounter = records.sizeInBytes();
consumerStats.msgOutCounter = parseNumMessages(records);
consumer.updateStats(consumerStats);
});
}
}
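
As a quick check of what updateConsumerStats records, here is a hypothetical, self-contained sketch with a hand-built batch; the record contents are arbitrary, and it assumes parseNumMessages is callable as a static EntryFormatter method from outside the class, as its use elsewhere in this diff suggests. For three records, msgOutCounter grows by 3 and bytesOutCounter by the serialized batch size.

import java.nio.charset.StandardCharsets;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;

public class ConsumerStatsMathExample {
    public static void main(String[] args) {
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord(value), new SimpleRecord(value), new SimpleRecord(value));
        // These are the two values updateConsumerStats copies into the Pulsar ConsumerStats.
        System.out.println("msgOutCounter += " + EntryFormatter.parseNumMessages(records)); // 3
        System.out.println("bytesOutCounter += " + records.sizeInBytes());
    }
}
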
@@ -0,0 +1,67 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.utils;

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


/**
* Utils for ZooKeeper.
*/
@Slf4j
public class ZooKeeperUtils {
public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath, byte[] data) {
try {
if (zooKeeper.exists(zkPath, false) == null) {
zooKeeper.create(zkPath,
new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String addSubPath = zkPath + subPath;
if (zooKeeper.exists(addSubPath, false) == null) {
zooKeeper.create(addSubPath,
data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zooKeeper.setData(addSubPath, data, -1);
}
log.debug("create zk path, addSubPath:{} data:{}.",
addSubPath, new String(data, StandardCharsets.UTF_8));
} catch (Exception e) {
log.error("create zookeeper path error", e);
}
}

public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) {
String data = "";
try {
String addSubPath = zkPath + subPath;
Stat zkStat = zooKeeper.exists(addSubPath, true);
if (zkStat != null) {
data = new String(zooKeeper.getData(addSubPath, false, zkStat), StandardCharsets.UTF_8);
}
} catch (Exception e) {
log.error("get zookeeper path data error", e);
}
return data;
}

public static String groupIdPathFormat(String clientHost, String clientId) {
String path = clientHost.split(":")[0] + "-" + clientId;
return path;
}
}
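
A small round-trip sketch for the two helpers above, assuming a ZooKeeper ensemble reachable at localhost:2181; the connection string, host, clientId, and group id are placeholders. createPath creates the node or overwrites its data, and getData returns an empty string when the node is missing.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperUtilsExample {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();
        String subPath = ZooKeeperUtils.groupIdPathFormat("/10.0.0.5:52314", "consumer-1");
        ZooKeeperUtils.createPath(zk, "/client_group_id", subPath,
                "my-group".getBytes(StandardCharsets.UTF_8));
        // Prints "my-group".
        System.out.println(ZooKeeperUtils.getData(zk, "/client_group_id", subPath));
        zk.close();
    }
}
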
@@ -64,6 +64,14 @@ public void testKafkaListeners() {
assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093");
}

@Test
public void testGroupIdZooKeeperPath() {
String zkPathForKop = "/consumer_group_test";
KafkaServiceConfiguration configuration = new KafkaServiceConfiguration();
configuration.setGroupIdZooKeeperPath(zkPathForKop);
assertEquals("/consumer_group_test", configuration.getGroupIdZooKeeperPath());
}

@Test
public void testConfigurationUtilsStream() throws Exception {
File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");