Skip to content

Commit

Permalink
Add ITs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 8, 2023
1 parent 048398f commit b84a82b
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
Expand All @@ -31,8 +33,10 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -74,28 +78,84 @@ private Settings remoteStoreIndexSettings(int numberOfReplicas) {
.build();
}

@After
public void teardown() {
public void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

public void testRemoteRefreshRetryOnFailure() throws Exception {

    Path repoPath = randomRepoPath().toAbsolutePath();
    logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoPath);
    // Fail roughly 10% of remote store operations so some refresh-time uploads throw IOException.
    setup(repoPath, 0.1d, "metadata");

    // Each indexing iteration ends with a flush/refresh; the refresh will not always succeed
    // because uploadBlobs throws IOExceptions at the configured failure rate.
    indexData(randomIntBetween(5, 10), randomBoolean());
    logger.info("--> Indexed data");

    // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
    IndicesStatsResponse statsResponse = client().admin().indices().stats(new IndicesStatsRequest()).get();
    assertEquals(1, statsResponse.getShards().length);

    String indexUuid = statsResponse.getShards()[0].getShardRouting().index().getUUID();
    Path segmentDataRepoPath = repoPath.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
    String segmentDataLocalPath = String.format(
        Locale.ROOT,
        "%s/indices/%s/0/index",
        statsResponse.getShards()[0].getDataPath(),
        indexUuid
    );

    logger.info("--> Verify that the segment files are same on local and repository eventually");
    // Retries back off exponentially (capped at 30s), so allow a generous window for convergence.
    assertBusy(() -> {
        Set<String> localSegmentFiles = getSegmentFiles(repoPath.getRoot().resolve(segmentDataLocalPath));
        Set<String> repoSegmentFiles = getSegmentFiles(segmentDataRepoPath);
        assertTrue(repoSegmentFiles.containsAll(localSegmentFiles));
    }, 60, TimeUnit.SECONDS);
    deleteRepo();
}

public void testWritesRejected() throws Exception {

    Path location = randomRepoPath().toAbsolutePath();
    // A 100% IO failure rate means every segment upload fails, letting back-pressure build up.
    setup(location, 1d, "metadata");

    // Enable remote refresh segment pressure so write rejections can kick in.
    Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
    ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setPersistentSettings(request)
        .get();
    // assertEquals takes (expected, actual); expected first gives meaningful failure messages.
    assertEquals("true", clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()));

    logger.info("--> Indexing data");
    // Since every upload fails, continued indexing must eventually be rejected by the pressure service.
    OpenSearchRejectedExecutionException ex = assertThrows(
        OpenSearchRejectedExecutionException.class,
        () -> indexData(randomIntBetween(10, 20), randomBoolean())
    );
    assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
    deleteRepo();
}

public void testRemoteRefreshSegmentPressureSettingChanged() {
    // Verify the dynamic pressure setting can be toggled on...
    Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
    ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
    // assertEquals takes (expected, actual); expected first gives meaningful failure messages.
    assertEquals("true", response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()));

    // ...and toggled back off again.
    request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build();
    response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
    assertEquals("false", response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()));
}

private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that a fraction (ioFailureRate) of all operations to remote store
// result in IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensure that the
// repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", location)
.put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f)
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
// Skipping is required for metadata as it is part of recovery
.put("skip_exception_on_blobs", skipExceptionBlobList)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
Expand All @@ -105,24 +165,6 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");

// Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
// due to IOExceptions that are thrown while doing uploadBlobs.
indexData(randomIntBetween(5, 10), randomBoolean());
logger.info("--> Indexed data");

// TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

logger.info("--> Verify that the segment files are same on local and repository eventually");
assertBusy(
() -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath))
);
}

/**
Expand All @@ -134,15 +176,20 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
private Set<String> getSegmentFiles(Path location) {
    try {
        return Arrays.stream(FileSystemUtils.files(location))
            // Compare the file NAME as a string; Path.startsWith matches whole path components,
            // not string prefixes, so the toString() conversion is required here.
            .filter(path -> path.getFileName().toString().startsWith("_"))
            .map(path -> path.getFileName().toString())
            .map(this::getLocalSegmentFilename)
            .collect(Collectors.toSet());
    } catch (IOException exception) {
        logger.error("Exception occurred while getting segment files", exception);
    }
    // Best-effort listing: callers tolerate an empty set when the directory cannot be read.
    return Collections.emptySet();
}

private String getLocalSegmentFilename(String remoteFilename) {
    // Remote filenames carry a UUID suffix after the separator; the local name is everything before it.
    String[] parts = remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR);
    return parts[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand All @@ -153,7 +200,7 @@ private IndexResponse indexSingleDoc() {
private void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(20, 50);
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ public void afterIndexShardCreated(IndexShard indexShard) {

@Override
public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
if (indexShard.indexSettings().isRemoteStoreEnabled() == false) {
return;
RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMap.remove(shardId);
if (remoteRefreshSegmentTracker != null) {
logger.trace("Deleted tracker for shardId={}", shardId);
}
trackerMap.remove(shardId);
logger.trace("Deleted tracker for shardId={}", shardId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
/**
* In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially.
*/
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000;
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000;

/**
* Exponential back off policy with max retry interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public long getFailureCount() {

private final boolean skipExceptionOnListBlobs;

private final List<String> skipExceptionOnBlobs;

private final boolean useLuceneCorruptionException;

private final long maximumNumberOfFailures;
Expand Down Expand Up @@ -182,6 +184,7 @@ public MockRepository(
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
skipExceptionOnVerificationFile = metadata.settings().getAsBoolean("skip_exception_on_verification_file", false);
skipExceptionOnListBlobs = metadata.settings().getAsBoolean("skip_exception_on_list_blobs", false);
skipExceptionOnBlobs = metadata.settings().getAsList("skip_exception_on_blobs");
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false);
Expand Down Expand Up @@ -370,12 +373,14 @@ private int hashCode(String path) {
private void maybeIOExceptionOrBlock(String blobName) throws IOException {
if (INDEX_LATEST_BLOB.equals(blobName) // Condition 1
|| skipExceptionOnVerificationFiles(blobName) // Condition 2
|| skipExceptionOnListBlobs(blobName)) { // Condition 3
|| skipExceptionOnListBlobs(blobName) // Condition 3
|| skipExceptionOnBlob(blobName)) { // Condition 4
// Condition 1 - Don't mess with the index.latest blob here, failures to write to it are ignored by
// upstream logic and we have specific tests that cover the error handling around this blob.
// Condition 2 & 3 - This condition has been added to allow creation of repository which throws IO
// exception during normal remote store operations. However, if we fail during verification as well,
// then we can not add the repository as well.
// Condition 4 - This condition allows to skip exception on specific blobName or blobPrefix
return;
}
if (blobName.startsWith("__")) {
Expand Down Expand Up @@ -582,5 +587,9 @@ private boolean isVerificationFile(String blobName) {
private boolean skipExceptionOnListBlobs(String blobName) {
    // Only the dummy list-blobs marker file is exempt, and only when the repository opted in.
    if (skipExceptionOnListBlobs == false) {
        return false;
    }
    return DUMMY_FILE_NAME_LIST_BLOBS.equals(blobName);
}

private boolean skipExceptionOnBlob(String blobName) {
    // Exact match against the blob names configured via the "skip_exception_on_blobs" setting.
    return skipExceptionOnBlobs.stream().anyMatch(blobName::equals);
}
}
}

0 comments on commit b84a82b

Please sign in to comment.