From 11afde35c94a4bde1210082e26bf7f05e87f06cd Mon Sep 17 00:00:00 2001 From: Gor Stepanyan Date: Thu, 20 Jul 2023 16:42:41 +0200 Subject: [PATCH 1/2] .gitignore: Ignore cached files from IDE's --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 40eb33b5..cd937260 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,7 @@ target *.iml application.conf !examples/standalone/application.conf +.bloop +.metals +.vscode +project From ee8d9d819e15926ce20fe579996359b787aa3d83 Mon Sep 17 00:00:00 2001 From: Gor Stepanyan Date: Wed, 19 Jul 2023 13:55:55 +0200 Subject: [PATCH 2/2] KafkaClient: Collect metrics for ConsumerGroup topics without active members Currently, lags are calculated for consumer groups that have no active members at all. If a group consumes from multiple topics and certain topic does not have active members, lag exporter should also collect metrics for it. --- .../kafkalagexporter/KafkaClient.scala | 81 ++++++++++++++++--- 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index aa7e6279..9a796c32 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -262,11 +262,28 @@ class KafkaClient private[kafkalagexporter] ( groups <- adminClient.listConsumerGroups() groupIds = getGroupIds(groups) groupDescriptions <- adminClient.describeConsumerGroups(groupIds) + groupIdTopicMap <- getGroupTopicPartitionsMap(groupIds) + + // Filters out groups with no active members, or with a topic that has no active members assigned. noMemberGroups = groupDescriptions.asScala.filter { case (_, d) => - d.members().isEmpty + d.members().isEmpty || hasTopicsWithoutActiveMembers(groupIdTopicMap, d) }.keys noMemberGroupsPartitionsInfo <- Future.sequence( - noMemberGroups.map(g => getGroupPartitionsInfo(g)) + noMemberGroups.map(g => { + val desc = groupDescriptions.asScala.get(g) + val assignedTopics = desc + .map(d => + d.members() + .asScala + .flatMap( + _.assignment().topicPartitions().asScala.map(_.topic()) + ) + .toSet + ) + .getOrElse(Set.empty) + + getGroupPartitionsInfo(g, assignedTopics) + }) ) } yield { val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => @@ -277,25 +294,63 @@ class KafkaClient private[kafkalagexporter] ( } } + /** Retrieve the topic partitions map for consumer groups available in the + * cluster + */ + def getGroupTopicPartitionsMap( + consumerGroupIds: List[String] + ): Future[Map[String, Set[String]]] = { + val futures = + consumerGroupIds.map(id => getTopicParitionByGroupId(id).map(id -> _)) + Future.sequence(futures).map(_.toMap) + } + + def getTopicParitionByGroupId(groupId: String): Future[Set[String]] = { + for { + offsetsMap <- adminClient.listConsumerGroupOffsets(groupId) + topicPartitions = offsetsMap.keySet.asScala.toSet + } yield topicPartitions.map(_.topic()) + } + + /** Check if a consumer group has topics without active members + */ + def hasTopicsWithoutActiveMembers( + groupIdTopicMap: Map[String, Set[String]], + groupDescription: ConsumerGroupDescription + ): Boolean = { + val groupId = groupDescription.groupId() + val allTopics = groupIdTopicMap.getOrElse(groupId, Set.empty) + val assignedTopics = groupDescription + .members() + .asScala + .flatMap(_.assignment().topicPartitions().asScala.map(_.topic())) + .toSet + + allTopics.exists(topic => !assignedTopics.contains(topic)) + } + /** Retrieve partitions by consumer group ID. This is used in case when - * members info is unavailable for the group. + * members info is unavailable for the group or for certain topics. */ def getGroupPartitionsInfo( - groupId: String + groupId: String, + assignedTopics: Set[String] ): Future[List[Domain.GroupTopicPartition]] = { for { offsetsMap <- adminClient.listConsumerGroupOffsets(groupId) topicPartitions = offsetsMap.keySet.asScala.toList - } yield topicPartitions.map(ktp => - Domain.GroupTopicPartition( - groupId, - "unknown", - "unknown", - "unknown", - ktp.topic(), - ktp.partition() + } yield topicPartitions + .filter(ktp => !assignedTopics.contains(ktp.topic())) + .map(ktp => + Domain.GroupTopicPartition( + groupId, + "unknown", + "unknown", + "unknown", + ktp.topic(), + ktp.partition() + ) ) - ) } private[kafkalagexporter] def getGroupIds(