From 37675843fb59c109591b725b4dd66e1498c534f3 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 28 Jun 2018 17:35:24 -0400 Subject: [PATCH] Move 'DEFAULT_RETRY' (w/ its predicate) to a new public 'retry' module. (#5552) Publish it, w/ its docstring, in the reference docs. --- bigquery/google/cloud/bigquery/__init__.py | 2 +- bigquery/google/cloud/bigquery/_helpers.py | 26 --------- bigquery/google/cloud/bigquery/client.py | 2 +- bigquery/google/cloud/bigquery/job.py | 63 +++++++++++----------- bigquery/google/cloud/bigquery/retry.py | 41 ++++++++++++++ bigquery/tests/unit/test__helpers.py | 32 ----------- bigquery/tests/unit/test_job.py | 17 ------ bigquery/tests/unit/test_retry.py | 48 +++++++++++++++++ docs/bigquery/reference.rst | 9 ++++ 9 files changed, 132 insertions(+), 108 deletions(-) create mode 100644 bigquery/google/cloud/bigquery/retry.py create mode 100644 bigquery/tests/unit/test_retry.py diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index 751efd5a671b..b5cc93b18974 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -31,7 +31,6 @@ from pkg_resources import get_distribution __version__ = get_distribution('google-cloud-bigquery').version -from google.cloud.bigquery._helpers import DEFAULT_RETRY from google.cloud.bigquery.client import Client from google.cloud.bigquery.dataset import AccessEntry from google.cloud.bigquery.dataset import Dataset @@ -57,6 +56,7 @@ from google.cloud.bigquery.query import ScalarQueryParameter from google.cloud.bigquery.query import StructQueryParameter from google.cloud.bigquery.query import UDFResource +from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import EncryptionConfiguration from google.cloud.bigquery.table import Table diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 41474458fdb5..6ef89e14e93f 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -18,7 +18,6 @@ import datetime import decimal -from google.api_core import retry from google.cloud._helpers import UTC from google.cloud._helpers import _date_from_iso8601_date from google.cloud._helpers import _datetime_from_microseconds @@ -330,31 +329,6 @@ def _snake_to_camel_case(value): return words[0] + ''.join(map(str.capitalize, words[1:])) -def _should_retry(exc): - """Predicate for determining when to retry. - - We retry if and only if the 'reason' is 'backendError' - or 'rateLimitExceeded'. - """ - if not hasattr(exc, 'errors'): - return False - if len(exc.errors) == 0: - return False - reason = exc.errors[0]['reason'] - return reason == 'backendError' or reason == 'rateLimitExceeded' - - -DEFAULT_RETRY = retry.Retry(predicate=_should_retry) -"""The default retry object. - -Any method with a ``retry`` parameter will be retried automatically, -with reasonable defaults. To disable retry, pass ``retry=None``. -To modify the default retry behavior, call a ``with_XXX`` method -on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, -pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. -""" - - def _get_sub_prop(container, keys, default=None): """Get a nested value from a dictionary. diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index b5ef0bc4e7f2..f60edd129a17 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -32,7 +32,6 @@ from google.cloud import exceptions from google.cloud.client import ClientWithProject -from google.cloud.bigquery._helpers import DEFAULT_RETRY from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW from google.cloud.bigquery._helpers import _str_or_none from google.cloud.bigquery._http import Connection @@ -41,6 +40,7 @@ from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery import job from google.cloud.bigquery.query import _QueryResults +from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import TableListItem from google.cloud.bigquery.table import TableReference diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 63ddfec3fb16..7898c9b65d6c 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -22,7 +22,6 @@ import google.api_core.future.polling from google.cloud import exceptions from google.cloud.exceptions import NotFound -from google.cloud._helpers import _datetime_from_microseconds from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.query import _query_param_from_api_repr @@ -30,14 +29,13 @@ from google.cloud.bigquery.query import ScalarQueryParameter from google.cloud.bigquery.query import StructQueryParameter from google.cloud.bigquery.query import UDFResource +from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import EncryptionConfiguration from google.cloud.bigquery.table import TableReference from google.cloud.bigquery.table import Table from google.cloud.bigquery.table import TimePartitioning from google.cloud.bigquery import _helpers -from google.cloud.bigquery._helpers import DEFAULT_RETRY -from google.cloud.bigquery._helpers import _int_or_none _DONE_STATE = 'DONE' _STOPPED_REASON = 'stopped' @@ -379,7 +377,7 @@ def created(self): if statistics is not None: millis = statistics.get('creationTime') if millis is not None: - return _datetime_from_microseconds(millis * 1000.0) + return _helpers._datetime_from_microseconds(millis * 1000.0) @property def started(self): @@ -392,7 +390,7 @@ def started(self): if statistics is not None: millis = statistics.get('startTime') if millis is not None: - return _datetime_from_microseconds(millis * 1000.0) + return _helpers._datetime_from_microseconds(millis * 1000.0) @property def ended(self): @@ -405,7 +403,7 @@ def ended(self): if statistics is not None: millis = statistics.get('endTime') if millis is not None: - return _datetime_from_microseconds(millis * 1000.0) + return _helpers._datetime_from_microseconds(millis * 1000.0) def _job_statistics(self): """Helper for job-type specific statistics-based properties.""" @@ -923,7 +921,7 @@ def skip_leading_rows(self): See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.skipLeadingRows """ - return _int_or_none(self._get_sub_prop('skipLeadingRows')) + return _helpers._int_or_none(self._get_sub_prop('skipLeadingRows')) @skip_leading_rows.setter def skip_leading_rows(self, value): @@ -1876,7 +1874,7 @@ def maximum_bytes_billed(self): See https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.maximumBytesBilled """ - return _int_or_none(self._get_sub_prop('maximumBytesBilled')) + return _helpers._int_or_none(self._get_sub_prop('maximumBytesBilled')) @maximum_bytes_billed.setter def maximum_bytes_billed(self, value): @@ -2361,7 +2359,7 @@ def num_dml_affected_rows(self): @property def slot_millis(self): """Union[int, None]: Slot-milliseconds used by this query job.""" - return _int_or_none(self._job_statistics().get('totalSlotMs')) + return _helpers._int_or_none(self._job_statistics().get('totalSlotMs')) @property def statement_type(self): @@ -2606,7 +2604,7 @@ def start(self): """Union[Datetime, None]: Datetime when the stage started.""" if self._properties.get('startMs') is None: return None - return _datetime_from_microseconds( + return _helpers._datetime_from_microseconds( int(self._properties.get('startMs')) * 1000.0) @property @@ -2614,7 +2612,7 @@ def end(self): """Union[Datetime, None]: Datetime when the stage ended.""" if self._properties.get('endMs') is None: return None - return _datetime_from_microseconds( + return _helpers._datetime_from_microseconds( int(self._properties.get('endMs')) * 1000.0) @property @@ -2622,7 +2620,7 @@ def input_stages(self): """List(int): Entry IDs for stages that were inputs for this stage.""" if self._properties.get('inputStages') is None: return [] - return [_int_or_none(entry) + return [_helpers._int_or_none(entry) for entry in self._properties.get('inputStages')] @property @@ -2630,26 +2628,27 @@ def parallel_inputs(self): """Union[int, None]: Number of parallel input segments within the stage. """ - return _int_or_none(self._properties.get('parallelInputs')) + return _helpers._int_or_none(self._properties.get('parallelInputs')) @property def completed_parallel_inputs(self): """Union[int, None]: Number of parallel input segments completed.""" - return _int_or_none(self._properties.get('completedParallelInputs')) + return _helpers._int_or_none( + self._properties.get('completedParallelInputs')) @property def wait_ms_avg(self): """Union[int, None]: Milliseconds the average worker spent waiting to be scheduled. """ - return _int_or_none(self._properties.get('waitMsAvg')) + return _helpers._int_or_none(self._properties.get('waitMsAvg')) @property def wait_ms_max(self): """Union[int, None]: Milliseconds the slowest worker spent waiting to be scheduled. """ - return _int_or_none(self._properties.get('waitMsMax')) + return _helpers._int_or_none(self._properties.get('waitMsMax')) @property def wait_ratio_avg(self): @@ -2672,14 +2671,14 @@ def read_ms_avg(self): """Union[int, None]: Milliseconds the average worker spent reading input. """ - return _int_or_none(self._properties.get('readMsAvg')) + return _helpers._int_or_none(self._properties.get('readMsAvg')) @property def read_ms_max(self): """Union[int, None]: Milliseconds the slowest worker spent reading input. """ - return _int_or_none(self._properties.get('readMsMax')) + return _helpers._int_or_none(self._properties.get('readMsMax')) @property def read_ratio_avg(self): @@ -2702,14 +2701,14 @@ def compute_ms_avg(self): """Union[int, None]: Milliseconds the average worker spent on CPU-bound processing. """ - return _int_or_none(self._properties.get('computeMsAvg')) + return _helpers._int_or_none(self._properties.get('computeMsAvg')) @property def compute_ms_max(self): """Union[int, None]: Milliseconds the slowest worker spent on CPU-bound processing. """ - return _int_or_none(self._properties.get('computeMsMax')) + return _helpers._int_or_none(self._properties.get('computeMsMax')) @property def compute_ratio_avg(self): @@ -2732,14 +2731,14 @@ def write_ms_avg(self): """Union[int, None]: Milliseconds the average worker spent writing output data. """ - return _int_or_none(self._properties.get('writeMsAvg')) + return _helpers._int_or_none(self._properties.get('writeMsAvg')) @property def write_ms_max(self): """Union[int, None]: Milliseconds the slowest worker spent writing output data. """ - return _int_or_none(self._properties.get('writeMsMax')) + return _helpers._int_or_none(self._properties.get('writeMsMax')) @property def write_ratio_avg(self): @@ -2760,12 +2759,12 @@ def write_ratio_max(self): @property def records_read(self): """Union[int, None]: Number of records read by this stage.""" - return _int_or_none(self._properties.get('recordsRead')) + return _helpers._int_or_none(self._properties.get('recordsRead')) @property def records_written(self): """Union[int, None]: Number of records written by this stage.""" - return _int_or_none(self._properties.get('recordsWritten')) + return _helpers._int_or_none(self._properties.get('recordsWritten')) @property def status(self): @@ -2777,14 +2776,16 @@ def shuffle_output_bytes(self): """Union[int, None]: Number of bytes written by this stage to intermediate shuffle. """ - return _int_or_none(self._properties.get('shuffleOutputBytes')) + return _helpers._int_or_none( + self._properties.get('shuffleOutputBytes')) @property def shuffle_output_bytes_spilled(self): """Union[int, None]: Number of bytes written by this stage to intermediate shuffle and spilled to disk. """ - return _int_or_none(self._properties.get('shuffleOutputBytesSpilled')) + return _helpers._int_or_none( + self._properties.get('shuffleOutputBytesSpilled')) @property def steps(self): @@ -2828,31 +2829,31 @@ def from_api_repr(cls, resource): def elapsed_ms(self): """Union[int, None]: Milliseconds elapsed since start of query execution.""" - return _int_or_none(self._properties.get('elapsedMs')) + return _helpers._int_or_none(self._properties.get('elapsedMs')) @property def active_units(self): """Union[int, None]: Current number of input units being processed by workers, reported as largest value since the last sample.""" - return _int_or_none(self._properties.get('activeUnits')) + return _helpers._int_or_none(self._properties.get('activeUnits')) @property def pending_units(self): """Union[int, None]: Current number of input units remaining for query stages active at this sample time.""" - return _int_or_none(self._properties.get('pendingUnits')) + return _helpers._int_or_none(self._properties.get('pendingUnits')) @property def completed_units(self): """Union[int, None]: Current number of input units completed by this query.""" - return _int_or_none(self._properties.get('completedUnits')) + return _helpers._int_or_none(self._properties.get('completedUnits')) @property def slot_millis(self): """Union[int, None]: Cumulative slot-milliseconds consumed by this query.""" - return _int_or_none(self._properties.get('totalSlotMs')) + return _helpers._int_or_none(self._properties.get('totalSlotMs')) class UnknownJob(_AsyncJob): diff --git a/bigquery/google/cloud/bigquery/retry.py b/bigquery/google/cloud/bigquery/retry.py new file mode 100644 index 000000000000..1e1ef57f116c --- /dev/null +++ b/bigquery/google/cloud/bigquery/retry.py @@ -0,0 +1,41 @@ + +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.api_core import retry + + +def _should_retry(exc): + """Predicate for determining when to retry. + + We retry if and only if the 'reason' is 'backendError' + or 'rateLimitExceeded'. + """ + if not hasattr(exc, 'errors'): + return False + if len(exc.errors) == 0: + return False + reason = exc.errors[0]['reason'] + return reason == 'backendError' or reason == 'rateLimitExceeded' + + +DEFAULT_RETRY = retry.Retry(predicate=_should_retry) +"""The default retry object. + +Any method with a ``retry`` parameter will be retried automatically, +with reasonable defaults. To disable retry, pass ``retry=None``. +To modify the default retry behavior, call a ``with_XXX`` method +on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, +pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. +""" diff --git a/bigquery/tests/unit/test__helpers.py b/bigquery/tests/unit/test__helpers.py index b53ac85354fb..b70b81b1bea7 100644 --- a/bigquery/tests/unit/test__helpers.py +++ b/bigquery/tests/unit/test__helpers.py @@ -17,8 +17,6 @@ import decimal import unittest -import mock - class Test_not_null(unittest.TestCase): @@ -809,36 +807,6 @@ def test_w_camel_case_string(self): self.assertEqual(self._call_fut('friendlyName'), 'friendlyName') -class Test_should_retry(unittest.TestCase): - - def _call_fut(self, exc): - from google.cloud.bigquery._helpers import _should_retry - - return _should_retry(exc) - - def test_wo_errors_attribute(self): - self.assertFalse(self._call_fut(object())) - - def test_w_empty_errors(self): - exc = mock.Mock(errors=[], spec=['errors']) - self.assertFalse(self._call_fut(exc)) - - def test_w_non_matching_reason(self): - exc = mock.Mock( - errors=[{'reason': 'bogus'}], spec=['errors']) - self.assertFalse(self._call_fut(exc)) - - def test_w_backendError(self): - exc = mock.Mock( - errors=[{'reason': 'backendError'}], spec=['errors']) - self.assertTrue(self._call_fut(exc)) - - def test_w_rateLimitExceeded(self): - exc = mock.Mock( - errors=[{'reason': 'rateLimitExceeded'}], spec=['errors']) - self.assertTrue(self._call_fut(exc)) - - class Test__get_sub_prop(unittest.TestCase): def _call_fut(self, container, keys, **kw): diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index a8d0ed96299f..594c605b505d 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -57,23 +57,6 @@ def _make_connection(*responses): return mock_conn -class Test__int_or_none(unittest.TestCase): - - def _call_fut(self, *args, **kwargs): - from google.cloud.bigquery import job - - return job._int_or_none(*args, **kwargs) - - def test_w_int(self): - self.assertEqual(self._call_fut(13), 13) - - def test_w_none(self): - self.assertIsNone(self._call_fut(None)) - - def test_w_str(self): - self.assertEqual(self._call_fut('13'), 13) - - class Test__error_result_to_exception(unittest.TestCase): def _call_fut(self, *args, **kwargs): diff --git a/bigquery/tests/unit/test_retry.py b/bigquery/tests/unit/test_retry.py new file mode 100644 index 000000000000..b94c26a3b990 --- /dev/null +++ b/bigquery/tests/unit/test_retry.py @@ -0,0 +1,48 @@ + +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import mock + + +class Test_should_retry(unittest.TestCase): + + def _call_fut(self, exc): + from google.cloud.bigquery.retry import _should_retry + + return _should_retry(exc) + + def test_wo_errors_attribute(self): + self.assertFalse(self._call_fut(object())) + + def test_w_empty_errors(self): + exc = mock.Mock(errors=[], spec=['errors']) + self.assertFalse(self._call_fut(exc)) + + def test_w_non_matching_reason(self): + exc = mock.Mock( + errors=[{'reason': 'bogus'}], spec=['errors']) + self.assertFalse(self._call_fut(exc)) + + def test_w_backendError(self): + exc = mock.Mock( + errors=[{'reason': 'backendError'}], spec=['errors']) + self.assertTrue(self._call_fut(exc)) + + def test_w_rateLimitExceeded(self): + exc = mock.Mock( + errors=[{'reason': 'rateLimitExceeded'}], spec=['errors']) + self.assertTrue(self._call_fut(exc)) diff --git a/docs/bigquery/reference.rst b/docs/bigquery/reference.rst index 794419272871..ca2d8c6f8a6b 100644 --- a/docs/bigquery/reference.rst +++ b/docs/bigquery/reference.rst @@ -114,6 +114,15 @@ Query query.UDFResource +Retries +======= + +.. autosummary:: + :toctree: generated + + retry.DEFAULT_RETRY + + External Configuration ======================