diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index efe3566064170..9b8e0feb1b56c 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.LongFunction; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; @@ -67,6 +68,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -332,6 +334,82 @@ public void testUpdateErrorState() { verifyNoMoreInteractions(errorQueue); } + @SuppressWarnings("unchecked") + public void testOneUpdateTaskPerQueue() { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + MasterServiceTaskQueue queue1 = mockTaskQueue(); + MasterServiceTaskQueue queue2 = mockTaskQueue(); + MasterServiceTaskQueue unusedQueue = mockTaskQueue(); + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(anyString(), any(), any())) // For non-update tasks + .thenReturn(unusedQueue); + when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any())) + .thenReturn(queue1, queue2, unusedQueue); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of()); + LongFunction update = version -> { + ReservedStateUpdateTask task = spy( + new ReservedStateUpdateTask( + "test", + new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())), + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + Map.of(), + Set.of(), + errorState -> { + }, + ActionListener.noop() + ) + ); + doReturn(state).when(task).execute(any()); + return task; + }; + + service.submitUpdateTask("test", update.apply(2L)); + service.submitUpdateTask("test", update.apply(3L)); + + // One task to each queue + verify(queue1).submitTask(any(), any(), any()); + verify(queue2).submitTask(any(), any(), any()); + + // No additional unexpected tasks + verifyNoInteractions(unusedQueue); + } + + @SuppressWarnings("unchecked") + public void testOneErrorTaskPerQueue() { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + MasterServiceTaskQueue queue1 = mockTaskQueue(); + MasterServiceTaskQueue queue2 = mockTaskQueue(); + MasterServiceTaskQueue unusedQueue = mockTaskQueue(); + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.createTaskQueue(anyString(), any(), any())) // For non-error tasks + .thenReturn(unusedQueue); + when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any())) + .thenReturn(queue1, queue2, unusedQueue); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of()); + LongFunction error = version -> new ErrorState( + "namespace", + version, + ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + List.of("error"), + ReservedStateErrorMetadata.ErrorKind.TRANSIENT + ); + service.updateErrorState(error.apply(2)); + service.updateErrorState(error.apply(3)); + + // One task to each queue + verify(queue1).submitTask(any(), any(), any()); + verify(queue2).submitTask(any(), any(), any()); + + // No additional unexpected tasks + verifyNoInteractions(unusedQueue); + } + public void testErrorStateTask() throws Exception { ClusterState state = ClusterState.builder(new ClusterName("test")).build();