diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads.py b/sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads.py index a8d120ec30b9..941a90faf539 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/_shared/uploads.py @@ -477,6 +477,13 @@ def read(self, size=None): raise IOError("Stream failed to seek to the desired location.") buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) else: + absolute_position = self._stream_begin_index + self._position + # It's possible that there's connection problem during data transfer, + # so when we retry we don't want to read from current position of wrapped stream, + # instead we should seek to where we want to read from. + if self._wrapped_stream.tell() != absolute_position: + self._wrapped_stream.seek(absolute_position, SEEK_SET) + buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) if buffer_from_stream: diff --git a/sdk/storage/azure-storage-blob/tests/test_largest_block_blob.py b/sdk/storage/azure-storage-blob/tests/test_largest_block_blob.py index d632a9cbfb96..e00abbf91ca8 100644 --- a/sdk/storage/azure-storage-blob/tests/test_largest_block_blob.py +++ b/sdk/storage/azure-storage-blob/tests/test_largest_block_blob.py @@ -22,6 +22,8 @@ from _shared.testcase import StorageTestCase, GlobalStorageAccountPreparer # ------------------------------------------------------------------------------ +from azure.storage.blob._shared.uploads import SubStream + TEST_BLOB_PREFIX = 'largestblob' LARGEST_BLOCK_SIZE = 4000 * 1024 * 1024 LARGEST_SINGLE_UPLOAD_SIZE = 5000 * 1024 * 1024 @@ -207,6 +209,31 @@ def test_create_largest_blob_from_path(self, resource_group, location, storage_a # Assert self._teardown(FILE_PATH) + def test_substream_for_single_thread_upload_large_block(self): + FILE_PATH = 'largest_blob_from_path.temp.{}.dat'.format(str(uuid.uuid4())) + with open(FILE_PATH, 'wb') as stream: + largeStream = LargeStream(LARGEST_BLOCK_SIZE, 100 * 1024 * 1024) + chunk = largeStream.read() + while chunk: + stream.write(chunk) + chunk = largeStream.read() + + with open(FILE_PATH, 'rb') as stream: + substream = SubStream(stream, 0, 2 * 1024 * 1024, None) + # this is to mimic stage large block: SubStream.read() is getting called by http client + data1 = substream.read(2 * 1024 * 1024) + substream.read(2 * 1024 * 1024) + substream.read(2 * 1024 * 1024) + + # this is to mimic rewinding request body after connection error + substream.seek(0) + + # this is to mimic retry: stage that large block from beginning + data2 = substream.read(2 * 1024 * 1024) + + self.assertEqual(data1, data2) + self._teardown(FILE_PATH) + @pytest.mark.live_test_only @GlobalStorageAccountPreparer() def test_create_largest_blob_from_path_without_network(self, resource_group, location, storage_account, storage_account_key): diff --git a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_shared/uploads.py b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_shared/uploads.py index 91558d30857d..1b619dfc3bea 100644 --- a/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_shared/uploads.py +++ b/sdk/storage/azure-storage-file-datalake/azure/storage/filedatalake/_shared/uploads.py @@ -477,6 +477,13 @@ def read(self, size=None): raise IOError("Stream failed to seek to the desired location.") buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) else: + absolute_position = self._stream_begin_index + self._position + # It's possible that there's connection problem during data transfer, + # so when we retry we don't want to read from current position of wrapped stream, + # instead we should seek to where we want to read from. + if self._wrapped_stream.tell() != absolute_position: + self._wrapped_stream.seek(absolute_position, SEEK_SET) + buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) if buffer_from_stream: diff --git a/sdk/storage/azure-storage-file-share/azure/storage/fileshare/_shared/uploads.py b/sdk/storage/azure-storage-file-share/azure/storage/fileshare/_shared/uploads.py index 91558d30857d..1b619dfc3bea 100644 --- a/sdk/storage/azure-storage-file-share/azure/storage/fileshare/_shared/uploads.py +++ b/sdk/storage/azure-storage-file-share/azure/storage/fileshare/_shared/uploads.py @@ -477,6 +477,13 @@ def read(self, size=None): raise IOError("Stream failed to seek to the desired location.") buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) else: + absolute_position = self._stream_begin_index + self._position + # It's possible that there's connection problem during data transfer, + # so when we retry we don't want to read from current position of wrapped stream, + # instead we should seek to where we want to read from. + if self._wrapped_stream.tell() != absolute_position: + self._wrapped_stream.seek(absolute_position, SEEK_SET) + buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) if buffer_from_stream: diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py index 91558d30857d..1b619dfc3bea 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py @@ -477,6 +477,13 @@ def read(self, size=None): raise IOError("Stream failed to seek to the desired location.") buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) else: + absolute_position = self._stream_begin_index + self._position + # It's possible that there's connection problem during data transfer, + # so when we retry we don't want to read from current position of wrapped stream, + # instead we should seek to where we want to read from. + if self._wrapped_stream.tell() != absolute_position: + self._wrapped_stream.seek(absolute_position, SEEK_SET) + buffer_from_stream = self._wrapped_stream.read(current_max_buffer_size) if buffer_from_stream: