Commit ad9d39f: Implement consumer group whitelist & misc. refactorings
seglo committed Sep 22, 2019 (1 parent: 7063cca)
Showing 8 changed files with 243 additions and 146 deletions.
8 changes: 6 additions & 2 deletions README.md
@@ -212,17 +212,18 @@ General Configuration (`kafka-lag-exporter{}`)
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. E.g. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| `topic-whitelist` | `[".*"]` | Regex of topics monitored, e.g. `["input-.+", "output-.+"]` |

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

| Key | Default | Required | Description |
|---------------------------|-------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `name` | `""` | Yes | A unique cluster name for this Kafka connection detail object |
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `group-whitelist` | `[".*"]` | No | A list of regexes of consumer groups to monitor. For example, to expose only groups with `input` and `output` prefixes, use `["^input-.+", "^output-.+"]`. |
| `topic-whitelist` | `[".*"]` | No | A list of regexes of topics to monitor. For example, to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
| `labels` | `{}` | No | A map of key value pairs that will be set as additional custom labels per cluster for all the metrics in Prometheus. |

Watchers (`kafka-lag-exporter.watchers{}`)

@@ -242,6 +243,9 @@ kafka-lag-exporter {
{
name = "a-cluster"
bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
topic-whitelist = [
"widgets-.+"
]
consumer-properties = {
client.id = "consumer-client-id"
}
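
Note on the whitelist semantics: entries are regular expressions that must match the whole consumer group name, and a group is kept if it fully matches at least one pattern (see `getGroupIds` in `KafkaClient.scala` further down; the topic whitelist presumably follows the same rule). A minimal Scala sketch of that matching rule, using made-up group names:

  // Sketch of the whitelist rule: keep a name if it fully matches any regex in the list.
  val groupWhitelist = List("^input-.+", "^output-.+")

  def isWhitelisted(name: String, whitelist: List[String]): Boolean =
    whitelist.exists(regex => name.matches(regex))

  isWhitelisted("input-orders", groupWhitelist)  // true
  isWhitelisted("input", groupWhitelist)         // false: String.matches requires a full match
  isWhitelisted("billing-app", groupWhitelist)   // false
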
22 changes: 16 additions & 6 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -20,6 +20,22 @@ data:
{
name = "{{ $cluster.name }}"
bootstrap-brokers = "{{ $cluster.bootstrapBrokers }}"
{{- if $cluster.groupWhitelist }}
group-whitelist = [
{{- $lastIndex := sub (len $cluster.groupWhitelist) 1}}
{{- range $i, $whitelist := $cluster.groupWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
{{- if $cluster.topicWhitelist }}
topic-whitelist = [
{{- $lastIndex := sub (len $cluster.topicWhitelist) 1}}
{{- range $i, $whitelist := $cluster.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
consumer-properties = {
{{- range $key, $val := $cluster.consumerProperties }}
{{ $key }} = {{ quote $val }}
@@ -47,12 +63,6 @@ data:
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
topic-whitelist = [
{{- $lastIndex := sub (len .Values.topicWhitelist) 1}}
{{- range $i, $whitelist := .Values.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
}
akka {
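
The new template blocks render each whitelist as a list of quoted regexes joined by commas, with the `$lastIndex` check suppressing a comma after the final element. The intent, expressed as a rough Scala equivalent (the whitelist values here are hypothetical):

  // Rough Scala equivalent of the Helm template's quote-and-join logic, for illustration only.
  val groupWhitelist = List("^analytics-app-.+", "^billing-.+")        // hypothetical values
  val rendered = groupWhitelist.map(w => "\"" + w + "\"").mkString(", ")
  // rendered == "\"^analytics-app-.+\", \"^billing-.+\"", emitted inside group-whitelist = [ ... ]
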
17 changes: 4 additions & 13 deletions charts/kafka-lag-exporter/values.yaml
@@ -4,6 +4,10 @@ clusters: {}
#clusters:
# - name: "default"
# bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
# topicWhitelist:
# - "^xyz-corp-topics\..+"
# groupWhitelist:
# - "^analytics-app-.+"
# # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# # can be defined in this configuration section.
# # https://kafka.apache.org/documentation/#consumerconfigs
@@ -52,19 +56,6 @@ serviceAccount:
metricWhitelist:
- .*

## You can use regex to control the set of topics monitored.
## For example, if you only wish to expose only certain topics, use either:
## topicWhitelist:
## - ^topic.+
##
## Or
##
## topicWhitelist:
## - topic1
## - topic2
topicWhitelist:
- .*

## The log level of the ROOT logger
rootLogLevel: INFO
## The log level of Kafka Lag Exporter
1 change: 0 additions & 1 deletion src/main/resources/reference.conf
@@ -16,7 +16,6 @@ kafka-lag-exporter {
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
topic-whitelist = [".*"]
}

akka {
19 changes: 17 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -42,11 +42,18 @@ object AppConfig {
).toMap
}.getOrElse(Map.empty[String, String])

val topicWhitelist = c.getStringList("topic-whitelist").asScala.toList
val groupWhitelist = if (clusterConfig.hasPath("group-whitelist"))
clusterConfig.getStringList("group-whitelist").asScala.toList
else KafkaCluster.GroupWhitelistDefault

val topicWhitelist = if (clusterConfig.hasPath("topic-whitelist"))
clusterConfig.getStringList("topic-whitelist").asScala.toList
else KafkaCluster.TopicWhitelistDefault

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
groupWhitelist,
topicWhitelist,
consumerProperties,
adminClientProperties,
@@ -85,14 +92,22 @@
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhitelist: List[String] = List(".*"),
object KafkaCluster {
val GroupWhitelistDefault = List(".*")
val TopicWhitelistDefault = List(".*")
}

final case class KafkaCluster(name: String, bootstrapBrokers: String,
groupWhitelist: List[String] = KafkaCluster.GroupWhitelistDefault,
topicWhitelist: List[String] = KafkaCluster.TopicWhitelistDefault,
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
| Consumer group whitelist: [${groupWhitelist.mkString(", ")}]
| Topic whitelist: [${topicWhitelist.mkString(", ")}]
""".stripMargin
}
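
The parsing change follows the usual Typesafe Config pattern: read the per-cluster key when it is present, otherwise fall back to the defaults now defined on the `KafkaCluster` companion object. A small self-contained sketch of that behaviour (the config snippet is hypothetical):

  import com.typesafe.config.ConfigFactory
  import scala.collection.JavaConverters._

  // A hypothetical cluster config that omits group-whitelist.
  val clusterConfig = ConfigFactory.parseString(
    """
      |name = "a-cluster"
      |topic-whitelist = ["widgets-.+"]
    """.stripMargin)

  val groupWhitelist =
    if (clusterConfig.hasPath("group-whitelist"))
      clusterConfig.getStringList("group-whitelist").asScala.toList
    else List(".*")  // KafkaCluster.GroupWhitelistDefault

  // groupWhitelist == List(".*"): all consumer groups are monitored when no whitelist is set.
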
90 changes: 62 additions & 28 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit
import java.{lang, util}

import com.lightbend.kafkalagexporter.Domain.{GroupOffsets, PartitionOffsets}
import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaClientContract}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -28,8 +28,12 @@ object KafkaClient {
val CommonClientConfigRetryBackoffMs = 1000 // longer interval between retry attempts so we don't overload clusters (default = 100ms)
val ConsumerConfigAutoCommit = false

def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
new KafkaClient(cluster, groupId, clientTimeout)(ec)
def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract = {
val consumer = new ConsumerKafkaClient(createConsumerClient(cluster, groupId, clientTimeout), clientTimeout)
val adminKafkaClient = new AdminKafkaClient(createAdminClient(cluster, clientTimeout), clientTimeout)
new KafkaClient(cluster, consumer, adminKafkaClient)(ec)
}


trait KafkaClientContract {
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])]
@@ -64,7 +68,7 @@ object KafkaClient {
private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration): KafkaConsumer[Byte, Byte] = {
val props = new Properties()
val deserializer = (new ByteArrayDeserializer).getClass.getName
// https://kafka.apache.org/documentation/#consumerconfigs
// KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs
props.putAll(cluster.consumerProperties.asJava)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
@@ -85,37 +89,72 @@
private[kafkalagexporter] implicit class KafkaTopicPartitionOps(tp: Domain.TopicPartition) {
def asKafka: KafkaTopicPartition = new KafkaTopicPartition(tp.topic, tp.partition)
}

/**
* AdminClient wrapper. Encapsulates calls to `AdminClient`. This abstraction exists so the `AdminClient` can be
* mocked in tests because the various `*Result` types that are returned cannot be mocked.
*/
trait AdminKafkaClientContract {
def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]]
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]]
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]]
def close(): Unit
}

class AdminKafkaClient private[kafkalagexporter](client: AdminClient, clientTimeout: FiniteDuration)
(implicit ec: ExecutionContext) extends AdminKafkaClientContract {
private implicit val _clientTimeout: Duration = clientTimeout.toJava

private val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
private val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
private val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis.toInt)

def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] = kafkaFuture(client.listConsumerGroups(listGroupOptions).all())
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
kafkaFuture(client.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture(client.listConsumerGroupOffsets(group, listConsumerGroupsOptions).partitionsToOffsetAndMetadata())
def close(): Unit = client.close(_clientTimeout)
}

trait ConsumerKafkaClientContract {
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def close(): Unit
}

class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract {
private val _clientTimeout: Duration = clientTimeout.toJava

def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.endOffsets(partitions, _clientTimeout)
def close(): Unit = consumer.close(_clientTimeout)
}

}

class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groupId: String,
clientTimeout: FiniteDuration)
consumer: ConsumerKafkaClientContract,
adminClient: AdminKafkaClientContract)
(implicit ec: ExecutionContext) extends KafkaClientContract {
import KafkaClient._

private implicit val _clientTimeout: Duration = clientTimeout.toJava

private lazy val adminClient: AdminClient = createAdminClient(cluster, clientTimeout)
private lazy val consumer: KafkaConsumer[Byte,Byte] = createConsumerClient(cluster, groupId, clientTimeout)

private lazy val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
private lazy val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
private lazy val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis().toInt)

/**
* Get a list of consumer groups
*/
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])] = {
for {
groups <- kafkaFuture(adminClient.listConsumerGroups(listGroupOptions).all())
groupIds = groups.asScala.map(_.groupId()).toList
groupDescriptions <- kafkaFuture(adminClient.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
(groupIds, gtps)
}
}

private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))

private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
@@ -135,7 +174,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
* Get latest offsets for a set of topic partitions.
*/
def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try {
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava, _clientTimeout)
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava)
topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap
}

Expand All @@ -148,7 +187,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
val groupOffsetsF: Future[List[GroupOffsets]] = Future.sequence {
groups.map { group =>
val gtps = allGtps.filter(_.id == group)
getListConsumerGroupOffsets(group)
adminClient.listConsumerGroupOffsets(group)
.map(offsetMap => getGroupOffsets(now, gtps, offsetMap.asScala.toMap))
}
}
@@ -162,12 +201,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
* Call to `AdminClient` to get group offset info. This is only its own method so it can be mocked out in a test
* because it's not possible to instantiate or mock the `ListConsumerGroupOffsetsResult` type for some reason.
*/
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}


/**
* Backfill any group topic partitions with no offset as None
Expand All @@ -189,7 +223,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
} yield gtp -> Some(LookupTable.Point(offset, now))).toMap

def close(): Unit = {
adminClient.close(_clientTimeout)
consumer.close(_clientTimeout)
adminClient.close()
consumer.close()
}
}
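
Moving the raw clients behind `ConsumerKafkaClientContract` and `AdminKafkaClientContract` means tests can hand `KafkaClient` plain stubs instead of trying to mock the un-mockable `*Result` types. A rough sketch of what an admin-side stub could look like (the stub values are invented and this is not taken from the actual test suite):

  // Hypothetical test stub for AdminKafkaClientContract; values are illustrative only.
  import java.util
  import java.util.Collections
  import scala.concurrent.Future
  import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ConsumerGroupListing}
  import org.apache.kafka.clients.consumer.OffsetAndMetadata
  import org.apache.kafka.common.{TopicPartition => KafkaTopicPartition}
  import com.lightbend.kafkalagexporter.KafkaClient.AdminKafkaClientContract

  class StubAdminKafkaClient extends AdminKafkaClientContract {
    def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] =
      Future.successful(Collections.singletonList(new ConsumerGroupListing("group-a", true)))
    def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
      Future.successful(Collections.emptyMap[String, ConsumerGroupDescription]())
    def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
      Future.successful(Collections.emptyMap[KafkaTopicPartition, OffsetAndMetadata]())
    def close(): Unit = ()
  }
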
src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala
@@ -16,6 +16,8 @@ class AppConfigSpec extends FreeSpec with Matchers {
| {
| name = "clusterA"
| bootstrap-brokers = "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
| group-whitelist = ["group-a", "group-b"]
| topic-whitelist = ["topic-a", "topic-b"]
| consumer-properties = {
| client.id = "consumer-client-id"
| }
@@ -46,6 +48,8 @@
appConfig.clusters.length shouldBe 3
appConfig.clusters(0).name shouldBe "clusterA"
appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
appConfig.clusters(0).groupWhitelist shouldBe List("group-a", "group-b")
appConfig.clusters(0).topicWhitelist shouldBe List("topic-a", "topic-b")
appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id"
appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id"
appConfig.clusters(0).labels("environment") shouldBe "integration"
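
Clusters that omit the new keys should fall back to the companion-object defaults, which could be asserted along these lines (a hypothetical fragment, assuming one of the other configured clusters sets neither whitelist key):

  // Hypothetical assertions; clusters(1) stands for a cluster configured without whitelist keys.
  appConfig.clusters(1).groupWhitelist shouldBe KafkaCluster.GroupWhitelistDefault
  appConfig.clusters(1).topicWhitelist shouldBe KafkaCluster.TopicWhitelistDefault
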