Skip to content

Commit

Permalink
Merge branch 'release-0.11.0'
Browse files Browse the repository at this point in the history
* release-0.11.0:
  Bumping version to 0.11.0
  Merge customizations for S3
  • Loading branch information
aws-sdk-python-automation committed Jan 15, 2025
2 parents 0323658 + aef3dfa commit 0778911
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 54 deletions.
7 changes: 7 additions & 0 deletions .changes/0.11.0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"category": "manager",
"description": "Use CRC32 by default and support user provided full-object checksums.",
"type": "feature"
}
]
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
CHANGELOG
=========

0.11.0
======

* feature:manager: Use CRC32 by default and support user provided full-object checksums.


0.10.4
======

Expand Down
12 changes: 11 additions & 1 deletion s3transfer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __call__(self, bytes_amount):
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.10.4'
__version__ = '0.11.0'


class NullHandler(logging.Handler):
Expand Down Expand Up @@ -717,13 +717,23 @@ class S3Transfer:

def __init__(self, client, config=None, osutil=None):
    """Create an S3Transfer manager.

    :param client: A botocore S3 client used for all transfer operations.
    :param config: Optional ``TransferConfig``; a default one is created
        when omitted.
    :param osutil: Optional ``OSUtils`` instance; a default one is created
        when omitted.
    """
    self._client = client
    # Ensure any user-specified checksum algorithm is sent in the request
    # headers for every S3 call made through this client.
    self._client.meta.events.register(
        'before-call.s3.*', self._update_checksum_context
    )
    self._config = config if config is not None else TransferConfig()
    self._osutil = osutil if osutil is not None else OSUtils()

def _update_checksum_context(self, params, **kwargs):
request_context = params.get("context", {})
checksum_context = request_context.get("checksum", {})
if "request_algorithm" in checksum_context:
# Force request checksum algorithm in the header if specified.
checksum_context["request_algorithm"]["in"] = "header"

def upload_file(
self, filename, bucket, key, callback=None, extra_args=None
):
Expand Down
8 changes: 8 additions & 0 deletions s3transfer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,13 @@
'ExpectedBucketOwner',
]

# Upload arguments that carry a user-provided full-object checksum value.
# When one of these is present, transfer code skips defaulting
# ChecksumAlgorithm (see s3transfer.utils.set_default_checksum_algorithm).
FULL_OBJECT_CHECKSUM_ARGS = [
    'ChecksumCRC32',
    'ChecksumCRC32C',
    'ChecksumCRC64NVME',
    'ChecksumSHA1',
    'ChecksumSHA256',
]

USER_AGENT = f's3transfer/{s3transfer.__version__}'
PROCESS_USER_AGENT = f'{USER_AGENT} processpool'
29 changes: 21 additions & 8 deletions s3transfer/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from botocore.exceptions import NoCredentialsError
from botocore.utils import ArnParser, InvalidArnException

from s3transfer.constants import MB
from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS, MB
from s3transfer.exceptions import TransferNotDoneError
from s3transfer.futures import BaseTransferFuture, BaseTransferMeta
from s3transfer.manager import TransferManager
Expand Down Expand Up @@ -491,6 +491,9 @@ def __init__(self, session, client_kwargs=None):
self._client.meta.events.register(
'before-send.s3.*', self._make_fake_http_response
)
self._client.meta.events.register(
'before-call.s3.*', self._remove_checksum_context
)

def _resolve_client_config(self, session, client_kwargs):
user_provided_config = None
Expand Down Expand Up @@ -620,6 +623,11 @@ def _translate_crt_s3_response_error(self, s3_response_error):
error_class = self._client.exceptions.from_code(error_code)
return error_class(parsed_response, operation_name=operation_name)

def _remove_checksum_context(self, params, **kwargs):
request_context = params.get("context", {})
if "checksum" in request_context:
del request_context["checksum"]


class FakeRawResponse(BytesIO):
def stream(self, amt=1024, decode_content=None):
Expand Down Expand Up @@ -786,13 +794,18 @@ def _get_make_request_args_put_object(
else:
call_args.extra_args["Body"] = call_args.fileobj

checksum_algorithm = call_args.extra_args.pop(
'ChecksumAlgorithm', 'CRC32'
).upper()
checksum_config = awscrt.s3.S3ChecksumConfig(
algorithm=awscrt.s3.S3ChecksumAlgorithm[checksum_algorithm],
location=awscrt.s3.S3ChecksumLocation.TRAILER,
)
checksum_config = None
if not any(
checksum_arg in call_args.extra_args
for checksum_arg in FULL_OBJECT_CHECKSUM_ARGS
):
checksum_algorithm = call_args.extra_args.pop(
'ChecksumAlgorithm', 'CRC32'
).upper()
checksum_config = awscrt.s3.S3ChecksumConfig(
algorithm=awscrt.s3.S3ChecksumAlgorithm[checksum_algorithm],
location=awscrt.s3.S3ChecksumLocation.TRAILER,
)
# Suppress botocore's automatic MD5 calculation by setting an override
# value that will get deleted in the BotocoreCRTRequestSerializer.
# As part of the CRT S3 request, we request the CRT S3 client to
Expand Down
32 changes: 23 additions & 9 deletions s3transfer/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, KB, MB
from s3transfer.constants import (
ALLOWED_DOWNLOAD_ARGS,
FULL_OBJECT_CHECKSUM_ARGS,
KB,
MB,
)
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
Expand All @@ -35,8 +40,8 @@
OSUtils,
SlidingWindowSemaphore,
TaskSemaphore,
add_s3express_defaults,
get_callbacks,
set_default_checksum_algorithm,
signal_not_transferring,
signal_transferring,
)
Expand Down Expand Up @@ -157,7 +162,7 @@ def _validate_attrs_are_nonzero(self):
class TransferManager:
ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

ALLOWED_UPLOAD_ARGS = [
_ALLOWED_SHARED_ARGS = [
'ACL',
'CacheControl',
'ChecksumAlgorithm',
Expand Down Expand Up @@ -187,7 +192,16 @@ class TransferManager:
'WebsiteRedirectLocation',
]

ALLOWED_COPY_ARGS = ALLOWED_UPLOAD_ARGS + [
# Upload-specific args: the shared S3 args plus the multipart full-object
# checksum controls (ChecksumType, MpuObjectSize) and the user-provided
# full-object checksum value arguments.
ALLOWED_UPLOAD_ARGS = (
    _ALLOWED_SHARED_ARGS
    + [
        'ChecksumType',
        'MpuObjectSize',
    ]
    + FULL_OBJECT_CHECKSUM_ARGS
)

ALLOWED_COPY_ARGS = _ALLOWED_SHARED_ARGS + [
'CopySourceIfMatch',
'CopySourceIfModifiedSince',
'CopySourceIfNoneMatch',
Expand Down Expand Up @@ -315,13 +329,13 @@ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the upload
"""
if extra_args is None:
extra_args = {}

extra_args = extra_args.copy() if extra_args else {}
if subscribers is None:
subscribers = []
self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
self._validate_if_bucket_supported(bucket)
self._add_operation_defaults(bucket, extra_args)
self._add_operation_defaults(extra_args)
call_args = CallArgs(
fileobj=fileobj,
bucket=bucket,
Expand Down Expand Up @@ -504,8 +518,8 @@ def _validate_all_known_args(self, actual, allowed):
"must be one of: {}".format(kwarg, ', '.join(allowed))
)

def _add_operation_defaults(self, bucket, extra_args):
add_s3express_defaults(bucket, extra_args)
def _add_operation_defaults(self, extra_args):
    """Apply default upload arguments to ``extra_args`` in place.

    Delegates to ``set_default_checksum_algorithm``, which defaults
    ``ChecksumAlgorithm`` unless the caller supplied a full-object
    checksum value.
    """
    set_default_checksum_algorithm(extra_args)

def _submit_transfer(
self, call_args, submission_task_cls, extra_main_kwargs=None
Expand Down
40 changes: 37 additions & 3 deletions s3transfer/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
CompleteMultipartUploadTask,
Expand Down Expand Up @@ -512,6 +513,10 @@ def _wrap_data(self, data, callbacks, close_callbacks):
class UploadSubmissionTask(SubmissionTask):
"""Task for submitting tasks to execute an upload"""

PUT_OBJECT_BLOCKLIST = ["ChecksumType", "MpuObjectSize"]

CREATE_MULTIPART_BLOCKLIST = FULL_OBJECT_CHECKSUM_ARGS + ["MpuObjectSize"]

UPLOAD_PART_ARGS = [
'ChecksumAlgorithm',
'SSECustomerKey',
Expand All @@ -527,7 +532,9 @@ class UploadSubmissionTask(SubmissionTask):
'SSECustomerKeyMD5',
'RequestPayer',
'ExpectedBucketOwner',
]
'ChecksumType',
'MpuObjectSize',
] + FULL_OBJECT_CHECKSUM_ARGS

def _get_upload_input_manager_cls(self, transfer_future):
"""Retrieves a class for managing input for an upload based on file type
Expand Down Expand Up @@ -621,6 +628,10 @@ def _submit_upload_request(
):
call_args = transfer_future.meta.call_args

put_object_extra_args = self._extra_put_object_args(
call_args.extra_args
)

# Get any tags that need to be associated to the put object task
put_object_tag = self._get_upload_task_tag(
upload_input_manager, 'put_object'
Expand All @@ -638,7 +649,7 @@ def _submit_upload_request(
),
'bucket': call_args.bucket,
'key': call_args.key,
'extra_args': call_args.extra_args,
'extra_args': put_object_extra_args,
},
is_final=True,
),
Expand All @@ -656,6 +667,19 @@ def _submit_multipart_request(
):
call_args = transfer_future.meta.call_args

# When a user provided checksum is passed, set "ChecksumType" to "FULL_OBJECT"
# and "ChecksumAlgorithm" to the related algorithm.
for checksum in FULL_OBJECT_CHECKSUM_ARGS:
if checksum in call_args.extra_args:
call_args.extra_args["ChecksumType"] = "FULL_OBJECT"
call_args.extra_args["ChecksumAlgorithm"] = checksum.replace(
"Checksum", ""
)

create_multipart_extra_args = self._extra_create_multipart_args(
call_args.extra_args
)

# Submit the request to create a multipart upload.
create_multipart_future = self._transfer_coordinator.submit(
request_executor,
Expand All @@ -665,7 +689,7 @@ def _submit_multipart_request(
'client': client,
'bucket': call_args.bucket,
'key': call_args.key,
'extra_args': call_args.extra_args,
'extra_args': create_multipart_extra_args,
},
),
)
Expand Down Expand Up @@ -739,6 +763,16 @@ def _extra_upload_part_args(self, extra_args):
def _extra_complete_multipart_args(self, extra_args):
    """Filter ``extra_args`` down to those valid for CompleteMultipartUpload."""
    return get_filtered_dict(
        extra_args, whitelisted_keys=self.COMPLETE_MULTIPART_ARGS
    )

def _extra_create_multipart_args(self, extra_args):
    """Filter out args that CreateMultipartUpload does not accept.

    Drops the full-object checksum values and ``MpuObjectSize`` via the
    class-level blocklist; everything else passes through.
    """
    return get_filtered_dict(
        extra_args,
        blocklisted_keys=self.CREATE_MULTIPART_BLOCKLIST,
    )

def _extra_put_object_args(self, extra_args):
    """Filter out args that PutObject does not accept.

    Drops the multipart-only arguments (``ChecksumType``,
    ``MpuObjectSize``) via the class-level blocklist.
    """
    return get_filtered_dict(
        extra_args,
        blocklisted_keys=self.PUT_OBJECT_BLOCKLIST,
    )

def _get_upload_task_tag(self, upload_input_manager, operation_name):
tag = None
if upload_input_manager.stores_body_in_memory(operation_name):
Expand Down
29 changes: 24 additions & 5 deletions s3transfer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
ReadTimeoutError,
ResponseStreamingError,
)
from botocore.httpchecksum import AwsChunkedWrapper
from botocore.httpchecksum import DEFAULT_CHECKSUM_ALGORITHM, AwsChunkedWrapper
from botocore.utils import is_s3express_bucket

from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
from s3transfer.constants import FULL_OBJECT_CHECKSUM_ARGS

MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
Expand Down Expand Up @@ -148,20 +149,27 @@ def invoke_progress_callbacks(callbacks, bytes_transferred):
callback(bytes_transferred=bytes_transferred)


def get_filtered_dict(original_dict, whitelisted_keys):
"""Gets a dictionary filtered by whitelisted keys
def get_filtered_dict(
    original_dict, whitelisted_keys=None, blocklisted_keys=None
):
    """Get a dictionary filtered by whitelisted and/or blocklisted keys.

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_keys: A list of keys to include in the filtered
        dictionary.
    :param blocklisted_keys: A list of keys to exclude from the filtered
        dictionary.
    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist and/or not included in the
        blocklist. If neither filter is provided, the result is empty.
    """
    # NOTE: when both filters are supplied, a key is kept if it satisfies
    # EITHER condition (whitelist membership OR absence from the blocklist).
    return {
        key: value
        for key, value in original_dict.items()
        if (whitelisted_keys and key in whitelisted_keys)
        or (blocklisted_keys and key not in blocklisted_keys)
    }

Expand Down Expand Up @@ -809,6 +817,17 @@ def _adjust_for_max_parts(self, current_chunksize, file_size):


def add_s3express_defaults(bucket, extra_args):
    """
    This function has been deprecated, but is kept for backwards compatibility.
    This function is subject to removal in a future release.
    """
    if is_s3express_bucket(bucket):
        # Transfer operations against S3 Express buckets default to CRC32.
        extra_args.setdefault("ChecksumAlgorithm", "crc32")


def set_default_checksum_algorithm(extra_args):
    """Set the default algorithm to CRC32 if not specified by the user.

    Skipped entirely when the caller already provided a full-object
    checksum value, since that value implies its own algorithm.
    """
    user_provided_checksum = any(
        arg in extra_args for arg in FULL_OBJECT_CHECKSUM_ARGS
    )
    if not user_provided_checksum:
        extra_args.setdefault("ChecksumAlgorithm", DEFAULT_CHECKSUM_ALGORITHM)
Loading

0 comments on commit 0778911

Please sign in to comment.