From cda07d712dde11a18765bb975938e442ead3cd45 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Sat, 6 Feb 2021 16:33:04 +0800 Subject: [PATCH] kop using pulsar consumer metrics (#263) fix for #248 --- kafka-impl/conf/kop.conf | 3 + .../handlers/kop/KafkaRequestHandler.java | 19 ++++++ .../kop/KafkaServiceConfiguration.java | 7 ++ .../handlers/kop/KafkaTopicManager.java | 38 +++++++++++ .../handlers/kop/MessageFetchContext.java | 19 ++++++ .../kop/coordinator/group/OffsetAcker.java | 2 +- .../handlers/kop/format/EntryFormatter.java | 21 ++++++ .../handlers/kop/utils/ZooKeeperUtils.java | 67 +++++++++++++++++++ .../kop/KafkaServiceConfigurationTest.java | 8 +++ 9 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index 2aafe0de90..1ce5f66829 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -89,6 +89,9 @@ maxReadEntriesNum=5 # kafka producer works well with kafka consumer. entryFormat=pulsar +# Zookeeper path for storing kop consumer group +groupIdZooKeeperPath=/client_group_id + ### --- KoP SSL configs--- ### # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index c4e49169ac..3332922d62 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -40,9 +40,11 @@ import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder; +import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -176,6 +178,9 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final String advertisedListeners; private final int defaultNumPartitions; public final int maxReadEntriesNum; + // store the group name for current connected client. + private final ConcurrentHashMap currentConnectedGroup; + private final String groupIdStoredPath; @Getter private final EntryFormatter entryFormatter; @@ -208,6 +213,8 @@ public KafkaRequestHandler(PulsarService pulsarService, this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions(); this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum(); this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat()); + this.currentConnectedGroup = new ConcurrentHashMap<>(); + this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath(); } @Override @@ -236,6 +243,11 @@ protected void close() { writeAndFlushWhenInactiveChannel(ctx.channel()); ctx.close(); topicManager.close(); + String clientHost = ctx.channel().remoteAddress().toString(); + if (currentConnectedGroup.containsKey(clientHost)){ + log.info("currentConnectedGroup remove {}", clientHost); + currentConnectedGroup.remove(clientHost); + } } } @@ -664,6 +676,13 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type " + request.coordinatorType()); } + // store group name to zk for current client + String groupId = request.coordinatorKey(); + String zkSubPath = ZooKeeperUtils.groupIdPathFormat(findCoordinator.getClientHost(), + findCoordinator.getHeader().clientId()); + byte[] groupIdBytes = groupId.getBytes(Charset.forName("UTF-8")); + ZooKeeperUtils.createPath(pulsarService.getZkClient(), groupIdStoredPath, + zkSubPath, groupIdBytes); findBroker(TopicName.get(pulsarTopicName)) .whenComplete((node, t) -> { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index ee90ebe2c0..1830ac7194 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -141,6 +141,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private long offsetsRetentionCheckIntervalMs = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs; + @FieldContext( + category = CATEGORY_KOP, + doc = "Zookeeper path for storing kop consumer group" + ) + private String groupIdZooKeeperPath = "/client_group_id"; + @Deprecated @FieldContext( category = CATEGORY_KOP, @@ -287,6 +293,7 @@ public String getListeners() { doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]" ) private String entryFormat = "pulsar"; + @FieldContext( category = CATEGORY_KOP_TRANSACTION, doc = "Flag to enable transaction coordinator" diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index c1d2c4eea0..25aa6c6a65 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkState; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.net.InetSocketAddress; import java.util.Map; import java.util.Optional; @@ -26,12 +27,15 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; /** @@ -73,6 +77,10 @@ public class KafkaTopicManager { public static final ConcurrentHashMap>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>(); + // cache for consumers for collect metrics: + public static final ConcurrentHashMap> + CONSUMERS_CACHE = new ConcurrentHashMap<>(); + KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) { this.requestHandler = kafkaRequestHandler; this.pulsarService = kafkaRequestHandler.getPulsarService(); @@ -135,6 +143,7 @@ public CompletableFuture getTopicConsumerManager(Stri public static void removeTopicManagerCache(String topicName) { LOOKUP_CACHE.remove(topicName); KOP_ADDRESS_CACHE.remove(topicName); + CONSUMERS_CACHE.clear(); } public static void clearTopicManagerCache() { @@ -369,4 +378,33 @@ public void deReference(String topicName) { } } + public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { + // make sure internal consumer existed + CompletableFuture consumerFuture = new CompletableFuture<>(); + if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator() + .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { + log.warn("not get consumer for group {} this time", groupId); + consumerFuture.complete(null); + return consumerFuture; + } + return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> { + try { + TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition)); + NamespaceBundle namespaceBundle = pulsarService.getBrokerService() + .pulsar().getNamespaceService().getBundle(topicName); + PersistentTopic persistentTopic = (PersistentTopic) pulsarService + .getBrokerService().getMultiLayerTopicsMap() + .get(topicName.getNamespace()).get(namespaceBundle.toString()) + .get(topicName.toString()); + // only one consumer existed for internal subscription + Consumer consumer = persistentTopic.getSubscriptions() + .get(groupId).getDispatcher().getConsumers().get(0); + consumerFuture.complete(consumer); + } catch (Exception e) { + log.error("get topic error", e); + consumerFuture.complete(null); + } + return consumerFuture; + }); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index 2cc1f1a026..8d081f5022 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -20,8 +20,10 @@ import io.netty.util.Recycler.Handle; import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; +import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils; import java.util.Date; import java.util.LinkedHashMap; @@ -51,6 +53,7 @@ import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FetchResponse.PartitionData; import org.apache.kafka.common.requests.IsolationLevel; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.naming.TopicName; /** @@ -343,7 +346,23 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, } else if (apiVersion <= 3) { magic = RecordBatch.MAGIC_VALUE_V1; } + // get group and consumer + String clientHost = fetch.getClientHost(); + String groupName = requestHandler + .getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> { + String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost, + fetch.getHeader().clientId()); + String groupId = ZooKeeperUtils.getData(requestHandler.getPulsarService().getZkClient(), + requestHandler.getGroupIdStoredPath(), zkSubPath); + log.info("get group name from zk for current connection:{} groupId:{}", + clientHost, groupId); + return groupId; + }); + CompletableFuture consumerFuture = requestHandler.getTopicManager() + .getGroupConsumers(groupName, kafkaPartition); final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic); + // collect consumer metrics + EntryFormatter.updateConsumerStats(records, consumerFuture); List abortedTransactions; if (IsolationLevel.READ_UNCOMMITTED.equals(isolationLevel)) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index ec1c956ee8..b3ea3a37e9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -152,7 +152,7 @@ public void close() { close(consumers.keySet()); } - private CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { Map>> group = consumers .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); return group.computeIfAbsent( diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 6003f0690c..c2e62f3016 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -15,10 +15,13 @@ import io.netty.buffer.ByteBuf; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.common.policies.data.ConsumerStats; /** @@ -61,4 +64,22 @@ static int parseNumMessages(final MemoryRecords records) { } return numMessages; } + + /** + * Update Consumer Stats. + * + * @param records messages with Kafka's format + * @param consumerFuture pulsar internal consumer + */ + static void updateConsumerStats(final MemoryRecords records, CompletableFuture consumerFuture) { + ConsumerStats consumerStats = new ConsumerStats(); + consumerFuture.whenComplete((consumer, throwable) -> { + if (consumer == null || throwable != null) { + return; + } + consumerStats.bytesOutCounter = records.sizeInBytes(); + consumerStats.msgOutCounter = parseNumMessages(records); + consumer.updateStats(consumerStats); + }); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java new file mode 100644 index 0000000000..c0a95561a6 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java @@ -0,0 +1,67 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.utils; + +import java.nio.charset.StandardCharsets; +import lombok.extern.slf4j.Slf4j; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + + +/** + * Utils for ZooKeeper. + */ +@Slf4j +public class ZooKeeperUtils { + public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath, byte[] data) { + try { + if (zooKeeper.exists(zkPath, false) == null) { + zooKeeper.create(zkPath, + new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + String addSubPath = zkPath + subPath; + if (zooKeeper.exists(addSubPath, false) == null) { + zooKeeper.create(addSubPath, + data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } else { + zooKeeper.setData(addSubPath, data, -1); + } + log.debug("create zk path, addSubPath:{} data:{}.", + addSubPath, new String(data, StandardCharsets.UTF_8)); + } catch (Exception e) { + log.error("create zookeeper path error", e); + } + } + + public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) { + String data = null; + try { + String addSubPath = zkPath + subPath; + Stat zkStat = zooKeeper.exists(addSubPath, true); + if (zkStat != null) { + data = new String(zooKeeper.getData(addSubPath, false, zkStat), StandardCharsets.UTF_8); + } + } catch (Exception e) { + log.error("get zookeeper path data error", e); + } + return data; + } + + public static String groupIdPathFormat(String clientHost, String clientId) { + String path = clientHost.split(":")[0] + "-" + clientId; + return path; + } +} \ No newline at end of file diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index e89bda3697..752885f55f 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -64,6 +64,14 @@ public void testKafkaListeners() { assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093"); } + @Test + public void testGroupIdZooKeeperPath() { + String zkPathForKop = "/consumer_group_test"; + KafkaServiceConfiguration configuration = new KafkaServiceConfiguration(); + configuration.setGroupIdZooKeeperPath(zkPathForKop); + assertEquals("/consumer_group_test", configuration.getGroupIdZooKeeperPath()); + } + @Test public void testConfigurationUtilsStream() throws Exception { File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");