From 6ac0c7cd2054da9a6492c76011d1dcd2e0f518b8 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Mon, 17 Oct 2022 14:51:56 -0700 Subject: [PATCH 1/9] Fix recovery path for searchable snapshots (#4813) Replicas are bootstrapped using the recovery path, as opposed to the restore path used for creating the primary shard. This has been broken in the initial implementation of searchable snapshots. The fix here is to put in the appropriate checks to avoid failing during recovery. I've also updated the integration test to ensure the recovery path is always exercised during testing. Signed-off-by: Andrew Ross Signed-off-by: Andrew Ross --- CHANGELOG.md | 1 + .../opensearch/snapshots/SearchableSnapshotIT.java | 13 +++++++++++-- .../indices/recovery/PeerRecoveryTargetService.java | 9 ++++++--- .../opensearch/indices/recovery/RecoveryTarget.java | 11 +++++++---- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 963ccdcdc9a6b..25efcc838553a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,6 +137,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) - Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) +- Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index 96fcf0053c9ab..0ab025bb575cc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -49,15 +49,24 @@ protected boolean addMockInternalEngine() { } public void testCreateSearchableSnapshot() throws Exception { + final int numReplicasIndex1 = randomIntBetween(1, 4); + final int numReplicasIndex2 = randomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1); final Client client = client(); createRepository("test-repo", "fs"); createIndex( "test-idx-1", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build() + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .build() ); createIndex( "test-idx-2", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build() + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + .build() ); ensureGreen(); indexRandomDocs("test-idx-1", 100); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index b5702431ed4bf..556e4db3400e1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -54,6 +54,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.mapper.MapperException; @@ -244,8 +245,10 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); - final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); + final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final boolean hasNoTranslog = IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings()); + final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false; + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; startRequest = getStartRecoveryRequest( @@ -253,7 +256,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi clusterService.localNode(), recoveryTarget, startingSeqNo, - !remoteTranslogEnabled + verifyTranslog ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index c1e29e0d866d8..b6122dbeeea09 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -44,6 +44,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.IndexModule; import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.MapperException; import org.opensearch.index.seqno.ReplicationTracker; @@ -355,10 +356,12 @@ public void cleanFiles( try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata); - // If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index. - // With Segrep, replicas should never create their own commit points. This ensures the index and xlog share the same - // UUID without the extra step to associate the index with a new xlog. - if (indexShard.indexSettings().isSegRepEnabled()) { + // Replicas for segment replication or remote snapshot indices do not create + // their own commit points and therefore do not modify the commit user data + // in their store. In these cases, reuse the primary's translog UUID. + final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled() + || IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings()); + if (reuseTranslogUUID) { final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY); Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), From 1d654851ff2ea50b0fbb07b7602945f3a0e4cc88 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Mon, 17 Oct 2022 16:20:47 -0700 Subject: [PATCH 2/9] Refactor BalancedAllocator.Balancer to LocalShardsBalancer (#4761) * Refactor BalancedAllocator.Balancer to LocalShardsBalancer Signed-off-by: Kunal Kotwani * Deprecate Balancer to maintain BWC Signed-off-by: Kunal Kotwani Signed-off-by: Kunal Kotwani --- CHANGELOG.md | 1 + .../allocation/AllocationConstraints.java | 7 +- .../allocator/BalancedShardsAllocator.java | 967 +----------------- .../allocator/LocalShardsBalancer.java | 967 ++++++++++++++++++ .../allocation/allocator/ShardsBalancer.java | 75 ++ .../AllocationConstraintsTests.java | 4 +- .../allocation/BalancedSingleShardTests.java | 4 +- 7 files changed, 1083 insertions(+), 942 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 25efcc838553a..4f6e5425f37db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691)) - Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732)) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) +- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761)) ### Deprecated ### Removed - Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 8c2c85ce107a6..3d9847ca35931 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -6,6 +6,7 @@ package org.opensearch.cluster.routing.allocation; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import java.util.ArrayList; import java.util.List; @@ -27,11 +28,11 @@ public AllocationConstraints() { } class ConstraintParams { - private BalancedShardsAllocator.Balancer balancer; + private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; private String index; - ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { this.balancer = balancer; this.node = node; this.index = index; @@ -50,7 +51,7 @@ class ConstraintParams { * This weight function is used only in case of unassigned shards to avoid overloading a newly added node. * Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this function. */ - public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { int constraintsBreached = 0; ConstraintParams params = new ConstraintParams(balancer, node, index); for (Predicate predicate : constraintPredicates) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 181910e3ac1c4..42c8f7987bf3d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -34,47 +34,28 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; -import org.opensearch.cluster.routing.allocation.AllocationDecision; import org.opensearch.cluster.routing.allocation.MoveDecision; -import org.opensearch.cluster.routing.allocation.NodeAllocationResult; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; -import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.cluster.routing.allocation.decider.Decision.Type; -import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.gateway.PriorityComparator; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.StreamSupport; - -import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; /** * The {@link BalancedShardsAllocator} re-balances the nodes allocations @@ -160,23 +141,23 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); - balancer.allocateUnassigned(); - balancer.moveShards(); - balancer.balance(); + final ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + localShardsBalancer.allocateUnassigned(); + localShardsBalancer.moveShards(); + localShardsBalancer.balance(); } @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); + ShardsBalancer localShardsBalancer = new LocalShardsBalancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { - allocateUnassignedDecision = balancer.decideAllocateUnassigned(shard); + allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard); } else { - moveDecision = balancer.decideMove(shard); + moveDecision = localShardsBalancer.decideMove(shard); if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) { - MoveDecision rebalanceDecision = balancer.decideRebalance(shard); + MoveDecision rebalanceDecision = localShardsBalancer.decideRebalance(shard); moveDecision = rebalanceDecision.withRemainDecision(moveDecision.getCanRemainDecision()); } } @@ -277,923 +258,18 @@ static class WeightFunction { this.constraints = new AllocationConstraints(); } - public float weightWithAllocationConstraints(Balancer balancer, ModelNode node, String index) { + public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); return balancerWeight + constraints.weight(balancer, node, index); } - float weight(Balancer balancer, ModelNode node, String index) { + float weight(ShardsBalancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } } - /** - * A {@link Balancer} - * - * @opensearch.internal - */ - public static class Balancer { - private final Logger logger; - private final Map nodes; - private final RoutingAllocation allocation; - private final RoutingNodes routingNodes; - private final boolean movePrimaryFirst; - private final WeightFunction weight; - - private final float threshold; - private final Metadata metadata; - private final float avgShardsPerNode; - private final NodeSorter sorter; - private final Set inEligibleTargetNode; - - public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) { - this.logger = logger; - this.allocation = allocation; - this.movePrimaryFirst = movePrimaryFirst; - this.weight = weight; - this.threshold = threshold; - this.routingNodes = allocation.routingNodes(); - this.metadata = allocation.metadata(); - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - nodes = Collections.unmodifiableMap(buildModelFromAssigned()); - sorter = newNodeSorter(); - inEligibleTargetNode = new HashSet<>(); - } - - /** - * Returns an array view on the nodes in the balancer. Nodes should not be removed from this list. - */ - private ModelNode[] nodesArray() { - return nodes.values().toArray(new ModelNode[nodes.size()]); - } - - /** - * Returns the average of shards per node for the given index - */ - public float avgShardsPerNode(String index) { - return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); - } - - /** - * Returns the global average of shards per node - */ - public float avgShardsPerNode() { - return avgShardsPerNode; - } - - /** - * Returns a new {@link NodeSorter} that sorts the nodes based on their - * current weight with respect to the index passed to the sorter. The - * returned sorter is not sorted. Use {@link NodeSorter#reset(String)} - * to sort based on an index. - */ - private NodeSorter newNodeSorter() { - return new NodeSorter(nodesArray(), weight, this); - } - - /** - * The absolute value difference between two weights. - */ - private static float absDelta(float lower, float higher) { - assert higher >= lower : higher + " lt " + lower + " but was expected to be gte"; - return Math.abs(higher - lower); - } - - /** - * Returns {@code true} iff the weight delta between two nodes is under a defined threshold. - * See {@link #THRESHOLD_SETTING} for defining the threshold. - */ - private static boolean lessThan(float delta, float threshold) { - /* deltas close to the threshold are "rounded" to the threshold manually - to prevent floating point problems if the delta is very close to the - threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ - return delta <= (threshold + 0.001f); - } - - /** - * Balances the nodes on the cluster model according to the weight function. - * The actual balancing is delegated to {@link #balanceByWeights()} - */ - private void balance() { - if (logger.isTraceEnabled()) { - logger.trace("Start balancing cluster"); - } - if (allocation.hasPendingAsyncFetch()) { - /* - * see https://github.com/elastic/elasticsearch/issues/14387 - * if we allow rebalance operations while we are still fetching shard store data - * we might end up with unnecessary rebalance operations which can be super confusion/frustrating - * since once the fetches come back we might just move all the shards back again. - * Therefore we only do a rebalance if we have fetched all information. - */ - logger.debug("skipping rebalance due to in-flight shard/store fetches"); - return; - } - if (allocation.deciders().canRebalance(allocation).type() != Type.YES) { - logger.trace("skipping rebalance as it is disabled"); - return; - } - if (nodes.size() < 2) { /* skip if we only have one node */ - logger.trace("skipping rebalance as single node only"); - return; - } - balanceByWeights(); - } - - /** - * Makes a decision about moving a single shard to a different node to form a more - * optimally balanced cluster. This method is invoked from the cluster allocation - * explain API only. - */ - private MoveDecision decideRebalance(final ShardRouting shard) { - if (shard.started() == false) { - // we can only rebalance started shards - return MoveDecision.NOT_TAKEN; - } - - Decision canRebalance = allocation.deciders().canRebalance(shard, allocation); - - sorter.reset(shard.getIndexName()); - ModelNode[] modelNodes = sorter.modelNodes; - final String currentNodeId = shard.currentNodeId(); - // find currently assigned node - ModelNode currentNode = null; - for (ModelNode node : modelNodes) { - if (node.getNodeId().equals(currentNodeId)) { - currentNode = node; - break; - } - } - assert currentNode != null : "currently assigned node could not be found"; - - // balance the shard, if a better node can be found - final String idxName = shard.getIndexName(); - final float currentWeight = weight.weight(this, currentNode, idxName); - final AllocationDeciders deciders = allocation.deciders(); - Type rebalanceDecisionType = Type.NO; - ModelNode assignedNode = null; - List> betterBalanceNodes = new ArrayList<>(); - List> sameBalanceNodes = new ArrayList<>(); - List> worseBalanceNodes = new ArrayList<>(); - for (ModelNode node : modelNodes) { - if (node == currentNode) { - continue; // skip over node we're currently allocated to - } - final Decision canAllocate = deciders.canAllocate(shard, node.getRoutingNode(), allocation); - // the current weight of the node in the cluster, as computed by the weight function; - // this is a comparison of the number of shards on this node to the number of shards - // that should be on each node on average (both taking the cluster as a whole into account - // as well as shards per index) - final float nodeWeight = weight.weight(this, node, idxName); - // if the node we are examining has a worse (higher) weight than the node the shard is - // assigned to, then there is no way moving the shard to the node with the worse weight - // can make the balance of the cluster better, so we check for that here - final boolean betterWeightThanCurrent = nodeWeight <= currentWeight; - boolean rebalanceConditionsMet = false; - if (betterWeightThanCurrent) { - // get the delta between the weights of the node we are checking and the node that holds the shard - float currentDelta = absDelta(nodeWeight, currentWeight); - // checks if the weight delta is above a certain threshold; if it is not above a certain threshold, - // then even though the node we are examining has a better weight and may make the cluster balance - // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless - // the gains make it worth it, as defined by the threshold - boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false; - // calculate the delta of the weights of the two nodes if we were to add the shard to the - // node in question and move it away from the node that currently holds it. - // hence we add 2.0f to the weight delta - float proposedDelta = 2.0f + nodeWeight - currentWeight; - boolean betterWeightWithShardAdded = proposedDelta < currentDelta; - - rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded; - // if the simulated weight delta with the shard moved away is better than the weight delta - // with the shard remaining on the current node, and we are allowed to allocate to the - // node in question, then allow the rebalance - if (rebalanceConditionsMet && canAllocate.type().higherThan(rebalanceDecisionType)) { - // rebalance to the node, only will get overwritten if the decision here is to - // THROTTLE and we get a decision with YES on another node - rebalanceDecisionType = canAllocate.type(); - assignedNode = node; - } - } - Tuple nodeResult = Tuple.tuple(node, canAllocate); - if (rebalanceConditionsMet) { - betterBalanceNodes.add(nodeResult); - } else if (betterWeightThanCurrent) { - sameBalanceNodes.add(nodeResult); - } else { - worseBalanceNodes.add(nodeResult); - } - } - - int weightRanking = 0; - List nodeDecisions = new ArrayList<>(modelNodes.length - 1); - for (Tuple result : betterBalanceNodes) { - nodeDecisions.add( - new NodeAllocationResult( - result.v1().routingNode.node(), - AllocationDecision.fromDecisionType(result.v2().type()), - result.v2(), - ++weightRanking - ) - ); - } - int currentNodeWeightRanking = ++weightRanking; - for (Tuple result : sameBalanceNodes) { - AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE; - nodeDecisions.add( - new NodeAllocationResult(result.v1().routingNode.node(), nodeDecision, result.v2(), currentNodeWeightRanking) - ); - } - for (Tuple result : worseBalanceNodes) { - AllocationDecision nodeDecision = result.v2().type() == Type.NO ? AllocationDecision.NO : AllocationDecision.WORSE_BALANCE; - nodeDecisions.add(new NodeAllocationResult(result.v1().routingNode.node(), nodeDecision, result.v2(), ++weightRanking)); - } - - if (canRebalance.type() != Type.YES || allocation.hasPendingAsyncFetch()) { - AllocationDecision allocationDecision = allocation.hasPendingAsyncFetch() - ? AllocationDecision.AWAITING_INFO - : AllocationDecision.fromDecisionType(canRebalance.type()); - return MoveDecision.cannotRebalance(canRebalance, allocationDecision, currentNodeWeightRanking, nodeDecisions); - } else { - return MoveDecision.rebalance( - canRebalance, - AllocationDecision.fromDecisionType(rebalanceDecisionType), - assignedNode != null ? assignedNode.routingNode.node() : null, - currentNodeWeightRanking, - nodeDecisions - ); - } - } - - /** - * Balances the nodes on the cluster model according to the weight - * function. The configured threshold is the minimum delta between the - * weight of the maximum node and the minimum node according to the - * {@link WeightFunction}. This weight is calculated per index to - * distribute shards evenly per index. The balancer tries to relocate - * shards only if the delta exceeds the threshold. In the default case - * the threshold is set to {@code 1.0} to enforce gaining relocation - * only, or in other words relocations that move the weight delta closer - * to {@code 0.0} - */ - private void balanceByWeights() { - final AllocationDeciders deciders = allocation.deciders(); - final ModelNode[] modelNodes = sorter.modelNodes; - final float[] weights = sorter.weights; - for (String index : buildWeightOrderedIndices()) { - IndexMetadata indexMetadata = metadata.index(index); - - // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, - // move these nodes to the front of modelNodes so that we can only balance based on these nodes - int relevantNodes = 0; - for (int i = 0; i < modelNodes.length; i++) { - ModelNode modelNode = modelNodes[i]; - if (modelNode.getIndex(index) != null - || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) { - // swap nodes at position i and relevantNodes - modelNodes[i] = modelNodes[relevantNodes]; - modelNodes[relevantNodes] = modelNode; - relevantNodes++; - } - } - - if (relevantNodes < 2) { - continue; - } - - sorter.reset(index, 0, relevantNodes); - int lowIdx = 0; - int highIdx = relevantNodes - 1; - while (true) { - final ModelNode minNode = modelNodes[lowIdx]; - final ModelNode maxNode = modelNodes[highIdx]; - advance_range: if (maxNode.numShards(index) > 0) { - final float delta = absDelta(weights[lowIdx], weights[highIdx]); - if (lessThan(delta, threshold)) { - if (lowIdx > 0 - && highIdx - 1 > 0 // is there a chance for a higher delta? - && (absDelta(weights[0], weights[highIdx - 1]) > threshold) // check if we need to break at all - ) { - /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible - * due to some allocation decider restrictions like zone awareness. if one zone has for instance - * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we - * can't move to the "lighter" shards since otherwise the zone would go over capacity. - * - * This break jumps straight to the condition below were we start moving from the high index towards - * the low index to shrink the window we are considering for balance from the other direction. - * (check shrinking the window from MAX to MIN) - * See #3580 - */ - break advance_range; - } - if (logger.isTraceEnabled()) { - logger.trace( - "Stop balancing index [{}] min_node [{}] weight: [{}]" + " max_node [{}] weight: [{}] delta: [{}]", - index, - maxNode.getNodeId(), - weights[highIdx], - minNode.getNodeId(), - weights[lowIdx], - delta - ); - } - break; - } - if (logger.isTraceEnabled()) { - logger.trace( - "Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", - maxNode.getNodeId(), - weights[highIdx], - minNode.getNodeId(), - weights[lowIdx], - delta - ); - } - if (delta <= 1.0f) { - /* - * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the - * balance if we only achieve the same delta the relocation is useless - * - * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We - * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never - * hit this case anyway. - */ - logger.trace( - "Couldn't find shard to relocate from node [{}] to node [{}]", - maxNode.getNodeId(), - minNode.getNodeId() - ); - } else if (tryRelocateShard(minNode, maxNode, index)) { - /* - * TODO we could be a bit smarter here, we don't need to fully sort necessarily - * we could just find the place to insert linearly but the win might be minor - * compared to the added complexity - */ - weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); - weights[highIdx] = sorter.weight(modelNodes[highIdx]); - sorter.sort(0, relevantNodes); - lowIdx = 0; - highIdx = relevantNodes - 1; - continue; - } - } - if (lowIdx < highIdx - 1) { - /* Shrinking the window from MIN to MAX - * we can't move from any shard from the min node lets move on to the next node - * and see if the threshold still holds. We either don't have any shard of this - * index on this node of allocation deciders prevent any relocation.*/ - lowIdx++; - } else if (lowIdx > 0) { - /* Shrinking the window from MAX to MIN - * now we go max to min since obviously we can't move anything to the max node - * lets pick the next highest */ - lowIdx = 0; - highIdx--; - } else { - /* we are done here, we either can't relocate anymore or we are balanced */ - break; - } - } - } - } - - /** - * This builds a initial index ordering where the indices are returned - * in most unbalanced first. We need this in order to prevent over - * allocations on added nodes from one index when the weight parameters - * for global balance overrule the index balance at an intermediate - * state. For example this can happen if we have 3 nodes and 3 indices - * with 3 primary and 1 replica shards. At the first stage all three nodes hold - * 2 shard for each index. Now we add another node and the first index - * is balanced moving three shards from two of the nodes over to the new node since it - * has no shards yet and global balance for the node is way below - * average. To re-balance we need to move shards back eventually likely - * to the nodes we relocated them from. - */ - private String[] buildWeightOrderedIndices() { - final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); - final float[] deltas = new float[indices.length]; - for (int i = 0; i < deltas.length; i++) { - sorter.reset(indices[i]); - deltas[i] = sorter.delta(); - } - new IntroSorter() { - - float pivotWeight; - - @Override - protected void swap(int i, int j) { - final String tmpIdx = indices[i]; - indices[i] = indices[j]; - indices[j] = tmpIdx; - final float tmpDelta = deltas[i]; - deltas[i] = deltas[j]; - deltas[j] = tmpDelta; - } - - @Override - protected int compare(int i, int j) { - return Float.compare(deltas[j], deltas[i]); - } - - @Override - protected void setPivot(int i) { - pivotWeight = deltas[i]; - } - - @Override - protected int comparePivot(int j) { - return Float.compare(deltas[j], pivotWeight); - } - }.sort(0, deltas.length); - - return indices; - } - - /** - * Checks if target node is ineligible and if so, adds to the list - * of ineligible target nodes - */ - private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { - Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); - if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { - inEligibleTargetNode.add(targetNode); - } - } - - /** - * Move started shards that can not be allocated to a node anymore - * - * For each shard to be moved this function executes a move operation - * to the minimal eligible node with respect to the - * weight function. If a shard is moved the shard will be set to - * {@link ShardRoutingState#RELOCATING} and a shadow instance of this - * shard is created with an incremented version in the state - * {@link ShardRoutingState#INITIALIZING}. - */ - public void moveShards() { - // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling - // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are - // offloading the shards. - - // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes - // when no target is eligible - for (ModelNode currentNode : sorter.modelNodes) { - checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); - } - boolean primariesThrottled = false; - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { - // Verify if the cluster concurrent recoveries have been reached. - if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { - logger.info( - "Cannot move any shard in the cluster due to cluster concurrent recoveries getting breached" - + ". Skipping shard iteration" - ); - return; - } - // Early terminate node interleaved shard iteration when no eligible target nodes are available - if (sorter.modelNodes.length == inEligibleTargetNode.size()) { - logger.info( - "Cannot move any shard in the cluster as there is no node on which shards can be allocated" - + ". Skipping shard iteration" - ); - return; - } - - ShardRouting shardRouting = it.next(); - - // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled - if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { - logger.info( - "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" - + "are being throttled. Skipping shard iteration" - ); - return; - } - - // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard - // is not being throttled. - Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); - if (canMoveAwayDecision.type() != Decision.Type.YES) { - if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); - if (shardRouting.primary() && canMoveAwayDecision.type() == Type.THROTTLE) { - primariesThrottled = true; - } - continue; - } - - final MoveDecision moveDecision = decideMove(shardRouting); - if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); - sourceNode.removeShard(shardRouting); - Tuple relocatingShards = routingNodes.relocateShard( - shardRouting, - targetNode.getNodeId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - allocation.changes() - ); - targetNode.addShard(relocatingShards.v2()); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); - } - - // Verifying if this node can be considered ineligible for further iterations - if (targetNode != null) { - checkAndAddInEligibleTargetNode(targetNode.getRoutingNode()); - } - } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { - logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - } - } - } - - /** - * Makes a decision on whether to move a started shard to another node. The following rules apply - * to the {@link MoveDecision} return object: - * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. - * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and - * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. - * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be - * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then - * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null. - * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then - * {@link MoveDecision#getNodeDecisions} will have a non-null value. - */ - public MoveDecision decideMove(final ShardRouting shardRouting) { - if (shardRouting.started() == false) { - // we can only move started shards - return MoveDecision.NOT_TAKEN; - } - - final boolean explain = allocation.debugDecision(); - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - assert sourceNode != null && sourceNode.containsShard(shardRouting); - RoutingNode routingNode = sourceNode.getRoutingNode(); - Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemain.type() != Decision.Type.NO) { - return MoveDecision.stay(canRemain); - } - - sorter.reset(shardRouting.getIndexName()); - /* - * the sorter holds the minimum weight node first for the shards index. - * We now walk through the nodes until we find a node to allocate the shard. - * This is not guaranteed to be balanced after this operation we still try best effort to - * allocate on the minimal eligible node. - */ - Type bestDecision = Type.NO; - RoutingNode targetNode = null; - final List nodeExplanationMap = explain ? new ArrayList<>() : null; - int weightRanking = 0; - int targetNodeProcessed = 0; - for (ModelNode currentNode : sorter.modelNodes) { - if (currentNode != sourceNode) { - RoutingNode target = currentNode.getRoutingNode(); - if (!explain && inEligibleTargetNode.contains(target)) continue; - // don't use canRebalance as we want hard filtering rules to apply. See #17698 - if (!explain) { - // If we cannot allocate any shard to node marking it in eligible - Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation); - if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { - inEligibleTargetNode.add(currentNode.getRoutingNode()); - continue; - } - } - targetNodeProcessed++; - // don't use canRebalance as we want hard filtering rules to apply. See #17698 - Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); - if (explain) { - nodeExplanationMap.add( - new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking) - ); - } - // TODO maybe we can respect throttling here too? - if (allocationDecision.type().higherThan(bestDecision)) { - bestDecision = allocationDecision.type(); - if (bestDecision == Type.YES) { - targetNode = target; - if (explain == false) { - // we are not in explain mode and already have a YES decision on the best weighted node, - // no need to continue iterating - break; - } - } - } - } - } - - return MoveDecision.cannotRemain( - canRemain, - AllocationDecision.fromDecisionType(bestDecision), - targetNode != null ? targetNode.node() : null, - nodeExplanationMap - ); - } - - /** - * Builds the internal model from all shards in the given - * {@link Iterable}. All shards in the {@link Iterable} must be assigned - * to a node. This method will skip shards in the state - * {@link ShardRoutingState#RELOCATING} since each relocating shard has - * a shadow shard in the state {@link ShardRoutingState#INITIALIZING} - * on the target node which we respect during the allocation / balancing - * process. In short, this method recreates the status-quo in the cluster. - */ - private Map buildModelFromAssigned() { - Map nodes = new HashMap<>(); - for (RoutingNode rn : routingNodes) { - ModelNode node = new ModelNode(rn); - nodes.put(rn.nodeId(), node); - for (ShardRouting shard : rn) { - assert rn.nodeId().equals(shard.currentNodeId()); - /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() != RELOCATING) { - node.addShard(shard); - if (logger.isTraceEnabled()) { - logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); - } - } - } - } - return nodes; - } - - /** - * Allocates all given shards on the minimal eligible node for the shards index - * with respect to the weight function. All given shards must be unassigned. - */ - private void allocateUnassigned() { - RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); - assert !nodes.isEmpty(); - if (logger.isTraceEnabled()) { - logger.trace("Start allocating unassigned shards"); - } - if (unassigned.isEmpty()) { - return; - } - - /* - * TODO: We could be smarter here and group the shards by index and then - * use the sorter to save some iterations. - */ - final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); - final Comparator comparator = (o1, o2) -> { - if (o1.primary() ^ o2.primary()) { - return o1.primary() ? -1 : 1; - } - final int indexCmp; - if ((indexCmp = o1.getIndexName().compareTo(o2.getIndexName())) == 0) { - return o1.getId() - o2.getId(); - } - // this comparator is more expensive than all the others up there - // that's why it's added last even though it could be easier to read - // if we'd apply it earlier. this comparator will only differentiate across - // indices all shards of the same index is treated equally. - final int secondary = secondaryComparator.compare(o1, o2); - return secondary == 0 ? indexCmp : secondary; - }; - /* - * we use 2 arrays and move replicas to the second array once we allocated an identical - * replica in the current iteration to make sure all indices get allocated in the same manner. - * The arrays are sorted by primaries first and then by index and shard ID so a 2 indices with - * 2 replica and 1 shard would look like: - * [(0,P,IDX1), (0,P,IDX2), (0,R,IDX1), (0,R,IDX1), (0,R,IDX2), (0,R,IDX2)] - * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with - * the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. - */ - ShardRouting[] primary = unassigned.drain(); - ShardRouting[] secondary = new ShardRouting[primary.length]; - int secondaryLength = 0; - int primaryLength = primary.length; - ArrayUtil.timSort(primary, comparator); - do { - for (int i = 0; i < primaryLength; i++) { - ShardRouting shard = primary[i]; - final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); - final String assignedNodeId = allocationDecision.getTargetNode() != null - ? allocationDecision.getTargetNode().getId() - : null; - final ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null; - - if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) { - if (logger.isTraceEnabled()) { - logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); - } - - final long shardSize = DiskThresholdDecider.getExpectedShardSize( - shard, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, - allocation.clusterInfo(), - allocation.snapshotShardSizeInfo(), - allocation.metadata(), - allocation.routingTable() - ); - shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); - minNode.addShard(shard); - if (!shard.primary()) { - // copy over the same replica shards to the secondary array so they will get allocated - // in a subsequent iteration, allowing replicas of other shards to be allocated first - while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { - secondary[secondaryLength++] = primary[++i]; - } - } - } else { - // did *not* receive a YES decision - if (logger.isTraceEnabled()) { - logger.trace( - "No eligible node found to assign shard [{}] allocation_status [{}]", - shard, - allocationDecision.getAllocationStatus() - ); - } - - if (minNode != null) { - // throttle decision scenario - assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED; - final long shardSize = DiskThresholdDecider.getExpectedShardSize( - shard, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, - allocation.clusterInfo(), - allocation.snapshotShardSizeInfo(), - allocation.metadata(), - allocation.routingTable() - ); - minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); - } else { - if (logger.isTraceEnabled()) { - logger.trace("No Node found to assign shard [{}]", shard); - } - } - - unassigned.ignoreShard(shard, allocationDecision.getAllocationStatus(), allocation.changes()); - if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas - while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { - unassigned.ignoreShard(primary[++i], allocationDecision.getAllocationStatus(), allocation.changes()); - } - } - } - } - primaryLength = secondaryLength; - ShardRouting[] tmp = primary; - primary = secondary; - secondary = tmp; - secondaryLength = 0; - } while (primaryLength > 0); - // clear everything we have either added it or moved to ignoreUnassigned - } - - /** - * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the - * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the - * {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned - * is of type {@link Type#NO}, then the assigned node will be null. - */ - private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { - if (shard.assignedToNode()) { - // we only make decisions for unassigned shards here - return AllocateUnassignedDecision.NOT_TAKEN; - } - - final boolean explain = allocation.debugDecision(); - Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); - if (shardLevelDecision.type() == Type.NO && explain == false) { - // NO decision for allocating the shard, irrespective of any particular node, so exit early - return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, null); - } - - /* find an node with minimal weight we can allocate on*/ - float minWeight = Float.POSITIVE_INFINITY; - ModelNode minNode = null; - Decision decision = null; - /* Don't iterate over an identity hashset here the - * iteration order is different for each run and makes testing hard */ - Map nodeExplanationMap = explain ? new HashMap<>() : null; - List> nodeWeights = explain ? new ArrayList<>() : null; - for (ModelNode node : nodes.values()) { - if (node.containsShard(shard) && explain == false) { - // decision is NO without needing to check anything further, so short circuit - continue; - } - - // weight of this index currently on the node - float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); - // moving the shard would not improve the balance, and we are not in explain mode, so short circuit - if (currentWeight > minWeight && explain == false) { - continue; - } - - Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation); - if (explain) { - nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); - nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); - } - if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { - final boolean updateMinNode; - if (currentWeight == minWeight) { - /* we have an equal weight tie breaking: - * 1. if one decision is YES prefer it - * 2. prefer the node that holds the primary for this index with the next id in the ring ie. - * for the 3 shards 2 replica case we try to build up: - * 1 2 0 - * 2 0 1 - * 0 1 2 - * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater - * than the id of the shard we need to assign. This works find when new indices are created since - * primaries are added first and we only add one shard set a time in this algorithm. - */ - if (currentDecision.type() == decision.type()) { - final int repId = shard.id(); - final int nodeHigh = node.highestPrimary(shard.index().getName()); - final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); - updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) - && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); - } else { - updateMinNode = currentDecision.type() == Type.YES; - } - } else { - updateMinNode = currentWeight < minWeight; - } - if (updateMinNode) { - minNode = node; - minWeight = currentWeight; - decision = currentDecision; - } - } - } - if (decision == null) { - // decision was not set and a node was not assigned, so treat it as a NO decision - decision = Decision.NO; - } - List nodeDecisions = null; - if (explain) { - nodeDecisions = new ArrayList<>(); - // fill in the correct weight ranking, once we've been through all nodes - nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2())); - int weightRanking = 0; - for (Tuple nodeWeight : nodeWeights) { - NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1()); - nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); - } - } - return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions); - } - - private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); - - /** - * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the - * balance model. Iff this method returns a true the relocation has already been executed on the - * simulation model as well as on the cluster. - */ - private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx) { - final ModelIndex index = maxNode.getIndex(idx); - if (index != null) { - logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); - final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false) - .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway - .filter(maxNode::containsShard) - .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic - ::iterator; - - final AllocationDeciders deciders = allocation.deciders(); - for (ShardRouting shard : shardRoutings) { - final Decision rebalanceDecision = deciders.canRebalance(shard, allocation); - if (rebalanceDecision.type() == Type.NO) { - continue; - } - final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); - if (allocationDecision.type() == Type.NO) { - continue; - } - - final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); - - maxNode.removeShard(shard); - long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); - - if (decision.type() == Type.YES) { - /* only allocate on the cluster if we are not throttled */ - logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); - minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1()); - return true; - } else { - /* allocate on the model even if throttled */ - logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); - assert decision.type() == Type.THROTTLE; - minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize)); - return false; - } - } - } - logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); - return false; - } - - } - /** * A model node. * @@ -1277,6 +353,25 @@ public boolean containsShard(ShardRouting shard) { } + /** + * A {@link Balancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * @deprecated As of 2.4.0, replaced by {@link LocalShardsBalancer} + * + * @opensearch.internal + */ + @Deprecated + public static class Balancer extends LocalShardsBalancer { + public Balancer( + Logger logger, + RoutingAllocation allocation, + boolean movePrimaryFirst, + BalancedShardsAllocator.WeightFunction weight, + float threshold + ) { + super(logger, allocation, movePrimaryFirst, weight, threshold); + } + } + /** * A model index. * @@ -1346,10 +441,10 @@ static final class NodeSorter extends IntroSorter { final float[] weights; private final WeightFunction function; private String index; - private final Balancer balancer; + private final ShardsBalancer balancer; private float pivotWeight; - NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { + NodeSorter(ModelNode[] modelNodes, WeightFunction function, ShardsBalancer balancer) { this.function = function; this.balancer = balancer; this.modelNodes = modelNodes; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java new file mode 100644 index 0000000000000..53d7c827392d5 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -0,0 +1,967 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IntroSorter; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.AllocationDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.NodeAllocationResult; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.collect.Tuple; +import org.opensearch.gateway.PriorityComparator; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.StreamSupport; + +import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; + +/** + * A {@link LocalShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * for local shards within the cluster. + * + * @opensearch.internal + */ +public class LocalShardsBalancer extends ShardsBalancer { + private final Logger logger; + private final Map nodes; + private final RoutingAllocation allocation; + private final RoutingNodes routingNodes; + private final boolean movePrimaryFirst; + private final BalancedShardsAllocator.WeightFunction weight; + + private final float threshold; + private final Metadata metadata; + private final float avgShardsPerNode; + private final BalancedShardsAllocator.NodeSorter sorter; + private final Set inEligibleTargetNode; + + public LocalShardsBalancer( + Logger logger, + RoutingAllocation allocation, + boolean movePrimaryFirst, + BalancedShardsAllocator.WeightFunction weight, + float threshold + ) { + this.logger = logger; + this.allocation = allocation; + this.movePrimaryFirst = movePrimaryFirst; + this.weight = weight; + this.threshold = threshold; + this.routingNodes = allocation.routingNodes(); + this.metadata = allocation.metadata(); + avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + nodes = Collections.unmodifiableMap(buildModelFromAssigned()); + sorter = newNodeSorter(); + inEligibleTargetNode = new HashSet<>(); + } + + /** + * Returns an array view on the nodes in the balancer. Nodes should not be removed from this list. + */ + private BalancedShardsAllocator.ModelNode[] nodesArray() { + return nodes.values().toArray(new BalancedShardsAllocator.ModelNode[nodes.size()]); + } + + /** + * Returns the average of shards per node for the given index + */ + @Override + public float avgShardsPerNode(String index) { + return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); + } + + /** + * Returns the global average of shards per node + */ + @Override + public float avgShardsPerNode() { + return avgShardsPerNode; + } + + /** + * Returns a new {@link BalancedShardsAllocator.NodeSorter} that sorts the nodes based on their + * current weight with respect to the index passed to the sorter. The + * returned sorter is not sorted. Use {@link BalancedShardsAllocator.NodeSorter#reset(String)} + * to sort based on an index. + */ + private BalancedShardsAllocator.NodeSorter newNodeSorter() { + return new BalancedShardsAllocator.NodeSorter(nodesArray(), weight, this); + } + + /** + * The absolute value difference between two weights. + */ + private static float absDelta(float lower, float higher) { + assert higher >= lower : higher + " lt " + lower + " but was expected to be gte"; + return Math.abs(higher - lower); + } + + /** + * Returns {@code true} iff the weight delta between two nodes is under a defined threshold. + * See {@link BalancedShardsAllocator#THRESHOLD_SETTING} for defining the threshold. + */ + private static boolean lessThan(float delta, float threshold) { + /* deltas close to the threshold are "rounded" to the threshold manually + to prevent floating point problems if the delta is very close to the + threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ + return delta <= (threshold + 0.001f); + } + + /** + * Balances the nodes on the cluster model according to the weight function. + * The actual balancing is delegated to {@link #balanceByWeights()} + */ + @Override + void balance() { + if (logger.isTraceEnabled()) { + logger.trace("Start balancing cluster"); + } + if (allocation.hasPendingAsyncFetch()) { + /* + * see https://github.com/elastic/elasticsearch/issues/14387 + * if we allow rebalance operations while we are still fetching shard store data + * we might end up with unnecessary rebalance operations which can be super confusion/frustrating + * since once the fetches come back we might just move all the shards back again. + * Therefore we only do a rebalance if we have fetched all information. + */ + logger.debug("skipping rebalance due to in-flight shard/store fetches"); + return; + } + if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { + logger.trace("skipping rebalance as it is disabled"); + return; + } + if (nodes.size() < 2) { /* skip if we only have one node */ + logger.trace("skipping rebalance as single node only"); + return; + } + balanceByWeights(); + } + + /** + * Makes a decision about moving a single shard to a different node to form a more + * optimally balanced cluster. This method is invoked from the cluster allocation + * explain API only. + */ + @Override + MoveDecision decideRebalance(final ShardRouting shard) { + if (shard.started() == false) { + // we can only rebalance started shards + return MoveDecision.NOT_TAKEN; + } + + Decision canRebalance = allocation.deciders().canRebalance(shard, allocation); + + sorter.reset(shard.getIndexName()); + BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; + final String currentNodeId = shard.currentNodeId(); + // find currently assigned node + BalancedShardsAllocator.ModelNode currentNode = null; + for (BalancedShardsAllocator.ModelNode node : modelNodes) { + if (node.getNodeId().equals(currentNodeId)) { + currentNode = node; + break; + } + } + assert currentNode != null : "currently assigned node could not be found"; + + // balance the shard, if a better node can be found + final String idxName = shard.getIndexName(); + final float currentWeight = weight.weight(this, currentNode, idxName); + final AllocationDeciders deciders = allocation.deciders(); + Decision.Type rebalanceDecisionType = Decision.Type.NO; + BalancedShardsAllocator.ModelNode assignedNode = null; + List> betterBalanceNodes = new ArrayList<>(); + List> sameBalanceNodes = new ArrayList<>(); + List> worseBalanceNodes = new ArrayList<>(); + for (BalancedShardsAllocator.ModelNode node : modelNodes) { + if (node == currentNode) { + continue; // skip over node we're currently allocated to + } + final Decision canAllocate = deciders.canAllocate(shard, node.getRoutingNode(), allocation); + // the current weight of the node in the cluster, as computed by the weight function; + // this is a comparison of the number of shards on this node to the number of shards + // that should be on each node on average (both taking the cluster as a whole into account + // as well as shards per index) + final float nodeWeight = weight.weight(this, node, idxName); + // if the node we are examining has a worse (higher) weight than the node the shard is + // assigned to, then there is no way moving the shard to the node with the worse weight + // can make the balance of the cluster better, so we check for that here + final boolean betterWeightThanCurrent = nodeWeight <= currentWeight; + boolean rebalanceConditionsMet = false; + if (betterWeightThanCurrent) { + // get the delta between the weights of the node we are checking and the node that holds the shard + float currentDelta = absDelta(nodeWeight, currentWeight); + // checks if the weight delta is above a certain threshold; if it is not above a certain threshold, + // then even though the node we are examining has a better weight and may make the cluster balance + // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless + // the gains make it worth it, as defined by the threshold + boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false; + // calculate the delta of the weights of the two nodes if we were to add the shard to the + // node in question and move it away from the node that currently holds it. + // hence we add 2.0f to the weight delta + float proposedDelta = 2.0f + nodeWeight - currentWeight; + boolean betterWeightWithShardAdded = proposedDelta < currentDelta; + + rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded; + // if the simulated weight delta with the shard moved away is better than the weight delta + // with the shard remaining on the current node, and we are allowed to allocate to the + // node in question, then allow the rebalance + if (rebalanceConditionsMet && canAllocate.type().higherThan(rebalanceDecisionType)) { + // rebalance to the node, only will get overwritten if the decision here is to + // THROTTLE and we get a decision with YES on another node + rebalanceDecisionType = canAllocate.type(); + assignedNode = node; + } + } + Tuple nodeResult = Tuple.tuple(node, canAllocate); + if (rebalanceConditionsMet) { + betterBalanceNodes.add(nodeResult); + } else if (betterWeightThanCurrent) { + sameBalanceNodes.add(nodeResult); + } else { + worseBalanceNodes.add(nodeResult); + } + } + + int weightRanking = 0; + List nodeDecisions = new ArrayList<>(modelNodes.length - 1); + for (Tuple result : betterBalanceNodes) { + nodeDecisions.add( + new NodeAllocationResult( + result.v1().getRoutingNode().node(), + AllocationDecision.fromDecisionType(result.v2().type()), + result.v2(), + ++weightRanking + ) + ); + } + int currentNodeWeightRanking = ++weightRanking; + for (Tuple result : sameBalanceNodes) { + AllocationDecision nodeDecision = result.v2().type() == Decision.Type.NO + ? AllocationDecision.NO + : AllocationDecision.WORSE_BALANCE; + nodeDecisions.add( + new NodeAllocationResult(result.v1().getRoutingNode().node(), nodeDecision, result.v2(), currentNodeWeightRanking) + ); + } + for (Tuple result : worseBalanceNodes) { + AllocationDecision nodeDecision = result.v2().type() == Decision.Type.NO + ? AllocationDecision.NO + : AllocationDecision.WORSE_BALANCE; + nodeDecisions.add(new NodeAllocationResult(result.v1().getRoutingNode().node(), nodeDecision, result.v2(), ++weightRanking)); + } + + if (canRebalance.type() != Decision.Type.YES || allocation.hasPendingAsyncFetch()) { + AllocationDecision allocationDecision = allocation.hasPendingAsyncFetch() + ? AllocationDecision.AWAITING_INFO + : AllocationDecision.fromDecisionType(canRebalance.type()); + return MoveDecision.cannotRebalance(canRebalance, allocationDecision, currentNodeWeightRanking, nodeDecisions); + } else { + return MoveDecision.rebalance( + canRebalance, + AllocationDecision.fromDecisionType(rebalanceDecisionType), + assignedNode != null ? assignedNode.getRoutingNode().node() : null, + currentNodeWeightRanking, + nodeDecisions + ); + } + } + + /** + * Balances the nodes on the cluster model according to the weight + * function. The configured threshold is the minimum delta between the + * weight of the maximum node and the minimum node according to the + * {@link BalancedShardsAllocator.WeightFunction}. This weight is calculated per index to + * distribute shards evenly per index. The balancer tries to relocate + * shards only if the delta exceeds the threshold. In the default case + * the threshold is set to {@code 1.0} to enforce gaining relocation + * only, or in other words relocations that move the weight delta closer + * to {@code 0.0} + */ + private void balanceByWeights() { + final AllocationDeciders deciders = allocation.deciders(); + final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes; + final float[] weights = sorter.weights; + for (String index : buildWeightOrderedIndices()) { + IndexMetadata indexMetadata = metadata.index(index); + + // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, + // move these nodes to the front of modelNodes so that we can only balance based on these nodes + int relevantNodes = 0; + for (int i = 0; i < modelNodes.length; i++) { + BalancedShardsAllocator.ModelNode modelNode = modelNodes[i]; + if (modelNode.getIndex(index) != null + || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Decision.Type.NO) { + // swap nodes at position i and relevantNodes + modelNodes[i] = modelNodes[relevantNodes]; + modelNodes[relevantNodes] = modelNode; + relevantNodes++; + } + } + + if (relevantNodes < 2) { + continue; + } + + sorter.reset(index, 0, relevantNodes); + int lowIdx = 0; + int highIdx = relevantNodes - 1; + while (true) { + final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx]; + final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx]; + advance_range: if (maxNode.numShards(index) > 0) { + final float delta = absDelta(weights[lowIdx], weights[highIdx]); + if (lessThan(delta, threshold)) { + if (lowIdx > 0 + && highIdx - 1 > 0 // is there a chance for a higher delta? + && (absDelta(weights[0], weights[highIdx - 1]) > threshold) // check if we need to break at all + ) { + /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible + * due to some allocation decider restrictions like zone awareness. if one zone has for instance + * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we + * can't move to the "lighter" shards since otherwise the zone would go over capacity. + * + * This break jumps straight to the condition below were we start moving from the high index towards + * the low index to shrink the window we are considering for balance from the other direction. + * (check shrinking the window from MAX to MIN) + * See #3580 + */ + break advance_range; + } + if (logger.isTraceEnabled()) { + logger.trace( + "Stop balancing index [{}] min_node [{}] weight: [{}]" + " max_node [{}] weight: [{}] delta: [{}]", + index, + maxNode.getNodeId(), + weights[highIdx], + minNode.getNodeId(), + weights[lowIdx], + delta + ); + } + break; + } + if (logger.isTraceEnabled()) { + logger.trace( + "Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", + maxNode.getNodeId(), + weights[highIdx], + minNode.getNodeId(), + weights[lowIdx], + delta + ); + } + if (delta <= 1.0f) { + /* + * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the + * balance if we only achieve the same delta the relocation is useless + * + * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We + * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never + * hit this case anyway. + */ + logger.trace( + "Couldn't find shard to relocate from node [{}] to node [{}]", + maxNode.getNodeId(), + minNode.getNodeId() + ); + } else if (tryRelocateShard(minNode, maxNode, index)) { + /* + * TODO we could be a bit smarter here, we don't need to fully sort necessarily + * we could just find the place to insert linearly but the win might be minor + * compared to the added complexity + */ + weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); + weights[highIdx] = sorter.weight(modelNodes[highIdx]); + sorter.sort(0, relevantNodes); + lowIdx = 0; + highIdx = relevantNodes - 1; + continue; + } + } + if (lowIdx < highIdx - 1) { + /* Shrinking the window from MIN to MAX + * we can't move from any shard from the min node lets move on to the next node + * and see if the threshold still holds. We either don't have any shard of this + * index on this node of allocation deciders prevent any relocation.*/ + lowIdx++; + } else if (lowIdx > 0) { + /* Shrinking the window from MAX to MIN + * now we go max to min since obviously we can't move anything to the max node + * lets pick the next highest */ + lowIdx = 0; + highIdx--; + } else { + /* we are done here, we either can't relocate anymore or we are balanced */ + break; + } + } + } + } + + /** + * This builds a initial index ordering where the indices are returned + * in most unbalanced first. We need this in order to prevent over + * allocations on added nodes from one index when the weight parameters + * for global balance overrule the index balance at an intermediate + * state. For example this can happen if we have 3 nodes and 3 indices + * with 3 primary and 1 replica shards. At the first stage all three nodes hold + * 2 shard for each index. Now we add another node and the first index + * is balanced moving three shards from two of the nodes over to the new node since it + * has no shards yet and global balance for the node is way below + * average. To re-balance we need to move shards back eventually likely + * to the nodes we relocated them from. + */ + private String[] buildWeightOrderedIndices() { + final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + final float[] deltas = new float[indices.length]; + for (int i = 0; i < deltas.length; i++) { + sorter.reset(indices[i]); + deltas[i] = sorter.delta(); + } + new IntroSorter() { + + float pivotWeight; + + @Override + protected void swap(int i, int j) { + final String tmpIdx = indices[i]; + indices[i] = indices[j]; + indices[j] = tmpIdx; + final float tmpDelta = deltas[i]; + deltas[i] = deltas[j]; + deltas[j] = tmpDelta; + } + + @Override + protected int compare(int i, int j) { + return Float.compare(deltas[j], deltas[i]); + } + + @Override + protected void setPivot(int i) { + pivotWeight = deltas[i]; + } + + @Override + protected int comparePivot(int j) { + return Float.compare(deltas[j], pivotWeight); + } + }.sort(0, deltas.length); + + return indices; + } + + /** + * Checks if target node is ineligible and if so, adds to the list + * of ineligible target nodes + */ + private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { + Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { + inEligibleTargetNode.add(targetNode); + } + } + + /** + * Move started shards that can not be allocated to a node anymore + * + * For each shard to be moved this function executes a move operation + * to the minimal eligible node with respect to the + * weight function. If a shard is moved the shard will be set to + * {@link ShardRoutingState#RELOCATING} and a shadow instance of this + * shard is created with an incremented version in the state + * {@link ShardRoutingState#INITIALIZING}. + */ + @Override + void moveShards() { + // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling + // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are + // offloading the shards. + + // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes + // when no target is eligible + for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { + checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); + } + boolean primariesThrottled = false; + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { + // Verify if the cluster concurrent recoveries have been reached. + if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { + logger.info( + "Cannot move any shard in the cluster due to cluster concurrent recoveries getting breached" + + ". Skipping shard iteration" + ); + return; + } + // Early terminate node interleaved shard iteration when no eligible target nodes are available + if (sorter.modelNodes.length == inEligibleTargetNode.size()) { + logger.info( + "Cannot move any shard in the cluster as there is no node on which shards can be allocated" + + ". Skipping shard iteration" + ); + return; + } + + ShardRouting shardRouting = it.next(); + + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled + if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { + logger.info( + "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + + "are being throttled. Skipping shard iteration" + ); + return; + } + + // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard + // is not being throttled. + Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); + if (canMoveAwayDecision.type() != Decision.Type.YES) { + if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); + if (shardRouting.primary() && canMoveAwayDecision.type() == Decision.Type.THROTTLE) { + primariesThrottled = true; + } + continue; + } + + final MoveDecision moveDecision = decideMove(shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + targetNode.addShard(relocatingShards.v2()); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } + + // Verifying if this node can be considered ineligible for further iterations + if (targetNode != null) { + checkAndAddInEligibleTargetNode(targetNode.getRoutingNode()); + } + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } + } + } + + /** + * Makes a decision on whether to move a started shard to another node. The following rules apply + * to the {@link MoveDecision} return object: + * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. + * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and + * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. + * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be + * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then + * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null. + * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then + * {@link MoveDecision#getNodeDecisions} will have a non-null value. + */ + @Override + MoveDecision decideMove(final ShardRouting shardRouting) { + if (shardRouting.started() == false) { + // we can only move started shards + return MoveDecision.NOT_TAKEN; + } + + final boolean explain = allocation.debugDecision(); + final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(shardRouting); + RoutingNode routingNode = sourceNode.getRoutingNode(); + Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemain.type() != Decision.Type.NO) { + return MoveDecision.stay(canRemain); + } + + sorter.reset(shardRouting.getIndexName()); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + Decision.Type bestDecision = Decision.Type.NO; + RoutingNode targetNode = null; + final List nodeExplanationMap = explain ? new ArrayList<>() : null; + int weightRanking = 0; + int targetNodeProcessed = 0; + for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { + if (currentNode != sourceNode) { + RoutingNode target = currentNode.getRoutingNode(); + if (!explain && inEligibleTargetNode.contains(target)) continue; + // don't use canRebalance as we want hard filtering rules to apply. See #17698 + if (!explain) { + // If we cannot allocate any shard to node marking it in eligible + Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation); + if (nodeLevelAllocationDecision.type() != Decision.Type.YES) { + inEligibleTargetNode.add(currentNode.getRoutingNode()); + continue; + } + } + targetNodeProcessed++; + // don't use canRebalance as we want hard filtering rules to apply. See #17698 + Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); + if (explain) { + nodeExplanationMap.add( + new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking) + ); + } + // TODO maybe we can respect throttling here too? + if (allocationDecision.type().higherThan(bestDecision)) { + bestDecision = allocationDecision.type(); + if (bestDecision == Decision.Type.YES) { + targetNode = target; + if (explain == false) { + // we are not in explain mode and already have a YES decision on the best weighted node, + // no need to continue iterating + break; + } + } + } + } + } + + return MoveDecision.cannotRemain( + canRemain, + AllocationDecision.fromDecisionType(bestDecision), + targetNode != null ? targetNode.node() : null, + nodeExplanationMap + ); + } + + /** + * Builds the internal model from all shards in the given + * {@link Iterable}. All shards in the {@link Iterable} must be assigned + * to a node. This method will skip shards in the state + * {@link ShardRoutingState#RELOCATING} since each relocating shard has + * a shadow shard in the state {@link ShardRoutingState#INITIALIZING} + * on the target node which we respect during the allocation / balancing + * process. In short, this method recreates the status-quo in the cluster. + */ + private Map buildModelFromAssigned() { + Map nodes = new HashMap<>(); + for (RoutingNode rn : routingNodes) { + BalancedShardsAllocator.ModelNode node = new BalancedShardsAllocator.ModelNode(rn); + nodes.put(rn.nodeId(), node); + for (ShardRouting shard : rn) { + assert rn.nodeId().equals(shard.currentNodeId()); + /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ + if (shard.state() != RELOCATING) { + node.addShard(shard); + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); + } + } + } + } + return nodes; + } + + /** + * Allocates all given shards on the minimal eligible node for the shards index + * with respect to the weight function. All given shards must be unassigned. + */ + @Override + void allocateUnassigned() { + RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); + assert !nodes.isEmpty(); + if (logger.isTraceEnabled()) { + logger.trace("Start allocating unassigned shards"); + } + if (unassigned.isEmpty()) { + return; + } + + /* + * TODO: We could be smarter here and group the shards by index and then + * use the sorter to save some iterations. + */ + final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); + final Comparator comparator = (o1, o2) -> { + if (o1.primary() ^ o2.primary()) { + return o1.primary() ? -1 : 1; + } + final int indexCmp; + if ((indexCmp = o1.getIndexName().compareTo(o2.getIndexName())) == 0) { + return o1.getId() - o2.getId(); + } + // this comparator is more expensive than all the others up there + // that's why it's added last even though it could be easier to read + // if we'd apply it earlier. this comparator will only differentiate across + // indices all shards of the same index is treated equally. + final int secondary = secondaryComparator.compare(o1, o2); + return secondary == 0 ? indexCmp : secondary; + }; + /* + * we use 2 arrays and move replicas to the second array once we allocated an identical + * replica in the current iteration to make sure all indices get allocated in the same manner. + * The arrays are sorted by primaries first and then by index and shard ID so a 2 indices with + * 2 replica and 1 shard would look like: + * [(0,P,IDX1), (0,P,IDX2), (0,R,IDX1), (0,R,IDX1), (0,R,IDX2), (0,R,IDX2)] + * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with + * the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. + */ + ShardRouting[] primary = unassigned.drain(); + ShardRouting[] secondary = new ShardRouting[primary.length]; + int secondaryLength = 0; + int primaryLength = primary.length; + ArrayUtil.timSort(primary, comparator); + do { + for (int i = 0; i < primaryLength; i++) { + ShardRouting shard = primary[i]; + final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); + final String assignedNodeId = allocationDecision.getTargetNode() != null + ? allocationDecision.getTargetNode().getId() + : null; + final BalancedShardsAllocator.ModelNode minNode = assignedNodeId != null ? nodes.get(assignedNodeId) : null; + + if (allocationDecision.getAllocationDecision() == AllocationDecision.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); + } + + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); + minNode.addShard(shard); + if (!shard.primary()) { + // copy over the same replica shards to the secondary array so they will get allocated + // in a subsequent iteration, allowing replicas of other shards to be allocated first + while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { + secondary[secondaryLength++] = primary[++i]; + } + } + } else { + // did *not* receive a YES decision + if (logger.isTraceEnabled()) { + logger.trace( + "No eligible node found to assign shard [{}] allocation_status [{}]", + shard, + allocationDecision.getAllocationStatus() + ); + } + + if (minNode != null) { + // throttle decision scenario + assert allocationDecision.getAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED; + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); + } else { + if (logger.isTraceEnabled()) { + logger.trace("No Node found to assign shard [{}]", shard); + } + } + + unassigned.ignoreShard(shard, allocationDecision.getAllocationStatus(), allocation.changes()); + if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas + while (i < primaryLength - 1 && comparator.compare(primary[i], primary[i + 1]) == 0) { + unassigned.ignoreShard(primary[++i], allocationDecision.getAllocationStatus(), allocation.changes()); + } + } + } + } + primaryLength = secondaryLength; + ShardRouting[] tmp = primary; + primary = secondary; + secondary = tmp; + secondaryLength = 0; + } while (primaryLength > 0); + // clear everything we have either added it or moved to ignoreUnassigned + } + + /** + * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the + * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the + * {@link BalancedShardsAllocator.ModelNode} representing the node that the shard should be assigned to. If the decision returned + * is of type {@link Decision.Type#NO}, then the assigned node will be null. + */ + @Override + AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { + if (shard.assignedToNode()) { + // we only make decisions for unassigned shards here + return AllocateUnassignedDecision.NOT_TAKEN; + } + + final boolean explain = allocation.debugDecision(); + Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); + if (shardLevelDecision.type() == Decision.Type.NO && explain == false) { + // NO decision for allocating the shard, irrespective of any particular node, so exit early + return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.DECIDERS_NO, null); + } + + /* find an node with minimal weight we can allocate on*/ + float minWeight = Float.POSITIVE_INFINITY; + BalancedShardsAllocator.ModelNode minNode = null; + Decision decision = null; + /* Don't iterate over an identity hashset here the + * iteration order is different for each run and makes testing hard */ + Map nodeExplanationMap = explain ? new HashMap<>() : null; + List> nodeWeights = explain ? new ArrayList<>() : null; + for (BalancedShardsAllocator.ModelNode node : nodes.values()) { + if (node.containsShard(shard) && explain == false) { + // decision is NO without needing to check anything further, so short circuit + continue; + } + + // weight of this index currently on the node + float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); + // moving the shard would not improve the balance, and we are not in explain mode, so short circuit + if (currentWeight > minWeight && explain == false) { + continue; + } + + Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation); + if (explain) { + nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); + nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); + } + if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { + final boolean updateMinNode; + if (currentWeight == minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. + */ + if (currentDecision.type() == decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index().getName()); + final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); + updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) + && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); + } else { + updateMinNode = currentDecision.type() == Decision.Type.YES; + } + } else { + updateMinNode = currentWeight < minWeight; + } + if (updateMinNode) { + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } + } + } + if (decision == null) { + // decision was not set and a node was not assigned, so treat it as a NO decision + decision = Decision.NO; + } + List nodeDecisions = null; + if (explain) { + nodeDecisions = new ArrayList<>(); + // fill in the correct weight ranking, once we've been through all nodes + nodeWeights.sort((nodeWeight1, nodeWeight2) -> Float.compare(nodeWeight1.v2(), nodeWeight2.v2())); + int weightRanking = 0; + for (Tuple nodeWeight : nodeWeights) { + NodeAllocationResult current = nodeExplanationMap.get(nodeWeight.v1()); + nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); + } + } + return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions); + } + + private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); + + /** + * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the + * balance model. Iff this method returns a true the relocation has already been executed on the + * simulation model as well as on the cluster. + */ + private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, BalancedShardsAllocator.ModelNode maxNode, String idx) { + final BalancedShardsAllocator.ModelIndex index = maxNode.getIndex(idx); + if (index != null) { + logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); + final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false) + .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway + .filter(maxNode::containsShard) + .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic + ::iterator; + + final AllocationDeciders deciders = allocation.deciders(); + for (ShardRouting shard : shardRoutings) { + final Decision rebalanceDecision = deciders.canRebalance(shard, allocation); + if (rebalanceDecision.type() == Decision.Type.NO) { + continue; + } + final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); + if (allocationDecision.type() == Decision.Type.NO) { + continue; + } + + final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); + + maxNode.removeShard(shard); + long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + + if (decision.type() == Decision.Type.YES) { + /* only allocate on the cluster if we are not throttled */ + logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); + minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1()); + return true; + } else { + /* allocate on the model even if throttled */ + logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId()); + assert decision.type() == Decision.Type.THROTTLE; + minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize)); + return false; + } + } + } + logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId()); + return false; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java new file mode 100644 index 0000000000000..593e6998141fb --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; + +/** + *

+ * A {@link ShardsBalancer} helps the {@link BalancedShardsAllocator} to perform allocation and balancing + * operations on the cluster. + *

+ * + * @opensearch.internal + */ +public abstract class ShardsBalancer { + + /** + * Performs allocation of unassigned shards on nodes within the cluster. + */ + abstract void allocateUnassigned(); + + /** + * Moves shards that cannot be allocated to a node anymore. + */ + abstract void moveShards(); + + /** + * Balances the nodes on the cluster model. + */ + abstract void balance(); + + /** + * Make a decision for allocating an unassigned shard. + * @param shardRouting the shard for which the decision has to be made + * @return the allocation decision + */ + abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting); + + /** + * Makes a decision on whether to move a started shard to another node. + * @param shardRouting the shard for which the decision has to be made + * @return a move decision for the shard + */ + abstract MoveDecision decideMove(ShardRouting shardRouting); + + /** + * Makes a decision about moving a single shard to a different node to form a more + * optimally balanced cluster. + * @param shardRouting the shard for which the move decision has to be made + * @return a move decision for the shard + */ + abstract MoveDecision decideRebalance(ShardRouting shardRouting); + + /** + * Returns the average of shards per node for the given index + */ + public float avgShardsPerNode() { + return Float.MAX_VALUE; + } + + /** + * Returns the global average of shards per node + */ + public float avgShardsPerNode(String index) { + return Float.MAX_VALUE; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index d115ee0c515cc..ae10a92a5104e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -10,6 +10,8 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.LocalShardsBalancer; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -45,7 +47,7 @@ public void testSettings() { * for IndexShardPerNode constraint satisfied and breached. */ public void testIndexShardsPerNodeConstraint() { - BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class); + ShardsBalancer balancer = mock(LocalShardsBalancer.class); BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class); AllocationConstraints constraints = new AllocationConstraints(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java index a7b53a4c4bc8b..d29249cef0818 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalancedSingleShardTests.java @@ -43,7 +43,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer; +import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer; import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; @@ -65,7 +65,7 @@ import static org.hamcrest.Matchers.startsWith; /** - * Tests for balancing a single shard, see {@link Balancer#decideRebalance(ShardRouting)}. + * Tests for balancing a single shard, see {@link ShardsBalancer#decideRebalance(ShardRouting)}. */ public class BalancedSingleShardTests extends OpenSearchAllocationTestCase { From cdc7a2fbe8c06a0ab11b35bd2142ca3ecdc0a0ba Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 18 Oct 2022 12:33:43 +0530 Subject: [PATCH 3/9] Fix decommission status update to non leader nodes (#4800) * Fix decommission status update to non leader nodes Signed-off-by: Rishab Nahata --- CHANGELOG.md | 2 + .../AwarenessAttributeDecommissionIT.java | 164 ++++++++++++++++++ .../DecommissionAttributeMetadata.java | 21 +-- .../decommission/DecommissionController.java | 8 +- 4 files changed, 179 insertions(+), 16 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f6e5425f37db..bc22e66fe11ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -138,7 +138,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) - Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) +- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) - Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813)) + ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java new file mode 100644 index 0000000000000..b8318503ee4a5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.test.NodeRoles.onlyRole; +import static org.opensearch.test.OpenSearchIntegTestCase.client; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase { + private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class); + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + @After + public void cleanup() throws Exception { + assertNoTimeout(client().admin().cluster().prepareHealth().get()); + } + + public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + + logger.info("--> starting decommissioning nodes in zone {}", 'c'); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // assert that decommission status is successful + GetDecommissionStateResponse response = client().execute(GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest()) + .get(); + assertEquals(response.getDecommissionedAttribute(), decommissionAttribute); + assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + + ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); + assertEquals(4, clusterState.nodes().getSize()); + + // assert status on nodes that are part of cluster currently + Iterator discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + while (discoveryNodeIterator.hasNext()) { + // assert no node has decommissioned attribute + DiscoveryNode node = discoveryNodeIterator.next(); + assertNotEquals(node.getAttributes().get("zone"), "c"); + + // assert all the nodes has status as SUCCESSFUL + ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName()); + assertEquals( + localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.SUCCESSFUL + ); + } + + // assert status on decommissioned node + // Here we will verify that until it got kicked out, it received appropriate status updates + // decommissioned nodes hence will have status as IN_PROGRESS as it will be kicked out later after this + // and won't receive status update to SUCCESSFUL + String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2)); + ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode); + assertEquals( + decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.IN_PROGRESS + ); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) + // as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index dbb3fea823eb6..d3d508bf36451 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -79,35 +79,29 @@ public DecommissionStatus status() { /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with - * @return instance with valid status */ // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe - public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) { - // if the current status is the expected status already, we return the same instance - if (newStatus.equals(status)) { - return this; + public synchronized void validateNewStatus(DecommissionStatus newStatus) { + // if the current status is the expected status already or new status is FAILED, we let the check pass + if (newStatus.equals(status) || newStatus.equals(DecommissionStatus.FAILED)) { + return; } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { case IN_PROGRESS: - validateAndSetStatus(DecommissionStatus.INIT, newStatus); + validateStatus(DecommissionStatus.INIT, newStatus); break; case SUCCESSFUL: - validateAndSetStatus(DecommissionStatus.IN_PROGRESS, newStatus); - break; - case FAILED: - // we don't need to validate here and directly update status to FAILED - this.status = newStatus; + validateStatus(DecommissionStatus.IN_PROGRESS, newStatus); break; default: throw new IllegalArgumentException( "illegal decommission status [" + newStatus.status() + "] requested for updating metadata" ); } - return this; } - private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) { + private void validateStatus(DecommissionStatus expected, DecommissionStatus next) { if (status.equals(expected) == false) { assert false : "can't move decommission status to [" + next @@ -120,7 +114,6 @@ private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatu "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" ); } - status = next; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 7719012f2f3d7..b58d99a9d59db 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -246,8 +246,12 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.status(), decommissionStatus ); - // setUpdatedStatus can throw IllegalStateException if the sequence of update is not valid - decommissionAttributeMetadata.setUpdatedStatus(decommissionStatus); + // validateNewStatus can throw IllegalStateException if the sequence of update is not valid + decommissionAttributeMetadata.validateNewStatus(decommissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttributeMetadata.decommissionAttribute(), + decommissionStatus + ); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); From f3d038f2703f7928dc8b43d9a8cdc0d4ecbd528a Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 18 Oct 2022 18:23:51 +0530 Subject: [PATCH 4/9] Remove redundant field from GetDecommissionStateResponse (#4751) * Add attribute name to query param and simplify GetDecommissionStateResponse Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../cluster.get_decommission_awareness.json | 12 ++- .../get/GetDecommissionStateRequest.java | 42 +++++++- .../GetDecommissionStateRequestBuilder.java | 9 ++ .../get/GetDecommissionStateResponse.java | 101 +++++------------- .../TransportGetDecommissionStateAction.java | 5 +- .../opensearch/client/ClusterAdminClient.java | 6 +- .../client/support/AbstractClient.java | 6 +- .../RestGetDecommissionStateAction.java | 6 +- .../get/GetDecommissionStateRequestTests.java | 50 +++++++++ .../GetDecommissionStateResponseTests.java | 13 +-- 11 files changed, 158 insertions(+), 93 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index bc22e66fe11ad..646151674f9f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -136,6 +136,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix new race condition in DecommissionControllerTests ([4688](https://github.com/opensearch-project/OpenSearch/pull/4688)) - Fix SearchStats (de)serialization (caused by https://github.com/opensearch-project/OpenSearch/pull/4616) ([#4697](https://github.com/opensearch-project/OpenSearch/pull/4697)) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) +- [BUG]: Remove redundant field from GetDecommissionStateResponse ([#4751](https://github.com/opensearch-project/OpenSearch/pull/4751)) - Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) - Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json index 430f96921fbc2..302dea4ec31a7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_decommission_awareness.json @@ -8,10 +8,16 @@ "url": { "paths": [ { - "path": "/_cluster/decommission/awareness/_status", - "methods": [ + "path":"/_cluster/decommission/awareness/{awareness_attribute_name}/_status", + "methods":[ "GET" - ] + ], + "parts":{ + "awareness_attribute_name":{ + "type":"string", + "description":"Awareness attribute name" + } + } } ] } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java index 90150c71bf3f2..1f301aa2b5273 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequest.java @@ -10,11 +10,14 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import java.io.IOException; +import static org.opensearch.action.ValidateActions.addValidationError; + /** * Get Decommissioned attribute request * @@ -22,19 +25,56 @@ */ public class GetDecommissionStateRequest extends ClusterManagerNodeReadRequest { + private String attributeName; + public GetDecommissionStateRequest() {} + /** + * Constructs a new get decommission state request with given attribute name + * + * @param attributeName name of the attribute + */ + public GetDecommissionStateRequest(String attributeName) { + this.attributeName = attributeName; + } + public GetDecommissionStateRequest(StreamInput in) throws IOException { super(in); + attributeName = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeString(attributeName); } @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (attributeName == null || Strings.isEmpty(attributeName)) { + validationException = addValidationError("attribute name is missing", validationException); + } + return validationException; + } + + /** + * Sets attribute name + * + * @param attributeName attribute name + * @return this request + */ + public GetDecommissionStateRequest attributeName(String attributeName) { + this.attributeName = attributeName; + return this; + } + + /** + * Returns attribute name + * + * @return attributeName name of attribute + */ + public String attributeName() { + return this.attributeName; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java index 2b8616d0511cd..e766e9c674ff7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestBuilder.java @@ -27,4 +27,13 @@ public class GetDecommissionStateRequestBuilder extends ClusterManagerNodeReadOp public GetDecommissionStateRequestBuilder(OpenSearchClient client, GetDecommissionStateAction action) { super(client, action, new GetDecommissionStateRequest()); } + + /** + * @param attributeName name of attribute + * @return current object + */ + public GetDecommissionStateRequestBuilder setAttributeName(String attributeName) { + request.attributeName(attributeName); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java index 2034cdb16e40f..ec0bd7cf7e7eb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponse.java @@ -10,7 +10,6 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; -import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -31,49 +30,40 @@ */ public class GetDecommissionStateResponse extends ActionResponse implements ToXContentObject { - private DecommissionAttribute decommissionedAttribute; + private String attributeValue; private DecommissionStatus status; GetDecommissionStateResponse() { this(null, null); } - GetDecommissionStateResponse(DecommissionAttribute decommissionedAttribute, DecommissionStatus status) { - this.decommissionedAttribute = decommissionedAttribute; + GetDecommissionStateResponse(String attributeValue, DecommissionStatus status) { + this.attributeValue = attributeValue; this.status = status; } GetDecommissionStateResponse(StreamInput in) throws IOException { // read decommissioned attribute and status only if it is present if (in.readBoolean()) { - this.decommissionedAttribute = new DecommissionAttribute(in); - } - if (in.readBoolean()) { + this.attributeValue = in.readString(); this.status = DecommissionStatus.fromString(in.readString()); } } @Override public void writeTo(StreamOutput out) throws IOException { - // if decommissioned attribute is null, mark absence of decommissioned attribute - if (decommissionedAttribute == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - decommissionedAttribute.writeTo(out); - } - - // if status is null, mark absence of status - if (status == null) { + // if decommissioned attribute value is null or status is null then mark its absence + if (attributeValue == null || status == null) { out.writeBoolean(false); } else { out.writeBoolean(true); + out.writeString(attributeValue); out.writeString(status.status()); } } - public DecommissionAttribute getDecommissionedAttribute() { - return decommissionedAttribute; + public String getAttributeValue() { + return attributeValue; } public DecommissionStatus getDecommissionStatus() { @@ -83,13 +73,8 @@ public DecommissionStatus getDecommissionStatus() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startObject("awareness"); - if (decommissionedAttribute != null) { - builder.field(decommissionedAttribute.attributeName(), decommissionedAttribute.attributeValue()); - } - builder.endObject(); - if (status != null) { - builder.field("status", status); + if (attributeValue != null && status != null) { + builder.field(attributeValue, status); } builder.endObject(); return builder; @@ -97,58 +82,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static GetDecommissionStateResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - String attributeType = "awareness"; XContentParser.Token token; - DecommissionAttribute decommissionAttribute = null; + String attributeValue = null; DecommissionStatus status = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - String currentFieldName = parser.currentName(); - if (attributeType.equals(currentFieldName)) { - if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse decommission attribute type [{}], expected object", - attributeType - ); - } - token = parser.nextToken(); - if (token != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - String fieldName = parser.currentName(); - String value; - token = parser.nextToken(); - if (token == XContentParser.Token.VALUE_STRING) { - value = parser.text(); - } else { - throw new OpenSearchParseException( - "failed to parse attribute [{}], expected string for attribute value", - fieldName - ); - } - decommissionAttribute = new DecommissionAttribute(fieldName, value); - parser.nextToken(); - } else { - throw new OpenSearchParseException("failed to parse attribute type [{}], unexpected type", attributeType); - } - } else { - throw new OpenSearchParseException("failed to parse attribute type [{}]", attributeType); - } - } else if ("status".equals(currentFieldName)) { - if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { - throw new OpenSearchParseException( - "failed to parse status of decommissioning, expected string but found unknown type" - ); - } - status = DecommissionStatus.fromString(parser.text().toLowerCase(Locale.ROOT)); - } else { - throw new OpenSearchParseException( - "unknown field found [{}], failed to parse the decommission attribute", - currentFieldName - ); + attributeValue = parser.currentName(); + if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { + throw new OpenSearchParseException("failed to parse status of decommissioning, expected string but found unknown type"); } + status = DecommissionStatus.fromString(parser.text().toLowerCase(Locale.ROOT)); + } else { + throw new OpenSearchParseException( + "failed to parse decommission state, expected [{}] but found [{}]", + XContentParser.Token.FIELD_NAME, + token + ); } } - return new GetDecommissionStateResponse(decommissionAttribute, status); + return new GetDecommissionStateResponse(attributeValue, status); } @Override @@ -156,11 +108,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GetDecommissionStateResponse that = (GetDecommissionStateResponse) o; - return decommissionedAttribute.equals(that.decommissionedAttribute) && status == that.status; + if (!Objects.equals(attributeValue, that.attributeValue)) { + return false; + } + return Objects.equals(status, that.status); } @Override public int hashCode() { - return Objects.hash(decommissionedAttribute, status); + return Objects.hash(attributeValue, status); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java index 48ed13c6c0aaf..d811ab8cf6948 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java @@ -69,10 +69,11 @@ protected void clusterManagerOperation( ActionListener listener ) throws Exception { DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { + if (decommissionAttributeMetadata != null + && request.attributeName().equals(decommissionAttributeMetadata.decommissionAttribute().attributeName())) { listener.onResponse( new GetDecommissionStateResponse( - decommissionAttributeMetadata.decommissionAttribute(), + decommissionAttributeMetadata.decommissionAttribute().attributeValue(), decommissionAttributeMetadata.status() ) ); diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 77ddb5e17c742..4ab438ec064f1 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -873,17 +873,17 @@ public interface ClusterAdminClient extends OpenSearchClient { /** * Get Decommissioned attribute */ - ActionFuture getDecommission(GetDecommissionStateRequest request); + ActionFuture getDecommissionState(GetDecommissionStateRequest request); /** * Get Decommissioned attribute */ - void getDecommission(GetDecommissionStateRequest request, ActionListener listener); + void getDecommissionState(GetDecommissionStateRequest request, ActionListener listener); /** * Get Decommissioned attribute */ - GetDecommissionStateRequestBuilder prepareGetDecommission(); + GetDecommissionStateRequestBuilder prepareGetDecommissionState(); /** * Deletes the decommission metadata. diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index b42010d4253d5..828ca5f8083ee 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -1417,17 +1417,17 @@ public DecommissionRequestBuilder prepareDecommission(DecommissionRequest reques } @Override - public ActionFuture getDecommission(GetDecommissionStateRequest request) { + public ActionFuture getDecommissionState(GetDecommissionStateRequest request) { return execute(GetDecommissionStateAction.INSTANCE, request); } @Override - public void getDecommission(GetDecommissionStateRequest request, ActionListener listener) { + public void getDecommissionState(GetDecommissionStateRequest request, ActionListener listener) { execute(GetDecommissionStateAction.INSTANCE, request, listener); } @Override - public GetDecommissionStateRequestBuilder prepareGetDecommission() { + public GetDecommissionStateRequestBuilder prepareGetDecommissionState() { return new GetDecommissionStateRequestBuilder(this, GetDecommissionStateAction.INSTANCE); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java index 8bc89ebf37960..5d72adbd6ae08 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestGetDecommissionStateAction.java @@ -30,7 +30,7 @@ public class RestGetDecommissionStateAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(GET, "/_cluster/decommission/awareness/_status")); + return singletonList(new Route(GET, "/_cluster/decommission/awareness/{awareness_attribute_name}/_status")); } @Override @@ -41,6 +41,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { GetDecommissionStateRequest getDecommissionStateRequest = Requests.getDecommissionStateRequest(); - return channel -> client.admin().cluster().getDecommission(getDecommissionStateRequest, new RestToXContentListener<>(channel)); + String attributeName = request.param("awareness_attribute_name"); + getDecommissionStateRequest.attributeName(attributeName); + return channel -> client.admin().cluster().getDecommissionState(getDecommissionStateRequest, new RestToXContentListener<>(channel)); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java new file mode 100644 index 0000000000000..973485e1917f7 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateRequestTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.decommission.awareness.get; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class GetDecommissionStateRequestTests extends OpenSearchTestCase { + public void testSerialization() throws IOException { + String attributeName = "zone"; + final GetDecommissionStateRequest originalRequest = new GetDecommissionStateRequest(attributeName); + final GetDecommissionStateRequest deserialized = copyWriteable( + originalRequest, + writableRegistry(), + GetDecommissionStateRequest::new + ); + assertEquals(deserialized.attributeName(), originalRequest.attributeName()); + } + + public void testValidation() { + { + String attributeName = null; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("attribute name is missing")); + } + { + String attributeName = ""; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("attribute name is missing")); + } + { + String attributeName = "zone"; + final GetDecommissionStateRequest request = new GetDecommissionStateRequest(attributeName); + ActionRequestValidationException e = request.validate(); + assertNull(e); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java index 97bc54d8d7b30..437faf2a75720 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/get/GetDecommissionStateResponseTests.java @@ -8,7 +8,6 @@ package org.opensearch.action.admin.cluster.decommission.awareness.get; -import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.test.AbstractXContentTestCase; @@ -18,11 +17,13 @@ public class GetDecommissionStateResponseTests extends AbstractXContentTestCase { @Override protected GetDecommissionStateResponse createTestInstance() { - DecommissionStatus status = randomFrom(DecommissionStatus.values()); - String attributeName = randomAlphaOfLength(10); - String attributeValue = randomAlphaOfLength(10); - DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); - return new GetDecommissionStateResponse(decommissionAttribute, status); + DecommissionStatus status = null; + String attributeValue = null; + if (randomBoolean()) { + status = randomFrom(DecommissionStatus.values()); + attributeValue = randomAlphaOfLength(10); + } + return new GetDecommissionStateResponse(attributeValue, status); } @Override From b19429b97d06876408474e11287be1a8e6131d58 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 18 Oct 2022 21:09:01 +0530 Subject: [PATCH 5/9] Fix bug in AwarenessAttributeDecommissionIT (#4822) * Fix bug in AwarenessAttributeDecommissionIT Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../coordination/AwarenessAttributeDecommissionIT.java | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 646151674f9f4..c7f5ad1c95ab4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,6 +141,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) - Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) - Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813)) +- Fix bug in AwarenessAttributeDecommissionIT([4822](https://github.com/opensearch-project/OpenSearch/pull/4822)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index b8318503ee4a5..a2270d63ba6fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -40,7 +40,6 @@ import java.util.concurrent.ExecutionException; import static org.opensearch.test.NodeRoles.onlyRole; -import static org.opensearch.test.OpenSearchIntegTestCase.client; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -113,9 +112,11 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); // assert that decommission status is successful - GetDecommissionStateResponse response = client().execute(GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest()) - .get(); - assertEquals(response.getDecommissionedAttribute(), decommissionAttribute); + GetDecommissionStateResponse response = client().execute( + GetDecommissionStateAction.INSTANCE, + new GetDecommissionStateRequest(decommissionAttribute.attributeName()) + ).get(); + assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); From f0b759d19fe0b2650cf3e51ae639d621e5c041c7 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 18 Oct 2022 14:04:26 -0700 Subject: [PATCH 6/9] Exclude jettison version brought in with hadoop-minicluster. (#4787) Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + buildSrc/version.properties | 1 + plugins/discovery-azure-classic/build.gradle | 2 +- test/fixtures/hdfs-fixture/build.gradle | 2 ++ 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7f5ad1c95ab4..1304f8bb203d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bumps `jna` from 5.11.0 to 5.12.1 ([#4656](https://github.com/opensearch-project/OpenSearch/pull/4656)) - Update Jackson Databind to 2.13.4.2 (addressing CVE-2022-42003) ([#4779](https://github.com/opensearch-project/OpenSearch/pull/4779)) - Bumps `tika` from 2.4.0 to 2.5.0 ([#4791](https://github.com/opensearch-project/OpenSearch/pull/4791)) +- Exclude jettison version brought in with hadoop-minicluster. ([#4787](https://github.com/opensearch-project/OpenSearch/pull/4787)) ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 08784c82a4cc4..db0e5b142f7de 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -17,6 +17,7 @@ supercsv = 2.4.0 log4j = 2.17.1 slf4j = 1.7.36 asm = 9.3 +jettison = 1.5.1 # when updating the JNA version, also update the version in buildSrc/build.gradle jna = 5.12.1 diff --git a/plugins/discovery-azure-classic/build.gradle b/plugins/discovery-azure-classic/build.gradle index 8ca9491f834a6..c88d19f0e2806 100644 --- a/plugins/discovery-azure-classic/build.gradle +++ b/plugins/discovery-azure-classic/build.gradle @@ -59,7 +59,7 @@ dependencies { api "com.sun.jersey:jersey-client:${versions.jersey}" api "com.sun.jersey:jersey-core:${versions.jersey}" api "com.sun.jersey:jersey-json:${versions.jersey}" - api 'org.codehaus.jettison:jettison:1.5.1' + api "org.codehaus.jettison:jettison:${versions.jettison}" api 'com.sun.xml.bind:jaxb-impl:2.2.3-1' // HACK: javax.xml.bind was removed from default modules in java 9, so we pull the api in here, diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index d01db6376db42..03455adc8033d 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -35,7 +35,9 @@ group = 'hdfs' dependencies { api("org.apache.hadoop:hadoop-minicluster:3.3.4") { exclude module: 'websocket-client' + exclude module: 'jettison' } + api "org.codehaus.jettison:jettison:${versions.jettison}" api "org.apache.commons:commons-compress:1.21" api "commons-codec:commons-codec:${versions.commonscodec}" api "org.apache.logging.log4j:log4j-core:${versions.log4j}" From fce34ce68d7f86f8550036e9a51ff58223beb130 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 18 Oct 2022 15:59:08 -0700 Subject: [PATCH 7/9] Bump protobuf-java to 3.21.7 in repository-gcs and repository-hdfs. (#4790) Signed-off-by: Marc Handalian Add missing SHAs. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian --- CHANGELOG.md | 4 +++- plugins/repository-gcs/build.gradle | 2 +- plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 | 1 - plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 | 1 + plugins/repository-hdfs/build.gradle | 2 +- .../repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 | 1 - .../repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 | 1 + 7 files changed, 7 insertions(+), 5 deletions(-) delete mode 100644 plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 create mode 100644 plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 delete mode 100644 plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 create mode 100644 plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1304f8bb203d1..15f3e1e5cdd2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bumps `azure-storage-common` from 12.18.0 to 12.18.1 - Bumps `forbiddenapis` from 3.3 to 3.4 - Bumps `gson` from 2.9.0 to 2.9.1 -- Bumps `protobuf-java` from 3.21.2 to 3.21.7 +- Bumps `protobuf-java` from 3.21.2 to 3.21.7k - Bumps `azure-core` from 1.31.0 to 1.33.0 - Bumps `avro` from 1.11.0 to 1.11.1 - Bumps `woodstox-core` from 6.3.0 to 6.3.1 @@ -60,6 +60,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update Jackson Databind to 2.13.4.2 (addressing CVE-2022-42003) ([#4779](https://github.com/opensearch-project/OpenSearch/pull/4779)) - Bumps `tika` from 2.4.0 to 2.5.0 ([#4791](https://github.com/opensearch-project/OpenSearch/pull/4791)) - Exclude jettison version brought in with hadoop-minicluster. ([#4787](https://github.com/opensearch-project/OpenSearch/pull/4787)) +- Bump protobuf-java to 3.21.7 in repository-gcs and repository-hdfs ([#]()) + ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 097e96fcd8fdc..05e879547a4b0 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -66,7 +66,7 @@ dependencies { api 'com.google.api:gax:2.17.0' api 'org.threeten:threetenbp:1.4.4' api 'com.google.protobuf:protobuf-java-util:3.20.0' - api 'com.google.protobuf:protobuf-java:3.19.3' + api 'com.google.protobuf:protobuf-java:3.21.7' api 'com.google.code.gson:gson:2.9.0' api 'com.google.api.grpc:proto-google-common-protos:2.8.0' api 'com.google.api.grpc:proto-google-iam-v1:0.12.0' diff --git a/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 b/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 deleted file mode 100644 index 655ecd1f1c1c9..0000000000000 --- a/plugins/repository-gcs/licenses/protobuf-java-3.19.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4b57f1b1b9e281231c3fcfc039ce3021e29ff570 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 b/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 new file mode 100644 index 0000000000000..faa673a23ef41 --- /dev/null +++ b/plugins/repository-gcs/licenses/protobuf-java-3.21.7.jar.sha1 @@ -0,0 +1 @@ +96cfc7147192f1de72c3d7d06972155ffb7d180c \ No newline at end of file diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index 792bdc6bacd4a..07dc8b8547b80 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -69,7 +69,7 @@ dependencies { api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api 'com.google.code.gson:gson:2.9.0' runtimeOnly 'com.google.guava:guava:31.1-jre' - api 'com.google.protobuf:protobuf-java:3.21.4' + api 'com.google.protobuf:protobuf-java:3.21.7' api "commons-logging:commons-logging:${versions.commonslogging}" api 'commons-cli:commons-cli:1.5.0' api "commons-codec:commons-codec:${versions.commonscodec}" diff --git a/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 b/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 deleted file mode 100644 index f232c9a449547..0000000000000 --- a/plugins/repository-hdfs/licenses/protobuf-java-3.21.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9947febd7a6d0695726c78f603a149b7b7c108e0 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 b/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 new file mode 100644 index 0000000000000..faa673a23ef41 --- /dev/null +++ b/plugins/repository-hdfs/licenses/protobuf-java-3.21.7.jar.sha1 @@ -0,0 +1 @@ +96cfc7147192f1de72c3d7d06972155ffb7d180c \ No newline at end of file From 49363fb06a4cbd48e6baa90f53b74764fb8ec155 Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Tue, 18 Oct 2022 19:01:27 -0500 Subject: [PATCH 8/9] Disable merge on refresh in DiskThresholdDeciderIT (#4828) Disables merge on refresh for DiskThresholdDeciderIT.testRestoreSnapshotAllocationDoesNotExceedWatermark. Signed-off-by: Nicholas Walter Knize --- CHANGELOG.md | 1 + .../routing/allocation/decider/DiskThresholdDeciderIT.java | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15f3e1e5cdd2a..f592ee04e55b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -166,6 +166,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296)) - Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331)) - Better plural stemmer than minimal_english ([#4738](https://github.com/opensearch-project/OpenSearch/pull/4738)) +- Disable merge on refresh in DiskThresholdDeciderIT ([#4828](https://github.com/opensearch-project/OpenSearch/pull/4828)) ### Security [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 10e809e2fb5dc..955f0f0465d88 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -217,6 +217,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6) .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms") + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false) .build() ); final long minShardSize = createReasonableSizedShards(indexName); From eb3301554ec7e1a07866508ffc390b8651dc3394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Vl=C4=8Dek?= Date: Wed, 19 Oct 2022 02:27:08 +0200 Subject: [PATCH 9/9] Add groupId value propagation tests for ZIP publication task (#4772) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The groupId can be defined on several levels. This commit adds more tests to cover the "edge" cases. - In one case the groupId is inherited from the top most 'allprojects' section (and thus can be missing in the publications section). - The other case is opposite, it tests that if the groupId is defined on several levels then the most internal level outweighs the other levels. Closes: #4771 Signed-off-by: Lukáš Vlček Signed-off-by: Lukáš Vlček --- CHANGELOG.md | 1 + .../gradle/pluginzip/PublishTests.java | 70 +++++++++++++++++++ .../pluginzip/allProjectsGroup.gradle | 28 ++++++++ .../pluginzip/groupPriorityLevel.gradle | 30 ++++++++ 4 files changed, 129 insertions(+) create mode 100644 buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle create mode 100644 buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle diff --git a/CHANGELOG.md b/CHANGELOG.md index f592ee04e55b9..4c4c7c80e5410 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Recommissioning of zone. REST layer support. ([#4624](https://github.com/opensearch-project/OpenSearch/pull/4604)) - Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565)) - Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746)) +- Add groupId value propagation tests for ZIP publication task ([#4772](https://github.com/opensearch-project/OpenSearch/pull/4772)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 diff --git a/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java b/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java index 2ca0e507acb44..148a836f32b41 100644 --- a/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java +++ b/buildSrc/src/test/java/org/opensearch/gradle/pluginzip/PublishTests.java @@ -271,6 +271,76 @@ public void useDefaultValues() throws IOException, URISyntaxException, XmlPullPa assertEquals(model.getUrl(), "https://github.com/doe/sample-plugin"); } + /** + * If the `group` is defined in gradle's allprojects section then it does not have to defined in publications. + */ + @Test + public void allProjectsGroup() throws IOException, URISyntaxException, XmlPullParserException { + GradleRunner runner = prepareGradleRunnerFromTemplate("allProjectsGroup.gradle", "build", ZIP_PUBLISH_TASK); + BuildResult result = runner.build(); + + /** Check if build and {@value ZIP_PUBLISH_TASK} tasks have run well */ + assertEquals(SUCCESS, result.task(":" + "build").getOutcome()); + assertEquals(SUCCESS, result.task(":" + ZIP_PUBLISH_TASK).getOutcome()); + + // Parse the maven file and validate default values + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read( + new FileReader( + new File( + projectDir.getRoot(), + String.join( + File.separator, + "build", + "local-staging-repo", + "org", + "opensearch", + PROJECT_NAME, + "2.0.0.0", + PROJECT_NAME + "-2.0.0.0.pom" + ) + ) + ) + ); + assertEquals(model.getVersion(), "2.0.0.0"); + assertEquals(model.getGroupId(), "org.opensearch"); + } + + /** + * The groupId value can be defined on several levels. This tests that the most internal level outweighs other levels. + */ + @Test + public void groupPriorityLevel() throws IOException, URISyntaxException, XmlPullParserException { + GradleRunner runner = prepareGradleRunnerFromTemplate("groupPriorityLevel.gradle", "build", ZIP_PUBLISH_TASK); + BuildResult result = runner.build(); + + /** Check if build and {@value ZIP_PUBLISH_TASK} tasks have run well */ + assertEquals(SUCCESS, result.task(":" + "build").getOutcome()); + assertEquals(SUCCESS, result.task(":" + ZIP_PUBLISH_TASK).getOutcome()); + + // Parse the maven file and validate default values + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read( + new FileReader( + new File( + projectDir.getRoot(), + String.join( + File.separator, + "build", + "local-staging-repo", + "level", + "3", + PROJECT_NAME, + "2.0.0.0", + PROJECT_NAME + "-2.0.0.0.pom" + ) + ) + ) + ); + assertEquals(model.getVersion(), "2.0.0.0"); + assertEquals(model.getGroupId(), "level.3"); + } + /** * In this case the Publication entity is completely missing but still the POM file is generated using the default * values including the groupId and version values obtained from the Gradle project object. diff --git a/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle b/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle new file mode 100644 index 0000000000000..80638107c86e1 --- /dev/null +++ b/buildSrc/src/test/resources/pluginzip/allProjectsGroup.gradle @@ -0,0 +1,28 @@ +plugins { + id 'java-gradle-plugin' + id 'opensearch.pluginzip' +} + +version='2.0.0.0' + +// A bundlePlugin task mockup +tasks.register('bundlePlugin', Zip.class) { + archiveFileName = "sample-plugin-${version}.zip" + destinationDirectory = layout.buildDirectory.dir('distributions') + from layout.projectDirectory.file('sample-plugin-source.txt') +} + +allprojects { + group = 'org.opensearch' +} + +publishing { + publications { + pluginZip(MavenPublication) { publication -> + pom { + name = "sample-plugin" + description = "pluginDescription" + } + } + } +} diff --git a/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle b/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle new file mode 100644 index 0000000000000..4da02c9f191d8 --- /dev/null +++ b/buildSrc/src/test/resources/pluginzip/groupPriorityLevel.gradle @@ -0,0 +1,30 @@ +plugins { + id 'java-gradle-plugin' + id 'opensearch.pluginzip' +} + +version='2.0.0.0' + +// A bundlePlugin task mockup +tasks.register('bundlePlugin', Zip.class) { + archiveFileName = "sample-plugin-${version}.zip" + destinationDirectory = layout.buildDirectory.dir('distributions') + from layout.projectDirectory.file('sample-plugin-source.txt') +} + +allprojects { + group = 'level.1' +} + +publishing { + publications { + pluginZip(MavenPublication) { publication -> + groupId = "level.2" + pom { + name = "sample-plugin" + description = "pluginDescription" + groupId = "level.3" + } + } + } +}