Skip to content

Commit

Permalink
Simplify and optimize deduplication of RepositoryData for a non-cachi…
Browse files Browse the repository at this point in the history
…ng repository instance

This makes use of the new deduplicator infrastructure to move to more
efficient deduplication mechanics.
The existing solution hardly ever deduplicated because it would only
deduplicate after the repository entered a consistent state. The
adjusted solution is much simpler, in that it simply deduplicates such
that only a single loading of `RepositoryData` will ever happen at a
time, fixing memory issues from massively concurrent loading of the repo
data as described in elastic#89952.

closes elastic#89952
  • Loading branch information
original-brownbear committed Nov 23, 2022
1 parent 751dc24 commit 10fab8e
Showing 1 changed file with 7 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -413,7 +413,7 @@ protected BlobStoreRepository(
this.namedXContentRegistry = namedXContentRegistry;
this.basePath = basePath;
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(threadPool.getThreadContext(), this::doGetRepositoryData);
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT),
Expand Down Expand Up @@ -1787,19 +1787,8 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
metadata.name(),
latestKnownRepoGen
);
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(
metadata,
listener,
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
);
}
threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this.repoDataLoadDeduplicator::execute));
}
}

Expand Down Expand Up @@ -1912,7 +1901,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
),
onFailure
),
this::doGetRepositoryData
this.repoDataLoadDeduplicator::execute
)
);
} else {
Expand All @@ -1926,11 +1915,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
Expand Down

0 comments on commit 10fab8e

Please sign in to comment.