Skip to content

Commit

Permalink
Cluster health call to throw decommissioned exception for local decom…
Browse files Browse the repository at this point in the history
…missioned nodes

Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jan 25, 2023
1 parent 715ff72 commit bd1df17
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
"awareness_attribute":{
"type":"string",
"description":"The awareness attribute for which the health is required"
},
"ensureLocalNodeCommissioned":{
"type":"boolean",
"description": "Checks whether local node is commissioned or not. If set to true on a local call it will throw exception if node is decommissioned (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ public ActionRequestValidationException validate() {
} else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) {
return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
}
if (ensureLocalNodeCommissioned && local == false) {
return addValidationError("not a local request to ensure local node commissioned", null);
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -57,6 +59,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
Expand All @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final AllocationService allocationService;
private final Discovery discovery;

@Inject
public TransportClusterHealthAction(
Expand All @@ -85,7 +89,8 @@ public TransportClusterHealthAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService
AllocationService allocationService,
Discovery discovery
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -98,6 +103,7 @@ public TransportClusterHealthAction(
indexNameExpressionResolver
);
this.allocationService = allocationService;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -134,7 +140,9 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {

if (request.ensureLocalNodeCommissioned() && discovery instanceof Coordinator && ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
}
final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ public abstract class ClusterManagerNodeReadRequest<Request extends ClusterManag
ClusterManagerNodeRequest<Request> {

protected boolean local = false;
protected boolean ensureLocalNodeCommissioned = false;

protected ClusterManagerNodeReadRequest() {}

protected ClusterManagerNodeReadRequest(StreamInput in) throws IOException {
super(in);
local = in.readBoolean();
ensureLocalNodeCommissioned = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(local);
out.writeBoolean(ensureLocalNodeCommissioned);
}

@SuppressWarnings("unchecked")
Expand All @@ -66,6 +69,12 @@ public final Request local(boolean local) {
return (Request) this;
}

@SuppressWarnings("unchecked")
public final Request ensureLocalNodeCommissioned(boolean ensureLocalNodeCommissioned) {
this.ensureLocalNodeCommissioned = ensureLocalNodeCommissioned;
return (Request) this;
}

/**
* Return local information, do not retrieve the state from cluster-manager node (default: false).
* @return <code>true</code> if local information is to be returned;
Expand All @@ -74,4 +83,13 @@ public final Request local(boolean local) {
public final boolean local() {
return local;
}

/**
* For a given local request, checks if the local node is commissioned or not (default: false).
* @return <code>true</code> if local information is to be returned only when local node is also commissioned
* <code>false</code> to not check local node if commissioned or not for a local request
*/
public final boolean ensureLocalNodeCommissioned() {
return ensureLocalNodeCommissioned;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1437,8 +1437,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
public boolean localNodeCommissioned() {
return localNodeCommissioned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) {
public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.UNPROCESSABLE_ENTITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) {
final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions()));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.ensureLocalNodeCommissioned(request.paramAsBoolean("ensureLocalNodeCommissioned", clusterHealthRequest.ensureLocalNodeCommissioned()));
clusterHealthRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null), null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down

0 comments on commit bd1df17

Please sign in to comment.