From 8fe0cda92740f73866349080873d9eba9b31cd31 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 23 Jul 2019 12:47:19 -0400
Subject: [PATCH] Use global checkpoint as starting seq in ops-based recovery
 (#43463)

Today we use the local checkpoint of the safe commit on replicas as the
starting sequence number of operation-based peer recovery. While this is a
good choice due to its simplicity, we need to share this information between
copies if we use retention leases in peer recovery. We can avoid this extra
work if we use the global checkpoint as the starting sequence number. With
this change, we will try to recover a replica locally up to the global
checkpoint before performing peer recovery. This commit should also increase
the chance of operation-based recovery.
---
 .../elasticsearch/index/shard/IndexShard.java | 123 +++++++++++--
 .../org/elasticsearch/index/store/Store.java  |  17 ++
 .../recovery/PeerRecoveryTargetService.java   |  89 ++-------
 .../indices/recovery/RecoveryState.java       |  29 ++-
 .../indices/recovery/RecoveryTarget.java      |   5 +-
 .../RecoveryDuringReplicationTests.java       |  36 ++--
 .../indices/recovery/IndexRecoveryIT.java     |  61 ++++++
 .../PeerRecoveryTargetServiceTests.java       | 173 ++++++++++--------
 .../index/shard/IndexShardTestCase.java       |  15 +-
 9 files changed, 343 insertions(+), 205 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index ba9f05cb3ffca..18db78789c914 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -159,6 +159,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -1359,6 +1360,81 @@ public void prepareForIndexRecovery() {
         assert currentEngineReference.get() == null;
     }
 
+    /**
+     * A best effort to bring up this shard to the global checkpoint using the local translog before performing a peer recovery.
+     *
+     * @return a sequence number that an operation-based peer recovery can start with.
+     * This is the first operation after the local checkpoint of the safe commit if one exists.
+     */
+    public long recoverLocallyUpToGlobalCheckpoint() {
+        if (state != IndexShardState.RECOVERING) {
+            throw new IndexShardNotRecoveringException(shardId, state);
+        }
+        assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
+        assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
+        final Optional<SequenceNumbers.CommitInfo> safeCommit;
+        final long globalCheckpoint;
+        try {
+            final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+            globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+            safeCommit = store.findSafeIndexCommit(globalCheckpoint);
+        } catch (org.apache.lucene.index.IndexNotFoundException e) {
+            logger.trace("skip local recovery as no index commit found");
+            return UNASSIGNED_SEQ_NO;
+        } catch (Exception e) {
+            logger.debug("skip local recovery as failed to find the safe commit", e);
+            return UNASSIGNED_SEQ_NO;
+        }
+        if (safeCommit.isPresent() == false) {
+            logger.trace("skip local recovery as no safe commit found");
+            return UNASSIGNED_SEQ_NO;
+        }
+        assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
+        try {
+            maybeCheckIndex(); // check the index here; we won't do it again if an ops-based recovery occurs
+            recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+            if (safeCommit.get().localCheckpoint == globalCheckpoint) {
+                logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
+                    safeCommit.get(), globalCheckpoint);
+                recoveryState.getTranslog().totalLocal(0);
+                return globalCheckpoint + 1;
+            }
+            try {
+                final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
+                    recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
+                    final int recoveredOps = runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
+                        recoveryState.getTranslog()::incrementRecoveredOperations);
+                    recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
+                    return recoveredOps;
+                };
+                innerOpenEngineAndTranslog();
+                getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
+                logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
+            } finally {
+                synchronized (mutex) {
+                    IOUtils.close(currentEngineReference.getAndSet(null));
+                }
+            }
+        } catch (Exception e) {
+            logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
+            return UNASSIGNED_SEQ_NO;
+        }
+        try {
+            // we need to find the safe commit again as we should have created a new one during the local recovery
+            final Optional<SequenceNumbers.CommitInfo> newSafeCommit = store.findSafeIndexCommit(globalCheckpoint);
+            assert newSafeCommit.isPresent() : "no safe commit found after local recovery";
+            return newSafeCommit.get().localCheckpoint + 1;
+        } catch (Exception e) {
+            if (Assertions.ENABLED) {
+                throw new AssertionError(
+                    "failed to find the safe commit after recovering shard locally up to global checkpoint " + globalCheckpoint, e);
+            }
+            logger.debug(new ParameterizedMessage(
+                "failed to find the safe commit after recovering shard locally up to global checkpoint {}", globalCheckpoint), e);
+            return UNASSIGNED_SEQ_NO;
+        }
+    }
+
     public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -1462,6 +1538,9 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat * Operations from the translog will be replayed to bring lucene up to date. **/ public void openEngineAndRecoverFromTranslog() throws IOException { + assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]"; + maybeCheckIndex(); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); @@ -1478,6 +1557,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException { * The translog is kept but its operations won't be replayed. */ public void openEngineAndSkipTranslogRecovery() throws IOException { + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]"; innerOpenEngineAndTranslog(); getEngine().skipTranslogRecovery(); } @@ -1486,17 +1567,6 @@ private void innerOpenEngineAndTranslog() throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } - recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); - // also check here, before we apply the translog - if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) { - try { - checkIndex(); - } catch (IOException ex) { - throw new RecoveryFailedException(recoveryState, "check index failed", ex); - } - } - recoveryState.setStage(RecoveryState.Stage.TRANSLOG); - final EngineConfig config = newEngineConfig(); // we disable deletes since we allow for operations to be executed against the shard while recovering @@ -1552,14 +1622,22 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { synchronized (mutex) { - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - final Engine engine = this.currentEngineReference.getAndSet(null); - IOUtils.close(engine); - recoveryState().setStage(RecoveryState.Stage.INIT); + IOUtils.close(currentEngineReference.getAndSet(null)); + resetRecoveryStage(); + } + } + + /** + * If a file-based recovery occurs, a recovery target calls this method to reset the recovery stage. 
+ */ + public void resetRecoveryStage() { + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + assert currentEngineReference.get() == null; + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); } + recoveryState().setStage(RecoveryState.Stage.INIT); } /** @@ -2296,6 +2374,17 @@ public void noopUpdate(String type) { internalIndexingStats.noopUpdate(type); } + public void maybeCheckIndex() { + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) { + try { + checkIndex(); + } catch (IOException ex) { + throw new RecoveryFailedException(recoveryState, "check index failed", ex); + } + } + } + void checkIndex() throws IOException { if (store.tryIncRef()) { try { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index e9430ea7aa098..62f0cdb2c1271 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -97,6 +97,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1583,6 +1584,22 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long } } + /** + * Returns a {@link org.elasticsearch.index.seqno.SequenceNumbers.CommitInfo} of the safe commit if exists. + */ + public Optional findSafeIndexCommit(long globalCheckpoint) throws IOException { + final List commits = DirectoryReader.listCommits(directory); + assert commits.isEmpty() == false : "no commit found"; + final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint); + final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()); + // all operations of the safe commit must be at most the global checkpoint. 
+ if (commitInfo.maxSeqNo <= globalCheckpoint) { + return Optional.of(commitInfo); + } else { + return Optional.empty(); + } + } + private static void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { final Map userData = getUserData(writer); userData.putAll(keysToUpdate); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8a11cdf5ec961..8ef1ec1ae7249 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -22,8 +22,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; @@ -44,18 +42,14 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.engine.CombinedDeletionPolicy; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogCorruptedException; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -68,12 +62,11 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.List; -import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** * The recovery target handles recoveries of peer shards of the shard+node to recover to. 
@@ -178,9 +171,12 @@ private void doRecovery(final long recoveryId) { cancellableThreads = recoveryTarget.cancellableThreads(); try { assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; - request = getStartRecoveryRequest(recoveryTarget); logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); recoveryTarget.indexShard().prepareForIndexRecovery(); + final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : + "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; + request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); } catch (final Exception e) { // this will be logged as warning later on... logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); @@ -319,7 +315,7 @@ public RecoveryResponse read(StreamInput in) throws IOException { * @param recoveryTarget the target of the recovery * @return a snapshot of the store metadata */ - private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { + private static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) { try { return recoveryTarget.indexShard().snapshotStoreMetadata(); } catch (final org.apache.lucene.index.IndexNotFoundException e) { @@ -335,38 +331,25 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec /** * Prepare the start recovery request. * + * @param logger the logger + * @param localNode the local node of the recovery target * @param recoveryTarget the target of the recovery + * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. + * This is the first operation after the local checkpoint of the safe commit if exists. 
* @return a start recovery request */ - private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { + public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode, + RecoveryTarget recoveryTarget, long startingSeqNo) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); + final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget); logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); - - final long startingSeqNo; - if (metadataSnapshot.size() > 0) { - startingSeqNo = getStartingSeqNo(logger, recoveryTarget); - } else { - startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; - } - - if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { - logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); - } else { - logger.trace( - "{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]", - recoveryTarget.shardId(), - startingSeqNo, - recoveryTarget.sourceNode()); - } - request = new StartRecoveryRequest( recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), - clusterService.localNode(), + localNode, metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId(), @@ -374,50 +357,6 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove return request; } - /** - * Get the starting sequence number for a sequence-number-based request. - * - * @param recoveryTarget the target of the recovery - * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number - * failed - */ - public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) { - try { - final Store store = recoveryTarget.store(); - final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID); - final List existingCommits = DirectoryReader.listCommits(store.directory()); - final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); - final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit); - if (logger.isTraceEnabled()) { - final StringJoiner descriptionOfExistingCommits = new StringJoiner(","); - for (IndexCommit commit : existingCommits) { - descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit)); - } - logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]", - globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits); - } - if (seqNoStats.maxSeqNo <= globalCheckpoint) { - assert seqNoStats.localCheckpoint <= globalCheckpoint; - /* - * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global - * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation - * after the local checkpoint stored in the commit. 
- */ - return seqNoStats.localCheckpoint + 1; - } else { - return SequenceNumbers.UNASSIGNED_SEQ_NO; - } - } catch (final TranslogCorruptedException | IOException e) { - /* - * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the - * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and - * proceeds to attempt a sequence-number-based recovery. - */ - return SequenceNumbers.UNASSIGNED_SEQ_NO; - } - } - public interface RecoveryListener { void onRecoveryDone(RecoveryState state); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index c3d9c7ca630a6..7aa6dd42989a0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -461,6 +463,7 @@ public static class Translog extends Timer implements ToXContentFragment, Writea private int recovered; private int total = UNKNOWN; private int totalOnStart = UNKNOWN; + private int totalLocal = UNKNOWN; public Translog() { } @@ -470,6 +473,9 @@ public Translog(StreamInput in) throws IOException { recovered = in.readVInt(); total = in.readVInt(); totalOnStart = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + totalLocal = in.readVInt(); + } } @Override @@ -478,6 +484,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(recovered); out.writeVInt(total); out.writeVInt(totalOnStart); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeVInt(totalLocal); + } } public synchronized void reset() { @@ -485,6 +494,7 @@ public synchronized void reset() { recovered = 0; total = UNKNOWN; totalOnStart = UNKNOWN; + totalLocal = UNKNOWN; } public synchronized void incrementRecoveredOperations() { @@ -526,8 +536,8 @@ public synchronized int totalOperations() { } public synchronized void totalOperations(int total) { - this.total = total; - assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + + this.total = totalLocal == UNKNOWN ? total : totalLocal + total; + assert total == UNKNOWN || this.total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; } @@ -542,7 +552,20 @@ public synchronized int totalOperationsOnStart() { } public synchronized void totalOperationsOnStart(int total) { - this.totalOnStart = total; + this.totalOnStart = totalLocal == UNKNOWN ? 
total : totalLocal + total; + } + + /** + * Sets the total number of translog operations to be recovered locally before performing peer recovery + * @see IndexShard#recoverLocallyUpToGlobalCheckpoint() + */ + public synchronized void totalLocal(int totalLocal) { + assert totalLocal >= recovered : totalLocal + " < " + recovered; + this.totalLocal = totalLocal; + } + + public synchronized int totalLocal() { + return totalLocal; } public synchronized float recoveredPercent() { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 9be57296cdf04..e2eb67399c2d5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -381,6 +381,8 @@ public void receiveFileInfo(List phase1FileNames, int totalTranslogOps, ActionListener listener) { ActionListener.completeWith(listener, () -> { + indexShard.resetRecoveryStage(); + indexShard.prepareForIndexRecovery(); final RecoveryState.Index index = state().getIndex(); for (int i = 0; i < phase1ExistingFileNames.size(); i++) { index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true); @@ -421,7 +423,8 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada } else { assert indexShard.assertRetentionLeasesPersisted(); } - + indexShard.maybeCheckIndex(); + state().setStage(RecoveryState.Stage.TRANSLOG); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index c60f32132c646..9c6340459f5f0 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -66,6 +66,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -114,31 +115,26 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { int docs = shards.indexDocs(randomInt(50)); shards.flush(); final IndexShard originalReplica = shards.getReplicas().get(0); - long replicaCommittedLocalCheckpoint = docs - 1; - boolean replicaHasDocsSinceLastFlushedCheckpoint = false; for (int i = 0; i < randomInt(2); i++) { final int indexedDocs = shards.indexDocs(randomInt(5)); docs += indexedDocs; - if (indexedDocs > 0) { - replicaHasDocsSinceLastFlushedCheckpoint = true; - } final boolean flush = randomBoolean(); if (flush) { originalReplica.flush(new FlushRequest()); - replicaHasDocsSinceLastFlushedCheckpoint = false; - replicaCommittedLocalCheckpoint = docs - 1; } } // simulate a background global checkpoint sync at which point we expect the global checkpoint to advance on the replicas shards.syncGlobalCheckpoint(); - + long globalCheckpointOnReplica = originalReplica.getLastSyncedGlobalCheckpoint(); + Optional safeCommitOnReplica = + originalReplica.store().findSafeIndexCommit(globalCheckpointOnReplica); + 
assertTrue(safeCommitOnReplica.isPresent()); shards.removeReplica(originalReplica); final int missingOnReplica = shards.indexDocs(randomInt(5)); docs += missingOnReplica; - replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0; final boolean translogTrimmed; if (randomBoolean()) { @@ -157,14 +153,15 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { final IndexShard recoveredReplica = shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId()); shards.recoverReplica(recoveredReplica); - if (translogTrimmed && replicaHasDocsSinceLastFlushedCheckpoint) { + if (translogTrimmed && missingOnReplica > 0) { // replica has something to catch up with, but since we trimmed the primary translog, we should fall back to full recovery assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty())); } else { assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty()); - assertThat( - recoveredReplica.recoveryState().getTranslog().recoveredOperations(), - equalTo(Math.toIntExact(docs - (replicaCommittedLocalCheckpoint + 1)))); + assertThat(recoveredReplica.recoveryState().getTranslog().recoveredOperations(), + equalTo(Math.toIntExact(docs - 1 - safeCommitOnReplica.get().localCheckpoint))); + assertThat(recoveredReplica.recoveryState().getTranslog().totalLocal(), + equalTo(Math.toIntExact(globalCheckpointOnReplica - safeCommitOnReplica.get().localCheckpoint))); } docs += shards.indexDocs(randomInt(5)); @@ -231,10 +228,9 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { try (ReplicationGroup shards = createGroup(2)) { shards.startAll(); int totalDocs = shards.indexDocs(randomInt(10)); - int committedDocs = 0; + shards.syncGlobalCheckpoint(); if (randomBoolean()) { shards.flush(); - committedDocs = totalDocs; } final IndexShard oldPrimary = shards.getPrimary(); @@ -254,7 +250,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { oldPrimary.flush(new FlushRequest(index.getName())); } } - + long globalCheckpointOnOldPrimary = oldPrimary.getLastSyncedGlobalCheckpoint(); + Optional safeCommitOnOldPrimary = + oldPrimary.store().findSafeIndexCommit(globalCheckpointOnOldPrimary); + assertTrue(safeCommitOnOldPrimary.isPresent()); shards.promoteReplicaToPrimary(newPrimary).get(); // check that local checkpoint of new primary is properly tracked after primary promotion @@ -310,7 +309,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { if (expectSeqNoRecovery) { assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty()); - assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); + assertThat(newReplica.recoveryState().getTranslog().totalLocal(), + equalTo(Math.toIntExact(globalCheckpointOnOldPrimary - safeCommitOnOldPrimary.get().localCheckpoint))); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), + equalTo(Math.toIntExact(totalDocs - 1 - safeCommitOnOldPrimary.get().localCheckpoint))); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ef14de16854ef..2f9b67311d52b 100644 --- 
a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -52,9 +53,11 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.analysis.AbstractTokenFilterFactory; import org.elasticsearch.index.analysis.TokenFilterFactory; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; @@ -99,6 +102,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; @@ -997,6 +1001,63 @@ public void testRecoveryFlushReplica() throws Exception { assertThat(syncIds, hasSize(1)); } + public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + List nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false) + .map(node -> node.value.getName()).collect(Collectors.toSet())); + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + // disable global checkpoint background sync so we can verify the start recovery request + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "12h") + .put("index.routing.allocation.include._name", String.join(",", nodes)) + .build()); + ensureGreen(indexName); + int numDocs = randomIntBetween(0, 100); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + client().admin().indices().prepareRefresh(indexName).get(); // avoid refresh when we are failing a shard + String failingNode = randomFrom(nodes); + PlainActionFuture startRecoveryRequestFuture = new PlainActionFuture<>(); + for (String node : nodes) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) { + startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request); + } + connection.sendRequest(requestId, action, request, options); + }); + } + IndexShard shard = internalCluster().getInstance(IndicesService.class, failingNode) + .getShardOrNull(new ShardId(resolveIndex(indexName), 0)); + final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint(); + final long localCheckpointOfSafeCommit; + try(Engine.IndexCommitRef 
safeCommitRef = shard.acquireSafeIndexCommit()){ + localCheckpointOfSafeCommit = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommitRef.getIndexCommit().getUserData().entrySet()).localCheckpoint; + } + final long maxSeqNo = shard.seqNoStats().getMaxSeqNo(); + shard.failShard("test", new IOException("simulated")); + StartRecoveryRequest startRecoveryRequest = startRecoveryRequestFuture.actionGet(); + SequenceNumbers.CommitInfo commitInfoAfterLocalRecovery = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( + startRecoveryRequest.metadataSnapshot().getCommitUserData().entrySet()); + assertThat(commitInfoAfterLocalRecovery.localCheckpoint, equalTo(lastSyncedGlobalCheckpoint)); + assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint)); + assertThat(startRecoveryRequest.startingSeqNo(), equalTo(lastSyncedGlobalCheckpoint + 1)); + ensureGreen(indexName); + for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) { + if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) { + assertThat("total recovered translog operations must include both local and remote recovery", + recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit))); + } + } + for (String node : nodes) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.clearAllRules(); + } + } + public static final class TestAnalysisPlugin extends Plugin implements AnalysisPlugin { final AtomicBoolean throwParsingError = new AtomicBoolean(); @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 1154ce99078b0..e3d299067910f 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -19,21 +19,22 @@ package org.elasticsearch.indices.recovery; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -41,93 +42,23 @@ import org.elasticsearch.index.store.StoreFileMetaData; import 
org.elasticsearch.index.translog.Translog; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CyclicBarrier; import java.util.stream.Collectors; +import java.util.stream.LongStream; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { - public void testGetStartingSeqNo() throws Exception { - final IndexShard replica = newShard(false); - try { - // Empty store - { - recoveryEmptyReplica(replica, true); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L)); - recoveryTarget.decRef(); - } - // Last commit is good - use it. - final long initDocs = scaledRandomIntBetween(1, 10); - { - for (int i = 0; i < initDocs; i++) { - indexDoc(replica, "_doc", Integer.toString(i)); - if (randomBoolean()) { - flushShard(replica); - } - } - flushShard(replica); - replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); - replica.sync(); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); - recoveryTarget.decRef(); - } - // Global checkpoint does not advance, last commit is not good - use the previous commit - final int moreDocs = randomIntBetween(1, 10); - { - for (int i = 0; i < moreDocs; i++) { - indexDoc(replica, "_doc", Long.toString(i)); - if (randomBoolean()) { - flushShard(replica); - } - } - flushShard(replica); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); - recoveryTarget.decRef(); - } - // Advances the global checkpoint, a safe commit also advances - { - replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); - replica.sync(); - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); - recoveryTarget.decRef(); - } - // Different translogUUID, fallback to file-based - { - replica.close("test", false); - final List commits = DirectoryReader.listCommits(replica.store().directory()); - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setCommitOnClose(false) - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(IndexWriterConfig.OpenMode.APPEND); - try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) { - final Map userData = new HashMap<>(commits.get(commits.size() - 1).getUserData()); - userData.put(Translog.TRANSLOG_UUID_KEY, UUIDs.randomBase64UUID()); - writer.setLiveCommitData(userData.entrySet()); - writer.commit(); - } - final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - recoveryTarget.decRef(); - } - } finally { - closeShards(replica); - } - } - public void testWriteFileChunksConcurrently() 
throws Exception { IndexShard sourceShard = newStartedShard(true); int numDocs = between(20, 100); @@ -202,4 +133,86 @@ public void testWriteFileChunksConcurrently() throws Exception { assertThat(diff.different, empty()); closeShards(sourceShard, targetShard); } + + public void testPrepareIndexForPeerRecovery() throws Exception { + CheckedFunction populateData = shard -> { + List seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList()); + Randomness.shuffle(seqNos); + for (long seqNo : seqNos) { + shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse( + shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON)); + if (randomInt(100) < 5) { + shard.flush(new FlushRequest().waitIfOngoing(true)); + } + } + shard.sync(); + long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint()); + shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + shard.sync(); + return globalCheckpoint; + }; + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + // empty copy + IndexShard shard = newShard(false); + shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode)); + shard.prepareForIndexRecovery(); + assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); + assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); + closeShards(shard); + + // good copy + shard = newStartedShard(false); + long globalCheckpoint = populateData.apply(shard); + Optional safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint); + assertTrue(safeCommit.isPresent()); + IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), + RecoverySource.PeerRecoverySource.INSTANCE)); + replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + replica.prepareForIndexRecovery(); + assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1)); + assertThat(replica.recoveryState().getTranslog().totalLocal(), + equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint))); + assertThat(replica.recoveryState().getTranslog().recoveredOperations(), + equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint))); + closeShards(replica); + + // corrupted copy + shard = newStartedShard(false); + if (randomBoolean()) { + populateData.apply(shard); + } + shard.store().markStoreCorrupted(new IOException("test")); + replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), + RecoverySource.PeerRecoverySource.INSTANCE)); + replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + replica.prepareForIndexRecovery(); + assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); + assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); + closeShards(replica); + + // copy with truncated translog + shard = newStartedShard(false); + globalCheckpoint = populateData.apply(shard); + replica = 
reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), + RecoverySource.PeerRecoverySource.INSTANCE)); + String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint, + replica.shardId(), replica.getPendingPrimaryTerm()); + replica.store().associateIndexWithNewTranslog(translogUUID); + safeCommit = replica.store().findSafeIndexCommit(globalCheckpoint); + replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); + replica.prepareForIndexRecovery(); + if (safeCommit.isPresent()) { + assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1)); + assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0)); + } else { + assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); + } + assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); + closeShards(replica); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 2c6dff473caba..e0e2aee60e61f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -627,18 +627,9 @@ protected final void recoverUnstartedReplica(final IndexShard replica, } replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); - final String targetAllocationId = recoveryTarget.indexShard().routingEntry().allocationId().getId(); - - final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica); - final long startingSeqNo; - if (snapshot.size() > 0) { - startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget); - } else { - startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; - } - - final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, - pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); + final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( + logger, rNode, recoveryTarget, startingSeqNo); final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
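The core decision this change implements can be summarized in a small standalone Java sketch. It is not Elasticsearch code and not part of the patch: CommitInfo, maxSeqNo, localCheckpoint and the UNASSIGNED_SEQ_NO sentinel mirror names used in the patch, while the class name, method names and hard-coded values below are illustrative assumptions only. A commit is "safe" for operation-based recovery when every operation it contains is at or below the global checkpoint; after replaying the local translog up to the global checkpoint, the peer recovery only needs to copy operations after it, otherwise the recovery falls back to file-based.

import java.util.Optional;

public class StartingSeqNoSketch {

    // Sentinel meaning "no usable starting point, fall back to file-based recovery";
    // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO in the patch.
    static final long UNASSIGNED_SEQ_NO = -2L;

    /** Sequence-number stats of an index commit, as stored in the commit user data. */
    record CommitInfo(long maxSeqNo, long localCheckpoint) {}

    /**
     * A commit is safe for ops-based recovery only if all of its operations are at or
     * below the global checkpoint, i.e. maxSeqNo <= globalCheckpoint.
     */
    static Optional<CommitInfo> findSafeCommit(CommitInfo commit, long globalCheckpoint) {
        return commit.maxSeqNo() <= globalCheckpoint ? Optional.of(commit) : Optional.empty();
    }

    /**
     * Assuming the local translog has been replayed up to the global checkpoint (not shown),
     * returns the first sequence number that peer recovery still has to copy from the primary.
     */
    static long startingSeqNo(CommitInfo lastCommit, long globalCheckpoint) {
        Optional<CommitInfo> safeCommit = findSafeCommit(lastCommit, globalCheckpoint);
        if (safeCommit.isEmpty()) {
            return UNASSIGNED_SEQ_NO; // no safe commit -> file-based recovery
        }
        // after local recovery the replica holds everything up to the global checkpoint,
        // so peer recovery starts with the first operation after it
        return globalCheckpoint + 1;
    }

    public static void main(String[] args) {
        // safe commit: max seq no 90 is below the global checkpoint 95 -> start at 96
        System.out.println(startingSeqNo(new CommitInfo(90, 85), 95));
        // unsafe commit: it contains operation 120 which may not exist on the new primary -> -2
        System.out.println(startingSeqNo(new CommitInfo(120, 85), 95));
    }
}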