Skip to content

Commit

Permalink
Fix error handling for future completion (#10406) (#10420)
Browse files Browse the repository at this point in the history
(cherry picked from commit 10bae20)

Signed-off-by: Kunal Kotwani <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent abbb7d5 commit 049590a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,27 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
return;
}

final List<ReadContext.StreamPartCreator> blobPartInputStreamFutures = new ArrayList<>();
final long blobSize = blobMetadata.objectSize();
final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount();
final String blobChecksum = blobMetadata.checksum().checksumCRC32();

if (numberOfParts == null) {
blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null));
} else {
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
final int innerPartNumber = partNumber;
blobPartInputStreamFutures.add(
() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber)
);
try {
final List<ReadContext.StreamPartCreator> blobPartInputStreamFutures = new ArrayList<>();
final long blobSize = blobMetadata.objectSize();
final Integer numberOfParts = blobMetadata.objectParts() == null ? null : blobMetadata.objectParts().totalPartsCount();
final String blobChecksum = blobMetadata.checksum() == null ? null : blobMetadata.checksum().checksumCRC32();

if (numberOfParts == null) {
blobPartInputStreamFutures.add(() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, null));
} else {
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
final int innerPartNumber = partNumber;
blobPartInputStreamFutures.add(
() -> getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobKey, innerPartNumber)
);
}
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
} catch (Exception ex) {
listener.onFailure(ex);
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
});
} catch (Exception ex) {
listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,51 @@ public void testReadBlobAsyncFailure() throws Exception {
assertEquals(1, readContextActionListener.getFailureCount());
}

public void testReadBlobAsyncOnCompleteFailureMissingData() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final String blobName = randomAlphaOfLengthBetween(1, 10);
final String checksum = randomAlphaOfLength(10);

final long objectSize = 100L;
final int objectPartCount = 10;

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference(
AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null)
);

final S3BlobStore blobStore = mock(S3BlobStore.class);
final BlobPath blobPath = new BlobPath();

when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.serverSideEncryption()).thenReturn(false);
when(blobStore.asyncClientReference()).thenReturn(amazonAsyncS3Reference);

CompletableFuture<GetObjectAttributesResponse> getObjectAttributesResponseCompletableFuture = new CompletableFuture<>();
getObjectAttributesResponseCompletableFuture.complete(
GetObjectAttributesResponse.builder()
.checksum(Checksum.builder().build())
.objectSize(null)
.objectParts(GetObjectAttributesParts.builder().totalPartsCount(objectPartCount).build())
.build()
);
when(s3AsyncClient.getObjectAttributes(any(GetObjectAttributesRequest.class))).thenReturn(
getObjectAttributesResponseCompletableFuture
);

CountDownLatch countDownLatch = new CountDownLatch(1);
CountingCompletionListener<ReadContext> readContextActionListener = new CountingCompletionListener<>();
LatchedActionListener<ReadContext> listener = new LatchedActionListener<>(readContextActionListener, countDownLatch);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.readBlobAsync(blobName, listener);
countDownLatch.await();

assertEquals(0, readContextActionListener.getResponseCount());
assertEquals(1, readContextActionListener.getFailureCount());
}

public void testGetBlobMetadata() throws Exception {
final String checksum = randomAlphaOfLengthBetween(1, 10);
final long objectSize = 100L;
Expand Down

0 comments on commit 049590a

Please sign in to comment.