Skip to content

Commit

Permalink
Add support for async deletion in S3BlobContainer (opensearch-project…
Browse files Browse the repository at this point in the history
…#15621)

* Add support for async deletion in S3BlobContainer

Signed-off-by: Ashish Singh <[email protected]>

* Move helper methods to helper class

Signed-off-by: Ashish Singh <[email protected]>

* Minor refactor

Signed-off-by: Ashish Singh <[email protected]>

* Add UTs

Signed-off-by: Ashish Singh <[email protected]>

* Add more tests

Signed-off-by: Ashish Singh <[email protected]>

* Integrate async deletion in the snapshot interactions

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored and dk2k committed Oct 16, 2024
1 parent b34f1f5 commit 417e041
Show file tree
Hide file tree
Showing 16 changed files with 1,086 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.put(super.nodeSettings(nodeOrdinal))
.setSecureSettings(secureSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@

public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.build();
}

@Override
@Before
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.S3AsyncDeleteHelper;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
Expand All @@ -109,6 +111,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
Expand Down Expand Up @@ -875,4 +880,123 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
AtomicLong deletedBytes = new AtomicLong();

CompletableFuture<Void> listingFuture = new CompletableFuture<>();

listPublisher.subscribe(new Subscriber<>() {
private Subscription subscription;
private final List<String> objectsToDelete = new ArrayList<>();
private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}

@Override
public void onNext(ListObjectsV2Response response) {
response.contents().forEach(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());
objectsToDelete.add(s3Object.key());
});

int bulkDeleteSize = blobStore.getBulkDeletesSize();
if (objectsToDelete.size() >= bulkDeleteSize) {
int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
int itemsToDelete = fullBatchesCount * bulkDeleteSize;

List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
objectsToDelete.subList(0, itemsToDelete).clear();

deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
batchToDelete,
deletionChain,
() -> subscription.request(1)
);
} else {
subscription.request(1);
}
}

@Override
public void onError(Throwable t) {
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
}

@Override
public void onComplete() {
if (!objectsToDelete.isEmpty()) {
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
objectsToDelete,
deletionChain,
null
);
}
deletionChain.whenComplete((v, throwable) -> {
if (throwable != null) {
listingFuture.completeExceptionally(throwable);
} else {
listingFuture.complete(null);
}
});
}
});

listingFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(
throwable instanceof Exception
? (Exception) throwable
: new IOException("Unexpected error during async deletion", throwable)
);
} else {
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
}
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
if (blobNames.isEmpty()) {
completionListener.onResponse(null);
return;
}

try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

List<String> keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList());

S3AsyncDeleteHelper.executeDeleteChain(s3AsyncClient, blobStore, keysToDelete, CompletableFuture.completedFuture(null), null)
.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs " + blobNames, throwable));
} else {
completionListener.onResponse(null);
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {
public class S3BlobStore implements BlobStore {

private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
public void close() {}
};

public MetricPublisher getDeleteObjectsMetricPublisher() {
return deleteObjectsMetricPublisher;
}

public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.repositories.s3.S3BlobStore;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class S3AsyncDeleteHelper {
private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);

public static CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches));
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
}

static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
}
return batches;
}

static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch));
}

return allDeletesFuture;
}

static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
}

static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
return null;
}

static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
.build();
}
}
Loading

0 comments on commit 417e041

Please sign in to comment.