From 53a4caee175f131a90d0a757ecd96e113d7bfc20 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 4 Jan 2023 16:10:36 +0530 Subject: [PATCH] [Backport 2.x] Validate attributes of routing nodes for Routing Weights API (#5691) * Validate attributes of routing nodes for Routing Weights API (#5607) * Changes following things in Routing Weights API and Decommission Action - a. Consider only routing node's attribute for validation instead of all nodes b. Ensure no more than 50% node has weight zero c. Removal of decommission action's direct dependence on force zone awareness Signed-off-by: Rishab Nahata --- .../AwarenessAttributeDecommissionIT.java | 98 +++++++++++++++++-- .../put/ClusterPutWeightedRoutingRequest.java | 22 ++++- .../decommission/DecommissionService.java | 15 ++- .../routing/WeightedRoutingService.java | 36 +++++-- ...ClusterPutWeightedRoutingRequestTests.java | 10 ++ .../DecommissionServiceTests.java | 27 ----- .../routing/WeightedRoutingServiceTests.java | 36 ++++++- 7 files changed, 197 insertions(+), 47 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 54765650cd202..8dde5f3dc3a93 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -781,15 +781,15 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception }); } - public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { + public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Exception { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.force.zone.values", "a") + .put("cluster.routing.allocation.awareness.force.zone.values", "b") // force zone values is only set for zones of routing nodes .build(); - // Start 3 cluster manager eligible nodes + // Start 3 cluster manager eligible nodes in zone a internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); - // start 3 data nodes - internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + // Start 3 data nodes in zone b + internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); ensureStableCluster(6); ClusterHealthResponse health = client().admin() .cluster() @@ -802,7 +802,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { assertFalse(health.isTimedOut()); logger.info("--> setting shard routing weights"); - Map weights = Map.of("a", 0.0); + Map weights = Map.of("b", 1.0); // weights are expected to be set only for routing nodes WeightedRouting weightedRouting = new WeightedRouting("zone", weights); ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() @@ -868,6 +868,92 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { ensureStableCluster(6, TimeValue.timeValueMinutes(2)); } + public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'd' & 'e' & 'f'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "d") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "e") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "f") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertTrue(weightedRoutingResponse.isAcknowledged()); + + logger.info("--> starting decommissioning nodes in zone {}", 'd'); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "d"); + // Set the timeout to 0 to do immediate Decommission + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); + DecommissionResponse decommissionResponse = client(dataNodes.get(0)).execute(DecommissionAction.INSTANCE, decommissionRequest) + .get(); + assertTrue(decommissionResponse.isAcknowledged()); + + client(dataNodes.get(0)).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + ClusterState clusterState = client(dataNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); + + // assert that number of nodes should be 5 ( 2 cluster manager nodes + 3 data nodes ) + assertEquals(clusterState.nodes().getNodes().size(), 5); + assertEquals(clusterState.nodes().getDataNodes().size(), 3); + assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(dataNodes.get(0)).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } + private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener { final CountDownLatch doneLatch; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index 5474f4effa829..8adbf13a000c5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import static org.opensearch.action.ValidateActions.addValidationError; @@ -126,17 +127,36 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } + int countValueWithZeroWeights = 0; + double weight; try { for (Object value : weightedRouting.weights().values()) { if (value == null) { validationException = addValidationError(("Weight is null"), validationException); } else { - Double.parseDouble(value.toString()); + weight = Double.parseDouble(value.toString()); + countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights; } } } catch (NumberFormatException e) { validationException = addValidationError(("Weight is not a number"), validationException); } + // Returning validation exception here itself if it is not null, so we can have a descriptive message for the count check + if (validationException != null) { + return validationException; + } + if (countValueWithZeroWeights > weightedRouting.weights().size() / 2) { + validationException = addValidationError( + (String.format( + Locale.ROOT, + "There are too many attribute values [%s] given zero weight [%d]. Maximum expected number of routing weights having zero weight is [%d]", + weightedRouting.weights().toString(), + countValueWithZeroWeights, + weightedRouting.weights().size() / 2 + )), + null + ); + } return validationException; } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index f36d7b3e06da9..4139ad8d36ed0 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -415,10 +415,12 @@ private static void validateAwarenessAttribute( msg = "invalid awareness attribute requested for decommissioning"; } else if (forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) { msg = "forced awareness attribute [" + forcedAwarenessAttributes.toString() + "] doesn't have the decommissioning attribute"; - } else if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName()) - .contains(decommissionAttribute.attributeValue()) == false) { - msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission"; - } + } + // we don't need to check for attributes presence in forced awareness attribute because, weights API ensures that weights are set + // for all discovered routing attributes and forced attributes. + // So, if the weight is not present for the attribute it could mean its a non routing node (eg. cluster manager) + // And in that case, we are ok to proceed with the decommission. A routing node's attribute absence in forced awareness attribute is + // a problem elsewhere if (msg != null) { throw new DecommissioningFailedException(decommissionAttribute, msg); @@ -440,8 +442,11 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st "no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]" ); } + // in case the weight is not set for the attribute value, then we know that attribute values was not part of discovered routing node + // attribute or forced awareness attribute and in that case, we are ok if the attribute's value weight is not set. But if it's set, + // its weight has to be zero Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue()); - if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) { + if (attributeValueWeight != null && attributeValueWeight.equals(0.0) == false) { throw new DecommissioningFailedException( decommissionAttribute, "weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]" diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 2b5961c7340c1..895b790f8499a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -8,6 +8,8 @@ package org.opensearch.cluster.routing; +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -40,6 +42,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.action.ValidateActions.addValidationError; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; @@ -189,12 +192,12 @@ public void verifyAwarenessAttribute(String attributeName) { private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) { String attributeName = request.getWeightedRouting().attributeName(); + // build attr_value -> nodes map + ObjectIntHashMap nodesPerAttribute = state.getRoutingNodes().nodesPerAttributesCounts(attributeName); Set discoveredAwarenessValues = new HashSet<>(); - state.nodes().forEach(node -> { - if (node.getAttributes().containsKey(attributeName)) { - discoveredAwarenessValues.add(node.getAttributes().get(attributeName)); - } - }); + for (ObjectCursor stringObjectCursor : nodesPerAttribute.keys()) { + if (stringObjectCursor.value != null) discoveredAwarenessValues.add(stringObjectCursor.value); + } Set allAwarenessValues; if (forcedAwarenessAttributes.get(attributeName) == null) { allAwarenessValues = new HashSet<>(); @@ -202,13 +205,34 @@ private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterSta allAwarenessValues = new HashSet<>(forcedAwarenessAttributes.get(attributeName)); } allAwarenessValues.addAll(discoveredAwarenessValues); + AtomicInteger countWithZeroWeight = new AtomicInteger(); allAwarenessValues.forEach(awarenessValue -> { if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) { throw new UnsupportedWeightedRoutingStateException( - "weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute." + "weight for [" + + awarenessValue + + "] is not set and it is part of forced awareness value or a routing node has this attribute." ); } + if (request.getWeightedRouting().weights().get(awarenessValue) == 0) { + countWithZeroWeight.addAndGet(1); + } }); + // We have validations in place to check that not more than half of the values weights are set to 0 in the request object + // Adding this check again here on allAwarenessValues such that in no case we land up in a situation where more than half of + // discovered awareness values has weight zero + if (countWithZeroWeight.get() > allAwarenessValues.size() / 2) { + throw addValidationError( + (String.format( + Locale.ROOT, + "There are too many discovered attribute values [%s] given zero weight [%d]. Maximum expected number of routing weights having zero weight is [%d]", + request.getWeightedRouting().weights().toString(), + countWithZeroWeight.get(), + allAwarenessValues.size() / 2 + )), + null + ); + } } private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index cdec66d6683eb..5e456158941b8 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -53,4 +53,14 @@ public void testValidate_AttributeMissing() { assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing")); } + public void testValidate_MoreThanHalfWithZeroWeight() { + String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue( + actionRequestValidationException.getMessage().contains("Maximum expected number of routing weights having zero weight is [1]") + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index abbef29208aef..989ba70533de0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -145,33 +145,6 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } - @SuppressWarnings("unchecked") - public void testDecommissioningNotStartedForInvalidAttributeValue() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "rack-a"); - ActionListener listener = new ActionListener() { - @Override - public void onResponse(DecommissionResponse decommissionResponse) { - fail("on response shouldn't have been called"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(e instanceof DecommissioningFailedException); - assertThat( - e.getMessage(), - Matchers.endsWith( - "invalid awareness attribute value requested for decommissioning. " - + "Set forced awareness values before to decommission" - ) - ); - countDownLatch.countDown(); - } - }; - decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - } - public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException { Map weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0); setWeightedRoutingWeights(weights); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 89d9555fe225b..089fb453ca2c0 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -202,7 +202,7 @@ public void testRegisterWeightedRoutingMetadataWithChangedWeights() throws Inter client, ClusterAddWeightedRoutingAction.INSTANCE ); - WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 0.0)); + WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 0.0)); request.setWeightedRouting(updatedWeightedRouting); final CountDownLatch countDownLatch = new CountDownLatch(1); ActionListener listener = new ActionListener() { @@ -323,7 +323,39 @@ public void onFailure(Exception e) { MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class)); MatcherAssert.assertThat( exceptionReference.get().getMessage(), - containsString("weight for [zone_B] is not set and it is part of forced awareness value or a node has this attribute.") + containsString("weight for [zone_B] is not set and it is part of forced awareness value or a routing node has this attribute.") + ); + } + + public void testAddWeightedRoutingFailsWhenWeightsForMoreThanHalfIsZero() throws InterruptedException { + ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder( + client, + ClusterAddWeightedRoutingAction.INSTANCE + ); + Map weights = Map.of("zone_A", 0.0, "zone_B", 0.0, "zone_C", 1.0, "zone_D", 1.0, "zone_E", 1.0, "zone_F", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(weightedRouting); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference exceptionReference = new AtomicReference<>(); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionReference.set(e); + countDownLatch.countDown(); + } + }; + weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue()); + MatcherAssert.assertThat(exceptionReference.get(), instanceOf(ActionRequestValidationException.class)); + MatcherAssert.assertThat( + exceptionReference.get().getMessage(), + containsString("Maximum expected number of routing weights having zero weight is [1]") ); }