Skip to content

Commit

Permalink
Move 'DEFAULT_RETRY' (w/ its predicate) to a new public 'retry' module. (#5552)
Browse files Browse the repository at this point in the history

Publish it, w/ its docstring, in the reference docs.
  • Loading branch information
tseaver authored Jun 28, 2018
1 parent baf0057 commit 3767584
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 108 deletions.
2 changes: 1 addition & 1 deletion bigquery/google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from pkg_resources import get_distribution
__version__ = get_distribution('google-cloud-bigquery').version

from google.cloud.bigquery._helpers import DEFAULT_RETRY
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.dataset import AccessEntry
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -57,6 +56,7 @@
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration
from google.cloud.bigquery.table import Table
Expand Down
26 changes: 0 additions & 26 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import datetime
import decimal

from google.api_core import retry
from google.cloud._helpers import UTC
from google.cloud._helpers import _date_from_iso8601_date
from google.cloud._helpers import _datetime_from_microseconds
Expand Down Expand Up @@ -330,31 +329,6 @@ def _snake_to_camel_case(value):
return words[0] + ''.join(map(str.capitalize, words[1:]))


def _should_retry(exc):
"""Predicate for determining when to retry.
We retry if and only if the 'reason' is 'backendError'
or 'rateLimitExceeded'.
"""
if not hasattr(exc, 'errors'):
return False
if len(exc.errors) == 0:
return False
reason = exc.errors[0]['reason']
return reason == 'backendError' or reason == 'rateLimitExceeded'


# Only the predicate is customized here; backoff and deadline come from
# the google.api_core.retry.Retry defaults.
DEFAULT_RETRY = retry.Retry(predicate=_should_retry)
"""The default retry object.
Any method with a ``retry`` parameter will be retried automatically,
with reasonable defaults. To disable retry, pass ``retry=None``.
To modify the default retry behavior, call a ``with_XXX`` method
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""


def _get_sub_prop(container, keys, default=None):
"""Get a nested value from a dictionary.
Expand Down
2 changes: 1 addition & 1 deletion bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._http import Connection
Expand All @@ -41,6 +40,7 @@
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import job
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
Expand Down
63 changes: 32 additions & 31 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,20 @@
import google.api_core.future.polling
from google.cloud import exceptions
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TimePartitioning
from google.cloud.bigquery import _helpers
from google.cloud.bigquery._helpers import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _int_or_none

_DONE_STATE = 'DONE'
_STOPPED_REASON = 'stopped'
Expand Down Expand Up @@ -379,7 +377,7 @@ def created(self):
if statistics is not None:
millis = statistics.get('creationTime')
if millis is not None:
return _datetime_from_microseconds(millis * 1000.0)
return _helpers._datetime_from_microseconds(millis * 1000.0)

@property
def started(self):
Expand All @@ -392,7 +390,7 @@ def started(self):
if statistics is not None:
millis = statistics.get('startTime')
if millis is not None:
return _datetime_from_microseconds(millis * 1000.0)
return _helpers._datetime_from_microseconds(millis * 1000.0)

@property
def ended(self):
Expand All @@ -405,7 +403,7 @@ def ended(self):
if statistics is not None:
millis = statistics.get('endTime')
if millis is not None:
return _datetime_from_microseconds(millis * 1000.0)
return _helpers._datetime_from_microseconds(millis * 1000.0)

def _job_statistics(self):
"""Helper for job-type specific statistics-based properties."""
Expand Down Expand Up @@ -923,7 +921,7 @@ def skip_leading_rows(self):
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.skipLeadingRows
"""
return _int_or_none(self._get_sub_prop('skipLeadingRows'))
return _helpers._int_or_none(self._get_sub_prop('skipLeadingRows'))

@skip_leading_rows.setter
def skip_leading_rows(self, value):
Expand Down Expand Up @@ -1876,7 +1874,7 @@ def maximum_bytes_billed(self):
See
https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.maximumBytesBilled
"""
return _int_or_none(self._get_sub_prop('maximumBytesBilled'))
return _helpers._int_or_none(self._get_sub_prop('maximumBytesBilled'))

@maximum_bytes_billed.setter
def maximum_bytes_billed(self, value):
Expand Down Expand Up @@ -2361,7 +2359,7 @@ def num_dml_affected_rows(self):
@property
def slot_millis(self):
"""Union[int, None]: Slot-milliseconds used by this query job."""
return _int_or_none(self._job_statistics().get('totalSlotMs'))
return _helpers._int_or_none(self._job_statistics().get('totalSlotMs'))

@property
def statement_type(self):
Expand Down Expand Up @@ -2606,50 +2604,51 @@ def start(self):
"""Union[Datetime, None]: Datetime when the stage started."""
if self._properties.get('startMs') is None:
return None
return _datetime_from_microseconds(
return _helpers._datetime_from_microseconds(
int(self._properties.get('startMs')) * 1000.0)

@property
def end(self):
"""Union[Datetime, None]: Datetime when the stage ended."""
if self._properties.get('endMs') is None:
return None
return _datetime_from_microseconds(
return _helpers._datetime_from_microseconds(
int(self._properties.get('endMs')) * 1000.0)

@property
def input_stages(self):
"""List(int): Entry IDs for stages that were inputs for this stage."""
if self._properties.get('inputStages') is None:
return []
return [_int_or_none(entry)
return [_helpers._int_or_none(entry)
for entry in self._properties.get('inputStages')]

@property
def parallel_inputs(self):
"""Union[int, None]: Number of parallel input segments within
the stage.
"""
return _int_or_none(self._properties.get('parallelInputs'))
return _helpers._int_or_none(self._properties.get('parallelInputs'))

@property
def completed_parallel_inputs(self):
"""Union[int, None]: Number of parallel input segments completed."""
return _int_or_none(self._properties.get('completedParallelInputs'))
return _helpers._int_or_none(
self._properties.get('completedParallelInputs'))

@property
def wait_ms_avg(self):
"""Union[int, None]: Milliseconds the average worker spent waiting to
be scheduled.
"""
return _int_or_none(self._properties.get('waitMsAvg'))
return _helpers._int_or_none(self._properties.get('waitMsAvg'))

@property
def wait_ms_max(self):
"""Union[int, None]: Milliseconds the slowest worker spent waiting to
be scheduled.
"""
return _int_or_none(self._properties.get('waitMsMax'))
return _helpers._int_or_none(self._properties.get('waitMsMax'))

@property
def wait_ratio_avg(self):
Expand All @@ -2672,14 +2671,14 @@ def read_ms_avg(self):
"""Union[int, None]: Milliseconds the average worker spent reading
input.
"""
return _int_or_none(self._properties.get('readMsAvg'))
return _helpers._int_or_none(self._properties.get('readMsAvg'))

@property
def read_ms_max(self):
"""Union[int, None]: Milliseconds the slowest worker spent reading
input.
"""
return _int_or_none(self._properties.get('readMsMax'))
return _helpers._int_or_none(self._properties.get('readMsMax'))

@property
def read_ratio_avg(self):
Expand All @@ -2702,14 +2701,14 @@ def compute_ms_avg(self):
"""Union[int, None]: Milliseconds the average worker spent on CPU-bound
processing.
"""
return _int_or_none(self._properties.get('computeMsAvg'))
return _helpers._int_or_none(self._properties.get('computeMsAvg'))

@property
def compute_ms_max(self):
"""Union[int, None]: Milliseconds the slowest worker spent on CPU-bound
processing.
"""
return _int_or_none(self._properties.get('computeMsMax'))
return _helpers._int_or_none(self._properties.get('computeMsMax'))

@property
def compute_ratio_avg(self):
Expand All @@ -2732,14 +2731,14 @@ def write_ms_avg(self):
"""Union[int, None]: Milliseconds the average worker spent writing
output data.
"""
return _int_or_none(self._properties.get('writeMsAvg'))
return _helpers._int_or_none(self._properties.get('writeMsAvg'))

@property
def write_ms_max(self):
"""Union[int, None]: Milliseconds the slowest worker spent writing
output data.
"""
return _int_or_none(self._properties.get('writeMsMax'))
return _helpers._int_or_none(self._properties.get('writeMsMax'))

@property
def write_ratio_avg(self):
Expand All @@ -2760,12 +2759,12 @@ def write_ratio_max(self):
@property
def records_read(self):
"""Union[int, None]: Number of records read by this stage."""
return _int_or_none(self._properties.get('recordsRead'))
return _helpers._int_or_none(self._properties.get('recordsRead'))

@property
def records_written(self):
"""Union[int, None]: Number of records written by this stage."""
return _int_or_none(self._properties.get('recordsWritten'))
return _helpers._int_or_none(self._properties.get('recordsWritten'))

@property
def status(self):
Expand All @@ -2777,14 +2776,16 @@ def shuffle_output_bytes(self):
"""Union[int, None]: Number of bytes written by this stage to
intermediate shuffle.
"""
return _int_or_none(self._properties.get('shuffleOutputBytes'))
return _helpers._int_or_none(
self._properties.get('shuffleOutputBytes'))

@property
def shuffle_output_bytes_spilled(self):
"""Union[int, None]: Number of bytes written by this stage to
intermediate shuffle and spilled to disk.
"""
return _int_or_none(self._properties.get('shuffleOutputBytesSpilled'))
return _helpers._int_or_none(
self._properties.get('shuffleOutputBytesSpilled'))

@property
def steps(self):
Expand Down Expand Up @@ -2828,31 +2829,31 @@ def from_api_repr(cls, resource):
def elapsed_ms(self):
"""Union[int, None]: Milliseconds elapsed since start of query
execution."""
return _int_or_none(self._properties.get('elapsedMs'))
return _helpers._int_or_none(self._properties.get('elapsedMs'))

@property
def active_units(self):
"""Union[int, None]: Current number of input units being processed
by workers, reported as largest value since the last sample."""
return _int_or_none(self._properties.get('activeUnits'))
return _helpers._int_or_none(self._properties.get('activeUnits'))

@property
def pending_units(self):
"""Union[int, None]: Current number of input units remaining for
query stages active at this sample time."""
return _int_or_none(self._properties.get('pendingUnits'))
return _helpers._int_or_none(self._properties.get('pendingUnits'))

@property
def completed_units(self):
"""Union[int, None]: Current number of input units completed by
this query."""
return _int_or_none(self._properties.get('completedUnits'))
return _helpers._int_or_none(self._properties.get('completedUnits'))

@property
def slot_millis(self):
"""Union[int, None]: Cumulative slot-milliseconds consumed by
this query."""
return _int_or_none(self._properties.get('totalSlotMs'))
return _helpers._int_or_none(self._properties.get('totalSlotMs'))


class UnknownJob(_AsyncJob):
Expand Down
41 changes: 41 additions & 0 deletions bigquery/google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.api_core import retry


def _should_retry(exc):
"""Predicate for determining when to retry.
We retry if and only if the 'reason' is 'backendError'
or 'rateLimitExceeded'.
"""
if not hasattr(exc, 'errors'):
return False
if len(exc.errors) == 0:
return False
reason = exc.errors[0]['reason']
return reason == 'backendError' or reason == 'rateLimitExceeded'


# Only the predicate is customized here; backoff and deadline come from
# the google.api_core.retry.Retry defaults.
DEFAULT_RETRY = retry.Retry(predicate=_should_retry)
"""The default retry object.
Any method with a ``retry`` parameter will be retried automatically,
with reasonable defaults. To disable retry, pass ``retry=None``.
To modify the default retry behavior, call a ``with_XXX`` method
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""
Loading

0 comments on commit 3767584

Please sign in to comment.