Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#3315 - upgrade prometheus client java to 1.1.0 #3325

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,8 @@ lazy val prometheusMetrics: ProjectMatrix = (projectMatrix in file("metrics/prom
.settings(
name := "tapir-prometheus-metrics",
libraryDependencies ++= Seq(
"io.prometheus" % "simpleclient_common" % "0.16.0",
"io.prometheus" % "prometheus-metrics-core" % "1.1.0",
"io.prometheus" % "prometheus-metrics-exposition-formats" % "1.1.0",
scalaTest.value % Test
)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package sttp.tapir.server.metrics.prometheus

import io.prometheus.client.exporter.common.TextFormat
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
import io.prometheus.metrics.core.metrics.{Counter, Gauge, Histogram}
import io.prometheus.metrics.expositionformats.ExpositionFormats
import io.prometheus.metrics.model.registry.PrometheusRegistry
import sttp.monad.MonadError
import sttp.tapir.CodecFormat.TextPlain
import sttp.tapir._
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.server.metrics.{EndpointMetric, Metric, MetricLabels}

import java.io.StringWriter
import java.io.ByteArrayOutputStream
import java.time.{Clock, Duration}

case class PrometheusMetrics[F[_]](
namespace: String = "tapir",
registry: CollectorRegistry = CollectorRegistry.defaultRegistry,
registry: PrometheusRegistry = PrometheusRegistry.defaultRegistry,
metrics: List[Metric[F, _]] = List.empty[Metric[F, _]],
endpointPrefix: EndpointInput[Unit] = "metrics"
) {
import PrometheusMetrics._

/** An endpoint exposing the current metric values. */
lazy val metricsEndpoint: ServerEndpoint[Any, F] = ServerEndpoint.public(
endpoint.get.in(endpointPrefix).out(plainBody[CollectorRegistry]),
(monad: MonadError[F]) => (_: Unit) => monad.eval(Right(registry): Either[Unit, CollectorRegistry])
endpoint.get.in(endpointPrefix).out(plainBody[PrometheusRegistry]),
(monad: MonadError[F]) => (_: Unit) => monad.eval(Right(registry): Either[Unit, PrometheusRegistry])
)

/** Registers a `$namespace_request_active{path, method}` gauge (assuming default labels). */
Expand All @@ -48,16 +49,18 @@ case class PrometheusMetrics[F[_]](

object PrometheusMetrics {

implicit val schemaForCollectorRegistry: Schema[CollectorRegistry] = Schema.string[CollectorRegistry]
implicit val schemaForPrometheusRegistry: Schema[PrometheusRegistry] = Schema.string[PrometheusRegistry]

implicit val collectorRegistryCodec: Codec[String, CollectorRegistry, CodecFormat.TextPlain] =
Codec.anyString(TextPlain())(_ => DecodeResult.Value(new CollectorRegistry()))(r => {
val output = new StringWriter()
TextFormat.write004(output, r.metricFamilySamples)
implicit val prometheusRegistryCodec: Codec[String, PrometheusRegistry, CodecFormat.TextPlain] =
Codec.anyString(TextPlain())(_ => DecodeResult.Value(new PrometheusRegistry()))(r => {
val output = new ByteArrayOutputStream()
ExpositionFormats.init().getPrometheusTextFormatWriter.write(output, r.scrape())
gastonschabas marked this conversation as resolved.
Show resolved Hide resolved
output.close()
output.toString
})

def metricNameWithNamespace(namespace: String, metricName: String) = s"${namespace}_${metricName}"
gastonschabas marked this conversation as resolved.
Show resolved Hide resolved

/** Using the default namespace and labels, registers the following metrics:
*
* - `$namespace_request_active{path, method}` (gauge)
Expand All @@ -69,7 +72,7 @@ object PrometheusMetrics {
*/
def default[F[_]](
namespace: String = "tapir",
registry: CollectorRegistry = CollectorRegistry.defaultRegistry,
registry: PrometheusRegistry = PrometheusRegistry.defaultRegistry,
labels: MetricLabels = MetricLabels.Default
): PrometheusMetrics[F] =
PrometheusMetrics(
Expand All @@ -82,57 +85,57 @@ object PrometheusMetrics {
)
)

def requestActive[F[_]](registry: CollectorRegistry, namespace: String, labels: MetricLabels): Metric[F, Gauge] =
def requestActive[F[_]](registry: PrometheusRegistry, namespace: String, labels: MetricLabels): Metric[F, Gauge] =
Metric[F, Gauge](
Gauge
.build()
.namespace(namespace)
.name("request_active")
.builder()
.name(metricNameWithNamespace(namespace, "request_active"))
.help("Active HTTP requests")
.labelNames(labels.namesForRequest: _*)
.create()
.register(registry),
onRequest = { (req, gauge, m) =>
m.unit {
EndpointMetric()
.onEndpointRequest { ep => m.eval(gauge.labels(labels.valuesForRequest(ep, req): _*).inc()) }
.onResponseBody { (ep, _) => m.eval(gauge.labels(labels.valuesForRequest(ep, req): _*).dec()) }
.onException { (ep, _) => m.eval(gauge.labels(labels.valuesForRequest(ep, req): _*).dec()) }
.onEndpointRequest { ep => m.eval(gauge.labelValues(labels.valuesForRequest(ep, req): _*).inc()) }
.onResponseBody { (ep, _) => m.eval(gauge.labelValues(labels.valuesForRequest(ep, req): _*).dec()) }
.onException { (ep, _) => m.eval(gauge.labelValues(labels.valuesForRequest(ep, req): _*).dec()) }
}
}
)

def requestTotal[F[_]](registry: CollectorRegistry, namespace: String, labels: MetricLabels): Metric[F, Counter] =
def requestTotal[F[_]](registry: PrometheusRegistry, namespace: String, labels: MetricLabels): Metric[F, Counter] =
Metric[F, Counter](
Counter
.build()
.namespace(namespace)
.name("request_total")
.builder()
// .namespace(namespace)
gastonschabas marked this conversation as resolved.
Show resolved Hide resolved
.name(metricNameWithNamespace(namespace, "request_total"))
.help("Total HTTP requests")
.labelNames(labels.namesForRequest ++ labels.namesForResponse: _*)
.register(registry),
onRequest = { (req, counter, m) =>
m.unit {
EndpointMetric()
.onResponseBody { (ep, res) =>
m.eval(counter.labels(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(res): _*).inc())
m.eval(counter.labelValues(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(res): _*).inc())
}
.onException { (ep, ex) =>
m.eval(counter.labelValues(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(ex): _*).inc())
}
.onException { (ep, ex) => m.eval(counter.labels(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(ex): _*).inc()) }
}
}
)

def requestDuration[F[_]](
registry: CollectorRegistry,
registry: PrometheusRegistry,
namespace: String,
labels: MetricLabels,
clock: Clock = Clock.systemUTC()
): Metric[F, Histogram] =
Metric[F, Histogram](
Histogram
.build()
.namespace(namespace)
.name("request_duration_seconds")
.builder()
// .namespace(namespace)
.name(metricNameWithNamespace(namespace, "request_duration_seconds"))
.help("Duration of HTTP requests")
.labelNames(labels.namesForRequest ++ labels.namesForResponse ++ List(labels.forResponsePhase.name): _*)
.register(registry),
Expand All @@ -144,7 +147,7 @@ object PrometheusMetrics {
.onResponseHeaders { (ep, res) =>
m.eval(
histogram
.labels(
.labelValues(
labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(res) ++ List(labels.forResponsePhase.headersValue): _*
)
.observe(duration)
Expand All @@ -153,14 +156,18 @@ object PrometheusMetrics {
.onResponseBody { (ep, res) =>
m.eval(
histogram
.labels(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(res) ++ List(labels.forResponsePhase.bodyValue): _*)
.labelValues(
labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(res) ++ List(labels.forResponsePhase.bodyValue): _*
)
.observe(duration)
)
}
.onException { (ep, ex) =>
m.eval(
histogram
.labels(labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(ex) ++ List(labels.forResponsePhase.bodyValue): _*)
.labelValues(
labels.valuesForRequest(ep, req) ++ labels.valuesForResponse(ex) ++ List(labels.forResponsePhase.bodyValue): _*
)
.observe(duration)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.tapir.server.metrics.prometheus

import io.prometheus.client.CollectorRegistry
import io.prometheus.metrics.model.registry.PrometheusRegistry
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
Expand Down Expand Up @@ -36,7 +36,7 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
Thread.sleep(2000)
PersonsApi.defaultLogic(name)
}.serverEp
val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsActive()
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsActive()
val interpreter =
new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(serverEp),
Expand All @@ -52,19 +52,19 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
Thread.sleep(500)

// then
collectorRegistryCodec
.encode(metrics.registry) should include("tapir_request_active{path=\"/person\",method=\"GET\",} 1.0")
prometheusRegistryCodec
.encode(metrics.registry) should include regex "tapir_request_active\\{(?=.*path=\"/person\")(?=.*method=\"GET\").*\\} 1.0"

ScalaFutures.whenReady(response, Timeout(Span(3, Seconds))) { _ =>
collectorRegistryCodec
.encode(metrics.registry) should include("tapir_request_active{path=\"/person\",method=\"GET\",} 0.0")
prometheusRegistryCodec
.encode(metrics.registry) should include regex "tapir_request_active\\{(?=.*path=\"/person\")(?=.*method=\"GET\").*\\} 0.0"
}
}

"default metrics" should "collect requests total" in {
// given
val serverEp = PersonsApi().serverEp
val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsTotal()
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsTotal()
val interpreter = new ServerInterpreter[Any, Id, Unit, NoStreams](
_ => List(serverEp),
TestRequestBody,
Expand All @@ -80,9 +80,9 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpreter.apply(PersonsApi.request(""))

// then
val encoded = collectorRegistryCodec.encode(metrics.registry)
encoded should include("tapir_request_total{path=\"/person\",method=\"GET\",status=\"2xx\",} 2.0")
encoded should include("tapir_request_total{path=\"/person\",method=\"GET\",status=\"4xx\",} 2.0")
val encoded = prometheusRegistryCodec.encode(metrics.registry)
encoded should include regex "tapir_request_total\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\").*\\} 2.0"
encoded should include regex "tapir_request_total\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"4xx\").*\\} 2.0"
}

"default metrics" should "collect requests duration" in {
Expand All @@ -103,7 +103,7 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
}
}

val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsDuration(clock = clock)
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsDuration(clock = clock)
def interpret(sleepHeaders: Long, sleepBody: Long) =
new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(waitServerEp(sleepHeaders)),
Expand All @@ -119,47 +119,36 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpret(301, 3001)

// then
val encoded = collectorRegistryCodec.encode(metrics.registry)
val encoded = prometheusRegistryCodec.encode(metrics.registry)

// headers
// no response in less than 100ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"headers\",le=\"0.1\",} 0.0"
)
// \{(?=.*path="/person")(?=.*method="GET")(?=.*status="2xx")(?=.*phase="headers")(?=.*le="0.25").*\}
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"headers\")(?=.*le=\"0.1\").*\\} 0"

// two under 250ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"headers\",le=\"0.25\",} 2.0"
)
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"headers\")(?=.*le=\"0.25\").*\\} 2"

// all under 500ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"headers\",le=\"0.5\",} 3.0"
)
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"headers\")(?=.*le=\"0.5\").*\\} 3"

// body
// no response in less than 1000ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"body\",le=\"1.0\",} 0.0"
)
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"body\")(?=.*le=\"1.0\").*\\} 0"

// two under 2500ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"body\",le=\"2.5\",} 2.0"
)
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"body\")(?=.*le=\"2.5\").*\\} 2"

// all under 5000ms
encoded should include(
"tapir_request_duration_seconds_bucket{path=\"/person\",method=\"GET\",status=\"2xx\",phase=\"body\",le=\"5.0\",} 3.0"
)
encoded should include regex "tapir_request_duration_seconds_bucket\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"2xx\")(?=.*phase=\"body\")(?=.*le=\"5.0\").*\\} 3"
}

"default metrics" should "customize labels" in {
// given
val serverEp = PersonsApi().serverEp
val labels = MetricLabels(forRequest = List("key" -> { case (_, _) => "value" }), forResponse = Nil)

val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsTotal(labels)
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsTotal(labels)
val interpreter =
new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(serverEp),
Expand All @@ -173,13 +162,13 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpreter.apply(PersonsApi.request("Jacob"))

// then
collectorRegistryCodec.encode(metrics.registry) should include("tapir_request_total{key=\"value\",} 1.0")
prometheusRegistryCodec.encode(metrics.registry) should include regex "tapir_request_total\\{(?=.*key=\"value\").*\\} 1.0"
}

"interceptor" should "not collect metrics from prometheus endpoint" in {
// given
val serverEp = PersonsApi().serverEp
val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsTotal()
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsTotal()
val interpreter =
new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(metrics.metricsEndpoint, serverEp),
Expand All @@ -194,15 +183,12 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpreter.apply(getMetricsRequest)

// then
collectorRegistryCodec.encode(metrics.registry) shouldBe
"""# HELP tapir_request_total Total HTTP requests
|# TYPE tapir_request_total counter
|""".stripMargin
prometheusRegistryCodec.encode(metrics.registry) shouldBe empty
}

"metrics server endpoint" should "return encoded registry" in {
// given
val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsTotal()
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsTotal()
val interpreter =
new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(metrics.metricsEndpoint),
Expand All @@ -216,9 +202,7 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpreter.apply(getMetricsRequest) match {
case RequestResult.Response(response) =>
response.body.map { b =>
b shouldBe """# HELP tapir_request_total Total HTTP requests
|# TYPE tapir_request_total counter
|""".stripMargin
b shouldBe empty
} getOrElse fail()
case _ => fail()
}
Expand All @@ -227,7 +211,7 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
"metrics" should "be collected on exception when response from exception handler" in {
// given
val serverEp = PersonsApi { _ => throw new RuntimeException("Ups") }.serverEp
val metrics = PrometheusMetrics[Id]("tapir", new CollectorRegistry()).addRequestsTotal()
val metrics = PrometheusMetrics[Id]("tapir", new PrometheusRegistry()).addRequestsTotal()
val interpreter = new ServerInterpreter[Any, Id, String, NoStreams](
_ => List(serverEp),
TestRequestBody,
Expand All @@ -240,9 +224,9 @@ class PrometheusMetricsTest extends AnyFlatSpec with Matchers {
interpreter.apply(PersonsApi.request("Jacob"))

// then
collectorRegistryCodec.encode(metrics.registry) should include(
"tapir_request_total{path=\"/person\",method=\"GET\",status=\"5xx\",} 1.0"
)
prometheusRegistryCodec.encode(
metrics.registry
) should include regex "tapir_request_total\\{(?=.*path=\"/person\")(?=.*method=\"GET\")(?=.*status=\"5xx\").*\\} 1.0"
}
}

Expand Down
Loading