From fe1b7f1091f8ddac00a610d71e2a200e17c3a56d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 18 Jul 2024 09:54:52 +0100 Subject: [PATCH] Combine per-repo results in get-snapshots action (#111004) With #107191 we can now safely accumulate results from all targetted repositories as they're built, rather than staging each repository's results in intermediate lists in case of failure. --- .../get/TransportGetSnapshotsAction.java | 30 ++++--------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 384a004861776..c4f3f3cddf2ec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -53,14 +53,12 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; @@ -182,19 +180,13 @@ private class GetSnapshotsOperation { private final GetSnapshotInfoExecutor getSnapshotInfoExecutor; // results - private final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); + private final List allSnapshotInfos = Collections.synchronizedList(new ArrayList<>()); /** * Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response. */ private final AtomicInteger totalCount = new AtomicInteger(); - /** - * Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result - * list. - */ - private final AtomicInteger resultsCount = new AtomicInteger(); - GetSnapshotsOperation( CancellableTask cancellableTask, List repositories, @@ -438,18 +430,7 @@ private void loadSnapshotInfos(Iterator asyncSnapshotInfoIter if (cancellableTask.notifyIfCancelled(listener)) { return; } - final var repositoryTotalCount = new AtomicInteger(); - - final List snapshots = new ArrayList<>(); - final List syncSnapshots = Collections.synchronizedList(snapshots); - try (var listeners = new RefCountingListener(listener)) { - final var iterationCompleteListener = listeners.acquire(ignored -> { - totalCount.addAndGet(repositoryTotalCount.get()); - // no need to synchronize access to snapshots: all writes happen-before this read - resultsCount.addAndGet(snapshots.size()); - allSnapshotInfos.add(snapshots); - }); ThrottledIterator.run( Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()), (ref, asyncSnapshotInfo) -> { @@ -458,9 +439,9 @@ private void loadSnapshotInfos(Iterator asyncSnapshotInfoIter @Override public void onResponse(SnapshotInfo snapshotInfo) { if (matchesPredicates(snapshotInfo)) { - repositoryTotalCount.incrementAndGet(); + totalCount.incrementAndGet(); if (afterPredicate.test(snapshotInfo)) { - syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); } } refListener.onResponse(null); @@ -479,7 +460,7 @@ public void onFailure(Exception e) { }, getSnapshotInfoExecutor.getMaxRunningTasks(), () -> {}, - () -> iterationCompleteListener.onResponse(null) + () -> {} ); } } @@ -489,12 +470,11 @@ private GetSnapshotsResponse buildResponse() { cancellableTask.ensureNotCancelled(); int remaining = 0; final var resultsStream = allSnapshotInfos.stream() - .flatMap(Collection::stream) .peek(this::assertSatisfiesAllPredicates) .sorted(sortBy.getSnapshotInfoComparator(order)) .skip(offset); final List snapshotInfos; - if (size == GetSnapshotsRequest.NO_LIMIT || resultsCount.get() <= size) { + if (size == GetSnapshotsRequest.NO_LIMIT || allSnapshotInfos.size() <= size) { snapshotInfos = resultsStream.toList(); } else { snapshotInfos = new ArrayList<>(size);