gcp: bulk request for object storage deletion
Batch multiple Google API delete calls into a single HTTP request.
A single batch request is limited to 1000 calls.
When we request more calls than that, we execute multiple batch requests.
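The 1000-call chunking relies on a batched helper imported from rohmu.util in the diff below; its implementation is not part of this commit. A minimal sketch of what such a helper could look like, assuming it simply yields fixed-size tuples in the style of itertools.batched from Python 3.12:

from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[tuple[T, ...]]:
    """Yield successive chunks of at most n items; the last chunk may be shorter."""
    # Assumed behaviour only -- the real rohmu.util.batched may differ in detail.
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk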
tilman-aiven committed Sep 17, 2024
1 parent 47d3ed7 commit 45d6cca
Showing 2 changed files with 79 additions and 5 deletions.
43 changes: 39 additions & 4 deletions rohmu/object_storage/google.py
@@ -7,8 +7,9 @@

from contextlib import contextmanager
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.errors import BatchError, HttpError
from googleapiclient.http import (
BatchHttpRequest,
build_http,
HttpRequest,
MediaDownloadProgress,
@@ -45,12 +46,13 @@
GoogleObjectStorageConfig as Config,
)
from rohmu.typing import AnyPath, Metadata
from rohmu.util import get_total_size_from_content_range
from rohmu.util import batched, get_total_size_from_content_range
from typing import (
Any,
BinaryIO,
Callable,
cast,
Collection,
Iterable,
Iterator,
Optional,
@@ -302,13 +304,15 @@ def _bucket_client(self) -> Iterator[Any]:
self.gs_bucket_client = self.gs.buckets()
yield self.gs_bucket_client

def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], retry_reporter: Reporter) -> ResType:
def _retry_on_reset(
self, request: HttpRequest | BatchHttpRequest, action: Callable[[], ResType], retry_reporter: Reporter
) -> ResType:
retries = 60
retry_wait = 2.0
while True:
try:
return action()
except (IncompleteRead, HttpError, ssl.SSLEOFError, socket.timeout, OSError, socket.gaierror) as ex:
except (IncompleteRead, HttpError, BatchError, ssl.SSLEOFError, socket.timeout, OSError, socket.gaierror) as ex:
# Note that socket.timeout and ssl.SSLEOFError inherit from OSError
# and the order of handling the errors here needs to be correct
if not retries:
@@ -327,6 +331,9 @@ def _retry_on_reset(self, request: HttpRequest, action: Callable[[], ResType], retry_reporter: Reporter) -> ResType:
# getaddrinfo sometimes fails with "Name or service not known"
elif isinstance(ex, socket.gaierror) and ex.errno != socket.EAI_NONAME:
raise
# batch request or response has a wrong format
elif isinstance(ex, BatchError):
raise

self.log.warning("%s failed: %s (%s), retrying in %.2fs", action, ex.__class__.__name__, ex, retry_wait)

@@ -463,6 +470,34 @@ def delete_key(self, key: str) -> None:
reporter.report(self.stats)
self.notifier.object_deleted(key)

def delete_keys(self, keys: Collection[str]) -> None:
self.log.debug("Deleting %i keys", len(keys))
reporter = Reporter(StorageOperation.delete_key)
with self._object_client() as object_resource:
for keys_batch in batched(keys, 1000): # Cannot delete more than 1000 objects at a time
assert self.gs is not None
batch_request = self.gs.new_batch_http_request(callback=self._delete_keys_callback)
for key in keys_batch:
path = self.format_key_for_backend(key)
self.log.debug("Deleting key: %r", path)
batch_request.add(object_resource.delete(bucket=self.bucket_name, object=path), request_id=key)

self._retry_on_reset(batch_request, batch_request.execute, retry_reporter=reporter)

for key in keys_batch:
self.log.debug("Deleted key: %r", key)
self.notifier.object_deleted(key)

reporter.report(self.stats)

def _delete_keys_callback(self, request_id: str, response: HttpRequest, exception: HttpError | None) -> None:
if exception is not None:
# Do something with the exception
...
else:
# Do something with the response
...

def get_contents_to_fileobj(
self,
key: str,
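For context, a hypothetical call site for the new delete_keys method might look like the following sketch; the project and bucket names are made up, and application-default credentials plus the constructor defaults are assumed:

from rohmu.object_storage.google import GoogleTransfer

transfer = GoogleTransfer(project_id="example-project", bucket_name="example-bucket")
# Keys beyond the 1000-call limit are split into several batch HTTP requests internally.
transfer.delete_keys(["backups/2024-09-01.tar", "backups/2024-09-02.tar"])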
41 changes: 40 additions & 1 deletion test/object_storage/test_google.py
@@ -196,6 +196,44 @@ def test_store_file_object() -> None:
)


@pytest.mark.parametrize(
("total_keys,expected_bulk_request_count"),
(
(0, 0),
(1, 1),
(1000, 1),
(1999, 2),
(2000, 2),
(10_000, 10),
),
)
def test_delete_keys(total_keys: int, expected_bulk_request_count: int) -> None:
notifier = MagicMock()
test_keys = _generate_keys(total_keys)
with ExitStack() as stack:
stack.enter_context(patch("rohmu.object_storage.google.get_credentials"))
stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._create_object_store_if_needed_unwrapped"))
mock_retry_on_reset = stack.enter_context(patch("rohmu.object_storage.google.GoogleTransfer._retry_on_reset"))
transfer = GoogleTransfer(
project_id="test-project-id",
bucket_name="test-bucket",
notifier=notifier,
)
mock_client = stack.enter_context(patch.object(transfer, "_object_client"))
mock_request = _mock_request([], resumable=None)
mock_client.return_value.__enter__.return_value.delete.return_value = mock_request

transfer.delete_keys(keys=test_keys)

assert notifier.object_deleted.call_count == total_keys
notifier.object_deleted.assert_has_calls([call(key) for key in test_keys])
assert mock_retry_on_reset.call_count == expected_bulk_request_count


def _generate_keys(total: int, prefix: str = "test_key_") -> list[str]:
return [f"{prefix}{i+1}" for i in range(total)]


def test_upload_size_unknown_to_reporter() -> None:
notifier = MagicMock()
with ExitStack() as stack:
@@ -253,7 +291,7 @@ def test_get_contents_to_fileobj_raises_error_on_invalid_byte_range() -> None:
)


def _mock_request(calls: list[tuple[str, bytes]]) -> Mock:
def _mock_request(calls: list[tuple[str, bytes]], resumable: bool | None = None) -> Mock:
results = []
for call_content_range, call_content in calls:
response = Mock()
@@ -268,6 +306,7 @@ def _mock_request(calls: list[tuple[str, bytes]]) -> Mock:
request = Mock()
request.headers = {}
request.http.request = http_call
request.resumable = resumable
return request


