Skip to content

Commit

Permalink
Test for one task per reserved state udate; currently fails
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Dec 10, 2024
1 parent 3ddabd9 commit 78221aa
Showing 1 changed file with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongFunction;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
Expand All @@ -67,6 +68,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -332,6 +334,82 @@ public void testUpdateErrorState() {
verifyNoMoreInteractions(errorQueue);
}

@SuppressWarnings("unchecked")
public void testOneUpdateTaskPerQueue() {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-update tasks
.thenReturn(unusedQueue);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any()))
.thenReturn(queue1, queue2, unusedQueue);
when(clusterService.state()).thenReturn(state);

ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
LongFunction<ReservedStateUpdateTask> update = version -> {
ReservedStateUpdateTask task = spy(
new ReservedStateUpdateTask(
"test",
new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())),
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
Map.of(),
Set.of(),
errorState -> {
},
ActionListener.noop()
)
);
doReturn(state).when(task).execute(any());
return task;
};

service.submitUpdateTask("test", update.apply(2L));
service.submitUpdateTask("test", update.apply(3L));

// One task to each queue
verify(queue1).submitTask(any(), any(), any());
verify(queue2).submitTask(any(), any(), any());

// No additional unexpected tasks
verifyNoInteractions(unusedQueue);
}

@SuppressWarnings("unchecked")
public void testOneErrorTaskPerQueue() {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-error tasks
.thenReturn(unusedQueue);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any()))
.thenReturn(queue1, queue2, unusedQueue);
when(clusterService.state()).thenReturn(state);

ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
LongFunction<ErrorState> error = version -> new ErrorState(
"namespace",
version,
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
List.of("error"),
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
);
service.updateErrorState(error.apply(2));
service.updateErrorState(error.apply(3));

// One task to each queue
verify(queue1).submitTask(any(), any(), any());
verify(queue2).submitTask(any(), any(), any());

// No additional unexpected tasks
verifyNoInteractions(unusedQueue);
}

public void testErrorStateTask() throws Exception {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();

Expand Down

0 comments on commit 78221aa

Please sign in to comment.