
Writable warm relocation recovery #14670

Status: Open. Wants to merge 4 commits into base: main.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
- [Writable Warm] Changes for shard recovery and relocation in case of composite directory ([#14670](https://github.com/opensearch-project/OpenSearch/pull/14670))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
@@ -42,15 +42,14 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,11 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException {
return closeable.get();
}
}

protected boolean warmIndexSegmentReplicationEnabled() {
return Objects.equals(
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
IndexModule.DataLocalityType.PARTIAL.name()
);
}
}
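The helper above gates warm-index behavior on the store locality setting. A minimal standalone sketch of that check, where the enum and the raw settings map are simplified stand-ins for the OpenSearch `IndexModule` types rather than the real API:

```java
// Minimal standalone sketch of the warmIndexSegmentReplicationEnabled() check.
// DataLocalityType and the raw settings map are simplified stand-ins for the
// OpenSearch IndexModule types, not the real API.
import java.util.Map;
import java.util.Objects;

public class LocalityCheck {

    enum DataLocalityType { FULL, PARTIAL }

    // True only when the index's store locality setting resolves to PARTIAL,
    // i.e. the shard is a writable warm shard backed by the remote store.
    static boolean warmIndexSegmentReplicationEnabled(Map<String, String> indexSettings) {
        String locality = indexSettings.getOrDefault("index.store.data_locality", DataLocalityType.FULL.name());
        return Objects.equals(locality, DataLocalityType.PARTIAL.name());
    }

    public static void main(String[] args) {
        System.out.println(warmIndexSegmentReplicationEnabled(Map.of("index.store.data_locality", "PARTIAL"))); // true
        System.out.println(warmIndexSegmentReplicationEnabled(Map.of())); // false
    }
}
```

Subclasses such as the warm-index IT below override the index settings, so the same base-class helper flips all the conditional assertions at once.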
@@ -430,7 +430,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);

flush(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

@@ -450,7 +449,11 @@
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
// Skip verifying store content here, as readLastCommittedSegmentsInfo files are not present in the latest
// metadata of the remote store.
if (!warmIndexSegmentReplicationEnabled()) {
[Reviewer comment] I know similar logic was implemented with remote store, but I would prefer we not couple these test classes together and instead write new tests where appropriate for warm cases. Or, we break up the SegmentReplicationIT so that reusable tests move to the base IT.

verifyStoreContent();
[Reviewer comment] Can we add a backlog issue to fix this?

}
}
}

@@ -623,7 +626,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
Expand Down Expand Up @@ -957,7 +960,11 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
verifyStoreContent();
// Skip verifying store content here, as readLastCommittedSegmentsInfo files are not present in the latest
// metadata of the remote store.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as it's not compatible with remote store enabled warm indices.",
warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as it's not compatible with remote store enabled warm indices.",
warmIndexSegmentReplicationEnabled()
);

final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.node.Node;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
[Reviewer comment] There are likely tests in the regular segrep suite that aren't valid here, particularly when asserting against store content.


protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);

return featureSettings.build();
}

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

protected boolean warmIndexSegmentReplicationEnabled() {
return true;
}

@After
public void teardown() {
for (String nodeName : internalCluster().getNodeNames()) {
logger.info("file cache node name is {}", nodeName);
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
fileCache.clear();
}
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
flush(false, true);
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
[Reviewer comment] This engine will only flush here and on close. I don't think we have a reason to commit on close, as we will sync from store on re-open? If that's the case, let's push this logic to the flush method call and cover both cases? Please also add a comment on why we are not committing here.

flush(false, true);
}
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
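The change above skips the local Lucene commit on a primary-generation change when the store locality is partial, since a warm shard's segments live in the remote store and are re-synced on reopen. A standalone sketch of that gating, where the class, the settings flag, and `flushCount` (in place of the real `flush(false, true)`) are all simplified stand-ins:

```java
// Simplified stand-in for the NRTReplicationEngine generation-change path;
// flushCount replaces the real flush(false, true) call so the behavior is observable.
public class GenerationHandler {

    private final boolean storeLocalityPartial;
    private long lastReceivedPrimaryGen = -1;
    int flushCount = 0;

    GenerationHandler(boolean storeLocalityPartial) {
        this.storeLocalityPartial = storeLocalityPartial;
    }

    void onUpdateSegments(long incomingGeneration) {
        if (incomingGeneration != lastReceivedPrimaryGen) {
            // Warm (partial-locality) shards skip the local commit: segment data
            // lives in the remote store and is re-synced from it on reopen.
            if (!storeLocalityPartial) {
                flushCount++;
            }
            lastReceivedPrimaryGen = incomingGeneration;
        }
    }

    public static void main(String[] args) {
        GenerationHandler hot = new GenerationHandler(false);
        hot.onUpdateSegments(1);
        hot.onUpdateSegments(2);
        GenerationHandler warm = new GenerationHandler(true);
        warm.onUpdateSegments(1);
        System.out.println(hot.flushCount + " " + warm.flushCount); // prints "2 0"
    }
}
```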
20 changes: 13 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;

long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
[Reviewer comment] Rather than adding all these locality checks to this method, can we create a separate trimmed-down sync method for this case?

recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
if (indexSettings.isStoreLocalityPartial() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
if (indexSettings.isStoreLocalityPartial() == false) {
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
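The diff above makes the sync path download nothing for a partial-locality shard: blocks are fetched on demand through the composite directory instead. A toy model of the download decision, reducing file metadata and checksums to plain strings and sets (the real code compares per-file checksums against the remote metadata):

```java
// Toy model of the download decision in syncSegmentsFromRemoteSegmentStore.
// File names and checksums are reduced to plain strings/sets; the real code
// compares per-file checksums against the remote metadata.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class WarmSyncSketch {

    // For partial (warm) locality nothing is copied down: blocks are fetched
    // on demand through the composite directory. Otherwise, download a file
    // when overrideLocal is set or the local store lacks it.
    static List<String> filesToDownload(Set<String> remoteFiles, Set<String> localFiles,
                                        boolean overrideLocal, boolean storeLocalityPartial) {
        if (storeLocalityPartial) {
            return List.of();
        }
        List<String> toDownload = new ArrayList<>();
        for (String file : remoteFiles) {
            if (overrideLocal || !localFiles.contains(file)) {
                toDownload.add(file);
            }
        }
        return toDownload;
    }
}
```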
@@ -117,9 +117,14 @@ public void getSegmentFiles(
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
[Reviewer comment] Can we not skip the call to getSegmentFiles entirely? That would greatly reduce the changes to SegmentReplicationTarget.

: "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
if (indexShard.indexSettings().isStoreLocalityPartial()) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
return;
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,
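The early return above short-circuits the download: a warm shard's requested files are only acknowledged, never copied. A toy sketch of that listener flow, collapsing the real ActionListener and the async file downloader into a Consumer and a Runnable:

```java
// Toy sketch of the early return added to getSegmentFiles: for a warm shard the
// requested files are acknowledged without any download. The listener and the
// async download are collapsed into a Consumer and a Runnable.
import java.util.List;
import java.util.function.Consumer;

public class GetSegmentFilesSketch {

    static void getSegmentFiles(List<String> filesToFetch, boolean storeLocalityPartial,
                                Consumer<List<String>> onResponse, Runnable download) {
        if (storeLocalityPartial) {
            // Warm path: respond immediately; file blocks are pulled lazily later.
            onResponse.accept(filesToFetch);
            return;
        }
        download.run(); // normal path: copy the files first
        onResponse.accept(filesToFetch);
    }
}
```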
@@ -38,6 +38,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@@ -170,14 +171,22 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();

Map<String, StoreFileMetadata> replicaMd = null;
try {
replicaMd = indexShard.getSegmentMetadataMap();
} catch (IOException e) {
listener.onFailure(new RuntimeException(e));
}

logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
cancellableThreads.checkForCancel();
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Map<String, StoreFileMetadata> finalReplicaMd = replicaMd;
checkpointInfoListener.whenComplete(checkpointInfo -> {
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo, finalReplicaMd);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(
@@ -196,31 +205,37 @@
}, listener::onFailure);
}

private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
[Reviewer comment] I don't think we need to compute a diff here if all files are remote; we can simply return early if data locality is partial. The checkpoint info call will include the sis bytes which we load onto the reader manager.

// local files
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());
final List<StoreFileMetadata> missingFiles;
// Skip reuse logic for warm indices
if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
missingFiles = diff.missing;
} else {
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

final List<StoreFileMetadata> missingFiles = diff.missing.stream()
.filter(md -> reuseFiles.contains(md.name()) == false)
.collect(Collectors.toList());
missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());

logger.trace(
() -> new ParameterizedMessage(
"Replication diff for checkpoint {} {} {}",
checkpointInfo.getCheckpoint(),
missingFiles,
diff.different
)
);
}

logger.trace(
() -> new ParameterizedMessage(
"Replication diff for checkpoint {} {} {}",
checkpointInfo.getCheckpoint(),
missingFiles,
diff.different
)
);
/*
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
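The getFiles() change above takes the raw replication diff for warm shards and applies the local-reuse filter only on the hot path. A simplified model of that decision, where maps from file name to checksum stand in for Store.segmentReplicationDiff and StoreFileMetadata, and per-file checksum validation is collapsed into a plain equality check:

```java
// Simplified model of getFiles(): metadata maps from file name to checksum
// stand in for Store.segmentReplicationDiff and StoreFileMetadata; the
// "different" category and checksum validation are collapsed into equality.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class FileDiffSketch {

    static List<String> missingFiles(Map<String, Long> primaryMd, Map<String, Long> replicaMd,
                                     Set<String> localFiles, boolean storeLocalityPartial) {
        // Diff: every primary file whose checksum the replica metadata lacks.
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, Long> entry : primaryMd.entrySet()) {
            if (!Objects.equals(replicaMd.get(entry.getKey()), entry.getValue())) {
                missing.add(entry.getKey());
            }
        }
        if (storeLocalityPartial) {
            return missing; // warm indices skip the local-reuse filtering
        }
        // Hot path: drop files already present locally (checksum check elided).
        List<String> result = new ArrayList<>();
        for (String file : missing) {
            if (!localFiles.contains(file)) {
                result.add(file);
            }
        }
        return result;
    }
}
```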