diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ad8dcf7085bf2..159f9df239e94 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -108,8 +108,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -999,11 +1001,10 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); - final StepListener snapshotDoneListener = new StepListener<>(); - snapshotDoneListener.whenComplete(listener::onResponse, e -> { - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); - listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e - : new IndexShardSnapshotFailedException(store.shardId(), e)); + + final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> { + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e)); + listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e)); }); try { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); @@ -1026,7 +1027,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } final List indexCommitPointFiles = new ArrayList<>(); - ArrayList filesToSnapshot = new ArrayList<>(); + final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; @@ -1147,42 +1148,29 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s allFilesUploadedListener.onResponse(Collections.emptyList()); return; } - final GroupedActionListener filesListener = - new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting - final AtomicBoolean alreadyFailed = new AtomicBoolean(); - for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { - executor.execute(new ActionRunnable(filesListener) { - @Override - protected void doRun() { + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount); + final ActionListener filesListener = ActionListener.delegateResponse( + new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> { + filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception + l.onFailure(e); + }); + for (int i = 0; i < workers; ++i) { + executor.execute(ActionRunnable.run(filesListener, () -> { + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + if (snapshotFileInfo != null) { + store.incRef(); try { - if (alreadyFailed.get() == false) { - if (store.tryIncRef()) { - try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); - } finally { - store.decRef(); - } - } else if (snapshotStatus.isAborted()) { - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } else { - assert false : "Store was closed before aborting the snapshot"; - throw new IllegalStateException("Store is closed already"); - } - } - filesListener.onResponse(null); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); + do { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); + } while (snapshotFileInfo != null); + } finally { + store.decRef(); } } - - @Override - public void onFailure(Exception e) { - alreadyFailed.set(true); - super.onFailure(e); - } - }); + })); } } catch (Exception e) { snapshotDoneListener.onFailure(e); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 9d94fc3ed4368..ee766ef7360b5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -138,6 +138,8 @@ public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); final ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); + when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn( + new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), xContentRegistry(), threadPool, blobStoreContext)) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 4567b97700604..0837f431fff9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -30,7 +30,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; @@ -288,6 +290,8 @@ public ThreadPool getThreadPool() { public ThreadPool getThreadPool(Function runnableWrapper) { return new ThreadPool(settings) { + private final Map infos = new HashMap<>(); + { stopCachedTimeThread(); } @@ -309,7 +313,7 @@ public ThreadPoolInfo info() { @Override public Info info(String name) { - throw new UnsupportedOperationException(); + return infos.computeIfAbsent(name, n -> new Info(n, ThreadPoolType.FIXED, random.nextInt(10) + 1)); } @Override