From 6ea49a91422c9f25e3f97b759a390232741d6db4 Mon Sep 17 00:00:00 2001 From: Sourav Gupta Date: Wed, 6 Dec 2023 13:55:46 +0530 Subject: [PATCH 1/3] return ErrUnexpectedEOF as error in UploadStream --- sdk/storage/azblob/blockblob/chunkwriting.go | 4 +- sdk/storage/azblob/blockblob/client_test.go | 128 +++++++++++++++++++ sdk/storage/azblob/internal/shared/shared.go | 24 ++++ 3 files changed, 154 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azblob/blockblob/chunkwriting.go b/sdk/storage/azblob/blockblob/chunkwriting.go index 212255d4c66b..24df42c75ef0 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting.go +++ b/sdk/storage/azblob/blockblob/chunkwriting.go @@ -75,7 +75,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit } var n int - n, err = io.ReadFull(src, buffer) + n, err = shared.ReadAtLeast(src, buffer, len(buffer)) if n > 0 { // some data was read, upload it @@ -108,7 +108,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit } if err != nil { // The reader is done, no more outgoing buffers - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + if errors.Is(err, io.EOF) { // these are expected errors, we don't surface those err = nil } else { diff --git a/sdk/storage/azblob/blockblob/client_test.go b/sdk/storage/azblob/blockblob/client_test.go index d1078e09c082..e7d3841ab3a0 100644 --- a/sdk/storage/azblob/blockblob/client_test.go +++ b/sdk/storage/azblob/blockblob/client_test.go @@ -5033,6 +5033,134 @@ func (s *BlockBlobUnrecordedTestsSuite) TestUploadStreamToBlobProperties() { _require.EqualValues(actualBlobData, blobData) } +func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadDownloadStream() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + blobSize := 11 * 1024 * 1024 + bufferSize := 2 * 1024 * 1024 + maxBuffers := 2 + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + // Set up test blob + blobName := testcommon.GenerateBlobName(testName) + bbClient := testcommon.GetBlockBlobClient(blobName, containerClient) + blobContentReader, blobData := testcommon.GenerateData(blobSize) + + _, err = bbClient.UploadStream(context.Background(), blobContentReader, &blockblob.UploadStreamOptions{ + BlockSize: int64(bufferSize), + Concurrency: maxBuffers, + Metadata: testcommon.BasicMetadata, + Tags: testcommon.BasicBlobTagsMap, + HTTPHeaders: &testcommon.BasicHeaders, + }) + _require.NoError(err) + + downloadResponse, err := bbClient.DownloadStream(context.Background(), nil) + _require.NoError(err) + + bbClient2 := testcommon.GetBlockBlobClient("blobName2", containerClient) + + // UploadStream using http.Response.Body as the reader + _, err = bbClient2.UploadStream(context.Background(), downloadResponse.Body, &blockblob.UploadStreamOptions{ + BlockSize: int64(bufferSize), + Concurrency: maxBuffers, + }) + _require.NoError(err) + + downloadResp2, err := bbClient2.DownloadStream(context.Background(), nil) + _require.NoError(err) + + // Assert that the content is correct + actualBlobData, err := io.ReadAll(downloadResp2.Body) + _require.NoError(err) + _require.Equal(len(actualBlobData), len(blobData)) + _require.EqualValues(actualBlobData, blobData) +} + +// This test simulates UploadStream and DownloadBuffer methods, +// and verifies length and content of file +func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamDownloadBuffer() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + const MiB = 1024 * 1024 + testUploadDownload := func(contentSize int) { + content := make([]byte, contentSize) + _, _ = rand.Read(content) + contentMD5 := md5.Sum(content) + body := streaming.NopCloser(bytes.NewReader(content)) + + srcBlob := containerClient.NewBlockBlobClient("srcblob") + + // Prepare source bbClient for copy. + _, err = srcBlob.UploadStream(context.Background(), body, &blockblob.UploadStreamOptions{ + BlockSize: 4 * MiB, + Concurrency: 5, + }) + _require.NoError(err) + + // Download to a buffer and verify contents + buff := make([]byte, contentSize) + b := blob.DownloadBufferOptions{ + BlockSize: 5 * MiB, + Concurrency: 4, + } + n, err := srcBlob.DownloadBuffer(context.Background(), buff, &b) + _require.NoError(err) + _require.Equal(int64(contentSize), n) + _require.Equal(contentMD5, md5.Sum(buff[:])) + } + + testUploadDownload(0) // zero byte blob + testUploadDownload(5 * MiB) + testUploadDownload(20 * MiB) + testUploadDownload(199 * MiB) +} + +type fakeReader struct { + cnt int +} + +func (a *fakeReader) Read(bytes []byte) (count int, err error) { + if a.cnt < 5 { + _, buf := testcommon.GenerateData(1024) + n := copy(bytes, buf) + a.cnt++ + return n, nil + } + return 0, io.ErrUnexpectedEOF +} + +func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamUsingCustomReader() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient) + + r := &fakeReader{} + _, err = bbClient.UploadStream(context.Background(), r, nil) + _require.Error(err) + _require.Equal(err, io.ErrUnexpectedEOF) +} + func (s *BlockBlobRecordedTestsSuite) TestBlockBlobSetTierOnVersions() { _require := require.New(s.T()) testName := s.T().Name() diff --git a/sdk/storage/azblob/internal/shared/shared.go b/sdk/storage/azblob/internal/shared/shared.go index e9f1f41177a4..10ba130011f9 100644 --- a/sdk/storage/azblob/internal/shared/shared.go +++ b/sdk/storage/azblob/internal/shared/shared.go @@ -254,3 +254,27 @@ func IsIPEndpointStyle(host string) bool { } return net.ParseIP(host) != nil } + +// ReadAtLeast reads from r into buf until it has read at least min bytes. +// It returns the number of bytes copied and an error. +// The EOF error is returned if no bytes were read or +// EOF happened after reading fewer than min bytes. +// If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer. +// On return, n >= min if and only if err == nil. +// If r returns an error having read at least min bytes, the error is dropped. +// This method is same as io.ReadAtLeast except that +// it does not return io.ErrUnexpectedEOF. +func ReadAtLeast(r io.Reader, buf []byte, min int) (n int, err error) { + if len(buf) < min { + return 0, io.ErrShortBuffer + } + for n < min && err == nil { + var nn int + nn, err = r.Read(buf[n:]) + n += nn + } + if n >= min { + err = nil + } + return +} From 4acc56ba465fafda49dcaa247313dfe7d6247e73 Mon Sep 17 00:00:00 2001 From: Sourav Gupta Date: Wed, 6 Dec 2023 14:12:04 +0530 Subject: [PATCH 2/3] Adding changelog --- sdk/storage/azblob/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/storage/azblob/CHANGELOG.md b/sdk/storage/azblob/CHANGELOG.md index ad4214f99d00..87de0feb74e5 100644 --- a/sdk/storage/azblob/CHANGELOG.md +++ b/sdk/storage/azblob/CHANGELOG.md @@ -15,6 +15,7 @@ * Fixed an issue that would cause metadata keys with empty values to be omitted when enumerating blobs. * Fixed an issue where passing empty map to set blob tags API was causing panic. Fixes [#21869](https://github.com/Azure/azure-sdk-for-go/issues/21869). * Fixed an issue where downloaded file has incorrect size when not a multiple of block size. Fixes [#21995](https://github.com/Azure/azure-sdk-for-go/issues/21995). +* Fixed case where `io.ErrUnexpectedEOF` was treated as expected error in `UploadStream`. Fixes [#21837](https://github.com/Azure/azure-sdk-for-go/issues/21837). ### Other Changes From 99f9e828b471588912d19f90d68f72ba526965e3 Mon Sep 17 00:00:00 2001 From: Sourav Gupta Date: Thu, 7 Dec 2023 18:03:27 +0530 Subject: [PATCH 3/3] doc update --- sdk/storage/azblob/internal/shared/shared.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azblob/internal/shared/shared.go b/sdk/storage/azblob/internal/shared/shared.go index 10ba130011f9..c131facf7b7f 100644 --- a/sdk/storage/azblob/internal/shared/shared.go +++ b/sdk/storage/azblob/internal/shared/shared.go @@ -262,8 +262,8 @@ func IsIPEndpointStyle(host string) bool { // If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer. // On return, n >= min if and only if err == nil. // If r returns an error having read at least min bytes, the error is dropped. -// This method is same as io.ReadAtLeast except that -// it does not return io.ErrUnexpectedEOF. +// This method is same as io.ReadAtLeast except that it does not +// return io.ErrUnexpectedEOF when fewer than min bytes are read. func ReadAtLeast(r io.Reader, buf []byte, min int) (n int, err error) { if len(buf) < min { return 0, io.ErrShortBuffer