Skip to content

Commit

Permalink
Fix recovery path for searchable snapshots
Browse files Browse the repository at this point in the history
Replicas are bootstrapped using the recovery path, as opposed to the
restore path used for creating the primary shard. This has been broken
in the initial implementation of searchable snapshots. The fix here is
to put in the appropriate checks to avoid failing during recovery.

I've also updated the integration test to ensure the recovery path is
always exercised during testing.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Oct 17, 2022
1 parent e44158d commit 8c49389
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,24 @@ protected boolean addMockInternalEngine() {
}

public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
createIndex(
"test-idx-2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -244,16 +245,18 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings());
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false;
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
recoveryTarget,
startingSeqNo,
!remoteTranslogEnabled
verifyTranslog
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.IndexModule;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.seqno.ReplicationTracker;
Expand Down Expand Up @@ -355,10 +356,12 @@ public void cleanFiles(
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata);

// If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index.
// With Segrep, replicas should never create their own commit points. This ensures the index and xlog share the same
// UUID without the extra step to associate the index with a new xlog.
if (indexShard.indexSettings().isSegRepEnabled()) {
// Replicas for segment replication or remote snapshot indices do not create
// their own commit points and therefore do not modify the commit user data
// in their store. In these cases, reuse the primary's translog UUID.
final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled()
|| IndexModule.Type.REMOTE_SNAPSHOT.match(indexShard.indexSettings());
if (reuseTranslogUUID) {
final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
Expand Down

0 comments on commit 8c49389

Please sign in to comment.