Skip to content

Commit

Permalink
WebSocket support for caliban-quick (#2150)
Browse files Browse the repository at this point in the history
* Add websocket support for caliban-quick

* Fix import error

* Fix Scala 3 compiling

* Make zio-http websocket config configurable

* Fix content-type of graphiql endpoint

* PR comment

* Add method to construct WebSocketHooks from a StreamTransformer

* Use caliban-quick in stitching example

* Use port 8080 in stitching example

* Fix scaladoc
  • Loading branch information
kyri-petrou authored Mar 24, 2024
1 parent 90838f1 commit 0552909
Show file tree
Hide file tree
Showing 20 changed files with 323 additions and 173 deletions.
6 changes: 5 additions & 1 deletion adapters/quick/src/main/scala/caliban/GraphiQLHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ object GraphiQLHandler {
* @see [[https://github.com/graphql/graphiql/tree/main/examples/graphiql-cdn]]
*/
def handler(apiPath: String, graphiqlPath: String): RequestHandler[Any, Nothing] =
Handler.fromBody(Body.fromString(html(apiPath, graphiqlPath)))
Response(
Status.Ok,
Headers(Header.ContentType(MediaType.text.html).untyped),
Body.fromString(html(apiPath, graphiqlPath))
).toHandler

def html(apiPath: String, uiPath: String): String =
s"""
Expand Down
24 changes: 17 additions & 7 deletions adapters/quick/src/main/scala/caliban/QuickAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
*/
val handlers: QuickHandlers[R] = QuickHandlers(
api = Handler.fromFunctionZIO[Request](requestHandler.handleHttpRequest),
upload = Handler.fromFunctionZIO[Request](requestHandler.handleUploadRequest)
upload = Handler.fromFunctionZIO[Request](requestHandler.handleUploadRequest),
webSocket = Handler.fromFunctionZIO[Request](requestHandler.handleWebSocketRequest)
)

@deprecated("Use `handlers` instead", "2.5.0")
Expand All @@ -24,11 +25,13 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
* @param apiPath The path where the GraphQL API will be served.
* @param graphiqlPath The path where the GraphiQL UI will be served. If None, GraphiQL will not be served.
* @param uploadPath The path where files can be uploaded. If None, uploads will be disabled.
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def toApp(
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
): HttpApp[R] = {
val apiRoutes = List(
RoutePattern(Method.POST, apiPath) -> handlers.api,
Expand All @@ -40,8 +43,10 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
val uploadRoute = uploadPath.toList.map { uPath =>
RoutePattern(Method.POST, uPath) -> handlers.upload
}

Routes.fromIterable(apiRoutes ::: graphiqlRoute ::: uploadRoute).toHttpApp
val wsRoute = webSocketPath.toList.map { wsPath =>
RoutePattern(Method.ANY, wsPath) -> handlers.webSocket
}
Routes.fromIterable(apiRoutes ::: graphiqlRoute ::: uploadRoute ::: wsRoute).toHttpApp
}

/**
Expand All @@ -52,15 +57,17 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
* @param apiPath The route to serve the API on, e.g., `/api/graphql`
* @param graphiqlPath Optionally define a route to serve the GraphiQL UI on, e.g., `/graphiql`
* @param uploadPath The route where files can be uploaded, e.g., /upload/graphql. If None, uploads will be disabled.
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def runServer(
port: Int,
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit trace: Trace): RIO[R, Nothing] =
Server
.serve[R](toApp(apiPath, graphiqlPath = graphiqlPath, uploadPath = uploadPath))
.serve[R](toApp(apiPath, graphiqlPath = graphiqlPath, uploadPath = uploadPath, webSocketPath = webSocketPath))
.provideSomeLayer[R](Server.defaultWithPort(port))

def configure(config: ExecutionConfiguration)(implicit trace: Trace): QuickAdapter[R] =
Expand All @@ -69,13 +76,16 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
def configure[R1](configurator: QuickAdapter.Configurator[R1])(implicit trace: Trace): QuickAdapter[R & R1] =
new QuickAdapter(requestHandler.configure[R1](configurator))

def configureWebSocket[R1](config: quick.WebSocketConfig[R1]): QuickAdapter[R & R1] =
new QuickAdapter(requestHandler.configureWebSocket(config))

}

object QuickAdapter {
type Configurator[-R] = URIO[R & Scope, Unit]

def apply[R](interpreter: GraphQLInterpreter[R, Any]): QuickAdapter[R] =
new QuickAdapter(new QuickRequestHandler(interpreter))
new QuickAdapter(new QuickRequestHandler(interpreter, quick.WebSocketConfig.default))

def handlers[R](implicit tag: Tag[R], trace: Trace): URIO[QuickAdapter[R], QuickHandlers[R]] =
ZIO.serviceWith(_.handlers)
Expand Down
6 changes: 4 additions & 2 deletions adapters/quick/src/main/scala/caliban/QuickHandlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import zio.http.{ HandlerAspect, RequestHandler }

final case class QuickHandlers[-R](
api: RequestHandler[R, Nothing],
upload: RequestHandler[R, Nothing]
upload: RequestHandler[R, Nothing],
webSocket: RequestHandler[R, Nothing]
) {

/**
Expand All @@ -13,7 +14,8 @@ final case class QuickHandlers[-R](
def @@[R1 <: R](aspect: HandlerAspect[R1, Unit]): QuickHandlers[R1] =
QuickHandlers(
api = (api @@ aspect).merge,
upload = (upload @@ aspect).merge
upload = (upload @@ aspect).merge,
webSocket = (webSocket @@ aspect).merge
)

}
51 changes: 47 additions & 4 deletions adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,42 @@ import caliban.ResponseValue.StreamValue
import caliban.interop.jsoniter.ValueJsoniter
import caliban.uploads.{ FileMeta, GraphQLUploadRequest, Uploads }
import caliban.wrappers.Caching
import caliban.ws.Protocol
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import zio._
import zio.http.ChannelEvent.UserEvent.HandshakeComplete
import zio.http.Header.ContentType
import zio.http._
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.stream.{ UStream, ZStream }
import zio.stream.{ UStream, ZPipeline, ZStream }

import java.nio.charset.StandardCharsets.UTF_8
import scala.util.control.NonFatal

final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, Any]) {
final private class QuickRequestHandler[R](
interpreter: GraphQLInterpreter[R, Any],
wsConfig: quick.WebSocketConfig[R]
) {
import QuickRequestHandler._

def configure(config: ExecutionConfiguration)(implicit trace: Trace): QuickRequestHandler[R] =
new QuickRequestHandler[R](
interpreter.wrapExecutionWith[R, Any](Configurator.setWith(config)(_))
interpreter.wrapExecutionWith[R, Any](Configurator.setWith(config)(_)),
wsConfig
)

def configure[R1](configurator: QuickAdapter.Configurator[R1])(implicit
trace: Trace
): QuickRequestHandler[R & R1] =
new QuickRequestHandler[R & R1](
interpreter.wrapExecutionWith[R & R1, Any](exec => ZIO.scoped[R1 & R](configurator *> exec))
interpreter.wrapExecutionWith[R & R1, Any](exec => ZIO.scoped[R1 & R](configurator *> exec)),
wsConfig
)

def configureWebSocket[R1](config: quick.WebSocketConfig[R1]): QuickRequestHandler[R & R1] =
new QuickRequestHandler[R & R1](interpreter, config)

def handleHttpRequest(request: Request)(implicit trace: Trace): URIO[R, Response] =
transformHttpRequest(request)
.flatMap(executeRequest(request.method, _))
Expand All @@ -45,6 +55,17 @@ final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, A
.provideSomeLayer[R](fileHandle)
}.merge

def handleWebSocketRequest(request: Request)(implicit trace: Trace): URIO[R, Response] =
Response.fromSocketApp {
val protocol = request.headers.get(Header.SecWebSocketProtocol) match {
case Some(value) => Protocol.fromName(value.renderedValue)
case None => Protocol.Legacy
}
Handler
.webSocket(webSocketChannelListener(protocol))
.withConfig(wsConfig.zHttpConfig.subProtocol(Some(protocol.name)))
}

private def transformHttpRequest(httpReq: Request)(implicit trace: Trace): IO[Response, GraphQLRequest] = {

def decodeQueryParams(queryParams: QueryParams): Either[Response, GraphQLRequest] = {
Expand Down Expand Up @@ -214,6 +235,28 @@ final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, A
req.headers
.get(GraphQLRequest.`apollo-federation-include-trace`)
.exists(_.equalsIgnoreCase(GraphQLRequest.ftv1))

private def webSocketChannelListener(protocol: Protocol)(ch: WebSocketChannel)(implicit trace: Trace): RIO[R, Unit] =
for {
queue <- Queue.unbounded[GraphQLWSInput]
pipe <- protocol.make(interpreter, wsConfig.keepAliveTime, wsConfig.hooks).map(ZPipeline.fromFunction(_))
out = ZStream
.fromQueueWithShutdown(queue)
.via(pipe)
.interruptWhen(ch.awaitShutdown)
.map {
case Right(output) => WebSocketFrame.Text(writeToString(output))
case Left(close) => WebSocketFrame.Close(close.code, Some(close.reason))
}
_ <- ZIO.scoped(ch.receiveAll {
case ChannelEvent.UserEventTriggered(HandshakeComplete) =>
out.runForeach(frame => ch.send(ChannelEvent.Read(frame))).forkScoped
case ChannelEvent.Read(WebSocketFrame.Text(text)) =>
ZIO.suspend(queue.offer(readFromString[GraphQLWSInput](text)))
case _ =>
ZIO.unit
})
} yield ()
}

object QuickRequestHandler {
Expand Down
24 changes: 24 additions & 0 deletions adapters/quick/src/main/scala/caliban/quick/WebSocketConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package caliban.quick

import caliban.ws.WebSocketHooks
import zio._
import zio.http.{ WebSocketConfig => ZWebSocketConfig }

case class WebSocketConfig[-R](
keepAliveTime: Option[Duration],
hooks: WebSocketHooks[R, Any],
zHttpConfig: ZWebSocketConfig
) {
def withHooks[R1](newHooks: WebSocketHooks[R1, Any]): WebSocketConfig[R & R1] =
copy(hooks = hooks ++ newHooks)

def withKeepAliveTime(time: Duration): WebSocketConfig[R] =
copy(keepAliveTime = Some(time))

def withZHttpConfig(newConfig: ZWebSocketConfig): WebSocketConfig[R] =
copy(zHttpConfig = newConfig)
}

object WebSocketConfig {
def default: WebSocketConfig[Any] = WebSocketConfig(None, WebSocketHooks.empty, ZWebSocketConfig.default)
}
26 changes: 22 additions & 4 deletions adapters/quick/src/main/scala/caliban/quick/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@ package object quick {
* @param apiPath The route to serve the API on, e.g., `/api/graphql`
* @param graphiqlPath Optionally define a route to serve the GraphiQL UI on, e.g., `/graphiql`
* @param uploadPath Optionally define a route to serve file uploads on, e.g., `/api/upload`
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def runServer(
port: Int,
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit
trace: Trace
): RIO[R, Nothing] =
gql.interpreter.flatMap(QuickAdapter(_).runServer(port, apiPath, graphiqlPath, uploadPath))
gql.interpreter.flatMap(
QuickAdapter(_).runServer(
port,
apiPath = apiPath,
graphiqlPath = graphiqlPath,
uploadPath = uploadPath,
webSocketPath = webSocketPath
)
)

/**
* Creates zio-http `HttpApp` from the GraphQL API
Expand All @@ -37,9 +47,17 @@ package object quick {
def toApp(
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit trace: Trace): IO[CalibanError.ValidationError, HttpApp[R]] =
gql.interpreter.map(QuickAdapter(_).toApp(apiPath, graphiqlPath, uploadPath))
gql.interpreter.map(
QuickAdapter(_).toApp(
apiPath = apiPath,
graphiqlPath = graphiqlPath,
uploadPath = uploadPath,
webSocketPath = webSocketPath
)
)

/**
* Creates a zio-http handler for the GraphQL API
Expand Down
6 changes: 4 additions & 2 deletions adapters/quick/src/test/scala/caliban/QuickAdapterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ object QuickAdapterSpec extends ZIOSpecDefault {

private val apiLayer = envLayer >>> ZLayer.fromZIO {
for {
app <- TestApi.api.toApp("/api/graphql", uploadPath = Some("/upload/graphql")).map(_ @@ auth)
app <- TestApi.api
.toApp("/api/graphql", uploadPath = Some("/upload/graphql"), webSocketPath = Some("/ws/graphql"))
.map(_ @@ auth)
_ <- Server.serve(app).forkScoped
_ <- Live.live(Clock.sleep(3 seconds))
service <- ZIO.service[TestService]
Expand All @@ -35,7 +37,7 @@ object QuickAdapterSpec extends ZIOSpecDefault {
val suite = TapirAdapterSpec.makeSuite(
"QuickAdapterSpec",
uri"http://localhost:8090/api/graphql",
wsUri = None,
wsUri = Some(uri"ws://localhost:8090/ws/graphql"),
uploadUri = Some(uri"http://localhost:8090/upload/graphql")
)
suite.provideShared(
Expand Down
6 changes: 5 additions & 1 deletion adapters/zio-http/src/main/scala/caliban/ZHttpAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package caliban

import caliban.interop.tapir.ws.Protocol
import caliban.interop.tapir.{ HttpInterpreter, WebSocketInterpreter }
import caliban.ws.Protocol
import sttp.capabilities.zio.ZioStreams
import sttp.model.HeaderNames
import sttp.tapir.server.ziohttp.{ ZioHttpInterpreter, ZioHttpServerOptions }
import zio.http._

@deprecated(
"The `caliban-zio-http` package is deprecated and scheduled to be removed in a future release. To use Caliban with zio-http, use the `caliban-quick` module instead",
"2.6.0"
)
object ZHttpAdapter {

@deprecated("Defining subprotocols in the server config is no longer required")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import zio._
import zio.http._
import zio.test.{ Live, ZIOSpecDefault }

import scala.annotation.nowarn
import scala.language.postfixOps

@nowarn
object ZHttpAdapterSpec extends ZIOSpecDefault {
import sttp.tapir.json.zio._

Expand Down
Loading

0 comments on commit 0552909

Please sign in to comment.