This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Implement consumer group whitelist & misc. refactorings #75

Merged
merged 1 commit on Sep 22, 2019
8 changes: 6 additions & 2 deletions README.md
@@ -212,17 +212,18 @@ General Configuration (`kafka-lag-exporter{}`)
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| `topic-whitelist` | `[".*"]` | Regex of topics monitored, e.g. `["input-.+", "output-.+"]` |

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

| Key | Default | Required | Description |
|---------------------------|-------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `name` | `""` | Yes | A unique cluster name for this Kafka connection detail object |
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `group-whitelist` | `[".*"]` | No | A list of regexes of consumer groups to monitor. For example, to expose only groups with `input` and `output` prefixes, use `["^input-.+", "^output-.+"]`. |
| `topic-whitelist` | `[".*"]` | No | A list of regexes of topics to monitor. For example, to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
| `labels` | `{}` | No | A map of key value pairs that will be set as additional custom labels per cluster on all metrics exposed to Prometheus. |

Watchers (`kafka-lag-exporters.watchers{}`)

@@ -242,6 +243,9 @@ kafka-lag-exporter {
{
name = "a-cluster"
bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
topic-whitelist = [
"widgets-.+"
]
consumer-properties = {
client.id = "consumer-client-id"
}
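To illustrate how the two per-cluster whitelist keys from the table above combine, here is a sketch of a cluster entry using both. It reuses the host names from the README example and the `^analytics-app-.+` regex from the chart's sample values; it is not taken verbatim from this diff.

```hocon
kafka-lag-exporter {
  clusters = [
    {
      name = "a-cluster"
      bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092"
      # only report consumer groups whose ids match one of these regexes
      group-whitelist = ["^analytics-app-.+"]
      # only report topics whose names match one of these regexes
      topic-whitelist = ["widgets-.+"]
    }
  ]
}
```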
22 changes: 16 additions & 6 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -20,6 +20,22 @@ data:
{
name = "{{ $cluster.name }}"
bootstrap-brokers = "{{ $cluster.bootstrapBrokers }}"
{{- if $cluster.groupWhitelist }}
group-whitelist = [
{{- $lastIndex := sub (len $cluster.groupWhitelist) 1}}
{{- range $i, $whitelist := $cluster.groupWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
{{- if $cluster.topicWhitelist }}
topic-whitelist = [
{{- $lastIndex := sub (len $cluster.topicWhitelist) 1}}
{{- range $i, $whitelist := $cluster.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
consumer-properties = {
{{- range $key, $val := $cluster.consumerProperties }}
{{ $key }} = {{ quote $val }}
@@ -47,12 +63,6 @@ data:
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
topic-whitelist = [
{{- $lastIndex := sub (len .Values.topicWhitelist) 1}}
{{- range $i, $whitelist := .Values.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
}

akka {
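For orientation only (not part of the diff): given a hypothetical cluster named `default` with `groupWhitelist: ["^analytics-app-.+"]` and `topicWhitelist: ["widgets-.+"]` in `values.yaml`, the template above renders a per-cluster block roughly like the following (remaining keys omitted).

```hocon
{
  name = "default"
  bootstrap-brokers = "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
  group-whitelist = [
    "^analytics-app-.+"
  ]
  topic-whitelist = [
    "widgets-.+"
  ]
}
```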
17 changes: 4 additions & 13 deletions charts/kafka-lag-exporter/values.yaml
@@ -4,6 +4,10 @@ clusters: {}
#clusters:
# - name: "default"
# bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
# topicWhitelist:
# - "^xyz-corp-topics\..+"
# groupWhitelist:
# - "^analytics-app-.+"
# # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# # can be defined in this configuration section.
# # https://kafka.apache.org/documentation/#consumerconfigs
@@ -52,19 +56,6 @@ serviceAccount:
metricWhitelist:
- .*

## You can use regex to control the set of topics monitored.
## For example, if you only wish to expose only certain topics, use either:
## topicWhitelist:
## - ^topic.+
##
## Or
##
## topicWhitelist:
## - topic1
## - topic2
topicWhitelist:
- .*

## The log level of the ROOT logger
rootLogLevel: INFO
## The log level of Kafka Lag Exporter
1 change: 0 additions & 1 deletion src/main/resources/reference.conf
@@ -16,7 +16,6 @@ kafka-lag-exporter {
strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
}
metric-whitelist = [".*"]
topic-whitelist = [".*"]
}

akka {
19 changes: 17 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -42,11 +42,18 @@ object AppConfig {
).toMap
}.getOrElse(Map.empty[String, String])

val topicWhitelist = c.getStringList("topic-whitelist").asScala.toList
val groupWhitelist = if (clusterConfig.hasPath("group-whitelist"))
clusterConfig.getStringList("group-whitelist").asScala.toList
else KafkaCluster.GroupWhitelistDefault

val topicWhitelist = if (clusterConfig.hasPath("topic-whitelist"))
clusterConfig.getStringList("topic-whitelist").asScala.toList
else KafkaCluster.TopicWhitelistDefault

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
groupWhitelist,
topicWhitelist,
consumerProperties,
adminClientProperties,
@@ -85,14 +92,22 @@
}
}

final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhitelist: List[String] = List(".*"),
object KafkaCluster {
val GroupWhitelistDefault = List(".*")
val TopicWhitelistDefault = List(".*")
}

final case class KafkaCluster(name: String, bootstrapBrokers: String,
groupWhitelist: List[String] = KafkaCluster.GroupWhitelistDefault,
topicWhitelist: List[String] = KafkaCluster.TopicWhitelistDefault,
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
| Consumer group whitelist: [${groupWhitelist.mkString(", ")}]
| Topic whitelist: [${topicWhitelist.mkString(", ")}]
""".stripMargin
}
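A minimal sketch of how the new defaults behave from the caller's point of view, assuming the `KafkaCluster` case class and companion object defined above; the cluster names and broker addresses are placeholders.

```scala
import com.lightbend.kafkalagexporter.KafkaCluster

object WhitelistDefaultsSketch extends App {
  // Omitting both whitelists falls back to the permissive ".*" defaults,
  // so every group and topic on the cluster is monitored.
  val permissive = KafkaCluster("clusterA", "broker-1:9092")
  assert(permissive.groupWhitelist == KafkaCluster.GroupWhitelistDefault)
  assert(permissive.topicWhitelist == KafkaCluster.TopicWhitelistDefault)

  // Explicit lists narrow monitoring to matching groups and topics only.
  val restricted = KafkaCluster(
    "clusterB",
    "broker-2:9092",
    groupWhitelist = List("^analytics-app-.+"),
    topicWhitelist = List("widgets-.+"))
  println(restricted)
}
```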
90 changes: 62 additions & 28 deletions src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit
import java.{lang, util}

import com.lightbend.kafkalagexporter.Domain.{GroupOffsets, PartitionOffsets}
import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaClientContract}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -28,8 +28,12 @@ object KafkaClient {
val CommonClientConfigRetryBackoffMs = 1000 // longer interval between retry attempts so we don't overload clusters (default = 100ms)
val ConsumerConfigAutoCommit = false

def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
new KafkaClient(cluster, groupId, clientTimeout)(ec)
def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract = {
val consumer = new ConsumerKafkaClient(createConsumerClient(cluster, groupId, clientTimeout), clientTimeout)
val adminKafkaClient = new AdminKafkaClient(createAdminClient(cluster, clientTimeout), clientTimeout)
new KafkaClient(cluster, consumer, adminKafkaClient)(ec)
}


trait KafkaClientContract {
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])]
@@ -64,7 +68,7 @@ object KafkaClient {
private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration): KafkaConsumer[Byte, Byte] = {
val props = new Properties()
val deserializer = (new ByteArrayDeserializer).getClass.getName
// https://kafka.apache.org/documentation/#consumerconfigs
// KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs
props.putAll(cluster.consumerProperties.asJava)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
@@ -85,37 +89,72 @@
private[kafkalagexporter] implicit class KafkaTopicPartitionOps(tp: Domain.TopicPartition) {
def asKafka: KafkaTopicPartition = new KafkaTopicPartition(tp.topic, tp.partition)
}

/**
* AdminClient wrapper. Encapsulates calls to `AdminClient`. This abstraction exists so the `AdminClient` can be
* mocked in tests because the various `*Result` types that are returned cannot be mocked.
*/
trait AdminKafkaClientContract {
def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]]
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]]
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]]
def close(): Unit
}

class AdminKafkaClient private[kafkalagexporter](client: AdminClient, clientTimeout: FiniteDuration)
(implicit ec: ExecutionContext) extends AdminKafkaClientContract {
private implicit val _clientTimeout: Duration = clientTimeout.toJava

private val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
private val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
private val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis.toInt)

def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] = kafkaFuture(client.listConsumerGroups(listGroupOptions).all())
def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
kafkaFuture(client.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture(client.listConsumerGroupOffsets(group, listConsumerGroupsOptions).partitionsToOffsetAndMetadata())
def close(): Unit = client.close(_clientTimeout)
}

trait ConsumerKafkaClientContract {
def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
def close(): Unit
}

class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract {
private val _clientTimeout: Duration = clientTimeout.toJava

def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
consumer.endOffsets(partitions, _clientTimeout)
def close(): Unit = consumer.close(_clientTimeout)
}

}

class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
groupId: String,
clientTimeout: FiniteDuration)
consumer: ConsumerKafkaClientContract,
adminClient: AdminKafkaClientContract)
(implicit ec: ExecutionContext) extends KafkaClientContract {
import KafkaClient._

private implicit val _clientTimeout: Duration = clientTimeout.toJava

private lazy val adminClient: AdminClient = createAdminClient(cluster, clientTimeout)
private lazy val consumer: KafkaConsumer[Byte,Byte] = createConsumerClient(cluster, groupId, clientTimeout)

private lazy val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
private lazy val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
private lazy val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis().toInt)

/**
* Get a list of consumer groups
*/
def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])] = {
for {
groups <- kafkaFuture(adminClient.listConsumerGroups(listGroupOptions).all())
groupIds = groups.asScala.map(_.groupId()).toList
groupDescriptions <- kafkaFuture(adminClient.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
groups <- adminClient.listConsumerGroups()
groupIds = getGroupIds(groups)
groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
} yield {
val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
(groupIds, gtps)
}
}

private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))

private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
@@ -135,7 +174,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
* Get latest offsets for a set of topic partitions.
*/
def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try {
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava, _clientTimeout)
val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava)
topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap
}

Expand All @@ -148,7 +187,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
val groupOffsetsF: Future[List[GroupOffsets]] = Future.sequence {
groups.map { group =>
val gtps = allGtps.filter(_.id == group)
getListConsumerGroupOffsets(group)
adminClient.listConsumerGroupOffsets(group)
.map(offsetMap => getGroupOffsets(now, gtps, offsetMap.asScala.toMap))
}
}
@@ -162,12 +201,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
* Call to `AdminClient` to get group offset info. This is only its own method so it can be mocked out in a test
* because it's not possible to instantiate or mock the `ListConsumerGroupOffsetsResult` type for some reason.
*/
private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
kafkaFuture {
adminClient
.listConsumerGroupOffsets(group, listConsumerGroupsOptions)
.partitionsToOffsetAndMetadata()
}


/**
* Backfill any group topic partitions with no offset as None
Expand All @@ -189,7 +223,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
} yield gtp -> Some(LookupTable.Point(offset, now))).toMap

def close(): Unit = {
adminClient.close(_clientTimeout)
consumer.close(_clientTimeout)
adminClient.close()
consumer.close()
}
}
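The comment on `AdminKafkaClientContract` above motivates the refactoring: the `AdminClient` calls are wrapped so that tests can supply a fake. A sketch of what such a test double could look like (this stub is illustrative, not part of the PR):

```scala
import java.util
import scala.collection.JavaConverters._
import scala.concurrent.Future

import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ConsumerGroupListing}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{TopicPartition => KafkaTopicPartition}

import com.lightbend.kafkalagexporter.KafkaClient.AdminKafkaClientContract

// Returns canned consumer group listings; the describe/offsets calls return empty maps.
class StubAdminKafkaClient(groups: List[String]) extends AdminKafkaClientContract {
  def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] =
    Future.successful(groups.map(id => new ConsumerGroupListing(id, true)).asJava)

  def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
    Future.successful(new util.HashMap[String, ConsumerGroupDescription]())

  def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
    Future.successful(new util.HashMap[KafkaTopicPartition, OffsetAndMetadata]())

  def close(): Unit = ()
}
```

A test in the `com.lightbend.kafkalagexporter` package (the constructor is package-private) could then build `new KafkaClient(cluster, stubConsumer, stubAdmin)` directly, without touching a real broker.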
@@ -16,6 +16,8 @@ class AppConfigSpec extends FreeSpec with Matchers {
| {
| name = "clusterA"
| bootstrap-brokers = "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
| group-whitelist = ["group-a", "group-b"]
| topic-whitelist = ["topic-a", "topic-b"]
| consumer-properties = {
| client.id = "consumer-client-id"
| }
@@ -46,6 +48,8 @@ class AppConfigSpec extends FreeSpec with Matchers {
appConfig.clusters.length shouldBe 3
appConfig.clusters(0).name shouldBe "clusterA"
appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
appConfig.clusters(0).groupWhitelist shouldBe List("group-a", "group-b")
appConfig.clusters(0).topicWhitelist shouldBe List("topic-a", "topic-b")
appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id"
appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id"
appConfig.clusters(0).labels("environment") shouldBe "integration"