From 4b1dacb5f5e354b2c5a6ae2145a4a84e63e3ac1c Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Tue, 10 Dec 2024 15:31:07 -0500 Subject: [PATCH] Separate queue per task --- .../service/ReservedClusterStateService.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index 5e191067e817e..1f900a563b636 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.BuildVersion; @@ -61,8 +60,6 @@ public class ReservedClusterStateService { final Map> handlers; final ClusterService clusterService; - private final MasterServiceTaskQueue updateTaskQueue; - private final MasterServiceTaskQueue errorTaskQueue; @SuppressWarnings("unchecked") private final ConstructingObjectParser stateChunkParser = new ConstructingObjectParser<>( @@ -77,6 +74,8 @@ public class ReservedClusterStateService { return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]); } ); + private final ReservedStateUpdateTaskExecutor updateTaskExecutor; + private final ReservedStateErrorTaskExecutor errorTaskExecutor; /** * Controller class for saving and reserving {@link ClusterState}. @@ -89,12 +88,8 @@ public ReservedClusterStateService( List> handlerList ) { this.clusterService = clusterService; - this.updateTaskQueue = clusterService.createTaskQueue( - "reserved state update", - Priority.URGENT, - new ReservedStateUpdateTaskExecutor(rerouteService) - ); - this.errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, new ReservedStateErrorTaskExecutor()); + this.updateTaskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService); + this.errorTaskExecutor = new ReservedStateErrorTaskExecutor(); this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity())); stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { if (handlers.containsKey(name) == false) { @@ -291,6 +286,11 @@ Exception checkAndReportError( } void submitUpdateTask(String source, ReservedStateUpdateTask task) { + var updateTaskQueue = clusterService.createTaskQueue( + "reserved state update", + Priority.URGENT, + updateTaskExecutor + ); updateTaskQueue.submitTask(source, task, null); } @@ -306,6 +306,7 @@ void updateErrorState(ErrorState errorState) { } private void submitErrorUpdateTask(ErrorState errorState) { + var errorTaskQueue = clusterService.createTaskQueue("reserved state error", Priority.URGENT, errorTaskExecutor); errorTaskQueue.submitTask( "reserved cluster state update error for [ " + errorState.namespace() + "]", new ReservedStateErrorTask(errorState, new ActionListener<>() {