diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 6b1a893667f2c..50abeb2fb7a45 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -544,10 +544,10 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> {
         public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
             try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
-                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot());
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+                final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.CLEAN_FILES, request);
+                recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(),
+                    ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
             }
         }
     }
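Note: the rewritten handler above replies through an `ActionListener` bound to the transport channel instead of calling `channel.sendResponse` inline once `cleanFiles` returns. A minimal sketch of the `ActionListener.map` conversion it relies on; the listener and values here are illustrative, not taken from the recovery code:

```java
import org.elasticsearch.action.ActionListener;

class MapIdiomSketch {
    public static void main(String[] args) {
        // The inner listener stands in for the ChannelActionListener that
        // sends the transport response; failures pass through unchanged.
        ActionListener<String> reply = ActionListener.wrap(
            r -> System.out.println("responded with: " + r),
            e -> System.err.println("request failed: " + e));
        // map(...) adapts it to the ActionListener<Void> that the new
        // asynchronous cleanFiles expects, converting null -> "EMPTY".
        ActionListener<Void> cleanFilesListener = ActionListener.map(reply, ignored -> "EMPTY");
        cleanFilesListener.onResponse(null); // prints "responded with: EMPTY"
    }
}
```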
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index f3e10c13c21da..959c13ccb89cf 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -40,6 +40,7 @@
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -75,7 +76,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.IntSupplier;
 import java.util.stream.StreamSupport;

 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
@@ -160,15 +161,21 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             final long startingSeqNo;
             final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                 && isTargetSameHistory()
                 && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
-            final SendFileResult sendFileResult;
+
+            final StepListener<SendFileResult> sendFileStep = new StepListener<>();
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            final StepListener<Void> finalizeStep = new StepListener<>();
+
             if (isSequenceNumberBasedRecovery) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
                 startingSeqNo = request.startingSeqNo();
-                sendFileResult = SendFileResult.EMPTY;
+                sendFileStep.onResponse(SendFileResult.EMPTY);
             } else {
-                final Engine.IndexCommitRef phase1Snapshot;
+                final Engine.IndexCommitRef safeCommitRef;
                 try {
-                    phase1Snapshot = shard.acquireSafeIndexCommit();
+                    safeCommitRef = shard.acquireSafeIndexCommit();
+                    resources.add(safeCommitRef);
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }
@@ -177,24 +184,29 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-                    sendFileResult = phase1(phase1Snapshot.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps);
+                    shard.store().incRef();
+                    final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+                    resources.add(releaseStore);
+                    sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
+                        try {
+                            IOUtils.close(safeCommitRef, releaseStore);
+                        } catch (final IOException ex) {
+                            logger.warn("releasing snapshot caused exception", ex);
+                        }
+                    });
+                    phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
                 } catch (final Exception e) {
-                    throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
-                } finally {
-                    try {
-                        IOUtils.close(phase1Snapshot);
-                    } catch (final IOException ex) {
-                        logger.warn("releasing snapshot caused exception", ex);
-                    }
+                    throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
                 }
             }
             assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

-            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
-            // For a sequence based recovery, the target can keep its local translog
-            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            sendFileStep.whenComplete(r -> {
+                // For a sequence based recovery, the target can keep its local translog
+                prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            }, onFailure);
+
             prepareEngineStep.whenComplete(prepareEngineTime -> {
                 /*
                  * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
@@ -231,12 +243,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             }, onFailure);

-            final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);

             finalizeStep.whenComplete(r -> {
                 final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
                 final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
+                final SendFileResult sendFileResult = sendFileStep.result();
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
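Note: `recoverToTarget` now drives the recovery phases through chained `StepListener`s. Each `whenComplete` registers the next phase, and any failure funnels into the shared `onFailure` handler. A stripped-down, self-contained sketch of that control flow; the step payloads and names are illustrative:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;

class StepChainSketch {
    public static void main(String[] args) {
        final StepListener<Long> sendFileStep = new StepListener<>();
        final StepListener<String> prepareEngineStep = new StepListener<>();
        final ActionListener<String> done = ActionListener.wrap(
            r -> System.out.println("recovery finished: " + r),
            e -> System.err.println("recovery failed: " + e));

        // Registering continuations before any step completes is safe:
        // a StepListener remembers its result and notifies late listeners.
        sendFileStep.whenComplete(bytes -> prepareEngineStep.onResponse("sent " + bytes + " bytes"),
            done::onFailure);
        prepareEngineStep.whenComplete(done::onResponse, done::onFailure);

        sendFileStep.onResponse(42L); // kicks off the chain, as phase1(...) does above
    }
}
```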
@@ -333,18 +345,17 @@ static final class SendFileResult {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+    void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
-        long totalSize = 0;
+        long totalSizeInBytes = 0;
         // Total size of segment files that were able to be re-used
-        long existingTotalSize = 0;
+        long existingTotalSizeInBytes = 0;
         final List<String> phase1FileNames = new ArrayList<>();
         final List<Long> phase1FileSizes = new ArrayList<>();
         final List<String> phase1ExistingFileNames = new ArrayList<>();
         final List<Long> phase1ExistingFileSizes = new ArrayList<>();
         final Store store = shard.store();
-        store.incRef();
         try {
             StopWatch stopWatch = new StopWatch().start();
             final Store.MetadataSnapshot recoverySourceMetadata;
@@ -370,12 +381,12 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
             for (StoreFileMetaData md : diff.identical) {
                 phase1ExistingFileNames.add(md.name());
                 phase1ExistingFileSizes.add(md.length());
-                existingTotalSize += md.length();
+                existingTotalSizeInBytes += md.length();
                 if (logger.isTraceEnabled()) {
                     logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                         " size [{}]", md.name(), md.checksum(), md.length());
                 }
-                totalSize += md.length();
+                totalSizeInBytes += md.length();
             }
             List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
             phase1Files.addAll(diff.different);
@@ -389,75 +400,33 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
                 }
                 phase1FileNames.add(md.name());
                 phase1FileSizes.add(md.length());
-                totalSize += md.length();
+                totalSizeInBytes += md.length();
             }
             logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
-                phase1FileNames.size(), new ByteSizeValue(totalSize),
-                phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
+                phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
+                phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
             cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
-                phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
+                phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt()));
             sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
-            // Send the CLEAN_FILES request, which takes all of the files that
-            // were transferred and renames them from their temporary file
-            // names to the actual file names. It also writes checksums for
-            // the files after they have been renamed.
-            //
-            // Once the files have been renamed, any other files that are not
-            // related to this recovery (out of date segments, for example)
-            // are deleted
-            try {
-                cancellableThreads.executeIO(() ->
-                    recoveryTarget.cleanFiles(translogOps.get(), globalCheckpoint, recoverySourceMetadata));
-            } catch (RemoteTransportException | IOException targetException) {
-                final IOException corruptIndexException;
-                // we realized that after the index was copied and we wanted to finalize the recovery
-                // the index was corrupted:
-                //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
-                //   - maybe due to old segments without checksums or length only checks
-                if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(targetException)) != null) {
-                    try {
-                        final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
-                        StoreFileMetaData[] metadata =
-                            StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
-                        ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
-                        for (StoreFileMetaData md : metadata) {
-                            cancellableThreads.checkForCancel();
-                            logger.debug("checking integrity for file {} after remove corruption exception", md);
-                            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                                shard.failShard("recovery", corruptIndexException);
-                                logger.warn("Corrupted file detected {} checksum mismatch", md);
-                                throw corruptIndexException;
-                            }
-                        }
-                    } catch (IOException ex) {
-                        targetException.addSuppressed(ex);
-                        throw targetException;
-                    }
-                    // corruption has happened on the way to replica
-                    RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but " +
-                        "checksums are ok", null);
-                    exception.addSuppressed(targetException);
-                    logger.warn(() -> new ParameterizedMessage(
-                        "{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
-                        shard.shardId(), request.targetNode()), corruptIndexException);
-                    throw exception;
-                } else {
-                    throw targetException;
-                }
-            }
+            final long totalSize = totalSizeInBytes;
+            final long existingTotalSize = existingTotalSizeInBytes;
+            cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, ActionListener.map(listener, aVoid -> {
+                final TimeValue took = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: took [{}]", took);
+                return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
+                    phase1ExistingFileSizes, existingTotalSize, took);
+            }));
         } else {
             logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
                 recoverySourceMetadata.getSyncId());
+            final TimeValue took = stopWatch.totalTime();
+            logger.trace("recovery [phase1]: took [{}]", took);
+            listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames,
+                phase1ExistingFileSizes, existingTotalSizeInBytes, took));
         }
-            final TimeValue took = stopWatch.totalTime();
-            logger.trace("recovery [phase1]: took [{}]", took);
-            return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
-                phase1ExistingFileSizes, existingTotalSize, took);
         } catch (Exception e) {
-            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
-        } finally {
-            store.decRef();
+            throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e);
         }
     }
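Note, one Java detail in the new tail of `phase1`: `totalSizeInBytes` and `existingTotalSizeInBytes` are mutated while the file lists are built, so they are copied into effectively-final locals (`totalSize`, `existingTotalSize`) before being captured by the `ActionListener.map` lambda. A standalone illustration of the constraint:

```java
import java.util.function.LongSupplier;

class CaptureSketch {
    static LongSupplier capture() {
        long totalSizeInBytes = 0;
        totalSizeInBytes += 42;                  // mutated: not effectively final
        final long totalSize = totalSizeInBytes; // copy is effectively final
        // return () -> totalSizeInBytes;        // would not compile
        return () -> totalSize;                  // a lambda may capture the copy
    }
}
```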
@@ -695,7 +664,7 @@ public String toString() {
                 '}';
     }

-    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
+    void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception {
         ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
         final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
         final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
@@ -720,7 +689,7 @@ void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translo
                     }
                     final long requestFilePosition = position;
                     cancellableThreads.executeIO(() ->
-                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
+                        recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(),
                             ActionListener.wrap(
                                 r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId),
                                 e -> {
@@ -741,24 +710,53 @@ void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translo
             cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
         }
         if (error.get() != null) {
-            handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());
+            handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()});
         }
     }

-    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
-        final IOException corruptIndexException;
-        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-            if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
-                logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
-                failEngine(corruptIndexException);
-                throw corruptIndexException;
+    private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps,
+                            long globalCheckpoint, ActionListener<Void> listener) {
+        // Send the CLEAN_FILES request, which takes all of the files that
+        // were transferred and renames them from their temporary file
+        // names to the actual file names. It also writes checksums for
+        // the files after they have been renamed.
+        //
+        // Once the files have been renamed, any other files that are not
+        // related to this recovery (out of date segments, for example)
+        // are deleted
+        cancellableThreads.execute(() -> recoveryTarget.cleanFiles(translogOps.getAsInt(), globalCheckpoint, sourceMetadata,
+            ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
+                StoreFileMetaData[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetaData[]::new);
+                ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
+                handleErrorOnSendFiles(store, e, mds);
+                throw e;
+            }))));
+    }
+
+    private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception {
+        final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
+        if (corruptIndexException != null) {
+            Exception localException = null;
+            for (StoreFileMetaData md : mds) {
+                cancellableThreads.checkForCancel();
+                logger.debug("checking integrity for file {} after remove corruption exception", md);
+                if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                    logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
+                    if (localException == null) {
+                        localException = corruptIndexException;
+                    }
+                    failEngine(corruptIndexException);
+                }
+            }
+            if (localException != null) {
+                throw localException;
             } else { // corruption has happened on the way to replica
-                RemoteTransportException exception = new RemoteTransportException(
+                RemoteTransportException remoteException = new RemoteTransportException(
                     "File corruption occurred on recovery but checksums are ok", null);
-                exception.addSuppressed(e);
+                remoteException.addSuppressed(e);
                 logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
-                    shardId, request.targetNode(), md), corruptIndexException);
-                throw exception;
+                    shardId, request.targetNode(), mds), corruptIndexException);
+                throw remoteException;
             }
         } else {
             throw e;
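Note: the new source-side `cleanFiles` decorates only the failure path. `ActionListener.delegateResponse` leaves successes untouched, and `ActionListener.completeWith` lets the failure handler stay in throwing style. A minimal sketch of that composition, with `checkOnFailure` as an illustrative stand-in for the `handleErrorOnSendFiles` integrity check:

```java
import org.elasticsearch.action.ActionListener;

class FailureDecorationSketch {
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Success flows straight to the delegate; a failure first runs the check,
    // which may throw something more specific (e.g. a corruption exception),
    // otherwise the original exception is rethrown and delivered to onFailure.
    static ActionListener<Void> decorate(ActionListener<Void> delegate,
                                         CheckedConsumer<Exception> checkOnFailure) {
        return ActionListener.delegateResponse(delegate, (l, e) ->
            ActionListener.completeWith(l, () -> {
                checkOnFailure.accept(e);
                throw e;
            }));
    }
}
```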
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index bbd0cea04af72..55aa5b225959c 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -392,57 +392,61 @@ public void receiveFileInfo(List<String> phase1FileNames,
     }

     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        state().getTranslog().totalOperations(totalTranslogOps);
-        // first, we go and move files that were created with the recovery id suffix to
-        // the actual names, its ok if we have a corrupted index here, since we have replicas
-        // to recover from in case of a full cluster shutdown just when this code executes...
-        multiFileWriter.renameAllTempFiles();
-        final Store store = store();
-        store.incRef();
-        try {
-            store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
-            if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
-                store.ensureIndexHasHistoryUUID();
-            }
-            final String translogUUID = Translog.createEmptyTranslog(
-                indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
-            store.associateIndexWithNewTranslog(translogUUID);
-
-            if (indexShard.getRetentionLeases().leases().isEmpty()) {
-                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
-                indexShard.persistRetentionLeases();
-                assert indexShard.loadRetentionLeases().leases().isEmpty();
-            } else {
-                assert indexShard.assertRetentionLeasesPersisted();
-            }
-
-        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
-            // this is a fatal exception at this stage.
-            // this means we transferred files from the remote that have not be checksummed and they are
-            // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
-            // source shard since this index might be broken there as well? The Source can handle this and checks
-            // its content on disk if possible.
-            try {
-                try {
-                    store.removeCorruptionMarker();
-                } finally {
-                    Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
-                }
-            } catch (Exception e) {
-                logger.debug("Failed to clean lucene index", e);
-                ex.addSuppressed(e);
-            }
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } catch (Exception ex) {
-            RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
-            fail(rfe, true);
-            throw rfe;
-        } finally {
-            store.decRef();
-        }
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            // first, we go and move files that were created with the recovery id suffix to
+            // the actual names, its ok if we have a corrupted index here, since we have replicas
+            // to recover from in case of a full cluster shutdown just when this code executes...
+            multiFileWriter.renameAllTempFiles();
+            final Store store = store();
+            store.incRef();
+            try {
+                store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
+                if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
+                    store.ensureIndexHasHistoryUUID();
+                }
+                final String translogUUID = Translog.createEmptyTranslog(
+                    indexShard.shardPath().resolveTranslog(), globalCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
+                store.associateIndexWithNewTranslog(translogUUID);
+
+                if (indexShard.getRetentionLeases().leases().isEmpty()) {
+                    // if empty, may be a fresh IndexShard, so write an empty leases file to disk
+                    indexShard.persistRetentionLeases();
+                    assert indexShard.loadRetentionLeases().leases().isEmpty();
+                } else {
+                    assert indexShard.assertRetentionLeasesPersisted();
+                }
+
+            } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
+                // this is a fatal exception at this stage.
+                // this means we transferred files from the remote that have not be checksummed and they are
+                // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
+                // source shard since this index might be broken there as well? The Source can handle this and checks
+                // its content on disk if possible.
+                try {
+                    try {
+                        store.removeCorruptionMarker();
+                    } finally {
+                        Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
+                    }
+                } catch (Exception e) {
+                    logger.debug("Failed to clean lucene index", e);
+                    ex.addSuppressed(e);
+                }
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } catch (Exception ex) {
+                RecoveryFailedException rfe = new RecoveryFailedException(state(), "failed to clean after recovery", ex);
+                fail(rfe, true);
+                throw rfe;
+            } finally {
+                store.decRef();
+            }
+            return null;
+        });
     }
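Note: on the target side the method body stays written in plain synchronous, throwing style; `ActionListener.completeWith` is what adapts it to the listener-based signature. The shape in isolation, with the work inside purely illustrative:

```java
import org.elasticsearch.action.ActionListener;

class CompleteWithSketch {
    // completeWith runs the block immediately on the calling thread: a normal
    // return (null for Void) becomes listener.onResponse, a thrown exception
    // becomes listener.onFailure, so callers see a uniform async contract.
    static void cleanSomething(ActionListener<Void> listener) {
        ActionListener.completeWith(listener, () -> {
            if (System.nanoTime() == 0) {                // illustrative failure path
                throw new IllegalStateException("boom"); // -> listener.onFailure
            }
            return null;                                 // -> listener.onResponse(null)
        });
    }
}
```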
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index d03fe42d90146..89f4cb22c2bbf 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;

-import java.io.IOException;
 import java.util.List;

 public interface RecoveryTargetHandler {
@@ -99,7 +98,7 @@ void receiveFileInfo(List<String> phase1FileNames,
      * @param globalCheckpoint the global checkpoint on the primary
      * @param sourceMetaData   meta data of the source store
      */
-    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException;
+    void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener);

     /** writes a partial file chunk to the target store */
     void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index ec3c22d42a191..6b786fdae4d86 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -140,11 +140,13 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
     }

     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.CLEAN_FILES,
             new RecoveryCleanFilesRequest(recoveryId, shardId, sourceMetaData, totalTranslogOps, globalCheckpoint),
             TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }

     @Override
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index e25557eaabcf6..97e5210c9d09e 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -122,14 +122,15 @@ public void run() {
             (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
                 @Override
                 public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
-                                       Store.MetadataSnapshot sourceMetaData) throws IOException {
-                    super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
-                    latch.countDown();
-                    try {
-                        latch.await();
-                    } catch (InterruptedException e) {
-                        throw new AssertionError(e);
-                    }
+                                       Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener) {
+                    super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, ActionListener.runAfter(listener, () -> {
+                        latch.countDown();
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                    }));
                 }
             });
         future.get();
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index c94c289f51fb1..c60f32132c646 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -848,9 +848,10 @@ public void indexTranslogOperations(
         }

         @Override
-        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                               ActionListener<Void> listener) {
             blockIfNeeded(RecoveryState.Stage.INDEX);
-            super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+            super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
         }

         @Override
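Note: the test overrides above keep their latch coordination by wrapping the listener with `ActionListener.runAfter`, which runs a side effect once the wrapped listener has completed, on either outcome. Reduced to a reusable shape (latch and listener types are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import org.elasticsearch.action.ActionListener;

class RunAfterSketch {
    static ActionListener<Void> gate(ActionListener<Void> listener, CountDownLatch latch) {
        // The Runnable fires after onResponse or onFailure has been delivered,
        // so the rendezvous no longer blocks inside cleanFiles itself.
        return ActionListener.runAfter(listener, () -> {
            latch.countDown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        });
    }
}
```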
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index bb4c25e6186de..5a6d7fbaa176a 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -28,6 +28,7 @@
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
@@ -189,7 +190,10 @@ public void testWriteFileChunksConcurrently() throws Exception {
         for (Thread sender : senders) {
             sender.join();
         }
-        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)), sourceSnapshot);
+        PlainActionFuture<Void> cleanFilesFuture = new PlainActionFuture<>();
+        recoveryTarget.cleanFiles(0, Long.parseLong(sourceSnapshot.getCommitUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+            sourceSnapshot, cleanFilesFuture);
+        cleanFilesFuture.actionGet();
         recoveryTarget.decRef();
         Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
         Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index b00e89575ccd5..b69033ba9b4c4 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -98,7 +98,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.IntSupplier;
-import java.util.function.Supplier;
 import java.util.zip.CRC32;

 import static java.util.Collections.emptyMap;
@@ -478,9 +477,9 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
                 between(1, 8)) {

                 @Override
-                public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckpoint, final Supplier<Integer> translogOps) {
+                void phase1(IndexCommit snapshot, long globalCheckpoint, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
                     phase1Called.set(true);
-                    return super.phase1(snapshot, globalCheckpoint, translogOps);
+                    super.phase1(snapshot, globalCheckpoint, translogOps, listener);
                 }

                 @Override
@@ -758,7 +757,8 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
         }

         @Override
-        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) {
+        public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                               ActionListener<Void> listener) {
         }

         @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index c3f6a3aae89fb..28e84c1210a29 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -47,7 +47,6 @@
 import org.elasticsearch.index.translog.SnapshotMatchers;
 import org.elasticsearch.index.translog.Translog;

-import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -335,9 +334,10 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
                 assertThat(replicaShard.getLastKnownGlobalCheckpoint(), equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
             }

             @Override
-            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
+            public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                                   ActionListener<Void> listener) {
                 assertThat(globalCheckpoint, equalTo(primaryShard.getLastKnownGlobalCheckpoint()));
-                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+                super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener);
             }
         }, true, true);
         List<IndexCommit> commits = DirectoryReader.listCommits(replicaShard.store().directory());
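Note: where a test needs the old blocking behaviour, it adapts the async call with `PlainActionFuture`, which implements both `ActionListener` and `Future`. The pattern from `testWriteFileChunksConcurrently`, reduced to its shape (`callCleanFilesAsync` is an illustrative stand-in for the actual call):

```java
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;

class BlockingAdapterSketch {
    static void runAndWait(Consumer<ActionListener<Void>> callCleanFilesAsync) {
        PlainActionFuture<Void> future = new PlainActionFuture<>();
        callCleanFilesAsync.accept(future); // the future is the ActionListener<Void>
        future.actionGet();                 // blocks; rethrows any onFailure exception
    }
}
```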
diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
index d5a7ab8109e12..cf2b768f46d85 100644
--- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
@@ -29,7 +29,6 @@
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;

-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Executor;

@@ -75,8 +74,9 @@ public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileS
     }

     @Override
-    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData) throws IOException {
-        target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData);
+    public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData,
+                           ActionListener<Void> listener) {
+        executor.execute(() -> target.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, listener));
     }

     @Override
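Note: the test-framework wrapper above dispatches `cleanFiles` onto its executor, so the listener completes on a different thread and callers are forced through the genuinely asynchronous path. The dispatch pattern in isolation; the completed work here is an illustrative stand-in for the wrapped delegate call:

```java
import java.util.concurrent.Executor;
import org.elasticsearch.action.ActionListener;

class AsyncDispatchSketch {
    private final Executor executor;

    AsyncDispatchSketch(Executor executor) {
        this.executor = executor;
    }

    // The caller returns immediately; the listener is completed from the
    // executor thread, mimicking a response arriving over the transport.
    void cleanFilesAsync(ActionListener<Void> listener) {
        executor.execute(() -> ActionListener.completeWith(listener, () -> null));
    }
}
```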