Skip to content

Commit

Permalink
[gcloud] Retry on connection/read timeout
Browse files Browse the repository at this point in the history
Revert this comment after a version of `google-cloud-storage` with
googleapis/python-storage#727 gets released and
ScanForm is updated to use it.
  • Loading branch information
mlazowik committed Mar 10, 2022
1 parent 4336884 commit b8fca56
Showing 1 changed file with 41 additions and 9 deletions.
50 changes: 41 additions & 9 deletions storages/backends/gcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
)

try:
from google.api_core import exceptions as api_exceptions
from google.api_core import retry
from google.auth import exceptions as auth_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage import Blob, Client
from google.cloud.storage.blob import _quote
from google.cloud.storage.retry import (
DEFAULT_RETRY, DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
_RETRYABLE_TYPES, ConditionalRetryPolicy, is_generation_specified,
_ADDITIONAL_RETRYABLE_STATUS_CODES,
)
import requests.exceptions
except ImportError:
raise ImproperlyConfigured("Could not load Google Cloud Storage bindings.\n"
"See https://github.com/GoogleCloudPlatform/gcloud-python")
Expand All @@ -30,14 +35,40 @@
CONTENT_ENCODING = 'content_encoding'
CONTENT_TYPE = 'content_type'

# TODO(m1): revert the commit that introduced this after a version of
# `google-cloud-storage` with https://github.com/googleapis/python-storage/pull/727
# gets released and ScanForm is updated to use it.
CUSTOM_RETRYABLE_TYPES = _RETRYABLE_TYPES + (
requests.exceptions.Timeout,
)


def _should_retry(exc):
"""Predicate for determining when to retry."""
if isinstance(exc, CUSTOM_RETRYABLE_TYPES):
return True
elif isinstance(exc, api_exceptions.GoogleAPICallError):
return exc.code in _ADDITIONAL_RETRYABLE_STATUS_CODES
elif isinstance(exc, auth_exceptions.TransportError):
return _should_retry(exc.args[0])
else:
return False


RETRY = retry.Retry(predicate=_should_retry)

RETRY_IF_GENERATION_SPECIFIED = ConditionalRetryPolicy(
RETRY, is_generation_specified, ["query_params"]
)


class GoogleCloudFile(CompressedFileMixin, File):
def __init__(self, name, mode, storage):
self.name = name
self.mime_type = mimetypes.guess_type(name)[0]
self._mode = mode
self._storage = storage
self.blob = storage.bucket.get_blob(name, timeout=storage.timeout)
self.blob = storage.bucket.get_blob(name, timeout=storage.timeout, retry=RETRY)
if not self.blob and 'w' in mode:
self.blob = Blob(
self.name, storage.bucket,
Expand All @@ -58,7 +89,8 @@ def _get_file(self):
)
if 'r' in self._mode:
self._is_dirty = False
self.blob.download_to_file(self._file, timeout=self._storage.timeout)
self.blob.download_to_file(self._file, timeout=self._storage.timeout,
retry=RETRY)
self._file.seek(0)
if self._storage.gzip and self.blob.content_encoding == 'gzip':
self._file = self._decompress_file(mode=self._mode, file=self._file)
Expand Down Expand Up @@ -140,9 +172,9 @@ def get_default_settings(self):
@property
def retry_if_generation_specified_or_immutable(self):
if self.all_files_immutable:
return DEFAULT_RETRY
return RETRY

return DEFAULT_RETRY_IF_GENERATION_SPECIFIED
return RETRY_IF_GENERATION_SPECIFIED

@property
def client(self):
Expand Down Expand Up @@ -242,13 +274,13 @@ def delete(self, name):
def exists(self, name):
if not name: # root element aka the bucket
try:
self.client.get_bucket(self.bucket, timeout=self.timeout)
self.client.get_bucket(self.bucket, timeout=self.timeout, retry=RETRY)
return True
except NotFound:
return False

name = self._normalize_name(clean_name(name))
return bool(self.bucket.get_blob(name, timeout=self.timeout))
return bool(self.bucket.get_blob(name, timeout=self.timeout, retry=RETRY))

def listdir(self, name):
name = self._normalize_name(clean_name(name))
Expand All @@ -258,7 +290,7 @@ def listdir(self, name):
name += '/'

iterator = self.bucket.list_blobs(prefix=name, delimiter='/',
timeout=self.timeout)
timeout=self.timeout, retry=RETRY)
blobs = list(iterator)
prefixes = iterator.prefixes

Expand All @@ -276,7 +308,7 @@ def listdir(self, name):

def _get_blob(self, name):
# Wrap google.cloud.storage's blob to raise if the file doesn't exist
blob = self.bucket.get_blob(name, timeout=self.timeout)
blob = self.bucket.get_blob(name, timeout=self.timeout, retry=RETRY)

if blob is None:
raise NotFound('File does not exist: {}'.format(name))
Expand Down

0 comments on commit b8fca56

Please sign in to comment.