Skip to content

Commit

Permalink
Transfer network bytes to smaller buffer (#62673)
Browse files Browse the repository at this point in the history
Currently we read in 64KB blocks from the network. When TLS is not
enabled, these bytes are normally passed all the way to the application
layer (some exceptions: compression). For the HTTP layer this means that
these bytes can live throughout the entire lifecycle of an indexing
request.

The problem is that if the reads from the socket are small, this means
that 64KB buffers can be consumed by 1KB or smaller reads. If the socket
buffer or TCP buffer sizes are small, the leads to massive memory
waste. It has been identified as a major source of OOMs on coordinating
nodes as Elasticsearch easily exhausts the heap for these network bytes.

This commit resolves the problem by placing a handler after the TLS
handler to copy these bytes to a more appropriate buffer size as
necessary. This comes after TLS, because TLS is a framing layer which
often resolves this problem for us (the 64KB buffer will be decoded
into a more appropriate buffer size). However, this extra handler will
solve it for the non-TLS pipelines.
  • Loading branch information
Tim-Brooks authored Sep 30, 2020
1 parent 56d7f91 commit 1547bd6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.NettyByteBufSizer;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.netty4.Netty4Utils;

Expand Down Expand Up @@ -283,6 +284,7 @@ public ChannelHandler configureServerChannelHandler() {
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

private final Netty4HttpServerTransport transport;
private final NettyByteBufSizer byteBufSizer;
private final Netty4HttpRequestCreator requestCreator;
private final Netty4HttpRequestHandler requestHandler;
private final Netty4HttpResponseCreator responseCreator;
Expand All @@ -291,6 +293,7 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.byteBufSizer = new NettyByteBufSizer();
this.requestCreator = new Netty4HttpRequestCreator();
this.requestHandler = new Netty4HttpRequestHandler(transport);
this.responseCreator = new Netty4HttpResponseCreator();
Expand All @@ -300,6 +303,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

@ChannelHandler.Sharable
public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
int readableBytes = buf.readableBytes();
if (buf.capacity() >= 1024) {
ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
assert resized.readableBytes() == readableBytes;
out.add(resized.retain());
} else {
out.add(buf.retain());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Netty4NioSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.NettyByteBufSizer;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
Expand Down Expand Up @@ -326,6 +327,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {

protected final String name;
private final NettyByteBufSizer sizer = new NettyByteBufSizer();

protected ServerChannelInitializer(String name) {
this.name = name;
Expand All @@ -338,6 +340,7 @@ protected void initChannel(Channel ch) throws Exception {
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("byte_buf_sizer", sizer);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
Expand Down

0 comments on commit 1547bd6

Please sign in to comment.