Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Bulk Delete Exception Logging #41693

Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public void error(StorageException exception) {
if (e != null) {
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
}
assert failedBlobs.isEmpty();
}

private static String buildKey(String keyPath, String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
Expand All @@ -34,6 +35,7 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
Expand All @@ -50,6 +52,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
Expand Down Expand Up @@ -127,12 +131,13 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String blob : blobNames) {
partition.add(buildKey(blob));
for (String key : outstanding) {
partition.add(key);
if (partition.size() == MAX_BULK_DELETES ) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
Expand All @@ -144,23 +149,32 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
SocketAccess.doPrivilegedVoid(() -> {
AmazonClientException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
List<String> keysInRequest =
deleteRequest.getKeys().stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
try {
clientReference.client().deleteObjects(deleteRequest);
outstanding.removeAll(keysInRequest);
} catch (MultiObjectDeleteException e) {
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
// first remove all keys that were sent in the request and then add back those that ran into an exception.
outstanding.removeAll(keysInRequest);
outstanding.addAll(
e.getErrors().stream().map(MultiObjectDeleteException.DeleteError::getKey).collect(Collectors.toSet()));
aex = ExceptionsHelper.useOrSuppress(aex, e);
} catch (AmazonClientException e) {
if (aex == null) {
aex = e;
} else {
aex.addSuppressed(e);
}
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex = ExceptionsHelper.useOrSuppress(aex, e);
}
}
if (aex != null) {
throw aex;
}
});
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
} catch (Exception e) {
throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
}
assert outstanding.isEmpty();
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,8 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, blobNames), e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
snapshotId, shardId), e);
throw e;
}

Expand All @@ -1016,8 +1016,8 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
snapshotId, shardId, indexBlobs), e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs during finalization",
snapshotId, shardId), e);
throw e;
}

Expand All @@ -1029,8 +1029,8 @@ protected void finalize(final List<SnapshotFiles> snapshots,
try {
blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
snapshotId, shardId, orphanedBlobs), e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs during finalization",
snapshotId, shardId), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
Expand Down