From ab2a145c266ae7db711e80820c327ef3913b11c3 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 20 Jun 2022 09:18:52 -0700 Subject: [PATCH] Cherry-picking https://github.com/opensearch-project/OpenSearch/pull/3525. Signed-off-by: Rishikesh1159 --- .../indices/RunUnderPrimaryPermit.java | 72 +++++ .../indices/recovery/FileChunkWriter.java | 31 +++ .../recovery/RecoverySourceHandler.java | 259 +++--------------- .../recovery/RecoveryTargetHandler.java | 14 +- .../recovery/RemoteRecoveryTargetHandler.java | 81 ++---- .../recovery/RetryableTransportClient.java | 2 +- .../replication/GetSegmentFilesRequest.java | 4 + .../OngoingSegmentReplications.java | 230 ++++++++++++++++ .../RemoteSegmentFileChunkWriter.java | 125 +++++++++ .../SegmentFileTransferHandler.java | 239 ++++++++++++++++ .../SegmentReplicationSourceHandler.java | 170 ++++++++++++ .../SegmentReplicationSourceService.java | 140 +++++----- .../indices/replication/common/CopyState.java | 16 +- .../SegmentReplicationTransportRequest.java | 25 ++ .../main/java/org/opensearch/node/Node.java | 7 + .../recovery/RecoverySourceHandlerTests.java | 38 +-- .../OngoingSegmentReplicationsTests.java | 231 ++++++++++++++++ .../SegmentFileTransferHandlerTests.java | 251 +++++++++++++++++ .../SegmentReplicationSourceHandlerTests.java | 193 +++++++++++++ .../SegmentReplicationSourceServiceTests.java | 104 ++++--- .../replication/common/CopyStateTests.java | 10 +- 21 files changed, 1812 insertions(+), 430 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java diff --git a/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java new file mode 100644 index 0000000000000..29cac1601dc67 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.FutureUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRelocatedException; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.CompletableFuture; + +/** + * Execute a Runnable after acquiring the primary's operation permit. + * + * @opensearch.internal + */ +public final class RunUnderPrimaryPermit { + + public static void run( + CancellableThreads.Interruptible runnable, + String reason, + IndexShard primary, + CancellableThreads cancellableThreads, + Logger logger + ) { + cancellableThreads.execute(() -> { + CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + } + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + }; + primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); + try (Releasable ignored = FutureUtils.get(permit)) { + // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent + // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() + if (primary.isRelocatedPrimary()) { + throw new IndexShardRelocatedException(primary.shardId()); + } + runnable.run(); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } + }); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java new file mode 100644 index 0000000000000..cb43af3b82e09 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.index.store.StoreFileMetadata; + +/** + * Writes a partial file chunk to the target store. + * + * @opensearch.internal + */ +@FunctionalInterface +public interface FileChunkWriter { + + void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ); +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 0870fd4ca9295..9e219db5a4c96 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -33,17 +33,13 @@ package org.opensearch.indices.recovery; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.SetOnce; -import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; @@ -55,13 +51,10 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.StopWatch; -import org.opensearch.common.bytes.BytesArray; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.lucene.store.InputStreamIndexInput; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; @@ -77,26 +70,22 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; -import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.replication.SegmentFileTransferHandler; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transports; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Deque; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -128,13 +117,13 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; - private final int maxConcurrentFileChunks; private final int maxConcurrentOperations; private final ThreadPool threadPool; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public static final String PEER_RECOVERY_NAME = "peer-recovery"; + private final SegmentFileTransferHandler transferHandler; public RecoverySourceHandler( IndexShard shard, @@ -145,15 +134,24 @@ public RecoverySourceHandler( int maxConcurrentFileChunks, int maxConcurrentOperations ) { + this.logger = Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()); + this.transferHandler = new SegmentFileTransferHandler( + shard, + request.targetNode(), + recoveryTarget, + logger, + threadPool, + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); this.shard = shard; - this.recoveryTarget = recoveryTarget; this.threadPool = threadPool; this.request = request; + this.recoveryTarget = recoveryTarget; this.shardId = this.request.shardId().id(); - this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. - this.maxConcurrentFileChunks = maxConcurrentFileChunks; this.maxConcurrentOperations = maxConcurrentOperations; } @@ -192,7 +190,7 @@ public void recoverToTarget(ActionListener listener) { final SetOnce retentionLeaseRef = new SetOnce<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); if (targetShardRouting == null) { @@ -286,7 +284,7 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a @@ -332,7 +330,7 @@ && isTargetSameHistory() * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. */ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.initiateTracking(request.targetAllocationId()), shardId + " initiating tracking of " + request.targetAllocationId(), shard, @@ -420,50 +418,6 @@ private int countNumberOfHistoryOperations(long startingSeqNo) throws IOExceptio return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); } - static void runUnderPrimaryPermit( - CancellableThreads.Interruptible runnable, - String reason, - IndexShard primary, - CancellableThreads cancellableThreads, - Logger logger - ) { - cancellableThreads.execute(() -> { - CompletableFuture permit = new CompletableFuture<>(); - final ActionListener onAcquired = new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - if (permit.complete(releasable) == false) { - releasable.close(); - } - } - - @Override - public void onFailure(Exception e) { - permit.completeExceptionally(e); - } - }; - primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); - try (Releasable ignored = FutureUtils.get(permit)) { - // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent - // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() - if (primary.isRelocatedPrimary()) { - throw new IndexShardRelocatedException(primary.shardId()); - } - runnable.run(); - } finally { - // just in case we got an exception (likely interrupted) while waiting for the get - permit.whenComplete((r, e) -> { - if (r != null) { - r.close(); - } - if (e != null) { - logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); - } - }); - } - }); - } - /** * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool. * We must never release the store using an interruptible thread as we can risk invalidating the node lock. @@ -708,8 +662,19 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } } + void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { + final MultiChunkTransfer transfer = transferHandler.createTransfer( + store, + files, + translogOps, + listener + ); + resources.add(transfer); + transfer.start(); + } + void createRetentionLease(final long startingSeqNo, ActionListener listener) { - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If @@ -983,7 +948,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), shardId + " marking " + request.targetAllocationId() + " as in sync", shard, @@ -995,7 +960,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis cancellableThreads.checkForCancel(); recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener); finalizeListener.whenComplete(r -> { - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, @@ -1056,121 +1021,6 @@ public String toString() { + '}'; } - /** - * A file chunk from the recovery source - * - * @opensearch.internal - */ - private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { - final StoreFileMetadata md; - final BytesReference content; - final long position; - final boolean lastChunk; - final Releasable onClose; - - FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { - this.md = md; - this.content = content; - this.position = position; - this.lastChunk = lastChunk; - this.onClose = onClose; - } - - @Override - public boolean lastChunk() { - return lastChunk; - } - - @Override - public void close() { - onClose.close(); - } - } - - void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { - ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first - - final MultiChunkTransfer multiFileSender = new MultiChunkTransfer( - logger, - threadPool.getThreadContext(), - listener, - maxConcurrentFileChunks, - Arrays.asList(files) - ) { - - final Deque buffers = new ConcurrentLinkedDeque<>(); - InputStreamIndexInput currentInput = null; - long offset = 0; - - @Override - protected void onNewResource(StoreFileMetadata md) throws IOException { - offset = 0; - IOUtils.close(currentInput, () -> currentInput = null); - final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); - currentInput = new InputStreamIndexInput(indexInput, md.length()) { - @Override - public void close() throws IOException { - IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop - } - }; - } - - private byte[] acquireBuffer() { - final byte[] buffer = buffers.pollFirst(); - if (buffer != null) { - return buffer; - } - return new byte[chunkSizeInBytes]; - } - - @Override - protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { - assert Transports.assertNotTransportThread("read file chunk"); - cancellableThreads.checkForCancel(); - final byte[] buffer = acquireBuffer(); - final int bytesRead = currentInput.read(buffer); - if (bytesRead == -1) { - throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); - } - final boolean lastChunk = offset + bytesRead == md.length(); - final FileChunk chunk = new FileChunk( - md, - new BytesArray(buffer, 0, bytesRead), - offset, - lastChunk, - () -> buffers.addFirst(buffer) - ); - offset += bytesRead; - return chunk; - } - - @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener) { - cancellableThreads.checkForCancel(); - recoveryTarget.writeFileChunk( - request.md, - request.position, - request.content, - request.lastChunk, - translogOps.getAsInt(), - ActionListener.runBefore(listener, request::close) - ); - } - - @Override - protected void handleError(StoreFileMetadata md, Exception e) throws Exception { - handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); - } - - @Override - public void close() throws IOException { - IOUtils.close(currentInput, () -> currentInput = null); - } - }; - resources.add(multiFileSender); - multiFileSender.start(); - } - private void cleanFiles( Store store, Store.MetadataSnapshot sourceMetadata, @@ -1194,52 +1044,9 @@ private void cleanFiles( ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> { StoreFileMetadata[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetadata[]::new); ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetadata::length)); // check small files first - handleErrorOnSendFiles(store, e, mds); + transferHandler.handleErrorOnSendFiles(store, e, mds); throw e; })) ); } - - private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { - final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[handle error on send/clean files]"); - if (corruptIndexException != null) { - Exception localException = null; - for (StoreFileMetadata md : mds) { - cancellableThreads.checkForCancel(); - logger.debug("checking integrity for file {} after remove corruption exception", md); - if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! - logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); - if (localException == null) { - localException = corruptIndexException; - } - failEngine(corruptIndexException); - } - } - if (localException != null) { - throw localException; - } else { // corruption has happened on the way to replica - RemoteTransportException remoteException = new RemoteTransportException( - "File corruption occurred on recovery but checksums are ok", - null - ); - remoteException.addSuppressed(e); - logger.warn( - () -> new ParameterizedMessage( - "{} Remote file corruption on node {}, recovering {}. local checksum OK", - shardId, - request.targetNode(), - mds - ), - corruptIndexException - ); - throw remoteException; - } - } - throw e; - } - - protected void failEngine(IOException cause) { - shard.failShard("recovery", cause); - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index 84b6ec170d3f7..c750c0e88364b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -32,11 +32,9 @@ package org.opensearch.indices.recovery; import org.opensearch.action.ActionListener; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import java.util.List; @@ -46,7 +44,7 @@ * * @opensearch.internal */ -public interface RecoveryTargetHandler { +public interface RecoveryTargetHandler extends FileChunkWriter { /** * Prepares the target to receive translog operations, after all file have been copied @@ -123,15 +121,5 @@ void receiveFileInfo( */ void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener); - /** writes a partial file chunk to the target store */ - void writeFileChunk( - StoreFileMetadata fileMetadata, - long position, - BytesReference content, - boolean lastChunk, - int totalTranslogOps, - ActionListener listener - ); - default void cancel() {} } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index ab6466feb11f8..e7ae62c1bee7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -34,8 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.RateLimiter; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; @@ -46,12 +44,12 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter; import org.opensearch.transport.EmptyTransportResponseHandler; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -72,13 +70,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final RecoverySettings recoverySettings; private final TransportRequestOptions translogOpsRequestOptions; - private final TransportRequestOptions fileChunkRequestOptions; - private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); - - private final Consumer onSourceThrottle; private final RetryableTransportClient retryableTransportClient; + private final RemoteSegmentFileChunkWriter fileChunkWriter; public RemoteRecoveryTargetHandler( long recoveryId, @@ -102,15 +97,19 @@ public RemoteRecoveryTargetHandler( this.shardId = shardId; this.targetNode = targetNode; this.recoverySettings = recoverySettings; - this.onSourceThrottle = onSourceThrottle; this.translogOpsRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); - this.fileChunkRequestOptions = TransportRequestOptions.builder() - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); + this.fileChunkWriter = new RemoteSegmentFileChunkWriter( + recoveryId, + recoverySettings, + retryableTransportClient, + shardId, + PeerRecoveryTargetService.Actions.FILE_CHUNK, + requestSeqNoGenerator, + onSourceThrottle + ); } public DiscoveryNode targetNode() { @@ -235,6 +234,11 @@ public void cleanFiles( retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } + @Override + public void cancel() { + retryableTransportClient.cancel(); + } + @Override public void writeFileChunk( StoreFileMetadata fileMetadata, @@ -244,57 +248,6 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // Pause using the rate limiter, if desired, to throttle the recovery - final long throttleTimeInNanos; - // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); - if (rl != null) { - long bytes = bytesSinceLastPause.addAndGet(content.length()); - if (bytes > rl.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytes); - try { - throttleTimeInNanos = rl.pause(bytes); - onSourceThrottle.accept(throttleTimeInNanos); - } catch (IOException e) { - throw new OpenSearchException("failed to pause recovery", e); - } - } else { - throttleTimeInNanos = 0; - } - } else { - throttleTimeInNanos = 0; - } - - final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK; - final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. - */ - final FileChunkRequest request = new FileChunkRequest( - recoveryId, - requestSeqNo, - shardId, - fileMetadata, - position, - content, - lastChunk, - totalTranslogOps, - throttleTimeInNanos - ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - retryableTransportClient.executeRetryableAction( - action, - request, - fileChunkRequestOptions, - ActionListener.map(listener, r -> null), - reader - ); - } - - @Override - public void cancel() { - retryableTransportClient.cancel(); + fileChunkWriter.writeFileChunk(fileMetadata, position, content, lastChunk, totalTranslogOps, listener); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java index bc10cc80b7fdc..a7113fb3fee5c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -74,7 +74,7 @@ public void executeRetryableAction( executeRetryableAction(action, request, options, actionListener, reader); } - void executeRetryableAction( + public void executeRetryableAction( String action, TransportRequest request, TransportRequestOptions options, diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java index 21749d3fe7d8a..daad33ed93f28 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -57,4 +57,8 @@ public void writeTo(StreamOutput out) throws IOException { public ReplicationCheckpoint getCheckpoint() { return checkpoint; } + + public List getFilesToFetch() { + return filesToFetch; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java new file mode 100644 index 0000000000000..6302d364fc6d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -0,0 +1,230 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Manages references to ongoing segrep events on a node. + * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. + * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. + * + * @opensearch.internal + */ +class OngoingSegmentReplications { + + private final RecoverySettings recoverySettings; + private final IndicesService indicesService; + private final Map copyStateMap; + private final Map nodesToHandlers; + + /** + * Constructor. + * + * @param indicesService {@link IndicesService} + * @param recoverySettings {@link RecoverySettings} + */ + OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { + this.indicesService = indicesService; + this.recoverySettings = recoverySettings; + this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); + this.nodesToHandlers = ConcurrentCollections.newConcurrentMap(); + } + + /** + * Operations on the {@link #copyStateMap} member. + */ + + /** + * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key + * and returns the cached value if one is present. If the key is not present, a {@link CopyState} + * object is constructed and stored in the map before being returned. + */ + synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { + if (isInCopyStateMap(checkpoint)) { + final CopyState copyState = fetchFromCopyStateMap(checkpoint); + // we incref the copyState for every replica that is using this checkpoint. + // decref will happen when copy completes. + copyState.incRef(); + return copyState; + } else { + // From the checkpoint's shard ID, fetch the IndexShard + ShardId shardId = checkpoint.getShardId(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + // build the CopyState object and cache it before returning + final CopyState copyState = new CopyState(checkpoint, indexShard); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); + return copyState; + } + } + + /** + * Start sending files to the replica. + * + * @param request {@link GetSegmentFilesRequest} + * @param listener {@link ActionListener} that resolves when sending files is complete. + */ + void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { + final DiscoveryNode node = request.getTargetNode(); + final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + if (handler != null) { + if (handler.isReplicating()) { + throw new OpenSearchException( + "Replication to shard {}, on node {} has already started", + request.getCheckpoint().getShardId(), + request.getTargetNode() + ); + } + // update the given listener to release the CopyState before it resolves. + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); + if (sourceHandler != null) { + removeCopyState(sourceHandler.getCopyState()); + } + }); + handler.sendFiles(request, wrappedListener); + } else { + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + } + + /** + * Cancel any ongoing replications for a given {@link DiscoveryNode} + * + * @param node {@link DiscoveryNode} node for which to cancel replication events. + */ + void cancelReplication(DiscoveryNode node) { + final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node); + if (handler != null) { + handler.cancel("Cancel on node left"); + removeCopyState(handler.getCopyState()); + } + } + + /** + * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current + * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its + * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas + * with the list of required files. + * + * @param request {@link CheckpointInfoRequest} + * @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer. + * @return {@link CopyState} the built CopyState for this replication event. + * @throws IOException - When there is an IO error building CopyState. + */ + CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { + final CopyState copyState = getCachedCopyState(request.getCheckpoint()); + if (nodesToHandlers.putIfAbsent( + request.getTargetNode(), + createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + ) != null) { + throw new OpenSearchException( + "Shard copy {} on node {} already replicating", + request.getCheckpoint().getShardId(), + request.getTargetNode() + ); + } + return copyState; + } + + /** + * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. + * + * @param shard {@link IndexShard} + * @param reason {@link String} - Reason for the cancel + */ + synchronized void cancel(IndexShard shard, String reason) { + for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { + if (entry.getCopyState().getShard().equals(shard)) { + entry.cancel(reason); + } + } + copyStateMap.clear(); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. + */ + boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.containsKey(replicationCheckpoint); + } + + int size() { + return nodesToHandlers.size(); + } + + int cachedCopyStateSize() { + return copyStateMap.size(); + } + + private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { + return new SegmentReplicationSourceHandler( + node, + fileChunkWriter, + copyState.getShard().getThreadPool(), + copyState, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } + + /** + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the CopyState's {@link ReplicationCheckpoint} object. + */ + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Remove a CopyState. Intended to be called after a replication event completes. + * This method will remove a copyState from the copyStateMap only if its refCount hits 0. + * + * @param copyState {@link CopyState} + */ + private synchronized void removeCopyState(CopyState copyState) { + if (copyState.decRef() == true) { + copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java new file mode 100644 index 0000000000000..05f1c9d757e5c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.store.RateLimiter; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * This class handles sending file chunks over the transport layer to a target shard. + * + * @opensearch.internal + */ +public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { + + private final AtomicLong requestSeqNoGenerator; + private final RetryableTransportClient retryableTransportClient; + private final ShardId shardId; + private final RecoverySettings recoverySettings; + private final long replicationId; + private final AtomicLong bytesSinceLastPause = new AtomicLong(); + private final TransportRequestOptions fileChunkRequestOptions; + private final Consumer onSourceThrottle; + private final String action; + + public RemoteSegmentFileChunkWriter( + long replicationId, + RecoverySettings recoverySettings, + RetryableTransportClient retryableTransportClient, + ShardId shardId, + String action, + AtomicLong requestSeqNoGenerator, + Consumer onSourceThrottle + ) { + this.replicationId = replicationId; + this.recoverySettings = recoverySettings; + this.retryableTransportClient = retryableTransportClient; + this.shardId = shardId; + this.requestSeqNoGenerator = requestSeqNoGenerator; + this.onSourceThrottle = onSourceThrottle; + this.fileChunkRequestOptions = TransportRequestOptions.builder() + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); + + this.action = action; + } + + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // Pause using the rate limiter, if desired, to throttle the recovery + final long throttleTimeInNanos; + // always fetch the ratelimiter - it might be updated in real-time on the recovery settings + final RateLimiter rl = recoverySettings.rateLimiter(); + if (rl != null) { + long bytes = bytesSinceLastPause.addAndGet(content.length()); + if (bytes > rl.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytes); + try { + throttleTimeInNanos = rl.pause(bytes); + onSourceThrottle.accept(throttleTimeInNanos); + } catch (IOException e) { + throw new OpenSearchException("failed to pause recovery", e); + } + } else { + throttleTimeInNanos = 0; + } + } else { + throttleTimeInNanos = 0; + } + + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. + */ + final FileChunkRequest request = new FileChunkRequest( + replicationId, + requestSeqNo, + shardId, + fileMetadata, + position, + content, + lastChunk, + totalTranslogOps, + throttleTimeInNanos + ); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + retryableTransportClient.executeRetryableAction( + action, + request, + fileChunkRequestOptions, + ActionListener.map(listener, r -> null), + reader + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java new file mode 100644 index 0000000000000..e95c2c6470b4b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -0,0 +1,239 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.Transports; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.IntSupplier; + +/** + * SegmentFileSender handles building and starting a {@link MultiChunkTransfer} to orchestrate sending chunks to a given targetNode. + * This class delegates to a {@link FileChunkWriter} to handle the transport of chunks. + * + * @opensearch.internal + * // TODO: make this package-private after combining recovery and replication into single package. + */ +public final class SegmentFileTransferHandler { + + private final Logger logger; + private final IndexShard shard; + private final FileChunkWriter chunkWriter; + private final ThreadPool threadPool; + private final int chunkSizeInBytes; + private final int maxConcurrentFileChunks; + private final DiscoveryNode targetNode; + private final CancellableThreads cancellableThreads; + + public SegmentFileTransferHandler( + IndexShard shard, + DiscoveryNode targetNode, + FileChunkWriter chunkWriter, + Logger logger, + ThreadPool threadPool, + CancellableThreads cancellableThreads, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = shard; + this.targetNode = targetNode; + this.chunkWriter = chunkWriter; + this.logger = logger; + this.threadPool = threadPool; + this.cancellableThreads = cancellableThreads; + this.chunkSizeInBytes = fileChunkSizeInBytes; + // if the target is on an old version, it won't be able to handle out-of-order file chunks. + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + } + + /** + * Returns a closeable {@link MultiChunkTransfer} to initiate sending a list of files. + * Callers are responsible for starting the transfer and closing the resource. + * @param store {@link Store} + * @param files {@link StoreFileMetadata[]} + * @param translogOps {@link IntSupplier} + * @param listener {@link ActionListener} + * @return {@link MultiChunkTransfer} + */ + public MultiChunkTransfer createTransfer( + Store store, + StoreFileMetadata[] files, + IntSupplier translogOps, + ActionListener listener + ) { + ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first + return new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) { + + final Deque buffers = new ConcurrentLinkedDeque<>(); + InputStreamIndexInput currentInput = null; + long offset = 0; + + @Override + protected void onNewResource(StoreFileMetadata md) throws IOException { + offset = 0; + IOUtils.close(currentInput, () -> currentInput = null); + final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); + currentInput = new InputStreamIndexInput(indexInput, md.length()) { + @Override + public void close() throws IOException { + IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop + } + }; + } + + private byte[] acquireBuffer() { + final byte[] buffer = buffers.pollFirst(); + if (buffer != null) { + return buffer; + } + return new byte[chunkSizeInBytes]; + } + + @Override + protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { + assert Transports.assertNotTransportThread("read file chunk"); + cancellableThreads.checkForCancel(); + final byte[] buffer = acquireBuffer(); + final int bytesRead = currentInput.read(buffer); + if (bytesRead == -1) { + throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); + } + final boolean lastChunk = offset + bytesRead == md.length(); + final FileChunk chunk = new FileChunk( + md, + new BytesArray(buffer, 0, bytesRead), + offset, + lastChunk, + () -> buffers.addFirst(buffer) + ); + offset += bytesRead; + return chunk; + } + + @Override + protected void executeChunkRequest(FileChunk request, ActionListener listener1) { + cancellableThreads.checkForCancel(); + chunkWriter.writeFileChunk( + request.md, + request.position, + request.content, + request.lastChunk, + translogOps.getAsInt(), + ActionListener.runBefore(listener1, request::close) + ); + } + + @Override + protected void handleError(StoreFileMetadata md, Exception e) throws Exception { + handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); + } + + @Override + public void close() throws IOException { + IOUtils.close(currentInput, () -> currentInput = null); + } + }; + } + + public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { + final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); + assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]"); + if (corruptIndexException != null) { + Exception localException = null; + for (StoreFileMetadata md : mds) { + cancellableThreads.checkForCancel(); + logger.debug("checking integrity for file {} after remove corruption exception", md); + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); + if (localException == null) { + localException = corruptIndexException; + } + shard.failShard("error sending files", corruptIndexException); + } + } + if (localException != null) { + throw localException; + } else { // corruption has happened on the way to replica + RemoteTransportException remoteException = new RemoteTransportException( + "File corruption occurred on recovery but checksums are ok", + null + ); + remoteException.addSuppressed(e); + logger.warn( + () -> new ParameterizedMessage( + "{} Remote file corruption on node {}, recovering {}. local checksum OK", + shard.shardId(), + targetNode, + mds + ), + corruptIndexException + ); + throw remoteException; + } + } + throw e; + } + + /** + * A file chunk from the recovery source + * + * @opensearch.internal + */ + public static final class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { + final StoreFileMetadata md; + final BytesReference content; + final long position; + final boolean lastChunk; + final Releasable onClose; + + FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + this.onClose = onClose; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + + @Override + public void close() { + onClose.close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java new file mode 100644 index 0000000000000..fdabd48c62929 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -0,0 +1,170 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.ListenableFuture; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.recovery.DelayRecoveryException; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transports; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Orchestrates sending requested segment files to a target shard. + * + * @opensearch.internal + */ +class SegmentReplicationSourceHandler { + + private final IndexShard shard; + private final CopyState copyState; + private final SegmentFileTransferHandler segmentFileTransferHandler; + private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final ListenableFuture future = new ListenableFuture<>(); + private final List resources = new CopyOnWriteArrayList<>(); + private final Logger logger; + private final AtomicBoolean isReplicating = new AtomicBoolean(); + + /** + * Constructor. + * + * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. + * @param threadPool {@link ThreadPool} Thread pool. + * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param fileChunkSizeInBytes {@link Integer} + * @param maxConcurrentFileChunks {@link Integer} + */ + SegmentReplicationSourceHandler( + DiscoveryNode targetNode, + FileChunkWriter writer, + ThreadPool threadPool, + CopyState copyState, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = copyState.getShard(); + this.logger = Loggers.getLogger( + SegmentReplicationSourceHandler.class, + copyState.getShard().shardId(), + "sending segments to " + targetNode.getName() + ); + this.segmentFileTransferHandler = new SegmentFileTransferHandler( + copyState.getShard(), + targetNode, + writer, + logger, + threadPool, + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + this.copyState = copyState; + } + + /** + * Sends Segment files from the local node to the given target. + * + * @param request {@link GetSegmentFilesRequest} request object containing list of files to be sent. + * @param listener {@link ActionListener} that completes with the list of files sent. + */ + public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + if (isReplicating.compareAndSet(false, true) == false) { + throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); + } + future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); + final Closeable releaseResources = () -> IOUtils.close(resources); + try { + + final Consumer onFailure = e -> { + assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]"); + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + }; + + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying replication of {} as it is not listed as assigned to target node {}", + shard.shardId(), + request.getTargetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + }, + shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", + shard, + cancellableThreads, + logger + ); + + final StepListener sendFileStep = new StepListener<>(); + Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); + final StoreFileMetadata[] storeFileMetadata = request.getFilesToFetch() + .stream() + .filter(file -> storeFiles.contains(file.name())) + .toArray(StoreFileMetadata[]::new); + + final MultiChunkTransfer transfer = segmentFileTransferHandler + .createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep); + resources.add(transfer); + transfer.start(); + + sendFileStep.whenComplete(r -> { + try { + future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); + } finally { + IOUtils.close(resources); + } + }, onFailure); + } catch (Exception e) { + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + } + } + + /** + * Cancels the recovery and interrupts all eligible threads. + */ + public void cancel(String reason) { + cancellableThreads.cancel(reason); + } + + CopyState getCopyState() { + return copyState; + } + + public boolean isReplicating() { + return isReplicating.get(); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 9f70120dedd6c..d428459884f97 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -10,11 +10,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.index.IndexService; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -23,9 +32,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * Service class that handles segment replication requests from replica shards. @@ -33,9 +40,12 @@ * * @opensearch.internal */ -public class SegmentReplicationSourceService { +public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class); + private final RecoverySettings recoverySettings; + private final TransportService transportService; + private final IndicesService indicesService; /** * Internal actions used by the segment replication source service on the primary shard @@ -43,20 +53,21 @@ public class SegmentReplicationSourceService { * @opensearch.internal */ public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; } - private final Map copyStateMap; - private final TransportService transportService; - private final IndicesService indicesService; + private final OngoingSegmentReplications ongoingSegmentReplications; - // TODO mark this as injected and bind in Node - public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) { - copyStateMap = Collections.synchronizedMap(new HashMap<>()); + public SegmentReplicationSourceService( + IndicesService indicesService, + TransportService transportService, + RecoverySettings recoverySettings + ) { this.transportService = transportService; this.indicesService = indicesService; - + this.recoverySettings = recoverySettings; transportService.registerRequestHandler( Actions.GET_CHECKPOINT_INFO, ThreadPool.Names.GENERIC, @@ -69,14 +80,27 @@ public SegmentReplicationSourceService(TransportService transportService, Indice GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); + this.ongoingSegmentReplications = new OngoingSegmentReplications(indicesService, recoverySettings); } private class CheckpointInfoRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received request for checkpoint {}", checkpoint); - final CopyState copyState = getCachedCopyState(checkpoint); + final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter( + request.getReplicationId(), + recoverySettings, + new RetryableTransportClient( + transportService, + request.getTargetNode(), + recoverySettings.internalActionRetryTimeout(), + logger + ), + request.getCheckpoint().getShardId(), + SegmentReplicationTargetService.Actions.FILE_CHUNK, + new AtomicLong(0), + (throttleTime) -> {} + ); + final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( new CheckpointInfoResponse( copyState.getCheckpoint(), @@ -88,73 +112,47 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan } } - class GetSegmentFilesRequestHandler implements TransportRequestHandler { + private class GetSegmentFilesRequestHandler implements TransportRequestHandler { @Override public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception { - if (isInCopyStateMap(request.getCheckpoint())) { - // TODO send files - } else { - // Return an empty list of files - channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList())); - } + ongoingSegmentReplications.startSegmentCopy(request, new ChannelActionListener<>(channel, Actions.GET_SEGMENT_FILES, request)); } } - /** - * Operations on the {@link #copyStateMap} member. - */ + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + ongoingSegmentReplications.cancelReplication(removedNode); + } + } + } - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(indexShard); - - /** - * Use the checkpoint from the request as the key in the map, rather than - * the checkpoint from the created CopyState. This maximizes cache hits - * if replication targets make a request with an older checkpoint. - * Replication targets are expected to fetch the checkpoint in the response - * CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; + @Override + protected void doStart() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); } } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); + @Override + protected void doStop() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + indicesService.clusterService().removeListener(this); + } } - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); + @Override + protected void doClose() throws IOException { + } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + ongoingSegmentReplications.cancel(indexShard, "shard is closed"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 250df3481435a..c0e0b4dee2b3f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -33,14 +33,20 @@ public class CopyState extends AbstractRefCounted { private final GatedCloseable segmentInfosRef; + /** ReplicationCheckpoint requested */ + private final ReplicationCheckpoint requestedReplicationCheckpoint; + /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; private final Store.MetadataSnapshot metadataSnapshot; private final HashSet pendingDeleteFiles; private final byte[] infosBytes; private GatedCloseable commitRef; + private final IndexShard shard; - public CopyState(IndexShard shard) throws IOException { + public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId()); + this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataSnapshot = shard.store().getMetadata(segmentInfos); @@ -100,4 +106,12 @@ public byte[] getInfosBytes() { public Set getPendingDeleteFiles() { return pendingDeleteFiles; } + + public IndexShard getShard() { + return shard; + } + + public ReplicationCheckpoint getRequestedReplicationCheckpoint() { + return requestedReplicationCheckpoint; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java index db8206d131c13..09b14fb1b5333 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java @@ -46,4 +46,29 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(this.targetAllocationId); targetNode.writeTo(out); } + + public long getReplicationId() { + return replicationId; + } + + public String getTargetAllocationId() { + return targetAllocationId; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + @Override + public String toString() { + return "SegmentReplicationTransportRequest{" + + "replicationId=" + + replicationId + + ", targetAllocationId='" + + targetAllocationId + + '\'' + + ", targetNode=" + + targetNode + + '}'; + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7a8f7ecb87dcc..cf99d116131a3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -40,6 +40,8 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -223,6 +225,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; /** @@ -944,6 +947,10 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationSourceService.class) + .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + } } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index fc5c429d74b16..2b5550b71a627 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -94,6 +94,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; @@ -544,21 +545,22 @@ public void writeFileChunk( }); } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); SetOnce sendFilesError = new SetOnce<>(); CountDownLatch latch = new CountDownLatch(1); handler.sendFiles( @@ -570,6 +572,7 @@ protected void failEngine(IOException cause) { latch.await(); assertThat(sendFilesError.get(), instanceOf(IOException.class)); assertNotNull(ExceptionsHelper.unwrapCorruption(sendFilesError.get())); + failedEngine.get(); assertTrue(failedEngine.get()); // ensure all chunk requests have been completed; otherwise some files on the target are left open. IOUtils.close(() -> terminate(threadPool), () -> threadPool = null); @@ -617,21 +620,22 @@ public void writeFileChunk( } } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet); @@ -747,7 +751,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); cancelingThread.start(); try { - RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger); + RunUnderPrimaryPermit.run(() -> {}, "test", shard, cancellableThreads, logger); } catch (CancellableThreads.ExecutionCancelledException e) { // expected. } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java new file mode 100644 index 0000000000000..260f6a13b5010 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.junit.Assert; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OngoingSegmentReplicationsTests extends IndexShardTestCase { + + private final IndicesService mockIndicesService = mock(IndicesService.class); + private ReplicationCheckpoint testCheckpoint; + private DiscoveryNode primaryDiscoveryNode; + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private GetSegmentFilesRequest getSegmentFilesRequest; + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + primaryDiscoveryNode = replica.recoveryState().getSourceNode(); + + ShardId testShardId = primary.shardId(); + + // This mirrors the creation of the ReplicationCheckpoint inside CopyState + testCheckpoint = new ReplicationCheckpoint( + testShardId, + primary.getOperationPrimaryTerm(), + 0L, + primary.getProcessedLocalCheckpoint(), + 0L + ); + IndexService mockIndexService = mock(IndexService.class); + when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); + + TransportService transportService = mock(TransportService.class); + when(transportService.getThreadPool()).thenReturn(threadPool); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testPrepareAndSendSegments() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(1, replications.size()); + assertEquals(1, copyState.refCount()); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + testCheckpoint + ); + + final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); + replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertEquals(1, getSegmentFilesResponse.files.size()); + assertEquals(1, expectedFiles.size()); + assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(0, copyState.refCount()); + assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(0, replications.size()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected failure", e); + Assert.fail(); + } + }); + } + + public void testCancelReplication() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + replications.cancelReplication(primaryDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testMultipleReplicasUseSameCheckpoint() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; + + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, copyState.refCount()); + + final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); + + assertEquals(2, copyState.refCount()); + assertEquals(2, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + replications.cancelReplication(primaryDiscoveryNode); + replications.cancelReplication(replicaDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testStartCopyWithoutPrepareStep() { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final ActionListener listener = spy(new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + testCheckpoint + ); + + replications.startSegmentCopy(getSegmentFilesRequest, listener); + verify(listener, times(1)).onResponse(any()); + } + + public void testShardAlreadyReplicatingToNode() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java new file mode 100644 index 0000000000000..5fd8bc1e74625 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java @@ -0,0 +1,251 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFileNames; +import org.junit.Assert; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.IntSupplier; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.*; + +public class SegmentFileTransferHandlerTests extends IndexShardTestCase { + + private IndexShard shard; + private StoreFileMetadata[] filesToSend; + private final DiscoveryNode targetNode = new DiscoveryNode( + "foo", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); + + final int fileChunkSizeInBytes = 5000; + final int maxConcurrentFileChunks = 1; + private CancellableThreads cancellableThreads; + final IntSupplier translogOps = () -> 0; + + @Override + public void setUp() throws Exception { + super.setUp(); + cancellableThreads = new CancellableThreads(); + shard = spy(newStartedShard(true)); + filesToSend = getFilestoSend(shard); + // we should only have a Segments_N file at this point. + assertEquals(1, filesToSend.length); + } + + private StoreFileMetadata[] getFilestoSend(IndexShard shard) throws IOException { + final Store.MetadataSnapshot metadata = shard.store().getMetadata(); + return metadata.asMap().values().toArray(StoreFileMetadata[]::new); + } + + @Override + public void tearDown() throws Exception { + closeShards(shard); + super.tearDown(); + } + + public void testSendFiles_invokesChunkWriter() throws IOException, InterruptedException { + // use counDownLatch and countDown when our chunkWriter is invoked. + final CountDownLatch countDownLatch = new CountDownLatch(1); + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + assertTrue(filesToSend[0].isSame(fileMetadata)); + assertTrue(lastChunk); + countDownLatch.countDown(); + } + }); + + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + transfer.start(); + countDownLatch.await(5, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_beforeStart() throws IOException, InterruptedException { + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + Assert.fail(); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + cancellableThreads.cancel("test"); + transfer.start(); + verifyNoInteractions(chunkWriter); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_afterStart() throws IOException, InterruptedException { + // index a doc a flush so we have more than 1 file to send. + indexDoc(shard, "_doc", "test"); + flushShard(shard, true); + filesToSend = getFilestoSend(shard); + + // we should have 4 files to send now - + // [_0.cfe, _0.si, _0.cfs, segments_3] + assertEquals(4, filesToSend.length); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // cancel the threads at this point, we'll ensure this is not invoked more than once. + cancellableThreads.cancel("test"); + listener.onResponse(null); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + new ActionListener() { + @Override + public void onResponse(Void unused) { + // do nothing here, we will just resolve in test. + } + + @Override + public void onFailure(Exception e) { + assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); + countDownLatch.countDown(); + } + } + ); + + // start the transfer + transfer.start(); + countDownLatch.await(30, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_CorruptIndexException() throws Exception { + final CancellableThreads cancellableThreads = new CancellableThreads(); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + mock(FileChunkWriter.class), + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 1L, + "0", + org.apache.lucene.util.Version.LATEST + ); + + doNothing().when(shard).failShard(anyString(), any()); + assertThrows( + CorruptIndexException.class, + () -> { + handler.handleErrorOnSendFiles( + shard.store(), + new CorruptIndexException("test", "test"), + new StoreFileMetadata[] { SEGMENTS_FILE } + ); + } + ); + + verify(shard, times(1)).failShard(any(), any()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java new file mode 100644 index 0000000000000..70061c54d0da2 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase { + + private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private FileChunkWriter chunkWriter; + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testSendFiles() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + MatcherAssert.assertThat(getSegmentFilesResponse.files, Matchers.containsInAnyOrder(expectedFiles.toArray())); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFiles_emptyRequest() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + Mockito.verifyNoInteractions(chunkWriter); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFileFails() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( + new OpenSearchException("Test") + ); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(e.getClass(), OpenSearchException.class); + } + }); + } + + public void testReplicationAlreadyRunning() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); + Assert.assertThrows(OpenSearchException.class, () -> { handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 67c867d360e70..8d2ca9ff63f3d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -9,13 +9,16 @@ package org.opensearch.indices.replication; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyStateTests; import org.opensearch.test.OpenSearchTestCase; @@ -30,30 +33,23 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase { - private ShardId testShardId; private ReplicationCheckpoint testCheckpoint; - private IndicesService mockIndicesService; - private IndexService mockIndexService; - private IndexShard mockIndexShard; private TestThreadPool testThreadPool; - private CapturingTransport transport; private TransportService transportService; private DiscoveryNode localNode; - private SegmentReplicationSourceService segmentReplicationSourceService; @Override public void setUp() throws Exception { super.setUp(); // setup mocks - mockIndexShard = CopyStateTests.createMockIndexShard(); - testShardId = mockIndexShard.shardId(); - mockIndicesService = mock(IndicesService.class); - mockIndexService = mock(IndexService.class); + IndexShard mockIndexShard = CopyStateTests.createMockIndexShard(); + ShardId testShardId = mockIndexShard.shardId(); + IndicesService mockIndicesService = mock(IndicesService.class); + IndexService mockIndexService = mock(IndexService.class); when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); @@ -66,7 +62,7 @@ public void setUp() throws Exception { 0L ); testThreadPool = new TestThreadPool("test", Settings.EMPTY); - transport = new CapturingTransport(); + CapturingTransport transport = new CapturingTransport(); localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); transportService = transport.createTransportService( Settings.EMPTY, @@ -78,7 +74,16 @@ public void setUp() throws Exception { ); transportService.start(); transportService.acceptIncomingRequests(); - segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService); + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + SegmentReplicationSourceService segmentReplicationSourceService = new SegmentReplicationSourceService( + mockIndicesService, + transportService, + recoverySettings + ); } @Override @@ -88,7 +93,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testGetSegmentFiles_EmptyResponse() { + public void testGetSegmentFiles() { final GetSegmentFilesRequest request = new GetSegmentFilesRequest( 1, "allocationId", @@ -96,19 +101,52 @@ public void testGetSegmentFiles_EmptyResponse() { Collections.emptyList(), testCheckpoint ); + executeGetSegmentFiles(request, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse response) { + assertEquals(0, response.files.size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + public void testCheckpointInfo() { + executeGetCheckpointInfo(new ActionListener<>() { + @Override + public void onResponse(CheckpointInfoResponse response) { + assertEquals(testCheckpoint, response.getCheckpoint()); + assertNotNull(response.getInfosBytes()); + // CopyStateTests sets up one pending delete file and one committed segments file + assertEquals(1, response.getPendingDeleteFiles().size()); + assertEquals(1, response.getSnapshot().size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + private void executeGetCheckpointInfo(ActionListener listener) { + final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, + SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(GetSegmentFilesResponse response) { - assertEquals(0, response.files.size()); + public void handleResponse(CheckpointInfoResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -117,32 +155,27 @@ public String executor() { } @Override - public GetSegmentFilesResponse read(StreamInput in) throws IOException { - return new GetSegmentFilesResponse(in); + public CheckpointInfoResponse read(StreamInput in) throws IOException { + return new CheckpointInfoResponse(in); } } ); } - public void testCheckpointInfo() { - final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); + private void executeGetSegmentFiles(GetSegmentFilesRequest request, ActionListener listener) { transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, + SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(CheckpointInfoResponse response) { - assertEquals(testCheckpoint, response.getCheckpoint()); - assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); - assertEquals(1, response.getSnapshot().size()); + public void handleResponse(GetSegmentFilesResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -151,11 +184,10 @@ public String executor() { } @Override - public CheckpointInfoResponse read(StreamInput in) throws IOException { - return new CheckpointInfoResponse(in); + public GetSegmentFilesResponse read(StreamInput in) throws IOException { + return new GetSegmentFilesResponse(in); } } ); } - } diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index afa38afb0cf2f..a6f0cf7e98411 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -47,7 +47,15 @@ public class CopyStateTests extends IndexShardTestCase { ); public void testCopyStateCreation() throws IOException { - CopyState copyState = new CopyState(createMockIndexShard()); + final IndexShard mockIndexShard = createMockIndexShard(); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockIndexShard.shardId(), + mockIndexShard.getOperationPrimaryTerm(), + 0L, + mockIndexShard.getProcessedLocalCheckpoint(), + 0L + ); + CopyState copyState = new CopyState(testCheckpoint, mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero