Skip to content

Commit

Permalink
Use peer recovery retention leases for indices without soft-deletes (#…
Browse files Browse the repository at this point in the history
…50351)

Today, the replica allocator uses peer recovery retention leases to 
select the best-matched copies when allocating replicas of indices with
soft-deletes. We can employ this mechanism for indices without
soft-deletes because the retaining sequence number of a PRRL is the
persisted global checkpoint (plus one) of that copy. If the primary and 
replica have the same retaining sequence number, then we should be able
to perform a noop recovery. The reason is that we must be retaining
translog up to the local checkpoint of the safe commit (which is at most
the global checkpoint of either copy). The only limitation is that we
might not cancel ongoing file-based recoveries with PRRLs for noop
recoveries. We can't make the translog retention policy comply with
PRRLs. We also have this problem with soft-deletes if a PRRL is about to
expire.

Relates #45136
Relates #46959
  • Loading branch information
dnhatn authored Dec 20, 2019
1 parent 40ef785 commit cec6678
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ public void testOperationBasedRecovery() throws Exception {
}
}
flush(index, true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false);
// less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
for (int i = 0; i < uncommittedDocs; i++) {
Expand All @@ -1288,6 +1288,7 @@ public void testOperationBasedRecovery() throws Exception {
} else {
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
}
}

Expand All @@ -1312,6 +1313,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
ensureGreen(index);
flush(index, true);
assertEmptyTranslog(index);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ public void testOperationBasedRecovery() throws Exception {
ensureGreen(index);
indexDocs(index, 0, randomIntBetween(100, 200));
flush(index, randomBoolean());
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false);
// uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
} else {
Expand All @@ -705,6 +705,9 @@ public void testOperationBasedRecovery() throws Exception {
|| nodeName.startsWith(CLUSTER_NAME + "-0")
|| (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,7 @@ private void maybeSyncGlobalCheckpoints() {
}

private void syncRetentionLeases() {
if (indexSettings.isSoftDeleteEnabled()) {
sync(IndexShard::syncRetentionLeases, "retention lease");
}
sync(IndexShard::syncRetentionLeases, "retention lease");
}

private void sync(final Consumer<IndexShard> sync, final String source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,10 +895,12 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() &&
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN));
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)
|| (indexSettings.isSoftDeleteEnabled() &&
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)));

this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
Expand Down Expand Up @@ -994,10 +996,7 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint);
updateGlobalCheckpointOnPrimary();

if (indexSettings.isSoftDeleteEnabled()) {
addPeerRecoveryRetentionLeaseForSolePrimary();
}

addPeerRecoveryRetentionLeaseForSolePrimary();
assert invariant();
}

Expand Down Expand Up @@ -1358,7 +1357,7 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
if (hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1892,10 +1892,10 @@ boolean shouldRollTranslogGeneration() {
public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery;
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
engineOrNull.onSettingsChanged(
useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);
}
Expand Down Expand Up @@ -2224,7 +2224,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException {
public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
Expand Down Expand Up @@ -2619,7 +2618,7 @@ public RetentionLease addPeerRecoveryRetentionLease(String nodeId, long globalCh
ActionListener<ReplicationResponse> listener) {
assert assertPrimaryMode();
// only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs:
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
assert indexSettings.getIndexVersionCreated().before(Version.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == false;
return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get(
ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
retentionLeaseRef.set(
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Engine.HistorySource historySource;
if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) {
if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) {
historySource = Engine.HistorySource.INDEX;
} else {
historySource = Engine.HistorySource.TRANSLOG;
Expand All @@ -190,7 +190,7 @@ && isTargetSameHistory()
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
// without having a complete history.

if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock.close();
logger.trace("history is retained by {}", retentionLeaseRef.get());
Expand All @@ -209,7 +209,7 @@ && isTargetSameHistory()
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
if (softDeletesEnabled && retentionLeaseRef.get() == null) {
if (retentionLeaseRef.get() == null) {
createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
} else {
sendFileStep.onResponse(SendFileResult.EMPTY);
Expand Down Expand Up @@ -251,36 +251,24 @@ && isTargetSameHistory()
});

final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
if (softDeletesEnabled) {
runUnderPrimaryPermit(() -> {
try {
// If the target previously had a copy of this shard then a file-based recovery might move its global
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
// new one later on in the recovery.
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
deleteRetentionLeaseStep, false));
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
deleteRetentionLeaseStep.onResponse(null);
}
}, shardId + " removing retention leaes for [" + request.targetAllocationId() + "]",
shard, cancellableThreads, logger);
} else {
deleteRetentionLeaseStep.onResponse(null);
}
runUnderPrimaryPermit(() -> {
try {
// If the target previously had a copy of this shard then a file-based recovery might move its global
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
// new one later on in the recovery.
shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
deleteRetentionLeaseStep, false));
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
deleteRetentionLeaseStep.onResponse(null);
}
}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
shard, cancellableThreads, logger);

deleteRetentionLeaseStep.whenComplete(ignored -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");

final Consumer<ActionListener<RetentionLease>> createRetentionLeaseAsync;
if (softDeletesEnabled) {
createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l);
} else {
createRetentionLeaseAsync = l -> l.onResponse(null);
}

phase1(safeCommitRef.getIndexCommit(), createRetentionLeaseAsync, () -> estimateNumOps, sendFileStep);
phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);
}, onFailure);

} catch (final Exception e) {
Expand Down Expand Up @@ -451,8 +439,7 @@ static final class SendFileResult {
* segments that are missing. Only segments that have the same size and
* checksum can be reused
*/
void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> createRetentionLease,
IntSupplier translogOps, ActionListener<SendFileResult> listener) {
void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
cancellableThreads.checkForCancel();
final Store store = shard.store();
try {
Expand Down Expand Up @@ -526,7 +513,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
sendFileInfoStep.whenComplete(r ->
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);

sendFilesStep.whenComplete(r -> createRetentionLease.accept(createRetentionLeaseStep), listener::onFailure);
sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);

createRetentionLeaseStep.whenComplete(retentionLease ->
{
Expand Down Expand Up @@ -554,7 +541,7 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat

// but we must still create a retention lease
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
createRetentionLease.accept(createRetentionLeaseStep);
createRetentionLease(startingSeqNo, createRetentionLeaseStep);
createRetentionLeaseStep.whenComplete(retentionLease -> {
final TimeValue took = stopWatch.totalTime();
logger.trace("recovery [phase1]: took [{}]", took);
Expand Down Expand Up @@ -590,7 +577,8 @@ private void createRetentionLease(final long startingSeqNo, ActionListener<Reten
// it's possible that the primary has no retention lease yet if we are doing a rolling upgrade from a version before
// 7.4, and in that case we just create a lease using the local checkpoint of the safe commit which we're using for
// recovery as a conservative estimate for the global checkpoint.
assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0);
assert shard.indexSettings().getIndexVersionCreated().before(Version.V_7_4_0)
|| shard.indexSettings().isSoftDeleteEnabled() == false;
final StepListener<ReplicationResponse> addRetentionLeaseStep = new StepListener<>();
final long estimatedGlobalCheckpoint = startingSeqNo - 1;
final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(request.targetNode().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testPreferCopyCanPerformNoopRecovery() throws Exception {
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testFullClusterRestartPerformNoopRecovery() throws Exception {
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception {
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down Expand Up @@ -329,7 +329,7 @@ public void testPeerRecoveryForClosedIndices() throws Exception {
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build());
Expand Down
Loading

0 comments on commit cec6678

Please sign in to comment.