Skip to content

Commit

Permalink
ZQuery Improvements (#200)
Browse files Browse the repository at this point in the history
* query improvements

* address review comments
  • Loading branch information
adamgfraser authored Feb 5, 2020
1 parent 78559d9 commit 2a7c53b
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 38 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/caliban/wrappers/ApolloTracing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ object ApolloTracing {
{
case (query, fieldInfo) =>
for {
start <- ZQuery.fromEffect(clock.nanoTime)
result <- query
end <- ZQuery.fromEffect(clock.nanoTime)
duration = Duration.fromNanos(end - start)
summarized <- query.summarized { (start: Long, end: Long) =>
(start, end)
}(clock.nanoTime)
((start, end), result) = summarized
duration = Duration.fromNanos(end - start)
_ <- ZQuery.fromEffect(
ref
.update(
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/zquery/Cache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ final class Cache private (private val state: Ref[Map[Any, Any]]) {
state.update(_ + (request -> result)).unit

/**
* Looks up a request in the cache, returning `None` if the request is not in
* the cache, `Some(Ref(None))` if the request is in the cache but has not
* been executed yet, or `Some(Ref(Some(value)))` if the request has been
* executed.
* Looks up a request in the cache, failing with the unit value if the
* request is not in the cache, succeeding with `Ref(None)` if the request is
* in the cache but has not been executed yet, or `Ref(Some(value))` if the
* request has been executed.
*/
def lookup[E, A](request: Request[E, A]): IO[Unit, Ref[Option[Either[E, A]]]] =
state.get.map(_.get(request).asInstanceOf[Option[Ref[Option[Either[E, A]]]]]).get
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/zquery/CompletedRequestMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ final class CompletedRequestMap private (private val map: Map[Any, Either[Any, A
def insert[E, A](request: Request[E, A])(result: Either[E, A]): CompletedRequestMap =
new CompletedRequestMap(self.map + (request -> result))

/**
* Appends the specified optional result to the completed request map.
*/
def insertOption[E, A](request: Request[E, A])(result: Either[E, Option[A]]): CompletedRequestMap =
result match {
case Left(e) => insert(request)(Left(e))
case Right(Some(a)) => insert(request)(Right(a))
case Right(None) => self
}

/**
* Retrieves the result of the specified request if it exists.
*/
Expand Down
88 changes: 76 additions & 12 deletions core/src/main/scala/zquery/DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import zio.{ NeedsEnv, ZIO }
* the request type to a `Request[A]`. Data sources can then pattern match on
* the collection of requests to determine the information requested, execute
* the query, and place the results into the `CompletedRequestsMap` using
* [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]]. Data
* sources must provide requests for all results received or fail with an `E`.
* Failure to do so will cause a query to die with a `QueryFailure` when run.
* [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]].
*/
trait DataSource[-R, -A] { self =>

Expand Down Expand Up @@ -159,14 +157,47 @@ object DataSource {
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests)
.fold(
failure => requests.map((_, Left(failure))),
results => requests.zip(results.map(Right(_)))
e => requests.map((_, Left(e))),
bs => requests.zip(bs.map(Right(_)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(v)
})
}

/**
* Constructs a data source from a pure function that takes a list of
* requests and returns a list of optional results of the same size. Each
* item in the result list must correspond to the item at the same index in
* the request list.
*/
def fromFunctionBatchedOption[A, B](
name: String
)(f: Iterable[A] => Iterable[Option[B]])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] =
fromFunctionBatchedOptionM(name)(f andThen ZIO.succeed)

/**
* Constructs a data source from an effectual function that takes a list of
* requests and returns a list of optional results of the same size. Each
* item in the result list must correspond to the item at the same index in
* the request list.
*/
def fromFunctionBatchedOptionM[R, E, A, B](
name: String
)(f: Iterable[A] => ZIO[R, E, Iterable[Option[B]]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests)
.fold(
e => requests.map((_, Left(e))),
bs => requests.zip(bs.map(Right(_)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insertOption(k)(v)
})
}

/**
* Constructs a data source from a function that takes a list of requests and
* returns a list of results of the same size. Uses the specified function to
Expand All @@ -176,7 +207,9 @@ object DataSource {
*/
def fromFunctionBatchedWith[A, B](
name: String
)(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B]): DataSource[Any, A] =
)(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B])(
implicit ev: A <:< Request[Nothing, B]
): DataSource[Any, A] =
fromFunctionBatchedWithM(name)(f andThen ZIO.succeed, g)

/**
Expand All @@ -188,15 +221,20 @@ object DataSource {
*/
def fromFunctionBatchedWithM[R, E, A, B](
name: String
)(f: Iterable[A] => ZIO[R, Nothing, Iterable[B]], g: B => Request[E, B]): DataSource[R, A] =
)(f: Iterable[A] => ZIO[R, E, Iterable[B]], g: B => Request[E, B])(
implicit ev: A <:< Request[E, B]
): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests).map { results =>
results.map(b => (g(b), b)).foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(Right(v))
}
}
f(requests)
.fold(
e => requests.map(a => (ev(a), Left(e))),
bs => bs.map(b => (g(b), Right(b)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(v)
})
}

/**
Expand All @@ -212,4 +250,30 @@ object DataSource {
.foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either))
.map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(v) })
}

/**
* Constructs a data source from a pure function that may not provide results
* for all requests received.
*/
def fromFunctionOption[A, B](
name: String
)(f: A => Option[B])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] =
fromFunctionOptionM(name)(f andThen ZIO.succeed)

/**
* Constructs a data source from an effectual function that may not provide
* results for all requests received.
*/
def fromFunctionOptionM[R, E, A, B](
name: String
)(f: A => ZIO[R, E, Option[B]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
ZIO
.foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either))
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insertOption(k)(v)
})
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/zquery/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ private[zquery] object Result {
def fromEither[E, A](either: Either[E, A]): Result[Any, E, A] =
either.fold(e => Result.fail(Cause.fail(e)), a => Result.done(a))

/**
* Lifts an `Option[Either[E, A]]` into a result.
*/
def fromOptionEither[E, A](oeea: Option[Either[E, A]]): Result[Any, E, Option[A]] =
oeea match {
case None => Result.done(None)
case Some(Left(e)) => Result.fail(Cause.fail(e))
case Some(Right(a)) => Result.done(Some(a))
}

final case class Blocked[-R, +E, +A](blockedRequests: BlockedRequestMap[R], continue: ZQuery[R, E, A])
extends Result[R, E, A]

Expand Down
67 changes: 49 additions & 18 deletions core/src/main/scala/zquery/ZQuery.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package zquery

import zio._
import zio.clock._
import zio.duration._

/**
* A `ZQuery[R, E, A]` is a purely functional description of an effectual query
Expand Down Expand Up @@ -199,6 +201,24 @@ sealed trait ZQuery[-R, +E, +A] { self =>
a <- runCache(cache)
} yield (cache, a)

/**
* Summarizes a query by computing some value before and after execution,
* and then combining the values to produce a summary, together with the
* result of execution.
*/
final def summarized[R1 <: R, E1 >: E, B, C](f: (B, B) => C)(summary: ZIO[R1, E1, B]): ZQuery[R1, E1, (C, A)] =
for {
start <- ZQuery.fromEffect(summary)
value <- self
end <- ZQuery.fromEffect(summary)
} yield (f(start, end), value)

/**
* Returns a new query that executes this one and times the execution.
*/
final def timed: ZQuery[R with Clock, E, (Duration, A)] =
summarized[R with Clock, E, Long, Duration]((start, end) => Duration.fromNanos(end - start))(clock.nanoTime)

/**
* Returns a query that models the execution of this query and the specified
* query sequentially, combining their results into a tuple.
Expand Down Expand Up @@ -286,6 +306,12 @@ object ZQuery {
def collectAllPar[R, E, A](as: Iterable[ZQuery[R, E, A]]): ZQuery[R, E, List[A]] =
foreachPar(as)(identity)

/**
* Constructs a query that dies with the specified error.
*/
def die(t: Throwable): ZQuery[Any, Nothing, Nothing] =
ZQuery(ZIO.die(t))

/**
* Accesses the whole environment of the query.
*/
Expand Down Expand Up @@ -320,15 +346,30 @@ object ZQuery {
ZQuery(effect.foldCause(Result.fail, Result.done))

/**
* Constructs a query from a request and a data source. Queries must be
* constructed with `fromRequest` or combinators derived from it for
* optimizations to be applied.
* Constructs a query from a request and a data source. Queries will die with
* a `QueryFailure` when run if the data source does not provide results for
* all requests received. Queries must be constructed with `fromRequest` or
* combinators derived from it for optimizations to be applied.
*/
def fromRequest[R, E, A, B](
request: A
)(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, B] =
new ZQuery[R, E, B] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, B]] =
fromRequestOption(request)(dataSource).flatMap {
case None => ZQuery.die(QueryFailure(dataSource, request))
case Some(b) => ZQuery.succeed(b)
}

/**
* Constructs a query from a request and a data source. Returns `Some` if the
* data source provides a result for a request or `None` otherwise. Queries
* must be constructed with `fromRequest` or combinators derived from it for
* optimizations to be applied.
*/
def fromRequestOption[R, E, A, B](
request: A
)(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, Option[B]] =
new ZQuery[R, E, Option[B]] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, Option[B]]] =
cache
.lookup(request)
.foldM(
Expand All @@ -338,25 +379,15 @@ object ZQuery {
_ <- cache.insert(request, ref)
} yield Result.blocked(
BlockedRequestMap(dataSource, BlockedRequest(request, ref)),
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
ZQuery(ref.get.map(Result.fromOptionEither))
),
ref =>
ref.get.map {
case Some(b) => Result.fromEither(b)
case Some(b) => Result.fromOptionEither(Some(b))
case None =>
Result.blocked(
BlockedRequestMap.empty,
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
ZQuery(ref.get.map(Result.fromOptionEither))
)
}
)
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/zquery/ZQuerySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ object ZQuerySpec
failure.getMessage,
equalTo("Data source UserRequestDataSource did not complete request GetNameById(27).")
)
},
testM("timed does not prevent batching") {
val a = getUserNameById(1).zip(getUserNameById(2)).timed
val b = getUserNameById(3).zip(getUserNameById(4))
for {
result <- ZQuery.collectAllPar(List(a, b)).run
log <- TestConsole.output
} yield assert(log, hasSize(equalTo(2)))
}
)
)
Expand Down

0 comments on commit 2a7c53b

Please sign in to comment.