Skip to content

Commit

Permalink
feat: improve timeout structure (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored Jun 29, 2023
1 parent eedde1e commit 07438ca
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 267 deletions.
4 changes: 2 additions & 2 deletions .github/.OwlBot.lock.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# limitations under the License.
docker:
image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest
digest: sha256:240b5bcc2bafd450912d2da2be15e62bc6de2cf839823ae4bf94d4f392b451dc
# created: 2023-06-03T21:25:37.968717478Z
digest: sha256:ddf4551385d566771dc713090feb7b4c1164fb8a698fe52bbe7670b24236565b
# created: 2023-06-27T13:04:21.96690344Z
8 changes: 4 additions & 4 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ def __init__(
table: "TableAsync",
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
per_request_timeout: float | None,
attempt_timeout: float | None,
):
"""
Args:
- gapic_client: the client to use for the mutate_rows call
- table: the table associated with the request
- mutation_entries: a list of RowMutationEntry objects to send to the server
- operation_timeout: the timeout t o use for the entire operation, in seconds.
- per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds.
- operation_timeout: the timeout to use for the entire operation, in seconds.
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
"""
# check that mutations are within limits
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout)
# initialize state
self.timeout_generator = _attempt_timeout_generator(
per_request_timeout, operation_timeout
attempt_timeout, operation_timeout
)
self.mutations = mutation_entries
self.remaining_indices = list(range(len(self.mutations)))
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ def __init__(
client: BigtableAsyncClient,
*,
operation_timeout: float = 600.0,
per_request_timeout: float | None = None,
attempt_timeout: float | None = None,
):
"""
Args:
- request: the request dict to send to the Bigtable API
- client: the Bigtable client to use to make the request
- operation_timeout: the timeout to use for the entire operation, in seconds
- per_request_timeout: the timeout to use when waiting for each individual grpc request, in seconds
- attempt_timeout: the timeout to use when waiting for each individual grpc request, in seconds
If not specified, defaults to operation_timeout
"""
self._last_emitted_row_key: bytes | None = None
Expand All @@ -79,7 +79,7 @@ def __init__(
self.operation_timeout = operation_timeout
# use generator to lower per-attempt timeout as we approach operation_timeout deadline
attempt_timeout_gen = _attempt_timeout_generator(
per_request_timeout, operation_timeout
attempt_timeout, operation_timeout
)
row_limit = request.get("rows_limit", 0)
# lock in paramters for retryable wrapper
Expand Down
368 changes: 233 additions & 135 deletions google/cloud/bigtable/data/_async/client.py

Large diffs are not rendered by default.

31 changes: 13 additions & 18 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data._helpers import _validate_timeouts

from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._async._mutate_rows import (
Expand Down Expand Up @@ -189,7 +190,7 @@ def __init__(
flow_control_max_mutation_count: int = 100_000,
flow_control_max_bytes: int = 100 * _MB_SIZE,
batch_operation_timeout: float | None = None,
batch_per_request_timeout: float | None = None,
batch_attempt_timeout: float | None = None,
):
"""
Args:
Expand All @@ -202,26 +203,20 @@ def __init__(
- flow_control_max_mutation_count: Maximum number of inflight mutations.
- flow_control_max_bytes: Maximum number of inflight bytes.
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None,
table default_operation_timeout will be used
- batch_per_request_timeout: timeout for each individual request, in seconds. If None,
table default_per_request_timeout will be used
table default_mutate_rows_operation_timeout will be used
- batch_attempt_timeout: timeout for each individual request, in seconds. If None,
table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout
if that is also None.
"""
self._operation_timeout: float = (
batch_operation_timeout or table.default_operation_timeout
batch_operation_timeout or table.default_mutate_rows_operation_timeout
)
self._per_request_timeout: float = (
batch_per_request_timeout
or table.default_per_request_timeout
self._attempt_timeout: float = (
batch_attempt_timeout
or table.default_mutate_rows_attempt_timeout
or self._operation_timeout
)
if self._operation_timeout <= 0:
raise ValueError("batch_operation_timeout must be greater than 0")
if self._per_request_timeout <= 0:
raise ValueError("batch_per_request_timeout must be greater than 0")
if self._per_request_timeout > self._operation_timeout:
raise ValueError(
"batch_per_request_timeout must be less than batch_operation_timeout"
)
_validate_timeouts(self._operation_timeout, self._attempt_timeout)
self.closed: bool = False
self._table = table
self._staged_entries: list[RowMutationEntry] = []
Expand Down Expand Up @@ -346,7 +341,7 @@ async def _execute_mutate_rows(
Args:
- batch: list of RowMutationEntry objects to send to server
- timeout: timeout in seconds. Used as operation_timeout and per_request_timeout.
- timeout: timeout in seconds. Used as operation_timeout and attempt_timeout.
If not given, will use table defaults
Returns:
- list of FailedMutationEntryError objects for mutations that failed.
Expand All @@ -361,7 +356,7 @@ async def _execute_mutate_rows(
self._table,
batch,
operation_timeout=self._operation_timeout,
per_request_timeout=self._per_request_timeout,
attempt_timeout=self._attempt_timeout,
)
await operation.start()
except MutationsExceptionGroup as e:
Expand Down
23 changes: 23 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,26 @@ def wrapper(*args, **kwargs):
handle_error()

return wrapper_async if iscoroutinefunction(func) else wrapper


def _validate_timeouts(
operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False
):
"""
Helper function that will verify that timeout values are valid, and raise
an exception if they are not.
Args:
- operation_timeout: The timeout value to use for the entire operation, in seconds.
- attempt_timeout: The timeout value to use for each attempt, in seconds.
- allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception.
Raises:
- ValueError if operation_timeout or attempt_timeout are invalid.
"""
if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
if not allow_none and attempt_timeout is None:
raise ValueError("attempt_timeout must not be None")
elif attempt_timeout is not None:
if attempt_timeout <= 0:
raise ValueError("attempt_timeout must be greater than 0")
3 changes: 1 addition & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,9 @@ def docfx(session):

session.install("-e", ".")
session.install(
"sphinx==4.0.1",
"gcp-sphinx-docfx-yaml",
"alabaster",
"recommonmark",
"gcp-sphinx-docfx-yaml",
)

shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/data/_async/test__mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _make_one(self, *args, **kwargs):
kwargs["table"] = kwargs.pop("table", AsyncMock())
kwargs["mutation_entries"] = kwargs.pop("mutation_entries", [])
kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5)
kwargs["per_request_timeout"] = kwargs.pop("per_request_timeout", 0.1)
kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1)
return self._target_class()(*args, **kwargs)

async def _mock_stream(self, mutation_list, error_dict):
Expand Down Expand Up @@ -267,7 +267,7 @@ async def test_run_attempt_single_entry_success(self):
mock_gapic_fn = self._make_mock_gapic({0: mutation})
instance = self._make_one(
mutation_entries=[mutation],
per_request_timeout=expected_timeout,
attempt_timeout=expected_timeout,
)
with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn):
await instance._run_attempt()
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/data/_async/test__read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_ctor(self):
request,
client,
operation_timeout=expected_operation_timeout,
per_request_timeout=expected_request_timeout,
attempt_timeout=expected_request_timeout,
)
assert time_gen_mock.call_count == 1
time_gen_mock.assert_called_once_with(
Expand Down
Loading

0 comments on commit 07438ca

Please sign in to comment.