Skip to content

Commit

Permalink
Batch delete requests for exchange spooling on S3
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing authored and arhimondr committed May 31, 2022
1 parent bbe9441 commit 7f2eeeb
Showing 1 changed file with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -92,6 +94,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
Expand Down Expand Up @@ -257,14 +260,32 @@ public ListenableFuture<Void> deleteRecursively(List<URI> directories)
})));
}
else {
return stats.getDeleteRecursively().record(asVoid(Futures.allAsList(directories.stream().map(dir -> {
ImmutableMultimap.Builder<String, ListenableFuture<List<String>>> bucketToListObjectsFuturesBuilder = ImmutableMultimap.builder();
for (URI dir : directories) {
ImmutableList.Builder<String> keys = ImmutableList.builder();
return transformFuture(Futures.transformAsync(
toListenableFuture((listObjectsRecursively(dir).subscribe(listObjectsV2Response ->
listObjectsV2Response.contents().stream().map(S3Object::key).forEach(keys::add)))),
ignored -> deleteObjects(getBucketName(dir), keys.build()),
ListenableFuture<List<String>> listObjectsFuture = Futures.transform(
toListenableFuture((listObjectsRecursively(dir)
.subscribe(listObjectsV2Response -> listObjectsV2Response.contents().stream()
.map(S3Object::key)
.forEach(keys::add)))),
ignored -> keys.build(),
directExecutor());
bucketToListObjectsFuturesBuilder.put(getBucketName(dir), listObjectsFuture);
}
Multimap<String, ListenableFuture<List<String>>> bucketToListObjectsFutures = bucketToListObjectsFuturesBuilder.build();

ImmutableList.Builder<ListenableFuture<List<DeleteObjectsResponse>>> deleteObjectsFutures = ImmutableList.builder();
for (String bucketName : bucketToListObjectsFutures.keySet()) {
deleteObjectsFutures.add(Futures.transformAsync(
Futures.allAsList(bucketToListObjectsFutures.get(bucketName)),
keys -> deleteObjects(
bucketName,
keys.stream()
.flatMap(Collection::stream)
.collect(toImmutableList())),
directExecutor()));
}).collect(toImmutableList()))));
}
return transformFuture(Futures.allAsList(deleteObjectsFutures.build()));
}
}

Expand Down

0 comments on commit 7f2eeeb

Please sign in to comment.