From 07d7860899264b9e609c71c9f81c0197526bf98a Mon Sep 17 00:00:00 2001 From: Andrei Mikhailov Date: Fri, 22 Nov 2024 15:11:17 +0100 Subject: [PATCH] Allow to provide custom labels in metrics (#453) * allow to provide custom labels * code review suggestion --- .../metrics/KafkaMetricsCollector.scala | 115 ++++++++++-------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsCollector.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsCollector.scala index 62051a78..57c0f8a1 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsCollector.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsCollector.scala @@ -13,29 +13,34 @@ import java.util import scala.jdk.CollectionConverters._ import scala.util.matching.Regex -/** - * Prometheus collector for Kafka client metrics. - * - * Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this - * doesn't allow us to use the default push-based approach of Prometheus with passing an instance of `CollectorRegistry` - * to the code being observed. - * - * This class is a workaround for this problem. It's a Prometheus collector that evaluates a given - * `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`. - * - * Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus - * doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a description. - * See [[https://prometheus.io/docs/practices/naming/]] for more details. - * - * Example: - * {{{ - * val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry - * val consumer: Consumer[F, K, V] = ??? // your consumer - * val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics) - * collectorRegistry.register(collector) - * }}} - **/ -class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[ClientMetric[F]]], prefix: Option[String] = None) extends Collector { +/** Prometheus collector for Kafka client metrics. + * + * Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this + * doesn't allow us to use the default push-based approach of Prometheus with passing an instance of + * `CollectorRegistry` to the code being observed. + * + * This class is a workaround for this problem. It's a Prometheus collector that evaluates a given + * `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`. + * + * Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus + * doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a + * description. See [[https://prometheus.io/docs/practices/naming/]] for more details. + * + * Example: + * {{{ + * val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry + * val consumer: Consumer[F, K, V] = ??? // your consumer + * val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics) + * collectorRegistry.register(collector) + * }}} + */ +class KafkaMetricsCollector[F[_]: Monad: ToTry]( + kafkaClientMetrics: F[Seq[ClientMetric[F]]], + prefix: Option[String] = None, + customLabels: List[(String, String)] = List.empty, +) extends Collector { + + private val (customLabelsKeys, customLabelsValues) = customLabels.separate protected def getCollectorType(metric: ClientMetric[F]): Collector.Type = { // https://prometheus.io/docs/practices/naming/#metric-names @@ -45,7 +50,7 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client } private val MetricNameRegex: Regex = "[a-zA-Z_:][a-zA-Z0-9_:]*".r - private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r + private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r protected def getPrometheusName(metric: ClientMetric[F]): Option[String] = { (metric.name, metric.description) match { @@ -61,39 +66,47 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client override def collect(): util.List[MetricFamilySamples] = { for { - metrics <- kafkaClientMetrics + metrics <- kafkaClientMetrics metricsGroups = metrics.groupBy(m => (m.name, m.group)).values.toList - result <- metricsGroups.traverse { metricsGroup => - val metric = metricsGroup.head - val prometheusName = getPrometheusName(metric) - val collectorType = getCollectorType(metric) + result <- metricsGroups + .traverse { metricsGroup => + val metric = metricsGroup.head + val prometheusName = getPrometheusName(metric) + val collectorType = getCollectorType(metric) - prometheusName match { - case Some(name) => - metricsGroup.toVector.traverse { metric => - val tags = metric.tags.flatMap { - case (key, value) => - val prometheusKey = key.replaceAll("-", "_") - if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey)) (prometheusKey -> value).some else None - } - val tagsKeys = tags.keys.toList.asJava - val tagsValues = tags.values.toList.asJava - metric.value.map { - case v: Number => - new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some + prometheusName match { + case Some(name) => + metricsGroup + .toVector + .traverse { metric => + val tags = metric.tags.flatMap { + case (key, value) => + val prometheusKey = key.replaceAll("-", "_") + if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey)) + (prometheusKey -> value).some + else None + } + val tagsKeys = (tags.keys.toList ++ customLabelsKeys).asJava + val tagsValues = (tags.values.toList ++ customLabelsValues).asJava + metric.value.map { + case v: Number => + new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some - case _ => Option.empty[Sample] - } - }.map(_.flatten).map { - case samples if samples.nonEmpty => - new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some + case _ => Option.empty[Sample] + } + } + .map(_.flatten) + .map { + case samples if samples.nonEmpty => + new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some - case _ => Option.empty[MetricFamilySamples] - } + case _ => Option.empty[MetricFamilySamples] + } - case None => Option.empty[MetricFamilySamples].pure + case None => Option.empty[MetricFamilySamples].pure + } } - }.map(_.flatten.asJava) + .map(_.flatten.asJava) } yield result }.toTry.get }