Netty streaming for Cats Effect #3017
Conversation
3dac45d to 24e64e7
  .io()
  .use { server =>

    val effect: IO[NettyCatsServerBinding[IO]] = server
[minor] `effect` -> sth descriptive, e.g. `startServer`?
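For context, a minimal sketch of how the snippet above might read with a more descriptive name, assuming the `NettyCatsServer` API as shown in the tapir docs (the `hello` endpoint is a placeholder, not part of this PR):

```scala
import cats.effect.IO
import sttp.tapir._
import sttp.tapir.server.netty.cats.{NettyCatsServer, NettyCatsServerBinding}

object StartServerSketch {
  // Placeholder endpoint, only here to make the sketch self-contained.
  val hello = endpoint.get.in("hello").out(stringBody).serverLogicSuccess[IO](_ => IO.pure("world"))

  val program: IO[Unit] =
    NettyCatsServer
      .io()
      .use { server =>
        // A descriptive name instead of `effect`, as suggested above.
        val startServer: IO[NettyCatsServerBinding[IO]] = server.addEndpoint(hello).start()
        startServer.flatMap(_.stop())
      }
}
```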
doc/server/netty.md (outdated)
* `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect.
* `NettyZioServer().addEndpoints` to expose `ZIO`-based server endpoints, where `R` represents ZIO requirements supported effect.
- `NettyFutureServer().addEndpoints` to expose `Future`-based server endpoints.
- `NettyCatsServer().addEndpoints` to expose `F`-based server endpoints, where `F` is any cats-effect supported effect. [Streaming](../endpoint/streaming.md) request and response body is supported with fs2.
[minor] shouldn't this be "bodies"?
  if (cfg.addLoggingHandler) pipeline.addLast(new LoggingHandler())
  ()
}

def defaultWithStreaming: NettyConfig = default.copy(initPipeline = cfg => streamingPipeline(cfg)(_, _))
maybe we should be more explicit here and rename `default` to `defaultNoStreaming`, same with `defaultInitPipeline`, so that there's no ambiguity
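Purely as an illustration of the naming idea (these are the reviewer's proposed names, not necessarily the merged API), the two config variants could sit side by side:

```scala
// Sketch only: explicit names for both pipeline variants, so neither reads as the implicit default.
// `default`, `defaultInitPipeline` and `streamingPipeline` are the members referenced above.
def defaultNoStreaming: NettyConfig = default // pipeline initialised with defaultInitPipeline
def defaultWithStreaming: NettyConfig = default.copy(initPipeline = cfg => streamingPipeline(cfg)(_, _))
```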
new DefaultStreamedHttpResponse(req.protocolVersion(), HttpResponseStatus.valueOf(serverResponse.code.code), publisher)

res.setHeadersFrom(serverResponse)
res.handleContentLengthAndChunkedHeaders(None)
will this work if the content-length is explicitly set by the user in headers?
I'll check if there's a test for this, but my first thought is we don't want users to set content-length when streaming. It's either transfer-encoding: chunked (stream) or explicit content-length. (4.4 https://greenbytes.de/tech/webdav/rfc2616.html#rfc.section.4.4)
well the user might know what the length of the streaming body is going to be upfront. There's even a test for it:
tapir/server/tests/src/main/scala/sttp/tapir/server/tests/ServerStreamingTests.scala (line 37 in c3f45cb):
.contentLength(penPineapple.length.toLong)
(`if` in the test)
If the specification always requires a chunked response to be without the content-length header, then this `if` is IMO correct; this shouldn't depend on implementation, right?
How we produce the data (streaming / non-streaming) doesn't have to directly translate to chunked/non-chunked encoding. We can have content with known length, and just produce it lazily
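To make the rule being discussed concrete, here is a minimal, self-contained sketch (not tapir's actual implementation) of how response framing headers could be chosen:

```scala
// RFC 2616, section 4.4: a response carries either an explicit Content-Length or
// Transfer-Encoding: chunked, never both. A known length can still be produced lazily.
def framingHeaders(knownLength: Option[Long]): Map[String, String] =
  knownLength match {
    case Some(len) => Map("Content-Length" -> len.toString) // user-declared or computed length
    case None      => Map("Transfer-Encoding" -> "chunked") // length unknown, stream in chunks
  }
```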
val req = request.retain()
request match {
  case full: FullHttpRequest =>
    val req = full.retain()
is the only difference between those branches that we call retain/release in one case, and not in the other? If so, maybe we can extract the common part to a method
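A rough sketch of the extraction being suggested (method names here are hypothetical): only the reference-counting differs between the branches, so the shared handling can live in one place.

```scala
import io.netty.handler.codec.http.{FullHttpRequest, HttpRequest}

def onRequest(request: HttpRequest): Unit = {
  // Common handling shared by both branches of the match below.
  def dispatch(req: HttpRequest): Unit = {
    // ... route the request to the tapir interpreter ...
  }
  request match {
    case full: FullHttpRequest => dispatch(full.retain()) // aggregated requests are reference-counted
    case streamed              => dispatch(streamed)      // streamed requests: no extra retain here
  }
}
```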
case RawBodyType.InputStreamBody =>
  val stream = inputStreamToFs2(() => v)
  (ctx: ChannelHandlerContext) =>
    new NettyResponseContent.ReactivePublisherNettyResponseContent(ctx.newPromise(), fs2StreamToPublisher(stream))
[minor] other classes don't have the prefix
I mean other usages of the classes :)
override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): NettyResponse = {
  bodyType match {
    case RawBodyType.StringBody(charset) =>
I think most branches are the same as in `NettyToResponseBody`? Maybe we can handle the different cases here, and for the raw body types that are the same as in `NettyToResponseBody`, delegate to that impl?
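A rough sketch of that delegation, mirroring the signature shown in the diff above; `nonStreamingDelegate` and `streamingContent` are placeholder names, not the PR's actual code:

```scala
// Keep only the streaming-specific raw body types here; everything that is handled the same
// way as in NettyToResponseBody goes through the delegate.
override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): NettyResponse =
  bodyType match {
    case RawBodyType.InputStreamBody =>
      streamingContent(v)                                          // fs2-backed reactive publisher
    case other =>
      nonStreamingDelegate.fromRawValue(v, headers, format, other) // identical to the non-streaming impl
  }
```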
Looks good :)
There's one more thing I need to add here, I'm working on it now. Since we're no longer using
@kciesielski this also might be part of another task (or sub-task) here: #2971 - I don't think there are any tests for this, for example
@adamw I just wrote one, for the regular byte buf body. But for the streaming, I was about to suggest the same - to make it another task :)
Awesome! :) I'd still add a task to polish the max content length, though - maybe we can somehow reuse what the user provides in the exception handler, to create the response? Or create another interceptor? Also, we should make sure that if a limit exists, we never read into memory more than the limit specifies.
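On the last point ("never read into memory more than the limit"), a small self-contained sketch of the idea with fs2 (this is not the PR's code; the names and error type are made up):

```scala
import cats.effect.IO
import fs2.Stream

// Pull at most limit + 1 bytes; if the extra byte shows up, fail instead of buffering more.
def readAtMost(body: Stream[IO, Byte], limit: Long): IO[Array[Byte]] =
  body.take(limit + 1).compile.to(Array).flatMap { bytes =>
    if (bytes.length > limit) IO.raiseError(new RuntimeException(s"Request body exceeds $limit bytes"))
    else IO.pure(bytes)
  }
```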
This PR enhances the Netty Cats Effect server with streaming.
From now on, the public API is:
Implementation notes:
- `HttpStreamsServerHandler` from netty-reactive-streams-http. Adding it to the pipeline allows using a response of type `DefaultStreamedHttpResponse`, which wraps a reactive `Publisher[HttpContent]`.
- This handler doesn't work together with `HttpObjectAggregator`, so I had to split the pipeline into streaming/non-streaming variants.
- Without `HttpObjectAggregator` we can no longer send `ChunkedInput`, but this isn't a problem, we can just send files as streams as well.
- Without `HttpObjectAggregator` we need to handle requests which have a body as `StreamedHttpRequest`, which has a `.subscribe` method. Using fs2, I create streams which read data from this `.subscribe` interface and materialize it into byte arrays as raw values. This may introduce performance overhead.
- Requests without a body are passed by `HttpStreamsServerHandler` as instances of a regular `FullHttpRequest`.
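To illustrate the fourth note above, a rough, self-contained sketch of turning a reactive publisher into an fs2 stream and materializing it as a byte array (assuming fs2 3.x reactive-streams interop, and simplified to `Publisher[Array[Byte]]` rather than Netty's `HttpContent`):

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import fs2.interop.reactivestreams._
import org.reactivestreams.Publisher

// The streamed request body arrives as a Publisher; the interop turns it into a Stream,
// which is then compiled into a single byte array raw value (this buffers the whole body).
def materializeBody(publisher: Publisher[Array[Byte]]): IO[Array[Byte]] =
  publisher
    .toStreamBuffered[IO](2)                         // reactive-streams -> fs2, small buffer
    .flatMap(arr => Stream.chunk(Chunk.array(arr)))  // flatten each emitted array into bytes
    .compile
    .to(Array)
```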