Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add support to restore only unassigned shards of an index #8792

Merged
merged 15 commits into from
Jul 27, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Remote Segment Store Repository setting moved from `index.remote_store.repository` to `index.remote_store.segment.repository` and `cluster.remote_store.repository` to `cluster.remote_store.segment.repository` respectively for Index and Cluster level settings ([#8719](https://github.com/opensearch-project/OpenSearch/pull/8719))
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -37,6 +38,13 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final int REPLICA_COUNT = 1;
protected Path absolutePath;
protected Path absolutePath2;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

@Override
protected boolean addMockInternalEngine() {
Expand All @@ -59,7 +67,7 @@ public Settings indexSettings() {
IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,17 @@ private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlus
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(INDEX_NAME);

if (deletedDocs == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
Expand All @@ -34,15 +34,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -68,13 +69,6 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, String index) {
long totalOperations = 0;
long refreshedOrFlushedOperations = 0;
Expand All @@ -93,7 +87,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = INDEX_NAME.equals(index) ? indexSingleDoc() : indexSingleDoc(index);
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
Expand All @@ -109,12 +103,14 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
}

private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal, String indexName) {
// This is required to get updated number from already active shards which were not restored
refresh(indexName);
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity));
IndexResponse response = INDEX_NAME.equals(indexName) ? indexSingleDoc() : indexSingleDoc(indexName);
IndexResponse response = indexSingleDoc(indexName);
assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1);
Expand All @@ -130,6 +126,28 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St
}
}

private void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
}

private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long> indexStats) {
restore(INDEX_NAME);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount * (1 + replicaCount), getNumShards(INDEX_NAME).totalNumShards);
assertEquals(replicaCount, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Helper function to test restoring an index with no replication from remote store. Only primary node is dropped.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
Expand All @@ -144,23 +162,16 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 0, indexStats);
}

/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount)
throws IOException {
private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
prepareCluster(1, 2, INDEX_NAME, 1, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
Expand All @@ -170,14 +181,7 @@ private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int
ensureRed(INDEX_NAME);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);

assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(0, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 1, indexStats);
}

/**
Expand Down Expand Up @@ -212,17 +216,54 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices);
for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), true, index);
}
}

public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), future);
try {
future.get();
} catch (ExecutionException e) {
// If the request goes to co-ordinator, e.getCause() can be RemoteTransportException
assertTrue(e.getCause() instanceof IllegalStateException || e.getCause().getCause() instanceof IllegalStateException);
}
}
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

public void testRestoreFlowNoRedIndex() {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(false), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
Expand Down Expand Up @@ -265,7 +306,7 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5));
}

/**
Expand All @@ -274,7 +315,7 @@ public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOExce
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5));
}

/**
Expand All @@ -284,7 +325,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5));
}

/**
Expand All @@ -294,7 +335,7 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5));
}

/**
Expand Down Expand Up @@ -341,10 +382,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(new String[] {}), PlainActionFuture.newFuture());
restore(indices);
ensureGreen(indices);

for (String index : indices) {
Expand Down Expand Up @@ -381,10 +419,16 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indices[0], indices[1]), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices[0], indices[1]).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -427,10 +471,16 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*"), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*").restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -490,7 +540,7 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}

IndexResponse response = indexSingleDoc();
IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertBusy(
Expand Down
Loading