Refactor translog download flow and Add support to run SegRep integ tests using remote store settings #6405

Closed
Changes from 8 commits
@@ -282,22 +282,18 @@ public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception {
         testPeerRecovery(false, randomIntBetween(2, 5), false);
     }

-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
     public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception {
         testPeerRecovery(true, 1, true);
     }

-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
     public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception {
         testPeerRecovery(true, randomIntBetween(2, 5), true);
     }

-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
     public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception {
         testPeerRecovery(true, 1, false);
     }

-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
     public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception {
         testPeerRecovery(true, randomIntBetween(2, 5), false);
     }
SegmentReplicationRemoteStoreIT.java (new file)
@@ -0,0 +1,57 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Member: Just a question - what does numDataNodes = 0 mean here? I checked the code base and saw multiple such occurrences.

Member Author: We want to control the number of nodes in the test cluster. That is why we provide 0 and start nodes as per the test requirement.

More on numDataNodes: https://github.com/opensearch-project/OpenSearch/blob/main/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java#L1722
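
(For illustration, a minimal hypothetical test class showing the pattern described above; the annotation and the internalCluster() start-up calls mirror the ones used in this PR, but the class name and test body are invented:)

    import org.opensearch.test.OpenSearchIntegTestCase;

    @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
    public class ExampleTopologyIT extends OpenSearchIntegTestCase {

        public void testWithExplicitTopology() {
            // The suite boots with zero data nodes, so each test builds exactly
            // the topology it needs instead of inheriting a random default.
            internalCluster().startClusterManagerOnlyNode();
            internalCluster().startDataOnlyNodes(2); // e.g. one primary plus one replica
            // ... create an index and assert against this fixed topology ...
        }
    }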

public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

    private static final String REPOSITORY_NAME = "test-remore-store-repo";

    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
            .build();
    }

    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
    }

    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
        Path absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }
}
ReadOnlyEngine.java
@@ -137,7 +137,9 @@ public ReadOnlyEngine(
         }
         if (seqNoStats == null) {
             seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
-            ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Member (@ashking94, Mar 28, 2023): Nit - would it make this code more readable if we offload this check to within the ensureMaxSeqNoEqualsToGlobalCheckpoint method -

    protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
        if (requireCompleteHistory == false || engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled()) {
            return;
        }
        ...
    }

+            if (config.getIndexSettings().isRemoteTranslogStoreEnabled() == false) {
+                ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
+            }
         }
         this.seqNoStats = seqNoStats;
         this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
@@ -186,7 +188,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) {
         // In addition to that we only execute the check if the index the engine belongs to has been
         // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
         // that guarantee that all operations have been flushed to Lucene.
-        assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
Member: Why make this change?

Member Author: The assert keyword is redundant here, as assertMaxSeqNoEqualsToGlobalCheckpoint already has an assert inside.

+        assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
Member: Can we also change the contract of the method to return void, since the return value is not being used any more?
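
(A minimal sketch of that suggested follow-up, assuming the boolean return exists only so the call can sit inside an assert statement; the PR under review keeps the boolean contract:)

    // Hypothetical void variant: the method asserts internally, so call sites
    // no longer need to wrap it in an `assert` of their own.
    protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
        assert maxSeqNo == globalCheckpoint
            : "max seq. no. [" + maxSeqNo + "] does not match global checkpoint [" + globalCheckpoint + "]";
    }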

         if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) {
             throw new IllegalStateException(
                 "Maximum sequence number ["
server/src/main/java/org/opensearch/index/shard/IndexShard.java (33 changes: 22 additions & 11 deletions)
@@ -158,6 +158,8 @@
 import org.opensearch.index.store.Store.MetadataSnapshot;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.index.store.StoreStats;
+import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
+import org.opensearch.index.translog.RemoteFsTranslog;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.TranslogConfig;
 import org.opensearch.index.translog.TranslogFactory;
@@ -2181,6 +2183,10 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
         if (indexSettings.isRemoteStoreEnabled()) {
             syncSegmentsFromRemoteSegmentStore(false);
         }
+        if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
+            syncTranslogFilesFromRemoteTranslog();
+            loadGlobalCheckpointToReplicationTracker();
+        }
         // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
         final Engine newEngine = engineFactory.newReadWriteEngine(config);
         onNewEngine(newEngine);
@@ -2462,10 +2468,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
         storeRecovery.recoverFromStore(this, listener);
     }

-    public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
+    public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
         assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
         StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
-        storeRecovery.recoverFromRemoteStore(this, repository, listener);
+        storeRecovery.recoverFromRemoteStore(this, listener);
     }

     public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
@@ -3079,7 +3085,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
          * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
          * to recovery finalization, or even finished recovery before the update arrives here.
          */
-        assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED
+        assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
+            || (indexSettings.isRemoteTranslogStoreEnabled() == true && state() != IndexShardState.RECOVERING)
Member: Should we add a short comment, or add to the method's javadoc, on why this condition has been added?
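
(One possible shape for such a comment; the rationale below is inferred from the remote-translog recovery flow in this PR, not taken from the author:)

    // Inferred rationale, for illustration: with a remote translog store,
    // operations are not replayed on replica copies, so an in-sync copy may
    // legitimately receive a global checkpoint ahead of its local state even
    // after moving past recovery; only a shard still in RECOVERING is
    // unexpected in that case.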

: "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
+ "] "
@@ -3264,14 +3271,7 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
final Repository remoteTranslogRepo;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore);
break;
case PEER:
try {
@@ -4338,6 +4338,10 @@ public void close() throws IOException {
                 if (indexSettings.isRemoteStoreEnabled()) {
                     syncSegmentsFromRemoteSegmentStore(false);
                 }
+                if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
+                    syncTranslogFilesFromRemoteTranslog();
+                    loadGlobalCheckpointToReplicationTracker();
+                }
                 newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
                 onNewEngine(newEngineReference.get());
             }
@@ -4371,6 +4375,13 @@ public void close() throws IOException {
         onSettingsChanged();
     }

+    public void syncTranslogFilesFromRemoteTranslog() throws IOException {
+        TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
+        assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
+        Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
+        RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
+    }

     /**
      * Downloads segments from remote segment store. This method will download segments till
      * last refresh checkpoint.
StoreRecovery.java
@@ -60,15 +60,11 @@
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
 import org.opensearch.index.store.Store;
-import org.opensearch.index.translog.RemoteFsTranslog;
 import org.opensearch.index.translog.Translog;
-import org.opensearch.index.translog.transfer.FileTransferTracker;
-import org.opensearch.index.translog.transfer.TranslogTransferManager;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.repositories.Repository;
-import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.Arrays;
@@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
         }
     }

-    void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
+    void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
         if (canRecover(indexShard)) {
             RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
             assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
             ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
                 logger.debug("starting recovery from remote store ...");
-                recoverFromRemoteStore(indexShard, repository);
+                recoverFromRemoteStore(indexShard);
                 return true;
             });
         } else {
@@ -439,7 +435,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionListener<Boolean> listener) {
         });
     }

-    private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
+    private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
         final Store remoteStore = indexShard.remoteStore();
         if (remoteStore == null) {
             throw new IndexShardRecoveryException(
@@ -460,8 +456,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
             if (store.directory().listAll().length == 0) {
                 store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
             }
-            if (repository != null) {
-                syncTranslogFilesFromRemoteTranslog(indexShard, repository);
+            if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
+                indexShard.syncTranslogFilesFromRemoteTranslog();
             } else {
                 bootstrap(indexShard, store);
             }
@@ ... @@
         }
     }

-    private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
-        assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
-        BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
-        FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
-        TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
-            blobStoreRepository,
-            indexShard.getThreadPool(),
-            shardId,
-            fileTransferTracker
-        );
-        RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
-    }

     /**
      * Recovers the state of the shard from the store.
      */
RemoteBlobStoreInternalTranslogFactory.java
@@ -71,4 +71,8 @@ public Translog newTranslog(
             primaryModeSupplier
         );
     }
+
+    public Repository getRepository() {
+        return repository;
+    }
 }
RemoteFsTranslog.java
@@ -22,12 +22,14 @@
 import org.opensearch.index.translog.transfer.TranslogTransferManager;
 import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
 import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
+import org.opensearch.repositories.Repository;
 import org.opensearch.repositories.blobstore.BlobStoreRepository;
 import org.opensearch.threadpool.ThreadPool;

 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -46,7 +48,6 @@
*/
public class RemoteFsTranslog extends Translog {

-    private final BlobStoreRepository blobStoreRepository;
     private final TranslogTransferManager translogTransferManager;
     private final FileTransferTracker fileTransferTracker;
     private final BooleanSupplier primaryModeSupplier;
@@ -77,7 +78,6 @@ public RemoteFsTranslog(
         BooleanSupplier primaryModeSupplier
     ) throws IOException {
         super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
-        this.blobStoreRepository = blobStoreRepository;
         this.primaryModeSupplier = primaryModeSupplier;
         fileTransferTracker = new FileTransferTracker(shardId);
         this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);
@@ -116,22 +116,35 @@ public RemoteFsTranslog(
         }
     }

-    public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
+    public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
+        assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
+        BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
+        FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
+        TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
+            blobStoreRepository,
+            threadPool,
+            shardId,
+            fileTransferTracker
+        );
+        RemoteFsTranslog.download(translogTransferManager, location);
+    }

+    private static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
         TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
         if (translogMetadata != null) {
             if (Files.notExists(location)) {
                 Files.createDirectories(location);
-            }
-            // Delete translog files on local before downloading from remote
-            for (Path file : FileSystemUtils.files(location)) {
-                Files.delete(file);
+            } else {
+                deleteTranslogFilesNotUploaded(location, translogMetadata.getGeneration());
             }
             Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
             for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
                 String generation = Long.toString(i);
                 translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
             }
+            if (Files.exists(location.resolve(Translog.CHECKPOINT_FILE_NAME))) {
+                Files.delete(location.resolve(Translog.CHECKPOINT_FILE_NAME));
+            }
             // We copy the latest generation .ckp file to translog.ckp so that flows that depend on
             // existence of translog.ckp file work in the same way
             Files.copy(
@@ -141,6 +154,25 @@ public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
         }
     }

+    private static void deleteTranslogFilesNotUploaded(Path location, long uploadedGeneration) throws IOException {
Member: If I recall, the earlier check on deleting files before downloading translog files was added because otherwise the download used to fail, right? Now we are deleting files whose generation is greater than the max generation referenced by the remote translog metadata file. While this is really a good optimisation, I think we should check the local file's checksum against the expected checksum. If there is a story on the checksum part already, then do let me know. If not, let's follow it up in the next PR.

Member Author: On a high level, comparing checksums makes sense. This is what we do for segments, and it can be added here as well. The only reason I have not added this check is the invariant that we have: only one node will be uploading a translog file at any given time. So, if the translog file name matches, the checksum must be the same. Let me create a tracking issue to discuss this in more detail.

Member Author (@sachinpkale, Mar 30, 2023): Created tracking issue: #6896
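
(As a rough illustration of the checksum check being discussed, a hypothetical helper, not an OpenSearch API; the source of the expected checksums and the choice of CRC32 are assumptions:)

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.zip.CRC32;

    // Hypothetical sketch: verify a local translog file against a checksum
    // recorded in remote metadata before reusing it instead of re-downloading.
    final class TranslogChecksumCheck {
        static boolean matchesRemote(Path localFile, Map<String, Long> remoteChecksums) throws IOException {
            CRC32 crc32 = new CRC32();
            crc32.update(Files.readAllBytes(localFile)); // translog generations are small enough to read fully
            Long expected = remoteChecksums.get(localFile.getFileName().toString());
            return expected != null && expected == crc32.getValue();
        }
    }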

+        // Delete translog files with generation > translogMetadata.getGeneration()
+        Arrays.stream(FileSystemUtils.files(location))
+            .map(filePath -> filePath.getFileName().toString())
+            .filter(filename -> filename.endsWith(TRANSLOG_FILE_SUFFIX))
+            .map(
+                filename -> Long.parseLong(
+                    filename.substring(TRANSLOG_FILE_PREFIX.length(), filename.length() - TRANSLOG_FILE_SUFFIX.length())
+                )
+            )
+            .filter(generation -> generation > uploadedGeneration)
+            .forEach(
+                generation -> IOUtils.deleteFilesIgnoringExceptions(
+                    location.resolve(Translog.getCommitCheckpointFileName(generation)),
+                    location.resolve(Translog.getFilename(generation))
+                )
+            );
+    }
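
(A standalone sketch of the filename-to-generation parsing used above, assuming the standard translog naming of a "translog-" prefix and ".tlog" suffix; for illustration only:)

    public class GenerationParseSketch {
        private static final String TRANSLOG_FILE_PREFIX = "translog-";
        private static final String TRANSLOG_FILE_SUFFIX = ".tlog";

        public static void main(String[] args) {
            String filename = "translog-7.tlog";
            long generation = Long.parseLong(
                filename.substring(TRANSLOG_FILE_PREFIX.length(), filename.length() - TRANSLOG_FILE_SUFFIX.length())
            );
            // With uploadedGeneration = 5, generation 7 > 5, so translog-7.tlog and
            // translog-7.ckp would be deleted as never-uploaded local files.
            System.out.println(generation > 5); // prints: true
        }
    }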

     public static TranslogTransferManager buildTranslogTransferManager(
         BlobStoreRepository blobStoreRepository,
         ThreadPool threadPool,