Skip to content

Commit

Permalink
Combine per-repo results in get-snapshots action (#111004)
Browse files Browse the repository at this point in the history
With #107191 we can now safely accumulate results from all targetted
repositories as they're built, rather than staging each repository's
results in intermediate lists in case of failure.
  • Loading branch information
DaveCTurner authored Jul 18, 2024
1 parent 6749371 commit fe1b7f1
Showing 1 changed file with 5 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -182,19 +180,13 @@ private class GetSnapshotsOperation {
private final GetSnapshotInfoExecutor getSnapshotInfoExecutor;

// results
private final Queue<List<SnapshotInfo>> allSnapshotInfos = ConcurrentCollections.newQueue();
private final List<SnapshotInfo> allSnapshotInfos = Collections.synchronizedList(new ArrayList<>());

/**
* Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response.
*/
private final AtomicInteger totalCount = new AtomicInteger();

/**
* Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result
* list.
*/
private final AtomicInteger resultsCount = new AtomicInteger();

GetSnapshotsOperation(
CancellableTask cancellableTask,
List<RepositoryMetadata> repositories,
Expand Down Expand Up @@ -438,18 +430,7 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
if (cancellableTask.notifyIfCancelled(listener)) {
return;
}
final var repositoryTotalCount = new AtomicInteger();

final List<SnapshotInfo> snapshots = new ArrayList<>();
final List<SnapshotInfo> syncSnapshots = Collections.synchronizedList(snapshots);

try (var listeners = new RefCountingListener(listener)) {
final var iterationCompleteListener = listeners.acquire(ignored -> {
totalCount.addAndGet(repositoryTotalCount.get());
// no need to synchronize access to snapshots: all writes happen-before this read
resultsCount.addAndGet(snapshots.size());
allSnapshotInfos.add(snapshots);
});
ThrottledIterator.run(
Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()),
(ref, asyncSnapshotInfo) -> {
Expand All @@ -458,9 +439,9 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
repositoryTotalCount.incrementAndGet();
totalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
refListener.onResponse(null);
Expand All @@ -479,7 +460,7 @@ public void onFailure(Exception e) {
},
getSnapshotInfoExecutor.getMaxRunningTasks(),
() -> {},
() -> iterationCompleteListener.onResponse(null)
() -> {}
);
}
}
Expand All @@ -489,12 +470,11 @@ private GetSnapshotsResponse buildResponse() {
cancellableTask.ensureNotCancelled();
int remaining = 0;
final var resultsStream = allSnapshotInfos.stream()
.flatMap(Collection::stream)
.peek(this::assertSatisfiesAllPredicates)
.sorted(sortBy.getSnapshotInfoComparator(order))
.skip(offset);
final List<SnapshotInfo> snapshotInfos;
if (size == GetSnapshotsRequest.NO_LIMIT || resultsCount.get() <= size) {
if (size == GetSnapshotsRequest.NO_LIMIT || allSnapshotInfos.size() <= size) {
snapshotInfos = resultsStream.toList();
} else {
snapshotInfos = new ArrayList<>(size);
Expand Down

0 comments on commit fe1b7f1

Please sign in to comment.