Skip to content

Commit

Permalink
Ignore Lucene index in peer recovery if translog corrupted (#49114)
Browse files Browse the repository at this point in the history
If the translog on a replica is corrupt, we should not perform an 
operation-based recovery or utilize sync_id as we won't be able to open
an engine in the next step. This change adds an extra validation that
ensures translog is okay when preparing a peer recovery request.
  • Loading branch information
dnhatn committed Nov 24, 2019
1 parent 224a5dc commit 7a50aff
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -327,6 +329,17 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov
Store.MetadataSnapshot metadataSnapshot;
try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
"resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log
assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.mapper.SourceToParse;
Expand All @@ -60,6 +61,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {

Expand Down Expand Up @@ -285,4 +287,32 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception {
recoveryTarget.decRef();
closeShards(shard);
}

public void testResetStartRequestIfTranslogIsCorrupted() throws Exception {
DiscoveryNode pNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
DiscoveryNode rNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
IndexShard shard = newStartedShard(false);
final SeqNoStats seqNoStats = populateRandomData(shard);
shard.close("test", false);
if (randomBoolean()) {
shard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID());
} else if (randomBoolean()) {
Translog.createEmptyTranslog(
shard.shardPath().resolveTranslog(), seqNoStats.getGlobalCheckpoint(), shard.shardId(), shard.getOperationPrimaryTerm());
} else {
IOUtils.rm(shard.shardPath().resolveTranslog());
}
shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode));
shard.prepareForIndexRecovery();
RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null);
StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
logger, rNode, recoveryTarget, randomNonNegativeLong());
assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(request.metadataSnapshot(), sameInstance(Store.MetadataSnapshot.EMPTY));
recoveryTarget.decRef();
closeShards(shard);
}
}

0 comments on commit 7a50aff

Please sign in to comment.