Skip to content

Commit

Permalink
Use dedicated methods for steps in executor (#2017)
Browse files Browse the repository at this point in the history
* Use dedicated methods for steps in executor

* Remove implicit requirement to make mima happy

* Don't disable auto-tracing
  • Loading branch information
kyri-petrou authored Nov 22, 2023
1 parent 8ef7fb3 commit 8db6b8c
Showing 1 changed file with 101 additions and 95 deletions.
196 changes: 101 additions & 95 deletions core/src/main/scala/caliban/execution/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import caliban.schema.Step._
import caliban.schema.{ ReducedStep, Step, Types }
import caliban.wrappers.Wrapper.FieldWrapper
import zio._
import zio.query.{ Cache, ZQuery }
import zio.query.{ Cache, UQuery, URQuery, ZQuery }
import zio.stream.ZStream

import scala.annotation.tailrec
Expand All @@ -35,6 +35,7 @@ object Executor {
featureSet: Set[Feature] = Set.empty
)(implicit trace: Trace): URIO[R, GraphQLResponse[CalibanError]] = {
val wrapPureValues = fieldWrappers.exists(_.wrapPureValues)
type ExecutionQuery[+A] = ZQuery[R, ExecutionError, A]

val execution = request.operationType match {
case OperationType.Query => queryExecution
Expand All @@ -53,7 +54,55 @@ object Executor {
currentField: Field,
arguments: Map[String, InputValue],
path: List[Either[String, Int]]
): ReducedStep[R] =
): ReducedStep[R] = {

def reduceObjectStep(objectName: String, fields: Map[String, Step[R]]) = {
val filteredFields = mergeFields(currentField, objectName)
val (deferred, eager) = filteredFields.partitionMap {
case f @ Field("__typename", _, _, _, _, _, _, directives, _, _, _) =>
Right((f.aliasedName, PureStep(StringValue(objectName)), fieldInfo(f, path, directives)))
case f @ Field(name, _, _, _, _, _, args, directives, _, _, fragment) =>
val aliasedName = f.aliasedName
val field = fields
.get(name)
.fold(NullStep: ReducedStep[R])(reduceStep(_, f, args, Left(aliasedName) :: path))

val info = fieldInfo(f, path, directives)

fragment.collectFirst {
// The defer spec provides some latitude on how we handle responses. Since it is more performant to return
// pure fields rather than spin up the defer machinery we return pure fields immediately to the caller.
case IsDeferred(label) if featureSet(Feature.Defer) && !field.isPure =>
(label, (aliasedName, field, info))
}.toLeft((aliasedName, field, info))
}

deferred match {
case Nil => reduceObject(eager, wrapPureValues)
case d =>
DeferStep(
reduceObject(eager, wrapPureValues),
d.groupBy(_._1).toList.map { case (label, labelAndFields) =>
val (_, fields) = labelAndFields.unzip
reduceObject(fields, wrapPureValues) -> label
},
path
)
}
}

def reduceListStep(steps: List[Step[R]]) = {
var i = 0
val lb = List.newBuilder[ReducedStep[R]]
var remaining = steps
while (!remaining.isEmpty) {
lb += reduceStep(remaining.head, currentField, arguments, Right(i) :: path)
i += 1
remaining = remaining.tail
}
reduceList(lb.result(), Types.listOf(currentField.fieldType).fold(false)(_.isNullable))
}

step match {
case s @ PureStep(EnumValue(v)) =>
// special case of an hybrid union containing case objects, those should return an object instead of a string
Expand All @@ -67,56 +116,15 @@ object Executor {
case s: PureStep => s
case FunctionStep(step) => reduceStep(step(arguments), currentField, Map.empty, path)
case MetadataFunctionStep(step) => reduceStep(step(currentField), currentField, arguments, path)
case ObjectStep(objectName, fields) =>
val filteredFields = mergeFields(currentField, objectName)
val (deferred, eager) = filteredFields.partitionMap {
case f @ Field("__typename", _, _, _, _, _, _, directives, _, _, _) =>
Right((f.aliasedName, PureStep(StringValue(objectName)), fieldInfo(f, path, directives)))
case f @ Field(name, _, _, _, _, _, args, directives, _, _, fragment) =>
val aliasedName = f.aliasedName
val field = fields
.get(name)
.fold(NullStep: ReducedStep[R])(reduceStep(_, f, args, Left(aliasedName) :: path))

val info = fieldInfo(f, path, directives)

fragment.collectFirst {
// The defer spec provides some latitude on how we handle responses. Since it is more performant to return
// pure fields rather than spin up the defer machinery we return pure fields immediately to the caller.
case IsDeferred(label) if featureSet(Feature.Defer) && !field.isPure =>
(label, (aliasedName, field, info))
}.toLeft((aliasedName, field, info))
}

deferred match {
case Nil => reduceObject(eager, wrapPureValues)
case d =>
DeferStep(
reduceObject(eager, wrapPureValues),
d.groupBy(_._1).toList.map { case (label, labelAndFields) =>
val (_, fields) = labelAndFields.unzip
reduceObject(fields, wrapPureValues) -> label
},
path
)
}
case QueryStep(inner) =>
ReducedStep.QueryStep(
inner.foldCauseQuery(
e => ZQuery.failCause(effectfulExecutionError(path, Some(currentField.locationInfo), e)),
a => ZQuery.succeed(reduceStep(a, currentField, arguments, path))
)
)
case ListStep(steps) =>
var i = 0
val lb = List.newBuilder[ReducedStep[R]]
var remaining = steps
while (!remaining.isEmpty) {
lb += reduceStep(remaining.head, currentField, arguments, Right(i) :: path)
i += 1
remaining = remaining.tail
}
reduceList(lb.result(), Types.listOf(currentField.fieldType).fold(false)(_.isNullable))
case ObjectStep(objectName, fields) => reduceObjectStep(objectName, fields)
case ListStep(steps) => reduceListStep(steps)
case StreamStep(stream) =>
if (request.operationType == OperationType.Subscription) {
ReducedStep.StreamStep(
Expand All @@ -133,79 +141,78 @@ object Executor {
)
}
}
}

def makeQuery(
step: ReducedStep[R],
errors: Ref[List[CalibanError]],
deferred: Ref[List[Deferred[R]]]
): ZQuery[R, Nothing, ResponseValue] = {
): URQuery[R, ResponseValue] = {

def handleError(error: ExecutionError): ZQuery[Any, Nothing, ResponseValue] =
def handleError(error: ExecutionError): UQuery[ResponseValue] =
ZQuery.fromZIO(errors.update(error :: _)).as(NullValue)

def wrap(query: ZQuery[R, ExecutionError, ResponseValue], isPure: Boolean)(
wrappers: List[FieldWrapper[R]],
fieldInfo: FieldInfo
): ZQuery[R, ExecutionError, ResponseValue] = {
def wrap(query: ExecutionQuery[ResponseValue], isPure: Boolean, fieldInfo: FieldInfo) = {
@tailrec
def loop(
query: ZQuery[R, ExecutionError, ResponseValue],
wrappers: List[FieldWrapper[R]]
): ZQuery[R, ExecutionError, ResponseValue] =
def loop(query: ExecutionQuery[ResponseValue], wrappers: List[FieldWrapper[R]]): ExecutionQuery[ResponseValue] =
wrappers match {
case Nil => query
case wrapper :: tail =>
val q = if (isPure && !wrapper.wrapPureValues) query else wrapper.wrap(query, fieldInfo)
loop(q, tail)
}
loop(query, wrappers)
loop(query, fieldWrappers)
}

def objectFieldQuery(name: String, step: ReducedStep[R], info: FieldInfo) = {
val q = wrap(loop(step), step.isPure)(fieldWrappers, info)
val q = wrap(loop(step), step.isPure, info)

if (info.details.fieldType.isNullable) q.catchAll(handleError).map((name, _))
else q.map((name, _))
}

def loop(step: ReducedStep[R]): ZQuery[R, ExecutionError, ResponseValue] =
step match {
case PureStep(value) => ZQuery.succeed(value)
case ReducedStep.ObjectStep(steps) =>
var resolved: mutable.HashMap[String, ResponseValue] = null

val queries =
if (wrapPureValues) steps.map((objectFieldQuery _).tupled)
else {
val queries = List.newBuilder[ZQuery[R, ExecutionError, (String, ResponseValue)]]

var remaining = steps
while (!remaining.isEmpty) {
remaining.head match {
case (name, PureStep(value), _) =>
if (null == resolved) resolved = new mutable.HashMap[String, ResponseValue]()
resolved.update(name, value)
case (name, step, info) =>
queries += objectFieldQuery(name, step, info)
}
remaining = remaining.tail
}
queries.result()
def makeObjectQuery(steps: List[(String, ReducedStep[R], FieldInfo)]) = {
var resolved: mutable.HashMap[String, ResponseValue] = null

val queries =
if (wrapPureValues) steps.map((objectFieldQuery _).tupled)
else {
val queries = List.newBuilder[ExecutionQuery[(String, ResponseValue)]]
var remaining = steps

while (!remaining.isEmpty) {
remaining.head match {
case (name, PureStep(value), _) =>
if (null == resolved) resolved = new mutable.HashMap[String, ResponseValue]()
resolved.update(name, value)
case (name, step, info) =>
queries += objectFieldQuery(name, step, info)
}
remaining = remaining.tail
}
queries.result()
}

if (null == resolved) collectAll(queries).map(ObjectValue.apply)
else
collectAll(queries).map { results =>
results.foreach(kv => resolved.update(kv._1, kv._2))
ObjectValue(steps.map { case (name, _, _) => name -> resolved(name) })
}
case ReducedStep.QueryStep(step) =>
step.flatMap(loop)
case ReducedStep.ListStep(steps, areItemsNullable) =>
val queries =
if (areItemsNullable) steps.map(loop(_).catchAll(handleError))
else steps.map(loop)

collectAll(queries).map(ListValue.apply)
if (null == resolved) collectAll(queries).map(ObjectValue.apply)
else
collectAll(queries).map { results =>
results.foreach(kv => resolved.update(kv._1, kv._2))
ObjectValue(steps.map { case (name, _, _) => name -> resolved(name) })
}
}

def makeListQuery(steps: List[ReducedStep[R]], areItemsNullable: Boolean) =
collectAll(
if (areItemsNullable) steps.map(loop(_).catchAll(handleError))
else steps.map(loop)
).map(ListValue.apply)

def loop(step: ReducedStep[R]): ExecutionQuery[ResponseValue] =
step match {
case PureStep(value) => ZQuery.succeed(value)
case ReducedStep.QueryStep(step) => step.flatMap(loop)
case ReducedStep.ObjectStep(steps) => makeObjectQuery(steps)
case ReducedStep.ListStep(steps, areItemsNullable) => makeListQuery(steps, areItemsNullable)
case ReducedStep.StreamStep(stream) =>
ZQuery
.environment[R]
Expand Down Expand Up @@ -299,7 +306,6 @@ object Executor {
ZIO.succeed(GraphQLResponse(NullValue, List(error)))

private[caliban] def mergeFields(field: Field, typeName: String): List[Field] = {
// ugly mutable code but it's worth it for the speed ;)
val map = new java.util.LinkedHashMap[String, Field]()
var modified = false

Expand Down

0 comments on commit 8db6b8c

Please sign in to comment.