Skip to content

Commit

Permalink
Migrate TransportVersionsFixupListener away from name-based executor (e…
Browse files Browse the repository at this point in the history
…lastic#99305)

Replaces the executor name with a proper executor.

Relates elastic#99027 and friends
  • Loading branch information
DaveCTurner authored Sep 7, 2023
1 parent d4fa4d4 commit be304f0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.ClusterState.INFERRED_TRANSPORT_VERSION;
Expand All @@ -55,22 +56,30 @@ public class TransportVersionsFixupListener implements ClusterStateListener {
private final MasterServiceTaskQueue<NodeTransportVersionTask> taskQueue;
private final ClusterAdminClient client;
private final Scheduler scheduler;
private final Executor executor;
private final Set<String> pendingNodes = Collections.synchronizedSet(new HashSet<>());

public TransportVersionsFixupListener(ClusterService service, ClusterAdminClient client, Scheduler scheduler) {
public TransportVersionsFixupListener(ClusterService service, ClusterAdminClient client, ThreadPool threadPool) {
// there tends to be a lot of state operations on an upgrade - this one is not time-critical,
// so use LOW priority. It just needs to be run at some point after upgrade.
this(service.createTaskQueue("fixup-transport-versions", Priority.LOW, new TransportVersionUpdater()), client, scheduler);
this(
service.createTaskQueue("fixup-transport-versions", Priority.LOW, new TransportVersionUpdater()),
client,
threadPool,
threadPool.executor(ThreadPool.Names.CLUSTER_COORDINATION)
);
}

TransportVersionsFixupListener(
MasterServiceTaskQueue<NodeTransportVersionTask> taskQueue,
ClusterAdminClient client,
Scheduler scheduler
Scheduler scheduler,
Executor executor
) {
this.taskQueue = taskQueue;
this.client = client;
this.scheduler = scheduler;
this.executor = executor;
}

class NodeTransportVersionTask implements ClusterStateTaskListener {
Expand Down Expand Up @@ -147,7 +156,7 @@ public void clusterChanged(ClusterChangedEvent event) {
private void scheduleRetry(Set<String> nodes, int thisRetryNum) {
// just keep retrying until this succeeds
logger.debug("Scheduling retry {} for nodes {}", thisRetryNum + 1, nodes);
scheduler.schedule(() -> updateTransportVersions(nodes, thisRetryNum + 1), RETRY_TIME, ThreadPool.Names.CLUSTER_COORDINATION);
scheduler.schedule(() -> updateTransportVersions(nodes, thisRetryNum + 1), RETRY_TIME, executor);
}

private void updateTransportVersions(Set<String> nodes, int retryNum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -113,7 +115,7 @@ public void testNothingFixedWhenNothingToInfer() {
.compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0)))
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));

verify(taskQueue, never()).submitTask(anyString(), any(), any());
Expand All @@ -128,7 +130,7 @@ public void testNothingFixedWhenOnNextVersion() {
.compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION)))
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));

verify(taskQueue, never()).submitTask(anyString(), any(), any());
Expand All @@ -145,7 +147,7 @@ public void testNothingFixedWhenOnPreviousVersion() {
)
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));

verify(taskQueue, never()).submitTask(anyString(), any(), any());
Expand All @@ -169,7 +171,7 @@ public void testVersionsAreFixed() {
ArgumentCaptor<ActionListener<NodesInfoResponse>> action = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<NodeTransportVersionTask> task = ArgumentCaptor.forClass(NodeTransportVersionTask.class);

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
listeners.clusterChanged(new ClusterChangedEvent("test", testState, ClusterState.EMPTY_STATE));
verify(client).nodesInfo(
argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))),
Expand All @@ -195,7 +197,7 @@ public void testConcurrentChangesDoNotOverlap() {
)
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE));
verify(client).nodesInfo(argThat(transformedMatch(NodesInfoRequest::nodesIds, arrayContainingInAnyOrder("node1", "node2"))), any());
// don't send back the response yet
Expand All @@ -219,6 +221,7 @@ public void testFailedRequestsAreRetried() {
MasterServiceTaskQueue<NodeTransportVersionTask> taskQueue = newMockTaskQueue();
ClusterAdminClient client = mock(ClusterAdminClient.class);
Scheduler scheduler = mock(Scheduler.class);
Executor executor = mock(Executor.class);

ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
Expand All @@ -233,12 +236,12 @@ public void testFailedRequestsAreRetried() {
ArgumentCaptor<ActionListener<NodesInfoResponse>> action = ArgumentCaptor.forClass(ActionListener.class);
ArgumentCaptor<Runnable> retry = ArgumentCaptor.forClass(Runnable.class);

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, scheduler);
TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, scheduler, executor);
listeners.clusterChanged(new ClusterChangedEvent("test", testState1, ClusterState.EMPTY_STATE));
verify(client, times(1)).nodesInfo(any(), action.capture());
// do response immediately
action.getValue().onFailure(new RuntimeException("failure"));
verify(scheduler).schedule(retry.capture(), any(), anyString());
verify(scheduler).schedule(retry.capture(), any(), same(executor));

// running retry should cause another check
retry.getValue().run();
Expand Down

0 comments on commit be304f0

Please sign in to comment.