From 4da72997e956532a2a9d2b98c4c4659a5d26ea86 Mon Sep 17 00:00:00 2001 From: Patrick Doyle <810052+prdoyle@users.noreply.github.com> Date: Wed, 11 Dec 2024 09:38:40 -0500 Subject: [PATCH] Use single-task queues in ReservedClusterStateService (#118351) * Refactor: submitUpdateTask method * Test for one task per reserved state udate; currently fails * Separate queue per task * Spotless --- .../service/ReservedClusterStateService.java | 32 ++++---- .../ReservedClusterStateServiceTests.java | 77 +++++++++++++++++++ 2 files changed, 92 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index 499b5e6515a8..248d37914cf3 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.BuildVersion; @@ -61,8 +60,6 @@ public class ReservedClusterStateService { final Map> handlers; final ClusterService clusterService; - private final MasterServiceTaskQueue updateTaskQueue; - private final MasterServiceTaskQueue errorTaskQueue; @SuppressWarnings("unchecked") private final ConstructingObjectParser stateChunkParser = new ConstructingObjectParser<>( @@ -77,6 +74,8 @@ public class ReservedClusterStateService { return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]); } ); + private final ReservedStateUpdateTaskExecutor updateTaskExecutor; + private final ReservedStateErrorTaskExecutor errorTaskExecutor; /** * Controller class for saving and reserving {@link ClusterState}. @@ -89,12 +88,8 @@ public ReservedClusterStateService( List> handlerList ) { this.clusterService = clusterService; - this.updateTaskQueue = clusterService.createTaskQueue( - "reserved state update", - Priority.URGENT, - new ReservedStateUpdateTaskExecutor(rerouteService) - ); - this.errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, new ReservedStateErrorTaskExecutor()); + this.updateTaskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService); + this.errorTaskExecutor = new ReservedStateErrorTaskExecutor(); this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity())); stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { if (handlers.containsKey(name) == false) { @@ -160,7 +155,7 @@ public void process( public void initEmpty(String namespace, ActionListener listener) { var missingVersion = new ReservedStateVersion(EMPTY_VERSION, BuildVersion.current()); var emptyState = new ReservedStateChunk(Map.of(), missingVersion); - updateTaskQueue.submitTask( + submitUpdateTask( "empty initial cluster state [" + namespace + "]", new ReservedStateUpdateTask( namespace, @@ -171,10 +166,8 @@ public void initEmpty(String namespace, ActionListener lis // error state should not be possible since there is no metadata being parsed or processed errorState -> { throw new AssertionError(); }, listener - ), - null + ) ); - } /** @@ -234,7 +227,7 @@ public void process( errorListener.accept(error); return; } - updateTaskQueue.submitTask( + submitUpdateTask( "reserved cluster state [" + namespace + "]", new ReservedStateUpdateTask( namespace, @@ -242,7 +235,7 @@ public void process( versionCheck, handlers, orderedHandlers, - ReservedClusterStateService.this::updateErrorState, + this::updateErrorState, new ActionListener<>() { @Override public void onResponse(ActionResponse.Empty empty) { @@ -261,8 +254,7 @@ public void onFailure(Exception e) { } } } - ), - null + ) ); } @@ -293,6 +285,11 @@ Exception checkAndReportError( return null; } + void submitUpdateTask(String source, ReservedStateUpdateTask task) { + var updateTaskQueue = clusterService.createTaskQueue("reserved state update", Priority.URGENT, updateTaskExecutor); + updateTaskQueue.submitTask(source, task, null); + } + // package private for testing void updateErrorState(ErrorState errorState) { // optimistic check here - the cluster state might change after this, so also need to re-check later @@ -305,6 +302,7 @@ void updateErrorState(ErrorState errorState) { } private void submitErrorUpdateTask(ErrorState errorState) { + var errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, errorTaskExecutor); errorTaskQueue.submitTask( "reserved cluster state update error for [ " + errorState.namespace() + "]", new ReservedStateErrorTask(errorState, new ActionListener<>() { diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index efe356606417..982f5c4a93ae 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.LongFunction; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; @@ -67,6 +68,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -332,6 +334,81 @@ public void testUpdateErrorState() { verifyNoMoreInteractions(errorQueue); } + @SuppressWarnings("unchecked") + public void testOneUpdateTaskPerQueue() { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + MasterServiceTaskQueue queue1 = mockTaskQueue(); + MasterServiceTaskQueue queue2 = mockTaskQueue(); + MasterServiceTaskQueue unusedQueue = mockTaskQueue(); + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(anyString(), any(), any())) // For non-update tasks + .thenReturn(unusedQueue); + when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any())) + .thenReturn(queue1, queue2, unusedQueue); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of()); + LongFunction update = version -> { + ReservedStateUpdateTask task = spy( + new ReservedStateUpdateTask( + "test", + new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())), + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + Map.of(), + Set.of(), + errorState -> {}, + ActionListener.noop() + ) + ); + doReturn(state).when(task).execute(any()); + return task; + }; + + service.submitUpdateTask("test", update.apply(2L)); + service.submitUpdateTask("test", update.apply(3L)); + + // One task to each queue + verify(queue1).submitTask(any(), any(), any()); + verify(queue2).submitTask(any(), any(), any()); + + // No additional unexpected tasks + verifyNoInteractions(unusedQueue); + } + + @SuppressWarnings("unchecked") + public void testOneErrorTaskPerQueue() { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + MasterServiceTaskQueue queue1 = mockTaskQueue(); + MasterServiceTaskQueue queue2 = mockTaskQueue(); + MasterServiceTaskQueue unusedQueue = mockTaskQueue(); + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(anyString(), any(), any())) // For non-error tasks + .thenReturn(unusedQueue); + when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any())) + .thenReturn(queue1, queue2, unusedQueue); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of()); + LongFunction error = version -> new ErrorState( + "namespace", + version, + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + List.of("error"), + ReservedStateErrorMetadata.ErrorKind.TRANSIENT + ); + service.updateErrorState(error.apply(2)); + service.updateErrorState(error.apply(3)); + + // One task to each queue + verify(queue1).submitTask(any(), any(), any()); + verify(queue2).submitTask(any(), any(), any()); + + // No additional unexpected tasks + verifyNoInteractions(unusedQueue); + } + public void testErrorStateTask() throws Exception { ClusterState state = ClusterState.builder(new ClusterName("test")).build();