Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netty streaming for ZIO #3062

Merged
merged 11 commits into from
Jul 24, 2023
14 changes: 9 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1365,10 +1365,12 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve
.dependsOn(serverCore, serverTests % Test)

lazy val nettyServerCats: ProjectMatrix = nettyServerProject("cats", catsEffect)
.settings(libraryDependencies ++= Seq(
"com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared,
"co.fs2" %% "fs2-reactive-streams" % Versions.fs2
))
.settings(
libraryDependencies ++= Seq(
"com.softwaremill.sttp.shared" %% "fs2" % Versions.sttpShared,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scalafmt only.

"co.fs2" %% "fs2-reactive-streams" % Versions.fs2
)
)

lazy val nettyServerZio: ProjectMatrix = nettyServerProject("zio", zio)
.settings(libraryDependencies += "dev.zio" %% "zio-interop-cats" % Versions.zioInteropCats)
Expand All @@ -1378,7 +1380,9 @@ def nettyServerProject(proj: String, dependency: ProjectMatrix): ProjectMatrix =
.settings(commonJvmSettings)
.settings(
name := s"tapir-netty-server-$proj",
libraryDependencies ++= loggerDependencies,
libraryDependencies ++= loggerDependencies ++ Seq(
"dev.zio" %% "zio-interop-reactivestreams" % Versions.zioInteropReactiveStreams
),
// needed because of https://github.com/coursier/coursier/issues/2016
useCoursier := false
)
Expand Down
2 changes: 1 addition & 1 deletion doc/server/netty.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Then, use:

- `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints.
- `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect. [Streaming](../endpoint/streaming.md) request and response bodies is supported with fs2.
- `NettyZioServer().addEndpoints` to expose `ZIO`-based server endpoints, where `R` represents ZIO requirements supported effect.
- `NettyZioServer().addEndpoints` to expose `ZIO`-based server endpoints, where `R` represents ZIO requirements supported effect. Streaming is supported with ZIO Streams.

These methods require a single, or a list of `ServerEndpoint`s, which can be created by adding [server logic](logic.md)
to an endpoint.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sttp.tapir.examples.streaming

import sttp.capabilities.zio.ZioStreams
import sttp.client3._
import sttp.model.HeaderNames
import sttp.tapir.{CodecFormat, PublicEndpoint}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.zio.NettyZioServer
import sttp.tapir.ztapir._
import zio.interop.catz._
import zio._
import zio.stream._

import java.nio.charset.StandardCharsets

object StreamingNettyZioServer extends ZIOAppDefault {
// corresponds to: GET /receive?name=...
// We need to provide both the schema of the value (for documentation), as well as the format (media type) of the
// body. Here, the schema is a `string` (set by `streamTextBody`) and the media type is `text/plain`.
val streamingEndpoint: PublicEndpoint[Unit, Unit, (Long, ZStream[Any, Throwable, Byte]), ZioStreams] =
endpoint.get
.in("receive")
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(ZioStreams)(CodecFormat.TextPlain(), Some(StandardCharsets.UTF_8)))

val serverEndpoint: ZServerEndpoint[Any, ZioStreams] = streamingEndpoint
.zServerLogic { _ =>
val size = 100L
val stream = ZStream
.tick(100.millis)
.zipWith(ZStream[Char]('a', 'b', 'c', 'd').repeat(Schedule.forever))((_, c) => c)
.take(size)
.map(_.toByte)

ZIO.succeed((size, stream))
}

private val declaredPort = 9090
private val declaredHost = "localhost"

override def run: URIO[Any, ExitCode] = {
(for {
binding <- NettyZioServer()
.port(declaredPort)
.host(declaredHost)
.addEndpoint(serverEndpoint)
.start()
_ = {
val port = binding.port
val host = binding.hostName
println(s"Server started at port = ${binding.port}")

val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
val result: String =
basicRequest.response(asStringAlways).get(uri"http://$declaredHost:$declaredPort/receive").send(backend).body
println("Got result: " + result)

assert(result == "abcd" * 25)
}
_ <- binding.stop()
} yield ()).exitCode
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package sttp.tapir.server.netty.internal

import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.{ChunkedFile, ChunkedStream}
import sttp.capabilities
import sttp.capabilities.Streams
import sttp.model.HasHeaders
import sttp.tapir.capabilities.NoStreams
import sttp.tapir.server.interpreter.ToResponseBody
import sttp.tapir.server.netty.NettyResponse
import sttp.tapir.server.netty.NettyResponseContent.{
ByteBufNettyResponseContent,
ChunkedFileNettyResponseContent,
ChunkedStreamNettyResponseContent,
ReactivePublisherNettyResponseContent
}
import sttp.tapir.{CodecFormat, FileRange, InputStreamRange, RawBodyType, WebSocketBodyOutput}

import java.io.{InputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.charset.Charset

class NettyToStreamsResponseBody[S <: Streams[S]](delegate: NettyToResponseBody, streamCompatible: StreamCompatible[S])
extends ToResponseBody[NettyResponse, S] {

override val streams: S = streamCompatible.streams

override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): NettyResponse = {
bodyType match {
case RawBodyType.InputStreamBody =>
(ctx: ChannelHandlerContext) =>
new ReactivePublisherNettyResponseContent(ctx.newPromise(), streamCompatible.publisherFromInputStream(() => v, length = None))

case RawBodyType.InputStreamRangeBody =>
(ctx: ChannelHandlerContext) =>
new ReactivePublisherNettyResponseContent(
ctx.newPromise(),
streamCompatible.publisherFromInputStream(v.inputStreamFromRangeStart, length = v.range.map(_.contentLength))
)

case RawBodyType.FileBody =>
(ctx: ChannelHandlerContext) => new ReactivePublisherNettyResponseContent(ctx.newPromise(), streamCompatible.publisherFromFile(v))

case _: RawBodyType.MultipartBody => throw new UnsupportedOperationException

case _ => delegate.fromRawValue(v, headers, format, bodyType)
}
}

override def fromStreamValue(
v: streams.BinaryStream,
headers: HasHeaders,
format: CodecFormat,
charset: Option[Charset]
): NettyResponse = (ctx: ChannelHandlerContext) => {
new ReactivePublisherNettyResponseContent(
ctx.newPromise(),
streamCompatible.asPublisher(v.asInstanceOf[streamCompatible.streams.BinaryStream])
)
}

override def fromWebSocketPipe[REQ, RESP](
pipe: streams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, S]
): NettyResponse = throw new UnsupportedOperationException
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sttp.tapir.server.netty.internal

import io.netty.handler.codec.http.HttpContent
import org.reactivestreams.Publisher
import sttp.capabilities.Streams
import sttp.tapir.{FileRange, TapirFile}

import java.io.InputStream

private[netty] trait StreamCompatible[S <: Streams[S]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we have an impl for cats as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as I mentioned in the PR description, I'd like to make refactoring for CE in another PR, so that the scope of this PR is ZIO-only.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, of course, I didn't read the description ;)

val streams: S
def fromFile(file: FileRange): streams.BinaryStream
def fromInputStream(is: () => InputStream, length: Option[Long]): streams.BinaryStream
def fromNettyStream(s: Publisher[HttpContent]): streams.BinaryStream
def asPublisher(s: streams.BinaryStream): Publisher[HttpContent]

def publisherFromFile(file: FileRange): Publisher[HttpContent] =
asPublisher(fromFile(file))

def publisherFromInputStream(is: () => InputStream, length: Option[Long]): Publisher[HttpContent] =
asPublisher(fromInputStream(is, length))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package sttp.tapir.server.netty.internal

import com.typesafe.netty.http.StreamedHttpRequest
import io.netty.buffer.ByteBufUtil
import io.netty.handler.codec.http.FullHttpRequest
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.RawBodyType._
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.interpreter.{RawValue, RequestBody}
import sttp.tapir.{FileRange, InputStreamRange, RawBodyType, TapirFile}
import zio.interop.reactivestreams._
import zio.stream.{ZStream, _}
import zio.{Chunk, RIO, ZIO}

import java.io.ByteArrayInputStream
import java.nio.ByteBuffer

private[netty] class NettyZioRequestBody[Env](createFile: ServerRequest => RIO[Env, TapirFile])
extends RequestBody[RIO[Env, *], ZioStreams] {

override val streams: ZioStreams = ZioStreams

override def toRaw[R](serverRequest: ServerRequest, bodyType: RawBodyType[R]): RIO[Env, RawValue[R]] = {
bodyType match {
case StringBody(charset) => nettyRequestBytes(serverRequest).map(bs => RawValue(new String(bs, charset)))

case ByteArrayBody =>
nettyRequestBytes(serverRequest).map(RawValue(_))
case ByteBufferBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(ByteBuffer.wrap(bs)))
case InputStreamBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(new ByteArrayInputStream(bs)))
case InputStreamRangeBody =>
nettyRequestBytes(serverRequest).map(bs => RawValue(InputStreamRange(() => new ByteArrayInputStream(bs))))
case FileBody =>
createFile(serverRequest)
.flatMap(tapirFile => {
toStream(serverRequest)
.run(ZSink.fromFile(tapirFile))
.map(_ => RawValue(FileRange(tapirFile), Seq(FileRange(tapirFile))))
})
case MultipartBody(partTypes, defaultType) =>
throw new java.lang.UnsupportedOperationException()
}
}

override def toStream(serverRequest: ServerRequest): streams.BinaryStream = {

serverRequest.underlying
.asInstanceOf[StreamedHttpRequest]
.toZIOStream()
.flatMap(httpContent => ZStream.fromChunk(Chunk.fromByteBuffer(httpContent.content.nioBuffer())))
}

private def nettyRequestBytes(serverRequest: ServerRequest): RIO[Env, Array[Byte]] = serverRequest.underlying match {
case req: FullHttpRequest => ZIO.succeed(ByteBufUtil.getBytes(req.content()))
case _: StreamedHttpRequest => toStream(serverRequest).run(ZSink.collectAll[Byte]).map(_.toArray)
case other => ZIO.fail(new UnsupportedOperationException(s"Unexpected Netty request of type ${other.getClass().getName()}"))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sttp.tapir.server.netty.zio

import io.netty.channel._
import io.netty.channel.unix.DomainSocketAddress
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.netty.{NettyConfig, Route}
import sttp.tapir.server.netty.internal.{NettyBootstrap, NettyServerHandler}
Expand All @@ -14,14 +15,14 @@ import java.nio.file.{Path, Paths}
import java.util.UUID

case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options: NettyZioServerOptions[R], config: NettyConfig) {
def addEndpoint(se: ZServerEndpoint[R, Any]): NettyZioServer[R] = addEndpoints(List(se))
def addEndpoint(se: ZServerEndpoint[R, Any], overrideOptions: NettyZioServerOptions[R]): NettyZioServer[R] =
def addEndpoint(se: ZServerEndpoint[R, ZioStreams]): NettyZioServer[R] = addEndpoints(List(se))
def addEndpoint(se: ZServerEndpoint[R, ZioStreams], overrideOptions: NettyZioServerOptions[R]): NettyZioServer[R] =
addEndpoints(List(se), overrideOptions)
def addEndpoints(ses: List[ServerEndpoint[Any, RIO[R, *]]]): NettyZioServer[R] = addRoute(
def addEndpoints(ses: List[ServerEndpoint[ZioStreams, RIO[R, *]]]): NettyZioServer[R] = addRoute(
NettyZioServerInterpreter[R](options).toRoute(ses)
)
def addEndpoints(
ses: List[ZServerEndpoint[R, Any]],
ses: List[ZServerEndpoint[R, ZioStreams]],
overrideOptions: NettyZioServerOptions[R]
): NettyZioServer[R] = addRoute(
NettyZioServerInterpreter[R](overrideOptions).toRoute(ses)
Expand Down Expand Up @@ -93,8 +94,9 @@ case class NettyZioServer[R](routes: Vector[RIO[R, Route[RIO[R, *]]]], options:
}

object NettyZioServer {
def apply[R](): NettyZioServer[R] = NettyZioServer(Vector.empty, NettyZioServerOptions.default[R], NettyConfig.defaultNoStreaming)
def apply[R](options: NettyZioServerOptions[R]): NettyZioServer[R] = NettyZioServer(Vector.empty, options, NettyConfig.defaultNoStreaming)
def apply[R](): NettyZioServer[R] = NettyZioServer(Vector.empty, NettyZioServerOptions.default[R], NettyConfig.defaultWithStreaming)
def apply[R](options: NettyZioServerOptions[R]): NettyZioServer[R] =
NettyZioServer(Vector.empty, options, NettyConfig.defaultWithStreaming)
def apply[R](config: NettyConfig): NettyZioServer[R] = NettyZioServer(Vector.empty, NettyZioServerOptions.default[R], config)
def apply[R](options: NettyZioServerOptions[R], config: NettyConfig): NettyZioServer[R] = NettyZioServer(Vector.empty, options, config)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,48 @@
package sttp.tapir.server.netty.zio

import sttp.tapir.server.netty.Route
import sttp.tapir.server.netty.internal.{NettyServerInterpreter, RunAsync}
import sttp.capabilities.zio.ZioStreams
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interceptor.reject.RejectInterceptor
import sttp.tapir.server.interpreter.{BodyListener, FilterServerEndpoints, ServerInterpreter}
import sttp.tapir.server.netty.internal.{NettyBodyListener, RunAsync, _}
import sttp.tapir.server.netty.zio.NettyZioServerInterpreter.ZioRunAsync
import sttp.tapir.ztapir.{RIOMonadError, ZServerEndpoint}
import sttp.tapir.server.netty.zio.internal.ZioStreamCompatible
import sttp.tapir.server.netty.{NettyResponse, NettyServerRequest, Route}
import sttp.tapir.ztapir.{RIOMonadError, ZServerEndpoint, _}
import zio._
import sttp.tapir.ztapir._

trait NettyZioServerInterpreter[R] {
def nettyServerOptions: NettyZioServerOptions[R]

def toRoute[R2](ses: List[ZServerEndpoint[R2, Any]]): RIO[R & R2, Route[RIO[R & R2, *]]] = ZIO.runtime.map { (runtime: Runtime[R & R2]) =>
implicit val monadError: RIOMonadError[R & R2] = new RIOMonadError[R & R2]
val runAsync = new ZioRunAsync[R & R2](runtime)
def toRoute[R2](ses: List[ZServerEndpoint[R2, ZioStreams]]): RIO[R & R2, Route[RIO[R & R2, *]]] = ZIO.runtime.map {
(runtime: Runtime[R & R2]) =>
type F[A] = RIO[R & R2, A]
implicit val monadError: RIOMonadError[R & R2] = new RIOMonadError[R & R2]
val runAsync = new ZioRunAsync[R & R2](runtime)

val widenedSes = ses.map(_.widen[R & R2])
val widenedServerOptions = nettyServerOptions.widen[R & R2]
val widenedSes = ses.map(_.widen[R & R2])
val widenedServerOptions = nettyServerOptions.widen[R & R2]

NettyServerInterpreter
.toRoute(widenedSes, widenedServerOptions.interceptors, widenedServerOptions.createFile, widenedServerOptions.deleteFile, runAsync)
// we want to log & return a 500 in case of defects as well
.andThen(_.resurrect)
implicit val bodyListener: BodyListener[F, NettyResponse] = new NettyBodyListener(runAsync)
val serverInterpreter = new ServerInterpreter[ZioStreams, F, NettyResponse, ZioStreams](
FilterServerEndpoints(widenedSes),
new NettyZioRequestBody(widenedServerOptions.createFile),
new NettyToStreamsResponseBody[ZioStreams](delegate = new NettyToResponseBody(), ZioStreamCompatible(runtime)),
RejectInterceptor.disableWhenSingleEndpoint(widenedServerOptions.interceptors, widenedSes),
widenedServerOptions.deleteFile
)

val handler: Route[F] = { (request: NettyServerRequest) =>
serverInterpreter(request)
.map {
case RequestResult.Response(response) => Some(response)
case RequestResult.Failure(_) => None
}
}

handler
// we want to log & return a 500 in case of defects as well
.andThen(_.resurrect)
}
}

Expand Down
Loading