feat: add crc32c_checksum argument to download_chunks_concurrently
andrewsg committed Oct 6, 2023
1 parent 6c64c6d commit 970f84d
Showing 3 changed files with 134 additions and 32 deletions.
13 changes: 13 additions & 0 deletions google/cloud/storage/crashtest.py
@@ -0,0 +1,13 @@
from google.oauth2 import service_account
import pickle

credentials = service_account.Credentials.from_service_account_file(
"/Users/gorcester/credentials/andrewsg-joonix-8e5ff7102a7a.json"
)

from google.cloud.storage import Client, transfer_manager

cl = Client(credentials=credentials)
bu = cl.bucket("andrewsg-test")
bl = bu.blob("hangtest0")
transfer_manager.download_chunks_concurrently(bl, "dest")
87 changes: 59 additions & 28 deletions google/cloud/storage/transfer_manager.py
@@ -47,6 +47,7 @@

TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
DEFAULT_MAX_WORKERS = 8
+MAX_CRC32C_ZERO_ARRAY_SIZE = 4 * 1024 * 1024
METADATA_HEADER_TRANSLATION = {
"cacheControl": "Cache-Control",
"contentDisposition": "Content-Disposition",
@@ -787,9 +788,9 @@ def download_chunks_concurrently(
        passed into the download methods and is not validated by this function.
        Keyword arguments "start" and "end" which are not supported and will
-       cause a ValueError if present. The argument "checksum" is also not
-       supported in download_kwargs, but see "crc32c_checksum" (which does not
-       go in download_kwargs) below.
+       cause a ValueError if present. The key "checksum" is also not supported
+       in download_kwargs, but see the argument "crc32c_checksum" (which does
+       not go in download_kwargs) below.
    :type deadline: int
    :param deadline:
@@ -832,8 +833,8 @@ def download_chunks_concurrently(
    :type crc32c_checksum: bool
    :param crc32c_checksum:
        Whether to compute a checksum for the resulting object, using the crc32c
-       algorithm. The checksums for each chunk must be combined using a feature
-       of crc32c that is not available for md5, so md5 is not supported.
+       algorithm. As the checksums for each chunk must be combined using a
+       feature of crc32c that is not available for md5, md5 is not supported.
    :raises:
        :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
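
The docstring above relies on a linearity property specific to crc32c: the checksum of concatenated data can be derived from the per-part checksums alone, by zero-extending one value and XORing in the other; md5 has no such property. A minimal sketch of that property using the same google_crc32c calls this commit uses (names and data here are illustrative, not from the commit):

import google_crc32c

part_a, part_b = b"hello ", b"world"
crc_a = google_crc32c.value(part_a)
crc_b = google_crc32c.value(part_b)

# Undo crc_a's final inversion, run len(part_b) zero bytes through the
# CRC register, redo the inversion, then XOR with crc_b.
combined = crc_a ^ 0xFFFFFFFF
combined = google_crc32c.extend(combined, bytes(len(part_b)))
combined ^= 0xFFFFFFFF
combined ^= crc_b

assert combined == google_crc32c.value(part_a + part_b)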
@@ -853,7 +854,7 @@
        )
    if "checksum" in download_kwargs:
        raise ValueError(
-           "'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c' argument on download_chunks_concurrently instead."
+           "'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c_checksum' argument on download_chunks_concurrently instead."
        )

    download_kwargs["command"] = "tm.download_sharded"
@@ -886,7 +887,7 @@ def download_chunks_concurrently(
                start=start,
                end=cursor - 1,
                download_kwargs=download_kwargs,
-               crc32c_checksum=crc32c_checksum
+               crc32c_checksum=crc32c_checksum,
            )
        )

@@ -895,35 +896,33 @@
    )

    # Raise any exceptions; combine checksums.
-   crc1 = None
-   zeroes = bytes(0)
+   results = []
    for future in futures:
-       crc2, crc2_size = future.result()
-       if crc2 and not crc1:
-           crc1 = crc2
-       elif crc1 and crc2:
-           crc1 ^= 0xffffffff # precondition
-           if len(zeroes) != crc2_size:
-               zeroes = bytes(crc2_size)
-           crc1 = google_crc32c.extend(crc1, zeroes)
-           crc1 ^= 0xffffffff # postcondition
-           crc1 ^= crc2
+       results.append(future.result())

    if crc32c_checksum:
-       crc_digest = struct.pack(">L", crc1) # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
+       crc_digest = _digest_ordered_checksum_and_size_pairs(results)
        actual_checksum = base64.b64encode(crc_digest).decode("utf-8")
        expected_checksum = blob.crc32c
        if actual_checksum != expected_checksum:
            # For consistency with other download methods we will use
-           # "google.resumable_media.common.DataCorruption" despite the error not
-           # originating inside google.resumable_media.
+           # "google.resumable_media.common.DataCorruption" despite the error
+           # not originating inside google.resumable_media.
            download_url = blob._get_download_url(
                client,
                if_generation_match=download_kwargs.get("if_generation_match"),
                if_generation_not_match=download_kwargs.get("if_generation_not_match"),
                if_metageneration_match=download_kwargs.get("if_metageneration_match"),
-               if_metageneration_not_match=download_kwargs.get("if_metageneration_not_match"),
+               if_metageneration_not_match=download_kwargs.get(
+                   "if_metageneration_not_match"
+               ),
            )
-           raise DataCorruption(None, DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(download_url, expected_checksum, actual_checksum))
+           raise DataCorruption(
+               None,
+               DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(
+                   download_url, expected_checksum, actual_checksum
+               ),
+           )
    return None
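
For context, a minimal caller-side sketch of the new argument (bucket and object names are placeholders, not from this commit):

from google.cloud.storage import Client, transfer_manager

client = Client()
blob = client.bucket("my-bucket").blob("large-object")

# With crc32c_checksum=True, the per-chunk crc32c values are combined and
# compared to the object's stored checksum after all chunks finish;
# a mismatch raises google.resumable_media.common.DataCorruption.
transfer_manager.download_chunks_concurrently(
    blob, "/tmp/large-object", crc32c_checksum=True
)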


@@ -1200,18 +1199,19 @@ def _download_and_write_chunk_in_place(

    with _ChecksummingSparseFileWrapper(filename, start, crc32c_checksum) as f:
        blob._prep_and_do_download(f, start=start, end=end, **download_kwargs)
-       return (f.crc, (end-start)+1)
+       return (f.crc, (end - start) + 1)


-class _ChecksummingSparseFileWrapper():
+class _ChecksummingSparseFileWrapper:
    """A file wrapper that writes to a sparse file and optionally checksums.
    This wrapper only implements write() and does not inherit from `io` module
    base classes.
    """

    def __init__(self, filename, start_position, crc32c_enabled):
        # Open in mixed read/write mode to avoid truncating or appending
-       self.f = open(filename, 'rb+')
+       self.f = open(filename, "rb+")
        self.f.seek(start_position)
        self._crc = None
        self._crc32c_enabled = crc32c_enabled
@@ -1235,7 +1235,6 @@ def __exit__(self, exc_type, exc_value, tb):
        self.f.close()


def _call_method_on_maybe_pickled_blob(
    maybe_pickled_blob, method_name, *args, **kwargs
):
@@ -1304,6 +1303,38 @@ def _get_pool_class_and_requirements(worker_type):
        )


def _digest_ordered_checksum_and_size_pairs(checksum_and_size_pairs):
    base_crc = None
    zeroes = bytes(0)
    for part_crc, size in checksum_and_size_pairs:
        if not base_crc:
            base_crc = part_crc
        else:
            base_crc ^= 0xFFFFFFFF  # precondition

            # Zero pad base_crc32c. To conserve memory, do so with only
            # MAX_CRC32C_ZERO_ARRAY_SIZE at a time. Reuse the zeroes array where
            # possible.
            padded = 0
            while padded < size:
                desired_zeroes_size = min((size - padded), MAX_CRC32C_ZERO_ARRAY_SIZE)
                # resize zeroes array, unless we already have the correct size.
                zeroes = (
                    bytes(desired_zeroes_size)
                    if len(zeroes) != desired_zeroes_size
                    else zeroes
                )
                base_crc = google_crc32c.extend(base_crc, zeroes)
                padded += desired_zeroes_size

            base_crc ^= 0xFFFFFFFF  # postcondition
            base_crc ^= part_crc
    crc_digest = struct.pack(
        ">L", base_crc
    )  # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
    return crc_digest
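
A usage sketch for the helper above (not part of the commit): digesting ordered (crc, size) pairs for consecutive chunks should reproduce the packed crc32c of the full byte string.

import struct
import google_crc32c

data = bytes(range(256)) * 1024
chunks = [data[i : i + 65536] for i in range(0, len(data), 65536)]
pairs = [(google_crc32c.value(c), len(c)) for c in chunks]

# Combining the per-chunk checksums matches checksumming the whole.
assert _digest_ordered_checksum_and_size_pairs(pairs) == struct.pack(
    ">L", google_crc32c.value(data)
)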


class _LazyClient:
    """An object that will transform into either a cached or a new Client"""

66 changes: 62 additions & 4 deletions tests/unit/test_transfer_manager.py
@@ -546,15 +546,14 @@ def test_download_chunks_concurrently():
    expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy()
    expected_download_kwargs["command"] = "tm.download_sharded"

-   blob_mock._handle_filename_and_download.return_value = FAKE_RESULT

    with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
        result = transfer_manager.download_chunks_concurrently(
            blob_mock,
            FILENAME,
            chunk_size=CHUNK_SIZE,
            download_kwargs=DOWNLOAD_KWARGS,
            worker_type=transfer_manager.THREAD,
+           crc32c_checksum=False,
        )
    for x in range(MULTIPLE):
        blob_mock._prep_and_do_download.assert_any_call(
@@ -567,6 +566,34 @@
    assert result is None


def test_download_chunks_concurrently_with_crc32c():
    blob_mock = mock.Mock(spec=Blob)
    FILENAME = "file_a.txt"
    MULTIPLE = 4
    BLOB_CHUNK = b"abcdefgh"
    BLOB_CONTENTS = BLOB_CHUNK * MULTIPLE
    blob_mock.size = len(BLOB_CONTENTS)
    blob_mock.crc32c = "eOVVVw=="

    expected_download_kwargs = EXPECTED_DOWNLOAD_KWARGS.copy()
    expected_download_kwargs["command"] = "tm.download_sharded"

    def write_to_file(f, *args, **kwargs):
        f.write(BLOB_CHUNK)

    blob_mock._prep_and_do_download.side_effect = write_to_file

    with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
        transfer_manager.download_chunks_concurrently(
            blob_mock,
            FILENAME,
            chunk_size=CHUNK_SIZE,
            download_kwargs=DOWNLOAD_KWARGS,
            worker_type=transfer_manager.THREAD,
            crc32c_checksum=True,
        )
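
Per the mocks in this test, the fixture "eOVVVw==" is the base64-encoded big-endian crc32c of the full simulated contents (four chunks of b"abcdefgh"). A sketch of how such a fixture value can be derived:

import base64
import struct
import google_crc32c

fixture = base64.b64encode(
    struct.pack(">L", google_crc32c.value(b"abcdefgh" * 4))
).decode("utf-8")
# Expected to equal "eOVVVw==" per the test above.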


def test_download_chunks_concurrently_raises_on_start_and_end():
    blob_mock = mock.Mock(spec=Blob)
    FILENAME = "file_a.txt"
@@ -616,6 +643,7 @@ def test_download_chunks_concurrently_passes_concurrency_options():
            deadline=DEADLINE,
            worker_type=transfer_manager.THREAD,
            max_workers=MAX_WORKERS,
+           crc32c_checksum=False,
        )
    pool_patch.assert_called_with(max_workers=MAX_WORKERS)
    wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY)
@@ -819,6 +847,7 @@ def __init__(
        self.generation = generation
        self._size_after_reload = size_after_reload
        self._generation_after_reload = generation_after_reload
+       self.client = _PickleableMockClient()

    def reload(self):
        self.size = self._size_after_reload
@@ -876,6 +905,7 @@ def test_download_chunks_concurrently_with_processes():
            chunk_size=CHUNK_SIZE,
            download_kwargs=DOWNLOAD_KWARGS,
            worker_type=transfer_manager.PROCESS,
+           crc32c_checksum=False,
        )
    assert result is None

@@ -907,9 +937,9 @@ def test__download_and_write_chunk_in_place():
    FILENAME = "file_a.txt"
    with mock.patch("google.cloud.storage.transfer_manager.open", mock.mock_open()):
        result = transfer_manager._download_and_write_chunk_in_place(
-           pickled_mock, FILENAME, 0, 8, {}
+           pickled_mock, FILENAME, 0, 8, {}, False
        )
-   assert result == "SUCCESS"
+   assert result is not None


def test__upload_part():
@@ -973,3 +1003,31 @@ def test__call_method_on_maybe_pickled_blob():
        pickled_blob, "_prep_and_do_download"
    )
    assert result == "SUCCESS"


def test__ChecksummingSparseFileWrapper():
    FILENAME = "file_a.txt"
    import google_crc32c

    with mock.patch(
        "google.cloud.storage.transfer_manager.open", mock.mock_open()
    ) as open_mock:
        # test no checksumming
        wrapper = transfer_manager._ChecksummingSparseFileWrapper(FILENAME, 0, False)
        wrapper.write(b"abcdefgh")
        handle = open_mock()
        handle.write.assert_called_with(b"abcdefgh")
        wrapper.write(b"ijklmnop")
        assert wrapper.crc is None
        handle.write.assert_called_with(b"ijklmnop")

    with mock.patch(
        "google.cloud.storage.transfer_manager.open", mock.mock_open()
    ) as open_mock:
        wrapper = transfer_manager._ChecksummingSparseFileWrapper(FILENAME, 0, True)
        wrapper.write(b"abcdefgh")
        handle = open_mock()
        handle.write.assert_called_with(b"abcdefgh")
        wrapper.write(b"ijklmnop")
        assert wrapper.crc == google_crc32c.value(b"abcdefghijklmnop")
        handle.write.assert_called_with(b"ijklmnop")
