From fa100e06cfdf5a3bd046261b2739cc48b94c9064 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Oct 2019 16:00:51 -0600 Subject: [PATCH 01/11] Remove option to enable direct buffer pooling This commit removes the option to change the netty system properties to reenable the direct buffer pooling. It also removes the need for us to disable the buffer pooling in the system properties file. Instead, we programmatically craete an allocator that is used by our networking layer. This commit does introduce an Elasticsearch property which allows the user to fallback on the netty default allocator. If they choose this option, they can configure the default allocator how they wish using the standard netty properties. --- .../netty4/Netty4HttpServerTransport.java | 19 +- .../transport/NettyAllocator.java | 163 ++++++++++++++++++ .../transport/netty4/Netty4Transport.java | 30 ++-- .../elasticsearch/ESNetty4IntegTestCase.java | 12 +- .../netty4/SimpleNetty4TransportTests.java | 9 +- 5 files changed, 201 insertions(+), 32 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 6c1579bc28362..ee32075ef20c5 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -20,7 +20,6 @@ package org.elasticsearch.http.netty4; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; @@ -32,7 +31,6 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; -import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -64,6 +62,7 @@ import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CopyBytesServerSocketChannel; +import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.netty4.Netty4Utils; import java.net.InetSocketAddress; @@ -186,14 +185,14 @@ protected void doStart() { serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); - // If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child - // channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer - // per-event-loop thread to be used for IO operations. - if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) { - serverBootstrap.channel(NioServerSocketChannel.class); - } else { - serverBootstrap.channel(CopyBytesServerSocketChannel.class); - } + // CopyBytesServerSocketChannel which will create child channels of type CopyBytesSocketChannel. + // CopyBytesSocketChannel pool a single direct buffer per-event-loop thread to be used for IO + // operations. + serverBootstrap.channel(CopyBytesServerSocketChannel.class); + + // Set the allocators for both the server channel and the child channels created + serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); serverBootstrap.childHandler(configureServerChannelHandler()); serverBootstrap.handler(new ServerChannelExceptionHandler(this)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java new file mode 100644 index 0000000000000..5c8e4acee26c6 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import org.elasticsearch.common.Booleans; + +public class NettyAllocator { + + public static final ByteBufAllocator ALLOCATOR; + + static { + ByteBufAllocator delegate; + if (Booleans.parseBoolean(System.getProperty("es.unsafe.use_netty_default_allocator"), false)) { + delegate = ByteBufAllocator.DEFAULT; + } else { + if ("unpooled".equals(System.getProperty("io.netty.allocator.type"))) { + delegate = UnpooledByteBufAllocator.DEFAULT; + } else { + int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena(); + int pageSize = PooledByteBufAllocator.defaultPageSize(); + int maxOrder = PooledByteBufAllocator.defaultMaxOrder(); + int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize(); + int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize(); + int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize(); + boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads(); + delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize, + smallCacheSize, normalCacheSize, useCacheForAllThreads); + } + } + ALLOCATOR = new NoDirectBuffers(delegate); + } + + private static class NoDirectBuffers implements ByteBufAllocator { + + private final ByteBufAllocator delegate; + + private NoDirectBuffers(ByteBufAllocator delegate) { + this.delegate = delegate; + } + + @Override + public ByteBuf buffer() { + return heapBuffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return heapBuffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return heapBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return heapBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + return delegate.heapBuffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + return delegate.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + return delegate.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() { + throw new UnsupportedOperationException("Direct buffers not supported."); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + throw new UnsupportedOperationException("Direct buffers not supported."); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Direct buffers not supported."); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return delegate.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return delegate.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + throw new UnsupportedOperationException("Direct buffers not supported."); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + throw new UnsupportedOperationException("Direct buffers not supported."); + } + + @Override + public boolean isDirectBufferPooled() { + return delegate.isDirectBufferPooled(); + } + + @Override + public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { + return delegate.calculateNewCapacity(minNewCapacity, maxCapacity); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d3e43e16dd5f4..9061a6d69fd5a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -21,7 +21,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -34,8 +33,6 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import org.apache.logging.log4j.LogManager; @@ -61,6 +58,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.CopyBytesServerSocketChannel; import org.elasticsearch.transport.CopyBytesSocketChannel; +import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; @@ -151,14 +149,8 @@ protected void doStart() { private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); - - // If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single - // direct buffer per-event-loop thread which will be used for IO operations. - if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) { - bootstrap.channel(NioSocketChannel.class); - } else { - bootstrap.channel(CopyBytesSocketChannel.class); - } + bootstrap.channel(CopyBytesSocketChannel.class); + bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings)); @@ -216,14 +208,14 @@ private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoop serverBootstrap.group(eventLoopGroup); - // If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child - // channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer - // per-event-loop thread to be used for IO operations. - if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) { - serverBootstrap.channel(NioServerSocketChannel.class); - } else { - serverBootstrap.channel(CopyBytesServerSocketChannel.class); - } + // CopyBytesServerSocketChannel which will create child channels of type CopyBytesSocketChannel. + // CopyBytesSocketChannel pool a single direct buffer per-event-loop thread to be used for IO + // operations. + serverBootstrap.channel(CopyBytesServerSocketChannel.class); + + // Set the allocators for both the server channel and the child channels created + serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); serverBootstrap.childHandler(getServerChannelInitializer(name)); serverBootstrap.handler(new ServerChannelExceptionHandler()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java b/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java index 9d8baf9e3f871..f973029aa4c1f 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java @@ -18,6 +18,7 @@ */ package org.elasticsearch; +import io.netty.buffer.PooledByteBufAllocator; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -25,8 +26,8 @@ import org.elasticsearch.transport.Netty4Plugin; import org.elasticsearch.transport.netty4.Netty4Transport; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; public abstract class ESNetty4IntegTestCase extends ESIntegTestCase { @@ -54,6 +55,13 @@ protected Settings nodeSettings(int nodeOrdinal) { @Override protected Collection> nodePlugins() { - return Arrays.asList(Netty4Plugin.class); + return Collections.singletonList(Netty4Plugin.class); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory()); + assertEquals(-1, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 0b210995795bf..d4eba6ed17e19 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty4; +import io.netty.buffer.PooledByteBufAllocator; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -45,6 +46,13 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase { + @Override + public void tearDown() throws Exception { + super.tearDown(); + assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory()); + assertEquals(-1, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); + } + @Override protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); @@ -73,5 +81,4 @@ public void testConnectException() throws UnknownHostException { assertThat(e.getMessage(), containsString("[127.0.0.1:9876]")); } } - } From 023e11f59584794d3865705c3f2865ad5b1990d4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Oct 2019 16:06:53 -0600 Subject: [PATCH 02/11] Changes --- .../src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy | 1 - distribution/src/config/jvm.options | 1 - 2 files changed, 2 deletions(-) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy index c5335648e65af..aec2172433f63 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy @@ -890,7 +890,6 @@ class BuildPlugin implements Plugin { test.systemProperty('io.netty.noUnsafe', 'true') test.systemProperty('io.netty.noKeySetOptimization', 'true') test.systemProperty('io.netty.recycler.maxCapacityPerThread', '0') - test.systemProperty('io.netty.allocator.numDirectArenas', '0') test.testLogging { TestLoggingContainer logging -> logging.showExceptions = true diff --git a/distribution/src/config/jvm.options b/distribution/src/config/jvm.options index 1c329d39e1465..528589af3eb12 100644 --- a/distribution/src/config/jvm.options +++ b/distribution/src/config/jvm.options @@ -80,7 +80,6 @@ -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 --Dio.netty.allocator.numDirectArenas=0 # log4j 2 -Dlog4j.shutdownHookEnabled=false From 6c241cc454692f2c8e15ed7844798ed347b55716 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 Oct 2019 16:26:11 -0600 Subject: [PATCH 03/11] Fix tests --- .../java/org/elasticsearch/ESNetty4IntegTestCase.java | 2 +- .../org/elasticsearch/http/netty4/Netty4HttpClient.java | 9 +++++++-- .../transport/netty4/SimpleNetty4TransportTests.java | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java b/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java index f973029aa4c1f..d4a6706153a9b 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/ESNetty4IntegTestCase.java @@ -62,6 +62,6 @@ protected Collection> nodePlugins() { public void tearDown() throws Exception { super.tearDown(); assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory()); - assertEquals(-1, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); + assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index a595de3a47ed9..b9e2e7e58007c 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -25,10 +25,10 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -45,6 +45,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.CopyBytesSocketChannel; +import org.elasticsearch.transport.NettyAllocator; import java.io.Closeable; import java.net.SocketAddress; @@ -84,7 +86,10 @@ static Collection returnOpaqueIds(Collection responses private final Bootstrap clientBootstrap; Netty4HttpClient() { - clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).group(new NioEventLoopGroup()); + clientBootstrap = new Bootstrap() + .channel(CopyBytesSocketChannel.class) + .option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR) + .group(new NioEventLoopGroup()); } public Collection get(SocketAddress remoteAddress, String... uris) throws InterruptedException { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index d4eba6ed17e19..884cf46a1688f 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -50,7 +50,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase public void tearDown() throws Exception { super.tearDown(); assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory()); - assertEquals(-1, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); + assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()); } @Override From aae53c65b8d64336c17e0f7dcc71e21eac8b4a22 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 14 Oct 2019 09:56:28 -0600 Subject: [PATCH 04/11] Cleanup --- .../java/org/elasticsearch/transport/NettyAllocator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index 5c8e4acee26c6..3fc7506dbe9c1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -31,10 +31,10 @@ public class NettyAllocator { public static final ByteBufAllocator ALLOCATOR; static { - ByteBufAllocator delegate; if (Booleans.parseBoolean(System.getProperty("es.unsafe.use_netty_default_allocator"), false)) { - delegate = ByteBufAllocator.DEFAULT; + ALLOCATOR = ByteBufAllocator.DEFAULT; } else { + ByteBufAllocator delegate; if ("unpooled".equals(System.getProperty("io.netty.allocator.type"))) { delegate = UnpooledByteBufAllocator.DEFAULT; } else { @@ -48,8 +48,8 @@ public class NettyAllocator { delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize, smallCacheSize, normalCacheSize, useCacheForAllThreads); } + ALLOCATOR = new NoDirectBuffers(delegate); } - ALLOCATOR = new NoDirectBuffers(delegate); } private static class NoDirectBuffers implements ByteBufAllocator { From 75f7ce167e072da882b332f24c1beb61bf247757 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 14 Oct 2019 13:49:27 -0600 Subject: [PATCH 05/11] Test thing --- .../java/org/elasticsearch/transport/NettyAllocator.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index 3fc7506dbe9c1..98ca6de48c36a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -107,17 +107,19 @@ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { @Override public ByteBuf directBuffer() { - throw new UnsupportedOperationException("Direct buffers not supported."); + // TODO: Remove + return heapBuffer(); } @Override public ByteBuf directBuffer(int initialCapacity) { - throw new UnsupportedOperationException("Direct buffers not supported."); + // TODO: Remove + return heapBuffer(initialCapacity); } @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { - throw new UnsupportedOperationException("Direct buffers not supported."); + return heapBuffer(initialCapacity, maxCapacity); } @Override From 71209def9cdc273114489c3e762d87f19d085712 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 14 Oct 2019 17:28:33 -0600 Subject: [PATCH 06/11] Expand comments --- .../org/elasticsearch/transport/NettyAllocator.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index 98ca6de48c36a..aac8fad379ec7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -107,18 +107,25 @@ public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { @Override public ByteBuf directBuffer() { - // TODO: Remove + // TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the + // JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap + // ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException return heapBuffer(); } @Override public ByteBuf directBuffer(int initialCapacity) { - // TODO: Remove + // TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the + // JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap + // ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException return heapBuffer(initialCapacity); } @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + // TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the + // JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap + // ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException return heapBuffer(initialCapacity, maxCapacity); } From b0d6ec9b08193e95e2c083a23f5e8f1b3b0270f5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Oct 2019 09:00:57 -0600 Subject: [PATCH 07/11] Changes --- .../org/elasticsearch/tools/launchers/JvmErgonomics.java | 6 +++--- modules/transport-netty4/build.gradle | 4 ++-- .../java/org/elasticsearch/transport/NettyAllocator.java | 7 +++++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java b/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java index d0d5bef9cfcf4..fe9bddecb4809 100644 --- a/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java +++ b/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java @@ -56,11 +56,11 @@ static List choose(final List userDefinedJvmOptions) throws Inte final Map> finalJvmOptions = finalJvmOptions(userDefinedJvmOptions); final long heapSize = extractHeapSize(finalJvmOptions); final Map systemProperties = extractSystemProperties(userDefinedJvmOptions); - if (systemProperties.containsKey("io.netty.allocator.type") == false) { + if (systemProperties.containsKey("es.unsafe.use_unpooled_allocator") == false) { if (heapSize <= 1 << 30) { - ergonomicChoices.add("-Dio.netty.allocator.type=unpooled"); + ergonomicChoices.add("-Des.unsafe.use_unpooled_allocator=true"); } else { - ergonomicChoices.add("-Dio.netty.allocator.type=pooled"); + ergonomicChoices.add("-Des.unsafe.use_unpooled_allocator=false"); } } final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions); diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index 62e2d6aa2bf86..6c7d59fc60e8d 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -66,7 +66,7 @@ integTestRunner { TaskProvider pooledTest = tasks.register("pooledTest", Test) { include '**/*Tests.class' systemProperty 'es.set.netty.runtime.available.processors', 'false' - systemProperty 'io.netty.allocator.type', 'pooled' + systemProperty 'es.unsafe.use_unpooled_allocator', 'false' } // TODO: we can't use task avoidance here because RestIntegTestTask does the testcluster creation RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTestTask) { @@ -75,7 +75,7 @@ RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTes } } testClusters.pooledIntegTest { - systemProperty 'io.netty.allocator.type', 'pooled' + systemProperty 'es.unsafe.use_unpooled_allocator', 'false' } check.dependsOn(pooledTest, pooledIntegTest) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index aac8fad379ec7..4c768acc47906 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -30,12 +30,15 @@ public class NettyAllocator { public static final ByteBufAllocator ALLOCATOR; + private static final String USE_UNPOOLED = "es.unsafe.use_unpooled_allocator"; + private static final String USE_NETTY_DEFAULT = "es.unsafe.use_netty_default_allocator"; + static { - if (Booleans.parseBoolean(System.getProperty("es.unsafe.use_netty_default_allocator"), false)) { + if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) { ALLOCATOR = ByteBufAllocator.DEFAULT; } else { ByteBufAllocator delegate; - if ("unpooled".equals(System.getProperty("io.netty.allocator.type"))) { + if (Booleans.parseBoolean(System.getProperty(USE_UNPOOLED), false)) { delegate = UnpooledByteBufAllocator.DEFAULT; } else { int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena(); From 86c5bbaf16ef64f54447eb41a1b880cf408d5562 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Oct 2019 11:44:40 -0600 Subject: [PATCH 08/11] Fix test --- .../org/elasticsearch/tools/launchers/JvmErgonomicsTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java b/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java index 7fe5cd0cf98b0..77eb0b697ff29 100644 --- a/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java +++ b/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java @@ -121,14 +121,14 @@ public void testPooledMemoryChoiceOnSmallHeap() throws InterruptedException, IOE final String smallHeap = randomFrom(Arrays.asList("64M", "512M", "1024M", "1G")); assertThat( JvmErgonomics.choose(Arrays.asList("-Xms" + smallHeap, "-Xmx" + smallHeap)), - hasItem("-Dio.netty.allocator.type=unpooled")); + hasItem("-Des.unsafe.use_unpooled_allocator=true")); } public void testPooledMemoryChoiceOnNotSmallHeap() throws InterruptedException, IOException { final String largeHeap = randomFrom(Arrays.asList("1025M", "2048M", "2G", "8G")); assertThat( JvmErgonomics.choose(Arrays.asList("-Xms" + largeHeap, "-Xmx" + largeHeap)), - hasItem("-Dio.netty.allocator.type=pooled")); + hasItem("-Des.unsafe.use_unpooled_allocator=false")); } public void testMaxDirectMemorySizeChoice() throws InterruptedException, IOException { From 8aa74184bee8b3d509bfd737329ffdf01d6ff9ed Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Oct 2019 12:10:34 -0600 Subject: [PATCH 09/11] Changes from review --- .../tools/launchers/JvmErgonomics.java | 8 ---- .../tools/launchers/JvmErgonomicsTests.java | 14 ------ modules/transport-netty4/build.gradle | 4 +- .../netty4/Netty4HttpServerTransport.java | 11 ++--- .../transport/NettyAllocator.java | 46 +++++++++++++++++-- .../transport/netty4/Netty4Transport.java | 18 ++++---- .../http/netty4/Netty4HttpClient.java | 5 +- 7 files changed, 58 insertions(+), 48 deletions(-) diff --git a/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java b/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java index fe9bddecb4809..4a0eab45fb6ee 100644 --- a/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java +++ b/distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java @@ -55,14 +55,6 @@ static List choose(final List userDefinedJvmOptions) throws Inte final List ergonomicChoices = new ArrayList<>(); final Map> finalJvmOptions = finalJvmOptions(userDefinedJvmOptions); final long heapSize = extractHeapSize(finalJvmOptions); - final Map systemProperties = extractSystemProperties(userDefinedJvmOptions); - if (systemProperties.containsKey("es.unsafe.use_unpooled_allocator") == false) { - if (heapSize <= 1 << 30) { - ergonomicChoices.add("-Des.unsafe.use_unpooled_allocator=true"); - } else { - ergonomicChoices.add("-Des.unsafe.use_unpooled_allocator=false"); - } - } final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions); if (maxDirectMemorySize == 0) { ergonomicChoices.add("-XX:MaxDirectMemorySize=" + heapSize / 2); diff --git a/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java b/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java index 77eb0b697ff29..ee049a57d8528 100644 --- a/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java +++ b/distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java @@ -117,20 +117,6 @@ public void testExtractNoSystemProperties() { assertTrue(parsedSystemProperties.isEmpty()); } - public void testPooledMemoryChoiceOnSmallHeap() throws InterruptedException, IOException { - final String smallHeap = randomFrom(Arrays.asList("64M", "512M", "1024M", "1G")); - assertThat( - JvmErgonomics.choose(Arrays.asList("-Xms" + smallHeap, "-Xmx" + smallHeap)), - hasItem("-Des.unsafe.use_unpooled_allocator=true")); - } - - public void testPooledMemoryChoiceOnNotSmallHeap() throws InterruptedException, IOException { - final String largeHeap = randomFrom(Arrays.asList("1025M", "2048M", "2G", "8G")); - assertThat( - JvmErgonomics.choose(Arrays.asList("-Xms" + largeHeap, "-Xmx" + largeHeap)), - hasItem("-Des.unsafe.use_unpooled_allocator=false")); - } - public void testMaxDirectMemorySizeChoice() throws InterruptedException, IOException { final Map heapMaxDirectMemorySize = Map.of( "64M", Long.toString((64L << 20) / 2), diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index 6c7d59fc60e8d..98cb23ae28515 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -66,7 +66,7 @@ integTestRunner { TaskProvider pooledTest = tasks.register("pooledTest", Test) { include '**/*Tests.class' systemProperty 'es.set.netty.runtime.available.processors', 'false' - systemProperty 'es.unsafe.use_unpooled_allocator', 'false' + systemProperty 'es.use_unpooled_allocator', 'false' } // TODO: we can't use task avoidance here because RestIntegTestTask does the testcluster creation RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTestTask) { @@ -75,7 +75,7 @@ RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTes } } testClusters.pooledIntegTest { - systemProperty 'es.unsafe.use_unpooled_allocator', 'false' + systemProperty 'es.use_unpooled_allocator', 'false' } check.dependsOn(pooledTest, pooledIntegTest) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index ee32075ef20c5..cefa589a103f7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -61,7 +61,6 @@ import org.elasticsearch.http.HttpServerChannel; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.CopyBytesServerSocketChannel; import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -185,14 +184,12 @@ protected void doStart() { serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX))); - // CopyBytesServerSocketChannel which will create child channels of type CopyBytesSocketChannel. - // CopyBytesSocketChannel pool a single direct buffer per-event-loop thread to be used for IO - // operations. - serverBootstrap.channel(CopyBytesServerSocketChannel.class); + // NettyAllocator will return the channel type designed to work with the configuredAllocator + serverBootstrap.channel(NettyAllocator.getServerChannelType()); // Set the allocators for both the server channel and the child channels created - serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); - serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); + serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); serverBootstrap.childHandler(configureServerChannelHandler()); serverBootstrap.handler(new ServerChannelExceptionHandler(this)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index 4c768acc47906..d5076ce3cbc30 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -24,13 +24,18 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ServerChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import org.elasticsearch.common.Booleans; +import org.elasticsearch.monitor.jvm.JvmInfo; public class NettyAllocator { - public static final ByteBufAllocator ALLOCATOR; + private static final ByteBufAllocator ALLOCATOR; - private static final String USE_UNPOOLED = "es.unsafe.use_unpooled_allocator"; + private static final String USE_UNPOOLED = "es.use_unpooled_allocator"; private static final String USE_NETTY_DEFAULT = "es.unsafe.use_netty_default_allocator"; static { @@ -38,8 +43,8 @@ public class NettyAllocator { ALLOCATOR = ByteBufAllocator.DEFAULT; } else { ByteBufAllocator delegate; - if (Booleans.parseBoolean(System.getProperty(USE_UNPOOLED), false)) { - delegate = UnpooledByteBufAllocator.DEFAULT; + if (useUnpooled()) { + delegate = new NoDirectBuffers(UnpooledByteBufAllocator.DEFAULT); } else { int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena(); int pageSize = PooledByteBufAllocator.defaultPageSize(); @@ -55,6 +60,39 @@ public class NettyAllocator { } } + public static boolean useCopySocket() { + return ALLOCATOR instanceof NoDirectBuffers; + } + + public static ByteBufAllocator getAllocator() { + return ALLOCATOR; + } + + public static Class getChannelType() { + if (ALLOCATOR instanceof NoDirectBuffers) { + return CopyBytesSocketChannel.class; + } else { + return NioSocketChannel.class; + } + } + + public static Class getServerChannelType() { + if (ALLOCATOR instanceof NoDirectBuffers) { + return CopyBytesServerSocketChannel.class; + } else { + return NioServerSocketChannel.class; + } + } + + private static boolean useUnpooled() { + if (Booleans.parseBoolean(System.getProperty(USE_UNPOOLED), false)) { + return true; + } else { + long heapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); + return heapSize <= 1 << 30; + } + } + private static class NoDirectBuffers implements ByteBufAllocator { private final ByteBufAllocator delegate; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 9061a6d69fd5a..93f41285c5f3d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -56,8 +56,6 @@ import org.elasticsearch.core.internal.net.NetUtils; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.CopyBytesServerSocketChannel; -import org.elasticsearch.transport.CopyBytesSocketChannel; import org.elasticsearch.transport.NettyAllocator; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; @@ -149,8 +147,10 @@ protected void doStart() { private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) { final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); - bootstrap.channel(CopyBytesSocketChannel.class); - bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); + + // NettyAllocator will return the channel type designed to work with the configured allocator + bootstrap.channel(NettyAllocator.getChannelType()); + bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings)); @@ -208,14 +208,12 @@ private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoop serverBootstrap.group(eventLoopGroup); - // CopyBytesServerSocketChannel which will create child channels of type CopyBytesSocketChannel. - // CopyBytesSocketChannel pool a single direct buffer per-event-loop thread to be used for IO - // operations. - serverBootstrap.channel(CopyBytesServerSocketChannel.class); + // NettyAllocator will return the channel type designed to work with the configuredAllocator + serverBootstrap.channel(NettyAllocator.getServerChannelType()); // Set the allocators for both the server channel and the child channels created - serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); - serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR); + serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); + serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()); serverBootstrap.childHandler(getServerChannelInitializer(name)); serverBootstrap.handler(new ServerChannelExceptionHandler()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index b9e2e7e58007c..df3aadb9bd5e1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -45,7 +45,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.tasks.Task; -import org.elasticsearch.transport.CopyBytesSocketChannel; import org.elasticsearch.transport.NettyAllocator; import java.io.Closeable; @@ -87,8 +86,8 @@ static Collection returnOpaqueIds(Collection responses Netty4HttpClient() { clientBootstrap = new Bootstrap() - .channel(CopyBytesSocketChannel.class) - .option(ChannelOption.ALLOCATOR, NettyAllocator.ALLOCATOR) + .channel(NettyAllocator.getChannelType()) + .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()) .group(new NioEventLoopGroup()); } From b024371486cc2c8832aac14f0b59d2daa19b316d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 Oct 2019 09:24:59 -0600 Subject: [PATCH 10/11] Changes --- .../java/org/elasticsearch/transport/NettyAllocator.java | 6 +++--- .../org/elasticsearch/http/netty4/Netty4HttpClient.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index d5076ce3cbc30..f8560b13d07ff 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -85,8 +85,8 @@ public static Class getServerChannelType() { } private static boolean useUnpooled() { - if (Booleans.parseBoolean(System.getProperty(USE_UNPOOLED), false)) { - return true; + if (System.getProperty(USE_UNPOOLED) != null) { + return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)); } else { long heapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); return heapSize <= 1 << 30; @@ -202,7 +202,7 @@ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { @Override public boolean isDirectBufferPooled() { - return delegate.isDirectBufferPooled(); + return false; } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index df3aadb9bd5e1..558e833c74bed 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -88,7 +88,7 @@ static Collection returnOpaqueIds(Collection responses clientBootstrap = new Bootstrap() .channel(NettyAllocator.getChannelType()) .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()) - .group(new NioEventLoopGroup()); + .group(new NioEventLoopGroup(1)); } public Collection get(SocketAddress remoteAddress, String... uris) throws InterruptedException { From bfd10031451c881daf288a31c7199939a0d80cdc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 Oct 2019 09:45:31 -0600 Subject: [PATCH 11/11] Assert --- .../main/java/org/elasticsearch/transport/NettyAllocator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java index f8560b13d07ff..bfe0a92a9f2b8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/NettyAllocator.java @@ -202,6 +202,7 @@ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { @Override public boolean isDirectBufferPooled() { + assert delegate.isDirectBufferPooled() == false; return false; }