Skip to content

Commit

Permalink
[#1773] improvement(server): Reduce chunkSize of gRPC Netty's PooledB…
Browse files Browse the repository at this point in the history
…yteBufAllocator to reduce memory usage (#1780)

### What changes were proposed in this pull request?

Introduce configs to reduce the chunkSize of the gRPC internal Netty's PooledByteBufAllocator to reduce memory usage on the server-side.
When enabling Netty, the changes for gRPC internal Netty's configs will be as follows:
`io.netty.allocator.pageSize`: 8192 -> 4096 (bytes)
`io.netty.allocator.maxOrder`: 11 -> 3
`io.netty.allocator.smallCacheSize`: 128 -> 1024
Note: `chunkSize` = `pageSize` * 2^`maxOrder`, 
Thus, the `io.netty.allocator.chunkSize`: 2MB -> 32KB

### Why are the changes needed?

For #1773.

### Does this PR introduce _any_ user-facing change?

No. This PR will not change anything when using GRPC. It will only take effect when using GRPC_NETTY.
Adds configs: `rss.rpc.netty.pageSize`, `rss.rpc.netty.maxOrder`, `rss.rpc.netty.smallCacheSize`

### How was this patch tested?

Existing tests.
  • Loading branch information
rickyma authored Jun 13, 2024
1 parent 4495668 commit 3d0c91a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,30 @@ public class RssBaseConf extends RssConf {
.defaultValue(true)
.withDescription("If enable metrics for rpc connection");

public static final ConfigOption<Integer> RPC_NETTY_PAGE_SIZE =
ConfigOptions.key("rss.rpc.netty.pageSize")
.intType()
.defaultValue(4096)
.withDescription(
"The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");

public static final ConfigOption<Integer> RPC_NETTY_MAX_ORDER =
ConfigOptions.key("rss.rpc.netty.maxOrder")
.intType()
.defaultValue(3)
.withDescription(
"The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");

public static final ConfigOption<Integer> RPC_NETTY_SMALL_CACHE_SIZE =
ConfigOptions.key("rss.rpc.netty.smallCacheSize")
.intType()
.defaultValue(1024)
.withDescription(
"The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. "
+ "This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY.");

public static final ConfigOption<Integer> JETTY_HTTP_PORT =
ConfigOptions.key("rss.jetty.http.port")
.intType()
Expand Down
19 changes: 17 additions & 2 deletions common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +46,7 @@
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.GrpcNettyUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

Expand Down Expand Up @@ -93,10 +97,21 @@ static void reset() {
private Server buildGrpcServer(int serverPort) {
boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
ServerType serverType = rssConf.get(RssBaseConf.RPC_SERVER_TYPE);
int pageSize = rssConf.getInteger(RssBaseConf.RPC_NETTY_PAGE_SIZE);
int maxOrder = rssConf.getInteger(RssBaseConf.RPC_NETTY_MAX_ORDER);
int smallCacheSize = rssConf.getInteger(RssBaseConf.RPC_NETTY_SMALL_CACHE_SIZE);
PooledByteBufAllocator pooledByteBufAllocator =
serverType == ServerType.GRPC
? GrpcNettyUtils.createPooledByteBufAllocator(true, 0, 0, 0, 0)
: GrpcNettyUtils.createPooledByteBufAllocatorWithSmallCacheOnly(
true, 0, pageSize, maxOrder, smallCacheSize);
ServerBuilder<?> builder =
ServerBuilder.forPort(serverPort)
NettyServerBuilder.forPort(serverPort)
.executor(pool)
.maxInboundMessageSize((int) maxInboundMessageSize);
.maxInboundMessageSize((int) maxInboundMessageSize)
.withOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
.withChildOption(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
if (isMetricsEnabled) {
builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics));
}
Expand Down
4 changes: 4 additions & 0 deletions docs/client_guide/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ The important configuration of client is listed as following. These configuratio
| <client_type>.rss.client.max.concurrency.of.per-partition.write | - | The maximum number of files that can be written concurrently to a single partition is determined. This value will only be respected by the remote shuffle server if it is greater than 0. |
| <client_type>.rss.client.rpc.timeout.ms | 60000 | Timeout in milliseconds for RPC calls. |
| <client_type>.rss.client.rpc.maxAttempts | 3 | When we fail to send RPC calls, we will retry for maxAttempts times. |
| <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |

Notice:

1. `<client_type>` should be `mapreduce` `tez` or `spark`
Expand Down
3 changes: 3 additions & 0 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ This document will introduce how to deploy Uniffle shuffle servers.
| rss.coordinator.quorum | - | Coordinator quorum |
| rss.rpc.server.type | GRPC | Shuffle server type, supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using GRPC_NETTY to enable Netty on the server side for better stability and performance. |
| rss.rpc.server.port | 19999 | RPC port for Shuffle server, if set zero, grpc server start on random port. |
| rss.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| rss.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the server-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| rss.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the server-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| rss.jetty.http.port | 19998 | Http port for Shuffle server |
| rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, Netty server start on random port. |
| rss.server.netty.epoll.enable | false | Whether to enable epoll model with Netty server. |
Expand Down

0 comments on commit 3d0c91a

Please sign in to comment.