Don't block on peer recovery on the target side.
kovrus committed Sep 12, 2019
1 parent b77b1c2 commit 6d96ee4
Showing 1 changed file with 155 additions and 65 deletions.
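
In essence, the change stops the recovery target from blocking on the START_RECOVERY transport call: the removed code submitted the request with a FutureTransportResponseHandler and waited on txGet(), while the added code passes an asynchronous TransportResponseHandler whose callbacks run on the generic thread pool. A condensed sketch of the two patterns, distilled from the diff below (names as they appear in the changed file; method bodies abbreviated):

Before (blocking, removed):

    final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
    cancellableThreads.execute(() -> responseHolder.set(
        transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
            new FutureTransportResponseHandler<RecoveryResponse>() {
                @Override
                public RecoveryResponse read(StreamInput in) throws IOException {
                    return new RecoveryResponse(in);
                }
            }).txGet()));  // txGet() blocks the recovery thread until the source node responds

After (asynchronous, added):

    cancellableThreads.executeIO(() ->
        transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
            new TransportResponseHandler<RecoveryResponse>() {
                @Override
                public RecoveryResponse read(StreamInput in) throws IOException {
                    return new RecoveryResponse(in);
                }

                @Override
                public void handleResponse(RecoveryResponse recoveryResponse) {
                    // mark the recovery as done and log timings (see handleResponse in the diff)
                }

                @Override
                public void handleException(TransportException e) {
                    // delegate to the shared failure/retry handling (the handleException consumer in the diff)
                }

                @Override
                public String executor() {
                    // the response does heavy work such as refreshes, so run it on the generic pool
                    return ThreadPool.Names.GENERIC;
                }
            }));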
@@ -61,17 +61,18 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

@@ -145,16 +146,25 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
// we fork off quickly here and go async, but this is also called from the cluster state applier
// thread, and that can cause assertions to trip if we execute it on the same thread;
// hence we fork off to the generic threadpool.
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}

protected void retryRecovery(final long recoveryId, final Throwable reason, TimeValue retryAfter, TimeValue activityTimeout) {
private void retryRecovery(final long recoveryId,
final Throwable reason,
TimeValue retryAfter,
TimeValue activityTimeout) {
logger.trace(() -> new ParameterizedMessage(
"will retry recovery with id [{}] in [{}]", recoveryId, retryAfter), reason);
retryRecovery(recoveryId, retryAfter, activityTimeout);
}

protected void retryRecovery(final long recoveryId, final String reason, TimeValue retryAfter, TimeValue activityTimeout) {
private void retryRecovery(final long recoveryId,
final String reason,
TimeValue retryAfter,
TimeValue activityTimeout) {
logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason);
retryRecovery(recoveryId, retryAfter, activityTimeout);
}
@@ -168,17 +178,16 @@ private void retryRecovery(final long recoveryId, final TimeValue retryAfter, fi

private void doRecovery(final long recoveryId) {
final StartRecoveryRequest request;
final CancellableThreads cancellableThreads;
final RecoveryState.Timer timer;

CancellableThreads cancellableThreads;
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
}
final RecoveryTarget recoveryTarget = recoveryRef.target();
cancellableThreads = recoveryTarget.cancellableThreads();
timer = recoveryTarget.state().getTimer();
cancellableThreads = recoveryTarget.cancellableThreads();
try {
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
request = getStartRecoveryRequest(recoveryTarget);
@@ -188,59 +197,24 @@ private void doRecovery(final long recoveryId) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
new RecoveryFailedException(recoveryTarget.state(),
"failed to prepare shard for recovery",
e), true);
return;
}
}

try {
logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
cancellableThreads.execute(() -> responseHolder.set(
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse read(StreamInput in) throws IOException {
return new RecoveryResponse(in);
}
}).txGet()));
final RecoveryResponse recoveryResponse = responseHolder.get();
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id())
.append("] ");
sb.append("recovery completed from ").append(request.sourceNode()).append(", took[").append(recoveryTime).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]").append(" with " +
"total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [").append
(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size()).append("] with " +
"total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]").append(" transaction log " +
"operations")
.append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
.append("\n");
logger.trace("{}", sb);
} else {
logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime);
}
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Exception e) {
Consumer<Exception> handleException = e -> {
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage(
"[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(), request.shardId().id()), e);
"[{}][{}] Got exception on recovery", request.shardId().getIndex().getName(),
request.shardId().id()), e);
}
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
// this can also come from the source wrapped in a RemoteTransportException
onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request,
"source has canceled the recovery", cause), false);
"source has canceled the recovery",
cause), false);
return;
}
if (cause instanceof RecoveryEngineException) {
@@ -268,24 +242,135 @@ public RecoveryResponse read(StreamInput in) throws IOException {
}

if (cause instanceof DelayRecoveryException) {
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), recoverySettings.activityTimeout());
retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(),
recoverySettings.activityTimeout());
return;
}

if (cause instanceof ConnectTransportException) {
logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(),
recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), recoverySettings.activityTimeout());
recoverySettings.retryDelayNetwork(), cause.getMessage());
retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(),
recoverySettings.activityTimeout());
return;
}

if (cause instanceof AlreadyClosedException) {
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(request, "source shard is closed", cause), false);
new RecoveryFailedException(request, "source shard is closed", cause),
false);
return;
}

onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true);
};

try {
logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
cancellableThreads.executeIO(
() ->
// we still execute under cancellableThreads here to ensure we interrupt any blocking call to the network
// on the underlying transport. It's unclear whether we need this at all after moving to async execution,
// but the issues a missing call could cause are sneaky and hard to debug. If we don't need it on this
// call we can potentially remove it altogether, which should only be done in a major release with enough
// time to test. This should be done for 7.0 if possible
transportService.submitRequest(
request.sourceNode(),
PeerRecoverySourceService.Actions.START_RECOVERY,
request,
new TransportResponseHandler<RecoveryResponse>() {
    @Override
    public void handleResponse(RecoveryResponse recoveryResponse) {
        final TimeValue recoveryTime = new TimeValue(timer.time());
        // do this through ongoing recoveries to remove it from the collection
        onGoingRecoveries.markRecoveryAsDone(recoveryId);
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append('[').append(request.shardId().getIndex().getName()).append(']')
                .append('[').append(request.shardId().id()).append("] ");
            sb.append("recovery completed from ").append(request.sourceNode())
                .append(", took[").append(recoveryTime).append("]\n");
            sb.append(" phase1: recovered_files [").append(recoveryResponse.phase1FileNames.size()).append("]")
                .append(" with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1TotalSize)).append("]")
                .append(", took [").append(timeValueMillis(recoveryResponse.phase1Time)).append("], throttling_wait [")
                .append(timeValueMillis(recoveryResponse.phase1ThrottlingWaitTime)).append(']').append("\n");
            sb.append(" : reusing_files [").append(recoveryResponse.phase1ExistingFileNames.size())
                .append("] with total_size of [").append(new ByteSizeValue(recoveryResponse.phase1ExistingTotalSize))
                .append("]\n");
            sb.append(" phase2: start took [").append(timeValueMillis(recoveryResponse.startTime)).append("]\n");
            sb.append(" : recovered [").append(recoveryResponse.phase2Operations).append("]")
                .append(" transaction log operations")
                .append(", took [").append(timeValueMillis(recoveryResponse.phase2Time)).append("]")
                .append("\n");
            logger.trace("{}", sb);
        } else {
            logger.debug("{} recovery done from [{}], took [{}]",
                request.shardId(), request.sourceNode(), recoveryTime);
        }
    }

    @Override
    public void handleException(TransportException e) {
        handleException.accept(e);
    }

    @Override
    public String executor() {
        // we do some heavy work like refreshes in the response so fork off to the generic threadpool
        return ThreadPool.Names.GENERIC;
    }

    @Override
    public RecoveryResponse read(StreamInput in) throws IOException {
        return new RecoveryResponse(in);
    }
})
);
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Exception e) {
handleException.accept(e);
}
}

@@ -476,7 +561,7 @@ public void onNewClusterState(ClusterState state) {
}
}

protected void onFailure(Exception e) {
void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
@@ -503,14 +588,18 @@ public void onTimeout(TimeValue timeout) {

private void waitForClusterState(long clusterStateVersion) {
final ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, TimeValue.timeValueMinutes(5), logger,
ClusterStateObserver observer = new ClusterStateObserver(
clusterState,
clusterService,
TimeValue.timeValueMinutes(5),
logger,
threadPool.getThreadContext());
if (clusterState.getVersion() >= clusterStateVersion) {
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
clusterState.getVersion());
return;
logger.trace("node has cluster state with version higher than {} (current: {})",
clusterStateVersion, clusterState.getVersion());
} else {
logger.trace("waiting for cluster state version {} (current: {})", clusterStateVersion, clusterState.getVersion());
logger.trace("waiting for cluster state version {} (current: {})",
clusterStateVersion, clusterState.getVersion());
final PlainActionFuture<Long> future = new PlainActionFuture<>();
observer.waitForNextChange(new ClusterStateObserver.Listener() {

@@ -621,10 +710,12 @@ class RecoveryRunner extends AbstractRunnable {
public void onFailure(Exception e) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
if (recoveryRef != null) {
logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e);
onGoingRecoveries.failRecovery(recoveryId,
new RecoveryFailedException(recoveryRef.target().state(), "unexpected error", e),
true // be safe
logger.error(() -> new ParameterizedMessage(
"unexpected error during recovery [{}], failing shard", recoveryId), e);
onGoingRecoveries.failRecovery(
recoveryId,
new RecoveryFailedException(recoveryRef.target().state(), "unexpected error", e),
true // be safe
);
} else {
logger.debug(() -> new ParameterizedMessage(
Expand All @@ -638,5 +729,4 @@ public void doRun() {
doRecovery(recoveryId);
}
}

}
