Skip to content

Commit

Permalink
[Remote segments] Add backpressure in write path on segments lag betw…
Browse files Browse the repository at this point in the history
…een local and remote store (opensearch-project#7459)

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored and stephen-crawford committed May 31, 2023
1 parent 767afda commit ed3eaf3
Show file tree
Hide file tree
Showing 29 changed files with 816 additions and 411 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase {

protected static final String REPOSITORY_NAME = "my-segment-repo-1";
protected static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

protected void deleteRepo() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
// Skipping is required for metadata as it is part of recovery
.put("skip_exception_on_blobs", skipExceptionBlobList)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
}

/**
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
*
* @param location the path to location where segment files are being searched.
* @return set of file names of all segment file or empty set if there was IOException thrown.
*/
protected Set<String> getSegmentFiles(Path location) {
try {
return Arrays.stream(FileSystemUtils.files(location))
.filter(path -> path.getFileName().toString().startsWith("_"))
.map(path -> path.getFileName().toString())
.map(this::getLocalSegmentFilename)
.collect(Collectors.toSet());
} catch (IOException exception) {
logger.error("Exception occurred while getting segment files", exception);
}
return Collections.emptySet();
}

private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

protected void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

public void testWritesRejected() {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 1d, "metadata");

Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");

logger.info("--> Indexing data");
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexData(randomIntBetween(10, 20), randomBoolean())
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
deleteRepo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIT {
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,79 +8,21 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase {

private static final String REPOSITORY_NAME = "my-segment-repo-1";
private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@After
public void teardown() {
logger.info("--> Deleting the repository={}", REPOSITORY_NAME);
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

public void testRemoteRefreshRetryOnFailure() throws Exception {

Expand All @@ -107,76 +49,16 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
Set<String> filesInRepo = getSegmentFiles(segmentDataRepoPath);
assertTrue(filesInRepo.containsAll(filesInLocal));
}, 60, TimeUnit.SECONDS);
deleteRepo();
}

private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
}

/**
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
*
* @param location the path to location where segment files are being searched.
* @return set of file names of all segment file or empty set if there was IOException thrown.
*/
private Set<String> getSegmentFiles(Path location) {
try {
return Arrays.stream(FileSystemUtils.files(location))
.filter(path -> path.getFileName().toString().startsWith("_"))
.map(path -> path.getFileName().toString())
.map(this::getLocalSegmentFilename)
.collect(Collectors.toSet());
} catch (IOException exception) {
logger.error("Exception occurred while getting segment files", exception);
}
return Collections.emptySet();
}

private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}
public void testRemoteRefreshSegmentPressureSettingChanged() {
Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
ClusterUpdateSettingsResponse response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");

private void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false).build();
response = client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get();
assertEquals(response.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "false");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase {
private int shard_count = 5;

@Override
Expand Down
Loading

0 comments on commit ed3eaf3

Please sign in to comment.