[Storage] Remove dead retry from async blob download (#25455)
Steven Jin authored Aug 4, 2022
1 parent 102d717 commit 60b1048
Showing 3 changed files with 103 additions and 84 deletions.
3 changes: 2 additions & 1 deletion sdk/storage/azure-storage-blob/CHANGELOG.md
@@ -8,7 +8,8 @@ This version and all future versions will require Python 3.7+. Python 3.6 is no
- Added support for `AzureNamedKeyCredential` as a valid `credential` type.

### Bugs Fixed
- Updated exception catching of `StorageStreamDownloader`'s retry mechanism.
- Removed dead retry mechanism from async `azure.storage.blob.aio.StorageStreamDownloader`.
- Updated exception catching of `azure.storage.blob.StorageStreamDownloader`'s retry mechanism.
- Adjusted type hints for `upload_blob` and `StorageStreamDownloader.readall`.

## 12.13.0 (2022-07-07)
132 changes: 56 additions & 76 deletions sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py
@@ -12,8 +12,8 @@
from typing import AsyncIterator, Generic, TypeVar

import asyncio
from aiohttp import ClientPayloadError
from azure.core.exceptions import HttpResponseError, ServiceResponseError

from azure.core.exceptions import HttpResponseError

from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
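
The import diff above is the crux of the change: `aiohttp.ClientPayloadError` and the `ServiceResponseError` re-raise disappear because, in the async pipeline, a payload error raised by aiohttp never reaches this module — azure-core's transport translates it into its own exception types and the pipeline's retry policy retries the request, which is exactly what the new test at the bottom of this commit asserts. A hypothetical caller-side sketch (not part of this commit) of what can still be caught:

```python
# Hypothetical usage sketch: after this commit, stream failures still surface
# to callers as azure-core exceptions; the storage SDK no longer (and never
# effectively did) handle aiohttp.ClientPayloadError itself.
from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.storage.blob.aio import BlobClient

async def read_blob(client: BlobClient) -> bytes:
    try:
        downloader = await client.download_blob()
        return await downloader.readall()
    except ServiceRequestError:
        # Transport-level failures (for example, a connection dropped
        # mid-stream), raised once the pipeline's retry policy is exhausted.
        raise
    except HttpResponseError:
        # Service-side errors, which the storage SDK maps through
        # process_storage_error internally.
        raise
```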
@@ -102,27 +102,18 @@ async def _download_chunk(self, chunk_start, chunk_end):
download_range[1],
check_content_md5=self.validate_content
)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
retry_active = False
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)

except HttpResponseError as error:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)
except HttpResponseError as error:
process_storage_error(error)

chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)
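
Stripped of the diff interleaving, the surviving request logic in `_download_chunk` is a single attempt whose HTTP errors are funneled through `process_storage_error`. A reconstruction from the added lines above (an excerpt sketch, not standalone runnable code):

```python
# Reconstructed from the added lines above; range_header and range_validation
# are computed earlier in the method, outside the shown region.
try:
    _, response = await self.client.download(
        range=range_header,
        range_get_content_md5=range_validation,
        validate_content=self.validate_content,
        data_stream_total=self.total_size,
        download_stream_current=self.progress_total,
        **self.request_options
    )
except HttpResponseError as error:
    process_storage_error(error)

chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)
```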

@@ -332,63 +323,52 @@ async def _initial_request(self):
end_range_required=False,
check_content_md5=self._validate_content)

retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
# Remove any extra encryption data size from blob size
self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data)

if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = False
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
retry_active = False
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
# Remove any extra encryption data size from blob size
self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data)

if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
except HttpResponseError as error:
process_storage_error(error)

except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)
# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
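
The `_initial_request` hunk interleaves the two versions heavily, so the result is easier to judge read end-to-end. A reconstruction of the new control flow from the added lines above (an excerpt sketch: the success path stays inside the `try`, and the only remaining handler is `HttpResponseError`):

```python
# Reconstructed from the added lines above (excerpt, not standalone code).
try:
    location_mode, response = await self._clients.blob.download(
        range=range_header,
        range_get_content_md5=range_validation,
        validate_content=self._validate_content,
        data_stream_total=None,
        download_stream_current=0,
        **self._request_options)

    # Check the location we read from to ensure we use the same one
    # for subsequent requests.
    self._location_mode = location_mode

    # Parse the total file size and adjust the download size if ranges
    # were specified
    self._file_size = parse_length_from_content_range(response.properties.content_range)
    # Remove any extra encryption data size from blob size
    self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data)

    if self._end_range is not None:
        # Use the length unless it is over the end of the file
        self.size = min(self._file_size, self._end_range - self._start_range + 1)
    elif self._start_range is not None:
        self.size = self._file_size - self._start_range
    else:
        self.size = self._file_size

except HttpResponseError as error:
    if self._start_range is None and error.response.status_code == 416:
        # Get range will fail on an empty file. If the user did not
        # request a range, do a regular get request in order to get
        # any properties.
        try:
            _, response = await self._clients.blob.download(
                validate_content=self._validate_content,
                data_stream_total=0,
                download_stream_current=0,
                **self._request_options)
        except HttpResponseError as error:
            process_storage_error(error)

        # Set the download size to empty
        self.size = 0
        self._file_size = 0
    else:
        process_storage_error(error)
```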
52 changes: 45 additions & 7 deletions sdk/storage/azure-storage-blob/tests/test_retry_async.py
@@ -3,21 +3,21 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import unittest
import pytest
import asyncio
import functools
from unittest import mock
import pytest

from azure.core.exceptions import (
HttpResponseError,
IncompleteReadError,
ResourceExistsError,
AzureError,
ClientAuthenticationError
)
from azure.core.pipeline.transport import (
AioHttpTransport
ClientAuthenticationError,
ServiceRequestError
)

from azure.core.pipeline.transport import AioHttpTransport
from azure.core.pipeline.transport import AsyncHttpResponse, AioHttpTransportResponse, AioHttpTransport
from multidict import CIMultiDict, CIMultiDictProxy

from azure.storage.blob import LocationMode
@@ -32,6 +32,9 @@
from devtools_testutils import ResourceGroupPreparer, StorageAccountPreparer, RetryCounter, ResponseCallback
from devtools_testutils.storage.aio import AsyncStorageTestCase

from aiohttp.client_exceptions import ClientPayloadError
from aiohttp.streams import StreamReader
from aiohttp.client import ClientResponse

class AiohttpTestTransport(AioHttpTransport):
"""Workaround to vcrpy bug: https://github.com/kevin1024/vcrpy/pull/461
@@ -471,4 +474,39 @@ async def test_invalid_account_key_async(self, storage_account_name, storage_acc
# No retry should be performed since the signing error is fatal
self.assertEqual(retry_counter.count, 0)

@staticmethod
def _count_wrapper(counter, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
counter[0] += 1
return func(*args, **kwargs)
return wrapper

@pytest.mark.live_test_only
@BlobPreparer()
@AsyncStorageTestCase.await_prepared_test
async def test_streaming_retry_async(self, storage_account_name, storage_account_key):
"""Test that retry mechanisms are working when streaming data."""
container_name = self.get_resource_name('utcontainer')
retry = LinearRetry(backoff = 0.1, random_jitter_range=0)

service = self._create_storage_service(
BlobServiceClient, storage_account_name, storage_account_key, retry_policy=retry)
container = service.get_container_client(container_name)
await container.create_container()
assert await container.exists()
blob_name = "myblob"
await container.upload_blob(blob_name, b"abcde")

stream_reader_read_mock = mock.MagicMock()
future = asyncio.Future()
future.set_exception(ClientPayloadError())
stream_reader_read_mock.return_value = future
with mock.patch.object(StreamReader, "read", stream_reader_read_mock), pytest.raises(ServiceRequestError):
blob = container.get_blob_client(blob=blob_name)
count = [0]
blob._pipeline._transport.send = self._count_wrapper(count, blob._pipeline._transport.send)
await blob.download_blob()
assert stream_reader_read_mock.call_count == count[0] == 4
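
The final assertion encodes the retry arithmetic: each pipeline attempt performs exactly one (mocked, always-failing) stream read, and assuming the storage `LinearRetry` default of `retry_total=3`, the pipeline sends the request four times in total:

```python
# Sanity check of the expected attempt count (assumption: LinearRetry
# defaults to retry_total=3): one initial attempt plus three retries.
initial_attempt = 1
retry_total = 3
assert initial_attempt + retry_total == 4  # == transport.send call count
```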

# ------------------------------------------------------------------------------
