Skip to content

Commit

Permalink
Calculate metrics prefix based on client ID
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Jul 11, 2024
1 parent 4f447e9 commit d8fe291
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ import ConsumerMetricsOf.*
val config: ConsumerConfig = ???
val registry: CollectorRegistry = ??? // Prometheus client
val metrics0: ConsumerMetrics[IO] = ???
val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics("my-app", registry)
val metrics1: ConsumerMetrics[IO] = metrics0.exposeJavaClientMetrics(identity, registry)
ConsumerOf
.apply1(metrics1.some)
.apply(config)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector
import io.prometheus.client.CollectorRegistry

Expand All @@ -13,7 +12,7 @@ object ConsumerMetricsOf {

def withJavaClientMetrics[F[_]: Sync: ToTry](
source: ConsumerMetrics[F],
prefix: String,
prefix: ClientId => String,
prometheus: CollectorRegistry
): ConsumerMetrics[F] =
new ConsumerMetrics[F] {
Expand All @@ -30,8 +29,11 @@ object ConsumerMetricsOf {

override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, prefix.some)
override def exposeJavaMetrics[K, V](
consumer: Consumer[F, K, V],
clientId: Option[ClientId]
): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix))
Resource.make {
Sync[F].delay { prometheus.register(collector) }
} { _ =>
Expand All @@ -44,7 +46,7 @@ object ConsumerMetricsOf {
implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {

def exposeJavaClientMetrics(
prefix: String,
prefix: ClientId => String,
prometheus: CollectorRegistry
)(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.evolutiongaming.skafka.producer

import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.{ClientId, Topic}
import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector
import io.prometheus.client.CollectorRegistry

Expand All @@ -13,7 +12,7 @@ object ProducerMetricsOf {

def withJavaClientMetrics[F[_]: Sync: ToTry](
source: ProducerMetrics[F],
prefix: String,
prefix: ClientId => String,
prometheus: CollectorRegistry
): ProducerMetrics[F] =
new ProducerMetrics[F] {
Expand All @@ -37,8 +36,8 @@ object ProducerMetricsOf {

override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)

override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](producer.clientMetrics, prefix.some)
override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix))
Resource.make {
Sync[F].delay { prometheus.register(collector) }
} { _ =>
Expand All @@ -51,7 +50,7 @@ object ProducerMetricsOf {
implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal {

def exposeJavaClientMetrics(
prefix: String,
prefix: ClientId => String,
prometheus: CollectorRegistry
)(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ trait ConsumerMetrics[F[_]] {

def topics(latency: FiniteDuration): F[Unit]

private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] =
Resource.unit[F]
private[consumer] def exposeJavaMetrics[K, V](
@nowarn consumer: Consumer[F, K, V],
@nowarn clientId: Option[ClientId],
): Resource[F, Unit] = Resource.unit[F]
}

object ConsumerMetrics {
Expand Down Expand Up @@ -218,8 +220,8 @@ object ConsumerMetrics {
fg(self.topics(latency))
}

override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg)
override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object ConsumerOf {

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics[K, V](consumer)
_ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId)
} yield {
consumer.withMetrics1[Throwable](metrics)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ trait ProducerMetrics[F[_]] {

def flush(latency: FiniteDuration): F[Unit]

private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F]
private[producer] def exposeJavaMetrics(
@nowarn producer: Producer[F],
@nowarn clientId: Option[ClientId],
): Resource[F, Unit] = Resource.unit[F]
}

object ProducerMetrics {
Expand Down Expand Up @@ -230,8 +233,8 @@ object ProducerMetrics {

def flush(latency: FiniteDuration) = fg(self.flush(latency))

override def exposeJavaMetrics(producer: Producer[G]) =
self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg)
override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) =
self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object ProducerOf {

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics(producer)
_ <- metrics.exposeJavaMetrics(producer, config.common.clientId)
} yield {
producer.withMetrics[Throwable](metrics)
}
Expand Down

0 comments on commit d8fe291

Please sign in to comment.