diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
index d4cc38f0b959c..ca8a24ea93d0b 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -301,6 +301,19 @@ public final class IndexSettings {
     public static final Setting<Boolean> INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false,
         Property.IndexScope, Property.PrivateIndex, Property.Dynamic);
 
+    /**
+     * Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
+     * operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
+     * documents) on the grounds that a file-based peer recovery may copy all of the documents in the shard over to the new peer, but is
+     * significantly faster than replaying the missing operations on the peer, so once a peer falls far enough behind the primary it makes
+     * more sense to copy all the data over again instead of replaying history.
+     *
+     * Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is
+     * intentionally unregistered.
+     */
+    public static final Setting<Double> FILE_BASED_RECOVERY_THRESHOLD_SETTING
+        = Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope);
+
     private final Index index;
     private final Version version;
     private final Logger logger;
diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index 313598e1d8ec7..8166a0d37d429 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -23,6 +23,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
@@ -43,7 +44,7 @@
  * In particular, this policy will delete index commits whose max sequence number is at most
  * the current global checkpoint except the index commit which has the highest max sequence number among those.
  */
-public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
+public class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final Logger logger;
     private final TranslogDeletionPolicy translogDeletionPolicy;
     private final SoftDeletesPolicy softDeletesPolicy;
@@ -51,6 +52,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
     private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
     private volatile IndexCommit lastCommit; // the most recent commit point
+    private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;
 
     CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, SoftDeletesPolicy softDeletesPolicy,
                            LongSupplier globalCheckpointSupplier) {
@@ -62,7 +64,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     }
 
     @Override
-    public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
+    public void onInit(List<? extends IndexCommit> commits) throws IOException {
         assert commits.isEmpty() == false : "index is opened, but we have no commits";
         onCommit(commits);
         if (safeCommit != commits.get(commits.size() - 1)) {
@@ -74,16 +76,32 @@ public synchronized void onInit(List<? extends IndexCommit> commits) throws IOEx
     }
 
     @Override
-    public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
-        final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
-        lastCommit = commits.get(commits.size() - 1);
-        safeCommit = commits.get(keptPosition);
-        for (int i = 0; i < keptPosition; i++) {
-            if (snapshottedCommits.containsKey(commits.get(i)) == false) {
-                deleteCommit(commits.get(i));
+    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
+        final IndexCommit safeCommit;
+        synchronized (this) {
+            final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
+            this.safeCommitInfo = SafeCommitInfo.EMPTY;
+            this.lastCommit = commits.get(commits.size() - 1);
+            this.safeCommit = commits.get(keptPosition);
+            for (int i = 0; i < keptPosition; i++) {
+                if (snapshottedCommits.containsKey(commits.get(i)) == false) {
+                    deleteCommit(commits.get(i));
+                }
             }
+            updateRetentionPolicy();
+            safeCommit = this.safeCommit;
         }
-        updateRetentionPolicy();
+
+        assert Thread.holdsLock(this) == false : "should not block concurrent acquire or release";
+        safeCommitInfo = new SafeCommitInfo(Long.parseLong(
+            safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), getDocCountOfCommit(safeCommit));
+
+        // This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that
+        // ceases to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might
+        // retain a bit more history and do a few more ops-based recoveries than we would otherwise.
+        final IndexCommit newSafeCommit = this.safeCommit;
+        assert safeCommit == newSafeCommit
+            : "onCommit called concurrently? " + safeCommit.getGeneration() + " vs " + newSafeCommit.getGeneration();
     }
 
     private void deleteCommit(IndexCommit commit) throws IOException {
@@ -109,6 +127,14 @@ private void updateRetentionPolicy() throws IOException {
             Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
     }
 
+    protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
+        return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
+    }
+
+    SafeCommitInfo getSafeCommitInfo() {
+        return safeCommitInfo;
+    }
+
     /**
      * Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
      * Index files of the capturing commit point won't be released until the commit reference is closed.
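The change above splits `onCommit` into a mutex-protected bookkeeping phase and an unsynchronized phase that summarises the new safe commit, since counting docs requires opening the commit's `SegmentInfos` and should not block concurrent snapshot acquire/release. Here is a hand-written sketch of that summarising step, not part of the patch: `SafeCommitInfo` is the class added later in this diff, and `LOCAL_CHECKPOINT_KEY` stands in for `SequenceNumbers.LOCAL_CHECKPOINT_KEY` (`"local_checkpoint"`).

```java
import java.io.IOException;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.index.engine.SafeCommitInfo;

class SafeCommitSummarizer {
    private static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; // SequenceNumbers.LOCAL_CHECKPOINT_KEY

    /** Summarise a safe commit: its local checkpoint plus a doc count that includes deletes. */
    static SafeCommitInfo summarize(IndexCommit safeCommit) throws IOException {
        final long localCheckpoint = Long.parseLong(safeCommit.getUserData().get(LOCAL_CHECKPOINT_KEY));
        // totalMaxDoc() counts deleted documents too, which matches the new setting's
        // "total number of documents in the shard (including deleted documents)" wording.
        final int docCount = SegmentInfos.readCommit(safeCommit.getDirectory(), safeCommit.getSegmentsFileName()).totalMaxDoc();
        return new SafeCommitInfo(localCheckpoint, docCount);
    }
}
```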
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 7a50d3471a335..f26e5b8ad1ffe 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1122,6 +1122,11 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE
      */
     public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
 
+    /**
+     * @return a summary of the contents of the current safe commit
+     */
+    public abstract SafeCommitInfo getSafeCommitInfo();
+
     /**
      * If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure
      * that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 5a8662845c482..b83c0a70178a0 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -2008,6 +2008,11 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
         }
     }
 
+    @Override
+    public SafeCommitInfo getSafeCommitInfo() {
+        return combinedDeletionPolicy.getSafeCommitInfo();
+    }
+
     private boolean failOnTragicEvent(AlreadyClosedException ex) {
         final boolean engineFailed;
         // if we are already closed due to some tragic exception
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 30b3d0221f36d..ded39c51b3712 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -77,6 +77,7 @@ public class ReadOnlyEngine extends Engine {
     private final Lock indexWriterLock;
     private final DocsStats docsStats;
     private final RamAccountingRefreshListener refreshListener;
+    private final SafeCommitInfo safeCommitInfo;
 
     protected volatile TranslogStats translogStats;
 
@@ -120,6 +121,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
             assert translogStats != null || obtainLock : "multiple translog instances should not be opened at the same time";
             this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
             this.indexWriterLock = indexWriterLock;
+            this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
             success = true;
         } finally {
             if (success == false) {
@@ -420,6 +422,11 @@ public IndexCommitRef acquireSafeIndexCommit() {
         return acquireLastIndexCommit(false);
     }
 
+    @Override
+    public SafeCommitInfo getSafeCommitInfo() {
+        return safeCommitInfo;
+    }
+
     @Override
     public void activateThrottling() {
     }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java
new file mode 100644
index 0000000000000..37461177c93cf
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/SafeCommitInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.index.seqno.SequenceNumbers;
+
+/**
+ * Information about the safe commit, for making decisions about recoveries.
+ */
+public class SafeCommitInfo {
+
+    public final long localCheckpoint;
+    public final int docCount;
+
+    public SafeCommitInfo(long localCheckpoint, int docCount) {
+        this.localCheckpoint = localCheckpoint;
+        this.docCount = docCount;
+    }
+
+    public static final SafeCommitInfo EMPTY = new SafeCommitInfo(SequenceNumbers.NO_OPS_PERFORMED, 0);
+}
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 465abdd0e27b3..1ef7c27c51796 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.SafeCommitInfo;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ReplicationGroup;
@@ -57,6 +58,7 @@
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -210,6 +212,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      */
     private boolean hasAllPeerRecoveryRetentionLeases;
 
+    /**
+     * Supplies information about the current safe commit which may be used to expire peer-recovery retention leases.
+     */
+    private final Supplier<SafeCommitInfo> safeCommitInfoSupplier;
+
+    /**
+     * Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See
+     * {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}.
+     */
+    private final double fileBasedRecoveryThreshold;
+
     /**
      * Get all retention leases tracked on this shard.
      *
@@ -237,6 +250,8 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
         final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
         final Set<String> leaseIdsForCurrentPeers
             = routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet());
+        final boolean allShardsStarted = routingTable.allShardsStarted();
+        final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : getMinimumReasonableRetainedSeqNo();
         final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
             .leases()
             .stream()
@@ -245,7 +260,12 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
                     if (leaseIdsForCurrentPeers.contains(lease.id())) {
                         return false;
                     }
-                    if (routingTable.allShardsStarted()) {
+                    if (allShardsStarted) {
+                        logger.trace("expiring unused [{}]", lease);
+                        return true;
+                    }
+                    if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
+                        logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo);
                         return true;
                     }
                 }
@@ -264,6 +284,17 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
         return Tuple.tuple(true, retentionLeases);
     }
 
+    private long getMinimumReasonableRetainedSeqNo() {
+        final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get();
+        return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold));
+        // NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested
+        // docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document
+        // has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and
+        // therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to
+        // do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless.
+        // TODO improve this measure for when nested docs are in use
+    }
+
     /**
      * Adds a new retention lease.
      *
@@ -850,7 +881,8 @@ public ReplicationTracker(
             final long globalCheckpoint,
             final LongConsumer onGlobalCheckpointUpdated,
             final LongSupplier currentTimeMillisSupplier,
-            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
+            final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
+            final Supplier<SafeCommitInfo> safeCommitInfoSupplier) {
         super(shardId, indexSettings);
         assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
         this.shardAllocationId = allocationId;
@@ -867,6 +899,8 @@ public ReplicationTracker(
         this.routingTable = null;
         this.replicationGroup = null;
         this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
+        this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
+        this.safeCommitInfoSupplier = safeCommitInfoSupplier;
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
         assert invariant();
     }
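To make the threshold concrete, here is a worked instance of `getMinimumReasonableRetainedSeqNo` with invented numbers and the default 10% threshold:

```java
public class MinimumReasonableSeqNoExample {
    public static void main(String[] args) {
        final long localCheckpoint = 999; // safeCommitInfo.localCheckpoint: the safe commit holds seqnos 0..999
        final int docCount = 1000;        // safeCommitInfo.docCount, deleted (and nested) docs included
        final double threshold = 0.1;     // index.recovery.file_based_threshold

        // Same expression as getMinimumReasonableRetainedSeqNo above.
        final long minReasonable = localCheckpoint + 1 - Math.round(Math.ceil(docCount * threshold));
        System.out.println(minReasonable); // 900

        // A peer-recovery retention lease retaining seqnos below 900 would permit an ops-based
        // recovery replaying more than 10% of the shard's documents, so it is allowed to expire
        // and the lagging peer falls back to a file-based recovery.
    }
}
```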
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index dafd379b1927d..225056e2edc31 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -92,6 +92,7 @@
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.ReadOnlyEngine;
 import org.elasticsearch.index.engine.RefreshFailedEngineException;
+import org.elasticsearch.index.engine.SafeCommitInfo;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.fielddata.FieldDataStats;
@@ -336,7 +337,8 @@ public IndexShard(
                 UNASSIGNED_SEQ_NO,
                 globalCheckpointListeners::globalCheckpointUpdated,
                 threadPool::absoluteTimeInMillis,
-                (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));
+                (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener),
+                this::getSafeCommitInfo);
 
         // the query cache is a node-level thing, however we want the most popular filters
         // to be computed on a per-shard basis
@@ -2612,6 +2614,11 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repl
+
+    public SafeCommitInfo getSafeCommitInfo() {
+        return getEngine().getSafeCommitInfo();
+    }
+
diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
index 110a27ff5510f..4e82a77ce43a7 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
@@ -55,7 +55,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
         final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, extraRetainedOps,
             () -> RetentionLeases.EMPTY);
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
 
         final LongArrayList maxSeqNoList = new LongArrayList();
         final LongArrayList translogGenList = new LongArrayList();
@@ -102,7 +102,7 @@ public void testAcquireIndexCommit() throws Exception {
             new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, () -> RetentionLeases.EMPTY);
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
         long lastMaxSeqNo = between(1, 1000);
         long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
         long lastTranslogGen = between(1, 20);
@@ -182,7 +182,7 @@ public void testDeleteInvalidCommits() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
         final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
 
         final int invalidCommits = between(1, 10);
         final List<IndexCommit> commitList = new ArrayList<>();
@@ -217,7 +217,7 @@ public void testCheckUnreferencedCommits() throws Exception {
         final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, () -> RetentionLeases.EMPTY);
         final UUID translogUUID = UUID.randomUUID();
         final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
-        CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
+        CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
         final List<IndexCommit> commitList = new ArrayList<>();
         int totalCommits = between(2, 20);
         long lastMaxSeqNo = between(1, 1000);
@@ -254,6 +254,17 @@ public void testCheckUnreferencedCommits() throws Exception {
         }
     }
 
+    private CombinedDeletionPolicy newCombinedDeletionPolicy(TranslogDeletionPolicy translogPolicy, SoftDeletesPolicy softDeletesPolicy,
+                                                             AtomicLong globalCheckpoint) {
+        return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get)
+        {
+            @Override
+            protected int getDocCountOfCommit(IndexCommit indexCommit) {
+                return between(0, 1000);
+            }
+        };
+    }
+
     IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
         final Map<String, String> userData = new HashMap<>();
         userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java
index 22d4f5e86f964..fe2d8f27aa308 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseExpiryTests.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.SafeCommitInfo;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.junit.Before;
@@ -37,6 +38,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -48,6 +50,7 @@ public class PeerRecoveryRetentionLeaseExpiryTests extends ReplicationTrackerTes
     private ReplicationTracker replicationTracker;
     private AtomicLong currentTimeMillis;
     private Settings settings;
+    private SafeCommitInfo safeCommitInfo;
 
     @Before
     public void setUpReplicationTracker() throws InterruptedException {
@@ -63,6 +66,8 @@ public void setUpReplicationTracker() throws InterruptedException {
             settings = Settings.EMPTY;
         }
 
+        safeCommitInfo = null; // must be set in each test
+
         final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
         replicationTracker = new ReplicationTracker(
             new ShardId("test", "_na", 0),
@@ -72,7 +77,8 @@ public void setUpReplicationTracker() throws InterruptedException {
             UNASSIGNED_SEQ_NO,
             value -> { },
             currentTimeMillis::get,
-            (leases, listener) -> { });
+            (leases, listener) -> { },
+            () -> safeCommitInfo);
         replicationTracker.updateFromMaster(1L, Collections.singleton(primaryAllocationId.getId()),
             routingTable(Collections.emptySet(), primaryAllocationId));
         replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
@@ -109,6 +115,7 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() {
         }
 
         currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()));
+        safeCommitInfo = randomSafeCommitInfo();
 
         final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
         assertFalse(retentionLeases.v1());
@@ -121,11 +128,14 @@ public void testPeerRecoveryRetentionLeasesForAssignedCopiesDoNotEverExpire() {
 
     public void testPeerRecoveryRetentionLeasesForUnassignedCopiesDoNotExpireImmediatelyIfShardsNotAllStarted() {
         final String unknownNodeId = randomAlphaOfLength(10);
-        replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER);
+        final long globalCheckpoint = randomNonNegativeLong(); // not NO_OPS_PERFORMED since this always results in file-based recovery
+        replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
 
         currentTimeMillis.set(currentTimeMillis.get()
             + randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis()));
 
+        safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint);
+
         final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
         assertFalse("should not have expired anything", retentionLeases.v1());
 
@@ -142,12 +152,15 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireEventually()
         }
 
         final String unknownNodeId = randomAlphaOfLength(10);
-        replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, randomCheckpoint(), EMPTY_LISTENER);
+        final long globalCheckpoint = randomCheckpoint();
+        replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
 
         currentTimeMillis.set(randomLongBetween(
             currentTimeMillis.get() + IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis() + 1,
             Long.MAX_VALUE));
 
+        safeCommitInfo = randomSafeCommitInfoSuitableForOpsBasedRecovery(globalCheckpoint);
+
         final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
         assertTrue("should have expired something", retentionLeases.v1());
 
@@ -167,6 +180,7 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyI
             (usually()
                 ? randomLongBetween(0, IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.get(settings).millis())
                 : randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get())));
+        safeCommitInfo = randomSafeCommitInfo();
 
         final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
         assertTrue(retentionLeases.v1());
@@ -176,4 +190,41 @@ public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireImmediatelyI
         assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream()
             .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet())));
     }
+
+    public void testPeerRecoveryRetentionLeasesForUnassignedCopiesExpireIfRetainingTooMuchHistory() {
+        if (randomBoolean()) {
+            startReplica();
+        }
+
+        final String unknownNodeId = randomAlphaOfLength(10);
+        final long globalCheckpoint = randomValueOtherThan(SequenceNumbers.NO_OPS_PERFORMED, this::randomCheckpoint);
+        replicationTracker.addPeerRecoveryRetentionLease(unknownNodeId, globalCheckpoint, EMPTY_LISTENER);
+
+        safeCommitInfo = randomSafeCommitInfoSuitableForFileBasedRecovery(globalCheckpoint);
+
+        final Tuple<Boolean, RetentionLeases> retentionLeases = replicationTracker.getRetentionLeases(true);
+        assertTrue("should have expired something", retentionLeases.v1());
+
+        final Set<String> leaseIds = retentionLeases.v2().leases().stream().map(RetentionLease::id).collect(Collectors.toSet());
+        assertThat(leaseIds, hasSize(2));
+        assertThat(leaseIds, equalTo(replicationTracker.routingTable.shards().stream()
+            .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet())));
+    }
+
+    private SafeCommitInfo randomSafeCommitInfo() {
+        return randomBoolean() ? SafeCommitInfo.EMPTY : new SafeCommitInfo(
+            randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, Integer.MAX_VALUE)),
+            randomIntBetween(0, Integer.MAX_VALUE));
+    }
+
+    private SafeCommitInfo randomSafeCommitInfoSuitableForOpsBasedRecovery(long globalCheckpoint) {
+        // simulate a safe commit that is behind the given global checkpoint, so that no files need to be transferred
+        final long localCheckpoint = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint);
+        return new SafeCommitInfo(localCheckpoint, between(0, Math.toIntExact(Math.min(localCheckpoint + 1, Integer.MAX_VALUE))));
+    }
+
+    private SafeCommitInfo randomSafeCommitInfoSuitableForFileBasedRecovery(long globalCheckpoint) {
+        // simulate a later safe commit containing no documents, which is always better to transfer than any ops
+        return new SafeCommitInfo(randomLongBetween(globalCheckpoint + 1, Long.MAX_VALUE), 0);
+    }
 }
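The two "suitable for" helpers above target opposite sides of the expiry predicate. Here is a sketch of that predicate with invented names, assuming (as `addPeerRecoveryRetentionLease` does) that a lease added for a checkpoint retains history from `globalCheckpoint + 1` onwards:

```java
class LeaseReasonablenessSketch {
    /** True when the lease retains less history than the file-based recovery threshold allows. */
    static boolean isUnreasonable(long retainsFromSeqNo, long safeCommitLocalCheckpoint, int safeCommitDocCount, double threshold) {
        final long minReasonable = safeCommitLocalCheckpoint + 1 - Math.round(Math.ceil(safeCommitDocCount * threshold));
        return retainsFromSeqNo < minReasonable;
    }
    // Ops-based helper: localCheckpoint <= globalCheckpoint, so minReasonable <= localCheckpoint + 1
    //   <= globalCheckpoint + 1 == retainsFromSeqNo, and the lease survives.
    // File-based helper: localCheckpoint >= globalCheckpoint + 1 with docCount == 0, so minReasonable
    //   == localCheckpoint + 1 > retainsFromSeqNo, and the lease expires.
}
```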
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
index 7611fad5a7e43..bdf7acf478b91 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java
@@ -70,7 +70,8 @@ public void testAddOrRenewRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -111,7 +112,8 @@ public void testAddDuplicateRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -139,7 +141,8 @@ public void testRenewNotFoundRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -174,7 +177,8 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
                     .stream()
                     .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
                 equalTo(retainingSequenceNumbers));
-            });
+            },
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         reference.set(replicationTracker);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
@@ -210,7 +214,8 @@ public void testRemoveRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -266,7 +271,8 @@ public void testCloneRetentionLease() {
                 assertFalse(Thread.holdsLock(replicationTrackerRef.get()));
                 assertTrue(synced.compareAndSet(false, true));
                 listener.onResponse(new ReplicationResponse());
-            });
+            },
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTrackerRef.set(replicationTracker);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
@@ -309,7 +315,8 @@ public void testCloneNonexistentRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> { });
+            (leases, listener) -> { },
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -331,7 +338,8 @@ public void testCloneDuplicateRetentionLease() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> { });
+            (leases, listener) -> { },
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -357,7 +365,8 @@ public void testRemoveNotFound() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -392,7 +401,8 @@ public void testRemoveRetentionLeaseCausesRetentionLeaseSync() {
                     .stream()
                     .collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
                 equalTo(retainingSequenceNumbers));
-            });
+            },
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         reference.set(replicationTracker);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
@@ -445,7 +455,8 @@ private void runExpirationTest(final boolean primaryMode) {
             UNASSIGNED_SEQ_NO,
             value -> {},
             currentTimeMillis::get,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -519,7 +530,8 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -572,7 +584,8 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -605,7 +618,8 @@ public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
@@ -653,7 +667,8 @@ public void testPersistRetentionLeasesUnderConcurrency() throws IOException {
             UNASSIGNED_SEQ_NO,
             value -> {},
             () -> 0L,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
         replicationTracker.updateFromMaster(
             randomNonNegativeLong(),
             Collections.singleton(allocationId.getId()),
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java
index 5f035a3604f41..cc32d5198c8b0 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.SafeCommitInfo;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
@@ -32,6 +33,7 @@
 import java.util.Set;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
@@ -49,9 +51,12 @@ ReplicationTracker newTracker(
             UNASSIGNED_SEQ_NO,
             updatedGlobalCheckpoint,
             currentTimeMillisSupplier,
-            (leases, listener) -> {});
+            (leases, listener) -> {},
+            OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
     }
 
+    static final Supplier<SafeCommitInfo> OPS_BASED_RECOVERY_ALWAYS_REASONABLE = () -> SafeCommitInfo.EMPTY;
+
     static String nodeIdFromAllocationId(final AllocationId allocationId) {
         return "n-" + allocationId.getId().substring(0, 8);
     }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
index afbd560758cf1..e7d68baf26599 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
@@ -694,10 +694,10 @@ public void testPrimaryContextHandoff() throws IOException {
         final long globalCheckpoint = UNASSIGNED_SEQ_NO;
         final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onNewRetentionLease = (leases, listener) -> {};
-        ReplicationTracker oldPrimary = new ReplicationTracker(
-            shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
-        ReplicationTracker newPrimary = new ReplicationTracker(
-            shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
+        ReplicationTracker oldPrimary = new ReplicationTracker(shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint,
+            onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
+        ReplicationTracker newPrimary = new ReplicationTracker(shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint,
+            onUpdate, () -> 0L, onNewRetentionLease, OPS_BASED_RECOVERY_ALWAYS_REASONABLE);
 
         Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));
lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting))); + internalCluster().restartNode(discoveryNodes.get(replicaShardRouting.currentNodeId()).getName(), + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertFalse(client().admin().cluster().prepareHealth() + .setWaitForNodes(Integer.toString(discoveryNodes.getSize() - 1)) + .setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + + final int newDocCount = Math.toIntExact(Math.round(Math.ceil( + (1 + Math.ceil(docCount * reasonableOperationsBasedRecoveryProportion)) + / (1 - reasonableOperationsBasedRecoveryProportion)))); + + /* + * newDocCount >= (ceil(docCount * p) + 1) / (1-p) + * + * ==> 0 <= newDocCount * (1-p) - ceil(docCount * p) - 1 + * = newDocCount - (newDocCount * p + ceil(docCount * p) + 1) + * < newDocCount - (ceil(newDocCount * p) + ceil(docCount * p)) + * <= newDocCount - ceil(newDocCount * p + docCount * p) + * + * ==> docCount < newDocCount + docCount - ceil((newDocCount + docCount) * p) + * == localCheckpoint + 1 - ceil((newDocCount + docCount) * p) + * == firstReasonableSeqNo + * + * The replica has docCount docs, i.e. has operations with seqnos [0..docCount-1], so a seqno-based recovery will start + * from docCount < firstReasonableSeqNo + * + * ==> it is unreasonable to recover the replica using a seqno-based recovery + */ + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, newDocCount) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + + flush(indexName); + + assertBusy(() -> assertFalse("should no longer have lease for " + replicaShardRouting, + client().admin().indices().prepareStats(indexName).get().getShards()[0].getRetentionLeaseStats() + .retentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaShardRouting)))); + + return super.onNodeStopped(nodeName); + } + }); + + ensureGreen(indexName); + + //noinspection OptionalGetWithoutIsPresent because it fails the test if absent + final RecoveryState recoveryState = client().admin().indices().prepareRecoveries(indexName).get() + .shardRecoveryStates().get(indexName).stream().filter(rs -> rs.getPrimary() == false).findFirst().get(); + assertThat(recoveryState.getIndex().totalFileCount(), greaterThan(0)); + } + public void testDoesNotCopyOperationsInSafeCommit() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2c54189e20c16..9e1cc211008fc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -674,7 +674,8 @@ public EngineConfig config( SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L, - (leases, listener) -> listener.onResponse(new ReplicationResponse())); + (leases, listener) -> listener.onResponse(new ReplicationResponse()), + () -> SafeCommitInfo.EMPTY); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java 
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 2c54189e20c16..9e1cc211008fc 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -674,7 +674,8 @@ public EngineConfig config(
                 SequenceNumbers.NO_OPS_PERFORMED,
                 update -> {},
                 () -> 0L,
-                (leases, listener) -> listener.onResponse(new ReplicationResponse()));
+                (leases, listener) -> listener.onResponse(new ReplicationResponse()),
+                () -> SafeCommitInfo.EMPTY);
             globalCheckpointSupplier = replicationTracker;
             retentionLeasesSupplier = replicationTracker::getRetentionLeases;
         } else {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java
index fdb623d1d1e91..246dac18ef8a2 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.plugins.Plugin;
 
 import java.util.Arrays;
@@ -51,6 +52,7 @@ public List<Setting<?>> getSettings() {
             TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
             IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
             IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
+            IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING,
             IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
         );
     }
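Registering the setting here is what lets test clusters set it at all, since production nodes reject it as unregistered. A minimal sketch of the kind of test-only override this enables (the 0.25 value is invented):

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

class FileBasedThresholdOverrideExample {
    // Only valid where InternalSettingsPlugin is installed (e.g. an ESIntegTestCase cluster);
    // index.recovery.file_based_threshold is deliberately not registered in production.
    static Settings expireLeasesBeyondQuarterOfDocs() {
        return Settings.builder()
            .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 0.25)
            .build();
    }
}
```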