Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More executor optimizations #2052

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 75 additions & 46 deletions core/src/main/scala/caliban/execution/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object Executor {
val wrapPureValues = fieldWrappers.exists(_.wrapPureValues)
val isDeferredEnabled = featureSet(Feature.Defer)
val isMutation = request.operationType == OperationType.Mutation
val isSubscription = request.operationType == OperationType.Subscription

type ExecutionQuery[+A] = ZQuery[R, ExecutionError, A]

Expand Down Expand Up @@ -111,7 +112,13 @@ object Executor {
i += 1
remaining = remaining.tail
}
reduceList(lb.result(), Types.listOf(currentField.fieldType).fold(false)(_.isNullable))
reduceList(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually faster? fold is inline.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlining via the @inline annotation only works in Scala 2 and only if it's enabled via the -opt-inline-from: compiler flag (which we don't have enabled - I think we might want to enable that one).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah :/

lb.result(),
Types.listOf(currentField.fieldType) match {
case Some(tpe) => tpe.isNullable
case None => false
}
)
}

def reduceQuery(query: ZQuery[R, Throwable, Step[R]]) =
Expand All @@ -135,13 +142,13 @@ object Executor {
case None => s
}
case s: PureStep => s
case FunctionStep(step) => reduceStep(step(arguments), currentField, Map.empty, path)
case MetadataFunctionStep(step) => reduceStep(step(currentField), currentField, arguments, path)
case QueryStep(inner) => reduceQuery(inner)
case ObjectStep(objectName, fields) => reduceObjectStep(objectName, fields)
case FunctionStep(step) => reduceStep(step(arguments), currentField, Map.empty, path)
case MetadataFunctionStep(step) => reduceStep(step(currentField), currentField, arguments, path)
case ListStep(steps) => reduceListStep(steps)
case StreamStep(stream) =>
if (request.operationType == OperationType.Subscription) {
if (isSubscription) {
ReducedStep.StreamStep(
stream
.mapErrorCause(effectfulExecutionError(path, Some(currentField.locationInfo), _))
Expand Down Expand Up @@ -179,52 +186,61 @@ object Executor {
loop(query, fieldWrappers)
}

def objectFieldQuery(name: String, step: ReducedStep[R], info: FieldInfo) = {
val q = wrap(loop(step), step.isPure, info)

if (info.details.fieldType.isNullable) {
q.foldQuery(
handleError(_).map((name, _)),
v => ZQuery.succeed((name, v))
)
} else q.map((name, _))
def objectFieldQuery(step: ReducedStep[R], info: FieldInfo, isPure: Boolean = false) = {
val q = wrap(loop(step), isPure, info)
if (info.details.fieldType.isNullable) q.catchAll(handleError) else q
}

def makeObjectQuery(steps: List[(String, ReducedStep[R], FieldInfo)], isTopLevelField: Boolean) = {
def makeObjectQuery(
steps: List[(String, ReducedStep[R], FieldInfo)],
hasPureFields: Boolean,
isTopLevelField: Boolean
) = {

def collectAllQueries() =
collectAll(steps, isTopLevelField)((objectFieldQuery _).tupled).map(ObjectValue.apply)
collectAll(steps, isTopLevelField) { case (_, step, info) =>
// Only way we could have ended with pure fields here is if we wrap pure values, so we check that first as it's cheaper
objectFieldQuery(step, info, wrapPureValues && step.isPure)
}.map(ls => ObjectValue(ls.lazyZip(steps).map { case (resp, (name, _, _)) => (name, resp) }))

def combineResults(names: List[String], resolved: List[ResponseValue])(fromQueries: Vector[ResponseValue]) = {
var results: List[(String, ResponseValue)] = Nil
var i = fromQueries.length
var remainingResponses = resolved
var remainingNames = names
while (remainingResponses ne Nil) {
val name = remainingNames.head
val resp = remainingResponses.head match {
case null => i -= 1; fromQueries(i)
case resp => resp
}
results = (name, resp) :: results
remainingResponses = remainingResponses.tail
remainingNames = remainingNames.tail
}
ObjectValue(results)
}

def collectMixed() = {
val resolved = ListBuffer.empty[(String, ResponseValue)]
val queries = Vector.newBuilder[(String, ReducedStep[R], FieldInfo)]
var names = List.empty[String]
var resolved = List.empty[ResponseValue]
var remaining = steps
while (remaining ne Nil) {
remaining.head match {
case (name, PureStep(value), _) => resolved += ((name, value))
case step =>
resolved += null
queries += step
val (name, step, _) = remaining.head
val value = step match {
case PureStep(value) => value
case _ => queries += remaining.head; null
}
resolved = value :: resolved
names = name :: names
remaining = remaining.tail
}

def combineResults(fromQueries: Vector[(String, ResponseValue)]) = {
val lb = List.newBuilder[(String, ResponseValue)]
var i = -1
val iter = resolved.iterator
while (iter.hasNext)
lb += (iter.next() match {
case null => i += 1; fromQueries(i)
case t => t
})
ObjectValue(lb.result())
}

collectAll(queries.result(), isTopLevelField)((objectFieldQuery _).tupled).map(combineResults)
collectAll(queries.result(), isTopLevelField) { case (_, s, i) => objectFieldQuery(s, i) }
.map(combineResults(names, resolved))
}

if (wrapPureValues || !steps.exists(_._2.isPure)) collectAllQueries()
else collectMixed()
if (hasPureFields && !wrapPureValues) collectMixed() else collectAllQueries()
}

def makeListQuery(steps: List[ReducedStep[R]], areItemsNullable: Boolean): ExecutionQuery[ResponseValue] =
Expand All @@ -235,7 +251,7 @@ object Executor {
step match {
case PureStep(value) => ZQuery.succeed(value)
case ReducedStep.QueryStep(step) => step.flatMap(loop(_))
case ReducedStep.ObjectStep(steps) => makeObjectQuery(steps, isTopLevelField)
case ReducedStep.ObjectStep(steps, hasPureFields) => makeObjectQuery(steps, hasPureFields, isTopLevelField)
case ReducedStep.ListStep(steps, areItemsNullable) => makeListQuery(steps, areItemsNullable)
case ReducedStep.StreamStep(stream) =>
ZQuery
Expand Down Expand Up @@ -368,20 +384,33 @@ object Executor {
private def fieldInfo(field: Field, path: List[PathValue], fieldDirectives: List[Directive]): FieldInfo =
FieldInfo(field.aliasedName, field, path, fieldDirectives, field.parentType)

// In 99.99% of the cases, if the head is pure, all the other elements will be pure as well but we catch that error just in case
// NOTE: Our entire test suite passes without catching the error
private def reduceList[R](list: List[ReducedStep[R]], areItemsNullable: Boolean): ReducedStep[R] =
if (list.forall(_.isInstanceOf[PureStep]))
PureStep(ListValue(list.asInstanceOf[List[PureStep]].map(_.value)))
if (list.isEmpty || list.head.isPure)
try PureStep(ListValue(list.asInstanceOf[List[PureStep]].map(_.value)))
catch { case _: ClassCastException => ReducedStep.ListStep(list, areItemsNullable) }
else ReducedStep.ListStep(list, areItemsNullable)

private def reduceObject[R](
items: List[(String, ReducedStep[R], FieldInfo)],
wrapPureValues: Boolean
): ReducedStep[R] =
if (!wrapPureValues && items.forall(_._2.isPure))
PureStep(ObjectValue(items.asInstanceOf[List[(String, PureStep, FieldInfo)]].map { case (k, v, _) =>
(k, v.value)
}))
else ReducedStep.ObjectStep(items)
): ReducedStep[R] = {
var hasPures = false
var hasQueries = wrapPureValues
var remaining = items
while ((remaining ne Nil) && !(hasPures && hasQueries)) {
if (remaining.head._2.isPure) hasPures = true
else hasQueries = true
remaining = remaining.tail
}

if (hasQueries) ReducedStep.ObjectStep(items, hasPures)
else
PureStep(
ObjectValue(items.asInstanceOf[List[(String, PureStep, FieldInfo)]].map { case (k, v, _) => (k, v.value) })
)
}

private def effectfulExecutionError(
path: List[PathValue],
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/caliban/schema/Step.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,13 @@ object Step {
}

sealed trait ReducedStep[-R] { self =>
def isPure: Boolean =
self match {
case _: PureStep => true
case _ => false
}
def isPure: Boolean = false
}

object ReducedStep {
case class ListStep[-R](steps: List[ReducedStep[R]], areItemsNullable: Boolean) extends ReducedStep[R]
case class ObjectStep[-R](fields: List[(String, ReducedStep[R], FieldInfo)]) extends ReducedStep[R]
case class ObjectStep[-R](fields: List[(String, ReducedStep[R], FieldInfo)], hasPureFields: Boolean)
extends ReducedStep[R]
case class QueryStep[-R](query: ZQuery[R, ExecutionError, ReducedStep[R]]) extends ReducedStep[R]
case class StreamStep[-R](inner: ZStream[R, ExecutionError, ReducedStep[R]]) extends ReducedStep[R]
case class DeferStep[-R](
Expand All @@ -108,4 +105,6 @@ object ReducedStep {
*
* @param value the response value to return for that step
*/
case class PureStep(value: ResponseValue) extends Step[Any] with ReducedStep[Any]
case class PureStep(value: ResponseValue) extends Step[Any] with ReducedStep[Any] {
final override def isPure: Boolean = true
}