Skip to content

Commit

Permalink
Merge pull request #3017 from softwaremill/feature/netty-streaming
Browse files Browse the repository at this point in the history
Netty streaming for Cats Effect
  • Loading branch information
adamw authored Jul 19, 2023
2 parents be73a49 + 7dcdc6d commit f5c73cf
Show file tree
Hide file tree
Showing 23 changed files with 487 additions and 109 deletions.
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,10 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve
.settings(commonJvmSettings)
.settings(
name := "tapir-netty-server",
libraryDependencies ++= Seq("io.netty" % "netty-all" % Versions.nettyAll)
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % Versions.nettyAll,
"com.typesafe.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams
)
++ loggerDependencies,
// needed because of https://github.com/coursier/coursier/issues/2016
useCoursier := false
Expand All @@ -1362,7 +1365,10 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve
.dependsOn(serverCore, serverTests % Test)

lazy val nettyServerCats: ProjectMatrix = nettyServerProject("cats", catsEffect)
.settings(libraryDependencies += "com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared)
.settings(libraryDependencies ++= Seq(
"com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared,
"co.fs2" %% "fs2-reactive-streams" % Versions.fs2
))

lazy val nettyServerZio: ProjectMatrix = nettyServerProject("zio", zio)
.settings(libraryDependencies += "dev.zio" %% "zio-interop-cats" % Versions.zioInteropCats)
Expand Down
14 changes: 7 additions & 7 deletions doc/server/netty.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ To expose an endpoint using a [Netty](https://netty.io)-based server, first add

Then, use:

* `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints.
* `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect.
* `NettyZioServer().addEndpoints` to expose `ZIO`-based server endpoints, where `R` represents ZIO requirements supported effect.
- `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints.
- `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect. [Streaming](../endpoint/streaming.md) request and response bodies is supported with fs2.
- `NettyZioServer().addEndpoints` to expose `ZIO`-based server endpoints, where `R` represents ZIO requirements supported effect.

These methods require a single, or a list of `ServerEndpoint`s, which can be created by adding [server logic](logic.md)
These methods require a single, or a list of `ServerEndpoint`s, which can be created by adding [server logic](logic.md)
to an endpoint.

For example:
Expand All @@ -36,7 +36,7 @@ val helloWorld = endpoint
.out(stringBody)
.serverLogic(name => Future.successful[Either[Unit, String]](Right(s"Hello, $name!")))

val binding: Future[NettyFutureServerBinding] =
val binding: Future[NettyFutureServerBinding] =
NettyFutureServer().addEndpoint(helloWorld).start()
```

Expand All @@ -60,7 +60,7 @@ NettyFutureServer().port(9090).addEndpoints(???)
NettyFutureServer(NettyFutureServerOptions.customiseInterceptors.serverLog(None).options)

// customise Netty config
NettyFutureServer(NettyConfig.default.socketBacklog(256))
NettyFutureServer(NettyConfig.defaultNoStreaming.socketBacklog(256))
```

## Domain socket support
Expand All @@ -83,4 +83,4 @@ val serverBinding: Future[NettyFutureDomainSocketBinding] =
Future.successful[Either[Unit, String]](Right(s"Hello, $name!")))
)
.startUsingDomainSocket(Paths.get(System.getProperty("java.io.tmpdir"), "hello"))
```
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package sttp.tapir.examples.streaming

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import fs2.{Chunk, Stream}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.model.HeaderNames
import sttp.tapir._
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding}

import java.nio.charset.StandardCharsets
import scala.concurrent.duration._

object StreamingNettyFs2Server extends IOApp {
// corresponds to: GET /receive?name=...
// We need to provide both the schema of the value (for documentation), as well as the format (media type) of the
// body. Here, the schema is a `string` (set by `streamTextBody`) and the media type is `text/plain`.
val streamingEndpoint: PublicEndpoint[Unit, Unit, (Long, Stream[IO, Byte]), Fs2Streams[IO]] =
endpoint.get
.in("receive")
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain(), Some(StandardCharsets.UTF_8)))

val serverEndpoint: ServerEndpoint[Fs2Streams[IO], IO] = streamingEndpoint
.serverLogicSuccess { _ =>
val size = 100L
Stream
.emit(List[Char]('a', 'b', 'c', 'd'))
.repeat
.flatMap(list => Stream.chunk(Chunk.seq(list)))
.metered[IO](100.millis)
.take(size)
.covary[IO]
.map(_.toByte)
.pure[IO]
.map(s => (size, s))
}

private val declaredPort = 9090
private val declaredHost = "localhost"

override def run(args: List[String]): IO[ExitCode] = {
// starting the server
NettyCatsServer
.io()
.use { server =>

val startServer: IO[NettyCatsServerBinding[IO]] = server
.port(declaredPort)
.host(declaredHost)
.addEndpoint(serverEndpoint)
.start()

startServer
.map { binding =>

val port = binding.port
val host = binding.hostName
println(s"Server started at port = ${binding.port}")

val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
val result: String =
basicRequest.response(asStringAlways).get(uri"http://$declaredHost:$declaredPort/receive").send(backend).body
println("Got result: " + result)

assert(result == "abcd" * 25)
}
.as(ExitCode.Success)
}
}
}
1 change: 1 addition & 0 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object Versions {
val finatra = "22.12.0"
val catbird = "21.12.0"
val json4s = "4.0.6"
val nettyReactiveStreams = "2.0.8"
val sprayJson = "1.3.6"
val scalaCheck = "1.17.0"
val scalaTest = "3.2.16"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import sttp.tapir.server.netty.{NettyConfig, Route}
import java.net.{InetSocketAddress, SocketAddress}
import java.nio.file.{Path, Paths}
import java.util.UUID
import sttp.capabilities.fs2.Fs2Streams

case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: NettyCatsServerOptions[F], config: NettyConfig) {
def addEndpoint(se: ServerEndpoint[Any, F]): NettyCatsServer[F] = addEndpoints(List(se))
def addEndpoint(se: ServerEndpoint[Any, F], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] =
def addEndpoint(se: ServerEndpoint[Fs2Streams[F], F]): NettyCatsServer[F] = addEndpoints(List(se))
def addEndpoint(se: ServerEndpoint[Fs2Streams[F], F], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] =
addEndpoints(List(se), overrideOptions)
def addEndpoints(ses: List[ServerEndpoint[Any, F]]): NettyCatsServer[F] = addRoute(
def addEndpoints(ses: List[ServerEndpoint[Fs2Streams[F], F]]): NettyCatsServer[F] = addRoute(
NettyCatsServerInterpreter(options).toRoute(ses)
)
def addEndpoints(ses: List[ServerEndpoint[Any, F]], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = addRoute(
def addEndpoints(ses: List[ServerEndpoint[Fs2Streams[F], F]], overrideOptions: NettyCatsServerOptions[F]): NettyCatsServer[F] = addRoute(
NettyCatsServerInterpreter(overrideOptions).toRoute(ses)
)

Expand Down Expand Up @@ -60,7 +61,7 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty
val channelFuture =
NettyBootstrap(
config,
new NettyServerHandler(route, (f: () => F[Unit]) => options.dispatcher.unsafeToFuture(f())),
new NettyServerHandler(route, (f: () => F[Unit]) => options.dispatcher.unsafeToFuture(f()), config.maxContentLength),
eventLoopGroup,
socketOverride
)
Expand All @@ -81,9 +82,9 @@ case class NettyCatsServer[F[_]: Async](routes: Vector[Route[F]], options: Netty

object NettyCatsServer {
def apply[F[_]: Async](dispatcher: Dispatcher[F]): NettyCatsServer[F] =
NettyCatsServer(Vector.empty, NettyCatsServerOptions.default(dispatcher), NettyConfig.default)
NettyCatsServer(Vector.empty, NettyCatsServerOptions.default(dispatcher), NettyConfig.defaultWithStreaming)
def apply[F[_]: Async](options: NettyCatsServerOptions[F]): NettyCatsServer[F] =
NettyCatsServer(Vector.empty, options, NettyConfig.default)
NettyCatsServer(Vector.empty, options, NettyConfig.defaultWithStreaming)
def apply[F[_]: Async](dispatcher: Dispatcher[F], config: NettyConfig): NettyCatsServer[F] =
NettyCatsServer(Vector.empty, NettyCatsServerOptions.default(dispatcher), config)
def apply[F[_]: Async](options: NettyCatsServerOptions[F], config: NettyConfig): NettyCatsServer[F] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,50 @@ package sttp.tapir.server.netty.cats

import cats.effect.Async
import cats.effect.std.Dispatcher
import sttp.capabilities.fs2.Fs2Streams
import sttp.monad.MonadError
import sttp.monad.syntax._
import sttp.tapir.integ.cats.effect.CatsMonadError
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.Route
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interceptor.reject.RejectInterceptor
import sttp.tapir.server.interpreter.{BodyListener, FilterServerEndpoints, ServerInterpreter}
import sttp.tapir.server.netty.internal.{NettyBodyListener, RunAsync, _}
import sttp.tapir.server.netty.{NettyResponse, NettyServerRequest, Route}

trait NettyCatsServerInterpreter[F[_]] {
implicit def async: Async[F]
def nettyServerOptions: NettyCatsServerOptions[F]

def toRoute(ses: List[ServerEndpoint[Any, F]]): Route[F] = {
def toRoute(ses: List[ServerEndpoint[Fs2Streams[F], F]]): Route[F] = {

implicit val monad: MonadError[F] = new CatsMonadError[F]
val runAsync = new RunAsync[F] {
override def apply[T](f: => F[T]): Unit = nettyServerOptions.dispatcher.unsafeRunAndForget(f)
}
NettyServerInterpreter.toRoute(
ses,
nettyServerOptions.interceptors,
nettyServerOptions.createFile,
nettyServerOptions.deleteFile,
runAsync
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener(runAsync)

val interceptors = nettyServerOptions.interceptors
val createFile = nettyServerOptions.createFile
val deleteFile = nettyServerOptions.deleteFile

val serverInterpreter = new ServerInterpreter[Fs2Streams[F], F, NettyResponse, Fs2Streams[F]](
FilterServerEndpoints(ses),
new NettyCatsRequestBody(createFile),
new NettyCatsToResponseBody(nettyServerOptions.dispatcher, delegate = new NettyToResponseBody),
RejectInterceptor.disableWhenSingleEndpoint(interceptors, ses),
deleteFile
)

val handler: Route[F] = { (request: NettyServerRequest) =>
serverInterpreter(request)
.map {
case RequestResult.Response(response) => Some(response)
case RequestResult.Failure(_) => None
}
}

handler
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package sttp.tapir.server.netty.internal

import cats.effect.{Async, Sync}
import cats.syntax.all._
import com.typesafe.netty.http.StreamedHttpRequest
import fs2.Chunk
import fs2.interop.reactivestreams.StreamSubscriber
import fs2.io.file.{Files, Path}
import io.netty.buffer.ByteBufUtil
import io.netty.handler.codec.http.{FullHttpRequest, HttpContent}
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.interpreter.{RawValue, RequestBody}
import sttp.tapir.{FileRange, InputStreamRange, RawBodyType, TapirFile}

import java.io.ByteArrayInputStream
import java.nio.ByteBuffer

private[netty] class NettyCatsRequestBody[F[_]](createFile: ServerRequest => F[TapirFile])(implicit val monad: Async[F])
extends RequestBody[F, Fs2Streams[F]] {

override val streams: Fs2Streams[F] = Fs2Streams[F]

override def toRaw[R](serverRequest: ServerRequest, bodyType: RawBodyType[R]): F[RawValue[R]] = {

bodyType match {
case RawBodyType.StringBody(charset) => nettyRequestBytes(serverRequest).map(bs => RawValue(new String(bs, charset)))
case RawBodyType.ByteArrayBody =>
nettyRequestBytes(serverRequest).map(RawValue(_))
case RawBodyType.ByteBufferBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(ByteBuffer.wrap(bs)))
case RawBodyType.InputStreamBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(new ByteArrayInputStream(bs)))
case RawBodyType.InputStreamRangeBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(InputStreamRange(() => new ByteArrayInputStream(bs))))
case RawBodyType.FileBody =>
createFile(serverRequest)
.flatMap(tapirFile => {
toStream(serverRequest)
.through(
Files[F](Files.forAsync[F]).writeAll(Path.fromNioPath(tapirFile.toPath))
)
.compile
.drain
.map(_ => RawValue(FileRange(tapirFile), Seq(FileRange(tapirFile))))
})
case _: RawBodyType.MultipartBody => ???
}
}

override def toStream(serverRequest: ServerRequest): streams.BinaryStream = {
val nettyRequest = serverRequest.underlying.asInstanceOf[StreamedHttpRequest]
fs2.Stream
.eval(StreamSubscriber[F, HttpContent](NettyRequestBody.DefaultChunkSize))
.flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s))))
.flatMap(httpContent => fs2.Stream.chunk(Chunk.byteBuffer(httpContent.content.nioBuffer())))
}

private def nettyRequestBytes(serverRequest: ServerRequest): F[Array[Byte]] = serverRequest.underlying match {
case req: FullHttpRequest => monad.delay(ByteBufUtil.getBytes(req.content()))
case _: StreamedHttpRequest => toStream(serverRequest).compile.to(Chunk).map(_.toArray[Byte])
case other => monad.raiseError(new UnsupportedOperationException(s"Unexpected Netty request of type ${other.getClass().getName()}"))
}
}
Loading

0 comments on commit f5c73cf

Please sign in to comment.