From 1a5fad8f7c7c278618ec55dd5c4ceba80bb917d1 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 15 Mar 2022 13:55:07 +0100 Subject: [PATCH 1/2] Shutdown server threads on server close This fix threads leak in tests. --- .../main/java/io/trino/execution/ClusterSizeMonitor.java | 1 + .../main/java/io/trino/metadata/DiscoveryNodeManager.java | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java b/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java index d2a03bb7bcd9..086c418bcfa1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java +++ b/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java @@ -87,6 +87,7 @@ public void start() public void stop() { nodeManager.removeNodeChangeListener(listener); + executor.shutdown(); } /** diff --git a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java index c228a9f6515a..1a20c3dd8e07 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java @@ -154,6 +154,13 @@ public void startPollingNodeStates() pollWorkers(); } + @PreDestroy + public void destroy() + { + nodeStateUpdateExecutor.shutdown(); + nodeStateEventExecutor.shutdown(); + } + private void pollWorkers() { AllNodes allNodes = getAllNodes(); From ab6c0cda0da382d62b797385e3bf2c3d25dc0e7f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 15 Mar 2022 13:56:15 +0100 Subject: [PATCH 2/2] Use daemon threads for internal thread pools This is the standard practice. --- .../main/java/io/trino/execution/ClusterSizeMonitor.java | 4 ++-- .../main/java/io/trino/metadata/DiscoveryNodeManager.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java b/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java index 086c418bcfa1..f66b56760733 100644 --- a/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java +++ b/core/trino-main/src/main/java/io/trino/execution/ClusterSizeMonitor.java @@ -37,7 +37,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; -import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static java.lang.String.format; import static java.util.Comparator.comparing; @@ -73,7 +73,7 @@ public ClusterSizeMonitor( { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.includeCoordinator = includeCoordinator; - this.executor = newSingleThreadScheduledExecutor(threadsNamed("node-monitor-%s")); + this.executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("node-monitor-%s")); } @PostConstruct diff --git a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java index 1a20c3dd8e07..0a89ec60e588 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java @@ -54,7 +54,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Sets.difference; -import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static io.trino.metadata.NodeState.ACTIVE; import static io.trino.metadata.NodeState.INACTIVE; @@ -106,8 +106,8 @@ public DiscoveryNodeManager( this.failureDetector = requireNonNull(failureDetector, "failureDetector is null"); this.expectedNodeVersion = requireNonNull(expectedNodeVersion, "expectedNodeVersion is null"); this.httpClient = requireNonNull(httpClient, "httpClient is null"); - this.nodeStateUpdateExecutor = newSingleThreadScheduledExecutor(threadsNamed("node-state-poller-%s")); - this.nodeStateEventExecutor = newCachedThreadPool(threadsNamed("node-state-events-%s")); + this.nodeStateUpdateExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("node-state-poller-%s")); + this.nodeStateEventExecutor = newCachedThreadPool(daemonThreadsNamed("node-state-events-%s")); this.httpsRequired = internalCommunicationConfig.isHttpsRequired(); this.currentNode = findCurrentNode(