diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index cdb4082b82e70..8598917b82313 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -59,17 +59,18 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -142,6 +143,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { // create a new recovery status, and process... final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); + // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause + // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool. 
threadPool.generic().execute(new RecoveryRunner(recoveryId)); } @@ -165,17 +168,16 @@ private void retryRecovery(final long recoveryId, final TimeValue retryAfter, fi private void doRecovery(final long recoveryId) { final StartRecoveryRequest request; - final CancellableThreads cancellableThreads; final RecoveryState.Timer timer; - + CancellableThreads cancellableThreads; try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { if (recoveryRef == null) { logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); return; } final RecoveryTarget recoveryTarget = recoveryRef.target(); - cancellableThreads = recoveryTarget.cancellableThreads(); timer = recoveryTarget.state().getTimer(); + cancellableThreads = recoveryTarget.cancellableThreads(); try { assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; request = getStartRecoveryRequest(recoveryTarget); @@ -189,51 +191,11 @@ private void doRecovery(final long recoveryId) { return; } } - - try { - logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode()); - final AtomicReference responseHolder = new AtomicReference<>(); - cancellableThreads.execute(() -> responseHolder.set( - transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, - new FutureTransportResponseHandler() { - @Override - public RecoveryResponse read(StreamInput in) throws IOException { - RecoveryResponse recoveryResponse = new RecoveryResponse(); - recoveryResponse.readFrom(in); - return recoveryResponse; - } - }).txGet())); - final RecoveryResponse recoveryResponse = responseHolder.get(); - final TimeValue recoveryTime = new TimeValue(timer.time()); - // do this through ongoing recoveries to remove it from the collection - onGoingRecoveries.markRecoveryAsDone(recoveryId); - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - 
sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id()) - .append("] "); - sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n"); - sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " + - "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]") - .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append - (timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']') - .append("\n"); - sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " + - "total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n"); - sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n"); - sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " + - "operations") - .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]") - .append("\n"); - logger.trace("{}", sb); - } else { - logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime); - } - } catch (CancellableThreads.ExecutionCancelledException e) { - logger.trace("recovery cancelled", e); - } catch (Exception e) { + Consumer handleException = e -> { if (logger.isTraceEnabled()) { logger.trace(() -> new ParameterizedMessage( - "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), request.shardId().id()), e); + "[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), + request.shardId().id()), e); } Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { @@ -267,14 +229,16 @@ public 
RecoveryResponse read(StreamInput in) throws IOException { } if (cause instanceof DelayRecoveryException) { - retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout()); + retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), + recoverySettings.activityTimeout()); return; } if (cause instanceof ConnectTransportException) { logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(), recoverySettings.retryDelayNetwork(), cause.getMessage()); - retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout()); + retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), + recoverySettings.activityTimeout()); return; } @@ -285,6 +249,71 @@ public RecoveryResponse read(StreamInput in) throws IOException { } onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true); + }; + + try { + logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode()); + cancellableThreads.executeIO(() -> + // we still execute under cancellableThreads here to ensure we interrupt any blocking call to the network if any + // on the underlying transport. It's unclear if we need this here at all after moving to async execution but + // the issues that a missing call to this could cause are sneaky and hard to debug. If we don't need it on this + // call we can potentially remove it altogether which we should do in a major release only with enough + // time to test. 
This should be done for 7.0 if possible + transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, + new TransportResponseHandler() { + @Override + public void handleResponse(RecoveryResponse recoveryResponse) { + final TimeValue recoveryTime = new TimeValue(timer.time()); + // do this through ongoing recoveries to remove it from the collection + onGoingRecoveries.markRecoveryAsDone(recoveryId); + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append('[').append(request.shardId().getIndex().getName()).append(']') + .append('[').append(request.shardId().id()).append("] "); + sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime) + .append("]\n"); + sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]") + .append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]") + .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [") + .append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n"); + sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()) + .append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)) + .append("]\n"); + sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n"); + sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]") + .append(" transaction log operations") + .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]") + .append("\n"); + logger.trace("{}", sb); + } else { + logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), + recoveryTime); + } + } + + @Override + public void handleException(TransportException e) { + 
handleException.accept(e); + } + + @Override + public String executor() { + // we do some heavy work like refreshes in the response so fork off to the generic threadpool + return ThreadPool.Names.GENERIC; + } + + @Override + public RecoveryResponse read(StreamInput in) throws IOException { + RecoveryResponse recoveryResponse = new RecoveryResponse(); + recoveryResponse.readFrom(in); + return recoveryResponse; + } + }) + ); + } catch (CancellableThreads.ExecutionCancelledException e) { + logger.trace("recovery cancelled", e); + } catch (Exception e) { + handleException.accept(e); } } @@ -632,5 +661,4 @@ public void doRun() { doRecovery(recoveryId); } } - }