Skip to content

Commit

Permalink
[Segment Replication] Update force segment replication round to be sy…
Browse files Browse the repository at this point in the history
…nchronous (#5898)

* Update force segment replication sync to be synchronous

Signed-off-by: Suraj Singh <[email protected]>

* Add logs and fix spotlessApply

Signed-off-by: Suraj Singh <[email protected]>

    pick da8cb72ab4f Update unit test post rebase

* Update unit test post rebase

Signed-off-by: Suraj Singh <[email protected]>

* Update integration tests

Signed-off-by: Suraj Singh <[email protected]>

* Mute testPrimaryRelocationWithSegRepFailure

Signed-off-by: Suraj Singh <[email protected]>

* Remove extra closing bracket after main merge

Signed-off-by: Suraj Singh <[email protected]>

* PR feedback

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored Jan 27, 2023
1 parent 8d4bbef commit ebb5813
Show file tree
Hide file tree
Showing 15 changed files with 595 additions and 549 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Nullable
protected ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = getClusterState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
if (discoveryNode.getName().equals(nodeName)) {
return shardRouting;
}
}
}
return null;
}

protected void assertDocCounts(int expectedDocCount, String... nodeNames) {
for (String node : nodeNames) {
assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
}
}

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

protected DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}

/**
* Waits until all given nodes have at least the expected docCount.
*
* @param docCount - Expected Doc count.
* @param nodes - List of node names.
*/
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value;
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}

protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
);
}
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
replicaShard.store().readLastCommittedSegmentsInfo();
}
}
}
}, 1, TimeUnit.MINUTES);
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
}

protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,29 @@
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -61,56 +43,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void ingestDocs(int docCount) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);
refresh(INDEX_NAME);
}
}

public class SegmentReplicationIT extends SegmentReplicationBaseIT {
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -660,106 +594,4 @@ public void testDropPrimaryDuringReplication() throws Exception {
verifyStoreContent();
}
}

/**
* Waits until all given nodes have at least the expected docCount.
*
* @param docCount - Expected Doc count.
* @param nodes - List of node names.
*/
private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value;
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}
}
}, 1, TimeUnit.MINUTES);
}

private void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

private void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
);
}
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
replicaShard.store().readLastCommittedSegmentsInfo();
}
}
}
}, 1, TimeUnit.MINUTES);
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
}

private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
}

@Nullable
private ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = getClusterState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
if (discoveryNode.getName().equals(nodeName)) {
return shardRouting;
}
}
}
return null;
}

private void assertDocCounts(int expectedDocCount, String... nodeNames) {
for (String node : nodeNames) {
assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);
}
}

private ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

private DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}
}
Loading

0 comments on commit ebb5813

Please sign in to comment.