Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.6] Simplify and optimize deduplication of RepositoryData for a non-caching repository instance (#91851) #91866

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/91851.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 91851
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
repository instance
area: Snapshot/Restore
type: bug
issues:
- 89952
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action;

import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Deduplicates concurrent invocations of an asynchronous action that reports its outcome through an {@link ActionListener}, so that
 * several calls to {@link #execute(ActionListener)} can be served by a single run of the wrapped action.
 * <p>
 * Compared to {@link ResultDeduplicator} this class gives a stronger freshness guarantee: a caller is never resolved with a result
 * that was computed by a run that was already in flight when the caller invoked {@code execute}. Callers that arrive while a run is in
 * progress are queued and resolved together by the <em>next</em> run, which is started once the current one completes. This makes the
 * class suitable for actions whose result changes over time.
 *
 * @param <T> Result type
 */
public final class SingleResultDeduplicator<T> {

    private final ThreadContext threadContext;

    private final Consumer<ActionListener<T>> executeAction;

    /**
     * Listeners that will be resolved by the run following the one currently in progress. A {@code null} value means that no run is in
     * progress; a non-null (possibly empty) list means a run is in progress and, when it completes, one more run is started to resolve
     * whatever has been queued here. All reads and writes happen while holding the monitor of this instance.
     */
    private List<ActionListener<T>> waitingListeners;

    public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
        this.threadContext = threadContext;
        this.executeAction = executeAction;
    }

    /**
     * Execute the action for the given {@code listener}.
     * <p>
     * If no run is in progress the action is invoked right away with this listener. Otherwise the listener is wrapped so that it is
     * completed in the caller's thread context and is queued for the next run.
     *
     * @param listener listener to resolve with execution result
     */
    public void execute(ActionListener<T> listener) {
        synchronized (this) {
            if (waitingListeners != null) {
                // a run is already in flight: park this listener until the follow-up run completes
                waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
                return;
            }
            // nothing in flight: mark a run as started so that later callers queue up behind it
            waitingListeners = new ArrayList<>();
        }
        // invoke the action outside of the lock, directly for this listener
        doExecute(listener);
    }

    /**
     * Runs the wrapped action once and resolves {@code listener} with its outcome. Before {@code listener} is notified, the queue is
     * inspected via {@link #startFollowUpOrReset()} to either start the follow-up run or mark this instance as idle.
     */
    private void doExecute(ActionListener<T> listener) {
        final ActionListener<T> completionListener = ActionListener.runBefore(listener, this::startFollowUpOrReset);
        try {
            executeAction.accept(completionListener);
        } catch (Exception e) {
            // the action failed to even start, report that through the same path as an asynchronous failure
            completionListener.onFailure(e);
        }
    }

    /**
     * Invoked when a run has completed, before its own listener is notified. Resets the state to idle if nobody queued up in the
     * meantime; otherwise takes ownership of the queued listeners, installs a fresh queue and starts one more run on their behalf.
     */
    private void startFollowUpOrReset() {
        final List<ActionListener<T>> batch;
        synchronized (this) {
            if (waitingListeners.isEmpty()) {
                // nobody is waiting, so there is no running execution any more
                waitingListeners = null;
                return;
            }
            // hand the current queue to the follow-up run and start collecting listeners for the run after that
            batch = waitingListeners;
            waitingListeners = new ArrayList<>();
        }
        // fan the outcome of the follow-up run out to every listener of the batch
        doExecute(new ActionListener<>() {
            @Override
            public void onResponse(T response) {
                ActionListener.onResponse(batch, response);
            }

            @Override
            public void onFailure(Exception e) {
                ActionListener.onFailure(batch, e);
            }
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -413,7 +413,11 @@ protected BlobStoreRepository(
this.namedXContentRegistry = namedXContentRegistry;
this.basePath = basePath;
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
);
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT),
Expand Down Expand Up @@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
metadata.name(),
latestKnownRepoGen
);
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(
metadata,
listener,
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
);
}
repoDataLoadDeduplicator.execute(listener);
}
}

Expand Down Expand Up @@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
}
existingListener.onFailure(e);
};
threadPool.generic()
.execute(
ActionRunnable.wrap(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(
metadata.name(),
repoData.getGenId(),
repoData.getGenId()
)
)
repoDataLoadDeduplicator.execute(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
)
.build();
}
)
.build();
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
}
),
onFailure
),
this::doGetRepositoryData
)
);
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
}
}
),
onFailure
)
);
} else {
logger.trace(
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
Expand All @@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))
Expand Down