Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer network bytes to smaller buffer #62673

Merged
merged 10 commits into from
Sep 30, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.NettyByteBufSizer;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.netty4.Netty4Utils;

Expand Down Expand Up @@ -283,6 +284,7 @@ public ChannelHandler configureServerChannelHandler() {
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

private final Netty4HttpServerTransport transport;
private final NettyByteBufSizer byteBufSizer;
private final Netty4HttpRequestCreator requestCreator;
private final Netty4HttpRequestHandler requestHandler;
private final Netty4HttpResponseCreator responseCreator;
Expand All @@ -291,6 +293,7 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.byteBufSizer = new NettyByteBufSizer();
this.requestCreator = new Netty4HttpRequestCreator();
this.requestHandler = new Netty4HttpRequestHandler(transport);
this.responseCreator = new Netty4HttpResponseCreator();
Expand All @@ -300,6 +303,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

@ChannelHandler.Sharable
public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we really do this in general? It seems only makes sense for REST handlers that don't copy the buffers to unpooled anyway (search and bulk only as of right now). Maybe we should just copy those requests to new pooled buffers of appropriate size and leave the rest of them alone since we're releasing them on the io thread right away anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in the meeting, this is valuable as large messages can be aggregated for a period of time, hurting the memory ratios without this change.

int readableBytes = buf.readableBytes();
if (buf.capacity() >= 1024) {
ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
assert resized.readableBytes() == readableBytes;
out.add(resized.retain());
} else {
out.add(buf.retain());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Netty4NioSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.NettyByteBufSizer;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
Expand Down Expand Up @@ -326,6 +327,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {

protected final String name;
private final NettyByteBufSizer sizer = new NettyByteBufSizer();

protected ServerChannelInitializer(String name) {
this.name = name;
Expand All @@ -338,6 +340,7 @@ protected void initChannel(Channel ch) throws Exception {
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("byte_buf_sizer", sizer);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
Expand Down