Skip to content

Commit

Permalink
More executor optimizations (#2052)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyri-petrou authored Dec 20, 2023
1 parent ee2d9e9 commit 2febf6d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 53 deletions.
121 changes: 75 additions & 46 deletions core/src/main/scala/caliban/execution/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object Executor {
val wrapPureValues = fieldWrappers.exists(_.wrapPureValues)
val isDeferredEnabled = featureSet(Feature.Defer)
val isMutation = request.operationType == OperationType.Mutation
val isSubscription = request.operationType == OperationType.Subscription

type ExecutionQuery[+A] = ZQuery[R, ExecutionError, A]

Expand Down Expand Up @@ -111,7 +112,13 @@ object Executor {
i += 1
remaining = remaining.tail
}
reduceList(lb.result(), Types.listOf(currentField.fieldType).fold(false)(_.isNullable))
reduceList(
lb.result(),
Types.listOf(currentField.fieldType) match {
case Some(tpe) => tpe.isNullable
case None => false
}
)
}

def reduceQuery(query: ZQuery[R, Throwable, Step[R]]) =
Expand All @@ -135,13 +142,13 @@ object Executor {
case None => s
}
case s: PureStep => s
case FunctionStep(step) => reduceStep(step(arguments), currentField, Map.empty, path)
case MetadataFunctionStep(step) => reduceStep(step(currentField), currentField, arguments, path)
case QueryStep(inner) => reduceQuery(inner)
case ObjectStep(objectName, fields) => reduceObjectStep(objectName, fields)
case FunctionStep(step) => reduceStep(step(arguments), currentField, Map.empty, path)
case MetadataFunctionStep(step) => reduceStep(step(currentField), currentField, arguments, path)
case ListStep(steps) => reduceListStep(steps)
case StreamStep(stream) =>
if (request.operationType == OperationType.Subscription) {
if (isSubscription) {
ReducedStep.StreamStep(
stream
.mapErrorCause(effectfulExecutionError(path, Some(currentField.locationInfo), _))
Expand Down Expand Up @@ -179,52 +186,61 @@ object Executor {
loop(query, fieldWrappers)
}

def objectFieldQuery(name: String, step: ReducedStep[R], info: FieldInfo) = {
val q = wrap(loop(step), step.isPure, info)

if (info.details.fieldType.isNullable) {
q.foldQuery(
handleError(_).map((name, _)),
v => ZQuery.succeed((name, v))
)
} else q.map((name, _))
def objectFieldQuery(step: ReducedStep[R], info: FieldInfo, isPure: Boolean = false) = {
val q = wrap(loop(step), isPure, info)
if (info.details.fieldType.isNullable) q.catchAll(handleError) else q
}

def makeObjectQuery(steps: List[(String, ReducedStep[R], FieldInfo)], isTopLevelField: Boolean) = {
def makeObjectQuery(
steps: List[(String, ReducedStep[R], FieldInfo)],
hasPureFields: Boolean,
isTopLevelField: Boolean
) = {

def collectAllQueries() =
collectAll(steps, isTopLevelField)((objectFieldQuery _).tupled).map(ObjectValue.apply)
collectAll(steps, isTopLevelField) { case (_, step, info) =>
// Only way we could have ended with pure fields here is if we wrap pure values, so we check that first as it's cheaper
objectFieldQuery(step, info, wrapPureValues && step.isPure)
}.map(ls => ObjectValue(ls.lazyZip(steps).map { case (resp, (name, _, _)) => (name, resp) }))

def combineResults(names: List[String], resolved: List[ResponseValue])(fromQueries: Vector[ResponseValue]) = {
var results: List[(String, ResponseValue)] = Nil
var i = fromQueries.length
var remainingResponses = resolved
var remainingNames = names
while (remainingResponses ne Nil) {
val name = remainingNames.head
val resp = remainingResponses.head match {
case null => i -= 1; fromQueries(i)
case resp => resp
}
results = (name, resp) :: results
remainingResponses = remainingResponses.tail
remainingNames = remainingNames.tail
}
ObjectValue(results)
}

def collectMixed() = {
val resolved = ListBuffer.empty[(String, ResponseValue)]
val queries = Vector.newBuilder[(String, ReducedStep[R], FieldInfo)]
var names = List.empty[String]
var resolved = List.empty[ResponseValue]
var remaining = steps
while (remaining ne Nil) {
remaining.head match {
case (name, PureStep(value), _) => resolved += ((name, value))
case step =>
resolved += null
queries += step
val (name, step, _) = remaining.head
val value = step match {
case PureStep(value) => value
case _ => queries += remaining.head; null
}
resolved = value :: resolved
names = name :: names
remaining = remaining.tail
}

def combineResults(fromQueries: Vector[(String, ResponseValue)]) = {
val lb = List.newBuilder[(String, ResponseValue)]
var i = -1
val iter = resolved.iterator
while (iter.hasNext)
lb += (iter.next() match {
case null => i += 1; fromQueries(i)
case t => t
})
ObjectValue(lb.result())
}

collectAll(queries.result(), isTopLevelField)((objectFieldQuery _).tupled).map(combineResults)
collectAll(queries.result(), isTopLevelField) { case (_, s, i) => objectFieldQuery(s, i) }
.map(combineResults(names, resolved))
}

if (wrapPureValues || !steps.exists(_._2.isPure)) collectAllQueries()
else collectMixed()
if (hasPureFields && !wrapPureValues) collectMixed() else collectAllQueries()
}

def makeListQuery(steps: List[ReducedStep[R]], areItemsNullable: Boolean): ExecutionQuery[ResponseValue] =
Expand All @@ -235,7 +251,7 @@ object Executor {
step match {
case PureStep(value) => ZQuery.succeed(value)
case ReducedStep.QueryStep(step) => step.flatMap(loop(_))
case ReducedStep.ObjectStep(steps) => makeObjectQuery(steps, isTopLevelField)
case ReducedStep.ObjectStep(steps, hasPureFields) => makeObjectQuery(steps, hasPureFields, isTopLevelField)
case ReducedStep.ListStep(steps, areItemsNullable) => makeListQuery(steps, areItemsNullable)
case ReducedStep.StreamStep(stream) =>
ZQuery
Expand Down Expand Up @@ -368,20 +384,33 @@ object Executor {
private def fieldInfo(field: Field, path: List[PathValue], fieldDirectives: List[Directive]): FieldInfo =
FieldInfo(field.aliasedName, field, path, fieldDirectives, field.parentType)

// In 99.99% of the cases, if the head is pure, all the other elements will be pure as well but we catch that error just in case
// NOTE: Our entire test suite passes without catching the error
private def reduceList[R](list: List[ReducedStep[R]], areItemsNullable: Boolean): ReducedStep[R] =
if (list.forall(_.isInstanceOf[PureStep]))
PureStep(ListValue(list.asInstanceOf[List[PureStep]].map(_.value)))
if (list.isEmpty || list.head.isPure)
try PureStep(ListValue(list.asInstanceOf[List[PureStep]].map(_.value)))
catch { case _: ClassCastException => ReducedStep.ListStep(list, areItemsNullable) }
else ReducedStep.ListStep(list, areItemsNullable)

private def reduceObject[R](
items: List[(String, ReducedStep[R], FieldInfo)],
wrapPureValues: Boolean
): ReducedStep[R] =
if (!wrapPureValues && items.forall(_._2.isPure))
PureStep(ObjectValue(items.asInstanceOf[List[(String, PureStep, FieldInfo)]].map { case (k, v, _) =>
(k, v.value)
}))
else ReducedStep.ObjectStep(items)
): ReducedStep[R] = {
var hasPures = false
var hasQueries = wrapPureValues
var remaining = items
while ((remaining ne Nil) && !(hasPures && hasQueries)) {
if (remaining.head._2.isPure) hasPures = true
else hasQueries = true
remaining = remaining.tail
}

if (hasQueries) ReducedStep.ObjectStep(items, hasPures)
else
PureStep(
ObjectValue(items.asInstanceOf[List[(String, PureStep, FieldInfo)]].map { case (k, v, _) => (k, v.value) })
)
}

private def effectfulExecutionError(
path: List[PathValue],
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/caliban/schema/Step.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,13 @@ object Step {
}

sealed trait ReducedStep[-R] { self =>
def isPure: Boolean =
self match {
case _: PureStep => true
case _ => false
}
def isPure: Boolean = false
}

object ReducedStep {
case class ListStep[-R](steps: List[ReducedStep[R]], areItemsNullable: Boolean) extends ReducedStep[R]
case class ObjectStep[-R](fields: List[(String, ReducedStep[R], FieldInfo)]) extends ReducedStep[R]
case class ObjectStep[-R](fields: List[(String, ReducedStep[R], FieldInfo)], hasPureFields: Boolean)
extends ReducedStep[R]
case class QueryStep[-R](query: ZQuery[R, ExecutionError, ReducedStep[R]]) extends ReducedStep[R]
case class StreamStep[-R](inner: ZStream[R, ExecutionError, ReducedStep[R]]) extends ReducedStep[R]
case class DeferStep[-R](
Expand All @@ -108,4 +105,6 @@ object ReducedStep {
*
* @param value the response value to return for that step
*/
case class PureStep(value: ResponseValue) extends Step[Any] with ReducedStep[Any]
case class PureStep(value: ResponseValue) extends Step[Any] with ReducedStep[Any] {
final override def isPure: Boolean = true
}

0 comments on commit 2febf6d

Please sign in to comment.