Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test downloadretry #18143

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 90 additions & 68 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

import sys
import threading
import time

import warnings
from io import BytesIO
import requests

from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import HttpResponseError, ServiceResponseError
from azure.core.tracing.common import with_current_context
from ._shared.encryption import decrypt_blob
from ._shared.request_handlers import validate_and_format_range_headers
Expand Down Expand Up @@ -43,10 +46,9 @@ def process_range_and_offset(start_range, end_range, length, encryption):
def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
try:
content = b"".join(list(data))
except Exception as error:
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)

content = b"".join(list(data))

if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
try:
return decrypt_blob(
Expand Down Expand Up @@ -188,19 +190,29 @@ def _download_chunk(self, chunk_start, chunk_end):
check_content_md5=self.validate_content
)

try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)

chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
try:
chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# This makes sure that if_match is set so that we can validate
# that subsequent downloads are to an unmodified blob
Expand Down Expand Up @@ -334,16 +346,6 @@ def __init__(
# TODO: Set to the stored MD5 when the service returns this
self.properties.content_md5 = None

if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
self._response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)

def __len__(self):
return self.size

Expand All @@ -356,51 +358,71 @@ def _initial_request(self):
check_content_md5=self._validate_content
)

try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)
retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode
# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)
try:
if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ async def send(self, request):
request.context.options.pop('raw_response_hook', self._response_callback)

response = await self.next.send(request)
await response.http_response.load_body()

will_retry = is_retry(response, request.context.options.get('mode'))
if not will_retry and download_stream_current is not None:
Expand Down
Loading