KoP: use Pulsar consumer metrics
dockerzhang committed Dec 11, 2020 · 1 parent 905739f · commit 961c2b0
Showing 10 changed files with 155 additions and 7 deletions.
3 changes: 3 additions & 0 deletions kafka-impl/conf/kop.conf
@@ -71,6 +71,9 @@ offsetsTopicNumPartitions=8
 # Maximum number of entries that are read from cursor once per time
 maxReadEntriesNum=5
 
+# Zookeeper path for storing kop consumer group
+groupIdZooKeeperPath="/client_group_id"
+
 ### --- KoP SSL configs--- ###
 
 # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
@@ -35,6 +35,7 @@
 import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
 import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
 import io.streamnative.pulsar.handlers.kop.utils.OffsetFinder;
+import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -140,7 +141,7 @@
 public class KafkaRequestHandler extends KafkaCommandDecoder {
     public static final long DEFAULT_TIMESTAMP = 0L;
 
-    private final PulsarService pulsarService;
+    public final PulsarService pulsarService;
     private final KafkaServiceConfiguration kafkaConfig;
     private final KafkaTopicManager topicManager;
     private final GroupCoordinator groupCoordinator;
@@ -157,6 +158,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
     private final int sslPort;
     private final int defaultNumPartitions;
     public final int maxReadEntriesNum;
+    public static ConcurrentHashMap<String, String> currentConnectedGroup;
+    public static String groupIdStoredPath;
 
     private final Map<TopicPartition, PendingProduceQueue> pendingProduceQueueMap = new ConcurrentHashMap<>();
 
@@ -184,6 +187,8 @@ public KafkaRequestHandler(PulsarService pulsarService,
         this.topicManager = new KafkaTopicManager(this);
         this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
         this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
+        this.currentConnectedGroup = new ConcurrentHashMap<>();
+        this.groupIdStoredPath = kafkaConfig.getGroupIdZooKeeperPath();
     }
 
     @Override
@@ -211,6 +216,11 @@ protected void close() {
             log.info("close channel {}", ctx.channel());
             writeAndFlushWhenInactiveChannel(ctx.channel());
             ctx.close();
+            String clientHost = ctx.channel().remoteAddress().toString();
+            if (currentConnectedGroup.containsKey(clientHost)){
+                log.info("currentConnectedGroup remove {}", clientHost);
+                currentConnectedGroup.remove(clientHost);
+            }
             topicManager.close();
         }
     }
@@ -626,9 +636,16 @@ protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator
         FindCoordinatorRequest request = (FindCoordinatorRequest) findCoordinator.getRequest();
 
         if (request.coordinatorType() == FindCoordinatorRequest.CoordinatorType.GROUP) {
-            int partition = groupCoordinator.partitionFor(request.coordinatorKey());
+            String groupId = request.coordinatorKey();
+            int partition = groupCoordinator.partitionFor(groupId);
             String pulsarTopicName = groupCoordinator.getTopicPartitionName(partition);
 
+            // store group name to zk for current client
+            String zkSubPath = ZooKeeperUtils.groupIdPathFormat(findCoordinator.getClientHost(),
+                    findCoordinator.getHeader().clientId());
+            ZooKeeperUtils.createPath(pulsarService.getZkClient(), groupIdStoredPath,
+                    zkSubPath, groupId.getBytes());
+
             findBroker(TopicName.get(pulsarTopicName))
                 .whenComplete((node, t) -> {
                     if (t != null || node == null){
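Note: to make the new ZooKeeper bookkeeping concrete, here is a minimal sketch of the path this handler writes. The host, client.id, and group values are hypothetical; groupIdPathFormat is the helper added in ZooKeeperUtils below.

    // Sketch only: a client at 10.0.0.1 with client.id "consumer-1"
    // asks FindCoordinator for group "my-group".
    String clientHost = "/10.0.0.1:53124"; // as Channel.remoteAddress().toString() renders it
    String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost, "consumer-1");
    // zkSubPath == "/10.0.0.1-consumer-1": the port is dropped, the leading '/' is kept,
    // so with the default groupIdZooKeeperPath the group id is written to the znode
    //   /client_group_id/10.0.0.1-consumer-1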
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -249,4 +249,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     )
     private int maxReadEntriesNum = 5;
 
+    @FieldContext(
+        category = CATEGORY_KOP,
+        doc = "Zookeeper path for storing kop consumer group"
+    )
+    private String groupIdZooKeeperPath = "/client_group_id";
+
 }
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
@@ -22,6 +22,7 @@
 import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
 import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
 import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
+import io.streamnative.pulsar.handlers.kop.utils.ZooKeeperUtils;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -50,6 +51,10 @@
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 
 /**
  * MessageFetchContext handling FetchRequest .
@@ -313,9 +318,46 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
             } else if (apiVersion <= 3) {
                 magic = RecordBatch.MAGIC_VALUE_V1;
             }
 
+            String clientHost = fetch.getClientHost();
+            // get stored group name from zk
+            if (!requestHandler.currentConnectedGroup.containsKey(clientHost)) {
+                String zkSubPath = ZooKeeperUtils.groupIdPathFormat(clientHost,
+                        fetch.getHeader().clientId());
+                String groupId = ZooKeeperUtils.getData(requestHandler.pulsarService.getZkClient(),
+                        requestHandler.groupIdStoredPath, zkSubPath);
+                requestHandler.currentConnectedGroup.put(clientHost, groupId);
+                log.info("get group name from zk for current connection:{} groupId:{}",
+                        clientHost, groupId);
+            }
+
+            Consumer consumer = null;
+            try {
+                String groupId = requestHandler.currentConnectedGroup.get(clientHost);
+                TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition));
+                NamespaceBundle namespaceBundle = requestHandler.pulsarService.getBrokerService()
+                        .pulsar().getNamespaceService().getBundle(topicName);
+                // make sure internal consumer existed
+                requestHandler.getGroupCoordinator()
+                        .getoffsetAcker().getConsumer(groupId, kafkaPartition).get();
+
+                PersistentTopic persistentTopic = (PersistentTopic) requestHandler.pulsarService
+                        .getBrokerService().getMultiLayerTopicsMap()
+                        .get(topicName.getNamespace()).get(namespaceBundle.toString())
+                        .get(topicName.toString());
+
+                // only one consumer existed for internal subscription
+                consumer = persistentTopic.getSubscriptions()
+                        .get(groupId).getDispatcher().getConsumers().get(0);
+            } catch (InterruptedException e) {
+                log.error("get topic error", e);
+            } catch (Exception e) {
+                log.error("get topic error", e);
+            }
+
             MemoryRecords records;
             // by default kafka is produced message in batched mode.
-            records = entriesToRecords(entries, magic);
+            records = entriesToRecords(entries, magic, consumer);
 
             partitionData = new FetchResponse.PartitionData(
                 Errors.NONE,
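Note: only the first fetch on a connection pays the ZooKeeper read; subsequent fetches resolve the group id from the currentConnectedGroup map, which KafkaRequestHandler.close() above clears when the connection goes away. A condensed sketch of the same caching rule (all names as in the diff):

    // Sketch only: per-connection group-id resolution, equivalent to the
    // containsKey/put sequence above.
    String groupId = requestHandler.currentConnectedGroup.computeIfAbsent(clientHost,
        host -> ZooKeeperUtils.getData(requestHandler.pulsarService.getZkClient(),
            requestHandler.groupIdStoredPath,
            ZooKeeperUtils.groupIdPathFormat(host, fetch.getHeader().clientId())));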
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java
@@ -198,6 +198,10 @@ public void shutdown() {
         log.info("Shutdown group coordinator completely.");
     }
 
+    public OffsetAcker getoffsetAcker() {
+        return offsetAcker;
+    }
+
     public int partitionFor(String coordinatorKey) {
         return groupManager.partitionFor(coordinatorKey);
     }
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 
 /**
@@ -43,7 +44,8 @@ public class OffsetAcker implements Closeable {
     public OffsetAcker(PulsarClientImpl pulsarClient) {
         this.consumerBuilder = pulsarClient.newConsumer()
             .receiverQueueSize(0)
-            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscriptionType(SubscriptionType.Shared);
     }
 
     // map off consumser: <groupId, consumers>
@@ -103,7 +105,7 @@ public void close() {
         close(consumers.keySet());
     }
 
-    private CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
+    public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
         Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
             .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
         return group.computeIfAbsent(
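Note: making getConsumer public is what lets the fetch path above force-create the group's internal consumer before looking it up on the broker. A minimal usage sketch, with hypothetical group and topic names:

    // Sketch only: ensure the internal consumer for <group, partition> exists.
    OffsetAcker offsetAcker = groupCoordinator.getoffsetAcker();
    offsetAcker.getConsumer("my-group", new TopicPartition("my-topic", 0))
        .join(); // completes once the subscription's consumer is created

The switch to SubscriptionType.Shared presumably keeps a repeated getConsumer (for example after a reconnect) from failing with an exclusive-consumer conflict on the same subscription.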
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java
@@ -26,6 +26,8 @@
 import java.util.Base64;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 import lombok.experimental.UtilityClass;
@@ -38,6 +40,8 @@
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
@@ -334,7 +338,8 @@ private static Header[] getHeadersFromMetadata(List<KeyValue> properties) {
 
     // Convert entries read from BookKeeper into Kafka Records
     // Entries can be batched messages, may need un-batch.
-    public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.Entry> entries, byte magic) {
+    public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.Entry> entries, byte magic,
+                                                 Consumer consumer) {
         try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) {
             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic,
                 org.apache.kafka.common.record.CompressionType.NONE,
@@ -347,10 +352,15 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
                 false, false,
                 RecordBatch.NO_PARTITION_LEADER_EPOCH,
                 MAX_RECORDS_BUFFER_SIZE);
+            int batchSizes = entries.size();
+            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
+            AtomicLong totalBytes = new AtomicLong(0);
+            AtomicInteger totalMessages = new AtomicInteger(0);
 
             entries.parallelStream().forEachOrdered(entry -> {
                 // each entry is a batched message
                 ByteBuf metadataAndPayload = entry.getDataBuffer();
+                totalBytes.getAndAdd(metadataAndPayload.readableBytes());
 
                 // Uncompress the payload if necessary
                 MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload);
@@ -364,6 +374,8 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
                     throw new UncheckedIOException(ioe);
                 }
                 int numMessages = msgMetadata.getNumMessagesInBatch();
+                totalMessages.getAndAdd(numMessages);
+
                 boolean notBatchMessage = (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch());
 
                 if (log.isDebugEnabled()) {
@@ -419,6 +431,8 @@ public static MemoryRecords entriesToRecords(List<org.apache.bookkeeper.mledger.
                 payload.release();
                 entry.release();
             });
+            consumer.updateStats(batchSizes, batchIndexesAcks, totalMessages.get(),
+                totalBytes.get(), 0);
             return builder.build();
         } catch (IOException ioe){
             log.error("Meet IOException: {}", ioe);
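Note: with the Consumer threaded through, each fetch now bumps the broker-side dispatch counters, so Kafka reads should surface in the subscription's consumer stats (for example msgRateOut and msgThroughputOut in pulsar-admin topics stats output). A minimal sketch of the new call shape, assuming entries and consumer were resolved as in MessageFetchContext above:

    // Sketch only: consumer must be non-null, since updateStats is
    // invoked unconditionally once the entries are converted.
    MemoryRecords records = entriesToRecords(entries, RecordBatch.MAGIC_VALUE_V2, consumer);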
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java (new file)
@@ -0,0 +1,52 @@
+package io.streamnative.pulsar.handlers.kop.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Utils for ZooKeeper.
+ */
+@Slf4j
+public class ZooKeeperUtils {
+    public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath, byte[] data) {
+        try {
+            if (zooKeeper.exists(zkPath, false) == null) {
+                zooKeeper.create(zkPath,
+                    data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+            String addSubPath = zkPath + subPath;
+            if (zooKeeper.exists(addSubPath, false) == null) {
+                zooKeeper.create(addSubPath,
+                    data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } else {
+                zooKeeper.setData(addSubPath, data, -1);
+            }
+            log.debug("create zk path, addSubPath:{} data:{}.",
+                addSubPath, new String(data));
+        } catch (Exception e) {
+            log.error("create zookeeper path error", e);
+        }
+    }
+
+    public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) {
+        String data = "";
+        try {
+            String addSubPath = zkPath + subPath;
+            Stat zkStat = zooKeeper.exists(addSubPath, true);
+            if (zkStat != null) {
+                data = new String(zooKeeper.getData(addSubPath, false, zkStat));
+            }
+        } catch (Exception e) {
+            log.error("get zookeeper path data error", e);
+        }
+        return data;
+    }
+
+    public static String groupIdPathFormat(String clientHost, String clientId) {
+        String path = clientHost.split(":")[0] + "-" + clientId;
+        return path;
+    }
+}
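Note: a round-trip sketch of the new helper; the prefix matches the default groupIdZooKeeperPath, and zkClient stands in for the handle that pulsarService.getZkClient() returns in the handler code above.

    // Sketch only: store a group id for a client connection, then read it back.
    String subPath = ZooKeeperUtils.groupIdPathFormat("/10.0.0.1:53124", "consumer-1");
    ZooKeeperUtils.createPath(zkClient, "/client_group_id", subPath, "my-group".getBytes());
    String groupId = ZooKeeperUtils.getData(zkClient, "/client_group_id", subPath);
    // groupId == "my-group". getData returns "" when the node is missing;
    // createPath falls back to setData when the node already exists.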
kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java
@@ -56,6 +56,14 @@ public void testMaxReadEntriesNum() {
         assertEquals(60, configuration.getMaxReadEntriesNum());
     }
 
+    @Test
+    public void testGroupIdZooKeeperPath() {
+        String zkPathForKop = "/consumer_group_test";
+        KafkaServiceConfiguration configuration = new KafkaServiceConfiguration();
+        configuration.setGroupIdZooKeeperPath(zkPathForKop);
+        assertEquals("/consumer_group_test", configuration.getGroupIdZooKeeperPath());
+    }
+
     @Test
     public void testConfigurationUtilsStream() throws Exception {
         File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
2 changes: 1 addition & 1 deletion pom.xml
@@ -47,7 +47,7 @@
     <log4j2.version>2.13.3</log4j2.version>
     <lombok.version>1.18.4</lombok.version>
     <mockito.version>2.22.0</mockito.version>
-    <pulsar.version>2.6.2.0</pulsar.version>
+    <pulsar.version>2.6.2</pulsar.version>
     <slf4j.version>1.7.25</slf4j.version>
     <spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
     <testcontainers.version>1.12.5</testcontainers.version>
