Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
🐛 handle simple comsumers
Browse files Browse the repository at this point in the history
  • Loading branch information
yazgoo committed Feb 10, 2020
1 parent 0371e33 commit 270c846
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ object KafkaClient {
def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]]
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]]
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]]
def listTopicsNames(): Future[util.Set[String]]
def describeTopics(topicNames: util.Collection[String]): Future[util.Map[String,TopicDescription]]
def close(): Unit
}

Expand All @@ -111,8 +113,10 @@ object KafkaClient {
private val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis.toInt)

def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] = kafkaFuture(client.listConsumerGroups(listGroupOptions).all())
def listTopicsNames(): Future[util.Set[String]] = kafkaFuture(client.listTopics().names())
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
kafkaFuture(client.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
def describeTopics(topicNames: util.Collection[String]) = kafkaFuture(client.describeTopics(topicNames).all())
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture(client.listConsumerGroupOffsets(group, listConsumerGroupsOptions).partitionsToOffsetAndMetadata())
def close(): Unit = client.close(_clientTimeout)
Expand Down Expand Up @@ -150,15 +154,44 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
top <- adminClient.listTopicsNames()
descriptions <- adminClient.describeTopics(top)
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) =>
if(desc.members().isEmpty()) {
descriptions.asScala.flatMap { case (_, description) =>
groupTopicPartitionsForSimpleConsumer(id, description.partitions().size(), description.name())
}
}
else {
groupTopicPartitions(id, desc)
}
}.toList
(groupIds, gtps)
}
}

private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))

private[kafkalagexporter] def groupTopicPartitionsForSimpleConsumer(groupId: String, numPartitions: Int, topic: String): List[Domain.GroupTopicPartition] = {
if (cluster.topicWhitelist.exists(r => topic.matches(r)) && !cluster.topicBlacklist.exists(r => topic.matches(r))) {
Seq.range(0, numPartitions - 1).map { i =>
Domain.GroupTopicPartition(
groupId,
s"client-${i}",
s"consumer-${i}",
s"host-${i}",
topic,
i
)
}.toList
}
else {
Nil
}
}

private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
Expand Down

0 comments on commit 270c846

Please sign in to comment.