Skip to content

Commit

Permalink
Add method to "suspend" wrappers and reimplement APQs (#2321)
Browse files Browse the repository at this point in the history
* Add suspended wrappers and reimplement APQs

* Cleanups

* Revert changes to wrappers with Clock and add back ApolloCaching

* fmt
  • Loading branch information
kyri-petrou authored Jul 2, 2024
1 parent 0b5347a commit ce6e49a
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 176 deletions.
7 changes: 7 additions & 0 deletions core/src/main/scala/caliban/implicits.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package caliban

import zio.Unsafe

private object implicits {
implicit val unsafe: Unsafe = Unsafe.unsafe(identity)
}
92 changes: 59 additions & 33 deletions core/src/main/scala/caliban/wrappers/ApolloPersistedQueries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import zio._

import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

object ApolloPersistedQueries {
Expand All @@ -26,44 +27,48 @@ object ApolloPersistedQueries {
def add(hash: String, query: Document): ZIO[ApolloPersistence, Nothing, Unit] =
ZIO.serviceWithZIO[ApolloPersistence](_.add(hash, query))

val live: UIO[ApolloPersistence] =
ZIO.succeed(new ConcurrentHashMap[String, Document]()).map { docCache =>
new ApolloPersistence {
override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash)))
override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit
}
}
val live: UIO[ApolloPersistence] = ZIO.succeed(new Live)(Trace.empty)

final class Live extends ApolloPersistence {
private implicit val trace: Trace = Trace.empty
private val docCache: ConcurrentHashMap[String, Document] = new ConcurrentHashMap[String, Document]()

override def get(hash: String): UIO[Option[Document]] = ZIO.succeed(Option(docCache.get(hash)))
override def add(hash: String, query: Document): UIO[Unit] = ZIO.succeed(docCache.put(hash, query)).unit
}

}

val live: Layer[Nothing, ApolloPersistence] = ZLayer(ApolloPersistence.live)

private def parsingWrapper(
docVar: Promise[Nothing, Option[(String, Option[Document])]]
): ParsingWrapper[ApolloPersistence] =
new ParsingWrapper[ApolloPersistence] {
override def wrap[R1 <: ApolloPersistence](
docVar: AtomicReference[Option[(String, Option[Document])]]
): ParsingWrapper[Any] =
new ParsingWrapper[Any] {
override def wrap[R1 <: Any](
f: String => ZIO[R1, CalibanError.ParsingError, Document]
): String => ZIO[R1, CalibanError.ParsingError, Document] =
(query: String) =>
docVar.await.flatMap {
case Some((_, Some(doc))) => ZIO.succeed(doc)
docVar.get() match {
case Some((_, Some(doc))) => Exit.succeed(doc)
case _ => f(query)
}
}

private def validationWrapper(
docVar: Promise[Nothing, Option[(String, Option[Document])]]
): ValidationWrapper[ApolloPersistence] =
new ValidationWrapper[ApolloPersistence] {
store: ApolloPersistence,
docVar: AtomicReference[Option[(String, Option[Document])]]
): ValidationWrapper[Any] =
new ValidationWrapper[Any] {
override val priority: Int = 100

override def wrap[R1 <: ApolloPersistence](
override def wrap[R1 <: Any](
f: Document => ZIO[R1, ValidationError, ExecutionRequest]
): Document => ZIO[R1, ValidationError, ExecutionRequest] =
(doc: Document) =>
docVar.await.flatMap {
docVar.get() match {
case Some((_, Some(_))) => Configurator.ref.locallyWith(_.copy(skipValidation = true))(f(doc))
case Some((hash, None)) => f(doc) <* ApolloPersistence.add(hash, doc)
case Some((hash, None)) => f(doc) <* store.add(hash, doc)
case None => f(doc)
}
}
Expand All @@ -74,40 +79,61 @@ object ApolloPersistedQueries {
* value to `(_, None)` which will then get passed to the parsing wrapper where it will populate the cache with the validated query document
*/
private def overallWrapper(
docVar: Promise[Nothing, Option[(String, Option[Document])]]
): OverallWrapper[ApolloPersistence] =
new OverallWrapper[ApolloPersistence] {
def wrap[R1 <: ApolloPersistence](
store: ApolloPersistence,
docVar: AtomicReference[Option[(String, Option[Document])]]
): OverallWrapper[Any] =
new OverallWrapper[Any] {
def wrap[R1 <: Any](
process: GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]]
): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] =
(request: GraphQLRequest) =>
readHash(request) match {
case Some(hash) =>
ApolloPersistence
store
.get(hash)
.flatMap {
case Some(doc) => docVar.succeed(Some((hash, Some(doc)))) as request
case Some(doc) =>
docVar.set(Some((hash, Some(doc))))
Exit.succeed(request)
case None =>
request.query match {
case Some(value) if checkHash(hash, value) => docVar.succeed(Some((hash, None))).as(request)
case Some(_) => ZIO.fail(ValidationError("Provided sha does not match any query", ""))
case None => ZIO.fail(ValidationError("PersistedQueryNotFound", ""))
case Some(value) if checkHash(hash, value) =>
docVar.set(Some((hash, None)))
Exit.succeed(request)
case Some(_) => Exit.fail(ValidationError("Provided sha does not match any query", ""))
case None => Exit.fail(ValidationError("PersistedQueryNotFound", ""))
}
}
.flatMap(process)
.catchAll(ex => Exit.succeed(GraphQLResponse(NullValue, List(ex))))
case None => docVar.succeed(None) *> process(request)
case None => process(request)
}
}

@deprecated("Use `wrapper` instead and pass the cache explicitly", "2.9.0")
val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] =
EffectfulWrapper(ZIO.serviceWith[ApolloPersistence](wrapper))

/**
* Returns a wrapper that persists and retrieves queries based on a hash
* following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries.
*
* This wrapper will initialize a non-expiring cache which will be used for all queries
*
* @see Overloaded method for a variant that allows using a custom cache
*/
val apolloPersistedQueries: EffectfulWrapper[ApolloPersistence] =
EffectfulWrapper(Promise.make[Nothing, Option[(String, Option[Document])]].map { docVar =>
overallWrapper(docVar) |+| parsingWrapper(docVar) |+| validationWrapper(docVar)
})
def wrapper: Wrapper[Any] = wrapper(new ApolloPersistence.Live)

/**
* Returns a wrapper that persists and retrieves queries based on a hash
* following Apollo Persisted Queries spec: https://github.com/apollographql/apollo-link-persisted-queries.
*
* @param cache the query cache that will be used to store the parsed documents
*/
def wrapper(cache: ApolloPersistence): Wrapper[Any] = Wrapper.suspend {
val ref = new AtomicReference[Option[(String, Option[Document])]](None)
overallWrapper(cache, ref) |+| parsingWrapper(ref) |+| validationWrapper(cache, ref)
}

private def readHash(request: GraphQLRequest): Option[String] =
request.extensions
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/caliban/wrappers/ApolloTracing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object ApolloTracing {
} yield apolloTracingOverall(clock, ref) |+|
apolloTracingParsing(clock, ref) |+|
apolloTracingValidation(clock, ref) |+|
Unsafe.unsafe(implicit u => apolloTracingField(clock.unsafe.nanoTime(), ref, !excludePureFields))
apolloTracingField(clock.unsafe, ref, !excludePureFields)
)
.someOrElse(Wrapper.empty)
)
Expand Down Expand Up @@ -196,19 +196,21 @@ object ApolloTracing {
}

private def apolloTracingField(
nanoTime: => Long,
clock: Clock#UnsafeAPI,
ref: AtomicReference[Tracing],
wrapPureValues: Boolean
): FieldWrapper[Any] =
new FieldWrapper[Any](wrapPureValues) {
import caliban.implicits.unsafe

def wrap[R1](
query: ZQuery[R1, CalibanError.ExecutionError, ResponseValue],
fieldInfo: FieldInfo
): ZQuery[R1, CalibanError.ExecutionError, ResponseValue] =
ZQuery.suspend {
val start = nanoTime
val start = clock.nanoTime()
query.map { result =>
val end = nanoTime
val end = clock.nanoTime()
val _ = ref.updateAndGet(state =>
state.copy(
execution = state.execution.copy(
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/caliban/wrappers/CostEstimation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import caliban.CalibanError.ValidationError
import caliban.InputValue.ListValue
import caliban.ResponseValue.ObjectValue
import caliban.Value.{ FloatValue, IntValue, StringValue }
import caliban._
import caliban.execution.{ ExecutionRequest, Field }
import caliban.parsing.adt.{ Directive, Document }
import caliban.schema.Annotations.GQLDirective
import caliban.schema.Types
import caliban.validation.Validator
import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper, ValidationWrapper }
import caliban.{ CalibanError, Configurator, GraphQLRequest, GraphQLResponse, ResponseValue }
import caliban.wrappers.Wrapper.{ OverallWrapper, ValidationWrapper }
import zio.{ Ref, UIO, URIO, ZIO }

import scala.annotation.tailrec
Expand Down Expand Up @@ -155,11 +154,12 @@ object CostEstimation {
costWrapper: Ref[Double] => Wrapper[R]
)(
p: (Double, GraphQLResponse[CalibanError]) => URIO[R, GraphQLResponse[CalibanError]]
): Wrapper[R] = EffectfulWrapper(
Ref.make(0.0).map { cost =>
costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp)))
}
)
): Wrapper[R] = Wrapper.suspend {
import caliban.implicits.unsafe

val cost = Ref.unsafe.make(0.0)
costWrapper(cost) |+| costOverall(resp => cost.get.flatMap(p(_, resp)))
}

/**
* Computes the estimated cost of executing the query using the provided function and compares it to
Expand Down
46 changes: 23 additions & 23 deletions core/src/main/scala/caliban/wrappers/FieldMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package caliban.wrappers
import caliban.Value.StringValue
import caliban._
import caliban.execution.FieldInfo
import caliban.wrappers.Wrapper.OverallWrapper
import caliban.wrappers.Wrapper.{ EffectfulWrapper, OverallWrapper }
import zio._
import zio.metrics.MetricKeyType.Histogram
import zio.metrics.{ Metric, MetricKey, MetricLabel }
Expand Down Expand Up @@ -56,16 +56,15 @@ object FieldMetrics {
durationLabel: String = "graphql_fields_duration_seconds",
buckets: Histogram.Boundaries = defaultBuckets,
extraLabels: Set[MetricLabel] = Set.empty
): Wrapper.EffectfulWrapper[Any] =
Wrapper.EffectfulWrapper(
for {
timings <- ZIO.succeed(new AtomicReference(List.empty[Timing]))
failures <- ZIO.succeed(new AtomicReference(List.empty[String]))
clock <- ZIO.clock
metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels)
} yield overallWrapper(timings, failures, metrics) |+|
Unsafe.unsafe(implicit us => fieldWrapper(clock.unsafe.nanoTime(), timings, failures))
)
): EffectfulWrapper[Any] =
Wrapper.EffectfulWrapper {
ZIO.clock.map { clock =>
val timings = new AtomicReference(List.empty[Timing])
val failures = new AtomicReference(List.empty[String])
val metrics = new Metrics(totalLabel, durationLabel, buckets, extraLabels)
overallWrapper(timings, failures, metrics) |+| fieldWrapper(clock.unsafe, timings, failures)
}
}

private def overallWrapper(
timings: AtomicReference[List[Timing]],
Expand All @@ -78,21 +77,20 @@ object FieldMetrics {
): GraphQLRequest => ZIO[R1, Nothing, GraphQLResponse[CalibanError]] =
(request: GraphQLRequest) =>
process(request) <*
ZIO
.blocking(for {
_ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get))
ts <- ZIO.succeed(timings.get)
nodeOffsets = resolveNodeOffsets(ts)
_ <- metrics.recordSuccesses(nodeOffsets, ts)
} yield ())
.forkDaemon
(for {
_ <- ZIO.suspendSucceed(metrics.recordFailures(failures.get))
ts <- ZIO.succeed(timings.get)
nodeOffsets = resolveNodeOffsets(ts)
_ <- metrics.recordSuccesses(nodeOffsets, ts)
} yield ()).forkDaemon
}

private def resolveNodeOffsets(timings: List[Timing]): Map[Vector[PathValue], Long] = {

val map = new java.util.HashMap[Vector[PathValue], Long]()
val nil = Nil
var remaining = timings
while (!remaining.isEmpty) {
while (remaining ne nil) {
val t = remaining.head
val iter = t.path.inits
val duration = t.duration
Expand All @@ -118,11 +116,13 @@ object FieldMetrics {
}

private def fieldWrapper(
nanoTime: => Long,
clock: Clock#UnsafeAPI,
timings: AtomicReference[List[Timing]],
failures: AtomicReference[List[String]]
): Wrapper.FieldWrapper[Any] =
new Wrapper.FieldWrapper[Any] {
import caliban.implicits.unsafe

def wrap[R](
query: ZQuery[R, CalibanError.ExecutionError, ResponseValue],
info: FieldInfo
Expand All @@ -143,7 +143,7 @@ object FieldMetrics {
)

ZQuery.suspend {
val st = nanoTime
val st = clock.nanoTime()
query.foldQuery(
e =>
ZQuery.fail {
Expand All @@ -152,7 +152,7 @@ object FieldMetrics {
},
result =>
ZQuery.succeed {
val t = makeTiming(nanoTime - st)
val t = makeTiming(clock.nanoTime() - st)
val _ = timings.updateAndGet(t :: _)
result
}
Expand Down
Loading

0 comments on commit ce6e49a

Please sign in to comment.