diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index c8df21e91f554..85410bf0d4f0d 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -64,6 +64,7 @@
 from google.cloud.bigquery.model import ModelReference
 from google.cloud.bigquery.query import _QueryResults
 from google.cloud.bigquery.retry import DEFAULT_RETRY
+from google.cloud.bigquery.retry import timeoutable_http
 from google.cloud.bigquery.routine import Routine
 from google.cloud.bigquery.routine import RoutineReference
 from google.cloud.bigquery.schema import SchemaField
@@ -1056,7 +1057,13 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
             raise
 
     def _get_query_results(
-        self, job_id, retry, project=None, timeout_ms=None, location=None
+        self,
+        job_id,
+        retry,
+        project=None,
+        timeout_ms=None,
+        location=None,
+        http_timeout=None,
     ):
         """Get the query results object for a query job.
 
@@ -1068,9 +1075,12 @@ def _get_query_results(
                 (Optional) project ID for the query job (defaults to the
                 project of the client).
             timeout_ms (int):
-                (Optional) number of milliseconds the the API call should
+                (Optional) the number of milliseconds the API call should
                 wait for the query to complete before the request times out.
             location (str): Location of the query job.
+            http_timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                each time before retrying the HTTP request.
 
         Returns:
             google.cloud.bigquery.query._QueryResults:
@@ -1093,12 +1103,18 @@ def _get_query_results(
 
         path = "/projects/{}/queries/{}".format(project, job_id)
 
-        # This call is typically made in a polling loop that checks whether the
-        # job is complete (from QueryJob.done(), called ultimately from
-        # QueryJob.result()). So we don't need to poll here.
-        resource = self._call_api(
-            retry, method="GET", path=path, query_params=extra_params
+        # The following call is typically made in a polling loop that checks
+        # whether the job is complete (from QueryJob.done(), called ultimately
+        # from QueryJob.result()). So we don't need to poll here.
+        call_api = functools.partial(
+            self._call_api, method="GET", path=path, query_params=extra_params
         )
+        if http_timeout is not None:
+            with timeoutable_http(self._http, http_timeout, retry) as adjusted_retry:
+                resource = call_api(adjusted_retry)
+        else:
+            resource = call_api(retry)
+
         return _QueryResults.from_api_repr(resource)
 
     def job_from_resource(self, resource):
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index f0312b0d4219d..a27090b6756b7 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2994,7 +2994,7 @@ def estimated_bytes_processed(self):
             result = int(result)
         return result
 
-    def done(self, retry=DEFAULT_RETRY):
+    def done(self, retry=DEFAULT_RETRY, timeout=None):
         """Refresh the job and checks if it is complete.
 
         Returns:
@@ -3007,11 +3007,11 @@ def done(self, retry=DEFAULT_RETRY):
         timeout_ms = None
         if self._done_timeout is not None:
             # Subtract a buffer for context switching, network latency, etc.
-            timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
-            timeout = max(min(timeout, 10), 0)
-            self._done_timeout -= timeout
+            api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
+            api_timeout = max(min(api_timeout, 10), 0)
+            self._done_timeout -= api_timeout
             self._done_timeout = max(0, self._done_timeout)
-            timeout_ms = int(timeout * 1000)
+            timeout_ms = int(api_timeout * 1000)
 
         # Do not refresh is the state is already done, as the job will not
         # change once complete.
@@ -3022,6 +3022,7 @@ def done(self, retry=DEFAULT_RETRY):
                 project=self.project,
                 timeout_ms=timeout_ms,
                 location=self.location,
+                http_timeout=timeout,
             )
 
             # Only reload the job once we know the query is complete.
@@ -3126,7 +3127,11 @@ def result(
             # Return an iterator instead of returning the job.
             if not self._query_results:
                 self._query_results = self._client._get_query_results(
-                    self.job_id, retry, project=self.project, location=self.location
+                    self.job_id,
+                    retry,
+                    project=self.project,
+                    location=self.location,
+                    http_timeout=timeout,
                 )
         except exceptions.GoogleCloudError as exc:
             exc.message += self._format_for_exception(self.query, self.job_id)
diff --git a/bigquery/google/cloud/bigquery/retry.py b/bigquery/google/cloud/bigquery/retry.py
index 4bc4b757f45d8..26c4a3ffb0225 100644
--- a/bigquery/google/cloud/bigquery/retry.py
+++ b/bigquery/google/cloud/bigquery/retry.py
@@ -12,6 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
+
+import requests
+
 from google.api_core import exceptions
 from google.api_core import retry
 
@@ -27,6 +31,50 @@
 )
 
 
+class _TimeoutableHTTPAdapter(requests.adapters.HTTPAdapter):
+    """An HTTP adapter that allows sending HTTP requests with a timeout."""
+
+    def __init__(self, timeout=None, *args, **kwargs):
+        self._timeout = timeout
+        super(_TimeoutableHTTPAdapter, self).__init__(*args, **kwargs)
+
+    def send(self, *args, **kwargs):
+        kwargs["timeout"] = self._timeout
+        return super(_TimeoutableHTTPAdapter, self).send(*args, **kwargs)
+
+
+@contextlib.contextmanager
+def timeoutable_http(http, timeout, retry):
+    """Add a timeout to the ``http`` transport object.
+
+    Args:
+        http (requests.Session): A transport object for making HTTP requests.
+        timeout (float): The number of seconds to wait for the transport's HTTP
+            request before retrying it.
+        retry (api_core.retry.Retry): A Retry object that will be used for
+            retrying the requests.
+
+    Yields:
+        (api_core.retry.Retry) An adjusted Retry object that also retries on
+        the underlying transport errors and stops retrying at ``timeout``.
+    """
+    orig_http_adapter = http.get_adapter("http://")
+    orig_https_adapter = http.get_adapter("https://")
+
+    http.mount("http://", _TimeoutableHTTPAdapter(timeout=timeout, max_retries=0))
+    http.mount("https://", _TimeoutableHTTPAdapter(timeout=timeout, max_retries=0))
+
+    adjusted_retry = retry.with_deadline(timeout, strict_deadline=True).with_predicate(
+        _should_retry_with_http
+    )
+
+    try:
+        yield adjusted_retry
+    finally:
+        http.mount("http://", orig_http_adapter)
+        http.mount("https://", orig_https_adapter)
+
+
 def _should_retry(exc):
     """Predicate for determining when to retry.
 
@@ -44,6 +92,13 @@ def _should_retry(exc):
     return reason in _RETRYABLE_REASONS
 
 
+def _should_retry_with_http(exc):
+    # TODO: narrow down the list of retriable Requests exceptions?
+    if isinstance(exc, requests.exceptions.RequestException):
+        return True
+    return _should_retry(exc)
+
+
 DEFAULT_RETRY = retry.Retry(predicate=_should_retry)
 """The default retry object.
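
Usage sketch (not part of the patch): a minimal illustration of how the new plumbing is expected to be exercised through the public API, assuming the standard BigQuery client surface; the query text, the 30-second value, and the variable names are placeholders, and running it requires valid credentials and a project.

```python
from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query("SELECT 1")  # placeholder query

# The ``timeout`` argument of QueryJob.result() (and of QueryJob.done()) is
# now also forwarded as ``http_timeout`` to Client._get_query_results(). When
# set, that method wraps its API call in timeoutable_http(), which temporarily
# mounts _TimeoutableHTTPAdapter on the client's requests.Session (applying a
# per-request socket timeout) and retries with a predicate that also treats
# requests.exceptions.RequestException as retryable, with the retry deadline
# capped at the same value.
rows = list(query_job.result(timeout=30))
```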