From 2968087f1c19a94e22aac7a15c8701956c08d364 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 2 Oct 2023 20:34:30 +1100 Subject: [PATCH] address feedback --- .../gcs/GoogleCloudStorageBlobStore.java | 4 +-- .../s3/S3BlobStoreRepositoryTests.java | 2 +- .../repositories/s3/S3BlobStore.java | 14 +++++----- .../common/blobstore/BlobContainer.java | 26 +++++++++---------- .../common/blobstore/BlobStore.java | 2 +- .../common/blobstore/OperationPurpose.java | 4 +++ .../repositories/fs/FsRepositoryTests.java | 15 ++++++++--- 7 files changed, 39 insertions(+), 28 deletions(-) diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 15e68339e936a..51d26a169ad0e 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -489,7 +489,7 @@ private void writeBlobMultipart(BlobInfo blobInfo, byte[] buffer, int offset, in /** * Deletes the given path and all its children. * - * @param purpose The purpose of the delete operation, useful for stats collection + * @param purpose The purpose of the delete operation * @param pathStr Name of path to delete */ DeleteResult deleteDirectory(OperationPurpose purpose, String pathStr) throws IOException { @@ -524,7 +524,7 @@ public String next() { /** * Deletes multiple blobs from the specific bucket using a batch request * - * @param purpose the purpose of the delete operation, useful for stats collection + * @param purpose the purpose of the delete operation * @param blobNames names of the blobs to delete */ @Override diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 11eb86c50dd2e..d7294cab93844 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -216,7 +216,7 @@ public void testRequestStatsWithOperationPurposes() throws IOException { final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10)); final BlobContainer blobContainer = blobStore.blobContainer(blobPath); - final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values())); + final OperationPurpose purpose = randomFrom(OperationPurpose.values()); final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000))); blobContainer.writeBlob(purpose, "test.txt", whatToWrite, true); try (InputStream is = blobContainer.readBlob(purpose, "test.txt")) { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java index 1d18ca2481904..668a648112f64 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java @@ -228,30 +228,30 @@ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator { partition.add(key); if (partition.size() == MAX_BULK_DELETES) { - deletePartition(clientReference, partition, aex, purpose); + deletePartition(purpose, clientReference, partition, aex); partition.clear(); } }); if (partition.isEmpty() == false) { - deletePartition(clientReference, partition, aex, purpose); + deletePartition(purpose, clientReference, partition, aex); } }); if (aex.get() != null) { throw aex.get(); } } catch (Exception e) { - throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e); + throw new IOException("Failed to delete blobs " + partition.stream().limit(delete10).toList(), e); } } private void deletePartition( + OperationPurpose purpose, AmazonS3Reference clientReference, List partition, - AtomicReference aex, - OperationPurpose purpose + AtomicReference aex ) { try { - clientReference.client().deleteObjects(bulkDelete(this, partition, purpose)); + clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)); } catch (MultiObjectDeleteException e) { // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead // first remove all keys that were sent in the request and then add back those that ran into an exception. @@ -270,7 +270,7 @@ private void deletePartition( } } - private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List blobs, OperationPurpose purpose) { + private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobStore blobStore, List blobs) { return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)) .withQuiet(true) .withRequestMetricCollector(blobStore.deleteMetricCollector); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 5ac330dcb8418..ce4d249b2e112 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -37,7 +37,7 @@ public interface BlobContainer { /** * Tests whether a blob with the given blob name exists in the container. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob whose existence is to be determined. * @return {@code true} if a blob exists in the {@link BlobContainer} with the given name, and {@code false} otherwise. */ @@ -51,7 +51,7 @@ default boolean blobExists(String blobName) throws IOException { /** * Creates a new {@link InputStream} for the given blob name. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob to get an {@link InputStream} for. * @return The {@code InputStream} to read the blob. * @throws NoSuchFileException if the blob does not exist @@ -69,7 +69,7 @@ default InputStream readBlob(String blobName) throws IOException { * a specific {@code position} in the blob. The {@code length} is an indication of the * number of bytes that are expected to be read from the {@link InputStream}. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob to get an {@link InputStream} for. * @param position The position in the blob where the next byte will be read. * @param length An indication of the number of bytes to be read. @@ -107,7 +107,7 @@ default long readBlobPreferredLength() { * This method assumes the container does not already contain a blob of the same blobName. If a blob by the * same name already exists, the operation will fail and an {@link IOException} will be thrown. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob to write the contents of the input stream to. * @param inputStream The input stream from which to retrieve the bytes to write to the blob. * @param blobSize The size of the blob to be written, in bytes. It is implementation dependent whether @@ -127,7 +127,7 @@ default void writeBlob(String blobName, InputStream inputStream, long blobSize, /** * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob to write the contents of the input stream to. * @param bytes The bytes to write * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists @@ -150,7 +150,7 @@ default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlre * This method is only used for streaming serialization of repository metadata that is known to be of limited size * at any point in time and across all concurrent invocations of this method. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName the name of the blob to write * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists * @param atomic whether the write should be atomic in case the implementation supports it @@ -178,7 +178,7 @@ default void writeMetadataBlob( * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobName The name of the blob to write the contents of the input stream to. * @param bytes The bytes to write * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists @@ -195,7 +195,7 @@ default void writeBlobAtomic(String blobName, BytesReference bytes, boolean fail /** * Deletes this container and all its contents from the repository. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @return delete result * @throws IOException on failure */ @@ -210,7 +210,7 @@ default DeleteResult delete() throws IOException { * Deletes the blobs with given names. This method will not throw an exception * when one or multiple of the given blobs don't exist and simply ignore this case. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobNames the names of the blobs to delete * @throws IOException if a subset of blob exists but could not be deleted. */ @@ -240,7 +240,7 @@ default Map listBlobs() throws IOException { * a path that has this containers {@link #path()} return as its prefix and has one more path element than the current * container's path. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @return Map of name of the child container to child container * @throws IOException on failure to list child containers */ @@ -254,7 +254,7 @@ default Map children() throws IOException { /** * Lists all blobs in the container that match the specified prefix. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param blobNamePrefix The prefix to match against blob names in the container. * @return A map of the matching blobs in the container. The keys in the map are the names of the blobs * and the values are {@link BlobMetadata}, containing basic information about each blob. @@ -271,7 +271,7 @@ default Map listBlobsByPrefix(String blobNamePrefix) throw * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. * Keys not yet used start at initial value 0. Returns the current value (before it was updated). * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param key key of the value to update * @param expected the expected value * @param updated the new value @@ -332,7 +332,7 @@ default void compareAndSetRegister(String key, BytesReference expected, BytesRef * Gets the value set by {@link #compareAndSetRegister} or {@link #compareAndExchangeRegister} for a given key. * If a key has not yet been used, the initial value is an empty {@link BytesReference}. * - * @param purpose The purpose of the operation, useful for stats collection. + * @param purpose The purpose of the operation * @param key key of the value to get * @param listener a listener, completed with the value read from the register or {@code OptionalBytesReference#MISSING} if the value * could not be read due to concurrent activity. diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java index fef92f5319e6a..4b602822f5f2f 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java @@ -37,7 +37,7 @@ default void deleteBlobsIgnoringIfNotExists(Iterator blobNames) throws I /** * Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer} * - * @param purpose the purpose of the delete operation, useful for stats collection + * @param purpose the purpose of the delete operation * @param blobNames the blobs to be deleted */ default void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator blobNames) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/OperationPurpose.java b/server/src/main/java/org/elasticsearch/common/blobstore/OperationPurpose.java index 685baca20e293..2cfa309c1f7c1 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/OperationPurpose.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/OperationPurpose.java @@ -8,6 +8,10 @@ package org.elasticsearch.common.blobstore; +/** + * The purpose of an operation against the blobstore. For example, it can be useful for stats collection + * as well as other things that requires further differentiation for the same blob operation. + */ public enum OperationPurpose { SNAPSHOT("Snapshot"), CLUSTER_STATE("ClusterState"), diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 66eaeb2da9108..b5361a22226d1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.blobstore.support.FilterBlobContainer; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ClusterSettings; @@ -242,18 +243,24 @@ public BlobContainer blobContainer(BlobPath path) { final BlobContainer blobContainer = blobStore.blobContainer(path); return new FilterBlobContainer(blobContainer) { @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws IOException { + public void writeBlob( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { if (canErrorForWriteBlob.get() && randomIntBetween(0, 10) == 0) { writeBlobErrored.set(true); throw new IOException("disk full"); } else { - super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + super.writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); } } @Override public void writeMetadataBlob( + OperationPurpose purpose, String blobName, boolean failIfAlreadyExists, boolean atomic, @@ -262,7 +269,7 @@ public void writeMetadataBlob( if (shouldErrorForWriteMetadataBlob.get() && blobName.startsWith("snap-")) { throw new RuntimeException("snap file error"); } - super.writeMetadataBlob(blobName, failIfAlreadyExists, atomic, writer); + super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer); } @Override