From 03ddc8a6b07221b1ed60ad5627939d1f957b1c49 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Wed, 30 Aug 2023 12:11:19 -0700 Subject: [PATCH] Add async read support for S3 plugin Signed-off-by: Kunal Kotwani --- .../repositories/s3/S3BlobContainer.java | 47 +++++++- .../s3/async/AsyncTransferManager.java | 78 +++++++++++++ .../repositories/s3/utils/HttpRangeUtils.java | 24 ++++ .../s3/S3BlobStoreContainerTests.java | 104 ++++++++++++++++-- .../s3/async/AsyncTransferManagerTests.java | 97 ++++++++++++++++ .../s3/utils/HttpRangeUtilsTests.java | 32 ++++++ 6 files changed, 374 insertions(+), 8 deletions(-) create mode 100644 plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 183b5f8fe7ac1..18abd2f041548 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -44,6 +44,7 @@ import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; @@ -63,6 +64,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -75,10 +77,12 @@ import org.opensearch.common.blobstore.support.AbstractBlobContainer; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.async.UploadRequest; import java.io.ByteArrayInputStream; @@ -91,6 +95,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -212,9 +217,49 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } + @ExperimentalApi @Override public void readBlobAsync(String blobName, ActionListener listener) { - throw new UnsupportedOperationException(); + try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { + final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client(); + final String bucketName = blobStore.bucket(); + final AsyncTransferManager transferManager = blobStore.getAsyncTransferManager(); + + final GetObjectAttributesResponse blobMetadata = transferManager.getBlobPartMetadata(blobName, bucketName, s3AsyncClient).get(); + + final long blobSize = blobMetadata.objectSize(); + final int numberOfParts = blobMetadata.objectParts().totalPartsCount(); + final String blobChecksum = blobMetadata.checksum().checksumCRC32(); + + final List blobPartStreams = new ArrayList<>(); + final List> blobPartInputStreamFutures = new ArrayList<>(); + for (int partNumber = 0; partNumber < numberOfParts; partNumber++) { + int finalPartNumber = partNumber; + CompletableFuture partInputStreamFuture = transferManager.getBlobPartInputStreamContainer( + s3AsyncClient, + bucketName, + blobName, + partNumber + ).whenComplete((inputStreamContainer, error) -> { + if (error == null) { + blobPartStreams.add(finalPartNumber, inputStreamContainer); + } + }); + + blobPartInputStreamFutures.add(partInputStreamFuture); + } + + CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> { + if (throwable == null) { + listener.onResponse(new ReadContext(blobSize, blobPartStreams, blobChecksum)); + } else { + Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; + listener.onFailure(ex); + } + }); + } catch (ExecutionException | InterruptedException ex) { + listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex)); + } } // package private for testing diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 8d45c2167a3d1..6f4895a14efd0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -8,8 +8,11 @@ package org.opensearch.repositories.s3.async; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; @@ -20,6 +23,11 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectAttributes; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.utils.CompletableFutureUtils; @@ -29,13 +37,16 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; import org.opensearch.common.StreamContext; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.exception.CorruptFileException; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.util.ByteUtils; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.io.CheckedContainer; +import org.opensearch.repositories.s3.utils.HttpRangeUtils; import java.io.IOException; import java.util.Arrays; @@ -353,4 +364,71 @@ private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest upl return null; }); } + + /** + * Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds + * the stream and its related metadata. + * @param s3AsyncClient Async client to be utilized to fetch the object part + * @param bucketName Name of the S3 bucket + * @param blobName Identifier of the blob for which the parts will be fetched + * @param partNumber Part number for the blob to be retrieved + * @return A future of {@link InputStreamContainer} containing the stream and stream metadata. + */ + @ExperimentalApi + public CompletableFuture getBlobPartInputStreamContainer( + S3AsyncClient s3AsyncClient, + String bucketName, + String blobName, + int partNumber + ) { + final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder() + .bucket(bucketName) + .key(blobName) + .partNumber(partNumber); + + return SocketAccess.doPrivileged( + () -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream()) + .thenApply(this::transformResponseToInputStreamContainer) + ); + } + + /** + * Transforms the stream response object from S3 into an {@link InputStreamContainer} + * @param streamResponse Response stream object from S3 + * @return {@link InputStreamContainer} containing the stream and stream metadata + */ + // Package-Private for testing. + InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream streamResponse) { + final GetObjectResponse getObjectResponse = streamResponse.response(); + final String contentRange = getObjectResponse.contentRange(); + final Long contentLength = getObjectResponse.contentLength(); + if (contentRange == null || contentLength == null) { + throw SdkException.builder().message("Failed to fetch required metadata for blob part").build(); + } + final Tuple s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange()); + return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1()); + } + + /** + * Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket. + * @param blobName Identifier of the blob for which the metadata will be fetched + * @param bucketName Name of the S3 bucket + * @param s3AsyncClient Async client to be utilized to fetch the metadata + * @return A future containing the metadata within {@link GetObjectAttributesResponse} + */ + @ExperimentalApi + public CompletableFuture getBlobPartMetadata( + String blobName, + String bucketName, + S3AsyncClient s3AsyncClient + ) { + // Fetch blob metadata - part info, size, checksum + final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder() + .bucket(bucketName) + .key(blobName) + .objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS) + .build(); + + return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest)); + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java index 40aec7d52847b..4c50cb708f008 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/HttpRangeUtils.java @@ -8,7 +8,31 @@ package org.opensearch.repositories.s3.utils; +import software.amazon.awssdk.core.exception.SdkException; + +import org.opensearch.common.collect.Tuple; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public final class HttpRangeUtils { + private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-(\\d+)/(\\d+|.*)$"); + + /** + * Parses the content range header string value to calculate the start and end of the stream + * Tests against the RFC9110 specification of content range string. + * Sample values: "bytes 0-10/200", "bytes 0-10/*" + * Details here + * @param headerValue Header content range string value from the HTTP response + * @return Pair of values where v1 represents the lower and v2 represents the upper bound of the stream + */ + public static Tuple fromHttpRangeHeader(String headerValue) { + Matcher matcher = RANGE_PATTERN.matcher(headerValue); + if (!matcher.find()) { + throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException()); + } + return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2))); + } /** * Provides a byte range string per RFC 9110 diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 1c4936cae7eba..6265a54321610 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -32,11 +32,15 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.Checksum; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -44,6 +48,11 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesParts; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; @@ -61,15 +70,18 @@ import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; +import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.collect.Tuple; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -86,6 +98,9 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -94,6 +109,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -882,15 +898,60 @@ public void onFailure(Exception e) {} } } - public void testAsyncBlobDownload() { + public void testAsyncBlobDownload() throws InterruptedException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final String blobName = randomAlphaOfLengthBetween(1, 10); + final String checksum = randomAlphaOfLength(10); + + final long objectSize = 100L; + final int objectPartCount = 10; + final int partSize = 10; + + final String contentRange = "bytes 0-10/100"; + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference amazonAsyncS3Reference = mock(AmazonAsyncS3Reference.class); + final AsyncTransferManager asyncTransferManager = new AsyncTransferManager( + 10000L, + mock(ExecutorService.class), + mock(ExecutorService.class) + ); final S3BlobStore blobStore = mock(S3BlobStore.class); - final BlobPath blobPath = mock(BlobPath.class); - final String blobName = "test-blob"; + final BlobPath blobPath = new BlobPath(); + + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.serverSideEncryption()).thenReturn(false); + AmazonAsyncS3Reference reference = new AmazonAsyncS3Reference( + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + ); + when(blobStore.asyncClientReference()).thenReturn(reference); + + // when(amazonAsyncS3Reference.get()).thenReturn(s3AsyncClient); + when(blobStore.getAsyncTransferManager()).thenReturn(asyncTransferManager); + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + mockObjectPartResponse(s3AsyncClient, bucketName, blobName, objectPartCount, partSize, objectSize); + + CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener readContextActionListener = new PlainActionFuture<>(); + LatchedActionListener listener = new LatchedActionListener<>(readContextActionListener, countDownLatch); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.readBlobAsync(blobName, listener); + countDownLatch.await(); - final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> { - final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.readBlobAsync(blobName, new PlainActionFuture<>()); - }); } public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { @@ -912,4 +973,33 @@ public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSiz public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); } + + private void mockObjectPartResponse( + S3AsyncClient s3AsyncClient, + String bucketName, + String blobName, + int totalNumberOfParts, + int partSize, + long objectSize + ) { + for (int partNumber = 0; partNumber < totalNumberOfParts; partNumber++) { + final int start = partNumber * partSize; + final int end = (partNumber + 1) * partSize; + final String contentRange = "bytes " + start + "-" + end + "/" + objectSize; + final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(partSize)); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder() + .contentLength((long) partSize) + .contentRange(contentRange) + .build(); + + CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + getObjectPartResponse.complete(responseInputStream); + + GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucketName).key(blobName).partNumber(partNumber).build(); + + when(s3AsyncClient.getObject(eq(getObjectRequest), any(AsyncResponseTransformer.class))).thenReturn(getObjectPartResponse); + } + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 9c07b929052bc..175a433d4ec73 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -9,17 +9,26 @@ package org.opensearch.repositories.s3.async; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.Checksum; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesParts; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; +import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -36,6 +45,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -236,4 +247,90 @@ public void testMultipartUploadCorruption() { verify(s3AsyncClient, times(0)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class)); verify(s3AsyncClient, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); } + + public void testGetBlobPartMetadata() throws ExecutionException, InterruptedException { + final String checksum = "test_checksum"; + final long objectSize = 100L; + final int objectPartCount = 10; + final String blobName = "test_blob"; + final String bucketName = "test_bucket"; + + CompletableFuture getObjectAttributesResponseCompletableFuture = new CompletableFuture<>(); + getObjectAttributesResponseCompletableFuture.complete( + GetObjectAttributesResponse.builder() + .checksum(Checksum.builder().checksumCRC32(checksum).build()) + .objectSize(objectSize) + .objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build()) + .build() + ); + when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn( + getObjectAttributesResponseCompletableFuture + ); + + CompletableFuture responseFuture = asyncTransferManager.getBlobPartMetadata( + blobName, + bucketName, + s3AsyncClient + ); + GetObjectAttributesResponse objectAttributesResponse = responseFuture.get(); + + assertEquals(checksum, objectAttributesResponse.checksum().checksumCRC32()); + assertEquals(Long.valueOf(objectSize), objectAttributesResponse.objectSize()); + assertEquals(Integer.valueOf(objectPartCount), objectAttributesResponse.objectParts().totalPartsCount()); + } + + public void testGetBlobPartInputStream() throws ExecutionException, InterruptedException, IOException { + final String blobName = "test_blob"; + final String bucketName = "test_bucket"; + final long contentLength = 10L; + final String contentRange = "bytes 0-10/100"; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).contentRange(contentRange).build(); + + CompletableFuture> getObjectPartResponse = new CompletableFuture<>(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + getObjectPartResponse.complete(responseInputStream); + + when(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))).thenReturn(getObjectPartResponse); + + InputStreamContainer inputStreamContainer = asyncTransferManager.getBlobPartInputStreamContainer( + s3AsyncClient, + bucketName, + blobName, + 0 + ).get(); + + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); + } + + public void testTransformResponseToInputStreamContainer() throws IOException { + final String contentRange = "bytes 0-10/100"; + final long contentLength = 10L; + final InputStream inputStream = ResponseInputStream.nullInputStream(); + + GetObjectResponse getObjectResponse = GetObjectResponse.builder().contentLength(contentLength).build(); + + ResponseInputStream responseInputStreamNoRange = new ResponseInputStream<>(getObjectResponse, inputStream); + assertThrows(SdkException.class, () -> asyncTransferManager.transformResponseToInputStreamContainer(responseInputStreamNoRange)); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).build(); + ResponseInputStream responseInputStreamNoContentLength = new ResponseInputStream<>( + getObjectResponse, + inputStream + ); + assertThrows( + SdkException.class, + () -> asyncTransferManager.transformResponseToInputStreamContainer(responseInputStreamNoContentLength) + ); + + getObjectResponse = GetObjectResponse.builder().contentRange(contentRange).contentLength(contentLength).build(); + ResponseInputStream responseInputStream = new ResponseInputStream<>(getObjectResponse, inputStream); + InputStreamContainer inputStreamContainer = asyncTransferManager.transformResponseToInputStreamContainer(responseInputStream); + assertEquals(contentLength, inputStreamContainer.getContentLength()); + assertEquals(0, inputStreamContainer.getOffset()); + assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java new file mode 100644 index 0000000000000..3420dc6df87e8 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/utils/HttpRangeUtilsTests.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.utils; + +import software.amazon.awssdk.core.exception.SdkException; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.test.OpenSearchTestCase; + +public final class HttpRangeUtilsTests extends OpenSearchTestCase { + + public void testFromHttpRangeHeader() { + String headerValue = "bytes 0-10/200"; + Tuple range = HttpRangeUtils.fromHttpRangeHeader(headerValue); + assertEquals(0L, range.v1().longValue()); + assertEquals(10L, range.v2().longValue()); + + headerValue = "bytes 0-10/*"; + range = HttpRangeUtils.fromHttpRangeHeader(headerValue); + assertEquals(0L, range.v1().longValue()); + assertEquals(10L, range.v2().longValue()); + + final String invalidHeaderValue = "bytes */*"; + assertThrows(SdkException.class, () -> HttpRangeUtils.fromHttpRangeHeader(invalidHeaderValue)); + } +}