From f22d86b9cab0987bda682c1ea0400b1742384cbd Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Tue, 30 Apr 2024 20:13:16 +0400 Subject: [PATCH 01/10] Calculate checksum from local file if upload optimization succeeds --- api/python/quilt3/data_transfer.py | 132 +++++++++++++++++------------ api/python/quilt3/packages.py | 1 + 2 files changed, 79 insertions(+), 54 deletions(-) diff --git a/api/python/quilt3/data_transfer.py b/api/python/quilt3/data_transfer.py index 4973ced0810..50d97991868 100644 --- a/api/python/quilt3/data_transfer.py +++ b/api/python/quilt3/data_transfer.py @@ -530,43 +530,59 @@ def upload_part(i, start, end): ctx.run(upload_part, i, start, end) -def _upload_or_copy_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_path: str): +def _calculate_local_checksum(path: str, size: int): + chunksize = get_checksum_chunksize(size) + + part_hashes = [] + for start in range(0, size, chunksize): + end = min(start + chunksize, size) + part_hashes.append(_calculate_local_part_checksum(path, start, end - start)) + + return _make_checksum_from_parts(part_hashes) + + +def _reuse_remote_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_path: str): # Optimization: check if the remote file already exists and has the right ETag, # and skip the upload. - if size >= UPLOAD_ETAG_OPTIMIZATION_THRESHOLD: - try: - params = dict(Bucket=dest_bucket, Key=dest_path) - s3_client = ctx.s3_client_provider.find_correct_client(S3Api.HEAD_OBJECT, dest_bucket, params) - resp = s3_client.head_object(**params, ChecksumMode='ENABLED') - except ClientError: - # Destination doesn't exist, so fall through to the normal upload. - pass - except S3NoValidClientError: - # S3ClientProvider can't currently distinguish between a user that has PUT but not LIST permissions and a - # user that has no permissions. If we can't find a valid client, proceed to the upload stage anyway. - pass - else: - # Check the ETag. - dest_size = resp['ContentLength'] - dest_etag = resp['ETag'] - dest_version_id = resp.get('VersionId') - if size == dest_size and resp.get('ServerSideEncryption') != 'aws:kms': - src_etag = _calculate_etag(src_path) - if src_etag == dest_etag: - # Nothing more to do. We should not attempt to copy the object because - # that would cause the "copy object to itself" error. - # TODO: Check SHA256 before checking ETag? - s3_checksum = resp.get('ChecksumSHA256') - if s3_checksum is None: - checksum = None - elif '-' in s3_checksum: - checksum, _ = s3_checksum.split('-', 1) - else: - checksum = _simple_s3_to_quilt_checksum(s3_checksum) - ctx.progress(size) - ctx.done(PhysicalKey(dest_bucket, dest_path, dest_version_id), checksum) - return # Optimization succeeded. + if size < UPLOAD_ETAG_OPTIMIZATION_THRESHOLD: + return None + try: + params = dict(Bucket=dest_bucket, Key=dest_path) + s3_client = ctx.s3_client_provider.find_correct_client(S3Api.HEAD_OBJECT, dest_bucket, params) + resp = s3_client.head_object(**params, ChecksumMode='ENABLED') + except ClientError: + # Destination doesn't exist, so fall through to the normal upload. + pass + except S3NoValidClientError: + # S3ClientProvider can't currently distinguish between a user that has PUT but not LIST permissions and a + # user that has no permissions. If we can't find a valid client, proceed to the upload stage anyway. + pass + else: + dest_size = resp['ContentLength'] + if dest_size != size: + return None + # XXX: shouldn't we check part sizes? 
+ s3_checksum = resp.get('ChecksumSHA256') + if s3_checksum is not None: + if '-' in s3_checksum: + checksum, _ = s3_checksum.split('-', 1) + else: + checksum = _simple_s3_to_quilt_checksum(s3_checksum) + if checksum == _calculate_local_checksum(src_path, size): + return resp.get('VersionId'), checksum + elif resp.get('ServerSideEncryption') != 'aws:kms' and resp['ETag'] == _calculate_etag(src_path): + return resp.get('VersionId'), _calculate_local_checksum(src_path, size) + + return None + +def _upload_or_reuse_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_path: str): + result = _reuse_remote_file(ctx, size, src_path, dest_bucket, dest_path) + if result is not None: + dest_version_id, checksum = result + ctx.progress(size) + ctx.done(PhysicalKey(dest_bucket, dest_path, dest_version_id), checksum) + return # Optimization succeeded. # If the optimization didn't happen, do the normal upload. _upload_file(ctx, size, src_path, dest_bucket, dest_path) @@ -648,7 +664,7 @@ def done_callback(value, checksum): else: if dest.version_id: raise ValueError("Cannot set VersionId on destination") - _upload_or_copy_file(ctx, size, src.path, dest.bucket, dest.path) + _upload_or_reuse_file(ctx, size, src.path, dest.bucket, dest.path) else: if dest.is_local(): _download_file(ctx, size, src.bucket, src.path, src.version_id, dest.path) @@ -970,6 +986,29 @@ def wrapper(*args, **kwargs): return wrapper +# XXX: name +def _calculate_local_part_checksum(src: str, offset: int, length: int, callback=None) -> bytes: + hash_obj = hashlib.sha256() + bytes_remaining = length + with open(src, "rb") as fd: + fd.seek(offset) + while bytes_remaining > 0: + chunk = fd.read(min(s3_transfer_config.io_chunksize, bytes_remaining)) + if not chunk: + # Should not happen, but let's not get stuck in an infinite loop. + raise QuiltException("Unexpected end of file") + hash_obj.update(chunk) + if callback is not None: + callback(len(chunk)) + bytes_remaining -= len(chunk) + + return hash_obj.digest() + + +def _make_checksum_from_parts(parts: List[bytes]) -> str: # XXX: name + return binascii.b2a_base64(hashlib.sha256(b"".join(parts)).digest(), newline=False).decode() + + @retry(stop=stop_after_attempt(MAX_FIX_HASH_RETRIES), wait=wait_exponential(multiplier=1, min=1, max=10), retry=retry_if_result(lambda results: any(r is None or isinstance(r, Exception) for r in results)), @@ -990,21 +1029,10 @@ def _calculate_checksum_internal(src_list, sizes, results) -> List[bytes]: progress_update = with_lock(progress.update) def _process_url_part(src: PhysicalKey, offset: int, length: int): - hash_obj = hashlib.sha256() - if src.is_local(): - bytes_remaining = length - with open(src.path, 'rb') as fd: - fd.seek(offset) - while bytes_remaining > 0: - chunk = fd.read(min(s3_transfer_config.io_chunksize, bytes_remaining)) - if not chunk: - # Should not happen, but let's not get stuck in an infinite loop. 
- raise QuiltException("Unexpected end of file") - hash_obj.update(chunk) - progress_update(len(chunk)) - bytes_remaining -= len(chunk) + return _calculate_local_part_checksum(src.path, offset, length, progress_update) else: + hash_obj = hashlib.sha256() end = offset + length - 1 params = dict( Bucket=src.bucket, @@ -1026,7 +1054,7 @@ def _process_url_part(src: PhysicalKey, offset: int, length: int): except (ConnectionError, HTTPClientError, ReadTimeoutError) as ex: return ex - return hash_obj.digest() + return hash_obj.digest() futures: List[Tuple[int, List[Future]]] = [] @@ -1046,11 +1074,7 @@ def _process_url_part(src: PhysicalKey, offset: int, length: int): for idx, future_list in futures: future_results = [future.result() for future in future_list] exceptions = [ex for ex in future_results if isinstance(ex, Exception)] - if exceptions: - results[idx] = exceptions[0] - else: - hashes_hash = hashlib.sha256(b''.join(future_results)).digest() - results[idx] = binascii.b2a_base64(hashes_hash, newline=False).decode() + results[idx] = exceptions[0] if exceptions else _make_checksum_from_parts(future_results) finally: stopped = True for _, future_list in futures: diff --git a/api/python/quilt3/packages.py b/api/python/quilt3/packages.py index a9f5394b70d..2487a6ee08b 100644 --- a/api/python/quilt3/packages.py +++ b/api/python/quilt3/packages.py @@ -1541,6 +1541,7 @@ def check_hash_conficts(latest_hash): pkg._set(logical_key, new_entry) # Needed if the files already exist in S3, but were uploaded without ChecksumAlgorithm='SHA256'. + # XXX: do we need this now? pkg._fix_sha256() top_hash = pkg._calculate_top_hash(pkg._meta, pkg.walk()) From e19022bd5a9d8085472c7c0c84694466f16d2faa Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Wed, 1 May 2024 16:56:36 +0400 Subject: [PATCH 02/10] more work --- api/python/quilt3/data_transfer.py | 2 ++ api/python/quilt3/packages.py | 4 ---- api/python/tests/test_data_transfer.py | 5 ++++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/api/python/quilt3/data_transfer.py b/api/python/quilt3/data_transfer.py index 50d97991868..c6a03d741cc 100644 --- a/api/python/quilt3/data_transfer.py +++ b/api/python/quilt3/data_transfer.py @@ -562,6 +562,8 @@ def _reuse_remote_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket if dest_size != size: return None # XXX: shouldn't we check part sizes? + # XXX: we could check hashes of parts, to finish faster + # XXX: support other checksum algorithms? s3_checksum = resp.get('ChecksumSHA256') if s3_checksum is not None: if '-' in s3_checksum: diff --git a/api/python/quilt3/packages.py b/api/python/quilt3/packages.py index 2487a6ee08b..4a4affedd03 100644 --- a/api/python/quilt3/packages.py +++ b/api/python/quilt3/packages.py @@ -1540,10 +1540,6 @@ def check_hash_conficts(latest_hash): new_entry.hash = dict(type=SHA256_CHUNKED_HASH_NAME, value=checksum) pkg._set(logical_key, new_entry) - # Needed if the files already exist in S3, but were uploaded without ChecksumAlgorithm='SHA256'. - # XXX: do we need this now? 
- pkg._fix_sha256() - top_hash = pkg._calculate_top_hash(pkg._meta, pkg.walk()) if dedupe and top_hash == latest_hash: diff --git a/api/python/tests/test_data_transfer.py b/api/python/tests/test_data_transfer.py index 3c9549cb502..8b6d65511e5 100644 --- a/api/python/tests/test_data_transfer.py +++ b/api/python/tests/test_data_transfer.py @@ -280,7 +280,10 @@ def test_upload_large_file_etag_match(self): urls = data_transfer.copy_file_list([ (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), ]) - assert urls[0] == (PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), None) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), + "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", + ) def test_upload_large_file_etag_mismatch(self): path = DATA_DIR / 'large_file.npy' From 57030ac843429d679e3ad1902aa2e14dbd21ad40 Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Wed, 1 May 2024 18:32:37 +0400 Subject: [PATCH 03/10] changelog --- docs/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e3d2fae8745..c6ba19ac9af 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,8 @@ Entries inside each section should be ordered by type: ## Python API * [Removed] Drop Python 3.8 support ([#3993](https://github.com/quiltdata/quilt/pull/3993)) +* [Fixed] If upload optimization during `push()` succeeds the checksum is calculated from local file instead of remote file ([#3968](https://github.com/quiltdata/quilt/pull/3968)) +* [Changed] Upload optimization check now tries to use S3 SHA-256 checksum and fallbacks to ETag ([#3968](https://github.com/quiltdata/quilt/pull/3968)) ## CLI From 1a2b177c00e643baed0f47d1675add6ae7e5d6ed Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Wed, 1 May 2024 18:43:37 +0400 Subject: [PATCH 04/10] blackify --- api/python/quilt3/data_transfer.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/api/python/quilt3/data_transfer.py b/api/python/quilt3/data_transfer.py index c6a03d741cc..4093d1db2cf 100644 --- a/api/python/quilt3/data_transfer.py +++ b/api/python/quilt3/data_transfer.py @@ -549,7 +549,7 @@ def _reuse_remote_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket try: params = dict(Bucket=dest_bucket, Key=dest_path) s3_client = ctx.s3_client_provider.find_correct_client(S3Api.HEAD_OBJECT, dest_bucket, params) - resp = s3_client.head_object(**params, ChecksumMode='ENABLED') + resp = s3_client.head_object(**params, ChecksumMode="ENABLED") except ClientError: # Destination doesn't exist, so fall through to the normal upload. pass @@ -558,22 +558,22 @@ def _reuse_remote_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket # user that has no permissions. If we can't find a valid client, proceed to the upload stage anyway. pass else: - dest_size = resp['ContentLength'] + dest_size = resp["ContentLength"] if dest_size != size: return None # XXX: shouldn't we check part sizes? # XXX: we could check hashes of parts, to finish faster # XXX: support other checksum algorithms? 
- s3_checksum = resp.get('ChecksumSHA256') + s3_checksum = resp.get("ChecksumSHA256") if s3_checksum is not None: - if '-' in s3_checksum: - checksum, _ = s3_checksum.split('-', 1) + if "-" in s3_checksum: + checksum, _ = s3_checksum.split("-", 1) else: checksum = _simple_s3_to_quilt_checksum(s3_checksum) if checksum == _calculate_local_checksum(src_path, size): - return resp.get('VersionId'), checksum - elif resp.get('ServerSideEncryption') != 'aws:kms' and resp['ETag'] == _calculate_etag(src_path): - return resp.get('VersionId'), _calculate_local_checksum(src_path, size) + return resp.get("VersionId"), checksum + elif resp.get("ServerSideEncryption") != "aws:kms" and resp["ETag"] == _calculate_etag(src_path): + return resp.get("VersionId"), _calculate_local_checksum(src_path, size) return None From dcbc7cce02836296072fd608f413f28cef70b09d Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Wed, 1 May 2024 19:19:18 +0400 Subject: [PATCH 05/10] remove some XXX comments --- api/python/quilt3/data_transfer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/python/quilt3/data_transfer.py b/api/python/quilt3/data_transfer.py index 4093d1db2cf..7ec57f1e93e 100644 --- a/api/python/quilt3/data_transfer.py +++ b/api/python/quilt3/data_transfer.py @@ -988,7 +988,6 @@ def wrapper(*args, **kwargs): return wrapper -# XXX: name def _calculate_local_part_checksum(src: str, offset: int, length: int, callback=None) -> bytes: hash_obj = hashlib.sha256() bytes_remaining = length @@ -1007,7 +1006,7 @@ def _calculate_local_part_checksum(src: str, offset: int, length: int, callback= return hash_obj.digest() -def _make_checksum_from_parts(parts: List[bytes]) -> str: # XXX: name +def _make_checksum_from_parts(parts: List[bytes]) -> str: return binascii.b2a_base64(hashlib.sha256(b"".join(parts)).digest(), newline=False).decode() From ce905ff67e6b24718488a9743cb87109761f4c61 Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Thu, 2 May 2024 11:23:26 +0400 Subject: [PATCH 06/10] add some tests --- api/python/tests/test_data_transfer.py | 52 ++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/api/python/tests/test_data_transfer.py b/api/python/tests/test_data_transfer.py index 8b6d65511e5..1b862d4ab04 100644 --- a/api/python/tests/test_data_transfer.py +++ b/api/python/tests/test_data_transfer.py @@ -325,6 +325,58 @@ def test_upload_large_file_etag_mismatch(self): 'Ij4KFgr52goD5t0sRxnFb11mpjPL6E54qqnzc1hlUio=', ) + def test_upload_file_checksum_match(self): + path = DATA_DIR / 'large_file.npy' + + self.s3_stubber.add_response( + method='head_object', + service_response={ + 'ContentLength': path.stat().st_size, + 'ETag': data_transfer._calculate_etag(path), + 'VersionId': 'v1', + 'ChecksumSHA256': 'J+KTXLmOXrP7AmRZQQZWSj6DznTh7TbeeP6YbL1j+5w=', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumMode': 'ENABLED', + } + ) + + urls = data_transfer.copy_file_list([ + (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), + ]) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), + "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", + ) + + def test_upload_file_checksum_multipart_match(self): + path = DATA_DIR / 'large_file.npy' + + self.s3_stubber.add_response( + method='head_object', + service_response={ + 'ContentLength': path.stat().st_size, + 'ETag': data_transfer._calculate_etag(path), + 'VersionId': 'v1', + 'ChecksumSHA256': 
'IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=-1', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumMode': 'ENABLED', + } + ) + + urls = data_transfer.copy_file_list([ + (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), + ]) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), + "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", + ) + def test_multipart_upload(self): name = 'very_large_file.bin' path = pathlib.Path(name) From d11e4030c8345d31c6ceffc30520c27a3d50e9dd Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Thu, 2 May 2024 11:46:10 +0400 Subject: [PATCH 07/10] one more test --- api/python/tests/test_data_transfer.py | 45 ++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/api/python/tests/test_data_transfer.py b/api/python/tests/test_data_transfer.py index 1b862d4ab04..e979575698d 100644 --- a/api/python/tests/test_data_transfer.py +++ b/api/python/tests/test_data_transfer.py @@ -332,7 +332,7 @@ def test_upload_file_checksum_match(self): method='head_object', service_response={ 'ContentLength': path.stat().st_size, - 'ETag': data_transfer._calculate_etag(path), + 'ETag': '"123"', 'VersionId': 'v1', 'ChecksumSHA256': 'J+KTXLmOXrP7AmRZQQZWSj6DznTh7TbeeP6YbL1j+5w=', }, @@ -358,7 +358,7 @@ def test_upload_file_checksum_multipart_match(self): method='head_object', service_response={ 'ContentLength': path.stat().st_size, - 'ETag': data_transfer._calculate_etag(path), + 'ETag': '"123"', 'VersionId': 'v1', 'ChecksumSHA256': 'IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=-1', }, @@ -377,6 +377,47 @@ def test_upload_file_checksum_multipart_match(self): "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", ) + def test_upload_file_size_mismatch(self): + path = DATA_DIR / 'large_file.npy' + + self.s3_stubber.add_response( + method='head_object', + service_response={ + 'ContentLength': path.stat().st_size + 1, + 'ETag': data_transfer._calculate_etag(path), + 'VersionId': 'v1', + 'ChecksumSHA256': 'IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=-1', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumMode': 'ENABLED', + } + ) + + self.s3_stubber.add_response( + method='put_object', + service_response={ + 'VersionId': 'v2', + # b2a_base64(a2b_hex(b'0123456789abcdef0123456789abcdef')) + 'ChecksumSHA256': 'ASNFZ4mrze8BI0VniavN7w==', + }, + expected_params={ + 'Body': ANY, + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumAlgorithm': 'SHA256', + } + ) + + urls = data_transfer.copy_file_list([ + (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), + ]) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v2'), + "Ij4KFgr52goD5t0sRxnFb11mpjPL6E54qqnzc1hlUio=", + ) + def test_multipart_upload(self): name = 'very_large_file.bin' path = pathlib.Path(name) From 5a6da4b8ccfe05b7f2a31f682743e89452a86b54 Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Mon, 13 May 2024 14:32:32 +0400 Subject: [PATCH 08/10] add _fix_sha256() back --- api/python/quilt3/packages.py | 3 +++ api/python/tests/integration/test_packages.py | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/api/python/quilt3/packages.py b/api/python/quilt3/packages.py index 4a4affedd03..342e59967bc 100644 --- a/api/python/quilt3/packages.py +++ b/api/python/quilt3/packages.py @@ -1540,6 +1540,9 @@ def check_hash_conficts(latest_hash): 
new_entry.hash = dict(type=SHA256_CHUNKED_HASH_NAME, value=checksum) pkg._set(logical_key, new_entry) + # Some entries may miss hash values (e.g because of selector_fn), so we need + # to fix them before calculating the top hash. + pkg._fix_sha256() top_hash = pkg._calculate_top_hash(pkg._meta, pkg.walk()) if dedupe and top_hash == latest_hash: diff --git a/api/python/tests/integration/test_packages.py b/api/python/tests/integration/test_packages.py index 84bcb1fd1a2..bcab2f2bab8 100644 --- a/api/python/tests/integration/test_packages.py +++ b/api/python/tests/integration/test_packages.py @@ -1914,10 +1914,11 @@ def test_push_selector_fn_false(self): selector_fn = mock.MagicMock(return_value=False) push_manifest_mock = self.patch_s3_registry('push_manifest') self.patch_s3_registry('shorten_top_hash', return_value='7a67ff4') - with patch('quilt3.packages.calculate_checksum', return_value=[('SHA256', "a" * 64)]): + with patch('quilt3.packages.calculate_checksum', return_value=["a" * 64]) as calculate_checksum_mock: pkg.push(pkg_name, registry=f's3://{dst_bucket}', selector_fn=selector_fn, force=True) selector_fn.assert_called_once_with(lk, pkg[lk]) + calculate_checksum_mock.assert_called_once_with([PhysicalKey(src_bucket, src_key, src_version)], [0]) push_manifest_mock.assert_called_once_with(pkg_name, mock.sentinel.top_hash, ANY) assert Package.load( BytesIO(push_manifest_mock.call_args[0][2]) @@ -1960,10 +1961,11 @@ def test_push_selector_fn_true(self): ) push_manifest_mock = self.patch_s3_registry('push_manifest') self.patch_s3_registry('shorten_top_hash', return_value='7a67ff4') - with patch('quilt3.packages.calculate_checksum', return_value=["a" * 64]): + with patch('quilt3.packages.calculate_checksum', return_value=[]) as calculate_checksum_mock: pkg.push(pkg_name, registry=f's3://{dst_bucket}', selector_fn=selector_fn, force=True) selector_fn.assert_called_once_with(lk, pkg[lk]) + calculate_checksum_mock.assert_called_once_with([], []) push_manifest_mock.assert_called_once_with(pkg_name, mock.sentinel.top_hash, ANY) assert Package.load( BytesIO(push_manifest_mock.call_args[0][2]) From fabb939ed669e7730f5061d34e60adf04d77b565 Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Tue, 14 May 2024 11:38:42 +0400 Subject: [PATCH 09/10] Update docs/CHANGELOG.md Co-authored-by: Alexei Mochalov --- docs/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c6ba19ac9af..82528906f1d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,7 +19,7 @@ Entries inside each section should be ordered by type: * [Removed] Drop Python 3.8 support ([#3993](https://github.com/quiltdata/quilt/pull/3993)) * [Fixed] If upload optimization during `push()` succeeds the checksum is calculated from local file instead of remote file ([#3968](https://github.com/quiltdata/quilt/pull/3968)) -* [Changed] Upload optimization check now tries to use S3 SHA-256 checksum and fallbacks to ETag ([#3968](https://github.com/quiltdata/quilt/pull/3968)) +* [Changed] Upload optimization check now tries to use S3 SHA-256 checksum and falls back to ETag ([#3968](https://github.com/quiltdata/quilt/pull/3968)) ## CLI From b40d45dda73b2ea06b865ffd1a64238fc9a47189 Mon Sep 17 00:00:00 2001 From: Sergey Fedoseev Date: Thu, 13 Jun 2024 16:32:30 +0400 Subject: [PATCH 10/10] check number of parts --- api/python/quilt3/data_transfer.py | 21 +++-- api/python/tests/test_data_transfer.py | 123 ++++++++++++++++++++++++- 2 files changed, 
134 insertions(+), 10 deletions(-) diff --git a/api/python/quilt3/data_transfer.py b/api/python/quilt3/data_transfer.py index 7ec57f1e93e..f31c28cc93f 100644 --- a/api/python/quilt3/data_transfer.py +++ b/api/python/quilt3/data_transfer.py @@ -268,6 +268,10 @@ def get_checksum_chunksize(file_size: int) -> int: return chunksize +def is_mpu(file_size: int) -> bool: + return file_size >= CHECKSUM_MULTIPART_THRESHOLD + + _EMPTY_STRING_SHA256 = hashlib.sha256(b'').digest() @@ -303,7 +307,7 @@ def _copy_local_file(ctx: WorkerContext, size: int, src_path: str, dest_path: st def _upload_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket: str, dest_key: str): s3_client = ctx.s3_client_provider.standard_client - if size < CHECKSUM_MULTIPART_THRESHOLD: + if not is_mpu(size): with ReadFileChunk.from_filename(src_path, 0, size, [ctx.progress]) as fd: resp = s3_client.put_object( Body=fd, @@ -460,7 +464,7 @@ def _copy_remote_file(ctx: WorkerContext, size: int, src_bucket: str, src_key: s s3_client = ctx.s3_client_provider.standard_client - if size < CHECKSUM_MULTIPART_THRESHOLD: + if not is_mpu(size): params: Dict[str, Any] = dict( CopySource=src_params, Bucket=dest_bucket, @@ -561,16 +565,17 @@ def _reuse_remote_file(ctx: WorkerContext, size: int, src_path: str, dest_bucket dest_size = resp["ContentLength"] if dest_size != size: return None - # XXX: shouldn't we check part sizes? - # XXX: we could check hashes of parts, to finish faster - # XXX: support other checksum algorithms? + # TODO: we could check hashes of parts, to finish faster s3_checksum = resp.get("ChecksumSHA256") if s3_checksum is not None: if "-" in s3_checksum: - checksum, _ = s3_checksum.split("-", 1) + checksum, num_parts_str = s3_checksum.split("-", 1) + num_parts = int(num_parts_str) else: checksum = _simple_s3_to_quilt_checksum(s3_checksum) - if checksum == _calculate_local_checksum(src_path, size): + num_parts = None + expected_num_parts = math.ceil(size / get_checksum_chunksize(size)) if is_mpu(size) else None + if num_parts == expected_num_parts and checksum == _calculate_local_checksum(src_path, size): return resp.get("VersionId"), checksum elif resp.get("ServerSideEncryption") != "aws:kms" and resp["ETag"] == _calculate_etag(src_path): return resp.get("VersionId"), _calculate_local_checksum(src_path, size) @@ -719,7 +724,7 @@ def _calculate_etag(file_path): """ size = pathlib.Path(file_path).stat().st_size with open(file_path, 'rb') as fd: - if size < CHECKSUM_MULTIPART_THRESHOLD: + if not is_mpu(size): contents = fd.read() etag = hashlib.md5(contents).hexdigest() else: diff --git a/api/python/tests/test_data_transfer.py b/api/python/tests/test_data_transfer.py index e979575698d..05a71597a98 100644 --- a/api/python/tests/test_data_transfer.py +++ b/api/python/tests/test_data_transfer.py @@ -327,6 +327,7 @@ def test_upload_large_file_etag_mismatch(self): def test_upload_file_checksum_match(self): path = DATA_DIR / 'large_file.npy' + assert path.stat().st_size < data_transfer.CHECKSUM_MULTIPART_THRESHOLD self.s3_stubber.add_response( method='head_object', @@ -351,8 +352,9 @@ def test_upload_file_checksum_match(self): "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", ) - def test_upload_file_checksum_multipart_match(self): + def test_upload_file_checksum_match_unexpected_parts(self): path = DATA_DIR / 'large_file.npy' + assert path.stat().st_size < data_transfer.CHECKSUM_MULTIPART_THRESHOLD self.s3_stubber.add_response( method='head_object', @@ -369,12 +371,129 @@ def test_upload_file_checksum_multipart_match(self): } 
) + self.s3_stubber.add_response( + method='put_object', + service_response={ + 'VersionId': 'v2', + # b2a_base64(a2b_hex(b'0123456789abcdef0123456789abcdef')) + 'ChecksumSHA256': 'ASNFZ4mrze8BI0VniavN7w==', + }, + expected_params={ + 'Body': ANY, + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumAlgorithm': 'SHA256', + } + ) + + urls = data_transfer.copy_file_list([ + (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), + ]) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v2'), + "Ij4KFgr52goD5t0sRxnFb11mpjPL6E54qqnzc1hlUio=", + ) + + def test_upload_file_checksum_multipart_match(self): + path = pathlib.Path("test-file") + path.write_bytes(bytes(data_transfer.CHECKSUM_MULTIPART_THRESHOLD)) + + self.s3_stubber.add_response( + method='head_object', + service_response={ + 'ContentLength': path.stat().st_size, + 'ETag': '"123"', + 'VersionId': 'v1', + 'ChecksumSHA256': 'MIsGKY+ykqN4CPj3gGGu4Gv03N7OWKWpsZqEf+OrGJs=-1', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumMode': 'ENABLED', + } + ) + urls = data_transfer.copy_file_list([ (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), ]) assert urls[0] == ( PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), - "IsygGcHBbQgZ3DCzdPy9+0od5VqDJjcW4R0mF2v/Bu8=", + "MIsGKY+ykqN4CPj3gGGu4Gv03N7OWKWpsZqEf+OrGJs=", + ) + + def test_upload_file_checksum_multipart_match_unexpected_parts(self): + path = pathlib.Path("test-file") + path.write_bytes(bytes(data_transfer.CHECKSUM_MULTIPART_THRESHOLD)) + + self.s3_stubber.add_response( + method='head_object', + service_response={ + 'ContentLength': path.stat().st_size, + 'ETag': '"123"', + 'VersionId': 'v1', + 'ChecksumSHA256': 'La6x82CVtEsxhBCz9Oi12Yncx7sCPRQmxJLasKMFPnQ=', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumMode': 'ENABLED', + } + ) + + self.s3_stubber.add_response( + method='create_multipart_upload', + service_response={ + 'UploadId': '123' + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'ChecksumAlgorithm': 'SHA256', + } + ) + self.s3_stubber.add_response( + method='upload_part', + service_response={ + 'ETag': '"123"', + 'ChecksumSHA256': 'La6x82CVtEsxhBCz9Oi12Yncx7sCPRQmxJLasKMFPnQ=', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'UploadId': '123', + 'Body': ANY, + 'PartNumber': 1, + 'ChecksumAlgorithm': 'SHA256', + } + ) + self.s3_stubber.add_response( + method='complete_multipart_upload', + service_response={ + 'ChecksumSHA256': "MIsGKY+ykqN4CPj3gGGu4Gv03N7OWKWpsZqEf+OrGJs=-1", + 'VersionId': 'v1', + }, + expected_params={ + 'Bucket': 'example', + 'Key': 'large_file.npy', + 'UploadId': '123', + 'MultipartUpload': { + 'Parts': [ + { + 'ETag': '"123"', + 'ChecksumSHA256': 'La6x82CVtEsxhBCz9Oi12Yncx7sCPRQmxJLasKMFPnQ=', + 'PartNumber': 1, + }, + ] + } + } + ) + + urls = data_transfer.copy_file_list([ + (PhysicalKey.from_path(path), PhysicalKey.from_url('s3://example/large_file.npy'), path.stat().st_size), + ]) + assert urls[0] == ( + PhysicalKey.from_url('s3://example/large_file.npy?versionId=v1'), + "MIsGKY+ykqN4CPj3gGGu4Gv03N7OWKWpsZqEf+OrGJs=", ) def test_upload_file_size_mismatch(self):
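
Note on the checksum scheme used throughout these patches: the helpers introduced in PATCH 01 (_calculate_local_part_checksum and _make_checksum_from_parts) compute the same "checksum of checksums" that S3 reports for multipart uploads with ChecksumAlgorithm='SHA256': each part is hashed, then the concatenated part digests are hashed and base64-encoded, and S3 appends "-<number of parts>" to the value. That part count is what PATCH 10 compares against math.ceil(size / get_checksum_chunksize(size)) before reusing a remote object. Below is a minimal standalone sketch of the scheme, not the quilt3 implementation itself; the fixed 8 MiB part size and 1 MiB read size are illustrative assumptions, whereas quilt3 derives its chunk size from the file size via get_checksum_chunksize() and reads in s3_transfer_config.io_chunksize blocks.

import binascii
import hashlib
from typing import List

PART_SIZE = 8 * 1024 * 1024  # illustrative; quilt3 uses get_checksum_chunksize(size)
READ_SIZE = 1024 * 1024      # illustrative; quilt3 uses s3_transfer_config.io_chunksize


def _part_sha256(path: str, offset: int, length: int) -> bytes:
    # SHA-256 digest of a single part, read in bounded chunks.
    digest = hashlib.sha256()
    remaining = length
    with open(path, "rb") as fd:
        fd.seek(offset)
        while remaining > 0:
            chunk = fd.read(min(READ_SIZE, remaining))
            if not chunk:
                raise EOFError("unexpected end of file")
            digest.update(chunk)
            remaining -= len(chunk)
    return digest.digest()


def chunked_sha256(path: str, size: int) -> str:
    # Hash every fixed-size part, then hash the concatenation of the part
    # digests and base64-encode the result ("checksum of checksums").
    parts: List[bytes] = [
        _part_sha256(path, start, min(PART_SIZE, size - start))
        for start in range(0, size, PART_SIZE)
    ]
    return binascii.b2a_base64(
        hashlib.sha256(b"".join(parts)).digest(), newline=False
    ).decode()

Because the local value is built the same way S3 builds a composite SHA-256 checksum, the reuse check in _reuse_remote_file can compare the stripped "ChecksumSHA256" header directly against the locally computed value once the part count matches.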