Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

kop using pulsar consumer metrics #263

Merged
merged 4 commits into from
Feb 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kafka-impl/conf/kop.conf
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ maxReadEntriesNum=5
# kafka producer works well with kafka consumer.
entryFormat=pulsar

# Zookeeper path for storing kop consumer group
groupIdZooKeeperPath=/client_group_id

### --- KoP SSL configs--- ###

# Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -176,6 +178,9 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final String advertisedListeners;
private final int defaultNumPartitions;
public final int maxReadEntriesNum;
// store the group name for current connected client.
private final ConcurrentHashMap<String, String> currentConnectedGroup;
private final String groupIdStoredPath;
@Getter
private final EntryFormatter entryFormatter;

Expand Down Expand Up @@ -208,6 +213,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat());
this.currentConnectedGroup = new ConcurrentHashMap<>();
this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
}

@Override
Expand Down Expand Up @@ -236,6 +243,11 @@ protected void close() {
writeAndFlushWhenInactiveChannel(ctx.channel());
ctx.close();
topicManager.close();
String clientHost = ctx.channel().remoteAddress().toString();
if (currentConnectedGroup.containsKey(clientHost)){
log.info("currentConnectedGroup remove {}", clientHost);
currentConnectedGroup.remove(clientHost);
}
}
}

Expand Down Expand Up @@ -664,6 +676,13 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinato
throw new NotImplementedException("FindCoordinatorRequest not support TRANSACTION type "
+ request.coordinatorType());
}
// store group name to zk for current client
String groupId = request.coordinatorKey();
String zkSubPath = ZooKeeperUtils.groupIdPathFormat(findCoordinator.getClientHost(),
findCoordinator.getHeader().clientId());
byte[] groupIdBytes = groupId.getBytes(Charset.forName("UTF-8"));
ZooKeeperUtils.createPath(pulsarService.getZkClient(), groupIdStoredPath,
zkSubPath, groupIdBytes);

findBroker(TopicName.get(pulsarTopicName))
.whenComplete((node, t) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private long offsetsRetentionCheckIntervalMs = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs;

@FieldContext(
category = CATEGORY_KOP,
doc = "Zookeeper path for storing kop consumer group"
)
private String groupIdZooKeeperPath = "/client_group_id";

@Deprecated
@FieldContext(
category = CATEGORY_KOP,
Expand Down Expand Up @@ -287,6 +293,7 @@ public String getListeners() {
doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]"
)
private String entryFormat = "pulsar";

@FieldContext(
category = CATEGORY_KOP_TRANSACTION,
doc = "Flag to enable transaction coordinator"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkState;

import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,12 +27,15 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;

/**
Expand Down Expand Up @@ -73,6 +77,10 @@ public class KafkaTopicManager {
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

// cache for consumers for collect metrics: <groupId, Consumer>
public static final ConcurrentHashMap<String, CompletableFuture<Consumer>>
CONSUMERS_CACHE = new ConcurrentHashMap<>();

KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
this.requestHandler = kafkaRequestHandler;
this.pulsarService = kafkaRequestHandler.getPulsarService();
Expand Down Expand Up @@ -135,6 +143,7 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
public static void removeTopicManagerCache(String topicName) {
LOOKUP_CACHE.remove(topicName);
KOP_ADDRESS_CACHE.remove(topicName);
CONSUMERS_CACHE.clear();
}

public static void clearTopicManagerCache() {
Expand Down Expand Up @@ -369,4 +378,33 @@ public void deReference(String topicName) {
}
}

public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
// make sure internal consumer existed
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator()
.getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
log.warn("not get consumer for group {} this time", groupId);
consumerFuture.complete(null);
return consumerFuture;
}
return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> {
try {
TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition));
NamespaceBundle namespaceBundle = pulsarService.getBrokerService()
.pulsar().getNamespaceService().getBundle(topicName);
PersistentTopic persistentTopic = (PersistentTopic) pulsarService
.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace()).get(namespaceBundle.toString())
.get(topicName.toString());
// only one consumer existed for internal subscription
Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
} catch (Exception e) {
log.error("get topic error", e);
consumerFuture.complete(null);
}
return consumerFuture;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import io.netty.util.Recycler.Handle;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatter;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;

import java.util.Date;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -51,6 +53,7 @@
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.naming.TopicName;

/**
Expand Down Expand Up @@ -343,7 +346,23 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
} else if (apiVersion <= 3) {
magic = RecordBatch.MAGIC_VALUE_V1;
}
// get group and consumer
String clientHost = fetch.getClientHost();
String groupName = requestHandler
.getCurrentConnectedGroup().computeIfAbsent(clientHost, ignored -> {
String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost,
fetch.getHeader().clientId());
String groupId = ZooKeeperUtils.getData(requestHandler.getPulsarService().getZkClient(),
requestHandler.getGroupIdStoredPath(), zkSubPath);
log.info("get group name from zk for current connection:{} groupId:{}",
clientHost, groupId);
return groupId;
dockerzhang marked this conversation as resolved.
Show resolved Hide resolved
});
CompletableFuture<Consumer> consumerFuture = requestHandler.getTopicManager()
.getGroupConsumers(groupName, kafkaPartition);
final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic);
// collect consumer metrics
EntryFormatter.updateConsumerStats(records, consumerFuture);

List<FetchResponse.AbortedTransaction> abortedTransactions;
if (IsolationLevel.READ_UNCOMMITTED.equals(isolationLevel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void close() {
close(consumers.keySet());
}

private CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
.computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
return group.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@

import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.policies.data.ConsumerStats;


/**
Expand Down Expand Up @@ -61,4 +64,22 @@ static int parseNumMessages(final MemoryRecords records) {
}
return numMessages;
}

/**
* Update Consumer Stats.
*
* @param records messages with Kafka's format
* @param consumerFuture pulsar internal consumer
*/
static void updateConsumerStats(final MemoryRecords records, CompletableFuture<Consumer> consumerFuture) {
ConsumerStats consumerStats = new ConsumerStats();
consumerFuture.whenComplete((consumer, throwable) -> {
if (consumer == null || throwable != null) {
return;
}
consumerStats.bytesOutCounter = records.sizeInBytes();
consumerStats.msgOutCounter = parseNumMessages(records);
consumer.updateStats(consumerStats);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.utils;

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


/**
* Utils for ZooKeeper.
*/
@Slf4j
public class ZooKeeperUtils {
public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath, byte[] data) {
try {
if (zooKeeper.exists(zkPath, false) == null) {
zooKeeper.create(zkPath,
new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String addSubPath = zkPath + subPath;
if (zooKeeper.exists(addSubPath, false) == null) {
zooKeeper.create(addSubPath,
data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
zooKeeper.setData(addSubPath, data, -1);
}
log.debug("create zk path, addSubPath:{} data:{}.",
addSubPath, new String(data, StandardCharsets.UTF_8));
} catch (Exception e) {
log.error("create zookeeper path error", e);
}
}

public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) {
String data = null;
try {
String addSubPath = zkPath + subPath;
Stat zkStat = zooKeeper.exists(addSubPath, true);
if (zkStat != null) {
data = new String(zooKeeper.getData(addSubPath, false, zkStat), StandardCharsets.UTF_8);
}
dockerzhang marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
log.error("get zookeeper path data error", e);
}
return data;
}

public static String groupIdPathFormat(String clientHost, String clientId) {
String path = clientHost.split(":")[0] + "-" + clientId;
return path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public void testKafkaListeners() {
assertEquals(configuration.getListeners(), "PLAINTEXT://localhost:9093");
}

@Test
public void testGroupIdZooKeeperPath() {
String zkPathForKop = "/consumer_group_test";
KafkaServiceConfiguration configuration = new KafkaServiceConfiguration();
configuration.setGroupIdZooKeeperPath(zkPathForKop);
assertEquals("/consumer_group_test", configuration.getGroupIdZooKeeperPath());
}

@Test
public void testConfigurationUtilsStream() throws Exception {
File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
Expand Down