[Segment Replication] Prioritize replica shard movement during shard relocation (#8875) (#9153)

When a node or set of nodes is excluded, its shards are moved away in no particular order. With segment replication enabled on a cluster, this can lead to a mixed-version state in which replicas sit on lower-version nodes, cannot read the segments sent by primaries on higher-version nodes, and fail.

To avoid this situation, we can prioritize replica shard movement.

This change adds a shard movement strategy setting, `SHARD_MOVEMENT_STRATEGY_SETTING`, that specifies the order in which shards are moved: `NO_PREFERENCE` (default), `PRIMARY_FIRST` or `REPLICA_FIRST`.

The `PRIMARY_FIRST` option behaves the same as the existing setting `SHARD_MOVE_PRIMARY_FIRST_SETTING`, which is now deprecated in favor of the shard movement strategy setting.

Expected behavior: 

If `SHARD_MOVEMENT_STRATEGY_SETTING` is changed from its default to either `PRIMARY_FIRST` or `REPLICA_FIRST`, that strategy is applied whether or not `SHARD_MOVE_PRIMARY_FIRST_SETTING` is enabled.

If `SHARD_MOVEMENT_STRATEGY_SETTING` is left at its default of `NO_PREFERENCE` and `SHARD_MOVE_PRIMARY_FIRST_SETTING` is enabled, primary shards are moved first. This ensures that users who still rely on the deprecated setting see no change in behavior.
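
The precedence between the two settings can be sketched roughly as below. This is only an illustration under stated assumptions: the class `StrategyResolutionSketch` and the helper `resolve` are hypothetical names, and the enum is a local stand-in for `ShardMovementStrategy`. The balancer code that actually applies the settings may be structured differently.

```java
// Minimal sketch of the precedence rules; NOT the actual OpenSearch code.
// The class name and the resolve(...) helper are hypothetical.
class StrategyResolutionSketch {

    // Local stand-in mirroring the three values of ShardMovementStrategy.
    enum ShardMovementStrategy { NO_PREFERENCE, PRIMARY_FIRST, REPLICA_FIRST }

    static ShardMovementStrategy resolve(boolean movePrimaryFirst, ShardMovementStrategy strategy) {
        // A non-default strategy always wins, whatever the deprecated flag says.
        if (strategy != ShardMovementStrategy.NO_PREFERENCE) {
            return strategy;
        }
        // Default strategy: honor the deprecated flag for backward compatibility.
        return movePrimaryFirst ? ShardMovementStrategy.PRIMARY_FIRST : ShardMovementStrategy.NO_PREFERENCE;
    }

    public static void main(String[] args) {
        // Print the effective behavior for every combination of the two settings.
        for (ShardMovementStrategy strategy : ShardMovementStrategy.values()) {
            for (boolean flag : new boolean[] { false, true }) {
                System.out.println(strategy + " with primary_first=" + flag + " -> " + resolve(flag, strategy));
            }
        }
    }
}
```

Under this sketch, the deprecated flag only matters when the strategy is `NO_PREFERENCE`; in every other case the strategy decides.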

Reference: #1445

Parent issue: #3881
---------

Signed-off-by: Poojita Raj <[email protected]>
(cherry picked from commit c6e4bcd)
Poojita-Raj authored Aug 7, 2023
1 parent f4f4886 commit ce8c6b1
Showing 11 changed files with 491 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))
- Disallow compression level to be set for default and best_compression index codecs ([#8737]()https://github.com/opensearch-project/OpenSearch/pull/8737)
- [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195]()https://github.com/opensearch-project/OpenSearch/pull/8195)
- Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -359,7 +359,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
167 changes: 99 additions & 68 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -1310,100 +1310,131 @@ private void ensureMutable() {
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
* @return iterator of shard routings
* Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)}
* @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
private Iterator<ShardRouting> buildIteratorForMovementStrategy(boolean primaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (movePrimaryFirst) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> shardRoutings = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> shardIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
queue.poll();
}
if (!shardRoutings.isEmpty()) {
return true;
}
while (!shardIterators.isEmpty()) {
if (shardIterators.peek().hasNext()) {
return true;
}
return false;
shardIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (primaryFirst) {
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
shardRoutings.offer(result);
shardIterators.offer(iter);
}
} else {
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
shardRoutings.offer(result);
shardIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);

assert !replicaShard.primary();
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
if (!shardRoutings.isEmpty()) {
return shardRoutings.poll();
}
};
Iterator<ShardRouting> replicaIterator = shardIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
shardIterators.offer(replicaIterator);

assert replicaShard.primary() != primaryFirst;
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
}

};
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
return buildIteratorForMovementStrategy(true);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) {
return buildIteratorForMovementStrategy(false);
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
queue.poll();
return false;
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
}
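
The ordering contract described in the Javadoc above (all shards of the preferred kind first, interleaved across nodes) can be illustrated with a much simpler eager version. This is a sketch under stated assumptions, not the iterator from this commit: `InterleavedOrderSketch`, `Shard`, `order`, `demoNodes` and `names` are hypothetical names, shards are plain objects rather than `ShardRouting`, and the order within each phase may differ from what the real lazy iterator produces.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified, eager illustration of node-interleaved ordering; NOT the commit's code.
class InterleavedOrderSketch {

    // Hypothetical stand-in for ShardRouting: a name plus a primary flag.
    static final class Shard {
        final String name;
        final boolean primary;

        Shard(String name, boolean primary) {
            this.name = name;
            this.primary = primary;
        }
    }

    // Returns every shard once: the preferred kind first, round-robin across nodes.
    static List<Shard> order(Map<String, List<Shard>> nodesToShards, boolean primaryFirst) {
        List<Shard> result = new ArrayList<>();
        // Phase 1 handles the preferred kind, phase 2 handles the other kind.
        for (boolean wanted : new boolean[] { primaryFirst, !primaryFirst }) {
            Queue<Iterator<Shard>> queue = new ArrayDeque<>();
            for (List<Shard> shards : nodesToShards.values()) {
                List<Shard> filtered = new ArrayList<>();
                for (Shard shard : shards) {
                    if (shard.primary == wanted) {
                        filtered.add(shard);
                    }
                }
                queue.add(filtered.iterator());
            }
            // Take one shard per node in turn until every node is exhausted.
            while (!queue.isEmpty()) {
                Iterator<Shard> iter = queue.poll();
                if (iter.hasNext()) {
                    result.add(iter.next());
                    queue.offer(iter);
                }
            }
        }
        return result;
    }

    // Two nodes, each holding one primary and one replica (made-up example data).
    static Map<String, List<Shard>> demoNodes() {
        Map<String, List<Shard>> nodes = new LinkedHashMap<>();
        nodes.put("node1", Arrays.asList(new Shard("p0", true), new Shard("r1", false)));
        nodes.put("node2", Arrays.asList(new Shard("r0", false), new Shard("p1", true)));
        return nodes;
    }

    // Joins shard names with commas for easy printing.
    static String names(List<Shard> shards) {
        StringBuilder sb = new StringBuilder();
        for (Shard shard : shards) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(shard.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("replica first: " + names(order(demoNodes(), false)));
        System.out.println("primary first: " + names(order(demoNodes(), true)));
    }
}
```

With the example data, the replica-first order is `r1,r0,p0,p1` and the primary-first order is `p0,p1,r1,r0`. The real iterator reaches a comparable split lazily, deferring shards of the non-preferred kind into a side queue instead of filtering up front.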

@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.Locale;

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
*
* ShardMovementStrategy values or rather their string representation to be used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*
* @opensearch.internal
*/
public enum ShardMovementStrategy {
/**
* default behavior in which order of shard movement doesn't matter.
*/
NO_PREFERENCE,

/**
* primary shards are moved first
*/
PRIMARY_FIRST,

/**
* replica shards are moved first
*/
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}
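
The case handling of `parse` and `toString` can be checked with a small stand-alone program. The enum below is a trimmed copy of the one above so that the example compiles without OpenSearch on the classpath; the wrapper class `ParseRoundTripSketch` and the helper `rejects` are hypothetical names used only for this sketch.

```java
import java.util.Locale;

// Stand-alone demonstration of the parse/toString round trip.
class ParseRoundTripSketch {

    // Trimmed copy of ShardMovementStrategy with the same parse and toString logic.
    enum ShardMovementStrategy {
        NO_PREFERENCE,
        PRIMARY_FIRST,
        REPLICA_FIRST;

        static ShardMovementStrategy parse(String strValue) {
            if (strValue == null) {
                return null;
            }
            // Input is upper-cased first, so the setting value is case-insensitive.
            strValue = strValue.toUpperCase(Locale.ROOT);
            try {
                return ShardMovementStrategy.valueOf(strValue);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
            }
        }

        @Override
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    // Hypothetical helper: true when parse rejects the given value.
    static boolean rejects(String value) {
        try {
            ShardMovementStrategy.parse(value);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mixed-case input parses; printing uses the lower-case toString form.
        System.out.println(ShardMovementStrategy.parse("Replica_First"));
        System.out.println(ShardMovementStrategy.parse(null));
        System.out.println(rejects("replicas_first"));
    }
}
```

Any spelling of a valid name is accepted regardless of case, `null` passes through as `null`, and an unknown value such as `replicas_first` raises `IllegalArgumentException`.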
@@ -37,6 +37,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
@@ -107,8 +108,22 @@ public class BalancedShardsAllocator implements ShardsAllocator {
"cluster.routing.allocation.move.primary_first",
false,
Property.Dynamic,
Property.NodeScope,
Property.Deprecated
);

/**
* Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()}
* Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING.
*/
public static final Setting<ShardMovementStrategy> SHARD_MOVEMENT_STRATEGY_SETTING = new Setting<ShardMovementStrategy>(
"cluster.routing.allocation.shard_movement_strategy",
ShardMovementStrategy.NO_PREFERENCE.toString(),
ShardMovementStrategy::parse,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
@@ -131,6 +146,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
@@ -145,8 +161,10 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}
@@ -155,6 +173,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
this.movePrimaryFirst = movePrimaryFirst;
}

private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) {
this.shardMovementStrategy = shardMovementStrategy;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
@@ -184,6 +206,7 @@ public void allocate(RoutingAllocation allocation) {
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -205,6 +228,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
@@ -456,11 +480,12 @@ public Balancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance);
super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance);
}
}
