diff --git a/server/src/main/java/org/elasticsearch/cluster/service/TransportVersionsFixupListener.java b/server/src/main/java/org/elasticsearch/cluster/service/TransportVersionsFixupListener.java index 7a83e26fcc8fb..711f0c84136e7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/TransportVersionsFixupListener.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/TransportVersionsFixupListener.java @@ -37,6 +37,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import static org.elasticsearch.cluster.ClusterState.INFERRED_TRANSPORT_VERSION; @@ -55,22 +56,30 @@ public class TransportVersionsFixupListener implements ClusterStateListener { private final MasterServiceTaskQueue taskQueue; private final ClusterAdminClient client; private final Scheduler scheduler; + private final Executor executor; private final Set pendingNodes = Collections.synchronizedSet(new HashSet<>()); - public TransportVersionsFixupListener(ClusterService service, ClusterAdminClient client, Scheduler scheduler) { + public TransportVersionsFixupListener(ClusterService service, ClusterAdminClient client, ThreadPool threadPool) { // there tends to be a lot of state operations on an upgrade - this one is not time-critical, // so use LOW priority. It just needs to be run at some point after upgrade. - this(service.createTaskQueue("fixup-transport-versions", Priority.LOW, new TransportVersionUpdater()), client, scheduler); + this( + service.createTaskQueue("fixup-transport-versions", Priority.LOW, new TransportVersionUpdater()), + client, + threadPool, + threadPool.executor(ThreadPool.Names.CLUSTER_COORDINATION) + ); } TransportVersionsFixupListener( MasterServiceTaskQueue taskQueue, ClusterAdminClient client, - Scheduler scheduler + Scheduler scheduler, + Executor executor ) { this.taskQueue = taskQueue; this.client = client; this.scheduler = scheduler; + this.executor = executor; } class NodeTransportVersionTask implements ClusterStateTaskListener { @@ -147,7 +156,7 @@ public void clusterChanged(ClusterChangedEvent event) { private void scheduleRetry(Set nodes, int thisRetryNum) { // just keep retrying until this succeeds logger.debug("Scheduling retry {} for nodes {}", thisRetryNum + 1, nodes); - scheduler.schedule(() -> updateTransportVersions(nodes, thisRetryNum + 1), RETRY_TIME, ThreadPool.Names.CLUSTER_COORDINATION); + scheduler.schedule(() -> updateTransportVersions(nodes, thisRetryNum + 1), RETRY_TIME, executor); } private void updateTransportVersions(Set nodes, int retryNum) { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java index 8dd8925cd86de..3d488b6d55bff 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java @@ -32,12 +32,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import static org.elasticsearch.test.LambdaMatchers.transformedMatch; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -113,7 +115,7 @@ public void testNothingFixedWhenNothingToInfer() { .compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0))) .build(); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); verify(taskQueue, never()).submitTask(anyString(), any(), any()); @@ -128,7 +130,7 @@ public void testNothingFixedWhenOnNextVersion() { .compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION))) .build(); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); verify(taskQueue, never()).submitTask(anyString(), any(), any()); @@ -145,7 +147,7 @@ public void testNothingFixedWhenOnPreviousVersion() { ) .build(); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); verify(taskQueue, never()).submitTask(anyString(), any(), any()); @@ -169,7 +171,7 @@ public void testVersionsAreFixed() { ArgumentCaptor> action = ArgumentCaptor.forClass(ActionListener.class); ArgumentCaptor task = ArgumentCaptor.forClass(NodeTransportVersionTask.class); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE)); verify(client).nodesInfo( argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), @@ -195,7 +197,7 @@ public void testConcurrentChangesDoNotOverlap() { ) .build(); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE)); verify(client).nodesInfo(argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), any()); // don't send back the response yet @@ -219,6 +221,7 @@ public void testFailedRequestsAreRetried() { MasterServiceTaskQueue taskQueue = newMockTaskQueue(); ClusterAdminClient client = mock(ClusterAdminClient.class); Scheduler scheduler = mock(Scheduler.class); + Executor executor = mock(Executor.class); ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION)) @@ -233,12 +236,12 @@ public void testFailedRequestsAreRetried() { ArgumentCaptor> action = ArgumentCaptor.forClass(ActionListener.class); ArgumentCaptor retry = ArgumentCaptor.forClass(Runnable.class); - TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, scheduler); + TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, scheduler, executor); listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE)); verify(client, times(1)).nodesInfo(any(), action.capture()); // do response immediately action.getValue().onFailure(new RuntimeException("failure")); - verify(scheduler).schedule(retry.capture(), any(), anyString()); + verify(scheduler).schedule(retry.capture(), any(), same(executor)); // running retry should cause another check retry.getValue().run();