diff --git a/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala b/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala index be2d042a4c..f79e1dcecc 100644 --- a/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala +++ b/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala @@ -96,7 +96,15 @@ class AkkaHttpAdapter private (private val options: AkkaHttpServerOptions)(impli akkaInterpreter.toRoute( convertWebSocketEndpoint( endpoint.asInstanceOf[ - ServerEndpoint.Full[Unit, Unit, (ServerRequest, String), StatusCode, CalibanPipe, ZioWebSockets, RIO[R, *]] + ServerEndpoint.Full[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, CalibanPipe), + ZioWebSockets, + RIO[R, *] + ] ] ) ) @@ -114,22 +122,38 @@ object AkkaHttpAdapter { type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any] def convertWebSocketEndpoint[R]( - endpoint: ServerEndpoint.Full[Unit, Unit, (ServerRequest, String), StatusCode, CalibanPipe, ZioWebSockets, RIO[ - R, - * - ]] + endpoint: ServerEndpoint.Full[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, CalibanPipe), + ZioWebSockets, + RIO[ + R, + * + ] + ] )(implicit ec: ExecutionContext, runtime: Runtime[R], materializer: Materializer ): ServerEndpoint[AkkaStreams with WebSockets, Future] = - ServerEndpoint[Unit, Unit, (ServerRequest, String), StatusCode, AkkaPipe, AkkaStreams with WebSockets, Future]( + ServerEndpoint[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, AkkaPipe), + AkkaStreams with WebSockets, + Future + ]( endpoint.endpoint .asInstanceOf[ PublicEndpoint[ (ServerRequest, String), StatusCode, - Pipe[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput]], + (String, Pipe[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput]]), Any ] ], @@ -140,7 +164,7 @@ object AkkaHttpAdapter { runtime .unsafeRunToFuture(endpoint.logic(zioMonadError)(())(req)) .future - .map(_.map { zioPipe => + .map(_.map { case (protocol, zioPipe) => val io = for { inputQueue <- ZQueue.unbounded[GraphQLWSInput] @@ -155,7 +179,7 @@ object AkkaHttpAdapter { flow = Flow.fromSinkAndSourceCoupled(sink, source).watchTermination() { (_, f) => f.onComplete(_ => runtime.unsafeRun(fiber.interrupt)) } - } yield flow + } yield (protocol, flow) runtime.unsafeRun(io) }) ) diff --git a/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala b/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala index 4f3e1cc16a..af9f213ea0 100644 --- a/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala +++ b/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala @@ -56,7 +56,7 @@ object Http4sAdapter { queryExecution, requestInterceptor ) - val endpointsF = endpoints.map(convertHttpEndpointToF[F, R, E]) + val endpointsF = endpoints.map(convertHttpEndpointToF[F, R]) Http4sServerInterpreter().toRoutes(endpointsF) } @@ -91,7 +91,7 @@ object Http4sAdapter { queryExecution, requestInterceptor ) - val endpointF = convertHttpEndpointToF[F, R, E](endpoint) + val endpointF = convertHttpEndpointToF[F, R](endpoint) Http4sServerInterpreter().toRoutes(endpointF) } @@ -138,7 +138,7 @@ object Http4sAdapter { requestInterceptor, webSocketHooks ) - val endpointF = convertWebSocketEndpointToF[F, R, E](endpoint) + val endpointF = convertWebSocketEndpointToF[F, R](endpoint) Http4sServerInterpreter().toWebSocketRoutes(endpointF)(builder) } @@ -194,7 +194,7 @@ object Http4sAdapter { * If you wish to use `Http4sServerInterpreter` with cats-effect IO instead of `ZHttp4sServerInterpreter`, * you can use this function to convert the tapir endpoints to their cats-effect counterpart. */ - def convertHttpEndpointToF[F[_], R, E]( + def convertHttpEndpointToF[F[_], R]( endpoint: ServerEndpoint[Any, RIO[R, *]] )(implicit interop: ToEffect[F, R]): ServerEndpoint[Any, F] = ServerEndpoint[ @@ -215,7 +215,7 @@ object Http4sAdapter { * If you wish to use `Http4sServerInterpreter` with cats-effect IO instead of `ZHttp4sServerInterpreter`, * you can use this function to convert the tapir endpoints to their cats-effect counterpart. */ - def convertWebSocketEndpointToF[F[_], R, E]( + def convertWebSocketEndpointToF[F[_], R]( endpoint: ServerEndpoint[ZioWebSockets, RIO[R, *]] )(implicit interop: CatsInterop[F, R], runtime: Runtime[R]): ServerEndpoint[Fs2Streams[F] with WebSockets, F] = { type Fs2Pipe = fs2.Pipe[F, GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput]] @@ -227,7 +227,7 @@ object Http4sAdapter { endpoint.PRINCIPAL, endpoint.INPUT, endpoint.ERROR_OUTPUT, - CalibanPipe, + (String, CalibanPipe), ZioWebSockets, RIO[R, *] ] @@ -238,24 +238,28 @@ object Http4sAdapter { endpoint.PRINCIPAL, endpoint.INPUT, endpoint.ERROR_OUTPUT, - Fs2Pipe, + (String, Fs2Pipe), Fs2Streams[F] with WebSockets, F ]( - e.endpoint.asInstanceOf[Endpoint[endpoint.SECURITY_INPUT, endpoint.INPUT, endpoint.ERROR_OUTPUT, Fs2Pipe, Any]], + e.endpoint + .asInstanceOf[Endpoint[endpoint.SECURITY_INPUT, endpoint.INPUT, endpoint.ERROR_OUTPUT, (String, Fs2Pipe), Any]], _ => a => interop.toEffect(e.securityLogic(zioMonadError)(a)), _ => u => req => interop.toEffect( e.logic(zioMonadError)(u)(req) - .map(_.map { zioPipe => + .map(_.map { case (protocol, zioPipe) => import zio.stream.interop.fs2z._ - fs2InputStream => - zioPipe( - fs2InputStream.translate(interop.fromEffectK).toZStream().provide(runtime.environment) - ).toFs2Stream - .translate(interop.toEffectK) + ( + protocol, + fs2InputStream => + zioPipe( + fs2InputStream.translate(interop.fromEffectK).toZStream().provide(runtime.environment) + ).toFs2Stream + .translate(interop.toEffectK) + ) }) ) ) diff --git a/adapters/play/src/main/scala/caliban/PlayAdapter.scala b/adapters/play/src/main/scala/caliban/PlayAdapter.scala index 2df85bff7c..bf1bf984c1 100644 --- a/adapters/play/src/main/scala/caliban/PlayAdapter.scala +++ b/adapters/play/src/main/scala/caliban/PlayAdapter.scala @@ -88,7 +88,15 @@ class PlayAdapter private (private val options: Option[PlayServerOptions]) { playInterpreter.toRoutes( PlayAdapter.convertWebSocketEndpoint( endpoint.asInstanceOf[ - ServerEndpoint.Full[Unit, Unit, (ServerRequest, String), StatusCode, CalibanPipe, ZioWebSockets, RIO[R, *]] + ServerEndpoint.Full[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, CalibanPipe), + ZioWebSockets, + RIO[R, *] + ] ] ) ) @@ -103,22 +111,38 @@ object PlayAdapter extends PlayAdapter(None) { type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any] def convertWebSocketEndpoint[R]( - endpoint: ServerEndpoint.Full[Unit, Unit, (ServerRequest, String), StatusCode, CalibanPipe, ZioWebSockets, RIO[ - R, - * - ]] + endpoint: ServerEndpoint.Full[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, CalibanPipe), + ZioWebSockets, + RIO[ + R, + * + ] + ] )(implicit ec: ExecutionContext, runtime: Runtime[R], materializer: Materializer ): ServerEndpoint[AkkaStreams with WebSockets, Future] = - ServerEndpoint[Unit, Unit, (ServerRequest, String), StatusCode, AkkaPipe, AkkaStreams with WebSockets, Future]( + ServerEndpoint[ + Unit, + Unit, + (ServerRequest, String), + StatusCode, + (String, AkkaPipe), + AkkaStreams with WebSockets, + Future + ]( endpoint.endpoint .asInstanceOf[ PublicEndpoint[ (ServerRequest, String), StatusCode, - Pipe[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput]], + (String, Pipe[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput]]), Any ] ], @@ -129,7 +153,7 @@ object PlayAdapter extends PlayAdapter(None) { runtime .unsafeRunToFuture(endpoint.logic(zioMonadError)(())(req)) .future - .map(_.map { zioPipe => + .map(_.map { case (protocol, zioPipe) => val io = for { inputQueue <- ZQueue.unbounded[GraphQLWSInput] @@ -144,7 +168,7 @@ object PlayAdapter extends PlayAdapter(None) { flow = Flow.fromSinkAndSourceCoupled(sink, source).watchTermination() { (_, f) => f.onComplete(_ => runtime.unsafeRun(fiber.interrupt)) } - } yield flow + } yield (protocol, flow) runtime.unsafeRun(io) }) ) diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala index 1934eab49a..84cda7a2b4 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala @@ -1,7 +1,5 @@ package caliban.interop.tapir -import caliban.ResponseValue.{ ObjectValue, StreamValue } -import caliban.Value.StringValue import caliban._ import caliban.execution.QueryExecution import caliban.interop.tapir.ws.Protocol @@ -17,10 +15,8 @@ import sttp.tapir.model.{ ServerRequest, UnsupportedWebSocketFrameException } import sttp.tapir.server.ServerEndpoint import sttp.ws.WebSocketFrame import zio._ -import zio.clock.Clock import zio.duration.Duration import zio.random.Random -import zio.stream._ import scala.concurrent.Future import scala.util.Try @@ -58,7 +54,7 @@ object TapirAdapter { private val errorBody = statusCode.and(stringBody).and(headers).map(responseMapping) - def makeHttpEndpoints[R, E](implicit + def makeHttpEndpoints[E](implicit requestCodec: JsonCodec[GraphQLRequest], responseCodec: JsonCodec[GraphQLResponse[E]] ): List[ @@ -153,7 +149,7 @@ object TapirAdapter { makeHttpEndpoints.map(_.serverLogic(logic)) } - def makeHttpUploadEndpoint[R, E](implicit + def makeHttpUploadEndpoint[E](implicit requestCodec: JsonCodec[GraphQLRequest], mapCodec: JsonCodec[Map[String, Seq[String]]], responseCodec: JsonCodec[GraphQLResponse[E]] @@ -254,16 +250,19 @@ object TapirAdapter { def makeWebSocketEndpoint(implicit inputCodec: JsonCodec[GraphQLWSInput], outputCodec: JsonCodec[GraphQLWSOutput] - ): PublicEndpoint[(ServerRequest, String), TapirResponse, CalibanPipe, ZioStreams with WebSockets] = + ): PublicEndpoint[(ServerRequest, String), TapirResponse, (String, CalibanPipe), ZioStreams with WebSockets] = { + val protocolHeader: EndpointIO.Header[String] = header[String]("sec-websocket-protocol") endpoint .in(extractFromRequest(identity)) - .in(header[String]("sec-websocket-protocol")) + .in(protocolHeader) + .out(protocolHeader) .out( webSocketBody[GraphQLWSInput, CodecFormat.Json, Either[GraphQLWSClose, GraphQLWSOutput], CodecFormat.Json]( ZioStreams ) ) .errorOut(errorBody) + } def makeWebSocketService[R, E]( interpreter: GraphQLInterpreter[R, E], @@ -289,11 +288,11 @@ object TapirAdapter { queryExecution, webSocketHooks ) - .map(Right(_)) + .map(res => Right((protocol, res))) ).catchAll(ZIO.left(_)) } - def convertHttpEndpointToFuture[E, R]( + def convertHttpEndpointToFuture[R]( endpoint: ServerEndpoint[Any, RIO[R, *]] )(implicit runtime: Runtime[R]): ServerEndpoint[Any, Future] = ServerEndpoint[ diff --git a/interop/tapir/src/test/scala/caliban/interop/tapir/TapirAdapterSpec.scala b/interop/tapir/src/test/scala/caliban/interop/tapir/TapirAdapterSpec.scala index 182a93c0df..a0d6a5523b 100644 --- a/interop/tapir/src/test/scala/caliban/interop/tapir/TapirAdapterSpec.scala +++ b/interop/tapir/src/test/scala/caliban/interop/tapir/TapirAdapterSpec.scala @@ -48,10 +48,10 @@ object TapirAdapterSpec { ): ZSpec[TestService, Throwable] = suiteM(label) { val run = SttpClientInterpreter() - .toRequestThrowDecodeFailures(TapirAdapter.makeHttpEndpoints[Any, CalibanError].head, Some(httpUri)) + .toRequestThrowDecodeFailures(TapirAdapter.makeHttpEndpoints[CalibanError].head, Some(httpUri)) val runUpload = uploadUri.map(uploadUri => SttpClientInterpreter() - .toRequestThrowDecodeFailures(TapirAdapter.makeHttpUploadEndpoint[Any, CalibanError], Some(uploadUri)) + .toRequestThrowDecodeFailures(TapirAdapter.makeHttpUploadEndpoint[CalibanError], Some(uploadUri)) ) val runWS = wsUri.map(wsUri => SttpClientInterpreter() @@ -181,7 +181,8 @@ object TapirAdapterSpec { ) -> "graphql-ws" ) ) - pipe <- ZIO.fromEither(res.body).orElseFail(new Throwable("Failed to parse result")) + res <- ZIO.fromEither(res.body).orElseFail(new Throwable("Failed to parse result")) + (_, pipe) = res inputQueue <- Queue.unbounded[GraphQLWSInput] inputStream = ZStream.fromQueueWithShutdown(inputQueue) outputStream = pipe(inputStream) @@ -229,7 +230,8 @@ object TapirAdapterSpec { ) -> "graphql-transport-ws" ) ) - pipe <- ZIO.fromEither(res.body).orElseFail(new Throwable("Failed to parse result")) + res <- ZIO.fromEither(res.body).orElseFail(new Throwable("Failed to parse result")) + (_, pipe) = res inputQueue <- Queue.unbounded[GraphQLWSInput] inputStream = ZStream.fromQueueWithShutdown(inputQueue) outputStream = pipe(inputStream)