Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZQuery Improvements #200

Merged
merged 2 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/src/main/scala/caliban/wrappers/ApolloTracing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ object ApolloTracing {
{
case (query, fieldInfo) =>
for {
start <- ZQuery.fromEffect(clock.nanoTime)
result <- query
end <- ZQuery.fromEffect(clock.nanoTime)
duration = Duration.fromNanos(end - start)
summarized <- query.summarized { (start: Long, end: Long) =>
(start, end)
}(clock.nanoTime)
((start, end), result) = summarized
duration = Duration.fromNanos(end - start)
_ <- ZQuery.fromEffect(
ref
.update(
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/zquery/Cache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ final class Cache private (private val state: Ref[Map[Any, Any]]) {
state.update(_ + (request -> result)).unit

/**
* Looks up a request in the cache, returning `None` if the request is not in
* the cache, `Some(Ref(None))` if the request is in the cache but has not
* been executed yet, or `Some(Ref(Some(value)))` if the request has been
* executed.
* Looks up a request in the cache, failing with the unit value if the
* request is not in the cache, succeeding with `Ref(None)` if the request is
* in the cache but has not been executed yet, or `Ref(Some(value))` if the
* request has been executed.
*/
def lookup[E, A](request: Request[E, A]): IO[Unit, Ref[Option[Either[E, A]]]] =
state.get.map(_.get(request).asInstanceOf[Option[Ref[Option[Either[E, A]]]]]).get
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/zquery/CompletedRequestMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ final class CompletedRequestMap private (private val map: Map[Any, Either[Any, A
def insert[E, A](request: Request[E, A])(result: Either[E, A]): CompletedRequestMap =
new CompletedRequestMap(self.map + (request -> result))

/**
* Appends the specified optional result to the completed request map.
*/
def insertOption[E, A](request: Request[E, A])(result: Either[E, Option[A]]): CompletedRequestMap =
result match {
case Left(e) => insert(request)(Left(e))
case Right(Some(a)) => insert(request)(Right(a))
case Right(None) => self
}

/**
* Retrieves the result of the specified request if it exists.
*/
Expand Down
88 changes: 76 additions & 12 deletions core/src/main/scala/zquery/DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import zio.{ NeedsEnv, ZIO }
* the request type to a `Request[A]`. Data sources can then pattern match on
* the collection of requests to determine the information requested, execute
* the query, and place the results into the `CompletedRequestsMap` using
* [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]]. Data
* sources must provide requests for all results received or fail with an `E`.
* Failure to do so will cause a query to die with a `QueryFailure` when run.
* [[CompletedRequestMap.empty]] and [[CompletedRequestMap.insert]].
*/
trait DataSource[-R, -A] { self =>

Expand Down Expand Up @@ -159,14 +157,47 @@ object DataSource {
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests)
.fold(
failure => requests.map((_, Left(failure))),
results => requests.zip(results.map(Right(_)))
e => requests.map((_, Left(e))),
bs => requests.zip(bs.map(Right(_)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(v)
})
}

/**
* Constructs a data source from a pure function that takes a list of
* requests and returns a list of optional results of the same size. Each
* item in the result list must correspond to the item at the same index in
* the request list.
*/
def fromFunctionBatchedOption[A, B](
name: String
)(f: Iterable[A] => Iterable[Option[B]])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] =
fromFunctionBatchedOptionM(name)(f andThen ZIO.succeed)

/**
* Constructs a data source from an effectual function that takes a list of
* requests and returns a list of optional results of the same size. Each
* item in the result list must correspond to the item at the same index in
* the request list.
*/
def fromFunctionBatchedOptionM[R, E, A, B](
name: String
)(f: Iterable[A] => ZIO[R, E, Iterable[Option[B]]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests)
.fold(
e => requests.map((_, Left(e))),
bs => requests.zip(bs.map(Right(_)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insertOption(k)(v)
})
}

/**
* Constructs a data source from a function that takes a list of requests and
* returns a list of results of the same size. Uses the specified function to
Expand All @@ -176,7 +207,9 @@ object DataSource {
*/
def fromFunctionBatchedWith[A, B](
name: String
)(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B]): DataSource[Any, A] =
)(f: Iterable[A] => Iterable[B], g: B => Request[Nothing, B])(
implicit ev: A <:< Request[Nothing, B]
): DataSource[Any, A] =
fromFunctionBatchedWithM(name)(f andThen ZIO.succeed, g)

/**
Expand All @@ -188,15 +221,20 @@ object DataSource {
*/
def fromFunctionBatchedWithM[R, E, A, B](
name: String
)(f: Iterable[A] => ZIO[R, Nothing, Iterable[B]], g: B => Request[E, B]): DataSource[R, A] =
)(f: Iterable[A] => ZIO[R, E, Iterable[B]], g: B => Request[E, B])(
implicit ev: A <:< Request[E, B]
): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
f(requests).map { results =>
results.map(b => (g(b), b)).foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(Right(v))
}
}
f(requests)
.fold(
e => requests.map(a => (ev(a), Left(e))),
bs => bs.map(b => (g(b), Right(b)))
)
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insert(k)(v)
})
}

/**
Expand All @@ -212,4 +250,30 @@ object DataSource {
.foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either))
.map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(v) })
}

/**
* Constructs a data source from a pure function that may not provide results
* for all requests received.
*/
def fromFunctionOption[A, B](
name: String
)(f: A => Option[B])(implicit ev: A <:< Request[Nothing, B]): DataSource[Any, A] =
fromFunctionOptionM(name)(f andThen ZIO.succeed)

/**
* Constructs a data source from an effectual function that may not provide
* results for all requests received.
*/
def fromFunctionOptionM[R, E, A, B](
name: String
)(f: A => ZIO[R, E, Option[B]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] =
new DataSource[R, A] {
val identifier = name
def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] =
ZIO
.foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either))
.map(_.foldLeft(CompletedRequestMap.empty) {
case (map, (k, v)) => map.insertOption(k)(v)
})
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/zquery/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ private[zquery] object Result {
def fromEither[E, A](either: Either[E, A]): Result[Any, E, A] =
either.fold(e => Result.fail(Cause.fail(e)), a => Result.done(a))

/**
* Lifts an `Option[Either[E, A]]` into a result.
*/
def fromOptionEither[E, A](oeea: Option[Either[E, A]]): Result[Any, E, Option[A]] =
oeea match {
case None => Result.done(None)
case Some(Left(e)) => Result.fail(Cause.fail(e))
case Some(Right(a)) => Result.done(Some(a))
}

final case class Blocked[-R, +E, +A](blockedRequests: BlockedRequestMap[R], continue: ZQuery[R, E, A])
extends Result[R, E, A]

Expand Down
67 changes: 49 additions & 18 deletions core/src/main/scala/zquery/ZQuery.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package zquery

import zio._
import zio.clock._
import zio.duration._

/**
* A `ZQuery[R, E, A]` is a purely functional description of an effectual query
Expand Down Expand Up @@ -199,6 +201,24 @@ sealed trait ZQuery[-R, +E, +A] { self =>
a <- runCache(cache)
} yield (cache, a)

/**
* Summarizes a query by computing some value before and after execution,
* and then combining the values to produce a summary, together with the
* result of execution.
*/
final def summarized[R1 <: R, E1 >: E, B, C](f: (B, B) => C)(summary: ZIO[R1, E1, B]): ZQuery[R1, E1, (C, A)] =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we switch the parameters here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I was slightly inclined to wait until RC18 and make conforming changes to that so we track the public ZIO API for identical methods but happy to change now as well.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, either way's fine 👍

for {
start <- ZQuery.fromEffect(summary)
value <- self
end <- ZQuery.fromEffect(summary)
} yield (f(start, end), value)

/**
* Returns a new query that executes this one and times the execution.
*/
final def timed: ZQuery[R with Clock, E, (Duration, A)] =
ghostdogpr marked this conversation as resolved.
Show resolved Hide resolved
summarized[R with Clock, E, Long, Duration]((start, end) => Duration.fromNanos(end - start))(clock.nanoTime)

/**
* Returns a query that models the execution of this query and the specified
* query sequentially, combining their results into a tuple.
Expand Down Expand Up @@ -286,6 +306,12 @@ object ZQuery {
def collectAllPar[R, E, A](as: Iterable[ZQuery[R, E, A]]): ZQuery[R, E, List[A]] =
foreachPar(as)(identity)

/**
* Constructs a query that dies with the specified error.
*/
def die(t: Throwable): ZQuery[Any, Nothing, Nothing] =
ZQuery(ZIO.die(t))

/**
* Accesses the whole environment of the query.
*/
Expand Down Expand Up @@ -320,15 +346,30 @@ object ZQuery {
ZQuery(effect.foldCause(Result.fail, Result.done))

/**
* Constructs a query from a request and a data source. Queries must be
* constructed with `fromRequest` or combinators derived from it for
* optimizations to be applied.
* Constructs a query from a request and a data source. Queries will die with
* a `QueryFailure` when run if the data source does not provide results for
* all requests received. Queries must be constructed with `fromRequest` or
* combinators derived from it for optimizations to be applied.
*/
def fromRequest[R, E, A, B](
request: A
)(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, B] =
new ZQuery[R, E, B] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, B]] =
fromRequestOption(request)(dataSource).flatMap {
case None => ZQuery.die(QueryFailure(dataSource, request))
case Some(b) => ZQuery.succeed(b)
}

/**
* Constructs a query from a request and a data source. Returns `Some` if the
* data source provides a result for a request or `None` otherwise. Queries
* must be constructed with `fromRequest` or combinators derived from it for
* optimizations to be applied.
*/
def fromRequestOption[R, E, A, B](
request: A
)(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, Option[B]] =
new ZQuery[R, E, Option[B]] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, Option[B]]] =
cache
.lookup(request)
.foldM(
Expand All @@ -338,25 +379,15 @@ object ZQuery {
_ <- cache.insert(request, ref)
} yield Result.blocked(
BlockedRequestMap(dataSource, BlockedRequest(request, ref)),
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
ZQuery(ref.get.map(Result.fromOptionEither))
),
ref =>
ref.get.map {
case Some(b) => Result.fromEither(b)
case Some(b) => Result.fromOptionEither(Some(b))
case None =>
Result.blocked(
BlockedRequestMap.empty,
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
ZQuery(ref.get.map(Result.fromOptionEither))
)
}
)
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/zquery/ZQuerySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ object ZQuerySpec
failure.getMessage,
equalTo("Data source UserRequestDataSource did not complete request GetNameById(27).")
)
},
testM("timed does not prevent batching") {
val a = getUserNameById(1).zip(getUserNameById(2)).timed
val b = getUserNameById(3).zip(getUserNameById(4))
for {
result <- ZQuery.collectAllPar(List(a, b)).run
log <- TestConsole.output
} yield assert(log, hasSize(equalTo(2)))
}
)
)
Expand Down