forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote segments] Add backpressure in write path on segments lag betw…
…een local and remote store (opensearch-project#7459) Signed-off-by: Ashish Singh <[email protected]> Signed-off-by: Shivansh Arora <[email protected]>
- Loading branch information
Showing
29 changed files
with
816 additions
and
411 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
145 changes: 145 additions & 0 deletions
145
...rTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore; | ||
|
||
import org.junit.Before; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.UUIDs; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.core.util.FileSystemUtils; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.store.RemoteSegmentStoreDirectory; | ||
import org.opensearch.indices.replication.common.ReplicationType; | ||
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; | ||
import org.opensearch.test.FeatureFlagSetter; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
||
public abstract class AbstractRemoteStoreMockRepositoryIntegTestCase extends AbstractSnapshotIntegTestCase { | ||
|
||
protected static final String REPOSITORY_NAME = "my-segment-repo-1"; | ||
protected static final String INDEX_NAME = "remote-store-test-idx-1"; | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); | ||
} | ||
|
||
@Before | ||
public void setup() { | ||
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); | ||
internalCluster().startClusterManagerOnlyNode(); | ||
} | ||
|
||
@Override | ||
public Settings indexSettings() { | ||
return remoteStoreIndexSettings(0); | ||
} | ||
|
||
protected Settings remoteStoreIndexSettings(int numberOfReplicas) { | ||
return Settings.builder() | ||
.put(super.indexSettings()) | ||
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) | ||
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) | ||
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) | ||
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) | ||
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) | ||
.build(); | ||
} | ||
|
||
protected void deleteRepo() { | ||
logger.info("--> Deleting the repository={}", REPOSITORY_NAME); | ||
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); | ||
} | ||
|
||
protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) { | ||
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation); | ||
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in | ||
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the | ||
// repository creation can happen without failure. | ||
createRepository( | ||
REPOSITORY_NAME, | ||
"mock", | ||
Settings.builder() | ||
.put("location", repoLocation) | ||
.put("random_control_io_exception_rate", ioFailureRate) | ||
.put("skip_exception_on_verification_file", true) | ||
.put("skip_exception_on_list_blobs", true) | ||
// Skipping is required for metadata as it is part of recovery | ||
.put("skip_exception_on_blobs", skipExceptionBlobList) | ||
.put("max_failure_number", Long.MAX_VALUE) | ||
); | ||
|
||
internalCluster().startDataOnlyNodes(1); | ||
createIndex(INDEX_NAME); | ||
logger.info("--> Created index={}", INDEX_NAME); | ||
ensureYellowAndNoInitializingShards(INDEX_NAME); | ||
logger.info("--> Cluster is yellow with no initializing shards"); | ||
ensureGreen(INDEX_NAME); | ||
logger.info("--> Cluster is green"); | ||
} | ||
|
||
/** | ||
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc. | ||
* | ||
* @param location the path to location where segment files are being searched. | ||
* @return set of file names of all segment file or empty set if there was IOException thrown. | ||
*/ | ||
protected Set<String> getSegmentFiles(Path location) { | ||
try { | ||
return Arrays.stream(FileSystemUtils.files(location)) | ||
.filter(path -> path.getFileName().toString().startsWith("_")) | ||
.map(path -> path.getFileName().toString()) | ||
.map(this::getLocalSegmentFilename) | ||
.collect(Collectors.toSet()); | ||
} catch (IOException exception) { | ||
logger.error("Exception occurred while getting segment files", exception); | ||
} | ||
return Collections.emptySet(); | ||
} | ||
|
||
private String getLocalSegmentFilename(String remoteFilename) { | ||
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0]; | ||
} | ||
|
||
private IndexResponse indexSingleDoc() { | ||
return client().prepareIndex(INDEX_NAME) | ||
.setId(UUIDs.randomBase64UUID()) | ||
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) | ||
.get(); | ||
} | ||
|
||
protected void indexData(int numberOfIterations, boolean invokeFlush) { | ||
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush); | ||
for (int i = 0; i < numberOfIterations; i++) { | ||
int numberOfOperations = randomIntBetween(1, 5); | ||
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i); | ||
for (int j = 0; j < numberOfOperations; j++) { | ||
indexSingleDoc(); | ||
} | ||
if (invokeFlush) { | ||
flush(INDEX_NAME); | ||
} else { | ||
refresh(INDEX_NAME); | ||
} | ||
} | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
...er/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore; | ||
|
||
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.nio.file.Path; | ||
|
||
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { | ||
|
||
public void testWritesRejected() { | ||
Path location = randomRepoPath().toAbsolutePath(); | ||
setup(location, 1d, "metadata"); | ||
|
||
Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build(); | ||
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin() | ||
.cluster() | ||
.prepareUpdateSettings() | ||
.setPersistentSettings(request) | ||
.get(); | ||
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true"); | ||
|
||
logger.info("--> Indexing data"); | ||
OpenSearchRejectedExecutionException ex = assertThrows( | ||
OpenSearchRejectedExecutionException.class, | ||
() -> indexData(randomIntBetween(10, 20), randomBoolean()) | ||
); | ||
assertTrue(ex.getMessage().contains("rejected execution on primary shard")); | ||
deleteRepo(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.