From 01dba8601c7ac269ef92bf706cc64bbf687adb08 Mon Sep 17 00:00:00 2001 From: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com> Date: Sun, 24 Dec 2023 15:27:28 +1100 Subject: [PATCH] Field wrapper optimizations (#2056) * Optimize tracing field wrappers * Add implicit empty trace to Schema instead * Use `updateAndGet` on AtomicReference * Pre-determine string builder's size --- core/src/main/scala/caliban/GraphQL.scala | 5 +- .../scala/caliban/execution/Executor.scala | 10 +- .../caliban/introspection/adt/__Type.scala | 3 + .../main/scala/caliban/parsing/Parser.scala | 5 +- .../main/scala/caliban/schema/Schema.scala | 8 +- .../scala/caliban/validation/Validator.scala | 7 +- .../caliban/wrappers/ApolloTracing.scala | 119 ++++++++++-------- .../scala/caliban/wrappers/FieldMetrics.scala | 55 ++++---- .../main/scala/caliban/wrappers/Wrapper.scala | 8 +- .../tracing/ApolloFederatedTracing.scala | 100 ++++++++------- 10 files changed, 181 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/caliban/GraphQL.scala b/core/src/main/scala/caliban/GraphQL.scala index d888b5f84b..ebc167bc28 100644 --- a/core/src/main/scala/caliban/GraphQL.scala +++ b/core/src/main/scala/caliban/GraphQL.scala @@ -13,6 +13,7 @@ import caliban.schema._ import caliban.validation.Validator import caliban.wrappers.Wrapper import caliban.wrappers.Wrapper._ +import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.{ IO, Trace, URIO, ZIO } /** @@ -87,7 +88,9 @@ trait GraphQL[-R] { self => _ <- Validator.validate(document, typeToValidate) } yield () - private def checkHttpMethod(cfg: ExecutionConfiguration)(req: ExecutionRequest): IO[ValidationError, Unit] = + private def checkHttpMethod(cfg: ExecutionConfiguration)(req: ExecutionRequest)(implicit + trace: Trace + ): IO[ValidationError, Unit] = ZIO .when(req.operationType == OperationType.Mutation && !cfg.allowMutationsOverGetRequests) { HttpRequestMethod.get.flatMap { diff --git a/core/src/main/scala/caliban/execution/Executor.scala b/core/src/main/scala/caliban/execution/Executor.scala index eb58702958..0449a83614 100644 --- a/core/src/main/scala/caliban/execution/Executor.scala +++ b/core/src/main/scala/caliban/execution/Executor.scala @@ -12,6 +12,7 @@ import caliban.schema.{ ReducedStep, Step, Types } import caliban.wrappers.Wrapper.FieldWrapper import zio._ import zio.query._ +import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.ZStream import scala.annotation.tailrec @@ -210,9 +211,10 @@ object Executor { var remainingNames = names while (remainingResponses ne Nil) { val name = remainingNames.head - val resp = remainingResponses.head match { - case null => i -= 1; fromQueries(i) - case resp => resp + var resp = remainingResponses.head + if (resp eq null) { + i -= 1 + resp = fromQueries(i) } results = (name, resp) :: results remainingResponses = remainingResponses.tail @@ -338,7 +340,7 @@ object Executor { } yield response } - private[caliban] def fail(error: CalibanError): UIO[GraphQLResponse[CalibanError]] = + private[caliban] def fail(error: CalibanError)(implicit trace: Trace): UIO[GraphQLResponse[CalibanError]] = ZIO.succeed(GraphQLResponse(NullValue, List(error))) private[caliban] def mergeFields(field: Field, typeName: String): List[Field] = { diff --git a/core/src/main/scala/caliban/introspection/adt/__Type.scala b/core/src/main/scala/caliban/introspection/adt/__Type.scala index 0ef41bbfbb..32a4b17bb5 100644 --- a/core/src/main/scala/caliban/introspection/adt/__Type.scala +++ b/core/src/main/scala/caliban/introspection/adt/__Type.scala @@ -5,6 +5,7 @@ import caliban.parsing.adt.Definition.TypeSystemDefinition.TypeDefinition import caliban.parsing.adt.Definition.TypeSystemDefinition.TypeDefinition._ import caliban.parsing.adt.Type.{ ListType, NamedType } import caliban.parsing.adt.{ Directive, Type } +import caliban.rendering.DocumentRenderer import caliban.schema.Annotations.GQLExcluded import caliban.schema.Types @@ -24,6 +25,8 @@ case class __Type( ) { self => final override lazy val hashCode: Int = super.hashCode() + private[caliban] lazy val typeNameRepr: String = DocumentRenderer.renderTypeName(this) + def |+|(that: __Type): __Type = __Type( kind, (name ++ that.name).reduceOption((_, b) => b), diff --git a/core/src/main/scala/caliban/parsing/Parser.scala b/core/src/main/scala/caliban/parsing/Parser.scala index 4b0d9ea2a6..7f5b6cfb6d 100644 --- a/core/src/main/scala/caliban/parsing/Parser.scala +++ b/core/src/main/scala/caliban/parsing/Parser.scala @@ -4,7 +4,8 @@ import caliban.CalibanError.ParsingError import caliban.InputValue import caliban.parsing.adt._ import fastparse._ -import zio.{ IO, ZIO } +import zio.stacktracer.TracingImplicits.disableAutoTrace +import zio.{ IO, Trace, ZIO } import scala.util.Try @@ -14,7 +15,7 @@ object Parser { /** * Parses the given string into a [[caliban.parsing.adt.Document]] object or fails with a [[caliban.CalibanError.ParsingError]]. */ - def parseQuery(query: String): IO[ParsingError, Document] = { + def parseQuery(query: String)(implicit trace: Trace): IO[ParsingError, Document] = { val sm = SourceMapper(query) ZIO .attempt(parse(query, document(_))) diff --git a/core/src/main/scala/caliban/schema/Schema.scala b/core/src/main/scala/caliban/schema/Schema.scala index 1ce037470a..85f760f32e 100644 --- a/core/src/main/scala/caliban/schema/Schema.scala +++ b/core/src/main/scala/caliban/schema/Schema.scala @@ -12,8 +12,9 @@ import caliban.schema.Types._ import caliban.uploads.Upload import caliban.{ InputValue, ResponseValue } import zio.query.ZQuery +import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.ZStream -import zio.{ Chunk, URIO, ZIO } +import zio.{ Chunk, Trace, URIO, ZIO } import java.time._ import java.time.format.DateTimeFormatter @@ -81,6 +82,9 @@ trait Schema[-R, T] { self => */ def arguments: List[__InputValue] = Nil + // Disables traces for caliban-provided effectful schemas + private[caliban] implicit def trace: Trace = Trace.empty + /** * Builds a new `Schema` of `A` from an existing `Schema` of `T` and a function from `A` to `T`. * @param f a function from `A` to `T`. @@ -488,7 +492,7 @@ trait GenericSchema[R] extends SchemaDerivation[R] with TemporalSchema { } implicit def futureSchema[R0, A](implicit ev: Schema[R0, A]): Schema[R0, Future[A]] = - effectSchema[R0, R0, R0, Throwable, A].contramap[Future[A]](future => ZIO.fromFuture(_ => future)) + effectSchema[R0, R0, R0, Throwable, A].contramap[Future[A]](future => ZIO.fromFuture(_ => future)(Trace.empty)) implicit def infallibleEffectSchema[R0, R1 >: R0, R2 >: R0, A](implicit ev: Schema[R2, A]): Schema[R0, URIO[R1, A]] = new Schema[R0, URIO[R1, A]] { override def optional: Boolean = ev.optional diff --git a/core/src/main/scala/caliban/validation/Validator.scala b/core/src/main/scala/caliban/validation/Validator.scala index a4d33ba868..05526c045d 100644 --- a/core/src/main/scala/caliban/validation/Validator.scala +++ b/core/src/main/scala/caliban/validation/Validator.scala @@ -21,7 +21,8 @@ import caliban.validation.Utils.isObjectType import caliban.{ Configurator, InputValue } import zio.prelude._ import zio.prelude.fx.ZPure -import zio.{ IO, ZIO } +import zio.stacktracer.TracingImplicits.disableAutoTrace +import zio.{ IO, Trace, ZIO } import scala.annotation.tailrec import scala.collection.mutable @@ -47,13 +48,13 @@ object Validator { /** * Verifies that the given document is valid for this type. Fails with a [[caliban.CalibanError.ValidationError]] otherwise. */ - def validate(document: Document, rootType: RootType): IO[ValidationError, Unit] = + def validate(document: Document, rootType: RootType)(implicit trace: Trace): IO[ValidationError, Unit] = Configurator.configuration.map(_.validations).flatMap(check(document, rootType, Map.empty, _).unit.toZIO) /** * Verifies that the given schema is valid. Fails with a [[caliban.CalibanError.ValidationError]] otherwise. */ - def validateSchema[R](schema: RootSchemaBuilder[R]): IO[ValidationError, RootSchema[R]] = { + def validateSchema[R](schema: RootSchemaBuilder[R])(implicit trace: Trace): IO[ValidationError, RootSchema[R]] = { val schemaValidation = validateSchemaEither(schema) ZIO.fromEither(schemaValidation) } diff --git a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala index d3a619ed96..58a1928e2f 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala @@ -5,14 +5,14 @@ import caliban.Value.{ IntValue, StringValue } import caliban._ import caliban.execution.{ ExecutionRequest, FieldInfo } import caliban.parsing.adt.Document -import caliban.rendering.DocumentRenderer import caliban.wrappers.Wrapper._ import zio._ -import zio.query.{ UQuery, ZQuery } +import zio.query.ZQuery import java.time.format.DateTimeFormatter import java.time.{ Instant, ZoneId } import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference object ApolloTracing { @@ -34,12 +34,12 @@ object ApolloTracing { ZIO .whenZIO(isEnabledRef.get)( for { - ref <- Ref.make(Tracing()) + ref <- ZIO.succeed(new AtomicReference(Tracing())) clock <- ZIO.clock } yield apolloTracingOverall(clock, ref) |+| apolloTracingParsing(clock, ref) |+| apolloTracingValidation(clock, ref) |+| - apolloTracingField(ZQuery.fromZIO(clock.nanoTime), ref, !excludePureFields) + Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), ref, !excludePureFields)) ) .someOrElse(Wrapper.empty) ) @@ -58,14 +58,14 @@ object ApolloTracing { .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneId.of("UTC")) - case class Parsing(startOffset: Long = 0, duration: Duration = Duration.Zero) { + case class Parsing(startOffset: Long = 0, durationNanos: Long = 0L) { def toResponseValue: ResponseValue = - ObjectValue(List("startOffset" -> IntValue(startOffset), "duration" -> IntValue(duration.toNanos))) + ObjectValue(List("startOffset" -> IntValue(startOffset), "duration" -> IntValue(durationNanos))) } - case class Validation(startOffset: Long = 0, duration: Duration = Duration.Zero) { + case class Validation(startOffset: Long = 0, durationNanos: Long = 0L) { def toResponseValue: ResponseValue = - ObjectValue(List("startOffset" -> IntValue(startOffset), "duration" -> IntValue(duration.toNanos))) + ObjectValue(List("startOffset" -> IntValue(startOffset), "duration" -> IntValue(durationNanos))) } case class Resolver( @@ -74,7 +74,7 @@ object ApolloTracing { fieldName: String = "", returnType: String = "", startOffset: Long = 0, - duration: Duration = Duration.Zero + durationNanos: Long = 0 ) { def toResponseValue: ResponseValue = ObjectValue( @@ -84,18 +84,22 @@ object ApolloTracing { "fieldName" -> StringValue(fieldName), "returnType" -> StringValue(returnType), "startOffset" -> IntValue(startOffset), - "duration" -> IntValue(duration.toNanos) + "duration" -> IntValue(durationNanos) ) ) } + object Resolver { + implicit val ordering: Ordering[Resolver] = { (x: Resolver, y: Resolver) => + val ord1 = Ordering.Long.compare(x.startOffset, y.startOffset) + if (ord1 != 0) ord1 + else Ordering.Long.compare(x.durationNanos, y.durationNanos) + } + } + case class Execution(resolvers: List[Resolver] = Nil) { def toResponseValue: ResponseValue = - ObjectValue( - List( - "resolvers" -> ListValue(resolvers.sortBy(r => (r.startOffset, r.duration.toNanos)).map(_.toResponseValue)) - ) - ) + ObjectValue(List("resolvers" -> ListValue(resolvers.sorted.map(_.toResponseValue)))) } case class Tracing( @@ -103,7 +107,7 @@ object ApolloTracing { startTime: Long = 0, endTime: Long = 0, startTimeMonotonic: Long = 0, - duration: Duration = Duration.Zero, + durationNanos: Long = 0L, parsing: Parsing = Parsing(), validation: Validation = Validation(), execution: Execution = Execution() @@ -114,7 +118,7 @@ object ApolloTracing { "version" -> IntValue(version), "startTime" -> StringValue(dateFormatter.format(Instant.ofEpochMilli(startTime))), "endTime" -> StringValue(dateFormatter.format(Instant.ofEpochMilli(endTime))), - "duration" -> IntValue(duration.toNanos), + "duration" -> IntValue(durationNanos), "parsing" -> parsing.toResponseValue, "validation" -> validation.toResponseValue, "execution" -> execution.toResponseValue @@ -122,7 +126,7 @@ object ApolloTracing { ) } - private def apolloTracingOverall(clock: Clock, ref: Ref[Tracing]): OverallWrapper[Any] = + private def apolloTracingOverall(clock: Clock, ref: AtomicReference[Tracing]): OverallWrapper[Any] = new OverallWrapper[Any] { def wrap[R1]( process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] @@ -131,12 +135,11 @@ object ApolloTracing { for { nanoTime <- clock.nanoTime currentTime <- clock.currentTime(TimeUnit.MILLISECONDS) - _ <- ref.update(_.copy(startTime = currentTime, startTimeMonotonic = nanoTime)) + _ <- ZIO.succeed(ref.updateAndGet(_.copy(startTime = currentTime, startTimeMonotonic = nanoTime))) result <- process(request).timed.flatMap { case (duration, result) => for { endTime <- clock.currentTime(TimeUnit.MILLISECONDS) - _ <- ref.update(_.copy(duration = duration, endTime = endTime)) - tracing <- ref.get + tracing <- ZIO.succeed(ref.get.copy(durationNanos = duration.toNanos, endTime = endTime)) } yield result.copy( extensions = Some( ObjectValue( @@ -149,7 +152,7 @@ object ApolloTracing { } yield result } - private def apolloTracingParsing(clock: Clock, ref: Ref[Tracing]): ParsingWrapper[Any] = + private def apolloTracingParsing(clock: Clock, ref: AtomicReference[Tracing]): ParsingWrapper[Any] = new ParsingWrapper[Any] { def wrap[R1]( process: String => ZIO[R1, CalibanError.ParsingError, Document] @@ -159,15 +162,18 @@ object ApolloTracing { start <- clock.nanoTime resultWithDuration <- process(query).timed (duration, result) = resultWithDuration - _ <- ref.update(state => - state.copy( - parsing = state.parsing.copy(startOffset = start - state.startTimeMonotonic, duration = duration) + _ <- ZIO.succeed( + ref.updateAndGet(state => + state.copy( + parsing = state.parsing + .copy(startOffset = start - state.startTimeMonotonic, durationNanos = duration.toNanos) + ) ) ) } yield result } - private def apolloTracingValidation(clock: Clock, ref: Ref[Tracing]): ValidationWrapper[Any] = + private def apolloTracingValidation(clock: Clock, ref: AtomicReference[Tracing]): ValidationWrapper[Any] = new ValidationWrapper[Any] { def wrap[R1]( process: Document => ZIO[R1, CalibanError.ValidationError, ExecutionRequest] @@ -177,18 +183,21 @@ object ApolloTracing { start <- clock.nanoTime resultWithDuration <- process(doc).timed (duration, result) = resultWithDuration - _ <- ref.update(state => - state.copy( - validation = - state.validation.copy(startOffset = start - state.startTimeMonotonic, duration = duration) - ) - ) + _ <- + ZIO.succeed( + ref.updateAndGet(state => + state.copy( + validation = state.validation + .copy(startOffset = start - state.startTimeMonotonic, durationNanos = duration.toNanos) + ) + ) + ) } yield result } private def apolloTracingField( - nanoTime: UQuery[Long], - ref: Ref[Tracing], + nanoTime: => Long, + ref: AtomicReference[Tracing], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { @@ -196,27 +205,27 @@ object ApolloTracing { query: ZQuery[R1, CalibanError.ExecutionError, ResponseValue], fieldInfo: FieldInfo ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = - for { - start <- nanoTime - result <- query - end <- nanoTime - _ <- ZQuery.fromZIO( - ref.update(state => - state.copy( - execution = state.execution.copy( - resolvers = Resolver( - path = fieldInfo.path, - parentType = fieldInfo.details.parentType.fold("")(DocumentRenderer.renderTypeName), - fieldName = fieldInfo.name, - returnType = DocumentRenderer.renderTypeName(fieldInfo.details.fieldType), - startOffset = start - state.startTimeMonotonic, - duration = Duration.fromNanos(end - start) - ) :: state.execution.resolvers - ) - ) - ) - ) - } yield result + ZQuery.suspend { + val start = nanoTime + query.map { result => + val end = nanoTime + val _ = ref.updateAndGet(state => + state.copy( + execution = state.execution.copy( + resolvers = Resolver( + path = fieldInfo.path, + parentType = fieldInfo.details.parentType.fold("")(_.typeNameRepr), + fieldName = fieldInfo.name, + returnType = fieldInfo.details.fieldType.typeNameRepr, + startOffset = start - state.startTimeMonotonic, + durationNanos = end - start + ) :: state.execution.resolvers + ) + ) + ) + result + } + } } } diff --git a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala index dfd4d41c5c..2ad7e0a6ec 100644 --- a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala +++ b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala @@ -7,8 +7,9 @@ import caliban.wrappers.Wrapper.OverallWrapper import zio._ import zio.metrics.MetricKeyType.Histogram import zio.metrics.{ Metric, MetricKey, MetricLabel } -import zio.query.{ UQuery, ZQuery } +import zio.query.ZQuery +import java.util.concurrent.atomic.AtomicReference import scala.jdk.CollectionConverters._ object FieldMetrics { @@ -58,17 +59,17 @@ object FieldMetrics { ): Wrapper.EffectfulWrapper[Any] = Wrapper.EffectfulWrapper( for { - timings <- Ref.make(List.empty[Timing]) - failures <- Ref.make(List.empty[String]) + timings <- ZIO.succeed(new AtomicReference(List.empty[Timing])) + failures <- ZIO.succeed(new AtomicReference(List.empty[String])) clock <- ZIO.clock metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) } yield overallWrapper(timings, failures, metrics) |+| - fieldWrapper(ZQuery.fromZIO(clock.nanoTime), timings, failures) + Unsafe.unsafe(implicit us => fieldWrapper(clock.unsafe.nanoTime(), timings, failures)) ) private def overallWrapper( - timings: Ref[List[Timing]], - failures: Ref[List[String]], + timings: AtomicReference[List[Timing]], + failures: AtomicReference[List[String]], metrics: Metrics ): OverallWrapper[Any] = new OverallWrapper[Any] { @@ -79,10 +80,10 @@ object FieldMetrics { process(request) <* ZIO .blocking(for { - _ <- failures.get.flatMap(metrics.recordFailures) - timings <- timings.get - nodeOffsets = resolveNodeOffsets(timings) - _ <- metrics.recordSuccesses(nodeOffsets, timings) + _ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get)) + ts <- ZIO.succeed(timings.get) + nodeOffsets = resolveNodeOffsets(ts) + _ <- metrics.recordSuccesses(nodeOffsets, ts) } yield ()) .forkDaemon } @@ -117,9 +118,9 @@ object FieldMetrics { } private def fieldWrapper( - nanoTime: UQuery[Long], - timings: Ref[List[Timing]], - failures: Ref[List[String]] + nanoTime: => Long, + timings: AtomicReference[List[Timing]], + failures: AtomicReference[List[String]] ): Wrapper.FieldWrapper[Any] = new Wrapper.FieldWrapper[Any] { def wrap[R]( @@ -129,7 +130,8 @@ object FieldMetrics { def fieldName: String = { val parent = info.parent.flatMap(_.name).getOrElse("Unknown") - new StringBuilder().append(parent).append('.').append(info.name).result() + val name = info.name + new StringBuilder(name.length + parent.length + 1).append(parent).append('.').append(name).result() } def makeTiming(duration: Long) = @@ -140,15 +142,22 @@ object FieldMetrics { duration = duration ) - def recordFailure(e: CalibanError.ExecutionError) = - ZQuery.fromZIO(failures.update(fieldName :: _) *> ZIO.fail(e)) - - for { - start <- nanoTime - result <- query.catchAll(recordFailure) - end <- nanoTime - _ <- ZQuery.fromZIO(timings.update(makeTiming(end - start) :: _)) - } yield result + ZQuery.suspend { + val st = nanoTime + query.foldQuery( + e => + ZQuery.fail { + val _ = failures.updateAndGet(fieldName :: _) + e + }, + result => + ZQuery.succeed { + val t = makeTiming(nanoTime - st) + val _ = timings.updateAndGet(t :: _) + result + } + ) + } } } diff --git a/core/src/main/scala/caliban/wrappers/Wrapper.scala b/core/src/main/scala/caliban/wrappers/Wrapper.scala index 438565344e..2625abdaf4 100644 --- a/core/src/main/scala/caliban/wrappers/Wrapper.scala +++ b/core/src/main/scala/caliban/wrappers/Wrapper.scala @@ -7,7 +7,8 @@ import caliban.introspection.adt.__Introspection import caliban.parsing.adt.Document import caliban.wrappers.Wrapper.CombinedWrapper import zio.query.ZQuery -import zio.{ UIO, ZIO } +import zio.{ Trace, UIO, ZIO } +import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec import scala.collection.mutable.ListBuffer @@ -36,6 +37,9 @@ sealed trait Wrapper[-R] extends GraphQLAspect[Nothing, R] { self => def apply[R1 <: R](that: GraphQL[R1]): GraphQL[R1] = that.withWrapper(self) + + // Disables tracing only for wrappers in the caliban package + final private[caliban] def trace: Trace = Trace.empty } object Wrapper { @@ -147,7 +151,7 @@ object Wrapper { loop(process, wrappers)(info) } - private[caliban] def decompose[R](wrappers: List[Wrapper[R]]): UIO[ + private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): UIO[ ( List[OverallWrapper[R]], List[ParsingWrapper[R]], diff --git a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala index 3c94a441d5..e3ee959ec4 100644 --- a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala +++ b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala @@ -14,6 +14,7 @@ import zio.query.ZQuery import java.util.Base64 import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference /** * Implements the federated tracing specification detailed here: @@ -29,11 +30,11 @@ object ApolloFederatedTracing { def wrapper(excludePureFields: Boolean = false): EffectfulWrapper[Any] = EffectfulWrapper( for { - tracing <- Ref.make(Tracing(NodeTrie.empty)) - enabled <- Ref.make(false) + tracing <- ZIO.succeed(new AtomicReference(Tracing(NodeTrie.empty))) + enabled <- ZIO.succeed(new AtomicReference(false)) clock <- ZIO.clock } yield apolloTracingOverall(clock, tracing, enabled) |+| - apolloTracingField(clock, tracing, enabled, !excludePureFields) + Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), tracing, enabled, !excludePureFields)) ) private def toTimestamp(epochMilli: Long): Timestamp = @@ -42,26 +43,32 @@ object ApolloFederatedTracing { (epochMilli % 1000).toInt * 1000000 ) - private def apolloTracingOverall(clock: Clock, ref: Ref[Tracing], enabled: Ref[Boolean]): OverallWrapper[Any] = + private def apolloTracingOverall( + clock: Clock, + ref: AtomicReference[Tracing], + enabled: AtomicReference[Boolean] + ): OverallWrapper[Any] = new OverallWrapper[Any] { def wrap[R1]( process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => ZIO.ifZIO( - enabled.updateAndGet(_ => - request.extensions.exists( - _.get(GraphQLRequest.`apollo-federation-include-trace`).contains(StringValue(GraphQLRequest.ftv1)) + ZIO.succeed( + enabled.updateAndGet(_ => + request.extensions.exists( + _.get(GraphQLRequest.`apollo-federation-include-trace`).contains(StringValue(GraphQLRequest.ftv1)) + ) ) ) )( for { startNano <- clock.nanoTime - _ <- ref.update(_.copy(startTime = startNano)) + _ <- ZIO.succeed(ref.updateAndGet(_.copy(startTime = startNano))) response <- process(request).summarized(clock.currentTime(TimeUnit.MILLISECONDS))((_, _)) ((start, end), result) = response endNano <- clock.nanoTime - tracing <- ref.get + tracing <- ZIO.succeed(ref.get) } yield { val root = Trace( startTime = Some(toTimestamp(start)), @@ -85,9 +92,9 @@ object ApolloFederatedTracing { } private def apolloTracingField( - clock: Clock, - ref: Ref[Tracing], - enabled: Ref[Boolean], + nanoTime: => Long, + ref: AtomicReference[Tracing], + enabled: AtomicReference[Boolean], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { @@ -95,41 +102,40 @@ object ApolloFederatedTracing { query: ZQuery[R1, CalibanError.ExecutionError, ResponseValue], fieldInfo: FieldInfo ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = - ZQuery - .fromZIO(enabled.get) - .flatMap( - if (_) - for { - response <- query.either.summarized(clock.nanoTime)((_, _)) - ((startTime, endTime), summarized) = response - id = Node.Id.ResponseName(fieldInfo.name) - result <- ZQuery.fromZIO( - ref.update(state => - state.copy( - root = state.root.insert( - (PathValue.Key(fieldInfo.name) :: fieldInfo.path).toVector, - Node( - id = id, - startTime = startTime - state.startTime, - endTime = endTime - state.startTime, - `type` = fieldInfo.details.fieldType.toType().toString, - parentType = fieldInfo.details.parentType.map(_.toType().toString) getOrElse "", - originalFieldName = fieldInfo.details.alias - .map(_ => fieldInfo.details.name) getOrElse "", - error = summarized.left.toOption.collectFirst { case e: ExecutionError => - Error( - e.getMessage(), - location = e.locationInfo.map(l => Location(l.line, l.column)).toSeq - ) - }.toSeq - ) - ) - ) - ) *> ZIO.fromEither(summarized) - ) - } yield result - else query - ) + if (enabled.get()) + ZQuery.suspend { + val startTime = nanoTime + query.either.flatMap { result => + ZQuery.fromEither { + val endTime = nanoTime + val path = (PathValue.Key(fieldInfo.name) :: fieldInfo.path).toVector + val _ = ref.updateAndGet(state => + state.copy( + root = state.root.insert( + path, + Node( + id = Node.Id.ResponseName(fieldInfo.name), + startTime = startTime - state.startTime, + endTime = endTime - state.startTime, + `type` = fieldInfo.details.fieldType.toType().toString, + parentType = fieldInfo.details.parentType.map(_.toType().toString) getOrElse "", + originalFieldName = fieldInfo.details.alias.map(_ => fieldInfo.details.name) getOrElse "", + error = result.left.toOption.collectFirst { case e: ExecutionError => + Error( + e.getMessage(), + location = e.locationInfo.map(l => Location(l.line, l.column)).toSeq + ) + }.toSeq + ) + ) + ) + ) + result + } + } + } + else query + } private type VPath = Vector[PathValue]