[Python BQ] Retry get_table for quota errors #28820

Merged · 18 commits · Jan 8, 2024
Changes from 10 commits
185 changes: 185 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -43,6 +43,7 @@
from apache_beam.internal import pickler
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery as beam_bq
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
@@ -83,6 +84,7 @@

try:
from apitools.base.py.exceptions import HttpError
from apitools.base.py.exceptions import HttpForbiddenError
from google.cloud import bigquery as gcp_bigquery
from google.api_core import exceptions
except ImportError:
@@ -419,6 +421,189 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
mock_insert.assert_called()
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
# first attempt returns an HTTP 403 error with bad contents and will retry
# second attempt returns an HTTP 408 error
# third attempt passes
param(
responses=[
HttpForbiddenError(
response={'status': 403}, content="bad contents", url=""),
HttpForbiddenError(
response={'status': 408}, content="bad contents", url="")
],
expected_retries=2),
# first attempt returns a 403 rateLimitExceeded error
# second attempt returns a 403 quotaExceeded error
# third attempt returns an HTTP 403 quotaExceeded error
# fourth attempt passes
param(
responses=[
exceptions.Forbidden(
"some message",
errors=({
"message": "transient", "reason": "rateLimitExceeded"
}, )),
exceptions.Forbidden(
"some message",
errors=({
"message": "transient", "reason": "quotaExceeded"
}, )),
HttpForbiddenError(
response={'status': 403},
content={
"error": {
"errors": [{
"message": "transient", "reason": "quotaExceeded"
}]
}
},
url=""),
],
expected_retries=3),
])
def test_get_table_transient_exception(self, responses, expected_retries):
class DummyTable:
class DummySchema:
fields = []

numBytes = 5
schema = DummySchema()

with mock.patch('time.sleep'), \
mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
'Get') as mock_get_table, \
mock.patch.object(BigQueryWrapper,
'wait_for_bq_job'), \
mock.patch.object(BigQueryWrapper,
'perform_extract_job'), \
mock.patch.object(FileSystems,
'match'), \
mock.patch.object(FileSystems,
'delete'), \
beam.Pipeline() as p:
call_counter = 0

def store_callback(unused_request):
nonlocal call_counter
if call_counter < len(responses):
exception = responses[call_counter]
call_counter += 1
raise exception
else:
call_counter += 1
return DummyTable()

mock_get_table.side_effect = store_callback
_ = p | beam.io.ReadFromBigQuery(
table="project.dataset.table", gcs_location="gs://some_bucket")

# ReadFromBigQuery export mode calls get_table() twice: once to get
# metadata (numBytes), and once to retrieve the table's schema.
# Any additional calls are retries.
self.assertEqual(expected_retries, mock_get_table.call_count - 2)

@parameterized.expand([
# first attempt returns an HTTP 403 with a transient reason and retries
# second attempt returns an HTTP 403 with a non-transient reason and fails
param(
responses=[
HttpForbiddenError(
response={'status': 403},
content={
"error": {
"errors": [{
"message": "transient", "reason": "quotaExceeded"
}]
}
},
url=""),
HttpForbiddenError(
response={'status': 403},
content={
"error": {
"errors": [{
"message": "transient", "reason": "accessDenied"
}]
}
},
url="")
],
expected_retries=1),
# first attempt returns a transient 403 error and retries
# second attempt returns a non-transient error and fails
param(
responses=[
HttpError(
response={'status': 403}, content="rateLimitExceeded",
url=""),
HttpError(response={'status': 400}, content="invalid", url="")
],
expected_retries=1),
# first attempt returns a transient 403 error and retries
# second attempt returns a 403 error with bad contents and retries
# third attempt returns a 403 with a non-transient reason and fails
param(
responses=[
exceptions.Forbidden(
"some error",
errors=({
"message": "transient", "reason": "rateLimitExceeded"
}, )),
HttpError(
response={'status': 403}, content="bad contents", url=""),
exceptions.Forbidden(
"some error",
errors=({
"message": "transient", "reason": "accessDenied"
}, )),
],
expected_retries=2),
])
def test_get_table_non_transient_exception(self, responses, expected_retries):
class DummyTable:
class DummySchema:
fields = []

numBytes = 5
schema = DummySchema()

with mock.patch('time.sleep'), \
mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
'Get') as mock_get_table, \
mock.patch.object(BigQueryWrapper,
'wait_for_bq_job'), \
mock.patch.object(BigQueryWrapper,
'perform_extract_job'), \
mock.patch.object(FileSystems,
'match'), \
mock.patch.object(FileSystems,
'delete'), \
self.assertRaises(Exception), \
beam.Pipeline() as p:
call_counter = 0

def store_callback(unused_request):
nonlocal call_counter
if call_counter < len(responses):
exception = responses[call_counter]
call_counter += 1
raise exception
else:
call_counter += 1
return DummyTable()

mock_get_table.side_effect = store_callback
_ = p | beam.io.ReadFromBigQuery(
table="project.dataset.table", gcs_location="gs://some_bucket")

# ReadFromBigQuery export mode calls get_table() twice: once to get
# metadata (numBytes), and once to retrieve the table's schema.
# However, the second call is never reached because this test always
# fails before it gets there.
# After the first call, any additional calls are retries.
self.assertEqual(expected_retries, mock_get_table.call_count - 1)

@parameterized.expand([
param(
exception_type=exceptions.BadRequest if exceptions else None,
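Aside: both tests above drive retries through the same mechanism, which is worth seeing in isolation. Below is a minimal, self-contained sketch (illustrative names only, not part of the PR) of a side_effect callback that raises each queued exception once and then succeeds, so the retry count can be read back from mock.call_count:

from unittest import mock

responses = [ValueError("transient 1"), ValueError("transient 2")]
call_counter = 0

def store_callback(*unused_args):
  global call_counter
  if call_counter < len(responses):
    exception = responses[call_counter]
    call_counter += 1
    raise exception  # simulate a transient failure
  call_counter += 1
  return "table"  # succeed once the queued errors are exhausted

get_table = mock.Mock(side_effect=store_callback)
for _ in responses:
  try:
    get_table()
  except ValueError:
    pass  # a real caller's retry loop would back off and try again
assert get_table() == "table"
assert get_table.call_count == len(responses) + 1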
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -747,7 +747,7 @@ def _insert_all_rows(

@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
def get_table(self, project_id, dataset_id, table_id):
"""Lookup a table's metadata object.

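This one-line change points get_table() at a filter that also retries transient 403 quota/rate-limit errors, rather than only server errors and timeouts. For intuition, here is a generic sketch of the decorator-with-filter pattern; it is not Beam's actual with_exponential_backoff, which additionally fuzzes delays, caps them, and logs each retry:

import time

def with_backoff(num_retries, retry_filter, initial_delay_secs=1.0):
  """Sketch of a retry decorator gated by a filter predicate."""
  def decorator(fn):
    def wrapper(*args, **kwargs):
      delay = initial_delay_secs
      for attempt in range(num_retries + 1):
        try:
          return fn(*args, **kwargs)
        except Exception as exc:
          # Give up on the final attempt, or as soon as the filter
          # declares the error non-retryable (e.g. 403 accessDenied).
          if attempt == num_retries or not retry_filter(exc):
            raise
          time.sleep(delay)
          delay *= 2  # exponential backoff
    return wrapper
  return decorator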
35 changes: 32 additions & 3 deletions sdks/python/apache_beam/utils/retry.py
@@ -33,6 +33,7 @@
import sys
import time
import traceback
import requests.exceptions

from apache_beam.io.filesystem import BeamIOError

@@ -42,9 +43,11 @@
try:
from apitools.base.py.exceptions import HttpError
from google.api_core.exceptions import GoogleAPICallError
from google.api_core import exceptions
except ImportError as e:
HttpError = None
GoogleAPICallError = None # type: ignore
exceptions = None

# Protect against environments where aws tools are not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -57,6 +60,18 @@
# pylint: enable=wrong-import-order, wrong-import-position

_LOGGER = logging.getLogger(__name__)
_RETRYABLE_REASONS = [
"rateLimitExceeded", "quotaExceeded", "internalError", "backendError"
]
_RETRYABLE_TYPES = (
exceptions.TooManyRequests if exceptions else None,
exceptions.InternalServerError if exceptions else None,
exceptions.BadGateway if exceptions else None,
exceptions.ServiceUnavailable if exceptions else None,
exceptions.DeadlineExceeded if exceptions else None,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
ConnectionError)


class PermanentException(Exception):
@@ -168,15 +183,29 @@ def retry_on_server_errors_and_timeout_filter(exception):
def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
"""Retry on server, timeout and 403 errors.

403 errors can be accessDenied, billingNotEnabled, and also quotaExceeded,
rateLimitExceeded."""
403 errors include both non-transient (accessDenied, billingNotEnabled) and
transient errors (quotaExceeded, rateLimitExceeded). Only retry transient
errors."""
if HttpError is not None and isinstance(exception, HttpError):
if exception.status_code == 403:
try:
# attempt to extract the reason and check if it's retryable
return exception.content["error"]["errors"][0][
"reason"] in _RETRYABLE_REASONS
except Exception:
_LOGGER.debug(
"Could not determine if HttpError is non-transient. Will retry: %s",
exception)
return True
if GoogleAPICallError is not None and isinstance(exception,
GoogleAPICallError):
if exception.code == 403:
return True
if not hasattr(exception, "errors") or len(exception.errors) == 0:
# default to retrying
return True

reason = exception.errors[0]["reason"]
return reason in _RETRYABLE_REASONS
if S3ClientError is not None and isinstance(exception, S3ClientError):
if exception.code == 403:
return True
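To see the filter's transient/non-transient split in action, here is a usage sketch mirroring the test fixtures above (requires apitools). Note that, like the tests, it hands the filter an already-parsed dict as content; a real apitools response carries a JSON string, in which case the code shown falls back to retrying:

from apitools.base.py.exceptions import HttpForbiddenError

from apache_beam.utils import retry

transient = HttpForbiddenError(
    response={'status': 403},
    content={'error': {'errors': [{'reason': 'rateLimitExceeded'}]}},
    url='')
non_transient = HttpForbiddenError(
    response={'status': 403},
    content={'error': {'errors': [{'reason': 'accessDenied'}]}},
    url='')

# rateLimitExceeded is in _RETRYABLE_REASONS; accessDenied is not.
assert retry.retry_on_server_errors_timeout_or_quota_issues_filter(transient)
assert not retry.retry_on_server_errors_timeout_or_quota_issues_filter(
    non_transient)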