Skip to content

Commit

Permalink
Parallel prepare - open file on needed (#355)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
TingDaoK and graebm authored Oct 16, 2023
1 parent 76c6d9f commit 0f751ea
Show file tree
Hide file tree
Showing 18 changed files with 899 additions and 138 deletions.
3 changes: 3 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ struct aws_s3_client_vtable {
void (*endpoint_shutdown_callback)(struct aws_s3_client *client);

void (*finish_destroy)(struct aws_s3_client *client);

struct aws_parallel_input_stream *(
*parallel_input_stream_new_from_file)(struct aws_allocator *allocator, struct aws_byte_cursor file_name);
};

/* Represents the state of the S3 client. */
Expand Down
10 changes: 8 additions & 2 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ struct aws_s3_meta_request {
/* Initial HTTP Message that this meta request is based on. */
struct aws_http_message *initial_request_message;

/* Async stream for meta request's body.
* NULL if using initial_request_message's synchronous body stream instead. */
/* The meta request's outgoing body comes from one of these:
* 1) request_body_async_stream: if set, then async stream 1 part at a time
* 2) request_body_parallel_stream: if set, then stream multiple parts in parallel
* 3) initial_request_message's body_stream: else synchronously stream parts */
struct aws_async_input_stream *request_body_async_stream;
struct aws_parallel_input_stream *request_body_parallel_stream;

/* Part size to use for uploads and downloads. Passed down by the creating client. */
const size_t part_size;
Expand Down Expand Up @@ -363,10 +366,13 @@ bool aws_s3_meta_request_are_events_out_for_delivery_synced(struct aws_s3_meta_r
* It may read from the underlying stream multiple times, if that's what it takes to fill the buffer.
* Returns a future whose result bool indicates whether end of stream was reached.
* This future may complete on any thread, and may complete synchronously.
*
* Read from offset to fill the buffer
*/
AWS_S3_API
struct aws_future_bool *aws_s3_meta_request_read_body(
struct aws_s3_meta_request *meta_request,
uint64_t offset,
struct aws_byte_buf *buffer);

bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request);
Expand Down
105 changes: 105 additions & 0 deletions include/aws/s3/private/s3_parallel_input_stream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_S3_PARALLEL_INPUT_STREAM_H
#define AWS_S3_PARALLEL_INPUT_STREAM_H

#include <aws/s3/s3.h>

#include <aws/common/ref_count.h>

AWS_PUSH_SANE_WARNING_LEVEL

struct aws_byte_buf;
struct aws_future_bool;
struct aws_input_stream;

struct aws_event_loop_group;

struct aws_parallel_input_stream {
const struct aws_parallel_input_stream_vtable *vtable;
struct aws_allocator *alloc;
struct aws_ref_count ref_count;

void *impl;
};

struct aws_parallel_input_stream_vtable {
/**
* Destroy the stream, its refcount has reached 0.
*/
void (*destroy)(struct aws_parallel_input_stream *stream);

/**
* Read into the buffer in parallel.
* The implementation needs to support this to be invoked concurrently from multiple threads
*/
struct aws_future_bool *(
*read)(struct aws_parallel_input_stream *stream, uint64_t offset, struct aws_byte_buf *dest);
};

AWS_EXTERN_C_BEGIN

/**
* Initialize aws_parallel_input_stream "base class"
*/
AWS_S3_API
void aws_parallel_input_stream_init_base(
struct aws_parallel_input_stream *stream,
struct aws_allocator *alloc,
const struct aws_parallel_input_stream_vtable *vtable,
void *impl);

/**
* Increment reference count.
* You may pass in NULL (has no effect).
* Returns whatever pointer was passed in.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_acquire(struct aws_parallel_input_stream *stream);

/**
* Decrement reference count.
* You may pass in NULL (has no effect).
* Always returns NULL.
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_release(struct aws_parallel_input_stream *stream);

/**
* Read from the offset until fill the dest, or EOF reached.
* It's thread safe to be called from multiple threads without waiting for other read to complete
*
* @param stream The stream to read from
* @param offset The offset in the stream from beginning to start reading
* @param dest The output buffer read to
* @return a future, which will contain an error code if something went wrong,
* or a result bool indicating whether EOF has been reached.
*/
AWS_S3_API
struct aws_future_bool *aws_parallel_input_stream_read(
struct aws_parallel_input_stream *stream,
uint64_t offset,
struct aws_byte_buf *dest);

/**
* Create a new file based parallel input stream.
*
* This implementation will open a file handler when the read happens, and seek to the offset to start reading. Close
* the file handler as read finishes.
*
* @param allocator memory allocator
* @param file_name The file path to read from
* @return aws_parallel_input_stream
*/
AWS_S3_API
struct aws_parallel_input_stream *aws_parallel_input_stream_new_from_file(
struct aws_allocator *allocator,
struct aws_byte_cursor file_name);

AWS_EXTERN_C_END
AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_S3_PARALLEL_INPUT_STREAM_H */
7 changes: 0 additions & 7 deletions include/aws/s3/private/s3_request_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ struct aws_http_message *aws_s3_message_util_copy_http_message_no_body_filter_he
size_t excluded_headers_size,
bool exclude_x_amz_meta);

/* Copy message and retain all headers, but replace body with one that reads directly from a filepath. */
AWS_S3_API
struct aws_http_message *aws_s3_message_util_copy_http_message_filepath_body_all_headers(
struct aws_allocator *allocator,
struct aws_http_message *message,
struct aws_byte_cursor filepath);

/* Copy headers from one message to the other and exclude specific headers.
* exclude_x_amz_meta controls whether S3 user metadata headers (prefixed with "x-amz-meta) are excluded.*/
AWS_S3_API
Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE,
AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED,
AWS_ERROR_S3_FILE_MODIFIED,
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

Expand Down
1 change: 1 addition & 0 deletions source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE, "The metric data is not available, the requests ends before the metric happens."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."),
};
/* clang-format on */

Expand Down
56 changes: 40 additions & 16 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,10 @@ static bool s_s3_auto_ranged_put_update(
* Basically returns either part size or if content is not equally divisible into parts, the size of the remaining last
* part.
*/
static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta_request, uint32_t part_number) {
static size_t s_compute_request_body_size(
const struct aws_s3_meta_request *meta_request,
uint32_t part_number,
uint64_t *offset_out) {
AWS_PRECONDITION(meta_request);

const struct aws_s3_auto_ranged_put *auto_ranged_put = meta_request->impl;
Expand All @@ -731,6 +734,8 @@ static size_t s_compute_request_body_size(const struct aws_s3_meta_request *meta
request_body_size = content_remainder;
}
}
/* The part_number starts at 1 */
*offset_out = (part_number - 1) * meta_request->part_size;

return request_body_size;
}
Expand Down Expand Up @@ -895,8 +900,9 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
AWS_ASSERT(skip_job->part_being_skipped != NULL);
aws_s3_meta_request_unlock_synced_data(meta_request);
/* END CRITICAL SECTION */
uint64_t offset = 0;

size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1);
size_t request_body_size = s_compute_request_body_size(meta_request, skip_job->part_index + 1, &offset);
if (request_body_size != skip_job->part_being_skipped->size) {
error_code = AWS_ERROR_S3_RESUME_FAILED;
AWS_LOGF_ERROR(
Expand All @@ -915,7 +921,8 @@ static void s_skip_parts_from_stream_loop(void *user_data) {
aws_byte_buf_reset(temp_body_buf, false);
}

skip_job->asyncstep_read_each_part = aws_s3_meta_request_read_body(skip_job->meta_request, temp_body_buf);
skip_job->asyncstep_read_each_part =
aws_s3_meta_request_read_body(skip_job->meta_request, offset, temp_body_buf);

/* the read may or may not complete immediately */
if (aws_future_bool_register_callback_if_not_done(
Expand Down Expand Up @@ -1090,6 +1097,7 @@ struct aws_future_http_message *s_s3_prepare_create_multipart_upload(struct aws_
}
return future;
}

/* Prepare an UploadPart request */
struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *request) {
struct aws_s3_meta_request *meta_request = request->meta_request;
Expand All @@ -1109,10 +1117,16 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
* Next async step: read through the body stream until we've
* skipped over parts that were already uploaded (in case we're resuming
* from an upload that had been paused) */
part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream(
meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1);
aws_future_void_register_callback(
part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep);

if (meta_request->request_body_parallel_stream) {
/* For parallel read stream, which is seekable, don't need to skip the part by reading from the stream. */
s_s3_prepare_upload_part_on_skipping_done((void *)part_prep);
} else {
part_prep->asyncstep1_skip_prev_parts = s_skip_parts_from_stream(
meta_request, auto_ranged_put->prepare_data.part_index_for_skipping, request->part_number - 1);
aws_future_void_register_callback(
part_prep->asyncstep1_skip_prev_parts, s_s3_prepare_upload_part_on_skipping_done, part_prep);
}
} else {
/* Not the first time preparing request (e.g. retry).
* We can skip over the async steps that read the body stream */
Expand All @@ -1128,21 +1142,22 @@ static void s_s3_prepare_upload_part_on_skipping_done(void *user_data) {
struct aws_s3_request *request = part_prep->request;
struct aws_s3_meta_request *meta_request = request->meta_request;

int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts);

/* If skipping failed, the prepare-upload-part job has failed. */
if (error_code) {
s_s3_prepare_upload_part_finish(part_prep, error_code);
return;
if (part_prep->asyncstep1_skip_prev_parts) {
int error_code = aws_future_void_get_error(part_prep->asyncstep1_skip_prev_parts);
/* If skipping failed, the prepare-upload-part job has failed. */
if (error_code) {
s_s3_prepare_upload_part_finish(part_prep, error_code);
return;
}
}

/* Skipping succeeded.
* Next async step: read body stream for this part into a buffer */
uint64_t offset = 0;

size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number);
size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);

part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, &request->request_body);
part_prep->asyncstep2_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body);
aws_future_bool_register_callback(
part_prep->asyncstep2_read_part, s_s3_prepare_upload_part_on_read_done, part_prep);
}
Expand Down Expand Up @@ -1195,6 +1210,15 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
if (!request->is_noop) {
auto_ranged_put->prepare_data.part_index_for_skipping = request->part_number;

/* The part can finish out of order. Resize array-list to be long enough to hold this part,
* filling any intermediate slots with NULL. */

aws_array_list_ensure_capacity(&auto_ranged_put->synced_data.part_list, request->part_number);
while (aws_array_list_length(&auto_ranged_put->synced_data.part_list) < request->part_number) {
struct aws_s3_mpu_part_info *null_part = NULL;
aws_array_list_push_back(&auto_ranged_put->synced_data.part_list, &null_part);
}

/* Add part to array-list */
struct aws_s3_mpu_part_info *part =
aws_mem_calloc(meta_request->allocator, 1, sizeof(struct aws_s3_mpu_part_info));
Expand Down
2 changes: 2 additions & 0 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "aws/s3/private/s3_copy_object.h"
#include "aws/s3/private/s3_default_meta_request.h"
#include "aws/s3/private/s3_meta_request_impl.h"
#include "aws/s3/private/s3_parallel_input_stream.h"
#include "aws/s3/private/s3_request_messages.h"
#include "aws/s3/private/s3_util.h"

Expand Down Expand Up @@ -126,6 +127,7 @@ static struct aws_s3_client_vtable s_s3_client_default_vtable = {
.process_work = s_s3_client_process_work_default,
.endpoint_shutdown_callback = s_s3_client_endpoint_shutdown_callback,
.finish_destroy = s_s3_client_finish_destroy_default,
.parallel_input_stream_new_from_file = aws_parallel_input_stream_new_from_file,
};

void aws_s3_set_dns_ttl(size_t ttl) {
Expand Down
3 changes: 2 additions & 1 deletion source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ static struct aws_future_void *s_s3_default_prepare_request(struct aws_s3_reques
aws_byte_buf_init(&request->request_body, meta_request->allocator, meta_request_default->content_length);

/* Kick off the async read */
request_prep->step1_read_body = aws_s3_meta_request_read_body(meta_request, &request->request_body);
request_prep->step1_read_body =
aws_s3_meta_request_read_body(meta_request, 0 /*offset*/, &request->request_body);
aws_future_bool_register_callback(
request_prep->step1_read_body, s_s3_default_prepare_request_on_read_done, request_prep);
} else {
Expand Down
Loading

0 comments on commit 0f751ea

Please sign in to comment.