Skip to content

Commit

Permalink
Separate queue per task
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle committed Dec 10, 2024
1 parent 78221aa commit aa9dfd6
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.BuildVersion;
Expand Down Expand Up @@ -61,8 +60,6 @@ public class ReservedClusterStateService {

final Map<String, ReservedClusterStateHandler<?>> handlers;
final ClusterService clusterService;
private final MasterServiceTaskQueue<ReservedStateUpdateTask> updateTaskQueue;
private final MasterServiceTaskQueue<ReservedStateErrorTask> errorTaskQueue;

@SuppressWarnings("unchecked")
private final ConstructingObjectParser<ReservedStateChunk, Void> stateChunkParser = new ConstructingObjectParser<>(
Expand All @@ -77,6 +74,8 @@ public class ReservedClusterStateService {
return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]);
}
);
private final ReservedStateUpdateTaskExecutor updateTaskExecutor;
private final ReservedStateErrorTaskExecutor errorTaskExecutor;

/**
* Controller class for saving and reserving {@link ClusterState}.
Expand All @@ -89,12 +88,8 @@ public ReservedClusterStateService(
List<ReservedClusterStateHandler<?>> handlerList
) {
this.clusterService = clusterService;
this.updateTaskQueue = clusterService.createTaskQueue(
"reserved state update",
Priority.URGENT,
new ReservedStateUpdateTaskExecutor(rerouteService)
);
this.errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, new ReservedStateErrorTaskExecutor());
this.updateTaskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService);
this.errorTaskExecutor = new ReservedStateErrorTaskExecutor();
this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity()));
stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> {
if (handlers.containsKey(name) == false) {
Expand Down Expand Up @@ -291,6 +286,11 @@ Exception checkAndReportError(
}

void submitUpdateTask(String source, ReservedStateUpdateTask task) {
var updateTaskQueue = clusterService.createTaskQueue(
"reserved state update",
Priority.URGENT,
updateTaskExecutor
);
updateTaskQueue.submitTask(source, task, null);
}

Expand All @@ -306,6 +306,7 @@ void updateErrorState(ErrorState errorState) {
}

private void submitErrorUpdateTask(ErrorState errorState) {
var errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, errorTaskExecutor);
errorTaskQueue.submitTask(
"reserved cluster state update error for [ " + errorState.namespace() + "]",
new ReservedStateErrorTask(errorState, new ActionListener<>() {
Expand Down

0 comments on commit aa9dfd6

Please sign in to comment.