Skip to content

Commit

Permalink
Add full object checksum callback (#473)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
TingDaoK and graebm authored Dec 10, 2024
1 parent 337155f commit 1e8a980
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 41 deletions.
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_checksums.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ struct checksum_config_storage {
struct aws_byte_buf full_object_checksum;
bool has_full_object_checksum;

aws_s3_meta_request_full_object_checksum_fn *full_object_checksum_callback;
void *user_data;

enum aws_s3_checksum_location location;
enum aws_s3_checksum_algorithm checksum_algorithm;
bool validate_response_checksum;
Expand Down
7 changes: 7 additions & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,13 @@ bool aws_s3_meta_request_checksum_config_has_algorithm(
struct aws_s3_meta_request *meta_request,
enum aws_s3_checksum_algorithm algorithm);

/**
 * Default implementation for scheduling the prepare-request task of `request`, with the choice of
 * event loop controlled by the caller.
 *
 * @param meta_request The meta request that owns `request`. Must not be NULL.
 * @param request      The request to prepare.
 * @param parallel     When true, the task is scheduled on the next event loop taken from the client's
 *                     body-streaming event-loop group, so several requests can be prepared at once.
 *                     NOTE(review): when false, the task presumably runs on the meta request's own
 *                     io_event_loop - confirm against the implementation.
 * @param callback     NOTE(review): presumably invoked once preparation has finished - confirm.
 * @param user_data    Opaque pointer stored alongside `callback`.
 */
void aws_s3_meta_request_schedule_prepare_request_default_impl(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request,
bool parallel,
aws_s3_meta_request_prepare_request_callback_fn *callback,
void *user_data);

AWS_EXTERN_C_END

#endif /* AWS_S3_META_REQUEST_IMPL_H */
34 changes: 34 additions & 0 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ typedef void(aws_s3_meta_request_shutdown_fn)(void *user_data);

typedef void(aws_s3_client_shutdown_complete_callback_fn)(void *user_data);

/**
* Optional callback, for you to provide the full object checksum after the object was read.
* Client will NOT check the checksum provided before sending it to the server.
*
* @param meta_request pointer to the aws_s3_meta_request of the upload.
* @param user_data pointer to the user_data set in aws_s3_checksum_config.
*
* @return A new string with the full object checksum, as it is sent in a PutObject request (base64-encoded):
* https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
* The client copies the contents and then destroys the returned string, so the callback hands over
* ownership and must not free or reuse it afterwards.
* If an error occurs, call aws_raise_error(E) with a proper error code and return NULL; the error is
* then reported when preparing the CompleteMultipartUpload request.
*/
typedef struct aws_string *(aws_s3_meta_request_full_object_checksum_fn)(struct aws_s3_meta_request *meta_request,
void *user_data);

enum aws_s3_meta_request_tls_mode {
AWS_MR_TLS_ENABLED,
AWS_MR_TLS_DISABLED,
Expand Down Expand Up @@ -557,6 +571,7 @@ struct aws_s3_client_config {

struct aws_s3_checksum_config {

/****************************** PUT Object specific *******************************/
/**
* The location of client added checksum header.
*
Expand All @@ -576,6 +591,25 @@ struct aws_s3_checksum_config {
*/
enum aws_s3_checksum_algorithm checksum_algorithm;

/**
* Optional.
* Provide the full object checksum. This callback is invoked once, after the entire body has been read.
*
* NOTE:
* - Do not set this callback if the HTTP message already has a checksum header (e.g. x-amz-checksum-crc32). Doing
* so will raise AWS_ERROR_INVALID_ARGUMENT.
* - checksum_algorithm must be set to the algorithm you will use.
*
* WARNING: This feature is experimental/unstable.
* At this time, full object checksum callback is only available for multipart upload
* (when Content-Length is above the `multipart_upload_threshold`,
* or Content-Length not specified). Otherwise, it will be ignored.
*/
aws_s3_meta_request_full_object_checksum_fn *full_object_checksum_callback;
void *user_data;

/****************************** GET Object specific *******************************/
/**
* Enable checksum mode header will be attached to GET requests, this will tell s3 to send back checksums headers if
* they exist. Calculate the corresponding checksum on the response bodies. The meta request will finish with a did
Expand Down
65 changes: 62 additions & 3 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ static bool s_s3_auto_ranged_put_update(
uint32_t flags,
struct aws_s3_request **out_request);

/* Auto-ranged-put implementation of the `schedule_prepare_request` vtable entry
 * (installed in s_s3_auto_ranged_put_vtable). */
static void s_s3_auto_ranged_put_schedule_prepare_request(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request,
aws_s3_meta_request_prepare_request_callback_fn *callback,
void *user_data);

static struct aws_future_void *s_s3_auto_ranged_put_prepare_request(struct aws_s3_request *request);
static void s_s3_auto_ranged_put_prepare_request_finish(void *user_data);

Expand Down Expand Up @@ -297,6 +303,7 @@ static int s_try_init_resume_state_from_persisted_data(
static struct aws_s3_meta_request_vtable s_s3_auto_ranged_put_vtable = {
.update = s_s3_auto_ranged_put_update,
.send_request_finish = s_s3_auto_ranged_put_send_request_finish,
.schedule_prepare_request = s_s3_auto_ranged_put_schedule_prepare_request,
.prepare_request = s_s3_auto_ranged_put_prepare_request,
.init_signing_date_time = aws_s3_meta_request_init_signing_date_time_default,
.sign_request = aws_s3_meta_request_sign_request_default,
Expand Down Expand Up @@ -342,15 +349,22 @@ static int s_init_and_verify_checksum_config_from_headers(
if (checksum_config->checksum_algorithm != AWS_SCA_NONE && checksum_config->checksum_algorithm != header_algo) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p Could not create auto-ranged-put meta request; checksum config mismatch the checksum from header.",
"id=%p: Could not create auto-ranged-put meta request; checksum config mismatch the checksum from header.",
log_id);
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}
if (checksum_config->has_full_object_checksum) {
/* If the full object checksum has already been set, this is a malformed request. */
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Could not create auto-ranged-put meta request; full object checksum is set from multiple ways.",
log_id);
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}
AWS_ASSERT(!checksum_config->has_full_object_checksum);

AWS_LOGF_DEBUG(
AWS_LS_S3_META_REQUEST,
"id=%p Setting the full-object checksum from header; algorithm: " PRInSTR ", value: " PRInSTR ".",
"id=%p: Setting the full-object checksum from header; algorithm: " PRInSTR ", value: " PRInSTR ".",
log_id,
AWS_BYTE_CURSOR_PRI(aws_get_checksum_algorithm_name(header_algo)),
AWS_BYTE_CURSOR_PRI(header_value));
Expand Down Expand Up @@ -887,6 +901,23 @@ static int s_verify_part_matches_checksum(
return return_status;
}

/**
 * Auto-ranged-put implementation of the `schedule_prepare_request` vtable entry.
 *
 * Decides whether this request may be prepared in parallel and then forwards to
 * aws_s3_meta_request_schedule_prepare_request_default_impl().
 *
 * @param meta_request The auto-ranged-put meta request. Must not be NULL.
 * @param request      The request to schedule for preparation. Must not be NULL.
 * @param callback     Forwarded unchanged to the default implementation.
 * @param user_data    Forwarded unchanged to the default implementation.
 */
static void s_s3_auto_ranged_put_schedule_prepare_request(
    struct aws_s3_meta_request *meta_request,
    struct aws_s3_request *request,
    aws_s3_meta_request_prepare_request_callback_fn *callback,
    void *user_data) {
    AWS_PRECONDITION(meta_request);
    AWS_PRECONDITION(request);

    /* Parallel preparation is requested only for part uploads, and only when the body stream supports
     * reading in parallel. Every other request type is scheduled with parallel == false. */
    bool parallel_prepare =
        (meta_request->request_body_parallel_stream && request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART);

    aws_s3_meta_request_schedule_prepare_request_default_impl(
        meta_request, request, parallel_prepare /*parallel*/, callback, user_data);
}

/* Given a request, prepare it for sending based on its description. */
static struct aws_future_void *s_s3_auto_ranged_put_prepare_request(struct aws_s3_request *request) {

Expand Down Expand Up @@ -1291,11 +1322,39 @@ static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(st
AWS_FATAL_ASSERT(auto_ranged_put->upload_id);

if (request->num_times_prepared == 0) {
/**
* The prepare stage for CompleteMultipartUpload is guaranteed to happen on the meta-request's
* io_event_loop thread, so it is safe to invoke the callback from this stage without overlapping any
* other callbacks.
*/

/* Invoke upload_review_callback, and fail meta-request if user raises an error */
if (s_s3_review_multipart_upload(request) != AWS_OP_SUCCESS) {
aws_future_http_message_set_error(message_future, aws_last_error());
goto on_done;
}
if (auto_ranged_put->base.checksum_config.full_object_checksum_callback) {
/* Invoke the callback to fill in the full object checksum. The server side verifies the checksum. */
struct aws_string *result = auto_ranged_put->base.checksum_config.full_object_checksum_callback(
meta_request, auto_ranged_put->base.checksum_config.user_data);
if (!result) {
int error_code = aws_last_error_or_unknown();
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Full object checksum callback raised error %d (%s)",
(void *)meta_request,
error_code,
aws_error_str(error_code));
/* Error from the callback */
aws_future_http_message_set_error(message_future, error_code);
goto on_done;
}
aws_byte_buf_init_copy_from_cursor(
&auto_ranged_put->base.checksum_config.full_object_checksum,
allocator,
aws_byte_cursor_from_string(result));
aws_string_destroy(result);
}

/* Allocate request body */
aws_byte_buf_init(
Expand Down
7 changes: 7 additions & 0 deletions source/s3_checksums.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ void aws_checksum_config_storage_init(
internal_config->location = config->location;
internal_config->validate_response_checksum = config->validate_response_checksum;

internal_config->full_object_checksum_callback = config->full_object_checksum_callback;
internal_config->user_data = config->user_data;
if (internal_config->full_object_checksum_callback) {
/* The callback will supply the full object checksum later, so mark it as present now. */
internal_config->has_full_object_checksum = true;
}

if (config->validate_checksum_algorithms) {
const size_t count = aws_array_list_length(config->validate_checksum_algorithms);
for (size_t i = 0; i < count; ++i) {
Expand Down
35 changes: 21 additions & 14 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,6 @@ bool aws_s3_meta_request_is_finished(struct aws_s3_meta_request *meta_request) {
return is_finished;
}

static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void *arg, enum aws_task_status task_status);
static void s_s3_meta_request_on_request_prepared(void *user_data);

/* TODO: document how this is final step in prepare-request sequence.
* Could be invoked on any thread. */
static void s_s3_prepare_request_payload_callback_and_destroy(
Expand Down Expand Up @@ -630,7 +627,11 @@ static void s_s3_meta_request_schedule_prepare_request_default(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request,
aws_s3_meta_request_prepare_request_callback_fn *callback,
void *user_data);
void *user_data) {
/* By default, do not prepare requests in parallel. */
aws_s3_meta_request_schedule_prepare_request_default_impl(
meta_request, request, false /*parallel*/, callback, user_data);
}

void aws_s3_meta_request_prepare_request(
struct aws_s3_meta_request *meta_request,
Expand All @@ -647,9 +648,13 @@ void aws_s3_meta_request_prepare_request(
}
}

static void s_s3_meta_request_schedule_prepare_request_default(
static void s_s3_meta_request_prepare_request_task(struct aws_task *task, void *arg, enum aws_task_status task_status);
static void s_s3_meta_request_on_request_prepared(void *user_data);

void aws_s3_meta_request_schedule_prepare_request_default_impl(
struct aws_s3_meta_request *meta_request,
struct aws_s3_request *request,
bool parallel,
aws_s3_meta_request_prepare_request_callback_fn *callback,
void *user_data) {
AWS_PRECONDITION(meta_request);
Expand All @@ -671,9 +676,10 @@ static void s_s3_meta_request_schedule_prepare_request_default(

aws_task_init(
&payload->task, s_s3_meta_request_prepare_request_task, payload, "s3_meta_request_prepare_request_task");
if (meta_request->request_body_parallel_stream) {
/* The body stream supports reading in parallel, so schedule task on any I/O thread.
* If we always used the meta-request's dedicated io_event_loop, we wouldn't get any parallelism. */

if (parallel) {
/* To support reading in parallel, schedule task on any I/O thread in the streaming elg.
* Otherwise, we wouldn't get any parallelism. */
struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(client->body_streaming_elg);
aws_event_loop_schedule_task_now(loop, &payload->task);
} else {
Expand Down Expand Up @@ -885,8 +891,8 @@ static void s_meta_request_resolve_signing_config(
AWS_FATAL_ASSERT(false);
}

/* If the checksum is configured to be added to the trailer, the payload will be aws-chunked encoded. The payload
* will need to be streaming signed/unsigned. */
/* If the checksum is configured to be added to the trailer, the payload will be aws-chunked encoded. The
* payload will need to be streaming signed/unsigned. */
if (meta_request->checksum_config.location == AWS_SCL_TRAILER &&
aws_byte_cursor_eq(&out_signing_config->signed_body_value, &g_aws_signed_body_value_unsigned_payload)) {
out_signing_config->signed_body_value = g_aws_signed_body_value_streaming_unsigned_payload_trailer;
Expand Down Expand Up @@ -1138,8 +1144,8 @@ void aws_s3_meta_request_send_request(struct aws_s3_meta_request *meta_request,
goto error_finish;
}

/* Activate the stream within the lock as once the activate invoked, the HTTP level callback can happen right
* after. */
/* Activate the stream within the lock as once the activate invoked, the HTTP level callback can happen
* right after. */
if (aws_http_stream_activate(stream) != AWS_OP_SUCCESS) {
aws_s3_meta_request_unlock_synced_data(meta_request);
AWS_LOGF_ERROR(
Expand Down Expand Up @@ -1338,8 +1344,9 @@ static int s_s3_meta_request_headers_block_done(
AWS_PRECONDITION(meta_request);

/*
* When downloading parts via partNumber, if the size is larger than expected, cancel the request immediately so we
* don't end up downloading more into memory than we can handle. We'll retry the download using ranged gets instead.
* When downloading parts via partNumber, if the size is larger than expected, cancel the request immediately so
* we don't end up downloading more into memory than we can handle. We'll retry the download using ranged gets
* instead.
*/
if (request->request_type == AWS_S3_REQUEST_TYPE_GET_OBJECT &&
request->request_tag == AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_GET_OBJECT_WITH_PART_NUMBER_1) {
Expand Down
6 changes: 4 additions & 2 deletions source/s3_request_messages.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ struct aws_http_message *aws_s3_create_multipart_upload_message_new(
goto error_clean_up;
}
}
if (checksum_config && checksum_config->location != AWS_SCL_NONE) {

if (checksum_config && (checksum_config->location != AWS_SCL_NONE || checksum_config->has_full_object_checksum)) {
if (checksum_config->checksum_algorithm) {
if (aws_http_headers_set(
headers,
Expand Down Expand Up @@ -595,7 +596,8 @@ struct aws_http_message *aws_s3_complete_multipart_message_new(
struct aws_byte_cursor mpu_algorithm_checksum_name;
AWS_ZERO_STRUCT(mpu_algorithm_checksum_name);
struct aws_http_message *message = NULL;
bool set_checksums = checksum_config && checksum_config->location != AWS_SCL_NONE;
bool set_checksums =
checksum_config && (checksum_config->location != AWS_SCL_NONE || checksum_config->has_full_object_checksum);
const struct aws_http_headers *initial_message_headers = aws_http_message_get_headers(base_message);
AWS_ASSERT(initial_message_headers);
if (set_checksums) {
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ add_net_test_case(test_s3_round_trip_with_filepath_no_content_length)
add_net_test_case(test_s3_round_trip_mpu_with_filepath_no_content_length)
add_net_test_case(test_s3_round_trip_mpu_multipart_get_full_object_checksum_fc)
add_net_test_case(test_s3_round_trip_mpu_multipart_get_full_object_checksum_fc_header)
add_net_test_case(test_s3_round_trip_mpu_multipart_get_full_object_checksum_via_callback)
add_net_test_case(test_s3_chunked_then_unchunked)

add_net_test_case(test_s3_cancel_mpu_one_part_completed_fc)
Expand Down
Loading

0 comments on commit 1e8a980

Please sign in to comment.