diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c3783c3672198..8b681dee0c939 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -33,6 +33,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -66,6 +67,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -146,8 +148,10 @@ public void recoverToTarget(ActionListener listener) { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); throw e; }); - final Consumer onFailure = e -> + final Consumer onFailure = e -> { + Transports.assertNotTransportThread("failure of recovery from " + shard.routingEntry() + " to " + request.targetNode()); IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); + }; runUnderPrimaryPermit(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); @@ -208,7 +212,9 @@ public void recoverToTarget(ActionListener listener) { // If the target previously had a copy of this shard then a file-based recovery might move its global // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a // new one later on in the recovery. - shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep); + shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, + deleteRetentionLeaseStep, false)); } catch (RetentionLeaseNotFoundException e) { logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); deleteRetentionLeaseStep.onResponse(null); @@ -220,6 +226,7 @@ public void recoverToTarget(ActionListener listener) { } deleteRetentionLeaseStep.whenComplete(ignored -> { + Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep); }, onFailure); @@ -233,17 +240,18 @@ public void recoverToTarget(ActionListener listener) { if (shard.indexSettings().isSoftDeleteEnabled() && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) { runUnderPrimaryPermit(() -> { - try { - // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate - final long globalCheckpoint = startingSeqNo - 1; - // blindly create the lease. TODO integrate this with the recovery process - shard.addPeerRecoveryRetentionLease( - request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep); - } catch (RetentionLeaseAlreadyExistsException e) { - logger.debug("peer-recovery retention lease already exists", e); - establishRetentionLeaseStep.onResponse(null); - } - }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", + try { + // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate + final long globalCheckpoint = startingSeqNo - 1; + // blindly create the lease. TODO integrate this with the recovery process + shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint, + new ThreadedActionListener<>(logger, shard.getThreadPool(), + ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false)); + } catch (RetentionLeaseAlreadyExistsException e) { + logger.debug("peer-recovery retention lease already exists", e); + establishRetentionLeaseStep.onResponse(null); + } + }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, logger); } else { establishRetentionLeaseStep.onResponse(null); @@ -251,12 +259,14 @@ public void recoverToTarget(ActionListener listener) { }, onFailure); establishRetentionLeaseStep.whenComplete(r -> { + Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { + Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); /* * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. * This means that any document indexed into the primary after this will be replicated to this replica as well