Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Support consumer groups for which member information is unavailable. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lilyevsky authored May 8, 2020
1 parent a627b9f commit b48d5ba
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object KafkaClient {
private def createAdminClient(cluster: KafkaCluster, clientTimeout: FiniteDuration): AdminClient = {
val props = new Properties()
// AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs
props.putAll(cluster.adminClientProperties.asJava)
cluster.adminClientProperties foreach { case (k, v) => props.setProperty(k, v)}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString)
props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString)
Expand All @@ -70,7 +70,7 @@ object KafkaClient {
val props = new Properties()
val deserializer = (new ByteArrayDeserializer).getClass.getName
// KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs
props.putAll(cluster.consumerProperties.asJava)
cluster.consumerProperties foreach { case (k, v) => props.setProperty(k, v)}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString)
Expand Down Expand Up @@ -150,12 +150,32 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
noMemberGroups = groupDescriptions.asScala.filter{ case (_, d) => d.members().isEmpty}.keys
noMemberGroupsPartitionsInfo <- Future.sequence(noMemberGroups.map(g => getGroupPartitionsInfo(g)))
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
(groupIds, gtps)
val gtpsNoMembers = noMemberGroupsPartitionsInfo.flatten
(groupIds, gtps ++ gtpsNoMembers)
}
}

/**
* Retrieve partitions by consumer group ID. This is used in case when members info is unavailable for the group.
*/
def getGroupPartitionsInfo(groupId: String): Future[List[Domain.GroupTopicPartition]] = {
for {
offsetsMap <- adminClient.listConsumerGroupOffsets(groupId)
topicPartitions = offsetsMap.keySet.asScala.toList
} yield topicPartitions.map(ktp => Domain.GroupTopicPartition(
groupId,
"unknown",
"unknown",
"unknown",
ktp.topic(),
ktp.partition()
))
}

private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))

Expand Down

0 comments on commit b48d5ba

Please sign in to comment.