Skip to content

Commit

Permalink
Handle the incoming LastHttpContent with no content as if it is EMPTY…
Browse files Browse the repository at this point in the history
…_LAST_CONTENT

When the incoming LastHttpContent is EMPTY_LAST_CONTENT, Reactor Netty directly completes the inbound.
Handle the incoming LastHttpContent with no content in the same way instead of buffering it in FluxReceive.

Adapt the tests - instead of counting the received content on the server, directly count the outbound data on the client.
  • Loading branch information
violetagg committed Dec 3, 2024
1 parent 35eca3a commit 3de5b33
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,13 @@ else if (msg == EMPTY_LAST_CONTENT) {
handleLastHttpContent();
}
else if (msgClass == DefaultLastHttpContent.class) {
super.onInboundNext(ctx, msg);
DefaultLastHttpContent lastHttpContent = (DefaultLastHttpContent) msg;
if (lastHttpContent.content().readableBytes() > 0) {
super.onInboundNext(ctx, msg);
}
else {
lastHttpContent.release();
}
handleLastHttpContent();
}
else if (msgClass == DefaultHttpContent.class) {
Expand Down Expand Up @@ -831,7 +837,13 @@ else if (msg instanceof HttpRequest) {
}
}
else if (msg instanceof LastHttpContent) {
super.onInboundNext(ctx, msg);
LastHttpContent lastHttpContent = (LastHttpContent) msg;
if (lastHttpContent.content().readableBytes() > 0) {
super.onInboundNext(ctx, msg);
}
else {
lastHttpContent.release();
}
handleLastHttpContent();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
Expand Down Expand Up @@ -375,25 +380,42 @@ void testMonoRequestBodySentAsFullRequest_Mono() {
doTestMonoRequestBodySentAsFullRequest(ByteBufMono.fromString(Mono.just("test")), 1);
}

@SuppressWarnings("FutureReturnValueIgnored")
private void doTestMonoRequestBodySentAsFullRequest(Publisher<? extends ByteBuf> body, int expectedMsg) {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

AtomicInteger counter = new AtomicInteger();
disposableServer =
createServer()
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(serverCtx))
.handle((req, res) -> req.receiveContent()
.doOnNext(httpContent -> counter.getAndIncrement())
.handle((req, res) -> req.receive()
.then(res.send()))
.bindNow(Duration.ofSeconds(30));

AtomicInteger counter = new AtomicInteger();
createClient(disposableServer.port())
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(clientCtx))
.doOnRequest((req, conn) -> {
ChannelPipeline pipeline = conn.channel().parent().pipeline();
ChannelHandlerContext ctx = pipeline.context(Http2FrameCodec.class);
if (ctx != null) {
pipeline.addAfter(ctx.name(), "testMonoRequestBodySentAsFullRequest",
new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Http2DataFrame) {
counter.getAndIncrement();
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
});
}
})
.post()
.uri("/")
.send(body)
Expand Down

0 comments on commit 3de5b33

Please sign in to comment.