Do not send recovery requests with CancellableThreads #46287

Merged 2 commits on Sep 4, 2019
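Every hunk in this diff makes the same substitution: instead of dispatching a recovery request from inside cancellableThreads.execute(...) (or executeIO(...)), the handler now calls cancellableThreads.checkForCancel() and then issues the request directly, with completion and failure still reported through the request's ActionListener. Below is a minimal sketch of the two shapes under that reading; RemoteTarget, Listener, and send() are hypothetical stand-ins for the recovery classes, while CancellableThreads is the Elasticsearch utility actually used in the diff.

// Minimal sketch of the pattern this PR changes. RemoteTarget, Listener, and send()
// are hypothetical stand-ins; CancellableThreads is the Elasticsearch utility class
// referenced in the diff.
import org.elasticsearch.common.util.CancellableThreads;

class RecoveryRequestPatternSketch {

    interface Listener { void onResponse(Void value); void onFailure(Exception e); }

    interface RemoteTarget { void send(Listener listener); } // asynchronous; completes via the listener

    private final CancellableThreads cancellableThreads = new CancellableThreads();

    // Old shape: the dispatch runs inside execute(), which registers the calling thread
    // so a cancellation can interrupt it while the lambda is running.
    void sendWrapped(RemoteTarget target, Listener listener) {
        cancellableThreads.execute(() -> target.send(listener));
    }

    // New shape: fail fast if the recovery has already been cancelled, then hand the
    // request off directly; the listener still carries the response or failure.
    void sendUnwrapped(RemoteTarget target, Listener listener) {
        cancellableThreads.checkForCancel();
        target.send(listener);
    }
}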
@@ -517,9 +517,9 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
 final StepListener<Void> sendFilesStep = new StepListener<>();
 final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
 final StepListener<Void> cleanFilesStep = new StepListener<>();
-cancellableThreads.execute(() ->
-    recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
-        phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep));
+cancellableThreads.checkForCancel();
+recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
+    phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);
 
 sendFileInfoStep.whenComplete(r ->
     sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure);
@@ -634,8 +634,8 @@ void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> li
 // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
 // garbage collection (not the JVM's GC!) of tombstone deletes.
 logger.trace("recovery [phase1]: prepare remote engine for translog");
-cancellableThreads.execute(() ->
-    recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener));
+cancellableThreads.checkForCancel();
+recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
 }
 
 /**
@@ -741,30 +741,29 @@ private void sendBatch(
 final List<Translog.Operation> operations = nextBatch.get();
 // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
 if (operations.isEmpty() == false || firstBatch) {
-    cancellableThreads.execute(() -> {
-        recoveryTarget.indexTranslogOperations(
-            operations,
-            totalTranslogOps,
-            maxSeenAutoIdTimestamp,
-            maxSeqNoOfUpdatesOrDeletes,
-            retentionLeases,
-            mappingVersionOnPrimary,
-            ActionListener.wrap(
-                newCheckpoint -> {
-                    sendBatch(
-                        nextBatch,
-                        false,
-                        SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
-                        totalTranslogOps,
-                        maxSeenAutoIdTimestamp,
-                        maxSeqNoOfUpdatesOrDeletes,
-                        retentionLeases,
-                        mappingVersionOnPrimary,
-                        listener);
-                },
-                listener::onFailure
-            ));
-    });
+    cancellableThreads.checkForCancel();
+    recoveryTarget.indexTranslogOperations(
+        operations,
+        totalTranslogOps,
+        maxSeenAutoIdTimestamp,
+        maxSeqNoOfUpdatesOrDeletes,
+        retentionLeases,
+        mappingVersionOnPrimary,
+        ActionListener.wrap(
+            newCheckpoint -> {
+                sendBatch(
+                    nextBatch,
+                    false,
+                    SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
+                    totalTranslogOps,
+                    maxSeenAutoIdTimestamp,
+                    maxSeqNoOfUpdatesOrDeletes,
+                    retentionLeases,
+                    mappingVersionOnPrimary,
+                    listener);
+            },
+            listener::onFailure
+        ));
 } else {
     listener.onResponse(targetLocalCheckpoint);
 }
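The sendBatch hunk above preserves the existing control flow: each batch is sent to the target, and the success path of the wrapped listener calls sendBatch again with the next batch and the highest checkpoint seen so far, until an empty batch (after the first one) ends the recursion by reporting the final local checkpoint. A stripped-down sketch of that loop, with hypothetical types standing in for recoveryTarget and ActionListener:

// Stripped-down sketch of the recursive batch loop in sendBatch() above. Target, the
// String operations, and the callbacks are hypothetical stand-ins, not the real recovery API.
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

class SendBatchSketch {

    interface Target {
        // Asynchronously index one batch; reports the target's local checkpoint on success.
        void indexOperations(List<String> operations, LongConsumer onCheckpoint, Consumer<Exception> onFailure);
    }

    static void sendBatch(Supplier<List<String>> nextBatch, boolean firstBatch, long localCheckpoint,
                          Target target, LongConsumer onDone, Consumer<Exception> onFailure) {
        List<String> operations = nextBatch.get();
        // Send the leftover operations, or, if nothing was sent yet, an empty batch so the
        // target still responds with its local checkpoint (as the comment in the diff notes).
        if (operations.isEmpty() == false || firstBatch) {
            target.indexOperations(operations,
                newCheckpoint -> sendBatch(nextBatch, false, Math.max(localCheckpoint, newCheckpoint),
                    target, onDone, onFailure),
                onFailure);
        } else {
            onDone.accept(localCheckpoint); // recursion ends: hand back the final checkpoint
        }
    }
}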
Expand All @@ -787,7 +786,8 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
cancellableThreads.checkForCancel();
recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
@@ -894,8 +894,9 @@ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {
 
 @Override
 protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
-    cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
-        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
+    cancellableThreads.checkForCancel();
+    recoveryTarget.writeFileChunk(
+        request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);
 }
 
 @Override
@@ -922,13 +923,14 @@ private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntS
 // Once the files have been renamed, any other files that are not
 // related to this recovery (out of date segments, for example)
 // are deleted
-cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+cancellableThreads.checkForCancel();
+recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
     ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
         StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
         ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
         handleErrorOnSendFiles(store, e, mds);
         throw e;
-    }))));
+    })));
 }
 
 private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
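In the cleanFiles hunk, the failure handling wired through ActionListener.delegateResponse is unchanged: if the clean-up request fails, the source file metadata is sorted by length so small files are checked first, handleErrorOnSendFiles inspects them, and the original exception is rethrown to the listener. A simplified sketch of that failure path; FileMeta and FailureHandler are hypothetical stand-ins for StoreFileMetaData and handleErrorOnSendFiles.

// Simplified sketch of the cleanFiles failure path shown above. FileMeta and FailureHandler
// are hypothetical stand-ins; only the sort order and the rethrow are taken from the diff.
import java.util.Arrays;
import java.util.Comparator;

class CleanFilesFailureSketch {

    static final class FileMeta {
        final String name;
        final long length;
        FileMeta(String name, long length) { this.name = name; this.length = length; }
        long length() { return length; }
    }

    interface FailureHandler { void handle(Exception error, FileMeta[] files) throws Exception; }

    static void onCleanFilesFailure(Exception error, FileMeta[] sourceFiles, FailureHandler handler) throws Exception {
        FileMeta[] sorted = Arrays.copyOf(sourceFiles, sourceFiles.length);
        Arrays.sort(sorted, Comparator.comparingLong(FileMeta::length)); // check small files first
        handler.handle(error, sorted); // let the handler diagnose the files before propagating
        throw error;                   // the original failure still reaches the caller's listener
    }
}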