From c63621f60b9b1914547eb5789de0bdc7ff697742 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 8 Nov 2022 22:08:46 +0530 Subject: [PATCH] Control concurrency and handle retries Signed-off-by: Rishab Nahata --- .../awareness/put/DecommissionRequest.java | 34 +++++++++++- .../put/DecommissionRequestBuilder.java | 11 ++++ .../decommission/DecommissionController.java | 54 +++++++++++++++++++ .../decommission/DecommissionService.java | 48 ++++++++++++----- .../admin/cluster/RestDecommissionAction.java | 3 +- 5 files changed, 135 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 7ec2cea769069..755c82f5d6c9d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -32,6 +32,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest() { + @Override + public void handleResponse(DecommissionResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public DecommissionResponse read(StreamInput in) throws IOException { + return new DecommissionResponse(in); + } + } + ); + } + /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 7721d5202fe52..2e69d1948049f 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -74,6 +74,7 @@ public class DecommissionService { private final TransportService transportService; private final ThreadPool threadPool; private final DecommissionController decommissionController; + private final long startTime; private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; private volatile int maxVotingConfigExclusions; @@ -91,6 +92,7 @@ public DecommissionService( this.transportService = transportService; this.threadPool = threadPool; this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool); + this.startTime = threadPool.relativeTimeInMillis(); this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); @@ -127,7 +129,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionRequest decommission request Object + * @param decommissionRequest request for decommission action * @param listener register decommission listener */ public void startDecommissionAction( @@ -146,6 +148,7 @@ public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata); ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); @@ -238,18 +241,22 @@ public void onNewClusterState(ClusterState state) { drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) + // since the local node is no longer cluster manager which could've happened due to leader abdication, + // hence retrying the decommission action until it times out + logger.info("local node is not eligible to process the request, retrying the transport action until it times out"); + decommissionController.retryDecommissionAction( + decommissionRequest, + startTime, + ActionListener.delegateResponse(listener, (delegatedListener, t) -> { + logger.debug( + () -> new ParameterizedMessage( + "failed to retry decommission action for attribute [{}]", + decommissionRequest.getDecommissionAttribute() + ), + t + ); + delegatedListener.onFailure(t); + }) ); } } @@ -498,6 +505,21 @@ private static void ensureEligibleRequest( } } + private static void ensureEligibleRetry( + DecommissionRequest decommissionRequest, + DecommissionAttributeMetadata decommissionAttributeMetadata + ) { + if (decommissionAttributeMetadata != null) { + if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) + && decommissionRequest.retryOnClusterManagerChange() == false) { + throw new DecommissioningFailedException( + decommissionRequest.getDecommissionAttribute(), + "concurrent request received to decommission attribute" + ); + } + } + } + private ActionListener statusUpdateListener() { return new ActionListener<>() { @Override diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index c041974165eb6..f4fdb6a3aa3fc 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -58,6 +58,7 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT); decommissionRequest.setDelayTimeout(delayTimeout); } - return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); + return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)) + .setRetryOnClusterManagerChange(false); } }