Optimize metrics wrapper #1850

Merged · 7 commits · Aug 30, 2023
Changes from 6 commits
@@ -1,6 +1,7 @@
package caliban.execution

import caliban._
import caliban.wrappers.{ ApolloTracing, Wrappers }
import org.openjdk.jmh.annotations._
import zio.{ Runtime, Task, Unsafe, ZLayer }

@@ -65,6 +66,20 @@ class NestedZQueryBenchmark {
).interpreter
)

val metricsInterpreter: GraphQLInterpreter[Any, CalibanError] =
run(
graphQL[Any, MultifieldRoot, Unit, Unit](
RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements)
).withWrapper(Wrappers.metrics()).interpreter
)

val apolloInterpreter: GraphQLInterpreter[Any, CalibanError] =
run(
graphQL[Any, MultifieldRoot, Unit, Unit](
RootResolver(NestedZQueryBenchmarkSchema.multifield1000Elements)
).withWrapper(ApolloTracing.apolloTracing).interpreter
)

@Benchmark
def simpleParallelQuery100(): Any = {
val io =
@@ -257,4 +272,28 @@ class NestedZQueryBenchmark {
val io = deep10000.execute(deepQuery).provide(ZLayer.scoped(Configurator.setQueryExecution(QueryExecution.Batched)))
run(io)
}

@Benchmark
def noWrappersBenchmark(): Any = {
val io = multifield1000
.execute(multifieldQuery)
.provide(ZLayer.scoped(Configurator.setQueryExecution(QueryExecution.Batched)))
run(io)
}

@Benchmark
def apolloTracingBenchmark(): Any = {
val io = apolloInterpreter
.execute(multifieldQuery)
.provide(ZLayer.scoped(Configurator.setQueryExecution(QueryExecution.Batched)))
run(io)
}

@Benchmark
def metricsBenchmark(): Any = {
val io = metricsInterpreter
.execute(multifieldQuery)
.provide(ZLayer.scoped(Configurator.setQueryExecution(QueryExecution.Batched)))
run(io)
}
}
175 changes: 108 additions & 67 deletions core/src/main/scala/caliban/wrappers/FieldMetrics.scala
@@ -1,13 +1,14 @@
package caliban.wrappers

import caliban.CalibanError
import caliban.ResponseValue
import caliban.execution.FieldInfo
import caliban.wrappers.Wrapper.OverallWrapper
import caliban.{ CalibanError, GraphQLRequest, GraphQLResponse, ResponseValue }
import zio._
import zio.metrics.Metric
import zio.query.ZQuery
import zio.metrics.MetricKeyType.Histogram
import zio.metrics.MetricLabel
import zio.metrics.{ Metric, MetricKey, MetricLabel }
import zio.query.ZQuery

import scala.jdk.CollectionConverters._

object FieldMetrics {
private[caliban] val defaultBuckets = Histogram.Boundaries(
@@ -16,15 +17,37 @@ object FieldMetrics {
)
)

type Timing = (String, Long)
private def fieldDuration(name: String, field: String, buckets: Histogram.Boundaries) =
Metric.histogram(name, buckets).tagged("field", field)
private class Metrics(
totalLabel: String,
durationLabel: String,
buckets: Histogram.Boundaries,
extraLabels: Set[MetricLabel]
) {

def recordFailures(fieldNames: List[String]): UIO[Unit] =
ZIO.foreachDiscard(fieldNames)(fn => failed.tagged("field", fn).increment)

private def fieldTotal(
def recordSuccesses(nodeOffsets: Map[Vector[Either[String, Int]], Long], timings: List[Timing]): UIO[Unit] =
ZIO.foreachDiscard(timings) { timing =>
val d = timing.duration - nodeOffsets.getOrElse(timing.path :+ Left(timing.name), 0L)
succeeded.tagged(Set(MetricLabel("field", timing.fullName))).increment *>
duration.tagged(Set(MetricLabel("field", timing.fullName))).update(d / 1e9)
}

private lazy val failed = makeCounter("error")
private val succeeded = makeCounter("ok")
private val duration = Metric.fromMetricKey(MetricKey.histogram(durationLabel, buckets).tagged(extraLabels))

private def makeCounter(status: String) =
Metric.fromMetricKey(MetricKey.counter(totalLabel).tagged(extraLabels + MetricLabel("status", status)))
}

private case class Timing(
name: String,
field: String,
status: String
) = Metric.counter(name).tagged("field", field).tagged("status", status)
path: Vector[Either[String, Int]],
fullName: String,
duration: Long
)

def wrapper(
totalLabel: String = "graphql_fields_total",
@@ -34,68 +57,86 @@
): Wrapper.EffectfulWrapper[Any] =
Wrapper.EffectfulWrapper(
for {
ref <- Ref.make(List.empty[Timing])
} yield fieldDuration(totalLabel, durationLabel, ref, buckets, extraLabels)
timings <- Ref.make(List.empty[Timing])
failures <- Ref.make(List.empty[String])
metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels)
} yield overallWrapper(timings, failures, metrics) |+| fieldWrapper(timings, failures)
)

private def fieldDuration(
totalLabel: String,
durationLabel: String,
ref: Ref[List[Timing]],
buckets: Histogram.Boundaries,
extraLabels: Set[MetricLabel]
): Wrapper.FieldWrapper[Any] =
private def overallWrapper(
timings: Ref[List[Timing]],
failures: Ref[List[String]],
metrics: Metrics
): OverallWrapper[Any] =
new OverallWrapper[Any] {
def wrap[R1](
process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]]
): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] =
(request: GraphQLRequest) =>
process(request) <*
(for {
_ <- failures.get.flatMap(metrics.recordFailures)
timings <- timings.get
nodeOffsets = resolveNodeOffsets(timings)
_ <- metrics.recordSuccesses(nodeOffsets, timings)
} yield ()).forkDaemon
Comment on lines +77 to +82
@kyri-petrou (Collaborator, Author) — Aug 28, 2023

I'm not sure how I feel about forking this effect, but it's by far the most performant implementation. With forking, it becomes faster than the ApolloTracing wrapper (see the performance results in the PR description).

Owner

Makes sense not to make the execution slower because we're updating the metrics. And since this wrapper doesn't wrap pure fields, we don't create too many useless fibers.

@kyri-petrou (Collaborator, Author)

By the way, we're only forking once per query execution here, so the overhead of the forking itself should be extremely minimal. My only concern is that, in the very unlikely case that the calculation of offsets plus the writing of metrics is slower than the query execution, this could lead to a memory leak. However, the chances of that happening are extremely low, since in almost all cases the query execution will be slower than this.

Collaborator

I actually think being able to separate the measurements from the final calculation/reporting of metrics is the most beautiful and clever part of this change, so I definitely think we should keep the forkDaemon. :)

Also, a single forkDaemon call is likely to pale in comparison to all other fork calls during query execution.
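A minimal sketch of the pattern discussed in this thread, assuming hypothetical stand-ins (process, reportMetrics) rather than the PR's actual wrapper code: the reporting effect is sequenced after the response with <* and sent to a daemon fiber, so the caller never waits for metric aggregation.

import zio._

object ForkDaemonSketch {
  // Hypothetical stand-ins for request processing and metric reporting (not from this PR).
  def process(request: String): UIO[String]         = ZIO.succeed(s"response for $request")
  def reportMetrics(timings: List[Long]): UIO[Unit] = ZIO.foreachDiscard(timings)(t => ZIO.logDebug(s"timing: $t"))

  // The response is returned as soon as `process` completes; reporting runs on a
  // daemon fiber, so a slow reporting step only delays itself, never the caller.
  def wrapped(request: String, timings: Ref[List[Long]]): UIO[String] =
    process(request) <* timings.get.flatMap(reportMetrics).forkDaemon
}

The trade-off is the one raised above: if reporting consistently outlives query execution, daemon fibers can accumulate, which is why this only makes sense when reporting is expected to be much cheaper than the query itself.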

}

private def resolveNodeOffsets(timings: List[Timing]): Map[Vector[Either[String, Int]], Long] = {
val map = new java.util.HashMap[Vector[Either[String, Int]], Long]()
timings.foreach { t =>
val iter = t.path.inits
var continue = true
while (continue) {
val segment = iter.next()
if (!iter.hasNext) {
continue = false // Last element of `.inits` is an empty list
} else if (segment.last.isLeft) { // List indices are not fields so we don't care about recording their offset
map.compute(
segment,
{
case (_, v) if v >= t.duration =>
continue = false // We know that any subsequent segments will have a smaller offset
v
case _ => t.duration
}
)
}
}
}
map.asScala.toMap
}

private def fieldWrapper(timings: Ref[List[Timing]], failures: Ref[List[String]]): Wrapper.FieldWrapper[Any] =
new Wrapper.FieldWrapper[Any] {
override def wrap[R](
def wrap[R](
query: ZQuery[R, CalibanError.ExecutionError, ResponseValue],
info: FieldInfo
): ZQuery[R, CalibanError.ExecutionError, ResponseValue] =
for {
summarized <-
query
.foldQuery(
error =>
ZQuery.fromZIO(
fieldTotal(totalLabel, fieldName(info), "error").tagged(extraLabels).increment
) *> ZQuery.fail(error),
success =>
ZQuery.fromZIO(fieldTotal(totalLabel, fieldName(info), "ok").tagged(extraLabels).increment) *> ZQuery
.succeed(success)
)
.summarized(Clock.nanoTime)((_, _))
((start, end), result) = summarized
measure <- ZQuery.fromZIO(for {
currentPath <- ZIO.succeed(toPath(info))
popped <- ref.modify { state =>
val (popped, rest) = state.partition { case (path, _) =>
path.startsWith(currentPath)
}
(popped, rest)
}
offset = if (popped.isEmpty) ("", 0L)
else popped.maxBy { case (_, duration) => duration }
duration = end - start - offset._2
_ <- ref.update(current => (currentPath, duration + offset._2) :: current)
} yield duration / 1e9)
_ <-
ZQuery.fromZIO(fieldDuration(durationLabel, fieldName(info), buckets).tagged(extraLabels).update(measure))
} yield result
}
): ZQuery[R, CalibanError.ExecutionError, ResponseValue] = {

private def fieldName(fieldInfo: FieldInfo): String = {
val parent = fieldInfo.parent.flatMap(_.name).getOrElse("Unknown")
s"$parent.${fieldInfo.details.name}"
}
def fieldName: String = {
val parent = info.parent.flatMap(_.name).getOrElse("Unknown")
new StringBuilder().append(parent).append('.').append(info.name).result()
}

def makeTiming(duration: Long) =
Timing(
name = info.name,
path = info.path.view.reverse.toVector,
fullName = fieldName,
duration = duration
)

private def toPath(info: FieldInfo) =
(Left(info.name) :: info.path).foldRight("") { case (part, acc) =>
val withDot =
if (acc == "") acc
else s"$acc."
def recordFailure(e: CalibanError.ExecutionError) =
ZQuery.fromZIO(failures.update(fieldName :: _) *> ZIO.fail(e))

withDot + (part match {
case Left(value) => value
case Right(i) => i.toString
})
for {
summarized <- query.summarized(Clock.nanoTime)((s, e) => e - s).catchAll(recordFailure)
(duration, result) = summarized
timing = makeTiming(duration)
_ <- ZQuery.fromZIO(timings.update(timing :: _))
} yield result
}
}

}
23 changes: 15 additions & 8 deletions core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala
@@ -78,6 +78,7 @@ object FieldMetricsSpec extends ZIOSpecDefault {
}""")
for {
interpreter <- (api @@ Wrappers.metrics(extraLabels = Set(MetricLabel("test", "success")))).interpreter
_ <- TestClock.adjust(30.seconds)
fiber <- interpreter.execute(query).fork
_ <- TestClock.adjust(30.seconds)
res <- fiber.join
@@ -87,14 +88,20 @@
totalRoot <- metricTotal.tagged("field", "Queries.person").tagged("status", "ok").value
total <- metricTotal.tagged("field", "Friend.name").tagged("status", "ok").value
totalError <- metricTotal.tagged("field", "Friend.name").tagged("status", "error").value
} yield assertTrue(res.errors.size == 1) &&
assertTrue(getCountForDuration(root, 0.5) == 1) &&
assertTrue(getCountForDuration(name, 0.1) == 1) && // top-level name
assertTrue(getCountForDuration(name, 0.5) == 1) && // top level
assertTrue(getCountForDuration(friendsName, 0.5) == 3) && // three friends
assertTrue(totalRoot.count == 1.0) &&
assertTrue(total.count == 3.0) &&
assertTrue(totalError.count == 1.0)
} yield assertTrue(
res.errors.size == 1,
getCountForDuration(root, 0.025) == 0,
getCountForDuration(root, 0.05) == 1,
getCountForDuration(root, 0.5) == 1,
getCountForDuration(name, 0.075) == 0,
getCountForDuration(name, 0.1) == 1,
getCountForDuration(name, 0.5) == 1,
getCountForDuration(friendsName, 0.25) == 0,
getCountForDuration(friendsName, 0.5) == 3,
totalRoot.count == 1.0,
total.count == 3.0,
totalError.count == 1.0
)
}
)
}