
Commit

Merge pull request #2 from manjeetvk/master
Added code that parallelizes offset requests per broker to improve performance
wushujames authored Apr 28, 2018
2 parents 631e2fb + c5ca2c4 commit 89805ec
Showing 4 changed files with 274 additions and 19 deletions.
@@ -6,23 +6,21 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import kafka.admin.AdminClient;
import kafka.admin.AdminClient.ConsumerGroupSummary;
import kafka.admin.AdminClient.ConsumerSummary;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;


public class ConsumerGroupLag {

@@ -39,14 +37,16 @@ static public class PartitionState {

}

public static void main(String[] args) throws JsonProcessingException {
public static void main(String[] args) throws JsonProcessingException, OffsetFetchException {

String bootstrapServer;
String group;
boolean outputAsJson = false;
boolean includeStartOffset = false;
Long groupStabilizationTimeoutMs = 5000L;

Long waitForResponseTime = 1000L;


// parse command-line arguments to get bootstrap.servers and group.id
Options options = new Options();
try {
@@ -62,6 +62,10 @@ public static void main(String[] args) throws JsonProcessingException {
.argName("groupStabilizationTimeoutMs")
.desc("time (ms) to wait for the consumer group description to be avaialble (e.g. wait for a consumer group rebalance to complete)")
.build());
options.addOption(Option.builder("w").longOpt("response-wait-time")
.argName("waitForResponseTime")
.desc("time (ms) to wait for asking request response.")
.build());
CommandLine line = new DefaultParser().parse(options, args);
bootstrapServer = line.getOptionValue("bootstrap-server");
group = line.getOptionValue("group");
@@ -74,7 +78,6 @@ public static void main(String[] args) throws JsonProcessingException {
}



Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@@ -84,6 +87,7 @@ public static void main(String[] args) throws JsonProcessingException {
Properties props2 = new Properties();
props2.put("bootstrap.servers", bootstrapServer);
AdminClient ac = AdminClient.create(props);

/*
* {
* "topic-name" : {
@@ -101,7 +105,6 @@ public static void main(String[] args) throws JsonProcessingException {
* }
* }
*/
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Collection<TopicPartition> c = new ArrayList<TopicPartition>();


@@ -112,7 +115,7 @@ public static void main(String[] args) throws JsonProcessingException {
System.out.println();
System.exit(1);
}

if (!summary.state().equals("Stable")) {
// PreparingRebalance, AwaitingSync
System.err.format("Warning: Consumer group %s has state %s.", group, summary.state());
@@ -126,24 +129,35 @@ public static void main(String[] args) throws JsonProcessingException {
System.out.println();
System.exit(1);
}

Map<TopicPartition, ConsumerSummary> whoOwnsPartition = new HashMap<TopicPartition, ConsumerSummary>();
List<String> topicNamesList = new ArrayList<String>();

for (ConsumerSummary cs : csList) {
scala.collection.immutable.List<TopicPartition> scalaAssignment = cs.assignment();
List<TopicPartition> assignment = scala.collection.JavaConversions.seqAsJavaList(scalaAssignment);

for (TopicPartition tp : assignment) {
whoOwnsPartition.put(tp, cs);
topicNamesList.add(tp.topic());
}
c.addAll(assignment);
}

Map<TopicPartition, Long> endOffsets = consumer.endOffsets(c);
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(c);
// Get per-broker topic partition info based on the topic list
TopicPartitionsOffsetInfo topicPartitionsOffsetInfo = new TopicPartitionsOffsetInfo(ac);
topicPartitionsOffsetInfo.storeTopicPartitionPerNodeInfo(props, topicNamesList);

// Get end offset and beginning offset info for all topics
Map<TopicPartition, Long> endOffsets = topicPartitionsOffsetInfo.getEndOffsets();
Map<TopicPartition, Long> beginningOffsets = topicPartitionsOffsetInfo.getBeginningOffsets();

// Get committed offset info
topicPartitionsOffsetInfo.findCoordinatorNodeForGroup(group, waitForResponseTime);
Map<TopicPartition, PartitionData> commitedOffsets = topicPartitionsOffsetInfo.getCommitedOffsets(group, (List<TopicPartition>) c, waitForResponseTime);

Map<String, Map<Integer, PartitionState>> results = new HashMap<String, Map<Integer, PartitionState>>();
consumer.assign(c);

for (TopicPartition tp : c) {

if (!results.containsKey(tp.topic())) {
@@ -155,26 +169,31 @@ public static void main(String[] args) throws JsonProcessingException {
}
PartitionState partitionMap = topicMap.get(tp.partition());

OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
PartitionData topicPartitionCommitedOffset = commitedOffsets.get(tp);
// offset == -1 means no committed offset info is available
if (topicPartitionCommitedOffset.offset == -1) {
topicPartitionCommitedOffset = null;
}

long end = endOffsets.get(tp);
long begin = beginningOffsets.get(tp);

partitionMap.logStartOffset = begin;
partitionMap.logEndOffset = end;
partitionMap.partition = tp.partition();

if (offsetAndMetadata == null) {
if (topicPartitionCommitedOffset == null) {
// no committed offsets
partitionMap.currentOffset = "unknown";
} else {
partitionMap.currentOffset = offsetAndMetadata.offset();
partitionMap.currentOffset = topicPartitionCommitedOffset.offset;
}

if (offsetAndMetadata == null) {
if (topicPartitionCommitedOffset == null || end == -1) {
// no committed offset, or end offset unknown
partitionMap.lag = "unknown";
} else {
partitionMap.lag = end - offsetAndMetadata.offset();
partitionMap.lag = end - topicPartitionCommitedOffset.offset;
}
ConsumerSummary cs = whoOwnsPartition.get(tp);
partitionMap.consumerId = cs.consumerId();
@@ -210,6 +229,5 @@ public static void main(String[] args) throws JsonProcessingException {
}
}
System.exit(0);
}
}
}
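
For readers tracing the arithmetic in the loop above: per partition, currentOffset is the group's committed offset (or "unknown" when the fetch reports -1), and lag is the log end offset minus that committed offset. A tiny standalone illustration of that convention, as a hypothetical helper that is not part of the PR:

// Hypothetical helper mirroring the per-partition lag convention used above:
// -1 means "no information", in which case the lag is reported as unknown.
public class LagMathSketch {

    static Object lagFor(long logEndOffset, long committedOffset) {
        if (committedOffset == -1 || logEndOffset == -1) {
            return "unknown";
        }
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(lagFor(1500L, 1200L)); // prints 300
        System.out.println(lagFor(1500L, -1L));   // prints unknown
    }
}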
@@ -0,0 +1,35 @@
package com.wushujames.kafka;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;

public class KafkaApiRequest {

    private final ConsumerNetworkClient networkClient;
    private RequestFuture<ClientResponse> clientResponse = null;

    KafkaApiRequest(final ConsumerNetworkClient networkClient) {
        this.networkClient = networkClient;
    }

    public void sendApiRequest(final Node node, final AbstractRequest.Builder<?> requestBuilder) {
        this.clientResponse = this.networkClient.send(node, requestBuilder);
    }

    public AbstractResponse getLastApiResponse(final long waitTimeMsBetweenCheckingResponse) {

        while (!this.clientResponse.isDone()) {
            try {
                Thread.sleep(waitTimeMsBetweenCheckingResponse);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return this.clientResponse.value().responseBody();
    }
}
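
A note on the waiting loop in getLastApiResponse: ConsumerNetworkClient completes its response futures from poll() calls, so whichever thread uses this helper must also be driving the client's I/O. A minimal alternative sketch, assuming the internal ConsumerNetworkClient.poll(RequestFuture<?>) overload available in Kafka clients of this era, lets the client block until the future is done instead of sleep-polling; this is only an illustration of a method that could be added to the class above, not part of the PR:

    // Hypothetical variant of getLastApiResponse(): let the network client drive
    // I/O and block until the response future completes, instead of sleeping
    // between isDone() checks. Uses the internal poll(RequestFuture<?>) overload.
    public AbstractResponse getLastApiResponseBlocking() {
        this.networkClient.poll(this.clientResponse);
        return this.clientResponse.value().responseBody();
    }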

@@ -0,0 +1,7 @@
package com.wushujames.kafka;

public class OffsetFetchException extends Exception {
    public OffsetFetchException(String message) {
        super(message);
    }
}
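The fourth changed file, presumably the TopicPartitionsOffsetInfo class used above, is not rendered in this view, so the actual per-broker request code is not shown. As a rough illustration of the parallelization idea described in the commit message, and not the PR's implementation, partitions can be grouped by their current leader and each broker's offsets resolved concurrently. The sketch below sticks to the public KafkaConsumer API (the PR itself appears to use lower-level broker requests); the class and method names are hypothetical, and consumerProps is assumed to carry bootstrap servers and string deserializers as configured in the tool above:

// Hypothetical sketch only; the real TopicPartitionsOffsetInfo in this PR sends
// lower-level per-broker requests. This version illustrates the same idea with
// the public KafkaConsumer API: group partitions by leader, then look up end
// offsets for each broker's partitions in parallel.
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ParallelEndOffsetsSketch {

    public static Map<TopicPartition, Long> endOffsetsPerBroker(
            Properties consumerProps, Collection<String> topics)
            throws InterruptedException, ExecutionException {

        // Group partitions by the broker that currently leads them.
        Map<Node, List<TopicPartition>> byLeader = new HashMap<>();
        try (KafkaConsumer<String, String> metadataConsumer = new KafkaConsumer<>(consumerProps)) {
            for (String topic : topics) {
                for (PartitionInfo pi : metadataConsumer.partitionsFor(topic)) {
                    byLeader.computeIfAbsent(pi.leader(), n -> new ArrayList<>())
                            .add(new TopicPartition(pi.topic(), pi.partition()));
                }
            }
        }

        // One lookup task per broker. KafkaConsumer is not thread-safe, so each
        // task opens its own short-lived consumer.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, byLeader.size()));
        try {
            List<Future<Map<TopicPartition, Long>>> futures = new ArrayList<>();
            for (List<TopicPartition> group : byLeader.values()) {
                futures.add(pool.submit(() -> {
                    try (KafkaConsumer<String, String> c = new KafkaConsumer<>(consumerProps)) {
                        return c.endOffsets(group);
                    }
                }));
            }

            // Merge the per-broker results back into a single map.
            Map<TopicPartition, Long> endOffsets = new HashMap<>();
            for (Future<Map<TopicPartition, Long>> f : futures) {
                endOffsets.putAll(f.get());
            }
            return endOffsets;
        } finally {
            pool.shutdown();
        }
    }
}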
