From b95cb8c40dc701cbce18308cdcd5538fd60facb9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 23 Feb 2024 06:51:38 +0000 Subject: [PATCH] Introduce `TransportGetSnapshotsAction#GetSnapshotsOperation` (#105609) This commit introduces a class to encapsulate each invocation of `TransportGetSnapshotsAction`, avoiding the need to pass around a huge list of parameters between all the methods that make up the implementation. --- .../get/TransportGetSnapshotsAction.java | 983 +++++++++--------- 1 file changed, 465 insertions(+), 518 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index ca910a8d94078..4996096492354 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -109,598 +109,514 @@ protected void masterOperation( ) { assert task instanceof CancellableTask : task + " not cancellable"; - getMultipleReposSnapshotInfo( - request.isSingleRepositoryRequest() == false, - SnapshotsInProgress.get(state), + new GetSnapshotsOperation( + (CancellableTask) task, TransportGetRepositoriesAction.getRepositories(state, request.repositories()), + request.isSingleRepositoryRequest() == false, request.snapshots(), request.ignoreUnavailable(), - request.verbose(), - (CancellableTask) task, + SnapshotPredicates.fromRequest(request), request.sort(), - request.after(), - request.offset(), - request.size(), request.order(), request.fromSortValue(), - SnapshotPredicates.fromRequest(request), - request.includeIndexNames(), - listener - ); + request.offset(), + request.after(), + request.size(), + SnapshotsInProgress.get(state), + request.verbose(), + request.includeIndexNames() + ).getMultipleReposSnapshotInfo(listener); } /** - * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository - * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside - * the sort value range if possible. + * A single invocation of the get-snapshots API. + *

+ * Decides which repositories to query, picks a collection of candidate {@link SnapshotId} values from each {@link RepositoryData}, + * chosen according to the request parameters, loads the relevant {@link SnapshotInfo} blobs, and finally sorts and filters the + * results. */ - private static List maybeFilterRepositories( - List repositories, - GetSnapshotsRequest.SortBy sortBy, - SortOrder order, - @Nullable String fromSortValue - ) { - if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { - return repositories; - } - final Predicate predicate = order == SortOrder.ASC - ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 - : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; - return repositories.stream().filter(predicate).toList(); - } - - private void getMultipleReposSnapshotInfo( - boolean isMultiRepoRequest, - SnapshotsInProgress snapshotsInProgress, - TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask cancellableTask, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order, - String fromSortValue, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - // Process the missing repositories - final Map failures = ConcurrentCollections.newConcurrentMap(); - for (String missingRepo : repositoriesResult.missing()) { - failures.put(missingRepo, new RepositoryMissingException(missingRepo)); + private class GetSnapshotsOperation { + private final CancellableTask cancellableTask; + + // repositories + private final List repositories; + private final boolean isMultiRepoRequest; + + // snapshots selection + private final String[] snapshots; + private final boolean ignoreUnavailable; + private final SnapshotPredicates predicates; + + // snapshot ordering/pagination + private final GetSnapshotsRequest.SortBy sortBy; + private final SortOrder order; + @Nullable + private final String fromSortValue; + private final int offset; + @Nullable + private final GetSnapshotsRequest.After after; + private final int size; + + // current state + private final SnapshotsInProgress snapshotsInProgress; + + // output detail + private final boolean verbose; + private final boolean indices; + + // results + private final Map failuresByRepository = ConcurrentCollections.newConcurrentMap(); + private final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); + private final AtomicInteger remaining = new AtomicInteger(); + private final AtomicInteger totalCount = new AtomicInteger(); + + GetSnapshotsOperation( + CancellableTask cancellableTask, + TransportGetRepositoriesAction.RepositoriesResult repositoriesResult, + boolean isMultiRepoRequest, + String[] snapshots, + boolean ignoreUnavailable, + SnapshotPredicates predicates, + GetSnapshotsRequest.SortBy sortBy, + SortOrder order, + String fromSortValue, + int offset, + GetSnapshotsRequest.After after, + int size, + SnapshotsInProgress snapshotsInProgress, + boolean verbose, + boolean indices + ) { + this.cancellableTask = cancellableTask; + this.repositories = repositoriesResult.metadata(); + this.isMultiRepoRequest = isMultiRepoRequest; + this.snapshots = snapshots; + this.ignoreUnavailable = ignoreUnavailable; + this.predicates = predicates; + this.sortBy = sortBy; + this.order = order; + this.fromSortValue = fromSortValue; + this.offset = offset; + this.after = after; + this.size = size; + this.snapshotsInProgress = snapshotsInProgress; + this.verbose = verbose; + this.indices = indices; + + for (final var missingRepo : repositoriesResult.missing()) { + failuresByRepository.put(missingRepo, new RepositoryMissingException(missingRepo)); + } } - final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); - final var remaining = new AtomicInteger(); - final var totalCount = new AtomicInteger(); - - List repositories = maybeFilterRepositories(repositoriesResult.metadata(), sortBy, order, fromSortValue); - try (var listeners = new RefCountingListener(listener.map(ignored -> { - cancellableTask.ensureNotCancelled(); - final var sortedSnapshotsInRepos = sortSnapshots( - allSnapshotInfos.stream().flatMap(Collection::stream), - totalCount.get(), - sortBy, - after, - offset, - size, - order - ); - final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); - assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); - final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); - return new GetSnapshotsResponse( - snapshotInfos, - failures, - finalRemaining > 0 - ? GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() - : null, - totalCount.get(), - finalRemaining - ); - }))) { - for (final RepositoryMetadata repository : repositories) { - final String repoName = repository.name(); - getSingleRepoSnapshotInfo( - snapshotsInProgress, - repoName, - snapshots, - predicates, - ignoreUnavailable, - verbose, - cancellableTask, - sortBy, - after, - order, - indices, - listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { + /** + * Filters the list of repositories that a request will fetch snapshots from in the special case of sorting by repository + * name and having a non-null value for {@link GetSnapshotsRequest#fromSortValue()} on the request to exclude repositories outside + * the sort value range if possible. + */ + private List maybeFilterRepositories() { + if (sortBy != GetSnapshotsRequest.SortBy.REPOSITORY || fromSortValue == null) { + return repositories; + } + final Predicate predicate = order == SortOrder.ASC + ? repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) <= 0 + : repositoryMetadata -> fromSortValue.compareTo(repositoryMetadata.name()) >= 0; + return repositories.stream().filter(predicate).toList(); + } + + void getMultipleReposSnapshotInfo(ActionListener listener) { + List filteredRepositories = maybeFilterRepositories(); + try (var listeners = new RefCountingListener(listener.map(ignored -> { + cancellableTask.ensureNotCancelled(); + final var sortedSnapshotsInRepos = sortSnapshots( + allSnapshotInfos.stream().flatMap(Collection::stream), + totalCount.get(), + offset, + size + ); + final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos(); + assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty()); + final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get(); + return new GetSnapshotsResponse( + snapshotInfos, + failuresByRepository, + finalRemaining > 0 + ? GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam() + : null, + totalCount.get(), + finalRemaining + ); + }))) { + for (final RepositoryMetadata repository : filteredRepositories) { + final String repoName = repository.name(); + getSingleRepoSnapshotInfo(repoName, listeners.acquire((SnapshotsInRepo snapshotsInRepo) -> { allSnapshotInfos.add(snapshotsInRepo.snapshotInfos()); remaining.addAndGet(snapshotsInRepo.remaining()); totalCount.addAndGet(snapshotsInRepo.totalCount()); }).delegateResponse((l, e) -> { if (isMultiRepoRequest && e instanceof ElasticsearchException elasticsearchException) { - failures.put(repoName, elasticsearchException); + failuresByRepository.put(repoName, elasticsearchException); l.onResponse(SnapshotsInRepo.EMPTY); } else { l.onFailure(e); } - }) - ); + })); + } } } - } - private void getSingleRepoSnapshotInfo( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - SnapshotPredicates predicates, - boolean ignoreUnavailable, - boolean verbose, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - boolean indices, - ActionListener listener - ) { - final Map allSnapshotIds = new HashMap<>(); - final List currentSnapshots = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots(snapshotsInProgress, repo)) { - Snapshot snapshot = snapshotInfo.snapshot(); - allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); - currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); - } + private void getSingleRepoSnapshotInfo(String repo, ActionListener listener) { + final Map allSnapshotIds = new HashMap<>(); + final List currentSnapshots = new ArrayList<>(); + for (final SnapshotInfo snapshotInfo : currentSnapshots(repo)) { + Snapshot snapshot = snapshotInfo.snapshot(); + allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot); + currentSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + } - final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - if (isCurrentSnapshotsOnly(snapshots)) { - repositoryDataListener.onResponse(null); - } else { - repositoriesService.getRepositoryData(repo, repositoryDataListener); - } + final ListenableFuture repositoryDataListener = new ListenableFuture<>(); + if (isCurrentSnapshotsOnly()) { + repositoryDataListener.onResponse(null); + } else { + repositoriesService.getRepositoryData(repo, repositoryDataListener); + } - repositoryDataListener.addListener( - listener.delegateFailureAndWrap( - (l, repositoryData) -> loadSnapshotInfos( - snapshotsInProgress, - repo, - snapshots, - ignoreUnavailable, - verbose, - allSnapshotIds, - currentSnapshots, - repositoryData, - task, - sortBy, - after, - order, - predicates, - indices, - l + repositoryDataListener.addListener( + listener.delegateFailureAndWrap( + (l, repositoryData) -> loadSnapshotInfos(repo, allSnapshotIds, currentSnapshots, repositoryData, l) ) - ) - ); - } - - /** - * Returns a list of currently running snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @return list of snapshots - */ - private static List currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) { - List snapshotList = new ArrayList<>(); - List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - Collections.emptyList() - ); - for (SnapshotsInProgress.Entry entry : entries) { - snapshotList.add(SnapshotInfo.inProgress(entry)); + ); } - return snapshotList; - } - private void loadSnapshotInfos( - SnapshotsInProgress snapshotsInProgress, - String repo, - String[] snapshots, - boolean ignoreUnavailable, - boolean verbose, - Map allSnapshotIds, - List currentSnapshots, - @Nullable RepositoryData repositoryData, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicates, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; + /** + * Returns a list of currently running snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @return list of snapshots + */ + private List currentSnapshots(String repositoryName) { + List snapshotList = new ArrayList<>(); + List entries = SnapshotsService.currentSnapshots( + snapshotsInProgress, + repositoryName, + Collections.emptyList() + ); + for (SnapshotsInProgress.Entry entry : entries) { + snapshotList.add(SnapshotInfo.inProgress(entry)); + } + return snapshotList; } - if (repositoryData != null) { - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - if (predicates.test(snapshotId, repositoryData)) { - allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + private void loadSnapshotInfos( + String repo, + Map allSnapshotIds, + List currentSnapshots, + @Nullable RepositoryData repositoryData, + ActionListener listener + ) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + + if (repositoryData != null) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + if (predicates.test(snapshotId, repositoryData)) { + allSnapshotIds.put(snapshotId.getName(), new Snapshot(repo, snapshotId)); + } } } - } - final Set toResolve = new HashSet<>(); - if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { - toResolve.addAll(allSnapshotIds.values()); - } else { - final List includePatterns = new ArrayList<>(); - final List excludePatterns = new ArrayList<>(); - boolean hasCurrent = false; - boolean seenWildcard = false; - for (String snapshotOrPattern : snapshots) { - if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { - excludePatterns.add(snapshotOrPattern.substring(1)); - } else { - if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { - seenWildcard = true; - includePatterns.add(snapshotOrPattern); - } else if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - hasCurrent = true; - seenWildcard = true; + final Set toResolve = new HashSet<>(); + if (TransportGetRepositoriesAction.isMatchAll(snapshots)) { + toResolve.addAll(allSnapshotIds.values()); + } else { + final List includePatterns = new ArrayList<>(); + final List excludePatterns = new ArrayList<>(); + boolean hasCurrent = false; + boolean seenWildcard = false; + for (String snapshotOrPattern : snapshots) { + if (seenWildcard && snapshotOrPattern.length() > 1 && snapshotOrPattern.startsWith("-")) { + excludePatterns.add(snapshotOrPattern.substring(1)); } else { - if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { - throw new SnapshotMissingException(repo, snapshotOrPattern); + if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { + seenWildcard = true; + includePatterns.add(snapshotOrPattern); + } else if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + hasCurrent = true; + seenWildcard = true; + } else { + if (ignoreUnavailable == false && allSnapshotIds.containsKey(snapshotOrPattern) == false) { + throw new SnapshotMissingException(repo, snapshotOrPattern); + } + includePatterns.add(snapshotOrPattern); } - includePatterns.add(snapshotOrPattern); } } - } - final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); - final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); - for (Map.Entry entry : allSnapshotIds.entrySet()) { - final Snapshot snapshot = entry.getValue(); - if (toResolve.contains(snapshot) == false - && Regex.simpleMatch(includes, entry.getKey()) - && Regex.simpleMatch(excludes, entry.getKey()) == false) { - toResolve.add(snapshot); - } - } - if (hasCurrent) { - for (SnapshotInfo snapshotInfo : currentSnapshots) { - final Snapshot snapshot = snapshotInfo.snapshot(); - if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + final String[] includes = includePatterns.toArray(Strings.EMPTY_ARRAY); + final String[] excludes = excludePatterns.toArray(Strings.EMPTY_ARRAY); + for (Map.Entry entry : allSnapshotIds.entrySet()) { + final Snapshot snapshot = entry.getValue(); + if (toResolve.contains(snapshot) == false + && Regex.simpleMatch(includes, entry.getKey()) + && Regex.simpleMatch(excludes, entry.getKey()) == false) { toResolve.add(snapshot); } } + if (hasCurrent) { + for (SnapshotInfo snapshotInfo : currentSnapshots) { + final Snapshot snapshot = snapshotInfo.snapshot(); + if (Regex.simpleMatch(excludes, snapshot.getSnapshotId().getName()) == false) { + toResolve.add(snapshot); + } + } + } + if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly() == false) { + throw new SnapshotMissingException(repo, snapshots[0]); + } } - if (toResolve.isEmpty() && ignoreUnavailable == false && isCurrentSnapshotsOnly(snapshots) == false) { - throw new SnapshotMissingException(repo, snapshots[0]); + + if (verbose) { + snapshots(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener); + } else { + assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; + final SnapshotsInRepo snapshotInfos; + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = sortSnapshotsWithNoOffsetOrLimit(currentSnapshots.stream().map(SnapshotInfo::basic).toList()); + } + listener.onResponse(snapshotInfos); } } - if (verbose) { - snapshots( + /** + * Returns a list of snapshots from repository sorted by snapshot creation date + * + * @param repositoryName repository name + * @param snapshotIds snapshots for which to fetch snapshot information + */ + private void snapshots(String repositoryName, Collection snapshotIds, ActionListener listener) { + if (cancellableTask.notifyIfCancelled(listener)) { + return; + } + final Set snapshotSet = new HashSet<>(); + final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); + // first, look at the snapshots in progress + final List entries = SnapshotsService.currentSnapshots( snapshotsInProgress, - repo, - toResolve.stream().map(Snapshot::getSnapshotId).toList(), - ignoreUnavailable, - task, - sortBy, - after, - order, - predicates, - indices, - listener + repositoryName, + snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() ); - } else { - assert predicates.isMatchAll() : "filtering is not supported in non-verbose mode"; - final SnapshotsInRepo snapshotInfos; - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots, sortBy, after, order, indices); + for (SnapshotsInProgress.Entry entry : entries) { + if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { + final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); + if (predicates.test(snapshotInfo)) { + snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); + } + } + } + // then, look in the repository if there's any matching snapshots left + final List snapshotInfos; + if (snapshotIdsToIterate.isEmpty()) { + snapshotInfos = Collections.emptyList(); } else { - // only want current snapshots - snapshotInfos = sortSnapshots( - currentSnapshots.stream().map(SnapshotInfo::basic).toList(), - sortBy, - after, - 0, - GetSnapshotsRequest.NO_LIMIT, - order - ); + snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } - listener.onResponse(snapshotInfos); - } - } - - /** - * Returns a list of snapshots from repository sorted by snapshot creation date - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @param snapshotIds snapshots for which to fetch snapshot information - * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning, - * @param indices if false, drop the list of indices from each result - */ - private void snapshots( - SnapshotsInProgress snapshotsInProgress, - String repositoryName, - Collection snapshotIds, - boolean ignoreUnavailable, - CancellableTask task, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - SortOrder order, - SnapshotPredicates predicate, - boolean indices, - ActionListener listener - ) { - if (task.notifyIfCancelled(listener)) { - return; - } - final Set snapshotSet = new HashSet<>(); - final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); - // first, look at the snapshots in progress - final List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - snapshotIdsToIterate.stream().map(SnapshotId::getName).toList() - ); - for (SnapshotsInProgress.Entry entry : entries) { - if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { - final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); - if (predicate.test(snapshotInfo)) { - snapshotSet.add(snapshotInfo.maybeWithoutIndices(indices)); - } + final ActionListener allDoneListener = listener.safeMap(v -> { + final ArrayList snapshotList = new ArrayList<>(snapshotInfos); + snapshotList.addAll(snapshotSet); + return sortSnapshotsWithNoOffsetOrLimit(snapshotList); + }); + if (snapshotIdsToIterate.isEmpty()) { + allDoneListener.onResponse(null); + return; } + final Repository repository; + try { + repository = repositoriesService.repository(repositoryName); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + return; + } + repository.getSnapshotInfo( + new GetSnapshotInfoContext( + snapshotIdsToIterate, + ignoreUnavailable == false, + cancellableTask::isCancelled, + (context, snapshotInfo) -> { + if (predicates.test(snapshotInfo)) { + snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); + } + }, + allDoneListener + ) + ); } - // then, look in the repository if there's any matching snapshots left - final List snapshotInfos; - if (snapshotIdsToIterate.isEmpty()) { - snapshotInfos = Collections.emptyList(); - } else { - snapshotInfos = Collections.synchronizedList(new ArrayList<>()); - } - final ActionListener allDoneListener = listener.safeMap(v -> { - final ArrayList snapshotList = new ArrayList<>(snapshotInfos); - snapshotList.addAll(snapshotSet); - return sortSnapshots(snapshotList, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - }); - if (snapshotIdsToIterate.isEmpty()) { - allDoneListener.onResponse(null); - return; - } - final Repository repository; - try { - repository = repositoriesService.repository(repositoryName); - } catch (RepositoryMissingException e) { - listener.onFailure(e); - return; - } - repository.getSnapshotInfo( - new GetSnapshotInfoContext(snapshotIdsToIterate, ignoreUnavailable == false, task::isCancelled, (context, snapshotInfo) -> { - if (predicate.test(snapshotInfo)) { - snapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices)); - } - }, allDoneListener) - ); - } - private static boolean isCurrentSnapshotsOnly(String[] snapshots) { - return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0])); - } + private boolean isCurrentSnapshotsOnly() { + return snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]); + } - private static SnapshotsInRepo buildSimpleSnapshotInfos( - final Set toResolve, - final String repoName, - final RepositoryData repositoryData, - final List currentSnapshots, - final GetSnapshotsRequest.SortBy sortBy, - @Nullable final GetSnapshotsRequest.After after, - final SortOrder order, - boolean indices - ) { - List snapshotInfos = new ArrayList<>(); - for (SnapshotInfo snapshotInfo : currentSnapshots) { - if (toResolve.remove(snapshotInfo.snapshot())) { - snapshotInfos.add(snapshotInfo.basic()); + private SnapshotsInRepo buildSimpleSnapshotInfos( + final Set toResolve, + final String repoName, + final RepositoryData repositoryData, + final List currentSnapshots + ) { + List snapshotInfos = new ArrayList<>(); + for (SnapshotInfo snapshotInfo : currentSnapshots) { + if (toResolve.remove(snapshotInfo.snapshot())) { + snapshotInfos.add(snapshotInfo.basic()); + } } - } - Map> snapshotsToIndices = new HashMap<>(); - if (indices) { - for (IndexId indexId : repositoryData.getIndices().values()) { - for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { - if (toResolve.contains(new Snapshot(repoName, snapshotId))) { - snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + Map> snapshotsToIndices = new HashMap<>(); + if (indices) { + for (IndexId indexId : repositoryData.getIndices().values()) { + for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) { + if (toResolve.contains(new Snapshot(repoName, snapshotId))) { + snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName()); + } } } } + for (Snapshot snapshot : toResolve) { + snapshotInfos.add( + new SnapshotInfo( + snapshot, + snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), + Collections.emptyList(), + Collections.emptyList(), + repositoryData.getSnapshotState(snapshot.getSnapshotId()) + ) + ); + } + return sortSnapshotsWithNoOffsetOrLimit(snapshotInfos); } - for (Snapshot snapshot : toResolve) { - snapshotInfos.add( - new SnapshotInfo( - snapshot, - snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), - Collections.emptyList(), - Collections.emptyList(), - repositoryData.getSnapshotState(snapshot.getSnapshotId()) - ) - ); - } - return sortSnapshots(snapshotInfos, sortBy, after, 0, GetSnapshotsRequest.NO_LIMIT, order); - } - private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_START_TIME = Comparator.comparingLong(SnapshotInfo::startTime) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_DURATION = Comparator.comparingLong( - sni -> sni.endTime() - sni.startTime() - ).thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_DURATION = Comparator.comparingLong( + sni -> sni.endTime() - sni.startTime() + ).thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_INDICES_COUNT = Comparator.comparingInt(sni -> sni.indices().size()) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::totalShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_FAILED_SHARDS_COUNT = Comparator.comparingInt(SnapshotInfo::failedShards) + .thenComparing(SnapshotInfo::snapshotId); - private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); + private static final Comparator BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName()); - private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) - .thenComparing(SnapshotInfo::snapshotId); + private static final Comparator BY_REPOSITORY = Comparator.comparing(SnapshotInfo::repository) + .thenComparing(SnapshotInfo::snapshotId); - private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - if (details == null) { - return -1; - } - final long startTime = details.getStartTimeMillis(); - if (startTime == -1) { - return -1; + private SnapshotsInRepo sortSnapshotsWithNoOffsetOrLimit(List snapshotInfos) { + return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), 0, GetSnapshotsRequest.NO_LIMIT); } - final long endTime = details.getEndTimeMillis(); - if (endTime == -1) { - return -1; - } - return endTime - startTime; - } - private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { - final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); - return details == null ? -1 : details.getStartTimeMillis(); - } + private SnapshotsInRepo sortSnapshots(Stream infos, int totalCount, int offset, int size) { + final Comparator comparator = switch (sortBy) { + case START_TIME -> BY_START_TIME; + case NAME -> BY_NAME; + case DURATION -> BY_DURATION; + case INDICES -> BY_INDICES_COUNT; + case SHARDS -> BY_SHARDS_COUNT; + case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; + case REPOSITORY -> BY_REPOSITORY; + }; - private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { - // TODO: this could be made more efficient by caching this number in RepositoryData - int indexCount = 0; - for (IndexId idx : repositoryData.getIndices().values()) { - if (repositoryData.getSnapshots(idx).contains(snapshotId)) { - indexCount++; + if (after != null) { + assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; + infos = infos.filter(buildAfterPredicate()); } + infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); + final List allSnapshots = infos.toList(); + final List snapshots; + if (size != GetSnapshotsRequest.NO_LIMIT) { + snapshots = allSnapshots.stream().limit(size + 1).toList(); + } else { + snapshots = allSnapshots; + } + final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() + ? snapshots.subList(0, size) + : snapshots; + return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); + } + + private Predicate buildAfterPredicate() { + final String snapshotName = after.snapshotName(); + final String repoName = after.repoName(); + final String value = after.value(); + return switch (sortBy) { + case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); + case NAME -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareName(snapshotName, repoName, info) < 0) + : (info -> compareName(snapshotName, repoName, info) > 0); + case DURATION -> filterByLongOffset( + info -> info.endTime() - info.startTime(), + Long.parseLong(value), + snapshotName, + repoName, + order + ); + case INDICES -> + // TODO: cover via pre-flight predicate + filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); + case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); + case FAILED_SHARDS -> filterByLongOffset( + SnapshotInfo::failedShards, + Integer.parseInt(value), + snapshotName, + repoName, + order + ); + case REPOSITORY -> + // TODO: cover via pre-flight predicate + order == SortOrder.ASC + ? (info -> compareRepositoryName(snapshotName, repoName, info) < 0) + : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); + }; + } + + private static Predicate filterByLongOffset( + ToLongFunction extractor, + long after, + String snapshotName, + String repoName, + SortOrder order + ) { + return order == SortOrder.ASC ? info -> { + final long val = extractor.applyAsLong(info); + return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); + } : info -> { + final long val = extractor.applyAsLong(info); + return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); + }; + } + + private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { + final int res = repoName.compareTo(info.repository()); + if (res != 0) { + return res; + } + return name.compareTo(info.snapshotId().getName()); } - return indexCount; - } - - private static SnapshotsInRepo sortSnapshots( - List snapshotInfos, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), sortBy, after, offset, size, order); - } - - private static SnapshotsInRepo sortSnapshots( - Stream infos, - int totalCount, - GetSnapshotsRequest.SortBy sortBy, - @Nullable GetSnapshotsRequest.After after, - int offset, - int size, - SortOrder order - ) { - final Comparator comparator = switch (sortBy) { - case START_TIME -> BY_START_TIME; - case NAME -> BY_NAME; - case DURATION -> BY_DURATION; - case INDICES -> BY_INDICES_COUNT; - case SHARDS -> BY_SHARDS_COUNT; - case FAILED_SHARDS -> BY_FAILED_SHARDS_COUNT; - case REPOSITORY -> BY_REPOSITORY; - }; - - if (after != null) { - assert offset == 0 : "can't combine after and offset but saw [" + after + "] and offset [" + offset + "]"; - infos = infos.filter(buildAfterPredicate(sortBy, after, order)); - } - infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator).skip(offset); - final List allSnapshots = infos.toList(); - final List snapshots; - if (size != GetSnapshotsRequest.NO_LIMIT) { - snapshots = allSnapshots.stream().limit(size + 1).toList(); - } else { - snapshots = allSnapshots; - } - final List resultSet = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size() - ? snapshots.subList(0, size) - : snapshots; - return new SnapshotsInRepo(resultSet, totalCount, allSnapshots.size() - resultSet.size()); - } - private static Predicate buildAfterPredicate( - GetSnapshotsRequest.SortBy sortBy, - GetSnapshotsRequest.After after, - SortOrder order - ) { - final String snapshotName = after.snapshotName(); - final String repoName = after.repoName(); - final String value = after.value(); - return switch (sortBy) { - case START_TIME -> filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(value), snapshotName, repoName, order); - case NAME -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? (info -> compareName(snapshotName, repoName, info) < 0) - : (info -> compareName(snapshotName, repoName, info) > 0); - case DURATION -> filterByLongOffset( - info -> info.endTime() - info.startTime(), - Long.parseLong(value), - snapshotName, - repoName, - order - ); - case INDICES -> - // TODO: cover via pre-flight predicate - filterByLongOffset(info -> info.indices().size(), Integer.parseInt(value), snapshotName, repoName, order); - case SHARDS -> filterByLongOffset(SnapshotInfo::totalShards, Integer.parseInt(value), snapshotName, repoName, order); - case FAILED_SHARDS -> filterByLongOffset(SnapshotInfo::failedShards, Integer.parseInt(value), snapshotName, repoName, order); - case REPOSITORY -> - // TODO: cover via pre-flight predicate - order == SortOrder.ASC - ? (info -> compareRepositoryName(snapshotName, repoName, info) < 0) - : (info -> compareRepositoryName(snapshotName, repoName, info) > 0); - }; - } - - private static Predicate filterByLongOffset( - ToLongFunction extractor, - long after, - String snapshotName, - String repoName, - SortOrder order - ) { - return order == SortOrder.ASC ? info -> { - final long val = extractor.applyAsLong(info); - return after < val || (after == val && compareName(snapshotName, repoName, info) < 0); - } : info -> { - final long val = extractor.applyAsLong(info); - return after > val || (after == val && compareName(snapshotName, repoName, info) > 0); - }; - } - - private static int compareRepositoryName(String name, String repoName, SnapshotInfo info) { - final int res = repoName.compareTo(info.repository()); - if (res != 0) { - return res; + private static int compareName(String name, String repoName, SnapshotInfo info) { + final int res = name.compareTo(info.snapshotId().getName()); + if (res != 0) { + return res; + } + return repoName.compareTo(info.repository()); } - return name.compareTo(info.snapshotId().getName()); - } - private static int compareName(String name, String repoName, SnapshotInfo info) { - final int res = name.compareTo(info.snapshotId().getName()); - if (res != 0) { - return res; - } - return repoName.compareTo(info.repository()); } /** @@ -881,6 +797,37 @@ private static Predicate filterByLongOffset(ToLongFunction after <= extractor.applyAsLong(info) : info -> after >= extractor.applyAsLong(info); } + private static long getDuration(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + if (details == null) { + return -1; + } + final long startTime = details.getStartTimeMillis(); + if (startTime == -1) { + return -1; + } + final long endTime = details.getEndTimeMillis(); + if (endTime == -1) { + return -1; + } + return endTime - startTime; + } + + private static long getStartTime(SnapshotId snapshotId, RepositoryData repositoryData) { + final RepositoryData.SnapshotDetails details = repositoryData.getSnapshotDetails(snapshotId); + return details == null ? -1 : details.getStartTimeMillis(); + } + + private static int indexCount(SnapshotId snapshotId, RepositoryData repositoryData) { + // TODO: this could be made more efficient by caching this number in RepositoryData + int indexCount = 0; + for (IndexId idx : repositoryData.getIndices().values()) { + if (repositoryData.getSnapshots(idx).contains(snapshotId)) { + indexCount++; + } + } + return indexCount; + } } private record SnapshotsInRepo(List snapshotInfos, int totalCount, int remaining) {