feat: retryable resource exhausted handling
The BigQuery Storage Read API will start returning retryable
RESOURCE_EXHAUSTED errors in 2022 when certain concurrency limits are
hit, so this PR adds handling for them.

Tested with unit tests and system tests. System tests ran successfully
on a test project that intentionally returns retryable
RESOURCE_EXHAUSTED errors.
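For callers that want visibility into these server-requested backoffs, the new `retry_delay_callback` argument can be passed to `read_rows`. A minimal usage sketch (the project, session, and stream names below are placeholders, not from this commit):

```python
from google.cloud import bigquery_storage_v1

def on_retry_delay(delay_seconds):
    # Invoked with the server-requested delay (in seconds) before the
    # client sleeps and reconnects after a retryable RESOURCE_EXHAUSTED.
    print(f"backing off for {delay_seconds:.1f}s at the server's request")

client = bigquery_storage_v1.BigQueryReadClient()
# Placeholder stream name; a real one comes from create_read_session.
stream_name = "projects/my-project/locations/us/sessions/my-session/streams/my-stream"
rows = client.read_rows(stream_name, retry_delay_callback=on_retry_delay)
for message in rows:
    pass  # process each ReadRowsResponse
```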
esert-g committed Jan 7, 2022
1 parent bbc93d5 commit b207dc1
Showing 5 changed files with 214 additions and 86 deletions.
16 changes: 8 additions & 8 deletions google/cloud/bigquery_storage_v1/client.py
@@ -47,6 +47,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -108,6 +109,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -122,19 +129,12 @@
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)
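With this change `read_rows` no longer opens the gRPC stream eagerly; it hands the gapic client to `ReadRowsStream`, which connects lazily via `_reconnect()` on first iteration and again after every resumable error. A compressed sketch of that call flow (simplified from the diff, not the committed code):

```python
# Simplified sketch of ReadRowsStream's lazy (re)connection flow.
class LazyReadRowsStream:
    def __init__(self, client, name, offset, read_rows_kwargs):
        self._client = client
        self._name = name
        self._offset = offset
        self._read_rows_kwargs = read_rows_kwargs

    def _reconnect(self):
        # Every (re)connection resumes from the most recent offset, so
        # rows already yielded are never re-read after a retry.
        return self._client.read_rows(
            read_stream=self._name, offset=self._offset, **self._read_rows_kwargs
        )

    def __iter__(self):
        while True:  # the real code catches resumable errors around this loop
            for message in self._reconnect():
                self._offset += message.row_count
                yield message
            return  # stream finished cleanly
```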


62 changes: 51 additions & 11 deletions google/cloud/bigquery_storage_v1/reader.py
Expand Up @@ -17,12 +17,14 @@
import collections
import io
import json
import time

try:
import fastavro
except ImportError: # pragma: NO COVER
fastavro = None
import google.api_core.exceptions
import google.rpc.error_details_pb2

try:
import pandas
@@ -79,16 +81,17 @@ class ReadRowsStream(object):
If the pandas and fastavro libraries are installed, use the
:func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
method to parse all messages into a :class:`pandas.DataFrame`.
This object should not be created directly, but is returned by
other methods in this library.
"""

def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
def __init__(
self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
):
"""Construct a ReadRowsStream.
Args:
wrapped (Iterable[ \
~google.cloud.bigquery_storage.types.ReadRowsResponse \
]):
The ReadRows stream to read.
client ( \
~google.cloud.bigquery_storage_v1.services. \
big_query_read.BigQueryReadClient \
@@ -106,6 +109,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
read_rows_kwargs (dict):
Keyword arguments to use when reconnecting to a ReadRows
stream.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
ReadRowsStream will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
Iterable[ \
@@ -116,11 +125,11 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):

# Make a copy of the read position so that we can update it without
# mutating the original input.
self._wrapped = wrapped
self._client = client
self._name = name
self._offset = offset
self._read_rows_kwargs = read_rows_kwargs
self._retry_delay_callback = retry_delay_callback

def __iter__(self):
"""An iterable of messages.
@@ -131,12 +140,13 @@ def __iter__(self):
]:
A sequence of row messages.
"""

# Infinite loop to reconnect on reconnectable errors while processing
# the row stream.
while True:
try:
for message in self._wrapped:
messages = self._reconnect()

for message in messages:
rowcount = message.row_count
self._offset += rowcount
yield message
@@ -149,15 +159,45 @@ def __iter__(self):
)
if not resumable_error:
raise
except google.api_core.exceptions.ResourceExhausted as exc:
# ResourceExhausted errors are only retried if a valid
# RetryInfo is provided with the error.
resumable_error = False
# ResourceExhausted doesn't seem to have
# details/_details fields by default when it is
# generated by Python 3.6 unit tests, so we have to
# work around that.
# TODO: remove this logic when we require
# google-api-core >= 2.2.0
details = None
if hasattr(exc, "details"):
details = exc.details
elif hasattr(exc, "_details"):
details = exc._details
if details is not None:
for detail in details:
if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
retry_delay = detail.retry_delay
if retry_delay is not None:
resumable_error = True
delay = max(
0,
float(retry_delay.seconds)
+ (float(retry_delay.nanos) / 1e9),
)
if self._retry_delay_callback:
self._retry_delay_callback(delay)
time.sleep(delay)
break
if not resumable_error:
raise
except _STREAM_RESUMPTION_EXCEPTIONS:
# Transient error, so reconnect to the stream.
pass

self._reconnect()

def _reconnect(self):
"""Reconnect to the ReadRows stream using the most recent offset."""
self._wrapped = self._client.read_rows(
return self._client.read_rows(
read_stream=self._name, offset=self._offset, **self._read_rows_kwargs
)

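The retry branch above only resumes when the error carries a `google.rpc.error_details_pb2.RetryInfo` detail. A standalone sketch of that delay extraction, assuming a list of detail messages as input (the helper name is illustrative):

```python
import google.rpc.error_details_pb2

def retry_delay_seconds(details):
    """Return the RetryInfo delay in seconds, or None if no RetryInfo is present."""
    for detail in details or ():
        if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
            # retry_delay is a google.protobuf.Duration (seconds + nanos).
            d = detail.retry_delay
            return max(0.0, d.seconds + d.nanos / 1e9)
    return None

# Example: a server-requested 1.5 second backoff.
info = google.rpc.error_details_pb2.RetryInfo()
info.retry_delay.seconds = 1
info.retry_delay.nanos = 500_000_000
assert retry_delay_seconds([info]) == 1.5
```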
16 changes: 8 additions & 8 deletions google/cloud/bigquery_storage_v1beta2/client.py
@@ -48,6 +48,7 @@ def read_rows(
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT,
metadata=(),
retry_delay_callback=None,
):
"""
Reads rows from the table in the format prescribed by the read
@@ -109,6 +110,12 @@ def read_rows(
specified, the timeout applies to each individual attempt.
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
that is provided to the method.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
BigQueryReadClient will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
~google.cloud.bigquery_storage_v1.reader.ReadRowsStream:
@@ -123,19 +130,12 @@ def read_rows(
ValueError: If the parameters are invalid.
"""
gapic_client = super(BigQueryReadClient, self)
stream = gapic_client.read_rows(
read_stream=name,
offset=offset,
retry=retry,
timeout=timeout,
metadata=metadata,
)
return reader.ReadRowsStream(
stream,
gapic_client,
name,
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
retry_delay_callback=retry_delay_callback,
)

