Skip to content

Commit

Permalink
Make peer recovery clean files step async (#43787)
Browse files Browse the repository at this point in the history
Relates #36195
  • Loading branch information
dnhatn authored Jun 29, 2019
1 parent b33ffc1 commit a452fff
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanF

@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.CLEAN_FILES, request);
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -391,54 +391,58 @@ public void receiveFileInfo(List<String> phase1FileNames,
}

@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// their actual names; it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
} else {
assert indexShard.assertRetentionLeasesPersisted();
}

} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well. The source can handle this and checks
// its content on disk if possible.
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
state().getTranslog().totalOperations(totalTranslogOps);
// first, we go and move files that were created with the recovery id suffix to
// their actual names; it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);

if (indexShard.getRetentionLeases().leases().isEmpty()) {
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
} else {
assert indexShard.assertRetentionLeasesPersisted();
}

} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well. The source can handle this and checks
// its content on disk if possible.
try {
store.removeCorruptionMarker();
} finally {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
try {
store.removeCorruptionMarker();
} finally {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
}
} catch (Exception e) {
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
}
} catch (Exception e) {
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} catch (Exception ex) {
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} finally {
store.decRef();
}
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} catch (Exception ex) {
RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
fail(rfe, true);
throw rfe;
} finally {
store.decRef();
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;

public interface RecoveryTargetHandler {
Expand Down Expand Up @@ -99,7 +98,7 @@ void receiveFileInfo(List<String> phase1FileNames,
* @param globalCheckpoint the global checkpoint on the primary
* @param sourceMetaData meta data of the source store
* @param listener the listener notified when the clean-files step completes or fails
*/
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener);

/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,15 @@ public void run() {
(indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
Store.MetadataSnapshot sourceMetaData) throws IOException {
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener) {
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, ActionListener.runAfter(listener, () -> {
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}));
}
});
future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,9 +848,10 @@ public void indexTranslogOperations(
}

@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
blockIfNeeded(RecoveryState.Stage.INDEX);
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -189,7 +190,10 @@ public void testWriteFileChunksConcurrently() throws Exception {
for (Thread sender : senders) {
sender.join();
}
recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
PlainActionFuture<Void> cleanFilesFuture = new PlainActionFuture<>();
recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)),
sourceSnapshot, cleanFilesFuture);
cleanFilesFuture.actionGet();
recoveryTarget.decRef();
Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.zip.CRC32;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -478,9 +477,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
between(1, 8)) {

@Override
public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
phase1Called.set(true);
return super.phase1(snapshot, globalCheckpoint, translogOps);
super.phase1(snapshot, globalCheckpoint, translogOps, listener);
}

@Override
Expand Down Expand Up @@ -758,7 +757,8 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -335,9 +334,10 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
}
@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
assertThat(globalCheckpoint, equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
}
}, true, true);
List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -75,8 +74,9 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
}

@Override
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
ActionListener<Void> listener) {
executor.execute(() -> target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener));
}

@Override
Expand Down

0 comments on commit a452fff

Please sign in to comment.