From 7cdc5abcad7f8cdcedad4bba94dc9e9d870791b6 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sat, 1 Feb 2020 09:23:19 -0500 Subject: [PATCH] support errors --- core/src/main/scala/zquery/DataSource.scala | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/zquery/DataSource.scala b/core/src/main/scala/zquery/DataSource.scala index 289c0de57..d30f99c07 100644 --- a/core/src/main/scala/zquery/DataSource.scala +++ b/core/src/main/scala/zquery/DataSource.scala @@ -153,15 +153,18 @@ object DataSource { */ def fromFunctionBatchedM[R, E, A, B]( name: String - )(f: Iterable[A] => ZIO[R, Nothing, Iterable[B]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = + )(f: Iterable[A] => ZIO[R, E, Iterable[B]])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = new DataSource[R, A] { val identifier = name def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = - f(requests).map { results => - (requests zip results).foldLeft(CompletedRequestMap.empty) { - case (map, (k, v)) => map.insert(k)(Right(v)) - } - } + f(requests) + .fold( + failure => requests.map((_, Left(failure))), + results => requests.zip(results.map(Right(_))) + ) + .map(_.foldLeft(CompletedRequestMap.empty) { + case (map, (k, v)) => map.insert(k)(v) + }) } /** @@ -201,12 +204,12 @@ object DataSource { */ def fromFunctionM[R, E, A, B]( name: String - )(f: A => ZIO[R, Nothing, B])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = + )(f: A => ZIO[R, E, B])(implicit ev: A <:< Request[E, B]): DataSource[R, A] = new DataSource[R, A] { val identifier = name def run(requests: Iterable[A]): ZIO[R, Nothing, CompletedRequestMap] = ZIO - .foreachPar(requests)(k => ZIO.succeed(k).zip(f(k))) - .map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(Right(v)) }) + .foreachPar(requests)(k => ZIO.succeed(k).zip(f(k).either)) + .map(_.foldLeft(CompletedRequestMap.empty) { case (map, (k, v)) => map.insert(k)(v) }) } }