-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'searchbackpressure/search-task-cancellation' of github.com:PritLadani/OpenSearch into searchbackpressure/search-task-cancellation
- Loading branch information
Showing
33 changed files
with
1,338 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,8 @@ | ||
--- | ||
title: '[AUTOCUT] Gradle Check Failure.' | ||
title: '[AUTOCUT] Gradle Check Failure on push to {{ env.branch_name }}' | ||
labels: '>test-failure, bug' | ||
--- | ||
|
||
A gradle check workflow has failed after merge. | ||
|
||
PR: {{ env.workflow_url }} | ||
CommitId: {{ env.pr_from_sha }} | ||
Gradle check has failed on push of your commit {{ env.pr_from_sha }}. | ||
Please examine the workflow log {{ env.workflow_url }}. | ||
Is the failure [a flaky test](https://github.com/opensearch-project/OpenSearch/blob/main/DEVELOPER_GUIDE.md#flaky-tests) unrelated to your change? |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
libs/common/src/main/java/org/opensearch/common/collect/Triplet.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.common.collect; | ||
|
||
import java.util.Objects; | ||
|
||
/**
 * A container for 3 elements, similar to {@link org.opensearch.common.collect.Tuple}.
 *
 * @opensearch.internal
 */
public class Triplet<V1, V2, V3> {

    private final V1 v1;
    private final V2 v2;
    private final V3 v3;

    /**
     * Static factory, named {@code tuple} for symmetry with
     * {@link org.opensearch.common.collect.Tuple}.
     *
     * @param v1 first element (may be null)
     * @param v2 second element (may be null)
     * @param v3 third element (may be null)
     * @return a new {@code Triplet} holding the three values
     */
    public static <V1, V2, V3> Triplet<V1, V2, V3> tuple(V1 v1, V2 v2, V3 v3) {
        return new Triplet<>(v1, v2, v3);
    }

    public Triplet(V1 v1, V2 v2, V3 v3) {
        this.v1 = v1;
        this.v2 = v2;
        this.v3 = v3;
    }

    /** @return the first element */
    public V1 v1() {
        return v1;
    }

    /** @return the second element */
    public V2 v2() {
        return v2;
    }

    /** @return the third element */
    public V3 v3() {
        return v3;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Triplet<?, ?, ?> triplet = (Triplet<?, ?, ?>) o;
        return Objects.equals(v1, triplet.v1) && Objects.equals(v2, triplet.v2) && Objects.equals(v3, triplet.v3);
    }

    @Override
    public int hashCode() {
        return Objects.hash(v1, v2, v3);
    }

    @Override
    public String toString() {
        // Fix: previously printed "Tuple [...]", which misidentified the class in logs.
        return "Triplet [v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + "]";
    }
}
307 changes: 307 additions & 0 deletions
307
...alClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,307 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.indices.replication; | ||
|
||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.cluster.routing.RoutingNode; | ||
import org.opensearch.cluster.routing.RoutingNodes; | ||
import org.opensearch.cluster.routing.ShardRouting; | ||
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.test.InternalTestCluster; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Formatter; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.TreeMap; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT { | ||
|
||
private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) { | ||
Settings.Builder builder = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount) | ||
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); | ||
if (isSegRep) { | ||
builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); | ||
} else { | ||
builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); | ||
} | ||
prepareCreate(idxName, builder).get(); | ||
} | ||
|
||
/** | ||
* This test verifies primary shard allocation is balanced. | ||
*/ | ||
public void testShardAllocation() throws Exception { | ||
internalCluster().startClusterManagerOnlyNode(); | ||
final int maxReplicaCount = 2; | ||
final int maxShardCount = 5; | ||
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); | ||
final int numberOfIndices = randomIntBetween(5, 10); | ||
|
||
final List<String> nodeNames = new ArrayList<>(); | ||
logger.info("--> Creating {} nodes", nodeCount); | ||
for (int i = 0; i < nodeCount; i++) { | ||
nodeNames.add(internalCluster().startNode()); | ||
} | ||
assertAcked( | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setPersistentSettings( | ||
Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") | ||
) | ||
); | ||
|
||
int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; | ||
ShardAllocations shardAllocations = new ShardAllocations(); | ||
ClusterState state; | ||
for (int i = 0; i < numberOfIndices; i++) { | ||
shardCount = randomIntBetween(1, maxShardCount); | ||
totalShardCount += shardCount; | ||
replicaCount = randomIntBetween(0, maxReplicaCount); | ||
totalReplicaCount += replicaCount; | ||
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); | ||
logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); | ||
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
shardAllocations.printShardDistribution(state); | ||
} | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
RoutingNodes nodes = state.getRoutingNodes(); | ||
final float avgNumShards = (float) (totalShardCount) / (float) (nodes.size()); | ||
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); | ||
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); | ||
|
||
for (RoutingNode node : nodes) { | ||
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); | ||
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); | ||
} | ||
} | ||
|
||
/** | ||
* This test verifies shard allocation with changes to cluster config i.e. node add, removal keeps the primary shard | ||
* allocation balanced. | ||
*/ | ||
public void testAllocationWithDisruption() throws Exception { | ||
internalCluster().startClusterManagerOnlyNode(); | ||
final int maxReplicaCount = 2; | ||
final int maxShardCount = 5; | ||
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); | ||
final int numberOfIndices = randomIntBetween(1, 10); | ||
|
||
logger.info("--> Creating {} nodes", nodeCount); | ||
final List<String> nodeNames = new ArrayList<>(); | ||
for (int i = 0; i < nodeCount; i++) { | ||
nodeNames.add(internalCluster().startNode()); | ||
} | ||
assertAcked( | ||
client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setPersistentSettings( | ||
Settings.builder() | ||
.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") | ||
.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0f") | ||
.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0f") | ||
.build() | ||
) | ||
); | ||
|
||
int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; | ||
ShardAllocations shardAllocations = new ShardAllocations(); | ||
ClusterState state; | ||
for (int i = 0; i < numberOfIndices; i++) { | ||
shardCount = randomIntBetween(1, maxShardCount); | ||
totalShardCount += shardCount; | ||
replicaCount = randomIntBetween(1, maxReplicaCount); | ||
totalReplicaCount += replicaCount; | ||
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); | ||
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); | ||
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); | ||
if (logger.isTraceEnabled()) { | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
shardAllocations.printShardDistribution(state); | ||
} | ||
} | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
float avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); | ||
int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); | ||
int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); | ||
|
||
for (RoutingNode node : state.getRoutingNodes()) { | ||
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); | ||
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); | ||
} | ||
|
||
final int additionalNodeCount = randomIntBetween(1, 5); | ||
logger.info("--> Adding {} nodes", additionalNodeCount); | ||
|
||
internalCluster().startNodes(additionalNodeCount); | ||
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); | ||
minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); | ||
maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); | ||
shardAllocations.printShardDistribution(state); | ||
for (RoutingNode node : state.getRoutingNodes()) { | ||
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); | ||
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); | ||
} | ||
|
||
logger.info("--> Stop one third nodes"); | ||
for (int i = 1; i < nodeCount; i += 3) { | ||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i))); | ||
// give replica a chance to promote as primary before terminating node containing the replica | ||
assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); | ||
} | ||
state = client().admin().cluster().prepareState().execute().actionGet().getState(); | ||
avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); | ||
minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); | ||
maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); | ||
shardAllocations.printShardDistribution(state); | ||
|
||
for (RoutingNode node : state.getRoutingNodes()) { | ||
assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); | ||
assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); | ||
} | ||
} | ||
|
||
/** | ||
* This class is created for debugging purpose to show shard allocation across nodes. It keeps cluster state which | ||
* is used to build the node's shard allocation | ||
*/ | ||
private class ShardAllocations { | ||
ClusterState state; | ||
|
||
public static final String separator = "==================================================="; | ||
public static final String ONE_LINE_RETURN = "\n"; | ||
public static final String TWO_LINE_RETURN = "\n\n"; | ||
|
||
/** | ||
Store shard primary/replica shard count against a node for segrep indices. | ||
String: NodeId | ||
int[]: tuple storing primary shard count in 0th index and replica's in 1 | ||
*/ | ||
TreeMap<String, int[]> nodeToSegRepCountMap = new TreeMap<>(); | ||
/** | ||
Store shard primary/replica shard count against a node for docrep indices. | ||
String: NodeId | ||
int[]: tuple storing primary shard count in 0th index and replica's in 1 | ||
*/ | ||
TreeMap<String, int[]> nodeToDocRepCountMap = new TreeMap<>(); | ||
|
||
/** | ||
* Helper map containing NodeName to NodeId | ||
*/ | ||
TreeMap<String, String> nameToNodeId = new TreeMap<>(); | ||
|
||
/* | ||
Unassigned array containing primary at 0, replica at 1 | ||
*/ | ||
int[] unassigned = new int[2]; | ||
|
||
int[] totalShards = new int[2]; | ||
|
||
public final String printShardAllocationWithHeader(int[] docrep, int[] segrep) { | ||
StringBuffer sb = new StringBuffer(); | ||
Formatter formatter = new Formatter(sb, Locale.getDefault()); | ||
formatter.format("%-20s %-20s %-20s %-20s\n", "P", docrep[0] + segrep[0], docrep[0], segrep[0]); | ||
formatter.format("%-20s %-20s %-20s %-20s\n", "R", docrep[1] + segrep[1], docrep[1], segrep[1]); | ||
return sb.toString(); | ||
} | ||
|
||
public void reset() { | ||
nodeToSegRepCountMap.clear(); | ||
nodeToDocRepCountMap.clear(); | ||
nameToNodeId.clear(); | ||
totalShards[0] = totalShards[1] = 0; | ||
unassigned[0] = unassigned[1] = 0; | ||
} | ||
|
||
public void setState(ClusterState state) { | ||
this.reset(); | ||
this.state = state; | ||
buildMap(); | ||
} | ||
|
||
private void buildMap() { | ||
for (RoutingNode node : state.getRoutingNodes()) { | ||
nameToNodeId.putIfAbsent(node.node().getName(), node.nodeId()); | ||
nodeToSegRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); | ||
nodeToDocRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); | ||
} | ||
for (ShardRouting shardRouting : state.routingTable().allShards()) { | ||
// Fetch shard to update. Initialize local array | ||
if (isIndexSegRep(shardRouting.getIndexName())) { | ||
updateMap(nodeToSegRepCountMap, shardRouting); | ||
} else { | ||
updateMap(nodeToDocRepCountMap, shardRouting); | ||
} | ||
} | ||
} | ||
|
||
void updateMap(TreeMap<String, int[]> mapToUpdate, ShardRouting shardRouting) { | ||
int[] shard; | ||
shard = shardRouting.assignedToNode() ? mapToUpdate.get(shardRouting.currentNodeId()) : unassigned; | ||
// Update shard type count | ||
if (shardRouting.primary()) { | ||
shard[0]++; | ||
totalShards[0]++; | ||
} else { | ||
shard[1]++; | ||
totalShards[1]++; | ||
} | ||
// For assigned shards, put back counter | ||
if (shardRouting.assignedToNode()) mapToUpdate.put(shardRouting.currentNodeId(), shard); | ||
} | ||
|
||
boolean isIndexSegRep(String indexName) { | ||
return state.metadata() | ||
.index(indexName) | ||
.getSettings() | ||
.get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey()) | ||
.equals(ReplicationType.SEGMENT.toString()); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
StringBuffer sb = new StringBuffer(); | ||
sb.append(TWO_LINE_RETURN + separator + ONE_LINE_RETURN); | ||
Formatter formatter = new Formatter(sb, Locale.getDefault()); | ||
for (Map.Entry<String, String> entry : nameToNodeId.entrySet()) { | ||
String nodeId = nameToNodeId.get(entry.getKey()); | ||
formatter.format("%-20s %-20s %-20s %-20s\n", entry.getKey().toUpperCase(Locale.getDefault()), "TOTAL", "DOCREP", "SEGREP"); | ||
sb.append(printShardAllocationWithHeader(nodeToDocRepCountMap.get(nodeId), nodeToSegRepCountMap.get(nodeId))); | ||
} | ||
sb.append(ONE_LINE_RETURN); | ||
formatter.format("%-20s %-20s %-20s\n\n", "Unassigned ", unassigned[0], unassigned[1]); | ||
formatter.format("%-20s %-20s %-20s\n\n", "Total Shards", totalShards[0], totalShards[1]); | ||
return sb.toString(); | ||
} | ||
|
||
public void printShardDistribution(ClusterState state) { | ||
this.setState(state); | ||
logger.info("--> Shard distribution {}", this); | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.