From 5d3e23a65c8a248d9b8dd3f6c899407fb6fc6932 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 12 Jun 2018 11:02:43 -0600 Subject: [PATCH 1/2] Modify pipeling handlers to require full requests Currently the http pipelining handlers seem to support chunked http content. However, this does not make sense. There is a content aggregator in the pipeline before the pipelining handler. This means the pipelining handler should only see full http messages. Additionally, the request handler immediately after the pipelining handler only supports full messages. This commit modifies both nio and netty4 pipelining handlers to assert that an inbound message is a full http message. Additionally it removes the tests for chunked content. --- .../netty4/Netty4HttpPipeliningHandler.java | 11 +++--- .../Netty4HttpPipeliningHandlerTests.java | 32 ----------------- .../http/nio/NioHttpPipeliningHandler.java | 13 +++---- .../nio/NioHttpPipeliningHandlerTests.java | 34 ------------------- 4 files changed, 8 insertions(+), 82 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index f99bccdaf6178..10f0e967949a8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -22,7 +22,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.http.HttpPipelinedRequest; @@ -53,12 +53,9 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - if (msg instanceof LastHttpContent) { - HttpPipelinedRequest pipelinedRequest = aggregator.read(((LastHttpContent) msg)); - ctx.fireChannelRead(pipelinedRequest); - } else { - ctx.fireChannelRead(msg); - } + assert msg instanceof FullHttpRequest : "Message must be type: " + FullHttpRequest.class; + HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); + ctx.fireChannelRead(pipelinedRequest); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index 606b050f4e393..8bf5aebe0d1a5 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -148,38 +148,6 @@ public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws Int assertTrue(embeddedChannel.isOpen()); } - public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = - new EmbeddedChannel( - new AggregateUrisAndHeadersHandler(), - new Netty4HttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < numberOfRequests; i++) { - final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i); - embeddedChannel.writeInbound(request); - embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT); - } - - final List latches = new ArrayList<>(); - for (int i = numberOfRequests - 1; i >= 0; i--) { - latches.add(finishRequest(Integer.toString(i))); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - embeddedChannel.flush(); - - for (int i = 0; i < numberOfRequests; i++) { - assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i)); - } - - assertTrue(embeddedChannel.isOpen()); - } - public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException { final int numberOfRequests = randomIntBetween(2, 128); final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests), diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java index 2b702042ba7a8..553a9a0c0ee2d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java @@ -22,13 +22,11 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.http.HttpPipeliningAggregator; -import org.elasticsearch.http.nio.NettyListener; -import org.elasticsearch.http.nio.NioHttpResponse; import java.nio.channels.ClosedChannelException; import java.util.List; @@ -55,12 +53,9 @@ public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - if (msg instanceof LastHttpContent) { - HttpPipelinedRequest pipelinedRequest = aggregator.read(((LastHttpContent) msg)); - ctx.fireChannelRead(pipelinedRequest); - } else { - ctx.fireChannelRead(msg); - } + assert msg instanceof FullHttpRequest : "Message must be type: " + FullHttpRequest.class; + HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); + ctx.fireChannelRead(pipelinedRequest); } @Override diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java index b156ef027c086..6773c1b8d0b9c 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java @@ -28,12 +28,10 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.elasticsearch.common.Randomness; @@ -147,38 +145,6 @@ public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws Int assertTrue(embeddedChannel.isOpen()); } - public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = - new EmbeddedChannel( - new AggregateUrisAndHeadersHandler(), - new NioHttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < numberOfRequests; i++) { - final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i); - embeddedChannel.writeInbound(request); - embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT); - } - - final List latches = new ArrayList<>(); - for (int i = numberOfRequests - 1; i >= 0; i--) { - latches.add(finishRequest(Integer.toString(i))); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - embeddedChannel.flush(); - - for (int i = 0; i < numberOfRequests; i++) { - assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i)); - } - - assertTrue(embeddedChannel.isOpen()); - } - public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException { final int numberOfRequests = randomIntBetween(2, 128); final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests), From 0eda28ab2bb8831cff8c85f639273c807c7fd433 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 12 Jun 2018 20:58:12 -0600 Subject: [PATCH 2/2] Change assertion messages --- .../http/netty4/Netty4HttpPipeliningHandler.java | 4 ++-- .../org/elasticsearch/http/nio/NioHttpPipeliningHandler.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 10f0e967949a8..12c2e9a685778 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -53,14 +53,14 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - assert msg instanceof FullHttpRequest : "Message must be type: " + FullHttpRequest.class; + assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); ctx.fireChannelRead(pipelinedRequest); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class; + assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();; Netty4HttpResponse response = (Netty4HttpResponse) msg; boolean success = false; try { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java index 553a9a0c0ee2d..1eb63364f995a 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java @@ -53,14 +53,14 @@ public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - assert msg instanceof FullHttpRequest : "Message must be type: " + FullHttpRequest.class; + assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); ctx.fireChannelRead(pipelinedRequest); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof NioHttpResponse : "Message must be type: " + NioHttpResponse.class; + assert msg instanceof NioHttpResponse : "Invalid message type: " + msg.getClass(); NioHttpResponse response = (NioHttpResponse) msg; boolean success = false; try {