diff --git a/README.md b/README.md index 3bc45108..971bee08 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ import ConsumerMetricsOf.* val config: ConsumerConfig = ??? val registry: CollectorRegistry = ??? // Prometheus client val metrics0: ConsumerMetrics[IO] = ??? -val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics("my-app", registry) +val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics(identity, registry) ConsumerOf .apply1(metrics1.some) .apply(config) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 88a68199..859b4481 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -1,9 +1,8 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} -import cats.syntax.all.* import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector import io.prometheus.client.CollectorRegistry @@ -13,7 +12,7 @@ object ConsumerMetricsOf { def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ConsumerMetrics[F], - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry ): ConsumerMetrics[F] = new ConsumerMetrics[F] { @@ -30,8 +29,11 @@ object ConsumerMetricsOf { override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) - override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, prefix.some) + override def exposeJavaMetrics[K, V]( + consumer: Consumer[F, K, V], + clientId: Option[ClientId] + ): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix)) Resource.make { Sync[F].delay { prometheus.register(collector) } } { _ => @@ -44,7 +46,7 @@ object ConsumerMetricsOf { implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { def exposeJavaClientMetrics( - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry )(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index d71dd852..9553186a 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -1,9 +1,8 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} -import cats.syntax.all.* import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.{ClientId, Topic} import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector import io.prometheus.client.CollectorRegistry @@ -13,7 +12,7 @@ object ProducerMetricsOf { def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ProducerMetrics[F], - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry ): ProducerMetrics[F] = new ProducerMetrics[F] { @@ -37,8 +36,8 @@ object ProducerMetricsOf { override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](producer.clientMetrics, prefix.some) + override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = { + val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix)) Resource.make { Sync[F].delay { prometheus.register(collector) } } { _ => @@ -51,7 +50,7 @@ object ProducerMetricsOf { implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { def exposeJavaClientMetrics( - prefix: String, + prefix: ClientId => String, prometheus: CollectorRegistry )(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index a2d38504..3c30488f 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -22,8 +22,10 @@ trait ConsumerMetrics[F[_]] { def topics(latency: FiniteDuration): F[Unit] - private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = - Resource.unit[F] + private[consumer] def exposeJavaMetrics[K, V]( + @nowarn consumer: Consumer[F, K, V], + @nowarn clientId: Option[ClientId], + ): Resource[F, Unit] = Resource.unit[F] } object ConsumerMetrics { @@ -218,8 +220,8 @@ object ConsumerMetrics { fg(self.topics(latency)) } - override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = - self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index c9a97662..1a6911a4 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -39,7 +39,7 @@ object ConsumerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics[K, V](consumer) + _ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId) } yield { consumer.withMetrics1[Throwable](metrics) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 8a1610e0..6911337a 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -32,7 +32,10 @@ trait ProducerMetrics[F[_]] { def flush(latency: FiniteDuration): F[Unit] - private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] + private[producer] def exposeJavaMetrics( + @nowarn producer: Producer[F], + @nowarn clientId: Option[ClientId], + ): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -230,8 +233,8 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = fg(self.flush(latency)) - override def exposeJavaMetrics(producer: Producer[G]) = - self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) + override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index e431616c..de5b4547 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -33,7 +33,7 @@ object ProducerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics(producer) + _ <- metrics.exposeJavaMetrics(producer, config.common.clientId) } yield { producer.withMetrics[Throwable](metrics) }