From 10fab8e26199231b45e2f08fdfd915ddcf485888 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 23 Nov 2022 14:10:18 +0100 Subject: [PATCH] Simplify and optimize deduplication of RepositoryData for a non-caching repository instance This makes use of the new deduplicator infrastructure to move to more efficient deduplication mechanics. The existing solution hardly ever deduplicated because it would only deduplicate after the repository entered a consistent state. The adjusted solution is much simpler, in that it simply deduplicates such that only a single loading of `RepositoryData` will ever happen at a time, fixing memory issues from massively concurrent loading of the repo data as described in #89952. closes #89952 --- .../blobstore/BlobStoreRepository.java | 27 +++++-------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d7df332673be3..32444dd872ce6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -25,7 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.ResultDeduplicator; +import org.elasticsearch.action.SingleResultDeduplicator; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.ListenableActionFuture; @@ -413,7 +413,7 @@ protected BlobStoreRepository( this.namedXContentRegistry = namedXContentRegistry; this.basePath = basePath; this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings()); - this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); + this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(threadPool.getThreadContext(), this::doGetRepositoryData); shardSnapshotTaskRunner = new ShardSnapshotTaskRunner( threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), threadPool.executor(ThreadPool.Names.SNAPSHOT), @@ -1787,19 +1787,8 @@ public void getRepositoryData(ActionListener listener) { metadata.name(), latestKnownRepoGen ); - // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state - // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given - // generation may change - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); - if (bestEffortConsistency || cacheRepositoryData == false) { - executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); - } else { - repoDataDeduplicator.executeOnce( - metadata, - listener, - (metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData)) - ); - } + threadPool.executor(ThreadPool.Names.SNAPSHOT_META) + .execute(ActionRunnable.wrap(listener, this.repoDataLoadDeduplicator::execute)); } } @@ -1912,7 +1901,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) ), onFailure ), - this::doGetRepositoryData + this.repoDataLoadDeduplicator::execute ) ); } else { @@ -1926,11 +1915,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) } /** - * {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning - * {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is - * unique for a given value of {@link #metadata} at any point in time. + * Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage. */ - private final ResultDeduplicator repoDataDeduplicator; + private final SingleResultDeduplicator repoDataLoadDeduplicator; private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.