diff --git a/include/aws/s3/private/s3_meta_request_impl.h b/include/aws/s3/private/s3_meta_request_impl.h index 839d150a7..49c7af0b9 100644 --- a/include/aws/s3/private/s3_meta_request_impl.h +++ b/include/aws/s3/private/s3_meta_request_impl.h @@ -262,7 +262,7 @@ struct aws_s3_mpu_part_info { uint64_t size; struct aws_string *etag; struct aws_byte_buf checksum_base64; - bool uploaded_before_resume; + bool was_previously_uploaded; }; AWS_EXTERN_C_BEGIN diff --git a/include/aws/s3/private/s3_request.h b/include/aws/s3/private/s3_request.h index ffc279dcc..9fb66cc02 100644 --- a/include/aws/s3/private/s3_request.h +++ b/include/aws/s3/private/s3_request.h @@ -204,7 +204,7 @@ struct aws_s3_request { uint32_t is_noop : 1; /* When true, this request has already been uploaded. we still prepare the request to check the durability. */ - uint32_t uploaded_before_resume : 1; + uint32_t was_previously_uploaded : 1; }; AWS_EXTERN_C_BEGIN diff --git a/source/s3_auto_ranged_put.c b/source/s3_auto_ranged_put.c index 8873656f7..974d89adf 100644 --- a/source/s3_auto_ranged_put.c +++ b/source/s3_auto_ranged_put.c @@ -67,8 +67,6 @@ struct aws_s3_prepare_upload_part_job { struct aws_s3_prepare_complete_multipart_upload_job { struct aws_allocator *allocator; struct aws_s3_request *request; - /* async step: skip remaining parts from input stream that were previously uploaded */ - struct aws_future_void *asyncstep_skip_remaining_parts; /* future to set when this job completes */ struct aws_future_http_message *on_complete; }; @@ -119,7 +117,7 @@ static int s_process_part_info_synced(const struct aws_s3_part_info *info, void struct aws_s3_mpu_part_info *part = aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); part->size = info->size; part->etag = aws_strip_quotes(meta_request->allocator, info->e_tag); - part->uploaded_before_resume = true; + part->was_previously_uploaded = true; const struct aws_byte_cursor *checksum_cur = NULL; switch (auto_ranged_put->base.checksum_config.checksum_algorithm) { @@ -509,25 +507,20 @@ static bool s_s3_auto_ranged_put_update( } bool should_create_next_part_request = false; - bool request_uploaded = false; + bool request_previously_uploaded = false; if (auto_ranged_put->has_content_length && (auto_ranged_put->synced_data.num_parts_started < auto_ranged_put->total_num_parts_from_content_length)) { - + /* Check if next part was previously uploaded (due to resume) */ size_t part_index = auto_ranged_put->threaded_update_data.next_part_number - 1; struct aws_s3_mpu_part_info *part = NULL; aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index); if (part != NULL) { - AWS_ASSERT(part->uploaded_before_resume == true); + AWS_ASSERT(part->was_previously_uploaded == true); /* This part has been uploaded. */ - request_uploaded = true; + request_previously_uploaded = true; } - // Something went really wrong. we still have parts to send, but have etags for all parts - AWS_FATAL_ASSERT( - auto_ranged_put->threaded_update_data.next_part_number <= - auto_ranged_put->total_num_parts_from_content_length); - if (s_should_skip_scheduling_more_parts_based_on_flags(auto_ranged_put, flags)) { goto has_work_remaining; } @@ -554,8 +547,9 @@ static bool s_s3_auto_ranged_put_update( request->part_number = auto_ranged_put->threaded_update_data.next_part_number; - /* If request already uploaded, the request is noop, we keep working on it to prepare. */ - request->uploaded_before_resume = request_uploaded; + /* If request was previously uploaded, we prepare it to ensure checksums still match, + * but ultimately it gets marked no-op and we don't send it */ + request->was_previously_uploaded = request_previously_uploaded; ++auto_ranged_put->threaded_update_data.next_part_number; ++auto_ranged_put->synced_data.num_parts_started; @@ -929,7 +923,6 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request * /* Read the body */ uint64_t offset = 0; - size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset); aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size); @@ -977,51 +970,48 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { (void *)meta_request); goto on_done; } - struct aws_s3_mpu_part_info *uploaded_part_info = NULL; - request->is_noop = request->part_number > 1 && /* allow first part to have 0 length to support empty unknown content length objects. */ request->request_body.len == 0; /* BEGIN CRITICAL SECTION */ - { - aws_s3_meta_request_lock_synced_data(meta_request); - - --auto_ranged_put->synced_data.num_parts_pending_read; - - auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end; - if (request->uploaded_before_resume) { - aws_array_list_get_at( - &auto_ranged_put->synced_data.part_list, &uploaded_part_info, request->part_number - 1); - AWS_ASSERT(uploaded_part_info != NULL && uploaded_part_info->uploaded_before_resume == true); - /* Already uploaded, set the noop to be true. */ - request->is_noop = true; - } - - if (!request->is_noop) { + aws_s3_meta_request_lock_synced_data(meta_request); - /* The part can finish out of order. Resize array-list to be long enough to hold this part, - * filling any intermediate slots with NULL. */ + --auto_ranged_put->synced_data.num_parts_pending_read; - aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number); - while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) { - struct aws_s3_mpu_part_info *null_part = NULL; - aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part); - } - - /* Add part to array-list */ - struct aws_s3_mpu_part_info *part = - aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); - part->size = request->request_body.len; - aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1); + auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end; + struct aws_s3_mpu_part_info *previously_uploaded_info = NULL; + if (request->was_previously_uploaded) { + aws_array_list_get_at( + &auto_ranged_put->synced_data.part_list, &previously_uploaded_info, request->part_number - 1); + AWS_ASSERT(previously_uploaded_info != NULL && previously_uploaded_info->was_previously_uploaded == true); + /* Already uploaded, set the noop to be true. */ + request->is_noop = true; + } + if (!request->is_noop) { + /* The part can finish out of order. Resize array-list to be long enough to hold this part, + * filling any intermediate slots with NULL. */ + aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number); + while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) { + struct aws_s3_mpu_part_info *null_part = NULL; + aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part); } - - aws_s3_meta_request_unlock_synced_data(meta_request); + /* Add part to array-list */ + struct aws_s3_mpu_part_info *part = + aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info)); + part->size = request->request_body.len; + aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1); } + aws_s3_meta_request_unlock_synced_data(meta_request); /* END CRITICAL SECTION */ - if (uploaded_part_info) { - /* Check for the durability */ - if (request->request_body.capacity != uploaded_part_info->size) { + + if (previously_uploaded_info) { + /* Part was previously uploaded, check that it matches what we just read. + * (Yes it's weird that we keep a pointer to the part_info even after + * releasing the lock that protects part_list. But it's the resizable + * part_list that needs lock protection. A previously uploaded part_info is const, + * and it's on the heap, so it's safe to keep the pointer around) */ + if (request->request_body.len != previously_uploaded_info->size) { error_code = AWS_ERROR_S3_RESUME_FAILED; AWS_LOGF_ERROR( AWS_LS_S3_META_REQUEST, @@ -1030,12 +1020,12 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) { goto on_done; } /* if previously uploaded part had a checksum, compare it to what we just skipped */ - if (uploaded_part_info->checksum_base64.len > 0 && + if (previously_uploaded_info->checksum_base64.len > 0 && s_verify_part_matches_checksum( meta_request->allocator, aws_byte_cursor_from_buf(&request->request_body), meta_request->checksum_config.checksum_algorithm, - aws_byte_cursor_from_buf(&uploaded_part_info->checksum_base64))) { + aws_byte_cursor_from_buf(&previously_uploaded_info->checksum_base64))) { error_code = aws_last_error_or_unknown(); goto on_done; } @@ -1189,13 +1179,11 @@ static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(st struct aws_future_http_message *message_future = aws_future_http_message_new(allocator); AWS_FATAL_ASSERT(auto_ranged_put->upload_id); - int error_code = AWS_OP_SUCCESS; if (request->num_times_prepared == 0) { /* Invoke upload_review_callback, and fail meta-request if user raises an error */ if (s_s3_review_multipart_upload(request) != AWS_OP_SUCCESS) { - error_code = aws_last_error(); - aws_future_http_message_set_error(message_future, error_code); + aws_future_http_message_set_error(message_future, aws_last_error()); goto on_done; }