Skip to content

Commit

Permalink
Simplify and optimize deduplication of RepositoryData for a non-cachi…
Browse files Browse the repository at this point in the history
…ng repository instance (elastic#91851)

This makes use of the new deduplicator infrastructure to move to more
efficient deduplication mechanics.
The existing solution hardly ever deduplicated because it would only
deduplicate after the repository entered a consistent state. The
adjusted solution is much simpler, in that it simply deduplicates such
that only a single loading of `RepositoryData` will ever happen at a
time, fixing memory issues from massively concurrent loading of the repo
data as described in elastic#89952.

closes elastic#89952
  • Loading branch information
original-brownbear committed Nov 23, 2022
1 parent ad182b0 commit dca4c5e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 87 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/91851.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 91851
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
repository instance
area: Snapshot/Restore
type: bug
issues:
- 89952
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -413,7 +413,11 @@ protected BlobStoreRepository(
this.namedXContentRegistry = namedXContentRegistry;
this.basePath = basePath;
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
);
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT),
Expand Down Expand Up @@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
metadata.name(),
latestKnownRepoGen
);
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
// generation may change
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
if (bestEffortConsistency || cacheRepositoryData == false) {
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
} else {
repoDataDeduplicator.executeOnce(
metadata,
listener,
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
);
}
repoDataLoadDeduplicator.execute(listener);
}
}

Expand Down Expand Up @@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
}
existingListener.onFailure(e);
};
threadPool.generic()
.execute(
ActionRunnable.wrap(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(
metadata.name(),
repoData.getGenId(),
repoData.getGenId()
)
)
repoDataLoadDeduplicator.execute(
ActionListener.wrap(
repoData -> submitUnbatchedTask(
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata metadata = getRepoMetadata(currentState);
// No update to the repository generation should have occurred concurrently in general except
// for
// extreme corner cases like failing over to an older version master node and back to the
// current
// node concurrently
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(
metadata.name(),
"Found unexpected initialized repo metadata [" + metadata + "]"
);
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.getMetadata())
.putCustom(
RepositoriesMetadata.TYPE,
currentState.metadata()
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
)
.build();
}
)
.build();
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
logger.trace(
"[{}] initialized repository generation in cluster state to [{}]",
metadata.name(),
repoData.getGenId()
);
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
threadPool.generic().execute(() -> {
final ActionListener<RepositoryData> existingListener;
synchronized (BlobStoreRepository.this) {
existingListener = repoDataInitialized;
repoDataInitialized = null;
}
}
),
onFailure
),
this::doGetRepositoryData
)
);
existingListener.onResponse(repoData);
logger.trace(
"[{}] called listeners after initializing repository to generation [{}]",
metadata.name(),
repoData.getGenId()
);
});
}
}
),
onFailure
)
);
} else {
logger.trace(
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
Expand All @@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
}

/**
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
* unique for a given value of {@link #metadata} at any point in time.
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
*/
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;

private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))
Expand Down

0 comments on commit dca4c5e

Please sign in to comment.