From 52e2587ea3c4a0d72a2f0b37cf19c7ced4968f11 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sat, 29 Jun 2024 15:05:23 +1000 Subject: [PATCH 1/4] Add suspended wrappers and reimplement APQs --- build.sbt | 5 +- core/src/main/scala/caliban/implicits.scala | 7 + .../caliban/wrappers/ApolloCaching.scala | 161 ------------------ .../wrappers/ApolloPersistedQueries.scala | 92 ++++++---- .../caliban/wrappers/ApolloTracing.scala | 10 +- .../caliban/wrappers/CostEstimation.scala | 16 +- .../scala/caliban/wrappers/FieldMetrics.scala | 43 +++-- .../main/scala/caliban/wrappers/Wrapper.scala | 82 +++++---- .../scala/caliban/wrappers/Wrappers.scala | 3 +- .../caliban/wrappers/FieldMetricsSpec.scala | 54 +++--- .../scala/caliban/wrappers/WrappersSpec.scala | 78 ++------- .../tracing/ApolloFederatedTracing.scala | 26 +-- .../tracing/FederationTracingSpec.scala | 28 +-- 13 files changed, 228 insertions(+), 377 deletions(-) create mode 100644 core/src/main/scala/caliban/implicits.scala delete mode 100644 core/src/main/scala/caliban/wrappers/ApolloCaching.scala diff --git a/build.sbt b/build.sbt index 9a073c4fb9..e8ee3a4d6c 100644 --- a/build.sbt +++ b/build.sbt @@ -742,7 +742,10 @@ lazy val enableMimaSettingsJVM = Def.settings( mimaFailOnProblem := enforceMimaCompatibility, mimaPreviousArtifacts := previousStableVersion.value.map(organization.value %% moduleName.value % _).toSet, - mimaBinaryIssueFilters := Seq() + mimaBinaryIssueFilters := Seq( + ProblemFilters.exclude[Problem]("caliban.wrappers.*"), + ProblemFilters.exclude[DirectMissingMethodProblem]("caliban.federation.tracing.ApolloFederatedTracing.wrapper") + ) ) lazy val enableMimaSettingsJS = diff --git a/core/src/main/scala/caliban/implicits.scala b/core/src/main/scala/caliban/implicits.scala new file mode 100644 index 0000000000..9a19ee83e5 --- /dev/null +++ b/core/src/main/scala/caliban/implicits.scala @@ -0,0 +1,7 @@ +package caliban + +import zio.Unsafe + +private object implicits { + implicit val unsafe: Unsafe = Unsafe.unsafe(identity) +} diff --git a/core/src/main/scala/caliban/wrappers/ApolloCaching.scala b/core/src/main/scala/caliban/wrappers/ApolloCaching.scala deleted file mode 100644 index 3310d89958..0000000000 --- a/core/src/main/scala/caliban/wrappers/ApolloCaching.scala +++ /dev/null @@ -1,161 +0,0 @@ -package caliban.wrappers - -import caliban.CalibanError.ExecutionError -import caliban.ResponseValue.{ ListValue, ObjectValue } -import caliban.Value.{ EnumValue, IntValue, StringValue } -import caliban.execution.FieldInfo -import caliban.parsing.adt.Directive -import caliban.schema.Annotations.GQLDirective -import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper } -import caliban._ -import zio._ -import zio.query.ZQuery - -import java.util.concurrent.TimeUnit - -/** - * Returns a wrapper which applies apollo caching response extensions - */ -@deprecated("Use `caliban.wrappers.Caching` for a more flexible implementation", "2.4.0") -object ApolloCaching { - - private val directiveName = "cacheControl" - - @deprecated("Use `caliban.wrappers.Caching` for a more flexible implementation", "2.4.0") - case class GQLCacheControl(maxAge: Option[Duration] = None, scope: Option[CacheScope] = None) - extends GQLDirective(CacheControl(scope, maxAge)) - - object CacheControl { - - def apply(scope: Option[CacheScope], maxAge: Option[Duration]): Directive = - (scope, maxAge) match { - case (Some(scope), Some(age)) => apply(age, scope) - case (None, Some(age)) => apply(age) - case (Some(scope), None) => apply(scope) - case _ => Directive(directiveName, Map.empty) - } - - def apply(scope: ApolloCaching.CacheScope): Directive = - Directive(directiveName, Map("scope" -> EnumValue(scope.toString))) - - def apply(maxAge: Duration): Directive = - Directive(directiveName, Map("maxAge" -> IntValue(maxAge.toMillis / 1000))) - - def apply(maxAge: Duration, scope: ApolloCaching.CacheScope): Directive = - Directive(directiveName, Map("maxAge" -> IntValue(maxAge.toMillis / 1000), "scope" -> EnumValue(scope.toString))) - - } - - val apolloCaching: EffectfulWrapper[Any] = - EffectfulWrapper( - Ref.make(Caching()).map(ref => apolloCachingOverall(ref) |+| apolloCachingField(ref)) - ) - - sealed trait CacheScope - - object CacheScope { - - case object Private extends CacheScope { - override def toString: String = "PRIVATE" - } - - case object Public extends CacheScope { - override def toString: String = "PUBLIC" - } - - } - - case class CacheHint( - fieldName: String = "", - path: List[PathValue] = Nil, - maxAge: Duration, - scope: CacheScope - ) { - - def toResponseValue: ResponseValue = - ObjectValue( - List( - "path" -> ListValue((PathValue.Key(fieldName) :: path).reverse), - "maxAge" -> IntValue(maxAge.toMillis / 1000), - "scope" -> StringValue(scope match { - case CacheScope.Private => "PRIVATE" - case CacheScope.Public => "PUBLIC" - }) - ) - ) - - } - - case class Caching( - version: Int = 1, - hints: List[CacheHint] = List.empty - ) { - - def toResponseValue: ResponseValue = - ObjectValue(List("version" -> IntValue(version), "hints" -> ListValue(hints.map(_.toResponseValue)))) - } - - case class CacheDirective(scope: Option[CacheScope] = None, maxAge: Option[Duration] = None) - - private def extractCacheDirective(directives: List[Directive]): Option[CacheDirective] = - directives.collectFirst { - case d if d.name == directiveName => - val scope = d.arguments.get("scope").collectFirst { - case StringValue("PRIVATE") | EnumValue("PRIVATE") => CacheScope.Private - case StringValue("PUBLIC") | EnumValue("PUBLIC") => CacheScope.Public - } - - val maxAge = d.arguments.get("maxAge").collectFirst { case i: IntValue => - Duration(i.toLong, TimeUnit.SECONDS) - } - - CacheDirective(scope, maxAge) - } - - private def apolloCachingOverall(ref: Ref[Caching]): OverallWrapper[Any] = - new OverallWrapper[Any] { - def wrap[R1 <: Any]( - process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] - ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = - (request: GraphQLRequest) => - for { - result <- process(request) - cache <- ref.get - } yield result.copy( - extensions = Some( - ObjectValue( - ("cacheControl" -> cache.toResponseValue) :: result.extensions.fold( - List.empty[(String, ResponseValue)] - )(_.fields) - ) - ) - ) - } - - private def apolloCachingField(ref: Ref[Caching]): FieldWrapper[Any] = - new FieldWrapper(true) { - def wrap[R1 <: Any]( - query: ZQuery[R1, ExecutionError, ResponseValue], - fieldInfo: FieldInfo - ): ZQuery[R1, ExecutionError, ResponseValue] = { - val cacheDirectives = extractCacheDirective( - fieldInfo.directives ++ fieldInfo.details.fieldType.ofType.flatMap(_.directives).getOrElse(Nil) - ) - - cacheDirectives.foldLeft(query) { case (q, cacheDirective) => - q <* ZQuery.fromZIO( - ref.update(state => - state.copy( - hints = CacheHint( - path = fieldInfo.path, - fieldName = fieldInfo.name, - maxAge = cacheDirective.maxAge getOrElse Duration.Zero, - scope = cacheDirective.scope getOrElse CacheScope.Private - ) :: state.hints - ) - ) - ) - } - } - } -} diff --git a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala index 31bd691fdf..e6e6f61b62 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala @@ -10,6 +10,7 @@ import zio._ import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable object ApolloPersistedQueries { @@ -26,44 +27,48 @@ object ApolloPersistedQueries { def add(hash: String, query: Document): ZIO[ApolloPersistence, Nothing, Unit] = ZIO.serviceWithZIO[ApolloPersistence](_.add(hash, query)) - val live: UIO[ApolloPersistence] = - ZIO.succeed(new ConcurrentHashMap[String, Document]()).map { docCache => - new ApolloPersistence { - override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash))) - override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit - } - } + val live: UIO[ApolloPersistence] = ZIO.succeed(new Live)(Trace.empty) + + final class Live extends ApolloPersistence { + private implicit val trace: Trace = Trace.empty + private val docCache: ConcurrentHashMap[String, Document] = new ConcurrentHashMap[String, Document]() + + override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash))) + override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit + } + } val live: Layer[Nothing, ApolloPersistence] = ZLayer(ApolloPersistence.live) private def parsingWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): ParsingWrapper[ApolloPersistence] = - new ParsingWrapper[ApolloPersistence] { - override def wrap[R1 <: ApolloPersistence]( + docVar: AtomicReference[Option[(String, Option[Document])]] + ): ParsingWrapper[Any] = + new ParsingWrapper[Any] { + override def wrap[R1 <: Any]( f: String => ZIO[R1, CalibanError.ParsingError, Document] ): String => ZIO[R1, CalibanError.ParsingError, Document] = (query: String) => - docVar.await.flatMap { - case Some((_, Some(doc))) => ZIO.succeed(doc) + docVar.get() match { + case Some((_, Some(doc))) => Exit.succeed(doc) case _ => f(query) } } private def validationWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): ValidationWrapper[ApolloPersistence] = - new ValidationWrapper[ApolloPersistence] { + store: ApolloPersistence, + docVar: AtomicReference[Option[(String, Option[Document])]] + ): ValidationWrapper[Any] = + new ValidationWrapper[Any] { override val priority: Int = 100 - override def wrap[R1 <: ApolloPersistence]( + override def wrap[R1 <: Any]( f: Document => ZIO[R1, ValidationError, ExecutionRequest] ): Document => ZIO[R1, ValidationError, ExecutionRequest] = (doc: Document) => - docVar.await.flatMap { + docVar.get() match { case Some((_, Some(_))) => Configurator.ref.locallyWith(_.copy(skipValidation = true))(f(doc)) - case Some((hash, None)) => f(doc) <* ApolloPersistence.add(hash, doc) + case Some((hash, None)) => f(doc) <* store.add(hash, doc) case None => f(doc) } } @@ -74,40 +79,61 @@ object ApolloPersistedQueries { * value to `(_, None)` which will then get passed to the parsing wrapper where it will populate the cache with the validated query document */ private def overallWrapper( - docVar: Promise[Nothing, Option[(String, Option[Document])]] - ): OverallWrapper[ApolloPersistence] = - new OverallWrapper[ApolloPersistence] { - def wrap[R1 <: ApolloPersistence]( + store: ApolloPersistence, + docVar: AtomicReference[Option[(String, Option[Document])]] + ): OverallWrapper[Any] = + new OverallWrapper[Any] { + def wrap[R1 <: Any]( process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => readHash(request) match { case Some(hash) => - ApolloPersistence + store .get(hash) .flatMap { - case Some(doc) => docVar.succeed(Some((hash, Some(doc)))) as request + case Some(doc) => + docVar.set(Some((hash, Some(doc)))) + Exit.succeed(request) case None => request.query match { - case Some(value) if checkHash(hash, value) => docVar.succeed(Some((hash, None))).as(request) - case Some(_) => ZIO.fail(ValidationError("Provided sha does not match any query", "")) - case None => ZIO.fail(ValidationError("PersistedQueryNotFound", "")) + case Some(value) if checkHash(hash, value) => + docVar.set(Some((hash, None))) + Exit.succeed(request) + case Some(_) => Exit.fail(ValidationError("Provided sha does not match any query", "")) + case None => Exit.fail(ValidationError("PersistedQueryNotFound", "")) } } .flatMap(process) .catchAll(ex => Exit.succeed(GraphQLResponse(NullValue, List(ex)))) - case None => docVar.succeed(None) *> process(request) + case None => process(request) } } + @deprecated("Use `wrapper` instead and pass the cache explicitly", "2.9.0") + val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] = + EffectfulWrapper(ZIO.serviceWith[ApolloPersistence](wrapper)) + /** * Returns a wrapper that persists and retrieves queries based on a hash * following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries. + * + * This wrapper will initialize a non-expiring cache which will be used for all queries + * + * @see Overloaded method for a variant that allows using a custom cache */ - val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] = - EffectfulWrapper(Promise.make[Nothing, Option[(String, Option[Document])]].map { docVar => - overallWrapper(docVar) |+| parsingWrapper(docVar) |+| validationWrapper(docVar) - }) + def wrapper: Wrapper[Any] = wrapper(new ApolloPersistence.Live) + + /** + * Returns a wrapper that persists and retrieves queries based on a hash + * following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries. + * + * @param cache the query cache that will be used to store the parsed documents + */ + def wrapper(cache: ApolloPersistence): Wrapper[Any] = Wrapper.suspend { + val ref = new AtomicReference[Option[(String, Option[Document])]](None) + overallWrapper(cache, ref) |+| parsingWrapper(ref) |+| validationWrapper(cache, ref) + } private def readHash(request: GraphQLRequest): Option[String] = request.extensions diff --git a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala index 58a1928e2f..6b9f9fc0cc 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala @@ -39,7 +39,7 @@ object ApolloTracing { } yield apolloTracingOverall(clock, ref) |+| apolloTracingParsing(clock, ref) |+| apolloTracingValidation(clock, ref) |+| - Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), ref, !excludePureFields)) + apolloTracingField(clock.unsafe, ref, !excludePureFields) ) .someOrElse(Wrapper.empty) ) @@ -196,19 +196,21 @@ object ApolloTracing { } private def apolloTracingField( - nanoTime: => Long, + clock: Clock#UnsafeAPI, ref: AtomicReference[Tracing], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { + import caliban.implicits.unsafe + def wrap[R1]( query: ZQuery[R1, CalibanError.ExecutionError, ResponseValue], fieldInfo: FieldInfo ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = ZQuery.suspend { - val start = nanoTime + val start = clock.nanoTime() query.map { result => - val end = nanoTime + val end = clock.nanoTime() val _ = ref.updateAndGet(state => state.copy( execution = state.execution.copy( diff --git a/core/src/main/scala/caliban/wrappers/CostEstimation.scala b/core/src/main/scala/caliban/wrappers/CostEstimation.scala index abc008a926..466fbb17f5 100644 --- a/core/src/main/scala/caliban/wrappers/CostEstimation.scala +++ b/core/src/main/scala/caliban/wrappers/CostEstimation.scala @@ -8,9 +8,8 @@ import caliban.execution.{ ExecutionRequest, Field } import caliban.parsing.adt.{ Directive, Document } import caliban.schema.Annotations.GQLDirective import caliban.schema.Types -import caliban.validation.Validator -import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper, ValidationWrapper } -import caliban.{ CalibanError, Configurator, GraphQLRequest, GraphQLResponse, ResponseValue } +import caliban.wrappers.Wrapper.{ OverallWrapper, SuspendedWrapper, ValidationWrapper } +import caliban._ import zio.{ Ref, UIO, URIO, ZIO } import scala.annotation.tailrec @@ -155,11 +154,12 @@ object CostEstimation { costWrapper: Ref[Double] => Wrapper[R] )( p: (Double, GraphQLResponse[CalibanError]) => URIO[R, GraphQLResponse[CalibanError]] - ): Wrapper[R] = EffectfulWrapper( - Ref.make(0.0).map { cost => - costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp))) - } - ) + ): Wrapper[R] = Wrapper.suspend { + import caliban.implicits.unsafe + + val cost = Ref.unsafe.make(0.0) + costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp))) + } /** * Computes the estimated cost of executing the query using the provided function and compares it to diff --git a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala index 2ad7e0a6ec..e713a99660 100644 --- a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala +++ b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala @@ -11,6 +11,7 @@ import zio.query.ZQuery import java.util.concurrent.atomic.AtomicReference import scala.jdk.CollectionConverters._ +import java.time.{ Clock => JClock } object FieldMetrics { private[caliban] val defaultBuckets = Histogram.Boundaries( @@ -56,16 +57,13 @@ object FieldMetrics { durationLabel: String = "graphql_fields_duration_seconds", buckets: Histogram.Boundaries = defaultBuckets, extraLabels: Set[MetricLabel] = Set.empty - ): Wrapper.EffectfulWrapper[Any] = - Wrapper.EffectfulWrapper( - for { - timings <- ZIO.succeed(new AtomicReference(List.empty[Timing])) - failures <- ZIO.succeed(new AtomicReference(List.empty[String])) - clock <- ZIO.clock - metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) - } yield overallWrapper(timings, failures, metrics) |+| - Unsafe.unsafe(implicit us => fieldWrapper(clock.unsafe.nanoTime(), timings, failures)) - ) + )(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = + Wrapper.suspend { + val timings = new AtomicReference(List.empty[Timing]) + val failures = new AtomicReference(List.empty[String]) + val metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) + overallWrapper(timings, failures, metrics) |+| fieldWrapper(clock.unsafe, timings, failures) + } private def overallWrapper( timings: AtomicReference[List[Timing]], @@ -78,21 +76,20 @@ object FieldMetrics { ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = (request: GraphQLRequest) => process(request) <* - ZIO - .blocking(for { - _ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get)) - ts <- ZIO.succeed(timings.get) - nodeOffsets = resolveNodeOffsets(ts) - _ <- metrics.recordSuccesses(nodeOffsets, ts) - } yield ()) - .forkDaemon + (for { + _ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get)) + ts <- ZIO.succeed(timings.get) + nodeOffsets = resolveNodeOffsets(ts) + _ <- metrics.recordSuccesses(nodeOffsets, ts) + } yield ()).forkDaemon } private def resolveNodeOffsets(timings: List[Timing]): Map[Vector[PathValue], Long] = { val map = new java.util.HashMap[Vector[PathValue], Long]() + val nil = Nil var remaining = timings - while (!remaining.isEmpty) { + while (remaining ne nil) { val t = remaining.head val iter = t.path.inits val duration = t.duration @@ -118,11 +115,13 @@ object FieldMetrics { } private def fieldWrapper( - nanoTime: => Long, + clock: Clock#UnsafeAPI, timings: AtomicReference[List[Timing]], failures: AtomicReference[List[String]] ): Wrapper.FieldWrapper[Any] = new Wrapper.FieldWrapper[Any] { + import caliban.implicits.unsafe + def wrap[R]( query: ZQuery[R, CalibanError.ExecutionError, ResponseValue], info: FieldInfo @@ -143,7 +142,7 @@ object FieldMetrics { ) ZQuery.suspend { - val st = nanoTime + val st = clock.nanoTime() query.foldQuery( e => ZQuery.fail { @@ -152,7 +151,7 @@ object FieldMetrics { }, result => ZQuery.succeed { - val t = makeTiming(nanoTime - st) + val t = makeTiming(clock.nanoTime() - st) val _ = timings.updateAndGet(t :: _) result } diff --git a/core/src/main/scala/caliban/wrappers/Wrapper.scala b/core/src/main/scala/caliban/wrappers/Wrapper.scala index 91ab95435b..6136390724 100644 --- a/core/src/main/scala/caliban/wrappers/Wrapper.scala +++ b/core/src/main/scala/caliban/wrappers/Wrapper.scala @@ -7,7 +7,7 @@ import caliban.introspection.adt.__Introspection import caliban.parsing.adt.Document import caliban.wrappers.Wrapper.CombinedWrapper import zio.query.ZQuery -import zio.{ Exit, Trace, UIO, ZIO } +import zio.{ Exit, Trace, URIO, ZIO } import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec @@ -55,6 +55,13 @@ object Wrapper { def empty[R]: Wrapper[R] = Empty + /** + * Suspends the creation of a wrapper which allows capturing any side effects in the creation of a wrapper. + * The wrapper will be recreated for each query. + * @param wrapper the wrapper to suspend + */ + def suspend[R](wrapper: => Wrapper[R]): Wrapper[R] = SuspendedWrapper[R](() => wrapper) + sealed trait SimpleWrapper[-R, E, A, Info] extends Wrapper[R] { def wrap[R1 <: R](f: Info => ZIO[R1, E, A]): Info => ZIO[R1, E, A] } @@ -137,7 +144,9 @@ object Wrapper { * A wrapper that requires an effect to be built. The effect will be run for each query. * @param wrapper an effect that builds a wrapper */ - case class EffectfulWrapper[-R](wrapper: UIO[Wrapper[R]]) extends Wrapper[R] + case class EffectfulWrapper[-R](wrapper: URIO[R, Wrapper[R]]) extends Wrapper[R] + + private case class SuspendedWrapper[-R](wrapper: () => Wrapper[R]) extends Wrapper[R] private[caliban] def wrap[R1 >: R, R, E, A, Info]( process: Info => ZIO[R1, E, A] @@ -154,7 +163,8 @@ object Wrapper { private val emptyWrappers = Exit.succeed((Nil, Nil, Nil, Nil, Nil, Nil)) - private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): UIO[ + private[caliban] def decompose[R](wrappers: List[Wrapper[R]])(implicit trace: Trace): URIO[ + R, ( List[OverallWrapper[R]], List[ParsingWrapper[R]], @@ -165,32 +175,46 @@ object Wrapper { ) ] = if (wrappers.isEmpty) emptyWrappers - else - ZIO.suspendSucceed { - val o = ListBuffer.empty[OverallWrapper[R]] - val p = ListBuffer.empty[ParsingWrapper[R]] - val v = ListBuffer.empty[ValidationWrapper[R]] - val e = ListBuffer.empty[ExecutionWrapper[R]] - val f = ListBuffer.empty[FieldWrapper[R]] - val i = ListBuffer.empty[IntrospectionWrapper[R]] - - def loop(wrapper: Wrapper[R]): UIO[Unit] = wrapper match { - case wrapper: OverallWrapper[R] => ZIO.succeed(o append wrapper) - case wrapper: ParsingWrapper[R] => ZIO.succeed(p append wrapper) - case wrapper: ValidationWrapper[R] => ZIO.succeed(v append wrapper) - case wrapper: ExecutionWrapper[R] => ZIO.succeed(e append wrapper) - case wrapper: FieldWrapper[R] => ZIO.succeed(f append wrapper) - case wrapper: IntrospectionWrapper[R] => ZIO.succeed(i append wrapper) - case CombinedWrapper(wrappers) => ZIO.foreachDiscard(wrappers)(loop) - case EffectfulWrapper(wrapper) => wrapper.flatMap(loop) - case Wrapper.Empty => ZIO.unit - } - - def finalize[W <: Wrapper[R]](buffer: ListBuffer[W]): List[W] = buffer.sortBy(_.priority).result() - - ZIO - .foreachDiscard(wrappers)(loop) - .as((finalize(o), finalize(p), finalize(v), finalize(e), finalize(f), finalize(i))) + else { + val o = ListBuffer.empty[OverallWrapper[R]] + val p = ListBuffer.empty[ParsingWrapper[R]] + val v = ListBuffer.empty[ValidationWrapper[R]] + val e = ListBuffer.empty[ExecutionWrapper[R]] + val f = ListBuffer.empty[FieldWrapper[R]] + val i = ListBuffer.empty[IntrospectionWrapper[R]] + + def loop(wrapper: Wrapper[R]): Option[URIO[R, Unit]] = wrapper match { + case wrapper: OverallWrapper[R] => o append wrapper; None + case wrapper: ParsingWrapper[R] => p append wrapper; None + case wrapper: ValidationWrapper[R] => v append wrapper; None + case wrapper: ExecutionWrapper[R] => e append wrapper; None + case wrapper: FieldWrapper[R] => f append wrapper; None + case wrapper: IntrospectionWrapper[R] => i append wrapper; None + case SuspendedWrapper(f) => loop(f()) + case CombinedWrapper(wrappers) => + wrappers.flatMap(loop) match { + case Nil => None + case fs => Some(ZIO.collectAllDiscard(fs)) + } + case EffectfulWrapper(wrapper) => + Some(wrapper.flatMap { + loop(_) match { + case None => Exit.unit + case Some(w) => w + } + }) + case Wrapper.Empty => None } + def finalize[W <: Wrapper[R]](buffer: ListBuffer[W]): List[W] = buffer.sortBy(_.priority).result() + + def result() = + (finalize(o), finalize(p), finalize(v), finalize(e), finalize(f), finalize(i)) + + wrappers.flatMap(loop) match { + case Nil => Exit.succeed(result()) + case fs => ZIO.collectAllDiscard(fs).as(result()) + } + } + } diff --git a/core/src/main/scala/caliban/wrappers/Wrappers.scala b/core/src/main/scala/caliban/wrappers/Wrappers.scala index 0c418039ca..75d728ac82 100644 --- a/core/src/main/scala/caliban/wrappers/Wrappers.scala +++ b/core/src/main/scala/caliban/wrappers/Wrappers.scala @@ -11,6 +11,7 @@ import zio._ import zio.metrics.MetricKeyType.Histogram import zio.metrics.MetricLabel import zio.query.ZQuery +import java.time.{ Clock => JClock } import scala.annotation.tailrec @@ -172,7 +173,7 @@ object Wrappers { durationLabel: String = "graphql_fields_duration_seconds", buckets: Histogram.Boundaries = FieldMetrics.defaultBuckets, extraLabels: Set[MetricLabel] = Set.empty - ): Wrapper.EffectfulWrapper[Any] = + )(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = FieldMetrics.wrapper(totalLabel, durationLabel, buckets, extraLabels) private def countFields(rootField: Field): Int = { diff --git a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala index d599d0a328..807b4beb7c 100644 --- a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala +++ b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala @@ -76,32 +76,34 @@ object FieldMetricsSpec extends ZIOSpecDefault { } } }""") - for { - interpreter <- (api @@ Wrappers.metrics(extraLabels = Set(MetricLabel("test", "success")))).interpreter - _ <- TestClock.adjust(30.seconds) - fiber <- interpreter.execute(query).fork - _ <- TestClock.adjust(30.seconds) - res <- fiber.join - root <- metric.tagged("field", "Queries.person").value - name <- metric.tagged("field", "Person.name").value - friendsName <- metric.tagged("field", "Friend.name").value - totalRoot <- metricTotal.tagged("field", "Queries.person").tagged("status", "ok").value - total <- metricTotal.tagged("field", "Friend.name").tagged("status", "ok").value - totalError <- metricTotal.tagged("field", "Friend.name").tagged("status", "error").value - } yield assertTrue( - res.errors.size == 1, - getCountForDuration(root, 0.025) == 0, - getCountForDuration(root, 0.05) == 1, - getCountForDuration(root, 0.5) == 1, - getCountForDuration(name, 0.075) == 0, - getCountForDuration(name, 0.1) == 1, - getCountForDuration(name, 0.5) == 1, - getCountForDuration(friendsName, 0.25) == 0, - getCountForDuration(friendsName, 0.5) == 3, - totalRoot.count == 1.0, - total.count == 3.0, - totalError.count == 1.0 - ) + ZIO.clockWith { implicit clock => + for { + interpreter <- (api @@ Wrappers.metrics(extraLabels = Set(MetricLabel("test", "success")))).interpreter + _ <- TestClock.adjust(30.seconds) + fiber <- interpreter.execute(query).fork + _ <- TestClock.adjust(30.seconds) + res <- fiber.join + root <- metric.tagged("field", "Queries.person").value + name <- metric.tagged("field", "Person.name").value + friendsName <- metric.tagged("field", "Friend.name").value + totalRoot <- metricTotal.tagged("field", "Queries.person").tagged("status", "ok").value + total <- metricTotal.tagged("field", "Friend.name").tagged("status", "ok").value + totalError <- metricTotal.tagged("field", "Friend.name").tagged("status", "error").value + } yield assertTrue( + res.errors.size == 1, + getCountForDuration(root, 0.025) == 0, + getCountForDuration(root, 0.05) == 1, + getCountForDuration(root, 0.5) == 1, + getCountForDuration(name, 0.075) == 0, + getCountForDuration(name, 0.1) == 1, + getCountForDuration(name, 0.5) == 1, + getCountForDuration(friendsName, 0.25) == 0, + getCountForDuration(friendsName, 0.5) == 3, + totalRoot.count == 1.0, + total.count == 3.0, + totalError.count == 1.0 + ) + } } ) } diff --git a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala index 1ffa5f2548..1c99cb1674 100644 --- a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala +++ b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala @@ -1,32 +1,29 @@ package caliban.wrappers -import caliban._ import caliban.CalibanError.{ ExecutionError, ValidationError } import caliban.InputValue.ObjectValue import caliban.Macros.gqldoc import caliban.TestUtils._ import caliban.Value.{ IntValue, StringValue } +import caliban._ import caliban.execution.{ ExecutionRequest, FieldInfo } import caliban.introspection.adt.{ __Directive, __DirectiveLocation } import caliban.parsing.adt.{ Directive, Document, LocationInfo } import caliban.schema.Annotations.GQLDirective -import caliban.schema.{ ArgBuilder, GenericSchema, Schema } import caliban.schema.Schema.auto._ -import caliban.wrappers.ApolloPersistedQueries.apolloPersistedQueries +import caliban.schema.{ ArgBuilder, GenericSchema, Schema } +import caliban.wrappers.ApolloPersistedQueries.ApolloPersistence import caliban.wrappers.Wrapper.{ CombinedWrapper, ExecutionWrapper, FieldWrapper, ValidationWrapper } import caliban.wrappers.Wrappers._ import io.circe.syntax._ import zio._ import zio.query.ZQuery -import zio.test.Assertion._ import zio.test._ -import scala.annotation.nowarn import scala.language.postfixOps -@nowarn("msg=deprecated") object WrappersSpec extends ZIOSpecDefault { - import caliban.wrappers.ApolloCaching.GQLCacheControl + def newApqWrapper() = ApolloPersistedQueries.wrapper(new ApolloPersistence.Live()) override def spec = suite("WrappersSpec")( @@ -272,51 +269,6 @@ object WrappersSpec extends ZIOSpecDefault { } ) }, - test("Apollo Caching") { - case class Query(@GQLCacheControl(maxAge = Some(10.seconds)) hero: Hero) - - @GQLCacheControl(maxAge = Some(2.seconds)) - case class Hero(name: UIO[String], friends: List[Hero] = Nil) - - object schema extends GenericSchema[Any] { - implicit lazy val heroSchema: Schema[Any, Hero] = gen - def api = - graphQL( - RootResolver( - Query( - Hero( - ZIO.succeed("R2-D2"), - List( - Hero(ZIO.succeed("Luke Skywalker")), - Hero(ZIO.succeed("Han Solo")), - Hero(ZIO.succeed("Leia Organa")) - ) - ) - ) - ) - ) @@ ApolloCaching.apolloCaching - } - - val query = gqldoc(""" - { - hero { - name - friends { - name - } - } - }""") - for { - interpreter <- schema.api.interpreter - result <- interpreter.execute(query).map(_.extensions.map(_.toString)) - } yield assert(result)( - isSome( - equalTo( - "{\"cacheControl\":{\"version\":1,\"hints\":[{\"path\":[\"hero\"],\"maxAge\":10,\"scope\":\"PRIVATE\"}]}}" - ) - ) - ) - }, suite("Apollo Persisted Queries")({ def mockWrapper[R](fail: Ref[Boolean]): ValidationWrapper[R] = new ValidationWrapper[R] { override def wrap[R1 <: R]( @@ -342,14 +294,14 @@ object WrappersSpec extends ZIOSpecDefault { case class Test(test: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + interpreter <- + (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter result <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"))) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("hash not found") { case class Test(test: String) - val interpreter = (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + val interpreter = (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter interpreter .flatMap( _.executeRequest( @@ -363,30 +315,27 @@ object WrappersSpec extends ZIOSpecDefault { response.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""" ) } - .provide(ApolloPersistedQueries.live) }, test("cache poisoning") { case class Test(test: String, malicious: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok", "malicious"))) @@ apolloPersistedQueries).interpreter + interpreter <- (graphQL(RootResolver(Test("ok", "malicious"))) @@ newApqWrapper()).interpreter // The hash for the query "{test}" attempting to poison the cache by passing in a different query r1 <- interpreter.executeRequest(GraphQLRequest(query = Some("{malicious}"), extensions = extensions)) r2 <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue( r1.asJson.noSpaces == """{"data":null,"errors":[{"message":"Provided sha does not match any query"}]}""" ) && assertTrue(r2.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""")) - .provideLayer(ApolloPersistedQueries.live) }, test("hash found") { case class Test(test: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok"))) @@ apolloPersistedQueries).interpreter + interpreter <- (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("executes first") { case class Test(test: String) @@ -395,12 +344,11 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(false) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ apolloPersistedQueries @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ newApqWrapper() @@ mockWrapper(shouldFail)).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) _ <- shouldFail.set(true) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) - .provide(ApolloPersistedQueries.live) }, test("does not register successful validation if another validation wrapper fails") { case class Test(test: String) @@ -409,7 +357,7 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(true) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ apolloPersistedQueries @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ newApqWrapper() @@ mockWrapper(shouldFail)).interpreter first <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) second <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield { @@ -418,7 +366,6 @@ object WrappersSpec extends ZIOSpecDefault { second.asJson.noSpaces == """{"data":null,"errors":[{"message":"PersistedQueryNotFound"}]}""" ) }) - .provide(ApolloPersistedQueries.live) }, test("invalid / missing variables in cached query") { case class TestInput(testField: String) @@ -441,7 +388,7 @@ object WrappersSpec extends ZIOSpecDefault { (for { interpreter <- - (graphQL(RootResolver(Test(_.testField))) @@ apolloPersistedQueries).interpreter + (graphQL(RootResolver(Test(_.testField))) @@ newApqWrapper()).interpreter validTest <- interpreter.executeRequest( GraphQLRequest(query = Some(query), variables = Some(validVariables), extensions = extensions) @@ -457,7 +404,6 @@ object WrappersSpec extends ZIOSpecDefault { assertTrue( missingVariableTest.asJson.noSpaces == """{"data":null,"errors":[{"message":"Variable 'testField' is null but is specified to be non-null."}]}""" )) - .provide(ApolloPersistedQueries.live) } ) }), diff --git a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala index 1f3cdce317..575e3ccc22 100644 --- a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala +++ b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala @@ -5,7 +5,8 @@ import caliban.ResponseValue.ObjectValue import caliban.Value.{ IntValue, StringValue } import caliban._ import caliban.execution.FieldInfo -import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper } +import caliban.wrappers.Wrapper +import caliban.wrappers.Wrapper.{ FieldWrapper, OverallWrapper } import com.google.protobuf.timestamp.Timestamp import mdg.engine.proto.reports.Trace import mdg.engine.proto.reports.Trace.{ Error, Location, Node } @@ -27,15 +28,13 @@ object ApolloFederatedTracing { * Setting this to true can help improve performance at the cost of generating incomplete traces. * WARNING: Use this with caution as it could potentially cause issues if the tracing client expects all queried fields to be included in the traces */ - def wrapper(excludePureFields: Boolean = false): EffectfulWrapper[Any] = - EffectfulWrapper( - for { - tracing <- ZIO.succeed(new AtomicReference(Tracing(NodeTrie.empty))) - enabled <- ZIO.succeed(new AtomicReference(false)) - clock <- ZIO.clock - } yield apolloTracingOverall(clock, tracing, enabled) |+| - Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), tracing, enabled, !excludePureFields)) - ) + def wrapper(excludePureFields: Boolean = false)(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = + Wrapper.suspend { + val tracing = new AtomicReference(Tracing(NodeTrie.empty)) + val enabled = new AtomicReference(false) + apolloTracingOverall(clock, tracing, enabled) |+| + apolloTracingField(clock.unsafe, tracing, enabled, !excludePureFields) + } private def toTimestamp(epochMilli: Long): Timestamp = Timestamp.of( @@ -92,15 +91,16 @@ object ApolloFederatedTracing { } private def apolloTracingField( - nanoTime: => Long, + clock: Clock#UnsafeAPI, ref: AtomicReference[Tracing], enabled: AtomicReference[Boolean], wrapPureValues: Boolean ): FieldWrapper[Any] = new FieldWrapper[Any](wrapPureValues) { + import caliban.implicits.unsafe private def updateState(startTime: Long, fieldInfo: FieldInfo, error: Option[ExecutionError]): Unit = { - val endTime = nanoTime + val endTime = clock.nanoTime() val path = (PathValue.Key(fieldInfo.name) :: fieldInfo.path).toVector val _ = ref.updateAndGet(state => state.copy( @@ -131,7 +131,7 @@ object ApolloFederatedTracing { ): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] = if (enabled.get()) ZQuery.suspend { - val startTime = nanoTime + val startTime = clock.nanoTime() query.foldQuery( error => ZQuery.fail { diff --git a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala index 67ce6d65d8..a98d587413 100644 --- a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala +++ b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala @@ -33,20 +33,22 @@ object FederationTracingSpec extends ZIOSpecDefault { case class Queries(me: ZQuery[Any, Nothing, User]) - def api(excludePureFields: Boolean) = graphQL( - RootResolver( - Queries( - me = ZQuery.succeed( - User( - "abc123", - ZIO.sleep(100.millis) as Name("my_first", Some("my_last")), - age = 42, - parents = ZIO.succeed(List(Parent("my_parent"))) + def api(excludePureFields: Boolean) = ZIO.clock.map { implicit clock => + graphQL( + RootResolver( + Queries( + me = ZQuery.succeed( + User( + "abc123", + ZIO.sleep(100.millis) as Name("my_first", Some("my_last")), + age = 42, + parents = ZIO.succeed(List(Parent("my_parent"))) + ) ) ) ) - ) - ) @@ ApolloFederatedTracing.wrapper(excludePureFields) + ) @@ ApolloFederatedTracing.wrapper(excludePureFields) + } val query = gqldoc("query { me { id username { first, family: last } parents { name } age } }") val body = ObjectValue( @@ -143,7 +145,7 @@ object FederationTracingSpec extends ZIOSpecDefault { test("disabled by default") { for { _ <- TestClock.setTime(Instant.ofEpochSecond(1)) - interpreter <- api(false).interpreter + interpreter <- api(false).flatMap(_.interpreter) resultFiber <- interpreter.execute(query).fork result <- TestClock.adjust(1.second) *> resultFiber.join } yield assertTrue(result.data == body) && assert(result.extensions)(isNone) @@ -152,7 +154,7 @@ object FederationTracingSpec extends ZIOSpecDefault { def test_(excludePureFields: Boolean, expectedTrace: Trace) = for { _ <- TestClock.setTime(Instant.ofEpochSecond(1)) - interpreter <- api(excludePureFields).interpreter + interpreter <- api(excludePureFields).flatMap(_.interpreter) resultFiber <- interpreter .execute( From a199bdbd066a7f986fab5beb3ad6579322b2ad5c Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sat, 29 Jun 2024 15:11:58 +1000 Subject: [PATCH 2/4] Cleanups --- .../scala/caliban/wrappers/CostEstimation.scala | 4 ++-- .../scala/caliban/wrappers/FieldMetrics.scala | 1 - .../main/scala/caliban/wrappers/Wrappers.scala | 3 +-- .../scala/caliban/wrappers/WrappersSpec.scala | 16 ++++++++-------- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/caliban/wrappers/CostEstimation.scala b/core/src/main/scala/caliban/wrappers/CostEstimation.scala index 466fbb17f5..2e5fc35c84 100644 --- a/core/src/main/scala/caliban/wrappers/CostEstimation.scala +++ b/core/src/main/scala/caliban/wrappers/CostEstimation.scala @@ -4,12 +4,12 @@ import caliban.CalibanError.ValidationError import caliban.InputValue.ListValue import caliban.ResponseValue.ObjectValue import caliban.Value.{ FloatValue, IntValue, StringValue } +import caliban._ import caliban.execution.{ ExecutionRequest, Field } import caliban.parsing.adt.{ Directive, Document } import caliban.schema.Annotations.GQLDirective import caliban.schema.Types -import caliban.wrappers.Wrapper.{ OverallWrapper, SuspendedWrapper, ValidationWrapper } -import caliban._ +import caliban.wrappers.Wrapper.{ OverallWrapper, ValidationWrapper } import zio.{ Ref, UIO, URIO, ZIO } import scala.annotation.tailrec diff --git a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala index e713a99660..7a4714948c 100644 --- a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala +++ b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala @@ -11,7 +11,6 @@ import zio.query.ZQuery import java.util.concurrent.atomic.AtomicReference import scala.jdk.CollectionConverters._ -import java.time.{ Clock => JClock } object FieldMetrics { private[caliban] val defaultBuckets = Histogram.Boundaries( diff --git a/core/src/main/scala/caliban/wrappers/Wrappers.scala b/core/src/main/scala/caliban/wrappers/Wrappers.scala index 75d728ac82..4aeec8ab20 100644 --- a/core/src/main/scala/caliban/wrappers/Wrappers.scala +++ b/core/src/main/scala/caliban/wrappers/Wrappers.scala @@ -5,13 +5,12 @@ import caliban.Value.NullValue import caliban.execution.{ ExecutionRequest, Field, FieldInfo } import caliban.parsing.adt.{ Directive, Document } import caliban.wrappers.Wrapper.{ FieldWrapper, OverallWrapper, ValidationWrapper } -import caliban.{ CalibanError, Configurator, GraphQLRequest, GraphQLResponse, ResponseValue } +import caliban._ import zio.Console.{ printLine, printLineError } import zio._ import zio.metrics.MetricKeyType.Histogram import zio.metrics.MetricLabel import zio.query.ZQuery -import java.time.{ Clock => JClock } import scala.annotation.tailrec diff --git a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala index 1c99cb1674..fb911785fa 100644 --- a/core/src/test/scala/caliban/wrappers/WrappersSpec.scala +++ b/core/src/test/scala/caliban/wrappers/WrappersSpec.scala @@ -23,7 +23,6 @@ import zio.test._ import scala.language.postfixOps object WrappersSpec extends ZIOSpecDefault { - def newApqWrapper() = ApolloPersistedQueries.wrapper(new ApolloPersistence.Live()) override def spec = suite("WrappersSpec")( @@ -295,13 +294,13 @@ object WrappersSpec extends ZIOSpecDefault { (for { interpreter <- - (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter + (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter result <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"))) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) }, test("hash not found") { case class Test(test: String) - val interpreter = (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter + val interpreter = (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter interpreter .flatMap( _.executeRequest( @@ -320,7 +319,8 @@ object WrappersSpec extends ZIOSpecDefault { case class Test(test: String, malicious: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok", "malicious"))) @@ newApqWrapper()).interpreter + interpreter <- + (graphQL(RootResolver(Test("ok", "malicious"))) @@ ApolloPersistedQueries.wrapper).interpreter // The hash for the query "{test}" attempting to poison the cache by passing in a different query r1 <- interpreter.executeRequest(GraphQLRequest(query = Some("{malicious}"), extensions = extensions)) r2 <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) @@ -332,7 +332,7 @@ object WrappersSpec extends ZIOSpecDefault { case class Test(test: String) (for { - interpreter <- (graphQL(RootResolver(Test("ok"))) @@ newApqWrapper()).interpreter + interpreter <- (graphQL(RootResolver(Test("ok"))) @@ ApolloPersistedQueries.wrapper).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield assertTrue(result.asJson.noSpaces == """{"data":{"test":"ok"}}""")) @@ -344,7 +344,7 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(false) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ newApqWrapper() @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ ApolloPersistedQueries.wrapper @@ mockWrapper(shouldFail)).interpreter _ <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) _ <- shouldFail.set(true) result <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) @@ -357,7 +357,7 @@ object WrappersSpec extends ZIOSpecDefault { shouldFail <- Ref.make(true) interpreter <- (graphQL(RootResolver(Test("ok"))) @@ - mockWrapper(shouldFail) @@ newApqWrapper() @@ mockWrapper(shouldFail)).interpreter + mockWrapper(shouldFail) @@ ApolloPersistedQueries.wrapper @@ mockWrapper(shouldFail)).interpreter first <- interpreter.executeRequest(GraphQLRequest(query = Some("{test}"), extensions = extensions)) second <- interpreter.executeRequest(GraphQLRequest(extensions = extensions)) } yield { @@ -388,7 +388,7 @@ object WrappersSpec extends ZIOSpecDefault { (for { interpreter <- - (graphQL(RootResolver(Test(_.testField))) @@ newApqWrapper()).interpreter + (graphQL(RootResolver(Test(_.testField))) @@ ApolloPersistedQueries.wrapper).interpreter validTest <- interpreter.executeRequest( GraphQLRequest(query = Some(query), variables = Some(validVariables), extensions = extensions) From c147222f14f5bcecf847202ae9f9a9353cc6a799 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Mon, 1 Jul 2024 19:37:50 +1000 Subject: [PATCH 3/4] Revert changes to wrappers with Clock and add back ApolloCaching --- build.sbt | 5 +- .../caliban/wrappers/ApolloCaching.scala | 161 ++++++++++++++++++ .../scala/caliban/wrappers/FieldMetrics.scala | 16 +- .../scala/caliban/wrappers/Wrappers.scala | 2 +- .../caliban/wrappers/FieldMetricsSpec.scala | 56 +++--- .../tracing/ApolloFederatedTracing.scala | 16 +- .../tracing/FederationTracingSpec.scala | 7 +- 7 files changed, 211 insertions(+), 52 deletions(-) create mode 100644 core/src/main/scala/caliban/wrappers/ApolloCaching.scala diff --git a/build.sbt b/build.sbt index e8ee3a4d6c..9a073c4fb9 100644 --- a/build.sbt +++ b/build.sbt @@ -742,10 +742,7 @@ lazy val enableMimaSettingsJVM = Def.settings( mimaFailOnProblem := enforceMimaCompatibility, mimaPreviousArtifacts := previousStableVersion.value.map(organization.value %% moduleName.value % _).toSet, - mimaBinaryIssueFilters := Seq( - ProblemFilters.exclude[Problem]("caliban.wrappers.*"), - ProblemFilters.exclude[DirectMissingMethodProblem]("caliban.federation.tracing.ApolloFederatedTracing.wrapper") - ) + mimaBinaryIssueFilters := Seq() ) lazy val enableMimaSettingsJS = diff --git a/core/src/main/scala/caliban/wrappers/ApolloCaching.scala b/core/src/main/scala/caliban/wrappers/ApolloCaching.scala new file mode 100644 index 0000000000..3310d89958 --- /dev/null +++ b/core/src/main/scala/caliban/wrappers/ApolloCaching.scala @@ -0,0 +1,161 @@ +package caliban.wrappers + +import caliban.CalibanError.ExecutionError +import caliban.ResponseValue.{ ListValue, ObjectValue } +import caliban.Value.{ EnumValue, IntValue, StringValue } +import caliban.execution.FieldInfo +import caliban.parsing.adt.Directive +import caliban.schema.Annotations.GQLDirective +import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper } +import caliban._ +import zio._ +import zio.query.ZQuery + +import java.util.concurrent.TimeUnit + +/** + * Returns a wrapper which applies apollo caching response extensions + */ +@deprecated("Use `caliban.wrappers.Caching` for a more flexible implementation", "2.4.0") +object ApolloCaching { + + private val directiveName = "cacheControl" + + @deprecated("Use `caliban.wrappers.Caching` for a more flexible implementation", "2.4.0") + case class GQLCacheControl(maxAge: Option[Duration] = None, scope: Option[CacheScope] = None) + extends GQLDirective(CacheControl(scope, maxAge)) + + object CacheControl { + + def apply(scope: Option[CacheScope], maxAge: Option[Duration]): Directive = + (scope, maxAge) match { + case (Some(scope), Some(age)) => apply(age, scope) + case (None, Some(age)) => apply(age) + case (Some(scope), None) => apply(scope) + case _ => Directive(directiveName, Map.empty) + } + + def apply(scope: ApolloCaching.CacheScope): Directive = + Directive(directiveName, Map("scope" -> EnumValue(scope.toString))) + + def apply(maxAge: Duration): Directive = + Directive(directiveName, Map("maxAge" -> IntValue(maxAge.toMillis / 1000))) + + def apply(maxAge: Duration, scope: ApolloCaching.CacheScope): Directive = + Directive(directiveName, Map("maxAge" -> IntValue(maxAge.toMillis / 1000), "scope" -> EnumValue(scope.toString))) + + } + + val apolloCaching: EffectfulWrapper[Any] = + EffectfulWrapper( + Ref.make(Caching()).map(ref => apolloCachingOverall(ref) |+| apolloCachingField(ref)) + ) + + sealed trait CacheScope + + object CacheScope { + + case object Private extends CacheScope { + override def toString: String = "PRIVATE" + } + + case object Public extends CacheScope { + override def toString: String = "PUBLIC" + } + + } + + case class CacheHint( + fieldName: String = "", + path: List[PathValue] = Nil, + maxAge: Duration, + scope: CacheScope + ) { + + def toResponseValue: ResponseValue = + ObjectValue( + List( + "path" -> ListValue((PathValue.Key(fieldName) :: path).reverse), + "maxAge" -> IntValue(maxAge.toMillis / 1000), + "scope" -> StringValue(scope match { + case CacheScope.Private => "PRIVATE" + case CacheScope.Public => "PUBLIC" + }) + ) + ) + + } + + case class Caching( + version: Int = 1, + hints: List[CacheHint] = List.empty + ) { + + def toResponseValue: ResponseValue = + ObjectValue(List("version" -> IntValue(version), "hints" -> ListValue(hints.map(_.toResponseValue)))) + } + + case class CacheDirective(scope: Option[CacheScope] = None, maxAge: Option[Duration] = None) + + private def extractCacheDirective(directives: List[Directive]): Option[CacheDirective] = + directives.collectFirst { + case d if d.name == directiveName => + val scope = d.arguments.get("scope").collectFirst { + case StringValue("PRIVATE") | EnumValue("PRIVATE") => CacheScope.Private + case StringValue("PUBLIC") | EnumValue("PUBLIC") => CacheScope.Public + } + + val maxAge = d.arguments.get("maxAge").collectFirst { case i: IntValue => + Duration(i.toLong, TimeUnit.SECONDS) + } + + CacheDirective(scope, maxAge) + } + + private def apolloCachingOverall(ref: Ref[Caching]): OverallWrapper[Any] = + new OverallWrapper[Any] { + def wrap[R1 <: Any]( + process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] + ): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] = + (request: GraphQLRequest) => + for { + result <- process(request) + cache <- ref.get + } yield result.copy( + extensions = Some( + ObjectValue( + ("cacheControl" -> cache.toResponseValue) :: result.extensions.fold( + List.empty[(String, ResponseValue)] + )(_.fields) + ) + ) + ) + } + + private def apolloCachingField(ref: Ref[Caching]): FieldWrapper[Any] = + new FieldWrapper(true) { + def wrap[R1 <: Any]( + query: ZQuery[R1, ExecutionError, ResponseValue], + fieldInfo: FieldInfo + ): ZQuery[R1, ExecutionError, ResponseValue] = { + val cacheDirectives = extractCacheDirective( + fieldInfo.directives ++ fieldInfo.details.fieldType.ofType.flatMap(_.directives).getOrElse(Nil) + ) + + cacheDirectives.foldLeft(query) { case (q, cacheDirective) => + q <* ZQuery.fromZIO( + ref.update(state => + state.copy( + hints = CacheHint( + path = fieldInfo.path, + fieldName = fieldInfo.name, + maxAge = cacheDirective.maxAge getOrElse Duration.Zero, + scope = cacheDirective.scope getOrElse CacheScope.Private + ) :: state.hints + ) + ) + ) + } + } + } +} diff --git a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala index 7a4714948c..8f4b09593f 100644 --- a/core/src/main/scala/caliban/wrappers/FieldMetrics.scala +++ b/core/src/main/scala/caliban/wrappers/FieldMetrics.scala @@ -3,7 +3,7 @@ package caliban.wrappers import caliban.Value.StringValue import caliban._ import caliban.execution.FieldInfo -import caliban.wrappers.Wrapper.OverallWrapper +import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper } import zio._ import zio.metrics.MetricKeyType.Histogram import zio.metrics.{ Metric, MetricKey, MetricLabel } @@ -56,12 +56,14 @@ object FieldMetrics { durationLabel: String = "graphql_fields_duration_seconds", buckets: Histogram.Boundaries = defaultBuckets, extraLabels: Set[MetricLabel] = Set.empty - )(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = - Wrapper.suspend { - val timings = new AtomicReference(List.empty[Timing]) - val failures = new AtomicReference(List.empty[String]) - val metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) - overallWrapper(timings, failures, metrics) |+| fieldWrapper(clock.unsafe, timings, failures) + ): EffectfulWrapper[Any] = + Wrapper.EffectfulWrapper { + ZIO.clock.map { clock => + val timings = new AtomicReference(List.empty[Timing]) + val failures = new AtomicReference(List.empty[String]) + val metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels) + overallWrapper(timings, failures, metrics) |+| fieldWrapper(clock.unsafe, timings, failures) + } } private def overallWrapper( diff --git a/core/src/main/scala/caliban/wrappers/Wrappers.scala b/core/src/main/scala/caliban/wrappers/Wrappers.scala index 4aeec8ab20..ab1744d6d5 100644 --- a/core/src/main/scala/caliban/wrappers/Wrappers.scala +++ b/core/src/main/scala/caliban/wrappers/Wrappers.scala @@ -172,7 +172,7 @@ object Wrappers { durationLabel: String = "graphql_fields_duration_seconds", buckets: Histogram.Boundaries = FieldMetrics.defaultBuckets, extraLabels: Set[MetricLabel] = Set.empty - )(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = + ): Wrapper.EffectfulWrapper[Any] = FieldMetrics.wrapper(totalLabel, durationLabel, buckets, extraLabels) private def countFields(rootField: Field): Int = { diff --git a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala index 807b4beb7c..ff5583d493 100644 --- a/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala +++ b/core/src/test/scala/caliban/wrappers/FieldMetricsSpec.scala @@ -7,7 +7,7 @@ import caliban.schema.ArgBuilder.auto._ import caliban.schema.Schema.auto._ import zio._ import zio.metrics.Metric -import zio.metrics.{ Metric, MetricLabel, MetricState } +import zio.metrics.{ MetricLabel, MetricState } import zio.test._ object FieldMetricsSpec extends ZIOSpecDefault { @@ -76,34 +76,32 @@ object FieldMetricsSpec extends ZIOSpecDefault { } } }""") - ZIO.clockWith { implicit clock => - for { - interpreter <- (api @@ Wrappers.metrics(extraLabels = Set(MetricLabel("test", "success")))).interpreter - _ <- TestClock.adjust(30.seconds) - fiber <- interpreter.execute(query).fork - _ <- TestClock.adjust(30.seconds) - res <- fiber.join - root <- metric.tagged("field", "Queries.person").value - name <- metric.tagged("field", "Person.name").value - friendsName <- metric.tagged("field", "Friend.name").value - totalRoot <- metricTotal.tagged("field", "Queries.person").tagged("status", "ok").value - total <- metricTotal.tagged("field", "Friend.name").tagged("status", "ok").value - totalError <- metricTotal.tagged("field", "Friend.name").tagged("status", "error").value - } yield assertTrue( - res.errors.size == 1, - getCountForDuration(root, 0.025) == 0, - getCountForDuration(root, 0.05) == 1, - getCountForDuration(root, 0.5) == 1, - getCountForDuration(name, 0.075) == 0, - getCountForDuration(name, 0.1) == 1, - getCountForDuration(name, 0.5) == 1, - getCountForDuration(friendsName, 0.25) == 0, - getCountForDuration(friendsName, 0.5) == 3, - totalRoot.count == 1.0, - total.count == 3.0, - totalError.count == 1.0 - ) - } + for { + interpreter <- (api @@ Wrappers.metrics(extraLabels = Set(MetricLabel("test", "success")))).interpreter + _ <- TestClock.adjust(30.seconds) + fiber <- interpreter.execute(query).fork + _ <- TestClock.adjust(30.seconds) + res <- fiber.join + root <- metric.tagged("field", "Queries.person").value + name <- metric.tagged("field", "Person.name").value + friendsName <- metric.tagged("field", "Friend.name").value + totalRoot <- metricTotal.tagged("field", "Queries.person").tagged("status", "ok").value + total <- metricTotal.tagged("field", "Friend.name").tagged("status", "ok").value + totalError <- metricTotal.tagged("field", "Friend.name").tagged("status", "error").value + } yield assertTrue( + res.errors.size == 1, + getCountForDuration(root, 0.025) == 0, + getCountForDuration(root, 0.05) == 1, + getCountForDuration(root, 0.5) == 1, + getCountForDuration(name, 0.075) == 0, + getCountForDuration(name, 0.1) == 1, + getCountForDuration(name, 0.5) == 1, + getCountForDuration(friendsName, 0.25) == 0, + getCountForDuration(friendsName, 0.5) == 3, + totalRoot.count == 1.0, + total.count == 3.0, + totalError.count == 1.0 + ) } ) } diff --git a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala index 575e3ccc22..2168ac6e74 100644 --- a/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala +++ b/federation/src/main/scala/caliban/federation/tracing/ApolloFederatedTracing.scala @@ -6,7 +6,7 @@ import caliban.Value.{ IntValue, StringValue } import caliban._ import caliban.execution.FieldInfo import caliban.wrappers.Wrapper -import caliban.wrappers.Wrapper.{ FieldWrapper, OverallWrapper } +import caliban.wrappers.Wrapper.{ EffectfulWrapper, FieldWrapper, OverallWrapper } import com.google.protobuf.timestamp.Timestamp import mdg.engine.proto.reports.Trace import mdg.engine.proto.reports.Trace.{ Error, Location, Node } @@ -28,12 +28,14 @@ object ApolloFederatedTracing { * Setting this to true can help improve performance at the cost of generating incomplete traces. * WARNING: Use this with caution as it could potentially cause issues if the tracing client expects all queried fields to be included in the traces */ - def wrapper(excludePureFields: Boolean = false)(implicit clock: Clock = Clock.ClockLive): Wrapper[Any] = - Wrapper.suspend { - val tracing = new AtomicReference(Tracing(NodeTrie.empty)) - val enabled = new AtomicReference(false) - apolloTracingOverall(clock, tracing, enabled) |+| - apolloTracingField(clock.unsafe, tracing, enabled, !excludePureFields) + def wrapper(excludePureFields: Boolean = false): EffectfulWrapper[Any] = + EffectfulWrapper { + ZIO.clock.map { clock => + val tracing = new AtomicReference(Tracing(NodeTrie.empty)) + val enabled = new AtomicReference(false) + apolloTracingOverall(clock, tracing, enabled) |+| + apolloTracingField(clock.unsafe, tracing, enabled, !excludePureFields) + } } private def toTimestamp(epochMilli: Long): Timestamp = diff --git a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala index a98d587413..3a652d8646 100644 --- a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala +++ b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala @@ -33,7 +33,7 @@ object FederationTracingSpec extends ZIOSpecDefault { case class Queries(me: ZQuery[Any, Nothing, User]) - def api(excludePureFields: Boolean) = ZIO.clock.map { implicit clock => + def api(excludePureFields: Boolean) = graphQL( RootResolver( Queries( @@ -48,7 +48,6 @@ object FederationTracingSpec extends ZIOSpecDefault { ) ) ) @@ ApolloFederatedTracing.wrapper(excludePureFields) - } val query = gqldoc("query { me { id username { first, family: last } parents { name } age } }") val body = ObjectValue( @@ -145,7 +144,7 @@ object FederationTracingSpec extends ZIOSpecDefault { test("disabled by default") { for { _ <- TestClock.setTime(Instant.ofEpochSecond(1)) - interpreter <- api(false).flatMap(_.interpreter) + interpreter <- api(false).interpreter resultFiber <- interpreter.execute(query).fork result <- TestClock.adjust(1.second) *> resultFiber.join } yield assertTrue(result.data == body) && assert(result.extensions)(isNone) @@ -154,7 +153,7 @@ object FederationTracingSpec extends ZIOSpecDefault { def test_(excludePureFields: Boolean, expectedTrace: Trace) = for { _ <- TestClock.setTime(Instant.ofEpochSecond(1)) - interpreter <- api(excludePureFields).flatMap(_.interpreter) + interpreter <- api(excludePureFields).interpreter resultFiber <- interpreter .execute( From afc4bc1e467fa47deb5ea412c7514c86038f46ae Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Mon, 1 Jul 2024 19:41:17 +1000 Subject: [PATCH 4/4] fmt --- .../tracing/FederationTracingSpec.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala index 3a652d8646..67ce6d65d8 100644 --- a/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala +++ b/federation/src/test/scala/caliban/federation/tracing/FederationTracingSpec.scala @@ -33,21 +33,20 @@ object FederationTracingSpec extends ZIOSpecDefault { case class Queries(me: ZQuery[Any, Nothing, User]) - def api(excludePureFields: Boolean) = - graphQL( - RootResolver( - Queries( - me = ZQuery.succeed( - User( - "abc123", - ZIO.sleep(100.millis) as Name("my_first", Some("my_last")), - age = 42, - parents = ZIO.succeed(List(Parent("my_parent"))) - ) + def api(excludePureFields: Boolean) = graphQL( + RootResolver( + Queries( + me = ZQuery.succeed( + User( + "abc123", + ZIO.sleep(100.millis) as Name("my_first", Some("my_last")), + age = 42, + parents = ZIO.succeed(List(Parent("my_parent"))) ) ) ) - ) @@ ApolloFederatedTracing.wrapper(excludePureFields) + ) + ) @@ ApolloFederatedTracing.wrapper(excludePureFields) val query = gqldoc("query { me { id username { first, family: last } parents { name } age } }") val body = ObjectValue(