Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netty streaming for Cats Effect #3017

Merged
merged 18 commits into from
Jul 19, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ private[netty] class NettyCatsRequestBody[F[_]](createFile: ServerRequest => F[T
override def toStream(serverRequest: ServerRequest): streams.BinaryStream = {
val nettyRequest = serverRequest.underlying.asInstanceOf[StreamedHttpRequest]
fs2.Stream
.eval(StreamSubscriber[F, HttpContent](NettyRequestBody.bufferSize))
.eval(StreamSubscriber[F, HttpContent](NettyRequestBody.DefaultChunkSize))
.flatMap(s => s.sub.stream(Sync[F].delay(nettyRequest.subscribe(s))))
.flatMap(httpContent => fs2.Stream.chunk(Chunk.byteBuffer(httpContent.content.nioBuffer())))
}
Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@ import fs2.io.file.{Files, Flags, Path}
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.{DefaultHttpContent, HttpContent}
import io.netty.handler.stream.ChunkedStream
import org.reactivestreams.Publisher
import sttp.capabilities.fs2.Fs2Streams
import sttp.model.HasHeaders
@@ -21,12 +20,6 @@ import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.charset.Charset

private[netty] class RangedChunkedStream(raw: InputStream, length: Long) extends ChunkedStream(raw) {

override def isEndOfInput(): Boolean =
super.isEndOfInput || transferredBytes == length
}

class NettyCatsToResponseBody[F[_]: Async](dispatcher: Dispatcher[F]) extends ToResponseBody[NettyResponse, Fs2Streams[F]] {
override val streams: Fs2Streams[F] = Fs2Streams[F]

Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package sttp.tapir.server.netty.cats

import cats.effect.{IO, Resource}
import com.typesafe.scalalogging.StrictLogging
import io.netty.channel.nio.NioEventLoopGroup
import org.scalatest.EitherValues
import org.scalatest.matchers.should.Matchers
import sttp.capabilities.fs2.Fs2Streams
import sttp.monad.MonadError
import sttp.tapir.integ.cats.effect.CatsMonadError
import sttp.tapir.server.netty.internal.FutureUtil
import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}

class NettyCatsServerTest extends TestSuite with EitherValues with StrictLogging with Matchers {
class NettyCatsServerTest extends TestSuite with EitherValues {
override def tests: Resource[IO, List[Test]] =
backendResource.flatMap { backend =>
Resource
@@ -23,7 +21,8 @@ class NettyCatsServerTest extends TestSuite with EitherValues with StrictLogging
val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)

val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++ new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests()
val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false)
.tests() ++ new ServerStreamingTests(createServerTest, Fs2Streams[IO]).tests()

IO.pure((tests, eventLoopGroup))
} { case (_, eventLoopGroup) =>
Original file line number Diff line number Diff line change
@@ -96,7 +96,6 @@ case class NettyConfig(
def eventLoopGroup(elg: EventLoopGroup): NettyConfig = copy(eventLoopConfig = EventLoopConfig.useExisting(elg))

def initPipeline(f: NettyConfig => (ChannelPipeline, ChannelHandler) => Unit): NettyConfig = copy(initPipeline = f)

}

object NettyConfig {
Original file line number Diff line number Diff line change
@@ -46,6 +46,6 @@ class NettyRequestBody[F[_]](createFile: ServerRequest => F[TapirFile])(implicit
private def nettyRequest(serverRequest: ServerRequest): FullHttpRequest = serverRequest.underlying.asInstanceOf[FullHttpRequest]
}

object NettyRequestBody {
private[internal] val bufferSize = 8192
private[internal] object NettyRequestBody {
val DefaultChunkSize = 8192
}