Skip to content

Commit

Permalink
feat(bigquery): add timeout parameter to QueryJob.done() method (#9875)
Browse files Browse the repository at this point in the history
* feat(bigquery): add timeout to QueryJob.done()

* Add tests for methods that got timeout param

In addition, fix the timeout logic in QueryJob.done() - the timeouts
are in different units (seconds vs. milliseconds)

* Fix lint warning (unused variable)

* Adjust timeout exception type in QueryJob.result()

* Update dependency pins

The new timeout feature requires more recent versions of the API core
and google auth dependencies.

* Add safety margin on top of server-side timeout

If the server-side processing timeout is used (the `timeout_ms` API
parameter) as the total timeout, it should be slightly longer than
the actual server-side timeout in order to not timeout the connection
while there might still be chance that the server-side processing
has actually completed.
  • Loading branch information
plamut authored Dec 19, 2019
1 parent 2dabc2d commit 13bd849
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 19 deletions.
7 changes: 5 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
):
"""Get the query results object for a query job.
Expand All @@ -1096,6 +1096,9 @@ def _get_query_results(
(Optional) number of milliseconds the the API call should
wait for the query to complete before the request times out.
location (str): Location of the query job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
Returns:
google.cloud.bigquery.query._QueryResults:
Expand All @@ -1122,7 +1125,7 @@ def _get_query_results(
# job is complete (from QueryJob.done(), called ultimately from
# QueryJob.result()). So we don't need to poll here.
resource = self._call_api(
retry, method="GET", path=path, query_params=extra_params
retry, method="GET", path=path, query_params=extra_params, timeout=timeout
)
return _QueryResults.from_api_repr(resource)

Expand Down
52 changes: 44 additions & 8 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

"""Define API Jobs."""

from __future__ import division

import concurrent.futures
import copy
import re
import threading

import requests
import six
from six.moves import http_client

Expand Down Expand Up @@ -50,6 +54,7 @@
_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_TIMEOUT_BUFFER_SECS = 0.1
_SERVER_TIMEOUT_MARGIN_SECS = 1.0
_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)

_ERROR_REASON_TO_EXCEPTION = {
Expand Down Expand Up @@ -663,7 +668,7 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
else:
return True

def reload(self, client=None, retry=DEFAULT_RETRY):
def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
"""API call: refresh job properties via a GET request.
See
Expand All @@ -675,6 +680,9 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
``client`` stored on the current dataset.
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
"""
client = self._require_client(client)

Expand All @@ -683,7 +691,11 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
extra_params["location"] = self.location

api_response = client._call_api(
retry, method="GET", path=self.path, query_params=extra_params
retry,
method="GET",
path=self.path,
query_params=extra_params,
timeout=timeout,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -2994,9 +3006,16 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY):
def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.
Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
Returns:
bool: True if the job is complete, False otherwise.
"""
Expand All @@ -3007,11 +3026,25 @@ def done(self, retry=DEFAULT_RETRY):
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
timeout = max(min(timeout, 10), 0)
self._done_timeout -= timeout
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(timeout * 1000)
timeout_ms = int(api_timeout * 1000)

# If the server-side processing timeout (timeout_ms) is specified and
# would be picked as the total request timeout, we want to add a small
# margin to it - we don't want to timeout the connection just as the
# server-side processing might have completed, but instead slightly
# after the server-side deadline.
# However, if `timeout` is specified, and is shorter than the adjusted
# server timeout, the former prevails.
if timeout_ms is not None and timeout_ms > 0:
server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS
if timeout is not None:
timeout = min(server_timeout_with_margin, timeout)
else:
timeout = server_timeout_with_margin

# Do not refresh is the state is already done, as the job will not
# change once complete.
Expand All @@ -3022,13 +3055,14 @@ def done(self, retry=DEFAULT_RETRY):
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry)
self.reload(retry=retry, timeout=timeout)

return self.state == _DONE_STATE

Expand Down Expand Up @@ -3132,6 +3166,8 @@ def result(
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
raise
except requests.exceptions.Timeout as exc:
six.raise_from(concurrent.futures.TimeoutError, exc)

# If the query job is complete but there are no query results, this was
# special job, such as a DDL query. Return an empty result set to
Expand Down
2 changes: 2 additions & 0 deletions bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
'enum34; python_version < "3.4"',
"google-auth >= 1.9.0, < 2.0dev",
"google-api-core >= 1.15.0, < 2.0dev",
"google-cloud-core >= 1.0.3, < 2.0dev",
"google-resumable-media >= 0.3.1, != 0.4.0, < 0.6.0dev",
"protobuf >= 3.6.0",
Expand Down
24 changes: 24 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import uuid
import re

import requests
import six
import psutil
import pytest
Expand Down Expand Up @@ -1893,6 +1894,29 @@ def test_query_iter(self):
row_tuples = [r.values() for r in query_job]
self.assertEqual(row_tuples, [(1,)])

def test_querying_data_w_timeout(self):
job_config = bigquery.QueryJobConfig()
job_config.use_query_cache = False

query_job = Config.CLIENT.query(
"""
SELECT name, SUM(number) AS total_people
FROM `bigquery-public-data.usa_names.usa_1910_current`
GROUP BY name
""",
location="US",
job_config=job_config,
)

# Specify a very tight deadline to demonstrate that the timeout
# actually has effect.
with self.assertRaises(requests.exceptions.Timeout):
query_job.done(timeout=0.1)

# Now wait for the result using a more realistic deadline.
query_job.result(timeout=30)
self.assertTrue(query_job.done(timeout=30))

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_query_results_to_dataframe(self):
QUERY = """
Expand Down
3 changes: 3 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,14 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
project="other-project",
location=self.LOCATION,
timeout_ms=500,
timeout=42,
)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/other-project/queries/nothere",
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=42,
)

def test__get_query_results_miss_w_client_location(self):
Expand All @@ -248,6 +250,7 @@ def test__get_query_results_miss_w_client_location(self):
method="GET",
path="/projects/PROJECT/queries/nothere",
query_params={"maxResults": 0, "location": self.LOCATION},
timeout=None,
)

def test__get_query_results_hit(self):
Expand Down
Loading

0 comments on commit 13bd849

Please sign in to comment.