Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decommission retry/pr #66

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

Expand All @@ -28,22 +29,34 @@
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

private DecommissionAttribute decommissionAttribute;
private boolean retryOnClusterManagerChange;
private TimeValue retryTimeout;

public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
public DecommissionRequest(DecommissionAttribute decommissionAttribute, boolean retryOnClusterManagerChange, TimeValue retryTimeout) {
this.decommissionAttribute = decommissionAttribute;
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
this.retryTimeout = retryTimeout;
}

public DecommissionRequest(DecommissionAttribute decommissionAttribute, TimeValue retryTimeout) {
this(decommissionAttribute, false, retryTimeout);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we making sure that user doesn't specify this parameter ? did we evaluate putting this in request context as it is an internal detail of a request and not a user facing one ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about the retry flag or the timeout?
For flag, The REST action doesn't support accepting retry flag with the user and hence any request created at REST layer will always have it set to false. Today, it is only set to true when we trigger retry action from service
For timeout, the default is set and user can set one according to his case

Let me know if this answers your question

}

public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
retryOnClusterManagerChange = in.readBoolean();
retryTimeout = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
out.writeBoolean(retryOnClusterManagerChange);
out.writeTimeValue(retryTimeout);
}

/**
Expand All @@ -57,13 +70,49 @@ public DecommissionRequest setDecommissionAttribute(DecommissionAttribute decomm
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return this request
*/
public DecommissionRequest setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
this.retryOnClusterManagerChange = retryOnClusterManagerChange;
return this;
}

/**
* Sets the retry timeout for the request
*
* @param retryTimeout retry time out for the request
* @return this request
*/
public DecommissionRequest setRetryTimeout(TimeValue retryTimeout) {
this.retryTimeout = retryTimeout;
return this;
}

/**
* @return Returns the decommission attribute key-value
*/
public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

/**
* @return Returns whether decommission is retry eligible on cluster manager change
*/
public boolean retryOnClusterManagerChange() {
return this.retryOnClusterManagerChange;
}

/**
* @return retry timeout
*/
public TimeValue getRetryTimeout() {
return this.retryTimeout;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -78,6 +127,13 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", retryOnClusterManagerChange="
+ retryOnClusterManagerChange
+ ", retryTimeout="
+ retryTimeout
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;

/**
* Register decommission request builder
Expand All @@ -35,4 +36,26 @@ public DecommissionRequestBuilder setDecommissionedAttribute(DecommissionAttribu
request.setDecommissionAttribute(decommissionAttribute);
return this;
}

/**
* Sets retryOnClusterManagerChange for decommission request
*
* @param retryOnClusterManagerChange boolean for request to retry decommission action on cluster manager change
* @return current object
*/
public DecommissionRequestBuilder setRetryOnClusterManagerChange(boolean retryOnClusterManagerChange) {
request.setRetryOnClusterManagerChange(retryOnClusterManagerChange);
return this;
}

/**
* Sets the retry timeout for the decommission request
*
* @param retryTimeout retry time out for the request
* @return current object
*/
public DecommissionRequestBuilder setRetryTimeout(TimeValue retryTimeout) {
request.setRetryTimeout(retryTimeout);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener);
decommissionService.startDecommissionAction(request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand Down Expand Up @@ -72,6 +75,57 @@ public class DecommissionController {
this.threadPool = threadPool;
}

public void retryDecommissionAction(
DecommissionRequest decommissionRequest,
long startTime,
ActionListener<DecommissionResponse> listener
) {
final long remainingTimeoutMS = decommissionRequest.getRetryTimeout().millis() - (threadPool.relativeTimeInMillis() - startTime);
if (remainingTimeoutMS <= 0) {
logger.debug(
"timed out before retrying [{}] for attribute [{}] after cluster manager change",
DecommissionAction.NAME,
decommissionRequest.getDecommissionAttribute()
);
listener.onFailure(
new OpenSearchTimeoutException(
"timed out before retrying [{}] for attribute [{}] after cluster manager change",
DecommissionAction.NAME,
decommissionRequest.getDecommissionAttribute()
)
);
return;
}
Comment on lines +83 to +98
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need remainingTimeoutMS check here only ? Why do we need it ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout is a check for retry eligibility only. Other actions as part of decommission has their own timeouts. This is the place where we are actually triggering the retry and hence the check. If timed out, request will not be eligible for a retry

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do without this ? In worst case, the retried action will timeout and we will throw a timeout exception . This part of the code is looking out of place from readability POV.

decommissionRequest.setRetryOnClusterManagerChange(true);
decommissionRequest.setRetryTimeout(TimeValue.timeValueMillis(remainingTimeoutMS));
transportService.sendRequest(
transportService.getLocalNode(),
DecommissionAction.NAME,
decommissionRequest,
new TransportResponseHandler<DecommissionResponse>() {
@Override
public void handleResponse(DecommissionResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public DecommissionResponse read(StreamInput in) throws IOException {
return new DecommissionResponse(in);
}
}
);
}

/**
* Transport call to add nodes to voting config exclusion
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -66,6 +66,7 @@ public class DecommissionService {
private final TransportService transportService;
private final ThreadPool threadPool;
private final DecommissionController decommissionController;
private final long startTime;
private volatile List<String> awarenessAttributes;
private volatile Map<String, List<String>> forcedAwarenessAttributes;

Expand All @@ -82,6 +83,7 @@ public DecommissionService(
this.transportService = transportService;
this.threadPool = threadPool;
this.decommissionController = new DecommissionController(clusterService, transportService, allocationService, threadPool);
this.startTime = threadPool.relativeTimeInMillis();
this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);

Expand Down Expand Up @@ -112,13 +114,14 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
* Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT}
* Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration
*
* @param decommissionAttribute register decommission attribute in the metadata request
* @param decommissionRequest request for decommission action
* @param listener register decommission listener
*/
public void startDecommissionAction(
final DecommissionAttribute decommissionAttribute,
final DecommissionRequest decommissionRequest,
final ActionListener<DecommissionResponse> listener
) {
final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute();
// register the metadata with status as INIT as first step
clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
Expand All @@ -128,6 +131,7 @@ public ClusterState execute(ClusterState currentState) {
DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata();
// check that request is eligible to proceed
ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute);
ensureEligibleRetry(decommissionRequest, decommissionAttributeMetadata);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute);
logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString());
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -156,15 +160,17 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
decommissionAttributeMetadata.decommissionAttribute(),
decommissionAttributeMetadata.status()
);
decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener);
assert decommissionAttributeMetadata.decommissionAttribute().equals(decommissionRequest.getDecommissionAttribute());
decommissionClusterManagerNodes(decommissionRequest, listener);
}
});
}

private synchronized void decommissionClusterManagerNodes(
final DecommissionAttribute decommissionAttribute,
final DecommissionRequest decommissionRequest,
ActionListener<DecommissionResponse> listener
) {
final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute();
ClusterState state = clusterService.getClusterApplierService().state();
// since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further
// join the cluster
Expand Down Expand Up @@ -210,18 +216,23 @@ public void onResponse(Void unused) {
failDecommissionedNodes(clusterService.getClusterApplierService().state());
}
} else {
// explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager
// this will ensures that request is retried until cluster manager times out
logger.info(
"local node is not eligible to process the request, "
+ "throwing NotClusterManagerException to attempt a retry on an eligible node"
);
listener.onFailure(
new NotClusterManagerException(
"node ["
+ transportService.getLocalNode().toString()
+ "] not eligible to execute decommission request. Will retry until timeout."
)
// since the local node is no longer cluster manager which could've happened due to leader abdication,
// hence retrying the decommission action until it times out
logger.info("local node is not eligible to process the request, " + "retrying the transport action until it times out");
decommissionController.retryDecommissionAction(
decommissionRequest,
startTime,
ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(
() -> new ParameterizedMessage(
"failed to retry decommission action for attribute [{}]",
decommissionRequest.getDecommissionAttribute()
),
t
);
clearVotingConfigExclusionAndUpdateStatus(false, false); // TODO - need to test this
delegatedListener.onFailure(t);
})
);
Comment on lines -213 to 236
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are responding to user API call now ? Earlier , the retried request on new elected cluster manager would respond it . Would the same hold true now as well ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, response to the retried request is attached to the listener of original request. The chain will continue if multiple retries gets executed.

}
}
Expand Down Expand Up @@ -468,6 +479,18 @@ private static void ensureEligibleRequest(
}
}

public void ensureEligibleRetry(DecommissionRequest decommissionRequest, DecommissionAttributeMetadata decommissionAttributeMetadata) {
if (decommissionAttributeMetadata != null) {
if (decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT)
&& decommissionRequest.retryOnClusterManagerChange() == false) {
throw new DecommissioningFailedException(
decommissionRequest.getDecommissionAttribute(),
"concurrent request received to decommission attribute"
);
}
}
}

private ActionListener<DecommissionStatus> statusUpdateListener() {
return new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.client.Requests;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
Expand All @@ -29,6 +30,9 @@
*/
public class RestDecommissionAction extends BaseRestHandler {

private static final TimeValue DEFAULT_RETRY_TIMEOUT = TimeValue.timeValueMinutes(5L); // setting sufficiently large default retry
// timeout

@Override
public List<Route> routes() {
return singletonList(new Route(PUT, "/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}"));
Expand Down Expand Up @@ -56,6 +60,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException {
if (request.hasParam("awareness_attribute_value")) {
attributeValue = request.param("awareness_attribute_value");
}
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue));
// for REST request, we will set the retry flag to false. User won't have the option to execute retry on REST
return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue))
.setRetryOnClusterManagerChange(false)
.setRetryTimeout(
TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_RETRY_TIMEOUT, getClass().getSimpleName() + ".timeout")
);
}
}
Loading