diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 6e6cd04c..72efc902 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -58,7 +58,7 @@ object KafkaClient { private def createAdminClient(cluster: KafkaCluster, clientTimeout: FiniteDuration): AdminClient = { val props = new Properties() // AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs - props.putAll(cluster.adminClientProperties.asJava) + cluster.adminClientProperties foreach { case (k, v) => props.setProperty(k, v)} props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers) props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString) props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString) @@ -70,7 +70,7 @@ object KafkaClient { val props = new Properties() val deserializer = (new ByteArrayDeserializer).getClass.getName // KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs - props.putAll(cluster.consumerProperties.asJava) + cluster.consumerProperties foreach { case (k, v) => props.setProperty(k, v)} props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers) props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString) @@ -150,12 +150,32 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster, groups <- adminClient.listConsumerGroups() groupIds = getGroupIds(groups) groupDescriptions <- adminClient.describeConsumerGroups(groupIds) + noMemberGroups = groupDescriptions.asScala.filter{ case (_, d) => d.members().isEmpty}.keys + noMemberGroupsPartitionsInfo <- Future.sequence(noMemberGroups.map(g => getGroupPartitionsInfo(g))) } yield { val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList - (groupIds, gtps) + val gtpsNoMembers = noMemberGroupsPartitionsInfo.flatten + (groupIds, gtps ++ gtpsNoMembers) } } + /** + * Retrieve partitions by consumer group ID. This is used in case when members info is unavailable for the group. + */ + def getGroupPartitionsInfo(groupId: String): Future[List[Domain.GroupTopicPartition]] = { + for { + offsetsMap <- adminClient.listConsumerGroupOffsets(groupId) + topicPartitions = offsetsMap.keySet.asScala.toList + } yield topicPartitions.map(ktp => Domain.GroupTopicPartition( + groupId, + "unknown", + "unknown", + "unknown", + ktp.topic(), + ktp.partition() + )) + } + private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] = groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))