[Remote Store] Waiting for remote store upload in snapshot/local recovery (#11720)

* Giving time for snapshot/local recovery to upload all the data to the remote store

Signed-off-by: Gaurav Bafna <[email protected]>
(cherry picked from commit 4d055b8)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
github-actions[bot] committed Feb 5, 2024
1 parent 4a3c324 commit 71e1b24
Showing 12 changed files with 404 additions and 25 deletions.
@@ -50,6 +50,8 @@
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -130,4 +132,61 @@ public void testCreateCloneIndex() {

}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = 1;
prepareCreate("source").setSettings(
Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
).get();
final int docs = 2;
for (int i = 0; i < docs; i++) {
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
internalCluster().ensureAtLeastNumDataNodes(2);
        // ensure all shards are allocated, otherwise the ensure green below might not succeed;
        // if we change the settings too quickly we can end up with an unassigned replica that
        // cannot be allocated anymore
ensureGreen();
        // add a write block on the source index so that it can be resized
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

        // disable rebalancing so we can capture the right stats; rebalancing can move the target primary,
        // making it hard to pinpoint the source shards
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
.get();
try {
setFailRate(REPOSITORY_NAME, 100);

client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.CLONE)
.setWaitForActiveShards(0)
.setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
.get();

Thread.sleep(2000);
ensureYellow("target");

} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
setFailRate(REPOSITORY_NAME, 0);
ensureGreen();
// clean up
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
)
.get();
}

}

}
@@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkItemResponse;
@@ -37,7 +39,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

@@ -60,6 +62,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
@@ -146,6 +149,18 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
RepositoryMetadata rmd = res.repositories().get(0);
Settings.Builder settings = Settings.builder()
.put("location", rmd.settings().get("location"))
.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
assertAcked(
client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
);
}
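For context on the fail-rate knob this helper toggles: the test above sets it to 100 so that every remote upload fails, then resets it to 0 in the finally block so cleanup can proceed. The snippet below is a purely illustrative sketch of that failure-injection idea, assuming the repository consults a reloadable percentage before each write; it is not the actual ReloadableFsRepository code, and the class and method names are invented for this example.

// Illustrative only: not the actual ReloadableFsRepository implementation.
import java.io.IOException;
import java.util.Random;

class FailRateGateSketch {
    private volatile int failRatePercent; // 0..100, refreshed when repository settings are re-applied
    private final Random random = new Random();

    void reload(int newFailRatePercent) {
        this.failRatePercent = newFailRatePercent; // e.g. driven by a fail-rate repository setting
    }

    void maybeFail(String blobName) throws IOException {
        if (random.nextInt(100) < failRatePercent) {
            throw new IOException("simulated failure while writing blob [" + blobName + "]");
        }
    }
}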

public Settings indexSettings() {
return defaultIndexSettings();
}
@@ -224,10 +239,10 @@ public static Settings buildRemoteStoreNodeAttributes(
return buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
translogRepoName,
translogRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
withRateLimiterAttributes
);
}
@@ -24,6 +24,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -479,7 +480,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
settings.put("location", segmentRepoPath).put("max_remote_download_bytes_per_sec", 4, ByteSizeUnit.KB);

assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);

for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
@@ -508,7 +516,14 @@ public void testRateLimitedRemoteDownloads() throws Exception {
// revert repo metadata to pass asserts on repo metadata vs. node attrs during teardown
        // https://github.com/opensearch-project/OpenSearch/pull/9569#discussion_r1345668700
settings.remove("max_remote_download_bytes_per_sec");
assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get());
assertAcked(
client().admin()
.cluster()
.preparePutRepository(REPOSITORY_NAME)
.setType(ReloadableFsRepository.TYPE)
.setSettings(settings)
.get()
);
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME);
assertNull(segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec"));
@@ -12,7 +12,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

@@ -53,7 +53,7 @@ public void testRestrictedSettingsCantBeUpdated() {
assertEquals(
e.getMessage(),
"[system-repo-name] trying to modify an unmodifiable attribute type of system "
+ "repository from current value [fs] to new value [mock]"
+ "repository from current value [reloadable-fs] to new value [mock]"
);
}

@@ -65,7 +65,12 @@ public void testSystemRepositoryNonRestrictedSettingsCanBeUpdated() {
final Settings.Builder repoSettings = Settings.builder().put("location", absolutePath).put("chunk_size", new ByteSizeValue(20));

assertAcked(
client.admin().cluster().preparePutRepository(systemRepoName).setType(FsRepository.TYPE).setSettings(repoSettings).get()
client.admin()
.cluster()
.preparePutRepository(systemRepoName)
.setType(ReloadableFsRepository.TYPE)
.setSettings(repoSettings)
.get()
);
}
}
@@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
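The new RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT entry registered above is the timeout that the recovery changes further down poll against. Its exact key and default value are not visible in this diff, so the snippet below is only a hedged sketch of how a dynamic TimeValue setting is typically declared in this codebase; the key, default, and import paths are placeholders and assumptions, not the values used by the commit.

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

final class ExampleRecoverySettings {
    // Placeholder key and default; the real setting lives in RecoverySettings.
    static final Setting<TimeValue> EXAMPLE_INTERNAL_REMOTE_UPLOAD_TIMEOUT = Setting.timeSetting(
        "indices.recovery.internal_remote_upload_timeout", // placeholder key
        TimeValue.timeValueMinutes(20),                     // placeholder default
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );
}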
28 changes: 20 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2034,23 +2034,35 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
}

/**
Returns true iff it is able to verify that remote segment store
is in sync with local
* Returns true iff it is able to verify that remote segment store
* is in sync with local
*/
boolean isRemoteSegmentStoreInSync() {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
Collection<String> uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet();
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
Collection<String> localFiles = segmentInfos.files(true);
if (uploadFiles.containsAll(localFiles)) {
return true;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
Collection<String> localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true);
Set<String> localFiles = new HashSet<>(localSegmentInfosFiles);
// verifying that all files except EXCLUDE_FILES are uploaded to the remote
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES);
if (uploadFiles.containsAll(localFiles)) {
return true;
}
logger.debug(
                        () -> new ParameterizedMessage(
                            "RemoteSegmentStoreSyncStatus localSize={} remoteSize={}",
                            localFiles.size(),
                            uploadFiles.size()
)
);
}
}
} catch (IOException e) {
} catch (AlreadyClosedException e) {
throw e;
            } catch (Throwable e) {
                logger.error("Exception while reading latest metadata", e);
}
return false;
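In isolation, the check added above reduces to a set-containment test: every local segment file, minus the files that are never uploaded (RemoteStoreRefreshListener.EXCLUDE_FILES), must already be present in the set of files uploaded to the remote store. A minimal, self-contained restatement, with names local to this sketch rather than taken from the commit:

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class SyncCheckSketch {
    static boolean isInSync(Collection<String> localSegmentFiles, Set<String> uploadedFiles, Set<String> excludedFiles) {
        Set<String> pending = new HashSet<>(localSegmentFiles);
        pending.removeAll(excludedFiles);          // files that are intentionally never uploaded
        return uploadedFiles.containsAll(pending); // everything else must already be in the remote store
    }
}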
@@ -172,13 +172,33 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/**
     * Checks if all files present in the local store are uploaded to the remote store or are part of the excluded files.
     *
     * Different from IndexShard#isRemoteSegmentStoreInSync as it uses the files-uploaded cache in RemoteDirectory
     * and doesn't make a remote store call. It also doesn't throw an exception if the store gets closed, since the
     * store is expected to be open here.
     *
* @return true iff all the local files are uploaded to remote store.
*/
boolean isRemoteSegmentStoreInSync() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload);
} catch (Throwable throwable) {
logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable);
}
return false;
}

/*
@return false if retry is needed
*/
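The refresh-listener variant above is deliberately cheaper than IndexShard#isRemoteSegmentStoreInSync: it runs on the refresh path, so it only consults the local uploaded-files cache (via skipUpload) and never reads remote metadata. A hedged restatement of that predicate-based check, with illustrative names that are not part of the commit:

import java.util.Collection;
import java.util.function.Predicate;

final class CacheBasedSyncCheckSketch {
    // skipUpload(file) is assumed to be true when the file is excluded from upload
    // or already recorded in the local uploaded-files cache.
    static boolean isInSync(Collection<String> localSegmentFiles, Predicate<String> skipUpload) {
        return localSegmentFiles.stream().allMatch(skipUpload);
    }
}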
38 changes: 35 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -38,11 +38,13 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
@@ -191,7 +193,8 @@ void recoverFromLocalShards(
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
                waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
@@ -432,7 +435,8 @@ void recoverFromSnapshotAndRemoteStore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
            waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -717,7 +721,8 @@ private void restore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
            waitForRemoteStoreSync(indexShard);
if (indexShard.isRemoteSegmentStoreInSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
@@ -791,4 +796,31 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO
);
store.associateIndexWithNewTranslog(translogUUID);
}

/*
    Blocks the calling thread, waiting for the remote store to get synced, until the internal remote upload timeout elapses
*/
private void waitForRemoteStoreSync(IndexShard indexShard) {
if (indexShard.shardRouting.primary() == false) {
            return;
}
        long startNanos = System.nanoTime();

while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
try {
if (indexShard.isRemoteSegmentStoreInSync()) {
                    break;
} else {
try {
Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
                    }
}
            } catch (AlreadyClosedException e) {
                // There is no point in waiting as the shard is now closed.
return;
            }
}
    }
}
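waitForRemoteStoreSync above is a bounded poll: it repeatedly checks isRemoteSegmentStoreInSync until the internal remote upload timeout elapses, bailing out early if the shard has already been closed. Note that Thread.sleep takes milliseconds, so TimeValue.timeValueMinutes(1).seconds() produces a 60 ms pause between attempts. A generic sketch of the same bounded-wait pattern follows; the names are invented for this example and are not part of the commit.

import java.util.function.BooleanSupplier;

final class BoundedWaitSketch {
    // Polls `condition` until it returns true or `timeoutMillis` elapses,
    // sleeping `pollIntervalMillis` between attempts.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollIntervalMillis)
        throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadlineNanos) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollIntervalMillis);
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }
}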