From 041b1d2957f0e8bbafb7cbd91aff2309a81121e3 Mon Sep 17 00:00:00 2001 From: chenyuzhi Date: Wed, 11 Dec 2024 16:46:56 +0800 Subject: [PATCH 1/4] [FLINK-36876] Support external eventLoopGroup for RestClient --- .../program/rest/RestClusterClient.java | 21 ++++++-- .../apache/flink/runtime/rest/RestClient.java | 52 +++++++++++++------ .../flink/runtime/rest/RestClientTest.java | 17 ++++++ 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index fc466efa5a179..49818a7df0248 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -133,6 +133,7 @@ import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.slf4j.Logger; @@ -215,7 +216,16 @@ public RestClusterClient(Configuration config, T clusterId) throws Exception { public RestClusterClient( Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory) throws Exception { - this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory); + this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory, null); + } + + public RestClusterClient( + Configuration config, + T clusterId, + ClientHighAvailabilityServicesFactory factory, + EventLoopGroup group) + throws Exception { + this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory, group); } @VisibleForTesting @@ -230,7 +240,8 @@ public RestClusterClient( restClient, clusterId, waitStrategy, - DefaultClientHighAvailabilityServicesFactory.INSTANCE); + DefaultClientHighAvailabilityServicesFactory.INSTANCE, + null); } private RestClusterClient( @@ -238,7 +249,8 @@ private RestClusterClient( @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy, - ClientHighAvailabilityServicesFactory clientHAServicesFactory) + ClientHighAvailabilityServicesFactory clientHAServicesFactory, + EventLoopGroup group) throws Exception { this.configuration = checkNotNull(configuration); @@ -258,7 +270,8 @@ private RestClusterClient( if (restClient != null) { this.restClient = restClient; } else { - this.restClient = RestClient.forUrl(configuration, executorService, jobmanagerUrl); + this.restClient = + RestClient.forUrl(configuration, executorService, jobmanagerUrl, group); } this.waitStrategy = checkNotNull(waitStrategy); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 152609b6a7283..1a5e4d5063341 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -59,6 +59,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory; import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; @@ -143,6 +144,7 @@ public class RestClient implements AutoCloseableAsync { ConcurrentHashMap.newKeySet(); private final List outboundChannelHandlerFactories; + private final Boolean useInternalEventLoopGroup; /** * Creates a new RestClient for the provided root URL. If the protocol of the URL is "https", @@ -150,23 +152,34 @@ public class RestClient implements AutoCloseableAsync { */ public static RestClient forUrl(Configuration configuration, Executor executor, URL rootUrl) throws ConfigurationException { + return forUrl(configuration, executor, rootUrl, null); + } + + public static RestClient forUrl( + Configuration configuration, Executor executor, URL rootUrl, EventLoopGroup group) + throws ConfigurationException { Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(rootUrl); if ("https".equals(rootUrl.getProtocol())) { configuration = configuration.clone(); configuration.set(SSL_REST_ENABLED, true); } - return new RestClient(configuration, executor, rootUrl.getHost(), rootUrl.getPort()); + return new RestClient(configuration, executor, rootUrl.getHost(), rootUrl.getPort(), group); } public RestClient(Configuration configuration, Executor executor) throws ConfigurationException { - this(configuration, executor, null, -1); + this(configuration, executor, null, -1, null); } - public RestClient(Configuration configuration, Executor executor, String host, int port) + public RestClient( + Configuration configuration, + Executor executor, + String host, + int port, + EventLoopGroup group) throws ConfigurationException { - this(configuration, executor, host, port, DefaultSelectStrategyFactory.INSTANCE); + this(configuration, executor, host, port, DefaultSelectStrategyFactory.INSTANCE, group); } @VisibleForTesting @@ -175,7 +188,7 @@ public RestClient(Configuration configuration, Executor executor, String host, i Executor executor, SelectStrategyFactory selectStrategyFactory) throws ConfigurationException { - this(configuration, executor, null, -1, selectStrategyFactory); + this(configuration, executor, null, -1, selectStrategyFactory, null); } private RestClient( @@ -183,7 +196,8 @@ private RestClient( Executor executor, String host, int port, - SelectStrategyFactory selectStrategyFactory) + SelectStrategyFactory selectStrategyFactory, + EventLoopGroup group) throws ConfigurationException { Preconditions.checkNotNull(configuration); this.executor = Preconditions.checkNotNull(executor); @@ -264,15 +278,21 @@ protected void initChannel(SocketChannel socketChannel) { } }; - // No NioEventLoopGroup constructor available that allows passing nThreads, threadFactory, - // and selectStrategyFactory without also passing a SelectorProvider, so mimicking its - // default value seen in other constructors - NioEventLoopGroup group = - new NioEventLoopGroup( - 1, - new ExecutorThreadFactory("flink-rest-client-netty"), - SelectorProvider.provider(), - selectStrategyFactory); + if (group == null) { + // No NioEventLoopGroup constructor available that allows passing nThreads, + // threadFactory, + // and selectStrategyFactory without also passing a SelectorProvider, so mimicking its + // default value seen in other constructors + group = + new NioEventLoopGroup( + 1, + new ExecutorThreadFactory("flink-rest-client-netty"), + SelectorProvider.provider(), + selectStrategyFactory); + useInternalEventLoopGroup = true; + } else { + useInternalEventLoopGroup = false; + } bootstrap = new Bootstrap(); bootstrap @@ -317,7 +337,7 @@ private CompletableFuture shutdownInternally(Duration timeout) { LOG.debug("Shutting down rest endpoint."); if (bootstrap != null) { - if (bootstrap.config().group() != null) { + if (bootstrap.config().group() != null && useInternalEventLoopGroup) { bootstrap .config() .group() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index cd6b76ec6a2d6..040a10b543114 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -31,6 +31,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.CheckedSupplier; @@ -38,8 +39,10 @@ import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.channel.DefaultSelectStrategyFactory; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategy; import org.apache.flink.shaded.netty4.io.netty.channel.SelectStrategyFactory; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.assertj.core.api.InstanceOfAssertFactories; @@ -115,6 +118,20 @@ void testConnectionTimeout() throws Exception { } } + @Test + void testExternalEventGroup() throws Exception { + EventLoopGroup externalGroup = + new NioEventLoopGroup( + 1, new ExecutorThreadFactory("flink-rest-client-netty-external")); + + final RestClient restClient = + new RestClient( + new Configuration(), Executors.directExecutor(), null, -1, externalGroup); + restClient.closeAsync(); + + assertThat(externalGroup.isShuttingDown() || externalGroup.isShutdown()).isFalse(); + } + @Test void testInvalidVersionRejection() throws Exception { try (final RestClient restClient = From 0ebb1739f7c2d2c410f658090a25dacb6f0947bf Mon Sep 17 00:00:00 2001 From: chenyuzhi Date: Wed, 11 Dec 2024 20:00:10 +0800 Subject: [PATCH 2/4] [FLINK-36876] use boolean var --- .../src/main/java/org/apache/flink/runtime/rest/RestClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 1a5e4d5063341..ba41b96eefd48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -144,7 +144,7 @@ public class RestClient implements AutoCloseableAsync { ConcurrentHashMap.newKeySet(); private final List outboundChannelHandlerFactories; - private final Boolean useInternalEventLoopGroup; + private final boolean useInternalEventLoopGroup; /** * Creates a new RestClient for the provided root URL. If the protocol of the URL is "https", From a781d4a254aedb9fdbabe535a9a18f6b0c60a6b7 Mon Sep 17 00:00:00 2001 From: chenyuzhi Date: Tue, 17 Dec 2024 11:14:06 +0800 Subject: [PATCH 3/4] [FLINK-36876] add tag and check --- .../java/org/apache/flink/runtime/rest/RestClient.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index ba41b96eefd48..7da3afa63da46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -90,6 +90,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -197,7 +199,7 @@ private RestClient( String host, int port, SelectStrategyFactory selectStrategyFactory, - EventLoopGroup group) + @Nullable EventLoopGroup group) throws ConfigurationException { Preconditions.checkNotNull(configuration); this.executor = Preconditions.checkNotNull(executor); @@ -291,6 +293,9 @@ protected void initChannel(SocketChannel socketChannel) { selectStrategyFactory); useInternalEventLoopGroup = true; } else { + Preconditions.checkArgument( + !group.isShuttingDown() && !group.isShutdown(), + "provided eventLoopGroup is shut/shutting down"); useInternalEventLoopGroup = false; } From 0891fb729b1a5566ea2b7f9f81ca76c3958e111a Mon Sep 17 00:00:00 2001 From: chenyuzhi Date: Tue, 17 Dec 2024 11:18:20 +0800 Subject: [PATCH 4/4] [FLINK-36876] add tag --- .../org/apache/flink/client/program/rest/RestClusterClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 49818a7df0248..496612406b531 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -250,7 +250,7 @@ private RestClusterClient( T clusterId, WaitStrategy waitStrategy, ClientHighAvailabilityServicesFactory clientHAServicesFactory, - EventLoopGroup group) + @Nullable EventLoopGroup group) throws Exception { this.configuration = checkNotNull(configuration);