Return recovery to generic thread post-PRRL action (elastic#44000)
Today we invoke `TransportReplicationAction` derivatives during recovery, and
these actions call their response handlers on the transport thread. This change
moves the continued execution of the recovery back onto the generic thread pool.
DaveCTurner committed Jul 5, 2019
1 parent a4d5cf1 commit 5dd6c68
Showing 1 changed file with 23 additions and 13 deletions.
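
To make the threading change concrete before reading the diff, here is a minimal sketch (not code from this commit) of the pattern it introduces in RecoverySourceHandler: a recovery step's listener is wrapped in a `ThreadedActionListener` so that its callbacks are forked onto the GENERIC thread pool instead of running on the transport thread that delivered the response. The constructor arguments mirror those visible in the diff below; the class name `RecoveryThreadingSketch`, the helper `forkToGeneric`, and the `ReplicationResponse` type parameter are illustrative assumptions.

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.threadpool.ThreadPool;

// Illustrative sketch only; class and method names are invented for this example.
class RecoveryThreadingSketch {

    // Wraps a recovery step listener so that onResponse/onFailure run on the GENERIC
    // thread pool rather than on the transport thread, mirroring how the commit wraps
    // the retention-lease step listeners below.
    static ActionListener<ReplicationResponse> forkToGeneric(Logger logger,
                                                             ThreadPool threadPool,
                                                             StepListener<ReplicationResponse> step) {
        // forceExecution = false, as in the diff: the forked task may be rejected during shutdown
        return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, step, false);
    }
}

In the commit itself, `deleteRetentionLeaseStep` and `establishRetentionLeaseStep` are wrapped in this way, and `Transports.assertNotTransportThread` assertions are added at the start of the subsequent steps so that running the continuation on a transport thread trips an assertion when assertions are enabled.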
@@ -33,6 +33,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -66,6 +67,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
@@ -146,8 +148,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
throw e;
});
-final Consumer<Exception> onFailure = e ->
+final Consumer<Exception> onFailure = e -> {
+Transports.assertNotTransportThread("failure of recovery from " + shard.routingEntry() + " to " + request.targetNode());
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
+};

runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
@@ -208,7 +212,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
// If the target previously had a copy of this shard then a file-based recovery might move its global
// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
// new one later on in the recovery.
-shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
+shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
+new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
+deleteRetentionLeaseStep, false));
} catch (RetentionLeaseNotFoundException e) {
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
deleteRetentionLeaseStep.onResponse(null);
@@ -220,6 +226,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
}

deleteRetentionLeaseStep.whenComplete(ignored -> {
+Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
}, onFailure);

@@ -233,30 +240,33 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
if (shard.indexSettings().isSoftDeleteEnabled()
&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
runUnderPrimaryPermit(() -> {
-try {
-// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
-final long globalCheckpoint = startingSeqNo - 1;
-// blindly create the lease. TODO integrate this with the recovery process
-shard.addPeerRecoveryRetentionLease(
-request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep);
-} catch (RetentionLeaseAlreadyExistsException e) {
-logger.debug("peer-recovery retention lease already exists", e);
-establishRetentionLeaseStep.onResponse(null);
-}
-}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
+try {
+// conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
+final long globalCheckpoint = startingSeqNo - 1;
+// blindly create the lease. TODO integrate this with the recovery process
+shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint,
+new ThreadedActionListener<>(logger, shard.getThreadPool(),
+ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
+} catch (RetentionLeaseAlreadyExistsException e) {
+logger.debug("peer-recovery retention lease already exists", e);
+establishRetentionLeaseStep.onResponse(null);
+}
+}, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]",
shard, cancellableThreads, logger);
} else {
establishRetentionLeaseStep.onResponse(null);
}
}, onFailure);

establishRetentionLeaseStep.whenComplete(r -> {
+Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
}, onFailure);

prepareEngineStep.whenComplete(prepareEngineTime -> {
+Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
/*
* add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
* This means that any document indexed into the primary after this will be replicated to this replica as well
