diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py index 2682ca8ddb6df..545e0cde265a0 100644 --- a/bigquery/google/cloud/bigquery/__init__.py +++ b/bigquery/google/cloud/bigquery/__init__.py @@ -27,6 +27,7 @@ __version__ = get_distribution('google-cloud-bigquery').version from google.cloud.bigquery._helpers import Row +from google.cloud.bigquery._helpers import DEFAULT_RETRY from google.cloud.bigquery.client import Client from google.cloud.bigquery.dataset import AccessEntry from google.cloud.bigquery.dataset import Dataset @@ -61,4 +62,5 @@ 'Table', 'TableReference', 'UDFResource', + 'DEFAULT_RETRY', ] diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index 0ee7a9c01c6a2..d4230f9ff4f60 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -20,6 +20,7 @@ import six +from google.api.core import retry from google.cloud._helpers import UTC from google.cloud._helpers import _date_from_iso8601_date from google.cloud._helpers import _datetime_from_microseconds @@ -520,3 +521,28 @@ def _rows_page_start(iterator, page, response): total_rows = int(total_rows) iterator.total_rows = total_rows # pylint: enable=unused-argument + + +def _should_retry(exc): + """Predicate for determining when to retry. + + We retry if and only if the 'reason' is 'backendError' + or 'rateLimitExceeded'. + """ + if not hasattr(exc, 'errors'): + return False + if len(exc.errors) == 0: + return False + reason = exc.errors[0]['reason'] + return reason == 'backendError' or reason == 'rateLimitExceeded' + + +DEFAULT_RETRY = retry.Retry(predicate=_should_retry) +"""The default retry object. + +Any method with a ``retry`` parameter will be retried automatically, +with reasonable defaults. To disable retry, pass ``retry=None``. +To modify the default retry behavior, call a ``with_XXX`` method +on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, +pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. +""" diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 488b409ff77cf..a93b9fb70f3c0 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -17,6 +17,7 @@ from __future__ import absolute_import import collections +import functools import os import uuid @@ -27,6 +28,7 @@ from google.resumable_media.requests import ResumableUpload from google.api.core import page_iterator + from google.cloud import exceptions from google.cloud.client import ClientWithProject from google.cloud.bigquery._http import Connection @@ -44,6 +46,7 @@ from google.cloud.bigquery._helpers import _rows_page_start from google.cloud.bigquery._helpers import _field_to_index_mapping from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW +from google.cloud.bigquery._helpers import DEFAULT_RETRY _DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB @@ -117,7 +120,8 @@ def __init__(self, project=None, credentials=None, _http=None): project=project, credentials=credentials, _http=_http) self._connection = Connection(self) - def list_projects(self, max_results=None, page_token=None): + def list_projects(self, max_results=None, page_token=None, + retry=DEFAULT_RETRY): """List projects for the project associated with this client. See @@ -132,13 +136,16 @@ def list_projects(self, max_results=None, page_token=None): not passed, the API will return the first page of projects. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterator of :class:`~google.cloud.bigquery.client.Project` accessible to the current client. """ return page_iterator.HTTPIterator( client=self, - api_request=self._connection.api_request, + api_request=functools.partial(self._call_api, retry), path='/projects', item_to_value=_item_to_project, items_key='projects', @@ -146,7 +153,7 @@ def list_projects(self, max_results=None, page_token=None): max_results=max_results) def list_datasets(self, include_all=False, max_results=None, - page_token=None): + page_token=None, retry=DEFAULT_RETRY): """List datasets for the project associated with this client. See @@ -164,6 +171,9 @@ def list_datasets(self, include_all=False, max_results=None, not passed, the API will return the first page of datasets. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterator of :class:`~google.cloud.bigquery.dataset.Dataset`. accessible to the current client. @@ -174,7 +184,7 @@ def list_datasets(self, include_all=False, max_results=None, path = '/projects/%s/datasets' % (self.project,) return page_iterator.HTTPIterator( client=self, - api_request=self._connection.api_request, + api_request=functools.partial(self._call_api, retry), path=path, item_to_value=_item_to_dataset, items_key='datasets', @@ -241,35 +251,47 @@ def create_table(self, table): method='POST', path=path, data=resource) return Table.from_api_repr(api_response) - def get_dataset(self, dataset_ref): + def _call_api(self, retry, **kwargs): + call = functools.partial(self._connection.api_request, **kwargs) + if retry: + call = retry(call) + return call() + + def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): """Fetch the dataset referenced by ``dataset_ref`` :type dataset_ref: :class:`google.cloud.bigquery.dataset.DatasetReference` :param dataset_ref: the dataset to use. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.dataset.Dataset` :returns: a ``Dataset`` instance """ - api_response = self._connection.api_request( - method='GET', path=dataset_ref.path) + api_response = self._call_api(retry, + method='GET', + path=dataset_ref.path) return Dataset.from_api_repr(api_response) - def get_table(self, table_ref): + def get_table(self, table_ref, retry=DEFAULT_RETRY): """Fetch the table referenced by ``table_ref`` :type table_ref: :class:`google.cloud.bigquery.table.TableReference` :param table_ref: the table to use. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.table.Table` :returns: a ``Table`` instance """ - api_response = self._connection.api_request( - method='GET', path=table_ref.path) + api_response = self._call_api(retry, method='GET', path=table_ref.path) return Table.from_api_repr(api_response) - def update_dataset(self, dataset, fields): + def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): """Change some fields of a dataset. Use ``fields`` to specify which fields to update. At least one field @@ -290,6 +312,9 @@ def update_dataset(self, dataset, fields): :param fields: the fields of ``dataset`` to change, spelled as the Dataset properties (e.g. "friendly_name"). + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.dataset.Dataset` :returns: the modified ``Dataset`` instance """ @@ -307,11 +332,11 @@ def update_dataset(self, dataset, fields): headers = {'If-Match': dataset.etag} else: headers = None - api_response = self._connection.api_request( - method='PATCH', path=path, data=partial, headers=headers) + api_response = self._call_api( + retry, method='PATCH', path=path, data=partial, headers=headers) return Dataset.from_api_repr(api_response) - def update_table(self, table, properties): + def update_table(self, table, properties, retry=DEFAULT_RETRY): """API call: update table properties via a PUT request See @@ -321,6 +346,9 @@ def update_table(self, table, properties): :class:`google.cloud.bigquery.table.Table` :param table_ref: the table to update. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.table.Table` :returns: a ``Table`` instance """ @@ -329,11 +357,13 @@ def update_table(self, table, properties): headers = {'If-Match': table.etag} else: headers = None - api_response = self._connection.api_request( + api_response = self._call_api( + retry, method='PATCH', path=table.path, data=partial, headers=headers) return Table.from_api_repr(api_response) - def list_dataset_tables(self, dataset, max_results=None, page_token=None): + def list_dataset_tables(self, dataset, max_results=None, page_token=None, + retry=DEFAULT_RETRY): """List tables in the dataset. See @@ -353,6 +383,9 @@ def list_dataset_tables(self, dataset, max_results=None, page_token=None): datasets. If not passed, the API will return the first page of datasets. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterator of :class:`~google.cloud.bigquery.table.Table` contained within the current dataset. @@ -362,7 +395,7 @@ def list_dataset_tables(self, dataset, max_results=None, page_token=None): path = '%s/tables' % dataset.path result = page_iterator.HTTPIterator( client=self, - api_request=self._connection.api_request, + api_request=functools.partial(self._call_api, retry), path=path, item_to_value=_item_to_table, items_key='tables', @@ -371,7 +404,7 @@ def list_dataset_tables(self, dataset, max_results=None, page_token=None): result.dataset = dataset return result - def delete_dataset(self, dataset): + def delete_dataset(self, dataset, retry=DEFAULT_RETRY): """Delete a dataset. See @@ -381,13 +414,16 @@ def delete_dataset(self, dataset): :class:`~google.cloud.bigquery.dataset.Dataset` :class:`~google.cloud.bigquery.dataset.DatasetReference` + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :param dataset: the dataset to delete, or a reference to it. """ if not isinstance(dataset, (Dataset, DatasetReference)): raise TypeError('dataset must be a Dataset or a DatasetReference') - self._connection.api_request(method='DELETE', path=dataset.path) + self._call_api(retry, method='DELETE', path=dataset.path) - def delete_table(self, table): + def delete_table(self, table, retry=DEFAULT_RETRY): """Delete a table See @@ -397,17 +433,23 @@ def delete_table(self, table): :class:`~google.cloud.bigquery.table.Table` :class:`~google.cloud.bigquery.table.TableReference` :param table: the table to delete, or a reference to it. + + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. """ if not isinstance(table, (Table, TableReference)): raise TypeError('table must be a Table or a TableReference') - self._connection.api_request(method='DELETE', path=table.path) + self._call_api(retry, method='DELETE', path=table.path) - def _get_query_results(self, job_id, project=None, timeout_ms=None): + def _get_query_results(self, job_id, retry, project=None, timeout_ms=None): """Get the query results object for a query job. :type job_id: str :param job_id: Name of the query job. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :type project: str :param project: (Optional) project ID for the query job (defaults to the project of @@ -432,9 +474,11 @@ def _get_query_results(self, job_id, project=None, timeout_ms=None): path = '/projects/{}/queries/{}'.format(project, job_id) - resource = self._connection.api_request( - method='GET', path=path, query_params=extra_params) - + # This call is typically made in a polling loop that checks whether the + # job is complete (from QueryJob.done(), called ultimately from + # QueryJob.result()). So we don't need to poll here. + resource = self._call_api( + retry, method='GET', path=path, query_params=extra_params) return QueryResults.from_api_repr(resource) def job_from_resource(self, resource): @@ -462,7 +506,7 @@ def job_from_resource(self, resource): return QueryJob.from_api_repr(resource, self) raise ValueError('Cannot parse job resource') - def get_job(self, job_id, project=None): + def get_job(self, job_id, project=None, retry=DEFAULT_RETRY): """Fetch a job for the project associated with this client. See @@ -475,6 +519,9 @@ def get_job(self, job_id, project=None): :param project: project ID owning the job (defaults to the client's project) + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.cloud.bigquery.job._AsyncJob` :returns: Concrete job instance, based on the resource returned by the API. @@ -486,13 +533,13 @@ def get_job(self, job_id, project=None): path = '/projects/{}/jobs/{}'.format(project, job_id) - resource = self._connection.api_request( - method='GET', path=path, query_params=extra_params) + resource = self._call_api( + retry, method='GET', path=path, query_params=extra_params) return self.job_from_resource(resource) def list_jobs(self, max_results=None, page_token=None, all_users=None, - state_filter=None): + state_filter=None, retry=DEFAULT_RETRY): """List jobs for the project associated with this client. See @@ -519,6 +566,9 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None, * ``"pending"`` * ``"running"`` + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterable of job instances. """ @@ -533,7 +583,7 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None, path = '/projects/%s/jobs' % (self.project,) return page_iterator.HTTPIterator( client=self, - api_request=self._connection.api_request, + api_request=functools.partial(self._call_api, retry), path=path, item_to_value=_item_to_job, items_key='jobs', @@ -542,7 +592,8 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None, extra_params=extra_params) def load_table_from_storage(self, source_uris, destination, - job_id=None, job_config=None): + job_id=None, job_config=None, + retry=DEFAULT_RETRY): """Starts a job for loading data into a table from CloudStorage. See @@ -563,6 +614,9 @@ def load_table_from_storage(self, source_uris, destination, :type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig` :param job_config: (Optional) Extra configuration options for the job. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.job.LoadJob` :returns: a new ``LoadJob`` instance """ @@ -570,7 +624,7 @@ def load_table_from_storage(self, source_uris, destination, if isinstance(source_uris, six.string_types): source_uris = [source_uris] job = LoadJob(job_id, source_uris, destination, self, job_config) - job.begin() + job.begin(retry=retry) return job def load_table_from_file(self, file_obj, destination, @@ -683,6 +737,8 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries): transport = self._http headers = _get_upload_headers(self._connection.USER_AGENT) upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project) + # TODO: modify ResumableUpload to take a retry.Retry object + # that it can use for the initial RPC. upload = ResumableUpload(upload_url, chunk_size, headers=headers) if num_retries is not None: @@ -738,7 +794,8 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries): return response - def copy_table(self, sources, destination, job_id=None, job_config=None): + def copy_table(self, sources, destination, job_id=None, job_config=None, + retry=DEFAULT_RETRY): """Start a job for copying one or more tables into another table. See @@ -760,6 +817,9 @@ def copy_table(self, sources, destination, job_id=None, job_config=None): :type job_config: :class:`google.cloud.bigquery.job.CopyJobConfig` :param job_config: (Optional) Extra configuration options for the job. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.job.CopyJob` :returns: a new ``CopyJob`` instance """ @@ -769,7 +829,7 @@ def copy_table(self, sources, destination, job_id=None, job_config=None): sources = [sources] job = CopyJob(job_id, sources, destination, client=self, job_config=job_config) - job.begin() + job.begin(retry=retry) return job def extract_table(self, source, *destination_uris, **kwargs): @@ -796,20 +856,23 @@ def extract_table(self, source, *destination_uris, **kwargs): * *job_id* (``str``) -- Additional content (Optional) The ID of the job. + * *retry* (:class:`google.api.core.retry.Retry`) + (Optional) How to retry the RPC. :rtype: :class:`google.cloud.bigquery.job.ExtractJob` :returns: a new ``ExtractJob`` instance """ job_config = kwargs.get('job_config') job_id = _make_job_id(kwargs.get('job_id')) + retry = kwargs.get('retry', DEFAULT_RETRY) job = ExtractJob( job_id, source, list(destination_uris), client=self, job_config=job_config) - job.begin() + job.begin(retry=retry) return job - def query(self, query, job_config=None, job_id=None): + def query(self, query, job_config=None, job_id=None, retry=DEFAULT_RETRY): """Start a job that runs a SQL query. See @@ -826,12 +889,15 @@ def query(self, query, job_config=None, job_id=None): :type job_id: str :param job_id: (Optional) ID to use for the query job. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`google.cloud.bigquery.job.QueryJob` :returns: a new ``QueryJob`` instance """ job_id = _make_job_id(job_id) job = QueryJob(job_id, query, client=self, job_config=job_config) - job.begin() + job.begin(retry=retry) return job def create_rows(self, table, rows, row_ids=None, selected_fields=None, @@ -947,7 +1013,8 @@ def create_rows(self, table, rows, row_ids=None, selected_fields=None, return errors - def query_rows(self, query, job_config=None, job_id=None, timeout=None): + def query_rows(self, query, job_config=None, job_id=None, timeout=None, + retry=DEFAULT_RETRY): """Start a query job and wait for the results. See @@ -981,11 +1048,12 @@ def query_rows(self, query, job_config=None, job_id=None, timeout=None): failed or :class:`TimeoutError` if the job did not complete in the given timeout. """ - job = self.query(query, job_config=job_config, job_id=job_id) + job = self.query( + query, job_config=job_config, job_id=job_id, retry=retry) return job.result(timeout=timeout) def list_rows(self, table, selected_fields=None, max_results=None, - page_token=None, start_index=None): + page_token=None, start_index=None, retry=DEFAULT_RETRY): """List the rows of the table. See @@ -1019,6 +1087,9 @@ def list_rows(self, table, selected_fields=None, max_results=None, :param page_token: (Optional) The zero-based index of the starting row to read. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterator of row data :class:`tuple`s. During each page, the iterator will have the ``total_rows`` attribute set, @@ -1048,7 +1119,7 @@ def list_rows(self, table, selected_fields=None, max_results=None, iterator = page_iterator.HTTPIterator( client=self, - api_request=self._connection.api_request, + api_request=functools.partial(self._call_api, retry), path='%s/data' % (table.path,), item_to_value=_item_to_row, items_key='rows', @@ -1061,7 +1132,7 @@ def list_rows(self, table, selected_fields=None, max_results=None, iterator._field_to_index = _field_to_index_mapping(schema) return iterator - def list_partitions(self, table): + def list_partitions(self, table, retry=DEFAULT_RETRY): """List the partitions in a table. :type table: One of: @@ -1069,6 +1140,9 @@ def list_partitions(self, table): :class:`~google.cloud.bigquery.table.TableReference` :param table: the table to list, or a reference to it. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: list :returns: a list of time partitions """ @@ -1077,7 +1151,8 @@ def list_partitions(self, table): rows = self.query_rows( 'SELECT partition_id from [%s:%s.%s$__PARTITIONS_SUMMARY__]' % (table.project, table.dataset_id, table.table_id), - job_config=config) + job_config=config, + retry=retry) return [row[0] for row in rows] diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 593b14e41fa1e..65da699563699 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -38,6 +38,7 @@ from google.cloud.bigquery._helpers import _EnumApiResourceProperty from google.cloud.bigquery._helpers import _ListApiResourceProperty from google.cloud.bigquery._helpers import _TypedApiResourceProperty +from google.cloud.bigquery._helpers import DEFAULT_RETRY _DONE_STATE = 'DONE' _STOPPED_REASON = 'stopped' @@ -383,7 +384,7 @@ def _get_resource_config(cls, resource): config = resource['configuration'][cls._JOB_TYPE] return job_id, config - def begin(self, client=None): + def begin(self, client=None, retry=DEFAULT_RETRY): """API call: begin the job via a POST request See @@ -394,6 +395,9 @@ def begin(self, client=None): :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :raises: :exc:`ValueError` if the job has already begin. """ if self.state is not None: @@ -402,11 +406,14 @@ def begin(self, client=None): client = self._require_client(client) path = '/projects/%s/jobs' % (self.project,) - api_response = client._connection.api_request( + # jobs.insert is idempotent because we ensure that every new + # job has an ID. + api_response = client._call_api( + retry, method='POST', path=path, data=self._build_resource()) self._set_properties(api_response) - def exists(self, client=None): + def exists(self, client=None, retry=DEFAULT_RETRY): """API call: test for the existence of the job via a GET request See @@ -417,20 +424,24 @@ def exists(self, client=None): :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: bool :returns: Boolean indicating existence of the job. """ client = self._require_client(client) try: - client._connection.api_request(method='GET', path=self.path, - query_params={'fields': 'id'}) + client._call_api(retry, + method='GET', path=self.path, + query_params={'fields': 'id'}) except NotFound: return False else: return True - def reload(self, client=None): + def reload(self, client=None, retry=DEFAULT_RETRY): """API call: refresh job properties via a GET request. See @@ -440,11 +451,13 @@ def reload(self, client=None): ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. """ client = self._require_client(client) - api_response = client._connection.api_request( - method='GET', path=self.path) + api_response = client._call_api(retry, method='GET', path=self.path) self._set_properties(api_response) def cancel(self, client=None): @@ -494,16 +507,19 @@ def _set_future_result(self): else: self.set_result(self) - def done(self): + def done(self, retry=DEFAULT_RETRY): """Refresh the job and checks if it is complete. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: bool :returns: True if the job is complete, False otherwise. """ # Do not refresh is the state is already done, as the job will not # change once complete. if self.state != _DONE_STATE: - self.reload() + self.reload(retry=retry) return self.state == _DONE_STATE def result(self, timeout=None): @@ -522,6 +538,7 @@ def result(self, timeout=None): """ if self.state is None: self.begin() + # TODO: modify PollingFuture so it can pass a retry argument to done(). return super(_AsyncJob, self).result(timeout=timeout) def cancelled(self): @@ -1830,17 +1847,21 @@ def undeclared_query_paramters(self): return parameters - def query_results(self): + def query_results(self, retry=DEFAULT_RETRY): """Construct a QueryResults instance, bound to this job. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.cloud.bigquery.query.QueryResults` :returns: results instance """ if not self._query_results: - self._query_results = self._client._get_query_results(self.job_id) + self._query_results = self._client._get_query_results( + self.job_id, retry) return self._query_results - def done(self): + def done(self, retry=DEFAULT_RETRY): """Refresh the job and checks if it is complete. :rtype: bool @@ -1849,17 +1870,18 @@ def done(self): # Do not refresh is the state is already done, as the job will not # change once complete. if self.state != _DONE_STATE: - self._query_results = self._client._get_query_results(self.job_id) + self._query_results = self._client._get_query_results( + self.job_id, retry) # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - self.reload() + self.reload(retry=retry) return self.state == _DONE_STATE - def result(self, timeout=None): + def result(self, timeout=None, retry=DEFAULT_RETRY): """Start the job and wait for it to complete and get the result. :type timeout: int @@ -1867,6 +1889,9 @@ def result(self, timeout=None): How long to wait for job to complete before raising a :class:`TimeoutError`. + :type retry: :class:`google.api.core.retry.Retry` + :param retry: (Optional) How to retry the call that retrieves rows. + :rtype: :class:`~google.api.core.page_iterator.Iterator` :returns: Iterator of row data :class:`tuple`s. During each page, the @@ -1883,7 +1908,8 @@ def result(self, timeout=None): # Return an iterator instead of returning the job. schema = self.query_results().schema dest_table = self.destination - return self._client.list_rows(dest_table, selected_fields=schema) + return self._client.list_rows(dest_table, selected_fields=schema, + retry=retry) class QueryPlanEntryStep(object): diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 22df27c6358c9..5328536dde0e8 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -68,7 +68,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): with self.assertRaises(NotFound): client._get_query_results( - 'nothere', project='other-project', timeout_ms=500) + 'nothere', None, project='other-project', timeout_ms=500) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -110,7 +110,7 @@ def test__get_query_results_hit(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) client._connection = _Connection(data) - query_results = client._get_query_results(job_id) + query_results = client._get_query_results(job_id, None) self.assertEqual(query_results.total_rows, 10) self.assertTrue(query_results.complete) @@ -274,6 +274,8 @@ def test_dataset_with_default_project(self): self.assertEqual(dataset.project, self.PROJECT) def test_get_dataset(self): + from google.cloud.exceptions import ServerError + path = 'projects/%s/datasets/%s' % (self.PROJECT, self.DS_ID) creds = _make_credentials() http = object() @@ -297,6 +299,39 @@ def test_get_dataset(self): self.assertEqual(req['path'], '/%s' % path) self.assertEqual(dataset.dataset_id, self.DS_ID) + # Test retry. + + # Not a cloud API exception (missing 'errors' field). + client._connection = _Connection(Exception(''), resource) + with self.assertRaises(Exception): + client.get_dataset(dataset_ref) + + # Zero-length errors field. + client._connection = _Connection(ServerError(''), resource) + with self.assertRaises(ServerError): + client.get_dataset(dataset_ref) + + # Non-retryable reason. + client._connection = _Connection( + ServerError('', errors=[{'reason': 'serious'}]), + resource) + with self.assertRaises(ServerError): + client.get_dataset(dataset_ref) + + # Retryable reason, but retry is disabled. + client._connection = _Connection( + ServerError('', errors=[{'reason': 'backendError'}]), + resource) + with self.assertRaises(ServerError): + client.get_dataset(dataset_ref, retry=None) + + # Retryable reason, default retry: success. + client._connection = _Connection( + ServerError('', errors=[{'reason': 'backendError'}]), + resource) + dataset = client.get_dataset(dataset_ref) + self.assertEqual(dataset.dataset_id, self.DS_ID) + def test_create_dataset_minimal(self): from google.cloud.bigquery.dataset import Dataset @@ -2974,4 +3009,6 @@ def api_request(self, **kw): raise NotFound('miss') response, self._responses = self._responses[0], self._responses[1:] + if isinstance(response, Exception): + raise response return response