Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[storage] remove dead retry #25455

Merged
merged 25 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ This version and all future versions will require Python 3.7+. Python 3.6 is no
- Added support for `AzureNamedKeyCredential` as a valid `credential` type.

### Bugs Fixed
- Updated exception catching of `StorageStreamDownloader`'s retry mechanism.
- Removed dead retry meachism from async `azure.storage.blob.aio.StorageStreamDownloader`.
- Updated exception catching of `azure.storage.blob.StorageStreamDownloader`'s retry mechanism.
- Adjusted type hints for `upload_blob` and `StorageStreamDownloader.readall`.

## 12.13.0 (2022-07-07)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from typing import AsyncIterator, Generic, TypeVar

import asyncio
from aiohttp import ClientPayloadError
from azure.core.exceptions import HttpResponseError, ServiceResponseError

from azure.core.exceptions import HttpResponseError

from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
Expand Down Expand Up @@ -102,27 +102,18 @@ async def _download_chunk(self, chunk_start, chunk_end):
download_range[1],
check_content_md5=self.validate_content
)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
retry_active = False
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)

except HttpResponseError as error:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)
except HttpResponseError as error:
process_storage_error(error)

chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)

Expand Down Expand Up @@ -332,63 +323,52 @@ async def _initial_request(self):
end_range_required=False,
check_content_md5=self._validate_content)

retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
# Remove any extra encryption data size from blob size
self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data)

if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = False
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
retry_active = False
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
# Remove any extra encryption data size from blob size
self._file_size = adjust_blob_size_for_encryption(self._file_size, self._encryption_data)

if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
except HttpResponseError as error:
process_storage_error(error)

except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)
# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
Expand Down
52 changes: 45 additions & 7 deletions sdk/storage/azure-storage-blob/tests/test_retry_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import unittest
import pytest
import asyncio
import functools
from unittest import mock
import pytest

from azure.core.exceptions import (
HttpResponseError,
IncompleteReadError,
ResourceExistsError,
AzureError,
ClientAuthenticationError
)
from azure.core.pipeline.transport import (
AioHttpTransport
ClientAuthenticationError,
ServiceRequestError
)

from azure.core.pipeline.transport import AioHttpTransport
from azure.core.pipeline.transport import AsyncHttpResponse, AioHttpTransportResponse, AioHttpTransport
from multidict import CIMultiDict, CIMultiDictProxy

from azure.storage.blob import LocationMode
Expand All @@ -32,6 +32,9 @@
from devtools_testutils import ResourceGroupPreparer, StorageAccountPreparer, RetryCounter, ResponseCallback
from devtools_testutils.storage.aio import AsyncStorageTestCase

from aiohttp.client_exceptions import ClientPayloadError
from aiohttp.streams import StreamReader
from aiohttp.client import ClientResponse

class AiohttpTestTransport(AioHttpTransport):
"""Workaround to vcrpy bug: https://github.com/kevin1024/vcrpy/pull/461
Expand Down Expand Up @@ -471,4 +474,39 @@ async def test_invalid_account_key_async(self, storage_account_name, storage_acc
# No retry should be performed since the signing error is fatal
self.assertEqual(retry_counter.count, 0)

@staticmethod
def _count_wrapper(counter, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
counter[0] += 1
return func(*args, **kwargs)
return wrapper

@pytest.mark.live_test_only
@BlobPreparer()
@AsyncStorageTestCase.await_prepared_test
async def test_streaming_retry_async(self, storage_account_name, storage_account_key):
Stevenjin8 marked this conversation as resolved.
Show resolved Hide resolved
"""Test that retry mechanisms are working when streaming data."""
container_name = self.get_resource_name('utcontainer')
retry = LinearRetry(backoff = 0.1, random_jitter_range=0)

service = self._create_storage_service(
BlobServiceClient, storage_account_name, storage_account_key, retry_policy=retry)
container = service.get_container_client(container_name)
await container.create_container()
assert await container.exists()
blob_name = "myblob"
await container.upload_blob(blob_name, b"abcde")

stream_reader_read_mock = mock.MagicMock()
future = asyncio.Future()
future.set_exception(ClientPayloadError())
stream_reader_read_mock.return_value = future
with mock.patch.object(StreamReader, "read", stream_reader_read_mock), pytest.raises(ServiceRequestError):
blob = container.get_blob_client(blob=blob_name)
count = [0]
blob._pipeline._transport.send = self._count_wrapper(count, blob._pipeline._transport.send)
await blob.download_blob()
assert stream_reader_read_mock.call_count == count[0] == 4

# ------------------------------------------------------------------------------