Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ywangd committed Oct 2, 2023
1 parent cd4e17f commit 2968087
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, in
/**
* Deletes the given path and all its children.
*
* @param purpose The purpose of the delete operation, useful for stats collection
* @param purpose The purpose of the delete operation
* @param pathStr Name of path to delete
*/
DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException {
Expand Down Expand Up @@ -524,7 +524,7 @@ public String next() {
/**
* Deletes multiple blobs from the specific bucket using a batch request
*
* @param purpose the purpose of the delete operation, useful for stats collection
* @param purpose the purpose of the delete operation
* @param blobNames names of the blobs to delete
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testRequestStatsWithOperationPurposes() throws IOException {

final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values()));
final OperationPurpose purpose = randomFrom(OperationPurpose.values());
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
blobContainer.writeBlob(purpose, "test.txt", whatToWrite, true);
try (InputStream is = blobContainer.readBlob(purpose, "test.txt")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,30 +228,30 @@ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<St
blobNames.forEachRemaining(key -> {
partition.add(key);
if (partition.size() == MAX_BULK_DELETES) {
deletePartition(clientReference, partition, aex, purpose);
deletePartition(purpose, clientReference, partition, aex);
partition.clear();
}
});
if (partition.isEmpty() == false) {
deletePartition(clientReference, partition, aex, purpose);
deletePartition(purpose, clientReference, partition, aex);
}
});
if (aex.get() != null) {
throw aex.get();
}
} catch (Exception e) {
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
}
}

private void deletePartition(
OperationPurpose purpose,
AmazonS3Reference clientReference,
List<String> partition,
AtomicReference<Exception> aex,
OperationPurpose purpose
AtomicReference<Exception> aex
) {
try {
clientReference.client().deleteObjects(bulkDelete(this, partition, purpose));
clientReference.client().deleteObjects(bulkDelete(purpose, this, partition));
} catch (MultiObjectDeleteException e) {
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
// first remove all keys that were sent in the request and then add back those that ran into an exception.
Expand All @@ -270,7 +270,7 @@ private void deletePartition(
}
}

private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs, OperationPurpose purpose) {
private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobStore blobStore, List<String> blobs) {
return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
.withQuiet(true)
.withRequestMetricCollector(blobStore.deleteMetricCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface BlobContainer {
/**
* Tests whether a blob with the given blob name exists in the container.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob whose existence is to be determined.
* @return {@code true} if a blob exists in the {@link BlobContainer} with the given name, and {@code false} otherwise.
*/
Expand All @@ -51,7 +51,7 @@ default boolean blobExists(String blobName) throws IOException {
/**
* Creates a new {@link InputStream} for the given blob name.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
* @throws NoSuchFileException if the blob does not exist
Expand All @@ -69,7 +69,7 @@ default InputStream readBlob(String blobName) throws IOException {
* a specific {@code position} in the blob. The {@code length} is an indication of the
* number of bytes that are expected to be read from the {@link InputStream}.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob to get an {@link InputStream} for.
* @param position The position in the blob where the next byte will be read.
* @param length An indication of the number of bytes to be read.
Expand Down Expand Up @@ -107,7 +107,7 @@ default long readBlobPreferredLength() {
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob to write the contents of the input stream to.
* @param inputStream The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize The size of the blob to be written, in bytes. It is implementation dependent whether
Expand All @@ -127,7 +127,7 @@ default void writeBlob(String blobName, InputStream inputStream, long blobSize,
/**
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob to write the contents of the input stream to.
* @param bytes The bytes to write
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
Expand All @@ -150,7 +150,7 @@ default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlre
* This method is only used for streaming serialization of repository metadata that is known to be of limited size
* at any point in time and across all concurrent invocations of this method.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName the name of the blob to write
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
* @param atomic whether the write should be atomic in case the implementation supports it
Expand Down Expand Up @@ -178,7 +178,7 @@ default void writeMetadataBlob(
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobName The name of the blob to write the contents of the input stream to.
* @param bytes The bytes to write
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
Expand All @@ -195,7 +195,7 @@ default void writeBlobAtomic(String blobName, BytesReference bytes, boolean fail
/**
* Deletes this container and all its contents from the repository.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @return delete result
* @throws IOException on failure
*/
Expand All @@ -210,7 +210,7 @@ default DeleteResult delete() throws IOException {
* Deletes the blobs with given names. This method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobNames the names of the blobs to delete
* @throws IOException if a subset of blob exists but could not be deleted.
*/
Expand Down Expand Up @@ -240,7 +240,7 @@ default Map<String, BlobMetadata> listBlobs() throws IOException {
* a path that has this containers {@link #path()} return as its prefix and has one more path element than the current
* container's path.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @return Map of name of the child container to child container
* @throws IOException on failure to list child containers
*/
Expand All @@ -254,7 +254,7 @@ default Map<String, BlobContainer> children() throws IOException {
/**
* Lists all blobs in the container that match the specified prefix.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param blobNamePrefix The prefix to match against blob names in the container.
* @return A map of the matching blobs in the container. The keys in the map are the names of the blobs
* and the values are {@link BlobMetadata}, containing basic information about each blob.
Expand All @@ -271,7 +271,7 @@ default Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throw
* Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}.
* Keys not yet used start at initial value 0. Returns the current value (before it was updated).
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param key key of the value to update
* @param expected the expected value
* @param updated the new value
Expand Down Expand Up @@ -332,7 +332,7 @@ default void compareAndSetRegister(String key, BytesReference expected, BytesRef
* Gets the value set by {@link #compareAndSetRegister} or {@link #compareAndExchangeRegister} for a given key.
* If a key has not yet been used, the initial value is an empty {@link BytesReference}.
*
* @param purpose The purpose of the operation, useful for stats collection.
* @param purpose The purpose of the operation
* @param key key of the value to get
* @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value
* could not be read due to concurrent activity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws I
/**
* Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer}
*
* @param purpose the purpose of the delete operation, useful for stats collection
* @param purpose the purpose of the delete operation
* @param blobNames the blobs to be deleted
*/
default void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.elasticsearch.common.blobstore;

/**
* The purpose of an operation against the blobstore. For example, it can be useful for stats collection
 * as well as other things that require further differentiation for the same blob operation.
*/
public enum OperationPurpose {
SNAPSHOT("Snapshot"),
CLUSTER_STATE("ClusterState"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -242,18 +243,24 @@ public BlobContainer blobContainer(BlobPath path) {
final BlobContainer blobContainer = blobStore.blobContainer(path);
return new FilterBlobContainer(blobContainer) {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
public void writeBlob(
OperationPurpose purpose,
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
if (canErrorForWriteBlob.get() && randomIntBetween(0, 10) == 0) {
writeBlobErrored.set(true);
throw new IOException("disk full");
} else {
super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
super.writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}

@Override
public void writeMetadataBlob(
OperationPurpose purpose,
String blobName,
boolean failIfAlreadyExists,
boolean atomic,
Expand All @@ -262,7 +269,7 @@ public void writeMetadataBlob(
if (shouldErrorForWriteMetadataBlob.get() && blobName.startsWith("snap-")) {
throw new RuntimeException("snap file error");
}
super.writeMetadataBlob(blobName, failIfAlreadyExists, atomic, writer);
super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
}

@Override
Expand Down

0 comments on commit 2968087

Please sign in to comment.