Skip to content

Commit

Permalink
Use single-task queues in ReservedClusterStateService (elastic#118351)
Browse files Browse the repository at this point in the history
* Refactor: submitUpdateTask method

* Test for one task per reserved state udate; currently fails

* Separate queue per task

* Spotless
  • Loading branch information
prdoyle authored Dec 11, 2024
1 parent d745315 commit 4da7299
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.BuildVersion;
Expand Down Expand Up @@ -61,8 +60,6 @@ public class ReservedClusterStateService {

final Map<String, ReservedClusterStateHandler<?>> handlers;
final ClusterService clusterService;
private final MasterServiceTaskQueue<ReservedStateUpdateTask> updateTaskQueue;
private final MasterServiceTaskQueue<ReservedStateErrorTask> errorTaskQueue;

@SuppressWarnings("unchecked")
private final ConstructingObjectParser<ReservedStateChunk, Void> stateChunkParser = new ConstructingObjectParser<>(
Expand All @@ -77,6 +74,8 @@ public class ReservedClusterStateService {
return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]);
}
);
private final ReservedStateUpdateTaskExecutor updateTaskExecutor;
private final ReservedStateErrorTaskExecutor errorTaskExecutor;

/**
* Controller class for saving and reserving {@link ClusterState}.
Expand All @@ -89,12 +88,8 @@ public ReservedClusterStateService(
List<ReservedClusterStateHandler<?>> handlerList
) {
this.clusterService = clusterService;
this.updateTaskQueue = clusterService.createTaskQueue(
"reserved state update",
Priority.URGENT,
new ReservedStateUpdateTaskExecutor(rerouteService)
);
this.errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, new ReservedStateErrorTaskExecutor());
this.updateTaskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService);
this.errorTaskExecutor = new ReservedStateErrorTaskExecutor();
this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity()));
stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> {
if (handlers.containsKey(name) == false) {
Expand Down Expand Up @@ -160,7 +155,7 @@ public void process(
public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> listener) {
var missingVersion = new ReservedStateVersion(EMPTY_VERSION, BuildVersion.current());
var emptyState = new ReservedStateChunk(Map.of(), missingVersion);
updateTaskQueue.submitTask(
submitUpdateTask(
"empty initial cluster state [" + namespace + "]",
new ReservedStateUpdateTask(
namespace,
Expand All @@ -171,10 +166,8 @@ public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> lis
// error state should not be possible since there is no metadata being parsed or processed
errorState -> { throw new AssertionError(); },
listener
),
null
)
);

}

/**
Expand Down Expand Up @@ -234,15 +227,15 @@ public void process(
errorListener.accept(error);
return;
}
updateTaskQueue.submitTask(
submitUpdateTask(
"reserved cluster state [" + namespace + "]",
new ReservedStateUpdateTask(
namespace,
reservedStateChunk,
versionCheck,
handlers,
orderedHandlers,
ReservedClusterStateService.this::updateErrorState,
this::updateErrorState,
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty empty) {
Expand All @@ -261,8 +254,7 @@ public void onFailure(Exception e) {
}
}
}
),
null
)
);
}

Expand Down Expand Up @@ -293,6 +285,11 @@ Exception checkAndReportError(
return null;
}

void submitUpdateTask(String source, ReservedStateUpdateTask task) {
var updateTaskQueue = clusterService.createTaskQueue("reserved state update", Priority.URGENT, updateTaskExecutor);
updateTaskQueue.submitTask(source, task, null);
}

// package private for testing
void updateErrorState(ErrorState errorState) {
// optimistic check here - the cluster state might change after this, so also need to re-check later
Expand All @@ -305,6 +302,7 @@ void updateErrorState(ErrorState errorState) {
}

private void submitErrorUpdateTask(ErrorState errorState) {
var errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, errorTaskExecutor);
errorTaskQueue.submitTask(
"reserved cluster state update error for [ " + errorState.namespace() + "]",
new ReservedStateErrorTask(errorState, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongFunction;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
Expand All @@ -67,6 +68,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -332,6 +334,81 @@ public void testUpdateErrorState() {
verifyNoMoreInteractions(errorQueue);
}

@SuppressWarnings("unchecked")
public void testOneUpdateTaskPerQueue() {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-update tasks
.thenReturn(unusedQueue);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any()))
.thenReturn(queue1, queue2, unusedQueue);
when(clusterService.state()).thenReturn(state);

ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
LongFunction<ReservedStateUpdateTask> update = version -> {
ReservedStateUpdateTask task = spy(
new ReservedStateUpdateTask(
"test",
new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())),
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
Map.of(),
Set.of(),
errorState -> {},
ActionListener.noop()
)
);
doReturn(state).when(task).execute(any());
return task;
};

service.submitUpdateTask("test", update.apply(2L));
service.submitUpdateTask("test", update.apply(3L));

// One task to each queue
verify(queue1).submitTask(any(), any(), any());
verify(queue2).submitTask(any(), any(), any());

// No additional unexpected tasks
verifyNoInteractions(unusedQueue);
}

@SuppressWarnings("unchecked")
public void testOneErrorTaskPerQueue() {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-error tasks
.thenReturn(unusedQueue);
when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any()))
.thenReturn(queue1, queue2, unusedQueue);
when(clusterService.state()).thenReturn(state);

ReservedClusterStateService service = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of());
LongFunction<ErrorState> error = version -> new ErrorState(
"namespace",
version,
ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
List.of("error"),
ReservedStateErrorMetadata.ErrorKind.TRANSIENT
);
service.updateErrorState(error.apply(2));
service.updateErrorState(error.apply(3));

// One task to each queue
verify(queue1).submitTask(any(), any(), any());
verify(queue2).submitTask(any(), any(), any());

// No additional unexpected tasks
verifyNoInteractions(unusedQueue);
}

public void testErrorStateTask() throws Exception {
ClusterState state = ClusterState.builder(new ClusterName("test")).build();

Expand Down

0 comments on commit 4da7299

Please sign in to comment.