Skip to content

Commit

Permalink
Restore snapshot changes for V2
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 28, 2024
1 parent 23cba28 commit c9d6deb
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
// could not be taken due to partial being set to false.
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
} else {
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), indexId, shardId);
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo, indexId, shardId);
}
shardStatus.put(shardId, shardSnapshotStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.opensearch.Version.V_2_17_0;

/**
* Represents the recovery source of a shard. Available recovery types are:
* <p>
Expand Down Expand Up @@ -265,8 +267,14 @@ public static class SnapshotRecoverySource extends RecoverySource {
private final boolean remoteStoreIndexShallowCopy;
private final String sourceRemoteStoreRepository;

private final long pinnedTimestamp;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this(restoreUUID, snapshot, version, indexId, false, false, null);
this(restoreUUID, snapshot, version, indexId, false, false, null, 0L);
}

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId, long pinnedTimestamp) {
this(restoreUUID, snapshot, version, indexId, false, false, null, pinnedTimestamp);
}

public SnapshotRecoverySource(
Expand All @@ -285,6 +293,27 @@ public SnapshotRecoverySource(
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.pinnedTimestamp = 0L;
}

public SnapshotRecoverySource(
String restoreUUID,
Snapshot snapshot,
Version version,
IndexId indexId,
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository,
long pinnedTimestamp
) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.pinnedTimestamp = pinnedTimestamp;
}

SnapshotRecoverySource(StreamInput in) throws IOException {
Expand All @@ -304,6 +333,11 @@ public SnapshotRecoverySource(
remoteStoreIndexShallowCopy = false;
sourceRemoteStoreRepository = null;
}
if (in.getVersion().onOrAfter(V_2_17_0)) {
pinnedTimestamp = in.readLong();
} else {
pinnedTimestamp = 0L;
}
}

public String restoreUUID() {
Expand Down Expand Up @@ -340,6 +374,10 @@ public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public long getPinnedTimestamp() {
return pinnedTimestamp;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Expand All @@ -353,6 +391,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeBoolean(remoteStoreIndexShallowCopy);
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(V_2_17_0)) {
out.writeLong(pinnedTimestamp);
}

}

@Override
Expand Down
76 changes: 66 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
Expand Down Expand Up @@ -2479,6 +2480,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
openEngineAndRecoverFromTranslog(true);
}

public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
Expand All @@ -2499,7 +2504,16 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
loadGlobalCheckpointToReplicationTracker();
}

innerOpenEngineAndTranslog(replicationTracker);
if (isSnapshotV2Restore()) {
translogConfig.setPinnedTimestamp(((SnapshotRecoverySource) routingEntry().recoverySource()).getPinnedTimestamp());
}

innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);

if (isSnapshotV2Restore()) {
translogConfig.setPinnedTimestamp(-1);
}

getEngine().translogManager()
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}
Expand Down Expand Up @@ -2561,7 +2575,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
} else if (isSnapshotV2Restore() == false) {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
Expand Down Expand Up @@ -2607,6 +2621,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

private boolean isSnapshotV2Restore() {
return routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT
&& ((SnapshotRecoverySource) routingEntry().recoverySource()).getPinnedTimestamp() > 0;
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
Expand Down Expand Up @@ -2892,7 +2911,12 @@ public void restoreFromSnapshotAndRemoteStore(
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState().getRecoverySource();
if (recoverySource.getPinnedTimestamp() > -1) {
storeRecovery.recoverShallowSnapshotV2(this, repository, repositoriesService, listener, threadPool);
} else {
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -5000,16 +5024,33 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
syncTranslogFilesFromRemoteTranslog(
repository,
shardId,
indexSettings.getRemoteStorePathStrategy(),
indexSettings().isTranslogMetadataEnabled(),
-1
);
}

public void syncTranslogFilesFromRemoteTranslog(
Repository repository,
ShardId shardId,
RemoteStorePathStrategy remoteStorePathStrategy,
boolean isTranslogMetadataEnabled,
long timestamp
) throws IOException {
RemoteFsTranslog.download(
repository,
shardId,
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStorePathStrategy,
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings().isTranslogMetadataEnabled()
isTranslogMetadataEnabled,
timestamp
);
}

Expand Down Expand Up @@ -5098,15 +5139,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
* Downloads segments from given remote segment store for a specific commit.
* @param overrideLocal flag to override local segment files with those in remote store
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromGivenRemoteSegmentStore(
boolean overrideLocal,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
long primaryTerm,
long commitGeneration
RemoteSegmentMetadata remoteSegmentMetadata,
boolean pinnedTimestamp
) throws IOException {
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
Expand All @@ -5129,12 +5168,29 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
if (pinnedTimestamp) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else if (segmentsNFile != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
)
) {
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(segmentsNFile);
SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
if (remoteStore != null) {
Expand Down
82 changes: 79 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogHeader;
Expand All @@ -72,6 +75,7 @@
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -405,14 +409,14 @@ void recoverFromSnapshotAndRemoteStore(
shardId,
shallowCopyShardMetadata.getRemoteStorePathStrategy()
);
sourceRemoteDirectory.initializeToSpecificCommit(
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
commitGeneration,
recoverySource.snapshot().getSnapshotId().getUUID()
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, false);
final Store store = indexShard.store();
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
if (indexShard.indexSettings.isRemoteStoreEnabled() == false) {
bootstrap(indexShard, store);
} else {
bootstrapForSnapshot(indexShard, store);
Expand Down Expand Up @@ -441,6 +445,78 @@ void recoverFromSnapshotAndRemoteStore(
}
}

void recoverShallowSnapshotV2(
final IndexShard indexShard,
Repository repository,
RepositoriesService repositoriesService,
ActionListener<Boolean> listener,
ThreadPool threadPool
) {
try {
if (canRecover(indexShard)) {
indexShard.preRecovery();
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType;
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource();
indexShard.prepareForIndexRecovery();

assert recoverySource.getPinnedTimestamp() > -1;
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repository.getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
IndexId indexId = repositoryData.resolveIndexId(recoverySource.index().getName());
IndexMetadata prevIndexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
recoverySource.snapshot().getSnapshotId(),
indexId
);
RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,
threadPool
);
String remoteStoreRepository = ((SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource())
.sourceRemoteStoreRepository();
if (remoteStoreRepository == null) {
remoteStoreRepository = IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(prevIndexMetadata.getSettings());
}
RemoteStorePathStrategy remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(prevIndexMetadata);
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteStoreRepository,
prevIndexMetadata.getIndexUUID(),
shardId,
remoteStorePathStrategy
);
RemoteSegmentMetadata remoteSegmentMetadata = sourceRemoteDirectory.initializeToSpecificTimestamp(
recoverySource.getPinnedTimestamp()
);

indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, remoteSegmentMetadata, true);
indexShard.syncTranslogFilesFromRemoteTranslog(
repositoriesService.repository(remoteStoreRepository),
new ShardId(prevIndexMetadata.getIndex(), shardId.id()),
remoteStorePathStrategy,
RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata),
recoverySource.getPinnedTimestamp()
);

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
indexShard.openEngineAndRecoverFromTranslog(false);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) {
indexShard.waitForRemoteStoreSync();
}
indexShard.postRecovery("post recovery from remote_store");
listener.onResponse(true);
}, listener::onFailure);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

private boolean canRecover(IndexShard indexShard) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
return remoteSegmentMetadata;
}

public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) {
return null;
}

/**
* Initializes the remote segment metadata to a specific timestamp.
*
Expand Down
Loading

0 comments on commit c9d6deb

Please sign in to comment.