diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index 9386c9ab2d78..22e6b44af568 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -8,7 +8,8 @@ This version and all future versions will require Python 3.7+. Python 3.6 is no - Added support for `AzureNamedKeyCredential` as a valid `credential` type. ### Bugs Fixed -- Updated exception catching of `StorageStreamDownloader`'s retry mechanism. +- Removed dead retry meachism from async `azure.storage.blob.aio.StorageStreamDownloader`. +- Updated exception catching of `azure.storage.blob.StorageStreamDownloader`'s retry mechanism. - Adjusted type hints for `upload_blob` and `StorageStreamDownloader.readall`. ## 12.13.0 (2022-07-07) diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py index fc9fef581db3..133c721ec383 100644 --- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py +++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py @@ -12,8 +12,8 @@ from typing import AsyncIterator, Generic, TypeVar import asyncio -from aiohttp import ClientPayloadError -from azure.core.exceptions import HttpResponseError, ServiceResponseError + +from azure.core.exceptions import HttpResponseError from .._shared.request_handlers import validate_and_format_range_headers from .._shared.response_handlers import process_storage_error, parse_length_from_content_range @@ -102,27 +102,18 @@ async def _download_chunk(self, chunk_start, chunk_end): download_range[1], check_content_md5=self.validate_content ) - retry_active = True - retry_total = 3 - while retry_active: - try: - _, response = await self.client.download( - range=range_header, - range_get_content_md5=range_validation, - validate_content=self.validate_content, - data_stream_total=self.total_size, - download_stream_current=self.progress_total, - **self.request_options - ) - retry_active = False + try: + _, response = await self.client.download( + range=range_header, + range_get_content_md5=range_validation, + validate_content=self.validate_content, + data_stream_total=self.total_size, + download_stream_current=self.progress_total, + **self.request_options + ) - except HttpResponseError as error: - process_storage_error(error) - except ClientPayloadError as error: - retry_total -= 1 - if retry_total <= 0: - raise ServiceResponseError(error, error=error) - await asyncio.sleep(1) + except HttpResponseError as error: + process_storage_error(error) chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options) @@ -332,63 +323,52 @@ async def _initial_request(self): end_range_required=False, check_content_md5=self._validate_content) - retry_active = True - retry_total = 3 - while retry_active: - try: - location_mode, response = await self._clients.blob.download( - range=range_header, - range_get_content_md5=range_validation, - validate_content=self._validate_content, - data_stream_total=None, - download_stream_current=0, - **self._request_options) - - # Check the location we read from to ensure we use the same one - # for subsequent requests. - self._location_mode = location_mode - - # Parse the total file size and adjust the download size if ranges - # were specified - self._file_size = parse_length_from_content_range(response.properties.content_range) - # Remove any extra encryption data size from blob size - self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data) - - if self._end_range is not None: - # Use the length unless it is over the end of the file - self.size = min(self._file_size, self._end_range - self._start_range + 1) - elif self._start_range is not None: - self.size = self._file_size - self._start_range - else: - self.size = self._file_size - retry_active = False + try: + location_mode, response = await self._clients.blob.download( + range=range_header, + range_get_content_md5=range_validation, + validate_content=self._validate_content, + data_stream_total=None, + download_stream_current=0, + **self._request_options) - except HttpResponseError as error: - if self._start_range is None and error.response.status_code == 416: - # Get range will fail on an empty file. If the user did not - # request a range, do a regular get request in order to get - # any properties. - try: - _, response = await self._clients.blob.download( - validate_content=self._validate_content, - data_stream_total=0, - download_stream_current=0, - **self._request_options) - retry_active = False - except HttpResponseError as error: - process_storage_error(error) - - # Set the download size to empty - self.size = 0 - self._file_size = 0 - else: + # Check the location we read from to ensure we use the same one + # for subsequent requests. + self._location_mode = location_mode + + # Parse the total file size and adjust the download size if ranges + # were specified + self._file_size = parse_length_from_content_range(response.properties.content_range) + # Remove any extra encryption data size from blob size + self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data) + + if self._end_range is not None: + # Use the length unless it is over the end of the file + self.size = min(self._file_size, self._end_range - self._start_range + 1) + elif self._start_range is not None: + self.size = self._file_size - self._start_range + else: + self.size = self._file_size + + except HttpResponseError as error: + if self._start_range is None and error.response.status_code == 416: + # Get range will fail on an empty file. If the user did not + # request a range, do a regular get request in order to get + # any properties. + try: + _, response = await self._clients.blob.download( + validate_content=self._validate_content, + data_stream_total=0, + download_stream_current=0, + **self._request_options) + except HttpResponseError as error: process_storage_error(error) - except ClientPayloadError as error: - retry_total -= 1 - if retry_total <= 0: - raise ServiceResponseError(error, error=error) - await asyncio.sleep(1) + # Set the download size to empty + self.size = 0 + self._file_size = 0 + else: + process_storage_error(error) # get page ranges to optimize downloading sparse page blob if response.properties.blob_type == 'PageBlob': diff --git a/sdk/storage/azure-storage-blob/tests/test_retry_async.py b/sdk/storage/azure-storage-blob/tests/test_retry_async.py index d46ecd8eb851..c307b252f5ec 100644 --- a/sdk/storage/azure-storage-blob/tests/test_retry_async.py +++ b/sdk/storage/azure-storage-blob/tests/test_retry_async.py @@ -3,21 +3,21 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # -------------------------------------------------------------------------- -import unittest -import pytest import asyncio +import functools +from unittest import mock +import pytest from azure.core.exceptions import ( HttpResponseError, + IncompleteReadError, ResourceExistsError, AzureError, - ClientAuthenticationError -) -from azure.core.pipeline.transport import ( - AioHttpTransport + ClientAuthenticationError, + ServiceRequestError ) -from azure.core.pipeline.transport import AioHttpTransport +from azure.core.pipeline.transport import AsyncHttpResponse, AioHttpTransportResponse, AioHttpTransport from multidict import CIMultiDict, CIMultiDictProxy from azure.storage.blob import LocationMode @@ -32,6 +32,9 @@ from devtools_testutils import ResourceGroupPreparer, StorageAccountPreparer, RetryCounter, ResponseCallback from devtools_testutils.storage.aio import AsyncStorageTestCase +from aiohttp.client_exceptions import ClientPayloadError +from aiohttp.streams import StreamReader +from aiohttp.client import ClientResponse class AiohttpTestTransport(AioHttpTransport): """Workaround to vcrpy bug: https://github.com/kevin1024/vcrpy/pull/461 @@ -471,4 +474,39 @@ async def test_invalid_account_key_async(self, storage_account_name, storage_acc # No retry should be performed since the signing error is fatal self.assertEqual(retry_counter.count, 0) + @staticmethod + def _count_wrapper(counter, func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + counter[0] += 1 + return func(*args, **kwargs) + return wrapper + + @pytest.mark.live_test_only + @BlobPreparer() + @AsyncStorageTestCase.await_prepared_test + async def test_streaming_retry_async(self, storage_account_name, storage_account_key): + """Test that retry mechanisms are working when streaming data.""" + container_name = self.get_resource_name('utcontainer') + retry = LinearRetry(backoff = 0.1, random_jitter_range=0) + + service = self._create_storage_service( + BlobServiceClient, storage_account_name, storage_account_key, retry_policy=retry) + container = service.get_container_client(container_name) + await container.create_container() + assert await container.exists() + blob_name = "myblob" + await container.upload_blob(blob_name, b"abcde") + + stream_reader_read_mock = mock.MagicMock() + future = asyncio.Future() + future.set_exception(ClientPayloadError()) + stream_reader_read_mock.return_value = future + with mock.patch.object(StreamReader, "read", stream_reader_read_mock), pytest.raises(ServiceRequestError): + blob = container.get_blob_client(blob=blob_name) + count = [0] + blob._pipeline._transport.send = self._count_wrapper(count, blob._pipeline._transport.send) + await blob.download_blob() + assert stream_reader_read_mock.call_count == count[0] == 4 + # ------------------------------------------------------------------------------