Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update api_core submodule #897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions google/__init__.py

This file was deleted.

6 changes: 0 additions & 6 deletions google/cloud/__init__.py

This file was deleted.

2 changes: 0 additions & 2 deletions google/cloud/bigtable/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from google.cloud.bigtable.data.mutations import DeleteAllFromFamily
from google.cloud.bigtable.data.mutations import DeleteAllFromRow

from google.cloud.bigtable.data.exceptions import IdleTimeout
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
Expand Down Expand Up @@ -63,7 +62,6 @@
"DeleteAllFromRow",
"Row",
"Cell",
"IdleTimeout",
"InvalidChunk",
"FailedMutationEntryError",
"FailedQueryShardError",
Expand Down
33 changes: 11 additions & 22 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
from __future__ import annotations

from typing import Sequence, TYPE_CHECKING
import asyncio
from dataclasses import dataclass
import functools

from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
Expand Down Expand Up @@ -101,17 +100,13 @@ def __init__(
# Entry level errors
bt_exceptions._MutateRowsIncomplete,
)
# build retryable operation
retry = retries.AsyncRetry(
predicate=self.is_retryable,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
)
retry_wrapped = retry(self._run_attempt)
self._operation = _convert_retry_deadline(
retry_wrapped, operation_timeout, is_async=True
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
self._run_attempt,
self.is_retryable,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
# initialize state
self.timeout_generator = _attempt_timeout_generator(
Expand All @@ -130,7 +125,7 @@ async def start(self):
"""
try:
# trigger mutate_rows
await self._operation()
await self._operation
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
Expand Down Expand Up @@ -180,6 +175,7 @@ async def _run_attempt(self):
result_generator = await self._gapic_fn(
timeout=next(self.timeout_generator),
entries=request_entries,
retry=None,
)
async for result_list in result_generator:
for result in result_list.entries:
Expand All @@ -195,13 +191,6 @@ async def _run_attempt(self):
self._handle_entry_error(orig_idx, entry_error)
# remove processed entry from active list
del active_request_indices[result.index]
except asyncio.CancelledError:
# when retry wrapper timeout expires, the operation is cancelled
# make sure incomplete indices are tracked,
# but don't record exception (it will be raised by wrapper)
# TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
self.remaining_indices.extend(active_request_indices.values())
raise
except Exception as exc:
# add this exception to list for each mutation that wasn't
# already handled, and update remaining_indices if mutation is retryable
Expand Down
42 changes: 4 additions & 38 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@
from google.cloud.bigtable.data.row import Row, Cell
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry_async as retries
from google.api_core.retry_streaming_async import retry_target_stream
from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator
from google.api_core import exceptions as core_exceptions

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
Expand Down Expand Up @@ -107,12 +105,12 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
"""
Start the read_rows operation, retrying on retryable errors.
"""
return retry_target_stream(
return retries.retry_target_stream_async(
self._read_rows_attempt,
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
self.operation_timeout,
exception_factory=self._build_exception,
exception_factory=_retry_exception_factory,
)

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
Expand Down Expand Up @@ -343,35 +341,3 @@ def _revise_request_rowset(
# this will avoid an unwanted full table scan
raise _RowSetComplete()
return RowSetPB(row_keys=adjusted_keys, row_ranges=adjusted_ranges)

@staticmethod
def _build_exception(
exc_list: list[Exception], is_timeout: bool, timeout_val: float
) -> tuple[Exception, Exception | None]:
"""
Build retry error based on exceptions encountered during operation

Args:
- exc_list: list of exceptions encountered during operation
- is_timeout: whether the operation failed due to timeout
- timeout_val: the operation timeout value in seconds, for constructing
the error message
Returns:
- tuple of the exception to raise, and a cause exception if applicable
"""
if is_timeout:
# if failed due to timeout, raise deadline exceeded as primary exception
source_exc: Exception = core_exceptions.DeadlineExceeded(
f"operation_timeout of {timeout_val} exceeded"
)
elif exc_list:
# otherwise, raise non-retryable error as primary exception
source_exc = exc_list.pop()
else:
source_exc = RuntimeError("failed with unspecified exception")
# use the retry exception group as the cause of the exception
cause_exc: Exception | None = (
RetryExceptionGroup(exc_list) if exc_list else None
)
source_exc.__cause__ = cause_exc
return source_exc, cause_exc
71 changes: 25 additions & 46 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import random
import os

from functools import partial

from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
Expand All @@ -43,9 +44,8 @@
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
from google.cloud.client import ClientWithProject
from google.api_core.exceptions import GoogleAPICallError
from google.cloud.environment_vars import BIGTABLE_EMULATOR # type: ignore
from google.api_core import retry_async as retries
from google.api_core import retry as retries
from google.api_core.exceptions import DeadlineExceeded
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import Aborted
Expand All @@ -65,7 +65,7 @@
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_retryable_errors
from google.cloud.bigtable.data._helpers import _get_timeouts
Expand Down Expand Up @@ -223,7 +223,7 @@ async def close(self, timeout: float = 2.0):

async def _ping_and_warm_instances(
self, channel: grpc.aio.Channel, instance_key: _WarmedInstanceKey | None = None
) -> list[GoogleAPICallError | None]:
) -> list[BaseException | None]:
"""
Prepares the backend for requests on a channel

Expand Down Expand Up @@ -578,7 +578,6 @@ async def read_rows_stream(
will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
from any retries that failed
- GoogleAPIError: raised if the request encounters an unrecoverable error
- IdleTimeout: if iterator was abandoned
"""
operation_timeout, attempt_timeout = _get_timeouts(
operation_timeout, attempt_timeout, self
Expand Down Expand Up @@ -761,6 +760,9 @@ async def read_rows_sharded(
for result in batch_result:
if isinstance(result, Exception):
error_dict[shard_idx] = result
elif isinstance(result, BaseException):
# BaseException not expected; raise immediately
raise result
else:
results_list.extend(result)
shard_idx += 1
Expand Down Expand Up @@ -872,22 +874,8 @@ async def sample_row_keys(
# prepare retryable
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)
transient_errors = []

def on_error_fn(exc):
# add errors to list if retryable
if predicate(exc):
transient_errors.append(exc)

retry = retries.AsyncRetry(
predicate=predicate,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
on_error=on_error_fn,
is_stream=False,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

# prepare request
metadata = _make_metadata(self.table_name, self.app_profile_id)
Expand All @@ -902,10 +890,13 @@ async def execute_rpc():
)
return [(s.row_key, s.offset_bytes) async for s in results]

wrapped_fn = _convert_retry_deadline(
retry(execute_rpc), operation_timeout, transient_errors, is_async=True
return await retries.retry_target_async(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
return await wrapped_fn()

def mutations_batcher(
self,
Expand Down Expand Up @@ -1014,37 +1005,25 @@ async def mutate_row(
# mutations should not be retried
predicate = retries.if_exception_type()

transient_errors = []

def on_error_fn(exc):
if predicate(exc):
transient_errors.append(exc)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

retry = retries.AsyncRetry(
predicate=predicate,
on_error=on_error_fn,
timeout=operation_timeout,
initial=0.01,
multiplier=2,
maximum=60,
)
# wrap rpc in retry logic
retry_wrapped = retry(self.client._gapic_client.mutate_row)
# convert RetryErrors from retry wrapper into DeadlineExceeded errors
deadline_wrapped = _convert_retry_deadline(
retry_wrapped, operation_timeout, transient_errors, is_async=True
)
metadata = _make_metadata(self.table_name, self.app_profile_id)
# trigger rpc
await deadline_wrapped(
target = partial(
self.client._gapic_client.mutate_row,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=attempt_timeout,
metadata=metadata,
metadata=_make_metadata(self.table_name, self.app_profile_id),
retry=None,
)
return await retries.retry_target_async(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

async def bulk_mutate_rows(
self,
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ async def _wait_for_batch_results(
if isinstance(result, Exception):
# will receive direct Exception objects if request task fails
found_errors.append(result)
elif isinstance(result, BaseException):
# BaseException not expected from grpc calls. Raise immediately
raise result
elif result:
# completed requests will return a list of FailedMutationEntryError
for e in result:
Expand Down
Loading
Loading