Add back support for retries in storage uploads. (#3378)
dhermes authored May 10, 2017
1 parent 3333f71 commit ff4f492
Showing 3 changed files with 87 additions and 30 deletions.
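
As context for the diff below, a minimal sketch of the caller-facing behavior
this commit restores; the client setup, bucket name, and file name are
hypothetical:

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket('my-bucket')   # hypothetical bucket
    blob = bucket.blob('data.csv')

    with open('data.csv', 'rb') as file_obj:
        # Passing num_retries (deprecated) caps retries at a fixed count.
        # Omitting it keeps the default strategy: exponential backoff
        # until roughly 10 minutes of wait time have elapsed.
        blob.upload_from_file(file_obj, num_retries=5)
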
63 changes: 47 additions & 16 deletions storage/google/cloud/storage/blob.py
@@ -71,12 +71,13 @@
'storageClass',
)
_NUM_RETRIES_MESSAGE = (
'num_retries is no longer supported. When a transient error occurs, '
'such as a 429 Too Many Requests or 500 Internal Server Error, upload '
'requests will be automatically retried. Subsequent retries will be '
'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until '
'10 minutes of wait time have elapsed. At that point, there will be no '
'more attempts to retry.')
'`num_retries` has been deprecated and will be removed in a future '
'release. The default behavior (when `num_retries` is not specified) when '
'a transient error (e.g. 429 Too Many Requests or 500 Internal Server '
'Error) occurs will be as follows: upload requests will be automatically '
'retried. Subsequent retries will be sent after waiting 1, 2, 4, 8, etc. '
'seconds (exponential backoff) until 10 minutes of wait time have '
'elapsed. At that point, there will be no more attempts to retry.')
_READ_LESS_THAN_SIZE = (
'Size {:d} was specified but the file-like object only had '
'{:d} bytes remaining.')
@@ -583,7 +584,8 @@ def _get_upload_arguments(self, content_type):
content_type = self._get_content_type(content_type)
return headers, object_metadata, content_type

def _do_multipart_upload(self, client, stream, content_type, size):
def _do_multipart_upload(self, client, stream, content_type,
size, num_retries):
"""Perform a multipart upload.
Assumes ``chunk_size`` is :data:`None` on the current blob.
@@ -610,6 +612,10 @@ def _do_multipart_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the multipart
upload request.
@@ -631,13 +637,19 @@ def _do_multipart_upload(self, client, stream, content_type, size):
upload_url = _MULTIPART_URL_TEMPLATE.format(
bucket_path=self.bucket.path)
upload = MultipartUpload(upload_url, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

response = upload.transmit(
transport, data, object_metadata, content_type)

return response

def _initiate_resumable_upload(self, client, stream, content_type,
size, extra_headers=None, chunk_size=None):
size, num_retries, extra_headers=None,
chunk_size=None):
"""Initiate a resumable upload.
The content type of the upload will be determined in order
@@ -662,6 +674,10 @@ def _initiate_resumable_upload(self, client, stream, content_type,
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:type extra_headers: dict
:param extra_headers: (Optional) Extra headers to add to standard
headers.
@@ -693,13 +709,19 @@ def _initiate_resumable_upload(self, client, stream, content_type,
upload_url = _RESUMABLE_URL_TEMPLATE.format(
bucket_path=self.bucket.path)
upload = ResumableUpload(upload_url, chunk_size, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

upload.initiate(
transport, stream, object_metadata, content_type,
total_bytes=size, stream_final=False)

return upload, transport

def _do_resumable_upload(self, client, stream, content_type, size):
def _do_resumable_upload(self, client, stream, content_type,
size, num_retries):
"""Perform a resumable upload.
Assumes ``chunk_size`` is not :data:`None` on the current blob.
@@ -726,19 +748,23 @@ def _do_resumable_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _do_upload(self, client, stream, content_type, size):
def _do_upload(self, client, stream, content_type, size, num_retries):
"""Determine an upload strategy and then perform the upload.
If the current blob has a ``chunk_size`` set, then a resumable upload
@@ -767,17 +793,21 @@ def _do_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: dict
:returns: The parsed JSON from the "200 OK" response. This will be the
**only** response in the multipart case and it will be the
**final** response in the resumable case.
"""
if self.chunk_size is None:
response = self._do_multipart_upload(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)
else:
response = self._do_resumable_upload(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)

return response.json()

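As the _do_upload docstring above notes, the blob's chunk_size alone decides
between the two strategies. A short sketch of steering that dispatch from
calling code, assuming the 256 KiB chunk-size quantum behind
_CHUNK_SIZE_MULTIPLE; the bucket and object names are again hypothetical:

    from google.cloud import storage

    blob = storage.Client().bucket('my-bucket').blob('big-file.bin')

    # chunk_size is the switch: None keeps the single multipart request;
    # any value (a multiple of 256 KiB) selects the resumable path.
    blob.chunk_size = 20 * 256 * 1024

    with open('big-file.bin', 'rb') as file_obj:
        blob.upload_from_file(file_obj)
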
@@ -831,7 +861,8 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
:param content_type: Optional type of content being uploaded.
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated.)
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:type client: :class:`~google.cloud.storage.client.Client`
:param client: (Optional) The client to use. If not passed, falls back
@@ -846,7 +877,7 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
_maybe_rewind(file_obj, rewind=rewind)
try:
created_json = self._do_upload(
client, file_obj, content_type, size)
client, file_obj, content_type, size, num_retries)
self._set_properties(created_json)
except resumable_media.InvalidResponse as exc:
_raise_from_invalid_response(exc)
@@ -1004,7 +1035,7 @@ def create_resumable_upload_session(
# to the `ResumableUpload` constructor. The chunk size only
# matters when **sending** bytes to an upload.
upload, _ = self._initiate_resumable_upload(
client, dummy_stream, content_type, size,
client, dummy_stream, content_type, size, None,
extra_headers=extra_headers,
chunk_size=self._CHUNK_SIZE_MULTIPLE)

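The _retry_strategy assignments above switch google-resumable-media from its
default time-based strategy to a count-based one. A small sketch of the two
configurations; the numeric values mirror the assertions in the unit tests
later in this diff:

    from google import resumable_media

    # Default: retry until ~600s of cumulative wait has accrued, sleeping
    # at most 64s between attempts, with no cap on the attempt count.
    default = resumable_media.RetryStrategy()
    assert default.max_cumulative_retry == 600.0
    assert default.max_retries is None

    # With num_retries: a fixed attempt count replaces the time budget.
    counted = resumable_media.RetryStrategy(max_retries=8)
    assert counted.max_cumulative_retry is None
    assert counted.max_retries == 8
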
2 changes: 1 addition & 1 deletion storage/setup.py
@@ -53,7 +53,7 @@
REQUIREMENTS = [
'google-cloud-core >= 0.24.1, < 0.25dev',
'google-auth >= 1.0.0',
'google-resumable-media >= 0.1.0',
'google-resumable-media >= 0.1.1',
'requests >= 2.0.0',
]

52 changes: 39 additions & 13 deletions storage/tests/unit/test_blob.py
@@ -762,7 +762,8 @@ def _mock_transport(self, status_code, headers, content=b''):
fake_transport.request.return_value = fake_response
return fake_transport

def _do_multipart_success(self, mock_get_boundary, size=None):
def _do_multipart_success(self, mock_get_boundary, size=None,
num_retries=None):
bucket = mock.Mock(path='/b/w00t', spec=[u'path'])
blob = self._make_one(u'blob-name', bucket=bucket)
self.assertIsNone(blob.chunk_size)
@@ -777,7 +778,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None):
stream = io.BytesIO(data)
content_type = u'application/xml'
response = blob._do_multipart_upload(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)

# Check the mocks and the returned value.
self.assertIs(response, fake_transport.request.return_value)
@@ -817,6 +818,11 @@ def test__do_multipart_upload_no_size(self, mock_get_boundary):
def test__do_multipart_upload_with_size(self, mock_get_boundary):
self._do_multipart_success(mock_get_boundary, size=10)

@mock.patch(u'google.resumable_media._upload.get_boundary',
return_value=b'==0==')
def test__do_multipart_upload_with_retry(self, mock_get_boundary):
self._do_multipart_success(mock_get_boundary, num_retries=8)

def test__do_multipart_upload_bad_size(self):
blob = self._make_one(u'blob-name', bucket=None)

@@ -826,15 +832,15 @@ def test__do_multipart_upload_bad_size(self):
self.assertGreater(size, len(data))

with self.assertRaises(ValueError) as exc_info:
blob._do_multipart_upload(None, stream, None, size)
blob._do_multipart_upload(None, stream, None, size, None)

exc_contents = str(exc_info.exception)
self.assertIn(
'was specified but the file-like object only had', exc_contents)
self.assertEqual(stream.tell(), len(data))

def _initiate_resumable_helper(self, size=None, extra_headers=None,
chunk_size=None):
chunk_size=None, num_retries=None):
from google.resumable_media.requests import ResumableUpload

bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -862,7 +868,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
stream = io.BytesIO(data)
content_type = u'text/plain'
upload, transport = blob._initiate_resumable_upload(
client, stream, content_type, size,
client, stream, content_type, size, num_retries,
extra_headers=extra_headers, chunk_size=chunk_size)

# Check the returned values.
@@ -890,6 +896,14 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
self.assertEqual(upload._total_bytes, size)
self.assertEqual(upload._content_type, content_type)
self.assertEqual(upload.resumable_url, resumable_url)
retry_strategy = upload._retry_strategy
self.assertEqual(retry_strategy.max_sleep, 64.0)
if num_retries is None:
self.assertEqual(retry_strategy.max_cumulative_retry, 600.0)
self.assertIsNone(retry_strategy.max_retries)
else:
self.assertIsNone(retry_strategy.max_cumulative_retry)
self.assertEqual(retry_strategy.max_retries, num_retries)
self.assertIs(transport, fake_transport)
# Make sure we never read from the stream.
self.assertEqual(stream.tell(), 0)
@@ -923,6 +937,9 @@ def test__initiate_resumable_upload_with_extra_headers(self):
extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'}
self._initiate_resumable_helper(extra_headers=extra_headers)

def test__initiate_resumable_upload_with_retry(self):
self._initiate_resumable_helper(num_retries=11)

def _make_resumable_transport(self, headers1, headers2,
headers3, total_bytes):
from google import resumable_media
@@ -990,7 +1007,7 @@ def _do_resumable_upload_call2(blob, content_type, data,
return mock.call(
'PUT', resumable_url, data=payload, headers=expected_headers)

def _do_resumable_helper(self, use_size=False):
def _do_resumable_helper(self, use_size=False, num_retries=None):
bucket = mock.Mock(path='/b/yesterday', spec=[u'path'])
blob = self._make_one(u'blob-name', bucket=bucket)
blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE
@@ -1017,7 +1034,7 @@ def _do_resumable_helper(self, use_size=False):
stream = io.BytesIO(data)
content_type = u'text/html'
response = blob._do_resumable_upload(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)

# Check the returned values.
self.assertIs(response, responses[2])
@@ -1039,7 +1056,10 @@ def test__do_resumable_upload_no_size(self):
def test__do_resumable_upload_with_size(self):
self._do_resumable_helper(use_size=True)

def _do_upload_helper(self, chunk_size=None):
def test__do_resumable_upload_with_retry(self):
self._do_resumable_helper(num_retries=6)

def _do_upload_helper(self, chunk_size=None, num_retries=None):
blob = self._make_one(u'blob-name', bucket=None)

# Create a fake response.
@@ -1061,17 +1081,18 @@ def _do_upload_helper(self, chunk_size=None):
size = 12345654321

# Make the request and check the mocks.
created_json = blob._do_upload(client, stream, content_type, size)
created_json = blob._do_upload(
client, stream, content_type, size, num_retries)
self.assertIs(created_json, mock.sentinel.json)
response.json.assert_called_once_with()
if chunk_size is None:
blob._do_multipart_upload.assert_called_once_with(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)
blob._do_resumable_upload.assert_not_called()
else:
blob._do_multipart_upload.assert_not_called()
blob._do_resumable_upload.assert_called_once_with(
client, stream, content_type, size)
client, stream, content_type, size, num_retries)

def test__do_upload_without_chunk_size(self):
self._do_upload_helper()
@@ -1080,6 +1101,9 @@ def test__do_upload_with_chunk_size(self):
chunk_size = 1024 * 1024 * 1024 # 1GB
self._do_upload_helper(chunk_size=chunk_size)

def test__do_upload_with_retry(self):
self._do_upload_helper(num_retries=20)

def _upload_from_file_helper(self, side_effect=None, **kwargs):
from google.cloud._helpers import UTC

@@ -1109,8 +1133,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs):
self.assertEqual(blob.updated, new_updated)

# Check the mock.
num_retries = kwargs.get('num_retries')
blob._do_upload.assert_called_once_with(
client, stream, content_type, len(data))
client, stream, content_type, len(data), num_retries)

return stream

@@ -1151,10 +1176,11 @@ def _do_upload_mock_call_helper(self, blob, client, content_type, size):
mock_call = blob._do_upload.mock_calls[0]
call_name, pos_args, kwargs = mock_call
self.assertEqual(call_name, '')
self.assertEqual(len(pos_args), 4)
self.assertEqual(len(pos_args), 5)
self.assertEqual(pos_args[0], client)
self.assertEqual(pos_args[2], content_type)
self.assertEqual(pos_args[3], size)
self.assertIsNone(pos_args[4]) # num_retries
self.assertEqual(kwargs, {})

return pos_args[1]
