Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Cg empty member topic #476

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ target
*.iml
application.conf
!examples/standalone/application.conf
.bloop
.metals
.vscode
project
81 changes: 68 additions & 13 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,28 @@ class KafkaClient private[kafkalagexporter] (
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
groupIdTopicMap <- getGroupTopicPartitionsMap(groupIds)

// Filters out groups with no active members, or with a topic that has no active members assigned.
noMemberGroups = groupDescriptions.asScala.filter { case (_, d) =>
d.members().isEmpty
d.members().isEmpty || hasTopicsWithoutActiveMembers(groupIdTopicMap, d)
}.keys
noMemberGroupsPartitionsInfo <- Future.sequence(
noMemberGroups.map(g => getGroupPartitionsInfo(g))
noMemberGroups.map(g => {
val desc = groupDescriptions.asScala.get(g)
val assignedTopics = desc
.map(d =>
d.members()
.asScala
.flatMap(
_.assignment().topicPartitions().asScala.map(_.topic())
)
.toSet
)
.getOrElse(Set.empty)

getGroupPartitionsInfo(g, assignedTopics)
})
)
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) =>
Expand All @@ -277,25 +294,63 @@ class KafkaClient private[kafkalagexporter] (
}
}

/** Retrieve the topic partitions map for consumer groups available in the
* cluster
*/
def getGroupTopicPartitionsMap(
consumerGroupIds: List[String]
): Future[Map[String, Set[String]]] = {
val futures =
consumerGroupIds.map(id => getTopicParitionByGroupId(id).map(id -> _))
Future.sequence(futures).map(_.toMap)
}

def getTopicParitionByGroupId(groupId: String): Future[Set[String]] = {
for {
offsetsMap <- adminClient.listConsumerGroupOffsets(groupId)
topicPartitions = offsetsMap.keySet.asScala.toSet
} yield topicPartitions.map(_.topic())
}

/** Check if a consumer group has topics without active members
*/
def hasTopicsWithoutActiveMembers(
groupIdTopicMap: Map[String, Set[String]],
groupDescription: ConsumerGroupDescription
): Boolean = {
val groupId = groupDescription.groupId()
val allTopics = groupIdTopicMap.getOrElse(groupId, Set.empty)
val assignedTopics = groupDescription
.members()
.asScala
.flatMap(_.assignment().topicPartitions().asScala.map(_.topic()))
.toSet

allTopics.exists(topic => !assignedTopics.contains(topic))
}

/** Retrieve partitions by consumer group ID. This is used in case when
* members info is unavailable for the group.
* members info is unavailable for the group or for certain topics.
*/
def getGroupPartitionsInfo(
groupId: String
groupId: String,
assignedTopics: Set[String]
): Future[List[Domain.GroupTopicPartition]] = {
for {
offsetsMap <- adminClient.listConsumerGroupOffsets(groupId)
topicPartitions = offsetsMap.keySet.asScala.toList
} yield topicPartitions.map(ktp =>
Domain.GroupTopicPartition(
groupId,
"unknown",
"unknown",
"unknown",
ktp.topic(),
ktp.partition()
} yield topicPartitions
.filter(ktp => !assignedTopics.contains(ktp.topic()))
.map(ktp =>
Domain.GroupTopicPartition(
groupId,
"unknown",
"unknown",
"unknown",
ktp.topic(),
ktp.partition()
)
)
)
}

private[kafkalagexporter] def getGroupIds(
Expand Down