Control concurrency and handle retries #87

Closed · wants to merge 1 commit
File: DecommissionRequest.java
@@ -32,6 +32,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;
private boolean retryOnClusterManagerChange;

private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

@@ -41,14 +42,20 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, false);
}

public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean retryOnClusterManagerChange) {
this.decommissionAttribute = decommissionAttribute;
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
}

public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.retryOnClusterManagerChange = in.readBoolean();
}

@Override
@@ -57,6 +64,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeBoolean(retryOnClusterManagerChange);
}

/**
@@ -96,6 +104,24 @@ public boolean isNoDelay() {
return noDelay;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return this request
*/
public DecommissionRequest setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
return this;
}

/**
* @return Returns whether decommission is retry eligible on cluster manager change
*/
public boolean retryOnClusterManagerChange() {
return this.retryOnClusterManagerChange;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -122,6 +148,12 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{" +
"decommissionAttribute=" + decommissionAttribute +
", retryOnClusterManagerChange=" + retryOnClusterManagerChange +
", delayTimeout=" + delayTimeout +
", noDelay=" + noDelay +
", clusterManagerNodeTimeout=" + clusterManagerNodeTimeout +
'}';
}
}
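
As a usage sketch of the request API extended above (the attribute name and value are placeholders, and the DecommissionAttribute import path is an assumption; the DecommissionRequest package is taken from the imports shown later in this diff), the new flag can be supplied at construction time or through the fluent setter:

```java
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.cluster.decommission.DecommissionAttribute; // package assumed, not shown in the diff

public final class DecommissionRequestSketch {
    public static void main(String[] args) {
        DecommissionAttribute attribute = new DecommissionAttribute("zone", "zone-a"); // placeholder values

        // Internal retry path: the request may be re-dispatched after a cluster manager change.
        DecommissionRequest retryable = new DecommissionRequest(attribute, true);

        // The single-argument constructor defaults the flag to false; the setter can flip it later.
        DecommissionRequest plain = new DecommissionRequest(attribute).setRetryOnClusterManagerChange(false);

        System.out.println(retryable.retryOnClusterManagerChange()); // true
        System.out.println(plain.retryOnClusterManagerChange());     // false
    }
}
```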
File: DecommissionRequestBuilder.java
@@ -46,4 +46,15 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return current object
*/
public DecommissionRequestBuilder setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
request.setRetryOnClusterManagerChange(retryOnClusterManagerChange);
return this;
}
}
File: DecommissionController.java
@@ -12,6 +12,9 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
@@ -74,6 +77,57 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* Sends a transport call to retry the decommission action, provided that -
* 1. the cluster_manager_node_timeout has not yet elapsed, and
* 2. a cluster manager change has occurred
*
* @param decommissionRequest decommission request object
* @param startTime start time of previous request
* @param listener callback for the retry action
*/
public void retryDecommissionAction(
DecommissionRequest decommissionRequest,
long startTime,
ActionListener<DecommissionResponse> listener
) {
final long remainingTimeoutMS = decommissionRequest.clusterManagerNodeTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
if (remainingTimeoutMS <= 0) {
String errorMsg = "cluster manager node timed out before retrying [" + DecommissionAction.NAME + "] for attribute [" + decommissionRequest.getDecommissionAttribute() + "] after cluster manager change";
logger.debug(errorMsg);
listener.onFailure(new OpenSearchTimeoutException(errorMsg));
return;
}
decommissionRequest.setRetryOnClusterManagerChange(true);
decommissionRequest.clusterManagerNodeTimeout(TimeValue.timeValueMillis(remainingTimeoutMS));
transportService.sendRequest(
transportService.getLocalNode(),
DecommissionAction.NAME,
decommissionRequest,
new TransportResponseHandler<DecommissionResponse>() {
@Override
public void handleResponse(DecommissionResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public DecommissionResponse read(StreamInput in) throws IOException {
return new DecommissionResponse(in);
}
}
);
}

/**
* This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor}
* Once the tasks are submitted, it waits for an expected cluster state to guarantee
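
The retry in retryDecommissionAction above keeps the original cluster_manager_node_timeout as an overall budget: the time already spent since the first attempt is subtracted, and if nothing is left the listener fails with a timeout instead of a retry being dispatched. A dependency-free sketch of that bookkeeping (the numbers are illustrative only, not taken from the PR):

```java
// Standalone illustration of the timeout budgeting used by retryDecommissionAction:
// the retried request is dispatched with whatever remains of the original timeout.
public final class RetryBudgetSketch {

    /** Remaining budget in milliseconds; a value <= 0 means the retry should not be attempted. */
    static long remainingTimeoutMillis(long clusterManagerNodeTimeoutMillis, long startTimeMillis, long nowMillis) {
        return clusterManagerNodeTimeoutMillis - (nowMillis - startTimeMillis);
    }

    public static void main(String[] args) {
        long timeoutMillis = 30_000; // e.g. a 30s cluster_manager_node_timeout on the original request
        long startMillis = 0;        // relative time when the original request was accepted
        long nowMillis = 12_500;     // relative time when the cluster manager change is observed

        long remaining = remainingTimeoutMillis(timeoutMillis, startMillis, nowMillis);
        if (remaining <= 0) {
            System.out.println("budget exhausted: fail the listener with a timeout, do not retry");
        } else {
            System.out.println("retry with a reduced timeout of " + remaining + " ms"); // 17500 ms
        }
    }
}
```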
File: DecommissionService.java
@@ -74,6 +74,7 @@ public class DecommissionService {
private final TransportService transportService;
private final ThreadPool threadPool;
private final DecommissionController decommissionController;
private final long startTime;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;
private volatile int maxVotingConfigExclusions;
@@ -91,6 +92,7 @@ public DecommissionService(
this.transportService = transportService;
this.threadPool = threadPool;
this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool);
this.startTime = threadPool.relativeTimeInMillis();
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);

@@ -127,7 +129,7 @@ private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) {
* Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT}
* Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration
*
* @param decommissionRequest decommission request Object
* @param decommissionRequest request for decommission action
* @param listener register decommission listener
*/
public void startDecommissionAction(
@@ -146,6 +148,7 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed and attribute is weighed away
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata);
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute);

ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute);
@@ -238,18 +241,22 @@ public void onNewClusterState(ClusterState state) {
drainNodesWithDecommissionedAttribute(decommissionRequest);
}
} else {
// explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader
// this ensures that the request is retried until the cluster manager timeout elapses
logger.info(
"local node is not eligible to process the request, "
+ "throwing NotClusterManagerException to attempt a retry on an eligible node"
);
listener.onFailure(
new NotClusterManagerException(
"node ["
+ transportService.getLocalNode().toString()
+ "] not eligible to execute decommission request. Will retry until timeout."
)
// the local node is no longer the cluster manager (for example due to leader abdication),
// so retry the decommission action until the request times out
logger.info("local node is not eligible to process the request, retrying the transport action until it times out");
decommissionController.retryDecommissionAction(
decommissionRequest,
startTime,
ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(
() -> new ParameterizedMessage(
"failed to retry decommission action for attribute [{}]",
decommissionRequest.getDecommissionAttribute()
),
t
);
delegatedListener.onFailure(t);
})
);
}
}
@@ -498,6 +505,21 @@ private static void ensureEligibleRequest(
}
}

private static void ensureEligibleRetry(
DecommissionRequest decommissionRequest,
DecommissionAttributeMetadata decommissionAttributeMetadata
) {
if (decommissionAttributeMetadata != null) {
if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT)
&& decommissionRequest.retryOnClusterManagerChange() == false) {
throw new DecommissioningFailedException(
decommissionRequest.getDecommissionAttribute(),
"concurrent request received to decommission attribute"
);
}
}
}

private ActionListener<DecommissionStatus> statusUpdateListener() {
return new ActionListener<>() {
@Override
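
The new ensureEligibleRetry check is the concurrency control from the PR title: while an earlier request is still registered with INIT status, only a request explicitly marked as a cluster-manager-change retry may proceed; anything else is treated as a concurrent request. A dependency-free sketch of that rule (the enum is a simplified stand-in for DecommissionStatus, and a null status stands for "no decommission registered yet"):

```java
// Simplified stand-ins for DecommissionStatus / DecommissionAttributeMetadata, used only to
// show the retry-eligibility rule in isolation.
enum Status { INIT, IN_PROGRESS, SUCCESSFUL, FAILED }

final class RetryEligibilitySketch {

    /** Mirrors ensureEligibleRetry: reject a non-retry request while an earlier one is still in INIT. */
    static void ensureEligibleRetry(Status existingStatus, boolean retryOnClusterManagerChange) {
        if (existingStatus == Status.INIT && !retryOnClusterManagerChange) {
            throw new IllegalStateException("concurrent request received to decommission attribute");
        }
    }

    public static void main(String[] args) {
        ensureEligibleRetry(null, false);        // no decommission registered yet: proceeds
        ensureEligibleRetry(Status.INIT, true);  // internal retry after a leader change: proceeds
        ensureEligibleRetry(Status.INIT, false); // a second, concurrent external request: throws
    }
}
```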
File: REST handler for the decommission action
@@ -58,6 +58,7 @@ DecommissionRequest createRequest(RestRequest request) throws IOException {
TimeValue delayTimeout = request.paramAsTime("delay_timeout", DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT);
decommissionRequest.setDelayTimeout(delayTimeout);
}
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue));
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue))
.setRetryOnClusterManagerChange(false);
}
}
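
As a small follow-up sketch (a fragment only, reusing the classes from the diff; the attribute values are placeholders), this is the shape of request the REST layer now produces: externally submitted requests always start with the retry flag off, and only the internal retry path in DecommissionController flips it on.

```java
// Fragment mirroring the chain in createRequest: requests built from the REST endpoint are
// never pre-marked as retries, so retryOnClusterManagerChange is true only for internal retries.
DecommissionRequest fromRest = new DecommissionRequest()
    .setDecommissionAttribute(new DecommissionAttribute("zone", "zone-a")) // placeholder values
    .setRetryOnClusterManagerChange(false);
```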