support for configuring max concurrent batches in fetcher
Added a new config param, maxConcurrentBatches, to configure the maximum
number of batches of IDs that the fetcher may fetch concurrently.
Leaving this param unset falls back to the existing behaviour of
running all batches concurrently.
vivekdhayaal authored and nickhudkins committed Apr 14, 2021
1 parent 7686938 commit a1cb696
Showing 2 changed files with 32 additions and 14 deletions.
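
In practice the new param composes with the existing maxBatchSize builder on FetcherConfig. A minimal usage sketch — Product, ProductRepo, and loadProducts are invented for illustration, and it assumes Fetcher.apply accepts a config argument alongside the fetch function, as in sangria's Fetcher API:

import sangria.execution.deferred.{Fetcher, FetcherConfig, HasId}
import scala.concurrent.Future

case class Product(id: Int, name: String)
implicit val productId: HasId[Product, Int] = HasId(_.id)

// Hypothetical batch loader: resolves a batch of IDs in one round trip.
trait ProductRepo { def loadProducts(ids: Seq[Int]): Future[Seq[Product]] }

// Split IDs into batches of at most 50 and run at most 4 batches at a time;
// leaving maxConcurrentBatches unset keeps the old all-at-once behaviour.
val productFetcher = Fetcher(
  (repo: ProductRepo, ids: Seq[Int]) => repo.loadProducts(ids),
  config = FetcherConfig.maxBatchSize(50).maxConcurrentBatches(4))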
@@ -201,11 +201,13 @@ object Fetcher {

case class FetcherConfig(
    cacheConfig: Option[() => FetcherCache] = None,
-    maxBatchSizeConfig: Option[Int] = None) {
+    maxBatchSizeConfig: Option[Int] = None,
+    maxConcurrentBatchesConfig: Option[Int] = None) {
  def caching = copy(cacheConfig = Some(() => FetcherCache.simple))
  def caching(cache: FetcherCache) = copy(cacheConfig = Some(() => cache))

  def maxBatchSize(size: Int) = copy(maxBatchSizeConfig = Some(size))
+  def maxConcurrentBatches(size: Int) = copy(maxConcurrentBatchesConfig = Some(size))
}

object FetcherConfig {
@@ -182,6 +182,34 @@ class FetcherBasedDeferredResolver[-Ctx](
}
}

+  private def resolveConcurrentBatches(
+      ctx: FetcherContext[Ctx] @uncheckedVariance,
+      f: Fetcher[Ctx, Any, Any, Any] @uncheckedVariance,
+      nonCachedIds: Vector[Any]
+  )(implicit ec: ExecutionContext): Iterator[Future[(Vector[Any], Try[Seq[Any]])]] = {
+    val groupedIds: Iterator[Vector[Any]] = ctx.fetcher.config.maxBatchSizeConfig match {
+      case Some(size) => nonCachedIds.grouped(size)
+      case None => Iterator(nonCachedIds)
+    }
+
+    val groupedBatches: Iterator[TraversableOnce[Vector[Any]]] = ctx.fetcher.config.maxConcurrentBatchesConfig match {
+      case Some(size) => groupedIds.grouped(size)
+      case None => Iterator(groupedIds)
+    }
+
+    groupedBatches
+      .foldLeft(Iterator.empty[Future[(Vector[Any], Try[Seq[Any]])]]) {
+        (accumFutureSeq, groupedIds) =>
+          val results: Iterator[Future[(Vector[Any], Try[Seq[Any]])]] = groupedIds.toIterator.map { group =>
+            if (group.nonEmpty)
+              f.fetch(ctx, group).map(r => group -> Success(r): (Vector[Any], Try[Seq[Any]])).recover { case e => group -> Failure(e) }
+            else
+              Future.successful(group -> Success(Seq.empty))
+          }
+          accumFutureSeq ++ results
+      }
+  }
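
To make the double grouping above concrete, here is a standalone sketch of the same arithmetic on plain Scala collections (the numbers are made up): with maxBatchSize = 3 and maxConcurrentBatches = 2, ten IDs become four batches arranged into two waves, and the fold then flattens the per-wave futures back into one iterator.

val ids = (1 to 10).toVector
val batches = ids.grouped(3).toVector   // [1,2,3] [4,5,6] [7,8,9] [10]
val waves = batches.grouped(2).toVector // at most 2 batches per wave
waves.zipWithIndex.foreach { case (wave, i) =>
  println(s"wave $i: " + wave.map(_.mkString("[", ",", "]")).mkString(" "))
}
// wave 0: [1,2,3] [4,5,6]
// wave 1: [7,8,9] [10]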

private def resolveEntities(
ctx: FetcherContext[Ctx] @uncheckedVariance,
deferredToResolve: Vector[Deferred[Any]],
@@ -191,19 +219,7 @@
val ids = ctx.fetcher.ids(deferredToResolve)
val (nonCachedIds, cachedResults) = partitionCached(ctx.cache, ids)

-    val groupedIds = ctx.fetcher.config.maxBatchSizeConfig match {
-      case Some(size) => nonCachedIds.grouped(size)
-      case None => Iterator(nonCachedIds)
-    }
-
-    val results = groupedIds.map { group =>
-      if (group.nonEmpty)
-        f.fetch(ctx, group).map(r => group -> Success(r): (Vector[Any], Try[Seq[Any]])).recover {
-          case e => group -> Failure(e)
-        }
-      else
-        Future.successful(group -> Success(Seq.empty))
-    }
+    val results = resolveConcurrentBatches(ctx, f, nonCachedIds)

val futureRes = Future.sequence(results).map { allResults =>
val byId =
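
A note on the error-handling pattern used in both the old and new code: each batch future is mapped into an (ids, Try) pair and recover-ed, so a single failed batch surfaces as a Failure value instead of failing the whole Future.sequence. A self-contained sketch with an invented fetch function:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// A hypothetical batch fetch that fails for the second batch.
def fetch(ids: Vector[Int]): Future[Seq[String]] =
  if (ids.contains(3)) Future.failed(new RuntimeException("boom"))
  else Future.successful(ids.map(i => s"value-$i"))

// Same pattern as the resolver: lift each outcome into a Try so that one
// failed batch does not fail the combined future.
val results: Vector[Future[(Vector[Int], Try[Seq[String]])]] =
  Vector(Vector(1, 2), Vector(3)).map { group =>
    fetch(group)
      .map(r => group -> (Success(r): Try[Seq[String]]))
      .recover { case e => group -> Failure(e) }
  }

Await.result(Future.sequence(results), 1.second).foreach(println)
// (Vector(1, 2),Success(Vector(value-1, value-2)))
// (Vector(3),Failure(java.lang.RuntimeException: boom))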
