From f5eb4b27db9dc4a13b5915518fe1251b8a66d4ab Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sun, 5 Nov 2023 13:25:48 +1100 Subject: [PATCH 1/2] Run deferred stream sequentially --- .../scala/caliban/execution/Executor.scala | 2 +- .../execution/DeferredExecutionSpec.scala | 35 ++++++++++++++++--- .../execution/TestDeferredSchema.scala | 20 +++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/caliban/execution/Executor.scala b/core/src/main/scala/caliban/execution/Executor.scala index 202d9d4711..48f7abb0c1 100644 --- a/core/src/main/scala/caliban/execution/Executor.scala +++ b/core/src/main/scala/caliban/execution/Executor.scala @@ -231,7 +231,7 @@ object Executor { ZStream.succeed(resp) ++ makeDeferStream(more, cache) }) - ZStream.mergeAllUnbounded()(defers.map(run): _*) + ZStream.mergeAll(1)(defers.map(run): _*) } def runIncrementalQuery( diff --git a/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala b/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala index d5e97ee8f2..67221d74e8 100644 --- a/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala +++ b/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala @@ -2,11 +2,11 @@ package caliban.execution import caliban.Macros.gqldoc import caliban.ResponseValue.StreamValue -import caliban.TestUtils.{ characters, Character } -import caliban.{ CalibanError, GraphQLResponse, ResponseValue, Value } +import caliban.TestUtils.{Character, characters} +import caliban.{CalibanError, GraphQLResponse, ResponseValue, Value} import zio.test.Assertion.hasSameElements -import zio.test.{ assert, assertTrue, TestAspect, ZIOSpecDefault } -import zio.{ Chunk, UIO, ZIO, ZLayer } +import zio.test.{TestAspect, TestClock, ZIOSpecDefault, assert, assertTrue} +import zio._ trait CharacterService { def characterBy(pred: Character => Boolean): UIO[List[Character]] @@ -188,7 +188,32 @@ object DeferredExecutionSpec extends ZIOSpecDefault { """{"hasNext":false}""" ) ) - } + }, + test("deferred fields backed by a datasource") { + import TestDatasourceDeferredSchema._ + val query = gqldoc(""" + { + foo { + bar { ... @defer { value } } + } + }""") + + for { + resp <- interpreter.flatMap(_.execute(query)) + rest <- runIncrementalResponses(resp) + head = rest.head.toString + mid = rest.tail.init.toList.map(_.toString) + last = rest.last.toString + } yield assertTrue( + head == """{"foo":{"bar":[{},{},{}]}}""", + mid.sorted == List( + """{"incremental":[{"data":{"value":"value"},"path":["foo","bar",0]}],"hasNext":true}""", + """{"incremental":[{"data":{"value":"value"},"path":["foo","bar",1]}],"hasNext":true}""", + """{"incremental":[{"data":{"value":"value"},"path":["foo","bar",2]}],"hasNext":true}""" + ), + last == """{"hasNext":false}""" + ) + } @@ TestAspect.nonFlaky(50) ).provide(CharacterService.test) def runIncrementalResponses(response: GraphQLResponse[CalibanError]) = diff --git a/core/src/test/scala/caliban/execution/TestDeferredSchema.scala b/core/src/test/scala/caliban/execution/TestDeferredSchema.scala index cb904fa760..a3a7898fc8 100644 --- a/core/src/test/scala/caliban/execution/TestDeferredSchema.scala +++ b/core/src/test/scala/caliban/execution/TestDeferredSchema.scala @@ -6,6 +6,7 @@ import caliban.{ graphQL, RootResolver } import caliban.schema.Annotations.GQLName import caliban.schema.{ GenericSchema, Schema } import caliban.wrappers.DeferSupport +import zio.query.{ DataSource, Request, UQuery, ZQuery } import zio.{ UIO, URIO, ZIO } object TestDeferredSchema extends GenericSchema[CharacterService] { @@ -77,3 +78,22 @@ object TestDeferredSchema extends GenericSchema[CharacterService] { val interpreter = (graphQL(resolver) @@ DeferSupport.defer).interpreter } + +object TestDatasourceDeferredSchema extends GenericSchema[Any] { + import auto._ + + case class Bar(value: UQuery[String]) + case class Foo(bar: UIO[List[Bar]]) + case class Query(foo: UIO[Foo]) + + case class Req(i: Int) extends Request[Nothing, String] + private val ds = DataSource.fromFunctionZIO("ValuesDS")((_: Req) => ZIO.succeed("value")) + + private def makeBar(i: Int): Bar = Bar(ZQuery.fromRequest(Req(i))(ds)) + + private val resolver = RootResolver(Query(ZIO.succeed { + Foo(bar = ZIO.succeed(List(makeBar(1), makeBar(3), makeBar(3)))) + })) + + val interpreter = (graphQL(resolver) @@ DeferSupport.defer).interpreter +} From 49ea6aba87c6c617139f0bc947f9e6511f7f806c Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sun, 5 Nov 2023 13:29:06 +1100 Subject: [PATCH 2/2] fmt --- .../scala/caliban/execution/DeferredExecutionSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala b/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala index 67221d74e8..c94164bd34 100644 --- a/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala +++ b/core/src/test/scala/caliban/execution/DeferredExecutionSpec.scala @@ -2,10 +2,10 @@ package caliban.execution import caliban.Macros.gqldoc import caliban.ResponseValue.StreamValue -import caliban.TestUtils.{Character, characters} -import caliban.{CalibanError, GraphQLResponse, ResponseValue, Value} +import caliban.TestUtils.{ characters, Character } +import caliban.{ CalibanError, GraphQLResponse, ResponseValue, Value } import zio.test.Assertion.hasSameElements -import zio.test.{TestAspect, TestClock, ZIOSpecDefault, assert, assertTrue} +import zio.test.{ assert, assertTrue, TestAspect, TestClock, ZIOSpecDefault } import zio._ trait CharacterService {