From ce8c6b10446d8d168e09b2e549fd3de359edd4d4 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 7 Aug 2023 11:45:41 -0700 Subject: [PATCH] [Segment Replication] Prioritize replica shard movement during shard relocation (#8875) (#9153) When some node or set of nodes is excluded, the shards are moved away in random order. When segment replication is enabled for a cluster, we might end up in a mixed version state where replicas will be on lower version and unable to read segments sent from higher version primaries and fail. To avoid entering this situation, we prioritize replica shard movement. Adding a new setting called shard movement strategy - `SHARD_MOVEMENT_STRATEGY_SETTING` - that will allow us to specify in which order we want to move our shards: `NO_PREFERENCE` (default), `PRIMARY_FIRST` or `REPLICA_FIRST`. The `PRIMARY_FIRST` option will perform the same behavior as the previous setting `SHARD_MOVE_PRIMARY_FIRST_SETTING` which will now be deprecated in favor of the shard movement strategy setting. Expected behavior: If `SHARD_MOVEMENT_STRATEGY_SETTING` is changed from its default value to either `PRIMARY_FIRST` or `REPLICA_FIRST` then we perform this behavior whether or not `SHARD_MOVE_PRIMARY_FIRST_SETTING` is enabled. If `SHARD_MOVEMENT_STRATEGY_SETTING` is still at its default setting of `NO_PREFERENCE` and `SHARD_MOVE_PRIMARY_FIRST_SETTING` is enabled, we move the primary shards first. This ensures that users still using this setting will not see any changes in behavior. 
Reference: https://github.com/opensearch-project/OpenSearch/pull/1445 Parent issue: https://github.com/opensearch-project/OpenSearch/issues/3881 --------- Signed-off-by: Poojita Raj (cherry picked from commit c6e4bcd097969f00956c0f4c152540cb610e4f93) --- CHANGELOG.md | 1 + .../org/opensearch/cluster/ClusterModule.java | 2 +- .../cluster/routing/RoutingNodes.java | 167 +++++++++++------- .../routing/ShardMovementStrategy.java | 57 ++++++ .../allocator/BalancedShardsAllocator.java | 27 ++- .../allocator/LocalShardsBalancer.java | 27 ++- .../decider/NodeVersionAllocationDecider.java | 32 ++++ .../common/settings/ClusterSettings.java | 1 + .../cluster/routing/RoutingNodesTests.java | 52 +++++- ...s.java => ShardMovementStrategyTests.java} | 65 +++++-- .../NodeVersionAllocationDeciderTests.java | 151 +++++++++++++++- 11 files changed, 491 insertions(+), 91 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java rename server/src/test/java/org/opensearch/cluster/routing/{MovePrimaryFirstTests.java => ShardMovementStrategyTests.java} (60%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e8f3a3d6dc24..3d6845951d4a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606)) - Disallow compression level to be set for default and best_compression index codecs ([#8737]()https://github.com/opensearch-project/OpenSearch/pull/8737) - [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195]()https://github.com/opensearch-project/OpenSearch/pull/8195) +- Prioritize replica shard movement during shard relocation 
([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index b80fd1d746831..e797a08aba3cd 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -359,7 +359,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); - addAllocationDecider(deciders, new NodeVersionAllocationDecider()); + addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings)); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index b10c3d00f4c31..83624a79edd75 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -1310,100 +1310,131 @@ private void ensureMutable() { } /** - * Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from - * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. 
- * The iterator then resumes on the first node by returning the second shard and continues until all shards from - * all the nodes have been returned. - * @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node - * @return iterator of shard routings + * Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)} + * @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST */ - public Iterator nodeInterleavedShardIterator(boolean movePrimaryFirst) { + private Iterator buildIteratorForMovementStrategy(boolean primaryFirst) { final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { queue.add(entry.getValue().copyShards().iterator()); } - if (movePrimaryFirst) { - return new Iterator() { - private Queue replicaShards = new ArrayDeque<>(); - private Queue> replicaIterators = new ArrayDeque<>(); - - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { - return true; - } - queue.poll(); - } - if (!replicaShards.isEmpty()) { + return new Iterator() { + private Queue shardRoutings = new ArrayDeque<>(); + private Queue> shardIterators = new ArrayDeque<>(); + + public boolean hasNext() { + while (queue.isEmpty() == false) { + if (queue.peek().hasNext()) { return true; } - while (!replicaIterators.isEmpty()) { - if (replicaIterators.peek().hasNext()) { - return true; - } - replicaIterators.poll(); + queue.poll(); + } + if (!shardRoutings.isEmpty()) { + return true; + } + while (!shardIterators.isEmpty()) { + if (shardIterators.peek().hasNext()) { + return true; } - return false; + shardIterators.poll(); } + return false; + } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); - } - while (!queue.isEmpty()) { - Iterator iter = queue.poll(); + public ShardRouting next() { + if 
(hasNext() == false) { + throw new NoSuchElementException(); + } + while (!queue.isEmpty()) { + Iterator iter = queue.poll(); + if (primaryFirst) { if (iter.hasNext()) { ShardRouting result = iter.next(); if (result.primary()) { queue.offer(iter); return result; } - replicaShards.offer(result); - replicaIterators.offer(iter); + shardRoutings.offer(result); + shardIterators.offer(iter); + } + } else { + while (iter.hasNext()) { + ShardRouting result = iter.next(); + if (result.primary() == false) { + queue.offer(iter); + return result; + } + shardRoutings.offer(result); + shardIterators.offer(iter); } } - if (!replicaShards.isEmpty()) { - return replicaShards.poll(); - } - Iterator replicaIterator = replicaIterators.poll(); - ShardRouting replicaShard = replicaIterator.next(); - replicaIterators.offer(replicaIterator); - - assert !replicaShard.primary(); - return replicaShard; } - - public void remove() { - throw new UnsupportedOperationException(); + if (!shardRoutings.isEmpty()) { + return shardRoutings.poll(); } - }; + Iterator replicaIterator = shardIterators.poll(); + ShardRouting replicaShard = replicaIterator.next(); + shardIterators.offer(replicaIterator); + + assert replicaShard.primary() != primaryFirst; + return replicaShard; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + /** + * Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from + * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. + * The iterator then resumes on the first node by returning the second shard and continues until all shards from + * all the nodes have been returned. 
+ * @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node + * if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node + * if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration + * @return iterator of shard routings + */ + public Iterator nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) { + final Queue> queue = new ArrayDeque<>(); + for (Map.Entry entry : nodesToShards.entrySet()) { + queue.add(entry.getValue().copyShards().iterator()); + } + if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) { + return buildIteratorForMovementStrategy(true); } else { - return new Iterator() { - @Override - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { - return true; + if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) { + return buildIteratorForMovementStrategy(false); + } else { + return new Iterator() { + @Override + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); } - queue.poll(); + return false; } - return false; - } - @Override - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); + @Override + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + Iterator iter = queue.poll(); + queue.offer(iter); + return iter.next(); } - Iterator iter = queue.poll(); - queue.offer(iter); - return iter.next(); - } - public void remove() { - throw new UnsupportedOperationException(); - } - }; + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java 
b/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java new file mode 100644 index 0000000000000..cfdeed5c227b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; + +import java.util.Locale; + +/** + * ShardMovementStrategy defines the order in which shard movement occurs. + * + * ShardMovementStrategy values or rather their string representation to be used with + * {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings. + * + * @opensearch.internal + */ +public enum ShardMovementStrategy { + /** + * default behavior in which order of shard movement doesn't matter. 
+ */ + NO_PREFERENCE, + + /** + * primary shards are moved first + */ + PRIMARY_FIRST, + + /** + * replica shards are moved first + */ + REPLICA_FIRST; + + public static ShardMovementStrategy parse(String strValue) { + if (strValue == null) { + return null; + } else { + strValue = strValue.toUpperCase(Locale.ROOT); + try { + return ShardMovementStrategy.valueOf(strValue); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]"); + } + } + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 59d7fab59c266..19e0e318eb805 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.IntroSorter; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardMovementStrategy; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; @@ -107,8 +108,22 @@ public class BalancedShardsAllocator implements ShardsAllocator { "cluster.routing.allocation.move.primary_first", false, Property.Dynamic, + Property.NodeScope, + Property.Deprecated + ); + + /** + * Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()} + * Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING. 
+ */ + public static final Setting SHARD_MOVEMENT_STRATEGY_SETTING = new Setting( + "cluster.routing.allocation.shard_movement_strategy", + ShardMovementStrategy.NO_PREFERENCE.toString(), + ShardMovementStrategy::parse, + Property.Dynamic, Property.NodeScope ); + public static final Setting THRESHOLD_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.threshold", 1.0f, @@ -131,6 +146,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); private volatile boolean movePrimaryFirst; + private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; private volatile WeightFunction weightFunction; @@ -145,8 +161,10 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); + setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); + clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } @@ -155,6 +173,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) { this.movePrimaryFirst = movePrimaryFirst; } + private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) { + this.shardMovementStrategy = shardMovementStrategy; + } + private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new 
WeightFunction(indexBalance, shardBalanceFactor); } @@ -184,6 +206,7 @@ public void allocate(RoutingAllocation allocation) { logger, allocation, movePrimaryFirst, + shardMovementStrategy, weightFunction, threshold, preferPrimaryShardBalance @@ -205,6 +228,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f logger, allocation, movePrimaryFirst, + shardMovementStrategy, weightFunction, threshold, preferPrimaryShardBalance @@ -456,11 +480,12 @@ public Balancer( Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, + ShardMovementStrategy shardMovementStrategy, BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance); + super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index dc95d77f04c0d..2313fecd2e09e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardMovementStrategy; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; @@ -58,6 +59,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final RoutingAllocation allocation; private final RoutingNodes routingNodes; private final boolean movePrimaryFirst; + private final 
ShardMovementStrategy shardMovementStrategy; private final boolean preferPrimaryBalance; private final BalancedShardsAllocator.WeightFunction weight; @@ -74,6 +76,7 @@ public LocalShardsBalancer( Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, + ShardMovementStrategy shardMovementStrategy, BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance @@ -93,6 +96,7 @@ public LocalShardsBalancer( sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); this.preferPrimaryBalance = preferPrimaryBalance; + this.shardMovementStrategy = shardMovementStrategy; } /** @@ -527,6 +531,22 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { } } + /** + * Returns the correct Shard movement strategy to use. + * If users are still using deprecated setting "move_primary_first", we want behavior to remain unchanged. + * In the event of changing ShardMovementStrategy setting from default setting NO_PREFERENCE to either PRIMARY_FIRST or REPLICA_FIRST, we want that + * to have priority over values set in move_primary_first setting. + */ + private ShardMovementStrategy getShardMovementStrategy() { + if (shardMovementStrategy != ShardMovementStrategy.NO_PREFERENCE) { + return shardMovementStrategy; + } + if (movePrimaryFirst) { + return ShardMovementStrategy.PRIMARY_FIRST; + } + return ShardMovementStrategy.NO_PREFERENCE; + } + /** * Move started shards that can not be allocated to a node anymore * @@ -549,7 +569,8 @@ void moveShards() { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); } boolean primariesThrottled = false; - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(getShardMovementStrategy()); it + .hasNext();) { // Verify if the cluster concurrent recoveries have been reached. 
if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { logger.info( @@ -573,8 +594,8 @@ void moveShards() { continue; } - // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled - if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { + // Ensure that replicas don't relocate if primaries are being throttled and primary first shard movement strategy is enabled + if ((shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) && primariesThrottled && !shardRouting.primary()) { logger.info( "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + "are being throttled. Skipping shard iteration" diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index 19b7494c000de..9344b4c87830d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -32,12 +32,18 @@ package org.opensearch.cluster.routing.allocation.decider; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.util.List; +import java.util.stream.Collectors; /** * An allocation decider that prevents relocation or allocation from nodes @@ -52,9 +58,35 @@ public 
class NodeVersionAllocationDecider extends AllocationDecider { public static final String NAME = "node_version"; + private final ReplicationType replicationType; + + public NodeVersionAllocationDecider(Settings settings) { + replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary()) { + if (replicationType == ReplicationType.SEGMENT) { + List replicas = allocation.routingNodes() + .assignedShards(shardRouting.shardId()) + .stream() + .filter(shr -> !shr.primary() && shr.active()) + .collect(Collectors.toList()); + for (ShardRouting replica : replicas) { + // can not allocate if target node version > any existing replica version + RoutingNode replicaNode = allocation.routingNodes().node(replica.currentNodeId()); + if (node.node().getVersion().after(replicaNode.node().getVersion())) { + return allocation.decision( + Decision.NO, + NAME, + "When segment replication is enabled, cannot relocate primary shard to a node with version [%s] if it has a replica on older version [%s]", + node.node().getVersion(), + replicaNode.node().getVersion() + ); + } + } + } if (shardRouting.currentNodeId() == null) { if (shardRouting.recoverySource() != null && shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) { // restoring from a snapshot - check that the node can handle the version diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index b4d77be42c5f4..ce6ee474dbe1e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -242,6 +242,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, 
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, + BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java index 73136a71bc12a..23056ec5782bb 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -124,7 +124,7 @@ private IndexMetadata.Builder createIndexMetadata(String indexName) { .numberOfShards(this.numberOfShards); } - public void testInterleavedShardIterator() { + public void testInterleavedShardIteratorPrimaryFirst() { // Initialize all the shards for test index 1 and 2 initPrimaries(); startInitializingShards(TEST_INDEX_1); @@ -147,7 +147,8 @@ public void testInterleavedShardIterator() { } // Get primary first shard iterator and assert primary shards are iterated over first - final Iterator iterator = this.clusterState.getRoutingNodes().nodeInterleavedShardIterator(true); + final Iterator iterator = this.clusterState.getRoutingNodes() + .nodeInterleavedShardIterator(ShardMovementStrategy.PRIMARY_FIRST); boolean iteratingPrimary = true; int shardCount = 0; while (iterator.hasNext()) { @@ -155,11 +156,54 @@ public void testInterleavedShardIterator() { if (iteratingPrimary) { iteratingPrimary = shard.primary(); } else { - assert shard.primary() == false; + assertFalse(shard.primary()); } shardCount++; } - assert shardCount == this.totalNumberOfShards; + assertEquals(shardCount, this.totalNumberOfShards); + } + + public void testInterleavedShardIteratorNoPreference() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + 
startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + final Iterator iterator = this.clusterState.getRoutingNodes() + .nodeInterleavedShardIterator(ShardMovementStrategy.NO_PREFERENCE); + int shardCount = 0; + while (iterator.hasNext()) { + final ShardRouting shard = iterator.next(); + shardCount++; + } + assertEquals(shardCount, this.totalNumberOfShards); + } + + public void testInterleavedShardIteratorReplicaFirst() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Get replica first shard iterator and assert replica shards are iterated over first + final Iterator iterator = this.clusterState.getRoutingNodes() + .nodeInterleavedShardIterator(ShardMovementStrategy.REPLICA_FIRST); + boolean iteratingReplica = true; + int shardCount = 0; + while (iterator.hasNext()) { + final ShardRouting shard = iterator.next(); + if (iteratingReplica) { + iteratingReplica = shard.primary() == false; + } else { + assertTrue(shard.primary()); + } + shardCount++; + } + assertEquals(shardCount, this.totalNumberOfShards); } public void testSwapPrimaryWithReplica() { diff --git a/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java similarity index 60% rename from server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java rename to server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java index a30581e2576e2..12994bdfcf6d5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java @@ -22,7 +22,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = 
OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @ThreadLeakScope(ThreadLeakScope.Scope.NONE) -public class MovePrimaryFirstTests extends OpenSearchIntegTestCase { +public class ShardMovementStrategyTests extends OpenSearchIntegTestCase { protected String startDataOnlyNode(final String zone) { final Settings settings = Settings.builder().put("node.attr.zone", zone).build(); @@ -48,14 +48,48 @@ protected void createAndIndex(String index, int replicaCount, int shardCount) { flushAndRefresh(index); } + private static Settings.Builder getSettings(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) { + return Settings.builder() + .put("cluster.routing.allocation.shard_movement_strategy", shardMovementStrategy) + .put("cluster.routing.allocation.move.primary_first", movePrimaryFirst); + } + + public void testClusterGreenAfterPartialRelocationPrimaryFirstShardMovementMovePrimarySettingEnabled() throws InterruptedException { + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.PRIMARY_FIRST, true); + } + + public void testClusterGreenAfterPartialRelocationPrimaryFirstShardMovementMovePrimarySettingDisabled() throws InterruptedException { + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.PRIMARY_FIRST, false); + } + + public void testClusterGreenAfterPartialRelocationReplicaFirstShardMovementPrimaryFirstEnabled() throws InterruptedException { + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.REPLICA_FIRST, true); + } + + public void testClusterGreenAfterPartialRelocationReplicaFirstShardMovementPrimaryFirstDisabled() throws InterruptedException { + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.REPLICA_FIRST, false); + } + + public void testClusterGreenAfterPartialRelocationNoPreferenceShardMovementPrimaryFirstEnabled() throws InterruptedException { + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.NO_PREFERENCE, true); + } + + private boolean 
shouldMovePrimaryShardsFirst(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) { + if (shardMovementStrategy == ShardMovementStrategy.NO_PREFERENCE && movePrimaryFirst) { + return true; + } + return shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST; + } + /** * Creates two nodes each in two zones and shuts down nodes in zone1 after * relocating half the number of shards. Shards per node constraint ensures * that exactly 50% of shards relocate to nodes in zone2 giving time to shut down - * nodes in zone1. Since primaries are relocated first as movePrimaryFirst is - * enabled, cluster should not become red and zone2 nodes have all the primaries + * nodes in zone1. Depending on the shard movement strategy, we check whether the + * primary or replica shards are moved first, and zone2 nodes have all the shards */ - public void testClusterGreenAfterPartialRelocation() throws InterruptedException { + private void testClusterGreenAfterPartialRelocation(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) + throws InterruptedException { internalCluster().startClusterManagerOnlyNodes(1); final String z1 = "zone-1", z2 = "zone-2"; final int primaryShardCount = 6; @@ -73,7 +107,7 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException // zone nodes excluded to prevent any shard relocation ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); settingsRequest.persistentSettings( - Settings.builder().put("cluster.routing.allocation.move.primary_first", true).put("cluster.routing.allocation.exclude.zone", z2) + getSettings(shardMovementStrategy, movePrimaryFirst).put("cluster.routing.allocation.exclude.zone", z2) ); client().admin().cluster().updateSettings(settingsRequest).actionGet(); @@ -82,7 +116,7 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException // Create cluster state listener to compute number of shards on new zone // nodes before 
counting down the latch - final CountDownLatch primaryMoveLatch = new CountDownLatch(1); + final CountDownLatch shardMoveLatch = new CountDownLatch(1); final ClusterStateListener listener = event -> { if (event.routingTableChanged()) { final RoutingNodes routingNodes = event.state().getRoutingNodes(); @@ -91,13 +125,22 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException RoutingNode routingNode = it.next(); final String nodeName = routingNode.node().getName(); if (nodeName.equals(z2n1) || nodeName.equals(z2n2)) { - startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED); + int count = 0; + for (ShardRouting shardEntry : routingNode) { + // If shard movement strategy is primary first, asserting that primary shards are moved first; else assert + // shards are replicas + if ((shardEntry.primary() == shouldMovePrimaryShardsFirst(shardMovementStrategy, movePrimaryFirst)) + && shardEntry.state() == ShardRoutingState.STARTED) { + count++; + } + } + startedCount += count; } } - // Count down the latch once all the primary shards have initialized on nodes in zone-2 + // Count down the latch once all the shards have initialized on nodes in zone-2 if (startedCount == primaryShardCount) { - primaryMoveLatch.countDown(); + shardMoveLatch.countDown(); } } }; @@ -108,12 +151,12 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException settingsRequest.persistentSettings( Settings.builder() .put("cluster.routing.allocation.exclude.zone", z1) - // Total shards per node constraint is added to pause the relocation after primary shards + // Total shards per node constraint is added to pause the relocation after shards // have relocated to allow time for node shutdown and validate yellow cluster .put("cluster.routing.allocation.total_shards_per_node", primaryShardCount / 2) ); client().admin().cluster().updateSettings(settingsRequest); - primaryMoveLatch.await(); + shardMoveLatch.await(); // Shutdown both 
nodes in zone 1 and ensure cluster does not become red try { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 557d7db142671..d4ec340766ec9 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -67,6 +67,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.EmptySnapshotsInfoService; import org.opensearch.snapshots.InternalSnapshotsInfoService; @@ -439,7 +440,9 @@ public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNode .routingTable(routingTable) .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)) .build(); - AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider())); + AllocationDeciders allocationDeciders = new AllocationDeciders( + Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY)) + ); AllocationService strategy = new MockAllocationService( allocationDeciders, new TestGatewayAllocator(), @@ -509,7 +512,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() { .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)) .build(); AllocationDeciders allocationDeciders = new AllocationDeciders( - Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider()) + Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider(Settings.EMPTY)) ); AllocationService strategy = new 
MockAllocationService( allocationDeciders, @@ -526,6 +529,148 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() { } } + public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled() { + ShardId shard1 = new ShardId("test1", "_na_", 0); + ShardId shard2 = new ShardId("test2", "_na_", 0); + final DiscoveryNode newNode1 = new DiscoveryNode( + "newNode1", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + Version.CURRENT + ); + final DiscoveryNode newNode2 = new DiscoveryNode( + "newNode2", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + Version.CURRENT + ); + final DiscoveryNode oldNode1 = new DiscoveryNode( + "oldNode1", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + VersionUtils.getPreviousVersion() + ); + final DiscoveryNode oldNode2 = new DiscoveryNode( + "oldNode2", + buildNewFakeTransportAddress(), + emptyMap(), + CLUSTER_MANAGER_DATA_ROLES, + VersionUtils.getPreviousVersion() + ); + AllocationId allocationId1P = AllocationId.newInitializing(); + AllocationId allocationId1R = AllocationId.newInitializing(); + AllocationId allocationId2P = AllocationId.newInitializing(); + AllocationId allocationId2R = AllocationId.newInitializing(); + + Settings segmentReplicationSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shard1.getIndexName()) + .settings(settings(Version.CURRENT).put(segmentReplicationSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + .putInSyncAllocationIds(0, Sets.newHashSet(allocationId1P.getId(), allocationId1R.getId())) + ) + .put( + IndexMetadata.builder(shard2.getIndexName()) + .settings(settings(Version.CURRENT).put(segmentReplicationSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + .putInSyncAllocationIds(0, Sets.newHashSet(allocationId2P.getId(), 
allocationId2R.getId())) + ) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shard1.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shard1).addShard( + TestShardRouting.newShardRouting( + shard1.getIndexName(), + shard1.getId(), + oldNode1.getId(), + null, + true, + ShardRoutingState.STARTED, + allocationId1P + ) + ) + .addShard( + TestShardRouting.newShardRouting( + shard1.getIndexName(), + shard1.getId(), + oldNode2.getId(), + null, + false, + ShardRoutingState.STARTED, + allocationId1R + ) + ) + .build() + ) + ) + .add( + IndexRoutingTable.builder(shard2.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shard2).addShard( + TestShardRouting.newShardRouting( + shard2.getIndexName(), + shard2.getId(), + oldNode2.getId(), + null, + true, + ShardRoutingState.STARTED, + allocationId2P + ) + ) + .addShard( + TestShardRouting.newShardRouting( + shard2.getIndexName(), + shard2.getId(), + oldNode1.getId(), + null, + false, + ShardRoutingState.STARTED, + allocationId2R + ) + ) + .build() + ) + ) + .build(); + ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode1).add(newNode2).add(oldNode1).add(oldNode2)) + .build(); + AllocationDeciders allocationDeciders = new AllocationDeciders( + Collections.singleton(new NodeVersionAllocationDecider(segmentReplicationSettings)) + ); + AllocationService strategy = new MockAllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState(); + // with segment replication enabled, only the replica of each index relocates to the newer-version nodes; the primaries must stay on the old-version nodes because an older-version replica cannot read segments replicated from a newer-version primary +
assertThat(state.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + state.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(), + equalTo(false) + ); + assertThat(state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + state.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(), + equalTo(false) + ); + } + private ClusterState stabilize(ClusterState clusterState, AllocationService service) { logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes()); @@ -626,7 +771,7 @@ public void testMessages() { RoutingAllocation routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); routingAllocation.debugDecision(true); - final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(); + final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(Settings.EMPTY); Decision decision = allocationDecider.canAllocate(primaryShard, newNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.YES)); assertThat(decision.getExplanation(), is("the primary shard is new or already existed on the node"));