Commit aa04c05

… into pitsegments

bharath-techie committed Aug 18, 2022
2 parents 903588e + d308a29

Showing 26 changed files with 955 additions and 72 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
@@ -48,4 +48,5 @@ BWC_VERSION:
- "2.1.0"
- "2.1.1"
- "2.2.0"
- "2.2.1"
- "2.3.0"
server/src/test/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -19,7 +19,10 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
@@ -30,6 +33,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
@@ -38,6 +42,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -72,6 +77,109 @@ protected boolean addMockInternalEngine() {
return false;
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

waitForReplicaUpdate();
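// "_only_local" restricts each search to shard copies on the node the client targets, letting us verify the primary and replica copies independently.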
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// index another doc, but don't wait for it to replicate; we will verify it is searchable once the replica is promoted.
client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// stop the primary node - we only have one shard on here.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
ensureYellowAndNoInitializingShards(INDEX_NAME);

final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
assertNotNull(replicaShardRouting);
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

// assert we can index into the new primary.
client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
assertSegmentStats(REPLICA_COUNT);
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);

final int initialDocCount = 1;
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

waitForReplicaUpdate();
assertDocCounts(initialDocCount, replica, primary);

internalCluster().restartNode(primary);
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), replica);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();

assertDocCounts(initialDocCount, replica, primary);
assertSegmentStats(REPLICA_COUNT);
}

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);

waitForReplicaUpdate();
assertDocCounts(initialDocCount, replica, primary);

final IndexShard indexShard = getIndexShard(primary);
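// allowPrimary=true lets the reroute cancel an active primary allocation; the cluster manager then promotes the in-sync replica and re-allocates the former primary as a replica.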
client().admin()
.cluster()
.prepareReroute()
.add(new CancelAllocationCommand(INDEX_NAME, indexShard.shardId().id(), primary, true))
.execute()
.actionGet();
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), replica);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();

assertDocCounts(initialDocCount, replica, primary);
assertSegmentStats(REPLICA_COUNT);
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
@@ -239,9 +347,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

- final Index index = resolveIndex(INDEX_NAME);
- IndexShard primaryShard = getIndexShard(index, primaryNode);
- IndexShard replicaShard = getIndexShard(index, replicaNode);
+ IndexShard primaryShard = getIndexShard(primaryNode);
+ IndexShard replicaShard = getIndexShard(replicaNode);
assertEquals(
primaryShard.translogStats().estimatedNumberOfOperations(),
replicaShard.translogStats().estimatedNumberOfOperations()
@@ -293,9 +400,22 @@ public void testDeleteOperations() throws Exception {

refresh(INDEX_NAME);
waitForReplicaUpdate();

- assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
- assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount - 1);
+ assertBusy(() -> {
+     final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME)
+         .setSize(0)
+         .setPreference("_only_local")
+         .get()
+         .getHits()
+         .getTotalHits().value;
+     assertEquals(expectedHitCount - 1, nodeA_Count);
+     final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME)
+         .setSize(0)
+         .setPreference("_only_local")
+         .get()
+         .getHits()
+         .getTotalHits().value;
+     assertEquals(expectedHitCount - 1, nodeB_Count);
+ }, 5, TimeUnit.SECONDS);
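// deletes only become visible on the replica after another round of segment copying, so poll rather than assert immediately.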
}
}

@@ -337,8 +457,7 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
- final Index index = resolveIndex(INDEX_NAME);
- IndexShard indexShard = getIndexShard(index, replicaNode.getName());
+ IndexShard indexShard = getIndexShard(replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
@@ -378,7 +497,8 @@ private void waitForReplicaUpdate() throws Exception {
});
}

- private IndexShard getIndexShard(Index index, String node) {
+ private IndexShard getIndexShard(String node) {
+     final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
@@ -395,7 +515,8 @@ private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSeg
}

private Map<String, Segment> getLatestSegments(ShardSegments segments) {
- final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
+ final Optional<Long> generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare);
+ final Long latestPrimaryGen = generation.get();
return segments.getSegments()
.stream()
.filter(s -> s.getGeneration() == latestPrimaryGen)
@@ -405,4 +526,31 @@ private Map<String, Segment> getLatestSegments(ShardSegments segments) {
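// groupingBy on ShardRouting#primary partitions each replication group into primary (key = true) and replica (key = false) copies.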
private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
}

@Nullable
private ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
if (discoveryNode.getName().equals(nodeName)) {
return shardRouting;
}
}
}
return null;
}

private void assertDocCounts(int expectedDocCount, String... nodeNames) {
for (String node : nodeNames) {
assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
}
}

private DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}
}
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/Version.java
@@ -95,6 +95,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_1_0 = new Version(2010099, org.apache.lucene.util.Version.LUCENE_9_2_0);
public static final Version V_2_1_1 = new Version(2010199, org.apache.lucene.util.Version.LUCENE_9_2_0);
public static final Version V_2_2_0 = new Version(2020099, org.apache.lucene.util.Version.LUCENE_9_3_0);
public static final Version V_2_2_1 = new Version(2020199, org.apache.lucene.util.Version.LUCENE_9_3_0);
public static final Version V_2_3_0 = new Version(2030099, org.apache.lucene.util.Version.LUCENE_9_3_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_4_0);
public static final Version CURRENT = V_3_0_0;
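These internal ids pack a version into a single integer. A minimal sketch of the decoding, assuming the conventional major*1_000_000 + minor*10_000 + revision*100 + build layout (build 99 marks a release; Version.fromId in this repo is the authoritative decoder):

// VersionIdSketch.java - illustrative only; this class name is hypothetical.
public final class VersionIdSketch {
    public static void main(String[] args) {
        final int id = 2020199;                  // V_2_2_1 above
        final int major = id / 1_000_000;        // 2
        final int minor = (id / 10_000) % 100;   // 2
        final int revision = (id / 100) % 100;   // 1
        final int build = id % 100;              // 99, the conventional release build
        System.out.printf("%d.%d.%d (build %d)%n", major, minor, revision, build);
    }
}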
server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java
@@ -313,6 +313,11 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS
NodeGatewayStartedShards::primary
).reversed();

private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);
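// nullsLast keeps copies that report no replication checkpoint (e.g. docrep shards) sorted after copies that do.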

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
@@ -381,16 +386,24 @@ protected static NodeShardsResult buildNodeShardsResult(
}
}

/**
 * Orders the active shard copies using the comparators below:
 * 1. No store exception, i.e. the shard copy is readable
 * 2. Prefer the previous primary shard
 * 3. Prefer the shard copy with the highest replication checkpoint (a no-op for docrep enabled indices)
 */
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
-     .thenComparing(PRIMARY_FIRST_COMPARATOR);
+     .thenComparing(PRIMARY_FIRST_COMPARATOR)
+     .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
} else {
- comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
+ comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR)
+     .thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
}

nodeShardStates.sort(comparator);
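To see the tiebreaker's null handling in isolation, here is a self-contained sketch with a toy ShardCopy record standing in for NodeGatewayStartedShards (hypothetical names, not this class's API):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class CheckpointOrderSketch {
    // Toy stand-in: a shard copy that may or may not report a replication checkpoint.
    record ShardCopy(String node, Long checkpoint) { }

    public static void main(String[] args) {
        // Same shape as the comparator above: natural order on the checkpoint,
        // with null checkpoints (no checkpoint reported) always sorting last.
        final Comparator<ShardCopy> byCheckpointNullsLast = Comparator.comparing(
            ShardCopy::checkpoint,
            Comparator.nullsLast(Comparator.naturalOrder())
        );
        final List<ShardCopy> copies = new ArrayList<>(
            List.of(new ShardCopy("a", 5L), new ShardCopy("b", null), new ShardCopy("c", 9L))
        );
        copies.sort(byCheckpointNullsLast);
        // Prints a(5), c(9), then b(null): copies without a checkpoint never
        // displace copies that have one, which is why this stage is a no-op
        // for docrep indices, where no checkpoint exists on any copy.
        copies.forEach(copy -> System.out.println(copy.node() + " -> " + copy.checkpoint()));
    }
}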