Skip to content

Commit

Permalink
Only retain reasonable history for peer recoveries (#45208) (#45355)
Browse files Browse the repository at this point in the history
Today if a shard is not fully allocated we maintain a retention lease for a
lost peer for up to 12 hours, retaining all operations that occur in that time
period so that we can recover this replica using an operations-based recovery
if it returns. However it is not always reasonable to perform an
operations-based recovery on such a replica: if the replica is a very long way
behind the rest of the replication group then it can be much quicker to perform
a file-based recovery instead.

This commit introduces a notion of "reasonable" recoveries. If an
operations-based recovery would involve copying only a small number of
operations, but the index is large, then an operations-based recovery is
reasonable; on the other hand if there are many operations to copy across and
the index itself is relatively small then it makes more sense to perform a
file-based recovery. We measure the size of the index by computing its number
of documents (including deleted documents) in all segments belonging to the
current safe commit, and compare this to the number of operations a lease is
retaining below the local checkpoint of the safe commit. We consider an
operations-based recovery to be reasonable iff it would involve replaying at
most 10% of the documents in the index.

The mechanism for this feature is to expire peer-recovery retention leases
early if they are retaining so much history that an operations-based recovery
using that lease would be unreasonable.

Relates #41536
  • Loading branch information
original-brownbear authored Aug 8, 2019
1 parent 7b0a804 commit 12ed6dc
Show file tree
Hide file tree
Showing 16 changed files with 363 additions and 41 deletions.
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ public final class IndexSettings {
public static final Setting<Boolean> INDEX_SEARCH_THROTTLED = Setting.boolSetting("index.search.throttled", false,
Property.IndexScope, Property.PrivateIndex, Property.Dynamic);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
* documents) on the grounds that a file-based peer recovery may copy all of the documents in the shard over to the new peer, but is
* significantly faster than replaying the missing operations on the peer, so once a peer falls far enough behind the primary it makes
* more sense to copy all the data over again instead of replaying history.
*
* Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is
* intentionally unregistered.
*/
public static final Setting<Double> FILE_BASED_RECOVERY_THRESHOLD_SETTING
= Setting.doubleSetting("index.recovery.file_based_threshold", 0.1d, 0.0d, Setting.Property.IndexScope);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
Expand All @@ -43,14 +44,15 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;

CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
SoftDeletesPolicy softDeletesPolicy, LongSupplier globalCheckpointSupplier) {
Expand All @@ -62,7 +64,7 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
}

@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
public void onInit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false : "index is opened, but we have no commits";
onCommit(commits);
if (safeCommit != commits.get(commits.size() - 1)) {
Expand All @@ -74,16 +76,32 @@ public synchronized void onInit(List<? extends IndexCommit> commits) throws IOEx
}

@Override
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
lastCommit = commits.get(commits.size() - 1);
safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final IndexCommit safeCommit;
synchronized (this) {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
this.safeCommitInfo = SafeCommitInfo.EMPTY;
this.lastCommit = commits.get(commits.size() - 1);
this.safeCommit = commits.get(keptPosition);
for (int i = 0; i < keptPosition; i++) {
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
deleteCommit(commits.get(i));
}
}
updateRetentionPolicy();
safeCommit = this.safeCommit;
}
updateRetentionPolicy();

assert Thread.holdsLock(this) == false : "should not block concurrent acquire or relesase";
safeCommitInfo = new SafeCommitInfo(Long.parseLong(
safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), getDocCountOfCommit(safeCommit));

// This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that ceases
// to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might retain a
// bit more history and do a few more ops-based recoveries than we would otherwise.
final IndexCommit newSafeCommit = this.safeCommit;
assert safeCommit == newSafeCommit
: "onCommit called concurrently? " + safeCommit.getGeneration() + " vs " + newSafeCommit.getGeneration();
}

private void deleteCommit(IndexCommit commit) throws IOException {
Expand All @@ -109,6 +127,14 @@ private void updateRetentionPolicy() throws IOException {
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
}

protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
}

SafeCommitInfo getSafeCommitInfo() {
return safeCommitInfo;
}

/**
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
* Index files of the capturing commit point won't be released until the commit reference is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,11 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE
*/
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;

/**
* @return a summary of the contents of the current safe commit
*/
public abstract SafeCommitInfo getSafeCommitInfo();

/**
* If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure
* that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,11 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
}
}

@Override
public SafeCommitInfo getSafeCommitInfo() {
return combinedDeletionPolicy.getSafeCommitInfo();
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
final boolean engineFailed;
// if we are already closed due to some tragic exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class ReadOnlyEngine extends Engine {
private final Lock indexWriterLock;
private final DocsStats docsStats;
private final RamAccountingRefreshListener refreshListener;
private final SafeCommitInfo safeCommitInfo;

protected volatile TranslogStats translogStats;

Expand Down Expand Up @@ -120,6 +121,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
assert translogStats != null || obtainLock : "mutiple translogs instances should not be opened at the same time";
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -420,6 +422,11 @@ public IndexCommitRef acquireSafeIndexCommit() {
return acquireLastIndexCommit(false);
}

@Override
public SafeCommitInfo getSafeCommitInfo() {
return safeCommitInfo;
}

@Override
public void activateThrottling() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.index.seqno.SequenceNumbers;

/**
* Information about the safe commit, for making decisions about recoveries.
*/
public class SafeCommitInfo {

public final long localCheckpoint;
public final int docCount;

public SafeCommitInfo(long localCheckpoint, int docCount) {
this.localCheckpoint = localCheckpoint;
this.docCount = docCount;
}

public static final SafeCommitInfo EMPTY = new SafeCommitInfo(SequenceNumbers.NO_OPS_PERFORMED, 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
Expand All @@ -57,6 +58,7 @@
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand Down Expand Up @@ -210,6 +212,17 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private boolean hasAllPeerRecoveryRetentionLeases;

/**
* Supplies information about the current safe commit which may be used to expire peer-recovery retention leases.
*/
private final Supplier<SafeCommitInfo> safeCommitInfoSupplier;

/**
* Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See
* {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}.
*/
private final double fileBasedRecoveryThreshold;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -237,6 +250,8 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Set<String> leaseIdsForCurrentPeers
= routingTable.assignedShards().stream().map(ReplicationTracker::getPeerRecoveryRetentionLeaseId).collect(Collectors.toSet());
final boolean allShardsStarted = routingTable.allShardsStarted();
final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : getMinimumReasonableRetainedSeqNo();
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
.leases()
.stream()
Expand All @@ -245,7 +260,12 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
if (leaseIdsForCurrentPeers.contains(lease.id())) {
return false;
}
if (routingTable.allShardsStarted()) {
if (allShardsStarted) {
logger.trace("expiring unused [{}]", lease);
return true;
}
if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) {
logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo);
return true;
}
}
Expand All @@ -264,6 +284,17 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
return Tuple.tuple(true, retentionLeases);
}

private long getMinimumReasonableRetainedSeqNo() {
final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get();
return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold));
// NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested
// docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document
// has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and
// therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to
// do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless.
// TODO improve this measure for when nested docs are in use
}

/**
* Adds a new retention lease.
*
Expand Down Expand Up @@ -850,7 +881,8 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
final Supplier<SafeCommitInfo> safeCommitInfoSupplier) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -867,6 +899,8 @@ public ReplicationTracker(
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SafeCommitInfo;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
Expand Down Expand Up @@ -336,7 +337,8 @@ public IndexShard(
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener),
this::getSafeCommitInfo);

// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
Expand Down Expand Up @@ -2612,6 +2614,11 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}

private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();
}

class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

Expand Down
Loading

0 comments on commit 12ed6dc

Please sign in to comment.