From f710806f290d9a4d34c4c5ce07be66d314b6fba2 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Mon, 16 Oct 2023 14:49:57 -0700 Subject: [PATCH] instead of skipping, let's prepare the parts and mark it noop (#356) Co-authored-by: Michael Graeb --- include/aws/s3/private/s3_auto_ranged_put.h | 13 - include/aws/s3/private/s3_meta_request_impl.h | 1 + include/aws/s3/private/s3_request.h | 3 + source/s3_auto_ranged_put.c | 445 ++++-------------- tests/s3_data_plane_tests.c | 5 +- 5 files changed, 100 insertions(+), 367 deletions(-) diff --git a/include/aws/s3/private/s3_auto_ranged_put.h b/include/aws/s3/private/s3_auto_ranged_put.h index 283d5556e..b10d83388 100644 --- a/include/aws/s3/private/s3_auto_ranged_put.h +++ b/include/aws/s3/private/s3_auto_ranged_put.h @@ -47,19 +47,6 @@ struct aws_s3_auto_ranged_put { uint32_t next_part_number; } threaded_update_data; - /* - * Should only be used during prepare requests. Note: stream reads must be sequential, - * so prepare currently never runs concurrently with another prepare - */ - struct { - /* - * Start index of skipping parts. - * This is used to keep track of how many parts have been read from input steam and where to try to start - * skipping parts from. - */ - uint32_t part_index_for_skipping; - } prepare_data; - /* Members to only be used when the mutex in the base type is locked. */ struct { /* Array list of `struct aws_s3_mpu_part_info *` diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 0c1c9fd83..49c7af0b9 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -262,6 +262,7 @@ struct aws_s3_mpu_part_info { uint64_t size; struct aws_string *etag; struct aws_byte_buf checksum_base64; + bool was_previously_uploaded; }; AWS_EXTERN_C_BEGIN diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index 620077f62..9fb66cc02 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -202,6 +202,9 @@ struct aws_s3_request { * requests for uploading data after the end of the stream (those requests * will use below flag to indicate that they should not be sent). */ uint32_t is_noop : 1; + + /* When true, this request has already been uploaded. we still prepare the request to check the durability. */ + uint32_t was_previously_uploaded : 1; }; AWS_EXTERN_C_BEGIN diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 946d6137c..974d89adf 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -57,10 +57,8 @@ struct aws_s3_auto_ranged_put_prepare_request_job { struct aws_s3_prepare_upload_part_job { struct aws_allocator *allocator; struct aws_s3_request *request; - /* async step: skip parts from input stream that were previously uploaded */ - struct aws_future_void *asyncstep1_skip_prev_parts; /* async step: read this part from input stream */ - struct aws_future_bool *asyncstep2_read_part; + struct aws_future_bool *asyncstep_read_part; /* future to set when this job completes */ struct aws_future_http_message *on_complete; }; @@ -69,8 +67,6 @@ struct aws_s3_prepare_upload_part_job { struct aws_s3_prepare_complete_multipart_upload_job { struct aws_allocator *allocator; struct aws_s3_request *request; - /* async step: skip remaining parts from input stream that were previously uploaded */ - struct aws_future_void *asyncstep_skip_remaining_parts; /* future to set when this job completes */ struct aws_future_http_message *on_complete; }; @@ -82,8 +78,6 @@ static bool s_s3_auto_ranged_put_update( uint32_t flags, struct aws_s3_request **out_request); -static void s_skip_parts_from_stream_loop(void *user_data); - static struct aws_future_void *s_s3_auto_ranged_put_prepare_request(struct aws_s3_request *request); static void s_s3_auto_ranged_put_prepare_request_finish(void *user_data); @@ -92,15 +86,10 @@ static struct aws_future_http_message *s_s3_prepare_list_parts(struct aws_s3_req static struct aws_future_http_message *s_s3_prepare_create_multipart_upload(struct aws_s3_request *request); static struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *request); -static void s_s3_prepare_upload_part_on_skipping_done(void *user_data); static void s_s3_prepare_upload_part_on_read_done(void *user_data); static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_job *part_prep, int error_code); static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(struct aws_s3_request *request); -static void s_s3_prepare_complete_multipart_upload_on_skipping_done(void *user_data); -static void s_s3_prepare_complete_multipart_upload_finish( - struct aws_s3_prepare_complete_multipart_upload_job *complete_mpu_prep, - int error_code); static struct aws_future_http_message *s_s3_prepare_abort_multipart_upload(struct aws_s3_request *request); @@ -128,6 +117,7 @@ static int s_process_part_info_synced(const struct aws_s3_part_info *info, void struct aws_s3_mpu_part_info *part = aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); part->size = info->size; part->etag = aws_strip_quotes(meta_request->allocator, info->e_tag); + part->was_previously_uploaded = true; const struct aws_byte_cursor *checksum_cur = NULL; switch (auto_ranged_put->base.checksum_config.checksum_algorithm) { @@ -374,7 +364,6 @@ struct aws_s3_meta_request *aws_s3_meta_request_auto_ranged_put_new( aws_s3_meta_request_resume_token_acquire(auto_ranged_put->resume_token); auto_ranged_put->threaded_update_data.next_part_number = 1; - auto_ranged_put->prepare_data.part_index_for_skipping = 0; auto_ranged_put->synced_data.is_body_stream_at_end = false; uint32_t initial_num_parts = auto_ranged_put->has_content_length ? num_parts : s_unknown_length_default_num_parts; @@ -518,31 +507,20 @@ static bool s_s3_auto_ranged_put_update( } bool should_create_next_part_request = false; + bool request_previously_uploaded = false; if (auto_ranged_put->has_content_length && (auto_ranged_put->synced_data.num_parts_started < auto_ranged_put->total_num_parts_from_content_length)) { - - /* Check if the etag/checksum list has the result already */ - for (size_t part_index = auto_ranged_put->threaded_update_data.next_part_number - 1; - part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list); - ++part_index) { - - struct aws_s3_mpu_part_info *part; - aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); - if (part != NULL) { - /* part already downloaded, skip it here and prepare will take care of adjusting the buffer */ - ++auto_ranged_put->threaded_update_data.next_part_number; - - } else { - // incomplete part found. break out and create request for it. - break; - } + /* Check if next part was previously uploaded (due to resume) */ + size_t part_index = auto_ranged_put->threaded_update_data.next_part_number - 1; + + struct aws_s3_mpu_part_info *part = NULL; + aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); + if (part != NULL) { + AWS_ASSERT(part->was_previously_uploaded == true); + /* This part has been uploaded. */ + request_previously_uploaded = true; } - // Something went really wrong. we still have parts to send, but have etags for all parts - AWS_FATAL_ASSERT( - auto_ranged_put->threaded_update_data.next_part_number <= - auto_ranged_put->total_num_parts_from_content_length); - if (s_should_skip_scheduling_more_parts_based_on_flags(auto_ranged_put, flags)) { goto has_work_remaining; } @@ -569,6 +547,10 @@ static bool s_s3_auto_ranged_put_update( request->part_number = auto_ranged_put->threaded_update_data.next_part_number; + /* If request was previously uploaded, we prepare it to ensure checksums still match, + * but ultimately it gets marked no-op and we don't send it */ + request->was_previously_uploaded = request_previously_uploaded; + ++auto_ranged_put->threaded_update_data.next_part_number; ++auto_ranged_put->synced_data.num_parts_started; ++auto_ranged_put->synced_data.num_parts_pending_read; @@ -802,184 +784,6 @@ static int s_verify_part_matches_checksum( return return_status; } -/* Data for the async skip-parts-from-stream job */ -struct aws_s3_skip_parts_from_stream_job { - struct aws_allocator *allocator; - struct aws_s3_meta_request *meta_request; - uint32_t part_index; - uint32_t skip_until_part_number; - struct aws_byte_buf temp_body_buf; - /* non-owning reference to info about part we're currently skipping */ - struct aws_s3_mpu_part_info *part_being_skipped; - /* repeated async step: read each part we're skipping */ - struct aws_future_bool *asyncstep_read_each_part; - /* future to set when this job completes */ - struct aws_future_void *on_complete; -}; - -/** - * Async function that skips parts from input stream that were previously uploaded. - * Assumes input stream has part_index_for_skipping specifying which part stream is on - * and will read into temp buffer until it gets to skip_until_part_number (i.e. skipping does include - * that part). If checksum is set on the request and parts with checksums were uploaded before, checksum will be - * verified. - * - * Note: If there's no content_length, pause/resume is not supported, so skipping parts will be noop. - */ -static struct aws_future_void *s_skip_parts_from_stream( - struct aws_s3_meta_request *meta_request, - uint32_t num_parts_read_from_stream, - uint32_t skip_until_part_number) { - - AWS_PRECONDITION(meta_request); - - const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; - (void)auto_ranged_put; - - struct aws_future_void *skip_future = aws_future_void_new(meta_request->allocator); - if (num_parts_read_from_stream == skip_until_part_number || !auto_ranged_put->has_content_length) { - aws_future_void_set_result(skip_future); - return skip_future; - } - - AWS_PRECONDITION(num_parts_read_from_stream <= skip_until_part_number); - AWS_PRECONDITION(skip_until_part_number <= auto_ranged_put->total_num_parts_from_content_length); - - /* Store data for async job */ - struct aws_s3_skip_parts_from_stream_job *skip_job = - aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_skip_parts_from_stream_job)); - skip_job->allocator = meta_request->allocator; - skip_job->meta_request = meta_request; - skip_job->part_index = num_parts_read_from_stream; - skip_job->skip_until_part_number = skip_until_part_number; - skip_job->on_complete = aws_future_void_acquire(skip_future); - - AWS_LOGF_DEBUG( - AWS_LS_S3_META_REQUEST, - "id=%p: Skipping parts %d through %d", - (void *)meta_request, - num_parts_read_from_stream, - skip_until_part_number); - - /* Kick off async work loop */ - s_skip_parts_from_stream_loop(skip_job); - - return skip_future; -} - -/* Async work loop for reading over skipped parts. - * In a loop, try to read each part. If the read completes immediately then keep looping. - * Otherwise, register this same function as the completion callback and bail out, - * we'll resume the loop when the callback fires. - * - * It would have been simpler to always set a completion callback, - * but this risks the call stack growing very large (if there are many parts - * to skip and the reads all complete immediately). */ -static void s_skip_parts_from_stream_loop(void *user_data) { - struct aws_s3_skip_parts_from_stream_job *skip_job = user_data; - struct aws_s3_meta_request *meta_request = skip_job->meta_request; - struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; - struct aws_byte_buf *temp_body_buf = &skip_job->temp_body_buf; - - int error_code = AWS_ERROR_SUCCESS; - - for (; skip_job->part_index < skip_job->skip_until_part_number; ++skip_job->part_index) { - - /* kick off an async read if none are pending */ - if (skip_job->asyncstep_read_each_part == NULL) { - - /* Get info about part being skipped. - * (Yes it's weird that we keep a pointer to the part_info even after - * releasing the lock that protects part_list. But it's the resizable - * part_list that needs lock protection. A previously uploaded part_info is const, - * and it's on the heap, so it's safe to keep the pointer around) */ - /* BEGIN CRITICAL SECTION */ - aws_s3_meta_request_lock_synced_data(meta_request); - aws_array_list_get_at( - &auto_ranged_put->synced_data.part_list, &skip_job->part_being_skipped, skip_job->part_index); - AWS_ASSERT(skip_job->part_being_skipped != NULL); - aws_s3_meta_request_unlock_synced_data(meta_request); - /* END CRITICAL SECTION */ - uint64_t offset = 0; - - size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1, &offset); - if (request_body_size != skip_job->part_being_skipped->size) { - error_code = AWS_ERROR_S3_RESUME_FAILED; - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "id=%p: Failed resuming upload, previous upload used different part size.", - (void *)meta_request); - goto on_done; - } - - if (temp_body_buf->capacity != request_body_size) { - // reinit with correct size - aws_byte_buf_clean_up(temp_body_buf); - aws_byte_buf_init(temp_body_buf, meta_request->allocator, request_body_size); - } else { - // reuse buffer - aws_byte_buf_reset(temp_body_buf, false); - } - - skip_job->asyncstep_read_each_part = - aws_s3_meta_request_read_body(skip_job->meta_request, offset, temp_body_buf); - - /* the read may or may not complete immediately */ - if (aws_future_bool_register_callback_if_not_done( - skip_job->asyncstep_read_each_part, s_skip_parts_from_stream_loop, skip_job)) { - - /* read is pending, we'll resume this loop when callback fires */ - return; - } - } - - /* read is complete, check results */ - error_code = aws_future_bool_get_error(skip_job->asyncstep_read_each_part); - skip_job->asyncstep_read_each_part = - aws_future_bool_release(skip_job->asyncstep_read_each_part); /* release and set NULL */ - - if (error_code != AWS_ERROR_SUCCESS) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "id=%p: Failed resuming upload, error reading request body %d (%s)", - (void *)meta_request, - error_code, - aws_error_str(error_code)); - goto on_done; - } - - if (temp_body_buf->len < temp_body_buf->capacity) { - AWS_LOGF_ERROR( - AWS_LS_S3_META_REQUEST, - "id=%p: Failed resuming upload, request body smaller than it was during previous upload.", - (void *)meta_request); - error_code = AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH; - goto on_done; - } - - // if previously uploaded part had a checksum, compare it to what we just skipped - if (skip_job->part_being_skipped->checksum_base64.len > 0 && - s_verify_part_matches_checksum( - meta_request->allocator, - aws_byte_cursor_from_buf(temp_body_buf), - meta_request->checksum_config.checksum_algorithm, - aws_byte_cursor_from_buf(&skip_job->part_being_skipped->checksum_base64))) { - error_code = aws_last_error_or_unknown(); - goto on_done; - } - } - -on_done: - aws_byte_buf_clean_up(&skip_job->temp_body_buf); - if (error_code == AWS_ERROR_SUCCESS) { - aws_future_void_set_result(skip_job->on_complete); - } else { - aws_future_void_set_error(skip_job->on_complete, error_code); - } - aws_future_void_release(skip_job->on_complete); - aws_mem_release(skip_job->allocator, skip_job); -} - /* Given a request, prepare it for sending based on its description. */ static struct aws_future_void *s_s3_auto_ranged_put_prepare_request(struct aws_s3_request *request) { @@ -1101,7 +905,6 @@ struct aws_future_http_message *s_s3_prepare_create_multipart_upload(struct aws_ /* Prepare an UploadPart request */ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *request) { struct aws_s3_meta_request *meta_request = request->meta_request; - struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; struct aws_allocator *allocator = request->allocator; struct aws_future_http_message *message_future = aws_future_http_message_new(allocator); @@ -1118,15 +921,14 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request * * skipped over parts that were already uploaded (in case we're resuming * from an upload that had been paused) */ - if (meta_request->request_body_parallel_stream) { - /* For parallel read stream, which is seekable, don't need to skip the part by reading from the stream. */ - s_s3_prepare_upload_part_on_skipping_done((void *)part_prep); - } else { - part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream( - meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1); - aws_future_void_register_callback( - part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep); - } + /* Read the body */ + uint64_t offset = 0; + size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); + aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size); + + part_prep->asyncstep_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body); + aws_future_bool_register_callback( + part_prep->asyncstep_read_part, s_s3_prepare_upload_part_on_read_done, part_prep); } else { /* Not the first time preparing request (e.g. retry). * We can skip over the async steps that read the body stream */ @@ -1136,32 +938,6 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request * return message_future; } -/* Completion callback for skipping over parts that were previously uploaded. */ -static void s_s3_prepare_upload_part_on_skipping_done(void *user_data) { - struct aws_s3_prepare_upload_part_job *part_prep = user_data; - struct aws_s3_request *request = part_prep->request; - struct aws_s3_meta_request *meta_request = request->meta_request; - - if (part_prep->asyncstep1_skip_prev_parts) { - int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts); - /* If skipping failed, the prepare-upload-part job has failed. */ - if (error_code) { - s_s3_prepare_upload_part_finish(part_prep, error_code); - return; - } - } - /* Skipping succeeded. - * Next async step: read body stream for this part into a buffer */ - uint64_t offset = 0; - - size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); - aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size); - - part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body); - aws_future_bool_register_callback( - part_prep->asyncstep2_read_part, s_s3_prepare_upload_part_on_read_done, part_prep); -} - /* Completion callback for reading this part's chunk of the body stream */ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { struct aws_s3_prepare_upload_part_job *part_prep = user_data; @@ -1170,7 +946,7 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; bool has_content_length = auto_ranged_put->has_content_length != 0; - int error_code = aws_future_bool_get_error(part_prep->asyncstep2_read_part); + int error_code = aws_future_bool_get_error(part_prep->asyncstep_read_part); /* If reading failed, the prepare-upload-part job has failed */ if (error_code != AWS_ERROR_SUCCESS) { @@ -1182,12 +958,8 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { aws_error_str(error_code)); goto on_done; } - /* Reading succeeded. */ - bool is_body_stream_at_end = aws_future_bool_get_result(part_prep->asyncstep2_read_part); - request->is_noop = request->part_number > - 1 && /* allow first part to have 0 length to support empty unknown content length objects. */ - request->request_body.len == 0; + bool is_body_stream_at_end = aws_future_bool_get_result(part_prep->asyncstep_read_part); /* If Content-Length is defined, check that we read the expected amount */ if (has_content_length && (request->request_body.len < request->request_body.capacity)) { @@ -1198,38 +970,67 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { (void *)meta_request); goto on_done; } + request->is_noop = request->part_number > + 1 && /* allow first part to have 0 length to support empty unknown content length objects. */ + request->request_body.len == 0; /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - - --auto_ranged_put->synced_data.num_parts_pending_read; - - auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end; - - if (!request->is_noop) { - auto_ranged_put->prepare_data.part_index_for_skipping = request->part_number; - - /* The part can finish out of order. Resize array-list to be long enough to hold this part, - * filling any intermediate slots with NULL. */ + aws_s3_meta_request_lock_synced_data(meta_request); - aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number); - while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) { - struct aws_s3_mpu_part_info *null_part = NULL; - aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part); - } + --auto_ranged_put->synced_data.num_parts_pending_read; - /* Add part to array-list */ - struct aws_s3_mpu_part_info *part = - aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); - part->size = request->request_body.len; - aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1); + auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end; + struct aws_s3_mpu_part_info *previously_uploaded_info = NULL; + if (request->was_previously_uploaded) { + aws_array_list_get_at( + &auto_ranged_put->synced_data.part_list, &previously_uploaded_info, request->part_number - 1); + AWS_ASSERT(previously_uploaded_info != NULL && previously_uploaded_info->was_previously_uploaded == true); + /* Already uploaded, set the noop to be true. */ + request->is_noop = true; + } + if (!request->is_noop) { + /* The part can finish out of order. Resize array-list to be long enough to hold this part, + * filling any intermediate slots with NULL. */ + aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number); + while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) { + struct aws_s3_mpu_part_info *null_part = NULL; + aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part); } - - aws_s3_meta_request_unlock_synced_data(meta_request); + /* Add part to array-list */ + struct aws_s3_mpu_part_info *part = + aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); + part->size = request->request_body.len; + aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1); } + aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ + if (previously_uploaded_info) { + /* Part was previously uploaded, check that it matches what we just read. + * (Yes it's weird that we keep a pointer to the part_info even after + * releasing the lock that protects part_list. But it's the resizable + * part_list that needs lock protection. A previously uploaded part_info is const, + * and it's on the heap, so it's safe to keep the pointer around) */ + if (request->request_body.len != previously_uploaded_info->size) { + error_code = AWS_ERROR_S3_RESUME_FAILED; + AWS_LOGF_ERROR( + AWS_LS_S3_META_REQUEST, + "id=%p: Failed resuming upload, previous upload used different part size.", + (void *)meta_request); + goto on_done; + } + /* if previously uploaded part had a checksum, compare it to what we just skipped */ + if (previously_uploaded_info->checksum_base64.len > 0 && + s_verify_part_matches_checksum( + meta_request->allocator, + aws_byte_cursor_from_buf(&request->request_body), + meta_request->checksum_config.checksum_algorithm, + aws_byte_cursor_from_buf(&previously_uploaded_info->checksum_base64))) { + error_code = aws_last_error_or_unknown(); + goto on_done; + } + } + /* We throttle the number of parts that can be "pending read" * (e.g. only 1 at a time if reading from async-stream). * Now that read is complete, poke the client to see if it can give us more work. @@ -1305,63 +1106,11 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo on_done: AWS_FATAL_ASSERT(aws_future_http_message_is_done(part_prep->on_complete)); - aws_future_void_release(part_prep->asyncstep1_skip_prev_parts); - aws_future_bool_release(part_prep->asyncstep2_read_part); + aws_future_bool_release(part_prep->asyncstep_read_part); aws_future_http_message_release(part_prep->on_complete); aws_mem_release(part_prep->allocator, part_prep); } -/* Prepare a CompleteMultipartUpload request. */ -static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(struct aws_s3_request *request) { - struct aws_s3_meta_request *meta_request = request->meta_request; - struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; - struct aws_allocator *allocator = request->allocator; - - struct aws_future_http_message *message_future = aws_future_http_message_new(allocator); - - struct aws_s3_prepare_complete_multipart_upload_job *complete_mpu_prep = - aws_mem_calloc(allocator, 1, sizeof(struct aws_s3_prepare_complete_multipart_upload_job)); - complete_mpu_prep->allocator = allocator; - complete_mpu_prep->request = request; - complete_mpu_prep->on_complete = aws_future_http_message_acquire(message_future); - - if (request->num_times_prepared == 0) { - - /* Corner case of last part being previously uploaded during resume. - * Read it from input stream and potentially verify checksum */ - complete_mpu_prep->asyncstep_skip_remaining_parts = s_skip_parts_from_stream( - meta_request, - auto_ranged_put->prepare_data.part_index_for_skipping, - auto_ranged_put->total_num_parts_from_content_length); - - aws_future_void_register_callback( - complete_mpu_prep->asyncstep_skip_remaining_parts, - s_s3_prepare_complete_multipart_upload_on_skipping_done, - complete_mpu_prep); - } else { - /* Not the first time preparing request (e.g. retry). - * We can skip over the async steps. */ - s_s3_prepare_complete_multipart_upload_finish(complete_mpu_prep, AWS_ERROR_SUCCESS); - } - - return message_future; -} - -/* Completion callback for skipping over parts that were previously uploaded. */ -static void s_s3_prepare_complete_multipart_upload_on_skipping_done(void *user_data) { - - struct aws_s3_prepare_complete_multipart_upload_job *complete_mpu_prep = user_data; - - int error_code = aws_future_void_get_error(complete_mpu_prep->asyncstep_skip_remaining_parts); - if (error_code != AWS_ERROR_SUCCESS) { - s_s3_prepare_complete_multipart_upload_finish(complete_mpu_prep, error_code); - return; - } - - /* Async steps complete, finish up job */ - s_s3_prepare_complete_multipart_upload_finish(complete_mpu_prep, AWS_ERROR_SUCCESS); -} - /* Allow user to review what we've uploaded, and fail the meta-request if they don't approve. */ static int s_s3_review_multipart_upload(struct aws_s3_request *request) { struct aws_s3_meta_request *meta_request = request->meta_request; @@ -1421,27 +1170,20 @@ static int s_s3_review_multipart_upload(struct aws_s3_request *request) { } } -/* Finish async preparation of a CompleteMultipartUpload request */ -static void s_s3_prepare_complete_multipart_upload_finish( - struct aws_s3_prepare_complete_multipart_upload_job *complete_mpu_prep, - int error_code) { - - struct aws_s3_request *request = complete_mpu_prep->request; +/* Prepare a CompleteMultipartUpload request. */ +static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(struct aws_s3_request *request) { struct aws_s3_meta_request *meta_request = request->meta_request; struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl; + struct aws_allocator *allocator = request->allocator; - if (error_code != AWS_ERROR_SUCCESS) { - aws_future_http_message_set_error(complete_mpu_prep->on_complete, error_code); - goto on_done; - } + struct aws_future_http_message *message_future = aws_future_http_message_new(allocator); AWS_FATAL_ASSERT(auto_ranged_put->upload_id); if (request->num_times_prepared == 0) { /* Invoke upload_review_callback, and fail meta-request if user raises an error */ if (s_s3_review_multipart_upload(request) != AWS_OP_SUCCESS) { - error_code = aws_last_error(); - aws_future_http_message_set_error(complete_mpu_prep->on_complete, error_code); + aws_future_http_message_set_error(message_future, aws_last_error()); goto on_done; } @@ -1471,18 +1213,16 @@ static void s_s3_prepare_complete_multipart_upload_finish( /* END CRITICAL SECTION */ if (message == NULL) { - aws_future_http_message_set_error(complete_mpu_prep->on_complete, aws_last_error()); + aws_future_http_message_set_error(message_future, aws_last_error()); goto on_done; } /* Success! */ - aws_future_http_message_set_result_by_move(complete_mpu_prep->on_complete, &message); + aws_future_http_message_set_result_by_move(message_future, &message); on_done: - AWS_FATAL_ASSERT(aws_future_http_message_is_done(complete_mpu_prep->on_complete)); - aws_future_void_release(complete_mpu_prep->asyncstep_skip_remaining_parts); - aws_future_http_message_release(complete_mpu_prep->on_complete); - aws_mem_release(complete_mpu_prep->allocator, complete_mpu_prep); + AWS_FATAL_ASSERT(aws_future_http_message_is_done(message_future)); + return message_future; } /* Prepare an AbortMultipartUpload request. @@ -1598,6 +1338,7 @@ static void s_s3_auto_ranged_put_request_finished( error_code = AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED; } else if (!has_more_results) { uint64_t bytes_previously_uploaded = 0; + int parts_previously_uploaded = 0; for (size_t part_index = 0; part_index < aws_array_list_length(&auto_ranged_put->synced_data.part_list); @@ -1606,9 +1347,7 @@ static void s_s3_auto_ranged_put_request_finished( aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); if (part != NULL) { /* Update the number of parts sent/completed previously */ - ++auto_ranged_put->synced_data.num_parts_started; - ++auto_ranged_put->synced_data.num_parts_completed; - + ++parts_previously_uploaded; bytes_previously_uploaded += part->size; } } @@ -1617,7 +1356,7 @@ static void s_s3_auto_ranged_put_request_finished( AWS_LS_S3_META_REQUEST, "id=%p: Resuming PutObject. %d out of %d parts have completed during previous request.", (void *)meta_request, - auto_ranged_put->synced_data.num_parts_completed, + parts_previously_uploaded, auto_ranged_put->total_num_parts_from_content_length); /* Deliver an initial progress_callback to report all previously uploaded parts. */ diff --git a/tests/s3_data_plane_tests.c b/tests/s3_data_plane_tests.c index 89a155030..9a64fdf6a 100644 --- a/tests/s3_data_plane_tests.c +++ b/tests/s3_data_plane_tests.c @@ -5985,7 +5985,10 @@ static void s_meta_request_finished_request_patched_for_pause_resume_tests( if ((error_code == AWS_ERROR_SUCCESS) && (meta_request->type == AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) && (request->request_tag == AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART)) { - aws_atomic_fetch_add(&test_data->total_bytes_uploaded, request->request_body.len); + if (!request->is_noop) { + /* If the request is noop, we are not really uploading the part */ + aws_atomic_fetch_add(&test_data->total_bytes_uploaded, request->request_body.len); + } size_t total_bytes_uploaded = aws_atomic_load_int(&test_data->total_bytes_uploaded); uint64_t offset_to_pause = aws_atomic_load_int(&test_data->request_pause_offset);