Skip to content

Commit

Permalink
Make peer recovery work with archive data (#81522)
Browse files Browse the repository at this point in the history
Adapts peer recovery so that it properly integrates with the hook to convert old indices.

Relates #81210
  • Loading branch information
ywelsch authored Dec 14, 2021
1 parent 0410fbf commit 1e99bc6
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@

import static org.elasticsearch.common.lucene.Lucene.indexWriterConfigWithNoMerging;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;

/**
* This utility class encapsulates the logic to recover an index shard from either an existing index on
* disk or from a snapshot in a repository.
*/
final class StoreRecovery {
public final class StoreRecovery {

private final Logger logger;
private final ShardId shardId;
Expand Down Expand Up @@ -549,14 +550,17 @@ private void restore(
}
}

private void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
store.bootstrapNewHistory();
public static void bootstrap(final IndexShard indexShard, final Store store) throws IOException {
if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) == false) {
// not bootstrapping new history for searchable snapshots (which are read-only) allows sequence-number based peer recoveries
store.bootstrapNewHistory();
}
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
localCheckpoint,
shardId,
indexShard.shardId(),
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.StoreRecovery;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
Expand All @@ -65,6 +66,7 @@

import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;

/**
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
Expand Down Expand Up @@ -226,6 +228,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
if (isSearchableSnapshotStore(indexShard.indexSettings().getSettings())) {
// for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
final Store store = indexShard.store();
store.incRef();
try {
StoreRecovery.bootstrap(indexShard, store);
} finally {
store.decRef();
}
}
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2674,30 +2674,23 @@ public void snapshotShard(SnapshotShardContext context) {
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {

if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
indexCommitPointFiles = Collections.emptyList();
} else if (filesFromSegmentInfos == null) {
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
if (isSearchableSnapshotStore(store.indexSettings().getSettings())) {
fileNames = Collections.emptyList();
metadataFromStore = Store.MetadataSnapshot.EMPTY;
} else {
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
// TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]",
shardId,
snapshotId,
snapshotIndexCommit
);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
// TODO apparently we don't use MetadataSnapshot#recoveryDiff(...) here but we should
try {
logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadataFromStore = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
}
for (String fileName : fileNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.xpack.lucene.bwc.codecs.lucene70.BWCLucene70Codec;

import java.io.IOException;
Expand Down Expand Up @@ -169,6 +170,10 @@ public void write(Directory directory, SegmentInfo segmentInfo, String segmentSu
private static FieldInfos filterFields(FieldInfos fieldInfos) {
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
for (FieldInfo fieldInfo : fieldInfos) {
// omit sequence number field so that it doesn't interfere with peer recovery
if (fieldInfo.name.equals(SeqNoFieldMapper.NAME)) {
continue;
}
fieldInfoCopy.add(
new FieldInfo(
fieldInfo.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand All @@ -18,20 +17,15 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;
import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;

import java.nio.file.Path;

import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.isSearchableSnapshotStore;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
Expand Down Expand Up @@ -65,7 +59,6 @@ public SearchableSnapshotIndexEventListener(
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
// Prepares a shard before its recovery starts; the indexSettings argument is not used in this body.
// The hook is only expected to be invoked on a thread whose name contains the generic thread pool name.
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
// Step 1: make sure the snapshot backing this shard is loaded.
ensureSnapshotIsLoaded(indexShard);
// Step 2: give the shard an empty translog (the helper itself skips primaries that recover from a snapshot).
associateNewEmptyTranslogWithIndex(indexShard);
}

private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
Expand Down Expand Up @@ -93,26 +86,6 @@ private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
: "loading snapshot must not be called twice unless we are retrying a peer recovery";
}

/**
 * Creates an empty translog for a searchable snapshot shard, using the local checkpoint and translog UUID stored in the
 * user data of the shard's last committed segments info.
 * <p>
 * Nothing is done when the shard is a primary whose recovery source type is {@code SNAPSHOT}.
 *
 * @param indexShard the shard to create the translog for; asserted to be backed by a searchable snapshot store
 * @throws TranslogException wrapping any exception raised while reading the commit data or creating the translog
 */
private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;
if (indexShard.routingEntry().primary()
&& indexShard.routingEntry().recoverySource().getType().equals(RecoverySource.Type.SNAPSHOT)) {
// translog initialization is done later in the restore step
return;
}
try {
// the last commit's user data supplies both the local checkpoint and the translog UUID used below
final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
// a missing or non-numeric checkpoint entry makes parseLong throw, which the catch below wraps
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long primaryTerm = indexShard.getPendingPrimaryTerm();
// reuse the UUID already recorded in the commit rather than generating a new one
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
final Path translogLocation = indexShard.shardPath().resolveTranslog();
// NOTE(review): the meaning of the trailing null argument is not visible here - confirm against Translog.createEmptyTranslog
Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null);
} catch (Exception e) {
// every failure, including runtime exceptions, is reported as a translog failure for this shard
throw new TranslogException(shardId, "failed to associate a new translog", e);
}
}

@Override
public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) {
if (shouldEvictCacheFiles(reason)) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/qa/repository-old-versions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) {

def testClusterProvider = testClusters.register(clusterName) {
testDistribution = 'DEFAULT'
numberOfNodes = 2

setting 'path.repo', repoLocation
setting 'xpack.license.self_generated.type', 'trial'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.HttpHost;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
Expand All @@ -26,6 +27,8 @@
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.searchable_snapshots.MountSnapshotRequest;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -211,6 +214,16 @@ private void restoreMountAndVerify(int numDocs, Set<String> expectedIds, RestHig
assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().totalShards());
assertEquals(numberOfShards, restoreSnapshotResponse.getRestoreInfo().successfulShards());

assertEquals(
ClusterHealthStatus.GREEN,
client.cluster()
.health(
new ClusterHealthRequest("restored_test").waitForGreenStatus().waitForNoRelocatingShards(true),
RequestOptions.DEFAULT
)
.getStatus()
);

// run a search against the index
assertDocs("restored_test", numDocs, expectedIds, client);

Expand All @@ -219,13 +232,24 @@ private void restoreMountAndVerify(int numDocs, Set<String> expectedIds, RestHig
.mountSnapshot(
new MountSnapshotRequest("testrepo", "snap1", "test").storage(MountSnapshotRequest.Storage.FULL_COPY)
.renamedIndex("mounted_full_copy_test")
.indexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build())
.waitForCompletion(true),
RequestOptions.DEFAULT
);
assertNotNull(mountSnapshotResponse.getRestoreInfo());
assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().totalShards());
assertEquals(numberOfShards, mountSnapshotResponse.getRestoreInfo().successfulShards());

assertEquals(
ClusterHealthStatus.GREEN,
client.cluster()
.health(
new ClusterHealthRequest("mounted_full_copy_test").waitForGreenStatus().waitForNoRelocatingShards(true),
RequestOptions.DEFAULT
)
.getStatus()
);

// run a search against the index
assertDocs("mounted_full_copy_test", numDocs, expectedIds, client);

Expand Down

0 comments on commit 1e99bc6

Please sign in to comment.