Skip to content

Commit

Permalink
Support request level part size configuration (#539)
Browse files Browse the repository at this point in the history
  -  Add support for request-level overrides of part size and multipart upload threshold
  -  Pass `did_validate_checksum` and `checksum_validation_algorithm` to the on_done callback.
  -  Submodules update for several S3 related improvements:
       - Fix Get-Object with partNumber
       - Skip HeaderRequest for ranged get
       - Cancelling/pausing an S3 request is now faster (ongoing HTTP requests are now cancelled)


Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
TingDaoK and graebm committed Dec 30, 2023
1 parent 9a91961 commit 3b6c349
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 12 deletions.
68 changes: 60 additions & 8 deletions awscrt/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ class S3Client(NativeResource):
for each connection, unless `tls_mode` is :attr:`S3RequestTlsMode.DISABLED`
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, S3 requires the part size greater than 5 MiB.
(8*1024*1024 by default)
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
This only affects :attr:`S3RequestType.PUT_OBJECT` request.
Uploads over this size will use the multipart upload strategy.
Uploads this size or less will use a single request.
If not set, `part_size` is used as the threshold.
If not set, the larger of `part_size` and 5 MiB will be used.
throughput_target_gbps (Optional[float]): Throughput target in
Gigabits per second (Gbps) that we are trying to reach.
Expand Down Expand Up @@ -296,6 +297,8 @@ def make_request(
signing_config=None,
credential_provider=None,
checksum_config=None,
part_size=None,
multipart_upload_threshold=None,
on_headers=None,
on_body=None,
on_done=None,
Expand Down Expand Up @@ -347,6 +350,20 @@ def make_request(
checksum_config (Optional[S3ChecksumConfig]): Optional checksum settings.
part_size (Optional[int]): Size, in bytes, of parts that files will be downloaded or uploaded in.
If not set, the part size configured for the client will be used.
Note: for :attr:`S3RequestType.PUT_OBJECT` request, client will adjust the part size to meet the service limits.
(max number of parts per upload is 10,000, minimum upload part size is 5 MiB)
multipart_upload_threshold (Optional[int]): The size threshold in bytes, for when to use multipart uploads.
This only affects :attr:`S3RequestType.PUT_OBJECT` request.
Uploads over this size will use the multipart upload strategy.
Uploads this size or less will use a single request.
If set, this should be at least `part_size`.
If not set, `part_size` adjusted by client will be used as the threshold.
If both `part_size` and `multipart_upload_threshold` are not set,
the values from `aws_s3_client_config` are used.
on_headers: Optional callback invoked when the response headers are received. Even if the
API request has been split into multiple parts, this callback is invoked only once,
because it is still a single API request to S3.
Expand Down Expand Up @@ -401,6 +418,13 @@ def make_request(
this is the final response's status code. If the operation
failed for another reason, None is returned.
* `did_validate_checksum` (bool):
Whether the server-side checksum was compared against a checksum calculated from the response body.
This may be False even if :attr:`S3ChecksumConfig.validate_response` was set, because
the object was uploaded without a checksum, or was downloaded differently from how it was uploaded.
* `checksum_validation_algorithm` (Optional[S3ChecksumAlgorithm]): The checksum algorithm used to validate the response.
* `**kwargs` (dict): Forward-compatibility kwargs.
on_progress: Optional callback invoked when part of the transfer is done to report the progress.
Expand All @@ -423,6 +447,8 @@ def make_request(
signing_config=signing_config,
credential_provider=credential_provider,
checksum_config=checksum_config,
part_size=part_size,
multipart_upload_threshold=multipart_upload_threshold,
on_headers=on_headers,
on_body=on_body,
on_done=on_done,
Expand Down Expand Up @@ -458,6 +484,8 @@ def __init__(
signing_config=None,
credential_provider=None,
checksum_config=None,
part_size=None,
multipart_upload_threshold=None,
on_headers=None,
on_body=None,
on_done=None,
Expand All @@ -468,14 +496,21 @@ def __init__(
assert callable(on_headers) or on_headers is None
assert callable(on_body) or on_body is None
assert callable(on_done) or on_done is None
assert isinstance(part_size, int) or part_size is None
assert isinstance(multipart_upload_threshold, int) or multipart_upload_threshold is None

super().__init__()

self._finished_future = Future()
self.shutdown_event = threading.Event()

checksum_algorithm = 0 # 0 means NONE in C
checksum_location = 0 # 0 means NONE in C
# C layer uses 0 to indicate defaults
if part_size is None:
part_size = 0
if multipart_upload_threshold is None:
multipart_upload_threshold = 0
checksum_algorithm = 0
checksum_location = 0
validate_response_checksum = False
if checksum_config is not None:
if checksum_config.algorithm is not None:
Expand Down Expand Up @@ -509,6 +544,8 @@ def __init__(
checksum_algorithm,
checksum_location,
validate_response_checksum,
part_size,
multipart_upload_threshold,
s3_request_core)

@property
Expand Down Expand Up @@ -623,15 +660,22 @@ def _on_body(self, chunk, offset):
# Shutdown hook: sets `self._shutdown_event` so that anything waiting on the event is released.
# NOTE(review): presumably invoked by the native layer once shutdown has completed — confirm.
def _on_shutdown(self):
self._shutdown_event.set()

def _on_finish(self, error_code, status_code, error_headers, error_body, error_operation_name):
def _on_finish(
self,
error_code,
status_code,
error_headers,
error_body,
error_operation_name,
did_validate_checksum,
checksum_validation_algorithm):
# If C layer gives status_code 0, that means "unknown"
if status_code == 0:
status_code = None

error = None
if error_code:
error = awscrt.exceptions.from_code(error_code)

if isinstance(error, awscrt.exceptions.AwsCrtError):
if (error.name == "AWS_ERROR_CRT_CALLBACK_EXCEPTION"
and self._python_callback_exception is not None):
Expand All @@ -651,13 +695,21 @@ def _on_finish(self, error_code, status_code, error_headers, error_body, error_o
self._finished_future.set_exception(error)
else:
self._finished_future.set_result(None)

if checksum_validation_algorithm:
checksum_validation_algorithm = S3ChecksumAlgorithm(checksum_validation_algorithm)
else:
checksum_validation_algorithm = None

if self._on_done_cb:
self._on_done_cb(
error=error,
error_headers=error_headers,
error_body=error_body,
error_operation_name=error_operation_name,
status_code=status_code)
status_code=status_code,
did_validate_checksum=did_validate_checksum,
checksum_validation_algorithm=checksum_validation_algorithm)

def _on_progress(self, progress):
if self._on_progress_cb:
Expand Down
14 changes: 11 additions & 3 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ static void s_s3_request_on_finish(
result = PyObject_CallMethod(
request_binding->py_core,
"_on_finish",
"(iiOy#s)",
"(iiOy#sOi)",
error_code,
meta_request_result->response_status,
header_list ? header_list : Py_None,
(const char *)(error_body.buffer),
(Py_ssize_t)error_body.len,
operation_name);
operation_name,
meta_request_result->did_validate ? Py_True : Py_False,
(int)meta_request_result->validation_algorithm);

if (result) {
Py_DECREF(result);
Expand Down Expand Up @@ -372,10 +374,12 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
enum aws_s3_checksum_algorithm checksum_algorithm; /* i */
enum aws_s3_checksum_location checksum_location; /* i */
int validate_response_checksum; /* p - boolean predicate */
uint64_t part_size; /* K */
uint64_t multipart_upload_threshold; /* K */
PyObject *py_core; /* O */
if (!PyArg_ParseTuple(
args,
"OOOizOOzzs#iipO",
"OOOizOOzzs#iipKKO",
&py_s3_request,
&s3_client_py,
&http_request_py,
Expand All @@ -390,6 +394,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
&checksum_algorithm,
&checksum_location,
&validate_response_checksum,
&part_size,
&multipart_upload_threshold,
&py_core)) {
return NULL;
}
Expand Down Expand Up @@ -470,6 +476,8 @@ PyObject *aws_py_s3_client_make_meta_request(PyObject *self, PyObject *args) {
.finish_callback = s_s3_request_on_finish,
.shutdown_callback = s_s3_request_on_shutdown,
.progress_callback = s_s3_request_on_progress,
.part_size = part_size,
.multipart_upload_threshold = multipart_upload_threshold,
.user_data = meta_request,
};

Expand Down
54 changes: 53 additions & 1 deletion test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ def setUp(self):
self.done_error_headers = None
self.done_error_body = None
self.done_error_operation_name = None
self.done_did_validate_checksum = None
self.done_checksum_validation_algorithm = None

self.files = FileCreator()
self.temp_put_obj_file_path = self.files.create_file_with_size("temp_put_obj_10mb", 10 * MB)
Expand Down Expand Up @@ -305,12 +307,23 @@ def _on_request_headers(self, status_code, headers, **kargs):
# Body callback (passed as `on_body` in these tests): keeps a running total of the
# number of bytes received, using the length of each delivered chunk.
# `offset` and any extra keyword arguments are accepted but not used.
def _on_request_body(self, chunk, offset, **kargs):
self.received_body_len = self.received_body_len + len(chunk)

def _on_request_done(self, error, error_headers, error_body, error_operation_name, status_code, **kwargs):
# Completion callback (passed as `on_done` in these tests): stores every value reported
# for the finished request on the test case so later assertions can inspect them.
# `**kwargs` absorbs any additional keyword arguments supplied by the caller.
def _on_request_done(
self,
error,
error_headers,
error_body,
error_operation_name,
status_code,
did_validate_checksum,
checksum_validation_algorithm,
**kwargs):
self.done_error = error
self.done_error_headers = error_headers
self.done_error_body = error_body
self.done_error_operation_name = error_operation_name
self.done_status_code = status_code
# Checksum-validation outcome reported alongside the result
self.done_did_validate_checksum = did_validate_checksum
self.done_checksum_validation_algorithm = checksum_validation_algorithm

def _on_progress(self, progress):
self.transferred_len += progress
Expand All @@ -323,6 +336,11 @@ def _validate_successful_response(self, is_put_object):
self.assertIsNone(self.done_error_headers)
self.assertIsNone(self.done_error_body)
self.assertIsNone(self.done_error_operation_name)
self.assertIsInstance(self.done_did_validate_checksum, bool)
if self.done_did_validate_checksum:
self.assertIsInstance(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm)
else:
self.assertIsNone(self.done_checksum_validation_algorithm)
headers = HttpHeaders(self.response_headers)
self.assertIsNone(headers.get("Content-Range"))
body_length = headers.get("Content-Length")
Expand Down Expand Up @@ -452,6 +470,38 @@ def test_put_object_multiple_times(self):
del s3_client
self.assertTrue(client_shutdown_event.wait(self.timeout))

def test_put_object_request_override_part_size(self):
    """Check that a per-request ``multipart_upload_threshold`` takes effect.

    A 10 MB file is uploaded with the request-level threshold raised to
    10 MB, so the upload is expected to go out as a single request rather
    than as a multipart upload. This is verified through the returned
    Etag, which contains a ``-`` only for multipart uploads.
    """
    # NOTE(review): the third argument is presumably the client-level part size — confirm.
    s3_client = s3_client_new(False, self.region, 5 * MB)

    upload_path = self.files.create_file_with_size("temp_file_override", 10 * MB)
    path = "/put_object_test_py_10MB_override.txt"
    content_length = os.stat(upload_path).st_size
    request = self._put_object_request(None, content_length, path=path)
    # Override the threshold to 10 MB, which will result in a single part upload
    s3_request = s3_client.make_request(
        request=request,
        type=S3RequestType.PUT_OBJECT,
        send_filepath=upload_path,
        on_headers=self._on_request_headers,
        on_body=self._on_request_body,
        on_done=self._on_request_done,
        multipart_upload_threshold=10 * MB)
    try:
        s3_request.finished_future.result(self.timeout)
    except Exception as e:
        # Surface the real cause instead of an opaque "False is not true"
        self.fail("put object request failed: {!r}".format(e))

    # Etag headers for a MPU will be formatted with `-[part number]`
    etag = HttpHeaders(self.response_headers).get("Etag")
    # A missing header would otherwise raise TypeError in the membership check below
    self.assertIsNotNone(etag)
    # make sure we uploaded as single part as we override the threshold
    self.assertNotIn("-", etag)

    del s3_request
    client_shutdown_event = s3_client.shutdown_event
    del s3_client
    self.assertTrue(client_shutdown_event.wait(self.timeout))

def test_get_object_filepath(self):
request = self._get_object_request(self.get_test_object_path)
request_type = S3RequestType.GET_OBJECT
Expand Down Expand Up @@ -559,6 +609,8 @@ def test_put_get_with_checksum(self):
download_checksum_config = S3ChecksumConfig(validate_response=True)
self._test_s3_put_get_object(download_request, S3RequestType.GET_OBJECT,
checksum_config=download_checksum_config)
self.assertTrue(self.done_did_validate_checksum)
self.assertEqual(self.done_checksum_validation_algorithm, S3ChecksumAlgorithm.CRC32)
self.assertEqual(HttpHeaders(self.response_headers).get('x-amz-checksum-crc32'),
crc32_base64_str)

Expand Down

0 comments on commit 3b6c349

Please sign in to comment.