From ce6e49ac71fccdb92f0c9797733545b2add43237 Mon Sep 17 00:00:00 2001 From: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com> Date: Tue, 2 Jul 2024 10:11:56 +1000 Subject: [PATCH] Add method to "suspend" wrappers and reimplement APQs (#2321) * Add suspended wrappers and reimplement APQs * Cleanups * Revert changes to wrappers with Clock and add back ApolloCaching * fmt --- core/src/main/scala/caliban/implicits.scala | 7 ++ .../wrappers/ApolloPersistedQueries.scala | 92 ++++++++++++------- .../caliban/wrappers/ApolloTracing.scala | 10 +- .../caliban/wrappers/CostEstimation.scala | 16 ++-- .../scala/caliban/wrappers/FieldMetrics.scala | 46 +++++----- .../main/scala/caliban/wrappers/Wrapper.scala | 82 +++++++++++------ .../scala/caliban/wrappers/Wrappers.scala | 2 +- .../caliban/wrappers/FieldMetricsSpec.scala | 2 +- .../scala/caliban/wrappers/WrappersSpec.scala | 78 +++------------- .../tracing/ApolloFederatedTracing.scala | 24 ++--- 10 files changed, 183 insertions(+), 176 deletions(-) create mode 100644 core/src/main/scala/caliban/implicits.scala diff --git a/core/src/main/scala/caliban/implicits.scala b/core/src/main/scala/caliban/implicits.scala new file mode 100644 index 000000000..9a19ee83e --- /dev/null +++ b/core/src/main/scala/caliban/implicits.scala @@ -0,0 +1,7 @@ +package caliban + +import zio.Unsafe + +private object implicits { + implicit val unsafe: Unsafe = Unsafe.unsafe(identity) +} diff --git a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala index 31bd691fd..e6e6f61b6 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala @@ -10,6 +10,7 @@ import zio._ import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable object ApolloPersistedQueries { @@ -26,44 +27,48 @@ object ApolloPersistedQueries { def add(hash: String, query: Document): ZIO[ApolloPersistence, Nothing, Unit] = ZIO.serviceWithZIO[ApolloPersistence](_.add(hash, query)) - val live: UIO[ApolloPersistence] = - ZIO.succeed(new ConcurrentHashMap[String, Document]()).map { docCache => - new ApolloPersistence { - override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash))) - override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit - } - } + val live: UIO[ApolloPersistence] = ZIO.succeed(new Live)(Trace.empty) + + final class Live extends ApolloPersistence { + private implicit val trace: Trace = Trace.empty + private val docCache: ConcurrentHashMap[String, Document] = new ConcurrentHashMap[String, Document]() + + override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash))) + override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit + } + } val live: Layer[Nothing, ApolloPersistence] = ZLayer(ApolloPersistence.live) private def parsingWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): ParsingWrapper[ApolloPersistence] = - new ParsingWrapper[ApolloPersistence] { - override def wrap[R1 <: ApolloPersistence]( + docVar: AtomicReference[Option[(String, Option[Document])]] + ): ParsingWrapper[Any] = + new ParsingWrapper[Any] { + override def wrap[R1 <: Any]( f: String => ZIO[R1, CalibanError.ParsingError, Document] ): String => ZIO[R1, CalibanError.ParsingError, Document] = (query: String) => - docVar.await.flatMap { - case Some((_, Some(doc))) => ZIO.succeed(doc) + docVar.get() match { + case Some((_, Some(doc))) => Exit.succeed(doc) case _ => f(query) } } private def validationWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): ValidationWrapper[ApolloPersistence] = - new ValidationWrapper[ApolloPersistence] { + store: ApolloPersistence, + docVar: AtomicReference[Option[(String, Option[Document])]] + ): ValidationWrapper[Any] = + new ValidationWrapper[Any] { override val priority: Int = 100 - override def wrap[R1 <: ApolloPersistence]( + override def wrap[R1 <: Any]( f: Document => ZIO[R1, ValidationError, ExecutionRequest] ): Document => ZIO[R1, ValidationError, ExecutionRequest] = (doc: Document) => - docVar.await.flatMap { + docVar.get() match { case Some((_, Some(_))) => Configurator.ref.locallyWith(_.copy(skipValidation = true))(f(doc)) - case Some((hash, None)) => f(doc) <* ApolloPersistence.add(hash, doc) + case Some((hash, None)) => f(doc) <* store.add(hash, doc) case None => f(doc) } } @@ -74,40 +79,61 @@ object ApolloPersistedQueries { * value to `(_, None)` which will then get passed to the parsing wrapper where it will populate the cache with the validated query document */ private def overallWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): OverallWrapper[ApolloPersistence] = - new OverallWrapper[ApolloPersistence] { - def wrap[R1 <: ApolloPersistence]( + store: ApolloPersistence, + docVar: AtomicReference[Option[(String, Option[Document])]] + ): OverallWrapper[Any] = + new OverallWrapper[Any] { + def wrap[R1 <: Any]( process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => readHash(request) match { case Some(hash) => - ApolloPersistence + store .get(hash) .flatMap { - case Some(doc) => docVar.succeed(Some((hash, Some(doc)))) as request + case Some(doc) => + docVar.set(Some((hash, Some(doc)))) + Exit.succeed(request) case None => request.query match { - case Some(value) if checkHash(hash, value) => docVar.succeed(Some((hash, None))).as(request) - case Some(_) => ZIO.fail(ValidationError("Provided sha does not match any query", "")) - case None => ZIO.fail(ValidationError("PersistedQueryNotFound", "")) + case Some(value) if checkHash(hash, value) => + docVar.set(Some((hash, None))) + Exit.succeed(request) + case Some(_) => Exit.fail(ValidationError("Provided sha does not match any query", "")) + case None => Exit.fail(ValidationError("PersistedQueryNotFound", "")) } } .flatMap(process) .catchAll(ex => Exit.succeed(GraphQLResponse(NullValue, List(ex)))) - case None => docVar.succeed(None) *> process(request) + case None => process(request) } } + @deprecated("Use `wrapper` instead and pass the cache explicitly", "2.9.0") + val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] = + EffectfulWrapper(ZIO.serviceWith[ApolloPersistence](wrapper)) + /** * Returns a wrapper that persists and retrieves queries based on a hash * following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries. + * + * This wrapper will initialize a non-expiring cache which will be used for all queries + * + * @see Overloaded method for a variant that allows using a custom cache */ - val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] = - EffectfulWrapper(Promise.make[Nothing, Option[(String, Option[Document])]].map { docVar => - overallWrapper(docVar) |+| parsingWrapper(docVar) |+| validationWrapper(docVar) - }) + def wrapper: Wrapper[Any] = wrapper(new ApolloPersistence.Live) + + /** + * Returns a wrapper that persists and retrieves queries based on a hash + * following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries. + * + * @param cache the query cache that will be used to store the parsed documents + */ + def wrapper(cache: ApolloPersistence): Wrapper[Any] = Wrapper.suspend { + val ref = new AtomicReference[Option[(String, Option[Document])]](None) + overallWrapper(cache, ref) |+| parsingWrapper(ref) |+| validationWrapper(cache, ref) + } private def readHash(request: GraphQLRequest): Option[String] = request.extensions diff --git a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala index 58a1928e2..6b9f9fc0c 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala @@ -39,7 +39,7 @@ object ApolloTracing { } yield apolloTracingOverall(clock, ref) |+| apolloTracingParsing(clock, ref) |+| apolloTracingValidation(clock, ref) |+| - Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), ref, !excludePureFields)) + apolloTracingField(clock.unsafe, ref, !excludePureFields) ) .someOrElse(Wrapper.empty) ) @@ -196,19 +196,21 @@ object ApolloTracing { } private def apolloTracingField( - nanoTime: => Long, + clock: Clock#UnsafeAPI, ref: AtomicReference[Tracing], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { + import caliban.implicits.unsafe + def wrap[R1]( query: ZQuery[R1, CalibanError.ExecutionError, ResponseValue], fieldInfo: FieldInfo ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = ZQuery.suspend { - val start = nanoTime + val start = clock.nanoTime() query.map { result => - val end = nanoTime + val end = clock.nanoTime() val _ = ref.updateAndGet(state => state.copy( execution = state.execution.copy( diff --git a/core/src/main/scala/caliban/wrappers/CostEstimation.scala b/core/src/main/scala/caliban/wrappers/CostEstimation.scala index abc008a92..2e5fc35c8 100644 --- a/core/src/main/scala/caliban/wrappers/CostEstimation.scala +++ b/core/src/main/scala/caliban/wrappers/CostEstimation.scala @@ -4,13 +4,12 @@ import caliban.CalibanError.ValidationError import caliban.InputValue.ListValue import caliban.ResponseValue.ObjectValue import caliban.Value.{ FloatValue, IntValue, StringValue } +import caliban._ import caliban.execution.{ ExecutionRequest, Field } import caliban.parsing.adt.{ Directive, Document } import caliban.schema.Annotations.GQLDirective import caliban.schema.Types -import caliban.validation.Validator -import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper, ValidationWrapper } -import caliban.{ CalibanError, Configurator, GraphQLRequest, GraphQLResponse, ResponseValue } +import caliban.wrappers.Wrapper.{ OverallWrapper, ValidationWrapper } import zio.{ Ref, UIO, URIO, ZIO } import scala.annotation.tailrec @@ -155,11 +154,12 @@ object CostEstimation { costWrapper: Ref[Double] => Wrapper[R] )( p: (Double, GraphQLResponse[CalibanError]) => URIO[R, GraphQLResponse[CalibanError]] - ): Wrapper[R] = EffectfulWrapper( - Ref.make(0.0).map { cost => - costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp))) - } - ) + ): Wrapper[R] = Wrapper.suspend { + import caliban.implicits.unsafe + + val cost = Ref.unsafe.make(0.0) + costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp))) + } /** * Computes the estimated cost of executing the query using the provided function and compares it to diff --git a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala index 2ad7e0a6e..8f4b09593 100644 --- a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala +++ b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala @@ -3,7 +3,7 @@ package caliban.wrappers import caliban.Value.StringValue import caliban._ import caliban.execution.FieldInfo -import caliban.wrappers.Wrapper.OverallWrapper +import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper } import zio._ import zio.metrics.MetricKeyType.Histogram import zio.metrics.{ Metric, MetricKey, MetricLabel } @@ -56,16 +56,15 @@ object FieldMetrics { durationLabel: String = "graphql_fields_duration_seconds", buckets: Histogram.Boundaries = defaultBuckets, extraLabels: Set[MetricLabel] = Set.empty - ): Wrapper.EffectfulWrapper[Any] = - Wrapper.EffectfulWrapper( - for { - timings <- ZIO.succeed(new AtomicReference(List.empty[Timing])) - failures <- ZIO.succeed(new AtomicReference(List.empty[String])) - clock <- ZIO.clock - metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) - } yield overallWrapper(timings, failures, metrics) |+| - Unsafe.unsafe(implicit us => fieldWrapper(clock.unsafe.nanoTime(), timings, failures)) - ) + ): EffectfulWrapper[Any] = + Wrapper.EffectfulWrapper { + ZIO.clock.map { clock => + val timings = new AtomicReference(List.empty[Timing]) + val failures = new AtomicReference(List.empty[String]) + val metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) + overallWrapper(timings, failures, metrics) |+| fieldWrapper(clock.unsafe, timings, failures) + } + } private def overallWrapper( timings: AtomicReference[List[Timing]], @@ -78,21 +77,20 @@ object FieldMetrics { ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => process(request) <* - ZIO - .blocking(for { - _ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get)) - ts <- ZIO.succeed(timings.get) - nodeOffsets = resolveNodeOffsets(ts) - _ <- metrics.recordSuccesses(nodeOffsets, ts) - } yield ()) - .forkDaemon + (for { + _ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get)) + ts <- ZIO.succeed(timings.get) + nodeOffsets = resolveNodeOffsets(ts) + _ <- metrics.recordSuccesses(nodeOffsets, ts) + } yield ()).forkDaemon } private def resolveNodeOffsets(timings: List[Timing]): Map[Vector[PathValue], Long] = { val map = new java.util.HashMap[Vector[PathValue], Long]() + val nil = Nil var remaining = timings - while (!remaining.isEmpty) { + while (remaining ne nil) { val t = remaining.head val iter = t.path.inits val duration = t.duration @@ -118,11 +116,13 @@ object FieldMetrics { } private def fieldWrapper( - nanoTime: => Long, + clock: Clock#UnsafeAPI, timings: AtomicReference[List[Timing]], failures: AtomicReference[List[String]] ): Wrapper.FieldWrapper[Any] = new Wrapper.FieldWrapper[Any] { + import caliban.implicits.unsafe + def wrap[R]( query: ZQuery[R, CalibanError.ExecutionError, ResponseValue], info: FieldInfo @@ -143,7 +143,7 @@ object FieldMetrics { ) ZQuery.suspend { - val st = nanoTime + val st = clock.nanoTime() query.foldQuery( e => ZQuery.fail { @@ -152,7 +152,7 @@ object FieldMetrics { }, result => ZQuery.succeed { - val t = makeTiming(nanoTime - st) + val t = makeTiming(clock.nanoTime() - st) val _ = timings.updateAndGet(t :: _) result } diff --git a/core/src/main/scala/caliban/wrappers/Wrapper.scala b/core/src/main/scala/caliban/wrappers/Wrapper.scala index 91ab95435..613639072 100644 --- a/core/src/main/scala/caliban/wrappers/Wrapper.scala +++ b/core/src/main/scala/caliban/wrappers/Wrapper.scala @@ -7,7 +7,7 @@ import caliban.introspection.adt.__Introspection import caliban.parsing.adt.Document import caliban.wrappers.Wrapper.CombinedWrapper import zio.query.ZQuery -import zio.{ Exit, Trace, UIO, ZIO } +import zio.{ Exit, Trace, URIO, ZIO } import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec @@ -55,6 +55,13 @@ object Wrapper { def empty[R]: Wrapper[R] = Empty + /** + * Suspends the creation of a wrapper which allows capturing any side effects in the creation of a wrapper. + * The wrapper will be recreated for each query. + * @param wrapper the wrapper to suspend + */ + def suspend[R](wrapper: => Wrapper[R]): Wrapper[R] = SuspendedWrapper[R](() => wrapper) + sealed trait SimpleWrapper[-R, E, A, Info] extends Wrapper[R] { def wrap[R1 <: R](f: Info => ZIO[R1, E, A]): Info => ZIO[R1, E, A] } @@ -137,7 +144,9 @@ object Wrapper { * A wrapper that requires an effect to be built. The effect will be run for each query. * @param wrapper an effect that builds a wrapper */ - case class EffectfulWrapper[-R](wrapper: UIO[Wrapper[R]]) extends Wrapper[R] + case class EffectfulWrapper[-R](wrapper: URIO[R, Wrapper[R]]) extends Wrapper[R] + + private case class SuspendedWrapper[-R](wrapper: () => Wrapper[R]) extends Wrapper[R] private[caliban] def wrap[R1 >: R, R, E, A, Info]( process: Info => ZIO[R1, E, A] @@ -154,7 +163,8 @@ object Wrapper { private val emptyWrappers = Exit.succeed((Nil, Nil, Nil, Nil, Nil, Nil)) - private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): UIO[ + private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): URIO[ + R, ( List[OverallWrapper[R]], List[ParsingWrapper[R]], @@ -165,32 +175,46 @@ object Wrapper { ) ] = if (wrappers.isEmpty) emptyWrappers - else - ZIO.suspendSucceed { - val o = ListBuffer.empty[OverallWrapper[R]] - val p = ListBuffer.empty[ParsingWrapper[R]] - val v = ListBuffer.empty[ValidationWrapper[R]] - val e = ListBuffer.empty[ExecutionWrapper[R]] - val f = ListBuffer.empty[FieldWrapper[R]] - val i = ListBuffer.empty[IntrospectionWrapper[R]] - - def loop(wrapper: Wrapper[R]): UIO[Unit] = wrapper match { - case wrapper: OverallWrapper[R] => ZIO.succeed(o append wrapper) - case wrapper: ParsingWrapper[R] => ZIO.succeed(p append wrapper) - case wrapper: ValidationWrapper[R] => ZIO.succeed(v append wrapper) - case wrapper: ExecutionWrapper[R] => ZIO.succeed(e append wrapper) - case wrapper: FieldWrapper[R] => ZIO.succeed(f append wrapper) - case wrapper: IntrospectionWrapper[R] => ZIO.succeed(i append wrapper) - case CombinedWrapper(wrappers) => ZIO.foreachDiscard(wrappers)(loop) - case EffectfulWrapper(wrapper) => wrapper.flatMap(loop) - case Wrapper.Empty => ZIO.unit - } - - def finalize[W <: Wrapper[R]](buffer: ListBuffer[W]): List[W] = buffer.sortBy(_.priority).result() - - ZIO - .foreachDiscard(wrappers)(loop) - .as((finalize(o), finalize(p), finalize(v), finalize(e), finalize(f), finalize(i))) + else { + val o = ListBuffer.empty[OverallWrapper[R]] + val p = ListBuffer.empty[ParsingWrapper[R]] + val v = ListBuffer.empty[ValidationWrapper[R]] + val e = ListBuffer.empty[ExecutionWrapper[R]] + val f = ListBuffer.empty[FieldWrapper[R]] + val i = ListBuffer.empty[IntrospectionWrapper[R]] + + def loop(wrapper: Wrapper[R]): Option[URIO[R, Unit]] = wrapper match { + case wrapper: OverallWrapper[R] => o append wrapper; None + case wrapper: ParsingWrapper[R] => p append wrapper; None + case wrapper: ValidationWrapper[R] => v append wrapper; None + case wrapper: ExecutionWrapper[R] => e append wrapper; None + case wrapper: FieldWrapper[R] => f append wrapper; None + case wrapper: IntrospectionWrapper[R] => i append wrapper; None + case SuspendedWrapper(f) => loop(f()) + case CombinedWrapper(wrappers) => + wrappers.flatMap(loop) match { + case Nil => None + case fs => Some(ZIO.collectAllDiscard(fs)) + } + case EffectfulWrapper(wrapper) => + Some(wrapper.flatMap { + loop(_) match { + case None => Exit.unit + case Some(w) => w + } + }) + case Wrapper.Empty => None } + def finalize[W <: Wrapper[R]](buffer: ListBuffer[W]): List[W] = buffer.sortBy(_.priority).result() + + def result() = + (finalize(o), finalize(p), finalize(v), finalize(e), finalize(f), finalize(i)) + + wrappers.flatMap(loop) match { + case Nil => Exit.succeed(result()) + case fs => ZIO.collectAllDiscard(fs).as(result()) + } + } + } diff --git a/core/src/main/scala/caliban/wrappers/Wrappers.scala b/core/src/main/scala/caliban/wrappers/Wrappers.scala index 0c418039c..ab1744d6d 100644 --- a/core/src/main/scala/caliban/wrappers/Wrappers.scala +++ b/core/src/main/scala/caliban/wrappers/Wrappers.scala @@ -5,7 +5,7 @@ import caliban.Value.NullValue import caliban.execution.{ ExecutionRequest, Field, FieldInfo } import caliban.parsing.adt.{ Directive, Document } import caliban.wrappers.Wrapper.{ FieldWrapper, OverallWrapper, ValidationWrapper } -import caliban.{ CalibanError, Configurator, GraphQLRequest, GraphQLResponse, ResponseValue } +import caliban._ import zio.Console.{ printLine, printLineError } import zio._ import zio.metrics.MetricKeyType.Histogram diff --git a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala index d599d0a32..ff5583d49 100644 --- a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala +++ b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala @@ -7,7 +7,7 @@ import caliban.schema.ArgBuilder.auto._ import caliban.schema.Schema.auto._ import zio._ import zio.metrics.Metric -import zio.metrics.{ Metric, MetricLabel, MetricState } +import zio.metrics.{ MetricLabel, MetricState } import zio.test._ object FieldMetricsSpec extends ZIOSpecDefault { diff --git a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala index 1ffa5f254..fb911785f 100644 --- a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala +++ b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala @@ -1,32 +1,28 @@ package caliban.wrappers -import caliban._ import caliban.CalibanError.{ ExecutionError, ValidationError } import caliban.InputValue.ObjectValue import caliban.Macros.gqldoc import caliban.TestUtils._ import caliban.Value.{ IntValue, StringValue } +import caliban._ import caliban.execution.{ ExecutionRequest, FieldInfo } import caliban.introspection.adt.{ __Directive, __DirectiveLocation } import caliban.parsing.adt.{ Directive, Document, LocationInfo } import caliban.schema.Annotations.GQLDirective -import caliban.schema.{ ArgBuilder, GenericSchema, Schema } import caliban.schema.Schema.auto._ -import caliban.wrappers.ApolloPersistedQueries.apolloPersistedQueries +import caliban.schema.{ ArgBuilder, GenericSchema, Schema } +import caliban.wrappers.ApolloPersistedQueries.ApolloPersistence import caliban.wrappers.Wrapper.{ CombinedWrapper, ExecutionWrapper, FieldWrapper, ValidationWrapper } import caliban.wrappers.Wrappers._ import io.circe.syntax._ import zio._ import zio.query.ZQuery -import zio.test.Assertion._ import zio.test._ -import scala.annotation.nowarn import scala.language.postfixOps -@nowarn("msg=deprecated") object WrappersSpec extends ZIOSpecDefault { - import caliban.wrappers.ApolloCaching.GQLCacheControl override def spec = suite("WrappersSpec")( @@ -272,51 +268,6 @@ object WrappersSpec extends ZIOSpecDefault { } ) }, - test("Apollo Caching") { - case class Query(@GQLCacheControl(maxAge = Some(10.seconds)) hero: Hero) - - @GQLCacheControl(maxAge = Some(2.seconds)) - case class Hero(name: UIO[String], friends: List[Hero] = Nil) - - object schema extends GenericSchema[Any] { - implicit lazy val heroSchema: Schema[Any, Hero] = gen - def api = - graphQL( - RootResolver( - Query( - Hero( - ZIO.succeed("R2-D2"), - List( - Hero(ZIO.succeed("Luke Skywalker")), - Hero(ZIO.succeed("Han Solo")), - Hero(ZIO.succeed("Leia Organa")) - ) - ) - ) - ) - ) @@ ApolloCaching.apolloCaching - } - - val query = gqldoc(""" - { - hero { - name - friends { - name - } - } - }""") - for { - interpreter <- schema.api.interpreter - result <- interpreter.execute(query).map(_.extensions.map(_.toString)) - } yield assert(result)( - isSome( - equalTo( - "{\"cacheControl\":{\"version\":1,\"hints\":[{\"path\":[\"hero\"],\"maxAge\":10,\"scope\":\"PRIVATE\"}]}}" - ) - ) - ) - }, suite("Apollo Persisted Queries")({ def mockWrapper[R](fail: Ref[Boolean]): ValidationWrapper[R] = new ValidationWrapper[R] { override def wrap[R1 <: R]( @@ -342,14 +293,14 @@ object WrappersSpec extends ZIOSpecDefault { case class Test(test: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + interpreter <- + (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter result <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"))) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("hash not found") { case class Test(test: String) - val interpreter = (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + val interpreter = (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter interpreter .flatMap( _.executeRequest( @@ -363,30 +314,28 @@ object WrappersSpec extends ZIOSpecDefault { response.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""" ) } - .provide(ApolloPersistedQueries.live) }, test("cache poisoning") { case class Test(test: String, malicious: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok", "malicious"))) @@ apolloPersistedQueries).interpreter + interpreter <- + (graphQL(RootResolver(Test("ok", "malicious"))) @@ ApolloPersistedQueries.wrapper).interpreter // The hash for the query "{test}" attempting to poison the cache by passing in a different query r1 <- interpreter.executeRequest(GraphQLRequest(query = Some("{malicious}"), extensions = extensions)) r2 <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue( r1.asJson.noSpaces == """{"data":null,"errors":[{"message":"Provided sha does not match any query"}]}""" ) && assertTrue(r2.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""")) - .provideLayer(ApolloPersistedQueries.live) }, test("hash found") { case class Test(test: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + interpreter <- (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("executes first") { case class Test(test: String) @@ -395,12 +344,11 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(false) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ apolloPersistedQueries @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ ApolloPersistedQueries.wrapper @@ mockWrapper(shouldFail)).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) _ <- shouldFail.set(true) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("does not register successful validation if another validation wrapper fails") { case class Test(test: String) @@ -409,7 +357,7 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(true) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ apolloPersistedQueries @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ ApolloPersistedQueries.wrapper @@ mockWrapper(shouldFail)).interpreter first <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) second <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield { @@ -418,7 +366,6 @@ object WrappersSpec extends ZIOSpecDefault { second.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""" ) }) - .provide(ApolloPersistedQueries.live) }, test("invalid / missing variables in cached query") { case class TestInput(testField: String) @@ -441,7 +388,7 @@ object WrappersSpec extends ZIOSpecDefault { (for { interpreter <- - (graphQL(RootResolver(Test(_.testField))) @@ apolloPersistedQueries).interpreter + (graphQL(RootResolver(Test(_.testField))) @@ ApolloPersistedQueries.wrapper).interpreter validTest <- interpreter.executeRequest( GraphQLRequest(query = Some(query), variables = Some(validVariables), extensions = extensions) @@ -457,7 +404,6 @@ object WrappersSpec extends ZIOSpecDefault { assertTrue( missingVariableTest.asJson.noSpaces == """{"data":null,"errors":[{"message":"Variable 'testField' is null but is specified to be non-null."}]}""" )) - .provide(ApolloPersistedQueries.live) } ) }), diff --git a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala index 1f3cdce31..2168ac6e7 100644 --- a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala +++ b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala @@ -5,6 +5,7 @@ import caliban.ResponseValue.ObjectValue import caliban.Value.{ IntValue, StringValue } import caliban._ import caliban.execution.FieldInfo +import caliban.wrappers.Wrapper import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper } import com.google.protobuf.timestamp.Timestamp import mdg.engine.proto.reports.Trace @@ -28,14 +29,14 @@ object ApolloFederatedTracing { * WARNING: Use this with caution as it could potentially cause issues if the tracing client expects all queried fields to be included in the traces */ def wrapper(excludePureFields: Boolean = false): EffectfulWrapper[Any] = - EffectfulWrapper( - for { - tracing <- ZIO.succeed(new AtomicReference(Tracing(NodeTrie.empty))) - enabled <- ZIO.succeed(new AtomicReference(false)) - clock <- ZIO.clock - } yield apolloTracingOverall(clock, tracing, enabled) |+| - Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), tracing, enabled, !excludePureFields)) - ) + EffectfulWrapper { + ZIO.clock.map { clock => + val tracing = new AtomicReference(Tracing(NodeTrie.empty)) + val enabled = new AtomicReference(false) + apolloTracingOverall(clock, tracing, enabled) |+| + apolloTracingField(clock.unsafe, tracing, enabled, !excludePureFields) + } + } private def toTimestamp(epochMilli: Long): Timestamp = Timestamp.of( @@ -92,15 +93,16 @@ object ApolloFederatedTracing { } private def apolloTracingField( - nanoTime: => Long, + clock: Clock#UnsafeAPI, ref: AtomicReference[Tracing], enabled: AtomicReference[Boolean], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { + import caliban.implicits.unsafe private def updateState(startTime: Long, fieldInfo: FieldInfo, error: Option[ExecutionError]): Unit = { - val endTime = nanoTime + val endTime = clock.nanoTime() val path = (PathValue.Key(fieldInfo.name) :: fieldInfo.path).toVector val _ = ref.updateAndGet(state => state.copy( @@ -131,7 +133,7 @@ object ApolloFederatedTracing { ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = if (enabled.get()) ZQuery.suspend { - val startTime = nanoTime + val startTime = clock.nanoTime() query.foldQuery( error => ZQuery.fail {