Commit e87fdd4: address comments

TingDaoK committed Oct 12, 2023
1 parent 9bcdfda commit e87fdd4
Showing 3 changed files with 45 additions and 57 deletions.
include/aws/s3/private/s3_meta_request_impl.h (2 changes: 1 addition & 1 deletion)
@@ -262,7 +262,7 @@ struct aws_s3_mpu_part_info {
uint64_t size;
struct aws_string *etag;
struct aws_byte_buf checksum_base64;
bool uploaded_before_resume;
bool was_previously_uploaded;
};

AWS_EXTERN_C_BEGIN
include/aws/s3/private/s3_request.h (2 changes: 1 addition & 1 deletion)
@@ -204,7 +204,7 @@ struct aws_s3_request {
uint32_t is_noop : 1;

/* When true, this request has already been uploaded. We still prepare the request to check durability. */
uint32_t uploaded_before_resume : 1;
uint32_t was_previously_uploaded : 1;
};

AWS_EXTERN_C_BEGIN
source/s3_auto_ranged_put.c (98 changes: 43 additions & 55 deletions)
@@ -67,8 +67,6 @@ struct aws_s3_prepare_upload_part_job {
struct aws_s3_prepare_complete_multipart_upload_job {
struct aws_allocator *allocator;
struct aws_s3_request *request;
/* async step: skip remaining parts from input stream that were previously uploaded */
struct aws_future_void *asyncstep_skip_remaining_parts;
/* future to set when this job completes */
struct aws_future_http_message *on_complete;
};
@@ -119,7 +117,7 @@ static int s_process_part_info_synced(const struct aws_s3_part_info *info, void
struct aws_s3_mpu_part_info *part = aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
part->size = info->size;
part->etag = aws_strip_quotes(meta_request->allocator, info->e_tag);
part->uploaded_before_resume = true;
part->was_previously_uploaded = true;

const struct aws_byte_cursor *checksum_cur = NULL;
switch (auto_ranged_put->base.checksum_config.checksum_algorithm) {
@@ -509,25 +507,20 @@ static bool s_s3_auto_ranged_put_update(
}

bool should_create_next_part_request = false;
bool request_uploaded = false;
bool request_previously_uploaded = false;
if (auto_ranged_put->has_content_length && (auto_ranged_put->synced_data.num_parts_started <
auto_ranged_put->total_num_parts_from_content_length)) {

/* Check if next part was previously uploaded (due to resume) */
size_t part_index = auto_ranged_put->threaded_update_data.next_part_number - 1;

struct aws_s3_mpu_part_info *part = NULL;
aws_array_list_get_at(&auto_ranged_put->synced_data.part_list, &part, part_index);
if (part != NULL) {
AWS_ASSERT(part->uploaded_before_resume == true);
AWS_ASSERT(part->was_previously_uploaded == true);
/* This part has been uploaded. */
request_uploaded = true;
request_previously_uploaded = true;
}

// Something went really wrong. We still have parts to send, but already have etags for all parts
AWS_FATAL_ASSERT(
auto_ranged_put->threaded_update_data.next_part_number <=
auto_ranged_put->total_num_parts_from_content_length);

if (s_should_skip_scheduling_more_parts_based_on_flags(auto_ranged_put, flags)) {
goto has_work_remaining;
}
@@ -554,8 +547,9 @@

request->part_number = auto_ranged_put->threaded_update_data.next_part_number;

/* If the request was already uploaded, it is a no-op, but we still prepare it. */
request->uploaded_before_resume = request_uploaded;
/* If the request was previously uploaded, we prepare it to ensure checksums still match,
 * but ultimately it gets marked no-op and we don't send it. */
request->was_previously_uploaded = request_previously_uploaded;

++auto_ranged_put->threaded_update_data.next_part_number;
++auto_ranged_put->synced_data.num_parts_started;
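The update logic above is the core of resume: a part that already has an entry in part_list is still scheduled as a request, but flagged was_previously_uploaded so it gets prepared (its body re-read and validated) without being re-sent. The following is a minimal, self-contained C sketch of that scheduling decision; every name in it is a hypothetical stand-in, not the aws-c-s3 API.

/*
 * Minimal sketch of the resume scheduling decision above.
 * Hypothetical names only; this is not the aws-c-s3 implementation.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct part_info {
    size_t size;
    bool was_previously_uploaded;
};

struct part_request {
    size_t part_number; /* 1-based, matching the diff above */
    bool was_previously_uploaded;
};

/* Build the next request; propagate the "already uploaded" flag so the
 * part is prepared for validation but later marked no-op instead of sent. */
static struct part_request make_next_request(
    const struct part_info **part_list, size_t num_parts, size_t next_part_number) {
    struct part_request request = {.part_number = next_part_number, .was_previously_uploaded = false};
    size_t part_index = next_part_number - 1;
    if (part_index < num_parts && part_list[part_index] != NULL) {
        request.was_previously_uploaded = true;
    }
    return request;
}

int main(void) {
    /* Pretend a resumed upload found a surviving entry for part 2 only. */
    struct part_info part2 = {.size = 8388608, .was_previously_uploaded = true};
    const struct part_info *part_list[3] = {NULL, &part2, NULL};

    for (size_t n = 1; n <= 3; ++n) {
        struct part_request r = make_next_request(part_list, 3, n);
        printf("part %zu: %s\n", r.part_number, r.was_previously_uploaded ? "validate only (no-op)" : "upload");
    }
    return 0;
}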
@@ -929,7 +923,6 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *

/* Read the body */
uint64_t offset = 0;

size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);

@@ -977,51 +970,48 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
(void *)meta_request);
goto on_done;
}
struct aws_s3_mpu_part_info *uploaded_part_info = NULL;

request->is_noop = request->part_number >
1 && /* allow first part to have 0 length to support empty unknown content length objects. */
request->request_body.len == 0;

/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);

--auto_ranged_put->synced_data.num_parts_pending_read;

auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end;
if (request->uploaded_before_resume) {
aws_array_list_get_at(
&auto_ranged_put->synced_data.part_list, &uploaded_part_info, request->part_number - 1);
AWS_ASSERT(uploaded_part_info != NULL && uploaded_part_info->uploaded_before_resume == true);
/* Already uploaded; set is_noop to true. */
request->is_noop = true;
}

if (!request->is_noop) {
aws_s3_meta_request_lock_synced_data(meta_request);

/* The part can finish out of order. Resize array-list to be long enough to hold this part,
* filling any intermediate slots with NULL. */
--auto_ranged_put->synced_data.num_parts_pending_read;

aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
struct aws_s3_mpu_part_info *null_part = NULL;
aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
}

/* Add part to array-list */
struct aws_s3_mpu_part_info *part =
aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
part->size = request->request_body.len;
aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1);
auto_ranged_put->synced_data.is_body_stream_at_end = is_body_stream_at_end;
struct aws_s3_mpu_part_info *previously_uploaded_info = NULL;
if (request->was_previously_uploaded) {
aws_array_list_get_at(
&auto_ranged_put->synced_data.part_list, &previously_uploaded_info, request->part_number - 1);
AWS_ASSERT(previously_uploaded_info != NULL && previously_uploaded_info->was_previously_uploaded == true);
/* Already uploaded; set is_noop to true. */
request->is_noop = true;
}
if (!request->is_noop) {
/* The part can finish out of order. Resize array-list to be long enough to hold this part,
* filling any intermediate slots with NULL. */
aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
struct aws_s3_mpu_part_info *null_part = NULL;
aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
}

aws_s3_meta_request_unlock_synced_data(meta_request);
/* Add part to array-list */
struct aws_s3_mpu_part_info *part =
aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
part->size = request->request_body.len;
aws_array_list_set_at(&auto_ranged_put->synced_data.part_list, &part, request->part_number - 1);
}
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
if (uploaded_part_info) {
/* Check durability */
if (request->request_body.capacity != uploaded_part_info->size) {

if (previously_uploaded_info) {
/* Part was previously uploaded, check that it matches what we just read.
* (Yes it's weird that we keep a pointer to the part_info even after
* releasing the lock that protects part_list. But it's the resizable
* part_list that needs lock protection. A previously uploaded part_info is const,
* and it's on the heap, so it's safe to keep the pointer around) */
if (request->request_body.len != previously_uploaded_info->size) {
error_code = AWS_ERROR_S3_RESUME_FAILED;
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
@@ -1030,12 +1020,12 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
goto on_done;
}
/* if previously uploaded part had a checksum, compare it to what we just skipped */
if (uploaded_part_info->checksum_base64.len > 0 &&
if (previously_uploaded_info->checksum_base64.len > 0 &&
s_verify_part_matches_checksum(
meta_request->allocator,
aws_byte_cursor_from_buf(&request->request_body),
meta_request->checksum_config.checksum_algorithm,
aws_byte_cursor_from_buf(&uploaded_part_info->checksum_base64))) {
aws_byte_cursor_from_buf(&previously_uploaded_info->checksum_base64))) {
error_code = aws_last_error_or_unknown();
goto on_done;
}
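The checks above run after the lock is released: first the length of the re-read body is compared against the recorded part size (note the fix from request_body.capacity to request_body.len), then the checksum of the skipped body is compared to the stored one, and either mismatch fails the resume with AWS_ERROR_S3_RESUME_FAILED. Below is a self-contained sketch of that shape, with a toy 64-bit FNV-1a hash standing in for the real checksum algorithm and its base64-encoded digest; the names are illustrative, not the aws-c-s3 API.

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* What a resumed upload remembers about a previously uploaded part. */
struct recorded_part {
    size_t size;
    uint64_t checksum; /* toy stand-in for checksum_base64 */
};

/* Toy 64-bit FNV-1a hash; a stand-in for the CRC/SHA family used in practice. */
static uint64_t fnv1a64(const uint8_t *data, size_t len) {
    uint64_t hash = 1469598103934665603ULL;
    for (size_t i = 0; i < len; ++i) {
        hash ^= data[i];
        hash *= 1099511628211ULL;
    }
    return hash;
}

/* Returns true if the skipped body still matches the previous upload;
 * the real code raises AWS_ERROR_S3_RESUME_FAILED on a size mismatch. */
static bool part_matches_previous_upload(
    const uint8_t *body, size_t body_len, const struct recorded_part *recorded) {
    if (body_len != recorded->size) {
        return false;
    }
    return fnv1a64(body, body_len) == recorded->checksum;
}

int main(void) {
    const uint8_t body[] = "part body bytes";
    struct recorded_part recorded = {.size = sizeof(body), .checksum = fnv1a64(body, sizeof(body))};
    return part_matches_previous_upload(body, sizeof(body), &recorded) ? 0 : 1;
}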
@@ -1189,13 +1179,11 @@ static struct aws_future_http_message *s_s3_prepare_complete_multipart_upload(st
struct aws_future_http_message *message_future = aws_future_http_message_new(allocator);

AWS_FATAL_ASSERT(auto_ranged_put->upload_id);
int error_code = AWS_OP_SUCCESS;

if (request->num_times_prepared == 0) {
/* Invoke upload_review_callback, and fail meta-request if user raises an error */
if (s_s3_review_multipart_upload(request) != AWS_OP_SUCCESS) {
error_code = aws_last_error();
aws_future_http_message_set_error(message_future, error_code);
aws_future_http_message_set_error(message_future, aws_last_error());
goto on_done;
}

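The last hunk drops the error_code temporary and writes the thread-local last error into the future directly at the failure site. A toy sketch of that pattern, assuming stand-in types rather than the real aws_future_http_message and aws_last_error():

#include <stdio.h>

/* Toy stand-ins; aws_future_http_message and aws_last_error() are the
 * real counterparts in the diff above. */
struct toy_future {
    int error_code; /* 0 means still pending / success */
};

static int s_last_error; /* models the per-thread last-error slot */

static void toy_future_set_error(struct toy_future *future, int error_code) {
    future->error_code = error_code;
}

static int review_upload(void) {
    s_last_error = 9999; /* pretend the user's review callback raised an error */
    return -1;           /* AWS_OP_ERR */
}

static void prepare_complete(struct toy_future *future) {
    if (review_upload() != 0) {
        /* Capture the error at the failure site, before it can be clobbered. */
        toy_future_set_error(future, s_last_error);
        return;
    }
    /* ... otherwise build and deliver the CompleteMultipartUpload message ... */
}

int main(void) {
    struct toy_future future = {0};
    prepare_complete(&future);
    printf("future error: %d\n", future.error_code);
    return 0;
}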
