Skip to content

Commit

Permalink
Revert "Make http pipelining support mandatory (#30695)" (#30813)
Browse files Browse the repository at this point in the history
This reverts commit 31251c9 introduced in #30695.

We suspect this commit is causing the OOME's reported in #30811 and we will use this PR to test this assertion.
  • Loading branch information
colings86 authored and Tim-Brooks committed May 23, 2018
1 parent ab047ca commit 4fd0a3e
Show file tree
Hide file tree
Showing 38 changed files with 669 additions and 1,002 deletions.
10 changes: 1 addition & 9 deletions docs/reference/migration/migrate_7_0/settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@
[[remove-http-enabled]]
==== Http enabled setting removed

* The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
use of the transport client. This setting has been removed, as the transport client
will be removed in the future, thus requiring HTTP to always be enabled.

[[remove-http-pipelining-setting]]
==== Http pipelining setting removed

* The setting `http.pipelining` previously allowed disabling HTTP pipelining support.
This setting has been removed, as disabling http pipelining support on the server
provided little value. The setting `http.pipelining.max_events` can still be used to
limit the number of pipelined requests in-flight.
2 changes: 2 additions & 0 deletions docs/reference/modules/http.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ and stack traces in response output. Note: When set to `false` and the `error_tr
parameter is specified, an error will be returned; when `error_trace` is not specified, a
simple message will be returned. Defaults to `true`

|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.

|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.

|`http.max_warning_header_count` |The maximum number of warning headers in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -58,24 +59,29 @@ final class Netty4HttpChannel extends AbstractRestChannel {
private final Netty4HttpServerTransport transport;
private final Channel channel;
private final FullHttpRequest nettyRequest;
private final int sequence;
private final HttpPipelinedRequest pipelinedRequest;
private final ThreadContext threadContext;
private final HttpHandlingSettings handlingSettings;

/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param sequence The pipelining sequence number for this request
* @param handlingSettings true if error messages should include stack traces.
* @param threadContext the thread context for the channel
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param pipelinedRequest If HTTP pipelining is enabled provide the corresponding pipelined request. May be null if
* HTTP pipelining is disabled.
* @param handlingSettings true iff error messages should include stack traces.
* @param threadContext the thread context for the channel
*/
Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
Netty4HttpChannel(
final Netty4HttpServerTransport transport,
final Netty4HttpRequest request,
final HttpPipelinedRequest pipelinedRequest,
final HttpHandlingSettings handlingSettings,
final ThreadContext threadContext) {
super(request, handlingSettings.getDetailedErrorsEnabled());
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
this.sequence = sequence;
this.pipelinedRequest = pipelinedRequest;
this.threadContext = threadContext;
this.handlingSettings = handlingSettings;
}
Expand Down Expand Up @@ -123,7 +129,7 @@ public void sendResponse(RestResponse response) {
final ChannelPromise promise = channel.newPromise();

if (releaseContent) {
promise.addListener(f -> ((Releasable) content).close());
promise.addListener(f -> ((Releasable)content).close());
}

if (releaseBytesStreamOutput) {
Expand All @@ -134,9 +140,13 @@ public void sendResponse(RestResponse response) {
promise.addListener(ChannelFutureListener.CLOSE);
}

Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);

channel.writeAndFlush(newResponse, promise);
final Object msg;
if (pipelinedRequest != null) {
msg = pipelinedRequest.createHttpResponse(resp, promise);
} else {
msg = resp;
}
channel.writeAndFlush(msg, promise);
releaseContent = false;
releaseBytesStreamOutput = false;
} finally {
Expand All @@ -146,6 +156,9 @@ public void sendResponse(RestResponse response) {
if (releaseBytesStreamOutput) {
bytesOutputOrNull().close();
}
if (pipelinedRequest != null) {
pipelinedRequest.release();
}
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,41 @@
import io.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collections;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {

private final Netty4HttpServerTransport serverTransport;
private final HttpHandlingSettings handlingSettings;
private final boolean httpPipeliningEnabled;
private final ThreadContext threadContext;

Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) {
this.serverTransport = serverTransport;
this.httpPipeliningEnabled = serverTransport.pipelining;
this.handlingSettings = handlingSettings;
this.threadContext = threadContext;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
final FullHttpRequest request = msg.getRequest();
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final FullHttpRequest request;
final HttpPipelinedRequest pipelinedRequest;
if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) {
pipelinedRequest = (HttpPipelinedRequest) msg;
request = (FullHttpRequest) pipelinedRequest.last();
} else {
pipelinedRequest = null;
request = (FullHttpRequest) msg;
}

boolean success = false;
try {

final FullHttpRequest copy =
Expand Down Expand Up @@ -100,7 +111,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
Netty4HttpChannel innerChannel;
try {
innerChannel =
new Netty4HttpChannel(serverTransport, httpRequest, msg.getSequence(), handlingSettings, threadContext);
new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, handlingSettings, threadContext);
} catch (final IllegalArgumentException e) {
if (badRequestCause == null) {
badRequestCause = e;
Expand All @@ -115,7 +126,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
copy,
ctx.channel());
innerChannel =
new Netty4HttpChannel(serverTransport, innerRequest, msg.getSequence(), handlingSettings, threadContext);
new Netty4HttpChannel(serverTransport, innerRequest, pipelinedRequest, handlingSettings, threadContext);
}
channel = innerChannel;
}
Expand All @@ -127,9 +138,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
} else {
serverTransport.dispatchRequest(httpRequest, channel);
}
success = true;
} finally {
// As we have copied the buffer, we can release the request
request.release();
// the request is otherwise released in case of dispatch
if (success == false && pipelinedRequest != null) {
pipelinedRequest.release();
}
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.http.netty4.cors.Netty4CorsHandler.ANY_ORIGIN;

Expand Down Expand Up @@ -160,6 +162,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

protected final int workerCount;

protected final boolean pipelining;

protected final int pipeliningMaxEvents;

/**
Expand Down Expand Up @@ -200,16 +204,14 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()),
Math.toIntExact(maxChunkSize.getBytes()),
Math.toIntExact(maxHeaderSize.getBytes()),
Math.toIntExact(maxInitialLineLength.getBytes()),
SETTING_HTTP_RESET_COOKIES.get(settings),
SETTING_HTTP_COMPRESSION.get(settings),
SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
pipeliningMaxEvents);
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings));

this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
Expand All @@ -224,12 +226,14 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());

this.pipelining = SETTING_PIPELINING.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.corsConfig = buildCorsConfig(settings);

logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, receivePredictor, maxCompositeBufferComponents,
pipeliningMaxEvents);
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictor, maxCompositeBufferComponents,
pipelining, pipeliningMaxEvents);
}

public Settings settings() {
Expand Down Expand Up @@ -448,7 +452,9 @@ protected void initChannel(Channel ch) throws Exception {
if (SETTING_CORS_ENABLED.get(transport.settings())) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
}
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
if (transport.pipelining) {
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
}
ch.pipeline().addLast("handler", requestHandler);
}

Expand Down
Loading

0 comments on commit 4fd0a3e

Please sign in to comment.