Skip to content

Commit

Permalink
Merge pull request #612 from sangria-graphql/config-fetcher-concurrency
Browse files Browse the repository at this point in the history
support for configuring max concurrent batches in fetcher
  • Loading branch information
yanns authored Apr 16, 2021
2 parents 7686938 + e8cf501 commit d823b99
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 16 deletions.
6 changes: 6 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ align.preset = none

danglingParentheses.preset = false

# Rewrite the deprecated Unicode arrow operators to their ASCII equivalents
# whenever scalafmt formats a file, keeping the codebase on one arrow style.
rewriteTokens {
"⇒":"=>"
"←":"<-"
"→":"->"
}

rewrite.rules = [
AvoidInfix
RedundantBraces
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ ThisBuild / githubWorkflowBuildPreamble ++= List(
// Binary Incompatible Changes, we'll document.
ThisBuild / mimaBinaryIssueFilters ++= Seq(
  "sangria.schema.ProjectedName*",
  "sangria.schema.Args*",
  // FetcherConfig changed shape when max-concurrent-batches support was added.
  "sangria.execution.deferred.FetcherConfig*"
).map(pattern => ProblemFilters.exclude[Problem](pattern))

lazy val root = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,13 @@ object Fetcher {

/** Configuration for a fetcher: an optional cache factory, an optional cap on
  * the number of ids per fetched batch, and an optional cap on how many
  * batches may be grouped for concurrent fetching. Each builder method returns
  * a copy with the corresponding option set.
  */
case class FetcherConfig(
    cacheConfig: Option[() => FetcherCache] = None,
    maxBatchSizeConfig: Option[Int] = None,
    maxConcurrentBatchesConfig: Option[Int] = None) {

  /** Enables caching backed by a fresh simple cache per factory invocation. */
  def caching: FetcherConfig = copy(cacheConfig = Some(() => FetcherCache.simple))

  /** Enables caching backed by the provided cache instance. */
  def caching(cache: FetcherCache): FetcherConfig = copy(cacheConfig = Some(() => cache))

  /** Limits every fetched batch to at most `size` ids. */
  def maxBatchSize(size: Int): FetcherConfig = copy(maxBatchSizeConfig = Some(size))

  /** Limits the number of batches fetched concurrently to `numBatches`. */
  def maxConcurrentBatches(numBatches: Int): FetcherConfig =
    copy(maxConcurrentBatchesConfig = Some(numBatches))
}

object FetcherConfig {
Expand All @@ -215,6 +217,7 @@ object FetcherConfig {
def caching(cache: FetcherCache) = empty.caching(cache)

def maxBatchSize(size: Int) = empty.maxBatchSize(size)
def maxConcurrentBatches(numBatches: Int) = empty.maxConcurrentBatches(numBatches)
}

trait DeferredOne[+T, Id] extends Deferred[T] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,42 @@ class FetcherBasedDeferredResolver[-Ctx](
}
}

// This is here for scala 2.12 / 2.13 compat, as `Iterator.empty` does NOT
// take type parameters in 2.12, but does in 2.13.
private val emptyIterator = Iterator[Future[(Vector[Any], Try[Seq[Any]])]]()

// Fetches the given non-cached ids with the fetcher `f`, honoring the two
// optional limits in the fetcher's config:
//   - `maxBatchSizeConfig`: maximum number of ids handed to a single
//     `f.fetch` call (no limit => everything in one batch),
//   - `maxConcurrentBatchesConfig`: batches are grouped into windows of at
//     most this size (no limit => a single window).
// Returns one Future per batch, pairing the batch's ids with the outcome
// (`Success(results)` or `Failure(error)`), so one failing batch does not
// fail the others.
// NOTE(review): the returned iterator is lazy — futures (and thus the actual
// fetches) are only created as the iterator is consumed downstream;
// presumably the concurrency limiting relies on how the caller consumes the
// windows — TODO confirm.
private def resolveConcurrentBatches(
ctx: FetcherContext[Ctx] @uncheckedVariance,
f: Fetcher[Ctx, Any, Any, Any] @uncheckedVariance,
nonCachedIds: Vector[Any]
)(implicit ec: ExecutionContext): Iterator[Future[(Vector[Any], Try[Seq[Any]])]] = {

// Split the ids into batches of at most `size`; unlimited => one batch.
val groupedIds = ctx.fetcher.config.maxBatchSizeConfig match {
case Some(size) => nonCachedIds.grouped(size)
case None => Iterator(nonCachedIds)
}

// Group the batches into windows of at most `size` batches; unlimited =>
// a single window containing all batches.
val groupedBatches =
ctx.fetcher.config.maxConcurrentBatchesConfig match {
case Some(size) => groupedIds.grouped(size)
case None => Iterator(groupedIds)
}

// Flatten the windows back into one iterator of futures. A non-empty batch
// triggers a real fetch whose failure is captured via `recover`; an empty
// batch short-circuits to an already-completed empty result.
groupedBatches
.foldLeft(emptyIterator) { (accumFutureSeq, groupedIds) =>
val results: Iterator[Future[(Vector[Any], Try[Seq[Any]])]] =
groupedIds.toIterator.collect {
case group if group.nonEmpty =>
f.fetch(ctx, group)
.map(r => group -> Success(r): (Vector[Any], Try[Seq[Any]]))
.recover { case e => group -> Failure(e) }
case group =>
Future.successful(group -> Success(Seq.empty))
}
accumFutureSeq ++ results
}
}

private def resolveEntities(
ctx: FetcherContext[Ctx] @uncheckedVariance,
deferredToResolve: Vector[Deferred[Any]],
Expand All @@ -191,19 +227,7 @@ class FetcherBasedDeferredResolver[-Ctx](
val ids = ctx.fetcher.ids(deferredToResolve)
val (nonCachedIds, cachedResults) = partitionCached(ctx.cache, ids)

val groupedIds = ctx.fetcher.config.maxBatchSizeConfig match {
case Some(size) => nonCachedIds.grouped(size)
case None => Iterator(nonCachedIds)
}

val results = groupedIds.map { group =>
if (group.nonEmpty)
f.fetch(ctx, group).map(r => group -> Success(r): (Vector[Any], Try[Seq[Any]])).recover {
case e => group -> Failure(e)
}
else
Future.successful(group -> Success(Seq.empty))
}
val results = resolveConcurrentBatches(ctx, f, nonCachedIds)

val futureRes = Future.sequence(results).map { allResults =>
val byId =
Expand Down
3 changes: 2 additions & 1 deletion modules/core/src/main/scala/sangria/schema/Context.scala
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ object Args {

apply(
schemaElem.arguments,
ast.ObjectValue(astElem.arguments.map(arg ⇒ ast.ObjectField(arg.name, arg.value))): ast.Value,
ast.ObjectValue(
astElem.arguments.map(arg => ast.ObjectField(arg.name, arg.value))): ast.Value,
Some(variables)
)
}
Expand Down

0 comments on commit d823b99

Please sign in to comment.