feat: retryable resource exhausted handling
The BigQuery Storage Read API will start returning retryable
RESOURCE_EXHAUSTED errors in 2022 when certain concurrency limits are
hit, so this PR adds client-side handling for them.

Tested with unit tests and system tests. System tests ran successfully
on a test project that intentionally returns retryable
RESOURCE_EXHAUSTED errors.
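
For illustration, a minimal usage sketch of the new parameter, assuming a read session and stream already exist; the project/session/stream path and the log_retry_delay callback are placeholders, not part of this change:

from google.cloud import bigquery_storage_v1

def log_retry_delay(delay_seconds):
    # Called with the server-requested delay (in seconds) just before the
    # client sleeps and retries.
    print("retrying in %.1f seconds" % delay_seconds)

client = bigquery_storage_v1.BigQueryReadClient()
stream_name = (
    "projects/my-project/locations/us/sessions/my-session/streams/my-stream"
)
row_stream = client.read_rows(stream_name, retry_delay_callback=log_retry_delay)
for row in row_stream.rows():
    pass  # process rows; retryable errors are retried internally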
esert-g committed Jan 12, 2022
1 parent fd454e6 commit a69b1ee
Showing 5 changed files with 233 additions and 94 deletions.
16 changes: 8 additions & 8 deletions google/cloud/bigquery_storage_v1/client.py
@@ -47,6 +47,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -108,6 +109,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -122,19 +129,12 @@ def read_rows(
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)


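The retry_delay_callback documented above only observes the server-requested delay; BigQueryReadClient still sleeps and retries on its own. A self-contained sketch of a stateful callback that records those delays, e.g. to surface backoff in logs or metrics (the RetryDelayTracker class is illustrative, not part of this library):

class RetryDelayTracker:
    """Collects server-requested retry delays, in seconds."""

    def __init__(self):
        self.delays = []

    def __call__(self, delay_seconds):
        # Invoked by the client right before it sleeps until the next attempt.
        self.delays.append(delay_seconds)

tracker = RetryDelayTracker()
# Pass it the same way as a plain function:
#   client.read_rows(stream_name, retry_delay_callback=tracker)
# After reading, tracker.delays holds every delay the server requested.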
89 changes: 70 additions & 19 deletions google/cloud/bigquery_storage_v1/reader.py
@@ -17,12 +17,14 @@
import collections
import io
import json
import time

try:
import fastavro
except ImportError: # pragma: NO COVER
fastavro = None
import google.api_core.exceptions
import google.rpc.error_details_pb2

try:
import pandas
@@ -79,16 +81,17 @@ class ReadRowsStream(object):
If the pandas and fastavro libraries are installed, use the
:func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
method to parse all messages into a :class:`pandas.DataFrame`.
This object should not be created directly, but is returned by
other methods in this library.
"""

def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
def __init__(
self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
):
"""Construct a ReadRowsStream.
Args:
wrapped (Iterable[ \
~google.cloud.bigquery_storage.types.ReadRowsResponse \
]):
The ReadRows stream to read.
client ( \
~google.cloud.bigquery_storage_v1.services. \
big_query_read.BigQueryReadClient \
@@ -106,6 +109,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
read_rows_kwargs (dict):
Keyword arguments to use when reconnecting to a ReadRows
stream.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
ReadRowsStream will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
Iterable[ \
@@ -116,11 +125,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):

# Make a copy of the read position so that we can update it without
# mutating the original input.
self._wrapped = wrapped
self._client = client
self._name = name
self._offset = offset
self._read_rows_kwargs = read_rows_kwargs
self._retry_delay_callback = retry_delay_callback
self._reconnect()

def __iter__(self):
"""An iterable of messages.
@@ -131,7 +141,6 @@ def __iter__(self):
]:
A sequence of row messages.
"""

# Infinite loop to reconnect on reconnectable errors while processing
# the row stream.
while True:
@@ -142,24 +151,66 @@ def __iter__(self):
yield message

return # Made it through the whole stream.
except google.api_core.exceptions.InternalServerError as exc:
resumable_error = any(
resumable_message in exc.message
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
)
if not resumable_error:
except Exception as exc:
if not self._exception_is_retryable(exc):
raise
except _STREAM_RESUMPTION_EXCEPTIONS:
# Transient error, so reconnect to the stream.
pass

self._reconnect()

def _reconnect(self):
"""Reconnect to the ReadRows stream using the most recent offset."""
self._wrapped = self._client.read_rows(
read_stream=self._name, offset=self._offset, **self._read_rows_kwargs
)
while True:
try:
self._wrapped = self._client.read_rows(
read_stream=self._name,
offset=self._offset,
**self._read_rows_kwargs
)
break
except Exception as exc:
if not self._exception_is_retryable(exc):
raise

def _exception_is_retryable(self, exc):
"""Return true if the exception is considered to be retryable."""
if isinstance(exc, google.api_core.exceptions.InternalServerError):
return any(
resumable_message in exc.message
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
)
elif isinstance(exc, google.api_core.exceptions.ResourceExhausted):
# ResourceExhausted errors are only retried if a valid
# RetryInfo is provided with the error.
# ResourceExhausted doesn't seem to have details/_details
# fields by default when it is generated by Python 3.6 unit
# tests, so we have to work around that.
# TODO: remove this logic when we require
# google-api-core >= 2.2.0
details = None
if hasattr(exc, "details"):
details = exc.details
elif hasattr(exc, "_details"):
details = exc._details
if details is not None:
for detail in details:
if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
retry_delay = detail.retry_delay
if retry_delay is not None:
delay = max(
0,
float(retry_delay.seconds)
+ (float(retry_delay.nanos) / 1e9),
)
if self._retry_delay_callback:
self._retry_delay_callback(delay)
time.sleep(delay)
return True
return False
else:
# Check for transient errors.
return any(
isinstance(exc, exc_type) for exc_type in _STREAM_RESUMPTION_EXCEPTIONS
)

def rows(self, read_session=None):
"""Iterate over all rows in the stream.
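In the ResourceExhausted branch above, the sleep duration comes from a RetryInfo detail attached to the error: retry_delay.seconds plus retry_delay.nanos converted to fractional seconds. A standalone sketch of that conversion (retry_delay_seconds is an illustrative helper, not part of this library):

from google.protobuf import duration_pb2
from google.rpc import error_details_pb2

def retry_delay_seconds(retry_info):
    # Mirrors the conversion in ReadRowsStream._exception_is_retryable.
    delay = retry_info.retry_delay
    return max(0.0, float(delay.seconds) + float(delay.nanos) / 1e9)

info = error_details_pb2.RetryInfo(
    retry_delay=duration_pb2.Duration(seconds=2, nanos=500000000)
)
print(retry_delay_seconds(info))  # 2.5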
16 changes: 8 additions & 8 deletions google/cloud/bigquery_storage_v1beta2/client.py
@@ -48,6 +48,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -109,6 +110,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -123,19 +130,12 @@ def read_rows(
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)


[The remaining two changed files are not rendered on this page.]
