Skip to content

Commit

Permalink
Adding IT to verify local shards recovery doesn't complete until full upload
Browse files Browse the repository at this point in the history

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Jan 23, 2024
1 parent 6db6ce3 commit b37276b
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -130,4 +132,61 @@ public void testCreateCloneIndex() {

}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
    Version version = VersionUtils.randomIndexCompatibleVersion(random());
    int numPrimaryShards = 1;
    prepareCreate("source").setSettings(
        Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
    ).get();
    final int docs = 2;
    for (int i = 0; i < docs; i++) {
        client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
    }
    internalCluster().ensureAtLeastNumDataNodes(2);
    // Wait for all copies to be assigned before blocking writes; otherwise a later
    // ensureGreen() could hang on an unassigned replica.
    ensureGreen();
    // A resize (clone) requires the source index to be read-only.
    client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
    ensureGreen();

    // disable rebalancing to be able to capture the right stats. balancing can move the target primary
    // making it hard to pin point the source shards.
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
        .get();
    try {
        // Fail every remote-store upload so the clone's local shard recovery cannot complete.
        setFailRate(REPOSITORY_NAME, 100);

        client().admin()
            .indices()
            .prepareResizeIndex("source", "target")
            .setResizeType(ResizeType.CLONE)
            .setWaitForActiveShards(0)
            .setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
            .get();

        // Give recovery a chance to run; with all uploads failing the target must
        // not reach green, only yellow.
        Thread.sleep(2000);
        ensureYellow("target");

    } finally {
        // Restore uploads and remove the cluster-wide transient setting even when the
        // assertions above fail, so subsequent tests start from a healthy cluster.
        // (No catch needed: ExecutionException/InterruptedException are declared by
        // this method, so re-wrapping them in RuntimeException was redundant.)
        setFailRate(REPOSITORY_NAME, 0);
        ensureGreen();
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(
                Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
            )
            .get();
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.bulk.BulkItemResponse;
Expand Down Expand Up @@ -37,7 +39,7 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

Expand All @@ -60,6 +62,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -146,6 +149,18 @@ protected Settings nodeSettings(int nodeOrdinal) {
}
}

/**
 * Re-registers the given reloadable-fs repository in place with the requested
 * fail rate (0-100), preserving its existing on-disk location.
 */
protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
    // Look up the current repository metadata so the location setting survives the update.
    final GetRepositoriesRequest request = new GetRepositoriesRequest(new String[] { repoName });
    final GetRepositoriesResponse response = client().admin().cluster().getRepositories(request).get();
    final RepositoryMetadata repositoryMetadata = response.repositories().get(0);
    final Settings.Builder updatedSettings = Settings.builder()
        .put("location", repositoryMetadata.settings().get("location"))
        .put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
    // Putting the repository again with the same type triggers an in-place reload.
    assertAcked(
        client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(updatedSettings).get()
    );
}

/**
 * Settings applied to indices created by these tests; delegates to the
 * remote-store defaults defined in this base class.
 */
public Settings indexSettings() {
    return defaultIndexSettings();
}
Expand Down Expand Up @@ -224,10 +239,10 @@ public static Settings buildRemoteStoreNodeAttributes(
return buildRemoteStoreNodeAttributes(
segmentRepoName,
segmentRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
translogRepoName,
translogRepoPath,
FsRepository.TYPE,
ReloadableFsRepository.TYPE,
withRateLimiterAttributes
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,8 @@ boolean isRemoteSegmentStoreInSync(boolean verifyMetadata) {
)
);
}
} catch (AlreadyClosedException e) {
throw e;
} catch (Throwable e) {
logger.error("Exception while reading latest metadata", e);
}
Expand Down
20 changes: 13 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -806,14 +807,19 @@ private void waitForRemoteStoreSync(IndexShard indexShard) {
long startNanos = System.nanoTime();

while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
if (indexShard.isRemoteSegmentStoreInSync()) {
break;
} else {
try {
Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
try {
if (indexShard.isRemoteSegmentStoreInSync()) {
break;
} else {
try {
Thread.sleep(TimeValue.timeValueMinutes(1).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}
} catch (AlreadyClosedException e) {
// There is no point in waiting as shard is now closed .
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.RepositoryPlugin;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -71,6 +72,11 @@ public RepositoriesModule(
metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)
);

factories.put(
ReloadableFsRepository.TYPE,
metadata -> new ReloadableFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)
);

for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(
env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,51 @@

package org.opensearch.repositories.fs;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.indices.recovery.RecoverySettings;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
* Extension of {@link FsRepository} that can be reloaded inplace
* Extension of {@link FsRepository} that can be reloaded inplace , supports failing operation and slowing it down
*
* @opensearch.internal
*/
public class ReloadableFsRepository extends FsRepository {
public static final String TYPE = "reloadable-fs";

// Runtime-reloadable switch controlling whether blob writes fail.
private final FailSwitch fail;
// Runtime-reloadable switch controlling how long blob writes are delayed.
private final SlowDownWriteSwitch slowDown;

// Percentage (0-100) of eligible blob writes that should fail; 0 disables failure injection.
public static final Setting<Integer> REPOSITORIES_FAILRATE_SETTING = Setting.intSetting(
    "repositories.fail.rate",
    0,
    0,
    100,
    Setting.Property.NodeScope
);

// Seconds (0-100) to sleep before each blob write; 0 disables the slowdown.
public static final Setting<Integer> REPOSITORIES_SLOWDOWN_SETTING = Setting.intSetting(
    "repositories.slowdown",
    0,
    0,
    100,
    Setting.Property.NodeScope
);

/**
* Constructs a shared file system repository that is reloadable in-place.
*/
Expand All @@ -31,6 +64,11 @@ public ReloadableFsRepository(
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
fail = new FailSwitch();
fail.failRate(REPOSITORIES_FAILRATE_SETTING.get(metadata.settings()));
slowDown = new SlowDownWriteSwitch();
slowDown.setSleepSeconds(REPOSITORIES_SLOWDOWN_SETTING.get(metadata.settings()));
readRepositoryMetadata();
}

@Override
Expand All @@ -40,12 +78,124 @@ public boolean isReloadable() {

@Override
public void reload(RepositoryMetadata repositoryMetadata) {
    // Guard clause: nothing to do for a non-reloadable repository.
    if (!isReloadable()) {
        return;
    }

    // Let the parent apply the new metadata, then refresh the injection switches
    // and re-validate/re-read state derived from the metadata.
    super.reload(repositoryMetadata);
    readRepositoryMetadata();
    validateLocation();
    readMetadata();
}

// Re-reads the fail-rate and slowdown values from the current repository metadata and
// applies them to the live switches; called from the constructor and from reload().
private void readRepositoryMetadata() {
    fail.failRate(REPOSITORIES_FAILRATE_SETTING.get(metadata.settings()));
    slowDown.setSleepSeconds(REPOSITORIES_SLOWDOWN_SETTING.get(metadata.settings()));
}

/**
 * Creates the underlying blob store, wrapping the standard FS layout with a store that
 * can inject write failures and slowdowns via the configured switches.
 */
@Override // was missing: this overrides the parent repository's factory method
protected BlobStore createBlobStore() throws Exception {
    final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings());
    final Path locationFile = environment.resolveRepoFile(location);
    return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail, slowDown);
}

// A random integer from min-max (inclusive).
public static int randomIntBetween(int min, int max) {
    // ThreadLocalRandom avoids allocating and re-seeding a new Random on every call
    // and is safe to use from concurrent blob operations.
    return ThreadLocalRandom.current().nextInt(min, max + 1);
}

/**
 * Volatile, runtime-adjustable switch deciding whether an operation should fail.
 * A fail rate of N means roughly N% of calls to {@link #fail()} return true.
 */
static class FailSwitch {
    private volatile int failRate;
    private volatile boolean onceFailedFailAlways = false;

    /**
     * Rolls a die in [1, 100] and reports failure when the roll falls within the
     * configured rate. When the once-failed-fail-always flag is set, the first
     * failure locks the rate at 100 so every subsequent call also fails.
     */
    public boolean fail() {
        final boolean shouldFail = randomIntBetween(1, 100) <= failRate;
        if (shouldFail && onceFailedFailAlways) {
            failAlways();
        }
        return shouldFail;
    }

    /** Forces every subsequent call to {@link #fail()} to report failure. */
    public void failAlways() {
        failRate = 100;
    }

    /** Sets the failure percentage (0-100). */
    public void failRate(int rate) {
        failRate = rate;
    }

    /** After the next failure, keep failing forever. */
    public void onceFailedFailAlways() {
        onceFailedFailAlways = true;
    }
}

// Volatile, runtime-adjustable delay (in seconds) applied before blob writes;
// 0 means no delay.
static class SlowDownWriteSwitch {
    private volatile int sleepSeconds;

    public void setSleepSeconds(int sleepSeconds) {
        this.sleepSeconds = sleepSeconds;
    }

    public int getSleepSeconds() {
        return sleepSeconds;
    }
}

/**
 * FS blob store whose containers consult the fail and slow-down switches before
 * performing writes.
 */
private static class ThrowingBlobStore extends FsBlobStore {

    private final FailSwitch fail;
    private final SlowDownWriteSwitch slowDown;

    public ThrowingBlobStore(int bufferSizeInBytes, Path path, boolean readonly, FailSwitch fail, SlowDownWriteSwitch slowDown)
        throws IOException {
        super(bufferSizeInBytes, path, readonly);
        this.fail = fail;
        this.slowDown = slowDown;
    }

    @Override
    public BlobContainer blobContainer(BlobPath path) {
        try {
            final Path resolved = buildAndCreate(path);
            return new ThrowingBlobContainer(this, path, resolved, fail, slowDown);
        } catch (IOException ex) {
            throw new OpenSearchException("failed to create blob container", ex);
        }
    }
}

/**
 * Blob container that can fail or delay writes according to the supplied switches,
 * used to simulate a flaky/slow remote store in tests.
 */
private static class ThrowingBlobContainer extends FsBlobContainer {

    private final FailSwitch fail;
    private final SlowDownWriteSwitch slowDown;

    public ThrowingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, FailSwitch fail, SlowDownWriteSwitch slowDown) {
        super(blobStore, blobPath, path);
        this.fail = fail;
        this.slowDown = slowDown;
    }

    @Override
    public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
        throws IOException {
        checkFailRateAndSleep(blobName);
        super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
    }

    /**
     * Throws an IOException per the fail switch and/or sleeps per the slow-down switch
     * before a write proceeds.
     * NOTE(review): blobs whose name contains ".dat" are exempt from injected failures —
     * presumably so metadata-style files always succeed; confirm against the tests that
     * rely on this exclusion.
     */
    private void checkFailRateAndSleep(String blobName) throws IOException {
        if (fail.fail() && blobName.contains(".dat") == false) {
            throw new IOException("blob container throwing error");
        }
        if (slowDown.getSleepSeconds() > 0) {
            try {
                Thread.sleep(slowDown.getSleepSeconds() * 1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption
                // (previously it was silently swallowed by the RuntimeException wrap).
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
        checkFailRateAndSleep(blobName);
        super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
    }
}
}

0 comments on commit b37276b

Please sign in to comment.