Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Support consumer groups for which member information is unavailable. #128

Merged
merged 2 commits into from
May 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object KafkaClient {
private def createAdminClient(cluster: KafkaCluster, clientTimeout: FiniteDuration): AdminClient = {
val props = new Properties()
// AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs
props.putAll(cluster.adminClientProperties.asJava)
cluster.adminClientProperties foreach { case (k, v) => props.setProperty(k, v)}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString)
props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString)
Expand All @@ -70,7 +70,7 @@ object KafkaClient {
val props = new Properties()
val deserializer = (new ByteArrayDeserializer).getClass.getName
// KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs
props.putAll(cluster.consumerProperties.asJava)
cluster.consumerProperties foreach { case (k, v) => props.setProperty(k, v)}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString)
Expand Down Expand Up @@ -150,12 +150,32 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
noMemberGroups = groupDescriptions.asScala.filter{ case (_, d) => d.members().isEmpty}.keys
noMemberGroupsPartitionsInfo <- Future.sequence(noMemberGroups.map(g => getGroupPartitionsInfo(g)))
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
(groupIds, gtps)
val gtpsNoMembers = noMemberGroupsPartitionsInfo.flatten
(groupIds, gtps ++ gtpsNoMembers)
}
}

/**
* Retrieve partitions by consumer group ID. This is used in case when members info is unavailable for the group.
*/
def getGroupPartitionsInfo(groupId: String): Future[List[Domain.GroupTopicPartition]] = {
for {
offsetsMap <- adminClient.listConsumerGroupOffsets(groupId)
topicPartitions = offsetsMap.keySet.asScala.toList
} yield topicPartitions.map(ktp => Domain.GroupTopicPartition(
groupId,
"unknown",
"unknown",
"unknown",
ktp.topic(),
ktp.partition()
))
}

private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))

Expand Down