Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Snapshot Delete Further #47626

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;

Expand Down Expand Up @@ -387,12 +388,40 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
ActionListener<Void> listener) throws IOException {
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
writeIndexGen(updatedRepositoryData, repositoryStateId);
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe add a single line comment like // clean up unreferenced blob in parallel with the snapshot deletion ?

l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
deleteIndices(
updatedRepositoryData,
repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
snapshotId,
ActionListener.delegateFailure(listener,
(l, v) -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
ActionListener.runAfter(
ActionListener.wrap(
deleteResults -> {
// Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
// has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
blobContainer().deleteBlobsIgnoringIfNotExists(
Stream.concat(
deleteResults.stream().flatMap(shardResult -> {
final String shardPath =
shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
}),
deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
return absolutePath.substring(basePathLen);
}).collect(Collectors.toList()));
},
// Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
e -> logger.warn(
() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
() -> afterCleanupsListener.onResponse(null))
);
}

/**
Expand Down Expand Up @@ -551,89 +580,57 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices
* @param listener Listener to invoke when finished
*/
private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
ActionListener<Void> listener) {
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> listener) {

if (indices.isEmpty()) {
listener.onResponse(null);
listener.onResponse(Collections.emptyList());
return;
}
// listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();

// Listener that flattens out the delete results for each index
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
ActionListener.map(deleteFromMetaListener,
results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
for (IndexId indexId : indices) {
executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
deleteIdxMetaListener -> {
IndexMetaData indexMetaData = null;
final IndexMetaData indexMetaData;
try {
indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
} catch (Exception ex) {
logger.warn(() ->
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
}
deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId);
if (indexMetaData != null) {
final int shardCount = indexMetaData.getNumberOfShards();
assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
final Index index = indexMetaData.getIndex();
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
final ShardId shard = new ShardId(index, shardId);
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
allShardsListener.onResponse(
deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
}

@Override
public void onFailure(Exception ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
snapshotId, indexId.getName(), shard.id()), ex);
// Just passing null here to count down the listener instead of failing it, the stale data left behind
// here will be retried in the next delete or repository cleanup
allShardsListener.onResponse(null);
}
});
}
} else {
// Just invoke the listener without any shard generations to count it down, this index will be cleaned up
// by the stale data cleanup in the end.
deleteIdxMetaListener.onResponse(null);
return;
}
}));
}

// Delete all the now unreferenced blobs in the shard paths
deleteFromMetaListener.whenComplete(newGens -> {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
blobContainer().deleteBlobsIgnoringIfNotExists(
newGens.stream().flatMap(shardBlob -> {
final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString();
assert shardPathAbs.startsWith(basePath);
final String pathToShard = shardPathAbs.substring(basePathLen);
return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob);
}).collect(Collectors.toList())
);
listener.onResponse(null);
}, e -> {
logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e);
listener.onResponse(null);
});
}
final int shardCount = indexMetaData.getNumberOfShards();
assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
final Index index = indexMetaData.getIndex();
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
final ShardId shard = new ShardId(index, shardId);
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
allShardsListener.onResponse(
deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
}

private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
try {
indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID());
} catch (IOException ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]",
snapshotId, indexId.getName()), ex);
@Override
public void onFailure(Exception ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
snapshotId, indexId.getName(), shard.id()), ex);
// Just passing null here to count down the listener instead of failing it, the stale data left behind
// here will be retried in the next delete or repository cleanup
allShardsListener.onResponse(null);
}
});
}
}));
}
}

Expand Down