Skip to content

Commit

Permalink
[Remote Store] Add support to restore only unassigned shards of an in…
Browse files Browse the repository at this point in the history
…dex (opensearch-project#8792)

* Add support to restore only unassigned shards of an index

---------

Signed-off-by: Sachin Kale <[email protected]>
Signed-off-by: Sachin Kale <[email protected]>
Co-authored-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale and Sachin Kale committed Jul 28, 2023
1 parent 5164fe2 commit ab5b218
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 77 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))
- Make Span exporter configurable ([#8620](https://github.com/opensearch-project/OpenSearch/issues/8620))
- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))
- [Refactor] StreamIO from common to core.common namespace in core lib ([#8157](https://github.com/opensearch-project/OpenSearch/pull/8157))
- [Refactor] Remaining HPPC to java.util collections ([#8730](https://github.com/opensearch-project/OpenSearch/pull/8730))
- Remote Segment Store Repository setting moved from `index.remote_store.repository` to `index.remote_store.segment.repository` and `cluster.remote_store.repository` to `cluster.remote_store.segment.repository` respectively for Index and Cluster level settings ([#8719](https://github.com/opensearch-project/OpenSearch/pull/8719))
- Change InternalSignificantTerms to sum shard-level superset counts only in final reduce ([#8735](https://github.com/opensearch-project/OpenSearch/pull/8735))
- Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805))
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -37,6 +38,13 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final int REPLICA_COUNT = 1;
protected Path absolutePath;
protected Path absolutePath2;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

@Override
protected boolean addMockInternalEngine() {
Expand All @@ -59,7 +67,7 @@ public Settings indexSettings() {
IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,17 @@ private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlus
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(INDEX_NAME);

if (deletedDocs == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
Expand All @@ -33,13 +33,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -65,13 +66,6 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, String index) {
long totalOperations = 0;
long refreshedOrFlushedOperations = 0;
Expand All @@ -90,7 +84,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = INDEX_NAME.equals(index) ? indexSingleDoc() : indexSingleDoc(index);
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
Expand All @@ -106,12 +100,14 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
}

private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal, String indexName) {
// This is required to get updated number from already active shards which were not restored
refresh(indexName);
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity));
IndexResponse response = INDEX_NAME.equals(indexName) ? indexSingleDoc() : indexSingleDoc(indexName);
IndexResponse response = indexSingleDoc(indexName);
assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1);
Expand All @@ -127,6 +123,28 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St
}
}

private void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
}

private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long> indexStats) {
restore(INDEX_NAME);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount * (1 + replicaCount), getNumShards(INDEX_NAME).totalNumShards);
assertEquals(replicaCount, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Helper function to test restoring an index with no replication from remote store. Only primary node is dropped.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
Expand All @@ -141,23 +159,16 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 0, indexStats);
}

/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount)
throws IOException {
private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
prepareCluster(1, 2, INDEX_NAME, 1, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
Expand All @@ -167,14 +178,7 @@ private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int
ensureRed(INDEX_NAME);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);

assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(0, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 1, indexStats);
}

/**
Expand Down Expand Up @@ -209,17 +213,54 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices);
for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), true, index);
}
}

public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), future);
try {
future.get();
} catch (ExecutionException e) {
// If the request goes to co-ordinator, e.getCause() can be RemoteTransportException
assertTrue(e.getCause() instanceof IllegalStateException || e.getCause().getCause() instanceof IllegalStateException);
}
}

public void testRestoreFlowNoRedIndex() {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(false), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
Expand Down Expand Up @@ -262,7 +303,7 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5));
}

/**
Expand All @@ -271,7 +312,7 @@ public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOExce
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5));
}

/**
Expand All @@ -281,7 +322,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5));
}

/**
Expand All @@ -291,7 +332,7 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5));
}

/**
Expand Down Expand Up @@ -338,10 +379,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(new String[] {}), PlainActionFuture.newFuture());
restore(indices);
ensureGreen(indices);

for (String index : indices) {
Expand Down Expand Up @@ -378,10 +416,16 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indices[0], indices[1]), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices[0], indices[1]).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -424,10 +468,16 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*"), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*").restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -487,7 +537,7 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}

IndexResponse response = indexSingleDoc();
IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertBusy(
Expand Down
Loading

0 comments on commit ab5b218

Please sign in to comment.