From 428eed139bdea087743c08c8ba3f9c28e1262754 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Tue, 4 Feb 2020 21:52:41 -0500 Subject: [PATCH 1/2] query improvements --- core/src/main/scala/zquery/Cache.scala | 8 +- .../scala/zquery/CompletedRequestMap.scala | 10 +++ core/src/main/scala/zquery/DataSource.scala | 88 ++++++++++++++++--- core/src/main/scala/zquery/Result.scala | 10 +++ core/src/main/scala/zquery/ZQuery.scala | 67 ++++++++++---- core/src/test/scala/zquery/ZQuerySpec.scala | 8 ++ 6 files changed, 157 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/zquery/Cache.scala b/core/src/main/scala/zquery/Cache.scala index 648901375..2b76aae29 100644 --- a/core/src/main/scala/zquery/Cache.scala +++ b/core/src/main/scala/zquery/Cache.scala @@ -18,10 +18,10 @@ final class Cache private (private val state: Ref[Map[Any, Any]]) { state.update(_ + (request -> result)).unit /** - * Looks up a request in the cache, returning `None` if the request is not in - * the cache, `Some(Ref(None))` if the request is in the cache but has not - * been executed yet, or `Some(Ref(Some(value)))` if the request has been - * executed. + * Looks up a request in the cache, failing with the unit value if the + * request is not in the cache, succeeding with `Ref(None)` if the request is + * in the cache but has not been executed yet, or `Ref(Some(value))` if the + * request has been xecuted. */ def lookup[E, A](request: Request[E, A]): IO[Unit, Ref[Option[Either[E, A]]]] = state.get.map(_.get(request).asInstanceOf[Option[Ref[Option[Either[E, A]]]]]).get diff --git a/core/src/main/scala/zquery/CompletedRequestMap.scala b/core/src/main/scala/zquery/CompletedRequestMap.scala index fec90011f..ac4af416d 100644 --- a/core/src/main/scala/zquery/CompletedRequestMap.scala +++ b/core/src/main/scala/zquery/CompletedRequestMap.scala @@ -20,6 +20,16 @@ final class CompletedRequestMap private (private val map: Map[Any, Either[Any, A def insert[E, A](request: Request[E, A])(result: Either[E, A]): CompletedRequestMap = new CompletedRequestMap(self.map + (request -> result)) + /** + * Appends the specified optional result to the completed request map. + */ + def insertOption[E, A](request: Request[E, A])(result: Either[E, Option[A]]): CompletedRequestMap = + result match { + case Left(e) => insert(request)(Left(e)) + case Right(Some(a)) => insert(request)(Right(a)) + case Right(None) => self + } + /** * Retrieves the result of the specified request if it exists. */ diff --git a/core/src/main/scala/zquery/DataSource.scala b/core/src/main/scala/zquery/DataSource.scala index d30f99c07..7118a42cd 100644 --- a/core/src/main/scala/zquery/DataSource.scala +++ b/core/src/main/scala/zquery/DataSource.scala @@ -18,9 +18,7 @@ import zio.{ NeedsEnv, ZIO } * the request type to a `Request[A]`. Data sources can then pattern match on * the collection of requests to determine the information requested, execute * the query, and place the results into the `CompletedRequestsMap` using - * [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]]. Data - * sources must provide requests for all results received or fail with an `E`. - * Failure to do so will cause a query to die with a `QueryFailure` when run. + * [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]]. */ trait DataSource[-R, -A] { self => @@ -159,14 +157,47 @@ object DataSource { def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = f(requests) .fold( - failure => requests.map((_, Left(failure))), - results => requests.zip(results.map(Right(_))) + e => requests.map((_, Left(e))), + bs => requests.zip(bs.map(Right(_))) ) .map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(v) }) } + /** + * Constructs a data source from a pure function that takes a list of + * requests and returns a list of optional results of the same size. Each + * item in the result list must correspond to the item at the same index in + * the request list. + */ + def fromFunctionBatchedOption[A, B]( + name: String + )(f: Iterable[A] => Iterable[Option[B]])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] = + fromFunctionBatchedOptionM(name)(f andThen ZIO.succeed) + + /** + * Constructs a data source from an effectual function that takes a list of + * requests and returns a list of optional results of the same size. Each + * item in the result list must correspond to the item at the same index in + * the request list. + */ + def fromFunctionBatchedOptionM[R, E, A, B]( + name: String + )(f: Iterable[A] => ZIO[R, E, Iterable[Option[B]]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = + new DataSource[R, A] { + val identifier = name + def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = + f(requests) + .fold( + e => requests.map((_, Left(e))), + bs => requests.zip(bs.map(Right(_))) + ) + .map(_.foldLeft(CompletedRequestMap.empty) { + case (map, (k, v)) => map.insertOption(k)(v) + }) + } + /** * Constructs a data source from a function that takes a list of requests and * returns a list of results of the same size. Uses the specified function to @@ -176,7 +207,9 @@ object DataSource { */ def fromFunctionBatchedWith[A, B]( name: String - )(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B]): DataSource[Any, A] = + )(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B])( + implicit ev: A <:< Request[Nothing, B] + ): DataSource[Any, A] = fromFunctionBatchedWithM(name)(f andThen ZIO.succeed, g) /** @@ -188,15 +221,20 @@ object DataSource { */ def fromFunctionBatchedWithM[R, E, A, B]( name: String - )(f: Iterable[A] => ZIO[R, Nothing, Iterable[B]], g: B => Request[E, B]): DataSource[R, A] = + )(f: Iterable[A] => ZIO[R, E, Iterable[B]], g: B => Request[E, B])( + implicit ev: A <:< Request[E, B] + ): DataSource[R, A] = new DataSource[R, A] { val identifier = name def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = - f(requests).map { results => - results.map(b => (g(b), b)).foldLeft(CompletedRequestMap.empty) { - case (map, (k, v)) => map.insert(k)(Right(v)) - } - } + f(requests) + .fold( + e => requests.map(a => (ev(a), Left(e))), + bs => bs.map(b => (g(b), Right(b))) + ) + .map(_.foldLeft(CompletedRequestMap.empty) { + case (map, (k, v)) => map.insert(k)(v) + }) } /** @@ -212,4 +250,30 @@ object DataSource { .foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either)) .map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(v) }) } + + /** + * Constructs a data source from a pure function that may not provide results + * for all requests received. + */ + def fromFunctionOption[A, B]( + name: String + )(f: A => Option[B])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] = + fromFunctionOptionM(name)(f andThen ZIO.succeed) + + /** + * Constructs a data source from an effectual function that may not provide + * results for all requests received. + */ + def fromFunctionOptionM[R, E, A, B]( + name: String + )(f: A => ZIO[R, E, Option[B]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = + new DataSource[R, A] { + val identifier = name + def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = + ZIO + .foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either)) + .map(_.foldLeft(CompletedRequestMap.empty) { + case (map, (k, v)) => map.insertOption(k)(v) + }) + } } diff --git a/core/src/main/scala/zquery/Result.scala b/core/src/main/scala/zquery/Result.scala index c89b5c268..5e2fd904a 100644 --- a/core/src/main/scala/zquery/Result.scala +++ b/core/src/main/scala/zquery/Result.scala @@ -80,6 +80,16 @@ private[zquery] object Result { def fromEither[E, A](either: Either[E, A]): Result[Any, E, A] = either.fold(e => Result.fail(Cause.fail(e)), a => Result.done(a)) + /** + * Lifts an `Option[Either[E, A]]` into a result. + */ + def fromOptionEither[E, A](oeea: Option[Either[E, A]]): Result[Any, E, Option[A]] = + oeea match { + case None => Result.done(None) + case Some(Left(e)) => Result.fail(Cause.fail(e)) + case Some(Right(a)) => Result.done(Some(a)) + } + final case class Blocked[-R, +E, +A](blockedRequests: BlockedRequestMap[R], continue: ZQuery[R, E, A]) extends Result[R, E, A] diff --git a/core/src/main/scala/zquery/ZQuery.scala b/core/src/main/scala/zquery/ZQuery.scala index 466426436..87fdcb05f 100644 --- a/core/src/main/scala/zquery/ZQuery.scala +++ b/core/src/main/scala/zquery/ZQuery.scala @@ -1,6 +1,8 @@ package zquery import zio._ +import zio.clock._ +import zio.duration._ /** * A `ZQuery[R, E, A]` is a purely functional description of an effectual query @@ -199,6 +201,24 @@ sealed trait ZQuery[-R, +E, +A] { self => a <- runCache(cache) } yield (cache, a) + /** + * Summarizes a query by computing some value before and after execution, + * and then combining the values to produce a summary, together with the + * result of execution. + */ + final def summarized[R1 <: R, E1 >: E, B, C](f: (B, B) => C)(summary: ZIO[R1, E1, B]): ZQuery[R1, E1, (C, A)] = + for { + start <- ZQuery.fromEffect(summary) + value <- self + end <- ZQuery.fromEffect(summary) + } yield (f(start, end), value) + + /** + * Returns a new query that executes this one and times the execution. + */ + final def timed: ZQuery[R with Clock, E, (Duration, A)] = + summarized[R with Clock, E, Long, Duration]((start, end) => Duration.fromNanos(end - start))(clock.nanoTime) + /** * Returns a query that models the execution of this query and the specified * query sequentially, combining their results into a tuple. @@ -286,6 +306,12 @@ object ZQuery { def collectAllPar[R, E, A](as: Iterable[ZQuery[R, E, A]]): ZQuery[R, E, List[A]] = foreachPar(as)(identity) + /** + * Constructs a query that dies with the specified error. + */ + def die(t: Throwable): ZQuery[Any, Nothing, Nothing] = + ZQuery(ZIO.die(t)) + /** * Accesses the whole environment of the query. */ @@ -320,15 +346,30 @@ object ZQuery { ZQuery(effect.foldCause(Result.fail, Result.done)) /** - * Constructs a query from a request and a data source. Queries must be - * constructed with `fromRequest` or combinators derived from it for - * optimizations to be applied. + * Constructs a query from a request and a data source. Queries will die with + * a `QueryFailure` when run if the data source does not provide results for + * all requests received. Queries must be constructed with `fromRequest` or + * combinators derived from it for optimizations to be applied. */ def fromRequest[R, E, A, B]( request: A )(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, B] = - new ZQuery[R, E, B] { - def step(cache: Cache): ZIO[R, Nothing, Result[R, E, B]] = + fromRequestOption(request)(dataSource).flatMap { + case None => ZQuery.die(QueryFailure(dataSource, request)) + case Some(b) => ZQuery.succeed(b) + } + + /** + * Constructs a query from a request and a data source. Returns `Some` if the + * data source provides a result for a request or `None` otherwise. Queries + * must be constructed with `fromRequest` or combinators derived from it for + * optimizations to be applied. + */ + def fromRequestOption[R, E, A, B]( + request: A + )(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, Option[B]] = + new ZQuery[R, E, Option[B]] { + def step(cache: Cache): ZIO[R, Nothing, Result[R, E, Option[B]]] = cache .lookup(request) .foldM( @@ -338,25 +379,15 @@ object ZQuery { _ <- cache.insert(request, ref) } yield Result.blocked( BlockedRequestMap(dataSource, BlockedRequest(request, ref)), - ZQuery { - ref.get.flatMap { - case None => ZIO.die(QueryFailure(dataSource, request)) - case Some(b) => ZIO.succeed(Result.fromEither(b)) - } - } + ZQuery(ref.get.map(Result.fromOptionEither)) ), ref => ref.get.map { - case Some(b) => Result.fromEither(b) + case Some(b) => Result.fromOptionEither(Some(b)) case None => Result.blocked( BlockedRequestMap.empty, - ZQuery { - ref.get.flatMap { - case None => ZIO.die(QueryFailure(dataSource, request)) - case Some(b) => ZIO.succeed(Result.fromEither(b)) - } - } + ZQuery(ref.get.map(Result.fromOptionEither)) ) } ) diff --git a/core/src/test/scala/zquery/ZQuerySpec.scala b/core/src/test/scala/zquery/ZQuerySpec.scala index e03921ad1..02d7135aa 100644 --- a/core/src/test/scala/zquery/ZQuerySpec.scala +++ b/core/src/test/scala/zquery/ZQuerySpec.scala @@ -36,6 +36,14 @@ object ZQuerySpec failure.getMessage, equalTo("Data source UserRequestDataSource did not complete request GetNameById(27).") ) + }, + testM("timed does not prevent batching") { + val a = getUserNameById(1).zip(getUserNameById(2)).timed + val b = getUserNameById(3).zip(getUserNameById(4)) + for { + result <- ZQuery.collectAllPar(List(a, b)).run + log <- TestConsole.output + } yield assert(log, hasSize(equalTo(2))) } ) ) From 9aacc2fc971743a5b0c0696caedcb1e945d577e1 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Tue, 4 Feb 2020 22:37:07 -0500 Subject: [PATCH 2/2] address review comments --- core/src/main/scala/caliban/wrappers/ApolloTracing.scala | 9 +++++---- core/src/main/scala/zquery/Cache.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala index 4f568d412..78fbfc731 100644 --- a/core/src/main/scala/caliban/wrappers/ApolloTracing.scala +++ b/core/src/main/scala/caliban/wrappers/ApolloTracing.scala @@ -159,10 +159,11 @@ object ApolloTracing { { case (query, fieldInfo) => for { - start <- ZQuery.fromEffect(clock.nanoTime) - result <- query - end <- ZQuery.fromEffect(clock.nanoTime) - duration = Duration.fromNanos(end - start) + summarized <- query.summarized { (start: Long, end: Long) => + (start, end) + }(clock.nanoTime) + ((start, end), result) = summarized + duration = Duration.fromNanos(end - start) _ <- ZQuery.fromEffect( ref .update( diff --git a/core/src/main/scala/zquery/Cache.scala b/core/src/main/scala/zquery/Cache.scala index 2b76aae29..d0645ac38 100644 --- a/core/src/main/scala/zquery/Cache.scala +++ b/core/src/main/scala/zquery/Cache.scala @@ -21,7 +21,7 @@ final class Cache private (private val state: Ref[Map[Any, Any]]) { * Looks up a request in the cache, failing with the unit value if the * request is not in the cache, succeeding with `Ref(None)` if the request is * in the cache but has not been executed yet, or `Ref(Some(value))` if the - * request has been xecuted. + * request has been executed. */ def lookup[E, A](request: Request[E, A]): IO[Unit, Ref[Option[Either[E, A]]]] = state.get.map(_.get(request).asInstanceOf[Option[Ref[Option[Either[E, A]]]]]).get