Skip to content

Commit

Permalink
SegRep with Remote: Add Remote store as a segment replication source (#…
Browse files Browse the repository at this point in the history
…7653)

* SegRep with Remote: Add Remote store as a segment replication source

Signed-off-by: Ankit Kala <[email protected]>

* Fix Gradle check

Signed-off-by: Ankit Kala <[email protected]>

* Retrying Gradle check

Signed-off-by: Ankit Kala <[email protected]>

---------

Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored Jun 7, 2023
1 parent 8773509 commit 2a3683e
Show file tree
Hide file tree
Showing 27 changed files with 486 additions and 82 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
Expand All @@ -106,7 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924
- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
Expand Down Expand Up @@ -580,6 +581,7 @@ public void testDeleteOperations() throws Exception {
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
Expand Down Expand Up @@ -785,6 +787,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
assumeFalse(
"Skipping the test as pressure service is not compatible with SegRep and Remote store yet.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
Expand Down Expand Up @@ -874,6 +880,7 @@ public void testPressureServiceStats() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -963,6 +970,11 @@ public void testScrollCreatedOnReplica() throws Exception {
* @throws Exception when issue is encountered
*/
public void testScrollWithOngoingSegmentReplication() throws Exception {
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store yet.",
segmentReplicationWithRemoteEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
prepareCreate(
Expand Down Expand Up @@ -1249,4 +1261,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
waitForSearchableDocs(2, nodes);
}

private boolean segmentReplicationWithRemoteEnabled() {
return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue()
&& "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This class runs Segment Replication Integ test suite with remote store enabled.
* Setup is similar to SegmentReplicationRemoteStoreIT but this also enables the segment replication using remote store which
* is behind SEGMENT_REPLICATION_EXPERIMENTAL flag. After this is moved out of experimental, we can combine and keep only one
* test suite for Segment and Remote store integration tests.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2760,6 +2760,13 @@ public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

/**
* Returns the current local checkpoint getting refreshed internally.
*/
public final long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.pendingCheckpoint;
}

private final Object refreshIfNeededMutex = new Object();

/**
Expand All @@ -2777,10 +2784,11 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;
volatile long pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
this.pendingCheckpoint = initialLocalCheckpoint;
}

@Override
Expand Down
65 changes: 31 additions & 34 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2231,7 +2231,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4404,7 +4404,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
syncSegmentsFromRemoteSegmentStore(false, true, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4454,23 +4454,15 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4529,29 +4521,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
if (shouldCommit) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,11 @@ private boolean isRefreshAfterCommit() throws IOException {
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos);

final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos));
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
segmentInfosSnapshot.setUserData(userData, false);

remoteDirectory.uploadMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -100,7 +101,15 @@ public IndexOutput createOutput(String name, IOContext context) {
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name));
InputStream inputStream = null;
try {
inputStream = blobContainer.readBlob(name);
return new RemoteIndexInput(name, inputStream, fileLength(name));
} catch (Exception e) {
// Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak.
if (inputStream != null) inputStream.close();
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ public RemoteSegmentMetadata init() throws IOException {
* @return Map of segment filename to uploaded filename with checksum
* @throws IOException if there were any failures in reading the metadata file
*/
private RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();
public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
RemoteSegmentMetadata remoteSegmentMetadata = null;

Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
Expand Down Expand Up @@ -199,6 +198,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}

public String getOriginalFilename() {
return originalFilename;
}
}

/**
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata>
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
// match segments using checksum
if (fileMetadata.checksum().equals(value.checksum())) {
identical.add(value);
} else {
different.add(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Loading

0 comments on commit 2a3683e

Please sign in to comment.