Skip to content

Commit

Permalink
Catch errors on reloading failed query jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Jan 27, 2021
1 parent 7d9a7e8 commit a19f07e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
27 changes: 16 additions & 11 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,12 +988,8 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
unfinished jobs before checking. Default ``True``.
Returns:
bool: True if the job is complete, False otherwise.
Raises:
google.api_core.exceptions,GoogleAPICallError:
If a non-retryable error ocurrs while reloading the job metadata,
or if too many retry attempts fail.
bool: ``True`` if the job is complete or if fetching its status resulted in
an error, ``False`` otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
Expand All @@ -1008,16 +1004,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Reloading also updates error details on self, no need for an
# explicit self.set_exception() call.
self.reload(retry=retry, timeout=transport_timeout)
return self.state == _DONE_STATE
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True
else:
return self.state == _DONE_STATE

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry, timeout=transport_timeout)
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True

return self.state == _DONE_STATE

Expand Down
25 changes: 22 additions & 3 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
call_args = fake_reload.call_args
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)

def test_done_w_query_results_error(self):
def test_done_w_query_results_error_reload_ok(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)
Expand All @@ -378,7 +378,23 @@ def fake_reload(self, *args, **kwargs):
assert is_done
assert isinstance(job._exception, exceptions.BadRequest)

def test_done_w_job_reload_error(self):
def test_done_w_query_results_error_reload_error(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
reload_error = exceptions.DataLoss("Oops, sorry!")
job.reload = mock.Mock(side_effect=reload_error)
job._exception = None

is_done = job.done()

assert is_done
assert job._exception is reload_error

def test_done_w_job_query_results_ok_reload_error(self):
client = _make_client(project=self.PROJECT)
query_results = google.cloud.bigquery.query._QueryResults(
properties={
Expand All @@ -394,7 +410,10 @@ def test_done_w_job_reload_error(self):
job.reload = mock.Mock(side_effect=retry_error)
job._exception = None

self.assertRaisesRegex(exceptions.RetryError, r"Too many retries", job.done)
is_done = job.done()

assert is_done
assert job._exception is retry_error

def test_query_plan(self):
from google.cloud._helpers import _RFC3339_MICROS
Expand Down

0 comments on commit a19f07e

Please sign in to comment.