From 7f6d1981a19aeb1b1b2dbd793ffaf51fdac69c7a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 30 Sep 2020 11:31:54 -0600 Subject: [PATCH] Transfer network bytes to smaller buffer (#62673) Currently we read in 64KB blocks from the network. When TLS is not enabled, these bytes are normally passed all the way to the application layer (some exceptions: compression). For the HTTP layer this means that these bytes can live throughout the entire lifecycle of an indexing request. The problem is that if the reads from the socket are small, this means that 64KB buffers can be consumed by 1KB or smaller reads. If the socket buffer or TCP buffer sizes are small, the leads to massive memory waste. It has been identified as a major source of OOMs on coordinating nodes as Elasticsearch easily exhausts the heap for these network bytes. This commit resolves the problem by placing a handler after the TLS handler to copy these bytes to a more appropriate buffer size as necessary. This comes after TLS, because TLS is a framing layer which often resolves this problem for us (the 64KB buffer will be decoded into a more appropriate buffer size). However, this extra handler will solve it for the non-TLS pipelines. --- .../netty4/Netty4HttpServerTransport.java | 4 ++ .../transport/NettyByteBufSizer.java | 43 +++++++++++++++++++ .../transport/netty4/Netty4Transport.java | 3 ++ 3 files changed, 50 insertions(+) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyByteBufSizer.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 6f32b6cb055ca..b0010a31375c8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -61,6 +61,7 @@ import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NettyAllocator; +import org.elasticsearch.transport.NettyByteBufSizer; import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -283,6 +284,7 @@ public ChannelHandler configureServerChannelHandler() { protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; + private final NettyByteBufSizer byteBufSizer; private final Netty4HttpRequestCreator requestCreator; private final Netty4HttpRequestHandler requestHandler; private final Netty4HttpResponseCreator responseCreator; @@ -291,6 +293,7 @@ protected static class HttpChannelHandler extends ChannelInitializer { protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) { this.transport = transport; this.handlingSettings = handlingSettings; + this.byteBufSizer = new NettyByteBufSizer(); this.requestCreator = new Netty4HttpRequestCreator(); this.requestHandler = new Netty4HttpRequestHandler(transport); this.responseCreator = new Netty4HttpResponseCreator(); @@ -300,6 +303,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht protected void initChannel(Channel ch) throws Exception { Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch); ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel); + ch.pipeline().addLast("byte_buf_sizer", byteBufSizer); ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)); final HttpRequestDecoder decoder = new HttpRequestDecoder( handlingSettings.getMaxInitialLineLength(), diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyByteBufSizer.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyByteBufSizer.java new file mode 100644 index 0000000000000..e2eadf5690be2 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyByteBufSizer.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.util.List; + +@ChannelHandler.Sharable +public class NettyByteBufSizer extends MessageToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) { + int readableBytes = buf.readableBytes(); + if (buf.capacity() >= 1024) { + ByteBuf resized = buf.discardReadBytes().capacity(readableBytes); + assert resized.readableBytes() == readableBytes; + out.add(resized.retain()); + } else { + out.add(buf.retain()); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index b66f68998ca2b..24ead83609415 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -54,6 +54,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Netty4NioSocketChannel; import org.elasticsearch.transport.NettyAllocator; +import org.elasticsearch.transport.NettyByteBufSizer; import org.elasticsearch.transport.SharedGroupFactory; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; @@ -326,6 +327,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E protected class ServerChannelInitializer extends ChannelInitializer { protected final String name; + private final NettyByteBufSizer sizer = new NettyByteBufSizer(); protected ServerChannelInitializer(String name) { this.name = name; @@ -338,6 +340,7 @@ protected void initChannel(Channel ch) throws Exception { NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture()); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); + ch.pipeline().addLast("byte_buf_sizer", sizer); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this)); serverAcceptedChannel(nettyTcpChannel);