From 00ccfc49f3aa2bf06ddc22ff8f0bc24790b222da Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Tue, 10 Oct 2023 19:54:15 -0500 Subject: [PATCH] Replace multipart download with parallel file download (#10519) There are a few open issues with the multi-stream download approach: - Recovery stats are not being reported correctly - It is incompatible (short of reopening and re-reading the entire file) with the existing Lucene checksum validation logic - There are some issues with integrating it with the pending client side encryption work Given this, I attempted an experiment where I replaced with multi-stream-within-a-single-file approach with simply parallelizing downloads across files (this is how snapshot restore works). I actually got better results with this approach: recovering a ~52GiB shard took about 4.7 minutes with the multi-stream code versus 3.9 minutes with the parallel file approach (r7g.4xlarge EC2 instance, 500MiB/s EBS volume, S3 as remote repository). I think this is the right approach as it leverages the more battle-tested code path and addresses the three issues listed above. The multi-stream approach still has promise as it will allow us to download very large files faster (whereas this approach they can be the long poll on the transfer operation). However, given that 5GB segments (made up of multiple files in practice) are the norm, we generally aren't dealing with huge files. Signed-off-by: Andrew Ross --- .../org/opensearch/index/IndexModule.java | 7 +- .../org/opensearch/index/IndexService.java | 9 +- .../opensearch/index/shard/IndexShard.java | 47 ++---- .../opensearch/index/shard/StoreRecovery.java | 11 +- .../index/store/RemoteDirectory.java | 4 - .../store/RemoteSegmentStoreDirectory.java | 74 +-------- .../RemoteSegmentStoreDirectoryFactory.java | 11 +- .../store/RemoteStoreFileDownloader.java | 147 ++++++++++++++++++ .../opensearch/indices/IndicesService.java | 9 +- .../indices/recovery/RecoverySettings.java | 4 +- .../RemoteStoreReplicationSource.java | 32 +--- .../main/java/org/opensearch/node/Node.java | 6 +- .../blobstore/BlobStoreRepository.java | 6 +- .../opensearch/index/IndexModuleTests.java | 5 +- .../RemoteStoreRefreshListenerTests.java | 4 +- ...moteSegmentStoreDirectoryFactoryTests.java | 7 +- .../RemoteSegmentStoreDirectoryTests.java | 127 +-------------- .../store/RemoteStoreFileDownloaderTests.java | 119 ++++++++++++++ .../snapshots/SnapshotResiliencyTests.java | 5 +- .../index/shard/IndexShardTestCase.java | 11 +- 20 files changed, 333 insertions(+), 312 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 8692876412ea9..e29283724ebf8 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -81,6 +81,7 @@ import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; @@ -602,7 +603,8 @@ public IndexService newIndexService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -660,7 +662,8 @@ public IndexService newIndexService( recoveryStateFactory, translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, - clusterRemoteTranslogBufferIntervalSupplier + clusterRemoteTranslogBufferIntervalSupplier, + recoverySettings ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index af23145be9f89..a53dbe246fa44 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -89,13 +89,13 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; @@ -179,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final BiFunction translogFactorySupplier; private final Supplier clusterDefaultRefreshIntervalSupplier; private final Supplier clusterRemoteTranslogBufferIntervalSupplier; + private final RecoverySettings recoverySettings; public IndexService( IndexSettings indexSettings, @@ -213,7 +214,8 @@ public IndexService( IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, - Supplier clusterRemoteTranslogBufferIntervalSupplier + Supplier clusterRemoteTranslogBufferIntervalSupplier, + RecoverySettings recoverySettings ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -290,6 +292,7 @@ public IndexService( this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); this.translogFactorySupplier = translogFactorySupplier; this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier; + this.recoverySettings = recoverySettings; updateFsyncTaskIfNecessary(); } @@ -522,7 +525,7 @@ public synchronized IndexShard createShard( remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, nodeEnv.nodeId(), - (RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory + recoverySettings ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 251f9a5ae01c0..2643af501bbc9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -62,7 +62,6 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.PendingReplicationActions; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.DataStream; @@ -160,9 +159,8 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.store.RemoteStoreFileDownloader; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -184,6 +182,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -201,7 +200,6 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -342,7 +340,7 @@ Runnable getGlobalCheckpointSyncer() { private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory; private final List internalRefreshListener = new ArrayList<>(); - private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; + private final RemoteStoreFileDownloader fileDownloader; public IndexShard( final ShardRouting shardRouting, @@ -371,10 +369,7 @@ public IndexShard( final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, final Supplier clusterRemoteTranslogBufferIntervalSupplier, final String nodeId, - // Wiring a directory factory here breaks some intended abstractions, but this remote directory - // factory is used not as a Lucene directory but instead to copy files from a remote store when - // restoring a shallow snapshot. - @Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + final RecoverySettings recoverySettings ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -470,7 +465,7 @@ public boolean shouldCache(Query query) { ? false : mapperService.documentMapper().mappers().containsTimeStampField(); this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory; - this.remoteSegmentStoreDirectoryFactory = remoteSegmentStoreDirectoryFactory; + this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); } public ThreadPool getThreadPool() { @@ -568,6 +563,10 @@ public String getNodeId() { return translogConfig.getNodeId(); } + public RemoteStoreFileDownloader getFileDownloader() { + return fileDownloader; + } + @Override public void updateShardState( final ShardRouting newRouting, @@ -2706,7 +2705,7 @@ public void restoreFromRemoteStore(ActionListener listener) { public void restoreFromSnapshotAndRemoteStore( Repository repository, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + RepositoriesService repositoriesService, ActionListener listener ) { try { @@ -2714,7 +2713,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, remoteSegmentStoreDirectoryFactory, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } @@ -3554,7 +3553,7 @@ public void startRecovery( "from snapshot and remote store", recoveryState, recoveryListener, - l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), remoteSegmentStoreDirectoryFactory, l) + l -> restoreFromSnapshotAndRemoteStore(repositoriesService.repository(repo), repositoriesService, l) ); // indicesService.indexService(shardRouting.shardId().getIndex()).addMetadataListener(); } else { @@ -4912,7 +4911,7 @@ private String copySegmentFiles( if (toDownloadSegments.isEmpty() == false) { try { - downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); + fileDownloader.download(sourceRemoteDirectory, storeDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); } catch (Exception e) { throw new IOException("Error occurred when downloading segments from remote store", e); } @@ -4925,26 +4924,6 @@ private String copySegmentFiles( return segmentNFile; } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory sourceRemoteDirectory, - RemoteSegmentStoreDirectory targetRemoteDirectory, - Set toDownloadSegments, - final Runnable onFileSync - ) throws IOException { - final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); - final DirectoryFileTransferTracker fileTransferTracker = store.getDirectoryFileTransferTracker(); - for (String segment : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - sourceRemoteDirectory.copyTo(segment, storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - onFileSync.run(); - if (targetRemoteDirectory != null) { - targetRemoteDirectory.copyFrom(storeDirectory, segment, segment, IOContext.DEFAULT); - } - } - } - private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 762aab51469d0..c0211e1257c8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -70,7 +70,9 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -360,8 +362,9 @@ void recoverFromRepository(final IndexShard indexShard, Repository repository, A void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, - RemoteSegmentStoreDirectoryFactory directoryFactory, - ActionListener listener + RepositoriesService repositoriesService, + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -389,6 +392,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 70a88f6159764..345583bbbd1be 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -336,10 +336,6 @@ public boolean copyFrom( return false; } - protected UnaryOperator getDownloadRateLimiter() { - return downloadRateLimiter; - } - private void uploadBlob( Directory from, String src, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c0b302c68c0ea..be1f2341236ab 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -24,8 +24,6 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Version; import org.opensearch.common.UUIDs; -import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.logging.Loggers; @@ -38,7 +36,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; @@ -46,7 +43,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -93,8 +89,6 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final ThreadPool threadPool; - private final RecoverySettings recoverySettings; - /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. @@ -127,15 +121,13 @@ public RemoteSegmentStoreDirectory( RemoteDirectory remoteMetadataDirectory, RemoteStoreLockManager mdLockManager, ThreadPool threadPool, - ShardId shardId, - RecoverySettings recoverySettings + ShardId shardId ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; this.logger = Loggers.getLogger(getClass(), shardId); init(); } @@ -473,70 +465,6 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } } - /** - * Copies an existing {@code source} file from this directory to a non-existent file (also - * named {@code source}) in either {@code destinationDirectory} or {@code destinationPath}. - * If the blob container backing this directory supports multipart downloads, the {@code source} - * file will be downloaded (potentially in multiple concurrent parts) directly to - * {@code destinationPath}. This method will return immediately and {@code fileCompletionListener} - * will be notified upon completion. - *

- * If multipart downloads are not supported, then {@code source} file will be copied to a file named - * {@code source} in a single part to {@code destinationDirectory}. The download will happen on the - * calling thread and {@code fileCompletionListener} will be notified synchronously before this - * method returns. - * - * @param source The source file name - * @param destinationDirectory The destination directory (if multipart is not supported) - * @param destinationPath The destination path (if multipart is supported) - * @param fileTransferTracker Tracker used for file transfer stats - * @param fileCompletionListener The listener to notify of completion - */ - public void copyTo( - String source, - Directory destinationDirectory, - Path destinationPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener fileCompletionListener - ) { - final String blobName = getExistingRemoteFilename(source); - if (destinationPath != null && remoteDataDirectory.getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { - long length = 0L; - try { - length = fileLength(source); - } catch (IOException ex) { - logger.error("Unable to fetch segment length for stats tracking", ex); - } - final long fileLength = length; - final long startTime = System.currentTimeMillis(); - fileTransferTracker.addTransferredBytesStarted(fileLength); - final AsyncMultiStreamBlobContainer blobContainer = (AsyncMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer(); - final Path destinationFilePath = destinationPath.resolve(source); - final ActionListener completionListener = ActionListener.wrap(response -> { - fileTransferTracker.addTransferredBytesSucceeded(fileLength, startTime); - fileCompletionListener.onResponse(response); - }, e -> { - fileTransferTracker.addTransferredBytesFailed(fileLength, startTime); - fileCompletionListener.onFailure(e); - }); - final ReadContextListener readContextListener = new ReadContextListener( - blobName, - destinationFilePath, - completionListener, - threadPool, - remoteDataDirectory.getDownloadRateLimiter(), - recoverySettings.getMaxConcurrentRemoteStoreStreams() - ); - blobContainer.readBlobAsync(blobName, readContextListener); - } else { - // Fallback to older mechanism of downloading the file - ActionListener.completeWith(fileCompletionListener, () -> { - destinationDirectory.copyFrom(this, source, source, IOContext.DEFAULT); - return source; - }); - } - } - /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index cc55380894ecd..a5e89ec6a8327 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -15,7 +15,6 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -35,18 +34,12 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private static final String SEGMENTS = "segments"; private final Supplier repositoriesService; - private final RecoverySettings recoverySettings; private final ThreadPool threadPool; - public RemoteSegmentStoreDirectoryFactory( - Supplier repositoriesService, - ThreadPool threadPool, - RecoverySettings recoverySettings - ) { + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; - this.recoverySettings = recoverySettings; } @Override @@ -78,7 +71,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s String.valueOf(shardId.id()) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId, recoverySettings); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java new file mode 100644 index 0000000000000..4fc721f2b96b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; + +/** + * Helper class to downloads files from a {@link RemoteSegmentStoreDirectory} + * instance to a local {@link Directory} instance in parallel depending on thread + * pool size and recovery settings. + */ +@InternalApi +public final class RemoteStoreFileDownloader { + private final Logger logger; + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) { + this.logger = Loggers.getLogger(RemoteStoreFileDownloader.class, shardId); + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory. + * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param toDownloadSegments The list of segment files to download + */ + public void download(Directory source, Directory destination, Collection toDownloadSegments) throws IOException { + downloadInternal(source, destination, null, toDownloadSegments, () -> {}); + } + + /** + * Copies the given segments from the remote segment store to the given + * local directory, while also copying the segments _to_ another remote directory. + * @param source The remote directory to copy segment files from + * @param destination The local directory to copy segment files to + * @param secondDestination The second remote directory that segment files are + * copied to after being copied to the local directory + * @param toDownloadSegments The list of segment files to download + * @param onFileCompletion A generic runnable that is invoked after each file download. + * Must be thread safe as this may be invoked concurrently from + * different threads. + */ + public void download( + Directory source, + Directory destination, + Directory secondDestination, + Collection toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion); + } + + private void downloadInternal( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Collection toDownloadSegments, + Runnable onFileCompletion + ) throws IOException { + final Queue queue = new ConcurrentLinkedQueue<>(toDownloadSegments); + // Choose the minimum of: + // - number of files to download + // - max thread pool size + // - "indices.recovery.max_concurrent_remote_store_streams" setting + final int threads = Math.min( + toDownloadSegments.size(), + Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams()) + ); + logger.trace("Starting download of {} files with {} threads", queue.size(), threads); + final PlainActionFuture> listener = PlainActionFuture.newFuture(); + final ActionListener allFilesListener = new GroupedActionListener<>(listener, threads); + for (int i = 0; i < threads; i++) { + copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener); + } + try { + listener.actionGet(); + } catch (UncategorizedExecutionException e) { + // Any IOException will be double-wrapped so dig it out and throw it + if (e.getCause() instanceof ExecutionException) { + if (e.getCause().getCause() instanceof IOException) { + throw (IOException) e.getCause().getCause(); + } + } + throw e; + } + } + + private void copyOneFile( + Directory source, + Directory destination, + @Nullable Directory secondDestination, + Queue queue, + Runnable onFileCompletion, + ActionListener listener + ) { + final String file = queue.poll(); + if (file == null) { + // Queue is empty, so notify listener we are done + listener.onResponse(null); + } else { + threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> { + logger.trace("Downloading file {}", file); + try { + destination.copyFrom(source, file, file, IOContext.DEFAULT); + onFileCompletion.run(); + if (secondDestination != null) { + secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); + } + } catch (Exception e) { + // Clear the queue to stop any future processing, report the failure, then return + queue.clear(); + listener.onFailure(e); + return; + } + copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener); + }); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..e27a597007b62 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -144,6 +144,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; @@ -335,6 +336,7 @@ public class IndicesService extends AbstractLifecycleComponent private final CountDownLatch closeLatch = new CountDownLatch(1); private volatile boolean idFieldDataEnabled; private volatile boolean allowExpensiveQueries; + private final RecoverySettings recoverySettings; @Nullable private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor; @@ -380,7 +382,8 @@ public IndicesService( Supplier repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, SearchRequestStats searchRequestStats, - @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + RecoverySettings recoverySettings ) { this.settings = settings; this.threadPool = threadPool; @@ -477,6 +480,7 @@ protected void closeInternal() { this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval); + this.recoverySettings = recoverySettings; } /** @@ -874,7 +878,8 @@ private synchronized IndexService createIndexService( remoteDirectoryFactory, translogFactorySupplier, this::getClusterDefaultRefreshInterval, - this::getClusterRemoteTranslogBufferInterval + this::getClusterRemoteTranslogBufferInterval, + this.recoverySettings ); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index ed9755bf824ea..44dfb2f4cb00a 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -85,11 +85,11 @@ public class RecoverySettings { ); /** - * Controls the maximum number of streams that can be started concurrently when downloading from the remote store. + * Controls the maximum number of streams that can be started concurrently per recovery when downloading from the remote store. */ public static final Setting INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = Setting.intSetting( "indices.recovery.max_concurrent_remote_store_streams", - 20, + 10, 1, Property.Dynamic, Property.NodeScope diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index ddbcb86269aa9..d2000a56401f5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -14,20 +14,16 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,49 +105,31 @@ public void getSegmentFiles( logger.debug("Downloading segment files from remote store {}", filesToFetch); RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - List toDownloadSegments = new ArrayList<>(); Collection directoryFiles = List.of(indexShard.store().directory().listAll()); if (remoteSegmentMetadata != null) { try { indexShard.store().incRef(); indexShard.remoteStore().incRef(); final Directory storeDirectory = indexShard.store().directory(); - final ShardPath shardPath = indexShard.shardPath(); + final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - toDownloadSegments.add(fileMetadata); + toDownloadSegmentNames.add(file); } - final DirectoryFileTransferTracker fileTransferTracker = indexShard.store().getDirectoryFileTransferTracker(); - downloadSegments(storeDirectory, remoteDirectory, toDownloadSegments, shardPath, fileTransferTracker, listener); - logger.debug("Downloaded segment files from remote store {}", toDownloadSegments); + indexShard.getFileDownloader().download(remoteDirectory, storeDirectory, toDownloadSegmentNames); + logger.debug("Downloaded segment files from remote store {}", filesToFetch); } finally { indexShard.store().decRef(); indexShard.remoteStore().decRef(); } } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } catch (Exception e) { listener.onFailure(e); } } - private void downloadSegments( - Directory storeDirectory, - RemoteSegmentStoreDirectory remoteStoreDirectory, - List toDownloadSegments, - ShardPath shardPath, - DirectoryFileTransferTracker fileTransferTracker, - ActionListener completionListener - ) { - final Path indexPath = shardPath == null ? null : shardPath.resolveIndex(); - for (StoreFileMetadata storeFileMetadata : toDownloadSegments) { - final PlainActionFuture segmentListener = PlainActionFuture.newFuture(); - remoteStoreDirectory.copyTo(storeFileMetadata.name(), storeDirectory, indexPath, fileTransferTracker, segmentListener); - segmentListener.actionGet(); - } - completionListener.onResponse(new GetSegmentFilesResponse(toDownloadSegments)); - } - @Override public String getDescription() { return "RemoteStoreReplicationSource"; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f44c8c8bea7f1..5b3b064a47c66 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -771,8 +771,7 @@ protected Node( final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( repositoriesServiceReference::get, - threadPool, - recoverySettings + threadPool ); final SearchRequestStats searchRequestStats = new SearchRequestStats(); @@ -803,7 +802,8 @@ protected Node( repositoriesServiceReference::get, fileCacheCleaner, searchRequestStats, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + recoverySettings ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index bf85e19493203..8e6649c366eeb 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1144,8 +1144,7 @@ private void executeStaleShardDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, @@ -1615,8 +1614,7 @@ private void executeOneStaleIndexDelete( // see https://github.com/opensearch-project/OpenSearch/issues/8469 new RemoteSegmentStoreDirectoryFactory( remoteStoreLockManagerFactory.getRepositoriesService(), - threadPool, - recoverySettings + threadPool ).newDirectory( remoteStoreRepoForIndex, indexUUID, diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bbd73bcf97aab..97bc822be7d51 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -258,10 +258,11 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier, () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, - () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + DefaultRecoverySettings.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 941f2f48e71af..5a13f57db2c87 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -33,7 +33,6 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -156,8 +155,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { remoteMetadataDirectory, mock(RemoteStoreLockManager.class), mock(ThreadPool.class), - shardId, - DefaultRecoverySettings.INSTANCE + shardId ); FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 78c7fe64cebd9..cad5e47531cc6 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -20,7 +20,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -58,11 +57,7 @@ public void setup() { repositoriesService = mock(RepositoriesService.class); threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceSupplier, - threadPool, - DefaultRecoverySettings.INSTANCE - ); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3e45699ef36eb..36cfd84ff960a 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -25,9 +25,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; -import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; -import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -42,7 +40,6 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -51,18 +48,15 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.UnaryOperator; import org.mockito.Mockito; @@ -72,8 +66,6 @@ import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -151,14 +143,12 @@ public void setup() throws IOException { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } - when(remoteDataDirectory.getDownloadRateLimiter()).thenReturn(UnaryOperator.identity()); when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); } @@ -568,118 +558,6 @@ public void onFailure(Exception e) {} storeDirectory.close(); } - public void testCopyFilesToMultipart() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - Mockito.doAnswer(invocation -> { - ActionListener completionListener = invocation.getArgument(1); - final CompletableFuture future = new CompletableFuture<>(); - future.complete(new InputStreamContainer(new ByteArrayInputStream(new byte[] { 42 }), 0, 1)); - completionListener.onResponse(new ReadContext(1, List.of(() -> future), "")); - return null; - }).when(blobContainer).readBlobAsync(any(), any()); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - DirectoryFileTransferTracker directoryFileTransferTracker = new DirectoryFileTransferTracker(); - long sourceFileLengthInBytes = remoteSegmentStoreDirectory.fileLength(filename); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, directoryFileTransferTracker, completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.SECONDS)); - verify(blobContainer, times(1)).readBlobAsync(contains(filename), any()); - verify(storeDirectory, times(0)).copyFrom(any(), any(), any(), any()); - - // Verify stats are updated to DirectoryFileTransferTracker - assertEquals(sourceFileLengthInBytes, directoryFileTransferTracker.getTransferredBytesSucceeded()); - } - - public void testCopyFilesTo() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToEmptyPath() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); - when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); - - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - downloadLatch.countDown(); - } - - @Override - public void onFailure(Exception e) {} - }; - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, null, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - - public void testCopyFilesToException() throws Exception { - String filename = "_0.cfe"; - populateMetadata(); - remoteSegmentStoreDirectory.init(); - - Directory storeDirectory = mock(Directory.class); - Mockito.doThrow(new IOException()) - .when(storeDirectory) - .copyFrom(any(Directory.class), anyString(), anyString(), any(IOContext.class)); - CountDownLatch downloadLatch = new CountDownLatch(1); - ActionListener completionListener = new ActionListener<>() { - @Override - public void onResponse(String unused) { - - } - - @Override - public void onFailure(Exception e) { - downloadLatch.countDown(); - } - }; - Path path = createTempDir(); - remoteSegmentStoreDirectory.copyTo(filename, storeDirectory, path, new DirectoryFileTransferTracker(), completionListener); - assertTrue(downloadLatch.await(5000, TimeUnit.MILLISECONDS)); - verify(storeDirectory, times(1)).copyFrom(any(), eq(filename), eq(filename), eq(IOContext.DEFAULT)); - } - public void testCopyFilesFromMultipartIOException() throws Exception { String filename = "_100.si"; AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); @@ -689,8 +567,7 @@ public void testCopyFilesFromMultipartIOException() throws Exception { remoteMetadataDirectory, mdLockManager, threadPool, - indexShard.shardId(), - DefaultRecoverySettings.INSTANCE + indexShard.shardId() ); populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java new file mode 100644 index 0000000000000..588d9e8bb13a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private Directory source; + private Directory destination; + private Directory secondDestination; + private RemoteStoreFileDownloader fileDownloader; + private Map files = new HashMap<>(); + + @Before + public void setup() throws IOException { + final int streamLimit = randomIntBetween(1, 20); + final RecoverySettings recoverySettings = new RecoverySettings( + Settings.builder().put("indices.recovery.max_concurrent_remote_store_streams", streamLimit).build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + threadPool = new TestThreadPool(getTestName()); + source = new NIOFSDirectory(createTempDir()); + destination = new NIOFSDirectory(createTempDir()); + secondDestination = new NIOFSDirectory(createTempDir()); + for (int i = 0; i < 10; i++) { + final String filename = "file_" + i; + final int content = randomInt(); + try (IndexOutput output = source.createOutput(filename, IOContext.DEFAULT)) { + output.writeInt(content); + } + files.put(filename, content); + } + fileDownloader = new RemoteStoreFileDownloader( + ShardId.fromString("[RemoteStoreFileDownloaderTests][0]"), + threadPool, + recoverySettings + ); + } + + @After + public void stopThreadPool() throws Exception { + threadPool.shutdown(); + assertTrue(threadPool.awaitTermination(5, TimeUnit.SECONDS)); + } + + public void testDownload() throws IOException { + fileDownloader.download(source, destination, files.keySet()); + assertContent(files, destination); + } + + public void testDownloadWithSecondDestination() throws IOException { + fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); + assertContent(files, destination); + assertContent(files, secondDestination); + } + + public void testDownloadWithFileCompletionHandler() throws IOException { + final AtomicInteger counter = new AtomicInteger(0); + fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); + assertContent(files, destination); + assertEquals(files.size(), counter.get()); + } + + public void testDownloadNonExistentFile() { + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real"))); + } + + public void testDownloadExtraNonExistentFile() { + List filesWithExtra = new ArrayList<>(files.keySet()); + filesWithExtra.add("not real"); + assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra)); + } + + private static void assertContent(Map expected, Directory destination) throws IOException { + // Note that Lucene will randomly write extra files (see org.apache.lucene.tests.mockfile.ExtraFS) + // so we just need to check that all the expected files are present but not that _only_ the expected + // files are present + final Set actualFiles = Set.of(destination.listAll()); + for (String file : expected.keySet()) { + assertTrue(actualFiles.contains(file)); + try (IndexInput input = destination.openInput(file, IOContext.DEFAULT)) { + assertEquals(expected.get(file), Integer.valueOf(input.readInt())); + assertThrows(EOFException.class, input::readByte); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 80731b378f369..97c5d23831965 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2067,11 +2067,12 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, DefaultRecoverySettings.INSTANCE), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, null, - new RemoteStoreStatsTrackerFactory(clusterService, settings) + new RemoteStoreStatsTrackerFactory(clusterService, settings), + DefaultRecoverySettings.INSTANCE ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 186c1c7e78f6b..ffa37817e8a03 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -702,7 +702,7 @@ protected IndexShard newShard( remoteStoreStatsTrackerFactory, () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", - null + DefaultRecoverySettings.INSTANCE ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) { @@ -789,14 +789,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory( - dataDirectory, - metadataDirectory, - remoteStoreLockManager, - threadPool, - shardId, - DefaultRecoverySettings.INSTANCE - ); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {