From d275b24c92a8095adbf2134463b19f6b14e6e4f7 Mon Sep 17 00:00:00 2001
From: David Turner <david.turner@elastic.co>
Date: Thu, 18 Jul 2024 06:16:50 +0100
Subject: [PATCH] Combine per-repo results in get-snapshots action

With #107191 we can now safely accumulate results from all targetted
repositories as they're built, rather than staging each repository's
results in intermediate lists in case of failure.
---
 .../get/TransportGetSnapshotsAction.java      | 30 ++++---------------
 1 file changed, 5 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
index d36cf7bf08b1f..05748525178e9 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
@@ -53,14 +53,12 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
@@ -182,19 +180,13 @@ private class GetSnapshotsOperation {
         private final GetSnapshotInfoExecutor getSnapshotInfoExecutor;
 
         // results
-        private final Queue<List<SnapshotInfo>> allSnapshotInfos = ConcurrentCollections.newQueue();
+        private final List<SnapshotInfo> allSnapshotInfos = Collections.synchronizedList(new ArrayList<>());
 
         /**
          * Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response.
          */
         private final AtomicInteger totalCount = new AtomicInteger();
 
-        /**
-         * Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result
-         * list.
-         */
-        private final AtomicInteger resultsCount = new AtomicInteger();
-
         GetSnapshotsOperation(
             CancellableTask cancellableTask,
             List<RepositoryMetadata> repositories,
@@ -454,18 +446,7 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
             if (cancellableTask.notifyIfCancelled(listener)) {
                 return;
             }
-            final var repositoryTotalCount = new AtomicInteger();
-
-            final List<SnapshotInfo> snapshots = new ArrayList<>();
-            final List<SnapshotInfo> syncSnapshots = Collections.synchronizedList(snapshots);
-
             try (var listeners = new RefCountingListener(listener)) {
-                final var iterationCompleteListener = listeners.acquire(ignored -> {
-                    totalCount.addAndGet(repositoryTotalCount.get());
-                    // no need to synchronize access to snapshots: all writes happen-before this read
-                    resultsCount.addAndGet(snapshots.size());
-                    allSnapshotInfos.add(snapshots);
-                });
                 ThrottledIterator.run(
                     Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()),
                     (ref, asyncSnapshotInfo) -> {
@@ -474,9 +455,9 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
                             @Override
                             public void onResponse(SnapshotInfo snapshotInfo) {
                                 if (matchesPredicates(snapshotInfo)) {
-                                    repositoryTotalCount.incrementAndGet();
+                                    totalCount.incrementAndGet();
                                     if (afterPredicate.test(snapshotInfo)) {
-                                        syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
+                                        allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
                                     }
                                 }
                                 refListener.onResponse(null);
@@ -495,7 +476,7 @@ public void onFailure(Exception e) {
                     },
                     getSnapshotInfoExecutor.getMaxRunningTasks(),
                     () -> {},
-                    () -> iterationCompleteListener.onResponse(null)
+                    () -> {}
                 );
             }
         }
@@ -505,12 +486,11 @@ private GetSnapshotsResponse buildResponse() {
             cancellableTask.ensureNotCancelled();
             int remaining = 0;
             final var resultsStream = allSnapshotInfos.stream()
-                .flatMap(Collection::stream)
                 .peek(this::assertSatisfiesAllPredicates)
                 .sorted(sortBy.getSnapshotInfoComparator(order))
                 .skip(offset);
             final List<SnapshotInfo> snapshotInfos;
-            if (size == GetSnapshotsRequest.NO_LIMIT || resultsCount.get() <= size) {
+            if (size == GetSnapshotsRequest.NO_LIMIT || allSnapshotInfos.size() <= size) {
                 snapshotInfos = resultsStream.toList();
             } else {
                 snapshotInfos = new ArrayList<>(size);