Skip to content

Commit

Permalink
Simplify Shard Snapshot Upload Code (elastic#48155)
Browse files Browse the repository at this point in the history
The code here was needlessly complicated when it
enqueued all file uploads up-front. Instead, we can
go with a cleaner worker + queue pattern here by taking
the max-parallelism from the threadpool info.

Also, I slightly simplified the rethrow and
listener (step listener is pointless when you add the callback in the next line)
handling it since I noticed that we were needlessly rethrowing in the same
code and that wasn't worth a separate PR.
  • Loading branch information
original-brownbear committed Oct 22, 2019
1 parent 60d8ecb commit e82ceee
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -999,11 +1001,10 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
final StepListener<Void> snapshotDoneListener = new StepListener<>();
snapshotDoneListener.whenComplete(listener::onResponse, e -> {
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e
: new IndexShardSnapshotFailedException(store.shardId(), e));

final ActionListener<Void> snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
});
try {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
Expand All @@ -1026,7 +1027,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}

final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
store.incRef();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
Expand Down Expand Up @@ -1147,42 +1148,29 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
allFilesUploadedListener.onResponse(Collections.emptyList());
return;
}
final GroupedActionListener<Void> filesListener =
new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
// Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting
final AtomicBoolean alreadyFailed = new AtomicBoolean();
for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
executor.execute(new ActionRunnable<Void>(filesListener) {
@Override
protected void doRun() {
// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
l.onFailure(e);
});
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(filesListener, () -> {
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
if (snapshotFileInfo != null) {
store.incRef();
try {
if (alreadyFailed.get() == false) {
if (store.tryIncRef()) {
try {
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
} finally {
store.decRef();
}
} else if (snapshotStatus.isAborted()) {
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
} else {
assert false : "Store was closed before aborting the snapshot";
throw new IllegalStateException("Store is closed already");
}
}
filesListener.onResponse(null);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
do {
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
} while (snapshotFileInfo != null);
} finally {
store.decRef();
}
}

@Override
public void onFailure(Exception e) {
alreadyFailed.set(true);
super.onFailure(e);
}
});
}));
}
} catch (Exception e) {
snapshotDoneListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public void testOverwriteSnapshotInfoBlob() {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), threadPool, blobStoreContext)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
Expand Down Expand Up @@ -288,6 +290,8 @@ public ThreadPool getThreadPool() {
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
return new ThreadPool(settings) {

private final Map<String, ThreadPool.Info> infos = new HashMap<>();

{
stopCachedTimeThread();
}
Expand All @@ -309,7 +313,7 @@ public ThreadPoolInfo info() {

@Override
public Info info(String name) {
throw new UnsupportedOperationException();
return infos.computeIfAbsent(name, n -> new Info(n, ThreadPoolType.FIXED, random.nextInt(10) + 1));
}

@Override
Expand Down

0 comments on commit e82ceee

Please sign in to comment.