From 5235643a196a6cdb3b01eb3030abc561cf5463ff Mon Sep 17 00:00:00 2001 From: lilyevsky Date: Tue, 3 Mar 2020 15:07:21 -0500 Subject: [PATCH 1/2] Support consumer groups for which member information is unavailable. Also minor fix in setting properties, KafkaClient.scala lines 61 and 73: the documentation for java.util.Properties suggests not to use putAll method because it is not type-safe (in fact, in my environment those two lines did not even compile). They suggest using setProperty method. --- .../kafkalagexporter/KafkaClient.scala | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 6e6cd04c..7b54e334 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -58,7 +58,7 @@ object KafkaClient { private def createAdminClient(cluster: KafkaCluster, clientTimeout: FiniteDuration): AdminClient = { val props = new Properties() // AdminClient config: https://kafka.apache.org/documentation/#adminclientconfigs - props.putAll(cluster.adminClientProperties.asJava) + cluster.adminClientProperties foreach { case (k, v) => props.setProperty(k, v)} props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers) props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout.toMillis.toString) props.put(AdminClientConfig.RETRIES_CONFIG, AdminClientConfigRetries.toString) @@ -70,7 +70,7 @@ object KafkaClient { val props = new Properties() val deserializer = (new ByteArrayDeserializer).getClass.getName // KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs - props.putAll(cluster.consumerProperties.asJava) + cluster.consumerProperties foreach { case (k, v) => props.setProperty(k, v)} props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers) props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfigAutoCommit.toString) @@ -150,12 +150,34 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster, groups <- adminClient.listConsumerGroups() groupIds = getGroupIds(groups) groupDescriptions <- adminClient.describeConsumerGroups(groupIds) + noMemberGroups = groupDescriptions.asScala.filter{ case (_, d) => d.members().isEmpty}.keys + noMemberGroupsPartitionsInfo <- Future.sequence(noMemberGroups.map(g => getGroupPartitionsInfo(g))) } yield { val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList - (groupIds, gtps) + val gtpsNoMembers = noMemberGroupsPartitionsInfo.flatten + (groupIds, gtps ++ gtpsNoMembers) } } + /** + * Retrieve partitions by consumer group ID. This is used in case when members info is unavailable for the group. + * @param groupId + * @return + */ + def getGroupPartitionsInfo(groupId: String): Future[List[Domain.GroupTopicPartition]] = { + for { + offsetsMap <- adminClient.listConsumerGroupOffsets(groupId) + topicPartitions = offsetsMap.keySet.asScala.toList + } yield topicPartitions.map(ktp => Domain.GroupTopicPartition( + groupId, + "unknown", + "unknown", + "unknown", + ktp.topic(), + ktp.partition() + )) + } + private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] = groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r))) From 3f3672a4e647b7ec27f9dfadff140b958409d6a4 Mon Sep 17 00:00:00 2001 From: lilyevsky Date: Mon, 9 Mar 2020 09:54:59 -0400 Subject: [PATCH 2/2] Remove two comment lines --- src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 7b54e334..72efc902 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -161,8 +161,6 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster, /** * Retrieve partitions by consumer group ID. This is used in case when members info is unavailable for the group. - * @param groupId - * @return */ def getGroupPartitionsInfo(groupId: String): Future[List[Domain.GroupTopicPartition]] = { for {