Add support to run SegRep integ tests using remote store settings
Signed-off-by: Sachin Kale <[email protected]>
Sachin Kale committed Feb 21, 2023
1 parent 87bdd08 commit c74df42
Showing 9 changed files with 144 additions and 45 deletions.
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

    private static final String REPOSITORY_NAME = "test-remote-store-repo";

    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
            .build();
    }

    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
    }

    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
        Path absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }
}
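Because the subclass only overrides settings, every test inherited from SegmentReplicationIT now runs against indices backed by the remote segment and translog stores. A hypothetical sanity check along the following lines could be added to the class to confirm the overridden settings actually reach a created index; it is only a sketch, not part of this commit, and it assumes the parent suite exposes an INDEX_NAME constant. Since setup() only starts a cluster-manager node, the sketch starts its own data node.

    // Sketch only (not in this commit): INDEX_NAME is assumed to be inherited from SegmentReplicationIT.
    public void testRemoteStoreSettingsApplied() {
        internalCluster().startDataOnlyNode();
        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
        ensureGreen(INDEX_NAME);
        String remoteStoreEnabled = client().admin()
            .indices()
            .prepareGetSettings(INDEX_NAME)
            .get()
            .getSetting(INDEX_NAME, IndexMetadata.SETTING_REMOTE_STORE_ENABLED);
        assertEquals("true", remoteStoreEnabled);
    }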
@@ -137,7 +137,9 @@ public ReadOnlyEngine(
}
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
if (config.getIndexSettings().isRemoteTranslogStoreEnabled() == false) {
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
}
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
@@ -186,7 +188,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStat
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) {
throw new IllegalStateException(
"Maximum sequence number ["
@@ -1093,8 +1093,12 @@ public long getAsLong() {
     * @param reason the reason the global checkpoint was updated
     */
    public synchronized void updateGlobalCheckpointOnReplica(final long newGlobalCheckpoint, final String reason) {
        assert invariant();
        assert primaryMode == false;
        updateGlobalCheckpoint(newGlobalCheckpoint, reason);
    }

    public synchronized void updateGlobalCheckpoint(final long newGlobalCheckpoint, final String reason) {
        assert invariant();
        /*
         * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
         * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
45 changes: 34 additions & 11 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -157,11 +157,15 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.warmer.ShardIndexWarmerService;
import org.opensearch.index.warmer.WarmerStats;
import org.opensearch.indices.IndexingMemoryController;
@@ -177,6 +181,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;
@@ -2166,6 +2171,10 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
@@ -2447,10 +2456,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
storeRecovery.recoverFromRemoteStore(this, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
@@ -3041,7 +3050,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
* while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
* to recovery finalization, or even finished recovery before the update arrives here.
*/
assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED
assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
|| (indexSettings.isRemoteTranslogStoreEnabled() == true && state() != IndexShardState.RECOVERING)
: "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
+ "] "
@@ -3226,14 +3236,7 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
final Repository remoteTranslogRepo;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore);
break;
case PEER:
try {
@@ -4300,6 +4303,10 @@ public void close() throws IOException {
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
@@ -4333,6 +4340,22 @@ public void close() throws IOException {
onSettingsChanged();
}

    public void syncTranslogFilesFromRemoteTranslog() throws IOException {
        TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
        assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
        Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
        assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
        BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
        FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
        TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
            blobStoreRepository,
            getThreadPool(),
            shardId,
            fileTransferTracker
        );
        RemoteFsTranslog.download(translogTransferManager, shardPath().resolveTranslog());
    }

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
27 changes: 5 additions & 22 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -60,15 +60,11 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.Arrays;
@@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
}
}

void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from remote store ...");
recoverFromRemoteStore(indexShard, repository);
recoverFromRemoteStore(indexShard);
return true;
});
} else {
@@ -439,7 +435,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
});
}

private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
@@ -460,8 +456,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
}
if (repository != null) {
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
indexShard.syncTranslogFilesFromRemoteTranslog();
} else {
bootstrap(indexShard, store);
}
@@ -480,19 +476,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
}
}

private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool(),
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
}

/**
* Recovers the state of the shard from the store.
*/
@@ -71,4 +71,8 @@ public Translog newTranslog(
            primaryModeSupplier
        );
    }

    public Repository getRepository() {
        return repository;
    }
}
@@ -14,6 +14,7 @@
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
@@ -79,6 +80,9 @@ public RemoteFsTranslog(
        try {
            download(translogTransferManager, location);
            Checkpoint checkpoint = readCheckpoint(location);
            assert globalCheckpointSupplier instanceof ReplicationTracker
                : "globalCheckpointSupplier is not instance of ReplicationTracker";
            ((ReplicationTracker) globalCheckpointSupplier).updateGlobalCheckpoint(checkpoint.globalCheckpoint, "RemoteFsTranslog init");
            this.readers.addAll(recoverFromFiles(checkpoint));
            if (readers.isEmpty()) {
                throw new IllegalStateException("at least one reader must be recovered");
@@ -117,15 +121,24 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
        if (Files.notExists(location)) {
            Files.createDirectories(location);
        }
        // Delete translog files on local before downloading from remote
        for (Path file : FileSystemUtils.files(location)) {
            Files.delete(file);
        }
        Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
        for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
            String generation = Long.toString(i);
            boolean ckpExists = Files.exists(location.resolve(Translog.getCommitCheckpointFileName(i)));
            boolean tlogExists = Files.exists(location.resolve(Translog.getFilename(i)));
            if (ckpExists || tlogExists) {
                if (ckpExists) {
                    Files.delete(location.resolve(Translog.getCommitCheckpointFileName(i)));
                }
                if (tlogExists) {
                    Files.delete(location.resolve(Translog.getFilename(i)));
                }
            }
            translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
        }
        if (Files.exists(location.resolve(Translog.CHECKPOINT_FILE_NAME))) {
            Files.delete(location.resolve(Translog.CHECKPOINT_FILE_NAME));
        }
        // We copy the latest generation .ckp file to translog.ckp so that flows that depend on
        // existence of translog.ckp file work in the same way
        Files.copy(
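For context on the on-disk layout this download path manipulates: each translog generation consists of a translog-<gen>.tlog file plus a translog-<gen>.ckp checkpoint, and translog.ckp mirrors the checkpoint of the current generation, which is why the method ends by copying the newest generation's .ckp over translog.ckp. A minimal sketch of the naming helpers referenced above (assuming the OpenSearch server classes are on the classpath; the class name here is illustrative):

import org.opensearch.index.translog.Translog;

public class TranslogNamingSketch {
    public static void main(String[] args) {
        long generation = 5;
        System.out.println(Translog.getFilename(generation));                 // translog-5.tlog
        System.out.println(Translog.getCommitCheckpointFileName(generation)); // translog-5.ckp
        System.out.println(Translog.CHECKPOINT_FILE_NAME);                    // translog.ckp
    }
}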
@@ -2866,7 +2866,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(null, future);
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());