Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to provide custom labels in metrics #453

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,32 @@ import java.util
import scala.jdk.CollectionConverters._
import scala.util.matching.Regex

/**
* Prometheus collector for Kafka client metrics.
*
* Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this
* doesn't allow us to use the default push-based approach of Prometheus with passing an instance of `CollectorRegistry`
* to the code being observed.
*
* This class is a workaround for this problem. It's a Prometheus collector that evaluates a given
* `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`.
*
* Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus
* doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a description.
* See [[https://prometheus.io/docs/practices/naming/]] for more details.
*
* Example:
* {{{
* val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry
* val consumer: Consumer[F, K, V] = ??? // your consumer
* val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics)
* collectorRegistry.register(collector)
* }}}
**/
class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[ClientMetric[F]]], prefix: Option[String] = None) extends Collector {
/** Prometheus collector for Kafka client metrics.
*
* Metrics from kafka-clients are pull-based (meaning we have to manually call methods that return metrics), this
* doesn't allow us to use the default push-based approach of Prometheus with passing an instance of
* `CollectorRegistry` to the code being observed.
*
* This class is a workaround for this problem. It's a Prometheus collector that evaluates a given
* `F[Seq[ClientMetric[F]]]` on each `collect()` call and returns the result as a list of `MetricFamilySamples`.
*
* Please note it ignores metrics and labels containing some symbols (like '-' and '.'). This is because Prometheus
* doesn't allow such symbols in metric names and label names. Also, it ignores metrics without a name or a
* description. See [[https://prometheus.io/docs/practices/naming/]] for more details.
*
* Example:
* {{{
* val collectorRegistry: CollectorRegistry = ??? // your Prometheus Java collector registry
* val consumer: Consumer[F, K, V] = ??? // your consumer
* val collector: KafkaMetricsCollector[F] = new KafkaMetricsCollector[F](consumer.clientMetrics)
* collectorRegistry.register(collector)
* }}}
*/
class KafkaMetricsCollector[F[_]: Monad: ToTry](
kafkaClientMetrics: F[Seq[ClientMetric[F]]],
prefix: Option[String] = None,
customLabels: List[(String, String)] = List.empty,
) extends Collector {

protected def getCollectorType(metric: ClientMetric[F]): Collector.Type = {
// https://prometheus.io/docs/practices/naming/#metric-names
Expand All @@ -45,7 +48,7 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client
}

private val MetricNameRegex: Regex = "[a-zA-Z_:][a-zA-Z0-9_:]*".r
private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r
private val LabelNameRegex: Regex = "[a-zA-Z_][a-zA-Z0-9_]*".r

protected def getPrometheusName(metric: ClientMetric[F]): Option[String] = {
(metric.name, metric.description) match {
Expand All @@ -61,39 +64,48 @@ class KafkaMetricsCollector[F[_]: Monad: ToTry](kafkaClientMetrics: F[Seq[Client

override def collect(): util.List[MetricFamilySamples] = {
for {
metrics <- kafkaClientMetrics
metrics <- kafkaClientMetrics
metricsGroups = metrics.groupBy(m => (m.name, m.group)).values.toList
result <- metricsGroups.traverse { metricsGroup =>
val metric = metricsGroup.head
val prometheusName = getPrometheusName(metric)
val collectorType = getCollectorType(metric)
result <- metricsGroups
.traverse { metricsGroup =>
val metric = metricsGroup.head
val prometheusName = getPrometheusName(metric)
val collectorType = getCollectorType(metric)

prometheusName match {
case Some(name) =>
metricsGroup.toVector.traverse { metric =>
val tags = metric.tags.flatMap {
case (key, value) =>
val prometheusKey = key.replaceAll("-", "_")
if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey)) (prometheusKey -> value).some else None
}
val tagsKeys = tags.keys.toList.asJava
val tagsValues = tags.values.toList.asJava
metric.value.map {
case v: Number =>
new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some
prometheusName match {
case Some(name) =>
metricsGroup
.toVector
.traverse { metric =>
val tags = metric.tags.flatMap {
case (key, value) =>
val prometheusKey = key.replaceAll("-", "_")
if (LabelNameRegex.findFirstIn(prometheusKey).contains(prometheusKey))
(prometheusKey -> value).some
else None
}
val (customLabelsKeys, customLabelsValues) = customLabels.separate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like it doesn't have to be called on each collect

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, changed

val tagsKeys = (tags.keys.toList ++ customLabelsKeys).asJava
val tagsValues = (tags.values.toList ++ customLabelsValues).asJava
metric.value.map {
case v: Number =>
new Sample(name, tagsKeys, tagsValues, v.doubleValue()).some

case _ => Option.empty[Sample]
}
}.map(_.flatten).map {
case samples if samples.nonEmpty =>
new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some
case _ => Option.empty[Sample]
}
}
.map(_.flatten)
.map {
case samples if samples.nonEmpty =>
new MetricFamilySamples(name, collectorType, metric.description, samples.asJava).some

case _ => Option.empty[MetricFamilySamples]
}
case _ => Option.empty[MetricFamilySamples]
}

case None => Option.empty[MetricFamilySamples].pure
case None => Option.empty[MetricFamilySamples].pure
}
}
}.map(_.flatten.asJava)
.map(_.flatten.asJava)
} yield result
}.toTry.get
}
Loading