diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 5d58325d4cff0..a9ab09b059579 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -50,6 +50,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -327,6 +329,17 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. 
+ try { + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. no need to log assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 3db51082b7e9d..4fe0b36cfbac3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.mapper.SourceToParse; @@ -60,6 +61,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; public class PeerRecoveryTargetServiceTests extends 
IndexShardTestCase { @@ -285,4 +287,32 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception { recoveryTarget.decRef(); closeShards(shard); } + + /** Verifies that PeerRecoveryTargetService.getStartRecoveryRequest returns a request whose starting sequence number is UNASSIGNED_SEQ_NO and whose metadata snapshot is the Store.MetadataSnapshot.EMPTY instance when the shard's translog has been altered after the shard was closed. */ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception { + DiscoveryNode pNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + DiscoveryNode rNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + /* Start a shard via newStartedShard(false), fill it with random data and close it, so the steps below operate on a closed shard. */ IndexShard shard = newStartedShard(false); + final SeqNoStats seqNoStats = populateRandomData(shard); + shard.close("test", false); + /* Randomly take one of three branches: call associateIndexWithNewTranslog with a random UUID on the store, call Translog.createEmptyTranslog on the shard's translog path, or remove the translog path with IOUtils.rm. */ if (randomBoolean()) { + shard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID()); + } else if (randomBoolean()) { + Translog.createEmptyTranslog( + shard.shardPath().resolveTranslog(), seqNoStats.getGlobalCheckpoint(), shard.shardId(), shard.getOperationPrimaryTerm()); + } else { + IOUtils.rm(shard.shardPath().resolveTranslog()); + } + /* Re-initialise the shard with a peer recovery source and mark it as recovering before the request is built. */ shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); + shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); + shard.prepareForIndexRecovery(); + RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null); + /* The starting sequence number handed in is a random non-negative long; the assertions below expect the request to carry UNASSIGNED_SEQ_NO and the EMPTY snapshot instead. */ StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( + logger, rNode, recoveryTarget, randomNonNegativeLong()); + assertThat(request.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(request.metadataSnapshot(), sameInstance(Store.MetadataSnapshot.EMPTY)); + /* Release the recovery target reference and close the shard at the end of the test. */ recoveryTarget.decRef(); + closeShards(shard); + } }