Skip to content

Commit

Permalink
Allow to provide custom labels in metrics (#453)
Browse files Browse the repository at this point in the history
* allow to provide custom labels

* code review suggestion
  • Loading branch information
BrainHorse authored Nov 22, 2024
1 parent fadc2cc commit 07d7860
Showing 1 changed file with 64 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,34 @@ import java.util
import scala.jdk.CollectionConverters._
import scala.util.matching.Regex

/**
* Prometheus collector for Kafka client metrics.
*
* Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this
* doesn't allow us to use the default push-based approach of Prometheus with passing an instance of `CollectorRegistry`
* to the code being observed.
*
* This class is a workaround for this problem. It's a Prometheus collector that evaluates a given
* `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`.
*
* Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus
* doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a description.
* See [[https://prometheus.io/docs/practices/naming/]] for more details.
*
* Example:
* {{{
* val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry
* val consumer: Consumer[F, K, V] = ??? // your consumer
* val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics)
* collectorRegistry.register(collector)
* }}}
**/
class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[ClientMetric[F]]], prefix: Option[String] = None) extends Collector {
/** Prometheus collector for Kafka client metrics.
*
* Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this
* doesn't allow us to use the default push-based approach of Prometheus with passing an instance of
* `CollectorRegistry` to the code being observed.
*
* This class is a workaround for this problem. It's a Prometheus collector that evaluates a given
* `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`.
*
* Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus
* doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a
* description. See [[https://prometheus.io/docs/practices/naming/]] for more details.
*
* Example:
* {{{
* val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry
* val consumer: Consumer[F, K, V] = ??? // your consumer
* val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics)
* collectorRegistry.register(collector)
* }}}
*/
class KafkaMetricsCollector[F[_]: Monad: ToTry](
kafkaClientMetrics: F[Seq[ClientMetric[F]]],
prefix: Option[String] = None,
customLabels: List[(String, String)] = List.empty,
) extends Collector {

private val (customLabelsKeys, customLabelsValues) = customLabels.separate

protected def getCollectorType(metric: ClientMetric[F]): Collector.Type = {
// https://prometheus.io/docs/practices/naming/#metric-names
Expand All @@ -45,7 +50,7 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client
}

private val MetricNameRegex: Regex = "[a-zA-Z_:][a-zA-Z0-9_:]*".r
private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r
private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r

protected def getPrometheusName(metric: ClientMetric[F]): Option[String] = {
(metric.name, metric.description) match {
Expand All @@ -61,39 +66,47 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client

override def collect(): util.List[MetricFamilySamples] = {
for {
metrics <- kafkaClientMetrics
metrics <- kafkaClientMetrics
metricsGroups = metrics.groupBy(m => (m.name, m.group)).values.toList
result <- metricsGroups.traverse { metricsGroup =>
val metric = metricsGroup.head
val prometheusName = getPrometheusName(metric)
val collectorType = getCollectorType(metric)
result <- metricsGroups
.traverse { metricsGroup =>
val metric = metricsGroup.head
val prometheusName = getPrometheusName(metric)
val collectorType = getCollectorType(metric)

prometheusName match {
case Some(name) =>
metricsGroup.toVector.traverse { metric =>
val tags = metric.tags.flatMap {
case (key, value) =>
val prometheusKey = key.replaceAll("-", "_")
if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey)) (prometheusKey -> value).some else None
}
val tagsKeys = tags.keys.toList.asJava
val tagsValues = tags.values.toList.asJava
metric.value.map {
case v: Number =>
new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some
prometheusName match {
case Some(name) =>
metricsGroup
.toVector
.traverse { metric =>
val tags = metric.tags.flatMap {
case (key, value) =>
val prometheusKey = key.replaceAll("-", "_")
if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey))
(prometheusKey -> value).some
else None
}
val tagsKeys = (tags.keys.toList ++ customLabelsKeys).asJava
val tagsValues = (tags.values.toList ++ customLabelsValues).asJava
metric.value.map {
case v: Number =>
new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some

case _ => Option.empty[Sample]
}
}.map(_.flatten).map {
case samples if samples.nonEmpty =>
new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some
case _ => Option.empty[Sample]
}
}
.map(_.flatten)
.map {
case samples if samples.nonEmpty =>
new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some

case _ => Option.empty[MetricFamilySamples]
}
case _ => Option.empty[MetricFamilySamples]
}

case None => Option.empty[MetricFamilySamples].pure
case None => Option.empty[MetricFamilySamples].pure
}
}
}.map(_.flatten.asJava)
.map(_.flatten.asJava)
} yield result
}.toTry.get
}

0 comments on commit 07d7860

Please sign in to comment.