Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket support for caliban-quick #2150

Merged
merged 12 commits into from
Mar 24, 2024
6 changes: 5 additions & 1 deletion adapters/quick/src/main/scala/caliban/GraphiQLHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ object GraphiQLHandler {
* @see [[https://github.com/graphql/graphiql/tree/main/examples/graphiql-cdn]]
*/
def handler(apiPath: String, graphiqlPath: String): RequestHandler[Any, Nothing] =
Handler.fromBody(Body.fromString(html(apiPath, graphiqlPath)))
Response(
Status.Ok,
Headers(Header.ContentType(MediaType.text.html).untyped),
Body.fromString(html(apiPath, graphiqlPath))
).toHandler

def html(apiPath: String, uiPath: String): String =
s"""
Expand Down
24 changes: 17 additions & 7 deletions adapters/quick/src/main/scala/caliban/QuickAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
*/
val handlers: QuickHandlers[R] = QuickHandlers(
api = Handler.fromFunctionZIO[Request](requestHandler.handleHttpRequest),
upload = Handler.fromFunctionZIO[Request](requestHandler.handleUploadRequest)
upload = Handler.fromFunctionZIO[Request](requestHandler.handleUploadRequest),
webSocket = Handler.fromFunctionZIO[Request](requestHandler.handleWebSocketRequest)
)

@deprecated("Use `handlers` instead", "2.5.0")
Expand All @@ -24,11 +25,13 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
* @param apiPath The path where the GraphQL API will be served.
* @param graphiqlPath The path where the GraphiQL UI will be served. If None, GraphiQL will not be served.
* @param uploadPath The path where files can be uploaded. If None, uploads will be disabled.
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def toApp(
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
): HttpApp[R] = {
val apiRoutes = List(
RoutePattern(Method.POST, apiPath) -> handlers.api,
Expand All @@ -40,8 +43,10 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
val uploadRoute = uploadPath.toList.map { uPath =>
RoutePattern(Method.POST, uPath) -> handlers.upload
}

Routes.fromIterable(apiRoutes ::: graphiqlRoute ::: uploadRoute).toHttpApp
val wsRoute = webSocketPath.toList.map { wsPath =>
RoutePattern(Method.ANY, wsPath) -> handlers.webSocket
}
Routes.fromIterable(apiRoutes ::: graphiqlRoute ::: uploadRoute ::: wsRoute).toHttpApp
}

/**
Expand All @@ -52,15 +57,17 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
* @param apiPath The route to serve the API on, e.g., `/api/graphql`
* @param graphiqlPath Optionally define a route to serve the GraphiQL UI on, e.g., `/graphiql`
* @param uploadPath The route where files can be uploaded, e.g., /upload/graphql. If None, uploads will be disabled.
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def runServer(
port: Int,
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit trace: Trace): RIO[R, Nothing] =
Server
.serve[R](toApp(apiPath, graphiqlPath = graphiqlPath, uploadPath = uploadPath))
.serve[R](toApp(apiPath, graphiqlPath = graphiqlPath, uploadPath = uploadPath, webSocketPath = webSocketPath))
.provideSomeLayer[R](Server.defaultWithPort(port))

def configure(config: ExecutionConfiguration)(implicit trace: Trace): QuickAdapter[R] =
Expand All @@ -69,13 +76,16 @@ final class QuickAdapter[-R] private (requestHandler: QuickRequestHandler[R]) {
def configure[R1](configurator: QuickAdapter.Configurator[R1])(implicit trace: Trace): QuickAdapter[R & R1] =
new QuickAdapter(requestHandler.configure[R1](configurator))

def configureWebSocket[R1](config: quick.WebSocketConfig[R1]): QuickAdapter[R & R1] =
new QuickAdapter(requestHandler.configureWebSocket(config))

}

object QuickAdapter {
type Configurator[-R] = URIO[R & Scope, Unit]

def apply[R](interpreter: GraphQLInterpreter[R, Any]): QuickAdapter[R] =
new QuickAdapter(new QuickRequestHandler(interpreter))
new QuickAdapter(new QuickRequestHandler(interpreter, quick.WebSocketConfig.default))

def handlers[R](implicit tag: Tag[R], trace: Trace): URIO[QuickAdapter[R], QuickHandlers[R]] =
ZIO.serviceWith(_.handlers)
Expand Down
6 changes: 4 additions & 2 deletions adapters/quick/src/main/scala/caliban/QuickHandlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import zio.http.{ HandlerAspect, RequestHandler }

final case class QuickHandlers[-R](
api: RequestHandler[R, Nothing],
upload: RequestHandler[R, Nothing]
upload: RequestHandler[R, Nothing],
webSocket: RequestHandler[R, Nothing]
) {

/**
Expand All @@ -13,7 +14,8 @@ final case class QuickHandlers[-R](
def @@[R1 <: R](aspect: HandlerAspect[R1, Unit]): QuickHandlers[R1] =
QuickHandlers(
api = (api @@ aspect).merge,
upload = (upload @@ aspect).merge
upload = (upload @@ aspect).merge,
webSocket = (webSocket @@ aspect).merge
)

}
51 changes: 47 additions & 4 deletions adapters/quick/src/main/scala/caliban/QuickRequestHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,42 @@ import caliban.ResponseValue.StreamValue
import caliban.interop.jsoniter.ValueJsoniter
import caliban.uploads.{ FileMeta, GraphQLUploadRequest, Uploads }
import caliban.wrappers.Caching
import caliban.ws.Protocol
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import zio._
import zio.http.ChannelEvent.UserEvent.HandshakeComplete
import zio.http.Header.ContentType
import zio.http._
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.stream.{ UStream, ZStream }
import zio.stream.{ UStream, ZPipeline, ZStream }

import java.nio.charset.StandardCharsets.UTF_8
import scala.util.control.NonFatal

final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, Any]) {
final private class QuickRequestHandler[R](
interpreter: GraphQLInterpreter[R, Any],
wsConfig: quick.WebSocketConfig[R]
) {
import QuickRequestHandler._

def configure(config: ExecutionConfiguration)(implicit trace: Trace): QuickRequestHandler[R] =
new QuickRequestHandler[R](
interpreter.wrapExecutionWith[R, Any](Configurator.setWith(config)(_))
interpreter.wrapExecutionWith[R, Any](Configurator.setWith(config)(_)),
wsConfig
)

def configure[R1](configurator: QuickAdapter.Configurator[R1])(implicit
trace: Trace
): QuickRequestHandler[R & R1] =
new QuickRequestHandler[R & R1](
interpreter.wrapExecutionWith[R & R1, Any](exec => ZIO.scoped[R1 & R](configurator *> exec))
interpreter.wrapExecutionWith[R & R1, Any](exec => ZIO.scoped[R1 & R](configurator *> exec)),
wsConfig
)

def configureWebSocket[R1](config: quick.WebSocketConfig[R1]): QuickRequestHandler[R & R1] =
new QuickRequestHandler[R & R1](interpreter, config)

def handleHttpRequest(request: Request)(implicit trace: Trace): URIO[R, Response] =
transformHttpRequest(request)
.flatMap(executeRequest(request.method, _))
Expand All @@ -45,6 +55,17 @@ final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, A
.provideSomeLayer[R](fileHandle)
}.merge

def handleWebSocketRequest(request: Request)(implicit trace: Trace): URIO[R, Response] =
Response.fromSocketApp {
val protocol = request.headers.get(Header.SecWebSocketProtocol) match {
case Some(value) => Protocol.fromName(value.renderedValue)
case None => Protocol.Legacy
}
Handler
.webSocket(webSocketChannelListener(protocol))
.withConfig(wsConfig.zHttpConfig.subProtocol(Some(protocol.name)))
}

private def transformHttpRequest(httpReq: Request)(implicit trace: Trace): IO[Response, GraphQLRequest] = {

def decodeQueryParams(queryParams: QueryParams): Either[Response, GraphQLRequest] = {
Expand Down Expand Up @@ -214,6 +235,28 @@ final private class QuickRequestHandler[-R](interpreter: GraphQLInterpreter[R, A
req.headers
.get(GraphQLRequest.`apollo-federation-include-trace`)
.exists(_.equalsIgnoreCase(GraphQLRequest.ftv1))

private def webSocketChannelListener(protocol: Protocol)(ch: WebSocketChannel)(implicit trace: Trace): RIO[R, Unit] =
for {
queue <- Queue.unbounded[GraphQLWSInput]
pipe <- protocol.make(interpreter, wsConfig.keepAliveTime, wsConfig.hooks).map(ZPipeline.fromFunction(_))
out = ZStream
.fromQueueWithShutdown(queue)
.via(pipe)
.interruptWhen(ch.awaitShutdown)
.map {
case Right(output) => WebSocketFrame.Text(writeToString(output))
case Left(close) => WebSocketFrame.Close(close.code, Some(close.reason))
}
_ <- ZIO.scoped(ch.receiveAll {
case ChannelEvent.UserEventTriggered(HandshakeComplete) =>
out.runForeach(frame => ch.send(ChannelEvent.Read(frame))).forkScoped
case ChannelEvent.Read(WebSocketFrame.Text(text)) =>
ZIO.suspend(queue.offer(readFromString[GraphQLWSInput](text)))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paulpdaniels @frekw, do you happen to know how should the server handle invalid messages? Based on this logic, we are going to raise an error if the frame cannot be decoded to GraphQLWSInput, which AFAICT will close the stream.

Is this sound? Or should we be ignoring messages we can't decode?

case _ =>
ZIO.unit
})
} yield ()
}

object QuickRequestHandler {
Expand Down
24 changes: 24 additions & 0 deletions adapters/quick/src/main/scala/caliban/quick/WebSocketConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package caliban.quick

import caliban.ws.WebSocketHooks
import zio._
import zio.http.{ WebSocketConfig => ZWebSocketConfig }

case class WebSocketConfig[-R](
keepAliveTime: Option[Duration],
hooks: WebSocketHooks[R, Any],
zHttpConfig: ZWebSocketConfig
) {
def withHooks[R1](newHooks: WebSocketHooks[R1, Any]): WebSocketConfig[R & R1] =
copy(hooks = hooks ++ newHooks)

def withKeepAliveTime(time: Duration): WebSocketConfig[R] =
copy(keepAliveTime = Some(time))

def withZHttpConfig(newConfig: ZWebSocketConfig): WebSocketConfig[R] =
copy(zHttpConfig = newConfig)
}

object WebSocketConfig {
def default: WebSocketConfig[Any] = WebSocketConfig(None, WebSocketHooks.empty, ZWebSocketConfig.default)
}
26 changes: 22 additions & 4 deletions adapters/quick/src/main/scala/caliban/quick/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@ package object quick {
* @param apiPath The route to serve the API on, e.g., `/api/graphql`
* @param graphiqlPath Optionally define a route to serve the GraphiQL UI on, e.g., `/graphiql`
* @param uploadPath Optionally define a route to serve file uploads on, e.g., `/api/upload`
* @param webSocketPath The path where websocket requests will be set. If None, websocket-based subscriptions will be disabled.
*/
def runServer(
port: Int,
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit
trace: Trace
): RIO[R, Nothing] =
gql.interpreter.flatMap(QuickAdapter(_).runServer(port, apiPath, graphiqlPath, uploadPath))
gql.interpreter.flatMap(
QuickAdapter(_).runServer(
port,
apiPath = apiPath,
graphiqlPath = graphiqlPath,
uploadPath = uploadPath,
webSocketPath = webSocketPath
)
)

/**
* Creates zio-http `HttpApp` from the GraphQL API
Expand All @@ -37,9 +47,17 @@ package object quick {
def toApp(
apiPath: String,
graphiqlPath: Option[String] = None,
uploadPath: Option[String] = None
uploadPath: Option[String] = None,
webSocketPath: Option[String] = None
)(implicit trace: Trace): IO[CalibanError.ValidationError, HttpApp[R]] =
gql.interpreter.map(QuickAdapter(_).toApp(apiPath, graphiqlPath, uploadPath))
gql.interpreter.map(
QuickAdapter(_).toApp(
apiPath = apiPath,
graphiqlPath = graphiqlPath,
uploadPath = uploadPath,
webSocketPath = webSocketPath
)
)

/**
* Creates a zio-http handler for the GraphQL API
Expand Down
6 changes: 4 additions & 2 deletions adapters/quick/src/test/scala/caliban/QuickAdapterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ object QuickAdapterSpec extends ZIOSpecDefault {

private val apiLayer = envLayer >>> ZLayer.fromZIO {
for {
app <- TestApi.api.toApp("/api/graphql", uploadPath = Some("/upload/graphql")).map(_ @@ auth)
app <- TestApi.api
.toApp("/api/graphql", uploadPath = Some("/upload/graphql"), webSocketPath = Some("/ws/graphql"))
.map(_ @@ auth)
_ <- Server.serve(app).forkScoped
_ <- Live.live(Clock.sleep(3 seconds))
service <- ZIO.service[TestService]
Expand All @@ -35,7 +37,7 @@ object QuickAdapterSpec extends ZIOSpecDefault {
val suite = TapirAdapterSpec.makeSuite(
"QuickAdapterSpec",
uri"http://localhost:8090/api/graphql",
wsUri = None,
wsUri = Some(uri"ws://localhost:8090/ws/graphql"),
uploadUri = Some(uri"http://localhost:8090/upload/graphql")
)
suite.provideShared(
Expand Down
6 changes: 5 additions & 1 deletion adapters/zio-http/src/main/scala/caliban/ZHttpAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package caliban

import caliban.interop.tapir.ws.Protocol
import caliban.interop.tapir.{ HttpInterpreter, WebSocketInterpreter }
import caliban.ws.Protocol
import sttp.capabilities.zio.ZioStreams
import sttp.model.HeaderNames
import sttp.tapir.server.ziohttp.{ ZioHttpInterpreter, ZioHttpServerOptions }
import zio.http._

@deprecated(
"The `caliban-zio-http` package is deprecated and scheduled to be removed in a future release. To use Caliban with zio-http, use the `caliban-quick` instead",
"2.6.0"
)
object ZHttpAdapter {

@deprecated("Defining subprotocols in the server config is no longer required")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import zio._
import zio.http._
import zio.test.{ Live, ZIOSpecDefault }

import scala.annotation.nowarn
import scala.language.postfixOps

@nowarn
object ZHttpAdapterSpec extends ZIOSpecDefault {
import sttp.tapir.json.zio._

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ lazy val commonSettings = Def.settings(
})
)

lazy val enforceMimaCompatibility = true // Enable / disable failing CI on binary incompatibilities
lazy val enforceMimaCompatibility = false // Enable / disable failing CI on binary incompatibilities

lazy val enableMimaSettingsJVM =
Def.settings(
Expand Down
Loading