Making changes for replication and recovery flow for writable warm index
Signed-off-by: Nishant Goel <[email protected]>
nisgoel-amazon committed Jul 4, 2024
1 parent 4c28408 commit d4d58dc
Showing 7 changed files with 239 additions and 36 deletions.
@@ -8,6 +8,7 @@

package org.opensearch.indices.replication;

import java.util.Objects;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
@@ -49,8 +50,6 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,8 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException
return closeable.get();
}
}

protected boolean warmIndexSegmentReplicationEnabled() {
return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name());
}
}
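For context, warmIndexSegmentReplicationEnabled() reports whether the suite's index settings request partial data locality, i.e. whether the tests are running against a writable warm index. A subclass opts into the warm path by overriding indexSettings(), as the new WarmIndexRemoteStoreSegmentReplicationIT later in this commit does; a minimal sketch:

// Sketch: overriding indexSettings() with partial store locality makes
// warmIndexSegmentReplicationEnabled() return true for the whole suite.
@Override
public Settings indexSettings() {
    return Settings.builder()
        .put(super.indexSettings())
        .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
        .build();
}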
@@ -20,6 +20,8 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -430,7 +432,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);

flush(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

@@ -450,7 +451,10 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
// Skip store content verification here, as the files referenced by readLastCommittedSegmentsInfo are not present in the latest remote store metadata.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
}
}

@@ -623,7 +627,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +961,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
verifyStoreContent();
// Skip store content verification here, as the files referenced by readLastCommittedSegmentsInfo are not present in the latest remote store metadata.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<SegmentReplicationShardStats> replicaStats)
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);

final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {

protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;
protected boolean clusterSettingsSuppliedByTest = false;

@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
if (clusterSettingsSuppliedByTest) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
//.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1)
.build();
}
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);

return featureSettings.build();
}

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

protected boolean warmIndexSegmentReplicationEnabled() {
return true;
}

@After
public void teardown() {
clusterSettingsSuppliedByTest = false;
for (String nodeName : internalCluster().getNodeNames()) {
logger.info("file cache node name is {}", nodeName);
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
fileCache.clear();
}
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
}

public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
Map<String, String> nodeAttributes = node.getAttributes();
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
Map<String, String> settingsMap = node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));

Settings.Builder settings = Settings.builder();
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

return new RepositoryMetadata(name, type, settings.build());
}

public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);
RepositoryMetadata actualRepository = repositories.repository(repositoryName);

final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);

for (String nodeName : internalCluster().getNodeNames()) {
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = clusterService.localNode();
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);

// Validate that all the restricted settings are intact on all the nodes.
repository.getRestrictedSystemRepositorySettings()
.stream()
.forEach(
setting -> assertEquals(
String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
setting.get(actualRepository.settings()),
setting.get(expectedRepository.settings())
)
);
}
}

}
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOException
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
flush(false, true);
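// For a warm index (partial store locality), full segment files are not held
// on local disk, so the local commit is skipped and only the translog is rolled.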
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
flush(false, true);
}
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
20 changes: 13 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
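// Only override local files for full-locality indexes; a warm (partial locality)
// shard's local directory holds cached data rather than complete segment copies.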
boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;

long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
logger.trace("Downloading segments from remote segment store");
Expand All @@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync)
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
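// With partial store locality, segment data is fetched on demand from the
// remote store, so the upfront copy of segment files is skipped.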
if (indexSettings.isStoreLocalityPartial() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync)
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
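// Cleaning up local segments_N files only applies when the shard keeps full
// local copies; a warm shard's local directory is not the commit source of truth.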
if (indexSettings.isStoreLocalityPartial() == false) {
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
@@ -117,9 +117,13 @@ public void getSegmentFiles(
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
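// For warm indexes (partial locality), skip downloading segment files and
// acknowledge the request immediately; data is fetched on demand afterwards.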
if (indexShard.indexSettings().isStoreLocalityPartial()) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
return;
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,